From 28cf724947cb1959bb49f4c5b48235cd84fa58a5 Mon Sep 17 00:00:00 2001 From: Val Alexander Date: Thu, 9 Apr 2026 15:18:58 -0500 Subject: [PATCH] Archive project threads when enforcing project limits - Cascade project deletion to active threads and clear retained thread payloads - Treat archived projects and threads as unavailable in command invariants - Add coverage for limit handling and deletion projection cleanup --- .../Layers/ProjectionPipeline.test.ts | 122 +++++++++++++- .../Layers/ProjectionPipeline.ts | 30 +++- .../orchestration/commandInvariants.test.ts | 68 ++++++++ .../src/orchestration/commandInvariants.ts | 52 +++++- .../src/orchestration/decider.limits.test.ts | 151 ++++++++++++++++++ apps/server/src/orchestration/decider.ts | 149 ++++++++++------- .../src/orchestration/projector.test.ts | 119 ++++++++++++++ apps/server/src/orchestration/projector.ts | 6 + packages/contracts/src/orchestration.ts | 6 +- 9 files changed, 634 insertions(+), 69 deletions(-) create mode 100644 apps/server/src/orchestration/decider.limits.test.ts diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts index cc559dd93..52e89fb15 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts @@ -1,4 +1,5 @@ import { + ApprovalRequestId, CheckpointRef, CommandId, CorrelationId, @@ -910,11 +911,14 @@ it.layer(Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-projection-atta Effect.gen(function* () { const fileSystem = yield* FileSystem.FileSystem; const path = yield* Path.Path; + const sql = yield* SqlClient.SqlClient; const projectionPipeline = yield* OrchestrationProjectionPipeline; const eventStore = yield* OrchestrationEventStore; const { attachmentsDir } = yield* ServerConfig; const now = new Date().toISOString(); const threadId = ThreadId.makeUnsafe("Thread Delete.Files"); + const turnId = TurnId.makeUnsafe("turn-delete-files"); + const approvalRequestId = ApprovalRequestId.makeUnsafe("approval-delete-files"); const attachmentId = "thread-delete-files-00000000-0000-4000-8000-000000000001"; const otherThreadAttachmentId = "thread-delete-files-extra-00000000-0000-4000-8000-000000000002"; @@ -968,15 +972,91 @@ it.layer(Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-projection-atta }, }); + yield* appendAndProject({ + type: "thread.proposed-plan-upserted", + eventId: EventId.makeUnsafe("evt-delete-files-3a"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: now, + commandId: CommandId.makeUnsafe("cmd-delete-files-3a"), + causationEventId: null, + correlationId: CorrelationId.makeUnsafe("cmd-delete-files-3a"), + metadata: {}, + payload: { + threadId, + proposedPlan: { + id: "plan-delete-files", + turnId, + planMarkdown: "1. Delete files", + implementedAt: null, + implementationThreadId: null, + createdAt: now, + updatedAt: now, + }, + }, + }); + + yield* appendAndProject({ + type: "thread.session-set", + eventId: EventId.makeUnsafe("evt-delete-files-3b"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: now, + commandId: CommandId.makeUnsafe("cmd-delete-files-3b"), + causationEventId: null, + correlationId: CorrelationId.makeUnsafe("cmd-delete-files-3b"), + metadata: {}, + payload: { + threadId, + session: { + threadId, + status: "running", + providerName: "codex", + runtimeMode: "full-access", + activeTurnId: turnId, + lastError: null, + updatedAt: now, + }, + }, + }); + + yield* appendAndProject({ + type: "thread.activity-appended", + eventId: EventId.makeUnsafe("evt-delete-files-3c"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: now, + commandId: CommandId.makeUnsafe("cmd-delete-files-3c"), + causationEventId: null, + correlationId: CorrelationId.makeUnsafe("cmd-delete-files-3c"), + metadata: { + requestId: approvalRequestId, + }, + payload: { + threadId, + activity: { + id: EventId.makeUnsafe("activity-delete-files"), + tone: "approval", + kind: "approval.requested", + summary: "Delete files approval", + payload: { + requestId: approvalRequestId, + }, + turnId, + createdAt: now, + }, + }, + }); + yield* appendAndProject({ type: "thread.message-sent", - eventId: EventId.makeUnsafe("evt-delete-files-3"), + eventId: EventId.makeUnsafe("evt-delete-files-3d"), aggregateKind: "thread", aggregateId: threadId, occurredAt: now, - commandId: CommandId.makeUnsafe("cmd-delete-files-3"), + commandId: CommandId.makeUnsafe("cmd-delete-files-3d"), causationEventId: null, - correlationId: CorrelationId.makeUnsafe("cmd-delete-files-3"), + correlationId: CorrelationId.makeUnsafe("cmd-delete-files-3d"), metadata: {}, payload: { threadId, @@ -1028,6 +1108,42 @@ it.layer(Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-projection-atta assert.isFalse(yield* exists(threadAttachmentPath)); assert.isTrue(yield* exists(otherThreadAttachmentPath)); + + const childRowCounts = yield* sql<{ + readonly messages: number; + readonly plans: number; + readonly activities: number; + readonly sessions: number; + readonly turns: number; + readonly approvals: number; + }>` + SELECT + (SELECT COUNT(*) FROM projection_thread_messages WHERE thread_id = ${threadId}) AS "messages", + (SELECT COUNT(*) FROM projection_thread_proposed_plans WHERE thread_id = ${threadId}) AS "plans", + (SELECT COUNT(*) FROM projection_thread_activities WHERE thread_id = ${threadId}) AS "activities", + (SELECT COUNT(*) FROM projection_thread_sessions WHERE thread_id = ${threadId}) AS "sessions", + (SELECT COUNT(*) FROM projection_turns WHERE thread_id = ${threadId}) AS "turns", + (SELECT COUNT(*) FROM projection_pending_approvals WHERE thread_id = ${threadId}) AS "approvals" + `; + assert.deepEqual(childRowCounts, [ + { + messages: 0, + plans: 0, + activities: 0, + sessions: 0, + turns: 0, + approvals: 0, + }, + ]); + + const threadRows = yield* sql<{ + readonly deletedAt: string | null; + }>` + SELECT deleted_at AS "deletedAt" + FROM projection_threads + WHERE thread_id = ${threadId} + `; + assert.deepEqual(threadRows, [{ deletedAt: now }]); }), ); }, diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index bc65acf27..1b9b3e8a2 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -1,4 +1,9 @@ -import { ApprovalRequestId, type ChatAttachment, type OrchestrationEvent } from "@okcode/contracts"; +import { + ApprovalRequestId, + type ChatAttachment, + type OrchestrationEvent, + type ThreadId, +} from "@okcode/contracts"; import * as NodeServices from "@effect/platform-node/NodeServices"; import { Effect, FileSystem, Layer, Option, Path, Stream } from "effect"; import * as SqlClient from "effect/unstable/sql/SqlClient"; @@ -347,6 +352,27 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { const path = yield* Path.Path; const serverConfig = yield* ServerConfig; + const deleteProjectedThreadChildren = (threadId: ThreadId) => + Effect.gen(function* () { + yield* projectionThreadMessageRepository.deleteByThreadId({ threadId }); + yield* projectionThreadProposedPlanRepository.deleteByThreadId({ threadId }); + yield* projectionThreadActivityRepository.deleteByThreadId({ threadId }); + yield* projectionThreadSessionRepository.deleteByThreadId({ threadId }); + yield* projectionTurnRepository.deleteByThreadId({ threadId }); + + const pendingApprovals = yield* projectionPendingApprovalRepository.listByThreadId({ + threadId, + }); + yield* Effect.forEach( + pendingApprovals, + (approval) => + projectionPendingApprovalRepository.deleteByRequestId({ + requestId: approval.requestId, + }), + { concurrency: 1 }, + ).pipe(Effect.asVoid); + }); + const applyProjectsProjection: ProjectorDefinition["apply"] = (event, _attachmentSideEffects) => Effect.gen(function* () { switch (event.type) { @@ -489,6 +515,7 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { threadId: event.payload.threadId, }); if (Option.isNone(existingRow)) { + yield* deleteProjectedThreadChildren(event.payload.threadId); return; } yield* projectionThreadRepository.upsert({ @@ -496,6 +523,7 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { deletedAt: event.payload.deletedAt, updatedAt: event.payload.deletedAt, }); + yield* deleteProjectedThreadChildren(event.payload.threadId); return; } diff --git a/apps/server/src/orchestration/commandInvariants.test.ts b/apps/server/src/orchestration/commandInvariants.test.ts index b53f636b4..d674346c7 100644 --- a/apps/server/src/orchestration/commandInvariants.test.ts +++ b/apps/server/src/orchestration/commandInvariants.test.ts @@ -12,6 +12,7 @@ import { Effect } from "effect"; import { findThreadById, + requireProject, listThreadsByProjectId, requireNonNegativeInteger, requireThread, @@ -44,6 +45,16 @@ const readModel: OrchestrationReadModel = { updatedAt: now, deletedAt: null, }, + { + id: ProjectId.makeUnsafe("project-archived"), + title: "Project Archived", + workspaceRoot: "/tmp/project-archived", + defaultModel: "gpt-5-codex", + scripts: [], + createdAt: now, + updatedAt: now, + deletedAt: now, + }, ], threads: [ { @@ -84,6 +95,25 @@ const readModel: OrchestrationReadModel = { checkpoints: [], deletedAt: null, }, + { + id: ThreadId.makeUnsafe("thread-archived"), + projectId: ProjectId.makeUnsafe("project-archived"), + title: "Thread Archived", + model: "gpt-5-codex", + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "full-access", + branch: null, + worktreePath: null, + createdAt: now, + updatedAt: now, + latestTurn: null, + messages: [], + session: null, + activities: [], + proposedPlans: [], + checkpoints: [], + deletedAt: now, + }, ], }; @@ -123,6 +153,16 @@ describe("commandInvariants", () => { ); expect(thread.id).toBe(ThreadId.makeUnsafe("thread-1")); + await expect( + Effect.runPromise( + requireThread({ + readModel, + command: messageSendCommand, + threadId: ThreadId.makeUnsafe("thread-archived"), + }), + ), + ).rejects.toThrow("has been archived"); + await expect( Effect.runPromise( requireThread({ @@ -134,6 +174,34 @@ describe("commandInvariants", () => { ).rejects.toThrow("does not exist"); }); + it("requires active projects for non-create flows", async () => { + await Effect.runPromise( + requireProject({ + readModel, + command: { + type: "project.meta.update", + commandId: CommandId.makeUnsafe("cmd-project-update"), + projectId: ProjectId.makeUnsafe("project-a"), + }, + projectId: ProjectId.makeUnsafe("project-a"), + }), + ); + + await expect( + Effect.runPromise( + requireProject({ + readModel, + command: { + type: "project.meta.update", + commandId: CommandId.makeUnsafe("cmd-project-update-archived"), + projectId: ProjectId.makeUnsafe("project-archived"), + }, + projectId: ProjectId.makeUnsafe("project-archived"), + }), + ), + ).rejects.toThrow("has been archived"); + }); + it("requires missing thread for create flows", async () => { await Effect.runPromise( requireThreadAbsent({ diff --git a/apps/server/src/orchestration/commandInvariants.ts b/apps/server/src/orchestration/commandInvariants.ts index bccabcc2e..dbdd82b3f 100644 --- a/apps/server/src/orchestration/commandInvariants.ts +++ b/apps/server/src/orchestration/commandInvariants.ts @@ -25,6 +25,13 @@ export function findThreadById( return readModel.threads.find((thread) => thread.id === threadId); } +export function findActiveThreadById( + readModel: OrchestrationReadModel, + threadId: ThreadId, +): OrchestrationThread | undefined { + return readModel.threads.find((thread) => thread.id === threadId && thread.deletedAt === null); +} + export function findProjectById( readModel: OrchestrationReadModel, projectId: ProjectId, @@ -32,6 +39,15 @@ export function findProjectById( return readModel.projects.find((project) => project.id === projectId); } +export function findActiveProjectById( + readModel: OrchestrationReadModel, + projectId: ProjectId, +): OrchestrationProject | undefined { + return readModel.projects.find( + (project) => project.id === projectId && project.deletedAt === null, + ); +} + export function listThreadsByProjectId( readModel: OrchestrationReadModel, projectId: ProjectId, @@ -44,14 +60,17 @@ export function requireProject(input: { readonly command: OrchestrationCommand; readonly projectId: ProjectId; }): Effect.Effect { - const project = findProjectById(input.readModel, input.projectId); + const project = findActiveProjectById(input.readModel, input.projectId); if (project) { return Effect.succeed(project); } + const archivedProject = findProjectById(input.readModel, input.projectId); return Effect.fail( invariantError( input.command.type, - `Project '${input.projectId}' does not exist for command '${input.command.type}'.`, + archivedProject + ? `Project '${input.projectId}' has been archived and cannot be used by command '${input.command.type}'.` + : `Project '${input.projectId}' does not exist for command '${input.command.type}'.`, ), ); } @@ -77,14 +96,17 @@ export function requireThread(input: { readonly command: OrchestrationCommand; readonly threadId: ThreadId; }): Effect.Effect { - const thread = findThreadById(input.readModel, input.threadId); + const thread = findActiveThreadById(input.readModel, input.threadId); if (thread) { return Effect.succeed(thread); } + const archivedThread = findThreadById(input.readModel, input.threadId); return Effect.fail( invariantError( input.command.type, - `Thread '${input.threadId}' does not exist for command '${input.command.type}'.`, + archivedThread + ? `Thread '${input.threadId}' has been archived and cannot be used by command '${input.command.type}'.` + : `Thread '${input.threadId}' does not exist for command '${input.command.type}'.`, ), ); } @@ -138,6 +160,28 @@ export function listActiveThreadsByProjectId( ); } +export function listActiveThreadsByProjectIds( + readModel: OrchestrationReadModel, + projectIds: ReadonlyArray, +): ReadonlyArray { + if (projectIds.length === 0) { + return []; + } + + const projectOrder = new Map(projectIds.map((projectId, index) => [projectId, index] as const)); + return readModel.threads + .filter((thread) => thread.deletedAt === null && projectOrder.has(thread.projectId)) + .toSorted((left, right) => { + const leftProjectOrder = projectOrder.get(left.projectId) ?? Number.MAX_SAFE_INTEGER; + const rightProjectOrder = projectOrder.get(right.projectId) ?? Number.MAX_SAFE_INTEGER; + return ( + leftProjectOrder - rightProjectOrder || + left.updatedAt.localeCompare(right.updatedAt) || + left.id.localeCompare(right.id) + ); + }); +} + /** * Returns the oldest active projects that must be archived to stay within * MAX_PROJECTS when a new project is about to be created. diff --git a/apps/server/src/orchestration/decider.limits.test.ts b/apps/server/src/orchestration/decider.limits.test.ts new file mode 100644 index 000000000..44a3d5f86 --- /dev/null +++ b/apps/server/src/orchestration/decider.limits.test.ts @@ -0,0 +1,151 @@ +import { + CommandId, + DEFAULT_PROVIDER_INTERACTION_MODE, + MAX_PROJECTS, + ProjectId, + ThreadId, + type OrchestrationProject, + type OrchestrationReadModel, + type OrchestrationThread, +} from "@okcode/contracts"; +import { Effect } from "effect"; +import { describe, expect, it } from "vitest"; + +import { decideOrchestrationCommand } from "./decider.ts"; + +function makeProject(input: { + id: string; + updatedAt: string; + deletedAt?: string | null; +}): OrchestrationProject { + return { + id: ProjectId.makeUnsafe(input.id), + title: input.id, + workspaceRoot: `/tmp/${input.id}`, + defaultModel: "gpt-5-codex", + scripts: [], + createdAt: input.updatedAt, + updatedAt: input.updatedAt, + deletedAt: input.deletedAt ?? null, + }; +} + +function makeThread(input: { + id: string; + projectId: string; + updatedAt: string; + deletedAt?: string | null; +}): OrchestrationThread { + return { + id: ThreadId.makeUnsafe(input.id), + projectId: ProjectId.makeUnsafe(input.projectId), + title: input.id, + model: "gpt-5-codex", + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "full-access", + branch: null, + worktreePath: null, + createdAt: input.updatedAt, + updatedAt: input.updatedAt, + latestTurn: null, + messages: [], + session: null, + activities: [], + proposedPlans: [], + checkpoints: [], + deletedAt: input.deletedAt ?? null, + }; +} + +describe("decider limits", () => { + it("archives the oldest project's active threads before creating a new project at the cap", async () => { + const createdAt = "2026-04-09T12:00:00.000Z"; + const projects = Array.from({ length: MAX_PROJECTS }, (_, index) => + makeProject({ + id: `project-${index}`, + updatedAt: `2026-04-01T00:00:${String(index).padStart(2, "0")}.000Z`, + }), + ); + const readModel: OrchestrationReadModel = { + snapshotSequence: MAX_PROJECTS, + updatedAt: createdAt, + projects, + threads: [ + makeThread({ + id: "thread-oldest-active", + projectId: "project-0", + updatedAt: "2026-04-01T00:00:00.000Z", + }), + makeThread({ + id: "thread-oldest-archived", + projectId: "project-0", + updatedAt: "2026-04-01T00:00:01.000Z", + deletedAt: "2026-04-02T00:00:00.000Z", + }), + ], + }; + + const result = await Effect.runPromise( + decideOrchestrationCommand({ + command: { + type: "project.create", + commandId: CommandId.makeUnsafe("cmd-project-cap"), + projectId: ProjectId.makeUnsafe("project-new"), + title: "project-new", + workspaceRoot: "/tmp/project-new", + defaultModel: "gpt-5-codex", + createdAt, + }, + readModel, + }), + ); + + const events = Array.isArray(result) ? result : [result]; + expect(events.map((event) => event.type)).toEqual([ + "thread.deleted", + "project.deleted", + "project.created", + ]); + expect(events[0]?.aggregateId).toBe("thread-oldest-active"); + expect(events[1]?.aggregateId).toBe("project-0"); + expect(events[2]?.aggregateId).toBe("project-new"); + }); + + it("cascades active thread archival when deleting a project", async () => { + const updatedAt = "2026-04-09T12:05:00.000Z"; + const readModel: OrchestrationReadModel = { + snapshotSequence: 2, + updatedAt, + projects: [makeProject({ id: "project-1", updatedAt })], + threads: [ + makeThread({ + id: "thread-active", + projectId: "project-1", + updatedAt, + }), + makeThread({ + id: "thread-archived", + projectId: "project-1", + updatedAt, + deletedAt: "2026-04-08T00:00:00.000Z", + }), + ], + }; + + const result = await Effect.runPromise( + decideOrchestrationCommand({ + command: { + type: "project.delete", + commandId: CommandId.makeUnsafe("cmd-project-delete"), + projectId: ProjectId.makeUnsafe("project-1"), + }, + readModel, + }), + ); + + const events = Array.isArray(result) ? result : [result]; + expect(events.map((event) => event.type)).toEqual(["thread.deleted", "project.deleted"]); + expect(events[0]?.aggregateId).toBe("thread-active"); + expect(events[1]?.aggregateId).toBe("project-1"); + }); +}); diff --git a/apps/server/src/orchestration/decider.ts b/apps/server/src/orchestration/decider.ts index 00f23dd15..ec08e7020 100644 --- a/apps/server/src/orchestration/decider.ts +++ b/apps/server/src/orchestration/decider.ts @@ -2,6 +2,8 @@ import type { OrchestrationCommand, OrchestrationEvent, OrchestrationReadModel, + ProjectId, + ThreadId, } from "@okcode/contracts"; import { Effect } from "effect"; @@ -9,6 +11,7 @@ import { OrchestrationCommandInvariantError } from "./Errors.ts"; import { getProjectsToArchive, getThreadsToArchive, + listActiveThreadsByProjectIds, requireProject, requireProjectAbsent, requireThread, @@ -49,6 +52,46 @@ function withEventBase( }; } +function createThreadDeletedEvent(input: { + readonly command: Pick; + readonly threadId: ThreadId; + readonly occurredAt: string; +}): Omit { + return { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: input.threadId, + occurredAt: input.occurredAt, + commandId: input.command.commandId, + }), + type: "thread.deleted", + payload: { + threadId: input.threadId, + deletedAt: input.occurredAt, + }, + }; +} + +function createProjectDeletedEvent(input: { + readonly command: Pick; + readonly projectId: ProjectId; + readonly occurredAt: string; +}): Omit { + return { + ...withEventBase({ + aggregateKind: "project", + aggregateId: input.projectId, + occurredAt: input.occurredAt, + commandId: input.command.commandId, + }), + type: "project.deleted", + payload: { + projectId: input.projectId, + deletedAt: input.occurredAt, + }, + }; +} + export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand")(function* ({ command, readModel, @@ -92,24 +135,26 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" return projectCreatedEvent; } - const archiveEvents: Omit[] = projectsToArchive.map( - (project) => - Object.assign( - withEventBase({ - aggregateKind: "project" as const, - aggregateId: project.id, - occurredAt: command.createdAt, - commandId: command.commandId, - }), - { - type: "project.deleted" as const, - payload: { - projectId: project.id, - deletedAt: command.createdAt, - }, - }, - ), + const threadsToArchive = listActiveThreadsByProjectIds( + readModel, + projectsToArchive.map((project) => project.id), ); + const archiveEvents: Omit[] = [ + ...threadsToArchive.map((thread) => + createThreadDeletedEvent({ + command, + threadId: thread.id, + occurredAt: command.createdAt, + }), + ), + ...projectsToArchive.map((project) => + createProjectDeletedEvent({ + command, + projectId: project.id, + occurredAt: command.createdAt, + }), + ), + ]; return [...archiveEvents, projectCreatedEvent]; } @@ -146,19 +191,25 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" projectId: command.projectId, }); const occurredAt = nowIso(); - return { - ...withEventBase({ - aggregateKind: "project", - aggregateId: command.projectId, - occurredAt, - commandId: command.commandId, - }), - type: "project.deleted", - payload: { - projectId: command.projectId, - deletedAt: occurredAt, - }, - }; + const threadsToArchive = listActiveThreadsByProjectIds(readModel, [command.projectId]); + const projectDeletedEvent = createProjectDeletedEvent({ + command, + projectId: command.projectId, + occurredAt, + }); + if (threadsToArchive.length === 0) { + return projectDeletedEvent; + } + return [ + ...threadsToArchive.map((thread) => + createThreadDeletedEvent({ + command, + threadId: thread.id, + occurredAt, + }), + ), + projectDeletedEvent, + ]; } case "thread.create": { @@ -203,21 +254,11 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" } const archiveEvents: Omit[] = threadsToArchive.map((thread) => - Object.assign( - withEventBase({ - aggregateKind: "thread" as const, - aggregateId: thread.id, - occurredAt: command.createdAt, - commandId: command.commandId, - }), - { - type: "thread.deleted" as const, - payload: { - threadId: thread.id, - deletedAt: command.createdAt, - }, - }, - ), + createThreadDeletedEvent({ + command, + threadId: thread.id, + occurredAt: command.createdAt, + }), ); return [...archiveEvents, threadCreatedEvent]; } @@ -229,19 +270,11 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" threadId: command.threadId, }); const occurredAt = nowIso(); - return { - ...withEventBase({ - aggregateKind: "thread", - aggregateId: command.threadId, - occurredAt, - commandId: command.commandId, - }), - type: "thread.deleted", - payload: { - threadId: command.threadId, - deletedAt: occurredAt, - }, - }; + return createThreadDeletedEvent({ + command, + threadId: command.threadId, + occurredAt, + }); } case "thread.meta.update": { diff --git a/apps/server/src/orchestration/projector.test.ts b/apps/server/src/orchestration/projector.test.ts index 4c6402b97..2414ecbb9 100644 --- a/apps/server/src/orchestration/projector.test.ts +++ b/apps/server/src/orchestration/projector.test.ts @@ -1,8 +1,12 @@ import { + CheckpointRef, CommandId, EventId, + MessageId, ProjectId, ThreadId, + TurnId, + type OrchestrationReadModel, type OrchestrationEvent, } from "@okcode/contracts"; import { Effect } from "effect"; @@ -122,6 +126,121 @@ describe("orchestration projector", () => { ).rejects.toBeDefined(); }); + it("clears retained thread payload when a thread is deleted", async () => { + const now = new Date().toISOString(); + const model: OrchestrationReadModel = { + snapshotSequence: 0, + updatedAt: now, + projects: [], + threads: [ + { + id: ThreadId.makeUnsafe("thread-delete"), + projectId: ProjectId.makeUnsafe("project-delete"), + title: "Delete me", + model: "gpt-5-codex", + runtimeMode: "full-access", + interactionMode: "chat", + branch: null, + worktreePath: null, + latestTurn: { + turnId: TurnId.makeUnsafe("turn-delete"), + state: "completed", + requestedAt: now, + startedAt: now, + completedAt: now, + assistantMessageId: MessageId.makeUnsafe("assistant-delete"), + }, + createdAt: now, + updatedAt: now, + deletedAt: null, + messages: [ + { + id: MessageId.makeUnsafe("assistant-delete"), + role: "assistant", + text: "done", + turnId: TurnId.makeUnsafe("turn-delete"), + streaming: false, + createdAt: now, + updatedAt: now, + }, + ], + proposedPlans: [ + { + id: "plan-delete", + turnId: TurnId.makeUnsafe("turn-delete"), + planMarkdown: "1. Delete", + implementedAt: null, + implementationThreadId: null, + createdAt: now, + updatedAt: now, + }, + ], + activities: [ + { + id: EventId.makeUnsafe("activity-delete"), + tone: "tool", + kind: "tool.completed", + summary: "Deleted thread payload", + payload: { toolKind: "command" }, + turnId: TurnId.makeUnsafe("turn-delete"), + createdAt: now, + }, + ], + checkpoints: [ + { + turnId: TurnId.makeUnsafe("turn-delete"), + checkpointTurnCount: 1, + checkpointRef: CheckpointRef.makeUnsafe("refs/t3/checkpoints/thread-delete/turn/1"), + status: "ready", + files: [], + assistantMessageId: MessageId.makeUnsafe("assistant-delete"), + completedAt: now, + }, + ], + session: { + threadId: ThreadId.makeUnsafe("thread-delete"), + status: "ready", + providerName: "codex", + runtimeMode: "full-access", + activeTurnId: null, + lastError: null, + updatedAt: now, + }, + }, + ], + }; + + const next = await Effect.runPromise( + projectEvent( + model, + makeEvent({ + sequence: 1, + type: "thread.deleted", + aggregateKind: "thread", + aggregateId: "thread-delete", + occurredAt: now, + commandId: "cmd-thread-delete", + payload: { + threadId: "thread-delete", + deletedAt: now, + }, + }), + ), + ); + + expect(next.threads[0]).toMatchObject({ + id: "thread-delete", + latestTurn: null, + messages: [], + proposedPlans: [], + activities: [], + checkpoints: [], + session: null, + deletedAt: now, + updatedAt: now, + }); + }); + it("keeps projector forward-compatible for unhandled event types", async () => { const now = new Date().toISOString(); const model = createEmptyReadModel(now); diff --git a/apps/server/src/orchestration/projector.ts b/apps/server/src/orchestration/projector.ts index a88f32f42..70d377467 100644 --- a/apps/server/src/orchestration/projector.ts +++ b/apps/server/src/orchestration/projector.ts @@ -284,6 +284,12 @@ export function projectEvent( Effect.map((payload) => ({ ...nextBase, threads: updateThread(nextBase.threads, payload.threadId, { + latestTurn: null, + messages: [], + activities: [], + proposedPlans: [], + checkpoints: [], + session: null, deletedAt: payload.deletedAt, updatedAt: payload.deletedAt, }), diff --git a/packages/contracts/src/orchestration.ts b/packages/contracts/src/orchestration.ts index 2b7f24c30..7a6a8d250 100644 --- a/packages/contracts/src/orchestration.ts +++ b/packages/contracts/src/orchestration.ts @@ -100,15 +100,15 @@ export type ProviderUserInputAnswers = typeof ProviderUserInputAnswers.Type; /** * Maximum number of active (non-deleted) projects allowed. - * When this limit is reached, the oldest project is automatically archived - * (soft-deleted) to make room for the new one. + * When this limit is reached, the oldest project and its active threads are + * automatically archived to make room for the new one. */ export const MAX_PROJECTS = 50; /** * Maximum number of active (non-deleted) threads allowed per project. * When this limit is reached, the oldest thread in the project is - * automatically archived (soft-deleted) to make room for the new one. + * automatically archived to make room for the new one. */ export const MAX_THREADS_PER_PROJECT = 100;