diff --git a/rust/cubestore/.gitignore b/rust/cubestore/.gitignore index 66d0436bd8f34..64f8a320fdced 100644 --- a/rust/cubestore/.gitignore +++ b/rust/cubestore/.gitignore @@ -10,3 +10,6 @@ cubestore/target cubesql/target cubestore-sql-tests/data/** cubestore/db-tmp +# RocksDB scratch dirs left by metastore unit tests (run from the crate root) +/cubestore/test-*-local/ +/cubestore/test-*-upstream/ diff --git a/rust/cubestore/Cargo.lock b/rust/cubestore/Cargo.lock index b10bf2e1f5427..889e7636aa66d 100644 --- a/rust/cubestore/Cargo.lock +++ b/rust/cubestore/Cargo.lock @@ -1758,7 +1758,7 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" [[package]] name = "datafusion" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "arrow", "arrow-ipc", @@ -1811,7 +1811,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "arrow", "async-trait", @@ -1830,7 +1830,7 @@ dependencies = [ [[package]] name = "datafusion-catalog-listing" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "arrow", "async-trait", @@ -1851,7 +1851,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "ahash 0.8.11", "arrow", @@ -1874,7 +1874,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "log", "tokio", @@ -1883,7 +1883,7 @@ dependencies = [ [[package]] name = "datafusion-datasource" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "arrow", "async-compression 0.4.17", @@ -1916,12 +1916,12 @@ dependencies = [ [[package]] name = "datafusion-doc" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" [[package]] name = "datafusion-execution" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "arrow", "dashmap", @@ -1941,7 +1941,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "arrow", "chrono", @@ -1961,7 +1961,7 @@ dependencies = [ [[package]] name = "datafusion-expr-common" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "arrow", "datafusion-common", @@ -1973,7 +1973,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "arrow", "arrow-buffer", @@ -2001,7 +2001,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "ahash 0.8.11", "arrow", @@ -2021,7 +2021,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "ahash 0.8.11", "arrow", @@ -2033,7 +2033,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "arrow", "arrow-ord", @@ -2053,7 +2053,7 @@ dependencies = [ [[package]] name = "datafusion-functions-table" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "arrow", "async-trait", @@ -2068,7 +2068,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "datafusion-common", "datafusion-doc", @@ -2084,7 +2084,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -2093,7 +2093,7 @@ dependencies = [ [[package]] name = "datafusion-macros" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "datafusion-expr", "quote", @@ -2103,7 +2103,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "arrow", "chrono", @@ -2121,7 +2121,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "ahash 0.8.11", "arrow", @@ -2142,7 +2142,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "ahash 0.8.11", "arrow", @@ -2155,7 +2155,7 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "arrow", "datafusion-common", @@ -2173,7 +2173,7 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "ahash 0.8.11", "arrow", @@ -2205,7 +2205,7 @@ dependencies = [ [[package]] name = "datafusion-proto" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "arrow", "chrono", @@ -2220,7 +2220,7 @@ dependencies = [ [[package]] name = "datafusion-proto-common" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "arrow", "datafusion-common", @@ -2230,7 +2230,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "46.0.1" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube-46.0.1#dc9015e290adbeaff1da80c9c052219c50312f77" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cubestore-hash-aggregate-limit#ef839b67f88734804bb2127c7e27b25122b55690" dependencies = [ "arrow", "bigdecimal 0.4.8", diff --git a/rust/cubestore/cubestore-sql-tests/src/tests.rs b/rust/cubestore/cubestore-sql-tests/src/tests.rs index cb6a87b8339cd..f9d54f405d4b7 100644 --- a/rust/cubestore/cubestore-sql-tests/src/tests.rs +++ b/rust/cubestore/cubestore-sql-tests/src/tests.rs @@ -154,6 +154,8 @@ pub fn sql_tests(prefix: &str) -> Vec<(&'static str, TestFn)> { t("planning_inplace_aggregate", planning_inplace_aggregate), t("planning_hints", planning_hints), t("planning_inplace_aggregate2", planning_inplace_aggregate2), + t("planning_topk_hash_aggregate", planning_topk_hash_aggregate), + t("topk_hash_aggregate_trim", topk_hash_aggregate_trim), t("topk_large_inputs", topk_large_inputs), t("partitioned_index", partitioned_index), t( @@ -330,6 +332,10 @@ pub fn sql_tests(prefix: &str) -> Vec<(&'static str, TestFn)> { "limit_pushdown_group_nonprefix_order", limit_pushdown_group_nonprefix_order, ), + t( + "limit_pushdown_group_null_appended", + limit_pushdown_group_null_appended, + ), t("limit_pushdown_group_order", limit_pushdown_group_order), t( "limit_pushdown_group_where_order", @@ -424,7 +430,10 @@ lazy_static::lazy_static! { "global_aggregate_unique_key_keeps_merge", "limit_pushdown_group_having", "limit_pushdown_group_nonprefix_order", + "limit_pushdown_group_null_appended", "prefilter_chunks_shared_scan", + "planning_topk_hash_aggregate", + "topk_hash_aggregate_trim", ].into_iter().map(ToOwned::to_owned).collect(); } @@ -3201,6 +3210,196 @@ async fn planning_inplace_aggregate(service: Box) -> Result<(), C Ok(()) } +async fn planning_topk_hash_aggregate(service: Box) -> Result<(), CubeError> { + service.exec_query("CREATE SCHEMA s").await?; + service + .exec_query("CREATE TABLE s.Data(url text, day int, hits int)") + .await?; + service + .exec_query("CREATE TABLE s.D3(a int, b int, c int, h int)") + .await?; + + // GROUP BY a non-indexed column -> hash (Linear) partial aggregate; ORDER BY the group + // column with a LIMIT -> the worker partial aggregate is replaced by GroupByLimitAggregate. + let p = service + .plan_query("SELECT day, SUM(hits) FROM s.Data GROUP BY 1 ORDER BY 1 LIMIT 10") + .await?; + let pp = pp_phys_plan_ext(p.worker.as_ref(), &PPOptions::none()); + assert!( + pp.contains("GroupByLimitAggregate, k: 10, factor: 2,"), + "expected GroupByLimitAggregate on the worker, got:\n{}", + pp + ); + + // LIMIT + OFFSET -> k = limit + offset. + let p = service + .plan_query("SELECT day, SUM(hits) FROM s.Data GROUP BY 1 ORDER BY 1 LIMIT 10 OFFSET 5") + .await?; + let pp = pp_phys_plan_ext(p.worker.as_ref(), &PPOptions::none()); + assert!( + pp.contains("GroupByLimitAggregate, k: 15, factor: 2,"), + "expected k=15 (limit+offset), got:\n{}", + pp + ); + + // ORDER BY an aggregate (not a group-by column) -> no trim. + let p = service + .plan_query("SELECT day, SUM(hits) FROM s.Data GROUP BY 1 ORDER BY 2 DESC LIMIT 10") + .await?; + let pp = pp_phys_plan_ext(p.worker.as_ref(), &PPOptions::none()); + assert!( + !pp.contains("GroupByLimitAggregate"), + "did not expect GroupByLimitAggregate when ordering by an aggregate, got:\n{}", + pp + ); + + // No LIMIT -> no trim. + let p = service + .plan_query("SELECT day, SUM(hits) FROM s.Data GROUP BY 1 ORDER BY 1") + .await?; + let pp = pp_phys_plan_ext(p.worker.as_ref(), &PPOptions::none()); + assert!( + !pp.contains("GroupByLimitAggregate"), + "did not expect GroupByLimitAggregate without a limit, got:\n{}", + pp + ); + + // ORDER BY a proper SUBSET of GROUP BY (b out of b, c). The worker cut and the router sort must + // both use the total order T = [b, c]: the worker trim order carries the tie-break column c, and + // the router's global Sort is extended with c so its top-k matches the global top-k by T. + let p = service + .plan_query("SELECT b, c, SUM(h) FROM s.D3 GROUP BY 1, 2 ORDER BY 1 LIMIT 3") + .await?; + let worker_pp = pp_phys_plan_ext(p.worker.as_ref(), &PPOptions::none()); + assert!( + worker_pp.contains("GroupByLimitAggregate, k: 3, factor: 2,") + && worker_pp.contains("(0, SortOptions { descending: false, nulls_first: false })") + && worker_pp.contains("(1, SortOptions { descending: false, nulls_first: true })"), + "expected worker trim order [b, c] totalized, got:\n{}", + worker_pp + ); + let router_pp = pp_phys_plan_ext( + p.router.as_ref(), + &PPOptions { + show_sort_by: true, + ..PPOptions::none() + }, + ); + assert!( + router_pp.contains("b@0") && router_pp.contains("c@1"), + "expected router Sort extended with the tie-break column c, got:\n{}", + router_pp + ); + + // Bare LIMIT (no ORDER BY) on a non-indexed group column: the limit can't ride the index, so the + // worker still trims to the smallest groups by the full group key -- "any k" made deterministic. + // The appended group column uses ascending nulls-first, matching the rewriter's appended-column + // order (`SortOptions::default()`) so the worker cut and the router select agree. + let p = service + .plan_query("SELECT day, SUM(hits) FROM s.Data GROUP BY 1 LIMIT 10") + .await?; + let pp = pp_phys_plan_ext(p.worker.as_ref(), &PPOptions::none()); + assert!( + pp.contains("GroupByLimitAggregate, k: 10, factor: 2,") + && pp.contains("(0, SortOptions { descending: false, nulls_first: true })"), + "expected GroupByLimitAggregate on a bare LIMIT, got:\n{}", + pp + ); + + // UNION ALL + bare LIMIT: the per-branch trim descriptor must survive the cluster-send pull-up + // over the union so the worker still trims above the union. + service + .exec_query("CREATE TABLE s.Data2(url text, day int, hits int)") + .await?; + let p = service + .plan_query( + "SELECT day, SUM(hits) FROM \ + (SELECT * FROM s.Data UNION ALL SELECT * FROM s.Data2) u GROUP BY 1 LIMIT 10", + ) + .await?; + let pp = pp_phys_plan_ext(p.worker.as_ref(), &PPOptions::none()); + assert!( + pp.contains("GroupByLimitAggregate, k: 10, factor: 2,") && pp.contains("Union"), + "expected GroupByLimitAggregate over the Union, got:\n{}", + pp + ); + + Ok(()) +} + +async fn topk_hash_aggregate_trim(service: Box) -> Result<(), CubeError> { + service.exec_query("CREATE SCHEMA s").await?; + service + .exec_query("CREATE TABLE s.Data(a int, b int, hits int)") + .await?; + // 12 distinct (a, b) groups, each with two rows so partial aggregation actually groups. + // With k=3 and factor=2 the trim activates (g=12 > 6) but the result must match a full + // top-k. ORDER BY a (a proper subset of GROUP BY a, b) exercises totalization: the worker + // breaks ties on a by b so the router still receives every needed partial state. + service + .exec_query( + "INSERT INTO s.Data(a, b, hits) VALUES \ + (1,1,10),(1,1,5),(1,2,1),(1,2,2),\ + (2,1,7),(2,1,3),(2,2,4),(2,2,6),\ + (3,1,8),(3,1,2),(3,2,9),(3,2,1),\ + (4,1,1),(4,1,1),(4,2,1),(4,2,1),\ + (5,1,1),(5,1,1),(5,2,1),(5,2,1),\ + (6,1,1),(6,1,1),(6,2,1),(6,2,1)", + ) + .await?; + + // ORDER BY a, b LIMIT 3 (ascending): smallest three groups by (a, b). + let r = service + .exec_query("SELECT a, b, SUM(hits) FROM s.Data GROUP BY 1, 2 ORDER BY 1, 2 LIMIT 3") + .await?; + assert_eq!(to_rows(&r), rows(&[(1, 1, 15), (1, 2, 3), (2, 1, 10)])); + + // ORDER BY a, b DESC LIMIT 3: largest three groups by (a, b). + let r = service + .exec_query( + "SELECT a, b, SUM(hits) FROM s.Data GROUP BY 1, 2 ORDER BY 1 DESC, 2 DESC LIMIT 3", + ) + .await?; + assert_eq!(to_rows(&r), rows(&[(6, 2, 2), (6, 1, 2), (5, 2, 2)])); + + // ORDER BY a only (a proper subset of GROUP BY a, b), LIMIT 2. The selected group SET is + // deterministic (both groups of a=1), but the intra-tie row order is not, so assert as a set. + // Each returned group must carry its complete sum regardless of cross-worker tie-breaking, + // which is what totalization (append b to the cut order) guarantees. + let r = service + .exec_query("SELECT a, b, SUM(hits) FROM s.Data GROUP BY 1, 2 ORDER BY 1 LIMIT 2") + .await?; + let got = to_rows(&r); + assert_eq!(got.len(), 2, "expected 2 rows, got: {:?}", got); + for expected in rows(&[(1, 1, 15), (1, 2, 3)]) { + assert!( + got.contains(&expected), + "missing {:?} in {:?}", + expected, + got + ); + } + + // Bare LIMIT 3 (no ORDER BY): the trim orders by the full group key, so "any 3" resolves to the + // 3 smallest by (a, b). The result order is unspecified, but the group SET and each group's full + // sum must be exact -- the latter guards against undercounting a group split across workers. + let r = service + .exec_query("SELECT a, b, SUM(hits) FROM s.Data GROUP BY 1, 2 LIMIT 3") + .await?; + let got = to_rows(&r); + assert_eq!(got.len(), 3, "expected 3 rows, got: {:?}", got); + for expected in rows(&[(1, 1, 15), (1, 2, 3), (2, 1, 10)]) { + assert!( + got.contains(&expected), + "missing {:?} in {:?}", + expected, + got + ); + } + + Ok(()) +} + async fn planning_hints(service: Box) -> Result<(), CubeError> { service.exec_query("CREATE SCHEMA s").await?; service @@ -9113,6 +9312,53 @@ async fn limit_pushdown_group_nonprefix_order( Ok(()) } +/// Multi-column GROUP BY where the appended (non-ORDER-BY) total-order column carries NULLs and the +/// group spans chunks. `GROUP BY b, c` is not the index prefix, so this is the hash trim path; the +/// worker cut and the router select both extend the order with the appended column `c`, and they must +/// agree on its NULL placement or the NULL group's partial states get dropped across chunks +/// (undercount). The NULL group's membership is fixed by `b` (uniquely smallest), so the assertion +/// does not depend on the tie order -- only on the sum being fully combined across all three chunks. +async fn limit_pushdown_group_null_appended(service: Box) -> Result<(), CubeError> { + service.exec_query("CREATE SCHEMA s").await?; + service + .exec_query("CREATE TABLE s.nag (a int, b int, c int, val int) INDEX bidx (a, b, c)") + .await?; + // Three chunks. The (b = 1, c = NULL) group appears in every chunk, so it spans all three and + // its sum must combine to 1110. Five distinct (b, c) groups so the trim engages (5 > factor*k). + service + .exec_query( + "INSERT INTO s.nag (a, b, c, val) VALUES \ + (1, 1, NULL, 10), (1, 2, 20, 1), (1, 3, 30, 1), (1, 4, 40, 1), (1, 5, 50, 1)", + ) + .await?; + service + .exec_query( + "INSERT INTO s.nag (a, b, c, val) VALUES \ + (1, 1, NULL, 100), (1, 2, 20, 1), (1, 3, 30, 1), (1, 4, 40, 1), (1, 5, 50, 1)", + ) + .await?; + service + .exec_query( + "INSERT INTO s.nag (a, b, c, val) VALUES \ + (1, 1, NULL, 1000), (1, 2, 20, 1), (1, 3, 30, 1), (1, 4, 40, 1), (1, 5, 50, 1)", + ) + .await?; + + // Smallest b is the NULL-c group; its sum must combine across all three chunks. + let r = service + .exec_query("SELECT b, c, sum(val) FROM s.nag GROUP BY b, c ORDER BY b LIMIT 1") + .await?; + assert_eq!( + to_rows(&r), + vec![vec![ + TableValue::Int(1), + TableValue::Null, + TableValue::Int(1110), + ]] + ); + Ok(()) +} + async fn limit_pushdown_group_order(service: Box) -> Result<(), CubeError> { service.exec_query("CREATE SCHEMA foo").await?; diff --git a/rust/cubestore/cubestore/Cargo.toml b/rust/cubestore/cubestore/Cargo.toml index 66bc586d6ad4f..9ab37d88d6c35 100644 --- a/rust/cubestore/cubestore/Cargo.toml +++ b/rust/cubestore/cubestore/Cargo.toml @@ -28,10 +28,10 @@ cubezetasketch = { path = "../cubezetasketch" } cubedatasketches = { path = "../cubedatasketches" } cubeshared = { path = "../../cube/cubeshared" } cuberpc = { path = "../cuberpc" } -datafusion = { git = "https://github.com/cube-js/arrow-datafusion", branch = "cube-46.0.1", features = ["serde"] } -datafusion-datasource = { git = "https://github.com/cube-js/arrow-datafusion", branch = "cube-46.0.1" } -datafusion-proto = { git = "https://github.com/cube-js/arrow-datafusion", branch = "cube-46.0.1" } -datafusion-proto-common = { git = "https://github.com/cube-js/arrow-datafusion", branch = "cube-46.0.1" } +datafusion = { git = "https://github.com/cube-js/arrow-datafusion", branch = "cubestore-hash-aggregate-limit", features = ["serde"] } +datafusion-datasource = { git = "https://github.com/cube-js/arrow-datafusion", branch = "cubestore-hash-aggregate-limit" } +datafusion-proto = { git = "https://github.com/cube-js/arrow-datafusion", branch = "cubestore-hash-aggregate-limit" } +datafusion-proto-common = { git = "https://github.com/cube-js/arrow-datafusion", branch = "cubestore-hash-aggregate-limit" } csv = "1.1.3" bytes = "1.6.0" serde_json = "1.0.56" diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index 5cba4c1222959..524228f667d97 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -585,6 +585,20 @@ pub trait ConfigObj: DIService { /// streaming split never drops rows (the first child is the low catch-all, the last /// the high one), and the legacy per-chunk path performed no such metadata check. fn repartition_check_overlapping_children(&self) -> bool; + /// Factor `f` controlling when the worker-side partial hash aggregate trims its output to the + /// top-k groups. Trimming happens only when the number of local groups exceeds `f * k`, where + /// `k = limit + offset`. `0` disables the optimization. + fn group_by_limit_factor(&self) -> usize; + + /// When the worker group-by-limit hash trim is active, controls where the worker's hash table + /// lives: `false` (default) coalesces the partial aggregate's input to one partition (one hash + /// table per worker, "over merge"); `true` keeps the raw multi-partition input so it runs per + /// partition ("under merge"). + fn group_by_limit_per_partition(&self) -> bool; + + /// Replace the sort-preserving merge feeding a grouped Linear (hash) aggregate with a plain + /// partition coalesce (the hash aggregate ignores input order, so the per-row merge is wasted). + fn coalesce_under_hash_aggregate(&self) -> bool; fn allow_decimal128(&self) -> bool; @@ -745,6 +759,9 @@ pub struct ConfigObjImpl { pub repartition_merge_max_input_files: usize, pub repartition_merge_max_rows: u64, pub repartition_check_overlapping_children: bool, + pub group_by_limit_factor: usize, + pub group_by_limit_per_partition: bool, + pub coalesce_under_hash_aggregate: bool, pub allow_decimal128: bool, pub enable_remove_orphaned_remote_files: bool, pub enable_startup_warmup: bool, @@ -1087,6 +1104,18 @@ impl ConfigObj for ConfigObjImpl { self.repartition_check_overlapping_children } + fn group_by_limit_factor(&self) -> usize { + self.group_by_limit_factor + } + + fn group_by_limit_per_partition(&self) -> bool { + self.group_by_limit_per_partition + } + + fn coalesce_under_hash_aggregate(&self) -> bool { + self.coalesce_under_hash_aggregate + } + fn allow_decimal128(&self) -> bool { self.allow_decimal128 } @@ -1273,6 +1302,13 @@ fn env_bool(name: &str, default: bool) -> bool { .unwrap_or(default) } +/// Lenient boolean env read for opt-in toggles: only `1`/`true` enable it, anything else (including a +/// typo) is treated as off. Unlike [`env_bool`] it never panics -- a malformed value on a +/// performance flag must not take a node down on startup. +fn env_flag(name: &str) -> bool { + env::var(name).map_or(false, |v| v == "true" || v == "1") +} + pub fn env_parse(name: &str, default: T) -> T where T: FromStr, @@ -1783,6 +1819,9 @@ impl Config { "CUBESTORE_REPARTITION_CHECK_OVERLAPPING_CHILDREN", false, ), + group_by_limit_factor: env_parse("CUBESTORE_GROUP_BY_LIMIT_FACTOR", 0), + group_by_limit_per_partition: env_flag("CUBESTORE_GROUP_BY_LIMIT_PER_PARTITION"), + coalesce_under_hash_aggregate: env_flag("CUBESTORE_COALESCE_UNDER_HASH_AGGREGATE"), allow_decimal128: env_bool("CUBESTORE_ALLOW_DECIMAL128", false), enable_remove_orphaned_remote_files: env_bool( "CUBESTORE_ENABLE_REMOVE_ORPHANED_REMOTE_FILES", @@ -2039,6 +2078,9 @@ impl Config { repartition_merge_max_input_files: 50, repartition_merge_max_rows: 4_000_000, repartition_check_overlapping_children: false, + group_by_limit_factor: 2, + group_by_limit_per_partition: false, + coalesce_under_hash_aggregate: false, allow_decimal128: false, enable_remove_orphaned_remote_files: false, enable_startup_warmup: true, @@ -2734,10 +2776,6 @@ impl Config { self.injector .register_typed_with_default::(async move |i| { - let push_partial_aggregate_below_merge = i - .get_service_typed::() - .await - .push_partial_aggregate_below_merge_enabled(); QueryExecutorImpl::new( i.get_service_typed::() .await @@ -2745,7 +2783,7 @@ impl Config { .clone(), i.get_service_typed().await, i.get_service_typed().await, - push_partial_aggregate_below_merge, + i.get_service_typed::().await, ) }) .await; diff --git a/rust/cubestore/cubestore/src/queryplanner/group_by_limit_aggregate/group_by_limit_aggregate_stream.rs b/rust/cubestore/cubestore/src/queryplanner/group_by_limit_aggregate/group_by_limit_aggregate_stream.rs new file mode 100644 index 0000000000000..707b55e889359 --- /dev/null +++ b/rust/cubestore/cubestore/src/queryplanner/group_by_limit_aggregate/group_by_limit_aggregate_stream.rs @@ -0,0 +1,286 @@ +use datafusion::arrow::array::{ArrayRef, AsArray, RecordBatch}; +use datafusion::arrow::compute::{lexsort_to_indices, take, SortColumn, SortOptions}; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::dfschema::internal_err; +use datafusion::error::Result as DFResult; +use datafusion::execution::{RecordBatchStream, TaskContext}; +use datafusion::logical_expr::{EmitTo, GroupsAccumulator}; +use datafusion::physical_expr::GroupsAccumulatorAdapter; +use datafusion::physical_plan::aggregates::group_values::{new_group_values, GroupValues}; +use datafusion::physical_plan::aggregates::order::GroupOrdering; +use datafusion::physical_plan::aggregates::PhysicalGroupBy; +use datafusion::physical_plan::udaf::AggregateFunctionExpr; +use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr, SendableRecordBatchStream}; +use futures::ready; +use futures::stream::{Stream, StreamExt}; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use super::GroupByLimitAggregateExec; + +enum ExecutionState { + ReadingInput, + ProducingOutput(RecordBatch), + Done, +} + +pub(crate) struct GroupByLimitAggregateStream { + schema: SchemaRef, + input: SendableRecordBatchStream, + aggregate_arguments: Vec>>, + filter_expressions: Vec>>, + group_by: PhysicalGroupBy, + batch_size: usize, + exec_state: ExecutionState, + input_done: bool, + accumulators: Vec>, + group_values: Box, + current_group_indices: Vec, + k: usize, + factor: usize, + order: Vec<(usize, SortOptions)>, +} + +impl GroupByLimitAggregateStream { + pub fn new( + agg: &GroupByLimitAggregateExec, + context: Arc, + partition: usize, + ) -> DFResult { + let agg_schema = Arc::clone(&agg.schema()); + let agg_group_by = agg.group_expr().clone(); + let agg_filter_expr = agg.filter_expr().to_vec(); + + let batch_size = context.session_config().batch_size(); + let input = agg.input().execute(partition, Arc::clone(&context))?; + + let aggregate_arguments = aggregate_expressions(agg.aggr_expr())?; + + let accumulators: Vec<_> = agg + .aggr_expr() + .iter() + .map(create_group_accumulator) + .collect::>()?; + + let group_schema = agg_group_by.group_schema(&agg.input().schema())?; + let group_values = new_group_values(group_schema, &GroupOrdering::None)?; + + Ok(GroupByLimitAggregateStream { + schema: agg_schema, + input, + aggregate_arguments, + filter_expressions: agg_filter_expr, + group_by: agg_group_by, + batch_size, + exec_state: ExecutionState::ReadingInput, + input_done: false, + accumulators, + group_values, + current_group_indices: Vec::with_capacity(batch_size), + k: agg.k(), + factor: agg.factor(), + order: agg.order().to_vec(), + }) + } +} + +impl Stream for GroupByLimitAggregateStream { + type Item = DFResult; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + match &self.exec_state { + ExecutionState::ReadingInput => match ready!(self.input.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + if let Err(e) = self.group_aggregate_batch(batch) { + return Poll::Ready(Some(Err(e))); + } + } + Some(Err(e)) => return Poll::Ready(Some(Err(e))), + // Input exhausted: emit the whole group table at once, then trim to top-k. + None => { + self.input_done = true; + match self.emit_all_trimmed() { + Ok(Some(batch)) => { + self.exec_state = ExecutionState::ProducingOutput(batch) + } + Ok(None) => self.exec_state = ExecutionState::Done, + Err(e) => return Poll::Ready(Some(Err(e))), + } + } + }, + + ExecutionState::ProducingOutput(batch) => { + let batch = batch.clone(); + let size = self.batch_size; + let (next_state, output) = if batch.num_rows() <= size { + (ExecutionState::Done, batch) + } else { + let remaining = batch.slice(size, batch.num_rows() - size); + let output = batch.slice(0, size); + (ExecutionState::ProducingOutput(remaining), output) + }; + self.exec_state = next_state; + return Poll::Ready(Some(Ok(output))); + } + + ExecutionState::Done => return Poll::Ready(None), + } + } + } +} + +impl RecordBatchStream for GroupByLimitAggregateStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +impl GroupByLimitAggregateStream { + fn group_aggregate_batch(&mut self, batch: RecordBatch) -> DFResult<()> { + let group_by_values = evaluate_group_by(&self.group_by, &batch)?; + let input_values = evaluate_many(&self.aggregate_arguments, &batch)?; + let filter_values = evaluate_optional(&self.filter_expressions, &batch)?; + + // Grouping sets are rejected by try_new_from_partial, so there is exactly one group value. + debug_assert_eq!(group_by_values.len(), 1, "Exactly 1 group value required"); + self.group_values + .intern(&group_by_values[0], &mut self.current_group_indices)?; + let group_indices = &self.current_group_indices; + let total_num_groups = self.group_values.len(); + + for ((acc, values), opt_filter) in self + .accumulators + .iter_mut() + .zip(input_values.iter()) + .zip(filter_values.iter()) + { + let opt_filter = opt_filter.as_ref().map(|filter| filter.as_boolean()); + acc.update_batch(values, group_indices, opt_filter, total_num_groups)?; + } + Ok(()) + } + + /// Build the partial-state batch for all groups, then keep only the `k` smallest by the total + /// order when the number of groups exceeds `factor * k`. + fn emit_all_trimmed(&mut self) -> DFResult> { + if self.group_values.is_empty() { + return Ok(None); + } + let mut columns = self.group_values.emit(EmitTo::All)?; + for acc in &mut self.accumulators { + columns.extend(acc.state(EmitTo::All)?); + } + let batch = RecordBatch::try_new(Arc::clone(&self.schema), columns)?; + Ok(Some(self.trim_top_k(batch)?)) + } + + fn trim_top_k(&self, batch: RecordBatch) -> DFResult { + let g = batch.num_rows(); + // `factor == 0` means "no trimming" (mirrors the guard in `statistics`); the exec is only + // built with `factor > 0` today, so this is defensive. + if self.k == 0 || self.factor == 0 || g <= self.factor.saturating_mul(self.k) { + return Ok(batch); + } + let sort_columns: Vec = self + .order + .iter() + .map(|(idx, options)| SortColumn { + values: Arc::clone(batch.column(*idx)), + options: Some(*options), + }) + .collect(); + let indices = lexsort_to_indices(&sort_columns, Some(self.k))?; + let columns = batch + .columns() + .iter() + .map(|c| take(c.as_ref(), &indices, None)) + .collect::, _>>()?; + Ok(RecordBatch::try_new(batch.schema(), columns)?) + } +} + +/// Partial-aggregate argument expressions, one vec per aggregate. Mirrors DataFusion's private +/// `aggregate_expressions` for `AggregateMode::Partial` only — the Final-mode column offset that +/// DataFusion's version takes is not needed here, so it is omitted. +fn aggregate_expressions( + aggr_expr: &[Arc], +) -> DFResult>>> { + Ok(aggr_expr + .iter() + .map(|agg| { + let mut result = agg.expressions(); + if let Some(ordering_req) = agg.order_bys() { + result.extend(ordering_req.iter().map(|item| Arc::clone(&item.expr))); + } + result + }) + .collect()) +} + +fn create_group_accumulator( + agg_expr: &Arc, +) -> DFResult> { + if agg_expr.groups_accumulator_supported() { + agg_expr.create_groups_accumulator() + } else { + let agg_expr_captured = Arc::clone(agg_expr); + let factory = move || agg_expr_captured.create_accumulator(); + Ok(Box::new(GroupsAccumulatorAdapter::new(factory))) + } +} + +fn evaluate(expr: &[Arc], batch: &RecordBatch) -> DFResult> { + expr.iter() + .map(|expr| { + expr.evaluate(batch) + .and_then(|v| v.into_array(batch.num_rows())) + }) + .collect() +} + +fn evaluate_many( + expr: &[Vec>], + batch: &RecordBatch, +) -> DFResult>> { + expr.iter().map(|expr| evaluate(expr, batch)).collect() +} + +fn evaluate_optional( + expr: &[Option>], + batch: &RecordBatch, +) -> DFResult>> { + expr.iter() + .map(|expr| { + expr.as_ref() + .map(|expr| { + expr.evaluate(batch) + .and_then(|v| v.into_array(batch.num_rows())) + }) + .transpose() + }) + .collect() +} + +fn evaluate_group_by( + group_by: &PhysicalGroupBy, + batch: &RecordBatch, +) -> DFResult>> { + let exprs: Vec = group_by + .expr() + .iter() + .map(|(expr, _)| { + let value = expr.evaluate(batch)?; + value.into_array(batch.num_rows()) + }) + .collect::>>()?; + + if !group_by.is_single() { + return internal_err!("GroupByLimitAggregate does not support grouping sets"); + } + + Ok(vec![exprs]) +} diff --git a/rust/cubestore/cubestore/src/queryplanner/group_by_limit_aggregate/mod.rs b/rust/cubestore/cubestore/src/queryplanner/group_by_limit_aggregate/mod.rs new file mode 100644 index 0000000000000..f53c07c91cad8 --- /dev/null +++ b/rust/cubestore/cubestore/src/queryplanner/group_by_limit_aggregate/mod.rs @@ -0,0 +1,735 @@ +mod group_by_limit_aggregate_stream; + +use datafusion::arrow::compute::SortOptions; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::common::stats::Precision; +use datafusion::common::Statistics; +use datafusion::error::Result as DFResult; +use datafusion::execution::TaskContext; +use datafusion::physical_expr::aggregate::AggregateFunctionExpr; +use datafusion::physical_expr::{Distribution, LexRequirement}; +use datafusion::physical_plan::execution_plan::CardinalityEffect; +use datafusion::physical_plan::metrics::MetricsSet; +use datafusion::physical_plan::{aggregates::*, InputOrderMode}; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PhysicalExpr, + PlanProperties, SendableRecordBatchStream, +}; +use std::any::Any; +use std::fmt::Debug; +use std::sync::Arc; + +/// Worker-side partial hash aggregate that trims its output to the top-k groups by a total order, +/// so far fewer partial-state rows cross the network to the router's Final aggregate. +/// +/// This is a custom copy of DataFusion's partial hash aggregate (it reuses DF's `GroupValues` and +/// `GroupsAccumulator` building blocks but owns the consume/emit loop), so the only change required +/// in the DataFusion fork is making `new_group_values` public. The aggregation builds the whole +/// group table and, at the single final emit, keeps only the `k` smallest groups by `order` when +/// the number of groups exceeds `factor * k`; otherwise it emits all groups unchanged. +/// +/// `order` is a TOTAL order over groups (ORDER BY columns followed by the remaining group-by +/// columns), expressed as `(partial-output column index, sort options)`. A total order is required +/// for correctness: the same group key can live on multiple workers, and a consistent cut across +/// workers guarantees every partial state the router selects reaches it. +#[derive(Debug, Clone)] +pub struct GroupByLimitAggregateExec { + group_by: PhysicalGroupBy, + aggr_expr: Vec>, + filter_expr: Vec>>, + pub input: Arc, + /// Partial-aggregate output schema (group columns followed by accumulator state columns). + schema: SchemaRef, + input_schema: SchemaRef, + cache: PlanProperties, + /// Fetch count, `k = limit + offset`. + k: usize, + /// Only trim when the number of local groups exceeds `factor * k`. + factor: usize, + /// Total order over the partial output columns. + order: Vec<(usize, SortOptions)>, +} + +impl GroupByLimitAggregateExec { + /// Build a `GroupByLimitAggregateExec` from a partial hash `AggregateExec`, or `None` if it is not a + /// single-group-by partial aggregate (grouping sets and non-partial modes are not supported). + pub fn try_new_from_partial( + aggregate: &AggregateExec, + k: usize, + factor: usize, + order: Vec<(usize, SortOptions)>, + ) -> Option { + if *aggregate.mode() != AggregateMode::Partial { + return None; + } + // Sorted-prefix aggregates are handled by InlineAggregateExec; this targets the hash path. + if matches!(aggregate.input_order_mode(), InputOrderMode::Sorted) { + return None; + } + let group_by = aggregate.group_expr().clone(); + if !group_by.is_single() { + return None; + } + // A global aggregate (no GROUP BY) has zero group columns. `GroupValues` can't be built over + // an empty schema -- `intern` indexes column 0 and panics -- and such aggregates need no + // trimming, so leave them to DataFusion. + if group_by.expr().is_empty() { + return None; + } + let input = aggregate.input().clone(); + // A partial aggregate preserves its input's partitioning (it runs once per input partition). + // Derive the output partitioning from the input rather than copying the wrapped aggregate's + // cached value, which can be stale: a later pass may swap our input for one with a different + // partition count via `with_new_children` without the cache following, and a too-low count + // makes the parent coalesce read only some partitions and silently drop the rest. + let cache = aggregate + .cache() + .clone() + .with_partitioning(input.output_partitioning().clone()); + Some(Self { + group_by, + aggr_expr: aggregate.aggr_expr().to_vec(), + filter_expr: aggregate.filter_expr().to_vec(), + input, + schema: aggregate.schema().clone(), + input_schema: aggregate.input_schema().clone(), + cache, + k, + factor, + order, + }) + } + + pub fn k(&self) -> usize { + self.k + } + + pub fn factor(&self) -> usize { + self.factor + } + + pub fn order(&self) -> &[(usize, SortOptions)] { + &self.order + } + + pub fn aggr_expr(&self) -> &[Arc] { + &self.aggr_expr + } + + pub fn filter_expr(&self) -> &[Option>] { + &self.filter_expr + } + + pub fn input(&self) -> &Arc { + &self.input + } + + pub fn group_expr(&self) -> &PhysicalGroupBy { + &self.group_by + } +} + +impl DisplayAs for GroupByLimitAggregateExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "GroupByLimitAggregateExec: k={}, factor={}, order={:?}", + self.k, self.factor, self.order + )?; + } + } + Ok(()) + } +} + +impl ExecutionPlan for GroupByLimitAggregateExec { + fn name(&self) -> &'static str { + "GroupByLimitAggregateExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn required_input_distribution(&self) -> Vec { + vec![Distribution::UnspecifiedDistribution] + } + + fn required_input_ordering(&self) -> Vec> { + vec![None] + } + + fn maintains_input_order(&self) -> Vec { + vec![false] + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> DFResult> { + let input = children[0].clone(); + // Track the (possibly changed) input's partitioning; a partial aggregate preserves it. + let cache = self + .cache + .clone() + .with_partitioning(input.output_partitioning().clone()); + Ok(Arc::new(Self { + group_by: self.group_by.clone(), + aggr_expr: self.aggr_expr.clone(), + filter_expr: self.filter_expr.clone(), + input, + schema: self.schema.clone(), + input_schema: self.input_schema.clone(), + cache, + k: self.k, + factor: self.factor, + order: self.order.clone(), + })) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DFResult { + let stream = group_by_limit_aggregate_stream::GroupByLimitAggregateStream::new( + self, context, partition, + )?; + Ok(Box::pin(stream)) + } + + fn metrics(&self) -> Option { + None + } + + fn statistics(&self) -> DFResult { + // The trim keeps at most `factor * k` groups per output partition, so the output is bounded + // by that and by the input row count. Report it (inexact) instead of Absent, which makes + // downstream planners bail. `factor` is always > 0 here (the rewriter only builds this exec + // when trimming is enabled), but guard anyway. + let input_rows = self.input.statistics()?.num_rows; + let num_rows = if self.factor == 0 { + input_rows + } else { + let parts = self.cache.output_partitioning().partition_count().max(1); + let cap = self.factor.saturating_mul(self.k).saturating_mul(parts); + match input_rows { + Precision::Exact(n) | Precision::Inexact(n) => Precision::Inexact(n.min(cap)), + Precision::Absent => Precision::Inexact(cap), + } + }; + Ok(Statistics { + num_rows, + column_statistics: Statistics::unknown_column(&self.schema), + total_byte_size: Precision::Absent, + }) + } + + fn cardinality_effect(&self) -> CardinalityEffect { + CardinalityEffect::LowerEqual + } +} + +#[cfg(test)] +mod tests { + use super::*; + use datafusion::arrow::array::{Array, Int64Array, RecordBatch, StringArray}; + use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use datafusion::functions_aggregate::sum::sum_udaf; + use datafusion::physical_expr::aggregate::AggregateExprBuilder; + use datafusion::physical_expr::expressions::col; + use datafusion::physical_plan::{collect, ExecutionPlanProperties}; + use datafusion_datasource::memory::MemorySourceConfig; + use datafusion_datasource::source::DataSourceExec; + use std::collections::HashSet; + + fn input_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("g0", DataType::Utf8, true), + Field::new("g1", DataType::Int64, true), + Field::new("v", DataType::Int64, false), + ])) + } + + /// `n` distinct (g0, g1) combos, each emitted twice in two different batches; every 7th combo + /// uses a NULL g1 to exercise null grouping. `v = 1` for every row, so the correct result is `n` + /// groups with `sum(v) = 2` each. + fn make_batches(n: usize, batch_rows: usize) -> Vec { + let schema = input_schema(); + let mut g0: Vec> = Vec::new(); + let mut g1: Vec> = Vec::new(); + for pass in 0..2 { + // Reverse one pass so the same combo lands in a different batch than the other pass. + let iter: Box> = if pass == 0 { + Box::new(0..n) + } else { + Box::new((0..n).rev()) + }; + for i in iter { + // g0 is unique per combo, so each (g0, g1) pair is distinct regardless of g1; g1 + // carries NULLs to exercise null grouping in the multi-column GroupValues path. + g0.push(Some(format!("a{}", i))); + g1.push(if i % 7 == 0 { + None + } else { + Some((i % 100) as i64) + }); + } + } + let total = g0.len(); + let mut batches = Vec::new(); + let mut start = 0; + while start < total { + let end = (start + batch_rows).min(total); + let s = StringArray::from(g0[start..end].to_vec()); + let l = Int64Array::from(g1[start..end].to_vec()); + let v = Int64Array::from(vec![1i64; end - start]); + batches.push( + RecordBatch::try_new(schema.clone(), vec![Arc::new(s), Arc::new(l), Arc::new(v)]) + .unwrap(), + ); + start = end; + } + batches + } + + fn partial_aggregate(input: Arc) -> AggregateExec { + let schema = input.schema(); + let group_by = PhysicalGroupBy::new_single(vec![ + (col("g0", &schema).unwrap(), "g0".to_string()), + (col("g1", &schema).unwrap(), "g1".to_string()), + ]); + let sum = AggregateExprBuilder::new(sum_udaf(), vec![col("v", &schema).unwrap()]) + .schema(schema.clone()) + .alias("sum_v") + .build() + .unwrap(); + AggregateExec::try_new( + AggregateMode::Partial, + group_by, + vec![Arc::new(sum)], + vec![None], + input, + schema, + ) + .unwrap() + } + + fn distinct_group_rows(batches: &[RecordBatch]) -> usize { + let mut seen: HashSet<(Option, Option)> = HashSet::new(); + for b in batches { + let g0 = b.column(0).as_any().downcast_ref::().unwrap(); + let g1 = b.column(1).as_any().downcast_ref::().unwrap(); + for r in 0..b.num_rows() { + let k0 = if g0.is_null(r) { + None + } else { + Some(g0.value(r).to_string()) + }; + let k1 = if g1.is_null(r) { + None + } else { + Some(g1.value(r)) + }; + seen.insert((k0, k1)); + } + } + seen.len() + } + + /// The no-trim (`k = 0`) partial path must group exactly like DataFusion's stock partial: every + /// distinct group present in the input must appear exactly once in the partial output, across + /// many input batches and an output larger than `batch_size`. Reproduces the multi-batch + /// undercount seen when routing the full hash aggregate through this exec. + #[tokio::test] + async fn no_trim_partial_emits_every_group() { + let n = 20_000; + let batches = make_batches(n, 4096); + let expected_groups = distinct_group_rows(&batches); + assert_eq!(expected_groups, n, "test setup: combos must be distinct"); + + let source = MemorySourceConfig::try_new(&vec![batches], input_schema(), None).unwrap(); + let input: Arc = Arc::new(DataSourceExec::new(Arc::new(source))); + let partial = partial_aggregate(input); + + let exec = GroupByLimitAggregateExec::try_new_from_partial(&partial, 0, 0, Vec::new()) + .expect("partial hash aggregate should be wrappable"); + + let ctx = Arc::new(TaskContext::default()); + let out = collect(Arc::new(exec), ctx).await.unwrap(); + + let total_rows: usize = out.iter().map(|b| b.num_rows()).sum(); + let distinct = distinct_group_rows(&out); + assert_eq!( + distinct, n, + "exec emitted {distinct} distinct groups, expected {n}" + ); + assert_eq!( + total_rows, n, + "exec emitted {total_rows} partial rows for {n} distinct groups (duplicate group keys in partial output)" + ); + } + + /// Our partial output fed into DataFusion's `Final` aggregate (as the router does) must still + /// yield every group. Reproduces the distributed undercount end-to-end without the cluster. + #[tokio::test] + async fn no_trim_partial_then_final_emits_every_group() { + let n = 20_000; + let batches = make_batches(n, 4096); + + let source = MemorySourceConfig::try_new(&vec![batches], input_schema(), None).unwrap(); + let input: Arc = Arc::new(DataSourceExec::new(Arc::new(source))); + let partial = partial_aggregate(input); + let aggr_expr = partial.aggr_expr().to_vec(); + + let exec: Arc = Arc::new( + GroupByLimitAggregateExec::try_new_from_partial(&partial, 0, 0, Vec::new()).unwrap(), + ); + + let partial_schema = exec.schema(); + let final_group_by = PhysicalGroupBy::new_single(vec![ + (col("g0", &partial_schema).unwrap(), "g0".to_string()), + (col("g1", &partial_schema).unwrap(), "g1".to_string()), + ]); + let final_agg = AggregateExec::try_new( + AggregateMode::Final, + final_group_by, + aggr_expr, + vec![None], + exec, + partial_schema, + ) + .unwrap(); + + let ctx = Arc::new(TaskContext::default()); + let out = collect(Arc::new(final_agg), ctx).await.unwrap(); + + let total_rows: usize = out.iter().map(|b| b.num_rows()).sum(); + let distinct = distinct_group_rows(&out); + assert_eq!( + distinct, n, + "final emitted {distinct} distinct groups, expected {n}" + ); + assert_eq!( + total_rows, n, + "final emitted {total_rows} rows, expected {n}" + ); + } + + /// A partial aggregate preserves its input's partitioning. If our exec keeps a stale + /// single-partition `cache` after `with_new_children` swaps in a multi-partition input, the + /// parent `CoalescePartitions` executes only partition 0 and silently drops the rest -- the + /// distributed undercount we hit on real data. Build the exec over a 1-partition input (so its + /// cache says 1), re-child it onto a 3-partition input, and require both the reported + /// partitioning and the aggregated rows to reflect all 3 partitions. + #[tokio::test] + async fn output_partitioning_follows_rechilded_input() { + use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; + + let n = 5_000; + let one_part = + MemorySourceConfig::try_new(&vec![make_batches(n, 4096)], input_schema(), None) + .unwrap(); + let one_input: Arc = Arc::new(DataSourceExec::new(Arc::new(one_part))); + let partial = partial_aggregate(one_input); + let aggr_expr = partial.aggr_expr().to_vec(); + let exec: Arc = Arc::new( + GroupByLimitAggregateExec::try_new_from_partial(&partial, 0, 0, Vec::new()).unwrap(), + ); + assert_eq!(exec.output_partitioning().partition_count(), 1); + + // Re-child onto a 3-partition input (same schema), as a later physical-plan pass would. + let three_parts: Vec> = (0..3).map(|_| make_batches(n, 4096)).collect(); + let three = MemorySourceConfig::try_new(&three_parts, input_schema(), None).unwrap(); + let three_input: Arc = Arc::new(DataSourceExec::new(Arc::new(three))); + let exec3 = exec.with_new_children(vec![three_input]).unwrap(); + assert_eq!( + exec3.output_partitioning().partition_count(), + 3, + "exec must report its re-childed input's partition count, not a stale 1" + ); + + // End-to-end: coalesce + final must see every partition's rows. + let coalesced: Arc = Arc::new(CoalescePartitionsExec::new(exec3)); + let pschema = coalesced.schema(); + let final_gb = PhysicalGroupBy::new_single(vec![ + (col("g0", &pschema).unwrap(), "g0".to_string()), + (col("g1", &pschema).unwrap(), "g1".to_string()), + ]); + let final_agg = AggregateExec::try_new( + AggregateMode::Final, + final_gb, + aggr_expr, + vec![None], + coalesced, + pschema, + ) + .unwrap(); + let out = collect(Arc::new(final_agg), Arc::new(TaskContext::default())) + .await + .unwrap(); + let distinct = distinct_group_rows(&out); + let total_v: i64 = out + .iter() + .map(|b| { + let s = b.column(2).as_any().downcast_ref::().unwrap(); + (0..s.len()).map(|i| s.value(i)).sum::() + }) + .sum(); + assert_eq!( + distinct, n, + "all {n} groups must survive across 3 partitions" + ); + // make_batches emits each combo twice; 3 partitions -> 6 rows per group. + assert_eq!( + total_v as usize, + 6 * n, + "all rows from all 3 partitions must be aggregated" + ); + } + + /// Real-data shape: a high-cardinality `Float64` group column (like `contrib`) at ~400k groups, + /// fed partial -> final. Floats route through the same multi-column `GroupValues` as strings, but + /// at this scale across many input batches; reproduces (or rules out) the on-cluster undercount. + #[tokio::test] + async fn no_trim_float_high_cardinality_partial_then_final() { + use datafusion::arrow::array::Float64Array; + + let n: usize = 410_000; + let batch_rows = 8192; + let schema = Arc::new(Schema::new(vec![ + Field::new("f", DataType::Float64, true), + Field::new("g", DataType::Utf8, true), + Field::new("v", DataType::Int64, false), + ])); + + // n distinct (f, g) combos, each twice (forward + reversed pass) so duplicates land in + // different batches; every 9th combo carries NULL f. + let mut f: Vec> = Vec::new(); + let mut g: Vec> = Vec::new(); + for pass in 0..2 { + let iter: Box> = if pass == 0 { + Box::new(0..n) + } else { + Box::new((0..n).rev()) + }; + for i in iter { + f.push(if i % 9 == 0 { + None + } else { + Some(i as f64 * 0.01) + }); + g.push(Some(format!("g{}", i))); + } + } + let total = f.len(); + let mut batches = Vec::new(); + let mut start = 0; + while start < total { + let end = (start + batch_rows).min(total); + batches.push( + RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Float64Array::from(f[start..end].to_vec())), + Arc::new(StringArray::from(g[start..end].to_vec())), + Arc::new(Int64Array::from(vec![1i64; end - start])), + ], + ) + .unwrap(), + ); + start = end; + } + + let source = MemorySourceConfig::try_new(&vec![batches], schema.clone(), None).unwrap(); + let input: Arc = Arc::new(DataSourceExec::new(Arc::new(source))); + let group_by = PhysicalGroupBy::new_single(vec![ + (col("f", &schema).unwrap(), "f".to_string()), + (col("g", &schema).unwrap(), "g".to_string()), + ]); + let sum = AggregateExprBuilder::new(sum_udaf(), vec![col("v", &schema).unwrap()]) + .schema(schema.clone()) + .alias("sum_v") + .build() + .unwrap(); + let partial = AggregateExec::try_new( + AggregateMode::Partial, + group_by, + vec![Arc::new(sum)], + vec![None], + input, + schema.clone(), + ) + .unwrap(); + let aggr_expr = partial.aggr_expr().to_vec(); + + let exec: Arc = Arc::new( + GroupByLimitAggregateExec::try_new_from_partial(&partial, 0, 0, Vec::new()).unwrap(), + ); + let partial_schema = exec.schema(); + let final_group_by = PhysicalGroupBy::new_single(vec![ + (col("f", &partial_schema).unwrap(), "f".to_string()), + (col("g", &partial_schema).unwrap(), "g".to_string()), + ]); + let final_agg = AggregateExec::try_new( + AggregateMode::Final, + final_group_by, + aggr_expr, + vec![None], + exec, + partial_schema, + ) + .unwrap(); + + let ctx = Arc::new(TaskContext::default()); + let out = collect(Arc::new(final_agg), ctx).await.unwrap(); + let total_rows: usize = out.iter().map(|b| b.num_rows()).sum(); + assert_eq!( + total_rows, n, + "final emitted {total_rows} rows, expected {n}" + ); + } + + /// The worker serializes each partial-output batch to the router with Arrow IPC + /// (`StreamWriter`). Our exec emits slices of one big `emit(All)` batch (offset > 0), so the IPC + /// roundtrip must preserve every group of every sliced batch. Reproduces the distributed + /// undercount, which the in-process tests miss because they never serialize. + #[tokio::test] + async fn ipc_roundtrip_of_sliced_output_preserves_every_group() { + use datafusion::arrow::ipc::reader::StreamReader; + use datafusion::arrow::ipc::writer::StreamWriter; + use std::io::Cursor; + + let n = 20_000; + let batches = make_batches(n, 4096); + let source = MemorySourceConfig::try_new(&vec![batches], input_schema(), None).unwrap(); + let input: Arc = Arc::new(DataSourceExec::new(Arc::new(source))); + let partial = partial_aggregate(input); + let exec: Arc = Arc::new( + GroupByLimitAggregateExec::try_new_from_partial(&partial, 0, 0, Vec::new()).unwrap(), + ); + let schema = exec.schema(); + + let ctx = Arc::new(TaskContext::default()); + let out = collect(Arc::clone(&exec), ctx).await.unwrap(); + assert!( + out.len() > 1, + "test must exercise multiple sliced output batches" + ); + + let mut roundtripped = Vec::new(); + for batch in &out { + let mut buf = Vec::new(); + { + let mut w = StreamWriter::try_new(Cursor::new(&mut buf), &schema).unwrap(); + w.write(batch).unwrap(); + w.finish().unwrap(); + } + let mut r = StreamReader::try_new(Cursor::new(buf), None).unwrap(); + roundtripped.push(r.next().unwrap().unwrap()); + } + + let distinct = distinct_group_rows(&roundtripped); + assert_eq!( + distinct, n, + "after IPC roundtrip of sliced batches, {distinct} distinct groups survived, expected {n}" + ); + } + + /// A global aggregate (no GROUP BY) has zero group columns; wrapping it would build a + /// `GroupValues` over an empty schema and panic on `intern`. `try_new_from_partial` must decline + /// it so the full hash routing leaves global aggregates to DataFusion. + #[tokio::test] + async fn global_aggregate_is_not_wrapped() { + let batches = make_batches(100, 64); + let source = MemorySourceConfig::try_new(&vec![batches], input_schema(), None).unwrap(); + let input: Arc = Arc::new(DataSourceExec::new(Arc::new(source))); + let schema = input.schema(); + + let sum = AggregateExprBuilder::new(sum_udaf(), vec![col("v", &schema).unwrap()]) + .schema(schema.clone()) + .alias("sum_v") + .build() + .unwrap(); + let global = AggregateExec::try_new( + AggregateMode::Partial, + PhysicalGroupBy::new_single(vec![]), + vec![Arc::new(sum)], + vec![None], + input, + schema, + ) + .unwrap(); + + assert!( + GroupByLimitAggregateExec::try_new_from_partial(&global, 0, 0, Vec::new()).is_none(), + "global aggregate (no group columns) must not be wrapped" + ); + } + + /// Mirror the distributed worker shape: a multi-partition scan, our exec running once per + /// partition, then `CoalescePartitions` and the `Final` aggregate. Every group must survive even + /// though the same key appears in several partitions' partial outputs. + #[tokio::test] + async fn no_trim_multi_partition_then_final_emits_every_group() { + use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; + + let n = 20_000; + // Same n combos in each of 4 partitions -> heavy cross-partition group overlap. + let partitions: Vec> = (0..4).map(|_| make_batches(n, 4096)).collect(); + + let source = MemorySourceConfig::try_new(&partitions, input_schema(), None).unwrap(); + let input: Arc = Arc::new(DataSourceExec::new(Arc::new(source))); + let partial = partial_aggregate(input); + let aggr_expr = partial.aggr_expr().to_vec(); + + let exec: Arc = Arc::new( + GroupByLimitAggregateExec::try_new_from_partial(&partial, 0, 0, Vec::new()).unwrap(), + ); + assert_eq!( + exec.output_partitioning().partition_count(), + 4, + "exec should preserve the input's partition count" + ); + let coalesced: Arc = Arc::new(CoalescePartitionsExec::new(exec)); + + let partial_schema = coalesced.schema(); + let final_group_by = PhysicalGroupBy::new_single(vec![ + (col("g0", &partial_schema).unwrap(), "g0".to_string()), + (col("g1", &partial_schema).unwrap(), "g1".to_string()), + ]); + let final_agg = AggregateExec::try_new( + AggregateMode::Final, + final_group_by, + aggr_expr, + vec![None], + coalesced, + partial_schema, + ) + .unwrap(); + + let ctx = Arc::new(TaskContext::default()); + let out = collect(Arc::new(final_agg), ctx).await.unwrap(); + + let total_rows: usize = out.iter().map(|b| b.num_rows()).sum(); + let distinct = distinct_group_rows(&out); + assert_eq!( + distinct, n, + "final emitted {distinct} distinct groups, expected {n}" + ); + assert_eq!( + total_rows, n, + "final emitted {total_rows} rows, expected {n}" + ); + } +} diff --git a/rust/cubestore/cubestore/src/queryplanner/mod.rs b/rust/cubestore/cubestore/src/queryplanner/mod.rs index c4d4742312e4d..c02929c32cf88 100644 --- a/rust/cubestore/cubestore/src/queryplanner/mod.rs +++ b/rust/cubestore/cubestore/src/queryplanner/mod.rs @@ -10,6 +10,7 @@ use datafusion_datasource::memory::MemorySourceConfig; use datafusion_datasource::source::DataSourceExec; pub use planning::PlanningMeta; mod check_memory; +mod group_by_limit_aggregate; pub mod physical_plan_flags; pub mod pretty_printers; mod projection_above_limit; diff --git a/rust/cubestore/cubestore/src/queryplanner/optimizations/distributed_partial_aggregate.rs b/rust/cubestore/cubestore/src/queryplanner/optimizations/distributed_partial_aggregate.rs index 5246b1878f132..a1dd9586a027b 100644 --- a/rust/cubestore/cubestore/src/queryplanner/optimizations/distributed_partial_aggregate.rs +++ b/rust/cubestore/cubestore/src/queryplanner/optimizations/distributed_partial_aggregate.rs @@ -1,5 +1,6 @@ use crate::cluster::WorkerPlanningParams; use crate::queryplanner::check_memory::CheckMemoryExec; +use crate::queryplanner::group_by_limit_aggregate::GroupByLimitAggregateExec; use crate::queryplanner::inline_aggregate::{InlineAggregateExec, InlineAggregateMode}; use crate::queryplanner::planning::WorkerExec; use crate::queryplanner::query_executor::ClusterSendExec; @@ -130,30 +131,31 @@ pub fn push_aggregate_to_workers( )?)) } -/// A global (no GROUP BY) aggregate doesn't use the input ordering, but the scan still merges -/// its partitions with a SortPreservingMergeExec when the index has a sort key (e.g. picked -/// from the filters for partition pruning). Replace such merges under the aggregate with plain -/// partition coalescing: the per-row key comparisons are pure waste there, and particularly -/// bad when the filters above make the merge keys constant, turning every comparison into a -/// full-length tie. +/// A Linear aggregate doesn't use input ordering, but the scan still merges its partitions with a +/// SortPreservingMergeExec when the index has a sort key (e.g. picked from the filters for partition +/// pruning). Replace such merges under the aggregate with plain partition coalescing: the per-row +/// key comparisons are pure waste there, and particularly bad when the leading sort columns are +/// constant (low-cardinality / NULL-heavy), turning every comparison into a full-length tie. /// -/// Restricted to aggregates without group expressions: those hold a single accumulator set per -/// partition even when later optimizations make them run per partition. A grouped hash -/// aggregate over unmerged partitions could end up with a hash table per partition, -/// multiplying its memory by the partition count. +/// Always applied to global (no GROUP BY) aggregates. A grouped hash aggregate is equally safe -- +/// CoalescePartitions still yields a single output partition, hence one hash table and no +/// per-partition memory blowup -- but it is gated behind `coalesce_grouped_hash` while the win is +/// validated. pub fn drop_sort_merge_under_global_aggregate( p: Arc, + coalesce_grouped_hash: bool, ) -> Result, DataFusionError> { let Some(agg) = p.as_any().downcast_ref::() else { return Ok(p); }; - if !matches!(agg.input_order_mode(), InputOrderMode::Linear) - || !agg.group_expr().expr().is_empty() - { + if !matches!(agg.input_order_mode(), InputOrderMode::Linear) { + return Ok(p); + } + if !agg.group_expr().expr().is_empty() && !coalesce_grouped_hash { return Ok(p); } - // Order-sensitive aggregates (first_value, array_agg(ORDER BY), ...) stay Linear with an - // empty GROUP BY but still need their input ordered + // Order-sensitive aggregates (first_value, array_agg(ORDER BY), ...) stay Linear but still + // require their input ordered -- leave their merge in place. if agg.required_input_ordering()[0].is_some() { return Ok(p); } @@ -286,13 +288,25 @@ pub fn push_sorted_partial_aggregate_below_merge( /// key) would drop partial states of tied groups. pub fn push_worker_sort_and_limit( p: Arc, + group_by_limit_factor: usize, + group_by_limit_per_partition: bool, ) -> Result, DataFusionError> { - // Worker side: wrap the partial aggregate with a per-partition bounded sort. + // Worker side: bound the partial aggregate's output. The sorted (inline) aggregate is always + // bounded with a per-partition Sort(fetch) -- the group count is unknown, so we can't trim, we + // just sort. The hash aggregate uses the trimming top-k, engaged only when factor > 0. + // `resort_worker_subtree` returns None when nothing applies (hash with factor == 0), leaving the + // plan as planned. if let Some(w) = p.as_any().downcast_ref::() { let Some((cols, fetch)) = w.worker_sort_and_limit.clone() else { return Ok(p); }; - let Some(new_input) = resort_worker_subtree(&w.input, &cols, fetch) else { + let Some(new_input) = resort_worker_subtree( + &w.input, + &cols, + fetch, + group_by_limit_factor, + group_by_limit_per_partition, + ) else { return Ok(p); }; return Ok(Arc::new(WorkerExec::new( @@ -307,10 +321,8 @@ pub fn push_worker_sort_and_limit( ))); } - // Router side: rebuild the final aggregate over a sort-preserving merge in `worker_order`, and - // reorder the (optimization-only) worker subtree to match. Same keys are adjacent in - // `worker_order`, so the sorted final combines them; its output is `worker_order`-sorted (whose - // prefix is the query's ORDER BY), so the limit above stays correct. + // Router side: combine the workers' top-k with a hash final aggregate, then re-apply the top-k + // sort by `worker_order` (the total order T). let Some(final_agg) = FinalAggregateInfo::extract(&p) else { return Ok(p); }; @@ -324,23 +336,57 @@ pub fn push_worker_sort_and_limit( let Some((cols, fetch)) = cs.worker_sort_and_limit.clone() else { return Ok(p); }; - let Some(new_worker_subtree) = resort_worker_subtree(&cs.input_for_optimizations, &cols, fetch) - else { + // The hash aggregate emits unordered trimmed groups (combined by a hash final + re-sort); the + // sorted/inline aggregate emits bounded sorted streams (combined by a sort-preserving merge + + // sorted final). Decide from the worker's partial aggregate before rebuilding. + let is_hash = locate_partial_aggregate(&cs.input_for_optimizations) + .map_or(false, |partial| partial.as_any().is::()); + let Some(new_worker_subtree) = resort_worker_subtree( + &cs.input_for_optimizations, + &cols, + fetch, + group_by_limit_factor, + group_by_limit_per_partition, + ) else { return Ok(p); }; let new_cs: Arc = Arc::new(cs.with_changed_schema(new_worker_subtree, cs.required_input_ordering.clone())); let worker_order = worker_ordering(&final_agg.group_expr, &cols)?; - let merged: Arc = - Arc::new(SortPreservingMergeExec::new(worker_order, new_cs)); - Ok(Arc::new(AggregateExec::try_new( - AggregateMode::Final, - final_agg.group_expr, - final_agg.aggr_expr, - final_agg.filter_expr, - merged, - final_agg.input_schema, - )?)) + + if is_hash { + // Hash final over coalesced (unordered) streams, then re-apply the top-k sort by the total + // order T. The Sort(fetch) is required even for a bare LIMIT: it keeps the k smallest by T -- + // exactly the groups every worker kept and fully combined here -- where a plain limit could + // take a group only one worker kept (undercounted). The coalesce drains the workers in + // parallel. + let coalesced: Arc = Arc::new(CoalescePartitionsExec::new(new_cs)); + let final_hash: Arc = Arc::new(AggregateExec::try_new( + AggregateMode::Final, + final_agg.group_expr, + final_agg.aggr_expr, + final_agg.filter_expr, + coalesced, + final_agg.input_schema, + )?); + Ok(Arc::new( + SortExec::new(worker_order, final_hash).with_fetch(Some(fetch)), + )) + } else { + // Sorted final over a sort-preserving merge in worker_order: equal keys are adjacent so the + // sorted final combines them, and its output stays worker_order-sorted (whose prefix is the + // query's ORDER BY), so the query's limit above stays correct -- no extra Sort needed. + let merged: Arc = + Arc::new(SortPreservingMergeExec::new(worker_order, new_cs)); + Ok(Arc::new(AggregateExec::try_new( + AggregateMode::Final, + final_agg.group_expr, + final_agg.aggr_expr, + final_agg.filter_expr, + merged, + final_agg.input_schema, + )?)) + } } /// Builds the `worker_order` LexOrdering over an aggregate's group columns from the descriptor. @@ -365,33 +411,94 @@ fn worker_ordering( Ok(LexOrdering::new(exprs)) } -/// Rebuilds a worker subtree as `SortPreservingMerge(worker_order) <- Sort(worker_order, fetch, per -/// partition) <- partial`. Returns `None` for an unrecognized or already-rewritten subtree, which -/// keeps [push_worker_sort_and_limit] idempotent. +/// Rebuilds a worker subtree to bound its output to the top `fetch` groups by the total order in +/// `cols`. Two shapes, by partial aggregate kind: +/// - hash (`AggregateExec`): `CoalescePartitions <- GroupByLimitAggregate` -- trim during +/// aggregation, emitted unsorted for the router's hash final. Only when `factor > 0`; returns +/// `None` otherwise (trimming disabled, leave the plan as planned). +/// - sorted/inline: `SortPreservingMerge(T) <- Sort(T, fetch, per partition) <- PartialAggregate` -- +/// we can't trim a sorted aggregate and don't know the group count, so always bound with a sort. /// -/// The per-partition `Sort` does the bounding (a bounded heap, O(fetch) memory); the merge above it -/// carries no fetch. Because this pass runs last, `replace_suboptimal_merge_sorts` has already run -/// and won't push the query's row limit into the merge -- which would cut the merged stream of -/// (still uncombined) partial rows by rows and undercount groups split across partitions. +/// Returns `None` for an unrecognized subtree (no locatable partial aggregate). fn resort_worker_subtree( worker_subtree: &Arc, cols: &[(usize, bool, bool)], fetch: usize, + group_by_limit_factor: usize, + group_by_limit_per_partition: bool, ) -> Option> { let partial = locate_partial_aggregate(worker_subtree)?; - let schema = partial.schema(); - let mut exprs = Vec::with_capacity(cols.len()); - for (idx, asc, nulls_first) in cols { - let field = schema.fields().get(*idx)?; - exprs.push(PhysicalSortExpr { - expr: Arc::new(Column::new(field.name(), *idx)), - options: SortOptions { - descending: !asc, - nulls_first: *nulls_first, + + // Hash path: trim during aggregation, emit unsorted for the router's hash final. The factor + // gates whether trimming applies; with it off, nothing applies on this path. + if partial.as_any().is::() { + if group_by_limit_factor == 0 { + return None; + } + + // CUBESTORE_GROUP_BY_LIMIT_PER_PARTITION decides where the worker's hash table lives: + // - off (default): coalesce the aggregate's input to a single partition, so the worker + // builds ONE hash table over the merged partitions ("over merge"). Peak memory ~k, no + // intra-worker parallelism on the aggregation. + // - on: keep the raw multi-partition input, so the aggregate runs once per partition + // ("under merge"): N parallel hash tables -- more parallelism, peak ~N*k. The trim keeps + // each per-partition output to ~factor*k, and the global top-k argument (full group key) + // guarantees a surviving group stays within every partition's local top-k. + let partial = if group_by_limit_per_partition { + partial + } else { + partial + .as_any() + .downcast_ref::() + .and_then(|agg| { + if agg.input().output_partitioning().partition_count() <= 1 { + return None; + } + let merged: Arc = + Arc::new(CoalescePartitionsExec::new(agg.input().clone())); + partial.clone().with_new_children(vec![merged]).ok() + }) + .unwrap_or(partial) + }; + + let order: Vec<(usize, SortOptions)> = cols + .iter() + .map(|(idx, asc, nulls_first)| { + ( + *idx, + SortOptions { + descending: !asc, + nulls_first: *nulls_first, + }, + ) + }) + .collect(); + let trimmed: Arc = match partial.as_any().downcast_ref::() + { + Some(agg) => match GroupByLimitAggregateExec::try_new_from_partial( + agg, + fetch, + group_by_limit_factor, + order, + ) { + Some(e) => Arc::new(e), + None => partial, }, - }); + None => partial, + }; + + // Emit the trimmed top-k unsorted, coalesced to one stream for the ClusterSend. The router + // hash-combines and re-applies the top-k sort, so no worker-side sort/merge holds the whole + // result. Under "over merge" the trimmed agg is already single-partition and this coalesce is + // a passthrough; under "under merge" it drains the per-partition aggregates concurrently. + return Some(Arc::new(CoalescePartitionsExec::new(trimmed))); } - let worker_order = LexOrdering::new(exprs); + + // Sorted/inline path: bound each partition with Sort(fetch) and merge in the total order. The + // per-partition `fetch` is sound because the key is the full group key: a globally top-`fetch` + // group stays within every partition's first `fetch`, so the router's sorted final still sees + // all its partial states (see this function's doc and the module note on the total order). + let worker_order = lex_ordering_from_cols(cols, &partial.schema())?; let per_partition_sort: Arc = Arc::new( SortExec::new(worker_order.clone(), partial) .with_fetch(Some(fetch)) @@ -403,6 +510,26 @@ fn resort_worker_subtree( ))) } +/// Build a `LexOrdering` over the partial aggregate's group columns from the descriptor (indices +/// into the partial output schema). Returns `None` if a column index is out of range. +fn lex_ordering_from_cols( + cols: &[(usize, bool, bool)], + schema: &datafusion::arrow::datatypes::SchemaRef, +) -> Option { + let mut exprs = Vec::with_capacity(cols.len()); + for (idx, asc, nulls_first) in cols { + let field = schema.fields().get(*idx)?; + exprs.push(PhysicalSortExpr { + expr: Arc::new(Column::new(field.name(), *idx)), + options: SortOptions { + descending: !asc, + nulls_first: *nulls_first, + }, + }); + } + Some(LexOrdering::new(exprs)) +} + /// The group/aggregate state of either an `InlineAggregateExec` or a plain `AggregateExec`, when in /// Final mode. struct FinalAggregateInfo { @@ -1310,7 +1437,7 @@ mod tests { ); let original = global_sum_aggregate(filter); - let rewritten = drop_sort_merge_under_global_aggregate(original.clone()).unwrap(); + let rewritten = drop_sort_merge_under_global_aggregate(original.clone(), false).unwrap(); let agg = rewritten.as_any().downcast_ref::().unwrap(); let filter = agg @@ -1365,7 +1492,7 @@ mod tests { .required_input_ordering()[0] .is_some()); - let rewritten = drop_sort_merge_under_global_aggregate(original.clone()).unwrap(); + let rewritten = drop_sort_merge_under_global_aggregate(original.clone(), false).unwrap(); assert!(Arc::ptr_eq(&rewritten, &original)); } @@ -1398,10 +1525,41 @@ mod tests { InputOrderMode::Linear )); - let rewritten = drop_sort_merge_under_global_aggregate(original.clone()).unwrap(); + let rewritten = drop_sort_merge_under_global_aggregate(original.clone(), false).unwrap(); assert!(Arc::ptr_eq(&rewritten, &original)); } + /// With the flag on, the same grouped hash aggregate's under-scan merge is replaced by plain + /// partition coalescing (single output partition -> one hash table), dropping the useless sort. + #[test] + fn coalesces_sort_merge_under_grouped_hash_aggregate_when_enabled() { + let schema = Arc::new(Schema::new(vec![ + Field::new("k", DataType::Int64, false), + Field::new("g", DataType::Int64, false), + Field::new("v", DataType::Int64, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![1, 2])), + Arc::new(Int64Array::from(vec![5, 4])), + Arc::new(Int64Array::from(vec![10, 20])), + ], + ) + .unwrap(); + let source = sorted_source(&schema, vec![vec![batch.clone()], vec![batch]]); + let original = sum_aggregate(AggregateMode::Partial, "g", merge_by_k(source)); + + let rewritten = drop_sort_merge_under_global_aggregate(original.clone(), true).unwrap(); + // The aggregate stays grouped, but its input merge becomes a plain CoalescePartitionsExec + // (still a single output partition -> one hash table). + assert!(!Arc::ptr_eq(&rewritten, &original)); + assert!(rewritten.as_any().is::()); + assert!(rewritten.children()[0] + .as_any() + .is::()); + } + fn global_sum_aggregate(input: Arc) -> Arc { let schema = input.schema(); let sum = AggregateExprBuilder::new(sum_udaf(), vec![col("v", &schema).unwrap()]) @@ -1446,7 +1604,7 @@ mod tests { let source = two_partition_source(&schema); let original = sum_aggregate(AggregateMode::Partial, "k", merge_by_k(source)); - let rewritten = drop_sort_merge_under_global_aggregate(original.clone()).unwrap(); + let rewritten = drop_sort_merge_under_global_aggregate(original.clone(), false).unwrap(); assert!(Arc::ptr_eq(&rewritten, &original)); } diff --git a/rust/cubestore/cubestore/src/queryplanner/optimizations/group_by_limit_rewriter.rs b/rust/cubestore/cubestore/src/queryplanner/optimizations/group_by_limit_rewriter.rs new file mode 100644 index 0000000000000..c2ebe2a667b47 --- /dev/null +++ b/rust/cubestore/cubestore/src/queryplanner/optimizations/group_by_limit_rewriter.rs @@ -0,0 +1,252 @@ +use crate::queryplanner::group_by_limit_aggregate::GroupByLimitAggregateExec; +use crate::queryplanner::planning::WorkerExec; +use crate::queryplanner::query_executor::ClusterSendExec; +use datafusion::arrow::compute::SortOptions; +use datafusion::error::DataFusionError; +use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr}; +use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode}; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion::physical_plan::expressions::Column; +use datafusion::physical_plan::limit::GlobalLimitExec; +use datafusion::physical_plan::sorts::sort::SortExec; +use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use datafusion::physical_plan::{ExecutionPlan, InputOrderMode}; +use std::sync::Arc; + +/// Trim the worker-side partial hash aggregate to the top-k groups when the plan is +/// `LIMIT k` over `ORDER BY ` over a distributed hash aggregate. +/// +/// Correctness requires a TOTAL order over groups (`T = ORDER BY ++ remaining group-by columns`, +/// in group-by order) applied in TWO places that must agree: +/// - the worker cut: each worker keeps its local top-k by `T`; +/// - the router select: the global Sort + Limit must also order by `T`. +/// Under `T` the router's top-k equals the global top-k by `T`, and every worker that holds a +/// partial state for such a group keeps it (its local rank can only be smaller), so every needed +/// partial state reaches the router. Ordering the router by `T` instead of the bare `ORDER BY` does +/// not change the query contract: `ORDER BY` is a prefix of `T`, so the output stays validly +/// ordered and the tie order, left unspecified by the query, just becomes deterministic. +/// +/// We only rewrite when the plan matches exactly `Sort(/Limit) -> [passthrough] -> Final aggregate +/// -> [passthrough/cluster boundary] -> Partial hash aggregate`; anything else on the path (a +/// HAVING filter, a nested aggregate, a computed projection) makes us bail, so we never trim a plan +/// where the limit does not directly govern this aggregate. +/// +/// `factor` gates trimming at runtime (only when local groups exceed `factor * k`); `0` disables. +pub fn replace_with_group_by_limit_aggregate( + plan: Arc, + factor: usize, +) -> Result, DataFusionError> { + if factor == 0 { + return Ok(plan); + } + let Some(target) = analyze(&plan) else { + return Ok(plan); + }; + apply(plan, &target, factor) +} + +struct Target { + /// The router `SortExec` whose ordering must be extended to the total order. + sort: Arc, + /// The worker-side partial hash `AggregateExec` to replace with a trimming exec. + partial: Arc, + /// Tail of the total order to append to the router sort (over the sort's input schema). + router_tail: Vec, + /// Full total order over the partial output schema for the worker cut. + trim_order: Vec<(usize, SortOptions)>, + /// `k = limit + offset`. + k: usize, +} + +fn analyze(root: &Arc) -> Option { + // Peel an optional top GlobalLimit (carries the offset), then require a SortExec. + let (skip, extra_fetch, sort_node) = + if let Some(gl) = root.as_any().downcast_ref::() { + (gl.skip(), gl.fetch(), child(root)?) + } else { + (0, None, root.clone()) + }; + let sort = sort_node.as_any().downcast_ref::()?; + let order: Vec = sort.expr().iter().cloned().collect(); + if order.is_empty() { + return None; + } + // The worker must keep enough groups to cover `limit + offset`. When a top GlobalLimit carries + // the offset, DataFusion already folds `skip + limit` into the sort's fetch, so prefer it; + // otherwise fall back to the GlobalLimit's own `skip + fetch`. + let k = sort + .fetch() + .or_else(|| extra_fetch.map(|fetch| skip + fetch))?; + + // Sort -> [passthrough] -> Final aggregate. + let final_agg_node = descend_to_final_aggregate(sort.input().clone())?; + let final_agg = final_agg_node.as_any().downcast_ref::()?; + + // Final aggregate -> [passthrough/boundary] -> Partial hash aggregate. + let partial_node = descend_to_worker_partial(final_agg.input().clone())?; + let partial = partial_node.as_any().downcast_ref::()?; + if !partial.group_expr().is_single() + || matches!(partial.input_order_mode(), InputOrderMode::Sorted) + { + return None; + } + + let num_group_cols = partial.group_expr().output_exprs().len(); + if num_group_cols == 0 { + return None; + } + let partial_schema = partial.schema(); + let group_names: Vec = partial_schema + .fields() + .iter() + .take(num_group_cols) + .map(|f| f.name().clone()) + .collect(); + + // Map ORDER BY columns onto group-by columns (by name; robust to projections). + let mut used = vec![false; num_group_cols]; + let mut trim_order: Vec<(usize, SortOptions)> = Vec::with_capacity(num_group_cols); + for e in &order { + let column = e.expr.as_any().downcast_ref::()?; + let idx = group_names.iter().position(|n| n == column.name())?; + // A repeated ORDER BY column adds nothing to the total order; skip it. + if used[idx] { + continue; + } + used[idx] = true; + trim_order.push((idx, e.options)); + } + if trim_order.is_empty() { + return None; + } + + // Totalize: append the remaining group-by columns in group-by order. Build the matching tail + // for the router sort over its own (Final-output) schema, resolved by name. + let sort_input_schema = sort.input().schema(); + let mut router_tail: Vec = Vec::new(); + for (idx, is_used) in used.into_iter().enumerate() { + if is_used { + continue; + } + let name = &group_names[idx]; + let options = SortOptions::default(); + let sort_col_idx = sort_input_schema.index_of(name).ok()?; + router_tail.push(PhysicalSortExpr { + expr: Arc::new(Column::new(name, sort_col_idx)), + options, + }); + trim_order.push((idx, options)); + } + + Some(Target { + sort: sort_node, + partial: partial_node, + router_tail, + trim_order, + k, + }) +} + +fn apply( + node: Arc, + target: &Target, + factor: usize, +) -> Result, DataFusionError> { + let is_sort = Arc::ptr_eq(&node, &target.sort); + let is_partial = Arc::ptr_eq(&node, &target.partial); + + let new_children = node + .children() + .into_iter() + .map(|c| apply(c.clone(), target, factor)) + .collect::, _>>()?; + let node = node.with_new_children(new_children)?; + + if is_partial { + if let Some(agg) = node.as_any().downcast_ref::() { + if let Some(exec) = GroupByLimitAggregateExec::try_new_from_partial( + agg, + target.k, + factor, + target.trim_order.clone(), + ) { + return Ok(Arc::new(exec)); + } + } + // Leaving the full aggregate in place stays correct; the router still sorts by the total + // order, it just receives every group instead of the trimmed top-k. + return Ok(node); + } + + if is_sort { + if let Some(sort) = node.as_any().downcast_ref::() { + let mut exprs: Vec = sort.expr().iter().cloned().collect(); + exprs.extend(target.router_tail.iter().cloned()); + let new_sort = SortExec::new(LexOrdering::new(exprs), sort.input().clone()) + .with_preserve_partitioning(sort.preserve_partitioning()) + .with_fetch(sort.fetch()); + return Ok(Arc::new(new_sort)); + } + } + + Ok(node) +} + +/// Walk down single-child passthrough nodes (which preserve rows and grouping) until the first +/// `Final`/`FinalPartitioned` `AggregateExec`. Returns `None` if a non-passthrough node is hit +/// first (e.g. a filter or a computed projection). +fn descend_to_final_aggregate(mut node: Arc) -> Option> { + loop { + if let Some(agg) = node.as_any().downcast_ref::() { + return matches!( + agg.mode(), + AggregateMode::Final | AggregateMode::FinalPartitioned + ) + .then_some(node.clone()); + } + if is_row_passthrough(&node) { + node = child(&node)?; + } else { + return None; + } + } +} + +/// Walk down passthrough nodes from a `Final` aggregate's input to the worker-side `Partial` +/// aggregate, requiring that exactly one `ClusterSend`/`Worker` boundary is crossed. Returns `None` +/// if anything unexpected (a second aggregate, a filter, ...) is on the path. +fn descend_to_worker_partial(mut node: Arc) -> Option> { + let mut crossed_boundary = false; + loop { + if let Some(agg) = node.as_any().downcast_ref::() { + return (crossed_boundary && *agg.mode() == AggregateMode::Partial) + .then_some(node.clone()); + } + if node.as_any().is::() || node.as_any().is::() { + crossed_boundary = true; + node = child(&node)?; + } else if is_row_passthrough(&node) { + node = child(&node)?; + } else { + return None; + } + } +} + +/// Single-child nodes that pass rows through unchanged (preserving grouping), so a limit/sort above +/// them governs the aggregate below them. +fn is_row_passthrough(node: &Arc) -> bool { + let any = node.as_any(); + any.is::() + || any.is::() + || any.is::() + || any.is::() +} + +fn child(node: &Arc) -> Option> { + let children = node.children(); + if children.len() != 1 { + return None; + } + Some(children[0].clone()) +} diff --git a/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs b/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs index edf902b44d3e5..ccf44b753d11e 100644 --- a/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs +++ b/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs @@ -1,5 +1,6 @@ mod check_memory; mod distributed_partial_aggregate; +mod group_by_limit_rewriter; mod inline_aggregate_rewriter; pub mod is_not_distinct_from_join_keys; pub mod rewrite_plan; @@ -13,6 +14,7 @@ use crate::queryplanner::optimizations::distributed_partial_aggregate::{ push_aggregate_to_workers, push_sorted_partial_aggregate_below_merge, push_worker_sort_and_limit, replace_suboptimal_merge_sorts, }; +use crate::queryplanner::optimizations::group_by_limit_rewriter::replace_with_group_by_limit_aggregate; use crate::queryplanner::optimizations::inline_aggregate_rewriter::replace_with_inline_aggregate; use crate::queryplanner::planning::CubeExtensionPlanner; use crate::queryplanner::pretty_printers::{pp_phys_plan_ext, PPOptions}; @@ -43,6 +45,8 @@ pub struct CubeQueryPlanner { serialized_plan: Arc, memory_handler: Arc, data_loaded_size: Option>, + group_by_limit_factor: usize, + group_by_limit_per_partition: bool, } impl CubeQueryPlanner { @@ -50,6 +54,8 @@ impl CubeQueryPlanner { cluster: Arc, serialized_plan: Arc, memory_handler: Arc, + group_by_limit_factor: usize, + group_by_limit_per_partition: bool, ) -> CubeQueryPlanner { CubeQueryPlanner { cluster: Some(cluster), @@ -57,6 +63,8 @@ impl CubeQueryPlanner { serialized_plan, memory_handler, data_loaded_size: None, + group_by_limit_factor, + group_by_limit_per_partition, } } @@ -65,6 +73,8 @@ impl CubeQueryPlanner { worker_planning_params: WorkerPlanningParams, memory_handler: Arc, data_loaded_size: Option>, + group_by_limit_factor: usize, + group_by_limit_per_partition: bool, ) -> CubeQueryPlanner { CubeQueryPlanner { serialized_plan, @@ -72,6 +82,8 @@ impl CubeQueryPlanner { worker_partition_count: Some(worker_planning_params), memory_handler, data_loaded_size, + group_by_limit_factor, + group_by_limit_per_partition, } } } @@ -104,6 +116,8 @@ impl QueryPlanner for CubeQueryPlanner { self.memory_handler.clone(), self.data_loaded_size.clone(), ctx_state.config().options(), + self.group_by_limit_factor, + self.group_by_limit_per_partition, ); result } @@ -112,12 +126,20 @@ impl QueryPlanner for CubeQueryPlanner { #[derive(Debug)] pub struct PreOptimizeRule { push_partial_aggregate_below_merge: bool, + group_by_limit_factor: usize, + coalesce_under_hash_aggregate: bool, } impl PreOptimizeRule { - pub fn new(push_partial_aggregate_below_merge: bool) -> Self { + pub fn new( + push_partial_aggregate_below_merge: bool, + group_by_limit_factor: usize, + coalesce_under_hash_aggregate: bool, + ) -> Self { Self { push_partial_aggregate_below_merge, + group_by_limit_factor, + coalesce_under_hash_aggregate, } } } @@ -128,7 +150,12 @@ impl PhysicalOptimizerRule for PreOptimizeRule { plan: Arc, _config: &ConfigOptions, ) -> datafusion::common::Result> { - pre_optimize_physical_plan(plan, self.push_partial_aggregate_below_merge) + pre_optimize_physical_plan( + plan, + self.push_partial_aggregate_below_merge, + self.group_by_limit_factor, + self.coalesce_under_hash_aggregate, + ) } fn name(&self) -> &str { @@ -143,6 +170,8 @@ impl PhysicalOptimizerRule for PreOptimizeRule { fn pre_optimize_physical_plan( p: Arc, push_partial_aggregate_below_merge: bool, + group_by_limit_factor: usize, + coalesce_under_hash_aggregate: bool, ) -> Result, DataFusionError> { let p = rewrite_physical_plan(p, &mut |p| push_aggregate_to_workers(p))?; @@ -158,12 +187,20 @@ fn pre_optimize_physical_plan( p }; - // Global (no GROUP BY) aggregates don't need their input merged in the sort order - let p = rewrite_physical_plan(p, &mut |p| drop_sort_merge_under_global_aggregate(p))?; + // Global (no GROUP BY) aggregates -- and, when enabled, grouped hash aggregates -- don't need + // their input merged in the sort order. + let p = rewrite_physical_plan(p, &mut |p| { + drop_sort_merge_under_global_aggregate(p, coalesce_under_hash_aggregate) + })?; // Replace sorted AggregateExec with InlineAggregateExec for better performance let p = rewrite_physical_plan(p, &mut |p| replace_with_inline_aggregate(p))?; + // Trim the worker-side partial hash aggregate to the top-k groups when the query orders by a + // subset of group-by columns and has a limit. Runs after inline-aggregate replacement so it + // only sees the remaining (hash) partial aggregates. + let p = replace_with_group_by_limit_aggregate(p, group_by_limit_factor)?; + Ok(p) } @@ -173,6 +210,8 @@ fn finalize_physical_plan( memory_handler: Arc, data_loaded_size: Option>, config: &ConfigOptions, + group_by_limit_factor: usize, + group_by_limit_per_partition: bool, ) -> Result, DataFusionError> { let p = rewrite_physical_plan(p, &mut |p| add_check_memory_exec(p, memory_handler.clone()))?; log::trace!( @@ -201,7 +240,9 @@ fn finalize_physical_plan( // Last: bound worker memory for ORDER BY LIMIT that isn't an index prefix. Runs // after replace_suboptimal_merge_sorts so it doesn't push the query's row limit into the // worker merge we add (which would cut uncombined partial rows and undercount). - let p = rewrite_physical_plan(p, &mut |p| push_worker_sort_and_limit(p))?; + let p = rewrite_physical_plan(p, &mut |p| { + push_worker_sort_and_limit(p, group_by_limit_factor, group_by_limit_per_partition) + })?; log::trace!( "Rewrote physical plan by push_worker_sort_and_limit:\n{}", pp_phys_plan_ext(p.as_ref(), &PPOptions::show_nonmeta()) diff --git a/rust/cubestore/cubestore/src/queryplanner/planning.rs b/rust/cubestore/cubestore/src/queryplanner/planning.rs index 9c9cd7b352df6..34c1d6c5be715 100644 --- a/rust/cubestore/cubestore/src/queryplanner/planning.rs +++ b/rust/cubestore/cubestore/src/queryplanner/planning.rs @@ -1143,24 +1143,30 @@ impl ChooseIndex<'_> { return None; } let limit = ctx.limit?; - let sort = ctx.sort.as_ref().filter(|s| !s.is_empty())?; let group_by = ctx.group_by.as_ref().filter(|g| !g.is_empty())?; - // Every ORDER BY column must be a group-by column; map it to its group-key position. + // Every ORDER BY column must be a group-by column; map it to its group-key position. A bare + // LIMIT (no ORDER BY) leaves the prefix empty, so the total order is the full group key in + // group-by order -- "any n groups" becomes "the n smallest by group key", still valid. let mut cols: Vec<(usize, bool, bool)> = Vec::with_capacity(group_by.len()); let mut used = vec![false; group_by.len()]; - for name in sort { - let idx = group_by.iter().position(|g| g == name)?; - if used[idx] { - continue; + if let Some(sort) = ctx.sort.as_ref().filter(|s| !s.is_empty()) { + for name in sort { + let idx = group_by.iter().position(|g| g == name)?; + if used[idx] { + continue; + } + used[idx] = true; + cols.push((idx, ctx.sort_is_asc, !ctx.sort_is_asc)); } - used[idx] = true; - cols.push((idx, ctx.sort_is_asc, !ctx.sort_is_asc)); } // Extend with the remaining group keys to make it a total order on the full group key. + // Their NULL placement is arbitrary (these columns are not in the query's ORDER BY), but it + // must match the rewriter's appended-column order (`SortOptions::default()`, i.e. ascending + // nulls-first) so the worker cut and the router select agree on the total order. for (idx, is_used) in used.iter().enumerate() { if !is_used { - cols.push((idx, true, false)); + cols.push((idx, true, true)); } } Some((cols, limit)) @@ -1833,6 +1839,7 @@ fn pull_up_cluster_send(mut p: LogicalPlan) -> Result Result Result { let lsend; diff --git a/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs b/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs index 63d386add4951..3b50620742de7 100644 --- a/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs +++ b/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs @@ -28,6 +28,7 @@ use std::sync::Arc; use crate::queryplanner::check_memory::CheckMemoryExec; use crate::queryplanner::filter_by_key_range::FilterByKeyRangeExec; +use crate::queryplanner::group_by_limit_aggregate::GroupByLimitAggregateExec; use crate::queryplanner::inline_aggregate::{InlineAggregateExec, InlineAggregateMode}; use crate::queryplanner::merge_sort::LastRowByUniqueKeyExec; use crate::queryplanner::panic::{PanicWorkerExec, PanicWorkerNode}; @@ -617,6 +618,16 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou if let Some(limit) = agg.limit() { *out += &format!(", limit: {}", limit) } + } else if let Some(agg) = a.downcast_ref::() { + *out += &format!( + "GroupByLimitAggregate, k: {}, factor: {}, order: {:?}", + agg.k(), + agg.factor(), + agg.order() + ); + if o.show_aggregations { + *out += &format!(", aggs: {:?}", agg.aggr_expr()) + } } else if let Some(l) = a.downcast_ref::() { *out += &format!("LocalLimit, n: {}", l.fetch()); } else if let Some(l) = a.downcast_ref::() { diff --git a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs index 528f268f7048f..c049b44c5b0ed 100644 --- a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs +++ b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs @@ -226,7 +226,7 @@ pub struct QueryExecutorImpl { metadata_cache_factory: Arc, parquet_metadata_cache: Arc, memory_handler: Arc, - push_partial_aggregate_below_merge: bool, + config: Arc, } crate::di_service!(QueryExecutorImpl, [QueryExecutor]); @@ -547,13 +547,13 @@ impl QueryExecutorImpl { metadata_cache_factory: Arc, parquet_metadata_cache: Arc, memory_handler: Arc, - push_partial_aggregate_below_merge: bool, + config: Arc, ) -> Arc { Arc::new(QueryExecutorImpl { metadata_cache_factory, parquet_metadata_cache, memory_handler, - push_partial_aggregate_below_merge, + config, }) } @@ -567,6 +567,8 @@ impl QueryExecutorImpl { cluster, serialized_plan, self.memory_handler.clone(), + self.config.group_by_limit_factor(), + self.config.group_by_limit_per_partition(), )) } @@ -582,6 +584,8 @@ impl QueryExecutorImpl { worker_planning_params, self.memory_handler.clone(), data_loaded_size.clone(), + self.config.group_by_limit_factor(), + self.config.group_by_limit_per_partition(), )) } @@ -603,7 +607,9 @@ impl QueryExecutorImpl { vec![ // Cube rules Arc::new(PreOptimizeRule::new( - self.push_partial_aggregate_below_merge, + self.config.push_partial_aggregate_below_merge_enabled(), + self.config.group_by_limit_factor(), + self.config.coalesce_under_hash_aggregate(), )), // DF rules without EnforceDistribution. We do need to keep EnforceSorting. Arc::new(OutputRequirements::new_add_mode()), diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 472465ade9279..4a444123b6501 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -6487,7 +6487,9 @@ mod tests { } } - // Test 4: ORDER BY 1 DESC with LIMIT on non-prefix column + // Test 4: ORDER BY DESC + LIMIT on a non-prefix column, grouped by a + // non-prefix column (hash aggregate). The hash path bounds the worker + // output with the trimming aggregate, not a Sort. { let result = service .exec_query( @@ -6504,8 +6506,9 @@ mod tests { _ => panic!("expected string"), }; assert!( - worker_plan.contains("Sort, fetch: 2"), - "Worker should have Sort with fetch=2 for DESC. Plan: {}", + worker_plan.contains("GroupByLimitAggregate"), + "Hash-aggregate worker should bound output with \ + GroupByLimitAggregate. Plan: {}", worker_plan ); }