diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index a3da8ef5f1cb..e28b03be1236 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1801,6 +1801,15 @@ public InlineElement getDescription() {
                             "For DELETE manifest entry in manifest file, drop stats to reduce memory and storage."
                                     + " Default value is false only for compatibility of old reader.");

+    public static final ConfigOption<Boolean> MANIFEST_PREFETCH_ENTRIES =
+            key("manifest.prefetch-entries")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Prefetch all manifest entries when initializing writers, reducing API requests to object store filesystems."
+                                    + " This is useful for jobs that write to or compact many partitions at once;"
+                                    + " however, more memory is required because the entire manifest is loaded.");
+
     public static final ConfigOption<Boolean> DATA_FILE_THIN_MODE =
             key("data-file.thin-mode")
                     .booleanType()
@@ -2537,6 +2546,10 @@ public boolean manifestDeleteFileDropStats() {
         return options.get(MANIFEST_DELETE_FILE_DROP_STATS);
     }

+    public boolean prefetchManifestEntries() {
+        return options.get(MANIFEST_PREFETCH_ENTRIES);
+    }
+
     public boolean disableNullToNotNull() {
         return options.get(DISABLE_ALTER_COLUMN_NULL_TO_NOT_NULL);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 38d8e0a43711..b549a5f0b390 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -40,6 +40,9 @@
 import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.VersionedObjectSerializer;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;

 import java.io.IOException;
@@ -53,6 +56,8 @@
  */
 public class ManifestFile extends ObjectsFile<ManifestEntry> {

+    private static final Logger LOG = LoggerFactory.getLogger(ManifestFile.class);
+
     private final SchemaManager schemaManager;
     private final RowType partitionType;
     private final FormatWriterFactory writerFactory;
@@ -108,6 +113,7 @@ public List<ManifestEntry> read(
         try {
             Path path = pathFactory.toPath(fileName);
             if (cache != null) {
+                LOG.info("Reading manifest file from cache: {}", fileName);
                 return cache.read(
                         path,
                         fileSize,
                         new Filters<>(
                                 partitionFilter, bucketFilter, readFilter, readTFilter));
             }
+            LOG.info("Reading manifest file without cache: {}", fileName);
             return readFromIterator(
                     createIterator(path, fileSize), serializer, readFilter, readTFilter);
         } catch (IOException e) {
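Usage note: the new flag above is surfaced through CoreOptions#prefetchManifestEntries(). A minimal sketch of enabling it on an existing table via dynamic options (the helper class is illustrative and not part of this patch; FileStoreTable#copy(Map) is the existing copy-with-options API):

    import org.apache.paimon.table.FileStoreTable;

    import java.util.HashMap;
    import java.util.Map;

    public class PrefetchOptionExample {

        // Hypothetical helper: returns a copy of the table with manifest prefetching enabled.
        public static FileStoreTable withPrefetch(FileStoreTable table) {
            Map<String, String> dynamicOptions = new HashMap<>();
            dynamicOptions.put("manifest.prefetch-entries", "true");
            return table.copy(dynamicOptions);
        }
    }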
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 7ed77406c777..1286c49c3255 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -43,6 +43,9 @@
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;

 import java.util.ArrayList;
@@ -67,6 +70,8 @@
 /** Default implementation of {@link FileStoreScan}. */
 public abstract class AbstractFileStoreScan implements FileStoreScan {

+    private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreScan.class);
+
     private final ManifestsReader manifestsReader;
     private final SnapshotManager snapshotManager;
     private final ManifestFile.Factory manifestFileFactory;
@@ -251,14 +256,35 @@ public ManifestsReader manifestsReader() {
     @Override
     public Plan plan() {
         long started = System.nanoTime();
+        long startManifests = System.currentTimeMillis();
         ManifestsReader.Result manifestsResult = readManifests();
+        long manifestsDuration = System.currentTimeMillis() - startManifests;
+        LOG.info(
+                "Read manifests, duration: {}ms, count: {}",
+                manifestsDuration,
+                manifestsResult.filteredManifests.size());
+
         Snapshot snapshot = manifestsResult.snapshot;
         List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;

+        long startIterator = System.currentTimeMillis();
         Iterator<ManifestEntry> iterator = readManifestEntries(manifests, false);
+        long iteratorDuration = System.currentTimeMillis() - startIterator;
+        LOG.info("Obtained iterator for manifest entries, duration: {}ms", iteratorDuration);
+
         List<ManifestEntry> files = new ArrayList<>();
         while (iterator.hasNext()) {
-            files.add(iterator.next());
+            long startIteration = System.currentTimeMillis();
+            ManifestEntry entry = iterator.next();
+            long iterationDuration = System.currentTimeMillis() - startIteration;
+            // this step is pretty fast - entries become available in "chunks",
+            // i.e. once a manifest is read in full and returned by readManifest, the individual
+            // entries are processed and made available to the iterator in one batch
+            // LOG.info(
+            //         "Iterated manifest entry: {}, duration: {}ms",
+            //         entry.file().fileName(),
+            //         iterationDuration);
+            files.add(entry);
         }

         if (wholeBucketFilterEnabled()) {
@@ -372,16 +398,28 @@ private Iterator<ManifestEntry> readAndMergeFileEntries(
             List<ManifestFileMeta> manifests,
             Function<List<ManifestEntry>, List<ManifestEntry>> converter,
             boolean useSequential) {
+        long startDeletedEntriesTime = System.currentTimeMillis();
         Set<FileEntry.Identifier> deletedEntries =
                 FileEntry.readDeletedEntries(
                         manifest -> readManifest(manifest, FileEntry.deletedFilter(), null),
                         manifests,
                         parallelism);
+        long deletedEntriesDuration = System.currentTimeMillis() - startDeletedEntriesTime;
+        LOG.info(
+                "Read deleted entries: {}, duration: {}ms",
+                deletedEntries.size(),
+                deletedEntriesDuration);

+        long startFilterTime = System.currentTimeMillis();
         manifests =
                 manifests.stream()
                         .filter(file -> file.numAddedFiles() > 0)
                         .collect(Collectors.toList());
+        long filterDuration = System.currentTimeMillis() - startFilterTime;
+        LOG.info(
+                "Filtered manifests: {} remaining manifests, duration: {}ms",
+                manifests.size(),
+                filterDuration);

         Function<ManifestFileMeta, List<ManifestEntry>> processor =
                 manifest ->
@@ -391,8 +429,10 @@ private Iterator<ManifestEntry> readAndMergeFileEntries(
                                 FileEntry.addFilter(),
                                 entry -> !deletedEntries.contains(entry.identifier())));
         if (useSequential) {
+            LOG.info("Using sequential execution for file entry processing");
            return sequentialBatchedExecute(processor, manifests, parallelism).iterator();
         } else {
+            LOG.info("Using random execution for file entry processing");
            return randomlyExecuteSequentialReturn(processor, manifests, parallelism);
         }
     }
@@ -446,6 +486,8 @@ private List<ManifestEntry> readManifest(
             ManifestFileMeta manifest,
             @Nullable Filter<InternalRow> additionalFilter,
             @Nullable Filter<ManifestEntry> additionalTFilter) {
+        long startTime = System.currentTimeMillis();
+        LOG.info("Reading manifest file: {}", manifest.fileName());
         List<ManifestEntry> entries =
                 manifestFileFactory
                         .create()
                         .read(
@@ -462,12 +504,25 @@ private List<ManifestEntry> readManifest(
                                         && (manifestEntryFilter == null
                                                 || manifestEntryFilter.test(entry))
                                         && filterByStats(entry));
+        // SLOW PART IS HERE!!!
+        long duration = System.currentTimeMillis() - startTime;
+        LOG.info(
+                "Successfully read manifest file: {}, entries: {}, duration: {}ms",
+                manifest.fileName(),
+                entries.size(),
+                duration);
         if (dropStats) {
             List<ManifestEntry> copied = new ArrayList<>(entries.size());
             for (ManifestEntry entry : entries) {
                 copied.add(dropStats(entry));
             }
             entries = copied;
+            long duration2 = System.currentTimeMillis() - startTime;
+            LOG.info(
+                    "Successfully pruned manifest file due to dropStats: {}, entries: {}, duration: {}ms",
+                    manifest.fileName(),
+                    entries.size(),
+                    duration2);
         }
         return entries;
     }
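Note on the timing pattern above: as the inline comment observes, entries surface in per-manifest batches - the iterator blocks while a whole manifest is read, then its entries drain almost instantly. A self-contained sketch of that shape (illustrative names only; the real code paths are sequentialBatchedExecute and randomlyExecuteSequentialReturn):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.function.Function;

    public class BatchedIteratorSketch {

        // Pulling the first entry of each "manifest" pays the full read cost via reader,
        // after which the rest of that batch is returned without further I/O.
        public static <T, R> Iterator<R> batched(List<T> manifests, Function<T, List<R>> reader) {
            return manifests.stream().map(reader).flatMap(List::stream).iterator();
        }

        public static void main(String[] args) {
            Iterator<String> entries =
                    batched(Arrays.asList("m1", "m2"), m -> Arrays.asList(m + "-e1", m + "-e2"));
            entries.forEachRemaining(System.out::println);
        }
    }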
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java
index e7faf2a24569..ea05b3b793b1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileSystemWriteRestore.java
@@ -24,21 +24,42 @@
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.BucketFilter;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SnapshotManager;

+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;

 import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;

 /** {@link WriteRestore} to restore files directly from file system. */
 public class FileSystemWriteRestore implements WriteRestore {

+    private static final Logger LOG = LoggerFactory.getLogger(FileSystemWriteRestore.class);
+
     private final SnapshotManager snapshotManager;
     private final FileStoreScan scan;
     private final IndexFileHandler indexFileHandler;

+    private final boolean usePrefetchManifestEntries;
+
+    @Nullable
+    private static Cache<String, PrefetchedManifestEntries> prefetchedManifestEntriesCache;
+
     public FileSystemWriteRestore(
             CoreOptions options,
             SnapshotManager snapshotManager,
@@ -52,6 +73,25 @@ public FileSystemWriteRestore(
             this.scan.dropStats();
         }
+        this.usePrefetchManifestEntries = options.prefetchManifestEntries();
+        initializeCacheIfNeeded();
     }
+
+    private static synchronized void initializeCacheIfNeeded() {
+        if (prefetchedManifestEntriesCache == null) {
+            prefetchedManifestEntriesCache =
+                    Caffeine.newBuilder()
+                            .expireAfterAccess(Duration.ofMinutes(30))
+                            .executor(Runnable::run)
+                            .build();
+            // .softValues() - not used, as we want to hold onto a copy of the manifest for each
+            // table we are writing to
+            // .maximumSize(...) - not used, as the number of keys is static: the number of tables
+            // being written to
+            // .maximumWeight(...).weigher(...) - not used, as there isn't a convenient way of
+            // measuring the memory size of a ManifestEntry object
+        }
+    }

 @@ -62,6 +102,59 @@ public long latestCommittedIdentifier(String user) {
                 .orElse(Long.MIN_VALUE);
     }

+    private String getPrefetchManifestEntriesCacheKey() {
+        return snapshotManager.tablePath().toString();
+    }
+
+    private List<ManifestEntry> fetchManifestEntries(
+            Snapshot snapshot, @Nullable BinaryRow partition, @Nullable Integer bucket) {
+        FileStoreScan snapshotScan = scan.withSnapshot(snapshot);
+        if (partition != null && bucket != null) {
+            snapshotScan = snapshotScan.withPartitionBucket(partition, bucket);
+        }
+        return snapshotScan.plan().files();
+    }
+
+    public PrefetchedManifestEntries prefetchManifestEntries(Snapshot snapshot) {
+        synchronized (this.getClass()) {
+            if (prefetchedManifestEntriesCache == null) {
+                initializeCacheIfNeeded();
+            }
+
+            // check if the fetch is needed - if it was already done by another thread,
+            // we can skip it altogether
+            PrefetchedManifestEntries prefetch =
+                    prefetchedManifestEntriesCache.getIfPresent(
+                            getPrefetchManifestEntriesCacheKey());
+            if (prefetch != null && prefetch.snapshot.id() == snapshot.id()) {
+                LOG.info(
+                        "FileSystemWriteRestore skipping prefetch of manifestEntries for table {}, snapshot {} as it was fetched by another thread",
+                        snapshotManager.tablePath(),
+                        snapshot.id());
+                return prefetch;
+            }
+
+            LOG.info(
+                    "FileSystemWriteRestore started prefetching manifestEntries for table {}, snapshot {}",
+                    snapshotManager.tablePath(),
+                    snapshot.id());
+            List<ManifestEntry> manifestEntries = fetchManifestEntries(snapshot, null, null);
+            LOG.info(
+                    "FileSystemWriteRestore prefetched manifestEntries for table {}, snapshot {}: {} entries",
+                    snapshotManager.tablePath(),
+                    snapshot.id(),
+                    manifestEntries.size());
+
+            RowType partitionType = scan.manifestsReader().partitionType();
+            PrefetchedManifestEntries prefetchedManifestEntries =
+                    new PrefetchedManifestEntries(snapshot, partitionType, manifestEntries);
+
+            prefetchedManifestEntriesCache.put(
+                    getPrefetchManifestEntriesCacheKey(), prefetchedManifestEntries);
+            return prefetchedManifestEntries;
+        }
+    }
+
     @Override
     public RestoreFiles restoreFiles(
             BinaryRow partition,
@@ -75,9 +168,28 @@ public RestoreFiles restoreFiles(
             return RestoreFiles.empty();
         }

+        List<ManifestEntry> entries;
+        if (usePrefetchManifestEntries) {
+            PrefetchedManifestEntries prefetch =
+                    prefetchedManifestEntriesCache.getIfPresent(
+                            getPrefetchManifestEntriesCacheKey());
+            if (prefetch == null || prefetch.snapshot.id() != snapshot.id()) {
+                // re-fetch manifest entries if the snapshot ids don't match
+                prefetch = prefetchManifestEntries(snapshot);
+            }
+
+            entries = prefetch.filter(partition, bucket);
+        } else {
+            entries = fetchManifestEntries(snapshot, partition, bucket);
+        }
+        LOG.info(
+                "FileSystemWriteRestore filtered manifestEntries for {}, {}, {}: {} entries",
+                snapshotManager.tablePath(),
+                partition,
+                bucket,
+                entries.size());
+
         List<DataFileMeta> restoreFiles = new ArrayList<>();
-        List<ManifestEntry> entries =
-                scan.withSnapshot(snapshot).withPartitionBucket(partition, bucket).plan().files();
         Integer totalBuckets = WriteRestore.extractDataFiles(entries, restoreFiles);

         IndexFileMeta dynamicBucketIndex = null;
@@ -95,4 +207,49 @@ public RestoreFiles restoreFiles(
         return new RestoreFiles(
                 snapshot, totalBuckets, restoreFiles, dynamicBucketIndex, deleteVectorsIndex);
     }
+
+    /**
+     * Container for a {@link Snapshot}'s manifest entries, used by {@link FileSystemWriteRestore}
+     * to broker thread-safe access to cached results.
+     */
+    public static class PrefetchedManifestEntries {
+
+        private final Snapshot snapshot;
+        private final RowType partitionType;
+        private final List<ManifestEntry> manifestEntries;
+
+        public PrefetchedManifestEntries(
+                Snapshot snapshot, RowType partitionType, List<ManifestEntry> manifestEntries) {
+            this.snapshot = snapshot;
+            this.partitionType = partitionType;
+            this.manifestEntries = manifestEntries;
+        }
+
+        public Snapshot snapshot() {
+            return snapshot;
+        }
+
+        public RowType partitionType() {
+            return partitionType;
+        }
+
+        public List<ManifestEntry> manifestEntries() {
+            return manifestEntries;
+        }
+
+        public List<ManifestEntry> filter(BinaryRow partition, int bucket) {
+            PartitionPredicate partitionPredicate =
+                    PartitionPredicate.fromMultiple(
+                            partitionType, Collections.singletonList(partition));
+
+            BucketFilter bucketFilter = BucketFilter.create(false, bucket, null, null);
+            return manifestEntries.stream()
+                    .filter(
+                            m ->
+                                    (partitionPredicate == null
+                                                    || partitionPredicate.test(m.partition()))
+                                            && bucketFilter.test(m.bucket(), m.totalBuckets()))
+                    .collect(Collectors.toList());
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
index b3b89e72aafb..f6ce0c0c3d6f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
@@ -66,6 +66,10 @@ public ManifestsReader(
         this.manifestListFactory = manifestListFactory;
     }

+    public RowType partitionType() {
+        return partitionType;
+    }
+
     public ManifestsReader onlyReadRealBuckets() {
         this.onlyReadRealBuckets = true;
         return this;
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
index 21b17549b3bf..85fb14c3038c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
@@ -24,6 +24,9 @@
 import org.apache.paimon.operation.metrics.CacheMetrics;
 import org.apache.paimon.types.RowType;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;

@@ -36,6 +39,8 @@
 @ThreadSafe
 public abstract class ObjectsCache<K, V> {

+    private static final Logger LOG = LoggerFactory.getLogger(ObjectsCache.class);
+
     protected final SegmentsCache<K> cache;
     protected final ObjectSerializer<V> projectedSerializer;
     protected final ThreadLocal<InternalRowSerializer> formatSerializer;
@@ -69,8 +74,10 @@ public List<V> read(K key, @Nullable Long fileSize, Filters<V> filters)
             if (cacheMetrics != null) {
                 cacheMetrics.increaseHitObject();
             }
+            LOG.info("ObjectsCache cache-hit for {}", key);
             return readFromSegments(segments, filters);
         } else {
+            LOG.info("ObjectsCache cache-miss for {}", key);
             if (cacheMetrics != null) {
                 cacheMetrics.increaseMissedObject();
             }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
index a9c6031dfa34..cfaabe9564a2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
@@ -19,11 +19,14 @@
 package org.apache.paimon.flink.sink;

 import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;

 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.table.data.RowData;

+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_ENABLED;
+
 /** {@link FlinkSink} for dedicated compact jobs. */
 public class CompactorSink extends FlinkSink<RowData> {

@@ -39,7 +42,13 @@ public CompactorSink(FileStoreTable table, boolean fullCompaction) {
     @Override
     protected OneInputStreamOperatorFactory<RowData, Committable> createWriteOperatorFactory(
             StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new StoreCompactOperator.Factory(table, writeProvider, commitUser, fullCompaction);
+        Options options = table.coreOptions().toConfiguration();
+        boolean coordinatorEnabled = options.get(SINK_WRITER_COORDINATOR_ENABLED);
+        return coordinatorEnabled
+                ? new StoreCompactOperator.CoordinatedFactory(
+                        table, writeProvider, commitUser, fullCompaction)
+                : new StoreCompactOperator.Factory(
+                        table, writeProvider, commitUser, fullCompaction);
     }

     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 989c8e91dc83..2246e9f5a415 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -21,17 +21,24 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.coordinator.CoordinatedWriteRestore;
+import org.apache.paimon.flink.sink.coordinator.WriteOperatorCoordinator;
 import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.operation.WriteRestore;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;

+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
@@ -69,6 +76,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committable>
     private transient Set<Pair<BinaryRow, Integer>> waitToCompact;

+    protected transient @Nullable WriteRestore writeRestore;
     protected transient @Nullable WriterRefresher writeRefresher;

@@ -119,9 +127,16 @@ public void initializeState(StateInitializationContext context) throws Exception
                         getContainingTask().getEnvironment().getIOManager(),
                         memoryPoolFactory,
                         getMetricGroup());
+        if (writeRestore != null) {
+            write.setWriteRestore(writeRestore);
+        }
         this.writeRefresher = WriterRefresher.create(write.streamingMode(), table, write::replace);
     }

+    public void setWriteRestore(@Nullable WriteRestore writeRestore) {
+        this.writeRestore = writeRestore;
+    }
+
     @Override
     public void open() throws Exception {
         super.open();
@@ -244,4 +259,65 @@ public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader)
             return StoreCompactOperator.class;
         }
     }
+
+    /** Coordinated {@link StreamOperatorFactory} of {@link StoreCompactOperator}. */
+    public static class CoordinatedFactory
+            extends PrepareCommitOperator.Factory<RowData, Committable>
+            implements CoordinatedOperatorFactory<Committable> {
+
+        private static final long serialVersionUID = 1L;
+
+        private final FileStoreTable table;
+        private final StoreSinkWrite.Provider storeSinkWriteProvider;
+        private final String initialCommitUser;
+        private final boolean fullCompaction;
+
+        public CoordinatedFactory(
+                FileStoreTable table,
+                StoreSinkWrite.Provider storeSinkWriteProvider,
+                String initialCommitUser,
+                boolean fullCompaction) {
+            super(Options.fromMap(table.options()));
+            Preconditions.checkArgument(
+                    !table.coreOptions().writeOnly(),
+                    CoreOptions.WRITE_ONLY.key() + " should not be true for StoreCompactOperator.");
+            this.table = table;
+            this.storeSinkWriteProvider = storeSinkWriteProvider;
+            this.initialCommitUser = initialCommitUser;
+            this.fullCompaction = fullCompaction;
+        }
+
+        @Override
+        public OperatorCoordinator.Provider getCoordinatorProvider(
+                String operatorName, OperatorID operatorID) {
+            return new WriteOperatorCoordinator.Provider(operatorID, table);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<Committable>> T createStreamOperator(
+                StreamOperatorParameters<Committable> parameters) {
+            OperatorID operatorID = parameters.getStreamConfig().getOperatorID();
+            TaskOperatorEventGateway gateway =
+                    parameters
+                            .getContainingTask()
+                            .getEnvironment()
+                            .getOperatorCoordinatorEventGateway();
+
+            StoreCompactOperator operator =
+                    new StoreCompactOperator(
+                            parameters,
+                            table,
+                            storeSinkWriteProvider,
+                            initialCommitUser,
+                            fullCompaction);
+
+            operator.setWriteRestore(new CoordinatedWriteRestore(gateway, operatorID));
+            return (T) operator;
+        }
+
+        @Override
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return StoreCompactOperator.class;
+        }
+    }
 }
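Wiring note: a dedicated compact job opts into both code paths through table options. A sketch, assuming SINK_WRITER_COORDINATOR_ENABLED resolves to the key "sink.writer-coordinator.enabled" and that a FileStoreTable has already been loaded (the helper class is illustrative):

    import org.apache.paimon.flink.sink.CompactorSink;
    import org.apache.paimon.table.FileStoreTable;

    import java.util.HashMap;
    import java.util.Map;

    public class CoordinatedCompactSketch {

        // Hypothetical helper: builds a compact sink with prefetching and the write
        // coordinator enabled, so CompactorSink#createWriteOperatorFactory chooses
        // StoreCompactOperator.CoordinatedFactory.
        public static CompactorSink build(FileStoreTable table) {
            Map<String, String> options = new HashMap<>();
            options.put("manifest.prefetch-entries", "true"); // new option from this patch
            options.put("sink.writer-coordinator.enabled", "true"); // key assumed
            return new CompactorSink(table.copy(options), false);
        }
    }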