Skip to content
Open
153 changes: 153 additions & 0 deletions packages/ocap-kernel/src/remotes/platform/channel-utils.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';

import { makeErrorLogger, writeWithTimeout } from './channel-utils.ts';
import type { Channel } from '../types.ts';

describe('channel-utils', () => {
describe('makeErrorLogger', () => {
it('creates an error logger function', () => {
const mockLogger = { log: vi.fn() };
const outputError = makeErrorLogger(mockLogger);

expect(typeof outputError).toBe('function');
});

it('logs error with peer context when problem is provided', () => {
const mockLogger = { log: vi.fn() };
const outputError = makeErrorLogger(mockLogger);
const error = new Error('test error');

outputError('peer123', 'sending message', error);

expect(mockLogger.log).toHaveBeenCalledWith(
'peer123:: error sending message: Error: test error',
);
});

it('logs error without problem details when problem is null', () => {
const mockLogger = { log: vi.fn() };
const outputError = makeErrorLogger(mockLogger);

outputError('peer123', 'connection failed', null);

expect(mockLogger.log).toHaveBeenCalledWith(
'peer123:: error connection failed',
);
});

it('logs error without problem details when problem is undefined', () => {
const mockLogger = { log: vi.fn() };
const outputError = makeErrorLogger(mockLogger);

outputError('peer123', 'timeout', undefined);

expect(mockLogger.log).toHaveBeenCalledWith('peer123:: error timeout');
});

it('handles non-Error objects as problems', () => {
const mockLogger = { log: vi.fn() };
const outputError = makeErrorLogger(mockLogger);
const problem = { message: 'custom error' };

outputError('peer456', 'processing', problem);

expect(mockLogger.log).toHaveBeenCalledWith(
'peer456:: error processing: [object Object]',
);
});

it('handles string problems', () => {
const mockLogger = { log: vi.fn() };
const outputError = makeErrorLogger(mockLogger);

outputError('peer789', 'reading', 'string error');

expect(mockLogger.log).toHaveBeenCalledWith(
'peer789:: error reading: string error',
);
});
});

describe('writeWithTimeout', () => {
let mockChannel: Channel;
let writeResolve: () => void;
let writeReject: (error: Error) => void;

beforeEach(() => {
const writePromise = new Promise<void>((resolve, reject) => {
writeResolve = resolve;
writeReject = reject;
});

mockChannel = {
peerId: 'testPeer',
msgStream: {
write: vi.fn().mockReturnValue(writePromise),
read: vi.fn(),
unwrap: vi.fn(),
},
} as unknown as Channel;
});

it('writes message to channel', async () => {
const message = new Uint8Array([1, 2, 3]);

const writePromise = writeWithTimeout(mockChannel, message, 1000);
writeResolve();
await writePromise;

expect(mockChannel.msgStream.write).toHaveBeenCalledWith(message);
});

it('resolves when write completes before timeout', async () => {
const message = new Uint8Array([1, 2, 3]);

const writePromise = writeWithTimeout(mockChannel, message, 1000);
writeResolve();

expect(await writePromise).toBeUndefined();
});

it('rejects with timeout error when write takes too long', async () => {
const message = new Uint8Array([1, 2, 3]);

const writePromise = writeWithTimeout(mockChannel, message, 50);

await expect(writePromise).rejects.toThrow(
'Message send timed out after 50ms',
);
});

it('uses default timeout when not specified', async () => {
const message = new Uint8Array([1, 2, 3]);

const writePromise = writeWithTimeout(mockChannel, message);
writeResolve();

expect(await writePromise).toBeUndefined();
});

it('rejects with write error when write fails', async () => {
const message = new Uint8Array([1, 2, 3]);
const writeError = new Error('Write failed');

const writePromise = writeWithTimeout(mockChannel, message, 1000);
writeReject(writeError);

await expect(writePromise).rejects.toThrow('Write failed');
});

it('cleans up timeout listener after successful write', async () => {
const message = new Uint8Array([1, 2, 3]);

const writePromise = writeWithTimeout(mockChannel, message, 1000);
writeResolve();
const result = await writePromise;

// If cleanup didn't happen, there would be an unhandled rejection
// when the timeout fires. This test verifies no error is thrown.
await new Promise((resolve) => setTimeout(resolve, 100));
expect(result).toBeUndefined();
});
});
});
67 changes: 67 additions & 0 deletions packages/ocap-kernel/src/remotes/platform/channel-utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import type { Logger } from '@metamask/logger';

import { DEFAULT_WRITE_TIMEOUT_MS } from './constants.ts';
import type { Channel } from '../types.ts';

/**
* Type for error logging function.
*/
export type ErrorLogger = (
peerId: string,
task: string,
problem: unknown,
) => void;

/**
* Creates an error logging function for transport operations.
*
* @param logger - The logger instance to use.
* @returns A function that logs errors with peer context.
*/
export function makeErrorLogger(logger: Logger): ErrorLogger {
return (peerId: string, task: string, problem: unknown): void => {
if (problem) {
const realProblem: Error = problem as Error;
logger.log(`${peerId}:: error ${task}: ${realProblem}`);
} else {
logger.log(`${peerId}:: error ${task}`);
}
};
}

/**
* Write a message to a channel stream with a timeout.
*
* @param channel - The channel to write to.
* @param message - The message bytes to write.
* @param timeoutMs - Timeout in milliseconds (default: 10 seconds).
* @returns Promise that resolves when the write completes or rejects on timeout.
* @throws Error if the write times out or fails.
*/
export async function writeWithTimeout(
channel: Channel,
message: Uint8Array,
timeoutMs = DEFAULT_WRITE_TIMEOUT_MS,
): Promise<void> {
const timeoutSignal = AbortSignal.timeout(timeoutMs);
let abortHandler: (() => void) | undefined;
const timeoutPromise = new Promise<never>((_resolve, reject) => {
abortHandler = () => {
reject(Error(`Message send timed out after ${timeoutMs}ms`));
};
timeoutSignal.addEventListener('abort', abortHandler);
});

try {
return await Promise.race([
channel.msgStream.write(message),
timeoutPromise,
]);
} finally {
// Clean up event listener to prevent unhandled rejection if operation
// completes before timeout
if (abortHandler) {
timeoutSignal.removeEventListener('abort', abortHandler);
}
}
}
17 changes: 17 additions & 0 deletions packages/ocap-kernel/src/remotes/platform/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/** Default maximum number of concurrent connections */
export const DEFAULT_MAX_CONCURRENT_CONNECTIONS = 100;

/** Default maximum message size in bytes (1MB) */
export const DEFAULT_MAX_MESSAGE_SIZE_BYTES = 1024 * 1024;

/** Default stale peer cleanup interval in milliseconds (15 minutes) */
export const DEFAULT_CLEANUP_INTERVAL_MS = 15 * 60 * 1000;

/** Default stale peer timeout in milliseconds (1 hour) */
export const DEFAULT_STALE_PEER_TIMEOUT_MS = 60 * 60 * 1000;

/** Default message write timeout in milliseconds (10 seconds) */
export const DEFAULT_WRITE_TIMEOUT_MS = 10_000;

/** SCTP user initiated abort code (RFC 4960) */
export const SCTP_USER_INITIATED_ABORT = 12;
Loading
Loading