From 6f104271e82fe18be92ba675e44814919587ee46 Mon Sep 17 00:00:00 2001
From: Ytallo Layon
Date: Mon, 8 Jun 2026 15:18:07 -0300
Subject: [PATCH 1/2] refactor(harness): unify state leases, atomic approval mutations, typed state layer

Single-writer lease:
- Add shared runtime/lease.ts (nonce+ttl set-CAS over atomic state::update);
  session-lease and compaction-lease become thin (scope, ttl) adapters.
- Fix a false-win where a transient state-store outage (tolerant stateUpdate
  returning null) let every concurrent contender acquire the lease at once.

Approval-gate settings:
- Make mutations atomic and field-scoped (state::update set/append) to close
  the read-modify-write lost-update window; backfill partial records on read.
- Route the store through the shared createState wrapper; clear via
  state::delete instead of writing a null tombstone.

State layer typing:
- Make runtime/state.ts helpers generic (stateGet/stateSet/stateUpdate); drop
  the dead {value}-row unwrap and the unused stateListGroups.
- State is cleared on deploy, so trust types on read: drop
  parseTurnStateRecord, parseModelArray, and parseRunRequest in favor of typed
  stateGet. Keep runtime validation only at write boundaries (isModel) and
  shared scopes (llm-budget).
- Remove redundant indirection (scopedGet/Set passthroughs, optional ports
  store).
