diff --git a/docs/static/img/chain-table.png b/docs/static/img/chain-table.png
index 19fbc0668519..c55e9e5e348a 100644
Binary files a/docs/static/img/chain-table.png and b/docs/static/img/chain-table.png differ
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
index 9148f26ec6a3..c508a38e24a3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
@@ -25,6 +25,11 @@ import org.apache.paimon.partition.PartitionTimeExtractor;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.ChainGroupReadTable;
+import org.apache.paimon.table.FallbackReadFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.types.RowType;
 
 import java.time.LocalDateTime;
@@ -204,4 +209,49 @@ public static LinkedHashMap<String, String> calPartValues(
         }
         return res;
     }
+
+    public static void postCommit(
+            FileStoreTable table, boolean overwrite, List<CommitMessage> commitMessages) {
+        if (overwrite) {
+            FileStoreTable candidateTbl = table;
+            if (table instanceof FallbackReadFileStoreTable) {
+                candidateTbl =
+                        ((ChainGroupReadTable) ((FallbackReadFileStoreTable) table).fallback())
+                                .wrapped();
+            }
+            FileStoreTable snapshotTbl =
+                    candidateTbl.switchToBranch(table.coreOptions().scanFallbackSnapshotBranch());
+            InternalRowPartitionComputer partitionComputer =
+                    new InternalRowPartitionComputer(
+                            table.coreOptions().partitionDefaultName(),
+                            table.schema().logicalPartitionType(),
+                            table.schema().partitionKeys().toArray(new String[0]),
+                            table.coreOptions().legacyPartitionName());
+            List<BinaryRow> overwritePartitions =
+                    commitMessages.stream()
+                            .map(CommitMessage::partition)
+                            .distinct()
+                            .collect(Collectors.toList());
+            if (!overwritePartitions.isEmpty()) {
+                List<Map<String, String>> candidatePartitions =
+                        overwritePartitions.stream()
+                                .map(partition -> partitionComputer.generatePartValues(partition))
+                                .collect(Collectors.toList());
+                try (BatchTableCommit commit = snapshotTbl.newBatchWriteBuilder().newCommit()) {
+                    commit.truncatePartitions(candidatePartitions);
+                } catch (Exception e) {
+                    throw new RuntimeException(
+                            String.format(
+                                    "Failed to truncate partitions %s in snapshot table.",
+                                    candidatePartitions),
+                            e);
+                }
+            }
+        }
+    }
+
+    public static boolean chainScanFallbackDeltaBranch(CoreOptions options) {
+        return options.isChainTable()
+                && options.scanFallbackDeltaBranch().equalsIgnoreCase(options.branch());
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 4025725f0204..94f90db13da5 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -23,6 +23,8 @@ import org.apache.paimon.options.Options
 import org.apache.paimon.spark._
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.sink.CommitMessage
+import org.apache.paimon.utils.ChainTableUtils
+import 
org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Row, SparkSession} @@ -57,7 +59,7 @@ case class WriteIntoPaimonTable( } val commitMessages = writer.write(data) writer.commit(commitMessages) - + postCommit(dynamicPartitionOverwriteMode, overwritePartition, commitMessages) Seq.empty } @@ -82,6 +84,18 @@ case class WriteIntoPaimonTable( (dynamicPartitionOverwriteMode, overwritePartition) } + private def postCommit( + dynamicPartitionOverwrite: Boolean, + staticOverwritePartition: Map[String, String], + commitMessages: Seq[CommitMessage]): Unit = { + if (ChainTableUtils.chainScanFallbackDeltaBranch(table.coreOptions())) { + ChainTableUtils.postCommit( + table, + dynamicPartitionOverwrite || staticOverwritePartition != null, + commitMessages.toList.asJava) + } + } + override def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): LogicalPlan = this.asInstanceOf[WriteIntoPaimonTable] } diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java index aaaaec1854bc..583522822b9e 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java @@ -174,428 +174,4 @@ private SparkSession.Builder createSessionBuilder() { "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions") .master("local[2]"); } - - @Test - public void testChainTable(@TempDir java.nio.file.Path tempDir) throws IOException { - Path warehousePath = new Path("file:" + tempDir.toString()); - SparkSession.Builder builder = - SparkSession.builder() - .config("spark.sql.warehouse.dir", warehousePath.toString()) - // with hive metastore - .config("spark.sql.catalogImplementation", "hive") - .config("hive.metastore.uris", "thrift://localhost:" + PORT) - .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName()) - .config("spark.sql.catalog.spark_catalog.metastore", "hive") - .config( - "spark.sql.catalog.spark_catalog.hive.metastore.uris", - "thrift://localhost:" + PORT) - .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true") - .config( - "spark.sql.catalog.spark_catalog.warehouse", - warehousePath.toString()) - .config( - "spark.sql.extensions", - "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions") - .master("local[2]"); - SparkSession spark = builder.getOrCreate(); - spark.sql("CREATE DATABASE IF NOT EXISTS my_db1"); - spark.sql("USE spark_catalog.my_db1"); - - /** Create table */ - spark.sql( - "CREATE TABLE IF NOT EXISTS \n" - + " `my_db1`.`chain_test` (\n" - + " `t1` BIGINT COMMENT 't1',\n" - + " `t2` BIGINT COMMENT 't2',\n" - + " `t3` STRING COMMENT 't3'\n" - + " ) PARTITIONED BY (`dt` STRING COMMENT 'dt') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n" - + "WITH\n" - + " SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n" - + " 'bucket-key' = 't1',\n" - + " 'primary-key' = 'dt,t1',\n" - + " 'partition.timestamp-pattern' = '$dt',\n" - + " 'partition.timestamp-formatter' = 'yyyyMMdd',\n" - + " 'chain-table.enabled' = 'true',\n" - + " 'bucket' = '2',\n" - + " 'merge-engine' = 'deduplicate', \n" - + " 'sequence.field' = 't2'\n" - + " )"); - - /** Create branch */ - spark.sql("CALL 
sys.create_branch('my_db1.chain_test', 'snapshot');"); - spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')"); - - /** Set branch */ - spark.sql( - "ALTER TABLE my_db1.chain_test SET tblproperties (" - + "'scan.fallback-snapshot-branch' = 'snapshot', " - + "'scan.fallback-delta-branch' = 'delta')"); - spark.sql( - "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET tblproperties (" - + "'scan.fallback-snapshot-branch' = 'snapshot'," - + "'scan.fallback-delta-branch' = 'delta')"); - spark.sql( - "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET tblproperties (" - + "'scan.fallback-snapshot-branch' = 'snapshot'," - + "'scan.fallback-delta-branch' = 'delta')"); - spark.close(); - spark = builder.getOrCreate(); - - /** Write main branch */ - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810') values (1, 1, '1'),(2, 1, '1');"); - - /** Write delta branch */ - spark.sql("set spark.paimon.branch=delta;"); - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250809') values (1, 1, '1'),(2, 1, '1');"); - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810') values (1, 2, '1-1' ),(3, 1, '1' );"); - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811') values (2, 2, '1-1' ),(4, 1, '1' );"); - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250812') values (3, 2, '1-1' ),(4, 2, '1-1' );"); - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250813') values (5, 1, '1' ),(6, 1, '1' );"); - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250814') values (5, 2, '1-1' ),(6, 2, '1-1' );"); - - /** Write snapshot branch */ - spark.sql("set spark.paimon.branch=snapshot;"); - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810') values (1, 2, '1-1'),(2, 1, '1'),(3, 1, '1');"); - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250812') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1');"); - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250814') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1'), (5, 1, '1' ), (6, 1, '1');"); - - spark.close(); - spark = builder.getOrCreate(); - /** Main read */ - assertThat( - spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250810'") - .collectAsList().stream() - .map(Row::toString) - .collect(Collectors.toList())) - .containsExactlyInAnyOrder("[1,1,1,20250810]", "[2,1,1,20250810]"); - - /** Snapshot read */ - assertThat( - spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250814'") - .collectAsList().stream() - .map(Row::toString) - .collect(Collectors.toList())) - .containsExactlyInAnyOrder( - "[1,2,1-1,20250814]", - "[2,2,1-1,20250814]", - "[3,2,1-1,20250814]", - "[4,2,1-1,20250814]", - "[5,1,1,20250814]", - "[6,1,1,20250814]"); - - /** Chain read */ - /** 1. non pre snapshot */ - assertThat( - spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250809'") - .collectAsList().stream() - .map(Row::toString) - .collect(Collectors.toList())) - .containsExactlyInAnyOrder("[1,1,1,20250809]", "[2,1,1,20250809]"); - /** 2. 
has pre snapshot */ - assertThat( - spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250811'") - .collectAsList().stream() - .map(Row::toString) - .collect(Collectors.toList())) - .containsExactlyInAnyOrder( - "[1,2,1-1,20250811]", - "[2,2,1-1,20250811]", - "[3,1,1,20250811]", - "[4,1,1,20250811]"); - - /** Multi partition Read */ - assertThat( - spark - .sql( - "SELECT * FROM `my_db1`.`chain_test` where dt in ('20250810', '20250811', '20250812');") - .collectAsList().stream() - .map(Row::toString) - .collect(Collectors.toList())) - .containsExactlyInAnyOrder( - "[1,1,1,20250810]", - "[2,1,1,20250810]", - "[1,2,1-1,20250811]", - "[2,2,1-1,20250811]", - "[3,1,1,20250811]", - "[4,1,1,20250811]", - "[1,2,1-1,20250812]", - "[2,2,1-1,20250812]", - "[3,2,1-1,20250812]", - "[4,2,1-1,20250812]"); - - /** Incremental read */ - assertThat( - spark - .sql( - "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt = '20250811'") - .collectAsList().stream() - .map(Row::toString) - .collect(Collectors.toList())) - .containsExactlyInAnyOrder("[2,2,1-1,20250811]", "[4,1,1,20250811]"); - - /** Multi partition incremental read */ - assertThat( - spark - .sql( - "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt in ('20250810', '20250811', '20250812');") - .collectAsList().stream() - .map(Row::toString) - .collect(Collectors.toList())) - .containsExactlyInAnyOrder( - "[1,2,1-1,20250810]", - "[3,1,1,20250810]", - "[2,2,1-1,20250811]", - "[4,1,1,20250811]", - "[3,2,1-1,20250812]", - "[4,2,1-1,20250812]"); - - /** Hybrid read */ - assertThat( - spark - .sql( - "select * from `my_db1`.`chain_test` where dt = '20250811'\n" - + "union all\n" - + "select * from `my_db1`.`chain_test$branch_delta` where dt = '20250811'") - .collectAsList().stream() - .map(Row::toString) - .collect(Collectors.toList())) - .containsExactlyInAnyOrder( - "[1,2,1-1,20250811]", - "[2,2,1-1,20250811]", - "[3,1,1,20250811]", - "[4,1,1,20250811]", - "[2,2,1-1,20250811]", - "[4,1,1,20250811]"); - - spark.close(); - spark = builder.getOrCreate(); - - /** Drop table */ - spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;"); - - spark.close(); - } - - @Test - public void testHourlyChainTable(@TempDir java.nio.file.Path tempDir) throws IOException { - Path warehousePath = new Path("file:" + tempDir.toString()); - SparkSession.Builder builder = - SparkSession.builder() - .config("spark.sql.warehouse.dir", warehousePath.toString()) - // with hive metastore - .config("spark.sql.catalogImplementation", "hive") - .config("hive.metastore.uris", "thrift://localhost:" + PORT) - .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName()) - .config("spark.sql.catalog.spark_catalog.metastore", "hive") - .config( - "spark.sql.catalog.spark_catalog.hive.metastore.uris", - "thrift://localhost:" + PORT) - .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true") - .config( - "spark.sql.catalog.spark_catalog.warehouse", - warehousePath.toString()) - .config( - "spark.sql.extensions", - "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions") - .master("local[2]"); - SparkSession spark = builder.getOrCreate(); - spark.sql("CREATE DATABASE IF NOT EXISTS my_db1"); - spark.sql("USE spark_catalog.my_db1"); - - /** Create table */ - spark.sql( - "CREATE TABLE IF NOT EXISTS \n" - + " `my_db1`.`chain_test` (\n" - + " `t1` BIGINT COMMENT 't1',\n" - + " `t2` BIGINT COMMENT 't2',\n" - + " `t3` STRING COMMENT 't3'\n" - + " ) PARTITIONED BY (`dt` STRING COMMENT 'dt', `hour` STRING COMMENT 'hour') ROW FORMAT SERDE 
'org.apache.paimon.hive.PaimonSerDe'\n" - + "WITH\n" - + " SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n" - + " 'bucket-key' = 't1',\n" - + " 'primary-key' = 'dt,hour,t1',\n" - + " 'partition.timestamp-pattern' = '$dt $hour:00:00',\n" - + " 'partition.timestamp-formatter' = 'yyyyMMdd HH:mm:ss',\n" - + " 'chain-table.enabled' = 'true',\n" - + " 'bucket' = '2',\n" - + " 'merge-engine' = 'deduplicate', \n" - + " 'sequence.field' = 't2'\n" - + " )"); - - /** Create branch */ - spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');"); - spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')"); - - /** Set branch */ - spark.sql( - "ALTER TABLE my_db1.chain_test SET tblproperties (" - + "'scan.fallback-snapshot-branch' = 'snapshot', " - + "'scan.fallback-delta-branch' = 'delta')"); - spark.sql( - "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET tblproperties (" - + "'scan.fallback-snapshot-branch' = 'snapshot'," - + "'scan.fallback-delta-branch' = 'delta')"); - spark.sql( - "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET tblproperties (" - + "'scan.fallback-snapshot-branch' = 'snapshot'," - + "'scan.fallback-delta-branch' = 'delta')"); - spark.close(); - spark = builder.getOrCreate(); - - /** Write main branch */ - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '22') values (1, 1, '1'),(2, 1, '1');"); - - /** Write delta branch */ - spark.sql("set spark.paimon.branch=delta;"); - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '21') values (1, 1, '1'),(2, 1, '1');"); - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '22') values (1, 2, '1-1' ),(3, 1, '1' );"); - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '23') values (2, 2, '1-1' ),(4, 1, '1' );"); - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '00') values (3, 2, '1-1' ),(4, 2, '1-1' );"); - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '01') values (5, 1, '1' ),(6, 1, '1' );"); - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '02') values (5, 2, '1-1' ),(6, 2, '1-1' );"); - - /** Write snapshot branch */ - spark.sql("set spark.paimon.branch=snapshot;"); - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '22') values (1, 2, '1-1'),(2, 1, '1'),(3, 1, '1');"); - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '00') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1');"); - spark.sql( - "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '02') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1'), (5, 1, '1' ), (6, 1, '1');"); - - spark.close(); - spark = builder.getOrCreate(); - /** Main read */ - assertThat( - spark - .sql( - "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour = '22'") - .collectAsList().stream() - .map(Row::toString) - .collect(Collectors.toList())) - .containsExactlyInAnyOrder("[1,1,1,20250810,22]", "[2,1,1,20250810,22]"); - - /** Snapshot read */ - assertThat( - spark - .sql( - "SELECT * FROM `my_db1`.`chain_test` where dt = '20250811' and hour = '02'") - 
.collectAsList().stream() - .map(Row::toString) - .collect(Collectors.toList())) - .containsExactlyInAnyOrder( - "[1,2,1-1,20250811,02]", - "[2,2,1-1,20250811,02]", - "[3,2,1-1,20250811,02]", - "[4,2,1-1,20250811,02]", - "[5,1,1,20250811,02]", - "[6,1,1,20250811,02]"); - - /** Chain read */ - /** 1. non pre snapshot */ - assertThat( - spark - .sql( - "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour = '21'") - .collectAsList().stream() - .map(Row::toString) - .collect(Collectors.toList())) - .containsExactlyInAnyOrder("[1,1,1,20250810,21]", "[2,1,1,20250810,21]"); - /** 2. has pre snapshot */ - assertThat( - spark - .sql( - "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour = '23'") - .collectAsList().stream() - .map(Row::toString) - .collect(Collectors.toList())) - .containsExactlyInAnyOrder( - "[1,2,1-1,20250810,23]", - "[2,2,1-1,20250810,23]", - "[3,1,1,20250810,23]", - "[4,1,1,20250810,23]"); - - /** Multi partition Read */ - assertThat( - spark - .sql( - "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour in ('22', '23');") - .collectAsList().stream() - .map(Row::toString) - .collect(Collectors.toList())) - .containsExactlyInAnyOrder( - "[1,1,1,20250810,22]", - "[2,1,1,20250810,22]", - "[1,2,1-1,20250810,23]", - "[2,2,1-1,20250810,23]", - "[3,1,1,20250810,23]", - "[4,1,1,20250810,23]"); - - /** Incremental read */ - assertThat( - spark - .sql( - "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt = '20250810' and hour = '23'") - .collectAsList().stream() - .map(Row::toString) - .collect(Collectors.toList())) - .containsExactlyInAnyOrder("[2,2,1-1,20250810,23]", "[4,1,1,20250810,23]"); - - /** Multi partition incremental read */ - assertThat( - spark - .sql( - "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt = '20250810' and hour in ('22', '23');") - .collectAsList().stream() - .map(Row::toString) - .collect(Collectors.toList())) - .containsExactlyInAnyOrder( - "[1,2,1-1,20250810,22]", - "[3,1,1,20250810,22]", - "[2,2,1-1,20250810,23]", - "[4,1,1,20250810,23]"); - - /** Hybrid read */ - assertThat( - spark - .sql( - "select * from `my_db1`.`chain_test` where dt = '20250810' and hour = '23'\n" - + "union all\n" - + "select * from `my_db1`.`chain_test$branch_delta` where dt = '20250810' and hour = '23'") - .collectAsList().stream() - .map(Row::toString) - .collect(Collectors.toList())) - .containsExactlyInAnyOrder( - "[1,2,1-1,20250810,23]", - "[2,2,1-1,20250810,23]", - "[3,1,1,20250810,23]", - "[4,1,1,20250810,23]", - "[2,2,1-1,20250810,23]", - "[4,1,1,20250810,23]"); - - spark.close(); - spark = builder.getOrCreate(); - - /** Drop table */ - spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;"); - - spark.close(); - } } diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java new file mode 100644 index 000000000000..b5a28fe50010 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java @@ -0,0 +1,509 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark; + +import org.apache.paimon.fs.Path; +import org.apache.paimon.hive.TestHiveMetastore; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Base tests for spark read. */ +public class SparkChainTableITCase { + + private static TestHiveMetastore testHiveMetastore; + private static final int PORT = 9091; + + @BeforeAll + public static void startMetastore() { + testHiveMetastore = new TestHiveMetastore(); + testHiveMetastore.start(PORT); + } + + @AfterAll + public static void closeMetastore() throws Exception { + testHiveMetastore.stop(); + } + + @Test + public void testChainTable(@TempDir java.nio.file.Path tempDir) throws IOException { + Path warehousePath = new Path("file:" + tempDir.toString()); + SparkSession.Builder builder = + SparkSession.builder() + .config("spark.sql.warehouse.dir", warehousePath.toString()) + // with hive metastore + .config("spark.sql.catalogImplementation", "hive") + .config("hive.metastore.uris", "thrift://localhost:" + PORT) + .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName()) + .config("spark.sql.catalog.spark_catalog.metastore", "hive") + .config( + "spark.sql.catalog.spark_catalog.hive.metastore.uris", + "thrift://localhost:" + PORT) + .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true") + .config( + "spark.sql.catalog.spark_catalog.warehouse", + warehousePath.toString()) + .config( + "spark.sql.extensions", + "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions") + .master("local[2]"); + SparkSession spark = builder.getOrCreate(); + spark.sql("CREATE DATABASE IF NOT EXISTS my_db1"); + spark.sql("USE spark_catalog.my_db1"); + + /** Create table */ + spark.sql( + "CREATE TABLE IF NOT EXISTS \n" + + " `my_db1`.`chain_test` (\n" + + " `t1` BIGINT COMMENT 't1',\n" + + " `t2` BIGINT COMMENT 't2',\n" + + " `t3` STRING COMMENT 't3'\n" + + " ) PARTITIONED BY (`dt` STRING COMMENT 'dt') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n" + + "WITH\n" + + " SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n" + + " 'bucket-key' = 't1',\n" + + " 'primary-key' = 'dt,t1',\n" + + " 'partition.timestamp-pattern' = '$dt',\n" + + " 'partition.timestamp-formatter' = 'yyyyMMdd',\n" + + " 'chain-table.enabled' = 'true',\n" + + " 'bucket' = '2',\n" + + " 'merge-engine' = 'deduplicate', \n" + + " 'sequence.field' = 't2'\n" + + " )"); + + /** Create branch */ + spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');"); + spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')"); + + /** Set branch */ + spark.sql( + "ALTER TABLE my_db1.chain_test SET 
tblproperties (" + + "'scan.fallback-snapshot-branch' = 'snapshot', " + + "'scan.fallback-delta-branch' = 'delta')"); + spark.sql( + "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET tblproperties (" + + "'scan.fallback-snapshot-branch' = 'snapshot'," + + "'scan.fallback-delta-branch' = 'delta')"); + spark.sql( + "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET tblproperties (" + + "'scan.fallback-snapshot-branch' = 'snapshot'," + + "'scan.fallback-delta-branch' = 'delta')"); + spark.close(); + spark = builder.getOrCreate(); + + /** Write main branch */ + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810') values (1, 1, '1'),(2, 1, '1');"); + + /** Write delta branch */ + spark.sql("set spark.paimon.branch=delta;"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250809') values (1, 1, '1'),(2, 1, '1');"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810') values (1, 2, '1-1' ),(3, 1, '1' );"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811') values (2, 2, '1-1' ),(4, 1, '1' );"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250812') values (3, 2, '1-1' ),(4, 2, '1-1' );"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250813') values (5, 1, '1' ),(6, 1, '1' );"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250814') values (5, 2, '1-1' ),(6, 2, '1-1' );"); + + /** Write snapshot branch */ + spark.sql("set spark.paimon.branch=snapshot;"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810') values (1, 2, '1-1'),(2, 1, '1'),(3, 1, '1');"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250812') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1');"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250814') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1'), (5, 1, '1' ), (6, 1, '1');"); + + spark.close(); + spark = builder.getOrCreate(); + /** Main read */ + assertThat( + spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250810'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder("[1,1,1,20250810]", "[2,1,1,20250810]"); + + /** Snapshot read */ + assertThat( + spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250814'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder( + "[1,2,1-1,20250814]", + "[2,2,1-1,20250814]", + "[3,2,1-1,20250814]", + "[4,2,1-1,20250814]", + "[5,1,1,20250814]", + "[6,1,1,20250814]"); + + /** Chain read */ + /** 1. non pre snapshot */ + assertThat( + spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250809'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder("[1,1,1,20250809]", "[2,1,1,20250809]"); + /** 2. 
has pre snapshot */ + assertThat( + spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250811'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder( + "[1,2,1-1,20250811]", + "[2,2,1-1,20250811]", + "[3,1,1,20250811]", + "[4,1,1,20250811]"); + + /** Multi partition Read */ + assertThat( + spark + .sql( + "SELECT * FROM `my_db1`.`chain_test` where dt in ('20250810', '20250811', '20250812');") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder( + "[1,1,1,20250810]", + "[2,1,1,20250810]", + "[1,2,1-1,20250811]", + "[2,2,1-1,20250811]", + "[3,1,1,20250811]", + "[4,1,1,20250811]", + "[1,2,1-1,20250812]", + "[2,2,1-1,20250812]", + "[3,2,1-1,20250812]", + "[4,2,1-1,20250812]"); + + /** Incremental read */ + assertThat( + spark + .sql( + "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt = '20250811'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder("[2,2,1-1,20250811]", "[4,1,1,20250811]"); + + /** Multi partition incremental read */ + assertThat( + spark + .sql( + "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt in ('20250810', '20250811', '20250812');") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder( + "[1,2,1-1,20250810]", + "[3,1,1,20250810]", + "[2,2,1-1,20250811]", + "[4,1,1,20250811]", + "[3,2,1-1,20250812]", + "[4,2,1-1,20250812]"); + + /** Hybrid read */ + assertThat( + spark + .sql( + "select * from `my_db1`.`chain_test` where dt = '20250811'\n" + + "union all\n" + + "select * from `my_db1`.`chain_test$branch_delta` where dt = '20250811'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder( + "[1,2,1-1,20250811]", + "[2,2,1-1,20250811]", + "[3,1,1,20250811]", + "[4,1,1,20250811]", + "[2,2,1-1,20250811]", + "[4,1,1,20250811]"); + + spark.close(); + spark = builder.getOrCreate(); + spark.sql("set spark.paimon.branch=delta;"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` values (5, 2, '1', '20250813'),(6, 2, '1', '20250814');"); + + spark.close(); + spark = builder.getOrCreate(); + Dataset df = + spark.sql( + "SELECT t1,t2,t3 FROM `my_db1`.`chain_test$branch_snapshot` where dt = '20250814'"); + assertThat(df.count()).isEqualTo(0); + df = + spark.sql( + "SELECT t1,t2,t3 FROM `my_db1`.`chain_test$branch_delta` where dt = '20250814'"); + assertThat(df.count()).isEqualTo(1); + + spark.close(); + spark = builder.getOrCreate(); + /** Drop table */ + spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;"); + + spark.close(); + } + + @Test + public void testHourlyChainTable(@TempDir java.nio.file.Path tempDir) throws IOException { + Path warehousePath = new Path("file:" + tempDir.toString()); + SparkSession.Builder builder = + SparkSession.builder() + .config("spark.sql.warehouse.dir", warehousePath.toString()) + // with hive metastore + .config("spark.sql.catalogImplementation", "hive") + .config("hive.metastore.uris", "thrift://localhost:" + PORT) + .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName()) + .config("spark.sql.catalog.spark_catalog.metastore", "hive") + .config( + "spark.sql.catalog.spark_catalog.hive.metastore.uris", + "thrift://localhost:" + PORT) + .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true") + .config( + "spark.sql.catalog.spark_catalog.warehouse", + warehousePath.toString()) + 
.config( + "spark.sql.extensions", + "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions") + .master("local[2]"); + SparkSession spark = builder.getOrCreate(); + spark.sql("CREATE DATABASE IF NOT EXISTS my_db1"); + spark.sql("USE spark_catalog.my_db1"); + + /** Create table */ + spark.sql( + "CREATE TABLE IF NOT EXISTS \n" + + " `my_db1`.`chain_test` (\n" + + " `t1` BIGINT COMMENT 't1',\n" + + " `t2` BIGINT COMMENT 't2',\n" + + " `t3` STRING COMMENT 't3'\n" + + " ) PARTITIONED BY (`dt` STRING COMMENT 'dt', `hour` STRING COMMENT 'hour') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n" + + "WITH\n" + + " SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n" + + " 'bucket-key' = 't1',\n" + + " 'primary-key' = 'dt,hour,t1',\n" + + " 'partition.timestamp-pattern' = '$dt $hour:00:00',\n" + + " 'partition.timestamp-formatter' = 'yyyyMMdd HH:mm:ss',\n" + + " 'chain-table.enabled' = 'true',\n" + + " 'bucket' = '2',\n" + + " 'merge-engine' = 'deduplicate', \n" + + " 'sequence.field' = 't2'\n" + + " )"); + + /** Create branch */ + spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');"); + spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')"); + + /** Set branch */ + spark.sql( + "ALTER TABLE my_db1.chain_test SET tblproperties (" + + "'scan.fallback-snapshot-branch' = 'snapshot', " + + "'scan.fallback-delta-branch' = 'delta')"); + spark.sql( + "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET tblproperties (" + + "'scan.fallback-snapshot-branch' = 'snapshot'," + + "'scan.fallback-delta-branch' = 'delta')"); + spark.sql( + "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET tblproperties (" + + "'scan.fallback-snapshot-branch' = 'snapshot'," + + "'scan.fallback-delta-branch' = 'delta')"); + spark.close(); + spark = builder.getOrCreate(); + + /** Write main branch */ + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '22') values (1, 1, '1'),(2, 1, '1');"); + + /** Write delta branch */ + spark.sql("set spark.paimon.branch=delta;"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '21') values (1, 1, '1'),(2, 1, '1');"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '22') values (1, 2, '1-1' ),(3, 1, '1' );"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '23') values (2, 2, '1-1' ),(4, 1, '1' );"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '00') values (3, 2, '1-1' ),(4, 2, '1-1' );"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '01') values (5, 1, '1' ),(6, 1, '1' );"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '02') values (5, 2, '1-1' ),(6, 2, '1-1' );"); + + /** Write snapshot branch */ + spark.sql("set spark.paimon.branch=snapshot;"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '22') values (1, 2, '1-1'),(2, 1, '1'),(3, 1, '1');"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '00') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1');"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '02') 
values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1'), (5, 1, '1' ), (6, 1, '1');"); + + spark.close(); + spark = builder.getOrCreate(); + /** Main read */ + assertThat( + spark + .sql( + "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour = '22'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder("[1,1,1,20250810,22]", "[2,1,1,20250810,22]"); + + /** Snapshot read */ + assertThat( + spark + .sql( + "SELECT * FROM `my_db1`.`chain_test` where dt = '20250811' and hour = '02'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder( + "[1,2,1-1,20250811,02]", + "[2,2,1-1,20250811,02]", + "[3,2,1-1,20250811,02]", + "[4,2,1-1,20250811,02]", + "[5,1,1,20250811,02]", + "[6,1,1,20250811,02]"); + + /** Chain read */ + /** 1. non pre snapshot */ + assertThat( + spark + .sql( + "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour = '21'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder("[1,1,1,20250810,21]", "[2,1,1,20250810,21]"); + /** 2. has pre snapshot */ + assertThat( + spark + .sql( + "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour = '23'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder( + "[1,2,1-1,20250810,23]", + "[2,2,1-1,20250810,23]", + "[3,1,1,20250810,23]", + "[4,1,1,20250810,23]"); + + /** Multi partition Read */ + assertThat( + spark + .sql( + "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour in ('22', '23');") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder( + "[1,1,1,20250810,22]", + "[2,1,1,20250810,22]", + "[1,2,1-1,20250810,23]", + "[2,2,1-1,20250810,23]", + "[3,1,1,20250810,23]", + "[4,1,1,20250810,23]"); + + /** Incremental read */ + assertThat( + spark + .sql( + "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt = '20250810' and hour = '23'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder("[2,2,1-1,20250810,23]", "[4,1,1,20250810,23]"); + + /** Multi partition incremental read */ + assertThat( + spark + .sql( + "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt = '20250810' and hour in ('22', '23');") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder( + "[1,2,1-1,20250810,22]", + "[3,1,1,20250810,22]", + "[2,2,1-1,20250810,23]", + "[4,1,1,20250810,23]"); + + /** Hybrid read */ + assertThat( + spark + .sql( + "select * from `my_db1`.`chain_test` where dt = '20250810' and hour = '23'\n" + + "union all\n" + + "select * from `my_db1`.`chain_test$branch_delta` where dt = '20250810' and hour = '23'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder( + "[1,2,1-1,20250810,23]", + "[2,2,1-1,20250810,23]", + "[3,1,1,20250810,23]", + "[4,1,1,20250810,23]", + "[2,2,1-1,20250810,23]", + "[4,1,1,20250810,23]"); + + spark.close(); + spark = builder.getOrCreate(); + spark.sql("set spark.paimon.branch=delta;"); + spark.sql( + "insert overwrite table `my_db1`.`chain_test` values (6, 2, '1', '20250811', '02');"); + + spark.close(); + spark = builder.getOrCreate(); + Dataset df = + spark.sql( + "SELECT t1,t2,t3 FROM `my_db1`.`chain_test$branch_snapshot` where dt = '20250811' and hour = '02'"); + 
assertThat(df.count()).isEqualTo(0); + df = + spark.sql( + "SELECT t1,t2,t3 FROM `my_db1`.`chain_test$branch_delta` where dt = '20250811' and hour = '02'"); + assertThat(df.count()).isEqualTo(1); + + spark.close(); + spark = builder.getOrCreate(); + /** Drop table */ + spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;"); + + spark.close(); + } +}