diff --git a/README.md b/README.md index 3236e1025..daeb3fa21 100644 --- a/README.md +++ b/README.md @@ -94,7 +94,7 @@ That's it. `chat.messages()` is an Angular Signal. Bind it directly in your temp | Tool call progress | `toolProgress()` | `toolProgress` | | Tool calls with results | `toolCalls()` | `toolCalls` | | Branch / history | `branch()` / `history()` | `branch` / `history` | -| Subagent streaming | Planned next | `subagents` / `activeSubagents` | +| Subagent streaming | `subagents()` / `activeSubagents()` | `subagents` / `activeSubagents` | | Reactive thread switching | `Signal` input | prop | | Submit | `submit(values, opts?)` | `submit(values, opts?)` | | Stop | `stop()` | `stop()` | diff --git a/apps/website/content/docs/agent/api/api-docs.json b/apps/website/content/docs/agent/api/api-docs.json index b60e0c8d3..094889408 100644 --- a/apps/website/content/docs/agent/api/api-docs.json +++ b/apps/website/content/docs/agent/api/api-docs.json @@ -739,9 +739,27 @@ "kind": "interface", "description": "An event emitted by a LangGraph stream.", "properties": [ + { + "name": "messageMetadata", + "type": "Record", + "description": "", + "optional": true + }, + { + "name": "messages", + "type": "unknown[]", + "description": "", + "optional": true + }, + { + "name": "namespace", + "type": "string[]", + "description": "", + "optional": true + }, { "name": "type", - "type": "\"error\" | \"values\" | \"messages\" | `messages/${string}` | \"updates\" | \"tools\" | \"custom\" | \"metadata\" | \"checkpoints\" | \"tasks\" | \"debug\" | \"events\" | \"interrupt\" | \"interrupts\"", + "type": "\"error\" | \"values\" | `values|${string}` | \"messages\" | `messages|${string}` | `messages/${string}` | `messages/${string}|${string}` | \"updates\" | `updates|${string}` | \"tools\" | `tools|${string}` | \"custom\" | `custom|${string}` | `error|${string}` | \"metadata\" | \"checkpoints\" | `checkpoints|${string}` | \"tasks\" | `tasks|${string}` | \"debug\" | `debug|${string}` | \"events\" | `events|${string}` | \"interrupt\" | \"interrupts\"", "description": "Event type identifier (e.g., 'values', 'messages', 'error', 'interrupt').", "optional": false } @@ -759,6 +777,12 @@ "description": "Messages from the subagent conversation.", "optional": false }, + { + "name": "name", + "type": "string", + "description": "Optional human-readable subagent type/name.", + "optional": true + }, { "name": "status", "type": "Signal<\"running\" | \"error\" | \"pending\" | \"complete\">", diff --git a/apps/website/content/docs/agent/concepts/agent-architecture.mdx b/apps/website/content/docs/agent/concepts/agent-architecture.mdx index 00c3f11ad..d4808e91b 100644 --- a/apps/website/content/docs/agent/concepts/agent-architecture.mdx +++ b/apps/website/content/docs/agent/concepts/agent-architecture.mdx @@ -460,12 +460,8 @@ export class MultiAgentComponent { - -Tool calls, tool progress, and tool results stream today. Dedicated `subagents()` and `activeSubagents()` tracking is planned for the next implementation phase; use `toolCalls()` and `toolProgress()` for current delegated-work visibility. - - -The `subagentToolNames` option will tell agent() which graph nodes are subagents. Until dedicated tracking lands, subagent execution appears through the regular tool-call and tool-progress signals. +The `subagentToolNames` option tells agent() which tool calls spawn subagents. The default Deep Agents tool name is `task`; set this option when your graph uses custom delegation tool names. ## Error Handling and Recovery @@ -662,7 +658,7 @@ builder.add_node("analyst", analyst_subgraph) builder.add_conditional_edges("supervisor", route_to_agent) ``` -**Angular signals used today:** `messages()`, `toolCalls()`, `toolProgress()`, `status()`; dedicated `subagents()` / `activeSubagents()` tracking is planned next. +**Angular signals used:** `messages()`, `subagents()`, `activeSubagents()`, `toolCalls()`, `toolProgress()`, `status()` ### Decision Matrix diff --git a/apps/website/content/docs/agent/concepts/langgraph-basics.mdx b/apps/website/content/docs/agent/concepts/langgraph-basics.mdx index 2537fbfea..d92d62195 100644 --- a/apps/website/content/docs/agent/concepts/langgraph-basics.mdx +++ b/apps/website/content/docs/agent/concepts/langgraph-basics.mdx @@ -227,14 +227,16 @@ builder.add_node("analyst", analyst_subgraph) builder.add_conditional_edges("supervisor", lambda s: s["next_agent"]) ``` -**Angular connection:** Dedicated subagent tracking is planned for the next implementation phase. Today, track delegated work through tool progress and tool results: +**Angular connection:** Track delegated work through dedicated subagent signals: ```typescript const orchestrator = agent({ assistantId: 'orchestrator', + subagentToolNames: ['task'], + filterSubagentMessages: true, }); -const activeTools = computed(() => orchestrator.toolProgress()); -const completedTools = computed(() => orchestrator.toolCalls()); +const workers = computed(() => orchestrator.activeSubagents()); +const workerCount = computed(() => workers().length); ``` ### Pattern 4: Persistent Conversations @@ -322,7 +324,8 @@ agent.branch() // Signal — time-travel branch agent.toolCalls() // Signal — tool results agent.toolProgress() // Signal — active tool execution -// Dedicated subagent signals are planned next. +agent.subagents() // Signal> — delegated agents +agent.activeSubagents() // Signal — running workers ``` diff --git a/apps/website/content/docs/agent/guides/subgraphs.mdx b/apps/website/content/docs/agent/guides/subgraphs.mdx index 0cd51ca3e..f8618be7e 100644 --- a/apps/website/content/docs/agent/guides/subgraphs.mdx +++ b/apps/website/content/docs/agent/guides/subgraphs.mdx @@ -6,10 +6,6 @@ Subgraphs let you compose complex agents from smaller, focused units. agent() tr LangGraph calls them subgraphs (modular graph composition). Deep Agents calls them subagents (task delegation). agent() supports both patterns through the same API. - -Tool calls, tool progress, and tool results stream today. The `subagents()` / `activeSubagents()` examples below describe the planned Phase 2 API; until that lands, use `toolCalls()` and `toolProgress()` for visibility into delegated work. - - ## How subgraph composition works Subgraph composition starts on the agent side. Each subgraph is a fully compiled `StateGraph` that can be added as a node in a parent graph. diff --git a/apps/website/src/components/docs/mdx/FeatureChips.tsx b/apps/website/src/components/docs/mdx/FeatureChips.tsx index 25e429a11..e888e7cff 100644 --- a/apps/website/src/components/docs/mdx/FeatureChips.tsx +++ b/apps/website/src/components/docs/mdx/FeatureChips.tsx @@ -17,7 +17,7 @@ const CHIPS: ChipData[] = [ { icon: '💾', title: 'Persistence', signal: 'threadId', href: '/docs/guides/persistence', gradient: 'linear-gradient(135deg, rgba(16,185,129,0.06), rgba(52,199,89,0.08))', border: 'rgba(16,185,129,0.1)' }, { icon: '✋', title: 'Interrupts', signal: 'chat.interrupt()', href: '/docs/guides/interrupts', gradient: 'linear-gradient(135deg, rgba(232,147,12,0.06), rgba(245,180,60,0.08))', border: 'rgba(232,147,12,0.1)' }, { icon: '⏪', title: 'Time Travel', signal: 'chat.history()', href: '/docs/guides/time-travel', gradient: 'linear-gradient(135deg, rgba(221,0,49,0.05), rgba(255,100,130,0.07))', border: 'rgba(221,0,49,0.08)' }, - { icon: '🔀', title: 'Subagents', signal: 'Phase 2', href: '/docs/guides/subgraphs', gradient: 'linear-gradient(135deg, rgba(0,64,144,0.05), rgba(0,100,180,0.07))', border: 'rgba(0,64,144,0.08)' }, + { icon: '🔀', title: 'Subagents', signal: 'chat.subagents()', href: '/docs/guides/subgraphs', gradient: 'linear-gradient(135deg, rgba(0,64,144,0.05), rgba(0,100,180,0.07))', border: 'rgba(0,64,144,0.08)' }, { icon: '🔧', title: 'Tool Calls', signal: 'chat.toolCalls()', href: '/docs/guides/streaming', gradient: 'linear-gradient(135deg, rgba(100,80,200,0.05), rgba(120,100,210,0.07))', border: 'rgba(100,80,200,0.08)' }, { icon: '🧪', title: 'Testing', signal: 'MockTransport', href: '/docs/guides/testing', gradient: 'linear-gradient(135deg, rgba(16,185,129,0.05), rgba(40,200,140,0.07))', border: 'rgba(16,185,129,0.08)' }, ]; diff --git a/docs/limitations.md b/docs/limitations.md index 0b017fdb4..dcdc388dc 100644 --- a/docs/limitations.md +++ b/docs/limitations.md @@ -79,17 +79,17 @@ automatically on `submit()` calls. --- -### Limitation: subagent tracking is deferred +### Limitation: subagent helper methods are not exposed -**Feature:** `subagents()` / `activeSubagents()` / `filterSubagentMessages` / -`subagentToolNames` +**Feature:** `getSubagent()` / `getSubagentsByType()` / +`getSubagentsByMessage()` -**React behavior:** `useStream()` can track Deep Agent subagent execution by -combining subgraph stream events with tool-call registration. +**React behavior:** `useStream()` exposes helper methods for looking up +subagent streams by tool call ID, subagent type, or triggering message. -**Angular behavior:** Tool calls, tool progress, message metadata, and -per-message tool results are implemented. Subagent-specific stream routing is -deferred to the next implementation phase. +**Angular behavior:** `subagents()` and `activeSubagents()` are implemented. +Use the `subagents()` map directly for lookups. Helper methods can be added +later if Angular consumers need parity beyond the signal surface. -**Workaround:** Use `toolCalls()` and `toolProgress()` for tool-level visibility -until dedicated subagent tracking lands. +**Workaround:** Read from `subagents().get(toolCallId)` or filter +`[...subagents().values()]` in a computed signal. diff --git a/libs/langgraph/src/lib/agent.fn.spec.ts b/libs/langgraph/src/lib/agent.fn.spec.ts index 8bae682f9..51e574a27 100644 --- a/libs/langgraph/src/lib/agent.fn.spec.ts +++ b/libs/langgraph/src/lib/agent.fn.spec.ts @@ -330,6 +330,62 @@ describe('agent', () => { }); }); + it('subagents() and activeSubagents() expose delegated work as signals', async () => { + const transport = new MockAgentTransport(); + const ref = withInjectionContext(() => + agent({ + apiUrl: '', + assistantId: 'a', + transport, + throttle: false, + subagentToolNames: ['task'], + filterSubagentMessages: true, + }) + ); + + ref.submit({ message: 'hello' }); + transport.emit([{ + type: 'messages', + messages: [{ + id: 'ai-1', + type: 'ai', + content: '', + tool_calls: [{ + id: 'call-1', + name: 'task', + args: { subagent_type: 'researcher', description: 'Research Angular signals' }, + }], + }], + } satisfies StreamEvent]); + transport.emit([{ + type: 'messages|tools:call-1' as StreamEvent['type'], + namespace: ['tools:call-1'], + messages: [{ id: 'sub-ai-1', type: 'ai', content: 'Subagent note' }], + messageMetadata: { checkpoint_ns: 'tools:call-1|model:abc' }, + } satisfies StreamEvent]); + + await new Promise(r => setTimeout(r, 20)); + + expect(ref.activeSubagents()).toHaveLength(1); + expect(ref.activeSubagents()[0].status()).toBe('running'); + expect(ref.subagents().get('call-1')?.name).toBe('researcher'); + expect(ref.subagents().get('call-1')?.messages()).toEqual([ + expect.objectContaining({ id: 'sub-ai-1', role: 'assistant', content: 'Subagent note' }), + ]); + expect(ref.messages()).toHaveLength(1); + expect(ref.messages()[0].id).toBe('ai-1'); + + transport.emit([{ + type: 'messages', + messages: [{ id: 'tool-1', type: 'tool', tool_call_id: 'call-1', content: 'done', status: 'success' }], + } satisfies StreamEvent]); + transport.close(); + await new Promise(r => setTimeout(r, 20)); + + expect(ref.activeSubagents()).toHaveLength(0); + expect(ref.subagents().get('call-1')?.status()).toBe('complete'); + }); + it('events$ is an Observable-like with .subscribe', () => { const transport = new MockAgentTransport(); const ref = withInjectionContext(() => diff --git a/libs/langgraph/src/lib/agent.fn.ts b/libs/langgraph/src/lib/agent.fn.ts index 74501659f..c662976fd 100644 --- a/libs/langgraph/src/lib/agent.fn.ts +++ b/libs/langgraph/src/lib/agent.fn.ts @@ -348,6 +348,7 @@ function toInterrupt(ix: Interrupt): AgentInterrupt { function toSubagent(sa: SubagentStreamRef): Subagent { return { toolCallId: sa.toolCallId, + name: sa.name, status: sa.status, messages: computed(() => sa.messages().map(toMessage)) as Signal, state: sa.values as Signal>, diff --git a/libs/langgraph/src/lib/agent.types.ts b/libs/langgraph/src/lib/agent.types.ts index 896a495b4..8a1717639 100644 --- a/libs/langgraph/src/lib/agent.types.ts +++ b/libs/langgraph/src/lib/agent.types.ts @@ -43,19 +43,31 @@ export interface StreamEvent { /** Event type identifier (e.g., 'values', 'messages', 'error', 'interrupt'). */ type: | 'values' + | `values|${string}` | 'messages' + | `messages|${string}` | `messages/${string}` + | `messages/${string}|${string}` | 'updates' + | `updates|${string}` | 'tools' + | `tools|${string}` | 'custom' + | `custom|${string}` | 'error' + | `error|${string}` | 'metadata' | 'checkpoints' + | `checkpoints|${string}` | 'tasks' + | `tasks|${string}` | 'debug' + | `debug|${string}` | 'events' + | `events|${string}` | 'interrupt' | 'interrupts'; + namespace?: string[]; messages?: unknown[]; messageMetadata?: Record; [key: string]: unknown; @@ -122,6 +134,8 @@ export interface AgentOptions { export interface SubagentStreamRef { /** The tool call ID that spawned this subagent. */ toolCallId: string; + /** Optional human-readable subagent type/name. */ + name?: string; /** Current execution status of the subagent. */ status: Signal<'pending' | 'running' | 'complete' | 'error'>; /** Current state values from the subagent. */ diff --git a/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts b/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts index 7f5b6c0e9..5574e216b 100644 --- a/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts +++ b/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts @@ -499,6 +499,156 @@ describe('createStreamManagerBridge', () => { destroy$.next(); }); + it('tracks configured subagent tool calls through running and completion states', async () => { + const transport = new MockAgentTransport(); + const subjects = makeSubjects(); + const destroy$ = new Subject(); + const bridge = createStreamManagerBridge({ + options: { + apiUrl: '', + assistantId: 'test', + transport, + subagentToolNames: ['task'], + }, + subjects, + threadId$: of(null), + destroy$: destroy$.asObservable(), + }); + + bridge.submit({}); + transport.emit([{ + type: 'messages', + messages: [{ + id: 'ai-1', + type: 'ai', + content: '', + tool_calls: [{ + id: 'call-1', + name: 'task', + args: { subagent_type: 'researcher', description: 'Research Angular signals' }, + }], + }], + } satisfies StreamEvent]); + transport.emit([{ + type: 'values|tools:call-1' as StreamEvent['type'], + namespace: ['tools:call-1'], + data: { messages: [{ type: 'human', content: 'Research Angular signals' }], notes: 'started' }, + } satisfies StreamEvent]); + + await new Promise(r => setTimeout(r, 10)); + + const running = subjects.subagents$.value.get('call-1'); + expect(running?.toolCallId).toBe('call-1'); + expect(running?.status()).toBe('running'); + expect(running?.values()).toMatchObject({ notes: 'started' }); + + transport.emit([{ + type: 'messages', + messages: [{ id: 'tool-1', type: 'tool', tool_call_id: 'call-1', content: 'done', status: 'success' }], + } satisfies StreamEvent]); + transport.close(); + + await new Promise(r => setTimeout(r, 10)); + + expect(subjects.subagents$.value.get('call-1')?.status()).toBe('complete'); + destroy$.next(); + }); + + it('routes subagent message tuples out of main messages when filtering is enabled', async () => { + const transport = new MockAgentTransport(); + const subjects = makeSubjects(); + const destroy$ = new Subject(); + const bridge = createStreamManagerBridge({ + options: { + apiUrl: '', + assistantId: 'test', + transport, + subagentToolNames: ['task'], + filterSubagentMessages: true, + }, + subjects, + threadId$: of(null), + destroy$: destroy$.asObservable(), + }); + + bridge.submit({}); + transport.emit([{ + type: 'messages', + messages: [{ + id: 'ai-1', + type: 'ai', + content: '', + tool_calls: [{ + id: 'call-1', + name: 'task', + args: { subagent_type: 'researcher', description: 'Research Angular signals' }, + }], + }], + } satisfies StreamEvent]); + transport.emit([{ + type: 'messages|tools:call-1' as StreamEvent['type'], + namespace: ['tools:call-1'], + messages: [{ id: 'sub-ai-1', type: 'ai', content: 'Subagent note' }], + messageMetadata: { checkpoint_ns: 'tools:call-1|model:abc' }, + } satisfies StreamEvent]); + transport.close(); + + await new Promise(r => setTimeout(r, 10)); + + expect(subjects.messages$.value).toHaveLength(1); + expect(subjects.messages$.value[0]).toMatchObject({ id: 'ai-1' }); + expect(subjects.subagents$.value.get('call-1')?.messages()).toEqual([ + expect.objectContaining({ id: 'sub-ai-1', type: 'ai', content: 'Subagent note' }), + ]); + destroy$.next(); + }); + + it('clears tracked subagents when the thread changes', async () => { + const transport = new MockAgentTransport(); + const subjects = makeSubjects(); + const destroy$ = new Subject(); + const threadId$ = new BehaviorSubject('thread-1'); + const bridge = createStreamManagerBridge({ + options: { + apiUrl: '', + assistantId: 'test', + transport, + subagentToolNames: ['task'], + }, + subjects, + threadId$: threadId$.asObservable(), + destroy$: destroy$.asObservable(), + }); + + bridge.submit({}); + transport.emit([{ + type: 'messages', + messages: [{ + id: 'ai-1', + type: 'ai', + content: '', + tool_calls: [{ + id: 'call-1', + name: 'task', + args: { subagent_type: 'researcher', description: 'Research Angular signals' }, + }], + }], + } satisfies StreamEvent]); + transport.emit([{ + type: 'values|tools:call-1' as StreamEvent['type'], + namespace: ['tools:call-1'], + data: { messages: [{ type: 'human', content: 'Research Angular signals' }] }, + } satisfies StreamEvent]); + await new Promise(r => setTimeout(r, 10)); + expect(subjects.subagents$.value.size).toBe(1); + + threadId$.next('thread-2'); + await new Promise(r => setTimeout(r, 10)); + + expect(subjects.subagents$.value.size).toBe(0); + destroy$.next(); + }); + it('accumulates multiple custom events in order', async () => { const transport = new MockAgentTransport(); const subjects = makeSubjects(); diff --git a/libs/langgraph/src/lib/internals/stream-manager.bridge.ts b/libs/langgraph/src/lib/internals/stream-manager.bridge.ts index 526d9931b..8f04456a9 100644 --- a/libs/langgraph/src/lib/internals/stream-manager.bridge.ts +++ b/libs/langgraph/src/lib/internals/stream-manager.bridge.ts @@ -1,4 +1,5 @@ // SPDX-License-Identifier: MIT +import { signal } from '@angular/core'; import { Observable, takeUntil } from 'rxjs'; import { ResourceStatus, @@ -6,10 +7,17 @@ import { StreamSubjects, StreamEvent, AgentTransport, + SubagentStreamRef, } from '../agent.types'; import { FetchStreamTransport } from '../transport/fetch-stream.transport'; import { BagTemplate } from '@langchain/langgraph-sdk'; import { getToolCallsWithResults } from '@langchain/langgraph-sdk/utils'; +import { + SubagentTracker, + TrackedSubagent, + extractToolCallIdFromNamespace, + isSubagentNamespace, +} from './subagent-tracker'; import type { BaseMessage } from '@langchain/core/messages'; import type { Interrupt, Message as LangGraphMessage, ToolCallWithResult, ToolProgress } from '@langchain/langgraph-sdk'; @@ -47,6 +55,10 @@ export function createStreamManagerBridge(); + const subagentManager = new SubagentTracker({ + subagentToolNames: options.subagentToolNames, + onSubagentChange: publishSubagents, + }); function resetThreadState(): void { subjects.values$.next({} as T); @@ -57,9 +69,11 @@ export function createStreamManagerBridge; + if (isAiMessageWithToolCalls(raw)) { + subagentManager.registerFromToolCalls( + raw['tool_calls'] as Array<{ id?: string; name: string; args: Record | string }>, + typeof raw['id'] === 'string' ? raw['id'] : null, + ); + } + if (isToolMessage(raw)) { + const content = typeof raw['content'] === 'string' + ? raw['content'] + : JSON.stringify(raw['content']); + const status = raw['status'] === 'error' ? 'error' : 'success'; + subagentManager.processToolMessage(raw['tool_call_id'], content, status); + } + } + publishSubagents(); + } + + function updateSubagentValues(namespace: string[] | undefined, values: Record): void { + const namespaceId = namespace ? extractToolCallIdFromNamespace(namespace) : undefined; + if (!namespaceId) return; + + const messages = values['messages']; + if (Array.isArray(messages) && messages.length > 0) { + const first = messages[0]; + if (isRecord(first) && (first['type'] === 'human' || first['type'] === 'user') && typeof first['content'] === 'string') { + subagentManager.matchSubgraphToSubagent(namespaceId, first['content']); + } + } + subagentManager.updateSubagentValues(namespaceId, values); + publishSubagents(); + } + + function markSubagentRunning(namespace: string[] | undefined): void { + const namespaceId = namespace ? extractToolCallIdFromNamespace(namespace) : undefined; + if (!namespaceId) return; + subagentManager.markRunningFromNamespace(namespaceId, namespace); + publishSubagents(); + } + + function publishSubagents(): void { + subjects.subagents$.next(toSubagentRefs(subagentManager.getSubagents())); + } + function storeMessageMetadata(messages: BaseMessage[], event: StreamEvent): void { if (!event.messageMetadata) return; const next = new Map(subjects.messageMetadata$.value); @@ -348,7 +440,18 @@ function extractEventData(event: StreamEvent): unknown { } function isMessagesEvent(type: StreamEvent['type']): boolean { - return type === 'messages' || type.startsWith('messages/'); + const baseType = getBaseEventType(type); + return baseType === 'messages' || baseType.startsWith('messages/'); +} + +function getBaseEventType(type: StreamEvent['type']): string { + return String(type).split('|')[0]; +} + +function getEventNamespace(event: StreamEvent): string[] | undefined { + if (Array.isArray(event.namespace)) return event.namespace; + const parts = String(event.type).split('|'); + return parts.length > 1 ? parts.slice(1) : undefined; } function normalizeMessages(event: StreamEvent): unknown[] | null { @@ -399,6 +502,33 @@ function mergeMessages(existing: BaseMessage[], incoming: BaseMessage[]): BaseMe return merged; } +function toSubagentRefs( + subagents: Map, +): Map { + const refs = new Map(); + subagents.forEach((subagent, key) => { + refs.set(key, { + toolCallId: subagent.id, + name: typeof subagent.toolCall.args['subagent_type'] === 'string' + ? subagent.toolCall.args['subagent_type'] + : undefined, + status: signal(subagent.status), + values: signal(subagent.values), + messages: signal(subagent.messages as unknown as BaseMessage[]), + }); + }); + return refs; +} + +function isAiMessageWithToolCalls(value: Record): boolean { + return (value['type'] === 'ai' || value['type'] === 'assistant') + && Array.isArray(value['tool_calls']); +} + +function isToolMessage(value: Record): value is Record & { tool_call_id: string } { + return value['type'] === 'tool' && typeof value['tool_call_id'] === 'string'; +} + function isMessageLike(value: unknown): value is Record { return typeof value === 'object' && value !== null diff --git a/libs/langgraph/src/lib/internals/subagent-tracker.ts b/libs/langgraph/src/lib/internals/subagent-tracker.ts new file mode 100644 index 000000000..d71295d76 --- /dev/null +++ b/libs/langgraph/src/lib/internals/subagent-tracker.ts @@ -0,0 +1,294 @@ +// SPDX-License-Identifier: MIT +import type { BaseMessage } from '@langchain/core/messages'; + +export interface TrackedToolCall { + id?: string; + name: string; + args: Record | string; +} + +export interface TrackedSubagent { + id: string; + status: 'pending' | 'running' | 'complete' | 'error'; + toolCall: { + id: string; + name: string; + args: Record; + }; + values: Record; + messages: BaseMessage[]; +} + +export interface SubagentTrackerOptions { + subagentToolNames?: string[]; + onSubagentChange?: () => void; +} + +const DEFAULT_SUBAGENT_TOOL_NAMES = ['task']; + +/** + * Lightweight Angular adapter for LangGraph subagent stream state. + * + * This intentionally mirrors only the SDK behavior this package exposes. Using + * the SDK UI barrel at runtime pulls StreamManager/client utilities into every + * Angular bundle, which breaks cockpit production budgets. + */ +export class SubagentTracker { + private readonly subagentToolNames: Set; + private readonly onSubagentChange?: () => void; + private readonly subagents = new Map(); + private readonly namespaceToToolCallId = new Map(); + private readonly pendingMatches = new Map(); + + constructor(options: SubagentTrackerOptions = {}) { + this.subagentToolNames = new Set(options.subagentToolNames ?? DEFAULT_SUBAGENT_TOOL_NAMES); + this.onSubagentChange = options.onSubagentChange; + } + + clear(): void { + this.subagents.clear(); + this.namespaceToToolCallId.clear(); + this.pendingMatches.clear(); + this.onSubagentChange?.(); + } + + getSubagents(): Map { + const visible = new Map(); + for (const [id, subagent] of this.subagents) { + if (subagent.status !== 'pending') { + visible.set(id, subagent); + } + } + return visible; + } + + registerFromToolCalls(toolCalls: TrackedToolCall[], aiMessageId?: string | null): void { + let changed = false; + for (const toolCall of toolCalls) { + if (!this.subagentToolNames.has(toolCall.name)) continue; + + const id = toolCall.id; + if (!id) continue; + + const args = parseToolCallArgs(toolCall.args); + if (!isValidSubagentType(args['subagent_type'])) continue; + + const existing = this.subagents.get(id); + this.subagents.set(id, { + id, + status: existing?.status ?? 'pending', + toolCall: { + id, + name: toolCall.name, + args: { + ...args, + ...(aiMessageId ? { _aiMessageId: aiMessageId } : {}), + }, + }, + values: existing?.values ?? {}, + messages: existing?.messages ?? [], + }); + changed = true; + } + + if (changed) { + this.retryPendingMatches(); + this.onSubagentChange?.(); + } + } + + reconstructFromMessages(messages: BaseMessage[], options: { skipIfPopulated?: boolean } = {}): void { + if (options.skipIfPopulated && this.subagents.size > 0) return; + + for (const message of messages) { + const raw = message as unknown as Record; + if (isAiMessageWithToolCalls(raw)) { + this.registerFromToolCalls( + raw['tool_calls'] as TrackedToolCall[], + typeof raw['id'] === 'string' ? raw['id'] : null, + ); + } else if (isToolMessage(raw)) { + this.processToolMessage(raw['tool_call_id'], raw['content'], raw['status'] === 'error' ? 'error' : 'success'); + } + } + } + + matchSubgraphToSubagent(namespaceId: string, description: string): string | undefined { + if (this.namespaceToToolCallId.has(namespaceId)) { + return this.namespaceToToolCallId.get(namespaceId); + } + + const mapped = new Set(this.namespaceToToolCallId.values()); + const establish = (toolCallId: string): string => { + this.namespaceToToolCallId.set(namespaceId, toolCallId); + const subagent = this.subagents.get(toolCallId); + if (subagent) { + this.subagents.set(toolCallId, { + ...subagent, + status: subagent.status === 'complete' || subagent.status === 'error' ? subagent.status : 'running', + }); + } + this.onSubagentChange?.(); + return toolCallId; + }; + + for (const [toolCallId, subagent] of this.subagents) { + if (mapped.has(toolCallId)) continue; + if (subagent.toolCall.args['description'] === description) { + return establish(toolCallId); + } + } + + for (const [toolCallId, subagent] of this.subagents) { + if (mapped.has(toolCallId)) continue; + const subagentDescription = subagent.toolCall.args['description']; + if (typeof subagentDescription !== 'string' || !subagentDescription) continue; + if (description.includes(subagentDescription) || subagentDescription.includes(description)) { + return establish(toolCallId); + } + } + + for (const [toolCallId, subagent] of this.subagents) { + if (!mapped.has(toolCallId) && (subagent.status === 'pending' || subagent.status === 'running')) { + return establish(toolCallId); + } + } + + if (description) { + this.pendingMatches.set(namespaceId, description); + } + return undefined; + } + + markRunningFromNamespace(namespaceId: string, namespace?: string[]): void { + const toolCallId = this.resolveToolCallId(namespaceId); + const subagent = this.subagents.get(toolCallId); + if (!subagent) return; + + if (!this.namespaceToToolCallId.has(namespaceId)) { + this.namespaceToToolCallId.set(namespaceId, toolCallId); + } + this.subagents.set(toolCallId, { + ...subagent, + status: subagent.status === 'complete' || subagent.status === 'error' ? subagent.status : 'running', + values: { + ...subagent.values, + ...(namespace ? { namespace } : {}), + }, + }); + this.onSubagentChange?.(); + } + + updateSubagentValues(namespaceId: string, values: Record): void { + const toolCallId = this.resolveToolCallId(namespaceId); + const subagent = this.subagents.get(toolCallId); + if (!subagent) return; + + this.subagents.set(toolCallId, { + ...subagent, + status: subagent.status === 'complete' || subagent.status === 'error' ? subagent.status : 'running', + values, + }); + this.onSubagentChange?.(); + } + + addMessageToSubagent(namespaceId: string, message: BaseMessage): void { + const toolCallId = this.resolveToolCallId(namespaceId); + const subagent = this.subagents.get(toolCallId); + if (!subagent) return; + + this.subagents.set(toolCallId, { + ...subagent, + status: subagent.status === 'complete' || subagent.status === 'error' ? subagent.status : 'running', + messages: mergeMessages(subagent.messages, [message]), + }); + this.onSubagentChange?.(); + } + + processToolMessage(toolCallId: string, content: unknown, status: 'success' | 'error'): void { + const subagent = this.subagents.get(toolCallId); + if (!subagent) return; + + this.subagents.set(toolCallId, { + ...subagent, + status: status === 'error' ? 'error' : 'complete', + values: { + ...subagent.values, + result: content, + }, + }); + this.onSubagentChange?.(); + } + + private retryPendingMatches(): void { + for (const [namespaceId, description] of this.pendingMatches) { + if (this.matchSubgraphToSubagent(namespaceId, description)) { + this.pendingMatches.delete(namespaceId); + } + } + } + + private resolveToolCallId(namespaceId: string): string { + return this.namespaceToToolCallId.get(namespaceId) ?? namespaceId; + } +} + +export function isSubagentNamespace(namespace: string[] | string | undefined): boolean { + if (!namespace) return false; + if (typeof namespace === 'string') return namespace.includes('tools:'); + return namespace.some(segment => segment.startsWith('tools:')); +} + +export function extractToolCallIdFromNamespace(namespace: string[] | undefined): string | undefined { + if (!namespace) return undefined; + for (const segment of namespace) { + if (segment.startsWith('tools:')) return segment.slice(6); + } + return undefined; +} + +function parseToolCallArgs(args: Record | string): Record { + if (typeof args !== 'string') return args; + try { + const parsed = JSON.parse(args) as unknown; + return parsed && typeof parsed === 'object' && !Array.isArray(parsed) + ? parsed as Record + : {}; + } catch { + return {}; + } +} + +function isValidSubagentType(value: unknown): value is string { + return typeof value === 'string' + && value.length >= 3 + && value.length <= 50 + && /^[a-zA-Z][a-zA-Z0-9_-]*$/.test(value); +} + +function isAiMessageWithToolCalls(value: Record): boolean { + return (value['type'] === 'ai' || value['type'] === 'assistant') + && Array.isArray(value['tool_calls']); +} + +function isToolMessage(value: Record): value is Record & { tool_call_id: string } { + return value['type'] === 'tool' && typeof value['tool_call_id'] === 'string'; +} + +function mergeMessages(existing: BaseMessage[], incoming: BaseMessage[]): BaseMessage[] { + const merged = [...existing]; + for (const msg of incoming) { + const id = getMessageId(msg); + const idx = id ? merged.findIndex(m => getMessageId(m) === id) : -1; + if (idx >= 0) { + merged[idx] = msg; + } else { + merged.push(msg); + } + } + return merged; +} + +function getMessageId(message: BaseMessage): string | undefined { + return (message as unknown as { id?: string }).id; +} diff --git a/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts b/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts index 25427f064..e23b3c877 100644 --- a/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts +++ b/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts @@ -85,6 +85,50 @@ describe('FetchStreamTransport', () => { ); }); + it('requests subgraph streams so subagent namespaces are delivered', async () => { + mocks.runsStream.mockReturnValue((async function* () { + yield { event: 'metadata', data: { run_id: 'run-1', thread_id: 'thread-1' } }; + })()); + + const transport = new FetchStreamTransport('http://example.test'); + await collect( + transport.stream('assistant-1', 'thread-1', { input: 'hello' }, new AbortController().signal), + ); + + expect(mocks.runsStream).toHaveBeenCalledWith( + 'thread-1', + 'assistant-1', + expect.objectContaining({ + streamSubgraphs: true, + }), + ); + }); + + it('preserves namespaced subgraph event types during normalization', async () => { + const message = { id: 'sub-ai-1', type: 'ai', content: 'working' }; + const metadata = { checkpoint_ns: 'tools:call-1|model:abc' }; + mocks.runsStream.mockReturnValue( + (async function* () { + yield { event: 'messages|tools:call-1', data: [message, metadata] }; + })(), + ); + + const transport = new FetchStreamTransport('http://example.test'); + const events = await collect( + transport.stream('assistant-1', 'thread-1', { input: 'hello' }, new AbortController().signal), + ); + + expect(events).toEqual([ + { + type: 'messages|tools:call-1', + namespace: ['tools:call-1'], + messages: [message], + messageMetadata: metadata, + data: [message, metadata], + }, + ]); + }); + it('normalizes message tuple events without dropping metadata', async () => { const message = { id: 'ai-1', type: 'ai', content: 'pong' }; const metadata = { langgraph_node: 'model', run_id: 'run-1' }; diff --git a/libs/langgraph/src/lib/transport/fetch-stream.transport.ts b/libs/langgraph/src/lib/transport/fetch-stream.transport.ts index 6120bb001..0ceef0e53 100644 --- a/libs/langgraph/src/lib/transport/fetch-stream.transport.ts +++ b/libs/langgraph/src/lib/transport/fetch-stream.transport.ts @@ -58,6 +58,7 @@ export class FetchStreamTransport implements AgentTransport { const run = this.client.runs.stream(thread, assistantId, { input: payload as Record, streamMode: streamMode as unknown as 'values', + streamSubgraphs: true, ...opts, }); @@ -86,23 +87,36 @@ export class FetchStreamTransport implements AgentTransport { } function normalizeSdkEvent(type: StreamEvent['type'], data: unknown): StreamEvent { - if (type === 'messages' && Array.isArray(data) && data.length === 2 && isRecord(data[1])) { - return { type, messages: [data[0]], messageMetadata: data[1], data }; + const namespace = extractNamespace(type); + const baseType = getBaseEventType(type); + + if (baseType === 'messages' && Array.isArray(data) && data.length === 2 && isRecord(data[1])) { + return { type, ...(namespace ? { namespace } : {}), messages: [data[0]], messageMetadata: data[1], data }; } if (isMessagesEvent(type) && Array.isArray(data)) { - return { type, messages: data, data }; + return { type, ...(namespace ? { namespace } : {}), messages: data, data }; } if (isRecord(data)) { - return { type, ...data, data }; + return { type, ...(namespace ? { namespace } : {}), ...data, data }; } - return { type, data }; + return { type, ...(namespace ? { namespace } : {}), data }; } function isMessagesEvent(type: StreamEvent['type']): boolean { - return type === 'messages' || type.startsWith('messages/'); + const baseType = getBaseEventType(type); + return baseType === 'messages' || baseType.startsWith('messages/'); +} + +function getBaseEventType(type: StreamEvent['type']): string { + return String(type).split('|')[0]; +} + +function extractNamespace(type: StreamEvent['type']): string[] | undefined { + const parts = String(type).split('|'); + return parts.length > 1 ? parts.slice(1) : undefined; } function isRecord(value: unknown): value is Record {