diff --git a/README.md b/README.md index 4d9990214..3236e1025 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ --- -`agent()` is the Angular equivalent of LangGraph's React `useStream()` hook — a full-parity implementation built on Angular Signals and the Angular Resource API. It gives enterprise Angular teams the same production-grade streaming primitives available to React developers on LangChain, without compromises or workarounds. Drop it into any Angular 20+ component, point it at your LangGraph Platform endpoint, and get reactive, signal-driven access to streaming state, messages, tool calls, interrupts, and thread history. +`agent()` is the Angular equivalent of LangGraph's React `useStream()` hook, built on Angular Signals and the Angular Resource API. It gives enterprise Angular teams production-grade streaming primitives for LangChain. Drop it into any Angular 20+ component, point it at your LangGraph Platform endpoint, and get reactive, signal-driven access to streaming state, messages, tool calls, interrupts, and thread history. --- @@ -94,7 +94,7 @@ That's it. `chat.messages()` is an Angular Signal. Bind it directly in your temp | Tool call progress | `toolProgress()` | `toolProgress` | | Tool calls with results | `toolCalls()` | `toolCalls` | | Branch / history | `branch()` / `history()` | `branch` / `history` | -| Subagent streaming | `subagents()` / `activeSubagents()` | `subagents` / `activeSubagents` | +| Subagent streaming | Planned next | `subagents` / `activeSubagents` | | Reactive thread switching | `Signal` input | prop | | Submit | `submit(values, opts?)` | `submit(values, opts?)` | | Stop | `stop()` | `stop()` | diff --git a/apps/website/content/docs/agent/concepts/agent-architecture.mdx b/apps/website/content/docs/agent/concepts/agent-architecture.mdx index 84fc43b5c..00c3f11ad 100644 --- a/apps/website/content/docs/agent/concepts/agent-architecture.mdx +++ b/apps/website/content/docs/agent/concepts/agent-architecture.mdx @@ -417,21 +417,21 @@ interface OrchestratorState {
-

All Subagents

