Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 17 additions & 23 deletions packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1010,16 +1010,8 @@ describe('RemoteHandle', () => {
mockKernelStore.setRemoteNextSendSeq(mockRemoteId, 3);
mockKernelStore.setRemoteHighestReceivedSeq(mockRemoteId, 2);
mockKernelStore.setRemoteStartSeq(mockRemoteId, 2);
mockKernelStore.setPendingMessage(
mockRemoteId,
2,
'{"seq":2,"method":"deliver","params":["notify",[]]}',
);
mockKernelStore.setPendingMessage(
mockRemoteId,
3,
'{"seq":3,"method":"deliver","params":["notify",[]]}',
);
mockKernelStore.setPendingMessage(mockRemoteId, 2, 'message 2');
mockKernelStore.setPendingMessage(mockRemoteId, 3, 'message 3');

// Create a new RemoteHandle - should restore state
const remote = makeRemote();
Expand All @@ -1046,9 +1038,9 @@ describe('RemoteHandle', () => {
mockKernelStore.setRemoteNextSendSeq(mockRemoteId, 2);
mockKernelStore.setRemoteStartSeq(mockRemoteId, 1);
// But messages 1, 2, and 3 exist (3 was written but seq not incremented)
mockKernelStore.setPendingMessage(mockRemoteId, 1, '{"seq":1}');
mockKernelStore.setPendingMessage(mockRemoteId, 2, '{"seq":2}');
mockKernelStore.setPendingMessage(mockRemoteId, 3, '{"seq":3}');
mockKernelStore.setPendingMessage(mockRemoteId, 1, 'message 1');
mockKernelStore.setPendingMessage(mockRemoteId, 2, 'message 2');
mockKernelStore.setPendingMessage(mockRemoteId, 3, 'message 3');

// Create RemoteHandle - should detect and repair
const remote = makeRemote();
Expand Down Expand Up @@ -1077,7 +1069,7 @@ describe('RemoteHandle', () => {
// startSeq is persisted before nextSendSeq.
mockKernelStore.setRemoteStartSeq(mockRemoteId, 1);
// nextSendSeq not written (defaults to 0)
mockKernelStore.setPendingMessage(mockRemoteId, 1, '{"seq":1}');
mockKernelStore.setPendingMessage(mockRemoteId, 1, 'message 1');

// Create RemoteHandle - should detect message at nextSendSeq+1 and repair
const remote = makeRemote();
Expand Down Expand Up @@ -1105,7 +1097,7 @@ describe('RemoteHandle', () => {
// Simulate crash during first enqueue: message written but NO seq state
// persisted at all. This can happen if crash occurs after setPendingMessage
// but before setRemoteStartSeq.
mockKernelStore.setPendingMessage(mockRemoteId, 1, '{"seq":1}');
mockKernelStore.setPendingMessage(mockRemoteId, 1, 'message 1');
// No seq state set - getRemoteSeqState will return undefined

// Create RemoteHandle - should scan and find orphan message at seq 1
Expand All @@ -1130,25 +1122,27 @@ describe('RemoteHandle', () => {
expect(seqState?.nextSendSeq).toBe(2);
// Original orphan message still exists
expect(mockKernelStore.getPendingMessage(mockRemoteId, 1)).toBe(
'{"seq":1}',
'message 1',
);
});

it('ignores orphan messages (seq < startSeq) on recovery', () => {
it('cleans up orphan messages (seq < startSeq) on recovery', () => {
// Simulate crash during ACK: startSeq updated but message not deleted
mockKernelStore.setRemoteNextSendSeq(mockRemoteId, 3);
mockKernelStore.setRemoteStartSeq(mockRemoteId, 2);
// Orphan message at seq 1 (already acked per startSeq=2)
mockKernelStore.setPendingMessage(mockRemoteId, 1, '{"seq":1}');
mockKernelStore.setPendingMessage(mockRemoteId, 1, 'message 1');
// Valid pending at seq 2 and 3
mockKernelStore.setPendingMessage(mockRemoteId, 2, '{"seq":2}');
mockKernelStore.setPendingMessage(mockRemoteId, 3, '{"seq":3}');
mockKernelStore.setPendingMessage(mockRemoteId, 2, 'message 2');
mockKernelStore.setPendingMessage(mockRemoteId, 3, 'message 3');

// Create RemoteHandle - orphan is ignored (lazy cleanup)
// Create RemoteHandle - should clean up orphan during recovery
makeRemote();

// Orphan still exists in storage (cleaned lazily when remote is deleted)
expect(mockKernelStore.getPendingMessage(mockRemoteId, 1)).toBeDefined();
// Orphan should be deleted
expect(
mockKernelStore.getPendingMessage(mockRemoteId, 1),
).toBeUndefined();
// Valid pending messages should remain
expect(mockKernelStore.getPendingMessage(mockRemoteId, 2)).toBeDefined();
expect(mockKernelStore.getPendingMessage(mockRemoteId, 3)).toBeDefined();
Expand Down
11 changes: 11 additions & 0 deletions packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,17 @@ export class RemoteHandle implements EndpointHandle {
this.#kernelStore.setRemoteNextSendSeq(this.remoteId, this.#nextSendSeq);
}

// Clean up orphan messages (seq < startSeq) left behind by crashes during ACK
const orphansDeleted = this.#kernelStore.cleanupOrphanMessages(
this.remoteId,
this.#startSeq,
);
if (orphansDeleted > 0) {
this.#logger.log(
`${this.#peerId.slice(0, 8)}:: cleaned up ${orphansDeleted} orphan message(s) during recovery`,
);
}

// If we have pending messages after recovery, start ACK timeout for retransmission
if (this.#hasPendingMessages()) {
this.#logger.log(
Expand Down
1 change: 1 addition & 0 deletions packages/ocap-kernel/src/store/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ describe('kernel store', () => {
'addSubcluster',
'addSubclusterVat',
'allocateErefForKref',
'cleanupOrphanMessages',
'cleanupTerminatedVat',
'clear',
'clearEmptySubclusters',
Expand Down
46 changes: 46 additions & 0 deletions packages/ocap-kernel/src/store/methods/remote.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,4 +295,50 @@ describe('remote store methods', () => {
).not.toThrow();
});
});

describe('cleanupOrphanMessages', () => {
it('deletes messages with seq < startSeq', () => {
mockKV.set(`remotePending.${remoteId1}.1`, 'message 1');
mockKV.set(`remotePending.${remoteId1}.2`, 'message 2');
mockKV.set(`remotePending.${remoteId1}.3`, 'message 3');
mockKV.set(`remotePending.${remoteId1}.4`, 'message 4');
mockGetPrefixedKeys.mockReturnValue([
`remotePending.${remoteId1}.1`,
`remotePending.${remoteId1}.2`,
`remotePending.${remoteId1}.3`,
`remotePending.${remoteId1}.4`,
]);

const deleted = remoteMethods.cleanupOrphanMessages(remoteId1, 3);

expect(deleted).toBe(2);
expect(mockKV.has(`remotePending.${remoteId1}.1`)).toBe(false);
expect(mockKV.has(`remotePending.${remoteId1}.2`)).toBe(false);
expect(mockKV.has(`remotePending.${remoteId1}.3`)).toBe(true);
expect(mockKV.has(`remotePending.${remoteId1}.4`)).toBe(true);
});

it('returns 0 when no orphans exist', () => {
mockKV.set(`remotePending.${remoteId1}.3`, 'message 3');
mockKV.set(`remotePending.${remoteId1}.4`, 'message 4');
mockGetPrefixedKeys.mockReturnValue([
`remotePending.${remoteId1}.3`,
`remotePending.${remoteId1}.4`,
]);

const deleted = remoteMethods.cleanupOrphanMessages(remoteId1, 3);

expect(deleted).toBe(0);
expect(mockKV.has(`remotePending.${remoteId1}.3`)).toBe(true);
expect(mockKV.has(`remotePending.${remoteId1}.4`)).toBe(true);
});

it('handles empty pending queue', () => {
mockGetPrefixedKeys.mockReturnValue([]);

const deleted = remoteMethods.cleanupOrphanMessages(remoteId1, 5);

expect(deleted).toBe(0);
});
});
});
26 changes: 26 additions & 0 deletions packages/ocap-kernel/src/store/methods/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,31 @@ export function getRemoteMethods(ctx: StoreContext) {
}
}

/**
* Delete orphan pending messages (messages with seq < startSeq).
* Called during recovery to clean up messages left behind by crashes
* during ACK processing.
*
* @param remoteId - The remote whose orphans are to be cleaned.
* @param startSeq - The current start sequence; messages below this are orphans.
* @returns The number of orphan messages deleted.
*/
function cleanupOrphanMessages(remoteId: RemoteId, startSeq: number): number {
const pendingPrefix = `${REMOTE_PENDING_BASE}${remoteId}.`;
const prefixLen = pendingPrefix.length;
let deletedCount = 0;

for (const key of getPrefixedKeys(pendingPrefix)) {
const seq = Number(key.slice(prefixLen));
if (seq < startSeq) {
kv.delete(key);
deletedCount += 1;
}
}

return deletedCount;
}

return {
getAllRemoteRecords,
getRemoteInfo,
Expand All @@ -219,5 +244,6 @@ export function getRemoteMethods(ctx: StoreContext) {
setPendingMessage,
deletePendingMessage,
deleteRemotePendingState,
cleanupOrphanMessages,
};
}
Loading