From 8d0fc59131ef13988c9ac7a1000813ef10d95600 Mon Sep 17 00:00:00 2001 From: Michael Buntarman Date: Wed, 6 May 2026 23:12:57 +0700 Subject: [PATCH] docs: view backfill example for streams with revisions --- examples/revision_backfill_example/README.md | 82 ++++++++ examples/revision_backfill_example/main.py | 185 +++++++++++++++++++ 2 files changed, 267 insertions(+) create mode 100644 examples/revision_backfill_example/README.md create mode 100644 examples/revision_backfill_example/main.py diff --git a/examples/revision_backfill_example/README.md b/examples/revision_backfill_example/README.md new file mode 100644 index 0000000..c6558e2 --- /dev/null +++ b/examples/revision_backfill_example/README.md @@ -0,0 +1,82 @@ +# Revision Backfill Example + +Demonstrates how to backfill a primitive stream when the source data +contains multiple revisions for the same date — including reverts +(value A → value B → value A). + +## When you need this + +Steady-state ingest (one new value per date) does not need this +pattern. You need it on the **first/historical backfill** of a stream +whose source emits corrections, e.g. data that was reported, then +corrected, then re-corrected. To preserve every revision so that +`frozen_at` queries return the right value at each point in time, +each revision must land in its own block. + +## Why one big batch doesn't work + +TN auto-assigns `created_at` from the block height of the writing +transaction. Every row in one `insert_records` call shares the same +height. That has two consequences for same-date duplicates in one +batch: + +- They cannot be distinguished by `frozen_at` (same height = same + revision slot). +- Depending on the deployed schema, they may also collide on the + primary key (`stream`, `event_time`, `created_at`). + +Splitting same-date rows across separate transactions gives each one +a distinct `created_at`, which is what the frozen-revision model is +built around. + +## The pattern + +After your normal "fetch from API → fetch existing → reconcile (drop +consecutive duplicates) → sort by date asc" step, the result may +contain >1 row per date. Group those into **revision slots**: + +- Slot 0: first occurrence of each date +- Slot 1: second occurrence +- Slot N: Nth occurrence + +Submit one `insert_records` call per slot. Number of calls equals the +max revisions-per-date in the batch (typically 2-3, not one per row), +so the cost is bounded. + +```python +def split_into_revision_slots(records): + pending = list(records) + slots = [] + while pending: + seen = set() + slot, leftover = [], [] + for r in pending: + (slot if r["date"] not in seen else leftover).append(r) + seen.add(r["date"]) + slots.append(slot) + pending = leftover + return slots + +for slot in split_into_revision_slots(reconciled_rows): + client.insert_records(stream_id, slot, wait=True) +``` + +`wait=True` matters — each slot must land before the next is +submitted, so the chain mints a fresh block (and therefore a fresh +`created_at`) per slot. + +## Running + +```bash +# from the node repo +task single:start +task action:migrate:dev + +# from this repo +.venv/bin/python examples/revision_backfill_example/main.py +``` + +The example deploys a fresh stream, backfills 7 rows across 4 dates +(one date carries a revert from 2.0 → 2.5 → 2.0), then queries each +historical revision via `frozen_at` to prove all of them are +reachable. diff --git a/examples/revision_backfill_example/main.py b/examples/revision_backfill_example/main.py new file mode 100644 index 0000000..aec2b56 --- /dev/null +++ b/examples/revision_backfill_example/main.py @@ -0,0 +1,185 @@ +""" +Revision-aware backfill example — multiple revisions per date, same stream. + +Use case +-------- +Some sources emit a sequence of corrections for the same date — e.g. +"2024-02-01 was originally reported as 2.0, corrected to 2.5, then +re-corrected back to 2.0." For historical / `frozen_at` queries to +return the right value at each point in time, every revision must +land on TN as its own row with its own `created_at` (block height). + +The catch +--------- +TN auto-assigns `created_at` from the block height of the transaction +that wrote the row. All rows in one `insert_records` call share the +same height, so two rows for the same date in one batch cannot be +told apart by `frozen_at` — and depending on the deployed schema may +also collide on the primary key. The fix is to spread same-date rows +across separate transactions so each lands in its own block. + +Pattern +------- +After your normal "reconcile + sort by date asc" step, the result may +contain >1 row per date when the source has revisions. Group those +rows into **revision slots**: slot 0 = first occurrence of each date, +slot 1 = second occurrence, and so on. Submit one `insert_records` +call per slot. Number of calls = max revisions per date in the batch +(usually 2-3, not one per row). + +``` +while pending: + batch = pick first row per unique date from pending + insert_records(batch) + pending -= batch +``` + +Run against a local node spun up with `task single:start` from the +node repo. The dev key already has system:network_writer (granted by +`task action:migrate:dev`). +""" + +from collections import OrderedDict +from datetime import datetime, timezone + +from trufnetwork_sdk_py import STREAM_TYPE_PRIMITIVE, TNClient +from trufnetwork_sdk_py.utils import generate_stream_id + +TEST_PRIVATE_KEY = "0000000000000000000000000000000000000000000000000000000000000001" +TEST_PROVIDER_URL = "http://localhost:8484" + + +def _date_to_unix(d: datetime) -> int: + return int(d.replace(tzinfo=timezone.utc).timestamp()) + + +def _safe_drop(client: TNClient, stream_id: str) -> None: + try: + tx = client.destroy_stream(stream_id) + client.wait_for_tx(tx) + print(f" dropped existing stream (tx {tx[:10]}...)") + except Exception as e: + print(f" (no existing stream to drop: {e})") + + +def split_into_revision_slots(records: list[dict]) -> list[list[dict]]: + """ + Group records so each slot contains at most one row per date. + + Slot 0 holds the first occurrence of each date, slot 1 the second, + and so on. Records keep their original order within each slot, so + inserting slot N after slot N-1 preserves revision ordering for + each date. + + Returns [] if `records` is empty. + """ + pending = list(records) + slots: list[list[dict]] = [] + while pending: + seen_dates: set[int] = set() + slot: list[dict] = [] + leftover: list[dict] = [] + for r in pending: + if r["date"] in seen_dates: + leftover.append(r) + else: + seen_dates.add(r["date"]) + slot.append(r) + slots.append(slot) + pending = leftover + return slots + + +def main() -> None: + client = TNClient(TEST_PROVIDER_URL, TEST_PRIVATE_KEY) + print(f"connected as {client.get_current_account()}") + + stream_id = generate_stream_id("revision-backfill-example") + print(f"stream id: {stream_id}") + _safe_drop(client, stream_id) + + deploy_tx = client.deploy_stream(stream_id, STREAM_TYPE_PRIMITIVE) + client.wait_for_tx(deploy_tx) + print(f"stream deployed (tx {deploy_tx[:10]}...)\n") + + try: + # Simulated source data after your normal reconcile step. Two + # of the dates carry corrections; one is a revert (value 2.0 + # appears as both the first and the third revision for Feb 2). + d1 = datetime(2024, 2, 1) + d2 = datetime(2024, 2, 2) + d3 = datetime(2024, 2, 3) + d4 = datetime(2024, 2, 4) + source = [ + {"date": _date_to_unix(d1), "value": 1.0}, + {"date": _date_to_unix(d2), "value": 2.0}, + {"date": _date_to_unix(d2), "value": 2.5}, + {"date": _date_to_unix(d2), "value": 2.0}, + {"date": _date_to_unix(d3), "value": 3.0}, + {"date": _date_to_unix(d3), "value": 3.5}, + {"date": _date_to_unix(d4), "value": 4.0}, + ] + print(f"source: {len(source)} rows across 4 unique dates") + + # Group into revision slots and submit one tx per slot. + slots = split_into_revision_slots(source) + print(f"split into {len(slots)} slot(s) of size " + f"{[len(s) for s in slots]}\n") + + # Track block height per (date, value) so we can prove the + # frozen_at semantics work. In a real backfill you would not + # need this — it's purely for the demo at the end. + revision_heights: list[tuple[int, float, int]] = [] + for slot_idx, slot in enumerate(slots): + tx = client.insert_records(stream_id, slot, wait=True) + evt = client.get_transaction_event(tx) + height = evt["block_height"] + print(f"slot {slot_idx}: inserted {len(slot)} row(s) " + f"at block {height} (tx {tx[:10]}...)") + for r in slot: + revision_heights.append((r["date"], r["value"], height)) + + # Read back without frozen_at — should see the LATEST revision + # per date (highest created_at), so 4 rows with the final values. + print("\nlatest snapshot (no frozen_at):") + latest = client.get_records( + stream_id=stream_id, + data_provider=client.get_current_account(), + date_from=_date_to_unix(d1), + use_cache=False, + ).data + for r in latest: + print(f" date={_unix_to_iso(int(r['EventTime']))} " + f"value={r['Value']}") + + # Walk each revision and query with frozen_at = its block + # height to prove every step in the history is reachable. + print("\nrevision-by-revision (frozen_at per block):") + by_date: OrderedDict[int, list[tuple[float, int]]] = OrderedDict() + for d, v, h in revision_heights: + by_date.setdefault(d, []).append((v, h)) + for date_unix, revs in by_date.items(): + print(f" {_unix_to_iso(date_unix)}: " + f"{len(revs)} revision(s)") + for v, h in revs: + got = client.get_records( + stream_id=stream_id, + data_provider=client.get_current_account(), + date_from=date_unix, + date_to=date_unix, + frozen_at=h, + use_cache=False, + ).data + got_value = float(got[0]["Value"]) if got else None + ok = "OK" if got_value == v else "MISMATCH" + print(f" frozen_at={h}: stored={v} read={got_value} [{ok}]") + finally: + _safe_drop(client, stream_id) + + +def _unix_to_iso(ts: int) -> str: + return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d") + + +if __name__ == "__main__": + main()