Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/ninety-cows-lay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/sdk": patch
---

feat(sdk): Support debouncing runs when triggering with new debounce options
6 changes: 6 additions & 0 deletions .cursor/rules/migrations.mdc
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
description: how to create and apply database migrations
alwaysApply: false
---

Follow our [migrations.md](mdc:ai/references/migrations.md) guide for how to create and apply database migrations.
121 changes: 121 additions & 0 deletions ai/references/migrations.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
## Creating and applying migrations

We use prisma migrations to manage the database schema. Please follow the following steps when editing the `internal-packages/database/prisma/schema.prisma` file:

Edit the `schema.prisma` file to add or modify the schema.

Create a new migration file but don't apply it yet:

```bash
cd internal-packages/database
pnpm run db:migrate:dev:create --name "add_new_column_to_table"
```

The migration file will be created in the `prisma/migrations` directory, but it will have a bunch of edits to the schema that are not needed and will need to be removed before we can apply the migration. Here's an example of what the migration file might look like:

```sql
-- AlterEnum
ALTER TYPE "public"."TaskRunExecutionStatus" ADD VALUE 'DELAYED';

-- AlterTable
ALTER TABLE "public"."TaskRun" ADD COLUMN "debounce" JSONB;

-- AlterTable
ALTER TABLE "public"."_BackgroundWorkerToBackgroundWorkerFile" ADD CONSTRAINT "_BackgroundWorkerToBackgroundWorkerFile_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_BackgroundWorkerToBackgroundWorkerFile_AB_unique";

-- AlterTable
ALTER TABLE "public"."_BackgroundWorkerToTaskQueue" ADD CONSTRAINT "_BackgroundWorkerToTaskQueue_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_BackgroundWorkerToTaskQueue_AB_unique";

-- AlterTable
ALTER TABLE "public"."_TaskRunToTaskRunTag" ADD CONSTRAINT "_TaskRunToTaskRunTag_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_TaskRunToTaskRunTag_AB_unique";

-- AlterTable
ALTER TABLE "public"."_WaitpointRunConnections" ADD CONSTRAINT "_WaitpointRunConnections_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_WaitpointRunConnections_AB_unique";

-- AlterTable
ALTER TABLE "public"."_completedWaitpoints" ADD CONSTRAINT "_completedWaitpoints_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_completedWaitpoints_AB_unique";

-- CreateIndex
CREATE INDEX "SecretStore_key_idx" ON "public"."SecretStore"("key" text_pattern_ops);

-- CreateIndex
CREATE INDEX "TaskRun_runtimeEnvironmentId_id_idx" ON "public"."TaskRun"("runtimeEnvironmentId", "id" DESC);

-- CreateIndex
CREATE INDEX "TaskRun_runtimeEnvironmentId_createdAt_idx" ON "public"."TaskRun"("runtimeEnvironmentId", "createdAt" DESC);
```

All the following lines should be removed:

```sql
-- AlterTable
ALTER TABLE "public"."_BackgroundWorkerToBackgroundWorkerFile" ADD CONSTRAINT "_BackgroundWorkerToBackgroundWorkerFile_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_BackgroundWorkerToBackgroundWorkerFile_AB_unique";

-- AlterTable
ALTER TABLE "public"."_BackgroundWorkerToTaskQueue" ADD CONSTRAINT "_BackgroundWorkerToTaskQueue_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_BackgroundWorkerToTaskQueue_AB_unique";

-- AlterTable
ALTER TABLE "public"."_TaskRunToTaskRunTag" ADD CONSTRAINT "_TaskRunToTaskRunTag_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_TaskRunToTaskRunTag_AB_unique";

-- AlterTable
ALTER TABLE "public"."_WaitpointRunConnections" ADD CONSTRAINT "_WaitpointRunConnections_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_WaitpointRunConnections_AB_unique";

-- AlterTable
ALTER TABLE "public"."_completedWaitpoints" ADD CONSTRAINT "_completedWaitpoints_AB_pkey" PRIMARY KEY ("A", "B");

-- DropIndex
DROP INDEX "public"."_completedWaitpoints_AB_unique";

-- CreateIndex
CREATE INDEX "SecretStore_key_idx" ON "public"."SecretStore"("key" text_pattern_ops);

-- CreateIndex
CREATE INDEX "TaskRun_runtimeEnvironmentId_id_idx" ON "public"."TaskRun"("runtimeEnvironmentId", "id" DESC);

-- CreateIndex
CREATE INDEX "TaskRun_runtimeEnvironmentId_createdAt_idx" ON "public"."TaskRun"("runtimeEnvironmentId", "createdAt" DESC);
```

