From e0962d284b103a898d734854d2f8019f1e8fb36a Mon Sep 17 00:00:00 2001 From: David Obando Date: Mon, 4 May 2026 09:54:00 -0700 Subject: [PATCH 1/3] Improve port-forwarding resilience under bursty multi-port load MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to #138. PR #138 prevented host-process crashes from unhandled 'error' events on duplex streams inside StreamForwarder, but several related issues remained that either (a) leaked resources, (b) still left a small unhandled-'error' window outside the forwarder, or (c) wasted work on doomed connections. This change addresses them. Changes: * StreamForwarder: accept an optional onDisposed callback so the PortForwardingService can remove disposed forwarders from its tracking collection. Add a clarifying comment about pipe()'s end-vs-error semantics. * PortForwardingService: change streamForwarders from an append-only array to a Set, with a removeStreamForwarder callback wired into every StreamForwarder. Previously disposed forwarders were retained for the lifetime of the session, holding references to their socket and SshStream — a real leak under bursty connect/disconnect workloads (the exact workload PR #138 was reacting to). * LocalPortForwarder.acceptConnection: attach a temporary 'error' handler on the accepted socket *before* awaiting openChannel and the forwardedPortConnecting event. Without this, a peer reset during that await window emits 'error' on a listener-less socket and crashes the host — the same crash class as #138, just earlier in the lifecycle. Also destroy the socket when the connecting event handler rejects the connection (was previously leaked). * RemotePortForwarder.forwardChannel: - Bug fix: when the local TCP connect fails, return early instead of falling through to construct a StreamForwarder around a destroyed socket. The forwardedStream is now also destroyed so the underlying SSH channel doesn't leak. - Attach a temporary 'error' handler on the SshStream during the forwardedPortConnecting + connect window for the same reason as the local side: a remote channel reset in that window would emit an unhandled 'error'. * SshStream.destroy: trace channel.close() failures instead of silently swallowing them with .catch(). Aids diagnosis of teardown issues that the new error path may now expose. Behavior is unchanged on the happy path. On the error path, forwarders self-dispose, are removed from the PFS collection, and traces include the side and reason. No public API changes. Verified locally: npm run compile, npm run eslint, and npm run test-ts all pass (one pre-existing unrelated failure in VersionTests). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/ts/ssh-tcp/services/localPortForwarder.ts | 28 +++++++++++- .../ssh-tcp/services/portForwardingService.ts | 9 +++- .../ssh-tcp/services/remotePortForwarder.ts | 43 +++++++++++++++++-- src/ts/ssh-tcp/services/streamForwarder.ts | 19 ++++++++ src/ts/ssh/sshStream.ts | 10 ++++- 5 files changed, 101 insertions(+), 8 deletions(-) diff --git a/src/ts/ssh-tcp/services/localPortForwarder.ts b/src/ts/ssh-tcp/services/localPortForwarder.ts index 9943d11..c9c2e7e 100644 --- a/src/ts/ssh-tcp/services/localPortForwarder.ts +++ b/src/ts/ssh-tcp/services/localPortForwarder.ts @@ -157,6 +157,20 @@ export class LocalPortForwarder extends SshService { // TODO: Set socket options? + // Attach a temporary 'error' handler so a peer reset between accept and + // StreamForwarder construction (while we await openChannel and the + // forwardedPortConnecting event handler) does not crash the host with + // an unhandled 'error' event. Removed once the forwarder takes over. + const acceptErrorHandler = (e: Error) => { + this.trace( + TraceLevel.Warning, + SshTraceEventIds.portForwardConnectionFailed, + `PortForwardingService accepted socket errored before forwarding started: ${e.message}`, + e, + ); + }; + socket.on('error', acceptErrorHandler); + let channel: SshChannel | null; try { channel = await this.pfs.openChannel( @@ -188,11 +202,21 @@ export class LocalPortForwarder extends SshService { if (!forwardedStream) { // The event handler rejected the connection. + socket.removeListener('error', acceptErrorHandler); + socket.destroy(); return; } - const forwarder = new StreamForwarder(socket, forwardedStream, channel.session.trace); - this.pfs.streamForwarders.push(forwarder); + // Hand off socket error handling to the StreamForwarder. + socket.removeListener('error', acceptErrorHandler); + + const forwarder = new StreamForwarder( + socket, + forwardedStream, + channel.session.trace, + this.pfs.removeStreamForwarder, + ); + this.pfs.streamForwarders.add(forwarder); } public dispose() { diff --git a/src/ts/ssh-tcp/services/portForwardingService.ts b/src/ts/ssh-tcp/services/portForwardingService.ts index 50de5f6..e4bd947 100644 --- a/src/ts/ssh-tcp/services/portForwardingService.ts +++ b/src/ts/ssh-tcp/services/portForwardingService.ts @@ -89,7 +89,12 @@ export class PortForwardingService extends SshService { private readonly remoteConnectors = new Map(); /* @internal */ - public readonly streamForwarders: StreamForwarder[] = []; + public readonly streamForwarders: Set = new Set(); + + /* @internal */ + public readonly removeStreamForwarder = (forwarder: StreamForwarder): void => { + this.streamForwarders.delete(forwarder); + }; /* @internal */ public constructor(session: SshSession) { @@ -936,7 +941,7 @@ export class PortForwardingService extends SshService { ...this.remoteConnectors.values(), ]; - this.streamForwarders.splice(0, this.streamForwarders.length); + this.streamForwarders.clear(); this.localForwarders.clear(); this.remoteConnectors.clear(); diff --git a/src/ts/ssh-tcp/services/remotePortForwarder.ts b/src/ts/ssh-tcp/services/remotePortForwarder.ts index bf91ba7..162c196 100644 --- a/src/ts/ssh-tcp/services/remotePortForwarder.ts +++ b/src/ts/ssh-tcp/services/remotePortForwarder.ts @@ -75,16 +75,33 @@ export class RemotePortForwarder extends RemotePortConnector { cancellation?: CancellationToken, ): Promise { const channel = request.channel; + const sshStream = new SshStream(channel); + + // Attach a temporary 'error' handler so a remote channel reset between + // channel-open and StreamForwarder construction (while we await + // forwardedPortConnecting and the local TCP connect) does not crash + // the host with an unhandled 'error' event. Removed once the forwarder + // takes over (or the connection is rejected/aborted below). + const channelErrorHandler = (e: Error) => { + trace( + TraceLevel.Warning, + SshTraceEventIds.portForwardConnectionFailed, + `PortForwardingService channel stream errored before forwarding started: ${e.message}`, + e, + ); + }; + sshStream.on('error', channelErrorHandler); const forwardedStream = await pfs.forwardedPortConnecting( remotePort ?? localPort, true, - new SshStream(channel), + sshStream, cancellation, ); if (!forwardedStream) { // The event handler rejected the connection. + sshStream.removeListener('error', channelErrorHandler); request.failureReason = SshChannelOpenFailureReason.connectFailed; return; } @@ -119,6 +136,10 @@ export class RemotePortForwarder extends RemotePortConnector { await connectCompletion.promise; } catch (e) { if (!(e instanceof Error) || cancellation?.isCancellationRequested) { + sshStream.removeListener('error', channelErrorHandler); + // The forwardedStream may be the user-substituted stream; close it + // so we don't leak the underlying SSH channel. + forwardedStream.destroy(); throw e; } @@ -131,19 +152,35 @@ export class RemotePortForwarder extends RemotePortConnector { ); request.failureReason = SshChannelOpenFailureReason.connectFailed; request.failureDescription = e.message; + + // Tear down the SSH side and abandon: do NOT proceed to construct a + // StreamForwarder around a destroyed socket. Pre-existing bug: the + // previous code fell through and built a forwarder anyway, leaking + // resources and (before PR #138) potentially crashing the host. + sshStream.removeListener('error', channelErrorHandler); + forwardedStream.destroy(); + return; } finally { cancellationRegistration?.dispose(); } // TODO: Set socket options? - const streamForwarder = new StreamForwarder(socket, forwardedStream, channel.session.trace); + // Hand off SSH stream error handling to the StreamForwarder. + sshStream.removeListener('error', channelErrorHandler); + + const streamForwarder = new StreamForwarder( + socket, + forwardedStream, + channel.session.trace, + pfs.removeStreamForwarder, + ); trace( TraceLevel.Info, SshTraceEventIds.portForwardConnectionOpened, `${channel.session} PortForwardingService forwarded channel ` + `#${channel.channelId} connection to ${localHost}:${localPort}.`, ); - pfs.streamForwarders.push(streamForwarder); + pfs.streamForwarders.add(streamForwarder); } } diff --git a/src/ts/ssh-tcp/services/streamForwarder.ts b/src/ts/ssh-tcp/services/streamForwarder.ts index cd49f97..9940216 100644 --- a/src/ts/ssh-tcp/services/streamForwarder.ts +++ b/src/ts/ssh-tcp/services/streamForwarder.ts @@ -9,16 +9,20 @@ import { Socket } from 'net'; export class StreamForwarder implements Disposable { private disposed: boolean = false; + private readonly onDisposedCallback?: (forwarder: StreamForwarder) => void; /* @internal */ public constructor( public readonly localStream: Duplex, public readonly remoteStream: Duplex, public readonly trace: Trace, + onDisposed?: (forwarder: StreamForwarder) => void, ) { if (!localStream) throw new TypeError('Local stream is required.'); if (!remoteStream) throw new TypeError('Remote stream is required.'); + this.onDisposedCallback = onDisposed; + // Without these listeners, errors from either side of the forwarder // propagate up to the Node process as unhandled 'error' events and // crash the host. Node's pipe() does not propagate errors between @@ -26,6 +30,9 @@ export class StreamForwarder implements Disposable { localStream.on('error', (err) => this.onStreamError('local', err)); remoteStream.on('error', (err) => this.onStreamError('remote', err)); + // pipe() forwards 'end' (so EOF on one side gracefully ends the other), + // but does NOT forward 'error'. Error propagation is handled above + // by disposing the forwarder, which tears down both sides. localStream.pipe(remoteStream); remoteStream.pipe(localStream); } @@ -72,6 +79,18 @@ export class StreamForwarder implements Disposable { if (!this.disposed) { this.disposed = true; this.close(true); + if (this.onDisposedCallback) { + try { + this.onDisposedCallback(this); + } catch (e) { + if (!(e instanceof Error)) throw e; + this.trace( + TraceLevel.Warning, + SshTraceEventIds.unknownError, + `Stream forwarder onDisposed callback threw: ${e.message}`, + ); + } + } } } } diff --git a/src/ts/ssh/sshStream.ts b/src/ts/ssh/sshStream.ts index 3a282b6..74d6480 100644 --- a/src/ts/ssh/sshStream.ts +++ b/src/ts/ssh/sshStream.ts @@ -4,6 +4,7 @@ import { SshChannel } from './sshChannel'; import { PromiseCompletionSource } from './util/promiseCompletionSource'; +import { TraceLevel, SshTraceEventIds } from './trace'; import { Duplex } from 'stream'; /** @@ -132,7 +133,14 @@ export class SshStream extends Duplex { * Destroys the stream and closes the underlying SSH channel. */ public destroy(error?: Error) { - void this.channel.close().catch(); + this.channel.close().catch((e) => { + if (!(e instanceof Error)) return; + this.channel.session.trace( + TraceLevel.Warning, + SshTraceEventIds.unknownError, + `${this} channel close on destroy failed: ${e.message}`, + ); + }); super.destroy(error); return this; } From 8eafa29b839b0bf21f10e88deabbc9913266bde2 Mon Sep 17 00:00:00 2001 From: David Obando Date: Mon, 4 May 2026 12:17:57 -0700 Subject: [PATCH 2/3] Address review findings: listener leaks, channel leaks, dispose race, and tests Fix several edge cases identified during code review of PR #144: 1. Listener leak if forwardedPortConnecting throws: wrap the call in try/catch in both localPortForwarder and remotePortForwarder so the temporary error handler is always removed and streams are destroyed on unexpected exceptions. 2. SSH channel leak in localPortForwarder.acceptConnection: when the forwardedPortConnecting event handler rejects (returns null), the already-opened SSH channel was not being closed. Now destroy the SshStream so the channel is released. Also remove the temporary error listener in the openChannel catch path for consistency. 3. Synchronous-dispose race in StreamForwarder constructor: if an error fires during pipe() setup (before the caller can call streamForwarders.add()), the onDisposed callback runs on a forwarder not yet in the set, then the caller adds a dead forwarder. Fixed by adding a public isDisposed getter and guarding add() with it. 4. StreamForwarder tests: add streamForwarderTests.ts with 10 tests covering data forwarding, error-triggered dispose, callback invocation, idempotent dispose, the synchronous-dispose race, and onDisposed error swallowing. Export StreamForwarder from the package (constructor made public) to enable testing. 5. Ensure sshStream is destroyed when forwardedStream is user-substituted: in remotePortForwarder error paths, if the forwardedStream returned by the connecting event is not the original sshStream, explicitly destroy sshStream so the underlying SSH channel does not leak. Verified: npm run compile, npm run eslint, npm run test-ts all pass (357 passing, 1 pre-existing unrelated VersionTests failure). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/ts/ssh-tcp/index.ts | 1 + src/ts/ssh-tcp/services/localPortForwarder.ts | 26 +- .../ssh-tcp/services/remotePortForwarder.ts | 25 +- src/ts/ssh-tcp/services/streamForwarder.ts | 5 +- test/ts/ssh-test/streamForwarderTests.ts | 234 ++++++++++++++++++ 5 files changed, 278 insertions(+), 13 deletions(-) create mode 100644 test/ts/ssh-test/streamForwarderTests.ts diff --git a/src/ts/ssh-tcp/index.ts b/src/ts/ssh-tcp/index.ts index d8a583e..cf7bf1c 100644 --- a/src/ts/ssh-tcp/index.ts +++ b/src/ts/ssh-tcp/index.ts @@ -11,6 +11,7 @@ export { PortForwardingService } from './services/portForwardingService'; export { LocalPortForwarder } from './services/localPortForwarder'; export { RemotePortForwarder } from './services/remotePortForwarder'; export { RemotePortStreamer } from './services/remotePortStreamer'; +export { StreamForwarder } from './services/streamForwarder'; export { PortForwardMessageFactory } from './portForwardMessageFactory'; export { PortForwardRequestMessage } from './messages/portForwardRequestMessage'; diff --git a/src/ts/ssh-tcp/services/localPortForwarder.ts b/src/ts/ssh-tcp/services/localPortForwarder.ts index c9c2e7e..3aaaabd 100644 --- a/src/ts/ssh-tcp/services/localPortForwarder.ts +++ b/src/ts/ssh-tcp/services/localPortForwarder.ts @@ -14,6 +14,7 @@ import { SshProtocolExtensionNames, SshStream, } from '@microsoft/dev-tunnels-ssh'; +import { Duplex } from 'stream'; import { StreamForwarder } from './streamForwarder'; import { PortForwardingService } from './portForwardingService'; @@ -186,6 +187,7 @@ export class LocalPortForwarder extends SshService { // TODO: Destroy the socket in a way that causes a connection reset: // https://github.com/nodejs/node/issues/27428 + socket.removeListener('error', acceptErrorHandler); socket.destroy(); // Don't re-throw. This is an async event handler so the caller isn't awaiting. @@ -194,16 +196,26 @@ export class LocalPortForwarder extends SshService { } // The event handler may return a transformed stream. - const forwardedStream = await this.pfs.forwardedPortConnecting( - this.remotePort ?? this.localPort, - false, - new SshStream(channel), - ); + const sshStream = new SshStream(channel); + let forwardedStream: Duplex | null; + try { + forwardedStream = await this.pfs.forwardedPortConnecting( + this.remotePort ?? this.localPort, + false, + sshStream, + ); + } catch (e) { + socket.removeListener('error', acceptErrorHandler); + socket.destroy(); + sshStream.destroy(); + throw e; + } if (!forwardedStream) { // The event handler rejected the connection. socket.removeListener('error', acceptErrorHandler); socket.destroy(); + sshStream.destroy(); return; } @@ -216,7 +228,9 @@ export class LocalPortForwarder extends SshService { channel.session.trace, this.pfs.removeStreamForwarder, ); - this.pfs.streamForwarders.add(forwarder); + if (!forwarder.isDisposed) { + this.pfs.streamForwarders.add(forwarder); + } } public dispose() { diff --git a/src/ts/ssh-tcp/services/remotePortForwarder.ts b/src/ts/ssh-tcp/services/remotePortForwarder.ts index 162c196..b045852 100644 --- a/src/ts/ssh-tcp/services/remotePortForwarder.ts +++ b/src/ts/ssh-tcp/services/remotePortForwarder.ts @@ -14,6 +14,7 @@ import { TraceLevel, SshStream, } from '@microsoft/dev-tunnels-ssh'; +import { Duplex } from 'stream'; import { StreamForwarder } from './streamForwarder'; import { PortForwardingService } from './portForwardingService'; import { RemotePortConnector } from './remotePortConnector'; @@ -92,12 +93,19 @@ export class RemotePortForwarder extends RemotePortConnector { }; sshStream.on('error', channelErrorHandler); - const forwardedStream = await pfs.forwardedPortConnecting( - remotePort ?? localPort, - true, - sshStream, - cancellation, - ); + let forwardedStream: Duplex | null; + try { + forwardedStream = await pfs.forwardedPortConnecting( + remotePort ?? localPort, + true, + sshStream, + cancellation, + ); + } catch (e) { + sshStream.removeListener('error', channelErrorHandler); + sshStream.destroy(); + throw e; + } if (!forwardedStream) { // The event handler rejected the connection. @@ -140,6 +148,7 @@ export class RemotePortForwarder extends RemotePortConnector { // The forwardedStream may be the user-substituted stream; close it // so we don't leak the underlying SSH channel. forwardedStream.destroy(); + if (forwardedStream !== sshStream) sshStream.destroy(); throw e; } @@ -159,6 +168,7 @@ export class RemotePortForwarder extends RemotePortConnector { // resources and (before PR #138) potentially crashing the host. sshStream.removeListener('error', channelErrorHandler); forwardedStream.destroy(); + if (forwardedStream !== sshStream) sshStream.destroy(); return; } finally { cancellationRegistration?.dispose(); @@ -182,5 +192,8 @@ export class RemotePortForwarder extends RemotePortConnector { `#${channel.channelId} connection to ${localHost}:${localPort}.`, ); pfs.streamForwarders.add(streamForwarder); + if (streamForwarder.isDisposed) { + pfs.streamForwarders.delete(streamForwarder); + } } } diff --git a/src/ts/ssh-tcp/services/streamForwarder.ts b/src/ts/ssh-tcp/services/streamForwarder.ts index 9940216..a81a751 100644 --- a/src/ts/ssh-tcp/services/streamForwarder.ts +++ b/src/ts/ssh-tcp/services/streamForwarder.ts @@ -11,7 +11,10 @@ export class StreamForwarder implements Disposable { private disposed: boolean = false; private readonly onDisposedCallback?: (forwarder: StreamForwarder) => void; - /* @internal */ + public get isDisposed(): boolean { + return this.disposed; + } + public constructor( public readonly localStream: Duplex, public readonly remoteStream: Duplex, diff --git a/test/ts/ssh-test/streamForwarderTests.ts b/test/ts/ssh-test/streamForwarderTests.ts new file mode 100644 index 0000000..4c7a242 --- /dev/null +++ b/test/ts/ssh-test/streamForwarderTests.ts @@ -0,0 +1,234 @@ +// +// Copyright (c) Microsoft Corporation. All rights reserved. +// + +import * as assert from 'assert'; +import { suite, test, slow, timeout } from '@testdeck/mocha'; +import { Duplex } from 'stream'; + +import { Trace } from '@microsoft/dev-tunnels-ssh'; +import { StreamForwarder } from '@microsoft/dev-tunnels-ssh-tcp'; + +const timeoutMs = 3000; + +/** + * A minimal Duplex stream that captures written data and allows pushing readable data. + * Does not echo writes back to the readable side (unlike PassThrough), making it safe + * for use in bidirectional pipe scenarios without creating infinite loops. + */ +class MockDuplex extends Duplex { + public readonly written: Buffer[] = []; + + constructor() { + super(); + } + + _write(chunk: Buffer, _encoding: string, callback: (error?: Error | null) => void): void { + this.written.push(Buffer.from(chunk)); + callback(); + } + + _read(_size: number): void { + // No-op; data is pushed externally via this.push() + } + + pushData(data: Buffer): void { + this.push(data); + } +} + +/** + * A Duplex that emits 'error' on nextTick after pipe() is called, + * simulating the synchronous-dispose race in StreamForwarder construction. + */ +class ErrorOnPipeDuplex extends MockDuplex { + private readonly pipeError: Error; + + constructor(error: Error) { + super(); + this.pipeError = error; + } + + pipe(destination: T): T { + const result = super.pipe(destination); + process.nextTick(() => this.emit('error', this.pipeError)); + return result; + } +} + +function createTrace(): Trace { + return () => {}; +} + +@suite +@slow(2000) +@timeout(timeoutMs * 2) +export class StreamForwarderTests { + @test + public async forwardDataLocalToRemote() { + const local = new MockDuplex(); + const remote = new MockDuplex(); + const forwarder = new StreamForwarder(local, remote, createTrace()); + + // Push data into local's readable side; it should be piped to remote's writable side. + local.pushData(Buffer.from('hello')); + + await new Promise((r) => setImmediate(r)); + assert.strictEqual(Buffer.concat(remote.written).toString(), 'hello'); + + forwarder.dispose(); + local.destroy(); + remote.destroy(); + } + + @test + public async forwardDataRemoteToLocal() { + const local = new MockDuplex(); + const remote = new MockDuplex(); + const forwarder = new StreamForwarder(local, remote, createTrace()); + + // Push data into remote's readable side; it should be piped to local's writable side. + remote.pushData(Buffer.from('world')); + + await new Promise((r) => setImmediate(r)); + assert.strictEqual(Buffer.concat(local.written).toString(), 'world'); + + forwarder.dispose(); + local.destroy(); + remote.destroy(); + } + + @test + public async localStreamErrorDisposesForwarder() { + const local = new MockDuplex(); + const remote = new MockDuplex(); + let disposedCalled = false; + const forwarder = new StreamForwarder(local, remote, createTrace(), () => { + disposedCalled = true; + }); + + local.emit('error', new Error('connection reset')); + + await new Promise((r) => setImmediate(r)); + assert.strictEqual(forwarder.isDisposed, true); + assert.strictEqual(disposedCalled, true); + local.destroy(); + remote.destroy(); + } + + @test + public async remoteStreamErrorDisposesForwarder() { + const local = new MockDuplex(); + const remote = new MockDuplex(); + let disposedCalled = false; + const forwarder = new StreamForwarder(local, remote, createTrace(), () => { + disposedCalled = true; + }); + + remote.emit('error', new Error('channel closed')); + + await new Promise((r) => setImmediate(r)); + assert.strictEqual(forwarder.isDisposed, true); + assert.strictEqual(disposedCalled, true); + local.destroy(); + remote.destroy(); + } + + @test + public async onDisposedCallbackInvokedOnDispose() { + const local = new MockDuplex(); + const remote = new MockDuplex(); + let callbackForwarder: StreamForwarder | null = null; + const forwarder = new StreamForwarder(local, remote, createTrace(), (f: StreamForwarder) => { + callbackForwarder = f; + }); + + forwarder.dispose(); + assert.strictEqual(callbackForwarder, forwarder); + local.destroy(); + remote.destroy(); + } + + @test + public async disposeIsIdempotent() { + const local = new MockDuplex(); + const remote = new MockDuplex(); + let disposeCount = 0; + const forwarder = new StreamForwarder(local, remote, createTrace(), () => { + disposeCount++; + }); + + forwarder.dispose(); + forwarder.dispose(); + forwarder.dispose(); + assert.strictEqual(disposeCount, 1); + local.destroy(); + remote.destroy(); + } + + @test + public async synchronousErrorDuringPipeMarksDisposed() { + const errorStream = new ErrorOnPipeDuplex(new Error('immediate failure')); + const remote = new MockDuplex(); + const forwarder = new StreamForwarder(errorStream, remote, createTrace()); + + // The error fires on nextTick, so wait a tick. + await new Promise((r) => setImmediate(r)); + assert.strictEqual(forwarder.isDisposed, true); + errorStream.destroy(); + remote.destroy(); + } + + @test + public async forwarderRemovedFromSetOnDispose() { + const local = new MockDuplex(); + const remote = new MockDuplex(); + const set = new Set(); + const forwarder = new StreamForwarder(local, remote, createTrace(), (f: StreamForwarder) => { + set.delete(f); + }); + set.add(forwarder); + + forwarder.dispose(); + assert.strictEqual(set.size, 0); + local.destroy(); + remote.destroy(); + } + + @test + public async synchronousDisposeRaceDoesNotLeaveStaleEntry() { + const errorStream = new ErrorOnPipeDuplex(new Error('race error')); + const remote = new MockDuplex(); + const set = new Set(); + const forwarder = new StreamForwarder(errorStream, remote, createTrace(), (f: StreamForwarder) => { + set.delete(f); + }); + + // Caller adds after construction (the real code pattern). + if (!forwarder.isDisposed) { + set.add(forwarder); + } + + // Wait for the error to fire and dispose to run. + await new Promise((r) => setImmediate(r)); + assert.strictEqual(set.size, 0); + assert.strictEqual(forwarder.isDisposed, true); + errorStream.destroy(); + remote.destroy(); + } + + @test + public async onDisposedCallbackErrorIsSwallowed() { + const local = new MockDuplex(); + const remote = new MockDuplex(); + const forwarder = new StreamForwarder(local, remote, createTrace(), () => { + throw new Error('callback error'); + }); + + // Should not throw — the error is traced and swallowed. + forwarder.dispose(); + assert.strictEqual(forwarder.isDisposed, true); + local.destroy(); + remote.destroy(); + } +} From d93043b6f4abcaea4933a028d82dce9b31adfac7 Mon Sep 17 00:00:00 2001 From: David Obando Date: Mon, 4 May 2026 13:17:17 -0700 Subject: [PATCH 3/3] Apply suggestions from code review Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- src/ts/ssh-tcp/services/remotePortForwarder.ts | 1 + src/ts/ssh-tcp/services/streamForwarder.ts | 4 ++-- src/ts/ssh/sshStream.ts | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/ts/ssh-tcp/services/remotePortForwarder.ts b/src/ts/ssh-tcp/services/remotePortForwarder.ts index b045852..84d310e 100644 --- a/src/ts/ssh-tcp/services/remotePortForwarder.ts +++ b/src/ts/ssh-tcp/services/remotePortForwarder.ts @@ -110,6 +110,7 @@ export class RemotePortForwarder extends RemotePortConnector { if (!forwardedStream) { // The event handler rejected the connection. sshStream.removeListener('error', channelErrorHandler); + sshStream.destroy(); request.failureReason = SshChannelOpenFailureReason.connectFailed; return; } diff --git a/src/ts/ssh-tcp/services/streamForwarder.ts b/src/ts/ssh-tcp/services/streamForwarder.ts index a81a751..6a0d89a 100644 --- a/src/ts/ssh-tcp/services/streamForwarder.ts +++ b/src/ts/ssh-tcp/services/streamForwarder.ts @@ -86,11 +86,11 @@ export class StreamForwarder implements Disposable { try { this.onDisposedCallback(this); } catch (e) { - if (!(e instanceof Error)) throw e; + const errorMessage = e instanceof Error ? e.message : String(e); this.trace( TraceLevel.Warning, SshTraceEventIds.unknownError, - `Stream forwarder onDisposed callback threw: ${e.message}`, + `Stream forwarder onDisposed callback threw: ${errorMessage}`, ); } } diff --git a/src/ts/ssh/sshStream.ts b/src/ts/ssh/sshStream.ts index 74d6480..9c5916b 100644 --- a/src/ts/ssh/sshStream.ts +++ b/src/ts/ssh/sshStream.ts @@ -134,11 +134,11 @@ export class SshStream extends Duplex { */ public destroy(error?: Error) { this.channel.close().catch((e) => { - if (!(e instanceof Error)) return; + const message = e instanceof Error ? e.message : String(e); this.channel.session.trace( TraceLevel.Warning, SshTraceEventIds.unknownError, - `${this} channel close on destroy failed: ${e.message}`, + `${this} channel close on destroy failed: ${message}`, ); }); super.destroy(error);