From a8b4e54a29ab283312907a9bb01dadde83d4755b Mon Sep 17 00:00:00 2001 From: dailiang Date: Tue, 13 Jan 2026 16:41:30 +0800 Subject: [PATCH] Fix distinct collect agg bug of un merged initial collect. --- .../mergetree/compact/ReducerMergeFunctionWrapper.java | 4 ++++ .../flink/aggregation/CollectAggregationITCase.java | 8 ++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ReducerMergeFunctionWrapper.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ReducerMergeFunctionWrapper.java index 40c0732b08ff..75bd17828795 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ReducerMergeFunctionWrapper.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ReducerMergeFunctionWrapper.java @@ -53,6 +53,10 @@ public void reset() { public void add(KeyValue kv) { if (initialKv == null) { initialKv = kv; + if (kv.isAdd()) { + merge(initialKv); + isInitialized = true; + } } else { if (!isInitialized) { merge(initialKv); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/aggregation/CollectAggregationITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/aggregation/CollectAggregationITCase.java index 518dff4c25a8..89e83c336b5a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/aggregation/CollectAggregationITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/aggregation/CollectAggregationITCase.java @@ -61,23 +61,27 @@ public void testAggWithDistinct() { "INSERT INTO test_collect VALUES " + "(1, CAST (NULL AS ARRAY)), " + "(2, ARRAY['A', 'B']), " - + "(3, ARRAY['car', 'watch'])"); + + "(3, ARRAY['car', 'watch']), " + + "(4, ARRAY['A', 'B', 'A'])"); List result = queryAndSort("SELECT * FROM test_collect"); checkOneRecord(result.get(0), 1); checkOneRecord(result.get(1), 2, "A", "B"); checkOneRecord(result.get(2), 3, "car", "watch"); + checkOneRecord(result.get(3), 4, "A", "B"); sql( "INSERT INTO test_collect VALUES " + "(1, ARRAY['paimon', 'paimon']), " + "(2, ARRAY['A', 'B', 'C']), " - + "(3, CAST (NULL AS ARRAY))"); + + "(3, CAST (NULL AS ARRAY)), " + + "(4, ARRAY['C', 'D', 'C'])"); result = queryAndSort("SELECT * FROM test_collect"); checkOneRecord(result.get(0), 1, "paimon"); checkOneRecord(result.get(1), 2, "A", "B", "C"); checkOneRecord(result.get(2), 3, "car", "watch"); + checkOneRecord(result.get(3), 4, "A", "B", "C", "D"); } @Test