diff --git a/libs/langgraph/src/lib/agent.fn.spec.ts b/libs/langgraph/src/lib/agent.fn.spec.ts index 739ea3026..923ac04ac 100644 --- a/libs/langgraph/src/lib/agent.fn.spec.ts +++ b/libs/langgraph/src/lib/agent.fn.spec.ts @@ -76,6 +76,25 @@ describe('agent', () => { expect(ref.isLoading()).toBe(true); }); + it('submit() resolves after the active stream completes', async () => { + const transport = new MockAgentTransport(); + const ref = withInjectionContext(() => + agent({ apiUrl: '', assistantId: 'a', transport }) + ); + + let settled = false; + const submitted = ref.submit({ message: 'hello' }).then(() => { + settled = true; + }); + + await Promise.resolve(); + expect(settled).toBe(false); + + transport.close(); + await submitted; + expect(settled).toBe(true); + }); + it('queue() exposes server-side enqueue submissions', async () => { const transport = new MockAgentTransport(); const ref = withInjectionContext(() => @@ -135,8 +154,9 @@ describe('agent', () => { const ref = withInjectionContext(() => agent({ apiUrl: '', assistantId: 'a', transport }) ); - await ref.submit({ message: 'hello' }); + const submitted = ref.submit({ message: 'hello' }); transport.close(); + await submitted; await new Promise(r => setTimeout(r, 10)); ref.reload(); expect(ref.isLoading()).toBe(true); @@ -239,6 +259,130 @@ describe('agent', () => { ]); }); + it('normalizes resume submit input state into a LangGraph command update', async () => { + const seen: Array<{ payload: unknown; options: unknown }> = []; + const transport = new MockAgentTransport(); + transport.stream = async function* ( + _assistantId: string, + _threadId: string | null, + payload: unknown, + _signal: AbortSignal, + options?: unknown, + ) { + seen.push({ payload, options }); + yield* []; + }; + + const ref = withInjectionContext(() => + agent({ apiUrl: '', assistantId: 'a', transport, threadId: 'thread-1', throttle: false }) + ); + + await ref.submit({ + resume: { approved: true }, + state: { reviewNotes: 'ship it' }, + }); + + expect(seen).toEqual([ + { + payload: null, + options: expect.objectContaining({ + command: { + resume: { approved: true }, + update: { reviewNotes: 'ship it' }, + }, + }), + }, + ]); + }); + + it('normalizes resume submit input messages into a LangGraph command update', async () => { + const seen: Array<{ payload: unknown; options: unknown }> = []; + const transport = new MockAgentTransport(); + transport.stream = async function* ( + _assistantId: string, + _threadId: string | null, + payload: unknown, + _signal: AbortSignal, + options?: unknown, + ) { + seen.push({ payload, options }); + yield* []; + }; + + const ref = withInjectionContext(() => + agent({ apiUrl: '', assistantId: 'a', transport, threadId: 'thread-1', throttle: false }) + ); + + await ref.submit({ + message: 'Approved, continue', + resume: { approved: true }, + state: { reviewer: 'Ada' }, + }); + + expect(seen).toEqual([ + { + payload: null, + options: expect.objectContaining({ + command: { + resume: { approved: true }, + update: { + messages: [{ type: 'human', role: 'human', content: 'Approved, continue' }], + reviewer: 'Ada', + }, + }, + }), + }, + ]); + }); + + it('merges resume submit updates with existing LangGraph command updates', async () => { + const seen: Array<{ payload: unknown; options: unknown }> = []; + const transport = new MockAgentTransport(); + transport.stream = async function* ( + _assistantId: string, + _threadId: string | null, + payload: unknown, + _signal: AbortSignal, + options?: unknown, + ) { + seen.push({ payload, options }); + yield* []; + }; + + const ref = withInjectionContext(() => + agent({ apiUrl: '', assistantId: 'a', transport, threadId: 'thread-1', throttle: false }) + ); + + await ref.submit( + { + resume: { approved: true }, + state: { reviewer: 'Ada' }, + }, + { + command: { + update: { workflow: 'release' }, + goto: 'publish', + }, + }, + ); + + expect(seen).toEqual([ + { + payload: null, + options: expect.objectContaining({ + command: { + resume: { approved: true }, + update: { + workflow: 'release', + reviewer: 'Ada', + }, + goto: 'publish', + }, + }), + }, + ]); + }); + it('experimentalBranchTree() exposes a branch tree derived from LangGraph history', async () => { const root = threadState('root'); const left = threadState('left', 'root'); diff --git a/libs/langgraph/src/lib/agent.fn.ts b/libs/langgraph/src/lib/agent.fn.ts index 3cdc83262..1b9770e28 100644 --- a/libs/langgraph/src/lib/agent.fn.ts +++ b/libs/langgraph/src/lib/agent.fn.ts @@ -12,7 +12,7 @@ import { import { takeUntil } from 'rxjs/operators'; import type { Observable } from 'rxjs'; import type { BaseMessage, AIMessage as CoreAIMessage } from '@langchain/core/messages'; -import type { Interrupt, ToolCallWithResult } from '@langchain/langgraph-sdk'; +import type { Command, Interrupt, ToolCallWithResult } from '@langchain/langgraph-sdk'; import type { BagTemplate, InferBag } from '@langchain/langgraph-sdk'; import type { AgentEvent, @@ -242,8 +242,7 @@ export function agent< history: historyNeutral, submit: (input: AgentSubmitInput | null | undefined, opts?: AgentSubmitOptions & LangGraphSubmitOptions) => { const request = buildSubmitRequest(input, opts); - manager.submit(request.payload, request.options); - return Promise.resolve(); + return manager.submit(request.payload, request.options); }, stop: () => manager.stop(), @@ -416,18 +415,7 @@ function buildSubmitRequest( function buildSubmitPayload(input: AgentSubmitInput | null | undefined): unknown { if (input == null) return null; if (input.resume !== undefined) return null; - if (input.message !== undefined) { - const content = typeof input.message === 'string' - ? input.message - : input.message.map((b: ContentBlock) => (b.type === 'text' ? b.text : JSON.stringify(b))).join(''); - // `type: 'human'` is what `toMessage()` reads via `_getType` || raw['type']; - // `role: 'human'` is what the LangGraph server expects in submit payloads. - // Include both so the optimistic local copy projects as a 'user' bubble - // (otherwise toMessage falls through to the 'ai' default and renders the - // user's question as an assistant message). - return { messages: [{ type: 'human', role: 'human', content }], ...(input.state ?? {}) }; - } - return input.state ?? {}; + return buildSubmitUpdate(input) ?? {}; } function normalizeSubmitOptions( @@ -442,15 +430,45 @@ function normalizeSubmitOptions( const next = { ...(opts ?? {}) }; delete next.resume; const command = next.command; + const update = buildSubmitUpdate(input); + const commandUpdate = mergeCommandUpdate(command?.update, update); return { ...next, command: { ...command, resume, + ...(commandUpdate === undefined ? {} : { update: commandUpdate }), }, }; } +function buildSubmitUpdate(input: AgentSubmitInput | null | undefined): Record | undefined { + if (input == null) return undefined; + if (input.message !== undefined) { + const content = typeof input.message === 'string' + ? input.message + : input.message.map((b: ContentBlock) => (b.type === 'text' ? b.text : JSON.stringify(b))).join(''); + // `type: 'human'` is what `toMessage()` reads via `_getType` || raw['type']; + // `role: 'human'` is what the LangGraph server expects in submit payloads. + // Include both so the optimistic local copy projects as a 'user' bubble + // (otherwise toMessage falls through to the 'ai' default and renders the + // user's question as an assistant message). + return { messages: [{ type: 'human', role: 'human', content }], ...(input.state ?? {}) }; + } + return input.state; +} + +function mergeCommandUpdate( + existing: Command['update'] | undefined, + update: Record | undefined, +): Command['update'] | undefined { + if (update === undefined) return existing; + if (existing == null) return update; + if (isRecord(existing)) return { ...existing, ...update }; + if (Array.isArray(existing)) return [...existing, ...Object.entries(update)]; + return update; +} + function randomId(): string { return Math.random().toString(36).slice(2); }