diff --git a/apps/server/src/codexAppServerManager.test.ts b/apps/server/src/codexAppServerManager.test.ts index 2b5b9f7fd..bb6b6429f 100644 --- a/apps/server/src/codexAppServerManager.test.ts +++ b/apps/server/src/codexAppServerManager.test.ts @@ -199,6 +199,12 @@ describe("classifyCodexStderrLine", () => { expect(classifyCodexStderrLine(line)).toBeNull(); }); + it("ignores redacted AuthRequired transport shutdown noise", () => { + const line = + '2026-04-10T17:26:09.835934Z ERROR rmcp::transport::worker: worker quit with fatal: Transport channel closed, when AuthRequired(AuthRequiredError { www_authenticate_header: "Bearer [REDACTED]" })'; + expect(classifyCodexStderrLine(line)).toBeNull(); + }); + it("keeps unknown structured errors", () => { const line = "2026-02-08T04:24:20.085687Z ERROR codex_core::runtime: unrecoverable failure"; expect(classifyCodexStderrLine(line)).toEqual({ diff --git a/apps/server/src/codexAppServerManager.ts b/apps/server/src/codexAppServerManager.ts index d3c74866f..da1f3a706 100644 --- a/apps/server/src/codexAppServerManager.ts +++ b/apps/server/src/codexAppServerManager.ts @@ -162,7 +162,7 @@ const BENIGN_ERROR_LOG_SNIPPETS = [ "state db record_discrepancy: find_thread_path_by_id_str_in_subdir, falling_back", ]; const BENIGN_STDERR_MESSAGE_SNIPPETS = [ - 'worker quit with fatal: Transport channel closed, when AuthRequired(AuthRequiredError { www_authenticate_header: "Bearer error=\\"invalid_request\\", error_description=\\"No access token was provided in this request\\", resource_metadata=\\"https://mcp.supabase.com/.well-known/oauth-protected-resource/mcp\\""', + "worker quit with fatal: Transport channel closed, when AuthRequired(AuthRequiredError", ]; const RECOVERABLE_THREAD_RESUME_ERROR_SNIPPETS = [ "not found", diff --git a/apps/server/src/orchestration/Layers/ProjectionOverviewQuery.ts b/apps/server/src/orchestration/Layers/ProjectionOverviewQuery.ts index 0cf520523..a011df914 100644 --- a/apps/server/src/orchestration/Layers/ProjectionOverviewQuery.ts +++ b/apps/server/src/orchestration/Layers/ProjectionOverviewQuery.ts @@ -24,7 +24,10 @@ import { toPersistenceSqlError, type ProjectionRepositoryError, } from "../../persistence/Errors.ts"; -import { ProjectionOverviewQuery, type ProjectionOverviewQueryShape } from "../Services/ProjectionOverviewQuery.ts"; +import { + ProjectionOverviewQuery, + type ProjectionOverviewQueryShape, +} from "../Services/ProjectionOverviewQuery.ts"; import { ProjectionState } from "../../persistence/Services/ProjectionState.ts"; import { ProjectionProject } from "../../persistence/Services/ProjectionProjects.ts"; import { ProjectionThread } from "../../persistence/Services/ProjectionThreads.ts"; @@ -75,7 +78,9 @@ const ProjectionThreadPlanSummaryRow = Schema.Struct({ const ProjectionStateDbRowSchema = ProjectionState; const ProjectionThreadSessionDbRowSchema = ProjectionThreadSession; -function parseGithubRef(serialized: string | null): OrchestrationOverviewThread["githubRef"] | undefined { +function parseGithubRef( + serialized: string | null, +): OrchestrationOverviewThread["githubRef"] | undefined { if (!serialized) return undefined; try { return JSON.parse(serialized) as OrchestrationOverviewThread["githubRef"]; @@ -153,13 +158,14 @@ function hasActionablePlan( const matchingTurnPlan = latestTurnId === null ? null - : [...plans] + : ([...plans] .filter((plan) => plan.turnId === latestTurnId) .toSorted( (left, right) => - left.updatedAt.localeCompare(right.updatedAt) || left.planId.localeCompare(right.planId), + left.updatedAt.localeCompare(right.updatedAt) || + left.planId.localeCompare(right.planId), ) - .at(-1) ?? null; + .at(-1) ?? null); if (matchingTurnPlan) { return matchingTurnPlan.implementedAt === null; } @@ -315,109 +321,120 @@ const makeProjectionOverviewQuery = Effect.gen(function* () { }); const getOverview: ProjectionOverviewQueryShape["getOverview"] = () => - sql.withTransaction( - Effect.gen(function* () { - const [projectRows, threadRows, latestTurnRows, sessionRows, planRows, stateRows] = - yield* Effect.all([ - listProjectRows(undefined), - listThreadRows(undefined), - listLatestTurnRows(undefined), - listSessionRows(undefined), - listPlanRows(undefined), - listProjectionStateRows(undefined), - ]); + sql + .withTransaction( + Effect.gen(function* () { + const [projectRows, threadRows, latestTurnRows, sessionRows, planRows, stateRows] = + yield* Effect.all([ + listProjectRows(undefined), + listThreadRows(undefined), + listLatestTurnRows(undefined), + listSessionRows(undefined), + listPlanRows(undefined), + listProjectionStateRows(undefined), + ]); - const latestTurnByThread = new Map(); - for (const row of latestTurnRows) { - if (!latestTurnByThread.has(row.threadId)) { - latestTurnByThread.set(row.threadId, toLatestTurn(row)); + const latestTurnByThread = new Map(); + for (const row of latestTurnRows) { + if (!latestTurnByThread.has(row.threadId)) { + latestTurnByThread.set(row.threadId, toLatestTurn(row)); + } } - } - const sessionByThread = new Map(); - for (const row of sessionRows) { - sessionByThread.set(row.threadId, { - threadId: row.threadId, - status: row.status, - providerName: row.providerName, - runtimeMode: row.runtimeMode, - activeTurnId: row.activeTurnId, - lastError: row.lastError, - updatedAt: row.updatedAt, - }); - } - - const plansByThread = new Map>>(); - for (const row of planRows) { - const plans = plansByThread.get(row.threadId) ?? []; - plans.push(row); - plansByThread.set(row.threadId, plans); - } + const sessionByThread = new Map(); + for (const row of sessionRows) { + sessionByThread.set(row.threadId, { + threadId: row.threadId, + status: row.status, + providerName: row.providerName, + runtimeMode: row.runtimeMode, + activeTurnId: row.activeTurnId, + lastError: row.lastError, + updatedAt: row.updatedAt, + }); + } - const projects: OrchestrationOverviewProject[] = projectRows.map((row) => ({ - id: row.projectId, - title: row.title, - workspaceRoot: row.workspaceRoot, - defaultModel: row.defaultModel, - scripts: row.scripts, - activeThreadCount: row.activeThreadCount, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - })); + const plansByThread = new Map< + string, + Array> + >(); + for (const row of planRows) { + const plans = plansByThread.get(row.threadId) ?? []; + plans.push(row); + plansByThread.set(row.threadId, plans); + } - const threads: OrchestrationOverviewThread[] = threadRows.map((row) => { - const latestTurn = latestTurnByThread.get(row.threadId) ?? null; - return { - id: row.threadId, - projectId: row.projectId, + const projects: OrchestrationOverviewProject[] = projectRows.map((row) => ({ + id: row.projectId, title: row.title, - model: row.model, - runtimeMode: row.runtimeMode, - interactionMode: row.interactionMode, - branch: row.branch, - worktreePath: row.worktreePath, - ...(parseGithubRef(row.githubRef) ? { githubRef: parseGithubRef(row.githubRef) } : {}), - latestTurn, - session: sessionByThread.get(row.threadId) ?? null, + workspaceRoot: row.workspaceRoot, + defaultModel: row.defaultModel, + scripts: row.scripts, + activeThreadCount: row.activeThreadCount, createdAt: row.createdAt, updatedAt: row.updatedAt, - lastUserMessageAt: row.lastUserMessageAt, - pendingApprovalCount: row.pendingApprovalCount, - pendingUserInputCount: row.pendingUserInputCount, - hasActionablePlan: hasActionablePlan(plansByThread.get(row.threadId) ?? [], latestTurn), - }; - }); + })); - const updatedAtCandidates = [ - ...projectRows.map((row) => row.updatedAt), - ...threadRows.map((row) => row.updatedAt), - ...sessionRows.map((row) => row.updatedAt), - ...stateRows.map((row) => row.updatedAt), - ]; + const threads: OrchestrationOverviewThread[] = threadRows.map((row) => { + const latestTurn = latestTurnByThread.get(row.threadId) ?? null; + return { + id: row.threadId, + projectId: row.projectId, + title: row.title, + model: row.model, + runtimeMode: row.runtimeMode, + interactionMode: row.interactionMode, + branch: row.branch, + worktreePath: row.worktreePath, + ...(parseGithubRef(row.githubRef) + ? { githubRef: parseGithubRef(row.githubRef) } + : {}), + latestTurn, + session: sessionByThread.get(row.threadId) ?? null, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + lastUserMessageAt: row.lastUserMessageAt, + pendingApprovalCount: row.pendingApprovalCount, + pendingUserInputCount: row.pendingUserInputCount, + hasActionablePlan: hasActionablePlan( + plansByThread.get(row.threadId) ?? [], + latestTurn, + ), + }; + }); - return Schema.decodeUnknownSync(OrchestrationOverviewSnapshot)({ - snapshotSequence: computeSnapshotSequence(stateRows), - limits: { - maxProjects: MAX_PROJECTS, - maxThreadsPerProject: MAX_THREADS_PER_PROJECT, - }, - projects, - threads, - updatedAt: - updatedAtCandidates.sort((left, right) => (left < right ? 1 : left > right ? -1 : 0))[0] ?? - new Date(0).toISOString(), - }); - }), - ).pipe( - Effect.mapError((cause): ProjectionRepositoryError => { - if (Schema.isSchemaError(cause)) { - return toPersistenceDecodeError("ProjectionOverviewQuery.getOverview:decode")(cause); - } - return isPersistenceError(cause) - ? cause - : toPersistenceSqlError("ProjectionOverviewQuery.getOverview:query")(cause); - }), - ); + const updatedAtCandidates = [ + ...projectRows.map((row) => row.updatedAt), + ...threadRows.map((row) => row.updatedAt), + ...sessionRows.map((row) => row.updatedAt), + ...stateRows.map((row) => row.updatedAt), + ]; + + return Schema.decodeUnknownSync(OrchestrationOverviewSnapshot)({ + snapshotSequence: computeSnapshotSequence(stateRows), + limits: { + maxProjects: MAX_PROJECTS, + maxThreadsPerProject: MAX_THREADS_PER_PROJECT, + }, + projects, + threads, + updatedAt: + updatedAtCandidates.sort((left, right) => + left < right ? 1 : left > right ? -1 : 0, + )[0] ?? new Date(0).toISOString(), + }); + }), + ) + .pipe( + Effect.mapError((cause): ProjectionRepositoryError => { + if (Schema.isSchemaError(cause)) { + return toPersistenceDecodeError("ProjectionOverviewQuery.getOverview:decode")(cause); + } + return isPersistenceError(cause) + ? cause + : toPersistenceSqlError("ProjectionOverviewQuery.getOverview:query")(cause); + }), + ); return { getOverview } satisfies ProjectionOverviewQueryShape; }); diff --git a/apps/server/src/orchestration/Layers/ProjectionThreadDetailQuery.ts b/apps/server/src/orchestration/Layers/ProjectionThreadDetailQuery.ts index 4a763b596..4e55adfe9 100644 --- a/apps/server/src/orchestration/Layers/ProjectionThreadDetailQuery.ts +++ b/apps/server/src/orchestration/Layers/ProjectionThreadDetailQuery.ts @@ -31,7 +31,10 @@ import { ProjectionThreadMessage } from "../../persistence/Services/ProjectionTh import { ProjectionThreadProposedPlan } from "../../persistence/Services/ProjectionThreadProposedPlans.ts"; import { ProjectionThreadSession } from "../../persistence/Services/ProjectionThreadSessions.ts"; import { ProjectionThread } from "../../persistence/Services/ProjectionThreads.ts"; -import { ProjectionThreadDetailQuery, type ProjectionThreadDetailQueryShape } from "../Services/ProjectionThreadDetailQuery.ts"; +import { + ProjectionThreadDetailQuery, + type ProjectionThreadDetailQueryShape, +} from "../Services/ProjectionThreadDetailQuery.ts"; const ProjectionThreadMessageDbRowSchema = ProjectionThreadMessage.mapFields( Struct.assign({ @@ -259,15 +262,22 @@ const makeProjectionThreadDetailQuery = Effect.gen(function* () { }); const getThreadDetail: ProjectionThreadDetailQueryShape["getThreadDetail"] = (input) => - sql.withTransaction( - Effect.gen(function* () { - const threadRow = yield* getThreadRow(input); - if (Option.isNone(threadRow) || threadRow.value.deletedAt !== null) { - return null; - } + sql + .withTransaction( + Effect.gen(function* () { + const threadRow = yield* getThreadRow(input); + if (Option.isNone(threadRow) || threadRow.value.deletedAt !== null) { + return null; + } - const [messageRows, proposedPlanRows, activityRows, sessionRow, checkpointRows, latestTurnRows] = - yield* Effect.all([ + const [ + messageRows, + proposedPlanRows, + activityRows, + sessionRow, + checkpointRows, + latestTurnRows, + ] = yield* Effect.all([ listThreadMessageRows(input), listThreadProposedPlanRows(input), listThreadActivityRows(input), @@ -276,93 +286,96 @@ const makeProjectionThreadDetailQuery = Effect.gen(function* () { listLatestTurnRows(input), ]); - const messages: OrchestrationMessage[] = messageRows.map((row) => ({ - id: row.messageId, - role: row.role, - text: row.text, - ...(row.attachments !== null ? { attachments: row.attachments } : {}), - turnId: row.turnId, - streaming: row.isStreaming === 1, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - })); + const messages: OrchestrationMessage[] = messageRows.map((row) => ({ + id: row.messageId, + role: row.role, + text: row.text, + ...(row.attachments !== null ? { attachments: row.attachments } : {}), + turnId: row.turnId, + streaming: row.isStreaming === 1, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + })); - const proposedPlans: OrchestrationProposedPlan[] = proposedPlanRows.map((row) => ({ - id: row.planId, - turnId: row.turnId, - planMarkdown: row.planMarkdown, - implementedAt: row.implementedAt, - implementationThreadId: row.implementationThreadId, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - })); + const proposedPlans: OrchestrationProposedPlan[] = proposedPlanRows.map((row) => ({ + id: row.planId, + turnId: row.turnId, + planMarkdown: row.planMarkdown, + implementedAt: row.implementedAt, + implementationThreadId: row.implementationThreadId, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + })); - const activities: OrchestrationThreadActivity[] = activityRows.map((row) => ({ - id: row.activityId, - tone: row.tone, - kind: row.kind, - summary: row.summary, - payload: row.payload, - turnId: row.turnId, - ...(row.sequence !== null ? { sequence: row.sequence } : {}), - createdAt: row.createdAt, - })); + const activities: OrchestrationThreadActivity[] = activityRows.map((row) => ({ + id: row.activityId, + tone: row.tone, + kind: row.kind, + summary: row.summary, + payload: row.payload, + turnId: row.turnId, + ...(row.sequence !== null ? { sequence: row.sequence } : {}), + createdAt: row.createdAt, + })); - const checkpoints: OrchestrationCheckpointSummary[] = checkpointRows.map((row) => ({ - turnId: row.turnId, - checkpointTurnCount: row.checkpointTurnCount, - checkpointRef: row.checkpointRef, - status: row.status, - files: row.files, - assistantMessageId: row.assistantMessageId, - completedAt: row.completedAt, - })); + const checkpoints: OrchestrationCheckpointSummary[] = checkpointRows.map((row) => ({ + turnId: row.turnId, + checkpointTurnCount: row.checkpointTurnCount, + checkpointRef: row.checkpointRef, + status: row.status, + files: row.files, + assistantMessageId: row.assistantMessageId, + completedAt: row.completedAt, + })); - const session: OrchestrationSession | null = Option.isSome(sessionRow) - ? { - threadId: sessionRow.value.threadId, - status: sessionRow.value.status, - providerName: sessionRow.value.providerName, - runtimeMode: sessionRow.value.runtimeMode, - activeTurnId: sessionRow.value.activeTurnId, - lastError: sessionRow.value.lastError, - updatedAt: sessionRow.value.updatedAt, - } - : null; + const session: OrchestrationSession | null = Option.isSome(sessionRow) + ? { + threadId: sessionRow.value.threadId, + status: sessionRow.value.status, + providerName: sessionRow.value.providerName, + runtimeMode: sessionRow.value.runtimeMode, + activeTurnId: sessionRow.value.activeTurnId, + lastError: sessionRow.value.lastError, + updatedAt: sessionRow.value.updatedAt, + } + : null; - return Schema.decodeUnknownSync(OrchestrationThread)({ - id: threadRow.value.threadId, - projectId: threadRow.value.projectId, - title: threadRow.value.title, - model: threadRow.value.model, - runtimeMode: threadRow.value.runtimeMode, - interactionMode: threadRow.value.interactionMode, - branch: threadRow.value.branch, - worktreePath: threadRow.value.worktreePath, - ...(parseGithubRef(threadRow.value.githubRef) - ? { githubRef: parseGithubRef(threadRow.value.githubRef) } - : {}), - latestTurn: latestTurnRows[0] ? toLatestTurn(latestTurnRows[0]) : null, - createdAt: threadRow.value.createdAt, - updatedAt: threadRow.value.updatedAt, - deletedAt: null, - messages, - proposedPlans, - activities, - checkpoints, - session, - }); - }), - ).pipe( - Effect.mapError((cause): ProjectionRepositoryError => { - if (Schema.isSchemaError(cause)) { - return toPersistenceDecodeError("ProjectionThreadDetailQuery.getThreadDetail:decode")(cause); - } - return isPersistenceError(cause) - ? cause - : toPersistenceSqlError("ProjectionThreadDetailQuery.getThreadDetail:query")(cause); - }), - ); + return Schema.decodeUnknownSync(OrchestrationThread)({ + id: threadRow.value.threadId, + projectId: threadRow.value.projectId, + title: threadRow.value.title, + model: threadRow.value.model, + runtimeMode: threadRow.value.runtimeMode, + interactionMode: threadRow.value.interactionMode, + branch: threadRow.value.branch, + worktreePath: threadRow.value.worktreePath, + ...(parseGithubRef(threadRow.value.githubRef) + ? { githubRef: parseGithubRef(threadRow.value.githubRef) } + : {}), + latestTurn: latestTurnRows[0] ? toLatestTurn(latestTurnRows[0]) : null, + createdAt: threadRow.value.createdAt, + updatedAt: threadRow.value.updatedAt, + deletedAt: null, + messages, + proposedPlans, + activities, + checkpoints, + session, + }); + }), + ) + .pipe( + Effect.mapError((cause): ProjectionRepositoryError => { + if (Schema.isSchemaError(cause)) { + return toPersistenceDecodeError("ProjectionThreadDetailQuery.getThreadDetail:decode")( + cause, + ); + } + return isPersistenceError(cause) + ? cause + : toPersistenceSqlError("ProjectionThreadDetailQuery.getThreadDetail:query")(cause); + }), + ); return { getThreadDetail } satisfies ProjectionThreadDetailQueryShape; }); diff --git a/apps/server/src/orchestration/Services/ProjectionOverviewQuery.ts b/apps/server/src/orchestration/Services/ProjectionOverviewQuery.ts index 9cdfdef72..cdba3e72e 100644 --- a/apps/server/src/orchestration/Services/ProjectionOverviewQuery.ts +++ b/apps/server/src/orchestration/Services/ProjectionOverviewQuery.ts @@ -5,7 +5,10 @@ import type { Effect } from "effect"; import type { ProjectionRepositoryError } from "../../persistence/Errors.ts"; export interface ProjectionOverviewQueryShape { - readonly getOverview: () => Effect.Effect; + readonly getOverview: () => Effect.Effect< + OrchestrationOverviewSnapshot, + ProjectionRepositoryError + >; } export class ProjectionOverviewQuery extends ServiceMap.Service< diff --git a/apps/server/src/orchestration/Services/ProjectionThreadDetailQuery.ts b/apps/server/src/orchestration/Services/ProjectionThreadDetailQuery.ts index d20b28ccb..d9e382099 100644 --- a/apps/server/src/orchestration/Services/ProjectionThreadDetailQuery.ts +++ b/apps/server/src/orchestration/Services/ProjectionThreadDetailQuery.ts @@ -5,9 +5,9 @@ import type { Effect } from "effect"; import type { ProjectionRepositoryError } from "../../persistence/Errors.ts"; export interface ProjectionThreadDetailQueryShape { - readonly getThreadDetail: ( - input: { readonly threadId: ThreadId }, - ) => Effect.Effect; + readonly getThreadDetail: (input: { + readonly threadId: ThreadId; + }) => Effect.Effect; } export class ProjectionThreadDetailQuery extends ServiceMap.Service< diff --git a/apps/server/src/persistence/Services/ProjectionPendingUserInputs.ts b/apps/server/src/persistence/Services/ProjectionPendingUserInputs.ts index eab307ad7..7a1f1b72b 100644 --- a/apps/server/src/persistence/Services/ProjectionPendingUserInputs.ts +++ b/apps/server/src/persistence/Services/ProjectionPendingUserInputs.ts @@ -27,7 +27,8 @@ export type GetProjectionPendingUserInputInput = typeof GetProjectionPendingUser export const DeleteProjectionPendingUserInputInput = Schema.Struct({ requestId: ApprovalRequestId, }); -export type DeleteProjectionPendingUserInputInput = typeof DeleteProjectionPendingUserInputInput.Type; +export type DeleteProjectionPendingUserInputInput = + typeof DeleteProjectionPendingUserInputInput.Type; export interface ProjectionPendingUserInputRepositoryShape { readonly upsert: ( diff --git a/packages/contracts/src/ipc.ts b/packages/contracts/src/ipc.ts index 827181203..9e33b7c44 100644 --- a/packages/contracts/src/ipc.ts +++ b/packages/contracts/src/ipc.ts @@ -458,7 +458,9 @@ export interface NativeApi { }; orchestration: { getSnapshot: () => Promise; - getThreadDetail: (input: OrchestrationGetThreadDetailInput) => Promise; + getThreadDetail: ( + input: OrchestrationGetThreadDetailInput, + ) => Promise; dispatchCommand: (command: ClientOrchestrationCommand) => Promise<{ sequence: number }>; getTurnDiff: (input: OrchestrationGetTurnDiffInput) => Promise; getFullThreadDiff: ( diff --git a/packages/contracts/src/orchestration.ts b/packages/contracts/src/orchestration.ts index 2321bd529..e65d94448 100644 --- a/packages/contracts/src/orchestration.ts +++ b/packages/contracts/src/orchestration.ts @@ -765,11 +765,7 @@ export const ThreadCreatedPayload = Schema.Struct({ updatedAt: IsoDateTime, }); -export const ThreadDeletedReason = Schema.Literals([ - "manual", - "limit-eviction", - "project-deleted", -]); +export const ThreadDeletedReason = Schema.Literals(["manual", "limit-eviction", "project-deleted"]); export type ThreadDeletedReason = typeof ThreadDeletedReason.Type; export const ThreadDeletedPayload = Schema.Struct({