diff --git a/README.md b/README.md index 7481d5acf..9d96556ec 100644 --- a/README.md +++ b/README.md @@ -93,7 +93,7 @@ That's it. `chat.messages()` is an Angular Signal. Bind it directly in your temp | Interrupt / human-in-the-loop | `interrupt()` / `interrupts()` | `interrupt` / `interrupts` | | Tool call progress | `toolProgress()` | `toolProgress` | | Tool calls with results | `toolCalls()` | `toolCalls` | -| Branch / history | `branch()` / `history()` | `branch` / `history` | +| Branch / history | `branch()` / `history()` / `experimentalBranchTree()` | `branch` / `history` / `experimental_branchTree` | | Pending run queue | `queue()` | `queue` | | Subagent streaming and lookup helpers | `subagents()` / `activeSubagents()` / `getSubagent()` | `subagents` / `activeSubagents` / helper methods | | Reactive thread switching | `Signal` input | prop | diff --git a/apps/website/content/docs/agent/api/api-docs.json b/apps/website/content/docs/agent/api/api-docs.json index c1bdbdb44..01a7079e4 100644 --- a/apps/website/content/docs/agent/api/api-docs.json +++ b/apps/website/content/docs/agent/api/api-docs.json @@ -78,6 +78,25 @@ } ] }, + { + "name": "getHistory", + "signature": "getHistory(threadId: string, signal: AbortSignal)", + "description": "Load persisted checkpoint history for a thread.", + "params": [ + { + "name": "threadId", + "type": "string", + "description": "", + "optional": false + }, + { + "name": "signal", + "type": "AbortSignal", + "description": "", + "optional": false + } + ] + }, { "name": "joinStream", "signature": "joinStream(threadId: string, runId: string, lastEventId: string | undefined, signal: AbortSignal)", @@ -170,6 +189,18 @@ "description": "", "optional": false }, + { + "name": "history", + "type": "ThreadState[]", + "description": "", + "optional": false + }, + { + "name": "historyCalls", + "type": "string[]", + "description": "", + "optional": false + }, { "name": "joinedRuns", "type": "object[]", @@ -266,6 +297,25 @@ } ] }, + { + "name": "getHistory", + "signature": "getHistory(threadId: string, signal: AbortSignal)", + "description": "Optional: load persisted checkpoint history for a thread.", + "params": [ + { + "name": "threadId", + "type": "string", + "description": "", + "optional": false + }, + { + "name": "signal", + "type": "AbortSignal", + "description": "", + "optional": false + } + ] + }, { "name": "isStreaming", "signature": "isStreaming()", @@ -342,6 +392,72 @@ } ] }, + { + "name": "AgentBranchTree", + "kind": "interface", + "description": "Tree representation of LangGraph checkpoint history for time-travel UIs.", + "properties": [ + { + "name": "items", + "type": "AgentBranchTreeNode | AgentBranchTreeFork[]", + "description": "", + "optional": false + }, + { + "name": "type", + "type": "\"sequence\"", + "description": "", + "optional": false + } + ], + "examples": [] + }, + { + "name": "AgentBranchTreeFork", + "kind": "interface", + "description": "A branch fork where each item is an alternate checkpoint sequence.", + "properties": [ + { + "name": "items", + "type": "AgentBranchTree[]", + "description": "", + "optional": false + }, + { + "name": "type", + "type": "\"fork\"", + "description": "", + "optional": false + } + ], + "examples": [] + }, + { + "name": "AgentBranchTreeNode", + "kind": "interface", + "description": "A checkpoint entry in the experimental branch tree.", + "properties": [ + { + "name": "path", + "type": "string[]", + "description": "", + "optional": false + }, + { + "name": "type", + "type": "\"node\"", + "description": "", + "optional": false + }, + { + "name": "value", + "type": "ThreadState", + "description": "", + "optional": false + } + ], + "examples": [] + }, { "name": "AgentConfig", "kind": "interface", @@ -541,6 +657,12 @@ "description": "", "optional": true }, + { + "name": "getHistory", + "type": "unknown", + "description": "", + "optional": true + }, { "name": "joinStream", "type": "unknown", @@ -649,6 +771,12 @@ "description": "", "optional": false }, + { + "name": "experimentalBranchTree", + "type": "Signal>", + "description": "Experimental branch tree derived from LangGraph checkpoint history.", + "optional": false + }, { "name": "getMessagesMetadata", "type": "object", @@ -875,6 +1003,12 @@ "description": "", "optional": false }, + { + "name": "experimentalBranchTree", + "type": "WritableSignal>", + "description": "Experimental branch tree derived from LangGraph checkpoint history.", + "optional": false + }, { "name": "getMessagesMetadata", "type": "object", diff --git a/apps/website/content/docs/agent/concepts/langgraph-basics.mdx b/apps/website/content/docs/agent/concepts/langgraph-basics.mdx index 2e7bd5bc2..6b573de35 100644 --- a/apps/website/content/docs/agent/concepts/langgraph-basics.mdx +++ b/apps/website/content/docs/agent/concepts/langgraph-basics.mdx @@ -319,8 +319,10 @@ agent.isLoading() // Signal — is the agent running? agent.interrupt() // Signal — agent is paused // Debugging -agent.history() // Signal — checkpoint timeline +agent.history() // Signal — runtime-neutral timeline +agent.langGraphHistory() // Signal — raw LangGraph checkpoints agent.branch() // Signal — time-travel branch +agent.experimentalBranchTree() // Signal> — branch tree agent.toolCalls() // Signal — tool results agent.toolProgress() // Signal — active tool execution diff --git a/apps/website/content/docs/agent/concepts/state-management.mdx b/apps/website/content/docs/agent/concepts/state-management.mdx index f6d4caa8c..18f6fb8c5 100644 --- a/apps/website/content/docs/agent/concepts/state-management.mdx +++ b/apps/website/content/docs/agent/concepts/state-management.mdx @@ -355,8 +355,10 @@ const agent = agent({ }); // Read checkpoint history for time-travel UI -const history = agent.history(); // Signal -const branch = agent.branch(); // Signal — active branch ID +const history = agent.history(); // Signal +const rawHistory = agent.langGraphHistory(); // Signal +const branch = agent.branch(); // Signal — active branch ID +const branchTree = agent.experimentalBranchTree(); ``` For full checkpoint and time-travel patterns, see the [Persistence guide](/docs/guides/persistence) and [Time Travel guide](/docs/guides/time-travel). diff --git a/apps/website/content/docs/agent/getting-started/introduction.mdx b/apps/website/content/docs/agent/getting-started/introduction.mdx index d8e503a9a..738c9f797 100644 --- a/apps/website/content/docs/agent/getting-started/introduction.mdx +++ b/apps/website/content/docs/agent/getting-started/introduction.mdx @@ -20,7 +20,8 @@ chat.messages() // Signal chat.status() // Signal<'idle' | 'loading' | 'resolved' | 'error'> chat.error() // Signal chat.interrupt() // Signal -chat.history() // Signal +chat.history() // Signal +chat.langGraphHistory() // Signal ``` No RxJS. No manual subscriptions. No async pipes. Just Signals that work with Angular's `OnPush` change detection out of the box. diff --git a/apps/website/content/docs/agent/guides/time-travel.mdx b/apps/website/content/docs/agent/guides/time-travel.mdx index bb97a05a8..8c84a1ff6 100644 --- a/apps/website/content/docs/agent/guides/time-travel.mdx +++ b/apps/website/content/docs/agent/guides/time-travel.mdx @@ -74,6 +74,7 @@ export class HistoryViewerComponent { readonly agent = this.agentService.agent; readonly checkpoints = computed(() => this.agent.history()); + readonly rawCheckpoints = computed(() => this.agent.langGraphHistory()); readonly checkpointCount = computed(() => this.agent.history().length); readonly activeIndex = computed(() => @@ -81,10 +82,11 @@ export class HistoryViewerComponent { ); fork(index: number) { - const checkpoint = this.checkpoints()[index]; + const checkpoint = this.rawCheckpoints()[index]?.checkpoint; + if (!checkpoint) return; this.agent.submit( { messages: [{ role: 'user', content: 'Try a different approach' }] }, - { checkpoint: checkpoint.checkpoint } + { checkpoint } ); } @@ -99,7 +101,7 @@ export class HistoryViewerComponent { ## Browsing execution history -The `history()` signal contains an array of `ThreadState` checkpoints ordered from oldest to newest. Each checkpoint captures the complete agent state at that point in execution, including messages, intermediate results, and any custom state fields. +The `history()` signal contains runtime-neutral `AgentCheckpoint` entries for the thread. For LangGraph-specific checkpoint metadata, `langGraphHistory()` exposes the raw `ThreadState[]`. The framework loads this history with `threads.getHistory()` when a thread is selected and refreshes it after a run completes. ```typescript const agent = agent({ @@ -107,10 +109,13 @@ const agent = agent({ threadId: signal(threadId), }); -// Full execution timeline +// Runtime-neutral execution timeline const checkpoints = computed(() => agent.history()); const checkpointCount = computed(() => agent.history().length); +// Raw LangGraph checkpoints +const rawCheckpoints = computed(() => agent.langGraphHistory()); + // Access a specific checkpoint const latestCheckpoint = computed(() => { const history = agent.history(); @@ -118,7 +123,7 @@ const latestCheckpoint = computed(() => { }); ``` -Each `ThreadState` entry exposes `checkpoint`, `metadata`, `created_at`, and the full `values` snapshot, giving you complete visibility into every step of execution. +Each runtime-neutral checkpoint exposes `id`, `label`, and `values`. Each raw `ThreadState` entry exposes `checkpoint`, `parent_checkpoint`, `metadata`, `created_at`, and the full `values` snapshot, giving you complete visibility into every step of execution. ## Forking from a checkpoint @@ -126,36 +131,35 @@ Submit with a specific checkpoint to branch execution from an earlier state. Thi ```typescript forkFromCheckpoint(index: number) { - const checkpoint = this.agent.history()[index]; + const checkpoint = this.agent.langGraphHistory()[index]?.checkpoint; + if (!checkpoint) return; this.agent.submit( { messages: [{ role: 'user', content: 'Try a different approach' }] }, - { checkpoint: checkpoint.checkpoint } + { checkpoint } ); } // Fork with a completely different input retryWithAlternative(index: number, newInput: string) { - const checkpoint = this.agent.history()[index]; + const checkpoint = this.agent.langGraphHistory()[index]?.checkpoint; + if (!checkpoint) return; this.agent.submit( { messages: [{ role: 'user', content: newInput }] }, - { checkpoint: checkpoint.checkpoint } + { checkpoint } ); } ``` ## Branch navigation -Use `branch()` and `setBranch()` to navigate between execution branches. Branches are automatically created when you fork from a checkpoint. +Use `branch()`, `setBranch()`, and `experimentalBranchTree()` to navigate between execution branches. Branches are automatically created when you fork from a checkpoint, and the branch tree is derived from raw `ThreadState.parent_checkpoint` relationships. ```typescript // Current branch identifier const activeBranch = computed(() => agent.branch()); -// All available branches (if exposed by your graph) -const allBranches = computed(() => agent.history() - .map(s => s.metadata?.branch) - .filter(Boolean) -); +// Full branch tree for custom time-travel UIs +const branchTree = computed(() => agent.experimentalBranchTree()); // Switch to a different branch selectBranch(branchId: string) { @@ -184,15 +188,17 @@ export class HistoryViewerComponent { readonly agent = this.agentService.agent; readonly checkpoints = computed(() => this.agent.history()); + readonly rawCheckpoints = computed(() => this.agent.langGraphHistory()); readonly activeIndex = computed(() => this.checkpoints().length - 1 ); fork(index: number) { - const checkpoint = this.checkpoints()[index]; + const checkpoint = this.rawCheckpoints()[index]?.checkpoint; + if (!checkpoint) return; this.agent.submit( { messages: [{ role: 'user', content: 'Try a different approach' }] }, - { checkpoint: checkpoint.checkpoint } + { checkpoint } ); } @@ -205,10 +211,10 @@ export class HistoryViewerComponent { ```html
    - @for (cp of checkpoints(); track cp.checkpoint.id; let i = $index) { + @for (cp of checkpoints(); track cp.id; let i = $index) {
  • Step {{ i + 1 }} - {{ formatTime(cp.created_at) }} + {{ cp.label ?? cp.id }}
  • } @@ -258,6 +264,7 @@ export class ReplayComponent { readonly agent = inject(AgentService).agent; readonly history = computed(() => this.agent.history()); + readonly rawHistory = computed(() => this.agent.langGraphHistory()); readonly canUndo = computed(() => this.history().length > 1); undo() { @@ -265,17 +272,19 @@ export class ReplayComponent { if (history.length < 2) return; // Go back one step - const previousCheckpoint = history[history.length - 2]; + const previousCheckpoint = this.rawHistory()[history.length - 2]?.checkpoint; + if (!previousCheckpoint) return; this.agent.submit(undefined, { - checkpoint: previousCheckpoint.checkpoint, + checkpoint: previousCheckpoint, }); } replayWith(index: number, newMessage: string) { - const checkpoint = this.history()[index]; + const checkpoint = this.rawHistory()[index]?.checkpoint; + if (!checkpoint) return; this.agent.submit( { messages: [{ role: 'user', content: newMessage }] }, - { checkpoint: checkpoint.checkpoint } + { checkpoint } ); } } diff --git a/docs/limitations.md b/docs/limitations.md index c3948253c..162c7794b 100644 --- a/docs/limitations.md +++ b/docs/limitations.md @@ -51,17 +51,3 @@ equivalent. `agent()` is called once. No behavioral impact. from ported test patterns. --- - -## 4. `experimental_branchTree` - -**React behavior:** `useStream()` exposes `experimental_branchTree` — a -`Sequence` for visualizing the full branch tree of a thread. - -**Angular behavior:** This feature is marked experimental in the React -SDK and depends on internal tree-diffing utilities not exported from -`@langchain/langgraph-sdk/ui`. It is not implemented in v1. - -**Workaround:** Use `branch` (Signal) and `history` -(Signal) to reconstruct branch relationships manually. - ---- diff --git a/libs/langgraph/src/lib/agent.fn.spec.ts b/libs/langgraph/src/lib/agent.fn.spec.ts index b57af23f7..bb36bc658 100644 --- a/libs/langgraph/src/lib/agent.fn.spec.ts +++ b/libs/langgraph/src/lib/agent.fn.spec.ts @@ -5,6 +5,7 @@ import type { AIMessage as CoreAIMessage } from '@langchain/core/messages'; import { agent } from './agent.fn'; import { MockAgentTransport } from './transport/mock-stream.transport'; import type { StreamEvent } from './agent.types'; +import type { ThreadState } from '@langchain/langgraph-sdk'; function withInjectionContext(fn: () => T): T { let result!: T; @@ -12,6 +13,33 @@ function withInjectionContext(fn: () => T): T { return result; } +function threadState( + checkpointId: string, + parentCheckpointId: string | null = null, +): ThreadState> { + return { + values: { messages: [checkpointId] }, + next: [], + checkpoint: { + thread_id: 'thread-1', + checkpoint_ns: '', + checkpoint_id: checkpointId, + checkpoint_map: null, + }, + metadata: null, + created_at: '2026-05-02T00:00:00.000Z', + parent_checkpoint: parentCheckpointId + ? { + thread_id: 'thread-1', + checkpoint_ns: '', + checkpoint_id: parentCheckpointId, + checkpoint_map: null, + } + : null, + tasks: [], + }; +} + describe('agent', () => { beforeEach(() => TestBed.configureTestingModule({})); @@ -181,6 +209,32 @@ describe('agent', () => { expect(Array.isArray(rawHist)).toBe(true); }); + it('experimentalBranchTree() exposes a branch tree derived from LangGraph history', async () => { + const root = threadState('root'); + const left = threadState('left', 'root'); + const right = threadState('right', 'root'); + const transport = new MockAgentTransport(); + transport.history = [root, left, right]; + + const ref = withInjectionContext(() => + agent({ apiUrl: '', assistantId: 'a', transport, threadId: 'thread-1', throttle: false }) + ); + await new Promise(r => setTimeout(r, 20)); + + const tree = ref.experimentalBranchTree(); + expect(tree.type).toBe('sequence'); + expect(tree.items[0]).toEqual({ type: 'node', value: root, path: [] }); + const fork = tree.items[1]; + expect(fork?.type).toBe('fork'); + if (fork?.type !== 'fork') throw new Error('Expected fork node'); + expect(fork.items.map(sequence => sequence.items[0])).toEqual( + expect.arrayContaining([ + { type: 'node', value: left, path: ['left'] }, + { type: 'node', value: right, path: ['right'] }, + ]), + ); + }); + it('messages() translates AIMessage role to "assistant"', async () => { const transport = new MockAgentTransport(); const ref = withInjectionContext(() => diff --git a/libs/langgraph/src/lib/agent.fn.ts b/libs/langgraph/src/lib/agent.fn.ts index e8c4379c3..5a3e226f4 100644 --- a/libs/langgraph/src/lib/agent.fn.ts +++ b/libs/langgraph/src/lib/agent.fn.ts @@ -42,6 +42,7 @@ import { import type { ThreadState, ToolProgress } from '@langchain/langgraph-sdk'; import type { MessageMetadata } from '@langchain/langgraph-sdk/ui'; import { createStreamManagerBridge } from './internals/stream-manager.bridge'; +import { buildBranchTree } from './internals/branch-tree'; /** * Creates a streaming resource connected to a LangGraph agent. @@ -208,6 +209,9 @@ export function agent< const historyNeutral = computed(() => historySig().map(toCheckpoint), ); + const experimentalBranchTree = computed(() => + buildBranchTree(historySig() as ThreadState[]), + ); const events$ = buildEvents$(customSig); @@ -234,6 +238,7 @@ export function agent< langGraphInterrupts: interruptsSig, langGraphToolCalls: rawToolCalls, langGraphHistory: historySig, + experimentalBranchTree, // ── Other AgentRef fields preserved ────────────────────────────────── value: value as Signal, diff --git a/libs/langgraph/src/lib/agent.types.ts b/libs/langgraph/src/lib/agent.types.ts index fa5c288d0..7f555ccb8 100644 --- a/libs/langgraph/src/lib/agent.types.ts +++ b/libs/langgraph/src/lib/agent.types.ts @@ -109,6 +109,25 @@ export interface AgentQueue { clear: () => Promise; } +/** A checkpoint entry in the experimental branch tree. */ +export interface AgentBranchTreeNode { + type: 'node'; + value: ThreadState; + path: string[]; +} + +/** A branch fork where each item is an alternate checkpoint sequence. */ +export interface AgentBranchTreeFork { + type: 'fork'; + items: AgentBranchTree[]; +} + +/** Tree representation of LangGraph checkpoint history for time-travel UIs. */ +export interface AgentBranchTree { + type: 'sequence'; + items: Array | AgentBranchTreeFork>; +} + /** A custom event emitted by the LangGraph backend via adispatch_custom_event(). */ export interface CustomStreamEvent { /** Event name set by the backend (e.g., 'state_update'). */ @@ -149,6 +168,12 @@ export interface AgentTransport { runId: string, signal: AbortSignal, ): Promise; + + /** Optional: load persisted checkpoint history for a thread. */ + getHistory?( + threadId: string, + signal: AbortSignal, + ): Promise; } // ── Options ────────────────────────────────────────────────────────────────── @@ -221,6 +246,9 @@ export interface LangGraphAgent[]>; + /** Experimental branch tree derived from LangGraph checkpoint history. */ + experimentalBranchTree: Signal>; + // ── AgentRef fields preserved on the unified surface ───────────────────── /** Current agent state values (raw, typed per the type parameter T). */ diff --git a/libs/langgraph/src/lib/internals/branch-tree.spec.ts b/libs/langgraph/src/lib/internals/branch-tree.spec.ts new file mode 100644 index 000000000..c73914cf6 --- /dev/null +++ b/libs/langgraph/src/lib/internals/branch-tree.spec.ts @@ -0,0 +1,89 @@ +import { describe, expect, it } from 'vitest'; +import type { ThreadState } from '@langchain/langgraph-sdk'; +import { buildBranchTree } from './branch-tree'; + +function state( + checkpointId: string, + parentCheckpointId: string | null = null, +): ThreadState<{ messages: string[] }> { + return { + values: { messages: [checkpointId] }, + next: [], + checkpoint: { + thread_id: 'thread-1', + checkpoint_ns: '', + checkpoint_id: checkpointId, + checkpoint_map: null, + }, + metadata: null, + created_at: `2026-05-02T00:00:00.000Z`, + parent_checkpoint: parentCheckpointId + ? { + thread_id: 'thread-1', + checkpoint_ns: '', + checkpoint_id: parentCheckpointId, + checkpoint_map: null, + } + : null, + tasks: [], + }; +} + +describe('buildBranchTree', () => { + it('returns a linear sequence when history does not branch', () => { + const root = state('root'); + const child = state('child', 'root'); + + expect(buildBranchTree([root, child])).toEqual({ + type: 'sequence', + items: [ + { type: 'node', value: root, path: [] }, + { type: 'node', value: child, path: [] }, + ], + }); + }); + + it('creates fork sequences with branch paths for sibling checkpoints', () => { + const root = state('root'); + const left = state('left', 'root'); + const right = state('right', 'root'); + const leftChild = state('left-child', 'left'); + + const tree = buildBranchTree([root, left, right, leftChild]); + const fork = tree.items[1]; + + expect(tree.items[0]).toEqual({ type: 'node', value: root, path: [] }); + expect(fork?.type).toBe('fork'); + if (fork?.type !== 'fork') throw new Error('Expected fork node'); + + const branchHeads = fork.items.map(sequence => sequence.items[0]); + expect(branchHeads).toEqual( + expect.arrayContaining([ + { type: 'node', value: left, path: ['left'] }, + { type: 'node', value: right, path: ['right'] }, + ]), + ); + + const leftSequence = fork.items.find(sequence => + sequence.items.some(item => item.type === 'node' && item.value === left), + ); + expect(leftSequence?.items).toContainEqual({ + type: 'node', + value: leftChild, + path: ['left'], + }); + }); + + it('builds from an orphaned parent when history is partial', () => { + const child = state('child', 'missing-parent'); + const grandchild = state('grandchild', 'child'); + + expect(buildBranchTree([child, grandchild])).toEqual({ + type: 'sequence', + items: [ + { type: 'node', value: child, path: [] }, + { type: 'node', value: grandchild, path: [] }, + ], + }); + }); +}); diff --git a/libs/langgraph/src/lib/internals/branch-tree.ts b/libs/langgraph/src/lib/internals/branch-tree.ts new file mode 100644 index 000000000..d31d327c3 --- /dev/null +++ b/libs/langgraph/src/lib/internals/branch-tree.ts @@ -0,0 +1,120 @@ +// SPDX-License-Identifier: MIT +import type { ThreadState } from '@langchain/langgraph-sdk'; +import type { AgentBranchTree, AgentBranchTreeFork } from '../agent.types'; + +const ROOT_ID = '$'; + +/** + * Builds a branch-aware checkpoint tree from LangGraph thread history. + * + * This mirrors the small SDK UI branching data shape without importing the + * SDK UI runtime helper, keeping Angular bundles independent of React UI code. + */ +export function buildBranchTree(history: ThreadState[] = []): AgentBranchTree { + if (history.length <= 1) { + return { + type: 'sequence', + items: history.map(value => ({ type: 'node', value, path: [] })), + }; + } + + const nodeIds = new Set(); + const childrenMap: Record[]> = {}; + + for (const state of history) { + const parentId = state.parent_checkpoint?.checkpoint_id ?? ROOT_ID; + childrenMap[parentId] ??= []; + childrenMap[parentId].push(state); + + const checkpointId = state.checkpoint?.checkpoint_id; + if (checkpointId != null) { + nodeIds.add(checkpointId); + } + } + + const orphanRoot = findLatestOrphanRoot(childrenMap, nodeIds); + if (orphanRoot != null) { + childrenMap[ROOT_ID] = childrenMap[orphanRoot]; + } + + const rootSequence: AgentBranchTree = { type: 'sequence', items: [] }; + const queue: Array<{ id: string; sequence: AgentBranchTree; path: string[] }> = [ + { id: ROOT_ID, sequence: rootSequence, path: [] }, + ]; + const visited = new Set(); + + while (queue.length > 0) { + const task = queue.shift(); + if (!task || visited.has(task.id)) continue; + visited.add(task.id); + + const children = childrenMap[task.id]; + if (!children?.length) continue; + + let fork: AgentBranchTreeFork | undefined; + if (children.length > 1) { + fork = { type: 'fork', items: [] }; + task.sequence.items.push(fork); + } + + for (const value of children) { + const id = value.checkpoint?.checkpoint_id; + if (id == null) continue; + + let sequence = task.sequence; + let path = task.path; + if (fork != null) { + sequence = { type: 'sequence', items: [] }; + fork.items.unshift(sequence); + path = [...task.path, id]; + } + + sequence.items.push({ type: 'node', value, path }); + queue.push({ id, sequence, path }); + } + } + + return rootSequence; +} + +function findLatestOrphanRoot( + childrenMap: Record[]>, + nodeIds: Set, +): string | undefined { + if (childrenMap[ROOT_ID] != null) return undefined; + + return Object.keys(childrenMap) + .filter(parentId => !nodeIds.has(parentId)) + .map(parentId => ({ + parentId, + lastId: findLatestDescendantId(parentId, childrenMap), + })) + .sort((left, right) => left.lastId.localeCompare(right.lastId)) + .at(-1)?.parentId; +} + +function findLatestDescendantId( + parentId: string, + childrenMap: Record[]>, +): string { + const queue = [parentId]; + const seen = new Set(); + let latestId = parentId; + + while (queue.length > 0) { + const current = queue.shift(); + if (!current || seen.has(current)) continue; + seen.add(current); + + for (const child of childrenMap[current] ?? []) { + const childId = child.checkpoint?.checkpoint_id; + if (childId == null) continue; + if (childId.localeCompare(latestId) > 0) { + latestId = childId; + } + queue.push(childId); + } + } + + return latestId; +} diff --git a/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts b/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts index 95a881a04..3e3c82b87 100644 --- a/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts +++ b/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts @@ -3,6 +3,7 @@ import { BehaviorSubject, Subject } from 'rxjs'; import { createStreamManagerBridge } from './stream-manager.bridge'; import { MockAgentTransport } from '../transport/mock-stream.transport'; import { ResourceStatus, AgentTransport, StreamSubjects, CustomStreamEvent, StreamEvent } from '../agent.types'; +import type { ThreadState } from '@langchain/langgraph-sdk'; import { of } from 'rxjs'; function makeSubjects(): StreamSubjects> { @@ -30,6 +31,33 @@ function makeSubjects(): StreamSubjects> { }; } +function makeThreadState( + checkpointId: string, + parentCheckpointId: string | null = null, +): ThreadState> { + return { + values: { checkpointId }, + next: [], + checkpoint: { + thread_id: 'thread-1', + checkpoint_ns: '', + checkpoint_id: checkpointId, + checkpoint_map: null, + }, + metadata: null, + created_at: `2026-05-02T00:00:0${checkpointId.length}.000Z`, + parent_checkpoint: parentCheckpointId + ? { + thread_id: 'thread-1', + checkpoint_ns: '', + checkpoint_id: parentCheckpointId, + checkpoint_map: null, + } + : null, + tasks: [], + }; +} + describe('createStreamManagerBridge', () => { it('creates a bridge with submit and stop methods', () => { const transport = new MockAgentTransport(); @@ -61,6 +89,73 @@ describe('createStreamManagerBridge', () => { destroy$.next(); }); + it('loads history when initialized with a thread id', async () => { + const history = [makeThreadState('checkpoint-1')]; + const historyCalls: string[] = []; + const transport: AgentTransport & { + getHistory: (threadId: string, signal: AbortSignal) => Promise>[]>; + } = { + async *stream() { + yield* []; + }, + async getHistory(threadId, signal) { + expect(signal.aborted).toBe(false); + historyCalls.push(threadId); + return history; + }, + }; + const subjects = makeSubjects(); + const destroy$ = new Subject(); + + createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'test', transport }, + subjects, + threadId$: of('thread-1'), + destroy$: destroy$.asObservable(), + }); + await new Promise(r => setTimeout(r, 10)); + + expect(historyCalls).toEqual(['thread-1']); + expect(subjects.history$.value).toBe(history); + expect(subjects.isThreadLoading$.value).toBe(false); + destroy$.next(); + }); + + it('refreshes history after a stream completes', async () => { + const firstHistory = [makeThreadState('checkpoint-1')]; + const secondHistory = [ + makeThreadState('checkpoint-1'), + makeThreadState('checkpoint-2', 'checkpoint-1'), + ]; + let historyCalls = 0; + const transport: AgentTransport & { + getHistory: (threadId: string, signal: AbortSignal) => Promise>[]>; + } = { + async *stream() { + yield { type: 'values', values: { answer: 42 } }; + }, + async getHistory() { + historyCalls += 1; + return historyCalls === 1 ? firstHistory : secondHistory; + }, + }; + const subjects = makeSubjects(); + const destroy$ = new Subject(); + const bridge = createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'test', transport }, + subjects, + threadId$: of('thread-1'), + destroy$: destroy$.asObservable(), + }); + await new Promise(r => setTimeout(r, 10)); + + await bridge.submit({ messages: [] }); + + expect(historyCalls).toBe(2); + expect(subjects.history$.value).toBe(secondHistory); + destroy$.next(); + }); + it('exposes enqueue submissions through queue$ without starting a second stream immediately', async () => { const transport = new MockAgentTransport(); const subjects = makeSubjects(); diff --git a/libs/langgraph/src/lib/internals/stream-manager.bridge.ts b/libs/langgraph/src/lib/internals/stream-manager.bridge.ts index cf6edbeda..fda4ab872 100644 --- a/libs/langgraph/src/lib/internals/stream-manager.bridge.ts +++ b/libs/langgraph/src/lib/internals/stream-manager.bridge.ts @@ -22,7 +22,7 @@ import { isSubagentNamespace, } from './subagent-tracker'; import type { BaseMessage } from '@langchain/core/messages'; -import type { Interrupt, Message as LangGraphMessage, ToolCallWithResult, ToolProgress } from '@langchain/langgraph-sdk'; +import type { Interrupt, Message as LangGraphMessage, ThreadState, ToolCallWithResult, ToolProgress } from '@langchain/langgraph-sdk'; export interface StreamManagerBridgeOptions { options: AgentOptions; @@ -56,6 +56,7 @@ export function createStreamManagerBridge(); const queuedRuns: AgentQueueEntry[] = []; @@ -66,6 +67,7 @@ export function createStreamManagerBridge { + abortController?.abort(); + historyAbortController?.abort(); + }); + + async function refreshHistory(): Promise { + const getHistory = transport.getHistory?.bind(transport); + if (!currentThreadId || !getHistory) return; + + historyAbortController?.abort(); + const controller = new AbortController(); + historyAbortController = controller; + const threadId = currentThreadId; + subjects.isThreadLoading$.next(true); + + try { + const history = await getHistory(threadId, controller.signal); + if (!controller.signal.aborted && currentThreadId === threadId) { + subjects.history$.next(history as ThreadState[]); + } + } catch (err) { + if (!controller.signal.aborted && (err as Error)?.name !== 'AbortError') { + subjects.error$.next(err); + } + } finally { + if (historyAbortController === controller) { + historyAbortController = null; + subjects.isThreadLoading$.next(false); + } + } + } + function publishQueue(): void { subjects.queue$.next(createQueueSnapshot()); } @@ -198,6 +233,7 @@ export function createStreamManagerBridge { branch: WritableSignal; history: WritableSignal; langGraphHistory: WritableSignal[]>; + experimentalBranchTree: WritableSignal>; isThreadLoading: WritableSignal; subagents: WritableSignal>; activeSubagents: WritableSignal; @@ -88,6 +90,7 @@ export function mockLangGraphAgent( const branch$ = signal(''); const history$ = signal([]); const langGraphHistory$ = signal[]>([]); + const experimentalBranchTree$ = signal>({ type: 'sequence', items: [] }); const isThreadLoading$ = signal(initial.isThreadLoading ?? false); const subagents$ = signal>(new Map()); const activeSubagents$ = signal([]); @@ -120,6 +123,7 @@ export function mockLangGraphAgent( langGraphInterrupts: langGraphInterrupts$, langGraphToolCalls: langGraphToolCalls$, langGraphHistory: langGraphHistory$, + experimentalBranchTree: experimentalBranchTree$, // ── Other AgentRef fields preserved ────────────────────────────────── value: value$, diff --git a/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts b/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts index a7d8dae60..408f09ff0 100644 --- a/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts +++ b/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts @@ -3,6 +3,7 @@ import { FetchStreamTransport } from './fetch-stream.transport'; const mocks = vi.hoisted(() => ({ threadsCreate: vi.fn(), + threadsGetHistory: vi.fn(), runsStream: vi.fn(), runsCreate: vi.fn(), runsCancel: vi.fn(), @@ -11,6 +12,7 @@ const mocks = vi.hoisted(() => ({ return { threads: { create: mocks.threadsCreate, + getHistory: mocks.threadsGetHistory, }, runs: { stream: mocks.runsStream, @@ -37,6 +39,7 @@ async function collect(iter: AsyncIterable): Promise { describe('FetchStreamTransport', () => { beforeEach(() => { mocks.threadsCreate.mockReset(); + mocks.threadsGetHistory.mockReset(); mocks.runsStream.mockReset(); mocks.runsCreate.mockReset(); mocks.runsCancel.mockReset(); @@ -251,4 +254,31 @@ describe('FetchStreamTransport', () => { expect.objectContaining({ signal: expect.any(AbortSignal) }), ); }); + + it('loads thread history through the LangGraph SDK client', async () => { + const history = [ + { + values: { messages: [] }, + next: [], + checkpoint: { + thread_id: 'thread-1', + checkpoint_ns: '', + checkpoint_id: 'checkpoint-1', + checkpoint_map: null, + }, + metadata: null, + created_at: '2026-05-02T00:00:00.000Z', + parent_checkpoint: null, + tasks: [], + }, + ]; + mocks.threadsGetHistory.mockResolvedValue(history); + const signal = new AbortController().signal; + + const transport = new FetchStreamTransport('http://example.test'); + const result = await transport.getHistory('thread-1', signal); + + expect(mocks.threadsGetHistory).toHaveBeenCalledWith('thread-1', { signal }); + expect(result).toBe(history); + }); }); diff --git a/libs/langgraph/src/lib/transport/fetch-stream.transport.ts b/libs/langgraph/src/lib/transport/fetch-stream.transport.ts index 7edc2d99e..202bae515 100644 --- a/libs/langgraph/src/lib/transport/fetch-stream.transport.ts +++ b/libs/langgraph/src/lib/transport/fetch-stream.transport.ts @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT import { Client } from '@langchain/langgraph-sdk'; -import type { StreamMode } from '@langchain/langgraph-sdk'; +import type { StreamMode, ThreadState } from '@langchain/langgraph-sdk'; import { AgentQueueEntry, AgentTransport, StreamEvent } from '../agent.types'; /** @@ -114,6 +114,11 @@ export class FetchStreamTransport implements AgentTransport { async cancelRun(threadId: string, runId: string, signal: AbortSignal): Promise { await this.client.runs.cancel(threadId, runId, false, 'interrupt', { signal }); } + + /** Load persisted checkpoint history for a thread. */ + async getHistory(threadId: string, signal: AbortSignal): Promise { + return this.client.threads.getHistory(threadId, { signal }); + } } function normalizeSdkEvent(type: StreamEvent['type'], data: unknown): StreamEvent { diff --git a/libs/langgraph/src/lib/transport/mock-stream.transport.ts b/libs/langgraph/src/lib/transport/mock-stream.transport.ts index 460d4d9d6..d2f6ad284 100644 --- a/libs/langgraph/src/lib/transport/mock-stream.transport.ts +++ b/libs/langgraph/src/lib/transport/mock-stream.transport.ts @@ -1,5 +1,6 @@ // SPDX-License-Identifier: MIT import { AgentQueueEntry, AgentTransport, StreamEvent } from '../agent.types'; +import type { ThreadState } from '@langchain/langgraph-sdk'; /** * Test transport for deterministic agent testing without a real LangGraph server. @@ -16,6 +17,8 @@ import { AgentQueueEntry, AgentTransport, StreamEvent } from '../agent.types'; * ``` */ export class MockAgentTransport implements AgentTransport { + history: ThreadState[] = []; + readonly historyCalls: string[] = []; readonly createdQueuedRuns: AgentQueueEntry[] = []; readonly cancelledRuns: Array<{ threadId: string; runId: string }> = []; readonly joinedRuns: Array<{ threadId: string; runId: string }> = []; @@ -117,6 +120,12 @@ export class MockAgentTransport implements AgentTransport { this.cancelledRuns.push({ threadId, runId }); } + async getHistory(threadId: string, signal: AbortSignal): Promise { + void signal; + this.historyCalls.push(threadId); + return this.history; + } + async *joinStream( threadId: string, runId: string, diff --git a/libs/langgraph/src/public-api.ts b/libs/langgraph/src/public-api.ts index 8567342a2..3dbfb5e11 100644 --- a/libs/langgraph/src/public-api.ts +++ b/libs/langgraph/src/public-api.ts @@ -9,6 +9,9 @@ export type { AgentConfig } from './lib/agent.provider'; // Public types export type { AgentOptions, + AgentBranchTree, + AgentBranchTreeFork, + AgentBranchTreeNode, AgentQueue, AgentQueueEntry, LangGraphAgent,