Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0

mod types;

use std::future::Future;
use crate::cluster_scan_container::insert_cluster_scan_cursor;
use crate::scripts_container::get_script;
use futures::FutureExt;
Expand All @@ -20,6 +20,7 @@ pub use standalone_client::StandaloneClient;
use std::io;
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
pub use types::*;

Expand Down Expand Up @@ -132,10 +133,21 @@ async fn run_with_timeout<T>(
future: impl futures::Future<Output = RedisResult<T>> + Send,
) -> redis::RedisResult<T> {
match timeout {
Some(duration) => tokio::time::timeout(duration, future)
.await
.map_err(|_| io::Error::from(io::ErrorKind::TimedOut).into())
.and_then(|res| res),
Some(duration) => {
let interval = tokio::time::interval(duration);
tokio::pin!(interval);

loop {
match future.as_mut().poll() {
Poll::Ready(result) => return result,
Poll::Pending => {
if let Poll::Ready(_) = future.as_mut().poll().await {
return Err(io::Error::from(io::ErrorKind::TimedOut).into());
}
}
}
}
}
None => future.await,
}
}
Expand Down Expand Up @@ -291,9 +303,9 @@ impl Client {
client.route_command(cmd, routing).await
}
}
.and_then(|value| convert_to_expected_type(value, expected_type))
.and_then(|value| convert_to_expected_type(value, expected_type))
})
.boxed()
.boxed()
}

// Cluster scan is not passed to redis-rs as a regular command, so we need to handle it separately.
Expand Down Expand Up @@ -410,7 +422,7 @@ impl Client {

Self::get_transaction_values(pipeline, values, command_count, offset)
})
.boxed()
.boxed()
}

pub async fn invoke_script<'a>(
Expand Down Expand Up @@ -491,7 +503,7 @@ impl Client {
}
}
})
.await
.await
{
Ok(result) => {
if immediate_auth {
Expand Down Expand Up @@ -738,8 +750,8 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String {
})
.unwrap_or_default();
let connection_retry_strategy = request.connection_retry_strategy.as_ref().map(|strategy|
format!("\nreconnect backoff strategy: number of increasing duration retries: {}, base: {}, factor: {}",
strategy.number_of_retries, strategy.exponent_base, strategy.factor)).unwrap_or_default();
format!("\nreconnect backoff strategy: number of increasing duration retries: {}, base: {}, factor: {}",
strategy.number_of_retries, strategy.exponent_base, strategy.factor)).unwrap_or_default();
let protocol = request
.protocol
.map(|protocol| format!("\nProtocol: {protocol:?}"))
Expand Down Expand Up @@ -820,8 +832,8 @@ impl Client {
inflight_requests_allowed,
})
})
.await
.map_err(|_| ConnectionError::Timeout)?
.await
.map_err(|_| ConnectionError::Timeout)?
}
}

Expand Down