diff --git a/README.md b/README.md
index 6e906e7..dbbde24 100644
--- a/README.md
+++ b/README.md
@@ -44,8 +44,9 @@ Argus is a .NET library that turns the Cardano blockchain into structured, query
- ๐งฉ **Customizable reducers** โ define exactly how blockchain data is processed and stored.
- ๐๏ธ **Storage-agnostic** โ ship on **PostgreSQL** (Entity Framework Core) or **MongoDB** out of the box; add your own backend behind one interface.
- ๐ **Flexible connectivity** โ connect to a node via Unix socket (N2C), TCP (N2N), or gRPC/UtxoRPC (U5C).
-- โก **Channel pipeline** โ reducers run as a `System.Threading.Channels` graph, decoupling per-block latency from dependency depth.
+- โก **Batched commits** โ a root and all its dependents run as one sequential graph into a single unit of work; one transaction (one fsync) covers a whole batch of blocks, so throughput is bound by your reducer logic, not by per-block durability.
- ๐ **Reducer dependencies** โ declare `[DependsOn(...)]` and a dependent sees its parent's writes in-process, before they're even committed.
+- ๐ **Pipelined N2N** โ node-to-node chain-sync keeps many requests in flight and batches block-fetch, so a remote peer is no longer the bottleneck.
- ๐ **Robust rollback handling** โ chain reorganizations and operator-initiated rewinds are first-class.
- ๐ก๏ธ **Crash-safe & single-writer** โ data and checkpoint commit in one transaction; a per-database lock prevents two indexers from clobbering each other.
- ๐ **Built-in dashboard** โ track sync progress in the terminal.
@@ -56,20 +57,20 @@ Argus is a .NET library that turns the Cardano blockchain into structured, query
| --------- | ---- |
| **Chain Provider** | Connects to a Cardano node and streams roll-forward / roll-backward events (`N2CProvider`, `U5CProvider`, `N2NProvider`). |
| **`CardanoIndexWorker`** | The hosted service that drives synchronization: builds the reducer dependency graph, manages connections, and feeds blocks into the pipeline. |
-| **`ReducerPipeline`** | A `System.Threading.Channels` graph mirroring your `[DependsOn]` declarations. Each reducer has a bounded inbox and its own run-loop; backpressure is automatic. |
+| **`ReducerGraphProcessor`** | One per root reducer. Runs the root and all its dependents in topological order (parents first) through a bounded `System.Threading.Channels` inbox, accumulating blocks into a batch. Backpressure is automatic. |
| **`IReducer`** | Your transformation logic โ `RollForwardAsync` / `RollBackwardAsync`. |
-| **`IBlockUnitOfWork`** | A framework-managed, per-branch transactional unit. Reducers register writes against it; the framework commits data **and** the sync checkpoint atomically. |
+| **`IBlockUnitOfWork`** | A framework-managed transactional unit shared by the whole graph for one batch. Reducers register writes against it; the framework commits all of them **and** every reducer's checkpoint together, atomically, when a batch trigger fires. |
| **`IBlockUnitOfWorkFactory`** | The storage-backend seam. One implementation per backend (EF/Postgres, Mongo, โฆ). |
| **Single-instance lock** | Guarantees exactly one active indexer per database (Postgres advisory lock / Mongo lease). |
**A few design points worth knowing up front:**
-- **The framework owns commit timing.** Reducers never call `SaveChangesAsync` (or a Mongo equivalent). You register writes through the unit of work; Argus commits your data and the reducer's checkpoint together, in one transaction, once per block per dependency branch. If anything throws, the whole branch rolls back โ no partial writes.
-- **Dependents read their parent's pending writes.** Within one branch the unit of work shares a single storage handle, so a dependent reducer can see what its parent just wrote via the change-tracker's `Local` view โ no DB round-trip, no stale read.
+- **The framework owns commit timing.** Reducers never call `SaveChangesAsync` (or a Mongo equivalent). You register writes through the unit of work; Argus commits your data and every reducer's checkpoint together, in one transaction, once per **batch**. A batch closes when it fills (`Sync:Commit:BatchSize`, default 500), ages out (`Sync:Commit:MaxDelayMs`, default 1000), or the inbox drains at the chain tip โ so a single fsync covers many blocks while you never lag the tip. If anything throws, the **whole open batch** rolls back โ no partial writes.
+- **Dependents read their parent's pending writes.** Across the graph the unit of work shares a single storage handle, so a dependent reducer can see what its parent just wrote via the change-tracker's `Local` view โ no DB round-trip, no stale read.
- **Recovery is fail-fast + restart.** Argus does not retry database faults in-process. On an unrecoverable error it stops the host; your supervisor (systemd, Kubernetes, `docker restart`) restarts the process, which resumes from the last committed checkpoint and replays. Because data and checkpoint are committed together, replay is at-least-once and idempotent.
-
+
## ๐ Getting Started
@@ -245,7 +246,7 @@ app.MapGet("/api/blocks/latest", async (IDbContextFactory dbf) =>
## ๐ Reducer Dependencies
-A reducer can declare a single dependency with `[DependsOn]`. Argus builds a dependency graph at startup, gives **root** reducers (no dependencies) the chain connections, and forwards blocks down to dependents through the pipeline.
+A reducer can declare a single dependency with `[DependsOn]`. Argus builds a dependency graph at startup, gives **root** reducers (no dependencies) the chain connections, and runs each block through the root and its dependents in topological order.
```csharp
[DependsOn(typeof(BlockReducer))]
@@ -256,7 +257,7 @@ public class TransactionReducer : IReducer
MyDbContext db = uow.GetStorage();
ulong slot = block.Header().HeaderBody().Slot();
- // BlockReducer ran first in this branch. Its pending Add() is visible here
+ // BlockReducer ran earlier in the graph. Its pending Add() is visible here
// via the change-tracker's Local view โ before it's committed to the DB.
bool parentWroteThisBlock = db.Blocks.Local.Any(b => b.Slot == slot);
// ... your read-modify-write logic ...
@@ -272,9 +273,9 @@ public class TransactionReducer : IReducer
- **Single dependency per reducer** (prevents the diamond problem); circular dependencies are rejected at startup.
- **Only root reducers open chain connections** โ fewer connections, less node load.
-- **Linear chains share one unit of work**, so a dependent reads its parent's uncommitted writes from `Local`. At a **fork** (one parent, multiple dependents) the parent commits first and each child gets a fresh unit of work, reading the parent's now-committed data from the database.
+- **The whole graph shares one unit of work per batch.** Every reducer โ root and dependents, linear chains and fan-out siblings alike โ runs in topological order into the same unit of work, so any dependent reads its parent's uncommitted writes from `Local` (no DB round-trip, no stale read). There is no separate "fork" code path.
- **Start points auto-adjust**: a fresh dependent of an already-synced parent begins at the parent's position instead of replaying from genesis.
-- **Atomicity is per-branch, not across a fork.** Each branch commits in its own transaction; a fork child failing does not roll back a sibling or the parent that already committed.
+- **Atomicity is whole-graph.** The entire graph commits in one transaction per batch; if any reducer throws, the whole open batch rolls back across **every** reducer โ a sibling's writes never survive a crash that the parent or another sibling didn't.
## ๐พ Storage Backends
@@ -374,7 +375,10 @@ On the next start every reducer rewinds to the global `Hash`/`Slot`; you can ove
| `CardanoNodeConnection:MaxRollbackSlots` | `10000` | Maximum automatic rollback depth. |
| `CardanoNodeConnection:RollbackBuffer` | `10` | Recent intersections retained per reducer. |
| `CardanoIndexReducers:ActiveReducers` | (all) | Allow-list of reducer class names to run. |
-| `Sync:Pipeline:ChannelCapacity` | `256` | Bounded inbox size per reducer (backpressure). |
+| `Sync:Pipeline:ChannelCapacity` | `256` | Bounded inbox size per reducer graph (backpressure). |
+| `Sync:Commit:BatchSize` | `500` | Max blocks committed per transaction โ one fsync per batch. |
+| `Sync:Commit:MaxDelayMs` | `1000` | Max time (ms) a batch stays open before committing, even if not full. |
+| `CardanoNodeConnection:TCP:PipelineDepth` | `100` | Max in-flight N2N chain-sync requests while catching up (pipelining). |
| `Sync:SingleInstanceLock:Enabled` | `true` | Enforce one active indexer per database. |
| `Sync:Rollback:Enabled` | `false` | Operator rollback mode (see [Rollbacks](#-rollbacks)). |
| `Sync:Worker:ExitOnCompletion` | `true` | Exit the process when sync reaches tip (set `false` in tests). |
@@ -400,7 +404,7 @@ dotnet pack src/Argus.Sync --configuration Release
dotnet pack src/Argus.Sync.MongoDb --configuration Release
```
-Integration tests run against a real preprod/preview node and a local PostgreSQL (and, for the Mongo suite, a MongoDB replica set); they self-skip when those aren't reachable. The end-to-end suite under `src/Argus.Sync.Tests/EndToEnd` exercises the worker, the dependency pipeline, per-branch atomicity, crash-recovery, the single-instance lock, and both storage backends against real Conway-era blocks.
+Integration tests run against a real preprod/preview node and a local PostgreSQL (and, for the Mongo suite, a MongoDB replica set); they self-skip when those aren't reachable. The end-to-end suite under `src/Argus.Sync.Tests/EndToEnd` exercises the worker, the dependency graph, whole-graph atomicity, batch commits, crash-recovery, N2N pipelining, the single-instance lock, and both storage backends against real Conway-era blocks.
## ๐ Project Layout
@@ -411,7 +415,11 @@ Integration tests run against a real preprod/preview node and a local PostgreSQL
| `src/Argus.Sync.Example` | Runnable reference app with example models and reducers. |
| `src/Argus.Sync.Tests` | Unit + end-to-end tests. |
-## ๐ผ Migrating from v0.x (pre-rearchitecture)
+## ๐ผ Migrating
+
+> **1.1 โ 1.2:** no code changes โ the reducer contract is identical. Batched whole-graph commits and pipelined N2N apply automatically. Optionally tune `Sync:Commit:BatchSize` (500), `Sync:Commit:MaxDelayMs` (1000), and `CardanoNodeConnection:TCP:PipelineDepth` (100). Durability is now per-batch: after a hard crash, up to `BatchSize` blocks replay from the last committed checkpoint (idempotent โ data and checkpoint commit together).
+
+### From v0.x (pre-rearchitecture)
The rearchitecture โ channel pipeline, storage-agnostic unit of work, and the package split โ is a major version with breaking changes. The mapping:
diff --git a/assets/argus_architecture.svg b/assets/argus_architecture.svg
new file mode 100644
index 0000000..40a7081
--- /dev/null
+++ b/assets/argus_architecture.svg
@@ -0,0 +1,72 @@
+
diff --git a/src/Argus.Sync.Tests/EndToEnd/N2NPipeliningLiveTest.cs b/src/Argus.Sync.Tests/EndToEnd/N2NPipeliningLiveTest.cs
new file mode 100644
index 0000000..9fa1bc2
--- /dev/null
+++ b/src/Argus.Sync.Tests/EndToEnd/N2NPipeliningLiveTest.cs
@@ -0,0 +1,162 @@
+using Argus.Sync.Data.Models;
+using Argus.Sync.Providers;
+using Chrysalis.Codec.Extensions.Cardano.Core;
+using Chrysalis.Codec.Extensions.Cardano.Core.Header;
+using Chrysalis.Codec.Types.Cardano.Core;
+using Xunit.Abstractions;
+
+namespace Argus.Sync.Tests.EndToEnd;
+
+///
+/// Live integration tests for the pipelined against a local preprod
+/// node (N2N TCP, default 3001). Complements (intersection rollback
+/// + first blocks) by covering the two paths the pipelining rewrite added:
+///
+/// multi-batch catch-up โ pulling more blocks than one pipeline depth, asserting the drain-then-
+/// fetch batches yield strictly-ascending, gap/duplicate-free full blocks across batch boundaries;
+/// the tip path โ starting at the node's tip so the provider must handle
+/// MessageAwaitReply and then follow newly produced blocks in order, without hanging.
+///
+/// Both self-skip if the N2N port is unreachable.
+///
+public sealed class N2NPipeliningLiveTest(ITestOutputHelper output)
+{
+ private readonly ITestOutputHelper _output = output;
+
+ private const string Host = "localhost";
+ private const int Port = 3001;
+ private const ulong NetworkMagic = 1UL; // preprod
+ private const ulong IntersectionSlot = 126025608UL;
+ private const string IntersectionHash = "7ef942e6a670af6310737e9230b22e11a4bb1af69bed9affb09b1025b371d1cd";
+
+ [Fact]
+ [Trait("Category", "Integration")]
+ public async Task PipelinedCatchUp_AcrossMultipleBatches_StrictlyAscending_NoGapsOrDuplicates()
+ {
+ if (!await IsReachableAsync(Host, Port))
+ {
+ _output.WriteLine($"SKIP: N2N port {Host}:{Port} not reachable.");
+ return;
+ }
+
+ // Depth 50 with a 150-block target guarantees the run spans more than one drain-then-fetch batch,
+ // exercising ordering + de-duplication across batch boundaries.
+ await using N2NProvider provider = new(Host, Port, PipelineDepth: 50);
+ List intersection = [new(IntersectionHash, IntersectionSlot)];
+ using CancellationTokenSource cts = new(TimeSpan.FromSeconds(30));
+
+ const int target = 150;
+ List slots = [];
+ HashSet hashes = [];
+
+ try
+ {
+ await foreach (NextResponse response in provider.StartChainSyncAsync(intersection, NetworkMagic, cts.Token))
+ {
+ if (response.Action != NextResponseAction.RollForward)
+ {
+ continue;
+ }
+ IBlock block = response.Block!;
+ slots.Add(block.Header().HeaderBody().Slot());
+ _ = hashes.Add(block.Header().Hash());
+ _ = block.TransactionBodies(); // body present + decodes (header -> BlockFetch body worked)
+ if (slots.Count >= target)
+ {
+ break;
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ // window elapsed โ handled by the skip below
+ }
+
+ if (slots.Count < target)
+ {
+ _output.WriteLine($"SKIP: only {slots.Count}/{target} blocks streamed (node not synced past the intersection).");
+ return;
+ }
+
+ for (int i = 1; i < slots.Count; i++)
+ {
+ Assert.True(slots[i] > slots[i - 1],
+ $"slot {slots[i]} must be strictly greater than {slots[i - 1]} (chain order, no reorder, no dupes)");
+ }
+ Assert.Equal(slots.Count, hashes.Count); // every block distinct across the batches
+ _output.WriteLine($"Pipelined {slots.Count} blocks, slots {slots[0]}..{slots[^1]} โ ascending + distinct.");
+ }
+
+ [Fact]
+ [Trait("Category", "Integration")]
+ public async Task AtTip_HandlesAwaitReply_AndFollowsNewBlocksInOrder()
+ {
+ if (!await IsReachableAsync(Host, Port))
+ {
+ _output.WriteLine($"SKIP: N2N port {Host}:{Port} not reachable.");
+ return;
+ }
+
+ await using N2NProvider provider = new(Host, Port);
+ Point tip = await provider.GetTipAsync(NetworkMagic);
+ using CancellationTokenSource cts = new(TimeSpan.FromSeconds(90));
+
+ NextResponse? firstRollback = null;
+ List followed = [];
+ ulong previous = tip.Slot;
+
+ try
+ {
+ await foreach (NextResponse response in provider.StartChainSyncAsync([tip], NetworkMagic, cts.Token))
+ {
+ if (response.Action == NextResponseAction.RollBack)
+ {
+ firstRollback ??= response;
+ }
+ else if (response.Action == NextResponseAction.RollForward)
+ {
+ ulong slot = response.Block!.Header().HeaderBody().Slot();
+ Assert.True(slot > previous, $"tip-follow slot {slot} must be greater than the previous {previous}");
+ previous = slot;
+ followed.Add(slot);
+ if (followed.Count >= 2)
+ {
+ break;
+ }
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ // no/slow new blocks within the window โ acceptable (assertions below cover the deterministic part)
+ }
+
+ // Deterministic: starting AT the tip yields the intersection rollback, mapped Exclusive at the tip slot.
+ // Reaching this without a hang already proves the at-tip AwaitReply drain works.
+ Assert.NotNull(firstRollback);
+ Assert.Equal(RollBackType.Exclusive, firstRollback!.RollBackType);
+ Assert.Equal(tip.Slot, firstRollback.RollbackSlot);
+
+ // Best-effort: any blocks preprod produced during the window were past the tip and in order (asserted
+ // in the loop). Zero new blocks within 90s is acceptable โ the tip path was still exercised.
+ _output.WriteLine(followed.Count > 0
+ ? $"At tip {tip.Slot}: followed {followed.Count} new block(s) in order: {string.Join(", ", followed)}."
+ : $"At tip {tip.Slot}: AwaitReply handled cleanly, no new block within 90s (acceptable).");
+ }
+
+ private static async Task IsReachableAsync(string host, int port)
+ {
+ try
+ {
+ using System.Net.Sockets.TcpClient client = new();
+ using CancellationTokenSource cts = new(TimeSpan.FromSeconds(2));
+ await client.ConnectAsync(host, port, cts.Token);
+ return client.Connected;
+ }
+ catch
+ {
+ // N2N port not listening -> the test skips.
+ return false;
+ }
+ }
+}
diff --git a/src/Argus.Sync.Tests/Unit/AdaptivePipelineDepthTest.cs b/src/Argus.Sync.Tests/Unit/AdaptivePipelineDepthTest.cs
new file mode 100644
index 0000000..9acd83d
--- /dev/null
+++ b/src/Argus.Sync.Tests/Unit/AdaptivePipelineDepthTest.cs
@@ -0,0 +1,46 @@
+using Argus.Sync.Providers;
+
+namespace Argus.Sync.Tests.Unit;
+
+///
+/// Pins โ the function that scales the N2N chain-sync
+/// pipeline depth by the slot-gap to the node's tip. Pure, no node required (CI-runnable). It must
+/// collapse to 1 at the tip (so we never over-request), grow with the gap, never decrease as the gap
+/// grows, and never exceed the configured maximum.
+///
+public class AdaptivePipelineDepthTest
+{
+ [Theory]
+ [InlineData(0UL, 1)] // at the tip -> a single in-flight request
+ [InlineData(4UL, 1)]
+ [InlineData(20UL, 2)]
+ [InlineData(100UL, 5)]
+ [InlineData(500UL, 20)]
+ [InlineData(2_000UL, 100)]
+ public void MapsTipGapToDepth_UnderAMaxOf100(ulong tipGap, int expected)
+ => Assert.Equal(expected, N2NProvider.AdaptivePipelineDepth(maxDepth: 100, tipGap));
+
+ [Fact]
+ public void FarFromTip_ClampsToTheConfiguredMax()
+ {
+ Assert.Equal(100, N2NProvider.AdaptivePipelineDepth(maxDepth: 100, tipGap: 1_000_000));
+ Assert.Equal(50, N2NProvider.AdaptivePipelineDepth(maxDepth: 50, tipGap: 1_000_000));
+ Assert.Equal(500, N2NProvider.AdaptivePipelineDepth(maxDepth: 1_000, tipGap: 10_000));
+ }
+
+ [Fact]
+ public void NeverDecreasesAsTheGapGrows()
+ {
+ int previous = 0;
+ foreach (ulong gap in new ulong[] { 0, 4, 20, 100, 500, 2_000, 10_000, 50_000, 1_000_000 })
+ {
+ int depth = N2NProvider.AdaptivePipelineDepth(maxDepth: 1_000, tipGap: gap);
+ Assert.True(depth >= previous, $"depth must be monotonic non-decreasing in the gap (gap {gap} gave {depth} < {previous})");
+ previous = depth;
+ }
+ }
+
+ [Fact]
+ public void NeverBelowOne_EvenWithATinyMax()
+ => Assert.Equal(1, N2NProvider.AdaptivePipelineDepth(maxDepth: 1, tipGap: 1_000_000));
+}
diff --git a/src/Argus.Sync.Tests/Unit/ReducerGraphBatchCommitTest.cs b/src/Argus.Sync.Tests/Unit/ReducerGraphBatchCommitTest.cs
new file mode 100644
index 0000000..93111e6
--- /dev/null
+++ b/src/Argus.Sync.Tests/Unit/ReducerGraphBatchCommitTest.cs
@@ -0,0 +1,184 @@
+using Argus.Sync.Data.Models;
+using Argus.Sync.Reducers;
+using Argus.Sync.Tests.Mocks;
+using Argus.Sync.Workers;
+using Chrysalis.Codec.Extensions.Cardano.Core;
+using Chrysalis.Codec.Extensions.Cardano.Core.Header;
+using Microsoft.Extensions.Logging.Abstractions;
+using Xunit.Abstractions;
+using IBlock = Chrysalis.Codec.Types.Cardano.Core.IBlock;
+
+namespace Argus.Sync.Tests.Unit;
+
+///
+/// Drives directly against the committed TestData blocks with an
+/// in-memory unit of work โ no node, no database (CI-runnable). Pre-loading the inbox before
+/// RunAsync makes the inbox-drained trigger deterministic, so we can pin the two batch-commit
+/// behaviors the 1.2 architecture introduced: a fault rolls back the WHOLE open batch (not just the
+/// faulting block), and a partial batch commits as soon as the inbox drains (the at-tip trigger),
+/// without waiting for the size or delay triggers.
+///
+public sealed class ReducerGraphBatchCommitTest(ITestOutputHelper output)
+{
+ private readonly ITestOutputHelper _output = output;
+
+ [Fact]
+ public async Task FaultMidBatch_RollsBackTheWholeOpenBatch_NothingCommits()
+ {
+ IBlock[] blocks = LoadBlocks(3);
+ if (blocks.Length < 3)
+ {
+ _output.WriteLine("SKIP: TestData/Blocks not present.");
+ return;
+ }
+ ulong crashSlot = blocks[2].Header().HeaderBody().Slot();
+
+ RecordingBackend backend = new();
+ ReducerGraphProcessor processor = new(
+ [new StagingReducer(), new CrashOnSlotReducer(crashSlot)],
+ new FakeUnitOfWorkFactory(backend),
+ channelCapacity: 64,
+ batchSize: 10, // > 3 blocks, so only the fault can end the batch
+ maxBatchDelay: TimeSpan.FromMinutes(10), // delay trigger cannot fire in a ms-long test
+ NullLogger.Instance);
+
+ // Pre-load all three so the crashing (third) block shares ONE open batch with the first two
+ // (the inbox is never empty until the crash, so the drained trigger can't commit them early).
+ foreach (IBlock block in blocks)
+ {
+ await processor.EnqueueAsync(new NextResponse(NextResponseAction.RollForward, null, block), CancellationToken.None);
+ }
+ processor.Complete();
+
+ _ = await Assert.ThrowsAnyAsync(() => processor.RunAsync(CancellationToken.None));
+
+ // Whole-batch atomicity: the fault discarded the open batch โ none of the three blocks survived,
+ // including the two that ran cleanly before the crash.
+ Assert.Empty(backend.Committed);
+ }
+
+ [Fact]
+ public async Task DrainAtTip_CommitsAPartialBatch_WithoutWaitingForSizeOrDelay()
+ {
+ IBlock[] blocks = LoadBlocks(2);
+ if (blocks.Length < 2)
+ {
+ _output.WriteLine("SKIP: TestData/Blocks not present.");
+ return;
+ }
+
+ RecordingBackend backend = new();
+ ReducerGraphProcessor processor = new(
+ [new StagingReducer()],
+ new FakeUnitOfWorkFactory(backend),
+ channelCapacity: 64,
+ batchSize: 500, // far more than 2 โ the size trigger cannot fire
+ maxBatchDelay: TimeSpan.FromMinutes(10), // nor the delay trigger
+ NullLogger.Instance);
+
+ foreach (IBlock block in blocks)
+ {
+ await processor.EnqueueAsync(new NextResponse(NextResponseAction.RollForward, null, block), CancellationToken.None);
+ }
+ processor.Complete();
+ await processor.RunAsync(CancellationToken.None);
+
+ // Only the inbox-drained (at-tip) trigger could have committed โ both blocks landed promptly,
+ // in chain order, despite a batch size of 500.
+ ulong[] expected = [.. blocks.Select(b => b.Header().HeaderBody().Slot())];
+ Assert.Equal(expected, backend.Committed);
+ }
+
+ private static IBlock[] LoadBlocks(int count)
+ {
+ string testDataDir = Path.Combine(Directory.GetCurrentDirectory(), "TestData");
+ if (!Directory.Exists(Path.Combine(testDataDir, "Blocks")))
+ {
+ return [];
+ }
+ MockChainSyncProvider probe = new(testDataDir);
+ return [.. probe.AvailableBlocks.Take(count)];
+ }
+
+ // ----- In-memory unit of work: reducers stage slots; a commit moves them to the durable record,
+ // a rollback drops them. FlushAsync stays the default no-op so staged writes accumulate
+ // across the blocks of an open batch (read-your-own-writes), exactly like the real backends. -----
+
+ private sealed class RecordingBackend
+ {
+ public List Committed { get; } = [];
+ }
+
+ private sealed class TestStore
+ {
+ public List Staged { get; } = [];
+ }
+
+ private sealed class FakeUnitOfWorkFactory(RecordingBackend backend) : IBlockUnitOfWorkFactory
+ {
+ public Task CreateAsync(CancellationToken ct = default)
+ => Task.FromResult(new FakeUnitOfWork(backend));
+
+ public Task GetReducerStateAsync(string reducerName, CancellationToken ct = default)
+ => Task.FromResult(null);
+ }
+
+ private sealed class FakeUnitOfWork(RecordingBackend backend) : IBlockUnitOfWork
+ {
+ private readonly TestStore _store = new();
+ private readonly Dictionary _intersections = [];
+ private bool _marked;
+
+ public T GetStorage() where T : class
+ => _store as T ?? throw new InvalidCastException(typeof(T).Name);
+
+ public void TrackIntersection(string reducerName, Point point) => _intersections[reducerName] = point;
+
+ public void TrackRollback(string reducerName, ulong rollbackSlot) { }
+
+ public void MarkDataChanged() => _marked = true;
+
+ public IReadOnlyDictionary TrackedIntersections => _intersections;
+
+ public Task CommitAsync(bool deferIfEmpty = false, CancellationToken ct = default)
+ {
+ if (deferIfEmpty && _store.Staged.Count == 0 && !_marked)
+ {
+ return Task.FromResult(false); // empty batch โ defer (no durable write)
+ }
+ backend.Committed.AddRange(_store.Staged);
+ _store.Staged.Clear();
+ _marked = false;
+ return Task.FromResult(true);
+ }
+
+ public Task RollbackAsync(CancellationToken ct = default)
+ {
+ _store.Staged.Clear();
+ return Task.CompletedTask;
+ }
+
+ public ValueTask DisposeAsync() => ValueTask.CompletedTask;
+ }
+
+ private sealed class StagingReducer : IReducer
+ {
+ public Task RollForwardAsync(IBlock block, IBlockUnitOfWork uow, CancellationToken ct)
+ {
+ uow.GetStorage().Staged.Add(block.Header().HeaderBody().Slot());
+ return Task.CompletedTask;
+ }
+
+ public Task RollBackwardAsync(ulong slot, IBlockUnitOfWork uow, CancellationToken ct) => Task.CompletedTask;
+ }
+
+ private sealed class CrashOnSlotReducer(ulong crashSlot) : IReducer
+ {
+ public Task RollForwardAsync(IBlock block, IBlockUnitOfWork uow, CancellationToken ct)
+ => block.Header().HeaderBody().Slot() == crashSlot
+ ? throw new InvalidOperationException($"intentional crash at slot {crashSlot}")
+ : Task.CompletedTask;
+
+ public Task RollBackwardAsync(ulong slot, IBlockUnitOfWork uow, CancellationToken ct) => Task.CompletedTask;
+ }
+}
diff --git a/src/Argus.Sync/Providers/N2NProvider.cs b/src/Argus.Sync/Providers/N2NProvider.cs
index 3e89654..8f1f173 100644
--- a/src/Argus.Sync/Providers/N2NProvider.cs
+++ b/src/Argus.Sync/Providers/N2NProvider.cs
@@ -259,8 +259,8 @@ private static async IAsyncEnumerable DrainForwardsAsync(PeerClien
}
}
- /// Pipeline depth as a function of the gap (in slots) to the node's tip.
- private static int AdaptivePipelineDepth(int maxDepth, ulong tipGap)
+ /// Pipeline depth as a function of the gap (in slots) to the node's tip. Internal for unit testing.
+ internal static int AdaptivePipelineDepth(int maxDepth, ulong tipGap)
{
int target = tipGap switch
{