@@ -104,7 +104,11 @@ public Stream<InternalDataFile> addStatsToFiles(

   private Stream<InternalDataFile> computeColumnStatsFromParquetFooters(
       Stream<InternalDataFile> files, Map<String, InternalField> nameFieldMap) {
-    return files.map(
+    // Use the common ForkJoinPool for parquet footer reads; parallelism can be tuned via
+    // -Djava.util.concurrent.ForkJoinPool.common.parallelism.
+    return files
+        .parallel()
Contributor:
Stream.parallel() uses the common ForkJoinPool, which is shared across the entire JVM. Since readRangeFromParquetMetadata is a blocking I/O call (especially on S3), this can starve other parallel streams and CompletableFuture.supplyAsync calls that share the same pool. ForkJoinPool is designed for CPU-bound divide-and-conquer work — for blocking I/O, a dedicated ExecutorService (fixed thread pool) is the right tool. Consider injecting an executor and using CompletableFuture.supplyAsync:

List<CompletableFuture<InternalDataFile>> futures = files
    .map(file -> CompletableFuture.supplyAsync(() -> {
        HudiFileStats stats = computeColumnStatsForFile(new Path(file.getPhysicalPath()), nameFieldMap);
        return file.toBuilder().columnStats(stats.getColumnStats()).recordCount(stats.getRowCount()).build();
    }, footerReadExecutor))
    .collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
return futures.stream().map(CompletableFuture::join);

The executor can be a constructor parameter (e.g., Executors.newFixedThreadPool(n)) so callers control threading.

Also — is UTILS (ParquetUtils, the static singleton on line 73) thread-safe? readRangeFromParquetMetadata is now called concurrently from multiple threads on the same instance. This should be verified before relying on it here.
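
To make the constructor-injection idea concrete, here is a self-contained sketch of the suggested pattern. The class name `FooterReadSketch` and the simulated blocking `readFooter` call are hypothetical stand-ins for the real `computeColumnStatsForFile` footer read; only plain JDK types are used:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FooterReadSketch {
    private final ExecutorService footerReadExecutor;

    // Callers control threading by passing their own executor,
    // instead of implicitly sharing the JVM-wide common ForkJoinPool.
    public FooterReadSketch(ExecutorService footerReadExecutor) {
        this.footerReadExecutor = footerReadExecutor;
    }

    // Hypothetical stand-in for a blocking footer read (e.g. an S3 range read).
    private long readFooter(String path) {
        try {
            Thread.sleep(10); // simulate blocking I/O latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return path.length(); // stand-in for a record count from the footer
    }

    // Submit all reads to the dedicated pool, then join the results in order.
    public List<Long> readAll(Stream<String> paths) {
        List<CompletableFuture<Long>> futures =
            paths.map(p -> CompletableFuture.supplyAsync(() -> readFooter(p), footerReadExecutor))
                 .collect(Collectors.toList());
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            FooterReadSketch sketch = new FooterReadSketch(pool);
            List<Long> counts = sketch.readAll(Stream.of("a.parquet", "bb.parquet"));
            System.out.println(counts); // [9, 10]
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the pool is a constructor argument, a caller under heavy concurrent load can size it independently of CPU count, and a test can pass a single-threaded executor to get deterministic behavior.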

+        .map(
             file -> {
               HudiFileStats fileStats =
                   computeColumnStatsForFile(new Path(file.getPhysicalPath()), nameFieldMap);
@@ -39,6 +39,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;

import org.apache.avro.Conversions;
@@ -199,6 +200,50 @@ void columnStatsWithoutMetadataTable(@TempDir Path tempDir) throws IOException {
validateOutput(output);
}

@Test
void columnStatsWithoutMetadataTable_parallelFooterReadsAreThreadSafe(@TempDir Path tempDir)
throws IOException {
Path file = tempDir.resolve("tmp.parquet");
GenericData genericData = GenericData.get();
genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
try (ParquetWriter<GenericRecord> writer =
AvroParquetWriter.<GenericRecord>builder(
HadoopOutputFile.fromPath(
new org.apache.hadoop.fs.Path(file.toUri()), configuration))
.withSchema(AVRO_SCHEMA)
.withDataModel(genericData)
.build()) {
for (GenericRecord record : getRecords()) {
writer.write(record);
}
}

HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
when(mockMetaClient.getHadoopConf()).thenReturn(configuration);
HudiFileStatsExtractor fileStatsExtractor = new HudiFileStatsExtractor(mockMetaClient);

List<InternalDataFile> inputFiles =
IntStream.range(0, 200)
.mapToObj(
i ->
InternalDataFile.builder()
.physicalPath(file.toString())
.columnStats(Collections.emptyList())
.fileFormat(FileFormat.APACHE_PARQUET)
.lastModified(1234L)
.fileSizeBytes(4321L)
.recordCount(0)
.build())
.collect(Collectors.toList());

List<InternalDataFile> output =
fileStatsExtractor.addStatsToFiles(null, inputFiles.stream(), schema).collect(Collectors.toList());
Contributor:
nit: Line exceeds style limit. Split:

List<InternalDataFile> output =
    fileStatsExtractor
        .addStatsToFiles(null, inputFiles.stream(), schema)
        .collect(Collectors.toList());


assertEquals(200, output.size());
assertTrue(output.stream().allMatch(fileWithStats -> fileWithStats.getRecordCount() == 2));
Contributor:
The parallel test only checks getRecordCount() == 2 and getColumnStats().size() == 9 — counts only, not actual values. Thread-safety bugs (e.g., corrupted ranges from shared state) could produce 9 stats with wrong min/max/null counts and this test wouldn't catch it. Consider asserting actual stat values on at least one output file. The existing validateOutput method does this thoroughly but currently requires output.size() == 1 — either extract the value assertions into a separate helper or call it on output.subList(0, 1) if validateOutput is refactored to take a single InternalDataFile.
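
One way to follow this, sketched with a hypothetical `FileStats` stand-in class since the real test would assert on `InternalDataFile` fields (min/max/null counts), is to extract a per-file helper and run it over every parallel output:

```java
import java.util.Arrays;
import java.util.List;

// Minimal hypothetical stand-ins so the pattern is self-contained; the real test
// would use InternalDataFile and the existing JUnit assertEquals calls.
public class ValidateHelperSketch {
    static class FileStats {
        final long recordCount;
        final List<String> columnStats;
        FileStats(long recordCount, List<String> columnStats) {
            this.recordCount = recordCount;
            this.columnStats = columnStats;
        }
    }

    // Extracted per-file assertions: checks values, not just counts, so a
    // thread-safety bug that corrupts stat contents would still fail the test.
    static void validateFile(FileStats f) {
        if (f.recordCount != 2) throw new AssertionError("recordCount=" + f.recordCount);
        if (f.columnStats.size() != 9) throw new AssertionError("stats=" + f.columnStats.size());
        // ...plus per-column min/max/null-count assertions in the real test...
    }

    public static void main(String[] args) {
        List<String> nineStats =
            Arrays.asList("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9");
        List<FileStats> output =
            Arrays.asList(new FileStats(2, nineStats), new FileStats(2, nineStats));
        // Every output file gets the full value check, not just the two counts.
        output.forEach(ValidateHelperSketch::validateFile);
        System.out.println("all files validated");
    }
}
```

Since every file in the parallel test reads the same parquet file, running the full value check on each output (rather than one representative) costs nothing extra and catches per-thread corruption.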

assertTrue(output.stream().allMatch(fileWithStats -> fileWithStats.getColumnStats().size() == 9));
}

private void validateOutput(List<InternalDataFile> output) {
assertEquals(1, output.size());
InternalDataFile fileWithStats = output.get(0);