Upsert migration #47

Open

adampolomski wants to merge 176 commits into upsert-main from upsert

Conversation

@adampolomski
Collaborator

Description

The description of the main changes of your pull request

Related Issue(s)

Documentation

niltecedu and others added 30 commits September 11, 2025 19:05
…elta-io#3737)

# Description
This PR aims to add per-column parquet encoding support similar to what
pyarrow has right now. Since the pyarrow engine is being deprecated, this is
the best way to resolve the issue. It adds an encoding spec matching what
the parquet spec defines.

https://parquet.apache.org/docs/file-format/data-pages/encodings/

Thanks to the ghost/deleted user who carried out most of the work, but
it never got merged in.

# Related Issue(s)

- closes delta-io#3319


# Documentation

This is implementing

https://parquet.apache.org/docs/file-format/data-pages/encodings/

in ColumnProperties and, as part of this, it also adds an Encoding enum.
It uses the same format as the other ColumnProperties arguments.
PLAIN_DICTIONARY and RLE_DICTIONARY could not be implemented due to some
issues and will be looked into later on.

Here is a code snippet showing how to use the delta tables and how to
compare against the existing implementation:

```python
import os
import pandas as pd
import numpy as np

from deltalake import write_deltalake, WriterProperties, ColumnProperties, DeltaTable
from pathlib import Path

import pyarrow.parquet as pq
import pyarrow as pa

# Make some fake time series data
TOTAL_ROWS = 10_000_000
timestamps = pd.date_range(start=pd.Timestamp.now(), periods=TOTAL_ROWS, freq="5ms")
timeline = np.linspace(0, len(timestamps), len(timestamps))
print("Generating data...")
pat = pa.Table.from_pandas(
    pd.DataFrame(
        {
            # timestamp (auto-generated)
            "timestamp": timestamps,
            # Timeseries data as float32
            "timeseries_data": (10 * np.sin(2 * np.pi * 50 * timeline)).astype(
                np.float32
            ),
            "timeseries_int_data": (1000 * np.sin(2 * np.pi * 50 * timeline)).astype(
                np.int32
            ),
            # 1 minute partitions
            "partition_label": timestamps.strftime("%H%M"),
        }
    )
)
print("Data generated.")
output_path_normal = "example_deltalake"
write_deltalake(
    output_path_normal,
    data=pat,
    partition_by=["partition_label"],
    # Enabled compression for equivalent comparison, dictionary enabled leads to a larger file size
    writer_properties=WriterProperties(
        compression="ZSTD",
        compression_level=1,
        default_column_properties=ColumnProperties(dictionary_enabled=True),
    ),
    mode="overwrite",
    # Can't specify per-column encoding
)
print("Wrote normal delta table.")



output_path_encoded = "encoded_example_deltalake"
write_deltalake(
    output_path_encoded,
    data=pat,
    partition_by=["partition_label"],
    # Enabled compression for equivalent comparison, dictionary enabled leads to a larger file size
    writer_properties=WriterProperties(
        compression="ZSTD",
        compression_level=1,
        column_properties={
            "timestamp": ColumnProperties(dictionary_enabled=False,encoding="DELTA_BINARY_PACKED"),
            "timeseries_data": ColumnProperties(dictionary_enabled=False,encoding="BYTE_STREAM_SPLIT"),
            "timeseries_int_data": ColumnProperties(dictionary_enabled=False,encoding="DELTA_BINARY_PACKED"),
            "partition_label": ColumnProperties(dictionary_enabled=False,encoding="DELTA_BINARY_PACKED"),
        },
    ),
    mode="overwrite",
    # Per-column encodings are specified above via column_properties
)
print("Wrote encoded delta table.")

output_path_default_encoded = "example_pyarrow_delta_default_encoding"
pq.write_to_dataset(
    pat,
    output_path_default_encoded,
    partition_cols=["partition_label"],
    use_dictionary=False,
    use_byte_stream_split=True,
    compression="ZSTD",
    compression_level=1,
)


output_path_delta_specifc_encoded = "example_pyarrow_delta_specifc_col_encoding"
pq.write_to_dataset(
    pat,
    output_path_delta_specifc_encoded,
    partition_cols=["partition_label"],
    # Ability to specify column encodings here
    use_dictionary=False,
    use_byte_stream_split=False,
    column_encoding={
        "timestamp": "DELTA_BINARY_PACKED",
        "timeseries_data": "BYTE_STREAM_SPLIT",
        "timeseries_int_data": "DELTA_BINARY_PACKED",
        "partition_label": "DELTA_BINARY_PACKED",
    },
    compression="ZSTD",
    compression_level=1,
)
print("Wrote delta table with pyarrow column encodings.")


def get_folder_size(folder):
    return ByteSize(
        sum(file.stat().st_size for file in Path(folder).rglob("*"))
    ).megabytes


class ByteSize(int):
    _KB = 1024
    _suffixes = "B", "KB", "MB", "GB", "TB", "PB"

    def __new__(cls, *args, **kwargs):
        return super().__new__(cls, *args, **kwargs)

    def __init__(self, *args, **kwargs):
        self.bytes = self.B = int(self)
        self.kilobytes = self.KB = self / self._KB**1
        self.megabytes = self.MB = self / self._KB**2
        self.gigabytes = self.GB = self / self._KB**3
        self.terabytes = self.TB = self / self._KB**4
        self.petabytes = self.PB = self / self._KB**5
        *suffixes, last = self._suffixes
        suffix = next(
            (suffix for suffix in suffixes if 1 < getattr(self, suffix) < self._KB),
            last,
        )
        self.readable = suffix, getattr(self, suffix)

        super().__init__()

    def __str__(self):
        return self.__format__(".2f")

print(DeltaTable(output_path_encoded).to_pandas())

print(f"The File size of delta table is {get_folder_size(output_path_normal)} MB")
print(f"The File size of delta table with parquet encoding is {get_folder_size(output_path_default_encoded)} MB")
print(
    f"The File size of delta table with pyarrow default column encodings is {get_folder_size(output_path_default_encoded)} MB"
)
print(
    f"The File size of delta table with pyarrow specific column encodings is {get_folder_size(output_path_delta_specifc_encoded)} MB"
)


print("Deleting the folders now...")
import shutil

shutil.rmtree(output_path_normal)
shutil.rmtree(output_path_encoded)
shutil.rmtree(output_path_default_encoded)
shutil.rmtree(output_path_delta_specifc_encoded)
print("Deleted the folders.")


``` 


I have run this script in my dev environment (Ubuntu 24.04 on WSL2,
Python 3.12.3).

The outputs came to:

The File size of delta table is 122.21567821502686 MB
The File size of delta table with parquet encoding is 25.041666984558105 MB
The File size of delta table with pyarrow default column encodings is 25.041666984558105 MB
The File size of delta table with pyarrow specific column encodings is 20.985719680786133 MB

That is roughly a 75% improvement with mixed data. If you pursue
further optimisations you can get down to 95%-98% (if all INT data).
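
To confirm which encodings were actually applied, you can inspect the parquet
metadata of one of the written files. A small follow-up sketch (run it before
the cleanup step at the end of the snippet above; the glob over the output
folder is an assumption, not part of the original script):

```python
import glob

import pyarrow.parquet as pq

# Pick one parquet file written into the encoded delta table above.
parquet_file = glob.glob("encoded_example_deltalake/**/*.parquet", recursive=True)[0]

# Each column chunk records the encodings the writer actually used.
row_group = pq.ParquetFile(parquet_file).metadata.row_group(0)
for i in range(row_group.num_columns):
    column = row_group.column(i)
    print(column.path_in_schema, column.encodings)
```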

---------

Signed-off-by: Nilesh Zagade <Nilesh.Zagade@ukpowernetworks.co.uk>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
# Description
Just generally check for non-200 response codes and return them
instead of a "token missing" error, which is not helpful.

# Related Issue(s)
Fixes delta-io#3739

---------

Signed-off-by: Stephen Carman <shcarman@gmail.com>
…se it via SQL through DataSink

Signed-off-by: Florian Valeye <florian.valeye@gmail.com>
This comes from an error report in Slack. Since url::ParseError gives us
a few different variants, we can use those to determine whether something is
"file pathy" rather than just an outright incorrect URL.

Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
This change had far-reaching consequences due to the excessive use of
history() in our own tests.

Fixes delta-io#3753

Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
# Description
The description of the main changes of your pull request

# Related Issue(s)
- apache/datafusion#16799

# Documentation
As part of preparing for the DataFusion 50 release I am making a PR to
upgrade delta.rs

Signed-off-by: Andrew Lamb <andrew@nerdnetworks.org>
… of execution

After some debate, we decided that SessionConfig is sufficient to pull
through rather than SessionContext. Due to the wacky nature of some of
our code paths in this operation, the signatures of a number of internal
functions had to change to allow SessionConfig to be pulled all the way
down into the ZOrderExecContext.

Fixes delta-io#3751

Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
…o#3769)

# Description
Add `StorageConfig` when building `UnityCatalog` to allow catalog
credentials.

# Related Issue(s)
- closes delta-io#3757

Signed-off-by: Florian Valeye <florian.valeye@gmail.com>
# Description
- closes delta-io#3765

Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
With the recent MSRV change for the newest datafusion release, finally
time to upgrade some of these.

In the course of doing this, I decided to get rid of the unnecessary
`maplit` crate dependency here. Making HashMaps isn't that hard 😄

Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
Committed a fix upstream
[here](bnjbvr/cargo-machete#199)

Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
Signed-off-by: juejinyuxitu <juejinyuxitu@outlook.com>
There were a number of symbols only used during `datafusion` usage

Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
This dependency comes from the chrono org, so I kinda trust it, but it's
not something we should have in the hierarchy unless it's being used

Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
The StructType public API change was pretty invasive, oof.

Fixes delta-io#3774

Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
The kernel level is doing this validation now

Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
get the delta table row count
Get the number of rows from the underlying list of parquet files

Signed-off-by: ohadmata <ohad@pecan.ai>
Get the number of records from the add actions
Since the count is approximate, I made sure to update the docstrings for
the function to helpfully inform users.

Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
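
For illustration, a minimal sketch of approximating the row count from the add
actions via the Python API (the table path is hypothetical, and this is not
necessarily the exact helper added here):

```python
from deltalake import DeltaTable

dt = DeltaTable("example_deltalake")

# One row per active data file; `num_records` comes from the file statistics,
# so the resulting count is approximate in the sense described above.
add_actions = dt.get_add_actions(flatten=True).to_pandas()
approx_row_count = int(add_actions["num_records"].sum())
print(f"Approximate row count: {approx_row_count}")
```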
The check that caught this wasn't apparently "required" when I had
refactored our repo rules a month or so ago.

I had also made this change locally, there might even be video evidence
of it 😄 but I must not have correctly pushed it

Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
…utation

The problem here is that when partition columns are present the
RecordBatchWriter can/will mutate its internal schema in a way that
becomes incompatible with the Delta table it was originally created
with.

I am quite fascinated by this bug because it only manifested when going
from a development environment to production.

Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
…ma evolution

Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
# Description
Inside the function `table.cleanup_metadata`, look for the last
checkpoint before the requested minimum cutoff time and version, and only
delete files before that checkpoint. This fixes the bug below and
makes sure that versions inside the retention period are still loadable.
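
A minimal sketch of that cutoff rule (pure logic on hypothetical log entries,
not the actual delta-rs implementation):

```python
def versions_safe_to_delete(entries, cutoff_version):
    """entries: list of (version, is_checkpoint) tuples sorted by version."""
    # Find the last checkpoint at or before the requested cutoff.
    last_checkpoint = None
    for version, is_checkpoint in entries:
        if is_checkpoint and version <= cutoff_version:
            last_checkpoint = version
    if last_checkpoint is None:
        # No checkpoint before the cutoff: keep everything so the table stays loadable.
        return []
    # Only log entries strictly before that checkpoint are safe to remove.
    return [version for version, _ in entries if version < last_checkpoint]


entries = [(0, False), (1, False), (2, True), (3, False), (4, True), (5, False)]
print(versions_safe_to_delete(entries, cutoff_version=3))  # [0, 1]
```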

# Related Issue(s)
Fixes delta-io#3692

---------

Signed-off-by: Corwin Joy <corwin.joy@gmail.com>
Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
Co-authored-by: R. Tyler Croy <rtyler@brokenco.de>
Updates the requirements on [sqlparser](https://github.com/apache/datafusion-sqlparser-rs) to permit the latest version.
- [Changelog](https://github.com/apache/datafusion-sqlparser-rs/blob/main/CHANGELOG.md)
- [Commits](apache/datafusion-sqlparser-rs@v0.56.0...v0.59.0)

---
updated-dependencies:
- dependency-name: sqlparser
  dependency-version: 0.59.0
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
…lta-io#3793)

# Description
Add a step to clean the cache entries of GitHub actions:
- Integrate cleanup-cache job into dev_pr.yml workflow  
- Delete up to 100 cache entries per closed PR targeting main
- Prevent cache thrashing with proper error handling and logging

# Documentation
-
https://docs.github.com/en/actions/how-tos/manage-workflow-runs/manage-caches#force-deleting-cache-entries

Signed-off-by: Florian Valeye <florian.valeye@gmail.com>
Co-authored-by: R. Tyler Croy <rtyler@brokenco.de>
Updates the requirements on [foyer](https://github.com/foyer-rs/foyer) to permit the latest version.
- [Release notes](https://github.com/foyer-rs/foyer/releases)
- [Changelog](https://github.com/foyer-rs/foyer/blob/main/CHANGELOG.md)
- [Commits](foyer-rs/foyer@v0.17.2...v0.20.0)

---
updated-dependencies:
- dependency-name: foyer
  dependency-version: 0.20.0
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
adampolomski and others added 27 commits February 16, 2026 12:59
…ing execution

Remove cache calls on large DataFrames while keeping small result caching:
- Keep conflicts_df cache (small: join keys + file paths only)
- Remove implicit materializations from target_df, filtered_target_df, non_conflicting_target, and result_df
- All large DataFrames now use lazy streaming execution
- Add schema normalization (cast Dictionary to Utf8) for file path column to fix compatibility
- Add helper method find_conflicts_keys_only for clean anti-join logic

Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
The target_df parameter was not used in the function body - it only selects
keys from self.source. Removed the parameter and updated the call site.

Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
… detection

Instead of caching the conflicts DataFrame to work around DataFusion's Dictionary
encoding schema mismatch, implement manual join logic:
- Collect target DataFrame with join keys + file paths (small result)
- Collect distinct source join keys (small result)
- Perform join in memory using HashSet for efficient lookup
- Extract file paths that have matching keys

This avoids materializing large DataFrames while still handling the schema
inconsistency by working entirely in memory on small, already-collected data.

Memory impact: Only materializes join keys + file paths (one row per conflicting
file), not full row data. Much more efficient than caching full DataFrames.

Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
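
A minimal sketch of that in-memory matching step (hypothetical shapes for the
already-collected data, not the delta-rs internals):

```python
def conflicting_file_paths(target_rows, source_keys):
    """target_rows: (join_key, file_path) pairs collected from the target;
    source_keys: distinct join keys collected from the source."""
    key_set = set(source_keys)  # HashSet-style lookup for efficient matching
    # Any target file that contains at least one matching key is a conflict.
    return sorted({path for key, path in target_rows if key in key_set})


target_rows = [(1, "part-0.parquet"), (2, "part-0.parquet"), (3, "part-1.parquet")]
print(conflicting_file_paths(target_rows, source_keys=[2, 7]))  # ['part-0.parquet']
```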
The previous approach incorrectly materialized the entire target DataFrame which
could be billions of rows. The corrected approach:

1. Keeps target_df and source lazy (not materialized)
2. Performs inner join in DataFusion (lazy operation)
3. Selects only minimal columns (join keys + file path, not full rows)
4. Collects ONLY the join result which is small (only conflicting rows)

Memory footprint: For a table with billions of rows but only thousands of
conflicts, we materialize only thousands of rows with minimal columns, not
billions of full rows.

The join result is inherently small because it contains only rows where join
keys match between source and target (actual conflicts).

Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
…rics

- Changed extract_conflicting_filenames to extract_conflicts_dataframe to return a DataFrame
- Added extract_file_paths_from_conflicts to extract file paths from the cached DataFrame
- Cache the conflicts DataFrame for reuse in multiple places
- Added num_conflicting_records field to UpsertMetrics
- Count and report conflicting records in metrics
- Updated tests to verify num_conflicting_records metric

Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
Co-authored-by: adampolomski <10196659+adampolomski@users.noreply.github.com>
