Skip to content

Implement futures::Sink#215

Open
Ali-Mirghasemi wants to merge 6 commits into
libp2p:masterfrom
Ali-Mirghasemi:master
Open

Implement futures::Sink#215
Ali-Mirghasemi wants to merge 6 commits into
libp2p:masterfrom
Ali-Mirghasemi:master

Conversation

@Ali-Mirghasemi

@Ali-Mirghasemi Ali-Mirghasemi commented Dec 8, 2025

Copy link
Copy Markdown

Summary

This PR adds an implementation of the futures::Sink trait for the Stream struct, enabling native async stream support.

What’s Included

  • Implemented futures::Sink for Stream
  • Verified functionality over Actix WebSocket
  • Ensured compatibility with Actix actors

Benefits

This change allows Stream to be used directly in async contexts and makes it interoperable with the broader async ecosystem, especially within Actix-based applications.

Testing

  • Tested with Actix WebSocket
  • Tested with Actix actor-based workflows

@elenaf9 elenaf9 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Couple of comments below.
Could you please also add a test for this?

Comment thread yamux/src/connection/stream.rs Outdated
Comment on lines +367 to +371
match <Stream as AsyncWrite>::poll_flush(self, cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
match <Stream as AsyncWrite>::poll_flush(self, cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
<Stream as AsyncWrite>::poll_flush(self, cx)

The match block doesn't change any types, does it?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it's better without match block, i'll change it

Comment thread yamux/src/connection/stream.rs Outdated
Comment on lines +375 to +379
match <Stream as AsyncWrite>::poll_close(self, cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
match <Stream as AsyncWrite>::poll_close(self, cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
<Stream as AsyncWrite>::poll_close(self, cx)

Same here.

Comment on lines +310 to +364
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Ensure the outgoing mpsc is ready.
ready!(self
.sender
.poll_ready(cx)
.map_err(|_| self.write_zero_err())?)
;

let mut shared = self.shared();

// If we cannot write any more (stream closed for writing) -> error.
if !shared.state().can_write() {
return Poll::Ready(Err(self.write_zero_err()));
}

// If no send credit, arrange to be woken when credit arrives and return Pending.
if shared.send_window() == 0 {
shared.writer = Some(cx.waker().clone());
return Poll::Pending;
}

Poll::Ready(Ok(()))
}

fn start_send(mut self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> {
let body = item.0;

// Limit the mutex guard scope
{
let mut shared = self.shared();
if !shared.state().can_write() {
return Err(self.write_zero_err());
}
if shared.send_window() == 0 {
return Err(io::Error::new(
io::ErrorKind::WouldBlock,
"no credit"
));
}
let k = std::cmp::min(
shared.send_window(),
body.len().try_into().unwrap_or(u32::MAX),
);
shared.consume_send_window(k);
} // guard dropped

// Now we can mutate self
let mut frame = Frame::data(self.id, body).unwrap().left();
self.add_flag(frame.header_mut());

let cmd = StreamCommand::SendFrame(frame);
self.sender
.start_send(cmd)
.map_err(|_| self.write_zero_err())
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we extract common logic from here and <Stream as AsyncWrite>::poll_write into separate functions so that we don't duplicate any logic?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i didn't want to change too much, yeah i try to merge AsyncWrite and Sink logic together

@Ali-Mirghasemi Ali-Mirghasemi changed the title Implement futures::Stream Implement futures::Sink Dec 9, 2025
@Ali-Mirghasemi

Copy link
Copy Markdown
Author

Can you review latest commits please
thank you

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants