perf(cubestore): worker-side group-by-limit hash-aggregate trim#11144
perf(cubestore): worker-side group-by-limit hash-aggregate trim#11144waralexrom wants to merge 16 commits into
Conversation
…topk), wip behind flag Brings the worker-side partial hash-aggregate trim for `GROUP BY <non-index-prefix> ORDER BY <subset of group-by> LIMIT k` onto this branch, renamed from the misleading "TopK" naming (it trims by group-key order, not by an aggregate top-N): - GroupByLimitAggregateExec + group_by_limit_aggregate module - group_by_limit_rewriter - config group_by_limit_factor / CUBESTORE_GROUP_BY_LIMIT_FACTOR (default 2) Reuses DataFusion's GroupValues building blocks; the only fork change is pub new_group_values (df branch cubestore-hash-aggregate-limit). WIP: the trim is planted during router planning but is NOT yet carried across the ClusterSend boundary to the worker's independent physical re-plan, so it does not engage at distributed execution yet. Next: cluster-boundary carry. (cherry picked from commit d3aa25f)
…via worker_sort_and_limit The rewriter only plants GroupByLimitAggregate during ROUTER planning, but the worker re-plans physical from logical independently, so the trim never reached execution. Instead of a new proto, reuse the existing worker_sort_and_limit carry (already serialized across ClusterSend): when group_by_limit_factor > 0, resort_worker_subtree now wraps the worker partial hash aggregate in GroupByLimitAggregateExec (trim during aggregation, bounded O(factor*k) memory) instead of a SortExec over the full partial; the Sort above still orders the <= k surviving groups so the router's sort-preserving merge stays correct. group_by_limit_factor (CUBESTORE_GROUP_BY_LIMIT_FACTOR, default 2) is threaded router->worker via CubeQueryPlanner; 0 keeps the previous sort-then-trim. Verified on the production-dump stend: with factor>0 the worker EXPLAIN ANALYZE shows GroupByLimitAggregate (was LinearPartialAggregate) and results are identical. Value is bounded hash-table memory (OOM avoidance) on high-cardinality group-bys, not speed on small inputs. WIP: only fires when worker_sort_and_limit fires, i.e. single-table `ORDER BY <group subset> LIMIT k`. UNION-of-tables (the prod query.sql shape) does not populate worker_sort_and_limit yet (separate ctx-propagation gap). (cherry picked from commit f6d0995)
…ough UNION The worker group-by-limit trim previously required an ORDER BY and was dropped when the per-branch ClusterSends of a UNION were merged. Now: - compute_worker_sort_and_limit handles a bare LIMIT (no ORDER BY): the total order is the full group key in group-by order, so "any n" becomes "the n smallest by group key" -- a valid deterministic choice. An ORDER BY prefix, when present, still sorts first. - pull_up_cluster_send preserves worker_sort_and_limit across a UNION (the same group-by/limit context descends to every branch, so the descriptors are positionally identical and stay valid over the union); kept only when all branches agree. Adds planning coverage (bare LIMIT, UNION ALL + bare LIMIT) and a bare-LIMIT execution assertion to topk_hash_aggregate_trim; excludes that test from the migration harness (new test, no recorded fixture). (cherry picked from commit 65420c6)
…-gated) Behind CUBESTORE_GROUP_BY_LIMIT_HASH_FINAL (default off, gated on group_by_limit_factor > 0): route the worker top-k through a router-side hash final aggregate instead of the sorted-merge framework. - Worker subtree: CoalescePartitions <- GroupByLimitAggregate (drops the per-partition SortExec and SortPreservingMerge; emits the trimmed top-k unsorted). CoalescePartitions spawns a task per input partition, so the per-union-branch aggregates run in parallel -- the sorted merge drained them on a single task. - Router: SortExec(T, fetch=k) <- AggregateExec(Final, hash) <- CoalescePartitions <- ClusterSend. The explicit top-k sort by the full total order T is required even for a bare LIMIT: it keeps exactly the groups every worker kept (fully combined here), where a plain limit could take an undercounted group only one worker retained. On the production-dump UNION query (GROUP BY 6 cols, LIMIT 10): ~2.1x faster (~3.1s -> ~1.5s) at neutral peak RSS, because the worker now uses ~2 cores instead of 1. Same result as the sorted path (ORDER BY and bare LIMIT verified). The flag must be set fleet-wide: worker and router re-plan independently, and a worker-on/router-off split would feed unsorted streams into a sorted merge. (cherry picked from commit 4c9d5a6)
…lag-gated) Behind CUBESTORE_GROUP_BY_LIMIT_PER_PARTITION (default off; only active with CUBESTORE_GROUP_BY_LIMIT_HASH_FINAL and group_by_limit_factor > 0): strip the CoalescePartitions below the trimmed worker aggregate so it runs over every raw CubeTableExec partition instead of one stream per union branch. The hash-final CoalescePartitions on top then parallelizes all partitions. Per-partition top-k stays complete by the same total-order argument as the per-worker cut: a group in the global top-k by T has fewer than k smaller-keyed groups globally, hence in any single partition, so it survives every partition's local top-k and reaches the router fully combined. The strip only removes CoalescePartitions, never the SortPreservingMerge that feeds the single-partition LastRowByUniqueKeyExec, so unique-key tables stay correct. On the production-dump UNION query: ~799ms vs ~1504ms (hash-final 3-way) and ~2159ms (master), ~9 cores at peak, peak RSS still neutral because the index-sorted partitions are key-local (small per-partition tables). Memory is bounded by partition count, so a group key spread across partitions would cost ~N x; gated off by default. (cherry picked from commit d430167)
… path The sorted-merge worker top-k (per-partition SortExec + SortPreservingMerge -> router SortedFinalAggregate) drained the worker partitions on a single task, so it ran on one core and was slower than master, which keeps a parallel CoalescePartitions over the partial aggregate. The hash-final path (worker CoalescePartitions -> router hash Final + Sort(T, fetch=k)) is strictly better: same memory, ~2x faster because CoalescePartitions parallelizes the per-partition aggregates. So remove the sorted variant and the CUBESTORE_GROUP_BY_LIMIT_HASH_FINAL flag entirely; hash-final is now how the trimmed top-k is always combined. Two knobs remain: - group_by_limit_factor (env CUBESTORE_GROUP_BY_LIMIT_FACTOR, >0): whether to use the trimming aggregate at all. 0 leaves the plan untouched (master behavior). - CUBESTORE_GROUP_BY_LIMIT_PER_PARTITION: whether to push it below the merge (per-partition N-way) -- still experimental, default off. push_worker_sort_and_limit now returns early when factor == 0. Existing planning/execution/limit_pushdown/union tests pass against the hash-final plan. (cherry picked from commit e52d56a)
Adapted from e8a2a90 (dropped the dictionary-encoding default; dictionaries are not ported to this branch).
Non-dictionary subset of 4f9f439: - per-partition: strip only the leading CoalescePartitionsExec feeding the aggregate, not every one in the subtree (preserves UNION/mixed-partitioning plan semantics) - GroupByLimitAggregateExec::statistics: report an inexact upper bound (min(input_rows, factor*k*partitions)) instead of Absent - drop unused col_idx_base from aggregate_expressions (Partial-only) - comment: duplicate ORDER BY skip
…ushdown Sort not emitted) (cherry picked from commit cbff840)
…t path Cherry-pick of 4bb1499 (without the RocksDB test scratch dirs it had accidentally committed; .gitignore pattern from c6faaed folded in). Split resort_worker_subtree by partial aggregate kind: - inline/sorted: SortPreservingMerge(T) <- Sort(T, fetch, per partition) -- can't trim a sorted aggregate, so always bound with a sort. - hash: the trimming GroupByLimitAggregate + coalesce (factor-aware), router does the hash final + Sort(fetch). Drop the top-level factor==0 early return so the inline bounding applies regardless of the trim factor (factor only gates the hash trim). Un-ignore worker_sort_and_limit_cluster and assert the GroupByLimitAggregate trim.
…sort-merge The scan inserts a SortPreservingMergeExec to combine an index's range-disjoint partitions into one sorted stream. A Linear (hash) aggregate ignores input order, so that per-row merge is wasted work -- and degenerates into full-length key comparisons when the leading sort columns are low-cardinality / NULL-heavy. drop_sort_merge_under_global_aggregate already replaces such merges with plain CoalescePartitions for global (no GROUP BY) aggregates. Extend it, behind CUBESTORE_COALESCE_UNDER_HASH_AGGREGATE (default off), to grouped hash aggregates: CoalescePartitions still yields a single output partition (one hash table, no per-partition memory blowup), it just drops the useless sort. The flag is read once at the call site and passed in, so tests are deterministic. On a real 117M-row, 403809-group pre-agg query (GROUP BY 6 dims + 2 SUM, LIMIT): hash+trim path ~5500ms -> ~2526ms; correctness unchanged. (cherry picked from commit e6250b57fc7082b3383d2497cc3a848bfec9decc)
…ip global aggregates Cherry-pick of fbd20aed75 (dropped the dictionary-column regression test; dictionaries are not ported to this branch). - A partial aggregate preserves its input's partitioning; derive the output partitioning from the input in both try_new_from_partial and with_new_children instead of copying the wrapped aggregate's stale cached value. A too-low count made the parent CoalescePartitions read only some partitions and silently drop the rest (undercount on real multi-file/multi-partition scans). - Skip global aggregates (no GROUP BY): GroupValues can't be built over an empty schema (intern panics). Leave them to DataFusion. - Adds partial/final, multi-partition, re-child, and IPC-roundtrip regression tests.
…der-merge hash trim CUBESTORE_GROUP_BY_LIMIT_PER_PARTITION now genuinely controls where the worker's group-by-limit hash table lives, defaulting to over-merge: - off (default): coalesce the partial aggregate's input to a single partition, so the worker builds one hash table over the merged partitions (peak ~k, no intra-worker aggregation parallelism). - on: keep the raw multi-partition input, so the aggregate runs per partition (N hash tables; more parallelism, peak ~N*k). Previously the flag only tried to strip a leading CoalescePartitions feeding the aggregate -- a no-op on every real plan shape (the scan feeds raw partitions straight into the partial agg, so the trim was always per-partition and the flag did nothing). Replace strip_leading_coalesce_partitions (now dead, removed) with a coalesce-insert in the default path. Both modes produce identical results (verified md5-equal to ClickHouse on the union hash-trim). A/B on hash_union_trim: over-merge ~1717ms/+203MiB vs under-merge ~1482ms/+239MiB. group_by_limit lib, worker_sort_and_limit_cluster, topk and limit_pushdown_group suites stay green.
…s both planters The top-k total order T (ORDER BY cols ++ remaining group cols) is built in two places that must agree: the rewriter (pre_optimize, SortOptions::default() = ascending nulls-first for appended cols) and the descriptor in compute_worker_sort_and_limit (drives the finalize-time worker cut + router sort). The descriptor used nulls-last for appended cols, so worker and router could rank a group with a NULL in an appended column differently and drop its partial states (latent undercount; safe today only because the descriptor's router branch no-ops once the rewriter has planted the exec). Align the descriptor to ascending nulls-first. Also: - trim_top_k: guard factor == 0 (no trim), mirroring statistics(); defensive, the exec is only built with factor > 0. - new test limit_pushdown_group_null_appended: multi-column GROUP BY with a NULL in the appended total-order column, group spanning three chunks, trim engaged; asserts the fully-combined sum (undercount guard), membership fixed by the ORDER BY prefix so it does not depend on tie order. - update planning_topk_hash_aggregate's bare-LIMIT assertion to the aligned nulls-first appended order (it had encoded the desync).
|
Claude finished @waralexrom's task in 3m 12s —— View job Code ReviewVerdict: Solid PR — careful flag-gated rollout, thorough test coverage including the IPC/multi-partition undercount reproductions, well-thought correctness arguments around the total order. No correctness bugs found in the changed code. 0 high, 1 medium, 2 low + 1 doc-accuracy nit. Posted inline. DetailsInline findings posted
Strengths
Checked
|
| fn emit_all_trimmed(&mut self) -> DFResult<Option<RecordBatch>> { | ||
| if self.group_values.is_empty() { | ||
| return Ok(None); | ||
| } | ||
| let mut columns = self.group_values.emit(EmitTo::All)?; | ||
| for acc in &mut self.accumulators { | ||
| columns.extend(acc.state(EmitTo::All)?); | ||
| } | ||
| let batch = RecordBatch::try_new(Arc::clone(&self.schema), columns)?; | ||
| Ok(Some(self.trim_top_k(batch)?)) |
There was a problem hiding this comment.
emit_all_trimmed doesn't bound the in-memory hash table; only the emit batch.
self.group_values.emit(EmitTo::All) and acc.state(EmitTo::All) materialize state for every group that was interned during the run. The lexsort_to_indices(&sort_columns, Some(self.k)) then slices that materialized batch down to k. So the hash table's peak memory still grows to the full group cardinality — the win here is purely network bytes (and downstream router work).
The PR description claims "bounds worker hash-table memory (OOM avoidance on high-cardinality group-bys)". The OOM win actually comes from the over-merge coalesce in resort_worker_subtree (one hash table per worker instead of N), not from the trim itself. Consider tightening the wording in the PR description / GroupByLimitAggregateExec doc-comment ("trims its output to the top-k groups" is accurate; "bounds worker hash-table memory" is not).
If genuine streaming OOM avoidance is a goal, it'd require an incremental top-k variant (e.g. evict groups outside the running top-factor*k between batches) — out of scope for this PR, but worth noting.
| continue; | ||
| } | ||
| let name = &group_names[idx]; | ||
| let options = SortOptions::default(); |
There was a problem hiding this comment.
The total-order alignment between this rewriter and planning.rs's ChooseIndex::worker_sort_and_limit_descriptor rests entirely on SortOptions::default() here equaling (idx, true, true) (asc, nulls_first=true) in planning.rs. That alignment was just fixed in commit 7d00752 and is now correct, but it's load-bearing and silently brittle — if either side drifts (e.g. someone changes the rewriter to SortOptions { descending: false, nulls_first: false } for "stability"), the two planters' total orders disagree, the router/worker top-k arguments break, and groups silently disappear.
Suggest extracting a single pub const APPENDED_GROUP_COL_OPTIONS: SortOptions (or a small fn) shared by both call sites, with a doc-comment pointing each side at the other. Cheap insurance against a drift bug that won't show up in unit tests.
Codecov Report✅ All modified and coverable lines are covered by tests.
Additional details and impacted files@@ Coverage Diff @@
## master #11144 +/- ##
===========================================
- Coverage 83.63% 58.80% -24.83%
===========================================
Files 256 217 -39
Lines 79019 17511 -61508
Branches 0 3563 +3563
===========================================
- Hits 66085 10298 -55787
+ Misses 12934 6695 -6239
- Partials 0 518 +518
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
… ConfigObj CUBESTORE_GROUP_BY_LIMIT_PER_PARTITION and CUBESTORE_COALESCE_UNDER_HASH_AGGREGATE were read via raw std::env::var, bypassing the ConfigObj surface that CUBESTORE_GROUP_BY_LIMIT_FACTOR already uses -- so they couldn't be overridden by the test context or configured from the JS side, and the flag surface was inconsistent (CI review). Add group_by_limit_per_partition() and coalesce_under_hash_aggregate() to ConfigObj and thread them like group_by_limit_factor: coalesce via PreOptimizeRule -> pre_optimize_physical_plan, per-partition via CubeQueryPlanner -> finalize_physical_plan -> push_worker_sort_and_limit. Delete the two raw-env helpers. Behavior-preserving: the from-env constructor still reads both via env_bool, so stend --env keeps working; the test config keeps both false (= previous unset-env default). Note: env_bool is stricter than the old lenient `=="true"||=="1"` reads -- a malformed value now fails fast at startup instead of silently defaulting to false, matching the group_by_limit_factor convention. Also downgrade an assert_eq! to debug_assert_eq! (the single-group invariant is guaranteed by try_new_from_partial rejecting grouping sets).
env_bool panics at startup on anything but 0/1/true/false. For opt-in performance toggles a typo in CUBESTORE_GROUP_BY_LIMIT_PER_PARTITION / CUBESTORE_COALESCE_UNDER_HASH_AGGREGATE must not take a node down. Read them with a lenient env_flag helper (1/true = on, anything else = off), restoring the previous raw-env behavior while keeping the ConfigObj surface.
|
Claude finished @waralexrom's task in 56s —— View job Re-review (delta since 7d00752)Verdict: Two new commits since the prior review ( DetailsWhat changed and how it addresses prior review
Still open from the prior review
Checked
|
Summary
Adds a worker-side group-by-limit optimization to CubeStore: for
GROUP BY <non-index-prefix> [ORDER BY <subset of group cols>] LIMIT kqueries, each worker bounds its partial hash aggregate to the top-k groups (GroupByLimitAggregateExec) instead of materializing every group. This bounds worker hash-table memory (OOM avoidance on high-cardinality group-bys) and cuts the rows crossing the network. All behavior is runtime-env-gated and off by default.Changes
GroupByLimitAggregateExec(queryplanner/group_by_limit_aggregate/): a custom partial hash-aggregate exec that trims its output to the top-k groups by the total orderT = ORDER BY cols ++ remaining group cols, built over DataFusion'sGroupValues(mirrors the existingInlineAggregateExecpattern). Correct because top-k membership depends only on the group key, andTis applied at both the worker cut and the router's final sort/merge.group_by_limit_rewriter.rs,planning.rs): plant the exec for the matching plan shape; extend limit pushdown to bareLIMIT(noORDER BY) and throughUNION. Correctness guards: bail onHAVING/filter-above-aggregate, skip global (no-GROUP-BY) aggregates, keep the custom exec's output partitioning following its input.distributed_partial_aggregate.rs): inline/sorted aggregates stay bounded by a per-partitionSort(fetch); hash aggregates use the trimming exec.CUBESTORE_GROUP_BY_LIMIT_PER_PARTITIONtoggles where the worker hash table lives — over-merge (default: one table per worker over coalesced partitions, lower memory) vs under-merge (per-partition, more parallelism).CUBESTORE_GROUP_BY_LIMIT_FACTOR(0= disabled),CUBESTORE_GROUP_BY_LIMIT_PER_PARTITION,CUBESTORE_COALESCE_UNDER_HASH_AGGREGATE(drop the useless sort-merge under a Linear hash aggregate).Testing
GroupByLimitAggregateExec(partial/final, multi-partition, IPC roundtrip, global-aggregate skip, output-partitioning-follows-rechilded-input).planning_topk_hash_aggregate,topk_hash_aggregate_trim,limit_pushdown_group*(incl. a newlimit_pushdown_group_null_appendedcovering a NULL in an appended total-order column spanning chunks),group_by_prefix_limit_high_cardinality; cluster testworker_sort_and_limit_cluster.cubestorelib suite (290) green with all optimization envs enabled.🤖 Generated with Claude Code