Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 145 additions & 1 deletion libs/langgraph/src/lib/agent.fn.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,25 @@ describe('agent', () => {
expect(ref.isLoading()).toBe(true);
});

it('submit() resolves after the active stream completes', async () => {
const transport = new MockAgentTransport();
const ref = withInjectionContext(() =>
agent({ apiUrl: '', assistantId: 'a', transport })
);

let settled = false;
const submitted = ref.submit({ message: 'hello' }).then(() => {
settled = true;
});

await Promise.resolve();
expect(settled).toBe(false);

transport.close();
await submitted;
expect(settled).toBe(true);
});

it('queue() exposes server-side enqueue submissions', async () => {
const transport = new MockAgentTransport();
const ref = withInjectionContext(() =>
Expand Down Expand Up @@ -135,8 +154,9 @@ describe('agent', () => {
const ref = withInjectionContext(() =>
agent({ apiUrl: '', assistantId: 'a', transport })
);
await ref.submit({ message: 'hello' });
const submitted = ref.submit({ message: 'hello' });
transport.close();
await submitted;
await new Promise(r => setTimeout(r, 10));
ref.reload();
expect(ref.isLoading()).toBe(true);
Expand Down Expand Up @@ -239,6 +259,130 @@ describe('agent', () => {
]);
});

it('normalizes resume submit input state into a LangGraph command update', async () => {
const seen: Array<{ payload: unknown; options: unknown }> = [];
const transport = new MockAgentTransport();
transport.stream = async function* (
_assistantId: string,
_threadId: string | null,
payload: unknown,
_signal: AbortSignal,
options?: unknown,
) {
seen.push({ payload, options });
yield* [];
};

const ref = withInjectionContext(() =>
agent({ apiUrl: '', assistantId: 'a', transport, threadId: 'thread-1', throttle: false })
);

await ref.submit({
resume: { approved: true },
state: { reviewNotes: 'ship it' },
});

expect(seen).toEqual([
{
payload: null,
options: expect.objectContaining({
command: {
resume: { approved: true },
update: { reviewNotes: 'ship it' },
},
}),
},
]);
});

it('normalizes resume submit input messages into a LangGraph command update', async () => {
const seen: Array<{ payload: unknown; options: unknown }> = [];
const transport = new MockAgentTransport();
transport.stream = async function* (
_assistantId: string,
_threadId: string | null,
payload: unknown,
_signal: AbortSignal,
options?: unknown,
) {
seen.push({ payload, options });
yield* [];
};

const ref = withInjectionContext(() =>
agent({ apiUrl: '', assistantId: 'a', transport, threadId: 'thread-1', throttle: false })
);

await ref.submit({
message: 'Approved, continue',
resume: { approved: true },
state: { reviewer: 'Ada' },
});

expect(seen).toEqual([
{
payload: null,
options: expect.objectContaining({
command: {
resume: { approved: true },
update: {
messages: [{ type: 'human', role: 'human', content: 'Approved, continue' }],
reviewer: 'Ada',
},
},
}),
},
]);
});

it('merges resume submit updates with existing LangGraph command updates', async () => {
const seen: Array<{ payload: unknown; options: unknown }> = [];
const transport = new MockAgentTransport();
transport.stream = async function* (
_assistantId: string,
_threadId: string | null,
payload: unknown,
_signal: AbortSignal,
options?: unknown,
) {
seen.push({ payload, options });
yield* [];
};

const ref = withInjectionContext(() =>
agent({ apiUrl: '', assistantId: 'a', transport, threadId: 'thread-1', throttle: false })
);

await ref.submit(
{
resume: { approved: true },
state: { reviewer: 'Ada' },
},
{
command: {
update: { workflow: 'release' },
goto: 'publish',
},
},
);

expect(seen).toEqual([
{
payload: null,
options: expect.objectContaining({
command: {
resume: { approved: true },
update: {
workflow: 'release',
reviewer: 'Ada',
},
goto: 'publish',
},
}),
},
]);
});

it('experimentalBranchTree() exposes a branch tree derived from LangGraph history', async () => {
const root = threadState('root');
const left = threadState('left', 'root');
Expand Down
48 changes: 33 additions & 15 deletions libs/langgraph/src/lib/agent.fn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
import { takeUntil } from 'rxjs/operators';
import type { Observable } from 'rxjs';
import type { BaseMessage, AIMessage as CoreAIMessage } from '@langchain/core/messages';
import type { Interrupt, ToolCallWithResult } from '@langchain/langgraph-sdk';
import type { Command, Interrupt, ToolCallWithResult } from '@langchain/langgraph-sdk';
import type { BagTemplate, InferBag } from '@langchain/langgraph-sdk';
import type {
AgentEvent,
Expand Down Expand Up @@ -242,8 +242,7 @@ export function agent<
history: historyNeutral,
submit: (input: AgentSubmitInput | null | undefined, opts?: AgentSubmitOptions & LangGraphSubmitOptions) => {
const request = buildSubmitRequest(input, opts);
manager.submit(request.payload, request.options);
return Promise.resolve();
return manager.submit(request.payload, request.options);
},
stop: () => manager.stop(),

Expand Down Expand Up @@ -416,18 +415,7 @@ function buildSubmitRequest(
function buildSubmitPayload(input: AgentSubmitInput | null | undefined): unknown {
if (input == null) return null;
if (input.resume !== undefined) return null;
if (input.message !== undefined) {
const content = typeof input.message === 'string'
? input.message
: input.message.map((b: ContentBlock) => (b.type === 'text' ? b.text : JSON.stringify(b))).join('');
// `type: 'human'` is what `toMessage()` reads via `_getType` || raw['type'];
// `role: 'human'` is what the LangGraph server expects in submit payloads.
// Include both so the optimistic local copy projects as a 'user' bubble
// (otherwise toMessage falls through to the 'ai' default and renders the
// user's question as an assistant message).
return { messages: [{ type: 'human', role: 'human', content }], ...(input.state ?? {}) };
}
return input.state ?? {};
return buildSubmitUpdate(input) ?? {};
}

function normalizeSubmitOptions(
Expand All @@ -442,15 +430,45 @@ function normalizeSubmitOptions(
const next = { ...(opts ?? {}) };
delete next.resume;
const command = next.command;
const update = buildSubmitUpdate(input);
const commandUpdate = mergeCommandUpdate(command?.update, update);
return {
...next,
command: {
...command,
resume,
...(commandUpdate === undefined ? {} : { update: commandUpdate }),
},
};
}

function buildSubmitUpdate(input: AgentSubmitInput | null | undefined): Record<string, unknown> | undefined {
if (input == null) return undefined;
if (input.message !== undefined) {
const content = typeof input.message === 'string'
? input.message
: input.message.map((b: ContentBlock) => (b.type === 'text' ? b.text : JSON.stringify(b))).join('');
// `type: 'human'` is what `toMessage()` reads via `_getType` || raw['type'];
// `role: 'human'` is what the LangGraph server expects in submit payloads.
// Include both so the optimistic local copy projects as a 'user' bubble
// (otherwise toMessage falls through to the 'ai' default and renders the
// user's question as an assistant message).
return { messages: [{ type: 'human', role: 'human', content }], ...(input.state ?? {}) };
}
return input.state;
}

function mergeCommandUpdate(
existing: Command['update'] | undefined,
update: Record<string, unknown> | undefined,
): Command['update'] | undefined {
if (update === undefined) return existing;
if (existing == null) return update;
if (isRecord(existing)) return { ...existing, ...update };
if (Array.isArray(existing)) return [...existing, ...Object.entries(update)];
return update;
}

function randomId(): string {
return Math.random().toString(36).slice(2);
}
Expand Down
Loading