Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1801,6 +1801,15 @@ public InlineElement getDescription() {
"For DELETE manifest entry in manifest file, drop stats to reduce memory and storage."
+ " Default value is false only for compatibility of old reader.");

/**
 * Whether all manifest entries are prefetched when writers are initialized. Defaults to {@code
 * false}; per the description, enabling it trades extra memory for fewer object store requests.
 */
public static final ConfigOption<Boolean> MANIFEST_PREFETCH_ENTRIES =
        key("manifest.prefetch-entries")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Prefetch all manifest entries when initializing writers, which reduces API requests to object store filesystems."
                                + " This is useful for jobs that write to or compact many partitions at once;"
                                + " however, more memory is required as the entire manifest is loaded.");

public static final ConfigOption<Boolean> DATA_FILE_THIN_MODE =
key("data-file.thin-mode")
.booleanType()
Expand Down Expand Up @@ -2537,6 +2546,10 @@ public boolean manifestDeleteFileDropStats() {
return options.get(MANIFEST_DELETE_FILE_DROP_STATS);
}

/** Returns the configured value of the {@code MANIFEST_PREFETCH_ENTRIES} option. */
public boolean prefetchManifestEntries() {
return options.get(MANIFEST_PREFETCH_ENTRIES);
}

/** Returns the configured value of the {@code DISABLE_ALTER_COLUMN_NULL_TO_NOT_NULL} option. */
public boolean disableNullToNotNull() {
return options.get(DISABLE_ALTER_COLUMN_NULL_TO_NOT_NULL);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.VersionedObjectSerializer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
Expand All @@ -53,6 +56,8 @@
*/
public class ManifestFile extends ObjectsFile<ManifestEntry> {

private static final Logger LOG = LoggerFactory.getLogger(ManifestFile.class);

private final SchemaManager schemaManager;
private final RowType partitionType;
private final FormatWriterFactory writerFactory;
Expand Down Expand Up @@ -108,13 +113,15 @@ public List<ManifestEntry> read(
try {
Path path = pathFactory.toPath(fileName);
if (cache != null) {
LOG.info("Reading manifest file from cache: {}", fileName);
return cache.read(
path,
fileSize,
new ManifestEntryFilters(
partitionFilter, bucketFilter, readFilter, readTFilter));
}

LOG.info("Reading manifest file without cache: {}", fileName);
return readFromIterator(
createIterator(path, fileSize), serializer, readFilter, readTFilter);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.ArrayList;
Expand All @@ -67,6 +70,8 @@
/** Default implementation of {@link FileStoreScan}. */
public abstract class AbstractFileStoreScan implements FileStoreScan {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes to this class are logging only and can be ignored.


private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreScan.class);

private final ManifestsReader manifestsReader;
private final SnapshotManager snapshotManager;
private final ManifestFile.Factory manifestFileFactory;
Expand Down Expand Up @@ -251,14 +256,35 @@ public ManifestsReader manifestsReader() {
@Override
public Plan plan() {
long started = System.nanoTime();
long startManifests = System.currentTimeMillis();
ManifestsReader.Result manifestsResult = readManifests();
long manifestsDuration = System.currentTimeMillis() - startManifests;
LOG.info(
"Read manifests, duration: {}ms, count: {}",
manifestsDuration,
manifestsResult.filteredManifests.size());

Snapshot snapshot = manifestsResult.snapshot;
List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;

long startIterator = System.currentTimeMillis();
Iterator<ManifestEntry> iterator = readManifestEntries(manifests, false);
long iteratorDuration = System.currentTimeMillis() - startIterator;
LOG.info("Obtained iterator for manifest entries, duration: {}ms", iteratorDuration);

List<ManifestEntry> files = new ArrayList<>();
while (iterator.hasNext()) {
files.add(iterator.next());
long startIteration = System.currentTimeMillis();
ManifestEntry entry = iterator.next();
long iterationDuration = System.currentTimeMillis() - startIteration;
// This step is pretty fast: entries become available in "chunks". Once a manifest
// has been read in full and returned by readManifest, its individual entries are
// processed and made available to the iterator in one batch.
// LOG.info(
// "Iterated manifest entry: {}, duration: {}ms",
// entry.file().fileName(),
// iterationDuration);
files.add(entry);
}

if (wholeBucketFilterEnabled()) {
Expand Down Expand Up @@ -372,16 +398,28 @@ private <T extends FileEntry> Iterator<T> readAndMergeFileEntries(
List<ManifestFileMeta> manifests,
Function<List<ManifestEntry>, List<T>> converter,
boolean useSequential) {
long startDeletedEntriesTime = System.currentTimeMillis();
Set<Identifier> deletedEntries =
FileEntry.readDeletedEntries(
manifest -> readManifest(manifest, FileEntry.deletedFilter(), null),
manifests,
parallelism);
long deletedEntriesDuration = System.currentTimeMillis() - startDeletedEntriesTime;
LOG.info(
"Read deleted entries: {}, duration: {}ms",
deletedEntries.size(),
deletedEntriesDuration);

long startFilterTime = System.currentTimeMillis();
manifests =
manifests.stream()
.filter(file -> file.numAddedFiles() > 0)
.collect(Collectors.toList());
long filterDuration = System.currentTimeMillis() - startFilterTime;
LOG.info(
"Filtered manifests: {} remaining manifests, duration: {}ms",
manifests.size(),
filterDuration);

Function<ManifestFileMeta, List<T>> processor =
manifest ->
Expand All @@ -391,8 +429,10 @@ private <T extends FileEntry> Iterator<T> readAndMergeFileEntries(
FileEntry.addFilter(),
entry -> !deletedEntries.contains(entry.identifier())));
if (useSequential) {
LOG.info("Using sequential execution for file entry processing");
return sequentialBatchedExecute(processor, manifests, parallelism).iterator();
} else {
LOG.info("Using random execution for file entry processing");
return randomlyExecuteSequentialReturn(processor, manifests, parallelism);
}
}
Expand Down Expand Up @@ -446,6 +486,8 @@ private List<ManifestEntry> readManifest(
ManifestFileMeta manifest,
@Nullable Filter<InternalRow> additionalFilter,
@Nullable Filter<ManifestEntry> additionalTFilter) {
long startTime = System.currentTimeMillis();
LOG.info("Reading manifest file: {}", manifest.fileName());
List<ManifestEntry> entries =
manifestFileFactory
.create()
Expand All @@ -462,12 +504,25 @@ private List<ManifestEntry> readManifest(
&& (manifestEntryFilter == null
|| manifestEntryFilter.test(entry))
&& filterByStats(entry));
// SLOW PART IS HERE!!!
long duration = System.currentTimeMillis() - startTime;
LOG.info(
"Successfully read manifest file: {}, entries: {}, duration: {}ms",
manifest.fileName(),
entries.size(),
duration);
if (dropStats) {
List<ManifestEntry> copied = new ArrayList<>(entries.size());
for (ManifestEntry entry : entries) {
copied.add(dropStats(entry));
}
entries = copied;
long duration2 = System.currentTimeMillis() - startTime;
LOG.info(
"Successfully pruned manifest file due to dropStats: {}, entries: {}, duration: {}ms",
manifest.fileName(),
entries.size(),
duration2);
}
return entries;
}
Expand Down
Loading
Loading