Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion paimon-python/dev/lint-python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ function collect_checks() {
function get_all_supported_checks() {
_OLD_IFS=$IFS
IFS=$'\n'
SUPPORT_CHECKS=("flake8_check" "pytest_torch_check" "pytest_check" "mixed_check") # control the calling sequence
SUPPORT_CHECKS=("flake8_check" "pytest_check" "pytest_torch_check" "mixed_check") # control the calling sequence
for fun in $(declare -F); do
if [[ `regexp_match "$fun" "_check$"` = true ]]; then
check_name="${fun:11}"
Expand Down
753 changes: 391 additions & 362 deletions paimon-python/pypaimon/read/scanner/full_starting_scanner.py

Large diffs are not rendered by default.

16 changes: 13 additions & 3 deletions paimon-python/pypaimon/read/table_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from pypaimon.read.plan import Plan
from pypaimon.read.scanner.empty_starting_scanner import EmptyStartingScanner
from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner, PartialStartingScanner
from pypaimon.read.scanner.incremental_starting_scanner import \
IncrementalStartingScanner
from pypaimon.read.scanner.starting_scanner import StartingScanner
Expand All @@ -39,9 +39,12 @@ def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int]):
self.table: FileStoreTable = table
self.predicate = predicate
self.limit = limit
self.starting_scanner = self._create_starting_scanner()
self.starting_scanner = None
self.partial_read: Optional[bool] = None

def plan(self) -> Plan:
    """Produce the scan plan, lazily building the starting scanner first.

    The scanner is created on first use so that with_shard()/with_slice()
    can decide the scanner type before any plan is computed.
    """
    scanner = self.starting_scanner
    if scanner is None:
        scanner = self._create_starting_scanner()
        self.starting_scanner = scanner
    return scanner.scan()

def _create_starting_scanner(self) -> Optional[StartingScanner]:
Expand All @@ -66,12 +69,19 @@ def _create_starting_scanner(self) -> Optional[StartingScanner]:
return EmptyStartingScanner()
return IncrementalStartingScanner.between_timestamps(self.table, self.predicate, self.limit,
start_timestamp, end_timestamp)
return FullStartingScanner(self.table, self.predicate, self.limit)
elif self.partial_read:
return PartialStartingScanner(self.table, self.predicate, self.limit)
else:
return FullStartingScanner(self.table, self.predicate, self.limit)

def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan':
    """Restrict this scan to one shard of a parallel read.

    Marks the scan as a partial read, eagerly builds the starting scanner,
    and delegates the shard assignment to it.

    Args:
        idx_of_this_subtask: zero-based index of the calling subtask.
        number_of_para_subtasks: total number of parallel subtasks.

    Returns:
        This TableScan, for call chaining.

    Raises:
        ValueError: if the configured starting scanner does not support
            sharded reads (e.g. incremental-between-timestamps or empty
            scans, which expose no ``with_shard`` method).
    """
    self.partial_read = True
    self.starting_scanner = self._create_starting_scanner()
    # Incremental/empty scanners have no with_shard(); fail with a clear
    # message instead of an obscure AttributeError deep in the call.
    if not hasattr(self.starting_scanner, 'with_shard'):
        raise ValueError(
            f"{type(self.starting_scanner).__name__} does not support sharded reads")
    self.starting_scanner.with_shard(idx_of_this_subtask, number_of_para_subtasks)
    return self
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If INCREMENTAL_BETWEEN_TIMESTAMP is configured, _create_starting_scanner() returns IncrementalStartingScanner which doesn't have with_shard(). This will cause AttributeError.


def with_slice(self, start_pos, end_pos) -> 'TableScan':
    """Restrict this scan to a positional slice of the table's splits.

    Marks the scan as a partial read, eagerly builds the starting scanner,
    and delegates the slice bounds to it.

    Args:
        start_pos: inclusive start position of the slice.
        end_pos: exclusive end position of the slice.

    Returns:
        This TableScan, for call chaining.

    Raises:
        ValueError: if the configured starting scanner does not support
            sliced reads (e.g. incremental-between-timestamps or empty
            scans, which expose no ``with_slice`` method).
    """
    self.partial_read = True
    self.starting_scanner = self._create_starting_scanner()
    # Incremental/empty scanners have no with_slice(); fail with a clear
    # message instead of an obscure AttributeError deep in the call.
    if not hasattr(self.starting_scanner, 'with_slice'):
        raise ValueError(
            f"{type(self.starting_scanner).__name__} does not support sliced reads")
    self.starting_scanner.with_slice(start_pos, end_pos)
    return self
17 changes: 10 additions & 7 deletions paimon-python/pypaimon/tests/reader_base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ def test_primary_key_value_stats_excludes_system_fields(self):

