fix(stream): enable public reconnect() with needs_reconnect() flag#2
fix(stream): enable public reconnect() with needs_reconnect() flag#2Canvinus wants to merge 1 commit into
Conversation
There was a problem hiding this comment.
Pull request overview
Enables consumers of WebSocketStream to detect when a WebSocket connection has been lost and explicitly trigger reconnection, including automatic resubscription.
Changes:
- Add a
needs_reconnect: boolflag toWebSocketStream, with a publicneeds_reconnect()getter. - Make
reconnect()public and implement exponential backoff retries. - Set
needs_reconnect = truewhen the websocket closes, errors, or ends; reset it on successful reconnection.
Comments suppressed due to low confidence (3)
src/stream.rs:552
- New reconnection behavior (
needs_reconnect()+ publicreconnect()) isn’t covered by the existing unit tests in this module. Adding tests that (a) setneeds_reconnectwhen the stream receives Close / Err / None, and (b) verify the flag is only cleared after a successful reconnect+resubscribe, would help prevent regressions. If direct websocket I/O is hard to test, consider extracting the resubscribe/flag-update logic behind a small injectable trait or helper that can be unit-tested.
/// Check if reconnection is needed (connection was lost)
pub fn needs_reconnect(&self) -> bool {
self.needs_reconnect
}
/// Reconnect with exponential backoff
///
/// Call this method when `needs_reconnect()` returns true.
/// It will attempt to reconnect to the WebSocket and resubscribe to all previous subscriptions.
pub async fn reconnect(&mut self) -> Result<()> {
let mut delay = self.reconnect_config.base_delay;
let mut retries = 0;
while retries < self.reconnect_config.max_retries {
warn!("Attempting to reconnect (attempt {})", retries + 1);
match self.connect().await {
Ok(()) => {
info!("Successfully reconnected");
self.stats.reconnect_count += 1;
self.needs_reconnect = false; // Reset flag on success
// Resubscribe to all previous subscriptions
let subscriptions = self.subscriptions.clone();
for subscription in subscriptions {
self.send_message(serde_json::to_value(subscription)?)
.await?;
}
return Ok(());
},
Err(e) => {
error!("Reconnection attempt {} failed: {}", retries + 1, e);
retries += 1;
if retries < self.reconnect_config.max_retries {
tokio::time::sleep(delay).await;
delay = std::cmp::min(
delay.mul_f64(self.reconnect_config.backoff_multiplier),
self.reconnect_config.max_delay,
);
}
},
}
}
Err(PolyfillError::stream(
format!(
"Failed to reconnect after {} attempts",
self.reconnect_config.max_retries
),
crate::errors::StreamErrorKind::ConnectionFailed,
))
}
}
src/stream.rs:525
reconnect()clearsneeds_reconnectbefore the resubscribe loop. Ifsend_message(...)(or serialization) fails mid-resubscribe, the method returnsErrbut the stream is left withneeds_reconnect = false, which is inconsistent with a failed reconnect/resubscribe attempt. Consider only resetting the flag (and incrementingreconnect_count) after the full reconnect+resubscribe sequence succeeds; on failure, keepneeds_reconnecttrue (and optionally close the new connection).
Ok(()) => {
info!("Successfully reconnected");
self.stats.reconnect_count += 1;
self.needs_reconnect = false; // Reset flag on success
// Resubscribe to all previous subscriptions
let subscriptions = self.subscriptions.clone();
for subscription in subscriptions {
self.send_message(serde_json::to_value(subscription)?)
.await?;
}
src/stream.rs:525
- After a disconnect,
pending_booksmay still contain messages from the prior connection, andpoll_nextwill yield them before reading from the new socket (it checkspending_booksfirst). With the new publicreconnect(), this can cause stale/out-of-order messages to be delivered after reconnect. Consider clearingpending_books(and any other per-connection transient state) when the connection is lost and/or at the start ofreconnect().
// Resubscribe to all previous subscriptions
let subscriptions = self.subscriptions.clone();
for subscription in subscriptions {
self.send_message(serde_json::to_value(subscription)?)
.await?;
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| /// Reconnect with exponential backoff | ||
| #[allow(dead_code)] | ||
| async fn reconnect(&mut self) -> Result<()> { | ||
| /// | ||
| /// Call this method when `needs_reconnect()` returns true. | ||
| /// It will attempt to reconnect to the WebSocket and resubscribe to all previous subscriptions. | ||
| pub async fn reconnect(&mut self) -> Result<()> { |
There was a problem hiding this comment.
The reconnect() rustdoc has an empty doc line (/// ) and doesn’t mention that poll_next returns None on Close/stream end (terminating typical while let Some(...) consumption). Consider tightening the rustdoc (remove the blank doc line) and documenting the expected consumer pattern (e.g., handle None/end-of-stream, then call reconnect() and resume polling).
| /// Check if reconnection is needed (connection was lost) | ||
| pub fn needs_reconnect(&self) -> bool { | ||
| self.needs_reconnect | ||
| } |
There was a problem hiding this comment.
needs_reconnect is only reset inside reconnect(). If the stream re-establishes a connection via another path that calls connect() (e.g., subscribe_async when connection.is_none()), needs_reconnect() can continue to return true even after a successful connect. Consider resetting the flag on any successful (re)connection (e.g., in connect()), or ensuring all reconnection entry points funnel through reconnect().
98ca78c to
113a447
Compare
Summary
Enables the existing reconnection logic by making it public and adding a flag to signal when reconnection is needed.
Changes
needs_reconnectfield toWebSocketStreamreconnect()public with exponential backoff (1s→60s, 2x multiplier, max 5 retries)needs_reconnect()getter for consumers to check reconnection statusneeds_reconnect=trueon Close frame, WS error, or stream endneeds_reconnect=falseon successful reconnectionUsage