refactor(harness): resolve model/provider once per turn, drop duplicate reads and the steering hop #237
…te reads and the steering hop

The turn loop re-fetched constant data on every round-trip: models::get ran three times per iteration (orchestrator preflight, provider stream, compaction) and harness::provider::resolve ran once per stream, all returning the same value for the whole turn. finalizeBatch rebuilt the full compaction window only to read the trailing function_result ids for dedup, and a separate turn::steering_check FSM step gated every continue/end decision.

- Resolve the full catalog Model once at provisioning, persist it on the turn record, and thread it to preflight, the provider (ProviderStreamInput.model_meta), and compaction (turn_end.model_limit). Every reader falls back to a live fetch when it is absent, so an empty/cold catalog is unchanged. models::get drops from ~3 per round-trip to ~1 per turn.
- Cache the provider credential resolution in the provider process, keyed by a per-run id threaded on ProviderStreamInput.resolution_key, and invalidate it on a 401. Collapses the per-stream harness::provider::resolve to ~1 per turn while still picking up a key rotated between turns.
- finalizeBatch reads only the raw trailing function_result run via a new TurnStore.loadTrailingResultIds (default leaf, matching the append path), skipping the paired session-tree::compactions read.
- Inline the steering_check routing (continue / max_turns / end) into assistant_streaming and function_execute, removing one durable FSM step and its queue wake per round-trip. turn::steering_check stays registered as a compat drain so records persisted in that state at deploy time advance instead of DLQ-wedging the turn; the state, schemas, and registration are removed in a follow-up once that queue is confirmed empty. Continuing batches now resume assistant_streaming with cleared function_results (the results already travel on the turn_end event for consumers).

No change to durability, approvals, compaction safety, or mid-turn model/credential changes.
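The "thread it once, fall back to a live fetch when absent" rule from the first bullet can be sketched roughly as follows. This is a minimal illustration, not the harness code: `ModelMeta`, `ModelLimit`, `FetchLimit`, `contextWindow`, and `resolveLimit` are hypothetical names, and the fetch is synchronous only to keep the sketch short (the real models::get call is presumably asynchronous).

```typescript
// Hypothetical shapes: the harness's real Model and limit types are not shown in this thread.
interface ModelMeta {
  id: string;
  provider: string;
  contextWindow?: number; // may be missing on a sparse catalog entry
}

interface ModelLimit {
  context: number;
}

// Stand-in for a live models::get call. Synchronous here only to keep the sketch short.
type FetchLimit = (provider: string, id: string) => ModelLimit;

// Prefer the entry threaded from provisioning; fall back to a live fetch
// when it is absent or does not carry a usable limit.
function resolveLimit(
  meta: ModelMeta | undefined,
  provider: string,
  id: string,
  fetchLimit: FetchLimit,
): ModelLimit {
  if (meta && typeof meta.contextWindow === "number") {
    return { context: meta.contextWindow };
  }
  return fetchLimit(provider, id);
}
```

With a threaded entry no fetch happens at all; with `undefined` (the cold-catalog case) the reader behaves exactly as it did before the change.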
📝 Walkthrough
Threads model metadata through provisioning, preflight, and streaming; adds session-tree batch appends; inlines assistant end-turn routing and finishing; implements per-turn Anthropic credential caching/invalidation; deduplicates function results via trailing-result loading; and adds run-start busy detection with tests updated.
Changes
Turn Orchestrator Model Threading, Routing, and Batch Operations
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
skill-check — worker0 verified, 14 skipped (no docs/).
Four for four. Nicely done.
Actionable comments posted: 3
🧹 Nitpick comments (2)
harness/tests/turn-orchestrator/provisioning-layer.test.ts (1)
133-147: ⚡ Quick win
Add a regression case where rec already has model_meta before applying a null outcome.
The current null-resolution test uses a fresh record, so it won’t catch stale carry-over. Seed rec.model_meta first, then assert it is cleared after applying { model_meta: null }.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@harness/tests/turn-orchestrator/provisioning-layer.test.ts` around lines 133 - 147, Add a regression case that ensures existing model_meta is cleared when a null outcome is applied: create a record via newRecord('s1') and set rec.model_meta = { some: 'value' } (or similar seed) before calling applyProvisioningOutcome(ports, rec, { kind: 'ready', runRequest, model_meta: null }); then assert rec.model_meta is undefined and rec.state is 'assistant_streaming'. Use the same test scaffolding (stubPorts, runRequest) and the existing test name or a new one to validate stale model_meta is removed by applyProvisioningOutcome.
harness/src/turn-orchestrator/preflight.ts (1)
47-49: ⚡ Quick win
Validate threaded modelMeta identity before using its limits.
runPreflight currently trusts modelMeta limits even if modelMeta.id/provider differ from modelID/providerID, which can compute overflow against the wrong model window and skip needed compaction.
Suggested patch
-  const model = modelMeta
-    ? { providerID, modelID, modelLimit: limitFromModel(modelMeta) }
-    : await fetchModelLimit(iii, providerID, modelID);
+  const threadedMatch =
+    modelMeta && modelMeta.id === modelID && modelMeta.provider === providerID;
+  if (modelMeta && !threadedMatch) {
+    logger.warn('preflight: threaded modelMeta mismatched requested model; falling back to models::get', {
+      providerID,
+      modelID,
+      meta_provider: modelMeta.provider,
+      meta_id: modelMeta.id,
+    });
+  }
+  const model =
+    threadedMatch && modelMeta
+      ? { providerID, modelID, modelLimit: limitFromModel(modelMeta) }
+      : await fetchModelLimit(iii, providerID, modelID);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@harness/src/turn-orchestrator/preflight.ts` around lines 47 - 49, runPreflight currently uses modelMeta's limits without verifying it matches the requested model, which can apply the wrong window; update the conditional around modelMeta in the model assignment so you only use { providerID, modelID, modelLimit: limitFromModel(modelMeta) } when modelMeta is present AND modelMeta.id === modelID AND modelMeta.provider === providerID (or the actual property names on modelMeta), otherwise call await fetchModelLimit(iii, providerID, modelID); ensure the check references the modelMeta object and uses limitFromModel and fetchModelLimit unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@harness/src/provider-anthropic/auth.ts`:
- Around line 24-31: The single-slot process-global resolveCache (resolveCache)
keyed only by the numeric key param in resolveProviderForTurn(iii, key) risks
cross-tenant/run collisions (rec.started_at_ms); change to a per-ISdk namespaced
cache: e.g., replace resolveCache with a Map/WeakMap keyed by the ISdk identity
(or session/run UUID) that maps to an inner map keyed by the resolutionKey, then
in resolveProviderForTurn(iii, key) build a composite lookup using the iii
identity and the resolution key and only reuse a cached ProviderResolveResult
when both match; alternatively ensure each run supplies a strong unique per-run
id (UUID/monotonic counter) instead of the millisecond timestamp and include iii
when caching before calling resolveProvider(iii, PROVIDER_ID).
In `@harness/src/turn-orchestrator/provisioning/process.ts`:
- Around line 62-65: The code only assigns rec.model_meta when
outcome.model_meta is truthy, which allows stale metadata to persist; instead
assign unconditionally (rec.model_meta = outcome.model_meta) so a null/undefined
resolution clears the previous value; update the block that currently checks
outcome.model_meta before assignment (the rec.model_meta assignment near
transitionTo) to always overwrite rec.model_meta with outcome.model_meta.
In `@harness/src/turn-orchestrator/state-runtime/turn-end.ts`:
- Around line 58-75: endTurnForMaxTurns currently appends a synthetic assistant
message and emits message_complete before calling emitTurnEndOnce and does not
pass through rec.function_results, causing dropped tool results and possible
duplicate side effects on re-entry; modify endTurnForMaxTurns to first check/add
an idempotency guard on the TurnStateRecord (e.g., a rec.turn_end_emitted flag)
to early-return if already run, and when invoking emitTurnEndOnce pass
rec.function_results (instead of relying on default []) so trailing tool results
are preserved, keeping the existing ports.appendMessages, ports.emit
(message_complete) and ports.finishSession calls but ensuring they only run once
due to the guard.
📒 Files selected for processing (36)
- harness/src/context-compaction/handler-async.ts
- harness/src/context-compaction/model-resolver.ts
- harness/src/models-catalog/types.ts
- harness/src/provider-anthropic/auth.ts
- harness/src/provider-anthropic/stream-fn.ts
- harness/src/provider-anthropic/stream.ts
- harness/src/turn-orchestrator/assistant-streaming/ports.ts
- harness/src/turn-orchestrator/assistant-streaming/run.ts
- harness/src/turn-orchestrator/function-execute/ports.ts
- harness/src/turn-orchestrator/function-execute/run.ts
- harness/src/turn-orchestrator/preflight.ts
- harness/src/turn-orchestrator/provider-router.ts
- harness/src/turn-orchestrator/provisioning/ports.ts
- harness/src/turn-orchestrator/provisioning/process.ts
- harness/src/turn-orchestrator/state-runtime/ports.ts
- harness/src/turn-orchestrator/state-runtime/store.ts
- harness/src/turn-orchestrator/state-runtime/turn-end.ts
- harness/src/turn-orchestrator/state.ts
- harness/src/turn-orchestrator/steering-check/process.ts
- harness/src/turn-orchestrator/steering-check/run.ts
- harness/src/types/agent-event.ts
- harness/src/types/provider.ts
- harness/tests/context-compaction/handler-async.test.ts
- harness/tests/integration/mode-approval.e2e.test.ts
- harness/tests/integration/parallel-approval.e2e.test.ts
- harness/tests/provider-anthropic/auth.test.ts
- harness/tests/turn-orchestrator/_helpers/mockTurnStore.ts
- harness/tests/turn-orchestrator/assistant-streaming.test.ts
- harness/tests/turn-orchestrator/assistant.test.ts
- harness/tests/turn-orchestrator/function-awaiting-approval.test.ts
- harness/tests/turn-orchestrator/function-execute.test.ts
- harness/tests/turn-orchestrator/functions.test.ts
- harness/tests/turn-orchestrator/preflight.test.ts
- harness/tests/turn-orchestrator/provider-router.test.ts
- harness/tests/turn-orchestrator/provisioning-layer.test.ts
- harness/tests/turn-orchestrator/steering-check-layer.test.ts
let resolveCache: { key: number; resolved: ProviderResolveResult } | null = null;

async function resolveProviderForTurn(iii: ISdk, key?: number): Promise<ProviderResolveResult> {
  if (key === undefined) return resolveProvider(iii, PROVIDER_ID);
  if (resolveCache?.key === key) return resolveCache.resolved;
  const resolved = await resolveProvider(iii, PROVIDER_ID);
  resolveCache = { key, resolved };
  return resolved;
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "1) Find resolution_key definition and call flow"
rg -n -C3 '\bresolution_key\b' harness/src harness/tests
echo
echo "2) Locate ProviderStreamInput contract carrying resolution_key"
rg -n -C3 'type ProviderStreamInput|interface ProviderStreamInput' harness/src
echo
echo "3) Trace where resolution_key is assigned from turn/run state"
rg -n -C4 'resolution_key\s*[:=]|run_id|turn_id|turn_count|session_id' harness/src/turn-orchestrator harness/src/provider-anthropic
Repository: iii-hq/workers
Length of output: 50371
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== auth.ts: resolveProviderForTurn and resolveCache =="
rg -n "resolveCache|resolveProviderForTurn|resolveProvider\(" harness/src/provider-anthropic/auth.ts
echo
sed -n '1,120p' harness/src/provider-anthropic/auth.ts | nl -ba | sed -n '1,120p'
echo
echo "== Call sites of resolveProviderForTurn =="
rg -n "resolveProviderForTurn\(" harness/src/provider-anthropic harness/src | head -n 50
echo
echo "== resolution_key assignment sites =="
rg -n "resolution_key\s*:" harness/src/turn-orchestrator harness/src/provider-anthropic harness/src | head -n 50
echo
echo "== started_at_ms initialization in turn orchestrator =="
rg -n "started_at_ms" harness/src/turn-orchestrator/state.ts harness/src/turn-orchestrator/assistant-streaming/run.ts
Repository: iii-hq/workers
Length of output: 722
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== auth.ts full (bounded) excerpt around resolveProviderForTurn =="
# Print first 220 lines with line numbers using cat -n (nl not available)
sed -n '1,220p' harness/src/provider-anthropic/auth.ts | cat -n
echo
echo "== Find resolveProviderForTurn call site that passes resolutionKey =="
rg -n "resolveProviderForTurn\\(iii" -n harness/src/provider-anthropic/auth.ts
echo
echo "== Show block around that call (if present) =="
line=$(rg -n "resolveProviderForTurn\\(iii" harness/src/provider-anthropic/auth.ts | cut -d: -f1 | head -n1)
start=$((line-40))
end=$((line+40))
sed -n "${start},${end}p" harness/src/provider-anthropic/auth.ts | cat -n
echo
echo "== Where resolutionKey comes from in auth.ts =="
rg -n "resolutionKey" harness/src/provider-anthropic/auth.ts
Repository: iii-hq/workers
Length of output: 5850
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== assistant-streaming/run.ts: find resolution_key assignment =="
rg -n "resolution_key" harness/src/turn-orchestrator/assistant-streaming/run.ts
echo
sed -n '1,120p' harness/src/turn-orchestrator/assistant-streaming/run.ts | cat -n
echo
echo "== turn-orchestrator/state.ts: where started_at_ms is set in newRecord =="
rg -n "started_at_ms|newRecord\\(" harness/src/turn-orchestrator/state.ts
echo
# show excerpt around newRecord
line=$(rg -n "export function newRecord" harness/src/turn-orchestrator/state.ts | cut -d: -f1 | head -n1)
start=$((line-20))
end=$((line+80))
sed -n "${start},${end}p" harness/src/turn-orchestrator/state.ts | cat -n
echo
echo "== provider-router.ts: how resolution_key is threaded into ProviderStreamInput =="
rg -n "resolution_key" harness/src/turn-orchestrator/provider-router.ts
echo
sed -n '40,120p' harness/src/turn-orchestrator/provider-router.ts | cat -n
echo
echo "== provider-anthropic/stream-fn.ts: how input.resolution_key reaches buildConfig =="
rg -n "resolution_key" harness/src/provider-anthropic/stream-fn.ts
echo
sed -n '1,120p' harness/src/provider-anthropic/stream-fn.ts | cat -n
Repository: iii-hq/workers
Length of output: 11874
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== Locate resolveProvider implementation =="
fd -a "provider-resolve" harness/src | head -n 20
echo
file=$(fd -a "provider-resolve" harness/src | head -n 1)
echo "== Using file: $file =="
sed -n '1,200p' "$file" | cat -n
echo
echo "== Search for harness::provider::resolve and credential scoping =="
rg -n "provider::resolve|resolveProvider|credential|api[_-]?key" "$file"
Repository: iii-hq/workers
Length of output: 4143
🏁 Script executed:
#!/bin/bash
set -euo pipefail
sed -n '1,120p' harness/src/types/provider.ts | cat -n
rg -n "resolution_key" harness/src/types/provider.ts
Repository: iii-hq/workers
Length of output: 4473
🏁 Script executed:
#!/bin/bash
set -euo pipefail
sed -n '1,140p' harness/src/provider-anthropic/auth.ts | cat -n
Repository: iii-hq/workers
Length of output: 3651
Validate resolveCache keying for credential isolation.
In harness/src/provider-anthropic/auth.ts the single-slot, process-global resolveCache is keyed only by resolutionKey (resolveProviderForTurn(iii, resolutionKey)), and resolutionKey is set to rec.started_at_ms (from Date.now() in newRecord). That millisecond timestamp is not guaranteed unique across concurrent runs/sessions or across different ISdk/tenant contexts handled by the same process; if harness::provider::resolve can return different credentials per iii, a collision could reuse the wrong turn’s resolved credential.
Namespace the cache by iii (or session/run identity) and/or use a stronger per-run unique key (UUID/monotonic counter), rather than a bare number timestamp.
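One way to read the suggestion above is a two-level cache: an outer WeakMap keyed by SDK identity and an inner map keyed by the per-run key. The sketch below is only an illustration under stated assumptions: `Sdk`, `Resolved`, `resolveForTurn`, and `invalidate` are hypothetical stand-ins for ISdk, ProviderResolveResult, and the real helpers, and the resolver is synchronous for brevity.

```typescript
// Hypothetical stand-ins for the harness types (ISdk, ProviderResolveResult).
type Sdk = object;
interface Resolved {
  apiKey: string;
}

// Outer WeakMap is keyed by SDK identity, inner Map by the per-run key, so two
// SDK instances that happen to share a numeric key never share a credential.
const cache = new WeakMap<Sdk, Map<number, Resolved>>();

function resolveForTurn(
  sdk: Sdk,
  key: number | undefined,
  resolve: (sdk: Sdk) => Resolved,
): Resolved {
  // No key supplied: behave like the uncached path.
  if (key === undefined) return resolve(sdk);

  let perSdk = cache.get(sdk);
  if (!perSdk) {
    perSdk = new Map<number, Resolved>();
    cache.set(sdk, perSdk);
  }
  const hit = perSdk.get(key);
  if (hit) return hit;

  const resolved = resolve(sdk);
  perSdk.clear(); // keep a single slot per SDK, mirroring the original cache
  perSdk.set(key, resolved);
  return resolved;
}

// Called on a 401 so the next stream re-resolves the credential.
function invalidate(sdk: Sdk): void {
  cache.delete(sdk);
}
```

Because the outer map is a WeakMap, an SDK instance that is no longer referenced does not keep its cached credential alive. Whether ISdk identity is the right tenant boundary in this codebase is an open question the review itself raises.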
// Persist the resolved catalog entry on the durable record so preflight,
// provider streaming, and turn-end compaction read it instead of re-fetching.
if (outcome.model_meta) rec.model_meta = outcome.model_meta;
transitionTo(rec, 'assistant_streaming');
Clear stale model_meta when current turn resolution is null.
Line 64 only updates rec.model_meta on a truthy value, so an existing value can leak into the next turn when resolution misses. Always overwrite/clear it to keep per-turn model metadata accurate.
Proposed fix
- if (outcome.model_meta) rec.model_meta = outcome.model_meta;
+ if (outcome.model_meta) rec.model_meta = outcome.model_meta;
+ else delete rec.model_meta;
export async function endTurnForMaxTurns(
  ports: MaxTurnsEndPorts,
  rec: TurnStateRecord,
): Promise<void> {
  const msg = syntheticAssistant({
    stop_reason: 'end',
    text: `loop stopped: max_turns (${rec.max_turns ?? 0}) reached`,
  });
  rec.last_assistant = msg;
  await ports.appendMessages(rec.session_id, [msg]);
  await ports.emit(rec.session_id, {
    type: 'message_complete',
    message: msg,
    body_streamed: false,
  });
  await emitTurnEndOnce(ports, rec, msg);
  await ports.finishSession(rec);
}
Preserve function_results and add re-entry guard in endTurnForMaxTurns.
Line 73 emits turn_end without passing function_results, so it always falls back to [] and can drop trailing tool results at the max-turns stop. Also, Lines 67-72 execute before any idempotency guard in this helper, so re-entry can duplicate the synthetic assistant append/message_complete side effects.
💡 Proposed fix
 export async function endTurnForMaxTurns(
   ports: MaxTurnsEndPorts,
   rec: TurnStateRecord,
 ): Promise<void> {
+  if (rec.turn_end_emitted) {
+    await ports.finishSession(rec);
+    return;
+  }
+
   const msg = syntheticAssistant({
     stop_reason: 'end',
     text: `loop stopped: max_turns (${rec.max_turns ?? 0}) reached`,
   });
   rec.last_assistant = msg;
   await ports.appendMessages(rec.session_id, [msg]);
   await ports.emit(rec.session_id, {
     type: 'message_complete',
     message: msg,
     body_streamed: false,
   });
-  await emitTurnEndOnce(ports, rec, msg);
+  await emitTurnEndOnce(ports, rec, msg, rec.function_results);
   await ports.finishSession(rec);
 }
appendMessages fired one session-tree::append trigger per message, and each append re-read the whole session (state::list) to resolve the active leaf and refreshed session meta — so persisting N messages cost N triggers, N full-session reads, and N meta writes. Add session-tree::append_batch: resolve the active leaf once, chain the messages in memory (each entry's parent is the previous), and write them through a new SessionStore.appendMany that refreshes meta once. The orchestrator's appendMessages now fires a single batch trigger. Behavior is identical to the serial loop (same parent chain, same order); batch entries get strictly increasing timestamps so the last entry stays the resolvable leaf for the next append. The per-message session-tree::append stays registered for other callers (compaction, tests).
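The in-memory chaining described above (resolve the leaf once, then make each entry's parent the previous entry) might look roughly like this. It is a sketch under assumptions: `Entry`, `chainBatch`, and the `newId` callback are hypothetical names, and the real entries carry more fields (timestamps, message payloads) than shown here.

```typescript
// Hypothetical entry shape; the real session-tree entry has more fields.
interface Entry {
  id: string;
  parentId: string | null;
  message: string;
}

// Chain a batch off a leaf that was resolved once up front: the first entry's
// parent is the current leaf, and every later entry's parent is the one before it.
function chainBatch(
  leafId: string | null,
  messages: string[],
  newId: () => string,
): Entry[] {
  const entries: Entry[] = [];
  let parent = leafId;
  for (const message of messages) {
    const id = newId();
    entries.push({ id, parentId: parent, message });
    parent = id;
  }
  return entries;
}
```

The result has the same parent chain and order as N serial appends would produce, which is what lets the batch be written in one store call with a single meta refresh.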
…, crash-safe result dedup

Addresses review findings on the turn-flow compaction PR.

- session-tree leaf resolution: activePath now resolves the active leaf as the tip of the parent chain (the entry that is no one's parent) instead of the raw (timestamp, id) sort-max. A batch append shares one timestamp and a later append whose clock ties or steps back no longer sorts before the batch tail and gets orphaned off the active path. Drops the future-dated `base + i` timestamps the batch used to keep itself sort-last.
- model_meta wire validation is now lenient: a sparse/partial catalog entry passes through (and the provider reads its fields defensively / falls back to a live fetch) instead of failing ZodError and killing every stream for that model. The field is an optimization, never a source of truth. Removes the hand-synced strict ModelSchema mirror and its compile-time guard.
- trailing-result dedup skips a trailing no-call assistant (the synthetic max_turns "loop stopped" notice) instead of treating it as the turn boundary, so a crash-replay of finalizeBatch no longer re-appends the whole result run behind the notice.
- session-tree::append_batch rejects falsy/non-object elements at the boundary, matching the singular append guard.

Cleanups folded in: fetchModelLimit reuses limitFromModel (one mapping); ModelLimit aliases the single ModelContextLimit shape; loadContextView and loadTrailingResultIds share one loadSessionMessages reader (with a consistent timeout); IiiStateSessionStore append/appendMany/updateEntry share writeEntry + bestEffortMetaRefresh helpers.
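The "tip of the parent chain" rule from the first bullet can be illustrated with a small sketch. `Entry` and `findTips` are hypothetical names, and the sketch deliberately ignores timestamps; it also assumes nothing about how the real activePath picks among several tips when the tree has branches.

```typescript
// Hypothetical minimal entry: only the fields the tip rule needs.
interface Entry {
  id: string;
  parentId: string | null;
}

// A tip is an entry that no other entry names as its parent.
// Timestamps play no part, so clock ties or a clock stepping back cannot
// reorder the result.
function findTips(entries: Entry[]): Entry[] {
  const referenced = new Set<string>();
  for (const e of entries) {
    if (e.parentId !== null) referenced.add(e.parentId);
  }
  return entries.filter((e) => !referenced.has(e.id));
}
```

On a single unbranched chain this returns exactly one entry, the active leaf, regardless of the order the entries are listed in. A branched tree yields one tip per branch, so a real implementation needs a further selection step that is not shown here.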
run::start unconditionally reset the session's turn_state record to a fresh `provisioning` and appended the new message. A second run::start for a session with a turn already running (second tab, TUI/ACP client, or a double-submit) therefore raced the in-flight step's last-write-wins saveRecord, corrupting both turns and pairing the new run_request with a stale record. Guard it: read the committed record first and, if the turn is still in flight (any non-terminal state, including a function_awaiting_approval park), ignore the kickoff without mutating anything and report `started: false`. Terminal turns (stopped/failed) and fresh sessions start normally. harness::trigger maps a not-started result to a 409 so clients can wait/retry instead of treating the ignored call as a started turn.
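The guard described above reduces to two small decisions: is the committed record still in flight, and how does a not-started result map to an HTTP status. The sketch below assumes the state names mentioned in this PR; `decideStart` and `toHttpStatus` are hypothetical helpers, and the 200 for a started turn is an assumption (the description only specifies the 409).

```typescript
type StartResult =
  | { session_id: string; started: true }
  | { session_id: string; started: false; reason: "session_busy" };

interface TurnRecord {
  state: string;
}

// Terminal states per the description above; everything else counts as in flight,
// including a function_awaiting_approval park.
const TERMINAL_STATES = new Set<string>(["stopped", "failed"]);

function isTurnInFlight(rec: TurnRecord): boolean {
  return !TERMINAL_STATES.has(rec.state);
}

// Decide without mutating anything: a busy session is reported, not reset.
function decideStart(sessionId: string, existing: TurnRecord | null): StartResult {
  if (existing && isTurnInFlight(existing)) {
    return { session_id: sessionId, started: false, reason: "session_busy" };
  }
  return { session_id: sessionId, started: true };
}

// Assumption: 200 on success. The 409 for an ignored kickoff is from the description.
function toHttpStatus(result: StartResult): number {
  return result.started ? 200 : 409;
}
```

Note that this decision alone is a read-then-act check; it does not by itself make two truly concurrent kickoffs safe.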
Actionable comments posted: 1
🧹 Nitpick comments (1)
harness/tests/turn-orchestrator/run-start.test.ts (1)
187-193: ⚡ Quick win
Constrain the state::get stub to turn_state/sess-1 lookups.
Returning record for every state::get can hide scope/key regressions in execute.
Suggested test stub hardening
 function fakeIiiWithRecord(record: unknown): { iii: ISdk; calls: TriggerCall[] } {
   const calls: TriggerCall[] = [];
   const iii = {
     trigger: async <T, R>(req: { function_id: string; payload: T }): Promise<R> => {
       calls.push({ function_id: req.function_id, payload: req.payload });
-      if (req.function_id === 'state::get') return record as R;
+      if (req.function_id === 'state::get') {
+        const p = req.payload as { scope?: string; key?: string };
+        if (p.scope === 'turn_state' && p.key === 'sess-1') return record as R;
+        return null as R;
+      }
       return null as R;
     },
     registerFunction: vi.fn(),
   } as unknown as ISdk;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@harness/tests/turn-orchestrator/run-start.test.ts` around lines 187 - 193, The test stub fakeIiiWithRecord currently returns record for every state::get call; change the trigger implementation so it only returns record when the request is a state::get for the specific key used in the test (e.g., the turn_state lookup "turn_state/sess-1") and otherwise returns null; update the conditional in fakeIiiWithRecord.trigger (the function handling req.function_id === 'state::get') to inspect req.payload (or the key inside) and compare to "turn_state/sess-1" before returning record, leaving all other behavior (pushing to calls and returning null) unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@harness/src/turn-orchestrator/run-start.ts`:
- Around line 29-36: The current run::start path does a non-atomic
loadRecord(session_id) + isTurnInFlight(...) then unconditionally calls
saveRecord/newRecord which allows races where two requests both transition to
provisioning; make the guard atomic by either (A) performing a compare-and-set
style write using the underlying scopedSet/persistRecord primitives so the save
requires the previously-loaded state (old_value) to match before persisting the
new record, or (B) acquire a per-session lease/lock around the check-and-write
(e.g., lockManager.acquire(session_id) / release) so only one caller can perform
loadRecord -> isTurnInFlight -> saveRecord; update the run::start flow to check
the CAS result or lock acquisition and return {started:false,
reason:'session_busy'} when the CAS fails or the lock is not acquired. Ensure
you modify the code paths that call loadRecord, isTurnInFlight,
saveRecord/persistRecord/scopedSet and newRecord so they use the chosen atomic
mechanism.
📒 Files selected for processing (18)
- harness/src/context-compaction/model-resolver.ts
- harness/src/context-compaction/overflow.ts
- harness/src/harness/trigger.ts
- harness/src/session/tree/operations.ts
- harness/src/session/tree/register.ts
- harness/src/session/tree/store.ts
- harness/src/turn-orchestrator/run-start.ts
- harness/src/turn-orchestrator/schemas.ts
- harness/src/turn-orchestrator/state-runtime/context-view.ts
- harness/src/turn-orchestrator/state-runtime/store.ts
- harness/src/turn-orchestrator/state-runtime/transcript.ts
- harness/src/turn-orchestrator/state.ts
- harness/src/types/provider.ts
- harness/tests/harness/trigger.test.ts
- harness/tests/session/operations.test.ts
- harness/tests/turn-orchestrator/function-execute.test.ts
- harness/tests/turn-orchestrator/run-start.test.ts
- harness/tests/types/provider.test.ts
🚧 Files skipped from review as they are similar to previous changes (7)
- harness/src/types/provider.ts
- harness/src/context-compaction/model-resolver.ts
- harness/src/session/tree/register.ts
- harness/src/turn-orchestrator/state.ts
- harness/src/session/tree/operations.ts
- harness/tests/session/operations.test.ts
- harness/tests/turn-orchestrator/function-execute.test.ts
const existing = await store.loadRecord(session_id);
if (existing && isTurnInFlight(existing)) {
  logger.warn('run::start ignored: session already has a turn in flight', {
    session_id,
    state: existing.state,
  });
  return { session_id, started: false, reason: 'session_busy' };
}
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "Inspect run-start busy check call flow"
rg -n -C4 'loadRecord|isTurnInFlight|saveRecord|newRecord' harness/src/turn-orchestrator/run-start.ts
echo
echo "Inspect store persistence path for conditional-write semantics"
rg -n -C6 'saveRecord|persistRecord|state::set|previous|version|etag|if_match|compare' harness/src/turn-orchestrator/state-runtime/store.ts
echo
echo "Inspect turn_state helpers for any optimistic concurrency contract"
rg -n -C4 'turn_state|persistRecord|shouldWakeStep|parseTurnStateRecord' harness/src/turn-orchestrator/state-runtime/store.ts harness/src/turn-orchestrator/state.ts
Repository: iii-hq/workers
Length of output: 13987
Make the busy-path protection atomic for concurrent run::start calls
In harness/src/turn-orchestrator/run-start.ts (lines 29-36), the “session already has a turn in flight” guard is a non-atomic read-then-act: it does loadRecord(session_id) + isTurnInFlight(existing), then—after the check—always persists the new turn via store.saveRecord(newRecord(...)). saveRecord/persistRecord use scopedSet (i.e., state::set) without any CAS/version/“previous state must match” precondition (it only reads old_value from the set result for subsequent logic), so two concurrent run::start requests can both pass the guard and both write provisioning. Use a per-session lease/lock or a compare-and-set style write precondition so only one request can transition the turn from non-in-flight into an in-flight state.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@harness/src/turn-orchestrator/run-start.ts` around lines 29 - 36, The current
run::start path does a non-atomic loadRecord(session_id) + isTurnInFlight(...)
then unconditionally calls saveRecord/newRecord which allows races where two
requests both transition to provisioning; make the guard atomic by either (A)
performing a compare-and-set style write using the underlying
scopedSet/persistRecord primitives so the save requires the previously-loaded
state (old_value) to match before persisting the new record, or (B) acquire a
per-session lease/lock around the check-and-write (e.g.,
lockManager.acquire(session_id) / release) so only one caller can perform
loadRecord -> isTurnInFlight -> saveRecord; update the run::start flow to check
the CAS result or lock acquisition and return {started:false,
reason:'session_busy'} when the CAS fails or the lock is not acquired. Ensure
you modify the code paths that call loadRecord, isTurnInFlight,
saveRecord/persistRecord/scopedSet and newRecord so they use the chosen atomic
mechanism.
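A minimal sketch of option (A), assuming the state store could expose a versioned compare-and-set. `VersionedStore`, `compareAndSet`, and `tryStartTurn` are hypothetical names for illustration only; nothing in this thread shows that `scopedSet` / `state::set` offers such a precondition today.

```typescript
// Hypothetical sketch: none of these names exist in the harness.
type TurnState = 'provisioning' | 'assistant_streaming' | 'finishing' | 'stopped';
interface TurnRecord { session_id: string; state: TurnState }
interface Versioned { version: number; record: TurnRecord }
type StartResult = { session_id: string; started: boolean; reason?: 'session_busy' };

class VersionedStore {
  private rows = new Map<string, Versioned>();

  get(sessionId: string): Versioned | undefined {
    return this.rows.get(sessionId);
  }

  // Writes only if the stored version still equals expectedVersion
  // (0 means "no record yet"). Returns false when another writer won.
  compareAndSet(sessionId: string, expectedVersion: number, record: TurnRecord): boolean {
    const current = this.rows.get(sessionId)?.version ?? 0;
    if (current !== expectedVersion) return false;
    this.rows.set(sessionId, { version: current + 1, record });
    return true;
  }
}

const isTurnInFlight = (r: TurnRecord): boolean => r.state !== 'stopped';

// `seen` is the caller's (possibly stale) read; it defaults to a fresh read.
function tryStartTurn(
  store: VersionedStore,
  sessionId: string,
  seen: Versioned | undefined = store.get(sessionId),
): StartResult {
  if (seen && isTurnInFlight(seen.record)) {
    return { session_id: sessionId, started: false, reason: 'session_busy' };
  }
  // The write carries the version that the guard observed, so a concurrent
  // kickoff that already wrote `provisioning` makes this one lose.
  const won = store.compareAndSet(sessionId, seen?.version ?? 0, {
    session_id: sessionId,
    state: 'provisioning',
  });
  return won
    ? { session_id: sessionId, started: true }
    : { session_id: sessionId, started: false, reason: 'session_busy' };
}
```

Two callers that both read "no record" before either writes will both pass the guard, but only the first `compareAndSet` succeeds; the second is reported as `session_busy` rather than overwriting the live turn.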
…erminal) Inlining the steering hop moved the terminal agent_end / finishSession into the same step as the LLM stream and the result append, before runTransition's saveRecord. A crash in that window replayed the whole step — re-running the stream and re-appending — after consumers had already seen the run end. Restore the durable boundary with an outbox: terminating turns persist their work + turn_end inline, then transition to a new `finishing` state instead of emitting agent_end. The turn::finishing step emits agent_end and advances to stopped from a clean replayable boundary. A crash before the work is saved now re-runs it without any premature run-end reaching consumers; a crash in the finishing step just re-emits agent_end (idempotent-tolerated). Unlike the per-round-trip steering hop this fires once per turn, so the continue-path inlining (the perf win) is unchanged. `finishing` is non-terminal, so the run::start in-flight guard treats a finishing turn as busy.
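The outbox boundary described in this commit can be sketched roughly as below. This is an illustration under assumptions, not the code in `finishing.ts`: `Rec`, `Deps`, `endStreamingStep`, and `finishingStep` are hypothetical names, and the real steps carry far more state.

```typescript
// Hypothetical sketch of the `finishing` outbox; names are illustrative.
type State = 'assistant_streaming' | 'finishing' | 'stopped';
interface Rec { session_id: string; state: State }

interface Deps {
  save(rec: Rec): void;
  emit(event: 'turn_end' | 'agent_end', sessionId: string): void;
}

// Terminating work step: emits turn_end and persists the record in
// `finishing`, but does NOT emit agent_end itself.
function endStreamingStep(rec: Rec, deps: Deps): Rec {
  deps.emit('turn_end', rec.session_id);
  const next: Rec = { ...rec, state: 'finishing' };
  deps.save(next);
  return next;
}

// Separate replayable step: the only place agent_end is emitted.
// A crash before save() replays this step alone, so the worst case is a
// duplicate agent_end, never a second LLM stream.
function finishingStep(rec: Rec, deps: Deps): Rec {
  if (rec.state !== 'finishing') return rec;
  deps.emit('agent_end', rec.session_id);
  const next: Rec = { ...rec, state: 'stopped' };
  deps.save(next);
  return next;
}
```

The design choice is that the expensive, non-idempotent work (stream plus append) and the run-end signal never share a step, so replaying either one cannot repeat the other.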
…er, retried wakes
Three holes in the in-flight guard:
- The guard read used the tolerant state wrapper, so a transient state::get
failure looked like "no record" and the guard failed OPEN, clobbering a live
turn. The check now uses a strict read (loadRecordStrict) that throws, so a
read blip fails the kickoff with a retryable error instead.
- A record wedged in a non-terminal state (lost wake, DLQ'd step) made the
session busy forever — the old clobber was the accidental recovery path and
nothing replaced it. run::start now deliberately takes over an in-flight
record idle past STALE_TURN_TAKEOVER_MS (30m; updated_at_ms refreshes every
transition). function_awaiting_approval is exempt: it parks on the user by
design, and run::abort is its escape.
- enqueueTurnStep swallowed enqueue failures with a warn, silently stranding
the just-saved state. The wake is now retried with backoff; final failure
logs at error level and the stale takeover is the recovery valve.
Also: harness::trigger maps only an EXPLICIT started:false to 409 — a legacy
run::start registration that returns {session_id} without the field is a
success, not a conflict. And loadSessionMessages restores the 30s read budget
the path always ran under; the 10s introduced with the shared reader could
fail whole turns on long sessions.
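As a rough illustration of the takeover rule above, the guard's decision can be written as a pure function. `decideKickoff` and `GuardDecision` are hypothetical names, the state list is assembled from states mentioned in this thread, and treating "idle past" as a strict greater-than comparison is an assumption.

```typescript
// Hypothetical sketch; only STALE_TURN_TAKEOVER_MS and the state names
// come from the commit message.
const STALE_TURN_TAKEOVER_MS = 30 * 60 * 1000; // 30m

type State =
  | 'provisioning'
  | 'assistant_streaming'
  | 'function_execute'
  | 'function_awaiting_approval'
  | 'finishing'
  | 'stopped';

interface Rec { state: State; updated_at_ms: number }

type GuardDecision = 'start' | 'busy' | 'takeover';

function decideKickoff(existing: Rec | null, nowMs: number): GuardDecision {
  // No record, or a terminal one: nothing is in flight.
  if (!existing || existing.state === 'stopped') return 'start';
  // Approval-parked turns wait on the user by design; run::abort is the escape.
  if (existing.state === 'function_awaiting_approval') return 'busy';
  // updated_at_ms refreshes on every transition, so a large gap means wedged.
  return nowMs - existing.updated_at_ms > STALE_TURN_TAKEOVER_MS ? 'takeover' : 'busy';
}
```

The strict read matters before this function is ever reached: the caller is expected to let a failed `state::get` throw rather than pass `null`, otherwise a read blip would look like `'start'`.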
The run::start busy guard means an in-flight session rejects new messages, so users need a deliberate way to end a turn: a runaway agent loop, an approval prompt they walked away from, or a turn they changed their mind about. Previously the only "interrupt" was the accidental clobber the guard removed. run::abort loads the record (strict read — an abort must not silently no-op on a read blip), and for any in-flight non-finishing turn: surfaces a synthetic aborted assistant (message_complete), emits turn_end, clears parked approvals from the record, and routes to the finishing step for agent_end + stopped. No-op when nothing is running or the turn is already finishing. Best-effort against an actively-executing step (record writes are last-write-wins); the abort lands reliably in the gaps between steps, so a retry interrupts even a busy loop. Parked approval turns abort cleanly — no step holds them. Also wires the 'aborted' approval decision end-to-end: approval::resolve now accepts it on the wire (the orchestrator's read side always handled it but no writer existed), and an aborted call's synthetic result terminates the turn instead of resuming the loop for another round-trip.
The chat backend discarded the harness::trigger result, so a refused run::start (started:false — the session already has a turn in flight) was invisible: the user's bubble rendered, the message was never appended, and the stream generator waited on session events that might never arrive (e.g. a turn parked on an approval) — a silent drop with a stuck spinner. realStream now reads the kickoff envelope: on started:false it yields a stop-reason notice telling the user the message was not sent and to stop the running turn or answer its pending approval, then ends the stream cleanly. The Stop button previously only aborted the client-side generator — the server turn kept running and kept the session busy. It now also calls the new run::abort (best-effort server cancel) so stopping actually frees the session for the next message.
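A small sketch of how the kickoff envelope might be interpreted on both sides. Only `session_id`, `started`, and `reason` appear in this thread; `KickoffEnvelope`, `kickoffHttpStatus`, `refusedKickoffEvents`, and the `StreamEvent` shape are hypothetical, and the notice text is a placeholder.

```typescript
// Hypothetical shapes for illustration.
interface KickoffEnvelope {
  session_id: string;
  started?: boolean;
  reason?: string;
}

// Server side: only an explicit `started: false` is a conflict. A legacy
// envelope that omits the field is treated as success.
function kickoffHttpStatus(envelope: KickoffEnvelope): 200 | 409 {
  return envelope.started === false ? 409 : 200;
}

type StreamEvent = { type: 'notice'; text: string } | { type: 'end' };

// Client side: a refused kickoff becomes a visible notice followed by a clean
// end, instead of waiting on session events that may never arrive.
function refusedKickoffEvents(envelope: KickoffEnvelope): StreamEvent[] {
  if (envelope.started !== false) return [];
  return [
    {
      type: 'notice',
      text: 'Message not sent. Stop the running turn or answer its pending approval.',
    },
    { type: 'end' },
  ];
}
```

Comparing against `false` rather than testing truthiness is what keeps legacy `{ session_id }` responses on the success path.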
Actionable comments posted: 2
🧹 Nitpick comments (2)
harness/src/turn-orchestrator/state-runtime/store.ts (1)
36-68: ⚡ Quick win
Well-hardened wake retry logic.
The retry loop correctly implements incremental backoff and fail-safe behavior. After exhausting retries, the error log provides good debugging context (session_id, state, attempts, err). The stale-takeover recovery mechanism (referenced in the comment and PR objectives) provides a fallback for stranded turns.
Consider adding a metric or alert for `wakeStep failed after retries` errors, since they indicate turns stranded until the 30-minute takeover window. Tracking the frequency and session impact would help tune retry parameters or detect systemic trigger failures.
harness/src/turn-orchestrator/run-abort.ts (1)
52-52: ⚡ Quick win
Redundant type assertion on line 52.
The variable `rec` is already typed as `TurnStateRecord` from the `loadRecordStrict` call on line 34. The cast `(rec as TurnStateRecord)` is unnecessary since `work` is an optional field on that type.
♻️ Simplify by removing the cast
- (rec as TurnStateRecord).work = undefined;
+ rec.work = undefined;
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@harness/src/turn-orchestrator/run-abort.ts`:
- Line 1: This file failed CI due to formatting differences; run the project's
formatter (e.g. npm run format or yarn format / prettier --write .) against the
repository, reformat the run-abort source (the file that begins with /**),
review the changed formatting, stage and commit the formatted file, and push the
commit so the CI static analysis passes.
In `@harness/tests/turn-orchestrator/run-abort.test.ts`:
- Line 1: The file's formatting doesn't match the project's expectations; run
the project's formatter (e.g., the configured Prettier/format script) on the
test file containing the vitest import line "import { describe, expect, it, vi }
from 'vitest';" so the file's whitespace/line endings and overall style match CI
rules; re-run the formatter across the repo or at least on this file and commit
the reformatted file.
---
Nitpick comments:
In `@harness/src/turn-orchestrator/run-abort.ts`:
- Line 52: Remove the unnecessary type assertion when clearing the work field:
`rec` is already a TurnStateRecord returned by `loadRecordStrict`, so replace
the cast expression `(rec as TurnStateRecord).work = undefined;` with a direct
assignment `rec.work = undefined;` (keep the same variable `rec` and the `work`
property so type-checking and intent remain unchanged).
In `@harness/src/turn-orchestrator/state-runtime/store.ts`:
- Around line 36-68: Add a metric/alert emission when the wake retry finally
fails inside enqueueTurnStep: after the final catch branch that currently calls
logger.error('wakeStep failed after retries; turn is stranded until stale
takeover', {...}), also increment or emit a metric/telemetry event (e.g.,
metrics.increment or meter.counter) with a unique name like "turn.wake_failed"
and include tags/labels for session_id, state and attempts so you can monitor
frequency and build alerts; reference the function enqueueTurnStep and the
existing constants WAKE_ENQUEUE_ATTEMPTS/WAKE_ENQUEUE_BACKOFF_MS and ensure the
metric call is colocated with the existing logger.error so failures are both
logged and counted/alertable.
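A hedged sketch of what the suggestion could look like: a retry loop with incremental backoff that both logs and counts the final failure. The constant values, `WakeDeps`, `enqueueWithRetry`, and `countFailure` are assumptions for illustration; the real `enqueueTurnStep` and its constants are not shown in this thread.

```typescript
// Hypothetical values; the actual constants in store.ts are not shown here.
const WAKE_ENQUEUE_ATTEMPTS = 3;
const WAKE_ENQUEUE_BACKOFF_MS = 200;

interface WakeDeps {
  wake(): Promise<void>;
  sleep(ms: number): Promise<void>;
  logError(msg: string, ctx: Record<string, unknown>): void;
  countFailure(name: string, tags: Record<string, string>): void;
}

// Returns true once the wake is enqueued, false after the final failure.
async function enqueueWithRetry(
  sessionId: string,
  state: string,
  deps: WakeDeps,
): Promise<boolean> {
  for (let attempt = 1; attempt <= WAKE_ENQUEUE_ATTEMPTS; attempt++) {
    try {
      await deps.wake();
      return true;
    } catch (err) {
      if (attempt < WAKE_ENQUEUE_ATTEMPTS) {
        // Incremental backoff: 1x, 2x, ... the base delay.
        await deps.sleep(WAKE_ENQUEUE_BACKOFF_MS * attempt);
        continue;
      }
      // Log and count together so the failure is both debuggable and alertable.
      deps.logError('wakeStep failed after retries; turn is stranded until stale takeover', {
        session_id: sessionId,
        state,
        attempts: attempt,
        err,
      });
      deps.countFailure('turn.wake_failed', { state });
    }
  }
  return false;
}
```

This sketch tags the counter with `state` only and leaves `session_id` in the log line; per-session labels tend to be high-cardinality in most metrics backends, so that trade-off may be worth weighing against the reviewer's suggestion to tag all three.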
📒 Files selected for processing (30)
- console/web/src/components/chat/ChatView.tsx
- console/web/src/lib/backend/real.ts
- console/web/src/lib/backend/types.ts
- harness/src/approval-gate/schemas.ts
- harness/src/harness/trigger.ts
- harness/src/turn-orchestrator/assistant-streaming/run.ts
- harness/src/turn-orchestrator/finishing.ts
- harness/src/turn-orchestrator/function-awaiting-approval/run.ts
- harness/src/turn-orchestrator/function-execute/run.ts
- harness/src/turn-orchestrator/register.ts
- harness/src/turn-orchestrator/run-abort.ts
- harness/src/turn-orchestrator/run-start.ts
- harness/src/turn-orchestrator/schemas.ts
- harness/src/turn-orchestrator/state-runtime/context-view.ts
- harness/src/turn-orchestrator/state-runtime/store.ts
- harness/src/turn-orchestrator/state-runtime/turn-end.ts
- harness/src/turn-orchestrator/state.ts
- harness/src/turn-orchestrator/steering-check/run.ts
- harness/tests/approval-gate/schemas.test.ts
- harness/tests/turn-orchestrator/_helpers/mockTurnStore.ts
- harness/tests/turn-orchestrator/assistant-streaming.test.ts
- harness/tests/turn-orchestrator/assistant.test.ts
- harness/tests/turn-orchestrator/finishing.test.ts
- harness/tests/turn-orchestrator/function-awaiting-approval.test.ts
- harness/tests/turn-orchestrator/function-execute.test.ts
- harness/tests/turn-orchestrator/functions.test.ts
- harness/tests/turn-orchestrator/run-abort.test.ts
- harness/tests/turn-orchestrator/run-start.test.ts
- harness/tests/turn-orchestrator/steering-check-layer.test.ts
- harness/tests/turn-orchestrator/steering.test.ts
✅ Files skipped from review due to trivial changes (1)
- harness/src/turn-orchestrator/function-awaiting-approval/run.ts
🚧 Files skipped from review as they are similar to previous changes (8)
- harness/tests/turn-orchestrator/_helpers/mockTurnStore.ts
- harness/src/turn-orchestrator/run-start.ts
- harness/src/turn-orchestrator/steering-check/run.ts
- harness/src/turn-orchestrator/function-execute/run.ts
- harness/tests/turn-orchestrator/run-start.test.ts
- harness/src/turn-orchestrator/state-runtime/turn-end.ts
- harness/src/turn-orchestrator/assistant-streaming/run.ts
- harness/tests/turn-orchestrator/functions.test.ts
…rovals don't wedge
A turn with parallel tool calls that each need approval got stuck: the user clicked approve and the prompt hung on "approving…" forever, while an already-run call was mislabeled as the pending one.
Root cause was the fcall reducer matching function_execution_end to "the most recently started fcall" (a single cursor) instead of by function_call_id. Approval-resolved calls execute with skipStart (no function_execution_start), so the resolved call's end landed on whichever pending card was created last, clearing the WRONG card and leaving the real one stuck pending. The actually-awaiting call then had no live prompt, so the turn never resumed. (The backend resume itself is correct: writing the missing decision via approval::resolve advances the turn immediately.)
- translate now threads function_call_id onto fcall-end (it was already on the wire event, just dropped), and emits a new fcall-approval-cleared event for calls that leave awaiting_approval so their prompt clears even when no execution follows (aborted / resolved out-of-band).
- ChatView matches fcall-end to its card by function_call_id (cursor fallback only when no id is carried, e.g. the mock backend) and handles fcall-approval-cleared.
This is the bug behind the stuck approval; the run::start guard added earlier made it unrecoverable-by-new-message, and the Stop→run::abort wiring is the safety net if a session ever does wedge.
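The id-based matching described above can be sketched as a small pure reducer step. `FcallCard`, `FcallEnd`, and `applyFcallEnd` are hypothetical names and shapes; the actual ChatView reducer is not shown in this thread.

```typescript
// Hypothetical shapes for illustration.
interface FcallCard {
  function_call_id: string;
  status: 'pending_approval' | 'running' | 'done';
}

interface FcallEnd {
  type: 'fcall-end';
  function_call_id?: string; // absent on backends that do not carry it
}

// `cursor` is the index of the most recently started card. It is used only
// when the event carries no id at all.
function applyFcallEnd(cards: FcallCard[], ev: FcallEnd, cursor: number): FcallCard[] {
  const idx =
    ev.function_call_id === undefined
      ? cursor
      : cards.findIndex((c) => c.function_call_id === ev.function_call_id);
  // An id that matches no card is ignored rather than applied to the cursor.
  if (idx < 0 || idx >= cards.length) return cards;
  return cards.map((c, i): FcallCard => (i === idx ? { ...c, status: 'done' } : c));
}
```

With two cards pending approval and the cursor on the second, an end event for the first card now completes the first card and leaves the second card's prompt live, which is the case the single-cursor version got wrong.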
Remove standalone and inline // comments across the turn-orchestrator, session-tree, provider, compaction, and console chat paths. Lint directives (biome-ignore) are kept.
Routing was inlined into assistant_streaming/function_execute in the previous release; the steering_check step existed only to drain records persisted in that state at deploy time. Remove the steering-check module, its registration, the 'steering_check' TurnState, the SteeringCheckTurnRecord type/schema/parser, and the steering tests. A record still persisted in steering_check no longer parses and is treated by run::start as absent; the queue must be confirmed drained before this ships.
Summary
The turn loop re-fetched constant data and re-wrote session state on every round-trip. Tracing one console turn showed `models::get` running three times per iteration, `harness::provider::resolve` once per stream, `finalizeBatch` rebuilding the full compaction window for a trailing-ids read, a `turn::steering_check` FSM step gating every continue/end decision, and `session-tree::append` firing once per message with a full-session leaf re-read each time.
This collapses that redundancy, then hardens the paths the change touched: crash-safe terminal signaling, an in-flight guard for `run::start` with a deliberate recovery story, and a user-facing abort.
Performance changes
- Full catalog `Model` resolved once at `provisioning`, persisted on the turn record, and threaded to preflight, the provider (`ProviderStreamInput.model_meta`, leniently validated: a sparse entry falls back to a live fetch and never fails the stream), and compaction (`turn_end.model_limit`). `models::get` drops from ~3/round-trip to ~1/turn.
- Provider credential resolution cached in the provider process, keyed by a per-run id on `ProviderStreamInput.resolution_key`, invalidated on 401.
- `finalizeBatch` reads only the raw trailing result run (`TurnStore.loadTrailingResultIds`), skipping the paired compactions read. Dedup walks past trailing no-call assistants, so a max_turns crash-replay can't re-append the batch.
- Steering routing inlined into `assistant_streaming`/`function_execute`; `turn::steering_check` stays registered as a one-release compat drain.
- `session-tree::append_batch` resolves the active leaf once and writes the chain with one meta refresh. Leaf resolution is now chain-tip-based (`resolveActiveLeaf`), so sort-order/timestamp ties can no longer orphan entries off the active path.
Correctness & UX changes
- `turn::finishing` step (agent_end outbox). Terminating turns persist their work + `turn_end`, then a separate replayable step emits `agent_end` and advances to `stopped`, so a crash can no longer replay a full LLM stream after consumers saw the run end.
- `run::start` in-flight guard. A second kickoff for a busy session no longer clobbers the live turn; it returns `started:false` (mapped to 409 by `harness::trigger`, only on an explicit `false` so legacy registrations stay 200). The guard reads strictly (a state blip fails closed, not open) and deliberately takes over records idle past 30 minutes, the recovery valve the old clobber provided by accident. Approval-parked turns are exempt from takeover.
- `run::abort` (user interrupt). Ends an in-flight turn: synthetic aborted assistant, `turn_end`, cleared parked approvals, then `finishing`. Best-effort against an actively-executing step; clean for parked approvals. The `aborted` approval decision is also wired end-to-end (`approval::resolve` accepts it; an aborted call terminates the turn instead of resuming the loop).
- The console Stop button now also calls `run::abort` so stopping actually frees the session.
Test plan
- `pnpm typecheck` clean, `pnpm test` 1334 passing (123 files), `biome check` no errors
- `pnpm typecheck` clean, `pnpm test` 524 passing, `biome check` clean
- `turn::steering_check` queue drains empty before the release-B removal
Known tradeoffs (documented, deliberate)
Follow-ups (not in this PR)
- Remove `turn::steering_check` state/schemas/registration once the compat-drain queue is empty
- `run::start`'s guard+seed in the session lease (atomic double-submit protection)
- `turn_end` into `turn::finishing` (closes the remaining pre-save emit window at zero cost)
- `stream_triggers` span on connection-delivery triggers (separate repo)
Summary by CodeRabbit
New Features
Performance Improvements
Reliability Enhancements
Bug Fixes