pk_read_builder = pk_table.new_read_builder()
pk_table_scan = pk_read_builder.new_scan()
pk_table_scan.starting_scanner = pk_table_scan._create_starting_scanner()
latest_snapshot = SnapshotManager(pk_table).get_latest_snapshot()
pk_manifest_files = pk_table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
pk_manifest_entries = pk_table_scan.starting_scanner.manifest_file_manager.read(
Expand Down Expand Up @@ -582,6 +583,7 @@ def test_value_stats_empty_when_stats_disabled(self):

read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_scan.starting_scanner = table_scan._create_starting_scanner()
latest_snapshot = SnapshotManager(table).get_latest_snapshot()
manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
manifest_entries = table_scan.starting_scanner.manifest_file_manager.read(
Expand Down Expand Up @@ -1089,6 +1091,7 @@ def test_primary_key_value_stats(self):
# Read manifest to verify value_stats_cols is None (all fields included)
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_scan.starting_scanner = table_scan._create_starting_scanner()
latest_snapshot = SnapshotManager(table).get_latest_snapshot()
manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
manifest_entries = table_scan.starting_scanner.manifest_file_manager.read(
Expand All @@ -1111,20 +1114,20 @@ def test_primary_key_value_stats(self):

value_stats = file_meta.value_stats
self.assertIsNotNone(value_stats, "value_stats should not be None")

if file_meta.value_stats_cols is None:
expected_value_fields = ['name', 'price', 'category']
self.assertGreaterEqual(value_stats.min_values.arity, len(expected_value_fields),
f"value_stats should contain at least {len(expected_value_fields)} value fields")
else:
self.assertNotIn('id', file_meta.value_stats_cols,
"Key field 'id' should NOT be in value_stats_cols")

expected_value_fields = ['name', 'price', 'category']
self.assertTrue(set(expected_value_fields).issubset(set(file_meta.value_stats_cols)),
f"value_stats_cols should contain value fields: {expected_value_fields}, "
f"but got: {file_meta.value_stats_cols}")

expected_arity = len(file_meta.value_stats_cols)
self.assertEqual(value_stats.min_values.arity, expected_arity,
f"value_stats should contain {expected_arity} fields (matching value_stats_cols), "
Expand All @@ -1135,17 +1138,17 @@ def test_primary_key_value_stats(self):
self.assertEqual(len(value_stats.null_counts), expected_arity,
f"value_stats null_counts should have {expected_arity} elements, "
f"but got {len(value_stats.null_counts)}")

self.assertEqual(value_stats.min_values.arity, len(file_meta.value_stats_cols),
f"value_stats.min_values.arity ({value_stats.min_values.arity}) must match "
f"value_stats_cols length ({len(file_meta.value_stats_cols)})")

for field_name in file_meta.value_stats_cols:
is_system_field = (field_name.startswith('_KEY_') or
field_name in ['_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID'])
self.assertFalse(is_system_field,
f"value_stats_cols should not contain system field: {field_name}")

value_stats_fields = table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields(
{'_VALUE_STATS_COLS': file_meta.value_stats_cols},
table.fields
Expand All @@ -1161,7 +1164,7 @@ def test_primary_key_value_stats(self):

self.assertEqual(len(min_value_stats), 3, "min_value_stats should have 3 values")
self.assertEqual(len(max_value_stats), 3, "max_value_stats should have 3 values")

actual_data = read_builder.new_read().to_arrow(table_scan.plan().splits())
self.assertEqual(actual_data.num_rows, 5, "Should have 5 rows")
actual_ids = sorted(actual_data.column('id').to_pylist())
Expand Down
1 change: 1 addition & 0 deletions paimon-python/pypaimon/tests/reader_primary_key_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ def test_manifest_creation_time_timestamp(self):
latest_snapshot = snapshot_manager.get_latest_snapshot()
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_scan.starting_scanner = table_scan._create_starting_scanner()
manifest_list_manager = table_scan.starting_scanner.manifest_list_manager
manifest_files = manifest_list_manager.read_all(latest_snapshot)

Expand Down
5 changes: 4 additions & 1 deletion paimon-python/pypaimon/tests/schema_evolution_read_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,15 @@ def test_schema_evolution_with_scan_filter(self):
schema_manager.commit(TableSchema.from_schema(schema_id=0, schema=schema))
schema_manager.commit(TableSchema.from_schema(schema_id=1, schema=schema2))
# scan filter for schema evolution
latest_snapshot = table1.new_read_builder().new_scan().starting_scanner.snapshot_manager.get_latest_snapshot()
table_scan = table1.new_read_builder().new_scan()
table_scan.starting_scanner = table_scan._create_starting_scanner()
latest_snapshot = table_scan.starting_scanner.snapshot_manager.get_latest_snapshot()
table2.table_path = table1.table_path
new_read_buidler = table2.new_read_builder()
predicate_builder = new_read_buidler.new_predicate_builder()
predicate = predicate_builder.less_than('user_id', 3)
new_scan = new_read_buidler.with_filter(predicate).new_scan()
new_scan.starting_scanner = new_scan._create_starting_scanner()
manifest_files = new_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
entries = new_scan.starting_scanner.read_manifest_entries(manifest_files)
self.assertEqual(1, len(entries)) # verify scan filter success for schema evolution
Expand Down