From ac29552fc3cf946bd04b05181f0016f722d0adb5 Mon Sep 17 00:00:00 2001 From: GordonYuanyc Date: Mon, 11 May 2026 20:52:30 +0000 Subject: [PATCH 1/7] refactor: bump asap_sketchlib pin & migrate to new module layout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit asap_sketchlib reorganized in the refactor/module-restructure branch: - Wire-format-aligned types (CountMinSketch, CountSketch, KllSketch, HydraKllSketch, SetAggregator, DeltaResult, CmsHeapItem, etc.) moved out of `sketches::*` submodules and now live in `wrapper::*`, re-exported at the crate root. - `sketches::delta_set_aggregator::{serialize,deserialize}_msgpack` shims were dropped in favor of the `MessagePackCodec` trait on `DeltaResult`. Changes: - Pin asap_sketchlib to the refactor/module-restructure branch. - Switch all `use asap_sketchlib::sketches::*` imports to the crate-root re-exports (`use asap_sketchlib::CountMinSketch`, etc.). - `delta_set_aggregator_accumulator` uses `DeltaResult::{from_msgpack,to_msgpack}` directly through the `MessagePackCodec` trait instead of the removed module-level shims. No behavior change — the wire format and runtime semantics of the underlying sketches are identical (locked in by sketchlib-go parity goldens in asap_sketchlib's tests/sketches_go_parity_probe.rs). Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 49 +++++-------------- asap-query-engine/Cargo.toml | 2 +- .../src/precompute_engine/worker.rs | 2 +- .../count_min_sketch_accumulator.rs | 2 +- .../count_min_sketch_with_heap_accumulator.rs | 4 +- .../datasketches_kll_accumulator.rs | 2 +- .../delta_set_aggregator_accumulator.rs | 9 ++-- .../hydra_kll_accumulator.rs | 2 +- .../set_aggregator_accumulator.rs | 2 +- .../tests/e2e_precompute_equivalence.rs | 2 +- 10 files changed, 27 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e6f6925..7278e52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -412,7 +412,7 @@ dependencies = [ [[package]] name = "asap_sketchlib" version = "0.1.0" -source = "git+https://github.com/ProjectASAP/asap_sketchlib#81c3436dde44cc587c098d42bf42db77acdb4fa5" +source = "git+https://github.com/ProjectASAP/asap_sketchlib?branch=refactor%2Fmodule-restructure#56494c2d7227d346db92f9103783edf379d3f61e" dependencies = [ "bytes", "prost", @@ -1435,7 +1435,7 @@ dependencies = [ "itertools 0.13.0", "log", "paste", - "petgraph 0.6.5", + "petgraph", ] [[package]] @@ -1618,7 +1618,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -1675,12 +1675,6 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" -[[package]] -name = "fixedbitset" -version = "0.5.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" - [[package]] name = "flatbuffers" version = "24.12.23" @@ -2373,7 +2367,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi 0.5.2", "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -2400,15 +2394,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" -dependencies = [ - "either", -] - [[package]] name = "itoa" version = "1.0.18" @@ -2427,7 +2412,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde_core", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -3152,17 +3137,7 @@ version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ - "fixedbitset 0.4.2", - "indexmap 2.14.0", -] - -[[package]] -name = "petgraph" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" -dependencies = [ - "fixedbitset 0.5.7", + "fixedbitset", "indexmap 2.14.0", ] @@ -3424,11 +3399,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck 0.5.0", - "itertools 0.14.0", + "itertools 0.10.5", "log", "multimap", "once_cell", - "petgraph 0.7.1", + "petgraph", "prettyplease", "prost", "prost-types", @@ -3444,7 +3419,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.117", @@ -3887,7 +3862,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -4389,7 +4364,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -5105,7 +5080,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index d2886c3..484f5d2 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -59,7 +59,7 @@ figment = { version = "0.10", features = ["yaml"] } arc-swap = "1" csv = "1" elastic_dsl_utilities.workspace = true -asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib" } +asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib", branch = "refactor/module-restructure" } [[bin]] name = "precompute_engine" diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index 02c330b..37fc8f3 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -776,7 +776,7 @@ mod tests { use crate::precompute_operators::datasketches_kll_accumulator::DatasketchesKLLAccumulator; use crate::precompute_operators::multiple_sum_accumulator::MultipleSumAccumulator; use crate::precompute_operators::sum_accumulator::SumAccumulator; - use asap_sketchlib::sketches::kll::KllSketch; + use asap_sketchlib::KllSketch; use asap_types::enums::{AggregationType, WindowType}; fn make_agg_config( diff --git a/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs b/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs index 6840cb5..1531566 100644 --- a/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs @@ -2,7 +2,7 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, }; -use asap_sketchlib::sketches::countminsketch::CountMinSketch; +use asap_sketchlib::CountMinSketch; use serde_json::Value; use std::collections::HashMap; diff --git a/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs b/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs index c96cde9..476c5bc 100644 --- a/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs @@ -2,7 +2,7 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, }; -use asap_sketchlib::sketches::countminsketch_topk::{CmsHeapItem, CountMinSketchWithHeap}; +use asap_sketchlib::{CmsHeapItem, CountMinSketchWithHeap}; use serde_json::Value; use std::collections::HashMap; @@ -17,7 +17,7 @@ pub struct CountMinSketchWithHeapAccumulator { } // Re-export HeapItem so existing code using CountMinSketchWithHeapAccumulator::HeapItem still works. -pub use asap_sketchlib::sketches::countminsketch_topk::CmsHeapItem as HeapItemReexport; +pub use asap_sketchlib::CmsHeapItem as HeapItemReexport; impl CountMinSketchWithHeapAccumulator { pub fn new(row_num: usize, col_num: usize, heap_size: usize) -> Self { diff --git a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs index 33e085d..4f547b4 100644 --- a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs @@ -2,7 +2,7 @@ use crate::data_model::{ AggregateCore, AggregationType, MergeableAccumulator, SerializableToSink, SingleSubpopulationAggregate, }; -use asap_sketchlib::sketches::kll::KllSketch; +use asap_sketchlib::KllSketch; use base64::{engine::general_purpose, Engine as _}; use serde_json::Value; use std::collections::HashMap; diff --git a/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs b/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs index f323426..8261988 100644 --- a/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs @@ -2,7 +2,8 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, }; -use asap_sketchlib::sketches::delta_set_aggregator::{deserialize_msgpack, serialize_msgpack}; +use asap_sketchlib::DeltaResult; +use asap_sketchlib::message_pack_format::MessagePackCodec; use serde_json::Value; use std::collections::{HashMap, HashSet}; @@ -153,7 +154,7 @@ impl DeltaSetAggregatorAccumulator { buffer: &[u8], ) -> Result> { // Delegate to sketch-core canonical DeltaResult msgpack format - let delta = deserialize_msgpack(buffer) + let delta = DeltaResult::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?; let mut added = HashSet::new(); @@ -203,7 +204,9 @@ impl SerializableToSink for DeltaSetAggregatorAccumulator { .iter() .map(|key| key.to_semicolon_str()) .collect(); - serialize_msgpack(&added, &removed).unwrap_or_default() + DeltaResult { added, removed } + .to_msgpack() + .unwrap_or_default() } } diff --git a/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs b/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs index f33012d..4df1d8b 100644 --- a/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs @@ -5,7 +5,7 @@ use crate::{ }, KeyByLabelValues, }; -use asap_sketchlib::sketches::hydra_kll::HydraKllSketch; +use asap_sketchlib::HydraKllSketch; use base64::{engine::general_purpose, Engine as _}; use std::collections::HashMap; diff --git a/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs b/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs index 45b74d5..e176ca9 100644 --- a/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs @@ -2,7 +2,7 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, }; -use asap_sketchlib::sketches::set_aggregator::SetAggregator; +use asap_sketchlib::SetAggregator; use serde_json::Value; use std::collections::{HashMap, HashSet}; diff --git a/asap-query-engine/tests/e2e_precompute_equivalence.rs b/asap-query-engine/tests/e2e_precompute_equivalence.rs index b8c6953..888ef5b 100644 --- a/asap-query-engine/tests/e2e_precompute_equivalence.rs +++ b/asap-query-engine/tests/e2e_precompute_equivalence.rs @@ -7,7 +7,7 @@ //! 3. Advances the watermark past the window boundary to close it //! 4. Drains captured outputs and verifies equivalence with wire-format accumulators -use asap_sketchlib::sketches::kll::KllSketch; +use asap_sketchlib::KllSketch; use asap_types::aggregation_config::AggregationConfig; use asap_types::enums::{AggregationType, WindowType}; use flate2::{write::GzEncoder, Compression}; From e04a931308f9a1b912bca724f4e93401289c320e Mon Sep 17 00:00:00 2001 From: GordonYuanyc Date: Mon, 11 May 2026 21:01:51 +0000 Subject: [PATCH 2/7] fix API change for cargo build --- .../precompute_operators/count_min_sketch_accumulator.rs | 5 +++-- .../count_min_sketch_with_heap_accumulator.rs | 5 +++-- .../precompute_operators/datasketches_kll_accumulator.rs | 5 +++-- .../src/precompute_operators/hydra_kll_accumulator.rs | 7 ++++--- .../src/precompute_operators/set_aggregator_accumulator.rs | 5 +++-- 5 files changed, 16 insertions(+), 11 deletions(-) diff --git a/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs b/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs index 1531566..ccde4c7 100644 --- a/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs @@ -3,6 +3,7 @@ use crate::data_model::{ MultipleSubpopulationAggregate, SerializableToSink, }; use asap_sketchlib::CountMinSketch; +use asap_sketchlib::message_pack_format::MessagePackCodec; use serde_json::Value; use std::collections::HashMap; @@ -64,7 +65,7 @@ impl CountMinSketchAccumulator { buffer: &[u8], ) -> Result> { Ok(Self { - inner: CountMinSketch::deserialize_msgpack(buffer) + inner: CountMinSketch::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?, }) } @@ -166,7 +167,7 @@ impl SerializableToSink for CountMinSketchAccumulator { } fn serialize_to_bytes(&self) -> Vec { - self.inner.serialize_msgpack().unwrap_or_default() + self.inner.to_msgpack().unwrap_or_default() } } diff --git a/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs b/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs index 476c5bc..119bbc3 100644 --- a/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs @@ -2,6 +2,7 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, }; +use asap_sketchlib::message_pack_format::MessagePackCodec; use asap_sketchlib::{CmsHeapItem, CountMinSketchWithHeap}; use serde_json::Value; use std::collections::HashMap; @@ -85,7 +86,7 @@ impl CountMinSketchWithHeapAccumulator { buffer: &[u8], ) -> Result> { Ok(Self { - inner: CountMinSketchWithHeap::deserialize_msgpack(buffer) + inner: CountMinSketchWithHeap::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?, }) } @@ -131,7 +132,7 @@ impl SerializableToSink for CountMinSketchWithHeapAccumulator { } fn serialize_to_bytes(&self) -> Vec { - self.inner.serialize_msgpack().unwrap_or_default() + self.inner.to_msgpack().unwrap_or_default() } } diff --git a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs index 4f547b4..49df517 100644 --- a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs @@ -3,6 +3,7 @@ use crate::data_model::{ SingleSubpopulationAggregate, }; use asap_sketchlib::KllSketch; +use asap_sketchlib::message_pack_format::MessagePackCodec; use base64::{engine::general_purpose, Engine as _}; use serde_json::Value; use std::collections::HashMap; @@ -42,7 +43,7 @@ impl DatasketchesKLLAccumulator { buffer.len() ); Ok(Self { - inner: KllSketch::deserialize_msgpack(buffer) + inner: KllSketch::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?, }) } @@ -113,7 +114,7 @@ impl SerializableToSink for DatasketchesKLLAccumulator { } fn serialize_to_bytes(&self) -> Vec { - self.inner.serialize_msgpack().unwrap_or_default() + self.inner.to_msgpack().unwrap_or_default() } } diff --git a/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs b/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs index 4df1d8b..2c6d440 100644 --- a/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs @@ -6,6 +6,7 @@ use crate::{ KeyByLabelValues, }; use asap_sketchlib::HydraKllSketch; +use asap_sketchlib::message_pack_format::MessagePackCodec; use base64::{engine::general_purpose, Engine as _}; use std::collections::HashMap; @@ -38,7 +39,7 @@ impl HydraKllSketchAccumulator { buffer: &[u8], ) -> Result> { Ok(Self { - inner: HydraKllSketch::deserialize_msgpack(buffer) + inner: HydraKllSketch::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?, }) } @@ -51,13 +52,13 @@ impl HydraKllSketchAccumulator { impl SerializableToSink for HydraKllSketchAccumulator { fn serialize_to_json(&self) -> serde_json::Value { // Mirror Python implementation: {"sketch": base64_encoded_string} - let sketch_bytes = self.inner.serialize_msgpack().unwrap_or_default(); + let sketch_bytes = self.inner.to_msgpack().unwrap_or_default(); let sketch_b64 = general_purpose::STANDARD.encode(&sketch_bytes); serde_json::json!({ "sketch": sketch_b64 }) } fn serialize_to_bytes(&self) -> Vec { - self.inner.serialize_msgpack().unwrap_or_default() + self.inner.to_msgpack().unwrap_or_default() } } diff --git a/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs b/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs index e176ca9..83f5078 100644 --- a/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs @@ -3,6 +3,7 @@ use crate::data_model::{ MultipleSubpopulationAggregate, SerializableToSink, }; use asap_sketchlib::SetAggregator; +use asap_sketchlib::message_pack_format::MessagePackCodec; use serde_json::Value; use std::collections::{HashMap, HashSet}; @@ -92,7 +93,7 @@ impl SetAggregatorAccumulator { pub fn deserialize_from_bytes_arroyo( buffer: &[u8], ) -> Result> { - let sa = SetAggregator::deserialize_msgpack(buffer) + let sa = SetAggregator::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?; let added = sa .values @@ -109,7 +110,7 @@ impl SetAggregatorAccumulator { for key in &self.added { sa.update(&key.to_semicolon_str()); } - sa.serialize_msgpack().unwrap_or_default() + sa.to_msgpack().unwrap_or_default() } } From e0143d11fb28bb4498d10741b9a8afb97763d04c Mon Sep 17 00:00:00 2001 From: GordonYuanyc Date: Mon, 11 May 2026 21:32:24 +0000 Subject: [PATCH 3/7] chore: bump asap_sketchlib pin to module-restructure HEAD Picks up the asap_sketchlib `bfe8f37` cleanup that deletes `mod wrapper` and moves all wire-format-aligned sketch types (`CountMinSketch`, `CountSketch`, `KllSketch`, `HllSketch`, `DdSketch`, `HydraKllSketch`, `CountMinSketchWithHeap`, `SetAggregator`, `DeltaResult`) into `asap_sketchlib::message_pack_format::portable::*`. The crate-root re-exports that this crate imports are preserved unchanged, so no source changes are needed on this side. Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7278e52..23f313c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -412,7 +412,7 @@ dependencies = [ [[package]] name = "asap_sketchlib" version = "0.1.0" -source = "git+https://github.com/ProjectASAP/asap_sketchlib?branch=refactor%2Fmodule-restructure#56494c2d7227d346db92f9103783edf379d3f61e" +source = "git+https://github.com/ProjectASAP/asap_sketchlib?branch=refactor%2Fmodule-restructure#bfe8f372c8aad09c0144672388e2060f1cd52cd7" dependencies = [ "bytes", "prost", @@ -1618,7 +1618,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -2367,7 +2367,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi 0.5.2", "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -2412,7 +2412,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde_core", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -3399,7 +3399,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck 0.5.0", - "itertools 0.10.5", + "itertools 0.13.0", "log", "multimap", "once_cell", @@ -3419,7 +3419,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.13.0", "proc-macro2", "quote", "syn 2.0.117", @@ -3862,7 +3862,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -4364,7 +4364,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] From e6bf029a6ada30caf2f91dedfd09c53c669724fe Mon Sep 17 00:00:00 2001 From: GordonYuanyc Date: Mon, 11 May 2026 21:48:56 +0000 Subject: [PATCH 4/7] refactor: accumulators hold sketches::* directly, not the wire facade CountMinSketchAccumulator and DatasketchesKLLAccumulator previously held the wire-format facade types (`asap_sketchlib::CountMinSketch`, `asap_sketchlib::KllSketch`), each of which is a thin shim around an underlying `sketches::*` runtime sketch. With Go byte parity proven for the underlying types in `asap_sketchlib::tests::sketches_go_parity_probe`, the facade layer is no longer required. - New `precompute_operators::sketchlib_runtime` module exposes thin adapters (`cms_*` / `kll_*`) that translate the accumulator surface (string keys, `Vec>` matrices, Go-compatible MessagePack envelopes) onto `sketches::CountMin, FastPath, DefaultXxHasher>` and `sketches::KLL` directly. - `CountMinSketchAccumulator.inner` becomes `sketches::CountMin<.., FastPath>`; `DatasketchesKLLAccumulator.inner` becomes `sketches::KLL`. JSON / msgpack / merge paths route through the new helpers. - Wire format stays unchanged: the `CountMinSketchWire { sketch, row_num, col_num }` and `KllSketchData { k, sketch_bytes }` envelopes are still emitted, just constructed inline instead of through the facade's `to_msgpack`. - A handful of `inner.k` field accesses change to `inner.k()` (KLL's `k` is private in `sketches::*` and exposed via accessor). - Bump `asap_sketchlib` pin to `aea7d05` for the new `sketches::KLL::k()` accessor. Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 16 +- .../precompute_engine/accumulator_factory.rs | 8 +- .../src/precompute_engine/worker.rs | 2 +- .../count_min_sketch_accumulator.rs | 89 ++++----- .../datasketches_kll_accumulator.rs | 65 +++---- .../src/precompute_operators/mod.rs | 1 + .../precompute_operators/sketchlib_runtime.rs | 181 ++++++++++++++++++ .../tests/e2e_precompute_equivalence.rs | 2 +- 8 files changed, 262 insertions(+), 102 deletions(-) create mode 100644 asap-query-engine/src/precompute_operators/sketchlib_runtime.rs diff --git a/Cargo.lock b/Cargo.lock index 23f313c..540764e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -412,7 +412,7 @@ dependencies = [ [[package]] name = "asap_sketchlib" version = "0.1.0" -source = "git+https://github.com/ProjectASAP/asap_sketchlib?branch=refactor%2Fmodule-restructure#bfe8f372c8aad09c0144672388e2060f1cd52cd7" +source = "git+https://github.com/ProjectASAP/asap_sketchlib?branch=refactor%2Fmodule-restructure#aea7d055f1906c6ee05f750989b335ed47c98e3f" dependencies = [ "bytes", "prost", @@ -1618,7 +1618,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -2367,7 +2367,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi 0.5.2", "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -2412,7 +2412,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde_core", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -3399,7 +3399,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck 0.5.0", - "itertools 0.13.0", + "itertools 0.10.5", "log", "multimap", "once_cell", @@ -3419,7 +3419,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.117", @@ -3862,7 +3862,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -4364,7 +4364,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] diff --git a/asap-query-engine/src/precompute_engine/accumulator_factory.rs b/asap-query-engine/src/precompute_engine/accumulator_factory.rs index 13c86b0..30fbc7e 100644 --- a/asap-query-engine/src/precompute_engine/accumulator_factory.rs +++ b/asap-query-engine/src/precompute_engine/accumulator_factory.rs @@ -464,7 +464,11 @@ impl AccumulatorUpdater for CmsAccumulatorUpdater { } fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) { - self.acc.inner.update(&key.to_semicolon_str(), value); + crate::precompute_operators::sketchlib_runtime::cms_update( + &mut self.acc.inner, + &key.to_semicolon_str(), + value, + ); } impl_clone_accumulator_methods!(acc); @@ -855,6 +859,6 @@ mod tests { .as_any() .downcast_ref::() .expect("should be KLL"); - assert_eq!(kll.inner.k, 50, "k should be 50 from capital-K param"); + assert_eq!(kll.inner.k(), 50, "k should be 50 from capital-K param"); } } diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index 37fc8f3..0578197 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -1511,7 +1511,7 @@ mod tests { handcrafted_output.end_timestamp, arroyo_output.end_timestamp ); - assert_eq!(handcrafted_acc.inner.k, arroyo_acc.inner.k); + assert_eq!(handcrafted_acc.inner.k(), arroyo_acc.inner.k()); assert_eq!(handcrafted_acc.inner.count(), arroyo_acc.inner.count()); for quantile in [0.0, 0.5, 1.0] { diff --git a/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs b/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs index ccde4c7..98de3d1 100644 --- a/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs @@ -2,35 +2,37 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, }; -use asap_sketchlib::CountMinSketch; -use asap_sketchlib::message_pack_format::MessagePackCodec; +use crate::precompute_operators::sketchlib_runtime::{ + RuntimeCountMin, cms_estimate, cms_from_matrix, cms_from_msgpack, cms_matrix, + cms_merge_refs, cms_new, cms_to_msgpack, cms_update, +}; use serde_json::Value; use std::collections::HashMap; use promql_utilities::query_logics::enums::Statistic; -/// Count-Min Sketch accumulator — wraps asap_sketchlib::sketches::CountMinSketch. -/// Core struct, update/merge/serde logic live in `asap_sketchlib::sketches`. -/// This file retains QE-specific trait impls, legacy deserializers, and JSON output. +/// Count-Min Sketch accumulator — holds `sketches::CountMin<.., FastPath>` +/// directly. Wire format (Go-compatible MessagePack envelope) and +/// matrix-shape conversions live in `sketchlib_runtime`. #[derive(Debug, Clone)] pub struct CountMinSketchAccumulator { - pub inner: CountMinSketch, + pub inner: RuntimeCountMin, } impl CountMinSketchAccumulator { pub fn new(row_num: usize, col_num: usize) -> Self { Self { - inner: CountMinSketch::new(row_num, col_num), + inner: cms_new(row_num, col_num), } } // Marked as _update and kept private; only called internally. fn _update(&mut self, key: &KeyByLabelValues, value: f64) { - self.inner.update(&key.to_semicolon_str(), value); + cms_update(&mut self.inner, &key.to_semicolon_str(), value); } pub fn query_key(&self, key: &KeyByLabelValues) -> f64 { - self.inner.estimate(&key.to_semicolon_str()) + cms_estimate(&self.inner, &key.to_semicolon_str()) } pub fn deserialize_from_json(data: &Value) -> Result> { @@ -57,7 +59,7 @@ impl CountMinSketchAccumulator { } Ok(Self { - inner: CountMinSketch::from_legacy_matrix(sketch, row_num, col_num), + inner: cms_from_matrix(sketch, row_num, col_num), }) } @@ -65,8 +67,7 @@ impl CountMinSketchAccumulator { buffer: &[u8], ) -> Result> { Ok(Self { - inner: CountMinSketch::from_msgpack(buffer) - .map_err(|e| -> Box { e.to_string().into() })?, + inner: cms_from_msgpack(buffer)?, }) } @@ -109,7 +110,7 @@ impl CountMinSketchAccumulator { } Ok(Self { - inner: CountMinSketch::from_legacy_matrix(sketch, row_num, col_num), + inner: cms_from_matrix(sketch, row_num, col_num), }) } @@ -137,20 +138,9 @@ impl CountMinSketchAccumulator { cms_accumulators.push(cms_acc); } - // Check dimensions are consistent - let rows = cms_accumulators[0].inner.rows(); - let cols = cms_accumulators[0].inner.cols(); - for acc in &cms_accumulators { - if acc.inner.rows() != rows || acc.inner.cols() != cols { - return Err( - "Cannot merge CountMinSketch accumulators with different dimensions".into(), - ); - } - } - - let inner_refs: Vec<&CountMinSketch> = + let inner_refs: Vec<&RuntimeCountMin> = cms_accumulators.iter().map(|acc| &acc.inner).collect(); - let merged_inner = CountMinSketch::merge_refs(&inner_refs)?; + let merged_inner = cms_merge_refs(&inner_refs)?; Ok(Self { inner: merged_inner, }) @@ -162,12 +152,12 @@ impl SerializableToSink for CountMinSketchAccumulator { serde_json::json!({ "row_num": self.inner.rows(), "col_num": self.inner.cols(), - "sketch": self.inner.sketch() + "sketch": cms_matrix(&self.inner) }) } fn serialize_to_bytes(&self) -> Vec { - self.inner.to_msgpack().unwrap_or_default() + cms_to_msgpack(&self.inner) } } @@ -201,7 +191,7 @@ impl AggregateCore for CountMinSketchAccumulator { .downcast_ref::() .ok_or("Failed to downcast to CountMinSketchAccumulator")?; - let merged_inner = CountMinSketch::merge_refs(&[&self.inner, &other_cms.inner])?; + let merged_inner = cms_merge_refs(&[&self.inner, &other_cms.inner])?; Ok(Box::new(Self { inner: merged_inner, })) @@ -251,12 +241,12 @@ impl MergeableAccumulator for CountMinSketchAccumulat if accumulators.is_empty() { return Err("No accumulators to merge".into()); } - let mut iter = accumulators.into_iter(); - let mut merged = iter.next().unwrap(); - for acc in iter { - merged.inner.merge(&acc.inner)?; - } - Ok(merged) + let inner_refs: Vec<&RuntimeCountMin> = + accumulators.iter().map(|acc| &acc.inner).collect(); + let merged_inner = cms_merge_refs(&inner_refs)?; + Ok(Self { + inner: merged_inner, + }) } } @@ -269,7 +259,7 @@ mod tests { let cms = CountMinSketchAccumulator::new(4, 1000); assert_eq!(cms.inner.rows(), 4); assert_eq!(cms.inner.cols(), 1000); - let sketch = cms.inner.sketch(); + let sketch = cms_matrix(&cms.inner); assert_eq!(sketch.len(), 4); assert_eq!(sketch[0].len(), 1000); @@ -301,16 +291,15 @@ mod tests { #[test] fn test_count_min_sketch_merge() { - // Build controlled state via from_legacy_matrix (works for both Legacy and Sketchlib backends). let cms1 = CountMinSketchAccumulator { - inner: CountMinSketch::from_legacy_matrix( + inner: cms_from_matrix( vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]], 2, 3, ), }; let cms2 = CountMinSketchAccumulator { - inner: CountMinSketch::from_legacy_matrix( + inner: cms_from_matrix( vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]], 2, 3, @@ -319,7 +308,7 @@ mod tests { let merged = CountMinSketchAccumulator::merge_accumulators(vec![cms1, cms2]).unwrap(); - let merged_sketch = merged.inner.sketch(); + let merged_sketch = cms_matrix(&merged.inner); assert_eq!(merged_sketch[0][0], 8.0); assert_eq!(merged_sketch[0][1], 7.0); assert_eq!(merged_sketch[1][2], 10.0); @@ -336,7 +325,7 @@ mod tests { #[test] fn test_count_min_sketch_serialization() { let cms = CountMinSketchAccumulator { - inner: CountMinSketch::from_legacy_matrix( + inner: cms_from_matrix( vec![vec![0.0, 42.0, 0.0], vec![0.0, 0.0, 100.0]], 2, 3, @@ -349,7 +338,7 @@ mod tests { assert_eq!(deserialized.inner.rows(), 2); assert_eq!(deserialized.inner.cols(), 3); - let deser_sketch = deserialized.inner.sketch(); + let deser_sketch = cms_matrix(&deserialized.inner); assert_eq!(deser_sketch[0][1], 42.0); assert_eq!(deser_sketch[1][2], 100.0); } @@ -379,10 +368,6 @@ mod tests { #[test] fn test_update_and_query_use_same_key_encoding() { - // Regression test: _update and query_key must hash the same key string. - // Previously _update went through serialize_to_json (which returns a JSON - // array, so as_object() is always None) and always stored under key "". - // query_key correctly used key.labels.join(";"), so they never matched. let mut cms = CountMinSketchAccumulator::new(4, 1000); let key = KeyByLabelValues::new_with_labels(vec!["web".to_string(), "prod".to_string()]); cms._update(&key, 5.0); @@ -392,11 +377,8 @@ mod tests { "_update and query_key used different key encodings: got {result}" ); - // Also verify a different key does not interfere. let other_key = KeyByLabelValues::new_with_labels(vec!["api".to_string()]); - // other_key was never updated; its estimate should be lower than key's. let other_result = cms.query_key(&other_key); - // In a sketch this large there should be no collision, so other_result == 0. assert_eq!( other_result, 0.0, "unrelated key returned non-zero: {other_result}" @@ -419,23 +401,22 @@ mod tests { #[test] fn test_count_min_sketch_merge_multiple() { - // Build controlled state via from_legacy_matrix (works for both Legacy and Sketchlib backends). let cms1 = CountMinSketchAccumulator { - inner: CountMinSketch::from_legacy_matrix( + inner: cms_from_matrix( vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]], 2, 3, ), }; let cms2 = CountMinSketchAccumulator { - inner: CountMinSketch::from_legacy_matrix( + inner: cms_from_matrix( vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]], 2, 3, ), }; let cms3 = CountMinSketchAccumulator { - inner: CountMinSketch::from_legacy_matrix( + inner: cms_from_matrix( vec![vec![2.0, 0.0, 0.0], vec![0.0, 0.0, 5.0]], 2, 3, @@ -447,7 +428,7 @@ mod tests { let merged = CountMinSketchAccumulator::merge_multiple(&boxed_accs).unwrap(); - let merged_sketch = merged.inner.sketch(); + let merged_sketch = cms_matrix(&merged.inner); assert_eq!(merged_sketch[0][0], 10.0); assert_eq!(merged_sketch[0][1], 7.0); assert_eq!(merged_sketch[1][2], 15.0); diff --git a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs index 49df517..6bdb1d5 100644 --- a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs @@ -2,9 +2,11 @@ use crate::data_model::{ AggregateCore, AggregationType, MergeableAccumulator, SerializableToSink, SingleSubpopulationAggregate, }; -use asap_sketchlib::KllSketch; -use asap_sketchlib::message_pack_format::MessagePackCodec; -use base64::{engine::general_purpose, Engine as _}; +use crate::precompute_operators::sketchlib_runtime::{ + RuntimeKll, kll_from_msgpack, kll_merge_refs, kll_new, kll_quantile, kll_sketch_bytes, + kll_to_msgpack, kll_update, +}; +use base64::{Engine as _, engine::general_purpose}; use serde_json::Value; use std::collections::HashMap; #[cfg(feature = "extra_debugging")] @@ -13,26 +15,26 @@ use tracing::debug; use promql_utilities::query_logics::enums::Statistic; -/// KLL sketch accumulator — wraps asap_sketchlib::sketches::KllSketch. -/// Core struct, update/merge/serde logic live in `asap_sketchlib::sketches`. -/// This file retains QE-specific trait impls and JSON output. +/// KLL sketch accumulator — holds `sketches::KLL` directly. Wire +/// format (`KllSketchData { k, sketch_bytes }`) and base64-JSON output +/// live in `sketchlib_runtime`. pub struct DatasketchesKLLAccumulator { - pub inner: KllSketch, + pub inner: RuntimeKll, } impl DatasketchesKLLAccumulator { pub fn new(k: u16) -> Self { Self { - inner: KllSketch::new(k), + inner: kll_new(k), } } pub fn update(&mut self, value: f64) { - self.inner.update(value); + kll_update(&mut self.inner, value); } pub fn get_quantile(&self, quantile: f64) -> f64 { - self.inner.quantile(quantile) + kll_quantile(&self.inner, quantile) } pub fn deserialize_from_bytes_arroyo( @@ -43,8 +45,7 @@ impl DatasketchesKLLAccumulator { buffer.len() ); Ok(Self { - inner: KllSketch::from_msgpack(buffer) - .map_err(|e| -> Box { e.to_string().into() })?, + inner: kll_from_msgpack(buffer)?, }) } @@ -72,15 +73,14 @@ impl DatasketchesKLLAccumulator { kll_accumulators.push(kll_acc); } - let inner_refs: Vec<&KllSketch> = kll_accumulators.iter().map(|acc| &acc.inner).collect(); - let merged_inner = KllSketch::merge_refs(&inner_refs)?; + let inner_refs: Vec<&RuntimeKll> = kll_accumulators.iter().map(|acc| &acc.inner).collect(); + let merged_inner = kll_merge_refs(&inner_refs)?; Ok(Self { inner: merged_inner, }) } } -// Manual trait implementations since the C++ library doesn't provide them impl Clone for DatasketchesKLLAccumulator { fn clone(&self) -> Self { Self { @@ -92,29 +92,27 @@ impl Clone for DatasketchesKLLAccumulator { impl std::fmt::Debug for DatasketchesKLLAccumulator { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("DatasketchesKLLAccumulator") - .field("k", &self.inner.k) + .field("k", &self.inner.k()) .field("sketch_n", &self.inner.count()) .finish() } } -// TODO: verify this -// Thread safety: The C++ library is not thread-safe by default, but since we're using it -// in a single-threaded context per accumulator instance and only sharing read-only operations, -// this should be safe. +// Thread safety: each accumulator is used in single-threaded contexts; +// only read-only methods are shared across threads. unsafe impl Send for DatasketchesKLLAccumulator {} unsafe impl Sync for DatasketchesKLLAccumulator {} impl SerializableToSink for DatasketchesKLLAccumulator { fn serialize_to_json(&self) -> Value { // Mirror Python implementation: {"sketch": base64_encoded_string} - let sketch_bytes = self.inner.sketch_bytes(); + let sketch_bytes = kll_sketch_bytes(&self.inner); let sketch_b64 = general_purpose::STANDARD.encode(&sketch_bytes); serde_json::json!({ "sketch": sketch_b64 }) } fn serialize_to_bytes(&self) -> Vec { - self.inner.to_msgpack().unwrap_or_default() + kll_to_msgpack(&self.inner) } } @@ -140,7 +138,7 @@ impl AggregateCore for DatasketchesKLLAccumulator { #[cfg(feature = "extra_debugging")] debug!( "[PERF] DatasketchesKLLAccumulator::merge_with() started - self.k={}, self.n={}", - self.inner.k, + self.inner.k(), self.inner.count() ); @@ -157,7 +155,7 @@ impl AggregateCore for DatasketchesKLLAccumulator { .downcast_ref::() .ok_or("Failed to downcast to DatasketchesKLLAccumulator")?; - let merged_inner = KllSketch::merge_refs(&[&self.inner, &other_kll.inner])?; + let merged_inner = kll_merge_refs(&[&self.inner, &other_kll.inner])?; let merged = Self { inner: merged_inner, }; @@ -233,12 +231,11 @@ impl MergeableAccumulator for DatasketchesKLLAccumul if accumulators.is_empty() { return Err("No accumulators to merge".into()); } - let mut iter = accumulators.into_iter(); - let mut merged = iter.next().unwrap(); - for acc in iter { - merged.inner.merge(&acc.inner)?; - } - Ok(merged) + let inner_refs: Vec<&RuntimeKll> = accumulators.iter().map(|acc| &acc.inner).collect(); + let merged_inner = kll_merge_refs(&inner_refs)?; + Ok(Self { + inner: merged_inner, + }) } } @@ -250,7 +247,7 @@ mod tests { fn test_datasketches_kll_creation() { let kll = DatasketchesKLLAccumulator::new(200); assert!(kll.inner.count() == 0); - assert_eq!(kll.inner.k, 200); + assert_eq!(kll.inner.k(), 200); } #[test] @@ -270,7 +267,6 @@ mod tests { } assert_eq!(kll.get_quantile(0.0), 1.0); assert_eq!(kll.get_quantile(1.0), 10.0); - // Sketchlib KLL is approximate; 0.5 quantile of 1..10 may be 5, 6, or 7. let q50 = kll.get_quantile(0.5); assert!((q50 - 6.0).abs() <= 1.0, "expected median ~6, got {q50}"); } @@ -285,7 +281,6 @@ mod tests { let mut query_kwargs = HashMap::new(); query_kwargs.insert("quantile".to_string(), "0.5".to_string()); let result = kll.query(Statistic::Quantile, Some(&query_kwargs)).unwrap(); - // Sketchlib KLL is approximate; 0.5 quantile of 1..10 may be 5, 6, or 7. assert!( (result - 6.0).abs() <= 1.0, "expected median ~6, got {result}" @@ -323,7 +318,7 @@ mod tests { let deserialized = DatasketchesKLLAccumulator::deserialize_from_bytes_arroyo(&bytes).unwrap(); - assert_eq!(deserialized.inner.k, 200); + assert_eq!(deserialized.inner.k(), 200); assert_eq!(deserialized.inner.count(), 5); assert_eq!(deserialized.get_quantile(0.0), 1.0); assert_eq!(deserialized.get_quantile(1.0), 5.0); @@ -353,7 +348,6 @@ mod tests { let mut query_kwargs = HashMap::new(); query_kwargs.insert("quantile".to_string(), "0.5".to_string()); let result = kll.query(Statistic::Quantile, Some(&query_kwargs)).unwrap(); - // Sketchlib KLL is approximate; 0.5 quantile of 1..10 may be 5, 6, or 7. assert!( (result - 6.0).abs() <= 1.0, "expected median ~6, got {result}" @@ -361,7 +355,6 @@ mod tests { query_kwargs.insert("quantile".to_string(), "0.9".to_string()); let result = kll.query(Statistic::Quantile, Some(&query_kwargs)).unwrap(); - // Sketchlib KLL is approximate; 0.9 quantile of 1..10 may be 9 or 10. assert!( (9.0..=10.0).contains(&result), "expected 0.9 quantile in [9,10], got {result}" diff --git a/asap-query-engine/src/precompute_operators/mod.rs b/asap-query-engine/src/precompute_operators/mod.rs index fbbff1c..01d0608 100644 --- a/asap-query-engine/src/precompute_operators/mod.rs +++ b/asap-query-engine/src/precompute_operators/mod.rs @@ -9,6 +9,7 @@ pub mod multiple_increase_accumulator; pub mod multiple_min_max_accumulator; pub mod multiple_sum_accumulator; pub mod set_aggregator_accumulator; +pub mod sketchlib_runtime; pub mod sum_accumulator; pub use count_min_sketch_accumulator::*; diff --git a/asap-query-engine/src/precompute_operators/sketchlib_runtime.rs b/asap-query-engine/src/precompute_operators/sketchlib_runtime.rs new file mode 100644 index 0000000..25c8f4a --- /dev/null +++ b/asap-query-engine/src/precompute_operators/sketchlib_runtime.rs @@ -0,0 +1,181 @@ +//! Thin runtime adapters over `asap_sketchlib::sketches::*`. +//! +//! ASAPQuery accumulators hold the pure-Rust runtime sketch types +//! directly (`sketches::CountMin, FastPath, DefaultXxHasher>`, +//! `sketches::KLL`) and reach for these helpers to translate the +//! accumulator surface (string keys, `Vec>` matrices, +//! Go-compatible msgpack envelopes) onto those underlying types. +//! +//! Cross-language byte parity for the underlying `sketches::*` paths +//! is locked in by +//! `asap_sketchlib::tests::sketches_go_parity_probe`. + +use asap_sketchlib::message_pack_format::portable::countminsketch::CountMinSketchWire; +use asap_sketchlib::message_pack_format::portable::kll::KllSketchData; +use asap_sketchlib::sketches::countminsketch::CountMin; +use asap_sketchlib::sketches::kll::KLL; +use asap_sketchlib::{DataInput, DefaultXxHasher, FastPath, Vector2D}; + +// ============================================================================= +// CountMinSketch — sketches::CountMin, FastPath, DefaultXxHasher> +// ============================================================================= + +/// Concrete runtime CMS type used by `CountMinSketchAccumulator`. Same +/// dimensions + hasher choice as `asap_sketchlib`'s wire-format +/// `CountMinSketch` facade, so the on-the-wire byte shape is identical. +pub type RuntimeCountMin = CountMin, FastPath, DefaultXxHasher>; + +pub fn cms_new(rows: usize, cols: usize) -> RuntimeCountMin { + CountMin::with_dimensions(rows, cols) +} + +pub fn cms_update(sk: &mut RuntimeCountMin, key: &str, value: f64) { + if value <= 0.0 { + return; + } + sk.insert_many(&DataInput::String(key.to_owned()), value); +} + +pub fn cms_estimate(sk: &RuntimeCountMin, key: &str) -> f64 { + sk.estimate(&DataInput::String(key.to_owned())) +} + +/// Snapshot the storage as `Vec>` (used for JSON output + wire DTO). +pub fn cms_matrix(sk: &RuntimeCountMin) -> Vec> { + let storage = sk.as_storage(); + let rows = storage.rows(); + let cols = storage.cols(); + let mut out = vec![vec![0.0f64; cols]; rows]; + for r in 0..rows { + for c in 0..cols { + if let Some(v) = storage.get(r, c) { + out[r][c] = *v; + } + } + } + out +} + +/// Build a CountMin from an existing matrix (used by JSON / legacy +/// byte-format decoders). +pub fn cms_from_matrix(matrix: Vec>, rows: usize, cols: usize) -> RuntimeCountMin { + let storage = Vector2D::from_fn(rows, cols, |r, c| { + matrix + .get(r) + .and_then(|row| row.get(c)) + .copied() + .unwrap_or(0.0) + }); + CountMin::from_storage(storage) +} + +/// Serialize to the Go-compatible MessagePack envelope. +pub fn cms_to_msgpack(sk: &RuntimeCountMin) -> Vec { + let wire = CountMinSketchWire { + sketch: cms_matrix(sk), + rows: sk.rows(), + cols: sk.cols(), + }; + rmp_serde::to_vec(&wire).unwrap_or_default() +} + +/// Deserialize from the Go-compatible MessagePack envelope. +pub fn cms_from_msgpack(bytes: &[u8]) -> Result> { + let wire: CountMinSketchWire = rmp_serde::from_slice(bytes)?; + Ok(cms_from_matrix(wire.sketch, wire.rows, wire.cols)) +} + +/// Merge a slice of CMS references into a single new sketch. +pub fn cms_merge_refs( + sketches: &[&RuntimeCountMin], +) -> Result> { + let first = *sketches + .first() + .ok_or("cms_merge_refs called with empty input")?; + let rows = first.rows(); + let cols = first.cols(); + for s in sketches { + if s.rows() != rows || s.cols() != cols { + return Err(format!( + "CountMin dimension mismatch in merge: expected {rows}x{cols}, got {}x{}", + s.rows(), + s.cols() + ) + .into()); + } + } + let mut merged = cms_new(rows, cols); + for s in sketches { + merged.merge(s); + } + Ok(merged) +} + +// ============================================================================= +// KllSketch — sketches::KLL +// ============================================================================= + +/// Concrete runtime KLL type used by `DatasketchesKLLAccumulator`. +pub type RuntimeKll = KLL; + +pub fn kll_new(k: u16) -> RuntimeKll { + KLL::init_kll(k as i32) +} + +pub fn kll_update(sk: &mut RuntimeKll, value: f64) { + sk.update(&value); +} + +pub fn kll_quantile(sk: &RuntimeKll, q: f64) -> f64 { + if sk.count() == 0 { + return 0.0; + } + sk.quantile(q) +} + +/// Raw msgpack bytes of the KLL backend (sans the `k`-envelope outer +/// wrapper). Used by JSON output (base64-encoded) and the wire codec. +pub fn kll_sketch_bytes(sk: &RuntimeKll) -> Vec { + sk.serialize_to_bytes().unwrap_or_default() +} + +/// Serialize to the Go-compatible `KllSketchData { k, sketch_bytes }` +/// MessagePack envelope. +pub fn kll_to_msgpack(sk: &RuntimeKll) -> Vec { + let wire = KllSketchData { + k: sk.k() as u16, + sketch_bytes: kll_sketch_bytes(sk), + }; + rmp_serde::to_vec(&wire).unwrap_or_default() +} + +/// Deserialize from the Go-compatible `KllSketchData` envelope. +pub fn kll_from_msgpack(bytes: &[u8]) -> Result> { + let wire: KllSketchData = rmp_serde::from_slice(bytes)?; + Ok(KLL::deserialize_from_bytes(&wire.sketch_bytes)?) +} + +/// Merge a slice of KLL references into a single new sketch. All +/// inputs must share the same `k`. +pub fn kll_merge_refs( + sketches: &[&RuntimeKll], +) -> Result> { + let first = *sketches + .first() + .ok_or("kll_merge_refs called with empty input")?; + let k = first.k(); + for s in sketches { + if s.k() != k { + return Err(format!( + "KLL k mismatch in merge: expected {k}, got {}", + s.k() + ) + .into()); + } + } + let mut merged = kll_new(k as u16); + for s in sketches { + merged.merge(s); + } + Ok(merged) +} diff --git a/asap-query-engine/tests/e2e_precompute_equivalence.rs b/asap-query-engine/tests/e2e_precompute_equivalence.rs index 888ef5b..6a8299b 100644 --- a/asap-query-engine/tests/e2e_precompute_equivalence.rs +++ b/asap-query-engine/tests/e2e_precompute_equivalence.rs @@ -265,7 +265,7 @@ async fn e2e_kll_output_matches_arroyo() { // Sketch contents assert_eq!( - handcrafted_acc.inner.k, arroyo_acc.inner.k, + handcrafted_acc.inner.k(), arroyo_acc.inner.k(), "KLL k mismatch" ); assert_eq!( From a1d48dcd6ab8799d18f021ba57f257fabe2b01e5 Mon Sep 17 00:00:00 2001 From: GordonYuanyc Date: Tue, 12 May 2026 16:31:19 +0000 Subject: [PATCH 5/7] style: apply cargo fmt Co-Authored-By: Claude Opus 4.7 (1M context) --- .../count_min_sketch_accumulator.rs | 43 ++++--------------- .../datasketches_kll_accumulator.rs | 10 ++--- .../delta_set_aggregator_accumulator.rs | 2 +- .../hydra_kll_accumulator.rs | 2 +- .../set_aggregator_accumulator.rs | 2 +- .../precompute_operators/sketchlib_runtime.rs | 6 +-- .../tests/e2e_precompute_equivalence.rs | 3 +- 7 files changed, 19 insertions(+), 49 deletions(-) diff --git a/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs b/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs index 98de3d1..70e5ce5 100644 --- a/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs @@ -3,8 +3,8 @@ use crate::data_model::{ MultipleSubpopulationAggregate, SerializableToSink, }; use crate::precompute_operators::sketchlib_runtime::{ - RuntimeCountMin, cms_estimate, cms_from_matrix, cms_from_msgpack, cms_matrix, - cms_merge_refs, cms_new, cms_to_msgpack, cms_update, + cms_estimate, cms_from_matrix, cms_from_msgpack, cms_matrix, cms_merge_refs, cms_new, + cms_to_msgpack, cms_update, RuntimeCountMin, }; use serde_json::Value; use std::collections::HashMap; @@ -241,8 +241,7 @@ impl MergeableAccumulator for CountMinSketchAccumulat if accumulators.is_empty() { return Err("No accumulators to merge".into()); } - let inner_refs: Vec<&RuntimeCountMin> = - accumulators.iter().map(|acc| &acc.inner).collect(); + let inner_refs: Vec<&RuntimeCountMin> = accumulators.iter().map(|acc| &acc.inner).collect(); let merged_inner = cms_merge_refs(&inner_refs)?; Ok(Self { inner: merged_inner, @@ -292,18 +291,10 @@ mod tests { #[test] fn test_count_min_sketch_merge() { let cms1 = CountMinSketchAccumulator { - inner: cms_from_matrix( - vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]], - 2, - 3, - ), + inner: cms_from_matrix(vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]], 2, 3), }; let cms2 = CountMinSketchAccumulator { - inner: cms_from_matrix( - vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]], - 2, - 3, - ), + inner: cms_from_matrix(vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]], 2, 3), }; let merged = CountMinSketchAccumulator::merge_accumulators(vec![cms1, cms2]).unwrap(); @@ -325,11 +316,7 @@ mod tests { #[test] fn test_count_min_sketch_serialization() { let cms = CountMinSketchAccumulator { - inner: cms_from_matrix( - vec![vec![0.0, 42.0, 0.0], vec![0.0, 0.0, 100.0]], - 2, - 3, - ), + inner: cms_from_matrix(vec![vec![0.0, 42.0, 0.0], vec![0.0, 0.0, 100.0]], 2, 3), }; let bytes = cms.serialize_to_bytes(); @@ -402,25 +389,13 @@ mod tests { #[test] fn test_count_min_sketch_merge_multiple() { let cms1 = CountMinSketchAccumulator { - inner: cms_from_matrix( - vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]], - 2, - 3, - ), + inner: cms_from_matrix(vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]], 2, 3), }; let cms2 = CountMinSketchAccumulator { - inner: cms_from_matrix( - vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]], - 2, - 3, - ), + inner: cms_from_matrix(vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]], 2, 3), }; let cms3 = CountMinSketchAccumulator { - inner: cms_from_matrix( - vec![vec![2.0, 0.0, 0.0], vec![0.0, 0.0, 5.0]], - 2, - 3, - ), + inner: cms_from_matrix(vec![vec![2.0, 0.0, 0.0], vec![0.0, 0.0, 5.0]], 2, 3), }; let boxed_accs: Vec> = diff --git a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs index 6bdb1d5..8ca9cf5 100644 --- a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs @@ -3,10 +3,10 @@ use crate::data_model::{ SingleSubpopulationAggregate, }; use crate::precompute_operators::sketchlib_runtime::{ - RuntimeKll, kll_from_msgpack, kll_merge_refs, kll_new, kll_quantile, kll_sketch_bytes, - kll_to_msgpack, kll_update, + kll_from_msgpack, kll_merge_refs, kll_new, kll_quantile, kll_sketch_bytes, kll_to_msgpack, + kll_update, RuntimeKll, }; -use base64::{Engine as _, engine::general_purpose}; +use base64::{engine::general_purpose, Engine as _}; use serde_json::Value; use std::collections::HashMap; #[cfg(feature = "extra_debugging")] @@ -24,9 +24,7 @@ pub struct DatasketchesKLLAccumulator { impl DatasketchesKLLAccumulator { pub fn new(k: u16) -> Self { - Self { - inner: kll_new(k), - } + Self { inner: kll_new(k) } } pub fn update(&mut self, value: f64) { diff --git a/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs b/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs index 8261988..a8354be 100644 --- a/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs @@ -2,8 +2,8 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, }; -use asap_sketchlib::DeltaResult; use asap_sketchlib::message_pack_format::MessagePackCodec; +use asap_sketchlib::DeltaResult; use serde_json::Value; use std::collections::{HashMap, HashSet}; diff --git a/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs b/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs index 2c6d440..52b2f3d 100644 --- a/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs @@ -5,8 +5,8 @@ use crate::{ }, KeyByLabelValues, }; -use asap_sketchlib::HydraKllSketch; use asap_sketchlib::message_pack_format::MessagePackCodec; +use asap_sketchlib::HydraKllSketch; use base64::{engine::general_purpose, Engine as _}; use std::collections::HashMap; diff --git a/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs b/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs index 83f5078..1f569db 100644 --- a/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs @@ -2,8 +2,8 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, }; -use asap_sketchlib::SetAggregator; use asap_sketchlib::message_pack_format::MessagePackCodec; +use asap_sketchlib::SetAggregator; use serde_json::Value; use std::collections::{HashMap, HashSet}; diff --git a/asap-query-engine/src/precompute_operators/sketchlib_runtime.rs b/asap-query-engine/src/precompute_operators/sketchlib_runtime.rs index 25c8f4a..1c8a8c4 100644 --- a/asap-query-engine/src/precompute_operators/sketchlib_runtime.rs +++ b/asap-query-engine/src/precompute_operators/sketchlib_runtime.rs @@ -166,11 +166,7 @@ pub fn kll_merge_refs( let k = first.k(); for s in sketches { if s.k() != k { - return Err(format!( - "KLL k mismatch in merge: expected {k}, got {}", - s.k() - ) - .into()); + return Err(format!("KLL k mismatch in merge: expected {k}, got {}", s.k()).into()); } } let mut merged = kll_new(k as u16); diff --git a/asap-query-engine/tests/e2e_precompute_equivalence.rs b/asap-query-engine/tests/e2e_precompute_equivalence.rs index 6a8299b..6e26bbc 100644 --- a/asap-query-engine/tests/e2e_precompute_equivalence.rs +++ b/asap-query-engine/tests/e2e_precompute_equivalence.rs @@ -265,7 +265,8 @@ async fn e2e_kll_output_matches_arroyo() { // Sketch contents assert_eq!( - handcrafted_acc.inner.k(), arroyo_acc.inner.k(), + handcrafted_acc.inner.k(), + arroyo_acc.inner.k(), "KLL k mismatch" ); assert_eq!( From 654630f0a5e5b5fde00b31a5f86dc29313a2bbbd Mon Sep 17 00:00:00 2001 From: GordonYuanyc Date: Tue, 12 May 2026 16:40:12 +0000 Subject: [PATCH 6/7] fix(clippy): use iter_mut().enumerate() in cms_matrix Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/precompute_operators/sketchlib_runtime.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/asap-query-engine/src/precompute_operators/sketchlib_runtime.rs b/asap-query-engine/src/precompute_operators/sketchlib_runtime.rs index 1c8a8c4..1f6db6a 100644 --- a/asap-query-engine/src/precompute_operators/sketchlib_runtime.rs +++ b/asap-query-engine/src/precompute_operators/sketchlib_runtime.rs @@ -46,10 +46,10 @@ pub fn cms_matrix(sk: &RuntimeCountMin) -> Vec> { let rows = storage.rows(); let cols = storage.cols(); let mut out = vec![vec![0.0f64; cols]; rows]; - for r in 0..rows { - for c in 0..cols { + for (r, row) in out.iter_mut().enumerate() { + for (c, cell) in row.iter_mut().enumerate() { if let Some(v) = storage.get(r, c) { - out[r][c] = *v; + *cell = *v; } } } From 8652ec2ed4135acc0d2f9c67772156b4ab6960b2 Mon Sep 17 00:00:00 2001 From: GordonYuanyc Date: Tue, 12 May 2026 17:45:36 +0000 Subject: [PATCH 7/7] chore: repin asap_sketchlib to main after module-restructure merge The refactor/module-restructure branch has been merged and deleted on asap_sketchlib. Point at the default branch and lock to current main HEAD so a future cargo fetch doesn't fail on a missing branch ref. Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 16 ++++++++-------- asap-query-engine/Cargo.toml | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 540764e..5b42d28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -412,7 +412,7 @@ dependencies = [ [[package]] name = "asap_sketchlib" version = "0.1.0" -source = "git+https://github.com/ProjectASAP/asap_sketchlib?branch=refactor%2Fmodule-restructure#aea7d055f1906c6ee05f750989b335ed47c98e3f" +source = "git+https://github.com/ProjectASAP/asap_sketchlib#cd1f10d90d315c36306ff3b10748bac33e5fded6" dependencies = [ "bytes", "prost", @@ -1618,7 +1618,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -2367,7 +2367,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi 0.5.2", "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -2412,7 +2412,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde_core", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -3399,7 +3399,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck 0.5.0", - "itertools 0.10.5", + "itertools 0.13.0", "log", "multimap", "once_cell", @@ -3419,7 +3419,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.13.0", "proc-macro2", "quote", "syn 2.0.117", @@ -3862,7 +3862,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -4364,7 +4364,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index 484f5d2..d2886c3 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -59,7 +59,7 @@ figment = { version = "0.10", features = ["yaml"] } arc-swap = "1" csv = "1" elastic_dsl_utilities.workspace = true -asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib", branch = "refactor/module-restructure" } +asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib" } [[bin]] name = "precompute_engine"