Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion packages/interface/src/stream-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ export interface StreamHandler {

/**
* Stream middleware allows accessing stream data outside of the stream handler
*
* Return false to stop the middleware chain without aborting the stream.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could said returning true or void lets the middleware continue.

* Throw or reject to abort the stream.
*/
export interface StreamMiddleware {
(stream: Stream, connection: Connection, next: (stream: Stream, connection: Connection) => void): void | Promise<void>
(stream: Stream, connection: Connection, next: (stream: Stream, connection: Connection) => void): void | false | Promise<void | false>

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be void | boolean | Promise<void | boolean>?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be false - kind of want false to be an explicit option to cancel the middleware... or could make it true & void do the same thing which feels a bit off

@tabcat tabcat Jun 10, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking about cases where the function returns some boolean logic. would have to coerce to void or false to stop or not stop the middleware while also fitting the type constraint.

}

export interface StreamHandlerOptions extends AbortOptions {
Expand Down
64 changes: 47 additions & 17 deletions packages/libp2p/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,11 @@ export class Connection extends TypedEventEmitter<MessageStreamEvents> implement
throw new LimitedConnectionError('Cannot open protocol stream on limited connection')
}

const middleware = this.components.registrar.getMiddleware(muxedStream.protocol)
// Copy registered middleware before appending the handler wrapper below;
// the registered middleware array is reused across streams.
const middleware = [
...this.components.registrar.getMiddleware(muxedStream.protocol)
]

middleware.push(async (stream, connection, next) => {
await handler(stream, connection)
Expand All @@ -268,22 +272,14 @@ export class Connection extends TypedEventEmitter<MessageStreamEvents> implement
const mw = middleware[i]
stream.log.trace('running middleware', i, mw)

// eslint-disable-next-line no-loop-func
await new Promise<void>((resolve, reject) => {
try {
const result = mw(stream, connection, (s, c) => {
stream = s
connection = c
resolve()
})

if (result instanceof Promise) {
result.catch(reject)
}
} catch (err) {
reject(err)
}
})
const result = await runMiddleware(mw, stream, connection)
stream = result.stream
connection = result.connection

if (result.stop) {
stream.log.trace('middleware stopped chain', i, mw)
break
}

stream.log.trace('ran middleware', i, mw)
}
Expand Down Expand Up @@ -353,6 +349,40 @@ function findOutgoingStreamLimit (protocol: string, registrar: Registrar, option
return options.maxOutboundStreams ?? DEFAULT_MAX_OUTBOUND_STREAMS
}

interface RunMiddlewareResult {
stream: Stream
connection: ConnectionInterface
stop: boolean
}

function runMiddleware (mw: StreamMiddleware, stream: Stream, connection: ConnectionInterface): Promise<RunMiddlewareResult> {
return new Promise<RunMiddlewareResult>((resolve, reject) => {
const continueChain = (s: Stream, c: ConnectionInterface): void => {
resolve({ stream: s, connection: c, stop: false })
}

const stopChain = (): void => {
resolve({ stream, connection, stop: true })
}

try {
const result = mw(stream, connection, continueChain)

if (result === false) {
stopChain()
} else if (result != null) {
result.then(result => {
if (result === false) {
stopChain()
}
}).catch(reject)
}
Comment on lines +371 to +379

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just await mw call?

} catch (err) {
reject(err)
}
})
}

function countStreams (protocol: string, direction: 'inbound' | 'outbound', connection: Connection): number {
let streamCount = 0

Expand Down
Loading
Loading