diff --git a/Cargo.lock b/Cargo.lock index 61d77dea88..8fd4a2f3e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2841,6 +2841,7 @@ dependencies = [ "http-body-util", "libdd-capabilities", "libdd-common", + "tokio", ] [[package]] @@ -3232,11 +3233,14 @@ version = "0.1.0" dependencies = [ "async-trait", "futures", + "futures-util", "libdd-capabilities", + "libdd-capabilities-impl", "libdd-common", "tokio", "tokio-util", "tracing", + "wasm-bindgen-futures", ] [[package]] diff --git a/datadog-sidecar/src/service/agent_info.rs b/datadog-sidecar/src/service/agent_info.rs index 82c94769a7..9854b138a9 100644 --- a/datadog-sidecar/src/service/agent_info.rs +++ b/datadog-sidecar/src/service/agent_info.rs @@ -16,7 +16,7 @@ use datadog_ipc::platform::NamedShmHandle; use futures::future::Shared; use futures::FutureExt; use http::uri::PathAndQuery; -use libdd_capabilities_impl::DefaultHttpClient; +use libdd_capabilities_impl::NativeCapabilities; use libdd_common::{Endpoint, MutexExt}; use libdd_data_pipeline::agent_info::schema::AgentInfoStruct; use libdd_data_pipeline::agent_info::{fetch_info_with_state, FetchInfoStatus}; @@ -103,7 +103,7 @@ impl AgentInfoFetcher { fetch_endpoint.url = http::Uri::from_parts(parts).unwrap(); loop { let fetched = - fetch_info_with_state::(&fetch_endpoint, state.as_deref()) + fetch_info_with_state::(&fetch_endpoint, state.as_deref()) .await; let mut complete_fut = None; { diff --git a/datadog-sidecar/src/service/stats_flusher.rs b/datadog-sidecar/src/service/stats_flusher.rs index a507477342..9865d52cef 100644 --- a/datadog-sidecar/src/service/stats_flusher.rs +++ b/datadog-sidecar/src/service/stats_flusher.rs @@ -15,7 +15,7 @@ use datadog_ipc::shm_stats::{ ShmSpanConcentrator, DEFAULT_SLOT_COUNT, DEFAULT_STRING_POOL_BYTES, RELOAD_FILL_RATIO, }; use http::uri::PathAndQuery; -use libdd_capabilities_impl::{HttpClientTrait, NativeCapabilities}; +use libdd_capabilities_impl::{HttpClientCapability, NativeCapabilities}; use libdd_common::{Endpoint, MutexExt}; use libdd_trace_stats::stats_exporter::{StatsExporter, StatsMetadata}; use std::collections::HashMap; diff --git a/datadog-sidecar/src/service/tracing/trace_flusher.rs b/datadog-sidecar/src/service/tracing/trace_flusher.rs index 53d09c5823..75897762ab 100644 --- a/datadog-sidecar/src/service/tracing/trace_flusher.rs +++ b/datadog-sidecar/src/service/tracing/trace_flusher.rs @@ -5,8 +5,7 @@ use super::TraceSendData; use crate::agent_remote_config::AgentRemoteConfigWriter; use datadog_ipc::platform::NamedShmHandle; use futures::future::join_all; -use libdd_capabilities::HttpClientTrait; -use libdd_capabilities_impl::DefaultHttpClient; +use libdd_capabilities_impl::{HttpClientCapability, NativeCapabilities}; use libdd_common::{Endpoint, MutexExt}; use libdd_trace_utils::trace_utils; use libdd_trace_utils::trace_utils::SendData; @@ -96,7 +95,7 @@ pub(crate) struct TraceFlusher { pub(crate) min_force_drop_size_bytes: AtomicU32, // put a limit on memory usage remote_config: Mutex, pub metrics: Mutex, - client: DefaultHttpClient, + capabilities: NativeCapabilities, } impl Default for TraceFlusher { fn default() -> Self { @@ -107,7 +106,7 @@ impl Default for TraceFlusher { min_force_drop_size_bytes: AtomicU32::new(trace_utils::MAX_PAYLOAD_SIZE as u32), remote_config: Mutex::new(Default::default()), metrics: Mutex::new(Default::default()), - client: DefaultHttpClient::new_client(), + capabilities: NativeCapabilities::new_client(), } } } @@ -249,7 +248,7 @@ impl TraceFlusher { async fn send_and_handle_trace(&self, send_data: SendData) { let endpoint = send_data.get_target().clone(); - let response = send_data.send(&self.client).await; + let response = send_data.send(&self.capabilities).await; self.metrics.lock_or_panic().update(&response); match response.last_result { Ok(response) => { diff --git a/libdd-capabilities-impl/Cargo.toml b/libdd-capabilities-impl/Cargo.toml index 4238cbdab6..0f9fe0bab9 100644 --- a/libdd-capabilities-impl/Cargo.toml +++ b/libdd-capabilities-impl/Cargo.toml @@ -20,6 +20,7 @@ bytes = "1" http = "1" libdd-capabilities = { path = "../libdd-capabilities", version = "1.0.0" } libdd-common = { path = "../libdd-common", version = "4.0.0", default-features = false } +tokio = { version = "1", features = ["time"] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] http-body-util = "0.1" diff --git a/libdd-capabilities-impl/README.md b/libdd-capabilities-impl/README.md index 01b45384ec..030b3cb23d 100644 --- a/libdd-capabilities-impl/README.md +++ b/libdd-capabilities-impl/README.md @@ -8,11 +8,14 @@ Native implementations of `libdd-capabilities` traits. ## Capabilities -- **`DefaultHttpClient`**: HTTP client backed by hyper and the `libdd-common` connector infrastructure (supports Unix sockets, HTTPS with rustls, Windows named pipes). +- **`NativeHttpClient`**: HTTP client backed by hyper and the `libdd-common` connector infrastructure (supports Unix sockets, HTTPS with rustls, Windows named pipes). +- **`NativeSleepCapability`**: Sleep backed by `tokio::time::sleep`. + +Task spawning is handled internally by `SharedRuntime` and is not exposed as a capability trait. ## Types -- **`NativeCapabilities`**: Bundle type alias that implements all capability traits using native backends. Currently delegates to `DefaultHttpClient`; as more capability traits are added (spawn, sleep, etc.), this type will implement all of them. +- **`NativeCapabilities`**: Bundle struct that implements HTTP and sleep capability traits using native backends. Delegates to `NativeHttpClient` and `NativeSleepCapability`. ## Usage diff --git a/libdd-capabilities-impl/src/http.rs b/libdd-capabilities-impl/src/http.rs index b0ae9b3c75..e6af5b853c 100644 --- a/libdd-capabilities-impl/src/http.rs +++ b/libdd-capabilities-impl/src/http.rs @@ -4,7 +4,9 @@ //! Native HTTP client implementation backed by hyper. mod native { - use libdd_capabilities::http::{HttpClientTrait, HttpError}; + use std::sync::{Arc, OnceLock}; + + use libdd_capabilities::http::{HttpClientCapability, HttpError}; use libdd_capabilities::maybe_send::MaybeSend; use libdd_common::connector::Connector; use libdd_common::http_common::{new_default_client, Body, GenericHttpClient}; @@ -12,20 +14,22 @@ mod native { use http_body_util::BodyExt; #[derive(Clone)] - pub struct DefaultHttpClient { - client: GenericHttpClient, + pub struct NativeHttpClient { + client: Arc>>, } - impl std::fmt::Debug for DefaultHttpClient { + impl std::fmt::Debug for NativeHttpClient { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("DefaultHttpClient").finish() + f.debug_struct("NativeHttpClient") + .field("initialized", &self.client.get().is_some()) + .finish() } } - impl HttpClientTrait for DefaultHttpClient { + impl HttpClientCapability for NativeHttpClient { fn new_client() -> Self { Self { - client: new_default_client(), + client: Arc::new(OnceLock::new()), } } @@ -35,7 +39,7 @@ mod native { req: http::Request, ) -> impl std::future::Future, HttpError>> + MaybeSend { - let client = self.client.clone(); + let client = self.client.get_or_init(new_default_client).clone(); async move { let hyper_req = req.map(Body::from_bytes); @@ -57,4 +61,4 @@ mod native { } } -pub use native::DefaultHttpClient; +pub use native::NativeHttpClient; diff --git a/libdd-capabilities-impl/src/lib.rs b/libdd-capabilities-impl/src/lib.rs index 6b3f6cd959..d90d19034b 100644 --- a/libdd-capabilities-impl/src/lib.rs +++ b/libdd-capabilities-impl/src/lib.rs @@ -4,58 +4,75 @@ //! Native capability implementations for libdatadog. //! //! `NativeCapabilities` is the bundle struct that implements all capability -//! traits using platform-native backends (hyper for HTTP, tokio for spawn, +//! traits using platform-native backends (hyper for HTTP, tokio for sleep, //! etc.). Leaf crates (FFI, benchmarks) pin this type as the generic parameter. mod http; +pub mod sleep; -pub use libdd_capabilities::HttpClientTrait; - -#[cfg(not(target_arch = "wasm32"))] -pub use http::DefaultHttpClient; - -#[cfg(not(target_arch = "wasm32"))] -mod native { - use core::future::Future; - - use libdd_capabilities::http::HttpError; - use libdd_capabilities::MaybeSend; - - use super::DefaultHttpClient; - use super::HttpClientTrait; - - /// Bundle struct for native platform capabilities. - /// - /// Delegates to [`DefaultHttpClient`] for HTTP. As more capability traits are - /// added (spawn, sleep, etc.), additional fields and impls are added here - /// without changing the type identity — consumers see the same - /// `NativeCapabilities` throughout. - /// - /// Individual capability traits keep minimal per-function bounds (e.g. - /// functions that only need HTTP require just `H: HttpClientTrait`, not the - /// full bundle) so that native callers like the sidecar can use - /// `DefaultHttpClient` directly without pulling in this bundle. - #[derive(Clone, Debug)] - pub struct NativeCapabilities { - http: DefaultHttpClient, +use core::future::Future; +use std::time::Duration; + +pub use http::NativeHttpClient; +use libdd_capabilities::{http::HttpError, MaybeSend}; +pub use libdd_capabilities::{HttpClientCapability, SleepCapability}; +pub use sleep::NativeSleepCapability; + +/// Bundle struct for native platform capabilities. +/// +/// Delegates to [`NativeHttpClient`] for HTTP and [`NativeSleepCapability`] for +/// sleep. Task spawning is handled internally by `SharedRuntime`. +/// +/// Individual capability traits keep minimal per-function bounds (e.g. +/// functions that only need HTTP require just `H: HttpClientCapability`, not the +/// full bundle) so that native callers like the sidecar can use +/// `NativeHttpClient` directly without pulling in this bundle. +#[derive(Clone, Debug)] +pub struct NativeCapabilities { + http: NativeHttpClient, + sleep: NativeSleepCapability, +} + +impl Default for NativeCapabilities { + fn default() -> Self { + Self::new() } +} - impl HttpClientTrait for NativeCapabilities { - fn new_client() -> Self { - Self { - http: DefaultHttpClient::new_client(), - } +impl NativeCapabilities { + pub fn new() -> Self { + Self { + http: NativeHttpClient::new_client(), + sleep: NativeSleepCapability, } + } +} - fn request( - &self, - req: ::http::Request, - ) -> impl Future, HttpError>> + MaybeSend - { - self.http.request(req) +impl HttpClientCapability for NativeCapabilities { + fn new_client() -> Self { + Self { + http: NativeHttpClient::new_client(), + sleep: NativeSleepCapability, } } + + fn request( + &self, + req: ::http::Request, + ) -> impl Future, HttpError>> + MaybeSend { + self.http.request(req) + } } -#[cfg(not(target_arch = "wasm32"))] -pub use native::NativeCapabilities; +impl SleepCapability for NativeCapabilities { + fn new() -> Self { + Self { + http: NativeHttpClient::new_client(), + sleep: NativeSleepCapability, + } + } + + fn sleep(&self, duration: Duration) -> impl Future + MaybeSend { + self.sleep.sleep(duration) + } +} diff --git a/libdd-capabilities-impl/src/sleep.rs b/libdd-capabilities-impl/src/sleep.rs new file mode 100644 index 0000000000..ddf9db7122 --- /dev/null +++ b/libdd-capabilities-impl/src/sleep.rs @@ -0,0 +1,23 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Native sleep implementation backed by `tokio::time::sleep`. + +use core::future::Future; +use std::time::Duration; + +use libdd_capabilities::maybe_send::MaybeSend; +use libdd_capabilities::sleep::SleepCapability; + +#[derive(Clone, Debug)] +pub struct NativeSleepCapability; + +impl SleepCapability for NativeSleepCapability { + fn new() -> Self { + Self + } + + fn sleep(&self, duration: Duration) -> impl Future + MaybeSend { + tokio::time::sleep(duration) + } +} diff --git a/libdd-capabilities/README.md b/libdd-capabilities/README.md index aff709d27a..c93fa7c19c 100644 --- a/libdd-capabilities/README.md +++ b/libdd-capabilities/README.md @@ -10,7 +10,7 @@ This crate has **zero platform dependencies**: it compiles on any target includi ## Traits -- **`HttpClientTrait`**: Async HTTP request/response using `http::Request` / `http::Response`. +- **`HttpClientCapability`**: Async HTTP request/response using `http::Request` / `http::Response`. - **`MaybeSend`**: Conditional `Send` bound: equivalent to `Send` on native, auto-implemented for all types on wasm. This bridges the gap between tokio's multi-threaded runtime (requires `Send` futures) and wasm's single-threaded model (where JS interop types are `!Send`). ## Architecture @@ -18,15 +18,15 @@ This crate has **zero platform dependencies**: it compiles on any target includi Three-layer design: 1. **Trait definitions** (this crate): Pure traits, no platform deps. -2. **Core crates** (`libdd-trace-utils`, `libdd-data-pipeline`): Generic over `C: HttpClientTrait`. Depend only on this crate for trait bounds. +2. **Core crates** (`libdd-trace-utils`, `libdd-data-pipeline`): Generic over `C: HttpClientCapability`. Depend only on this crate for trait bounds. 3. **Leaf crates** (FFI, wasm bindings): Pin a concrete type, `NativeCapabilities` from `libdd-capabilities-impl` on native, `WasmCapabilities` from the Node.js binding crate on wasm. ## Usage ```rust -use libdd_capabilities::{HttpClientTrait, MaybeSend}; +use libdd_capabilities::{HttpClientCapability, MaybeSend}; -async fn fetch(client: &C, req: http::Request) { +async fn fetch(client: &C, req: http::Request) { let response = client.request(req).await.unwrap(); println!("status: {}", response.status()); } diff --git a/libdd-capabilities/src/http.rs b/libdd-capabilities/src/http.rs index 98dab26abf..f4451acc1a 100644 --- a/libdd-capabilities/src/http.rs +++ b/libdd-capabilities/src/http.rs @@ -24,7 +24,7 @@ pub enum HttpError { Other(anyhow::Error), } -pub trait HttpClientTrait: Clone + std::fmt::Debug { +pub trait HttpClientCapability: Clone + std::fmt::Debug { fn new_client() -> Self; fn request( diff --git a/libdd-capabilities/src/lib.rs b/libdd-capabilities/src/lib.rs index 5e10d9fc88..340fbf2c6c 100644 --- a/libdd-capabilities/src/lib.rs +++ b/libdd-capabilities/src/lib.rs @@ -5,8 +5,12 @@ pub mod http; pub mod maybe_send; +pub mod sleep; +pub mod spawn; -pub use self::http::{HttpClientTrait, HttpError}; +pub use self::http::{HttpClientCapability, HttpError}; +pub use self::sleep::SleepCapability; +pub use self::spawn::SpawnError; pub use ::http::{Request, Response}; pub use bytes::Bytes; pub use maybe_send::MaybeSend; diff --git a/libdd-capabilities/src/sleep.rs b/libdd-capabilities/src/sleep.rs new file mode 100644 index 0000000000..a11e548d06 --- /dev/null +++ b/libdd-capabilities/src/sleep.rs @@ -0,0 +1,24 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Sleep capability trait. +//! +//! Abstracts async sleep so that native code can use `tokio::time::sleep` +//! while wasm delegates to `setTimeout` via `JsFuture`. + +use crate::maybe_send::MaybeSend; +use core::future::Future; +use std::time::Duration; + +pub trait SleepCapability: Clone + std::fmt::Debug { + /// Construct a new sleeper. + /// + /// Stateless impls return a unit struct; stateful impls (mock clocks, + /// virtual time sources, etc.) should return a sensible default. Callers + /// that don't have an instance handy can use the static-style + /// `C::new().sleep(duration)` pattern, mirroring `HttpClientCapability`'s + /// `new_client()` + `request(&self)` shape. + fn new() -> Self; + + fn sleep(&self, duration: Duration) -> impl Future + MaybeSend; +} diff --git a/libdd-capabilities/src/spawn.rs b/libdd-capabilities/src/spawn.rs new file mode 100644 index 0000000000..fffb1ef967 --- /dev/null +++ b/libdd-capabilities/src/spawn.rs @@ -0,0 +1,29 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Spawn-related types shared across platforms. +//! +//! Task spawning is handled internally by `SharedRuntime`; this module only +//! provides the executor-agnostic [`SpawnError`] type used in join handles. + +use core::fmt; + +/// Executor-agnostic error returned when a spawned task is aborted or panics. +#[derive(Debug)] +pub struct SpawnError { + msg: String, +} + +impl SpawnError { + pub fn new(msg: impl Into) -> Self { + Self { msg: msg.into() } + } +} + +impl fmt::Display for SpawnError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "spawned task failed: {}", self.msg) + } +} + +impl core::error::Error for SpawnError {} diff --git a/libdd-data-pipeline/Cargo.toml b/libdd-data-pipeline/Cargo.toml index 05f1705e16..707b3dbebb 100644 --- a/libdd-data-pipeline/Cargo.toml +++ b/libdd-data-pipeline/Cargo.toml @@ -33,7 +33,7 @@ uuid = { version = "1.10.0", features = ["v4"] } tokio-util = "0.7.11" libdd-capabilities = { path = "../libdd-capabilities", version = "1.0.0" } libdd-common = { version = "4.0.0", path = "../libdd-common", default-features = false } -libdd-shared-runtime = { version = "0.1.0", path = "../libdd-shared-runtime" } +libdd-shared-runtime = { version = "0.1.0", path = "../libdd-shared-runtime", default-features = false } libdd-telemetry = { version = "5.0.0", path = "../libdd-telemetry", default-features = false, optional = true} libdd-trace-protobuf = { version = "3.0.1", path = "../libdd-trace-protobuf" } libdd-trace-stats = { version = "2.0.0", path = "../libdd-trace-stats", default-features = false } @@ -85,6 +85,7 @@ telemetry = ["libdd-telemetry"] https = [ "libdd-common/https", "libdd-capabilities-impl/https", + "libdd-shared-runtime/https", "libdd-telemetry?/https", "libdd-trace-stats/https", "libdd-trace-utils/https", @@ -93,6 +94,7 @@ https = [ fips = [ "libdd-common/fips", "libdd-capabilities-impl/fips", + "libdd-shared-runtime/fips", "libdd-telemetry?/fips", "libdd-trace-stats/fips", "libdd-trace-utils/fips", diff --git a/libdd-data-pipeline/src/agent_info/fetcher.rs b/libdd-data-pipeline/src/agent_info/fetcher.rs index 41a80364bd..dfca7f276a 100644 --- a/libdd-data-pipeline/src/agent_info/fetcher.rs +++ b/libdd-data-pipeline/src/agent_info/fetcher.rs @@ -10,7 +10,7 @@ use super::{ use anyhow::{anyhow, Result}; use async_trait::async_trait; use bytes::Bytes; -use libdd_capabilities::{HttpClientTrait, MaybeSend}; +use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; use libdd_common::Endpoint; use libdd_shared_runtime::Worker; use sha2::{Digest, Sha256}; @@ -18,7 +18,6 @@ use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use tokio::time::sleep; use tracing::{debug, warn}; /// Whether the agent reported the same value or not. @@ -35,13 +34,13 @@ pub enum FetchInfoStatus { /// If either the agent state hash or container tags hash is different from the current one: /// - Return a `FetchInfoStatus::NewState` of the info struct /// - Else return `FetchInfoStatus::SameState` -async fn fetch_info_with_state_and_container_tags( +async fn fetch_info_with_state_and_container_tags( info_endpoint: &Endpoint, current_state_hash: Option<&str>, current_container_tags_hash: Option<&str>, ) -> Result { let (new_state_hash, body_data, container_tags_hash) = - fetch_and_hash_response::(info_endpoint).await?; + fetch_and_hash_response::(info_endpoint).await?; if current_state_hash.is_some_and(|state| state == new_state_hash) && (current_container_tags_hash.is_none() @@ -65,11 +64,11 @@ async fn fetch_info_with_state_and_container_tags( /// If the state hash is different from the current one: /// - Return a `FetchInfoStatus::NewState` of the info struct /// - Else return `FetchInfoStatus::SameState` -pub async fn fetch_info_with_state( +pub async fn fetch_info_with_state( info_endpoint: &Endpoint, current_state_hash: Option<&str>, ) -> Result { - fetch_info_with_state_and_container_tags::(info_endpoint, current_state_hash, None).await + fetch_info_with_state_and_container_tags::(info_endpoint, current_state_hash, None).await } /// Fetch the info endpoint once and return the info. @@ -93,8 +92,10 @@ pub async fn fetch_info_with_state( /// # Ok(()) /// # } /// ``` -pub async fn fetch_info(info_endpoint: &Endpoint) -> Result> { - match fetch_info_with_state::(info_endpoint, None).await? { +pub async fn fetch_info( + info_endpoint: &Endpoint, +) -> Result> { + match fetch_info_with_state::(info_endpoint, None).await? { FetchInfoStatus::NewState(info) => Ok(info), // Should never be reached since there is no previous state. FetchInfoStatus::SameState => Err(anyhow!("Invalid state header")), @@ -105,7 +106,7 @@ pub async fn fetch_info(info_endpoint: &Endpoint) -> Result< /// /// Returns a tuple of (state_hash, response_body_bytes, container_tags_hash). /// The hash is calculated using SHA256 to match the agent's calculation method. -async fn fetch_and_hash_response( +async fn fetch_and_hash_response( info_endpoint: &Endpoint, ) -> Result<(String, bytes::Bytes, Option)> { let req = info_endpoint @@ -114,10 +115,18 @@ async fn fetch_and_hash_response( .map_err(|e| anyhow!("Failed to build request: {}", e))?; let timeout = Duration::from_millis(info_endpoint.timeout_ms); - let client = H::new_client(); - let res = tokio::time::timeout(timeout, client.request(req)) - .await - .map_err(|_| anyhow!("Request to /info timed out after {:?}", timeout))??; + let client = C::new_client(); + let sleeper = ::new(); + // Runtime-agnostic timeout: race the request against a capability-driven + // sleep instead of `tokio::time::timeout`, which requires a tokio reactor + // (not available on wasm where we run on the JS event loop). + let res = tokio::select! { + biased; + result = client.request(req) => result?, + _ = sleeper.sleep(timeout) => { + return Err(anyhow!("Request to /info timed out after {:?}", timeout)); + } + }; // Extract the Datadog-Container-Tags-Hash header let container_tags_hash = res @@ -149,7 +158,7 @@ async fn fetch_and_hash_response( /// # Example /// ```no_run /// # use anyhow::Result; -/// # use libdd_capabilities_impl::NativeCapabilities; +/// # use libdd_capabilities_impl::{HttpClientCapability, NativeCapabilities}; /// # use libdd_shared_runtime::Worker; /// # #[tokio::main] /// # async fn main() -> Result<()> { @@ -179,20 +188,20 @@ async fn fetch_and_hash_response( /// # Ok(()) /// # } /// ``` -/// `H` is the HTTP client implementation, see [`HttpClientTrait`]. Leaf crates -/// pin it to a concrete type. +/// `C` is the capability bundle, see [`HttpClientCapability`] and [`SleepCapability`]. +/// Leaf crates pin it to a concrete type. #[derive(Debug)] -pub struct AgentInfoFetcher { +pub struct AgentInfoFetcher { info_endpoint: Endpoint, refresh_interval: Duration, trigger_rx: Option>, trigger_tx: mpsc::Sender<()>, - /// `H` must live on the struct because `Worker::run(&mut self)` (a fixed - /// trait signature) calls `fetch_info_with_state::()` internally. - _phantom: PhantomData, + /// `C` lives on the struct because `Worker::run(&mut self)` (a fixed trait + /// signature) calls `fetch_info_with_state::()` internally. + _phantom: PhantomData, } -impl AgentInfoFetcher { +impl AgentInfoFetcher { /// Return a new `AgentInfoFetcher` fetching the `info_endpoint` on each `refresh_interval` /// and updating the stored info. /// @@ -227,7 +236,9 @@ impl AgentInfoFetcher { #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -impl Worker for AgentInfoFetcher { +impl Worker + for AgentInfoFetcher +{ async fn initial_trigger(&mut self) { // Skip initial wait if cache is not populated if AGENT_INFO_CACHE.load().is_none() { @@ -237,6 +248,7 @@ impl Worker for AgentInfoFetche } async fn trigger(&mut self) { + let sleeper = ::new(); // Wait for either a manual trigger or the refresh interval match &mut self.trigger_rx { Some(trigger_rx) => { @@ -249,12 +261,12 @@ impl Worker for AgentInfoFetche } } // Regular periodic fetch timer - _ = sleep(self.refresh_interval) => {} + _ = sleeper.sleep(self.refresh_interval) => {} } } None => { // If the trigger channel is closed we only use timed fetch. - sleep(self.refresh_interval).await; + sleeper.sleep(self.refresh_interval).await; } } } @@ -274,7 +286,7 @@ impl Worker for AgentInfoFetche } } -impl AgentInfoFetcher { +impl AgentInfoFetcher { /// Fetch agent info and update cache if needed async fn fetch_and_update(&self) { let current_info = AGENT_INFO_CACHE.load(); @@ -282,7 +294,7 @@ impl AgentInfoFetcher { let current_container_tags_hash = current_info .as_ref() .and_then(|info| info.info.container_tags_hash.as_deref()); - let res = fetch_info_with_state_and_container_tags::( + let res = fetch_info_with_state_and_container_tags::( &self.info_endpoint, current_hash, current_container_tags_hash, diff --git a/libdd-data-pipeline/src/otlp/exporter.rs b/libdd-data-pipeline/src/otlp/exporter.rs index 8cb13a8cae..08c849392e 100644 --- a/libdd-data-pipeline/src/otlp/exporter.rs +++ b/libdd-data-pipeline/src/otlp/exporter.rs @@ -5,7 +5,7 @@ use super::config::OtlpTraceConfig; use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError}; -use libdd_capabilities::HttpClientTrait; +use libdd_capabilities::{HttpClientCapability, SleepCapability}; use libdd_common::Endpoint; use libdd_trace_utils::send_with_retry::{ send_with_retry, RetryBackoffType, RetryStrategy, SendWithRetryError, @@ -22,8 +22,8 @@ const OTLP_RETRY_DELAY_MS: u64 = 100; /// /// `test_token` is forwarded as `X-Datadog-Test-Session-Token` when set, enabling snapshot tests /// against the Datadog test agent's OTLP endpoint. -pub async fn send_otlp_traces_http( - client: &H, +pub async fn send_otlp_traces_http( + capabilities: &C, config: &OtlpTraceConfig, test_token: Option<&str>, json_body: Vec, @@ -62,7 +62,7 @@ pub async fn send_otlp_traces_http( None, ); - match send_with_retry(client, &target, json_body, &headers, &retry_strategy).await { + match send_with_retry(capabilities, &target, json_body, &headers, &retry_strategy).await { Ok(_) => Ok(()), Err(e) => Err(map_send_error(e).await), } diff --git a/libdd-data-pipeline/src/telemetry/mod.rs b/libdd-data-pipeline/src/telemetry/mod.rs index 1676665614..9bf4177b5a 100644 --- a/libdd-data-pipeline/src/telemetry/mod.rs +++ b/libdd-data-pipeline/src/telemetry/mod.rs @@ -328,6 +328,7 @@ mod tests { use httpmock::Method::POST; use httpmock::MockServer; use libdd_capabilities::HttpError; + use libdd_shared_runtime::{SharedRuntime, WorkerHandle}; use libdd_trace_utils::test_utils::poll_for_mock_hits; // Use `regex::Regex` directly here because `httpmock`'s `body_matches` diff --git a/libdd-data-pipeline/src/trace_buffer/mod.rs b/libdd-data-pipeline/src/trace_buffer/mod.rs index 95e7ee52e8..f3239abc3f 100644 --- a/libdd-data-pipeline/src/trace_buffer/mod.rs +++ b/libdd-data-pipeline/src/trace_buffer/mod.rs @@ -13,7 +13,7 @@ use std::{ time::{Duration, Instant}, }; -use libdd_capabilities::{HttpClientTrait, MaybeSend}; +use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; use libdd_shared_runtime::Worker; use crate::trace_exporter::{ @@ -568,18 +568,18 @@ pub trait Export: Send + Debug { } #[derive(Debug)] -pub struct DefaultExport { - trace_exporter: TraceExporter, +pub struct DefaultExport { + trace_exporter: TraceExporter, } -impl DefaultExport { - pub fn new(trace_exporter: TraceExporter) -> Self { +impl DefaultExport { + pub fn new(trace_exporter: TraceExporter) -> Self { Self { trace_exporter } } } -impl - Export for DefaultExport +impl + Export for DefaultExport { fn export_trace_chunks( &mut self, diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index fbff2016b1..b4a00e3a14 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -4,11 +4,11 @@ use crate::agent_info::AgentInfoFetcher; use crate::otlp::config::{OtlpProtocol, DEFAULT_OTLP_TIMEOUT}; use crate::otlp::OtlpTraceConfig; -#[cfg(feature = "telemetry")] +#[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] use crate::telemetry::TelemetryClientBuilder; use crate::trace_exporter::agent_response::AgentResponsePayloadVersion; use crate::trace_exporter::error::BuilderErrorKind; -#[cfg(feature = "telemetry")] +#[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] use crate::trace_exporter::TelemetryConfig; #[cfg(not(target_arch = "wasm32"))] use crate::trace_exporter::TraceExporterWorkers; @@ -18,7 +18,7 @@ use crate::trace_exporter::{ TracerMetadata, INFO_ENDPOINT, }; use arc_swap::ArcSwap; -use libdd_capabilities::{HttpClientTrait, MaybeSend}; +use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; use libdd_common::{parse_uri, tag, Endpoint}; use libdd_dogstatsd_client::new; use libdd_shared_runtime::SharedRuntime; @@ -274,9 +274,9 @@ impl TraceExporterBuilder { } #[allow(missing_docs)] - pub fn build( + pub fn build( self, - ) -> Result, TraceExporterError> { + ) -> Result, TraceExporterError> { if !Self::is_inputs_outputs_formats_compatible(self.input_format, self.output_format) { return Err(TraceExporterError::Builder( BuilderErrorKind::InvalidConfiguration( @@ -305,230 +305,172 @@ impl TraceExporterBuilder { })?; let libdatadog_version = tag!("libdatadog_version", env!("CARGO_PKG_VERSION")); + + // On native, `C::new_client()` may capture `tokio::runtime::Handle::current()` + // internally (e.g. `NativeCapabilities`). Enter the SharedRuntime's tokio context + // so that handle is available. On wasm this is a no-op — the JS event loop is + // always the implicit executor. + #[cfg(not(target_arch = "wasm32"))] + let _guard = shared_runtime + .runtime_handle() + .map_err(|e| { + TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(e.to_string())) + })? + .enter(); + let capabilities = C::new_client(); + + // --- Platform-specific worker setup --- + // The blocks below spawn background workers via `SharedRuntime`. On + // native, workers run on the tokio runtime; on wasm, they run on the JS + // event loop via `spawn_local`. Telemetry remains native-only for now. + + let info_endpoint = Endpoint::from_url(add_path(&agent_url, INFO_ENDPOINT)); + let (info_fetcher, info_response_observer) = + AgentInfoFetcher::::new(info_endpoint, Duration::from_secs(5 * 60)); + let info_fetcher_handle = + shared_runtime + .spawn_worker(info_fetcher, false) + .map_err(|e| { + TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( + e.to_string(), + )) + })?; + // The handle is currently only tracked for shutdown on native; on wasm + // it is dropped here (the worker keeps running on the JS event loop + // until the page/module is torn down). + #[cfg(target_arch = "wasm32")] + let _ = info_fetcher_handle; + #[allow(unused_mut)] let mut stats = StatsComputationStatus::Disabled; - #[cfg(not(target_arch = "wasm32"))] - { - let info_endpoint = Endpoint::from_url(add_path(&agent_url, INFO_ENDPOINT)); - let (info_fetcher, info_response_observer) = - AgentInfoFetcher::::new(info_endpoint.clone(), Duration::from_secs(5 * 60)); - let info_fetcher_handle = - shared_runtime - .spawn_worker(info_fetcher, false) - .map_err(|e| { + if let Some(bucket_size) = self.stats_bucket_size { + stats = StatsComputationStatus::DisabledByAgent { bucket_size }; + } + + #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] + let (telemetry_client, telemetry_handle) = { + let sessions = self.telemetry_instrumentation_sessions; + let telemetry = self.telemetry.map(|telemetry_config| { + let mut builder = TelemetryClientBuilder::default() + .set_language(&self.language) + .set_language_version(&self.language_version) + .set_service_name(&self.service) + .set_service_version(&self.app_version) + .set_env(&self.env) + .set_tracer_version(&self.tracer_version) + .set_heartbeat(telemetry_config.heartbeat) + .set_url(base_url) + .set_debug_enabled(telemetry_config.debug_enabled); + if let Some(id) = telemetry_config.runtime_id { + builder = builder.set_runtime_id(&id); + } + if let Some(ref id) = sessions.session_id { + builder = builder.set_session_id(id); + } + if let Some(ref id) = sessions.root_session_id { + builder = builder.set_root_session_id(id); + } + if let Some(ref id) = sessions.parent_session_id { + builder = builder.set_parent_session_id(id); + } + Ok(builder.build()) + }); + match telemetry { + Some(Ok((client_tel, worker))) => { + let handle = shared_runtime.spawn_worker(worker, false).map_err(|e| { TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( e.to_string(), )) })?; - - if let Some(bucket_size) = self.stats_bucket_size { - stats = StatsComputationStatus::DisabledByAgent { bucket_size }; + shared_runtime.block_on(client_tel.start()).map_err(|e| { + TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( + e.to_string(), + )) + })?; + (Some(client_tel), Some(handle)) + } + Some(Err(e)) => return Err(e), + None => (None, None), } + }; - #[cfg(feature = "telemetry")] - let (telemetry_client, telemetry_handle) = { - let sessions = self.telemetry_instrumentation_sessions; - let telemetry = self.telemetry.map(|telemetry_config| { - let mut builder = TelemetryClientBuilder::default() - .set_language(&self.language) - .set_language_version(&self.language_version) - .set_service_name(&self.service) - .set_service_version(&self.app_version) - .set_env(&self.env) - .set_tracer_version(&self.tracer_version) - .set_heartbeat(telemetry_config.heartbeat) - .set_url(base_url) - .set_debug_enabled(telemetry_config.debug_enabled); - if let Some(id) = telemetry_config.runtime_id { - builder = builder.set_runtime_id(&id); - } - if let Some(ref id) = sessions.session_id { - builder = builder.set_session_id(id); - } - if let Some(ref id) = sessions.root_session_id { - builder = builder.set_root_session_id(id); + let otlp_config = self.otlp_endpoint.map(|url| { + let mut headers = http::HeaderMap::new(); + for (key, value) in self.otlp_headers { + match ( + http::HeaderName::from_bytes(key.as_bytes()), + http::HeaderValue::from_str(&value), + ) { + (Ok(name), Ok(val)) => { + headers.insert(name, val); } - if let Some(ref id) = sessions.parent_session_id { - builder = builder.set_parent_session_id(id); + _ => { + tracing::warn!("Skipping invalid OTLP header: {:?}={:?}", key, value); } - Ok(builder.build()) - }); - match telemetry { - Some(Ok((client, worker))) => { - let handle = shared_runtime.spawn_worker(worker, false).map_err(|e| { - TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( - e.to_string(), - )) - })?; - shared_runtime.block_on(client.start()).map_err(|e| { - TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( - e.to_string(), - )) - })?; - (Some(client), Some(handle)) - } - Some(Err(e)) => return Err(e), - None => (None, None), } - }; - - Ok(TraceExporter { - endpoint: Endpoint { - url: agent_url, - test_token: self.test_session_token.map(|token| token.into()), - timeout_ms: self - .connection_timeout - .unwrap_or(Endpoint::default().timeout_ms), - ..Default::default() - }, - metadata: TracerMetadata { - tracer_version: self.tracer_version, - language_version: self.language_version, - language_interpreter: self.language_interpreter, - language_interpreter_vendor: self.language_interpreter_vendor, - language: self.language, - git_commit_sha: self.git_commit_sha, - process_tags: self.process_tags, - client_computed_stats: self.client_computed_stats, - client_computed_top_level: self.client_computed_top_level, - hostname: self.hostname, - env: self.env, - app_version: self.app_version, - runtime_id: uuid::Uuid::new_v4().to_string(), - service: self.service, - }, - input_format: self.input_format, - output_format: self.output_format, - serializer: TraceSerializer::new(self.output_format), - client_computed_top_level: self.client_computed_top_level, - shared_runtime, - dogstatsd, - common_stats_tags: vec![libdatadog_version], - client_side_stats: ArcSwap::new(stats.into()), - previous_info_state: arc_swap::ArcSwapOption::new(None), - info_response_observer, - #[cfg(feature = "telemetry")] - telemetry: telemetry_client, - health_metrics_enabled: self.health_metrics_enabled, - client: H::new_client(), - workers: TraceExporterWorkers { - info_fetcher: info_fetcher_handle, - #[cfg(feature = "telemetry")] - telemetry: telemetry_handle, - }, - agent_payload_response_version: self - .agent_rates_payload_version_enabled - .then(AgentResponsePayloadVersion::new), - otlp_config: self.otlp_endpoint.map(|url| { - let mut headers = http::HeaderMap::new(); - for (key, value) in self.otlp_headers { - match ( - http::HeaderName::from_bytes(key.as_bytes()), - http::HeaderValue::from_str(&value), - ) { - (Ok(name), Ok(val)) => { - headers.insert(name, val); - } - _ => { - tracing::warn!( - "Skipping invalid OTLP header: {:?}={:?}", - key, - value - ); - } - } - } - OtlpTraceConfig { - endpoint_url: url, - headers, - timeout: self - .connection_timeout - .map(Duration::from_millis) - .unwrap_or(DEFAULT_OTLP_TIMEOUT), - protocol: OtlpProtocol::HttpJson, - } - }), - }) - } + } + OtlpTraceConfig { + endpoint_url: url, + headers, + timeout: self + .connection_timeout + .map(Duration::from_millis) + .unwrap_or(DEFAULT_OTLP_TIMEOUT), + protocol: OtlpProtocol::HttpJson, + } + }); - #[cfg(target_arch = "wasm32")] - { - let info_endpoint = Endpoint::from_url(add_path(&agent_url, INFO_ENDPOINT)); - let (_info_fetcher, info_response_observer) = - AgentInfoFetcher::::new(info_endpoint, Duration::from_secs(5 * 60)); - - Ok(TraceExporter { - endpoint: Endpoint { - url: agent_url, - test_token: self.test_session_token.map(|token| token.into()), - timeout_ms: self - .connection_timeout - .unwrap_or(Endpoint::default().timeout_ms), - ..Default::default() - }, - metadata: TracerMetadata { - tracer_version: self.tracer_version, - language_version: self.language_version, - language_interpreter: self.language_interpreter, - language_interpreter_vendor: self.language_interpreter_vendor, - language: self.language, - git_commit_sha: self.git_commit_sha, - process_tags: self.process_tags, - client_computed_stats: self.client_computed_stats, - client_computed_top_level: self.client_computed_top_level, - hostname: self.hostname, - env: self.env, - app_version: self.app_version, - runtime_id: uuid::Uuid::new_v4().to_string(), - service: self.service, - }, - input_format: self.input_format, - output_format: self.output_format, - serializer: TraceSerializer::new(self.output_format), + Ok(TraceExporter { + endpoint: Endpoint { + url: agent_url, + test_token: self.test_session_token.map(|token| token.into()), + timeout_ms: self + .connection_timeout + .unwrap_or(Endpoint::default().timeout_ms), + ..Default::default() + }, + metadata: TracerMetadata { + tracer_version: self.tracer_version, + language_version: self.language_version, + language_interpreter: self.language_interpreter, + language_interpreter_vendor: self.language_interpreter_vendor, + language: self.language, + git_commit_sha: self.git_commit_sha, + process_tags: self.process_tags, + client_computed_stats: self.client_computed_stats, client_computed_top_level: self.client_computed_top_level, - shared_runtime, - dogstatsd, - common_stats_tags: vec![libdatadog_version], - client_side_stats: ArcSwap::new(stats.into()), - previous_info_state: arc_swap::ArcSwapOption::new(None), - info_response_observer, - health_metrics_enabled: self.health_metrics_enabled, - client: H::new_client(), - agent_payload_response_version: self - .agent_rates_payload_version_enabled - .then(AgentResponsePayloadVersion::new), - otlp_config: self.otlp_endpoint.map(|url| { - let mut headers = http::HeaderMap::new(); - for (key, value) in self.otlp_headers { - match ( - http::HeaderName::from_bytes(key.as_bytes()), - http::HeaderValue::from_str(&value), - ) { - (Ok(name), Ok(val)) => { - headers.insert(name, val); - } - _ => { - tracing::warn!( - "Skipping invalid OTLP header: {:?}={:?}", - key, - value - ); - } - } - } - OtlpTraceConfig { - endpoint_url: url, - headers, - timeout: self - .connection_timeout - .map(Duration::from_millis) - .unwrap_or(DEFAULT_OTLP_TIMEOUT), - protocol: OtlpProtocol::HttpJson, - } - }), - }) - } + hostname: self.hostname, + env: self.env, + app_version: self.app_version, + runtime_id: uuid::Uuid::new_v4().to_string(), + service: self.service, + }, + input_format: self.input_format, + output_format: self.output_format, + serializer: TraceSerializer::new(self.output_format), + client_computed_top_level: self.client_computed_top_level, + shared_runtime, + dogstatsd, + common_stats_tags: vec![libdatadog_version], + client_side_stats: ArcSwap::new(stats.into()), + previous_info_state: arc_swap::ArcSwapOption::new(None), + info_response_observer, + #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] + telemetry: telemetry_client, + health_metrics_enabled: self.health_metrics_enabled, + capabilities, + #[cfg(not(target_arch = "wasm32"))] + workers: TraceExporterWorkers { + info_fetcher: info_fetcher_handle, + #[cfg(feature = "telemetry")] + telemetry: telemetry_handle, + }, + agent_payload_response_version: self + .agent_rates_payload_version_enabled + .then(AgentResponsePayloadVersion::new), + otlp_config, + }) } fn is_inputs_outputs_formats_compatible( diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 71edd11f0a..95c13889c7 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -34,7 +34,7 @@ use bytes::Bytes; use http::header::HeaderMap; use http::uri::PathAndQuery; use http::Uri; -use libdd_capabilities::{HttpClientTrait, MaybeSend}; +use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; use libdd_common::tag::Tag; use libdd_common::Endpoint; use libdd_dogstatsd_client::Client; @@ -207,10 +207,11 @@ impl From for DeserInputFormat { } } -/// `H` is the HTTP client implementation, see [`HttpClientTrait`]. Leaf crates -/// pin it to a concrete type. +/// `C` is the capabilities bundle (HTTP, sleep). Leaf crates +/// pin it to a concrete type (`NativeCapabilities` or `WasmCapabilities`). +/// Task spawning is handled internally by `SharedRuntime`. #[derive(Debug)] -pub struct TraceExporter { +pub struct TraceExporter { endpoint: Endpoint, metadata: TracerMetadata, input_format: TraceExporterInputFormat, @@ -225,10 +226,10 @@ pub struct TraceExporter { #[cfg_attr(target_arch = "wasm32", allow(dead_code))] previous_info_state: ArcSwapOption, info_response_observer: ResponseObserver, - #[cfg(feature = "telemetry")] + #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] telemetry: Option, health_metrics_enabled: bool, - client: H, + capabilities: C, #[cfg(not(target_arch = "wasm32"))] workers: TraceExporterWorkers, agent_payload_response_version: Option, @@ -236,7 +237,7 @@ pub struct TraceExporter { otlp_config: Option, } -impl TraceExporter { +impl TraceExporter { #[allow(missing_docs)] pub fn builder() -> TraceExporterBuilder { TraceExporterBuilder::default() @@ -250,6 +251,7 @@ impl TraceExporter { /// # Errors /// Returns [`SharedRuntimeError::ShutdownTimedOut`] if a timeout was given and elapsed before /// all workers finished. + #[cfg(not(target_arch = "wasm32"))] pub fn shutdown(self, timeout: Option) -> Result<(), TraceExporterError> { let runtime = self.shared_runtime.clone(); if let Some(timeout) = timeout { @@ -313,6 +315,7 @@ impl TraceExporter { /// # Returns /// * Ok(AgentResponse): The response from the agent /// * Err(TraceExporterError): An error detailing what went wrong in the process + #[cfg(not(target_arch = "wasm32"))] pub fn send(&self, data: &[u8]) -> Result { self.check_agent_info(); @@ -390,7 +393,7 @@ impl TraceExporter { &ctx, &agent_info, &self.client_side_stats, - self.client.clone(), + self.capabilities.clone(), ); } StatsComputationStatus::Enabled { @@ -474,6 +477,7 @@ impl TraceExporter { /// # Returns /// * Ok(AgentResponse): The response from the agent (or Unchanged for OTLP) /// * Err(TraceExporterError): An error detailing what went wrong in the process + #[cfg(not(target_arch = "wasm32"))] pub fn send_trace_chunks( &self, trace_chunks: Vec>>, @@ -521,7 +525,7 @@ impl TraceExporter { TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string())) })?; send_otlp_traces_http( - &self.client, + &self.capabilities, config, self.endpoint.test_token.as_deref(), json_body, @@ -531,6 +535,7 @@ impl TraceExporter { } /// Deserializes, processes and sends trace chunks to the agent + #[cfg(not(target_arch = "wasm32"))] fn send_deser( &self, data: &[u8], @@ -574,9 +579,16 @@ impl TraceExporter { let payload_len = mp_payload.len(); // Send traces to the agent - let result = send_with_retry(&self.client, endpoint, mp_payload, &headers, &strategy).await; + let result = send_with_retry( + &self.capabilities, + endpoint, + mp_payload, + &headers, + &strategy, + ) + .await; - #[cfg(feature = "telemetry")] + #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] if let Some(telemetry) = &self.telemetry { if let Err(e) = telemetry.send(&SendPayloadTelemetry::from_retry_result( &result, diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 1fbaf82d0d..9f63597de1 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -10,7 +10,7 @@ #[cfg(not(target_arch = "wasm32"))] use crate::agent_info::schema::AgentInfo; use arc_swap::ArcSwap; -use libdd_capabilities::{HttpClientTrait, MaybeSend}; +use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; #[cfg(not(target_arch = "wasm32"))] use libdd_common::Endpoint; use libdd_common::MutexExt; @@ -71,12 +71,14 @@ fn get_span_kinds_for_stats(agent_info: &Arc) -> Vec { /// Start the stats exporter and enable stats computation /// /// Should only be used if the agent enabled stats computation -pub(crate) fn start_stats_computation( +pub(crate) fn start_stats_computation< + C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, +>( ctx: &StatsContext, client_side_stats: &ArcSwap, span_kinds: Vec, peer_tags: Vec, - client: H, + capabilities: C, ) -> anyhow::Result<()> { if let StatsComputationStatus::DisabledByAgent { bucket_size } = **client_side_stats.load() { let stats_concentrator = Arc::new(Mutex::new(SpanConcentrator::new( @@ -90,7 +92,7 @@ pub(crate) fn start_stats_computation( +fn create_and_start_stats_worker< + C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, +>( ctx: &StatsContext, bucket_size: Duration, stats_concentrator: &Arc>, client_side_stats: &ArcSwap, - client: H, + capabilities: C, ) -> anyhow::Result<()> { - let stats_exporter = StatsExporter::::new( + let stats_exporter = StatsExporter::::new( bucket_size, stats_concentrator.clone(), StatsMetadata::from(ctx.metadata.clone()), Endpoint::from_url(add_path(ctx.endpoint_url, STATS_ENDPOINT)), - client, + capabilities.clone(), ); let worker_handle = ctx .shared_runtime @@ -153,11 +157,13 @@ pub(crate) fn stop_stats_computation( #[cfg(not(target_arch = "wasm32"))] /// Handle stats computation when agent changes from disabled to enabled -pub(crate) fn handle_stats_disabled_by_agent( +pub(crate) fn handle_stats_disabled_by_agent< + C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, +>( ctx: &StatsContext, agent_info: &Arc, client_side_stats: &ArcSwap, - client: H, + capabilities: C, ) { if agent_info.info.client_drop_p0s.is_some_and(|v| v) { let status = start_stats_computation( @@ -165,7 +171,7 @@ pub(crate) fn handle_stats_disabled_by_agent debug!("Client-side stats enabled"), diff --git a/libdd-shared-runtime/Cargo.toml b/libdd-shared-runtime/Cargo.toml index 390958c9e7..62714ed9d5 100644 --- a/libdd-shared-runtime/Cargo.toml +++ b/libdd-shared-runtime/Cargo.toml @@ -25,7 +25,15 @@ libdd-capabilities = { path = "../libdd-capabilities", version = "1.0.0" } libdd-common = { version = "4.0.0", path = "../libdd-common", default-features = false } [features] +default = ["https"] +https = ["libdd-capabilities-impl/https"] +fips = ["libdd-capabilities-impl/fips"] regex-lite = ["libdd-common/regex-lite"] [target.'cfg(not(target_arch = "wasm32"))'.dependencies] +libdd-capabilities-impl = { path = "../libdd-capabilities-impl", version = "1.0.0", default-features = false } tokio = { version = "1.23", features = ["rt-multi-thread"] } + +[target.'cfg(target_arch = "wasm32")'.dependencies] +wasm-bindgen-futures = "0.4" +futures-util = { version = "0.3", default-features = false, features = ["channel"] } diff --git a/libdd-shared-runtime/src/shared_runtime/mod.rs b/libdd-shared-runtime/src/shared_runtime/mod.rs index 8fda5245b9..42f52837aa 100644 --- a/libdd-shared-runtime/src/shared_runtime/mod.rs +++ b/libdd-shared-runtime/src/shared_runtime/mod.rs @@ -14,12 +14,246 @@ use crate::worker::Worker; use futures::stream::{FuturesUnordered, StreamExt}; use libdd_common::MutexExt; use pausable_worker::{PausableWorker, PausableWorkerError}; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::AtomicU64; use std::sync::{Arc, Mutex}; use std::{fmt, io}; -use tokio::runtime::{Builder, Runtime}; use tracing::{debug, error}; +/// Native-only runtime management, fork safety, and tokio integration. +/// +/// Gated once here so individual items inside don't need `#[cfg]`. +#[cfg(not(target_arch = "wasm32"))] +mod native { + use super::*; + use pausable_worker::tokio_spawn_fn; + use std::sync::atomic::Ordering; + use tokio::runtime::{Builder, Runtime}; + + fn build_runtime() -> Result { + Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + } + + impl SharedRuntime { + pub(in super::super) fn new_native() -> Result { + Ok(Self { + runtime: Arc::new(Mutex::new(Some(Arc::new(build_runtime()?)))), + workers: Arc::new(Mutex::new(Vec::new())), + next_worker_id: AtomicU64::new(1), + }) + } + + /// Returns a clone of the tokio runtime handle managed by this SharedRuntime. + /// + /// # Errors + /// Returns [`SharedRuntimeError::RuntimeUnavailable`] if the runtime has been shut down. + pub fn runtime_handle(&self) -> Result { + Ok(self + .runtime + .lock_or_panic() + .as_ref() + .ok_or(SharedRuntimeError::RuntimeUnavailable)? + .handle() + .clone()) + } + + /// Spawn a PausableWorker on this runtime. + /// + /// The worker will be tracked by this SharedRuntime and will be paused/resumed + /// during fork operations (native only). + /// If `restart_on_fork` is true, the worker will be reset and restarted when calling + /// `after_fork_child` else the worker is dropped *without* calling `Worker::shutdown`. + /// + /// # Errors + /// Returns an error if the worker cannot be started. + pub fn spawn_worker( + &self, + worker: T, + restart_on_fork: bool, + ) -> Result { + let boxed_worker: BoxedWorker = Box::new(worker); + debug!(?boxed_worker, "Spawning worker on SharedRuntime"); + let mut pausable_worker = PausableWorker::new(boxed_worker); + + // Lock runtime first, then workers, following the documented mutex + // lock order (matches before_fork). Both guards are held across + // start+push so that before_fork cannot interleave between them: + // otherwise before_fork could take the runtime, drop it, and miss + // our (not-yet-pushed) worker, leaving us with a worker running on + // a torn-down runtime that before_fork never paused. If the + // runtime has been taken (fork window already passed), we skip + // starting; after_fork_parent/child will start the worker on the + // new runtime. + let runtime_guard = self.runtime.lock_or_panic(); + let mut workers_guard = self.workers.lock_or_panic(); + + if let Some(rt) = runtime_guard.as_ref() { + if let Err(e) = pausable_worker.start(tokio_spawn_fn(rt.handle())) { + return Err(e.into()); + } + } + + let worker_id = self.next_worker_id.fetch_add(1, Ordering::Relaxed); + + workers_guard.push(WorkerEntry { + id: worker_id, + restart_on_fork, + worker: pausable_worker, + }); + + Ok(WorkerHandle { + worker_id, + workers: self.workers.clone(), + }) + } + + /// Hook to be called before forking. + /// + /// This method pauses all workers and prepares the runtime for forking. + /// It ensures that no background tasks are running when the fork occurs, + /// preventing potential deadlocks in the child process. + /// + /// Worker errors are logged but do not cause the function to fail. + /// If the worker fails to pause it is dropped without calling shutdown. + pub fn before_fork(&self) { + debug!("before_fork: pausing all workers"); + if let Some(runtime) = self.runtime.lock_or_panic().take() { + let mut workers_lock = self.workers.lock_or_panic(); + runtime.block_on(async { + let futures: FuturesUnordered<_> = workers_lock + .iter_mut() + .map(|worker_entry| async { + if let Err(e) = worker_entry.worker.pause().await { + error!("Worker failed to pause before fork: {:?}", e); + } + }) + .collect(); + + futures.collect::<()>().await; + }); + } + } + + fn restart_runtime(&self) -> Result<(), SharedRuntimeError> { + let mut runtime_lock = self.runtime.lock_or_panic(); + if runtime_lock.is_none() { + *runtime_lock = Some(Arc::new(build_runtime()?)); + } + Ok(()) + } + + /// Hook to be called in the parent process after forking. + /// + /// This method restarts workers and resumes normal operation in the parent process. + /// The runtime may need to be recreated if it was shut down in before_fork. + /// + /// # Errors + /// Returns an error if workers cannot be restarted or the runtime cannot be recreated. + pub fn after_fork_parent(&self) -> Result<(), SharedRuntimeError> { + debug!("after_fork_parent: restarting runtime and workers"); + self.restart_runtime()?; + + let runtime_lock = self.runtime.lock_or_panic(); + let handle = runtime_lock + .as_ref() + .ok_or(SharedRuntimeError::RuntimeUnavailable)? + .handle() + .clone(); + drop(runtime_lock); + + let mut workers_lock = self.workers.lock_or_panic(); + + for worker_entry in workers_lock.iter_mut() { + worker_entry.worker.start(tokio_spawn_fn(&handle))?; + } + + Ok(()) + } + + /// Hook to be called in the child process after forking. + /// + /// This method reinitializes the runtime and workers in the child process. + /// A new runtime must be created since tokio runtimes cannot be safely forked. + /// Workers are reset and restarted to resume operations in the child. + /// + /// # Errors + /// Returns an error if the runtime cannot be reinitialized or workers cannot be started. + pub fn after_fork_child(&self) -> Result<(), SharedRuntimeError> { + debug!("after_fork_child: reinitializing runtime and workers"); + self.restart_runtime()?; + + let runtime_lock = self.runtime.lock_or_panic(); + let handle = runtime_lock + .as_ref() + .ok_or(SharedRuntimeError::RuntimeUnavailable)? + .handle() + .clone(); + drop(runtime_lock); + + let mut workers_lock = self.workers.lock_or_panic(); + + workers_lock.retain(|entry| entry.restart_on_fork); + + for worker_entry in workers_lock.iter_mut() { + worker_entry.worker.reset(); + worker_entry.worker.start(tokio_spawn_fn(&handle))?; + } + + Ok(()) + } + + /// Run a future to completion on the shared runtime, blocking the current thread. + /// + /// If the runtime is not available (e.g. after calling before_fork), a temporary + /// single-threaded runtime is used. + /// + /// Not available on wasm32 -- use async paths instead. + /// + /// # Errors + /// Returns an error if it fails to create a fallback runtime. + pub fn block_on(&self, f: F) -> Result { + let runtime = match self.runtime.lock_or_panic().as_ref() { + None => Arc::new(Builder::new_current_thread().enable_all().build()?), + Some(runtime) => runtime.clone(), + }; + Ok(runtime.block_on(f)) + } + + /// Shutdown the runtime and all workers synchronously with optional timeout. + /// + /// Not available on wasm32 -- use [`shutdown_async`](Self::shutdown_async) instead. + /// + /// Worker errors are logged but do not cause the function to fail. + /// + /// # Errors + /// Returns an error only if shutdown times out. + pub fn shutdown( + &self, + timeout: Option, + ) -> Result<(), SharedRuntimeError> { + debug!(?timeout, "Shutting down SharedRuntime"); + match self.runtime.lock_or_panic().take() { + Some(runtime) => { + if let Some(timeout) = timeout { + match runtime.block_on(async { + tokio::time::timeout(timeout, self.shutdown_async()).await + }) { + Ok(()) => Ok(()), + Err(_) => Err(SharedRuntimeError::ShutdownTimedOut(timeout)), + } + } else { + runtime.block_on(self.shutdown_async()); + Ok(()) + } + } + None => Ok(()), + } + } + } +} + type BoxedWorker = Box; #[derive(Debug)] @@ -140,84 +374,70 @@ impl From for SharedRuntimeError { /// A shared runtime that manages PausableWorkers and provides fork safety hooks. /// -/// The SharedRuntime owns a tokio runtime and tracks PausableWorkers spawned on it. -/// It provides methods to safely pause workers before forking and restart them -/// after fork in both parent and child processes. +/// The SharedRuntime owns a tokio runtime (on native) and tracks PausableWorkers +/// spawned on it. It provides methods to safely pause workers before forking and +/// restart them after fork in both parent and child processes. +/// +/// On wasm32, no tokio runtime is created. Workers are spawned via `spawn_local` +/// on the JS event loop. /// /// # Mutex lock order /// When locking both [Self::runtime] and [Self::workers], the mutex must be locked in the order of /// the fields in the struct. When possible avoid holding both locks simultaneously. #[derive(Debug)] pub struct SharedRuntime { - runtime: Arc>>>, + #[cfg(not(target_arch = "wasm32"))] + runtime: Arc>>>, workers: Arc>>, next_worker_id: AtomicU64, } -/// Build a tokio runtime appropriate for the current platform. -/// -/// On wasm32, a single-threaded current-thread runtime is used since multi-threading -/// is not available. On all other platforms a multi-threaded runtime is used. -fn build_runtime() -> Result { - #[cfg(not(target_arch = "wasm32"))] - { - Builder::new_multi_thread() - .worker_threads(1) - .enable_all() - .build() - } - #[cfg(target_arch = "wasm32")] - { - Builder::new_current_thread().enable_all().build() - } -} - impl SharedRuntime { - /// Create a new SharedRuntime with a default tokio runtime. + /// Create a new SharedRuntime. + /// + /// On native, this creates a tokio multi-thread runtime. On wasm32, no runtime + /// is created (workers are spawned on the JS event loop via `spawn_local`). /// /// # Errors - /// Returns an error if the tokio runtime cannot be created. + /// Returns an error if the tokio runtime cannot be created (native only). pub fn new() -> Result { debug!("Creating new SharedRuntime"); - let runtime = build_runtime()?; - Ok(Self { - runtime: Arc::new(Mutex::new(Some(Arc::new(runtime)))), - workers: Arc::new(Mutex::new(Vec::new())), - next_worker_id: AtomicU64::new(1), - }) + #[cfg(not(target_arch = "wasm32"))] + { + Self::new_native() + } + #[cfg(target_arch = "wasm32")] + { + Ok(Self { + workers: Arc::new(Mutex::new(Vec::new())), + next_worker_id: AtomicU64::new(1), + }) + } } - /// Spawn a PausableWorker on this runtime. - /// - /// The worker will be tracked by this SharedRuntime and will be paused/resumed - /// during fork operations. - /// If `restart_on_fork` is true, the worker will be reset and restarted when calling - /// `after_fork_child` else the worker is dropped *without* calling `Worker::shutdown`. - /// - /// # Errors - /// Returns an error if the runtime is not available or the worker cannot be started. + /// Spawn a PausableWorker on the JS event loop (wasm variant). + #[cfg(target_arch = "wasm32")] pub fn spawn_worker( &self, worker: T, restart_on_fork: bool, ) -> Result { + use std::sync::atomic::Ordering; + let boxed_worker: BoxedWorker = Box::new(worker); debug!(?boxed_worker, "Spawning worker on SharedRuntime"); let mut pausable_worker = PausableWorker::new(boxed_worker); - // Hold the workers lock while starting the worker to avoid a race with - // before_fork: without this, before_fork could run after the worker is started but - // before it's added to the list, not pausing the worker before the runtime is dropped. - let runtime = self.runtime.lock_or_panic().clone(); let mut workers_guard = self.workers.lock_or_panic(); - // If the runtime is not available, the worker will be started - // when the runtime is recreated (after_fork_parent/child). - if let Some(runtime) = runtime { - if let Err(e) = pausable_worker.start(&runtime) { - return Err(e.into()); - } + if let Err(e) = pausable_worker.start(|future| { + use futures_util::FutureExt; + let (remote, handle) = future.remote_handle(); + wasm_bindgen_futures::spawn_local(remote); + Box::pin(async { Ok(handle.await) }) + }) { + return Err(e.into()); } let worker_id = self.next_worker_id.fetch_add(1, Ordering::Relaxed); @@ -234,145 +454,6 @@ impl SharedRuntime { }) } - /// Hook to be called before forking. - /// - /// This method pauses all workers and prepares the runtime for forking. - /// It ensures that no background tasks are running when the fork occurs, - /// preventing potential deadlocks in the child process. - /// - /// Worker errors are logged but do not cause the function to fail. - /// If the worker fails to pause it is dropped without calling shutdown. - #[cfg(not(target_arch = "wasm32"))] - pub fn before_fork(&self) { - debug!("before_fork: pausing all workers"); - if let Some(runtime) = self.runtime.lock_or_panic().take() { - let mut workers_lock = self.workers.lock_or_panic(); - runtime.block_on(async { - let futures: FuturesUnordered<_> = workers_lock - .iter_mut() - .map(|worker_entry| async { - if let Err(e) = worker_entry.worker.pause().await { - error!("Worker failed to pause before fork: {:?}", e); - } - }) - .collect(); - - futures.collect::<()>().await; - }); - } - } - - fn restart_runtime(&self) -> Result<(), SharedRuntimeError> { - let mut runtime_lock = self.runtime.lock_or_panic(); - if runtime_lock.is_none() { - *runtime_lock = Some(Arc::new(build_runtime()?)); - } - Ok(()) - } - - /// Hook to be called in the parent process after forking. - /// - /// This method restarts workers and resumes normal operation in the parent process. - /// The runtime may need to be recreated if it was shut down in before_fork. - /// - /// # Errors - /// Returns an error if workers cannot be restarted or the runtime cannot be recreated. - #[cfg(not(target_arch = "wasm32"))] - pub fn after_fork_parent(&self) -> Result<(), SharedRuntimeError> { - debug!("after_fork_parent: restarting runtime and workers"); - self.restart_runtime()?; - - let runtime_lock = self.runtime.lock_or_panic(); - let runtime = runtime_lock - .as_ref() - .ok_or(SharedRuntimeError::RuntimeUnavailable)? - .clone(); - drop(runtime_lock); - - let mut workers_lock = self.workers.lock_or_panic(); - - // Restart all workers - for worker_entry in workers_lock.iter_mut() { - worker_entry.worker.start(&runtime)?; - } - - Ok(()) - } - - /// Hook to be called in the child process after forking. - /// - /// This method reinitializes the runtime and workers in the child process. - /// A new runtime must be created since tokio runtimes cannot be safely forked. - /// Workers are reset and restarted to resume operations in the child. - /// - /// # Errors - /// Returns an error if the runtime cannot be reinitialized or workers cannot be started. - #[cfg(not(target_arch = "wasm32"))] - pub fn after_fork_child(&self) -> Result<(), SharedRuntimeError> { - debug!("after_fork_child: reinitializing runtime and workers"); - self.restart_runtime()?; - - let runtime_lock = self.runtime.lock_or_panic(); - let runtime = runtime_lock - .as_ref() - .ok_or(SharedRuntimeError::RuntimeUnavailable)? - .clone(); - drop(runtime_lock); - - let mut workers_lock = self.workers.lock_or_panic(); - - // Drop workers not marked as restart on fork - workers_lock.retain(|entry| entry.restart_on_fork); - - for worker_entry in workers_lock.iter_mut() { - worker_entry.worker.reset(); - worker_entry.worker.start(&runtime)?; - } - - Ok(()) - } - - /// Run a future to completion on the shared runtime, blocking the current thread. - /// - /// If the runtime is not available (e.g. after calling before_fork), a temporary - /// single-threaded runtime is used. - /// - /// # Errors - /// Returns an error if it fails to create a fallback runtime. - pub fn block_on(&self, f: F) -> Result { - let runtime = match self.runtime.lock_or_panic().as_ref() { - None => Arc::new(Builder::new_current_thread().enable_all().build()?), - Some(runtime) => runtime.clone(), - }; - Ok(runtime.block_on(f)) - } - - /// Shutdown the runtime and all workers synchronously with optional timeout. - /// - /// Worker errors are logged but do not cause the function to fail. - /// - /// # Errors - /// Returns an error only if shutdown times out. - pub fn shutdown(&self, timeout: Option) -> Result<(), SharedRuntimeError> { - debug!(?timeout, "Shutting down SharedRuntime"); - match self.runtime.lock_or_panic().take() { - Some(runtime) => { - if let Some(timeout) = timeout { - match runtime.block_on(async { - tokio::time::timeout(timeout, self.shutdown_async()).await - }) { - Ok(()) => Ok(()), - Err(_) => Err(SharedRuntimeError::ShutdownTimedOut(timeout)), - } - } else { - runtime.block_on(self.shutdown_async()); - Ok(()) - } - } - None => Ok(()), // The runtime is not running so there's nothing to shutdown - } - } - /// Shutdown all workers asynchronously. /// /// This should be called during application shutdown to cleanly stop all diff --git a/libdd-shared-runtime/src/shared_runtime/pausable_worker.rs b/libdd-shared-runtime/src/shared_runtime/pausable_worker.rs index e3dcec8701..7610fa7725 100644 --- a/libdd-shared-runtime/src/shared_runtime/pausable_worker.rs +++ b/libdd-shared-runtime/src/shared_runtime/pausable_worker.rs @@ -4,21 +4,47 @@ //! Defines a pausable worker to be able to stop background processes before forks use crate::worker::Worker; +use core::pin::Pin; +use libdd_capabilities::spawn::SpawnError; use libdd_capabilities::MaybeSend; use std::fmt::Display; -use tokio::{runtime::Runtime, select, task::JoinHandle}; +use std::future::Future; +use tokio::select; use tokio_util::sync::CancellationToken; use tracing::debug; +#[cfg(not(target_arch = "wasm32"))] +type WorkerFuture = Pin + Send + 'static>>; +#[cfg(target_arch = "wasm32")] +type WorkerFuture = Pin + 'static>>; + +#[cfg(not(target_arch = "wasm32"))] +type WorkerJoinHandle = Pin> + Send>>; +#[cfg(target_arch = "wasm32")] +type WorkerJoinHandle = Pin>>>; + +/// Build the spawn closure used by [`PausableWorker::start`] on native, backed by +/// `tokio::runtime::Handle::spawn`. Maps tokio's `JoinError` into the +/// executor-agnostic [`SpawnError`]. +#[cfg(not(target_arch = "wasm32"))] +pub(super) fn tokio_spawn_fn( + handle: &tokio::runtime::Handle, +) -> impl FnOnce(WorkerFuture) -> WorkerJoinHandle { + let h = handle.clone(); + move |future| { + let jh = h.spawn(future); + Box::pin(async { jh.await.map_err(|e| SpawnError::new(e.to_string())) }) + } +} + /// A pausable worker which can be paused and restarted on forks. /// /// Used to allow a [`super::Worker`] to be paused while saving its state when /// dropping a tokio runtime to be able to restart with the same state on a new runtime. This is /// used to stop all threads before a fork to avoid deadlocks in child. -#[derive(Debug)] pub enum PausableWorker { Running { - handle: JoinHandle, + handle: WorkerJoinHandle, stop_token: CancellationToken, }, Paused { @@ -27,6 +53,19 @@ pub enum PausableWorker { InvalidState, } +impl std::fmt::Debug for PausableWorker { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Running { .. } => f.debug_struct("PausableWorker::Running").finish(), + Self::Paused { worker } => f + .debug_struct("PausableWorker::Paused") + .field("worker", worker) + .finish(), + Self::InvalidState => write!(f, "PausableWorker::InvalidState"), + } + } +} + #[derive(Debug)] pub enum PausableWorkerError { InvalidState, @@ -54,17 +93,19 @@ impl PausableWorker { Self::Paused { worker } } - /// Start the worker on the given runtime. + /// Start the worker using the given spawn function. /// - /// The worker's main loop will be run on the runtime. - pub fn start(&mut self, rt: &Runtime) -> Result<(), PausableWorkerError> { - #[cfg(target_arch = "wasm32")] - return Ok(()); - #[cfg(not(target_arch = "wasm32"))] + /// The worker's main loop will be spawned via the provided closure. + /// `SharedRuntime` constructs the appropriate platform-specific closure + /// (tokio on native, spawn_local on wasm). + pub fn start( + &mut self, + spawn_fn: impl FnOnce(WorkerFuture) -> WorkerJoinHandle, + ) -> Result<(), PausableWorkerError> { match self { PausableWorker::Running { .. } => Ok(()), - PausableWorker::Paused { worker } => { - debug!(?worker, "Starting pausable worker"); + PausableWorker::Paused { worker: _ } => { + debug!(?self, "Starting pausable worker"); let PausableWorker::Paused { mut worker } = std::mem::replace(self, PausableWorker::InvalidState) else { @@ -72,17 +113,12 @@ impl PausableWorker { return Ok(()); }; - // Worker is temporarily in an invalid state, but since this block is failsafe it - // will be replaced by a valid state. let stop_token = CancellationToken::new(); let cloned_token = stop_token.clone(); - let handle = rt.spawn(async move { + let future = Box::pin(async move { // First iteration using initial_trigger select! { - // Always check for cancellation first, to reduce time-to-pause in case - // the initial trigger is always ready. - biased; - + biased; _ = cloned_token.cancelled() => { return worker; } @@ -94,10 +130,7 @@ impl PausableWorker { // Regular iterations loop { select! { - // Always check for cancellation first, to reduce time-to-pause in case - // the trigger is always ready. biased; - _ = cloned_token.cancelled() => { break; } @@ -109,6 +142,8 @@ impl PausableWorker { worker }); + let handle = spawn_fn(future); + *self = PausableWorker::Running { handle, stop_token }; Ok(()) } @@ -141,7 +176,6 @@ impl PausableWorker { *self = PausableWorker::Paused { worker }; Ok(()) } else { - // The task has been aborted and the worker can't be retrieved. *self = PausableWorker::InvalidState; Err(PausableWorkerError::TaskAborted) } @@ -201,9 +235,11 @@ mod tests { let (sender, receiver) = channel::(); let worker = TestWorker { state: 0, sender }; let runtime = Builder::new_multi_thread().enable_time().build().unwrap(); - let mut pausable_worker = PausableWorker::new(worker); + let handle = runtime.handle().clone(); + let mut pausable_worker: PausableWorker> = + PausableWorker::new(Box::new(worker)); - pausable_worker.start(&runtime).unwrap(); + pausable_worker.start(tokio_spawn_fn(&handle)).unwrap(); assert_eq!(receiver.recv().unwrap(), 0); runtime.block_on(async { pausable_worker.pause().await.unwrap() }); @@ -212,7 +248,7 @@ mod tests { for message in receiver.try_iter() { next_message = message + 1; } - pausable_worker.start(&runtime).unwrap(); + pausable_worker.start(tokio_spawn_fn(&handle)).unwrap(); assert_eq!(receiver.recv().unwrap(), next_message); } } diff --git a/libdd-telemetry/Cargo.toml b/libdd-telemetry/Cargo.toml index f7f58afeb0..29ae444d5a 100644 --- a/libdd-telemetry/Cargo.toml +++ b/libdd-telemetry/Cargo.toml @@ -14,8 +14,8 @@ bench = false [features] default = ["tracing"] tracing = ["tracing/std"] -https = ["libdd-common/https"] -fips = ["libdd-common/fips"] +https = ["libdd-common/https", "libdd-shared-runtime/https"] +fips = ["libdd-common/fips", "libdd-shared-runtime/fips"] [dependencies] anyhow = { version = "1.0" } @@ -33,7 +33,7 @@ uuid = { version = "1.3", features = ["v4"] } hashbrown = "0.15" bytes = "1.4" libdd-common = { version = "4.0.0", path = "../libdd-common", default-features = false } -libdd-shared-runtime = { version = "0.1.0", path = "../libdd-shared-runtime" } +libdd-shared-runtime = { version = "0.1.0", path = "../libdd-shared-runtime", default-features = false } libdd-ddsketch = { version = "1.0.1", path = "../libdd-ddsketch" } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] diff --git a/libdd-trace-stats/Cargo.toml b/libdd-trace-stats/Cargo.toml index 43604cadf0..d9546555b4 100644 --- a/libdd-trace-stats/Cargo.toml +++ b/libdd-trace-stats/Cargo.toml @@ -14,7 +14,7 @@ anyhow = "1.0" libdd-capabilities = { path = "../libdd-capabilities", version = "1.0.0" } libdd-common = { version = "4.0.0", path = "../libdd-common", default-features = false } libdd-ddsketch = { version = "1.0.1", path = "../libdd-ddsketch" } -libdd-shared-runtime = { version = "0.1.0", path = "../libdd-shared-runtime" } +libdd-shared-runtime = { version = "0.1.0", path = "../libdd-shared-runtime", default-features = false } libdd-trace-protobuf = { version = "3.0.1", path = "../libdd-trace-protobuf" } libdd-trace-obfuscation = { version = "2.0.0", path = "../libdd-trace-obfuscation", default-features = false } libdd-trace-utils = { version = "3.0.1", path = "../libdd-trace-utils", default-features = false } @@ -47,5 +47,5 @@ tokio = { version = "1.23", features = ["rt-multi-thread", "macros", "test-util" [features] default = ["https"] -https = ["libdd-common/https", "libdd-capabilities-impl/https"] -fips = ["libdd-common/fips", "libdd-capabilities-impl/fips"] +https = ["libdd-common/https", "libdd-capabilities-impl/https", "libdd-shared-runtime/https"] +fips = ["libdd-common/fips", "libdd-capabilities-impl/fips", "libdd-shared-runtime/fips"] diff --git a/libdd-trace-stats/src/stats_exporter.rs b/libdd-trace-stats/src/stats_exporter.rs index e0130804cd..a715167b01 100644 --- a/libdd-trace-stats/src/stats_exporter.rs +++ b/libdd-trace-stats/src/stats_exporter.rs @@ -11,7 +11,7 @@ use std::{ use crate::span_concentrator::{FlushableConcentrator, SpanConcentrator}; use async_trait::async_trait; -use libdd_capabilities::{HttpClientTrait, MaybeSend}; +use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; use libdd_common::Endpoint; use libdd_shared_runtime::Worker; use libdd_trace_protobuf::pb; @@ -54,19 +54,24 @@ impl<'a> From<&'a StatsMetadata> for TracerHeaderTags<'a> { /// An exporter that concentrates and sends stats to the agent. /// -/// `H` is the HTTP client implementation, see [`HttpClientTrait`]. Leaf crates -/// pin it to a concrete type. +/// `Cap` is the capabilities bundle (HTTP + sleep). Leaf crates pin it to a +/// concrete type (`NativeCapabilities` or `WasmCapabilities`). #[derive(Debug)] -pub struct StatsExporter { +pub struct StatsExporter< + Cap: HttpClientCapability + SleepCapability, + Con: FlushableConcentrator = SpanConcentrator, +> { flush_interval: time::Duration, - concentrator: Arc>, + concentrator: Arc>, endpoint: Endpoint, meta: StatsMetadata, sequence_id: AtomicU64, - client: H, + capabilities: Cap, } -impl StatsExporter { +impl + StatsExporter +{ /// Return a new StatsExporter /// /// - `flush_interval` the interval on which the concentrator is flushed @@ -78,10 +83,10 @@ impl StatsExporter { /// concentrator pub fn new( flush_interval: time::Duration, - concentrator: Arc>, + concentrator: Arc>, meta: StatsMetadata, endpoint: Endpoint, - client: H, + capabilities: Cap, ) -> Self { Self { flush_interval, @@ -89,7 +94,7 @@ impl StatsExporter { endpoint, meta, sequence_id: AtomicU64::new(0), - client, + capabilities, } } @@ -124,7 +129,7 @@ impl StatsExporter { ); let result = send_with_retry( - &self.client, + &self.capabilities, &self.endpoint, body, &headers, @@ -164,12 +169,12 @@ impl StatsExporter { #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl< - H: HttpClientTrait + MaybeSend + Sync + Debug + 'static, - C: FlushableConcentrator + Send + Debug, - > Worker for StatsExporter + Cap: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, + Con: FlushableConcentrator + Send + Debug, + > Worker for StatsExporter { async fn trigger(&mut self) { - tokio::time::sleep(self.flush_interval).await; + self.capabilities.sleep(self.flush_interval).await; } /// Flush and send stats on every trigger. @@ -358,15 +363,15 @@ mod tests { then.status(200).body(""); }); + let caps = NativeCapabilities::new(); let stats_exporter = StatsExporter::::new( // Use smaller buckets duration to speed up test Duration::from_secs(1), Arc::new(Mutex::new(get_test_concentrator())), get_test_metadata(), Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), - NativeCapabilities::new_client(), + caps.clone(), ); - let _handle = shared_runtime .spawn_worker(stats_exporter, true) .expect("Failed to spawn worker"); @@ -400,12 +405,13 @@ mod tests { let buckets_duration = Duration::from_secs(10); + let caps = NativeCapabilities::new(); let stats_exporter = StatsExporter::::new( buckets_duration, Arc::new(Mutex::new(get_test_concentrator())), get_test_metadata(), Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), - NativeCapabilities::new_client(), + caps.clone(), ); let _handle = shared_runtime diff --git a/libdd-trace-utils/src/send_data/mod.rs b/libdd-trace-utils/src/send_data/mod.rs index c884566d17..39cc4eefff 100644 --- a/libdd-trace-utils/src/send_data/mod.rs +++ b/libdd-trace-utils/src/send_data/mod.rs @@ -11,7 +11,7 @@ use anyhow::{anyhow, Context}; use futures::stream::FuturesUnordered; use futures::StreamExt; use http::{header::CONTENT_TYPE, HeaderMap, HeaderValue}; -use libdd_capabilities::HttpClientTrait; +use libdd_capabilities::{HttpClientCapability, SleepCapability}; use libdd_common::{ header::{ APPLICATION_MSGPACK, APPLICATION_PROTOBUF, DATADOG_SEND_REAL_HTTP_STATUS, @@ -59,10 +59,10 @@ use zstd::stream::write::Encoder; /// /// send_data.set_retry_strategy(retry_strategy); /// -/// // Send the data (caller picks the HTTP client implementation) -/// use libdd_capabilities::HttpClientTrait; -/// let client = libdd_capabilities_impl::NativeCapabilities::new_client(); -/// let result = send_data.send(&client).await; +/// // Send the data (caller picks the capabilities implementation) +/// use libdd_capabilities::HttpClientCapability; +/// let capabilities = libdd_capabilities_impl::NativeCapabilities::new_client(); +/// let result = send_data.send(&capabilities).await; /// } /// ``` pub struct SendData { @@ -224,25 +224,28 @@ impl SendData { /// # Returns /// /// A `SendDataResult` instance containing the result of the operation. - pub async fn send(&self, client: &H) -> SendDataResult { - self.send_internal(client, None).await + pub async fn send( + &self, + capabilities: &C, + ) -> SendDataResult { + self.send_internal(capabilities, None).await } - async fn send_internal( + async fn send_internal( &self, - client: &H, + capabilities: &C, endpoint: Option, ) -> SendDataResult { if self.use_protobuf() { - self.send_with_protobuf(client, endpoint).await + self.send_with_protobuf(capabilities, endpoint).await } else { - self.send_with_msgpack(client, endpoint).await + self.send_with_msgpack(capabilities, endpoint).await } } - async fn send_payload( + async fn send_payload( &self, - client: &H, + capabilities: &C, chunks: u64, payload: Vec, headers: HeaderMap, @@ -252,7 +255,7 @@ impl SendData { let payload_len = u64::try_from(payload.len()).unwrap(); ( send_with_retry( - client, + capabilities, endpoint.unwrap_or(&self.target), payload, &headers, @@ -293,9 +296,9 @@ impl SendData { } } - async fn send_with_protobuf( + async fn send_with_protobuf( &self, - client: &H, + capabilities: &C, endpoint: Option, ) -> SendDataResult { let mut result = SendDataResult::default(); @@ -325,7 +328,7 @@ impl SendData { let (response, bytes_sent, chunks) = self .send_payload( - client, + capabilities, chunks, final_payload, request_headers, @@ -341,9 +344,9 @@ impl SendData { } } - async fn send_with_msgpack( + async fn send_with_msgpack( &self, - client: &H, + capabilities: &C, endpoint: Option, ) -> SendDataResult { let mut result = SendDataResult::default(); @@ -365,7 +368,7 @@ impl SendData { }; futures.push(self.send_payload( - client, + capabilities, chunks, payload, headers, @@ -384,7 +387,7 @@ impl SendData { let payload = msgpack_encoder::v04::to_vec(payload); futures.push(self.send_payload( - client, + capabilities, chunks, payload, headers, @@ -405,7 +408,7 @@ impl SendData { }; futures.push(self.send_payload( - client, + capabilities, chunks, payload, headers, @@ -460,7 +463,7 @@ mod tests { use crate::tracer_header_tags::TracerHeaderTags; use httpmock::prelude::*; use httpmock::MockServer; - use libdd_capabilities::HttpClientTrait; + use libdd_capabilities::HttpClientCapability; use libdd_capabilities_impl::NativeCapabilities; use libdd_common::Endpoint; use libdd_trace_protobuf::pb::Span; diff --git a/libdd-trace-utils/src/send_with_retry/mod.rs b/libdd-trace-utils/src/send_with_retry/mod.rs index 7ee59e9bf1..4509c95bd7 100644 --- a/libdd-trace-utils/src/send_with_retry/mod.rs +++ b/libdd-trace-utils/src/send_with_retry/mod.rs @@ -9,9 +9,8 @@ pub use retry_strategy::{RetryBackoffType, RetryStrategy}; use bytes::Bytes; use http::HeaderMap; -use libdd_capabilities::{HttpClientTrait, HttpError}; +use libdd_capabilities::{HttpClientCapability, HttpError, SleepCapability}; use libdd_common::Endpoint; -#[cfg(not(target_arch = "wasm32"))] use std::time::Duration; use tracing::{debug, error}; @@ -69,7 +68,7 @@ impl std::error::Error for SendWithRetryError {} /// /// ```rust, no_run /// # use libdd_common::Endpoint; -/// # use libdd_capabilities::HttpClientTrait; +/// # use libdd_capabilities::{HttpClientCapability, SleepCapability}; /// # use libdd_trace_utils::send_with_retry::*; /// # async fn run() -> SendWithRetryResult { /// let payload: Vec = vec![0, 1, 2, 3]; @@ -83,19 +82,18 @@ impl std::error::Error for SendWithRetryError {} /// http::HeaderValue::from_static("application/msgpack"), /// ); /// let retry_strategy = RetryStrategy::new(3, 10, RetryBackoffType::Exponential, Some(5)); -/// let client = libdd_capabilities_impl::NativeCapabilities::new_client(); -/// send_with_retry(&client, &target, payload, &headers, &retry_strategy).await +/// let capabilities = libdd_capabilities_impl::NativeCapabilities::new_client(); +/// send_with_retry(&capabilities, &target, payload, &headers, &retry_strategy).await /// # } /// ``` -pub async fn send_with_retry( - client: &H, +pub async fn send_with_retry( + capabilities: &C, target: &Endpoint, payload: Vec, headers: &HeaderMap, retry_strategy: &RetryStrategy, ) -> SendWithRetryResult { let mut request_attempt = 0; - #[cfg(not(target_arch = "wasm32"))] let timeout = Duration::from_millis(target.timeout_ms); debug!( @@ -130,10 +128,11 @@ pub async fn send_with_retry( } }; - #[cfg(not(target_arch = "wasm32"))] - let result = tokio::time::timeout(timeout, client.request(req)).await; - #[cfg(target_arch = "wasm32")] - let result: Result, std::convert::Infallible> = Ok(client.request(req).await); + let result = tokio::select! { + biased; + r = capabilities.request(req) => Ok(r), + _ = capabilities.sleep(timeout) => Err(()), + }; match result { Ok(Ok(response)) => { @@ -158,7 +157,7 @@ pub async fn send_with_retry( remaining_retries = retry_strategy.max_retries() - request_attempt, "Retrying after error status code" ); - retry_strategy.delay(request_attempt).await; + retry_strategy.delay(request_attempt, capabilities).await; continue; } else { error!( @@ -191,7 +190,7 @@ pub async fn send_with_retry( remaining_retries = retry_strategy.max_retries() - request_attempt, "Retrying after request error" ); - retry_strategy.delay(request_attempt).await; + retry_strategy.delay(request_attempt, capabilities).await; continue; } else { let classified_error = match e { @@ -223,7 +222,7 @@ pub async fn send_with_retry( remaining_retries = retry_strategy.max_retries() - request_attempt, "Retrying after timeout" ); - retry_strategy.delay(request_attempt).await; + retry_strategy.delay(request_attempt, capabilities).await; continue; } else { error!( @@ -242,7 +241,7 @@ mod tests { use super::*; use crate::test_utils::poll_for_mock_hit; use httpmock::MockServer; - use libdd_capabilities::HttpClientTrait; + use libdd_capabilities::HttpClientCapability; use libdd_capabilities_impl::NativeCapabilities; #[cfg_attr(miri, ignore)] @@ -273,11 +272,11 @@ mod tests { }; let strategy = RetryStrategy::new(0, 2, RetryBackoffType::Constant, None); - let client = NativeCapabilities::new_client(); + let capabilities = NativeCapabilities::new_client(); tokio::spawn(async move { let result = send_with_retry( - &client, + &capabilities, &target_endpoint, vec![0, 1, 2, 3], &HeaderMap::new(), @@ -322,11 +321,11 @@ mod tests { }; let strategy = RetryStrategy::new(2, 250, RetryBackoffType::Constant, None); - let client = NativeCapabilities::new_client(); + let capabilities = NativeCapabilities::new_client(); tokio::spawn(async move { let result = send_with_retry( - &client, + &capabilities, &target_endpoint, vec![0, 1, 2, 3], &HeaderMap::new(), @@ -371,11 +370,11 @@ mod tests { RetryBackoffType::Constant, None, ); - let client = NativeCapabilities::new_client(); + let capabilities = NativeCapabilities::new_client(); tokio::spawn(async move { let result = send_with_retry( - &client, + &capabilities, &target_endpoint, vec![0, 1, 2, 3], &HeaderMap::new(), @@ -420,11 +419,11 @@ mod tests { }; let strategy = RetryStrategy::new(2, 10, RetryBackoffType::Constant, None); - let client = NativeCapabilities::new_client(); + let capabilities = NativeCapabilities::new_client(); tokio::spawn(async move { let result = send_with_retry( - &client, + &capabilities, &target_endpoint, vec![0, 1, 2, 3], &HeaderMap::new(), diff --git a/libdd-trace-utils/src/send_with_retry/retry_strategy.rs b/libdd-trace-utils/src/send_with_retry/retry_strategy.rs index fba2191b8e..22ac83d76f 100644 --- a/libdd-trace-utils/src/send_with_retry/retry_strategy.rs +++ b/libdd-trace-utils/src/send_with_retry/retry_strategy.rs @@ -4,8 +4,8 @@ //! Types used when calling [`super::send_with_retry`] to configure the retry logic. use std::time::Duration; -#[cfg(not(target_arch = "wasm32"))] -use tokio::time::sleep; + +use libdd_capabilities::sleep::SleepCapability; /// Enum representing the type of backoff to use for the delay between retries. #[derive(Debug, Clone)] @@ -17,12 +17,6 @@ pub enum RetryBackoffType { Constant, /// The delay is doubled for each attempt. Exponential, - /// No delay between retries. Intended for wasm where `tokio::time::sleep` is unavailable. - /// Should be paired with `max_retries: 0` to avoid spamming the target. - /// - /// Temporary workaround: a proper solution would introduce a `SleepTrait` capability so that - /// wasm can delegate to a JS-side timer (e.g. `setTimeout`). - Disabled, } /// Struct representing the retry strategy for sending data. @@ -45,23 +39,11 @@ pub struct RetryStrategy { impl Default for RetryStrategy { fn default() -> Self { - #[cfg(not(target_arch = "wasm32"))] - { - RetryStrategy { - max_retries: 5, - delay_ms: Duration::from_millis(100), - backoff_type: RetryBackoffType::Exponential, - jitter: None, - } - } - #[cfg(target_arch = "wasm32")] - { - RetryStrategy { - max_retries: 0, - delay_ms: Duration::ZERO, - backoff_type: RetryBackoffType::Disabled, - jitter: None, - } + RetryStrategy { + max_retries: 5, + delay_ms: Duration::from_millis(100), + backoff_type: RetryBackoffType::Exponential, + jitter: None, } } } @@ -109,30 +91,21 @@ impl RetryStrategy { /// # Arguments /// /// * `attempt`: The number of the current attempt (1-indexed). - pub(crate) async fn delay(&self, attempt: u32) { - if matches!(self.backoff_type, RetryBackoffType::Disabled) { - return; - } - - #[cfg(not(target_arch = "wasm32"))] - { - let delay = match self.backoff_type { - RetryBackoffType::Exponential => self.delay_ms * 2u32.pow(attempt - 1), - RetryBackoffType::Constant => self.delay_ms, - RetryBackoffType::Linear => self.delay_ms + (self.delay_ms * (attempt - 1)), - RetryBackoffType::Disabled => unreachable!(), - }; + /// * `capabilities`: Provides the sleep capability for the delay. + pub(crate) async fn delay(&self, attempt: u32, capabilities: &C) { + let delay = match self.backoff_type { + RetryBackoffType::Exponential => self.delay_ms * 2u32.pow(attempt - 1), + RetryBackoffType::Constant => self.delay_ms, + RetryBackoffType::Linear => self.delay_ms + (self.delay_ms * (attempt - 1)), + }; - if let Some(jitter) = self.jitter { - let jitter = rand::random::() % jitter.as_millis() as u64; - sleep(delay + Duration::from_millis(jitter)).await; - } else { - sleep(delay).await; - } - } - #[cfg(target_arch = "wasm32")] - { - let _ = attempt; + if let Some(jitter) = self.jitter { + let jitter = rand::random::() % jitter.as_millis() as u64; + capabilities + .sleep(delay + Duration::from_millis(jitter)) + .await; + } else { + capabilities.sleep(delay).await; } } @@ -146,6 +119,7 @@ impl RetryStrategy { // For tests RetryStrategy tests the observed delay should be approximate. mod tests { use super::*; + use libdd_capabilities_impl::NativeSleepCapability; use tokio::time::Instant; // This tolerance is on the higher side to account for github's runners not having consistent @@ -162,9 +136,10 @@ mod tests { backoff_type: RetryBackoffType::Constant, jitter: None, }; + let capabilities = NativeSleepCapability; let start = Instant::now(); - retry_strategy.delay(1).await; + retry_strategy.delay(1, &capabilities).await; let elapsed = start.elapsed(); assert!( @@ -177,7 +152,7 @@ mod tests { ); let start = Instant::now(); - retry_strategy.delay(2).await; + retry_strategy.delay(2, &capabilities).await; let elapsed = start.elapsed(); assert!( @@ -199,9 +174,10 @@ mod tests { backoff_type: RetryBackoffType::Linear, jitter: None, }; + let capabilities = NativeSleepCapability; let start = Instant::now(); - retry_strategy.delay(1).await; + retry_strategy.delay(1, &capabilities).await; let elapsed = start.elapsed(); assert!( @@ -214,7 +190,7 @@ mod tests { ); let start = Instant::now(); - retry_strategy.delay(3).await; + retry_strategy.delay(3, &capabilities).await; let elapsed = start.elapsed(); // For the Linear strategy, the delay for the 3rd attempt should be delay_ms + (delay_ms * @@ -239,9 +215,10 @@ mod tests { backoff_type: RetryBackoffType::Exponential, jitter: None, }; + let capabilities = NativeSleepCapability; let start = Instant::now(); - retry_strategy.delay(1).await; + retry_strategy.delay(1, &capabilities).await; let elapsed = start.elapsed(); assert!( @@ -254,7 +231,7 @@ mod tests { ); let start = Instant::now(); - retry_strategy.delay(3).await; + retry_strategy.delay(3, &capabilities).await; let elapsed = start.elapsed(); // For the Exponential strategy, the delay for the 3rd attempt should be delay_ms * 2^(3-1) // = delay_ms * 4. @@ -277,9 +254,10 @@ mod tests { backoff_type: RetryBackoffType::Constant, jitter: Some(Duration::from_millis(50)), }; + let capabilities = NativeSleepCapability; let start = Instant::now(); - retry_strategy.delay(1).await; + retry_strategy.delay(1, &capabilities).await; let elapsed = start.elapsed(); // The delay should be between delay_ms and delay_ms + jitter diff --git a/libdd-trace-utils/src/stats_utils.rs b/libdd-trace-utils/src/stats_utils.rs index ffa9a72ba0..708b7ee883 100644 --- a/libdd-trace-utils/src/stats_utils.rs +++ b/libdd-trace-utils/src/stats_utils.rs @@ -8,7 +8,7 @@ pub use mini_agent::*; mod mini_agent { use bytes::{Buf, Bytes}; use http_body_util::BodyExt; - use libdd_capabilities::HttpClientTrait; + use libdd_capabilities::HttpClientCapability; use libdd_common::http_common; use libdd_common::Endpoint; use libdd_trace_protobuf::pb; @@ -63,7 +63,7 @@ mod mini_agent { } } - pub async fn send_stats_payload( + pub async fn send_stats_payload( data: Vec, target: &Endpoint, api_key: &str, diff --git a/libdd-trace-utils/src/test_utils/datadog_test_agent.rs b/libdd-trace-utils/src/test_utils/datadog_test_agent.rs index 9710700b05..dd8351493e 100644 --- a/libdd-trace-utils/src/test_utils/datadog_test_agent.rs +++ b/libdd-trace-utils/src/test_utils/datadog_test_agent.rs @@ -210,7 +210,7 @@ impl DatadogAgentContainerBuilder { /// Basic usage: /// /// ```no_run -/// use libdd_capabilities::HttpClientTrait; +/// use libdd_capabilities::HttpClientCapability; /// use libdd_capabilities_impl::NativeCapabilities; /// use libdd_common::Endpoint; /// use libdd_trace_utils::send_data::SendData; diff --git a/libdd-trace-utils/tests/test_send_data.rs b/libdd-trace-utils/tests/test_send_data.rs index 75b98fc146..37ae09d691 100644 --- a/libdd-trace-utils/tests/test_send_data.rs +++ b/libdd-trace-utils/tests/test_send_data.rs @@ -6,7 +6,7 @@ mod tracing_integration_tests { use http_body_util::BodyExt; #[cfg(target_os = "linux")] use hyper::Uri; - use libdd_capabilities_impl::{HttpClientTrait, NativeCapabilities}; + use libdd_capabilities_impl::{HttpClientCapability, NativeCapabilities}; #[cfg(target_os = "linux")] use libdd_common::connector::uds::socket_path_to_uri; use libdd_common::{http_common, Endpoint};