Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 7 additions & 32 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,11 @@ impl AccumulatorUpdater for CmsAccumulatorUpdater {
}

fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) {
self.acc.inner.update(&key.to_semicolon_str(), value);
crate::precompute_operators::sketchlib_runtime::cms_update(
&mut self.acc.inner,
&key.to_semicolon_str(),
value,
);
}

impl_clone_accumulator_methods!(acc);
Expand Down Expand Up @@ -855,6 +859,6 @@ mod tests {
.as_any()
.downcast_ref::<crate::precompute_operators::datasketches_kll_accumulator::DatasketchesKLLAccumulator>()
.expect("should be KLL");
assert_eq!(kll.inner.k, 50, "k should be 50 from capital-K param");
assert_eq!(kll.inner.k(), 50, "k should be 50 from capital-K param");
}
}
4 changes: 2 additions & 2 deletions asap-query-engine/src/precompute_engine/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ mod tests {
use crate::precompute_operators::datasketches_kll_accumulator::DatasketchesKLLAccumulator;
use crate::precompute_operators::multiple_sum_accumulator::MultipleSumAccumulator;
use crate::precompute_operators::sum_accumulator::SumAccumulator;
use asap_sketchlib::sketches::kll::KllSketch;
use asap_sketchlib::KllSketch;
use asap_types::enums::{AggregationType, WindowType};

fn make_agg_config(
Expand Down Expand Up @@ -1511,7 +1511,7 @@ mod tests {
handcrafted_output.end_timestamp,
arroyo_output.end_timestamp
);
assert_eq!(handcrafted_acc.inner.k, arroyo_acc.inner.k);
assert_eq!(handcrafted_acc.inner.k(), arroyo_acc.inner.k());
assert_eq!(handcrafted_acc.inner.count(), arroyo_acc.inner.count());

for quantile in [0.0, 0.5, 1.0] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,37 @@ use crate::data_model::{
AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator,
MultipleSubpopulationAggregate, SerializableToSink,
};
use asap_sketchlib::sketches::countminsketch::CountMinSketch;
use crate::precompute_operators::sketchlib_runtime::{
cms_estimate, cms_from_matrix, cms_from_msgpack, cms_matrix, cms_merge_refs, cms_new,
cms_to_msgpack, cms_update, RuntimeCountMin,
};
use serde_json::Value;
use std::collections::HashMap;

use promql_utilities::query_logics::enums::Statistic;

/// Count-Min Sketch accumulator — wraps asap_sketchlib::sketches::CountMinSketch.
/// Core struct, update/merge/serde logic live in `asap_sketchlib::sketches`.
/// This file retains QE-specific trait impls, legacy deserializers, and JSON output.
/// Count-Min Sketch accumulator — holds `sketches::CountMin<.., FastPath>`
/// directly. Wire format (Go-compatible MessagePack envelope) and
/// matrix-shape conversions live in `sketchlib_runtime`.
#[derive(Debug, Clone)]
pub struct CountMinSketchAccumulator {
pub inner: CountMinSketch,
pub inner: RuntimeCountMin,
}

impl CountMinSketchAccumulator {
pub fn new(row_num: usize, col_num: usize) -> Self {
Self {
inner: CountMinSketch::new(row_num, col_num),
inner: cms_new(row_num, col_num),
}
}

// Marked as _update and kept private; only called internally.
fn _update(&mut self, key: &KeyByLabelValues, value: f64) {
self.inner.update(&key.to_semicolon_str(), value);
cms_update(&mut self.inner, &key.to_semicolon_str(), value);
}

pub fn query_key(&self, key: &KeyByLabelValues) -> f64 {
self.inner.estimate(&key.to_semicolon_str())
cms_estimate(&self.inner, &key.to_semicolon_str())
}

pub fn deserialize_from_json(data: &Value) -> Result<Self, Box<dyn std::error::Error>> {
Expand All @@ -56,16 +59,15 @@ impl CountMinSketchAccumulator {
}

Ok(Self {
inner: CountMinSketch::from_legacy_matrix(sketch, row_num, col_num),
inner: cms_from_matrix(sketch, row_num, col_num),
})
}

pub fn deserialize_from_bytes_arroyo(
buffer: &[u8],
) -> Result<Self, Box<dyn std::error::Error>> {
Ok(Self {
inner: CountMinSketch::deserialize_msgpack(buffer)
.map_err(|e| -> Box<dyn std::error::Error> { e.to_string().into() })?,
inner: cms_from_msgpack(buffer)?,
})
}

Expand Down Expand Up @@ -108,7 +110,7 @@ impl CountMinSketchAccumulator {
}

Ok(Self {
inner: CountMinSketch::from_legacy_matrix(sketch, row_num, col_num),
inner: cms_from_matrix(sketch, row_num, col_num),
})
}

Expand Down Expand Up @@ -136,20 +138,9 @@ impl CountMinSketchAccumulator {
cms_accumulators.push(cms_acc);
}

// Check dimensions are consistent
let rows = cms_accumulators[0].inner.rows();
let cols = cms_accumulators[0].inner.cols();
for acc in &cms_accumulators {
if acc.inner.rows() != rows || acc.inner.cols() != cols {
return Err(
"Cannot merge CountMinSketch accumulators with different dimensions".into(),
);
}
}

let inner_refs: Vec<&CountMinSketch> =
let inner_refs: Vec<&RuntimeCountMin> =
cms_accumulators.iter().map(|acc| &acc.inner).collect();
let merged_inner = CountMinSketch::merge_refs(&inner_refs)?;
let merged_inner = cms_merge_refs(&inner_refs)?;
Ok(Self {
inner: merged_inner,
})
Expand All @@ -161,12 +152,12 @@ impl SerializableToSink for CountMinSketchAccumulator {
serde_json::json!({
"row_num": self.inner.rows(),
"col_num": self.inner.cols(),
"sketch": self.inner.sketch()
"sketch": cms_matrix(&self.inner)
})
}

fn serialize_to_bytes(&self) -> Vec<u8> {
self.inner.serialize_msgpack().unwrap_or_default()
cms_to_msgpack(&self.inner)
}
}

Expand Down Expand Up @@ -200,7 +191,7 @@ impl AggregateCore for CountMinSketchAccumulator {
.downcast_ref::<CountMinSketchAccumulator>()
.ok_or("Failed to downcast to CountMinSketchAccumulator")?;

let merged_inner = CountMinSketch::merge_refs(&[&self.inner, &other_cms.inner])?;
let merged_inner = cms_merge_refs(&[&self.inner, &other_cms.inner])?;
Ok(Box::new(Self {
inner: merged_inner,
}))
Expand Down Expand Up @@ -250,12 +241,11 @@ impl MergeableAccumulator<CountMinSketchAccumulator> for CountMinSketchAccumulat
if accumulators.is_empty() {
return Err("No accumulators to merge".into());
}
let mut iter = accumulators.into_iter();
let mut merged = iter.next().unwrap();
for acc in iter {
merged.inner.merge(&acc.inner)?;
}
Ok(merged)
let inner_refs: Vec<&RuntimeCountMin> = accumulators.iter().map(|acc| &acc.inner).collect();
let merged_inner = cms_merge_refs(&inner_refs)?;
Ok(Self {
inner: merged_inner,
})
}
}

