From 511aaa6d595aa06362c94747f93c821355128e35 Mon Sep 17 00:00:00 2001 From: umi Date: Tue, 13 Jan 2026 20:43:26 +0800 Subject: [PATCH 1/3] [python] Refactor FullStartingScanner by introducing PartialStartingScanner --- paimon-python/dev/lint-python.sh | 2 +- .../read/scanner/full_starting_scanner.py | 753 +++++++++--------- paimon-python/pypaimon/read/table_scan.py | 27 +- 3 files changed, 416 insertions(+), 366 deletions(-) diff --git a/paimon-python/dev/lint-python.sh b/paimon-python/dev/lint-python.sh index 44be2871493e..4ebbe1acf87f 100755 --- a/paimon-python/dev/lint-python.sh +++ b/paimon-python/dev/lint-python.sh @@ -107,7 +107,7 @@ function collect_checks() { function get_all_supported_checks() { _OLD_IFS=$IFS IFS=$'\n' - SUPPORT_CHECKS=("flake8_check" "pytest_torch_check" "pytest_check" "mixed_check") # control the calling sequence + SUPPORT_CHECKS=("flake8_check" "pytest_check" "pytest_torch_check" "mixed_check") # control the calling sequence for fun in $(declare -F); do if [[ `regexp_match "$fun" "_check$"` = true ]]; then check_name="${fun:11}" diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py index 3765baffa67f..df88ea725d59 100755 --- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py @@ -61,11 +61,6 @@ def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int]): self.target_split_size = options.source_split_target_size() self.open_file_cost = options.source_split_open_file_cost() - self.idx_of_this_subtask = None - self.number_of_para_subtasks = None - self.start_pos_of_this_subtask = None - self.end_pos_of_this_subtask = None - self.only_read_real_buckets = True if options.bucket() == BucketMode.POSTPONE_BUCKET.value else False self.data_evolution = options.data_evolution_enabled() self.deletion_vectors_enabled = options.deletion_vectors_enabled() @@ -120,209 +115,138 @@ def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[ self._filter_manifest_entry, max_workers=max_workers) - def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'FullStartingScanner': - if idx_of_this_subtask >= number_of_para_subtasks: - raise Exception("idx_of_this_subtask must be less than number_of_para_subtasks") - if self.start_pos_of_this_subtask is not None: - raise Exception("with_shard and with_slice cannot be used simultaneously") - self.idx_of_this_subtask = idx_of_this_subtask - self.number_of_para_subtasks = number_of_para_subtasks - return self + def _create_append_only_splits( + self, file_entries: List[ManifestEntry], deletion_files_map: dict = None) -> List['Split']: + partitioned_files = defaultdict(list) + for entry in file_entries: + partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry) - def with_slice(self, start_pos, end_pos) -> 'FullStartingScanner': - if start_pos >= end_pos: - raise Exception("start_pos must be less than end_pos") - if self.idx_of_this_subtask is not None: - raise Exception("with_slice and with_shard cannot be used simultaneously") - self.start_pos_of_this_subtask = start_pos - self.end_pos_of_this_subtask = end_pos - return self + if self._partial_read(): + partitioned_files = self._filter_by_pos(partitioned_files) - @staticmethod - def _append_only_filter_by_slice(partitioned_files: defaultdict, - start_pos: int, - end_pos: int) -> (defaultdict, int, int): - plan_start_pos = 0 - plan_end_pos = 0 - 
entry_end_pos = 0 # end row position of current file in all data - splits_start_pos = 0 - filtered_partitioned_files = defaultdict(list) - # Iterate through all file entries to find files that overlap with current shard range + def weight_func(f: DataFileMeta) -> int: + return max(f.file_size, self.open_file_cost) + + splits = [] for key, file_entries in partitioned_files.items(): - filtered_entries = [] - for entry in file_entries: - entry_begin_pos = entry_end_pos # Starting row position of current file in all data - entry_end_pos += entry.file.row_count # Update to row position after current file + if not file_entries: + return [] - # If current file is completely after shard range, stop iteration - if entry_begin_pos >= end_pos: - break - # If current file is completely before shard range, skip it - if entry_end_pos <= start_pos: - continue - if entry_begin_pos <= start_pos < entry_end_pos: - splits_start_pos = entry_begin_pos - plan_start_pos = start_pos - entry_begin_pos - # If shard end position is within current file, record relative end position - if entry_begin_pos < end_pos <= entry_end_pos: - plan_end_pos = end_pos - splits_start_pos - # Add files that overlap with shard range to result - filtered_entries.append(entry) - if filtered_entries: - filtered_partitioned_files[key] = filtered_entries + data_files: List[DataFileMeta] = [e.file for e in file_entries] - return filtered_partitioned_files, plan_start_pos, plan_end_pos + packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(data_files, weight_func, + self.target_split_size) + splits += self._build_split_from_pack(packed_files, file_entries, False, deletion_files_map) + if self._partial_read(): + self._compute_split_pos(splits) + return splits - def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int): - """ - Filter file entries by shard. 
Only keep the files within the range, which means - that only the starting and ending files need to be further divided subsequently - """ - total_row = 0 - # Sort by file creation time to ensure consistent sharding - for key, file_entries in partitioned_files.items(): - for entry in file_entries: - total_row += entry.file.row_count + def _create_primary_key_splits( + self, file_entries: List[ManifestEntry], deletion_files_map: dict = None) -> List['Split']: + if self._partial_read(): + file_entries = self._filter_by_pos(file_entries) - # Calculate number of rows this shard should process using balanced distribution - # Distribute remainder evenly among first few shards to avoid last shard overload - base_rows_per_shard = total_row // self.number_of_para_subtasks - remainder = total_row % self.number_of_para_subtasks + partitioned_files = defaultdict(list) + for entry in file_entries: + partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry) - # Each of the first 'remainder' shards gets one extra row - if self.idx_of_this_subtask < remainder: - num_row = base_rows_per_shard + 1 - start_pos = self.idx_of_this_subtask * (base_rows_per_shard + 1) - else: - num_row = base_rows_per_shard - start_pos = (remainder * (base_rows_per_shard + 1) + - (self.idx_of_this_subtask - remainder) * base_rows_per_shard) + def single_weight_func(f: DataFileMeta) -> int: + return max(f.file_size, self.open_file_cost) - end_pos = start_pos + num_row + def weight_func(fl: List[DataFileMeta]) -> int: + return max(sum(f.file_size for f in fl), self.open_file_cost) - return self._append_only_filter_by_slice(partitioned_files, start_pos, end_pos) + merge_engine = self.table.options.merge_engine() + merge_engine_first_row = merge_engine == MergeEngine.FIRST_ROW - def _data_evolution_filter_by_row_range(self, partitioned_files: defaultdict, - start_pos: int, - end_pos: int) -> (defaultdict, int, int): - plan_start_pos = 0 - plan_end_pos = 0 - entry_end_pos = 0 # end row position of current file in all data - splits_start_pos = 0 - filtered_partitioned_files = defaultdict(list) - # Iterate through all file entries to find files that overlap with current shard range + splits = [] for key, file_entries in partitioned_files.items(): - filtered_entries = [] - blob_added = False # If it is true, all blobs corresponding to this data file are added - for entry in file_entries: - if self._is_blob_file(entry.file.file_name): - if blob_added: - filtered_entries.append(entry) - continue - blob_added = False - entry_begin_pos = entry_end_pos # Starting row position of current file in all data - entry_end_pos += entry.file.row_count # Update to row position after current file + if not file_entries: + continue - # If current file is completely after shard range, stop iteration - if entry_begin_pos >= end_pos: - break - # If current file is completely before shard range, skip it - if entry_end_pos <= start_pos: - continue - if entry_begin_pos <= start_pos < entry_end_pos: - splits_start_pos = entry_begin_pos - plan_start_pos = start_pos - entry_begin_pos - # If shard end position is within current file, record relative end position - if entry_begin_pos < end_pos <= entry_end_pos: - plan_end_pos = end_pos - splits_start_pos - # Add files that overlap with shard range to result - filtered_entries.append(entry) - blob_added = True - if filtered_entries: - filtered_partitioned_files[key] = filtered_entries + data_files: List[DataFileMeta] = [e.file for e in file_entries] - return filtered_partitioned_files, 
plan_start_pos, plan_end_pos + raw_convertible = all( + f.level != 0 and self._without_delete_row(f) + for f in data_files + ) - def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int): - total_row = 0 - for key, file_entries in partitioned_files.items(): - for entry in file_entries: - if not self._is_blob_file(entry.file.file_name): - total_row += entry.file.row_count + levels = {f.level for f in data_files} + one_level = len(levels) == 1 - # Calculate number of rows this shard should process using balanced distribution - # Distribute remainder evenly among first few shards to avoid last shard overload - base_rows_per_shard = total_row // self.number_of_para_subtasks - remainder = total_row % self.number_of_para_subtasks + use_optimized_path = raw_convertible and ( + self.deletion_vectors_enabled or merge_engine_first_row or one_level) + if use_optimized_path: + packed_files: List[List[DataFileMeta]] = self._pack_for_ordered( + data_files, single_weight_func, self.target_split_size + ) + splits += self._build_split_from_pack( + packed_files, file_entries, True, deletion_files_map, + use_optimized_path) + else: + partition_sort_runs: List[List[SortedRun]] = IntervalPartition(data_files).partition() + sections: List[List[DataFileMeta]] = [ + [file for s in sl for file in s.files] + for sl in partition_sort_runs + ] - # Each of the first 'remainder' shards gets one extra row - if self.idx_of_this_subtask < remainder: - num_row = base_rows_per_shard + 1 - start_pos = self.idx_of_this_subtask * (base_rows_per_shard + 1) - else: - num_row = base_rows_per_shard - start_pos = (remainder * (base_rows_per_shard + 1) + - (self.idx_of_this_subtask - remainder) * base_rows_per_shard) + packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(sections, weight_func, + self.target_split_size) - end_pos = start_pos + num_row - return self._data_evolution_filter_by_row_range(partitioned_files, start_pos, end_pos) + flatten_packed_files: List[List[DataFileMeta]] = [ + [file for sub_pack in pack for file in sub_pack] + for pack in packed_files + ] + splits += self._build_split_from_pack( + flatten_packed_files, file_entries, True, + deletion_files_map, False) + return splits - def _compute_split_start_end_pos(self, splits: List[Split], plan_start_pos, plan_end_pos): - """ - Find files that needs to be divided for each split - :param splits: splits - :param plan_start_pos: plan begin row in all splits data - :param plan_end_pos: plan end row in all splits data - """ - file_end_pos = 0 # end row position of current file in all splits data + def _create_data_evolution_splits( + self, file_entries: List[ManifestEntry], deletion_files_map: dict = None) -> List['Split']: + def sort_key(manifest_entry: ManifestEntry) -> tuple: + first_row_id = manifest_entry.file.first_row_id if manifest_entry.file.first_row_id is not None else float( + '-inf') + is_blob = 1 if self._is_blob_file(manifest_entry.file.file_name) else 0 + # For files with same firstRowId, sort by maxSequenceNumber in descending order + # (larger sequence number means more recent data) + max_seq = manifest_entry.file.max_sequence_number + return first_row_id, is_blob, -max_seq - for split in splits: - cur_split_end_pos = file_end_pos - # Compute split_file_idx_map for data files - file_end_pos = self._compute_split_file_idx_map(plan_start_pos, plan_end_pos, - split, cur_split_end_pos, False) - # Compute split_file_idx_map for blob files - if self.data_evolution: - 
self._compute_split_file_idx_map(plan_start_pos, plan_end_pos, - split, cur_split_end_pos, True) + partitioned_files = defaultdict(list) + for entry in file_entries: + partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry) - def _compute_split_file_idx_map(self, plan_start_pos, plan_end_pos, split: Split, - file_end_pos: int, is_blob: bool = False): - """ - Traverse all the files in current split, find the starting shard and ending shard files, - and add them to shard_file_idx_map; - - for data file, only two data files will be divided in all splits. - - for blob file, perhaps there will be some unnecessary files in addition to two files(start and end). - Add them to shard_file_idx_map as well, because they need to be removed later. - """ - row_cnt = 0 - for file in split.files: - if not is_blob and self._is_blob_file(file.file_name): - continue - if is_blob and not self._is_blob_file(file.file_name): + if self._partial_read(): + partitioned_files = self._filter_by_pos(partitioned_files) + + def weight_func(file_list: List[DataFileMeta]) -> int: + return max(sum(f.file_size for f in file_list), self.open_file_cost) + + splits = [] + for key, key_entries in partitioned_files.items(): + if not key_entries: continue - row_cnt += file.row_count - file_begin_pos = file_end_pos # Starting row position of current file in all data - file_end_pos += file.row_count # Update to row position after current file - if file_begin_pos <= plan_start_pos < plan_end_pos <= file_end_pos: - split.shard_file_idx_map[file.file_name] = ( - plan_start_pos - file_begin_pos, plan_end_pos - file_begin_pos) - # If shard start position is within current file, record actual start position and relative offset - elif file_begin_pos < plan_start_pos < file_end_pos: - split.shard_file_idx_map[file.file_name] = (plan_start_pos - file_begin_pos, file.row_count) - # If shard end position is within current file, record relative end position - elif file_begin_pos < plan_end_pos < file_end_pos: - split.shard_file_idx_map[file.file_name] = (0, plan_end_pos - file_begin_pos) - elif file_end_pos <= plan_start_pos or file_begin_pos >= plan_end_pos: - split.shard_file_idx_map[file.file_name] = (-1, -1) - return file_end_pos + sorted_entries = sorted(key_entries, key=sort_key) + data_files: List[DataFileMeta] = [e.file for e in sorted_entries] - def _primary_key_filter_by_shard(self, file_entries: List[ManifestEntry]) -> List[ManifestEntry]: - filtered_entries = [] - for entry in file_entries: - if entry.bucket % self.number_of_para_subtasks == self.idx_of_this_subtask: - filtered_entries.append(entry) - return filtered_entries + # Split files by firstRowId for data evolution + split_by_row_id = self._split_by_row_id(data_files) + + # Pack the split groups for optimal split sizes + packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(split_by_row_id, weight_func, + self.target_split_size) + + # Flatten the packed files and build splits + flatten_packed_files: List[List[DataFileMeta]] = [ + [file for sub_pack in pack for file in sub_pack] + for pack in packed_files + ] + + splits += self._build_split_from_pack(flatten_packed_files, sorted_entries, False, deletion_files_map) + if self._partial_read(): + self._compute_split_pos(splits) + return splits def _apply_push_down_limit(self, splits: List[Split]) -> List[Split]: if self.limit is None: @@ -440,188 +364,33 @@ def _to_deletion_files(self, index_entry) -> Dict[str, DeletionFile]: ) deletion_files[data_file_name] = deletion_file - return 
deletion_files - - def _get_deletion_files_for_split(self, data_files: List[DataFileMeta], - deletion_files_map: dict, - partition: GenericRow, - bucket: int) -> Optional[List[DeletionFile]]: - """ - Get deletion files for the given data files in a split. - """ - if not deletion_files_map: - return None - - partition_key = (tuple(partition.values), bucket) - file_deletion_map = deletion_files_map.get(partition_key, {}) - - if not file_deletion_map: - return None - - deletion_files = [] - for data_file in data_files: - deletion_file = file_deletion_map.get(data_file.file_name) - if deletion_file: - deletion_files.append(deletion_file) - else: - deletion_files.append(None) - - return deletion_files if any(df is not None for df in deletion_files) else None - - def _create_append_only_splits( - self, file_entries: List[ManifestEntry], deletion_files_map: dict = None) -> List['Split']: - partitioned_files = defaultdict(list) - for entry in file_entries: - partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry) - - if self.start_pos_of_this_subtask is not None: - # shard data range: [plan_start_pos, plan_end_pos) - partitioned_files, plan_start_pos, plan_end_pos = \ - self._append_only_filter_by_slice(partitioned_files, - self.start_pos_of_this_subtask, - self.end_pos_of_this_subtask) - elif self.idx_of_this_subtask is not None: - partitioned_files, plan_start_pos, plan_end_pos = self._append_only_filter_by_shard(partitioned_files) - - def weight_func(f: DataFileMeta) -> int: - return max(f.file_size, self.open_file_cost) - - splits = [] - for key, file_entries in partitioned_files.items(): - if not file_entries: - return [] - - data_files: List[DataFileMeta] = [e.file for e in file_entries] - - packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(data_files, weight_func, - self.target_split_size) - splits += self._build_split_from_pack(packed_files, file_entries, False, deletion_files_map) - if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None: - # When files are combined into splits, it is necessary to find files that needs to be divided for each split - self._compute_split_start_end_pos(splits, plan_start_pos, plan_end_pos) - return splits - - def _without_delete_row(self, data_file_meta: DataFileMeta) -> bool: - # null to true to be compatible with old version - if data_file_meta.delete_row_count is None: - return True - return data_file_meta.delete_row_count == 0 - - def _create_primary_key_splits( - self, file_entries: List[ManifestEntry], deletion_files_map: dict = None) -> List['Split']: - if self.idx_of_this_subtask is not None: - file_entries = self._primary_key_filter_by_shard(file_entries) - partitioned_files = defaultdict(list) - for entry in file_entries: - partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry) - - def single_weight_func(f: DataFileMeta) -> int: - return max(f.file_size, self.open_file_cost) - - def weight_func(fl: List[DataFileMeta]) -> int: - return max(sum(f.file_size for f in fl), self.open_file_cost) - - merge_engine = self.table.options.merge_engine() - merge_engine_first_row = merge_engine == MergeEngine.FIRST_ROW - - splits = [] - for key, file_entries in partitioned_files.items(): - if not file_entries: - continue - - data_files: List[DataFileMeta] = [e.file for e in file_entries] - - raw_convertible = all( - f.level != 0 and self._without_delete_row(f) - for f in data_files - ) - - levels = {f.level for f in data_files} - one_level = len(levels) == 1 - - 
use_optimized_path = raw_convertible and ( - self.deletion_vectors_enabled or merge_engine_first_row or one_level) - if use_optimized_path: - packed_files: List[List[DataFileMeta]] = self._pack_for_ordered( - data_files, single_weight_func, self.target_split_size - ) - splits += self._build_split_from_pack( - packed_files, file_entries, True, deletion_files_map, - use_optimized_path) - else: - partition_sort_runs: List[List[SortedRun]] = IntervalPartition(data_files).partition() - sections: List[List[DataFileMeta]] = [ - [file for s in sl for file in s.files] - for sl in partition_sort_runs - ] - - packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(sections, weight_func, - self.target_split_size) - - flatten_packed_files: List[List[DataFileMeta]] = [ - [file for sub_pack in pack for file in sub_pack] - for pack in packed_files - ] - splits += self._build_split_from_pack( - flatten_packed_files, file_entries, True, - deletion_files_map, False) - return splits - - def _create_data_evolution_splits( - self, file_entries: List[ManifestEntry], deletion_files_map: dict = None) -> List['Split']: - def sort_key(manifest_entry: ManifestEntry) -> tuple: - first_row_id = manifest_entry.file.first_row_id if manifest_entry.file.first_row_id is not None else float( - '-inf') - is_blob = 1 if self._is_blob_file(manifest_entry.file.file_name) else 0 - # For files with same firstRowId, sort by maxSequenceNumber in descending order - # (larger sequence number means more recent data) - max_seq = manifest_entry.file.max_sequence_number - return first_row_id, is_blob, -max_seq - - sorted_entries = sorted(file_entries, key=sort_key) - - partitioned_files = defaultdict(list) - for entry in sorted_entries: - partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry) - - if self.start_pos_of_this_subtask is not None: - # shard data range: [plan_start_pos, plan_end_pos) - partitioned_files, plan_start_pos, plan_end_pos = \ - self._data_evolution_filter_by_row_range(partitioned_files, - self.start_pos_of_this_subtask, - self.end_pos_of_this_subtask) - elif self.idx_of_this_subtask is not None: - # shard data range: [plan_start_pos, plan_end_pos) - partitioned_files, plan_start_pos, plan_end_pos = self._data_evolution_filter_by_shard(partitioned_files) - - def weight_func(file_list: List[DataFileMeta]) -> int: - return max(sum(f.file_size for f in file_list), self.open_file_cost) - - splits = [] - for key, sorted_entries in partitioned_files.items(): - if not sorted_entries: - continue - - data_files: List[DataFileMeta] = [e.file for e in sorted_entries] + return deletion_files - # Split files by firstRowId for data evolution - split_by_row_id = self._split_by_row_id(data_files) + def _get_deletion_files_for_split(self, data_files: List[DataFileMeta], + deletion_files_map: dict, + partition: GenericRow, + bucket: int) -> Optional[List[DeletionFile]]: + """ + Get deletion files for the given data files in a split. 
+ """ + if not deletion_files_map: + return None - # Pack the split groups for optimal split sizes - packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(split_by_row_id, weight_func, - self.target_split_size) + partition_key = (tuple(partition.values), bucket) + file_deletion_map = deletion_files_map.get(partition_key, {}) - # Flatten the packed files and build splits - flatten_packed_files: List[List[DataFileMeta]] = [ - [file for sub_pack in pack for file in sub_pack] - for pack in packed_files - ] + if not file_deletion_map: + return None - splits += self._build_split_from_pack(flatten_packed_files, sorted_entries, False, deletion_files_map) + deletion_files = [] + for data_file in data_files: + deletion_file = file_deletion_map.get(data_file.file_name) + if deletion_file: + deletion_files.append(deletion_file) + else: + deletion_files.append(None) - if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None: - self._compute_split_start_end_pos(splits, plan_start_pos, plan_end_pos) - return splits + return deletion_files if any(df is not None for df in deletion_files) else None def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]: split_by_row_id = [] @@ -753,3 +522,263 @@ def _filter_blob(files: List[DataFileMeta]) -> List[DataFileMeta]: result.append(file) return result + + def _without_delete_row(self, data_file_meta: DataFileMeta) -> bool: + # null to true to be compatible with old version + if data_file_meta.delete_row_count is None: + return True + return data_file_meta.delete_row_count == 0 + + def _partial_read(self): + return False + + def _filter_by_pos(self, files): + pass + + def _compute_split_pos(self, splits: List['Split']) -> None: + pass + + +class PartialStartingScanner(FullStartingScanner): + def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int]): + super().__init__(table, predicate, limit) + # for shard + self.idx_of_this_subtask = None + self.number_of_para_subtasks = None + self.start_pos_of_this_subtask = None + self.end_pos_of_this_subtask = None + self.plan_start_end_pos = None + + def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'FullStartingScanner': + if idx_of_this_subtask >= number_of_para_subtasks: + raise Exception("idx_of_this_subtask must be less than number_of_para_subtasks") + if self.start_pos_of_this_subtask is not None: + raise Exception("with_shard and with_slice cannot be used simultaneously") + self.idx_of_this_subtask = idx_of_this_subtask + self.number_of_para_subtasks = number_of_para_subtasks + return self + + def with_slice(self, start_pos, end_pos) -> 'FullStartingScanner': + if start_pos >= end_pos: + raise Exception("start_pos must be less than end_pos") + if self.idx_of_this_subtask is not None: + raise Exception("with_slice and with_shard cannot be used simultaneously") + self.start_pos_of_this_subtask = start_pos + self.end_pos_of_this_subtask = end_pos + return self + + def _filter_by_pos(self, files): + if self.table.is_primary_key_table: + return self._primary_key_filter_by_shard(files) + elif self.data_evolution: + if self.start_pos_of_this_subtask is not None: + # shard data range: [plan_start_pos, plan_end_pos) + files, self.plan_start_end_pos = \ + self._data_evolution_filter_by_slice(files, + self.start_pos_of_this_subtask, + self.end_pos_of_this_subtask) + elif self.idx_of_this_subtask is not None: + files, self.plan_start_end_pos = self._data_evolution_filter_by_shard(files) + return files + else: + if 
self.start_pos_of_this_subtask is not None: + # shard data range: [plan_start_pos, plan_end_pos) + files, self.plan_start_end_pos = \ + self._append_only_filter_by_slice(files, + self.start_pos_of_this_subtask, + self.end_pos_of_this_subtask) + elif self.idx_of_this_subtask is not None: + files, self.plan_start_end_pos = self._append_only_filter_by_shard(files) + return files + + def _compute_split_pos(self, splits: List['Split']) -> None: + if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None: + # When files are combined into splits, it is necessary to find files that needs to be divided for each split + self._compute_split_start_end_pos(splits, self.plan_start_end_pos[0], self.plan_start_end_pos[1]) + + def _append_only_filter_by_slice(self, partitioned_files: defaultdict, start_pos: int, + end_pos: int) -> (defaultdict, int, int): + plan_start_pos = 0 + plan_end_pos = 0 + entry_end_pos = 0 # end row position of current file in all data + splits_start_pos = 0 + filtered_partitioned_files = defaultdict(list) + # Iterate through all file entries to find files that overlap with current shard range + for key, file_entries in partitioned_files.items(): + filtered_entries = [] + for entry in file_entries: + entry_begin_pos = entry_end_pos # Starting row position of current file in all data + entry_end_pos += entry.file.row_count # Update to row position after current file + + # If current file is completely after shard range, stop iteration + if entry_begin_pos >= end_pos: + break + # If current file is completely before shard range, skip it + if entry_end_pos <= start_pos: + continue + if entry_begin_pos <= start_pos < entry_end_pos: + splits_start_pos = entry_begin_pos + plan_start_pos = start_pos - entry_begin_pos + # If shard end position is within current file, record relative end position + if entry_begin_pos < end_pos <= entry_end_pos: + plan_end_pos = end_pos - splits_start_pos + # Add files that overlap with shard range to result + filtered_entries.append(entry) + if filtered_entries: + filtered_partitioned_files[key] = filtered_entries + + return filtered_partitioned_files, (plan_start_pos, plan_end_pos) + + def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int): + """ + Filter file entries by shard. 
Only keep the files within the range, which means + that only the starting and ending files need to be further divided subsequently + """ + total_row = 0 + # Sort by file creation time to ensure consistent sharding + for key, file_entries in partitioned_files.items(): + for entry in file_entries: + total_row += entry.file.row_count + + # Calculate number of rows this shard should process using balanced distribution + # Distribute remainder evenly among first few shards to avoid last shard overload + base_rows_per_shard = total_row // self.number_of_para_subtasks + remainder = total_row % self.number_of_para_subtasks + + # Each of the first 'remainder' shards gets one extra row + if self.idx_of_this_subtask < remainder: + num_row = base_rows_per_shard + 1 + start_pos = self.idx_of_this_subtask * (base_rows_per_shard + 1) + else: + num_row = base_rows_per_shard + start_pos = (remainder * (base_rows_per_shard + 1) + + (self.idx_of_this_subtask - remainder) * base_rows_per_shard) + + end_pos = start_pos + num_row + + return self._append_only_filter_by_slice(partitioned_files, start_pos, end_pos) + + def _data_evolution_filter_by_slice(self, partitioned_files: defaultdict, + start_pos: int, + end_pos: int) -> (defaultdict, int, int): + plan_start_pos = 0 + plan_end_pos = 0 + entry_end_pos = 0 # end row position of current file in all data + splits_start_pos = 0 + filtered_partitioned_files = defaultdict(list) + # Iterate through all file entries to find files that overlap with current shard range + for key, file_entries in partitioned_files.items(): + filtered_entries = [] + blob_added = False # If it is true, all blobs corresponding to this data file are added + for entry in file_entries: + if self._is_blob_file(entry.file.file_name): + if blob_added: + filtered_entries.append(entry) + continue + blob_added = False + entry_begin_pos = entry_end_pos # Starting row position of current file in all data + entry_end_pos += entry.file.row_count # Update to row position after current file + + # If current file is completely after shard range, stop iteration + if entry_begin_pos >= end_pos: + break + # If current file is completely before shard range, skip it + if entry_end_pos <= start_pos: + continue + if entry_begin_pos <= start_pos < entry_end_pos: + splits_start_pos = entry_begin_pos + plan_start_pos = start_pos - entry_begin_pos + # If shard end position is within current file, record relative end position + if entry_begin_pos < end_pos <= entry_end_pos: + plan_end_pos = end_pos - splits_start_pos + # Add files that overlap with shard range to result + filtered_entries.append(entry) + blob_added = True + if filtered_entries: + filtered_partitioned_files[key] = filtered_entries + + return filtered_partitioned_files, (plan_start_pos, plan_end_pos) + + def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int): + total_row = 0 + for key, file_entries in partitioned_files.items(): + for entry in file_entries: + if not self._is_blob_file(entry.file.file_name): + total_row += entry.file.row_count + + # Calculate number of rows this shard should process using balanced distribution + # Distribute remainder evenly among first few shards to avoid last shard overload + base_rows_per_shard = total_row // self.number_of_para_subtasks + remainder = total_row % self.number_of_para_subtasks + + # Each of the first 'remainder' shards gets one extra row + if self.idx_of_this_subtask < remainder: + num_row = base_rows_per_shard + 1 + start_pos = self.idx_of_this_subtask * 
(base_rows_per_shard + 1) + else: + num_row = base_rows_per_shard + start_pos = (remainder * (base_rows_per_shard + 1) + + (self.idx_of_this_subtask - remainder) * base_rows_per_shard) + + end_pos = start_pos + num_row + return self._data_evolution_filter_by_slice(partitioned_files, start_pos, end_pos) + + def _primary_key_filter_by_shard(self, file_entries: List[ManifestEntry]) -> List[ManifestEntry]: + filtered_entries = [] + for entry in file_entries: + if entry.bucket % self.number_of_para_subtasks == self.idx_of_this_subtask: + filtered_entries.append(entry) + return filtered_entries + + def _compute_split_start_end_pos(self, splits: List[Split], plan_start_pos, plan_end_pos): + """ + Find files that needs to be divided for each split + :param splits: splits + :param plan_start_pos: plan begin row in all splits data + :param plan_end_pos: plan end row in all splits data + """ + file_end_pos = 0 # end row position of current file in all splits data + + for split in splits: + cur_split_end_pos = file_end_pos + # Compute split_file_idx_map for data files + file_end_pos = self._compute_split_file_idx_map(plan_start_pos, plan_end_pos, + split, cur_split_end_pos, False) + # Compute split_file_idx_map for blob files + if self.data_evolution: + self._compute_split_file_idx_map(plan_start_pos, plan_end_pos, + split, cur_split_end_pos, True) + + def _compute_split_file_idx_map(self, plan_start_pos, plan_end_pos, split: Split, + file_end_pos: int, is_blob: bool = False): + """ + Traverse all the files in current split, find the starting shard and ending shard files, + and add them to shard_file_idx_map; + - for data file, only two data files will be divided in all splits. + - for blob file, perhaps there will be some unnecessary files in addition to two files(start and end). + Add them to shard_file_idx_map as well, because they need to be removed later. 
+ """ + row_cnt = 0 + for file in split.files: + if not is_blob and self._is_blob_file(file.file_name): + continue + if is_blob and not self._is_blob_file(file.file_name): + continue + row_cnt += file.row_count + file_begin_pos = file_end_pos # Starting row position of current file in all data + file_end_pos += file.row_count # Update to row position after current file + if file_begin_pos <= plan_start_pos < plan_end_pos <= file_end_pos: + split.shard_file_idx_map[file.file_name] = ( + plan_start_pos - file_begin_pos, plan_end_pos - file_begin_pos) + # If shard start position is within current file, record actual start position and relative offset + elif file_begin_pos < plan_start_pos < file_end_pos: + split.shard_file_idx_map[file.file_name] = (plan_start_pos - file_begin_pos, file.row_count) + # If shard end position is within current file, record relative end position + elif file_begin_pos < plan_end_pos < file_end_pos: + split.shard_file_idx_map[file.file_name] = (0, plan_end_pos - file_begin_pos) + elif file_end_pos <= plan_start_pos or file_begin_pos >= plan_end_pos: + split.shard_file_idx_map[file.file_name] = (-1, -1) + return file_end_pos + + def _partial_read(self): + return True diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py index 7aac62865932..bd1681522207 100755 --- a/paimon-python/pypaimon/read/table_scan.py +++ b/paimon-python/pypaimon/read/table_scan.py @@ -23,7 +23,7 @@ from pypaimon.read.plan import Plan from pypaimon.read.scanner.empty_starting_scanner import EmptyStartingScanner -from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner +from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner, PartialStartingScanner from pypaimon.read.scanner.incremental_starting_scanner import \ IncrementalStartingScanner from pypaimon.read.scanner.starting_scanner import StartingScanner @@ -39,9 +39,12 @@ def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int]): self.table: FileStoreTable = table self.predicate = predicate self.limit = limit - self.starting_scanner = self._create_starting_scanner() + self.starting_scanner = None + self.partial_read: Optional[bool] = None def plan(self) -> Plan: + if self.starting_scanner is None: + self.starting_scanner = self._create_starting_scanner() return self.starting_scanner.scan() def _create_starting_scanner(self) -> Optional[StartingScanner]: @@ -66,12 +69,30 @@ def _create_starting_scanner(self) -> Optional[StartingScanner]: return EmptyStartingScanner() return IncrementalStartingScanner.between_timestamps(self.table, self.predicate, self.limit, start_timestamp, end_timestamp) - return FullStartingScanner(self.table, self.predicate, self.limit) + elif self.partial_read: + return PartialStartingScanner(self.table, self.predicate, self.limit) + else: + return FullStartingScanner(self.table, self.predicate, self.limit) def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan': + self.partial_read = True + self.starting_scanner = self._create_starting_scanner() self.starting_scanner.with_shard(idx_of_this_subtask, number_of_para_subtasks) return self def with_slice(self, start_pos, end_pos) -> 'TableScan': + self.partial_read = True + self.starting_scanner = self._create_starting_scanner() self.starting_scanner.with_slice(start_pos, end_pos) return self + + def with_sample(self, num_rows: int) -> 'TableScan': + """Sample the table with the given number of rows. 
+ + params: + num_rows: The number of rows to sample. + """ + self.partial_read = True + self.starting_scanner = self._create_starting_scanner() + self.starting_scanner.with_sample(num_rows) + return self From 7d587e6888a885c0c1e41b4589620e0d26245e44 Mon Sep 17 00:00:00 2001 From: umi Date: Tue, 13 Jan 2026 20:51:03 +0800 Subject: [PATCH 2/3] fix --- paimon-python/pypaimon/read/table_scan.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py index bd1681522207..8f7348ae5982 100755 --- a/paimon-python/pypaimon/read/table_scan.py +++ b/paimon-python/pypaimon/read/table_scan.py @@ -85,14 +85,3 @@ def with_slice(self, start_pos, end_pos) -> 'TableScan': self.starting_scanner = self._create_starting_scanner() self.starting_scanner.with_slice(start_pos, end_pos) return self - - def with_sample(self, num_rows: int) -> 'TableScan': - """Sample the table with the given number of rows. - - params: - num_rows: The number of rows to sample. - """ - self.partial_read = True - self.starting_scanner = self._create_starting_scanner() - self.starting_scanner.with_sample(num_rows) - return self From 81294e3de1a91c648582f905c627c8e5bac57915 Mon Sep 17 00:00:00 2001 From: umi Date: Tue, 13 Jan 2026 21:32:40 +0800 Subject: [PATCH 3/3] fix --- .../pypaimon/tests/reader_base_test.py | 17 ++++++++++------- .../pypaimon/tests/reader_primary_key_test.py | 1 + .../tests/schema_evolution_read_test.py | 5 ++++- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index a7b2abd516dd..50d7c627457e 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -512,6 +512,7 @@ def test_primary_key_value_stats_excludes_system_fields(self): pk_read_builder = pk_table.new_read_builder() pk_table_scan = pk_read_builder.new_scan() + pk_table_scan.starting_scanner = pk_table_scan._create_starting_scanner() latest_snapshot = SnapshotManager(pk_table).get_latest_snapshot() pk_manifest_files = pk_table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot) pk_manifest_entries = pk_table_scan.starting_scanner.manifest_file_manager.read( @@ -582,6 +583,7 @@ def test_value_stats_empty_when_stats_disabled(self): read_builder = table.new_read_builder() table_scan = read_builder.new_scan() + table_scan.starting_scanner = table_scan._create_starting_scanner() latest_snapshot = SnapshotManager(table).get_latest_snapshot() manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot) manifest_entries = table_scan.starting_scanner.manifest_file_manager.read( @@ -1089,6 +1091,7 @@ def test_primary_key_value_stats(self): # Read manifest to verify value_stats_cols is None (all fields included) read_builder = table.new_read_builder() table_scan = read_builder.new_scan() + table_scan.starting_scanner = table_scan._create_starting_scanner() latest_snapshot = SnapshotManager(table).get_latest_snapshot() manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot) manifest_entries = table_scan.starting_scanner.manifest_file_manager.read( @@ -1111,7 +1114,7 @@ def test_primary_key_value_stats(self): value_stats = file_meta.value_stats self.assertIsNotNone(value_stats, "value_stats should not be None") - + if file_meta.value_stats_cols is None: expected_value_fields = ['name', 'price', 'category'] 
self.assertGreaterEqual(value_stats.min_values.arity, len(expected_value_fields), @@ -1119,12 +1122,12 @@ def test_primary_key_value_stats(self): else: self.assertNotIn('id', file_meta.value_stats_cols, "Key field 'id' should NOT be in value_stats_cols") - + expected_value_fields = ['name', 'price', 'category'] self.assertTrue(set(expected_value_fields).issubset(set(file_meta.value_stats_cols)), f"value_stats_cols should contain value fields: {expected_value_fields}, " f"but got: {file_meta.value_stats_cols}") - + expected_arity = len(file_meta.value_stats_cols) self.assertEqual(value_stats.min_values.arity, expected_arity, f"value_stats should contain {expected_arity} fields (matching value_stats_cols), " @@ -1135,17 +1138,17 @@ def test_primary_key_value_stats(self): self.assertEqual(len(value_stats.null_counts), expected_arity, f"value_stats null_counts should have {expected_arity} elements, " f"but got {len(value_stats.null_counts)}") - + self.assertEqual(value_stats.min_values.arity, len(file_meta.value_stats_cols), f"value_stats.min_values.arity ({value_stats.min_values.arity}) must match " f"value_stats_cols length ({len(file_meta.value_stats_cols)})") - + for field_name in file_meta.value_stats_cols: is_system_field = (field_name.startswith('_KEY_') or field_name in ['_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID']) self.assertFalse(is_system_field, f"value_stats_cols should not contain system field: {field_name}") - + value_stats_fields = table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields( {'_VALUE_STATS_COLS': file_meta.value_stats_cols}, table.fields @@ -1161,7 +1164,7 @@ def test_primary_key_value_stats(self): self.assertEqual(len(min_value_stats), 3, "min_value_stats should have 3 values") self.assertEqual(len(max_value_stats), 3, "max_value_stats should have 3 values") - + actual_data = read_builder.new_read().to_arrow(table_scan.plan().splits()) self.assertEqual(actual_data.num_rows, 5, "Should have 5 rows") actual_ids = sorted(actual_data.column('id').to_pylist()) diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py index c22346afe739..3db91f3c4c97 100644 --- a/paimon-python/pypaimon/tests/reader_primary_key_test.py +++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py @@ -358,6 +358,7 @@ def test_manifest_creation_time_timestamp(self): latest_snapshot = snapshot_manager.get_latest_snapshot() read_builder = table.new_read_builder() table_scan = read_builder.new_scan() + table_scan.starting_scanner = table_scan._create_starting_scanner() manifest_list_manager = table_scan.starting_scanner.manifest_list_manager manifest_files = manifest_list_manager.read_all(latest_snapshot) diff --git a/paimon-python/pypaimon/tests/schema_evolution_read_test.py b/paimon-python/pypaimon/tests/schema_evolution_read_test.py index a67a927a5e61..3f8384f9b2cb 100644 --- a/paimon-python/pypaimon/tests/schema_evolution_read_test.py +++ b/paimon-python/pypaimon/tests/schema_evolution_read_test.py @@ -259,12 +259,15 @@ def test_schema_evolution_with_scan_filter(self): schema_manager.commit(TableSchema.from_schema(schema_id=0, schema=schema)) schema_manager.commit(TableSchema.from_schema(schema_id=1, schema=schema2)) # scan filter for schema evolution - latest_snapshot = table1.new_read_builder().new_scan().starting_scanner.snapshot_manager.get_latest_snapshot() + table_scan = table1.new_read_builder().new_scan() + table_scan.starting_scanner = table_scan._create_starting_scanner() + latest_snapshot = 
table_scan.starting_scanner.snapshot_manager.get_latest_snapshot() table2.table_path = table1.table_path new_read_buidler = table2.new_read_builder() predicate_builder = new_read_buidler.new_predicate_builder() predicate = predicate_builder.less_than('user_id', 3) new_scan = new_read_buidler.with_filter(predicate).new_scan() + new_scan.starting_scanner = new_scan._create_starting_scanner() manifest_files = new_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot) entries = new_scan.starting_scanner.read_manifest_entries(manifest_files) self.assertEqual(1, len(entries)) # verify scan filter success for schema evolution
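
---

Reviewer note: to make the intent of the new `PartialStartingScanner` path concrete, here is a minimal, hypothetical usage sketch of the two partial-read entry points this patch adds to `TableScan` (`with_shard` and `with_slice`). It is not part of the patch. It assumes `table` is an already-opened pypaimon `FileStoreTable` handle; the `read_shard` / `read_slice` helper names are mine, while the builder chain (`new_read_builder` / `new_scan` / `plan().splits()` / `new_read().to_arrow(...)`) mirrors the tests touched in this series.

```python
# Sketch only: `table` is assumed to be an already-opened pypaimon
# FileStoreTable. with_shard()/with_slice() switch the scan onto the
# PartialStartingScanner introduced by this patch; the two calls are
# mutually exclusive on the same scan.

def read_shard(table, idx_of_this_subtask: int, number_of_para_subtasks: int):
    """Read only the rows assigned to one parallel subtask.

    Requires idx_of_this_subtask < number_of_para_subtasks; rows are
    distributed so the remainder is spread over the first few subtasks.
    """
    read_builder = table.new_read_builder()
    scan = read_builder.new_scan().with_shard(idx_of_this_subtask,
                                              number_of_para_subtasks)
    splits = scan.plan().splits()
    return read_builder.new_read().to_arrow(splits)


def read_slice(table, start_pos: int, end_pos: int):
    """Read the half-open row range [start_pos, end_pos) of the table.

    Requires start_pos < end_pos; must not be combined with with_shard().
    """
    read_builder = table.new_read_builder()
    scan = read_builder.new_scan().with_slice(start_pos, end_pos)
    splits = scan.plan().splits()
    return read_builder.new_read().to_arrow(splits)


# Example: subtask 0 of 4 reads roughly the first quarter of the rows.
# arrow_table = read_shard(table, idx_of_this_subtask=0, number_of_para_subtasks=4)
```

A plain `read_builder.new_scan().plan()` still goes through `FullStartingScanner`, since `TableScan` only instantiates `PartialStartingScanner` once `partial_read` has been set by one of the calls above.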