diff --git a/packages/ocap-kernel/src/remotes/platform/channel-utils.test.ts b/packages/ocap-kernel/src/remotes/platform/channel-utils.test.ts new file mode 100644 index 000000000..a41c13f43 --- /dev/null +++ b/packages/ocap-kernel/src/remotes/platform/channel-utils.test.ts @@ -0,0 +1,153 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; + +import { makeErrorLogger, writeWithTimeout } from './channel-utils.ts'; +import type { Channel } from '../types.ts'; + +describe('channel-utils', () => { + describe('makeErrorLogger', () => { + it('creates an error logger function', () => { + const mockLogger = { log: vi.fn() }; + const outputError = makeErrorLogger(mockLogger); + + expect(typeof outputError).toBe('function'); + }); + + it('logs error with peer context when problem is provided', () => { + const mockLogger = { log: vi.fn() }; + const outputError = makeErrorLogger(mockLogger); + const error = new Error('test error'); + + outputError('peer123', 'sending message', error); + + expect(mockLogger.log).toHaveBeenCalledWith( + 'peer123:: error sending message: Error: test error', + ); + }); + + it('logs error without problem details when problem is null', () => { + const mockLogger = { log: vi.fn() }; + const outputError = makeErrorLogger(mockLogger); + + outputError('peer123', 'connection failed', null); + + expect(mockLogger.log).toHaveBeenCalledWith( + 'peer123:: error connection failed', + ); + }); + + it('logs error without problem details when problem is undefined', () => { + const mockLogger = { log: vi.fn() }; + const outputError = makeErrorLogger(mockLogger); + + outputError('peer123', 'timeout', undefined); + + expect(mockLogger.log).toHaveBeenCalledWith('peer123:: error timeout'); + }); + + it('handles non-Error objects as problems', () => { + const mockLogger = { log: vi.fn() }; + const outputError = makeErrorLogger(mockLogger); + const problem = { message: 'custom error' }; + + outputError('peer456', 'processing', problem); + + expect(mockLogger.log).toHaveBeenCalledWith( + 'peer456:: error processing: [object Object]', + ); + }); + + it('handles string problems', () => { + const mockLogger = { log: vi.fn() }; + const outputError = makeErrorLogger(mockLogger); + + outputError('peer789', 'reading', 'string error'); + + expect(mockLogger.log).toHaveBeenCalledWith( + 'peer789:: error reading: string error', + ); + }); + }); + + describe('writeWithTimeout', () => { + let mockChannel: Channel; + let writeResolve: () => void; + let writeReject: (error: Error) => void; + + beforeEach(() => { + const writePromise = new Promise((resolve, reject) => { + writeResolve = resolve; + writeReject = reject; + }); + + mockChannel = { + peerId: 'testPeer', + msgStream: { + write: vi.fn().mockReturnValue(writePromise), + read: vi.fn(), + unwrap: vi.fn(), + }, + } as unknown as Channel; + }); + + it('writes message to channel', async () => { + const message = new Uint8Array([1, 2, 3]); + + const writePromise = writeWithTimeout(mockChannel, message, 1000); + writeResolve(); + await writePromise; + + expect(mockChannel.msgStream.write).toHaveBeenCalledWith(message); + }); + + it('resolves when write completes before timeout', async () => { + const message = new Uint8Array([1, 2, 3]); + + const writePromise = writeWithTimeout(mockChannel, message, 1000); + writeResolve(); + + expect(await writePromise).toBeUndefined(); + }); + + it('rejects with timeout error when write takes too long', async () => { + const message = new Uint8Array([1, 2, 3]); + + const writePromise = 
writeWithTimeout(mockChannel, message, 50); + + await expect(writePromise).rejects.toThrow( + 'Message send timed out after 50ms', + ); + }); + + it('uses default timeout when not specified', async () => { + const message = new Uint8Array([1, 2, 3]); + + const writePromise = writeWithTimeout(mockChannel, message); + writeResolve(); + + expect(await writePromise).toBeUndefined(); + }); + + it('rejects with write error when write fails', async () => { + const message = new Uint8Array([1, 2, 3]); + const writeError = new Error('Write failed'); + + const writePromise = writeWithTimeout(mockChannel, message, 1000); + writeReject(writeError); + + await expect(writePromise).rejects.toThrow('Write failed'); + }); + + it('cleans up timeout listener after successful write', async () => { + const message = new Uint8Array([1, 2, 3]); + + const writePromise = writeWithTimeout(mockChannel, message, 1000); + writeResolve(); + const result = await writePromise; + + // If cleanup didn't happen, there would be an unhandled rejection + // when the timeout fires. This test verifies no error is thrown. + await new Promise((resolve) => setTimeout(resolve, 100)); + expect(result).toBeUndefined(); + }); + }); +}); diff --git a/packages/ocap-kernel/src/remotes/platform/channel-utils.ts b/packages/ocap-kernel/src/remotes/platform/channel-utils.ts new file mode 100644 index 000000000..34e3c2e09 --- /dev/null +++ b/packages/ocap-kernel/src/remotes/platform/channel-utils.ts @@ -0,0 +1,67 @@ +import type { Logger } from '@metamask/logger'; + +import { DEFAULT_WRITE_TIMEOUT_MS } from './constants.ts'; +import type { Channel } from '../types.ts'; + +/** + * Type for error logging function. + */ +export type ErrorLogger = ( + peerId: string, + task: string, + problem: unknown, +) => void; + +/** + * Creates an error logging function for transport operations. + * + * @param logger - The logger instance to use. + * @returns A function that logs errors with peer context. + */ +export function makeErrorLogger(logger: Logger): ErrorLogger { + return (peerId: string, task: string, problem: unknown): void => { + if (problem) { + const realProblem: Error = problem as Error; + logger.log(`${peerId}:: error ${task}: ${realProblem}`); + } else { + logger.log(`${peerId}:: error ${task}`); + } + }; +} + +/** + * Write a message to a channel stream with a timeout. + * + * @param channel - The channel to write to. + * @param message - The message bytes to write. + * @param timeoutMs - Timeout in milliseconds (default: 10 seconds). + * @returns Promise that resolves when the write completes or rejects on timeout. + * @throws Error if the write times out or fails. 
+ */
+export async function writeWithTimeout(
+  channel: Channel,
+  message: Uint8Array,
+  timeoutMs = DEFAULT_WRITE_TIMEOUT_MS,
+): Promise<void> {
+  const timeoutSignal = AbortSignal.timeout(timeoutMs);
+  let abortHandler: (() => void) | undefined;
+  const timeoutPromise = new Promise<never>((_resolve, reject) => {
+    abortHandler = () => {
+      reject(Error(`Message send timed out after ${timeoutMs}ms`));
+    };
+    timeoutSignal.addEventListener('abort', abortHandler);
+  });
+
+  try {
+    return await Promise.race([
+      channel.msgStream.write(message),
+      timeoutPromise,
+    ]);
+  } finally {
+    // Clean up event listener to prevent unhandled rejection if operation
+    // completes before timeout
+    if (abortHandler) {
+      timeoutSignal.removeEventListener('abort', abortHandler);
+    }
+  }
+}
diff --git a/packages/ocap-kernel/src/remotes/platform/constants.ts b/packages/ocap-kernel/src/remotes/platform/constants.ts
new file mode 100644
index 000000000..608a3ebe7
--- /dev/null
+++ b/packages/ocap-kernel/src/remotes/platform/constants.ts
@@ -0,0 +1,17 @@
+/** Default maximum number of concurrent connections */
+export const DEFAULT_MAX_CONCURRENT_CONNECTIONS = 100;
+
+/** Default maximum message size in bytes (1MB) */
+export const DEFAULT_MAX_MESSAGE_SIZE_BYTES = 1024 * 1024;
+
+/** Default stale peer cleanup interval in milliseconds (15 minutes) */
+export const DEFAULT_CLEANUP_INTERVAL_MS = 15 * 60 * 1000;
+
+/** Default stale peer timeout in milliseconds (1 hour) */
+export const DEFAULT_STALE_PEER_TIMEOUT_MS = 60 * 60 * 1000;
+
+/** Default message write timeout in milliseconds (10 seconds) */
+export const DEFAULT_WRITE_TIMEOUT_MS = 10_000;
+
+/** SCTP user initiated abort code (RFC 4960) */
+export const SCTP_USER_INITIATED_ABORT = 12;
diff --git a/packages/ocap-kernel/src/remotes/platform/peer-state-manager.test.ts b/packages/ocap-kernel/src/remotes/platform/peer-state-manager.test.ts
new file mode 100644
index 000000000..4f619ec98
--- /dev/null
+++ b/packages/ocap-kernel/src/remotes/platform/peer-state-manager.test.ts
@@ -0,0 +1,329 @@
+import { describe, it, expect, beforeEach, vi } from 'vitest';
+
+import { PeerStateManager } from './peer-state-manager.ts';
+import type { Channel } from '../types.ts';
+
+describe('PeerStateManager', () => {
+  let manager: PeerStateManager;
+  let mockLogger: { log: ReturnType<typeof vi.fn> };
+
+  beforeEach(() => {
+    mockLogger = { log: vi.fn() };
+    manager = new PeerStateManager(mockLogger);
+  });
+
+  describe('getState', () => {
+    it('creates new state for unknown peer', () => {
+      const state = manager.getState('peer1');
+
+      expect(state).toStrictEqual({
+        channel: undefined,
+        locationHints: [],
+      });
+    });
+
+    it('returns same state for same peer', () => {
+      const state1 = manager.getState('peer1');
+      const state2 = manager.getState('peer1');
+
+      expect(state1).toBe(state2);
+    });
+
+    it('returns different state for different peers', () => {
+      const state1 = manager.getState('peer1');
+      const state2 = manager.getState('peer2');
+
+      expect(state1).not.toBe(state2);
+    });
+
+    it('initializes lastConnectionTime on first access', async () => {
+      // Use very short timeout for testing
+      const shortTimeoutManager = new PeerStateManager(mockLogger, 10);
+      shortTimeoutManager.getState('peer1');
+
+      // Wait for timeout
+      await new Promise((resolve) => setTimeout(resolve, 20));
+
+      // Verify by checking getStalePeers behavior
+      const stalePeers = shortTimeoutManager.getStalePeers();
+      expect(stalePeers).toContain('peer1');
+    });
+  });
+
+  describe('countActiveConnections', () => {
+    it('returns 0 
when no connections', () => { + expect(manager.countActiveConnections()).toBe(0); + }); + + it('counts peers with channels', () => { + const mockChannel = { peerId: 'peer1' } as Channel; + const state1 = manager.getState('peer1'); + state1.channel = mockChannel; + + expect(manager.countActiveConnections()).toBe(1); + }); + + it('does not count peers without channels', () => { + manager.getState('peer1'); + manager.getState('peer2'); + + expect(manager.countActiveConnections()).toBe(0); + }); + + it('counts multiple active connections', () => { + const state1 = manager.getState('peer1'); + const state2 = manager.getState('peer2'); + manager.getState('peer3'); // peer3 has no channel + + state1.channel = { peerId: 'peer1' } as Channel; + state2.channel = { peerId: 'peer2' } as Channel; + + expect(manager.countActiveConnections()).toBe(2); + }); + }); + + describe('updateConnectionTime', () => { + it('updates connection time for peer', async () => { + // Use very short timeout for testing + const shortTimeoutManager = new PeerStateManager(mockLogger, 50); + shortTimeoutManager.getState('peer1'); + + // Wait a bit, then update connection time + await new Promise((resolve) => setTimeout(resolve, 30)); + shortTimeoutManager.updateConnectionTime('peer1'); + + // Wait a bit more - total time is ~60ms but we updated at ~30ms + // So the effective time since update is ~30ms, less than 50ms timeout + await new Promise((resolve) => setTimeout(resolve, 30)); + + // Peer should not be stale because we updated the connection time + expect(shortTimeoutManager.getStalePeers()).not.toContain('peer1'); + }); + }); + + describe('intentionally closed tracking', () => { + describe('isIntentionallyClosed', () => { + it('returns false for new peer', () => { + expect(manager.isIntentionallyClosed('peer1')).toBe(false); + }); + + it('returns true after marking as closed', () => { + manager.markIntentionallyClosed('peer1'); + + expect(manager.isIntentionallyClosed('peer1')).toBe(true); + }); + }); + + describe('markIntentionallyClosed', () => { + it('marks peer as intentionally closed', () => { + manager.markIntentionallyClosed('peer1'); + + expect(manager.isIntentionallyClosed('peer1')).toBe(true); + }); + + it('is idempotent', () => { + manager.markIntentionallyClosed('peer1'); + manager.markIntentionallyClosed('peer1'); + + expect(manager.isIntentionallyClosed('peer1')).toBe(true); + }); + + it('does not affect other peers', () => { + manager.markIntentionallyClosed('peer1'); + + expect(manager.isIntentionallyClosed('peer2')).toBe(false); + }); + }); + + describe('clearIntentionallyClosed', () => { + it('clears intentionally closed flag', () => { + manager.markIntentionallyClosed('peer1'); + manager.clearIntentionallyClosed('peer1'); + + expect(manager.isIntentionallyClosed('peer1')).toBe(false); + }); + + it('works on peer that was not marked', () => { + manager.clearIntentionallyClosed('peer1'); + + expect(manager.isIntentionallyClosed('peer1')).toBe(false); + }); + }); + }); + + describe('addLocationHints', () => { + it('adds hints to empty list', () => { + manager.addLocationHints('peer1', ['hint1', 'hint2']); + + const state = manager.getState('peer1'); + expect(state.locationHints).toStrictEqual(['hint1', 'hint2']); + }); + + it('merges hints with existing hints', () => { + manager.addLocationHints('peer1', ['hint1', 'hint2']); + manager.addLocationHints('peer1', ['hint2', 'hint3']); + + const state = manager.getState('peer1'); + expect(state.locationHints).toContain('hint1'); + 
expect(state.locationHints).toContain('hint2'); + expect(state.locationHints).toContain('hint3'); + expect(state.locationHints).toHaveLength(3); + }); + + it('deduplicates hints when merging', () => { + manager.addLocationHints('peer1', ['hint1']); + manager.addLocationHints('peer1', ['hint1', 'hint2']); + + const state = manager.getState('peer1'); + expect(state.locationHints).toHaveLength(2); + expect(state.locationHints).toContain('hint1'); + expect(state.locationHints).toContain('hint2'); + }); + + it('creates state if it does not exist', () => { + manager.addLocationHints('newpeer', ['hint1']); + + const state = manager.getState('newpeer'); + expect(state.locationHints).toStrictEqual(['hint1']); + }); + }); + + describe('getStalePeers', () => { + it('returns empty array when no peers', () => { + expect(manager.getStalePeers()).toStrictEqual([]); + }); + + it('returns empty array when peers have active channels', () => { + // Use a very short timeout for testing + const shortTimeoutManager = new PeerStateManager(mockLogger, 1); + const state = shortTimeoutManager.getState('peer1'); + state.channel = { peerId: 'peer1' } as Channel; + + // Even with a tiny timeout, peers with channels are not stale + expect(shortTimeoutManager.getStalePeers()).toStrictEqual([]); + }); + + it('returns stale peers without channels after timeout', async () => { + // Use a very short timeout (10ms) for testing + const shortTimeoutManager = new PeerStateManager(mockLogger, 10); + shortTimeoutManager.getState('peer1'); + + // Wait for timeout + await new Promise((resolve) => setTimeout(resolve, 20)); + + expect(shortTimeoutManager.getStalePeers()).toContain('peer1'); + }); + + it('does not return peers with recent activity', () => { + // Use 1 hour timeout (default) + manager.getState('peer1'); + + // Peer should not be stale immediately + expect(manager.getStalePeers()).not.toContain('peer1'); + }); + + it('respects custom stale timeout', async () => { + // Use a very short timeout (10ms) + const customManager = new PeerStateManager(mockLogger, 10); + customManager.getState('peer1'); + + // Wait for timeout + await new Promise((resolve) => setTimeout(resolve, 20)); + + expect(customManager.getStalePeers()).toContain('peer1'); + }); + }); + + describe('removePeer', () => { + it('removes peer state', () => { + const state = manager.getState('peer1'); + state.channel = { peerId: 'peer1' } as Channel; + + manager.removePeer('peer1'); + + // New call should create fresh state + const newState = manager.getState('peer1'); + expect(newState.channel).toBeUndefined(); + }); + + it('clears intentionally closed flag', () => { + manager.markIntentionallyClosed('peer1'); + + manager.removePeer('peer1'); + + expect(manager.isIntentionallyClosed('peer1')).toBe(false); + }); + + it('logs cleanup message', () => { + manager.getState('peer1'); + + manager.removePeer('peer1'); + + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining('Cleaning up stale peer peer1'), + ); + }); + + it('handles removal of non-existent peer', () => { + expect(() => manager.removePeer('nonexistent')).not.toThrow(); + }); + }); + + describe('getAllStates', () => { + it('returns empty iterator when no peers', () => { + const states = Array.from(manager.getAllStates()); + + expect(states).toStrictEqual([]); + }); + + it('returns all peer states', () => { + manager.getState('peer1'); + manager.getState('peer2'); + + const states = Array.from(manager.getAllStates()); + + expect(states).toHaveLength(2); + }); + + it('returns states with 
correct structure', () => {
+      const state = manager.getState('peer1');
+      state.channel = { peerId: 'peer1' } as Channel;
+      state.locationHints = ['hint1'];
+
+      const states = Array.from(manager.getAllStates());
+
+      expect(states[0]).toStrictEqual({
+        channel: { peerId: 'peer1' },
+        locationHints: ['hint1'],
+      });
+    });
+  });
+
+  describe('clear', () => {
+    it('removes all peer states', () => {
+      manager.getState('peer1');
+      manager.getState('peer2');
+
+      manager.clear();
+
+      expect(Array.from(manager.getAllStates())).toStrictEqual([]);
+    });
+
+    it('clears intentionally closed flags', () => {
+      manager.markIntentionallyClosed('peer1');
+
+      manager.clear();
+
+      expect(manager.isIntentionallyClosed('peer1')).toBe(false);
+    });
+
+    it('clears connection times', () => {
+      manager.getState('peer1');
+
+      manager.clear();
+
+      // After clear, getStalePeers should be empty because there are no peers
+      expect(manager.getStalePeers()).toStrictEqual([]);
+    });
+  });
+});
diff --git a/packages/ocap-kernel/src/remotes/platform/peer-state-manager.ts b/packages/ocap-kernel/src/remotes/platform/peer-state-manager.ts
new file mode 100644
index 000000000..8cf99520a
--- /dev/null
+++ b/packages/ocap-kernel/src/remotes/platform/peer-state-manager.ts
@@ -0,0 +1,191 @@
+import type { Logger } from '@metamask/logger';
+
+import { DEFAULT_STALE_PEER_TIMEOUT_MS } from './constants.ts';
+import type { Channel } from '../types.ts';
+
+/**
+ * Per-peer connection state.
+ */
+export type PeerState = {
+  channel: Channel | undefined;
+  locationHints: string[];
+};
+
+/**
+ * Manages peer connection state, tracking channels, location hints,
+ * connection times, and intentional closures.
+ */
+export class PeerStateManager {
+  readonly #peerStates = new Map<string, PeerState>();
+
+  readonly #lastConnectionTime = new Map<string, number>();
+
+  readonly #intentionallyClosed = new Set<string>();
+
+  readonly #logger: Logger;
+
+  readonly #stalePeerTimeoutMs: number;
+
+  /**
+   * Create a new PeerStateManager.
+   *
+   * @param logger - Logger instance for logging.
+   * @param stalePeerTimeoutMs - Timeout for stale peer cleanup.
+   */
+  constructor(
+    logger: Logger,
+    stalePeerTimeoutMs = DEFAULT_STALE_PEER_TIMEOUT_MS,
+  ) {
+    this.#logger = logger;
+    this.#stalePeerTimeoutMs = stalePeerTimeoutMs;
+  }
+
+  /**
+   * Get or create peer connection state.
+   *
+   * @param peerId - The peer ID.
+   * @returns The peer connection state.
+   */
+  getState(peerId: string): PeerState {
+    let state = this.#peerStates.get(peerId);
+    if (!state) {
+      state = { channel: undefined, locationHints: [] };
+      this.#peerStates.set(peerId, state);
+      // Initialize lastConnectionTime to enable stale peer cleanup
+      // even for peers that never successfully connect
+      if (!this.#lastConnectionTime.has(peerId)) {
+        this.#lastConnectionTime.set(peerId, Date.now());
+      }
+    }
+    return state;
+  }
+
+  /**
+   * Count the number of active connections (peers with channels).
+   *
+   * @returns The number of active connections.
+   */
+  countActiveConnections(): number {
+    let count = 0;
+    for (const state of this.#peerStates.values()) {
+      if (state.channel) {
+        count += 1;
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Update the last connection time for a peer.
+   *
+   * @param peerId - The peer ID.
+   */
+  updateConnectionTime(peerId: string): void {
+    this.#lastConnectionTime.set(peerId, Date.now());
+  }
+
+  /**
+   * Check if a peer was intentionally closed.
+   *
+   * @param peerId - The peer ID.
+   * @returns True if the peer was intentionally closed.
+   */
+  isIntentionallyClosed(peerId: string): boolean {
+    return this.#intentionallyClosed.has(peerId);
+  }
+
+  /**
+   * Mark a peer as intentionally closed.
+   *
+   * @param peerId - The peer ID.
+   */
+  markIntentionallyClosed(peerId: string): void {
+    this.#intentionallyClosed.add(peerId);
+  }
+
+  /**
+   * Clear the intentional close flag for a peer.
+   *
+   * @param peerId - The peer ID.
+   */
+  clearIntentionallyClosed(peerId: string): void {
+    this.#intentionallyClosed.delete(peerId);
+  }
+
+  /**
+   * Register location hints for a peer.
+   *
+   * @param peerId - The peer ID.
+   * @param hints - Location hints to add.
+   */
+  addLocationHints(peerId: string, hints: string[]): void {
+    const state = this.getState(peerId);
+    const { locationHints: oldHints } = state;
+    if (oldHints.length > 0) {
+      const newHints = new Set(oldHints);
+      for (const hint of hints) {
+        newHints.add(hint);
+      }
+      state.locationHints = Array.from(newHints);
+    } else {
+      state.locationHints = Array.from(hints);
+    }
+  }
+
+  /**
+   * Get stale peers that should be cleaned up.
+   * A peer is considered stale if:
+   * - It has no active channel
+   * - It has been inactive for more than stalePeerTimeoutMs
+   *
+   * @returns Array of peer IDs that are stale.
+   */
+  getStalePeers(): string[] {
+    const now = Date.now();
+    const stalePeers: string[] = [];
+
+    for (const [peerId, lastTime] of this.#lastConnectionTime.entries()) {
+      const state = this.#peerStates.get(peerId);
+      const timeSinceLastActivity = now - lastTime;
+
+      if (!state?.channel && timeSinceLastActivity > this.#stalePeerTimeoutMs) {
+        stalePeers.push(peerId);
+      }
+    }
+
+    return stalePeers;
+  }
+
+  /**
+   * Remove a peer from all tracking state.
+   *
+   * @param peerId - The peer ID to remove.
+   */
+  removePeer(peerId: string): void {
+    const lastTime = this.#lastConnectionTime.get(peerId);
+    this.#logger.log(
+      `Cleaning up stale peer ${peerId} (inactive for ${lastTime ? Date.now() - lastTime : 'unknown'}ms)`,
+    );
+    this.#peerStates.delete(peerId);
+    this.#intentionallyClosed.delete(peerId);
+    this.#lastConnectionTime.delete(peerId);
+  }
+
+  /**
+   * Get all peer states for iteration.
+   *
+   * @returns Iterator over all peer states.
+   */
+  getAllStates(): IterableIterator<PeerState> {
+    return this.#peerStates.values();
+  }
+
+  /**
+   * Clear all state. Called during stop().
+ */ + clear(): void { + this.#peerStates.clear(); + this.#intentionallyClosed.clear(); + this.#lastConnectionTime.clear(); + } +} diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts new file mode 100644 index 000000000..5c45729e2 --- /dev/null +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts @@ -0,0 +1,371 @@ +import * as kernelErrors from '@metamask/kernel-errors'; +import * as kernelUtils from '@metamask/kernel-utils'; +import { describe, it, expect, beforeEach, vi } from 'vitest'; + +import { makeReconnectionLifecycle } from './reconnection-lifecycle.ts'; +import type { ReconnectionLifecycleDeps } from './reconnection-lifecycle.ts'; +import type { Channel } from '../types.ts'; + +// Mock kernel-utils for abortableDelay +vi.mock('@metamask/kernel-utils', async () => { + const actual = await vi.importActual( + '@metamask/kernel-utils', + ); + return { + ...actual, + abortableDelay: vi.fn(), + }; +}); + +// Mock kernel-errors for isRetryableNetworkError +vi.mock('@metamask/kernel-errors', async () => { + const actual = await vi.importActual( + '@metamask/kernel-errors', + ); + return { + ...actual, + isRetryableNetworkError: vi.fn(), + }; +}); + +describe('reconnection-lifecycle', () => { + let deps: ReconnectionLifecycleDeps; + let abortController: AbortController; + let mockChannel: Channel; + + beforeEach(() => { + vi.clearAllMocks(); + + // Set up default mock behaviors + (kernelUtils.abortableDelay as ReturnType).mockResolvedValue( + undefined, + ); + ( + kernelErrors.isRetryableNetworkError as ReturnType + ).mockReturnValue(true); + + abortController = new AbortController(); + mockChannel = { + peerId: 'testPeer', + msgStream: { + write: vi.fn(), + read: vi.fn(), + unwrap: vi.fn(), + }, + } as unknown as Channel; + + deps = { + logger: { log: vi.fn() }, + outputError: vi.fn(), + signal: abortController.signal, + peerStateManager: { + getState: vi.fn().mockReturnValue({ + channel: undefined, + locationHints: ['hint1'], + }), + isIntentionallyClosed: vi.fn().mockReturnValue(false), + }, + reconnectionManager: { + isReconnecting: vi.fn().mockReturnValue(true), + shouldRetry: vi.fn().mockReturnValue(true), + incrementAttempt: vi.fn().mockReturnValue(1), + calculateBackoff: vi.fn().mockReturnValue(100), + startReconnection: vi.fn(), + stopReconnection: vi.fn(), + resetBackoff: vi.fn(), + }, + maxRetryAttempts: 3, + onRemoteGiveUp: vi.fn(), + dialPeer: vi.fn().mockResolvedValue(mockChannel), + reuseOrReturnChannel: vi.fn().mockResolvedValue(mockChannel), + checkConnectionLimit: vi.fn(), + registerChannel: vi.fn(), + } as unknown as ReconnectionLifecycleDeps; + }); + + describe('makeReconnectionLifecycle', () => { + it('returns handleConnectionLoss and attemptReconnection functions', () => { + const lifecycle = makeReconnectionLifecycle(deps); + + expect(typeof lifecycle.handleConnectionLoss).toBe('function'); + expect(typeof lifecycle.attemptReconnection).toBe('function'); + }); + }); + + describe('handleConnectionLoss', () => { + it('skips reconnection for intentionally closed peers', () => { + ( + deps.peerStateManager.isIntentionallyClosed as ReturnType + ).mockReturnValue(true); + const lifecycle = makeReconnectionLifecycle(deps); + + lifecycle.handleConnectionLoss('peer1'); + + expect(deps.reconnectionManager.startReconnection).not.toHaveBeenCalled(); + expect(deps.logger.log).toHaveBeenCalledWith( + expect.stringContaining('intentionally 
closed'), + ); + }); + + it('clears channel on connection loss', () => { + const state = { channel: mockChannel, locationHints: [] }; + ( + deps.peerStateManager.getState as ReturnType + ).mockReturnValue(state); + const lifecycle = makeReconnectionLifecycle(deps); + + lifecycle.handleConnectionLoss('peer1'); + + expect(state.channel).toBeUndefined(); + }); + + it('starts reconnection if not already reconnecting', () => { + ( + deps.reconnectionManager.isReconnecting as ReturnType + ).mockReturnValue(false); + const lifecycle = makeReconnectionLifecycle(deps); + + lifecycle.handleConnectionLoss('peer1'); + + expect(deps.reconnectionManager.startReconnection).toHaveBeenCalledWith( + 'peer1', + ); + }); + + it('does not start reconnection if already reconnecting', () => { + ( + deps.reconnectionManager.isReconnecting as ReturnType + ).mockReturnValue(true); + const lifecycle = makeReconnectionLifecycle(deps); + + lifecycle.handleConnectionLoss('peer1'); + + expect(deps.reconnectionManager.startReconnection).not.toHaveBeenCalled(); + }); + + it('logs connection loss message', () => { + ( + deps.reconnectionManager.isReconnecting as ReturnType + ).mockReturnValue(false); + const lifecycle = makeReconnectionLifecycle(deps); + + lifecycle.handleConnectionLoss('peer1'); + + expect(deps.logger.log).toHaveBeenCalledWith( + 'peer1:: connection lost, initiating reconnection', + ); + }); + }); + + describe('attemptReconnection', () => { + it('stops when max attempts reached', async () => { + ( + deps.reconnectionManager.shouldRetry as ReturnType + ).mockReturnValue(false); + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + expect(deps.reconnectionManager.stopReconnection).toHaveBeenCalledWith( + 'peer1', + ); + expect(deps.onRemoteGiveUp).toHaveBeenCalledWith('peer1'); + }); + + it('dials peer with location hints', async () => { + // Make it succeed on first attempt + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + expect(deps.dialPeer).toHaveBeenCalledWith('peer1', ['hint1']); + }); + + it('registers channel on successful reconnection', async () => { + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + expect(deps.registerChannel).toHaveBeenCalledWith( + 'peer1', + mockChannel, + 'reading channel to', + ); + }); + + it('resets backoff on successful reconnection', async () => { + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + expect(deps.reconnectionManager.resetBackoff).toHaveBeenCalledWith( + 'peer1', + ); + expect(deps.reconnectionManager.stopReconnection).toHaveBeenCalledWith( + 'peer1', + ); + }); + + it('stops when signal is aborted during delay', async () => { + ( + kernelUtils.abortableDelay as ReturnType + ).mockRejectedValue(new Error('Aborted')); + abortController.abort(); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + expect(deps.reconnectionManager.stopReconnection).toHaveBeenCalledWith( + 'peer1', + ); + }); + + it('stops when signal is aborted during 
dial', async () => { + (deps.dialPeer as ReturnType).mockRejectedValue( + new Error('Connection failed'), + ); + abortController.abort(); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + expect(deps.reconnectionManager.stopReconnection).toHaveBeenCalled(); + }); + + it('gives up on non-retryable errors', async () => { + (deps.dialPeer as ReturnType).mockRejectedValueOnce( + new Error('Auth failed'), + ); + ( + kernelErrors.isRetryableNetworkError as ReturnType + ).mockReturnValue(false); + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + expect(deps.reconnectionManager.stopReconnection).toHaveBeenCalledWith( + 'peer1', + ); + expect(deps.onRemoteGiveUp).toHaveBeenCalledWith('peer1'); + expect(deps.outputError).toHaveBeenCalledWith( + 'peer1', + 'non-retryable failure', + expect.any(Error), + ); + }); + + it('retries on retryable errors', async () => { + (deps.dialPeer as ReturnType) + .mockRejectedValueOnce(new Error('Temporary failure')) + .mockResolvedValueOnce(mockChannel); + + ( + kernelErrors.isRetryableNetworkError as ReturnType + ).mockReturnValue(true); + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) // First attempt check + .mockReturnValueOnce(true) // Second attempt check + .mockReturnValueOnce(false); // Exit loop + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + expect(deps.dialPeer).toHaveBeenCalledTimes(2); + }); + + it('continues retry loop when reuseOrReturnChannel returns null', async () => { + (deps.reuseOrReturnChannel as ReturnType) + .mockResolvedValueOnce(null) + .mockResolvedValueOnce(mockChannel); + + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) // First attempt + .mockReturnValueOnce(true) // After null channel + .mockReturnValueOnce(false); // After success + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + expect(deps.reuseOrReturnChannel).toHaveBeenCalledTimes(2); + }); + + it('checks connection limit before registering', async () => { + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + expect(deps.checkConnectionLimit).toHaveBeenCalled(); + }); + + it('logs reconnection attempts', async () => { + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + expect(deps.logger.log).toHaveBeenCalledWith( + expect.stringContaining('scheduling reconnection attempt'), + ); + expect(deps.logger.log).toHaveBeenCalledWith( + expect.stringContaining('reconnection successful'), + ); + }); + + it('uses default max retry attempts when not specified', async () => { + deps.maxRetryAttempts = undefined; + ( + deps.reconnectionManager.shouldRetry as ReturnType + ).mockReturnValue(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + // Should use DEFAULT_MAX_RETRY_ATTEMPTS (0 = infinite) + expect(deps.reconnectionManager.shouldRetry).toHaveBeenCalledWith( + 
'peer1',
+        0,
+      );
+    });
+
+    it('cleans up reconnection state when loop exits naturally', async () => {
+      (deps.reconnectionManager.isReconnecting as ReturnType<typeof vi.fn>)
+        .mockReturnValueOnce(true)
+        .mockReturnValueOnce(false);
+
+      const lifecycle = makeReconnectionLifecycle(deps);
+
+      await lifecycle.attemptReconnection('peer1');
+
+      // stopReconnection should be called on success
+      expect(deps.reconnectionManager.stopReconnection).toHaveBeenCalled();
+    });
+  });
+});
diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts
new file mode 100644
index 000000000..5cc950f52
--- /dev/null
+++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts
@@ -0,0 +1,200 @@
+import { isRetryableNetworkError } from '@metamask/kernel-errors';
+import {
+  abortableDelay,
+  DEFAULT_MAX_RETRY_ATTEMPTS,
+} from '@metamask/kernel-utils';
+import type { Logger } from '@metamask/logger';
+
+import type { ErrorLogger } from './channel-utils.ts';
+import type { PeerStateManager, PeerState } from './peer-state-manager.ts';
+import type { ReconnectionManager } from './reconnection.ts';
+import type { Channel, OnRemoteGiveUp } from '../types.ts';
+
+/**
+ * Dependencies for creating a reconnection lifecycle handler.
+ */
+export type ReconnectionLifecycleDeps = {
+  logger: Logger;
+  outputError: ErrorLogger;
+  signal: AbortSignal;
+  peerStateManager: PeerStateManager;
+  reconnectionManager: ReconnectionManager;
+  maxRetryAttempts: number | undefined;
+  onRemoteGiveUp: OnRemoteGiveUp | undefined;
+  dialPeer: (peerId: string, hints: string[]) => Promise<Channel>;
+  reuseOrReturnChannel: (
+    peerId: string,
+    dialedChannel: Channel,
+  ) => Promise<Channel | null>;
+  checkConnectionLimit: () => void;
+  registerChannel: (
+    peerId: string,
+    channel: Channel,
+    errorContext?: string,
+  ) => void;
+};
+
+/**
+ * Result of creating a reconnection lifecycle handler.
+ */
+export type ReconnectionLifecycle = {
+  handleConnectionLoss: (peerId: string) => void;
+  attemptReconnection: (peerId: string, maxAttempts?: number) => Promise<void>;
+};
+
+/**
+ * Creates a reconnection lifecycle handler for managing connection loss and reconnection attempts.
+ *
+ * @param deps - Dependencies for the reconnection lifecycle.
+ * @returns Functions for handling connection loss and reconnection.
+ */
+export function makeReconnectionLifecycle(
+  deps: ReconnectionLifecycleDeps,
+): ReconnectionLifecycle {
+  const {
+    logger,
+    outputError,
+    signal,
+    peerStateManager,
+    reconnectionManager,
+    maxRetryAttempts,
+    onRemoteGiveUp,
+    dialPeer,
+    reuseOrReturnChannel,
+    checkConnectionLimit,
+    registerChannel,
+  } = deps;
+
+  /**
+   * Attempt to reconnect to a peer after connection loss.
+   * Single orchestration loop per peer; abortable.
+   *
+   * @param peerId - The peer ID to reconnect to.
+   * @param maxAttempts - The maximum number of reconnection attempts. 0 = infinite.
+   */
+  async function attemptReconnection(
+    peerId: string,
+    maxAttempts = maxRetryAttempts ?? DEFAULT_MAX_RETRY_ATTEMPTS,
+  ): Promise<void> {
+    const state = peerStateManager.getState(peerId);
+
+    while (reconnectionManager.isReconnecting(peerId) && !signal.aborted) {
+      if (!reconnectionManager.shouldRetry(peerId, maxAttempts)) {
+        logger.log(
+          `${peerId}:: max reconnection attempts (${maxAttempts}) reached, giving up`,
+        );
+        reconnectionManager.stopReconnection(peerId);
+        onRemoteGiveUp?.(peerId);
+        return;
+      }
+
+      const nextAttempt = reconnectionManager.incrementAttempt(peerId);
+      const delayMs = reconnectionManager.calculateBackoff(peerId);
+      logger.log(
+        `${peerId}:: scheduling reconnection attempt ${nextAttempt}${maxAttempts ? `/${maxAttempts}` : ''} in ${delayMs}ms`,
+      );
+
+      try {
+        await abortableDelay(delayMs, signal);
+      } catch (error) {
+        if (signal.aborted) {
+          reconnectionManager.stopReconnection(peerId);
+          return;
+        }
+        throw error;
+      }
+
+      logger.log(
+        `${peerId}:: reconnection attempt ${nextAttempt}${maxAttempts ? `/${maxAttempts}` : ''}`,
+      );
+
+      try {
+        const channel = await tryReconnect(state, peerId);
+        if (!channel) {
+          // Channel was closed and existing also died - continue retry loop
+          continue;
+        }
+
+        logger.log(`${peerId}:: reconnection successful`);
+        reconnectionManager.resetBackoff(peerId);
+        reconnectionManager.stopReconnection(peerId);
+        return; // success
+      } catch (problem) {
+        if (signal.aborted) {
+          reconnectionManager.stopReconnection(peerId);
+          return;
+        }
+        if (!isRetryableNetworkError(problem)) {
+          outputError(peerId, `non-retryable failure`, problem);
+          reconnectionManager.stopReconnection(peerId);
+          onRemoteGiveUp?.(peerId);
+          return;
+        }
+        outputError(peerId, `reconnection attempt ${nextAttempt}`, problem);
+        // loop to next attempt
+      }
+    }
+    // Loop exited - clean up reconnection state
+    if (reconnectionManager.isReconnecting(peerId)) {
+      reconnectionManager.stopReconnection(peerId);
+    }
+  }
+
+  /**
+   * Try to reconnect to a peer.
+   *
+   * @param state - The peer state.
+   * @param peerId - The peer ID.
+   * @returns The channel if successful, null if should retry.
+   */
+  async function tryReconnect(
+    state: PeerState,
+    peerId: string,
+  ): Promise<Channel | null> {
+    const { locationHints: hints } = state;
+    const dialedChannel = await dialPeer(peerId, hints);
+
+    // Handle race condition - check if an existing channel appeared
+    const channel = await reuseOrReturnChannel(peerId, dialedChannel);
+    if (!channel) {
+      return null;
+    }
+
+    // Re-check connection limit and register if this is a new channel
+    if (state.channel !== channel) {
+      checkConnectionLimit();
+      registerChannel(peerId, channel, 'reading channel to');
+    }
+
+    return channel;
+  }
+
+  /**
+   * Handle connection loss for a given peer ID.
+   * Skips reconnection if the peer was intentionally closed.
+   *
+   * @param peerId - The peer ID to handle the connection loss for.
+ */ + function handleConnectionLoss(peerId: string): void { + // Don't reconnect if this peer intentionally closed the connection + if (peerStateManager.isIntentionallyClosed(peerId)) { + logger.log( + `${peerId}:: connection lost but peer intentionally closed, skipping reconnection`, + ); + return; + } + logger.log(`${peerId}:: connection lost, initiating reconnection`); + const state = peerStateManager.getState(peerId); + state.channel = undefined; + + if (!reconnectionManager.isReconnecting(peerId)) { + reconnectionManager.startReconnection(peerId); + attemptReconnection(peerId).catch((problem) => { + outputError(peerId, 'reconnection error', problem); + reconnectionManager.stopReconnection(peerId); + }); + } + } + + return { handleConnectionLoss, attemptReconnection }; +} diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index 3a70d7060..dfc4e8282 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -1,18 +1,25 @@ -import { - AbortError, - isRetryableNetworkError, - ResourceLimitError, -} from '@metamask/kernel-errors'; -import { - abortableDelay, - DEFAULT_MAX_RETRY_ATTEMPTS, - installWakeDetector, -} from '@metamask/kernel-utils'; +import { AbortError, ResourceLimitError } from '@metamask/kernel-errors'; +import { installWakeDetector } from '@metamask/kernel-utils'; import { Logger } from '@metamask/logger'; import { toString as bufToString, fromString } from 'uint8arrays'; +import { makeErrorLogger, writeWithTimeout } from './channel-utils.ts'; import { ConnectionFactory } from './connection-factory.ts'; +import { + DEFAULT_CLEANUP_INTERVAL_MS, + DEFAULT_MAX_CONCURRENT_CONNECTIONS, + DEFAULT_MAX_MESSAGE_SIZE_BYTES, + DEFAULT_STALE_PEER_TIMEOUT_MS, + DEFAULT_WRITE_TIMEOUT_MS, + SCTP_USER_INITIATED_ABORT, +} from './constants.ts'; +import { PeerStateManager } from './peer-state-manager.ts'; +import { makeReconnectionLifecycle } from './reconnection-lifecycle.ts'; import { ReconnectionManager } from './reconnection.ts'; +import { + makeConnectionLimitChecker, + makeMessageSizeValidator, +} from './validators.ts'; import type { RemoteMessageHandler, SendRemoteMessage, @@ -22,18 +29,6 @@ import type { RemoteCommsOptions, } from '../types.ts'; -/** Default maximum number of concurrent connections */ -const DEFAULT_MAX_CONCURRENT_CONNECTIONS = 100; - -/** Default maximum message size in bytes (1MB) */ -const DEFAULT_MAX_MESSAGE_SIZE_BYTES = 1024 * 1024; - -/** Default stale peer cleanup interval in milliseconds (15 minutes) */ -const DEFAULT_CLEANUP_INTERVAL_MS = 15 * 60 * 1000; - -/** Default stale peer timeout in milliseconds (1 hour) */ -const DEFAULT_STALE_PEER_TIMEOUT_MS = 60 * 60 * 1000; - /** * Initialize the remote comm system with information that must be provided by the kernel. 
* @@ -75,11 +70,26 @@ export async function initTransport( const stopController = new AbortController(); const { signal } = stopController; const logger = new Logger(); + const outputError = makeErrorLogger(logger); const reconnectionManager = new ReconnectionManager(); - const intentionallyClosed = new Set(); // Peers that intentionally closed connections - const lastConnectionTime = new Map(); // Track last connection time for cleanup - const messageEncoder = new TextEncoder(); // Reused for message size validation + const peerStateManager = new PeerStateManager(logger, stalePeerTimeoutMs); + const validateMessageSize = makeMessageSizeValidator(maxMessageSizeBytes); + const checkConnectionLimit = makeConnectionLimitChecker( + maxConcurrentConnections, + () => peerStateManager.countActiveConnections(), + ); let cleanupIntervalId: ReturnType | undefined; + // Holder for handleConnectionLoss - initialized later after all dependencies are defined + // This breaks the circular dependency between readChannel → handleConnectionLoss → registerChannel + const reconnectionHolder: { + handleConnectionLoss: ((peerId: string) => void) | undefined; + } = { handleConnectionLoss: undefined }; + const handleConnectionLoss = (peerId: string): void => { + if (!reconnectionHolder.handleConnectionLoss) { + throw new Error('handleConnectionLoss not initialized'); + } + reconnectionHolder.handleConnectionLoss(peerId); + }; const connectionFactory = await ConnectionFactory.make( keySeed, relays, @@ -88,91 +98,6 @@ export async function initTransport( maxRetryAttempts, ); - // Per-peer connection state (simplified - just channel and hints) - type SimplePeerState = { - channel: Channel | undefined; - locationHints: string[]; - }; - const peerStates = new Map(); - - /** - * Get or create peer connection state. - * - * @param peerId - The peer ID. - * @returns The peer connection state. - */ - function getPeerState(peerId: string): SimplePeerState { - let state = peerStates.get(peerId); - if (!state) { - state = { channel: undefined, locationHints: [] }; - peerStates.set(peerId, state); - // Initialize lastConnectionTime to enable stale peer cleanup - // even for peers that never successfully connect - if (!lastConnectionTime.has(peerId)) { - lastConnectionTime.set(peerId, Date.now()); - } - } - return state; - } - - /** - * Count the number of active connections (peers with channels). - * - * @returns The number of active connections. - */ - function countActiveConnections(): number { - let count = 0; - for (const state of peerStates.values()) { - if (state.channel) { - count += 1; - } - } - return count; - } - - /** - * Validate that a message does not exceed the size limit. - * - * @param message - The message to validate. - * @throws ResourceLimitError if message exceeds size limit. - */ - function validateMessageSize(message: string): void { - const messageSizeBytes = messageEncoder.encode(message).length; - if (messageSizeBytes > maxMessageSizeBytes) { - throw new ResourceLimitError( - `Message size ${messageSizeBytes} bytes exceeds limit of ${maxMessageSizeBytes} bytes`, - { - data: { - limitType: 'messageSize', - current: messageSizeBytes, - limit: maxMessageSizeBytes, - }, - }, - ); - } - } - - /** - * Check if we can establish a new connection (within connection limit). - * - * @throws ResourceLimitError if connection limit is reached. 
- */ - function checkConnectionLimit(): void { - const currentConnections = countActiveConnections(); - if (currentConnections >= maxConcurrentConnections) { - throw new ResourceLimitError( - `Connection limit reached: ${currentConnections}/${maxConcurrentConnections} concurrent connections`, - { - data: { - limitType: 'connection', - current: currentConnections, - limit: maxConcurrentConnections, - }, - }, - ); - } - } - /** * Clean up stale peer data for peers inactive for more than stalePeerTimeoutMs. * A peer is considered stale if: @@ -180,84 +105,10 @@ export async function initTransport( * - It has been inactive for more than stalePeerTimeoutMs */ function cleanupStalePeers(): void { - const now = Date.now(); - const peersToCleanup: string[] = []; - - for (const [peerId, lastTime] of lastConnectionTime.entries()) { - const state = peerStates.get(peerId); - const timeSinceLastActivity = now - lastTime; - - // Only clean up peers that: - // - Have no active channel - // - Inactive for more than stalePeerTimeoutMs - if (!state?.channel && timeSinceLastActivity > stalePeerTimeoutMs) { - peersToCleanup.push(peerId); - } - } - - for (const peerId of peersToCleanup) { - const lastTime = lastConnectionTime.get(peerId); - logger.log( - `Cleaning up stale peer ${peerId} (inactive for ${lastTime ? Date.now() - lastTime : 'unknown'}ms)`, - ); - // Clean up all peer-related state - peerStates.delete(peerId); + const stalePeers = peerStateManager.getStalePeers(); + for (const peerId of stalePeers) { + peerStateManager.removePeer(peerId); reconnectionManager.stopReconnection(peerId); - intentionallyClosed.delete(peerId); - lastConnectionTime.delete(peerId); - } - } - - /** - * Output an error message. - * - * @param peerId - The peer ID that the error is associated with. - * @param task - The task that the error is associated with. - * @param problem - The error itself. - */ - function outputError(peerId: string, task: string, problem: unknown): void { - if (problem) { - const realProblem: Error = problem as Error; - logger.log(`${peerId}:: error ${task}: ${realProblem}`); - } else { - logger.log(`${peerId}:: error ${task}`); - } - } - - /** - * Write a message to a channel stream with a timeout. - * - * @param channel - The channel to write to. - * @param message - The message bytes to write. - * @param timeoutMs - Timeout in milliseconds (default: 10 seconds). - * @returns Promise that resolves when the write completes or rejects on timeout. - * @throws Error if the write times out or fails. 
- */ - async function writeWithTimeout( - channel: Channel, - message: Uint8Array, - timeoutMs = 10_000, - ): Promise { - const timeoutSignal = AbortSignal.timeout(timeoutMs); - let abortHandler: (() => void) | undefined; - const timeoutPromise = new Promise((_resolve, reject) => { - abortHandler = () => { - reject(Error(`Message send timed out after ${timeoutMs}ms`)); - }; - timeoutSignal.addEventListener('abort', abortHandler); - }); - - try { - return await Promise.race([ - channel.msgStream.write(message), - timeoutPromise, - ]); - } finally { - // Clean up event listener to prevent unhandled rejection if operation - // completes before timeout - if (abortHandler) { - timeoutSignal.removeEventListener('abort', abortHandler); - } } } @@ -274,10 +125,10 @@ export async function initTransport( channel: Channel, errorContext = 'reading channel to', ): void { - const state = getPeerState(peerId); + const state = peerStateManager.getState(peerId); const previousChannel = state.channel; state.channel = channel; - lastConnectionTime.set(peerId, Date.now()); + peerStateManager.updateConnectionTime(peerId); readChannel(channel).catch((problem) => { outputError(peerId, errorContext, problem); }); @@ -310,7 +161,7 @@ export async function initTransport( peerId: string, dialedChannel: Channel, ): Promise { - const state = getPeerState(peerId); + const state = peerStateManager.getState(peerId); const existingChannel = state.channel; if (existingChannel) { // Close the dialed channel if it's different from the existing one @@ -379,7 +230,6 @@ export async function initTransport( * @param channel - The channel to read from. */ async function readChannel(channel: Channel): Promise { - const SCTP_USER_INITIATED_ABORT = 12; // RFC 4960 try { for (;;) { if (signal.aborted) { @@ -401,7 +251,7 @@ export async function initTransport( ) { logger.log(`${channel.peerId}:: remote intentionally disconnected`); // Mark as intentionally closed and don't trigger reconnection - intentionallyClosed.add(channel.peerId); + peerStateManager.markIntentionallyClosed(channel.peerId); } else { outputError( channel.peerId, @@ -416,7 +266,7 @@ export async function initTransport( } if (readBuf) { reconnectionManager.resetBackoff(channel.peerId); // successful inbound traffic - lastConnectionTime.set(channel.peerId, Date.now()); + peerStateManager.updateConnectionTime(channel.peerId); await receiveMessage(channel.peerId, bufToString(readBuf.subarray())); } else { // Stream ended (returned undefined), exit the read loop @@ -427,134 +277,30 @@ export async function initTransport( } finally { // Always remove the channel when readChannel exits to prevent stale channels // This ensures that subsequent sends will establish a new connection - const state = getPeerState(channel.peerId); + const state = peerStateManager.getState(channel.peerId); if (state.channel === channel) { state.channel = undefined; } } } - /** - * Handle connection loss for a given peer ID. - * Skips reconnection if the peer was intentionally closed. - * - * @param peerId - The peer ID to handle the connection loss for. 
- */ - function handleConnectionLoss(peerId: string): void { - // Don't reconnect if this peer intentionally closed the connection - if (intentionallyClosed.has(peerId)) { - logger.log( - `${peerId}:: connection lost but peer intentionally closed, skipping reconnection`, - ); - return; - } - logger.log(`${peerId}:: connection lost, initiating reconnection`); - const state = getPeerState(peerId); - state.channel = undefined; - - if (!reconnectionManager.isReconnecting(peerId)) { - reconnectionManager.startReconnection(peerId); - attemptReconnection(peerId).catch((problem) => { - outputError(peerId, 'reconnection error', problem); - reconnectionManager.stopReconnection(peerId); - }); - } - } - - /** - * Attempt to reconnect to a peer after connection loss. - * Single orchestration loop per peer; abortable. - * - * @param peerId - The peer ID to reconnect to. - * @param maxAttempts - The maximum number of reconnection attempts. 0 = infinite. - */ - async function attemptReconnection( - peerId: string, - maxAttempts = maxRetryAttempts ?? DEFAULT_MAX_RETRY_ATTEMPTS, - ): Promise { - const state = getPeerState(peerId); - - while (reconnectionManager.isReconnecting(peerId) && !signal.aborted) { - if (!reconnectionManager.shouldRetry(peerId, maxAttempts)) { - logger.log( - `${peerId}:: max reconnection attempts (${maxAttempts}) reached, giving up`, - ); - reconnectionManager.stopReconnection(peerId); - onRemoteGiveUp?.(peerId); - return; - } - - const nextAttempt = reconnectionManager.incrementAttempt(peerId); - const delayMs = reconnectionManager.calculateBackoff(peerId); - logger.log( - `${peerId}:: scheduling reconnection attempt ${nextAttempt}${maxAttempts ? `/${maxAttempts}` : ''} in ${delayMs}ms`, - ); - - try { - await abortableDelay(delayMs, signal); - } catch (error) { - if (signal.aborted) { - reconnectionManager.stopReconnection(peerId); - return; - } - throw error; - } - - logger.log( - `${peerId}:: reconnection attempt ${nextAttempt}${maxAttempts ? 
`/${maxAttempts}` : ''}`, - ); - - try { - const { locationHints: hints } = state; - const dialedChannel = await connectionFactory.dialIdempotent( - peerId, - hints, - false, // No retry here, we're already in a retry loop - ); - - // Handle race condition - check if an existing channel appeared - const channel = await reuseOrReturnChannel(peerId, dialedChannel); - if (!channel) { - // Channel was closed and existing also died - continue retry loop - continue; - } - - // Re-check connection limit after reuseOrReturnChannel to prevent race conditions - if (state.channel !== channel) { - checkConnectionLimit(); - } - - // Only register if this is a new channel (not reusing existing) - if (state.channel !== channel) { - registerChannel(peerId, channel, 'reading channel to'); - } - - logger.log(`${peerId}:: reconnection successful`); - - // Connection established - RemoteHandle will retransmit unACKed messages - reconnectionManager.resetBackoff(peerId); - reconnectionManager.stopReconnection(peerId); - return; // success - } catch (problem) { - if (signal.aborted) { - reconnectionManager.stopReconnection(peerId); - return; - } - if (!isRetryableNetworkError(problem)) { - outputError(peerId, `non-retryable failure`, problem); - reconnectionManager.stopReconnection(peerId); - onRemoteGiveUp?.(peerId); - return; - } - outputError(peerId, `reconnection attempt ${nextAttempt}`, problem); - // loop to next attempt - } - } - // Loop exited - clean up reconnection state - if (reconnectionManager.isReconnecting(peerId)) { - reconnectionManager.stopReconnection(peerId); - } - } + // Initialize reconnection lifecycle and bind to the holder + const reconnectionLifecycle = makeReconnectionLifecycle({ + logger, + outputError, + signal, + peerStateManager, + reconnectionManager, + maxRetryAttempts, + onRemoteGiveUp, + dialPeer: async (peerId, hints) => + connectionFactory.dialIdempotent(peerId, hints, false), + reuseOrReturnChannel, + checkConnectionLimit, + registerChannel, + }); + reconnectionHolder.handleConnectionLoss = + reconnectionLifecycle.handleConnectionLoss; /** * Send a message string to a peer. 
@@ -573,14 +319,14 @@ export async function initTransport( } // Check if peer is intentionally closed - if (intentionallyClosed.has(targetPeerId)) { + if (peerStateManager.isIntentionallyClosed(targetPeerId)) { throw Error('Message delivery failed after intentional close'); } // Validate message size before sending validateMessageSize(message); - const state = getPeerState(targetPeerId); + const state = peerStateManager.getState(targetPeerId); // Get or establish channel let { channel } = state; @@ -624,8 +370,12 @@ export async function initTransport( } try { - await writeWithTimeout(channel, fromString(message), 10_000); - lastConnectionTime.set(targetPeerId, Date.now()); + await writeWithTimeout( + channel, + fromString(message), + DEFAULT_WRITE_TIMEOUT_MS, + ); + peerStateManager.updateConnectionTime(targetPeerId); reconnectionManager.resetBackoff(targetPeerId); } catch (problem) { outputError(targetPeerId, `sending message`, problem); @@ -645,7 +395,7 @@ export async function initTransport( // Set up inbound connection handler connectionFactory.onInboundConnection((channel) => { // Reject inbound connections from intentionally closed peers - if (intentionallyClosed.has(channel.peerId)) { + if (peerStateManager.isIntentionallyClosed(channel.peerId)) { logger.log( `${channel.peerId}:: rejecting inbound connection from intentionally closed peer`, ); @@ -685,8 +435,8 @@ export async function initTransport( */ async function closeConnection(peerId: string): Promise { logger.log(`${peerId}:: explicitly closing connection`); - intentionallyClosed.add(peerId); - const state = getPeerState(peerId); + peerStateManager.markIntentionallyClosed(peerId); + const state = peerStateManager.getState(peerId); // Remove channel - the readChannel cleanup will handle stream closure state.channel = undefined; if (reconnectionManager.isReconnecting(peerId)) { @@ -701,17 +451,7 @@ export async function initTransport( * @param hints - Location hints for the peer. 
@@ -701,17 +451,7 @@ export async function initTransport(
    * @param hints - Location hints for the peer.
    */
   function registerLocationHints(peerId: string, hints: string[]): void {
-    const state = getPeerState(peerId);
-    const { locationHints: oldHints } = state;
-    if (oldHints.length > 0) {
-      const newHints = new Set(oldHints);
-      for (const hint of hints) {
-        newHints.add(hint);
-      }
-      state.locationHints = Array.from(newHints);
-    } else {
-      state.locationHints = Array.from(hints);
-    }
+    peerStateManager.addLocationHints(peerId, hints);
   }
 
   /**
@@ -726,7 +466,7 @@ export async function initTransport(
     hints: string[] = [],
   ): Promise<void> {
     logger.log(`${peerId}:: manually reconnecting after intentional close`);
-    intentionallyClosed.delete(peerId);
+    peerStateManager.clearIntentionallyClosed(peerId);
     // If already reconnecting, don't start another attempt
     if (reconnectionManager.isReconnecting(peerId)) {
       return;
     }
@@ -752,7 +492,7 @@ export async function initTransport(
     }
     stopController.abort(); // cancels all delays and dials
     // Close all active channel streams to unblock pending reads
-    for (const state of peerStates.values()) {
+    for (const state of peerStateManager.getAllStates()) {
       const { channel } = state;
       if (channel) {
         try {
@@ -766,10 +506,8 @@ export async function initTransport(
       }
     }
     await connectionFactory.stop();
-    peerStates.clear();
+    peerStateManager.clear();
     reconnectionManager.clear();
-    intentionallyClosed.clear();
-    lastConnectionTime.clear();
   }
 
   // Return the sender with a stop handle and connection management functions
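Reviewer note: the comment above says the sender is returned together with a stop handle and connection-management functions, but the exact return shape is outside this excerpt. A purely hypothetical caller-side sketch, assuming the nested functions are exposed under the same names and that the module path and hint strings below are placeholders, might read:

```ts
// Hypothetical usage sketch - module path, option payload, and hint format
// are assumptions; only the function names come from this diff.
import { initTransport } from './transport.ts';

const transport = await initTransport(/* platform-specific options */);

// Tell the transport where a peer can be reached before sending.
transport.registerLocationHints('peer123', ['example-location-hint']);

// Deliberately tear down the connection; sends and inbound dials from this
// peer are rejected until reconnect() is called.
await transport.closeConnection('peer123');

// Later, explicitly opt back in and re-dial using fresh hints.
await transport.reconnect('peer123', ['example-location-hint']);

// Shut everything down: aborts pending dials and closes all channels.
await transport.stop();
```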
diff --git a/packages/ocap-kernel/src/remotes/platform/validators.test.ts b/packages/ocap-kernel/src/remotes/platform/validators.test.ts
new file mode 100644
index 000000000..771671a27
--- /dev/null
+++ b/packages/ocap-kernel/src/remotes/platform/validators.test.ts
@@ -0,0 +1,146 @@
+import { ResourceLimitError } from '@metamask/kernel-errors';
+import { describe, it, expect } from 'vitest';
+
+import { DEFAULT_MAX_MESSAGE_SIZE_BYTES } from './constants.ts';
+import {
+  makeMessageSizeValidator,
+  makeConnectionLimitChecker,
+} from './validators.ts';
+
+describe('validators', () => {
+  describe('makeMessageSizeValidator', () => {
+    it('creates a validator with default max size', () => {
+      const validator = makeMessageSizeValidator();
+      const smallMessage = 'hello';
+
+      expect(() => validator(smallMessage)).not.toThrow();
+    });
+
+    it('creates a validator with custom max size', () => {
+      const validator = makeMessageSizeValidator(10);
+      const smallMessage = 'hi';
+
+      expect(() => validator(smallMessage)).not.toThrow();
+    });
+
+    it('allows messages within the size limit', () => {
+      const validator = makeMessageSizeValidator(100);
+      const message = 'a'.repeat(100);
+
+      expect(() => validator(message)).not.toThrow();
+    });
+
+    it('throws ResourceLimitError for messages exceeding size limit', () => {
+      const validator = makeMessageSizeValidator(10);
+      const largeMessage = 'a'.repeat(20);
+
+      expect(() => validator(largeMessage)).toThrow(ResourceLimitError);
+    });
+
+    it('includes correct error data when throwing', () => {
+      const maxSize = 10;
+      const validator = makeMessageSizeValidator(maxSize);
+      const largeMessage = 'a'.repeat(20);
+
+      let thrownError: ResourceLimitError | undefined;
+      try {
+        validator(largeMessage);
+      } catch (error) {
+        thrownError = error as ResourceLimitError;
+      }
+
+      expect(thrownError).toBeInstanceOf(ResourceLimitError);
+      expect(thrownError?.data).toStrictEqual({
+        limitType: 'messageSize',
+        current: 20,
+        limit: maxSize,
+      });
+    });
+
+    it('calculates byte size correctly for multi-byte characters', () => {
+      const validator = makeMessageSizeValidator(10);
+      // Each emoji is 4 bytes in UTF-8
+      const emojiMessage = '😀😀😀'; // 12 bytes
+
+      expect(() => validator(emojiMessage)).toThrow(ResourceLimitError);
+    });
+
+    it('uses default max size constant', () => {
+      const validator = makeMessageSizeValidator();
+      // Create a message just under the default limit
+      const message = 'a'.repeat(DEFAULT_MAX_MESSAGE_SIZE_BYTES - 1);
+
+      expect(() => validator(message)).not.toThrow();
+    });
+  });
+
+  describe('makeConnectionLimitChecker', () => {
+    it('creates a checker that allows connections under the limit', () => {
+      const checker = makeConnectionLimitChecker(10, () => 5);
+
+      expect(() => checker()).not.toThrow();
+    });
+
+    it('throws ResourceLimitError when at the limit', () => {
+      const checker = makeConnectionLimitChecker(10, () => 10);
+
+      expect(() => checker()).toThrow(ResourceLimitError);
+    });
+
+    it('throws ResourceLimitError when over the limit', () => {
+      const checker = makeConnectionLimitChecker(10, () => 15);
+
+      expect(() => checker()).toThrow(ResourceLimitError);
+    });
+
+    it('includes correct error data when throwing', () => {
+      const maxConnections = 5;
+      const currentConnections = 5;
+      const checker = makeConnectionLimitChecker(
+        maxConnections,
+        () => currentConnections,
+      );
+
+      let thrownError: ResourceLimitError | undefined;
+      try {
+        checker();
+      } catch (error) {
+        thrownError = error as ResourceLimitError;
+      }
+
+      expect(thrownError).toBeInstanceOf(ResourceLimitError);
+      expect(thrownError?.data).toStrictEqual({
+        limitType: 'connection',
+        current: currentConnections,
+        limit: maxConnections,
+      });
+    });
+
+    it('calls getActiveConnectionCount on each check', () => {
+      let connectionCount = 0;
+      const checker = makeConnectionLimitChecker(10, () => connectionCount);
+
+      expect(() => checker()).not.toThrow();
+
+      connectionCount = 10;
+      expect(() => checker()).toThrow(ResourceLimitError);
+
+      connectionCount = 5;
+      expect(() => checker()).not.toThrow();
+    });
+
+    it('allows zero connections', () => {
+      const checker = makeConnectionLimitChecker(10, () => 0);
+
+      expect(() => checker()).not.toThrow();
+    });
+
+    it('handles limit of 1', () => {
+      const checker = makeConnectionLimitChecker(1, () => 0);
+      expect(() => checker()).not.toThrow();
+
+      const checkerAtLimit = makeConnectionLimitChecker(1, () => 1);
+      expect(() => checkerAtLimit()).toThrow(ResourceLimitError);
+    });
+  });
+});
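Reviewer note: the multi-byte test above depends on the validator measuring UTF-8 bytes via `TextEncoder`, not JavaScript string length. This standalone snippet (not part of the change) illustrates the difference the test is exercising:

```ts
// TextEncoder counts UTF-8 bytes, not UTF-16 code units.
const encoder = new TextEncoder();
console.log('😀😀😀'.length); // 6 (UTF-16 code units: each emoji is a surrogate pair)
console.log(encoder.encode('😀😀😀').length); // 12 (UTF-8 bytes), so a 10-byte limit rejects it
console.log(encoder.encode('a'.repeat(20)).length); // 20 (ASCII is 1 byte per character)
```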
diff --git a/packages/ocap-kernel/src/remotes/platform/validators.ts b/packages/ocap-kernel/src/remotes/platform/validators.ts
new file mode 100644
index 000000000..117ede476
--- /dev/null
+++ b/packages/ocap-kernel/src/remotes/platform/validators.ts
@@ -0,0 +1,59 @@
+import { ResourceLimitError } from '@metamask/kernel-errors';
+
+import { DEFAULT_MAX_MESSAGE_SIZE_BYTES } from './constants.ts';
+
+/**
+ * Creates a message size validator function.
+ *
+ * @param maxMessageSizeBytes - Maximum allowed message size in bytes.
+ * @returns A function that validates message size.
+ */
+export function makeMessageSizeValidator(
+  maxMessageSizeBytes = DEFAULT_MAX_MESSAGE_SIZE_BYTES,
+): (message: string) => void {
+  const encoder = new TextEncoder();
+
+  return (message: string): void => {
+    const messageSizeBytes = encoder.encode(message).length;
+    if (messageSizeBytes > maxMessageSizeBytes) {
+      throw new ResourceLimitError(
+        `Message size ${messageSizeBytes} bytes exceeds limit of ${maxMessageSizeBytes} bytes`,
+        {
+          data: {
+            limitType: 'messageSize',
+            current: messageSizeBytes,
+            limit: maxMessageSizeBytes,
+          },
+        },
+      );
+    }
+  };
+}
+
+/**
+ * Creates a connection limit checker function.
+ *
+ * @param maxConcurrentConnections - Maximum allowed concurrent connections.
+ * @param getActiveConnectionCount - Function to get current active connection count.
+ * @returns A function that checks connection limits.
+ */
+export function makeConnectionLimitChecker(
+  maxConcurrentConnections: number,
+  getActiveConnectionCount: () => number,
+): () => void {
+  return (): void => {
+    const currentConnections = getActiveConnectionCount();
+    if (currentConnections >= maxConcurrentConnections) {
+      throw new ResourceLimitError(
+        `Connection limit reached: ${currentConnections}/${maxConcurrentConnections} concurrent connections`,
+        {
+          data: {
+            limitType: 'connection',
+            current: currentConnections,
+            limit: maxConcurrentConnections,
+          },
+        },
+      );
+    }
+  };
+}
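Reviewer note: the transport hunks earlier in this patch call `validateMessageSize(message)` before each send and `checkConnectionLimit()` when admitting channels. The wiring itself is not shown here; a minimal sketch of how these factories might be constructed, where the limit value and the way the active-connection count is derived are assumptions for illustration, could look like:

```ts
// Illustrative wiring sketch (not part of this change).
import { makeMessageSizeValidator, makeConnectionLimitChecker } from './validators.ts';

// Hypothetical counter maintained by the transport as channels open and close.
let activeChannelCount = 0;

const validateMessageSize = makeMessageSizeValidator(); // uses DEFAULT_MAX_MESSAGE_SIZE_BYTES
const checkConnectionLimit = makeConnectionLimitChecker(
  100, // hypothetical maxConcurrentConnections value
  () => activeChannelCount,
);

function admitAndValidate(message: string): void {
  checkConnectionLimit(); // throws ResourceLimitError at or over the limit
  validateMessageSize(message); // throws ResourceLimitError if the UTF-8 size exceeds the limit
  // ... hand the message to the channel write path ...
}
```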