Leaving only this:

```sql
-- AlterEnum
ALTER TYPE "public"."TaskRunExecutionStatus" ADD VALUE 'DELAYED';

-- AlterTable
ALTER TABLE "public"."TaskRun" ADD COLUMN "debounce" JSONB;
```

After editing the migration file, apply the migration:

```bash
cd internal-packages/database
pnpm run db:migrate:deploy && pnpm run generate
```
6 changes: 6 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,12 @@ const EnvironmentSchema = z
.default(60_000),
RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_FACTOR: z.coerce.number().default(2),

/** Maximum duration in milliseconds that a run can be debounced. Default: 1 hour (3,600,000ms) */
RUN_ENGINE_MAXIMUM_DEBOUNCE_DURATION_MS: z.coerce
.number()
.int()
.default(60_000 * 60), // 1 hour

RUN_ENGINE_WORKER_REDIS_HOST: z
.string()
.optional()
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ export class SpanPresenter extends BasePresenter {
environmentId: run.runtimeEnvironment.id,
idempotencyKey: run.idempotencyKey,
idempotencyKeyExpiresAt: run.idempotencyKeyExpiresAt,
debounce: run.debounce as { key: string; delay: string; createdAt: Date } | null,
schedule: await this.resolveSchedule(run.scheduleId ?? undefined),
queue: {
name: run.queue,
Expand Down Expand Up @@ -357,6 +358,8 @@ export class SpanPresenter extends BasePresenter {
//idempotency
idempotencyKey: true,
idempotencyKeyExpiresAt: true,
//debounce
debounce: true,
//delayed
delayUntil: true,
//ttl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,19 @@ function RunBody({
)}
</Property.Value>
</Property.Item>
<Property.Item>
<Property.Label>Debounce</Property.Label>
<Property.Value>
{run.debounce ? (
<div>
<div className="break-all">Key: {run.debounce.key}</div>
<div>Delay: {run.debounce.delay}</div>
</div>
) : (
"–"
)}
</Property.Value>
</Property.Item>
<Property.Item>
<Property.Label>Version</Property.Label>
<Property.Value>
Expand Down
68 changes: 68 additions & 0 deletions apps/webapp/app/runEngine/concerns/traceEvents.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export class DefaultTraceEventsConcern implements TraceEventConcern {
traceparent,
setAttribute: (key, value) => event.setAttribute(key as any, value),
failWithError: event.failWithError.bind(event),
stop: event.stop.bind(event),
},
store
);
Expand Down Expand Up @@ -116,6 +117,73 @@ export class DefaultTraceEventsConcern implements TraceEventConcern {
traceparent,
setAttribute: (key, value) => event.setAttribute(key as any, value),
failWithError: event.failWithError.bind(event),
stop: event.stop.bind(event),
},
store
);
}
);
}

