Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion asap-query-engine/benches/simple_store_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl AccumulatorKind {
Self::Kll => {
let mut acc = DatasketchesKLLAccumulator::new(200);
for v in 0..20 {
acc._update(v as f64 * (value + 1.0));
acc.update(v as f64 * (value + 1.0));
}
Box::new(acc)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,11 +466,11 @@ mod tests {
#[test]
fn test_merge_kll_sketches() {
let mut kll1 = DatasketchesKLLAccumulator::new(200);
kll1._update(1.0);
kll1._update(2.0);
kll1.update(1.0);
kll1.update(2.0);
let mut kll2 = DatasketchesKLLAccumulator::new(200);
kll2._update(3.0);
kll2._update(4.0);
kll2.update(3.0);
kll2.update(4.0);

let bytes1 = serialize_accumulator_arroyo(&kll1);
let bytes2 = serialize_accumulator_arroyo(&kll2);
Expand Down
153 changes: 88 additions & 65 deletions asap-query-engine/src/precompute_engine/accumulator_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,18 @@ impl AccumulatorUpdater for SumAccumulatorUpdater {

pub struct MinMaxAccumulatorUpdater {
acc: MinMaxAccumulator,
sub_type: String,
is_max: bool,
}

impl MinMaxAccumulatorUpdater {
pub fn new(sub_type: String) -> Self {
pub fn new(is_max: bool) -> Self {
Self {
acc: MinMaxAccumulator::new(sub_type.clone()),
sub_type,
acc: if is_max {
MinMaxAccumulator::new_max()
} else {
MinMaxAccumulator::new_min()
},
is_max,
}
}
}
Expand All @@ -128,7 +132,11 @@ impl AccumulatorUpdater for MinMaxAccumulatorUpdater {
impl_clone_accumulator_methods!(acc);

fn reset(&mut self) {
self.acc = MinMaxAccumulator::new(self.sub_type.clone());
self.acc = if self.is_max {
MinMaxAccumulator::new_max()
} else {
MinMaxAccumulator::new_min()
};
}

fn is_keyed(&self) -> bool {
Expand Down Expand Up @@ -235,7 +243,7 @@ impl KllAccumulatorUpdater {

impl AccumulatorUpdater for KllAccumulatorUpdater {
fn update_single(&mut self, value: f64, _timestamp_ms: i64) {
self.acc._update(value);
self.acc.update(value);
}

fn update_keyed(&mut self, _key: &KeyByLabelValues, value: f64, timestamp_ms: i64) {
Expand All @@ -259,30 +267,33 @@ impl AccumulatorUpdater for KllAccumulatorUpdater {
}

// ---------------------------------------------------------------------------
// MultipleSumUpdater
// MultipleSumAccumulatorUpdater
// ---------------------------------------------------------------------------

pub struct MultipleSumUpdater {
pub struct MultipleSumAccumulatorUpdater {
acc: MultipleSumAccumulator,
}

impl MultipleSumUpdater {
impl MultipleSumAccumulatorUpdater {
pub fn new() -> Self {
Self {
acc: MultipleSumAccumulator::new(),
}
}
}

impl Default for MultipleSumUpdater {
impl Default for MultipleSumAccumulatorUpdater {
fn default() -> Self {
Self::new()
}
}

impl AccumulatorUpdater for MultipleSumUpdater {
impl AccumulatorUpdater for MultipleSumAccumulatorUpdater {
fn update_single(&mut self, _value: f64, _timestamp_ms: i64) {
// Multiple-subpopulation — use update_keyed instead
debug_assert!(
false,
"update_single called on keyed updater; use update_keyed"
);
}

fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) {
Expand All @@ -306,26 +317,33 @@ impl AccumulatorUpdater for MultipleSumUpdater {
}

// ---------------------------------------------------------------------------
// MultipleMinMaxUpdater
// MultipleMinMaxAccumulatorUpdater
// ---------------------------------------------------------------------------

pub struct MultipleMinMaxUpdater {
pub struct MultipleMinMaxAccumulatorUpdater {
acc: MultipleMinMaxAccumulator,
sub_type: String,
is_max: bool,
}

impl MultipleMinMaxUpdater {
pub fn new(sub_type: String) -> Self {
impl MultipleMinMaxAccumulatorUpdater {
pub fn new(is_max: bool) -> Self {
Self {
acc: MultipleMinMaxAccumulator::new(sub_type.clone()),
sub_type,
acc: if is_max {
MultipleMinMaxAccumulator::new_max()
} else {
MultipleMinMaxAccumulator::new_min()
},
is_max,
}
}
}

impl AccumulatorUpdater for MultipleMinMaxUpdater {
impl AccumulatorUpdater for MultipleMinMaxAccumulatorUpdater {
fn update_single(&mut self, _value: f64, _timestamp_ms: i64) {
// Multiple-subpopulation — use update_keyed instead
debug_assert!(
false,
"update_single called on keyed updater; use update_keyed"
);
}

fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) {
Expand All @@ -335,7 +353,11 @@ impl AccumulatorUpdater for MultipleMinMaxUpdater {
impl_clone_accumulator_methods!(acc);

fn reset(&mut self) {
self.acc = MultipleMinMaxAccumulator::new(self.sub_type.clone());
self.acc = if self.is_max {
MultipleMinMaxAccumulator::new_max()
} else {
MultipleMinMaxAccumulator::new_min()
};
}

fn is_keyed(&self) -> bool {
Expand All @@ -349,47 +371,49 @@ impl AccumulatorUpdater for MultipleMinMaxUpdater {
}

// ---------------------------------------------------------------------------
// MultipleIncreaseUpdater
// MultipleIncreaseAccumulatorUpdater
// ---------------------------------------------------------------------------

pub struct MultipleIncreaseUpdater {
pub struct MultipleIncreaseAccumulatorUpdater {
acc: MultipleIncreaseAccumulator,
}

impl MultipleIncreaseUpdater {
impl MultipleIncreaseAccumulatorUpdater {
pub fn new() -> Self {
Self {
acc: MultipleIncreaseAccumulator::new(),
}
}
}

impl Default for MultipleIncreaseUpdater {
impl Default for MultipleIncreaseAccumulatorUpdater {
fn default() -> Self {
Self::new()
}
}

impl AccumulatorUpdater for MultipleIncreaseUpdater {
impl AccumulatorUpdater for MultipleIncreaseAccumulatorUpdater {
fn update_single(&mut self, _value: f64, _timestamp_ms: i64) {
// Multiple-subpopulation — use update_keyed instead
debug_assert!(
false,
"update_single called on keyed updater; use update_keyed"
);
}

fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, timestamp_ms: i64) {
let measurement = Measurement::new(value);
// If key already exists, update it; otherwise create new
if self.acc.increases.contains_key(key) {
if let Some(existing) = self.acc.increases.get_mut(key) {
existing.update(measurement, timestamp_ms);
match self.acc.increases.entry(key.clone()) {
std::collections::hash_map::Entry::Occupied(mut e) => {
e.get_mut().update(measurement, timestamp_ms);
}
std::collections::hash_map::Entry::Vacant(e) => {
e.insert(IncreaseAccumulator::new(
measurement.clone(),
timestamp_ms,
measurement,
timestamp_ms,
));
}
} else {
let new_acc = IncreaseAccumulator::new(
measurement.clone(),
timestamp_ms,
measurement,
timestamp_ms,
);
self.acc.update(key.clone(), new_acc);
}
}

Expand Down Expand Up @@ -433,7 +457,10 @@ impl CmsAccumulatorUpdater {

impl AccumulatorUpdater for CmsAccumulatorUpdater {
fn update_single(&mut self, _value: f64, _timestamp_ms: i64) {
// CMS is keyed — use update_keyed
debug_assert!(
false,
"update_single called on keyed updater; use update_keyed"
);
}

fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) {
Expand Down Expand Up @@ -480,7 +507,10 @@ impl HydraKllAccumulatorUpdater {

impl AccumulatorUpdater for HydraKllAccumulatorUpdater {
fn update_single(&mut self, _value: f64, _timestamp_ms: i64) {
// HydraKLL is keyed — use update_keyed
debug_assert!(
false,
"update_single called on keyed updater; use update_keyed"
);
}

fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) {
Expand Down Expand Up @@ -534,7 +564,8 @@ fn kll_k_param(config: &AggregationConfig) -> u16 {
.get("K")
.or_else(|| config.parameters.get("k"))
.and_then(|v| v.as_u64())
.unwrap_or(200) as u16
.and_then(|v| u16::try_from(v).ok())
.unwrap_or(200)
}

/// Extract `(row_num, col_num)` for CMS / HydraKLL configs.
Expand Down Expand Up @@ -569,8 +600,8 @@ pub fn create_accumulator_updater(config: &AggregationConfig) -> Box<dyn Accumul
match config.aggregation_type {
AggregationType::SingleSubpopulation => match sub_type {
"Sum" | "sum" => Box::new(SumAccumulatorUpdater::new()),
"Min" | "min" => Box::new(MinMaxAccumulatorUpdater::new("min".to_string())),
"Max" | "max" => Box::new(MinMaxAccumulatorUpdater::new("max".to_string())),
"Min" | "min" => Box::new(MinMaxAccumulatorUpdater::new(false)),
"Max" | "max" => Box::new(MinMaxAccumulatorUpdater::new(true)),
"Increase" | "increase" => Box::new(IncreaseAccumulatorUpdater::new()),
"DatasketchesKLL" | "datasketches_kll" | "KLL" | "kll" => {
Box::new(KllAccumulatorUpdater::new(kll_k_param(config)))
Expand All @@ -584,10 +615,10 @@ pub fn create_accumulator_updater(config: &AggregationConfig) -> Box<dyn Accumul
}
},
AggregationType::MultipleSubpopulation => match sub_type {
"Sum" | "sum" => Box::new(MultipleSumUpdater::new()),
"Min" | "min" => Box::new(MultipleMinMaxUpdater::new("min".to_string())),
"Max" | "max" => Box::new(MultipleMinMaxUpdater::new("max".to_string())),
"Increase" | "increase" => Box::new(MultipleIncreaseUpdater::new()),
"Sum" | "sum" => Box::new(MultipleSumAccumulatorUpdater::new()),
"Min" | "min" => Box::new(MultipleMinMaxAccumulatorUpdater::new(false)),
"Max" | "max" => Box::new(MultipleMinMaxAccumulatorUpdater::new(true)),
"Increase" | "increase" => Box::new(MultipleIncreaseAccumulatorUpdater::new()),
"CountMinSketch" | "count_min_sketch" | "CMS" | "cms" => {
let (row_num, col_num) = cms_params(config);
Box::new(CmsAccumulatorUpdater::new(row_num, col_num))
Expand All @@ -601,28 +632,20 @@ pub fn create_accumulator_updater(config: &AggregationConfig) -> Box<dyn Accumul
"Unknown MultipleSubpopulation sub_type '{}', defaulting to Sum",
other
);
Box::new(MultipleSumUpdater::new())
Box::new(MultipleSumAccumulatorUpdater::new())
}
},
AggregationType::DatasketchesKLL => {
Box::new(KllAccumulatorUpdater::new(kll_k_param(config)))
}
AggregationType::MultipleSum => Box::new(MultipleSumUpdater::new()),
AggregationType::MultipleIncrease => Box::new(MultipleIncreaseUpdater::new()),
AggregationType::MultipleMinMax => Box::new(MultipleMinMaxUpdater::new(
if sub_type.eq_ignore_ascii_case("max") {
"max".to_string()
} else {
"min".to_string()
},
AggregationType::MultipleSum => Box::new(MultipleSumAccumulatorUpdater::new()),
AggregationType::MultipleIncrease => Box::new(MultipleIncreaseAccumulatorUpdater::new()),
AggregationType::MultipleMinMax => Box::new(MultipleMinMaxAccumulatorUpdater::new(
sub_type.eq_ignore_ascii_case("max"),
)),
AggregationType::Sum => Box::new(SumAccumulatorUpdater::new()),
AggregationType::MinMax => Box::new(MinMaxAccumulatorUpdater::new(
if sub_type.eq_ignore_ascii_case("max") {
"max".to_string()
} else {
"min".to_string()
},
sub_type.eq_ignore_ascii_case("max"),
)),
AggregationType::Increase => Box::new(IncreaseAccumulatorUpdater::new()),
AggregationType::CountMinSketch | AggregationType::CountMinSketchWithHeap => {
Expand Down Expand Up @@ -663,7 +686,7 @@ mod tests {

#[test]
fn test_minmax_updater() {
let mut updater = MinMaxAccumulatorUpdater::new("max".to_string());
let mut updater = MinMaxAccumulatorUpdater::new(true);
updater.update_single(5.0, 1000);
updater.update_single(3.0, 2000);
updater.update_single(7.0, 3000);
Expand Down Expand Up @@ -695,7 +718,7 @@ mod tests {

#[test]
fn test_multiple_sum_updater() {
let mut updater = MultipleSumUpdater::new();
let mut updater = MultipleSumAccumulatorUpdater::new();
assert!(updater.is_keyed());

let key_a = KeyByLabelValues::new_with_labels(vec!["a".to_string()]);
Expand Down
Loading
Loading