--- .../settings/add-always-allow.ts | 20 +- .../approval-gate/settings/approve-always.ts | 19 +- .../settings/remove-always-allow.ts | 14 +- .../src/approval-gate/settings/set-mode.ts | 11 +- harness/src/approval-gate/settings/store.ts | 76 ++++--- harness/src/context-compaction/lease.ts | 109 +++------ harness/src/models-catalog/state.ts | 24 +- harness/src/runtime/lease.ts | 80 +++++++ harness/src/runtime/state.ts | 71 ++---- harness/src/turn-orchestrator/run-request.ts | 33 ++- .../src/turn-orchestrator/run-transition.ts | 6 +- .../turn-orchestrator/state-runtime/ports.ts | 6 +- .../state-runtime/session-lease.ts | 72 ++---- .../turn-orchestrator/state-runtime/store.ts | 24 +- harness/src/turn-orchestrator/state.ts | 37 +--- harness/tests/approval-gate/settings.test.ts | 97 +++++++- .../integration/parallel-approval-harness.ts | 13 +- harness/tests/models-catalog/state.test.ts | 12 +- harness/tests/runtime/lease.test.ts | 207 ++++++++++++++++++ harness/tests/runtime/state-list.test.ts | 34 +-- .../parse-turn-state-record.test.ts | 35 --- .../turn-orchestrator/run-request.test.ts | 69 +----- .../turn-orchestrator/session-lease.test.ts | 86 ++++++++ 23 files changed, 681 insertions(+), 474 deletions(-) create mode 100644 harness/src/runtime/lease.ts create mode 100644 harness/tests/runtime/lease.test.ts delete mode 100644 harness/tests/turn-orchestrator/parse-turn-state-record.test.ts create mode 100644 harness/tests/turn-orchestrator/session-lease.test.ts diff --git a/harness/src/approval-gate/settings/add-always-allow.ts b/harness/src/approval-gate/settings/add-always-allow.ts index 7d0083ec..0dccb65a 100644 --- a/harness/src/approval-gate/settings/add-always-allow.ts +++ b/harness/src/approval-gate/settings/add-always-allow.ts @@ -5,7 +5,7 @@ import type { ApprovalSettings } from '../schemas.js'; import type { ISdk } from '../../runtime/iii.js'; import { type MutationReply, functionIdField, sessionIdField } from './types.js'; import { mutationError, ok } from 
'./reply.js'; -import { readSettings, writeSettings } from './store.js'; +import { readSettings, updateSettings } from './store.js'; const PayloadSchema = z.object({ session_id: sessionIdField, @@ -26,15 +26,15 @@ export async function addAlwaysAllow( if (current.always_allow.some((entry) => entry.function_id === function_id)) { return current; } - const next: ApprovalSettings = { - ...current, - always_allow: [ - ...current.always_allow, - { function_id, granted_at: Date.now(), granted_by: 'user_click' }, - ], - }; - await writeSettings(iii, session_id, next); - return next; + // Known race: the pre-read only narrows the duplicate-entry window; concurrent + // adds of the same id can both append. Harmless — matched/removed set-wise. + return updateSettings(iii, session_id, [ + { + type: 'append', + path: 'always_allow', + value: { function_id, granted_at: Date.now(), granted_by: 'user_click' }, + }, + ]); } export function registerAddAlwaysAllow(iii: ISdk): void { diff --git a/harness/src/approval-gate/settings/approve-always.ts b/harness/src/approval-gate/settings/approve-always.ts index 1b0219ee..f09de5ee 100644 --- a/harness/src/approval-gate/settings/approve-always.ts +++ b/harness/src/approval-gate/settings/approve-always.ts @@ -5,7 +5,7 @@ import type { ApprovalSettings } from '../schemas.js'; import type { ISdk } from '../../runtime/iii.js'; import { type MutationReply, functionIdField, sessionIdField } from './types.js'; import { mutationError, ok } from './reply.js'; -import { readSettings, writeSettings } from './store.js'; +import { readSettings, updateSettings } from './store.js'; const PayloadSchema = z.object({ session_id: sessionIdField, @@ -28,15 +28,14 @@ export async function approveAlways( if (current.approved_always.some((entry) => entry.function_id === function_id)) { return current; } - const next: ApprovalSettings = { - ...current, - approved_always: [ - ...current.approved_always, - { function_id, granted_at: Date.now(), granted_by: 
'user_click' }, - ], - }; - await writeSettings(iii, session_id, next); - return next; + // Known race: see addAlwaysAllow — pre-read only narrows the duplicate window. + return updateSettings(iii, session_id, [ + { + type: 'append', + path: 'approved_always', + value: { function_id, granted_at: Date.now(), granted_by: 'user_click' }, + }, + ]); } export function registerApproveAlways(iii: ISdk): void { diff --git a/harness/src/approval-gate/settings/remove-always-allow.ts b/harness/src/approval-gate/settings/remove-always-allow.ts index 2a6d3bff..24a26609 100644 --- a/harness/src/approval-gate/settings/remove-always-allow.ts +++ b/harness/src/approval-gate/settings/remove-always-allow.ts @@ -5,7 +5,7 @@ import type { ApprovalSettings } from '../schemas.js'; import type { ISdk } from '../../runtime/iii.js'; import { type MutationReply, functionIdField, sessionIdField } from './types.js'; import { mutationError, ok } from './reply.js'; -import { readSettings, writeSettings } from './store.js'; +import { readSettings, updateSettings } from './store.js'; const PayloadSchema = z.object({ session_id: sessionIdField, @@ -23,12 +23,12 @@ export async function removeAlwaysAllow( function_id: string, ): Promise { const current = await readSettings(iii, session_id); - const next: ApprovalSettings = { - ...current, - always_allow: current.always_allow.filter((entry) => entry.function_id !== function_id), - }; - await writeSettings(iii, session_id, next); - return next; + const always_allow = current.always_allow.filter((entry) => entry.function_id !== function_id); + // No array-element-remove op, so set just the always_allow field (not the whole + // record). Known race: concurrent add/remove on this field is last-writer-wins. 
+ return updateSettings(iii, session_id, [ + { type: 'set', path: 'always_allow', value: always_allow }, + ]); } export function registerRemoveAlwaysAllow(iii: ISdk): void { diff --git a/harness/src/approval-gate/settings/set-mode.ts b/harness/src/approval-gate/settings/set-mode.ts index 604b1b17..b4f61d4c 100644 --- a/harness/src/approval-gate/settings/set-mode.ts +++ b/harness/src/approval-gate/settings/set-mode.ts @@ -4,7 +4,7 @@ import { zodToJsonSchema } from 'zod-to-json-schema'; import { PermissionModeSchema, type ApprovalSettings, type PermissionMode } from '../schemas.js'; import type { ISdk } from '../../runtime/iii.js'; import { mutationError, ok } from './reply.js'; -import { readSettings, writeSettings } from './store.js'; +import { updateSettings } from './store.js'; import { type MutationReply, sessionIdField } from './types.js'; const PayloadSchema = z.object({ @@ -22,10 +22,11 @@ export async function setMode( session_id: string, mode: PermissionMode, ): Promise { - const current = await readSettings(iii, session_id); - const next: ApprovalSettings = { ...current, mode, mode_set_at: Date.now() }; - await writeSettings(iii, session_id, next); - return next; + // Field-scoped, no read: disjoint from add_always_allow/approve_always so both survive. 
+ return updateSettings(iii, session_id, [ + { type: 'set', path: 'mode', value: mode }, + { type: 'set', path: 'mode_set_at', value: Date.now() }, + ]); } export function registerSetMode(iii: ISdk): void { diff --git a/harness/src/approval-gate/settings/store.ts b/harness/src/approval-gate/settings/store.ts index 64ec2899..b7d6d582 100644 --- a/harness/src/approval-gate/settings/store.ts +++ b/harness/src/approval-gate/settings/store.ts @@ -5,50 +5,64 @@ import { type ApprovalSettings, } from '../schemas.js'; import type { ISdk } from '../../runtime/iii.js'; -import { logger } from '../../runtime/otel.js'; +import { createState, type UpdateOp } from '../../runtime/state.js'; import { getDefaultMode } from './default-mode.js'; -/** - * Default settings for a session that has none stored yet. The `mode` - * comes from the harness `permissions.default_mode` (manual | auto | full) - * rather than the hardcoded constant, so the operator-configured default - * applies to new sessions. - */ +// Reads degrade to defaults on failure (tolerant); writes rethrow so handlers +// can reply with mutationError (strict). +const tolerantState = (iii: ISdk) => createState(iii, { tolerant: true }); +const strictState = (iii: ISdk) => createState(iii, { tolerant: false }); + +/** Default settings; `mode` follows the operator-configured permissions.default_mode. */ function defaultSettings(): ApprovalSettings { return { ...DEFAULT_APPROVAL_SETTINGS, mode: getDefaultMode() }; } +/** + * Backfill missing fields from defaults, then validate. Field-scoped writes can + * persist a partial record (the first add_always_allow stores only + * `{ always_allow }`); merging over defaults keeps it valid on read. + */ +export function parseSettings(raw: unknown): ApprovalSettings { + const base = defaultSettings(); + const merged = + raw && typeof raw === 'object' && !Array.isArray(raw) + ? 
{ ...base, ...(raw as Record) } + : base; + const parsed = ApprovalSettingsSchema.safeParse(merged); + return parsed.success ? parsed.data : base; +} + export async function readSettings(iii: ISdk, session_id: string): Promise { - try { - const raw = await iii.trigger({ - function_id: 'state::get', - payload: { scope: SETTINGS_STATE_SCOPE, key: session_id }, - }); - const parsed = ApprovalSettingsSchema.safeParse(raw); - return parsed.success ? parsed.data : defaultSettings(); - } catch (err) { - logger.warn('approval-settings read failed; using defaults', { - session_id, - err: String(err), - }); - return defaultSettings(); - } + const raw = await tolerantState(iii).get({ + scope: SETTINGS_STATE_SCOPE, + key: session_id, + }); + return parseSettings(raw); } -export async function writeSettings( +/** + * Apply field-scoped ops atomically. Each op targets one field, so concurrent + * mutations of different fields compose under the engine's per-key write-lock + * instead of clobbering the whole record (the old read/write-whole lost update). + */ +export async function updateSettings( iii: ISdk, session_id: string, - settings: ApprovalSettings, -): Promise { - await iii.trigger({ - function_id: 'state::set', - payload: { scope: SETTINGS_STATE_SCOPE, key: session_id, value: settings }, + ops: UpdateOp[], +): Promise { + const result = await strictState(iii).update({ + scope: SETTINGS_STATE_SCOPE, + key: session_id, + ops, }); + if (result?.errors && result.errors.length > 0) { + throw new Error(`approval-settings update rejected: ${JSON.stringify(result.errors)}`); + } + return parseSettings(result?.new_value ?? null); } export async function clearSettings(iii: ISdk, session_id: string): Promise { - await iii.trigger({ - function_id: 'state::set', - payload: { scope: SETTINGS_STATE_SCOPE, key: session_id, value: null }, - }); + // Delete the key rather than write a null tombstone; reads default either way. 
+ await strictState(iii).delete({ scope: SETTINGS_STATE_SCOPE, key: session_id }); } diff --git a/harness/src/context-compaction/lease.ts b/harness/src/context-compaction/lease.ts index 4bcf49d6..cc41edf6 100644 --- a/harness/src/context-compaction/lease.ts +++ b/harness/src/context-compaction/lease.ts @@ -1,10 +1,16 @@ -// Single-writer lease via atomic state::update. The set-and-return-old -// semantics close the read-modify-write race: a concurrent writer sees -// our nonce in old_value and restores its predecessor. +/** + * Compaction/prune leases — thin (scope, ttl) adapters over the shared lease in + * runtime/lease.ts. Maps a LeaseKind to a state scope and the 300s TTL. + */ -import { randomUUID } from 'node:crypto'; import type { ISdk } from '../runtime/iii.js'; -import { stateGet, stateSet, stateUpdate } from '../runtime/state.js'; +import { + acquireLease as acquireLeaseShared, + acquireLeaseWithWait as acquireLeaseWithWaitShared, + mintLeaseNonce, + releaseLease as releaseLeaseShared, +} from '../runtime/lease.js'; +import { stateSet } from '../runtime/state.js'; const COMPACTION_LEASE_SCOPE = 'compaction_lease'; const PRUNE_LEASE_SCOPE = 'prune_lease'; @@ -13,30 +19,19 @@ const LAST_COMPACTION_AT_SCOPE = 'last_compaction_at'; export type LeaseKind = 'compaction' | 'prune'; export const LEASE_TTL_SECS = 300; +const LEASE_TTL_MS = LEASE_TTL_SECS * 1000; -let counter = 0; +export { mintLeaseNonce }; function leaseScope(kind: LeaseKind): string { return kind === 'compaction' ? COMPACTION_LEASE_SCOPE : PRUNE_LEASE_SCOPE; } -export function mintLeaseNonce(): string { - const pid = process.pid; - const nanos = process.hrtime.bigint().toString(); - const seq = counter++; - return `${pid}-${nanos}-${seq}-${randomUUID().slice(0, 8)}`; -} - +/** Epoch-secs of a `{nonce, ts}` lease claim; 0 for any other shape. 
*/ export function readLeaseTimestampSecs(v: unknown): number { if (!v || typeof v !== 'object') return 0; const ts = (v as Record).ts; - if (typeof ts === 'number') return Math.floor(ts / 1000); - return 0; -} - -function isLeaseActive(v: unknown, now_secs: number): boolean { - const ts_secs = readLeaseTimestampSecs(v); - return ts_secs > 0 && now_secs - ts_secs < LEASE_TTL_SECS; + return typeof ts === 'number' ? Math.floor(ts / 1000) : 0; } export async function acquireLease( @@ -44,66 +39,16 @@ export async function acquireLease( session_id: string, kind: LeaseKind = 'compaction', ): Promise { - const scope = leaseScope(kind); - const key = session_id; - const now_ms = Date.now(); - const now_secs = Math.floor(now_ms / 1000); - - // Fast path: skip the atomic set when a valid lease is clearly held. - const existing = await stateGet(iii, scope, key); - if (existing && isLeaseActive(existing, now_secs)) return null; - - const nonce = mintLeaseNonce(); - // path: '' targets FieldPath::root in the engine — set the whole value - // atomically. Without `path`, the engine fails to deserialize the op and - // stateUpdate falls into its catch + returns null. - const result = await stateUpdate(iii, scope, key, [ - { type: 'set', path: '', value: { nonce, ts: now_ms } }, - ]); - // stateUpdate swallows backend errors and returns null. Treat a null - // envelope as "we did NOT acquire the lease" — otherwise a transient - // state-store failure would let every concurrent caller declare itself - // the winner, defeating the whole point of the lease. - if (!result) return null; - const oldValue = result.old_value; - - // If we just overwrote a concurrent acquirer's active lease, restore it - // and bow out. stateUpdate is atomic, so only one caller can see - // old_value == null (or expired) — exactly one winner. 
- if (oldValue && isLeaseActive(oldValue, now_secs)) { - await stateSet(iii, scope, key, oldValue); - return null; - } - return nonce; + return acquireLeaseShared(iii, leaseScope(kind), session_id, LEASE_TTL_MS); } -// Known race: between the stateGet and the stateSet a new acquirer could -// claim the lease, and our stateSet(null) would clear it. Closing this -// needs a CAS-like primitive in iii-database (delete-if-matches). In -// practice this is bounded by LEASE_TTL_SECS (300s) >> the summariser's -// own timeout (120s), so a duplicate claim resolves itself via TTL. export async function releaseLease( iii: ISdk, session_id: string, ourNonce: string, kind: LeaseKind = 'compaction', ): Promise { - const scope = leaseScope(kind); - const key = session_id; - const stored = await stateGet(iii, scope, key); - const storedNonce = - stored && - typeof stored === 'object' && - typeof (stored as Record).nonce === 'string' - ? ((stored as Record).nonce as string) - : null; - if (storedNonce === ourNonce) { - await stateSet(iii, scope, key, null); - } -} - -export async function stampLastCompaction(iii: ISdk, session_id: string): Promise { - await stateSet(iii, LAST_COMPACTION_AT_SCOPE, session_id, Date.now()); + await releaseLeaseShared(iii, leaseScope(kind), session_id, ourNonce); } export async function acquireLeaseWithWait( @@ -112,13 +57,15 @@ export async function acquireLeaseWithWait( kind: LeaseKind, totalTimeoutMs: number, ): Promise { - const deadline = Date.now() + totalTimeoutMs; - let backoff = 50; - while (true) { - const got = await acquireLease(iii, session_id, kind); - if (got) return got; - if (Date.now() + backoff > deadline) return null; - await new Promise((r) => setTimeout(r, backoff)); - backoff = Math.min(backoff * 2, 500); - } + return acquireLeaseWithWaitShared( + iii, + leaseScope(kind), + session_id, + LEASE_TTL_MS, + totalTimeoutMs, + ); +} + +export async function stampLastCompaction(iii: ISdk, session_id: string): Promise { + await 
stateSet(iii, LAST_COMPACTION_AT_SCOPE, session_id, Date.now()); } diff --git a/harness/src/models-catalog/state.ts b/harness/src/models-catalog/state.ts index 7ca97765..46b63083 100644 --- a/harness/src/models-catalog/state.ts +++ b/harness/src/models-catalog/state.ts @@ -1,31 +1,24 @@ /** - * State-backed reads for the models catalog. Each provider owns one key in - * scope `models` whose value is a `Model[]`. Discovery writes via - * `models::reconcile` (single state set per provider). + * State-backed reads for the models catalog: scope `models`, one Model[] per + * provider key. `models::reconcile` validates entries via isModel before writing + * and state is cleared on deploy, so reads trust the stored shape (no re-parsing). */ import type { ISdk } from '../runtime/iii.js'; import { stateGet, stateListValues } from '../runtime/state.js'; import { type ListFilter, MODELS_SCOPE, type Model, supportsModel } from './types.js'; -/** State key for a provider's catalog array (scope `models`, key = provider id). */ export function providerStateKey(provider: string): string { return provider; } +/** Write-side boundary guard for provider-discovery output (used by models::reconcile). */ export function isModel(v: unknown): v is Model { return Boolean(v && typeof v === 'object' && typeof (v as Model).id === 'string'); } -/** Parse a stored catalog value; only `Model[]` is valid. */ -export function parseModelArray(v: unknown): Model[] { - if (!Array.isArray(v)) return []; - return v.filter(isModel); -} - export async function getProviderModels(iii: ISdk, provider: string): Promise { - const v = await stateGet(iii, MODELS_SCOPE, providerStateKey(provider)); - return parseModelArray(v); + return (await stateGet(iii, MODELS_SCOPE, providerStateKey(provider))) ?? 
[]; } export async function listFromState(iii: ISdk, filter: ListFilter): Promise { @@ -36,11 +29,8 @@ export async function listFromState(iii: ISdk, filter: ListFilter): Promise supportsModel(m, filter.capability!)); } - const entries = await stateListValues(iii, { scope: MODELS_SCOPE }); - const out: Model[] = []; - for (const entry of entries) { - out.push(...parseModelArray(entry)); - } + // Each provider key stores one Model[]; flatten across providers. + const out = (await stateListValues(iii, { scope: MODELS_SCOPE })).flat(); return filter.capability === undefined ? out : out.filter((m) => supportsModel(m, filter.capability!)); diff --git a/harness/src/runtime/lease.ts b/harness/src/runtime/lease.ts new file mode 100644 index 00000000..4e4676c3 --- /dev/null +++ b/harness/src/runtime/lease.ts @@ -0,0 +1,80 @@ +/** + * Single-writer lease over `state::update` — the only atomic read-modify-write + * iii exposes (there's no native lock). Acquire writes a `{nonce, ts}` claim and + * reads the prior value in one atomic op, so exactly one concurrent acquirer + * sees a free/expired prior and wins; a claim older than `ttlMs` reads inactive, + * folding crash recovery into the same op. session-lease and + * context-compaction/lease are thin (scope, ttl) adapters. + */ + +import { randomUUID } from 'node:crypto'; +import type { ISdk } from './iii.js'; +import { stateGet, stateSet, stateUpdate } from './state.js'; + +export type LeaseRecord = { nonce: string; ts: number }; + +let counter = 0; + +export function mintLeaseNonce(): string { + return `${process.pid}-${process.hrtime.bigint()}-${counter++}-${randomUUID().slice(0, 8)}`; +} + +function isLeaseActive(rec: LeaseRecord | null | undefined, nowMs: number, ttlMs: number): boolean { + return rec != null && nowMs - rec.ts < ttlMs; +} + +/** Returns the nonce to release with, or null if another holder owns a valid lease. 
*/ +export async function acquireLease( + iii: ISdk, + scope: string, + key: string, + ttlMs: number, +): Promise { + const now = Date.now(); + if (isLeaseActive(await stateGet(iii, scope, key), now, ttlMs)) return null; + + const nonce = mintLeaseNonce(); + const result = await stateUpdate(iii, scope, key, [ + { type: 'set', path: '', value: { nonce, ts: now } }, + ]); + // A null envelope means the atomic write failed — never treat that as a win, + // or an outage lets every contender through. + if (!result) return null; + + // We clobbered a still-valid claim (always someone else's — our nonce is + // fresh): restore it and bow out. + if (isLeaseActive(result.old_value, now, ttlMs)) { + await stateSet(iii, scope, key, result.old_value); + return null; + } + return nonce; +} + +export async function releaseLease( + iii: ISdk, + scope: string, + key: string, + nonce: string, +): Promise { + const stored = await stateGet(iii, scope, key); + if (stored?.nonce === nonce) await stateSet(iii, scope, key, null); +} + +/** Poll {@link acquireLease} with exponential backoff until acquired or timed out. 
*/ +export async function acquireLeaseWithWait( + iii: ISdk, + scope: string, + key: string, + ttlMs: number, + totalTimeoutMs: number, +): Promise { + const deadline = Date.now() + totalTimeoutMs; + let backoff = 50; + while (true) { + const nonce = await acquireLease(iii, scope, key, ttlMs); + if (nonce) return nonce; + if (Date.now() + backoff > deadline) return null; + await new Promise((r) => setTimeout(r, backoff)); + backoff = Math.min(backoff * 2, 500); + } +} diff --git a/harness/src/runtime/state.ts b/harness/src/runtime/state.ts index 01762249..7e160292 100644 --- a/harness/src/runtime/state.ts +++ b/harness/src/runtime/state.ts @@ -40,38 +40,11 @@ export type CreateStateOptions = { tolerant?: boolean; }; -type StateListGroupsResult = { groups: string[] }; - function normalizeGetResult(v: unknown): T | null { if (v === null || v === undefined) return null; return v as T; } -/** Raw list rows before value unwrap; `null` when the response is not a list. */ -export function stateListResponseRows(response: unknown): unknown[] | null { - if (Array.isArray(response)) return response; - return null; -} - -function unwrapStateListEntry(entry: unknown): T { - if (entry && typeof entry === 'object' && 'value' in (entry as Record)) { - return (entry as Record).value as T; - } - return entry as T; -} - -/** - * Normalizes a `state::list` trigger result to stored values. - * - * Official iii returns a flat `T[]`. Some bridge deployments wrap rows as - * `{ value }`; we accept that shape for compatibility. 
- */ -export function parseStateListValues(response: unknown): T[] { - const arr = stateListResponseRows(response); - if (!arr) return []; - return arr.map((entry) => unwrapStateListEntry(entry)); -} - export function createState(iii: ISdk, opts: CreateStateOptions = {}): IState { const tolerant = opts.tolerant !== false; @@ -140,11 +113,12 @@ export function createState(iii: ISdk, opts: CreateStateOptions = {}): IState { 'state::list', { scope: input.scope }, async () => { + // state::list returns a flat array of stored values. const resp = await iii.trigger({ function_id: 'state::list', payload: input, }); - return parseStateListValues(resp); + return Array.isArray(resp) ? (resp as TData[]) : []; }, [], ), @@ -165,40 +139,25 @@ export function createState(iii: ISdk, opts: CreateStateOptions = {}): IState { }; } -/** Lists all scope names that contain state data. */ -export async function stateListGroups(iii: ISdk, opts: CreateStateOptions = {}): Promise { - const tolerant = opts.tolerant !== false; - try { - const result = await iii.trigger, StateListGroupsResult | string[]>({ - function_id: 'state::list_groups', - payload: {}, - }); - if (Array.isArray(result)) return result; - return result?.groups ?? 
[]; - } catch (err) { - if (tolerant) { - logger.warn('state::list_groups failed', { err: String(err) }); - return []; - } - throw err; - } -} - // --- Tolerant (scope, key) ergonomics for turn-orchestrator --- const tolerantState = (iii: ISdk) => createState(iii, { tolerant: true }); -export async function stateGet(iii: ISdk, scope: string, key: string): Promise { - return tolerantState(iii).get({ scope, key }); +export async function stateGet( + iii: ISdk, + scope: string, + key: string, +): Promise { + return tolerantState(iii).get({ scope, key }); } -export async function stateSet( +export async function stateSet( iii: ISdk, scope: string, key: string, - value: unknown, -): Promise | null> { - return tolerantState(iii).set({ scope, key, value }); + value: T, +): Promise | null> { + return tolerantState(iii).set({ scope, key, value }); } export async function stateDelete(iii: ISdk, scope: string, key: string): Promise { @@ -209,11 +168,11 @@ export async function stateListValues(iii: ISdk, input: StateListInput): Prom return tolerantState(iii).list(input); } -export async function stateUpdate( +export async function stateUpdate( iii: ISdk, scope: string, key: string, ops: UpdateOp[], -): Promise | null> { - return tolerantState(iii).update({ scope, key, ops }); +): Promise | null> { + return tolerantState(iii).update({ scope, key, ops }); } diff --git a/harness/src/turn-orchestrator/run-request.ts b/harness/src/turn-orchestrator/run-request.ts index 04d8b868..225b4660 100644 --- a/harness/src/turn-orchestrator/run-request.ts +++ b/harness/src/turn-orchestrator/run-request.ts @@ -1,27 +1,22 @@ /** - * The persisted run request and its single typed parser. `loadRunRequest` - * (persistence) parses the raw scope `run_request` value through - * `parseRunRequest` once, so every consumer reads a fully-typed `RunRequest` - * instead of re-guarding `unknown` fields. + * The persisted run request. 
Writers (`run::start`, then `provisioning`) always + * store a complete record and state is cleared on deploy, so it's read back typed + * (not re-parsed); `loadRunRequest` falls back to defaultRunRequest when absent. */ -import { z } from 'zod'; import type { Mode } from './system-prompt.js'; -const RunRequestSchema = z.object({ - provider: z.string().catch(''), - model: z.string().catch(''), - mode: z - .unknown() - .transform((v): Mode | null => (v === 'plan' || v === 'ask' || v === 'agent' ? v : null)), - system_prompt: z.string().catch(''), - function_schemas: z.array(z.unknown()).catch([]), +export type RunRequest = { + provider: string; + model: string; + mode: Mode | null; + system_prompt: string; + function_schemas: unknown[]; /** Optional reasoning/thinking level ('off'|'minimal'|'low'|'medium'|'high'|'xhigh'). */ - thinking_level: z.string().optional().catch(undefined), -}); + thinking_level?: string; +}; -export type RunRequest = z.infer; - -export function parseRunRequest(raw: unknown): RunRequest { - return RunRequestSchema.parse(raw ?? {}); +/** Empty run request used as the absent-record fallback in `loadRunRequest`. */ +export function defaultRunRequest(): RunRequest { + return { provider: '', model: '', mode: null, system_prompt: '', function_schemas: [] }; } diff --git a/harness/src/turn-orchestrator/run-transition.ts b/harness/src/turn-orchestrator/run-transition.ts index 15c6af52..54c08194 100644 --- a/harness/src/turn-orchestrator/run-transition.ts +++ b/harness/src/turn-orchestrator/run-transition.ts @@ -88,8 +88,8 @@ export async function runTransition( if (!options?.serialize) { return runTransitionInner(iii, state, handle, payload); } - const acquired = await acquireSessionLease(iii, payload.session_id); - if (!acquired) { + const leaseNonce = await acquireSessionLease(iii, payload.session_id); + if (!leaseNonce) { // Another wake holds the session. 
Retry via the queue once it releases; // by then the persisted state has usually advanced and we stale-skip. throw new TransientError( @@ -99,7 +99,7 @@ export async function runTransition( try { return await runTransitionInner(iii, state, handle, payload); } finally { - await releaseSessionLease(iii, payload.session_id); + await releaseSessionLease(iii, payload.session_id, leaseNonce); } } diff --git a/harness/src/turn-orchestrator/state-runtime/ports.ts b/harness/src/turn-orchestrator/state-runtime/ports.ts index c7c117ae..542263af 100644 --- a/harness/src/turn-orchestrator/state-runtime/ports.ts +++ b/harness/src/turn-orchestrator/state-runtime/ports.ts @@ -7,7 +7,7 @@ import type { RunRequest } from '../run-request.js'; import type { ISdk } from '../../runtime/iii.js'; import type { AgentMessage, FunctionResultMessage } from '../../types/agent-message.js'; import { transitionTo, type TurnStateRecord } from '../state.js'; -import { createTurnStore, type TurnStore } from './store.js'; +import { createTurnStore } from './store.js'; export type TurnStatePorts = { loadMessages(session_id: string): Promise; @@ -23,8 +23,8 @@ export type TurnStatePorts = { finishSession(rec: TurnStateRecord): Promise; }; -export function createTurnStatePorts(iii: ISdk, store?: TurnStore): TurnStatePorts { - const s = store ?? createTurnStore(iii); +export function createTurnStatePorts(iii: ISdk): TurnStatePorts { + const s = createTurnStore(iii); return { loadMessages(session_id) { diff --git a/harness/src/turn-orchestrator/state-runtime/session-lease.ts b/harness/src/turn-orchestrator/state-runtime/session-lease.ts index 90da1598..2ae05a65 100644 --- a/harness/src/turn-orchestrator/state-runtime/session-lease.ts +++ b/harness/src/turn-orchestrator/state-runtime/session-lease.ts @@ -1,69 +1,29 @@ /** - * Per-session mutual-exclusion lease for turn FSM transitions. 
+ * Per-session mutual-exclusion lease for turn FSM transitions — a thin + * (scope, ttl) adapter over the shared lease in runtime/lease.ts. * - * The `turn-step` durable queue has no per-session ordering — `Enqueue` takes - * only a queue name (see iii-sdk `TriggerAction.Enqueue`). So two - * `turn::function_awaiting_approval` wakes for one session (one per - * `approval::resolve` write, fanned out by the `turn::on_approval` state - * trigger) can run concurrently. Without serialization both load the same - * parked `turn_state`, execute every call, and finalize — duplicating side - * effects (the function runs twice) and emitting duplicate - * `function_execution_end` / `turn_end` frames, which wedges the turn. - * - * The only atomic primitive the state worker exposes is `state::update` with - * `increment` — a locked read-modify-write that returns the prior value (the - * kv adapter holds the store write-lock for the whole op). Acquire increments - * a per-session holder counter; the caller that observes prior `0` (or a - * missing key) won and may proceed. Release resets the counter to `0`. - * - * Crash recovery: a holder that dies mid-transition never resets the counter, - * which would wedge the session forever. A contender whose acquire fails - * therefore steals a lease whose recorded acquire time is older than - * {@link LEASE_TTL_MS}. The steal is best-effort (a post-crash window can let - * two contenders through), but that degrades to the pre-fix behavior only - * briefly after a crash — far better than a permanent deadlock. + * The `turn-step` durable queue has no per-session ordering, so two + * `turn::function_awaiting_approval` wakes (one per `approval::resolve`, fanned + * out by `turn::on_approval`) can run concurrently and double-execute the parked + * turn. The lease lets one through; the loser throws TransientError and retries + * via the queue, by which point the state has usually advanced and it stale-skips. 
*/ import type { ISdk } from '../../runtime/iii.js'; -import { stateGet, stateSet, stateUpdate } from '../../runtime/state.js'; +import { acquireLease, releaseLease } from '../../runtime/lease.js'; export const LEASE_SCOPE = 'turn_lease'; -export const LEASE_AT_SCOPE = 'turn_lease_at'; /** A holder older than this is assumed crashed and may be stolen. */ export const LEASE_TTL_MS = 30_000; -/** Atomically bump the holder counter; returns the prior count (0 when free/missing). */ -async function bumpHolders(iii: ISdk, session_id: string): Promise { - const res = await stateUpdate(iii, LEASE_SCOPE, session_id, [ - { type: 'increment', path: '', by: 1 }, - ]); - const prior = (res as { old_value?: unknown } | null)?.old_value; - return typeof prior === 'number' ? prior : 0; -} - -/** - * Try to acquire the session lease. Returns `true` when this caller holds it - * and must call {@link releaseSessionLease}; `false` when another transition - * holds it (the caller should back off / retry). - */ -export async function acquireSessionLease(iii: ISdk, session_id: string): Promise { - if ((await bumpHolders(iii, session_id)) === 0) { - await stateSet(iii, LEASE_AT_SCOPE, session_id, Date.now()); - return true; - } - // Contended — recover a lease abandoned by a crashed holder. - const acquiredAt = await stateGet(iii, LEASE_AT_SCOPE, session_id); - if (typeof acquiredAt === 'number' && Date.now() - acquiredAt > LEASE_TTL_MS) { - await stateSet(iii, LEASE_SCOPE, session_id, 0); - if ((await bumpHolders(iii, session_id)) === 0) { - await stateSet(iii, LEASE_AT_SCOPE, session_id, Date.now()); - return true; - } - } - return false; +export async function acquireSessionLease(iii: ISdk, session_id: string): Promise { + return acquireLease(iii, LEASE_SCOPE, session_id, LEASE_TTL_MS); } -/** Release the session lease. Safe to call only from the holder. 
*/ -export async function releaseSessionLease(iii: ISdk, session_id: string): Promise { - await stateSet(iii, LEASE_SCOPE, session_id, 0); +export async function releaseSessionLease( + iii: ISdk, + session_id: string, + nonce: string, +): Promise { + await releaseLease(iii, LEASE_SCOPE, session_id, nonce); } diff --git a/harness/src/turn-orchestrator/state-runtime/store.ts b/harness/src/turn-orchestrator/state-runtime/store.ts index c135249d..15c2fb13 100644 --- a/harness/src/turn-orchestrator/state-runtime/store.ts +++ b/harness/src/turn-orchestrator/state-runtime/store.ts @@ -9,9 +9,9 @@ import { logger } from '../../runtime/otel.js'; import type { AgentMessage } from '../../types/agent-message.js'; import { RUN_REQUEST_SCOPE, TURN_STATE_SCOPE } from '../state.js'; import { emit } from '../events.js'; -import { type RunRequest, parseRunRequest } from '../run-request.js'; +import { type RunRequest, defaultRunRequest } from '../run-request.js'; import { toView, type TurnStateView } from '../schemas.js'; -import { type TurnState, type TurnStateRecord, parseTurnStateRecord } from '../state.js'; +import { type TurnState, type TurnStateRecord } from '../state.js'; import { loadContextView } from './context-view.js'; /** @@ -55,11 +55,6 @@ export type TurnStore = { saveRunRequest(session_id: string, request: RunRequest): Promise; }; -const scopedGet = (iii: ISdk, scope: string, session_id: string) => - stateGet(iii, scope, session_id); -const scopedSet = (iii: ISdk, scope: string, session_id: string, value: unknown) => - stateSet(iii, scope, session_id, value); - /** * Create the session-tree record if absent. Idempotent, but invoked exactly * once per run (at `run::start`) rather than wrapping every read/write — the @@ -101,8 +96,8 @@ async function persistRecord( rec: TurnStateRecord, previous?: TurnStateRecord | null, ): Promise { - const result = await scopedSet(iii, TURN_STATE_SCOPE, rec.session_id, rec); - const prev = previous !== undefined ? 
previous : parseTurnStateRecord(result?.old_value ?? null); + const result = await stateSet(iii, TURN_STATE_SCOPE, rec.session_id, rec); + const prev = previous !== undefined ? previous : (result?.old_value ?? null); const nextView = toView(rec); const prevView = prev != null ? toView(prev) : undefined; @@ -125,11 +120,12 @@ async function persistRecord( export function createTurnStore(iii: ISdk): TurnStore { return { async loadRecord(session_id) { - return parseTurnStateRecord(await scopedGet(iii, TURN_STATE_SCOPE, session_id)); + // null = absent (no session); otherwise a record this version wrote. + return stateGet(iii, TURN_STATE_SCOPE, session_id); }, async writeRecord(rec) { - await scopedSet(iii, TURN_STATE_SCOPE, rec.session_id, rec); + await stateSet(iii, TURN_STATE_SCOPE, rec.session_id, rec); }, async saveRecord(rec, previous) { @@ -158,11 +154,13 @@ export function createTurnStore(iii: ISdk): TurnStore { }, async saveRunRequest(session_id, request) { - await scopedSet(iii, RUN_REQUEST_SCOPE, session_id, request); + await stateSet(iii, RUN_REQUEST_SCOPE, session_id, request); }, async loadRunRequest(session_id) { - return parseRunRequest(await scopedGet(iii, RUN_REQUEST_SCOPE, session_id)); + return ( + (await stateGet(iii, RUN_REQUEST_SCOPE, session_id)) ?? defaultRunRequest() + ); }, }; } diff --git a/harness/src/turn-orchestrator/state.ts b/harness/src/turn-orchestrator/state.ts index 32785f87..22a9575f 100644 --- a/harness/src/turn-orchestrator/state.ts +++ b/harness/src/turn-orchestrator/state.ts @@ -1,12 +1,9 @@ /** - * TurnState + TurnStateRecord types and parsers. - * - * Persistence uses semantic iii scopes (`turn_state`, `run_request`, …). Conversation - * history lives in `session-tree::*` and is reconstructed at read time. - * keyed by `session_id`. Recovery lists scope `turn_state` via {@link parseTurnStateRecord}. + * TurnState + TurnStateRecord types and constructors. 
Persisted under iii scope + * `turn_state` keyed by session_id; conversation history lives in `session-tree::*`. + * State is cleared on deploy, so records are read back typed (no re-parsing). */ -import { z } from 'zod'; import type { AssistantMessage, FunctionResultMessage } from '../types/agent-message.js'; import type { ExecutedCall, FunctionBatchWork, PreparedCall } from './function-execute/types.js'; @@ -90,34 +87,6 @@ export type TurnStateRecord = awaiting_approval?: AwaitingApprovalEntry[]; }); -const TURN_STATES = [ - 'provisioning', - 'assistant_streaming', - 'function_execute', - 'function_awaiting_approval', - 'steering_check', - 'stopped', - 'failed', -] as const satisfies readonly TurnState[]; - -/** Minimal structural guard for persisted turn_state — nested fields pass through. */ -const TurnStateRecordSchema = z - .object({ - session_id: z.string(), - state: z.enum(TURN_STATES), - turn_count: z.number().catch(0), - function_results: z.array(z.unknown()).catch([]), - turn_end_emitted: z.boolean().catch(false), - started_at_ms: z.number().catch(0), - updated_at_ms: z.number().catch(0), - }) - .passthrough(); - -export function parseTurnStateRecord(raw: unknown): TurnStateRecord | null { - const result = TurnStateRecordSchema.safeParse(raw); - return result.success ? 
(result.data as TurnStateRecord) : null; -} - export function newRecord(session_id: string, max_turns?: number): TurnStateRecord { const now = Date.now(); return { diff --git a/harness/tests/approval-gate/settings.test.ts b/harness/tests/approval-gate/settings.test.ts index e8b55c61..512c12ba 100644 --- a/harness/tests/approval-gate/settings.test.ts +++ b/harness/tests/approval-gate/settings.test.ts @@ -5,12 +5,17 @@ import { addAlwaysAllow } from '../../src/approval-gate/settings/add-always-allo import { approveAlways } from '../../src/approval-gate/settings/approve-always.js'; import { removeAlwaysAllow } from '../../src/approval-gate/settings/remove-always-allow.js'; import { setMode } from '../../src/approval-gate/settings/set-mode.js'; -import { readSettings } from '../../src/approval-gate/settings/store.js'; +import { clearSettings, readSettings } from '../../src/approval-gate/settings/store.js'; import { SETTINGS_STATE_SCOPE } from '../../src/approval-gate/schemas.js'; interface TriggerCall { function_id: string; - payload: { scope: string; key: string; value?: unknown }; + payload: { + scope: string; + key: string; + value?: unknown; + ops?: Array<{ type: string; path?: string; value?: unknown }>; + }; } function makeIii(initial: unknown = null) { @@ -27,6 +32,30 @@ function makeIii(initial: unknown = null) { else store.set(req.payload.key, req.payload.value); return null; } + if (req.function_id === 'state::delete') { + const old_value = store.get(req.payload.key) ?? null; + store.delete(req.payload.key); + return { old_value }; + } + // Atomic field-scoped update (synchronous read-modify-write) modelling the + // `set` and `append` ops the settings mutations use. + if (req.function_id === 'state::update') { + const old_value = store.get(req.payload.key) ?? null; + const rec: Record = + old_value && typeof old_value === 'object' && !Array.isArray(old_value) + ? { ...(old_value as Record) } + : {}; + for (const op of req.payload.ops ?? 
[]) { + const path = op.path ?? ''; + if (op.type === 'set') rec[path] = op.value; + else if (op.type === 'append') { + const arr = Array.isArray(rec[path]) ? (rec[path] as unknown[]) : []; + rec[path] = [...arr, op.value]; + } + } + store.set(req.payload.key, rec); + return { old_value, new_value: rec }; + } throw new Error(`unexpected trigger ${req.function_id}`); }); return { iii: { trigger } as unknown as ISdk, calls, store }; @@ -41,6 +70,19 @@ describe('approval-gate settings', () => { expect(s.mode_set_at).toBe(0); }); + it('clearSettings deletes the key (no null tombstone) and reverts to defaults', async () => { + const { iii, store, calls } = makeIii(); + await setMode(iii, 'sess-1', 'auto'); + expect(store.has('sess-1')).toBe(true); + + await clearSettings(iii, 'sess-1'); + + expect(store.has('sess-1')).toBe(false); + expect(calls.some((c) => c.function_id === 'state::delete')).toBe(true); + const s = await readSettings(iii, 'sess-1'); + expect(s.mode).toBe('manual'); + }); + it('setMode persists with mode_set_at > 0', async () => { const { iii, store } = makeIii(); const result = await setMode(iii, 'sess-1', 'auto'); @@ -87,11 +129,60 @@ describe('approval-gate settings', () => { it('writes go to the SETTINGS_STATE_SCOPE keyed by session_id', async () => { const { iii, calls } = makeIii(); await setMode(iii, 'sess-1', 'full'); - const write = calls.find((c) => c.function_id === 'state::set'); + const write = calls.find( + (c) => c.function_id === 'state::set' || c.function_id === 'state::update', + ); expect(write?.payload.scope).toBe(SETTINGS_STATE_SCOPE); expect(write?.payload.key).toBe('sess-1'); }); + // --- Atomic field-scoped mutations: disjoint fields must not clobber. 
--- + + it('concurrent setMode and addAlwaysAllow both persist (no lost update)', async () => { + const { iii } = makeIii(); + await Promise.all([ + setMode(iii, 'sess-1', 'auto'), + addAlwaysAllow(iii, 'sess-1', 'shell::exec'), + ]); + const s = await readSettings(iii, 'sess-1'); + expect(s.mode).toBe('auto'); + expect(s.always_allow.map((e) => e.function_id)).toEqual(['shell::exec']); + }); + + it('concurrent setMode and approveAlways both persist (no lost update)', async () => { + const { iii } = makeIii(); + await Promise.all([ + setMode(iii, 'sess-1', 'full'), + approveAlways(iii, 'sess-1', 'shell::exec'), + ]); + const s = await readSettings(iii, 'sess-1'); + expect(s.mode).toBe('full'); + expect(s.approved_always.map((e) => e.function_id)).toEqual(['shell::exec']); + }); + + it('concurrent adds of different ids both land (append composition)', async () => { + const { iii } = makeIii(); + await Promise.all([ + addAlwaysAllow(iii, 'sess-1', 'a::one'), + addAlwaysAllow(iii, 'sess-1', 'b::two'), + ]); + const s = await readSettings(iii, 'sess-1'); + expect(s.always_allow.map((e) => e.function_id).sort()).toEqual(['a::one', 'b::two']); + }); + + it('readSettings backfills a partial record and keeps the configured default mode', async () => { + // A field-scoped append can persist only { always_allow }; the read must + // still produce a complete, valid record (not fall back to all-defaults). 
+ const { iii } = makeIii({ + always_allow: [{ function_id: 'shell::exec', granted_at: 1, granted_by: 'user_click' }], + }); + const s = await readSettings(iii, 'sess-1'); + expect(s.always_allow.map((e) => e.function_id)).toEqual(['shell::exec']); + expect(s.mode).toBe('manual'); + expect(s.approved_always).toEqual([]); + expect(s.mode_set_at).toBe(0); + }); + it('isHumanOnlyApprovalFunction catches every settings handler id', () => { expect(isHumanOnlyApprovalFunction('approval::set_mode')).toBe(true); expect(isHumanOnlyApprovalFunction('approval::add_always_allow')).toBe(true); diff --git a/harness/tests/integration/parallel-approval-harness.ts b/harness/tests/integration/parallel-approval-harness.ts index f10a00a5..68edee76 100644 --- a/harness/tests/integration/parallel-approval-harness.ts +++ b/harness/tests/integration/parallel-approval-harness.ts @@ -152,14 +152,12 @@ export function createParallelApprovalHarness(): ParallelApprovalHarness { } if (function_id === 'state::update') { - // Faithful atomic read-modify-write per (scope, key): the engine's - // kv adapter holds the store write-lock for the whole op, so - // increment returns the prior value (null/absent → treated as 0). - // Both the event counter and the per-session lease depend on this. + // Atomic read-modify-write returning the prior value. Models the + // `increment` op (event counter) and root-path `set` op (lease). const p = payload as { scope: string; key: string; - ops?: Array<{ type: string; path?: string; by?: number }>; + ops?: Array<{ type: string; path?: string; by?: number; value?: unknown }>; }; const storeKey = `${p.scope}/${p.key}`; const old_value = stateStore.has(storeKey) @@ -167,8 +165,11 @@ export function createParallelApprovalHarness(): ParallelApprovalHarness { : null; let next: unknown = old_value; for (const op of p.ops ?? []) { - if (op.type === 'increment' && (op.path ?? '') === '') { + if ((op.path ?? 
'') !== '') continue; + if (op.type === 'increment') { next = (typeof next === 'number' ? next : 0) + (op.by ?? 1); + } else if (op.type === 'set') { + next = op.value; } } stateStore.set(storeKey, next); diff --git a/harness/tests/models-catalog/state.test.ts b/harness/tests/models-catalog/state.test.ts index dca0499b..6aab9e06 100644 --- a/harness/tests/models-catalog/state.test.ts +++ b/harness/tests/models-catalog/state.test.ts @@ -1,12 +1,12 @@ import { describe, expect, it } from 'vitest'; -import { parseModelArray, providerStateKey } from '../../src/models-catalog/state.js'; +import { isModel, providerStateKey } from '../../src/models-catalog/state.js'; describe('provider catalog state helpers', () => { it('providerStateKey uses bare provider id', () => { expect(providerStateKey('anthropic')).toBe('anthropic'); }); - it('parseModelArray accepts only Model[]', () => { + it('isModel guards the write-side boundary (object with a string id)', () => { const model = { id: 'm1', provider: 'anthropic', @@ -14,9 +14,9 @@ describe('provider catalog state helpers', () => { display_name: 'm1', context_window: 1, }; - expect(parseModelArray([model])).toEqual([model]); - expect(parseModelArray(model)).toEqual([]); - expect(parseModelArray(null)).toEqual([]); - expect(parseModelArray('bad')).toEqual([]); + expect(isModel(model)).toBe(true); + expect(isModel({ provider: 'anthropic' })).toBe(false); + expect(isModel(null)).toBe(false); + expect(isModel('bad')).toBe(false); }); }); diff --git a/harness/tests/runtime/lease.test.ts b/harness/tests/runtime/lease.test.ts new file mode 100644 index 00000000..75928180 --- /dev/null +++ b/harness/tests/runtime/lease.test.ts @@ -0,0 +1,207 @@ +import { describe, expect, it, vi } from 'vitest'; +import { payloadStoreKey } from '../_helpers/stateStoreKey.js'; +import { + acquireLease, + acquireLeaseWithWait, + mintLeaseNonce, + releaseLease, +} from '../../src/runtime/lease.js'; + +type Iii = Parameters[0]; + +/** + * In-memory ISdk 
stub. state::update is atomic by construction — its + * read-modify-write runs synchronously after the optional latency await, like + * the engine's store write-lock; latencyMs shifts the interleaving. + */ +function makeStateIii(latencyMs = 0): { iii: Iii; store: Map } { + const store = new Map(); + const maybeWait = () => + latencyMs > 0 ? new Promise((r) => setTimeout(r, latencyMs)) : Promise.resolve(); + + const iii = { + trigger: vi.fn(async ({ function_id, payload }: { function_id: string; payload: unknown }) => { + const p = payload as Record; + const key = payloadStoreKey(p as { scope?: string; key?: string }); + if (function_id === 'state::get') { + return store.has(key) ? store.get(key) : null; + } + if (function_id === 'state::set') { + await maybeWait(); + const v = p.value; + if (v === null || v === undefined) store.delete(key); + else store.set(key, v); + return { ok: true }; + } + if (function_id === 'state::update') { + await maybeWait(); + const ops = (p.ops ?? []) as Array<{ type: string; value?: unknown }>; + const old_value = store.has(key) ? store.get(key) : null; + let new_value: unknown = old_value; + for (const op of ops) if (op.type === 'set') new_value = op.value; + if (new_value === null || new_value === undefined) store.delete(key); + else store.set(key, new_value); + return { old_value, new_value }; + } + return null; + }), + }; + return { iii: iii as unknown as Iii, store }; +} + +/** state::update always fails (tolerant wrapper → null), mimicking an outage. 
*/ +function makeFailingUpdateIii(): { iii: Iii } { + const iii = { + trigger: vi.fn(async ({ function_id }: { function_id: string }) => { + if (function_id === 'state::update') return null; + return null; + }), + }; + return { iii: iii as unknown as Iii }; +} + +const SCOPE = 'turn_lease'; +const TTL = 30_000; + +describe('mintLeaseNonce', () => { + it('produces unique values across rapid calls', () => { + const seen = new Set(); + for (let i = 0; i < 1000; i++) seen.add(mintLeaseNonce()); + expect(seen.size).toBe(1000); + }); +}); + +describe('acquireLease / releaseLease', () => { + it('grants a free lease and returns a nonce', async () => { + const { iii } = makeStateIii(); + await expect(acquireLease(iii, SCOPE, 's1', TTL)).resolves.toEqual(expect.any(String)); + }); + + it('denies a second acquirer while held', async () => { + const { iii } = makeStateIii(); + expect(await acquireLease(iii, SCOPE, 's1', TTL)).not.toBeNull(); + await expect(acquireLease(iii, SCOPE, 's1', TTL)).resolves.toBeNull(); + }); + + it('re-grants after the owner releases', async () => { + const { iii } = makeStateIii(); + const nonce = await acquireLease(iii, SCOPE, 's1', TTL); + expect(nonce).not.toBeNull(); + await releaseLease(iii, SCOPE, 's1', nonce as string); + await expect(acquireLease(iii, SCOPE, 's1', TTL)).resolves.not.toBeNull(); + }); + + it('ignores a release from a non-owner (wrong nonce)', async () => { + const { iii } = makeStateIii(); + const held = await acquireLease(iii, SCOPE, 's1', TTL); + expect(held).not.toBeNull(); + await releaseLease(iii, SCOPE, 's1', 'not-the-owner'); + // Lease is still held — a fresh acquire must still fail. 
+ await expect(acquireLease(iii, SCOPE, 's1', TTL)).resolves.toBeNull(); + }); + + it('sends a single state::update set op at the root path', async () => { + const captured: Array<{ type: string; path?: unknown; value?: unknown }> = []; + const iii = { + trigger: vi.fn( + async ({ function_id, payload }: { function_id: string; payload: unknown }) => { + if (function_id === 'state::get') return null; + if (function_id === 'state::update') { + for (const op of ((payload as Record).ops ?? []) as Array<{ + type: string; + path?: unknown; + value?: unknown; + }>) + captured.push(op); + return { old_value: null, new_value: { dummy: true } }; + } + return null; + }, + ), + } as unknown as Iii; + const nonce = await acquireLease(iii, SCOPE, 's1', TTL); + expect(captured).toHaveLength(1); + expect(captured[0]).toMatchObject({ + type: 'set', + path: '', + value: { nonce, ts: expect.any(Number) }, + }); + }); +}); + +describe('acquireLease TTL steal', () => { + it('steals a lease whose ts is older than the TTL', async () => { + const { iii, store } = makeStateIii(); + store.set(`${SCOPE}/s1`, { nonce: 'crashed', ts: Date.now() - TTL - 1_000 }); + await expect(acquireLease(iii, SCOPE, 's1', TTL)).resolves.toEqual(expect.any(String)); + }); + + it('does not steal a lease still within its TTL', async () => { + const { iii, store } = makeStateIii(); + store.set(`${SCOPE}/s1`, { nonce: 'fresh', ts: Date.now() }); + await expect(acquireLease(iii, SCOPE, 's1', TTL)).resolves.toBeNull(); + }); +}); + +describe('acquireLease concurrency (the single-writer invariant)', () => { + it('lets exactly one of two concurrent acquirers win, with write latency', async () => { + const { iii } = makeStateIii(10); + const results = await Promise.all([ + acquireLease(iii, SCOPE, 'race', TTL), + acquireLease(iii, SCOPE, 'race', TTL), + ]); + expect(results.filter((r) => r !== null)).toHaveLength(1); + }); + + it('lets exactly one of eight concurrent acquirers win, with write latency', async () => { + 
const { iii } = makeStateIii(10); + const results = await Promise.all( + Array.from({ length: 8 }, () => acquireLease(iii, SCOPE, 'race8', TTL)), + ); + expect(results.filter((r) => r !== null)).toHaveLength(1); + }); + + it('never false-wins during a state-store outage (update returns null)', async () => { + const { iii } = makeFailingUpdateIii(); + await expect(acquireLease(iii, SCOPE, 's1', TTL)).resolves.toBeNull(); + }); + + it('returns null for every concurrent acquirer during an outage', async () => { + const { iii } = makeFailingUpdateIii(); + const results = await Promise.all( + Array.from({ length: 4 }, () => acquireLease(iii, SCOPE, 'out', TTL)), + ); + expect(results.every((r) => r === null)).toBe(true); + }); + + it('keeps two independent (scope,key) leases from blocking each other', async () => { + const { iii } = makeStateIii(); + const a = await acquireLease(iii, 'compaction_lease', 'sid', TTL); + const b = await acquireLease(iii, 'prune_lease', 'sid', TTL); + expect(a).not.toBeNull(); + expect(b).not.toBeNull(); + expect(a).not.toBe(b); + }); +}); + +describe('acquireLeaseWithWait', () => { + it('acquires immediately when free', async () => { + const { iii } = makeStateIii(); + await expect(acquireLeaseWithWait(iii, SCOPE, 's1', TTL, 200)).resolves.not.toBeNull(); + }); + + it('returns null when the lease never frees within the timeout', async () => { + const { iii } = makeStateIii(); + expect(await acquireLease(iii, SCOPE, 'busy', TTL)).not.toBeNull(); + await expect(acquireLeaseWithWait(iii, SCOPE, 'busy', TTL, 80)).resolves.toBeNull(); + }); + + it('acquires once the holder releases mid-wait', async () => { + const { iii } = makeStateIii(); + const held = (await acquireLease(iii, SCOPE, 'hand', TTL)) as string; + setTimeout(() => { + void releaseLease(iii, SCOPE, 'hand', held); + }, 30); + await expect(acquireLeaseWithWait(iii, SCOPE, 'hand', TTL, 1_000)).resolves.not.toBeNull(); + }); +}); diff --git a/harness/tests/runtime/state-list.test.ts 
b/harness/tests/runtime/state-list.test.ts index e043d382..a45897ae 100644 --- a/harness/tests/runtime/state-list.test.ts +++ b/harness/tests/runtime/state-list.test.ts @@ -1,20 +1,26 @@ -import { describe, expect, it } from 'vitest'; -import { parseStateListValues } from '../../src/runtime/state.js'; +import { describe, expect, it, vi } from 'vitest'; +import type { ISdk } from '../../src/runtime/iii.js'; +import { createState } from '../../src/runtime/state.js'; -describe('parseStateListValues', () => { - it('accepts flat array (official iii shape)', () => { - const rows = [{ session_id: 's1', state: 'stopped' }]; - expect(parseStateListValues(rows)).toEqual(rows); - }); +function makeIii(listResult: unknown): ISdk { + return { + trigger: vi.fn(async ({ function_id }: { function_id: string }) => + function_id === 'state::list' ? listResult : null, + ), + } as unknown as ISdk; +} - it('unwraps { value } rows', () => { - const inner = { session_id: 's1', state: 'function_awaiting_approval' }; - expect(parseStateListValues([{ value: inner }])).toEqual([inner]); +describe('createState().list', () => { + it('returns the flat array of stored values (official iii shape)', async () => { + const rows = [{ session_id: 's1', state: 'stopped' }]; + await expect(createState(makeIii(rows)).list({ scope: 'turn_state' })).resolves.toEqual(rows); }); - it('returns [] for non-array responses', () => { - expect(parseStateListValues(null)).toEqual([]); - expect(parseStateListValues({ ok: true })).toEqual([]); - expect(parseStateListValues({ items: [{ id: 'm1' }] })).toEqual([]); + it('returns [] for non-array responses', async () => { + await expect(createState(makeIii(null)).list({ scope: 's' })).resolves.toEqual([]); + await expect(createState(makeIii({ ok: true })).list({ scope: 's' })).resolves.toEqual([]); + await expect( + createState(makeIii({ items: [{ id: 'm1' }] })).list({ scope: 's' }), + ).resolves.toEqual([]); }); }); diff --git 
a/harness/tests/turn-orchestrator/parse-turn-state-record.test.ts b/harness/tests/turn-orchestrator/parse-turn-state-record.test.ts deleted file mode 100644 index be83190b..00000000 --- a/harness/tests/turn-orchestrator/parse-turn-state-record.test.ts +++ /dev/null @@ -1,35 +0,0 @@ -import { describe, expect, it } from 'vitest'; -import { newRecord, parseTurnStateRecord } from '../../src/turn-orchestrator/state.js'; - -describe('parseTurnStateRecord', () => { - it('returns a valid record for a well-formed turn_state', () => { - const rec = newRecord('sess-1'); - expect(parseTurnStateRecord(rec)).toEqual(rec); - }); - - it('returns null for null, undefined, and primitives', () => { - expect(parseTurnStateRecord(null)).toBeNull(); - expect(parseTurnStateRecord(undefined)).toBeNull(); - expect(parseTurnStateRecord('nope')).toBeNull(); - expect(parseTurnStateRecord(42)).toBeNull(); - }); - - it('returns null when required identity fields are missing', () => { - expect(parseTurnStateRecord({ state: 'provisioning' })).toBeNull(); - expect(parseTurnStateRecord({ session_id: 's1' })).toBeNull(); - expect(parseTurnStateRecord({ messages: [] })).toBeNull(); - }); - - it('applies defaults for missing scalar fields on partial records', () => { - const parsed = parseTurnStateRecord({ - session_id: 's1', - state: 'provisioning', - }); - expect(parsed).toMatchObject({ - session_id: 's1', - state: 'provisioning', - turn_count: 0, - turn_end_emitted: false, - }); - }); -}); diff --git a/harness/tests/turn-orchestrator/run-request.test.ts b/harness/tests/turn-orchestrator/run-request.test.ts index 945c13f5..da10c146 100644 --- a/harness/tests/turn-orchestrator/run-request.test.ts +++ b/harness/tests/turn-orchestrator/run-request.test.ts @@ -1,9 +1,9 @@ import { describe, expect, it } from 'vitest'; -import { parseRunRequest } from '../../src/turn-orchestrator/run-request.js'; +import { defaultRunRequest } from '../../src/turn-orchestrator/run-request.js'; 
-describe('parseRunRequest', () => { - it('maps persisted run::start fields with defaults for missing keys', () => { - expect(parseRunRequest({})).toEqual({ +describe('defaultRunRequest', () => { + it('is the empty run request used when no record exists yet', () => { + expect(defaultRunRequest()).toEqual({ provider: '', model: '', mode: null, @@ -11,65 +11,4 @@ describe('parseRunRequest', () => { function_schemas: [], }); }); - - it('passes through provided string fields', () => { - expect(parseRunRequest({ provider: 'openai', model: 'gpt-4', system_prompt: 'hi' })).toEqual({ - provider: 'openai', - model: 'gpt-4', - mode: null, - system_prompt: 'hi', - function_schemas: [], - }); - }); - - it('rejects invalid mode values and accepts valid ones', () => { - expect(parseRunRequest({ mode: 'invalid' }).mode).toBeNull(); - expect(parseRunRequest({ mode: 'plan' }).mode).toBe('plan'); - expect(parseRunRequest({ mode: 'ask' }).mode).toBe('ask'); - expect(parseRunRequest({ mode: 'agent' }).mode).toBe('agent'); - }); - - it('coerces non-string fields to defaults', () => { - expect(parseRunRequest({ provider: 123, model: null, system_prompt: {} })).toEqual({ - provider: '', - model: '', - mode: null, - system_prompt: '', - function_schemas: [], - }); - }); - - it('treats null and undefined as empty run request', () => { - const empty = { - provider: '', - model: '', - mode: null, - system_prompt: '', - function_schemas: [], - }; - expect(parseRunRequest(null)).toEqual(empty); - expect(parseRunRequest(undefined)).toEqual(empty); - }); -}); - -describe('parseRunRequest function_schemas', () => { - it('defaults to [] and carries an array', () => { - expect(parseRunRequest({}).function_schemas).toEqual([]); - expect(parseRunRequest({ function_schemas: [{ name: 'x' }] }).function_schemas).toHaveLength(1); - }); -}); - -describe('parseRunRequest thinking_level', () => { - it('passes through a string level', () => { - expect(parseRunRequest({ thinking_level: 'high' 
}).thinking_level).toBe('high'); - }); - - it('is undefined when absent', () => { - expect(parseRunRequest({}).thinking_level).toBeUndefined(); - }); - - it('coerces non-string values to undefined', () => { - expect(parseRunRequest({ thinking_level: 42 }).thinking_level).toBeUndefined(); - expect(parseRunRequest({ thinking_level: { level: 'high' } }).thinking_level).toBeUndefined(); - }); }); diff --git a/harness/tests/turn-orchestrator/session-lease.test.ts b/harness/tests/turn-orchestrator/session-lease.test.ts new file mode 100644 index 00000000..ad530134 --- /dev/null +++ b/harness/tests/turn-orchestrator/session-lease.test.ts @@ -0,0 +1,86 @@ +import { describe, expect, it, vi } from 'vitest'; +import type { ISdk } from '../../src/runtime/iii.js'; +import { + acquireSessionLease, + releaseSessionLease, +} from '../../src/turn-orchestrator/state-runtime/session-lease.js'; + +/** + * Fake iii with an in-memory atomic state::update set-CAS (plus get/set). + * `failUpdate` makes update throw → tolerant client returns null, exercising the + * outage path that must never false-win. + */ +function makeIii(opts: { failUpdate?: boolean } = {}): ISdk { + const store = new Map(); + const id = (p: { scope: string; key: string }) => `${p.scope}/${p.key}`; + + const trigger = vi.fn( + async ({ function_id, payload }: { function_id: string; payload: Record }) => { + const k = id(payload as { scope: string; key: string }); + if (function_id === 'state::get') return store.has(k) ? store.get(k) : null; + if (function_id === 'state::set') { + if (payload.value === null || payload.value === undefined) store.delete(k); + else store.set(k, payload.value); + return { ok: true }; + } + if (function_id === 'state::update') { + if (opts.failUpdate) throw new Error('state-store down'); + const old_value = store.has(k) ? store.get(k) : null; + let new_value: unknown = old_value; + for (const op of (payload.ops ?? 
[]) as Array<{ type: string; value?: unknown }>) { + if (op.type === 'set') new_value = op.value; + } + if (new_value === null || new_value === undefined) store.delete(k); + else store.set(k, new_value); + return { old_value, new_value }; + } + throw new Error(`unexpected function_id ${function_id}`); + }, + ); + + return { trigger } as unknown as ISdk; +} + +describe('acquireSessionLease', () => { + it('grants the lease on a fresh key and returns a nonce', async () => { + const iii = makeIii(); + await expect(acquireSessionLease(iii, 's1')).resolves.toEqual(expect.any(String)); + }); + + it('denies a second concurrent holder', async () => { + const iii = makeIii(); + expect(await acquireSessionLease(iii, 's1')).not.toBeNull(); + await expect(acquireSessionLease(iii, 's1')).resolves.toBeNull(); + }); + + it('re-grants after the owner releases', async () => { + const iii = makeIii(); + const nonce = await acquireSessionLease(iii, 's1'); + expect(nonce).not.toBeNull(); + await releaseSessionLease(iii, 's1', nonce as string); + await expect(acquireSessionLease(iii, 's1')).resolves.not.toBeNull(); + }); + + it('ignores a release from a non-owner (wrong nonce)', async () => { + const iii = makeIii(); + expect(await acquireSessionLease(iii, 's1')).not.toBeNull(); + await releaseSessionLease(iii, 's1', 'someone-else'); + await expect(acquireSessionLease(iii, 's1')).resolves.toBeNull(); + }); + + // Regression: a transient state-store outage must NOT be read as a win, or + // every concurrent contender claims the lease at once → duplicate side effects. 
+ it('does not grant the lease when the atomic update fails', async () => { + const iii = makeIii({ failUpdate: true }); + await expect(acquireSessionLease(iii, 's1')).resolves.toBeNull(); + }); + + it('never lets two contenders both win under a state-store outage', async () => { + const iii = makeIii({ failUpdate: true }); + const [a, b] = await Promise.all([ + acquireSessionLease(iii, 's1'), + acquireSessionLease(iii, 's1'), + ]); + expect([a, b]).toEqual([null, null]); + }); +}); From 3c3785d15feeb8d32b607a4d1795592a0609238c Mon Sep 17 00:00:00 2001 From: Ytallo Layon Date: Mon, 8 Jun 2026 19:51:57 -0300 Subject: [PATCH 2/2] refactor(context-compaction): update async handler to use typed payloads and improve event parsing - Refactor `handleAsync` to accept a structured payload instead of a generic frame, enhancing clarity and type safety. - Introduce `parseOnTurnEnd` to handle the new payload format, replacing the previous `extractEventPayload` function. - Update related functions to align with the new payload structure, ensuring consistent handling of session IDs, usage, provider, and model. - Modify YAML and main registration files to reflect changes in event handling and descriptions. - Remove obsolete stream subscriptions, transitioning to a queue-based wake mechanism for compaction on turn end. - Update tests to validate the new payload structure and ensure backward compatibility with existing functionality. 
--- .../src/context-compaction/handler-async.ts | 126 ++++++------------ .../src/context-compaction/iii.worker.yaml | 2 +- harness/src/context-compaction/main.ts | 2 +- harness/src/context-compaction/register.ts | 19 +-- harness/src/turn-orchestrator/events.ts | 47 +++++-- .../state-runtime/turn-end.ts | 9 +- .../turn-orchestrator/steering-check/ports.ts | 24 ---- .../steering-check/process.ts | 4 +- .../turn-orchestrator/steering-check/run.ts | 116 ++++++---------- .../compaction-done-emit.test.ts | 30 ++--- .../context-compaction/handler-async.test.ts | 72 ++++------ .../integration/backward-compat.test.ts | 44 ++---- .../integration/flow-async.test.ts | 23 +--- .../context-compaction/registration.test.ts | 24 ++++ .../turn-end-subscription.test.ts | 27 ---- .../integration/parallel-approval-harness.ts | 7 +- .../tests/turn-orchestrator/events.test.ts | 29 ++-- .../steering-check-layer.test.ts | 116 ---------------- 18 files changed, 229 insertions(+), 492 deletions(-) delete mode 100644 harness/src/turn-orchestrator/steering-check/ports.ts create mode 100644 harness/tests/context-compaction/registration.test.ts delete mode 100644 harness/tests/context-compaction/turn-end-subscription.test.ts delete mode 100644 harness/tests/turn-orchestrator/steering-check-layer.test.ts diff --git a/harness/src/context-compaction/handler-async.ts b/harness/src/context-compaction/handler-async.ts index 518fafdb..bfd9f970 100644 --- a/harness/src/context-compaction/handler-async.ts +++ b/harness/src/context-compaction/handler-async.ts @@ -1,12 +1,13 @@ /** - * Async (TurnEnd-driven) compaction. Mirrors Rust lib.rs::handle_event. - * Accepts both camelCase ({groupId, event:{data}}) and snake_case - * ({group_id, data}) envelopes. + * Async (turn_end-driven) compaction. Woken by a turn-orchestrator queue + * message (`context-compaction::on_turn_end`) carrying a typed + * `{ session_id, usage, provider, model }` payload. 
*/ import { setCurrentSpanAttribute, withSpan } from '@iii-dev/observability'; import type { ISdk } from '../runtime/iii.js'; import { logger } from '../runtime/otel.js'; +import type { Usage } from '../types/stream-event.js'; import { compactionConfig } from './config.js'; import { isSummarizeOk, @@ -17,38 +18,24 @@ import { acquireLease, releaseLease } from './lease.js'; import { fetchModelLimit } from './model-resolver.js'; import { isOverflow } from './overflow.js'; -export function extractEventPayload( - payload: unknown, -): { session_id: string; event: unknown } | null { +export type OnTurnEndPayload = { + session_id: string; + usage: Usage | null; + provider: string; + model: string; +}; + +/** Parse the queue payload enqueued by turn-orchestrator at turn_end. */ +export function parseOnTurnEnd(payload: unknown): OnTurnEndPayload | null { if (!payload || typeof payload !== 'object') return null; const obj = payload as Record; - const session_id = - (typeof obj.groupId === 'string' && obj.groupId) || - (typeof obj.group_id === 'string' && obj.group_id) || - null; - if (!session_id) return null; - let event: unknown = null; - if ( - obj.event && - typeof obj.event === 'object' && - 'data' in (obj.event as Record) - ) { - event = (obj.event as Record).data; - } else if ('data' in obj) { - event = obj.data; - } - return { session_id, event }; -} - -export function turnEndUsage(event: unknown): Record | null { - if (!event || typeof event !== 'object') return null; - const obj = event as Record; - const kind = typeof obj.type === 'string' ? obj.type : null; - if (kind !== 'TurnEnd' && kind !== 'turn_end') return null; - const msg = obj.message as Record | undefined; - const usage = msg?.usage; - if (!usage || typeof usage !== 'object') return null; - return usage as Record; + if (typeof obj.session_id !== 'string' || !obj.session_id) return null; + return { + session_id: obj.session_id, + usage: obj.usage && typeof obj.usage === 'object' ? 
(obj.usage as Usage) : null, + provider: typeof obj.provider === 'string' ? obj.provider : '', + model: typeof obj.model === 'string' ? obj.model : '', + }; } type ResolvedModel = { @@ -57,22 +44,18 @@ type ResolvedModel = { modelLimit: { context: number; input: number; output: number }; } | null; -async function resolveModelFromEvent( +/** + * Resolve the model limit from the payload's provider/model, falling back to + * the session's most recent assistant message when either is missing. + */ +async function resolveModel( iii: ISdk, session_id: string, - event: unknown, + provider: string, + model: string, ): Promise { - let providerID: string | null = null; - let modelID: string | null = null; - - if (event && typeof event === 'object') { - const ev = event as Record; - const msg = ev.message as Record | undefined; - if (msg) { - if (typeof msg.provider === 'string' && msg.provider) providerID = msg.provider; - if (typeof msg.model === 'string' && msg.model) modelID = msg.model; - } - } + let providerID: string | null = provider || null; + let modelID: string | null = model || null; if (!providerID || !modelID) { try { @@ -109,40 +92,29 @@ async function resolveModelFromEvent( return fetchModelLimit(iii, providerID, modelID); } -export async function handleAsync(iii: ISdk, frame: unknown): Promise { +export async function handleAsync(iii: ISdk, payload: unknown): Promise { return withSpan('compaction::async', {}, async () => { - const payload = extractEventPayload(frame); - if (!payload) return; + const parsed = parseOnTurnEnd(payload); + if (!parsed || !parsed.usage) return; + const { session_id, usage } = parsed; - const usage = turnEndUsage(payload.event); - if (!usage) return; + setCurrentSpanAttribute('session_id', session_id); - setCurrentSpanAttribute('session_id', payload.session_id); - - const model = await resolveModelFromEvent(iii, payload.session_id, payload.event); + const model = await resolveModel(iii, session_id, parsed.provider, parsed.model); if 
(!model) { logger.debug('handler-async: could not resolve model; skipping overflow check', { - session_id: payload.session_id, + session_id, }); return; } - const usageObj = usage as { - input?: number; - output?: number; - cache_read?: number; - cache_write?: number; - }; const tokens_before = - (usageObj.input ?? 0) + - (usageObj.output ?? 0) + - (usageObj.cache_read ?? 0) + - (usageObj.cache_write ?? 0); + (usage.input ?? 0) + (usage.output ?? 0) + (usage.cache_read ?? 0) + (usage.cache_write ?? 0); setCurrentSpanAttribute('tokens_before', tokens_before); if ( !isOverflow({ - tokens: usageObj, + tokens: usage, model: { id: model.modelID, limit: model.modelLimit }, reserved: compactionConfig().reservedTokens, }) @@ -150,33 +122,23 @@ export async function handleAsync(iii: ISdk, frame: unknown): Promise { return; } - const nonce = await acquireLease(iii, payload.session_id, 'compaction'); + const nonce = await acquireLease(iii, session_id, 'compaction'); if (!nonce) { - logger.debug('handler-async: compaction lease held; skipping', { - session_id: payload.session_id, - }); + logger.debug('handler-async: compaction lease held; skipping', { session_id }); return; } try { - const result = await runSummarizeCompaction( - iii, - payload.session_id, - { mode: 'async' }, - model, - ); + const result = await runSummarizeCompaction(iii, session_id, { mode: 'async' }, model); setCurrentSpanAttribute('used_prior_summary', isSummarizeOk(result)); if (isSummarizeOk(result)) { - await publishCompactionDone(iii, payload.session_id, 'async', result); + await publishCompactionDone(iii, session_id, 'async', result); } } catch (err) { - logger.warn('handler-async: compaction failed', { - session_id: payload.session_id, - err: String(err), - }); + logger.warn('handler-async: compaction failed', { session_id, err: String(err) }); } finally { - await releaseLease(iii, payload.session_id, nonce, 'compaction'); + await releaseLease(iii, session_id, nonce, 'compaction'); } }); } diff 
--git a/harness/src/context-compaction/iii.worker.yaml b/harness/src/context-compaction/iii.worker.yaml index d9e5ec96..2c8eb183 100644 --- a/harness/src/context-compaction/iii.worker.yaml +++ b/harness/src/context-compaction/iii.worker.yaml @@ -4,7 +4,7 @@ language: node deploy: binary manifest: package.json bin: iii-context-compaction -description: Out-of-band session-history compactor. Subscribes to agent::events::TurnEnd, summarises older turns via provider::::stream, and writes a session-tree Compaction entry so the next turn reads a compressed transcript. +description: Out-of-band session-history compactor. Woken by a turn-orchestrator queue message at turn_end, summarises older turns via provider::::stream, and writes a session-tree Compaction entry so the next turn reads a compressed transcript. runtime: kind: node diff --git a/harness/src/context-compaction/main.ts b/harness/src/context-compaction/main.ts index be302bb1..25f184e8 100644 --- a/harness/src/context-compaction/main.ts +++ b/harness/src/context-compaction/main.ts @@ -5,6 +5,6 @@ import { register } from './register.js'; await bootstrapWorker({ name: 'context-compaction', description: - 'Out-of-band session-history compactor. Subscribes to agent::events::TurnEnd and writes a session-tree Compaction entry when the running token count crosses the configured threshold.', + 'Out-of-band session-history compactor. 
Woken by a turn-orchestrator queue message at turn_end; writes a session-tree Compaction entry when the running token count crosses the configured threshold.', register: (iii) => register(iii), }); diff --git a/harness/src/context-compaction/register.ts b/harness/src/context-compaction/register.ts index 9d407cf6..a101082c 100644 --- a/harness/src/context-compaction/register.ts +++ b/harness/src/context-compaction/register.ts @@ -11,11 +11,6 @@ import { } from './model-resolver.js'; import { prune } from './prune.js'; -// Compaction only acts on turn_end, so it subscribes to the dedicated -// turn_end stream (mirrored by the producer) rather than the full -// agent::events firehose — one wake per turn instead of per event. -const TURN_END_STREAM = 'agent::turn_end'; - // Sized so preserveRecentBudget clamps to its 2k minimum when the real // model is unknown — compaction is best-effort, not fatal. const FALLBACK_MODEL_LIMIT = { @@ -57,14 +52,14 @@ async function resolveExplicitModel( export async function register(iii: ISdk): Promise { iii.registerFunction( - 'context-compaction::on_agent_event', - async (frame: unknown) => { - await handleAsync(iii, frame); + 'context-compaction::on_turn_end', + async (payload: unknown) => { + await handleAsync(iii, payload); return null; }, { description: - 'Internal: subscribes to agent::turn_end; triggers async compaction on TurnEnd when running tokens exceed usable(model).', + 'Internal: woken by a turn-orchestrator queue message at turn_end; triggers async compaction when running tokens exceed usable(model).', }, ); @@ -183,10 +178,4 @@ export async function register(iii: ISdk): Promise { 'User-initiated synchronous compaction of a session. Required: session_id. Optional: model { id, providerID, limit? } to skip auto-resolution. 
If model is omitted, falls back to (1) most recent assistant message in session-tree, (2) orchestrator run_request.', }, ); - - iii.registerTrigger({ - type: 'stream', - function_id: 'context-compaction::on_agent_event', - config: { stream_name: TURN_END_STREAM }, - }); } diff --git a/harness/src/turn-orchestrator/events.ts b/harness/src/turn-orchestrator/events.ts index 07a48b96..fc3b1f87 100644 --- a/harness/src/turn-orchestrator/events.ts +++ b/harness/src/turn-orchestrator/events.ts @@ -1,7 +1,10 @@ /** * Emit AgentEvent frames on `agent::events`, one per call with a per-session - * monotonic sequence number. `turn_end` frames are additionally mirrored onto - * the dedicated `agent::turn_end` stream (see TURN_END_STREAM). + * monotonic sequence number. `turn_end` frames additionally enqueue a + * compaction wake (`context-compaction::on_turn_end`) on the default queue, so + * the out-of-band compactor runs once per turn instead of consuming the full + * `agent::events` firehose. The enqueue is best-effort: a failure (e.g. the + * compactor isn't deployed) is logged, never fatal. * * The sequence number is kept IN-PROCESS (not persisted): the old per-event * `state::update` increment cost one engine state write — and thus one @@ -16,17 +19,20 @@ */ import { uuidLike } from '../runtime/ids.js'; -import type { ISdk } from '../runtime/iii.js'; +import { TriggerAction, type ISdk } from '../runtime/iii.js'; import { logger } from '../runtime/otel.js'; import type { AgentEvent } from '../types/agent-event.js'; export const EVENTS_STREAM = 'agent::events'; + /** - * Dedicated stream carrying only `turn_end` frames. Compaction subscribes here - * instead of the full `agent::events` firehose so it wakes once per turn rather - * than on every event (token updates, function lifecycle, …). + * Out-of-band compactor woken once per turn. 
The turn-orchestrator enqueues a + * typed `{ session_id, usage, provider, model }` payload here at `turn_end` + * instead of mirroring the event onto a dedicated stream — a 1:1 channel is a + * queue message, not pub/sub. */ -export const TURN_END_STREAM = 'agent::turn_end'; +const COMPACTION_ON_TURN_END = 'context-compaction::on_turn_end'; +const COMPACTION_QUEUE = 'default'; /** Unique per process run; prefixes every item_id so a restart can't collide. */ const PROCESS_EPOCH = uuidLike(); @@ -78,11 +84,36 @@ async function setStream( } } +/** + * Enqueue an out-of-band compaction wake from a `turn_end` event. The assistant + * message carries everything the compactor needs to decide overflow — token + * usage plus the provider/model it ran on — so we pass them as a typed payload + * rather than making the compactor re-derive them from a stream frame. + */ +async function enqueueCompaction(iii: ISdk, session_id: string, event: AgentEvent): Promise { + const message = (event as { message?: { usage?: unknown; provider?: unknown; model?: unknown } }) + .message; + try { + await iii.trigger({ + function_id: COMPACTION_ON_TURN_END, + payload: { + session_id, + usage: message?.usage ?? null, + provider: typeof message?.provider === 'string' ? message.provider : '', + model: typeof message?.model === 'string' ? 
message.model : '', + }, + action: TriggerAction.Enqueue({ queue: COMPACTION_QUEUE }), + }); + } catch (err) { + logger.warn('compaction on_turn_end enqueue failed', { session_id, err: String(err) }); + } +} + export async function emit(iii: ISdk, session_id: string, event: AgentEvent): Promise { const seq = nextSeq(session_id); const item_id = formatItemId(session_id, seq); await setStream(iii, EVENTS_STREAM, session_id, item_id, event); if (isTurnEnd(event)) { - await setStream(iii, TURN_END_STREAM, session_id, item_id, event); + await enqueueCompaction(iii, session_id, event); } } diff --git a/harness/src/turn-orchestrator/state-runtime/turn-end.ts b/harness/src/turn-orchestrator/state-runtime/turn-end.ts index d2be3af2..b4ba2421 100644 --- a/harness/src/turn-orchestrator/state-runtime/turn-end.ts +++ b/harness/src/turn-orchestrator/state-runtime/turn-end.ts @@ -1,5 +1,5 @@ /** - * Shared turn-end and FSM resume helpers for step outcome application. + * Shared turn-end helper for step outcome application. */ import { @@ -7,7 +7,7 @@ import { type AssistantMessage, type FunctionResultMessage, } from '../../types/agent-message.js'; -import { transitionTo, type TurnStateRecord } from '../state.js'; +import type { TurnStateRecord } from '../state.js'; export type TurnEndEmitter = { emitTurnEnd( @@ -28,8 +28,3 @@ export async function emitTurnEndOnce( await ports.emitTurnEnd(rec.session_id, last, function_results); rec.turn_end_emitted = true; } - -export function resumeToAssistantStreaming(rec: TurnStateRecord): void { - rec.function_results = []; - transitionTo(rec, 'assistant_streaming'); -} diff --git a/harness/src/turn-orchestrator/steering-check/ports.ts b/harness/src/turn-orchestrator/steering-check/ports.ts deleted file mode 100644 index fe6f4279..00000000 --- a/harness/src/turn-orchestrator/steering-check/ports.ts +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Typed dependency ports for steering_check. 
- */ - -import type { ISdk } from '../../runtime/iii.js'; -import type { AgentEvent } from '../../types/agent-event.js'; -import { emit } from '../events.js'; -import { createTurnStatePorts, type TurnStatePorts } from '../state-runtime/ports.js'; - -export type SteeringCheckPorts = TurnStatePorts & { - emit(session_id: string, event: AgentEvent): Promise; -}; - -export function createSteeringCheckPorts(iii: ISdk): SteeringCheckPorts { - const base = createTurnStatePorts(iii); - - return { - ...base, - - emit(session_id, event) { - return emit(iii, session_id, event); - }, - }; -} diff --git a/harness/src/turn-orchestrator/steering-check/process.ts b/harness/src/turn-orchestrator/steering-check/process.ts index 83c1d133..2ce48c98 100644 --- a/harness/src/turn-orchestrator/steering-check/process.ts +++ b/harness/src/turn-orchestrator/steering-check/process.ts @@ -10,13 +10,11 @@ import { type TurnStepPayload, } from '../schemas.js'; import type { TurnStateRecord } from '../state.js'; -import { createSteeringCheckPorts } from './ports.js'; import { runSteeringCheck } from './run.js'; export async function handleSteering(iii: ISdk, rec: TurnStateRecord): Promise { const steering = parseSteeringCheckRecord(rec); - const ports = createSteeringCheckPorts(iii); - await runSteeringCheck(ports, steering); + await runSteeringCheck(iii, steering); } export function register(iii: ISdk): void { diff --git a/harness/src/turn-orchestrator/steering-check/run.ts b/harness/src/turn-orchestrator/steering-check/run.ts index 8763ec12..f0400488 100644 --- a/harness/src/turn-orchestrator/steering-check/run.ts +++ b/harness/src/turn-orchestrator/steering-check/run.ts @@ -1,87 +1,59 @@ /** - * Route steering_check outcomes and apply transitions. + * Run one steering_check transition: feed function results back to the model, + * or end the turn. 
+ * + * The turn_state record is persisted atomically by the shared runTransition + * runner, so this only mutates `rec` and emits the things that write can't + * carry — the live-stream events (turn_end / agent_end / message_complete) and + * the session-tree history append. */ +import type { ISdk } from '../../runtime/iii.js'; +import { emptyAssistant } from '../../types/agent-message.js'; +import { emit } from '../events.js'; +import { createTurnStore } from '../state-runtime/store.js'; +import { transitionTo, type SteeringCheckTurnRecord } from '../state.js'; import { syntheticAssistant } from '../synthetic-assistant.js'; -import { emitTurnEndOnce, resumeToAssistantStreaming } from '../state-runtime/turn-end.js'; -import type { SteeringCheckTurnRecord } from '../state.js'; -import type { SteeringCheckPorts } from './ports.js'; - -export type SteeringRoute = 'continue_after_function' | 'end_turn'; - -export type SteeringCheckOutcome = - | { kind: 'max_turns_reached' } - | { kind: 'continue_after_function' } - | { kind: 'end_turn' }; - -export function route(has_function_results: boolean): SteeringRoute { - return has_function_results ? 'continue_after_function' : 'end_turn'; -} function maxTurnsReached(rec: SteeringCheckTurnRecord): boolean { return rec.max_turns !== undefined && rec.turn_count >= rec.max_turns; } -async function endForMaxTurns( - ports: SteeringCheckPorts, - rec: SteeringCheckTurnRecord, -): Promise { - const msg = syntheticAssistant({ - stop_reason: 'end', - text: `loop stopped: max_turns (${rec.max_turns ?? 
0}) reached`, - }); - rec.last_assistant = msg; - await ports.appendMessages(rec.session_id, [msg]); - await ports.emit(rec.session_id, { - type: 'message_complete', - message: msg, - body_streamed: false, - }); - await emitTurnEndOnce(ports, rec, msg); - await ports.finishSession(rec); -} - -export async function processSteeringCheck( - _ports: SteeringCheckPorts, - rec: SteeringCheckTurnRecord, -): Promise { - const decision = route(rec.function_results.length > 0); - - if (decision === 'continue_after_function' && maxTurnsReached(rec)) { - return { kind: 'max_turns_reached' }; +export async function runSteeringCheck(iii: ISdk, rec: SteeringCheckTurnRecord): Promise { + // Function results under the turn cap: feed them back to the model. Pure state + // update — runTransition persists it and wakes turn::assistant_streaming. + if (rec.function_results.length > 0 && !maxTurnsReached(rec)) { + rec.function_results = []; + transitionTo(rec, 'assistant_streaming'); + return; } - switch (decision) { - case 'continue_after_function': - return { kind: 'continue_after_function' }; - case 'end_turn': - return { kind: 'end_turn' }; + // Function results but the turn cap is hit: append a synthetic notice so the + // user sees why the loop stopped, then fall through to teardown. + if (rec.function_results.length > 0) { + const msg = syntheticAssistant({ + stop_reason: 'end', + text: `loop stopped: max_turns (${rec.max_turns ?? 
0}) reached`, + }); + rec.last_assistant = msg; + await createTurnStore(iii).appendMessages(rec.session_id, [msg]); + await emit(iii, rec.session_id, { + type: 'message_complete', + message: msg, + body_streamed: false, + }); } -} -export async function applySteeringCheckOutcome( - ports: SteeringCheckPorts, - rec: SteeringCheckTurnRecord, - outcome: SteeringCheckOutcome, -): Promise { - switch (outcome.kind) { - case 'max_turns_reached': - await endForMaxTurns(ports, rec); - return; - case 'continue_after_function': - resumeToAssistantStreaming(rec); - return; - case 'end_turn': - await emitTurnEndOnce(ports, rec); - await ports.finishSession(rec); - return; + // Terminal: emit turn_end once, then agent_end (a signal carrying no + // transcript) and settle in `stopped`. + if (!rec.turn_end_emitted) { + await emit(iii, rec.session_id, { + type: 'turn_end', + message: rec.last_assistant ?? emptyAssistant(), + function_results: [], + }); + rec.turn_end_emitted = true; } -} - -export async function runSteeringCheck( - ports: SteeringCheckPorts, - rec: SteeringCheckTurnRecord, -): Promise { - const outcome = await processSteeringCheck(ports, rec); - await applySteeringCheckOutcome(ports, rec, outcome); + await emit(iii, rec.session_id, { type: 'agent_end', messages: [] }); + transitionTo(rec, 'stopped'); } diff --git a/harness/tests/context-compaction/compaction-done-emit.test.ts b/harness/tests/context-compaction/compaction-done-emit.test.ts index 499f6c75..7bc9bdf3 100644 --- a/harness/tests/context-compaction/compaction-done-emit.test.ts +++ b/harness/tests/context-compaction/compaction-done-emit.test.ts @@ -188,17 +188,10 @@ describe('handleAsync emits compaction_done', () => { try { await handleAsync(iii, { - groupId: 'sess-async-1', - event: { - data: { - type: 'TurnEnd', - message: { - provider: 'anthropic', - model: 'claude-haiku-4-5', - usage: { input: 200_000, output: 50_000 }, - }, - }, - }, + session_id: 'sess-async-1', + provider: 'anthropic', + model: 
'claude-haiku-4-5', + usage: { input: 200_000, output: 50_000 }, }); const events = streamSetCalls.filter((c) => c.stream_name === 'agent::events'); @@ -217,17 +210,10 @@ describe('handleAsync emits compaction_done', () => { const { iii, streamSetCalls } = makeStubIii(); await handleAsync(iii, { - groupId: 'sess-async-noop', - event: { - data: { - type: 'TurnEnd', - message: { - provider: 'anthropic', - model: 'claude-haiku-4-5', - usage: { input: 100, output: 50 }, - }, - }, - }, + session_id: 'sess-async-noop', + provider: 'anthropic', + model: 'claude-haiku-4-5', + usage: { input: 100, output: 50 }, }); const events = streamSetCalls.filter((c) => c.stream_name === 'agent::events'); diff --git a/harness/tests/context-compaction/handler-async.test.ts b/harness/tests/context-compaction/handler-async.test.ts index ef7c9c08..e2f98379 100644 --- a/harness/tests/context-compaction/handler-async.test.ts +++ b/harness/tests/context-compaction/handler-async.test.ts @@ -1,56 +1,36 @@ import { describe, expect, it } from 'vitest'; -import { extractEventPayload, turnEndUsage } from '../../src/context-compaction/handler-async.js'; +import { parseOnTurnEnd } from '../../src/context-compaction/handler-async.js'; -describe('extractEventPayload', () => { - it('handles camelCase envelope (event.data shape)', () => { - const env = { - groupId: 'sess-1', - event: { data: { type: 'TurnEnd', message: {} } }, - }; - const out = extractEventPayload(env); - expect(out?.session_id).toBe('sess-1'); - expect((out?.event as { type: string }).type).toBe('TurnEnd'); +describe('parseOnTurnEnd', () => { + it('parses a well-formed turn_end payload', () => { + const out = parseOnTurnEnd({ + session_id: 'sess-1', + usage: { input: 100, output: 50, cache_read: 800 }, + provider: 'anthropic', + model: 'claude-haiku-4-5', + }); + expect(out).toEqual({ + session_id: 'sess-1', + usage: { input: 100, output: 50, cache_read: 800 }, + provider: 'anthropic', + model: 'claude-haiku-4-5', + }); }); - 
it('handles snake_case envelope (top-level data shape)', () => { - const env = { group_id: 'sess-2', data: { type: 'TurnEnd' } }; - const out = extractEventPayload(env); - expect(out?.session_id).toBe('sess-2'); - expect((out?.event as { type: string }).type).toBe('TurnEnd'); + it('defaults provider/model to empty strings when absent (handler falls back to session-tree)', () => { + const out = parseOnTurnEnd({ session_id: 'sess-2', usage: { input: 10 } }); + expect(out).toEqual({ session_id: 'sess-2', usage: { input: 10 }, provider: '', model: '' }); }); - it('returns null when session id is missing', () => { - expect(extractEventPayload({ data: { type: 'TurnEnd' } })).toBeNull(); - expect(extractEventPayload(null)).toBeNull(); - expect(extractEventPayload(42)).toBeNull(); - }); -}); - -describe('turnEndUsage', () => { - it('extracts usage on TurnEnd', () => { - const event = { - type: 'TurnEnd', - message: { usage: { input: 100, output: 50, cache_read: 800 } }, - }; - expect(turnEndUsage(event)).toEqual({ input: 100, output: 50, cache_read: 800 }); - }); - - it('extracts usage on turn_end (snake_case variant)', () => { - const event = { - type: 'turn_end', - message: { usage: { input: 200, output: 30 } }, - }; - expect(turnEndUsage(event)).toEqual({ input: 200, output: 30 }); - }); - - it('returns null for non-TurnEnd events', () => { - for (const kind of ['TurnStart', 'MessageStart', 'AgentStart']) { - expect(turnEndUsage({ type: kind, message: { usage: { input: 9999 } } })).toBeNull(); - } + it('returns null usage when usage is missing or malformed', () => { + expect(parseOnTurnEnd({ session_id: 'sess-3' })?.usage).toBeNull(); + expect(parseOnTurnEnd({ session_id: 'sess-3', usage: 'nope' })?.usage).toBeNull(); }); - it('returns null when usage is missing', () => { - expect(turnEndUsage({ type: 'TurnEnd', message: {} })).toBeNull(); - expect(turnEndUsage({ type: 'TurnEnd' })).toBeNull(); + it('returns null when session_id is missing', () => { + 
expect(parseOnTurnEnd({ usage: { input: 1 } })).toBeNull(); + expect(parseOnTurnEnd({ session_id: '' })).toBeNull(); + expect(parseOnTurnEnd(null)).toBeNull(); + expect(parseOnTurnEnd(42)).toBeNull(); }); }); diff --git a/harness/tests/context-compaction/integration/backward-compat.test.ts b/harness/tests/context-compaction/integration/backward-compat.test.ts index c71e53b8..33ce8d3a 100644 --- a/harness/tests/context-compaction/integration/backward-compat.test.ts +++ b/harness/tests/context-compaction/integration/backward-compat.test.ts @@ -156,25 +156,12 @@ describe('backward-compat: prior compaction (anchored update path)', () => { compactPayloads, }); - // Fire a TurnEnd that triggers overflow + // Fire a turn_end that triggers overflow const frame = { - groupId: `${largeFixture.session_id}-compat`, - event: { - data: { - type: 'TurnEnd', - message: { - role: 'assistant', - model: 'claude-haiku-4-5', - provider: 'anthropic', - usage: { - input: 185_000, - output: 0, - cache_read: 0, - cache_write: 0, - }, - }, - }, - }, + session_id: `${largeFixture.session_id}-compat`, + provider: 'anthropic', + model: 'claude-haiku-4-5', + usage: { input: 185_000, output: 0, cache_read: 0, cache_write: 0 }, }; await handleAsync(iii, frame); @@ -203,23 +190,10 @@ describe('backward-compat: prior compaction (anchored update path)', () => { }); const frame = { - groupId: `${largeFixture.session_id}-compat2`, - event: { - data: { - type: 'TurnEnd', - message: { - role: 'assistant', - model: 'claude-haiku-4-5', - provider: 'anthropic', - usage: { - input: 185_000, - output: 0, - cache_read: 0, - cache_write: 0, - }, - }, - }, - }, + session_id: `${largeFixture.session_id}-compat2`, + provider: 'anthropic', + model: 'claude-haiku-4-5', + usage: { input: 185_000, output: 0, cache_read: 0, cache_write: 0 }, }; await handleAsync(iii, frame); diff --git a/harness/tests/context-compaction/integration/flow-async.test.ts b/harness/tests/context-compaction/integration/flow-async.test.ts 
index 1cc26fcc..5757aefb 100644 --- a/harness/tests/context-compaction/integration/flow-async.test.ts +++ b/harness/tests/context-compaction/integration/flow-async.test.ts @@ -131,7 +131,7 @@ function buildAsyncMock(opts: { } /** - * Build a TurnEnd frame that handleAsync can process. + * Build an on_turn_end queue payload that handleAsync can process. * * @param sessionId - the session identifier * @param totalTokens - total token usage to report (drives overflow check) @@ -139,23 +139,10 @@ function buildAsyncMock(opts: { */ function makeTurnEndFrame(sessionId: string, totalTokens: number, modelId = 'claude-haiku-4-5') { return { - groupId: sessionId, - event: { - data: { - type: 'TurnEnd', - message: { - role: 'assistant', - model: modelId, - provider: 'anthropic', - usage: { - input: totalTokens, - output: 0, - cache_read: 0, - cache_write: 0, - }, - }, - }, - }, + session_id: sessionId, + provider: 'anthropic', + model: modelId, + usage: { input: totalTokens, output: 0, cache_read: 0, cache_write: 0 }, }; } diff --git a/harness/tests/context-compaction/registration.test.ts b/harness/tests/context-compaction/registration.test.ts new file mode 100644 index 00000000..dd427efb --- /dev/null +++ b/harness/tests/context-compaction/registration.test.ts @@ -0,0 +1,24 @@ +import { describe, expect, it, vi } from 'vitest'; +import { register } from '../../src/context-compaction/register.js'; +import type { ISdk } from '../../src/runtime/iii.js'; + +describe('context-compaction registration', () => { + it('registers on_turn_end as a function and no stream trigger (woken by a queue message)', async () => { + const registerFunction = vi.fn(); + const registerTrigger = vi.fn(); + const iii = { + registerFunction, + registerTrigger, + trigger: vi.fn(async () => null), + } as unknown as ISdk; + + await register(iii); + + // Compaction is woken by a turn-orchestrator queue message, not a stream. 
+ expect(registerTrigger).not.toHaveBeenCalled(); + + const fns = registerFunction.mock.calls.map((c) => c[0] as string); + expect(fns).toContain('context-compaction::on_turn_end'); + expect(fns).not.toContain('context-compaction::on_agent_event'); + }); +}); diff --git a/harness/tests/context-compaction/turn-end-subscription.test.ts b/harness/tests/context-compaction/turn-end-subscription.test.ts deleted file mode 100644 index 7d2db72e..00000000 --- a/harness/tests/context-compaction/turn-end-subscription.test.ts +++ /dev/null @@ -1,27 +0,0 @@ -import { describe, expect, it, vi } from 'vitest'; -import { register } from '../../src/context-compaction/register.js'; -import type { ISdk } from '../../src/runtime/iii.js'; - -describe('context-compaction stream subscription', () => { - it('subscribes to agent::turn_end, not the full agent::events firehose', async () => { - const registerTrigger = vi.fn(); - const iii = { - registerFunction: vi.fn(), - registerTrigger, - trigger: vi.fn(async () => null), - } as unknown as ISdk; - - await register(iii); - - const streamTriggers = registerTrigger.mock.calls - .map( - (c) => c[0] as { type?: string; function_id?: string; config?: { stream_name?: string } }, - ) - .filter( - (t) => t?.type === 'stream' && t?.function_id === 'context-compaction::on_agent_event', - ); - - expect(streamTriggers).toHaveLength(1); - expect(streamTriggers[0].config?.stream_name).toBe('agent::turn_end'); - }); -}); diff --git a/harness/tests/integration/parallel-approval-harness.ts b/harness/tests/integration/parallel-approval-harness.ts index 68edee76..0fe49b20 100644 --- a/harness/tests/integration/parallel-approval-harness.ts +++ b/harness/tests/integration/parallel-approval-harness.ts @@ -178,10 +178,9 @@ export function createParallelApprovalHarness(): ParallelApprovalHarness { if (function_id === 'stream::set') { const p = payload as { stream_name?: string; data: AgentEvent }; - // events.ts mirrors every turn_end onto a second 
`agent::turn_end` - // stream for compaction. Record only the primary `agent::events` - // stream so `emitted` is a faithful one-entry-per-event log. - if (p.stream_name === 'agent::turn_end') return null; + // Only agent::events carries event frames; turn_end now enqueues a + // compaction wake (a separate function_id) rather than mirroring to a + // second stream, so `emitted` stays a faithful one-entry-per-event log. emitted.push(p.data); return null; } diff --git a/harness/tests/turn-orchestrator/events.test.ts b/harness/tests/turn-orchestrator/events.test.ts index 1e604768..62feb8f2 100644 --- a/harness/tests/turn-orchestrator/events.test.ts +++ b/harness/tests/turn-orchestrator/events.test.ts @@ -35,26 +35,33 @@ describe('emit (agent event producer)', () => { expect(streamSets(calls).map((c) => c.payload.stream_name)).toEqual(['agent::events']); }); - it('mirrors a turn_end event onto the dedicated agent::turn_end stream', async () => { + it('enqueues a compaction wake on turn_end instead of mirroring to a stream', async () => { const { iii, calls } = buildSdk(); const event = { type: 'turn_end', - message: { role: 'assistant' }, + message: { + role: 'assistant', + usage: { input: 100, output: 5 }, + provider: 'anthropic', + model: 'claude-haiku-4-5', + }, function_results: [], } as unknown as AgentEvent; await emit(iii, SID, event); - const sets = streamSets(calls); - const streams = sets.map((c) => c.payload.stream_name); - expect(streams).toContain('agent::events'); - expect(streams).toContain('agent::turn_end'); + // The event is written only to agent::events — no dedicated turn_end stream. + expect(streamSets(calls).map((c) => c.payload.stream_name)).toEqual(['agent::events']); - const mirror = sets.find((c) => c.payload.stream_name === 'agent::turn_end'); - expect(mirror?.payload.group_id).toBe(SID); - expect(mirror?.payload.data).toEqual(event); - // Same logical event → identical item_id on both streams (single seq per emit). 
- expect(new Set(sets.map((c) => c.payload.item_id)).size).toBe(1); + // A typed compaction wake is enqueued to the out-of-band compactor. + const wake = calls.find((c) => c.function_id === 'context-compaction::on_turn_end'); + expect(wake).toBeDefined(); + expect(wake?.payload).toMatchObject({ + session_id: SID, + usage: { input: 100, output: 5 }, + provider: 'anthropic', + model: 'claude-haiku-4-5', + }); }); }); diff --git a/harness/tests/turn-orchestrator/steering-check-layer.test.ts b/harness/tests/turn-orchestrator/steering-check-layer.test.ts deleted file mode 100644 index ad6f567a..00000000 --- a/harness/tests/turn-orchestrator/steering-check-layer.test.ts +++ /dev/null @@ -1,116 +0,0 @@ -import { describe, expect, it, vi } from 'vitest'; -import type { AgentMessage } from '../../src/types/agent-message.js'; -import { - applySteeringCheckOutcome, - processSteeringCheck, - route, -} from '../../src/turn-orchestrator/steering-check/run.js'; -import type { SteeringCheckPorts } from '../../src/turn-orchestrator/steering-check/ports.js'; -import { newRecord } from '../../src/turn-orchestrator/state.js'; - -function stubPorts(overrides: Partial = {}): SteeringCheckPorts { - return { - loadMessages: vi.fn(async () => []), - appendMessages: vi.fn(async () => {}), - checkpoint: vi.fn(async () => {}), - loadRunRequest: vi.fn(async () => ({ - provider: 'openai', - model: 'gpt-4', - mode: null, - system_prompt: '', - function_schemas: [], - })), - saveRunRequest: vi.fn(async () => {}), - emitTurnEnd: vi.fn(async () => {}), - finishSession: vi.fn(async (rec) => { - rec.state = 'stopped'; - }), - emit: vi.fn(async () => {}), - ...overrides, - }; -} - -describe('route', () => { - it.each([ - [true, 'continue_after_function'], - [false, 'end_turn'], - ] as const)('route(%s) -> %s', (has_function_results, expected) => { - expect(route(has_function_results)).toBe(expected); - }); -}); - -describe('processSteeringCheck', () => { - it('returns continue_after_function when 
function_results present', async () => { - const ports = stubPorts(); - const rec = { - ...newRecord('s1'), - state: 'steering_check' as const, - function_results: [{ role: 'function_result', content: [] }] as never, - }; - - const outcome = await processSteeringCheck(ports, rec); - - expect(outcome).toEqual({ kind: 'continue_after_function' }); - }); - - it('returns max_turns_reached when cap hit on continue path', async () => { - const ports = stubPorts(); - const rec = { - ...newRecord('s1'), - state: 'steering_check' as const, - max_turns: 2, - turn_count: 2, - function_results: [{ role: 'function_result', content: [] }] as never, - }; - - const outcome = await processSteeringCheck(ports, rec); - - expect(outcome).toEqual({ kind: 'max_turns_reached' }); - }); - - it('returns end_turn when no function results', async () => { - const ports = stubPorts(); - const rec = { ...newRecord('s1'), state: 'steering_check' as const }; - - const outcome = await processSteeringCheck(ports, rec); - - expect(outcome).toEqual({ kind: 'end_turn' }); - }); -}); - -describe('applySteeringCheckOutcome', () => { - it('continue_after_function: transitions without reloading messages', async () => { - const loadMessages = vi.fn(async () => []); - const emitTurnEnd = vi.fn(async () => {}); - const ports = stubPorts({ loadMessages, emitTurnEnd }); - const rec = { - ...newRecord('s1'), - state: 'steering_check' as const, - function_results: [{ role: 'function_result', content: [] }] as never, - turn_end_emitted: true, - }; - - await applySteeringCheckOutcome(ports, rec, { kind: 'continue_after_function' }); - - expect(rec.state).toBe('assistant_streaming'); - expect(rec.function_results).toEqual([]); - expect(loadMessages).not.toHaveBeenCalled(); - expect(emitTurnEnd).not.toHaveBeenCalled(); - }); - - it('end_turn: emits turn_end and finishes session', async () => { - const emitTurnEnd = vi.fn(async () => {}); - const finishSession = vi.fn(async (rec) => { - rec.state = 'stopped'; - }); - 
const ports = stubPorts({ emitTurnEnd, finishSession }); - const rec = { ...newRecord('s1'), state: 'steering_check' as const }; - - await applySteeringCheckOutcome(ports, rec, { kind: 'end_turn' }); - - expect(rec.state).toBe('stopped'); - expect(rec.turn_end_emitted).toBe(true); - expect(emitTurnEnd).toHaveBeenCalledWith('s1', expect.anything(), []); - expect(finishSession).toHaveBeenCalled(); - }); -});