Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 22 additions & 24 deletions console/web/src/components/chat/ChatView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ export function ChatView({
const harnessBlockedRef = useRef(harnessBlocked)
harnessBlockedRef.current = harnessBlocked

// Matches iii.session.id on every span so the traces UI can group by it.
const sessionId = useMemo(
() => makeSessionId(conversation.id),
[conversation.id],
Expand Down Expand Up @@ -151,10 +150,6 @@ export function ChatView({
const handleAlwaysAllow = useMemo(() => {
const resolveFn = backend.resolveApproval
if (!resolveFn) return undefined
// "Approve always" is a per-session grant honored in every mode, so the
// button shows on every prompt (full mode never produces prompts, so
// it's moot there). Approves this call and stops asking for the same
// function for the rest of the conversation.
return async (sId: string, functionCallId: string, functionId: string) => {
await approvalSettings.approveAlways(functionId)
await resolveFn(sId, functionCallId, 'allow')
Expand Down Expand Up @@ -233,7 +228,6 @@ export function ChatView({
}
onCompactConversation(conversationId, marker)
} else {
// empty | busy | overflow | error: nothing rewritten server-side.
onPatchMessage(conversationId, pendingId, {
content: formatCompactResult(result),
tone: compactResultTone(result),
Expand Down Expand Up @@ -340,15 +334,31 @@ export function ChatView({
break
}
case 'fcall-end': {
if (!fcallId) break
onPatchMessage(conversationId, fcallId, {
const targetId: string | null = event.functionCallId
? ([...fcallMap.entries()].find(
([, fcid]) => fcid === event.functionCallId,
)?.[0] ?? null)
: fcallId
if (!targetId) break
onPatchMessage(conversationId, targetId, {
output: event.output,
durationMs: event.durationMs,
running: false,
pendingApproval: false,
})
fcallMap.delete(fcallId)
fcallId = null
fcallMap.delete(targetId)
if (targetId === fcallId) fcallId = null
break
}
case 'fcall-approval-cleared': {
const clearedId = [...fcallMap.entries()].find(
([, fcid]) => fcid === event.functionCallId,
)?.[0]
if (clearedId) {
onPatchMessage(conversationId, clearedId, {
pendingApproval: false,
})
}
break
}
case 'assistant-token': {
Expand Down Expand Up @@ -384,12 +394,6 @@ export function ChatView({
break
}
case 'compaction': {
// Server-side context-compaction finished rewriting the
// session's flat-state. Append a marker so:
// 1. The user sees that compaction happened.
// 2. estimateConversationTokens stops counting pre-marker
// messages → the CTX bar drops to the real value.
// We append (not replace) so the transcript stays scrollable.
const compactionContent =
event.mode === 'sync'
? `compacted ${event.tokensBefore.toLocaleString()} tokens before continuing`
Expand All @@ -409,10 +413,6 @@ export function ChatView({
break
}
case 'stop-reason': {
// The assistant turn ended abnormally — show the user why
// instead of leaving the response looking like it just
// ran out of words. Pre-fix this event didn't exist and
// the same condition produced a silently truncated reply.
const noticeContent = formatStopReason(
event.reason,
event.message,
Expand Down Expand Up @@ -443,7 +443,6 @@ export function ChatView({
}
}
} catch (err) {
// Backends signal semantic failures via fcall-end; thrown = abort or bug.
if (!isAbortError(err)) {
console.warn('[chat] stream errored', err)
}
Expand Down Expand Up @@ -477,10 +476,9 @@ export function ChatView({

const handleStop = useCallback(() => {
abortRef.current?.abort()
}, [])
void backend.abortRun?.(sessionId).catch(() => {})
}, [backend, sessionId])

// Covers the gap between submit / fcall-end and the next streamed token,
// where the assistant/thought shimmer hasn't yet rendered.
const isThinking =
isStreaming &&
(() => {
Expand Down
75 changes: 47 additions & 28 deletions console/web/src/lib/backend/real.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,6 @@ async function* realStream(
r?.()
}

// Subscribe directly to this session's `agent::events` stream via a scoped
// engine stream trigger (`group_id = sessionId`), replacing the harness
// fanout hop (`ui::subscribe` → per-browser `ui::session::event` push).
// Registered before the `harness::trigger` kickoff below — both travel the
// same ordered WS connection, so the trigger is in place before the turn's
// first event is written.
const stopSubscription = startSessionEventsSubscription(
client,
sessionId,
Expand Down Expand Up @@ -97,24 +91,34 @@ async function* realStream(
const { provider, model: modelId } = resolveRunParams(model)

let kickoffError: Error | null = null
let sessionBusy = false
client
.call('harness::trigger', {
session_id: sessionId,
message_id: messageId,
payload: {
.call<{ status_code?: number; body?: { started?: boolean } }>(
'harness::trigger',
{
session_id: sessionId,
message_id: messageId,
provider,
model: modelId,
mode,
messages: [
{
role: 'user',
content: [{ type: 'text', text: prompt }],
timestamp: Date.now(),
},
],
payload: {
session_id: sessionId,
message_id: messageId,
provider,
model: modelId,
mode,
messages: [
{
role: 'user',
content: [{ type: 'text', text: prompt }],
timestamp: Date.now(),
},
],
},
},
)
.then((res) => {
if (res?.body?.started === false) {
sessionBusy = true
wake()
}
})
.catch((err) => {
kickoffError = err instanceof Error ? err : new Error(String(err))
Expand All @@ -126,6 +130,16 @@ async function* realStream(

while (true) {
if (signal?.aborted) return
if (sessionBusy) {
yield {
kind: 'stop-reason',
reason: 'error',
message:
'A turn is still running in this session — your message was not sent. Stop the current turn (or answer its pending approval), then resend.',
}
yield { kind: 'assistant-end' }
return
}
if (kickoffError) {
const err = kickoffError as Error
yield {
Expand All @@ -135,13 +149,18 @@ async function* realStream(
yield { kind: 'assistant-end' }
return
}
while (queue.length === 0 && !kickoffError && !signal?.aborted) {
while (
queue.length === 0 &&
!kickoffError &&
!sessionBusy &&
!signal?.aborted
) {
await new Promise<void>((resolve) => {
resolveNext = resolve
})
}
if (signal?.aborted) return
if (kickoffError) continue
if (kickoffError || sessionBusy) continue
const event = queue.shift()
if (!event) continue
const streamEvents = translate(event, sessionId)
Expand Down Expand Up @@ -169,6 +188,11 @@ async function realResolveApproval(
})
}

/**
 * Requests a server-side abort of the given session's run by invoking the
 * `run::abort` function with that session's id.
 *
 * @param sessionId - id of the session whose run should be aborted.
 * @returns A promise that settles once the `run::abort` call has settled;
 *   a rejected client lookup or call propagates to the caller.
 */
async function realAbortRun(sessionId: string): Promise<void> {
  const iii = await getIiiClient()
  const abortParams = { session_id: sessionId }
  await iii.call('run::abort', abortParams)
}

async function realCompactSession(
sessionId: string,
model: ModelId,
Expand All @@ -177,9 +201,6 @@ async function realCompactSession(
const { provider, model: modelId } = resolveRunParams(model)
const client = await getIiiClient()
try {
// Passing limit.context lets the server skip the models::get lookup.
// We don't know max_output here; 4096 is the same conservative default
// the server falls back to when models::get returns nothing.
const DEFAULT_MAX_OUTPUT = 4_096
const modelPayload: {
id: string
Expand Down Expand Up @@ -209,10 +230,7 @@ async function realCompactSession(
if (resp?.status === 'ok') {
const tokensBefore =
typeof resp.tokens_before === 'number' ? resp.tokens_before : 0
// Surface zero-token "ok" as semantic empty.
if (tokensBefore === 0) return { status: 'empty' }
// Fallback placeholder for engines that predate summary_text on the
// wire; without it the marker has no <conversation-summary> to ship.
const summaryText =
typeof resp.summary_text === 'string' && resp.summary_text.length > 0
? resp.summary_text
Expand Down Expand Up @@ -249,5 +267,6 @@ export const realBackend: ChatBackend = {
id: 'real',
stream: realStream,
resolveApproval: realResolveApproval,
abortRun: realAbortRun,
compactSession: realCompactSession,
}
27 changes: 25 additions & 2 deletions console/web/src/lib/backend/translate.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ describe('createAgentEventTranslator — turn_state_changed', () => {
).toEqual([])
})

it('emits nothing when state leaves function_awaiting_approval (the orchestrator emits the matching function_execution_end)', () => {
it('clears the pending prompt when a call leaves function_awaiting_approval', () => {
const { translate } = createAgentEventTranslator()
translate(
{
Expand Down Expand Up @@ -265,7 +265,30 @@ describe('createAgentEventTranslator — turn_state_changed', () => {
},
'sess-a',
),
).toEqual([])
).toEqual([{ kind: 'fcall-approval-cleared', functionCallId: 'fc-1' }])
})

it('threads function_call_id onto fcall-end so the consumer can correlate', () => {
  // A fresh translator: no prior events, so nothing else can influence the output.
  const translator = createAgentEventTranslator()

  // The single stream event the translator is expected to produce, carrying
  // the originating call id alongside the output and duration.
  const expectedEvents = [
    {
      kind: 'fcall-end',
      output: { content: [], details: {} },
      durationMs: 30,
      functionCallId: 'fc-7',
    },
  ]

  const events = translator.translate(
    {
      type: 'function_execution_end',
      function_call_id: 'fc-7',
      function_id: 'shell::fs::ls',
      result: { content: [], details: {} },
      is_error: false,
      duration_ms: 30,
    },
    'sess-a',
  )

  expect(events).toEqual(expectedEvents)
})

it('partitions mirrors by sessionId so two chats do not interfere', () => {
Expand Down
12 changes: 10 additions & 2 deletions console/web/src/lib/backend/translate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,22 @@ export function createAgentEventTranslator(): {
const prev = mirrors.get(sessionId) ?? []
const next = pendingApprovalsFromTurnState(event.new_value)
mirrors.set(sessionId, next)
const { added } = diffPending(prev, next)
return added.map((entry) => ({
const { added, removed } = diffPending(prev, next)
const out: StreamEvent[] = added.map((entry) => ({
kind: 'fcall-start' as const,
functionId: entry.function_id,
input: entry.args,
pendingApproval: true,
functionCallId: entry.function_call_id,
sessionId,
}))
for (const entry of removed) {
out.push({
kind: 'fcall-approval-cleared',
functionCallId: entry.function_call_id,
})
}
return out
}

function translate(event: AgentEvent, sessionId?: string): StreamEvent[] {
Expand Down Expand Up @@ -90,6 +97,7 @@ export function createAgentEventTranslator(): {
? wrapErrorOutput(event.result)
: event.result,
durationMs: event.duration_ms,
functionCallId: event.function_call_id,
},
]

Expand Down
29 changes: 28 additions & 1 deletion console/web/src/lib/backend/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,28 @@ export type StreamEvent =
/** iii session_id owning this call — needed to resolve approval. */
sessionId?: string
}
| { kind: 'fcall-end'; output: unknown; durationMs: number }
| {
kind: 'fcall-end'
output: unknown
durationMs: number
/**
* iii function_call_id of the call that ended. Parallel tool calls (and
* approval-resolved calls, which emit no fcall-start) finish out of
* order, so the consumer MUST match the end to its card by this id, not
* by "the most recently started call".
*/
functionCallId?: string
}
| {
/**
* A call left the pending-approval set without executing (aborted, or
* resolved out-of-band): clear its card's approval prompt so it doesn't
* hang on "approving…". A resolved-and-executed call also emits
* `fcall-end`; both patching to non-pending is idempotent.
*/
kind: 'fcall-approval-cleared'
functionCallId: string
}
| { kind: 'assistant-token'; token: string }
| { kind: 'assistant-end' }
| {
Expand Down Expand Up @@ -114,6 +135,12 @@ export interface ChatBackend {
functionCallId: string,
decision: 'allow' | 'deny',
): Promise<void>
/**
* Server-side cancel of the session's in-flight turn (`run::abort`).
* The client-side AbortSignal only stops rendering; without this the
* server keeps running and `run::start` rejects new messages as busy.
*/
abortRun?(sessionId: string): Promise<void>
/**
* Powers `/compact`. Compacts the session-tree (the single source of
* truth) directly. `contextWindow` skips the server's `models::get`
Expand Down
1 change: 1 addition & 0 deletions console/web/src/stories/playground/EventLog.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ function toneFor(kind: StreamEvent['kind']): string {
return 'text-ink-ghost'
case 'fcall-start':
case 'fcall-end':
case 'fcall-approval-cleared':
return 'text-accent'
case 'assistant-token':
return 'text-ink-ghost'
Expand Down
8 changes: 1 addition & 7 deletions harness/src/approval-gate/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,7 @@ export type AlwaysAllowEntry = z.infer<typeof allowEntrySchema>;

/**
 * Zod schema for an approval-settings record: the permission mode, the two
 * allow-lists (each an array of `allowEntrySchema` entries) and the
 * `mode_set_at` stamp.
 */
export const ApprovalSettingsSchema = z.object({
// Permission mode, validated by PermissionModeSchema.
mode: PermissionModeSchema,
// Auto-mode allowlist: curated on the Configuration screen, consulted
// ONLY in auto mode.
always_allow: z.array(allowEntrySchema),
// Per-session "approve always" grants made from an approval prompt.
// Consulted in EVERY mode (it's a remembered human decision, not an
// auto-policy). `.default([])` keeps records written before this field
// existed parseable.
approved_always: z.array(allowEntrySchema).default([]),
// Non-negative integer.
// NOTE(review): presumably the time the mode was last set; units are not
// visible here — confirm against the writer.
mode_set_at: z.number().int().nonnegative(),
});
Expand All @@ -43,7 +37,7 @@ export const DEFAULT_APPROVAL_SETTINGS: ApprovalSettings = {
mode_set_at: 0,
};

const wireDecisionSchema = z.enum(['allow', 'deny']);
const wireDecisionSchema = z.enum(['allow', 'deny', 'aborted']);

const deniedBySchema = z.enum(['permissions', 'user', 'gate_unavailable']);

Expand Down
Loading
Loading