From b64c5d9812c7dceeaa860256609d1e85f59a99c6 Mon Sep 17 00:00:00 2001 From: JE-Chen <33644111+JE-Chen@users.noreply.github.com> Date: Tue, 17 Sep 2024 13:59:44 +0800 Subject: [PATCH 1/4] Use ArrowScan.to_table to replace project_table * Use ArrowScan.to_table to replace project_table in these files: ** pyiceberg/table/__init__.py ** pyiceberg/io/pyarrow.py ** tests/io/test_pyarrow.py --- pyiceberg/io/pyarrow.py | 1 + pyiceberg/table/__init__.py | 9 ++++----- tests/io/test_pyarrow.py | 28 +++++++++++++++------------- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 999813d0c2..d0a6370f06 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1379,6 +1379,7 @@ def _projected_field_ids(self) -> Set[int]: if not isinstance(self._projected_schema.find_type(id), (MapType, ListType)) }.union(extract_field_ids(self._bound_row_filter)) + def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table: """Scan the Iceberg table and return a pa.Table. 
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 3eedff4581..6804de784a 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -525,7 +525,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti from pyiceberg.io.pyarrow import ( _dataframe_to_data_files, _expression_to_complementary_pyarrow, - project_table, + ArrowScan, ) if ( @@ -559,13 +559,12 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti # - Apply the latest partition-spec # - And sort order when added for original_file in files: - df = project_table( - tasks=[original_file], + df = ArrowScan( table_metadata=self.table_metadata, io=self._table.io, - row_filter=AlwaysTrue(), projected_schema=self.table_metadata.schema(), - ) + row_filter=AlwaysTrue(), + ).to_table([original_file]) filtered_df = df.filter(preserve_row_filter) # Only rewrite if there are records being deleted diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 82b35341b9..86b6845d40 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -69,6 +69,7 @@ _to_requested_schema, bin_pack_arrow_table, expression_to_pyarrow, + ArrowScan, project_table, schema_to_pyarrow, ) @@ -952,7 +953,19 @@ def file_map(schema_map: Schema, tmpdir: str) -> str: def project( schema: Schema, files: List[str], expr: Optional[BooleanExpression] = None, table_schema: Optional[Schema] = None ) -> pa.Table: - return project_table( + return ArrowScan( + table_metadata=TableMetadataV2( + location="file://a/b/", + last_column_id=1, + format_version=2, + schemas=[table_schema or schema], + partition_specs=[PartitionSpec()], + ), + io=PyArrowFileIO(), + projected_schema=schema, + row_filter=expr or AlwaysTrue(), + case_sensitive=True, + ).to_table( tasks=[ FileScanTask( DataFile( @@ -965,18 +978,7 @@ def project( ) ) for file in files - ], - table_metadata=TableMetadataV2( - location="file://a/b/", - last_column_id=1, - 
format_version=2, - schemas=[table_schema or schema], - partition_specs=[PartitionSpec()], - ), - io=PyArrowFileIO(), - row_filter=expr or AlwaysTrue(), - projected_schema=schema, - case_sensitive=True, + ] ) From 10c7b5ec688396fca58149397a15c5378f1b0905 Mon Sep 17 00:00:00 2001 From: JE-Chen <33644111+JE-Chen@users.noreply.github.com> Date: Tue, 17 Sep 2024 14:12:02 +0800 Subject: [PATCH 2/4] Replace all remaining uses of project_table with ArrowScan.to_table Replace all remaining uses of project_table with ArrowScan.to_table --- tests/io/test_pyarrow.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 86b6845d40..b062bf2a4e 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -70,7 +70,6 @@ bin_pack_arrow_table, expression_to_pyarrow, ArrowScan, - project_table, schema_to_pyarrow, ) from pyiceberg.manifest import DataFile, DataFileContent, FileFormat @@ -1413,9 +1412,7 @@ def test_delete(deletes_file: str, example_task: FileScanTask, table_schema_simp data_file=example_task.file, delete_files={DataFile(content=DataFileContent.POSITION_DELETES, file_path=deletes_file, file_format=FileFormat.PARQUET)}, ) - - with_deletes = project_table( - tasks=[example_task_with_delete], + with_deletes = ArrowScan( table_metadata=TableMetadataV2( location=metadata_location, last_column_id=1, @@ -1425,8 +1422,10 @@ def test_delete(deletes_file: str, example_task: FileScanTask, table_schema_simp partition_specs=[PartitionSpec()], ), io=load_file_io(), - row_filter=AlwaysTrue(), projected_schema=table_schema_simple, + row_filter=AlwaysTrue(), + ).to_table( + tasks=[example_task_with_delete] ) assert ( @@ -1452,8 +1451,7 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_ }, ) - with_deletes = project_table( - tasks=[example_task_with_delete], + with_deletes = ArrowScan( table_metadata=TableMetadataV2( location=metadata_location, 
last_column_id=1, @@ -1463,8 +1461,10 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_ partition_specs=[PartitionSpec()], ), io=load_file_io(), - row_filter=AlwaysTrue(), projected_schema=table_schema_simple, + row_filter=AlwaysTrue(), + ).to_table( + tasks=[example_task_with_delete] ) assert ( @@ -1482,8 +1482,8 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_ def test_pyarrow_wrap_fsspec(example_task: FileScanTask, table_schema_simple: Schema) -> None: metadata_location = "file://a/b/c.json" - projection = project_table( - tasks=[example_task], + + projection = ArrowScan( table_metadata=TableMetadataV2( location=metadata_location, last_column_id=1, @@ -1496,6 +1496,8 @@ def test_pyarrow_wrap_fsspec(example_task: FileScanTask, table_schema_simple: Sc case_sensitive=True, projected_schema=table_schema_simple, row_filter=AlwaysTrue(), + ).to_table( + tasks=[example_task] ) assert ( From d5fa5f9838c1c9c3ae9e21f0ab52bbe8553d152b Mon Sep 17 00:00:00 2001 From: JE-Chen <33644111+JE-Chen@users.noreply.github.com> Date: Tue, 17 Sep 2024 23:49:59 +0800 Subject: [PATCH 3/4] Fix format Fix format --- pyiceberg/io/pyarrow.py | 1 - pyiceberg/table/__init__.py | 4 +++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index d0a6370f06..999813d0c2 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1379,7 +1379,6 @@ def _projected_field_ids(self) -> Set[int]: if not isinstance(self._projected_schema.find_type(id), (MapType, ListType)) }.union(extract_field_ids(self._bound_row_filter)) - def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table: """Scan the Iceberg table and return a pa.Table. 
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 6804de784a..b4edfc3a59 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -564,7 +564,9 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti io=self._table.io, projected_schema=self.table_metadata.schema(), row_filter=AlwaysTrue(), - ).to_table([original_file]) + ).to_table( + tasks=[original_file] + ) filtered_df = df.filter(preserve_row_filter) # Only rewrite if there are records being deleted From 4a47b6148ff6b6f06b42c6daf6b4b5d19fd34e90 Mon Sep 17 00:00:00 2001 From: JE-Chen <33644111+JE-Chen@users.noreply.github.com> Date: Tue, 17 Sep 2024 23:55:04 +0800 Subject: [PATCH 4/4] Modify by ruff Modify by ruff --- pyiceberg/table/__init__.py | 6 ++---- tests/io/test_pyarrow.py | 14 ++++---------- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index b4edfc3a59..de621ead76 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -523,9 +523,9 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti snapshot_properties: Custom properties to be added to the snapshot summary """ from pyiceberg.io.pyarrow import ( + ArrowScan, _dataframe_to_data_files, _expression_to_complementary_pyarrow, - ArrowScan, ) if ( @@ -564,9 +564,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti io=self._table.io, projected_schema=self.table_metadata.schema(), row_filter=AlwaysTrue(), - ).to_table( - tasks=[original_file] - ) + ).to_table(tasks=[original_file]) filtered_df = df.filter(preserve_row_filter) # Only rewrite if there are records being deleted diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index b062bf2a4e..e4017e1df5 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -58,6 +58,7 @@ from pyiceberg.io import InputStream, OutputStream, load_file_io from 
pyiceberg.io.pyarrow import ( ICEBERG_SCHEMA, + ArrowScan, PyArrowFile, PyArrowFileIO, StatsAggregator, @@ -69,7 +70,6 @@ _to_requested_schema, bin_pack_arrow_table, expression_to_pyarrow, - ArrowScan, schema_to_pyarrow, ) from pyiceberg.manifest import DataFile, DataFileContent, FileFormat @@ -1424,9 +1424,7 @@ def test_delete(deletes_file: str, example_task: FileScanTask, table_schema_simp io=load_file_io(), projected_schema=table_schema_simple, row_filter=AlwaysTrue(), - ).to_table( - tasks=[example_task_with_delete] - ) + ).to_table(tasks=[example_task_with_delete]) assert ( str(with_deletes) @@ -1463,9 +1461,7 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_ io=load_file_io(), projected_schema=table_schema_simple, row_filter=AlwaysTrue(), - ).to_table( - tasks=[example_task_with_delete] - ) + ).to_table(tasks=[example_task_with_delete]) assert ( str(with_deletes) @@ -1496,9 +1492,7 @@ def test_pyarrow_wrap_fsspec(example_task: FileScanTask, table_schema_simple: Sc case_sensitive=True, projected_schema=table_schema_simple, row_filter=AlwaysTrue(), - ).to_table( - tasks=[example_task] - ) + ).to_table(tasks=[example_task]) assert ( str(projection)