Expand All @@ -268,7 +258,7 @@ mod tests {
let cms = CountMinSketchAccumulator::new(4, 1000);
assert_eq!(cms.inner.rows(), 4);
assert_eq!(cms.inner.cols(), 1000);
let sketch = cms.inner.sketch();
let sketch = cms_matrix(&cms.inner);
assert_eq!(sketch.len(), 4);
assert_eq!(sketch[0].len(), 1000);

Expand Down Expand Up @@ -300,25 +290,16 @@ mod tests {

#[test]
fn test_count_min_sketch_merge() {
// Build controlled state via from_legacy_matrix (works for both Legacy and Sketchlib backends).
let cms1 = CountMinSketchAccumulator {
inner: CountMinSketch::from_legacy_matrix(
vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]],
2,
3,
),
inner: cms_from_matrix(vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]], 2, 3),
};
let cms2 = CountMinSketchAccumulator {
inner: CountMinSketch::from_legacy_matrix(
vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]],
2,
3,
),
inner: cms_from_matrix(vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]], 2, 3),
};

let merged = CountMinSketchAccumulator::merge_accumulators(vec![cms1, cms2]).unwrap();

let merged_sketch = merged.inner.sketch();
let merged_sketch = cms_matrix(&merged.inner);
assert_eq!(merged_sketch[0][0], 8.0);
assert_eq!(merged_sketch[0][1], 7.0);
assert_eq!(merged_sketch[1][2], 10.0);
Expand All @@ -335,11 +316,7 @@ mod tests {
#[test]
fn test_count_min_sketch_serialization() {
let cms = CountMinSketchAccumulator {
inner: CountMinSketch::from_legacy_matrix(
vec![vec![0.0, 42.0, 0.0], vec![0.0, 0.0, 100.0]],
2,
3,
),
inner: cms_from_matrix(vec![vec![0.0, 42.0, 0.0], vec![0.0, 0.0, 100.0]], 2, 3),
};

let bytes = cms.serialize_to_bytes();
Expand All @@ -348,7 +325,7 @@ mod tests {

assert_eq!(deserialized.inner.rows(), 2);
assert_eq!(deserialized.inner.cols(), 3);
let deser_sketch = deserialized.inner.sketch();
let deser_sketch = cms_matrix(&deserialized.inner);
assert_eq!(deser_sketch[0][1], 42.0);
assert_eq!(deser_sketch[1][2], 100.0);
}
Expand Down Expand Up @@ -378,10 +355,6 @@ mod tests {

#[test]
fn test_update_and_query_use_same_key_encoding() {
// Regression test: _update and query_key must hash the same key string.
// Previously _update went through serialize_to_json (which returns a JSON
// array, so as_object() is always None) and always stored under key "".
// query_key correctly used key.labels.join(";"), so they never matched.
let mut cms = CountMinSketchAccumulator::new(4, 1000);
let key = KeyByLabelValues::new_with_labels(vec!["web".to_string(), "prod".to_string()]);
cms._update(&key, 5.0);
Expand All @@ -391,11 +364,8 @@ mod tests {
"_update and query_key used different key encodings: got {result}"
);

// Also verify a different key does not interfere.
let other_key = KeyByLabelValues::new_with_labels(vec!["api".to_string()]);
// other_key was never updated; its estimate should be lower than key's.
let other_result = cms.query_key(&other_key);
// In a sketch this large there should be no collision, so other_result == 0.
assert_eq!(
other_result, 0.0,
"unrelated key returned non-zero: {other_result}"
Expand All @@ -418,35 +388,22 @@ mod tests {

#[test]
fn test_count_min_sketch_merge_multiple() {
// Build controlled state via from_legacy_matrix (works for both Legacy and Sketchlib backends).
let cms1 = CountMinSketchAccumulator {
inner: CountMinSketch::from_legacy_matrix(
vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]],
2,
3,
),
inner: cms_from_matrix(vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]], 2, 3),
};
let cms2 = CountMinSketchAccumulator {
inner: CountMinSketch::from_legacy_matrix(
vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]],
2,
3,
),
inner: cms_from_matrix(vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]], 2, 3),
};
let cms3 = CountMinSketchAccumulator {
inner: CountMinSketch::from_legacy_matrix(
vec![vec![2.0, 0.0, 0.0], vec![0.0, 0.0, 5.0]],
2,
3,
),
inner: cms_from_matrix(vec![vec![2.0, 0.0, 0.0], vec![0.0, 0.0, 5.0]], 2, 3),
};

let boxed_accs: Vec<Box<dyn AggregateCore>> =
vec![Box::new(cms1), Box::new(cms2), Box::new(cms3)];

let merged = CountMinSketchAccumulator::merge_multiple(&boxed_accs).unwrap();

let merged_sketch = merged.inner.sketch();
let merged_sketch = cms_matrix(&merged.inner);
assert_eq!(merged_sketch[0][0], 10.0);
assert_eq!(merged_sketch[0][1], 7.0);
assert_eq!(merged_sketch[1][2], 15.0);
Expand Down
Loading
Loading