async traceDebouncedRun<T>(
request: TriggerTaskRequest,
parentStore: string | undefined,
options: {
existingRun: TaskRun;
debounceKey: string;
incomplete: boolean;
isError: boolean;
},
callback: (span: TracedEventSpan, store: string) => Promise<T>
): Promise<T> {
const { existingRun, debounceKey, incomplete, isError } = options;
const { repository, store } = await this.#getEventRepository(request, parentStore);

return await repository.traceEvent(
`${request.taskId} (debounced)`,
{
context: request.options?.traceContext,
spanParentAsLink: request.options?.spanParentAsLink,
kind: "SERVER",
environment: request.environment,
taskSlug: request.taskId,
attributes: {
properties: {
[SemanticInternalAttributes.ORIGINAL_RUN_ID]: existingRun.friendlyId,
},
style: {
icon: "task-cached",
},
runId: existingRun.friendlyId,
},
incomplete,
isError,
immediate: true,
},
async (event, traceContext, traceparent) => {
// Log a message about the debounced trigger
await repository.recordEvent(
`Debounced: using existing run with key "${debounceKey}"`,
{
taskSlug: request.taskId,
environment: request.environment,
attributes: {
runId: existingRun.friendlyId,
},
context: request.options?.traceContext,
parentId: event.spanId,
}
);

return await callback(
{
traceId: event.traceId,
spanId: event.spanId,
traceContext,
traceparent,
setAttribute: (key, value) => event.setAttribute(key as any, value),
failWithError: event.failWithError.bind(event),
stop: event.stop.bind(event),
},
store
);
Expand Down
66 changes: 64 additions & 2 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,34 @@ export class RunEngineTriggerTaskService {
}
}

const [parseDelayError, delayUntil] = await tryCatch(parseDelay(body.options?.delay));
// Parse delay from either explicit delay option or debounce.delay
const delaySource = body.options?.delay ?? body.options?.debounce?.delay;
const [parseDelayError, delayUntil] = await tryCatch(parseDelay(delaySource));

if (parseDelayError) {
throw new ServiceValidationError(`Invalid delay ${body.options?.delay}`);
throw new ServiceValidationError(`Invalid delay ${delaySource}`);
}

// Validate debounce options
if (body.options?.debounce) {
if (!delayUntil) {
throw new ServiceValidationError(
`Debounce requires a valid delay duration. Provided: ${body.options.debounce.delay}`
);
}

// Always validate debounce.delay separately since it's used for rescheduling
// This catches the case where options.delay is valid but debounce.delay is invalid
const [debounceDelayError, debounceDelayUntil] = await tryCatch(
parseDelay(body.options.debounce.delay)
);

if (debounceDelayError || !debounceDelayUntil) {
throw new ServiceValidationError(
`Invalid debounce delay: ${body.options.debounce.delay}. ` +
`Supported formats: {number}s, {number}m, {number}h, {number}d, {number}w`
);
}
}

const ttl =
Expand Down Expand Up @@ -340,10 +364,48 @@ export class RunEngineTriggerTaskService {
bulkActionId: body.options?.bulkActionId,
planType,
realtimeStreamsVersion: options.realtimeStreamsVersion,
debounce: body.options?.debounce,
// When debouncing with triggerAndWait, create a span for the debounced trigger
onDebounced:
body.options?.debounce && body.options?.resumeParentOnCompletion
? async ({ existingRun, waitpoint, debounceKey }) => {
return await this.traceEventConcern.traceDebouncedRun(
triggerRequest,
parentRun?.taskEventStore,
{
existingRun,
debounceKey,
incomplete: waitpoint.status === "PENDING",
isError: waitpoint.outputIsError,
},
async (spanEvent) => {
const spanId =
options?.parentAsLinkType === "replay"
? spanEvent.spanId
: spanEvent.traceparent?.spanId
? `${spanEvent.traceparent.spanId}:${spanEvent.spanId}`
: spanEvent.spanId;
return spanId;
}
);
}
: undefined,
},
this.prisma
);

// If the returned run has a different friendlyId, it was debounced.
// For triggerAndWait: stop the outer span since a replacement debounced span was created via onDebounced.
// For regular trigger: let the span complete normally - no replacement span needed since the
// original run already has its span from when it was first created.
if (
taskRun.friendlyId !== runFriendlyId &&
body.options?.debounce &&
body.options?.resumeParentOnCompletion
) {
event.stop();
}

const error = taskRun.error ? TaskRunError.parse(taskRun.error) : undefined;

if (error) {
Expand Down
17 changes: 17 additions & 0 deletions apps/webapp/app/runEngine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ export type TracedEventSpan = {
};
setAttribute: (key: string, value: string) => void;
failWithError: (error: TaskRunError) => void;
/**
* Stop the span without writing any event.
* Used when a debounced run is returned - the span for the debounced
* trigger is created separately via traceDebouncedRun.
*/
stop: () => void;
};

export interface TraceEventConcern {
Expand All @@ -150,6 +156,17 @@ export interface TraceEventConcern {
},
callback: (span: TracedEventSpan, store: string) => Promise<T>
): Promise<T>;
traceDebouncedRun<T>(
request: TriggerTaskRequest,
parentStore: string | undefined,
options: {
existingRun: TaskRun;
debounceKey: string;
incomplete: boolean;
isError: boolean;
},
callback: (span: TracedEventSpan, store: string) => Promise<T>
): Promise<T>;
}

export type TriggerRacepoints = "idempotencyKey";
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ function createRunEngine() {
? createBatchGlobalRateLimiter(env.BATCH_QUEUE_GLOBAL_RATE_LIMIT)
: undefined,
},
// Debounce configuration
debounce: {
maxDebounceDurationMs: env.RUN_ENGINE_MAXIMUM_DEBOUNCE_DURATION_MS,
},
});

return engine;
Expand Down
Loading
Loading