From 258f207f47b738ad2ebf9244de7464eec3c137f7 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Fri, 10 Apr 2026 14:56:47 -0400 Subject: [PATCH] refactor: accumulator factory in precompute engine --- .../benches/simple_store_bench.rs | 2 +- .../physical/summary_merge_multiple_exec.rs | 8 +- .../precompute_engine/accumulator_factory.rs | 153 ++++++++++-------- .../datasketches_kll_accumulator.rs | 28 ++-- .../src/tests/capability_matching_tests.rs | 2 +- .../datafusion/accumulator_serde_tests.rs | 4 +- .../plan_execution_temporal_tests.rs | 2 +- .../tests/datafusion/plan_execution_tests.rs | 12 +- .../src/tests/elastic_dsl_query_tests.rs | 2 +- .../src/tests/store_correctness_tests.rs | 2 +- 10 files changed, 119 insertions(+), 96 deletions(-) diff --git a/asap-query-engine/benches/simple_store_bench.rs b/asap-query-engine/benches/simple_store_bench.rs index d225188..ef49abc 100644 --- a/asap-query-engine/benches/simple_store_bench.rs +++ b/asap-query-engine/benches/simple_store_bench.rs @@ -111,7 +111,7 @@ impl AccumulatorKind { Self::Kll => { let mut acc = DatasketchesKLLAccumulator::new(200); for v in 0..20 { - acc._update(v as f64 * (value + 1.0)); + acc.update(v as f64 * (value + 1.0)); } Box::new(acc) } diff --git a/asap-query-engine/src/engines/physical/summary_merge_multiple_exec.rs b/asap-query-engine/src/engines/physical/summary_merge_multiple_exec.rs index f335482..de5ec43 100644 --- a/asap-query-engine/src/engines/physical/summary_merge_multiple_exec.rs +++ b/asap-query-engine/src/engines/physical/summary_merge_multiple_exec.rs @@ -466,11 +466,11 @@ mod tests { #[test] fn test_merge_kll_sketches() { let mut kll1 = DatasketchesKLLAccumulator::new(200); - kll1._update(1.0); - kll1._update(2.0); + kll1.update(1.0); + kll1.update(2.0); let mut kll2 = DatasketchesKLLAccumulator::new(200); - kll2._update(3.0); - kll2._update(4.0); + kll2.update(3.0); + kll2.update(4.0); let bytes1 = serialize_accumulator_arroyo(&kll1); let bytes2 = 
serialize_accumulator_arroyo(&kll2); diff --git a/asap-query-engine/src/precompute_engine/accumulator_factory.rs b/asap-query-engine/src/precompute_engine/accumulator_factory.rs index 4852126..13c86b0 100644 --- a/asap-query-engine/src/precompute_engine/accumulator_factory.rs +++ b/asap-query-engine/src/precompute_engine/accumulator_factory.rs @@ -104,14 +104,18 @@ impl AccumulatorUpdater for SumAccumulatorUpdater { pub struct MinMaxAccumulatorUpdater { acc: MinMaxAccumulator, - sub_type: String, + is_max: bool, } impl MinMaxAccumulatorUpdater { - pub fn new(sub_type: String) -> Self { + pub fn new(is_max: bool) -> Self { Self { - acc: MinMaxAccumulator::new(sub_type.clone()), - sub_type, + acc: if is_max { + MinMaxAccumulator::new_max() + } else { + MinMaxAccumulator::new_min() + }, + is_max, } } } @@ -128,7 +132,11 @@ impl AccumulatorUpdater for MinMaxAccumulatorUpdater { impl_clone_accumulator_methods!(acc); fn reset(&mut self) { - self.acc = MinMaxAccumulator::new(self.sub_type.clone()); + self.acc = if self.is_max { + MinMaxAccumulator::new_max() + } else { + MinMaxAccumulator::new_min() + }; } fn is_keyed(&self) -> bool { @@ -235,7 +243,7 @@ impl KllAccumulatorUpdater { impl AccumulatorUpdater for KllAccumulatorUpdater { fn update_single(&mut self, value: f64, _timestamp_ms: i64) { - self.acc._update(value); + self.acc.update(value); } fn update_keyed(&mut self, _key: &KeyByLabelValues, value: f64, timestamp_ms: i64) { @@ -259,14 +267,14 @@ impl AccumulatorUpdater for KllAccumulatorUpdater { } // --------------------------------------------------------------------------- -// MultipleSumUpdater +// MultipleSumAccumulatorUpdater // --------------------------------------------------------------------------- -pub struct MultipleSumUpdater { +pub struct MultipleSumAccumulatorUpdater { acc: MultipleSumAccumulator, } -impl MultipleSumUpdater { +impl MultipleSumAccumulatorUpdater { pub fn new() -> Self { Self { acc: MultipleSumAccumulator::new(), @@ -274,15 +282,18 
@@ impl MultipleSumUpdater { } } -impl Default for MultipleSumUpdater { +impl Default for MultipleSumAccumulatorUpdater { fn default() -> Self { Self::new() } } -impl AccumulatorUpdater for MultipleSumUpdater { +impl AccumulatorUpdater for MultipleSumAccumulatorUpdater { fn update_single(&mut self, _value: f64, _timestamp_ms: i64) { - // Multiple-subpopulation — use update_keyed instead + debug_assert!( + false, + "update_single called on keyed updater; use update_keyed" + ); } fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) { @@ -306,26 +317,33 @@ impl AccumulatorUpdater for MultipleSumUpdater { } // --------------------------------------------------------------------------- -// MultipleMinMaxUpdater +// MultipleMinMaxAccumulatorUpdater // --------------------------------------------------------------------------- -pub struct MultipleMinMaxUpdater { +pub struct MultipleMinMaxAccumulatorUpdater { acc: MultipleMinMaxAccumulator, - sub_type: String, + is_max: bool, } -impl MultipleMinMaxUpdater { - pub fn new(sub_type: String) -> Self { +impl MultipleMinMaxAccumulatorUpdater { + pub fn new(is_max: bool) -> Self { Self { - acc: MultipleMinMaxAccumulator::new(sub_type.clone()), - sub_type, + acc: if is_max { + MultipleMinMaxAccumulator::new_max() + } else { + MultipleMinMaxAccumulator::new_min() + }, + is_max, } } } -impl AccumulatorUpdater for MultipleMinMaxUpdater { +impl AccumulatorUpdater for MultipleMinMaxAccumulatorUpdater { fn update_single(&mut self, _value: f64, _timestamp_ms: i64) { - // Multiple-subpopulation — use update_keyed instead + debug_assert!( + false, + "update_single called on keyed updater; use update_keyed" + ); } fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) { @@ -335,7 +353,11 @@ impl AccumulatorUpdater for MultipleMinMaxUpdater { impl_clone_accumulator_methods!(acc); fn reset(&mut self) { - self.acc = MultipleMinMaxAccumulator::new(self.sub_type.clone()); + self.acc = if 
self.is_max { + MultipleMinMaxAccumulator::new_max() + } else { + MultipleMinMaxAccumulator::new_min() + }; } fn is_keyed(&self) -> bool { @@ -349,14 +371,14 @@ impl AccumulatorUpdater for MultipleMinMaxUpdater { } // --------------------------------------------------------------------------- -// MultipleIncreaseUpdater +// MultipleIncreaseAccumulatorUpdater // --------------------------------------------------------------------------- -pub struct MultipleIncreaseUpdater { +pub struct MultipleIncreaseAccumulatorUpdater { acc: MultipleIncreaseAccumulator, } -impl MultipleIncreaseUpdater { +impl MultipleIncreaseAccumulatorUpdater { pub fn new() -> Self { Self { acc: MultipleIncreaseAccumulator::new(), @@ -364,32 +386,34 @@ impl MultipleIncreaseUpdater { } } -impl Default for MultipleIncreaseUpdater { +impl Default for MultipleIncreaseAccumulatorUpdater { fn default() -> Self { Self::new() } } -impl AccumulatorUpdater for MultipleIncreaseUpdater { +impl AccumulatorUpdater for MultipleIncreaseAccumulatorUpdater { fn update_single(&mut self, _value: f64, _timestamp_ms: i64) { - // Multiple-subpopulation — use update_keyed instead + debug_assert!( + false, + "update_single called on keyed updater; use update_keyed" + ); } fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, timestamp_ms: i64) { let measurement = Measurement::new(value); - // If key already exists, update it; otherwise create new - if self.acc.increases.contains_key(key) { - if let Some(existing) = self.acc.increases.get_mut(key) { - existing.update(measurement, timestamp_ms); + match self.acc.increases.entry(key.clone()) { + std::collections::hash_map::Entry::Occupied(mut e) => { + e.get_mut().update(measurement, timestamp_ms); + } + std::collections::hash_map::Entry::Vacant(e) => { + e.insert(IncreaseAccumulator::new( + measurement.clone(), + timestamp_ms, + measurement, + timestamp_ms, + )); } - } else { - let new_acc = IncreaseAccumulator::new( - measurement.clone(), - timestamp_ms, - 
measurement, - timestamp_ms, - ); - self.acc.update(key.clone(), new_acc); } } @@ -433,7 +457,10 @@ impl CmsAccumulatorUpdater { impl AccumulatorUpdater for CmsAccumulatorUpdater { fn update_single(&mut self, _value: f64, _timestamp_ms: i64) { - // CMS is keyed — use update_keyed + debug_assert!( + false, + "update_single called on keyed updater; use update_keyed" + ); } fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) { @@ -480,7 +507,10 @@ impl HydraKllAccumulatorUpdater { impl AccumulatorUpdater for HydraKllAccumulatorUpdater { fn update_single(&mut self, _value: f64, _timestamp_ms: i64) { - // HydraKLL is keyed — use update_keyed + debug_assert!( + false, + "update_single called on keyed updater; use update_keyed" + ); } fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) { @@ -534,7 +564,8 @@ fn kll_k_param(config: &AggregationConfig) -> u16 { .get("K") .or_else(|| config.parameters.get("k")) .and_then(|v| v.as_u64()) - .unwrap_or(200) as u16 + .and_then(|v| u16::try_from(v).ok()) + .unwrap_or(200) } /// Extract `(row_num, col_num)` for CMS / HydraKLL configs. 
@@ -569,8 +600,8 @@ pub fn create_accumulator_updater(config: &AggregationConfig) -> Box match sub_type { "Sum" | "sum" => Box::new(SumAccumulatorUpdater::new()), - "Min" | "min" => Box::new(MinMaxAccumulatorUpdater::new("min".to_string())), - "Max" | "max" => Box::new(MinMaxAccumulatorUpdater::new("max".to_string())), + "Min" | "min" => Box::new(MinMaxAccumulatorUpdater::new(false)), + "Max" | "max" => Box::new(MinMaxAccumulatorUpdater::new(true)), "Increase" | "increase" => Box::new(IncreaseAccumulatorUpdater::new()), "DatasketchesKLL" | "datasketches_kll" | "KLL" | "kll" => { Box::new(KllAccumulatorUpdater::new(kll_k_param(config))) @@ -584,10 +615,10 @@ pub fn create_accumulator_updater(config: &AggregationConfig) -> Box match sub_type { - "Sum" | "sum" => Box::new(MultipleSumUpdater::new()), - "Min" | "min" => Box::new(MultipleMinMaxUpdater::new("min".to_string())), - "Max" | "max" => Box::new(MultipleMinMaxUpdater::new("max".to_string())), - "Increase" | "increase" => Box::new(MultipleIncreaseUpdater::new()), + "Sum" | "sum" => Box::new(MultipleSumAccumulatorUpdater::new()), + "Min" | "min" => Box::new(MultipleMinMaxAccumulatorUpdater::new(false)), + "Max" | "max" => Box::new(MultipleMinMaxAccumulatorUpdater::new(true)), + "Increase" | "increase" => Box::new(MultipleIncreaseAccumulatorUpdater::new()), "CountMinSketch" | "count_min_sketch" | "CMS" | "cms" => { let (row_num, col_num) = cms_params(config); Box::new(CmsAccumulatorUpdater::new(row_num, col_num)) @@ -601,28 +632,20 @@ pub fn create_accumulator_updater(config: &AggregationConfig) -> Box { Box::new(KllAccumulatorUpdater::new(kll_k_param(config))) } - AggregationType::MultipleSum => Box::new(MultipleSumUpdater::new()), - AggregationType::MultipleIncrease => Box::new(MultipleIncreaseUpdater::new()), - AggregationType::MultipleMinMax => Box::new(MultipleMinMaxUpdater::new( - if sub_type.eq_ignore_ascii_case("max") { - "max".to_string() - } else { - "min".to_string() - }, + AggregationType::MultipleSum 
=> Box::new(MultipleSumAccumulatorUpdater::new()), + AggregationType::MultipleIncrease => Box::new(MultipleIncreaseAccumulatorUpdater::new()), + AggregationType::MultipleMinMax => Box::new(MultipleMinMaxAccumulatorUpdater::new( + sub_type.eq_ignore_ascii_case("max"), )), AggregationType::Sum => Box::new(SumAccumulatorUpdater::new()), AggregationType::MinMax => Box::new(MinMaxAccumulatorUpdater::new( - if sub_type.eq_ignore_ascii_case("max") { - "max".to_string() - } else { - "min".to_string() - }, + sub_type.eq_ignore_ascii_case("max"), )), AggregationType::Increase => Box::new(IncreaseAccumulatorUpdater::new()), AggregationType::CountMinSketch | AggregationType::CountMinSketchWithHeap => { @@ -663,7 +686,7 @@ mod tests { #[test] fn test_minmax_updater() { - let mut updater = MinMaxAccumulatorUpdater::new("max".to_string()); + let mut updater = MinMaxAccumulatorUpdater::new(true); updater.update_single(5.0, 1000); updater.update_single(3.0, 2000); updater.update_single(7.0, 3000); @@ -695,7 +718,7 @@ mod tests { #[test] fn test_multiple_sum_updater() { - let mut updater = MultipleSumUpdater::new(); + let mut updater = MultipleSumAccumulatorUpdater::new(); assert!(updater.is_keyed()); let key_a = KeyByLabelValues::new_with_labels(vec!["a".to_string()]); diff --git a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs index 1781de0..df0dfb5 100644 --- a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs @@ -26,7 +26,7 @@ impl DatasketchesKLLAccumulator { } } - pub fn _update(&mut self, value: f64) { + pub fn update(&mut self, value: f64) { self.inner.update(value); } @@ -267,9 +267,9 @@ mod tests { #[test] fn test_datasketches_kll_update() { let mut kll = DatasketchesKLLAccumulator::new(200); - kll._update(10.0); - kll._update(20.0); - kll._update(15.0); + 
kll.update(10.0); + kll.update(20.0); + kll.update(15.0); assert_eq!(kll.inner.count(), 3); } @@ -277,7 +277,7 @@ mod tests { fn test_datasketches_kll_quantile() { let mut kll = DatasketchesKLLAccumulator::new(200); for i in 1..=10 { - kll._update(i as f64); + kll.update(i as f64); } assert_eq!(kll.get_quantile(0.0), 1.0); assert_eq!(kll.get_quantile(1.0), 10.0); @@ -290,7 +290,7 @@ mod tests { fn test_datasketches_kll_query() { let mut kll = DatasketchesKLLAccumulator::new(200); for i in 1..=10 { - kll._update(i as f64); + kll.update(i as f64); } let mut query_kwargs = HashMap::new(); @@ -311,10 +311,10 @@ mod tests { let mut kll2 = DatasketchesKLLAccumulator::new(200); for i in 1..=5 { - kll1._update(i as f64); + kll1.update(i as f64); } for i in 6..=10 { - kll2._update(i as f64); + kll2.update(i as f64); } let merged = DatasketchesKLLAccumulator::merge_accumulators(vec![kll1, kll2]).unwrap(); @@ -327,7 +327,7 @@ mod tests { fn test_datasketches_kll_serialization() { let mut kll = DatasketchesKLLAccumulator::new(200); for i in 1..=5 { - kll._update(i as f64); + kll.update(i as f64); } let bytes = kll.serialize_to_bytes(); @@ -349,7 +349,7 @@ mod tests { #[test] fn test_trait_object() { let mut kll = DatasketchesKLLAccumulator::new(200); - kll._update(5.0); + kll.update(5.0); let trait_obj: Box = Box::new(kll); assert_eq!(trait_obj.type_name(), "DatasketchesKLLAccumulator"); } @@ -358,7 +358,7 @@ mod tests { fn test_datasketches_kll_query_with_kwargs() { let mut kll = DatasketchesKLLAccumulator::new(200); for i in 1..=10 { - kll._update(i as f64); + kll.update(i as f64); } let mut query_kwargs = HashMap::new(); @@ -412,13 +412,13 @@ mod tests { let mut kll3 = DatasketchesKLLAccumulator::new(200); for i in 1..=5 { - kll1._update(i as f64); + kll1.update(i as f64); } for i in 6..=10 { - kll2._update(i as f64); + kll2.update(i as f64); } for i in 11..=15 { - kll3._update(i as f64); + kll3.update(i as f64); } let boxed_accs: Vec> = diff --git 
a/asap-query-engine/src/tests/capability_matching_tests.rs b/asap-query-engine/src/tests/capability_matching_tests.rs index db10b4c..9cc8d3f 100644 --- a/asap-query-engine/src/tests/capability_matching_tests.rs +++ b/asap-query-engine/src/tests/capability_matching_tests.rs @@ -80,7 +80,7 @@ fn engine_no_query_configs( let acc: Box = match c.aggregation_type.as_str() { "DatasketchesKLL" => { let mut kll = DatasketchesKLLAccumulator::new(200); - kll._update(1.0); + kll.update(1.0); Box::new(kll) } _ => Box::new(SumAccumulator::with_sum(42.0)), diff --git a/asap-query-engine/src/tests/datafusion/accumulator_serde_tests.rs b/asap-query-engine/src/tests/datafusion/accumulator_serde_tests.rs index 7565717..c90e8e5 100644 --- a/asap-query-engine/src/tests/datafusion/accumulator_serde_tests.rs +++ b/asap-query-engine/src/tests/datafusion/accumulator_serde_tests.rs @@ -41,7 +41,7 @@ mod tests { fn test_round_trip_kll() { let mut kll = DatasketchesKLLAccumulator::new(200); for v in [1.0, 2.0, 3.0, 4.0, 5.0] { - kll._update(v); + kll.update(v); } let bytes = serialize_accumulator_arroyo(&kll); @@ -316,7 +316,7 @@ mod tests { fn test_serialize_arroyo_dispatch_kll_uses_native() { // KLL's serialize_to_bytes already uses MessagePack, so arroyo falls through let mut kll = DatasketchesKLLAccumulator::new(200); - kll._update(1.0); + kll.update(1.0); let arroyo_bytes = serialize_accumulator_arroyo(&kll); let native_bytes = kll.serialize_to_bytes(); diff --git a/asap-query-engine/src/tests/datafusion/plan_execution_temporal_tests.rs b/asap-query-engine/src/tests/datafusion/plan_execution_temporal_tests.rs index 8d7e915..0cdaf6b 100644 --- a/asap-query-engine/src/tests/datafusion/plan_execution_temporal_tests.rs +++ b/asap-query-engine/src/tests/datafusion/plan_execution_temporal_tests.rs @@ -140,7 +140,7 @@ mod tests { for (i, &ts) in TEMPORAL_TIMESTAMPS.iter().enumerate() { let mut kll = DatasketchesKLLAccumulator::new(200); // Insert values 10, 20, 30, 40, 50 at successive 
timestamps - kll._update((i as f64 + 1.0) * 10.0); + kll.update((i as f64 + 1.0) * 10.0); data.push(( ts, Some(vec!["host-a".to_string()]), diff --git a/asap-query-engine/src/tests/datafusion/plan_execution_tests.rs b/asap-query-engine/src/tests/datafusion/plan_execution_tests.rs index ba32446..169d4f3 100644 --- a/asap-query-engine/src/tests/datafusion/plan_execution_tests.rs +++ b/asap-query-engine/src/tests/datafusion/plan_execution_tests.rs @@ -183,11 +183,11 @@ mod tests { async fn test_old_vs_new_kll_quantile() { let mut kll_a = DatasketchesKLLAccumulator::new(200); for v in [10.0, 20.0, 30.0, 40.0, 50.0] { - kll_a._update(v); + kll_a.update(v); } let mut kll_b = DatasketchesKLLAccumulator::new(200); for v in [100.0, 200.0, 300.0] { - kll_b._update(v); + kll_b.update(v); } let engine = create_engine_single_pop( @@ -208,7 +208,7 @@ mod tests { async fn test_old_vs_new_kll_quantile_p99() { let mut kll = DatasketchesKLLAccumulator::new(200); for v in 1..=100 { - kll._update(v as f64); + kll.update(v as f64); } let engine = create_engine_single_pop( @@ -226,7 +226,7 @@ mod tests { async fn test_old_vs_new_kll_quantile_p0() { let mut kll = DatasketchesKLLAccumulator::new(200); for v in [5.0, 10.0, 15.0, 20.0, 25.0] { - kll._update(v); + kll.update(v); } let engine = create_engine_single_pop( @@ -244,7 +244,7 @@ mod tests { async fn test_old_vs_new_kll_quantile_p1() { let mut kll = DatasketchesKLLAccumulator::new(200); for v in [5.0, 10.0, 15.0, 20.0, 25.0] { - kll._update(v); + kll.update(v); } let engine = create_engine_single_pop( @@ -262,7 +262,7 @@ mod tests { async fn test_old_vs_new_kll_quantile_p25() { let mut kll = DatasketchesKLLAccumulator::new(200); for v in 1..=1000 { - kll._update(v as f64); + kll.update(v as f64); } let engine = create_engine_single_pop( diff --git a/asap-query-engine/src/tests/elastic_dsl_query_tests.rs b/asap-query-engine/src/tests/elastic_dsl_query_tests.rs index 07097ea..c11b2a5 100644 --- 
a/asap-query-engine/src/tests/elastic_dsl_query_tests.rs +++ b/asap-query-engine/src/tests/elastic_dsl_query_tests.rs @@ -14,7 +14,7 @@ mod tests { fn create_kll_accumulator_with_values(values: &[f64]) -> DatasketchesKLLAccumulator { let mut kll = DatasketchesKLLAccumulator::new(200); for &v in values { - kll._update(v); + kll.update(v); } kll } diff --git a/asap-query-engine/src/tests/store_correctness_tests.rs b/asap-query-engine/src/tests/store_correctness_tests.rs index 2032aa0..ef9425b 100644 --- a/asap-query-engine/src/tests/store_correctness_tests.rs +++ b/asap-query-engine/src/tests/store_correctness_tests.rs @@ -836,7 +836,7 @@ fn test_clone_fidelity_min_max(strategy: LockStrategy) { fn test_clone_fidelity_kll(strategy: LockStrategy) { let mut acc = DatasketchesKLLAccumulator::new(200); for v in [1.0, 5.0, 10.0, 50.0, 100.0] { - acc._update(v); + acc.update(v); } roundtrip(strategy, acc); }