- @for (entry of allSubagents(); track entry[0]) { - +

Completed Tool Results

+ @for (tool of completedTools(); track tool.id) { + }
`, @@ -439,18 +439,15 @@ interface OrchestratorState { export class MultiAgentComponent { orchestrator = agent({ assistantId: 'orchestrator', - subagentToolNames: ['researcher', 'analyst', 'writer'], }); messages = this.orchestrator.messages; - // Currently running subagents with live status - activeWorkers = computed(() => this.orchestrator.activeSubagents()); + // Currently running delegated work with live status + activeTools = computed(() => this.orchestrator.toolProgress()); - // Full map of all subagents (active + completed) - allSubagents = computed(() => - Array.from(this.orchestrator.subagents().entries()) - ); + // Completed tool calls with results + completedTools = computed(() => this.orchestrator.toolCalls()); send(text: string) { this.orchestrator.submit({ @@ -463,8 +460,12 @@ export class MultiAgentComponent { + +Tool calls, tool progress, and tool results stream today. Dedicated `subagents()` and `activeSubagents()` tracking is planned for the next implementation phase; use `toolCalls()` and `toolProgress()` for current delegated-work visibility. + + -The `subagentToolNames` option tells agent() which graph nodes are subagents. Without it, subagent execution looks like regular tool calls. With it, `activeSubagents()` and `subagents()` provide dedicated tracking with isolated message histories. +The `subagentToolNames` option will tell agent() which graph nodes are subagents. Until dedicated tracking lands, subagent execution appears through the regular tool-call and tool-progress signals. ## Error Handling and Recovery @@ -661,7 +662,7 @@ builder.add_node("analyst", analyst_subgraph) builder.add_conditional_edges("supervisor", route_to_agent) ``` -**Angular signals used:** `messages()`, `subagents()`, `activeSubagents()`, `toolCalls()`, `status()` +**Angular signals used today:** `messages()`, `toolCalls()`, `toolProgress()`, `status()`; dedicated `subagents()` / `activeSubagents()` tracking is planned next. ### Decision Matrix diff --git a/apps/website/content/docs/agent/concepts/langgraph-basics.mdx b/apps/website/content/docs/agent/concepts/langgraph-basics.mdx index 3c0ba4b34..2537fbfea 100644 --- a/apps/website/content/docs/agent/concepts/langgraph-basics.mdx +++ b/apps/website/content/docs/agent/concepts/langgraph-basics.mdx @@ -227,16 +227,14 @@ builder.add_node("analyst", analyst_subgraph) builder.add_conditional_edges("supervisor", lambda s: s["next_agent"]) ``` -**Angular connection:** Track each sub-agent independently: +**Angular connection:** Dedicated subagent tracking is planned for the next implementation phase. Today, track delegated work through tool progress and tool results: ```typescript const orchestrator = agent({ assistantId: 'orchestrator', - subagentToolNames: ['researcher', 'analyst', 'writer'], }); -// See all active sub-agents -const workers = computed(() => orchestrator.activeSubagents()); -const workerCount = computed(() => workers().length); +const activeTools = computed(() => orchestrator.toolProgress()); +const completedTools = computed(() => orchestrator.toolCalls()); ``` ### Pattern 4: Persistent Conversations @@ -322,10 +320,9 @@ agent.interrupt() // Signal — agent is paused agent.history() // Signal — checkpoint timeline agent.branch() // Signal — time-travel branch -// Multi-agent -agent.subagents() // Signal — delegated agents -agent.activeSubagents() // Signal — running workers agent.toolCalls() // Signal — tool results +agent.toolProgress() // Signal — active tool execution +// Dedicated subagent signals are planned next. ``` diff --git a/apps/website/content/docs/agent/guides/subgraphs.mdx b/apps/website/content/docs/agent/guides/subgraphs.mdx index f8618be7e..0cd51ca3e 100644 --- a/apps/website/content/docs/agent/guides/subgraphs.mdx +++ b/apps/website/content/docs/agent/guides/subgraphs.mdx @@ -6,6 +6,10 @@ Subgraphs let you compose complex agents from smaller, focused units. agent() tr LangGraph calls them subgraphs (modular graph composition). Deep Agents calls them subagents (task delegation). agent() supports both patterns through the same API. + +Tool calls, tool progress, and tool results stream today. The `subagents()` / `activeSubagents()` examples below describe the planned Phase 2 API; until that lands, use `toolCalls()` and `toolProgress()` for visibility into delegated work. + + ## How subgraph composition works Subgraph composition starts on the agent side. Each subgraph is a fully compiled `StateGraph` that can be added as a node in a parent graph. diff --git a/apps/website/src/components/docs/mdx/FeatureChips.tsx b/apps/website/src/components/docs/mdx/FeatureChips.tsx index e888e7cff..25e429a11 100644 --- a/apps/website/src/components/docs/mdx/FeatureChips.tsx +++ b/apps/website/src/components/docs/mdx/FeatureChips.tsx @@ -17,7 +17,7 @@ const CHIPS: ChipData[] = [ { icon: '💾', title: 'Persistence', signal: 'threadId', href: '/docs/guides/persistence', gradient: 'linear-gradient(135deg, rgba(16,185,129,0.06), rgba(52,199,89,0.08))', border: 'rgba(16,185,129,0.1)' }, { icon: '✋', title: 'Interrupts', signal: 'chat.interrupt()', href: '/docs/guides/interrupts', gradient: 'linear-gradient(135deg, rgba(232,147,12,0.06), rgba(245,180,60,0.08))', border: 'rgba(232,147,12,0.1)' }, { icon: '⏪', title: 'Time Travel', signal: 'chat.history()', href: '/docs/guides/time-travel', gradient: 'linear-gradient(135deg, rgba(221,0,49,0.05), rgba(255,100,130,0.07))', border: 'rgba(221,0,49,0.08)' }, - { icon: '🔀', title: 'Subagents', signal: 'chat.subagents()', href: '/docs/guides/subgraphs', gradient: 'linear-gradient(135deg, rgba(0,64,144,0.05), rgba(0,100,180,0.07))', border: 'rgba(0,64,144,0.08)' }, + { icon: '🔀', title: 'Subagents', signal: 'Phase 2', href: '/docs/guides/subgraphs', gradient: 'linear-gradient(135deg, rgba(0,64,144,0.05), rgba(0,100,180,0.07))', border: 'rgba(0,64,144,0.08)' }, { icon: '🔧', title: 'Tool Calls', signal: 'chat.toolCalls()', href: '/docs/guides/streaming', gradient: 'linear-gradient(135deg, rgba(100,80,200,0.05), rgba(120,100,210,0.07))', border: 'rgba(100,80,200,0.08)' }, { icon: '🧪', title: 'Testing', signal: 'MockTransport', href: '/docs/guides/testing', gradient: 'linear-gradient(135deg, rgba(16,185,129,0.05), rgba(40,200,140,0.07))', border: 'rgba(16,185,129,0.08)' }, ]; diff --git a/apps/website/src/components/landing/PositioningStrip.tsx b/apps/website/src/components/landing/PositioningStrip.tsx index 6bbe1fb41..f14b8cd6d 100644 --- a/apps/website/src/components/landing/PositioningStrip.tsx +++ b/apps/website/src/components/landing/PositioningStrip.tsx @@ -17,11 +17,10 @@ const CARDS: Card[] = [ }, { eyebrow: 'Streaming', - headline: 'Full-parity LangGraph streaming.', + headline: 'LangGraph streaming for Angular.', body: ( <> - agent() ships everything React's{' '} - useStream() does — interrupt, subagents, branch and history, tool progress — plus{' '} + agent() ships LangGraph streaming for interrupts, branch and history, tool progress, and tool results — plus{' '} error(),{' '} status(), and{' '} reload(). diff --git a/docs/limitations.md b/docs/limitations.md index 49b9e2935..0b017fdb4 100644 --- a/docs/limitations.md +++ b/docs/limitations.md @@ -79,32 +79,17 @@ automatically on `submit()` calls. --- -### Limitation: `getMessagesMetadata()` and `getToolCalls()` always return empty +### Limitation: subagent tracking is deferred -**Feature:** `getMessagesMetadata(msg, idx?)` / `getToolCalls(msg)` +**Feature:** `subagents()` / `activeSubagents()` / `filterSubagentMessages` / +`subagentToolNames` -**React behavior:** `useStream()` derives per-message metadata (run ID, feedback -keys, tool results) from an internal StreamManager message registry populated via -the `onMessagesMetadata` callback. +**React behavior:** `useStream()` can track Deep Agent subagent execution by +combining subgraph stream events with tool-call registration. -**Angular behavior:** v1 returns `undefined` / `[]` unconditionally. The -`StreamManager` callback integration is deferred. +**Angular behavior:** Tool calls, tool progress, message metadata, and +per-message tool results are implemented. Subagent-specific stream routing is +deferred to the next implementation phase. -**Workaround:** None in v1. Tool call results are available via `toolCalls()`. - ---- - -### Limitation: `toolProgress()` and `toolCalls()` signals always return empty - -**Feature:** `toolProgress()` / `toolCalls()` reactive signals - -**React behavior:** `useStream()` populates these from `tool_progress` and -`tool_calls` LangGraph SSE event types via StreamManager's internal dispatcher. - -**Angular behavior:** v1 leaves these unhandled in `processEvent` because the -LangGraph SDK's `ToolProgressEvent` and `ToolCallEvent` shapes need to be -confirmed against the published SDK types before implementation. Both signals -return `[]` unconditionally. - -**Workaround:** None in v1. Subscribe to raw stream events via a custom transport -if tool progress visibility is required. +**Workaround:** Use `toolCalls()` and `toolProgress()` for tool-level visibility +until dedicated subagent tracking lands. diff --git a/libs/langgraph/src/lib/agent.fn.spec.ts b/libs/langgraph/src/lib/agent.fn.spec.ts index aa4d89309..8bae682f9 100644 --- a/libs/langgraph/src/lib/agent.fn.spec.ts +++ b/libs/langgraph/src/lib/agent.fn.spec.ts @@ -1,10 +1,10 @@ import { describe, it, expect, beforeEach } from 'vitest'; import { TestBed } from '@angular/core/testing'; import { signal } from '@angular/core'; -import { HumanMessage, AIMessage } from '@langchain/core/messages'; +import type { AIMessage as CoreAIMessage } from '@langchain/core/messages'; import { agent } from './agent.fn'; import { MockAgentTransport } from './transport/mock-stream.transport'; -import { ResourceStatus } from './agent.types'; +import type { StreamEvent } from './agent.types'; function withInjectionContext(fn: () => T): T { let result!: T; @@ -235,6 +235,101 @@ describe('agent', () => { expect(Array.isArray(ref.langGraphToolCalls())).toBe(true); }); + it('toolProgress() reflects tools stream lifecycle events', async () => { + const transport = new MockAgentTransport(); + const ref = withInjectionContext(() => + agent({ apiUrl: '', assistantId: 'a', transport, throttle: false }) + ); + + ref.submit({ message: 'hello' }); + transport.emit([{ + type: 'tools', + data: { event: 'on_tool_start', toolCallId: 'call-1', name: 'search', input: { q: 'angular' } }, + } satisfies StreamEvent]); + transport.close(); + await new Promise(r => setTimeout(r, 20)); + + expect(ref.toolProgress()).toEqual([ + { + toolCallId: 'call-1', + name: 'search', + state: 'starting', + input: { q: 'angular' }, + }, + ]); + }); + + it('toolCalls() and getToolCalls() expose tool results derived from messages', async () => { + const transport = new MockAgentTransport(); + const ref = withInjectionContext(() => + agent({ apiUrl: '', assistantId: 'a', transport, throttle: false }) + ); + + ref.submit({ message: 'hello' }); + transport.emit([{ + type: 'messages', + messages: [ + { + id: 'ai-1', + type: 'ai', + content: '', + tool_calls: [{ id: 'call-1', name: 'search', args: { q: 'angular' } }], + }, + { + id: 'tool-1', + type: 'tool', + tool_call_id: 'call-1', + content: 'result', + status: 'success', + }, + ], + } satisfies StreamEvent]); + transport.close(); + await new Promise(r => setTimeout(r, 20)); + + expect(ref.langGraphToolCalls()).toHaveLength(1); + expect(ref.toolCalls()).toEqual([ + { + id: 'call-1', + name: 'search', + args: { q: 'angular' }, + status: 'complete', + result: 'result', + error: undefined, + }, + ]); + expect(ref.getToolCalls(ref.langGraphMessages()[0] as CoreAIMessage)).toHaveLength(1); + }); + + it('getMessagesMetadata() returns stream metadata captured from message tuples', async () => { + const transport = new MockAgentTransport(); + const ref = withInjectionContext(() => + agent({ apiUrl: '', assistantId: 'a', transport, throttle: false }) + ); + + ref.submit({ message: 'hello' }); + transport.emit([{ + type: 'messages', + messages: [{ id: 'ai-1', type: 'ai', content: 'hello' }], + messageMetadata: { langgraph_node: 'model', run_id: 'run-1' }, + } satisfies StreamEvent]); + transport.close(); + await new Promise(r => setTimeout(r, 20)); + + const aiMessage = ref.langGraphMessages().find( + msg => (msg as unknown as Record)['id'] === 'ai-1', + ); + if (!aiMessage) throw new Error('Expected streamed AI message'); + + expect(ref.getMessagesMetadata(aiMessage, 0)).toEqual({ + messageId: 'ai-1', + firstSeenState: undefined, + branch: undefined, + branchOptions: undefined, + streamMetadata: { langgraph_node: 'model', run_id: 'run-1' }, + }); + }); + it('events$ is an Observable-like with .subscribe', () => { const transport = new MockAgentTransport(); const ref = withInjectionContext(() => diff --git a/libs/langgraph/src/lib/agent.fn.ts b/libs/langgraph/src/lib/agent.fn.ts index 6f60f7bf4..74501659f 100644 --- a/libs/langgraph/src/lib/agent.fn.ts +++ b/libs/langgraph/src/lib/agent.fn.ts @@ -24,6 +24,7 @@ import type { Subagent, ToolCall, ToolCallStatus, + ContentBlock, AgentSubmitInput, AgentSubmitOptions, } from '@ngaf/chat'; @@ -37,6 +38,7 @@ import { ResourceStatus, } from './agent.types'; import type { ThreadState, ToolProgress } from '@langchain/langgraph-sdk'; +import type { MessageMetadata } from '@langchain/langgraph-sdk/ui'; import { createStreamManagerBridge } from './internals/stream-manager.bridge'; /** @@ -96,6 +98,7 @@ export function agent< const isThreadLoading$ = new BehaviorSubject(false); const toolProgress$ = new BehaviorSubject([]); const toolCalls$ = new BehaviorSubject([]); + const messageMetadata$ = new BehaviorSubject>>>(new Map()); const subagents$ = new BehaviorSubject>(new Map()); const custom$ = new BehaviorSubject([]); const hasValue$ = new BehaviorSubject(false); @@ -115,7 +118,7 @@ export function agent< const subjects: StreamSubjects> = { status$, values$, messages$, error$, interrupt$, interrupts$, branch$, history$, - isThreadLoading$, toolProgress$, toolCalls$, subagents$, custom$, + isThreadLoading$, toolProgress$, toolCalls$, messageMetadata$, subagents$, custom$, }; // threadId$ — resolved before bridge creation (injection context required for toObservable) @@ -238,9 +241,17 @@ export function agent< manager.switchThread(id); }, joinStream: (id, last) => manager.joinStream(id, last), - // V1 deferred: requires StreamManager's internal message registry - getMessagesMetadata: (_msg, _idx) => undefined, - getToolCalls: (_msg) => [], + getMessagesMetadata: (msg, idx) => { + const id = (msg as unknown as Record)['id']; + const key = id != null ? String(id) : idx != null ? String(idx) : undefined; + return key ? messageMetadata$.value.get(key) : undefined; + }, + getToolCalls: (msg) => { + const id = (msg as unknown as Record)['id']; + return id == null + ? [] + : toolCalls$.value.filter(tc => (tc.aiMessage as unknown as Record)['id'] === id); + }, }; } @@ -348,7 +359,7 @@ function buildSubmitPayload(input: AgentSubmitInput): unknown { if (input.message !== undefined) { const content = typeof input.message === 'string' ? input.message - : input.message.map((b: any) => (b.type === 'text' ? b.text : JSON.stringify(b))).join(''); + : input.message.map((b: ContentBlock) => (b.type === 'text' ? b.text : JSON.stringify(b))).join(''); return { messages: [{ role: 'human', content }], ...(input.state ?? {}) }; } return input.state ?? {}; diff --git a/libs/langgraph/src/lib/agent.types.ts b/libs/langgraph/src/lib/agent.types.ts index 051ca1722..896a495b4 100644 --- a/libs/langgraph/src/lib/agent.types.ts +++ b/libs/langgraph/src/lib/agent.types.ts @@ -56,6 +56,8 @@ export interface StreamEvent { | 'events' | 'interrupt' | 'interrupts'; + messages?: unknown[]; + messageMetadata?: Record; [key: string]: unknown; } @@ -212,6 +214,7 @@ export interface StreamSubjects; toolProgress$: BehaviorSubject; toolCalls$: BehaviorSubject; + messageMetadata$: BehaviorSubject>>>; subagents$: BehaviorSubject>; custom$: BehaviorSubject; } diff --git a/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts b/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts index 9c9b9e7f1..7f5b6c0e9 100644 --- a/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts +++ b/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts @@ -1,8 +1,8 @@ -import { describe, it, expect, beforeEach } from 'vitest'; +import { describe, it, expect } from 'vitest'; import { BehaviorSubject, Subject } from 'rxjs'; import { createStreamManagerBridge } from './stream-manager.bridge'; import { MockAgentTransport } from '../transport/mock-stream.transport'; -import { ResourceStatus, AgentTransport, StreamSubjects, CustomStreamEvent } from '../agent.types'; +import { ResourceStatus, AgentTransport, StreamSubjects, CustomStreamEvent, StreamEvent } from '../agent.types'; import { of } from 'rxjs'; function makeSubjects(): StreamSubjects> { @@ -18,6 +18,7 @@ function makeSubjects(): StreamSubjects> { isThreadLoading$: new BehaviorSubject(false), toolProgress$: new BehaviorSubject([]), toolCalls$: new BehaviorSubject([]), + messageMetadata$: new BehaviorSubject(new Map()), subagents$: new BehaviorSubject(new Map()), custom$: new BehaviorSubject([]), }; @@ -324,6 +325,180 @@ describe('createStreamManagerBridge', () => { destroy$.next(); }); + it('updates toolProgress$ from tools stream events', async () => { + const transport = new MockAgentTransport(); + const subjects = makeSubjects(); + const destroy$ = new Subject(); + const bridge = createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'test', transport }, + subjects, + threadId$: of(null), + destroy$: destroy$.asObservable(), + }); + + bridge.submit({}); + transport.emit([ + { type: 'tools', data: { event: 'on_tool_start', toolCallId: 'call-1', name: 'search', input: { q: 'angular' } } }, + { type: 'tools', data: { event: 'on_tool_event', toolCallId: 'call-1', name: 'search', data: { step: 1 } } }, + { type: 'tools', data: { event: 'on_tool_end', toolCallId: 'call-1', name: 'search', output: 'done' } }, + ] satisfies StreamEvent[]); + transport.close(); + + await new Promise(r => setTimeout(r, 10)); + + expect(subjects.toolProgress$.value).toEqual([ + { + toolCallId: 'call-1', + name: 'search', + state: 'completed', + input: { q: 'angular' }, + data: { step: 1 }, + result: 'done', + }, + ]); + destroy$.next(); + }); + + it('marks tool progress as error when a tool error event is received', async () => { + const transport = new MockAgentTransport(); + const subjects = makeSubjects(); + const destroy$ = new Subject(); + const bridge = createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'test', transport }, + subjects, + threadId$: of(null), + destroy$: destroy$.asObservable(), + }); + + bridge.submit({}); + transport.emit([ + { type: 'tools', data: { event: 'on_tool_start', toolCallId: 'call-1', name: 'search', input: { q: 'angular' } } }, + { type: 'tools', data: { event: 'on_tool_error', toolCallId: 'call-1', name: 'search', error: 'failed' } }, + ] satisfies StreamEvent[]); + transport.close(); + + await new Promise(r => setTimeout(r, 10)); + + expect(subjects.toolProgress$.value).toEqual([ + { + toolCallId: 'call-1', + name: 'search', + state: 'error', + input: { q: 'angular' }, + error: 'failed', + }, + ]); + destroy$.next(); + }); + + it('derives toolCalls$ from AI tool calls and matching tool messages', async () => { + const transport = new MockAgentTransport(); + const subjects = makeSubjects(); + const destroy$ = new Subject(); + const bridge = createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'test', transport }, + subjects, + threadId$: of(null), + destroy$: destroy$.asObservable(), + }); + + bridge.submit({}); + transport.emit([{ + type: 'messages', + messages: [ + { + id: 'ai-1', + type: 'ai', + content: '', + tool_calls: [{ id: 'call-1', name: 'search', args: { q: 'angular' } }], + }, + { + id: 'tool-1', + type: 'tool', + tool_call_id: 'call-1', + content: 'result', + status: 'success', + }, + ], + } satisfies StreamEvent]); + transport.close(); + + await new Promise(r => setTimeout(r, 10)); + + expect(subjects.toolCalls$.value).toHaveLength(1); + expect(subjects.toolCalls$.value[0]).toMatchObject({ + id: 'call-1', + state: 'completed', + call: { name: 'search', args: { q: 'angular' } }, + result: { content: 'result' }, + }); + destroy$.next(); + }); + + it('stores message tuple metadata by message id', async () => { + const transport = new MockAgentTransport(); + const subjects = makeSubjects(); + const destroy$ = new Subject(); + const bridge = createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'test', transport }, + subjects, + threadId$: of(null), + destroy$: destroy$.asObservable(), + }); + + bridge.submit({}); + transport.emit([{ + type: 'messages', + messages: [{ id: 'ai-1', type: 'ai', content: 'hello' }], + messageMetadata: { langgraph_node: 'model', run_id: 'run-1' }, + } satisfies StreamEvent]); + transport.close(); + + await new Promise(r => setTimeout(r, 10)); + + expect(subjects.messageMetadata$.value.get('ai-1')).toEqual({ + messageId: 'ai-1', + firstSeenState: undefined, + branch: undefined, + branchOptions: undefined, + streamMetadata: { langgraph_node: 'model', run_id: 'run-1' }, + }); + destroy$.next(); + }); + + it('merges message tuple events into the existing transcript', async () => { + const transport = new MockAgentTransport(); + const subjects = makeSubjects(); + const destroy$ = new Subject(); + const bridge = createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'test', transport }, + subjects, + threadId$: of(null), + destroy$: destroy$.asObservable(), + }); + + bridge.submit({ messages: [{ type: 'human', content: 'hello' }] }); + transport.emit([{ + type: 'messages', + messages: [{ id: 'ai-1', type: 'ai', content: 'hel' }], + messageMetadata: { langgraph_node: 'model' }, + } satisfies StreamEvent]); + transport.emit([{ + type: 'messages', + messages: [{ id: 'ai-1', type: 'ai', content: 'hello' }], + messageMetadata: { langgraph_node: 'model' }, + } satisfies StreamEvent]); + transport.close(); + + await new Promise(r => setTimeout(r, 10)); + + expect(subjects.messages$.value).toEqual([ + { type: 'human', content: 'hello' }, + { id: 'ai-1', type: 'ai', content: 'hello' }, + ]); + destroy$.next(); + }); + it('accumulates multiple custom events in order', async () => { const transport = new MockAgentTransport(); const subjects = makeSubjects(); diff --git a/libs/langgraph/src/lib/internals/stream-manager.bridge.ts b/libs/langgraph/src/lib/internals/stream-manager.bridge.ts index 59f621c4e..526d9931b 100644 --- a/libs/langgraph/src/lib/internals/stream-manager.bridge.ts +++ b/libs/langgraph/src/lib/internals/stream-manager.bridge.ts @@ -9,8 +9,9 @@ import { } from '../agent.types'; import { FetchStreamTransport } from '../transport/fetch-stream.transport'; import { BagTemplate } from '@langchain/langgraph-sdk'; +import { getToolCallsWithResults } from '@langchain/langgraph-sdk/utils'; import type { BaseMessage } from '@langchain/core/messages'; -import type { Interrupt } from '@langchain/langgraph-sdk'; +import type { Interrupt, Message as LangGraphMessage, ToolCallWithResult, ToolProgress } from '@langchain/langgraph-sdk'; export interface StreamManagerBridgeOptions { options: AgentOptions; @@ -45,6 +46,7 @@ export function createStreamManagerBridge(); function resetThreadState(): void { subjects.values$.next({} as T); @@ -52,8 +54,12 @@ export function createStreamManagerBridge)['id']; - const idx = id ? merged.findIndex(m => (m as unknown as Record)['id'] === id) : -1; - if (idx >= 0) { - merged[idx] = msg; - } else { - merged.push(msg); - } - } - subjects.messages$.next(merged); + // Partial and message-tuple events are incremental. Merge them by id + // so optimistic human messages and earlier tool messages are preserved. + if (event.type === 'messages/partial' || event.messageMetadata) { + subjects.messages$.next(mergeMessages(subjects.messages$.value, normalized)); } else { subjects.messages$.next(normalized); } + storeMessageMetadata(normalized, event); + syncToolCallsFromMessages(); return; } @@ -164,6 +162,7 @@ export function createStreamManagerBridge { + const id = (message as unknown as Record)['id']; + const messageId = String(id ?? index); + next.set(messageId, { + messageId, + firstSeenState: undefined, + branch: undefined, + branchOptions: undefined, + streamMetadata: event.messageMetadata, + }); + }); + subjects.messageMetadata$.next(next); + } + + function updateToolProgress(event: StreamEvent): void { + const data = extractEventData(event); + if (!isRecord(data)) return; + + const toolEvent = data['event']; + const name = data['name']; + if (typeof toolEvent !== 'string' || typeof name !== 'string') return; + + const toolCallId = typeof data['toolCallId'] === 'string' ? data['toolCallId'] : undefined; + const key = toolCallId ?? name; + const existing = toolProgressMap.get(key); + + switch (toolEvent) { + case 'on_tool_start': + toolProgressMap.set(key, { + toolCallId, + name, + state: 'starting', + input: data['input'], + }); + break; + case 'on_tool_event': + toolProgressMap.set(key, { + toolCallId, + name, + ...existing, + state: 'running', + data: data['data'], + }); + break; + case 'on_tool_end': + toolProgressMap.set(key, { + toolCallId, + name, + ...existing, + state: 'completed', + result: data['output'], + }); + break; + case 'on_tool_error': + toolProgressMap.set(key, { + toolCallId, + name, + ...existing, + state: 'error', + error: data['error'], + }); + break; + default: + return; } + + subjects.toolProgress$.next([...toolProgressMap.values()]); } return { - submit: (payload, _opts) => runStream(payload), + submit: (payload) => runStream(payload), stop: async () => { abortController?.abort(); @@ -220,12 +297,13 @@ export function createStreamManagerBridge key !== 'type' && key !== 'data'), + ); return Object.keys(rest).length > 0 ? rest : d; } @@ -305,6 +385,20 @@ function normalizeMessages(event: StreamEvent): unknown[] | null { return null; } +function mergeMessages(existing: BaseMessage[], incoming: BaseMessage[]): BaseMessage[] { + const merged = [...existing]; + for (const msg of incoming) { + const id = (msg as unknown as Record)['id']; + const idx = id ? merged.findIndex(m => (m as unknown as Record)['id'] === id) : -1; + if (idx >= 0) { + merged[idx] = msg; + } else { + merged.push(msg); + } + } + return merged; +} + function isMessageLike(value: unknown): value is Record { return typeof value === 'object' && value !== null @@ -314,3 +408,7 @@ function isMessageLike(value: unknown): value is Record { || 'id' in value ); } + +function isRecord(value: unknown): value is Record { + return typeof value === 'object' && value !== null && !Array.isArray(value); +} diff --git a/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts b/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts index 152d6821e..25427f064 100644 --- a/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts +++ b/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts @@ -60,6 +60,55 @@ describe('FetchStreamTransport', () => { ]); }); + it('requests the stream modes required for values, messages, tools, and custom events', async () => { + mocks.runsStream.mockReturnValue((async function* () { + yield { event: 'metadata', data: { run_id: 'run-1', thread_id: 'thread-1' } }; + })()); + + const transport = new FetchStreamTransport('http://example.test'); + await collect( + transport.stream('assistant-1', 'thread-1', { input: 'hello' }, new AbortController().signal), + ); + + expect(mocks.runsStream).toHaveBeenCalledWith( + 'thread-1', + 'assistant-1', + expect.objectContaining({ + streamMode: expect.arrayContaining([ + 'values', + 'messages-tuple', + 'updates', + 'tools', + 'custom', + ]), + }), + ); + }); + + it('normalizes message tuple events without dropping metadata', async () => { + const message = { id: 'ai-1', type: 'ai', content: 'pong' }; + const metadata = { langgraph_node: 'model', run_id: 'run-1' }; + mocks.runsStream.mockReturnValue( + (async function* () { + yield { event: 'messages', data: [message, metadata] }; + })(), + ); + + const transport = new FetchStreamTransport('http://example.test'); + const events = await collect( + transport.stream('assistant-1', 'thread-1', { input: 'hello' }, new AbortController().signal), + ); + + expect(events).toEqual([ + { + type: 'messages', + messages: [message], + messageMetadata: metadata, + data: [message, metadata], + }, + ]); + }); + it('normalizes updates, interrupt, and interrupts payloads', async () => { mocks.runsStream.mockReturnValue( (async function* () { diff --git a/libs/langgraph/src/lib/transport/fetch-stream.transport.ts b/libs/langgraph/src/lib/transport/fetch-stream.transport.ts index 378a3bb21..6120bb001 100644 --- a/libs/langgraph/src/lib/transport/fetch-stream.transport.ts +++ b/libs/langgraph/src/lib/transport/fetch-stream.transport.ts @@ -1,5 +1,6 @@ // SPDX-License-Identifier: MIT import { Client } from '@langchain/langgraph-sdk'; +import type { StreamMode } from '@langchain/langgraph-sdk'; import { AgentTransport, StreamEvent } from '../agent.types'; /** @@ -44,7 +45,7 @@ export class FetchStreamTransport implements AgentTransport { payload: unknown, signal: AbortSignal, ): AsyncIterable { - const streamMode = ['values', 'messages', 'updates', 'events', 'debug'] as const; + const streamMode = ['values', 'messages-tuple', 'updates', 'tools', 'custom'] satisfies StreamMode[]; const opts = { signal }; let thread = threadId; @@ -85,6 +86,10 @@ export class FetchStreamTransport implements AgentTransport { } function normalizeSdkEvent(type: StreamEvent['type'], data: unknown): StreamEvent { + if (type === 'messages' && Array.isArray(data) && data.length === 2 && isRecord(data[1])) { + return { type, messages: [data[0]], messageMetadata: data[1], data }; + } + if (isMessagesEvent(type) && Array.isArray(data)) { return { type, messages: data, data }; }