From 1cabd37572cfb4f060556fc3017885d888ce71ef Mon Sep 17 00:00:00 2001 From: Adrian Tanase Date: Sat, 15 Mar 2025 13:27:01 +0200 Subject: [PATCH 1/2] [build] - cargo lock update --- Cargo.lock | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 8eaaad283b3bc..bad318e3f77a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1890,10 +1890,12 @@ dependencies = [ "object_store", "parquet", "paste", + "prost", "pyo3", "rand 0.8.5", "recursive", "sqlparser", + "substrait", "tokio", "web-time", ] @@ -2003,6 +2005,7 @@ dependencies = [ "datafusion-physical-expr-common", "env_logger", "indexmap 2.7.1", + "log", "paste", "recursive", "serde_json", @@ -2179,7 +2182,9 @@ dependencies = [ "ctor", "datafusion-common", "datafusion-expr", + "datafusion-functions", "datafusion-functions-aggregate", + "datafusion-functions-nested", "datafusion-functions-window", "datafusion-functions-window-common", "datafusion-physical-expr", From ccd8a753192538f46c1057eacde5d4cc10d1386d Mon Sep 17 00:00:00 2001 From: Adrian Tanase Date: Sat, 15 Mar 2025 13:27:01 +0200 Subject: [PATCH 2/2] [HSTACK] - inlinetablescan top-down instead of bottom-up --- .../src/analyzer/inline_table_scan.rs | 143 ++++++++++++++---- 1 file changed, 113 insertions(+), 30 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/inline_table_scan.rs b/datafusion/optimizer/src/analyzer/inline_table_scan.rs index 95781b395f3c3..c6422972b2637 100644 --- a/datafusion/optimizer/src/analyzer/inline_table_scan.rs +++ b/datafusion/optimizer/src/analyzer/inline_table_scan.rs @@ -38,7 +38,7 @@ impl InlineTableScan { impl AnalyzerRule for InlineTableScan { fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result { - plan.transform_up(analyze_internal).data() + plan.transform_down_with_subqueries(analyze_internal).data() } fn name(&self) -> &str { @@ -47,37 +47,30 @@ impl AnalyzerRule for InlineTableScan { } fn analyze_internal(plan: LogicalPlan) -> Result> { - // rewrite any subqueries in the plan first - let transformed_plan = - plan.map_subqueries(|plan| plan.transform_up(analyze_internal))?; - - let transformed_plan = transformed_plan.transform_data(|plan| { - match plan { - // Match only on scans without filter / projection / fetch - // Views and DataFrames won't have those added - // during the early stage of planning. - LogicalPlan::TableScan(table_scan) if table_scan.filters.is_empty() => { - if let Some(sub_plan) = table_scan.source.get_logical_plan() { - let sub_plan = sub_plan.into_owned(); - let projection_exprs = - generate_projection_expr(&table_scan.projection, &sub_plan)?; - LogicalPlanBuilder::from(sub_plan) - .project(projection_exprs)? - // Ensures that the reference to the inlined table remains the - // same, meaning we don't have to change any of the parent nodes - // that reference this table. - .alias(table_scan.table_name)? - .build() - .map(Transformed::yes) - } else { - Ok(Transformed::no(LogicalPlan::TableScan(table_scan))) - } + match plan { + // Match only on scans without filter / projection / fetch + // Views and DataFrames won't have those added + // during the early stage of planning. + LogicalPlan::TableScan(table_scan) if table_scan.filters.is_empty() => { + if let Some(sub_plan) = table_scan.source.get_logical_plan() { + let sub_plan = sub_plan.into_owned(); + let projection_exprs = + generate_projection_expr(&table_scan.projection, &sub_plan)?; + + LogicalPlanBuilder::from(sub_plan) + .project(projection_exprs)? + // Ensures that the reference to the inlined table remains the + // same, meaning we don't have to change any of the parent nodes + // that reference this table. + .alias(table_scan.table_name)? + .build() + .map(Transformed::yes) + } else { + Ok(Transformed::no(LogicalPlan::TableScan(table_scan))) } - _ => Ok(Transformed::no(plan)), } - })?; - - Ok(transformed_plan) + _ => Ok(Transformed::no(plan)), + } } fn generate_projection_expr( @@ -202,4 +195,94 @@ mod tests { assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), plan, expected) } + + #[derive(Debug)] + // stand-in for DataFrameTableProvider which we can't access here + struct WrappingSource { + plan: LogicalPlan, + } + + impl TableSource for WrappingSource { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> arrow::datatypes::SchemaRef { + let schema: Schema = self.plan.schema().as_ref().into(); + Arc::new(schema) + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> datafusion_common::Result> + { + // A filter is added on the DataFrame when given + Ok(vec![ + datafusion_expr::TableProviderFilterPushDown::Exact; + filters.len() + ]) + } + + fn get_logical_plan(&self) -> Option> { + Some(Cow::Borrowed(&self.plan)) + } + } + + #[test] + fn inline_table_scan_wrapped_once() -> datafusion_common::Result<()> { + let custom_source = CustomSource::new(); + let wrapped_source = WrappingSource { + plan: LogicalPlanBuilder::scan("wrapped", Arc::new(custom_source), None)? + .build()?, + }; + + let scan = + LogicalPlanBuilder::scan("x".to_string(), Arc::new(wrapped_source), None)?; + let plan = scan.build()?; + let expected = "SubqueryAlias: x\ + \n Projection: *\ + \n SubqueryAlias: wrapped\ + \n Projection: *\ + \n TableScan: y"; + + assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), plan, expected) + } + + #[test] + fn inline_table_scan_wrapped_twice_with_projection() -> datafusion_common::Result<()> { + let custom_source = CustomSource::new(); + let wrapped_source_once = WrappingSource { + plan: LogicalPlanBuilder::scan( + "wrapped_once", + Arc::new(custom_source), + Some(vec![0]), + )? + .build()?, + }; + let wrapped_source_twice = WrappingSource { + plan: LogicalPlanBuilder::scan( + "wrapped_twice", + Arc::new(wrapped_source_once), + None, + )? + .build()?, + }; + + let scan = LogicalPlanBuilder::scan( + "x".to_string(), + Arc::new(wrapped_source_twice), + None, + )?; + let plan = scan.build()?; + let expected = "SubqueryAlias: x\ + \n Projection: *\ + \n SubqueryAlias: wrapped_twice\ + \n Projection: *\ + \n SubqueryAlias: wrapped_once\ + \n Projection: y.a\ + \n TableScan: y"; + + assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), plan, expected) + } }