From ad3208bb4d5f57a3a4b0ee1a3cf14bfa716982f9 Mon Sep 17 00:00:00 2001 From: dailiang Date: Tue, 13 Jan 2026 20:35:51 +0800 Subject: [PATCH] Fix wrong merge order of increment diff split read due to concurrent write of a key with non-deterministic sequence number. --- .../splitread/IncrementalDiffSplitRead.java | 6 +++-- .../paimon/flink/BatchFileStoreITCase.java | 25 +++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java index ec4408fb9737..c449af042d4c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java @@ -213,9 +213,11 @@ public KeyValue getResult() { } else if (kvs.size() == 2) { KeyValue before = kvs.get(0); KeyValue after = kvs.get(1); - if (after.level() == AFTER_LEVEL) { - if (!valueAndRowKindEquals(before, after)) { + if (!valueAndRowKindEquals(before, after)) { + if (after.level() == AFTER_LEVEL) { toReturn = after; + } else if (after.level() == BEFORE_LEVEL) { + toReturn = before; } } } else { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 22271c8a7ebe..9d339937d3ba 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -1019,6 +1019,31 @@ public void testIncrementScanMode() throws Exception { Row.of("+I", 2, "B"), Row.of("-D", 2, "B"), Row.of("+I", 3, "C")); } + @Test + public void testIncrementScanModeWithInsertOverwrite() throws Exception { + sql("CREATE TABLE test_scan_mode (id INT PRIMARY KEY NOT ENFORCED, v STRING)"); + + // snapshot 1 + sql("INSERT OVERWRITE test_scan_mode VALUES (1, 'A'), (1, 'B'), (1, 'C')"); + // snapshot 2 + sql("INSERT OVERWRITE test_scan_mode VALUES (1, 'C'), (1, 'D')"); + + List result = + sql( + "SELECT * FROM `test_scan_mode$audit_log` " + + "/*+ OPTIONS('incremental-between'='1,2','incremental-between-scan-mode'='diff') */"); + assertThat(result).containsExactlyInAnyOrder(Row.of("+I", 1, "D")); + + // snapshot 3 + sql("INSERT OVERWRITE test_scan_mode VALUES (1, 'D')"); + + result = + sql( + "SELECT * FROM `test_scan_mode$audit_log` " + + "/*+ OPTIONS('incremental-between'='2,2','incremental-between-scan-mode'='diff') */"); + assertThat(result).isEmpty(); + } + @Test public void testAuditLogTableWithComputedColumn() throws Exception { sql("CREATE TABLE test_table (a int, b int, c AS a + b);");