Search before asking
- I searched in the issues and found nothing similar.
Paimon version
1.3.1
Compute Engine
Flink 1.20
Minimal reproduce step
Disclaimer: the performance issues below are observed for really large tables
Table:
- Several thousand partitions
- PK table
- Fixed bucket (e.g. 32)
- Partitioned by a string field in the data (not partitioned by time)
- Table stored in a cloud object store (e.g. S3); a sketch of a comparable table definition is shown after the lists below
Flink jobs:
- Streaming, write-only, all partitions are receiving realtime updates
- Batch, full compaction, all partitions require compaction
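For illustration only, here is a minimal Flink SQL setup (driven from Java) for a table of this shape. The catalog name, table name, columns, and the s3 warehouse path are made up; only the structural properties (primary-key table, fixed bucket = 32, partitioned by a string data field, object-store warehouse) mirror the setup above.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SetupSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Paimon catalog backed by a cloud object store (path is made up).
        tEnv.executeSql(
                "CREATE CATALOG my_catalog WITH ("
                        + " 'type' = 'paimon',"
                        + " 'warehouse' = 's3://my-bucket/warehouse'"
                        + ")");
        tEnv.executeSql("USE CATALOG my_catalog");

        // PK table, fixed bucket count, partitioned by a string field in the data
        // (not by time). In the real setup this field has several thousand
        // distinct values, i.e. several thousand partitions.
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + " order_id BIGINT,"
                        + " region   STRING,"
                        + " payload  STRING,"
                        + " PRIMARY KEY (order_id, region) NOT ENFORCED"
                        + ") PARTITIONED BY (region) WITH ("
                        + " 'bucket' = '32'"
                        + ")");
    }
}
```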
With the above setup, we noticed that write performance degrades quite significantly, and the degradation does not correlate with the size of the table or the streaming data volume.
After a lot of debugging and logging, we've identified a significant bottleneck for highly partitioned tables:
- The bottleneck appears to be FileSystemWriteRestore.restoreFiles, which is invoked via AbstractFileStoreWrite.createWriterContainer for each (partition, bucket) combination (see the sketch after this list)
- Each execution of restoreFiles fetches manifest files
- This leads to API requests to retrieve these files from the cloud filesystem
- This is also computationally expensive, as manifest files (Avro/Parquet) are decompressed and deserialized into the corresponding object models
- For a highly partitioned table receiving writes across all partitions, this can lead to the manifests being fetched hundreds of thousands of times
- This results in excessively high API costs for object retrieval
- This also leads to very long batch job execution times, or to unstable streaming jobs that cannot keep up with the incoming data load
- Dedicated compaction job performance is particularly impacted by this
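To make the access pattern concrete, here is a simplified Java sketch of the per-(partition, bucket) restore flow. This is not the actual Paimon code: the class and method names below are hypothetical stand-ins, and only the shape of the calls (a lazily created writer per (partition, bucket), whose restore re-reads and re-deserializes manifest files) reflects what we observed.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class WriterRestoreSketch {

    // Hypothetical stand-in for the real writer container.
    record WriterContainer(List<String> restoredFiles) {}

    private final Map<String, WriterContainer> writers = new HashMap<>();

    // Analogous to createWriterContainer: invoked once per (partition, bucket)
    // the first time a record for that key arrives.
    WriterContainer getOrCreateWriter(String partition, int bucket) {
        return writers.computeIfAbsent(
                partition + "/" + bucket,
                key -> new WriterContainer(restoreFiles(partition, bucket)));
    }

    // Analogous to restoreFiles: scans manifests to find the existing data files
    // of this (partition, bucket). Each call re-fetches manifest files from the
    // object store and deserializes them, even though most of these manifests
    // were already read while restoring other partitions/buckets.
    List<String> restoreFiles(String partition, int bucket) {
        List<String> manifestPaths = listManifestsOfLatestSnapshot();          // API calls
        return manifestPaths.stream()
                .flatMap(path -> readAndDeserializeManifest(path).stream())    // API call + decode per manifest
                .filter(entry -> entry.startsWith(partition + "/" + bucket + "/"))
                .toList();
    }

    // Placeholders for the expensive I/O; bodies omitted in this sketch.
    List<String> listManifestsOfLatestSnapshot() { return List.of(); }
    List<String> readAndDeserializeManifest(String manifestPath) { return List.of(); }
}
```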
What doesn't meet your expectations?
- Reduce cloud API requests so that the same manifest files are not fetched over and over
- Improve initialization performance for writers
Anything else?
We have been testing a patch that pre-fetches the entire manifest and caches it in memory, to avoid duplicate API requests for the same object; a minimal sketch of the idea is shown after the numbers below.
Caching also avoids repeated decompression/deserialization of manifest files.
For a batch compaction job:
- Release 1.3: ~3h run time, ~300k API requests
- With patch: ~5min run time, ~10k API requests => a ~30x improvement on both metrics
Draft PR for discussion: #7031
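For discussion, a minimal sketch of the caching idea: read and deserialize each manifest file at most once per job, keyed by its path, and let all (partition, bucket) restores share the in-memory result. This is not the PR's actual code; ManifestCacheSketch and fetchAndDeserialize are hypothetical names, and a real implementation would likely also need a size bound or per-snapshot scoping for the cache.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ManifestCacheSketch {

    // manifest path -> deserialized entries (placeholder element type for the sketch)
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();

    List<String> readManifest(String manifestPath) {
        // Only the first caller pays the object-store request and the
        // decompression/deserialization cost; later callers hit memory.
        return cache.computeIfAbsent(manifestPath, this::fetchAndDeserialize);
    }

    // Placeholder for the actual manifest read; body omitted in this sketch.
    private List<String> fetchAndDeserialize(String manifestPath) {
        return List.of();
    }
}
```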
Are you willing to submit a PR?
- I'm willing to submit a PR!