From d7b5c0dbad744ce2016771a78f5fc01bfa50ce4f Mon Sep 17 00:00:00 2001 From: Brendan DeBeasi Date: Sat, 2 May 2026 09:03:27 -0700 Subject: [PATCH 1/2] fix(channel): retry retrigger relay sends and preserve delivery state --- src/agent/channel.rs | 108 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 94 insertions(+), 14 deletions(-) diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 963fdeab8..9b3b58ebb 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -40,6 +40,7 @@ use std::collections::HashSet; use std::sync::{Arc, Weak}; use tokio::sync::broadcast; use tokio::sync::{RwLock, mpsc}; +use tokio::time::{Duration, sleep}; /// Shared cache of in-flight worker transcript steps, keyed by worker ID. pub type LiveWorkerTranscripts = @@ -2959,8 +2960,7 @@ impl Channel { }) } - /// Send outbound text and record send metrics. - async fn send_outbound_text(&self, text: String, error_context: &str) { + async fn send_outbound_text(&self, text: String, error_context: &str) -> bool { match self.send_routed(OutboundResponse::Text(text)).await { Ok(()) => { #[cfg(feature = "metrics")] @@ -2971,6 +2971,7 @@ impl Channel { .with_label_values(&[&self.deps.agent_id, channel_type]) .inc(); } + true } Err(error) => { #[cfg(feature = "metrics")] @@ -2982,8 +2983,57 @@ impl Channel { .inc(); } tracing::error!(%error, channel_id = %self.id, "{error_context}"); + false + } + } + } + + async fn send_outbound_text_with_retry( + &self, + text: String, + error_context: &str, + max_attempts: usize, + ) -> bool { + let attempts = max_attempts.max(1); + for attempt in 1..=attempts { + if self + .send_outbound_text(text.clone(), error_context) + .await + { + if attempt > 1 { + tracing::info!( + channel_id = %self.id, + attempt, + attempts, + "outbound relay succeeded after retry" + ); + } + return true; + } + + if attempt < attempts { + let delay_ms = match attempt { + 1 => 250, + 2 => 1_000, + _ => 2_000, + }; + tracing::warn!( + channel_id = %self.id, + attempt, + attempts, + delay_ms, + "outbound relay failed; retrying" + ); + sleep(Duration::from_millis(delay_ms)).await; } } + + tracing::warn!( + channel_id = %self.id, + attempts, + "outbound relay failed after retries" + ); + false } /// Dispatch the LLM result: send fallback text, log errors, clean up typing. @@ -3061,11 +3111,24 @@ impl Channel { self.state .conversation_logger .log_bot_message(&self.state.channel_id, &final_text); - self.send_outbound_text( - final_text, - "failed to send retrigger fallback reply", - ) - .await; + let delivered = self + .send_outbound_text_with_retry( + final_text, + "failed to send retrigger fallback reply", + 3, + ) + .await; + if delivered { + replied_flag.store(true, std::sync::atomic::Ordering::Relaxed); + } else { + let _ = self + .send_outbound_text_with_retry( + "Delivery issue: your background result is preserved. Send 'continue' to replay it.".to_string(), + "failed to send relay failure backup notice", + 1, + ) + .await; + } } } } else { @@ -3126,11 +3189,24 @@ impl Channel { self.state .conversation_logger .log_bot_message(&self.state.channel_id, &final_text); - self.send_outbound_text( - final_text, - "failed to send retrigger fallback reply", - ) - .await; + let delivered = self + .send_outbound_text_with_retry( + final_text, + "failed to send retrigger fallback reply", + 3, + ) + .await; + if delivered { + replied_flag.store(true, std::sync::atomic::Ordering::Relaxed); + } else { + let _ = self + .send_outbound_text_with_retry( + "Delivery issue: your background result is preserved. Send 'continue' to replay it.".to_string(), + "failed to send relay failure backup notice", + 1, + ) + .await; + } } } } else { @@ -3186,8 +3262,12 @@ impl Channel { Some(self.agent_display_name()), tool_calls_json, ); - self.send_outbound_text(final_text, "failed to send fallback reply") - .await; + self.send_outbound_text_with_retry( + final_text, + "failed to send fallback reply", + 2, + ) + .await; } } From 1010830cc9e0421b0bef899de4f5b6174c60de98 Mon Sep 17 00:00:00 2001 From: Brendan DeBeasi Date: Sat, 9 May 2026 21:36:02 -0700 Subject: [PATCH 2/2] fix(channel): drop dead retry loop, keep replied_flag fix on fallback success MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per CodeRabbit: tokio mpsc::Sender::send only fails when the receiver is closed, so the 250/1000/2000ms retry loop and the "send 'continue' to replay" backup notice are dead code (the backup goes through the same closed channel and would also fail). The real bug fix in this PR was setting replied_flag = true on successful fallback delivery — without it, the post-turn check at line ~2354 still logs "retrigger relay failed" and injects "[background work completed but relay to user failed]" into history even when the fallback succeeded, which is what produced the user-visible truncation symptom. Now: send_outbound_text returns bool, logs a clear warning on failure, fallback paths set replied_flag on success. Net diff: +13/-77. --- src/agent/channel.rs | 90 +++++++------------------------------------- 1 file changed, 13 insertions(+), 77 deletions(-) diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 9b3b58ebb..0b80a1c7f 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -40,7 +40,6 @@ use std::collections::HashSet; use std::sync::{Arc, Weak}; use tokio::sync::broadcast; use tokio::sync::{RwLock, mpsc}; -use tokio::time::{Duration, sleep}; /// Shared cache of in-flight worker transcript steps, keyed by worker ID. pub type LiveWorkerTranscripts = @@ -2960,6 +2959,10 @@ impl Channel { }) } + /// Send outbound text and record send metrics. Returns `true` on success. + /// + /// Failure here means the outbound response channel's receiver has closed + /// (e.g. shutdown) — `mpsc::Sender::send` does not fail transiently. async fn send_outbound_text(&self, text: String, error_context: &str) -> bool { match self.send_routed(OutboundResponse::Text(text)).await { Ok(()) => { @@ -2988,54 +2991,6 @@ impl Channel { } } - async fn send_outbound_text_with_retry( - &self, - text: String, - error_context: &str, - max_attempts: usize, - ) -> bool { - let attempts = max_attempts.max(1); - for attempt in 1..=attempts { - if self - .send_outbound_text(text.clone(), error_context) - .await - { - if attempt > 1 { - tracing::info!( - channel_id = %self.id, - attempt, - attempts, - "outbound relay succeeded after retry" - ); - } - return true; - } - - if attempt < attempts { - let delay_ms = match attempt { - 1 => 250, - 2 => 1_000, - _ => 2_000, - }; - tracing::warn!( - channel_id = %self.id, - attempt, - attempts, - delay_ms, - "outbound relay failed; retrying" - ); - sleep(Duration::from_millis(delay_ms)).await; - } - } - - tracing::warn!( - channel_id = %self.id, - attempts, - "outbound relay failed after retries" - ); - false - } - /// Dispatch the LLM result: send fallback text, log errors, clean up typing. /// /// On retrigger turns (`is_retrigger = true`), fallback text is suppressed @@ -3111,23 +3066,14 @@ impl Channel { self.state .conversation_logger .log_bot_message(&self.state.channel_id, &final_text); - let delivered = self - .send_outbound_text_with_retry( + if self + .send_outbound_text( final_text, "failed to send retrigger fallback reply", - 3, ) - .await; - if delivered { + .await + { replied_flag.store(true, std::sync::atomic::Ordering::Relaxed); - } else { - let _ = self - .send_outbound_text_with_retry( - "Delivery issue: your background result is preserved. Send 'continue' to replay it.".to_string(), - "failed to send relay failure backup notice", - 1, - ) - .await; } } } @@ -3189,23 +3135,14 @@ impl Channel { self.state .conversation_logger .log_bot_message(&self.state.channel_id, &final_text); - let delivered = self - .send_outbound_text_with_retry( + if self + .send_outbound_text( final_text, "failed to send retrigger fallback reply", - 3, ) - .await; - if delivered { + .await + { replied_flag.store(true, std::sync::atomic::Ordering::Relaxed); - } else { - let _ = self - .send_outbound_text_with_retry( - "Delivery issue: your background result is preserved. Send 'continue' to replay it.".to_string(), - "failed to send relay failure backup notice", - 1, - ) - .await; } } } @@ -3262,10 +3199,9 @@ impl Channel { Some(self.agent_display_name()), tool_calls_json, ); - self.send_outbound_text_with_retry( + self.send_outbound_text( final_text, "failed to send fallback reply", - 2, ) .await; }