Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pub struct WebSocketStream {
needs_pong_flush: bool,
/// Pending messages from array-formatted snapshot (e.g. initial book snapshot)
pending_books: VecDeque<StreamMessage>,
/// Flag indicating reconnection is needed (set when connection lost)
needs_reconnect: bool,
}

/// Stream statistics
Expand Down Expand Up @@ -113,6 +115,7 @@ impl WebSocketStream {
reconnect_config: ReconnectConfig::default(),
needs_pong_flush: false,
pending_books: VecDeque::new(),
needs_reconnect: false,
}
}

Expand Down Expand Up @@ -492,9 +495,16 @@ impl WebSocketStream {
}
}

/// Check if reconnection is needed (connection was lost)
pub fn needs_reconnect(&self) -> bool {
self.needs_reconnect
}
Comment on lines +498 to +501

Copilot AI Jan 31, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs_reconnect is only reset inside reconnect(). If the stream re-establishes a connection via another path that calls connect() (e.g., subscribe_async when connection.is_none()), needs_reconnect() can continue to return true even after a successful connect. Consider resetting the flag on any successful (re)connection (e.g., in connect()), or ensuring all reconnection entry points funnel through reconnect().

Copilot uses AI. Check for mistakes.

/// Reconnect with exponential backoff
#[allow(dead_code)]
async fn reconnect(&mut self) -> Result<()> {
///
/// Call this method when `needs_reconnect()` returns true.
/// It will attempt to reconnect to the WebSocket and resubscribe to all previous subscriptions.
pub async fn reconnect(&mut self) -> Result<()> {
Comment on lines 495 to +507

Copilot AI Jan 31, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reconnect() rustdoc has an empty doc line (/// ) and doesn’t mention that poll_next returns None on Close/stream end (terminating typical while let Some(...) consumption). Consider tightening the rustdoc (remove the blank doc line) and documenting the expected consumer pattern (e.g., handle None/end-of-stream, then call reconnect() and resume polling).

Copilot uses AI. Check for mistakes.
let mut delay = self.reconnect_config.base_delay;
let mut retries = 0;

Expand All @@ -505,6 +515,7 @@ impl WebSocketStream {
Ok(()) => {
info!("Successfully reconnected");
self.stats.reconnect_count += 1;
self.needs_reconnect = false; // Reset flag on success

// Resubscribe to all previous subscriptions
let subscriptions = self.subscriptions.clone();
Expand Down Expand Up @@ -688,6 +699,7 @@ impl Stream for WebSocketStream {
tokio_tungstenite::tungstenite::Message::Close(_) => {
info!("WebSocket connection closed by server");
self.connection = None;
self.needs_reconnect = true;
Poll::Ready(None)
},
_ => Poll::Pending,
Expand All @@ -696,16 +708,28 @@ impl Stream for WebSocketStream {
Poll::Ready(Some(Err(e))) => {
error!("WebSocket error: {}", e);
self.stats.errors += 1;
self.needs_reconnect = true;
self.connection = None;
Poll::Ready(Some(Err(e.into())))
},
Poll::Ready(None) => {
debug!("WebSocket stream ended");
self.needs_reconnect = true;
self.connection = None;
Poll::Ready(None)
},
Poll::Pending => Poll::Pending,
}
} else {
Poll::Ready(None)
// Connection is None - check if we're waiting for reconnection
if self.needs_reconnect {
// Return Pending to avoid busy loop in select! while waiting for reconnect()
// The consumer should call reconnect() when they see needs_reconnect()
Poll::Pending
} else {
// Stream is truly ended (no reconnect requested)
Poll::Ready(None)
}
}
}
}
Expand Down
Loading