diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index af2631bd..774e4aee 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -297,15 +297,60 @@ When the worker encounters this, it executes all steps within the `Promise.all` concurrently. It waits for all of them to complete before proceeding. Each step attempt is persisted individually as a `step_attempt`. -### 5.2. Workflow Concurrency +### 5.2. Worker Concurrency Workers are configured with a concurrency limit (e.g., 10). A worker will maintain up to 10 in-flight workflow runs simultaneously. It polls for new work -only when it has available capacity. The Backend's atomic `dequeue` operation -(`FOR UPDATE SKIP LOCKED`) ensures that multiple workers can poll the same table -without race conditions or processing the same run twice. +only when it has available capacity. Claim atomicity is backend-specific: -### 5.3. Handling Crashes During Parallel Execution +- Postgres uses `FOR UPDATE SKIP LOCKED` plus advisory locks for constrained + buckets. +- SQLite uses transaction-level single-writer locking (`BEGIN IMMEDIATE`) to + serialize claim writes. + +### 5.3. Workflow-Run Concurrency + +In addition to worker-slot concurrency, workflows can define a per-run +concurrency policy in the workflow spec: + +```ts +defineWorkflow( + { + name: "process-order", + concurrency: { + key: ({ input }) => `tenant:${input.tenantId}`, + limit: ({ input }) => input.maxConcurrentOrders, + }, + }, + async ({ step }) => { + // ... + }, +); +``` + +`key` and `limit` can each be either static values (`string`/`number`) or +functions of the validated workflow input, and `key` is optional. They are +resolved once when the run is created and persisted on `workflow_runs`. +Resolved keys are stored verbatim; only empty/all-whitespace keys are rejected. +When `key` is omitted, the run uses the default bucket for +`namespace_id + workflow_name + version`. 
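The resolution rule above (static value or function of input, resolved once at run creation, default bucket when `key` is omitted) can be sketched in TypeScript. This is a hypothetical illustration — `ConcurrencySpec` and `resolveConcurrency` are invented names, not the library's internals:

```ts
// Hypothetical sketch of resolving a workflow `concurrency` spec once at
// run creation. Names here are illustrative, not the library's internals.
type ConcurrencySpec<Input> = {
  key?: string | ((args: { input: Input }) => string);
  limit: number | ((args: { input: Input }) => number);
};

function resolveConcurrency<Input>(
  spec: ConcurrencySpec<Input>,
  input: Input,
): { concurrencyKey: string | null; concurrencyLimit: number } {
  // `key` omitted -> null, meaning the default bucket for
  // namespace_id + workflow_name + version.
  const concurrencyKey =
    spec.key === undefined
      ? null
      : typeof spec.key === "function"
        ? spec.key({ input })
        : spec.key;
  const concurrencyLimit =
    typeof spec.limit === "function" ? spec.limit({ input }) : spec.limit;

  // Keys are stored verbatim; only empty/all-whitespace keys are rejected.
  if (concurrencyKey !== null && concurrencyKey.trim().length === 0) {
    throw new Error("concurrency key must resolve to a non-empty string");
  }
  if (!Number.isInteger(concurrencyLimit) || concurrencyLimit <= 0) {
    throw new Error("concurrency limit must resolve to a positive integer");
  }
  return { concurrencyKey, concurrencyLimit };
}

// Example: function-valued key, static limit.
const resolved = resolveConcurrency(
  { key: ({ input }) => `tenant:${input.tenantId}`, limit: 3 },
  { tenantId: "acme" },
);
// resolved is { concurrencyKey: "tenant:acme", concurrencyLimit: 3 }
```

Because resolution happens once and is persisted on `workflow_runs`, later changes to the run's input cannot move it into a different bucket.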
+ +During claim/dequeue, a run is claimable only when the number of actively leased +`running` runs in the same bucket is below the run's `limit`. The bucket scope +is: + +- `namespace_id` +- `workflow_name` +- `version` (version-aware buckets) +- `concurrency_key` (nullable for the default bucket) + +`pending`, `sleeping`, and expired-lease `running` runs do not consume +concurrency slots. +For active runs in a bucket (`pending`, `running`), the resolved +`concurrency_limit` is required to be consistent; conflicting limits are +rejected at run creation. + +### 5.4. Handling Crashes During Parallel Execution The `availableAt` heartbeat mechanism provides robust recovery. If a worker crashes while executing parallel steps, its heartbeat stops. The `availableAt` diff --git a/packages/docs/docs/workers.mdx b/packages/docs/docs/workers.mdx index 0b3b05bc..e0df82c8 100644 --- a/packages/docs/docs/workers.mdx +++ b/packages/docs/docs/workers.mdx @@ -49,7 +49,7 @@ When a worker picks up a workflow: 5. **Execute**: New steps are executed and their results are stored 6. **Complete**: The workflow status is updated to `completed` or `failed` -## Concurrency +## Worker Concurrency Workers can process multiple workflow runs simultaneously. Configure concurrency in your `openworkflow.config.ts`: @@ -88,6 +88,40 @@ bunx @openworkflow/cli worker start --concurrency 10 capacity. +## Workflow Concurrency + +Workflow specs can also define concurrency buckets that are enforced at claim +time: + +```ts +defineWorkflow( + { + name: "process-order", + concurrency: { + key: ({ input }) => `tenant:${input.tenantId}`, // optional + limit: ({ input }) => input.maxConcurrentOrders, // or: 5 + }, + }, + async ({ step }) => { + // ... + }, +); +``` + +Workers will only claim a run when the bucket has capacity. 
Bucket scope is: + +- namespace +- workflow name +- workflow version +- resolved concurrency key (or default bucket when key is omitted) + +Only actively leased `running` runs consume workflow-concurrency slots. +Resolved keys are stored verbatim; only empty/all-whitespace keys are rejected. +Sleeping runs are non-consuming until they are claimed again as actively leased +`running` runs. +Within active `pending` and actively leased `running` runs for the same +workflow+version+key bucket, the resolved `limit` must remain consistent. + ## Heartbeats and Crash Recovery Workers maintain their claim on workflow runs through a heartbeat mechanism: @@ -134,6 +168,8 @@ Workers coordinate through the database: - Each workflow run is claimed by exactly one worker at a time - Workers use atomic database operations to prevent duplicate processing - If a worker crashes, its workflows become available to other workers +- SQLite relies on transaction-level single-writer locking (`BEGIN IMMEDIATE`) + while Postgres uses row locks plus advisory locks for constrained buckets ## Graceful Shutdown diff --git a/packages/docs/docs/workflows.mdx b/packages/docs/docs/workflows.mdx index 198639cf..b9fdf4f3 100644 --- a/packages/docs/docs/workflows.mdx +++ b/packages/docs/docs/workflows.mdx @@ -189,6 +189,45 @@ defineWorkflow( Any `retryPolicy` fields you omit fall back to defaults. See [Retries](/docs/retries) for the full behavior and defaults. +### Concurrency (Optional) + +Control how many actively leased `running` runs are allowed for a workflow bucket: + +```ts +defineWorkflow( + { + name: "process-order", + concurrency: { + key: ({ input }) => `tenant:${input.tenantId}`, // optional + limit: ({ input }) => input.maxConcurrentOrders, // or: 5 + }, + }, + async ({ input, step }) => { + // ... 
+ }, +); +``` + +- `key` is optional; when set it can be a string or `({ input }) => string` +- `limit` can be a number or a function `({ input }) => number` +- if provided, `key` must resolve to a non-empty string +- `limit` must resolve to a positive integer +- resolved keys are stored verbatim; only empty/all-whitespace keys are rejected +- within active runs (`pending`/`running`) for the same + workflow+version+key bucket, `limit` must remain consistent + +When concurrency is configured, runs in the same bucket are constrained by: + +- namespace +- workflow name +- workflow version +- resolved `key` (or the default bucket when key is omitted) + +Only actively leased `running` runs consume slots. `pending`, `sleeping`, and +expired-lease runs do not. +Sleeping runs become slot-consuming only after they are claimed again as +actively leased `running` runs. + ### Idempotency Key (Optional) You can prevent duplicate run creation by providing an idempotency key, though diff --git a/packages/openworkflow/README.md b/packages/openworkflow/README.md index b8d9a93b..34b085a1 100644 --- a/packages/openworkflow/README.md +++ b/packages/openworkflow/README.md @@ -67,11 +67,42 @@ For more details, check out our [docs](https://openworkflow.dev/docs). 
- ✅ **Long pauses** - Sleep for seconds or months - ✅ **Scheduled runs** - Start workflows at a specific time - ✅ **Parallel execution** - Run steps concurrently +- ✅ **Workflow concurrency** - Limit active runs by bucket (optional key) - ✅ **Idempotency keys** - Deduplicate repeated run requests (24h window) - ✅ **No extra servers** - Uses your existing database - ✅ **Dashboard included** - Monitor and debug workflows - ✅ **Production ready** - PostgreSQL and SQLite support + +## Workflow Concurrency + +You can limit actively leased `running` runs per workflow bucket: + +```ts +const workflow = defineWorkflow( + { + name: "process-order", + concurrency: { + key: ({ input }) => `tenant:${input.tenantId}`, // optional + limit: ({ input }) => input.maxConcurrentOrders, // or: 5 + }, + }, + async ({ step }) => { + // ... 
+ ## Documentation - [Documentation](https://openworkflow.dev/docs) diff --git a/packages/openworkflow/backend-concurrency.ts b/packages/openworkflow/backend-concurrency.ts new file mode 100644 index 00000000..114969ef --- /dev/null +++ b/packages/openworkflow/backend-concurrency.ts @@ -0,0 +1,114 @@ +import type { CreateWorkflowRunParams } from "./backend.js"; + +const INVALID_CONCURRENCY_KEY_TYPE_ERROR = + 'Invalid workflow concurrency metadata: "concurrencyKey" must be a string or null.'; +export const INVALID_CONCURRENCY_KEY_VALUE_ERROR = + 'Invalid workflow concurrency metadata: "concurrencyKey" must be a non-empty string when provided.'; +const INVALID_CONCURRENCY_LIMIT_TYPE_ERROR = + 'Invalid workflow concurrency metadata: "concurrencyLimit" must be a number or null.'; +export const INVALID_CONCURRENCY_LIMIT_VALUE_ERROR = + 'Invalid workflow concurrency metadata: "concurrencyLimit" must be a positive integer or null.'; +const INVALID_CONCURRENCY_PAIR_ERROR = + 'Invalid workflow concurrency metadata: "concurrencyLimit" must be set when "concurrencyKey" is provided.'; +export const CONCURRENCY_LIMIT_MISMATCH_ERROR = + 'Invalid workflow concurrency metadata: active runs in the same bucket must use the same "concurrencyLimit".'; + +/** + * Bucket identity for workflow-level concurrency. + */ +export interface ConcurrencyBucket { + workflowName: string; + version: string | null; + key: string | null; + limit: number; +} + +/** + * Normalize and validate workflow concurrency metadata passed to create calls. + * This protects direct backend callers that bypass client-side validation. 
* @param params - Workflow run creation params + * @returns Params with normalized concurrency fields + * @throws {Error} When concurrency metadata has invalid shape or values + */ +export function normalizeCreateWorkflowRunParams( + params: CreateWorkflowRunParams, +): CreateWorkflowRunParams { + const rawParams = params as unknown as Record<string, unknown>; + const rawConcurrencyKey = rawParams["concurrencyKey"]; + const rawConcurrencyLimit = rawParams["concurrencyLimit"]; + + if (rawConcurrencyKey === undefined && rawConcurrencyLimit === undefined) { + return { + ...params, + concurrencyKey: null, + concurrencyLimit: null, + }; + } + + if ( + rawConcurrencyKey !== undefined && + rawConcurrencyKey !== null && + typeof rawConcurrencyKey !== "string" + ) { + throw new Error(INVALID_CONCURRENCY_KEY_TYPE_ERROR); + } + + if ( + rawConcurrencyLimit !== undefined && + rawConcurrencyLimit !== null && + typeof rawConcurrencyLimit !== "number" + ) { + throw new Error(INVALID_CONCURRENCY_LIMIT_TYPE_ERROR); + } + + const concurrencyKey = + rawConcurrencyKey === undefined ? null : rawConcurrencyKey; + const concurrencyLimit = + rawConcurrencyLimit === undefined ? null : rawConcurrencyLimit; + + if (concurrencyKey !== null && concurrencyLimit === null) { + throw new Error(INVALID_CONCURRENCY_PAIR_ERROR); + } + + if ( + typeof concurrencyKey === "string" && + concurrencyKey.trim().length === 0 + ) { + throw new Error(INVALID_CONCURRENCY_KEY_VALUE_ERROR); + } + + if ( + typeof concurrencyLimit === "number" && + (!Number.isFinite(concurrencyLimit) || + !Number.isInteger(concurrencyLimit) || + concurrencyLimit <= 0) + ) { + throw new Error(INVALID_CONCURRENCY_LIMIT_VALUE_ERROR); + } + + return { + ...params, + concurrencyKey, + concurrencyLimit, + }; +} + +/** + * Return bucket identity for constrained runs, otherwise null. 
+ * @param params - Normalized workflow run creation params + * @returns Concurrency bucket or null for unconstrained runs + */ +export function toConcurrencyBucket( + params: CreateWorkflowRunParams, +): ConcurrencyBucket | null { + if (params.concurrencyLimit === null) { + return null; + } + + return { + workflowName: params.workflowName, + version: params.version, + key: params.concurrencyKey, + limit: params.concurrencyLimit, + }; +} diff --git a/packages/openworkflow/backend.testsuite.ts b/packages/openworkflow/backend.testsuite.ts index 41dfd9bf..c9b52d3d 100644 --- a/packages/openworkflow/backend.testsuite.ts +++ b/packages/openworkflow/backend.testsuite.ts @@ -1,5 +1,10 @@ +import { + CONCURRENCY_LIMIT_MISMATCH_ERROR, + INVALID_CONCURRENCY_KEY_VALUE_ERROR, + INVALID_CONCURRENCY_LIMIT_VALUE_ERROR, +} from "./backend-concurrency.js"; import { DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS } from "./backend.js"; -import type { Backend } from "./backend.js"; +import type { Backend, CreateWorkflowRunParams } from "./backend.js"; import type { StepAttempt } from "./core/step.js"; import type { WorkflowRun } from "./core/workflow.js"; import { DEFAULT_WORKFLOW_RETRY_POLICY } from "./workflow.js"; @@ -30,6 +35,13 @@ export function testBackend(options: TestBackendOptions): void { ...DEFAULT_WORKFLOW_RETRY_POLICY, maximumAttempts: 3, } as const; + const SHORT_WORKFLOW_RETRY_POLICY = { + ...DEFAULT_WORKFLOW_RETRY_POLICY, + initialInterval: "40ms", + maximumInterval: "40ms", + backoffCoefficient: 1, + maximumAttempts: 2, + } as const; describe("Backend", () => { let backend: Backend; @@ -51,6 +63,8 @@ export function testBackend(options: TestBackendOptions): void { version: randomUUID(), status: "pending", idempotencyKey: randomUUID(), + concurrencyKey: randomUUID(), + concurrencyLimit: 3, config: { key: "val" }, context: { key: "val" }, input: { key: "val" }, @@ -73,6 +87,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName: expected.workflowName, 
version: expected.version, idempotencyKey: expected.idempotencyKey, + concurrencyKey: expected.concurrencyKey, + concurrencyLimit: expected.concurrencyLimit, input: expected.input, config: expected.config, context: expected.context, @@ -97,6 +113,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName: expected.workflowName, version: null, idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -105,12 +123,298 @@ export function testBackend(options: TestBackendOptions): void { }); expect(createdMin.version).toBeNull(); expect(createdMin.idempotencyKey).toBeNull(); + expect(createdMin.concurrencyKey).toBeNull(); + expect(createdMin.concurrencyLimit).toBeNull(); expect(createdMin.input).toBeNull(); expect(createdMin.context).toBeNull(); expect(deltaSeconds(createdMin.availableAt)).toBeLessThan(1); // defaults to NOW() expect(createdMin.deadlineAt).toBeNull(); }); + test("stores workflow concurrency metadata when provided", async () => { + const workflowName = randomUUID(); + const version = "v1"; + const concurrencyKey = "tenant:acme"; + const concurrencyLimit = 5; + + const created = await backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey: null, + concurrencyKey, + concurrencyLimit, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + expect(created.workflowName).toBe(workflowName); + expect(created.version).toBe(version); + expect(created.concurrencyKey).toBe(concurrencyKey); + expect(created.concurrencyLimit).toBe(concurrencyLimit); + }); + + test("normalizes omitted concurrency metadata from raw JS callers", async () => { + const rawParams = { + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + } as unknown as CreateWorkflowRunParams; + + const created = await backend.createWorkflowRun(rawParams); + 
expect(created.concurrencyKey).toBeNull(); + expect(created.concurrencyLimit).toBeNull(); + }); + + test("rejects key-only workflow concurrency metadata", async () => { + const base = { + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }; + + const keyOnly = { + ...base, + concurrencyKey: "tenant:acme", + concurrencyLimit: null, + }; + await expect( + Promise.resolve().then(() => backend.createWorkflowRun(keyOnly)), + ).rejects.toThrow( + 'Invalid workflow concurrency metadata: "concurrencyLimit" must be set when "concurrencyKey" is provided.', + ); + }); + + test("accepts limit-only workflow concurrency metadata as default bucket", async () => { + const base = { + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }; + + const limitOnly = { + ...base, + concurrencyKey: null, + concurrencyLimit: 1, + }; + const created = await backend.createWorkflowRun(limitOnly); + expect(created.concurrencyKey).toBeNull(); + expect(created.concurrencyLimit).toBe(1); + }); + + test("rejects invalid workflow concurrency limit values", async () => { + const base = { + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + concurrencyKey: "tenant:acme", + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }; + + const invalidLimits = [ + 0, + -1, + 1.5, + Number.NaN, + Number.POSITIVE_INFINITY, + ]; + for (const invalidLimit of invalidLimits) { + const invalid = { + ...base, + concurrencyLimit: invalidLimit, + } as unknown as CreateWorkflowRunParams; + await expect( + Promise.resolve().then(() => backend.createWorkflowRun(invalid)), + ).rejects.toThrow(INVALID_CONCURRENCY_LIMIT_VALUE_ERROR); + } + }); + + test("rejects whitespace-only workflow concurrency keys", async () => { + const invalid = { + workflowName: 
randomUUID(), + version: null, + idempotencyKey: null, + concurrencyKey: " ", + concurrencyLimit: 1, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + } as unknown as CreateWorkflowRunParams; + + await expect( + Promise.resolve().then(() => backend.createWorkflowRun(invalid)), + ).rejects.toThrow(INVALID_CONCURRENCY_KEY_VALUE_ERROR); + }); + + test("rejects mixed concurrency limits for the same active bucket", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + const concurrencyKey = "tenant:acme"; + + await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit: 1, + }); + + await expect( + backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey: null, + concurrencyKey, + concurrencyLimit: 2, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }), + ).rejects.toThrow(CONCURRENCY_LIMIT_MISMATCH_ERROR); + + await teardown(backend); + }); + + test("rejects mixed concurrency limits for the same active default bucket", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + + await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyLimit: 1, + }); + + await expect( + backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: 2, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }), + ).rejects.toThrow(CONCURRENCY_LIMIT_MISMATCH_ERROR); + + await teardown(backend); + }); + + test("allows changing concurrency limit after terminal runs leave active states", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + const concurrencyKey = "tenant:acme"; + + const first = await createPendingWorkflowRun(backend, { + workflowName, + version, + 
concurrencyKey, + concurrencyLimit: 1, + }); + const workerId = randomUUID(); + + const claimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 100, + }); + expect(claimed?.id).toBe(first.id); + + await backend.completeWorkflowRun({ + workflowRunId: first.id, + workerId, + output: null, + }); + + const next = await backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey: null, + concurrencyKey, + concurrencyLimit: 2, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + expect(next.concurrencyLimit).toBe(2); + + await teardown(backend); + }); + + test("allows changing concurrency limit when only sleeping runs remain", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + const concurrencyKey = "tenant:acme"; + + const first = await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit: 1, + }); + const workerId = randomUUID(); + + const claimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 100, + }); + expect(claimed?.id).toBe(first.id); + + await backend.sleepWorkflowRun({ + workflowRunId: first.id, + workerId, + availableAt: new Date(Date.now() + 60_000), + }); + + const next = await backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey: null, + concurrencyKey, + concurrencyLimit: 2, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + expect(next.concurrencyLimit).toBe(2); + + await teardown(backend); + }); + test("reuses the same run for matching idempotency key and workflow identity", async () => { const backend = await setup(); const workflowName = randomUUID(); @@ -121,6 +425,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version, idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: { val: 1 }, config: {}, context: null, @@ -132,6 +438,8 @@ export 
function testBackend(options: TestBackendOptions): void { workflowName, version, idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: { val: 2 }, config: { changed: true }, context: null, @@ -151,6 +459,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName: "workflow-a", version: "v1", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -162,6 +472,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName: "workflow-b", version: "v1", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -184,6 +496,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version: "v1", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -195,6 +509,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version: "v1", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -209,6 +525,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version: "v1", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -220,6 +538,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version: "v1", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -244,6 +564,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version: "v1", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -255,6 +577,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version: "v2", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ 
-280,6 +604,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version: "v1", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -295,6 +621,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version: "v1", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -317,6 +645,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version: null, idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -328,6 +658,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version: null, idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -351,6 +683,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version, idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: { i }, config: {}, context: null, @@ -375,6 +709,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version, idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -399,6 +735,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version, idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -421,6 +759,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version, idempotencyKey: failedKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -446,6 +786,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version, idempotencyKey: failedKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -459,236 
+801,756 @@ export function testBackend(options: TestBackendOptions): void { const canceledRun = await backend.createWorkflowRun({ workflowName, version, - idempotencyKey: canceledKey, - input: null, - config: {}, - context: null, - availableAt: null, - deadlineAt: null, + idempotencyKey: canceledKey, + concurrencyKey: null, + concurrencyLimit: null, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + await backend.cancelWorkflowRun({ workflowRunId: canceledRun.id }); + + const canceledDeduped = await backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey: canceledKey, + concurrencyKey: null, + concurrencyLimit: null, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + expect(canceledDeduped.id).toBe(canceledRun.id); + expect(canceledDeduped.status).toBe("canceled"); + + await teardown(backend); + }); + }); + + describe("listWorkflowRuns()", () => { + test("lists workflow runs ordered by creation time", async () => { + const backend = await setup(); + const first = await createPendingWorkflowRun(backend); + await sleep(10); // ensure timestamp difference + const second = await createPendingWorkflowRun(backend); + + const listed = await backend.listWorkflowRuns({}); + const listedIds = listed.data.map((run: WorkflowRun) => run.id); + expect(listedIds).toEqual([second.id, first.id]); + await teardown(backend); + }); + + test("paginates workflow runs", async () => { + const backend = await setup(); + const runs: WorkflowRun[] = []; + for (let i = 0; i < 5; i++) { + runs.push(await createPendingWorkflowRun(backend)); + await sleep(10); + } + + // p1 + const page1 = await backend.listWorkflowRuns({ limit: 2 }); + expect(page1.data).toHaveLength(2); + expect(page1.data[0]?.id).toBe(runs[4]?.id); + expect(page1.data[1]?.id).toBe(runs[3]?.id); + expect(page1.pagination.next).not.toBeNull(); + expect(page1.pagination.prev).toBeNull(); + + // p2 + const page2 = await 
backend.listWorkflowRuns({ + limit: 2, + after: page1.pagination.next!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + }); + expect(page2.data).toHaveLength(2); + expect(page2.data[0]?.id).toBe(runs[2]?.id); + expect(page2.data[1]?.id).toBe(runs[1]?.id); + expect(page2.pagination.next).not.toBeNull(); + expect(page2.pagination.prev).not.toBeNull(); + + // p3 + const page3 = await backend.listWorkflowRuns({ + limit: 2, + after: page2.pagination.next!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + }); + expect(page3.data).toHaveLength(1); + expect(page3.data[0]?.id).toBe(runs[0]?.id); + expect(page3.pagination.next).toBeNull(); + expect(page3.pagination.prev).not.toBeNull(); + + // p2 again + const page2Back = await backend.listWorkflowRuns({ + limit: 2, + before: page3.pagination.prev!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + }); + expect(page2Back.data).toHaveLength(2); + expect(page2Back.data[0]?.id).toBe(runs[2]?.id); + expect(page2Back.data[1]?.id).toBe(runs[1]?.id); + expect(page2Back.pagination.next).toEqual(page2.pagination.next); + expect(page2Back.pagination.prev).toEqual(page2.pagination.prev); + await teardown(backend); + }); + + test("handles empty results", async () => { + const backend = await setup(); + const listed = await backend.listWorkflowRuns({}); + expect(listed.data).toHaveLength(0); + expect(listed.pagination.next).toBeNull(); + expect(listed.pagination.prev).toBeNull(); + await teardown(backend); + }); + + test("paginates correctly with id as tiebreaker when multiple items have the same created_at timestamp", async () => { + const backend = await setup(); + + const runs: WorkflowRun[] = []; + for (let i = 0; i < 5; i++) { + runs.push(await createPendingWorkflowRun(backend)); + } + + runs.sort((a, b) => { + const timeDiff = b.createdAt.getTime() - a.createdAt.getTime(); + if (timeDiff !== 0) return timeDiff; + return b.id.localeCompare(a.id); + }); + + const page1 = await 
backend.listWorkflowRuns({ limit: 2 });
+      expect(page1.data).toHaveLength(2);
+      expect(page1.data[0]?.id).toBe(runs[0]?.id);
+      expect(page1.data[1]?.id).toBe(runs[1]?.id);
+      expect(page1.pagination.next).not.toBeNull();
+
+      const page2 = await backend.listWorkflowRuns({
+        limit: 2,
+        after: page1.pagination.next!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
+      });
+      expect(page2.data).toHaveLength(2);
+      expect(page2.data[0]?.id).toBe(runs[2]?.id);
+      expect(page2.data[1]?.id).toBe(runs[3]?.id);
+      expect(page2.pagination.next).not.toBeNull();
+
+      const page3 = await backend.listWorkflowRuns({
+        limit: 2,
+        after: page2.pagination.next!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
+      });
+      expect(page3.data).toHaveLength(1);
+      expect(page3.data[0]?.id).toBe(runs[4]?.id);
+      expect(page3.pagination.next).toBeNull();
+
+      const page2Back = await backend.listWorkflowRuns({
+        limit: 2,
+        before: page3.pagination.prev!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
+      });
+      expect(page2Back.data).toHaveLength(2);
+      expect(page2Back.data[0]?.id).toBe(runs[2]?.id);
+      expect(page2Back.data[1]?.id).toBe(runs[3]?.id);
+
+      await teardown(backend);
+    });
+  });
+
+  describe("claimWorkflowRun()", () => {
+    // because claims involve timing and leases, we create and teardown a new
+    // namespaced backend instance for each test
+
+    test("claims workflow runs and respects leases, reclaiming if lease expires", async () => {
+      const backend = await setup();
+
+      await createPendingWorkflowRun(backend);
+
+      const firstLeaseMs = 30;
+      const firstWorker = randomUUID();
+      const claimed = await backend.claimWorkflowRun({
+        workerId: firstWorker,
+        leaseDurationMs: firstLeaseMs,
+      });
+      expect(claimed?.status).toBe("running");
+      expect(claimed?.workerId).toBe(firstWorker);
+      expect(claimed?.attempts).toBe(1);
+      expect(claimed?.startedAt).not.toBeNull();
+
+      const secondWorker = randomUUID();
+      const blocked = await backend.claimWorkflowRun({
+        workerId: secondWorker,
+        leaseDurationMs: 10,
+      });
+      expect(blocked).toBeNull();
+
+      await sleep(firstLeaseMs + 5); // small buffer for timing variability
+
+      const reclaimed = await backend.claimWorkflowRun({
+        workerId: secondWorker,
+        leaseDurationMs: 10,
+      });
+      expect(reclaimed?.id).toBe(claimed?.id);
+      expect(reclaimed?.attempts).toBe(2);
+      expect(reclaimed?.workerId).toBe(secondWorker);
+      expect(reclaimed?.startedAt?.getTime()).toBe(
+        claimed?.startedAt?.getTime(),
+      );
+
+      await teardown(backend);
+    });
+
+    test("prioritizes pending workflow runs over expired running ones", async () => {
+      const backend = await setup();
+
+      const running = await createPendingWorkflowRun(backend);
+      const runningClaim = await backend.claimWorkflowRun({
+        workerId: "worker-running",
+        leaseDurationMs: 5,
+      });
+      if (!runningClaim) throw new Error("expected claim");
+      expect(runningClaim.id).toBe(running.id);
+
+      await sleep(10); // wait for running's lease to expire
+
+      // pending claimed first, even though running expired
+      const pending = await createPendingWorkflowRun(backend);
+      const claimedFirst = await backend.claimWorkflowRun({
+        workerId: "worker-second",
+        leaseDurationMs: 100,
+      });
+      expect(claimedFirst?.id).toBe(pending.id);
+
+      // running claimed second
+      const claimedSecond = await backend.claimWorkflowRun({
+        workerId: "worker-third",
+        leaseDurationMs: 100,
+      });
+      expect(claimedSecond?.id).toBe(running.id);
+
+      await teardown(backend);
+    });
+
+    test("returns null when no workflow runs are available", async () => {
+      const backend = await setup();
+
+      const claimed = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 10,
+      });
+      expect(claimed).toBeNull();
+
+      await teardown(backend);
+    });
+
+    test("enforces concurrency limit for the same workflow version and key", async () => {
+      const backend = await setup();
+      const workflowName = randomUUID();
+      const version = "v1";
+      const concurrencyKey = "tenant:acme";
+      const concurrencyLimit = 1;
+
+      await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyKey,
+        concurrencyLimit,
+      });
+      await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyKey,
+        concurrencyLimit,
+      });
+
+      const firstClaimed = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 100,
+      });
+      expect(firstClaimed).not.toBeNull();
+
+      const blocked = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 100,
+      });
+      expect(blocked).toBeNull();
+
+      await teardown(backend);
+    });
+
+    test("enforces concurrency limit for default workflow-version bucket when key is omitted", async () => {
+      const backend = await setup();
+      const workflowName = randomUUID();
+      const version = "v1";
+      const concurrencyLimit = 1;
+
+      await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyLimit,
+      });
+      await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyLimit,
+      });
+
+      const firstClaimed = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 100,
+      });
+      expect(firstClaimed).not.toBeNull();
+      expect(firstClaimed?.concurrencyKey).toBeNull();
+      expect(firstClaimed?.concurrencyLimit).toBe(concurrencyLimit);
+
+      const blocked = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 100,
+      });
+      expect(blocked).toBeNull();
+
+      await teardown(backend);
+    });
+
+    test("supports limits greater than one", async () => {
+      const backend = await setup();
+      const workflowName = randomUUID();
+      const version = "v1";
+      const concurrencyKey = "tenant:acme";
+      const concurrencyLimit = 2;
+
+      const firstRun = await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyKey,
+        concurrencyLimit,
+      });
+      const secondRun = await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyKey,
+        concurrencyLimit,
+      });
+      const thirdRun = await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyKey,
+        concurrencyLimit,
+      });
+      const runIds = new Set([firstRun.id, secondRun.id, thirdRun.id]);
+
+      const firstClaimed = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 100,
+      });
+      const secondClaimed = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 100,
+      });
+
+      expect(firstClaimed).not.toBeNull();
+      expect(secondClaimed).not.toBeNull();
+      if (!firstClaimed || !secondClaimed) {
+        throw new Error("Expected two claimed workflow runs");
+      }
+      expect(firstClaimed.id).not.toBe(secondClaimed.id);
+      expect(runIds.has(firstClaimed.id)).toBe(true);
+      expect(runIds.has(secondClaimed.id)).toBe(true);
+
+      const blocked = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 100,
+      });
+      expect(blocked).toBeNull();
+
+      await teardown(backend);
+    });
+
+    test("still claims unconstrained runs when a constrained bucket is full", async () => {
+      const backend = await setup();
+      const workflowName = randomUUID();
+      const version = "v1";
+      const concurrencyKey = "tenant:acme";
+      const concurrencyLimit = 1;
+
+      const constrainedA = await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyKey,
+        concurrencyLimit,
+      });
+      const constrainedB = await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyKey,
+        concurrencyLimit,
+      });
+      const constrainedIds = new Set([constrainedA.id, constrainedB.id]);
+      const unconstrained = await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        availableAt: new Date(Date.now() + 20),
+      });
+
+      const firstClaimed = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 100,
+      });
+      expect(firstClaimed).not.toBeNull();
+      if (!firstClaimed) throw new Error("Expected constrained run to claim");
+      expect(constrainedIds.has(firstClaimed.id)).toBe(true);
+
+      await sleep(30);
+
+      const secondClaimed = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 100,
+      });
+      expect(secondClaimed?.id).toBe(unconstrained.id);
+      expect(secondClaimed?.concurrencyKey).toBeNull();
+      expect(secondClaimed?.concurrencyLimit).toBeNull();
+
+      const blocked = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 100,
+      });
+      expect(blocked).toBeNull();
+
+      await teardown(backend);
+    });
+
+    test("allows claims for different concurrency keys", async () => {
+      const backend = await setup();
+      const workflowName = randomUUID();
+      const version = "v1";
+
+      await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyKey: "tenant:a",
+        concurrencyLimit: 1,
+      });
+      await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyKey: "tenant:b",
+        concurrencyLimit: 1,
+      });
+
+      const firstClaimed = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 100,
+      });
+      const secondClaimed = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 100,
+      });
+
+      expect(firstClaimed).not.toBeNull();
+      expect(secondClaimed).not.toBeNull();
+      expect(secondClaimed?.id).not.toBe(firstClaimed?.id);
+
+      await teardown(backend);
+    });
+
+    test("allows claims for different workflow versions", async () => {
+      const backend = await setup();
+      const workflowName = randomUUID();
+      const concurrencyKey = "tenant:acme";
+      const concurrencyLimit = 1;
+
+      await createPendingWorkflowRun(backend, {
+        workflowName,
+        version: "v1",
+        concurrencyKey,
+        concurrencyLimit,
+      });
+      await createPendingWorkflowRun(backend, {
+        workflowName,
+        version: "v2",
+        concurrencyKey,
+        concurrencyLimit,
+      });
+
+      const firstClaimed = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 100,
+      });
+      const secondClaimed = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 100,
+      });
+
+      expect(firstClaimed).not.toBeNull();
+      expect(secondClaimed).not.toBeNull();
+      expect(secondClaimed?.id).not.toBe(firstClaimed?.id);
+
+      await teardown(backend);
+    });
+
+    test("allows claims for different versions in default bucket when key is omitted", async () => {
+      const backend = await setup();
+      const workflowName = randomUUID();
+
+      await createPendingWorkflowRun(backend, {
+        workflowName,
+        version: "v1",
+        concurrencyLimit: 1,
+      });
+      await createPendingWorkflowRun(backend, {
+        workflowName,
+        version: "v2",
+        concurrencyLimit: 1,
+      });
+
+      const firstClaimed = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 100,
+      });
+      const secondClaimed = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 100,
+      });
+
+      expect(firstClaimed).not.toBeNull();
+      expect(secondClaimed).not.toBeNull();
+      expect(secondClaimed?.id).not.toBe(firstClaimed?.id);
+
+      await teardown(backend);
+    });
+
+    test("allows claims after the active lease expires", async () => {
+      const backend = await setup();
+      const workflowName = randomUUID();
+      const version = "v1";
+      const concurrencyKey = "tenant:acme";
+      const concurrencyLimit = 1;
+
+      await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyKey,
+        concurrencyLimit,
       });
-
-      await backend.cancelWorkflowRun({ workflowRunId: canceledRun.id });
-
-      const canceledDeduped = await backend.createWorkflowRun({
+      await createPendingWorkflowRun(backend, {
         workflowName,
         version,
-        idempotencyKey: canceledKey,
-        input: null,
-        config: {},
-        context: null,
-        availableAt: null,
-        deadlineAt: null,
+        concurrencyKey,
+        concurrencyLimit,
       });
-      expect(canceledDeduped.id).toBe(canceledRun.id);
-      expect(canceledDeduped.status).toBe("canceled");
-
-      await teardown(backend);
-    });
-  });
+      const firstClaimed = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 20,
+      });
+      expect(firstClaimed).not.toBeNull();

-  describe("listWorkflowRuns()", () => {
-    test("lists workflow runs ordered by creation time", async () => {
-      const backend = await setup();
-      const first = await createPendingWorkflowRun(backend);
-      await sleep(10); // ensure timestamp difference
-      const second = await createPendingWorkflowRun(backend);
+      const blocked = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 20,
+      });
+      expect(blocked).toBeNull();
+
+      await sleep(30);
+
+      const claimedAfterExpiry = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 20,
+      });
+      expect(claimedAfterExpiry).not.toBeNull();
+      expect(claimedAfterExpiry?.id).not.toBe(firstClaimed?.id);

-      const listed = await backend.listWorkflowRuns({});
-      const listedIds = listed.data.map((run: WorkflowRun) => run.id);
-      expect(listedIds).toEqual([second.id, first.id]);
       await teardown(backend);
     });

-    test("paginates workflow runs", async () => {
+    test("does not consume concurrency slot for sleeping runs", async () => {
       const backend = await setup();
-      const runs: WorkflowRun[] = [];
-      for (let i = 0; i < 5; i++) {
-        runs.push(await createPendingWorkflowRun(backend));
-        await sleep(10);
-      }
+      const workflowName = randomUUID();
+      const version = "v1";
+      const concurrencyKey = "tenant:acme";
+      const concurrencyLimit = 1;

-      // p1
-      const page1 = await backend.listWorkflowRuns({ limit: 2 });
-      expect(page1.data).toHaveLength(2);
-      expect(page1.data[0]?.id).toBe(runs[4]?.id);
-      expect(page1.data[1]?.id).toBe(runs[3]?.id);
-      expect(page1.pagination.next).not.toBeNull();
-      expect(page1.pagination.prev).toBeNull();
+      const firstRun = await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyKey,
+        concurrencyLimit,
+      });
+      const secondRun = await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyKey,
+        concurrencyLimit,
+      });

-      // p2
-      const page2 = await backend.listWorkflowRuns({
-        limit: 2,
-        after: page1.pagination.next!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
+      const workerId = randomUUID();
+      const firstClaimed = await backend.claimWorkflowRun({
+        workerId,
+        leaseDurationMs: 200,
       });
-      expect(page2.data).toHaveLength(2);
-      expect(page2.data[0]?.id).toBe(runs[2]?.id);
-      expect(page2.data[1]?.id).toBe(runs[1]?.id);
-      expect(page2.pagination.next).not.toBeNull();
-      expect(page2.pagination.prev).not.toBeNull();
+      expect(firstClaimed).not.toBeNull();
+      if (!firstClaimed) throw new Error("Expected first claim");
+      const secondRunId =
+        firstClaimed.id === firstRun.id ? secondRun.id : firstRun.id;

-      // p3
-      const page3 = await backend.listWorkflowRuns({
-        limit: 2,
-        after: page2.pagination.next!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
+      const sleeping = await backend.sleepWorkflowRun({
+        workflowRunId: firstClaimed.id,
+        workerId,
+        availableAt: new Date(Date.now() + 200),
       });
-      expect(page3.data).toHaveLength(1);
-      expect(page3.data[0]?.id).toBe(runs[0]?.id);
-      expect(page3.pagination.next).toBeNull();
-      expect(page3.pagination.prev).not.toBeNull();
+      expect(sleeping.status).toBe("sleeping");

-      // p2 again
-      const page2Back = await backend.listWorkflowRuns({
-        limit: 2,
-        before: page3.pagination.prev!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
+      const secondClaimed = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 200,
       });
-      expect(page2Back.data).toHaveLength(2);
-      expect(page2Back.data[0]?.id).toBe(runs[2]?.id);
-      expect(page2Back.data[1]?.id).toBe(runs[1]?.id);
-      expect(page2Back.pagination.next).toEqual(page2.pagination.next);
-      expect(page2Back.pagination.prev).toEqual(page2.pagination.prev);
-      await teardown(backend);
-    });
+      expect(secondClaimed?.id).toBe(secondRunId);

-    test("handles empty results", async () => {
-      const backend = await setup();
-      const listed = await backend.listWorkflowRuns({});
-      expect(listed.data).toHaveLength(0);
-      expect(listed.pagination.next).toBeNull();
-      expect(listed.pagination.prev).toBeNull();
       await teardown(backend);
     });

-    test("paginates correctly with id as tiebreaker when multiple items have the same created_at timestamp", async () => {
+    test("frees concurrency slot after terminal workflow failure", async () => {
       const backend = await setup();
+      const workflowName = randomUUID();
+      const version = "v1";
+      const concurrencyKey = "tenant:acme";
+      const concurrencyLimit = 1;

-      const runs: WorkflowRun[] = [];
-      for (let i = 0; i < 5; i++) {
-        runs.push(await createPendingWorkflowRun(backend));
-      }
-
-      runs.sort((a, b) => {
-        const timeDiff = b.createdAt.getTime() - a.createdAt.getTime();
-        if (timeDiff !== 0) return timeDiff;
-        return b.id.localeCompare(a.id);
+      const firstRun = await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyKey,
+        concurrencyLimit,
+      });
+      const secondRun = await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyKey,
+        concurrencyLimit,
       });

-      const page1 = await backend.listWorkflowRuns({ limit: 2 });
-      expect(page1.data).toHaveLength(2);
-      expect(page1.data[0]?.id).toBe(runs[0]?.id);
-      expect(page1.data[1]?.id).toBe(runs[1]?.id);
-      expect(page1.pagination.next).not.toBeNull();
-
-      const page2 = await backend.listWorkflowRuns({
-        limit: 2,
-        after: page1.pagination.next!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
+      const workerId = randomUUID();
+      const firstClaimed = await backend.claimWorkflowRun({
+        workerId,
+        leaseDurationMs: 200,
       });
-      expect(page2.data).toHaveLength(2);
-      expect(page2.data[0]?.id).toBe(runs[2]?.id);
-      expect(page2.data[1]?.id).toBe(runs[3]?.id);
-      expect(page2.pagination.next).not.toBeNull();
+      expect(firstClaimed).not.toBeNull();
+      if (!firstClaimed) throw new Error("Expected first claim");
+      const secondRunId =
+        firstClaimed.id === firstRun.id ? secondRun.id : firstRun.id;

-      const page3 = await backend.listWorkflowRuns({
-        limit: 2,
-        after: page2.pagination.next!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
+      const failed = await backend.failWorkflowRun({
+        workflowRunId: firstClaimed.id,
+        workerId,
+        error: { message: "terminal failure" },
+        retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
       });
-      expect(page3.data).toHaveLength(1);
-      expect(page3.data[0]?.id).toBe(runs[4]?.id);
-      expect(page3.pagination.next).toBeNull();
+      expect(failed.status).toBe("failed");

-      const page2Back = await backend.listWorkflowRuns({
-        limit: 2,
-        before: page3.pagination.prev!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
+      const secondClaimed = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 200,
       });
-      expect(page2Back.data).toHaveLength(2);
-      expect(page2Back.data[0]?.id).toBe(runs[2]?.id);
-      expect(page2Back.data[1]?.id).toBe(runs[3]?.id);
+      expect(secondClaimed?.id).toBe(secondRunId);

       await teardown(backend);
     });
-  });

-  describe("claimWorkflowRun()", () => {
-    // because claims involve timing and leases, we create and teardown a new
-    // namespaced backend instance for each test
-
-    test("claims workflow runs and respects leases, reclaiming if lease expires", async () => {
+    test("does not consume concurrency slot while waiting for retry", async () => {
       const backend = await setup();
+      const workflowName = randomUUID();
+      const version = "v1";
+      const concurrencyKey = "tenant:acme";
+      const concurrencyLimit = 1;

-      await createPendingWorkflowRun(backend);
-
-      const firstLeaseMs = 30;
-      const firstWorker = randomUUID();
-      const claimed = await backend.claimWorkflowRun({
-        workerId: firstWorker,
-        leaseDurationMs: firstLeaseMs,
+      const firstRun = await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyKey,
+        concurrencyLimit,
+      });
+      const secondRun = await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyKey,
+        concurrencyLimit,
       });
-      expect(claimed?.status).toBe("running");
-      expect(claimed?.workerId).toBe(firstWorker);
-      expect(claimed?.attempts).toBe(1);
-      expect(claimed?.startedAt).not.toBeNull();

-      const secondWorker = randomUUID();
-      const blocked = await backend.claimWorkflowRun({
-        workerId: secondWorker,
-        leaseDurationMs: 10,
+      const workerId = randomUUID();
+      const firstClaimed = await backend.claimWorkflowRun({
+        workerId,
+        leaseDurationMs: 200,
       });
-      expect(blocked).toBeNull();
+      expect(firstClaimed).not.toBeNull();
+      if (!firstClaimed) throw new Error("Expected first claim");
+      const secondRunId =
+        firstClaimed.id === firstRun.id ? secondRun.id : firstRun.id;

-      await sleep(firstLeaseMs + 5); // small buffer for timing variability
+      const failed = await backend.failWorkflowRun({
+        workflowRunId: firstClaimed.id,
+        workerId,
+        error: { message: "retry me" },
+        retryPolicy: SHORT_WORKFLOW_RETRY_POLICY,
+      });
+      expect(failed.status).toBe("pending");
+      expect(failed.availableAt).not.toBeNull();

-      const reclaimed = await backend.claimWorkflowRun({
-        workerId: secondWorker,
-        leaseDurationMs: 10,
+      const secondClaimed = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 200,
       });
-      expect(reclaimed?.id).toBe(claimed?.id);
-      expect(reclaimed?.attempts).toBe(2);
-      expect(reclaimed?.workerId).toBe(secondWorker);
-      expect(reclaimed?.startedAt?.getTime()).toBe(
-        claimed?.startedAt?.getTime(),
-      );
+      expect(secondClaimed?.id).toBe(secondRunId);

       await teardown(backend);
     });

-    test("prioritizes pending workflow runs over expired running ones", async () => {
+    test("blocks due retry when another run in same bucket is actively leased", async () => {
       const backend = await setup();
+      const workflowName = randomUUID();
+      const version = "v1";
+      const concurrencyKey = "tenant:acme";
+      const concurrencyLimit = 1;

-      const running = await createPendingWorkflowRun(backend);
-      const runningClaim = await backend.claimWorkflowRun({
-        workerId: "worker-running",
-        leaseDurationMs: 5,
+      const firstRun = await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyKey,
+        concurrencyLimit,
+      });
+      const secondRun = await createPendingWorkflowRun(backend, {
+        workflowName,
+        version,
+        concurrencyKey,
+        concurrencyLimit,
       });
-      if (!runningClaim) throw new Error("expected claim");
-      expect(runningClaim.id).toBe(running.id);
-
-      await sleep(10); // wait for running's lease to expire

-      // pending claimed first, even though running expired
-      const pending = await createPendingWorkflowRun(backend);
-      const claimedFirst = await backend.claimWorkflowRun({
-        workerId: "worker-second",
-        leaseDurationMs: 100,
+      const firstWorkerId = randomUUID();
+      const firstClaimed = await backend.claimWorkflowRun({
+        workerId: firstWorkerId,
+        leaseDurationMs: 200,
       });
-      expect(claimedFirst?.id).toBe(pending.id);
+      expect(firstClaimed).not.toBeNull();
+      if (!firstClaimed) throw new Error("Expected first claim");
+      const secondRunId =
+        firstClaimed.id === firstRun.id ? secondRun.id : firstRun.id;

-      // running claimed second
-      const claimedSecond = await backend.claimWorkflowRun({
-        workerId: "worker-third",
-        leaseDurationMs: 100,
+      const failed = await backend.failWorkflowRun({
+        workflowRunId: firstClaimed.id,
+        workerId: firstWorkerId,
+        error: { message: "retry later" },
+        retryPolicy: SHORT_WORKFLOW_RETRY_POLICY,
       });
-      expect(claimedSecond?.id).toBe(running.id);
+      expect(failed.status).toBe("pending");

-      await teardown(backend);
-    });
+      const secondClaimed = await backend.claimWorkflowRun({
+        workerId: randomUUID(),
+        leaseDurationMs: 200,
+      });
+      expect(secondClaimed?.id).toBe(secondRunId);

-    test("returns null when no workflow runs are available", async () => {
-      const backend = await setup();
+      await sleep(60); // allow first retry to become due while second is leased

-      const claimed = await backend.claimWorkflowRun({
+      const blocked = await backend.claimWorkflowRun({
         workerId: randomUUID(),
-        leaseDurationMs: 10,
+        leaseDurationMs: 200,
       });
-      expect(claimed).toBeNull();
+      expect(blocked).toBeNull();

       await teardown(backend);
     });
@@ -1341,6 +2203,8 @@ export function testBackend(options: TestBackendOptions): void {
       workflowName: randomUUID(),
       version: null,
       idempotencyKey: null,
+      concurrencyKey: null,
+      concurrencyLimit: null,
       input: null,
       config: {},
       context: null,
@@ -1360,6 +2224,8 @@ export function testBackend(options: TestBackendOptions): void {
       workflowName: randomUUID(),
       version: null,
       idempotencyKey: null,
+      concurrencyKey: null,
+      concurrencyLimit: null,
       input: null,
       config: {},
       context: null,
@@ -1385,6 +2251,8 @@ export function testBackend(options: TestBackendOptions): void {
       workflowName: randomUUID(),
       version: null,
       idempotencyKey: null,
+      concurrencyKey: null,
+      concurrencyLimit: null,
       input: null,
       config: {},
       context: null,
@@ -1421,6 +2289,8 @@ export function testBackend(options: TestBackendOptions): void {
       workflowName: randomUUID(),
       version: null,
       idempotencyKey: null,
+      concurrencyKey: null,
+      concurrencyLimit: null,
       input: null,
       config: {},
       context: null,
@@ -1459,6 +2329,8 @@ export function testBackend(options: TestBackendOptions): void {
       workflowName: randomUUID(),
       version: null,
       idempotencyKey: null,
+      concurrencyKey: null,
+      concurrencyLimit: null,
       input: null,
       config: {},
       context: null,
@@ -1585,6 +2457,8 @@ export function testBackend(options: TestBackendOptions): void {
       workflowName: randomUUID(),
       version: null,
       idempotencyKey: null,
+      concurrencyKey: null,
+      concurrencyLimit: null,
       input: null,
       config: {},
       context: null,
@@ -1682,21 +2556,48 @@ export function testBackend(options: TestBackendOptions): void {
   });
 }

+/**
+ * Options for creating a pending workflow run in tests.
+ */
+interface CreatePendingWorkflowRunOptions {
+  workflowName?: string;
+  version?: string | null;
+  concurrencyKey?: string | null;
+  concurrencyLimit?: number | null;
+  availableAt?: Date | null;
+  deadlineAt?: Date | null;
+}
+
 /**
  * Create a pending workflow run for tests.
  * @param b - Backend
+ * @param options - Optional run overrides
  * @returns Created workflow run
  */
-async function createPendingWorkflowRun(b: Backend) {
+async function createPendingWorkflowRun(
+  b: Backend,
+  options: CreatePendingWorkflowRunOptions = {},
+) {
+  const {
+    workflowName = randomUUID(),
+    version = null,
+    concurrencyKey = null,
+    concurrencyLimit = null,
+    availableAt = null,
+    deadlineAt = null,
+  } = options;
+
   return await b.createWorkflowRun({
-    workflowName: randomUUID(),
-    version: null,
+    workflowName,
+    version,
     idempotencyKey: null,
+    concurrencyKey,
+    concurrencyLimit,
     input: null,
     config: {},
     context: null,
-    availableAt: null,
-    deadlineAt: null,
+    availableAt,
+    deadlineAt,
   });
 }
diff --git a/packages/openworkflow/backend.ts b/packages/openworkflow/backend.ts
index fdbc432f..11b5c73f 100644
--- a/packages/openworkflow/backend.ts
+++ b/packages/openworkflow/backend.ts
@@ -68,6 +68,8 @@ export interface CreateWorkflowRunParams {
   workflowName: string;
   version: string | null;
   idempotencyKey: string | null;
+  concurrencyKey: string | null;
+  concurrencyLimit: number | null;
   config: JsonValue;
   context: JsonValue | null;
   input: JsonValue | null;
diff --git a/packages/openworkflow/client.test.ts b/packages/openworkflow/client.test.ts
index 0e154cb8..76cfc490 100644
--- a/packages/openworkflow/client.test.ts
+++ b/packages/openworkflow/client.test.ts
@@ -334,6 +334,170 @@ describe("OpenWorkflow", () => {
     expect(handle.workflowRun.version).toBeNull();
   });

+  test("resolves literal workflow concurrency values", async () => {
+    const backend = await createBackend();
+    const client = new OpenWorkflow({ backend });
+
+    const workflow = client.defineWorkflow(
+      {
+        name: "concurrency-literal-test",
+        concurrency: {
+          key: "tenant:acme",
+          limit: 3,
+        },
+      },
+      noopFn,
+    );
+    const handle = await workflow.run({ value: 1 });
+
+    expect(handle.workflowRun.concurrencyKey).toBe("tenant:acme");
+    expect(handle.workflowRun.concurrencyLimit).toBe(3);
+  });
+
+  test("resolves literal workflow concurrency limit without key", async () => {
+    const backend = await createBackend();
+    const client = new OpenWorkflow({ backend });
+
+    const workflow = client.defineWorkflow(
+      {
+        name: "concurrency-literal-limit-only-test",
+        concurrency: {
+          limit: 3,
+        },
+      },
+      noopFn,
+    );
+    const handle = await workflow.run({ value: 1 });
+
+    expect(handle.workflowRun.concurrencyKey).toBeNull();
+    expect(handle.workflowRun.concurrencyLimit).toBe(3);
+  });
+
+  test("resolves function workflow concurrency values from parsed input", async () => {
+    const backend = await createBackend();
+    const client = new OpenWorkflow({ backend });
+
+    const schema = z.object({
+      tenant: z.string().transform((value) => value.trim()),
+      limit: z.coerce.number().int().positive(),
+    });
+
+    const workflow = client.defineWorkflow(
+      {
+        name: "concurrency-function-test",
+        schema,
+        concurrency: {
+          key: ({ input }) => `tenant:${input.tenant}`,
+          limit: ({ input }) => Number(input.limit),
+        },
+      },
+      noopFn,
+    );
+
+    const handle = await workflow.run({
+      tenant: " acme ",
+      limit: "4",
+    });
+
+    expect(handle.workflowRun.concurrencyKey).toBe("tenant:acme");
+    expect(handle.workflowRun.concurrencyLimit).toBe(4);
+  });
+
+  test("resolves function workflow concurrency limit from parsed input without key", async () => {
+    const backend = await createBackend();
+    const client = new OpenWorkflow({ backend });
+
+    const schema = z.object({
+      limit: z.coerce.number().int().positive(),
+    });
+
+    const workflow = client.defineWorkflow(
+      {
+        name: "concurrency-function-limit-only-test",
+        schema,
+        concurrency: {
+          limit: ({ input }) => Number(input.limit),
+        },
+      },
+      noopFn,
+    );
+
+    const handle = await workflow.run({
+      limit: "4",
+    });
+
+    expect(handle.workflowRun.concurrencyKey).toBeNull();
+    expect(handle.workflowRun.concurrencyLimit).toBe(4);
+  });
+
+  test("throws when resolved workflow concurrency key is invalid", async () => {
+    const backend = await createBackend();
+    const client = new OpenWorkflow({ backend });
+
+    const workflow = client.defineWorkflow(
+      {
+        name: "concurrency-invalid-key-test",
+        concurrency: {
+          key: () => " ",
+          limit: 1,
+        },
+      },
+      noopFn,
+    );
+
+    await expect(workflow.run()).rejects.toThrow(
+      /Invalid concurrency key for workflow "concurrency-invalid-key-test"/,
+    );
+  });
+
+  test("throws when resolved workflow concurrency limit is invalid", async () => {
+    const backend = await createBackend();
+    const client = new OpenWorkflow({ backend });
+
+    const workflow = client.defineWorkflow(
+      {
+        name: "concurrency-invalid-limit-test",
+        concurrency: {
+          key: "tenant:acme",
+          limit: 0,
+        },
+      },
+      noopFn,
+    );
+
+    await expect(workflow.run()).rejects.toThrow(
+      /Invalid concurrency limit for workflow "concurrency-invalid-limit-test"/,
+    );
+  });
+
+  test("throws when workflow concurrency resolver throws and does not enqueue", async () => {
+    const backend = await createBackend();
+    const client = new OpenWorkflow({ backend });
+
+    const workflow = client.defineWorkflow(
+      {
+        name: "concurrency-throwing-resolver-test",
+        concurrency: {
+          key: () => {
+            throw new Error("resolver failed");
+          },
+          limit: 1,
+        },
+      },
+      noopFn,
+    );
+
+    await expect(workflow.run()).rejects.toThrow(
+      /Failed to resolve concurrency key for workflow "concurrency-throwing-resolver-test"/,
+    );
+
+    const claimed = await backend.claimWorkflowRun({
+      workerId: randomUUID(),
+      leaseDurationMs: 1000,
+    });
+    expect(claimed).toBeNull();
+  });
+
   test("creates workflow run with idempotency key", async () => {
     const backend = await createBackend();
     const client = new OpenWorkflow({ backend });
diff --git a/packages/openworkflow/client.ts b/packages/openworkflow/client.ts
index 6be212b1..5f39069a 100644
--- a/packages/openworkflow/client.ts
+++ b/packages/openworkflow/client.ts
@@ -13,6 +13,7 @@ import { WorkflowRegistry } from "./registry.js";
 import { Worker } from "./worker.js";
 import {
   defineWorkflow,
+  type WorkflowConcurrency,
   type Workflow,
   type WorkflowSpec,
 } from "./workflow.js";
@@ -100,11 +101,18 @@ export class OpenWorkflow {
       throw new Error(validationResult.error);
     }
     const parsedInput = validationResult.value;
+    const resolvedConcurrency = resolveWorkflowConcurrency(
+      spec.name,
+      spec.concurrency,
+      parsedInput,
+    );

     const workflowRun = await this.backend.createWorkflowRun({
       workflowName: spec.name,
       version: spec.version ?? null,
       idempotencyKey: options?.idempotencyKey ?? null,
+      concurrencyKey: resolvedConcurrency.concurrencyKey,
+      concurrencyLimit: resolvedConcurrency.concurrencyLimit,
       config: {},
       context: null,
       input: parsedInput ?? null,
@@ -234,6 +242,86 @@ function resolveAvailableAt(
   return result.value;
 }

+/**
+ * Resolved workflow concurrency values persisted on a workflow run.
+ */
+interface ResolvedWorkflowConcurrency {
+  concurrencyKey: string | null;
+  concurrencyLimit: number | null;
+}
+
+/**
+ * Resolve and validate workflow concurrency configuration.
+ * @param workflowName - Workflow name (for error messages)
+ * @param concurrency - Workflow concurrency definition
+ * @param input - Validated workflow input
+ * @returns Resolved concurrency fields to persist on the run
+ * @throws {Error} When resolver execution fails or resolved values are invalid
+ */
+function resolveWorkflowConcurrency(
+  workflowName: string,
+  concurrency: WorkflowConcurrency | undefined,
+  input: Input,
+): ResolvedWorkflowConcurrency {
+  if (!concurrency) {
+    return {
+      concurrencyKey: null,
+      concurrencyLimit: null,
+    };
+  }
+
+  let limitValue: unknown;
+  try {
+    limitValue =
+      typeof concurrency.limit === "function"
+        ? concurrency.limit({ input })
+        : concurrency.limit;
+  } catch (error) {
+    throw new Error(
+      `Failed to resolve concurrency limit for workflow "${workflowName}"`,
+      { cause: error },
+    );
+  }
+
+  if (
+    typeof limitValue !== "number" ||
+    !Number.isInteger(limitValue) ||
+    limitValue <= 0
+  ) {
+    throw new Error(
+      `Invalid concurrency limit for workflow "${workflowName}": expected a positive integer`,
+    );
+  }
+
+  let keyValue: string | null = null;
+  if (concurrency.key !== undefined) {
+    let resolvedKey: unknown;
+    try {
+      resolvedKey =
+        typeof concurrency.key === "function"
+          ? concurrency.key({ input })
+          : concurrency.key;
+    } catch (error) {
+      throw new Error(
+        `Failed to resolve concurrency key for workflow "${workflowName}"`,
+        { cause: error },
+      );
+    }
+
+    if (typeof resolvedKey !== "string" || resolvedKey.trim().length === 0) {
+      throw new Error(
+        `Invalid concurrency key for workflow "${workflowName}": expected a non-empty string`,
+      );
+    }
+    keyValue = resolvedKey;
+  }
+
+  return {
+    concurrencyKey: keyValue,
+    concurrencyLimit: limitValue,
+  };
+}
+
 /**
  * Options for WorkflowHandle.
 */
diff --git a/packages/openworkflow/core/workflow.ts b/packages/openworkflow/core/workflow.ts
index a73dd253..62e17e8a 100644
--- a/packages/openworkflow/core/workflow.ts
+++ b/packages/openworkflow/core/workflow.ts
@@ -24,6 +24,8 @@ export interface WorkflowRun {
   version: string | null;
   status: WorkflowRunStatus;
   idempotencyKey: string | null;
+  concurrencyKey: string | null;
+  concurrencyLimit: number | null;
   config: JsonValue; // user-defined config
   context: JsonValue | null; // runtime execution metadata
   input: JsonValue | null;
diff --git a/packages/openworkflow/index.ts b/packages/openworkflow/index.ts
index fa0d6304..6a416413 100644
--- a/packages/openworkflow/index.ts
+++ b/packages/openworkflow/index.ts
@@ -7,7 +7,12 @@ export type { WorkerOptions } from "./worker.js";
 export { Worker } from "./worker.js";

 // workflow
-export type { RetryPolicy, Workflow } from "./workflow.js";
+export type {
+  RetryPolicy,
+  Workflow,
+  WorkflowConcurrency,
+  WorkflowConcurrencyResolverParams,
+} from "./workflow.js";
 export {
   defineWorkflowSpec,
   defineWorkflow,
diff --git a/packages/openworkflow/postgres/backend.test.ts b/packages/openworkflow/postgres/backend.test.ts
index 53ba08e0..c8e95c38 100644
--- a/packages/openworkflow/postgres/backend.test.ts
+++ b/packages/openworkflow/postgres/backend.test.ts
@@ -62,6 +62,8 @@ describe("BackendPostgres schema option", () => {
       workflowName: "schema-test",
       version: null,
       idempotencyKey: null,
+      concurrencyKey: null,
+      concurrencyLimit: null,
       input: null,
       config: {},
       context: null,
@@ -108,6 +110,8 @@ describe("BackendPostgres schema option", () => {
       workflowName: "schema-reschedule-test",
       version: null,
       idempotencyKey: null,
+      concurrencyKey: null,
+      concurrencyLimit: null,
       input: null,
       config: {},
       context: null,
@@ -166,3 +170,91 @@ describe("BackendPostgres schema option", () => {
     }
   });
 });
+
+describe("BackendPostgres concurrency claim atomicity", () => {
+  test("serializes same-bucket concurrent claims with advisory bucket locks", async () => {
+    const namespaceId = randomUUID();
+    const workflowName = "advisory-claim-atomicity";
+    const version = "v1";
+    const concurrencyKey = "tenant:acme";
+    const concurrencyLimit = 1;
+    const rounds = 5;
+    const claimers = 6;
+
+    const backends = await Promise.all(
+      Array.from({ length: claimers }, async () => {
+        return await BackendPostgres.connect(DEFAULT_POSTGRES_URL, {
+          namespaceId,
+        });
+      }),
+    );
+    const primaryBackend = backends[0];
+    if (!primaryBackend) {
+      throw new Error("Expected at least one backend instance");
+    }
+    const inspector = newPostgresMaxOne(DEFAULT_POSTGRES_URL);
+
+    try {
+      for (let i = 0; i < rounds; i += 1) {
+        await primaryBackend.createWorkflowRun({
+          workflowName,
+          version,
+          idempotencyKey: null,
+          concurrencyKey,
+          concurrencyLimit,
+          input: null,
+          config: {},
+          context: null,
+          availableAt: null,
+          deadlineAt: null,
+        });
+      }
+
+      for (let round = 0; round < rounds; round += 1) {
+        const claims = await Promise.all(
+          backends.map(async (backend, i) => {
+            return await backend.claimWorkflowRun({
+              workerId: `worker-${String(round)}-${String(i)}-${randomUUID()}`,
+              leaseDurationMs: 5000,
+            });
+          }),
+        );
+        const claimed = claims.filter((run): run is NonNullable<typeof run> => {
+          return run !== null;
+        });
+        expect(claimed).toHaveLength(1);
+
+        const workflowRunsTable = inspector`${inspector("openworkflow")}.${inspector("workflow_runs")}`;
+        const [activeCount] = await inspector<{ count: number }[]>`
+          SELECT COUNT(*)::INT AS "count"
+          FROM ${workflowRunsTable}
+          WHERE "namespace_id" = ${namespaceId}
+            AND "workflow_name" = ${workflowName}
+            AND "version" IS NOT DISTINCT FROM ${version}
+            AND "concurrency_key" = ${concurrencyKey}
+            AND "status" = 'running'
+            AND "available_at" > NOW()
+        `;
+        expect(activeCount?.count).toBe(1);
+
+        const claimedRun = claimed[0];
+        if (!claimedRun?.workerId) {
+          throw new Error("Expected claimed workflow run to include worker id");
+        }
+
+        await primaryBackend.completeWorkflowRun({
+          workflowRunId: claimedRun.id,
+          workerId: claimedRun.workerId,
+          output: null,
+        });
+      }
+    } finally {
+      await Promise.all(
+        backends.map(async (backend) => {
+          await backend.stop();
+        }),
+      );
+      await inspector.end();
+    }
+  });
+});
diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts
index 359b5796..c9a368e8 100644
--- a/packages/openworkflow/postgres/backend.ts
+++ b/packages/openworkflow/postgres/backend.ts
@@ -1,3 +1,8 @@
+import {
+  CONCURRENCY_LIMIT_MISMATCH_ERROR,
+  normalizeCreateWorkflowRunParams,
+  toConcurrencyBucket,
+} from "../backend-concurrency.js";
 import {
   DEFAULT_NAMESPACE_ID,
   DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS,
@@ -100,38 +105,48 @@ export class BackendPostgres implements Backend {
   async createWorkflowRun(
     params: CreateWorkflowRunParams,
   ): Promise<WorkflowRun> {
-    if (params.idempotencyKey === null) {
-      return await this.insertWorkflowRun(this.pg, params);
+    const normalizedParams = normalizeCreateWorkflowRunParams(params);
+    const concurrencyBucket = toConcurrencyBucket(normalizedParams);
+
+    if (
+      normalizedParams.idempotencyKey === null &&
+      concurrencyBucket === null
+    ) {
+      return await this.insertWorkflowRun(this.pg, normalizedParams);
     }

-    const { workflowName, idempotencyKey } = params;
-    const lockScope = JSON.stringify({
-      namespaceId: this.namespaceId,
-      workflowName,
-      idempotencyKey,
-    });
-
     return await this.pg.begin(async (_tx) => {
       const pgTx = _tx as unknown as Postgres;
-      /* eslint-disable @cspell/spellchecker */
-      await pgTx.unsafe(
-        "SELECT pg_advisory_xact_lock(hashtextextended($1, 0::bigint))",
-        [lockScope],
-      );
-      /* eslint-enable @cspell/spellchecker */
+      if (normalizedParams.idempotencyKey !== null) {
+        // Acquire/check idempotency first so duplicate create requests can
+        // return early without taking a bucket lock.
+        await this.acquireIdempotencyCreateLock(
+          pgTx,
+          normalizedParams.workflowName,
+          normalizedParams.idempotencyKey,
+        );
 
-      const existing = await this.getWorkflowRunByIdempotencyKey(
-        pgTx,
-        workflowName,
-        idempotencyKey,
-        new Date(Date.now() - DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS),
-      );
-      if (existing) {
-        return existing;
+        const existing = await this.getWorkflowRunByIdempotencyKey(
+          pgTx,
+          normalizedParams.workflowName,
+          normalizedParams.idempotencyKey,
+          new Date(Date.now() - DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS),
+        );
+        if (existing) {
+          return existing;
+        }
       }
 
-      return await this.insertWorkflowRun(pgTx, params);
+      if (concurrencyBucket) {
+        await this.acquireConcurrencyCreateLock(pgTx, concurrencyBucket);
+        await this.assertNoActiveBucketConcurrencyLimitMismatch(
+          pgTx,
+          concurrencyBucket,
+        );
+      }
+
+      return await this.insertWorkflowRun(pgTx, normalizedParams);
     });
   }
@@ -149,6 +164,8 @@ export class BackendPostgres implements Backend {
         "version",
         "status",
         "idempotency_key",
+        "concurrency_key",
+        "concurrency_limit",
         "config",
         "context",
         "input",
@@ -165,6 +182,8 @@ export class BackendPostgres implements Backend {
         ${params.version},
         'pending',
         ${params.idempotencyKey},
+        ${params.concurrencyKey ?? null},
+        ${params.concurrencyLimit ?? null},
         ${pg.json(params.config)},
         ${pg.json(params.context)},
         ${pg.json(params.input)},
@@ -204,6 +223,78 @@ export class BackendPostgres implements Backend {
     return workflowRun ?? null;
   }
 
+  private async acquireConcurrencyCreateLock(
+    pg: Postgres,
+    params: {
+      workflowName: string;
+      version: string | null;
+      key: string | null;
+    },
+  ): Promise<void> {
+    // Intentionally uses a different lock payload shape than claim-time locks.
+    // Create-time lock serializes concurrent creates in the bucket, while
+    // claim-time lock serializes concurrent claim gate evaluation.
+    const lockScope = JSON.stringify({
+      namespaceId: this.namespaceId,
+      workflowName: params.workflowName,
+      version: params.version,
+      concurrencyKey: params.key,
+    });
+
+    // Hash collisions are extremely unlikely; if they happen, unrelated
+    // buckets may serialize, but correctness is preserved.
+    await pg.unsafe(
+      "SELECT pg_advisory_xact_lock(hashtextextended($1, 0::bigint))",
+      [lockScope],
+    );
+  }
+
+  private async acquireIdempotencyCreateLock(
+    pg: Postgres,
+    workflowName: string,
+    idempotencyKey: string,
+  ): Promise<void> {
+    const lockScope = JSON.stringify({
+      namespaceId: this.namespaceId,
+      workflowName,
+      idempotencyKey,
+    });
+
+    await pg.unsafe(
+      "SELECT pg_advisory_xact_lock(hashtextextended($1, 0::bigint))",
+      [lockScope],
+    );
+  }
+
+  private async assertNoActiveBucketConcurrencyLimitMismatch(
+    pg: Postgres,
+    params: {
+      workflowName: string;
+      version: string | null;
+      key: string | null;
+      limit: number;
+    },
+  ): Promise<void> {
+    const workflowRunsTable = this.workflowRunsTable(pg);
+    const [conflict] = await pg<{ id: string }[]>`
+      SELECT "id"
+      FROM ${workflowRunsTable}
+      WHERE "namespace_id" = ${this.namespaceId}
+        AND "workflow_name" = ${params.workflowName}
+        AND "version" IS NOT DISTINCT FROM ${params.version}
+        AND "concurrency_key" IS NOT DISTINCT FROM ${params.key}
+        -- Sleeping runs are excluded so long sleeps do not pin historical
+        -- limits and block new run creation after config changes.
+        AND "status" IN ('pending', 'running')
+        AND "concurrency_limit" IS DISTINCT FROM ${params.limit}
+      LIMIT 1
+    `;
+
+    if (conflict) {
+      throw new Error(CONCURRENCY_LIMIT_MISMATCH_ERROR);
+    }
+  }
+
   async getWorkflowRun(
     params: GetWorkflowRunParams,
   ): Promise<WorkflowRun | null> {
@@ -301,16 +392,56 @@ export class BackendPostgres implements Backend {
         RETURNING "id"
       ),
       candidate AS (
-        SELECT "id"
-        FROM ${workflowRunsTable}
-        WHERE "namespace_id" = ${this.namespaceId}
-          AND "status" IN ('pending', 'running', 'sleeping')
-          AND "available_at" <= NOW()
-          AND ("deadline_at" IS NULL OR "deadline_at" > NOW())
+        SELECT wr."id"
+        FROM ${workflowRunsTable} AS wr
+        WHERE wr."namespace_id" = ${this.namespaceId}
+          AND wr."status" IN ('pending', 'running', 'sleeping')
+          AND wr."available_at" <= NOW()
+          AND (wr."deadline_at" IS NULL OR wr."deadline_at" > NOW())
+          AND (
+            wr."concurrency_limit" IS NULL
+            OR CASE
+              -- cspell:ignore xact hashtextextended
+              -- Serialize constrained claims per bucket. pg_try_advisory lock
+              -- intentionally skips busy buckets (non-blocking) to avoid
+              -- over-claim races without stalling claim loops. This lock key
+              -- shape intentionally differs from create-time locking because
+              -- claims and creates are serialized within their own hot paths.
+              -- Hash collisions can over-serialize unrelated buckets
+              -- (throughput impact only).
+              WHEN pg_try_advisory_xact_lock(
+                hashtextextended(
+                  json_build_array(
+                    wr."namespace_id",
+                    wr."workflow_name",
+                    wr."version",
+                    wr."concurrency_key"
+                  )::text,
+                  0::bigint
+                )
+              ) THEN (
+                -- TODO: If claim latency becomes a hot spot, replace this
+                -- correlated count with precomputed bucket counts via a CTE.
+                SELECT COUNT(*)
+                FROM ${workflowRunsTable} AS active
+                WHERE active."namespace_id" = wr."namespace_id"
+                  AND active."workflow_name" = wr."workflow_name"
+                  AND active."version" IS NOT DISTINCT FROM wr."version"
+                  AND active."concurrency_key" IS NOT DISTINCT FROM wr."concurrency_key"
+                  AND active."status" = 'running'
+                  -- Candidates require available_at <= NOW(); active leased runs
+                  -- require available_at > NOW(). Keep explicit self-exclusion
+                  -- for readability/safety.
+                  AND active."id" <> wr."id"
+                  AND active."available_at" > NOW()
+              ) < wr."concurrency_limit"
+              ELSE FALSE
+            END
+          )
         ORDER BY
-          CASE WHEN "status" = 'pending' THEN 0 ELSE 1 END,
-          "available_at",
-          "created_at"
+          CASE WHEN wr."status" = 'pending' THEN 0 ELSE 1 END,
+          wr."available_at",
+          wr."created_at"
         LIMIT 1
         FOR UPDATE SKIP LOCKED
       )
diff --git a/packages/openworkflow/postgres/postgres.test.ts b/packages/openworkflow/postgres/postgres.test.ts
index 5a11ad3b..b5fad2d1 100644
--- a/packages/openworkflow/postgres/postgres.test.ts
+++ b/packages/openworkflow/postgres/postgres.test.ts
@@ -55,6 +55,54 @@ describe("postgres", () => {
     await migrate(pg, schema);
     await migrate(pg, schema);
   });
+
+  test("adds workflow concurrency columns and index", async () => {
+    const schema = "test_concurrency_columns";
+    await dropSchema(pg, schema);
+    await migrate(pg, schema);
+
+    try {
+      const columns = await pg<
+        {
+          columnName: string;
+        }[]
+      >`
+        SELECT column_name AS "columnName"
+        FROM information_schema.columns
+        WHERE table_schema = ${schema}
+          AND table_name = 'workflow_runs'
+          AND column_name IN ('concurrency_key', 'concurrency_limit')
+        ORDER BY column_name ASC
+      `;
+      expect(columns.map((column) => column.columnName)).toEqual([
+        "concurrency_key",
+        "concurrency_limit",
+      ]);
+
+      /* cspell:disable */
+      const indexes = await pg<
+        {
+          indexName: string;
+          indexDef: string;
+        }[]
+      >`
+        SELECT
+          indexname AS "indexName",
+          indexdef AS "indexDef"
+        FROM pg_indexes
+        WHERE schemaname = ${schema}
+          AND tablename = 'workflow_runs'
+          AND indexname = 'workflow_runs_concurrency_active_idx'
+      `;
+      /* cspell:enable */
+      expect(indexes).toHaveLength(1);
+      expect(indexes[0]?.indexDef).toMatch(
+        /WHERE\s+\((?:"concurrency_limit"|concurrency_limit)\s+IS\s+NOT\s+NULL\)/i,
+      );
+    } finally {
+      await dropSchema(pg, schema);
+    }
+  });
 });
 
 describe("dropSchema()", () => {
diff --git a/packages/openworkflow/postgres/postgres.ts b/packages/openworkflow/postgres/postgres.ts
index 1a5d1b74..824562c9 100644
--- a/packages/openworkflow/postgres/postgres.ts
+++ b/packages/openworkflow/postgres/postgres.ts
@@ -199,6 +199,32 @@ export function migrations(schema: string): string[] {
     ON CONFLICT DO NOTHING;
 
     COMMIT;`,
+
+    // 5 - workflow concurrency columns and indexes
+    `BEGIN;
+
+    ALTER TABLE ${quotedSchema}."workflow_runs"
+      ADD COLUMN IF NOT EXISTS "concurrency_key" TEXT;
+
+    ALTER TABLE ${quotedSchema}."workflow_runs"
+      ADD COLUMN IF NOT EXISTS "concurrency_limit" INTEGER;
+
+    CREATE INDEX IF NOT EXISTS "workflow_runs_concurrency_active_idx"
+      ON ${quotedSchema}."workflow_runs" (
+        "namespace_id",
+        "workflow_name",
+        "version",
+        "concurrency_key",
+        "status",
+        "available_at"
+      )
+      WHERE "concurrency_limit" IS NOT NULL;
+
+    INSERT INTO ${quotedSchema}."openworkflow_migrations"("version")
+    VALUES (5)
+    ON CONFLICT DO NOTHING;
+
+    COMMIT;`,
   ];
 }
diff --git a/packages/openworkflow/sqlite/backend.test.ts b/packages/openworkflow/sqlite/backend.test.ts
index c0138658..883de220 100644
--- a/packages/openworkflow/sqlite/backend.test.ts
+++ b/packages/openworkflow/sqlite/backend.test.ts
@@ -96,6 +96,8 @@ describe("BackendSqlite.createWorkflowRun error handling", () => {
       workflowName: "failing-workflow",
       version: "v1",
       idempotencyKey: randomUUID(),
+      concurrencyKey: null,
+      concurrencyLimit: null,
       config: {},
       context: null,
       input: null,
@@ -143,6 +145,8 @@ describe("BackendSqlite.createWorkflowRun error handling", () => {
       workflowName: "failing-workflow",
       version: "v1",
       idempotencyKey: randomUUID(),
+      concurrencyKey: null,
+      concurrencyLimit: null,
       config: {},
       context: null,
       input: null,
diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts
index 30641213..30b94351 100644
--- a/packages/openworkflow/sqlite/backend.ts
+++ b/packages/openworkflow/sqlite/backend.ts
@@ -1,3 +1,8 @@
+import {
+  CONCURRENCY_LIMIT_MISMATCH_ERROR,
+  normalizeCreateWorkflowRunParams,
+  toConcurrencyBucket,
+} from "../backend-concurrency.js";
 import {
   DEFAULT_NAMESPACE_ID,
   DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS,
@@ -94,26 +99,37 @@ export class BackendSqlite implements Backend {
   }
 
   createWorkflowRun(params: CreateWorkflowRunParams): Promise<WorkflowRun> {
-    const { workflowName, idempotencyKey } = params;
+    const normalizedParams = normalizeCreateWorkflowRunParams(params);
+    const concurrencyBucket = toConcurrencyBucket(normalizedParams);
+    const { workflowName, idempotencyKey } = normalizedParams;
 
-    if (idempotencyKey === null) {
-      return Promise.resolve(this.insertWorkflowRun(params));
+    if (idempotencyKey === null && concurrencyBucket === null) {
+      return Promise.resolve(this.insertWorkflowRun(normalizedParams));
     }
 
     try {
+      // BEGIN IMMEDIATE takes a RESERVED write lock up-front. This serializes
+      // create checks/writes across writers, which is the SQLite consistency
+      // model we rely on (instead of row/advisory locks).
       this.db.exec("BEGIN IMMEDIATE");
 
-      const existing = this.getWorkflowRunByIdempotencyKey(
-        workflowName,
-        idempotencyKey,
-        new Date(Date.now() - DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS),
-      );
-      if (existing) {
-        this.db.exec("COMMIT");
-        return Promise.resolve(existing);
+      if (idempotencyKey !== null) {
+        const existing = this.getWorkflowRunByIdempotencyKey(
+          workflowName,
+          idempotencyKey,
+          new Date(Date.now() - DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS),
+        );
+        if (existing) {
+          this.db.exec("COMMIT");
+          return Promise.resolve(existing);
+        }
       }
 
-      const workflowRun = this.insertWorkflowRun(params);
+      if (concurrencyBucket) {
+        this.assertNoActiveBucketConcurrencyLimitMismatch(concurrencyBucket);
+      }
+
+      const workflowRun = this.insertWorkflowRun(normalizedParams);
       this.db.exec("COMMIT");
       return Promise.resolve(workflowRun);
     } catch (error) {
@@ -144,6 +160,8 @@ export class BackendSqlite implements Backend {
         "version",
         "status",
         "idempotency_key",
+        "concurrency_key",
+        "concurrency_limit",
         "config",
         "context",
         "input",
@@ -153,7 +171,7 @@ export class BackendSqlite implements Backend {
         "created_at",
         "updated_at"
       )
-      VALUES (?, ?, ?, ?, 'pending', ?, ?, ?, ?, 0, ?, ?, ?, ?)
+      VALUES (?, ?, ?, ?, 'pending', ?, ?, ?, ?, ?, ?, 0, ?, ?, ?, ?)
     `);
 
     stmt.run(
@@ -162,6 +180,8 @@ export class BackendSqlite implements Backend {
       params.workflowName,
      params.version,
       params.idempotencyKey,
+      params.concurrencyKey ?? null,
+      params.concurrencyLimit ?? null,
       toJSON(params.config),
       toJSON(params.context),
       toJSON(params.input),
@@ -211,6 +231,49 @@ export class BackendSqlite implements Backend {
     return row ? rowToWorkflowRun(row) : null;
   }
 
+  private assertNoActiveBucketConcurrencyLimitMismatch(params: {
+    workflowName: string;
+    version: string | null;
+    key: string | null;
+    limit: number;
+  }): void {
+    const stmt = this.db.prepare(`
+      SELECT "id"
+      FROM "workflow_runs"
+      WHERE "namespace_id" = ?
+        AND "workflow_name" = ?
+        AND (
+          "version" = ?
+          OR ("version" IS NULL AND ? IS NULL)
+        )
+        AND (
+          "concurrency_key" = ?
+          OR ("concurrency_key" IS NULL AND ? IS NULL)
+        )
+        -- Sleeping runs are excluded so long sleeps do not pin historical
+        -- limits and block new run creation after config changes.
+        AND "status" IN ('pending', 'running')
+        -- "params.limit" is always non-null because this is called only when
+        -- toConcurrencyBucket() returns a constrained bucket.
+        AND ("concurrency_limit" <> ? OR "concurrency_limit" IS NULL)
+      LIMIT 1
+    `);
+
+    const conflict = stmt.get(
+      this.namespaceId,
+      params.workflowName,
+      params.version,
+      params.version,
+      params.key,
+      params.key,
+      params.limit,
+    ) as { id: string } | undefined;
+
+    if (conflict) {
+      throw new Error(CONCURRENCY_LIMIT_MISMATCH_ERROR);
+    }
+  }
+
   getWorkflowRun(params: GetWorkflowRunParams): Promise<WorkflowRun | null> {
     const stmt = this.db.prepare(`
       SELECT *
@@ -232,7 +295,8 @@ export class BackendSqlite implements Backend {
     const currentTime = now();
     const newAvailableAt = addMilliseconds(currentTime, params.leaseDurationMs);
 
-    // SQLite doesn't have SKIP LOCKED, so we need to handle claims differently
+    // SQLite doesn't have SKIP LOCKED. BEGIN IMMEDIATE serializes writers with
+    // the database's single-writer lock, which is the intended claim model.
     this.db.exec("BEGIN IMMEDIATE");
 
     try {
@@ -262,16 +326,47 @@ export class BackendSqlite implements Backend {
 
       // 2. find an available workflow run to claim
       const findStmt = this.db.prepare(`
-        SELECT "id"
-        FROM "workflow_runs"
-        WHERE "namespace_id" = ?
-          AND "status" IN ('pending', 'running', 'sleeping')
-          AND "available_at" <= ?
-          AND ("deadline_at" IS NULL OR "deadline_at" > ?)
+        SELECT wr."id"
+        FROM "workflow_runs" AS wr
+        WHERE wr."namespace_id" = ?
+          AND wr."status" IN ('pending', 'running', 'sleeping')
+          AND wr."available_at" <= ?
+          AND (wr."deadline_at" IS NULL OR wr."deadline_at" > ?)
+          AND (
+            wr."concurrency_limit" IS NULL
+            OR (
+              -- TODO: If claim latency becomes a hot spot, replace this
+              -- correlated count with precomputed bucket counts via a CTE.
+              SELECT COUNT(*)
+              FROM "workflow_runs" AS active
+              WHERE active."namespace_id" = wr."namespace_id"
+                AND active."workflow_name" = wr."workflow_name"
+                AND (
+                  active."version" = wr."version"
+                  OR (
+                    active."version" IS NULL
+                    AND wr."version" IS NULL
+                  )
+                )
+                AND (
+                  active."concurrency_key" = wr."concurrency_key"
+                  OR (
+                    active."concurrency_key" IS NULL
+                    AND wr."concurrency_key" IS NULL
+                  )
+                )
+                AND active."status" = 'running'
+                -- Candidates require available_at <= now; active leased runs
+                -- require available_at > now. Keep explicit self-exclusion
+                -- for readability/safety.
+                AND active."id" <> wr."id"
+                AND active."available_at" > ?
+            ) < wr."concurrency_limit"
+          )
         ORDER BY
-          CASE WHEN "status" = 'pending' THEN 0 ELSE 1 END,
-          "available_at",
-          "created_at"
+          CASE WHEN wr."status" = 'pending' THEN 0 ELSE 1 END,
+          wr."available_at",
+          wr."created_at"
         LIMIT 1
       `);
@@ -279,6 +374,7 @@ export class BackendSqlite implements Backend {
         this.namespaceId,
         currentTime,
         currentTime,
+        currentTime,
       ) as { id: string } | undefined;
 
       if (!candidate) {
@@ -941,6 +1037,8 @@ interface WorkflowRunRow {
   version: string | null;
   status: string;
   idempotency_key: string | null;
+  concurrency_key: string | null;
+  concurrency_limit: number | null;
   config: string;
   context: string | null;
   input: string | null;
@@ -1000,6 +1098,8 @@ function rowToWorkflowRun(row: WorkflowRunRow): WorkflowRun {
     version: row.version,
     status: row.status as WorkflowRun["status"],
     idempotencyKey: row.idempotency_key,
+    concurrencyKey: row.concurrency_key,
+    concurrencyLimit: row.concurrency_limit,
     config: config as WorkflowRun["config"],
     context: fromJSON(row.context) as WorkflowRun["context"],
     input: fromJSON(row.input) as WorkflowRun["input"],
diff --git a/packages/openworkflow/sqlite/sqlite.test.ts b/packages/openworkflow/sqlite/sqlite.test.ts
index e08cfa88..ce823540 100644
--- a/packages/openworkflow/sqlite/sqlite.test.ts
+++ b/packages/openworkflow/sqlite/sqlite.test.ts
@@ -193,6 +193,34 @@ describe("sqlite", () => {
       .get() as { count: number };
     expect(stepAttemptsCheck.count).toBe(1);
   });
+
+  test("adds workflow concurrency columns and index", () => {
+    migrate(db);
+
+    const columns = db
+      .prepare(`PRAGMA table_info("workflow_runs")`)
+      .all() as { name: string }[];
+    const columnNames = columns.map((column) => column.name);
+    expect(columnNames).toContain("concurrency_key");
+    expect(columnNames).toContain("concurrency_limit");
+
+    const indexRows = db
+      .prepare(`PRAGMA index_list("workflow_runs")`)
+      .all() as { name: string }[];
+    const indexNames = indexRows.map((index) => index.name);
+    expect(indexNames).toContain("workflow_runs_concurrency_active_idx");
+
+    const concurrencyIndex = db
+      .prepare(
+        `SELECT sql FROM sqlite_master WHERE type = 'index' AND name = ?`,
+      )
+      .get("workflow_runs_concurrency_active_idx") as
+      | { sql: string | null }
+      | undefined;
+    expect(concurrencyIndex?.sql).toContain(
+      'WHERE "concurrency_limit" IS NOT NULL',
+    );
+  });
 });
 
 describe("migration version tracking", () => {
diff --git a/packages/openworkflow/sqlite/sqlite.ts b/packages/openworkflow/sqlite/sqlite.ts
index bf3fd163..5b511bd0 100644
--- a/packages/openworkflow/sqlite/sqlite.ts
+++ b/packages/openworkflow/sqlite/sqlite.ts
@@ -193,6 +193,31 @@ export function migrations(): string[] {
     VALUES (4);
 
     COMMIT;`,
+
+    // 5 - workflow concurrency columns and indexes
+    `BEGIN;
+
+    ALTER TABLE "workflow_runs"
+      ADD COLUMN "concurrency_key" TEXT;
+
+    ALTER TABLE "workflow_runs"
+      ADD COLUMN "concurrency_limit" INTEGER;
+
+    CREATE INDEX IF NOT EXISTS "workflow_runs_concurrency_active_idx"
+      ON "workflow_runs" (
+        "namespace_id",
+        "workflow_name",
+        "version",
+        "concurrency_key",
+        "status",
+        "available_at"
+      )
+      WHERE "concurrency_limit" IS NOT NULL;
+
+    INSERT OR IGNORE INTO "openworkflow_migrations" ("version")
+    VALUES (5);
+
+    COMMIT;`,
   ];
 }
diff --git a/packages/openworkflow/worker.test.ts b/packages/openworkflow/worker.test.ts
index 1a203cf6..e7d78780 100644
--- a/packages/openworkflow/worker.test.ts
+++ b/packages/openworkflow/worker.test.ts
@@ -84,6 +84,8 @@ describe("Worker", () => {
       workflowName: "missing",
       version: null,
       idempotencyKey: null,
+      concurrencyKey: null,
+      concurrencyLimit: null,
       config: {},
       context: null,
       input: null,
@@ -316,6 +318,202 @@ describe("Worker", () => {
     expect(completed).toBe(2);
   });
 
+  test("worker concurrency > 1 still respects workflow concurrency limits", async () => {
+    const backend = await createBackend();
+    const client = new OpenWorkflow({ backend });
+    const stepReleases: (() => void)[] = [];
+
+    const workflow = client.defineWorkflow(
+      {
+        name: "workflow-concurrency-worker-test",
+        concurrency: {
+          key: "tenant:acme",
+          limit: 1,
+        },
+      },
+      async ({ step }) => {
+        await step.run({ name: "block" }, async () => {
+          await new Promise<void>((resolve) => {
+            stepReleases.push(resolve);
+          });
+        });
+
+        return "done";
+      },
+    );
+
+    const worker = client.newWorker({ concurrency: 2 });
+    const first = await workflow.run();
+    const second = await workflow.run();
+
+    await worker.tick();
+    await sleep(100);
+
+    const firstStateAfterFirstTick = await backend.getWorkflowRun({
+      workflowRunId: first.workflowRun.id,
+    });
+    const secondStateAfterFirstTick = await backend.getWorkflowRun({
+      workflowRunId: second.workflowRun.id,
+    });
+    expect(firstStateAfterFirstTick?.status).toBe("running");
+    expect(secondStateAfterFirstTick?.status).toBe("pending");
+
+    const releaseFirst = stepReleases.shift();
+    releaseFirst?.();
+    await sleep(100);
+
+    await worker.tick();
+    await sleep(100);
+
+    const firstStateAfterSecondTick = await backend.getWorkflowRun({
+      workflowRunId: first.workflowRun.id,
+    });
+    const secondStateAfterSecondTick = await backend.getWorkflowRun({
+      workflowRunId: second.workflowRun.id,
+    });
+    expect(firstStateAfterSecondTick?.status).toBe("completed");
+    expect(secondStateAfterSecondTick?.status).toBe("running");
+
+    const releaseSecond = stepReleases.shift();
+    releaseSecond?.();
+    await sleep(100);
+
+    await expect(first.result()).resolves.toBe("done");
+    await expect(second.result()).resolves.toBe("done");
+  });
+
+  test("worker claims next same-bucket run after terminal failure", async () => {
+    const backend = await createBackend();
+    const client = new OpenWorkflow({ backend });
+
+    const workflow = client.defineWorkflow(
+      {
+        name: "worker-concurrency-terminal-failure",
+        concurrency: {
+          key: "tenant:acme",
+          limit: 1,
+        },
+        retryPolicy: {
+          maximumAttempts: 1,
+        },
+      },
+      ({ input }: { input: { shouldFail: boolean } }) => {
+        if (input.shouldFail) {
+          throw new Error("terminal failure");
+        }
+        return "done";
+      },
+    );
+
+    const worker = client.newWorker({ concurrency: 2 });
+    const failing = await workflow.run({ shouldFail: true });
+    const succeeding = await workflow.run(
+      { shouldFail: false },
+      { availableAt: new Date(Date.now() + 20) },
+    );
+
+    await worker.tick();
+    await sleep(60);
+
+    const failedRun = await backend.getWorkflowRun({
+      workflowRunId: failing.workflowRun.id,
+    });
+    expect(failedRun?.status).toBe("failed");
+
+    await worker.tick();
+    await sleep(60);
+
+    const completedRun = await backend.getWorkflowRun({
+      workflowRunId: succeeding.workflowRun.id,
+    });
+    expect(completedRun?.status).toBe("completed");
+    await expect(succeeding.result()).resolves.toBe("done");
+  });
+
+  test("worker honors same-bucket serialization across retry reschedule", async () => {
+    const backend = await createBackend();
+    const client = new OpenWorkflow({ backend });
+    const attemptsById = new Map<string, number>();
+
+    const workflow = client.defineWorkflow(
+      {
+        name: "worker-concurrency-retry-serialization",
+        concurrency: {
+          key: "tenant:acme",
+          limit: 1,
+        },
+        retryPolicy: {
+          initialInterval: "120ms",
+          maximumInterval: "120ms",
+          backoffCoefficient: 1,
+          maximumAttempts: 2,
+        },
+      },
+      async ({ input, step }) => {
+        const typedInput = input as { id: "retry" | "sibling" };
+        const currentAttempts = attemptsById.get(typedInput.id) ?? 0;
+        attemptsById.set(typedInput.id, currentAttempts + 1);
+
+        if (typedInput.id === "retry" && currentAttempts === 0) {
+          throw new Error("retry once");
+        }
+
+        if (typedInput.id === "sibling") {
+          await step.run({ name: "hold-slot" }, async () => {
+            await sleep(180);
+          });
+        }
+
+        return typedInput.id;
+      },
+    );
+
+    const worker = client.newWorker({ concurrency: 2 });
+    const retrying = await workflow.run({ id: "retry" });
+    const sibling = await workflow.run({ id: "sibling" });
+
+    await worker.tick();
+    await sleep(30);
+
+    const firstState = await backend.getWorkflowRun({
+      workflowRunId: retrying.workflowRun.id,
+    });
+    expect(firstState?.status).toBe("pending");
+    expect(firstState?.availableAt).not.toBeNull();
+
+    await worker.tick();
+    await sleep(40);
+
+    const siblingRunning = await backend.getWorkflowRun({
+      workflowRunId: sibling.workflowRun.id,
+    });
+    expect(siblingRunning?.status).toBe("running");
+
+    await sleep(100); // retry run is due while sibling still holds the slot
+
+    const blockedClaims = await worker.tick();
+    expect(blockedClaims).toBe(0);
+
+    await sleep(100);
+
+    const retryClaimedAfterRelease = await worker.tick();
+    expect(retryClaimedAfterRelease).toBe(1);
+    await sleep(60);
+
+    const retryCompleted = await backend.getWorkflowRun({
+      workflowRunId: retrying.workflowRun.id,
+    });
+    const siblingCompleted = await backend.getWorkflowRun({
+      workflowRunId: sibling.workflowRun.id,
+    });
+    expect(retryCompleted?.status).toBe("completed");
+    expect(retryCompleted?.attempts).toBe(2);
+    expect(siblingCompleted?.status).toBe("completed");
+
+    await expect(retrying.result()).resolves.toBe("retry");
+    await expect(sibling.result()).resolves.toBe("sibling");
+  });
+
   test("worker starts, processes work, and stops gracefully", async () => {
     const backend = await createBackend();
     const client = new OpenWorkflow({ backend });
@@ -1180,6 +1378,8 @@ describe("Worker", () => {
       workflowName: "version-check",
       version: "v2",
       idempotencyKey: null,
+      concurrencyKey: null,
+      concurrencyLimit: null,
       config: {},
       context: null,
       input: null,
@@ -1215,6 +1415,8 @@ describe("Worker", () => {
       workflowName: "version-mismatch",
       version: "v1",
       idempotencyKey: null,
+      concurrencyKey: null,
+      concurrencyLimit: null,
       config: {},
       context: null,
       input: null,
@@ -1250,6 +1452,8 @@ describe("Worker", () => {
       workflowName: "version-required",
       version: null,
       idempotencyKey: null,
+      concurrencyKey: null,
+      concurrencyLimit: null,
       config: {},
       context: null,
       input: null,
diff --git a/packages/openworkflow/workflow.ts b/packages/openworkflow/workflow.ts
index 56589cf6..30da25c3 100644
--- a/packages/openworkflow/workflow.ts
+++ b/packages/openworkflow/workflow.ts
@@ -3,6 +3,32 @@ import type { SerializedError } from "./core/error.js";
 import type { StandardSchemaV1 } from "./core/schema.js";
 import { WorkflowFunction } from "./execution.js";
 
+/**
+ * Resolver input for workflow concurrency configuration.
+ */
+export interface WorkflowConcurrencyResolverParams<Input> {
+  input: Input;
+}
+
+/**
+ * Workflow-level concurrency configuration.
+ */
+export interface WorkflowConcurrency<Input = unknown> {
+  /**
+   * Optional bucket key used to scope concurrency for this run.
+   * When omitted, runs use the default workflow+version bucket.
+   */
+  readonly key?:
+    | string
+    | ((params: Readonly<WorkflowConcurrencyResolverParams<Input>>) => string);
+  /**
+   * Maximum active leased runs allowed in the bucket.
+   */
+  readonly limit:
+    | number
+    | ((params: Readonly<WorkflowConcurrencyResolverParams<Input>>) => number);
+}
+
 /**
  * A workflow spec.
  */
@@ -15,6 +41,8 @@ export interface WorkflowSpec {
   readonly schema?: StandardSchemaV1;
   /** The retry policy for the workflow. */
   readonly retryPolicy?: Partial<RetryPolicy>;
+  /** Optional workflow-level concurrency configuration. */
+  readonly concurrency?: WorkflowConcurrency<Input>;
   /** Phantom type carrier - won't exist at runtime. */
   readonly __types?: {
     output: Output;
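
The claim gate this diff encodes in SQL for both backends — a constrained run is claimable only while the number of same-bucket `running` runs with unexpired leases is below its `concurrency_limit` — can be sketched outside the patch in plain TypeScript. This is an illustrative model only, not code from the repository; all type and function names here are hypothetical:

```typescript
// Hypothetical in-memory model of the claim gate (not part of the patch).
interface RunRow {
  id: string;
  namespaceId: string;
  workflowName: string;
  version: string | null;
  concurrencyKey: string | null;
  concurrencyLimit: number | null;
  status: "pending" | "running" | "sleeping" | "completed" | "failed";
  availableAt: number; // epoch ms; for leased running runs this is the lease expiry
}

// Bucket identity mirrors the SQL scope:
// namespace_id + workflow_name + version + concurrency_key (nullable).
function bucketOf(run: RunRow): string {
  return JSON.stringify([
    run.namespaceId,
    run.workflowName,
    run.version,
    run.concurrencyKey,
  ]);
}

// A run consumes a slot only while it is `running` with an unexpired lease
// (availableAt > now); pending, sleeping, and expired-lease runs do not.
function isClaimable(candidate: RunRow, all: RunRow[], now: number): boolean {
  if (candidate.concurrencyLimit === null) return true; // unconstrained bucket
  const activeLeased = all.filter(
    (r) =>
      r.id !== candidate.id &&
      r.status === "running" &&
      r.availableAt > now &&
      bucketOf(r) === bucketOf(candidate),
  );
  return activeLeased.length < candidate.concurrencyLimit;
}
```

This mirrors the correlated `COUNT(*)` subquery in the Postgres and SQLite claim paths: the same candidate becomes claimable once the sibling's lease expires, because an expired lease (`availableAt <= now`) no longer counts against the bucket.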