diff --git a/docs/content/append-table/row-tracking.md b/docs/content/append-table/row-tracking.md
index c1c815c487c7..ed04010c5fcc 100644
--- a/docs/content/append-table/row-tracking.md
+++ b/docs/content/append-table/row-tracking.md
@@ -47,7 +47,8 @@ CREATE TABLE part_t (
 WITH ('row-tracking.enabled' = 'true');
 ```
 Notice that:
-- Row tracking is only supported for unaware append tables, not for primary key tables. Which means you can't define `bucket` and `bucket-key` for the table.
+- Row tracking is only supported for append tables, not for primary key tables.
+- The `bucket-append-ordered` option must be set to `false` when using bucketed tables.
 - Only spark support update, merge into and delete operations on row-tracking tables, Flink SQL does not support these operations yet.
 - This function is experimental, this line will be removed after being stable.
diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
index 8fee740fd1cd..f0432191586c 100644
--- a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -127,6 +127,31 @@ public TableSchema(
         numBucket = CoreOptions.fromMap(options).bucket();
     }
 
+    private TableSchema(
+            int version,
+            long id,
+            List<DataField> fields,
+            int highestFieldId,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            Map<String, String> options,
+            @Nullable String comment,
+            long timeMillis,
+            List<String> bucketKeys,
+            int numBucket) {
+        this.version = version;
+        this.id = id;
+        this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
+        this.highestFieldId = highestFieldId;
+        this.partitionKeys = partitionKeys;
+        this.primaryKeys = primaryKeys;
+        this.options = options;
+        this.comment = comment;
+        this.timeMillis = timeMillis;
+        this.bucketKeys = bucketKeys;
+        this.numBucket = numBucket;
+    }
+
     public int version() {
         return version;
     }
@@ -294,7 +319,9 @@ public TableSchema project(@Nullable List<String> writeCols) {
                 primaryKeys,
                 options,
                 comment,
-                timeMillis);
+                timeMillis,
+                bucketKeys,
+                numBucket);
     }
 
     private List<DataField> projectedDataFields(List<String> projectedFieldNames) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index 14bc83c547b4..6259fd45e7a1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -143,98 +143,115 @@ static class CompactPlanner {
         List<DataEvolutionCompactTask> compactPlan(List<ManifestEntry> input) {
             List<DataEvolutionCompactTask> tasks = new ArrayList<>();
-            Map<BinaryRow, List<DataFileMeta>> partitionedFiles = new LinkedHashMap<>();
+            Map<BinaryRow, Map<Integer, List<DataFileMeta>>> partitionedFiles =
+                    new LinkedHashMap<>();
             for (ManifestEntry entry : input) {
                 partitionedFiles
-                        .computeIfAbsent(entry.partition(), k -> new ArrayList<>())
+                        .computeIfAbsent(entry.partition(), k -> new LinkedHashMap<>())
+                        .computeIfAbsent(entry.bucket(), k -> new ArrayList<>())
                         .add(entry.file());
             }
 
-            for (Map.Entry<BinaryRow, List<DataFileMeta>> partitionFiles :
+            for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> partitionFiles :
                     partitionedFiles.entrySet()) {
-                BinaryRow partition = partitionFiles.getKey();
-                List<DataFileMeta> files = partitionFiles.getValue();
-                RangeHelper<DataFileMeta> rangeHelper =
-                        new RangeHelper<>(
-                                DataFileMeta::nonNullFirstRowId,
-                                // merge adjacent files
-                                f -> f.nonNullFirstRowId() + f.rowCount());
+                for (Map.Entry<Integer, List<DataFileMeta>> bucketFiles :
+                        partitionFiles.getValue().entrySet()) {
+                    BinaryRow 
partition = partitionFiles.getKey(); + int bucket = bucketFiles.getKey(); + List files = bucketFiles.getValue(); + RangeHelper rangeHelper = + new RangeHelper<>( + DataFileMeta::nonNullFirstRowId, + // merge adjacent files + f -> f.nonNullFirstRowId() + f.rowCount()); - List> ranges = rangeHelper.mergeOverlappingRanges(files); + List> ranges = rangeHelper.mergeOverlappingRanges(files); - for (List group : ranges) { - List dataFiles = new ArrayList<>(); - List blobFiles = new ArrayList<>(); - TreeMap treeMap = new TreeMap<>(); - Map> dataFileToBlobFiles = new HashMap<>(); - for (DataFileMeta f : group) { - if (!isBlobFile(f.fileName())) { - treeMap.put(f.nonNullFirstRowId(), f); - dataFiles.add(f); - } else { - blobFiles.add(f); + for (List group : ranges) { + List dataFiles = new ArrayList<>(); + List blobFiles = new ArrayList<>(); + TreeMap treeMap = new TreeMap<>(); + Map> dataFileToBlobFiles = new HashMap<>(); + for (DataFileMeta f : group) { + if (!isBlobFile(f.fileName())) { + treeMap.put(f.nonNullFirstRowId(), f); + dataFiles.add(f); + } else { + blobFiles.add(f); + } } - } - if (compactBlob) { - // associate blob files to data files - for (DataFileMeta blobFile : blobFiles) { - Long key = treeMap.floorKey(blobFile.nonNullFirstRowId()); - if (key != null) { - DataFileMeta dataFile = treeMap.get(key); - if (blobFile.nonNullFirstRowId() >= dataFile.nonNullFirstRowId() - && blobFile.nonNullFirstRowId() - <= dataFile.nonNullFirstRowId() - + dataFile.rowCount() - - 1) { - dataFileToBlobFiles - .computeIfAbsent(dataFile, k -> new ArrayList<>()) - .add(blobFile); + if (compactBlob) { + // associate blob files to data files + for (DataFileMeta blobFile : blobFiles) { + Long key = treeMap.floorKey(blobFile.nonNullFirstRowId()); + if (key != null) { + DataFileMeta dataFile = treeMap.get(key); + if (blobFile.nonNullFirstRowId() >= dataFile.nonNullFirstRowId() + && blobFile.nonNullFirstRowId() + <= dataFile.nonNullFirstRowId() + + dataFile.rowCount() + - 1) { + dataFileToBlobFiles + .computeIfAbsent(dataFile, k -> new ArrayList<>()) + .add(blobFile); + } } } } - } - RangeHelper rangeHelper2 = - new RangeHelper<>( - DataFileMeta::nonNullFirstRowId, - // files group - f -> f.nonNullFirstRowId() + f.rowCount() - 1); - List> groupedFiles = - rangeHelper2.mergeOverlappingRanges(dataFiles); - List waitCompactFiles = new ArrayList<>(); + RangeHelper rangeHelper2 = + new RangeHelper<>( + DataFileMeta::nonNullFirstRowId, + // files group + f -> f.nonNullFirstRowId() + f.rowCount() - 1); + List> groupedFiles = + rangeHelper2.mergeOverlappingRanges(dataFiles); + List waitCompactFiles = new ArrayList<>(); - long weightSum = 0L; - for (List fileGroup : groupedFiles) { - checkArgument( - rangeHelper.areAllRangesSame(fileGroup), - "Data files %s should be all row id ranges same.", - dataFiles); - long currentGroupWeight = - fileGroup.stream() - .mapToLong(d -> Math.max(d.fileSize(), openFileCost)) - .sum(); - if (currentGroupWeight > targetFileSize) { - // compact current file group to merge field files - tasks.addAll(triggerTask(fileGroup, partition, dataFileToBlobFiles)); - // compact wait compact files - tasks.addAll( - triggerTask(waitCompactFiles, partition, dataFileToBlobFiles)); - waitCompactFiles = new ArrayList<>(); - weightSum = 0; - } else { - weightSum += currentGroupWeight; - waitCompactFiles.addAll(fileGroup); - if (weightSum > targetFileSize) { + long weightSum = 0L; + for (List fileGroup : groupedFiles) { + checkArgument( + rangeHelper.areAllRangesSame(fileGroup), + "Data files %s should be all 
row id ranges same.", + dataFiles); + long currentGroupWeight = + fileGroup.stream() + .mapToLong(d -> Math.max(d.fileSize(), openFileCost)) + .sum(); + if (currentGroupWeight > targetFileSize) { + // compact current file group to merge field files tasks.addAll( triggerTask( - waitCompactFiles, partition, dataFileToBlobFiles)); + fileGroup, partition, bucket, dataFileToBlobFiles)); + // compact wait compact files + tasks.addAll( + triggerTask( + waitCompactFiles, + partition, + bucket, + dataFileToBlobFiles)); waitCompactFiles = new ArrayList<>(); - weightSum = 0L; + weightSum = 0; + } else { + weightSum += currentGroupWeight; + waitCompactFiles.addAll(fileGroup); + if (weightSum > targetFileSize) { + tasks.addAll( + triggerTask( + waitCompactFiles, + partition, + bucket, + dataFileToBlobFiles)); + waitCompactFiles = new ArrayList<>(); + weightSum = 0L; + } } } + tasks.addAll( + triggerTask( + waitCompactFiles, partition, bucket, dataFileToBlobFiles)); } - tasks.addAll(triggerTask(waitCompactFiles, partition, dataFileToBlobFiles)); } } return tasks; @@ -243,10 +260,11 @@ List compactPlan(List input) { private List triggerTask( List dataFiles, BinaryRow partition, + int bucket, Map> dataFileToBlobFiles) { List tasks = new ArrayList<>(); if (dataFiles.size() >= compactMinFileNum) { - tasks.add(new DataEvolutionCompactTask(partition, dataFiles, false)); + tasks.add(new DataEvolutionCompactTask(partition, bucket, dataFiles, false)); } if (compactBlob) { @@ -256,7 +274,7 @@ private List triggerTask( dataFileToBlobFiles.getOrDefault(dataFile, Collections.emptyList())); } if (blobFiles.size() >= compactMinFileNum) { - tasks.add(new DataEvolutionCompactTask(partition, blobFiles, true)); + tasks.add(new DataEvolutionCompactTask(partition, bucket, blobFiles, true)); } } return tasks; diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java index d4da47bdc574..639a60d826c0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java @@ -52,13 +52,15 @@ public class DataEvolutionCompactTask { Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), "99999 G"); private final BinaryRow partition; + private final int bucket; private final List compactBefore; private final List compactAfter; private final boolean blobTask; public DataEvolutionCompactTask( - BinaryRow partition, List files, boolean blobTask) { + BinaryRow partition, int bucket, List files, boolean blobTask) { this.partition = partition; + this.bucket = bucket; this.compactBefore = new ArrayList<>(files); this.compactAfter = new ArrayList<>(); this.blobTask = blobTask; @@ -68,6 +70,10 @@ public BinaryRow partition() { return partition; } + public int getBucket() { + return bucket; + } + public List compactBefore() { return compactBefore; } @@ -100,7 +106,7 @@ public CommitMessage doCompact(FileStoreTable table, String commitUser) throws E DataSplit dataSplit = DataSplit.builder() .withPartition(partition) - .withBucket(0) + .withBucket(bucket) .withDataFiles(compactBefore) .withBucketPath(pathFactory.bucketPath(partition, 0).toString()) .rawConvertible(false) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTaskSerializer.java 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTaskSerializer.java index 821f9f15e741..2c59680822b6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTaskSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTaskSerializer.java @@ -68,6 +68,7 @@ public void serializeList(List list, DataOutputView vi private void serialize(DataEvolutionCompactTask task, DataOutputView view) throws IOException { serializeBinaryRow(task.partition(), view); + view.writeInt(task.getBucket()); dataFileSerializer.serializeList(task.compactBefore(), view); view.writeBoolean(task.isBlobTask()); } @@ -105,6 +106,7 @@ private void checkVersion(int version) { private DataEvolutionCompactTask deserialize(DataInputView view) throws IOException { return new DataEvolutionCompactTask( deserializeBinaryRow(view), + view.readInt(), dataFileSerializer.deserializeList(view), view.readBoolean()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java index b93c2b511509..c2c937ebf965 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java @@ -97,7 +97,7 @@ protected CompactManager getCompactManager( List restoredFiles, ExecutorService compactExecutor, @Nullable BucketedDvMaintainer dvMaintainer) { - if (options.writeOnly()) { + if (options.writeOnly() || options.rowTrackingEnabled()) { return new NoopCompactManager(); } else if (options.bucketClusterEnabled()) { return new BucketedAppendClusterManager( diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 1378ecc58651..da26122b8307 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -634,10 +634,6 @@ private static void validateMergeFunctionFactory(TableSchema schema) { private static void validateRowTracking(TableSchema schema, CoreOptions options) { boolean rowTrackingEnabled = options.rowTrackingEnabled(); if (rowTrackingEnabled) { - checkArgument( - options.bucket() == -1, - "Cannot define %s for row tracking table, it only support bucket = -1", - CoreOptions.BUCKET.key()); checkArgument( schema.primaryKeys().isEmpty(), "Cannot define %s for row tracking table.", diff --git a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java index de4545c6df8f..1767e187618b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java @@ -247,7 +247,7 @@ public void testSerializerBasic() throws IOException { createDataFileMeta("file2.parquet", 100L, 100L, 0, 1024)); DataEvolutionCompactTask task = - new DataEvolutionCompactTask(BinaryRow.EMPTY_ROW, files, false); + new DataEvolutionCompactTask(BinaryRow.EMPTY_ROW, 0, files, false); byte[] bytes = serializer.serialize(task); DataEvolutionCompactTask deserialized = @@ -266,7 +266,7 
@@ public void testSerializerBlobTask() throws IOException { createDataFileMeta("file2.blob", 0L, 100L, 0, 1024)); DataEvolutionCompactTask task = - new DataEvolutionCompactTask(BinaryRow.EMPTY_ROW, files, true); + new DataEvolutionCompactTask(BinaryRow.EMPTY_ROW, 0, files, true); byte[] bytes = serializer.serialize(task); DataEvolutionCompactTask deserialized = @@ -286,7 +286,7 @@ public void testSerializerWithPartition() throws IOException { createDataFileMeta("file2.parquet", 100L, 100L, 0, 1024)); BinaryRow partition = BinaryRow.singleColumn(42); - DataEvolutionCompactTask task = new DataEvolutionCompactTask(partition, files, false); + DataEvolutionCompactTask task = new DataEvolutionCompactTask(partition, 0, files, false); byte[] bytes = serializer.serialize(task); DataEvolutionCompactTask deserialized = diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java index 7785735d04fd..547b61877d4d 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java @@ -259,12 +259,17 @@ private boolean execute( switch (bucketMode) { case HASH_FIXED: case HASH_DYNAMIC: - compactAwareBucketTable( - table, - fullCompact, - partitionPredicate, - partitionIdleTime, - javaSparkContext); + if (table.coreOptions().dataEvolutionEnabled()) { + compactDataEvolutionTable( + table, partitionPredicate, partitionIdleTime, javaSparkContext); + } else { + compactAwareBucketTable( + table, + fullCompact, + partitionPredicate, + partitionIdleTime, + javaSparkContext); + } break; case BUCKET_UNAWARE: if (table.coreOptions().dataEvolutionEnabled()) { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala index 8a52273eeab2..04336bbc8098 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala @@ -18,15 +18,19 @@ package org.apache.paimon.spark.catalyst.analysis +import org.apache.paimon.CoreOptions.BUCKET_KEY import org.apache.paimon.spark.SparkTable import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper import org.apache.paimon.spark.commands.{MergeIntoPaimonDataEvolutionTable, MergeIntoPaimonTable} +import org.apache.paimon.utils.StringUtils import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, SubqueryExpression} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule +import java.util + import scala.collection.JavaConverters._ trait PaimonMergeIntoBase @@ -72,6 +76,13 @@ trait PaimonMergeIntoBase resolveNotMatchedBySourceActions(merge, targetOutput, dataEvolutionEnabled) if (dataEvolutionEnabled) { + val bucketKeySt = v2Table.getTable.options().get(BUCKET_KEY.key) + if (!StringUtils.isNullOrWhitespaceOnly(bucketKeySt)) { + checkUpdateActionValidityForBucketKey( + AttributeSet(targetOutput), + updateActions, + bucketKeySt.split(",").toSeq) + } 
MergeIntoPaimonDataEvolutionTable(
          v2Table,
          merge.targetTable,
@@ -142,7 +153,7 @@ trait PaimonMergeIntoBase
     lazy val isMergeConditionValid = {
       val mergeExpressions = splitConjunctivePredicates(mergeCondition)
       primaryKeys.forall {
-        primaryKey => isUpdateExpressionToPrimaryKey(targetOutput, mergeExpressions, primaryKey)
+        primaryKey => isUpdateExpressionForKey(targetOutput, mergeExpressions, primaryKey)
       }
     }
 
@@ -156,4 +167,22 @@ trait PaimonMergeIntoBase
       throw new RuntimeException("Can't update the primary key column in update clause.")
     }
   }
+
+  /** This check prevents updating the bucket key columns. */
+  private def checkUpdateActionValidityForBucketKey(
+      targetOutput: AttributeSet,
+      actions: Seq[UpdateAction],
+      bucketKeys: Seq[String]): Unit = {
+
+    // Check whether any update expression modifies a bucket key column.
+    def isUpdateActionValid(action: UpdateAction): Boolean = {
+      validUpdateAssignment(targetOutput, bucketKeys, action.assignments)
+    }
+
+    val valid = actions.forall(isUpdateActionValid)
+    if (!valid) {
+      throw new RuntimeException(
+        "Can't update the bucket key column in a data-evolution update clause.")
+    }
+  }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
index eecf0542e1d1..534ba8c60b03 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
@@ -41,23 +41,21 @@ trait RowLevelHelper extends SQLConfHelper {
 
   protected def validUpdateAssignment(
       output: AttributeSet,
-      primaryKeys: Seq[String],
+      keys: Seq[String],
       assignments: Seq[Assignment]): Boolean = {
-    !primaryKeys.exists {
-      primaryKey => isUpdateExpressionToPrimaryKey(output, assignments, primaryKey)
-    }
+    !keys.exists(key => isUpdateExpressionForKey(output, assignments, key))
   }
 
   // Check whether there is an update expression related to primary key.
-  protected def isUpdateExpressionToPrimaryKey(
+  protected def isUpdateExpressionForKey(
       output: AttributeSet,
       expressions: Seq[Expression],
-      primaryKey: String): Boolean = {
+      key: String): Boolean = {
     val resolver = conf.resolver
 
     // Check whether this attribute is same to primary key and is from target table. 
- def isTargetPrimaryKey(attr: AttributeReference): Boolean = { - resolver(primaryKey, attr.name) && output.contains(attr) + def isTargetKey(attr: AttributeReference): Boolean = { + resolver(key, attr.name) && output.contains(attr) } expressions @@ -67,9 +65,9 @@ trait RowLevelHelper extends SQLConfHelper { } .exists { case EqualTo(left: AttributeReference, right: AttributeReference) => - isTargetPrimaryKey(left) || isTargetPrimaryKey(right) + isTargetKey(left) || isTargetKey(right) case Assignment(key: AttributeReference, _) => - isTargetPrimaryKey(key) + isTargetKey(key) case _ => false } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala index c3e67895aba9..49ff8f9979b1 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala @@ -37,12 +37,14 @@ import scala.collection.mutable case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable) extends WriteHelper { - private lazy val firstRowIdToPartitionMap: mutable.HashMap[Long, (BinaryRow, Long)] = + private lazy val firstRowIdToPartitionMap: mutable.HashMap[Long, (BinaryRow, Int, Long)] = initPartitionMap() override val table: FileStoreTable = paimonTable.copy(dynamicOp) - private def initPartitionMap(): mutable.HashMap[Long, (BinaryRow, Long)] = { - val firstRowIdToPartitionMap = new mutable.HashMap[Long, (BinaryRow, Long)] + @transient private lazy val serializer = new CommitMessageSerializer + + private def initPartitionMap(): mutable.HashMap[Long, (BinaryRow, Int, Long)] = { + val firstRowIdToPartitionMap = new mutable.HashMap[Long, (BinaryRow, Int, Long)] table .store() .newScan() @@ -50,7 +52,7 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable) extends WriteH .forEachRemaining( k => firstRowIdToPartitionMap - .put(k.file().firstRowId(), (k.partition(), k.file().rowCount()))) + .put(k.file().firstRowId(), (k.partition(), k.bucket(), k.file().rowCount()))) firstRowIdToPartitionMap } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala index f1b2c55b4cd8..b204b31bc9ea 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala @@ -39,7 +39,7 @@ import scala.collection.mutable.ListBuffer case class DataEvolutionTableDataWrite( writeBuilder: BatchWriteBuilder, writeType: RowType, - firstRowIdToPartitionMap: mutable.HashMap[Long, (BinaryRow, Long)], + firstRowIdToPartitionMap: mutable.HashMap[Long, (BinaryRow, Int, Long)], blobAsDescriptor: Boolean, catalogContext: CatalogContext) extends InnerTableV1DataWrite { @@ -67,7 +67,7 @@ case class DataEvolutionTableDataWrite( def newCurrentWriter(firstRowId: Long): Unit = { finishCurrentWriter() - val (partition, numRecords) = firstRowIdToPartitionMap.getOrElse(firstRowId, null) + val (partition, bucket, numRecords) = firstRowIdToPartitionMap.getOrElse(firstRowId, null) if (partition == null) { throw new IllegalArgumentException( s"First 
row ID $firstRowId not found in partition map. " + @@ -80,8 +80,8 @@ case class DataEvolutionTableDataWrite( .asInstanceOf[TableWriteImpl[InternalRow]] .getWrite .asInstanceOf[AbstractFileStoreWrite[InternalRow]] - .createWriter(partition, 0) - currentWriter = PerFileWriter(partition, firstRowId, writer, numRecords) + .createWriter(partition, bucket) + currentWriter = PerFileWriter(partition, bucket, firstRowId, writer, numRecords) } def finishCurrentWriter(): Unit = { @@ -107,6 +107,7 @@ case class DataEvolutionTableDataWrite( private case class PerFileWriter( partition: BinaryRow, + bucket: Int, firstRowId: Long, recordWriter: RecordWriter[InternalRow], numRecords: Long) { @@ -134,7 +135,7 @@ case class DataEvolutionTableDataWrite( val dataFileMeta = dataFiles.get(0).assignFirstRowId(firstRowId) new CommitMessageImpl( partition, - 0, + bucket, null, new DataIncrement( java.util.Arrays.asList(dataFileMeta), diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index 12639e6ef3bd..8f9264529437 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -18,9 +18,7 @@ package org.apache.paimon.spark.sql -import org.apache.paimon.CoreOptions import org.apache.paimon.Snapshot.CommitKind -import org.apache.paimon.format.FileFormat import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.spark.sql.Row @@ -32,30 +30,35 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} abstract class RowTrackingTestBase extends PaimonSparkTestBase { - test("Row Tracking: read row Tracking") { - withTable("t") { - sql("CREATE TABLE t (id INT, data STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql("INSERT INTO t VALUES (11, 'a'), (22, 'b')") - - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t"), - Seq(Row(11, "a", 0, 1), Row(22, "b", 1, 1)) - ) - checkAnswer( - sql("SELECT _ROW_ID, data, _SEQUENCE_NUMBER, id FROM t"), - Seq(Row(0, "a", 1, 11), Row(1, "b", 1, 22)) - ) - } + Seq(false, true).foreach { + bucketEnable => + { + test(s"Row Tracking: read row Tracking, bucket: $bucketEnable") { + withTable("t") { + sql( + s"CREATE TABLE t (id INT, data STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t VALUES (11, 'a'), (11, 'b')") + sql("INSERT INTO t VALUES (22, 'a'), (22, 'b')") + + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t"), + Seq(Row(11, "a", 0, 1), Row(11, "b", 1, 1), Row(22, "a", 2, 2), Row(22, "b", 3, 2)) + ) + checkAnswer( + sql("SELECT _ROW_ID, data, _SEQUENCE_NUMBER, id FROM t"), + Seq(Row(0, "a", 1, 11), Row(1, "b", 1, 11), Row(2, "a", 2, 22), Row(3, "b", 2, 22)) + ) + } + } + } } test("Row Tracking: compact table") { withTable("t") { sql( - "CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'compaction.min.file-num'='2')") + s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'compaction.min.file-num'='2')") - sql("INSERT INTO t VALUES (1, 1)") - sql("INSERT INTO t VALUES (2, 2)") - sql("INSERT INTO t VALUES (3, 3)") + batchInsert(Seq((1, 1), (2, 2), (3, 3)), "t") checkAnswer( sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), Seq(Row(1, 1, 0, 1), Row(2, 2, 1, 2), Row(3, 3, 2, 3)) @@ -71,80 
+74,110 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { } } - test("Row Tracking: delete table") { + test("Row Tracking: compact bucket table") { withTable("t") { - // only enable row tracking - sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - runAndCheckAnswer() - sql("DROP TABLE t") - - // enable row tracking and deletion vectors sql( - "CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'deletion-vectors.enabled' = 'true')") - runAndCheckAnswer() - - def runAndCheckAnswer(): Unit = { - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(1, 4)") - sql("DELETE FROM t WHERE id = 2") - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 1, 0, 1), Row(3, 3, 2, 1)) - ) - sql("DELETE FROM t WHERE _ROW_ID = 2") - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 1, 0, 1)) - ) - } + s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'compaction.min.file-num'='2' ${bucketProperties(true)})") + + batchInsert(Seq((1, 1), (2, 2), (3, 3)), "t") + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 1, 0, 1), Row(2, 2, 1, 2), Row(3, 3, 2, 3)) + ) } } - test("Row Tracking: update table") { - withTable("t") { - // only enable row tracking - sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - runAndCheckAnswer() - sql("DROP TABLE t") - - // enable row tracking and deletion vectors - sql( - "CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'deletion-vectors.enabled' = 'true')") - runAndCheckAnswer() - - def runAndCheckAnswer(): Unit = { - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(1, 4)") - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 1, 0, 1), Row(2, 2, 1, 1), Row(3, 3, 2, 1)) - ) - - sql("UPDATE t SET data = 22 WHERE id = 2") - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 1, 0, 1), Row(2, 22, 1, 2), Row(3, 3, 2, 1)) - ) - - sql("UPDATE t SET data = 222 WHERE _ROW_ID = 1") - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 1, 0, 1), Row(2, 222, 1, 3), Row(3, 3, 2, 1)) - ) + Seq(false, true).foreach { + bucketEnable => + { + test(s"Row Tracking: delete table, bucket: $bucketEnable") { + withTable("t") { + // only enable row tracking + sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") + runAndCheckAnswer() + sql("DROP TABLE t") + + // enable row tracking and deletion vectors + sql( + "CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'deletion-vectors.enabled' = 'true')") + runAndCheckAnswer() + + def runAndCheckAnswer(): Unit = { + batchInsert(Seq((1, 1), (2, 2), (3, 3)), "t") + + sql("DELETE FROM t WHERE id = 2") + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 1, 0, 1), Row(3, 3, 2, 3)) + ) + sql("DELETE FROM t WHERE _ROW_ID = 2") + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 1, 0, 1)) + ) + } + } + } } - } } - test("Row Tracking: update table without condition") { - withTable("t") { - sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM 
range(1, 4)") + Seq(false, true).foreach { + bucketEnable => + { + test(s"Row Tracking: update table, bucket: $bucketEnable") { + withTable("t") { + // only enable row tracking + sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") + runAndCheckAnswer() + sql("DROP TABLE t") + + // enable row tracking and deletion vectors + sql( + "CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'deletion-vectors.enabled' = 'true')") + runAndCheckAnswer() + + def runAndCheckAnswer(): Unit = { + batchInsert(Seq((1, 1), (2, 2), (3, 3)), "t") + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 1, 0, 1), Row(2, 2, 1, 2), Row(3, 3, 2, 3)) + ) + + sql("UPDATE t SET data = 22 WHERE id = 2") + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 1, 0, 1), Row(2, 22, 1, 4), Row(3, 3, 2, 3)) + ) + + sql("UPDATE t SET data = 222 WHERE _ROW_ID = 1") + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 1, 0, 1), Row(2, 222, 1, 5), Row(3, 3, 2, 3)) + ) + } + } + } + } + } - sql("UPDATE t SET data = 22") - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 22, 0, 2), Row(2, 22, 1, 2), Row(3, 22, 2, 2)) - ) - } + Seq(false, true).foreach { + bucketEnable => + { + test(s"Row Tracking: update table without condition, bucket: $bucketEnable") { + withTable("t") { + sql( + s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + + batchInsert(Seq((1, 1), (2, 2), (3, 3)), "t") + + sql("UPDATE t SET data = 22") + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 22, 0, 4), Row(2, 22, 1, 4), Row(3, 22, 2, 4)) + ) + } + } + } } test("Row Tracking: update") { @@ -163,216 +196,298 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { } } - test("Row Tracking: merge into table") { - withTable("s", "t") { - sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql("INSERT INTO s VALUES (1, 11), (2, 22)") - - sql("CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b FROM range(2, 4)") - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(2, 2, 0, 1), Row(3, 3, 1, 1)) - ) - - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN MATCHED THEN UPDATE SET t.b = s.b - |WHEN NOT MATCHED THEN INSERT * - |""".stripMargin) - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 11, 2, 2), Row(2, 22, 0, 2), Row(3, 3, 1, 1)) - ) - } + Seq(false, true).foreach { + bucketEnable => + { + test(s"Row Tracking: merge into table, bucket: $bucketEnable") { + withTable("s", "t") { + sql( + s"CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO s VALUES (1, 11), (2, 22)") + + sql( + s"CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t VALUES(2, 2),(4, 4)") + + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(2, 2, 0, 1), Row(4, 4, 1, 1)) + ) + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.b = s.b + |WHEN NOT MATCHED THEN INSERT * + |""".stripMargin) + checkAnswer( + 
sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 11, 2, 2), Row(2, 22, 0, 2), Row(4, 4, 1, 1)) + ) + } + } + } } - test("Row Tracking: merge into table with only insert") { - withTable("s", "t") { - sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql("INSERT INTO s VALUES (1, 11), (2, 22)") - - sql("CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b FROM range(2, 4)") - - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN NOT MATCHED THEN INSERT * - |""".stripMargin) - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 11, 2, 2), Row(2, 2, 0, 1), Row(3, 3, 1, 1)) - ) - } + Seq(false, true).foreach { + bucketEnable => + { + test(s"Row Tracking: merge into table with only insert, bucket:$bucketEnable") { + withTable("s", "t") { + sql( + s"CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO s VALUES (1, 11), (2, 22)") + + sql( + s"CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t VALUES(2, 2),(4, 4)") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN NOT MATCHED THEN INSERT * + |""".stripMargin) + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 11, 2, 2), Row(2, 2, 0, 1), Row(4, 4, 1, 1)) + ) + } + } + } } - test("Row Tracking: merge into table with only delete") { - withTable("s", "t") { - sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql("INSERT INTO s VALUES (1, 11), (2, 22)") - - sql("CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b FROM range(2, 4)") - - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN MATCHED THEN DELETE - |""".stripMargin) - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(3, 3, 1, 1)) - ) - } + Seq(false, true).foreach { + bucketEnable => + { + test(s"Row Tracking: merge into table with only delete, bucket: $bucketEnable") { + withTable("s", "t") { + sql( + s"CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO s VALUES (1, 11), (2, 22)") + + sql( + s"CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t VALUES(2, 2),(4, 4)") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN DELETE + |""".stripMargin) + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(4, 4, 1, 1)) + ) + } + } + } } - test("Row Tracking: merge into table with only update") { - withTable("s", "t") { - sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql("INSERT INTO s VALUES (1, 11), (2, 22)") - - sql("CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b FROM range(2, 4)") - - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN MATCHED THEN UPDATE SET * - |""".stripMargin) - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(2, 22, 0, 2), Row(3, 3, 1, 1)) - ) - } + Seq(false, true).foreach { + 
bucketEnable => + { + test(s"Row Tracking: merge into table with only update, bucket: $bucketEnable") { + withTable("s", "t") { + sql( + s"CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO s VALUES (1, 11), (2, 22)") + + sql( + s"CREATE TABLE t (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t VALUES(2, 2),(4, 4)") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET * + |""".stripMargin) + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(2, 22, 0, 2), Row(4, 4, 1, 1)) + ) + } + } + } } - test("Data Evolution: insert into table with data-evolution") { - withTable("s", "t") { - sql("CREATE TABLE s (id INT, b INT)") - sql("INSERT INTO s VALUES (1, 11), (2, 22)") - - sql( - "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") - - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11) - |""".stripMargin) - - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 11, 11, 2, 2), Row(2, 2, 2, 0, 1), Row(3, 3, 3, 1, 1)) - ) - } + Seq(false, true).foreach { + bucketEnable => + { + test(s"Data Evolution: insert into table with data-evolution, bucket: $bucketEnable") { + withTable("s", "t") { + sql("CREATE TABLE s (id INT, b INT)") + sql("INSERT INTO s VALUES (1, 11), (2, 22)") + + sql( + s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t VALUES(2, 2, 2),(4, 4, 4)") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11) + |""".stripMargin) + + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 11, 11, 2, 2), Row(2, 2, 2, 0, 1), Row(4, 4, 4, 1, 1)) + ) + } + } + } } - test("Data Evolution: insert into table with data-evolution partial insert") { - withTable("s", "t") { - sql("CREATE TABLE s (id INT, b INT)") - sql("INSERT INTO s VALUES (1, 11), (2, 22)") - - sql( - "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") - - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN NOT MATCHED THEN INSERT (id, b) VALUES (-1, b) - |""".stripMargin) - - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN NOT MATCHED THEN INSERT (b) VALUES (b) - |""".stripMargin) - - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN NOT MATCHED THEN INSERT (id, c) VALUES (3, 4) - |""".stripMargin) + Seq(false, true).foreach { + bucketEnable => + { + test( + s"Data Evolution: insert into table with data-evolution partial insert, bucket: $bucketEnable") { + withTable("s", "t") { + sql("CREATE TABLE s (id INT, b INT)") + sql("INSERT INTO s VALUES (1, 11), (2, 22)") + + sql( + s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") + + sql(""" + |MERGE 
INTO t + |USING s + |ON t.id = s.id + |WHEN NOT MATCHED THEN INSERT (id, b) VALUES (-1, b) + |""".stripMargin) + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN NOT MATCHED THEN INSERT (b) VALUES (b) + |""".stripMargin) + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN NOT MATCHED THEN INSERT (id, c) VALUES (3, 4) + |""".stripMargin) + + checkAnswer( + sql("SELECT * FROM t ORDER BY id"), + Seq( + Row(null, 11, null), + Row(-1, 11, null), + Row(2, 2, 2), + Row(3, 3, 3), + Row(3, null, 4)) + ) + } + } + } + } - checkAnswer( - sql("SELECT * FROM t ORDER BY id"), - Seq(Row(null, 11, null), Row(-1, 11, null), Row(2, 2, 2), Row(3, 3, 3), Row(3, null, 4)) - ) - } + Seq(false, true).foreach { + bucketEnable => + Seq("parquet", "avro").foreach { + format => + { + test( + s"Data Evolution: merge into table with data-evolution, bucket: $bucketEnable, format: $format") { + withTable("s", "t") { + sql("CREATE TABLE s (id INT, b INT)") + sql("INSERT INTO s VALUES (1, 11), (2, 22)") + + sql( + s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true','file.format' = '$format' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ * FROM VALUES (2, 2, 2),(4, 4, 4)") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.b = s.b + |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11) + |""".stripMargin) + checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(3))) + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 11, 11, 2, 2), Row(2, 22, 2, 0, 2), Row(4, 4, 4, 1, 2)) + ) + } + } + } + } } - test("Data Evolution: merge into table with data-evolution") { - Seq("parquet", "avro").foreach { - format => - withTable("s", "t") { - sql("CREATE TABLE s (id INT, b INT)") - sql("INSERT INTO s VALUES (1, 11), (2, 22)") - - sql(s"""CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES - |('row-tracking.enabled' = 'true', - |'data-evolution.enabled' = 'true', - |'file.format' = '$format' - |)""".stripMargin) - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") - - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN MATCHED THEN UPDATE SET t.b = s.b - |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11) - |""".stripMargin) - checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(3))) - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), - Seq(Row(1, 11, 11, 2, 2), Row(2, 22, 2, 0, 2), Row(3, 3, 3, 1, 2)) - ) + Seq(false, true).foreach { + bucketEnable => + { + test( + s"Data Evolution: merge into table with data-evolution complex, bucket: $bucketEnable") { + withTable("source", "target") { + sql("CREATE TABLE source (id INT, b INT, c STRING)") + batchInsert( + Seq( + (1, 100, "c11"), + (3, 300, "c33"), + (5, 500, "c55"), + (7, 700, "c77"), + (9, 900, "c99")), + "source") + + sql( + s"CREATE TABLE target (id INT, b INT, c STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true' ${bucketProperties(bucketEnable)})") + batchInsert( + Seq((1, 10, "c1"), (2, 20, "c2"), (3, 30, "c3"), (4, 40, "c4"), (5, 50, "c5")), + "target") + + sql(s""" + |MERGE INTO target + |USING source + |ON target.id = source.id + |WHEN MATCHED AND target.id = 5 THEN UPDATE SET b = source.b + target.b + |WHEN MATCHED AND source.c > 'c2' THEN UPDATE SET b = source.b, c = source.c + |WHEN NOT MATCHED AND c > 'c9' THEN INSERT (id, b, c) VALUES (id, 
b * 1.1, c) + |WHEN NOT MATCHED THEN INSERT * + |""".stripMargin) + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM target ORDER BY id"), + Seq( + Row(1, 10, "c1", 0, 6), + Row(2, 20, "c2", 1, 2), + Row(3, 300, "c33", 2, 6), + Row(4, 40, "c4", 3, 4), + Row(5, 550, "c5", 4, 6), + Row(7, 700, "c77", 5, 6), + Row(9, 990, "c99", 6, 6)) + ) + } } - } + } } - test("Data Evolution: merge into table with data-evolution complex") { - withTable("source", "target") { - sql("CREATE TABLE source (a INT, b INT, c STRING)") - sql( - "INSERT INTO source VALUES (1, 100, 'c11'), (3, 300, 'c33'), (5, 500, 'c55'), (7, 700, 'c77'), (9, 900, 'c99')") + test( + "Data Evolution: merge into bucket table with data-evolution update bucket key throw exception") { + withTable("s", "t") { + sql("CREATE TABLE s (id INT, b INT)") + sql("INSERT INTO s VALUES (1, 11), (2, 22)") sql( - "CREATE TABLE target (a INT, b INT, c STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") - sql( - "INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30, 'c3'), (4, 40, 'c4'), (5, 50, 'c5')") - - sql(s""" - |MERGE INTO target - |USING source - |ON target.a = source.a - |WHEN MATCHED AND target.a = 5 THEN UPDATE SET b = source.b + target.b - |WHEN MATCHED AND source.c > 'c2' THEN UPDATE SET b = source.b, c = source.c - |WHEN NOT MATCHED AND c > 'c9' THEN INSERT (a, b, c) VALUES (a, b * 1.1, c) - |WHEN NOT MATCHED THEN INSERT * - |""".stripMargin) - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM target ORDER BY a"), - Seq( - Row(1, 10, "c1", 0, 2), - Row(2, 20, "c2", 1, 2), - Row(3, 300, "c33", 2, 2), - Row(4, 40, "c4", 3, 2), - Row(5, 550, "c5", 4, 2), - Row(7, 700, "c77", 5, 2), - Row(9, 990, "c99", 6, 2)) - ) + s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true' ${bucketProperties(true)})") + + assertThrows[RuntimeException] { + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.id = s.id + |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11) + |""".stripMargin) + } } } @@ -429,6 +544,7 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { findSplitsPlan = qe.analyzed } } + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { if (qe.analyzed.collectFirst { case _: Deduplicate => true }.nonEmpty) { latch.countDown() @@ -482,6 +598,7 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { updatePlan = qe.analyzed } } + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { if (qe.analyzed.collectFirst { case _: MergeRows => true }.nonEmpty) { latch.countDown() @@ -524,68 +641,90 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { } } - test("Data Evolution: update table throws exception") { - withTable("t") { - sql( - "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") - assert( - intercept[RuntimeException] { - sql("UPDATE t SET b = 22") - }.getMessage - .contains("Update operation is not supported when data evolution is enabled yet.")) - } + Seq(false, true).foreach { + bucketEnable => + { + test(s"Data Evolution: update table throws exception, bucket: $bucketEnable") { + withTable("t") { + sql( + s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 
'true', 'data-evolution.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") + assert( + intercept[RuntimeException] { + sql("UPDATE t SET b = 22") + }.getMessage + .contains("Update operation is not supported when data evolution is enabled yet.")) + } + } + } } - test("Data Evolution: delete table throws exception") { - withTable("t") { - sql( - "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") - sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") - assert( - intercept[RuntimeException] { - sql("DELETE FROM t WHERE id = 2") - }.getMessage - .contains("Delete operation is not supported when data evolution is enabled yet.")) - } + Seq(false, true).foreach { + bucketEnable => + { + test(s"Data Evolution: delete table throws exception, bucket: $bucketEnable") { + withTable("t") { + sql( + s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true' ${bucketProperties(bucketEnable)})") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") + assert( + intercept[RuntimeException] { + sql("DELETE FROM t WHERE id = 2") + }.getMessage + .contains("Delete operation is not supported when data evolution is enabled yet.")) + } + } + } } - test("Row Tracking: merge into table not matched by source") { - if (gteqSpark3_4) { - withTable("source", "target") { - sql( - "CREATE TABLE source (a INT, b INT, c STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql( - "INSERT INTO source VALUES (1, 100, 'c11'), (3, 300, 'c33'), (5, 500, 'c55'), (7, 700, 'c77'), (9, 900, 'c99')") - - sql( - "CREATE TABLE target (a INT, b INT, c STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true')") - sql( - "INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30, 'c3'), (4, 40, 'c4'), (5, 50, 'c5')") - - sql(s""" - |MERGE INTO target - |USING source - |ON target.a = source.a - |WHEN MATCHED AND target.a = 5 THEN UPDATE SET b = source.b + target.b - |WHEN MATCHED AND source.c > 'c2' THEN UPDATE SET * - |WHEN NOT MATCHED AND c > 'c9' THEN INSERT (a, b, c) VALUES (a, b * 1.1, c) - |WHEN NOT MATCHED THEN INSERT * - |WHEN NOT MATCHED BY SOURCE AND a = 2 THEN UPDATE SET b = b * 10 - |WHEN NOT MATCHED BY SOURCE THEN DELETE - |""".stripMargin) - checkAnswer( - sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM target ORDER BY a"), - Seq( - Row(1, 10, "c1", 0, 1), - Row(2, 200, "c2", 1, 2), - Row(3, 300, "c33", 2, 2), - Row(5, 550, "c5", 4, 2), - Row(7, 700, "c77", 5, 2), - Row(9, 990, "c99", 6, 2)) - ) + Seq(false, true).foreach { + bucketEnable => + { + test(s"Row Tracking: merge into table not matched by source, bucket: $bucketEnable") { + if (gteqSpark3_4) { + withTable("source", "target") { + sql( + s"CREATE TABLE source (id INT, b INT, c STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + batchInsert( + Seq( + (1, 100, "c11"), + (3, 300, "c33"), + (5, 500, "c55"), + (7, 700, "c77"), + (9, 900, "c99")), + "source") + + sql( + s"CREATE TABLE target (id INT, b INT, c STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true' ${bucketProperties(bucketEnable)})") + batchInsert( + Seq((1, 10, "c1"), (2, 20, "c2"), (3, 30, "c3"), (4, 40, "c4"), (5, 50, "c5")), + "target") + + sql(s""" + |MERGE INTO target + |USING source + |ON target.id = source.id + |WHEN MATCHED AND target.id = 5 THEN UPDATE 
SET b = source.b + target.b + |WHEN MATCHED AND source.c > 'c2' THEN UPDATE SET * + |WHEN NOT MATCHED AND c > 'c9' THEN INSERT (id, b, c) VALUES (id, b * 1.1, c) + |WHEN NOT MATCHED THEN INSERT * + |WHEN NOT MATCHED BY SOURCE AND id = 2 THEN UPDATE SET b = b * 10 + |WHEN NOT MATCHED BY SOURCE THEN DELETE + |""".stripMargin) + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM target ORDER BY id"), + Seq( + Row(1, 10, "c1", 0, 1), + Row(2, 200, "c2", 1, 6), + Row(3, 300, "c33", 2, 6), + Row(5, 550, "c5", 4, 6), + Row(7, 700, "c77", 5, 6), + Row(9, 990, "c99", 6, 6)) + ) + } + } + } } - } } test("Data Evolution: compact fields action") { @@ -625,4 +764,48 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { ) } } + + def bucketProperties(enableBucket: Boolean): String = { + if (enableBucket) { + ",'bucket'='2','bucket-key'='id', 'bucket-append-ordered'='false'" + } else { + "" + } + } + + def formatValue(value: Any): String = { + if (value == null) return "NULL" + + value match { + case b: java.lang.Boolean => if (b.booleanValue()) "TRUE" else "FALSE" + case b: Boolean => if (b) "TRUE" else "FALSE" + + case n: Byte => n.toString + case n: Short => n.toString + case n: Int => n.toString + case n: Long => n.toString + case n: Float => n.toString + case n: Double => n.toString + case n: BigInt => n.toString + case n: BigDecimal => n.bigDecimal.toPlainString + + case s: String => s"'${escapeSingleQuotes(s)}'" + case c: Char => s"'${escapeSingleQuotes(c.toString)}'" + + case other => s"'${escapeSingleQuotes(String.valueOf(other))}'" + } + } + + def batchInsert(rows: Seq[Product], table: String): String = { + rows + .map { + p => + val values = p.productIterator.toList + val args = values.map(formatValue).mkString(", ") + sql(s"INSERT INTO $table VALUES ($args)") + } + .mkString("") + } + + private def escapeSingleQuotes(s: String): String = s.replace("'", "''") }
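For reference, a minimal Spark SQL sketch of the configuration this patch enables and that the `bucketProperties(true)` test helper above exercises: a bucketed append table with row tracking turned on and `bucket-append-ordered` disabled. The table and column names here are placeholders, not part of the patch.

```sql
-- Illustrative only: a bucketed append table with row tracking enabled,
-- mirroring the options built by bucketProperties(true) in the tests above.
CREATE TABLE bucketed_rt (id INT, data STRING) TBLPROPERTIES (
    'row-tracking.enabled' = 'true',
    'bucket' = '2',
    'bucket-key' = 'id',
    'bucket-append-ordered' = 'false'
);
```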