Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions libs/langgraph/src/lib/internals/stream-manager.bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ export function createStreamManagerBridge<T, ResolvedBag extends BagTemplate = B
break;
}
if (vals != null) {
extractInterrupts(vals, subjects);
subjects.values$.next(vals as T);
// Also sync messages$ from the values state so the full message
// history (including human messages) is available to consumers.
Expand Down Expand Up @@ -480,6 +481,7 @@ export function createStreamManagerBridge<T, ResolvedBag extends BagTemplate = B
break;
}
if (upd != null) {
extractInterrupts(upd, subjects);
subjects.values$.next({
...subjects.values$.value,
...(upd as object),
Expand Down Expand Up @@ -721,6 +723,36 @@ export function createStreamManagerBridge<T, ResolvedBag extends BagTemplate = B
* 1. SDK events (via normalizeSdkEvent): data at event['data'] (record) + spread into event
* 2. Mock/test events: data at event[event.type] (e.g., event['values'], event['updates'])
*/
/**
* LangGraph emits interrupts as part of `updates`/`values` events under the
* special `__interrupt__` key, not as standalone events. When such a payload
* appears, mirror it onto `interrupt$` (latest) and `interrupts$` (full list)
* so consumers can react via `agent.interrupt()` / `agent.interrupts()`.
*/
function extractInterrupts<T, B extends BagTemplate>(
payload: unknown,
subjects: StreamSubjects<T, B>,
): void {
if (!payload || typeof payload !== 'object' || Array.isArray(payload)) return;
const raw = (payload as Record<string, unknown>)['__interrupt__'];
// Cast through unknown — Interrupt$ is parameterized over Bag's InterruptType,
// and the SDK delivers raw Interrupt payloads here.
if (Array.isArray(raw) && raw.length > 0) {
const list = raw as Interrupt[];
subjects.interrupts$.next(list as unknown as Parameters<typeof subjects.interrupts$.next>[0]);
subjects.interrupt$.next(list[list.length - 1] as unknown as Parameters<typeof subjects.interrupt$.next>[0]);
return;
}
// Payload has no `__interrupt__` key. Clear any stale interrupt so the UI
// dismisses the panel after a resume completes (LangGraph does not emit a
// separate "cleared" event — the absence of `__interrupt__` in subsequent
// values/updates is the signal). No-op if interrupt$ was already empty.
if (subjects.interrupt$.value !== undefined) {
subjects.interrupt$.next(undefined);
subjects.interrupts$.next([]);
}
}

function extractEventData(event: StreamEvent): unknown {
// Try event['data'] first (SDK format from normalizeSdkEvent)
const d = event['data'];
Expand Down
Loading