diff --git a/libs/langgraph/package.json b/libs/langgraph/package.json index 79d6ae532..ce48a6f0e 100644 --- a/libs/langgraph/package.json +++ b/libs/langgraph/package.json @@ -1,6 +1,6 @@ { "name": "@ngaf/langgraph", - "version": "0.0.4", + "version": "0.0.6", "peerDependencies": { "@ngaf/chat": "*", "@ngaf/licensing": "*", diff --git a/libs/langgraph/src/lib/internals/stream-manager.bridge.ts b/libs/langgraph/src/lib/internals/stream-manager.bridge.ts index f67ff37b5..21d305338 100644 --- a/libs/langgraph/src/lib/internals/stream-manager.bridge.ts +++ b/libs/langgraph/src/lib/internals/stream-manager.bridge.ts @@ -337,7 +337,9 @@ export function createStreamManagerBridge 0) { // Defensive: only sync when state carries messages. An empty // values payload shouldn't wipe the UI mid-stream. - if (options.toMessage) { - subjects.messages$.next(stateMessages.map(options.toMessage)); - } else { - subjects.messages$.next(stateMessages as BaseMessage[]); - } + const projected = options.toMessage + ? stateMessages.map(options.toMessage) + : (stateMessages as BaseMessage[]); + // Preserve existing message ids when content matches. Server- + // echoed human messages and final AI messages often arrive with + // different ids than the optimistic / partial we already have — + // letting that id swap reach chat-message-list's track-by-id + // tears down DOM mid-stream. preserveIds maps new messages to + // existing-id-by-content where possible. + subjects.messages$.next(preserveIds(subjects.messages$.value, projected)); syncSubagentsFromMessages(stateMessages as BaseMessage[]); subagentManager.reconstructFromMessages( stateMessages as BaseMessage[], @@ -670,9 +677,23 @@ function mergeMessages(existing: BaseMessage[], incoming: BaseMessage[]): BaseMe const merged = [...existing]; for (const msg of incoming) { const id = (msg as unknown as Record)['id']; - const idx = id ? merged.findIndex(m => (m as unknown as Record)['id'] === id) : -1; + let idx = id ? merged.findIndex(m => (m as unknown as Record)['id'] === id) : -1; + // Fallback: match by (role, content) when ids differ. This is the path + // that fires when the server echoes back our optimistic human message + // with a server-assigned id, or when partial AI tokens carry a chunk + // id but the final canonical message has a run id. Preserving the + // existing id here keeps track-by-id stable in the chat list and + // prevents DOM teardown + animation restarts mid-stream. + if (idx < 0) { + idx = findContentMatch(merged, msg); + } if (idx >= 0) { - merged[idx] = msg; + const existingId = (merged[idx] as unknown as Record)['id']; + // Keep the *existing* id so downstream track-by-id sees stable identity. + // The replacement carries the latest content + metadata. + merged[idx] = existingId + ? ({ ...(msg as object), id: existingId } as BaseMessage) + : msg; } else { merged.push(msg); } @@ -680,6 +701,69 @@ function mergeMessages(existing: BaseMessage[], incoming: BaseMessage[]): BaseMe return merged; } +/** + * Replace the incoming messages' ids with the existing array's ids whenever + * (role, content) matches positionally and the existing id differs. Keeps + * track-by-id stable across server echoes and final-id swaps. + */ +function preserveIds(existing: BaseMessage[], incoming: BaseMessage[]): BaseMessage[] { + if (existing.length === 0) return incoming; + const usedExisting = new Set(); + return incoming.map((msg, i) => { + const inRaw = msg as unknown as Record; + const inId = inRaw['id']; + // First try same-position match (the dominant case). + let matchIdx = -1; + if (i < existing.length && !usedExisting.has(i) && sameRoleAndContent(existing[i], msg)) { + matchIdx = i; + } else { + // Fallback: any unused existing message with matching role+content. + matchIdx = existing.findIndex((m, j) => !usedExisting.has(j) && sameRoleAndContent(m, msg)); + } + if (matchIdx < 0) return msg; + usedExisting.add(matchIdx); + const existingId = (existing[matchIdx] as unknown as Record)['id']; + if (!existingId || existingId === inId) return msg; + return { ...(msg as object), id: existingId } as BaseMessage; + }); +} + +function sameRoleAndContent(a: BaseMessage, b: BaseMessage): boolean { + const aType = typeof a._getType === 'function' ? a._getType() : (a as unknown as Record)['type']; + const bType = typeof b._getType === 'function' ? b._getType() : (b as unknown as Record)['type']; + if (aType !== bType) return false; + const aContent = typeof a.content === 'string' ? a.content : JSON.stringify(a.content); + const bContent = typeof b.content === 'string' ? b.content : JSON.stringify(b.content); + if (aContent === bContent) return true; + // For AI messages we accept prefix relationships (streaming → final). + if (aType === 'ai' && typeof aContent === 'string' && typeof bContent === 'string') { + return aContent.length > 0 && (bContent.startsWith(aContent) || aContent.startsWith(bContent)); + } + return false; +} + +function findContentMatch(merged: BaseMessage[], incoming: BaseMessage): number { + const inRaw = incoming as unknown as Record; + const inType = typeof incoming._getType === 'function' ? incoming._getType() : (inRaw['type'] as string | undefined); + const inContent = typeof incoming.content === 'string' ? incoming.content : JSON.stringify(incoming.content); + // Only worth matching for human messages (where the optimistic→echo + // mismatch happens) and for AI messages where content is a strict prefix + // of the existing (token-streaming + final-id swap pattern). + for (let i = merged.length - 1; i >= 0; i--) { + const m = merged[i] as unknown as Record; + const mType = typeof (merged[i] as BaseMessage)._getType === 'function' + ? (merged[i] as BaseMessage)._getType() + : (m['type'] as string | undefined); + if (mType !== inType) continue; + const mContent = typeof (merged[i] as BaseMessage).content === 'string' + ? (merged[i] as BaseMessage).content as string + : JSON.stringify((merged[i] as BaseMessage).content); + if (inType === 'human' && mContent === inContent) return i; + if (inType === 'ai' && (mContent === inContent || (typeof mContent === 'string' && typeof inContent === 'string' && (inContent.startsWith(mContent) || mContent.startsWith(inContent))))) return i; + } + return -1; +} + function toSubagentRefs( subagents: Map, ): Map { diff --git a/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts b/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts index 355a0296e..6aed5f028 100644 --- a/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts +++ b/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts @@ -69,7 +69,7 @@ describe('FetchStreamTransport', () => { ]); }); - it('requests the stream modes required for values, messages, tools, and custom events', async () => { + it('requests the stream modes required for values, messages, and custom events', async () => { mocks.runsStream.mockReturnValue((async function* () { yield { event: 'metadata', data: { run_id: 'run-1', thread_id: 'thread-1' } }; })()); @@ -87,7 +87,6 @@ describe('FetchStreamTransport', () => { 'values', 'messages-tuple', 'updates', - 'tools', 'custom', ]), }), diff --git a/libs/langgraph/src/lib/transport/fetch-stream.transport.ts b/libs/langgraph/src/lib/transport/fetch-stream.transport.ts index b765c4356..bd7b21541 100644 --- a/libs/langgraph/src/lib/transport/fetch-stream.transport.ts +++ b/libs/langgraph/src/lib/transport/fetch-stream.transport.ts @@ -147,7 +147,11 @@ function buildRunPayload( } function defaultStreamMode(): StreamMode[] { - return ['values', 'messages-tuple', 'updates', 'tools', 'custom']; + // 'tools' is intentionally omitted: not supported by langgraph_api < 0.9.x + // Servers reject the entire request with HTTP 422 if any stream_mode in + // the array is unknown to them. Tool-call data is still derivable from + // the messages stream. + return ['values', 'messages-tuple', 'updates', 'custom']; } function normalizeSdkEvent(type: StreamEvent['type'], data: unknown): StreamEvent {