diff --git a/docs/content/program-api/python-api.md b/docs/content/program-api/python-api.md
index 406c8c1ef69f..a3a4abf44603 100644
--- a/docs/content/program-api/python-api.md
+++ b/docs/content/program-api/python-api.md
@@ -180,7 +180,7 @@ write_builder = table.new_batch_write_builder()
 table_write = write_builder.new_write()
 table_commit = write_builder.new_commit()
 
-# 2. Write data. Support 3 methods:
+# 2. Write data. Supports 4 methods:
 # 2.1 Write pandas.DataFrame
 dataframe = ...
 table_write.write_pandas(dataframe)
@@ -193,7 +193,19 @@ table_write.write_arrow(pa_table)
 record_batch = ...
 table_write.write_arrow_batch(record_batch)
 
-# 3. Commit data
+# 2.4 Write Ray Dataset (requires ray to be installed)
+import ray
+ray_dataset = ray.data.read_json("/path/to/data.jsonl")
+table_write.write_ray(ray_dataset, overwrite=False, concurrency=2)
+# Parameters:
+# - dataset: Ray Dataset to write
+# - overwrite: Whether to overwrite existing data (default: False)
+# - concurrency: Optional max number of concurrent Ray tasks
+# - ray_remote_args: Optional kwargs passed to ray.remote() (e.g., {"num_cpus": 2})
+# Note: write_ray() handles the commit internally through the Ray Datasink API.
+# Skip steps 3-4 when using write_ray(); just close the writer.
+
+# 3. Commit data (required for write_pandas/write_arrow/write_arrow_batch only)
 commit_messages = table_write.prepare_commit()
 table_commit.commit(commit_messages)
diff --git a/paimon-python/dev/.rat-excludes b/paimon-python/dev/.rat-excludes
index 576371afaea1..d2cc8061e94e 100644
--- a/paimon-python/dev/.rat-excludes
+++ b/paimon-python/dev/.rat-excludes
@@ -18,3 +18,4 @@
 .gitignore
 rat-results.txt
+pypaimon/sample/data/data.jsonl
diff --git a/paimon-python/pypaimon/sample/data/__init__.py b/paimon-python/pypaimon/sample/data/__init__.py
new file mode 100644
index 000000000000..65b48d4d79b4
--- /dev/null
+++ b/paimon-python/pypaimon/sample/data/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/paimon-python/pypaimon/sample/data/data.jsonl b/paimon-python/pypaimon/sample/data/data.jsonl
new file mode 100644
index 000000000000..f33de19c57be
--- /dev/null
+++ b/paimon-python/pypaimon/sample/data/data.jsonl
@@ -0,0 +1,100 @@
+{"id": 1, "name": "Item_1", "category": "B", "value": 12.8, "score": 55, "timestamp": "2024-01-02T10:00:00Z"}
+{"id": 2, "name": "Item_2", "category": "C", "value": 15.1, "score": 60, "timestamp": "2024-01-03T10:00:00Z"}
+{"id": 3, "name": "Item_3", "category": "A", "value": 17.4, "score": 65, "timestamp": "2024-01-04T10:00:00Z"}
+{"id": 4, "name": "Item_4", "category": "B", "value": 19.7, "score": 70, "timestamp": "2024-01-05T10:00:00Z"}
+{"id": 5, "name": "Item_5", "category": "C", "value": 22.0, "score": 75, "timestamp": "2024-01-06T10:00:00Z"}
+{"id": 6, "name": "Item_6", "category": "A", "value": 24.299999999999997, "score": 80, "timestamp": "2024-01-07T10:00:00Z"}
+{"id": 7, "name": "Item_7", "category": "B", "value": 26.599999999999998, "score": 85, "timestamp": "2024-01-08T10:00:00Z"}
+{"id": 8, "name": "Item_8", "category": "C", "value": 28.9, "score": 90, "timestamp": "2024-01-09T10:00:00Z"}
+{"id": 9, "name": "Item_9", "category": "A", "value": 31.2, "score": 95, "timestamp": "2024-01-10T10:00:00Z"}
+{"id": 10, "name": "Item_10", "category": "B", "value": 33.5, "score": 100, "timestamp": "2024-01-11T10:00:00Z"}
+{"id": 11, "name": "Item_11", "category": "C", "value": 35.8, "score": 105, "timestamp": "2024-01-12T10:00:00Z"}
+{"id": 12, "name": "Item_12", "category": "A", "value": 38.099999999999994, "score": 110, "timestamp": "2024-01-13T10:00:00Z"}
+{"id": 13, "name": "Item_13", "category": "B", "value": 40.4, "score": 115, "timestamp": "2024-01-14T10:00:00Z"}
+{"id": 14, "name": "Item_14", "category": "C", "value": 42.699999999999996, "score": 120, "timestamp": "2024-01-15T10:00:00Z"}
+{"id": 15, "name": "Item_15", "category": "A", "value": 45.0, "score": 125, "timestamp": "2024-01-16T10:00:00Z"}
+{"id": 16, "name": "Item_16", "category": "B", "value": 47.3, "score": 130, "timestamp": "2024-01-17T10:00:00Z"}
+{"id": 17, "name": "Item_17", "category": "C", "value": 49.599999999999994, "score": 135, "timestamp": "2024-01-18T10:00:00Z"}
+{"id": 18, "name": "Item_18", "category": "A", "value": 51.9, "score": 140, "timestamp": "2024-01-19T10:00:00Z"}
+{"id": 19, "name": "Item_19", "category": "B", "value": 54.199999999999996, "score": 145, "timestamp": "2024-01-20T10:00:00Z"}
+{"id": 20, "name": "Item_20", "category": "C", "value": 56.5, "score": 150, "timestamp": "2024-01-21T10:00:00Z"}
+{"id": 21, "name": "Item_21", "category": "A", "value": 58.8, "score": 155, "timestamp": "2024-01-22T10:00:00Z"}
+{"id": 22, "name": "Item_22", "category": "B", "value": 61.099999999999994, "score": 160, "timestamp": "2024-01-23T10:00:00Z"}
+{"id": 23, "name": "Item_23", "category": "C", "value": 63.4, "score": 165, "timestamp": "2024-01-24T10:00:00Z"}
+{"id": 24, "name": "Item_24", "category": "A", "value": 65.69999999999999, "score": 170, "timestamp": "2024-01-25T10:00:00Z"}
+{"id": 25, "name": "Item_25", "category": "B", "value": 68.0, "score": 175, "timestamp": "2024-01-26T10:00:00Z"}
+{"id": 26, "name": "Item_26", "category": "C", "value": 70.3, "score": 180, "timestamp": "2024-01-27T10:00:00Z"}
+{"id": 27, "name": "Item_27", "category": "A", "value": 72.6, "score": 185, "timestamp": "2024-01-28T10:00:00Z"}
+{"id": 28, "name": "Item_28", "category": "B", "value": 74.89999999999999, "score": 190, "timestamp": "2024-01-01T10:00:00Z"}
+{"id": 29, "name": "Item_29", "category": "C", "value": 77.19999999999999, "score": 195, "timestamp": "2024-01-02T10:00:00Z"}
+{"id": 30, "name": "Item_30", "category": "A", "value": 79.5, "score": 200, "timestamp": "2024-01-03T10:00:00Z"}
+{"id": 31, "name": "Item_31", "category": "B", "value": 81.8, "score": 205, "timestamp": "2024-01-04T10:00:00Z"}
+{"id": 32, "name": "Item_32", "category": "C", "value": 84.1, "score": 210, "timestamp": "2024-01-05T10:00:00Z"}
+{"id": 33, "name": "Item_33", "category": "A", "value": 86.39999999999999, "score": 215, "timestamp": "2024-01-06T10:00:00Z"}
+{"id": 34, "name": "Item_34", "category": "B", "value": 88.69999999999999, "score": 220, "timestamp": "2024-01-07T10:00:00Z"}
+{"id": 35, "name": "Item_35", "category": "C", "value": 91.0, "score": 225, "timestamp": "2024-01-08T10:00:00Z"}
+{"id": 36, "name": "Item_36", "category": "A", "value": 93.3, "score": 230, "timestamp": "2024-01-09T10:00:00Z"}
+{"id": 37, "name": "Item_37", "category": "B", "value": 95.6, "score": 235, "timestamp": "2024-01-10T10:00:00Z"}
+{"id": 38, "name": "Item_38", "category": "C", "value": 97.89999999999999, "score": 240, "timestamp": "2024-01-11T10:00:00Z"}
+{"id": 39, "name": "Item_39", "category": "A", "value": 100.19999999999999, "score": 245, "timestamp": "2024-01-12T10:00:00Z"}
+{"id": 40, "name": "Item_40", "category": "B", "value": 102.5, "score": 250, "timestamp": "2024-01-13T10:00:00Z"}
+{"id": 41, "name": "Item_41", "category": "C", "value": 104.8, "score": 255, "timestamp": "2024-01-14T10:00:00Z"}
+{"id": 42, "name": "Item_42", "category": "A", "value": 107.1, "score": 260, "timestamp": "2024-01-15T10:00:00Z"}
+{"id": 43, "name": "Item_43", "category": "B", "value": 109.39999999999999, "score": 265, "timestamp": "2024-01-16T10:00:00Z"}
+{"id": 44, "name": "Item_44", "category": "C", "value": 111.69999999999999, "score": 270, "timestamp": "2024-01-17T10:00:00Z"}
+{"id": 45, "name": "Item_45", "category": "A", "value": 113.99999999999999, "score": 275, "timestamp": "2024-01-18T10:00:00Z"}
+{"id": 46, "name": "Item_46", "category": "B", "value": 116.3, "score": 280, "timestamp": "2024-01-19T10:00:00Z"}
+{"id": 47, "name": "Item_47", "category": "C", "value": 118.6, "score": 285, "timestamp": "2024-01-20T10:00:00Z"}
+{"id": 48, "name": "Item_48", "category": "A", "value": 120.89999999999999, "score": 290, "timestamp": "2024-01-21T10:00:00Z"}
+{"id": 49, "name": "Item_49", "category": "B", "value": 123.19999999999999, "score": 295, "timestamp": "2024-01-22T10:00:00Z"}
+{"id": 50, "name": "Item_50", "category": "C", "value": 125.49999999999999, "score": 300, "timestamp": "2024-01-23T10:00:00Z"}
+{"id": 51, "name": "Item_51", "category": "A", "value": 127.8, "score": 305, "timestamp": "2024-01-24T10:00:00Z"}
+{"id": 52, "name": "Item_52", "category": "B", "value": 130.1, "score": 310, "timestamp": "2024-01-25T10:00:00Z"}
+{"id": 53, "name": "Item_53", "category": "C", "value": 132.39999999999998, "score": 315, "timestamp": "2024-01-26T10:00:00Z"}
+{"id": 54, "name": "Item_54", "category": "A", "value": 134.7, "score": 320, "timestamp": "2024-01-27T10:00:00Z"}
+{"id": 55, "name": "Item_55", "category": "B", "value": 137.0, "score": 325, "timestamp": "2024-01-28T10:00:00Z"}
+{"id": 56, "name": "Item_56", "category": "C", "value": 139.29999999999998, "score": 330, "timestamp": "2024-01-01T10:00:00Z"}
+{"id": 57, "name": "Item_57", "category": "A", "value": 141.6, "score": 335, "timestamp": "2024-01-02T10:00:00Z"}
+{"id": 58, "name": "Item_58", "category": "B", "value": 143.89999999999998, "score": 340, "timestamp": "2024-01-03T10:00:00Z"}
+{"id": 59, "name": "Item_59", "category": "C", "value": 146.2, "score": 345, "timestamp": "2024-01-04T10:00:00Z"}
+{"id": 60, "name": "Item_60", "category": "A", "value": 148.5, "score": 350, "timestamp": "2024-01-05T10:00:00Z"}
+{"id": 61, "name": "Item_61", "category": "B", "value": 150.79999999999998, "score": 355, "timestamp": "2024-01-06T10:00:00Z"}
+{"id": 62, "name": "Item_62", "category": "C", "value": 153.1, "score": 360, "timestamp": "2024-01-07T10:00:00Z"}
+{"id": 63, "name": "Item_63", "category": "A", "value": 155.39999999999998, "score": 365, "timestamp": "2024-01-08T10:00:00Z"}
+{"id": 64, "name": "Item_64", "category": "B", "value": 157.7, "score": 370, "timestamp": "2024-01-09T10:00:00Z"}
+{"id": 65, "name": "Item_65", "category": "C", "value": 160.0, "score": 375, "timestamp": "2024-01-10T10:00:00Z"}
+{"id": 66, "name": "Item_66", "category": "A", "value": 162.29999999999998, "score": 380, "timestamp": "2024-01-11T10:00:00Z"}
+{"id": 67, "name": "Item_67", "category": "B", "value": 164.6, "score": 385, "timestamp": "2024-01-12T10:00:00Z"}
+{"id": 68, "name": "Item_68", "category": "C", "value": 166.89999999999998, "score": 390, "timestamp": "2024-01-13T10:00:00Z"}
+{"id": 69, "name": "Item_69", "category": "A", "value": 169.2, "score": 395, "timestamp": "2024-01-14T10:00:00Z"}
+{"id": 70, "name": "Item_70", "category": "B", "value": 171.5, "score": 400, "timestamp": "2024-01-15T10:00:00Z"}
+{"id": 71, "name": "Item_71", "category": "C", "value": 173.79999999999998, "score": 405, "timestamp": "2024-01-16T10:00:00Z"}
+{"id": 72, "name": "Item_72", "category": "A", "value": 176.1, "score": 410, "timestamp": "2024-01-17T10:00:00Z"}
+{"id": 73, "name": "Item_73", "category": "B", "value": 178.39999999999998, "score": 415, "timestamp": "2024-01-18T10:00:00Z"}
+{"id": 74, "name": "Item_74", "category": "C", "value": 180.7, "score": 420, "timestamp": "2024-01-19T10:00:00Z"}
+{"id": 75, "name": "Item_75", "category": "A", "value": 183.0, "score": 425, "timestamp": "2024-01-20T10:00:00Z"}
+{"id": 76, "name": "Item_76", "category": "B", "value": 185.29999999999998, "score": 430, "timestamp": "2024-01-21T10:00:00Z"}
+{"id": 77, "name": "Item_77", "category": "C", "value": 187.6, "score": 435, "timestamp": "2024-01-22T10:00:00Z"}
+{"id": 78, "name": "Item_78", "category": "A", "value": 189.89999999999998, "score": 440, "timestamp": "2024-01-23T10:00:00Z"}
+{"id": 79, "name": "Item_79", "category": "B", "value": 192.2, "score": 445, "timestamp": "2024-01-24T10:00:00Z"}
+{"id": 80, "name": "Item_80", "category": "C", "value": 194.5, "score": 450, "timestamp": "2024-01-25T10:00:00Z"}
+{"id": 81, "name": "Item_81", "category": "A", "value": 196.79999999999998, "score": 455, "timestamp": "2024-01-26T10:00:00Z"}
+{"id": 82, "name": "Item_82", "category": "B", "value": 199.1, "score": 460, "timestamp": "2024-01-27T10:00:00Z"}
+{"id": 83, "name": "Item_83", "category": "C", "value": 201.39999999999998, "score": 465, "timestamp": "2024-01-28T10:00:00Z"}
+{"id": 84, "name": "Item_84", "category": "A", "value": 203.7, "score": 470, "timestamp": "2024-01-01T10:00:00Z"}
+{"id": 85, "name": "Item_85", "category": "B", "value": 205.99999999999997, "score": 475, "timestamp": "2024-01-02T10:00:00Z"}
+{"id": 86, "name": "Item_86", "category": "C", "value": 208.29999999999998, "score": 480, "timestamp": "2024-01-03T10:00:00Z"}
+{"id": 87, "name": "Item_87", "category": "A", "value": 210.6, "score": 485, "timestamp": "2024-01-04T10:00:00Z"}
+{"id": 88, "name": "Item_88", "category": "B", "value": 212.89999999999998, "score": 490, "timestamp": "2024-01-05T10:00:00Z"}
+{"id": 89, "name": "Item_89", "category": "C", "value": 215.2, "score": 495, "timestamp": "2024-01-06T10:00:00Z"}
+{"id": 90, "name": "Item_90", "category": "A", "value": 217.49999999999997, "score": 500, "timestamp": "2024-01-07T10:00:00Z"}
+{"id": 91, "name": "Item_91", "category": "B", "value": 219.79999999999998, "score": 505, "timestamp": "2024-01-08T10:00:00Z"}
+{"id": 92, "name": "Item_92", "category": "C", "value": 222.1, "score": 510, "timestamp": "2024-01-09T10:00:00Z"}
+{"id": 93, "name": "Item_93", "category": "A", "value": 224.39999999999998, "score": 515, "timestamp": "2024-01-10T10:00:00Z"}
+{"id": 94, "name": "Item_94", "category": "B", "value": 226.7, "score": 520, "timestamp": "2024-01-11T10:00:00Z"}
+{"id": 95, "name": "Item_95", "category": "C", "value": 228.99999999999997, "score": 525, "timestamp": "2024-01-12T10:00:00Z"}
+{"id": 96, "name": "Item_96", "category": "A", "value": 231.29999999999998, "score": 530, "timestamp": "2024-01-13T10:00:00Z"}
+{"id": 97, "name": "Item_97", "category": "B", "value": 233.6, "score": 535, "timestamp": "2024-01-14T10:00:00Z"}
+{"id": 98, "name": "Item_98", "category": "C", "value": 235.89999999999998, "score": 540, "timestamp": "2024-01-15T10:00:00Z"}
+{"id": 99, "name": "Item_99", "category": "A", "value": 238.2, "score": 545, "timestamp": "2024-01-16T10:00:00Z"}
+{"id": 100, "name": "Item_100", "category": "B", "value": 240.49999999999997, "score": 550, "timestamp": "2024-01-17T10:00:00Z"}
diff --git a/paimon-python/pypaimon/sample/rest_catalog_ray_sink_sample.py b/paimon-python/pypaimon/sample/rest_catalog_ray_sink_sample.py
new file mode 100644
index 000000000000..feb2471de937
--- /dev/null
+++ b/paimon-python/pypaimon/sample/rest_catalog_ray_sink_sample.py
@@ -0,0 +1,146 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""
+Example: REST Catalog + Ray Sink with Local File + Mock Server
+
+Demonstrates reading JSON data from a local file using Ray Data and writing to a Paimon
+table using Ray Sink (write_ray) with a mock REST catalog server.
+""" + +import os +import tempfile +import uuid + +import pyarrow as pa +import ray + +from pypaimon import CatalogFactory, Schema +from pypaimon.common.options.core_options import CoreOptions +from pypaimon.schema.data_types import PyarrowFieldParser +from pypaimon.tests.rest.rest_server import RESTCatalogServer +from pypaimon.api.api_response import ConfigResponse +from pypaimon.api.auth import BearTokenAuthProvider + + +def _get_sample_data_path(filename: str) -> str: + sample_dir = os.path.dirname(os.path.abspath(__file__)) + return os.path.join(sample_dir, 'data', filename) + + +def main(): + ray.init(ignore_reinit_error=True, num_cpus=2) + + temp_dir = tempfile.mkdtemp() + token = str(uuid.uuid4()) + server = RESTCatalogServer( + data_path=temp_dir, + auth_provider=BearTokenAuthProvider(token), + config=ConfigResponse(defaults={"prefix": "mock-test"}), + warehouse="warehouse" + ) + server.start() + print(f"REST server started at: {server.get_url()}") + + json_file_path = _get_sample_data_path('data.jsonl') + + try: + catalog = CatalogFactory.create({ + 'metastore': 'rest', + 'uri': f"http://localhost:{server.port}", + 'warehouse': "warehouse", + 'token.provider': 'bear', + 'token': token, + }) + catalog.create_database("default", ignore_if_exists=True) + + schema = Schema.from_pyarrow_schema(pa.schema([ + ('id', pa.int64()), + ('name', pa.string()), + ('category', pa.string()), + ('value', pa.float64()), + ('score', pa.int64()), + ('timestamp', pa.timestamp('s')), + ]), primary_keys=['id'], options={ + CoreOptions.BUCKET.key(): '4' + }) + + table_name = 'default.oss_json_import_table' + catalog.create_table(table_name, schema, ignore_if_exists=True) + table = catalog.get_table(table_name) + + print(f"Reading JSON from local file: {json_file_path}") + ray_dataset = ray.data.read_json( + json_file_path, + concurrency=2, + ) + + print(f"Ray Dataset: {ray_dataset.count()} rows") + print(f"Schema: {ray_dataset.schema()}") + + table_pa_schema = PyarrowFieldParser.from_paimon_schema(table.table_schema.fields) + + def cast_batch_to_table_schema(batch: pa.RecordBatch) -> pa.Table: + arrays = [] + for field in table_pa_schema: + col_name = field.name + col_array = batch.column(col_name) + if isinstance(col_array, pa.ChunkedArray): + col_array = col_array.combine_chunks() + if col_array.type != field.type: + col_array = col_array.cast(field.type) + arrays.append(col_array) + record_batch = pa.RecordBatch.from_arrays(arrays, schema=table_pa_schema) + return pa.Table.from_batches([record_batch]) + + ray_dataset = ray_dataset.map_batches( + cast_batch_to_table_schema, + batch_format="pyarrow", + ) + + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + + table_write.write_ray( + ray_dataset, + overwrite=False, + concurrency=2, + ray_remote_args={"num_cpus": 1} + ) + + # write_ray() has already committed the data, just close the writer + table_write.close() + + print(f"Successfully wrote {ray_dataset.count()} rows to table") + + read_builder = table.new_read_builder() + table_read = read_builder.new_read() + table_scan = read_builder.new_scan() + splits = table_scan.plan().splits() + + result_df = table_read.to_pandas(splits) + print(f"Read back {len(result_df)} rows from table") + print(result_df.head()) + + finally: + server.shutdown() + if ray.is_initialized(): + ray.shutdown() + + +if __name__ == '__main__': + main() diff --git a/paimon-python/pypaimon/tests/ray_data_test.py b/paimon-python/pypaimon/tests/ray_data_test.py index e931a4c7dcc1..00995d697673 
100644 --- a/paimon-python/pypaimon/tests/ray_data_test.py +++ b/paimon-python/pypaimon/tests/ray_data_test.py @@ -155,7 +155,7 @@ def test_basic_ray_data_write(self): ds = from_arrow(test_data) write_builder = table.new_batch_write_builder() writer = write_builder.new_write() - writer.write_raydata(ds, parallelism=2) + writer.write_ray(ds, concurrency=2) # Read using Ray Data read_builder = table.new_read_builder() table_read = read_builder.new_read() diff --git a/paimon-python/pypaimon/tests/ray_sink_test.py b/paimon-python/pypaimon/tests/ray_sink_test.py new file mode 100644 index 000000000000..bc1ff3ef9f48 --- /dev/null +++ b/paimon-python/pypaimon/tests/ray_sink_test.py @@ -0,0 +1,343 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import os +import tempfile +import unittest +from unittest.mock import Mock, patch + +import pyarrow as pa +from ray.data._internal.execution.interfaces import TaskContext + +from pypaimon import CatalogFactory, Schema +from pypaimon.write.ray_datasink import PaimonDatasink +from pypaimon.write.commit_message import CommitMessage + + +class RaySinkTest(unittest.TestCase): + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + self.warehouse_path = os.path.join(self.temp_dir, "warehouse") + os.makedirs(self.warehouse_path, exist_ok=True) + + catalog_options = { + "warehouse": self.warehouse_path + } + self.catalog = CatalogFactory.create(catalog_options) + self.catalog.create_database("test_db", ignore_if_exists=True) + + pa_schema = pa.schema([ + ('id', pa.int64()), + ('name', pa.string()), + ('value', pa.float64()) + ]) + + schema = Schema.from_pyarrow_schema( + pa_schema=pa_schema, + partition_keys=None, + primary_keys=['id'], + options={'bucket': '2'}, # Use fixed bucket mode for testing + comment='test table' + ) + + self.table_identifier = "test_db.test_table" + self.catalog.create_table(self.table_identifier, schema, ignore_if_exists=False) + self.table = self.catalog.get_table(self.table_identifier) + + def tearDown(self): + import shutil + if os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + + def test_init_and_serialization(self): + """Test initialization, serialization, and table name.""" + datasink = PaimonDatasink(self.table, overwrite=False) + self.assertEqual(datasink.table, self.table) + self.assertFalse(datasink.overwrite) + self.assertIsNone(datasink._writer_builder) + self.assertEqual(datasink._table_name, "test_db.test_table") + + datasink_overwrite = PaimonDatasink(self.table, overwrite=True) + self.assertTrue(datasink_overwrite.overwrite) + + # Test serialization + datasink._writer_builder = Mock() + state = 
datasink.__getstate__() + self.assertIn('table', state) + self.assertIn('overwrite', state) + self.assertIn('_writer_builder', state) + + new_datasink = PaimonDatasink.__new__(PaimonDatasink) + new_datasink.__setstate__(state) + self.assertEqual(new_datasink.table, self.table) + self.assertFalse(new_datasink.overwrite) + + def test_table_and_writer_builder_serializable(self): + import pickle + try: + pickled_table = pickle.dumps(self.table) + unpickled_table = pickle.loads(pickled_table) + self.assertIsNotNone(unpickled_table) + builder = unpickled_table.new_batch_write_builder() + self.assertIsNotNone(builder) + except Exception as e: + self.fail(f"Table object is not serializable: {e}") + + writer_builder = self.table.new_batch_write_builder() + try: + pickled_builder = pickle.dumps(writer_builder) + unpickled_builder = pickle.loads(pickled_builder) + self.assertIsNotNone(unpickled_builder) + table_write = unpickled_builder.new_write() + self.assertIsNotNone(table_write) + table_write.close() + except Exception as e: + self.fail(f"WriterBuilder is not serializable: {e}") + + overwrite_builder = self.table.new_batch_write_builder().overwrite() + try: + pickled_overwrite = pickle.dumps(overwrite_builder) + unpickled_overwrite = pickle.loads(pickled_overwrite) + self.assertIsNotNone(unpickled_overwrite) + # static_partition is a dict, empty dict {} means overwrite all partitions + self.assertIsNotNone(unpickled_overwrite.static_partition) + self.assertIsInstance(unpickled_overwrite.static_partition, dict) + except Exception as e: + self.fail(f"Overwrite WriterBuilder is not serializable: {e}") + + def test_on_write_start(self): + """Test on_write_start with normal and overwrite modes.""" + datasink = PaimonDatasink(self.table, overwrite=False) + datasink.on_write_start() + self.assertIsNotNone(datasink._writer_builder) + self.assertFalse(datasink._writer_builder.static_partition) + + datasink_overwrite = PaimonDatasink(self.table, overwrite=True) + datasink_overwrite.on_write_start() + self.assertIsNotNone(datasink_overwrite._writer_builder.static_partition) + + def test_write(self): + """Test write method: empty blocks, multiple blocks, error handling, and resource cleanup.""" + datasink = PaimonDatasink(self.table, overwrite=False) + datasink.on_write_start() + ctx = Mock(spec=TaskContext) + + # Test empty block + empty_table = pa.table({ + 'id': pa.array([], type=pa.int64()), + 'name': pa.array([], type=pa.string()), + 'value': pa.array([], type=pa.float64()) + }) + result = datasink.write([empty_table], ctx) + self.assertEqual(result, []) + + # Test single and multiple blocks + single_block = pa.table({ + 'id': [1, 2, 3], + 'name': ['Alice', 'Bob', 'Charlie'], + 'value': [1.1, 2.2, 3.3] + }) + result = datasink.write([single_block], ctx) + self.assertIsInstance(result, list) + if result: + self.assertTrue(all(isinstance(msg, CommitMessage) for msg in result)) + + block1 = pa.table({ + 'id': [4, 5], + 'name': ['David', 'Eve'], + 'value': [4.4, 5.5] + }) + block2 = pa.table({ + 'id': [6, 7], + 'name': ['Frank', 'Grace'], + 'value': [6.6, 7.7] + }) + result = datasink.write([block1, block2], ctx) + self.assertIsInstance(result, list) + if result: + self.assertTrue(all(isinstance(msg, CommitMessage) for msg in result)) + + # Test that write creates WriteBuilder on worker (not using driver's builder) + with patch.object(self.table, 'new_batch_write_builder') as mock_builder: + mock_write_builder = Mock() + mock_write_builder.overwrite.return_value = mock_write_builder + mock_write = Mock() + 
mock_write.prepare_commit.return_value = [] + mock_write_builder.new_write.return_value = mock_write + mock_builder.return_value = mock_write_builder + + data_table = pa.table({ + 'id': [1], + 'name': ['Alice'], + 'value': [1.1] + }) + datasink.write([data_table], ctx) + mock_builder.assert_called_once() + + invalid_table = pa.table({ + 'wrong_column': [1, 2, 3] + }) + with self.assertRaises(Exception): + datasink.write([invalid_table], ctx) + + with patch.object(self.table, 'new_batch_write_builder') as mock_builder: + mock_write_builder = Mock() + mock_write_builder.overwrite.return_value = mock_write_builder + mock_write = Mock() + mock_write.write_arrow.side_effect = Exception("Write error") + mock_write_builder.new_write.return_value = mock_write + mock_builder.return_value = mock_write_builder + + data_table = pa.table({ + 'id': [1], + 'name': ['Alice'], + 'value': [1.1] + }) + with self.assertRaises(Exception): + datasink.write([data_table], ctx) + mock_write.close.assert_called_once() + + def test_on_write_complete(self): + from ray.data.datasource.datasink import WriteResult + + # Test empty messages + datasink = PaimonDatasink(self.table, overwrite=False) + datasink.on_write_start() + write_result = WriteResult( + num_rows=0, + size_bytes=0, + write_returns=[[], []] + ) + datasink.on_write_complete(write_result) + + # Test with messages and filtering empty messages + datasink = PaimonDatasink(self.table, overwrite=False) + datasink.on_write_start() + commit_msg1 = Mock(spec=CommitMessage) + commit_msg1.is_empty.return_value = False + commit_msg2 = Mock(spec=CommitMessage) + commit_msg2.is_empty.return_value = False + empty_msg = Mock(spec=CommitMessage) + empty_msg.is_empty.return_value = True + + write_result = WriteResult( + num_rows=0, + size_bytes=0, + write_returns=[[commit_msg1], [commit_msg2], [empty_msg]] + ) + + mock_commit = Mock() + datasink._writer_builder.new_commit = Mock(return_value=mock_commit) + datasink.on_write_complete(write_result) + + mock_commit.commit.assert_called_once() + commit_args = mock_commit.commit.call_args[0][0] + self.assertEqual(len(commit_args), 2) # Empty message filtered out + mock_commit.close.assert_called_once() + + # Test commit failure: abort should be called + datasink = PaimonDatasink(self.table, overwrite=False) + datasink.on_write_start() + commit_msg1 = Mock(spec=CommitMessage) + commit_msg1.is_empty.return_value = False + commit_msg2 = Mock(spec=CommitMessage) + commit_msg2.is_empty.return_value = False + + write_result = WriteResult( + num_rows=0, + size_bytes=0, + write_returns=[[commit_msg1], [commit_msg2]] + ) + + mock_commit = Mock() + mock_commit.commit.side_effect = Exception("Commit failed") + datasink._writer_builder.new_commit = Mock(return_value=mock_commit) + + with self.assertRaises(Exception): + datasink.on_write_complete(write_result) + + mock_commit.abort.assert_called_once() + abort_args = mock_commit.abort.call_args[0][0] + self.assertEqual(len(abort_args), 2) + mock_commit.close.assert_called_once() + + # Test table_commit creation failure + datasink = PaimonDatasink(self.table, overwrite=False) + datasink.on_write_start() + commit_msg1 = Mock(spec=CommitMessage) + commit_msg1.is_empty.return_value = False + + write_result = WriteResult( + num_rows=0, + size_bytes=0, + write_returns=[[commit_msg1]] + ) + + mock_new_commit = Mock(side_effect=Exception("Failed to create table_commit")) + datasink._writer_builder.new_commit = mock_new_commit + with self.assertRaises(Exception): + 
datasink.on_write_complete(write_result) + self.assertEqual(len(datasink._pending_commit_messages), 1) + + def test_on_write_failed(self): + # Test without pending messages (on_write_complete() never called) + datasink = PaimonDatasink(self.table, overwrite=False) + datasink.on_write_start() + self.assertEqual(datasink._pending_commit_messages, []) + error = Exception("Write job failed") + datasink.on_write_failed(error) # Should not raise exception + + # Test with pending messages (on_write_complete() was called but failed) + datasink = PaimonDatasink(self.table, overwrite=False) + datasink.on_write_start() + commit_msg1 = Mock(spec=CommitMessage) + commit_msg2 = Mock(spec=CommitMessage) + datasink._pending_commit_messages = [commit_msg1, commit_msg2] + + mock_commit = Mock() + datasink._writer_builder.new_commit = Mock(return_value=mock_commit) + error = Exception("Write job failed") + datasink.on_write_failed(error) + + mock_commit.abort.assert_called_once() + abort_args = mock_commit.abort.call_args[0][0] + self.assertEqual(len(abort_args), 2) + self.assertEqual(abort_args[0], commit_msg1) + self.assertEqual(abort_args[1], commit_msg2) + mock_commit.close.assert_called_once() + self.assertEqual(datasink._pending_commit_messages, []) + + # Test abort failure handling (should not raise exception) + datasink = PaimonDatasink(self.table, overwrite=False) + datasink.on_write_start() + commit_msg1 = Mock(spec=CommitMessage) + datasink._pending_commit_messages = [commit_msg1] + + mock_commit = Mock() + mock_commit.abort.side_effect = Exception("Abort failed") + datasink._writer_builder.new_commit = Mock(return_value=mock_commit) + error = Exception("Write job failed") + datasink.on_write_failed(error) + + mock_commit.abort.assert_called_once() + mock_commit.close.assert_called_once() + self.assertEqual(datasink._pending_commit_messages, []) + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/write/ray_datasink.py b/paimon-python/pypaimon/write/ray_datasink.py index 709a010fa42d..cb0335068339 100644 --- a/paimon-python/pypaimon/write/ray_datasink.py +++ b/paimon-python/pypaimon/write/ray_datasink.py @@ -1,3 +1,4 @@ +################################################################################ # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -15,77 +16,174 @@ # limitations under the License. ################################################################################ """ -Module to reawrited a Paimon table from a Ray Dataset, by using the Ray Datasink API. +Module to write a Paimon table from a Ray Dataset, by using the Ray Datasink API. 
""" -from typing import Iterable -from ray.data.datasource.datasink import Datasink, WriteResult, WriteReturnType -from pypaimon.table.table import Table -from pypaimon.write.write_builder import WriteBuilder -from ray.data.block import BlockAccessor -from ray.data.block import Block +import logging +from typing import TYPE_CHECKING, Iterable, List, Optional + +from ray.data.datasource.datasink import Datasink, WriteResult +from ray.util.annotations import DeveloperAPI +from ray.data.block import BlockAccessor, Block from ray.data._internal.execution.interfaces import TaskContext import pyarrow as pa +if TYPE_CHECKING: + from pypaimon.table.table import Table + from pypaimon.write.write_builder import WriteBuilder + from pypaimon.write.commit_message import CommitMessage + +logger = logging.getLogger(__name__) -class PaimonDatasink(Datasink): - - def __init__(self, table: Table, overwrite=False): + +@DeveloperAPI +class PaimonDatasink(Datasink[List["CommitMessage"]]): + def __init__( + self, + table: "Table", + overwrite: bool = False, + ): self.table = table self.overwrite = overwrite + self._table_name = table.identifier.get_full_name() + self._writer_builder: Optional["WriteBuilder"] = None + self._pending_commit_messages: List["CommitMessage"] = [] - def on_write_start(self, schema=None) -> None: - """Callback for when a write job starts. + def __getstate__(self) -> dict: + state = self.__dict__.copy() + return state - Use this method to perform setup for write tasks. For example, creating a - staging bucket in S3. + def __setstate__(self, state: dict) -> None: + self.__dict__.update(state) + if self._writer_builder is not None and not hasattr(self._writer_builder, 'table'): + self._writer_builder = None + if not hasattr(self, '_table_name'): + self._table_name = self.table.identifier.get_full_name() - Args: - schema: Optional schema information passed by Ray Data. - """ - self.writer_builder: WriteBuilder = self.table.new_batch_write_builder() + def on_write_start(self, schema=None) -> None: + logger.info(f"Starting write job for table {self._table_name}") + + self._writer_builder = self.table.new_batch_write_builder() if self.overwrite: - self.writer_builder = self.writer_builder.overwrite() + self._writer_builder = self._writer_builder.overwrite() def write( self, blocks: Iterable[Block], ctx: TaskContext, - ) -> WriteReturnType: - """Write blocks. This is used by a single write task. - - Args: - blocks: Generator of data blocks. - ctx: ``TaskContext`` for the write task. - - Returns: - Result of this write task. When the entire write operator finishes, - All returned values will be passed as `WriteResult.write_returns` - to `Datasink.on_write_complete`. - """ - table_write = self.writer_builder.new_write() - for block in blocks: - block_arrow: pa.Table = BlockAccessor.for_block(block).to_arrow() - table_write.write_arrow(block_arrow) - commit_messages = table_write.prepare_commit() - table_write.close() - return commit_messages - - def on_write_complete(self, write_result: WriteResult[WriteReturnType]): - """Callback for when a write job completes. - - This can be used to `commit` a write output. This method must - succeed prior to ``write_datasink()`` returning to the user. If this - method fails, then ``on_write_failed()`` is called. - - Args: - write_result: Aggregated result of the - Write operator, containing write results and stats. 
- """ - table_commit = self.writer_builder.new_commit() - table_commit.commit([ - commit_message - for commit_messages in write_result.write_returns - for commit_message in commit_messages - ]) - table_commit.close() + ) -> List["CommitMessage"]: + commit_messages_list: List["CommitMessage"] = [] + table_write = None + + try: + writer_builder = self.table.new_batch_write_builder() + if self.overwrite: + writer_builder = writer_builder.overwrite() + + table_write = writer_builder.new_write() + + for block in blocks: + block_arrow: pa.Table = BlockAccessor.for_block(block).to_arrow() + + if block_arrow.num_rows == 0: + continue + + table_write.write_arrow(block_arrow) + + commit_messages = table_write.prepare_commit() + commit_messages_list.extend(commit_messages) + finally: + if table_write is not None: + table_write.close() + + return commit_messages_list + + def on_write_complete( + self, write_result: WriteResult[List["CommitMessage"]] + ): + table_commit = None + commit_messages_to_abort = [] + try: + all_commit_messages = [ + commit_message + for commit_messages in write_result.write_returns + for commit_message in commit_messages + ] + + non_empty_messages = [ + msg for msg in all_commit_messages if not msg.is_empty() + ] + + self._pending_commit_messages = non_empty_messages + + if not non_empty_messages: + logger.info("No data to commit (all commit messages are empty)") + self._pending_commit_messages = [] + return + + logger.info( + f"Committing {len(non_empty_messages)} commit messages " + f"for table {self._table_name}" + ) + + table_commit = self._writer_builder.new_commit() + commit_messages_to_abort = non_empty_messages + table_commit.commit(non_empty_messages) + + logger.info(f"Successfully committed write job for table {self._table_name}") + commit_messages_to_abort = [] + self._pending_commit_messages = [] + except Exception as e: + logger.error( + f"Error committing write job for table {self._table_name}: {e}", + exc_info=e + ) + if table_commit is not None and commit_messages_to_abort: + try: + table_commit.abort(commit_messages_to_abort) + logger.info( + f"Aborted {len(commit_messages_to_abort)} commit messages " + f"for table {self._table_name}" + ) + except Exception as abort_error: + logger.error( + f"Error aborting commit messages: {abort_error}", + exc_info=abort_error + ) + finally: + self._pending_commit_messages = [] + raise + finally: + if table_commit is not None: + try: + table_commit.close() + except Exception as e: + logger.warning( + f"Error closing table_commit: {e}", + exc_info=e + ) + + def on_write_failed(self, error: Exception) -> None: + logger.error( + f"Write job failed for table {self._table_name}. Error: {error}", + exc_info=error + ) + + if self._pending_commit_messages: + try: + table_commit = self._writer_builder.new_commit() + try: + table_commit.abort(self._pending_commit_messages) + logger.info( + f"Aborted {len(self._pending_commit_messages)} commit messages " + f"for table {self._table_name} in on_write_failed()" + ) + finally: + table_commit.close() + except Exception as abort_error: + logger.error( + f"Error aborting commit messages in on_write_failed(): {abort_error}", + exc_info=abort_error + ) + finally: + self._pending_commit_messages = [] diff --git a/paimon-python/pypaimon/write/table_write.py b/paimon-python/pypaimon/write/table_write.py index 9a4bdecf26f6..a2c9a9a4c364 100644 --- a/paimon-python/pypaimon/write/table_write.py +++ b/paimon-python/pypaimon/write/table_write.py @@ -16,7 +16,7 @@ # limitations under the License. 
 ################################################################################
 from collections import defaultdict
-from typing import List
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
 
 import pyarrow as pa
 
@@ -25,6 +25,9 @@
 from pypaimon.write.commit_message import CommitMessage
 from pypaimon.write.file_store_write import FileStoreWrite
 
+if TYPE_CHECKING:
+    from ray.data import Dataset
+
 
 class TableWrite:
     def __init__(self, table, commit_user):
@@ -68,10 +71,32 @@
         self.file_store_write.write_cols = write_cols
         return self
 
-    def write_raydata(self, dataset, overwrite=False, parallelism=1):
+    def write_ray(
+        self,
+        dataset: "Dataset",
+        overwrite: bool = False,
+        concurrency: Optional[int] = None,
+        ray_remote_args: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        """
+        Write a Ray Dataset to a Paimon table.
+
+        Args:
+            dataset: Ray Dataset to write. This is a distributed data collection
+                from Ray Data (ray.data.Dataset).
+            overwrite: Whether to overwrite existing data. Defaults to False.
+            concurrency: Optional max number of Ray tasks to run concurrently.
+                By default, this is decided dynamically based on available resources.
+            ray_remote_args: Optional kwargs passed to :func:`ray.remote` in write tasks.
+                For example, ``{"num_cpus": 2, "max_retries": 3}``.
+        """
         from pypaimon.write.ray_datasink import PaimonDatasink
         datasink = PaimonDatasink(self.table, overwrite=overwrite)
-        dataset.write_datasink(datasink, concurrency=parallelism)
+        dataset.write_datasink(
+            datasink,
+            concurrency=concurrency,
+            ray_remote_args=ray_remote_args,
+        )
 
     def close(self):
         self.file_store_write.close()
diff --git a/pom.xml b/pom.xml
index 6176baaab666..be463ef6b041 100644
--- a/pom.xml
+++ b/pom.xml
@@ -626,6 +626,7 @@ under the License.
                         paimon-common/src/main/antlr4/**
                         paimon-core/src/test/resources/compatibility/**
+                        paimon-python/pypaimon/sample/data/**
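
For reference, a minimal end-to-end sketch of the write_ray() path added by this patch (not part of the diff itself; the warehouse path, table name, and sample rows below are illustrative, and all API calls match the signatures shown above):

```python
import pyarrow as pa
import ray

from pypaimon import CatalogFactory, Schema

ray.init(ignore_reinit_error=True)

# Assumed: a local filesystem warehouse; any warehouse pypaimon supports works.
catalog = CatalogFactory.create({"warehouse": "/tmp/paimon_warehouse"})
catalog.create_database("default", ignore_if_exists=True)

schema = Schema.from_pyarrow_schema(
    pa.schema([("id", pa.int64()), ("name", pa.string())]),
    primary_keys=["id"],
    options={"bucket": "2"},
)
catalog.create_table("default.ray_demo", schema, ignore_if_exists=True)
table = catalog.get_table("default.ray_demo")

# Any Ray Dataset works as input; from_items() keeps the example self-contained.
ds = ray.data.from_items([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}])

table_write = table.new_batch_write_builder().new_write()
# write_ray() commits internally through the Ray Datasink, so there is no
# prepare_commit()/commit() step afterwards; just close the writer.
table_write.write_ray(ds, overwrite=False, concurrency=2)
table_write.close()

ray.shutdown()
```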