From 6feb7484c07198b3de7f5bdd9cd36c680d185e81 Mon Sep 17 00:00:00 2001 From: roharon Date: Sat, 4 Jan 2025 14:53:50 +0900 Subject: [PATCH] modify: improve run_with_timeout method using tokio::time::interval instead of timeout --- glide-core/src/client/mod.rs | 38 ++++++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/glide-core/src/client/mod.rs b/glide-core/src/client/mod.rs index 005a38a9cae..7f0634b7a59 100644 --- a/glide-core/src/client/mod.rs +++ b/glide-core/src/client/mod.rs @@ -1,7 +1,7 @@ // Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 mod types; - +use std::future::Future; use crate::cluster_scan_container::insert_cluster_scan_cursor; use crate::scripts_container::get_script; use futures::FutureExt; @@ -20,6 +20,7 @@ pub use standalone_client::StandaloneClient; use std::io; use std::sync::atomic::{AtomicIsize, Ordering}; use std::sync::Arc; +use std::task::Poll; use std::time::Duration; pub use types::*; @@ -132,10 +133,21 @@ async fn run_with_timeout( future: impl futures::Future> + Send, ) -> redis::RedisResult { match timeout { - Some(duration) => tokio::time::timeout(duration, future) - .await - .map_err(|_| io::Error::from(io::ErrorKind::TimedOut).into()) - .and_then(|res| res), + Some(duration) => { + let interval = tokio::time::interval(duration); + tokio::pin!(interval); + + loop { + match future.as_mut().poll() { + Poll::Ready(result) => return result, + Poll::Pending => { + if let Poll::Ready(_) = future.as_mut().poll().await { + return Err(io::Error::from(io::ErrorKind::TimedOut).into()); + } + } + } + } + } None => future.await, } } @@ -291,9 +303,9 @@ impl Client { client.route_command(cmd, routing).await } } - .and_then(|value| convert_to_expected_type(value, expected_type)) + .and_then(|value| convert_to_expected_type(value, expected_type)) }) - .boxed() + .boxed() } // Cluster scan is not passed to redis-rs as a regular command, so we need to handle it separately. @@ -410,7 +422,7 @@ impl Client { Self::get_transaction_values(pipeline, values, command_count, offset) }) - .boxed() + .boxed() } pub async fn invoke_script<'a>( @@ -491,7 +503,7 @@ impl Client { } } }) - .await + .await { Ok(result) => { if immediate_auth { @@ -738,8 +750,8 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String { }) .unwrap_or_default(); let connection_retry_strategy = request.connection_retry_strategy.as_ref().map(|strategy| - format!("\nreconnect backoff strategy: number of increasing duration retries: {}, base: {}, factor: {}", - strategy.number_of_retries, strategy.exponent_base, strategy.factor)).unwrap_or_default(); + format!("\nreconnect backoff strategy: number of increasing duration retries: {}, base: {}, factor: {}", + strategy.number_of_retries, strategy.exponent_base, strategy.factor)).unwrap_or_default(); let protocol = request .protocol .map(|protocol| format!("\nProtocol: {protocol:?}")) @@ -820,8 +832,8 @@ impl Client { inflight_requests_allowed, }) }) - .await - .map_err(|_| ConnectionError::Timeout)? + .await + .map_err(|_| ConnectionError::Timeout)? } }