diff --git a/spark-plugin/.gitignore b/spark-plugin/.gitignore index 793aa5b..618b484 100644 --- a/spark-plugin/.gitignore +++ b/spark-plugin/.gitignore @@ -55,6 +55,8 @@ plugin/src/main/resources/io/ # custom .bsp +.sbtopts +.claude # Downloaded Spark versions (large binaries, not for VCS) utils/.spark-versions/ diff --git a/spark-plugin/build.sbt b/spark-plugin/build.sbt index cc58838..9aa8505 100644 --- a/spark-plugin/build.sbt +++ b/spark-plugin/build.sbt @@ -18,7 +18,8 @@ lazy val dataflint = project example_3_4_1, example_3_5_1, example_3_4_1_remote, - example_4_0_1 + example_4_0_1, + example_4_1_0 ).settings( crossScalaVersions := Nil, // Aggregate project version must be Nil, see docs: https://www.scala-sbt.org/1.x/docs/Cross-Build.html publish / skip := true @@ -88,7 +89,15 @@ lazy val pluginspark3 = (project in file("pluginspark3")) Compile / unmanagedSourceDirectories += (plugin / Compile / sourceDirectory).value / "scala", // Include resources from plugin directory for static UI files - Compile / unmanagedResourceDirectories += (plugin / Compile / resourceDirectory).value + Compile / unmanagedResourceDirectories += (plugin / Compile / resourceDirectory).value, + libraryDependencies += "org.scalatest" %% "scalatest-funsuite" % "3.2.17" % Test, + libraryDependencies += "org.scalatest" %% "scalatest-shouldmatchers" % "3.2.17" % Test, + libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1" % Test, + libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.1" % Test, + + // Include source and resources from plugin directory for tests + Test / unmanagedSourceDirectories += (plugin / Compile / sourceDirectory).value / "scala", + ) lazy val pluginspark4 = (project in file("pluginspark4")) @@ -222,4 +231,16 @@ lazy val example_4_0_1 = (project in file("example_4_0_1")) libraryDependencies += "org.apache.spark" % "spark-core_2.13" % "4.0.1", libraryDependencies += "org.apache.spark" % "spark-sql_2.13" % "4.0.1", publish / skip := true + 
).dependsOn(pluginspark4) + +lazy val example_4_1_0 = (project in file("example_4_1_0")) + .settings( + name := "DataflintSparkExample410", + organization := "io.dataflint", + scalaVersion := scala213, + crossScalaVersions := List(scala213), // Only Scala 2.13 for Spark 4.x + // there is no scala 2.12 version so we need to force 2.13 to make it compile + libraryDependencies += "org.apache.spark" % "spark-core_2.13" % "4.1.0", + libraryDependencies += "org.apache.spark" % "spark-sql_2.13" % "4.1.0", + publish / skip := true ).dependsOn(pluginspark4) \ No newline at end of file diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataFlintRDDUtils.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataFlintRDDUtils.scala new file mode 100644 index 0000000..2135492 --- /dev/null +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataFlintRDDUtils.scala @@ -0,0 +1,26 @@ +package org.apache.spark.dataflint + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.metric.SQLMetric + +import java.util.concurrent.TimeUnit.NANOSECONDS + +object DataFlintRDDUtils { + def withDurationMetric(rdd: RDD[InternalRow], durationMetric: SQLMetric): RDD[InternalRow] = + rdd.mapPartitions { iter => + val startTime = System.nanoTime() + var done = false + new Iterator[InternalRow] { + override def hasNext: Boolean = { + val r = iter.hasNext + if (!r && !done) { + durationMetric += NANOSECONDS.toMillis(System.nanoTime() - startTime) + done = true + } + r + } + override def next(): InternalRow = iter.next() + } + } +} \ No newline at end of file diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala index 8536f54..5b2bf06 100644 --- 
a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala @@ -134,6 +134,7 @@ object DataflintSparkUICommonLoader extends Logging { val INSTRUMENT_SPARK_ENABLED = "spark.dataflint.instrument.spark.enabled" val INSTRUMENT_MAP_IN_PANDAS_ENABLED = "spark.dataflint.instrument.spark.mapInPandas.enabled" val INSTRUMENT_MAP_IN_ARROW_ENABLED = "spark.dataflint.instrument.spark.mapInArrow.enabled" + val INSTRUMENT_WINDOW_ENABLED = "spark.dataflint.instrument.spark.window.enabled" def install(context: SparkContext, pageFactory: DataflintPageFactory): String = { new DataflintSparkUICommonInstaller().install(context, pageFactory) @@ -164,7 +165,8 @@ object DataflintSparkUICommonLoader extends Logging { val instrumentEnabled = sc.conf.getBoolean(INSTRUMENT_SPARK_ENABLED, defaultValue = false) val mapInPandasEnabled = sc.conf.getBoolean(INSTRUMENT_MAP_IN_PANDAS_ENABLED, defaultValue = false) val mapInArrowEnabled = sc.conf.getBoolean(INSTRUMENT_MAP_IN_ARROW_ENABLED, defaultValue = false) - val anyInstrumentationEnabled = instrumentEnabled || mapInPandasEnabled || mapInArrowEnabled + val windowEnabled = sc.conf.getBoolean(INSTRUMENT_WINDOW_ENABLED, defaultValue = false) + val anyInstrumentationEnabled = instrumentEnabled || mapInPandasEnabled || mapInArrowEnabled || windowEnabled if (!anyInstrumentationEnabled) { logInfo("DataFlint instrumentation extension is disabled (no instrumentation flags enabled)") return diff --git a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala index ca4b1d0..59e9fc7 100644 --- a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala +++ 
b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala @@ -2,21 +2,14 @@ package org.apache.spark.dataflint import org.apache.spark.SPARK_VERSION import org.apache.spark.internal.Logging -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.SparkSessionExtensions +import org.apache.spark.sql.{SparkSession, SparkSessionExtensions, Strategy, execution} +import org.apache.spark.sql.catalyst.expressions.{NamedExpression, WindowFunctionType} +import org.apache.spark.sql.catalyst.planning.PhysicalWindow +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Window => LogicalWindow} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan} -import org.apache.spark.sql.execution.python.{ - DataFlintMapInPandasExec_3_0, - DataFlintMapInPandasExec_3_1, - DataFlintMapInPandasExec_3_3, - DataFlintMapInPandasExec_3_4, - DataFlintMapInPandasExec_3_5, - DataFlintPythonMapInArrowExec_3_3, - DataFlintPythonMapInArrowExec_3_4, - DataFlintPythonMapInArrowExec_3_5, - MapInPandasExec -} +import org.apache.spark.sql.execution.python.{DataFlintMapInPandasExec_3_0, DataFlintMapInPandasExec_3_1, DataFlintMapInPandasExec_3_3, DataFlintMapInPandasExec_3_4, DataFlintMapInPandasExec_3_5, DataFlintPythonMapInArrowExec_3_3, DataFlintPythonMapInArrowExec_3_4, DataFlintPythonMapInArrowExec_3_5, DataFlintWindowInPandasExec, MapInPandasExec, WindowInPandasExec} +import org.apache.spark.sql.execution.window.DataFlintWindowExec /** * A SparkSessionExtension that injects DataFlint instrumentation into Spark's physical planning phase. 
@@ -46,6 +39,10 @@ class DataFlintInstrumentationExtension extends (SparkSessionExtensions => Unit) extensions.injectColumnar { session => DataFlintInstrumentationColumnarRule(session) } + + extensions.injectPlannerStrategy { session => + DataFlintWindowPlannerStrategy(session) + } } } @@ -189,3 +186,38 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C } } } + +/** + * A planner Strategy that converts the logical Window plan node directly into + * DataFlintWindowExec, bypassing WindowExec entirely. + * + * Using a Strategy (rather than a ColumnarRule) is correct for row-based operators + * like WindowExec that do not participate in columnar execution. + */ +case class DataFlintWindowPlannerStrategy(session: SparkSession) extends Strategy with Logging { + + private val windowEnabled: Boolean = { + val conf = session.sparkContext.conf + val globalEnabled = conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_SPARK_ENABLED, defaultValue = false) + val specificEnabled = conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_WINDOW_ENABLED, defaultValue = false) + globalEnabled || specificEnabled + } + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = { + if (!windowEnabled) return Nil + plan match { + case PhysicalWindow( + WindowFunctionType.SQL, windowExprs, partitionSpec, orderSpec, child) => + logInfo("Replacing logical Window with DataFlintWindowExec") + DataFlintWindowExec( + windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil + + case PhysicalWindow( + WindowFunctionType.Python, windowExprs, partitionSpec, orderSpec, child) => + logInfo("Replacing logical Window (Python UDF) with DataFlintWindowInPandasExec") + DataFlintWindowInPandasExec( + windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil + case _ => Nil + } + } +} diff --git a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec.scala 
b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec.scala new file mode 100644 index 0000000..c757288 --- /dev/null +++ b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.python + +import org.apache.spark.dataflint.DataFlintRDDUtils +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} + +class DataFlintWindowInPandasExec private ( + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + child: SparkPlan) + extends WindowInPandasExec(windowExpression, partitionSpec, orderSpec, child) with Logging { + + override def nodeName: String = "DataFlintWindowInPandas" + + override lazy val metrics: Map[String, SQLMetric] = pythonMetrics ++ Map( + "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"), + "duration" -> SQLMetrics.createTimingMetric(sparkContext, "duration") + ) + + override protected def doExecute(): RDD[InternalRow] = + DataFlintRDDUtils.withDurationMetric(super.doExecute(), longMetric("duration")) + + override protected def withNewChildInternal(newChild: SparkPlan): DataFlintWindowInPandasExec = + DataFlintWindowInPandasExec(windowExpression, partitionSpec, orderSpec, newChild) +} + +object DataFlintWindowInPandasExec { + def apply( + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + child: SparkPlan): DataFlintWindowInPandasExec = + new DataFlintWindowInPandasExec(windowExpression, partitionSpec, orderSpec, child) +} \ No newline at end of file diff --git a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala new file mode 100644 index 0000000..1ad2522 --- /dev/null +++ 
b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/sql/execution/window/DataFlintWindowExec.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * What's currently included in duration + + The timer wraps iter from super.doExecute(). When you call iter.hasNext, the parent WindowExec iterator internally: + 1. Pulls rows from its child (e.g., a SortExec) + 2. Accumulates a full partition group + 3. Computes window function values + 4. Emits output rows + + Steps 1–3 all happen during the first iter.hasNext call. So the timer captures child-fetch time + window computation time combined. + + How to isolate just the window computation + + You can't easily do this with the current wrapping approach because WindowExec uses an internal buffer pattern — it reads all input for a partition group eagerly on the first hasNext. To truly isolate window computation you'd + need to: + + 1. Override doExecute more deeply — instrument inside WindowFunctionFrame.prepare() and write() calls, which are the actual window computation steps. This requires overriding private Spark internals. + 2. Subtract child time — measure the child RDD's execution time separately and subtract it. Fragile and inaccurate. + 3. 
Accept the current scope as "window stage time" — which is the pragmatic choice. The metric measures the time from when the partition iterator is first pulled until it's exhausted. This is a reasonable proxy for window + operator cost in a query plan. + + */ +package org.apache.spark.sql.execution.window + +import org.apache.spark.dataflint.DataFlintRDDUtils +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} + +class DataFlintWindowExec private ( + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + child: SparkPlan) + extends WindowExec(windowExpression, partitionSpec, orderSpec, child) with Logging { + + override def nodeName: String = "DataFlintWindow" + + override lazy val metrics: Map[String, SQLMetric] = Map( + "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"), + "duration" -> SQLMetrics.createTimingMetric(sparkContext, "duration") + ) + + override protected def doExecute(): RDD[InternalRow] = + DataFlintRDDUtils.withDurationMetric(super.doExecute(), longMetric("duration")) + + override protected def withNewChildInternal(newChild: SparkPlan): DataFlintWindowExec = + DataFlintWindowExec(windowExpression, partitionSpec, orderSpec, newChild) +} + +object DataFlintWindowExec { + def apply( + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + child: SparkPlan): DataFlintWindowExec = + new DataFlintWindowExec(windowExpression, partitionSpec, orderSpec, child) +} \ No newline at end of file diff --git a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWindowExecSpec.scala b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWindowExecSpec.scala 
new file mode 100644 index 0000000..d452e37 --- /dev/null +++ b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWindowExecSpec.scala @@ -0,0 +1,182 @@ +package org.apache.spark.dataflint + +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} +import org.apache.spark.sql.execution.window.DataFlintWindowExec +import org.apache.spark.sql.expressions.Aggregator +import org.apache.spark.sql.functions.udaf +import org.scalatest.BeforeAndAfterAll +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +import java.util.concurrent.TimeUnit.NANOSECONDS + +private class SlowSumAggregator(sleep: Long) extends Aggregator[Long, Long, Long] { + def zero: Long = 0L + def reduce(b: Long, a: Long): Long = { + Thread.sleep(sleep); b + a + } + def merge(b1: Long, b2: Long): Long = b1 + b2 + def finish(r: Long): Long = r + def bufferEncoder: Encoder[Long] = Encoders.scalaLong + def outputEncoder: Encoder[Long] = Encoders.scalaLong +} + +class DataFlintWindowExecSpec extends AnyFunSuite with Matchers with BeforeAndAfterAll { + + private var spark: SparkSession = _ + + override def beforeAll(): Unit = { + spark = SparkSession.builder() + .master("local[1]") + .appName("DataFlintWindowExecSpec") + .config("spark.sql.extensions", "org.apache.spark.dataflint.DataFlintInstrumentationExtension") + .config(DataflintSparkUICommonLoader.INSTRUMENT_SPARK_ENABLED, "true") + .config(DataflintSparkUICommonLoader.INSTRUMENT_WINDOW_ENABLED, "true") + .config("spark.ui.enabled", "true") + .config("spark.sql.adaptive.enabled", "false") + .getOrCreate() + } + + override def afterAll(): Unit = { + if (spark != null) spark.stop() + } + + test("DataFlintWindowPlannerStrategy replaces WindowExec with DataFlintWindowExec for SQL window") { + val session = spark + import session.implicits._ + val df = Seq((1, "a"), (2, "b"), (3, "a"), (4, "b"), (5, "a")).toDF("id", "cat") + df.createOrReplaceTempView("test_window_plan") + + val result 
= spark.sql( + "SELECT id, cat, rank() OVER (PARTITION BY cat ORDER BY id) AS r FROM test_window_plan" + ) + val windowNodes = result.queryExecution.executedPlan.collect { + case w: DataFlintWindowExec => w + } + + withClue("Expected DataFlintWindowExec in physical plan but found: " + + result.queryExecution.executedPlan.treeString) { + windowNodes should not be empty + } + } + + test("DataFlintWindowExec duration metric is positive after execution") { + val session = spark + import session.implicits._ + // 10000 rows across 5 partitions — enough to ensure window work takes > 0ms + val df = (1 to 10000).map(i => (i, i % 5)).toDF("id", "cat") + df.createOrReplaceTempView("test_window_timing") + + val result = spark.sql( + "SELECT id, cat, rank() OVER (PARTITION BY cat ORDER BY id) AS r FROM test_window_timing" + ) + result.collect() + + val windowNode = result.queryExecution.executedPlan.collect { + case w: DataFlintWindowExec => w + }.head + val duration = windowNode.metrics("duration").value + duration should be > 0L + } + + test("DataFlintWindowExec duration metric is bounded by wall clock time") { + val session = spark + import session.implicits._ + val df = (1 to 100000).map(i => (i, i % 5)).toDF("id", "cat") + df.createOrReplaceTempView("test_window_wall_clock") + + val result = spark.sql( + "SELECT id, cat, rank() OVER (PARTITION BY cat ORDER BY id) AS r FROM test_window_wall_clock" + ) + + val wallStart = System.nanoTime() + result.collect() + val wallMs = NANOSECONDS.toMillis(System.nanoTime() - wallStart) + + val windowNode = result.queryExecution.executedPlan.collect { + case w: DataFlintWindowExec => w + }.head + val duration = windowNode.metrics("duration").value + + duration should be > 0L + duration should be <= wallMs + } + + test("DataFlintWindowExec duration metric scales with data size") { + val session = spark + import session.implicits._ + + val dfSmall = (1 to 100).map(i => (i, i % 5)).toDF("id", "cat") + 
dfSmall.createOrReplaceTempView("test_window_small") + val resultSmall = spark.sql( + "SELECT id, cat, rank() OVER (PARTITION BY cat ORDER BY id) AS r FROM test_window_small" + ) + resultSmall.collect() + val durationSmall = resultSmall.queryExecution.executedPlan.collect { + case w: DataFlintWindowExec => w + }.head.metrics("duration").value + + val dfLarge = (1 to 1000000).map(i => (i, i % 5)).toDF("id", "cat") + dfLarge.createOrReplaceTempView("test_window_large") + val resultLarge = spark.sql( + "SELECT id, cat, rank() OVER (PARTITION BY cat ORDER BY id) AS r FROM test_window_large" + ) + resultLarge.collect() + val durationLarge = resultLarge.queryExecution.executedPlan.collect { + case w: DataFlintWindowExec => w + }.head.metrics("duration").value + + durationLarge should be > durationSmall + } + + test("DataFlintWindowExec duration metric captures window-internal UDAF computation") { + val session = spark + import session.implicits._ + + // A UDAF whose reduce() sleeps sleepTime (4) ms per row — all work happens inside the window operator. + // With 200 rows / 5 categories = 40 rows per partition, reduce() is called 200 times total + // → at least 800ms of window-internal computation that the metric must capture.
+ val sleepTime=4 + spark.udf.register("slow_sum", udaf(new SlowSumAggregator(sleepTime))) + + val rows = 200 + val partitions = 5 + val df = (1 to rows).map(i => (i, i % partitions)).toDF("id", "cat") + df.createOrReplaceTempView("test_window_udaf") + + // Fast baseline: rank() has negligible per-row computation + val resultFast = spark.sql( + "SELECT id, cat, rank() OVER (PARTITION BY cat ORDER BY id) AS r FROM test_window_udaf" + ) + resultFast.collect() + val durationFast = resultFast.queryExecution.executedPlan.collect { + case w: DataFlintWindowExec => w + }.head.metrics("duration").value + + // Slow: UDAF sleeps sleepTime (4) ms per row inside the window operator — 800ms+ of window-internal work + val resultSlow = spark.sql( + "SELECT id, cat, slow_sum(CAST(id AS BIGINT)) OVER (PARTITION BY cat) AS r FROM test_window_udaf" + ) + val wallSlowStart = System.nanoTime() + resultSlow.collect() + val wallSlowMs = NANOSECONDS.toMillis(System.nanoTime() - wallSlowStart) + val durationSlow = resultSlow.queryExecution.executedPlan.collect { + case w: DataFlintWindowExec => w + }.head.metrics("duration").value + // Total UDAF sleep is rows*sleepTime ms (4ms × 200 rows); the assertion below conservatively checks >= rows ms + withClue(s"durationSlow=$durationSlow ms should be >= ${rows*sleepTime}ms (UDAF sleep captured in metric)") { + durationSlow should be >= rows.toLong + } + + // The metric must reflect actual window computation — slow >> fast + withClue(s"durationSlow=$durationSlow ms should exceed durationFast=$durationFast ms") { + durationSlow should be > durationFast + } + + // The metric must be bounded by wall-clock time (no phantom time) + withClue(s"durationSlow=$durationSlow ms should be <= wallSlowMs=$wallSlowMs ms") { + durationSlow should be <= wallSlowMs + } + } + +} \ No newline at end of file diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala
b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala index 388c744..e035795 100644 --- a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala +++ b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala @@ -4,16 +4,14 @@ import org.apache.spark.SPARK_VERSION import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SparkSessionExtensions +import org.apache.spark.sql.catalyst.expressions.{NamedExpression, WindowFunctionType} +import org.apache.spark.sql.catalyst.planning.PhysicalWindow +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Window => LogicalWindow} +import org.apache.spark.sql.execution.SparkStrategy import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan} -import org.apache.spark.sql.execution.python.{ - DataFlintMapInPandasExec_4_0, - DataFlintMapInPandasExec_4_1, - DataFlintPythonMapInArrowExec_4_0, - DataFlintPythonMapInArrowExec_4_1, - MapInArrowExec, - MapInPandasExec -} +import org.apache.spark.sql.execution.python.{DataFlintArrowWindowPythonExec_4_1, DataFlintMapInPandasExec_4_0, DataFlintMapInPandasExec_4_1, DataFlintPythonMapInArrowExec_4_0, DataFlintPythonMapInArrowExec_4_1, DataFlintWindowInPandasExec_4_0, MapInArrowExec, MapInPandasExec} +import org.apache.spark.sql.execution.window.DataFlintWindowExec /** * A SparkSessionExtension that injects DataFlint instrumentation into Spark's physical planning phase. 
@@ -40,6 +38,10 @@ class DataFlintInstrumentationExtension extends (SparkSessionExtensions => Unit) extensions.injectColumnar { session => DataFlintInstrumentationColumnarRule(session) } + + extensions.injectPlannerStrategy { session => + DataFlintWindowPlannerStrategy(session) + } } } @@ -72,6 +74,7 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C globalEnabled || specificEnabled } + override def preColumnarTransitions: Rule[SparkPlan] = { plan => if (!mapInPandasEnabled && !mapInArrowEnabled) plan else plan.transformUp { @@ -120,3 +123,47 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C } } } + +/** + * A planner Strategy that converts the logical Window plan node directly into + * DataFlintWindowExec, bypassing WindowExec entirely. + * + * Using a Strategy (rather than a ColumnarRule) is correct for row-based operators + * like WindowExec that do not participate in columnar execution. + */ +case class DataFlintWindowPlannerStrategy(session: SparkSession) extends SparkStrategy with Logging { + + private val sparkMinorVersion: String = { + val parts = SPARK_VERSION.split("\\.") + if (parts.length >= 2) s"${parts(0)}.${parts(1)}" else SPARK_VERSION + } + + private val windowEnabled: Boolean = { + val conf = session.sparkContext.conf + val globalEnabled = conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_SPARK_ENABLED, defaultValue = false) + val specificEnabled = conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_WINDOW_ENABLED, defaultValue = false) + globalEnabled || specificEnabled + } + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = { + if (!windowEnabled) return Nil + plan match { + case PhysicalWindow( + WindowFunctionType.SQL, windowExprs, partitionSpec, orderSpec, child) => + logInfo("Replacing logical Window with DataFlintWindowExec") + DataFlintWindowExec( + windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil + + case PhysicalWindow( + 
WindowFunctionType.Python, windowExprs, partitionSpec, orderSpec, child) => + logInfo(s"Replacing logical Window (Python UDF) with DataFlint version for Spark $sparkMinorVersion") + sparkMinorVersion match { + case "4.0" => + DataFlintWindowInPandasExec_4_0(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil + case _ => DataFlintArrowWindowPythonExec_4_1(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil + } + + case _ => Nil + } + } +} diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintArrowWindowPythonExec_4_1.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintArrowWindowPythonExec_4_1.scala new file mode 100644 index 0000000..f142e7d --- /dev/null +++ b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintArrowWindowPythonExec_4_1.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * DataFlint instrumented version for Python UDF window functions on Spark 4.1.x. + * + * In Spark 4.1, WindowInPandasExec was replaced by ArrowWindowPythonExec. 
+ * Since pluginspark4 compiles against Spark 4.0.1, we cannot extend ArrowWindowPythonExec + * directly. Instead, we use reflection to create an ArrowWindowPythonExec at runtime + * and delegate execution to it, wrapping the result with a duration metric. + */ +package org.apache.spark.sql.execution.python + +import org.apache.spark.dataflint.DataFlintRDDUtils +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} + +class DataFlintArrowWindowPythonExec_4_1 private ( + val windowExpression: Seq[NamedExpression], + val partitionSpec: Seq[Expression], + val orderSpec: Seq[SortOrder], + val child: SparkPlan) + extends UnaryExecNode with PythonSQLMetrics with Logging { + + override def nodeName: String = "DataFlintArrowWindowPython" + + override def canEqual(that: Any): Boolean = that.isInstanceOf[DataFlintArrowWindowPythonExec_4_1] + override def productArity: Int = 4 + override def productElement(n: Int): Any = n match { + case 0 => windowExpression + case 1 => partitionSpec + case 2 => orderSpec + case 3 => child + case _ => throw new IndexOutOfBoundsException(s"$n") + } + + override def output: Seq[Attribute] = child.output ++ windowExpression.map(_.toAttribute) + + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override lazy val metrics: Map[String, SQLMetric] = pythonMetrics ++ Map( + "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"), + "duration" -> SQLMetrics.createTimingMetric(sparkContext, "duration") + ) + + override protected def doExecute(): RDD[InternalRow] = { + val durationMetric = longMetric("duration") + + // 
Create ArrowWindowPythonExec at runtime via reflection (Spark 4.1-only class). + // We use the companion object's apply method which deduces evalType from window expressions. + val innerRDD: RDD[InternalRow] = try { + val companionClass = Class.forName( + "org.apache.spark.sql.execution.python.ArrowWindowPythonExec$") + val companion = companionClass.getField("MODULE$").get(null) + val applyMethod = companion.getClass.getMethods + .find(m => m.getName == "apply" && m.getParameterCount == 4) + .getOrElse(throw new RuntimeException( + "ArrowWindowPythonExec$.apply(4) not found — Spark 4.1.x required")) + val innerExec = applyMethod.invoke(companion, + windowExpression, partitionSpec, orderSpec, child).asInstanceOf[SparkPlan] + innerExec.execute() + } catch { + case e: Exception => + logWarning(s"DataFlint: failed to create ArrowWindowPythonExec via reflection: ${e.getMessage}") + throw e + } + + DataFlintRDDUtils.withDurationMetric(innerRDD, durationMetric) + } + + override protected def withNewChildInternal(newChild: SparkPlan): DataFlintArrowWindowPythonExec_4_1 = + new DataFlintArrowWindowPythonExec_4_1(windowExpression, partitionSpec, orderSpec, newChild) +} + +object DataFlintArrowWindowPythonExec_4_1 { + def apply( + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + child: SparkPlan): DataFlintArrowWindowPythonExec_4_1 = + new DataFlintArrowWindowPythonExec_4_1(windowExpression, partitionSpec, orderSpec, child) +} \ No newline at end of file diff --git a/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec_4_0.scala b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec_4_0.scala new file mode 100644 index 0000000..d86367c --- /dev/null +++ b/spark-plugin/pluginspark4/src/main/scala/org/apache/spark/sql/execution/python/DataFlintWindowInPandasExec_4_0.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.python

import org.apache.spark.dataflint.DataFlintRDDUtils
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

/**
 * DataFlint instrumented version of WindowInPandasExec for Spark 4.0.
 *
 * Behaves exactly like WindowInPandasExec, but reports itself under a
 * DataFlint node name and records an additional "duration" timing metric
 * around the window computation.
 */
class DataFlintWindowInPandasExec_4_0 private (
    windowExpression: Seq[NamedExpression],
    partitionSpec: Seq[Expression],
    orderSpec: Seq[SortOrder],
    child: SparkPlan)
  extends WindowInPandasExec(windowExpression, partitionSpec, orderSpec, child) with Logging {

  override def nodeName: String = "DataFlintWindowInPandas"

  // Parent's python metrics, extended with DataFlint's spill-size and duration metrics.
  override lazy val metrics: Map[String, SQLMetric] = {
    val dataflintMetrics = Map(
      "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
      "duration" -> SQLMetrics.createTimingMetric(sparkContext, "duration")
    )
    pythonMetrics ++ dataflintMetrics
  }

  override protected def doExecute(): RDD[InternalRow] = {
    val rows = super.doExecute()
    DataFlintRDDUtils.withDurationMetric(rows, longMetric("duration"))
  }

  override protected def withNewChildInternal(newChild: SparkPlan): DataFlintWindowInPandasExec_4_0 =
    new DataFlintWindowInPandasExec_4_0(windowExpression, partitionSpec, orderSpec, newChild)
}

object DataFlintWindowInPandasExec_4_0 {
  def apply(
      windowExpression: Seq[NamedExpression],
      partitionSpec: Seq[Expression],
      orderSpec: Seq[SortOrder],
      child: SparkPlan): DataFlintWindowInPandasExec_4_0 =
    new DataFlintWindowInPandasExec_4_0(windowExpression, partitionSpec, orderSpec, child)
}
package org.apache.spark.sql.execution.window

import org.apache.spark.dataflint.DataFlintRDDUtils
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

/**
 * DataFlint instrumented version of WindowExec for Spark 4.x.
 *
 * Spark 4.0 uses WindowEvaluatorFactory whose eval() returns a lazy iterator,
 * so wrapping super.doExecute() is correct: startTime is set before any rows
 * are consumed and the duration covers the full window computation.
 */
class DataFlintWindowExec private (
    windowExpression: Seq[NamedExpression],
    partitionSpec: Seq[Expression],
    orderSpec: Seq[SortOrder],
    child: SparkPlan)
  extends WindowExec(windowExpression, partitionSpec, orderSpec, child) with Logging {

  logInfo("DataFlint WindowExec is connected")

  override def nodeName: String = "DataFlintWindow"

  // Replaces the parent's metrics map with DataFlint's spill-size and duration metrics.
  override lazy val metrics: Map[String, SQLMetric] = Map(
    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
    "duration" -> SQLMetrics.createTimingMetric(sparkContext, "duration")
  )

  override protected def doExecute(): RDD[InternalRow] = {
    val rows = super.doExecute()
    DataFlintRDDUtils.withDurationMetric(rows, longMetric("duration"))
  }

  override protected def withNewChildInternal(newChild: SparkPlan): DataFlintWindowExec =
    new DataFlintWindowExec(windowExpression, partitionSpec, orderSpec, newChild)
}

object DataFlintWindowExec {
  def apply(
      windowExpression: Seq[NamedExpression],
      partitionSpec: Seq[Expression],
      orderSpec: Seq[SortOrder],
      child: SparkPlan): DataFlintWindowExec =
    new DataFlintWindowExec(windowExpression, partitionSpec, orderSpec, child)
}
803d017..8af12b6 100755 --- a/spark-plugin/pyspark-testing/dataflint_pyspark_example.py +++ b/spark-plugin/pyspark-testing/dataflint_pyspark_example.py @@ -25,11 +25,10 @@ spark_major_version = 3 # default to Spark 3 if spark_home: - # Try to extract version from SPARK_HOME path - if '4.0' in spark_home or 'spark-4' in spark_home: - spark_major_version = 4 - elif '3.' in spark_home or 'spark-3' in spark_home: - spark_major_version = 3 + import re + m = re.search(r'[/_-](\d+)\.\d', spark_home) + if m: + spark_major_version = int(m.group(1)) # Select the appropriate plugin JAR based on Spark version if spark_major_version == 4: @@ -52,12 +51,15 @@ .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin") \ .config("spark.ui.port", "10000") \ .config("spark.sql.maxMetadataStringLength", "10000") \ + .config("spark.sql.adaptive.enabled", "false") \ .config("spark.dataflint.telemetry.enabled", "false") \ + .config("spark.dataflint.instrument.spark.enable", "true") \ .config("spark.dataflint.instrument.spark.mapInPandas.enabled", "true") \ .config("spark.dataflint.instrument.spark.mapInArrow.enabled", "true") \ + .config("spark.dataflint.instrument.spark.window.enabled", "true") \ .master("local[*]") \ .getOrCreate() - +# spark.sparkContext.setLogLevel("INFO") # Get Spark version and check if mapInArrow is supported spark_version = spark.version version_parts = spark_version.split('.') @@ -181,6 +183,59 @@ def compute_discounted_totals_arrow(iterator): print(f"Current version: {spark_version}") + +print("\n" + "="*80) +print("Running Window function example") +print("="*80) + +from pyspark.sql import Window +from pyspark.sql.functions import rank, sum as spark_sum, avg +from pyspark.sql.types import DoubleType + +window_by_category = Window.partitionBy("category").orderBy("price") +window_category_total = Window.partitionBy("category") + +df_window = df.withColumn("rank_in_category", rank().over(window_by_category)) \ + .withColumn("cumulative_revenue", 
spark_sum("price").over(window_by_category)) \ + .withColumn("avg_price_in_category", avg("price").over(window_category_total)) + +df_window.write \ + .mode("overwrite") \ + .parquet("/tmp/dataflint_window_example") + +print("\nResult written to /tmp/dataflint_window_example") +print("\nSample output:") +df_window.show(10, truncate=False) + + +print("\n" + "="*80) +print("Running Window function with Python UDF example") +print("="*80) + +import pandas as pd +from pyspark.sql.functions import pandas_udf + +@pandas_udf(DoubleType()) +def discounted_sum(prices: pd.Series) -> float: + """Pandas UDF used as a window aggregate: sum of prices with a 10% discount.""" + import time + time.sleep(2) + return prices.sum() * 0.9 + +df_window_udf = df.withColumn( + "discounted_category_revenue", + discounted_sum("price").over(window_category_total) +) + +df_window_udf.write \ + .mode("overwrite") \ + .parquet("/tmp/dataflint_window_udf_example") + +print("\nResult written to /tmp/dataflint_window_udf_example") +print("\nSample output:") +df_window_udf.show(10, truncate=False) + + print("\n" + "="*80) print("Done!") print("="*80) diff --git a/spark-plugin/utils/run-with-spark.sh b/spark-plugin/utils/run-with-spark.sh index 776c7e6..c61b14b 100755 --- a/spark-plugin/utils/run-with-spark.sh +++ b/spark-plugin/utils/run-with-spark.sh @@ -98,6 +98,10 @@ fi # Get current Java version for display JAVA_VERSION=$(java -version 2>&1 | awk -F '"' '/version/ {print $2}' | cut -d'.' 
-f1) +# Get Scala version from the scala-library JAR bundled with Spark +SCALA_VERSION=$(ls "${SPARK_HOME}/jars/scala-library-"*.jar 2>/dev/null | sed 's/.*scala-library-\([0-9.]*\)\.jar/\1/' | head -1) +SCALA_VERSION="${SCALA_VERSION:-unknown}" + # Ensure driver and worker use the same Python interpreter PYTHON_BIN="$(which python3)" export PYSPARK_PYTHON="${PYTHON_BIN}" @@ -110,9 +114,12 @@ export PYTHONPATH="${SPARK_HOME}/python${PY4J_ZIP:+:${PY4J_ZIP}}${PYTHONPATH:+:$ echo "╔══════════════════════════════════════════════════════════════╗" echo "║ SPARK_HOME : ${SPARK_HOME}" echo "║ Spark ver : ${SPARK_VERSION}" +echo "║ Scala ver : ${SCALA_VERSION}" echo "║ Java ver : ${JAVA_VERSION}" echo "║ Script : ${PYSPARK_SCRIPT}" echo "╚══════════════════════════════════════════════════════════════╝" echo "" +#export SPARK_LOG_LEVEL="${SPARK_LOG_LEVEL:-WARN}" + exec python3 "${PYSPARK_SCRIPT}" "$@" diff --git a/spark-ui/src/reducers/SQLNodeStageReducer.ts b/spark-ui/src/reducers/SQLNodeStageReducer.ts index 63d1016..a361263 100644 --- a/spark-ui/src/reducers/SQLNodeStageReducer.ts +++ b/spark-ui/src/reducers/SQLNodeStageReducer.ts @@ -111,7 +111,9 @@ export function calculateSQLNodeStage(sql: EnrichedSparkSQL, sqlStages: SparkSta if (node.nodeName === "AQEShuffleRead" || node.nodeName === "Coalesce" || node.nodeName === "BatchEvalPython" || node.nodeName === "MapInPandas" || node.nodeName === "DataFlintMapInPandas" || node.nodeName === "MapInArrow" || node.nodeName === "PythonMapInArrow" || node.nodeName === "DataFlintMapInArrow" || - node.nodeName === "ArrowEvalPython" || node.nodeName === "FlatMapGroupsInPandas") { + node.nodeName === "ArrowEvalPython" || node.nodeName === "FlatMapGroupsInPandas" || + node.nodeName === "WindowInPandas" || node.nodeName === "DataFlintWindowInPandas" || node.nodeName === "DataFlintArrowWindowPython" || + node.nodeName === "Window" || node.nodeName === "DataFlintWindow") { const nextNode = findNextNode(node.nodeId); if (nextNode !== 
undefined && nextNode.stage !== undefined) { return { ...node, stage: nextNode.stage }; diff --git a/spark-ui/src/reducers/SqlReducer.ts b/spark-ui/src/reducers/SqlReducer.ts index dfda358..7fe9eb7 100644 --- a/spark-ui/src/reducers/SqlReducer.ts +++ b/spark-ui/src/reducers/SqlReducer.ts @@ -164,6 +164,10 @@ export function parseNodePlan( plan: parseSort(plan.planDescription), }; case "Window": + case "DataFlintWindow": + case "WindowInPandas": + case "DataFlintWindowInPandas": + case "DataFlintArrowWindowPython": return { type: "Window", plan: parseWindow(plan.planDescription), diff --git a/spark-ui/src/reducers/SqlReducerUtils.ts b/spark-ui/src/reducers/SqlReducerUtils.ts index 94a80d3..98987d6 100644 --- a/spark-ui/src/reducers/SqlReducerUtils.ts +++ b/spark-ui/src/reducers/SqlReducerUtils.ts @@ -192,6 +192,10 @@ const nodeTypeDict: Record = { ArrowEvalPython: "transformation", FlatMapGroupsInPandas: "transformation", BatchEvalPython: "transformation", + WindowInPandas: "transformation", + DataFlintWindowInPandas: "transformation", + DataFlintArrowWindowPython: "transformation", + DataFlintWindow: "transformation", Generate: "transformation", Expand: "transformation", }; @@ -272,6 +276,11 @@ const nodeRenamerDict: Record = { ArrowEvalPython: "Select (with Arrow)", FlatMapGroupsInPandas: "Select Flat (with Pandas)", BatchEvalPython: "Run Python UDF", + Window: "Window", + WindowInPandas: "Window (with Pandas UDF)", + DataFlintWindow: "Window", + DataFlintWindowInPandas: "Window (with Pandas UDF)", + DataFlintArrowWindowPython: "Window (with Pandas UDF)", Expand: "Expand", };