feat: Introduce workflow level concurrency#311
feat: Introduce workflow level concurrency#311octoper wants to merge 3 commits intoopenworkflowdev:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Adds workflow-level concurrency to OpenWorkflow by persisting a resolved (concurrencyKey, concurrencyLimit) on each run at enqueue time and enforcing bucket capacity during backend claim/dequeue, alongside existing worker-slot concurrency.
Changes:
- Extend workflow specs with optional
concurrency(static or input-derived key/limit) and resolve/validate it in the client before creating a run. - Persist
concurrency_key/concurrency_limitin SQLite/Postgres via migrations + indexes, and enforce limits duringclaimWorkflowRun. - Add backend/worker/client tests plus user-facing documentation (README + docs + architecture notes).
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/openworkflow/workflow.ts | Adds workflow spec concurrency types/config. |
| packages/openworkflow/index.ts | Re-exports new concurrency types. |
| packages/openworkflow/core/workflow.ts | Adds persisted concurrency fields to WorkflowRun. |
| packages/openworkflow/backend.ts | Extends CreateWorkflowRunParams with concurrency metadata. |
| packages/openworkflow/client.ts | Resolves + validates concurrency at enqueue time and persists it on runs. |
| packages/openworkflow/sqlite/sqlite.ts | SQLite migration adds concurrency columns + index. |
| packages/openworkflow/sqlite/backend.ts | Persists concurrency metadata; enforces limit in claim query; adds normalization helper. |
| packages/openworkflow/postgres/postgres.ts | Postgres migration adds concurrency columns + index. |
| packages/openworkflow/postgres/backend.ts | Persists concurrency metadata; enforces limit in claim query; adds normalization helper. |
| packages/openworkflow/backend.testsuite.ts | Adds cross-backend tests for concurrency persistence + enforcement. |
| packages/openworkflow/worker.test.ts | Adds integration tests showing worker-slot concurrency still respects workflow concurrency. |
| packages/openworkflow/client.test.ts | Tests concurrency resolution/validation + enqueue failure behavior. |
| packages/openworkflow/sqlite/sqlite.test.ts / packages/openworkflow/postgres/postgres.test.ts | Verifies migrations create required columns + index. |
| packages/docs/docs/workflows.mdx / packages/docs/docs/workers.mdx / packages/openworkflow/README.md / ARCHITECTURE.md | Documents workflow concurrency behavior and scope. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 21 out of 21 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 21 out of 21 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Summary
This PR adds workflow-level concurrency, It introduces input-based concurrency buckets (similar to Inngest-style concurrency) so runs can be constrained by a resolved key/limit at enqueue time, while still working alongside worker-slot concurrency.
Motivation
Worker concurrency limits total in-flight runs per worker, but we also need per-workflow/per-tenant style limits (eg.
tenant:acmemax 1 or max N active runs), resolved from validated workflow input and enforced centrally in the backend.Disclaimer
this is just a quick shot on the workflow level concurrency coded mainly by AI and small intervention by me, I want to first use it in a project to make sure everything works as expected and adjust it or scrape it off completely 😅