
feat(hudi): enable concurrent hudi s3 footer collection #815

Open

parisni wants to merge 2 commits into apache:main from leboncoin:pr-optim-hudi-s3

Conversation

@parisni
Contributor

@parisni parisni commented Mar 6, 2026

Improve Hudi sync performance when Hudi metadata column stats are not enabled and XTable falls back to parquet footer reads on S3.

Brief change log

  • Parallelized computeColumnStatsFromParquetFooters in HudiFileStatsExtractor using Stream.parallel().
  • Added regression test columnStatsWithoutMetadataTable_parallelFooterReadsAreThreadSafe to validate stability under parallel execution.
  • Added runtime tuning note for ForkJoinPool parallelism:
    -Djava.util.concurrent.ForkJoinPool.common.parallelism=16.
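A minimal, self-contained sketch of the pattern described above: a parallel stream runs on the common ForkJoinPool, whose width is read at JVM startup from the `-Djava.util.concurrent.ForkJoinPool.common.parallelism` property. The simulated "footer read" and file names below are illustrative stand-ins, not the actual HudiFileStatsExtractor code.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelFooterSketch {
    public static void main(String[] args) {
        // Width of the shared common pool; tunable at startup with
        // -Djava.util.concurrent.ForkJoinPool.common.parallelism=16
        System.out.println("parallelism = " + ForkJoinPool.commonPool().getParallelism());

        // Stand-in for per-file parquet footer reads: Stream.parallel()
        // fans the map step out across the common pool's workers.
        List<String> stats = IntStream.range(0, 8)
            .boxed()
            .parallel()
            .map(i -> "file-" + i + ":stats") // simulated footer read
            .collect(Collectors.toList());
        System.out.println(stats.size());
    }
}
```

Note that the property only takes effect if set before the common pool is first used, which is why the PR suggests passing it via `MAVEN_OPTS` rather than setting it at runtime.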

Impact

  • In our XTable runs (fallback-to-S3 footer path), this improved processing time by about 4x.

Verify this pull request

This change added tests and can be verified as follows:

  • Run:
    • mvn -pl xtable-core -Dtest=TestHudiFileStatsExtractor test -DskipITs
  • Optional tuned run:
    • MAVEN_OPTS="-Djava.util.concurrent.ForkJoinPool.common.parallelism=16" mvn -pl xtable-core -Dtest=TestHudiFileStatsExtractor test -DskipITs
  • Result:
    • TestHudiFileStatsExtractor passed (3 tests, 0 failures).

// Use the common ForkJoinPool for parquet footer reads; parallelism can be tuned via
// -Djava.util.concurrent.ForkJoinPool.common.parallelism.
return files
    .parallel()
Contributor

Stream.parallel() uses the common ForkJoinPool, which is shared across the entire JVM. Since readRangeFromParquetMetadata is a blocking I/O call (especially on S3), this can starve other parallel streams and CompletableFuture.supplyAsync calls that share the same pool. ForkJoinPool is designed for CPU-bound divide-and-conquer work — for blocking I/O, a dedicated ExecutorService (fixed thread pool) is the right tool. Consider injecting an executor and using CompletableFuture.supplyAsync:

List<CompletableFuture<InternalDataFile>> futures = files
    .map(file -> CompletableFuture.supplyAsync(() -> {
        HudiFileStats stats = computeColumnStatsForFile(new Path(file.getPhysicalPath()), nameFieldMap);
        return file.toBuilder().columnStats(stats.getColumnStats()).recordCount(stats.getRowCount()).build();
    }, footerReadExecutor))
    .collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
return futures.stream().map(CompletableFuture::join);

The executor can be a constructor parameter (e.g., Executors.newFixedThreadPool(n)) so callers control threading.
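A self-contained sketch of that executor-injection pattern, under stated assumptions: the class, the `readFooter` method, and the string "files" below are illustrative stand-ins, not the actual XTable types or API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ExecutorInjectionSketch {
    private final ExecutorService footerReadExecutor;

    // The executor is a constructor parameter, so callers control threading.
    ExecutorInjectionSketch(ExecutorService footerReadExecutor) {
        this.footerReadExecutor = footerReadExecutor;
    }

    // Stand-in for a blocking S3 footer read.
    private String readFooter(String path) {
        return path + ":footer";
    }

    Stream<String> addStats(Stream<String> files) {
        // Fan blocking reads out onto the dedicated pool, not the common pool.
        List<CompletableFuture<String>> futures = files
            .map(f -> CompletableFuture.supplyAsync(() -> readFooter(f), footerReadExecutor))
            .collect(Collectors.toList());
        // Wait for every read, then return results in the original order.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return futures.stream().map(CompletableFuture::join);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        ExecutorInjectionSketch sketch = new ExecutorInjectionSketch(pool);
        List<String> out = sketch.addStats(Stream.of("a", "b", "c"))
            .collect(Collectors.toList());
        System.out.println(out); // prints [a:footer, b:footer, c:footer]
        pool.shutdown();
    }
}
```

Because the futures list preserves stream order, the output stream keeps the same file order as the input even though reads complete out of order.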

Also — is UTILS (ParquetUtils, the static singleton on line 73) thread-safe? readRangeFromParquetMetadata is now called concurrently from multiple threads on the same instance. This should be verified before relying on it here.

fileStatsExtractor.addStatsToFiles(null, inputFiles.stream(), schema).collect(Collectors.toList());

assertEquals(200, output.size());
assertTrue(output.stream().allMatch(fileWithStats -> fileWithStats.getRecordCount() == 2));
Contributor

The parallel test only checks getRecordCount() == 2 and getColumnStats().size() == 9 — counts only, not actual values. Thread-safety bugs (e.g., corrupted ranges from shared state) could produce 9 stats with wrong min/max/null counts and this test wouldn't catch it. Consider asserting actual stat values on at least one output file. The existing validateOutput method does this thoroughly but currently requires output.size() == 1 — either extract the value assertions into a separate helper or call it on output.subList(0, 1) if validateOutput is refactored to take a single InternalDataFile.
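One possible shape for such a helper, sketched with stand-in types: the column name `long_field` and the expected min value `10L` are illustrative, not taken from the actual test data, and a plain map stands in for the real column-stats model.

```java
import java.util.Map;

public class StatValueAssertions {
    // Sketch of extracting per-file value assertions into a reusable helper,
    // so the parallel test can check actual stat values, not just counts.
    static void assertFileStats(long recordCount, Map<String, Long> minValues) {
        if (recordCount != 2) {
            throw new AssertionError("unexpected record count: " + recordCount);
        }
        // Assert a concrete value so corrupted ranges from shared state surface.
        Long min = minValues.get("long_field"); // illustrative field name
        if (!Long.valueOf(10L).equals(min)) {   // illustrative expected min
            throw new AssertionError("wrong min for long_field: " + min);
        }
    }

    public static void main(String[] args) {
        assertFileStats(2, Map.of("long_field", 10L));
        System.out.println("stats validated");
    }
}
```

Such a helper could then be called on every file in the parallel test's output, or on a single sampled file, without the existing `validateOutput`'s `output.size() == 1` constraint.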

.collect(Collectors.toList());

List<InternalDataFile> output =
fileStatsExtractor.addStatsToFiles(null, inputFiles.stream(), schema).collect(Collectors.toList());
Contributor

nit: Line exceeds style limit. Split:

List<InternalDataFile> output =
    fileStatsExtractor
        .addStatsToFiles(null, inputFiles.stream(), schema)
        .collect(Collectors.toList());

@vinishjail97
Contributor

Thanks for contributing this optimization @parisni! Added a few comments on the parallelism approach and on improving the test assertions. Let me know what you think.
