Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { Effect, Layer, Schema, Struct } from "effect";
import * as SqlClient from "effect/unstable/sql/SqlClient";
import * as SqlSchema from "effect/unstable/sql/SqlSchema";

import { createLogger } from "../../logger";
import {
isPersistenceError,
toPersistenceDecodeError,
Expand Down Expand Up @@ -140,6 +141,7 @@ function toPersistenceSqlOrDecodeError(sqlOperation: string, decodeOperation: st

const makeProjectionSnapshotQuery = Effect.gen(function* () {
const sql = yield* SqlClient.SqlClient;
const logger = createLogger("projection-snapshot");

const listProjectRows = SqlSchema.findAll({
Request: Schema.Void,
Expand Down Expand Up @@ -326,6 +328,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () {
sql
.withTransaction(
Effect.gen(function* () {
const startedAt = performance.now();
const [
projectRows,
threadRows,
Expand Down Expand Up @@ -584,11 +587,20 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () {
updatedAt: updatedAt ?? new Date(0).toISOString(),
};

return yield* decodeReadModel(snapshot).pipe(
const decodedSnapshot = yield* decodeReadModel(snapshot).pipe(
Effect.mapError(
toPersistenceDecodeError("ProjectionSnapshotQuery.getSnapshot:decodeReadModel"),
),
);
logger.info("built orchestration snapshot", {
durationMs: Math.round((performance.now() - startedAt) * 100) / 100,
projectCount: decodedSnapshot.projects.length,
threadCount: decodedSnapshot.threads.length,
messageCount: messageRows.length,
activityCount: activityRows.length,
checkpointCount: checkpointRows.length,
});
return decodedSnapshot;
}),
)
.pipe(
Expand Down
2 changes: 2 additions & 0 deletions apps/server/src/serverLayers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { makeEventNdjsonLogger } from "./provider/Layers/EventNdjsonLogger";
import { EnvironmentVariablesLive } from "./persistence/Services/EnvironmentVariables";

import { TerminalManagerLive } from "./terminal/Layers/Manager";
import { TerminalRuntimeEnvResolverLive } from "./terminal/Layers/RuntimeEnvResolver";
import { KeybindingsLive } from "./keybindings";
import { SkillServiceLive } from "./skills/SkillService";
import { GitManagerLive } from "./git/Layers/GitManager";
Expand Down Expand Up @@ -177,6 +178,7 @@ export function makeServerRuntimeServicesLayer() {
prReviewLayer,
githubLayer,
terminalLayer,
TerminalRuntimeEnvResolverLive,
KeybindingsLive,
SkillServiceLive,
smeChatLayer,
Expand Down
18 changes: 17 additions & 1 deletion apps/server/src/terminal/Layers/Manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
type TerminalOpenInput,
type TerminalRestartInput,
} from "@okcode/contracts";
import { afterEach, describe, expect, it } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";

import {
PtySpawnError,
Expand Down Expand Up @@ -381,6 +381,22 @@ describe("TerminalManager", () => {
manager.dispose();
});

it("skips repeated missing-history reads for brand-new sidecar terminals", async () => {
const { manager, logsDir } = makeManager();
const targetHistoryPath = multiTerminalHistoryLogPath(logsDir, "thread-1", "sidecar");
const readFileSpy = vi.spyOn(fs.promises, "readFile");

await manager.open(openInput({ terminalId: "sidecar" }));
await manager.close({ threadId: "thread-1", terminalId: "sidecar" });
await manager.open(openInput({ terminalId: "sidecar" }));

expect(
readFileSpy.mock.calls.filter(([filePath]) => String(filePath) === targetHistoryPath),
).toHaveLength(1);

manager.dispose();
});

it("emits exited event and reopens with clean transcript after exit", async () => {
const { manager, ptyAdapter, logsDir } = makeManager();
const events: TerminalEvent[] = [];
Expand Down
60 changes: 60 additions & 0 deletions apps/server/src/terminal/Layers/Manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ export class TerminalManagerRuntime extends EventEmitter<TerminalManagerEvents>
private readonly persistQueues = new Map<string, Promise<void>>();
private readonly persistTimers = new Map<string, ReturnType<typeof setTimeout>>();
private readonly pendingPersistHistory = new Map<string, string>();
private readonly historyKnownMissing = new Set<string>();
private readonly threadLocks = new Map<string, Promise<void>>();
private readonly persistDebounceMs: number;
private readonly subprocessChecker: TerminalSubprocessChecker;
Expand Down Expand Up @@ -759,6 +760,7 @@ export class TerminalManagerRuntime extends EventEmitter<TerminalManagerEvents>
}
this.killEscalationTimers.clear();
this.pendingPersistHistory.clear();
this.historyKnownMissing.clear();
this.threadLocks.clear();
this.persistQueues.clear();
}
Expand All @@ -781,6 +783,7 @@ export class TerminalManagerRuntime extends EventEmitter<TerminalManagerEvents>

let ptyProcess: PtyProcess | null = null;
let startedShell: string | null = null;
const spawnStartedAt = performance.now();
try {
const shellCandidates = resolveShellCandidates(this.shellResolver);
const terminalEnv = createTerminalSpawnEnv(process.env, session.runtimeEnv);
Expand Down Expand Up @@ -856,6 +859,13 @@ export class TerminalManagerRuntime extends EventEmitter<TerminalManagerEvents>
createdAt: new Date().toISOString(),
snapshot: this.snapshot(session),
});
this.logger.info("terminal session started", {
threadId: session.threadId,
terminalId: session.terminalId,
cwd: session.cwd,
durationMs: Math.round((performance.now() - spawnStartedAt) * 100) / 100,
...(startedShell ? { shell: startedShell } : {}),
});
} catch (error) {
if (ptyProcess) {
this.killProcessWithEscalation(ptyProcess, session.threadId, session.terminalId);
Expand All @@ -878,6 +888,7 @@ export class TerminalManagerRuntime extends EventEmitter<TerminalManagerEvents>
this.logger.error("failed to start terminal", {
threadId: session.threadId,
terminalId: session.terminalId,
durationMs: Math.round((performance.now() - spawnStartedAt) * 100) / 100,
error: message,
...(startedShell ? { shell: startedShell } : {}),
});
Expand Down Expand Up @@ -1014,6 +1025,7 @@ export class TerminalManagerRuntime extends EventEmitter<TerminalManagerEvents>
this.sessions.delete(key);
this.clearPersistTimer(session.threadId, session.terminalId);
this.pendingPersistHistory.delete(key);
this.historyKnownMissing.delete(key);
this.persistQueues.delete(key);
this.clearKillEscalationTimer(session.process);
}
Expand Down Expand Up @@ -1044,6 +1056,7 @@ export class TerminalManagerRuntime extends EventEmitter<TerminalManagerEvents>
const persistenceKey = toSessionKey(threadId, terminalId);
const task = async () => {
await fs.promises.writeFile(this.historyPath(threadId, terminalId), history, "utf8");
this.historyKnownMissing.delete(persistenceKey);
};
const previous = this.persistQueues.get(persistenceKey) ?? Promise.resolve();
const next = previous
Expand Down Expand Up @@ -1094,13 +1107,34 @@ export class TerminalManagerRuntime extends EventEmitter<TerminalManagerEvents>
}

private async readHistory(threadId: string, terminalId: string): Promise<string> {
const persistenceKey = toSessionKey(threadId, terminalId);
if (this.historyKnownMissing.has(persistenceKey)) {
this.logger.info("restored terminal history", {
threadId,
terminalId,
source: "missing-cache",
durationMs: 0,
bytes: 0,
});
return "";
}

const nextPath = this.historyPath(threadId, terminalId);
const startedAt = performance.now();
try {
const raw = await fs.promises.readFile(nextPath, "utf8");
const capped = capHistory(raw, this.historyLineLimit);
if (capped !== raw) {
await fs.promises.writeFile(nextPath, capped, "utf8");
}
this.historyKnownMissing.delete(persistenceKey);
this.logger.info("restored terminal history", {
threadId,
terminalId,
source: "current",
durationMs: Math.round((performance.now() - startedAt) * 100) / 100,
bytes: capped.length,
});
return capped;
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== "ENOENT") {
Expand All @@ -1109,6 +1143,14 @@ export class TerminalManagerRuntime extends EventEmitter<TerminalManagerEvents>
}

if (terminalId !== DEFAULT_TERMINAL_ID) {
this.historyKnownMissing.add(persistenceKey);
this.logger.info("restored terminal history", {
threadId,
terminalId,
source: "missing",
durationMs: Math.round((performance.now() - startedAt) * 100) / 100,
bytes: 0,
});
return "";
}

Expand All @@ -1128,22 +1170,40 @@ export class TerminalManagerRuntime extends EventEmitter<TerminalManagerEvents>
});
}

this.historyKnownMissing.delete(persistenceKey);
this.logger.info("restored terminal history", {
threadId,
terminalId,
source: "legacy",
durationMs: Math.round((performance.now() - startedAt) * 100) / 100,
bytes: capped.length,
});
return capped;
} catch (error) {
if ((error as NodeJS.ErrnoException).code === "ENOENT") {
this.historyKnownMissing.add(persistenceKey);
this.logger.info("restored terminal history", {
threadId,
terminalId,
source: "missing",
durationMs: Math.round((performance.now() - startedAt) * 100) / 100,
bytes: 0,
});
return "";
}
throw error;
}
}

private async deleteHistory(threadId: string, terminalId: string): Promise<void> {
const persistenceKey = toSessionKey(threadId, terminalId);
const deletions = [fs.promises.rm(this.historyPath(threadId, terminalId), { force: true })];
if (terminalId === DEFAULT_TERMINAL_ID) {
deletions.push(fs.promises.rm(this.legacyHistoryPath(threadId), { force: true }));
}
try {
await Promise.all(deletions);
this.historyKnownMissing.add(persistenceKey);
} catch (error) {
this.logger.warn("failed to delete terminal history", {
threadId,
Expand Down
16 changes: 16 additions & 0 deletions apps/server/src/terminal/Layers/RuntimeEnvResolver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { Layer } from "effect";

import { ProjectionThreadRepositoryLive } from "../../persistence/Layers/ProjectionThreads.ts";
import { EnvironmentVariablesLive } from "../../persistence/Services/EnvironmentVariables.ts";
import {
makeTerminalRuntimeEnvResolver,
TerminalRuntimeEnvResolver,
} from "../Services/RuntimeEnvResolver.ts";

export const TerminalRuntimeEnvResolverLive = Layer.effect(
TerminalRuntimeEnvResolver,
makeTerminalRuntimeEnvResolver,
).pipe(
Layer.provideMerge(ProjectionThreadRepositoryLive),
Layer.provideMerge(EnvironmentVariablesLive),
);
111 changes: 111 additions & 0 deletions apps/server/src/terminal/Services/RuntimeEnvResolver.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import { Effect, Option } from "effect";
import { describe, expect, it } from "vitest";

import {
EnvironmentVariables,
type EnvironmentVariablesShape,
} from "../../persistence/Services/EnvironmentVariables.ts";
import {
type ProjectionThread,
ProjectionThreadRepository,
type ProjectionThreadRepositoryShape,
} from "../../persistence/Services/ProjectionThreads.ts";
import { makeTerminalRuntimeEnvResolver } from "./RuntimeEnvResolver.ts";

const baseThread: ProjectionThread = {
threadId: "thread-1" as never,
projectId: "project-1" as never,
title: "Thread",
model: "gpt-5.4",
runtimeMode: "full-access",
interactionMode: "chat",
branch: null,
worktreePath: null,
githubRef: null,
latestTurnId: null,
createdAt: "2026-01-01T00:00:00.000Z",
updatedAt: "2026-01-01T00:00:00.000Z",
deletedAt: null,
};

describe("TerminalRuntimeEnvResolver", () => {
it("resolves project-scoped env for a live thread and lets extra env win", async () => {
const threadRepository: ProjectionThreadRepositoryShape = {
upsert: () => Effect.void,
getById: () => Effect.succeed(Option.some(baseThread)),
listByProjectId: () => Effect.succeed([]),
deleteById: () => Effect.void,
};
const environmentVariables: EnvironmentVariablesShape = {
getGlobal: () => Effect.succeed({ entries: [] }),
saveGlobal: () => Effect.succeed({ entries: [] }),
getProject: () => Effect.succeed({ projectId: "project-1" as never, entries: [] }),
saveProject: () => Effect.succeed({ projectId: "project-1" as never, entries: [] }),
resolveEnvironment: (input) =>
Effect.succeed(
input?.projectId
? { SHARED: "project", PROJECT_ONLY: input.projectId }
: { SHARED: "global", GLOBAL_ONLY: "1" },
),
};

const resolver = await Effect.runPromise(
makeTerminalRuntimeEnvResolver.pipe(
Effect.provideService(ProjectionThreadRepository, threadRepository),
Effect.provideService(EnvironmentVariables, environmentVariables),
),
);
const resolved = await Effect.runPromise(
resolver.resolve({
threadId: "thread-1" as never,
cwd: "/repo",
extraEnv: { SHARED: "extra", EXTRA_ONLY: "1" },
}),
);

expect(resolved).toEqual({
SHARED: "extra",
PROJECT_ONLY: "project-1",
EXTRA_ONLY: "1",
});
});

it("falls back to global env when the thread is missing or deleted", async () => {
const threadRepository: ProjectionThreadRepositoryShape = {
upsert: () => Effect.void,
getById: () =>
Effect.succeed(
Option.some({
...baseThread,
deletedAt: "2026-01-02T00:00:00.000Z",
}),
),
listByProjectId: () => Effect.succeed([]),
deleteById: () => Effect.void,
};
const environmentVariables: EnvironmentVariablesShape = {
getGlobal: () => Effect.succeed({ entries: [] }),
saveGlobal: () => Effect.succeed({ entries: [] }),
getProject: () => Effect.succeed({ projectId: "project-1" as never, entries: [] }),
saveProject: () => Effect.succeed({ projectId: "project-1" as never, entries: [] }),
resolveEnvironment: (input) =>
Effect.succeed(input?.projectId ? { PROJECT_ONLY: "1" } : { GLOBAL_ONLY: "1" }),
};

const resolver = await Effect.runPromise(
makeTerminalRuntimeEnvResolver.pipe(
Effect.provideService(ProjectionThreadRepository, threadRepository),
Effect.provideService(EnvironmentVariables, environmentVariables),
),
);
const resolved = await Effect.runPromise(
resolver.resolve({
threadId: "thread-1" as never,
}),
);

expect(resolved).toEqual({
GLOBAL_ONLY: "1",
});
});
});
Loading
Loading