Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions spark-plugin/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ plugin/src/main/resources/io/

# custom
.bsp
.sbtopts
.claude

# Downloaded Spark versions (large binaries, not for VCS)
utils/.spark-versions/
25 changes: 23 additions & 2 deletions spark-plugin/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ lazy val dataflint = project
example_3_4_1,
example_3_5_1,
example_3_4_1_remote,
example_4_0_1
example_4_0_1,
example_4_1_0
).settings(
crossScalaVersions := Nil, // Aggregate project version must be Nil, see docs: https://www.scala-sbt.org/1.x/docs/Cross-Build.html
publish / skip := true
Expand Down Expand Up @@ -88,7 +89,15 @@ lazy val pluginspark3 = (project in file("pluginspark3"))
Compile / unmanagedSourceDirectories += (plugin / Compile / sourceDirectory).value / "scala",

// Include resources from plugin directory for static UI files
Compile / unmanagedResourceDirectories += (plugin / Compile / resourceDirectory).value
Compile / unmanagedResourceDirectories += (plugin / Compile / resourceDirectory).value,
libraryDependencies += "org.scalatest" %% "scalatest-funsuite" % "3.2.17" % Test,
libraryDependencies += "org.scalatest" %% "scalatest-shouldmatchers" % "3.2.17" % Test,
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1" % Test,
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.1" % Test,

// Include source and resources from plugin directory for tests
Test / unmanagedSourceDirectories += (plugin / Compile / sourceDirectory).value / "scala",

)

lazy val pluginspark4 = (project in file("pluginspark4"))
Expand Down Expand Up @@ -222,4 +231,16 @@ lazy val example_4_0_1 = (project in file("example_4_0_1"))
libraryDependencies += "org.apache.spark" % "spark-core_2.13" % "4.0.1",
libraryDependencies += "org.apache.spark" % "spark-sql_2.13" % "4.0.1",
publish / skip := true
).dependsOn(pluginspark4)

// Example application used to manually exercise the DataFlint plugin against Spark 4.1.0.
// Mirrors the other example_* projects (e.g. example_4_0_1) and is aggregated by the root build.
lazy val example_4_1_0 = (project in file("example_4_1_0"))
  .settings(
    name := "DataflintSparkExample410",
    organization := "io.dataflint",
    scalaVersion := scala213,
    crossScalaVersions := List(scala213), // Only Scala 2.13 for Spark 4.x
    // there is no scala 2.12 version so we need to force 2.13 to make it compile
    // NOTE: single `%` with an explicit `_2.13` suffix pins the artifact, since Spark 4.x publishes no 2.12 build.
    libraryDependencies += "org.apache.spark" % "spark-core_2.13" % "4.1.0",
    libraryDependencies += "org.apache.spark" % "spark-sql_2.13" % "4.1.0",
    publish / skip := true
  ).dependsOn(pluginspark4)
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.apache.spark.dataflint

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.metric.SQLMetric

import java.util.concurrent.TimeUnit.NANOSECONDS

object DataFlintRDDUtils {

