Skip to content

Conversation

@LiangDai-Mars
Copy link

@LiangDai-Mars LiangDai-Mars commented Jan 13, 2026

Purpose

    This PR aims to fix a bug in the aggregation merge engine where the collect aggregate function with distinct mode enabled handles unmerged initial data (initial collect) incorrectly. Specifically, when the first record (an ADD operation) enters the ReducerMergeFunctionWrapper, if it is not immediately merged and initialized, the subsequent aggregation result will incorrectly lose this initial value. This fix ensures that the first ADD record is merged immediately, thus guaranteeing the correctness of distinct collect aggregation in various scenarios.

Tests

To verify the fix, this PR enhances the integration test case CollectAggregationITCase.java in the paimon-flink-common module.

  • New Test Scenario: A new record with primary key 4 is added in the testAggWithDistinct test method.
  • Initial Data: An INSERT statement inserts an array with duplicate elements: ARRAY['A', 'B', 'A'].
  • Subsequent Merge: A second INSERT adds a new array ARRAY['C', 'D', 'C'] for the primary key 4.
  • Key Assertions:
    • After the first query, the aggregation result for primary key 4 is asserted to be ['A', 'B'], verifying the deduplication logic on initial insertion.
    • After the second query, the aggregation result for primary key 4 is asserted to be ['A', 'B', 'C', 'D'], verifying the deduplication logic when merging with existing state.
      This test case effectively covers the problematic scenario and ensures the correctness of the fix.

API and Format

No API or format changes. This change only involves an internal logic correction and does not affect any external APIs, data storage formats, or configuration files.

Documentation

This change is an internal bug fix and does not require updates to user-facing documentation.

@LiangDai-Mars
Copy link
Author

@JingsongLi We found that when we using distinct collect agg, if some key only has single array record which deduplicated elements, it will return the initial array without removing duplicates. Please review the fix PR since it change the default logic of merge function.
cc @Aitozi

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant