diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java index 7fc8ea7b7..323c2cd5e 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java @@ -70,6 +70,8 @@ public class DeltaConversionSource implements ConversionSource { @Builder.Default private final DeltaTableExtractor tableExtractor = DeltaTableExtractor.builder().build(); + @Builder.Default private final boolean skipColumnStats = false; + private Optional deltaIncrementalChangesState = Optional.empty(); private final SparkSession sparkSession; @@ -123,7 +125,7 @@ public TableChange getTableChangeForCommit(Long versionNumber) { fileFormat, tableAtVersion.getPartitioningFields(), tableAtVersion.getReadSchema().getAllFields(), - true, + !skipColumnStats, DeltaPartitionExtractor.getInstance(), DeltaStatsExtractor.getInstance()); addedFiles.put(dataFile.getPhysicalPath(), dataFile); @@ -223,7 +225,8 @@ private void resetState(long versionToStartFrom) { } private List getInternalDataFiles(Snapshot snapshot, InternalSchema schema) { - try (DataFileIterator fileIterator = dataFileExtractor.iterator(snapshot, schema)) { + try (DataFileIterator fileIterator = + dataFileExtractor.iterator(snapshot, schema, !skipColumnStats)) { List dataFiles = new ArrayList<>(); fileIterator.forEachRemaining(dataFiles::add); return PartitionFileGroup.fromFiles(dataFiles); diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java index 045e2b724..926dd8617 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java @@ -31,12 +31,15 @@ public class DeltaConversionSourceProvider extends ConversionSourceProvider fields; diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java index 71cebf9ae..177bc43a2 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java @@ -130,6 +130,12 @@ private Stream createAddFileAction( private String getColumnStats( InternalSchema schema, long recordCount, List columnStats) { + // In skip-column-stats mode source files may not have row count/column stats. + // Return null only when row count is unknown (negative sentinel). + // Explicit rowCount=0 should be persisted as numRecords=0. + if (recordCount < 0 && (columnStats == null || columnStats.isEmpty())) { + return null; + } try { return deltaStatsExtractor.convertStatsToDeltaFormat(schema, recordCount, columnStats); } catch (JsonProcessingException e) { diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSourceConfig.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSourceConfig.java new file mode 100644 index 000000000..5a247503d --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSourceConfig.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.delta; + +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; + +/** Configuration keys for Delta source format. */ +public final class DeltaSourceConfig { + public static final String SKIP_COLUMN_STATS_CONFIG = "xtable.source.skip_column_stats"; + + private DeltaSourceConfig() {} + + public static boolean getSkipColumnStats(Properties properties, Configuration configuration) { + if (properties != null) { + String propertyValue = properties.getProperty(SKIP_COLUMN_STATS_CONFIG); + if (propertyValue != null) { + return Boolean.parseBoolean(propertyValue); + } + } + if (configuration == null) { + return false; + } + String configValue = configuration.get(SKIP_COLUMN_STATS_CONFIG); + return configValue != null && Boolean.parseBoolean(configValue); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java index 00faa97d3..763ac0767 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java @@ -60,6 +60,13 @@ public class HudiConversionSource implements ConversionSource { public HudiConversionSource( HoodieTableMetaClient metaClient, PathBasedPartitionSpecExtractor sourcePartitionSpecExtractor) { + this(metaClient, sourcePartitionSpecExtractor, false); + } + + public HudiConversionSource( + HoodieTableMetaClient metaClient, + PathBasedPartitionSpecExtractor sourcePartitionSpecExtractor, + boolean skipColumnStats) { this.metaClient = metaClient; this.tableExtractor = new HudiTableExtractor(new HudiSchemaExtractor(), sourcePartitionSpecExtractor); @@ -68,7 +75,8 @@ public HudiConversionSource( metaClient, new PathBasedPartitionValuesExtractor( sourcePartitionSpecExtractor.getPathToPartitionFieldFormat()), - new HudiFileStatsExtractor(metaClient)); + new HudiFileStatsExtractor(metaClient), + skipColumnStats); } @Override diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java index aad7e0a16..3eabb9b7b 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java @@ -46,7 +46,9 @@ public HudiConversionSource getConversionSourceInstance(SourceTable sourceTable) final PathBasedPartitionSpecExtractor sourcePartitionSpecExtractor = HudiSourceConfig.fromProperties(sourceTable.getAdditionalProperties()) .loadSourcePartitionSpecExtractor(); + boolean skipColumnStats = + HudiSourceConfig.getSkipColumnStats(sourceTable.getAdditionalProperties(), hadoopConf); - return new HudiConversionSource(metaClient, sourcePartitionSpecExtractor); + return new HudiConversionSource(metaClient, sourcePartitionSpecExtractor, skipColumnStats); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java index 5e17b389f..1951ec407 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java @@ -77,6 +77,7 @@ public class HudiDataFileExtractor implements AutoCloseable { private final HoodieEngineContext engineContext; private final PathBasedPartitionValuesExtractor partitionValuesExtractor; private final HudiFileStatsExtractor fileStatsExtractor; + private final boolean skipColumnStats; private final HoodieMetadataConfig metadataConfig; private final FileSystemViewManager fileSystemViewManager; private final Path basePath; @@ -85,6 +86,14 @@ public HudiDataFileExtractor( HoodieTableMetaClient metaClient, PathBasedPartitionValuesExtractor hudiPartitionValuesExtractor, HudiFileStatsExtractor hudiFileStatsExtractor) { + this(metaClient, hudiPartitionValuesExtractor, hudiFileStatsExtractor, false); + } + + public HudiDataFileExtractor( + HoodieTableMetaClient metaClient, + PathBasedPartitionValuesExtractor hudiPartitionValuesExtractor, + HudiFileStatsExtractor hudiFileStatsExtractor, + boolean skipColumnStats) { this.engineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); metadataConfig = HoodieMetadataConfig.newBuilder() @@ -107,6 +116,7 @@ public HudiDataFileExtractor( this.metaClient = metaClient; this.partitionValuesExtractor = hudiPartitionValuesExtractor; this.fileStatsExtractor = hudiFileStatsExtractor; + this.skipColumnStats = skipColumnStats; } public List getFilesCurrentState(InternalTable table) { @@ -132,11 +142,15 @@ public InternalFilesDiff getDiffForCommit( getAddedAndRemovedPartitionInfo( visibleTimeline, instant, fsView, hoodieInstantForDiff, table.getPartitioningFields()); - Stream filesAddedWithoutStats = allInfo.getAdded().stream(); List filesAdded = - fileStatsExtractor - .addStatsToFiles(tableMetadata, filesAddedWithoutStats, table.getReadSchema()) - .collect(Collectors.toList()); + skipColumnStats + ? fileStatsExtractor + .addRecordCountToFiles( + tableMetadata, allInfo.getAdded().stream(), table.getReadSchema()) + .collect(Collectors.toList()) + : fileStatsExtractor + .addStatsToFiles(tableMetadata, allInfo.getAdded().stream(), table.getReadSchema()) + .collect(Collectors.toList()); List filesRemoved = allInfo.getRemoved(); return InternalFilesDiff.builder().filesAdded(filesAdded).filesRemoved(filesRemoved).build(); @@ -359,6 +373,12 @@ private List getInternalDataFilesForPartitions( .getLatestBaseFiles(partitionPath) .map(baseFile -> buildFileWithoutStats(partitionValues, baseFile)); }); + if (skipColumnStats) { + Stream filesWithRecordCount = + fileStatsExtractor.addRecordCountToFiles( + tableMetadata, filesWithoutStats, table.getReadSchema()); + return PartitionFileGroup.fromFiles(filesWithRecordCount); + } Stream files = fileStatsExtractor.addStatsToFiles(tableMetadata, filesWithoutStats, table.getReadSchema()); return PartitionFileGroup.fromFiles(files); diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java index 5a0b70cb9..9dd47bf07 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java @@ -102,6 +102,30 @@ public Stream addStatsToFiles( : computeColumnStatsFromParquetFooters(files, nameFieldMap); } + /** + * Adds record count information only. + * + *

This avoids materializing full min/max/null column stats in memory while still writing + * correct file row counts (for example Delta `numRecords`). + */ + public Stream addRecordCountToFiles( + HoodieTableMetadata metadataTable, Stream files, InternalSchema schema) { + boolean useMetadataTableColStats = + metadataTable != null + && metaClient + .getTableConfig() + .isMetadataPartitionAvailable(MetadataPartitionType.COLUMN_STATS); + final Map nameFieldMap = + schema.getAllFields().stream() + .collect( + Collectors.toMap( + field -> getFieldNameForStats(field, useMetadataTableColStats), + Function.identity())); + return useMetadataTableColStats + ? computeRecordCountFromMetadataTable(metadataTable, files, nameFieldMap) + : computeRecordCountFromParquetFooters(files); + } + private Stream computeColumnStatsFromParquetFooters( Stream files, Map nameFieldMap) { return files.map( @@ -115,6 +139,16 @@ private Stream computeColumnStatsFromParquetFooters( }); } + private Stream computeRecordCountFromParquetFooters( + Stream files) { + return files.map( + file -> + file.toBuilder() + .recordCount( + UTILS.getRowCount(metaClient.getHadoopConf(), new Path(file.getPhysicalPath()))) + .build()); + } + private Pair getPartitionAndFileName(String path) { Path filePath = new CachingPath(path); String partitionPath = HudiPathUtils.getPartitionPath(metaClient.getBasePathV2(), filePath); @@ -167,6 +201,37 @@ private Stream computeColumnStatsFromMetadataTable( }); } + private Stream computeRecordCountFromMetadataTable( + HoodieTableMetadata metadataTable, + Stream files, + Map nameFieldMap) { + if (nameFieldMap.isEmpty()) { + return files.map(file -> file.toBuilder().recordCount(0L).build()); + } + Map, InternalDataFile> filePathsToDataFile = + files.collect( + Collectors.toMap( + file -> getPartitionAndFileName(file.getPhysicalPath()), Function.identity())); + if (filePathsToDataFile.isEmpty()) { + return Stream.empty(); + } + // Query a single column to fetch per-file valueCount, which is sufficient for row count. + String anyField = nameFieldMap.keySet().iterator().next(); + Map, HoodieMetadataColumnStats> statsByFile = + metadataTable.getColumnStats(new ArrayList<>(filePathsToDataFile.keySet()), anyField); + return filePathsToDataFile.entrySet().stream() + .map( + entry -> { + HoodieMetadataColumnStats stats = statsByFile.get(entry.getKey()); + long recordCount = + stats != null + ? stats.getValueCount() + : UTILS.getRowCount( + metaClient.getHadoopConf(), new Path(entry.getValue().getPhysicalPath())); + return entry.getValue().toBuilder().recordCount(recordCount).build(); + }); + } + private Optional getMaxFromColumnStats(List columnStats) { return columnStats.stream() .filter(entry -> entry.getField().getParentPath() == null) diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java index 7732c3820..6345c273f 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java @@ -26,6 +26,8 @@ import lombok.Value; +import org.apache.hadoop.conf.Configuration; + import com.google.common.base.Preconditions; import org.apache.xtable.model.schema.PartitionFieldSpec; @@ -39,6 +41,7 @@ public class HudiSourceConfig { "xtable.hudi.source.partition_spec_extractor_class"; public static final String PARTITION_FIELD_SPEC_CONFIG = "xtable.hudi.source.partition_field_spec_config"; + public static final String SKIP_COLUMN_STATS_CONFIG = "xtable.source.skip_column_stats"; String partitionSpecExtractorClass; List partitionFieldSpecs; @@ -84,4 +87,23 @@ public PathBasedPartitionSpecExtractor loadSourcePartitionSpecExtractor() { return ReflectionUtils.createInstanceOfClass( partitionSpecExtractorClass, this.getPartitionFieldSpecs()); } + + public static boolean getSkipColumnStats(Properties properties, Configuration configuration) { + String propertyValue = getPropertyOrNull(properties, SKIP_COLUMN_STATS_CONFIG); + if (propertyValue != null) { + return Boolean.parseBoolean(propertyValue); + } + if (configuration == null) { + return false; + } + String configValue = configuration.get(SKIP_COLUMN_STATS_CONFIG); + return configValue != null && Boolean.parseBoolean(configValue); + } + + private static String getPropertyOrNull(Properties properties, String key) { + if (properties == null) { + return null; + } + return properties.getProperty(key); + } } diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java index 325b50e9b..73ea161a8 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java @@ -87,6 +87,8 @@ public class IcebergConversionSource implements ConversionSource { private final IcebergDataFileExtractor dataFileExtractor = IcebergDataFileExtractor.builder().build(); + @Builder.Default private final boolean skipColumnStats = false; + private Table initSourceTable() { IcebergTableManager tableManager = IcebergTableManager.of(hadoopConf); String[] namespace = sourceTableConfig.getNamespace(); @@ -147,6 +149,9 @@ public InternalTable getTable(Snapshot snapshot) { public InternalTable getCurrentTable() { Table iceTable = getSourceTable(); Snapshot currentSnapshot = iceTable.currentSnapshot(); + if (currentSnapshot == null) { + throw new ReadException("Unable to read latest snapshot from Iceberg source table"); + } return getTable(currentSnapshot); } @@ -166,11 +171,12 @@ public InternalSnapshot getCurrentSnapshot() { .sourceIdentifier("0") .build(); } - InternalTable irTable = getTable(currentSnapshot); - TableScan scan = - iceTable.newScan().useSnapshot(currentSnapshot.snapshotId()).includeColumnStats(); + TableScan scan = iceTable.newScan().useSnapshot(currentSnapshot.snapshotId()); + if (!skipColumnStats) { + scan = scan.includeColumnStats(); + } PartitionSpec partitionSpec = iceTable.spec(); List partitionedDataFiles; try (CloseableIterable files = scan.planFiles()) { @@ -197,7 +203,8 @@ private InternalDataFile fromIceberg( DataFile file, PartitionSpec partitionSpec, InternalTable internalTable) { List partitionValues = partitionConverter.toXTable(internalTable, file.partition(), partitionSpec); - return dataFileExtractor.fromIceberg(file, partitionValues, internalTable.getReadSchema()); + return dataFileExtractor.fromIceberg( + file, partitionValues, internalTable.getReadSchema(), !skipColumnStats); } @Override diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java index 449ebe5d3..6e52a187b 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java @@ -27,9 +27,13 @@ public class IcebergConversionSourceProvider extends ConversionSourceProvider { @Override public IcebergConversionSource getConversionSourceInstance(SourceTable sourceTableConfig) { + boolean skipColumnStats = + IcebergSourceConfig.getSkipColumnStats( + sourceTableConfig.getAdditionalProperties(), hadoopConf); return IcebergConversionSource.builder() .sourceTableConfig(sourceTableConfig) .hadoopConf(hadoopConf) + .skipColumnStats(skipColumnStats) .build(); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileExtractor.java index 1b9d70f39..869cc27a9 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileExtractor.java @@ -53,7 +53,7 @@ InternalDataFile fromIceberg( return fromIceberg(dataFile, partitionValues, schema, true); } - private InternalDataFile fromIceberg( + InternalDataFile fromIceberg( DataFile dataFile, List partitionValues, InternalSchema schema, diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java index 56948cd1b..fdbd78a88 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java @@ -122,10 +122,13 @@ private DataFile getDataFile( DataFiles.builder(partitionSpec) .withPath(dataFile.getPhysicalPath()) .withFileSizeInBytes(dataFile.getFileSizeBytes()) - .withMetrics( - columnStatsConverter.toIceberg( - schema, dataFile.getRecordCount(), dataFile.getColumnStats())) .withFormat(convertFileFormat(dataFile.getFileFormat())); + // Iceberg data files always require a record count. Persist explicit zero counts as metrics. + if (dataFile.getRecordCount() >= 0 || !dataFile.getColumnStats().isEmpty()) { + builder.withMetrics( + columnStatsConverter.toIceberg( + schema, dataFile.getRecordCount(), dataFile.getColumnStats())); + } if (partitionSpec.isPartitioned()) { builder.withPartition( partitionValueConverter.toIceberg(partitionSpec, schema, dataFile.getPartitionValues())); diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSourceConfig.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSourceConfig.java new file mode 100644 index 000000000..da1f47c3b --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSourceConfig.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.iceberg; + +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; + +/** Configuration keys for Iceberg source format. */ +public final class IcebergSourceConfig { + public static final String SKIP_COLUMN_STATS_CONFIG = "xtable.source.skip_column_stats"; + + private IcebergSourceConfig() {} + + public static boolean getSkipColumnStats(Properties properties, Configuration configuration) { + if (properties != null) { + String propertyValue = properties.getProperty(SKIP_COLUMN_STATS_CONFIG); + if (propertyValue != null) { + return Boolean.parseBoolean(propertyValue); + } + } + if (configuration == null) { + return false; + } + String configValue = configuration.get(SKIP_COLUMN_STATS_CONFIG); + return configValue != null && Boolean.parseBoolean(configValue); + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index 2da3078b6..af4e61d12 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -28,6 +28,7 @@ import static org.apache.xtable.model.storage.TableFormat.PARQUET; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.net.URI; import java.nio.ByteBuffer; @@ -103,9 +104,11 @@ import org.apache.xtable.hudi.HudiTestUtil; import org.apache.xtable.iceberg.IcebergConversionSourceProvider; import org.apache.xtable.iceberg.TestIcebergDataHelper; +import org.apache.xtable.model.storage.InternalDataFile; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.model.sync.SyncMode; import org.apache.xtable.paimon.PaimonConversionSourceProvider; +import org.apache.xtable.spi.extractor.ConversionSource; public class ITConversionController { @TempDir public static Path tempDir; @@ -174,6 +177,18 @@ private static Stream testCasesWithSyncModes() { return Stream.of(Arguments.of(SyncMode.INCREMENTAL), Arguments.of(SyncMode.FULL)); } + private static Stream sourceSkipColumnStatsAndSyncModes() { + List arguments = new ArrayList<>(); + for (String sourceFormat : Arrays.asList(HUDI, DELTA, ICEBERG)) { + for (SyncMode syncMode : SyncMode.values()) { + for (boolean skipColumnStats : new boolean[] {true, false}) { + arguments.add(Arguments.of(sourceFormat, syncMode, skipColumnStats)); + } + } + } + return arguments.stream(); + } + private ConversionSourceProvider getConversionSourceProvider(String sourceTableFormat) { switch (sourceTableFormat.toUpperCase()) { case HUDI: @@ -503,6 +518,60 @@ public void testTimeTravelQueries(String sourceTableFormat) throws Exception { } } + @ParameterizedTest + @MethodSource("sourceSkipColumnStatsAndSyncModes") + public void testSkipColumnStatsAcrossSources( + String sourceTableFormat, SyncMode syncMode, boolean skipColumnStats) throws Exception { + String tableName = getTableName(); + ConversionSourceProvider conversionSourceProvider = + getConversionSourceProvider(sourceTableFormat); + List targetTableFormats = getOtherFormats(sourceTableFormat); + try (GenericTable table = + GenericTable.getInstance(tableName, tempDir, sparkSession, jsc, sourceTableFormat, false)) { + table.insertRows(20); + try (ConversionSource conversionSource = + conversionSourceProvider.getConversionSourceInstance( + SourceTable.builder() + .name(tableName) + .formatName(sourceTableFormat) + .basePath(table.getBasePath()) + .dataPath(table.getDataPath()) + .additionalProperties( + getSourcePropertiesForSkipColumnStats(skipColumnStats, null)) + .build())) { + List dataFiles = + conversionSource.getCurrentSnapshot().getPartitionedDataFiles().stream() + .flatMap(group -> group.getDataFiles().stream()) + .collect(Collectors.toList()); + assertFalse(dataFiles.isEmpty(), "Expected at least one data file in source snapshot"); + boolean hasAnyColumnStats = + dataFiles.stream().anyMatch(file -> !file.getColumnStats().isEmpty()); + if (skipColumnStats) { + assertFalse( + hasAnyColumnStats, "Column stats should be skipped when skip_column_stats=true"); + } else { + assertTrue( + hasAnyColumnStats, "Column stats should be present when skip_column_stats=false"); + } + + ConversionConfig conversionConfig = + getTableSyncConfig( + sourceTableFormat, + syncMode, + tableName, + table, + targetTableFormats, + null, + null, + getSourcePropertiesForSkipColumnStats(skipColumnStats, null)); + ConversionController conversionController = + new ConversionController(jsc.hadoopConfiguration()); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 20); + } + } + } + private static List getOtherFormats(String sourceTableFormat) { return Arrays.stream(TableFormat.values()) .filter(fmt -> !fmt.equals(sourceTableFormat)) @@ -1115,7 +1184,28 @@ private static ConversionConfig getTableSyncConfig( List targetTableFormats, String partitionConfig, Duration metadataRetention) { + return getTableSyncConfig( + sourceTableFormat, + syncMode, + tableName, + table, + targetTableFormats, + partitionConfig, + metadataRetention, + new Properties()); + } + + private static ConversionConfig getTableSyncConfig( + String sourceTableFormat, + SyncMode syncMode, + String tableName, + GenericTable table, + List targetTableFormats, + String partitionConfig, + Duration metadataRetention, + Properties additionalSourceProperties) { Properties sourceProperties = new Properties(); + sourceProperties.putAll(additionalSourceProperties); if (partitionConfig != null) { sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig); } @@ -1147,4 +1237,14 @@ private static ConversionConfig getTableSyncConfig( .syncMode(syncMode) .build(); } + + private static Properties getSourcePropertiesForSkipColumnStats( + boolean skipColumnStats, String partitionConfig) { + Properties sourceProperties = new Properties(); + sourceProperties.put("xtable.source.skip_column_stats", String.valueOf(skipColumnStats)); + if (partitionConfig != null) { + sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig); + } + return sourceProperties; + } } diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaDataFileUpdatesExtractor.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaDataFileUpdatesExtractor.java new file mode 100644 index 000000000..8accbfcbb --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaDataFileUpdatesExtractor.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.delta; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Collections; + +import org.junit.jupiter.api.Test; + +import org.apache.spark.sql.delta.actions.Action; +import org.apache.spark.sql.delta.actions.AddFile; + +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.xtable.model.storage.FileFormat; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; + +class TestDeltaDataFileUpdatesExtractor { + + @Test + void shouldPersistNumRecordsZeroWhenRecordCountIsZeroAndColumnStatsAreEmpty() { + InternalSchema schema = + InternalSchema.builder() + .name("root") + .dataType(InternalType.RECORD) + .fields( + Collections.singletonList( + InternalField.builder() + .name("id") + .schema( + InternalSchema.builder() + .name("id") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .build())) + .isNullable(false) + .build(); + + InternalDataFile dataFile = + InternalDataFile.builder() + .physicalPath("s3://bucket/table/path/file-1.parquet") + .fileFormat(FileFormat.APACHE_PARQUET) + .fileSizeBytes(128) + .recordCount(0) + .columnStats(Collections.emptyList()) + .lastModified(1234L) + .build(); + + InternalFilesDiff diff = + InternalFilesDiff.builder() + .fileAdded(dataFile) + .filesRemoved(Collections.emptySet()) + .build(); + + DeltaDataFileUpdatesExtractor extractor = DeltaDataFileUpdatesExtractor.builder().build(); + Seq actions = extractor.applyDiff(diff, schema, "s3://bucket/table/path"); + AddFile addFile = + JavaConverters.seqAsJavaList(actions).stream() + .filter(AddFile.class::isInstance) + .map(AddFile.class::cast) + .findFirst() + .orElseThrow(() -> new AssertionError("Expected AddFile action")); + + assertNotNull(addFile.stats()); + assertTrue(addFile.stats().contains("\"numRecords\":0")); + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java index a18bb743d..fe30beb9b 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java @@ -162,6 +162,51 @@ void columnStatsWithMetadataTable(@TempDir Path tempDir) throws Exception { validateOutput(output); } + @Test + void recordCountOnlyWithMetadataTable(@TempDir Path tempDir) throws Exception { + String tableName = GenericTable.getTableName(); + String basePath; + try (TestJavaHudiTable table = + TestJavaHudiTable.withSchema( + tableName, tempDir, "long_field:SIMPLE", HoodieTableType.COPY_ON_WRITE, AVRO_SCHEMA)) { + List> records = + getRecords().stream().map(this::buildRecord).collect(Collectors.toList()); + table.insertRecords(true, records); + basePath = table.getBasePath(); + } + HoodieTableMetadata tableMetadata = + HoodieTableMetadata.create( + new HoodieJavaEngineContext(configuration), + HoodieMetadataConfig.newBuilder().enable(true).build(), + basePath, + true); + Path parquetFile = + Files.list(Paths.get(new URI(basePath))) + .filter(path -> path.toString().endsWith(".parquet")) + .findFirst() + .orElseThrow(() -> new RuntimeException("No files found")); + InternalDataFile inputFile = + InternalDataFile.builder() + .physicalPath(parquetFile.toString()) + .columnStats(Collections.emptyList()) + .fileFormat(FileFormat.APACHE_PARQUET) + .lastModified(1234L) + .fileSizeBytes(4321L) + .recordCount(0) + .build(); + HoodieTableMetaClient metaClient = + HoodieTableMetaClient.builder().setBasePath(basePath).setConf(configuration).build(); + HudiFileStatsExtractor fileStatsExtractor = new HudiFileStatsExtractor(metaClient); + List output = + fileStatsExtractor + .addRecordCountToFiles(tableMetadata, Stream.of(inputFile), schema) + .collect(Collectors.toList()); + + assertEquals(1, output.size()); + assertEquals(2, output.get(0).getRecordCount()); + assertEquals(0, output.get(0).getColumnStats().size()); + } + @Test void columnStatsWithoutMetadataTable(@TempDir Path tempDir) throws IOException { Path file = tempDir.resolve("tmp.parquet"); @@ -199,6 +244,46 @@ void columnStatsWithoutMetadataTable(@TempDir Path tempDir) throws IOException { validateOutput(output); } + @Test + void recordCountOnlyWithoutMetadataTable(@TempDir Path tempDir) throws IOException { + Path file = tempDir.resolve("tmp.parquet"); + GenericData genericData = GenericData.get(); + genericData.addLogicalTypeConversion(new Conversions.DecimalConversion()); + try (ParquetWriter writer = + AvroParquetWriter.builder( + HadoopOutputFile.fromPath( + new org.apache.hadoop.fs.Path(file.toUri()), configuration)) + .withSchema(AVRO_SCHEMA) + .withDataModel(genericData) + .build()) { + for (GenericRecord record : getRecords()) { + writer.write(record); + } + } + + InternalDataFile inputFile = + InternalDataFile.builder() + .physicalPath(file.toString()) + .columnStats(Collections.emptyList()) + .fileFormat(FileFormat.APACHE_PARQUET) + .lastModified(1234L) + .fileSizeBytes(4321L) + .recordCount(0) + .build(); + + HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class); + when(mockMetaClient.getHadoopConf()).thenReturn(configuration); + HudiFileStatsExtractor fileStatsExtractor = new HudiFileStatsExtractor(mockMetaClient); + List output = + fileStatsExtractor + .addRecordCountToFiles(null, Stream.of(inputFile), schema) + .collect(Collectors.toList()); + + assertEquals(1, output.size()); + assertEquals(2, output.get(0).getRecordCount()); + assertEquals(0, output.get(0).getColumnStats().size()); + } + private void validateOutput(List output) { assertEquals(1, output.size()); InternalDataFile fileWithStats = output.get(0); diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java index cc70d4c0b..199a2c790 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java @@ -299,6 +299,33 @@ public void testCreateSnapshotControlFlow() throws Exception { transactionArgumentCaptor.getAllValues().get(2)); } + @Test + public void testSnapshotWithZeroRecordCountFileAndNoColumnStats() throws Exception { + InternalTable table = + getInternalTable(tableName, basePath, internalSchema, null, LAST_COMMIT_TIME); + + InternalDataFile dataFileWithZeroCount = + getDataFile(1, Collections.emptyList()).toBuilder().recordCount(0).build(); + InternalSnapshot snapshot = buildSnapshot(table, "0", dataFileWithZeroCount); + + when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); + when(mockPartitionSpecExtractor.toIceberg(eq(null), any())) + .thenReturn(PartitionSpec.unpartitioned()); + mockColStatsForFile(dataFileWithZeroCount, 1); + + TableFormatSync.getInstance() + .syncSnapshot(Collections.singletonList(conversionTarget), snapshot); + + validateIcebergTable( + tableName, + table, + Sets.newHashSet(dataFileWithZeroCount), + Expressions.alwaysTrue(), + icebergSchema); + verify(mockColumnStatsConverter, times(1)) + .toIceberg(any(Schema.class), eq(0L), eq(Collections.emptyList())); + } + @Test public void testIncompleteWriteRollback() throws Exception { List fields2 = new ArrayList<>(internalSchema.getFields()); diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java index e04d69850..e45ecd08c 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java @@ -48,6 +48,8 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configuration; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.config.Configurator; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; @@ -111,6 +113,10 @@ public class RunCatalogSync { .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility"); public static void main(String[] args) throws Exception { + // Reduce noisy per-partition FS-view logs while keeping useful metadata index INFO logs. + Configurator.setLevel( + "org.apache.hudi.common.table.view.AbstractTableFileSystemView", Level.WARN); + CommandLineParser parser = new DefaultParser(); CommandLine cmd; try {