Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ Argus is a .NET library that turns the Cardano blockchain into structured, query
- 🧩 **Customizable reducers** — define exactly how blockchain data is processed and stored.
- 🗄️ **Storage-agnostic** — ship on **PostgreSQL** (Entity Framework Core) or **MongoDB** out of the box; add your own backend behind one interface.
- 🔌 **Flexible connectivity** — connect to a node via Unix socket (N2C), TCP (N2N), or gRPC/UtxoRPC (U5C).
- ⚡ **Channel pipeline** — reducers run as a `System.Threading.Channels` graph, decoupling per-block latency from dependency depth.
- ⚡ **Batched commits** — a root and all its dependents run as one sequential graph into a single unit of work; one transaction (one fsync) covers a whole batch of blocks, so throughput is bound by your reducer logic, not by per-block durability.
- 🔗 **Reducer dependencies** — declare `[DependsOn(...)]` and a dependent sees its parent's writes in-process, before they're even committed.
- 🚀 **Pipelined N2N** — node-to-node chain-sync keeps many requests in flight and batches block-fetch, so a remote peer is no longer the bottleneck.
- 🔄 **Robust rollback handling** — chain reorganizations and operator-initiated rewinds are first-class.
- 🛡️ **Crash-safe & single-writer** — data and checkpoint commit in one transaction; a per-database lock prevents two indexers from clobbering each other.
- 📊 **Built-in dashboard** — track sync progress in the terminal.
Expand All @@ -56,20 +57,20 @@ Argus is a .NET library that turns the Cardano blockchain into structured, query
| --------- | ---- |
| **Chain Provider** | Connects to a Cardano node and streams roll-forward / roll-backward events (`N2CProvider`, `U5CProvider`, `N2NProvider`). |
| **`CardanoIndexWorker`** | The hosted service that drives synchronization: builds the reducer dependency graph, manages connections, and feeds blocks into the pipeline. |
| **`ReducerPipeline`** | A `System.Threading.Channels` graph mirroring your `[DependsOn]` declarations. Each reducer has a bounded inbox and its own run-loop; backpressure is automatic. |
| **`ReducerGraphProcessor`** | One per root reducer. Runs the root and all its dependents in topological order (parents first) through a bounded `System.Threading.Channels` inbox, accumulating blocks into a batch. Backpressure is automatic. |
| **`IReducer`** | Your transformation logic — `RollForwardAsync` / `RollBackwardAsync`. |
| **`IBlockUnitOfWork`** | A framework-managed, per-branch transactional unit. Reducers register writes against it; the framework commits data **and** the sync checkpoint atomically. |
| **`IBlockUnitOfWork`** | A framework-managed transactional unit shared by the whole graph for one batch. Reducers register writes against it; the framework commits all of them **and** every reducer's checkpoint together, atomically, when a batch trigger fires. |
| **`IBlockUnitOfWorkFactory`** | The storage-backend seam. One implementation per backend (EF/Postgres, Mongo, …). |
| **Single-instance lock** | Guarantees exactly one active indexer per database (Postgres advisory lock / Mongo lease). |

**A few design points worth knowing up front:**

- **The framework owns commit timing.** Reducers never call `SaveChangesAsync` (or a Mongo equivalent). You register writes through the unit of work; Argus commits your data and the reducer's checkpoint together, in one transaction, once per block per dependency branch. If anything throws, the whole branch rolls back — no partial writes.
- **Dependents read their parent's pending writes.** Within one branch the unit of work shares a single storage handle, so a dependent reducer can see what its parent just wrote via the change-tracker's `Local` view — no DB round-trip, no stale read.
- **The framework owns commit timing.** Reducers never call `SaveChangesAsync` (or a Mongo equivalent). You register writes through the unit of work; Argus commits your data and every reducer's checkpoint together, in one transaction, once per **batch**. A batch closes when it fills (`Sync:Commit:BatchSize`, default 500), ages out (`Sync:Commit:MaxDelayMs`, default 1000), or the inbox drains at the chain tip — so a single fsync covers many blocks while you never lag the tip. If anything throws, the **whole open batch** rolls back — no partial writes.
- **Dependents read their parent's pending writes.** Across the graph the unit of work shares a single storage handle, so a dependent reducer can see what its parent just wrote via the change-tracker's `Local` view — no DB round-trip, no stale read.
- **Recovery is fail-fast + restart.** Argus does not retry database faults in-process. On an unrecoverable error it stops the host; your supervisor (systemd, Kubernetes, `docker restart`) restarts the process, which resumes from the last committed checkpoint and replays. Because data and checkpoint are committed together, replay is at-least-once and idempotent.

<div align="center">
<img src="assets/argus_architecture.png" alt="Argus Architecture" width="100%" />
<img src="assets/argus_architecture.svg" alt="How Argus indexes a block: a Cardano node streams blocks through CardanoIndexWorker into one batched reducer graph per root (root with nested and fan-out dependents), committing atomically to PostgreSQL or MongoDB" width="100%" />
</div>

