From b2182942e7ed701a2620dda2fb359bf7f165c02f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=96=86=E5=AE=87?=
Date: Tue, 13 Jan 2026 18:00:30 +0800
Subject: [PATCH] [spark] MERGE INTO on a data evolution (DE) table should not
 update globally indexed columns
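
Updating a globally indexed column through MERGE INTO on a data evolution
table is not supported yet, so reject such statements up front: collect the
update columns from the matched UPDATE actions, scan the latest snapshot for
index files that carry global index metadata, and throw a RuntimeException
listing the updated, globally indexed, and conflicting column names.

For illustration (this statement mirrors the new test added to
RowTrackingTestBase below): assuming a table T with a btree global index
created on the name column via CALL sys.create_global_index(...), the
following MERGE INTO now fails with "MergeInto: update columns contain
globally indexed columns, not supported now.":

  MERGE INTO T
  USING T AS source
  ON T._ROW_ID = source._ROW_ID
  WHEN MATCHED AND T.id = 500 THEN UPDATE SET name = "updatedName"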
---
 .../MergeIntoPaimonDataEvolutionTable.scala   | 56 +++++++++++++++----
 .../spark/sql/RowTrackingTestBase.scala       | 40 +++++++++++++
 2 files changed, 84 insertions(+), 12 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index e2eaed8fe54f..1d55969e70b3 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -75,6 +75,50 @@ case class MergeIntoPaimonDataEvolutionTable(
   import MergeIntoPaimonDataEvolutionTable._
 
   override val table: FileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable]
+
+  private val updateColumns: Set[AttributeReference] = {
+    val columns = mutable.Set[AttributeReference]()
+    for (action <- matchedActions) {
+      action match {
+        case updateAction: UpdateAction =>
+          for (assignment <- updateAction.assignments) {
+            if (!assignment.key.equals(assignment.value)) {
+              val key = assignment.key.asInstanceOf[AttributeReference]
+              columns ++= Seq(key)
+            }
+          }
+      }
+    }
+    columns.toSet
+  }
+
+  Option(table.snapshotManager().latestSnapshot()) match {
+    case Some(snapshot) => {
+      val updatedColNames: Set[String] = updateColumns.map(_.name)
+      val rowType = table.rowType()
+
+      val globalIndexedFieldNames: Set[String] = table
+        .store()
+        .newIndexFileHandler()
+        .scan(snapshot, entry => entry.indexFile().globalIndexMeta() != null)
+        .asScala
+        .map(entry => entry.indexFile().globalIndexMeta().indexFieldId())
+        .map(fieldId => rowType.getField(fieldId).name())
+        .toSet
+
+      val conflicted = updatedColNames.intersect(globalIndexedFieldNames)
+      if (conflicted.nonEmpty) {
+        throw new RuntimeException(
+          s"""MergeInto: update columns contain globally indexed columns, not supported now.
+             |Updated columns: ${updatedColNames.toSeq.sorted.mkString("[", ", ", "]")}
+             |Global-indexed columns: ${globalIndexedFieldNames.toSeq.sorted.mkString("[", ", ", "]")}
+             |Conflicted columns: ${conflicted.toSeq.sorted.mkString("[", ", ", "]")}
+             |""".stripMargin)
+      }
+    }
+    case None => ()
+  }
+
   private val firstRowIds: Seq[Long] = table
     .store()
     .newScan()
@@ -230,18 +274,6 @@ case class MergeIntoPaimonDataEvolutionTable(
       (o1, o2) => {
         o1.toString().compareTo(o2.toString())
       }) ++ mergeFields
-    val updateColumns = mutable.Set[AttributeReference]()
-    for (action <- matchedActions) {
-      action match {
-        case updateAction: UpdateAction =>
-          for (assignment <- updateAction.assignments) {
-            if (!assignment.key.equals(assignment.value)) {
-              val key = assignment.key.asInstanceOf[AttributeReference]
-              updateColumns ++= Seq(key)
-            }
-          }
-      }
-    }
     val updateColumnsSorted = updateColumns.toSeq.sortBy(
       s => targetTable.output.map(x => x.toString()).indexOf(s.toString()))
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 12639e6ef3bd..79d7f65793ad 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -625,4 +625,44 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
       )
     }
   }
+
+  test("Data Evolution: Should not update indexed columns") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, name STRING)
+                  |TBLPROPERTIES (
+                  | 'bucket' = '-1',
+                  | 'global-index.row-count-per-shard' = '10000',
+                  | 'row-tracking.enabled' = 'true',
+                  | 'data-evolution.enabled' = 'true',
+                  | 'btree-index.records-per-range' = '1000')
+                  |""".stripMargin)
+
+      val values =
+        (0 until 100000).map(i => s"($i, 'name_$i')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      // create global index for table
+      val output =
+        spark
+          .sql(
+            "CALL sys.create_global_index(table => 'test.T', index_column => 'name', index_type => 'btree'," +
+              " options => 'btree-index.records-per-range=1000')")
+          .collect()
+          .head
+
+      assert(output.getBoolean(0))
+
+      // call merge into to update data
+      assert(intercept[RuntimeException] {
+        sql(s"""
+               |MERGE INTO T
+               |USING T AS source
+               |ON T._ROW_ID = source._ROW_ID
+               |WHEN MATCHED AND T.id = 500 THEN UPDATE SET name = "updatedName"
+               |""".stripMargin)
+      }.getMessage
+        .contains("MergeInto: update columns contain globally indexed columns, not supported now."))
+    }
+  }
 }