Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

import java.nio.file.Files;
Expand Down Expand Up @@ -357,7 +356,6 @@ public void testPKDeletionVectorWriteMultiBatchRawConvertable() throws Exception

@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
@DisabledIfSystemProperty(named = "python.version", matches = "3.6")
public void testReadPkTable() throws Exception {
Identifier identifier = identifier("mixed_test_pk_tablep_parquet");
Table table = catalog.getTable(identifier);
Expand Down
5 changes: 0 additions & 5 deletions paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,6 @@ def test_read_append_table(self):

@parameterized.expand(get_file_format_params())
def test_py_write_read_pk_table(self, file_format):
if sys.version_info[:2] == (3, 6):
self.skipTest(
"Skipping on Python 3.6 due to PyArrow compatibility issue (RecordBatch.add_column not available). "
"Will be fixed in next PR."
)
pa_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
Expand Down
34 changes: 21 additions & 13 deletions paimon-python/pypaimon/write/writer/key_value_data_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,34 @@ def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
def _add_system_fields(self, data: pa.RecordBatch) -> pa.RecordBatch:
    """Prepend KV system fields to a record batch.

    Builds a new batch whose columns are, in order:
    ``_KEY_{pk}`` copies of each trimmed primary-key column,
    ``_SEQUENCE_NUMBER`` (monotonically increasing int64 from
    ``self.sequence_generator``), ``_VALUE_KIND`` (int8, currently always 0),
    followed by every original column of ``data`` unchanged.

    Implemented with ``pa.RecordBatch.from_arrays`` rather than repeated
    ``add_column`` calls, because ``RecordBatch.add_column`` is not available
    on older PyArrow versions (e.g. the builds shipped for Python 3.6).

    Args:
        data: incoming record batch; any trimmed primary-key columns it
            contains are mirrored into ``_KEY_*`` columns.

    Returns:
        A new ``pa.RecordBatch`` with the system columns prepended; ``data``
        itself is not modified.
    """
    num_rows = data.num_rows

    new_arrays = []
    new_names = []

    # Mirror each primary-key column as a leading _KEY_<pk> column.
    for pk_key in self.trimmed_primary_keys:
        if pk_key in data.schema.names:
            key_column = data.column(pk_key)
            new_arrays.append(key_column)
            new_names.append(f'_KEY_{pk_key}')

    # One fresh sequence number per row, in row order.
    sequence_column = pa.array([self.sequence_generator.next() for _ in range(num_rows)], type=pa.int64())
    new_arrays.append(sequence_column)
    new_names.append('_SEQUENCE_NUMBER')

    # TODO: support real row kind here
    value_kind_column = pa.array([0] * num_rows, type=pa.int8())
    new_arrays.append(value_kind_column)
    new_names.append('_VALUE_KIND')

    # Append the original columns, preserving their order and names.
    for i in range(data.num_columns):
        new_arrays.append(data.column(i))
        new_names.append(data.schema.names[i])

    return pa.RecordBatch.from_arrays(new_arrays, names=new_names)

def _sort_by_primary_key(self, data: pa.RecordBatch) -> pa.RecordBatch:
sort_keys = [(key, 'ascending') for key in self.trimmed_primary_keys]
if '_SEQUENCE_NUMBER' in data.column_names:
if '_SEQUENCE_NUMBER' in data.schema.names:
sort_keys.append(('_SEQUENCE_NUMBER', 'ascending'))

sorted_indices = pc.sort_indices(data, sort_keys=sort_keys)
Expand Down