From 6ddfa49f2881fd73fe83e98b860a7a6a58fdb9f7 Mon Sep 17 00:00:00 2001 From: Avi Minsky Date: Sat, 7 Mar 2026 22:24:37 +0200 Subject: [PATCH 1/6] introduced new instrumentation for adding duration to window function --- .../instrumentation-extension/SKILL.md | 162 ++++++++++++++++++ .../DataflintSparkUICommonLoader.scala | 4 +- .../DataFlintInstrumentationExtension.scala | 34 ++++ .../window/DataFlintWindowExec.scala | 73 ++++++++ .../DataFlintInstrumentationExtension.scala | 116 ++++++++----- .../window/DataFlintWindowExec.scala | 82 +++++++++ .../dataflint_pyspark_example.py | 25 +++ 7 files changed, 454 insertions(+), 42 deletions(-) create mode 100644 spark-plugin/.claude/commands/instrumentation-extension/SKILL.md create mode 100644 spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala create mode 100644 spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala diff --git a/spark-plugin/.claude/commands/instrumentation-extension/SKILL.md b/spark-plugin/.claude/commands/instrumentation-extension/SKILL.md new file mode 100644 index 0000000..f920528 --- /dev/null +++ b/spark-plugin/.claude/commands/instrumentation-extension/SKILL.md @@ -0,0 +1,162 @@ +# Instrumentation Extension Expert + +You are an expert on the `DataFlintInstrumentationExtension` — the Spark SQL extension that injects duration metrics into MapInPandas and MapInArrow physical plan nodes across Spark 3.0–4.1. + +## How to use this skill + +Respond to any question or task about the instrumentation extension. 
Common tasks: +- Add a new plan node type (new `case` branch in the ColumnarRule) +- Debug why a metric isn't appearing +- Add support for a new Spark version +- Understand how a specific version implementation works + +--- + +## Architecture Overview + +### Entry Points +- **Spark 3.x**: `pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala` +- **Spark 4.x**: `pluginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala` +- **Config/registration**: `plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintSparkUICommonLoader.scala` + +### Registration Flow +1. `DataflintSparkUICommonLoader.registerInstrumentationExtension()` appends the extension class to `spark.sql.extensions` if any instrumentation config is enabled. +2. The extension registers a `DataFlintInstrumentationColumnarRule` via `injectColumnar`. +3. The rule's `preColumnarTransitions` hook runs `transformUp` on the physical plan, replacing target nodes. + +### Config Flags +``` +spark.dataflint.instrument.spark.enabled +spark.dataflint.instrument.spark.mapInPandas.enabled +spark.dataflint.instrument.spark.mapInArrow.enabled +``` + +--- + +## ColumnarRule Pattern + +### Spark 3.x (two separate methods to avoid ClassNotFoundError on old versions) +```scala +def replaceMapInPandas(plan: SparkPlan): SparkPlan = plan.transformUp { + case mapInPandas: MapInPandasExec if mapInPandasEnabled => + sparkMinorVersion match { + case "3.0" => DataFlintMapInPandasExec_3_0(...) + case "3.1" | "3.2" => DataFlintMapInPandasExec_3_1(...) + case "3.3" => DataFlintMapInPandasExec_3_3(...) + case "3.4" => DataFlintMapInPandasExec_3_4(...) + case _ => DataFlintMapInPandasExec_3_5(...) // 3.5+ + } +} + +def replaceMapInArrow(plan: SparkPlan): SparkPlan = plan.transformUp { + case mapInArrow: PythonMapInArrowExec if mapInArrowEnabled => + sparkMinorVersion match { + case "3.3" => DataFlintPythonMapInArrowExec_3_3(...) 
+ case "3.4" => DataFlintPythonMapInArrowExec_3_4(...) + case _ => DataFlintPythonMapInArrowExec_3_5(...) // 3.5+ + } +} + +override def preColumnarTransitions: SparkPlan => SparkPlan = plan => { + var result = replaceMapInPandas(plan) + try { result = replaceMapInArrow(result) } catch { case _: ClassNotFoundException => } + result +} +``` + +### Spark 4.x (single combined transformUp) +```scala +override def preColumnarTransitions: SparkPlan => SparkPlan = plan => + plan.transformUp { + case mapInPandas: MapInPandasExec if mapInPandasEnabled => + sparkMinorVersion match { + case "4.0" => DataFlintMapInPandasExec_4_0(...) + case _ => DataFlintMapInPandasExec_4_1(...) + } + case mapInArrow: MapInArrowExec if mapInArrowEnabled => + sparkMinorVersion match { + case "4.0" => DataFlintPythonMapInArrowExec_4_0(...) + case _ => DataFlintPythonMapInArrowExec_4_1(...) + } + } +``` + +--- + +## Version-Specific Implementation Files + +All live under: +- `pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/` +- `pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/` + +### Key differences by version + +| Version | File suffix | Notable changes | +|---------|-------------|-----------------| +| 3.0 | `_3_0` | No MapInBatchExec trait, no PythonSQLMetrics, 6-arg ArrowPythonRunner via reflection | +| 3.1/3.2 | `_3_1` | Same as 3.0, 6-arg runner | +| 3.3 | `_3_3` | MapInBatchExec trait added; MapInArrow support introduced; still 6-arg runner | +| 3.4 | `_3_4` | PythonSQLMetrics mixin; 7-arg runner (adds pythonMetrics param) | +| 3.5 | `_3_5` | Major refactor: MapInBatchEvaluatorFactory; barrier mode; JobArtifactSet (Spark Connect) | +| 4.0 | `_4_0` | ResourceProfile field; chainedFunc includes resultId (`Seq[(ChainedPythonFunctions, Long)]`) | +| 4.1 | `_4_1` | sessionUUID for worker logging; pythonUDFProfiler; 13-arg EvaluatorFactory via reflection | + +### Duration metric pattern (all versions) +```scala +val durationMetric = 
SQLMetrics.createTimingMetric(sparkContext, "duration") +// ... measured in nanoseconds, reported in milliseconds +val startTime = System.nanoTime() +// ... execute the original logic ... +durationMetric += (System.nanoTime() - startTime) / 1000000 +``` + +### Why reflection? +Constructor signatures change between Spark minor versions. Rather than maintaining separate binary artifacts per patch version, reflection allows a single compiled class to work across minor versions within a series. + +--- + +## How to Add a New Plan Node Type + +### Step 1: Identify the target node +Find the physical plan node class (e.g., `FlatMapGroupsInPandasExec`) and its constructor parameters across all relevant Spark versions. + +### Step 2: Add config flag (optional) +In `DataflintSparkUICommonLoader.scala`, add a new constant and check it in `registerInstrumentationExtension()`. + +### Step 3: Create instrumented wrapper classes +For each Spark version that supports the node, create a new file: +``` +DataFlint<Target>Exec_<major>_<minor>.scala +``` + +The wrapper must: +1. Extend the original node class (copy all constructor params) +2. Override `metrics` to add the `"duration"` timing metric +3. Wrap the execution logic to measure and record duration + +### Step 4: Add case branches +In both `pluginspark3` and `pluginspark4` extension files, add a new `case` branch: +```scala +case target: TargetExec if targetEnabled => + sparkMinorVersion match { + case "3.3" => DataFlintTargetExec_3_3(...) + // ... + } +``` + +For Spark 3.x: if the node doesn't exist in 3.0/3.1, wrap the replacement in a separate method with try-catch for `ClassNotFoundException`. + +### Step 5: Wire the guard flag +Add the enabled flag lookup: +```scala +val targetEnabled = conf.getBoolean("spark.dataflint.instrument.spark.<target>.enabled", defaultValue = true) +``` + +--- + +## Debugging Tips + +- **Metric not showing**: Check if the config flag is enabled and the node class name matches exactly (including package). 
+- **ClassNotFoundException on old Spark**: The replacement node class likely references a class that doesn't exist in that version. Move the `case` branch into a separate method and wrap with `try { } catch { case _: ClassNotFoundException => }`. +- **Wrong constructor args via reflection**: Print the available constructors with `clazz.getConstructors.foreach(println)` to find the right signature. +- **Version detection**: Version is extracted as `SPARK_VERSION.split("\\.").take(2).mkString(".")` giving `"3.5"`, `"4.1"`, etc. \ No newline at end of file diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala index 8536f54..5b2bf06 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala @@ -134,6 +134,7 @@ object DataflintSparkUICommonLoader extends Logging { val INSTRUMENT_SPARK_ENABLED = "spark.dataflint.instrument.spark.enabled" val INSTRUMENT_MAP_IN_PANDAS_ENABLED = "spark.dataflint.instrument.spark.mapInPandas.enabled" val INSTRUMENT_MAP_IN_ARROW_ENABLED = "spark.dataflint.instrument.spark.mapInArrow.enabled" + val INSTRUMENT_WINDOW_ENABLED = "spark.dataflint.instrument.spark.window.enabled" def install(context: SparkContext, pageFactory: DataflintPageFactory): String = { new DataflintSparkUICommonInstaller().install(context, pageFactory) @@ -164,7 +165,8 @@ object DataflintSparkUICommonLoader extends Logging { val instrumentEnabled = sc.conf.getBoolean(INSTRUMENT_SPARK_ENABLED, defaultValue = false) val mapInPandasEnabled = sc.conf.getBoolean(INSTRUMENT_MAP_IN_PANDAS_ENABLED, defaultValue = false) val mapInArrowEnabled = sc.conf.getBoolean(INSTRUMENT_MAP_IN_ARROW_ENABLED, defaultValue = false) - val anyInstrumentationEnabled = instrumentEnabled || 
mapInPandasEnabled || mapInArrowEnabled + val windowEnabled = sc.conf.getBoolean(INSTRUMENT_WINDOW_ENABLED, defaultValue = false) + val anyInstrumentationEnabled = instrumentEnabled || mapInPandasEnabled || mapInArrowEnabled || windowEnabled if (!anyInstrumentationEnabled) { logInfo("DataFlint instrumentation extension is disabled (no instrumentation flags enabled)") return diff --git a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala index ca4b1d0..5b56463 100644 --- a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala +++ b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala @@ -4,6 +4,8 @@ import org.apache.spark.SPARK_VERSION import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SparkSessionExtensions +import org.apache.spark.sql.Strategy +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Window => LogicalWindow} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan} import org.apache.spark.sql.execution.python.{ @@ -17,6 +19,7 @@ import org.apache.spark.sql.execution.python.{ DataFlintPythonMapInArrowExec_3_5, MapInPandasExec } +import org.apache.spark.sql.execution.window.DataFlintWindowExec /** * A SparkSessionExtension that injects DataFlint instrumentation into Spark's physical planning phase. 
@@ -46,6 +49,10 @@ class DataFlintInstrumentationExtension extends (SparkSessionExtensions => Unit) extensions.injectColumnar { session => DataFlintInstrumentationColumnarRule(session) } + + extensions.injectPlannerStrategy { session => + DataFlintWindowPlannerStrategy(session) + } } } @@ -189,3 +196,30 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C } } } + +/** + * A planner Strategy that converts the logical Window plan node directly into + * DataFlintWindowExec, bypassing WindowExec entirely. + * + * Using a Strategy (rather than a ColumnarRule) is correct for row-based operators + * like WindowExec that do not participate in columnar execution. + */ +case class DataFlintWindowPlannerStrategy(session: SparkSession) extends Strategy with Logging { + + private val windowEnabled: Boolean = { + val conf = session.sparkContext.conf + val globalEnabled = conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_SPARK_ENABLED, defaultValue = false) + val specificEnabled = conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_WINDOW_ENABLED, defaultValue = false) + globalEnabled || specificEnabled + } + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = { + if (!windowEnabled) return Nil + plan match { + case w: LogicalWindow => + logInfo("Replacing logical Window with DataFlintWindowExec") + DataFlintWindowExec(w.windowExpressions, w.partitionSpec, w.orderSpec, planLater(w.child)) :: Nil + case _ => Nil + } + } +} diff --git a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala new file mode 100644 index 0000000..bb8b83a --- /dev/null +++ b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.window + +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} + +import java.util.concurrent.TimeUnit.NANOSECONDS + +class DataFlintWindowExec private ( + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + child: SparkPlan) + extends WindowExec(windowExpression, partitionSpec, orderSpec, child) with Logging { + + override def nodeName: String = "DataFlintWindow" + + override lazy val metrics: Map[String, SQLMetric] = Map( + "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"), + "duration" -> SQLMetrics.createTimingMetric(sparkContext, "duration") + ) + + override protected def doExecute(): RDD[InternalRow] = { + val durationMetric = longMetric("duration") + val innerRDD = super.doExecute() + innerRDD.mapPartitions { iter => + val startTime = System.nanoTime() + var done = false + new Iterator[InternalRow] { + override def hasNext: Boolean = { + val r = iter.hasNext + if (!r && !done) { + durationMetric += 
NANOSECONDS.toMillis(System.nanoTime() - startTime) + done = true + } + r + } + override def next(): InternalRow = iter.next() + } + } + } + + override protected def withNewChildInternal(newChild: SparkPlan): DataFlintWindowExec = + DataFlintWindowExec(windowExpression, partitionSpec, orderSpec, newChild) +} + +object DataFlintWindowExec { + def apply( + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + child: SparkPlan): DataFlintWindowExec = + new DataFlintWindowExec(windowExpression, partitionSpec, orderSpec, child) +} \ No newline at end of file diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala index 388c744..b93e1b1 100644 --- a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala +++ b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala @@ -4,6 +4,8 @@ import org.apache.spark.SPARK_VERSION import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SparkSessionExtensions +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Window => LogicalWindow} +import org.apache.spark.sql.execution.SparkStrategy import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan} import org.apache.spark.sql.execution.python.{ @@ -14,6 +16,7 @@ import org.apache.spark.sql.execution.python.{ MapInArrowExec, MapInPandasExec } +import org.apache.spark.sql.execution.window.DataFlintWindowExec /** * A SparkSessionExtension that injects DataFlint instrumentation into Spark's physical planning phase. 
@@ -40,6 +43,10 @@ class DataFlintInstrumentationExtension extends (SparkSessionExtensions => Unit) extensions.injectColumnar { session => DataFlintInstrumentationColumnarRule(session) } + + extensions.injectPlannerStrategy { session => + DataFlintWindowPlannerStrategy(session) + } } } @@ -75,48 +82,75 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C override def preColumnarTransitions: Rule[SparkPlan] = { plan => if (!mapInPandasEnabled && !mapInArrowEnabled) plan else plan.transformUp { - case mapInPandas: MapInPandasExec if mapInPandasEnabled => - logInfo(s"Replacing MapInPandasExec with DataFlint version for Spark $sparkMinorVersion") - sparkMinorVersion match { - case "4.0" => - DataFlintMapInPandasExec_4_0( - func = mapInPandas.func, - output = mapInPandas.output, - child = mapInPandas.child, - isBarrier = mapInPandas.isBarrier, - profile = mapInPandas.profile - ) - case _ => - // Default to 4.1 implementation for 4.1.x and any future 4.x - DataFlintMapInPandasExec_4_1( - func = mapInPandas.func, - output = mapInPandas.output, - child = mapInPandas.child, - isBarrier = mapInPandas.isBarrier, - profile = mapInPandas.profile - ) - } - case mapInArrow: MapInArrowExec if mapInArrowEnabled => - logInfo(s"Replacing MapInArrowExec with DataFlint version for Spark $sparkMinorVersion") - sparkMinorVersion match { - case "4.0" => - DataFlintPythonMapInArrowExec_4_0( - func = mapInArrow.func, - output = mapInArrow.output, - child = mapInArrow.child, - isBarrier = mapInArrow.isBarrier, - profile = mapInArrow.profile - ) - case _ => - // Default to 4.1 implementation for 4.1.x and any future 4.x - DataFlintPythonMapInArrowExec_4_1( - func = mapInArrow.func, - output = mapInArrow.output, - child = mapInArrow.child, - isBarrier = mapInArrow.isBarrier, - profile = mapInArrow.profile - ) + case mapInPandas: MapInPandasExec if mapInPandasEnabled => + logInfo(s"Replacing MapInPandasExec with DataFlint version for Spark $sparkMinorVersion") + 
sparkMinorVersion match { + case "4.0" => + DataFlintMapInPandasExec_4_0( + func = mapInPandas.func, + output = mapInPandas.output, + child = mapInPandas.child, + isBarrier = mapInPandas.isBarrier, + profile = mapInPandas.profile + ) + case _ => + // Default to 4.1 implementation for 4.1.x and any future 4.x + DataFlintMapInPandasExec_4_1( + func = mapInPandas.func, + output = mapInPandas.output, + child = mapInPandas.child, + isBarrier = mapInPandas.isBarrier, + profile = mapInPandas.profile + ) + } + case mapInArrow: MapInArrowExec if mapInArrowEnabled => + logInfo(s"Replacing MapInArrowExec with DataFlint version for Spark $sparkMinorVersion") + sparkMinorVersion match { + case "4.0" => + DataFlintPythonMapInArrowExec_4_0( + func = mapInArrow.func, + output = mapInArrow.output, + child = mapInArrow.child, + isBarrier = mapInArrow.isBarrier, + profile = mapInArrow.profile + ) + case _ => + // Default to 4.1 implementation for 4.1.x and any future 4.x + DataFlintPythonMapInArrowExec_4_1( + func = mapInArrow.func, + output = mapInArrow.output, + child = mapInArrow.child, + isBarrier = mapInArrow.isBarrier, + profile = mapInArrow.profile + ) + } } + } +} + +/** + * A planner Strategy that converts the logical Window plan node directly into + * DataFlintWindowExec, bypassing WindowExec entirely. + * + * Using a Strategy (rather than a ColumnarRule) is correct for row-based operators + * like WindowExec that do not participate in columnar execution. 
+ */ +case class DataFlintWindowPlannerStrategy(session: SparkSession) extends SparkStrategy with Logging { + + private val windowEnabled: Boolean = { + val conf = session.sparkContext.conf + val globalEnabled = conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_SPARK_ENABLED, defaultValue = false) + val specificEnabled = conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_WINDOW_ENABLED, defaultValue = false) + globalEnabled || specificEnabled + } + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = { + if (!windowEnabled) return Nil + plan match { + case w: LogicalWindow => + logInfo("Replacing logical Window with DataFlintWindowExec") + DataFlintWindowExec(w.windowExpressions, w.partitionSpec, w.orderSpec, planLater(w.child)) :: Nil + case _ => Nil } } } diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala new file mode 100644 index 0000000..e05bf9a --- /dev/null +++ b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.window + +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} + +import java.util.concurrent.TimeUnit.NANOSECONDS + +/** + * DataFlint instrumented version of WindowExec for Spark 4.x. + * + * Spark 4.0 uses WindowEvaluatorFactory whose eval() returns a lazy iterator, + * so wrapping super.doExecute() is correct: startTime is set before any rows + * are consumed and the duration covers the full window computation. + */ +class DataFlintWindowExec private ( + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + child: SparkPlan) + extends WindowExec(windowExpression, partitionSpec, orderSpec, child) with Logging { + + logInfo("DataFlint WindowExec is connected") + + override def nodeName: String = "DataFlintWindow" + + override lazy val metrics: Map[String, SQLMetric] = Map( + "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"), + "duration" -> SQLMetrics.createTimingMetric(sparkContext, "duration") + ) + + override protected def doExecute(): RDD[InternalRow] = { + val durationMetric = longMetric("duration") + val innerRDD = super.doExecute() + innerRDD.mapPartitions { iter => + val startTime = System.nanoTime() + var done = false + new Iterator[InternalRow] { + override def hasNext: Boolean = { + val r = iter.hasNext + if (!r && !done) { + durationMetric += NANOSECONDS.toMillis(System.nanoTime() - startTime) + done = true + } + r + } + override def next(): InternalRow = iter.next() + } + } + } + + override protected def withNewChildInternal(newChild: SparkPlan): DataFlintWindowExec = + 
DataFlintWindowExec(windowExpression, partitionSpec, orderSpec, newChild) +} + +object DataFlintWindowExec { + def apply( + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + child: SparkPlan): DataFlintWindowExec = + new DataFlintWindowExec(windowExpression, partitionSpec, orderSpec, child) +} diff --git a/spark-plugin/pyspark-testing/dataflint_pyspark_example.py b/spark-plugin/pyspark-testing/dataflint_pyspark_example.py index 803d017..c1d2425 100755 --- a/spark-plugin/pyspark-testing/dataflint_pyspark_example.py +++ b/spark-plugin/pyspark-testing/dataflint_pyspark_example.py @@ -55,6 +55,7 @@ .config("spark.dataflint.telemetry.enabled", "false") \ .config("spark.dataflint.instrument.spark.mapInPandas.enabled", "true") \ .config("spark.dataflint.instrument.spark.mapInArrow.enabled", "true") \ + .config("spark.dataflint.instrument.spark.window.enabled", "true") \ .master("local[*]") \ .getOrCreate() @@ -181,6 +182,30 @@ def compute_discounted_totals_arrow(iterator): print(f"Current version: {spark_version}") + +print("\n" + "="*80) +print("Running Window function example") +print("="*80) + +from pyspark.sql import Window +from pyspark.sql.functions import rank, sum as spark_sum, avg + +window_by_category = Window.partitionBy("category").orderBy("price") +window_category_total = Window.partitionBy("category") + +df_window = df.withColumn("rank_in_category", rank().over(window_by_category)) \ + .withColumn("cumulative_revenue", spark_sum("price").over(window_by_category)) \ + .withColumn("avg_price_in_category", avg("price").over(window_category_total)) + +df_window.write \ + .mode("overwrite") \ + .parquet("/tmp/dataflint_window_example") + +print("\nResult written to /tmp/dataflint_window_example") +print("\nSample output:") +df_window.show(10, truncate=False) + + print("\n" + "="*80) print("Done!") print("="*80) From caf9b74b09c39512d92944d78d60f8b49e570b4b Mon Sep 17 00:00:00 2001 From: Avi Minsky Date: Sun, 
8 Mar 2026 17:23:05 +0200 Subject: [PATCH 2/6] introduced new instrumentation for adding duration to window function (sql, pythonUDF) added support for spark 4.1.0 and instrument for panda/arrow window function --- spark-plugin/build.sbt | 15 ++- .../DataFlintInstrumentationExtension.scala | 30 +++-- .../python/DataFlintWindowInPandasExec.scala | 73 +++++++++++ .../DataFlintInstrumentationExtension.scala | 117 ++++++++++-------- .../DataFlintArrowWindowPythonExec_4_1.scala | 117 ++++++++++++++++++ .../DataFlintWindowInPandasExec_4_0.scala | 73 +++++++++++ .../dataflint_pyspark_example.py | 42 ++++++- spark-plugin/utils/run-with-spark.sh | 7 ++ spark-ui/src/reducers/SQLNodeStageReducer.ts | 4 +- spark-ui/src/reducers/SqlReducer.ts | 4 + spark-ui/src/reducers/SqlReducerUtils.ts | 9 ++ 11 files changed, 415 insertions(+), 76 deletions(-) create mode 100644 spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec.scala create mode 100644 spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintArrowWindowPythonExec_4_1.scala create mode 100644 spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec_4_0.scala diff --git a/spark-plugin/build.sbt b/spark-plugin/build.sbt index cc58838..dc468fb 100644 --- a/spark-plugin/build.sbt +++ b/spark-plugin/build.sbt @@ -18,7 +18,8 @@ lazy val dataflint = project example_3_4_1, example_3_5_1, example_3_4_1_remote, - example_4_0_1 + example_4_0_1, + example_4_1_0 ).settings( crossScalaVersions := Nil, // Aggregate project version must be Nil, see docs: https://www.scala-sbt.org/1.x/docs/Cross-Build.html publish / skip := true @@ -222,4 +223,16 @@ lazy val example_4_0_1 = (project in file("example_4_0_1")) libraryDependencies += "org.apache.spark" % "spark-core_2.13" % "4.0.1", libraryDependencies += "org.apache.spark" % "spark-sql_2.13" % "4.0.1", publish / skip := true + ).dependsOn(pluginspark4) + 
+lazy val example_4_1_0 = (project in file("example_4_1_0")) + .settings( + name := "DataflintSparkExample410", + organization := "io.dataflint", + scalaVersion := scala213, + crossScalaVersions := List(scala213), // Only Scala 2.13 for Spark 4.x + // there is no scala 2.12 version so we need to force 2.13 to make it compile + libraryDependencies += "org.apache.spark" % "spark-core_2.13" % "4.1.0", + libraryDependencies += "org.apache.spark" % "spark-sql_2.13" % "4.1.0", + publish / skip := true ).dependsOn(pluginspark4) \ No newline at end of file diff --git a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala index 5b56463..59e9fc7 100644 --- a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala +++ b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala @@ -2,23 +2,13 @@ package org.apache.spark.dataflint import org.apache.spark.SPARK_VERSION import org.apache.spark.internal.Logging -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.SparkSessionExtensions -import org.apache.spark.sql.Strategy +import org.apache.spark.sql.{SparkSession, SparkSessionExtensions, Strategy, execution} +import org.apache.spark.sql.catalyst.expressions.{NamedExpression, WindowFunctionType} +import org.apache.spark.sql.catalyst.planning.PhysicalWindow import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Window => LogicalWindow} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan} -import org.apache.spark.sql.execution.python.{ - DataFlintMapInPandasExec_3_0, - DataFlintMapInPandasExec_3_1, - DataFlintMapInPandasExec_3_3, - DataFlintMapInPandasExec_3_4, - DataFlintMapInPandasExec_3_5, - DataFlintPythonMapInArrowExec_3_3, - 
DataFlintPythonMapInArrowExec_3_4, - DataFlintPythonMapInArrowExec_3_5, - MapInPandasExec -} +import org.apache.spark.sql.execution.python.{DataFlintMapInPandasExec_3_0, DataFlintMapInPandasExec_3_1, DataFlintMapInPandasExec_3_3, DataFlintMapInPandasExec_3_4, DataFlintMapInPandasExec_3_5, DataFlintPythonMapInArrowExec_3_3, DataFlintPythonMapInArrowExec_3_4, DataFlintPythonMapInArrowExec_3_5, DataFlintWindowInPandasExec, MapInPandasExec, WindowInPandasExec} import org.apache.spark.sql.execution.window.DataFlintWindowExec /** @@ -216,9 +206,17 @@ case class DataFlintWindowPlannerStrategy(session: SparkSession) extends Strateg override def apply(plan: LogicalPlan): Seq[SparkPlan] = { if (!windowEnabled) return Nil plan match { - case w: LogicalWindow => + case PhysicalWindow( + WindowFunctionType.SQL, windowExprs, partitionSpec, orderSpec, child) => logInfo("Replacing logical Window with DataFlintWindowExec") - DataFlintWindowExec(w.windowExpressions, w.partitionSpec, w.orderSpec, planLater(w.child)) :: Nil + DataFlintWindowExec( + windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil + + case PhysicalWindow( + WindowFunctionType.Python, windowExprs, partitionSpec, orderSpec, child) => + logInfo("Replacing logical Window (Python UDF) with DataFlintWindowInPandasExec") + DataFlintWindowInPandasExec( + windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil case _ => Nil } } diff --git a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec.scala b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec.scala new file mode 100644 index 0000000..c4c2671 --- /dev/null +++ b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.python + +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} + +import java.util.concurrent.TimeUnit.NANOSECONDS + +class DataFlintWindowInPandasExec private ( + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + child: SparkPlan) + extends WindowInPandasExec(windowExpression, partitionSpec, orderSpec, child) with Logging { + + override def nodeName: String = "DataFlintWindowInPandas" + + override lazy val metrics: Map[String, SQLMetric] = pythonMetrics ++ Map( + "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"), + "duration" -> SQLMetrics.createTimingMetric(sparkContext, "duration") + ) + + override protected def doExecute(): RDD[InternalRow] = { + val durationMetric = longMetric("duration") + val innerRDD = super.doExecute() + innerRDD.mapPartitions { iter => + val startTime = System.nanoTime() + var done = false + new Iterator[InternalRow] { + override def hasNext: Boolean = { + val r = iter.hasNext + if (!r && !done) { + 
durationMetric += NANOSECONDS.toMillis(System.nanoTime() - startTime) + done = true + } + r + } + override def next(): InternalRow = iter.next() + } + } + } + + override protected def withNewChildInternal(newChild: SparkPlan): DataFlintWindowInPandasExec = + DataFlintWindowInPandasExec(windowExpression, partitionSpec, orderSpec, newChild) +} + +object DataFlintWindowInPandasExec { + def apply( + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + child: SparkPlan): DataFlintWindowInPandasExec = + new DataFlintWindowInPandasExec(windowExpression, partitionSpec, orderSpec, child) +} \ No newline at end of file diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala index b93e1b1..e035795 100644 --- a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala +++ b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala @@ -4,18 +4,13 @@ import org.apache.spark.SPARK_VERSION import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SparkSessionExtensions +import org.apache.spark.sql.catalyst.expressions.{NamedExpression, WindowFunctionType} +import org.apache.spark.sql.catalyst.planning.PhysicalWindow import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Window => LogicalWindow} import org.apache.spark.sql.execution.SparkStrategy import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan} -import org.apache.spark.sql.execution.python.{ - DataFlintMapInPandasExec_4_0, - DataFlintMapInPandasExec_4_1, - DataFlintPythonMapInArrowExec_4_0, - DataFlintPythonMapInArrowExec_4_1, - MapInArrowExec, - MapInPandasExec -} +import 
org.apache.spark.sql.execution.python.{DataFlintArrowWindowPythonExec_4_1, DataFlintMapInPandasExec_4_0, DataFlintMapInPandasExec_4_1, DataFlintPythonMapInArrowExec_4_0, DataFlintPythonMapInArrowExec_4_1, DataFlintWindowInPandasExec_4_0, MapInArrowExec, MapInPandasExec} import org.apache.spark.sql.execution.window.DataFlintWindowExec /** @@ -79,52 +74,53 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C globalEnabled || specificEnabled } + override def preColumnarTransitions: Rule[SparkPlan] = { plan => if (!mapInPandasEnabled && !mapInArrowEnabled) plan else plan.transformUp { - case mapInPandas: MapInPandasExec if mapInPandasEnabled => - logInfo(s"Replacing MapInPandasExec with DataFlint version for Spark $sparkMinorVersion") - sparkMinorVersion match { - case "4.0" => - DataFlintMapInPandasExec_4_0( - func = mapInPandas.func, - output = mapInPandas.output, - child = mapInPandas.child, - isBarrier = mapInPandas.isBarrier, - profile = mapInPandas.profile - ) - case _ => - // Default to 4.1 implementation for 4.1.x and any future 4.x - DataFlintMapInPandasExec_4_1( - func = mapInPandas.func, - output = mapInPandas.output, - child = mapInPandas.child, - isBarrier = mapInPandas.isBarrier, - profile = mapInPandas.profile - ) - } - case mapInArrow: MapInArrowExec if mapInArrowEnabled => - logInfo(s"Replacing MapInArrowExec with DataFlint version for Spark $sparkMinorVersion") - sparkMinorVersion match { - case "4.0" => - DataFlintPythonMapInArrowExec_4_0( - func = mapInArrow.func, - output = mapInArrow.output, - child = mapInArrow.child, - isBarrier = mapInArrow.isBarrier, - profile = mapInArrow.profile - ) - case _ => - // Default to 4.1 implementation for 4.1.x and any future 4.x - DataFlintPythonMapInArrowExec_4_1( - func = mapInArrow.func, - output = mapInArrow.output, - child = mapInArrow.child, - isBarrier = mapInArrow.isBarrier, - profile = mapInArrow.profile - ) - } + case mapInPandas: MapInPandasExec if mapInPandasEnabled => + 
logInfo(s"Replacing MapInPandasExec with DataFlint version for Spark $sparkMinorVersion") + sparkMinorVersion match { + case "4.0" => + DataFlintMapInPandasExec_4_0( + func = mapInPandas.func, + output = mapInPandas.output, + child = mapInPandas.child, + isBarrier = mapInPandas.isBarrier, + profile = mapInPandas.profile + ) + case _ => + // Default to 4.1 implementation for 4.1.x and any future 4.x + DataFlintMapInPandasExec_4_1( + func = mapInPandas.func, + output = mapInPandas.output, + child = mapInPandas.child, + isBarrier = mapInPandas.isBarrier, + profile = mapInPandas.profile + ) + } + case mapInArrow: MapInArrowExec if mapInArrowEnabled => + logInfo(s"Replacing MapInArrowExec with DataFlint version for Spark $sparkMinorVersion") + sparkMinorVersion match { + case "4.0" => + DataFlintPythonMapInArrowExec_4_0( + func = mapInArrow.func, + output = mapInArrow.output, + child = mapInArrow.child, + isBarrier = mapInArrow.isBarrier, + profile = mapInArrow.profile + ) + case _ => + // Default to 4.1 implementation for 4.1.x and any future 4.x + DataFlintPythonMapInArrowExec_4_1( + func = mapInArrow.func, + output = mapInArrow.output, + child = mapInArrow.child, + isBarrier = mapInArrow.isBarrier, + profile = mapInArrow.profile + ) } + } } } @@ -137,6 +133,11 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C */ case class DataFlintWindowPlannerStrategy(session: SparkSession) extends SparkStrategy with Logging { + private val sparkMinorVersion: String = { + val parts = SPARK_VERSION.split("\\.") + if (parts.length >= 2) s"${parts(0)}.${parts(1)}" else SPARK_VERSION + } + private val windowEnabled: Boolean = { val conf = session.sparkContext.conf val globalEnabled = conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_SPARK_ENABLED, defaultValue = false) @@ -147,9 +148,21 @@ case class DataFlintWindowPlannerStrategy(session: SparkSession) extends SparkSt override def apply(plan: LogicalPlan): Seq[SparkPlan] = { if (!windowEnabled) 
return Nil plan match { - case w: LogicalWindow => + case PhysicalWindow( + WindowFunctionType.SQL, windowExprs, partitionSpec, orderSpec, child) => logInfo("Replacing logical Window with DataFlintWindowExec") - DataFlintWindowExec(w.windowExpressions, w.partitionSpec, w.orderSpec, planLater(w.child)) :: Nil + DataFlintWindowExec( + windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil + + case PhysicalWindow( + WindowFunctionType.Python, windowExprs, partitionSpec, orderSpec, child) => + logInfo(s"Replacing logical Window (Python UDF) with DataFlint version for Spark $sparkMinorVersion") + sparkMinorVersion match { + case "4.0" => + DataFlintWindowInPandasExec_4_0(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil + case _ => DataFlintArrowWindowPythonExec_4_1(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil + } + case _ => Nil } } diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintArrowWindowPythonExec_4_1.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintArrowWindowPythonExec_4_1.scala new file mode 100644 index 0000000..958ef17 --- /dev/null +++ b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintArrowWindowPythonExec_4_1.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * DataFlint instrumented version for Python UDF window functions on Spark 4.1.x. + * + * In Spark 4.1, WindowInPandasExec was replaced by ArrowWindowPythonExec. + * Since pluginspark4 compiles against Spark 4.0.1, we cannot extend ArrowWindowPythonExec + * directly. Instead, we use reflection to create an ArrowWindowPythonExec at runtime + * and delegate execution to it, wrapping the result with a duration metric. + */ +package org.apache.spark.sql.execution.python + +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} + +import java.util.concurrent.TimeUnit.NANOSECONDS + +class DataFlintArrowWindowPythonExec_4_1 private ( + val windowExpression: Seq[NamedExpression], + val partitionSpec: Seq[Expression], + val orderSpec: Seq[SortOrder], + val child: SparkPlan) + extends UnaryExecNode with PythonSQLMetrics with Logging { + + override def nodeName: String = "DataFlintArrowWindowPython" + + override def canEqual(that: Any): Boolean = that.isInstanceOf[DataFlintArrowWindowPythonExec_4_1] + override def productArity: Int = 4 + override def productElement(n: Int): Any = n match { + case 0 => windowExpression + case 1 => partitionSpec + case 2 => orderSpec + case 3 => child + case _ => throw new IndexOutOfBoundsException(s"$n") 
+ } + + override def output: Seq[Attribute] = child.output ++ windowExpression.map(_.toAttribute) + + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override lazy val metrics: Map[String, SQLMetric] = pythonMetrics ++ Map( + "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"), + "duration" -> SQLMetrics.createTimingMetric(sparkContext, "duration") + ) + + override protected def doExecute(): RDD[InternalRow] = { + val durationMetric = longMetric("duration") + + // Create ArrowWindowPythonExec at runtime via reflection (Spark 4.1-only class). + // We use the companion object's apply method which deduces evalType from window expressions. + val innerRDD: RDD[InternalRow] = try { + val companionClass = Class.forName( + "org.apache.spark.sql.execution.python.ArrowWindowPythonExec$") + val companion = companionClass.getField("MODULE$").get(null) + val applyMethod = companion.getClass.getMethods + .find(m => m.getName == "apply" && m.getParameterCount == 4) + .getOrElse(throw new RuntimeException( + "ArrowWindowPythonExec$.apply(4) not found — Spark 4.1.x required")) + val innerExec = applyMethod.invoke(companion, + windowExpression, partitionSpec, orderSpec, child).asInstanceOf[SparkPlan] + innerExec.execute() + } catch { + case e: Exception => + logWarning(s"DataFlint: failed to create ArrowWindowPythonExec via reflection: ${e.getMessage}") + throw e + } + + innerRDD.mapPartitions { iter => + val startTime = System.nanoTime() + var done = false + new Iterator[InternalRow] { + override def hasNext: Boolean = { + val r = iter.hasNext + if (!r && !done) { + durationMetric += NANOSECONDS.toMillis(System.nanoTime() - startTime) + done = true + } + r + } + override def next(): InternalRow = iter.next() + } + } + } + + override protected def withNewChildInternal(newChild: SparkPlan): DataFlintArrowWindowPythonExec_4_1 = + new 
DataFlintArrowWindowPythonExec_4_1(windowExpression, partitionSpec, orderSpec, newChild) +} + +object DataFlintArrowWindowPythonExec_4_1 { + def apply( + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + child: SparkPlan): DataFlintArrowWindowPythonExec_4_1 = + new DataFlintArrowWindowPythonExec_4_1(windowExpression, partitionSpec, orderSpec, child) +} \ No newline at end of file diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec_4_0.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec_4_0.scala new file mode 100644 index 0000000..3247aeb --- /dev/null +++ b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec_4_0.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.python + +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} + +import java.util.concurrent.TimeUnit.NANOSECONDS + +class DataFlintWindowInPandasExec_4_0 private ( + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + child: SparkPlan) + extends WindowInPandasExec(windowExpression, partitionSpec, orderSpec, child) with Logging { + + override def nodeName: String = "DataFlintWindowInPandas" + + override lazy val metrics: Map[String, SQLMetric] = pythonMetrics ++ Map( + "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"), + "duration" -> SQLMetrics.createTimingMetric(sparkContext, "duration") + ) + + override protected def doExecute(): RDD[InternalRow] = { + val durationMetric = longMetric("duration") + val innerRDD = super.doExecute() + innerRDD.mapPartitions { iter => + val startTime = System.nanoTime() + var done = false + new Iterator[InternalRow] { + override def hasNext: Boolean = { + val r = iter.hasNext + if (!r && !done) { + durationMetric += NANOSECONDS.toMillis(System.nanoTime() - startTime) + done = true + } + r + } + override def next(): InternalRow = iter.next() + } + } + } + + override protected def withNewChildInternal(newChild: SparkPlan): DataFlintWindowInPandasExec_4_0 = + DataFlintWindowInPandasExec_4_0(windowExpression, partitionSpec, orderSpec, newChild) +} + +object DataFlintWindowInPandasExec_4_0 { + def apply( + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + child: SparkPlan): DataFlintWindowInPandasExec_4_0 = + new DataFlintWindowInPandasExec_4_0(windowExpression, partitionSpec, orderSpec, child) +} \ No newline at end of file 
diff --git a/spark-plugin/pyspark-testing/dataflint_pyspark_example.py b/spark-plugin/pyspark-testing/dataflint_pyspark_example.py
index c1d2425..8af12b6 100755
--- a/spark-plugin/pyspark-testing/dataflint_pyspark_example.py
+++ b/spark-plugin/pyspark-testing/dataflint_pyspark_example.py
@@ -25,11 +25,10 @@
 
 spark_major_version = 3 # default to Spark 3
 if spark_home:
-    # Try to extract version from SPARK_HOME path
-    if '4.0' in spark_home or 'spark-4' in spark_home:
-        spark_major_version = 4
-    elif '3.' in spark_home or 'spark-3' in spark_home:
-        spark_major_version = 3
+    import re
+    m = re.search(r'[/_-](\d+)\.\d', spark_home)
+    if m:
+        spark_major_version = int(m.group(1))
 
 # Select the appropriate plugin JAR based on Spark version
 if spark_major_version == 4:
@@ -52,13 +51,15 @@
     .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin") \
     .config("spark.ui.port", "10000") \
     .config("spark.sql.maxMetadataStringLength", "10000") \
+    .config("spark.sql.adaptive.enabled", "false") \
     .config("spark.dataflint.telemetry.enabled", "false") \
+    .config("spark.dataflint.instrument.spark.enabled", "true") \
     .config("spark.dataflint.instrument.spark.mapInPandas.enabled", "true") \
     .config("spark.dataflint.instrument.spark.mapInArrow.enabled", "true") \
     .config("spark.dataflint.instrument.spark.window.enabled", "true") \
     .master("local[*]") \
     .getOrCreate()
-
+# spark.sparkContext.setLogLevel("INFO")
 # Get Spark version and check if mapInArrow is supported
 spark_version = spark.version
 version_parts = spark_version.split('.')
@@ -189,6 +190,7 @@ def compute_discounted_totals_arrow(iterator):
 
 from pyspark.sql import Window
 from pyspark.sql.functions import rank, sum as spark_sum, avg
+from pyspark.sql.types import DoubleType
 
 window_by_category = Window.partitionBy("category").orderBy("price")
 window_category_total = Window.partitionBy("category")
@@ -206,6 +208,34 @@ def compute_discounted_totals_arrow(iterator):
 
 df_window.show(10, truncate=False)
 
+print("\n" + "="*80)
+print("Running Window function with Python UDF example") +print("="*80) + +import pandas as pd +from pyspark.sql.functions import pandas_udf + +@pandas_udf(DoubleType()) +def discounted_sum(prices: pd.Series) -> float: + """Pandas UDF used as a window aggregate: sum of prices with a 10% discount.""" + import time + time.sleep(2) + return prices.sum() * 0.9 + +df_window_udf = df.withColumn( + "discounted_category_revenue", + discounted_sum("price").over(window_category_total) +) + +df_window_udf.write \ + .mode("overwrite") \ + .parquet("/tmp/dataflint_window_udf_example") + +print("\nResult written to /tmp/dataflint_window_udf_example") +print("\nSample output:") +df_window_udf.show(10, truncate=False) + + print("\n" + "="*80) print("Done!") print("="*80) diff --git a/spark-plugin/utils/run-with-spark.sh b/spark-plugin/utils/run-with-spark.sh index 776c7e6..c61b14b 100755 --- a/spark-plugin/utils/run-with-spark.sh +++ b/spark-plugin/utils/run-with-spark.sh @@ -98,6 +98,10 @@ fi # Get current Java version for display JAVA_VERSION=$(java -version 2>&1 | awk -F '"' '/version/ {print $2}' | cut -d'.' 
-f1) +# Get Scala version from the scala-library JAR bundled with Spark +SCALA_VERSION=$(ls "${SPARK_HOME}/jars/scala-library-"*.jar 2>/dev/null | sed 's/.*scala-library-\([0-9.]*\)\.jar/\1/' | head -1) +SCALA_VERSION="${SCALA_VERSION:-unknown}" + # Ensure driver and worker use the same Python interpreter PYTHON_BIN="$(which python3)" export PYSPARK_PYTHON="${PYTHON_BIN}" @@ -110,9 +114,12 @@ export PYTHONPATH="${SPARK_HOME}/python${PY4J_ZIP:+:${PY4J_ZIP}}${PYTHONPATH:+:$ echo "╔══════════════════════════════════════════════════════════════╗" echo "║ SPARK_HOME : ${SPARK_HOME}" echo "║ Spark ver : ${SPARK_VERSION}" +echo "║ Scala ver : ${SCALA_VERSION}" echo "║ Java ver : ${JAVA_VERSION}" echo "║ Script : ${PYSPARK_SCRIPT}" echo "╚══════════════════════════════════════════════════════════════╝" echo "" +#export SPARK_LOG_LEVEL="${SPARK_LOG_LEVEL:-WARN}" + exec python3 "${PYSPARK_SCRIPT}" "$@" diff --git a/spark-ui/src/reducers/SQLNodeStageReducer.ts b/spark-ui/src/reducers/SQLNodeStageReducer.ts index 63d1016..a361263 100644 --- a/spark-ui/src/reducers/SQLNodeStageReducer.ts +++ b/spark-ui/src/reducers/SQLNodeStageReducer.ts @@ -111,7 +111,9 @@ export function calculateSQLNodeStage(sql: EnrichedSparkSQL, sqlStages: SparkSta if (node.nodeName === "AQEShuffleRead" || node.nodeName === "Coalesce" || node.nodeName === "BatchEvalPython" || node.nodeName === "MapInPandas" || node.nodeName === "DataFlintMapInPandas" || node.nodeName === "MapInArrow" || node.nodeName === "PythonMapInArrow" || node.nodeName === "DataFlintMapInArrow" || - node.nodeName === "ArrowEvalPython" || node.nodeName === "FlatMapGroupsInPandas") { + node.nodeName === "ArrowEvalPython" || node.nodeName === "FlatMapGroupsInPandas" || + node.nodeName === "WindowInPandas" || node.nodeName === "DataFlintWindowInPandas" || node.nodeName === "DataFlintArrowWindowPython" || + node.nodeName === "Window" || node.nodeName === "DataFlintWindow") { const nextNode = findNextNode(node.nodeId); if (nextNode !== 
undefined && nextNode.stage !== undefined) { return { ...node, stage: nextNode.stage }; diff --git a/spark-ui/src/reducers/SqlReducer.ts b/spark-ui/src/reducers/SqlReducer.ts index dfda358..7fe9eb7 100644 --- a/spark-ui/src/reducers/SqlReducer.ts +++ b/spark-ui/src/reducers/SqlReducer.ts @@ -164,6 +164,10 @@ export function parseNodePlan( plan: parseSort(plan.planDescription), }; case "Window": + case "DataFlintWindow": + case "WindowInPandas": + case "DataFlintWindowInPandas": + case "DataFlintArrowWindowPython": return { type: "Window", plan: parseWindow(plan.planDescription), diff --git a/spark-ui/src/reducers/SqlReducerUtils.ts b/spark-ui/src/reducers/SqlReducerUtils.ts index 94a80d3..98987d6 100644 --- a/spark-ui/src/reducers/SqlReducerUtils.ts +++ b/spark-ui/src/reducers/SqlReducerUtils.ts @@ -192,6 +192,10 @@ const nodeTypeDict: Record = { ArrowEvalPython: "transformation", FlatMapGroupsInPandas: "transformation", BatchEvalPython: "transformation", + WindowInPandas: "transformation", + DataFlintWindowInPandas: "transformation", + DataFlintArrowWindowPython: "transformation", + DataFlintWindow: "transformation", Generate: "transformation", Expand: "transformation", }; @@ -272,6 +276,11 @@ const nodeRenamerDict: Record = { ArrowEvalPython: "Select (with Arrow)", FlatMapGroupsInPandas: "Select Flat (with Pandas)", BatchEvalPython: "Run Python UDF", + Window: "Window", + WindowInPandas: "Window (with Pandas UDF)", + DataFlintWindow: "Window", + DataFlintWindowInPandas: "Window (with Pandas UDF)", + DataFlintArrowWindowPython: "Window (with Pandas UDF)", Expand: "Expand", }; From c9c01a07dfaa6aaa011dc7966953745ccdc5c21c Mon Sep 17 00:00:00 2001 From: Avi Minsky Date: Tue, 10 Mar 2026 15:53:59 +0200 Subject: [PATCH 3/6] created in plugin code a DataFlintRDDUtils object which does the actual duration wrapper. 
All implementations now use the DataFlintRDDUtils.withDurationMetric method instead of a copy --- spark-plugin/.sbtopts | 1 + spark-plugin/build.sbt | 10 +- .../spark/dataflint/DataFlintRDDUtils.scala | 26 +++ .../python/DataFlintWindowInPandasExec.scala | 24 +-- .../window/DataFlintWindowExec.scala | 45 +++-- .../dataflint/DataFlintWindowExecSpec.scala | 183 ++++++++++++++++++ .../DataFlintArrowWindowPythonExec_4_1.scala | 19 +- .../DataFlintWindowInPandasExec_4_0.scala | 24 +-- .../window/DataFlintWindowExec.scala | 24 +-- 9 files changed, 254 insertions(+), 102 deletions(-) create mode 100644 spark-plugin/.sbtopts create mode 100644 spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataFlintRDDUtils.scala create mode 100644 spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWindowExecSpec.scala diff --git a/spark-plugin/.sbtopts b/spark-plugin/.sbtopts new file mode 100644 index 0000000..32d94f6 --- /dev/null +++ b/spark-plugin/.sbtopts @@ -0,0 +1 @@ +-java-home /Users/aviminsky/Library/Java/JavaVirtualMachines/corretto-17.0.18/Contents/Home \ No newline at end of file diff --git a/spark-plugin/build.sbt b/spark-plugin/build.sbt index dc468fb..9aa8505 100644 --- a/spark-plugin/build.sbt +++ b/spark-plugin/build.sbt @@ -89,7 +89,15 @@ lazy val pluginspark3 = (project in file("pluginspark3")) Compile / unmanagedSourceDirectories += (plugin / Compile / sourceDirectory).value / "scala", // Include resources from plugin directory for static UI files - Compile / unmanagedResourceDirectories += (plugin / Compile / resourceDirectory).value + Compile / unmanagedResourceDirectories += (plugin / Compile / resourceDirectory).value, + libraryDependencies += "org.scalatest" %% "scalatest-funsuite" % "3.2.17" % Test, + libraryDependencies += "org.scalatest" %% "scalatest-shouldmatchers" % "3.2.17" % Test, + libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1" % Test, + libraryDependencies += "org.apache.spark" %% "spark-sql" 
% "3.5.1" % Test, + + // Include source and resources from plugin directory for tests + Test / unmanagedSourceDirectories += (plugin / Compile / sourceDirectory).value / "scala", + ) lazy val pluginspark4 = (project in file("pluginspark4")) diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataFlintRDDUtils.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataFlintRDDUtils.scala new file mode 100644 index 0000000..2135492 --- /dev/null +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataFlintRDDUtils.scala @@ -0,0 +1,26 @@ +package org.apache.spark.dataflint + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.metric.SQLMetric + +import java.util.concurrent.TimeUnit.NANOSECONDS + +object DataFlintRDDUtils { + def withDurationMetric(rdd: RDD[InternalRow], durationMetric: SQLMetric): RDD[InternalRow] = + rdd.mapPartitions { iter => + val startTime = System.nanoTime() + var done = false + new Iterator[InternalRow] { + override def hasNext: Boolean = { + val r = iter.hasNext + if (!r && !done) { + durationMetric += NANOSECONDS.toMillis(System.nanoTime() - startTime) + done = true + } + r + } + override def next(): InternalRow = iter.next() + } + } +} \ No newline at end of file diff --git a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec.scala b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec.scala index c4c2671..c757288 100644 --- a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec.scala +++ b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.python +import org.apache.spark.dataflint.DataFlintRDDUtils import org.apache.spark.internal.Logging 
import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -23,8 +24,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import java.util.concurrent.TimeUnit.NANOSECONDS - class DataFlintWindowInPandasExec private ( windowExpression: Seq[NamedExpression], partitionSpec: Seq[Expression], @@ -39,25 +38,8 @@ class DataFlintWindowInPandasExec private ( "duration" -> SQLMetrics.createTimingMetric(sparkContext, "duration") ) - override protected def doExecute(): RDD[InternalRow] = { - val durationMetric = longMetric("duration") - val innerRDD = super.doExecute() - innerRDD.mapPartitions { iter => - val startTime = System.nanoTime() - var done = false - new Iterator[InternalRow] { - override def hasNext: Boolean = { - val r = iter.hasNext - if (!r && !done) { - durationMetric += NANOSECONDS.toMillis(System.nanoTime() - startTime) - done = true - } - r - } - override def next(): InternalRow = iter.next() - } - } - } + override protected def doExecute(): RDD[InternalRow] = + DataFlintRDDUtils.withDurationMetric(super.doExecute(), longMetric("duration")) override protected def withNewChildInternal(newChild: SparkPlan): DataFlintWindowInPandasExec = DataFlintWindowInPandasExec(windowExpression, partitionSpec, orderSpec, newChild) diff --git a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala index bb8b83a..1ad2522 100644 --- a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala +++ b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala @@ -13,9 +13,31 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and * limitations under the License. + * + * What's currently included in duration + + The timer wraps iter from super.doExecute(). When you call iter.hasNext, the parent WindowExec iterator internally: + 1. Pulls rows from its child (e.g., a SortExec) + 2. Accumulates a full partition group + 3. Computes window function values + 4. Emits output rows + + Steps 1–3 all happen during the first iter.hasNext call. So the timer captures child-fetch time + window computation time combined. + + How to isolate just the window computation + + You can't easily do this with the current wrapping approach because WindowExec uses an internal buffer pattern — it reads all input for a partition group eagerly on the first hasNext. To truly isolate window computation you'd + need to: + + 1. Override doExecute more deeply — instrument inside WindowFunctionFrame.prepare() and write() calls, which are the actual window computation steps. This requires overriding private Spark internals. + 2. Subtract child time — measure the child RDD's execution time separately and subtract it. Fragile and inaccurate. + 3. Accept the current scope as "window stage time" — which is the pragmatic choice. The metric measures the time from when the partition iterator is first pulled until it's exhausted. This is a reasonable proxy for window + operator cost in a query plan. 
+ */ package org.apache.spark.sql.execution.window +import org.apache.spark.dataflint.DataFlintRDDUtils import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -23,8 +45,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import java.util.concurrent.TimeUnit.NANOSECONDS - class DataFlintWindowExec private ( windowExpression: Seq[NamedExpression], partitionSpec: Seq[Expression], @@ -39,25 +59,8 @@ class DataFlintWindowExec private ( "duration" -> SQLMetrics.createTimingMetric(sparkContext, "duration") ) - override protected def doExecute(): RDD[InternalRow] = { - val durationMetric = longMetric("duration") - val innerRDD = super.doExecute() - innerRDD.mapPartitions { iter => - val startTime = System.nanoTime() - var done = false - new Iterator[InternalRow] { - override def hasNext: Boolean = { - val r = iter.hasNext - if (!r && !done) { - durationMetric += NANOSECONDS.toMillis(System.nanoTime() - startTime) - done = true - } - r - } - override def next(): InternalRow = iter.next() - } - } - } + override protected def doExecute(): RDD[InternalRow] = + DataFlintRDDUtils.withDurationMetric(super.doExecute(), longMetric("duration")) override protected def withNewChildInternal(newChild: SparkPlan): DataFlintWindowExec = DataFlintWindowExec(windowExpression, partitionSpec, orderSpec, newChild) diff --git a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWindowExecSpec.scala b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWindowExecSpec.scala new file mode 100644 index 0000000..872d00a --- /dev/null +++ b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWindowExecSpec.scala @@ -0,0 +1,183 @@ +package org.apache.spark.dataflint + +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} +import 
org.apache.spark.sql.execution.window.DataFlintWindowExec +import org.apache.spark.sql.expressions.Aggregator +import org.apache.spark.sql.functions.udaf +import org.scalatest.BeforeAndAfterAll +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +import java.util.concurrent.TimeUnit.NANOSECONDS + +private class SlowSumAggregator(sleep: Long) extends Aggregator[Long, Long, Long] { + def zero: Long = 0L + def reduce(b: Long, a: Long): Long = { + Thread.sleep(sleep); b + a + } + def merge(b1: Long, b2: Long): Long = b1 + b2 + def finish(r: Long): Long = r + def bufferEncoder: Encoder[Long] = Encoders.scalaLong + def outputEncoder: Encoder[Long] = Encoders.scalaLong +} + +class DataFlintWindowExecSpec extends AnyFunSuite with Matchers with BeforeAndAfterAll { + + private var spark: SparkSession = _ + + override def beforeAll(): Unit = { + spark = SparkSession.builder() + .master("local[1]") + .appName("DataFlintWindowExecSpec") + .config("spark.sql.extensions", "org.apache.spark.dataflint.DataFlintInstrumentationExtension") + .config(DataflintSparkUICommonLoader.INSTRUMENT_SPARK_ENABLED, "true") + .config(DataflintSparkUICommonLoader.INSTRUMENT_WINDOW_ENABLED, "true") + .config("spark.ui.enabled", "true") + .config("spark.sql.adaptive.enabled", "false") + .getOrCreate() + } + + override def afterAll(): Unit = { + if (spark != null) spark.stop() + } + + test("DataFlintWindowPlannerStrategy replaces WindowExec with DataFlintWindowExec for SQL window") { + val session = spark + import session.implicits._ + val df = Seq((1, "a"), (2, "b"), (3, "a"), (4, "b"), (5, "a")).toDF("id", "cat") + df.createOrReplaceTempView("test_window_plan") + + val result = spark.sql( + "SELECT id, cat, rank() OVER (PARTITION BY cat ORDER BY id) AS r FROM test_window_plan" + ) + val windowNodes = result.queryExecution.executedPlan.collect { + case w: DataFlintWindowExec => w + } + + withClue("Expected DataFlintWindowExec in physical plan but found: " + + 
result.queryExecution.executedPlan.treeString) {
+      windowNodes should not be empty
+    }
+  }
+
+  test("DataFlintWindowExec duration metric is positive after execution") {
+    val session = spark
+    import session.implicits._
+    // 10,000 rows across 5 window partitions — enough to ensure window work takes > 0ms
+    val df = (1 to 10000).map(i => (i, i % 5)).toDF("id", "cat")
+    df.createOrReplaceTempView("test_window_timing")
+
+    val result = spark.sql(
+      "SELECT id, cat, rank() OVER (PARTITION BY cat ORDER BY id) AS r FROM test_window_timing"
+    )
+    result.collect()
+
+    val windowNode = result.queryExecution.executedPlan.collect {
+      case w: DataFlintWindowExec => w
+    }.head
+    val duration = windowNode.metrics("duration").value
+    duration should be > 0L
+  }
+
+  test("DataFlintWindowExec duration metric is bounded by wall clock time") {
+    val session = spark
+    import session.implicits._
+    val df = (1 to 100000).map(i => (i, i % 5)).toDF("id", "cat")
+    df.createOrReplaceTempView("test_window_wall_clock")
+
+    val result = spark.sql(
+      "SELECT id, cat, rank() OVER (PARTITION BY cat ORDER BY id) AS r FROM test_window_wall_clock"
+    )
+
+    val wallStart = System.nanoTime()
+    result.collect()
+    val wallMs = NANOSECONDS.toMillis(System.nanoTime() - wallStart)
+
+    val windowNode = result.queryExecution.executedPlan.collect {
+      case w: DataFlintWindowExec => w
+    }.head
+    val duration = windowNode.metrics("duration").value
+
+    duration should be > 0L
+    duration should be <= wallMs
+  }
+
+  test("DataFlintWindowExec duration metric scales with data size") {
+    val session = spark
+    import session.implicits._
+
+    val dfSmall = (1 to 100).map(i => (i, i % 5)).toDF("id", "cat")
+    dfSmall.createOrReplaceTempView("test_window_small")
+    val resultSmall = spark.sql(
+      "SELECT id, cat, rank() OVER (PARTITION BY cat ORDER BY id) AS r FROM test_window_small"
+    )
+    resultSmall.collect()
+    val durationSmall = resultSmall.queryExecution.executedPlan.collect {
+      case w: DataFlintWindowExec => w
+    
}.head.metrics("duration").value
+
+    val dfLarge = (1 to 1000000).map(i => (i, i % 5)).toDF("id", "cat")
+    dfLarge.createOrReplaceTempView("test_window_large")
+    val resultLarge = spark.sql(
+      "SELECT id, cat, rank() OVER (PARTITION BY cat ORDER BY id) AS r FROM test_window_large"
+    )
+    resultLarge.collect()
+    val durationLarge = resultLarge.queryExecution.executedPlan.collect {
+      case w: DataFlintWindowExec => w
+    }.head.metrics("duration").value
+
+    durationLarge should be > durationSmall
+  }
+
+  test("DataFlintWindowExec duration metric captures window-internal UDAF computation") {
+    val session = spark
+    import session.implicits._
+
+    // A UDAF whose reduce() sleeps `sleepTime` ms per row — all work happens inside the window operator.
+    // With 200 rows / 5 categories = 40 rows per partition, reduce() is called 200 times total
+    // → at least rows × sleepTime = 800ms of window-internal computation that the metric must capture.
+    val sleepTime=4
+    spark.udf.register("slow_sum", udaf(new SlowSumAggregator(sleepTime)))
+
+    val rows = 200
+    val partitions = 5
+    val df = (1 to rows).map(i => (i, i % partitions)).toDF("id", "cat")
+    df.createOrReplaceTempView("test_window_udaf")
+
+    // Fast baseline: rank() has negligible per-row computation
+    val resultFast = spark.sql(
+      "SELECT id, cat, rank() OVER (PARTITION BY cat ORDER BY id) AS r FROM test_window_udaf"
+    )
+    resultFast.collect()
+    val durationFast = resultFast.queryExecution.executedPlan.collect {
+      case w: DataFlintWindowExec => w
+    }.head.metrics("duration").value
+
+    // Slow: UDAF sleeps `sleepTime` ms per row inside the window operator — 800ms+ of window-internal work
+    val resultSlow = spark.sql(
+      "SELECT id, cat, slow_sum(CAST(id AS BIGINT)) OVER (PARTITION BY cat) AS r FROM test_window_udaf"
+    )
+    val wallSlowStart = System.nanoTime()
+    resultSlow.collect()
+    val wallSlowMs = NANOSECONDS.toMillis(System.nanoTime() - wallSlowStart)
+    val durationSlow = resultSlow.queryExecution.executedPlan.collect {
+      case w: DataFlintWindowExec => w
+    
}.head.metrics("duration").value + Thread.sleep(50000) + // The metric must capture the UDAF's sleep time (at least rows*sleepTime ms = 1ms × 200 rows) + withClue(s"durationSlow=$durationSlow ms should be >= ${rows*sleepTime}ms (UDAF sleep captured in metric)") { + durationSlow should be >= rows.toLong + } + + // The metric must reflect actual window computation — slow >> fast + withClue(s"durationSlow=$durationSlow ms should exceed durationFast=$durationFast ms") { + durationSlow should be > durationFast + } + + // The metric must be bounded by wall-clock time (no phantom time) + withClue(s"durationSlow=$durationSlow ms should be <= wallSlowMs=$wallSlowMs ms") { + durationSlow should be <= wallSlowMs + } + } + +} \ No newline at end of file diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintArrowWindowPythonExec_4_1.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintArrowWindowPythonExec_4_1.scala index 958ef17..f142e7d 100644 --- a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintArrowWindowPythonExec_4_1.scala +++ b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintArrowWindowPythonExec_4_1.scala @@ -24,6 +24,7 @@ */ package org.apache.spark.sql.execution.python +import org.apache.spark.dataflint.DataFlintRDDUtils import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -32,8 +33,6 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import java.util.concurrent.TimeUnit.NANOSECONDS - class DataFlintArrowWindowPythonExec_4_1 private ( val windowExpression: Seq[NamedExpression], val partitionSpec: Seq[Expression], @@ -86,21 +85,7 @@ class DataFlintArrowWindowPythonExec_4_1 private 
( throw e } - innerRDD.mapPartitions { iter => - val startTime = System.nanoTime() - var done = false - new Iterator[InternalRow] { - override def hasNext: Boolean = { - val r = iter.hasNext - if (!r && !done) { - durationMetric += NANOSECONDS.toMillis(System.nanoTime() - startTime) - done = true - } - r - } - override def next(): InternalRow = iter.next() - } - } + DataFlintRDDUtils.withDurationMetric(innerRDD, durationMetric) } override protected def withNewChildInternal(newChild: SparkPlan): DataFlintArrowWindowPythonExec_4_1 = diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec_4_0.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec_4_0.scala index 3247aeb..d86367c 100644 --- a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec_4_0.scala +++ b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec_4_0.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.python +import org.apache.spark.dataflint.DataFlintRDDUtils import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -23,8 +24,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import java.util.concurrent.TimeUnit.NANOSECONDS - class DataFlintWindowInPandasExec_4_0 private ( windowExpression: Seq[NamedExpression], partitionSpec: Seq[Expression], @@ -39,25 +38,8 @@ class DataFlintWindowInPandasExec_4_0 private ( "duration" -> SQLMetrics.createTimingMetric(sparkContext, "duration") ) - override protected def doExecute(): RDD[InternalRow] = { - val durationMetric = longMetric("duration") - val innerRDD = super.doExecute() - innerRDD.mapPartitions { iter => - val startTime = 
System.nanoTime() - var done = false - new Iterator[InternalRow] { - override def hasNext: Boolean = { - val r = iter.hasNext - if (!r && !done) { - durationMetric += NANOSECONDS.toMillis(System.nanoTime() - startTime) - done = true - } - r - } - override def next(): InternalRow = iter.next() - } - } - } + override protected def doExecute(): RDD[InternalRow] = + DataFlintRDDUtils.withDurationMetric(super.doExecute(), longMetric("duration")) override protected def withNewChildInternal(newChild: SparkPlan): DataFlintWindowInPandasExec_4_0 = DataFlintWindowInPandasExec_4_0(windowExpression, partitionSpec, orderSpec, newChild) diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala index e05bf9a..454bca0 100644 --- a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala +++ b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.window +import org.apache.spark.dataflint.DataFlintRDDUtils import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -23,8 +24,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import java.util.concurrent.TimeUnit.NANOSECONDS - /** * DataFlint instrumented version of WindowExec for Spark 4.x. 
* @@ -48,25 +47,8 @@ class DataFlintWindowExec private ( "duration" -> SQLMetrics.createTimingMetric(sparkContext, "duration") ) - override protected def doExecute(): RDD[InternalRow] = { - val durationMetric = longMetric("duration") - val innerRDD = super.doExecute() - innerRDD.mapPartitions { iter => - val startTime = System.nanoTime() - var done = false - new Iterator[InternalRow] { - override def hasNext: Boolean = { - val r = iter.hasNext - if (!r && !done) { - durationMetric += NANOSECONDS.toMillis(System.nanoTime() - startTime) - done = true - } - r - } - override def next(): InternalRow = iter.next() - } - } - } + override protected def doExecute(): RDD[InternalRow] = + DataFlintRDDUtils.withDurationMetric(super.doExecute(), longMetric("duration")) override protected def withNewChildInternal(newChild: SparkPlan): DataFlintWindowExec = DataFlintWindowExec(windowExpression, partitionSpec, orderSpec, newChild) From 475a093c940b647ebf8cd38d69978cae8c61d573 Mon Sep 17 00:00:00 2001 From: Avi Minsky Date: Tue, 10 Mar 2026 15:59:35 +0200 Subject: [PATCH 4/6] updated .gitignore --- spark-plugin/.gitignore | 1 + spark-plugin/.sbtopts | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) delete mode 100644 spark-plugin/.sbtopts diff --git a/spark-plugin/.gitignore b/spark-plugin/.gitignore index 793aa5b..ff5d857 100644 --- a/spark-plugin/.gitignore +++ b/spark-plugin/.gitignore @@ -55,6 +55,7 @@ plugin/src/main/resources/io/ # custom .bsp +.sbtopts # Downloaded Spark versions (large binaries, not for VCS) utils/.spark-versions/ diff --git a/spark-plugin/.sbtopts b/spark-plugin/.sbtopts deleted file mode 100644 index 32d94f6..0000000 --- a/spark-plugin/.sbtopts +++ /dev/null @@ -1 +0,0 @@ --java-home /Users/aviminsky/Library/Java/JavaVirtualMachines/corretto-17.0.18/Contents/Home \ No newline at end of file From e49cdacbcf8066dcae9e6405f749270810e63619 Mon Sep 17 00:00:00 2001 From: Avi Minsky Date: Tue, 10 Mar 2026 16:02:26 +0200 Subject: [PATCH 5/6] updated 
.gitignore --- .../instrumentation-extension/SKILL.md | 162 ------------------ spark-plugin/.gitignore | 1 + 2 files changed, 1 insertion(+), 162 deletions(-) delete mode 100644 spark-plugin/.claude/commands/instrumentation-extension/SKILL.md diff --git a/spark-plugin/.claude/commands/instrumentation-extension/SKILL.md b/spark-plugin/.claude/commands/instrumentation-extension/SKILL.md deleted file mode 100644 index f920528..0000000 --- a/spark-plugin/.claude/commands/instrumentation-extension/SKILL.md +++ /dev/null @@ -1,162 +0,0 @@ -# Instrumentation Extension Expert - -You are an expert on the `DataFlintInstrumentationExtension` — the Spark SQL extension that injects duration metrics into MapInPandas and MapInArrow physical plan nodes across Spark 3.0–4.1. - -## How to use this skill - -Respond to any question or task about the instrumentation extension. Common tasks: -- Add a new plan node type (new `case` branch in the ColumnarRule) -- Debug why a metric isn't appearing -- Add support for a new Spark version -- Understand how a specific version implementation works - ---- - -## Architecture Overview - -### Entry Points -- **Spark 3.x**: `pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala` -- **Spark 4.x**: `pluginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala` -- **Config/registration**: `plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintSparkUICommonLoader.scala` - -### Registration Flow -1. `DataflintSparkUICommonLoader.registerInstrumentationExtension()` appends the extension class to `spark.sql.extensions` if any instrumentation config is enabled. -2. The extension registers a `DataFlintInstrumentationColumnarRule` via `injectColumnar`. -3. The rule's `preColumnarTransitions` hook runs `transformUp` on the physical plan, replacing target nodes. 
- -### Config Flags -``` -spark.dataflint.instrument.spark.enabled -spark.dataflint.instrument.spark.mapInPandas.enabled -spark.dataflint.instrument.spark.mapInArrow.enabled -``` - ---- - -## ColumnarRule Pattern - -### Spark 3.x (two separate methods to avoid ClassNotFoundError on old versions) -```scala -def replaceMapInPandas(plan: SparkPlan): SparkPlan = plan.transformUp { - case mapInPandas: MapInPandasExec if mapInPandasEnabled => - sparkMinorVersion match { - case "3.0" => DataFlintMapInPandasExec_3_0(...) - case "3.1" | "3.2" => DataFlintMapInPandasExec_3_1(...) - case "3.3" => DataFlintMapInPandasExec_3_3(...) - case "3.4" => DataFlintMapInPandasExec_3_4(...) - case _ => DataFlintMapInPandasExec_3_5(...) // 3.5+ - } -} - -def replaceMapInArrow(plan: SparkPlan): SparkPlan = plan.transformUp { - case mapInArrow: PythonMapInArrowExec if mapInArrowEnabled => - sparkMinorVersion match { - case "3.3" => DataFlintPythonMapInArrowExec_3_3(...) - case "3.4" => DataFlintPythonMapInArrowExec_3_4(...) - case _ => DataFlintPythonMapInArrowExec_3_5(...) // 3.5+ - } -} - -override def preColumnarTransitions: SparkPlan => SparkPlan = plan => { - var result = replaceMapInPandas(plan) - try { result = replaceMapInArrow(result) } catch { case _: ClassNotFoundException => } - result -} -``` - -### Spark 4.x (single combined transformUp) -```scala -override def preColumnarTransitions: SparkPlan => SparkPlan = plan => - plan.transformUp { - case mapInPandas: MapInPandasExec if mapInPandasEnabled => - sparkMinorVersion match { - case "4.0" => DataFlintMapInPandasExec_4_0(...) - case _ => DataFlintMapInPandasExec_4_1(...) - } - case mapInArrow: MapInArrowExec if mapInArrowEnabled => - sparkMinorVersion match { - case "4.0" => DataFlintPythonMapInArrowExec_4_0(...) - case _ => DataFlintPythonMapInArrowExec_4_1(...) 
- } - } -``` - ---- - -## Version-Specific Implementation Files - -All live under: -- `pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/` -- `pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/` - -### Key differences by version - -| Version | File suffix | Notable changes | -|---------|-------------|-----------------| -| 3.0 | `_3_0` | No MapInBatchExec trait, no PythonSQLMetrics, 6-arg ArrowPythonRunner via reflection | -| 3.1/3.2 | `_3_1` | Same as 3.0, 6-arg runner | -| 3.3 | `_3_3` | MapInBatchExec trait added; MapInArrow support introduced; still 6-arg runner | -| 3.4 | `_3_4` | PythonSQLMetrics mixin; 7-arg runner (adds pythonMetrics param) | -| 3.5 | `_3_5` | Major refactor: MapInBatchEvaluatorFactory; barrier mode; JobArtifactSet (Spark Connect) | -| 4.0 | `_4_0` | ResourceProfile field; chainedFunc includes resultId (`Seq[(ChainedPythonFunctions, Long)]`) | -| 4.1 | `_4_1` | sessionUUID for worker logging; pythonUDFProfiler; 13-arg EvaluatorFactory via reflection | - -### Duration metric pattern (all versions) -```scala -val durationMetric = SQLMetrics.createTimingMetric(sparkContext, "duration") -// ... measured in nanoseconds, reported in milliseconds -val startTime = System.nanoTime() -// ... execute the original logic ... -durationMetric += (System.nanoTime() - startTime) / 1000000 -``` - -### Why reflection? -Constructor signatures change between Spark minor versions. Rather than maintaining separate binary artifacts per patch version, reflection allows a single compiled class to work across minor versions within a series. - ---- - -## How to Add a New Plan Node Type - -### Step 1: Identify the target node -Find the physical plan node class (e.g., `FlatMapGroupsInPandasExec`) and its constructor parameters across all relevant Spark versions. - -### Step 2: Add config flag (optional) -In `DataflintSparkUICommonLoader.scala`, add a new constant and check it in `registerInstrumentationExtension()`. 
- -### Step 3: Create instrumented wrapper classes -For each Spark version that supports the node, create a new file: -``` -DataFlintExec__.scala -``` - -The wrapper must: -1. Extend the original node class (copy all constructor params) -2. Override `metrics` to add the `"duration"` timing metric -3. Wrap the execution logic to measure and record duration - -### Step 4: Add case branches -In both `pluginspark3` and `pluginspark4` extension files, add a new `case` branch: -```scala -case target: TargetExec if targetEnabled => - sparkMinorVersion match { - case "3.3" => DataFlintTargetExec_3_3(...) - // ... - } -``` - -For Spark 3.x: if the node doesn't exist in 3.0/3.1, wrap the replacement in a separate method with try-catch for `ClassNotFoundException`. - -### Step 5: Wire the guard flag -Add the enabled flag lookup: -```scala -val targetEnabled = conf.getBoolean("spark.dataflint.instrument.spark..enabled", defaultValue = true) -``` - ---- - -## Debugging Tips - -- **Metric not showing**: Check if the config flag is enabled and the node class name matches exactly (including package). -- **ClassNotFoundException on old Spark**: The replacement node class likely references a class that doesn't exist in that version. Move the `case` branch into a separate method and wrap with `try { } catch { case _: ClassNotFoundException => }`. -- **Wrong constructor args via reflection**: Print the available constructors with `clazz.getConstructors.foreach(println)` to find the right signature. -- **Version detection**: Version is extracted as `SPARK_VERSION.split("\\.").take(2).mkString(".")` giving `"3.5"`, `"4.1"`, etc. 
\ No newline at end of file diff --git a/spark-plugin/.gitignore b/spark-plugin/.gitignore index ff5d857..618b484 100644 --- a/spark-plugin/.gitignore +++ b/spark-plugin/.gitignore @@ -56,6 +56,7 @@ plugin/src/main/resources/io/ # custom .bsp .sbtopts +.claude # Downloaded Spark versions (large binaries, not for VCS) utils/.spark-versions/ From 1443c7e3355098dffe54d4f2cede1b1541e0b51b Mon Sep 17 00:00:00 2001 From: Avi Minsky Date: Tue, 10 Mar 2026 16:07:25 +0200 Subject: [PATCH 6/6] removed unnecessary sleep --- .../org/apache/spark/dataflint/DataFlintWindowExecSpec.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWindowExecSpec.scala b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWindowExecSpec.scala index 872d00a..d452e37 100644 --- a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWindowExecSpec.scala +++ b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWindowExecSpec.scala @@ -163,7 +163,6 @@ class DataFlintWindowExecSpec extends AnyFunSuite with Matchers with BeforeAndAf val durationSlow = resultSlow.queryExecution.executedPlan.collect { case w: DataFlintWindowExec => w }.head.metrics("duration").value - Thread.sleep(50000) // The metric must capture the UDAF's sleep time (at least rows*sleepTime ms = 1ms × 200 rows) withClue(s"durationSlow=$durationSlow ms should be >= ${rows*sleepTime}ms (UDAF sleep captured in metric)") { durationSlow should be >= rows.toLong