55 changes: 50 additions & 5 deletions ARCHITECTURE.md
@@ -297,15 +297,60 @@

When the worker encounters this, it executes all steps within the `Promise.all`
concurrently. It waits for all of them to complete before proceeding. Each step
attempt is persisted individually as a `step_attempt`.

### 5.2. Worker Concurrency

Workers are configured with a concurrency limit (e.g., 10). A worker will
maintain up to 10 in-flight workflow runs simultaneously. It polls for new work
only when it has available capacity. Claim atomicity is backend-specific:

- Postgres uses `FOR UPDATE SKIP LOCKED` plus advisory locks for constrained
buckets.
- SQLite uses transaction-level single-writer locking (`BEGIN IMMEDIATE`) to
serialize claim writes.
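The two claim strategies can be sketched as query shapes. This is a sketch under assumptions: the table and column names (`workflow_runs`, `available_at`) are illustrative, not the actual schema, and the Postgres advisory locks used for constrained buckets are omitted.

```typescript
// Postgres: row-level claim. SKIP LOCKED lets concurrent workers pass over
// rows another worker is mid-claim on instead of blocking.
// Table/column names here are illustrative only.
function postgresClaimSql(): string {
  return `SELECT id FROM workflow_runs
WHERE status = 'pending' AND available_at <= now()
ORDER BY available_at
LIMIT 1
FOR UPDATE SKIP LOCKED`;
}

// SQLite: there is no SKIP LOCKED, so the claim transaction takes the
// database's single-writer lock up front and concurrent claimers serialize.
function sqliteClaimPrelude(): string {
  return "BEGIN IMMEDIATE";
}
```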

### 5.3. Workflow-Run Concurrency

In addition to worker-slot concurrency, workflows can define a per-run
concurrency policy in the workflow spec:

```ts
defineWorkflow(
{
name: "process-order",
concurrency: {
key: ({ input }) => `tenant:${input.tenantId}`,
limit: ({ input }) => input.maxConcurrentOrders,
},
},
async ({ step }) => {
// ...
},
);
```

`key` and `limit` can each be either static values (`string`/`number`) or
functions of the validated workflow input, and `key` is optional. They are
resolved once when the run is created and persisted on `workflow_runs`.
Resolved keys are stored verbatim; only empty/all-whitespace keys are rejected.
When `key` is omitted, the run uses the default bucket for
`namespace_id + workflow_name + version`.
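A minimal sketch of that one-time resolution; the `resolveOnce` helper and `Input` shape are hypothetical, only the resolve-once-at-creation semantics mirror the text above:

```typescript
// Illustrative input shape for this sketch.
type Input = { tenantId: string; maxConcurrentOrders: number };
type OrFn<T> = T | ((args: { input: Input }) => T);

// Resolve a static value or an input-derived function exactly once at
// run-creation time; the resolved value is what gets persisted.
function resolveOnce<T>(value: OrFn<T>, input: Input): T {
  return typeof value === "function"
    ? (value as (args: { input: Input }) => T)({ input })
    : value;
}

const input: Input = { tenantId: "acme", maxConcurrentOrders: 3 };
const key = resolveOnce<string>(({ input }) => `tenant:${input.tenantId}`, input);
const limit = resolveOnce<number>(3, input);
```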

During claim/dequeue, a run is claimable only when the number of active leased
`running` runs in the same bucket is below the run's `limit`. The bucket scope
is:

- `namespace_id`
- `workflow_name`
- `version` (version-aware buckets)
- `concurrency_key` (nullable for the default bucket)

`pending`, `sleeping`, and expired-lease `running` runs do not consume
concurrency slots. For active runs in a bucket (`pending`, `running`), the
resolved `concurrency_limit` must be consistent; conflicting limits are
rejected at run creation.
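The slot accounting above can be sketched as follows; the run shape and helper are simplified for illustration, not the real row schema:

```typescript
// Simplified run shape for illustration; real rows carry more state.
interface BucketRun {
  status: "pending" | "running" | "sleeping";
  leaseExpired: boolean;
}

// A run is claimable only while actively leased `running` runs in its
// bucket number fewer than the run's resolved limit.
function isClaimable(bucket: BucketRun[], limit: number): boolean {
  const activelyLeased = bucket.filter(
    (run) => run.status === "running" && !run.leaseExpired,
  ).length;
  return activelyLeased < limit;
}

const bucket: BucketRun[] = [
  { status: "running", leaseExpired: false }, // consumes a slot
  { status: "running", leaseExpired: true }, // expired lease: free
  { status: "sleeping", leaseExpired: false }, // non-consuming
  { status: "pending", leaseExpired: false }, // non-consuming
];
```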

### 5.4. Handling Crashes During Parallel Execution

The `availableAt` heartbeat mechanism provides robust recovery. If a worker
crashes while executing parallel steps, its heartbeat stops. The `availableAt`
38 changes: 37 additions & 1 deletion packages/docs/docs/workers.mdx

@@ -49,7 +49,7 @@

When a worker picks up a workflow:
5. **Execute**: New steps are executed and their results are stored
6. **Complete**: The workflow status is updated to `completed` or `failed`

## Worker Concurrency

Workers can process multiple workflow runs simultaneously. Configure concurrency
in your `openworkflow.config.ts`:
@@ -88,6 +88,40 @@ bunx @openworkflow/cli worker start --concurrency 10
capacity.
</Note>

## Workflow Concurrency

Workflow specs can also define concurrency buckets that are enforced at claim
time:

```ts
defineWorkflow(
{
name: "process-order",
concurrency: {
key: ({ input }) => `tenant:${input.tenantId}`, // optional
limit: ({ input }) => input.maxConcurrentOrders, // or: 5
},
},
async ({ step }) => {
// ...
},
);
```

Workers will only claim a run when the bucket has capacity. Bucket scope is:

- namespace
- workflow name
- workflow version
- resolved concurrency key (or default bucket when key is omitted)

Only actively leased `running` runs consume workflow-concurrency slots;
sleeping runs become slot-consuming again only once they are re-claimed as
actively leased `running` runs. Resolved keys are stored verbatim; only
empty/all-whitespace keys are rejected. Across active `pending` and actively
leased `running` runs in the same workflow+version+key bucket, the resolved
`limit` must remain consistent.
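The bucket scope can be pictured as a composite identity. In this sketch the `"<default>"` sentinel and `/` separator are assumptions, not the real storage format:

```typescript
// Illustrative bucket identity: namespace + name + version + resolved key,
// with a sentinel standing in for the key-omitted default bucket.
function bucketId(
  namespaceId: string,
  workflowName: string,
  version: string | null,
  concurrencyKey: string | null,
): string {
  return [
    namespaceId,
    workflowName,
    version ?? "",
    concurrencyKey ?? "<default>",
  ].join("/");
}
```

Because keys are stored verbatim, `" foo "` and `"foo"` identify different buckets.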

## Heartbeats and Crash Recovery

Workers maintain their claim on workflow runs through a heartbeat mechanism:
@@ -134,6 +168,8 @@

Workers coordinate through the database:
- Each workflow run is claimed by exactly one worker at a time
- Workers use atomic database operations to prevent duplicate processing
- If a worker crashes, its workflows become available to other workers
- SQLite serializes claims via transaction-level single-writer locking
  (`BEGIN IMMEDIATE`); Postgres uses row locks plus advisory locks for
  constrained buckets

## Graceful Shutdown

39 changes: 39 additions & 0 deletions packages/docs/docs/workflows.mdx

@@ -189,6 +189,45 @@ defineWorkflow(
Any `retryPolicy` fields you omit fall back to defaults. See
[Retries](/docs/retries) for the full behavior and defaults.

### Concurrency (Optional)

Control how many actively leased `running` runs are allowed for a workflow bucket:

```ts
defineWorkflow(
{
name: "process-order",
concurrency: {
key: ({ input }) => `tenant:${input.tenantId}`, // optional
limit: ({ input }) => input.maxConcurrentOrders, // or: 5
},
},
async ({ input, step }) => {
// ...
},
);
```

- `key` is optional; when set it can be a string or `({ input }) => string`
- `limit` can be a number or a function `({ input }) => number`
- if provided, `key` must resolve to a non-empty string
- `limit` must resolve to a positive integer
- resolved keys are stored verbatim; only empty/all-whitespace keys are rejected
- within active runs (`pending`/`running`) for the same
  workflow+version+key bucket, `limit` must remain consistent

When concurrency is configured, runs in the same bucket are constrained by:

- namespace
- workflow name
- workflow version
- resolved `key` (or the default bucket when key is omitted)

Only actively leased `running` runs consume slots. `pending`, `sleeping`, and
expired-lease runs do not.
Sleeping runs become slot-consuming only after they are claimed again as
actively leased `running` runs.
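Those creation-time rules can be sketched as a validator; the helper name is hypothetical:

```typescript
// Hypothetical validator mirroring the rules above: limit must be a
// positive integer; a provided key must be non-empty after trimming
// (keys are otherwise stored verbatim, untrimmed).
function isValidConcurrency(key: string | null, limit: number): boolean {
  if (!Number.isInteger(limit) || limit <= 0) return false;
  if (key !== null && key.trim().length === 0) return false;
  return true;
}
```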

### Idempotency Key (Optional)

You can prevent duplicate run creation by providing an idempotency key, though
31 changes: 31 additions & 0 deletions packages/openworkflow/README.md

@@ -67,11 +67,42 @@

For more details, check out our [docs](https://openworkflow.dev/docs).
- ✅ **Long pauses** - Sleep for seconds or months
- ✅ **Scheduled runs** - Start workflows at a specific time
- ✅ **Parallel execution** - Run steps concurrently
- ✅ **Workflow concurrency** - Limit active runs by bucket (optional key)
- ✅ **Idempotency keys** - Deduplicate repeated run requests (24h window)
- ✅ **No extra servers** - Uses your existing database
- ✅ **Dashboard included** - Monitor and debug workflows
- ✅ **Production ready** - PostgreSQL and SQLite support

## Workflow Concurrency

You can limit active leased `running` runs per workflow bucket:

```ts
const workflow = defineWorkflow(
{
name: "process-order",
concurrency: {
key: ({ input }) => `tenant:${input.tenantId}`, // optional
limit: ({ input }) => input.maxConcurrentOrders, // or: 5
},
},
async ({ step }) => {
// ...
},
);
```

`limit` must resolve to a positive integer, and if `key` is provided it must
resolve to a non-empty string; invalid values fail run creation. When `key` is
omitted, runs use the default bucket for `namespace + workflow + version`.
Keys are stored verbatim (for example, `" foo "` and `"foo"` are different
concurrency keys); only empty or all-whitespace keys are rejected. Sleeping
runs do not consume workflow-concurrency slots until they are claimed again
as actively leased `running` runs. For a given active bucket
(`workflow + version + key`), the resolved `limit` must stay consistent
across `pending`/`running` runs.

## Documentation

- [Documentation](https://openworkflow.dev/docs)
Expand Down
114 changes: 114 additions & 0 deletions packages/openworkflow/backend-concurrency.ts
@@ -0,0 +1,114 @@
import type { CreateWorkflowRunParams } from "./backend.js";

const INVALID_CONCURRENCY_KEY_TYPE_ERROR =
'Invalid workflow concurrency metadata: "concurrencyKey" must be a string or null.';
export const INVALID_CONCURRENCY_KEY_VALUE_ERROR =
'Invalid workflow concurrency metadata: "concurrencyKey" must be a non-empty string when provided.';
const INVALID_CONCURRENCY_LIMIT_TYPE_ERROR =
'Invalid workflow concurrency metadata: "concurrencyLimit" must be a number or null.';
export const INVALID_CONCURRENCY_LIMIT_VALUE_ERROR =
'Invalid workflow concurrency metadata: "concurrencyLimit" must be a positive integer or null.';
const INVALID_CONCURRENCY_PAIR_ERROR =
'Invalid workflow concurrency metadata: "concurrencyLimit" must be set when "concurrencyKey" is provided.';
export const CONCURRENCY_LIMIT_MISMATCH_ERROR =
'Invalid workflow concurrency metadata: active runs in the same bucket must use the same "concurrencyLimit".';

/**
* Bucket identity for workflow-level concurrency.
*/
export interface ConcurrencyBucket {
workflowName: string;
version: string | null;
key: string | null;
limit: number;
}

/**
* Normalize and validate workflow concurrency metadata passed to create calls.
* This protects direct backend callers that bypass client-side validation.
* @param params - Workflow run creation params
* @returns Params with normalized concurrency fields
* @throws {Error} When concurrency metadata has invalid shape or values
*/
export function normalizeCreateWorkflowRunParams(
params: CreateWorkflowRunParams,
): CreateWorkflowRunParams {
const rawParams = params as unknown as Record<string, unknown>;
const rawConcurrencyKey = rawParams["concurrencyKey"];
const rawConcurrencyLimit = rawParams["concurrencyLimit"];

if (rawConcurrencyKey === undefined && rawConcurrencyLimit === undefined) {
return {
...params,
concurrencyKey: null,
concurrencyLimit: null,
};
}

if (
rawConcurrencyKey !== undefined &&
rawConcurrencyKey !== null &&
typeof rawConcurrencyKey !== "string"
) {
throw new Error(INVALID_CONCURRENCY_KEY_TYPE_ERROR);
}

if (
rawConcurrencyLimit !== undefined &&
rawConcurrencyLimit !== null &&
typeof rawConcurrencyLimit !== "number"
) {
throw new Error(INVALID_CONCURRENCY_LIMIT_TYPE_ERROR);
}

const concurrencyKey =
rawConcurrencyKey === undefined ? null : rawConcurrencyKey;
const concurrencyLimit =
rawConcurrencyLimit === undefined ? null : rawConcurrencyLimit;

if (concurrencyKey !== null && concurrencyLimit === null) {
throw new Error(INVALID_CONCURRENCY_PAIR_ERROR);
}

if (
typeof concurrencyKey === "string" &&
concurrencyKey.trim().length === 0
) {
throw new Error(INVALID_CONCURRENCY_KEY_VALUE_ERROR);
}

if (
typeof concurrencyLimit === "number" &&
(!Number.isFinite(concurrencyLimit) ||
!Number.isInteger(concurrencyLimit) ||
concurrencyLimit <= 0)
) {
throw new Error(INVALID_CONCURRENCY_LIMIT_VALUE_ERROR);
}

return {
...params,
concurrencyKey,
concurrencyLimit,
};
}

/**
* Return bucket identity for constrained runs, otherwise null.
* @param params - Normalized workflow run creation params
* @returns Concurrency bucket or null for unconstrained runs
*/
export function toConcurrencyBucket(
params: CreateWorkflowRunParams,
): ConcurrencyBucket | null {
if (params.concurrencyLimit === null) {
return null;
}

return {
workflowName: params.workflowName,
version: params.version,
key: params.concurrencyKey,
limit: params.concurrencyLimit,
};
}
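A self-contained sketch of how `toConcurrencyBucket` splits constrained from unconstrained runs. `MiniParams` and `toBucket` are local stand-ins for illustration only; the real `CreateWorkflowRunParams` interface has more fields:

```typescript
// Minimal param shape for this sketch.
interface MiniParams {
  workflowName: string;
  version: string | null;
  concurrencyKey: string | null;
  concurrencyLimit: number | null;
}

// Mirrors toConcurrencyBucket above: constrained runs get a bucket
// identity, unconstrained runs (null limit) get none.
function toBucket(params: MiniParams) {
  if (params.concurrencyLimit === null) return null;
  return {
    workflowName: params.workflowName,
    version: params.version,
    key: params.concurrencyKey,
    limit: params.concurrencyLimit,
  };
}

const constrained = toBucket({
  workflowName: "process-order",
  version: "v1",
  concurrencyKey: "tenant:acme",
  concurrencyLimit: 3,
});
const unconstrained = toBucket({
  workflowName: "process-order",
  version: "v1",
  concurrencyKey: null,
  concurrencyLimit: null,
});
```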