  /**
   * Wraps an [[RDD]] so that `durationMetric` accumulates, per partition, the wall-clock
   * time (in milliseconds) from the moment the partition iterator is created until it is
   * fully drained.
   *
   * The metric is recorded exactly once per partition, on the first `hasNext` call that
   * returns `false`. If a consumer abandons the iterator early (e.g. a limit), nothing is
   * recorded for that partition.
   */
  def withDurationMetric(rdd: RDD[InternalRow], durationMetric: SQLMetric): RDD[InternalRow] =
    rdd.mapPartitions { rows =>
      val partitionStart = System.nanoTime()
      // Guards against double-counting when hasNext is called again after exhaustion.
      var recorded = false
      new Iterator[InternalRow] {
        override def hasNext: Boolean = {
          val more = rows.hasNext
          if (!more && !recorded) {
            recorded = true
            durationMetric += NANOSECONDS.toMillis(System.nanoTime() - partitionStart)
          }
          more
        }
        override def next(): InternalRow = rows.next()
      }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ object DataflintSparkUICommonLoader extends Logging {
val INSTRUMENT_SPARK_ENABLED = "spark.dataflint.instrument.spark.enabled"
val INSTRUMENT_MAP_IN_PANDAS_ENABLED = "spark.dataflint.instrument.spark.mapInPandas.enabled"
val INSTRUMENT_MAP_IN_ARROW_ENABLED = "spark.dataflint.instrument.spark.mapInArrow.enabled"
val INSTRUMENT_WINDOW_ENABLED = "spark.dataflint.instrument.spark.window.enabled"

def install(context: SparkContext, pageFactory: DataflintPageFactory): String = {
new DataflintSparkUICommonInstaller().install(context, pageFactory)
Expand Down Expand Up @@ -164,7 +165,8 @@ object DataflintSparkUICommonLoader extends Logging {
val instrumentEnabled = sc.conf.getBoolean(INSTRUMENT_SPARK_ENABLED, defaultValue = false)
val mapInPandasEnabled = sc.conf.getBoolean(INSTRUMENT_MAP_IN_PANDAS_ENABLED, defaultValue = false)
val mapInArrowEnabled = sc.conf.getBoolean(INSTRUMENT_MAP_IN_ARROW_ENABLED, defaultValue = false)
val anyInstrumentationEnabled = instrumentEnabled || mapInPandasEnabled || mapInArrowEnabled
val windowEnabled = sc.conf.getBoolean(INSTRUMENT_WINDOW_ENABLED, defaultValue = false)
val anyInstrumentationEnabled = instrumentEnabled || mapInPandasEnabled || mapInArrowEnabled || windowEnabled
if (!anyInstrumentationEnabled) {
logInfo("DataFlint instrumentation extension is disabled (no instrumentation flags enabled)")
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,14 @@ package org.apache.spark.dataflint

import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions, Strategy, execution}
import org.apache.spark.sql.catalyst.expressions.{NamedExpression, WindowFunctionType}
import org.apache.spark.sql.catalyst.planning.PhysicalWindow
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Window => LogicalWindow}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
import org.apache.spark.sql.execution.python.{
DataFlintMapInPandasExec_3_0,
DataFlintMapInPandasExec_3_1,
DataFlintMapInPandasExec_3_3,
DataFlintMapInPandasExec_3_4,
DataFlintMapInPandasExec_3_5,
DataFlintPythonMapInArrowExec_3_3,
DataFlintPythonMapInArrowExec_3_4,
DataFlintPythonMapInArrowExec_3_5,
MapInPandasExec
}
import org.apache.spark.sql.execution.python.{DataFlintMapInPandasExec_3_0, DataFlintMapInPandasExec_3_1, DataFlintMapInPandasExec_3_3, DataFlintMapInPandasExec_3_4, DataFlintMapInPandasExec_3_5, DataFlintPythonMapInArrowExec_3_3, DataFlintPythonMapInArrowExec_3_4, DataFlintPythonMapInArrowExec_3_5, DataFlintWindowInPandasExec, MapInPandasExec, WindowInPandasExec}
import org.apache.spark.sql.execution.window.DataFlintWindowExec

/**
* A SparkSessionExtension that injects DataFlint instrumentation into Spark's physical planning phase.
Expand Down Expand Up @@ -46,6 +39,10 @@ class DataFlintInstrumentationExtension extends (SparkSessionExtensions => Unit)
extensions.injectColumnar { session =>
DataFlintInstrumentationColumnarRule(session)
}

extensions.injectPlannerStrategy { session =>
DataFlintWindowPlannerStrategy(session)
}
}
}

Expand Down Expand Up @@ -189,3 +186,38 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C
}
}
}

/**
 * A planner Strategy that converts the logical Window plan node directly into
 * DataFlintWindowExec, bypassing WindowExec entirely.
 *
 * Using a Strategy (rather than a ColumnarRule) is correct for row-based operators
 * like WindowExec that do not participate in columnar execution.
 */
case class DataFlintWindowPlannerStrategy(session: SparkSession) extends Strategy with Logging {

  // Evaluated once at construction: instrumentation is active if either the global
  // instrumentation flag or the window-specific flag is enabled.
  private val windowEnabled: Boolean = {
    val conf = session.sparkContext.conf
    val globalEnabled = conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_SPARK_ENABLED, defaultValue = false)
    val specificEnabled = conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_WINDOW_ENABLED, defaultValue = false)
    globalEnabled || specificEnabled
  }

  /**
   * Returns the DataFlint physical plan(s) for a logical Window node, or Nil to let
   * Spark's default strategies handle the plan.
   */
  override def apply(plan: LogicalPlan): Seq[SparkPlan] =
    if (!windowEnabled) Nil // avoid `return`: keep the method a single expression
    else plan match {
      // SQL window functions -> row-based DataFlintWindowExec (instrumented WindowExec)
      case PhysicalWindow(
          WindowFunctionType.SQL, windowExprs, partitionSpec, orderSpec, child) =>
        logInfo("Replacing logical Window with DataFlintWindowExec")
        DataFlintWindowExec(
          windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil

      // Python UDF window functions -> instrumented WindowInPandasExec
      case PhysicalWindow(
          WindowFunctionType.Python, windowExprs, partitionSpec, orderSpec, child) =>
        logInfo("Replacing logical Window (Python UDF) with DataFlintWindowInPandasExec")
        DataFlintWindowInPandasExec(
          windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil

      case _ => Nil
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.python

import org.apache.spark.dataflint.DataFlintRDDUtils
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

/**
 * Drop-in replacement for Spark's WindowInPandasExec that additionally reports a
 * "duration" timing metric: the per-partition wall-clock time from first pull of the
 * partition iterator until it is exhausted.
 */
class DataFlintWindowInPandasExec private (
    windowExpression: Seq[NamedExpression],
    partitionSpec: Seq[Expression],
    orderSpec: Seq[SortOrder],
    child: SparkPlan)
  extends WindowInPandasExec(windowExpression, partitionSpec, orderSpec, child) with Logging {

  override def nodeName: String = "DataFlintWindowInPandas"

  // Parent Python-runner metrics, extended with spill size and our duration metric.
  override lazy val metrics: Map[String, SQLMetric] =
    pythonMetrics +
      ("spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")) +
      ("duration" -> SQLMetrics.createTimingMetric(sparkContext, "duration"))

  // Delegate execution to the parent and wrap its RDD with duration tracking.
  override protected def doExecute(): RDD[InternalRow] = {
    val parentOutput = super.doExecute()
    DataFlintRDDUtils.withDurationMetric(parentOutput, longMetric("duration"))
  }

  override protected def withNewChildInternal(newChild: SparkPlan): DataFlintWindowInPandasExec =
    new DataFlintWindowInPandasExec(windowExpression, partitionSpec, orderSpec, newChild)
}

/** Factory mirroring WindowInPandasExec's constructor (the class constructor is private). */
object DataFlintWindowInPandasExec {
  def apply(
      windowExpression: Seq[NamedExpression],
      partitionSpec: Seq[Expression],
      orderSpec: Seq[SortOrder],
      child: SparkPlan): DataFlintWindowInPandasExec =
    new DataFlintWindowInPandasExec(windowExpression, partitionSpec, orderSpec, child)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* What's currently included in duration

The timer wraps iter from super.doExecute(). When you call iter.hasNext, the parent WindowExec iterator internally:
1. Pulls rows from its child (e.g., a SortExec)
2. Accumulates a full partition group
3. Computes window function values
4. Emits output rows

Steps 1–3 all happen during the first iter.hasNext call. So the timer captures child-fetch time + window computation time combined.

How to isolate just the window computation

You can't easily do this with the current wrapping approach because WindowExec uses an internal buffer pattern — it reads all input for a partition group eagerly on the first hasNext. To truly isolate window computation you'd need to:

1. Override doExecute more deeply — instrument inside WindowFunctionFrame.prepare() and write() calls, which are the actual window computation steps. This requires overriding private Spark internals.
2. Subtract child time — measure the child RDD's execution time separately and subtract it. Fragile and inaccurate.
3. Accept the current scope as "window stage time" — which is the pragmatic choice. The metric measures the time from when the partition iterator is first pulled until it's exhausted. This is a reasonable proxy for window
operator cost in a query plan.

*/
package org.apache.spark.sql.execution.window

import org.apache.spark.dataflint.DataFlintRDDUtils
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

/**
 * Drop-in replacement for Spark's WindowExec that additionally reports a "duration"
 * timing metric: the per-partition wall-clock time from first pull of the partition
 * iterator until it is exhausted (see the header notes on what this duration covers).
 */
class DataFlintWindowExec private (
    windowExpression: Seq[NamedExpression],
    partitionSpec: Seq[Expression],
    orderSpec: Seq[SortOrder],
    child: SparkPlan)
  extends WindowExec(windowExpression, partitionSpec, orderSpec, child) with Logging {

  override def nodeName: String = "DataFlintWindow"

  // Spill size plus our duration metric, shown in the SQL UI for this node.
  override lazy val metrics: Map[String, SQLMetric] =
    Map.empty[String, SQLMetric] +
      ("spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")) +
      ("duration" -> SQLMetrics.createTimingMetric(sparkContext, "duration"))

  // Delegate execution to the parent and wrap its RDD with duration tracking.
  override protected def doExecute(): RDD[InternalRow] = {
    val parentOutput = super.doExecute()
    DataFlintRDDUtils.withDurationMetric(parentOutput, longMetric("duration"))
  }

  override protected def withNewChildInternal(newChild: SparkPlan): DataFlintWindowExec =
    new DataFlintWindowExec(windowExpression, partitionSpec, orderSpec, newChild)
}

/** Factory mirroring WindowExec's constructor (the class constructor is private). */
object DataFlintWindowExec {
  def apply(
      windowExpression: Seq[NamedExpression],
      partitionSpec: Seq[Expression],
      orderSpec: Seq[SortOrder],
      child: SparkPlan): DataFlintWindowExec =
    new DataFlintWindowExec(windowExpression, partitionSpec, orderSpec, child)
}
Loading