From 61b9b929d6203df2ff7cf48d2bd41417bb2b7780 Mon Sep 17 00:00:00 2001 From: Vaggeilis Yfantis Date: Sun, 15 Feb 2026 10:32:45 +0200 Subject: [PATCH 1/3] feat: Introduce worflow level concurrency --- ARCHITECTURE.md | 41 +- packages/docs/docs/workers.mdx | 34 +- packages/docs/docs/workflows.mdx | 37 ++ packages/openworkflow/README.md | 27 + packages/openworkflow/backend.testsuite.ts | 624 +++++++++++++++++- packages/openworkflow/backend.ts | 2 + packages/openworkflow/client.test.ts | 118 ++++ packages/openworkflow/client.ts | 84 +++ packages/openworkflow/core/workflow.ts | 2 + packages/openworkflow/index.ts | 7 +- .../openworkflow/postgres/backend.test.ts | 4 + packages/openworkflow/postgres/backend.ts | 111 +++- .../openworkflow/postgres/postgres.test.ts | 42 ++ packages/openworkflow/postgres/postgres.ts | 27 + packages/openworkflow/sqlite/backend.test.ts | 4 + packages/openworkflow/sqlite/backend.ts | 121 +++- packages/openworkflow/sqlite/sqlite.test.ts | 17 + packages/openworkflow/sqlite/sqlite.ts | 26 + packages/openworkflow/worker.test.ts | 204 ++++++ packages/openworkflow/workflow.ts | 27 + 20 files changed, 1523 insertions(+), 36 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index af2631bd..b749c000 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -297,7 +297,7 @@ When the worker encounters this, it executes all steps within the `Promise.all` concurrently. It waits for all of them to complete before proceeding. Each step attempt is persisted individually as a `step_attempt`. -### 5.2. Workflow Concurrency +### 5.2. Worker Concurrency Workers are configured with a concurrency limit (e.g., 10). A worker will maintain up to 10 in-flight workflow runs simultaneously. It polls for new work @@ -305,7 +305,44 @@ only when it has available capacity. The Backend's atomic `dequeue` operation (`FOR UPDATE SKIP LOCKED`) ensures that multiple workers can poll the same table without race conditions or processing the same run twice. -### 5.3. 
Handling Crashes During Parallel Execution +### 5.3. Workflow-Run Concurrency + +In addition to worker-slot concurrency, workflows can define a per-run +concurrency policy in the workflow spec: + +```ts +defineWorkflow( + { + name: "process-order", + concurrency: { + key: ({ input }) => `tenant:${input.tenantId}`, + limit: ({ input }) => input.maxConcurrentOrders, + }, + }, + async ({ step }) => { + // ... + }, +); +``` + +`key` and `limit` can each be either static values (`string`/`number`) or +functions of the validated workflow input. They are resolved once when the run +is created and persisted on the `workflow_run`. +Resolved keys are stored verbatim; only empty/all-whitespace keys are rejected. + +During claim/dequeue, a run is claimable only when the number of active leased +`running` runs in the same bucket is below the run's `limit`. The bucket scope +is: + +- `namespace_id` +- `workflow_name` +- `version` (version-aware buckets) +- `concurrency_key` + +`pending`, `sleeping`, and expired-lease `running` runs do not consume +concurrency slots. + +### 5.4. Handling Crashes During Parallel Execution The `availableAt` heartbeat mechanism provides robust recovery. If a worker crashes while executing parallel steps, its heartbeat stops. The `availableAt` diff --git a/packages/docs/docs/workers.mdx b/packages/docs/docs/workers.mdx index 0b3b05bc..f2f82999 100644 --- a/packages/docs/docs/workers.mdx +++ b/packages/docs/docs/workers.mdx @@ -49,7 +49,7 @@ When a worker picks up a workflow: 5. **Execute**: New steps are executed and their results are stored 6. **Complete**: The workflow status is updated to `completed` or `failed` -## Concurrency +## Worker Concurrency Workers can process multiple workflow runs simultaneously. Configure concurrency in your `openworkflow.config.ts`: @@ -88,6 +88,38 @@ bunx @openworkflow/cli worker start --concurrency 10 capacity. 
+## Workflow Concurrency + +Workflow specs can also define concurrency buckets that are enforced at claim +time: + +```ts +defineWorkflow( + { + name: "process-order", + concurrency: { + key: ({ input }) => `tenant:${input.tenantId}`, // or: "tenant:acme" + limit: ({ input }) => input.maxConcurrentOrders, // or: 5 + }, + }, + async ({ step }) => { + // ... + }, +); +``` + +Workers will only claim a run when the bucket has capacity. Bucket scope is: + +- namespace +- workflow name +- workflow version +- resolved concurrency key + +Only active leased `running` runs consume workflow-concurrency slots. +Resolved keys are stored verbatim; only empty/all-whitespace keys are rejected. +Sleeping runs are non-consuming until they are claimed again as actively leased +`running` runs. + ## Heartbeats and Crash Recovery Workers maintain their claim on workflow runs through a heartbeat mechanism: diff --git a/packages/docs/docs/workflows.mdx b/packages/docs/docs/workflows.mdx index 198639cf..1918f9c6 100644 --- a/packages/docs/docs/workflows.mdx +++ b/packages/docs/docs/workflows.mdx @@ -189,6 +189,43 @@ defineWorkflow( Any `retryPolicy` fields you omit fall back to defaults. See [Retries](/docs/retries) for the full behavior and defaults. +### Concurrency (Optional) + +Control how many active leased `running` runs are allowed for a workflow bucket: + +```ts +defineWorkflow( + { + name: "process-order", + concurrency: { + key: ({ input }) => `tenant:${input.tenantId}`, // or: "tenant:acme" + limit: ({ input }) => input.maxConcurrentOrders, // or: 5 + }, + }, + async ({ input, step }) => { + // ... 
+ }, +); +``` + +- `key` can be a string or a function `({ input }) => string` +- `limit` can be a number or a function `({ input }) => number` +- key must resolve to a non-empty string +- limit must resolve to a positive integer +- resolved keys are stored verbatim; only empty/all-whitespace keys are rejected + +When concurrency is configured, runs in the same bucket are constrained by: + +- namespace +- workflow name +- workflow version +- resolved `key` + +Only actively leased `running` runs consume slots. `pending`, `sleeping`, and +expired-lease runs do not. +Sleeping runs become slot-consuming only after they are claimed again as +actively leased `running` runs. + ### Idempotency Key (Optional) You can prevent duplicate run creation by providing an idempotency key, though diff --git a/packages/openworkflow/README.md b/packages/openworkflow/README.md index b8d9a93b..72b8ed45 100644 --- a/packages/openworkflow/README.md +++ b/packages/openworkflow/README.md @@ -67,11 +67,38 @@ For more details, check out our [docs](https://openworkflow.dev/docs). - ✅ **Long pauses** - Sleep for seconds or months - ✅ **Scheduled runs** - Start workflows at a specific time - ✅ **Parallel execution** - Run steps concurrently +- ✅ **Workflow concurrency** - Limit active runs by key (static or input-based) - ✅ **Idempotency keys** - Deduplicate repeated run requests (24h window) - ✅ **No extra servers** - Uses your existing database - ✅ **Dashboard included** - Monitor and debug workflows - ✅ **Production ready** - PostgreSQL and SQLite support +## Workflow Concurrency + +You can limit active leased `running` runs per workflow bucket: + +```ts +const workflow = defineWorkflow( + { + name: "process-order", + concurrency: { + key: ({ input }) => `tenant:${input.tenantId}`, // or: "tenant:acme" + limit: ({ input }) => input.maxConcurrentOrders, // or: 5 + }, + }, + async ({ step }) => { + // ... 
+ }, +); +``` + +`key` must resolve to a non-empty string and `limit` must resolve to a positive +integer. Invalid values fail run creation. +Keys are stored verbatim (for example, `" foo "` and `"foo"` are different +concurrency keys); only empty or all-whitespace keys are rejected. +Sleeping runs do not consume workflow-concurrency slots until they are claimed +again as actively leased `running` runs. + ## Documentation - [Documentation](https://openworkflow.dev/docs) diff --git a/packages/openworkflow/backend.testsuite.ts b/packages/openworkflow/backend.testsuite.ts index 41dfd9bf..c9a8f3e1 100644 --- a/packages/openworkflow/backend.testsuite.ts +++ b/packages/openworkflow/backend.testsuite.ts @@ -1,5 +1,5 @@ import { DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS } from "./backend.js"; -import type { Backend } from "./backend.js"; +import type { Backend, CreateWorkflowRunParams } from "./backend.js"; import type { StepAttempt } from "./core/step.js"; import type { WorkflowRun } from "./core/workflow.js"; import { DEFAULT_WORKFLOW_RETRY_POLICY } from "./workflow.js"; @@ -30,6 +30,13 @@ export function testBackend(options: TestBackendOptions): void { ...DEFAULT_WORKFLOW_RETRY_POLICY, maximumAttempts: 3, } as const; + const SHORT_WORKFLOW_RETRY_POLICY = { + ...DEFAULT_WORKFLOW_RETRY_POLICY, + initialInterval: "40ms", + maximumInterval: "40ms", + backoffCoefficient: 1, + maximumAttempts: 2, + } as const; describe("Backend", () => { let backend: Backend; @@ -51,6 +58,8 @@ export function testBackend(options: TestBackendOptions): void { version: randomUUID(), status: "pending", idempotencyKey: randomUUID(), + concurrencyKey: randomUUID(), + concurrencyLimit: 3, config: { key: "val" }, context: { key: "val" }, input: { key: "val" }, @@ -73,6 +82,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName: expected.workflowName, version: expected.version, idempotencyKey: expected.idempotencyKey, + concurrencyKey: expected.concurrencyKey, + concurrencyLimit: 
expected.concurrencyLimit, input: expected.input, config: expected.config, context: expected.context, @@ -97,6 +108,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName: expected.workflowName, version: null, idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -105,12 +118,91 @@ export function testBackend(options: TestBackendOptions): void { }); expect(createdMin.version).toBeNull(); expect(createdMin.idempotencyKey).toBeNull(); + expect(createdMin.concurrencyKey).toBeNull(); + expect(createdMin.concurrencyLimit).toBeNull(); expect(createdMin.input).toBeNull(); expect(createdMin.context).toBeNull(); expect(deltaSeconds(createdMin.availableAt)).toBeLessThan(1); // defaults to NOW() expect(createdMin.deadlineAt).toBeNull(); }); + test("stores workflow concurrency metadata when provided", async () => { + const workflowName = randomUUID(); + const version = "v1"; + const concurrencyKey = "tenant:acme"; + const concurrencyLimit = 5; + + const created = await backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey: null, + concurrencyKey, + concurrencyLimit, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + expect(created.workflowName).toBe(workflowName); + expect(created.version).toBe(version); + expect(created.concurrencyKey).toBe(concurrencyKey); + expect(created.concurrencyLimit).toBe(concurrencyLimit); + }); + + test("normalizes omitted concurrency metadata from raw JS callers", async () => { + const rawParams = { + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + } as unknown as CreateWorkflowRunParams; + + const created = await backend.createWorkflowRun(rawParams); + expect(created.concurrencyKey).toBeNull(); + expect(created.concurrencyLimit).toBeNull(); + }); + + test("rejects mismatched workflow 
concurrency metadata pairs", async () => { + const base = { + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }; + + const keyOnly = { + ...base, + concurrencyKey: "tenant:acme", + concurrencyLimit: null, + }; + await expect( + Promise.resolve().then(() => backend.createWorkflowRun(keyOnly)), + ).rejects.toThrow( + 'Invalid workflow concurrency metadata: "concurrencyKey" and "concurrencyLimit" must both be null or both be set.', + ); + + const limitOnly = { + ...base, + concurrencyKey: null, + concurrencyLimit: 1, + }; + await expect( + Promise.resolve().then(() => backend.createWorkflowRun(limitOnly)), + ).rejects.toThrow( + 'Invalid workflow concurrency metadata: "concurrencyKey" and "concurrencyLimit" must both be null or both be set.', + ); + }); + test("reuses the same run for matching idempotency key and workflow identity", async () => { const backend = await setup(); const workflowName = randomUUID(); @@ -121,6 +213,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version, idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: { val: 1 }, config: {}, context: null, @@ -132,6 +226,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version, idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: { val: 2 }, config: { changed: true }, context: null, @@ -151,6 +247,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName: "workflow-a", version: "v1", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -162,6 +260,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName: "workflow-b", version: "v1", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -184,6 +284,8 @@ export function 
testBackend(options: TestBackendOptions): void { workflowName, version: "v1", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -195,6 +297,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version: "v1", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -209,6 +313,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version: "v1", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -220,6 +326,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version: "v1", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -244,6 +352,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version: "v1", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -255,6 +365,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version: "v2", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -280,6 +392,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version: "v1", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -295,6 +409,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version: "v1", idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -317,6 +433,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version: null, idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -328,6 +446,8 @@ export function 
testBackend(options: TestBackendOptions): void { workflowName, version: null, idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -351,6 +471,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version, idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: { i }, config: {}, context: null, @@ -375,6 +497,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version, idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -399,6 +523,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version, idempotencyKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -421,6 +547,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version, idempotencyKey: failedKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -446,6 +574,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version, idempotencyKey: failedKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -460,6 +590,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version, idempotencyKey: canceledKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -473,6 +605,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName, version, idempotencyKey: canceledKey, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -692,6 +826,445 @@ export function testBackend(options: TestBackendOptions): void { await teardown(backend); }); + + test("enforces concurrency limit for the same workflow version and key", async () => { + const backend = await setup(); + const workflowName = 
randomUUID(); + const version = "v1"; + const concurrencyKey = "tenant:acme"; + const concurrencyLimit = 1; + + await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit, + }); + await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit, + }); + + const firstClaimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + expect(firstClaimed).not.toBeNull(); + + const blocked = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + expect(blocked).toBeNull(); + + await teardown(backend); + }); + + test("supports limits greater than one", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + const concurrencyKey = "tenant:acme"; + const concurrencyLimit = 2; + + const firstRun = await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit, + }); + const secondRun = await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit, + }); + const thirdRun = await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit, + }); + const runIds = new Set([firstRun.id, secondRun.id, thirdRun.id]); + + const firstClaimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + const secondClaimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + + expect(firstClaimed).not.toBeNull(); + expect(secondClaimed).not.toBeNull(); + if (!firstClaimed || !secondClaimed) { + throw new Error("Expected two claimed workflow runs"); + } + expect(firstClaimed.id).not.toBe(secondClaimed.id); + expect(runIds.has(firstClaimed.id)).toBe(true); + expect(runIds.has(secondClaimed.id)).toBe(true); + + const blocked = await backend.claimWorkflowRun({ + workerId: 
randomUUID(), + leaseDurationMs: 100, + }); + expect(blocked).toBeNull(); + + await teardown(backend); + }); + + test("still claims unconstrained runs when a constrained bucket is full", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + const concurrencyKey = "tenant:acme"; + const concurrencyLimit = 1; + + const constrainedA = await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit, + }); + const constrainedB = await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit, + }); + const constrainedIds = new Set([constrainedA.id, constrainedB.id]); + const unconstrained = await createPendingWorkflowRun(backend, { + workflowName, + version, + availableAt: new Date(Date.now() + 20), + }); + + const firstClaimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + expect(firstClaimed).not.toBeNull(); + if (!firstClaimed) throw new Error("Expected constrained run to claim"); + expect(constrainedIds.has(firstClaimed.id)).toBe(true); + + await sleep(30); + + const secondClaimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + expect(secondClaimed?.id).toBe(unconstrained.id); + expect(secondClaimed?.concurrencyKey).toBeNull(); + expect(secondClaimed?.concurrencyLimit).toBeNull(); + + const blocked = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + expect(blocked).toBeNull(); + + await teardown(backend); + }); + + test("allows claims for different concurrency keys", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + + await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey: "tenant:a", + concurrencyLimit: 1, + }); + await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey: 
"tenant:b", + concurrencyLimit: 1, + }); + + const firstClaimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + const secondClaimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + + expect(firstClaimed).not.toBeNull(); + expect(secondClaimed).not.toBeNull(); + expect(secondClaimed?.id).not.toBe(firstClaimed?.id); + + await teardown(backend); + }); + + test("allows claims for different workflow versions", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const concurrencyKey = "tenant:acme"; + const concurrencyLimit = 1; + + await createPendingWorkflowRun(backend, { + workflowName, + version: "v1", + concurrencyKey, + concurrencyLimit, + }); + await createPendingWorkflowRun(backend, { + workflowName, + version: "v2", + concurrencyKey, + concurrencyLimit, + }); + + const firstClaimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + const secondClaimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + + expect(firstClaimed).not.toBeNull(); + expect(secondClaimed).not.toBeNull(); + expect(secondClaimed?.id).not.toBe(firstClaimed?.id); + + await teardown(backend); + }); + + test("allows claims after the active lease expires", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + const concurrencyKey = "tenant:acme"; + const concurrencyLimit = 1; + + await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit, + }); + await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit, + }); + + const firstClaimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 20, + }); + expect(firstClaimed).not.toBeNull(); + + const blocked = await backend.claimWorkflowRun({ + workerId: randomUUID(), + 
leaseDurationMs: 20, + }); + expect(blocked).toBeNull(); + + await sleep(30); + + const claimedAfterExpiry = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 20, + }); + expect(claimedAfterExpiry).not.toBeNull(); + expect(claimedAfterExpiry?.id).not.toBe(firstClaimed?.id); + + await teardown(backend); + }); + + test("does not consume concurrency slot for sleeping runs", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + const concurrencyKey = "tenant:acme"; + const concurrencyLimit = 1; + + const firstRun = await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit, + }); + const secondRun = await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit, + }); + + const workerId = randomUUID(); + const firstClaimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 200, + }); + expect(firstClaimed?.id).toBe(firstRun.id); + + const sleeping = await backend.sleepWorkflowRun({ + workflowRunId: firstRun.id, + workerId, + availableAt: new Date(Date.now() + 200), + }); + expect(sleeping.status).toBe("sleeping"); + + const secondClaimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 200, + }); + expect(secondClaimed?.id).toBe(secondRun.id); + + await teardown(backend); + }); + + test("frees concurrency slot after terminal workflow failure", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + const concurrencyKey = "tenant:acme"; + const concurrencyLimit = 1; + + const firstRun = await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit, + }); + const secondRun = await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit, + }); + + const workerId = randomUUID(); + const firstClaimed = await 
backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 200, + }); + expect(firstClaimed?.id).toBe(firstRun.id); + + const failed = await backend.failWorkflowRun({ + workflowRunId: firstRun.id, + workerId, + error: { message: "terminal failure" }, + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + expect(failed.status).toBe("failed"); + + const secondClaimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 200, + }); + expect(secondClaimed?.id).toBe(secondRun.id); + + await teardown(backend); + }); + + test("does not consume concurrency slot while waiting for retry", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + const concurrencyKey = "tenant:acme"; + const concurrencyLimit = 1; + + const firstRun = await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit, + }); + const secondRun = await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit, + }); + + const workerId = randomUUID(); + const firstClaimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 200, + }); + expect(firstClaimed?.id).toBe(firstRun.id); + + const failed = await backend.failWorkflowRun({ + workflowRunId: firstRun.id, + workerId, + error: { message: "retry me" }, + retryPolicy: SHORT_WORKFLOW_RETRY_POLICY, + }); + expect(failed.status).toBe("pending"); + expect(failed.availableAt).not.toBeNull(); + + const secondClaimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 200, + }); + expect(secondClaimed?.id).toBe(secondRun.id); + + await teardown(backend); + }); + + test("blocks due retry when another run in same bucket is actively leased", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + const concurrencyKey = "tenant:acme"; + const concurrencyLimit = 1; + + const firstRun = await 
createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit, + }); + const secondRun = await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit, + }); + + const firstWorkerId = randomUUID(); + const firstClaimed = await backend.claimWorkflowRun({ + workerId: firstWorkerId, + leaseDurationMs: 200, + }); + expect(firstClaimed?.id).toBe(firstRun.id); + + const failed = await backend.failWorkflowRun({ + workflowRunId: firstRun.id, + workerId: firstWorkerId, + error: { message: "retry later" }, + retryPolicy: SHORT_WORKFLOW_RETRY_POLICY, + }); + expect(failed.status).toBe("pending"); + + const secondClaimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 200, + }); + expect(secondClaimed?.id).toBe(secondRun.id); + + await sleep(60); // allow first retry to become due while second is leased + + const blocked = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 200, + }); + expect(blocked).toBeNull(); + + await teardown(backend); + }); }); describe("extendWorkflowRunLease()", () => { @@ -1341,6 +1914,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName: randomUUID(), version: null, idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -1360,6 +1935,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName: randomUUID(), version: null, idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -1385,6 +1962,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName: randomUUID(), version: null, idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -1421,6 +2000,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName: randomUUID(), version: null, 
idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -1459,6 +2040,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName: randomUUID(), version: null, idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -1585,6 +2168,8 @@ export function testBackend(options: TestBackendOptions): void { workflowName: randomUUID(), version: null, idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -1682,21 +2267,48 @@ export function testBackend(options: TestBackendOptions): void { }); } +/** + * Options for creating a pending workflow run in tests. + */ +interface CreatePendingWorkflowRunOptions { + workflowName?: string; + version?: string | null; + concurrencyKey?: string | null; + concurrencyLimit?: number | null; + availableAt?: Date | null; + deadlineAt?: Date | null; +} + /** * Create a pending workflow run for tests. 
* @param b - Backend + * @param options - Optional run overrides * @returns Created workflow run */ -async function createPendingWorkflowRun(b: Backend) { +async function createPendingWorkflowRun( + b: Backend, + options: CreatePendingWorkflowRunOptions = {}, +) { + const { + workflowName = randomUUID(), + version = null, + concurrencyKey = null, + concurrencyLimit = null, + availableAt = null, + deadlineAt = null, + } = options; + return await b.createWorkflowRun({ - workflowName: randomUUID(), - version: null, + workflowName, + version, idempotencyKey: null, + concurrencyKey, + concurrencyLimit, input: null, config: {}, context: null, - availableAt: null, - deadlineAt: null, + availableAt, + deadlineAt, }); } diff --git a/packages/openworkflow/backend.ts b/packages/openworkflow/backend.ts index fdbc432f..11b5c73f 100644 --- a/packages/openworkflow/backend.ts +++ b/packages/openworkflow/backend.ts @@ -68,6 +68,8 @@ export interface CreateWorkflowRunParams { workflowName: string; version: string | null; idempotencyKey: string | null; + concurrencyKey: string | null; + concurrencyLimit: number | null; config: JsonValue; context: JsonValue | null; input: JsonValue | null; diff --git a/packages/openworkflow/client.test.ts b/packages/openworkflow/client.test.ts index 0e154cb8..bd0321f1 100644 --- a/packages/openworkflow/client.test.ts +++ b/packages/openworkflow/client.test.ts @@ -334,6 +334,124 @@ describe("OpenWorkflow", () => { expect(handle.workflowRun.version).toBeNull(); }); + test("resolves literal workflow concurrency values", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { + name: "concurrency-literal-test", + concurrency: { + key: "tenant:acme", + limit: 3, + }, + }, + noopFn, + ); + const handle = await workflow.run({ value: 1 }); + + expect(handle.workflowRun.concurrencyKey).toBe("tenant:acme"); + expect(handle.workflowRun.concurrencyLimit).toBe(3); + }); 
+ + test("resolves function workflow concurrency values from parsed input", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const schema = z.object({ + tenant: z.string().transform((value) => value.trim()), + limit: z.coerce.number().int().positive(), + }); + + const workflow = client.defineWorkflow( + { + name: "concurrency-function-test", + schema, + concurrency: { + key: ({ input }) => `tenant:${input.tenant}`, + limit: ({ input }) => Number(input.limit), + }, + }, + noopFn, + ); + + const handle = await workflow.run({ + tenant: " acme ", + limit: "4", + }); + + expect(handle.workflowRun.concurrencyKey).toBe("tenant:acme"); + expect(handle.workflowRun.concurrencyLimit).toBe(4); + }); + + test("throws when resolved workflow concurrency key is invalid", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { + name: "concurrency-invalid-key-test", + concurrency: { + key: () => " ", + limit: 1, + }, + }, + noopFn, + ); + + await expect(workflow.run()).rejects.toThrow( + /Invalid concurrency key for workflow "concurrency-invalid-key-test"/, + ); + }); + + test("throws when resolved workflow concurrency limit is invalid", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { + name: "concurrency-invalid-limit-test", + concurrency: { + key: "tenant:acme", + limit: 0, + }, + }, + noopFn, + ); + + await expect(workflow.run()).rejects.toThrow( + /Invalid concurrency limit for workflow "concurrency-invalid-limit-test"/, + ); + }); + + test("throws when workflow concurrency resolver throws and does not enqueue", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { + name: "concurrency-throwing-resolver-test", + concurrency: { + 
key: () => { + throw new Error("resolver failed"); + }, + limit: 1, + }, + }, + noopFn, + ); + + await expect(workflow.run()).rejects.toThrow( + /Failed to resolve concurrency key for workflow "concurrency-throwing-resolver-test"/, + ); + + const claimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 1000, + }); + expect(claimed).toBeNull(); + }); + test("creates workflow run with idempotency key", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); diff --git a/packages/openworkflow/client.ts b/packages/openworkflow/client.ts index 6be212b1..2b04c02a 100644 --- a/packages/openworkflow/client.ts +++ b/packages/openworkflow/client.ts @@ -13,6 +13,7 @@ import { WorkflowRegistry } from "./registry.js"; import { Worker } from "./worker.js"; import { defineWorkflow, + type WorkflowConcurrency, type Workflow, type WorkflowSpec, } from "./workflow.js"; @@ -100,11 +101,18 @@ export class OpenWorkflow { throw new Error(validationResult.error); } const parsedInput = validationResult.value; + const resolvedConcurrency = resolveWorkflowConcurrency( + spec.name, + spec.concurrency, + parsedInput, + ); const workflowRun = await this.backend.createWorkflowRun({ workflowName: spec.name, version: spec.version ?? null, idempotencyKey: options?.idempotencyKey ?? null, + concurrencyKey: resolvedConcurrency.concurrencyKey, + concurrencyLimit: resolvedConcurrency.concurrencyLimit, config: {}, context: null, input: parsedInput ?? null, @@ -234,6 +242,82 @@ function resolveAvailableAt( return result.value; } +/** + * Resolved workflow concurrency values persisted on a workflow run. + */ +interface ResolvedWorkflowConcurrency { + concurrencyKey: string | null; + concurrencyLimit: number | null; +} + +/** + * Resolve and validate workflow concurrency configuration. 
+ * @param workflowName - Workflow name (for error messages) + * @param concurrency - Workflow concurrency definition + * @param input - Validated workflow input + * @returns Resolved concurrency fields to persist on the run + * @throws {Error} When resolver execution fails or resolved values are invalid + */ +function resolveWorkflowConcurrency( + workflowName: string, + concurrency: WorkflowConcurrency | undefined, + input: Input, +): ResolvedWorkflowConcurrency { + if (!concurrency) { + return { + concurrencyKey: null, + concurrencyLimit: null, + }; + } + + let keyValue: unknown; + try { + keyValue = + typeof concurrency.key === "function" + ? concurrency.key({ input }) + : concurrency.key; + } catch (error) { + throw new Error( + `Failed to resolve concurrency key for workflow "${workflowName}"`, + { cause: error }, + ); + } + + if (typeof keyValue !== "string" || keyValue.trim().length === 0) { + throw new Error( + `Invalid concurrency key for workflow "${workflowName}": expected a non-empty string`, + ); + } + + let limitValue: unknown; + try { + limitValue = + typeof concurrency.limit === "function" + ? concurrency.limit({ input }) + : concurrency.limit; + } catch (error) { + throw new Error( + `Failed to resolve concurrency limit for workflow "${workflowName}"`, + { cause: error }, + ); + } + + if ( + typeof limitValue !== "number" || + !Number.isInteger(limitValue) || + limitValue <= 0 + ) { + throw new Error( + `Invalid concurrency limit for workflow "${workflowName}": expected a positive integer`, + ); + } + + return { + concurrencyKey: keyValue, + concurrencyLimit: limitValue, + }; +} + /** * Options for WorkflowHandle. 
*/ diff --git a/packages/openworkflow/core/workflow.ts b/packages/openworkflow/core/workflow.ts index a73dd253..62e17e8a 100644 --- a/packages/openworkflow/core/workflow.ts +++ b/packages/openworkflow/core/workflow.ts @@ -24,6 +24,8 @@ export interface WorkflowRun { version: string | null; status: WorkflowRunStatus; idempotencyKey: string | null; + concurrencyKey: string | null; + concurrencyLimit: number | null; config: JsonValue; // user-defined config context: JsonValue | null; // runtime execution metadata input: JsonValue | null; diff --git a/packages/openworkflow/index.ts b/packages/openworkflow/index.ts index fa0d6304..6a416413 100644 --- a/packages/openworkflow/index.ts +++ b/packages/openworkflow/index.ts @@ -7,7 +7,12 @@ export type { WorkerOptions } from "./worker.js"; export { Worker } from "./worker.js"; // workflow -export type { RetryPolicy, Workflow } from "./workflow.js"; +export type { + RetryPolicy, + Workflow, + WorkflowConcurrency, + WorkflowConcurrencyResolverParams, +} from "./workflow.js"; export { defineWorkflowSpec, defineWorkflow, diff --git a/packages/openworkflow/postgres/backend.test.ts b/packages/openworkflow/postgres/backend.test.ts index 53ba08e0..dd99736b 100644 --- a/packages/openworkflow/postgres/backend.test.ts +++ b/packages/openworkflow/postgres/backend.test.ts @@ -62,6 +62,8 @@ describe("BackendPostgres schema option", () => { workflowName: "schema-test", version: null, idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, @@ -108,6 +110,8 @@ describe("BackendPostgres schema option", () => { workflowName: "schema-reschedule-test", version: null, idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: null, input: null, config: {}, context: null, diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index 359b5796..da0d4449 100644 --- a/packages/openworkflow/postgres/backend.ts +++ 
b/packages/openworkflow/postgres/backend.ts @@ -100,11 +100,13 @@ export class BackendPostgres implements Backend { async createWorkflowRun( params: CreateWorkflowRunParams, ): Promise { - if (params.idempotencyKey === null) { - return await this.insertWorkflowRun(this.pg, params); + const normalizedParams = normalizeCreateWorkflowRunParams(params); + + if (normalizedParams.idempotencyKey === null) { + return await this.insertWorkflowRun(this.pg, normalizedParams); } - const { workflowName, idempotencyKey } = params; + const { workflowName, idempotencyKey } = normalizedParams; const lockScope = JSON.stringify({ namespaceId: this.namespaceId, workflowName, @@ -131,7 +133,7 @@ export class BackendPostgres implements Backend { return existing; } - return await this.insertWorkflowRun(pgTx, params); + return await this.insertWorkflowRun(pgTx, normalizedParams); }); } @@ -149,6 +151,8 @@ export class BackendPostgres implements Backend { "version", "status", "idempotency_key", + "concurrency_key", + "concurrency_limit", "config", "context", "input", @@ -165,6 +169,8 @@ export class BackendPostgres implements Backend { ${params.version}, 'pending', ${params.idempotencyKey}, + ${params.concurrencyKey ?? null}, + ${params.concurrencyLimit ?? 
null}, ${pg.json(params.config)}, ${pg.json(params.context)}, ${pg.json(params.input)}, @@ -301,16 +307,36 @@ export class BackendPostgres implements Backend { RETURNING "id" ), candidate AS ( - SELECT "id" - FROM ${workflowRunsTable} - WHERE "namespace_id" = ${this.namespaceId} - AND "status" IN ('pending', 'running', 'sleeping') - AND "available_at" <= NOW() - AND ("deadline_at" IS NULL OR "deadline_at" > NOW()) + SELECT wr."id" + FROM ${workflowRunsTable} AS wr + WHERE wr."namespace_id" = ${this.namespaceId} + AND wr."status" IN ('pending', 'running', 'sleeping') + AND wr."available_at" <= NOW() + AND (wr."deadline_at" IS NULL OR wr."deadline_at" > NOW()) + AND ( + wr."concurrency_key" IS NULL + OR wr."concurrency_limit" IS NULL + OR ( + -- TODO: If claim latency becomes a hot spot, replace this + -- correlated count with precomputed bucket counts via a CTE. + SELECT COUNT(*) + FROM ${workflowRunsTable} AS active + WHERE active."namespace_id" = wr."namespace_id" + AND active."workflow_name" = wr."workflow_name" + AND active."version" IS NOT DISTINCT FROM wr."version" + AND active."concurrency_key" = wr."concurrency_key" + AND active."status" = 'running' + -- Candidates require available_at <= NOW(); active leased runs + -- require available_at > NOW(). Keep explicit self-exclusion + -- for readability/safety. + AND active."id" <> wr."id" + AND active."available_at" > NOW() + ) < wr."concurrency_limit" + ) ORDER BY - CASE WHEN "status" = 'pending' THEN 0 ELSE 1 END, - "available_at", - "created_at" + CASE WHEN wr."status" = 'pending' THEN 0 ELSE 1 END, + wr."available_at", + wr."created_at" LIMIT 1 FOR UPDATE SKIP LOCKED ) @@ -793,3 +819,62 @@ function decodeCursor(cursor: string): Cursor { id: parsed.id, }; } + +/** + * Normalize and validate workflow concurrency metadata passed to create calls. 
+ * @param params - Workflow run creation params + * @returns Params with normalized concurrency fields + * @throws {Error} When concurrency metadata has invalid shape or types + */ +function normalizeCreateWorkflowRunParams( + params: CreateWorkflowRunParams, +): CreateWorkflowRunParams { + const rawParams = params as unknown as Record; + const rawConcurrencyKey = rawParams["concurrencyKey"]; + const rawConcurrencyLimit = rawParams["concurrencyLimit"]; + + if (rawConcurrencyKey === undefined && rawConcurrencyLimit === undefined) { + return { + ...params, + concurrencyKey: null, + concurrencyLimit: null, + }; + } + + if ( + rawConcurrencyKey !== undefined && + rawConcurrencyKey !== null && + typeof rawConcurrencyKey !== "string" + ) { + throw new Error( + 'Invalid workflow concurrency metadata: "concurrencyKey" must be a string or null.', + ); + } + + if ( + rawConcurrencyLimit !== undefined && + rawConcurrencyLimit !== null && + typeof rawConcurrencyLimit !== "number" + ) { + throw new Error( + 'Invalid workflow concurrency metadata: "concurrencyLimit" must be a number or null.', + ); + } + + const concurrencyKey = + rawConcurrencyKey === undefined ? null : rawConcurrencyKey; + const concurrencyLimit = + rawConcurrencyLimit === undefined ? 
null : rawConcurrencyLimit; + + if ((concurrencyKey === null) !== (concurrencyLimit === null)) { + throw new Error( + 'Invalid workflow concurrency metadata: "concurrencyKey" and "concurrencyLimit" must both be null or both be set.', + ); + } + + return { + ...params, + concurrencyKey, + concurrencyLimit, + }; +} diff --git a/packages/openworkflow/postgres/postgres.test.ts b/packages/openworkflow/postgres/postgres.test.ts index 5a11ad3b..141d7078 100644 --- a/packages/openworkflow/postgres/postgres.test.ts +++ b/packages/openworkflow/postgres/postgres.test.ts @@ -55,6 +55,48 @@ describe("postgres", () => { await migrate(pg, schema); await migrate(pg, schema); }); + + test("adds workflow concurrency columns and index", async () => { + const schema = "test_concurrency_columns"; + await dropSchema(pg, schema); + await migrate(pg, schema); + + try { + const columns = await pg< + { + columnName: string; + }[] + >` + SELECT column_name AS "columnName" + FROM information_schema.columns + WHERE table_schema = ${schema} + AND table_name = 'workflow_runs' + AND column_name IN ('concurrency_key', 'concurrency_limit') + ORDER BY column_name ASC + `; + expect(columns.map((column) => column.columnName)).toEqual([ + "concurrency_key", + "concurrency_limit", + ]); + + /* cspell:disable */ + const indexes = await pg< + { + indexName: string; + }[] + >` + SELECT indexname AS "indexName" + FROM pg_indexes + WHERE schemaname = ${schema} + AND tablename = 'workflow_runs' + AND indexname = 'workflow_runs_concurrency_active_idx' + `; + /* cspell:enable */ + expect(indexes).toHaveLength(1); + } finally { + await dropSchema(pg, schema); + } + }); }); describe("dropSchema()", () => { diff --git a/packages/openworkflow/postgres/postgres.ts b/packages/openworkflow/postgres/postgres.ts index 1a5d1b74..c224716b 100644 --- a/packages/openworkflow/postgres/postgres.ts +++ b/packages/openworkflow/postgres/postgres.ts @@ -199,6 +199,33 @@ export function migrations(schema: string): string[] { ON 
CONFLICT DO NOTHING; COMMIT;`, + + // 5 - workflow concurrency columns and indexes + `BEGIN; + + ALTER TABLE ${quotedSchema}."workflow_runs" + ADD COLUMN IF NOT EXISTS "concurrency_key" TEXT; + + ALTER TABLE ${quotedSchema}."workflow_runs" + ADD COLUMN IF NOT EXISTS "concurrency_limit" INTEGER; + + CREATE INDEX IF NOT EXISTS "workflow_runs_concurrency_active_idx" + ON ${quotedSchema}."workflow_runs" ( + "namespace_id", + "workflow_name", + "version", + "concurrency_key", + "status", + "available_at" + ) + WHERE "concurrency_key" IS NOT NULL + AND "concurrency_limit" IS NOT NULL; + + INSERT INTO ${quotedSchema}."openworkflow_migrations"("version") + VALUES (5) + ON CONFLICT DO NOTHING; + + COMMIT;`, ]; } diff --git a/packages/openworkflow/sqlite/backend.test.ts b/packages/openworkflow/sqlite/backend.test.ts index c0138658..883de220 100644 --- a/packages/openworkflow/sqlite/backend.test.ts +++ b/packages/openworkflow/sqlite/backend.test.ts @@ -96,6 +96,8 @@ describe("BackendSqlite.createWorkflowRun error handling", () => { workflowName: "failing-workflow", version: "v1", idempotencyKey: randomUUID(), + concurrencyKey: null, + concurrencyLimit: null, config: {}, context: null, input: null, @@ -143,6 +145,8 @@ describe("BackendSqlite.createWorkflowRun error handling", () => { workflowName: "failing-workflow", version: "v1", idempotencyKey: randomUUID(), + concurrencyKey: null, + concurrencyLimit: null, config: {}, context: null, input: null, diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts index 30641213..f35ab223 100644 --- a/packages/openworkflow/sqlite/backend.ts +++ b/packages/openworkflow/sqlite/backend.ts @@ -94,10 +94,11 @@ export class BackendSqlite implements Backend { } createWorkflowRun(params: CreateWorkflowRunParams): Promise { - const { workflowName, idempotencyKey } = params; + const normalizedParams = normalizeCreateWorkflowRunParams(params); + const { workflowName, idempotencyKey } = normalizedParams; if 
(idempotencyKey === null) { - return Promise.resolve(this.insertWorkflowRun(params)); + return Promise.resolve(this.insertWorkflowRun(normalizedParams)); } try { @@ -113,7 +114,7 @@ export class BackendSqlite implements Backend { return Promise.resolve(existing); } - const workflowRun = this.insertWorkflowRun(params); + const workflowRun = this.insertWorkflowRun(normalizedParams); this.db.exec("COMMIT"); return Promise.resolve(workflowRun); } catch (error) { @@ -144,6 +145,8 @@ export class BackendSqlite implements Backend { "version", "status", "idempotency_key", + "concurrency_key", + "concurrency_limit", "config", "context", "input", @@ -153,7 +156,7 @@ export class BackendSqlite implements Backend { "created_at", "updated_at" ) - VALUES (?, ?, ?, ?, 'pending', ?, ?, ?, ?, 0, ?, ?, ?, ?) + VALUES (?, ?, ?, ?, 'pending', ?, ?, ?, ?, ?, ?, 0, ?, ?, ?, ?) `); stmt.run( @@ -162,6 +165,8 @@ export class BackendSqlite implements Backend { params.workflowName, params.version, params.idempotencyKey, + params.concurrencyKey ?? null, + params.concurrencyLimit ?? null, toJSON(params.config), toJSON(params.context), toJSON(params.input), @@ -262,16 +267,42 @@ export class BackendSqlite implements Backend { // 2. find an available workflow run to claim const findStmt = this.db.prepare(` - SELECT "id" - FROM "workflow_runs" - WHERE "namespace_id" = ? - AND "status" IN ('pending', 'running', 'sleeping') - AND "available_at" <= ? - AND ("deadline_at" IS NULL OR "deadline_at" > ?) + SELECT wr."id" + FROM "workflow_runs" AS wr + WHERE wr."namespace_id" = ? + AND wr."status" IN ('pending', 'running', 'sleeping') + AND wr."available_at" <= ? + AND (wr."deadline_at" IS NULL OR wr."deadline_at" > ?) + AND ( + wr."concurrency_key" IS NULL + OR wr."concurrency_limit" IS NULL + OR ( + -- TODO: If claim latency becomes a hot spot, replace this + -- correlated count with precomputed bucket counts via a CTE. 
+ SELECT COUNT(*) + FROM "workflow_runs" AS active + WHERE active."namespace_id" = wr."namespace_id" + AND active."workflow_name" = wr."workflow_name" + AND ( + active."version" = wr."version" + OR ( + active."version" IS NULL + AND wr."version" IS NULL + ) + ) + AND active."concurrency_key" = wr."concurrency_key" + AND active."status" = 'running' + -- Candidates require available_at <= now; active leased runs + -- require available_at > now. Keep explicit self-exclusion + -- for readability/safety. + AND active."id" <> wr."id" + AND active."available_at" > ? + ) < wr."concurrency_limit" + ) ORDER BY - CASE WHEN "status" = 'pending' THEN 0 ELSE 1 END, - "available_at", - "created_at" + CASE WHEN wr."status" = 'pending' THEN 0 ELSE 1 END, + wr."available_at", + wr."created_at" LIMIT 1 `); @@ -279,6 +310,7 @@ export class BackendSqlite implements Backend { this.namespaceId, currentTime, currentTime, + currentTime, ) as { id: string } | undefined; if (!candidate) { @@ -941,6 +973,8 @@ interface WorkflowRunRow { version: string | null; status: string; idempotency_key: string | null; + concurrency_key: string | null; + concurrency_limit: number | null; config: string; context: string | null; input: string | null; @@ -1000,6 +1034,8 @@ function rowToWorkflowRun(row: WorkflowRunRow): WorkflowRun { version: row.version, status: row.status as WorkflowRun["status"], idempotencyKey: row.idempotency_key, + concurrencyKey: row.concurrency_key, + concurrencyLimit: row.concurrency_limit, config: config as WorkflowRun["config"], context: fromJSON(row.context) as WorkflowRun["context"], input: fromJSON(row.input) as WorkflowRun["input"], @@ -1088,3 +1124,62 @@ function decodeCursor(cursor: string): Cursor { id: parsed.id, }; } + +/** + * Normalize and validate workflow concurrency metadata passed to create calls. 
+ * @param params - Workflow run creation params + * @returns Params with normalized concurrency fields + * @throws {Error} When concurrency metadata has invalid shape or types + */ +function normalizeCreateWorkflowRunParams( + params: CreateWorkflowRunParams, +): CreateWorkflowRunParams { + const rawParams = params as unknown as Record; + const rawConcurrencyKey = rawParams["concurrencyKey"]; + const rawConcurrencyLimit = rawParams["concurrencyLimit"]; + + if (rawConcurrencyKey === undefined && rawConcurrencyLimit === undefined) { + return { + ...params, + concurrencyKey: null, + concurrencyLimit: null, + }; + } + + if ( + rawConcurrencyKey !== undefined && + rawConcurrencyKey !== null && + typeof rawConcurrencyKey !== "string" + ) { + throw new Error( + 'Invalid workflow concurrency metadata: "concurrencyKey" must be a string or null.', + ); + } + + if ( + rawConcurrencyLimit !== undefined && + rawConcurrencyLimit !== null && + typeof rawConcurrencyLimit !== "number" + ) { + throw new Error( + 'Invalid workflow concurrency metadata: "concurrencyLimit" must be a number or null.', + ); + } + + const concurrencyKey = + rawConcurrencyKey === undefined ? null : rawConcurrencyKey; + const concurrencyLimit = + rawConcurrencyLimit === undefined ? 
null : rawConcurrencyLimit; + + if ((concurrencyKey === null) !== (concurrencyLimit === null)) { + throw new Error( + 'Invalid workflow concurrency metadata: "concurrencyKey" and "concurrencyLimit" must both be null or both be set.', + ); + } + + return { + ...params, + concurrencyKey, + concurrencyLimit, + }; +} diff --git a/packages/openworkflow/sqlite/sqlite.test.ts b/packages/openworkflow/sqlite/sqlite.test.ts index e08cfa88..b462117f 100644 --- a/packages/openworkflow/sqlite/sqlite.test.ts +++ b/packages/openworkflow/sqlite/sqlite.test.ts @@ -193,6 +193,23 @@ describe("sqlite", () => { .get() as { count: number }; expect(stepAttemptsCheck.count).toBe(1); }); + + test("adds workflow concurrency columns and index", () => { + migrate(db); + + const columns = db + .prepare(`PRAGMA table_info("workflow_runs")`) + .all() as { name: string }[]; + const columnNames = columns.map((column) => column.name); + expect(columnNames).toContain("concurrency_key"); + expect(columnNames).toContain("concurrency_limit"); + + const indexRows = db + .prepare(`PRAGMA index_list("workflow_runs")`) + .all() as { name: string }[]; + const indexNames = indexRows.map((index) => index.name); + expect(indexNames).toContain("workflow_runs_concurrency_active_idx"); + }); }); describe("migration version tracking", () => { diff --git a/packages/openworkflow/sqlite/sqlite.ts b/packages/openworkflow/sqlite/sqlite.ts index bf3fd163..607ae741 100644 --- a/packages/openworkflow/sqlite/sqlite.ts +++ b/packages/openworkflow/sqlite/sqlite.ts @@ -193,6 +193,32 @@ export function migrations(): string[] { VALUES (4); COMMIT;`, + + // 5 - workflow concurrency columns and indexes + `BEGIN; + + ALTER TABLE "workflow_runs" + ADD COLUMN "concurrency_key" TEXT; + + ALTER TABLE "workflow_runs" + ADD COLUMN "concurrency_limit" INTEGER; + + CREATE INDEX IF NOT EXISTS "workflow_runs_concurrency_active_idx" + ON "workflow_runs" ( + "namespace_id", + "workflow_name", + "version", + "concurrency_key", + "status", + 
"available_at" + ) + WHERE "concurrency_key" IS NOT NULL + AND "concurrency_limit" IS NOT NULL; + + INSERT OR IGNORE INTO "openworkflow_migrations" ("version") + VALUES (5); + + COMMIT;`, ]; } diff --git a/packages/openworkflow/worker.test.ts b/packages/openworkflow/worker.test.ts index 1a203cf6..e7d78780 100644 --- a/packages/openworkflow/worker.test.ts +++ b/packages/openworkflow/worker.test.ts @@ -84,6 +84,8 @@ describe("Worker", () => { workflowName: "missing", version: null, idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: null, config: {}, context: null, input: null, @@ -316,6 +318,202 @@ describe("Worker", () => { expect(completed).toBe(2); }); + test("worker concurrency > 1 still respects workflow concurrency limits", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const stepReleases: (() => void)[] = []; + + const workflow = client.defineWorkflow( + { + name: "workflow-concurrency-worker-test", + concurrency: { + key: "tenant:acme", + limit: 1, + }, + }, + async ({ step }) => { + await step.run({ name: "block" }, async () => { + await new Promise((resolve) => { + stepReleases.push(resolve); + }); + }); + + return "done"; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const first = await workflow.run(); + const second = await workflow.run(); + + await worker.tick(); + await sleep(100); + + const firstStateAfterFirstTick = await backend.getWorkflowRun({ + workflowRunId: first.workflowRun.id, + }); + const secondStateAfterFirstTick = await backend.getWorkflowRun({ + workflowRunId: second.workflowRun.id, + }); + expect(firstStateAfterFirstTick?.status).toBe("running"); + expect(secondStateAfterFirstTick?.status).toBe("pending"); + + const releaseFirst = stepReleases.shift(); + releaseFirst?.(); + await sleep(100); + + await worker.tick(); + await sleep(100); + + const firstStateAfterSecondTick = await backend.getWorkflowRun({ + workflowRunId: first.workflowRun.id, + 
}); + const secondStateAfterSecondTick = await backend.getWorkflowRun({ + workflowRunId: second.workflowRun.id, + }); + expect(firstStateAfterSecondTick?.status).toBe("completed"); + expect(secondStateAfterSecondTick?.status).toBe("running"); + + const releaseSecond = stepReleases.shift(); + releaseSecond?.(); + await sleep(100); + + await expect(first.result()).resolves.toBe("done"); + await expect(second.result()).resolves.toBe("done"); + }); + + test("worker claims next same-bucket run after terminal failure", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { + name: "worker-concurrency-terminal-failure", + concurrency: { + key: "tenant:acme", + limit: 1, + }, + retryPolicy: { + maximumAttempts: 1, + }, + }, + ({ input }: { input: { shouldFail: boolean } }) => { + if (input.shouldFail) { + throw new Error("terminal failure"); + } + return "done"; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const failing = await workflow.run({ shouldFail: true }); + const succeeding = await workflow.run( + { shouldFail: false }, + { availableAt: new Date(Date.now() + 20) }, + ); + + await worker.tick(); + await sleep(60); + + const failedRun = await backend.getWorkflowRun({ + workflowRunId: failing.workflowRun.id, + }); + expect(failedRun?.status).toBe("failed"); + + await worker.tick(); + await sleep(60); + + const completedRun = await backend.getWorkflowRun({ + workflowRunId: succeeding.workflowRun.id, + }); + expect(completedRun?.status).toBe("completed"); + await expect(succeeding.result()).resolves.toBe("done"); + }); + + test("worker honors same-bucket serialization across retry reschedule", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const attemptsById = new Map(); + + const workflow = client.defineWorkflow( + { + name: "worker-concurrency-retry-serialization", + concurrency: { + key: 
"tenant:acme", + limit: 1, + }, + retryPolicy: { + initialInterval: "120ms", + maximumInterval: "120ms", + backoffCoefficient: 1, + maximumAttempts: 2, + }, + }, + async ({ input, step }) => { + const typedInput = input as { id: "retry" | "sibling" }; + const currentAttempts = attemptsById.get(typedInput.id) ?? 0; + attemptsById.set(typedInput.id, currentAttempts + 1); + + if (typedInput.id === "retry" && currentAttempts === 0) { + throw new Error("retry once"); + } + + if (typedInput.id === "sibling") { + await step.run({ name: "hold-slot" }, async () => { + await sleep(180); + }); + } + + return typedInput.id; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const retrying = await workflow.run({ id: "retry" }); + const sibling = await workflow.run({ id: "sibling" }); + + await worker.tick(); + await sleep(30); + + const firstState = await backend.getWorkflowRun({ + workflowRunId: retrying.workflowRun.id, + }); + expect(firstState?.status).toBe("pending"); + expect(firstState?.availableAt).not.toBeNull(); + + await worker.tick(); + await sleep(40); + + const siblingRunning = await backend.getWorkflowRun({ + workflowRunId: sibling.workflowRun.id, + }); + expect(siblingRunning?.status).toBe("running"); + + await sleep(100); // retry run is due while sibling still holds the slot + + const blockedClaims = await worker.tick(); + expect(blockedClaims).toBe(0); + + await sleep(100); + + const retryClaimedAfterRelease = await worker.tick(); + expect(retryClaimedAfterRelease).toBe(1); + await sleep(60); + + const retryCompleted = await backend.getWorkflowRun({ + workflowRunId: retrying.workflowRun.id, + }); + const siblingCompleted = await backend.getWorkflowRun({ + workflowRunId: sibling.workflowRun.id, + }); + expect(retryCompleted?.status).toBe("completed"); + expect(retryCompleted?.attempts).toBe(2); + expect(siblingCompleted?.status).toBe("completed"); + + await expect(retrying.result()).resolves.toBe("retry"); + await 
expect(sibling.result()).resolves.toBe("sibling"); + }); + test("worker starts, processes work, and stops gracefully", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); @@ -1180,6 +1378,8 @@ describe("Worker", () => { workflowName: "version-check", version: "v2", idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: null, config: {}, context: null, input: null, @@ -1215,6 +1415,8 @@ describe("Worker", () => { workflowName: "version-mismatch", version: "v1", idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: null, config: {}, context: null, input: null, @@ -1250,6 +1452,8 @@ describe("Worker", () => { workflowName: "version-required", version: null, idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: null, config: {}, context: null, input: null, diff --git a/packages/openworkflow/workflow.ts b/packages/openworkflow/workflow.ts index 56589cf6..0caa82b4 100644 --- a/packages/openworkflow/workflow.ts +++ b/packages/openworkflow/workflow.ts @@ -3,6 +3,31 @@ import type { SerializedError } from "./core/error.js"; import type { StandardSchemaV1 } from "./core/schema.js"; import { WorkflowFunction } from "./execution.js"; +/** + * Resolver input for workflow concurrency configuration. + */ +export interface WorkflowConcurrencyResolverParams { + input: Input; +} + +/** + * Workflow-level concurrency configuration. + */ +export interface WorkflowConcurrency { + /** + * Bucket key used to scope concurrency for this run. + */ + readonly key: + | string + | ((params: Readonly>) => string); + /** + * Maximum active leased runs allowed in the bucket. + */ + readonly limit: + | number + | ((params: Readonly>) => number); +} + /** * A workflow spec. */ @@ -15,6 +40,8 @@ export interface WorkflowSpec { readonly schema?: StandardSchemaV1; /** The retry policy for the workflow. */ readonly retryPolicy?: Partial; + /** Optional workflow-level concurrency configuration. 
*/ + readonly concurrency?: WorkflowConcurrency; /** Phantom type carrier - won't exist at runtime. */ readonly __types?: { output: Output; From 8ffac8a2879e2e9fbdb581a73af65c8de0ed3d38 Mon Sep 17 00:00:00 2001 From: Vaggeilis Yfantis Date: Sun, 15 Feb 2026 12:27:20 +0200 Subject: [PATCH 2/3] feat: Enhance workflow concurrency management with validation and atomicity --- ARCHITECTURE.md | 14 +- packages/docs/docs/workers.mdx | 4 + packages/docs/docs/workflows.mdx | 2 + packages/openworkflow/README.md | 2 + packages/openworkflow/backend-concurrency.ts | 114 +++++++++ packages/openworkflow/backend.testsuite.ts | 208 ++++++++++++++- .../openworkflow/postgres/backend.test.ts | 88 +++++++ packages/openworkflow/postgres/backend.ts | 237 +++++++++++------- packages/openworkflow/sqlite/backend.ts | 134 +++++----- 9 files changed, 621 insertions(+), 182 deletions(-) create mode 100644 packages/openworkflow/backend-concurrency.ts diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index b749c000..3dd81409 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -301,9 +301,12 @@ attempt is persisted individually as a `step_attempt`. Workers are configured with a concurrency limit (e.g., 10). A worker will maintain up to 10 in-flight workflow runs simultaneously. It polls for new work -only when it has available capacity. The Backend's atomic `dequeue` operation -(`FOR UPDATE SKIP LOCKED`) ensures that multiple workers can poll the same table -without race conditions or processing the same run twice. +only when it has available capacity. Claim atomicity is backend-specific: + +- Postgres uses `FOR UPDATE SKIP LOCKED` plus advisory locks for constrained + buckets. +- SQLite uses transaction-level single-writer locking (`BEGIN IMMEDIATE`) to + serialize claim writes. ### 5.3. Workflow-Run Concurrency @@ -327,7 +330,7 @@ defineWorkflow( `key` and `limit` can each be either static values (`string`/`number`) or functions of the validated workflow input. 
They are resolved once when the run -is created and persisted on the `workflow_run`. +is created and persisted on `workflow_runs`. Resolved keys are stored verbatim; only empty/all-whitespace keys are rejected. During claim/dequeue, a run is claimable only when the number of active leased @@ -341,6 +344,9 @@ is: `pending`, `sleeping`, and expired-lease `running` runs do not consume concurrency slots. +For active runs in a bucket (`pending`, `running`), the resolved +`concurrency_limit` is required to be consistent; conflicting limits are +rejected at run creation. ### 5.4. Handling Crashes During Parallel Execution diff --git a/packages/docs/docs/workers.mdx b/packages/docs/docs/workers.mdx index f2f82999..475a62ca 100644 --- a/packages/docs/docs/workers.mdx +++ b/packages/docs/docs/workers.mdx @@ -119,6 +119,8 @@ Only active leased `running` runs consume workflow-concurrency slots. Resolved keys are stored verbatim; only empty/all-whitespace keys are rejected. Sleeping runs are non-consuming until they are claimed again as actively leased `running` runs. +Within active `pending` and actively leased `running` runs for the same +workflow+version+key bucket, the resolved `limit` must remain consistent. 
## Heartbeats and Crash Recovery @@ -166,6 +168,8 @@ Workers coordinate through the database: - Each workflow run is claimed by exactly one worker at a time - Workers use atomic database operations to prevent duplicate processing - If a worker crashes, its workflows become available to other workers +- SQLite relies on transaction-level single-writer locking (`BEGIN IMMEDIATE`) + while Postgres uses row locks plus advisory locks for constrained buckets ## Graceful Shutdown diff --git a/packages/docs/docs/workflows.mdx b/packages/docs/docs/workflows.mdx index 1918f9c6..526695ad 100644 --- a/packages/docs/docs/workflows.mdx +++ b/packages/docs/docs/workflows.mdx @@ -213,6 +213,8 @@ defineWorkflow( - key must resolve to a non-empty string - limit must resolve to a positive integer - resolved keys are stored verbatim; only empty/all-whitespace keys are rejected +- within active runs (`pending`/`running`) for the same + workflow+version+key bucket, `limit` must remain consistent When concurrency is configured, runs in the same bucket are constrained by: diff --git a/packages/openworkflow/README.md b/packages/openworkflow/README.md index 72b8ed45..4bab0e77 100644 --- a/packages/openworkflow/README.md +++ b/packages/openworkflow/README.md @@ -98,6 +98,8 @@ Keys are stored verbatim (for example, `" foo "` and `"foo"` are different concurrency keys); only empty or all-whitespace keys are rejected. Sleeping runs do not consume workflow-concurrency slots until they are claimed again as actively leased `running` runs. +For a given active bucket (`workflow + version + key`), the resolved `limit` +must stay consistent across `pending`/`running` runs. 
## Documentation diff --git a/packages/openworkflow/backend-concurrency.ts b/packages/openworkflow/backend-concurrency.ts new file mode 100644 index 00000000..40e3e5a9 --- /dev/null +++ b/packages/openworkflow/backend-concurrency.ts @@ -0,0 +1,114 @@ +import type { CreateWorkflowRunParams } from "./backend.js"; + +const INVALID_CONCURRENCY_KEY_TYPE_ERROR = + 'Invalid workflow concurrency metadata: "concurrencyKey" must be a string or null.'; +export const INVALID_CONCURRENCY_KEY_VALUE_ERROR = + 'Invalid workflow concurrency metadata: "concurrencyKey" must be a non-empty string when provided.'; +const INVALID_CONCURRENCY_LIMIT_TYPE_ERROR = + 'Invalid workflow concurrency metadata: "concurrencyLimit" must be a number or null.'; +export const INVALID_CONCURRENCY_LIMIT_VALUE_ERROR = + 'Invalid workflow concurrency metadata: "concurrencyLimit" must be a positive integer or null.'; +const INVALID_CONCURRENCY_PAIR_ERROR = + 'Invalid workflow concurrency metadata: "concurrencyKey" and "concurrencyLimit" must both be null or both be set.'; +export const CONCURRENCY_LIMIT_MISMATCH_ERROR = + 'Invalid workflow concurrency metadata: active runs in the same bucket must use the same "concurrencyLimit".'; + +/** + * Bucket identity for workflow-level concurrency. + */ +export interface ConcurrencyBucket { + workflowName: string; + version: string | null; + key: string; + limit: number; +} + +/** + * Normalize and validate workflow concurrency metadata passed to create calls. + * This protects direct backend callers that bypass client-side validation. 
+ * @param params - Workflow run creation params
+ * @returns Params with normalized concurrency fields
+ * @throws {Error} When concurrency metadata has invalid shape or values
+ */
+export function normalizeCreateWorkflowRunParams(
+  params: CreateWorkflowRunParams,
+): CreateWorkflowRunParams {
+  const rawParams = params as unknown as Record<string, unknown>;
+  const rawConcurrencyKey = rawParams["concurrencyKey"];
+  const rawConcurrencyLimit = rawParams["concurrencyLimit"];
+
+  if (rawConcurrencyKey === undefined && rawConcurrencyLimit === undefined) {
+    return {
+      ...params,
+      concurrencyKey: null,
+      concurrencyLimit: null,
+    };
+  }
+
+  if (
+    rawConcurrencyKey !== undefined &&
+    rawConcurrencyKey !== null &&
+    typeof rawConcurrencyKey !== "string"
+  ) {
+    throw new Error(INVALID_CONCURRENCY_KEY_TYPE_ERROR);
+  }
+
+  if (
+    rawConcurrencyLimit !== undefined &&
+    rawConcurrencyLimit !== null &&
+    typeof rawConcurrencyLimit !== "number"
+  ) {
+    throw new Error(INVALID_CONCURRENCY_LIMIT_TYPE_ERROR);
+  }
+
+  const concurrencyKey =
+    rawConcurrencyKey === undefined ? null : rawConcurrencyKey;
+  const concurrencyLimit =
+    rawConcurrencyLimit === undefined ? null : rawConcurrencyLimit;
+
+  if ((concurrencyKey === null) !== (concurrencyLimit === null)) {
+    throw new Error(INVALID_CONCURRENCY_PAIR_ERROR);
+  }
+
+  if (
+    typeof concurrencyKey === "string" &&
+    concurrencyKey.trim().length === 0
+  ) {
+    throw new Error(INVALID_CONCURRENCY_KEY_VALUE_ERROR);
+  }
+
+  if (
+    typeof concurrencyLimit === "number" &&
+    (!Number.isFinite(concurrencyLimit) ||
+      !Number.isInteger(concurrencyLimit) ||
+      concurrencyLimit <= 0)
+  ) {
+    throw new Error(INVALID_CONCURRENCY_LIMIT_VALUE_ERROR);
+  }
+
+  return {
+    ...params,
+    concurrencyKey,
+    concurrencyLimit,
+  };
+}
+
+/**
+ * Return bucket identity for constrained runs, otherwise null.
+ * @param params - Normalized workflow run creation params + * @returns Concurrency bucket or null for unconstrained runs + */ +export function toConcurrencyBucket( + params: CreateWorkflowRunParams, +): ConcurrencyBucket | null { + if (params.concurrencyKey === null || params.concurrencyLimit === null) { + return null; + } + + return { + workflowName: params.workflowName, + version: params.version, + key: params.concurrencyKey, + limit: params.concurrencyLimit, + }; +} diff --git a/packages/openworkflow/backend.testsuite.ts b/packages/openworkflow/backend.testsuite.ts index c9a8f3e1..6d9de56e 100644 --- a/packages/openworkflow/backend.testsuite.ts +++ b/packages/openworkflow/backend.testsuite.ts @@ -1,3 +1,8 @@ +import { + CONCURRENCY_LIMIT_MISMATCH_ERROR, + INVALID_CONCURRENCY_KEY_VALUE_ERROR, + INVALID_CONCURRENCY_LIMIT_VALUE_ERROR, +} from "./backend-concurrency.js"; import { DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS } from "./backend.js"; import type { Backend, CreateWorkflowRunParams } from "./backend.js"; import type { StepAttempt } from "./core/step.js"; @@ -203,6 +208,173 @@ export function testBackend(options: TestBackendOptions): void { ); }); + test("rejects invalid workflow concurrency limit values", async () => { + const base = { + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + concurrencyKey: "tenant:acme", + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }; + + const invalidLimits = [ + 0, + -1, + 1.5, + Number.NaN, + Number.POSITIVE_INFINITY, + ]; + for (const invalidLimit of invalidLimits) { + const invalid = { + ...base, + concurrencyLimit: invalidLimit, + } as unknown as CreateWorkflowRunParams; + await expect( + Promise.resolve().then(() => backend.createWorkflowRun(invalid)), + ).rejects.toThrow(INVALID_CONCURRENCY_LIMIT_VALUE_ERROR); + } + }); + + test("rejects whitespace-only workflow concurrency keys", async () => { + const invalid = { + workflowName: randomUUID(), + version: null, + 
idempotencyKey: null, + concurrencyKey: " ", + concurrencyLimit: 1, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + } as unknown as CreateWorkflowRunParams; + + await expect( + Promise.resolve().then(() => backend.createWorkflowRun(invalid)), + ).rejects.toThrow(INVALID_CONCURRENCY_KEY_VALUE_ERROR); + }); + + test("rejects mixed concurrency limits for the same active bucket", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + const concurrencyKey = "tenant:acme"; + + await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit: 1, + }); + + await expect( + backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey: null, + concurrencyKey, + concurrencyLimit: 2, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }), + ).rejects.toThrow(CONCURRENCY_LIMIT_MISMATCH_ERROR); + + await teardown(backend); + }); + + test("allows changing concurrency limit after terminal runs leave active states", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + const concurrencyKey = "tenant:acme"; + + const first = await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit: 1, + }); + const workerId = randomUUID(); + + const claimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 100, + }); + expect(claimed?.id).toBe(first.id); + + await backend.completeWorkflowRun({ + workflowRunId: first.id, + workerId, + output: null, + }); + + const next = await backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey: null, + concurrencyKey, + concurrencyLimit: 2, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + expect(next.concurrencyLimit).toBe(2); + + await teardown(backend); + }); + + test("allows changing concurrency 
limit when only sleeping runs remain", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + const concurrencyKey = "tenant:acme"; + + const first = await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyKey, + concurrencyLimit: 1, + }); + const workerId = randomUUID(); + + const claimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 100, + }); + expect(claimed?.id).toBe(first.id); + + await backend.sleepWorkflowRun({ + workflowRunId: first.id, + workerId, + availableAt: new Date(Date.now() + 60_000), + }); + + const next = await backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey: null, + concurrencyKey, + concurrencyLimit: 2, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + expect(next.concurrencyLimit).toBe(2); + + await teardown(backend); + }); + test("reuses the same run for matching idempotency key and workflow identity", async () => { const backend = await setup(); const workflowName = randomUUID(); @@ -1107,10 +1279,13 @@ export function testBackend(options: TestBackendOptions): void { workerId, leaseDurationMs: 200, }); - expect(firstClaimed?.id).toBe(firstRun.id); + expect(firstClaimed).not.toBeNull(); + if (!firstClaimed) throw new Error("Expected first claim"); + const secondRunId = + firstClaimed.id === firstRun.id ? 
secondRun.id : firstRun.id; const sleeping = await backend.sleepWorkflowRun({ - workflowRunId: firstRun.id, + workflowRunId: firstClaimed.id, workerId, availableAt: new Date(Date.now() + 200), }); @@ -1120,7 +1295,7 @@ export function testBackend(options: TestBackendOptions): void { workerId: randomUUID(), leaseDurationMs: 200, }); - expect(secondClaimed?.id).toBe(secondRun.id); + expect(secondClaimed?.id).toBe(secondRunId); await teardown(backend); }); @@ -1150,10 +1325,13 @@ export function testBackend(options: TestBackendOptions): void { workerId, leaseDurationMs: 200, }); - expect(firstClaimed?.id).toBe(firstRun.id); + expect(firstClaimed).not.toBeNull(); + if (!firstClaimed) throw new Error("Expected first claim"); + const secondRunId = + firstClaimed.id === firstRun.id ? secondRun.id : firstRun.id; const failed = await backend.failWorkflowRun({ - workflowRunId: firstRun.id, + workflowRunId: firstClaimed.id, workerId, error: { message: "terminal failure" }, retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, @@ -1164,7 +1342,7 @@ export function testBackend(options: TestBackendOptions): void { workerId: randomUUID(), leaseDurationMs: 200, }); - expect(secondClaimed?.id).toBe(secondRun.id); + expect(secondClaimed?.id).toBe(secondRunId); await teardown(backend); }); @@ -1194,10 +1372,13 @@ export function testBackend(options: TestBackendOptions): void { workerId, leaseDurationMs: 200, }); - expect(firstClaimed?.id).toBe(firstRun.id); + expect(firstClaimed).not.toBeNull(); + if (!firstClaimed) throw new Error("Expected first claim"); + const secondRunId = + firstClaimed.id === firstRun.id ? 
secondRun.id : firstRun.id; const failed = await backend.failWorkflowRun({ - workflowRunId: firstRun.id, + workflowRunId: firstClaimed.id, workerId, error: { message: "retry me" }, retryPolicy: SHORT_WORKFLOW_RETRY_POLICY, @@ -1209,7 +1390,7 @@ export function testBackend(options: TestBackendOptions): void { workerId: randomUUID(), leaseDurationMs: 200, }); - expect(secondClaimed?.id).toBe(secondRun.id); + expect(secondClaimed?.id).toBe(secondRunId); await teardown(backend); }); @@ -1239,10 +1420,13 @@ export function testBackend(options: TestBackendOptions): void { workerId: firstWorkerId, leaseDurationMs: 200, }); - expect(firstClaimed?.id).toBe(firstRun.id); + expect(firstClaimed).not.toBeNull(); + if (!firstClaimed) throw new Error("Expected first claim"); + const secondRunId = + firstClaimed.id === firstRun.id ? secondRun.id : firstRun.id; const failed = await backend.failWorkflowRun({ - workflowRunId: firstRun.id, + workflowRunId: firstClaimed.id, workerId: firstWorkerId, error: { message: "retry later" }, retryPolicy: SHORT_WORKFLOW_RETRY_POLICY, @@ -1253,7 +1437,7 @@ export function testBackend(options: TestBackendOptions): void { workerId: randomUUID(), leaseDurationMs: 200, }); - expect(secondClaimed?.id).toBe(secondRun.id); + expect(secondClaimed?.id).toBe(secondRunId); await sleep(60); // allow first retry to become due while second is leased diff --git a/packages/openworkflow/postgres/backend.test.ts b/packages/openworkflow/postgres/backend.test.ts index dd99736b..c8e95c38 100644 --- a/packages/openworkflow/postgres/backend.test.ts +++ b/packages/openworkflow/postgres/backend.test.ts @@ -170,3 +170,91 @@ describe("BackendPostgres schema option", () => { } }); }); + +describe("BackendPostgres concurrency claim atomicity", () => { + test("serializes same-bucket concurrent claims with advisory bucket locks", async () => { + const namespaceId = randomUUID(); + const workflowName = "advisory-claim-atomicity"; + const version = "v1"; + const concurrencyKey = 
"tenant:acme"; + const concurrencyLimit = 1; + const rounds = 5; + const claimers = 6; + + const backends = await Promise.all( + Array.from({ length: claimers }, async () => { + return await BackendPostgres.connect(DEFAULT_POSTGRES_URL, { + namespaceId, + }); + }), + ); + const primaryBackend = backends[0]; + if (!primaryBackend) { + throw new Error("Expected at least one backend instance"); + } + const inspector = newPostgresMaxOne(DEFAULT_POSTGRES_URL); + + try { + for (let i = 0; i < rounds; i += 1) { + await primaryBackend.createWorkflowRun({ + workflowName, + version, + idempotencyKey: null, + concurrencyKey, + concurrencyLimit, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + } + + for (let round = 0; round < rounds; round += 1) { + const claims = await Promise.all( + backends.map(async (backend, i) => { + return await backend.claimWorkflowRun({ + workerId: `worker-${String(round)}-${String(i)}-${randomUUID()}`, + leaseDurationMs: 5000, + }); + }), + ); + const claimed = claims.filter((run): run is NonNullable => { + return run !== null; + }); + expect(claimed).toHaveLength(1); + + const workflowRunsTable = inspector`${inspector("openworkflow")}.${inspector("workflow_runs")}`; + const [activeCount] = await inspector<{ count: number }[]>` + SELECT COUNT(*)::INT AS "count" + FROM ${workflowRunsTable} + WHERE "namespace_id" = ${namespaceId} + AND "workflow_name" = ${workflowName} + AND "version" IS NOT DISTINCT FROM ${version} + AND "concurrency_key" = ${concurrencyKey} + AND "status" = 'running' + AND "available_at" > NOW() + `; + expect(activeCount?.count).toBe(1); + + const claimedRun = claimed[0]; + if (!claimedRun?.workerId) { + throw new Error("Expected claimed workflow run to include worker id"); + } + + await primaryBackend.completeWorkflowRun({ + workflowRunId: claimedRun.id, + workerId: claimedRun.workerId, + output: null, + }); + } + } finally { + await Promise.all( + backends.map(async (backend) => { + 
await backend.stop(); + }), + ); + await inspector.end(); + } + }); +}); diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index da0d4449..e86dfeec 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -1,3 +1,8 @@ +import { + CONCURRENCY_LIMIT_MISMATCH_ERROR, + normalizeCreateWorkflowRunParams, + toConcurrencyBucket, +} from "../backend-concurrency.js"; import { DEFAULT_NAMESPACE_ID, DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS, @@ -101,36 +106,44 @@ export class BackendPostgres implements Backend { params: CreateWorkflowRunParams, ): Promise { const normalizedParams = normalizeCreateWorkflowRunParams(params); + const concurrencyBucket = toConcurrencyBucket(normalizedParams); - if (normalizedParams.idempotencyKey === null) { + if ( + normalizedParams.idempotencyKey === null && + concurrencyBucket === null + ) { return await this.insertWorkflowRun(this.pg, normalizedParams); } - const { workflowName, idempotencyKey } = normalizedParams; - const lockScope = JSON.stringify({ - namespaceId: this.namespaceId, - workflowName, - idempotencyKey, - }); - return await this.pg.begin(async (_tx) => { const pgTx = _tx as unknown as Postgres; - /* eslint-disable @cspell/spellchecker */ - await pgTx.unsafe( - "SELECT pg_advisory_xact_lock(hashtextextended($1, 0::bigint))", - [lockScope], - ); - /* eslint-enable @cspell/spellchecker */ + if (normalizedParams.idempotencyKey !== null) { + // Acquire/check idempotency first so duplicate create requests can + // return early without taking a bucket lock. 
+ await this.acquireIdempotencyCreateLock( + pgTx, + normalizedParams.workflowName, + normalizedParams.idempotencyKey, + ); - const existing = await this.getWorkflowRunByIdempotencyKey( - pgTx, - workflowName, - idempotencyKey, - new Date(Date.now() - DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS), - ); - if (existing) { - return existing; + const existing = await this.getWorkflowRunByIdempotencyKey( + pgTx, + normalizedParams.workflowName, + normalizedParams.idempotencyKey, + new Date(Date.now() - DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS), + ); + if (existing) { + return existing; + } + } + + if (concurrencyBucket) { + await this.acquireConcurrencyCreateLock(pgTx, concurrencyBucket); + await this.assertNoActiveBucketConcurrencyLimitMismatch( + pgTx, + concurrencyBucket, + ); } return await this.insertWorkflowRun(pgTx, normalizedParams); @@ -210,6 +223,74 @@ export class BackendPostgres implements Backend { return workflowRun ?? null; } + private async acquireConcurrencyCreateLock( + pg: Postgres, + params: { workflowName: string; version: string | null; key: string }, + ): Promise { + // Intentionally uses a different lock payload shape than claim-time locks. + // Create-time lock serializes concurrent creates in the bucket, while + // claim-time lock serializes concurrent claim gate evaluation. + const lockScope = JSON.stringify({ + namespaceId: this.namespaceId, + workflowName: params.workflowName, + version: params.version, + concurrencyKey: params.key, + }); + + // Hash collisions are extremely unlikely; if they happen, unrelated + // buckets may serialize, but correctness is preserved. 
+ await pg.unsafe( + "SELECT pg_advisory_xact_lock(hashtextextended($1, 0::bigint))", + [lockScope], + ); + } + + private async acquireIdempotencyCreateLock( + pg: Postgres, + workflowName: string, + idempotencyKey: string, + ): Promise { + const lockScope = JSON.stringify({ + namespaceId: this.namespaceId, + workflowName, + idempotencyKey, + }); + + await pg.unsafe( + "SELECT pg_advisory_xact_lock(hashtextextended($1, 0::bigint))", + [lockScope], + ); + } + + private async assertNoActiveBucketConcurrencyLimitMismatch( + pg: Postgres, + params: { + workflowName: string; + version: string | null; + key: string; + limit: number; + }, + ): Promise { + const workflowRunsTable = this.workflowRunsTable(pg); + const [conflict] = await pg<{ id: string }[]>` + SELECT "id" + FROM ${workflowRunsTable} + WHERE "namespace_id" = ${this.namespaceId} + AND "workflow_name" = ${params.workflowName} + AND "version" IS NOT DISTINCT FROM ${params.version} + AND "concurrency_key" = ${params.key} + -- Sleeping runs are excluded so long sleeps do not pin historical + -- limits and block new run creation after config changes. + AND "status" IN ('pending', 'running') + AND "concurrency_limit" IS DISTINCT FROM ${params.limit} + LIMIT 1 + `; + + if (conflict) { + throw new Error(CONCURRENCY_LIMIT_MISMATCH_ERROR); + } + } + async getWorkflowRun( params: GetWorkflowRunParams, ): Promise { @@ -316,22 +397,43 @@ export class BackendPostgres implements Backend { AND ( wr."concurrency_key" IS NULL OR wr."concurrency_limit" IS NULL - OR ( - -- TODO: If claim latency becomes a hot spot, replace this - -- correlated count with precomputed bucket counts via a CTE. 
- SELECT COUNT(*) - FROM ${workflowRunsTable} AS active - WHERE active."namespace_id" = wr."namespace_id" - AND active."workflow_name" = wr."workflow_name" - AND active."version" IS NOT DISTINCT FROM wr."version" - AND active."concurrency_key" = wr."concurrency_key" - AND active."status" = 'running' - -- Candidates require available_at <= NOW(); active leased runs - -- require available_at > NOW(). Keep explicit self-exclusion - -- for readability/safety. - AND active."id" <> wr."id" - AND active."available_at" > NOW() - ) < wr."concurrency_limit" + OR CASE + -- cspell:ignore xact hashtextextended + -- Serialize constrained claims per bucket. pg_try_advisory lock + -- intentionally skips busy buckets (non-blocking) to avoid + -- over-claim races without stalling claim loops. This lock key + -- shape intentionally differs from create-time locking because + -- claims and creates are serialized within their own hot paths. + -- Hash collisions can over-serialize unrelated buckets + -- (throughput impact only). + WHEN pg_try_advisory_xact_lock( + hashtextextended( + json_build_array( + wr."namespace_id", + wr."workflow_name", + wr."version", + wr."concurrency_key" + )::text, + 0::bigint + ) + ) THEN ( + -- TODO: If claim latency becomes a hot spot, replace this + -- correlated count with precomputed bucket counts via a CTE. + SELECT COUNT(*) + FROM ${workflowRunsTable} AS active + WHERE active."namespace_id" = wr."namespace_id" + AND active."workflow_name" = wr."workflow_name" + AND active."version" IS NOT DISTINCT FROM wr."version" + AND active."concurrency_key" = wr."concurrency_key" + AND active."status" = 'running' + -- Candidates require available_at <= NOW(); active leased runs + -- require available_at > NOW(). Keep explicit self-exclusion + -- for readability/safety. 
+ AND active."id" <> wr."id" + AND active."available_at" > NOW() + ) < wr."concurrency_limit" + ELSE FALSE + END ) ORDER BY CASE WHEN wr."status" = 'pending' THEN 0 ELSE 1 END, @@ -819,62 +921,3 @@ function decodeCursor(cursor: string): Cursor { id: parsed.id, }; } - -/** - * Normalize and validate workflow concurrency metadata passed to create calls. - * @param params - Workflow run creation params - * @returns Params with normalized concurrency fields - * @throws {Error} When concurrency metadata has invalid shape or types - */ -function normalizeCreateWorkflowRunParams( - params: CreateWorkflowRunParams, -): CreateWorkflowRunParams { - const rawParams = params as unknown as Record; - const rawConcurrencyKey = rawParams["concurrencyKey"]; - const rawConcurrencyLimit = rawParams["concurrencyLimit"]; - - if (rawConcurrencyKey === undefined && rawConcurrencyLimit === undefined) { - return { - ...params, - concurrencyKey: null, - concurrencyLimit: null, - }; - } - - if ( - rawConcurrencyKey !== undefined && - rawConcurrencyKey !== null && - typeof rawConcurrencyKey !== "string" - ) { - throw new Error( - 'Invalid workflow concurrency metadata: "concurrencyKey" must be a string or null.', - ); - } - - if ( - rawConcurrencyLimit !== undefined && - rawConcurrencyLimit !== null && - typeof rawConcurrencyLimit !== "number" - ) { - throw new Error( - 'Invalid workflow concurrency metadata: "concurrencyLimit" must be a number or null.', - ); - } - - const concurrencyKey = - rawConcurrencyKey === undefined ? null : rawConcurrencyKey; - const concurrencyLimit = - rawConcurrencyLimit === undefined ? 
null : rawConcurrencyLimit; - - if ((concurrencyKey === null) !== (concurrencyLimit === null)) { - throw new Error( - 'Invalid workflow concurrency metadata: "concurrencyKey" and "concurrencyLimit" must both be null or both be set.', - ); - } - - return { - ...params, - concurrencyKey, - concurrencyLimit, - }; -} diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts index f35ab223..e821dd26 100644 --- a/packages/openworkflow/sqlite/backend.ts +++ b/packages/openworkflow/sqlite/backend.ts @@ -1,3 +1,8 @@ +import { + CONCURRENCY_LIMIT_MISMATCH_ERROR, + normalizeCreateWorkflowRunParams, + toConcurrencyBucket, +} from "../backend-concurrency.js"; import { DEFAULT_NAMESPACE_ID, DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS, @@ -95,23 +100,33 @@ export class BackendSqlite implements Backend { createWorkflowRun(params: CreateWorkflowRunParams): Promise { const normalizedParams = normalizeCreateWorkflowRunParams(params); + const concurrencyBucket = toConcurrencyBucket(normalizedParams); const { workflowName, idempotencyKey } = normalizedParams; - if (idempotencyKey === null) { + if (idempotencyKey === null && concurrencyBucket === null) { return Promise.resolve(this.insertWorkflowRun(normalizedParams)); } try { + // BEGIN IMMEDIATE takes a RESERVED write lock up-front. This serializes + // create checks/writes across writers, which is the SQLite consistency + // model we rely on (instead of row/advisory locks). 
this.db.exec("BEGIN IMMEDIATE"); - const existing = this.getWorkflowRunByIdempotencyKey( - workflowName, - idempotencyKey, - new Date(Date.now() - DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS), - ); - if (existing) { - this.db.exec("COMMIT"); - return Promise.resolve(existing); + if (idempotencyKey !== null) { + const existing = this.getWorkflowRunByIdempotencyKey( + workflowName, + idempotencyKey, + new Date(Date.now() - DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS), + ); + if (existing) { + this.db.exec("COMMIT"); + return Promise.resolve(existing); + } + } + + if (concurrencyBucket) { + this.assertNoActiveBucketConcurrencyLimitMismatch(concurrencyBucket); } const workflowRun = this.insertWorkflowRun(normalizedParams); @@ -216,6 +231,45 @@ export class BackendSqlite implements Backend { return row ? rowToWorkflowRun(row) : null; } + private assertNoActiveBucketConcurrencyLimitMismatch(params: { + workflowName: string; + version: string | null; + key: string; + limit: number; + }): void { + const stmt = this.db.prepare(` + SELECT "id" + FROM "workflow_runs" + WHERE "namespace_id" = ? + AND "workflow_name" = ? + AND ( + "version" = ? + OR ("version" IS NULL AND ? IS NULL) + ) + AND "concurrency_key" = ? + -- Sleeping runs are excluded so long sleeps do not pin historical + -- limits and block new run creation after config changes. + AND "status" IN ('pending', 'running') + -- "params.limit" is always non-null because this is called only when + -- toConcurrencyBucket() returns a constrained bucket. + AND ("concurrency_limit" <> ? 
OR "concurrency_limit" IS NULL) + LIMIT 1 + `); + + const conflict = stmt.get( + this.namespaceId, + params.workflowName, + params.version, + params.version, + params.key, + params.limit, + ) as { id: string } | undefined; + + if (conflict) { + throw new Error(CONCURRENCY_LIMIT_MISMATCH_ERROR); + } + } + getWorkflowRun(params: GetWorkflowRunParams): Promise { const stmt = this.db.prepare(` SELECT * @@ -237,7 +291,8 @@ export class BackendSqlite implements Backend { const currentTime = now(); const newAvailableAt = addMilliseconds(currentTime, params.leaseDurationMs); - // SQLite doesn't have SKIP LOCKED, so we need to handle claims differently + // SQLite doesn't have SKIP LOCKED. BEGIN IMMEDIATE serializes writers with + // the database's single-writer lock, which is the intended claim model. this.db.exec("BEGIN IMMEDIATE"); try { @@ -1124,62 +1179,3 @@ function decodeCursor(cursor: string): Cursor { id: parsed.id, }; } - -/** - * Normalize and validate workflow concurrency metadata passed to create calls. 
- * @param params - Workflow run creation params - * @returns Params with normalized concurrency fields - * @throws {Error} When concurrency metadata has invalid shape or types - */ -function normalizeCreateWorkflowRunParams( - params: CreateWorkflowRunParams, -): CreateWorkflowRunParams { - const rawParams = params as unknown as Record; - const rawConcurrencyKey = rawParams["concurrencyKey"]; - const rawConcurrencyLimit = rawParams["concurrencyLimit"]; - - if (rawConcurrencyKey === undefined && rawConcurrencyLimit === undefined) { - return { - ...params, - concurrencyKey: null, - concurrencyLimit: null, - }; - } - - if ( - rawConcurrencyKey !== undefined && - rawConcurrencyKey !== null && - typeof rawConcurrencyKey !== "string" - ) { - throw new Error( - 'Invalid workflow concurrency metadata: "concurrencyKey" must be a string or null.', - ); - } - - if ( - rawConcurrencyLimit !== undefined && - rawConcurrencyLimit !== null && - typeof rawConcurrencyLimit !== "number" - ) { - throw new Error( - 'Invalid workflow concurrency metadata: "concurrencyLimit" must be a number or null.', - ); - } - - const concurrencyKey = - rawConcurrencyKey === undefined ? null : rawConcurrencyKey; - const concurrencyLimit = - rawConcurrencyLimit === undefined ? 
null : rawConcurrencyLimit; - - if ((concurrencyKey === null) !== (concurrencyLimit === null)) { - throw new Error( - 'Invalid workflow concurrency metadata: "concurrencyKey" and "concurrencyLimit" must both be null or both be set.', - ); - } - - return { - ...params, - concurrencyKey, - concurrencyLimit, - }; -} From 057d0a44e99766d5804732168ddb377b36e41e4e Mon Sep 17 00:00:00 2001 From: Vaggeilis Yfantis Date: Sun, 15 Feb 2026 14:48:05 +0200 Subject: [PATCH 3/3] feat: Update workflow concurrency to support optional keys and default bucket usage --- ARCHITECTURE.md | 8 +- packages/docs/docs/workers.mdx | 4 +- packages/docs/docs/workflows.mdx | 8 +- packages/openworkflow/README.md | 10 +- packages/openworkflow/backend-concurrency.ts | 8 +- packages/openworkflow/backend.testsuite.ts | 119 ++++++++++++++++-- packages/openworkflow/client.test.ts | 46 +++++++ packages/openworkflow/client.ts | 42 ++++--- packages/openworkflow/postgres/backend.ts | 15 ++- .../openworkflow/postgres/postgres.test.ts | 8 +- packages/openworkflow/postgres/postgres.ts | 3 +- packages/openworkflow/sqlite/backend.ts | 19 ++- packages/openworkflow/sqlite/sqlite.test.ts | 11 ++ packages/openworkflow/sqlite/sqlite.ts | 3 +- packages/openworkflow/workflow.ts | 5 +- 15 files changed, 248 insertions(+), 61 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 3dd81409..774e4aee 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -329,9 +329,11 @@ defineWorkflow( ``` `key` and `limit` can each be either static values (`string`/`number`) or -functions of the validated workflow input. They are resolved once when the run -is created and persisted on `workflow_runs`. +functions of the validated workflow input, and `key` is optional. They are +resolved once when the run is created and persisted on `workflow_runs`. Resolved keys are stored verbatim; only empty/all-whitespace keys are rejected. 
+When `key` is omitted, the run uses the default bucket for +`namespace_id + workflow_name + version`. During claim/dequeue, a run is claimable only when the number of active leased `running` runs in the same bucket is below the run's `limit`. The bucket scope @@ -340,7 +342,7 @@ is: - `namespace_id` - `workflow_name` - `version` (version-aware buckets) -- `concurrency_key` +- `concurrency_key` (nullable for the default bucket) `pending`, `sleeping`, and expired-lease `running` runs do not consume concurrency slots. diff --git a/packages/docs/docs/workers.mdx b/packages/docs/docs/workers.mdx index 475a62ca..e0df82c8 100644 --- a/packages/docs/docs/workers.mdx +++ b/packages/docs/docs/workers.mdx @@ -98,7 +98,7 @@ defineWorkflow( { name: "process-order", concurrency: { - key: ({ input }) => `tenant:${input.tenantId}`, // or: "tenant:acme" + key: ({ input }) => `tenant:${input.tenantId}`, // optional limit: ({ input }) => input.maxConcurrentOrders, // or: 5 }, }, @@ -113,7 +113,7 @@ Workers will only claim a run when the bucket has capacity. Bucket scope is: - namespace - workflow name - workflow version -- resolved concurrency key +- resolved concurrency key (or default bucket when key is omitted) Only active leased `running` runs consume workflow-concurrency slots. Resolved keys are stored verbatim; only empty/all-whitespace keys are rejected. 
diff --git a/packages/docs/docs/workflows.mdx b/packages/docs/docs/workflows.mdx index 526695ad..b9fdf4f3 100644 --- a/packages/docs/docs/workflows.mdx +++ b/packages/docs/docs/workflows.mdx @@ -198,7 +198,7 @@ defineWorkflow( { name: "process-order", concurrency: { - key: ({ input }) => `tenant:${input.tenantId}`, // or: "tenant:acme" + key: ({ input }) => `tenant:${input.tenantId}`, // optional limit: ({ input }) => input.maxConcurrentOrders, // or: 5 }, }, @@ -208,9 +208,9 @@ defineWorkflow( ); ``` -- `key` can be a string or a function `({ input }) => string` +- `key` is optional; when set it can be a string or `({ input }) => string` - `limit` can be a number or a function `({ input }) => number` -- key must resolve to a non-empty string +- if provided, key must resolve to a non-empty string - limit must resolve to a positive integer - resolved keys are stored verbatim; only empty/all-whitespace keys are rejected - within active runs (`pending`/`running`) for the same @@ -221,7 +221,7 @@ When concurrency is configured, runs in the same bucket are constrained by: - namespace - workflow name - workflow version -- resolved `key` +- resolved `key` (or the default bucket when key is omitted) Only actively leased `running` runs consume slots. `pending`, `sleeping`, and expired-lease runs do not. diff --git a/packages/openworkflow/README.md b/packages/openworkflow/README.md index 4bab0e77..34b085a1 100644 --- a/packages/openworkflow/README.md +++ b/packages/openworkflow/README.md @@ -67,7 +67,7 @@ For more details, check out our [docs](https://openworkflow.dev/docs). 
- ✅ **Long pauses** - Sleep for seconds or months - ✅ **Scheduled runs** - Start workflows at a specific time - ✅ **Parallel execution** - Run steps concurrently -- ✅ **Workflow concurrency** - Limit active runs by key (static or input-based) +- ✅ **Workflow concurrency** - Limit active runs by bucket (optional key) - ✅ **Idempotency keys** - Deduplicate repeated run requests (24h window) - ✅ **No extra servers** - Uses your existing database - ✅ **Dashboard included** - Monitor and debug workflows @@ -82,7 +82,7 @@ const workflow = defineWorkflow( { name: "process-order", concurrency: { - key: ({ input }) => `tenant:${input.tenantId}`, // or: "tenant:acme" + key: ({ input }) => `tenant:${input.tenantId}`, // optional limit: ({ input }) => input.maxConcurrentOrders, // or: 5 }, }, @@ -92,8 +92,10 @@ const workflow = defineWorkflow( ); ``` -`key` must resolve to a non-empty string and `limit` must resolve to a positive -integer. Invalid values fail run creation. +`limit` must resolve to a positive integer. If `key` is provided, it must +resolve to a non-empty string. Invalid values fail run creation. +When `key` is omitted, runs use the default bucket for +`namespace + workflow + version`. Keys are stored verbatim (for example, `" foo "` and `"foo"` are different concurrency keys); only empty or all-whitespace keys are rejected. 
Sleeping runs do not consume workflow-concurrency slots until they are claimed diff --git a/packages/openworkflow/backend-concurrency.ts b/packages/openworkflow/backend-concurrency.ts index 40e3e5a9..114969ef 100644 --- a/packages/openworkflow/backend-concurrency.ts +++ b/packages/openworkflow/backend-concurrency.ts @@ -9,7 +9,7 @@ const INVALID_CONCURRENCY_LIMIT_TYPE_ERROR = export const INVALID_CONCURRENCY_LIMIT_VALUE_ERROR = 'Invalid workflow concurrency metadata: "concurrencyLimit" must be a positive integer or null.'; const INVALID_CONCURRENCY_PAIR_ERROR = - 'Invalid workflow concurrency metadata: "concurrencyKey" and "concurrencyLimit" must both be null or both be set.'; + 'Invalid workflow concurrency metadata: "concurrencyLimit" must be set when "concurrencyKey" is provided.'; export const CONCURRENCY_LIMIT_MISMATCH_ERROR = 'Invalid workflow concurrency metadata: active runs in the same bucket must use the same "concurrencyLimit".'; @@ -19,7 +19,7 @@ export const CONCURRENCY_LIMIT_MISMATCH_ERROR = export interface ConcurrencyBucket { workflowName: string; version: string | null; - key: string; + key: string | null; limit: number; } @@ -66,7 +66,7 @@ export function normalizeCreateWorkflowRunParams( const concurrencyLimit = rawConcurrencyLimit === undefined ? 
null : rawConcurrencyLimit; - if ((concurrencyKey === null) !== (concurrencyLimit === null)) { + if (concurrencyKey !== null && concurrencyLimit === null) { throw new Error(INVALID_CONCURRENCY_PAIR_ERROR); } @@ -101,7 +101,7 @@ export function normalizeCreateWorkflowRunParams( export function toConcurrencyBucket( params: CreateWorkflowRunParams, ): ConcurrencyBucket | null { - if (params.concurrencyKey === null || params.concurrencyLimit === null) { + if (params.concurrencyLimit === null) { return null; } diff --git a/packages/openworkflow/backend.testsuite.ts b/packages/openworkflow/backend.testsuite.ts index 6d9de56e..c9b52d3d 100644 --- a/packages/openworkflow/backend.testsuite.ts +++ b/packages/openworkflow/backend.testsuite.ts @@ -173,7 +173,7 @@ export function testBackend(options: TestBackendOptions): void { expect(created.concurrencyLimit).toBeNull(); }); - test("rejects mismatched workflow concurrency metadata pairs", async () => { + test("rejects key-only workflow concurrency metadata", async () => { const base = { workflowName: randomUUID(), version: null, @@ -193,19 +193,30 @@ export function testBackend(options: TestBackendOptions): void { await expect( Promise.resolve().then(() => backend.createWorkflowRun(keyOnly)), ).rejects.toThrow( - 'Invalid workflow concurrency metadata: "concurrencyKey" and "concurrencyLimit" must both be null or both be set.', + 'Invalid workflow concurrency metadata: "concurrencyLimit" must be set when "concurrencyKey" is provided.', ); + }); + + test("accepts limit-only workflow concurrency metadata as default bucket", async () => { + const base = { + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }; const limitOnly = { ...base, concurrencyKey: null, concurrencyLimit: 1, }; - await expect( - Promise.resolve().then(() => backend.createWorkflowRun(limitOnly)), - ).rejects.toThrow( - 'Invalid workflow concurrency 
metadata: "concurrencyKey" and "concurrencyLimit" must both be null or both be set.', - ); + const created = await backend.createWorkflowRun(limitOnly); + expect(created.concurrencyKey).toBeNull(); + expect(created.concurrencyLimit).toBe(1); }); test("rejects invalid workflow concurrency limit values", async () => { @@ -289,6 +300,35 @@ export function testBackend(options: TestBackendOptions): void { await teardown(backend); }); + test("rejects mixed concurrency limits for the same active default bucket", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + + await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyLimit: 1, + }); + + await expect( + backend.createWorkflowRun({ + workflowName, + version, + idempotencyKey: null, + concurrencyKey: null, + concurrencyLimit: 2, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }), + ).rejects.toThrow(CONCURRENCY_LIMIT_MISMATCH_ERROR); + + await teardown(backend); + }); + test("allows changing concurrency limit after terminal runs leave active states", async () => { const backend = await setup(); const workflowName = randomUUID(); @@ -1034,6 +1074,40 @@ export function testBackend(options: TestBackendOptions): void { await teardown(backend); }); + test("enforces concurrency limit for default workflow-version bucket when key is omitted", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + const version = "v1"; + const concurrencyLimit = 1; + + await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyLimit, + }); + await createPendingWorkflowRun(backend, { + workflowName, + version, + concurrencyLimit, + }); + + const firstClaimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + expect(firstClaimed).not.toBeNull(); + expect(firstClaimed?.concurrencyKey).toBeNull(); + 
expect(firstClaimed?.concurrencyLimit).toBe(concurrencyLimit); + + const blocked = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + expect(blocked).toBeNull(); + + await teardown(backend); + }); + test("supports limits greater than one", async () => { const backend = await setup(); const workflowName = randomUUID(); @@ -1210,6 +1284,37 @@ export function testBackend(options: TestBackendOptions): void { await teardown(backend); }); + test("allows claims for different versions in default bucket when key is omitted", async () => { + const backend = await setup(); + const workflowName = randomUUID(); + + await createPendingWorkflowRun(backend, { + workflowName, + version: "v1", + concurrencyLimit: 1, + }); + await createPendingWorkflowRun(backend, { + workflowName, + version: "v2", + concurrencyLimit: 1, + }); + + const firstClaimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + const secondClaimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + + expect(firstClaimed).not.toBeNull(); + expect(secondClaimed).not.toBeNull(); + expect(secondClaimed?.id).not.toBe(firstClaimed?.id); + + await teardown(backend); + }); + test("allows claims after the active lease expires", async () => { const backend = await setup(); const workflowName = randomUUID(); diff --git a/packages/openworkflow/client.test.ts b/packages/openworkflow/client.test.ts index bd0321f1..76cfc490 100644 --- a/packages/openworkflow/client.test.ts +++ b/packages/openworkflow/client.test.ts @@ -354,6 +354,25 @@ describe("OpenWorkflow", () => { expect(handle.workflowRun.concurrencyLimit).toBe(3); }); + test("resolves literal workflow concurrency limit without key", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { + name: "concurrency-literal-limit-only-test", + concurrency: { + 
limit: 3, + }, + }, + noopFn, + ); + const handle = await workflow.run({ value: 1 }); + + expect(handle.workflowRun.concurrencyKey).toBeNull(); + expect(handle.workflowRun.concurrencyLimit).toBe(3); + }); + test("resolves function workflow concurrency values from parsed input", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); @@ -384,6 +403,33 @@ describe("OpenWorkflow", () => { expect(handle.workflowRun.concurrencyLimit).toBe(4); }); + test("resolves function workflow concurrency limit from parsed input without key", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const schema = z.object({ + limit: z.coerce.number().int().positive(), + }); + + const workflow = client.defineWorkflow( + { + name: "concurrency-function-limit-only-test", + schema, + concurrency: { + limit: ({ input }) => Number(input.limit), + }, + }, + noopFn, + ); + + const handle = await workflow.run({ + limit: "4", + }); + + expect(handle.workflowRun.concurrencyKey).toBeNull(); + expect(handle.workflowRun.concurrencyLimit).toBe(4); + }); + test("throws when resolved workflow concurrency key is invalid", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); diff --git a/packages/openworkflow/client.ts b/packages/openworkflow/client.ts index 2b04c02a..5f39069a 100644 --- a/packages/openworkflow/client.ts +++ b/packages/openworkflow/client.ts @@ -270,25 +270,6 @@ function resolveWorkflowConcurrency( }; } - let keyValue: unknown; - try { - keyValue = - typeof concurrency.key === "function" - ? 
concurrency.key({ input }) - : concurrency.key; - } catch (error) { - throw new Error( - `Failed to resolve concurrency key for workflow "${workflowName}"`, - { cause: error }, - ); - } - - if (typeof keyValue !== "string" || keyValue.trim().length === 0) { - throw new Error( - `Invalid concurrency key for workflow "${workflowName}": expected a non-empty string`, - ); - } - let limitValue: unknown; try { limitValue = @@ -312,6 +293,29 @@ function resolveWorkflowConcurrency( ); } + let keyValue: string | null = null; + if (concurrency.key !== undefined) { + let resolvedKey: unknown; + try { + resolvedKey = + typeof concurrency.key === "function" + ? concurrency.key({ input }) + : concurrency.key; + } catch (error) { + throw new Error( + `Failed to resolve concurrency key for workflow "${workflowName}"`, + { cause: error }, + ); + } + + if (typeof resolvedKey !== "string" || resolvedKey.trim().length === 0) { + throw new Error( + `Invalid concurrency key for workflow "${workflowName}": expected a non-empty string`, + ); + } + keyValue = resolvedKey; + } + return { concurrencyKey: keyValue, concurrencyLimit: limitValue, diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index e86dfeec..c9a368e8 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -225,7 +225,11 @@ export class BackendPostgres implements Backend { private async acquireConcurrencyCreateLock( pg: Postgres, - params: { workflowName: string; version: string | null; key: string }, + params: { + workflowName: string; + version: string | null; + key: string | null; + }, ): Promise { // Intentionally uses a different lock payload shape than claim-time locks. 
// Create-time lock serializes concurrent creates in the bucket, while @@ -267,7 +271,7 @@ export class BackendPostgres implements Backend { params: { workflowName: string; version: string | null; - key: string; + key: string | null; limit: number; }, ): Promise { @@ -278,7 +282,7 @@ export class BackendPostgres implements Backend { WHERE "namespace_id" = ${this.namespaceId} AND "workflow_name" = ${params.workflowName} AND "version" IS NOT DISTINCT FROM ${params.version} - AND "concurrency_key" = ${params.key} + AND "concurrency_key" IS NOT DISTINCT FROM ${params.key} -- Sleeping runs are excluded so long sleeps do not pin historical -- limits and block new run creation after config changes. AND "status" IN ('pending', 'running') @@ -395,8 +399,7 @@ export class BackendPostgres implements Backend { AND wr."available_at" <= NOW() AND (wr."deadline_at" IS NULL OR wr."deadline_at" > NOW()) AND ( - wr."concurrency_key" IS NULL - OR wr."concurrency_limit" IS NULL + wr."concurrency_limit" IS NULL OR CASE -- cspell:ignore xact hashtextextended -- Serialize constrained claims per bucket. pg_try_advisory lock @@ -424,7 +427,7 @@ export class BackendPostgres implements Backend { WHERE active."namespace_id" = wr."namespace_id" AND active."workflow_name" = wr."workflow_name" AND active."version" IS NOT DISTINCT FROM wr."version" - AND active."concurrency_key" = wr."concurrency_key" + AND active."concurrency_key" IS NOT DISTINCT FROM wr."concurrency_key" AND active."status" = 'running' -- Candidates require available_at <= NOW(); active leased runs -- require available_at > NOW(). 
Keep explicit self-exclusion diff --git a/packages/openworkflow/postgres/postgres.test.ts b/packages/openworkflow/postgres/postgres.test.ts index 141d7078..b5fad2d1 100644 --- a/packages/openworkflow/postgres/postgres.test.ts +++ b/packages/openworkflow/postgres/postgres.test.ts @@ -83,9 +83,12 @@ describe("postgres", () => { const indexes = await pg< { indexName: string; + indexDef: string; }[] >` - SELECT indexname AS "indexName" + SELECT + indexname AS "indexName", + indexdef AS "indexDef" FROM pg_indexes WHERE schemaname = ${schema} AND tablename = 'workflow_runs' @@ -93,6 +96,9 @@ describe("postgres", () => { `; /* cspell:enable */ expect(indexes).toHaveLength(1); + expect(indexes[0]?.indexDef).toMatch( + /WHERE\s+\((?:"concurrency_limit"|concurrency_limit)\s+IS\s+NOT\s+NULL\)/i, + ); } finally { await dropSchema(pg, schema); } diff --git a/packages/openworkflow/postgres/postgres.ts b/packages/openworkflow/postgres/postgres.ts index c224716b..824562c9 100644 --- a/packages/openworkflow/postgres/postgres.ts +++ b/packages/openworkflow/postgres/postgres.ts @@ -218,8 +218,7 @@ export function migrations(schema: string): string[] { "status", "available_at" ) - WHERE "concurrency_key" IS NOT NULL - AND "concurrency_limit" IS NOT NULL; + WHERE "concurrency_limit" IS NOT NULL; INSERT INTO ${quotedSchema}."openworkflow_migrations"("version") VALUES (5) diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts index e821dd26..30b94351 100644 --- a/packages/openworkflow/sqlite/backend.ts +++ b/packages/openworkflow/sqlite/backend.ts @@ -234,7 +234,7 @@ export class BackendSqlite implements Backend { private assertNoActiveBucketConcurrencyLimitMismatch(params: { workflowName: string; version: string | null; - key: string; + key: string | null; limit: number; }): void { const stmt = this.db.prepare(` @@ -246,7 +246,10 @@ export class BackendSqlite implements Backend { "version" = ? OR ("version" IS NULL AND ? 
IS NULL) ) - AND "concurrency_key" = ? + AND ( + "concurrency_key" = ? + OR ("concurrency_key" IS NULL AND ? IS NULL) + ) -- Sleeping runs are excluded so long sleeps do not pin historical -- limits and block new run creation after config changes. AND "status" IN ('pending', 'running') @@ -262,6 +265,7 @@ export class BackendSqlite implements Backend { params.version, params.version, params.key, + params.key, params.limit, ) as { id: string } | undefined; @@ -329,8 +333,7 @@ export class BackendSqlite implements Backend { AND wr."available_at" <= ? AND (wr."deadline_at" IS NULL OR wr."deadline_at" > ?) AND ( - wr."concurrency_key" IS NULL - OR wr."concurrency_limit" IS NULL + wr."concurrency_limit" IS NULL OR ( -- TODO: If claim latency becomes a hot spot, replace this -- correlated count with precomputed bucket counts via a CTE. @@ -345,7 +348,13 @@ export class BackendSqlite implements Backend { AND wr."version" IS NULL ) ) - AND active."concurrency_key" = wr."concurrency_key" + AND ( + active."concurrency_key" = wr."concurrency_key" + OR ( + active."concurrency_key" IS NULL + AND wr."concurrency_key" IS NULL + ) + ) AND active."status" = 'running' -- Candidates require available_at <= now; active leased runs -- require available_at > now. 
Keep explicit self-exclusion diff --git a/packages/openworkflow/sqlite/sqlite.test.ts b/packages/openworkflow/sqlite/sqlite.test.ts index b462117f..ce823540 100644 --- a/packages/openworkflow/sqlite/sqlite.test.ts +++ b/packages/openworkflow/sqlite/sqlite.test.ts @@ -209,6 +209,17 @@ describe("sqlite", () => { .all() as { name: string }[]; const indexNames = indexRows.map((index) => index.name); expect(indexNames).toContain("workflow_runs_concurrency_active_idx"); + + const concurrencyIndex = db + .prepare( + `SELECT sql FROM sqlite_master WHERE type = 'index' AND name = ?`, + ) + .get("workflow_runs_concurrency_active_idx") as + | { sql: string | null } + | undefined; + expect(concurrencyIndex?.sql).toContain( + 'WHERE "concurrency_limit" IS NOT NULL', + ); }); }); diff --git a/packages/openworkflow/sqlite/sqlite.ts b/packages/openworkflow/sqlite/sqlite.ts index 607ae741..5b511bd0 100644 --- a/packages/openworkflow/sqlite/sqlite.ts +++ b/packages/openworkflow/sqlite/sqlite.ts @@ -212,8 +212,7 @@ export function migrations(): string[] { "status", "available_at" ) - WHERE "concurrency_key" IS NOT NULL - AND "concurrency_limit" IS NOT NULL; + WHERE "concurrency_limit" IS NOT NULL; INSERT OR IGNORE INTO "openworkflow_migrations" ("version") VALUES (5); diff --git a/packages/openworkflow/workflow.ts b/packages/openworkflow/workflow.ts index 0caa82b4..30da25c3 100644 --- a/packages/openworkflow/workflow.ts +++ b/packages/openworkflow/workflow.ts @@ -15,9 +15,10 @@ export interface WorkflowConcurrencyResolverParams { */ export interface WorkflowConcurrency { /** - * Bucket key used to scope concurrency for this run. + * Optional bucket key used to scope concurrency for this run. + * When omitted, runs use the default workflow+version bucket. */ - readonly key: + readonly key?: | string | ((params: Readonly>) => string); /**