diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts index 09b24ac56..59896146e 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts @@ -1010,16 +1010,8 @@ describe('RemoteHandle', () => { mockKernelStore.setRemoteNextSendSeq(mockRemoteId, 3); mockKernelStore.setRemoteHighestReceivedSeq(mockRemoteId, 2); mockKernelStore.setRemoteStartSeq(mockRemoteId, 2); - mockKernelStore.setPendingMessage( - mockRemoteId, - 2, - '{"seq":2,"method":"deliver","params":["notify",[]]}', - ); - mockKernelStore.setPendingMessage( - mockRemoteId, - 3, - '{"seq":3,"method":"deliver","params":["notify",[]]}', - ); + mockKernelStore.setPendingMessage(mockRemoteId, 2, 'message 2'); + mockKernelStore.setPendingMessage(mockRemoteId, 3, 'message 3'); // Create a new RemoteHandle - should restore state const remote = makeRemote(); @@ -1046,9 +1038,9 @@ describe('RemoteHandle', () => { mockKernelStore.setRemoteNextSendSeq(mockRemoteId, 2); mockKernelStore.setRemoteStartSeq(mockRemoteId, 1); // But messages 1, 2, and 3 exist (3 was written but seq not incremented) - mockKernelStore.setPendingMessage(mockRemoteId, 1, '{"seq":1}'); - mockKernelStore.setPendingMessage(mockRemoteId, 2, '{"seq":2}'); - mockKernelStore.setPendingMessage(mockRemoteId, 3, '{"seq":3}'); + mockKernelStore.setPendingMessage(mockRemoteId, 1, 'message 1'); + mockKernelStore.setPendingMessage(mockRemoteId, 2, 'message 2'); + mockKernelStore.setPendingMessage(mockRemoteId, 3, 'message 3'); // Create RemoteHandle - should detect and repair const remote = makeRemote(); @@ -1077,7 +1069,7 @@ describe('RemoteHandle', () => { // startSeq is persisted before nextSendSeq. mockKernelStore.setRemoteStartSeq(mockRemoteId, 1); // nextSendSeq not written (defaults to 0) - mockKernelStore.setPendingMessage(mockRemoteId, 1, '{"seq":1}'); + mockKernelStore.setPendingMessage(mockRemoteId, 1, 'message 1'); // Create RemoteHandle - should detect message at nextSendSeq+1 and repair const remote = makeRemote(); @@ -1105,7 +1097,7 @@ describe('RemoteHandle', () => { // Simulate crash during first enqueue: message written but NO seq state // persisted at all. This can happen if crash occurs after setPendingMessage // but before setRemoteStartSeq. - mockKernelStore.setPendingMessage(mockRemoteId, 1, '{"seq":1}'); + mockKernelStore.setPendingMessage(mockRemoteId, 1, 'message 1'); // No seq state set - getRemoteSeqState will return undefined // Create RemoteHandle - should scan and find orphan message at seq 1 @@ -1130,25 +1122,27 @@ describe('RemoteHandle', () => { expect(seqState?.nextSendSeq).toBe(2); // Original orphan message still exists expect(mockKernelStore.getPendingMessage(mockRemoteId, 1)).toBe( - '{"seq":1}', + 'message 1', ); }); - it('ignores orphan messages (seq < startSeq) on recovery', () => { + it('cleans up orphan messages (seq < startSeq) on recovery', () => { // Simulate crash during ACK: startSeq updated but message not deleted mockKernelStore.setRemoteNextSendSeq(mockRemoteId, 3); mockKernelStore.setRemoteStartSeq(mockRemoteId, 2); // Orphan message at seq 1 (already acked per startSeq=2) - mockKernelStore.setPendingMessage(mockRemoteId, 1, '{"seq":1}'); + mockKernelStore.setPendingMessage(mockRemoteId, 1, 'message 1'); // Valid pending at seq 2 and 3 - mockKernelStore.setPendingMessage(mockRemoteId, 2, '{"seq":2}'); - mockKernelStore.setPendingMessage(mockRemoteId, 3, '{"seq":3}'); + mockKernelStore.setPendingMessage(mockRemoteId, 2, 'message 2'); + mockKernelStore.setPendingMessage(mockRemoteId, 3, 'message 3'); - // Create RemoteHandle - orphan is ignored (lazy cleanup) + // Create RemoteHandle - should clean up orphan during recovery makeRemote(); - // Orphan still exists in storage (cleaned lazily when remote is deleted) - expect(mockKernelStore.getPendingMessage(mockRemoteId, 1)).toBeDefined(); + // Orphan should be deleted + expect( + mockKernelStore.getPendingMessage(mockRemoteId, 1), + ).toBeUndefined(); // Valid pending messages should remain expect(mockKernelStore.getPendingMessage(mockRemoteId, 2)).toBeDefined(); expect(mockKernelStore.getPendingMessage(mockRemoteId, 3)).toBeDefined(); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts index 5ffbf50b9..a8aa71a10 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts @@ -232,6 +232,17 @@ export class RemoteHandle implements EndpointHandle { this.#kernelStore.setRemoteNextSendSeq(this.remoteId, this.#nextSendSeq); } + // Clean up orphan messages (seq < startSeq) left behind by crashes during ACK + const orphansDeleted = this.#kernelStore.cleanupOrphanMessages( + this.remoteId, + this.#startSeq, + ); + if (orphansDeleted > 0) { + this.#logger.log( + `${this.#peerId.slice(0, 8)}:: cleaned up ${orphansDeleted} orphan message(s) during recovery`, + ); + } + // If we have pending messages after recovery, start ACK timeout for retransmission if (this.#hasPendingMessages()) { this.#logger.log( diff --git a/packages/ocap-kernel/src/store/index.test.ts b/packages/ocap-kernel/src/store/index.test.ts index 7e305f7cc..5f47cb80b 100644 --- a/packages/ocap-kernel/src/store/index.test.ts +++ b/packages/ocap-kernel/src/store/index.test.ts @@ -46,6 +46,7 @@ describe('kernel store', () => { 'addSubcluster', 'addSubclusterVat', 'allocateErefForKref', + 'cleanupOrphanMessages', 'cleanupTerminatedVat', 'clear', 'clearEmptySubclusters', diff --git a/packages/ocap-kernel/src/store/methods/remote.test.ts b/packages/ocap-kernel/src/store/methods/remote.test.ts index 12f6df0ad..82abbeda0 100644 --- a/packages/ocap-kernel/src/store/methods/remote.test.ts +++ b/packages/ocap-kernel/src/store/methods/remote.test.ts @@ -295,4 +295,50 @@ describe('remote store methods', () => { ).not.toThrow(); }); }); + + describe('cleanupOrphanMessages', () => { + it('deletes messages with seq < startSeq', () => { + mockKV.set(`remotePending.${remoteId1}.1`, 'message 1'); + mockKV.set(`remotePending.${remoteId1}.2`, 'message 2'); + mockKV.set(`remotePending.${remoteId1}.3`, 'message 3'); + mockKV.set(`remotePending.${remoteId1}.4`, 'message 4'); + mockGetPrefixedKeys.mockReturnValue([ + `remotePending.${remoteId1}.1`, + `remotePending.${remoteId1}.2`, + `remotePending.${remoteId1}.3`, + `remotePending.${remoteId1}.4`, + ]); + + const deleted = remoteMethods.cleanupOrphanMessages(remoteId1, 3); + + expect(deleted).toBe(2); + expect(mockKV.has(`remotePending.${remoteId1}.1`)).toBe(false); + expect(mockKV.has(`remotePending.${remoteId1}.2`)).toBe(false); + expect(mockKV.has(`remotePending.${remoteId1}.3`)).toBe(true); + expect(mockKV.has(`remotePending.${remoteId1}.4`)).toBe(true); + }); + + it('returns 0 when no orphans exist', () => { + mockKV.set(`remotePending.${remoteId1}.3`, 'message 3'); + mockKV.set(`remotePending.${remoteId1}.4`, 'message 4'); + mockGetPrefixedKeys.mockReturnValue([ + `remotePending.${remoteId1}.3`, + `remotePending.${remoteId1}.4`, + ]); + + const deleted = remoteMethods.cleanupOrphanMessages(remoteId1, 3); + + expect(deleted).toBe(0); + expect(mockKV.has(`remotePending.${remoteId1}.3`)).toBe(true); + expect(mockKV.has(`remotePending.${remoteId1}.4`)).toBe(true); + }); + + it('handles empty pending queue', () => { + mockGetPrefixedKeys.mockReturnValue([]); + + const deleted = remoteMethods.cleanupOrphanMessages(remoteId1, 5); + + expect(deleted).toBe(0); + }); + }); }); diff --git a/packages/ocap-kernel/src/store/methods/remote.ts b/packages/ocap-kernel/src/store/methods/remote.ts index 4511600a3..14c6e1059 100644 --- a/packages/ocap-kernel/src/store/methods/remote.ts +++ b/packages/ocap-kernel/src/store/methods/remote.ts @@ -205,6 +205,31 @@ export function getRemoteMethods(ctx: StoreContext) { } } + /** + * Delete orphan pending messages (messages with seq < startSeq). + * Called during recovery to clean up messages left behind by crashes + * during ACK processing. + * + * @param remoteId - The remote whose orphans are to be cleaned. + * @param startSeq - The current start sequence; messages below this are orphans. + * @returns The number of orphan messages deleted. + */ + function cleanupOrphanMessages(remoteId: RemoteId, startSeq: number): number { + const pendingPrefix = `${REMOTE_PENDING_BASE}${remoteId}.`; + const prefixLen = pendingPrefix.length; + let deletedCount = 0; + + for (const key of getPrefixedKeys(pendingPrefix)) { + const seq = Number(key.slice(prefixLen)); + if (seq < startSeq) { + kv.delete(key); + deletedCount += 1; + } + } + + return deletedCount; + } + return { getAllRemoteRecords, getRemoteInfo, @@ -219,5 +244,6 @@ export function getRemoteMethods(ctx: StoreContext) { setPendingMessage, deletePendingMessage, deleteRemotePendingState, + cleanupOrphanMessages, }; }