Fix BloomFilter buffer incompatibility between Spark and Comet #3003
Handle Spark's full serialization format (12-byte header + bits) in `merge_filter()` to support Spark partial / Comet final execution. The fix automatically detects the format and extracts the bits data accordingly.
Fixes #2889
## Rationale for this change
- Spark's `serialize()` returns the full format: a 12-byte header (version + numHashFunctions + numWords) followed by the bits data.
- Comet's `state_as_bytes()` returns the bits data only.
- When a Spark partial aggregate sends the full format, Comet's `merge_filter()` expects bits-only data, causing a mismatch.
Ref: https://github.com/apache/spark/blob/master/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java#L99
Ref: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala#L219

Spark format: `BloomFilterImpl.writeTo()` (4 + 4 bytes) + `BitArray.writeTo()` (4 bytes + bits)
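The layout above can be sketched as a small header parser. This is an illustrative sketch only (the function name `parse_spark_header` is not Comet's API); it assumes big-endian ints, which is what Java's `DataOutputStream.writeInt` emits:

```rust
/// Hypothetical sketch: parse the 12-byte header of Spark's full
/// BloomFilter serialization (version + numHashFunctions + numWords),
/// each a big-endian i32 as written by Java's DataOutputStream.
/// The remaining `numWords * 8` bytes are the bit array data.
fn parse_spark_header(buf: &[u8]) -> Option<(i32, i32, i32)> {
    if buf.len() < 12 {
        return None;
    }
    let version = i32::from_be_bytes(buf[0..4].try_into().ok()?);
    let num_hash_functions = i32::from_be_bytes(buf[4..8].try_into().ok()?);
    let num_words = i32::from_be_bytes(buf[8..12].try_into().ok()?);
    Some((version, num_hash_functions, num_words))
}
```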
## What changes are included in this PR?
- Detects the Spark format (buffer size = 12 + expected bits size)
- Extracts the bits data by skipping the 12-byte header when the Spark format is detected
- Returns the bits data as-is when in Comet format
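The detection steps above can be sketched as follows. This is a minimal sketch, not Comet's actual implementation: the function name `extract_bits` and the `expected_bits_size` parameter are illustrative assumptions.

```rust
/// Hypothetical sketch of the format detection: if the buffer is exactly
/// 12 bytes larger than the expected bits payload, treat it as Spark's
/// full format and skip the header; otherwise assume Comet's bits-only
/// format and return the buffer unchanged.
fn extract_bits(buf: &[u8], expected_bits_size: usize) -> &[u8] {
    if buf.len() == 12 + expected_bits_size {
        // Spark full format: skip the 12-byte header
        // (version + numHashFunctions + numWords).
        &buf[12..]
    } else {
        // Comet format: the buffer already contains only the bits data.
        buf
    }
}
```

Keying the detection on the exact length difference is what makes it safe for both partial/final combinations: a Comet partial never produces a buffer that is exactly 12 bytes longer than the expected bits payload.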
## How are these changes tested?
A Spark SQL test.