diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index de857370ce28..35f32ac7ae03 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -1036,7 +1036,19 @@ impl GroupedHashAggregateStream { self.group_values.len() }; - if let Some(batch) = self.emit(EmitTo::First(n), false)? { + // Clamp to the sort boundary when using partial group ordering, + // otherwise remove_groups panics (#20445). + let n = match &self.group_ordering { + GroupOrdering::None => n, + _ => match self.group_ordering.emit_to() { + Some(EmitTo::First(max)) => n.min(max), + _ => 0, + }, + }; + + if n > 0 + && let Some(batch) = self.emit(EmitTo::First(n), false)? + { Ok(Some(ExecutionState::ProducingOutput(batch))) } else { Err(oom) @@ -1291,6 +1303,7 @@ impl GroupedHashAggregateStream { #[cfg(test)] mod tests { use super::*; + use crate::InputOrderMode; use crate::execution_plan::ExecutionPlan; use crate::test::TestMemoryExec; use arrow::array::{Int32Array, Int64Array}; @@ -1553,4 +1566,88 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_emit_early_with_partially_sorted() -> Result<()> { + // Reproducer for #20445: EmitEarly with PartiallySorted panics in + // remove_groups because it emits more groups than the sort boundary. + let schema = Arc::new(Schema::new(vec![ + Field::new("sort_col", DataType::Int32, false), + Field::new("group_col", DataType::Int32, false), + Field::new("value_col", DataType::Int64, false), + ])); + + // All rows share sort_col=1 (no sort boundary), with unique group_col + // values to create many groups and trigger memory pressure. + let n = 256; + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![1; n])), + Arc::new(Int32Array::from((0..n as i32).collect::>())), + Arc::new(Int64Array::from(vec![1; n])), + ], + )?; + + let runtime = RuntimeEnvBuilder::default() + .with_memory_limit(4096, 1.0) + .build_arc()?; + let mut task_ctx = TaskContext::default().with_runtime(runtime); + let mut cfg = task_ctx.session_config().clone(); + cfg = cfg.set( + "datafusion.execution.batch_size", + &datafusion_common::ScalarValue::UInt64(Some(128)), + ); + cfg = cfg.set( + "datafusion.execution.skip_partial_aggregation_probe_rows_threshold", + &datafusion_common::ScalarValue::UInt64(Some(u64::MAX)), + ); + task_ctx = task_ctx.with_session_config(cfg); + let task_ctx = Arc::new(task_ctx); + + let ordering = LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new( + Column::new("sort_col", 0), + ) + as _)]) + .unwrap(); + let exec = TestMemoryExec::try_new(&[vec![batch]], Arc::clone(&schema), None)? + .try_with_sort_information(vec![ordering])?; + let exec = Arc::new(TestMemoryExec::update_cache(&Arc::new(exec))); + + // GROUP BY sort_col, group_col with input sorted on sort_col + // gives PartiallySorted([0]) + let aggregate_exec = AggregateExec::try_new( + AggregateMode::Partial, + PhysicalGroupBy::new_single(vec![ + (col("sort_col", &schema)?, "sort_col".to_string()), + (col("group_col", &schema)?, "group_col".to_string()), + ]), + vec![Arc::new( + AggregateExprBuilder::new(count_udaf(), vec![col("value_col", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("count_value") + .build()?, + )], + vec![None], + exec, + Arc::clone(&schema), + )?; + assert!(matches!( + aggregate_exec.input_order_mode(), + InputOrderMode::PartiallySorted(_) + )); + + // Must not panic with "assertion failed: *current_sort >= n" + let mut stream = GroupedHashAggregateStream::new(&aggregate_exec, &task_ctx, 0)?; + while let Some(result) = stream.next().await { + if let Err(e) = result { + if e.to_string().contains("Resources exhausted") { + break; + } + return Err(e); + } + } + + Ok(()) + } }