From 67fcc7e321196e254b963f5bc62a6fa2ea084545 Mon Sep 17 00:00:00 2001 From: Jack Kleeman Date: Fri, 20 Feb 2026 08:57:08 +0000 Subject: [PATCH 1/2] Add unit test triggering the panic --- .../physical-plan/src/aggregates/row_hash.rs | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index de857370ce285..704db9bfc233a 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -1291,6 +1291,7 @@ impl GroupedHashAggregateStream { #[cfg(test)] mod tests { use super::*; + use crate::InputOrderMode; use crate::execution_plan::ExecutionPlan; use crate::test::TestMemoryExec; use arrow::array::{Int32Array, Int64Array}; @@ -1553,4 +1554,88 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_emit_early_with_partially_sorted() -> Result<()> { + // Reproducer for #20445: EmitEarly with PartiallySorted panics in + // remove_groups because it emits more groups than the sort boundary. + let schema = Arc::new(Schema::new(vec![ + Field::new("sort_col", DataType::Int32, false), + Field::new("group_col", DataType::Int32, false), + Field::new("value_col", DataType::Int64, false), + ])); + + // All rows share sort_col=1 (no sort boundary), with unique group_col + // values to create many groups and trigger memory pressure. 
+ let n = 256; + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![1; n])), + Arc::new(Int32Array::from((0..n as i32).collect::<Vec<_>>())), + Arc::new(Int64Array::from(vec![1; n])), + ], + )?; + + let runtime = RuntimeEnvBuilder::default() + .with_memory_limit(4096, 1.0) + .build_arc()?; + let mut task_ctx = TaskContext::default().with_runtime(runtime); + let mut cfg = task_ctx.session_config().clone(); + cfg = cfg.set( + "datafusion.execution.batch_size", + &datafusion_common::ScalarValue::UInt64(Some(128)), + ); + cfg = cfg.set( + "datafusion.execution.skip_partial_aggregation_probe_rows_threshold", + &datafusion_common::ScalarValue::UInt64(Some(u64::MAX)), + ); + task_ctx = task_ctx.with_session_config(cfg); + let task_ctx = Arc::new(task_ctx); + + let ordering = LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new( + Column::new("sort_col", 0), + ) + as _)]) + .unwrap(); + let exec = TestMemoryExec::try_new(&[vec![batch]], Arc::clone(&schema), None)? 
+ .try_with_sort_information(vec![ordering])?; + let exec = Arc::new(TestMemoryExec::update_cache(&Arc::new(exec))); + + // GROUP BY sort_col, group_col with input sorted on sort_col + // gives PartiallySorted([0]) + let aggregate_exec = AggregateExec::try_new( + AggregateMode::Partial, + PhysicalGroupBy::new_single(vec![ + (col("sort_col", &schema)?, "sort_col".to_string()), + (col("group_col", &schema)?, "group_col".to_string()), + ]), + vec![Arc::new( + AggregateExprBuilder::new(count_udaf(), vec![col("value_col", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("count_value") + .build()?, + )], + vec![None], + exec, + Arc::clone(&schema), + )?; + assert!(matches!( + aggregate_exec.input_order_mode(), + InputOrderMode::PartiallySorted(_) + )); + + // Must not panic with "assertion failed: *current_sort >= n" + let mut stream = GroupedHashAggregateStream::new(&aggregate_exec, &task_ctx, 0)?; + while let Some(result) = stream.next().await { + if let Err(e) = result { + if e.to_string().contains("Resources exhausted") { + break; + } + return Err(e); + } + } + + Ok(()) + } } From f5382f4b76b4f17a2d299d3d6d1a71f937220ae0 Mon Sep 17 00:00:00 2001 From: Jack Kleeman Date: Fri, 20 Feb 2026 09:00:25 +0000 Subject: [PATCH 2/2] Clamp to the sort boundary when using partial group ordering Closes #20445 --- .../physical-plan/src/aggregates/row_hash.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 704db9bfc233a..35f32ac7ae038 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -1036,7 +1036,19 @@ impl GroupedHashAggregateStream { self.group_values.len() }; - if let Some(batch) = self.emit(EmitTo::First(n), false)? { + // Clamp to the sort boundary when using partial group ordering, + // otherwise remove_groups panics (#20445). 
+ let n = match &self.group_ordering { + GroupOrdering::None => n, + _ => match self.group_ordering.emit_to() { + Some(EmitTo::First(max)) => n.min(max), + _ => 0, + }, + }; + + if n > 0 + && let Some(batch) = self.emit(EmitTo::First(n), false)? + { Ok(Some(ExecutionState::ProducingOutput(batch))) } else { Err(oom)