Skip to content

Spline doesn't register delta overwritePartitions() operation #900

@denstern

Description

@denstern

I'm running a Spark application with Spark 3.5.5 and Delta Lake 3.3.2.

The code uses methods of Spark's DataFrameWriterV2 class. The error occurs when a Delta table is written with the overwritePartitions() method.

Spline agent used: spark-3.5-spline-agent-bundle_2.12-2.2.3.jar

Error message:

java.lang.RuntimeException: Write extraction failed for: class org.apache.spark.sql.delta.DeltaDynamicPartitionOverwriteCommand
	at za.co.absa.spline.harvester.LineageHarvester.tryExtractWriteCommand(LineageHarvester.scala:153)
	at za.co.absa.spline.harvester.LineageHarvester.harvest(LineageHarvester.scala:61)
	at za.co.absa.spline.agent.SplineAgent$$anon$1.$anonfun$handle$1(SplineAgent.scala:91)
	at za.co.absa.spline.agent.SplineAgent$$anon$1.withErrorHandling(SplineAgent.scala:100)
	at za.co.absa.spline.agent.SplineAgent$$anon$1.handle(SplineAgent.scala:72)
	at za.co.absa.spline.harvester.listener.QueryExecutionListenerDelegate.onSuccess(QueryExecutionListenerDelegate.scala:28)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$1(SplineQueryExecutionListener.scala:41)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$1$adapted(SplineQueryExecutionListener.scala:41)
	at scala.Option.foreach(Option.scala:407)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.onSuccess(SplineQueryExecutionListener.scala:41)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:173)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:143)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:143)
	at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:155)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1356)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
Caused by: scala.MatchError: DeltaDynamicPartitionOverwriteCommand RelationV2[date_id#1069, rand_num#1070] spark_catalog.test.test_dataframev2 spark_catalog.test.test_dataframev2, DeltaTableV2(org.apache.spark.sql.SparkSession@2f072633,s3a://BUCKET_NAME/warehouse/test.db/test_dataframev2,Some(CatalogTable(
Catalog: spark_catalog
Database: test
Table: test_dataframev2
Owner: spark
Created Time: Mon Nov 24 12:16:20 UTC 2025
Last Access: UNKNOWN
Created By: Spark 3.5.5
Type: EXTERNAL
Provider: delta
delta.minReaderVersion=1, delta.minWriterVersion=7, numFilesErasureCoded=0]
Statistics: 6075 bytes
Location: s3a://BUCKET_NAME/warehouse/test.db/test_dataframev2
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Schema: root
	|-- date_id: date (nullable = true)
	|-- rand_num: integer (nullable = true)
	)),Some(test.test_dataframev2),None,Map()), true
	+- Project [date_id#2, rand_num#5]
		+- Project [id#0L, date_id#2, cast(((rand(3006909157548589595) * cast(200 as double)) - cast(100 as double)) as int) AS rand_num#5]
			+- Project [id#0L, date_sub(current_date(Some(Etc/UTC)), cast((rand(9151590119353770883) * cast(3 as double)) as int)) AS date_id#2]
				+- Range (0, 100, step=1, splits=Some(4))
	(of class org.apache.spark.sql.delta.DeltaDynamicPartitionOverwriteCommand)
	at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin.za$co$absa$spline$harvester$plugin$embedded$DataSourceV2Plugin$$processV2WriteCommand(DataSourceV2Plugin.scala:92)
	at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin$$anonfun$2.applyOrElse(DataSourceV2Plugin.scala:73)
	at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin$$anonfun$2.applyOrElse(DataSourceV2Plugin.scala:56)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$Lifted.apply(PartialFunction.scala:228)
at scala.PartialFunction$Lifted.apply(PartialFunction.scala:224)
at za.co.absa.spline.harvester.builder.write.PluggableWriteCommandExtractor.asWriteCommand(PluggableWriteCommandExtractor.scala:45)
at za.co.absa.spline.harvester.LineageHarvester.$anonfun$tryExtractWriteCommand$1(LineageHarvester.scala:145)
at scala.util.Try$.apply(Try.scala:213)
at za.co.absa.spline.harvester.LineageHarvester.tryExtractWriteCommand(LineageHarvester.scala:145)
... 29 more

Code:

import logging
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    current_date,
    date_sub,
    rand,
)

# Root logging configuration: timestamped, level-padded lines at INFO and above.
logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
)
# Raise the py4j logger's threshold so its records below ERROR are dropped.
logging.getLogger("py4j").setLevel(logging.ERROR)
# Module-level logger used to report the outcome of the table writes below.
logger = logging.getLogger(__name__)

# Spark session for the reproduction. Hive support is enabled so the target
# table name resolves through the session catalog (the trace shows it as
# spark_catalog.test.test_dataframev2).
#
# S3A credentials are taken from the standard AWS environment variables so real
# secrets never have to be written into this file; the original placeholder
# literals remain only as a fallback when the variables are unset.
spark = (
    SparkSession.builder.appName("test_dataframev2_spline")
    .config("spark.sql.shuffle.partitions", "300")
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .config("spark.driver.maxResultSize", "4g")
    .config(
        "spark.hadoop.fs.s3a.access.key",
        os.environ.get("AWS_ACCESS_KEY_ID", "access_key"),
    )
    .config(
        "spark.hadoop.fs.s3a.secret.key",
        os.environ.get("AWS_SECRET_ACCESS_KEY", "secret_key"),
    )
    .enableHiveSupport()
    .getOrCreate()
)
# Only ERROR-level Spark log output is shown from here on.
spark.sparkContext.setLogLevel("ERROR")

# Fully qualified name of the Delta table written by this reproduction.
target_table_name = "test.test_dataframev2"

# Partition key: today's date shifted back by a random whole number of days
# (assuming rand() is uniform on [0, 1), the shift is 0, 1 or 2).
date_id_col = date_sub(current_date(), (rand() * 3).cast("integer"))
# Payload: a random integer, roughly in the range -100..99.
rand_num_col = (rand() * 200 - 100).cast("integer")

# 100 rows with just the two columns above. The withColumn chain is kept so the
# logical plan has the same Project-over-Range shape as in the reported trace.
df = (
    spark.range(100)
    .withColumn("date_id", date_id_col)
    .withColumn("rand_num", rand_num_col)
    .select("date_id", "rand_num")
)

# Create (or replace) the partitioned Delta table first. This step is
# best-effort: a failure is logged with its traceback, and the script still
# continues to the overwritePartitions() call below, which is the operation
# under test.
try:
    df.writeTo(target_table_name).using("delta").partitionedBy("date_id").createOrReplace()
except Exception as e:
    logger.exception("error createOrReplace - %s", e)
else:
    logger.info("method createOrReplace succeeded")

# Dynamic partition overwrite through DataFrameWriterV2. In the reported run
# this is planned as DeltaDynamicPartitionOverwriteCommand, the command the
# Spline agent fails to extract (the MatchError in the trace).
# NOTE(review): using()/partitionedBy() are table-creation options; presumably
# they have no effect on overwritePartitions() - confirm before relying on them.
df.writeTo(target_table_name).using("delta").partitionedBy("date_id").overwritePartitions()

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    Status
    New

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions