diff --git a/Cargo.lock b/Cargo.lock index 15aa427ac6..ade0b8901a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1507,6 +1507,7 @@ name = "datadog-sidecar" version = "0.0.1" dependencies = [ "anyhow", + "arc-swap", "arrayref", "base64 0.22.1", "bincode", @@ -2977,11 +2978,13 @@ dependencies = [ "libdd-capabilities", "libdd-capabilities-impl", "libdd-common", + "libdd-ddsketch", "libdd-dogstatsd-client", "libdd-log", "libdd-shared-runtime", "libdd-telemetry", "libdd-tinybytes", + "libdd-trace-obfuscation", "libdd-trace-protobuf", "libdd-trace-stats", "libdd-trace-utils", @@ -3356,6 +3359,7 @@ name = "libdd-trace-stats" version = "2.0.0" dependencies = [ "anyhow", + "arc-swap", "async-trait", "criterion", "hashbrown 0.15.1", diff --git a/Cargo.toml b/Cargo.toml index 1c43832ed2..da140d1ecc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,7 @@ license = "Apache-2.0" authors = ["Datadog Inc. "] [workspace.dependencies] +arc-swap = "1.7.1" hyper = { version = "1.6", features = [ "http1", "client", diff --git a/datadog-sidecar/Cargo.toml b/datadog-sidecar/Cargo.toml index 5c4ffffbf0..40b630fdfc 100644 --- a/datadog-sidecar/Cargo.toml +++ b/datadog-sidecar/Cargo.toml @@ -12,9 +12,11 @@ bench = false default = ["tracing"] tracing = ["tracing/std", "tracing-log", "tracing-subscriber"] tokio-console = ["tokio/full", "tokio/tracing", "console-subscriber"] +stats-obfuscation = ["libdd-trace-stats/stats-obfuscation", "libdd-data-pipeline/stats-obfuscation", "arc-swap"] [dependencies] anyhow = { version = "1.0" } +arc-swap = { workspace = true, optional = true } arrayref = "0.3.7" priority-queue = "2.1.1" libdd-common = { path = "../libdd-common" } @@ -24,7 +26,7 @@ datadog-sidecar-macros = { path = "../datadog-sidecar-macros" } libdd-telemetry = { path = "../libdd-telemetry", features = ["tracing"] } libdd-data-pipeline = { path = "../libdd-data-pipeline" } libdd-trace-utils = { path = "../libdd-trace-utils" } -libdd-trace-stats = { path = "../libdd-trace-stats" } +libdd-trace-stats = { path = "../libdd-trace-stats", default-features=false, features = ["https"] } datadog-remote-config = { path = "../datadog-remote-config" , features = ["live-debugger"]} datadog-live-debugger = { path = "../datadog-live-debugger" } libdd-crashtracker = { path = "../libdd-crashtracker" } diff --git a/datadog-sidecar/src/service/stats_flusher.rs b/datadog-sidecar/src/service/stats_flusher.rs index 9865d52cef..86bfd9f395 100644 --- a/datadog-sidecar/src/service/stats_flusher.rs +++ b/datadog-sidecar/src/service/stats_flusher.rs @@ -110,6 +110,14 @@ fn make_exporter( s.meta.clone(), endpoint, NativeCapabilities::new_client(), + // Sidecar does not perform client-side stats obfuscation. Pass a disabled + // default so the `datadog-obfuscation-version` header is never sent. + #[cfg(feature = "stats-obfuscation")] + Arc::new(arc_swap::ArcSwap::from_pointee( + libdd_trace_stats::span_concentrator::StatsComputationObfuscationConfig::default(), + )), + #[cfg(feature = "stats-obfuscation")] + "0", ) } diff --git a/libdd-crashtracker/src/receiver/ptrace_collector.rs b/libdd-crashtracker/src/receiver/ptrace_collector.rs index d7a563c52c..309170aa89 100644 --- a/libdd-crashtracker/src/receiver/ptrace_collector.rs +++ b/libdd-crashtracker/src/receiver/ptrace_collector.rs @@ -31,9 +31,9 @@ use std::ptr; use std::time::{Duration, Instant}; use libdd_libunwind_sys::{ - UnwAddrSpaceT, UnwCursor, UnwWord, _UPT_accessors, _UPT_create, _UPT_destroy, - unw_create_addr_space, unw_destroy_addr_space, unw_get_proc_name_remote, unw_get_reg_remote, - unw_init_remote, unw_step_remote, UNW_REG_IP, UNW_REG_SP, + _UPT_accessors, _UPT_create, _UPT_destroy, unw_create_addr_space, unw_destroy_addr_space, + unw_get_proc_name_remote, unw_get_reg_remote, unw_init_remote, unw_step_remote, UnwAddrSpaceT, + UnwCursor, UnwWord, UNW_REG_IP, UNW_REG_SP, }; use crate::crash_info::{StackFrame, StackTrace}; diff --git a/libdd-data-pipeline/Cargo.toml b/libdd-data-pipeline/Cargo.toml index 707b3dbebb..83fd731c1e 100644 --- a/libdd-data-pipeline/Cargo.toml +++ b/libdd-data-pipeline/Cargo.toml @@ -12,8 +12,8 @@ autobenches = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +arc-swap.workspace = true anyhow = { version = "1.0" } -arc-swap = "1.7.1" async-trait = "0.1" http = "1" http-body-util = "0.1" @@ -38,6 +38,8 @@ libdd-telemetry = { version = "5.0.0", path = "../libdd-telemetry", default-feat libdd-trace-protobuf = { version = "3.0.1", path = "../libdd-trace-protobuf" } libdd-trace-stats = { version = "2.0.0", path = "../libdd-trace-stats", default-features = false } libdd-trace-utils = { version = "3.0.1", path = "../libdd-trace-utils", default-features = false } +libdd-trace-obfuscation = { version = "2.0.0", path = "../libdd-trace-obfuscation", optional = true } +libdd-ddsketch = { version = "1.0.1", path = "../libdd-ddsketch" } libdd-dogstatsd-client = { version = "2.0.0", path = "../libdd-dogstatsd-client", default-features = false } libdd-tinybytes = { version = "1.1.0", path = "../libdd-tinybytes", features = [ "bytes_string", @@ -91,6 +93,10 @@ https = [ "libdd-trace-utils/https", "libdd-dogstatsd-client/https", ] +stats-obfuscation = [ + "libdd-trace-obfuscation", + "libdd-trace-stats/stats-obfuscation" +] fips = [ "libdd-common/fips", "libdd-capabilities-impl/fips", diff --git a/libdd-data-pipeline/src/agent_info/fetcher.rs b/libdd-data-pipeline/src/agent_info/fetcher.rs index 97d0d3a14f..43e594caab 100644 --- a/libdd-data-pipeline/src/agent_info/fetcher.rs +++ b/libdd-data-pipeline/src/agent_info/fetcher.rs @@ -408,12 +408,12 @@ mod single_threaded_tests { }, "remove_stack_traces": false, "redis": { - "Enabled": true, - "RemoveAllArgs": false + "enabled": true, + "remove_all_args": false }, "memcached": { - "Enabled": true, - "KeepCommand": false + "enabled": true, + "keep_command": false } } }, @@ -424,7 +424,7 @@ mod single_threaded_tests { format!("{:x}", Sha256::digest(json.as_bytes())) } - const TEST_INFO_HASH: &str = "b7709671827946c15603847bca76c90438579c038ec134eae19c51f1f3e3dfea"; + const TEST_INFO_HASH: &str = "cce54bf6e7d1bf38088a3ec809bfeec160bc52d37f70bd6b581ce3c2f7be5a65"; #[cfg_attr(miri, ignore)] #[tokio::test] diff --git a/libdd-data-pipeline/src/agent_info/schema.rs b/libdd-data-pipeline/src/agent_info/schema.rs index 1a3a191f44..f0eedc97e1 100644 --- a/libdd-data-pipeline/src/agent_info/schema.rs +++ b/libdd-data-pipeline/src/agent_info/schema.rs @@ -35,6 +35,8 @@ pub struct AgentInfoStruct { pub peer_tags: Option>, /// List of span kinds eligible for stats computation pub span_kinds_stats_computed: Option>, + /// Obfuscation version supported by the agent for client-side stats + pub obfuscation_version: Option, /// Container tags hash from HTTP response header pub container_tags_hash: Option, /// Exact-match tag filters applied before stats computation (root span only). @@ -69,15 +71,19 @@ pub struct Config { pub max_memory: Option, pub max_cpu: Option, pub analyzed_spans_by_service: Option>>, + pub obfuscation: Option, } #[allow(missing_docs)] -#[derive(Clone, Deserialize, Default, Debug, PartialEq)] +#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)] pub struct ObfuscationConfig { pub elastic_search: bool, pub mongo: bool, pub sql_exec_plan: bool, pub sql_exec_plan_normalize: bool, + #[cfg(feature = "stats-obfuscation")] + // Option because it might not exist with old agents + pub sql_obfuscation_mode: Option, pub http: HttpObfuscationConfig, pub remove_stack_traces: bool, pub redis: RedisObfuscationConfig, @@ -85,21 +91,21 @@ pub struct ObfuscationConfig { } #[allow(missing_docs)] -#[derive(Clone, Deserialize, Default, Debug, PartialEq)] +#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)] pub struct HttpObfuscationConfig { pub remove_query_string: bool, pub remove_path_digits: bool, } #[allow(missing_docs)] -#[derive(Clone, Deserialize, Default, Debug, PartialEq)] +#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)] pub struct RedisObfuscationConfig { pub enabled: bool, pub remove_all_args: bool, } #[allow(missing_docs)] -#[derive(Clone, Deserialize, Default, Debug, PartialEq)] +#[derive(Clone, Serialize, Deserialize, Default, Debug, PartialEq)] pub struct MemcachedObfuscationConfig { pub enabled: bool, pub keep_command: bool, diff --git a/libdd-data-pipeline/src/lib.rs b/libdd-data-pipeline/src/lib.rs index 0ac5971f96..2b9955ce3d 100644 --- a/libdd-data-pipeline/src/lib.rs +++ b/libdd-data-pipeline/src/lib.rs @@ -3,6 +3,7 @@ #![cfg_attr(not(test), deny(clippy::panic))] #![cfg_attr(not(test), deny(clippy::unwrap_used))] #![cfg_attr(not(test), deny(clippy::expect_used))] +#![cfg_attr(not(test), deny(clippy::unreachable))] #![cfg_attr(not(test), deny(clippy::todo))] #![cfg_attr(not(test), deny(clippy::unimplemented))] diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index b4a00e3a14..bd157abe8d 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -53,6 +53,8 @@ pub struct TraceExporterBuilder { peer_tags_aggregation: bool, compute_stats_by_span_kind: bool, peer_tags: Vec, + #[cfg(feature = "stats-obfuscation")] + client_side_stats_obfuscation_enabled: bool, #[cfg(feature = "telemetry")] telemetry: Option, telemetry_instrumentation_sessions: TelemetryInstrumentationSessions, @@ -211,6 +213,17 @@ impl TraceExporterBuilder { self } + /// Enable client-side stats obfuscation. Disabled by default. + /// + /// Final activation also requires the agent to advertise a supported + /// `obfuscation_version` via the `/info` endpoint. When disabled, no + /// `datadog-obfuscation-version` header is sent on stats payloads. + #[cfg(feature = "stats-obfuscation")] + pub fn enable_client_side_stats_obfuscation(&mut self) -> &mut Self { + self.client_side_stats_obfuscation_enabled = true; + self + } + #[cfg(feature = "telemetry")] /// Enables sending telemetry metrics. pub fn enable_telemetry(&mut self, cfg: TelemetryConfig) -> &mut Self { @@ -324,6 +337,11 @@ impl TraceExporterBuilder { // native, workers run on the tokio runtime; on wasm, they run on the JS // event loop via `spawn_local`. Telemetry remains native-only for now. + #[cfg(feature = "stats-obfuscation")] + use libdd_trace_stats::span_concentrator::StatsComputationObfuscationConfig; + + use crate::trace_exporter::stats::StatsComputationConfig; + let info_endpoint = Endpoint::from_url(add_path(&agent_url, INFO_ENDPOINT)); let (info_fetcher, info_response_observer) = AgentInfoFetcher::::new(info_endpoint, Duration::from_secs(5 * 60)); @@ -453,7 +471,15 @@ impl TraceExporterBuilder { shared_runtime, dogstatsd, common_stats_tags: vec![libdatadog_version], - client_side_stats: ArcSwap::new(stats.into()), + client_side_stats: StatsComputationConfig { + status: ArcSwap::new(stats.into()), + #[cfg(feature = "stats-obfuscation")] + obfuscation_config: Arc::new(ArcSwap::from_pointee( + StatsComputationObfuscationConfig::default(), + )), + #[cfg(feature = "stats-obfuscation")] + obfuscation_enabled: self.client_side_stats_obfuscation_enabled, + }, previous_info_state: arc_swap::ArcSwapOption::new(None), info_response_observer, #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 95c13889c7..561bc56e88 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -24,12 +24,13 @@ use crate::trace_exporter::agent_response::{ use crate::trace_exporter::error::{ InternalErrorKind, RequestError, ShutdownError, TraceExporterError, }; +use crate::trace_exporter::stats::StatsComputationConfig; use crate::{ agent_info::{self, schema::AgentInfo}, health_metrics, health_metrics::{HealthMetric, SendResult, TransportErrorType}, }; -use arc_swap::{ArcSwap, ArcSwapOption}; +use arc_swap::ArcSwapOption; use bytes::Bytes; use http::header::HeaderMap; use http::uri::PathAndQuery; @@ -222,7 +223,7 @@ pub struct TraceExporter, common_stats_tags: Vec, client_computed_top_level: bool, - client_side_stats: ArcSwap, + client_side_stats: StatsComputationConfig, #[cfg_attr(target_arch = "wasm32", allow(dead_code))] previous_info_state: ArcSwapOption, info_response_observer: ResponseObserver, @@ -279,7 +280,7 @@ impl Tra // Extract the stats handle before moving other fields. if let StatsComputationStatus::Enabled { worker_handle, .. } = - &**self.client_side_stats.load() + &**self.client_side_stats.status.load() { let handle = worker_handle.clone(); join_set.spawn(async move { handle.stop().await }); @@ -381,7 +382,7 @@ impl Tra fn check_agent_info(&self) { if let Some(agent_info) = agent_info::get_agent_info() { if self.has_agent_info_state_changed(&agent_info) { - match &**self.client_side_stats.load() { + match &**self.client_side_stats.status.load() { StatsComputationStatus::Disabled => {} StatsComputationStatus::DisabledByAgent { .. } => { let ctx = stats::StatsContext { @@ -392,8 +393,8 @@ impl Tra stats::handle_stats_disabled_by_agent( &ctx, &agent_info, - &self.client_side_stats, self.capabilities.clone(), + &self.client_side_stats, ); } StatsComputationStatus::Enabled { @@ -573,7 +574,7 @@ impl Tra mp_payload: Vec, headers: HeaderMap, chunks: usize, - chunks_dropped_p0: usize, + #[cfg_attr(not(feature = "telemetry"), allow(unused_variables))] chunks_dropped_p0: usize, ) -> Result { let strategy = RetryStrategy::default(); let payload_len = mp_payload.len(); @@ -614,7 +615,7 @@ impl Tra let dropped_p0_stats = stats::process_traces_for_stats( &mut traces, &mut header_tags, - &self.client_side_stats, + &self.client_side_stats.status, self.client_computed_top_level, ); @@ -851,7 +852,7 @@ impl Tra #[cfg(not(target_arch = "wasm32"))] /// Test only function to check if the stats computation is active and the worker is running pub fn is_stats_worker_active(&self) -> bool { - stats::is_stats_worker_active(&self.client_side_stats) + stats::is_stats_worker_active(&self.client_side_stats.status) } } @@ -2071,4 +2072,122 @@ mod single_threaded_tests { mock_traces.assert(); } + + #[cfg(feature = "stats-obfuscation")] + fn build_obfuscation_test_exporter( + url: String, + runtime: Arc, + opt_in: bool, + ) -> TraceExporter { + let mut builder = TraceExporter::::builder(); + builder + .set_url(&url) + .set_service("test") + .set_env("staging") + .set_tracer_version("v0.1") + .set_language("nodejs") + .set_language_version("1.0") + .set_language_interpreter("v8") + .set_input_format(TraceExporterInputFormat::V04) + .set_output_format(TraceExporterOutputFormat::V04) + .set_shared_runtime(runtime) + .enable_stats(Duration::from_secs(10)); + if opt_in { + builder.enable_client_side_stats_obfuscation(); + } + builder.build::().unwrap() + } + + #[cfg(feature = "stats-obfuscation")] + fn run_obfuscation_test(opt_in: bool, agent_obfuscation_version: Option) -> bool { + agent_info::clear_cache_for_test(); + + let server = MockServer::start(); + + let _mock_traces = server.mock(|when, then| { + when.method(POST) + .header("Content-type", "application/msgpack") + .path("/v0.4/traces"); + then.status(200).body(""); + }); + + let _mock_stats = server.mock(|when, then| { + when.method(POST) + .header("Content-type", "application/msgpack") + .path("/v0.6/stats"); + then.status(200).body(""); + }); + + let info_body = match agent_obfuscation_version { + Some(v) => format!( + r#"{{"version":"1","client_drop_p0s":true,"obfuscation_version":{v},"endpoints":["/v0.4/traces","/v0.6/stats"]}}"# + ), + None => r#"{"version":"1","client_drop_p0s":true,"endpoints":["/v0.4/traces","/v0.6/stats"]}"#.to_string(), + }; + let _mock_info = server.mock(|when, then| { + when.method(GET).path("/info"); + then.status(200) + .header("content-type", "application/json") + .header("datadog-agent-state", "1") + .body(info_body); + }); + + let runtime = Arc::new(SharedRuntime::new().unwrap()); + let exporter = build_obfuscation_test_exporter(server.url("/"), runtime.clone(), opt_in); + + while agent_info::get_agent_info().is_none() { + std::thread::sleep(Duration::from_millis(100)); + } + + let trace_chunk = vec![SpanBytes { + duration: 10, + ..Default::default() + }]; + let data = msgpack_encoder::v04::to_vec(&[trace_chunk]); + let _ = exporter.send(data.as_ref()); + + let start = std::time::Instant::now(); + while !exporter.is_stats_worker_active() { + if start.elapsed() > Duration::from_secs(10) { + panic!("Timeout waiting for stats worker to become active"); + } + std::thread::sleep(Duration::from_millis(10)); + } + + let result = exporter.client_side_stats.obfuscation_config.load().enabled; + let _ = runtime.shutdown(None); + result + } + + /// Runs the three opt-in × agent-support cases sequentially in a single test + /// to avoid races on the process-global agent info cache. + #[cfg(feature = "stats-obfuscation")] + #[cfg_attr(miri, ignore)] + #[test] + fn test_client_side_stats_obfuscation_opt_in() { + let current_obf_version = crate::trace_exporter::stats::SUPPORTED_OBFUSCATION_VERSION; + let prev_obf_version = crate::trace_exporter::stats::SUPPORTED_OBFUSCATION_VERSION - 1; + // Opt-in OFF, agent supports → must stay disabled. + assert!( + !run_obfuscation_test(false, Some(current_obf_version)), + "obfuscation must stay disabled when builder opt-in is absent" + ); + // Opt-in ON, agent does not advertise support → disabled. + assert!( + !run_obfuscation_test(true, None), + "obfuscation must stay disabled when agent does not advertise support" + ); + + // Opt-in ON, agent obfuscation_version < tracer obfuscation_version -> disabled; + assert!( + !run_obfuscation_test(true, Some(prev_obf_version)), + "obfuscation must stay disabled when agent.obfuscation_version < tracer.obfuscation_version" + ); + + // Opt-in ON, agent supports → enabled. + assert!( + run_obfuscation_test(true, Some(current_obf_version)), + "obfuscation must activate when opted in and agent supports" + ); + } } diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 9f63597de1..6e1d8a4f49 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -16,6 +16,10 @@ use libdd_common::Endpoint; use libdd_common::MutexExt; use libdd_shared_runtime::{SharedRuntime, WorkerHandle}; use libdd_trace_stats::span_concentrator::SpanConcentrator; +#[cfg(feature = "stats-obfuscation")] +use libdd_trace_stats::span_concentrator::{ + SharedStatsComputationObfuscationConfig, StatsComputationObfuscationConfig, +}; #[cfg(not(target_arch = "wasm32"))] use libdd_trace_stats::stats_exporter::{StatsExporter, StatsMetadata}; use std::sync::{Arc, Mutex}; @@ -33,6 +37,12 @@ pub(crate) const DEFAULT_STATS_ELIGIBLE_SPAN_KINDS: [&str; 4] = #[cfg(not(target_arch = "wasm32"))] pub(crate) const STATS_ENDPOINT: &str = "/v0.6/stats"; +/// The maximum obfuscation version this tracer supports. +#[cfg(feature = "stats-obfuscation")] +pub(crate) const SUPPORTED_OBFUSCATION_VERSION: u32 = 1; +#[cfg(feature = "stats-obfuscation")] +pub(crate) const SUPPORTED_OBFUSCATION_VERSION_STR: &str = "1"; + #[cfg(not(target_arch = "wasm32"))] /// Context struct that groups immutable parameters used by stats functions pub(crate) struct StatsContext<'a> { @@ -57,6 +67,27 @@ pub(crate) enum StatsComputationStatus { }, } +#[derive(Debug)] +#[cfg_attr(target_arch = "wasm32", allow(dead_code))] +pub(crate) struct StatsComputationConfig { + pub(crate) status: ArcSwap, + #[cfg(feature = "stats-obfuscation")] + pub(crate) obfuscation_config: SharedStatsComputationObfuscationConfig, + /// Builder-level opt-in. When false, stats obfuscation stays off + /// regardless of agent support. + #[cfg(feature = "stats-obfuscation")] + pub(crate) obfuscation_enabled: bool, +} + +/// Return true if the agent's obfuscation version is supported by this tracer +#[cfg(feature = "stats-obfuscation")] +fn is_obfuscation_active(agent_info: &AgentInfo) -> bool { + agent_info + .info + .obfuscation_version + .is_some_and(|v| v >= 1 && v <= SUPPORTED_OBFUSCATION_VERSION) +} + #[cfg(not(target_arch = "wasm32"))] /// Get span kinds for stats computation with default fallback fn get_span_kinds_for_stats(agent_info: &Arc) -> Vec { @@ -75,25 +106,23 @@ pub(crate) fn start_stats_computation< C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, >( ctx: &StatsContext, - client_side_stats: &ArcSwap, span_kinds: Vec, peer_tags: Vec, capabilities: C, + client_side_stats: &StatsComputationConfig, ) -> anyhow::Result<()> { - if let StatsComputationStatus::DisabledByAgent { bucket_size } = **client_side_stats.load() { + if let StatsComputationStatus::DisabledByAgent { bucket_size } = + **client_side_stats.status.load() + { let stats_concentrator = Arc::new(Mutex::new(SpanConcentrator::new( bucket_size, std::time::SystemTime::now(), span_kinds, peer_tags, + #[cfg(feature = "stats-obfuscation")] + Some(client_side_stats.obfuscation_config.clone()), ))); - create_and_start_stats_worker( - ctx, - bucket_size, - &stats_concentrator, - client_side_stats, - capabilities, - )?; + create_and_start_stats_worker(ctx, &stats_concentrator, capabilities, client_side_stats)?; } Ok(()) } @@ -104,17 +133,21 @@ fn create_and_start_stats_worker< C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, >( ctx: &StatsContext, - bucket_size: Duration, stats_concentrator: &Arc>, - client_side_stats: &ArcSwap, capabilities: C, + client_side_stats: &StatsComputationConfig, ) -> anyhow::Result<()> { + let bucket_size = stats_concentrator.lock_or_panic().get_bucket_size(); let stats_exporter = StatsExporter::::new( bucket_size, stats_concentrator.clone(), StatsMetadata::from(ctx.metadata.clone()), Endpoint::from_url(add_path(ctx.endpoint_url, STATS_ENDPOINT)), capabilities.clone(), + #[cfg(feature = "stats-obfuscation")] + client_side_stats.obfuscation_config.clone(), + #[cfg(feature = "stats-obfuscation")] + SUPPORTED_OBFUSCATION_VERSION_STR, ); let worker_handle = ctx .shared_runtime @@ -122,10 +155,12 @@ fn create_and_start_stats_worker< .map_err(|e| anyhow::anyhow!(e))?; // Update the stats computation state with the new worker components. - client_side_stats.store(Arc::new(StatsComputationStatus::Enabled { - stats_concentrator: stats_concentrator.clone(), - worker_handle, - })); + client_side_stats + .status + .store(Arc::new(StatsComputationStatus::Enabled { + stats_concentrator: stats_concentrator.clone(), + worker_handle, + })); Ok(()) } @@ -141,6 +176,7 @@ pub(crate) fn stop_stats_computation( if let StatsComputationStatus::Enabled { stats_concentrator, worker_handle, + .. } = &**client_side_stats.load() { let bucket_size = stats_concentrator.lock_or_panic().get_bucket_size(); @@ -162,19 +198,23 @@ pub(crate) fn handle_stats_disabled_by_agent< >( ctx: &StatsContext, agent_info: &Arc, - client_side_stats: &ArcSwap, capabilities: C, + client_side_stats: &StatsComputationConfig, ) { if agent_info.info.client_drop_p0s.is_some_and(|v| v) { let status = start_stats_computation( ctx, - client_side_stats, get_span_kinds_for_stats(agent_info), agent_info.info.peer_tags.clone().unwrap_or_default(), capabilities, + client_side_stats, ); match status { - Ok(()) => debug!("Client-side stats enabled"), + Ok(()) => { + #[cfg(feature = "stats-obfuscation")] + update_obfuscation_config(agent_info, client_side_stats); + debug!("Client-side stats enabled"); + } Err(_) => error!("Failed to start stats computation"), } } else { @@ -182,27 +222,62 @@ pub(crate) fn handle_stats_disabled_by_agent< } } +#[cfg(feature = "stats-obfuscation")] +#[cfg(not(target_arch = "wasm32"))] +fn update_obfuscation_config( + agent_info: &Arc, + client_side_stats: &StatsComputationConfig, +) { + if matches!( + &**client_side_stats.status.load(), + StatsComputationStatus::Enabled { .. } + ) { + let obfuscation_active = + client_side_stats.obfuscation_enabled && is_obfuscation_active(agent_info); + let sql_obfuscation_mode = (|| { + agent_info + .info + .config + .as_ref()? + .obfuscation + .as_ref()? + .sql_obfuscation_mode + })() + .unwrap_or_default(); + client_side_stats + .obfuscation_config + .store(Arc::new(StatsComputationObfuscationConfig { + enabled: obfuscation_active, + sql_obfuscation_mode, + })); + } +} + #[cfg(not(target_arch = "wasm32"))] /// Handle stats computation when it's already enabled pub(crate) fn handle_stats_enabled( ctx: &StatsContext, agent_info: &Arc, - stats_concentrator: &Mutex, - client_side_stats: &ArcSwap, + stats_concentrator: &Arc>, + client_side_stats: &StatsComputationConfig, ) { if agent_info.info.client_drop_p0s.is_some_and(|v| v) { let mut concentrator = stats_concentrator.lock_or_panic(); concentrator.set_span_kinds(get_span_kinds_for_stats(agent_info)); concentrator.set_peer_tags(agent_info.info.peer_tags.clone().unwrap_or_default()); + #[cfg(feature = "stats-obfuscation")] + update_obfuscation_config(agent_info, client_side_stats); } else { - stop_stats_computation(ctx, client_side_stats); + stop_stats_computation(ctx, &client_side_stats.status); debug!("Client-side stats computation has been disabled by the agent") } } -/// Add all spans from the given iterator into the stats concentrator +/// Add all spans from the given iterator into the stats concentrator, optionally obfuscating +/// resource names for client-side stats. +/// /// # Panic -/// Will panic if another thread panicked will holding the lock on `stats_concentrator` +/// Will panic if another thread panicked while holding the lock on `stats_concentrator` fn add_spans_to_stats( stats_concentrator: &Mutex, traces: &[Vec>], @@ -223,9 +298,10 @@ pub(crate) fn process_traces_for_stats( client_side_stats: &ArcSwap, client_computed_top_level: bool, ) -> libdd_trace_utils::span::trace_utils::DroppedP0Stats { + let status = client_side_stats.load(); if let StatsComputationStatus::Enabled { stats_concentrator, .. - } = &**client_side_stats.load() + } = &**status { if !client_computed_top_level { for chunk in traces.iter_mut() { @@ -282,3 +358,19 @@ impl From for StatsMetadata { } } } + +#[cfg(test)] +mod tests { + #[cfg(feature = "stats-obfuscation")] + #[test] + fn test_obfuscation_version_was_updated() { + use crate::trace_exporter::stats::{ + SUPPORTED_OBFUSCATION_VERSION, SUPPORTED_OBFUSCATION_VERSION_STR, + }; + + assert_eq!( + SUPPORTED_OBFUSCATION_VERSION.to_string(), + SUPPORTED_OBFUSCATION_VERSION_STR + ); + } +} diff --git a/libdd-trace-obfuscation/src/obfuscate.rs b/libdd-trace-obfuscation/src/obfuscate.rs index 5ae9015ca5..a87b6e9776 100644 --- a/libdd-trace-obfuscation/src/obfuscate.rs +++ b/libdd-trace-obfuscation/src/obfuscate.rs @@ -14,7 +14,7 @@ use crate::{ obfuscation_config::ObfuscationConfig, redis::{obfuscate_redis_string, quantize_redis_string, remove_all_redis_args}, replacer::replace_span_tags, - sql::DbmsKind, + sql::{DbmsKind, SqlObfuscationMode}, }; /// TAG_REDIS_RAW_COMMAND represents a redis raw command tag @@ -38,6 +38,36 @@ const TAG_DBMS: &str = "db.type"; /// TAG_CARD_NUMBER represents a card number tag const TAG_CARD_NUMBER: &str = "card.number"; +/// Obfuscate a resource name for client-side stats (Version 1). +/// +/// Applies the same resource transformations as `obfuscate_span`, but only for span types whose +/// resource names are modified: +/// - `"sql"`, `"cassandra"`: SQL obfuscation +/// - `"redis"`, `"valkey"`: Redis quantization (command names only) +/// +/// Returns `Some(obfuscated)` if the resource was modified, `None` if no obfuscation was needed. +pub fn obfuscate_resource_for_stats( + span_type: &str, + resource: &str, + dbms_hint: Option<&str>, + sql_obfuscation_mode: SqlObfuscationMode, +) -> Option { + match span_type { + "sql" | "cassandra" if !resource.is_empty() => { + let dbms: DbmsKind = dbms_hint + .and_then(|d| d.try_into().ok()) + .unwrap_or_default(); + let config = &crate::sql::SqlObfuscateConfig { + obfuscation_mode: sql_obfuscation_mode, + ..Default::default() + }; + Some(crate::sql::obfuscate_sql(resource, config, dbms)) + } + "redis" | "valkey" => Some(quantize_redis_string(resource)), + _ => None, + } +} + /// `obfuscate_span` goes through `span` fields and applies obfuscation on it // TODO(APMSP-2764): return parsing errors in a vec to log them ? pub fn obfuscate_span(span: &mut pb::Span, config: &ObfuscationConfig) { @@ -246,10 +276,58 @@ fn should_obfuscate_cc_key(key: &str, config: &ObfuscationConfig) -> bool { #[cfg(test)] mod tests { - use super::obfuscate_span; + use super::{obfuscate_resource_for_stats, obfuscate_span}; use crate::{obfuscation_config, replacer}; use libdd_trace_utils::test_utils; + // test helper with default params + fn obfuscate_stats(span_type: &str, resource: &str) -> Option { + obfuscate_resource_for_stats( + span_type, + resource, + None, + crate::sql::SqlObfuscationMode::default(), + ) + } + + #[test] + fn test_obfuscate_resource_for_stats_sql() { + let result = obfuscate_stats("sql", "SELECT * FROM users WHERE id = 42"); + assert_eq!(result.unwrap(), "SELECT * FROM users WHERE id = ?"); + } + + #[test] + fn test_obfuscate_resource_for_stats_cassandra() { + let result = obfuscate_stats("cassandra", "SELECT * FROM table1 WHERE id = 42"); + assert_eq!(result.unwrap(), "SELECT * FROM table1 WHERE id = ?"); + } + + #[test] + fn test_obfuscate_resource_for_stats_redis() { + let result = obfuscate_stats("redis", "SET mykey myvalue\nGET mykey"); + assert!(result.is_some()); + // quantize_redis_string extracts command names + assert_eq!(result.unwrap(), "SET GET"); + } + + #[test] + fn test_obfuscate_resource_for_stats_valkey() { + let result = obfuscate_stats("valkey", "SET mykey myvalue\nGET mykey"); + assert_eq!(result.unwrap(), "SET GET"); + } + + #[test] + fn test_obfuscate_resource_for_stats_no_match() { + assert!(obfuscate_stats("http", "/api/users").is_none()); + assert!(obfuscate_stats("web", "/api/users").is_none()); + assert!(obfuscate_stats("grpc", "MyService/MyMethod").is_none()); + } + + #[test] + fn test_obfuscate_resource_for_stats_empty_sql() { + assert!(obfuscate_stats("sql", "").is_none()); + } + #[test] fn test_obfuscates_span_url_strings() { let mut span = test_utils::create_test_span(111, 222, 0, 1, true); diff --git a/libdd-trace-obfuscation/src/obfuscation_config.rs b/libdd-trace-obfuscation/src/obfuscation_config.rs index f440063a9a..f8d478d322 100644 --- a/libdd-trace-obfuscation/src/obfuscation_config.rs +++ b/libdd-trace-obfuscation/src/obfuscation_config.rs @@ -9,7 +9,7 @@ use libdd_common::config::parse_env; use crate::{ replacer::{self, ReplaceRule}, - sql::SqlObfuscateConfig, + sql::{SqlObfuscateConfig, SqlObfuscationMode}, }; #[derive(Debug, Default, Deserialize)] @@ -92,6 +92,12 @@ pub struct ObfuscationConfig { pub mongodb: JsonObfuscatorConfig, } +// Small subset of `ObfuscationConfig` for stats obfuscation only +#[derive(Default)] +pub struct StatsObfuscationConfig { + pub sql_obfuscation_mode: SqlObfuscationMode, +} + impl ObfuscationConfig { pub fn new() -> Result> { let tag_replace_rules: Option> = match env::var("DD_APM_REPLACE_TAGS") { diff --git a/libdd-trace-obfuscation/src/sql.rs b/libdd-trace-obfuscation/src/sql.rs index 0971bc3180..ee120b77a4 100644 --- a/libdd-trace-obfuscation/src/sql.rs +++ b/libdd-trace-obfuscation/src/sql.rs @@ -35,12 +35,13 @@ impl TryFrom<&str> for DbmsKind { } #[allow(deprecated)] -#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, PartialEq)] #[serde(rename_all = "snake_case")] #[non_exhaustive] pub enum SqlObfuscationMode { #[default] #[deprecated = "kept for compatibility with agent's obfuscator but has unintuitive behavior"] + #[serde(alias = "")] Unspecified, NormalizeOnly, ObfuscateOnly, diff --git a/libdd-trace-stats/Cargo.toml b/libdd-trace-stats/Cargo.toml index d9546555b4..d4c7c38e52 100644 --- a/libdd-trace-stats/Cargo.toml +++ b/libdd-trace-stats/Cargo.toml @@ -10,6 +10,7 @@ license.workspace = true autobenches = false [dependencies] +arc-swap.workspace = true anyhow = "1.0" libdd-capabilities = { path = "../libdd-capabilities", version = "1.0.0" } libdd-common = { version = "4.0.0", path = "../libdd-common", default-features = false } @@ -47,5 +48,6 @@ tokio = { version = "1.23", features = ["rt-multi-thread", "macros", "test-util" [features] default = ["https"] +stats-obfuscation = [] https = ["libdd-common/https", "libdd-capabilities-impl/https", "libdd-shared-runtime/https"] fips = ["libdd-common/fips", "libdd-capabilities-impl/fips", "libdd-shared-runtime/fips"] diff --git a/libdd-trace-stats/benches/span_concentrator_bench.rs b/libdd-trace-stats/benches/span_concentrator_bench.rs index 21c105c952..e4c7df43a2 100644 --- a/libdd-trace-stats/benches/span_concentrator_bench.rs +++ b/libdd-trace-stats/benches/span_concentrator_bench.rs @@ -47,6 +47,8 @@ pub fn criterion_benchmark(c: &mut Criterion) { now, vec![], vec!["db_name".into(), "bucket_s3".into()], + #[cfg(feature = "stats-obfuscation")] + None, ); let mut spans = vec![]; for trace_id in 1..100 { diff --git a/libdd-trace-stats/src/lib.rs b/libdd-trace-stats/src/lib.rs index ca62eeb1ab..8e5793c8bb 100644 --- a/libdd-trace-stats/src/lib.rs +++ b/libdd-trace-stats/src/lib.rs @@ -3,6 +3,7 @@ #![cfg_attr(not(test), deny(clippy::panic))] #![cfg_attr(not(test), deny(clippy::unwrap_used))] #![cfg_attr(not(test), deny(clippy::expect_used))] +#![cfg_attr(not(test), deny(clippy::unreachable))] #![cfg_attr(not(test), deny(clippy::todo))] #![cfg_attr(not(test), deny(clippy::unimplemented))] diff --git a/libdd-trace-stats/src/span_concentrator/aggregation.rs b/libdd-trace-stats/src/span_concentrator/aggregation.rs index d277e47db6..e9b24d0d2f 100644 --- a/libdd-trace-stats/src/span_concentrator/aggregation.rs +++ b/libdd-trace-stats/src/span_concentrator/aggregation.rs @@ -202,7 +202,20 @@ impl<'a> BorrowedAggregationKey<'a> { /// /// If `peer_tags_keys` is not empty then the peer tags of the span will be included in the /// key. - pub fn from_span>(span: &'a T, peer_tag_keys: &'a [String]) -> Self { + pub(super) fn from_span>(span: &'a T, peer_tag_keys: &'a [String]) -> Self { + Self::from_obfuscated_span(span.resource(), span, peer_tag_keys) + } + + pub(crate) fn from_obfuscated_span<'b, T>( + resource_name: &'a str, + span: &'b T, + peer_tag_keys: &'b [String], + ) -> BorrowedAggregationKey<'a> + where + T: StatSpan<'b>, + // resource_name is a temporary string on the stack the span will outlive it + 'b: 'a, + { let span_kind = span.get_meta(TAG_SPANKIND).unwrap_or_default(); let peer_tags = if should_track_peer_tags(span_kind) { // Parse the meta tags of the span and return a list of the peer tags based on the list @@ -242,7 +255,7 @@ impl<'a> BorrowedAggregationKey<'a> { Self { fixed: FixedAggregationKey { - resource_name: span.resource(), + resource_name, service_name: span.service(), operation_name: span.name(), span_type: span.r#type(), diff --git a/libdd-trace-stats/src/span_concentrator/mod.rs b/libdd-trace-stats/src/span_concentrator/mod.rs index 151d8fc2ef..1e83a4b342 100644 --- a/libdd-trace-stats/src/span_concentrator/mod.rs +++ b/libdd-trace-stats/src/span_concentrator/mod.rs @@ -53,6 +53,18 @@ where }) && !span.is_partial_snapshot() } +#[cfg(feature = "stats-obfuscation")] +#[derive(Clone, Debug, Default)] +#[cfg_attr(target_arch = "wasm32", allow(dead_code))] +pub struct StatsComputationObfuscationConfig { + pub enabled: bool, + pub sql_obfuscation_mode: libdd_trace_obfuscation::sql::SqlObfuscationMode, +} + +#[cfg(feature = "stats-obfuscation")] +pub type SharedStatsComputationObfuscationConfig = + std::sync::Arc>; + /// SpanConcentrator compute stats on span aggregated by time and span attributes /// /// # Aggregation @@ -82,6 +94,8 @@ pub struct SpanConcentrator { span_kinds_stats_computed: Vec, /// keys for supplementary tags that describe peer.service entities peer_tag_keys: Vec, + #[cfg(feature = "stats-obfuscation")] + obfuscation_config: SharedStatsComputationObfuscationConfig, } impl SpanConcentrator { @@ -90,11 +104,15 @@ impl SpanConcentrator { /// - `now` the current system time, used to define the oldest bucket /// - `span_kinds_stats_computed` list of span kinds eligible for stats computation /// - `peer_tags_keys` list of keys considered as peer tags for aggregation + /// - `obfuscation_config` optional and updatable config for resource key obfuscation pub fn new( bucket_size: Duration, now: SystemTime, span_kinds_stats_computed: Vec, peer_tag_keys: Vec, + #[cfg(feature = "stats-obfuscation")] obfuscation_config: Option< + SharedStatsComputationObfuscationConfig, + >, ) -> SpanConcentrator { SpanConcentrator { bucket_size: bucket_size.as_nanos() as u64, @@ -106,6 +124,8 @@ impl SpanConcentrator { buffer_len: 2, span_kinds_stats_computed, peer_tag_keys, + #[cfg(feature = "stats-obfuscation")] + obfuscation_config: obfuscation_config.unwrap_or_default(), } } @@ -136,32 +156,52 @@ impl SpanConcentrator { /// Add a span into the concentrator, by computing stats if the span is eligible for stats /// computation. - pub fn add_span<'a, T>(&'a mut self, span: &'a T) - where - T: StatSpan<'a>, - { - // If the span is eligible for stats computation - if is_span_eligible(span, self.span_kinds_stats_computed.as_slice()) { - let mut bucket_timestamp = - align_timestamp((span.start() + span.duration()) as u64, self.bucket_size); - // If the span is to old we aggregate it in the latest bucket instead of - // creating a new one - if bucket_timestamp < self.oldest_timestamp { - bucket_timestamp = self.oldest_timestamp; - } - - let agg_key = BorrowedAggregationKey::from_span(span, self.peer_tag_keys.as_slice()); - - self.buckets - .entry(bucket_timestamp) - .or_insert(StatsBucket::new(bucket_timestamp)) - .insert( - agg_key, - span.duration(), - span.is_error(), - span.has_top_level(), - ); + pub fn add_span<'a>(&'a mut self, span: &'a impl StatSpan<'a>) { + if !is_span_eligible(span, self.span_kinds_stats_computed.as_slice()) { + return; + } + let mut bucket_timestamp = + align_timestamp((span.start() + span.duration()) as u64, self.bucket_size); + // If the span is to old we aggregate it in the latest bucket instead of + // creating a new one + if bucket_timestamp < self.oldest_timestamp { + bucket_timestamp = self.oldest_timestamp; } + let obfuscated_resource = self.compute_obfuscated_span(span); + let agg_key = match obfuscated_resource.as_deref() { + Some(res) => BorrowedAggregationKey::from_obfuscated_span( + res, + span, + self.peer_tag_keys.as_slice(), + ), + None => BorrowedAggregationKey::from_span(span, self.peer_tag_keys.as_slice()), + }; + self.buckets + .entry(bucket_timestamp) + .or_insert(StatsBucket::new(bucket_timestamp)) + .insert( + agg_key, + span.duration(), + span.is_error(), + span.has_top_level(), + ); + } + + fn compute_obfuscated_span<'a>( + &self, + #[allow(unused)] span: &'a impl StatSpan<'a>, + ) -> Option { + #[cfg(feature = "stats-obfuscation")] + if self.obfuscation_config.load().enabled { + let dbms_hint: Option<&str> = span.get_meta("db.type"); + return libdd_trace_obfuscation::obfuscate::obfuscate_resource_for_stats( + span.r#type(), + span.resource(), + dbms_hint, + self.obfuscation_config.load().sql_obfuscation_mode, + ); + } + None } /// Flush all stats bucket except for the `buffer_len` most recent. If `force` is true, flush @@ -197,5 +237,17 @@ impl SpanConcentrator { } } +#[cfg(feature = "stats-obfuscation")] +impl StatsComputationObfuscationConfig { + pub fn disabled() -> SharedStatsComputationObfuscationConfig { + use arc_swap::ArcSwap; + use std::sync::Arc; + + Arc::new(ArcSwap::from_pointee( + StatsComputationObfuscationConfig::default(), + )) + } +} + #[cfg(test)] mod tests; diff --git a/libdd-trace-stats/src/span_concentrator/tests.rs b/libdd-trace-stats/src/span_concentrator/tests.rs index 25bbae29c0..c702344312 100644 --- a/libdd-trace-stats/src/span_concentrator/tests.rs +++ b/libdd-trace-stats/src/span_concentrator/tests.rs @@ -99,8 +99,14 @@ fn assert_counts_equal(expected: Vec, actual: Vec @@ -79,14 +85,15 @@ impl /// agent /// - `meta` metadata used in ClientStatsPayload and as headers to send stats to the agent /// - `endpoint` the Endpoint used to send stats to the agent - /// - `cancellation_token` Token used to safely shutdown the exporter by force flushing the - /// concentrator pub fn new( flush_interval: time::Duration, concentrator: Arc>, meta: StatsMetadata, endpoint: Endpoint, capabilities: Cap, + #[cfg(feature = "stats-obfuscation")] + obfuscation_config: SharedStatsComputationObfuscationConfig, + #[cfg(feature = "stats-obfuscation")] supported_obfuscation_version: &'static str, ) -> Self { Self { flush_interval, @@ -95,6 +102,10 @@ impl meta, sequence_id: AtomicU64::new(0), capabilities, + #[cfg(feature = "stats-obfuscation")] + obfuscation_config, + #[cfg(feature = "stats-obfuscation")] + supported_obfuscation_version, } } @@ -128,6 +139,14 @@ impl libdd_common::header::APPLICATION_MSGPACK, ); + #[cfg(feature = "stats-obfuscation")] + if self.obfuscation_config.load().enabled { + headers.insert( + http::HeaderName::from_static("datadog-obfuscation-version"), + http::HeaderValue::from_static(self.supported_obfuscation_version), + ); + } + let result = send_with_retry( &self.capabilities, &self.endpoint, @@ -227,6 +246,8 @@ pub fn stats_url_from_agent_url(agent_url: &str) -> anyhow::Result { #[cfg(test)] mod tests { use super::*; + #[cfg(feature = "stats-obfuscation")] + use crate::span_concentrator::StatsComputationObfuscationConfig; use httpmock::prelude::*; use httpmock::MockServer; use libdd_capabilities_impl::NativeCapabilities; @@ -268,6 +289,8 @@ mod tests { SystemTime::now() - BUCKETS_DURATION * 3, vec![], vec![], + #[cfg(feature = "stats-obfuscation")] + None, ); let mut trace = vec![]; @@ -309,6 +332,10 @@ mod tests { get_test_metadata(), Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), NativeCapabilities::new_client(), + #[cfg(feature = "stats-obfuscation")] + StatsComputationObfuscationConfig::disabled(), + #[cfg(feature = "stats-obfuscation")] + "1", ); let send_status = stats_exporter.send(true).await; @@ -336,6 +363,10 @@ mod tests { get_test_metadata(), Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), NativeCapabilities::new_client(), + #[cfg(feature = "stats-obfuscation")] + StatsComputationObfuscationConfig::disabled(), + #[cfg(feature = "stats-obfuscation")] + "1", ); let send_status = stats_exporter.send(true).await; @@ -371,6 +402,10 @@ mod tests { get_test_metadata(), Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), caps.clone(), + #[cfg(feature = "stats-obfuscation")] + StatsComputationObfuscationConfig::disabled(), + #[cfg(feature = "stats-obfuscation")] + "1", ); let _handle = shared_runtime .spawn_worker(stats_exporter, true) @@ -412,6 +447,10 @@ mod tests { get_test_metadata(), Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), caps.clone(), + #[cfg(feature = "stats-obfuscation")] + StatsComputationObfuscationConfig::disabled(), + #[cfg(feature = "stats-obfuscation")] + "1", ); let _handle = shared_runtime @@ -451,4 +490,43 @@ mod tests { "Non-empty env should be preserved" ); } + #[cfg(feature = "stats-obfuscation")] + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_send_stats_with_obfuscation_header() { + use arc_swap::ArcSwap; + + let server = MockServer::start_async().await; + + let mock = server + .mock_async(|when, then| { + when.method(POST) + .header("Content-type", "application/msgpack") + .header("datadog-obfuscation-version", "1") + .path("/v0.6/stats") + .body_includes("libdatadog-test"); + then.status(200).body(""); + }) + .await; + + let stats_exporter = StatsExporter::new( + BUCKETS_DURATION, + Arc::new(Mutex::new(get_test_concentrator())), + get_test_metadata(), + Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), + NativeCapabilities::new_client(), + #[cfg(feature = "stats-obfuscation")] + Arc::new(ArcSwap::from_pointee(StatsComputationObfuscationConfig { + enabled: true, + ..Default::default() + })), + #[cfg(feature = "stats-obfuscation")] + "1", + ); + + let send_status = stats_exporter.send(true).await; + send_status.unwrap(); + + mock.assert_async().await; + } }