Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 69 additions & 65 deletions confidence-cloudflare-resolver/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use confidence_resolver::{
assign_logger, flag_logger,
assign_logger::AssignLogger,
flag_logger,
proto::{confidence, google::Struct},
resolve_logger,
resolve_logger::ResolveLogger,
telemetry::{self, TelemetrySnapshot},
FlagToApply, Host, ResolvedValue, ResolverState,
};
Expand All @@ -13,7 +14,6 @@ use bytes::Bytes;
use prost::Message;
use serde_json::from_slice;
use serde_json::json;
use std::cell::RefCell;
use wasm_bindgen::JsCast;

use confidence::flags::resolver::v1::{ApplyFlagsRequest, ApplyFlagsResponse, ResolveFlagsRequest};
Expand All @@ -39,12 +39,10 @@ const CDN_STATE_BYTES: &[u8] = include_bytes!("../../data/resolver_state_current
const ENCRYPTION_KEY_BASE64: &str = include_str!("../../data/encryption_key");

use confidence::flags::resolver::v1::Sdk;
use confidence_resolver::proto::confidence::flags::resolver::v1::WriteFlagLogsRequest;
use std::sync::OnceLock;

thread_local! {
static FLAG_LOG: RefCell<Option<WriteFlagLogsRequest>> = const { RefCell::new(None) };
}
use confidence_resolver::proto::confidence::flags::resolver::v1::{
TelemetryData, WriteFlagLogsRequest,
};
use std::sync::{LazyLock, Mutex, OnceLock};

/// Prometheus exposition format content type (version 0.0.4).
const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";
Expand All @@ -53,6 +51,10 @@ static FLAGS_LOGS_QUEUE: OnceLock<Queue> = OnceLock::new();

static CONFIDENCE_CLIENT_SECRET: OnceLock<String> = OnceLock::new();

static RESOLVE_LOGGER: LazyLock<ResolveLogger<H>> = LazyLock::new(ResolveLogger::new);
static ASSIGN_LOGGER: LazyLock<AssignLogger> = LazyLock::new(AssignLogger::new);
static TELEMETRY_LOG: Mutex<Option<TelemetryData>> = Mutex::new(None);

/// Parsed CDN state request containing both state and account_id
static CDN_STATE_REQUEST: Lazy<SetResolverStateRequest> = Lazy::new(|| {
SetResolverStateRequest::decode(Bytes::from_static(CDN_STATE_BYTES))
Expand All @@ -79,22 +81,18 @@ struct H {}

impl Host for H {
fn log_resolve(
_resolve_id: &str,
resolve_id: &str,
evaluation_context: &Struct,
values: &[ResolvedValue<'_>],
client: &Client,
) {
FLAG_LOG.with(|f| {
if let Some(req) = f.borrow_mut().as_mut() {
let (flag_infos, client_info) = resolve_logger::build_resolve_log(
evaluation_context,
client.client_credential_name.as_str(),
values,
);
req.flag_resolve_info.extend(flag_infos);
req.client_resolve_info.push(client_info);
}
});
RESOLVE_LOGGER.log_resolve(
resolve_id,
evaluation_context,
client.client_credential_name.as_str(),
values,
client,
);
}

fn log_assign(
Expand All @@ -103,17 +101,7 @@ impl Host for H {
client: &Client,
sdk: &Option<Sdk>,
) {
FLAG_LOG.with(|f| {
if let Some(req) = f.borrow_mut().as_mut() {
req.flag_assigned
.push(assign_logger::build_flag_assigned(
resolve_id,
assigned_flags,
client,
sdk,
));
}
});
ASSIGN_LOGGER.log_assigns(resolve_id, assigned_flags, client, sdk);
}
}

Expand All @@ -134,6 +122,47 @@ fn sdk_info() -> Sdk {
}
}

fn accumulate_telemetry(td: TelemetryData) {
if let Ok(mut guard) = TELEMETRY_LOG.lock() {
match guard.as_mut() {
Some(acc) => {
match (&mut acc.resolve_latency, td.resolve_latency) {
(Some(a), Some(d)) => {
a.sum = a.sum.wrapping_add(d.sum);
a.count = a.count.wrapping_add(d.count);
a.buckets.extend(d.buckets);
}
(None, some) => acc.resolve_latency = some,
_ => {}
}
for dr in td.resolve_rate {
if let Some(ar) = acc.resolve_rate.iter_mut().find(|r| r.reason == dr.reason) {
ar.count = ar.count.wrapping_add(dr.count);
} else {
acc.resolve_rate.push(dr);
}
}
if !td.resolver_version.is_empty() {
acc.resolver_version = td.resolver_version;
}
}
None => *guard = Some(td),
}
}
}

fn checkpoint() -> WriteFlagLogsRequest {
let mut req = RESOLVE_LOGGER.checkpoint();
ASSIGN_LOGGER.checkpoint_fill(&mut req);
if let Ok(mut guard) = TELEMETRY_LOG.lock() {
if let Some(mut td) = guard.take() {
td.sdk = Some(sdk_info());
req.telemetry_data = Some(td);
}
}
req
}

#[event(fetch)]
pub async fn main(req: Request, env: Env, ctx: Context) -> Result<Response> {
match env.queue("flag_logs_queue") {
Expand All @@ -150,15 +179,13 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result<Response> {
let allowed_origin_env = env
.var("ALLOWED_ORIGIN")
.map(|var| var.to_string())
.unwrap_or("*".to_string()); // Fallback to "*" if the variable is not set
.unwrap_or("*".to_string());

// Optional env var containing the resolver state ETag for this deployment
let state_etag_env = env
.var("RESOLVER_STATE_ETAG")
.map(|var| var.to_string())
.unwrap_or_default();

// Optional env var containing the confidence-resolver commit used for this deployment
let resolver_version_env = env
.var("DEPLOYER_VERSION")
.map(|var| var.to_string())
Expand All @@ -175,7 +202,6 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result<Response> {
.get_async("/metrics", |req, ctx| {
let allowed_origin = allowed_origin_env.clone();
async move {
// Require client secret — metrics are not public.
if let Some(expected) = CONFIDENCE_CLIENT_SECRET.get() {
let authorized = req.headers().get("Authorization").ok().flatten()
.map(|v| v.strip_prefix("ClientSecret ").unwrap_or("") == expected.as_str())
Expand All @@ -196,7 +222,6 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result<Response> {
Response::ok(body)?.with_headers(headers).with_cors_headers(&allowed_origin)
}
})
// GET endpoint to expose the current deployment state etag and resolver version
.get_async("/v1/state:etag", |_req, _ctx| {
let allowed_origin = allowed_origin_env.clone();
let etag_value = state_etag_env.clone();
Expand All @@ -209,15 +234,12 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result<Response> {
Response::from_json(&body)?.with_cors_headers(&allowed_origin)
}
})
// Router treats ":name" as parameters, which is incompatible without URLs
// so we use "*path" to match the whole path and do the matching in the handler
.post_async("/v1/*path", |mut req, ctx| {
let allowed_origin = allowed_origin_env.clone();
async move {
let path = ctx.param("path").unwrap();
match path.as_str() {
"flags:resolve" => {
FLAG_LOG.with(|f| *f.borrow_mut() = Some(WriteFlagLogsRequest::default()));
let body_bytes: Vec<u8> = req.bytes().await?;
let mut resolver_request: ResolveFlagsRequest =
match from_slice(&body_bytes) {
Expand All @@ -230,15 +252,12 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result<Response> {
.with_cors_headers(&allowed_origin);
}
};
// Default apply to true for Cloudflare resolver
resolver_request.apply = true;
let evaluation_context = resolver_request
.evaluation_context
.clone()
.unwrap_or_default();

// Start timer before resolve. CF Workers freeze timers
// during sync CPU, but scheduler.wait(0) unfreezes them.
let t0 = js_sys::Date::now();

let (reasons, resp) = match state.get_resolver::<H>(
Expand Down Expand Up @@ -307,18 +326,13 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result<Response> {
}
};

let mut td = telemetry::build_request_telemetry(elapsed_us, &reasons);
td.sdk = Some(sdk_info());
FLAG_LOG.with(|f| {
if let Some(req) = f.borrow_mut().as_mut() {
req.telemetry_data = Some(td);
}
});
accumulate_telemetry(
telemetry::build_request_telemetry(elapsed_us, &reasons),
);

resp
}
"flags:apply" => {
FLAG_LOG.with(|f| *f.borrow_mut() = Some(WriteFlagLogsRequest::default()));
let body_bytes: Vec<u8> = req.bytes().await?;
let apply_flag_req: ApplyFlagsRequest = match from_slice(&body_bytes) {
Ok(req) => req,
Expand Down Expand Up @@ -354,14 +368,11 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result<Response> {
.run(req, env)
.await;

// Use ctx.waitUntil to run logging and telemetry after response is returned.
let flag_log = FLAG_LOG.with(|f| f.borrow_mut().take());
ctx.wait_until(async move {
if let Some(req) = flag_log {
if let Ok(json) = serde_json::to_string(&req) {
if let Some(queue) = FLAGS_LOGS_QUEUE.get() {
let _ = queue.send(json).await;
}
let req = checkpoint();
if let Ok(json) = serde_json::to_string(&req) {
if let Some(queue) = FLAGS_LOGS_QUEUE.get() {
let _ = queue.send(json).await;
}
}
});
Expand All @@ -386,7 +397,6 @@ pub async fn consume_flag_logs_queue(

let req = flag_logger::aggregate_batch(logs);

// Accumulate telemetry deltas into KV-backed cumulative snapshot for /metrics.
if let Ok(kv) = env.kv("CONFIDENCE_METRICS_KV") {
update_prometheus_kv(&kv, &req).await;
}
Expand All @@ -397,12 +407,6 @@ pub async fn consume_flag_logs_queue(
Ok(())
}

/// Accumulate telemetry deltas from all isolates into a cumulative
/// `TelemetrySnapshot` stored in KV, then write its Prometheus text
/// representation for the /metrics endpoint.
///
/// Note: concurrent queue consumer invocations can race on KV read-modify-write.
/// Acceptable for metrics — at worst one batch's deltas are lost, not cumulative state.
async fn update_prometheus_kv(kv: &kv::KvStore, req: &WriteFlagLogsRequest) {
let mut cumulative = match kv.get("snapshot").text().await {
Ok(Some(text)) => serde_json::from_str::<TelemetrySnapshot>(&text).unwrap_or_default(),
Expand Down