diff --git a/lib/sdk-bridge.js b/lib/sdk-bridge.js index 43339f35..3f3d87c4 100644 --- a/lib/sdk-bridge.js +++ b/lib/sdk-bridge.js @@ -1019,26 +1019,40 @@ function createSDKBridge(opts) { console.log("[sdk-bridge] Live query count: " + _activeLiveCount + "/" + MAX_CONCURRENT_SESSIONS + " (session " + session.localId + " done)"); } session.taskStopRequested = false; - session.pendingPermissions = {}; - // Preserve MCP-mode AskUserQuestion entries across turn boundaries. - // The MCP path is intentionally stateless: ask_user returns immediately - // ("card posted, end your turn") and the answer arrives as a new - // user_message on the next turn. Those entries MUST survive this finally - // block so ask_user_response can find the toolId and inject the answer. - // canUseTool-mode entries hold an open SDK callback that dies with the - // query anyway, so those are correctly cleared here. - var _keepAskUser = {}; - for (var _tid in session.pendingAskUser) { - var _pau = session.pendingAskUser[_tid]; - if (_pau && _pau.mode === "mcp") _keepAskUser[_tid] = _pau; - } - session.pendingAskUser = _keepAskUser; - session.pendingElicitations = {}; + // lr-8355: A rewind (or any other replacement) may have already started + // a NEW query on this session while this stream's for-await is still + // unwinding (e.g. rewind_execute nulls session.queryInstance, then the + // user immediately sends a message, starting a fresh query). If that + // happened, session.queryInstance now points at the newer query's + // instance, not ours. Resetting pendingPermissions/pendingAskUser/ + // pendingElicitations/isProcessing here would wipe the NEW query's + // in-flight state out from under it (approval clicks silently ignored, + // turn hangs forever) and flip isProcessing=false while the new query + // is still live. 
Only run the reset when we still own the session + // (queryInstance is still ours, or is null because our own completion path + // above or a rewind cleared it and no replacement query is installed yet) — never when a newer query has taken over. + if (session.queryInstance === myQueryInstance || session.queryInstance === null) { + session.pendingPermissions = {}; + // Preserve MCP-mode AskUserQuestion entries across turn boundaries. + // The MCP path is intentionally stateless: ask_user returns immediately + // ("card posted, end your turn") and the answer arrives as a new + // user_message on the next turn. Those entries MUST survive this finally + // block so ask_user_response can find the toolId and inject the answer. + // canUseTool-mode entries hold an open SDK callback that dies with the + // query anyway, so those are correctly cleared here. + var _keepAskUser = {}; + for (var _tid in session.pendingAskUser) { + var _pau = session.pendingAskUser[_tid]; + if (_pau && _pau.mode === "mcp") _keepAskUser[_tid] = _pau; + } + session.pendingAskUser = _keepAskUser; + session.pendingElicitations = {}; - // Auto-continue on rate limit (scheduler sessions, or user setting) - // Mark session as done processing so the late rate_limit_event handler - // can detect the race condition and schedule auto-continue itself. - session.isProcessing = false; + // Auto-continue on rate limit (scheduler sessions, or user setting) + // Mark session as done processing so the late rate_limit_event handler + // can detect the race condition and schedule auto-continue itself. 
+ session.isProcessing = false; + } var didScheduleAutoContinue = false; var acEnabled = session.onQueryComplete || (typeof opts.getAutoContinueSetting === "function" && opts.getAutoContinueSetting(session)); diff --git a/test/sdk-bridge-replacement-query-lr-8355.test.js b/test/sdk-bridge-replacement-query-lr-8355.test.js new file mode 100644 index 00000000..951811c2 --- /dev/null +++ b/test/sdk-bridge-replacement-query-lr-8355.test.js @@ -0,0 +1,216 @@ +/** + * Regression test for lr-8355: query finally block clobbers a replacement + * query's session state. + * + * Scenario: a rewind (or any other replacement) nulls session.queryInstance + * and starts a NEW query while the OLD query's processQueryStream for-await + * is still unwinding in the background (e.g. draining a slow/blocked async + * iterator after abort()). Before the fix, the old stream's `finally` block + * unconditionally reset session.pendingPermissions / pendingAskUser / + * pendingElicitations / isProcessing — wiping out the NEW query's in-flight + * state and flipping isProcessing=false while the new query was still live. + * + * This test drives that exact race: start query A with a handle whose + * iterator blocks on the first next() call, "rewind" by nulling + * session.queryInstance and swapping in query B (also blocked, holding a + * pending permission), then unblock A's iterator so its finally block runs. + * Assert B's pendingPermissions entry and isProcessing survive. 
+ */ + +var test = require("node:test"); +var assert = require("node:assert/strict"); + +function makeBlockedHandle() { + var resolveNext; + return { + _adapterState: null, + [Symbol.asyncIterator]: function () { + return { + next: function () { + return new Promise(function (resolve) { + resolveNext = function () { resolve({ value: undefined, done: true }); }; + }); + }, + }; + }, + pushMessage: function () {}, + close: function () { if (resolveNext) resolveNext(); }, + endInput: function () {}, + abort: function () { if (resolveNext) resolveNext(); }, + _unblock: function () { if (resolveNext) resolveNext(); }, + }; +} + +function makeSessionManager(messages) { + return { + sessions: new Map(), + currentModel: null, + currentPermissionMode: null, + currentEffort: null, + currentBetas: [], + modelsByVendor: {}, + availableVendors: [], + installedVendors: [], + defaultVendor: "claude", + saveSessionFile: function () {}, + broadcastSessionList: function () {}, + getActiveSession: function () { return null; }, + setSlashCommandsForVendor: function () {}, + sendAndRecord: function (session, obj) { + if (!session.history) session.history = []; + session.history.push(obj); + if (messages) messages.push(obj); + }, + sendToSession: function (session, obj) { + if (messages) messages.push(obj); + }, + }; +} + +var _localIdSeq = 1; +function makeSession() { + return { + localId: _localIdSeq++, + queryInstance: null, + messageQueue: null, + abortController: null, + isProcessing: true, + cliSessionId: null, + history: [], + blocks: {}, + sentToolResults: {}, + pendingPermissions: {}, + pendingAskUser: {}, + pendingElicitations: {}, + activeTaskToolIds: {}, + singleTurn: false, + lastActivityAt: Date.now(), + _isCountedLive: false, + _adapterWorkerState: null, + _workerExitPromise: null, + }; +} + +function makeAdapter(createQueryFn) { + return { + vendor: "claude", + createQuery: createQueryFn, + init: function () { return Promise.resolve({ models: [], skills: [] }); }, + 
supportedModels: function () { return Promise.resolve([]); }, + generateTitle: null, + renameSession: null, + forkSession: null, + }; +} + +function freshSdkBridge() { + var modPath = require.resolve("../lib/sdk-bridge"); + delete require.cache[modPath]; + return require("../lib/sdk-bridge"); +} + +function makeBridge(handles) { + var { createSDKBridge } = freshSdkBridge(); + var messages = []; + var sm = makeSessionManager(messages); + var idx = 0; + var adapter = makeAdapter(function () { + var h = handles[idx++]; + return Promise.resolve(h); + }); + var bridge = createSDKBridge({ + cwd: "/tmp/test-project", + slug: "test-project", + sessionManager: sm, + send: function (msg) { messages.push(msg); }, + adapter: adapter, + adapters: { claude: adapter }, + onProcessingChanged: function () {}, + getConfig: null, + }); + return { bridge, sm, messages }; +} + +test("lr-8355: old query's finally does not clobber a replacement query's pendingPermissions/isProcessing", async function () { + var handleA = makeBlockedHandle(); + var handleB = makeBlockedHandle(); + var { bridge, sm } = makeBridge([handleA, handleB]); + + var session = makeSession(); + sm.sessions.set(session.localId, session); + + // Start query A (simulates the pre-rewind, in-flight query). + await bridge.startQuery(session, "first message", null, null); + assert.equal(session.queryInstance, handleA, "session should own query A after startQuery"); + + // Simulate a permission request arriving on query A before the rewind. + session.pendingPermissions["perm-a"] = { resolve: function () {}, reject: function () {} }; + + // Simulate rewind_execute (project-sessions.js:874-965): it aborts/ends the + // old query and nulls session.queryInstance, but does NOT wait for query + // A's processQueryStream for-await loop to actually finish unwinding. 
+ session.queryInstance = null; + session.pendingPermissions = {}; + session.isProcessing = false; + + // User immediately sends a new message — starts query B while A's stream + // (still blocked on its iterator's next()) has not yet run its finally. + await bridge.startQuery(session, "second message (post-rewind)", null, null); + assert.equal(session.queryInstance, handleB, "session should now own query B"); + + // Query B has an in-flight permission request the operator is about to + // approve/deny. This is the exact state that a clobbering finally block + // would wipe out. + session.pendingPermissions["perm-b"] = { resolve: function () {}, reject: function () {} }; + session.isProcessing = true; + + // Now let query A's stream actually finish unwinding — its finally block + // runs while session.queryInstance !== handleA (it's handleB). + handleA._unblock(); + if (session.streamPromiseA) { try { await session.streamPromiseA; } catch (e) {} } + // Give the finally block's microtasks a tick to run. + await new Promise(function (r) { setImmediate(r); }); + await new Promise(function (r) { setImmediate(r); }); + + // Query B's permission card must still be there, and isProcessing must + // still reflect query B being live — NOT reset by query A's stale finally. + assert.ok( + session.pendingPermissions["perm-b"], + "query B's pendingPermissions entry must survive query A's finally block" + ); + assert.equal( + session.isProcessing, + true, + "isProcessing must reflect the live query B, not be reset by the unwinding query A" + ); + + // Cleanup: unblock B so the test process doesn't hang. 
+ handleB._unblock(); + if (session.streamPromise) { try { await session.streamPromise; } catch (e) {} } + await new Promise(function (r) { setImmediate(r); }); +}); + +test("lr-8355: finally block still resets state normally when no newer query has taken over", async function () { + var handleA = makeBlockedHandle(); + var { bridge, sm } = makeBridge([handleA]); + + var session = makeSession(); + sm.sessions.set(session.localId, session); + + await bridge.startQuery(session, "only message", null, null); + assert.equal(session.queryInstance, handleA); + + session.pendingPermissions["perm-a"] = { resolve: function () {}, reject: function () {} }; + + // No rewind, no replacement query — this is the normal single-query path. + handleA._unblock(); + if (session.streamPromise) { try { await session.streamPromise; } catch (e) {} } + await new Promise(function (r) { setImmediate(r); }); + + assert.deepEqual( + session.pendingPermissions, + {}, + "pendingPermissions should still be reset to {} for a normal (non-superseded) completion" + ); + assert.equal(session.isProcessing, false, "isProcessing should still be reset to false normally"); +});