From c7217d2147416ec39b5a3dc7257d277d2e702319 Mon Sep 17 00:00:00 2001
From: mao-liu <1684060+mao-liu@users.noreply.github.com>
Date: Tue, 6 Jan 2026 12:43:06 +1100
Subject: [PATCH 1/4] WIP: manifest read optimisations

---
 .../java/org/apache/paimon/CoreOptions.java   |  13 +++
 .../apache/paimon/manifest/ManifestFile.java  |   7 ++
 .../operation/AbstractFileStoreScan.java      |  57 +++++++++-
 .../operation/FileSystemWriteRestore.java     |  98 ++++++++++++++++-
 .../paimon/operation/ManifestsReader.java     |   4 +
 .../org/apache/paimon/utils/ObjectsCache.java |   7 ++
 .../paimon/flink/sink/CompactorSink.java      |  11 +-
 .../flink/sink/StoreCompactOperator.java      |  76 +++++++++++++
 .../paimon/format/avro/AvroBulkFormat.java    | 100 +++++++++++++++++-
 9 files changed, 365 insertions(+), 8 deletions(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index a3da8ef5f1cb..e28b03be1236 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1801,6 +1801,15 @@ public InlineElement getDescription() {
                             "For DELETE manifest entry in manifest file, drop stats to reduce memory and storage."
                                     + " Default value is false only for compatibility of old reader.");
 
+    public static final ConfigOption<Boolean> MANIFEST_PREFETCH_ENTRIES =
+            key("manifest.prefetch-entries")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Prefetch all manifest entries when initializing writers, reducing API requests to object store filesystems."
+                                    + " This is useful for jobs that write to or compact many partitions at once;"
+                                    + " however, more memory is required because the entire manifest is loaded.");
+
     public static final ConfigOption<Boolean> DATA_FILE_THIN_MODE =
             key("data-file.thin-mode")
                     .booleanType()
@@ -2537,6 +2546,10 @@ public boolean manifestDeleteFileDropStats() {
         return options.get(MANIFEST_DELETE_FILE_DROP_STATS);
     }
 
+    public boolean prefetchManifestEntries() {
+        return options.get(MANIFEST_PREFETCH_ENTRIES);
+    }
+
     public boolean disableNullToNotNull() {
         return options.get(DISABLE_ALTER_COLUMN_NULL_TO_NOT_NULL);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 38d8e0a43711..b549a5f0b390 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -40,6 +40,9 @@ import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.VersionedObjectSerializer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -53,6 +56,8 @@
  */
 public class ManifestFile extends ObjectsFile<ManifestEntry> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(ManifestFile.class);
+
     private final SchemaManager schemaManager;
     private final RowType partitionType;
     private final FormatWriterFactory writerFactory;
@@ -108,6 +113,7 @@ public List<ManifestEntry> read(
         try {
             Path path = pathFactory.toPath(fileName);
             if (cache != null) {
+                LOG.info("Reading manifest file from cache: {}", fileName);
                 return cache.read(
                         path,
                         fileSize,
                         partitionFilter, bucketFilter, readFilter, readTFilter));
             }
+            LOG.info("Reading manifest file without cache: {}", fileName);
             return readFromIterator(
                     createIterator(path, fileSize), serializer, readFilter, readTFilter);
         } catch (IOException e) {
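
A minimal usage sketch for the new option, assuming table options are supplied as a plain string map (the class name and map-based setup are illustrative, not part of the patch):

    import java.util.HashMap;
    import java.util.Map;

    public class EnablePrefetchExample {
        public static void main(String[] args) {
            Map<String, String> tableOptions = new HashMap<>();
            // Key matches MANIFEST_PREFETCH_ENTRIES above; default is false.
            tableOptions.put("manifest.prefetch-entries", "true");
            // Trade-off: every live ManifestEntry of the snapshot is held in
            // memory while writers initialize, so budget heap accordingly.
            System.out.println(tableOptions);
        }
    }
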
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 7ed77406c777..1286c49c3255 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -43,6 +43,9 @@ import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -67,6 +70,8 @@
 /** Default implementation of {@link FileStoreScan}. */
 public abstract class AbstractFileStoreScan implements FileStoreScan {
 
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreScan.class);
+
     private final ManifestsReader manifestsReader;
     private final SnapshotManager snapshotManager;
     private final ManifestFile.Factory manifestFileFactory;
@@ -251,14 +256,35 @@ public ManifestsReader manifestsReader() {
     @Override
     public Plan plan() {
         long started = System.nanoTime();
+        long startManifests = System.currentTimeMillis();
         ManifestsReader.Result manifestsResult = readManifests();
+        long manifestsDuration = System.currentTimeMillis() - startManifests;
+        LOG.info(
+                "Read manifests, duration: {}ms, count: {}",
+                manifestsDuration,
+                manifestsResult.filteredManifests.size());
+
         Snapshot snapshot = manifestsResult.snapshot;
         List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;
 
+        long startIterator = System.currentTimeMillis();
         Iterator<ManifestEntry> iterator = readManifestEntries(manifests, false);
+        long iteratorDuration = System.currentTimeMillis() - startIterator;
+        LOG.info("Obtained iterator for manifest entries, duration: {}ms", iteratorDuration);
+
         List<ManifestEntry> files = new ArrayList<>();
         while (iterator.hasNext()) {
-            files.add(iterator.next());
+            long startIteration = System.currentTimeMillis();
+            ManifestEntry entry = iterator.next();
+            long iterationDuration = System.currentTimeMillis() - startIteration;
+            // this step is pretty fast - entries become available in "chunks",
+            // i.e. once a manifest is read in full and returned by readManifest, the individual
+            // entries are processed and made available to the iterator in one batch
+            // LOG.info(
+            //         "Iterated manifest entry: {}, duration: {}ms",
+            //         entry.file().fileName(),
+            //         iterationDuration);
+            files.add(entry);
         }
 
         if (wholeBucketFilterEnabled()) {
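
plan() above now times each phase by hand with the same start/stop/log pattern. That pattern can be factored into a tiny helper; a sketch only (PhaseTimer is not part of the patch), assuming slf4j on the classpath:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.function.Supplier;

    public final class PhaseTimer {
        private static final Logger LOG = LoggerFactory.getLogger(PhaseTimer.class);

        private PhaseTimer() {}

        // Runs one phase, logs its duration, and returns the result.
        public static <T> T timed(String phase, Supplier<T> action) {
            long start = System.nanoTime(); // monotonic, unlike currentTimeMillis()
            T result = action.get();
            LOG.info("{} took {}ms", phase, (System.nanoTime() - start) / 1_000_000);
            return result;
        }
    }

With that, the first block above would collapse to a single line such as PhaseTimer.timed("readManifests", this::readManifests).
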
@@ -372,16 +398,28 @@ private <T extends FileEntry> Iterator<T> readAndMergeFileEntries(
             List<ManifestFileMeta> manifests,
             Function<List<ManifestEntry>, List<T>> converter,
             boolean useSequential) {
+        long startDeletedEntriesTime = System.currentTimeMillis();
         Set<FileEntry.Identifier> deletedEntries =
                 FileEntry.readDeletedEntries(
                         manifest -> readManifest(manifest, FileEntry.deletedFilter(), null),
                         manifests,
                         parallelism);
+        long deletedEntriesDuration = System.currentTimeMillis() - startDeletedEntriesTime;
+        LOG.info(
+                "Read deleted entries: {}, duration: {}ms",
+                deletedEntries.size(),
+                deletedEntriesDuration);
 
+        long startFilterTime = System.currentTimeMillis();
         manifests =
                 manifests.stream()
                         .filter(file -> file.numAddedFiles() > 0)
                         .collect(Collectors.toList());
+        long filterDuration = System.currentTimeMillis() - startFilterTime;
+        LOG.info(
+                "Filtered manifests: {} remaining manifests, duration: {}ms",
+                manifests.size(),
+                filterDuration);
 
         Function<ManifestFileMeta, List<T>> processor =
                 manifest ->
@@ -391,8 +429,10 @@ private <T extends FileEntry> Iterator<T> readAndMergeFileEntries(
                                         FileEntry.addFilter(),
                                         entry -> !deletedEntries.contains(entry.identifier())));
         if (useSequential) {
+            LOG.info("Using sequential execution for file entry processing");
             return sequentialBatchedExecute(processor, manifests, parallelism).iterator();
         } else {
+            LOG.info("Using random execution for file entry processing");
            return randomlyExecuteSequentialReturn(processor, manifests, parallelism);
         }
     }
@@ -446,6 +486,8 @@ private List<ManifestEntry> readManifest(
             ManifestFileMeta manifest,
             @Nullable Filter<InternalRow> additionalFilter,
             @Nullable Filter<ManifestEntry> additionalTFilter) {
+        long startTime = System.currentTimeMillis();
+        LOG.info("Reading manifest file: {}", manifest.fileName());
         List<ManifestEntry> entries =
                 manifestFileFactory
                         .create()
                         .read(
@@ -462,12 +504,25 @@ private List<ManifestEntry> readManifest(
                                         && (manifestEntryFilter == null
                                                 || manifestEntryFilter.test(entry))
                                         && filterByStats(entry));
+        // SLOW PART IS HERE!!! (the manifest read above is the expensive remote call)
+        long duration = System.currentTimeMillis() - startTime;
+        LOG.info(
+                "Successfully read manifest file: {}, entries: {}, duration: {}ms",
+                manifest.fileName(),
+                entries.size(),
+                duration);
         if (dropStats) {
             List<ManifestEntry> copied = new ArrayList<>(entries.size());
             for (ManifestEntry entry : entries) {
                 copied.add(dropStats(entry));
             }
             entries = copied;
+            long duration2 = System.currentTimeMillis() - startTime;
+            LOG.info(
+                    "Dropped stats from manifest file: {}, entries: {}, duration: {}ms",
+                    manifest.fileName(),
+                    entries.size(),
+                    duration2);
         }
         return entries;
     }
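
readAndMergeFileEntries above implements the usual manifest merge: first collect the identifiers of all DELETE entries, then keep only the ADD entries that no DELETE cancels. That ordering is why the deleted-entries pass is timed separately — it must finish before any ADD entry can be emitted. A self-contained model of the rule (Entry and Kind are simplified stand-ins for Paimon's FileEntry types):

    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class MergeEntriesSketch {
        enum Kind { ADD, DELETE }
        record Entry(Kind kind, String identifier) {}

        // Keep an ADD only if no DELETE with the same identifier exists.
        static List<Entry> merge(List<Entry> all) {
            Set<String> deleted = all.stream()
                    .filter(e -> e.kind() == Kind.DELETE)
                    .map(Entry::identifier)
                    .collect(Collectors.toSet());
            return all.stream()
                    .filter(e -> e.kind() == Kind.ADD && !deleted.contains(e.identifier()))
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            List<Entry> entries = List.of(
                    new Entry(Kind.ADD, "data-1"),
                    new Entry(Kind.ADD, "data-2"),
                    new Entry(Kind.DELETE, "data-1"));
            System.out.println(merge(entries)); // only data-2 survives
        }
    }
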
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java
index e7faf2a24569..c59d26547d0b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java
@@ -24,21 +24,36 @@ import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.BucketFilter;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SnapshotManager;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 
 /** {@link WriteRestore} to restore files directly from file system. */
 public class FileSystemWriteRestore implements WriteRestore {
 
+    private static final Logger LOG = LoggerFactory.getLogger(FileSystemWriteRestore.class);
+
     private final SnapshotManager snapshotManager;
     private final FileStoreScan scan;
     private final IndexFileHandler indexFileHandler;
 
+    private final boolean usePrefetchManifestEntries;
+    @Nullable private transient PrefetchedManifestEntries prefetchedManifestEntries;
+
     public FileSystemWriteRestore(
             CoreOptions options,
             SnapshotManager snapshotManager,
@@ -52,6 +67,7 @@ public FileSystemWriteRestore(
             this.scan.dropStats();
         }
     }
+        this.usePrefetchManifestEntries = options.prefetchManifestEntries();
     }
 
     @Override
@@ -62,6 +78,19 @@ public long latestCommittedIdentifier(String user) {
                 .orElse(Long.MIN_VALUE);
     }
 
+    public synchronized PrefetchedManifestEntries prefetchManifestEntries(Snapshot snapshot) {
+        RowType partitionType = scan.manifestsReader().partitionType();
+        List<ManifestEntry> manifestEntries = scan.withSnapshot(snapshot).plan().files();
+        LOG.info(
+                "FileSystemWriteRestore prefetched manifestEntries for snapshot {}: {} entries",
+                snapshot.id(),
+                manifestEntries.size());
+
+        prefetchedManifestEntries =
+                new PrefetchedManifestEntries(snapshot, partitionType, manifestEntries);
+        return prefetchedManifestEntries;
+    }
+
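
The method just added is the heart of the optimisation: one full scan per snapshot, after which restoreFiles (below) can answer every (partition, bucket) lookup from memory. A toy model of that trade, with String partitions standing in for BinaryRow (all names illustrative):

    import java.util.List;
    import java.util.stream.Collectors;

    public class PrefetchFilterSketch {
        record Entry(String partition, int bucket, String fileName) {}

        // One full scan result, filtered in memory per restore call.
        static List<Entry> filter(List<Entry> prefetched, String partition, int bucket) {
            return prefetched.stream()
                    .filter(e -> e.partition().equals(partition) && e.bucket() == bucket)
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            List<Entry> all = List.of(
                    new Entry("dt=2026-01-06", 0, "data-0.avro"),
                    new Entry("dt=2026-01-06", 1, "data-1.avro"),
                    new Entry("dt=2026-01-07", 0, "data-2.avro"));
            // N writer tasks now cost one manifest scan plus N in-memory
            // filters, instead of N manifest scans.
            System.out.println(filter(all, "dt=2026-01-06", 0));
        }
    }
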
     @Override
     public RestoreFiles restoreFiles(
             BinaryRow partition,
@@ -75,9 +104,29 @@ public RestoreFiles restoreFiles(
             return RestoreFiles.empty();
         }
 
+        List<ManifestEntry> entries;
+        if (usePrefetchManifestEntries) {
+            // local reference of the prefetch container for safety across mutation
+            PrefetchedManifestEntries prefetch = prefetchedManifestEntries;
+            if (prefetch == null || prefetch.snapshot().id() != snapshot.id()) {
+                // refresh the prefetched manifest entries
+                prefetch = prefetchManifestEntries(snapshot);
+            }
+            entries = prefetch.filter(partition, bucket);
+        } else {
+            entries =
+                    scan.withSnapshot(snapshot)
+                            .withPartitionBucket(partition, bucket)
+                            .plan()
+                            .files();
+        }
+        LOG.info(
+                "FileSystemWriteRestore filtered manifestEntries for {}, {}: {} entries",
+                partition,
+                bucket,
+                entries.size());
+
         List<DataFileMeta> restoreFiles = new ArrayList<>();
-        List<ManifestEntry> entries =
-                scan.withSnapshot(snapshot).withPartitionBucket(partition, bucket).plan().files();
         Integer totalBuckets = WriteRestore.extractDataFiles(entries, restoreFiles);
 
         IndexFileMeta dynamicBucketIndex = null;
@@ -95,4 +144,49 @@
         return new RestoreFiles(
                 snapshot, totalBuckets, restoreFiles, dynamicBucketIndex, deleteVectorsIndex);
     }
+
+    /**
+     * Container for a {@link Snapshot}'s manifest entries, used by {@link FileSystemWriteRestore}
+     * to broker thread-safe access to cached results.
+     */
+    public static class PrefetchedManifestEntries {
+
+        private final Snapshot snapshot;
+        private final RowType partitionType;
+        private final List<ManifestEntry> manifestEntries;
+
+        public PrefetchedManifestEntries(
+                Snapshot snapshot, RowType partitionType, List<ManifestEntry> manifestEntries) {
+            this.snapshot = snapshot;
+            this.partitionType = partitionType;
+            this.manifestEntries = manifestEntries;
+        }
+
+        public Snapshot snapshot() {
+            return snapshot;
+        }
+
+        public RowType partitionType() {
+            return partitionType;
+        }
+
+        public List<ManifestEntry> manifestEntries() {
+            return manifestEntries;
+        }
+
+        public List<ManifestEntry> filter(BinaryRow partition, int bucket) {
+            PartitionPredicate partitionPredicate =
+                    PartitionPredicate.fromMultiple(
+                            partitionType, Collections.singletonList(partition));
+
+            BucketFilter bucketFilter = BucketFilter.create(false, bucket, null, null);
+            return manifestEntries.stream()
+                    .filter(
+                            m ->
+                                    (partitionPredicate == null
+                                                    || partitionPredicate.test(m.partition()))
+                                            && bucketFilter.test(m.bucket(), m.totalBuckets()))
+                    .collect(Collectors.toList());
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
index b3b89e72aafb..f6ce0c0c3d6f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
@@ -66,6 +66,10 @@ public ManifestsReader(
         this.manifestListFactory = manifestListFactory;
     }
 
+    public RowType partitionType() {
+        return partitionType;
+    }
+
     public ManifestsReader onlyReadRealBuckets() {
         this.onlyReadRealBuckets = true;
         return this;
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
index 21b17549b3bf..85fb14c3038c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
@@ -24,6 +24,9 @@ import org.apache.paimon.operation.metrics.CacheMetrics;
 import org.apache.paimon.types.RowType;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 
@@ -36,6 +39,8 @@
 @ThreadSafe
 public abstract class ObjectsCache<K, V> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(ObjectsCache.class);
+
     protected final SegmentsCache<K> cache;
     protected final ObjectSerializer<V> projectedSerializer;
     protected final ThreadLocal formatSerializer;
@@ -69,8 +74,10 @@ public List<V> read(K key, @Nullable Long fileSize, Filters filters) throws I
             if (cacheMetrics != null) {
                 cacheMetrics.increaseHitObject();
             }
+            LOG.info("ObjectsCache cache-hit for {}", key);
             return readFromSegments(segments, filters);
         } else {
+            LOG.info("ObjectsCache cache-miss for {}", key);
             if (cacheMetrics != null) {
                 cacheMetrics.increaseMissedObject();
             }
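
The cache-hit/cache-miss instrumentation above is a standard memoizing read path. A reduced, self-contained model (CacheReadSketch is illustrative; Paimon's ObjectsCache actually stores serialized segments and deserializes them through the supplied filters):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.LongAdder;
    import java.util.function.Function;

    public class CacheReadSketch<K, V> {
        private final Map<K, V> cache = new ConcurrentHashMap<>();
        final LongAdder hits = new LongAdder();
        final LongAdder misses = new LongAdder();

        // Count a hit or a miss, loading the value on a miss.
        V read(K key, Function<K, V> loader) {
            V cached = cache.get(key);
            if (cached != null) {
                hits.increment();
                return cached;
            }
            misses.increment();
            return cache.computeIfAbsent(key, loader);
        }
    }

The existing CacheMetrics counters already capture the same hit/miss signal; the INFO lines add per-file visibility while the change is being profiled.
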
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
index a9c6031dfa34..cfaabe9564a2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
@@ -19,11 +19,14 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.table.data.RowData;
 
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_ENABLED;
+
 /** {@link FlinkSink} for dedicated compact jobs. */
 public class CompactorSink extends FlinkSink<RowData> {
 
@@ -39,7 +42,13 @@ public CompactorSink(FileStoreTable table, boolean fullCompaction) {
     @Override
     protected OneInputStreamOperatorFactory<RowData, Committable> createWriteOperatorFactory(
             StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new StoreCompactOperator.Factory(table, writeProvider, commitUser, fullCompaction);
+        Options options = table.coreOptions().toConfiguration();
+        boolean coordinatorEnabled = options.get(SINK_WRITER_COORDINATOR_ENABLED);
+        return coordinatorEnabled
+                ? new StoreCompactOperator.CoordinatedFactory(
+                        table, writeProvider, commitUser, fullCompaction)
+                : new StoreCompactOperator.Factory(
+                        table, writeProvider, commitUser, fullCompaction);
     }
 
     @Override
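
The dispatch above opts compaction jobs into the coordinated restore path: with the coordinator enabled, restore requests are funnelled through one shared component instead of each subtask scanning the file system independently. A loose model of the idea (all names here are illustrative, not the Flink/Paimon API):

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class CoordinatedRestoreSketch {
        interface Restore {
            List<String> restoreFiles(String partition, int bucket);
        }

        // Every subtask scans the file system itself: N subtasks => N scans.
        static class FileSystemRestore implements Restore {
            public List<String> restoreFiles(String partition, int bucket) {
                return expensiveScan(partition, bucket);
            }
        }

        // Requests funnel into one shared place (the operator coordinator, in
        // Flink terms), so each (partition, bucket) is scanned at most once.
        static class CoordinatedRestore implements Restore {
            private final Map<String, List<String>> shared = new ConcurrentHashMap<>();

            public List<String> restoreFiles(String partition, int bucket) {
                return shared.computeIfAbsent(
                        partition + "/" + bucket, k -> expensiveScan(partition, bucket));
            }
        }

        static Restore choose(boolean coordinatorEnabled) {
            return coordinatorEnabled ? new CoordinatedRestore() : new FileSystemRestore();
        }

        private static List<String> expensiveScan(String partition, int bucket) {
            return List.of(partition + "-" + bucket + "-data-0.avro");
        }
    }
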
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 989c8e91dc83..2246e9f5a415 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -21,17 +21,24 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.coordinator.CoordinatedWriteRestore;
+import org.apache.paimon.flink.sink.coordinator.WriteOperatorCoordinator;
 import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.operation.WriteRestore;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
@@ -69,6 +76,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committable>
     private transient Set<Pair<BinaryRow, Integer>> waitToCompact;
+    protected transient @Nullable WriteRestore writeRestore;
     protected transient @Nullable WriterRefresher writeRefresher;
 
@@ -119,9 +127,16 @@ public void initializeState(StateInitializationContext context) throws Exception
                         getContainingTask().getEnvironment().getIOManager(),
                         memoryPoolFactory,
                         getMetricGroup());
+        if (writeRestore != null) {
+            write.setWriteRestore(writeRestore);
+        }
         this.writeRefresher = WriterRefresher.create(write.streamingMode(), table, write::replace);
     }
 
+    public void setWriteRestore(@Nullable WriteRestore writeRestore) {
+        this.writeRestore = writeRestore;
+    }
+
     @Override
     public void open() throws Exception {
         super.open();
@@ -244,4 +259,65 @@ public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classL
             return StoreCompactOperator.class;
         }
     }
+
+    /** {@link StreamOperatorFactory} of {@link StoreCompactOperator}. */
+    public static class CoordinatedFactory
+            extends PrepareCommitOperator.Factory<RowData, Committable>
+            implements CoordinatedOperatorFactory<Committable> {
+
+        private static final long serialVersionUID = 1L;
+
+        private final FileStoreTable table;
+        private final StoreSinkWrite.Provider storeSinkWriteProvider;
+        private final String initialCommitUser;
+        private final boolean fullCompaction;
+
+        public CoordinatedFactory(
+                FileStoreTable table,
+                StoreSinkWrite.Provider storeSinkWriteProvider,
+                String initialCommitUser,
+                boolean fullCompaction) {
+            super(Options.fromMap(table.options()));
+            Preconditions.checkArgument(
+                    !table.coreOptions().writeOnly(),
+                    CoreOptions.WRITE_ONLY.key() + " should not be true for StoreCompactOperator.");
+            this.table = table;
+            this.storeSinkWriteProvider = storeSinkWriteProvider;
+            this.initialCommitUser = initialCommitUser;
+            this.fullCompaction = fullCompaction;
+        }
+
+        @Override
+        public OperatorCoordinator.Provider getCoordinatorProvider(
+                String operatorName, OperatorID operatorID) {
+            return new WriteOperatorCoordinator.Provider(operatorID, table);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<Committable>> T createStreamOperator(
+                StreamOperatorParameters<Committable> parameters) {
+            OperatorID operatorID = parameters.getStreamConfig().getOperatorID();
+            TaskOperatorEventGateway gateway =
+                    parameters
+                            .getContainingTask()
+                            .getEnvironment()
+                            .getOperatorCoordinatorEventGateway();
+
+            StoreCompactOperator operator =
+                    new StoreCompactOperator(
+                            parameters,
+                            table,
+                            storeSinkWriteProvider,
+                            initialCommitUser,
+                            fullCompaction);
+
+            operator.setWriteRestore(new CoordinatedWriteRestore(gateway, operatorID));
+            return (T) operator;
+        }
+
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return StoreCompactOperator.class;
+        }
+    }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
index a06ca9948c44..dc37a9cf6c8d 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
@@ -20,6 +20,7 @@
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.fs.ByteArraySeekableStream;
 import
org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.reader.FileRecordReader; @@ -33,15 +34,25 @@ import org.apache.avro.file.DataFileReader; import org.apache.avro.file.SeekableInput; import org.apache.avro.io.DatumReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.util.function.Supplier; /** Provides a {@link FormatReaderFactory} for Avro records. */ public class AvroBulkFormat implements FormatReaderFactory { + private static final Logger LOG = LoggerFactory.getLogger(AvroBulkFormat.class); + + private static final String CACHE_DIR = + System.getProperty("java.io.tmpdir") + "/paimon_avro_cache"; + protected final RowType projectedRowType; public AvroBulkFormat(RowType projectedRowType) { @@ -78,8 +89,52 @@ private AvroReader(FileIO fileIO, Path path, long fileSize) throws IOException { private DataFileReader createReaderFromPath(Path path, long fileSize) throws IOException { DatumReader datumReader = new AvroRowDatumReader(projectedRowType); - SeekableInput in = - new SeekableInputStreamWrapper(fileIO.newInputStream(path), fileSize); + + // LOG.info("createReaderFromPath for {}, {}", path, fileSize); + // long startTime = System.currentTimeMillis(); + + // byte[] content = new byte[(int) fileSize]; + // File localCacheFile = getLocalCacheFile(path); + // if (localCacheFile.exists() && localCacheFile.length() == fileSize) { + // LOG.info("Using local cache for {}: {}", path, localCacheFile); + // try (InputStream inputStream = + // new LocalFileIO.LocalSeekableInputStream(localCacheFile)) { + // IOUtils.readFully(inputStream, content); + // } + // LOG.info( + // "Loaded local cache for {}: {}, duration: {}ms", + // path, + // localCacheFile, + // System.currentTimeMillis() - startTime); + // } else { + // LOG.info("Downloading remote file to local cache for {}: {}", path, + // localCacheFile); + // try (InputStream inputStream = fileIO.newInputStream(path)) { + // IOUtils.readFully(inputStream, content); + // } + // LOG.info( + // "createReaderFromPath read byteArray for {}, bytes: {}, duration: {}ms", + // path, + // content.length, + // System.currentTimeMillis() - startTime); + // saveToLocalCache(localCacheFile, content); + // LOG.info( + // "Downloaded remote file to local cache for {}: {}, duration: {}ms", + // path, + // localCacheFile, + // System.currentTimeMillis() - startTime); + // } + + SeekableInput in; + if (fileSize > Integer.MAX_VALUE) { + in = new SeekableInputStreamWrapper(fileIO.newInputStream(path), fileSize); + } else { + byte[] content = new byte[(int) fileSize]; + try (InputStream inputStream = fileIO.newInputStream(path)) { + IOUtils.readFully(inputStream, content); + } + in = new SeekableInputStreamWrapper(new ByteArraySeekableStream(content), fileSize); + } try { return (DataFileReader) DataFileReader.openReader(in, datumReader); } catch (Throwable e) { @@ -88,9 +143,38 @@ private DataFileReader createReaderFromPath(Path path, long fileSiz } } + private File getLocalCacheFile(Path path) { + if (path.toUri().getScheme() == null) { + // already a local file + return new File(path.toString()); + } + String fsPath = path.toString().split("://")[1]; + return new File(CACHE_DIR, fsPath); + } + + private void saveToLocalCache(File localFile, byte[] content) { + try { + if (!localFile.getParentFile().exists()) { + localFile.getParentFile().mkdirs(); + } + File 
tempFile = new File(localFile.getPath() + ".tmp." + System.nanoTime()); + try (FileOutputStream out = new FileOutputStream(tempFile)) { + out.write(content); + } + if (!tempFile.renameTo(localFile)) { + tempFile.delete(); + } + } catch (IOException e) { + LOG.warn("Failed to save local cache for {}: {}", localFile, e.getMessage()); + } + } + @Nullable @Override public IteratorResultIterator readBatch() throws IOException { + + // LOG.info("readBatch started for {}", filePath); + Object ticket; try { ticket = pool.pollEntry(); @@ -107,10 +191,18 @@ public IteratorResultIterator readBatch() throws IOException { long rowPosition = currentRowPosition; currentRowPosition += reader.getBlockCount(); + + // LOG.info("readBatch for {} found block of {}", reader.getBlockCount()); + IteratorWithException iterator = new AvroBlockIterator(reader.getBlockCount(), reader); - return new IteratorResultIterator( - iterator, () -> pool.recycler().recycle(ticket), filePath, rowPosition); + IteratorResultIterator out = + new IteratorResultIterator( + iterator, () -> pool.recycler().recycle(ticket), filePath, rowPosition); + + // LOG.info("readBatch finished for {}", filePath); + + return out; } private boolean readNextBlock() throws IOException { From 11a359c2eeecf574ce78e7ea9a930856ce59665b Mon Sep 17 00:00:00 2001 From: mao-liu <1684060+mao-liu@users.noreply.github.com> Date: Mon, 12 Jan 2026 12:22:22 +1100 Subject: [PATCH 2/4] FileSystemWriteRestore: using static cache for manifest prefetching --- .../operation/FileSystemWriteRestore.java | 79 +++++++++++++++---- 1 file changed, 63 insertions(+), 16 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java index c59d26547d0b..b4d2064c7f7e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java @@ -30,11 +30,15 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -52,7 +56,9 @@ public class FileSystemWriteRestore implements WriteRestore { private final IndexFileHandler indexFileHandler; private final Boolean usePrefetchManifestEntries; - @Nullable private transient PrefetchedManifestEntries prefetchedManifestEntries; + + @Nullable + private static Cache prefetchedManifestEntriesCache; public FileSystemWriteRestore( CoreOptions options, @@ -68,6 +74,24 @@ public FileSystemWriteRestore( } } this.usePrefetchManifestEntries = options.prefetchManifestEntries(); + initializeCacheIfNeeded(); + } + + private static synchronized void initializeCacheIfNeeded() { + if (prefetchedManifestEntriesCache == null) { + prefetchedManifestEntriesCache = + Caffeine.newBuilder() + .expireAfterAccess(Duration.ofMinutes(30)) + .executor(Runnable::run) + .build(); + // .softValues() - not used as we want to hold onto a copy of manifest for each table we + // are writing to + // .maximumSize(...) - not used as number of keys is static = number of tables being + // written to + // .maximumWeight(...).weigher(...) 
- not used as there isn't a convenient way of
+            // measuring memory size
+            // of a ManifestEntry object
+        }
+    }
 
@@ -78,16 +102,40 @@ public long latestCommittedIdentifier(String user) {
                 .orElse(Long.MIN_VALUE);
     }
 
-    public synchronized PrefetchedManifestEntries prefetchManifestEntries(Snapshot snapshot) {
-        RowType partitionType = scan.manifestsReader().partitionType();
-        List<ManifestEntry> manifestEntries = scan.withSnapshot(snapshot).plan().files();
+    private String getPrefetchManifestEntriesCacheKey() {
+        return snapshotManager.tablePath().toString();
+    }
+
+    private List<ManifestEntry> fetchManifestEntries(
+            Snapshot snapshot, @Nullable BinaryRow partition, @Nullable Integer bucket) {
+        FileStoreScan snapshotScan = scan.withSnapshot(snapshot);
+        if (partition != null && bucket != null) {
+            snapshotScan = snapshotScan.withPartitionBucket(partition, bucket);
+        }
+        return snapshotScan.plan().files();
+    }
+
+    public PrefetchedManifestEntries prefetchManifestEntries(Snapshot snapshot) {
+        LOG.info(
+                "FileSystemWriteRestore started prefetching manifestEntries for table {}, snapshot {}",
+                snapshotManager.tablePath(),
+                snapshot.id());
+        List<ManifestEntry> manifestEntries = fetchManifestEntries(snapshot, null, null);
         LOG.info(
-                "FileSystemWriteRestore prefetched manifestEntries for snapshot {}: {} entries",
+                "FileSystemWriteRestore prefetched manifestEntries for table {}, snapshot {}: {} entries",
+                snapshotManager.tablePath(),
                 snapshot.id(),
                 manifestEntries.size());
 
-        prefetchedManifestEntries =
+        RowType partitionType = scan.manifestsReader().partitionType();
+        PrefetchedManifestEntries prefetchedManifestEntries =
                 new PrefetchedManifestEntries(snapshot, partitionType, manifestEntries);
+
+        if (prefetchedManifestEntriesCache == null) {
+            initializeCacheIfNeeded();
+        }
+        prefetchedManifestEntriesCache.put(
+                getPrefetchManifestEntriesCacheKey(), prefetchedManifestEntries);
         return prefetchedManifestEntries;
     }
 
@@ -106,22 +154,21 @@ public RestoreFiles restoreFiles(
         List<ManifestEntry> entries;
         if (usePrefetchManifestEntries) {
-            // local reference of the prefetch container for safety across mutation
-            PrefetchedManifestEntries prefetch = prefetchedManifestEntries;
-            if (prefetch == null || prefetch.snapshot().id() != snapshot.id()) {
-                // refresh the prefetched manifest entries
+            PrefetchedManifestEntries prefetch =
+                    prefetchedManifestEntriesCache.getIfPresent(
+                            getPrefetchManifestEntriesCacheKey());
+            if (prefetch == null || prefetch.snapshot.id() != snapshot.id()) {
+                // re-fetch manifest entries if snapshot ids don't match
                 prefetch = prefetchManifestEntries(snapshot);
             }
+
             entries = prefetch.filter(partition, bucket);
         } else {
-            entries =
-                    scan.withSnapshot(snapshot)
-                            .withPartitionBucket(partition, bucket)
-                            .plan()
-                            .files();
+            entries = fetchManifestEntries(snapshot, partition, bucket);
         }
         LOG.info(
-                "FileSystemWriteRestore filtered manifestEntries for {}, {}: {} entries",
+                "FileSystemWriteRestore filtered manifestEntries for {}, {}, {}: {} entries",
+                snapshotManager.tablePath(),
                 partition,
                 bucket,
                 entries.size());

From 4196c3c670e7727604c372fd869aed9ec1ad57b6 Mon Sep 17 00:00:00 2001
From: mao-liu <1684060+mao-liu@users.noreply.github.com>
Date: Mon, 12 Jan 2026 12:52:39 +1100
Subject: [PATCH 3/4] FileSystemWriteRestore: prevent simultaneous prefetching

---
 .../operation/FileSystemWriteRestore.java     | 52 ++++++++++++-------
 1 file changed, 34 insertions(+), 18 deletions(-)
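
The diff below serializes prefetches with a class-level lock plus a re-check of the cache. Worth noting: Caffeine's Cache.get(key, mappingFunction) already guarantees the loader runs at most once per key at a time, which could stand in for the hand-rolled lock — though not for the snapshot-staleness re-check the patch also performs. A standalone sketch against non-shaded Caffeine (names illustrative):

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import java.time.Duration;
    import java.util.List;

    public class PrefetchOnceSketch {
        private static final Cache<String, List<String>> CACHE =
                Caffeine.newBuilder()
                        .expireAfterAccess(Duration.ofMinutes(30))
                        .executor(Runnable::run) // run maintenance inline, as above
                        .build();

        // The loader runs at most once per key at a time; concurrent callers
        // for the same table path block, then reuse the loaded value.
        static List<String> prefetch(String tablePath) {
            return CACHE.get(tablePath, PrefetchOnceSketch::expensiveScan);
        }

        private static List<String> expensiveScan(String tablePath) {
            return List.of(tablePath + "/manifest-1", tablePath + "/manifest-2");
        }
    }
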
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java
index b4d2064c7f7e..ea05b3b793b1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java
@@ -116,27 +116,43 @@ private List<ManifestEntry> fetchManifestEntries(
     }
 
     public PrefetchedManifestEntries prefetchManifestEntries(Snapshot snapshot) {
-        LOG.info(
-                "FileSystemWriteRestore started prefetching manifestEntries for table {}, snapshot {}",
-                snapshotManager.tablePath(),
-                snapshot.id());
-        List<ManifestEntry> manifestEntries = fetchManifestEntries(snapshot, null, null);
-        LOG.info(
-                "FileSystemWriteRestore prefetched manifestEntries for table {}, snapshot {}: {} entries",
-                snapshotManager.tablePath(),
-                snapshot.id(),
-                manifestEntries.size());
+        synchronized (FileSystemWriteRestore.class) {
+            if (prefetchedManifestEntriesCache == null) {
+                initializeCacheIfNeeded();
+            }
 
-        RowType partitionType = scan.manifestsReader().partitionType();
-        PrefetchedManifestEntries prefetchedManifestEntries =
-                new PrefetchedManifestEntries(snapshot, partitionType, manifestEntries);
+            // check if fetch is needed - if it was done by another thread then we can skip
+            // altogether
+            PrefetchedManifestEntries prefetch =
+                    prefetchedManifestEntriesCache.getIfPresent(
+                            getPrefetchManifestEntriesCacheKey());
+            if (prefetch != null && prefetch.snapshot.id() == snapshot.id()) {
+                LOG.info(
+                        "FileSystemWriteRestore skipping prefetching manifestEntries for table {}, snapshot {} as it was fetched by another thread",
+                        snapshotManager.tablePath(),
+                        snapshot.id());
+                return prefetch;
+            }
 
-        if (prefetchedManifestEntriesCache == null) {
-            initializeCacheIfNeeded();
+            LOG.info(
+                    "FileSystemWriteRestore started prefetching manifestEntries for table {}, snapshot {}",
+                    snapshotManager.tablePath(),
+                    snapshot.id());
+            List<ManifestEntry> manifestEntries = fetchManifestEntries(snapshot, null, null);
+            LOG.info(
+                    "FileSystemWriteRestore prefetched manifestEntries for table {}, snapshot {}: {} entries",
+                    snapshotManager.tablePath(),
+                    snapshot.id(),
+                    manifestEntries.size());
+
+            RowType partitionType = scan.manifestsReader().partitionType();
+            PrefetchedManifestEntries prefetchedManifestEntries =
+                    new PrefetchedManifestEntries(snapshot, partitionType, manifestEntries);
+
+            prefetchedManifestEntriesCache.put(
+                    getPrefetchManifestEntriesCacheKey(), prefetchedManifestEntries);
+            return prefetchedManifestEntries;
         }
-        prefetchedManifestEntriesCache.put(
-                getPrefetchManifestEntriesCacheKey(), prefetchedManifestEntries);
-        return prefetchedManifestEntries;
     }

From 7cf4693e253535ece0de64f3c05f914eee66e8a4 Mon Sep 17 00:00:00 2001
From: mao-liu <1684060+mao-liu@users.noreply.github.com>
Date: Tue, 13 Jan 2026 21:33:46 +1100
Subject: [PATCH 4/4] avro: revert changes to avro

---
 .../paimon/format/avro/AvroBulkFormat.java    | 100 +----------------
 1 file changed, 4 insertions(+), 96 deletions(-)

diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
index dc37a9cf6c8d..a06ca9948c44 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
@@ -20,7 +20,6 @@
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FormatReaderFactory;
-import org.apache.paimon.fs.ByteArraySeekableStream;
 import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path; import org.apache.paimon.reader.FileRecordReader; @@ -34,25 +33,15 @@ import org.apache.avro.file.DataFileReader; import org.apache.avro.file.SeekableInput; import org.apache.avro.io.DatumReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.annotation.Nullable; -import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; import java.util.function.Supplier; /** Provides a {@link FormatReaderFactory} for Avro records. */ public class AvroBulkFormat implements FormatReaderFactory { - private static final Logger LOG = LoggerFactory.getLogger(AvroBulkFormat.class); - - private static final String CACHE_DIR = - System.getProperty("java.io.tmpdir") + "/paimon_avro_cache"; - protected final RowType projectedRowType; public AvroBulkFormat(RowType projectedRowType) { @@ -89,52 +78,8 @@ private AvroReader(FileIO fileIO, Path path, long fileSize) throws IOException { private DataFileReader createReaderFromPath(Path path, long fileSize) throws IOException { DatumReader datumReader = new AvroRowDatumReader(projectedRowType); - - // LOG.info("createReaderFromPath for {}, {}", path, fileSize); - // long startTime = System.currentTimeMillis(); - - // byte[] content = new byte[(int) fileSize]; - // File localCacheFile = getLocalCacheFile(path); - // if (localCacheFile.exists() && localCacheFile.length() == fileSize) { - // LOG.info("Using local cache for {}: {}", path, localCacheFile); - // try (InputStream inputStream = - // new LocalFileIO.LocalSeekableInputStream(localCacheFile)) { - // IOUtils.readFully(inputStream, content); - // } - // LOG.info( - // "Loaded local cache for {}: {}, duration: {}ms", - // path, - // localCacheFile, - // System.currentTimeMillis() - startTime); - // } else { - // LOG.info("Downloading remote file to local cache for {}: {}", path, - // localCacheFile); - // try (InputStream inputStream = fileIO.newInputStream(path)) { - // IOUtils.readFully(inputStream, content); - // } - // LOG.info( - // "createReaderFromPath read byteArray for {}, bytes: {}, duration: {}ms", - // path, - // content.length, - // System.currentTimeMillis() - startTime); - // saveToLocalCache(localCacheFile, content); - // LOG.info( - // "Downloaded remote file to local cache for {}: {}, duration: {}ms", - // path, - // localCacheFile, - // System.currentTimeMillis() - startTime); - // } - - SeekableInput in; - if (fileSize > Integer.MAX_VALUE) { - in = new SeekableInputStreamWrapper(fileIO.newInputStream(path), fileSize); - } else { - byte[] content = new byte[(int) fileSize]; - try (InputStream inputStream = fileIO.newInputStream(path)) { - IOUtils.readFully(inputStream, content); - } - in = new SeekableInputStreamWrapper(new ByteArraySeekableStream(content), fileSize); - } + SeekableInput in = + new SeekableInputStreamWrapper(fileIO.newInputStream(path), fileSize); try { return (DataFileReader) DataFileReader.openReader(in, datumReader); } catch (Throwable e) { @@ -143,38 +88,9 @@ private DataFileReader createReaderFromPath(Path path, long fileSiz } } - private File getLocalCacheFile(Path path) { - if (path.toUri().getScheme() == null) { - // already a local file - return new File(path.toString()); - } - String fsPath = path.toString().split("://")[1]; - return new File(CACHE_DIR, fsPath); - } - - private void saveToLocalCache(File localFile, byte[] content) { - try { - if (!localFile.getParentFile().exists()) { - localFile.getParentFile().mkdirs(); - } - File tempFile = new 
File(localFile.getPath() + ".tmp." + System.nanoTime()); - try (FileOutputStream out = new FileOutputStream(tempFile)) { - out.write(content); - } - if (!tempFile.renameTo(localFile)) { - tempFile.delete(); - } - } catch (IOException e) { - LOG.warn("Failed to save local cache for {}: {}", localFile, e.getMessage()); - } - } - @Nullable @Override public IteratorResultIterator readBatch() throws IOException { - - // LOG.info("readBatch started for {}", filePath); - Object ticket; try { ticket = pool.pollEntry(); @@ -191,18 +107,10 @@ public IteratorResultIterator readBatch() throws IOException { long rowPosition = currentRowPosition; currentRowPosition += reader.getBlockCount(); - - // LOG.info("readBatch for {} found block of {}", reader.getBlockCount()); - IteratorWithException iterator = new AvroBlockIterator(reader.getBlockCount(), reader); - IteratorResultIterator out = - new IteratorResultIterator( - iterator, () -> pool.recycler().recycle(ticket), filePath, rowPosition); - - // LOG.info("readBatch finished for {}", filePath); - - return out; + return new IteratorResultIterator( + iterator, () -> pool.recycler().recycle(ticket), filePath, rowPosition); } private boolean readNextBlock() throws IOException {
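
PATCH 4 backs out the whole-file buffering experiment, restoring the plain stream wrapper. For reference, the core of the reverted idea stands on its own: when an Avro file fits in a byte[], one sequential read replaces the many small seeks a DataFileReader would otherwise issue against an object store. A sketch using Avro's own SeekableByteArrayInput (the caller supplies the stream and size; larger files fall back to a seekable wrapper):

    import org.apache.avro.file.SeekableByteArrayInput;
    import org.apache.avro.file.SeekableInput;
    import java.io.IOException;
    import java.io.InputStream;

    public class BufferedAvroInput {
        // Buffer the whole file when it fits in an int-sized array, so Avro's
        // seeks are served from memory instead of repeated ranged reads.
        public static SeekableInput bufferWholeFile(InputStream in, long fileSize)
                throws IOException {
            if (fileSize > Integer.MAX_VALUE) {
                throw new IllegalArgumentException("File too large to buffer: " + fileSize);
            }
            byte[] content = new byte[(int) fileSize];
            int off = 0;
            while (off < content.length) {
                int n = in.read(content, off, content.length - off);
                if (n < 0) {
                    throw new IOException("Unexpected end of stream at offset " + off);
                }
                off += n;
            }
            return new SeekableByteArrayInput(content);
        }
    }
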