From 9db0f989e89474582451fd51254e558d2b8d4b26 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 14 Jan 2026 23:56:49 +0800 Subject: [PATCH 1/3] [python] Fix RecordBatch compatibility issues in py36 --- .../java/org/apache/paimon/JavaPyE2ETest.java | 2 -- .../tests/e2e/java_py_read_write_test.py | 5 --- .../write/writer/key_value_data_writer.py | 32 ++++++++++++------- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java index 03642b0e5547..39c2011bf486 100644 --- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java +++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java @@ -53,7 +53,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import java.nio.file.Files; @@ -357,7 +356,6 @@ public void testPKDeletionVectorWriteMultiBatchRawConvertable() throws Exception @Test @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true") - @DisabledIfSystemProperty(named = "python.version", matches = "3.6") public void testReadPkTable() throws Exception { Identifier identifier = identifier("mixed_test_pk_tablep_parquet"); Table table = catalog.getTable(identifier); diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py index 7a194574f5e5..661c5799f7d9 100644 --- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py +++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py @@ -105,11 +105,6 @@ def test_read_append_table(self): @parameterized.expand(get_file_format_params()) def test_py_write_read_pk_table(self, file_format): - if sys.version_info[:2] == (3, 6): - self.skipTest( - "Skipping on Python 3.6 due to PyArrow compatibility issue (RecordBatch.add_column not available). " - "Will be fixed in next PR." - ) pa_schema = pa.schema([ ('id', pa.int32()), ('name', pa.string()), diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py index 5e82369b6c97..f344ded600a9 100644 --- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py +++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py @@ -36,26 +36,34 @@ def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table: def _add_system_fields(self, data: pa.RecordBatch) -> pa.RecordBatch: """Add system fields: _KEY_{pk_key}, _SEQUENCE_NUMBER, _VALUE_KIND.""" num_rows = data.num_rows - enhanced_table = data - + + new_arrays = [] + new_names = [] + for pk_key in reversed(self.trimmed_primary_keys): - if pk_key in data.column_names: + if pk_key in data.schema.names: key_column = data.column(pk_key) - enhanced_table = enhanced_table.add_column(0, f'_KEY_{pk_key}', key_column) - + new_arrays.append(key_column) + new_names.append(f'_KEY_{pk_key}') + + for i in range(data.num_columns): + new_arrays.append(data.column(i)) + new_names.append(data.schema.names[i]) + sequence_column = pa.array([self.sequence_generator.next() for _ in range(num_rows)], type=pa.int64()) - enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_keys), '_SEQUENCE_NUMBER', sequence_column) - + new_arrays.append(sequence_column) + new_names.append('_SEQUENCE_NUMBER') + # TODO: support real row kind here value_kind_column = pa.array([0] * num_rows, type=pa.int8()) - enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_keys) + 1, '_VALUE_KIND', - value_kind_column) - - return enhanced_table + new_arrays.append(value_kind_column) + new_names.append('_VALUE_KIND') + + return pa.RecordBatch.from_arrays(new_arrays, names=new_names) def _sort_by_primary_key(self, data: pa.RecordBatch) -> pa.RecordBatch: sort_keys = [(key, 'ascending') for key in self.trimmed_primary_keys] - if '_SEQUENCE_NUMBER' in data.column_names: + if '_SEQUENCE_NUMBER' in data.schema.names: sort_keys.append(('_SEQUENCE_NUMBER', 'ascending')) sorted_indices = pc.sort_indices(data, sort_keys=sort_keys) From 72bae6de0a8023977afd0672ce0ac0e42f36c846 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 15 Jan 2026 00:21:19 +0800 Subject: [PATCH 2/3] fix Column order --- .../pypaimon/write/writer/key_value_data_writer.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py index f344ded600a9..f652996557c2 100644 --- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py +++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py @@ -34,22 +34,20 @@ def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table: return self._sort_by_primary_key(combined) def _add_system_fields(self, data: pa.RecordBatch) -> pa.RecordBatch: - """Add system fields: _KEY_{pk_key}, _SEQUENCE_NUMBER, _VALUE_KIND.""" + """ + Add system fields: _KEY_{pk_key}, _SEQUENCE_NUMBER, _VALUE_KIND. + """ num_rows = data.num_rows new_arrays = [] new_names = [] - for pk_key in reversed(self.trimmed_primary_keys): + for pk_key in self.trimmed_primary_keys: if pk_key in data.schema.names: key_column = data.column(pk_key) new_arrays.append(key_column) new_names.append(f'_KEY_{pk_key}') - for i in range(data.num_columns): - new_arrays.append(data.column(i)) - new_names.append(data.schema.names[i]) - sequence_column = pa.array([self.sequence_generator.next() for _ in range(num_rows)], type=pa.int64()) new_arrays.append(sequence_column) new_names.append('_SEQUENCE_NUMBER') @@ -59,6 +57,10 @@ def _add_system_fields(self, data: pa.RecordBatch) -> pa.RecordBatch: new_arrays.append(value_kind_column) new_names.append('_VALUE_KIND') + for i in range(data.num_columns): + new_arrays.append(data.column(i)) + new_names.append(data.schema.names[i]) + return pa.RecordBatch.from_arrays(new_arrays, names=new_names) def _sort_by_primary_key(self, data: pa.RecordBatch) -> pa.RecordBatch: From 7760003b69ee2b77ce364db2e9f78aa2047606f3 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 15 Jan 2026 10:18:53 +0800 Subject: [PATCH 3/3] clean code --- paimon-python/pypaimon/write/writer/key_value_data_writer.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py index f652996557c2..3165a01069c0 100644 --- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py +++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py @@ -34,9 +34,7 @@ def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table: return self._sort_by_primary_key(combined) def _add_system_fields(self, data: pa.RecordBatch) -> pa.RecordBatch: - """ - Add system fields: _KEY_{pk_key}, _SEQUENCE_NUMBER, _VALUE_KIND. - """ + """Add system fields: _KEY_{pk_key}, _SEQUENCE_NUMBER, _VALUE_KIND.""" num_rows = data.num_rows new_arrays = []