Enable Flink projection pushdown for PSC connector #131
KevBrowne wants to merge 4 commits into pinterest:main
43f57d1 to 5f25ef7
5f25ef7 to 813dd37
nickpan47 left a comment
Could you please check with Ashish to see why nested projection is not supported? If you plan to add another PR to support nested projection, please add a TODO as a quick follow-up. If we don't plan to support it in the first release, I wonder why, given that we have tested that the partial deserializer actually supports nested pushdown.
...flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java
813dd37 to 125e731
nickpan47 left a comment
Can you answer the question on how the nested pushdown will be translated into the selected Thrift metadata needed in the PartialThriftDeserializer? Thanks!
    "Projection path must have at least one element but got: %s",
    Arrays.toString(path));
    // For nested projection, we only need the top-level field index to determine
    // which fields to deserialize. The format (e.g., Thrift) handles nested extraction.
How do we pass the list of nested fields as the ThriftMetadata.ThriftStruct metadata to the PartialThriftDeserializer, if we only record the top-level field index?
Not sure why the PR did not show as updated.
To expand on just taking the top-level index, i.e.
int physicalPos = path[0];
as part of that loop I added logic to collect all the paths under the top-level field. So, for instance, if we have
CREATE TABLE events (
  id INT,
  user ROW<
    name STRING,
    age INT,
    address ROW<
      city STRING,
      zip STRING
    >
  >,
  timestamp BIGINT
)
we should now be able to capture
[1, 0], [1, 1], [1, 2], [1, 2, 0], etc.
I also updated the unit tests to cover this.
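For illustration, here is a minimal, self-contained sketch of how the paths listed above could be enumerated for the events table. It is not the code in this PR: Col, NestedPathSketch, pathsUnder, and eventsSchema are hypothetical names, and a plain tree of column names stands in for the Flink DataType.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a table column; not a Flink or PSC class. */
final class Col {
    final String name;
    final List<Col> children; // empty for scalar columns

    Col(String name, Col... children) {
        this.name = name;
        this.children = Arrays.asList(children);
    }
}

final class NestedPathSketch {

    /** Returns every index path below the top-level column at topLevelIndex. */
    static List<int[]> pathsUnder(List<Col> schema, int topLevelIndex) {
        List<int[]> out = new ArrayList<>();
        collect(schema.get(topLevelIndex), new int[] {topLevelIndex}, out);
        return out;
    }

    /** Depth-first walk: emit the path of each child, then descend into it. */
    private static void collect(Col field, int[] prefix, List<int[]> out) {
        for (int i = 0; i < field.children.size(); i++) {
            int[] path = Arrays.copyOf(prefix, prefix.length + 1);
            path[prefix.length] = i;
            out.add(path);
            collect(field.children.get(i), path, out);
        }
    }

    /** The events table from the comment above, reduced to column names. */
    static List<Col> eventsSchema() {
        return Arrays.asList(
                new Col("id"),
                new Col("user",
                        new Col("name"),
                        new Col("age"),
                        new Col("address", new Col("city"), new Col("zip"))),
                new Col("timestamp"));
    }
}
```

With this model, pathsUnder(eventsSchema(), 1) yields [1, 0], [1, 1], [1, 2], [1, 2, 0], [1, 2, 1], and a scalar column such as id yields no nested paths.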
nickpan47 left a comment
Thanks! I have a comment on the separation of projection variables between top-level-only and nested projection. PTAL.
    physicalPos >= 0 && physicalPos < physicalFieldCount,
    "Projected field index out of bounds: %s",
    physicalPos);
    physicalIndexToOutputIndex[physicalPos] = outputPos;
Would there be an issue if we have a table foo (a string, b row<key string, value string>) and then run select b.key, b.value from foo? In this case physicalIndexToOutputIndex[1] would collide, since two sub-fields of the same top-level field need to map to two different outputPos values.
Made changes here because, yes, physicalIndexToOutputIndex[1] would be overwritten and b.key would be lost.
Now we record the outputPos for each path in a map instead, e.g.:
pathsByTopLevelIndex = {
  1: [
    { path: [1, 0], outputPos: 0 },  // b.key → output position 0
    { path: [1, 1], outputPos: 1 }   // b.value → output position 1
  ]
}
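A rough Java rendering of that map, as a sketch only: ProjectionGroupingSketch, PathEntry, and groupByTopLevelIndex are hypothetical names rather than the PR's actual classes, and the sketch assumes that a path's position in the projection array is its output position.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ProjectionGroupingSketch {

    /** One projected path plus the position it fills in the output row. */
    static final class PathEntry {
        final int[] path;
        final int outputPos;

        PathEntry(int[] path, int outputPos) {
            this.path = path;
            this.outputPos = outputPos;
        }
    }

    /**
     * Groups projected paths by their top-level field index. The position of a
     * path in projectedPaths is taken as its output position (an assumption).
     */
    static Map<Integer, List<PathEntry>> groupByTopLevelIndex(int[][] projectedPaths) {
        Map<Integer, List<PathEntry>> byTop = new LinkedHashMap<>();
        for (int outputPos = 0; outputPos < projectedPaths.length; outputPos++) {
            int[] path = projectedPaths[outputPos];
            if (path.length == 0) {
                throw new IllegalArgumentException(
                        "Projection path must have at least one element");
            }
            // A list per top-level index keeps b.key and b.value side by side,
            // where a single int slot per index would keep only the last one.
            byTop.computeIfAbsent(path[0], k -> new ArrayList<>())
                    .add(new PathEntry(path, outputPos));
        }
        return byTop;
    }
}
```

For select b.key, b.value from foo, calling groupByTopLevelIndex(new int[][] {{1, 0}, {1, 1}}) produces a single key 1 holding two entries with output positions 0 and 1, so neither sub-field overwrites the other.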
    this.valueNestedProjection = valueNestedList.toArray(new int[0][]);

    // Remap decoded fields into the projected output row order.
    this.keyOutputProjection =
So, for a nested projection that includes multiple sub-fields of the same top-level field, keyOutputProjection will only hold the last sub-field's output position as keyOutputProjection[topFieldIndex]? The same applies to valueOutputProjection. Hence, for the example foo with b.key and b.value above, valueOutputProjection[1] = 1, which is the output position of b.value. Is this intended?
Similar to my reply on the comment above.
     * Converts nested projection paths to dot-separated field names.
     * Example: [[1, 0], [2]] with schema (a, b ROW<x, y>, c) → ["b.x", "c"]
     */
    public static List<String> convertPathsToFieldNames(int[][] paths, DataType dataType) {
Is this only used by unit tests? I don't see it accessing any private static members of the PscDynamicSource class either, so it could be moved entirely to the test class.
    // Full nested projection paths for each format field.
    // Each int[] is a path: [topLevelIndex] for top-level, [topLevelIndex, nestedIndex, ...] for nested.
    // Used by formats that support nested projection (e.g., Thrift's PartialThriftDeserializer).
    protected int[][] keyNestedProjection;
Why do we instantiate two separate variables for nested projection? Instead, we can change the definition of keyProjection and valueProjection to int[][] and consolidate the member variables.
Done. I changed keyProjection/valueProjection to int[][] and consolidated the variables, so we no longer need keyNestedProjection and valueNestedProjection.
    DataType physicalDataType,
    @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
    DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
    int[][] keyProjection,
One main question that I still have here: in these keyProjection / valueProjection arrays for nested projection, the embedded indices are based on the field/sub-field indices in the table DDL schema definition, right? If so, does that mean nested projection also relies on the field/sub-field indices of the table DDL schema being exactly the same as those of the Thrift schema? For example, suppose the Thrift schema has fields 1: f1, 2: f2, 3: f3, 4: f4, while the table DDL has not been updated yet and only has three fields, 1: f1, 2: f2, 3: f4. If the projection is on f1 and f4, the indices will be 1 and 3. However, because the table DDL and the Thrift schema differ, the projection will resolve to 1: f1 and 3: f3, which is not what was expected.
The only reliable way to implement this is to base the nested projection on field/sub-field names instead of indices. This is what I believe needs to be implemented: a) int[][] keyProjection and int[][] valueProjection are indices into the table DDL schema; b) we find the list of field/sub-field names in the table DDL schema from keyProjection and valueProjection; c) we look up those field/sub-field names in the Thrift schema and convert them to Thrift field/sub-field indices, which form the actual nested projection int[][] passed into the partial Thrift deserializer.
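Steps a) to c) could look roughly like the sketch below. Everything in it is hypothetical: DdlField and ThriftField are simplified stand-ins for the Flink row type and the Thrift struct metadata, DDL paths are 0-based positions, and the sketch assumes the deserializer wants Thrift field ids, which would need to be confirmed against PartialThriftDeserializer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class NameBasedProjectionSketch {

    /** Hypothetical DDL column: a name plus sub-columns for ROW types. */
    static final class DdlField {
        final String name;
        final List<DdlField> children;

        DdlField(String name, DdlField... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }
    }

    /** Hypothetical Thrift field: its declared id plus sub-fields keyed by name. */
    static final class ThriftField {
        final int id;
        final Map<String, ThriftField> children = new LinkedHashMap<>();

        ThriftField(int id) {
            this.id = id;
        }

        ThriftField child(String name, ThriftField field) {
            children.put(name, field);
            return this;
        }
    }

    /** Step b: resolve a DDL index path to the field names along it. */
    static List<String> toNames(List<DdlField> ddl, int[] path) {
        List<String> names = new ArrayList<>();
        List<DdlField> level = ddl;
        for (int index : path) {
            DdlField field = level.get(index);
            names.add(field.name);
            level = field.children;
        }
        return names;
    }

    /** Step c: look the names up in the Thrift struct and return Thrift field ids. */
    static int[] toThriftPath(ThriftField root, List<String> names) {
        int[] ids = new int[names.size()];
        ThriftField current = root;
        for (int i = 0; i < names.size(); i++) {
            current = current.children.get(names.get(i));
            if (current == null) {
                throw new IllegalArgumentException("No Thrift field named " + names.get(i));
            }
            ids[i] = current.id;
        }
        return ids;
    }

    /** Steps a to c: DDL index paths in, Thrift id paths out. */
    static int[][] translate(List<DdlField> ddl, ThriftField thriftRoot, int[][] ddlPaths) {
        int[][] thriftPaths = new int[ddlPaths.length][];
        for (int i = 0; i < ddlPaths.length; i++) {
            thriftPaths[i] = toThriftPath(thriftRoot, toNames(ddl, ddlPaths[i]));
        }
        return thriftPaths;
    }
}
```

In the f1/f2/f4 example, the DDL paths [0] and [2] resolve to the names f1 and f4, which map to Thrift ids 1 and 4 rather than 1 and 3, so a stale DDL no longer selects f3 by accident.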
nickpan47 left a comment
One last comment on switching from index-based to name-based matching between the table DDL and Thrift. PTAL.
## Summary
This PR implements SupportsProjectionPushDown for the PSC Flink connector, enabling Flink's optimizer to push column projections down to the source. This optimization reduces deserialization overhead by only deserializing the columns actually needed by queries, rather than deserializing the entire schema.
Key changes:
- PscDynamicSource now implements SupportsProjectionPushDown
- Added applyProjection() method to compute query-specific projections
- Introduced format vs output projection separation (keyFormatProjection/valueFormatProjection for deserialization, keyOutputProjection/valueOutputProjection for row assembly)
- Updated getScanRuntimeProvider() and createPscDeserializationSchema() to use the new projections
## Test plan
- Unit tests
- Internal E2E testing with Flink, after submitting a DDL with 42 columns:
  - SELECT * FROM <Table> LIMIT 10; --> produced RowType with all 42 columns
  - SELECT a, b, c FROM <Table> LIMIT 10; --> produced RowType with 3 columns a, b, c
  - SELECT a FROM <table> WHERE b = '<value>' LIMIT 10; --> produced RowType with 2 columns a, b