From fa1c1b9f6ce62cab389403e46f2529c99f915b53 Mon Sep 17 00:00:00 2001 From: Val Alexander Date: Fri, 10 Apr 2026 11:27:16 -0500 Subject: [PATCH] Add pending user input projections and thread queries - project pending user inputs into persistence - add overview and thread detail projection queries - include pending user input counts in orchestration snapshot --- .../Layers/ProjectionOverviewQuery.ts | 428 ++++++++++++++++++ .../Layers/ProjectionPipeline.ts | 98 ++++ .../Layers/ProjectionThreadDetailQuery.ts | 373 +++++++++++++++ .../Services/ProjectionOverviewQuery.ts | 14 + .../Services/ProjectionThreadDetailQuery.ts | 16 + .../Layers/ProjectionPendingUserInputs.ts | 131 ++++++ .../021_ProjectionPendingUserInputs.ts | 22 + .../Services/ProjectionPendingUserInputs.ts | 52 +++ apps/server/src/serverLayers.ts | 8 + packages/contracts/src/ipc.ts | 7 +- packages/contracts/src/orchestration.ts | 77 +++- 11 files changed, 1223 insertions(+), 3 deletions(-) create mode 100644 apps/server/src/orchestration/Layers/ProjectionOverviewQuery.ts create mode 100644 apps/server/src/orchestration/Layers/ProjectionThreadDetailQuery.ts create mode 100644 apps/server/src/orchestration/Services/ProjectionOverviewQuery.ts create mode 100644 apps/server/src/orchestration/Services/ProjectionThreadDetailQuery.ts create mode 100644 apps/server/src/persistence/Layers/ProjectionPendingUserInputs.ts create mode 100644 apps/server/src/persistence/Migrations/021_ProjectionPendingUserInputs.ts create mode 100644 apps/server/src/persistence/Services/ProjectionPendingUserInputs.ts diff --git a/apps/server/src/orchestration/Layers/ProjectionOverviewQuery.ts b/apps/server/src/orchestration/Layers/ProjectionOverviewQuery.ts new file mode 100644 index 000000000..0cf520523 --- /dev/null +++ b/apps/server/src/orchestration/Layers/ProjectionOverviewQuery.ts @@ -0,0 +1,428 @@ +import { + IsoDateTime, + MAX_PROJECTS, + MAX_THREADS_PER_PROJECT, + MessageId, + NonNegativeInt, + OrchestrationLatestTurn, + OrchestrationOverviewSnapshot, + OrchestrationProposedPlanId, + OrchestrationSession, + ProjectScript, + ThreadId, + TurnId, + type OrchestrationOverviewProject, + type OrchestrationOverviewThread, +} from "@okcode/contracts"; +import { Effect, Layer, Schema } from "effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; +import * as SqlSchema from "effect/unstable/sql/SqlSchema"; + +import { + isPersistenceError, + toPersistenceDecodeError, + toPersistenceSqlError, + type ProjectionRepositoryError, +} from "../../persistence/Errors.ts"; +import { ProjectionOverviewQuery, type ProjectionOverviewQueryShape } from "../Services/ProjectionOverviewQuery.ts"; +import { ProjectionState } from "../../persistence/Services/ProjectionState.ts"; +import { ProjectionProject } from "../../persistence/Services/ProjectionProjects.ts"; +import { ProjectionThread } from "../../persistence/Services/ProjectionThreads.ts"; +import { ProjectionThreadProposedPlan } from "../../persistence/Services/ProjectionThreadProposedPlans.ts"; +import { ProjectionThreadSession } from "../../persistence/Services/ProjectionThreadSessions.ts"; +import { ORCHESTRATION_PROJECTOR_NAMES } from "./ProjectionPipeline.ts"; + +const ProjectionProjectOverviewRow = ProjectionProject.mapFields({ + scripts: Schema.fromJsonString(Schema.Array(ProjectScript)), +}).pipe( + Schema.extend( + Schema.Struct({ + activeThreadCount: NonNegativeInt, + }), + ), +); + +const ProjectionThreadOverviewRow = ProjectionThread.pipe( + Schema.extend( + Schema.Struct({ + lastUserMessageAt: Schema.NullOr(IsoDateTime), + pendingApprovalCount: NonNegativeInt, + pendingUserInputCount: NonNegativeInt, + }), + ), +); + +const ProjectionLatestTurnDbRowSchema = Schema.Struct({ + threadId: ProjectionThread.fields.threadId, + turnId: TurnId, + state: Schema.String, + requestedAt: IsoDateTime, + startedAt: Schema.NullOr(IsoDateTime), + completedAt: Schema.NullOr(IsoDateTime), + assistantMessageId: Schema.NullOr(MessageId), + sourceProposedPlanThreadId: Schema.NullOr(ThreadId), + sourceProposedPlanId: Schema.NullOr(OrchestrationProposedPlanId), +}); + +const ProjectionThreadPlanSummaryRow = Schema.Struct({ + planId: OrchestrationProposedPlanId, + threadId: ThreadId, + turnId: Schema.NullOr(TurnId), + implementedAt: Schema.NullOr(IsoDateTime), + updatedAt: IsoDateTime, +}); + +const ProjectionStateDbRowSchema = ProjectionState; +const ProjectionThreadSessionDbRowSchema = ProjectionThreadSession; + +function parseGithubRef(serialized: string | null): OrchestrationOverviewThread["githubRef"] | undefined { + if (!serialized) return undefined; + try { + return JSON.parse(serialized) as OrchestrationOverviewThread["githubRef"]; + } catch { + return undefined; + } +} + +const REQUIRED_OVERVIEW_PROJECTORS = [ + ORCHESTRATION_PROJECTOR_NAMES.projects, + ORCHESTRATION_PROJECTOR_NAMES.threads, + ORCHESTRATION_PROJECTOR_NAMES.threadMessages, + ORCHESTRATION_PROJECTOR_NAMES.threadProposedPlans, + ORCHESTRATION_PROJECTOR_NAMES.threadSessions, + ORCHESTRATION_PROJECTOR_NAMES.pendingApprovals, +] as const; + +function computeSnapshotSequence( + stateRows: ReadonlyArray>, +): number { + if (stateRows.length === 0) { + return 0; + } + const sequenceByProjector = new Map( + stateRows.map((row) => [row.projector, row.lastAppliedSequence] as const), + ); + + let minSequence = Number.POSITIVE_INFINITY; + for (const projector of REQUIRED_OVERVIEW_PROJECTORS) { + const sequence = sequenceByProjector.get(projector); + if (sequence === undefined) { + return 0; + } + if (sequence < minSequence) { + minSequence = sequence; + } + } + + return Number.isFinite(minSequence) ? minSequence : 0; +} + +function toLatestTurn( + row: Schema.Schema.Type, +): OrchestrationLatestTurn { + return { + turnId: row.turnId, + state: + row.state === "error" + ? "error" + : row.state === "interrupted" + ? "interrupted" + : row.state === "completed" + ? "completed" + : "running", + requestedAt: row.requestedAt, + startedAt: row.startedAt, + completedAt: row.completedAt, + assistantMessageId: row.assistantMessageId, + ...(row.sourceProposedPlanThreadId !== null && row.sourceProposedPlanId !== null + ? { + sourceProposedPlan: { + threadId: row.sourceProposedPlanThreadId, + planId: row.sourceProposedPlanId, + }, + } + : {}), + }; +} + +function hasActionablePlan( + plans: ReadonlyArray>, + latestTurn: OrchestrationLatestTurn | null, +): boolean { + const latestTurnId = latestTurn?.turnId ?? null; + const matchingTurnPlan = + latestTurnId === null + ? null + : [...plans] + .filter((plan) => plan.turnId === latestTurnId) + .toSorted( + (left, right) => + left.updatedAt.localeCompare(right.updatedAt) || left.planId.localeCompare(right.planId), + ) + .at(-1) ?? null; + if (matchingTurnPlan) { + return matchingTurnPlan.implementedAt === null; + } + const latestPlan = + [...plans] + .toSorted( + (left, right) => + left.updatedAt.localeCompare(right.updatedAt) || left.planId.localeCompare(right.planId), + ) + .at(-1) ?? null; + return latestPlan?.implementedAt === null; +} + +const makeProjectionOverviewQuery = Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + const listProjectRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionProjectOverviewRow, + execute: () => + sql` + SELECT + p.project_id AS "projectId", + p.title, + p.workspace_root AS "workspaceRoot", + p.default_model AS "defaultModel", + p.scripts_json AS "scripts", + p.created_at AS "createdAt", + p.updated_at AS "updatedAt", + p.deleted_at AS "deletedAt", + CAST(COUNT(t.thread_id) AS INTEGER) AS "activeThreadCount" + FROM projection_projects p + LEFT JOIN projection_threads t + ON t.project_id = p.project_id + AND t.deleted_at IS NULL + WHERE p.deleted_at IS NULL + GROUP BY p.project_id + ORDER BY p.created_at ASC, p.project_id ASC + `, + }); + + const listThreadRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionThreadOverviewRow, + execute: () => + sql` + SELECT + t.thread_id AS "threadId", + t.project_id AS "projectId", + t.title, + t.model, + t.runtime_mode AS "runtimeMode", + t.interaction_mode AS "interactionMode", + t.branch, + t.worktree_path AS "worktreePath", + t.github_ref AS "githubRef", + t.latest_turn_id AS "latestTurnId", + t.created_at AS "createdAt", + t.updated_at AS "updatedAt", + t.deleted_at AS "deletedAt", + ( + SELECT MAX(m.created_at) + FROM projection_thread_messages m + WHERE m.thread_id = t.thread_id + AND m.role = 'user' + ) AS "lastUserMessageAt", + CAST(( + SELECT COUNT(*) + FROM projection_pending_approvals pa + WHERE pa.thread_id = t.thread_id + AND pa.status = 'pending' + ) AS INTEGER) AS "pendingApprovalCount", + CAST(( + SELECT COUNT(*) + FROM projection_pending_user_inputs pui + WHERE pui.thread_id = t.thread_id + AND pui.status = 'pending' + ) AS INTEGER) AS "pendingUserInputCount" + FROM projection_threads t + WHERE t.deleted_at IS NULL + ORDER BY t.created_at ASC, t.thread_id ASC + `, + }); + + const listLatestTurnRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionLatestTurnDbRowSchema, + execute: () => + sql` + SELECT + thread_id AS "threadId", + turn_id AS "turnId", + state, + requested_at AS "requestedAt", + started_at AS "startedAt", + completed_at AS "completedAt", + assistant_message_id AS "assistantMessageId", + source_proposed_plan_thread_id AS "sourceProposedPlanThreadId", + source_proposed_plan_id AS "sourceProposedPlanId" + FROM projection_turns + WHERE turn_id IS NOT NULL + ORDER BY thread_id ASC, requested_at DESC, turn_id DESC + `, + }); + + const listSessionRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionThreadSessionDbRowSchema, + execute: () => + sql` + SELECT + thread_id AS "threadId", + status, + provider_name AS "providerName", + provider_session_id AS "providerSessionId", + provider_thread_id AS "providerThreadId", + runtime_mode AS "runtimeMode", + active_turn_id AS "activeTurnId", + last_error AS "lastError", + updated_at AS "updatedAt" + FROM projection_thread_sessions + ORDER BY thread_id ASC + `, + }); + + const listPlanRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionThreadPlanSummaryRow, + execute: () => + sql` + SELECT + plan_id AS "planId", + thread_id AS "threadId", + turn_id AS "turnId", + implemented_at AS "implementedAt", + updated_at AS "updatedAt" + FROM projection_thread_proposed_plans + ORDER BY thread_id ASC, updated_at ASC, plan_id ASC + `, + }); + + const listProjectionStateRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionStateDbRowSchema, + execute: () => + sql` + SELECT + projector, + last_applied_sequence AS "lastAppliedSequence", + updated_at AS "updatedAt" + FROM projection_state + `, + }); + + const getOverview: ProjectionOverviewQueryShape["getOverview"] = () => + sql.withTransaction( + Effect.gen(function* () { + const [projectRows, threadRows, latestTurnRows, sessionRows, planRows, stateRows] = + yield* Effect.all([ + listProjectRows(undefined), + listThreadRows(undefined), + listLatestTurnRows(undefined), + listSessionRows(undefined), + listPlanRows(undefined), + listProjectionStateRows(undefined), + ]); + + const latestTurnByThread = new Map(); + for (const row of latestTurnRows) { + if (!latestTurnByThread.has(row.threadId)) { + latestTurnByThread.set(row.threadId, toLatestTurn(row)); + } + } + + const sessionByThread = new Map(); + for (const row of sessionRows) { + sessionByThread.set(row.threadId, { + threadId: row.threadId, + status: row.status, + providerName: row.providerName, + runtimeMode: row.runtimeMode, + activeTurnId: row.activeTurnId, + lastError: row.lastError, + updatedAt: row.updatedAt, + }); + } + + const plansByThread = new Map>>(); + for (const row of planRows) { + const plans = plansByThread.get(row.threadId) ?? []; + plans.push(row); + plansByThread.set(row.threadId, plans); + } + + const projects: OrchestrationOverviewProject[] = projectRows.map((row) => ({ + id: row.projectId, + title: row.title, + workspaceRoot: row.workspaceRoot, + defaultModel: row.defaultModel, + scripts: row.scripts, + activeThreadCount: row.activeThreadCount, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + })); + + const threads: OrchestrationOverviewThread[] = threadRows.map((row) => { + const latestTurn = latestTurnByThread.get(row.threadId) ?? null; + return { + id: row.threadId, + projectId: row.projectId, + title: row.title, + model: row.model, + runtimeMode: row.runtimeMode, + interactionMode: row.interactionMode, + branch: row.branch, + worktreePath: row.worktreePath, + ...(parseGithubRef(row.githubRef) ? { githubRef: parseGithubRef(row.githubRef) } : {}), + latestTurn, + session: sessionByThread.get(row.threadId) ?? null, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + lastUserMessageAt: row.lastUserMessageAt, + pendingApprovalCount: row.pendingApprovalCount, + pendingUserInputCount: row.pendingUserInputCount, + hasActionablePlan: hasActionablePlan(plansByThread.get(row.threadId) ?? [], latestTurn), + }; + }); + + const updatedAtCandidates = [ + ...projectRows.map((row) => row.updatedAt), + ...threadRows.map((row) => row.updatedAt), + ...sessionRows.map((row) => row.updatedAt), + ...stateRows.map((row) => row.updatedAt), + ]; + + return Schema.decodeUnknownSync(OrchestrationOverviewSnapshot)({ + snapshotSequence: computeSnapshotSequence(stateRows), + limits: { + maxProjects: MAX_PROJECTS, + maxThreadsPerProject: MAX_THREADS_PER_PROJECT, + }, + projects, + threads, + updatedAt: + updatedAtCandidates.sort((left, right) => (left < right ? 1 : left > right ? -1 : 0))[0] ?? + new Date(0).toISOString(), + }); + }), + ).pipe( + Effect.mapError((cause): ProjectionRepositoryError => { + if (Schema.isSchemaError(cause)) { + return toPersistenceDecodeError("ProjectionOverviewQuery.getOverview:decode")(cause); + } + return isPersistenceError(cause) + ? cause + : toPersistenceSqlError("ProjectionOverviewQuery.getOverview:query")(cause); + }), + ); + + return { getOverview } satisfies ProjectionOverviewQueryShape; +}); + +export const OrchestrationProjectionOverviewQueryLive = Layer.effect( + ProjectionOverviewQuery, + makeProjectionOverviewQuery, +); diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index 1b9b3e8a2..3ce3b5b18 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -11,6 +11,7 @@ import * as SqlClient from "effect/unstable/sql/SqlClient"; import { toPersistenceSqlError, type ProjectionRepositoryError } from "../../persistence/Errors.ts"; import { OrchestrationEventStore } from "../../persistence/Services/OrchestrationEventStore.ts"; import { ProjectionPendingApprovalRepository } from "../../persistence/Services/ProjectionPendingApprovals.ts"; +import { ProjectionPendingUserInputRepository } from "../../persistence/Services/ProjectionPendingUserInputs.ts"; import { ProjectionProjectRepository } from "../../persistence/Services/ProjectionProjects.ts"; import { ProjectionStateRepository } from "../../persistence/Services/ProjectionState.ts"; import { ProjectionThreadActivityRepository } from "../../persistence/Services/ProjectionThreadActivities.ts"; @@ -30,6 +31,7 @@ import { } from "../../persistence/Services/ProjectionTurns.ts"; import { ProjectionThreadRepository } from "../../persistence/Services/ProjectionThreads.ts"; import { ProjectionPendingApprovalRepositoryLive } from "../../persistence/Layers/ProjectionPendingApprovals.ts"; +import { ProjectionPendingUserInputRepositoryLive } from "../../persistence/Layers/ProjectionPendingUserInputs.ts"; import { ProjectionProjectRepositoryLive } from "../../persistence/Layers/ProjectionProjects.ts"; import { ProjectionStateRepositoryLive } from "../../persistence/Layers/ProjectionState.ts"; import { ProjectionThreadActivityRepositoryLive } from "../../persistence/Layers/ProjectionThreadActivities.ts"; @@ -60,6 +62,7 @@ export const ORCHESTRATION_PROJECTOR_NAMES = { threadTurns: "projection.thread-turns", checkpoints: "projection.checkpoints", pendingApprovals: "projection.pending-approvals", + pendingUserInputs: "projection.pending-user-inputs", } as const; type ProjectorName = @@ -347,6 +350,7 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { const projectionThreadSessionRepository = yield* ProjectionThreadSessionRepository; const projectionTurnRepository = yield* ProjectionTurnRepository; const projectionPendingApprovalRepository = yield* ProjectionPendingApprovalRepository; + const projectionPendingUserInputRepository = yield* ProjectionPendingUserInputRepository; const fileSystem = yield* FileSystem.FileSystem; const path = yield* Path.Path; @@ -371,6 +375,18 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { }), { concurrency: 1 }, ).pipe(Effect.asVoid); + + const pendingUserInputs = yield* projectionPendingUserInputRepository.listByThreadId({ + threadId, + }); + yield* Effect.forEach( + pendingUserInputs, + (userInput) => + projectionPendingUserInputRepository.deleteByRequestId({ + requestId: userInput.requestId, + }), + { concurrency: 1 }, + ).pipe(Effect.asVoid); }); const applyProjectsProjection: ProjectorDefinition["apply"] = (event, _attachmentSideEffects) => @@ -1145,6 +1161,83 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { } }); + const applyPendingUserInputsProjection: ProjectorDefinition["apply"] = ( + event, + _attachmentSideEffects, + ) => + Effect.gen(function* () { + switch (event.type) { + case "thread.activity-appended": { + const requestId = + extractActivityRequestId(event.payload.activity.payload) ?? + event.metadata.requestId ?? + null; + if (requestId === null) { + return; + } + const existingRow = yield* projectionPendingUserInputRepository.getByRequestId({ + requestId, + }); + if (event.payload.activity.kind === "user-input.resolved") { + yield* projectionPendingUserInputRepository.upsert({ + requestId, + threadId: Option.isSome(existingRow) + ? existingRow.value.threadId + : event.payload.threadId, + turnId: Option.isSome(existingRow) + ? existingRow.value.turnId + : event.payload.activity.turnId, + status: "resolved", + createdAt: Option.isSome(existingRow) + ? existingRow.value.createdAt + : event.payload.activity.createdAt, + resolvedAt: event.payload.activity.createdAt, + }); + return; + } + if (event.payload.activity.kind !== "user-input.requested") { + return; + } + if (Option.isSome(existingRow) && existingRow.value.status === "resolved") { + return; + } + yield* projectionPendingUserInputRepository.upsert({ + requestId, + threadId: event.payload.threadId, + turnId: event.payload.activity.turnId, + status: "pending", + createdAt: Option.isSome(existingRow) + ? existingRow.value.createdAt + : event.payload.activity.createdAt, + resolvedAt: null, + }); + return; + } + + case "thread.user-input-response-requested": { + const existingRow = yield* projectionPendingUserInputRepository.getByRequestId({ + requestId: event.payload.requestId, + }); + yield* projectionPendingUserInputRepository.upsert({ + requestId: event.payload.requestId, + threadId: Option.isSome(existingRow) + ? existingRow.value.threadId + : event.payload.threadId, + turnId: Option.isSome(existingRow) ? existingRow.value.turnId : null, + status: "resolved", + createdAt: Option.isSome(existingRow) + ? existingRow.value.createdAt + : event.payload.createdAt, + resolvedAt: event.payload.createdAt, + }); + return; + } + + default: + return; + } + }); + const projectors: ReadonlyArray = [ { name: ORCHESTRATION_PROJECTOR_NAMES.projects, @@ -1178,6 +1271,10 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { name: ORCHESTRATION_PROJECTOR_NAMES.pendingApprovals, apply: applyPendingApprovalsProjection, }, + { + name: ORCHESTRATION_PROJECTOR_NAMES.pendingUserInputs, + apply: applyPendingUserInputsProjection, + }, { name: ORCHESTRATION_PROJECTOR_NAMES.threads, apply: applyThreadsProjection, @@ -1282,5 +1379,6 @@ export const OrchestrationProjectionPipelineLive = Layer.effect( Layer.provideMerge(ProjectionThreadSessionRepositoryLive), Layer.provideMerge(ProjectionTurnRepositoryLive), Layer.provideMerge(ProjectionPendingApprovalRepositoryLive), + Layer.provideMerge(ProjectionPendingUserInputRepositoryLive), Layer.provideMerge(ProjectionStateRepositoryLive), ); diff --git a/apps/server/src/orchestration/Layers/ProjectionThreadDetailQuery.ts b/apps/server/src/orchestration/Layers/ProjectionThreadDetailQuery.ts new file mode 100644 index 000000000..4a763b596 --- /dev/null +++ b/apps/server/src/orchestration/Layers/ProjectionThreadDetailQuery.ts @@ -0,0 +1,373 @@ +import { + ChatAttachment, + IsoDateTime, + MessageId, + NonNegativeInt, + OrchestrationCheckpointFile, + OrchestrationProposedPlanId, + OrchestrationThread, + ThreadId, + TurnId, + type OrchestrationCheckpointSummary, + type OrchestrationLatestTurn, + type OrchestrationMessage, + type OrchestrationProposedPlan, + type OrchestrationSession, + type OrchestrationThreadActivity, +} from "@okcode/contracts"; +import { Effect, Layer, Option, Schema, Struct } from "effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; +import * as SqlSchema from "effect/unstable/sql/SqlSchema"; + +import { + isPersistenceError, + toPersistenceDecodeError, + toPersistenceSqlError, + type ProjectionRepositoryError, +} from "../../persistence/Errors.ts"; +import { ProjectionCheckpoint } from "../../persistence/Services/ProjectionCheckpoints.ts"; +import { ProjectionThreadActivity } from "../../persistence/Services/ProjectionThreadActivities.ts"; +import { ProjectionThreadMessage } from "../../persistence/Services/ProjectionThreadMessages.ts"; +import { ProjectionThreadProposedPlan } from "../../persistence/Services/ProjectionThreadProposedPlans.ts"; +import { ProjectionThreadSession } from "../../persistence/Services/ProjectionThreadSessions.ts"; +import { ProjectionThread } from "../../persistence/Services/ProjectionThreads.ts"; +import { ProjectionThreadDetailQuery, type ProjectionThreadDetailQueryShape } from "../Services/ProjectionThreadDetailQuery.ts"; + +const ProjectionThreadMessageDbRowSchema = ProjectionThreadMessage.mapFields( + Struct.assign({ + isStreaming: Schema.Number, + attachments: Schema.NullOr(Schema.fromJsonString(Schema.Array(ChatAttachment))), + }), +); +const ProjectionThreadProposedPlanDbRowSchema = ProjectionThreadProposedPlan; +const ProjectionThreadDbRowSchema = ProjectionThread; +const ProjectionThreadActivityDbRowSchema = ProjectionThreadActivity.mapFields( + Struct.assign({ + payload: Schema.fromJsonString(Schema.Unknown), + sequence: Schema.NullOr(NonNegativeInt), + }), +); +const ProjectionThreadSessionDbRowSchema = ProjectionThreadSession; +const ProjectionCheckpointDbRowSchema = ProjectionCheckpoint.mapFields( + Struct.assign({ + files: Schema.fromJsonString(Schema.Array(OrchestrationCheckpointFile)), + }), +); +const ProjectionLatestTurnDbRowSchema = Schema.Struct({ + threadId: ProjectionThread.fields.threadId, + turnId: TurnId, + state: Schema.String, + requestedAt: IsoDateTime, + startedAt: Schema.NullOr(IsoDateTime), + completedAt: Schema.NullOr(IsoDateTime), + assistantMessageId: Schema.NullOr(MessageId), + sourceProposedPlanThreadId: Schema.NullOr(ThreadId), + sourceProposedPlanId: Schema.NullOr(OrchestrationProposedPlanId), +}); + +function parseGithubRef(serialized: string | null): OrchestrationThread["githubRef"] | undefined { + if (!serialized) return undefined; + try { + return JSON.parse(serialized) as OrchestrationThread["githubRef"]; + } catch { + return undefined; + } +} + +function toLatestTurn( + row: Schema.Schema.Type, +): OrchestrationLatestTurn { + return { + turnId: row.turnId, + state: + row.state === "error" + ? "error" + : row.state === "interrupted" + ? "interrupted" + : row.state === "completed" + ? "completed" + : "running", + requestedAt: row.requestedAt, + startedAt: row.startedAt, + completedAt: row.completedAt, + assistantMessageId: row.assistantMessageId, + ...(row.sourceProposedPlanThreadId !== null && row.sourceProposedPlanId !== null + ? { + sourceProposedPlan: { + threadId: row.sourceProposedPlanThreadId, + planId: row.sourceProposedPlanId, + }, + } + : {}), + }; +} + +const makeProjectionThreadDetailQuery = Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + const getThreadRow = SqlSchema.findOneOption({ + Request: Schema.Struct({ threadId: ThreadId }), + Result: ProjectionThreadDbRowSchema, + execute: ({ threadId }) => + sql` + SELECT + thread_id AS "threadId", + project_id AS "projectId", + title, + model, + runtime_mode AS "runtimeMode", + interaction_mode AS "interactionMode", + branch, + worktree_path AS "worktreePath", + github_ref AS "githubRef", + latest_turn_id AS "latestTurnId", + created_at AS "createdAt", + updated_at AS "updatedAt", + deleted_at AS "deletedAt" + FROM projection_threads + WHERE thread_id = ${threadId} + `, + }); + + const listThreadMessageRows = SqlSchema.findAll({ + Request: Schema.Struct({ threadId: ThreadId }), + Result: ProjectionThreadMessageDbRowSchema, + execute: ({ threadId }) => + sql` + SELECT + message_id AS "messageId", + thread_id AS "threadId", + turn_id AS "turnId", + role, + text, + attachments_json AS "attachments", + is_streaming AS "isStreaming", + created_at AS "createdAt", + updated_at AS "updatedAt" + FROM projection_thread_messages + WHERE thread_id = ${threadId} + ORDER BY created_at ASC, message_id ASC + `, + }); + + const listThreadProposedPlanRows = SqlSchema.findAll({ + Request: Schema.Struct({ threadId: ThreadId }), + Result: ProjectionThreadProposedPlanDbRowSchema, + execute: ({ threadId }) => + sql` + SELECT + plan_id AS "planId", + thread_id AS "threadId", + turn_id AS "turnId", + plan_markdown AS "planMarkdown", + implemented_at AS "implementedAt", + implementation_thread_id AS "implementationThreadId", + created_at AS "createdAt", + updated_at AS "updatedAt" + FROM projection_thread_proposed_plans + WHERE thread_id = ${threadId} + ORDER BY created_at ASC, plan_id ASC + `, + }); + + const listThreadActivityRows = SqlSchema.findAll({ + Request: Schema.Struct({ threadId: ThreadId }), + Result: ProjectionThreadActivityDbRowSchema, + execute: ({ threadId }) => + sql` + SELECT + activity_id AS "activityId", + thread_id AS "threadId", + turn_id AS "turnId", + tone, + kind, + summary, + payload_json AS "payload", + sequence, + created_at AS "createdAt" + FROM projection_thread_activities + WHERE thread_id = ${threadId} + ORDER BY + CASE WHEN sequence IS NULL THEN 0 ELSE 1 END ASC, + sequence ASC, + created_at ASC, + activity_id ASC + `, + }); + + const getThreadSessionRow = SqlSchema.findOneOption({ + Request: Schema.Struct({ threadId: ThreadId }), + Result: ProjectionThreadSessionDbRowSchema, + execute: ({ threadId }) => + sql` + SELECT + thread_id AS "threadId", + status, + provider_name AS "providerName", + provider_session_id AS "providerSessionId", + provider_thread_id AS "providerThreadId", + runtime_mode AS "runtimeMode", + active_turn_id AS "activeTurnId", + last_error AS "lastError", + updated_at AS "updatedAt" + FROM projection_thread_sessions + WHERE thread_id = ${threadId} + `, + }); + + const listCheckpointRows = SqlSchema.findAll({ + Request: Schema.Struct({ threadId: ThreadId }), + Result: ProjectionCheckpointDbRowSchema, + execute: ({ threadId }) => + sql` + SELECT + thread_id AS "threadId", + turn_id AS "turnId", + checkpoint_turn_count AS "checkpointTurnCount", + checkpoint_ref AS "checkpointRef", + checkpoint_status AS "status", + checkpoint_files_json AS "files", + assistant_message_id AS "assistantMessageId", + completed_at AS "completedAt" + FROM projection_turns + WHERE thread_id = ${threadId} + AND checkpoint_turn_count IS NOT NULL + ORDER BY checkpoint_turn_count ASC + `, + }); + + const listLatestTurnRows = SqlSchema.findAll({ + Request: Schema.Struct({ threadId: ThreadId }), + Result: ProjectionLatestTurnDbRowSchema, + execute: ({ threadId }) => + sql` + SELECT + thread_id AS "threadId", + turn_id AS "turnId", + state, + requested_at AS "requestedAt", + started_at AS "startedAt", + completed_at AS "completedAt", + assistant_message_id AS "assistantMessageId", + source_proposed_plan_thread_id AS "sourceProposedPlanThreadId", + source_proposed_plan_id AS "sourceProposedPlanId" + FROM projection_turns + WHERE thread_id = ${threadId} + AND turn_id IS NOT NULL + ORDER BY requested_at DESC, turn_id DESC + `, + }); + + const getThreadDetail: ProjectionThreadDetailQueryShape["getThreadDetail"] = (input) => + sql.withTransaction( + Effect.gen(function* () { + const threadRow = yield* getThreadRow(input); + if (Option.isNone(threadRow) || threadRow.value.deletedAt !== null) { + return null; + } + + const [messageRows, proposedPlanRows, activityRows, sessionRow, checkpointRows, latestTurnRows] = + yield* Effect.all([ + listThreadMessageRows(input), + listThreadProposedPlanRows(input), + listThreadActivityRows(input), + getThreadSessionRow(input), + listCheckpointRows(input), + listLatestTurnRows(input), + ]); + + const messages: OrchestrationMessage[] = messageRows.map((row) => ({ + id: row.messageId, + role: row.role, + text: row.text, + ...(row.attachments !== null ? { attachments: row.attachments } : {}), + turnId: row.turnId, + streaming: row.isStreaming === 1, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + })); + + const proposedPlans: OrchestrationProposedPlan[] = proposedPlanRows.map((row) => ({ + id: row.planId, + turnId: row.turnId, + planMarkdown: row.planMarkdown, + implementedAt: row.implementedAt, + implementationThreadId: row.implementationThreadId, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + })); + + const activities: OrchestrationThreadActivity[] = activityRows.map((row) => ({ + id: row.activityId, + tone: row.tone, + kind: row.kind, + summary: row.summary, + payload: row.payload, + turnId: row.turnId, + ...(row.sequence !== null ? { sequence: row.sequence } : {}), + createdAt: row.createdAt, + })); + + const checkpoints: OrchestrationCheckpointSummary[] = checkpointRows.map((row) => ({ + turnId: row.turnId, + checkpointTurnCount: row.checkpointTurnCount, + checkpointRef: row.checkpointRef, + status: row.status, + files: row.files, + assistantMessageId: row.assistantMessageId, + completedAt: row.completedAt, + })); + + const session: OrchestrationSession | null = Option.isSome(sessionRow) + ? { + threadId: sessionRow.value.threadId, + status: sessionRow.value.status, + providerName: sessionRow.value.providerName, + runtimeMode: sessionRow.value.runtimeMode, + activeTurnId: sessionRow.value.activeTurnId, + lastError: sessionRow.value.lastError, + updatedAt: sessionRow.value.updatedAt, + } + : null; + + return Schema.decodeUnknownSync(OrchestrationThread)({ + id: threadRow.value.threadId, + projectId: threadRow.value.projectId, + title: threadRow.value.title, + model: threadRow.value.model, + runtimeMode: threadRow.value.runtimeMode, + interactionMode: threadRow.value.interactionMode, + branch: threadRow.value.branch, + worktreePath: threadRow.value.worktreePath, + ...(parseGithubRef(threadRow.value.githubRef) + ? { githubRef: parseGithubRef(threadRow.value.githubRef) } + : {}), + latestTurn: latestTurnRows[0] ? toLatestTurn(latestTurnRows[0]) : null, + createdAt: threadRow.value.createdAt, + updatedAt: threadRow.value.updatedAt, + deletedAt: null, + messages, + proposedPlans, + activities, + checkpoints, + session, + }); + }), + ).pipe( + Effect.mapError((cause): ProjectionRepositoryError => { + if (Schema.isSchemaError(cause)) { + return toPersistenceDecodeError("ProjectionThreadDetailQuery.getThreadDetail:decode")(cause); + } + return isPersistenceError(cause) + ? cause + : toPersistenceSqlError("ProjectionThreadDetailQuery.getThreadDetail:query")(cause); + }), + ); + + return { getThreadDetail } satisfies ProjectionThreadDetailQueryShape; +}); + +export const OrchestrationProjectionThreadDetailQueryLive = Layer.effect( + ProjectionThreadDetailQuery, + makeProjectionThreadDetailQuery, +); diff --git a/apps/server/src/orchestration/Services/ProjectionOverviewQuery.ts b/apps/server/src/orchestration/Services/ProjectionOverviewQuery.ts new file mode 100644 index 000000000..9cdfdef72 --- /dev/null +++ b/apps/server/src/orchestration/Services/ProjectionOverviewQuery.ts @@ -0,0 +1,14 @@ +import type { OrchestrationOverviewSnapshot } from "@okcode/contracts"; +import { ServiceMap } from "effect"; +import type { Effect } from "effect"; + +import type { ProjectionRepositoryError } from "../../persistence/Errors.ts"; + +export interface ProjectionOverviewQueryShape { + readonly getOverview: () => Effect.Effect; +} + +export class ProjectionOverviewQuery extends ServiceMap.Service< + ProjectionOverviewQuery, + ProjectionOverviewQueryShape +>()("okcode/orchestration/Services/ProjectionOverviewQuery") {} diff --git a/apps/server/src/orchestration/Services/ProjectionThreadDetailQuery.ts b/apps/server/src/orchestration/Services/ProjectionThreadDetailQuery.ts new file mode 100644 index 000000000..d20b28ccb --- /dev/null +++ b/apps/server/src/orchestration/Services/ProjectionThreadDetailQuery.ts @@ -0,0 +1,16 @@ +import type { OrchestrationThread, ThreadId } from "@okcode/contracts"; +import { ServiceMap } from "effect"; +import type { Effect } from "effect"; + +import type { ProjectionRepositoryError } from "../../persistence/Errors.ts"; + +export interface ProjectionThreadDetailQueryShape { + readonly getThreadDetail: ( + input: { readonly threadId: ThreadId }, + ) => Effect.Effect; +} + +export class ProjectionThreadDetailQuery extends ServiceMap.Service< + ProjectionThreadDetailQuery, + ProjectionThreadDetailQueryShape +>()("okcode/orchestration/Services/ProjectionThreadDetailQuery") {} diff --git a/apps/server/src/persistence/Layers/ProjectionPendingUserInputs.ts b/apps/server/src/persistence/Layers/ProjectionPendingUserInputs.ts new file mode 100644 index 000000000..596a2a4fb --- /dev/null +++ b/apps/server/src/persistence/Layers/ProjectionPendingUserInputs.ts @@ -0,0 +1,131 @@ +import * as SqlClient from "effect/unstable/sql/SqlClient"; +import * as SqlSchema from "effect/unstable/sql/SqlSchema"; +import { Effect, Layer } from "effect"; + +import { toPersistenceSqlError } from "../Errors.ts"; +import { + DeleteProjectionPendingUserInputInput, + GetProjectionPendingUserInputInput, + ListProjectionPendingUserInputsInput, + ProjectionPendingUserInput, + ProjectionPendingUserInputRepository, + type ProjectionPendingUserInputRepositoryShape, +} from "../Services/ProjectionPendingUserInputs.ts"; + +const makeProjectionPendingUserInputRepository = Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + const upsertProjectionPendingUserInputRow = SqlSchema.void({ + Request: ProjectionPendingUserInput, + execute: (row) => + sql` + INSERT INTO projection_pending_user_inputs ( + request_id, + thread_id, + turn_id, + status, + created_at, + resolved_at + ) + VALUES ( + ${row.requestId}, + ${row.threadId}, + ${row.turnId}, + ${row.status}, + ${row.createdAt}, + ${row.resolvedAt} + ) + ON CONFLICT (request_id) + DO UPDATE SET + thread_id = excluded.thread_id, + turn_id = excluded.turn_id, + status = excluded.status, + created_at = excluded.created_at, + resolved_at = excluded.resolved_at + `, + }); + + const listProjectionPendingUserInputRows = SqlSchema.findAll({ + Request: ListProjectionPendingUserInputsInput, + Result: ProjectionPendingUserInput, + execute: ({ threadId }) => + sql` + SELECT + request_id AS "requestId", + thread_id AS "threadId", + turn_id AS "turnId", + status, + created_at AS "createdAt", + resolved_at AS "resolvedAt" + FROM projection_pending_user_inputs + WHERE thread_id = ${threadId} + ORDER BY created_at ASC, request_id ASC + `, + }); + + const getProjectionPendingUserInputRow = SqlSchema.findOneOption({ + Request: GetProjectionPendingUserInputInput, + Result: ProjectionPendingUserInput, + execute: ({ requestId }) => + sql` + SELECT + request_id AS "requestId", + thread_id AS "threadId", + turn_id AS "turnId", + status, + created_at AS "createdAt", + resolved_at AS "resolvedAt" + FROM projection_pending_user_inputs + WHERE request_id = ${requestId} + `, + }); + + const deleteProjectionPendingUserInputRow = SqlSchema.void({ + Request: DeleteProjectionPendingUserInputInput, + execute: ({ requestId }) => + sql` + DELETE FROM projection_pending_user_inputs + WHERE request_id = ${requestId} + `, + }); + + const upsert: ProjectionPendingUserInputRepositoryShape["upsert"] = (row) => + upsertProjectionPendingUserInputRow(row).pipe( + Effect.mapError(toPersistenceSqlError("ProjectionPendingUserInputRepository.upsert:query")), + ); + + const listByThreadId: ProjectionPendingUserInputRepositoryShape["listByThreadId"] = (input) => + listProjectionPendingUserInputRows(input).pipe( + Effect.mapError( + toPersistenceSqlError("ProjectionPendingUserInputRepository.listByThreadId:query"), + ), + ); + + const getByRequestId: ProjectionPendingUserInputRepositoryShape["getByRequestId"] = (input) => + getProjectionPendingUserInputRow(input).pipe( + Effect.mapError( + toPersistenceSqlError("ProjectionPendingUserInputRepository.getByRequestId:query"), + ), + ); + + const deleteByRequestId: ProjectionPendingUserInputRepositoryShape["deleteByRequestId"] = ( + input, + ) => + deleteProjectionPendingUserInputRow(input).pipe( + Effect.mapError( + toPersistenceSqlError("ProjectionPendingUserInputRepository.deleteByRequestId:query"), + ), + ); + + return { + upsert, + listByThreadId, + getByRequestId, + deleteByRequestId, + } satisfies ProjectionPendingUserInputRepositoryShape; +}); + +export const ProjectionPendingUserInputRepositoryLive = Layer.effect( + ProjectionPendingUserInputRepository, + makeProjectionPendingUserInputRepository, +); diff --git a/apps/server/src/persistence/Migrations/021_ProjectionPendingUserInputs.ts b/apps/server/src/persistence/Migrations/021_ProjectionPendingUserInputs.ts new file mode 100644 index 000000000..511d10a93 --- /dev/null +++ b/apps/server/src/persistence/Migrations/021_ProjectionPendingUserInputs.ts @@ -0,0 +1,22 @@ +import * as SqlClient from "effect/unstable/sql/SqlClient"; +import * as Effect from "effect/Effect"; + +export default Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + yield* sql` + CREATE TABLE IF NOT EXISTS projection_pending_user_inputs ( + request_id TEXT PRIMARY KEY, + thread_id TEXT NOT NULL, + turn_id TEXT, + status TEXT NOT NULL, + created_at TEXT NOT NULL, + resolved_at TEXT + ) + `; + + yield* sql` + CREATE INDEX IF NOT EXISTS idx_projection_pending_user_inputs_thread_status + ON projection_pending_user_inputs(thread_id, status) + `; +}); diff --git a/apps/server/src/persistence/Services/ProjectionPendingUserInputs.ts b/apps/server/src/persistence/Services/ProjectionPendingUserInputs.ts new file mode 100644 index 000000000..eab307ad7 --- /dev/null +++ b/apps/server/src/persistence/Services/ProjectionPendingUserInputs.ts @@ -0,0 +1,52 @@ +import { ApprovalRequestId, IsoDateTime, ThreadId, TurnId } from "@okcode/contracts"; +import { Option, Schema, ServiceMap } from "effect"; +import type { Effect } from "effect"; + +import type { ProjectionRepositoryError } from "../Errors.ts"; + +export const ProjectionPendingUserInput = Schema.Struct({ + requestId: ApprovalRequestId, + threadId: ThreadId, + turnId: Schema.NullOr(TurnId), + status: Schema.Literals(["pending", "resolved"]), + createdAt: IsoDateTime, + resolvedAt: Schema.NullOr(IsoDateTime), +}); +export type ProjectionPendingUserInput = typeof ProjectionPendingUserInput.Type; + +export const ListProjectionPendingUserInputsInput = Schema.Struct({ + threadId: ThreadId, +}); +export type ListProjectionPendingUserInputsInput = typeof ListProjectionPendingUserInputsInput.Type; + +export const GetProjectionPendingUserInputInput = Schema.Struct({ + requestId: ApprovalRequestId, +}); +export type GetProjectionPendingUserInputInput = typeof GetProjectionPendingUserInputInput.Type; + +export const DeleteProjectionPendingUserInputInput = Schema.Struct({ + requestId: ApprovalRequestId, +}); +export type DeleteProjectionPendingUserInputInput = typeof DeleteProjectionPendingUserInputInput.Type; + +export interface ProjectionPendingUserInputRepositoryShape { + readonly upsert: ( + row: ProjectionPendingUserInput, + ) => Effect.Effect; + readonly listByThreadId: ( + input: ListProjectionPendingUserInputsInput, + ) => Effect.Effect, ProjectionRepositoryError>; + readonly getByRequestId: ( + input: GetProjectionPendingUserInputInput, + ) => Effect.Effect, ProjectionRepositoryError>; + readonly deleteByRequestId: ( + input: DeleteProjectionPendingUserInputInput, + ) => Effect.Effect; +} + +export class ProjectionPendingUserInputRepository extends ServiceMap.Service< + ProjectionPendingUserInputRepository, + ProjectionPendingUserInputRepositoryShape +>()( + "okcode/persistence/Services/ProjectionPendingUserInputs/ProjectionPendingUserInputRepository", +) {} diff --git a/apps/server/src/serverLayers.ts b/apps/server/src/serverLayers.ts index b7ee3bb6c..13f9345de 100644 --- a/apps/server/src/serverLayers.ts +++ b/apps/server/src/serverLayers.ts @@ -14,6 +14,8 @@ import { OrchestrationReactorLive } from "./orchestration/Layers/OrchestrationRe import { ProviderCommandReactorLive } from "./orchestration/Layers/ProviderCommandReactor"; import { OrchestrationProjectionPipelineLive } from "./orchestration/Layers/ProjectionPipeline"; import { OrchestrationProjectionSnapshotQueryLive } from "./orchestration/Layers/ProjectionSnapshotQuery"; +import { OrchestrationProjectionOverviewQueryLive } from "./orchestration/Layers/ProjectionOverviewQuery"; +import { OrchestrationProjectionThreadDetailQueryLive } from "./orchestration/Layers/ProjectionThreadDetailQuery"; import { ProviderRuntimeIngestionLive } from "./orchestration/Layers/ProviderRuntimeIngestion"; import { RuntimeReceiptBusLive } from "./orchestration/Layers/RuntimeReceiptBus"; import { ProviderUnsupportedError } from "./provider/Errors"; @@ -85,12 +87,16 @@ export function makeServerProviderLayer(): Layer.Layer< ).pipe( Layer.provideMerge(EnvironmentVariablesLive), Layer.provideMerge(OrchestrationProjectionSnapshotQueryLive), + Layer.provideMerge(OrchestrationProjectionOverviewQueryLive), + Layer.provideMerge(OrchestrationProjectionThreadDetailQueryLive), ); const claudeAdapterLayer = makeClaudeAdapterLive( nativeEventLogger ? { nativeEventLogger } : undefined, ).pipe( Layer.provideMerge(EnvironmentVariablesLive), Layer.provideMerge(OrchestrationProjectionSnapshotQueryLive), + Layer.provideMerge(OrchestrationProjectionOverviewQueryLive), + Layer.provideMerge(OrchestrationProjectionThreadDetailQueryLive), ); const openclawAdapterLayer = makeOpenClawAdapterLive( nativeEventLogger ? { nativeEventLogger } : undefined, @@ -125,6 +131,8 @@ export function makeServerRuntimeServicesLayer() { const runtimeServicesLayer = Layer.empty.pipe( Layer.provideMerge(EnvironmentVariablesLive), Layer.provideMerge(OrchestrationProjectionSnapshotQueryLive), + Layer.provideMerge(OrchestrationProjectionOverviewQueryLive), + Layer.provideMerge(OrchestrationProjectionThreadDetailQueryLive), Layer.provideMerge(orchestrationLayer), Layer.provideMerge(checkpointStoreLayer), Layer.provideMerge(checkpointDiffQueryLayer), diff --git a/packages/contracts/src/ipc.ts b/packages/contracts/src/ipc.ts index cb10d73ed..827181203 100644 --- a/packages/contracts/src/ipc.ts +++ b/packages/contracts/src/ipc.ts @@ -116,10 +116,12 @@ import type { ClientOrchestrationCommand, OrchestrationGetFullThreadDiffInput, OrchestrationGetFullThreadDiffResult, + OrchestrationGetThreadDetailInput, OrchestrationGetTurnDiffInput, OrchestrationGetTurnDiffResult, OrchestrationEvent, - OrchestrationReadModel, + OrchestrationOverviewSnapshot, + OrchestrationThread, } from "./orchestration"; import type { SmeConversation, @@ -455,7 +457,8 @@ export interface NativeApi { testOpenclawGateway: (input: TestOpenclawGatewayInput) => Promise; }; orchestration: { - getSnapshot: () => Promise; + getSnapshot: () => Promise; + getThreadDetail: (input: OrchestrationGetThreadDetailInput) => Promise; dispatchCommand: (command: ClientOrchestrationCommand) => Promise<{ sequence: number }>; getTurnDiff: (input: OrchestrationGetTurnDiffInput) => Promise; getFullThreadDiff: ( diff --git a/packages/contracts/src/orchestration.ts b/packages/contracts/src/orchestration.ts index 7a6a8d250..2321bd529 100644 --- a/packages/contracts/src/orchestration.ts +++ b/packages/contracts/src/orchestration.ts @@ -18,6 +18,7 @@ import { GitHubRef } from "./github"; export const ORCHESTRATION_WS_METHODS = { getSnapshot: "orchestration.getSnapshot", + getThreadDetail: "orchestration.getThreadDetail", dispatchCommand: "orchestration.dispatchCommand", getTurnDiff: "orchestration.getTurnDiff", getFullThreadDiff: "orchestration.getFullThreadDiff", @@ -347,6 +348,56 @@ export const OrchestrationThread = Schema.Struct({ }); export type OrchestrationThread = typeof OrchestrationThread.Type; +export const OrchestrationLimits = Schema.Struct({ + maxProjects: NonNegativeInt, + maxThreadsPerProject: NonNegativeInt, +}); +export type OrchestrationLimits = typeof OrchestrationLimits.Type; + +export const OrchestrationOverviewProject = Schema.Struct({ + id: ProjectId, + title: TrimmedNonEmptyString, + workspaceRoot: TrimmedNonEmptyString, + defaultModel: Schema.NullOr(TrimmedNonEmptyString), + scripts: Schema.Array(ProjectScript), + activeThreadCount: NonNegativeInt, + createdAt: IsoDateTime, + updatedAt: IsoDateTime, +}); +export type OrchestrationOverviewProject = typeof OrchestrationOverviewProject.Type; + +export const OrchestrationOverviewThread = Schema.Struct({ + id: ThreadId, + projectId: ProjectId, + title: TrimmedNonEmptyString, + model: TrimmedNonEmptyString, + runtimeMode: RuntimeMode, + interactionMode: ProviderInteractionMode.pipe( + Schema.withDecodingDefault(() => DEFAULT_PROVIDER_INTERACTION_MODE), + ), + branch: Schema.NullOr(TrimmedNonEmptyString), + worktreePath: Schema.NullOr(TrimmedNonEmptyString), + githubRef: Schema.optional(GitHubRef), + latestTurn: Schema.NullOr(OrchestrationLatestTurn), + session: Schema.NullOr(OrchestrationSession), + createdAt: IsoDateTime, + updatedAt: IsoDateTime, + lastUserMessageAt: Schema.NullOr(IsoDateTime), + pendingApprovalCount: NonNegativeInt, + pendingUserInputCount: NonNegativeInt, + hasActionablePlan: Schema.Boolean, +}); +export type OrchestrationOverviewThread = typeof OrchestrationOverviewThread.Type; + +export const OrchestrationOverviewSnapshot = Schema.Struct({ + snapshotSequence: NonNegativeInt, + limits: OrchestrationLimits, + projects: Schema.Array(OrchestrationOverviewProject), + threads: Schema.Array(OrchestrationOverviewThread), + updatedAt: IsoDateTime, +}); +export type OrchestrationOverviewSnapshot = typeof OrchestrationOverviewSnapshot.Type; + export const OrchestrationReadModel = Schema.Struct({ snapshotSequence: NonNegativeInt, projects: Schema.Array(OrchestrationProject), @@ -689,9 +740,13 @@ export const ProjectMetaUpdatedPayload = Schema.Struct({ updatedAt: IsoDateTime, }); +export const ProjectDeletedReason = Schema.Literals(["manual", "limit-eviction"]); +export type ProjectDeletedReason = typeof ProjectDeletedReason.Type; + export const ProjectDeletedPayload = Schema.Struct({ projectId: ProjectId, deletedAt: IsoDateTime, + reason: ProjectDeletedReason, }); export const ThreadCreatedPayload = Schema.Struct({ @@ -710,9 +765,17 @@ export const ThreadCreatedPayload = Schema.Struct({ updatedAt: IsoDateTime, }); +export const ThreadDeletedReason = Schema.Literals([ + "manual", + "limit-eviction", + "project-deleted", +]); +export type ThreadDeletedReason = typeof ThreadDeletedReason.Type; + export const ThreadDeletedPayload = Schema.Struct({ threadId: ThreadId, deletedAt: IsoDateTime, + reason: ThreadDeletedReason, }); export const ThreadMetaUpdatedPayload = Schema.Struct({ @@ -1026,9 +1089,17 @@ export type DispatchResult = typeof DispatchResult.Type; export const OrchestrationGetSnapshotInput = Schema.Struct({}); export type OrchestrationGetSnapshotInput = typeof OrchestrationGetSnapshotInput.Type; -const OrchestrationGetSnapshotResult = OrchestrationReadModel; +const OrchestrationGetSnapshotResult = OrchestrationOverviewSnapshot; export type OrchestrationGetSnapshotResult = typeof OrchestrationGetSnapshotResult.Type; +export const OrchestrationGetThreadDetailInput = Schema.Struct({ + threadId: ThreadId, +}); +export type OrchestrationGetThreadDetailInput = typeof OrchestrationGetThreadDetailInput.Type; + +const OrchestrationGetThreadDetailResult = Schema.NullOr(OrchestrationThread); +export type OrchestrationGetThreadDetailResult = typeof OrchestrationGetThreadDetailResult.Type; + export const OrchestrationGetTurnDiffInput = TurnCountRange.mapFields( Struct.assign({ threadId: ThreadId, @@ -1066,6 +1137,10 @@ export const OrchestrationRpcSchemas = { input: OrchestrationGetSnapshotInput, output: OrchestrationGetSnapshotResult, }, + getThreadDetail: { + input: OrchestrationGetThreadDetailInput, + output: OrchestrationGetThreadDetailResult, + }, dispatchCommand: { input: ClientOrchestrationCommand, output: DispatchResult,