From ae48e157cb87a9b8b9e159108153bdca6f32c5f9 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 7 Jan 2026 11:52:11 +0800 Subject: [PATCH 01/10] refactor ray sink --- paimon-python/pypaimon/tests/ray_sink_test.py | 268 ++++++++++++++++++ paimon-python/pypaimon/write/ray_datasink.py | 174 ++++++++---- 2 files changed, 386 insertions(+), 56 deletions(-) create mode 100644 paimon-python/pypaimon/tests/ray_sink_test.py diff --git a/paimon-python/pypaimon/tests/ray_sink_test.py b/paimon-python/pypaimon/tests/ray_sink_test.py new file mode 100644 index 000000000000..cee834f425dd --- /dev/null +++ b/paimon-python/pypaimon/tests/ray_sink_test.py @@ -0,0 +1,268 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import os +import tempfile +import unittest +from unittest.mock import Mock, patch + +import pyarrow as pa +from ray.data._internal.execution.interfaces import TaskContext + +from pypaimon import CatalogFactory, Schema +from pypaimon.write.ray_datasink import PaimonDatasink +from pypaimon.write.commit_message import CommitMessage + + +class RaySinkTest(unittest.TestCase): + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + self.warehouse_path = os.path.join(self.temp_dir, "warehouse") + os.makedirs(self.warehouse_path, exist_ok=True) + + catalog_options = { + "warehouse": self.warehouse_path + } + self.catalog = CatalogFactory.create(catalog_options) + self.catalog.create_database("test_db", ignore_if_exists=True) + + pa_schema = pa.schema([ + ('id', pa.int64()), + ('name', pa.string()), + ('value', pa.float64()) + ]) + + schema = Schema.from_pyarrow_schema( + pa_schema=pa_schema, + partition_keys=None, + primary_keys=['id'], + options={'bucket': '2'}, # Use fixed bucket mode for testing + comment='test table' + ) + + self.table_identifier = "test_db.test_table" + self.catalog.create_table(self.table_identifier, schema, ignore_if_exists=False) + self.table = self.catalog.get_table(self.table_identifier) + + def tearDown(self): + import shutil + if os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + + def test_init_and_serialization(self): + """Test initialization, serialization, and table name.""" + datasink = PaimonDatasink(self.table, overwrite=False) + self.assertEqual(datasink.table, self.table) + self.assertFalse(datasink.overwrite) + self.assertIsNone(datasink._writer_builder) + self.assertEqual(datasink._get_table_name(), "test_db.test_table") + + datasink_overwrite = PaimonDatasink(self.table, overwrite=True) + self.assertTrue(datasink_overwrite.overwrite) + + # Test serialization + datasink._writer_builder = Mock() + state = datasink.__getstate__() + 
self.assertIn('table', state) + self.assertIn('overwrite', state) + self.assertIn('_writer_builder', state) + + new_datasink = PaimonDatasink.__new__(PaimonDatasink) + new_datasink.__setstate__(state) + self.assertEqual(new_datasink.table, self.table) + self.assertFalse(new_datasink.overwrite) + + def test_on_write_start(self): + """Test on_write_start with normal and overwrite modes.""" + datasink = PaimonDatasink(self.table, overwrite=False) + datasink.on_write_start() + self.assertIsNotNone(datasink._writer_builder) + self.assertFalse(datasink._writer_builder.static_partition) + + datasink_overwrite = PaimonDatasink(self.table, overwrite=True) + datasink_overwrite.on_write_start() + self.assertIsNotNone(datasink_overwrite._writer_builder.static_partition) + + def test_write_blocks(self): + """Test write method with empty, single, and multiple blocks.""" + datasink = PaimonDatasink(self.table, overwrite=False) + datasink.on_write_start() + ctx = Mock(spec=TaskContext) + + # Test empty block + empty_table = pa.table({ + 'id': pa.array([], type=pa.int64()), + 'name': pa.array([], type=pa.string()), + 'value': pa.array([], type=pa.float64()) + }) + result = datasink.write([empty_table], ctx) + self.assertEqual(result, []) + + # Test single block + single_block = pa.table({ + 'id': [1, 2, 3], + 'name': ['Alice', 'Bob', 'Charlie'], + 'value': [1.1, 2.2, 3.3] + }) + result = datasink.write([single_block], ctx) + self.assertIsInstance(result, list) + if result: + self.assertTrue(all(isinstance(msg, CommitMessage) for msg in result)) + + # Test multiple blocks + block1 = pa.table({ + 'id': [4, 5], + 'name': ['David', 'Eve'], + 'value': [4.4, 5.5] + }) + block2 = pa.table({ + 'id': [6, 7], + 'name': ['Frank', 'Grace'], + 'value': [6.6, 7.7] + }) + result = datasink.write([block1, block2], ctx) + self.assertIsInstance(result, list) + if result: + self.assertTrue(all(isinstance(msg, CommitMessage) for msg in result)) + + def test_write_creates_writer_builder_on_worker(self): + """Test that write method creates WriteBuilder on worker (not using driver's builder).""" + datasink = PaimonDatasink(self.table, overwrite=False) + datasink.on_write_start() + + with patch.object(self.table, 'new_batch_write_builder') as mock_builder: + mock_write_builder = Mock() + mock_write_builder.overwrite.return_value = mock_write_builder + mock_write = Mock() + mock_write.prepare_commit.return_value = [] + mock_write_builder.new_write.return_value = mock_write + mock_builder.return_value = mock_write_builder + + data_table = pa.table({ + 'id': [1], + 'name': ['Alice'], + 'value': [1.1] + }) + ctx = Mock(spec=TaskContext) + + datasink.write([data_table], ctx) + + mock_builder.assert_called_once() + + def test_on_write_complete(self): + """Test on_write_complete with empty messages, normal messages, and filtering.""" + from ray.data.datasource.datasink import WriteResult + + # Test empty messages + datasink = PaimonDatasink(self.table, overwrite=False) + datasink.on_write_start() + write_result = WriteResult( + num_rows=0, + size_bytes=0, + write_returns=[[], []] # Empty commit messages from workers + ) + datasink.on_write_complete(write_result) + + # Test with messages + datasink = PaimonDatasink(self.table, overwrite=False) + datasink.on_write_start() + commit_msg1 = Mock(spec=CommitMessage) + commit_msg1.is_empty.return_value = False + commit_msg2 = Mock(spec=CommitMessage) + commit_msg2.is_empty.return_value = False + + write_result = WriteResult( + num_rows=0, + size_bytes=0, + write_returns=[[commit_msg1], 
[commit_msg2]] + ) + + mock_commit = Mock() + datasink._writer_builder.new_commit = Mock(return_value=mock_commit) + datasink.on_write_complete(write_result) + + mock_commit.commit.assert_called_once() + commit_args = mock_commit.commit.call_args[0][0] + self.assertEqual(len(commit_args), 2) + + # Test filtering empty messages + datasink = PaimonDatasink(self.table, overwrite=False) + datasink.on_write_start() + empty_msg = Mock(spec=CommitMessage) + empty_msg.is_empty.return_value = True + non_empty_msg = Mock(spec=CommitMessage) + non_empty_msg.is_empty.return_value = False + + write_result = WriteResult( + num_rows=0, + size_bytes=0, + write_returns=[[empty_msg], [non_empty_msg]] + ) + + mock_commit = Mock() + datasink._writer_builder.new_commit = Mock(return_value=mock_commit) + datasink.on_write_complete(write_result) + + mock_commit.commit.assert_called_once() + commit_args = mock_commit.commit.call_args[0][0] + self.assertEqual(len(commit_args), 1) + self.assertEqual(commit_args[0], non_empty_msg) + + def test_error_handling(self): + """Test error handling in write method and on_write_failed.""" + datasink = PaimonDatasink(self.table, overwrite=False) + datasink.on_write_start() + + # Test write error handling + invalid_table = pa.table({ + 'wrong_column': [1, 2, 3] # Wrong schema + }) + ctx = Mock(spec=TaskContext) + + with self.assertRaises(Exception): + datasink.write([invalid_table], ctx) + + # Test that table_write is closed on error + with patch.object(self.table, 'new_batch_write_builder') as mock_builder: + mock_write_builder = Mock() + mock_write_builder.overwrite.return_value = mock_write_builder + mock_write = Mock() + mock_write.write_arrow.side_effect = Exception("Write error") + mock_write_builder.new_write.return_value = mock_write + mock_builder.return_value = mock_write_builder + + data_table = pa.table({ + 'id': [1], + 'name': ['Alice'], + 'value': [1.1] + }) + + with self.assertRaises(Exception): + datasink.write([data_table], ctx) + + mock_write.close.assert_called_once() + + # Test on_write_failed + datasink = PaimonDatasink(self.table, overwrite=False) + error = Exception("Test error") + datasink.on_write_failed(error) # Should not raise exception + + +if __name__ == '__main__': + unittest.main() + diff --git a/paimon-python/pypaimon/write/ray_datasink.py b/paimon-python/pypaimon/write/ray_datasink.py index 709a010fa42d..cb7c2fd17a22 100644 --- a/paimon-python/pypaimon/write/ray_datasink.py +++ b/paimon-python/pypaimon/write/ray_datasink.py @@ -18,74 +18,136 @@ Module to reawrited a Paimon table from a Ray Dataset, by using the Ray Datasink API. 
""" -from typing import Iterable -from ray.data.datasource.datasink import Datasink, WriteResult, WriteReturnType -from pypaimon.table.table import Table -from pypaimon.write.write_builder import WriteBuilder -from ray.data.block import BlockAccessor -from ray.data.block import Block +import logging +from typing import TYPE_CHECKING, Iterable, List, Optional + +from ray.data.datasource.datasink import Datasink, WriteResult +from ray.util.annotations import DeveloperAPI +from ray.data.block import BlockAccessor, Block from ray.data._internal.execution.interfaces import TaskContext import pyarrow as pa +if TYPE_CHECKING: + from pypaimon.table.table import Table + from pypaimon.write.write_builder import WriteBuilder + from pypaimon.write.commit_message import CommitMessage -class PaimonDatasink(Datasink): - - def __init__(self, table: Table, overwrite=False): +logger = logging.getLogger(__name__) + + +@DeveloperAPI +class PaimonDatasink(Datasink[List["CommitMessage"]]): + def __init__( + self, + table: "Table", + overwrite: bool = False, + ): self.table = table self.overwrite = overwrite + self._writer_builder: Optional["WriteBuilder"] = None - def on_write_start(self, schema=None) -> None: - """Callback for when a write job starts. + def __getstate__(self) -> dict: + state = self.__dict__.copy() + return state - Use this method to perform setup for write tasks. For example, creating a - staging bucket in S3. + def __setstate__(self, state: dict) -> None: + self.__dict__.update(state) + if self._writer_builder is not None and not hasattr(self._writer_builder, 'table'): + self._writer_builder = None - Args: - schema: Optional schema information passed by Ray Data. - """ - self.writer_builder: WriteBuilder = self.table.new_batch_write_builder() + def on_write_start(self, schema=None) -> None: + table_name = self._get_table_name() + logger.info(f"Starting write job for table {table_name}") + + self._writer_builder = self.table.new_batch_write_builder() if self.overwrite: - self.writer_builder = self.writer_builder.overwrite() + self._writer_builder = self._writer_builder.overwrite() def write( self, blocks: Iterable[Block], ctx: TaskContext, - ) -> WriteReturnType: - """Write blocks. This is used by a single write task. - - Args: - blocks: Generator of data blocks. - ctx: ``TaskContext`` for the write task. - - Returns: - Result of this write task. When the entire write operator finishes, - All returned values will be passed as `WriteResult.write_returns` - to `Datasink.on_write_complete`. - """ - table_write = self.writer_builder.new_write() - for block in blocks: - block_arrow: pa.Table = BlockAccessor.for_block(block).to_arrow() - table_write.write_arrow(block_arrow) - commit_messages = table_write.prepare_commit() - table_write.close() - return commit_messages - - def on_write_complete(self, write_result: WriteResult[WriteReturnType]): - """Callback for when a write job completes. - - This can be used to `commit` a write output. This method must - succeed prior to ``write_datasink()`` returning to the user. If this - method fails, then ``on_write_failed()`` is called. - - Args: - write_result: Aggregated result of the - Write operator, containing write results and stats. 
- """ - table_commit = self.writer_builder.new_commit() - table_commit.commit([ - commit_message - for commit_messages in write_result.write_returns - for commit_message in commit_messages - ]) - table_commit.close() + ) -> List["CommitMessage"]: + commit_messages_list: List["CommitMessage"] = [] + table_write = None + + try: + writer_builder = self.table.new_batch_write_builder() + if self.overwrite: + writer_builder = writer_builder.overwrite() + + table_write = writer_builder.new_write() + + for block in blocks: + block_arrow: pa.Table = BlockAccessor.for_block(block).to_arrow() + + if block_arrow.num_rows == 0: + continue + + table_write.write_arrow(block_arrow) + + commit_messages = table_write.prepare_commit() + commit_messages_list.extend(commit_messages) + + finally: + if table_write is not None: + try: + table_write.close() + except Exception as e: + logger.warning( + f"Error closing table_write: {e}", + exc_info=e + ) + + return commit_messages_list + + def on_write_complete( + self, write_result: WriteResult[List["CommitMessage"]] + ): + table_commit = None + try: + all_commit_messages = [ + commit_message + for commit_messages in write_result.write_returns + for commit_message in commit_messages + ] + + non_empty_messages = [ + msg for msg in all_commit_messages if not msg.is_empty() + ] + + if not non_empty_messages: + logger.info("No data to commit (all commit messages are empty)") + return + + table_name = self._get_table_name() + logger.info( + f"Committing {len(non_empty_messages)} commit messages " + f"for table {table_name}" + ) + + table_commit = self._writer_builder.new_commit() + table_commit.commit(non_empty_messages) + + logger.info(f"Successfully committed write job for table {table_name}") + finally: + if table_commit is not None: + try: + table_commit.close() + except Exception as e: + logger.warning( + f"Error closing table_commit: {e}", + exc_info=e + ) + + def on_write_failed(self, error: Exception) -> None: + table_name = self._get_table_name() + logger.warning( + f"Write job failed for table {table_name}. 
Error: {error}", + exc_info=error + ) + + def _get_table_name(self) -> str: + if hasattr(self.table, 'identifier'): + return self.table.identifier.get_full_name() + return 'unknown' From fbfa0fcf79d04110cf41d83fc61b7f957fb42155 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 13 Jan 2026 15:22:48 +0800 Subject: [PATCH 02/10] invode abort when commit failed --- paimon-python/pypaimon/tests/ray_sink_test.py | 155 +++++++++++------- paimon-python/pypaimon/write/ray_datasink.py | 54 +++++- 2 files changed, 152 insertions(+), 57 deletions(-) diff --git a/paimon-python/pypaimon/tests/ray_sink_test.py b/paimon-python/pypaimon/tests/ray_sink_test.py index cee834f425dd..d3ef2c95d05b 100644 --- a/paimon-python/pypaimon/tests/ray_sink_test.py +++ b/paimon-python/pypaimon/tests/ray_sink_test.py @@ -98,8 +98,8 @@ def test_on_write_start(self): datasink_overwrite.on_write_start() self.assertIsNotNone(datasink_overwrite._writer_builder.static_partition) - def test_write_blocks(self): - """Test write method with empty, single, and multiple blocks.""" + def test_write(self): + """Test write method: empty blocks, multiple blocks, error handling, and resource cleanup.""" datasink = PaimonDatasink(self.table, overwrite=False) datasink.on_write_start() ctx = Mock(spec=TaskContext) @@ -113,7 +113,7 @@ def test_write_blocks(self): result = datasink.write([empty_table], ctx) self.assertEqual(result, []) - # Test single block + # Test single and multiple blocks single_block = pa.table({ 'id': [1, 2, 3], 'name': ['Alice', 'Bob', 'Charlie'], @@ -124,7 +124,6 @@ def test_write_blocks(self): if result: self.assertTrue(all(isinstance(msg, CommitMessage) for msg in result)) - # Test multiple blocks block1 = pa.table({ 'id': [4, 5], 'name': ['David', 'Eve'], @@ -140,11 +139,7 @@ def test_write_blocks(self): if result: self.assertTrue(all(isinstance(msg, CommitMessage) for msg in result)) - def test_write_creates_writer_builder_on_worker(self): - """Test that write method creates WriteBuilder on worker (not using driver's builder).""" - datasink = PaimonDatasink(self.table, overwrite=False) - datasink.on_write_start() - + # Test that write creates WriteBuilder on worker (not using driver's builder) with patch.object(self.table, 'new_batch_write_builder') as mock_builder: mock_write_builder = Mock() mock_write_builder.overwrite.return_value = mock_write_builder @@ -158,14 +153,33 @@ def test_write_creates_writer_builder_on_worker(self): 'name': ['Alice'], 'value': [1.1] }) - ctx = Mock(spec=TaskContext) - datasink.write([data_table], ctx) - mock_builder.assert_called_once() + invalid_table = pa.table({ + 'wrong_column': [1, 2, 3] + }) + with self.assertRaises(Exception): + datasink.write([invalid_table], ctx) + + with patch.object(self.table, 'new_batch_write_builder') as mock_builder: + mock_write_builder = Mock() + mock_write_builder.overwrite.return_value = mock_write_builder + mock_write = Mock() + mock_write.write_arrow.side_effect = Exception("Write error") + mock_write_builder.new_write.return_value = mock_write + mock_builder.return_value = mock_write_builder + + data_table = pa.table({ + 'id': [1], + 'name': ['Alice'], + 'value': [1.1] + }) + with self.assertRaises(Exception): + datasink.write([data_table], ctx) + mock_write.close.assert_called_once() + def test_on_write_complete(self): - """Test on_write_complete with empty messages, normal messages, and filtering.""" from ray.data.datasource.datasink import WriteResult # Test empty messages @@ -174,22 +188,24 @@ def test_on_write_complete(self): write_result 
= WriteResult( num_rows=0, size_bytes=0, - write_returns=[[], []] # Empty commit messages from workers + write_returns=[[], []] ) datasink.on_write_complete(write_result) - # Test with messages + # Test with messages and filtering empty messages datasink = PaimonDatasink(self.table, overwrite=False) datasink.on_write_start() commit_msg1 = Mock(spec=CommitMessage) commit_msg1.is_empty.return_value = False commit_msg2 = Mock(spec=CommitMessage) commit_msg2.is_empty.return_value = False + empty_msg = Mock(spec=CommitMessage) + empty_msg.is_empty.return_value = True write_result = WriteResult( num_rows=0, size_bytes=0, - write_returns=[[commit_msg1], [commit_msg2]] + write_returns=[[commit_msg1], [commit_msg2], [empty_msg]] ) mock_commit = Mock() @@ -198,69 +214,96 @@ def test_on_write_complete(self): mock_commit.commit.assert_called_once() commit_args = mock_commit.commit.call_args[0][0] - self.assertEqual(len(commit_args), 2) + self.assertEqual(len(commit_args), 2) # Empty message filtered out + mock_commit.close.assert_called_once() - # Test filtering empty messages + # Test commit failure: abort should be called datasink = PaimonDatasink(self.table, overwrite=False) datasink.on_write_start() - empty_msg = Mock(spec=CommitMessage) - empty_msg.is_empty.return_value = True - non_empty_msg = Mock(spec=CommitMessage) - non_empty_msg.is_empty.return_value = False + commit_msg1 = Mock(spec=CommitMessage) + commit_msg1.is_empty.return_value = False + commit_msg2 = Mock(spec=CommitMessage) + commit_msg2.is_empty.return_value = False write_result = WriteResult( num_rows=0, size_bytes=0, - write_returns=[[empty_msg], [non_empty_msg]] + write_returns=[[commit_msg1], [commit_msg2]] ) mock_commit = Mock() + mock_commit.commit.side_effect = Exception("Commit failed") datasink._writer_builder.new_commit = Mock(return_value=mock_commit) - datasink.on_write_complete(write_result) - mock_commit.commit.assert_called_once() - commit_args = mock_commit.commit.call_args[0][0] - self.assertEqual(len(commit_args), 1) - self.assertEqual(commit_args[0], non_empty_msg) + with self.assertRaises(Exception): + datasink.on_write_complete(write_result) + + mock_commit.abort.assert_called_once() + abort_args = mock_commit.abort.call_args[0][0] + self.assertEqual(len(abort_args), 2) + mock_commit.close.assert_called_once() - def test_error_handling(self): - """Test error handling in write method and on_write_failed.""" + # Test table_commit creation failure datasink = PaimonDatasink(self.table, overwrite=False) datasink.on_write_start() + commit_msg1 = Mock(spec=CommitMessage) + commit_msg1.is_empty.return_value = False - # Test write error handling - invalid_table = pa.table({ - 'wrong_column': [1, 2, 3] # Wrong schema - }) - ctx = Mock(spec=TaskContext) + write_result = WriteResult( + num_rows=0, + size_bytes=0, + write_returns=[[commit_msg1]] + ) + mock_new_commit = Mock(side_effect=Exception("Failed to create table_commit")) + datasink._writer_builder.new_commit = mock_new_commit with self.assertRaises(Exception): - datasink.write([invalid_table], ctx) + datasink.on_write_complete(write_result) + self.assertEqual(len(datasink._pending_commit_messages), 1) - # Test that table_write is closed on error - with patch.object(self.table, 'new_batch_write_builder') as mock_builder: - mock_write_builder = Mock() - mock_write_builder.overwrite.return_value = mock_write_builder - mock_write = Mock() - mock_write.write_arrow.side_effect = Exception("Write error") - mock_write_builder.new_write.return_value = mock_write - 
mock_builder.return_value = mock_write_builder + def test_on_write_failed(self): + # Test without pending messages (on_write_complete() never called) + datasink = PaimonDatasink(self.table, overwrite=False) + datasink.on_write_start() + self.assertEqual(datasink._pending_commit_messages, []) + error = Exception("Write job failed") + datasink.on_write_failed(error) # Should not raise exception - data_table = pa.table({ - 'id': [1], - 'name': ['Alice'], - 'value': [1.1] - }) + # Test with pending messages (on_write_complete() was called but failed) + datasink = PaimonDatasink(self.table, overwrite=False) + datasink.on_write_start() + commit_msg1 = Mock(spec=CommitMessage) + commit_msg2 = Mock(spec=CommitMessage) + datasink._pending_commit_messages = [commit_msg1, commit_msg2] - with self.assertRaises(Exception): - datasink.write([data_table], ctx) + mock_commit = Mock() + datasink._writer_builder.new_commit = Mock(return_value=mock_commit) + error = Exception("Write job failed") + datasink.on_write_failed(error) + + mock_commit.abort.assert_called_once() + abort_args = mock_commit.abort.call_args[0][0] + self.assertEqual(len(abort_args), 2) + self.assertEqual(abort_args[0], commit_msg1) + self.assertEqual(abort_args[1], commit_msg2) + mock_commit.close.assert_called_once() + self.assertEqual(datasink._pending_commit_messages, []) + + # Test abort failure handling (should not raise exception) + datasink = PaimonDatasink(self.table, overwrite=False) + datasink.on_write_start() + commit_msg1 = Mock(spec=CommitMessage) + datasink._pending_commit_messages = [commit_msg1] - mock_write.close.assert_called_once() + mock_commit = Mock() + mock_commit.abort.side_effect = Exception("Abort failed") + datasink._writer_builder.new_commit = Mock(return_value=mock_commit) + error = Exception("Write job failed") + datasink.on_write_failed(error) - # Test on_write_failed - datasink = PaimonDatasink(self.table, overwrite=False) - error = Exception("Test error") - datasink.on_write_failed(error) # Should not raise exception + mock_commit.abort.assert_called_once() + mock_commit.close.assert_called_once() + self.assertEqual(datasink._pending_commit_messages, []) if __name__ == '__main__': diff --git a/paimon-python/pypaimon/write/ray_datasink.py b/paimon-python/pypaimon/write/ray_datasink.py index cb7c2fd17a22..eda8f3743e15 100644 --- a/paimon-python/pypaimon/write/ray_datasink.py +++ b/paimon-python/pypaimon/write/ray_datasink.py @@ -45,6 +45,7 @@ def __init__( self.table = table self.overwrite = overwrite self._writer_builder: Optional["WriteBuilder"] = None + self._pending_commit_messages: List["CommitMessage"] = [] def __getstate__(self) -> dict: state = self.__dict__.copy() @@ -89,6 +90,12 @@ def write( commit_messages = table_write.prepare_commit() commit_messages_list.extend(commit_messages) + except Exception as e: + logger.error( + f"Error writing data to table {self._get_table_name()}: {e}", + exc_info=e + ) + raise finally: if table_write is not None: try: @@ -105,6 +112,7 @@ def on_write_complete( self, write_result: WriteResult[List["CommitMessage"]] ): table_commit = None + commit_messages_to_abort = [] try: all_commit_messages = [ commit_message @@ -116,8 +124,11 @@ def on_write_complete( msg for msg in all_commit_messages if not msg.is_empty() ] + self._pending_commit_messages = non_empty_messages + if not non_empty_messages: logger.info("No data to commit (all commit messages are empty)") + self._pending_commit_messages = [] # Clear after successful check return table_name = 
self._get_table_name() @@ -127,9 +138,31 @@ def on_write_complete( ) table_commit = self._writer_builder.new_commit() + commit_messages_to_abort = non_empty_messages table_commit.commit(non_empty_messages) logger.info(f"Successfully committed write job for table {table_name}") + commit_messages_to_abort = [] # Clear after successful commit + self._pending_commit_messages = [] # Clear after successful commit + except Exception as e: + table_name = self._get_table_name() + logger.error( + f"Error committing write job for table {table_name}: {e}", + exc_info=e + ) + if table_commit is not None and commit_messages_to_abort: + try: + table_commit.abort(commit_messages_to_abort) + logger.info( + f"Aborted {len(commit_messages_to_abort)} commit messages " + f"for table {table_name}" + ) + except Exception as abort_error: + logger.error( + f"Error aborting commit messages: {abort_error}", + exc_info=abort_error + ) + raise finally: if table_commit is not None: try: @@ -142,10 +175,29 @@ def on_write_complete( def on_write_failed(self, error: Exception) -> None: table_name = self._get_table_name() - logger.warning( + logger.error( f"Write job failed for table {table_name}. Error: {error}", exc_info=error ) + + if self._pending_commit_messages: + try: + table_commit = self._writer_builder.new_commit() + try: + table_commit.abort(self._pending_commit_messages) + logger.info( + f"Aborted {len(self._pending_commit_messages)} commit messages " + f"for table {table_name} in on_write_failed()" + ) + finally: + table_commit.close() + except Exception as abort_error: + logger.error( + f"Error aborting commit messages in on_write_failed(): {abort_error}", + exc_info=abort_error + ) + finally: + self._pending_commit_messages = [] # Clear after abort attempt def _get_table_name(self) -> str: if hasattr(self.table, 'identifier'): From c2b8ee038fc25ff53859dba899446a80e7587070 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 13 Jan 2026 15:27:30 +0800 Subject: [PATCH 03/10] rename write_raydata to write_ray to be consistent withe write_pandas and write_arrow --- paimon-python/pypaimon/tests/ray_data_test.py | 2 +- paimon-python/pypaimon/write/table_write.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/tests/ray_data_test.py b/paimon-python/pypaimon/tests/ray_data_test.py index e931a4c7dcc1..01a85ab5cbc6 100644 --- a/paimon-python/pypaimon/tests/ray_data_test.py +++ b/paimon-python/pypaimon/tests/ray_data_test.py @@ -155,7 +155,7 @@ def test_basic_ray_data_write(self): ds = from_arrow(test_data) write_builder = table.new_batch_write_builder() writer = write_builder.new_write() - writer.write_raydata(ds, parallelism=2) + writer.write_ray(ds, parallelism=2) # Read using Ray Data read_builder = table.new_read_builder() table_read = read_builder.new_read() diff --git a/paimon-python/pypaimon/write/table_write.py b/paimon-python/pypaimon/write/table_write.py index 9a4bdecf26f6..d60176752fde 100644 --- a/paimon-python/pypaimon/write/table_write.py +++ b/paimon-python/pypaimon/write/table_write.py @@ -68,7 +68,7 @@ def with_write_type(self, write_cols: List[str]): self.file_store_write.write_cols = write_cols return self - def write_raydata(self, dataset, overwrite=False, parallelism=1): + def write_ray(self, dataset, overwrite=False, parallelism=1): from pypaimon.write.ray_datasink import PaimonDatasink datasink = PaimonDatasink(self.table, overwrite=overwrite) dataset.write_datasink(datasink, concurrency=parallelism) From d8effc2d762d23ceb75135487fdbbedbcb2ce5ce Mon 
Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 13 Jan 2026 17:59:39 +0800 Subject: [PATCH 04/10] rename write_raydata to write_ray to be consistent withe write_pandas and write_arrow --- docs/content/program-api/python-api.md | 12 +- .../sample/rest_catalog_ray_sink_sample.py | 261 ++++++++++++++++++ paimon-python/pypaimon/tests/ray_data_test.py | 2 +- paimon-python/pypaimon/tests/ray_sink_test.py | 33 +++ paimon-python/pypaimon/write/table_write.py | 31 ++- 5 files changed, 334 insertions(+), 5 deletions(-) create mode 100644 paimon-python/pypaimon/sample/rest_catalog_ray_sink_sample.py diff --git a/docs/content/program-api/python-api.md b/docs/content/program-api/python-api.md index 406c8c1ef69f..d07224c2c3cc 100644 --- a/docs/content/program-api/python-api.md +++ b/docs/content/program-api/python-api.md @@ -180,7 +180,7 @@ write_builder = table.new_batch_write_builder() table_write = write_builder.new_write() table_commit = write_builder.new_commit() -# 2. Write data. Support 3 methods: +# 2. Write data. Support 4 methods: # 2.1 Write pandas.DataFrame dataframe = ... table_write.write_pandas(dataframe) @@ -193,6 +193,16 @@ table_write.write_arrow(pa_table) record_batch = ... table_write.write_arrow_batch(record_batch) +# 2.4 Write Ray Dataset (requires ray to be installed) +import ray +ray_dataset = ray.data.read_json("/path/to/data.jsonl") +table_write.write_ray(ray_dataset, overwrite=False, concurrency=2) +# Parameters: +# - dataset: Ray Dataset to write +# - overwrite: Whether to overwrite existing data (default: False) +# - concurrency: Optional max number of concurrent Ray tasks +# - ray_remote_args: Optional kwargs passed to ray.remote() (e.g., {"num_cpus": 2}) + # 3. Commit data commit_messages = table_write.prepare_commit() table_commit.commit(commit_messages) diff --git a/paimon-python/pypaimon/sample/rest_catalog_ray_sink_sample.py b/paimon-python/pypaimon/sample/rest_catalog_ray_sink_sample.py new file mode 100644 index 000000000000..555af19cd11b --- /dev/null +++ b/paimon-python/pypaimon/sample/rest_catalog_ray_sink_sample.py @@ -0,0 +1,261 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +""" +Example: REST Catalog + Ray Sink + OSS JSON Data Source + +Demonstrates: +1. Reading JSON data from OSS using Ray Data +2. Writing data to Paimon table using Ray Sink (write_ray) +3. 
REST catalog integration for production scenarios +""" + +import json +import tempfile +import uuid + +import pandas as pd +import pyarrow as pa +import pyarrow.fs as pafs +import ray + +from pypaimon import CatalogFactory, Schema +from pypaimon.common.options.core_options import CoreOptions +from pypaimon.tests.rest.rest_server import RESTCatalogServer +from pypaimon.api.api_response import ConfigResponse +from pypaimon.api.auth import BearTokenAuthProvider + + +def create_sample_json_file(local_path: str, num_records: int = 100): + data = [] + for i in range(1, num_records + 1): + data.append({ + 'id': i, + 'name': f'Item_{i}', + 'category': ['A', 'B', 'C'][i % 3], + 'value': 10.5 + i * 2.3, + 'score': 50 + i * 5, + 'timestamp': f'2024-01-{i % 28 + 1:02d}T10:00:00Z' + }) + + with open(local_path, 'w') as f: + for record in data: + f.write(json.dumps(record) + '\n') # JSONL format + + print(f"Created sample JSON file with {num_records} records: {local_path}") + + +def create_oss_filesystem_for_ray( + endpoint: str, + access_key_id: str, + access_key_secret: str, + bucket: str = None, + region: str = None, + session_token: str = None +) -> pafs.S3FileSystem: + import pyarrow as pyarrow_module + from packaging import version + + client_kwargs = { + "access_key": access_key_id, + "secret_key": access_key_secret, + } + + if session_token: + client_kwargs["session_token"] = session_token + + if region: + client_kwargs["region"] = region + + if version.parse(pyarrow_module.__version__) >= version.parse("7.0.0"): + client_kwargs['force_virtual_addressing'] = True + client_kwargs['endpoint_override'] = endpoint + else: + if bucket: + client_kwargs['endpoint_override'] = f"{bucket}.{endpoint}" + else: + client_kwargs['endpoint_override'] = endpoint + + return pafs.S3FileSystem(**client_kwargs) + + +def main(): + ray.init(ignore_reinit_error=True, num_cpus=2) + print("Ray initialized successfully") + + temp_dir = tempfile.mkdtemp() + token = str(uuid.uuid4()) + server = RESTCatalogServer( + data_path=temp_dir, + auth_provider=BearTokenAuthProvider(token), + config=ConfigResponse(defaults={"prefix": "mock-test"}), + warehouse="warehouse" + ) + server.start() + print(f"REST server started at: {server.get_url()}") + + temp_json_dir = tempfile.mkdtemp() + json_file_path = f"{temp_json_dir}/data.jsonl" + create_sample_json_file(json_file_path, num_records=100) + + try: + catalog = CatalogFactory.create({ + 'metastore': 'rest', + 'uri': f"http://localhost:{server.port}", + 'warehouse': "warehouse", + 'token.provider': 'bear', + 'token': token, + }) + catalog.create_database("default", True) + + from pypaimon.common.options.core_options import CoreOptions + schema = Schema.from_pyarrow_schema(pa.schema([ + ('id', pa.int64()), + ('name', pa.string()), + ('category', pa.string()), + ('value', pa.float64()), + ('score', pa.int64()), + ('timestamp', pa.timestamp('s')), + ]), primary_keys=['id'], options={ + CoreOptions.BUCKET.key(): '4' + }) + + table_name = 'default.oss_json_import_table' + catalog.create_table(table_name, schema, True) + table = catalog.get_table(table_name) + + print(f"\nTable created: {table_name}") + print(f"Table path: {table.table_path}") + + print("\n" + "="*60) + print("Step 1: Reading JSON data from OSS using Ray Data") + print("="*60) + + # If using actual OSS, create filesystem and read + # Uncomment and configure with your OSS credentials: + # oss_fs = create_oss_filesystem_for_ray( + # endpoint=OSS_ENDPOINT, + # access_key_id=OSS_ACCESS_KEY_ID, + # 
access_key_secret=OSS_ACCESS_KEY_SECRET, + # bucket=OSS_BUCKET, + # region="cn-hangzhou", # Optional: OSS region + # ) + # ray_dataset = ray.data.read_json( + # OSS_JSON_PATH, + # filesystem=oss_fs, + # concurrency=2, + # ) + + print(f"Reading JSON from: {json_file_path}") + ray_dataset = ray.data.read_json( + json_file_path, + concurrency=2, + ) + + print(f"✓ Ray Dataset created successfully") + print(f" - Total rows: {ray_dataset.count()}") + print(f" - Schema: {ray_dataset.schema()}") + + sample_data = ray_dataset.take(3) + print("\nSample data (first 3 rows):") + for i, row in enumerate(sample_data, 1): + print(f" Row {i}: {row}") + + # Step 2: Write to Paimon table using Ray Sink + print("\n" + "="*60) + print("Step 2: Writing data to Paimon table using Ray Sink") + print("="*60) + + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + + print("Writing Ray Dataset to Paimon table...") + table_write.write_ray( + ray_dataset, + overwrite=False, + concurrency=2, + ray_remote_args={"num_cpus": 1} + ) + + # Commit the write + table_commit = write_builder.new_commit() + commit_messages = table_write.prepare_commit() + table_commit.commit(commit_messages) + table_write.close() + table_commit.close() + + print(f"✓ Successfully wrote {ray_dataset.count()} rows to table") + + # Step 3: Verify data by reading back + print("\n" + "="*60) + print("Step 3: Verifying data by reading back from table") + print("="*60) + + read_builder = table.new_read_builder() + table_read = read_builder.new_read() + table_scan = read_builder.new_scan() + splits = table_scan.plan().splits() + + # Read to Pandas for verification + result_df = table_read.to_pandas(splits) + print(f"✓ Read back {len(result_df)} rows from table") + print("\nFirst 5 rows from table:") + print(result_df.head().to_string()) + + ray_df = ray_dataset.to_pandas() + ray_df_sorted = ray_df.sort_values(by='id').reset_index(drop=True) + result_df_sorted = result_df.sort_values(by='id').reset_index(drop=True) + + pd.testing.assert_frame_equal( + ray_df_sorted[['id', 'name', 'category', 'value', 'score']], + result_df_sorted[['id', 'name', 'category', 'value', 'score']], + check_dtype=False # Allow type differences (e.g., int64 vs int32) + ) + print("✓ Data verification passed: written data matches source data") + + # Step 4: Demonstrate additional Ray Data operations before writing + print("\n" + "="*60) + print("Step 4: Demonstrating data transformation with Ray Data") + print("="*60) + + def add_computed_field(row): + """Add a computed field based on existing data.""" + row['value_score_ratio'] = row['value'] / row['score'] if row['score'] > 0 else 0.0 + return row + + transformed_dataset = ray_dataset.map(add_computed_field) + print(f"✓ Transformed dataset: {transformed_dataset.count()} rows") + + # Filter data + filtered_dataset = ray_dataset.filter(lambda row: row['value'] > 50.0) + print(f"✓ Filtered dataset (value > 50): {filtered_dataset.count()} rows") + + print("\n" + "="*60) + print("Summary") + print("="*60) + print("✓ Successfully demonstrated Ray Sink with OSS JSON data source") + + finally: + server.shutdown() + if ray.is_initialized(): + ray.shutdown() + print("\n✓ Server stopped and Ray shutdown") + + +if __name__ == '__main__': + main() + diff --git a/paimon-python/pypaimon/tests/ray_data_test.py b/paimon-python/pypaimon/tests/ray_data_test.py index 01a85ab5cbc6..00995d697673 100644 --- a/paimon-python/pypaimon/tests/ray_data_test.py +++ b/paimon-python/pypaimon/tests/ray_data_test.py @@ 
-155,7 +155,7 @@ def test_basic_ray_data_write(self): ds = from_arrow(test_data) write_builder = table.new_batch_write_builder() writer = write_builder.new_write() - writer.write_ray(ds, parallelism=2) + writer.write_ray(ds, concurrency=2) # Read using Ray Data read_builder = table.new_read_builder() table_read = read_builder.new_read() diff --git a/paimon-python/pypaimon/tests/ray_sink_test.py b/paimon-python/pypaimon/tests/ray_sink_test.py index d3ef2c95d05b..620fbf4b2961 100644 --- a/paimon-python/pypaimon/tests/ray_sink_test.py +++ b/paimon-python/pypaimon/tests/ray_sink_test.py @@ -87,6 +87,39 @@ def test_init_and_serialization(self): self.assertEqual(new_datasink.table, self.table) self.assertFalse(new_datasink.overwrite) + def test_table_and_writer_builder_serializable(self): + import pickle + try: + pickled_table = pickle.dumps(self.table) + unpickled_table = pickle.loads(pickled_table) + self.assertIsNotNone(unpickled_table) + builder = unpickled_table.new_batch_write_builder() + self.assertIsNotNone(builder) + except Exception as e: + self.fail(f"Table object is not serializable: {e}") + + writer_builder = self.table.new_batch_write_builder() + try: + pickled_builder = pickle.dumps(writer_builder) + unpickled_builder = pickle.loads(pickled_builder) + self.assertIsNotNone(unpickled_builder) + table_write = unpickled_builder.new_write() + self.assertIsNotNone(table_write) + table_write.close() + except Exception as e: + self.fail(f"WriterBuilder is not serializable: {e}") + + overwrite_builder = self.table.new_batch_write_builder().overwrite() + try: + pickled_overwrite = pickle.dumps(overwrite_builder) + unpickled_overwrite = pickle.loads(pickled_overwrite) + self.assertIsNotNone(unpickled_overwrite) + # static_partition is a dict, empty dict {} means overwrite all partitions + self.assertIsNotNone(unpickled_overwrite.static_partition) + self.assertIsInstance(unpickled_overwrite.static_partition, dict) + except Exception as e: + self.fail(f"Overwrite WriterBuilder is not serializable: {e}") + def test_on_write_start(self): """Test on_write_start with normal and overwrite modes.""" datasink = PaimonDatasink(self.table, overwrite=False) diff --git a/paimon-python/pypaimon/write/table_write.py b/paimon-python/pypaimon/write/table_write.py index d60176752fde..a2c9a9a4c364 100644 --- a/paimon-python/pypaimon/write/table_write.py +++ b/paimon-python/pypaimon/write/table_write.py @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ from collections import defaultdict -from typing import List +from typing import TYPE_CHECKING, Any, Dict, List, Optional import pyarrow as pa @@ -25,6 +25,9 @@ from pypaimon.write.commit_message import CommitMessage from pypaimon.write.file_store_write import FileStoreWrite +if TYPE_CHECKING: + from ray.data import Dataset + class TableWrite: def __init__(self, table, commit_user): @@ -68,10 +71,32 @@ def with_write_type(self, write_cols: List[str]): self.file_store_write.write_cols = write_cols return self - def write_ray(self, dataset, overwrite=False, parallelism=1): + def write_ray( + self, + dataset: "Dataset", + overwrite: bool = False, + concurrency: Optional[int] = None, + ray_remote_args: Optional[Dict[str, Any]] = None, + ) -> None: + """ + Write a Ray Dataset to Paimon table. + + Args: + dataset: Ray Dataset to write. This is a distributed data collection + from Ray Data (ray.data.Dataset). + overwrite: Whether to overwrite existing data. Defaults to False. 
+ concurrency: Optional max number of Ray tasks to run concurrently. + By default, dynamically decided based on available resources. + ray_remote_args: Optional kwargs passed to :func:`ray.remote` in write tasks. + For example, ``{"num_cpus": 2, "max_retries": 3}``. + """ from pypaimon.write.ray_datasink import PaimonDatasink datasink = PaimonDatasink(self.table, overwrite=overwrite) - dataset.write_datasink(datasink, concurrency=parallelism) + dataset.write_datasink( + datasink, + concurrency=concurrency, + ray_remote_args=ray_remote_args, + ) def close(self): self.file_store_write.close() From 316834ea6b4c619a34a0068ffca5d2881b648aaa Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 13 Jan 2026 20:11:20 +0800 Subject: [PATCH 05/10] clean code --- .../pypaimon/sample/data/__init__.py | 0 paimon-python/pypaimon/sample/data/data.jsonl | 100 ++++++++++ .../sample/rest_catalog_ray_sink_sample.py | 186 ++++-------------- paimon-python/pypaimon/tests/ray_sink_test.py | 2 +- paimon-python/pypaimon/write/ray_datasink.py | 49 ++--- 5 files changed, 153 insertions(+), 184 deletions(-) create mode 100644 paimon-python/pypaimon/sample/data/__init__.py create mode 100644 paimon-python/pypaimon/sample/data/data.jsonl diff --git a/paimon-python/pypaimon/sample/data/__init__.py b/paimon-python/pypaimon/sample/data/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/paimon-python/pypaimon/sample/data/data.jsonl b/paimon-python/pypaimon/sample/data/data.jsonl new file mode 100644 index 000000000000..f33de19c57be --- /dev/null +++ b/paimon-python/pypaimon/sample/data/data.jsonl @@ -0,0 +1,100 @@ +{"id": 1, "name": "Item_1", "category": "B", "value": 12.8, "score": 55, "timestamp": "2024-01-02T10:00:00Z"} +{"id": 2, "name": "Item_2", "category": "C", "value": 15.1, "score": 60, "timestamp": "2024-01-03T10:00:00Z"} +{"id": 3, "name": "Item_3", "category": "A", "value": 17.4, "score": 65, "timestamp": "2024-01-04T10:00:00Z"} +{"id": 4, "name": "Item_4", "category": "B", "value": 19.7, "score": 70, "timestamp": "2024-01-05T10:00:00Z"} +{"id": 5, "name": "Item_5", "category": "C", "value": 22.0, "score": 75, "timestamp": "2024-01-06T10:00:00Z"} +{"id": 6, "name": "Item_6", "category": "A", "value": 24.299999999999997, "score": 80, "timestamp": "2024-01-07T10:00:00Z"} +{"id": 7, "name": "Item_7", "category": "B", "value": 26.599999999999998, "score": 85, "timestamp": "2024-01-08T10:00:00Z"} +{"id": 8, "name": "Item_8", "category": "C", "value": 28.9, "score": 90, "timestamp": "2024-01-09T10:00:00Z"} +{"id": 9, "name": "Item_9", "category": "A", "value": 31.2, "score": 95, "timestamp": "2024-01-10T10:00:00Z"} +{"id": 10, "name": "Item_10", "category": "B", "value": 33.5, "score": 100, "timestamp": "2024-01-11T10:00:00Z"} +{"id": 11, "name": "Item_11", "category": "C", "value": 35.8, "score": 105, "timestamp": "2024-01-12T10:00:00Z"} +{"id": 12, "name": "Item_12", "category": "A", "value": 38.099999999999994, "score": 110, "timestamp": "2024-01-13T10:00:00Z"} +{"id": 13, "name": "Item_13", "category": "B", "value": 40.4, "score": 115, "timestamp": "2024-01-14T10:00:00Z"} +{"id": 14, "name": "Item_14", "category": "C", "value": 42.699999999999996, "score": 120, "timestamp": "2024-01-15T10:00:00Z"} +{"id": 15, "name": "Item_15", "category": "A", "value": 45.0, "score": 125, "timestamp": "2024-01-16T10:00:00Z"} +{"id": 16, "name": "Item_16", "category": "B", "value": 47.3, "score": 130, "timestamp": "2024-01-17T10:00:00Z"} +{"id": 17, "name": "Item_17", "category": "C", "value": 
49.599999999999994, "score": 135, "timestamp": "2024-01-18T10:00:00Z"} +{"id": 18, "name": "Item_18", "category": "A", "value": 51.9, "score": 140, "timestamp": "2024-01-19T10:00:00Z"} +{"id": 19, "name": "Item_19", "category": "B", "value": 54.199999999999996, "score": 145, "timestamp": "2024-01-20T10:00:00Z"} +{"id": 20, "name": "Item_20", "category": "C", "value": 56.5, "score": 150, "timestamp": "2024-01-21T10:00:00Z"} +{"id": 21, "name": "Item_21", "category": "A", "value": 58.8, "score": 155, "timestamp": "2024-01-22T10:00:00Z"} +{"id": 22, "name": "Item_22", "category": "B", "value": 61.099999999999994, "score": 160, "timestamp": "2024-01-23T10:00:00Z"} +{"id": 23, "name": "Item_23", "category": "C", "value": 63.4, "score": 165, "timestamp": "2024-01-24T10:00:00Z"} +{"id": 24, "name": "Item_24", "category": "A", "value": 65.69999999999999, "score": 170, "timestamp": "2024-01-25T10:00:00Z"} +{"id": 25, "name": "Item_25", "category": "B", "value": 68.0, "score": 175, "timestamp": "2024-01-26T10:00:00Z"} +{"id": 26, "name": "Item_26", "category": "C", "value": 70.3, "score": 180, "timestamp": "2024-01-27T10:00:00Z"} +{"id": 27, "name": "Item_27", "category": "A", "value": 72.6, "score": 185, "timestamp": "2024-01-28T10:00:00Z"} +{"id": 28, "name": "Item_28", "category": "B", "value": 74.89999999999999, "score": 190, "timestamp": "2024-01-01T10:00:00Z"} +{"id": 29, "name": "Item_29", "category": "C", "value": 77.19999999999999, "score": 195, "timestamp": "2024-01-02T10:00:00Z"} +{"id": 30, "name": "Item_30", "category": "A", "value": 79.5, "score": 200, "timestamp": "2024-01-03T10:00:00Z"} +{"id": 31, "name": "Item_31", "category": "B", "value": 81.8, "score": 205, "timestamp": "2024-01-04T10:00:00Z"} +{"id": 32, "name": "Item_32", "category": "C", "value": 84.1, "score": 210, "timestamp": "2024-01-05T10:00:00Z"} +{"id": 33, "name": "Item_33", "category": "A", "value": 86.39999999999999, "score": 215, "timestamp": "2024-01-06T10:00:00Z"} +{"id": 34, "name": "Item_34", "category": "B", "value": 88.69999999999999, "score": 220, "timestamp": "2024-01-07T10:00:00Z"} +{"id": 35, "name": "Item_35", "category": "C", "value": 91.0, "score": 225, "timestamp": "2024-01-08T10:00:00Z"} +{"id": 36, "name": "Item_36", "category": "A", "value": 93.3, "score": 230, "timestamp": "2024-01-09T10:00:00Z"} +{"id": 37, "name": "Item_37", "category": "B", "value": 95.6, "score": 235, "timestamp": "2024-01-10T10:00:00Z"} +{"id": 38, "name": "Item_38", "category": "C", "value": 97.89999999999999, "score": 240, "timestamp": "2024-01-11T10:00:00Z"} +{"id": 39, "name": "Item_39", "category": "A", "value": 100.19999999999999, "score": 245, "timestamp": "2024-01-12T10:00:00Z"} +{"id": 40, "name": "Item_40", "category": "B", "value": 102.5, "score": 250, "timestamp": "2024-01-13T10:00:00Z"} +{"id": 41, "name": "Item_41", "category": "C", "value": 104.8, "score": 255, "timestamp": "2024-01-14T10:00:00Z"} +{"id": 42, "name": "Item_42", "category": "A", "value": 107.1, "score": 260, "timestamp": "2024-01-15T10:00:00Z"} +{"id": 43, "name": "Item_43", "category": "B", "value": 109.39999999999999, "score": 265, "timestamp": "2024-01-16T10:00:00Z"} +{"id": 44, "name": "Item_44", "category": "C", "value": 111.69999999999999, "score": 270, "timestamp": "2024-01-17T10:00:00Z"} +{"id": 45, "name": "Item_45", "category": "A", "value": 113.99999999999999, "score": 275, "timestamp": "2024-01-18T10:00:00Z"} +{"id": 46, "name": "Item_46", "category": "B", "value": 116.3, "score": 280, "timestamp": "2024-01-19T10:00:00Z"} +{"id": 47, 
"name": "Item_47", "category": "C", "value": 118.6, "score": 285, "timestamp": "2024-01-20T10:00:00Z"} +{"id": 48, "name": "Item_48", "category": "A", "value": 120.89999999999999, "score": 290, "timestamp": "2024-01-21T10:00:00Z"} +{"id": 49, "name": "Item_49", "category": "B", "value": 123.19999999999999, "score": 295, "timestamp": "2024-01-22T10:00:00Z"} +{"id": 50, "name": "Item_50", "category": "C", "value": 125.49999999999999, "score": 300, "timestamp": "2024-01-23T10:00:00Z"} +{"id": 51, "name": "Item_51", "category": "A", "value": 127.8, "score": 305, "timestamp": "2024-01-24T10:00:00Z"} +{"id": 52, "name": "Item_52", "category": "B", "value": 130.1, "score": 310, "timestamp": "2024-01-25T10:00:00Z"} +{"id": 53, "name": "Item_53", "category": "C", "value": 132.39999999999998, "score": 315, "timestamp": "2024-01-26T10:00:00Z"} +{"id": 54, "name": "Item_54", "category": "A", "value": 134.7, "score": 320, "timestamp": "2024-01-27T10:00:00Z"} +{"id": 55, "name": "Item_55", "category": "B", "value": 137.0, "score": 325, "timestamp": "2024-01-28T10:00:00Z"} +{"id": 56, "name": "Item_56", "category": "C", "value": 139.29999999999998, "score": 330, "timestamp": "2024-01-01T10:00:00Z"} +{"id": 57, "name": "Item_57", "category": "A", "value": 141.6, "score": 335, "timestamp": "2024-01-02T10:00:00Z"} +{"id": 58, "name": "Item_58", "category": "B", "value": 143.89999999999998, "score": 340, "timestamp": "2024-01-03T10:00:00Z"} +{"id": 59, "name": "Item_59", "category": "C", "value": 146.2, "score": 345, "timestamp": "2024-01-04T10:00:00Z"} +{"id": 60, "name": "Item_60", "category": "A", "value": 148.5, "score": 350, "timestamp": "2024-01-05T10:00:00Z"} +{"id": 61, "name": "Item_61", "category": "B", "value": 150.79999999999998, "score": 355, "timestamp": "2024-01-06T10:00:00Z"} +{"id": 62, "name": "Item_62", "category": "C", "value": 153.1, "score": 360, "timestamp": "2024-01-07T10:00:00Z"} +{"id": 63, "name": "Item_63", "category": "A", "value": 155.39999999999998, "score": 365, "timestamp": "2024-01-08T10:00:00Z"} +{"id": 64, "name": "Item_64", "category": "B", "value": 157.7, "score": 370, "timestamp": "2024-01-09T10:00:00Z"} +{"id": 65, "name": "Item_65", "category": "C", "value": 160.0, "score": 375, "timestamp": "2024-01-10T10:00:00Z"} +{"id": 66, "name": "Item_66", "category": "A", "value": 162.29999999999998, "score": 380, "timestamp": "2024-01-11T10:00:00Z"} +{"id": 67, "name": "Item_67", "category": "B", "value": 164.6, "score": 385, "timestamp": "2024-01-12T10:00:00Z"} +{"id": 68, "name": "Item_68", "category": "C", "value": 166.89999999999998, "score": 390, "timestamp": "2024-01-13T10:00:00Z"} +{"id": 69, "name": "Item_69", "category": "A", "value": 169.2, "score": 395, "timestamp": "2024-01-14T10:00:00Z"} +{"id": 70, "name": "Item_70", "category": "B", "value": 171.5, "score": 400, "timestamp": "2024-01-15T10:00:00Z"} +{"id": 71, "name": "Item_71", "category": "C", "value": 173.79999999999998, "score": 405, "timestamp": "2024-01-16T10:00:00Z"} +{"id": 72, "name": "Item_72", "category": "A", "value": 176.1, "score": 410, "timestamp": "2024-01-17T10:00:00Z"} +{"id": 73, "name": "Item_73", "category": "B", "value": 178.39999999999998, "score": 415, "timestamp": "2024-01-18T10:00:00Z"} +{"id": 74, "name": "Item_74", "category": "C", "value": 180.7, "score": 420, "timestamp": "2024-01-19T10:00:00Z"} +{"id": 75, "name": "Item_75", "category": "A", "value": 183.0, "score": 425, "timestamp": "2024-01-20T10:00:00Z"} +{"id": 76, "name": "Item_76", "category": "B", "value": 
185.29999999999998, "score": 430, "timestamp": "2024-01-21T10:00:00Z"} +{"id": 77, "name": "Item_77", "category": "C", "value": 187.6, "score": 435, "timestamp": "2024-01-22T10:00:00Z"} +{"id": 78, "name": "Item_78", "category": "A", "value": 189.89999999999998, "score": 440, "timestamp": "2024-01-23T10:00:00Z"} +{"id": 79, "name": "Item_79", "category": "B", "value": 192.2, "score": 445, "timestamp": "2024-01-24T10:00:00Z"} +{"id": 80, "name": "Item_80", "category": "C", "value": 194.5, "score": 450, "timestamp": "2024-01-25T10:00:00Z"} +{"id": 81, "name": "Item_81", "category": "A", "value": 196.79999999999998, "score": 455, "timestamp": "2024-01-26T10:00:00Z"} +{"id": 82, "name": "Item_82", "category": "B", "value": 199.1, "score": 460, "timestamp": "2024-01-27T10:00:00Z"} +{"id": 83, "name": "Item_83", "category": "C", "value": 201.39999999999998, "score": 465, "timestamp": "2024-01-28T10:00:00Z"} +{"id": 84, "name": "Item_84", "category": "A", "value": 203.7, "score": 470, "timestamp": "2024-01-01T10:00:00Z"} +{"id": 85, "name": "Item_85", "category": "B", "value": 205.99999999999997, "score": 475, "timestamp": "2024-01-02T10:00:00Z"} +{"id": 86, "name": "Item_86", "category": "C", "value": 208.29999999999998, "score": 480, "timestamp": "2024-01-03T10:00:00Z"} +{"id": 87, "name": "Item_87", "category": "A", "value": 210.6, "score": 485, "timestamp": "2024-01-04T10:00:00Z"} +{"id": 88, "name": "Item_88", "category": "B", "value": 212.89999999999998, "score": 490, "timestamp": "2024-01-05T10:00:00Z"} +{"id": 89, "name": "Item_89", "category": "C", "value": 215.2, "score": 495, "timestamp": "2024-01-06T10:00:00Z"} +{"id": 90, "name": "Item_90", "category": "A", "value": 217.49999999999997, "score": 500, "timestamp": "2024-01-07T10:00:00Z"} +{"id": 91, "name": "Item_91", "category": "B", "value": 219.79999999999998, "score": 505, "timestamp": "2024-01-08T10:00:00Z"} +{"id": 92, "name": "Item_92", "category": "C", "value": 222.1, "score": 510, "timestamp": "2024-01-09T10:00:00Z"} +{"id": 93, "name": "Item_93", "category": "A", "value": 224.39999999999998, "score": 515, "timestamp": "2024-01-10T10:00:00Z"} +{"id": 94, "name": "Item_94", "category": "B", "value": 226.7, "score": 520, "timestamp": "2024-01-11T10:00:00Z"} +{"id": 95, "name": "Item_95", "category": "C", "value": 228.99999999999997, "score": 525, "timestamp": "2024-01-12T10:00:00Z"} +{"id": 96, "name": "Item_96", "category": "A", "value": 231.29999999999998, "score": 530, "timestamp": "2024-01-13T10:00:00Z"} +{"id": 97, "name": "Item_97", "category": "B", "value": 233.6, "score": 535, "timestamp": "2024-01-14T10:00:00Z"} +{"id": 98, "name": "Item_98", "category": "C", "value": 235.89999999999998, "score": 540, "timestamp": "2024-01-15T10:00:00Z"} +{"id": 99, "name": "Item_99", "category": "A", "value": 238.2, "score": 545, "timestamp": "2024-01-16T10:00:00Z"} +{"id": 100, "name": "Item_100", "category": "B", "value": 240.49999999999997, "score": 550, "timestamp": "2024-01-17T10:00:00Z"} diff --git a/paimon-python/pypaimon/sample/rest_catalog_ray_sink_sample.py b/paimon-python/pypaimon/sample/rest_catalog_ray_sink_sample.py index 555af19cd11b..072a70cf7ccb 100644 --- a/paimon-python/pypaimon/sample/rest_catalog_ray_sink_sample.py +++ b/paimon-python/pypaimon/sample/rest_catalog_ray_sink_sample.py @@ -16,86 +16,34 @@ # limitations under the License. 
################################################################################ """ -Example: REST Catalog + Ray Sink + OSS JSON Data Source +Example: REST Catalog + Ray Sink with Local File + Mock Server -Demonstrates: -1. Reading JSON data from OSS using Ray Data -2. Writing data to Paimon table using Ray Sink (write_ray) -3. REST catalog integration for production scenarios +Demonstrates reading JSON data from local file using Ray Data and writing to Paimon table +using Ray Sink (write_ray) with a mock REST catalog server. """ -import json +import os import tempfile import uuid -import pandas as pd import pyarrow as pa -import pyarrow.fs as pafs import ray from pypaimon import CatalogFactory, Schema from pypaimon.common.options.core_options import CoreOptions +from pypaimon.schema.data_types import PyarrowFieldParser from pypaimon.tests.rest.rest_server import RESTCatalogServer from pypaimon.api.api_response import ConfigResponse from pypaimon.api.auth import BearTokenAuthProvider -def create_sample_json_file(local_path: str, num_records: int = 100): - data = [] - for i in range(1, num_records + 1): - data.append({ - 'id': i, - 'name': f'Item_{i}', - 'category': ['A', 'B', 'C'][i % 3], - 'value': 10.5 + i * 2.3, - 'score': 50 + i * 5, - 'timestamp': f'2024-01-{i % 28 + 1:02d}T10:00:00Z' - }) - - with open(local_path, 'w') as f: - for record in data: - f.write(json.dumps(record) + '\n') # JSONL format - - print(f"Created sample JSON file with {num_records} records: {local_path}") - - -def create_oss_filesystem_for_ray( - endpoint: str, - access_key_id: str, - access_key_secret: str, - bucket: str = None, - region: str = None, - session_token: str = None -) -> pafs.S3FileSystem: - import pyarrow as pyarrow_module - from packaging import version - - client_kwargs = { - "access_key": access_key_id, - "secret_key": access_key_secret, - } - - if session_token: - client_kwargs["session_token"] = session_token - - if region: - client_kwargs["region"] = region - - if version.parse(pyarrow_module.__version__) >= version.parse("7.0.0"): - client_kwargs['force_virtual_addressing'] = True - client_kwargs['endpoint_override'] = endpoint - else: - if bucket: - client_kwargs['endpoint_override'] = f"{bucket}.{endpoint}" - else: - client_kwargs['endpoint_override'] = endpoint - - return pafs.S3FileSystem(**client_kwargs) +def _get_sample_data_path(filename: str) -> str: + sample_dir = os.path.dirname(os.path.abspath(__file__)) + return os.path.join(sample_dir, 'data', filename) def main(): ray.init(ignore_reinit_error=True, num_cpus=2) - print("Ray initialized successfully") temp_dir = tempfile.mkdtemp() token = str(uuid.uuid4()) @@ -108,9 +56,7 @@ def main(): server.start() print(f"REST server started at: {server.get_url()}") - temp_json_dir = tempfile.mkdtemp() - json_file_path = f"{temp_json_dir}/data.jsonl" - create_sample_json_file(json_file_path, num_records=100) + json_file_path = _get_sample_data_path('data.jsonl') try: catalog = CatalogFactory.create({ @@ -120,9 +66,8 @@ def main(): 'token.provider': 'bear', 'token': token, }) - catalog.create_database("default", True) + catalog.create_database("default", ignore_if_exists=True) - from pypaimon.common.options.core_options import CoreOptions schema = Schema.from_pyarrow_schema(pa.schema([ ('id', pa.int64()), ('name', pa.string()), @@ -135,55 +80,41 @@ def main(): }) table_name = 'default.oss_json_import_table' - catalog.create_table(table_name, schema, True) + catalog.create_table(table_name, schema, ignore_if_exists=True) table = 
catalog.get_table(table_name) - print(f"\nTable created: {table_name}") - print(f"Table path: {table.table_path}") - - print("\n" + "="*60) - print("Step 1: Reading JSON data from OSS using Ray Data") - print("="*60) - - # If using actual OSS, create filesystem and read - # Uncomment and configure with your OSS credentials: - # oss_fs = create_oss_filesystem_for_ray( - # endpoint=OSS_ENDPOINT, - # access_key_id=OSS_ACCESS_KEY_ID, - # access_key_secret=OSS_ACCESS_KEY_SECRET, - # bucket=OSS_BUCKET, - # region="cn-hangzhou", # Optional: OSS region - # ) - # ray_dataset = ray.data.read_json( - # OSS_JSON_PATH, - # filesystem=oss_fs, - # concurrency=2, - # ) - - print(f"Reading JSON from: {json_file_path}") + print(f"Reading JSON from local file: {json_file_path}") ray_dataset = ray.data.read_json( json_file_path, concurrency=2, ) - print(f"✓ Ray Dataset created successfully") - print(f" - Total rows: {ray_dataset.count()}") - print(f" - Schema: {ray_dataset.schema()}") - - sample_data = ray_dataset.take(3) - print("\nSample data (first 3 rows):") - for i, row in enumerate(sample_data, 1): - print(f" Row {i}: {row}") - - # Step 2: Write to Paimon table using Ray Sink - print("\n" + "="*60) - print("Step 2: Writing data to Paimon table using Ray Sink") - print("="*60) + print(f"Ray Dataset: {ray_dataset.count()} rows") + print(f"Schema: {ray_dataset.schema()}") + + table_pa_schema = PyarrowFieldParser.from_paimon_schema(table.table_schema.fields) + + def cast_batch_to_table_schema(batch: pa.RecordBatch) -> pa.Table: + arrays = [] + for field in table_pa_schema: + col_name = field.name + col_array = batch.column(col_name) + if isinstance(col_array, pa.ChunkedArray): + col_array = col_array.combine_chunks() + if col_array.type != field.type: + col_array = col_array.cast(field.type) + arrays.append(col_array) + record_batch = pa.RecordBatch.from_arrays(arrays, schema=table_pa_schema) + return pa.Table.from_batches([record_batch]) + + ray_dataset = ray_dataset.map_batches( + cast_batch_to_table_schema, + batch_format="pyarrow", + ) write_builder = table.new_batch_write_builder() table_write = write_builder.new_write() - print("Writing Ray Dataset to Paimon table...") table_write.write_ray( ray_dataset, overwrite=False, @@ -191,71 +122,28 @@ def main(): ray_remote_args={"num_cpus": 1} ) - # Commit the write table_commit = write_builder.new_commit() commit_messages = table_write.prepare_commit() table_commit.commit(commit_messages) table_write.close() table_commit.close() - print(f"✓ Successfully wrote {ray_dataset.count()} rows to table") - - # Step 3: Verify data by reading back - print("\n" + "="*60) - print("Step 3: Verifying data by reading back from table") - print("="*60) + print(f"Successfully wrote {ray_dataset.count()} rows to table") read_builder = table.new_read_builder() table_read = read_builder.new_read() table_scan = read_builder.new_scan() splits = table_scan.plan().splits() - # Read to Pandas for verification result_df = table_read.to_pandas(splits) - print(f"✓ Read back {len(result_df)} rows from table") - print("\nFirst 5 rows from table:") - print(result_df.head().to_string()) - - ray_df = ray_dataset.to_pandas() - ray_df_sorted = ray_df.sort_values(by='id').reset_index(drop=True) - result_df_sorted = result_df.sort_values(by='id').reset_index(drop=True) - - pd.testing.assert_frame_equal( - ray_df_sorted[['id', 'name', 'category', 'value', 'score']], - result_df_sorted[['id', 'name', 'category', 'value', 'score']], - check_dtype=False # Allow type differences (e.g., int64 vs 
int32) - ) - print("✓ Data verification passed: written data matches source data") + print(f"Read back {len(result_df)} rows from table") + print(result_df.head()) - # Step 4: Demonstrate additional Ray Data operations before writing - print("\n" + "="*60) - print("Step 4: Demonstrating data transformation with Ray Data") - print("="*60) - - def add_computed_field(row): - """Add a computed field based on existing data.""" - row['value_score_ratio'] = row['value'] / row['score'] if row['score'] > 0 else 0.0 - return row - - transformed_dataset = ray_dataset.map(add_computed_field) - print(f"✓ Transformed dataset: {transformed_dataset.count()} rows") - - # Filter data - filtered_dataset = ray_dataset.filter(lambda row: row['value'] > 50.0) - print(f"✓ Filtered dataset (value > 50): {filtered_dataset.count()} rows") - - print("\n" + "="*60) - print("Summary") - print("="*60) - print("✓ Successfully demonstrated Ray Sink with OSS JSON data source") - finally: server.shutdown() if ray.is_initialized(): ray.shutdown() - print("\n✓ Server stopped and Ray shutdown") if __name__ == '__main__': main() - diff --git a/paimon-python/pypaimon/tests/ray_sink_test.py b/paimon-python/pypaimon/tests/ray_sink_test.py index 620fbf4b2961..f9d3c21dd27b 100644 --- a/paimon-python/pypaimon/tests/ray_sink_test.py +++ b/paimon-python/pypaimon/tests/ray_sink_test.py @@ -70,7 +70,7 @@ def test_init_and_serialization(self): self.assertEqual(datasink.table, self.table) self.assertFalse(datasink.overwrite) self.assertIsNone(datasink._writer_builder) - self.assertEqual(datasink._get_table_name(), "test_db.test_table") + self.assertEqual(datasink._table_name, "test_db.test_table") datasink_overwrite = PaimonDatasink(self.table, overwrite=True) self.assertTrue(datasink_overwrite.overwrite) diff --git a/paimon-python/pypaimon/write/ray_datasink.py b/paimon-python/pypaimon/write/ray_datasink.py index eda8f3743e15..ec2b282f53a0 100644 --- a/paimon-python/pypaimon/write/ray_datasink.py +++ b/paimon-python/pypaimon/write/ray_datasink.py @@ -44,6 +44,7 @@ def __init__( ): self.table = table self.overwrite = overwrite + self._table_name = table.identifier.get_full_name() self._writer_builder: Optional["WriteBuilder"] = None self._pending_commit_messages: List["CommitMessage"] = [] @@ -55,10 +56,11 @@ def __setstate__(self, state: dict) -> None: self.__dict__.update(state) if self._writer_builder is not None and not hasattr(self._writer_builder, 'table'): self._writer_builder = None + if not hasattr(self, '_table_name'): + self._table_name = self.table.identifier.get_full_name() def on_write_start(self, schema=None) -> None: - table_name = self._get_table_name() - logger.info(f"Starting write job for table {table_name}") + logger.info(f"Starting write job for table {self._table_name}") self._writer_builder = self.table.new_batch_write_builder() if self.overwrite: @@ -89,22 +91,9 @@ def write( commit_messages = table_write.prepare_commit() commit_messages_list.extend(commit_messages) - - except Exception as e: - logger.error( - f"Error writing data to table {self._get_table_name()}: {e}", - exc_info=e - ) - raise finally: if table_write is not None: - try: - table_write.close() - except Exception as e: - logger.warning( - f"Error closing table_write: {e}", - exc_info=e - ) + table_write.close() return commit_messages_list @@ -128,26 +117,24 @@ def on_write_complete( if not non_empty_messages: logger.info("No data to commit (all commit messages are empty)") - self._pending_commit_messages = [] # Clear after successful check + 
self._pending_commit_messages = [] return - table_name = self._get_table_name() logger.info( f"Committing {len(non_empty_messages)} commit messages " - f"for table {table_name}" + f"for table {self._table_name}" ) table_commit = self._writer_builder.new_commit() commit_messages_to_abort = non_empty_messages table_commit.commit(non_empty_messages) - logger.info(f"Successfully committed write job for table {table_name}") - commit_messages_to_abort = [] # Clear after successful commit - self._pending_commit_messages = [] # Clear after successful commit + logger.info(f"Successfully committed write job for table {self._table_name}") + commit_messages_to_abort = [] + self._pending_commit_messages = [] except Exception as e: - table_name = self._get_table_name() logger.error( - f"Error committing write job for table {table_name}: {e}", + f"Error committing write job for table {self._table_name}: {e}", exc_info=e ) if table_commit is not None and commit_messages_to_abort: @@ -155,7 +142,7 @@ def on_write_complete( table_commit.abort(commit_messages_to_abort) logger.info( f"Aborted {len(commit_messages_to_abort)} commit messages " - f"for table {table_name}" + f"for table {self._table_name}" ) except Exception as abort_error: logger.error( @@ -174,9 +161,8 @@ def on_write_complete( ) def on_write_failed(self, error: Exception) -> None: - table_name = self._get_table_name() logger.error( - f"Write job failed for table {table_name}. Error: {error}", + f"Write job failed for table {self._table_name}. Error: {error}", exc_info=error ) @@ -187,7 +173,7 @@ def on_write_failed(self, error: Exception) -> None: table_commit.abort(self._pending_commit_messages) logger.info( f"Aborted {len(self._pending_commit_messages)} commit messages " - f"for table {table_name} in on_write_failed()" + f"for table {self._table_name} in on_write_failed()" ) finally: table_commit.close() @@ -197,9 +183,4 @@ def on_write_failed(self, error: Exception) -> None: exc_info=abort_error ) finally: - self._pending_commit_messages = [] # Clear after abort attempt - - def _get_table_name(self) -> str: - if hasattr(self.table, 'identifier'): - return self.table.identifier.get_full_name() - return 'unknown' + self._pending_commit_messages = [] \ No newline at end of file From cbaf3105a250441cce7bd858c6be54a9a2060660 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 13 Jan 2026 20:14:43 +0800 Subject: [PATCH 06/10] fix --- paimon-python/pypaimon/write/ray_datasink.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/paimon-python/pypaimon/write/ray_datasink.py b/paimon-python/pypaimon/write/ray_datasink.py index ec2b282f53a0..16282a7ae348 100644 --- a/paimon-python/pypaimon/write/ray_datasink.py +++ b/paimon-python/pypaimon/write/ray_datasink.py @@ -149,6 +149,8 @@ def on_write_complete( f"Error aborting commit messages: {abort_error}", exc_info=abort_error ) + finally: + self._pending_commit_messages = [] raise finally: if table_commit is not None: From 92631bd72047e0ec71c7fdf6c53a165f9f01de57 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 13 Jan 2026 20:18:05 +0800 Subject: [PATCH 07/10] fix --- paimon-python/pypaimon/write/ray_datasink.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/write/ray_datasink.py b/paimon-python/pypaimon/write/ray_datasink.py index 16282a7ae348..0353b52c4aa1 100644 --- a/paimon-python/pypaimon/write/ray_datasink.py +++ b/paimon-python/pypaimon/write/ray_datasink.py @@ -1,3 +1,4 @@ 
+################################################################################ # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -15,7 +16,7 @@ # limitations under the License. ################################################################################ """ -Module to reawrited a Paimon table from a Ray Dataset, by using the Ray Datasink API. +Module to write a Paimon table from a Ray Dataset, by using the Ray Datasink API. """ import logging From c2c24ccf57b025a0b4f6035cf5686c5db4aea10e Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 13 Jan 2026 21:00:46 +0800 Subject: [PATCH 08/10] fix --- paimon-python/pypaimon/sample/data/__init__.py | 18 ++++++++++++++++++ pom.xml | 1 + 2 files changed, 19 insertions(+) diff --git a/paimon-python/pypaimon/sample/data/__init__.py b/paimon-python/pypaimon/sample/data/__init__.py index e69de29bb2d1..a23bd62cc725 100644 --- a/paimon-python/pypaimon/sample/data/__init__.py +++ b/paimon-python/pypaimon/sample/data/__init__.py @@ -0,0 +1,18 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + diff --git a/pom.xml b/pom.xml index 6176baaab666..be463ef6b041 100644 --- a/pom.xml +++ b/pom.xml @@ -626,6 +626,7 @@ under the License. paimon-common/src/main/antlr4/** paimon-core/src/test/resources/compatibility/** + paimon-python/pypaimon/sample/data/** From b5b2f4b6a1bb6869a568525948e4fa35c2d025a0 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 13 Jan 2026 21:25:08 +0800 Subject: [PATCH 09/10] fix --- docs/content/program-api/python-api.md | 4 +++- paimon-python/dev/.rat-excludes | 1 + .../pypaimon/sample/rest_catalog_ray_sink_sample.py | 5 +---- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/content/program-api/python-api.md b/docs/content/program-api/python-api.md index d07224c2c3cc..a3a4abf44603 100644 --- a/docs/content/program-api/python-api.md +++ b/docs/content/program-api/python-api.md @@ -202,8 +202,10 @@ table_write.write_ray(ray_dataset, overwrite=False, concurrency=2) # - overwrite: Whether to overwrite existing data (default: False) # - concurrency: Optional max number of concurrent Ray tasks # - ray_remote_args: Optional kwargs passed to ray.remote() (e.g., {"num_cpus": 2}) +# Note: write_ray() handles commit internally through Ray Datasink API. +# Skip steps 3-4 if using write_ray() - just close the writer. -# 3. Commit data +# 3. 
Commit data (required for write_pandas/write_arrow/write_arrow_batch only) commit_messages = table_write.prepare_commit() table_commit.commit(commit_messages) diff --git a/paimon-python/dev/.rat-excludes b/paimon-python/dev/.rat-excludes index 576371afaea1..d2cc8061e94e 100644 --- a/paimon-python/dev/.rat-excludes +++ b/paimon-python/dev/.rat-excludes @@ -18,3 +18,4 @@ .gitignore rat-results.txt +pypaimon/sample/data/data.jsonl diff --git a/paimon-python/pypaimon/sample/rest_catalog_ray_sink_sample.py b/paimon-python/pypaimon/sample/rest_catalog_ray_sink_sample.py index 072a70cf7ccb..feb2471de937 100644 --- a/paimon-python/pypaimon/sample/rest_catalog_ray_sink_sample.py +++ b/paimon-python/pypaimon/sample/rest_catalog_ray_sink_sample.py @@ -122,11 +122,8 @@ def cast_batch_to_table_schema(batch: pa.RecordBatch) -> pa.Table: ray_remote_args={"num_cpus": 1} ) - table_commit = write_builder.new_commit() - commit_messages = table_write.prepare_commit() - table_commit.commit(commit_messages) + # write_ray() has already committed the data, just close the writer table_write.close() - table_commit.close() print(f"Successfully wrote {ray_dataset.count()} rows to table") From b6edff883815f0cf414716864df302cb5d196589 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 13 Jan 2026 21:38:30 +0800 Subject: [PATCH 10/10] fix --- paimon-python/pypaimon/sample/data/__init__.py | 1 - paimon-python/pypaimon/tests/ray_sink_test.py | 1 - paimon-python/pypaimon/write/ray_datasink.py | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/paimon-python/pypaimon/sample/data/__init__.py b/paimon-python/pypaimon/sample/data/__init__.py index a23bd62cc725..65b48d4d79b4 100644 --- a/paimon-python/pypaimon/sample/data/__init__.py +++ b/paimon-python/pypaimon/sample/data/__init__.py @@ -15,4 +15,3 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ - diff --git a/paimon-python/pypaimon/tests/ray_sink_test.py b/paimon-python/pypaimon/tests/ray_sink_test.py index f9d3c21dd27b..bc1ff3ef9f48 100644 --- a/paimon-python/pypaimon/tests/ray_sink_test.py +++ b/paimon-python/pypaimon/tests/ray_sink_test.py @@ -341,4 +341,3 @@ def test_on_write_failed(self): if __name__ == '__main__': unittest.main() - diff --git a/paimon-python/pypaimon/write/ray_datasink.py b/paimon-python/pypaimon/write/ray_datasink.py index 0353b52c4aa1..cb0335068339 100644 --- a/paimon-python/pypaimon/write/ray_datasink.py +++ b/paimon-python/pypaimon/write/ray_datasink.py @@ -186,4 +186,4 @@ def on_write_failed(self, error: Exception) -> None: exc_info=abort_error ) finally: - self._pending_commit_messages = [] \ No newline at end of file + self._pending_commit_messages = []
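Usage sketch (editorial, not part of the patch series): the snippet below strings together the pieces touched by these commits — build a small Ray Dataset, hand it to table_write.write_ray(), which per the python-api.md change above commits through the Ray Datasink internally, then read the rows back through the normal read path. It is a minimal sketch only; the filesystem-warehouse catalog options, the "default.ray_sink_demo" table name, and the append-only (no primary key) schema are assumptions chosen for brevity, while the write_ray()/read-builder calls mirror the sample and docs diffs above.

import tempfile

import pyarrow as pa
import ray

from pypaimon import CatalogFactory, Schema

ray.init(ignore_reinit_error=True, num_cpus=2)

# Assumption: a plain local filesystem warehouse is enough for a demo catalog.
catalog = CatalogFactory.create({"warehouse": tempfile.mkdtemp()})
catalog.create_database("default", ignore_if_exists=True)

# Assumption: an append-only table (no primary keys, default options) keeps the
# schema setup minimal; the sample above uses a primary-key table instead.
pa_schema = pa.schema([
    ("id", pa.int64()),
    ("name", pa.string()),
    ("value", pa.float64()),
])
schema = Schema.from_pyarrow_schema(pa_schema)
catalog.create_table("default.ray_sink_demo", schema, ignore_if_exists=True)
table = catalog.get_table("default.ray_sink_demo")

# A tiny in-memory Ray Dataset whose columns match the table schema.
ray_dataset = ray.data.from_arrow(pa.table({
    "id": [1, 2, 3],
    "name": ["a", "b", "c"],
    "value": [1.1, 2.2, 3.3],
}))

# write_ray() drives the Ray Datasink and commits internally (see the
# python-api.md note above), so no prepare_commit()/commit() is needed here.
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_write.write_ray(
    ray_dataset,
    overwrite=False,
    concurrency=2,
    ray_remote_args={"num_cpus": 1},
)
table_write.close()

# Read back through the regular read builder to verify the rows landed.
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
print(read_builder.new_read().to_pandas(splits))

ray.shutdown()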
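The same path can also be driven one level lower by handing the refactored PaimonDatasink directly to Ray Data. This is a sketch under the assumption that the targeted Ray version exposes Dataset.write_datasink(); `table` and `ray_dataset` are the objects from the previous sketch. Ray then calls write() on each worker, where the sink builds its own WriteBuilder, and on_write_complete() on the driver, where the non-empty commit messages are committed or aborted as in the on_write_complete() hunks above.

from pypaimon.write.ray_datasink import PaimonDatasink

# Assumption: Dataset.write_datasink() is the Ray Data hook that write_ray()
# wraps; overwrite=True would take the static-partition overwrite branch of
# on_write_start() shown in the diff above.
datasink = PaimonDatasink(table, overwrite=False)
ray_dataset.write_datasink(datasink)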