diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java
index 5a0b70cb9..14575e4ef 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java
@@ -104,7 +104,11 @@ public Stream<InternalDataFile> addStatsToFiles(
   private Stream<InternalDataFile> computeColumnStatsFromParquetFooters(
       Stream<InternalDataFile> files, Map<String, InternalField> nameFieldMap) {
-    return files.map(
+    // Use the common ForkJoinPool for parquet footer reads; parallelism can be tuned via
+    // -Djava.util.concurrent.ForkJoinPool.common.parallelism.
+    return files
+        .parallel()
+        .map(
         file -> {
           HudiFileStats fileStats =
               computeColumnStatsForFile(new Path(file.getPhysicalPath()), nameFieldMap);
diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java
index a18bb743d..70667299b 100644
--- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java
@@ -39,6 +39,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import org.apache.avro.Conversions;
@@ -199,6 +200,50 @@ void columnStatsWithoutMetadataTable(@TempDir Path tempDir) throws IOException {
     validateOutput(output);
   }
 
+  @Test
+  void columnStatsWithoutMetadataTable_parallelFooterReadsAreThreadSafe(@TempDir Path tempDir)
+      throws IOException {
+    Path file = tempDir.resolve("tmp.parquet");
+    GenericData genericData = GenericData.get();
+    genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
+    try (ParquetWriter<GenericRecord> writer =
+        AvroParquetWriter.<GenericRecord>builder(
+                HadoopOutputFile.fromPath(
+                    new org.apache.hadoop.fs.Path(file.toUri()), configuration))
+            .withSchema(AVRO_SCHEMA)
+            .withDataModel(genericData)
+            .build()) {
+      for (GenericRecord record : getRecords()) {
+        writer.write(record);
+      }
+    }
+
+    HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+    when(mockMetaClient.getHadoopConf()).thenReturn(configuration);
+    HudiFileStatsExtractor fileStatsExtractor = new HudiFileStatsExtractor(mockMetaClient);
+
+    List<InternalDataFile> inputFiles =
+        IntStream.range(0, 200)
+            .mapToObj(
+                i ->
+                    InternalDataFile.builder()
+                        .physicalPath(file.toString())
+                        .columnStats(Collections.emptyList())
+                        .fileFormat(FileFormat.APACHE_PARQUET)
+                        .lastModified(1234L)
+                        .fileSizeBytes(4321L)
+                        .recordCount(0)
+                        .build())
+            .collect(Collectors.toList());
+
+    List<InternalDataFile> output =
+        fileStatsExtractor.addStatsToFiles(null, inputFiles.stream(), schema).collect(Collectors.toList());
+
+    assertEquals(200, output.size());
+    assertTrue(output.stream().allMatch(fileWithStats -> fileWithStats.getRecordCount() == 2));
+    assertTrue(output.stream().allMatch(fileWithStats -> fileWithStats.getColumnStats().size() == 9));
+  }
+
 private void validateOutput(List<InternalDataFile> output) {
     assertEquals(1, output.size());
     InternalDataFile fileWithStats = output.get(0);