Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/ts/ssh-tcp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export { PortForwardingService } from './services/portForwardingService';
export { LocalPortForwarder } from './services/localPortForwarder';
export { RemotePortForwarder } from './services/remotePortForwarder';
export { RemotePortStreamer } from './services/remotePortStreamer';
export { StreamForwarder } from './services/streamForwarder';
Comment thread
DavidObando marked this conversation as resolved.
export { PortForwardMessageFactory } from './portForwardMessageFactory';

export { PortForwardRequestMessage } from './messages/portForwardRequestMessage';
Expand Down
52 changes: 45 additions & 7 deletions src/ts/ssh-tcp/services/localPortForwarder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
SshProtocolExtensionNames,
SshStream,
} from '@microsoft/dev-tunnels-ssh';
import { Duplex } from 'stream';
import { StreamForwarder } from './streamForwarder';
import { PortForwardingService } from './portForwardingService';

Expand Down Expand Up @@ -157,6 +158,20 @@ export class LocalPortForwarder extends SshService {

// TODO: Set socket options?

// Attach a temporary 'error' handler so a peer reset between accept and
// StreamForwarder construction (while we await openChannel and the
// forwardedPortConnecting event handler) does not crash the host with
// an unhandled 'error' event. Removed once the forwarder takes over.
const acceptErrorHandler = (e: Error) => {
this.trace(
TraceLevel.Warning,
SshTraceEventIds.portForwardConnectionFailed,
`PortForwardingService accepted socket errored before forwarding started: ${e.message}`,
e,
);
};
socket.on('error', acceptErrorHandler);

let channel: SshChannel | null;
try {
channel = await this.pfs.openChannel(
Expand All @@ -172,6 +187,7 @@ export class LocalPortForwarder extends SshService {

// TODO: Destroy the socket in a way that causes a connection reset:
// https://github.com/nodejs/node/issues/27428
socket.removeListener('error', acceptErrorHandler);
socket.destroy();

// Don't re-throw. This is an async event handler so the caller isn't awaiting.
Expand All @@ -180,19 +196,41 @@ export class LocalPortForwarder extends SshService {
}

// The event handler may return a transformed stream.
const forwardedStream = await this.pfs.forwardedPortConnecting(
this.remotePort ?? this.localPort,
false,
new SshStream(channel),
);
const sshStream = new SshStream(channel);
let forwardedStream: Duplex | null;
try {
forwardedStream = await this.pfs.forwardedPortConnecting(
this.remotePort ?? this.localPort,
false,
sshStream,
);
} catch (e) {
socket.removeListener('error', acceptErrorHandler);
socket.destroy();
sshStream.destroy();
throw e;
}

if (!forwardedStream) {
// The event handler rejected the connection.
socket.removeListener('error', acceptErrorHandler);
socket.destroy();
sshStream.destroy();
return;
}

const forwarder = new StreamForwarder(socket, forwardedStream, channel.session.trace);
this.pfs.streamForwarders.push(forwarder);
// Hand off socket error handling to the StreamForwarder.
socket.removeListener('error', acceptErrorHandler);

const forwarder = new StreamForwarder(
socket,
forwardedStream,
channel.session.trace,
this.pfs.removeStreamForwarder,
);
if (!forwarder.isDisposed) {
this.pfs.streamForwarders.add(forwarder);
}
}

public dispose() {
Expand Down
9 changes: 7 additions & 2 deletions src/ts/ssh-tcp/services/portForwardingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@ export class PortForwardingService extends SshService {
private readonly remoteConnectors = new Map<number, RemotePortConnector>();

/* @internal */
public readonly streamForwarders: StreamForwarder[] = [];
public readonly streamForwarders: Set<StreamForwarder> = new Set<StreamForwarder>();

/* @internal */
public readonly removeStreamForwarder = (forwarder: StreamForwarder): void => {
this.streamForwarders.delete(forwarder);
};

/* @internal */
public constructor(session: SshSession) {
Expand Down Expand Up @@ -936,7 +941,7 @@ export class PortForwardingService extends SshService {
...this.remoteConnectors.values(),
];

this.streamForwarders.splice(0, this.streamForwarders.length);
this.streamForwarders.clear();
this.localForwarders.clear();
this.remoteConnectors.clear();

Expand Down
67 changes: 59 additions & 8 deletions src/ts/ssh-tcp/services/remotePortForwarder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
TraceLevel,
SshStream,
} from '@microsoft/dev-tunnels-ssh';
import { Duplex } from 'stream';
import { StreamForwarder } from './streamForwarder';
import { PortForwardingService } from './portForwardingService';
import { RemotePortConnector } from './remotePortConnector';
Expand Down Expand Up @@ -75,16 +76,41 @@ export class RemotePortForwarder extends RemotePortConnector {
cancellation?: CancellationToken,
): Promise<void> {
const channel = request.channel;
const sshStream = new SshStream(channel);

// Attach a temporary 'error' handler so a remote channel reset between
// channel-open and StreamForwarder construction (while we await
// forwardedPortConnecting and the local TCP connect) does not crash
// the host with an unhandled 'error' event. Removed once the forwarder
// takes over (or the connection is rejected/aborted below).
const channelErrorHandler = (e: Error) => {
trace(
TraceLevel.Warning,
SshTraceEventIds.portForwardConnectionFailed,
`PortForwardingService channel stream errored before forwarding started: ${e.message}`,
e,
);
};
sshStream.on('error', channelErrorHandler);

const forwardedStream = await pfs.forwardedPortConnecting(
remotePort ?? localPort,
true,
new SshStream(channel),
cancellation,
);
let forwardedStream: Duplex | null;
try {
forwardedStream = await pfs.forwardedPortConnecting(
remotePort ?? localPort,
true,
sshStream,
cancellation,
);
} catch (e) {
sshStream.removeListener('error', channelErrorHandler);
sshStream.destroy();
throw e;
}

if (!forwardedStream) {
// The event handler rejected the connection.
sshStream.removeListener('error', channelErrorHandler);
Comment thread
DavidObando marked this conversation as resolved.
sshStream.destroy();
request.failureReason = SshChannelOpenFailureReason.connectFailed;
return;
}
Expand Down Expand Up @@ -119,6 +145,11 @@ export class RemotePortForwarder extends RemotePortConnector {
await connectCompletion.promise;
} catch (e) {
if (!(e instanceof Error) || cancellation?.isCancellationRequested) {
sshStream.removeListener('error', channelErrorHandler);
// The forwardedStream may be the user-substituted stream; close it
// so we don't leak the underlying SSH channel.
forwardedStream.destroy();
if (forwardedStream !== sshStream) sshStream.destroy();
throw e;
}

Expand All @@ -131,19 +162,39 @@ export class RemotePortForwarder extends RemotePortConnector {
);
request.failureReason = SshChannelOpenFailureReason.connectFailed;
request.failureDescription = e.message;

// Tear down the SSH side and abandon: do NOT proceed to construct a
// StreamForwarder around a destroyed socket. Pre-existing bug: the
// previous code fell through and built a forwarder anyway, leaking
// resources and (before PR #138) potentially crashing the host.
sshStream.removeListener('error', channelErrorHandler);
forwardedStream.destroy();
if (forwardedStream !== sshStream) sshStream.destroy();
return;
} finally {
cancellationRegistration?.dispose();
}

// TODO: Set socket options?

const streamForwarder = new StreamForwarder(socket, forwardedStream, channel.session.trace);
// Hand off SSH stream error handling to the StreamForwarder.
sshStream.removeListener('error', channelErrorHandler);

const streamForwarder = new StreamForwarder(
socket,
forwardedStream,
channel.session.trace,
pfs.removeStreamForwarder,
);
trace(
TraceLevel.Info,
SshTraceEventIds.portForwardConnectionOpened,
`${channel.session} PortForwardingService forwarded channel ` +
`#${channel.channelId} connection to ${localHost}:${localPort}.`,
);
pfs.streamForwarders.push(streamForwarder);
pfs.streamForwarders.add(streamForwarder);
if (streamForwarder.isDisposed) {
pfs.streamForwarders.delete(streamForwarder);
}
}
}
24 changes: 23 additions & 1 deletion src/ts/ssh-tcp/services/streamForwarder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,33 @@ import { Socket } from 'net';

export class StreamForwarder implements Disposable {
private disposed: boolean = false;
private readonly onDisposedCallback?: (forwarder: StreamForwarder) => void;

public get isDisposed(): boolean {
return this.disposed;
}

/* @internal */
public constructor(
public readonly localStream: Duplex,
public readonly remoteStream: Duplex,
public readonly trace: Trace,
onDisposed?: (forwarder: StreamForwarder) => void,
) {
if (!localStream) throw new TypeError('Local stream is required.');
if (!remoteStream) throw new TypeError('Remote stream is required.');

this.onDisposedCallback = onDisposed;

// Without these listeners, errors from either side of the forwarder
// propagate up to the Node process as unhandled 'error' events and
// crash the host. Node's pipe() does not propagate errors between
// streams, so each side must be handled independently.
localStream.on('error', (err) => this.onStreamError('local', err));
remoteStream.on('error', (err) => this.onStreamError('remote', err));

// pipe() forwards 'end' (so EOF on one side gracefully ends the other),
// but does NOT forward 'error'. Error propagation is handled above
// by disposing the forwarder, which tears down both sides.
localStream.pipe(remoteStream);
remoteStream.pipe(localStream);
}
Expand Down Expand Up @@ -72,6 +82,18 @@ export class StreamForwarder implements Disposable {
if (!this.disposed) {
this.disposed = true;
this.close(true);
if (this.onDisposedCallback) {
try {
this.onDisposedCallback(this);
} catch (e) {
const errorMessage = e instanceof Error ? e.message : String(e);
this.trace(
TraceLevel.Warning,
SshTraceEventIds.unknownError,
`Stream forwarder onDisposed callback threw: ${errorMessage}`,
);
}
}
}
}
}
10 changes: 9 additions & 1 deletion src/ts/ssh/sshStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import { SshChannel } from './sshChannel';
import { PromiseCompletionSource } from './util/promiseCompletionSource';
import { TraceLevel, SshTraceEventIds } from './trace';
import { Duplex } from 'stream';

/**
Expand Down Expand Up @@ -132,7 +133,14 @@ export class SshStream extends Duplex {
* Destroys the stream and closes the underlying SSH channel.
*/
public destroy(error?: Error) {
void this.channel.close().catch();
this.channel.close().catch((e) => {
const message = e instanceof Error ? e.message : String(e);
this.channel.session.trace(
TraceLevel.Warning,
SshTraceEventIds.unknownError,
`${this} channel close on destroy failed: ${message}`,
);
});
super.destroy(error);
return this;
}
Expand Down
Loading
Loading