From 113a447e07f6d7d88732a38b68fa24b07dea40e0 Mon Sep 17 00:00:00 2001 From: Andrey Date: Sat, 31 Jan 2026 22:53:57 +0100 Subject: [PATCH] fix(stream): add Poll::Pending to avoid busy loop on disconnect --- src/stream.rs | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/src/stream.rs b/src/stream.rs index 91348ad..2f1cf59 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -57,6 +57,8 @@ pub struct WebSocketStream { needs_pong_flush: bool, /// Pending messages from array-formatted snapshot (e.g. initial book snapshot) pending_books: VecDeque, + /// Flag indicating reconnection is needed (set when connection lost) + needs_reconnect: bool, } /// Stream statistics @@ -113,6 +115,7 @@ impl WebSocketStream { reconnect_config: ReconnectConfig::default(), needs_pong_flush: false, pending_books: VecDeque::new(), + needs_reconnect: false, } } @@ -492,9 +495,16 @@ impl WebSocketStream { } } + /// Check if reconnection is needed (connection was lost) + pub fn needs_reconnect(&self) -> bool { + self.needs_reconnect + } + /// Reconnect with exponential backoff - #[allow(dead_code)] - async fn reconnect(&mut self) -> Result<()> { + /// + /// Call this method when `needs_reconnect()` returns true. + /// It will attempt to reconnect to the WebSocket and resubscribe to all previous subscriptions. + pub async fn reconnect(&mut self) -> Result<()> { let mut delay = self.reconnect_config.base_delay; let mut retries = 0; @@ -505,6 +515,7 @@ impl WebSocketStream { Ok(()) => { info!("Successfully reconnected"); self.stats.reconnect_count += 1; + self.needs_reconnect = false; // Reset flag on success // Resubscribe to all previous subscriptions let subscriptions = self.subscriptions.clone(); @@ -688,6 +699,7 @@ impl Stream for WebSocketStream { tokio_tungstenite::tungstenite::Message::Close(_) => { info!("WebSocket connection closed by server"); self.connection = None; + self.needs_reconnect = true; Poll::Ready(None) }, _ => Poll::Pending, @@ -696,16 +708,28 @@ impl Stream for WebSocketStream { Poll::Ready(Some(Err(e))) => { error!("WebSocket error: {}", e); self.stats.errors += 1; + self.needs_reconnect = true; + self.connection = None; Poll::Ready(Some(Err(e.into()))) }, Poll::Ready(None) => { debug!("WebSocket stream ended"); + self.needs_reconnect = true; + self.connection = None; Poll::Ready(None) }, Poll::Pending => Poll::Pending, } } else { - Poll::Ready(None) + // Connection is None - check if we're waiting for reconnection + if self.needs_reconnect { + // Return Pending to avoid busy loop in select! while waiting for reconnect() + // The consumer should call reconnect() when they see needs_reconnect() + Poll::Pending + } else { + // Stream is truly ended (no reconnect requested) + Poll::Ready(None) + } } } }