## 🚀 Getting Started
Expand Down Expand Up @@ -245,7 +246,7 @@ app.MapGet("/api/blocks/latest", async (IDbContextFactory<MyDbContext> dbf) =>

## 🔗 Reducer Dependencies

A reducer can declare a single dependency with `[DependsOn]`. Argus builds a dependency graph at startup, gives **root** reducers (no dependencies) the chain connections, and forwards blocks down to dependents through the pipeline.
A reducer can declare a single dependency with `[DependsOn]`. Argus builds a dependency graph at startup, gives **root** reducers (no dependencies) the chain connections, and runs each block through the root and its dependents in topological order.

```csharp
[DependsOn(typeof(BlockReducer))]
Expand All @@ -256,7 +257,7 @@ public class TransactionReducer : IReducer
MyDbContext db = uow.GetStorage<MyDbContext>();
ulong slot = block.Header().HeaderBody().Slot();

// BlockReducer ran first in this branch. Its pending Add() is visible here
// BlockReducer ran earlier in the graph. Its pending Add() is visible here
// via the change-tracker's Local view — before it's committed to the DB.
bool parentWroteThisBlock = db.Blocks.Local.Any(b => b.Slot == slot);
// ... your read-modify-write logic ...
Expand All @@ -272,9 +273,9 @@ public class TransactionReducer : IReducer

- **Single dependency per reducer** (prevents the diamond problem); circular dependencies are rejected at startup.
- **Only root reducers open chain connections** — fewer connections, less node load.
- **Linear chains share one unit of work**, so a dependent reads its parent's uncommitted writes from `Local`. At a **fork** (one parent, multiple dependents) the parent commits first and each child gets a fresh unit of work, reading the parent's now-committed data from the database.
- **The whole graph shares one unit of work per batch.** Every reducer — root and dependents, linear chains and fan-out siblings alike — runs in topological order into the same unit of work, so any dependent reads its parent's uncommitted writes from `Local` (no DB round-trip, no stale read). There is no separate "fork" code path.
- **Start points auto-adjust**: a fresh dependent of an already-synced parent begins at the parent's position instead of replaying from genesis.
- **Atomicity is per-branch, not across a fork.** Each branch commits in its own transaction; a fork child failing does not roll back a sibling or the parent that already committed.
- **Atomicity is whole-graph.** The entire graph commits in one transaction per batch; if any reducer throws, the whole open batch rolls back across **every** reducer — a sibling's writes never survive a crash that the parent or another sibling didn't.

## 💾 Storage Backends

Expand Down Expand Up @@ -374,7 +375,10 @@ On the next start every reducer rewinds to the global `Hash`/`Slot`; you can ove
| `CardanoNodeConnection:MaxRollbackSlots` | `10000` | Maximum automatic rollback depth. |
| `CardanoNodeConnection:RollbackBuffer` | `10` | Recent intersections retained per reducer. |
| `CardanoIndexReducers:ActiveReducers` | (all) | Allow-list of reducer class names to run. |
| `Sync:Pipeline:ChannelCapacity` | `256` | Bounded inbox size per reducer (backpressure). |
| `Sync:Pipeline:ChannelCapacity` | `256` | Bounded inbox size per reducer graph (backpressure). |
| `Sync:Commit:BatchSize` | `500` | Max blocks committed per transaction — one fsync per batch. |
| `Sync:Commit:MaxDelayMs` | `1000` | Max time (ms) a batch stays open before committing, even if not full. |
| `CardanoNodeConnection:TCP:PipelineDepth` | `100` | Max in-flight N2N chain-sync requests while catching up (pipelining). |
| `Sync:SingleInstanceLock:Enabled` | `true` | Enforce one active indexer per database. |
| `Sync:Rollback:Enabled` | `false` | Operator rollback mode (see [Rollbacks](#-rollbacks)). |
| `Sync:Worker:ExitOnCompletion` | `true` | Exit the process when sync reaches tip (set `false` in tests). |
Expand All @@ -400,7 +404,7 @@ dotnet pack src/Argus.Sync --configuration Release
dotnet pack src/Argus.Sync.MongoDb --configuration Release
```

Integration tests run against a real preprod/preview node and a local PostgreSQL (and, for the Mongo suite, a MongoDB replica set); they self-skip when those aren't reachable. The end-to-end suite under `src/Argus.Sync.Tests/EndToEnd` exercises the worker, the dependency pipeline, per-branch atomicity, crash-recovery, the single-instance lock, and both storage backends against real Conway-era blocks.
Integration tests run against a real preprod/preview node and a local PostgreSQL (and, for the Mongo suite, a MongoDB replica set); they self-skip when those aren't reachable. The end-to-end suite under `src/Argus.Sync.Tests/EndToEnd` exercises the worker, the dependency graph, whole-graph atomicity, batch commits, crash-recovery, N2N pipelining, the single-instance lock, and both storage backends against real Conway-era blocks.

## 📂 Project Layout

Expand All @@ -411,7 +415,11 @@ Integration tests run against a real preprod/preview node and a local PostgreSQL
| `src/Argus.Sync.Example` | Runnable reference app with example models and reducers. |
| `src/Argus.Sync.Tests` | Unit + end-to-end tests. |

## 🔼 Migrating from v0.x (pre-rearchitecture)
## 🔼 Migrating

> **1.1 → 1.2:** no code changes — the reducer contract is identical. Batched whole-graph commits and pipelined N2N apply automatically. Optionally tune `Sync:Commit:BatchSize` (500), `Sync:Commit:MaxDelayMs` (1000), and `CardanoNodeConnection:TCP:PipelineDepth` (100). Durability is now per-batch: after a hard crash, up to `BatchSize` blocks replay from the last committed checkpoint (idempotent — data and checkpoint commit together).

### From v0.x (pre-rearchitecture)

The rearchitecture — channel pipeline, storage-agnostic unit of work, and the package split — is a major version with breaking changes. The mapping:

Expand Down
72 changes: 72 additions & 0 deletions assets/argus_architecture.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Loading