From 0274b1826246280a882f215b9edd23f7dcb96195 Mon Sep 17 00:00:00 2001 From: vahid torkaman <692343+vahidlazio@users.noreply.github.com> Date: Tue, 19 May 2026 12:09:57 +0200 Subject: [PATCH 1/2] fix(cloudflare): protect FLAG_LOG from async request interleaving MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CF Workers handle concurrent async requests on a single thread, interleaving at await points. The thread_local FLAG_LOG was set before `req.bytes().await` and read after `scheduler.wait(0).await`, so a concurrent request could overwrite another's telemetry data. Fix: set FLAG_LOG immediately before the synchronous resolve_flags call and take it into a local variable immediately after. The entire set→resolve→take window has no await points, so no interleaving. The scheduler.wait and telemetry code then operate on the local. Data is put back into FLAG_LOG only at the end with no await before handler return. Co-Authored-By: Claude Opus 4.6 (1M context) --- confidence-cloudflare-resolver/src/lib.rs | 34 +++++++++++++++-------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/confidence-cloudflare-resolver/src/lib.rs b/confidence-cloudflare-resolver/src/lib.rs index 05671b15..28832532 100644 --- a/confidence-cloudflare-resolver/src/lib.rs +++ b/confidence-cloudflare-resolver/src/lib.rs @@ -217,7 +217,6 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { let path = ctx.param("path").unwrap(); match path.as_str() { "flags:resolve" => { - FLAG_LOG.with(|f| *f.borrow_mut() = Some(WriteFlagLogsRequest::default())); let body_bytes: Vec = req.bytes().await?; let mut resolver_request: ResolveFlagsRequest = match from_slice(&body_bytes) { @@ -230,17 +229,19 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { .with_cors_headers(&allowed_origin); } }; - // Default apply to true for Cloudflare resolver resolver_request.apply = true; let evaluation_context = resolver_request 
.evaluation_context .clone() .unwrap_or_default(); - // Start timer before resolve. CF Workers freeze timers - // during sync CPU, but scheduler.wait(0) unfreezes them. let t0 = js_sys::Date::now(); + // FLAG_LOG is set right before the synchronous resolve + // and taken right after — no await points in between, + // so concurrent async requests cannot interleave. + FLAG_LOG.with(|f| *f.borrow_mut() = Some(WriteFlagLogsRequest::default())); + let (reasons, resp) = match state.get_resolver::( &resolver_request.client_secret, evaluation_context, @@ -286,6 +287,12 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { } }; + // Take into a local immediately — everything above + // is synchronous, so no other request could interleave. + let mut request_log = FLAG_LOG.with(|f| f.borrow_mut().take()); + + // scheduler.wait(0) yields to the runtime; safe because + // request_log is a local, not in the thread_local. let elapsed_us = { let scheduler = js_sys::Reflect::get( &js_sys::global(), &wasm_bindgen::JsValue::from_str("scheduler") @@ -307,18 +314,19 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { } }; - let mut td = telemetry::build_request_telemetry(elapsed_us, &reasons); - td.sdk = Some(sdk_info()); - FLAG_LOG.with(|f| { - if let Some(req) = f.borrow_mut().as_mut() { - req.telemetry_data = Some(td); - } - }); + if let Some(ref mut req) = request_log { + let mut td = telemetry::build_request_telemetry(elapsed_us, &reasons); + td.sdk = Some(sdk_info()); + req.telemetry_data = Some(td); + } + + // Put back for wait_until — no await between here + // and handler return, so no interleaving possible. 
+ FLAG_LOG.with(|f| *f.borrow_mut() = request_log); resp } "flags:apply" => { - FLAG_LOG.with(|f| *f.borrow_mut() = Some(WriteFlagLogsRequest::default())); let body_bytes: Vec = req.bytes().await?; let apply_flag_req: ApplyFlagsRequest = match from_slice(&body_bytes) { Ok(req) => req, @@ -331,6 +339,8 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { } }; + FLAG_LOG.with(|f| *f.borrow_mut() = Some(WriteFlagLogsRequest::default())); + match state.get_resolver::( &apply_flag_req.client_secret, Struct::default(), From c5d119b1acb572bd576511874a2c8304e92cc474 Mon Sep 17 00:00:00 2001 From: vahid torkaman <692343+vahidlazio@users.noreply.github.com> Date: Tue, 19 May 2026 12:15:07 +0200 Subject: [PATCH 2/2] fix(cloudflare): restore batched checkpoint pattern for telemetry Restores the old atomic ResolveLogger/AssignLogger statics that safely accumulate data across concurrent async requests, replacing the thread_local RefCell that was not safe across await points. Key changes: - Restore static RESOLVE_LOGGER (ArcSwap) and ASSIGN_LOGGER (SegQueue) - Add static TELEMETRY_LOG (Mutex) to accumulate per-request latency and resolve-rate deltas across requests - Restore checkpoint() that atomically drains all three accumulators into a single WriteFlagLogsRequest per queue message - Keep telemetry collection: timer, scheduler.wait(0), latency histogram - Keep /metrics endpoint and KV-backed Prometheus exposition - Remove thread_local FLAG_LOG and RefCell This is both more correct (concurrent async requests that interleave at await points can no longer overwrite each other's log data) and more efficient (one batched queue message per checkpoint vs one per request). 
Co-Authored-By: Claude Opus 4.6 (1M context) --- confidence-cloudflare-resolver/src/lib.rs | 144 +++++++++++----------- 1 file changed, 69 insertions(+), 75 deletions(-) diff --git a/confidence-cloudflare-resolver/src/lib.rs b/confidence-cloudflare-resolver/src/lib.rs index 28832532..b2005059 100644 --- a/confidence-cloudflare-resolver/src/lib.rs +++ b/confidence-cloudflare-resolver/src/lib.rs @@ -1,7 +1,8 @@ use confidence_resolver::{ - assign_logger, flag_logger, + assign_logger::AssignLogger, + flag_logger, proto::{confidence, google::Struct}, - resolve_logger, + resolve_logger::ResolveLogger, telemetry::{self, TelemetrySnapshot}, FlagToApply, Host, ResolvedValue, ResolverState, }; @@ -13,7 +14,6 @@ use bytes::Bytes; use prost::Message; use serde_json::from_slice; use serde_json::json; -use std::cell::RefCell; use wasm_bindgen::JsCast; use confidence::flags::resolver::v1::{ApplyFlagsRequest, ApplyFlagsResponse, ResolveFlagsRequest}; @@ -39,12 +39,10 @@ const CDN_STATE_BYTES: &[u8] = include_bytes!("../../data/resolver_state_current const ENCRYPTION_KEY_BASE64: &str = include_str!("../../data/encryption_key"); use confidence::flags::resolver::v1::Sdk; -use confidence_resolver::proto::confidence::flags::resolver::v1::WriteFlagLogsRequest; -use std::sync::OnceLock; - -thread_local! { - static FLAG_LOG: RefCell> = const { RefCell::new(None) }; -} +use confidence_resolver::proto::confidence::flags::resolver::v1::{ + TelemetryData, WriteFlagLogsRequest, +}; +use std::sync::{LazyLock, Mutex, OnceLock}; /// Prometheus exposition format content type (version 0.0.4). 
const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8"; @@ -53,6 +51,10 @@ static FLAGS_LOGS_QUEUE: OnceLock = OnceLock::new(); static CONFIDENCE_CLIENT_SECRET: OnceLock = OnceLock::new(); +static RESOLVE_LOGGER: LazyLock> = LazyLock::new(ResolveLogger::new); +static ASSIGN_LOGGER: LazyLock = LazyLock::new(AssignLogger::new); +static TELEMETRY_LOG: Mutex> = Mutex::new(None); + /// Parsed CDN state request containing both state and account_id static CDN_STATE_REQUEST: Lazy = Lazy::new(|| { SetResolverStateRequest::decode(Bytes::from_static(CDN_STATE_BYTES)) @@ -79,22 +81,18 @@ struct H {} impl Host for H { fn log_resolve( - _resolve_id: &str, + resolve_id: &str, evaluation_context: &Struct, values: &[ResolvedValue<'_>], client: &Client, ) { - FLAG_LOG.with(|f| { - if let Some(req) = f.borrow_mut().as_mut() { - let (flag_infos, client_info) = resolve_logger::build_resolve_log( - evaluation_context, - client.client_credential_name.as_str(), - values, - ); - req.flag_resolve_info.extend(flag_infos); - req.client_resolve_info.push(client_info); - } - }); + RESOLVE_LOGGER.log_resolve( + resolve_id, + evaluation_context, + client.client_credential_name.as_str(), + values, + client, + ); } fn log_assign( @@ -103,17 +101,7 @@ impl Host for H { client: &Client, sdk: &Option, ) { - FLAG_LOG.with(|f| { - if let Some(req) = f.borrow_mut().as_mut() { - req.flag_assigned - .push(assign_logger::build_flag_assigned( - resolve_id, - assigned_flags, - client, - sdk, - )); - } - }); + ASSIGN_LOGGER.log_assigns(resolve_id, assigned_flags, client, sdk); } } @@ -134,6 +122,47 @@ fn sdk_info() -> Sdk { } } +fn accumulate_telemetry(td: TelemetryData) { + if let Ok(mut guard) = TELEMETRY_LOG.lock() { + match guard.as_mut() { + Some(acc) => { + match (&mut acc.resolve_latency, td.resolve_latency) { + (Some(a), Some(d)) => { + a.sum = a.sum.wrapping_add(d.sum); + a.count = a.count.wrapping_add(d.count); + a.buckets.extend(d.buckets); + } + (None, some) => 
acc.resolve_latency = some, + _ => {} + } + for dr in td.resolve_rate { + if let Some(ar) = acc.resolve_rate.iter_mut().find(|r| r.reason == dr.reason) { + ar.count = ar.count.wrapping_add(dr.count); + } else { + acc.resolve_rate.push(dr); + } + } + if !td.resolver_version.is_empty() { + acc.resolver_version = td.resolver_version; + } + } + None => *guard = Some(td), + } + } +} + +fn checkpoint() -> WriteFlagLogsRequest { + let mut req = RESOLVE_LOGGER.checkpoint(); + ASSIGN_LOGGER.checkpoint_fill(&mut req); + if let Ok(mut guard) = TELEMETRY_LOG.lock() { + if let Some(mut td) = guard.take() { + td.sdk = Some(sdk_info()); + req.telemetry_data = Some(td); + } + } + req +} + #[event(fetch)] pub async fn main(req: Request, env: Env, ctx: Context) -> Result { match env.queue("flag_logs_queue") { @@ -150,15 +179,13 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { let allowed_origin_env = env .var("ALLOWED_ORIGIN") .map(|var| var.to_string()) - .unwrap_or("*".to_string()); // Fallback to "*" if the variable is not set + .unwrap_or("*".to_string()); - // Optional env var containing the resolver state ETag for this deployment let state_etag_env = env .var("RESOLVER_STATE_ETAG") .map(|var| var.to_string()) .unwrap_or_default(); - // Optional env var containing the confidence-resolver commit used for this deployment let resolver_version_env = env .var("DEPLOYER_VERSION") .map(|var| var.to_string()) @@ -175,7 +202,6 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { .get_async("/metrics", |req, ctx| { let allowed_origin = allowed_origin_env.clone(); async move { - // Require client secret — metrics are not public. 
if let Some(expected) = CONFIDENCE_CLIENT_SECRET.get() { let authorized = req.headers().get("Authorization").ok().flatten() .map(|v| v.strip_prefix("ClientSecret ").unwrap_or("") == expected.as_str()) @@ -196,7 +222,6 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { Response::ok(body)?.with_headers(headers).with_cors_headers(&allowed_origin) } }) - // GET endpoint to expose the current deployment state etag and resolver version .get_async("/v1/state:etag", |_req, _ctx| { let allowed_origin = allowed_origin_env.clone(); let etag_value = state_etag_env.clone(); @@ -209,8 +234,6 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { Response::from_json(&body)?.with_cors_headers(&allowed_origin) } }) - // Router treats ":name" as parameters, which is incompatible without URLs - // so we use "*path" to match the whole path and do the matching in the handler .post_async("/v1/*path", |mut req, ctx| { let allowed_origin = allowed_origin_env.clone(); async move { @@ -237,11 +260,6 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { let t0 = js_sys::Date::now(); - // FLAG_LOG is set right before the synchronous resolve - // and taken right after — no await points in between, - // so concurrent async requests cannot interleave. - FLAG_LOG.with(|f| *f.borrow_mut() = Some(WriteFlagLogsRequest::default())); - let (reasons, resp) = match state.get_resolver::( &resolver_request.client_secret, evaluation_context, @@ -287,12 +305,6 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { } }; - // Take into a local immediately — everything above - // is synchronous, so no other request could interleave. - let mut request_log = FLAG_LOG.with(|f| f.borrow_mut().take()); - - // scheduler.wait(0) yields to the runtime; safe because - // request_log is a local, not in the thread_local. 
let elapsed_us = { let scheduler = js_sys::Reflect::get( &js_sys::global(), &wasm_bindgen::JsValue::from_str("scheduler") @@ -314,15 +326,9 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { } }; - if let Some(ref mut req) = request_log { - let mut td = telemetry::build_request_telemetry(elapsed_us, &reasons); - td.sdk = Some(sdk_info()); - req.telemetry_data = Some(td); - } - - // Put back for wait_until — no await between here - // and handler return, so no interleaving possible. - FLAG_LOG.with(|f| *f.borrow_mut() = request_log); + accumulate_telemetry( + telemetry::build_request_telemetry(elapsed_us, &reasons), + ); resp } @@ -339,8 +345,6 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { } }; - FLAG_LOG.with(|f| *f.borrow_mut() = Some(WriteFlagLogsRequest::default())); - match state.get_resolver::( &apply_flag_req.client_secret, Struct::default(), @@ -364,14 +368,11 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { .run(req, env) .await; - // Use ctx.waitUntil to run logging and telemetry after response is returned. - let flag_log = FLAG_LOG.with(|f| f.borrow_mut().take()); ctx.wait_until(async move { - if let Some(req) = flag_log { - if let Ok(json) = serde_json::to_string(&req) { - if let Some(queue) = FLAGS_LOGS_QUEUE.get() { - let _ = queue.send(json).await; - } + let req = checkpoint(); + if let Ok(json) = serde_json::to_string(&req) { + if let Some(queue) = FLAGS_LOGS_QUEUE.get() { + let _ = queue.send(json).await; } } }); @@ -396,7 +397,6 @@ pub async fn consume_flag_logs_queue( let req = flag_logger::aggregate_batch(logs); - // Accumulate telemetry deltas into KV-backed cumulative snapshot for /metrics. 
if let Ok(kv) = env.kv("CONFIDENCE_METRICS_KV") { update_prometheus_kv(&kv, &req).await; } @@ -407,12 +407,6 @@ pub async fn consume_flag_logs_queue( Ok(()) } -/// Accumulate telemetry deltas from all isolates into a cumulative -/// `TelemetrySnapshot` stored in KV, then write its Prometheus text -/// representation for the /metrics endpoint. -/// -/// Note: concurrent queue consumer invocations can race on KV read-modify-write. -/// Acceptable for metrics — at worst one batch's deltas are lost, not cumulative state. async fn update_prometheus_kv(kv: &kv::KvStore, req: &WriteFlagLogsRequest) { let mut cumulative = match kv.get("snapshot").text().await { Ok(Some(text)) => serde_json::from_str::(&text).unwrap_or_default(),