diff --git a/confidence-cloudflare-resolver/src/lib.rs b/confidence-cloudflare-resolver/src/lib.rs index 05671b15..b2005059 100644 --- a/confidence-cloudflare-resolver/src/lib.rs +++ b/confidence-cloudflare-resolver/src/lib.rs @@ -1,7 +1,8 @@ use confidence_resolver::{ - assign_logger, flag_logger, + assign_logger::AssignLogger, + flag_logger, proto::{confidence, google::Struct}, - resolve_logger, + resolve_logger::ResolveLogger, telemetry::{self, TelemetrySnapshot}, FlagToApply, Host, ResolvedValue, ResolverState, }; @@ -13,7 +14,6 @@ use bytes::Bytes; use prost::Message; use serde_json::from_slice; use serde_json::json; -use std::cell::RefCell; use wasm_bindgen::JsCast; use confidence::flags::resolver::v1::{ApplyFlagsRequest, ApplyFlagsResponse, ResolveFlagsRequest}; @@ -39,12 +39,10 @@ const CDN_STATE_BYTES: &[u8] = include_bytes!("../../data/resolver_state_current const ENCRYPTION_KEY_BASE64: &str = include_str!("../../data/encryption_key"); use confidence::flags::resolver::v1::Sdk; -use confidence_resolver::proto::confidence::flags::resolver::v1::WriteFlagLogsRequest; -use std::sync::OnceLock; - -thread_local! { - static FLAG_LOG: RefCell> = const { RefCell::new(None) }; -} +use confidence_resolver::proto::confidence::flags::resolver::v1::{ + TelemetryData, WriteFlagLogsRequest, +}; +use std::sync::{LazyLock, Mutex, OnceLock}; /// Prometheus exposition format content type (version 0.0.4). const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8"; @@ -53,6 +51,10 @@ static FLAGS_LOGS_QUEUE: OnceLock = OnceLock::new(); static CONFIDENCE_CLIENT_SECRET: OnceLock = OnceLock::new(); +static RESOLVE_LOGGER: LazyLock> = LazyLock::new(ResolveLogger::new); +static ASSIGN_LOGGER: LazyLock = LazyLock::new(AssignLogger::new); +static TELEMETRY_LOG: Mutex> = Mutex::new(None); + /// Parsed CDN state request containing both state and account_id static CDN_STATE_REQUEST: Lazy = Lazy::new(|| { SetResolverStateRequest::decode(Bytes::from_static(CDN_STATE_BYTES)) @@ -79,22 +81,18 @@ struct H {} impl Host for H { fn log_resolve( - _resolve_id: &str, + resolve_id: &str, evaluation_context: &Struct, values: &[ResolvedValue<'_>], client: &Client, ) { - FLAG_LOG.with(|f| { - if let Some(req) = f.borrow_mut().as_mut() { - let (flag_infos, client_info) = resolve_logger::build_resolve_log( - evaluation_context, - client.client_credential_name.as_str(), - values, - ); - req.flag_resolve_info.extend(flag_infos); - req.client_resolve_info.push(client_info); - } - }); + RESOLVE_LOGGER.log_resolve( + resolve_id, + evaluation_context, + client.client_credential_name.as_str(), + values, + client, + ); } fn log_assign( @@ -103,17 +101,7 @@ impl Host for H { client: &Client, sdk: &Option, ) { - FLAG_LOG.with(|f| { - if let Some(req) = f.borrow_mut().as_mut() { - req.flag_assigned - .push(assign_logger::build_flag_assigned( - resolve_id, - assigned_flags, - client, - sdk, - )); - } - }); + ASSIGN_LOGGER.log_assigns(resolve_id, assigned_flags, client, sdk); } } @@ -134,6 +122,47 @@ fn sdk_info() -> Sdk { } } +fn accumulate_telemetry(td: TelemetryData) { + if let Ok(mut guard) = TELEMETRY_LOG.lock() { + match guard.as_mut() { + Some(acc) => { + match (&mut acc.resolve_latency, td.resolve_latency) { + (Some(a), Some(d)) => { + a.sum = a.sum.wrapping_add(d.sum); + a.count = a.count.wrapping_add(d.count); + a.buckets.extend(d.buckets); + } + (None, some) => acc.resolve_latency = some, + _ => {} + } + for dr in td.resolve_rate { + if let Some(ar) = acc.resolve_rate.iter_mut().find(|r| r.reason == dr.reason) { + ar.count = ar.count.wrapping_add(dr.count); + } else { + acc.resolve_rate.push(dr); + } + } + if !td.resolver_version.is_empty() { + acc.resolver_version = td.resolver_version; + } + } + None => *guard = Some(td), + } + } +} + +fn checkpoint() -> WriteFlagLogsRequest { + let mut req = RESOLVE_LOGGER.checkpoint(); + ASSIGN_LOGGER.checkpoint_fill(&mut req); + if let Ok(mut guard) = TELEMETRY_LOG.lock() { + if let Some(mut td) = guard.take() { + td.sdk = Some(sdk_info()); + req.telemetry_data = Some(td); + } + } + req +} + #[event(fetch)] pub async fn main(req: Request, env: Env, ctx: Context) -> Result { match env.queue("flag_logs_queue") { @@ -150,15 +179,13 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { let allowed_origin_env = env .var("ALLOWED_ORIGIN") .map(|var| var.to_string()) - .unwrap_or("*".to_string()); // Fallback to "*" if the variable is not set + .unwrap_or("*".to_string()); - // Optional env var containing the resolver state ETag for this deployment let state_etag_env = env .var("RESOLVER_STATE_ETAG") .map(|var| var.to_string()) .unwrap_or_default(); - // Optional env var containing the confidence-resolver commit used for this deployment let resolver_version_env = env .var("DEPLOYER_VERSION") .map(|var| var.to_string()) @@ -175,7 +202,6 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { .get_async("/metrics", |req, ctx| { let allowed_origin = allowed_origin_env.clone(); async move { - // Require client secret — metrics are not public. if let Some(expected) = CONFIDENCE_CLIENT_SECRET.get() { let authorized = req.headers().get("Authorization").ok().flatten() .map(|v| v.strip_prefix("ClientSecret ").unwrap_or("") == expected.as_str()) @@ -196,7 +222,6 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { Response::ok(body)?.with_headers(headers).with_cors_headers(&allowed_origin) } }) - // GET endpoint to expose the current deployment state etag and resolver version .get_async("/v1/state:etag", |_req, _ctx| { let allowed_origin = allowed_origin_env.clone(); let etag_value = state_etag_env.clone(); @@ -209,15 +234,12 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { Response::from_json(&body)?.with_cors_headers(&allowed_origin) } }) - // Router treats ":name" as parameters, which is incompatible without URLs - // so we use "*path" to match the whole path and do the matching in the handler .post_async("/v1/*path", |mut req, ctx| { let allowed_origin = allowed_origin_env.clone(); async move { let path = ctx.param("path").unwrap(); match path.as_str() { "flags:resolve" => { - FLAG_LOG.with(|f| *f.borrow_mut() = Some(WriteFlagLogsRequest::default())); let body_bytes: Vec = req.bytes().await?; let mut resolver_request: ResolveFlagsRequest = match from_slice(&body_bytes) { @@ -230,15 +252,12 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { .with_cors_headers(&allowed_origin); } }; - // Default apply to true for Cloudflare resolver resolver_request.apply = true; let evaluation_context = resolver_request .evaluation_context .clone() .unwrap_or_default(); - // Start timer before resolve. CF Workers freeze timers - // during sync CPU, but scheduler.wait(0) unfreezes them. let t0 = js_sys::Date::now(); let (reasons, resp) = match state.get_resolver::( @@ -307,18 +326,13 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { } }; - let mut td = telemetry::build_request_telemetry(elapsed_us, &reasons); - td.sdk = Some(sdk_info()); - FLAG_LOG.with(|f| { - if let Some(req) = f.borrow_mut().as_mut() { - req.telemetry_data = Some(td); - } - }); + accumulate_telemetry( + telemetry::build_request_telemetry(elapsed_us, &reasons), + ); resp } "flags:apply" => { - FLAG_LOG.with(|f| *f.borrow_mut() = Some(WriteFlagLogsRequest::default())); let body_bytes: Vec = req.bytes().await?; let apply_flag_req: ApplyFlagsRequest = match from_slice(&body_bytes) { Ok(req) => req, @@ -354,14 +368,11 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { .run(req, env) .await; - // Use ctx.waitUntil to run logging and telemetry after response is returned. - let flag_log = FLAG_LOG.with(|f| f.borrow_mut().take()); ctx.wait_until(async move { - if let Some(req) = flag_log { - if let Ok(json) = serde_json::to_string(&req) { - if let Some(queue) = FLAGS_LOGS_QUEUE.get() { - let _ = queue.send(json).await; - } + let req = checkpoint(); + if let Ok(json) = serde_json::to_string(&req) { + if let Some(queue) = FLAGS_LOGS_QUEUE.get() { + let _ = queue.send(json).await; } } }); @@ -386,7 +397,6 @@ pub async fn consume_flag_logs_queue( let req = flag_logger::aggregate_batch(logs); - // Accumulate telemetry deltas into KV-backed cumulative snapshot for /metrics. if let Ok(kv) = env.kv("CONFIDENCE_METRICS_KV") { update_prometheus_kv(&kv, &req).await; } @@ -397,12 +407,6 @@ pub async fn consume_flag_logs_queue( Ok(()) } -/// Accumulate telemetry deltas from all isolates into a cumulative -/// `TelemetrySnapshot` stored in KV, then write its Prometheus text -/// representation for the /metrics endpoint. -/// -/// Note: concurrent queue consumer invocations can race on KV read-modify-write. -/// Acceptable for metrics — at worst one batch's deltas are lost, not cumulative state. async fn update_prometheus_kv(kv: &kv::KvStore, req: &WriteFlagLogsRequest) { let mut cumulative = match kv.get("snapshot").text().await { Ok(Some(text)) => serde_json::from_str::(&text).unwrap_or_default(),