2 changes: 1 addition & 1 deletion paimon-python/dev/lint-python.sh
@@ -107,7 +107,7 @@ function collect_checks() {
function get_all_supported_checks() {
_OLD_IFS=$IFS
IFS=$'\n'
SUPPORT_CHECKS=("flake8_check" "pytest_torch_check" "pytest_check" "mixed_check") # control the calling sequence
SUPPORT_CHECKS=("flake8_check" "pytest_check" "pytest_torch_check" "mixed_check") # control the calling sequence
for fun in $(declare -F); do
if [[ `regexp_match "$fun" "_check$"` = true ]]; then
check_name="${fun:11}"
5 changes: 0 additions & 5 deletions paimon-python/pypaimon/read/reader/field_bunch.py
@@ -82,11 +82,6 @@ def add(self, file: DataFileMeta) -> None:
"Blob file with overlapping row id should have decreasing sequence number."
)
return
elif first_row_id > self.expected_next_first_row_id:
raise ValueError(
f"Blob file first row id should be continuous, expect "
f"{self.expected_next_first_row_id} but got {first_row_id}"
)

if file.schema_id != self._files[0].schema_id:
raise ValueError(
78 changes: 78 additions & 0 deletions paimon-python/pypaimon/read/reader/shard_batch_reader.py
@@ -25,6 +25,7 @@ class ShardBatchReader(RecordBatchReader):
"""
A reader that reads a subset of rows from a data file
"""

def __init__(self, reader, start_pos, end_pos):
self.reader = reader
self.start_pos = start_pos
@@ -59,3 +60,80 @@ def read_arrow_batch(self) -> Optional[RecordBatch]:

def close(self):
self.reader.close()


class SampleBatchReader(RecordBatchReader):
"""
A reader that reads a subset of rows from a data file based on specified sample positions.

This reader wraps another RecordBatchReader and only returns rows at the specified
sample positions, enabling efficient random sampling of data without reading all rows.

The reader supports two modes:
1. For blob readers: Directly reads specific rows by index
2. For other readers: Reads batches sequentially and extracts only the sampled rows

Attributes:
reader: The underlying RecordBatchReader to read data from
sample_positions: A sorted list of row indices to sample (0-based)
sample_idx: Current index in the sample_positions list
current_pos: Current absolute row position in the data file
"""

def __init__(self, reader, sample_positions):
"""
Initialize the SampleBatchReader.

Args:
reader: The underlying RecordBatchReader to read data from
sample_positions: A sorted list of row indices to sample (0-based).
Must be sorted in ascending order for correct behavior.
"""
self.reader = reader
self.sample_positions = sample_positions
self.sample_idx = 0
self.current_pos = 0

def read_arrow_batch(self) -> Optional[RecordBatch]:
"""
Read the next batch containing sampled rows.

This method reads data from the underlying reader and returns only the rows
at the specified sample positions. The behavior differs based on reader type:

- For FormatBlobReader: Directly reads individual rows by index
- For other readers: Reads batches sequentially and extracts sampled rows
using PyArrow's take() method
"""
if self.sample_idx >= len(self.sample_positions):
return None
if isinstance(self.reader.format_reader, FormatBlobReader):
# For blob reader, pass start_idx and end_idx parameters
batch = self.reader.read_arrow_batch(start_idx=self.sample_positions[self.sample_idx],
end_idx=self.sample_positions[self.sample_idx] + 1)
self.sample_idx += 1
return batch
else:
batch = self.reader.read_arrow_batch()
if batch is None:
return None

batch_begin = self.current_pos
self.current_pos += batch.num_rows
take_idxes = []

sample_pos = self.sample_positions[self.sample_idx]
while batch_begin <= sample_pos < self.current_pos:
take_idxes.append(sample_pos - batch_begin)
self.sample_idx += 1
if self.sample_idx >= len(self.sample_positions):
break
sample_pos = self.sample_positions[self.sample_idx]

if take_idxes:
return batch.take(take_idxes)
else:  # no sampled rows fall in this batch, read the next one
return self.read_arrow_batch()

def close(self):
self.reader.close()
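
A minimal standalone sketch (not part of the PR) of the batch-sampling path that SampleBatchReader takes for non-blob readers: walk sequential batches, convert the sorted global sample positions that fall inside each batch to batch-local indexes, and extract them with PyArrow's take(). The batches and sample positions below are made up for illustration.

import pyarrow as pa

batches = [
    pa.RecordBatch.from_pydict({"id": [0, 1, 2, 3]}),
    pa.RecordBatch.from_pydict({"id": [4, 5, 6, 7]}),
]
sample_positions = [1, 4, 6]  # sorted global row positions to sample

current_pos = 0  # absolute row position consumed so far
sample_idx = 0   # next entry of sample_positions to satisfy
sampled = []
for batch in batches:
    batch_begin = current_pos
    current_pos += batch.num_rows
    # collect every remaining sample position that falls inside this batch
    take_idxes = []
    while sample_idx < len(sample_positions) and sample_positions[sample_idx] < current_pos:
        take_idxes.append(sample_positions[sample_idx] - batch_begin)
        sample_idx += 1
    if take_idxes:
        sampled.append(batch.take(take_idxes))

print(pa.Table.from_batches(sampled).to_pydict())  # {'id': [1, 4, 6]}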
152 changes: 135 additions & 17 deletions paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -16,6 +16,7 @@
limitations under the License.
"""
import os
import random
from collections import defaultdict
from typing import Callable, List, Optional, Dict, Set

@@ -60,11 +61,15 @@ def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int]):
# Get split target size and open file cost from table options
self.target_split_size = options.source_split_target_size()
self.open_file_cost = options.source_split_open_file_cost()

# for shard
self.idx_of_this_subtask = None
self.number_of_para_subtasks = None
self.start_pos_of_this_subtask = None
self.end_pos_of_this_subtask = None
# for sample
self.sample_num_rows = None
self.sample_indexes = None
self.file_positions = None

self.only_read_real_buckets = True if options.bucket() == BucketMode.POSTPONE_BUCKET.value else False
self.data_evolution = options.data_evolution_enabled()
@@ -138,6 +143,14 @@ def with_slice(self, start_pos, end_pos) -> 'FullStartingScanner':
self.end_pos_of_this_subtask = end_pos
return self

def with_sample(self, num_rows: int) -> 'FullStartingScanner':
if self.idx_of_this_subtask is not None:
raise Exception("with_sample and with_shard cannot be used simultaneously now")
if self.start_pos_of_this_subtask is not None:
raise Exception("with_sample and with_slice cannot be used simultaneously now")
self.sample_num_rows = num_rows
return self

@staticmethod
def _append_only_filter_by_slice(partitioned_files: defaultdict,
start_pos: int,
@@ -202,9 +215,32 @@ def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defau

return self._append_only_filter_by_slice(partitioned_files, start_pos, end_pos)

def _data_evolution_filter_by_row_range(self, partitioned_files: defaultdict,
start_pos: int,
end_pos: int) -> (defaultdict, int, int):
def _append_only_filter_by_sample(self, partitioned_files) -> (defaultdict, Dict[str, List[range]]):
"""
Randomly sample num_rows rows from partitioned_files:
1. Generate num_rows random global row indexes
2. Walk partitioned_files, find the file entries that contain the sampled indexes,
add them to filtered_partitioned_files, and record each entry's local sample indexes
"""
# Calculate total number of rows
total_rows = 0
for key, file_entries in partitioned_files.items():
for entry in file_entries:
total_rows += entry.file.row_count

# Generate random sample indexes
sample_indexes = sorted(random.sample(range(total_rows), self.sample_num_rows))

# Map each sample index to its corresponding file and local index
filtered_partitioned_files = defaultdict(list)
file_positions = {} # {file_name: [local_indexes]}
self._generate_file_sample_idx_map(partitioned_files, filtered_partitioned_files, file_positions,
sample_indexes, is_blob=False)
return filtered_partitioned_files, file_positions

def _data_evolution_filter_by_slice(self, partitioned_files: defaultdict,
start_pos: int,
end_pos: int) -> (defaultdict, int, int):
plan_start_pos = 0
plan_end_pos = 0
entry_end_pos = 0 # end row position of current file in all data
@@ -265,7 +301,76 @@ def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (de
(self.idx_of_this_subtask - remainder) * base_rows_per_shard)

end_pos = start_pos + num_row
return self._data_evolution_filter_by_row_range(partitioned_files, start_pos, end_pos)
return self._data_evolution_filter_by_slice(partitioned_files, start_pos, end_pos)

def _data_evolution_filter_by_sample(self, partitioned_files) -> (defaultdict, Dict[str, List[range]]):
"""
Randomly sample num_rows rows from partitioned_files (blob files are excluded from the row count):
1. Generate num_rows random global row indexes
2. Walk partitioned_files, find the file entries that contain the sampled indexes,
add them to filtered_partitioned_files, and record each entry's local sample indexes
"""
# Calculate total number of rows
total_rows = 0
for key, file_entries in partitioned_files.items():
for entry in file_entries:
if not self._is_blob_file(entry.file.file_name):
total_rows += entry.file.row_count
# Generate random sample indexes
sample_indexes = sorted(random.sample(range(total_rows), self.sample_num_rows))

# Map each sample index to its corresponding file and local index
filtered_partitioned_files = defaultdict(list)
file_positions = {} # {file_name: [local_indexes]}
self._generate_file_sample_idx_map(partitioned_files, filtered_partitioned_files, file_positions,
sample_indexes, is_blob=False)
if self.data_evolution:
self._generate_file_sample_idx_map(partitioned_files, filtered_partitioned_files, file_positions,
sample_indexes, is_blob=True)

return filtered_partitioned_files, file_positions

def _generate_file_sample_idx_map(self, partitioned_files, filtered_partitioned_files, file_positions,
sample_indexes, is_blob):
current_row = 0
sample_idx = 0

for key, file_entries in partitioned_files.items():
filtered_entries = []
for entry in file_entries:
if not is_blob and self._is_blob_file(entry.file.file_name):
continue
if is_blob and not self._is_blob_file(entry.file.file_name):
continue
file_start_row = current_row
file_end_row = current_row + entry.file.row_count

# Find all sample indexes that fall within this file
local_indexes = []
while sample_idx < len(sample_indexes) and sample_indexes[sample_idx] < file_end_row:
if sample_indexes[sample_idx] >= file_start_row:
# Convert global index to local index within this file
local_index = sample_indexes[sample_idx] - file_start_row
local_indexes.append(range(local_index, local_index + 1))
sample_idx += 1

# If this file contains any sampled rows, include it
if local_indexes:
filtered_entries.append(entry)
file_positions[entry.file.file_name] = local_indexes

current_row = file_end_row

# Early exit if we've processed all sample indexes
if sample_idx >= len(sample_indexes):
break

if filtered_entries:
filtered_partitioned_files[key] = filtered_partitioned_files.get(key, []) + filtered_entries

# Early exit if we've processed all sample indexes
if sample_idx >= len(sample_indexes):
break
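
As a rough illustration of the global-to-local mapping done by _generate_file_sample_idx_map above, the standalone sketch below (not from the PR; file names and row counts are made up) walks files in order and turns each sorted global sample index into a single-row range local to the file that contains it.

from collections import defaultdict

file_row_counts = [("data-a", 100), ("data-b", 50), ("data-c", 200)]  # hypothetical files
sample_indexes = sorted([7, 103, 120, 260])  # global row indexes to sample

file_positions = defaultdict(list)  # file name -> single-row ranges local to that file
current_row = 0
sample_idx = 0
for name, row_count in file_row_counts:
    file_start, file_end = current_row, current_row + row_count
    # consume every sampled index that lands inside this file
    while sample_idx < len(sample_indexes) and sample_indexes[sample_idx] < file_end:
        local = sample_indexes[sample_idx] - file_start
        file_positions[name].append(range(local, local + 1))
        sample_idx += 1
    current_row = file_end
    if sample_idx >= len(sample_indexes):
        break  # all sampled indexes handled

print({name: [(r.start, r.stop) for r in ranges] for name, ranges in file_positions.items()})
# {'data-a': [(7, 8)], 'data-b': [(3, 4), (20, 21)], 'data-c': [(110, 111)]}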

def _compute_split_start_end_pos(self, splits: List[Split], plan_start_pos, plan_end_pos):
"""
@@ -305,16 +410,16 @@ def _compute_split_file_idx_map(self, plan_start_pos, plan_end_pos, split: Split
file_begin_pos = file_end_pos # Starting row position of current file in all data
file_end_pos += file.row_count # Update to row position after current file
if file_begin_pos <= plan_start_pos < plan_end_pos <= file_end_pos:
split.shard_file_idx_map[file.file_name] = (
plan_start_pos - file_begin_pos, plan_end_pos - file_begin_pos)
split.shard_file_idx_map[file.file_name] = [
range(plan_start_pos - file_begin_pos, plan_end_pos - file_begin_pos)]
# If shard start position is within current file, record actual start position and relative offset
elif file_begin_pos < plan_start_pos < file_end_pos:
split.shard_file_idx_map[file.file_name] = (plan_start_pos - file_begin_pos, file.row_count)
split.shard_file_idx_map[file.file_name] = [range(plan_start_pos - file_begin_pos, file.row_count)]
# If shard end position is within current file, record relative end position
elif file_begin_pos < plan_end_pos < file_end_pos:
split.shard_file_idx_map[file.file_name] = (0, plan_end_pos - file_begin_pos)
split.shard_file_idx_map[file.file_name] = [range(0, plan_end_pos - file_begin_pos)]
elif file_end_pos <= plan_start_pos or file_begin_pos >= plan_end_pos:
split.shard_file_idx_map[file.file_name] = (-1, -1)
split.shard_file_idx_map[file.file_name] = [range(-1, -1)]
return file_end_pos

def _primary_key_filter_by_shard(self, file_entries: List[ManifestEntry]) -> List[ManifestEntry]:
Expand Down Expand Up @@ -482,6 +587,8 @@ def _create_append_only_splits(
self.end_pos_of_this_subtask)
elif self.idx_of_this_subtask is not None:
partitioned_files, plan_start_pos, plan_end_pos = self._append_only_filter_by_shard(partitioned_files)
elif self.sample_num_rows is not None:
partitioned_files, self.file_positions = self._append_only_filter_by_sample(partitioned_files)

def weight_func(f: DataFileMeta) -> int:
return max(f.file_size, self.open_file_cost)
@@ -499,6 +606,12 @@ def weight_func(f: DataFileMeta) -> int:
if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None:
# When files are combined into splits, it is necessary to find the files that need to be divided for each split
self._compute_split_start_end_pos(splits, plan_start_pos, plan_end_pos)
elif self.sample_num_rows is not None:
# Set sample file positions for each split
for split in splits:
for file in split.files:
split.shard_file_idx_map[file.file_name] = self.file_positions[file.file_name]

return splits

def _without_delete_row(self, data_file_meta: DataFileMeta) -> bool:
@@ -578,21 +691,21 @@ def sort_key(manifest_entry: ManifestEntry) -> tuple:
max_seq = manifest_entry.file.max_sequence_number
return first_row_id, is_blob, -max_seq

sorted_entries = sorted(file_entries, key=sort_key)

partitioned_files = defaultdict(list)
for entry in sorted_entries:
for entry in file_entries:
partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)

if self.start_pos_of_this_subtask is not None:
# shard data range: [plan_start_pos, plan_end_pos)
partitioned_files, plan_start_pos, plan_end_pos = \
self._data_evolution_filter_by_row_range(partitioned_files,
self.start_pos_of_this_subtask,
self.end_pos_of_this_subtask)
self._data_evolution_filter_by_slice(partitioned_files,
self.start_pos_of_this_subtask,
self.end_pos_of_this_subtask)
elif self.idx_of_this_subtask is not None:
# shard data range: [plan_start_pos, plan_end_pos)
partitioned_files, plan_start_pos, plan_end_pos = self._data_evolution_filter_by_shard(partitioned_files)
elif self.sample_num_rows is not None:
partitioned_files, self.file_positions = self._data_evolution_filter_by_sample(partitioned_files)

def weight_func(file_list: List[DataFileMeta]) -> int:
return max(sum(f.file_size for f in file_list), self.open_file_cost)
@@ -601,7 +714,7 @@ def weight_func(file_list: List[DataFileMeta]) -> int:
for key, sorted_entries in partitioned_files.items():
if not sorted_entries:
continue

sorted_entries = sorted(sorted_entries, key=sort_key)
data_files: List[DataFileMeta] = [e.file for e in sorted_entries]

# Split files by firstRowId for data evolution
@@ -621,6 +734,11 @@ def weight_func(file_list: List[DataFileMeta]) -> int:

if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None:
self._compute_split_start_end_pos(splits, plan_start_pos, plan_end_pos)
elif self.sample_num_rows is not None:
# Set sample file positions for each split
for split in splits:
for file in split.files:
split.shard_file_idx_map[file.file_name] = self.file_positions[file.file_name]
return splits

def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
4 changes: 2 additions & 2 deletions paimon-python/pypaimon/read/split.py
@@ -17,7 +17,7 @@
################################################################################

from dataclasses import dataclass, field
from typing import List, Optional, Dict, Tuple
from typing import List, Optional, Dict

from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.table.row.generic_row import GenericRow
@@ -33,7 +37,7 @@ class Split:
_file_paths: List[str]
_row_count: int
_file_size: int
shard_file_idx_map: Dict[str, Tuple[int, int]] = field(default_factory=dict) # file_name -> (start_idx, end_idx)
shard_file_idx_map: Dict[str, List[range]] = field(default_factory=dict) # file_name -> [ranges]
raw_convertible: bool = False
data_deletion_files: Optional[List[DeletionFile]] = None

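
For reference, a small sketch (file name made up, not from the PR) of the values the reworked shard_file_idx_map holds: a shard or slice keeps a single contiguous range of local row indexes per file, while sampling keeps one single-row range per sampled row.

shard_map_slice = {"data-0.parquet": [range(128, 512)]}               # contiguous slice: local rows [128, 512)
shard_map_sample = {"data-0.parquet": [range(3, 4), range(97, 98)]}   # sample: local rows 3 and 97 only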