diff --git a/harness/src/approval-gate/settings/add-always-allow.ts b/harness/src/approval-gate/settings/add-always-allow.ts index 7d0083ec..0dccb65a 100644 --- a/harness/src/approval-gate/settings/add-always-allow.ts +++ b/harness/src/approval-gate/settings/add-always-allow.ts @@ -5,7 +5,7 @@ import type { ApprovalSettings } from '../schemas.js'; import type { ISdk } from '../../runtime/iii.js'; import { type MutationReply, functionIdField, sessionIdField } from './types.js'; import { mutationError, ok } from './reply.js'; -import { readSettings, writeSettings } from './store.js'; +import { readSettings, updateSettings } from './store.js'; const PayloadSchema = z.object({ session_id: sessionIdField, @@ -26,15 +26,15 @@ export async function addAlwaysAllow( if (current.always_allow.some((entry) => entry.function_id === function_id)) { return current; } - const next: ApprovalSettings = { - ...current, - always_allow: [ - ...current.always_allow, - { function_id, granted_at: Date.now(), granted_by: 'user_click' }, - ], - }; - await writeSettings(iii, session_id, next); - return next; + // Known race: the pre-read only narrows the duplicate-entry window; concurrent + // adds of the same id can both append. Harmless — matched/removed set-wise. + return updateSettings(iii, session_id, [ + { + type: 'append', + path: 'always_allow', + value: { function_id, granted_at: Date.now(), granted_by: 'user_click' }, + }, + ]); } export function registerAddAlwaysAllow(iii: ISdk): void { diff --git a/harness/src/approval-gate/settings/approve-always.ts b/harness/src/approval-gate/settings/approve-always.ts index 1b0219ee..f09de5ee 100644 --- a/harness/src/approval-gate/settings/approve-always.ts +++ b/harness/src/approval-gate/settings/approve-always.ts @@ -5,7 +5,7 @@ import type { ApprovalSettings } from '../schemas.js'; import type { ISdk } from '../../runtime/iii.js'; import { type MutationReply, functionIdField, sessionIdField } from './types.js'; import { mutationError, ok } from './reply.js'; -import { readSettings, writeSettings } from './store.js'; +import { readSettings, updateSettings } from './store.js'; const PayloadSchema = z.object({ session_id: sessionIdField, @@ -28,15 +28,14 @@ export async function approveAlways( if (current.approved_always.some((entry) => entry.function_id === function_id)) { return current; } - const next: ApprovalSettings = { - ...current, - approved_always: [ - ...current.approved_always, - { function_id, granted_at: Date.now(), granted_by: 'user_click' }, - ], - }; - await writeSettings(iii, session_id, next); - return next; + // Known race: see addAlwaysAllow — pre-read only narrows the duplicate window. + return updateSettings(iii, session_id, [ + { + type: 'append', + path: 'approved_always', + value: { function_id, granted_at: Date.now(), granted_by: 'user_click' }, + }, + ]); } export function registerApproveAlways(iii: ISdk): void { diff --git a/harness/src/approval-gate/settings/remove-always-allow.ts b/harness/src/approval-gate/settings/remove-always-allow.ts index 2a6d3bff..24a26609 100644 --- a/harness/src/approval-gate/settings/remove-always-allow.ts +++ b/harness/src/approval-gate/settings/remove-always-allow.ts @@ -5,7 +5,7 @@ import type { ApprovalSettings } from '../schemas.js'; import type { ISdk } from '../../runtime/iii.js'; import { type MutationReply, functionIdField, sessionIdField } from './types.js'; import { mutationError, ok } from './reply.js'; -import { readSettings, writeSettings } from './store.js'; +import { readSettings, updateSettings } from './store.js'; const PayloadSchema = z.object({ session_id: sessionIdField, @@ -23,12 +23,12 @@ export async function removeAlwaysAllow( function_id: string, ): Promise { const current = await readSettings(iii, session_id); - const next: ApprovalSettings = { - ...current, - always_allow: current.always_allow.filter((entry) => entry.function_id !== function_id), - }; - await writeSettings(iii, session_id, next); - return next; + const always_allow = current.always_allow.filter((entry) => entry.function_id !== function_id); + // No array-element-remove op, so set just the always_allow field (not the whole + // record). Known race: concurrent add/remove on this field is last-writer-wins. + return updateSettings(iii, session_id, [ + { type: 'set', path: 'always_allow', value: always_allow }, + ]); } export function registerRemoveAlwaysAllow(iii: ISdk): void { diff --git a/harness/src/approval-gate/settings/set-mode.ts b/harness/src/approval-gate/settings/set-mode.ts index 604b1b17..b4f61d4c 100644 --- a/harness/src/approval-gate/settings/set-mode.ts +++ b/harness/src/approval-gate/settings/set-mode.ts @@ -4,7 +4,7 @@ import { zodToJsonSchema } from 'zod-to-json-schema'; import { PermissionModeSchema, type ApprovalSettings, type PermissionMode } from '../schemas.js'; import type { ISdk } from '../../runtime/iii.js'; import { mutationError, ok } from './reply.js'; -import { readSettings, writeSettings } from './store.js'; +import { updateSettings } from './store.js'; import { type MutationReply, sessionIdField } from './types.js'; const PayloadSchema = z.object({ @@ -22,10 +22,11 @@ export async function setMode( session_id: string, mode: PermissionMode, ): Promise { - const current = await readSettings(iii, session_id); - const next: ApprovalSettings = { ...current, mode, mode_set_at: Date.now() }; - await writeSettings(iii, session_id, next); - return next; + // Field-scoped, no read: disjoint from add_always_allow/approve_always so both survive. + return updateSettings(iii, session_id, [ + { type: 'set', path: 'mode', value: mode }, + { type: 'set', path: 'mode_set_at', value: Date.now() }, + ]); } export function registerSetMode(iii: ISdk): void { diff --git a/harness/src/approval-gate/settings/store.ts b/harness/src/approval-gate/settings/store.ts index 64ec2899..b7d6d582 100644 --- a/harness/src/approval-gate/settings/store.ts +++ b/harness/src/approval-gate/settings/store.ts @@ -5,50 +5,64 @@ import { type ApprovalSettings, } from '../schemas.js'; import type { ISdk } from '../../runtime/iii.js'; -import { logger } from '../../runtime/otel.js'; +import { createState, type UpdateOp } from '../../runtime/state.js'; import { getDefaultMode } from './default-mode.js'; -/** - * Default settings for a session that has none stored yet. The `mode` - * comes from the harness `permissions.default_mode` (manual | auto | full) - * rather than the hardcoded constant, so the operator-configured default - * applies to new sessions. - */ +// Reads degrade to defaults on failure (tolerant); writes rethrow so handlers +// can reply with mutationError (strict). +const tolerantState = (iii: ISdk) => createState(iii, { tolerant: true }); +const strictState = (iii: ISdk) => createState(iii, { tolerant: false }); + +/** Default settings; `mode` follows the operator-configured permissions.default_mode. */ function defaultSettings(): ApprovalSettings { return { ...DEFAULT_APPROVAL_SETTINGS, mode: getDefaultMode() }; } +/** + * Backfill missing fields from defaults, then validate. Field-scoped writes can + * persist a partial record (the first add_always_allow stores only + * `{ always_allow }`); merging over defaults keeps it valid on read. + */ +export function parseSettings(raw: unknown): ApprovalSettings { + const base = defaultSettings(); + const merged = + raw && typeof raw === 'object' && !Array.isArray(raw) + ? { ...base, ...(raw as Record) } + : base; + const parsed = ApprovalSettingsSchema.safeParse(merged); + return parsed.success ? parsed.data : base; +} + export async function readSettings(iii: ISdk, session_id: string): Promise { - try { - const raw = await iii.trigger({ - function_id: 'state::get', - payload: { scope: SETTINGS_STATE_SCOPE, key: session_id }, - }); - const parsed = ApprovalSettingsSchema.safeParse(raw); - return parsed.success ? parsed.data : defaultSettings(); - } catch (err) { - logger.warn('approval-settings read failed; using defaults', { - session_id, - err: String(err), - }); - return defaultSettings(); - } + const raw = await tolerantState(iii).get({ + scope: SETTINGS_STATE_SCOPE, + key: session_id, + }); + return parseSettings(raw); } -export async function writeSettings( +/** + * Apply field-scoped ops atomically. Each op targets one field, so concurrent + * mutations of different fields compose under the engine's per-key write-lock + * instead of clobbering the whole record (the old read/write-whole lost update). + */ +export async function updateSettings( iii: ISdk, session_id: string, - settings: ApprovalSettings, -): Promise { - await iii.trigger({ - function_id: 'state::set', - payload: { scope: SETTINGS_STATE_SCOPE, key: session_id, value: settings }, + ops: UpdateOp[], +): Promise { + const result = await strictState(iii).update({ + scope: SETTINGS_STATE_SCOPE, + key: session_id, + ops, }); + if (result?.errors && result.errors.length > 0) { + throw new Error(`approval-settings update rejected: ${JSON.stringify(result.errors)}`); + } + return parseSettings(result?.new_value ?? null); } export async function clearSettings(iii: ISdk, session_id: string): Promise { - await iii.trigger({ - function_id: 'state::set', - payload: { scope: SETTINGS_STATE_SCOPE, key: session_id, value: null }, - }); + // Delete the key rather than write a null tombstone; reads default either way. + await strictState(iii).delete({ scope: SETTINGS_STATE_SCOPE, key: session_id }); } diff --git a/harness/src/context-compaction/handler-async.ts b/harness/src/context-compaction/handler-async.ts index 53e0f818..09bbd094 100644 --- a/harness/src/context-compaction/handler-async.ts +++ b/harness/src/context-compaction/handler-async.ts @@ -1,13 +1,14 @@ /** - * Async (TurnEnd-driven) compaction. Mirrors Rust lib.rs::handle_event. - * Accepts both camelCase ({groupId, event:{data}}) and snake_case - * ({group_id, data}) envelopes. + * Async (turn_end-driven) compaction. Woken by a turn-orchestrator queue + * message (`context-compaction::on_turn_end`) carrying a typed + * `{ session_id, usage, provider, model, model_limit? }` payload. */ import { setCurrentSpanAttribute, withSpan } from '@iii-dev/observability'; import type { ISdk } from '../runtime/iii.js'; import { logger } from '../runtime/otel.js'; import type { ModelContextLimit } from '../types/agent-event.js'; +import type { Usage } from '../types/stream-event.js'; import { compactionConfig } from './config.js'; import { isSummarizeOk, @@ -18,38 +19,32 @@ import { acquireLease, releaseLease } from './lease.js'; import { fetchModelLimit } from './model-resolver.js'; import { isOverflow } from './overflow.js'; -export function extractEventPayload( - payload: unknown, -): { session_id: string; event: unknown } | null { +export type OnTurnEndPayload = { + session_id: string; + usage: Usage | null; + provider: string; + model: string; + /** + * Threaded from the turn record's `model_meta` when the catalog resolved at + * turn start; lets the compactor skip its `models::get` fetch. + */ + model_limit?: ModelContextLimit; +}; + +/** Parse the queue payload enqueued by turn-orchestrator at turn_end. */ +export function parseOnTurnEnd(payload: unknown): OnTurnEndPayload | null { if (!payload || typeof payload !== 'object') return null; const obj = payload as Record; - const session_id = - (typeof obj.groupId === 'string' && obj.groupId) || - (typeof obj.group_id === 'string' && obj.group_id) || - null; - if (!session_id) return null; - let event: unknown = null; - if ( - obj.event && - typeof obj.event === 'object' && - 'data' in (obj.event as Record) - ) { - event = (obj.event as Record).data; - } else if ('data' in obj) { - event = obj.data; - } - return { session_id, event }; -} - -export function turnEndUsage(event: unknown): Record | null { - if (!event || typeof event !== 'object') return null; - const obj = event as Record; - const kind = typeof obj.type === 'string' ? obj.type : null; - if (kind !== 'TurnEnd' && kind !== 'turn_end') return null; - const msg = obj.message as Record | undefined; - const usage = msg?.usage; - if (!usage || typeof usage !== 'object') return null; - return usage as Record; + if (typeof obj.session_id !== 'string' || !obj.session_id) return null; + return { + session_id: obj.session_id, + usage: obj.usage && typeof obj.usage === 'object' ? (obj.usage as Usage) : null, + provider: typeof obj.provider === 'string' ? obj.provider : '', + model: typeof obj.model === 'string' ? obj.model : '', + ...(obj.model_limit && typeof obj.model_limit === 'object' + ? { model_limit: obj.model_limit as ModelContextLimit } + : {}), + }; } type ResolvedModel = { @@ -59,27 +54,23 @@ type ResolvedModel = { } | null; /** - * The fields we read off the orchestrator-produced `turn_end` event. The - * message is always an AssistantMessage, so provider/model are present (empty - * strings on synthetic/error turns); model_limit is threaded when the catalog - * resolved at turn start. + * Resolve the model limit from the payload's provider/model, falling back to + * the session's most recent assistant message when either is missing. When the + * payload carries a threaded `model_limit` (catalog resolved at turn start), + * the `models::get` fetch is skipped entirely. */ -type TurnEndEventView = { - message: { provider: string; model: string }; - model_limit?: ModelContextLimit; -}; - -export async function resolveModelFromEvent( +export async function resolveModel( iii: ISdk, session_id: string, - event: unknown, + provider: string, + model: string, + model_limit?: ModelContextLimit, ): Promise { - const ev = event as TurnEndEventView; - let providerID = ev.message.provider || null; - let modelID = ev.message.model || null; + let providerID: string | null = provider || null; + let modelID: string | null = model || null; - if (providerID && modelID && ev.model_limit) { - return { providerID, modelID, modelLimit: ev.model_limit }; + if (providerID && modelID && model_limit) { + return { providerID, modelID, modelLimit: model_limit }; } if (!providerID || !modelID) { @@ -116,40 +107,35 @@ export async function resolveModelFromEvent( return fetchModelLimit(iii, providerID, modelID); } -export async function handleAsync(iii: ISdk, frame: unknown): Promise { +export async function handleAsync(iii: ISdk, payload: unknown): Promise { return withSpan('compaction::async', {}, async () => { - const payload = extractEventPayload(frame); - if (!payload) return; - - const usage = turnEndUsage(payload.event); - if (!usage) return; - - setCurrentSpanAttribute('session_id', payload.session_id); - - const model = await resolveModelFromEvent(iii, payload.session_id, payload.event); + const parsed = parseOnTurnEnd(payload); + if (!parsed || !parsed.usage) return; + const { session_id, usage } = parsed; + + setCurrentSpanAttribute('session_id', session_id); + + const model = await resolveModel( + iii, + session_id, + parsed.provider, + parsed.model, + parsed.model_limit, + ); if (!model) { logger.debug('handler-async: could not resolve model; skipping overflow check', { - session_id: payload.session_id, + session_id, }); return; } - const usageObj = usage as { - input?: number; - output?: number; - cache_read?: number; - cache_write?: number; - }; const tokens_before = - (usageObj.input ?? 0) + - (usageObj.output ?? 0) + - (usageObj.cache_read ?? 0) + - (usageObj.cache_write ?? 0); + (usage.input ?? 0) + (usage.output ?? 0) + (usage.cache_read ?? 0) + (usage.cache_write ?? 0); setCurrentSpanAttribute('tokens_before', tokens_before); if ( !isOverflow({ - tokens: usageObj, + tokens: usage, model: { id: model.modelID, limit: model.modelLimit }, reserved: compactionConfig().reservedTokens, }) @@ -157,33 +143,23 @@ export async function handleAsync(iii: ISdk, frame: unknown): Promise { return; } - const nonce = await acquireLease(iii, payload.session_id, 'compaction'); + const nonce = await acquireLease(iii, session_id, 'compaction'); if (!nonce) { - logger.debug('handler-async: compaction lease held; skipping', { - session_id: payload.session_id, - }); + logger.debug('handler-async: compaction lease held; skipping', { session_id }); return; } try { - const result = await runSummarizeCompaction( - iii, - payload.session_id, - { mode: 'async' }, - model, - ); + const result = await runSummarizeCompaction(iii, session_id, { mode: 'async' }, model); setCurrentSpanAttribute('used_prior_summary', isSummarizeOk(result)); if (isSummarizeOk(result)) { - await publishCompactionDone(iii, payload.session_id, 'async', result); + await publishCompactionDone(iii, session_id, 'async', result); } } catch (err) { - logger.warn('handler-async: compaction failed', { - session_id: payload.session_id, - err: String(err), - }); + logger.warn('handler-async: compaction failed', { session_id, err: String(err) }); } finally { - await releaseLease(iii, payload.session_id, nonce, 'compaction'); + await releaseLease(iii, session_id, nonce, 'compaction'); } }); } diff --git a/harness/src/context-compaction/iii.worker.yaml b/harness/src/context-compaction/iii.worker.yaml index d9e5ec96..2c8eb183 100644 --- a/harness/src/context-compaction/iii.worker.yaml +++ b/harness/src/context-compaction/iii.worker.yaml @@ -4,7 +4,7 @@ language: node deploy: binary manifest: package.json bin: iii-context-compaction -description: Out-of-band session-history compactor. Subscribes to agent::events::TurnEnd, summarises older turns via provider::::stream, and writes a session-tree Compaction entry so the next turn reads a compressed transcript. +description: Out-of-band session-history compactor. Woken by a turn-orchestrator queue message at turn_end, summarises older turns via provider::::stream, and writes a session-tree Compaction entry so the next turn reads a compressed transcript. runtime: kind: node diff --git a/harness/src/context-compaction/lease.ts b/harness/src/context-compaction/lease.ts index 4bcf49d6..cc41edf6 100644 --- a/harness/src/context-compaction/lease.ts +++ b/harness/src/context-compaction/lease.ts @@ -1,10 +1,16 @@ -// Single-writer lease via atomic state::update. The set-and-return-old -// semantics close the read-modify-write race: a concurrent writer sees -// our nonce in old_value and restores its predecessor. +/** + * Compaction/prune leases — thin (scope, ttl) adapters over the shared lease in + * runtime/lease.ts. Maps a LeaseKind to a state scope and the 300s TTL. + */ -import { randomUUID } from 'node:crypto'; import type { ISdk } from '../runtime/iii.js'; -import { stateGet, stateSet, stateUpdate } from '../runtime/state.js'; +import { + acquireLease as acquireLeaseShared, + acquireLeaseWithWait as acquireLeaseWithWaitShared, + mintLeaseNonce, + releaseLease as releaseLeaseShared, +} from '../runtime/lease.js'; +import { stateSet } from '../runtime/state.js'; const COMPACTION_LEASE_SCOPE = 'compaction_lease'; const PRUNE_LEASE_SCOPE = 'prune_lease'; @@ -13,30 +19,19 @@ const LAST_COMPACTION_AT_SCOPE = 'last_compaction_at'; export type LeaseKind = 'compaction' | 'prune'; export const LEASE_TTL_SECS = 300; +const LEASE_TTL_MS = LEASE_TTL_SECS * 1000; -let counter = 0; +export { mintLeaseNonce }; function leaseScope(kind: LeaseKind): string { return kind === 'compaction' ? COMPACTION_LEASE_SCOPE : PRUNE_LEASE_SCOPE; } -export function mintLeaseNonce(): string { - const pid = process.pid; - const nanos = process.hrtime.bigint().toString(); - const seq = counter++; - return `${pid}-${nanos}-${seq}-${randomUUID().slice(0, 8)}`; -} - +/** Epoch-secs of a `{nonce, ts}` lease claim; 0 for any other shape. */ export function readLeaseTimestampSecs(v: unknown): number { if (!v || typeof v !== 'object') return 0; const ts = (v as Record).ts; - if (typeof ts === 'number') return Math.floor(ts / 1000); - return 0; -} - -function isLeaseActive(v: unknown, now_secs: number): boolean { - const ts_secs = readLeaseTimestampSecs(v); - return ts_secs > 0 && now_secs - ts_secs < LEASE_TTL_SECS; + return typeof ts === 'number' ? Math.floor(ts / 1000) : 0; } export async function acquireLease( @@ -44,66 +39,16 @@ export async function acquireLease( session_id: string, kind: LeaseKind = 'compaction', ): Promise { - const scope = leaseScope(kind); - const key = session_id; - const now_ms = Date.now(); - const now_secs = Math.floor(now_ms / 1000); - - // Fast path: skip the atomic set when a valid lease is clearly held. - const existing = await stateGet(iii, scope, key); - if (existing && isLeaseActive(existing, now_secs)) return null; - - const nonce = mintLeaseNonce(); - // path: '' targets FieldPath::root in the engine — set the whole value - // atomically. Without `path`, the engine fails to deserialize the op and - // stateUpdate falls into its catch + returns null. - const result = await stateUpdate(iii, scope, key, [ - { type: 'set', path: '', value: { nonce, ts: now_ms } }, - ]); - // stateUpdate swallows backend errors and returns null. Treat a null - // envelope as "we did NOT acquire the lease" — otherwise a transient - // state-store failure would let every concurrent caller declare itself - // the winner, defeating the whole point of the lease. - if (!result) return null; - const oldValue = result.old_value; - - // If we just overwrote a concurrent acquirer's active lease, restore it - // and bow out. stateUpdate is atomic, so only one caller can see - // old_value == null (or expired) — exactly one winner. - if (oldValue && isLeaseActive(oldValue, now_secs)) { - await stateSet(iii, scope, key, oldValue); - return null; - } - return nonce; + return acquireLeaseShared(iii, leaseScope(kind), session_id, LEASE_TTL_MS); } -// Known race: between the stateGet and the stateSet a new acquirer could -// claim the lease, and our stateSet(null) would clear it. Closing this -// needs a CAS-like primitive in iii-database (delete-if-matches). In -// practice this is bounded by LEASE_TTL_SECS (300s) >> the summariser's -// own timeout (120s), so a duplicate claim resolves itself via TTL. export async function releaseLease( iii: ISdk, session_id: string, ourNonce: string, kind: LeaseKind = 'compaction', ): Promise { - const scope = leaseScope(kind); - const key = session_id; - const stored = await stateGet(iii, scope, key); - const storedNonce = - stored && - typeof stored === 'object' && - typeof (stored as Record).nonce === 'string' - ? ((stored as Record).nonce as string) - : null; - if (storedNonce === ourNonce) { - await stateSet(iii, scope, key, null); - } -} - -export async function stampLastCompaction(iii: ISdk, session_id: string): Promise { - await stateSet(iii, LAST_COMPACTION_AT_SCOPE, session_id, Date.now()); + await releaseLeaseShared(iii, leaseScope(kind), session_id, ourNonce); } export async function acquireLeaseWithWait( @@ -112,13 +57,15 @@ export async function acquireLeaseWithWait( kind: LeaseKind, totalTimeoutMs: number, ): Promise { - const deadline = Date.now() + totalTimeoutMs; - let backoff = 50; - while (true) { - const got = await acquireLease(iii, session_id, kind); - if (got) return got; - if (Date.now() + backoff > deadline) return null; - await new Promise((r) => setTimeout(r, backoff)); - backoff = Math.min(backoff * 2, 500); - } + return acquireLeaseWithWaitShared( + iii, + leaseScope(kind), + session_id, + LEASE_TTL_MS, + totalTimeoutMs, + ); +} + +export async function stampLastCompaction(iii: ISdk, session_id: string): Promise { + await stateSet(iii, LAST_COMPACTION_AT_SCOPE, session_id, Date.now()); } diff --git a/harness/src/context-compaction/main.ts b/harness/src/context-compaction/main.ts index be302bb1..25f184e8 100644 --- a/harness/src/context-compaction/main.ts +++ b/harness/src/context-compaction/main.ts @@ -5,6 +5,6 @@ import { register } from './register.js'; await bootstrapWorker({ name: 'context-compaction', description: - 'Out-of-band session-history compactor. Subscribes to agent::events::TurnEnd and writes a session-tree Compaction entry when the running token count crosses the configured threshold.', + 'Out-of-band session-history compactor. Woken by a turn-orchestrator queue message at turn_end; writes a session-tree Compaction entry when the running token count crosses the configured threshold.', register: (iii) => register(iii), }); diff --git a/harness/src/context-compaction/register.ts b/harness/src/context-compaction/register.ts index 9d407cf6..a101082c 100644 --- a/harness/src/context-compaction/register.ts +++ b/harness/src/context-compaction/register.ts @@ -11,11 +11,6 @@ import { } from './model-resolver.js'; import { prune } from './prune.js'; -// Compaction only acts on turn_end, so it subscribes to the dedicated -// turn_end stream (mirrored by the producer) rather than the full -// agent::events firehose — one wake per turn instead of per event. -const TURN_END_STREAM = 'agent::turn_end'; - // Sized so preserveRecentBudget clamps to its 2k minimum when the real // model is unknown — compaction is best-effort, not fatal. const FALLBACK_MODEL_LIMIT = { @@ -57,14 +52,14 @@ async function resolveExplicitModel( export async function register(iii: ISdk): Promise { iii.registerFunction( - 'context-compaction::on_agent_event', - async (frame: unknown) => { - await handleAsync(iii, frame); + 'context-compaction::on_turn_end', + async (payload: unknown) => { + await handleAsync(iii, payload); return null; }, { description: - 'Internal: subscribes to agent::turn_end; triggers async compaction on TurnEnd when running tokens exceed usable(model).', + 'Internal: woken by a turn-orchestrator queue message at turn_end; triggers async compaction when running tokens exceed usable(model).', }, ); @@ -183,10 +178,4 @@ export async function register(iii: ISdk): Promise { 'User-initiated synchronous compaction of a session. Required: session_id. Optional: model { id, providerID, limit? } to skip auto-resolution. If model is omitted, falls back to (1) most recent assistant message in session-tree, (2) orchestrator run_request.', }, ); - - iii.registerTrigger({ - type: 'stream', - function_id: 'context-compaction::on_agent_event', - config: { stream_name: TURN_END_STREAM }, - }); } diff --git a/harness/src/models-catalog/state.ts b/harness/src/models-catalog/state.ts index 7ca97765..46b63083 100644 --- a/harness/src/models-catalog/state.ts +++ b/harness/src/models-catalog/state.ts @@ -1,31 +1,24 @@ /** - * State-backed reads for the models catalog. Each provider owns one key in - * scope `models` whose value is a `Model[]`. Discovery writes via - * `models::reconcile` (single state set per provider). + * State-backed reads for the models catalog: scope `models`, one Model[] per + * provider key. `models::reconcile` validates entries via isModel before writing + * and state is cleared on deploy, so reads trust the stored shape (no re-parsing). */ import type { ISdk } from '../runtime/iii.js'; import { stateGet, stateListValues } from '../runtime/state.js'; import { type ListFilter, MODELS_SCOPE, type Model, supportsModel } from './types.js'; -/** State key for a provider's catalog array (scope `models`, key = provider id). */ export function providerStateKey(provider: string): string { return provider; } +/** Write-side boundary guard for provider-discovery output (used by models::reconcile). */ export function isModel(v: unknown): v is Model { return Boolean(v && typeof v === 'object' && typeof (v as Model).id === 'string'); } -/** Parse a stored catalog value; only `Model[]` is valid. */ -export function parseModelArray(v: unknown): Model[] { - if (!Array.isArray(v)) return []; - return v.filter(isModel); -} - export async function getProviderModels(iii: ISdk, provider: string): Promise { - const v = await stateGet(iii, MODELS_SCOPE, providerStateKey(provider)); - return parseModelArray(v); + return (await stateGet(iii, MODELS_SCOPE, providerStateKey(provider))) ?? []; } export async function listFromState(iii: ISdk, filter: ListFilter): Promise { @@ -36,11 +29,8 @@ export async function listFromState(iii: ISdk, filter: ListFilter): Promise supportsModel(m, filter.capability!)); } - const entries = await stateListValues(iii, { scope: MODELS_SCOPE }); - const out: Model[] = []; - for (const entry of entries) { - out.push(...parseModelArray(entry)); - } + // Each provider key stores one Model[]; flatten across providers. + const out = (await stateListValues(iii, { scope: MODELS_SCOPE })).flat(); return filter.capability === undefined ? out : out.filter((m) => supportsModel(m, filter.capability!)); diff --git a/harness/src/runtime/lease.ts b/harness/src/runtime/lease.ts new file mode 100644 index 00000000..4e4676c3 --- /dev/null +++ b/harness/src/runtime/lease.ts @@ -0,0 +1,80 @@ +/** + * Single-writer lease over `state::update` — the only atomic read-modify-write + * iii exposes (there's no native lock). Acquire writes a `{nonce, ts}` claim and + * reads the prior value in one atomic op, so exactly one concurrent acquirer + * sees a free/expired prior and wins; a claim older than `ttlMs` reads inactive, + * folding crash recovery into the same op. session-lease and + * context-compaction/lease are thin (scope, ttl) adapters. + */ + +import { randomUUID } from 'node:crypto'; +import type { ISdk } from './iii.js'; +import { stateGet, stateSet, stateUpdate } from './state.js'; + +export type LeaseRecord = { nonce: string; ts: number }; + +let counter = 0; + +export function mintLeaseNonce(): string { + return `${process.pid}-${process.hrtime.bigint()}-${counter++}-${randomUUID().slice(0, 8)}`; +} + +function isLeaseActive(rec: LeaseRecord | null | undefined, nowMs: number, ttlMs: number): boolean { + return rec != null && nowMs - rec.ts < ttlMs; +} + +/** Returns the nonce to release with, or null if another holder owns a valid lease. */ +export async function acquireLease( + iii: ISdk, + scope: string, + key: string, + ttlMs: number, +): Promise { + const now = Date.now(); + if (isLeaseActive(await stateGet(iii, scope, key), now, ttlMs)) return null; + + const nonce = mintLeaseNonce(); + const result = await stateUpdate(iii, scope, key, [ + { type: 'set', path: '', value: { nonce, ts: now } }, + ]); + // A null envelope means the atomic write failed — never treat that as a win, + // or an outage lets every contender through. + if (!result) return null; + + // We clobbered a still-valid claim (always someone else's — our nonce is + // fresh): restore it and bow out. + if (isLeaseActive(result.old_value, now, ttlMs)) { + await stateSet(iii, scope, key, result.old_value); + return null; + } + return nonce; +} + +export async function releaseLease( + iii: ISdk, + scope: string, + key: string, + nonce: string, +): Promise { + const stored = await stateGet(iii, scope, key); + if (stored?.nonce === nonce) await stateSet(iii, scope, key, null); +} + +/** Poll {@link acquireLease} with exponential backoff until acquired or timed out. */ +export async function acquireLeaseWithWait( + iii: ISdk, + scope: string, + key: string, + ttlMs: number, + totalTimeoutMs: number, +): Promise { + const deadline = Date.now() + totalTimeoutMs; + let backoff = 50; + while (true) { + const nonce = await acquireLease(iii, scope, key, ttlMs); + if (nonce) return nonce; + if (Date.now() + backoff > deadline) return null; + await new Promise((r) => setTimeout(r, backoff)); + backoff = Math.min(backoff * 2, 500); + } +} diff --git a/harness/src/runtime/state.ts b/harness/src/runtime/state.ts index 01762249..7e160292 100644 --- a/harness/src/runtime/state.ts +++ b/harness/src/runtime/state.ts @@ -40,38 +40,11 @@ export type CreateStateOptions = { tolerant?: boolean; }; -type StateListGroupsResult = { groups: string[] }; - function normalizeGetResult(v: unknown): T | null { if (v === null || v === undefined) return null; return v as T; } -/** Raw list rows before value unwrap; `null` when the response is not a list. */ -export function stateListResponseRows(response: unknown): unknown[] | null { - if (Array.isArray(response)) return response; - return null; -} - -function unwrapStateListEntry(entry: unknown): T { - if (entry && typeof entry === 'object' && 'value' in (entry as Record)) { - return (entry as Record).value as T; - } - return entry as T; -} - -/** - * Normalizes a `state::list` trigger result to stored values. - * - * Official iii returns a flat `T[]`. Some bridge deployments wrap rows as - * `{ value }`; we accept that shape for compatibility. - */ -export function parseStateListValues(response: unknown): T[] { - const arr = stateListResponseRows(response); - if (!arr) return []; - return arr.map((entry) => unwrapStateListEntry(entry)); -} - export function createState(iii: ISdk, opts: CreateStateOptions = {}): IState { const tolerant = opts.tolerant !== false; @@ -140,11 +113,12 @@ export function createState(iii: ISdk, opts: CreateStateOptions = {}): IState { 'state::list', { scope: input.scope }, async () => { + // state::list returns a flat array of stored values. const resp = await iii.trigger({ function_id: 'state::list', payload: input, }); - return parseStateListValues(resp); + return Array.isArray(resp) ? (resp as TData[]) : []; }, [], ), @@ -165,40 +139,25 @@ export function createState(iii: ISdk, opts: CreateStateOptions = {}): IState { }; } -/** Lists all scope names that contain state data. */ -export async function stateListGroups(iii: ISdk, opts: CreateStateOptions = {}): Promise { - const tolerant = opts.tolerant !== false; - try { - const result = await iii.trigger, StateListGroupsResult | string[]>({ - function_id: 'state::list_groups', - payload: {}, - }); - if (Array.isArray(result)) return result; - return result?.groups ?? []; - } catch (err) { - if (tolerant) { - logger.warn('state::list_groups failed', { err: String(err) }); - return []; - } - throw err; - } -} - // --- Tolerant (scope, key) ergonomics for turn-orchestrator --- const tolerantState = (iii: ISdk) => createState(iii, { tolerant: true }); -export async function stateGet(iii: ISdk, scope: string, key: string): Promise { - return tolerantState(iii).get({ scope, key }); +export async function stateGet( + iii: ISdk, + scope: string, + key: string, +): Promise { + return tolerantState(iii).get({ scope, key }); } -export async function stateSet( +export async function stateSet( iii: ISdk, scope: string, key: string, - value: unknown, -): Promise | null> { - return tolerantState(iii).set({ scope, key, value }); + value: T, +): Promise | null> { + return tolerantState(iii).set({ scope, key, value }); } export async function stateDelete(iii: ISdk, scope: string, key: string): Promise { @@ -209,11 +168,11 @@ export async function stateListValues(iii: ISdk, input: StateListInput): Prom return tolerantState(iii).list(input); } -export async function stateUpdate( +export async function stateUpdate( iii: ISdk, scope: string, key: string, ops: UpdateOp[], -): Promise | null> { - return tolerantState(iii).update({ scope, key, ops }); +): Promise | null> { + return tolerantState(iii).update({ scope, key, ops }); } diff --git a/harness/src/turn-orchestrator/events.ts b/harness/src/turn-orchestrator/events.ts index 07a48b96..1775d7af 100644 --- a/harness/src/turn-orchestrator/events.ts +++ b/harness/src/turn-orchestrator/events.ts @@ -1,7 +1,10 @@ /** * Emit AgentEvent frames on `agent::events`, one per call with a per-session - * monotonic sequence number. `turn_end` frames are additionally mirrored onto - * the dedicated `agent::turn_end` stream (see TURN_END_STREAM). + * monotonic sequence number. `turn_end` frames additionally enqueue a + * compaction wake (`context-compaction::on_turn_end`) on the default queue, so + * the out-of-band compactor runs once per turn instead of consuming the full + * `agent::events` firehose. The enqueue is best-effort: a failure (e.g. the + * compactor isn't deployed) is logged, never fatal. * * The sequence number is kept IN-PROCESS (not persisted): the old per-event * `state::update` increment cost one engine state write — and thus one @@ -16,17 +19,20 @@ */ import { uuidLike } from '../runtime/ids.js'; -import type { ISdk } from '../runtime/iii.js'; +import { TriggerAction, type ISdk } from '../runtime/iii.js'; import { logger } from '../runtime/otel.js'; import type { AgentEvent } from '../types/agent-event.js'; export const EVENTS_STREAM = 'agent::events'; + /** - * Dedicated stream carrying only `turn_end` frames. Compaction subscribes here - * instead of the full `agent::events` firehose so it wakes once per turn rather - * than on every event (token updates, function lifecycle, …). + * Out-of-band compactor woken once per turn. The turn-orchestrator enqueues a + * typed `{ session_id, usage, provider, model, model_limit? }` payload here at + * `turn_end` instead of mirroring the event onto a dedicated stream — a 1:1 + * channel is a queue message, not pub/sub. */ -export const TURN_END_STREAM = 'agent::turn_end'; +const COMPACTION_ON_TURN_END = 'context-compaction::on_turn_end'; +const COMPACTION_QUEUE = 'default'; /** Unique per process run; prefixes every item_id so a restart can't collide. */ const PROCESS_EPOCH = uuidLike(); @@ -78,11 +84,42 @@ async function setStream( } } +/** + * Enqueue an out-of-band compaction wake from a `turn_end` event. The assistant + * message carries everything the compactor needs to decide overflow — token + * usage plus the provider/model it ran on — so we pass them as a typed payload + * rather than making the compactor re-derive them from a stream frame. + */ +async function enqueueCompaction(iii: ISdk, session_id: string, event: AgentEvent): Promise { + const ev = event as { + message?: { usage?: unknown; provider?: unknown; model?: unknown }; + model_limit?: unknown; + }; + const message = ev.message; + try { + await iii.trigger({ + function_id: COMPACTION_ON_TURN_END, + payload: { + session_id, + usage: message?.usage ?? null, + provider: typeof message?.provider === 'string' ? message.provider : '', + model: typeof message?.model === 'string' ? message.model : '', + ...(ev.model_limit && typeof ev.model_limit === 'object' + ? { model_limit: ev.model_limit } + : {}), + }, + action: TriggerAction.Enqueue({ queue: COMPACTION_QUEUE }), + }); + } catch (err) { + logger.warn('compaction on_turn_end enqueue failed', { session_id, err: String(err) }); + } +} + export async function emit(iii: ISdk, session_id: string, event: AgentEvent): Promise { const seq = nextSeq(session_id); const item_id = formatItemId(session_id, seq); await setStream(iii, EVENTS_STREAM, session_id, item_id, event); if (isTurnEnd(event)) { - await setStream(iii, TURN_END_STREAM, session_id, item_id, event); + await enqueueCompaction(iii, session_id, event); } } diff --git a/harness/src/turn-orchestrator/run-request.ts b/harness/src/turn-orchestrator/run-request.ts index 04d8b868..225b4660 100644 --- a/harness/src/turn-orchestrator/run-request.ts +++ b/harness/src/turn-orchestrator/run-request.ts @@ -1,27 +1,22 @@ /** - * The persisted run request and its single typed parser. `loadRunRequest` - * (persistence) parses the raw scope `run_request` value through - * `parseRunRequest` once, so every consumer reads a fully-typed `RunRequest` - * instead of re-guarding `unknown` fields. + * The persisted run request. Writers (`run::start`, then `provisioning`) always + * store a complete record and state is cleared on deploy, so it's read back typed + * (not re-parsed); `loadRunRequest` falls back to defaultRunRequest when absent. */ -import { z } from 'zod'; import type { Mode } from './system-prompt.js'; -const RunRequestSchema = z.object({ - provider: z.string().catch(''), - model: z.string().catch(''), - mode: z - .unknown() - .transform((v): Mode | null => (v === 'plan' || v === 'ask' || v === 'agent' ? v : null)), - system_prompt: z.string().catch(''), - function_schemas: z.array(z.unknown()).catch([]), +export type RunRequest = { + provider: string; + model: string; + mode: Mode | null; + system_prompt: string; + function_schemas: unknown[]; /** Optional reasoning/thinking level ('off'|'minimal'|'low'|'medium'|'high'|'xhigh'). */ - thinking_level: z.string().optional().catch(undefined), -}); + thinking_level?: string; +}; -export type RunRequest = z.infer; - -export function parseRunRequest(raw: unknown): RunRequest { - return RunRequestSchema.parse(raw ?? {}); +/** Empty run request used as the absent-record fallback in `loadRunRequest`. */ +export function defaultRunRequest(): RunRequest { + return { provider: '', model: '', mode: null, system_prompt: '', function_schemas: [] }; } diff --git a/harness/src/turn-orchestrator/run-transition.ts b/harness/src/turn-orchestrator/run-transition.ts index 15c6af52..54c08194 100644 --- a/harness/src/turn-orchestrator/run-transition.ts +++ b/harness/src/turn-orchestrator/run-transition.ts @@ -88,8 +88,8 @@ export async function runTransition( if (!options?.serialize) { return runTransitionInner(iii, state, handle, payload); } - const acquired = await acquireSessionLease(iii, payload.session_id); - if (!acquired) { + const leaseNonce = await acquireSessionLease(iii, payload.session_id); + if (!leaseNonce) { // Another wake holds the session. Retry via the queue once it releases; // by then the persisted state has usually advanced and we stale-skip. throw new TransientError( @@ -99,7 +99,7 @@ export async function runTransition( try { return await runTransitionInner(iii, state, handle, payload); } finally { - await releaseSessionLease(iii, payload.session_id); + await releaseSessionLease(iii, payload.session_id, leaseNonce); } } diff --git a/harness/src/turn-orchestrator/state-runtime/session-lease.ts b/harness/src/turn-orchestrator/state-runtime/session-lease.ts index 90da1598..2ae05a65 100644 --- a/harness/src/turn-orchestrator/state-runtime/session-lease.ts +++ b/harness/src/turn-orchestrator/state-runtime/session-lease.ts @@ -1,69 +1,29 @@ /** - * Per-session mutual-exclusion lease for turn FSM transitions. + * Per-session mutual-exclusion lease for turn FSM transitions — a thin + * (scope, ttl) adapter over the shared lease in runtime/lease.ts. * - * The `turn-step` durable queue has no per-session ordering — `Enqueue` takes - * only a queue name (see iii-sdk `TriggerAction.Enqueue`). So two - * `turn::function_awaiting_approval` wakes for one session (one per - * `approval::resolve` write, fanned out by the `turn::on_approval` state - * trigger) can run concurrently. Without serialization both load the same - * parked `turn_state`, execute every call, and finalize — duplicating side - * effects (the function runs twice) and emitting duplicate - * `function_execution_end` / `turn_end` frames, which wedges the turn. - * - * The only atomic primitive the state worker exposes is `state::update` with - * `increment` — a locked read-modify-write that returns the prior value (the - * kv adapter holds the store write-lock for the whole op). Acquire increments - * a per-session holder counter; the caller that observes prior `0` (or a - * missing key) won and may proceed. Release resets the counter to `0`. - * - * Crash recovery: a holder that dies mid-transition never resets the counter, - * which would wedge the session forever. A contender whose acquire fails - * therefore steals a lease whose recorded acquire time is older than - * {@link LEASE_TTL_MS}. The steal is best-effort (a post-crash window can let - * two contenders through), but that degrades to the pre-fix behavior only - * briefly after a crash — far better than a permanent deadlock. + * The `turn-step` durable queue has no per-session ordering, so two + * `turn::function_awaiting_approval` wakes (one per `approval::resolve`, fanned + * out by `turn::on_approval`) can run concurrently and double-execute the parked + * turn. The lease lets one through; the loser throws TransientError and retries + * via the queue, by which point the state has usually advanced and it stale-skips. */ import type { ISdk } from '../../runtime/iii.js'; -import { stateGet, stateSet, stateUpdate } from '../../runtime/state.js'; +import { acquireLease, releaseLease } from '../../runtime/lease.js'; export const LEASE_SCOPE = 'turn_lease'; -export const LEASE_AT_SCOPE = 'turn_lease_at'; /** A holder older than this is assumed crashed and may be stolen. */ export const LEASE_TTL_MS = 30_000; -/** Atomically bump the holder counter; returns the prior count (0 when free/missing). */ -async function bumpHolders(iii: ISdk, session_id: string): Promise { - const res = await stateUpdate(iii, LEASE_SCOPE, session_id, [ - { type: 'increment', path: '', by: 1 }, - ]); - const prior = (res as { old_value?: unknown } | null)?.old_value; - return typeof prior === 'number' ? prior : 0; -} - -/** - * Try to acquire the session lease. Returns `true` when this caller holds it - * and must call {@link releaseSessionLease}; `false` when another transition - * holds it (the caller should back off / retry). - */ -export async function acquireSessionLease(iii: ISdk, session_id: string): Promise { - if ((await bumpHolders(iii, session_id)) === 0) { - await stateSet(iii, LEASE_AT_SCOPE, session_id, Date.now()); - return true; - } - // Contended — recover a lease abandoned by a crashed holder. - const acquiredAt = await stateGet(iii, LEASE_AT_SCOPE, session_id); - if (typeof acquiredAt === 'number' && Date.now() - acquiredAt > LEASE_TTL_MS) { - await stateSet(iii, LEASE_SCOPE, session_id, 0); - if ((await bumpHolders(iii, session_id)) === 0) { - await stateSet(iii, LEASE_AT_SCOPE, session_id, Date.now()); - return true; - } - } - return false; +export async function acquireSessionLease(iii: ISdk, session_id: string): Promise { + return acquireLease(iii, LEASE_SCOPE, session_id, LEASE_TTL_MS); } -/** Release the session lease. Safe to call only from the holder. */ -export async function releaseSessionLease(iii: ISdk, session_id: string): Promise { - await stateSet(iii, LEASE_SCOPE, session_id, 0); +export async function releaseSessionLease( + iii: ISdk, + session_id: string, + nonce: string, +): Promise { + await releaseLease(iii, LEASE_SCOPE, session_id, nonce); } diff --git a/harness/src/turn-orchestrator/state-runtime/store.ts b/harness/src/turn-orchestrator/state-runtime/store.ts index 9caa640a..cbdcd5f3 100644 --- a/harness/src/turn-orchestrator/state-runtime/store.ts +++ b/harness/src/turn-orchestrator/state-runtime/store.ts @@ -9,9 +9,9 @@ import { logger } from '../../runtime/otel.js'; import type { AgentMessage } from '../../types/agent-message.js'; import { RUN_REQUEST_SCOPE, TURN_STATE_SCOPE } from '../state.js'; import { emit } from '../events.js'; -import { type RunRequest, parseRunRequest } from '../run-request.js'; +import { type RunRequest, defaultRunRequest } from '../run-request.js'; import { toView, type TurnStateView } from '../schemas.js'; -import { type TurnState, type TurnStateRecord, parseTurnStateRecord } from '../state.js'; +import { type TurnState, type TurnStateRecord } from '../state.js'; import { loadContextView, loadSessionMessages } from './context-view.js'; import { persistedTrailingResultIds } from './transcript.js'; @@ -93,11 +93,6 @@ export type TurnStore = { saveRunRequest(session_id: string, request: RunRequest): Promise; }; -const scopedGet = (iii: ISdk, scope: string, session_id: string) => - stateGet(iii, scope, session_id); -const scopedSet = (iii: ISdk, scope: string, session_id: string, value: unknown) => - stateSet(iii, scope, session_id, value); - /** * Create the session-tree record if absent. Idempotent, but invoked exactly * once per run (at `run::start`) rather than wrapping every read/write — the @@ -139,8 +134,8 @@ async function persistRecord( rec: TurnStateRecord, previous?: TurnStateRecord | null, ): Promise { - const result = await scopedSet(iii, TURN_STATE_SCOPE, rec.session_id, rec); - const prev = previous !== undefined ? previous : parseTurnStateRecord(result?.old_value ?? null); + const result = await stateSet(iii, TURN_STATE_SCOPE, rec.session_id, rec); + const prev = previous !== undefined ? previous : (result?.old_value ?? null); const nextView = toView(rec); const prevView = prev != null ? toView(prev) : undefined; @@ -165,16 +160,17 @@ export function createTurnStore(iii: ISdk): TurnStore { return { async loadRecord(session_id) { - return parseTurnStateRecord(await scopedGet(iii, TURN_STATE_SCOPE, session_id)); + // null = absent (no session); otherwise a record this version wrote. + return stateGet(iii, TURN_STATE_SCOPE, session_id); }, async loadRecordStrict(session_id) { - const raw = await strictState.get({ scope: TURN_STATE_SCOPE, key: session_id }); - return parseTurnStateRecord(raw); + // null = absent (no session); otherwise a record this version wrote. + return strictState.get({ scope: TURN_STATE_SCOPE, key: session_id }); }, async writeRecord(rec) { - await scopedSet(iii, TURN_STATE_SCOPE, rec.session_id, rec); + await stateSet(iii, TURN_STATE_SCOPE, rec.session_id, rec); }, async saveRecord(rec, previous) { @@ -207,11 +203,13 @@ export function createTurnStore(iii: ISdk): TurnStore { }, async saveRunRequest(session_id, request) { - await scopedSet(iii, RUN_REQUEST_SCOPE, session_id, request); + await stateSet(iii, RUN_REQUEST_SCOPE, session_id, request); }, async loadRunRequest(session_id) { - return parseRunRequest(await scopedGet(iii, RUN_REQUEST_SCOPE, session_id)); + return ( + (await stateGet(iii, RUN_REQUEST_SCOPE, session_id)) ?? defaultRunRequest() + ); }, }; } diff --git a/harness/src/turn-orchestrator/state-runtime/turn-end.ts b/harness/src/turn-orchestrator/state-runtime/turn-end.ts index 88e4fb2f..a01d6714 100644 --- a/harness/src/turn-orchestrator/state-runtime/turn-end.ts +++ b/harness/src/turn-orchestrator/state-runtime/turn-end.ts @@ -1,5 +1,5 @@ /** - * Shared turn-end and FSM resume helpers for step outcome application. + * Shared turn-end helper for step outcome application. */ import { limitFromModel } from '../../context-compaction/model-resolver.js'; diff --git a/harness/src/turn-orchestrator/state.ts b/harness/src/turn-orchestrator/state.ts index 5b0435f8..d9d2f81e 100644 --- a/harness/src/turn-orchestrator/state.ts +++ b/harness/src/turn-orchestrator/state.ts @@ -1,12 +1,9 @@ /** - * TurnState + TurnStateRecord types and parsers. - * - * Persistence uses semantic iii scopes (`turn_state`, `run_request`, …). Conversation - * history lives in `session-tree::*` and is reconstructed at read time. - * keyed by `session_id`. Recovery lists scope `turn_state` via {@link parseTurnStateRecord}. + * TurnState + TurnStateRecord types and constructors. Persisted under iii scope + * `turn_state` keyed by session_id; conversation history lives in `session-tree::*`. + * State is cleared on deploy, so records are read back typed (no re-parsing). */ -import { z } from 'zod'; import type { Model } from '../models-catalog/types.js'; import type { AssistantMessage, FunctionResultMessage } from '../types/agent-message.js'; import type { ExecutedCall, FunctionBatchWork, PreparedCall } from './function-execute/types.js'; @@ -88,34 +85,6 @@ export type TurnStateRecord = awaiting_approval?: AwaitingApprovalEntry[]; }); -const TURN_STATES = [ - 'provisioning', - 'assistant_streaming', - 'function_execute', - 'function_awaiting_approval', - 'finishing', - 'stopped', - 'failed', -] as const satisfies readonly TurnState[]; - -/** Minimal structural guard for persisted turn_state — nested fields pass through. */ -const TurnStateRecordSchema = z - .object({ - session_id: z.string(), - state: z.enum(TURN_STATES), - turn_count: z.number().catch(0), - function_results: z.array(z.unknown()).catch([]), - turn_end_emitted: z.boolean().catch(false), - started_at_ms: z.number().catch(0), - updated_at_ms: z.number().catch(0), - }) - .passthrough(); - -export function parseTurnStateRecord(raw: unknown): TurnStateRecord | null { - const result = TurnStateRecordSchema.safeParse(raw); - return result.success ? (result.data as TurnStateRecord) : null; -} - export function newRecord(session_id: string, max_turns?: number): TurnStateRecord { const now = Date.now(); return { diff --git a/harness/tests/approval-gate/settings.test.ts b/harness/tests/approval-gate/settings.test.ts index e8b55c61..512c12ba 100644 --- a/harness/tests/approval-gate/settings.test.ts +++ b/harness/tests/approval-gate/settings.test.ts @@ -5,12 +5,17 @@ import { addAlwaysAllow } from '../../src/approval-gate/settings/add-always-allo import { approveAlways } from '../../src/approval-gate/settings/approve-always.js'; import { removeAlwaysAllow } from '../../src/approval-gate/settings/remove-always-allow.js'; import { setMode } from '../../src/approval-gate/settings/set-mode.js'; -import { readSettings } from '../../src/approval-gate/settings/store.js'; +import { clearSettings, readSettings } from '../../src/approval-gate/settings/store.js'; import { SETTINGS_STATE_SCOPE } from '../../src/approval-gate/schemas.js'; interface TriggerCall { function_id: string; - payload: { scope: string; key: string; value?: unknown }; + payload: { + scope: string; + key: string; + value?: unknown; + ops?: Array<{ type: string; path?: string; value?: unknown }>; + }; } function makeIii(initial: unknown = null) { @@ -27,6 +32,30 @@ function makeIii(initial: unknown = null) { else store.set(req.payload.key, req.payload.value); return null; } + if (req.function_id === 'state::delete') { + const old_value = store.get(req.payload.key) ?? null; + store.delete(req.payload.key); + return { old_value }; + } + // Atomic field-scoped update (synchronous read-modify-write) modelling the + // `set` and `append` ops the settings mutations use. + if (req.function_id === 'state::update') { + const old_value = store.get(req.payload.key) ?? null; + const rec: Record = + old_value && typeof old_value === 'object' && !Array.isArray(old_value) + ? { ...(old_value as Record) } + : {}; + for (const op of req.payload.ops ?? []) { + const path = op.path ?? ''; + if (op.type === 'set') rec[path] = op.value; + else if (op.type === 'append') { + const arr = Array.isArray(rec[path]) ? (rec[path] as unknown[]) : []; + rec[path] = [...arr, op.value]; + } + } + store.set(req.payload.key, rec); + return { old_value, new_value: rec }; + } throw new Error(`unexpected trigger ${req.function_id}`); }); return { iii: { trigger } as unknown as ISdk, calls, store }; @@ -41,6 +70,19 @@ describe('approval-gate settings', () => { expect(s.mode_set_at).toBe(0); }); + it('clearSettings deletes the key (no null tombstone) and reverts to defaults', async () => { + const { iii, store, calls } = makeIii(); + await setMode(iii, 'sess-1', 'auto'); + expect(store.has('sess-1')).toBe(true); + + await clearSettings(iii, 'sess-1'); + + expect(store.has('sess-1')).toBe(false); + expect(calls.some((c) => c.function_id === 'state::delete')).toBe(true); + const s = await readSettings(iii, 'sess-1'); + expect(s.mode).toBe('manual'); + }); + it('setMode persists with mode_set_at > 0', async () => { const { iii, store } = makeIii(); const result = await setMode(iii, 'sess-1', 'auto'); @@ -87,11 +129,60 @@ describe('approval-gate settings', () => { it('writes go to the SETTINGS_STATE_SCOPE keyed by session_id', async () => { const { iii, calls } = makeIii(); await setMode(iii, 'sess-1', 'full'); - const write = calls.find((c) => c.function_id === 'state::set'); + const write = calls.find( + (c) => c.function_id === 'state::set' || c.function_id === 'state::update', + ); expect(write?.payload.scope).toBe(SETTINGS_STATE_SCOPE); expect(write?.payload.key).toBe('sess-1'); }); + // --- Atomic field-scoped mutations: disjoint fields must not clobber. --- + + it('concurrent setMode and addAlwaysAllow both persist (no lost update)', async () => { + const { iii } = makeIii(); + await Promise.all([ + setMode(iii, 'sess-1', 'auto'), + addAlwaysAllow(iii, 'sess-1', 'shell::exec'), + ]); + const s = await readSettings(iii, 'sess-1'); + expect(s.mode).toBe('auto'); + expect(s.always_allow.map((e) => e.function_id)).toEqual(['shell::exec']); + }); + + it('concurrent setMode and approveAlways both persist (no lost update)', async () => { + const { iii } = makeIii(); + await Promise.all([ + setMode(iii, 'sess-1', 'full'), + approveAlways(iii, 'sess-1', 'shell::exec'), + ]); + const s = await readSettings(iii, 'sess-1'); + expect(s.mode).toBe('full'); + expect(s.approved_always.map((e) => e.function_id)).toEqual(['shell::exec']); + }); + + it('concurrent adds of different ids both land (append composition)', async () => { + const { iii } = makeIii(); + await Promise.all([ + addAlwaysAllow(iii, 'sess-1', 'a::one'), + addAlwaysAllow(iii, 'sess-1', 'b::two'), + ]); + const s = await readSettings(iii, 'sess-1'); + expect(s.always_allow.map((e) => e.function_id).sort()).toEqual(['a::one', 'b::two']); + }); + + it('readSettings backfills a partial record and keeps the configured default mode', async () => { + // A field-scoped append can persist only { always_allow }; the read must + // still produce a complete, valid record (not fall back to all-defaults). + const { iii } = makeIii({ + always_allow: [{ function_id: 'shell::exec', granted_at: 1, granted_by: 'user_click' }], + }); + const s = await readSettings(iii, 'sess-1'); + expect(s.always_allow.map((e) => e.function_id)).toEqual(['shell::exec']); + expect(s.mode).toBe('manual'); + expect(s.approved_always).toEqual([]); + expect(s.mode_set_at).toBe(0); + }); + it('isHumanOnlyApprovalFunction catches every settings handler id', () => { expect(isHumanOnlyApprovalFunction('approval::set_mode')).toBe(true); expect(isHumanOnlyApprovalFunction('approval::add_always_allow')).toBe(true); diff --git a/harness/tests/context-compaction/compaction-done-emit.test.ts b/harness/tests/context-compaction/compaction-done-emit.test.ts index 499f6c75..7bc9bdf3 100644 --- a/harness/tests/context-compaction/compaction-done-emit.test.ts +++ b/harness/tests/context-compaction/compaction-done-emit.test.ts @@ -188,17 +188,10 @@ describe('handleAsync emits compaction_done', () => { try { await handleAsync(iii, { - groupId: 'sess-async-1', - event: { - data: { - type: 'TurnEnd', - message: { - provider: 'anthropic', - model: 'claude-haiku-4-5', - usage: { input: 200_000, output: 50_000 }, - }, - }, - }, + session_id: 'sess-async-1', + provider: 'anthropic', + model: 'claude-haiku-4-5', + usage: { input: 200_000, output: 50_000 }, }); const events = streamSetCalls.filter((c) => c.stream_name === 'agent::events'); @@ -217,17 +210,10 @@ describe('handleAsync emits compaction_done', () => { const { iii, streamSetCalls } = makeStubIii(); await handleAsync(iii, { - groupId: 'sess-async-noop', - event: { - data: { - type: 'TurnEnd', - message: { - provider: 'anthropic', - model: 'claude-haiku-4-5', - usage: { input: 100, output: 50 }, - }, - }, - }, + session_id: 'sess-async-noop', + provider: 'anthropic', + model: 'claude-haiku-4-5', + usage: { input: 100, output: 50 }, }); const events = streamSetCalls.filter((c) => c.stream_name === 'agent::events'); diff --git a/harness/tests/context-compaction/handler-async.test.ts b/harness/tests/context-compaction/handler-async.test.ts index c65751f5..91ca0dc9 100644 --- a/harness/tests/context-compaction/handler-async.test.ts +++ b/harness/tests/context-compaction/handler-async.test.ts @@ -1,66 +1,52 @@ import { describe, expect, it, vi } from 'vitest'; -import { - extractEventPayload, - resolveModelFromEvent, - turnEndUsage, -} from '../../src/context-compaction/handler-async.js'; +import { parseOnTurnEnd, resolveModel } from '../../src/context-compaction/handler-async.js'; import type { ISdk } from '../../src/runtime/iii.js'; -describe('extractEventPayload', () => { - it('handles camelCase envelope (event.data shape)', () => { - const env = { - groupId: 'sess-1', - event: { data: { type: 'TurnEnd', message: {} } }, - }; - const out = extractEventPayload(env); - expect(out?.session_id).toBe('sess-1'); - expect((out?.event as { type: string }).type).toBe('TurnEnd'); - }); - - it('handles snake_case envelope (top-level data shape)', () => { - const env = { group_id: 'sess-2', data: { type: 'TurnEnd' } }; - const out = extractEventPayload(env); - expect(out?.session_id).toBe('sess-2'); - expect((out?.event as { type: string }).type).toBe('TurnEnd'); - }); - - it('returns null when session id is missing', () => { - expect(extractEventPayload({ data: { type: 'TurnEnd' } })).toBeNull(); - expect(extractEventPayload(null)).toBeNull(); - expect(extractEventPayload(42)).toBeNull(); +describe('parseOnTurnEnd', () => { + it('parses a well-formed turn_end payload', () => { + const out = parseOnTurnEnd({ + session_id: 'sess-1', + usage: { input: 100, output: 50, cache_read: 800 }, + provider: 'anthropic', + model: 'claude-haiku-4-5', + }); + expect(out).toEqual({ + session_id: 'sess-1', + usage: { input: 100, output: 50, cache_read: 800 }, + provider: 'anthropic', + model: 'claude-haiku-4-5', + }); }); -}); -describe('turnEndUsage', () => { - it('extracts usage on TurnEnd', () => { - const event = { - type: 'TurnEnd', - message: { usage: { input: 100, output: 50, cache_read: 800 } }, - }; - expect(turnEndUsage(event)).toEqual({ input: 100, output: 50, cache_read: 800 }); + it('defaults provider/model to empty strings when absent (handler falls back to session-tree)', () => { + const out = parseOnTurnEnd({ session_id: 'sess-2', usage: { input: 10 } }); + expect(out).toEqual({ session_id: 'sess-2', usage: { input: 10 }, provider: '', model: '' }); }); - it('extracts usage on turn_end (snake_case variant)', () => { - const event = { - type: 'turn_end', - message: { usage: { input: 200, output: 30 } }, - }; - expect(turnEndUsage(event)).toEqual({ input: 200, output: 30 }); + it('returns null usage when usage is missing or malformed', () => { + expect(parseOnTurnEnd({ session_id: 'sess-3' })?.usage).toBeNull(); + expect(parseOnTurnEnd({ session_id: 'sess-3', usage: 'nope' })?.usage).toBeNull(); }); - it('returns null for non-TurnEnd events', () => { - for (const kind of ['TurnStart', 'MessageStart', 'AgentStart']) { - expect(turnEndUsage({ type: kind, message: { usage: { input: 9999 } } })).toBeNull(); - } + it('returns null when session_id is missing', () => { + expect(parseOnTurnEnd({ usage: { input: 1 } })).toBeNull(); + expect(parseOnTurnEnd({ session_id: '' })).toBeNull(); + expect(parseOnTurnEnd(null)).toBeNull(); + expect(parseOnTurnEnd(42)).toBeNull(); }); - it('returns null when usage is missing', () => { - expect(turnEndUsage({ type: 'TurnEnd', message: {} })).toBeNull(); - expect(turnEndUsage({ type: 'TurnEnd' })).toBeNull(); + it('passes a threaded model_limit through and drops a malformed one', () => { + const limit = { context: 200_000, input: 200_000, output: 64_000 }; + expect(parseOnTurnEnd({ session_id: 'sess-4', model_limit: limit })?.model_limit).toEqual( + limit, + ); + expect(parseOnTurnEnd({ session_id: 'sess-4', model_limit: 'nope' })).not.toHaveProperty( + 'model_limit', + ); }); }); -describe('resolveModelFromEvent', () => { +describe('resolveModel', () => { function trackingIii(): { iii: ISdk; calls: string[] } { const calls: string[] = []; const iii = { @@ -72,15 +58,14 @@ describe('resolveModelFromEvent', () => { return { iii, calls }; } - it('uses the inline limit and skips models::get', async () => { + it('uses the threaded limit and skips models::get', async () => { const { iii, calls } = trackingIii(); - const event = { - type: 'turn_end', - message: { provider: 'anthropic', model: 'claude-sonnet-4-6' }, - model_limit: { context: 1_000_000, input: 1_000_000, output: 64_000 }, - }; - const resolved = await resolveModelFromEvent(iii, 'sess-1', event); + const resolved = await resolveModel(iii, 'sess-1', 'anthropic', 'claude-sonnet-4-6', { + context: 1_000_000, + input: 1_000_000, + output: 64_000, + }); expect(calls).not.toContain('models::get'); expect(resolved).toEqual({ @@ -90,14 +75,10 @@ describe('resolveModelFromEvent', () => { }); }); - it('falls back to models::get when no inline limit is present', async () => { + it('falls back to models::get when no threaded limit is present', async () => { const { iii, calls } = trackingIii(); - const event = { - type: 'turn_end', - message: { provider: 'anthropic', model: 'claude-sonnet-4-6' }, - }; - await resolveModelFromEvent(iii, 'sess-1', event); + await resolveModel(iii, 'sess-1', 'anthropic', 'claude-sonnet-4-6'); expect(calls).toContain('models::get'); }); diff --git a/harness/tests/context-compaction/integration/backward-compat.test.ts b/harness/tests/context-compaction/integration/backward-compat.test.ts index c71e53b8..33ce8d3a 100644 --- a/harness/tests/context-compaction/integration/backward-compat.test.ts +++ b/harness/tests/context-compaction/integration/backward-compat.test.ts @@ -156,25 +156,12 @@ describe('backward-compat: prior compaction (anchored update path)', () => { compactPayloads, }); - // Fire a TurnEnd that triggers overflow + // Fire a turn_end that triggers overflow const frame = { - groupId: `${largeFixture.session_id}-compat`, - event: { - data: { - type: 'TurnEnd', - message: { - role: 'assistant', - model: 'claude-haiku-4-5', - provider: 'anthropic', - usage: { - input: 185_000, - output: 0, - cache_read: 0, - cache_write: 0, - }, - }, - }, - }, + session_id: `${largeFixture.session_id}-compat`, + provider: 'anthropic', + model: 'claude-haiku-4-5', + usage: { input: 185_000, output: 0, cache_read: 0, cache_write: 0 }, }; await handleAsync(iii, frame); @@ -203,23 +190,10 @@ describe('backward-compat: prior compaction (anchored update path)', () => { }); const frame = { - groupId: `${largeFixture.session_id}-compat2`, - event: { - data: { - type: 'TurnEnd', - message: { - role: 'assistant', - model: 'claude-haiku-4-5', - provider: 'anthropic', - usage: { - input: 185_000, - output: 0, - cache_read: 0, - cache_write: 0, - }, - }, - }, - }, + session_id: `${largeFixture.session_id}-compat2`, + provider: 'anthropic', + model: 'claude-haiku-4-5', + usage: { input: 185_000, output: 0, cache_read: 0, cache_write: 0 }, }; await handleAsync(iii, frame); diff --git a/harness/tests/context-compaction/integration/flow-async.test.ts b/harness/tests/context-compaction/integration/flow-async.test.ts index 1cc26fcc..5757aefb 100644 --- a/harness/tests/context-compaction/integration/flow-async.test.ts +++ b/harness/tests/context-compaction/integration/flow-async.test.ts @@ -131,7 +131,7 @@ function buildAsyncMock(opts: { } /** - * Build a TurnEnd frame that handleAsync can process. + * Build an on_turn_end queue payload that handleAsync can process. * * @param sessionId - the session identifier * @param totalTokens - total token usage to report (drives overflow check) @@ -139,23 +139,10 @@ function buildAsyncMock(opts: { */ function makeTurnEndFrame(sessionId: string, totalTokens: number, modelId = 'claude-haiku-4-5') { return { - groupId: sessionId, - event: { - data: { - type: 'TurnEnd', - message: { - role: 'assistant', - model: modelId, - provider: 'anthropic', - usage: { - input: totalTokens, - output: 0, - cache_read: 0, - cache_write: 0, - }, - }, - }, - }, + session_id: sessionId, + provider: 'anthropic', + model: modelId, + usage: { input: totalTokens, output: 0, cache_read: 0, cache_write: 0 }, }; } diff --git a/harness/tests/context-compaction/registration.test.ts b/harness/tests/context-compaction/registration.test.ts new file mode 100644 index 00000000..dd427efb --- /dev/null +++ b/harness/tests/context-compaction/registration.test.ts @@ -0,0 +1,24 @@ +import { describe, expect, it, vi } from 'vitest'; +import { register } from '../../src/context-compaction/register.js'; +import type { ISdk } from '../../src/runtime/iii.js'; + +describe('context-compaction registration', () => { + it('registers on_turn_end as a function and no stream trigger (woken by a queue message)', async () => { + const registerFunction = vi.fn(); + const registerTrigger = vi.fn(); + const iii = { + registerFunction, + registerTrigger, + trigger: vi.fn(async () => null), + } as unknown as ISdk; + + await register(iii); + + // Compaction is woken by a turn-orchestrator queue message, not a stream. + expect(registerTrigger).not.toHaveBeenCalled(); + + const fns = registerFunction.mock.calls.map((c) => c[0] as string); + expect(fns).toContain('context-compaction::on_turn_end'); + expect(fns).not.toContain('context-compaction::on_agent_event'); + }); +}); diff --git a/harness/tests/context-compaction/turn-end-subscription.test.ts b/harness/tests/context-compaction/turn-end-subscription.test.ts deleted file mode 100644 index 7d2db72e..00000000 --- a/harness/tests/context-compaction/turn-end-subscription.test.ts +++ /dev/null @@ -1,27 +0,0 @@ -import { describe, expect, it, vi } from 'vitest'; -import { register } from '../../src/context-compaction/register.js'; -import type { ISdk } from '../../src/runtime/iii.js'; - -describe('context-compaction stream subscription', () => { - it('subscribes to agent::turn_end, not the full agent::events firehose', async () => { - const registerTrigger = vi.fn(); - const iii = { - registerFunction: vi.fn(), - registerTrigger, - trigger: vi.fn(async () => null), - } as unknown as ISdk; - - await register(iii); - - const streamTriggers = registerTrigger.mock.calls - .map( - (c) => c[0] as { type?: string; function_id?: string; config?: { stream_name?: string } }, - ) - .filter( - (t) => t?.type === 'stream' && t?.function_id === 'context-compaction::on_agent_event', - ); - - expect(streamTriggers).toHaveLength(1); - expect(streamTriggers[0].config?.stream_name).toBe('agent::turn_end'); - }); -}); diff --git a/harness/tests/integration/parallel-approval-harness.ts b/harness/tests/integration/parallel-approval-harness.ts index f10a00a5..0fe49b20 100644 --- a/harness/tests/integration/parallel-approval-harness.ts +++ b/harness/tests/integration/parallel-approval-harness.ts @@ -152,14 +152,12 @@ export function createParallelApprovalHarness(): ParallelApprovalHarness { } if (function_id === 'state::update') { - // Faithful atomic read-modify-write per (scope, key): the engine's - // kv adapter holds the store write-lock for the whole op, so - // increment returns the prior value (null/absent → treated as 0). - // Both the event counter and the per-session lease depend on this. + // Atomic read-modify-write returning the prior value. Models the + // `increment` op (event counter) and root-path `set` op (lease). const p = payload as { scope: string; key: string; - ops?: Array<{ type: string; path?: string; by?: number }>; + ops?: Array<{ type: string; path?: string; by?: number; value?: unknown }>; }; const storeKey = `${p.scope}/${p.key}`; const old_value = stateStore.has(storeKey) @@ -167,8 +165,11 @@ export function createParallelApprovalHarness(): ParallelApprovalHarness { : null; let next: unknown = old_value; for (const op of p.ops ?? []) { - if (op.type === 'increment' && (op.path ?? '') === '') { + if ((op.path ?? '') !== '') continue; + if (op.type === 'increment') { next = (typeof next === 'number' ? next : 0) + (op.by ?? 1); + } else if (op.type === 'set') { + next = op.value; } } stateStore.set(storeKey, next); @@ -177,10 +178,9 @@ export function createParallelApprovalHarness(): ParallelApprovalHarness { if (function_id === 'stream::set') { const p = payload as { stream_name?: string; data: AgentEvent }; - // events.ts mirrors every turn_end onto a second `agent::turn_end` - // stream for compaction. Record only the primary `agent::events` - // stream so `emitted` is a faithful one-entry-per-event log. - if (p.stream_name === 'agent::turn_end') return null; + // Only agent::events carries event frames; turn_end now enqueues a + // compaction wake (a separate function_id) rather than mirroring to a + // second stream, so `emitted` stays a faithful one-entry-per-event log. emitted.push(p.data); return null; } diff --git a/harness/tests/models-catalog/state.test.ts b/harness/tests/models-catalog/state.test.ts index dca0499b..6aab9e06 100644 --- a/harness/tests/models-catalog/state.test.ts +++ b/harness/tests/models-catalog/state.test.ts @@ -1,12 +1,12 @@ import { describe, expect, it } from 'vitest'; -import { parseModelArray, providerStateKey } from '../../src/models-catalog/state.js'; +import { isModel, providerStateKey } from '../../src/models-catalog/state.js'; describe('provider catalog state helpers', () => { it('providerStateKey uses bare provider id', () => { expect(providerStateKey('anthropic')).toBe('anthropic'); }); - it('parseModelArray accepts only Model[]', () => { + it('isModel guards the write-side boundary (object with a string id)', () => { const model = { id: 'm1', provider: 'anthropic', @@ -14,9 +14,9 @@ describe('provider catalog state helpers', () => { display_name: 'm1', context_window: 1, }; - expect(parseModelArray([model])).toEqual([model]); - expect(parseModelArray(model)).toEqual([]); - expect(parseModelArray(null)).toEqual([]); - expect(parseModelArray('bad')).toEqual([]); + expect(isModel(model)).toBe(true); + expect(isModel({ provider: 'anthropic' })).toBe(false); + expect(isModel(null)).toBe(false); + expect(isModel('bad')).toBe(false); }); }); diff --git a/harness/tests/runtime/lease.test.ts b/harness/tests/runtime/lease.test.ts new file mode 100644 index 00000000..75928180 --- /dev/null +++ b/harness/tests/runtime/lease.test.ts @@ -0,0 +1,207 @@ +import { describe, expect, it, vi } from 'vitest'; +import { payloadStoreKey } from '../_helpers/stateStoreKey.js'; +import { + acquireLease, + acquireLeaseWithWait, + mintLeaseNonce, + releaseLease, +} from '../../src/runtime/lease.js'; + +type Iii = Parameters[0]; + +/** + * In-memory ISdk stub. state::update is atomic by construction — its + * read-modify-write runs synchronously after the optional latency await, like + * the engine's store write-lock; latencyMs shifts the interleaving. + */ +function makeStateIii(latencyMs = 0): { iii: Iii; store: Map } { + const store = new Map(); + const maybeWait = () => + latencyMs > 0 ? new Promise((r) => setTimeout(r, latencyMs)) : Promise.resolve(); + + const iii = { + trigger: vi.fn(async ({ function_id, payload }: { function_id: string; payload: unknown }) => { + const p = payload as Record; + const key = payloadStoreKey(p as { scope?: string; key?: string }); + if (function_id === 'state::get') { + return store.has(key) ? store.get(key) : null; + } + if (function_id === 'state::set') { + await maybeWait(); + const v = p.value; + if (v === null || v === undefined) store.delete(key); + else store.set(key, v); + return { ok: true }; + } + if (function_id === 'state::update') { + await maybeWait(); + const ops = (p.ops ?? []) as Array<{ type: string; value?: unknown }>; + const old_value = store.has(key) ? store.get(key) : null; + let new_value: unknown = old_value; + for (const op of ops) if (op.type === 'set') new_value = op.value; + if (new_value === null || new_value === undefined) store.delete(key); + else store.set(key, new_value); + return { old_value, new_value }; + } + return null; + }), + }; + return { iii: iii as unknown as Iii, store }; +} + +/** state::update always fails (tolerant wrapper → null), mimicking an outage. */ +function makeFailingUpdateIii(): { iii: Iii } { + const iii = { + trigger: vi.fn(async ({ function_id }: { function_id: string }) => { + if (function_id === 'state::update') return null; + return null; + }), + }; + return { iii: iii as unknown as Iii }; +} + +const SCOPE = 'turn_lease'; +const TTL = 30_000; + +describe('mintLeaseNonce', () => { + it('produces unique values across rapid calls', () => { + const seen = new Set(); + for (let i = 0; i < 1000; i++) seen.add(mintLeaseNonce()); + expect(seen.size).toBe(1000); + }); +}); + +describe('acquireLease / releaseLease', () => { + it('grants a free lease and returns a nonce', async () => { + const { iii } = makeStateIii(); + await expect(acquireLease(iii, SCOPE, 's1', TTL)).resolves.toEqual(expect.any(String)); + }); + + it('denies a second acquirer while held', async () => { + const { iii } = makeStateIii(); + expect(await acquireLease(iii, SCOPE, 's1', TTL)).not.toBeNull(); + await expect(acquireLease(iii, SCOPE, 's1', TTL)).resolves.toBeNull(); + }); + + it('re-grants after the owner releases', async () => { + const { iii } = makeStateIii(); + const nonce = await acquireLease(iii, SCOPE, 's1', TTL); + expect(nonce).not.toBeNull(); + await releaseLease(iii, SCOPE, 's1', nonce as string); + await expect(acquireLease(iii, SCOPE, 's1', TTL)).resolves.not.toBeNull(); + }); + + it('ignores a release from a non-owner (wrong nonce)', async () => { + const { iii } = makeStateIii(); + const held = await acquireLease(iii, SCOPE, 's1', TTL); + expect(held).not.toBeNull(); + await releaseLease(iii, SCOPE, 's1', 'not-the-owner'); + // Lease is still held — a fresh acquire must still fail. + await expect(acquireLease(iii, SCOPE, 's1', TTL)).resolves.toBeNull(); + }); + + it('sends a single state::update set op at the root path', async () => { + const captured: Array<{ type: string; path?: unknown; value?: unknown }> = []; + const iii = { + trigger: vi.fn( + async ({ function_id, payload }: { function_id: string; payload: unknown }) => { + if (function_id === 'state::get') return null; + if (function_id === 'state::update') { + for (const op of ((payload as Record).ops ?? []) as Array<{ + type: string; + path?: unknown; + value?: unknown; + }>) + captured.push(op); + return { old_value: null, new_value: { dummy: true } }; + } + return null; + }, + ), + } as unknown as Iii; + const nonce = await acquireLease(iii, SCOPE, 's1', TTL); + expect(captured).toHaveLength(1); + expect(captured[0]).toMatchObject({ + type: 'set', + path: '', + value: { nonce, ts: expect.any(Number) }, + }); + }); +}); + +describe('acquireLease TTL steal', () => { + it('steals a lease whose ts is older than the TTL', async () => { + const { iii, store } = makeStateIii(); + store.set(`${SCOPE}/s1`, { nonce: 'crashed', ts: Date.now() - TTL - 1_000 }); + await expect(acquireLease(iii, SCOPE, 's1', TTL)).resolves.toEqual(expect.any(String)); + }); + + it('does not steal a lease still within its TTL', async () => { + const { iii, store } = makeStateIii(); + store.set(`${SCOPE}/s1`, { nonce: 'fresh', ts: Date.now() }); + await expect(acquireLease(iii, SCOPE, 's1', TTL)).resolves.toBeNull(); + }); +}); + +describe('acquireLease concurrency (the single-writer invariant)', () => { + it('lets exactly one of two concurrent acquirers win, with write latency', async () => { + const { iii } = makeStateIii(10); + const results = await Promise.all([ + acquireLease(iii, SCOPE, 'race', TTL), + acquireLease(iii, SCOPE, 'race', TTL), + ]); + expect(results.filter((r) => r !== null)).toHaveLength(1); + }); + + it('lets exactly one of eight concurrent acquirers win, with write latency', async () => { + const { iii } = makeStateIii(10); + const results = await Promise.all( + Array.from({ length: 8 }, () => acquireLease(iii, SCOPE, 'race8', TTL)), + ); + expect(results.filter((r) => r !== null)).toHaveLength(1); + }); + + it('never false-wins during a state-store outage (update returns null)', async () => { + const { iii } = makeFailingUpdateIii(); + await expect(acquireLease(iii, SCOPE, 's1', TTL)).resolves.toBeNull(); + }); + + it('returns null for every concurrent acquirer during an outage', async () => { + const { iii } = makeFailingUpdateIii(); + const results = await Promise.all( + Array.from({ length: 4 }, () => acquireLease(iii, SCOPE, 'out', TTL)), + ); + expect(results.every((r) => r === null)).toBe(true); + }); + + it('keeps two independent (scope,key) leases from blocking each other', async () => { + const { iii } = makeStateIii(); + const a = await acquireLease(iii, 'compaction_lease', 'sid', TTL); + const b = await acquireLease(iii, 'prune_lease', 'sid', TTL); + expect(a).not.toBeNull(); + expect(b).not.toBeNull(); + expect(a).not.toBe(b); + }); +}); + +describe('acquireLeaseWithWait', () => { + it('acquires immediately when free', async () => { + const { iii } = makeStateIii(); + await expect(acquireLeaseWithWait(iii, SCOPE, 's1', TTL, 200)).resolves.not.toBeNull(); + }); + + it('returns null when the lease never frees within the timeout', async () => { + const { iii } = makeStateIii(); + expect(await acquireLease(iii, SCOPE, 'busy', TTL)).not.toBeNull(); + await expect(acquireLeaseWithWait(iii, SCOPE, 'busy', TTL, 80)).resolves.toBeNull(); + }); + + it('acquires once the holder releases mid-wait', async () => { + const { iii } = makeStateIii(); + const held = (await acquireLease(iii, SCOPE, 'hand', TTL)) as string; + setTimeout(() => { + void releaseLease(iii, SCOPE, 'hand', held); + }, 30); + await expect(acquireLeaseWithWait(iii, SCOPE, 'hand', TTL, 1_000)).resolves.not.toBeNull(); + }); +}); diff --git a/harness/tests/runtime/state-list.test.ts b/harness/tests/runtime/state-list.test.ts index e043d382..a45897ae 100644 --- a/harness/tests/runtime/state-list.test.ts +++ b/harness/tests/runtime/state-list.test.ts @@ -1,20 +1,26 @@ -import { describe, expect, it } from 'vitest'; -import { parseStateListValues } from '../../src/runtime/state.js'; +import { describe, expect, it, vi } from 'vitest'; +import type { ISdk } from '../../src/runtime/iii.js'; +import { createState } from '../../src/runtime/state.js'; -describe('parseStateListValues', () => { - it('accepts flat array (official iii shape)', () => { - const rows = [{ session_id: 's1', state: 'stopped' }]; - expect(parseStateListValues(rows)).toEqual(rows); - }); +function makeIii(listResult: unknown): ISdk { + return { + trigger: vi.fn(async ({ function_id }: { function_id: string }) => + function_id === 'state::list' ? listResult : null, + ), + } as unknown as ISdk; +} - it('unwraps { value } rows', () => { - const inner = { session_id: 's1', state: 'function_awaiting_approval' }; - expect(parseStateListValues([{ value: inner }])).toEqual([inner]); +describe('createState().list', () => { + it('returns the flat array of stored values (official iii shape)', async () => { + const rows = [{ session_id: 's1', state: 'stopped' }]; + await expect(createState(makeIii(rows)).list({ scope: 'turn_state' })).resolves.toEqual(rows); }); - it('returns [] for non-array responses', () => { - expect(parseStateListValues(null)).toEqual([]); - expect(parseStateListValues({ ok: true })).toEqual([]); - expect(parseStateListValues({ items: [{ id: 'm1' }] })).toEqual([]); + it('returns [] for non-array responses', async () => { + await expect(createState(makeIii(null)).list({ scope: 's' })).resolves.toEqual([]); + await expect(createState(makeIii({ ok: true })).list({ scope: 's' })).resolves.toEqual([]); + await expect( + createState(makeIii({ items: [{ id: 'm1' }] })).list({ scope: 's' }), + ).resolves.toEqual([]); }); }); diff --git a/harness/tests/turn-orchestrator/events.test.ts b/harness/tests/turn-orchestrator/events.test.ts index 1e604768..62feb8f2 100644 --- a/harness/tests/turn-orchestrator/events.test.ts +++ b/harness/tests/turn-orchestrator/events.test.ts @@ -35,26 +35,33 @@ describe('emit (agent event producer)', () => { expect(streamSets(calls).map((c) => c.payload.stream_name)).toEqual(['agent::events']); }); - it('mirrors a turn_end event onto the dedicated agent::turn_end stream', async () => { + it('enqueues a compaction wake on turn_end instead of mirroring to a stream', async () => { const { iii, calls } = buildSdk(); const event = { type: 'turn_end', - message: { role: 'assistant' }, + message: { + role: 'assistant', + usage: { input: 100, output: 5 }, + provider: 'anthropic', + model: 'claude-haiku-4-5', + }, function_results: [], } as unknown as AgentEvent; await emit(iii, SID, event); - const sets = streamSets(calls); - const streams = sets.map((c) => c.payload.stream_name); - expect(streams).toContain('agent::events'); - expect(streams).toContain('agent::turn_end'); + // The event is written only to agent::events — no dedicated turn_end stream. + expect(streamSets(calls).map((c) => c.payload.stream_name)).toEqual(['agent::events']); - const mirror = sets.find((c) => c.payload.stream_name === 'agent::turn_end'); - expect(mirror?.payload.group_id).toBe(SID); - expect(mirror?.payload.data).toEqual(event); - // Same logical event → identical item_id on both streams (single seq per emit). - expect(new Set(sets.map((c) => c.payload.item_id)).size).toBe(1); + // A typed compaction wake is enqueued to the out-of-band compactor. + const wake = calls.find((c) => c.function_id === 'context-compaction::on_turn_end'); + expect(wake).toBeDefined(); + expect(wake?.payload).toMatchObject({ + session_id: SID, + usage: { input: 100, output: 5 }, + provider: 'anthropic', + model: 'claude-haiku-4-5', + }); }); }); diff --git a/harness/tests/turn-orchestrator/parse-turn-state-record.test.ts b/harness/tests/turn-orchestrator/parse-turn-state-record.test.ts deleted file mode 100644 index be83190b..00000000 --- a/harness/tests/turn-orchestrator/parse-turn-state-record.test.ts +++ /dev/null @@ -1,35 +0,0 @@ -import { describe, expect, it } from 'vitest'; -import { newRecord, parseTurnStateRecord } from '../../src/turn-orchestrator/state.js'; - -describe('parseTurnStateRecord', () => { - it('returns a valid record for a well-formed turn_state', () => { - const rec = newRecord('sess-1'); - expect(parseTurnStateRecord(rec)).toEqual(rec); - }); - - it('returns null for null, undefined, and primitives', () => { - expect(parseTurnStateRecord(null)).toBeNull(); - expect(parseTurnStateRecord(undefined)).toBeNull(); - expect(parseTurnStateRecord('nope')).toBeNull(); - expect(parseTurnStateRecord(42)).toBeNull(); - }); - - it('returns null when required identity fields are missing', () => { - expect(parseTurnStateRecord({ state: 'provisioning' })).toBeNull(); - expect(parseTurnStateRecord({ session_id: 's1' })).toBeNull(); - expect(parseTurnStateRecord({ messages: [] })).toBeNull(); - }); - - it('applies defaults for missing scalar fields on partial records', () => { - const parsed = parseTurnStateRecord({ - session_id: 's1', - state: 'provisioning', - }); - expect(parsed).toMatchObject({ - session_id: 's1', - state: 'provisioning', - turn_count: 0, - turn_end_emitted: false, - }); - }); -}); diff --git a/harness/tests/turn-orchestrator/run-request.test.ts b/harness/tests/turn-orchestrator/run-request.test.ts index 945c13f5..da10c146 100644 --- a/harness/tests/turn-orchestrator/run-request.test.ts +++ b/harness/tests/turn-orchestrator/run-request.test.ts @@ -1,9 +1,9 @@ import { describe, expect, it } from 'vitest'; -import { parseRunRequest } from '../../src/turn-orchestrator/run-request.js'; +import { defaultRunRequest } from '../../src/turn-orchestrator/run-request.js'; -describe('parseRunRequest', () => { - it('maps persisted run::start fields with defaults for missing keys', () => { - expect(parseRunRequest({})).toEqual({ +describe('defaultRunRequest', () => { + it('is the empty run request used when no record exists yet', () => { + expect(defaultRunRequest()).toEqual({ provider: '', model: '', mode: null, @@ -11,65 +11,4 @@ describe('parseRunRequest', () => { function_schemas: [], }); }); - - it('passes through provided string fields', () => { - expect(parseRunRequest({ provider: 'openai', model: 'gpt-4', system_prompt: 'hi' })).toEqual({ - provider: 'openai', - model: 'gpt-4', - mode: null, - system_prompt: 'hi', - function_schemas: [], - }); - }); - - it('rejects invalid mode values and accepts valid ones', () => { - expect(parseRunRequest({ mode: 'invalid' }).mode).toBeNull(); - expect(parseRunRequest({ mode: 'plan' }).mode).toBe('plan'); - expect(parseRunRequest({ mode: 'ask' }).mode).toBe('ask'); - expect(parseRunRequest({ mode: 'agent' }).mode).toBe('agent'); - }); - - it('coerces non-string fields to defaults', () => { - expect(parseRunRequest({ provider: 123, model: null, system_prompt: {} })).toEqual({ - provider: '', - model: '', - mode: null, - system_prompt: '', - function_schemas: [], - }); - }); - - it('treats null and undefined as empty run request', () => { - const empty = { - provider: '', - model: '', - mode: null, - system_prompt: '', - function_schemas: [], - }; - expect(parseRunRequest(null)).toEqual(empty); - expect(parseRunRequest(undefined)).toEqual(empty); - }); -}); - -describe('parseRunRequest function_schemas', () => { - it('defaults to [] and carries an array', () => { - expect(parseRunRequest({}).function_schemas).toEqual([]); - expect(parseRunRequest({ function_schemas: [{ name: 'x' }] }).function_schemas).toHaveLength(1); - }); -}); - -describe('parseRunRequest thinking_level', () => { - it('passes through a string level', () => { - expect(parseRunRequest({ thinking_level: 'high' }).thinking_level).toBe('high'); - }); - - it('is undefined when absent', () => { - expect(parseRunRequest({}).thinking_level).toBeUndefined(); - }); - - it('coerces non-string values to undefined', () => { - expect(parseRunRequest({ thinking_level: 42 }).thinking_level).toBeUndefined(); - expect(parseRunRequest({ thinking_level: { level: 'high' } }).thinking_level).toBeUndefined(); - }); }); diff --git a/harness/tests/turn-orchestrator/session-lease.test.ts b/harness/tests/turn-orchestrator/session-lease.test.ts new file mode 100644 index 00000000..ad530134 --- /dev/null +++ b/harness/tests/turn-orchestrator/session-lease.test.ts @@ -0,0 +1,86 @@ +import { describe, expect, it, vi } from 'vitest'; +import type { ISdk } from '../../src/runtime/iii.js'; +import { + acquireSessionLease, + releaseSessionLease, +} from '../../src/turn-orchestrator/state-runtime/session-lease.js'; + +/** + * Fake iii with an in-memory atomic state::update set-CAS (plus get/set). + * `failUpdate` makes update throw → tolerant client returns null, exercising the + * outage path that must never false-win. + */ +function makeIii(opts: { failUpdate?: boolean } = {}): ISdk { + const store = new Map(); + const id = (p: { scope: string; key: string }) => `${p.scope}/${p.key}`; + + const trigger = vi.fn( + async ({ function_id, payload }: { function_id: string; payload: Record }) => { + const k = id(payload as { scope: string; key: string }); + if (function_id === 'state::get') return store.has(k) ? store.get(k) : null; + if (function_id === 'state::set') { + if (payload.value === null || payload.value === undefined) store.delete(k); + else store.set(k, payload.value); + return { ok: true }; + } + if (function_id === 'state::update') { + if (opts.failUpdate) throw new Error('state-store down'); + const old_value = store.has(k) ? store.get(k) : null; + let new_value: unknown = old_value; + for (const op of (payload.ops ?? []) as Array<{ type: string; value?: unknown }>) { + if (op.type === 'set') new_value = op.value; + } + if (new_value === null || new_value === undefined) store.delete(k); + else store.set(k, new_value); + return { old_value, new_value }; + } + throw new Error(`unexpected function_id ${function_id}`); + }, + ); + + return { trigger } as unknown as ISdk; +} + +describe('acquireSessionLease', () => { + it('grants the lease on a fresh key and returns a nonce', async () => { + const iii = makeIii(); + await expect(acquireSessionLease(iii, 's1')).resolves.toEqual(expect.any(String)); + }); + + it('denies a second concurrent holder', async () => { + const iii = makeIii(); + expect(await acquireSessionLease(iii, 's1')).not.toBeNull(); + await expect(acquireSessionLease(iii, 's1')).resolves.toBeNull(); + }); + + it('re-grants after the owner releases', async () => { + const iii = makeIii(); + const nonce = await acquireSessionLease(iii, 's1'); + expect(nonce).not.toBeNull(); + await releaseSessionLease(iii, 's1', nonce as string); + await expect(acquireSessionLease(iii, 's1')).resolves.not.toBeNull(); + }); + + it('ignores a release from a non-owner (wrong nonce)', async () => { + const iii = makeIii(); + expect(await acquireSessionLease(iii, 's1')).not.toBeNull(); + await releaseSessionLease(iii, 's1', 'someone-else'); + await expect(acquireSessionLease(iii, 's1')).resolves.toBeNull(); + }); + + // Regression: a transient state-store outage must NOT be read as a win, or + // every concurrent contender claims the lease at once → duplicate side effects. + it('does not grant the lease when the atomic update fails', async () => { + const iii = makeIii({ failUpdate: true }); + await expect(acquireSessionLease(iii, 's1')).resolves.toBeNull(); + }); + + it('never lets two contenders both win under a state-store outage', async () => { + const iii = makeIii({ failUpdate: true }); + const [a, b] = await Promise.all([ + acquireSessionLease(iii, 's1'), + acquireSessionLease(iii, 's1'), + ]); + expect([a, b]).toEqual([null, null]); + }); +});