diff --git a/apps/website/content/docs/agent/api/api-docs.json b/apps/website/content/docs/agent/api/api-docs.json index 01a7079e4..f026a599e 100644 --- a/apps/website/content/docs/agent/api/api-docs.json +++ b/apps/website/content/docs/agent/api/api-docs.json @@ -49,7 +49,7 @@ }, { "name": "createQueuedRun", - "signature": "createQueuedRun(assistantId: string, threadId: string, payload: unknown, signal: AbortSignal)", + "signature": "createQueuedRun(assistantId: string, threadId: string, payload: unknown, signal: AbortSignal, options: LangGraphSubmitOptions)", "description": "Create a pending server-side run using LangGraph's enqueue strategy.", "params": [ { @@ -75,6 +75,12 @@ "type": "AbortSignal", "description": "", "optional": false + }, + { + "name": "options", + "type": "LangGraphSubmitOptions", + "description": "", + "optional": true } ] }, @@ -130,7 +136,7 @@ }, { "name": "stream", - "signature": "stream(assistantId: string, threadId: string | null, payload: unknown, signal: AbortSignal)", + "signature": "stream(assistantId: string, threadId: string | null, payload: unknown, signal: AbortSignal, options: LangGraphSubmitOptions)", "description": "Open a streaming connection, creating a thread if needed.", "params": [ { @@ -156,6 +162,12 @@ "type": "AbortSignal", "description": "", "optional": false + }, + { + "name": "options", + "type": "LangGraphSubmitOptions", + "description": "", + "optional": true } ] } @@ -206,6 +218,12 @@ "type": "object[]", "description": "", "optional": false + }, + { + "name": "streams", + "type": "object[]", + "description": "", + "optional": false } ], "methods": [ @@ -242,7 +260,7 @@ }, { "name": "createQueuedRun", - "signature": "createQueuedRun(_assistantId: string, threadId: string, payload: unknown, signal: AbortSignal)", + "signature": "createQueuedRun(_assistantId: string, threadId: string, payload: unknown, signal: AbortSignal, options: LangGraphSubmitOptions)", "description": "Optional: create a server-side queued run without joining it immediately.", "params": [ { @@ -268,6 +286,12 @@ "type": "AbortSignal", "description": "", "optional": false + }, + { + "name": "options", + "type": "LangGraphSubmitOptions", + "description": "", + "optional": true } ] }, @@ -361,7 +385,7 @@ }, { "name": "stream", - "signature": "stream(_assistantId: string, _threadId: string | null, _payload: unknown, signal: AbortSignal)", + "signature": "stream(_assistantId: string, _threadId: string | null, _payload: unknown, signal: AbortSignal, options: LangGraphSubmitOptions)", "description": "Open a streaming connection to an agent and yield events.", "params": [ { @@ -387,6 +411,12 @@ "type": "AbortSignal", "description": "", "optional": false + }, + { + "name": "options", + "type": "LangGraphSubmitOptions", + "description": "", + "optional": true } ] } @@ -918,7 +948,7 @@ { "name": "submit", "type": "object", - "description": "", + "description": "Submit input, resume commands, checkpoint forks, or other LangGraph run options.", "optional": false }, { @@ -953,17 +983,143 @@ "kind": "interface", "description": "Options accepted by LangGraph-backed submit calls.", "properties": [ + { + "name": "afterSeconds", + "type": "number", + "description": "", + "optional": true + }, + { + "name": "checkpoint", + "type": "Omit | null", + "description": "", + "optional": true + }, + { + "name": "checkpointDuring", + "type": "boolean", + "description": "", + "optional": true + }, + { + "name": "checkpointId", + "type": "string", + "description": "", + "optional": true + }, + { + "name": "command", + "type": "Command", + "description": "", + "optional": true + }, + { + "name": "config", + "type": "Config", + "description": "", + "optional": true + }, + { + "name": "context", + "type": "unknown", + "description": "", + "optional": true + }, + { + "name": "durability", + "type": "LangGraphDurability", + "description": "", + "optional": true + }, + { + "name": "feedbackKeys", + "type": "string[]", + "description": "", + "optional": true + }, + { + "name": "ifNotExists", + "type": "\"create\" | \"reject\"", + "description": "", + "optional": true + }, + { + "name": "interruptAfter", + "type": "string[] | \"*\"", + "description": "", + "optional": true + }, + { + "name": "interruptBefore", + "type": "string[] | \"*\"", + "description": "", + "optional": true + }, + { + "name": "metadata", + "type": "Metadata", + "description": "", + "optional": true + }, { "name": "multitaskStrategy", "type": "LangGraphMultitaskStrategy", "description": "Strategy for handling concurrent runs on the same thread.", "optional": true }, + { + "name": "onCompletion", + "type": "LangGraphOnCompletion", + "description": "", + "optional": true + }, + { + "name": "onDisconnect", + "type": "LangGraphOnDisconnect", + "description": "", + "optional": true + }, + { + "name": "onRunCreated", + "type": "object", + "description": "", + "optional": true + }, + { + "name": "resume", + "type": "unknown", + "description": "Convenience alias normalized to `command.resume` before invoking LangGraph.", + "optional": true + }, { "name": "signal", "type": "AbortSignal", "description": "", "optional": true + }, + { + "name": "streamMode", + "type": "StreamMode[]", + "description": "", + "optional": true + }, + { + "name": "streamResumable", + "type": "boolean", + "description": "", + "optional": true + }, + { + "name": "streamSubgraphs", + "type": "boolean", + "description": "", + "optional": true + }, + { + "name": "webhook", + "type": "string", + "description": "", + "optional": true } ], "examples": [] @@ -1150,7 +1306,7 @@ { "name": "submit", "type": "object", - "description": "", + "description": "Submit input, resume commands, checkpoint forks, or other LangGraph run options.", "optional": false }, { @@ -1205,7 +1361,7 @@ }, { "name": "type", - "type": "\"error\" | \"interrupt\" | \"values\" | `values|${string}` | \"messages\" | `messages|${string}` | `messages/${string}` | `messages/${string}|${string}` | \"updates\" | `updates|${string}` | \"tools\" | `tools|${string}` | \"custom\" | `custom|${string}` | `error|${string}` | \"metadata\" | \"checkpoints\" | `checkpoints|${string}` | \"tasks\" | `tasks|${string}` | \"debug\" | `debug|${string}` | \"events\" | `events|${string}` | \"interrupts\"", + "type": "\"error\" | \"values\" | \"messages\" | \"updates\" | \"events\" | \"debug\" | \"tasks\" | \"checkpoints\" | \"custom\" | \"tools\" | \"interrupt\" | `values|${string}` | `messages|${string}` | `messages/${string}` | `messages/${string}|${string}` | `updates|${string}` | `tools|${string}` | `custom|${string}` | `error|${string}` | \"metadata\" | `checkpoints|${string}` | `tasks|${string}` | `debug|${string}` | `events|${string}` | \"interrupts\"", "description": "Event type identifier (e.g., 'values', 'messages', 'error', 'interrupt').", "optional": false } @@ -1231,7 +1387,7 @@ }, { "name": "status", - "type": "Signal<\"running\" | \"error\" | \"pending\" | \"complete\">", + "type": "Signal<\"running\" | \"error\" | \"complete\" | \"pending\">", "description": "Current execution status of the subagent.", "optional": false }, diff --git a/apps/website/content/docs/agent/concepts/agent-architecture.mdx b/apps/website/content/docs/agent/concepts/agent-architecture.mdx index d4808e91b..1ba680049 100644 --- a/apps/website/content/docs/agent/concepts/agent-architecture.mdx +++ b/apps/website/content/docs/agent/concepts/agent-architecture.mdx @@ -593,12 +593,12 @@ export class DebugTimelineComponent { selectedState = computed(() => { const id = this.currentCheckpoint(); - return this.history().find(c => c.id === id)?.state; + return this.history().find(c => c.id === id)?.values; }); timeTravel(checkpointId: string) { this.currentCheckpoint.set(checkpointId); - this.agent.submit(null, { checkpoint: checkpointId }); + this.agent.submit(null, { checkpointId }); } } ``` diff --git a/libs/langgraph/src/lib/agent.fn.spec.ts b/libs/langgraph/src/lib/agent.fn.spec.ts index bb36bc658..739ea3026 100644 --- a/libs/langgraph/src/lib/agent.fn.spec.ts +++ b/libs/langgraph/src/lib/agent.fn.spec.ts @@ -209,6 +209,36 @@ describe('agent', () => { expect(Array.isArray(rawHist)).toBe(true); }); + it('normalizes resume submit options into a LangGraph command', async () => { + const seen: Array<{ payload: unknown; options: unknown }> = []; + const transport = new MockAgentTransport(); + transport.stream = async function* ( + _assistantId: string, + _threadId: string | null, + payload: unknown, + _signal: AbortSignal, + options?: unknown, + ) { + seen.push({ payload, options }); + yield* []; + }; + + const ref = withInjectionContext(() => + agent({ apiUrl: '', assistantId: 'a', transport, threadId: 'thread-1', throttle: false }) + ); + + await ref.submit(null, { resume: { approved: true } }); + + expect(seen).toEqual([ + { + payload: null, + options: expect.objectContaining({ + command: { resume: { approved: true } }, + }), + }, + ]); + }); + it('experimentalBranchTree() exposes a branch tree derived from LangGraph history', async () => { const root = threadState('root'); const left = threadState('left', 'root'); diff --git a/libs/langgraph/src/lib/agent.fn.ts b/libs/langgraph/src/lib/agent.fn.ts index 5a3e226f4..cc6720bed 100644 --- a/libs/langgraph/src/lib/agent.fn.ts +++ b/libs/langgraph/src/lib/agent.fn.ts @@ -227,8 +227,9 @@ export function agent< subagents: subagentsNeutral, events$, history: historyNeutral, - submit: (input: AgentSubmitInput, opts?: AgentSubmitOptions & LangGraphSubmitOptions) => { - manager.submit(buildSubmitPayload(input), opts); + submit: (input: AgentSubmitInput | null | undefined, opts?: AgentSubmitOptions & LangGraphSubmitOptions) => { + const request = buildSubmitRequest(input, opts); + manager.submit(request.payload, request.options); return Promise.resolve(); }, stop: () => manager.stop(), @@ -389,8 +390,19 @@ function getToolCallIds(msg: CoreAIMessage): string[] { .filter((id): id is string => id != null); } -function buildSubmitPayload(input: AgentSubmitInput): unknown { - if (input.resume !== undefined) return { __resume__: input.resume }; +function buildSubmitRequest( + input: AgentSubmitInput | null | undefined, + opts?: AgentSubmitOptions & LangGraphSubmitOptions, +): { payload: unknown; options?: AgentSubmitOptions & LangGraphSubmitOptions } { + return { + payload: buildSubmitPayload(input), + options: normalizeSubmitOptions(input, opts), + }; +} + +function buildSubmitPayload(input: AgentSubmitInput | null | undefined): unknown { + if (input == null) return null; + if (input.resume !== undefined) return null; if (input.message !== undefined) { const content = typeof input.message === 'string' ? input.message @@ -400,6 +412,27 @@ function buildSubmitPayload(input: AgentSubmitInput): unknown { return input.state ?? {}; } +function normalizeSubmitOptions( + input: AgentSubmitInput | null | undefined, + opts?: AgentSubmitOptions & LangGraphSubmitOptions, +): (AgentSubmitOptions & LangGraphSubmitOptions) | undefined { + const inputResume = input?.resume; + const optionResume = opts?.resume; + const resume = inputResume !== undefined ? inputResume : optionResume; + if (resume === undefined) return opts; + + const next = { ...(opts ?? {}) }; + delete next.resume; + const command = next.command; + return { + ...next, + command: { + ...command, + resume, + }, + }; +} + function randomId(): string { return Math.random().toString(36).slice(2); } diff --git a/libs/langgraph/src/lib/agent.types.ts b/libs/langgraph/src/lib/agent.types.ts index 7f555ccb8..fe689667b 100644 --- a/libs/langgraph/src/lib/agent.types.ts +++ b/libs/langgraph/src/lib/agent.types.ts @@ -4,18 +4,23 @@ import type { ResourceStatus as NgResourceStatus } from '@angular/core'; import { BehaviorSubject } from 'rxjs'; import type { BagTemplate, + Checkpoint, + Command, + Config, InferBag, Interrupt, + Metadata, ThreadState, ToolProgress, ToolCallWithResult, + StreamMode, } from '@langchain/langgraph-sdk'; import type { MessageMetadata, SubmitOptions, } from '@langchain/langgraph-sdk/ui'; import type { BaseMessage, AIMessage as CoreAIMessage } from '@langchain/core/messages'; -import type { AgentWithHistory } from '@ngaf/chat'; +import type { AgentSubmitInput, AgentSubmitOptions, AgentWithHistory } from '@ngaf/chat'; // Re-export SDK types so consumers don't need to import from langgraph-sdk directly export type { BagTemplate, InferBag, Interrupt, ThreadState, SubmitOptions }; @@ -76,9 +81,35 @@ export interface StreamEvent { /** Strategy for handling concurrent LangGraph runs on the same thread. */ export type LangGraphMultitaskStrategy = 'reject' | 'interrupt' | 'rollback' | 'enqueue'; +export type LangGraphDurability = 'exit' | 'async' | 'sync'; +export type LangGraphOnCompletion = 'complete' | 'continue'; +export type LangGraphOnDisconnect = 'cancel' | 'continue'; + /** Options accepted by LangGraph-backed submit calls. */ export interface LangGraphSubmitOptions { signal?: AbortSignal; + config?: Config; + context?: unknown; + checkpoint?: Omit | null; + checkpointId?: string; + command?: Command; + metadata?: Metadata; + checkpointDuring?: boolean; + durability?: LangGraphDurability; + interruptBefore?: '*' | string[]; + interruptAfter?: '*' | string[]; + onCompletion?: LangGraphOnCompletion; + webhook?: string; + onDisconnect?: LangGraphOnDisconnect; + afterSeconds?: number; + ifNotExists?: 'create' | 'reject'; + onRunCreated?: (params: { run_id: string; thread_id?: string }) => void; + streamMode?: StreamMode[]; + streamSubgraphs?: boolean; + streamResumable?: boolean; + feedbackKeys?: string[]; + /** Convenience alias normalized to `command.resume` before invoking LangGraph. */ + resume?: unknown; /** Strategy for handling concurrent runs on the same thread. */ multitaskStrategy?: LangGraphMultitaskStrategy; } @@ -144,6 +175,7 @@ export interface AgentTransport { threadId: string | null, payload: unknown, signal: AbortSignal, + options?: LangGraphSubmitOptions, ): AsyncIterable; /** Optional: join an already-started run without creating a new one. */ @@ -160,6 +192,7 @@ export interface AgentTransport { threadId: string, payload: unknown, signal: AbortSignal, + options?: LangGraphSubmitOptions, ): Promise; /** Optional: cancel a server-side run. */ @@ -249,6 +282,12 @@ export interface LangGraphAgent>; + /** Submit input, resume commands, checkpoint forks, or other LangGraph run options. */ + submit: ( + input: AgentSubmitInput | null | undefined, + opts?: AgentSubmitOptions & LangGraphSubmitOptions, + ) => Promise; + // ── AgentRef fields preserved on the unified surface ───────────────────── /** Current agent state values (raw, typed per the type parameter T). */ diff --git a/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts b/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts index 3e3c82b87..8675474f0 100644 --- a/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts +++ b/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts @@ -89,6 +89,50 @@ describe('createStreamManagerBridge', () => { destroy$.next(); }); + it('passes submit options through to the transport stream', async () => { + const seen: Array<{ payload: unknown; options: unknown }> = []; + const transport: AgentTransport = { + async *stream(_assistantId, _threadId, payload, _signal, options?: unknown) { + seen.push({ payload, options }); + yield* []; + }, + } as AgentTransport; + const subjects = makeSubjects(); + const destroy$ = new Subject(); + const bridge = createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'test', transport }, + subjects, + threadId$: of('thread-1'), + destroy$: destroy$.asObservable(), + }); + + await bridge.submit(null, { + checkpoint: { + checkpoint_ns: '', + checkpoint_id: 'checkpoint-1', + checkpoint_map: null, + }, + command: { resume: { approved: true } }, + metadata: { source: 'ui' }, + }); + + expect(seen).toEqual([ + { + payload: null, + options: expect.objectContaining({ + checkpoint: { + checkpoint_ns: '', + checkpoint_id: 'checkpoint-1', + checkpoint_map: null, + }, + command: { resume: { approved: true } }, + metadata: { source: 'ui' }, + }), + }, + ]); + destroy$.next(); + }); + it('loads history when initialized with a thread id', async () => { const history = [makeThreadState('checkpoint-1')]; const historyCalls: string[] = []; diff --git a/libs/langgraph/src/lib/internals/stream-manager.bridge.ts b/libs/langgraph/src/lib/internals/stream-manager.bridge.ts index fda4ab872..360457050 100644 --- a/libs/langgraph/src/lib/internals/stream-manager.bridge.ts +++ b/libs/langgraph/src/lib/internals/stream-manager.bridge.ts @@ -55,6 +55,7 @@ export function createStreamManagerBridge { + async function runStream(payload: unknown, opts?: LangGraphSubmitOptions): Promise { abortController?.abort(); abortController = new AbortController(); @@ -251,6 +253,7 @@ export function createStreamManagerBridge { @@ -566,7 +570,7 @@ export function createStreamManagerBridge { if (lastPayload !== null) { - await runStream(lastPayload); + await runStream(lastPayload, lastOptions); } }, }; diff --git a/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts b/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts index 408f09ff0..355a0296e 100644 --- a/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts +++ b/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts @@ -113,6 +113,83 @@ describe('FetchStreamTransport', () => { ); }); + it('forwards LangGraph submit options to streamed runs', async () => { + const checkpoint = { + checkpoint_ns: '', + checkpoint_id: 'checkpoint-1', + checkpoint_map: null, + }; + const config = { configurable: { userId: 'user-1' } }; + const metadata = { source: 'ui' }; + const command = { resume: { approved: true } }; + mocks.runsStream.mockReturnValue((async function* () { + yield { event: 'metadata', data: { run_id: 'run-1', thread_id: 'thread-1' } }; + })()); + + const transport = new FetchStreamTransport('http://example.test'); + await collect( + transport.stream( + 'assistant-1', + 'thread-1', + null, + new AbortController().signal, + { + checkpoint, + config, + metadata, + command, + durability: 'sync', + interruptBefore: ['review'], + onDisconnect: 'continue', + streamResumable: true, + feedbackKeys: ['quality'], + }, + ), + ); + + expect(mocks.runsStream).toHaveBeenCalledWith( + 'thread-1', + 'assistant-1', + expect.objectContaining({ + input: null, + checkpoint, + config, + metadata, + command, + durability: 'sync', + interruptBefore: ['review'], + onDisconnect: 'continue', + streamResumable: true, + feedbackKeys: ['quality'], + }), + ); + }); + + it('preserves explicit null checkpoints on streamed runs', async () => { + mocks.runsStream.mockReturnValue((async function* () { + yield { event: 'metadata', data: { run_id: 'run-1', thread_id: 'thread-1' } }; + })()); + + const transport = new FetchStreamTransport('http://example.test'); + await collect( + transport.stream( + 'assistant-1', + 'thread-1', + null, + new AbortController().signal, + { checkpoint: null }, + ), + ); + + expect(mocks.runsStream).toHaveBeenCalledWith( + 'thread-1', + 'assistant-1', + expect.objectContaining({ + checkpoint: null, + }), + ); + }); + it('preserves namespaced subgraph event types during normalization', async () => { const message = { id: 'sub-ai-1', type: 'ai', content: 'working' }; const metadata = { checkpoint_ns: 'tools:call-1|model:abc' }; @@ -241,6 +318,42 @@ describe('FetchStreamTransport', () => { expect(entry.createdAt).toBeInstanceOf(Date); }); + it('forwards LangGraph submit options when creating queued runs', async () => { + const checkpoint = { + checkpoint_ns: '', + checkpoint_id: 'checkpoint-queued', + checkpoint_map: null, + }; + mocks.runsCreate.mockResolvedValue({ + run_id: 'run-queued', + thread_id: 'thread-1', + created_at: '2026-05-02T00:00:00.000Z', + }); + + const transport = new FetchStreamTransport('http://example.test'); + await transport.createQueuedRun( + 'assistant-1', + 'thread-1', + { messages: [{ type: 'human', content: 'queued' }] }, + new AbortController().signal, + { + checkpoint, + command: { resume: { ok: true } }, + multitaskStrategy: 'interrupt', + }, + ); + + expect(mocks.runsCreate).toHaveBeenCalledWith( + 'thread-1', + 'assistant-1', + expect.objectContaining({ + checkpoint, + command: { resume: { ok: true } }, + multitaskStrategy: 'enqueue', + }), + ); + }); + it('cancels a queued run by thread and run id', async () => { const transport = new FetchStreamTransport('http://example.test'); diff --git a/libs/langgraph/src/lib/transport/fetch-stream.transport.ts b/libs/langgraph/src/lib/transport/fetch-stream.transport.ts index 202bae515..b765c4356 100644 --- a/libs/langgraph/src/lib/transport/fetch-stream.transport.ts +++ b/libs/langgraph/src/lib/transport/fetch-stream.transport.ts @@ -1,7 +1,7 @@ // SPDX-License-Identifier: MIT import { Client } from '@langchain/langgraph-sdk'; import type { StreamMode, ThreadState } from '@langchain/langgraph-sdk'; -import { AgentQueueEntry, AgentTransport, StreamEvent } from '../agent.types'; +import type { AgentQueueEntry, AgentTransport, LangGraphSubmitOptions, StreamEvent } from '../agent.types'; /** * Production transport that connects to a LangGraph Platform API via HTTP and SSE. @@ -44,10 +44,8 @@ export class FetchStreamTransport implements AgentTransport { threadId: string | null, payload: unknown, signal: AbortSignal, + options?: LangGraphSubmitOptions, ): AsyncIterable { - const streamMode = ['values', 'messages-tuple', 'updates', 'tools', 'custom'] satisfies StreamMode[]; - const opts = { signal }; - let thread = threadId; if (!thread) { const t = await this.client.threads.create(); @@ -55,12 +53,11 @@ export class FetchStreamTransport implements AgentTransport { this.onThreadId?.(thread); } - const run = this.client.runs.stream(thread, assistantId, { - input: payload as Record, - streamMode: streamMode as unknown as 'values', - streamSubgraphs: true, - ...opts, - }); + const run = this.client.runs.stream( + thread, + assistantId, + buildRunPayload(payload, signal, options), + ); for await (const event of run) { yield normalizeSdkEvent(event.event as StreamEvent['type'], event.data); @@ -91,14 +88,11 @@ export class FetchStreamTransport implements AgentTransport { threadId: string, payload: unknown, signal: AbortSignal, + options?: LangGraphSubmitOptions, ): Promise { - const streamMode = ['values', 'messages-tuple', 'updates', 'tools', 'custom'] satisfies StreamMode[]; const run = await this.client.runs.create(threadId, assistantId, { - input: payload as Record, - streamMode: streamMode as unknown as 'values', - streamSubgraphs: true, + ...buildRunPayload(payload, signal, options), multitaskStrategy: 'enqueue', - signal, }); return { @@ -121,6 +115,41 @@ export class FetchStreamTransport implements AgentTransport { } } +function buildRunPayload( + input: unknown, + signal: AbortSignal, + options?: LangGraphSubmitOptions, +): { + input: Record | null; + streamMode: StreamMode[]; + streamSubgraphs: boolean; + signal: AbortSignal; +} & Omit { + const runOptions = { ...(options ?? {}) }; + const hasCheckpoint = Object.prototype.hasOwnProperty.call(runOptions, 'checkpoint'); + const checkpoint = runOptions.checkpoint; + const streamMode = runOptions.streamMode; + const streamSubgraphs = runOptions.streamSubgraphs; + delete runOptions.signal; + delete runOptions.resume; + delete runOptions.checkpoint; + delete runOptions.streamMode; + delete runOptions.streamSubgraphs; + + return { + ...runOptions, + ...(hasCheckpoint ? { checkpoint } : {}), + input: input as Record | null, + streamMode: streamMode ?? defaultStreamMode(), + streamSubgraphs: streamSubgraphs ?? true, + signal, + }; +} + +function defaultStreamMode(): StreamMode[] { + return ['values', 'messages-tuple', 'updates', 'tools', 'custom']; +} + function normalizeSdkEvent(type: StreamEvent['type'], data: unknown): StreamEvent { const namespace = extractNamespace(type); const baseType = getBaseEventType(type); diff --git a/libs/langgraph/src/lib/transport/mock-stream.transport.ts b/libs/langgraph/src/lib/transport/mock-stream.transport.ts index d2f6ad284..56db5d2e4 100644 --- a/libs/langgraph/src/lib/transport/mock-stream.transport.ts +++ b/libs/langgraph/src/lib/transport/mock-stream.transport.ts @@ -1,5 +1,5 @@ // SPDX-License-Identifier: MIT -import { AgentQueueEntry, AgentTransport, StreamEvent } from '../agent.types'; +import type { AgentQueueEntry, AgentTransport, LangGraphSubmitOptions, StreamEvent } from '../agent.types'; import type { ThreadState } from '@langchain/langgraph-sdk'; /** @@ -19,6 +19,7 @@ import type { ThreadState } from '@langchain/langgraph-sdk'; export class MockAgentTransport implements AgentTransport { history: ThreadState[] = []; readonly historyCalls: string[] = []; + readonly streams: Array<{ threadId: string | null; payload: unknown; options?: LangGraphSubmitOptions }> = []; readonly createdQueuedRuns: AgentQueueEntry[] = []; readonly cancelledRuns: Array<{ threadId: string; runId: string }> = []; readonly joinedRuns: Array<{ threadId: string; runId: string }> = []; @@ -70,7 +71,9 @@ export class MockAgentTransport implements AgentTransport { _threadId: string | null, _payload: unknown, signal: AbortSignal, + options?: LangGraphSubmitOptions, ): AsyncIterable { + this.streams.push({ threadId: _threadId, payload: _payload, options }); this.streaming = true; try { while (!this.closed && !signal.aborted) { @@ -102,13 +105,14 @@ export class MockAgentTransport implements AgentTransport { threadId: string, payload: unknown, signal: AbortSignal, + options?: LangGraphSubmitOptions, ): Promise { void signal; const entry: AgentQueueEntry = { id: `queued-run-${this.createdQueuedRuns.length + 1}`, threadId, values: payload, - options: { multitaskStrategy: 'enqueue' }, + options: { ...options, multitaskStrategy: 'enqueue' }, createdAt: new Date(), }; this.createdQueuedRuns.push(entry);