From a48352450ca1411e37faff2b26e9cae5f370d981 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B2=E5=A4=A7=E6=B4=8B?= Date: Thu, 27 Nov 2025 15:29:28 +0800 Subject: [PATCH 01/11] [spark] Support data evolution for bucket table --- docs/content/append-table/row-tracking.md | 2 +- .../org/apache/paimon/schema/TableSchema.java | 29 +- .../BucketedAppendFileStoreWrite.java | 2 +- .../paimon/operation/FileStoreCommitImpl.java | 4 +- .../paimon/schema/SchemaValidation.java | 4 - .../spark/DataEvolutionSparkTableWrite.scala | 11 +- .../analysis/PaimonMergeIntoBase.scala | 31 +- .../catalyst/analysis/RowLevelHelper.scala | 18 +- .../commands/DataEvolutionPaimonWriter.scala | 10 +- .../spark/sql/RowTrackingTestBase.scala | 815 +++++++++++------- 10 files changed, 587 insertions(+), 339 deletions(-) diff --git a/docs/content/append-table/row-tracking.md b/docs/content/append-table/row-tracking.md index c1c815c487c7..aa804f98e70e 100644 --- a/docs/content/append-table/row-tracking.md +++ b/docs/content/append-table/row-tracking.md @@ -47,7 +47,7 @@ CREATE TABLE part_t ( WITH ('row-tracking.enabled' = 'true'); ``` Notice that: -- Row tracking is only supported for unaware append tables, not for primary key tables. Which means you can't define `bucket` and `bucket-key` for the table. +- Row tracking is only supported for append tables, not for primary key tables. - Only spark support update, merge into and delete operations on row-tracking tables, Flink SQL does not support these operations yet. - This function is experimental, this line will be removed after being stable. diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java index 8fee740fd1cd..f0432191586c 100644 --- a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java +++ b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java @@ -127,6 +127,31 @@ public TableSchema( numBucket = CoreOptions.fromMap(options).bucket(); } + private TableSchema( + int version, + long id, + List fields, + int highestFieldId, + List partitionKeys, + List primaryKeys, + Map options, + @Nullable String comment, + long timeMillis, + List bucketKeys, + int numBucket) { + this.version = version; + this.id = id; + this.fields = Collections.unmodifiableList(new ArrayList<>(fields)); + this.highestFieldId = highestFieldId; + this.partitionKeys = partitionKeys; + this.primaryKeys = primaryKeys; + this.options = options; + this.comment = comment; + this.timeMillis = timeMillis; + this.bucketKeys = bucketKeys; + this.numBucket = numBucket; + } + public int version() { return version; } @@ -294,7 +319,9 @@ public TableSchema project(@Nullable List writeCols) { primaryKeys, options, comment, - timeMillis); + timeMillis, + bucketKeys, + numBucket); } private List projectedDataFields(List projectedFieldNames) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java index 71f10d4b3df4..23e4357f21a8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java @@ -78,7 +78,7 @@ protected CompactManager getCompactManager( List restoredFiles, ExecutorService compactExecutor, @Nullable BucketedDvMaintainer dvMaintainer) { - if (options.writeOnly()) { + if (options.writeOnly() || options.dataEvolutionEnabled()) { return new NoopCompactManager(); } else { Function dvFactory = diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index ceb68ed6ba5c..2a33ac264c25 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -29,7 +29,6 @@ import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.manifest.FileEntry; import org.apache.paimon.manifest.FileKind; -import org.apache.paimon.manifest.FileSource; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.IndexManifestFile; import org.apache.paimon.manifest.ManifestCommittable; @@ -1284,8 +1283,7 @@ private long assignRowTrackingMeta( checkArgument( entry.file().fileSource().isPresent(), "This is a bug, file source field for row-tracking table must present."); - if (entry.file().fileSource().get().equals(FileSource.APPEND) - && entry.file().firstRowId() == null) { + if (entry.file().firstRowId() == null) { if (isBlobFile(entry.file().fileName())) { if (blobStart >= start) { throw new IllegalStateException( diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index f84a277c075a..69e5e1867715 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -631,10 +631,6 @@ private static void validateMergeFunctionFactory(TableSchema schema) { private static void validateRowTracking(TableSchema schema, CoreOptions options) { boolean rowTrackingEnabled = options.rowTrackingEnabled(); if (rowTrackingEnabled) { - checkArgument( - options.bucket() == -1, - "Cannot define %s for row tracking table, it only support bucket = -1", - CoreOptions.BUCKET.key()); checkArgument( schema.primaryKeys().isEmpty(), "Cannot define %s for row tracking table.", diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/DataEvolutionSparkTableWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/DataEvolutionSparkTableWrite.scala index 0ca68e49337f..29b8d0af0971 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/DataEvolutionSparkTableWrite.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/DataEvolutionSparkTableWrite.scala @@ -40,7 +40,7 @@ import scala.collection.mutable.ListBuffer case class DataEvolutionSparkTableWrite( writeBuilder: BatchWriteBuilder, writeType: RowType, - firstRowIdToPartitionMap: mutable.HashMap[Long, Tuple2[BinaryRow, Long]], + firstRowIdToPartitionMap: mutable.HashMap[Long, Tuple3[BinaryRow, Int, Long]], blobAsDescriptor: Boolean, catalogContext: CatalogContext) extends SparkTableWriteTrait { @@ -68,7 +68,7 @@ case class DataEvolutionSparkTableWrite( def newCurrentWriter(firstRowId: Long): Unit = { finishCurrentWriter() - val (partition, numRecords) = firstRowIdToPartitionMap.getOrElse(firstRowId, null) + val (partition, bucket, numRecords) = firstRowIdToPartitionMap.getOrElse(firstRowId, null) if (partition == null) { throw new IllegalArgumentException( s"First row ID $firstRowId not found in partition map. " + @@ -81,8 +81,8 @@ case class DataEvolutionSparkTableWrite( .asInstanceOf[TableWriteImpl[InternalRow]] .getWrite .asInstanceOf[AbstractFileStoreWrite[InternalRow]] - .createWriter(partition, 0) - currentWriter = PerFileWriter(partition, firstRowId, writer, numRecords) + .createWriter(partition, bucket) + currentWriter = PerFileWriter(partition, bucket, firstRowId, writer, numRecords) } def finishCurrentWriter(): Unit = { @@ -122,6 +122,7 @@ case class DataEvolutionSparkTableWrite( private case class PerFileWriter( partition: BinaryRow, + bucket: Int, firstRowId: Long, recordWriter: RecordWriter[InternalRow], numRecords: Long) { @@ -149,7 +150,7 @@ case class DataEvolutionSparkTableWrite( val dataFileMeta = dataFiles.get(0).assignFirstRowId(firstRowId) new CommitMessageImpl( partition, - 0, + bucket, null, new DataIncrement( java.util.Arrays.asList(dataFileMeta), diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala index 8a52273eeab2..04336bbc8098 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala @@ -18,15 +18,19 @@ package org.apache.paimon.spark.catalyst.analysis +import org.apache.paimon.CoreOptions.BUCKET_KEY import org.apache.paimon.spark.SparkTable import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper import org.apache.paimon.spark.commands.{MergeIntoPaimonDataEvolutionTable, MergeIntoPaimonTable} +import org.apache.paimon.utils.StringUtils import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, SubqueryExpression} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule +import java.util + import scala.collection.JavaConverters._ trait PaimonMergeIntoBase @@ -72,6 +76,13 @@ trait PaimonMergeIntoBase resolveNotMatchedBySourceActions(merge, targetOutput, dataEvolutionEnabled) if (dataEvolutionEnabled) { + val bucketKeySt = v2Table.getTable.options().get(BUCKET_KEY.key) + if (!StringUtils.isNullOrWhitespaceOnly(bucketKeySt)) { + checkUpdateActionValidityForBucketKey( + AttributeSet(targetOutput), + updateActions, + bucketKeySt.split(",").toSeq) + } MergeIntoPaimonDataEvolutionTable( v2Table, merge.targetTable, @@ -142,7 +153,7 @@ trait PaimonMergeIntoBase lazy val isMergeConditionValid = { val mergeExpressions = splitConjunctivePredicates(mergeCondition) primaryKeys.forall { - primaryKey => isUpdateExpressionToPrimaryKey(targetOutput, mergeExpressions, primaryKey) + primaryKey => isUpdateExpressionForKey(targetOutput, mergeExpressions, primaryKey) } } @@ -156,4 +167,22 @@ trait PaimonMergeIntoBase throw new RuntimeException("Can't update the primary key column in update clause.") } } + + /** This check will avoid to update the bucket key columns */ + private def checkUpdateActionValidityForBucketKey( + targetOutput: AttributeSet, + actions: Seq[UpdateAction], + bucketKeys: Seq[String]): Unit = { + + // Check whether there are an update expression related to any primary key. + def isUpdateActionValid(action: UpdateAction): Boolean = { + validUpdateAssignment(targetOutput, bucketKeys, action.assignments) + } + + val valid = actions.forall(isUpdateActionValid) + if (!valid) { + throw new RuntimeException( + "Can't update the bucket key column in data-evolution update clause.") + } + } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala index eecf0542e1d1..534ba8c60b03 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala @@ -41,23 +41,21 @@ trait RowLevelHelper extends SQLConfHelper { protected def validUpdateAssignment( output: AttributeSet, - primaryKeys: Seq[String], + keys: Seq[String], assignments: Seq[Assignment]): Boolean = { - !primaryKeys.exists { - primaryKey => isUpdateExpressionToPrimaryKey(output, assignments, primaryKey) - } + !keys.exists(key => isUpdateExpressionForKey(output, assignments, key)) } // Check whether there is an update expression related to primary key. - protected def isUpdateExpressionToPrimaryKey( + protected def isUpdateExpressionForKey( output: AttributeSet, expressions: Seq[Expression], - primaryKey: String): Boolean = { + key: String): Boolean = { val resolver = conf.resolver // Check whether this attribute is same to primary key and is from target table. - def isTargetPrimaryKey(attr: AttributeReference): Boolean = { - resolver(primaryKey, attr.name) && output.contains(attr) + def isTargetKey(attr: AttributeReference): Boolean = { + resolver(key, attr.name) && output.contains(attr) } expressions @@ -67,9 +65,9 @@ trait RowLevelHelper extends SQLConfHelper { } .exists { case EqualTo(left: AttributeReference, right: AttributeReference) => - isTargetPrimaryKey(left) || isTargetPrimaryKey(right) + isTargetKey(left) || isTargetKey(right) case Assignment(key: AttributeReference, _) => - isTargetPrimaryKey(key) + isTargetKey(key) case _ => false } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala index 189a2088ae99..cdc40ca57938 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala @@ -38,14 +38,14 @@ import scala.collection.mutable case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable) extends WriteHelper { - private lazy val firstRowIdToPartitionMap: mutable.HashMap[Long, Tuple2[BinaryRow, Long]] = + private lazy val firstRowIdToPartitionMap: mutable.HashMap[Long, Tuple3[BinaryRow, Int, Long]] = initPartitionMap() override val table: FileStoreTable = paimonTable.copy(dynamicOp) @transient private lazy val serializer = new CommitMessageSerializer - private def initPartitionMap(): mutable.HashMap[Long, Tuple2[BinaryRow, Long]] = { - val firstRowIdToPartitionMap = new mutable.HashMap[Long, Tuple2[BinaryRow, Long]] + private def initPartitionMap(): mutable.HashMap[Long, Tuple3[BinaryRow, Int, Long]] = { + val firstRowIdToPartitionMap = new mutable.HashMap[Long, Tuple3[BinaryRow, Int, Long]] table .store() .newScan() @@ -53,7 +53,9 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable) extends WriteH .forEachRemaining( k => firstRowIdToPartitionMap - .put(k.file().firstRowId(), Tuple2.apply(k.partition(), k.file().rowCount()))) + .put( + k.file().firstRowId(), + Tuple3.apply(k.partition(), k.bucket(), k.file().rowCount()))) firstRowIdToPartitionMap } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index cda852f3a402..afdd1a476940 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -25,30 +25,35 @@ import org.apache.spark.sql.Row abstract class RowTrackingTestBase extends PaimonSparkTestBase { - test("Row Tracking: read row Tracking") { - withTable("t") { - sql("CREATE TABLE t (id INT, data STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql("INSERT INTO t VALUES (11, 'a'), (22, 'b')") - - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t"), - Seq(Row(11, "a", 0, 1), Row(22, "b", 1, 1)) - ) - checkAnswer( - sql("SELECT _ROW_ID, data, _SEQUENCE_NUMBER, id FROM t"), - Seq(Row(0, "a", 1, 11), Row(1, "b", 1, 22)) - ) - } + Seq(false, true).foreach { + bucketEnable => + { + test(s"Row Tracking: read row Tracking, bucket: $bucketEnable") { + withTable("t") { + sql( + s"CREATE TABLE t (id INT, data STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t VALUES (11, 'a'), (11, 'b')") + sql("INSERT INTO t VALUES (22, 'a'), (22, 'b')") + + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t"), + Seq(Row(11, "a", 0, 1), Row(11, "b", 1, 1), Row(22, "a", 2, 2), Row(22, "b", 3, 2)) + ) + checkAnswer( + sql("SELECT _ROW_ID, data, _SEQUENCE_NUMBER, id FROM t"), + Seq(Row(0, "a", 1, 11), Row(1, "b", 1, 11), Row(2, "a", 2, 22), Row(3, "b", 2, 22)) + ) + } + } + } } test("Row Tracking: compact table") { withTable("t") { sql( - "CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'compaction.min.file-num'='2')") + s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'compaction.min.file-num'='2')") - sql("INSERT INTO t VALUES (1, 1)") - sql("INSERT INTO t VALUES (2, 2)") - sql("INSERT INTO t VALUES (3, 3)") + batchInsert(Seq((1, 1), (2, 2), (3, 3)), "t") checkAnswer( sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), Seq(Row(1, 1, 0, 1), Row(2, 2, 1, 2), Row(3, 3, 2, 3)) @@ -64,345 +69,537 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { } } - test("Row Tracking: delete table") { + test("Row Tracking: compact bucket table") { withTable("t") { - sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") + sql( + s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'compaction.min.file-num'='2' ${bucketProperties(true)})") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(1, 4)") - sql("DELETE FROM t WHERE id = 2") - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 1, 0, 1), Row(3, 3, 2, 1)) - ) - sql("DELETE FROM t WHERE _ROW_ID = 2") + batchInsert(Seq((1, 1), (2, 2), (3, 3)), "t") checkAnswer( sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 1, 0, 1)) + Seq(Row(1, 1, 3, 3), Row(2, 2, 4, 3), Row(3, 3, 5, 4)) ) } } - test("Row Tracking: update table") { - withTable("t") { - sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(1, 4)") - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 1, 0, 1), Row(2, 2, 1, 1), Row(3, 3, 2, 1)) - ) - - sql("UPDATE t SET data = 22 WHERE id = 2") - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 1, 0, 1), Row(2, 22, 1, 2), Row(3, 3, 2, 1)) - ) - - sql("UPDATE t SET data = 222 WHERE _ROW_ID = 1") - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 1, 0, 1), Row(2, 222, 1, 3), Row(3, 3, 2, 1)) - ) - } + Seq(false, true).foreach { + bucketEnable => + { + test(s"Row Tracking: delete table, bucket: $bucketEnable") { + withTable("t") { + sql( + s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + + batchInsert(Seq((1, 1), (2, 2), (3, 3)), "t") + + sql("DELETE FROM t WHERE id = 2") + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 1, 0, 1), Row(3, 3, 2, 3)) + ) + sql("DELETE FROM t WHERE _ROW_ID = 2") + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 1, 0, 1)) + ) + } + } + } } - test("Row Tracking: update table without condition") { - withTable("t") { - sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(1, 4)") - - sql("UPDATE t SET data = 22") - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 22, 0, 2), Row(2, 22, 1, 2), Row(3, 22, 2, 2)) - ) - } + Seq(false, true).foreach { + bucketEnable => + { + test(s"Row Tracking: update table, bucket: $bucketEnable") { + withTable("t") { + sql( + s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + + batchInsert(Seq((1, 1), (2, 2), (3, 3)), "t") + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 1, 0, 1), Row(2, 2, 1, 2), Row(3, 3, 2, 3)) + ) + + sql("UPDATE t SET data = 22 WHERE id = 2") + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 1, 0, 1), Row(2, 22, 1, 4), Row(3, 3, 2, 3)) + ) + + sql("UPDATE t SET data = 222 WHERE _ROW_ID = 1") + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 1, 0, 1), Row(2, 222, 1, 5), Row(3, 3, 2, 3)) + ) + } + } + } } - test("Row Tracking: merge into table") { - withTable("s", "t") { - sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql("INSERT INTO s VALUES (1, 11), (2, 22)") - - sql("CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b FROM range(2, 4)") - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(2, 2, 0, 1), Row(3, 3, 1, 1)) - ) - - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN MATCHED THEN UPDATE SET t.b = s.b - |WHEN NOT MATCHED THEN INSERT * - |""".stripMargin) - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 11, 2, 2), Row(2, 22, 0, 2), Row(3, 3, 1, 1)) - ) - } + Seq(false, true).foreach { + bucketEnable => + { + test(s"Row Tracking: update table without condition, bucket: $bucketEnable") { + withTable("t") { + sql( + s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + + batchInsert(Seq((1, 1), (2, 2), (3, 3)), "t") + + sql("UPDATE t SET data = 22") + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 22, 0, 4), Row(2, 22, 1, 4), Row(3, 22, 2, 4)) + ) + } + } + } } - test("Row Tracking: merge into table with only insert") { - withTable("s", "t") { - sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql("INSERT INTO s VALUES (1, 11), (2, 22)") - - sql("CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b FROM range(2, 4)") + Seq(false, true).foreach { + bucketEnable => + { + test(s"Row Tracking: merge into table, bucket: $bucketEnable") { + withTable("s", "t") { + sql( + s"CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO s VALUES (1, 11), (2, 22)") + + sql( + s"CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t VALUES(2, 2),(4, 4)") + + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(2, 2, 0, 1), Row(4, 4, 1, 1)) + ) + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.b = s.b + |WHEN NOT MATCHED THEN INSERT * + |""".stripMargin) + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 11, 2, 2), Row(2, 22, 0, 2), Row(4, 4, 1, 1)) + ) + } + } + } + } - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN NOT MATCHED THEN INSERT * - |""".stripMargin) - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 11, 2, 2), Row(2, 2, 0, 1), Row(3, 3, 1, 1)) - ) - } + Seq(false, true).foreach { + bucketEnable => + { + test(s"Row Tracking: merge into table with only insert, bucket:$bucketEnable") { + withTable("s", "t") { + sql( + s"CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO s VALUES (1, 11), (2, 22)") + + sql( + s"CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t VALUES(2, 2),(4, 4)") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN NOT MATCHED THEN INSERT * + |""".stripMargin) + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 11, 2, 2), Row(2, 2, 0, 1), Row(4, 4, 1, 1)) + ) + } + } + } } - test("Row Tracking: merge into table with only delete") { - withTable("s", "t") { - sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql("INSERT INTO s VALUES (1, 11), (2, 22)") + Seq(false, true).foreach { + bucketEnable => + { + test(s"Row Tracking: merge into table with only delete, bucket: $bucketEnable") { + withTable("s", "t") { + sql( + s"CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO s VALUES (1, 11), (2, 22)") + + sql( + s"CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t VALUES(2, 2),(4, 4)") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN DELETE + |""".stripMargin) + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(4, 4, 1, 1)) + ) + } + } + } + } - sql("CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b FROM range(2, 4)") + Seq(false, true).foreach { + bucketEnable => + { + test(s"Row Tracking: merge into table with only update, bucket: $bucketEnable") { + withTable("s", "t") { + sql( + s"CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO s VALUES (1, 11), (2, 22)") + + sql( + s"CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t VALUES(2, 2),(4, 4)") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET * + |""".stripMargin) + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(2, 22, 0, 2), Row(4, 4, 1, 1)) + ) + } + } + } + } - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN MATCHED THEN DELETE - |""".stripMargin) - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(3, 3, 1, 1)) - ) - } + Seq(false, true).foreach { + bucketEnable => + { + test(s"Data Evolution: insert into table with data-evolution, bucket: $bucketEnable") { + withTable("s", "t") { + sql("CREATE TABLE s (id INT, b INT)") + sql("INSERT INTO s VALUES (1, 11), (2, 22)") + + sql( + s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t VALUES(2, 2, 2),(4, 4, 4)") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11) + |""".stripMargin) + + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 11, 11, 2, 3), Row(2, 2, 2, 0, 1), Row(4, 4, 4, 1, 2)) + ) + } + } + } } - test("Row Tracking: merge into table with only update") { - withTable("s", "t") { - sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql("INSERT INTO s VALUES (1, 11), (2, 22)") + Seq(false, true).foreach { + bucketEnable => + { + test( + s"Data Evolution: insert into table with data-evolution partial insert, bucket: $bucketEnable") { + withTable("s", "t") { + sql("CREATE TABLE s (id INT, b INT)") + sql("INSERT INTO s VALUES (1, 11), (2, 22)") + + sql( + s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN NOT MATCHED THEN INSERT (id, b) VALUES (-1, b) + |""".stripMargin) + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN NOT MATCHED THEN INSERT (b) VALUES (b) + |""".stripMargin) + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN NOT MATCHED THEN INSERT (id, c) VALUES (3, 4) + |""".stripMargin) + + checkAnswer( + sql("SELECT * FROM t ORDER BY id"), + Seq( + Row(null, 11, null), + Row(-1, 11, null), + Row(2, 2, 2), + Row(3, 3, 3), + Row(3, null, 4)) + ) + } + } + } + } - sql("CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b FROM range(2, 4)") + Seq(false, true).foreach { + bucketEnable => + { + test(s"Data Evolution: merge into table with data-evolution, bucket: $bucketEnable") { + withTable("s", "t") { + sql("CREATE TABLE s (id INT, b INT)") + sql("INSERT INTO s VALUES (1, 11), (2, 22)") + + sql( + s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t VALUES (2, 2, 2),(4, 4, 4)") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.b = s.b + |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11) + |""".stripMargin) + checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(3))) + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 11, 11, 2, 2), Row(2, 22, 2, 0, 2), Row(4, 4, 4, 1, 2)) + ) + } + } + } + } - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN MATCHED THEN UPDATE SET * - |""".stripMargin) - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(2, 22, 0, 2), Row(3, 3, 1, 1)) - ) - } + Seq(false, true).foreach { + bucketEnable => + { + test( + s"Data Evolution: merge into table with data-evolution complex, bucket: $bucketEnable") { + withTable("source", "target") { + sql("CREATE TABLE source (id INT, b INT, c STRING)") + batchInsert( + Seq( + (1, 100, "c11"), + (3, 300, "c33"), + (5, 500, "c55"), + (7, 700, "c77"), + (9, 900, "c99")), + "source") + + sql( + s"CREATE TABLE target (id INT, b INT, c STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true' ${bucketProperties(bucketEnable)})") + batchInsert( + Seq((1, 10, "c1"), (2, 20, "c2"), (3, 30, "c3"), (4, 40, "c4"), (5, 50, "c5")), + "target") + + sql(s""" + |MERGE INTO target + |USING source + |ON target.id = source.id + |WHEN MATCHED AND target.id = 5 THEN UPDATE SET b = source.b + target.b + |WHEN MATCHED AND source.c > 'c2' THEN UPDATE SET b = source.b, c = source.c + |WHEN NOT MATCHED AND c > 'c9' THEN INSERT (id, b, c) VALUES (id, b * 1.1, c) + |WHEN NOT MATCHED THEN INSERT * + |""".stripMargin) + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM target ORDER BY id"), + Seq( + Row(1, 10, "c1", 0, 6), + Row(2, 20, "c2", 1, 6), + Row(3, 300, "c33", 2, 6), + Row(4, 40, "c4", 3, 6), + Row(5, 550, "c5", 4, 6), + Row(7, 700, "c77", 5, 6), + Row(9, 990, "c99", 6, 6)) + ) + } + } + } } - test("Data Evolution: insert into table with data-evolution") { + test( + "Data Evolution: merge into bucket table with data-evolution update bucket key throw exception") { withTable("s", "t") { sql("CREATE TABLE s (id INT, b INT)") sql("INSERT INTO s VALUES (1, 11), (2, 22)") sql( - "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") - - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11) - |""".stripMargin) - - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 11, 11, 2, 2), Row(2, 2, 2, 0, 1), Row(3, 3, 3, 1, 1)) - ) + "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES (" + + "'row-tracking.enabled' = 'true', " + + "'data-evolution.enabled' = 'true', " + + "'bucket'='2', " + + "'bucket-key'='id')") + + assertThrows[RuntimeException] { + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.id = s.id + |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11) + |""".stripMargin) + } } } - test("Data Evolution: insert into table with data-evolution partial insert") { - withTable("s", "t") { - sql("CREATE TABLE s (id INT, b INT)") - sql("INSERT INTO s VALUES (1, 11), (2, 22)") + Seq(false, true).foreach { + bucketEnable => + { + test(s"Data Evolution: update table throws exception, bucket: $bucketEnable") { + withTable("t") { + sql( + s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") + assert( + intercept[RuntimeException] { + sql("UPDATE t SET b = 22") + }.getMessage + .contains("Update operation is not supported when data evolution is enabled yet.")) + } + } + } + } - sql( - "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") - - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN NOT MATCHED THEN INSERT (id, b) VALUES (-1, b) - |""".stripMargin) - - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN NOT MATCHED THEN INSERT (b) VALUES (b) - |""".stripMargin) - - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN NOT MATCHED THEN INSERT (id, c) VALUES (3, 4) - |""".stripMargin) + Seq(false, true).foreach { + bucketEnable => + { + test(s"Data Evolution: compact table throws exception, bucket:$bucketEnable") { + withTable("t") { + sql( + s"CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true' ${bucketProperties(bucketEnable)})") + for (i <- 1 to 6) { + sql(s"INSERT INTO t VALUES ($i, $i)") + } + assert( + intercept[RuntimeException] { + sql("CALL sys.compact(table => 't')") + }.getMessage + .contains("Compact operation is not supported when data evolution is enabled yet.")) + } + } + } + } - checkAnswer( - sql("SELECT * FROM t ORDER BY id"), - Seq(Row(null, 11, null), Row(-1, 11, null), Row(2, 2, 2), Row(3, 3, 3), Row(3, null, 4)) - ) - } + Seq(false, true).foreach { + bucketEnable => + { + test(s"Data Evolution: delete table throws exception, bucket: $bucketEnable") { + withTable("t") { + sql( + s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") + assert( + intercept[RuntimeException] { + sql("DELETE FROM t WHERE id = 2") + }.getMessage + .contains("Delete operation is not supported when data evolution is enabled yet.")) + } + } + } } - test("Data Evolution: merge into table with data-evolution") { - withTable("s", "t") { - sql("CREATE TABLE s (id INT, b INT)") - sql("INSERT INTO s VALUES (1, 11), (2, 22)") + Seq(false, true).foreach { + bucketEnable => + { + test(s"Row Tracking: merge into table not matched by source, bucket: $bucketEnable") { + if (gteqSpark3_4) { + withTable("source", "target") { + sql( + s"CREATE TABLE source (id INT, b INT, c STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + batchInsert( + Seq( + (1, 100, "c11"), + (3, 300, "c33"), + (5, 500, "c55"), + (7, 700, "c77"), + (9, 900, "c99")), + "source") + + sql( + s"CREATE TABLE target (id INT, b INT, c STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + batchInsert( + Seq((1, 10, "c1"), (2, 20, "c2"), (3, 30, "c3"), (4, 40, "c4"), (5, 50, "c5")), + "target") + + sql(s""" + |MERGE INTO target + |USING source + |ON target.id = source.id + |WHEN MATCHED AND target.id = 5 THEN UPDATE SET b = source.b + target.b + |WHEN MATCHED AND source.c > 'c2' THEN UPDATE SET * + |WHEN NOT MATCHED AND c > 'c9' THEN INSERT (id, b, c) VALUES (id, b * 1.1, c) + |WHEN NOT MATCHED THEN INSERT * + |WHEN NOT MATCHED BY SOURCE AND id = 2 THEN UPDATE SET b = b * 10 + |WHEN NOT MATCHED BY SOURCE THEN DELETE + |""".stripMargin) + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM target ORDER BY id"), + Seq( + Row(1, 10, "c1", 0, 1), + Row(2, 200, "c2", 1, 6), + Row(3, 300, "c33", 2, 6), + Row(5, 550, "c5", 4, 6), + Row(7, 700, "c77", 9, 6), + Row(9, 990, "c99", 10, 6)) + ) + } + } + } + } + } - sql( - "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") - - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN MATCHED THEN UPDATE SET t.b = s.b - |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11) - |""".stripMargin) - checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(3))) - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 11, 11, 2, 2), Row(2, 22, 2, 0, 2), Row(3, 3, 3, 1, 2)) - ) + def bucketProperties(enableBucket: Boolean): String = { + if (enableBucket) { + ",'bucket'='2','bucket-key'='id'" + } else { + "" } } - test("Data Evolution: merge into table with data-evolution complex") { - withTable("source", "target") { - sql("CREATE TABLE source (a INT, b INT, c STRING)") - sql( - "INSERT INTO source VALUES (1, 100, 'c11'), (3, 300, 'c33'), (5, 500, 'c55'), (7, 700, 'c77'), (9, 900, 'c99')") + def formatValue(value: Any): String = { + if (value == null) return "NULL" - sql( - "CREATE TABLE target (a INT, b INT, c STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") - sql( - "INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30, 'c3'), (4, 40, 'c4'), (5, 50, 'c5')") - - sql(s""" - |MERGE INTO target - |USING source - |ON target.a = source.a - |WHEN MATCHED AND target.a = 5 THEN UPDATE SET b = source.b + target.b - |WHEN MATCHED AND source.c > 'c2' THEN UPDATE SET b = source.b, c = source.c - |WHEN NOT MATCHED AND c > 'c9' THEN INSERT (a, b, c) VALUES (a, b * 1.1, c) - |WHEN NOT MATCHED THEN INSERT * - |""".stripMargin) - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM target ORDER BY a"), - Seq( - Row(1, 10, "c1", 0, 2), - Row(2, 20, "c2", 1, 2), - Row(3, 300, "c33", 2, 2), - Row(4, 40, "c4", 3, 2), - Row(5, 550, "c5", 4, 2), - Row(7, 700, "c77", 5, 2), - Row(9, 990, "c99", 6, 2)) - ) - } - } + value match { + case b: java.lang.Boolean => if (b.booleanValue()) "TRUE" else "FALSE" + case b: Boolean => if (b) "TRUE" else "FALSE" - test("Data Evolution: update table throws exception") { - withTable("t") { - sql( - "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") - assert( - intercept[RuntimeException] { - sql("UPDATE t SET b = 22") - }.getMessage - .contains("Update operation is not supported when data evolution is enabled yet.")) - } - } + case n: Byte => n.toString + case n: Short => n.toString + case n: Int => n.toString + case n: Long => n.toString + case n: Float => n.toString + case n: Double => n.toString + case n: BigInt => n.toString + case n: BigDecimal => n.bigDecimal.toPlainString - test("Data Evolution: compact table throws exception") { - withTable("t") { - sql( - "CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") - for (i <- 1 to 6) { - sql(s"INSERT INTO t VALUES ($i, $i)") - } - assert( - intercept[RuntimeException] { - sql("CALL sys.compact(table => 't')") - }.getMessage - .contains("Compact operation is not supported when data evolution is enabled yet.")) - } - } + case s: String => s"'${escapeSingleQuotes(s)}'" + case c: Char => s"'${escapeSingleQuotes(c.toString)}'" - test("Data Evolution: delete table throws exception") { - withTable("t") { - sql( - "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") - assert( - intercept[RuntimeException] { - sql("DELETE FROM t WHERE id = 2") - }.getMessage - .contains("Delete operation is not supported when data evolution is enabled yet.")) + case other => s"'${escapeSingleQuotes(String.valueOf(other))}'" } } - test("Row Tracking: merge into table not matched by source") { - if (gteqSpark3_4) { - withTable("source", "target") { - sql( - "CREATE TABLE source (a INT, b INT, c STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql( - "INSERT INTO source VALUES (1, 100, 'c11'), (3, 300, 'c33'), (5, 500, 'c55'), (7, 700, 'c77'), (9, 900, 'c99')") - - sql( - "CREATE TABLE target (a INT, b INT, c STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql( - "INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30, 'c3'), (4, 40, 'c4'), (5, 50, 'c5')") - - sql(s""" - |MERGE INTO target - |USING source - |ON target.a = source.a - |WHEN MATCHED AND target.a = 5 THEN UPDATE SET b = source.b + target.b - |WHEN MATCHED AND source.c > 'c2' THEN UPDATE SET * - |WHEN NOT MATCHED AND c > 'c9' THEN INSERT (a, b, c) VALUES (a, b * 1.1, c) - |WHEN NOT MATCHED THEN INSERT * - |WHEN NOT MATCHED BY SOURCE AND a = 2 THEN UPDATE SET b = b * 10 - |WHEN NOT MATCHED BY SOURCE THEN DELETE - |""".stripMargin) - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM target ORDER BY a"), - Seq( - Row(1, 10, "c1", 0, 1), - Row(2, 200, "c2", 1, 2), - Row(3, 300, "c33", 2, 2), - Row(5, 550, "c5", 4, 2), - Row(7, 700, "c77", 9, 2), - Row(9, 990, "c99", 10, 2)) - ) + def batchInsert(rows: Seq[Product], table: String): String = { + rows + .map { + p => + val values = p.productIterator.toList + val args = values.map(formatValue).mkString(", ") + sql(s"INSERT INTO $table VALUES ($args)") } - } + .mkString("") } + + private def escapeSingleQuotes(s: String): String = s.replace("'", "''") + } From 1b4b945a914403272b76c82c8cdd02f424cb87ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B2=E5=A4=A7=E6=B4=8B?= Date: Sun, 4 Jan 2026 20:09:00 +0800 Subject: [PATCH 02/11] [spark] Support data evolution for bucket table --- paimon-api/src/main/java/org/apache/paimon/CoreOptions.java | 4 ++++ .../apache/paimon/operation/BucketedAppendFileStoreWrite.java | 2 +- .../java/org/apache/paimon/operation/FileStoreCommitImpl.java | 4 +++- .../main/java/org/apache/paimon/schema/SchemaValidation.java | 4 ++++ 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 648abc0bc663..f1970a7e337d 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2108,6 +2108,10 @@ public BucketFunctionType bucketFunctionType() { return options.get(BUCKET_FUNCTION_TYPE); } + public boolean bucketAppendOrdered() { + return options.get(BUCKET_APPEND_ORDERED); + } + public Path path() { return path(options.toMap()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java index 23e4357f21a8..5459550af3ad 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java @@ -78,7 +78,7 @@ protected CompactManager getCompactManager( List restoredFiles, ExecutorService compactExecutor, @Nullable BucketedDvMaintainer dvMaintainer) { - if (options.writeOnly() || options.dataEvolutionEnabled()) { + if (options.writeOnly() || options.bucketAppendOrdered()) { return new NoopCompactManager(); } else { Function dvFactory = diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 2a33ac264c25..52fa37bf43d0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -29,6 +29,7 @@ import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.manifest.FileEntry; import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.IndexManifestFile; import org.apache.paimon.manifest.ManifestCommittable; @@ -1283,7 +1284,8 @@ private long assignRowTrackingMeta( checkArgument( entry.file().fileSource().isPresent(), "This is a bug, file source field for row-tracking table must present."); - if (entry.file().firstRowId() == null) { + if (entry.file().fileSource().get().equals(FileSource.APPEND) + && entry.file().firstRowId() == null) { if (isBlobFile(entry.file().fileName())) { if (blobStart >= start) { throw new IllegalStateException( diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 69e5e1867715..78abb12ba635 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -635,6 +635,10 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options) schema.primaryKeys().isEmpty(), "Cannot define %s for row tracking table.", PRIMARY_KEY.key()); + if (options.bucket() != -1) { + checkArgument(!options.bucketAppendOrdered(), + "Row tracking config must disabled bucket-append-ordered in bucket table"); + } } if (options.dataEvolutionEnabled()) { From 8680fb46eda599c89080de32a6484f8cb63beb83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B2=E5=A4=A7=E6=B4=8B?= Date: Sun, 4 Jan 2026 20:36:01 +0800 Subject: [PATCH 03/11] [spark] Support data evolution for bucket table --- .../org/apache/paimon/operation/FileStoreCommitImpl.java | 2 +- .../java/org/apache/paimon/schema/SchemaValidation.java | 5 +++-- .../org/apache/paimon/spark/sql/RowTrackingTestBase.scala | 6 +++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 8b4051d5737a..fc89ff1ecde3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -1273,7 +1273,7 @@ private long assignRowTrackingMeta( entry.file().fileSource().isPresent(), "This is a bug, file source field for row-tracking table must present."); if (entry.file().fileSource().get().equals(FileSource.APPEND) - && entry.file().firstRowId() == null) { + && entry.file().firstRowId() == null) { if (isBlobFile(entry.file().fileName())) { if (blobStart >= start) { throw new IllegalStateException( diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 78abb12ba635..ea7d7ac9e6a9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -636,8 +636,9 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options) "Cannot define %s for row tracking table.", PRIMARY_KEY.key()); if (options.bucket() != -1) { - checkArgument(!options.bucketAppendOrdered(), - "Row tracking config must disabled bucket-append-ordered in bucket table"); + checkArgument( + !options.bucketAppendOrdered(), + "Row tracking config must disabled bucket-append-ordered in bucket table"); } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index 4fd6137e975e..460d0c99a716 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -23,9 +23,7 @@ import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} -import org.apache.spark.sql.execution.{QueryExecution, SparkPlan} -import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec -import org.apache.spark.sql.execution.joins.BaseJoinExec +import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.util.QueryExecutionListener import scala.collection.mutable @@ -552,6 +550,8 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { }.getMessage .contains("Update operation is not supported when data evolution is enabled yet.")) } + } + Seq(false, true).foreach { bucketEnable => { From 086538332851b9adfbdf84dc41ea30c73f0a7716 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B2=E5=A4=A7=E6=B4=8B?= Date: Mon, 5 Jan 2026 21:46:56 +0800 Subject: [PATCH 04/11] [spark] Support data evolution for bucket table --- docs/content/append-table/row-tracking.md | 1 + .../java/org/apache/paimon/CoreOptions.java | 4 - .../DataEvolutionCompactCoordinator.java | 164 ++++++++++-------- .../DataEvolutionCompactTask.java | 10 +- .../DataEvolutionCompactTaskSerializer.java | 2 + .../BucketedAppendFileStoreWrite.java | 2 +- .../DataEvolutionCompactCoordinatorTest.java | 6 +- .../spark/procedure/CompactProcedure.java | 17 +- .../commands/DataEvolutionPaimonWriter.scala | 4 +- .../write/DataEvolutionTableDataWrite.scala | 2 +- .../spark/sql/RowTrackingTestBase.scala | 131 +++++++------- 11 files changed, 183 insertions(+), 160 deletions(-) diff --git a/docs/content/append-table/row-tracking.md b/docs/content/append-table/row-tracking.md index aa804f98e70e..ed04010c5fcc 100644 --- a/docs/content/append-table/row-tracking.md +++ b/docs/content/append-table/row-tracking.md @@ -48,6 +48,7 @@ WITH ('row-tracking.enabled' = 'true'); ``` Notice that: - Row tracking is only supported for append tables, not for primary key tables. +- Config bucket-append-ordered must be set to false when using bucket tables. - Only spark support update, merge into and delete operations on row-tracking tables, Flink SQL does not support these operations yet. - This function is experimental, this line will be removed after being stable. diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 835811933e20..78270d7523bc 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2132,10 +2132,6 @@ public BucketFunctionType bucketFunctionType() { return options.get(BUCKET_FUNCTION_TYPE); } - public boolean bucketAppendOrdered() { - return options.get(BUCKET_APPEND_ORDERED); - } - public Path path() { return path(options.toMap()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java index 14bc83c547b4..6259fd45e7a1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java @@ -143,98 +143,115 @@ static class CompactPlanner { List compactPlan(List input) { List tasks = new ArrayList<>(); - Map> partitionedFiles = new LinkedHashMap<>(); + Map>> partitionedFiles = + new LinkedHashMap<>(); for (ManifestEntry entry : input) { partitionedFiles - .computeIfAbsent(entry.partition(), k -> new ArrayList<>()) + .computeIfAbsent(entry.partition(), k -> new LinkedHashMap<>()) + .computeIfAbsent(entry.bucket(), k -> new ArrayList<>()) .add(entry.file()); } - for (Map.Entry> partitionFiles : + for (Map.Entry>> partitionFiles : partitionedFiles.entrySet()) { - BinaryRow partition = partitionFiles.getKey(); - List files = partitionFiles.getValue(); - RangeHelper rangeHelper = - new RangeHelper<>( - DataFileMeta::nonNullFirstRowId, - // merge adjacent files - f -> f.nonNullFirstRowId() + f.rowCount()); + for (Map.Entry> bucketFiles : + partitionFiles.getValue().entrySet()) { + BinaryRow partition = partitionFiles.getKey(); + int bucket = bucketFiles.getKey(); + List files = bucketFiles.getValue(); + RangeHelper rangeHelper = + new RangeHelper<>( + DataFileMeta::nonNullFirstRowId, + // merge adjacent files + f -> f.nonNullFirstRowId() + f.rowCount()); - List> ranges = rangeHelper.mergeOverlappingRanges(files); + List> ranges = rangeHelper.mergeOverlappingRanges(files); - for (List group : ranges) { - List dataFiles = new ArrayList<>(); - List blobFiles = new ArrayList<>(); - TreeMap treeMap = new TreeMap<>(); - Map> dataFileToBlobFiles = new HashMap<>(); - for (DataFileMeta f : group) { - if (!isBlobFile(f.fileName())) { - treeMap.put(f.nonNullFirstRowId(), f); - dataFiles.add(f); - } else { - blobFiles.add(f); + for (List group : ranges) { + List dataFiles = new ArrayList<>(); + List blobFiles = new ArrayList<>(); + TreeMap treeMap = new TreeMap<>(); + Map> dataFileToBlobFiles = new HashMap<>(); + for (DataFileMeta f : group) { + if (!isBlobFile(f.fileName())) { + treeMap.put(f.nonNullFirstRowId(), f); + dataFiles.add(f); + } else { + blobFiles.add(f); + } } - } - if (compactBlob) { - // associate blob files to data files - for (DataFileMeta blobFile : blobFiles) { - Long key = treeMap.floorKey(blobFile.nonNullFirstRowId()); - if (key != null) { - DataFileMeta dataFile = treeMap.get(key); - if (blobFile.nonNullFirstRowId() >= dataFile.nonNullFirstRowId() - && blobFile.nonNullFirstRowId() - <= dataFile.nonNullFirstRowId() - + dataFile.rowCount() - - 1) { - dataFileToBlobFiles - .computeIfAbsent(dataFile, k -> new ArrayList<>()) - .add(blobFile); + if (compactBlob) { + // associate blob files to data files + for (DataFileMeta blobFile : blobFiles) { + Long key = treeMap.floorKey(blobFile.nonNullFirstRowId()); + if (key != null) { + DataFileMeta dataFile = treeMap.get(key); + if (blobFile.nonNullFirstRowId() >= dataFile.nonNullFirstRowId() + && blobFile.nonNullFirstRowId() + <= dataFile.nonNullFirstRowId() + + dataFile.rowCount() + - 1) { + dataFileToBlobFiles + .computeIfAbsent(dataFile, k -> new ArrayList<>()) + .add(blobFile); + } } } } - } - RangeHelper rangeHelper2 = - new RangeHelper<>( - DataFileMeta::nonNullFirstRowId, - // files group - f -> f.nonNullFirstRowId() + f.rowCount() - 1); - List> groupedFiles = - rangeHelper2.mergeOverlappingRanges(dataFiles); - List waitCompactFiles = new ArrayList<>(); + RangeHelper rangeHelper2 = + new RangeHelper<>( + DataFileMeta::nonNullFirstRowId, + // files group + f -> f.nonNullFirstRowId() + f.rowCount() - 1); + List> groupedFiles = + rangeHelper2.mergeOverlappingRanges(dataFiles); + List waitCompactFiles = new ArrayList<>(); - long weightSum = 0L; - for (List fileGroup : groupedFiles) { - checkArgument( - rangeHelper.areAllRangesSame(fileGroup), - "Data files %s should be all row id ranges same.", - dataFiles); - long currentGroupWeight = - fileGroup.stream() - .mapToLong(d -> Math.max(d.fileSize(), openFileCost)) - .sum(); - if (currentGroupWeight > targetFileSize) { - // compact current file group to merge field files - tasks.addAll(triggerTask(fileGroup, partition, dataFileToBlobFiles)); - // compact wait compact files - tasks.addAll( - triggerTask(waitCompactFiles, partition, dataFileToBlobFiles)); - waitCompactFiles = new ArrayList<>(); - weightSum = 0; - } else { - weightSum += currentGroupWeight; - waitCompactFiles.addAll(fileGroup); - if (weightSum > targetFileSize) { + long weightSum = 0L; + for (List fileGroup : groupedFiles) { + checkArgument( + rangeHelper.areAllRangesSame(fileGroup), + "Data files %s should be all row id ranges same.", + dataFiles); + long currentGroupWeight = + fileGroup.stream() + .mapToLong(d -> Math.max(d.fileSize(), openFileCost)) + .sum(); + if (currentGroupWeight > targetFileSize) { + // compact current file group to merge field files tasks.addAll( triggerTask( - waitCompactFiles, partition, dataFileToBlobFiles)); + fileGroup, partition, bucket, dataFileToBlobFiles)); + // compact wait compact files + tasks.addAll( + triggerTask( + waitCompactFiles, + partition, + bucket, + dataFileToBlobFiles)); waitCompactFiles = new ArrayList<>(); - weightSum = 0L; + weightSum = 0; + } else { + weightSum += currentGroupWeight; + waitCompactFiles.addAll(fileGroup); + if (weightSum > targetFileSize) { + tasks.addAll( + triggerTask( + waitCompactFiles, + partition, + bucket, + dataFileToBlobFiles)); + waitCompactFiles = new ArrayList<>(); + weightSum = 0L; + } } } + tasks.addAll( + triggerTask( + waitCompactFiles, partition, bucket, dataFileToBlobFiles)); } - tasks.addAll(triggerTask(waitCompactFiles, partition, dataFileToBlobFiles)); } } return tasks; @@ -243,10 +260,11 @@ List compactPlan(List input) { private List triggerTask( List dataFiles, BinaryRow partition, + int bucket, Map> dataFileToBlobFiles) { List tasks = new ArrayList<>(); if (dataFiles.size() >= compactMinFileNum) { - tasks.add(new DataEvolutionCompactTask(partition, dataFiles, false)); + tasks.add(new DataEvolutionCompactTask(partition, bucket, dataFiles, false)); } if (compactBlob) { @@ -256,7 +274,7 @@ private List triggerTask( dataFileToBlobFiles.getOrDefault(dataFile, Collections.emptyList())); } if (blobFiles.size() >= compactMinFileNum) { - tasks.add(new DataEvolutionCompactTask(partition, blobFiles, true)); + tasks.add(new DataEvolutionCompactTask(partition, bucket, blobFiles, true)); } } return tasks; diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java index d4da47bdc574..639a60d826c0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java @@ -52,13 +52,15 @@ public class DataEvolutionCompactTask { Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), "99999 G"); private final BinaryRow partition; + private final int bucket; private final List compactBefore; private final List compactAfter; private final boolean blobTask; public DataEvolutionCompactTask( - BinaryRow partition, List files, boolean blobTask) { + BinaryRow partition, int bucket, List files, boolean blobTask) { this.partition = partition; + this.bucket = bucket; this.compactBefore = new ArrayList<>(files); this.compactAfter = new ArrayList<>(); this.blobTask = blobTask; @@ -68,6 +70,10 @@ public BinaryRow partition() { return partition; } + public int getBucket() { + return bucket; + } + public List compactBefore() { return compactBefore; } @@ -100,7 +106,7 @@ public CommitMessage doCompact(FileStoreTable table, String commitUser) throws E DataSplit dataSplit = DataSplit.builder() .withPartition(partition) - .withBucket(0) + .withBucket(bucket) .withDataFiles(compactBefore) .withBucketPath(pathFactory.bucketPath(partition, 0).toString()) .rawConvertible(false) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTaskSerializer.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTaskSerializer.java index 821f9f15e741..2c59680822b6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTaskSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTaskSerializer.java @@ -68,6 +68,7 @@ public void serializeList(List list, DataOutputView vi private void serialize(DataEvolutionCompactTask task, DataOutputView view) throws IOException { serializeBinaryRow(task.partition(), view); + view.writeInt(task.getBucket()); dataFileSerializer.serializeList(task.compactBefore(), view); view.writeBoolean(task.isBlobTask()); } @@ -105,6 +106,7 @@ private void checkVersion(int version) { private DataEvolutionCompactTask deserialize(DataInputView view) throws IOException { return new DataEvolutionCompactTask( deserializeBinaryRow(view), + view.readInt(), dataFileSerializer.deserializeList(view), view.readBoolean()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java index 719b4aecea96..be3d8843abc1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java @@ -92,7 +92,7 @@ protected CompactManager getCompactManager( List restoredFiles, ExecutorService compactExecutor, @Nullable BucketedDvMaintainer dvMaintainer) { - if (options.writeOnly() || options.bucketAppendOrdered()) { + if (options.writeOnly() || !options.bucketAppendOrdered()) { return new NoopCompactManager(); } else { Function dvFactory = diff --git a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java index de4545c6df8f..1767e187618b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java @@ -247,7 +247,7 @@ public void testSerializerBasic() throws IOException { createDataFileMeta("file2.parquet", 100L, 100L, 0, 1024)); DataEvolutionCompactTask task = - new DataEvolutionCompactTask(BinaryRow.EMPTY_ROW, files, false); + new DataEvolutionCompactTask(BinaryRow.EMPTY_ROW, 0, files, false); byte[] bytes = serializer.serialize(task); DataEvolutionCompactTask deserialized = @@ -266,7 +266,7 @@ public void testSerializerBlobTask() throws IOException { createDataFileMeta("file2.blob", 0L, 100L, 0, 1024)); DataEvolutionCompactTask task = - new DataEvolutionCompactTask(BinaryRow.EMPTY_ROW, files, true); + new DataEvolutionCompactTask(BinaryRow.EMPTY_ROW, 0, files, true); byte[] bytes = serializer.serialize(task); DataEvolutionCompactTask deserialized = @@ -286,7 +286,7 @@ public void testSerializerWithPartition() throws IOException { createDataFileMeta("file2.parquet", 100L, 100L, 0, 1024)); BinaryRow partition = BinaryRow.singleColumn(42); - DataEvolutionCompactTask task = new DataEvolutionCompactTask(partition, files, false); + DataEvolutionCompactTask task = new DataEvolutionCompactTask(partition, 0, files, false); byte[] bytes = serializer.serialize(task); DataEvolutionCompactTask deserialized = diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java index 7785735d04fd..547b61877d4d 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java @@ -259,12 +259,17 @@ private boolean execute( switch (bucketMode) { case HASH_FIXED: case HASH_DYNAMIC: - compactAwareBucketTable( - table, - fullCompact, - partitionPredicate, - partitionIdleTime, - javaSparkContext); + if (table.coreOptions().dataEvolutionEnabled()) { + compactDataEvolutionTable( + table, partitionPredicate, partitionIdleTime, javaSparkContext); + } else { + compactAwareBucketTable( + table, + fullCompact, + partitionPredicate, + partitionIdleTime, + javaSparkContext); + } break; case BUCKET_UNAWARE: if (table.coreOptions().dataEvolutionEnabled()) { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala index 6407faec6b3a..49ff8f9979b1 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala @@ -52,9 +52,7 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable) extends WriteH .forEachRemaining( k => firstRowIdToPartitionMap - .put( - k.file().firstRowId(), - (k.partition(), k.bucket(), k.file().rowCount()))) + .put(k.file().firstRowId(), (k.partition(), k.bucket(), k.file().rowCount()))) firstRowIdToPartitionMap } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala index 913a009a6fad..b204b31bc9ea 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala @@ -39,7 +39,7 @@ import scala.collection.mutable.ListBuffer case class DataEvolutionTableDataWrite( writeBuilder: BatchWriteBuilder, writeType: RowType, - firstRowIdToPartitionMap: mutable.HashMap[Long, (BinaryRow, Long)], + firstRowIdToPartitionMap: mutable.HashMap[Long, (BinaryRow, Int, Long)], blobAsDescriptor: Boolean, catalogContext: CatalogContext) extends InnerTableV1DataWrite { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index dd363696f1d4..b82449ecafc6 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -82,43 +82,43 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { batchInsert(Seq((1, 1), (2, 2), (3, 3)), "t") checkAnswer( sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 1, 3, 3), Row(2, 2, 4, 3), Row(3, 3, 5, 4)) + Seq(Row(1, 1, 0, 1), Row(2, 2, 1, 2), Row(3, 3, 2, 3)) ) } } Seq(false, true).foreach { bucketEnable => - { - test(s"Row Tracking: delete table, bucket: $bucketEnable") { - withTable("t") { - // only enable row tracking - sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - runAndCheckAnswer() - sql("DROP TABLE t") - - // enable row tracking and deletion vectors - sql( - "CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'deletion-vectors.enabled' = 'true')") - runAndCheckAnswer() - - def runAndCheckAnswer(): Unit = { - batchInsert(Seq((1, 1), (2, 2), (3, 3)), "t") + { + test(s"Row Tracking: delete table, bucket: $bucketEnable") { + withTable("t") { + // only enable row tracking + sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") + runAndCheckAnswer() + sql("DROP TABLE t") - sql("DELETE FROM t WHERE id = 2") - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 1, 0, 1), Row(3, 3, 2, 3)) - ) - sql("DELETE FROM t WHERE _ROW_ID = 2") - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 1, 0, 1)) - ) + // enable row tracking and deletion vectors + sql( + "CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'deletion-vectors.enabled' = 'true')") + runAndCheckAnswer() + + def runAndCheckAnswer(): Unit = { + batchInsert(Seq((1, 1), (2, 2), (3, 3)), "t") + + sql("DELETE FROM t WHERE id = 2") + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 1, 0, 1), Row(3, 3, 2, 3)) + ) + sql("DELETE FROM t WHERE _ROW_ID = 2") + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 1, 0, 1)) + ) + } } } } - } } Seq(false, true).foreach { @@ -158,6 +158,7 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { } } } + } Seq(false, true).foreach { bucketEnable => @@ -334,7 +335,7 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { checkAnswer( sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 11, 11, 2, 3), Row(2, 2, 2, 0, 1), Row(4, 4, 4, 1, 2)) + Seq(Row(1, 11, 11, 2, 2), Row(2, 2, 2, 0, 1), Row(4, 4, 4, 1, 1)) ) } } @@ -399,7 +400,7 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { sql( s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true' ${bucketProperties(bucketEnable)})") - sql("INSERT INTO t VALUES (2, 2, 2),(4, 4, 4)") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ * FROM VALUES (2, 2, 2),(4, 4, 4)") sql(""" |MERGE INTO t @@ -453,9 +454,9 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM target ORDER BY id"), Seq( Row(1, 10, "c1", 0, 6), - Row(2, 20, "c2", 1, 6), + Row(2, 20, "c2", 1, 2), Row(3, 300, "c33", 2, 6), - Row(4, 40, "c4", 3, 6), + Row(4, 40, "c4", 3, 4), Row(5, 550, "c5", 4, 6), Row(7, 700, "c77", 5, 6), Row(9, 990, "c99", 6, 6)) @@ -472,11 +473,7 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { sql("INSERT INTO s VALUES (1, 11), (2, 22)") sql( - "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES (" + - "'row-tracking.enabled' = 'true', " + - "'data-evolution.enabled' = 'true', " + - "'bucket'='2', " + - "'bucket-key'='id')") + s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true' ${bucketProperties(true)})") assertThrows[RuntimeException] { sql(""" @@ -543,6 +540,7 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { findSplitsPlan = qe.analyzed } } + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { if (qe.analyzed.collectFirst { case _: Deduplicate => true }.nonEmpty) { latch.countDown() @@ -596,6 +594,7 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { updatePlan = qe.analyzed } } + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { if (qe.analyzed.collectFirst { case _: MergeRows => true }.nonEmpty) { latch.countDown() @@ -638,42 +637,41 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { } } - Seq(false, true).foreach { - bucketEnable => - { - test(s"Data Evolution: update table throws exception, bucket: $bucketEnable") { - withTable("t") { - sql( - s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true' ${bucketProperties(bucketEnable)})") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") - assert( - intercept[RuntimeException] { - sql("UPDATE t SET b = 22") - }.getMessage - .contains("Update operation is not supported when data evolution is enabled yet.")) - } + Seq(false, true).foreach { + bucketEnable => + { + test(s"Data Evolution: update table throws exception, bucket: $bucketEnable") { + withTable("t") { + sql( + s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") + assert( + intercept[RuntimeException] { + sql("UPDATE t SET b = 22") + }.getMessage + .contains("Update operation is not supported when data evolution is enabled yet.")) } } } + } - - Seq(false, true).foreach { - bucketEnable => - { - test(s"Data Evolution: delete table throws exception, bucket: $bucketEnable") { - withTable("t") { - sql( - s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true' ${bucketProperties(bucketEnable)})") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") - assert( - intercept[RuntimeException] { - sql("DELETE FROM t WHERE id = 2") - }.getMessage - .contains("Delete operation is not supported when data evolution is enabled yet.")) - } + Seq(false, true).foreach { + bucketEnable => + { + test(s"Data Evolution: delete table throws exception, bucket: $bucketEnable") { + withTable("t") { + sql( + s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") + assert( + intercept[RuntimeException] { + sql("DELETE FROM t WHERE id = 2") + }.getMessage + .contains("Delete operation is not supported when data evolution is enabled yet.")) } } } + } Seq(false, true).foreach { bucketEnable => @@ -765,7 +763,7 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { def bucketProperties(enableBucket: Boolean): String = { if (enableBucket) { - ",'bucket'='2','bucket-key'='id'" + ",'bucket'='2','bucket-key'='id', 'bucket-append-ordered'='false'" } else { "" } @@ -806,5 +804,4 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { } private def escapeSingleQuotes(s: String): String = s.replace("'", "''") - } From d5be0f293b92ceed368d7aa0da6d8700a606808a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B2=E5=A4=A7=E6=B4=8B?= Date: Fri, 9 Jan 2026 11:35:03 +0800 Subject: [PATCH 05/11] [spark] Support data evolution for bucket table --- .../org/apache/paimon/spark/sql/RowTrackingTestBase.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index b82449ecafc6..b3264fed6618 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -714,8 +714,8 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { Row(2, 200, "c2", 1, 6), Row(3, 300, "c33", 2, 6), Row(5, 550, "c5", 4, 6), - Row(7, 700, "c77", 9, 6), - Row(9, 990, "c99", 10, 6)) + Row(7, 700, "c77", 5, 6), + Row(9, 990, "c99", 6, 6)) ) } } From e339bdced70a4c463b8a87bc3e8837dd93630a3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B2=E5=A4=A7=E6=B4=8B?= Date: Wed, 14 Jan 2026 15:22:01 +0800 Subject: [PATCH 06/11] [spark] Support data evolution for bucket table --- .../operation/BucketedAppendFileStoreWrite.java | 2 +- .../paimon/spark/sql/RowTrackingTestBase.scala | 15 ++++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java index d4dba1b61220..bdc3867e7ec1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java @@ -97,7 +97,7 @@ protected CompactManager getCompactManager( List restoredFiles, ExecutorService compactExecutor, @Nullable BucketedDvMaintainer dvMaintainer) { - if (options.writeOnly() || !options.bucketAppendOrdered()) { + if (options.writeOnly() || !options.dataEvolutionEnabled()) { return new NoopCompactManager(); } else if (options.bucketClusterEnabled()) { return new BucketedAppendClusterManager( diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index 8fecd2079a38..d35c7c57b18e 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -405,13 +405,14 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true','file.format' = '$format' ${bucketProperties(bucketEnable)})") sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ * FROM VALUES (2, 2, 2),(4, 4, 4)") - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN MATCHED THEN UPDATE SET t.b = s.b - |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11) - |""".stripMargin) + sql( + """ + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.b = s.b + |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11) + |""".stripMargin) checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(3))) checkAnswer( sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), From 5a6780885ce74029be06e3bff021fba978a8deba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B2=E5=A4=A7=E6=B4=8B?= Date: Wed, 14 Jan 2026 15:28:13 +0800 Subject: [PATCH 07/11] fix check-style --- .../spark/sql/RowTrackingTestBase.scala | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index d35c7c57b18e..74c0785f1f83 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -395,32 +395,32 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { Seq(false, true).foreach { bucketEnable => Seq("parquet", "avro").foreach { - format => { - test(s"Data Evolution: merge into table with data-evolution, bucket: $bucketEnable") { - withTable("s", "t") { - sql("CREATE TABLE s (id INT, b INT)") - sql("INSERT INTO s VALUES (1, 11), (2, 22)") - - sql( - s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true','file.format' = '$format' ${bucketProperties(bucketEnable)})") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ * FROM VALUES (2, 2, 2),(4, 4, 4)") - - sql( - """ - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN MATCHED THEN UPDATE SET t.b = s.b - |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11) - |""".stripMargin) - checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(3))) - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 11, 11, 2, 2), Row(2, 22, 2, 0, 2), Row(4, 4, 4, 1, 2)) - ) + format => + { + test(s"Data Evolution: merge into table with data-evolution, bucket: $bucketEnable") { + withTable("s", "t") { + sql("CREATE TABLE s (id INT, b INT)") + sql("INSERT INTO s VALUES (1, 11), (2, 22)") + + sql( + s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true','file.format' = '$format' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ * FROM VALUES (2, 2, 2),(4, 4, 4)") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.b = s.b + |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11) + |""".stripMargin) + checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(3))) + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 11, 11, 2, 2), Row(2, 22, 2, 0, 2), Row(4, 4, 4, 1, 2)) + ) + } } } - } } } From 7938bc897d6a127f7dc2ec19b39ec993e7df0beb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B2=E5=A4=A7=E6=B4=8B?= Date: Wed, 14 Jan 2026 15:32:52 +0800 Subject: [PATCH 08/11] fix bucket row-tracking compaction --- .../apache/paimon/operation/BucketedAppendFileStoreWrite.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java index bdc3867e7ec1..c2c937ebf965 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java @@ -97,7 +97,7 @@ protected CompactManager getCompactManager( List restoredFiles, ExecutorService compactExecutor, @Nullable BucketedDvMaintainer dvMaintainer) { - if (options.writeOnly() || !options.dataEvolutionEnabled()) { + if (options.writeOnly() || options.rowTrackingEnabled()) { return new NoopCompactManager(); } else if (options.bucketClusterEnabled()) { return new BucketedAppendClusterManager( From 0ddb37f2a6a9d7e7d9d27e01cf5daffef8fa94f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B2=E5=A4=A7=E6=B4=8B?= Date: Wed, 14 Jan 2026 15:37:31 +0800 Subject: [PATCH 09/11] fix bucket row-tracking compaction --- .../main/java/org/apache/paimon/schema/SchemaValidation.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 06a37e84add2..da26122b8307 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -638,11 +638,6 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options) schema.primaryKeys().isEmpty(), "Cannot define %s for row tracking table.", PRIMARY_KEY.key()); - if (options.bucket() != -1) { - checkArgument( - !options.bucketAppendOrdered(), - "Row tracking config must disabled bucket-append-ordered in bucket table"); - } } if (options.dataEvolutionEnabled()) { From 94a0b2a8e838576faae118b466f2351d3771f08e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B2=E5=A4=A7=E6=B4=8B?= Date: Wed, 14 Jan 2026 16:19:03 +0800 Subject: [PATCH 10/11] fix ut --- .../org/apache/paimon/spark/sql/RowTrackingTestBase.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index 74c0785f1f83..1ea15c4a0def 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -18,9 +18,7 @@ package org.apache.paimon.spark.sql -import org.apache.paimon.CoreOptions import org.apache.paimon.Snapshot.CommitKind -import org.apache.paimon.format.FileFormat import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.spark.sql.Row @@ -397,7 +395,7 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { Seq("parquet", "avro").foreach { format => { - test(s"Data Evolution: merge into table with data-evolution, bucket: $bucketEnable") { + test(s"Data Evolution: merge into table with data-evolution, bucket: $bucketEnable, format: $format") { withTable("s", "t") { sql("CREATE TABLE s (id INT, b INT)") sql("INSERT INTO s VALUES (1, 11), (2, 22)") From 3645f41d1c7e72263831d1d8931da357e9372465 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B2=E5=A4=A7=E6=B4=8B?= Date: Wed, 14 Jan 2026 16:25:27 +0800 Subject: [PATCH 11/11] fix check-style --- .../org/apache/paimon/spark/sql/RowTrackingTestBase.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index 1ea15c4a0def..8f9264529437 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -395,7 +395,8 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { Seq("parquet", "avro").foreach { format => { - test(s"Data Evolution: merge into table with data-evolution, bucket: $bucketEnable, format: $format") { + test( + s"Data Evolution: merge into table with data-evolution, bucket: $bucketEnable, format: $format") { withTable("s", "t") { sql("CREATE TABLE s (id INT, b INT)") sql("INSERT INTO s VALUES (1, 11), (2, 22)")