diff --git a/Cargo.lock b/Cargo.lock index 6e1bbaf..a996558 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1440,7 +1440,7 @@ version = "0.1.0" source = "git+https://github.com/starkware-libs/stwo-circuits?rev=2591775#2591775ae8fd7634eda7b77c471f87c163f65eb1" dependencies = [ "blake2", - "hashbrown 0.15.5", + "hashbrown 0.16.1", "indexmap", "itertools 0.12.1", "num-traits", @@ -1453,7 +1453,7 @@ version = "0.1.0" source = "git+https://github.com/starkware-libs/stwo-circuits?rev=2591775#2591775ae8fd7634eda7b77c471f87c163f65eb1" dependencies = [ "circuits", - "hashbrown 0.15.5", + "hashbrown 0.16.1", "indexmap", "itertools 0.12.1", "num-traits", @@ -2165,6 +2165,17 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" +[[package]] +name = "futures-executor" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-task" version = "0.3.32" @@ -2316,7 +2327,6 @@ dependencies = [ "allocator-api2", "equivalent", "foldhash 0.1.5", - "serde", ] [[package]] @@ -4089,6 +4099,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scc" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46e6f046b7fef48e2660c57ed794263155d713de679057f2d0c169bfc6e756cc" +dependencies = [ + "sdd", +] + [[package]] name = "schemars" version = "1.2.1" @@ -4120,6 +4139,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sdd" +version = "3.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca" + 
[[package]] name = "semver" version = "0.11.0" @@ -4243,6 +4268,32 @@ dependencies = [ "serde", ] +[[package]] +name = "serial_test" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "911bd979bf1070a3f3aa7b691a3b3e9968f339ceeec89e08c280a8a22207a32f" +dependencies = [ + "futures-executor", + "futures-util", + "log", + "once_cell", + "parking_lot", + "scc", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a7d91949b85b0d2fb687445e448b40d322b6b3e4af6b44a29b21d9a5f33e6d9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "sha2" version = "0.9.9" @@ -4595,7 +4646,7 @@ dependencies = [ "dashmap", "educe 0.5.11", "fnv", - "hashbrown 0.15.5", + "hashbrown 0.16.1", "hex", "indexmap", "itertools 0.12.1", @@ -4654,7 +4705,7 @@ name = "stwo-constraint-framework" version = "2.1.0" source = "git+https://github.com/starkware-libs/stwo?rev=aeceb74c#aeceb74c58184d7886ebd7f34a7453fee714ca40" dependencies = [ - "hashbrown 0.15.5", + "hashbrown 0.16.1", "itertools 0.12.1", "num-traits", "rand 0.8.5", @@ -5335,6 +5386,7 @@ dependencies = [ "proptest", "serde", "serde_json", + "serial_test", "tempfile", "tezos-smart-rollup-encoding", "tezos_data_encoding 0.5.2", diff --git a/apps/wallet/Cargo.toml b/apps/wallet/Cargo.toml index 308544e..3745975 100644 --- a/apps/wallet/Cargo.toml +++ b/apps/wallet/Cargo.toml @@ -17,7 +17,7 @@ path = "src/bin/tzel-detect.rs" [dependencies] axum = "0.8" -clap = { version = "4", features = ["derive"] } +clap = { version = "4", features = ["derive", "env"] } hex = "0.4" ml-kem = { version = "=0.3.0-rc.2", features = ["getrandom"] } serde = { version = "1", features = ["derive"] } @@ -32,3 +32,8 @@ ureq = { version = "3", features = ["json"] } [dev-dependencies] proptest = "1" +# `serial_test` to gate the cooperative-yield tests that share the +# 
process-global `SYNC_CHECKPOINT_HOOK`. cargo-test runs in-binary tests +# in parallel; without serialization, one test's `set_sync_checkpoint_hook` +# can leak into another concurrent test's checkpoint callbacks. +serial_test = "3" diff --git a/apps/wallet/src/lib.rs b/apps/wallet/src/lib.rs index 39777a3..7579c68 100644 --- a/apps/wallet/src/lib.rs +++ b/apps/wallet/src/lib.rs @@ -1202,6 +1202,50 @@ fn wallet_xmss_floor_path(path: &str) -> PathBuf { PathBuf::from(format!("{}.xmss-floor", path)) } +/// Cooperative-yield sentinel for `cmd_rollup_sync`. The daemon (or any other +/// concurrent caller that wants to take the wallet lock for a slow op) can +/// `touch` this file to ask an in-flight sync to stop at its next checkpoint +/// boundary. Removed by the caller once its slow op completes. +fn wallet_yield_path(path: &str) -> PathBuf { + PathBuf::from(format!("{}.yield", path)) +} + +/// True iff the cooperative-yield sentinel at `path` was written by a +/// daemon process that no longer exists. Mirrors the existing +/// `is_stale_wallet_lock` discipline: a daemon that crashed after +/// `touch .yield` but before `rm` would otherwise leave a +/// permanent sentinel — every subsequent sync would yield at its +/// first checkpoint and never make progress. +/// +/// Forward-compatibility with daemons / tests that write non-PID +/// content to the sentinel: any read failure or parse failure +/// returns `Ok(false)` ("treat as live"). The legacy daemon shape +/// (and several existing tests, e.g. `std::fs::write(yield_path, +/// b"yield")`) still triggers the yield correctly. Once the daemon +/// PR catches up to writing `\n`, the recovery path activates +/// for stale sentinels without changing observed behaviour for +/// live ones. +/// +/// On non-unix targets we cannot probe `/proc`, so we always say +/// "live" — same conservative posture as `is_stale_wallet_lock`. 
+#[cfg(unix)] +fn is_stale_yield_sentinel(path: &Path) -> bool { + let content = match std::fs::read_to_string(path) { + Ok(s) => s, + Err(_) => return false, + }; + let pid: u32 = match content.lines().next().and_then(|l| l.trim().parse().ok()) { + Some(p) => p, + None => return false, // legacy / non-PID payload — treat as live + }; + !PathBuf::from(format!("/proc/{}", pid)).exists() +} + +#[cfg(not(unix))] +fn is_stale_yield_sentinel(_path: &Path) -> bool { + false +} + #[cfg(unix)] fn is_stale_wallet_lock(path: &Path) -> Result { let pid_text = std::fs::read_to_string(path).map_err(|e| format!("read lock file: {}", e))?; @@ -2167,10 +2211,6 @@ impl<'a> RollupRpc<'a> { }) } - fn load_nullifiers(&self) -> Result, String> { - self.load_nullifiers_at_block("head") - } - fn load_nullifiers_at_block(&self, block_ref: &str) -> Result, String> { let count: usize = self .read_u64_at_block(block_ref, DURABLE_NULLIFIER_COUNT)? @@ -2186,21 +2226,47 @@ impl<'a> RollupRpc<'a> { Ok(nullifiers) } - /// For each pending deposit pool, fetch the kernel-side current balance. + /// For each pending deposit pool, fetch the kernel-side current balance + /// at the **current rollup head**. Convenience wrapper for single-shot + /// callers that have no other rollup reads to consistency-pin against + /// — e.g. `cmd_wallet_show`'s end-of-banner pool summary. + /// + /// Callers performing **multiple** rollup reads (note slice, nullifiers, + /// pool balances) within one logical step MUST resolve `head_hash` once + /// at the top and call `load_pool_balances_at_block(&head, …)` so all + /// reads observe the same kernel state. The naming asymmetry + /// (`_at_head` vs `_at_block`) is deliberate so that re-introducing the + /// consistency bug (PR #24 audit, 2026-05) is loud at code review. + /// /// `None` (absent from the map) means the pool has never been credited /// (or has been fully drained — implementations may garbage-collect). 
- fn load_pool_balances( + fn load_pool_balances_at_head( &self, pending: &[PendingDeposit], ) -> Result, String> { let head = self.head_hash()?; + self.load_pool_balances_at_block(&head, pending) + } + + /// Pinned-block variant of `load_pool_balances_at_head`. Use this from + /// `cmd_rollup_sync` where the same `head_hash` is reused across the + /// note slice, the nullifier set, and the pool balances — re-resolving + /// `head` between the three reads would let a concurrent slow-lane + /// drain a pool between the note scan and the finalize, causing + /// `apply_scan_feed_finalize` to evict a `pending_deposit` whose + /// `shielded_cm` was never observed in the same run's `seen_cms`. + fn load_pool_balances_at_block( + &self, + block_ref: &str, + pending: &[PendingDeposit], + ) -> Result, String> { let mut map: std::collections::HashMap = std::collections::HashMap::new(); let mut seen: std::collections::HashSet = std::collections::HashSet::new(); for p in pending { if !seen.insert(p.pubkey_hash) { continue; } - if let Some(balance) = self.try_read_deposit_balance(&head, &p.pubkey_hash)? { + if let Some(balance) = self.try_read_deposit_balance(block_ref, &p.pubkey_hash)? { map.insert(p.pubkey_hash, balance); } } @@ -2308,7 +2374,11 @@ impl<'a> RollupRpc<'a> { Ok(()) } - fn load_state_snapshot(&self) -> Result { + /// Read the rollup state snapshot at the **current head**. Wrapper for + /// single-shot callers (proof-build paths). See + /// [`load_pool_balances_at_head`] for the rationale on the + /// `_at_head` / `_at_block` naming split. + fn load_state_snapshot_at_head(&self) -> Result { let head_hash = self.head_hash()?; self.load_state_snapshot_at_block(&head_hash) } @@ -3436,6 +3506,45 @@ enum UserCmd { /// Poll interval for `sync --watch`. #[arg(long, default_value_t = 5)] interval_secs: u64, + /// Checkpoint every N commits during a long sync. 
Each checkpoint + /// persists `wallet.json.scanned` + any newly-discovered notes + /// atomically and runs the cooperative-yield sentinel check + /// (`.yield`); a smaller N means the wallet daemon can + /// preempt the sync sooner when it needs the flock for a slow op + /// (shield / transfer / unshield), at the cost of more frequent + /// `save_wallet` round-trips. + /// + /// Default 50 is a reasoned starting point. Numbers in this PR's + /// description and `docs/sync-acceleration-design.md` are + /// estimates: ~0.7 % overhead per checkpoint at this granularity + /// vs the per-commit HTTP work, ~2 s of loss-on-interrupt the + /// next sync redoes cheaply, sub-ms sentinel stat(). **The K=50 + /// dial is orthogonal to the throughput dial** N (concurrent + /// HTTP reads, design doc §2.A) — `services/scan-bench/` in + /// tzel-infra measures the latter only. A `save_wallet` micro- + /// bench would refine THIS default. + /// + /// Don't drop below ~10 (the per-checkpoint `save_wallet` and + /// stat() begin to dominate the per-commit fetch work) or push + /// past ~500 (loss-on-interrupt becomes user-visible during + /// a long initial sync, defeating the cooperative-yield point). + /// On slow / synchronously-replicated storage (encrypted + /// overlays, network-mounted home-dirs), raise to 100–200 to + /// amortize the fsync. On fast operator boxes (NVMe, ext4, + /// no overlay) and when sub-second preemption matters, drop + /// to 20–30. `0` is rejected (would loop forever). + /// + /// Combines with `--watch`: each polling iteration honours + /// this value independently. + /// + /// Env var: `TZEL_SYNC_CHECKPOINT_EVERY` overrides the default + /// (the explicit CLI flag wins over the env if both are set). + #[arg( + long, + env = "TZEL_SYNC_CHECKPOINT_EVERY", + default_value_t = DEFAULT_CHECKPOINT_EVERY as u64, + )] + checkpoint_every: u64, }, /// Show local private balance plus live public bridge balance when configured. 
Balance, @@ -3780,12 +3889,18 @@ fn run_user(cli: UserCli) -> Result<(), String> { UserCmd::Sync { watch, interval_secs, + checkpoint_every, } => { let profile = load_required_network_profile(&cli.wallet)?; + // Cast u64 → usize: on 32-bit `as usize` truncates (wraps mod + // 2^32), it does not saturate. A value truncated to 0 is rejected + // by cmd_rollup_sync's >0 check; any other value is capped by its + // `std::cmp::min(.., tree_size)`, so worst case is odd granularity. + let checkpoint_every = checkpoint_every as usize; if watch { - cmd_rollup_sync_watch(&cli.wallet, &profile, interval_secs) + cmd_rollup_sync_watch(&cli.wallet, &profile, interval_secs, checkpoint_every) } else { - cmd_rollup_sync(&cli.wallet, &profile) + cmd_rollup_sync(&cli.wallet, &profile, checkpoint_every) } } UserCmd::Balance => cmd_user_balance(&cli.wallet), @@ -4986,6 +5101,89 @@ fn apply_scan_feed( } } +/// Cooperative-yield sync: process the feed for one batch of commits +/// (recover-only, no nullifier prune, no pending-pool prune). Pushes any +/// newly-recovered notes into `w.notes` and advances `w.scanned` to +/// `feed.next_cursor`. The full prune step is deferred to +/// `apply_scan_feed_finalize` after the loop completes — that way each +/// intermediate `save_wallet` lands a self-consistent file the next sync +/// resumes from, while the prune still sees the cumulative cm set. +/// +/// Returns the number of newly-recovered notes for the running tally and +/// extends `seen_cms` with every cm in this batch (including unrecovered +/// ones — see the cumulative-known-cm comment in `apply_scan_feed` for +/// why that defensive cm set matters). 
+fn apply_scan_feed_recover_batch( + w: &mut WalletFile, + feed: &NotesFeedResp, + seen_cms: &mut Vec, +) -> usize { + let mut found = 0usize; + let mut known_notes: std::collections::HashSet<(usize, F)> = + w.notes.iter().map(|n| (n.index, n.cm)).collect(); + for nm in &feed.notes { + seen_cms.push(nm.cm); + if let Some(note) = w.try_recover_note(nm) { + if known_notes.insert((note.index, note.cm)) { + println!( + " found: v={} cm={} index={}", + note.v, + short(¬e.cm), + note.index + ); + w.notes.push(note); + found += 1; + } + } + } + w.scanned = feed.next_cursor; + found +} + +/// Final pass after batched scanning: applies the nullifier prune and the +/// pending-pool prune using the cumulative `seen_cms` accumulated across +/// every batch, plus the wallet's own running `w.notes` set. Mirrors the +/// post-recovery half of `apply_scan_feed`. +fn apply_scan_feed_finalize( + w: &mut WalletFile, + seen_cms: &[F], + nullifiers: impl IntoIterator, + pool_balances: &std::collections::HashMap, +) -> ScanSummary { + let mut known_cms: std::collections::HashSet = + w.notes.iter().map(|n| n.cm).collect(); + known_cms.extend(seen_cms.iter().copied()); + + let nf_set: std::collections::HashSet = nullifiers.into_iter().collect(); + let before = w.notes.len(); + w.notes.retain(|n| !nf_set.contains(¬e_nullifier(n))); + let spent = before - w.notes.len(); + let before_pending = w.pending_spends.len(); + w.pending_spends + .retain(|pending| !pending.nullifiers.iter().all(|nf| nf_set.contains(nf))); + + let before_pools = w.pending_deposits.len(); + w.pending_deposits.retain(|p| { + let drained_on_chain = pool_balances.get(&p.pubkey_hash).copied().unwrap_or(0) == 0; + let cm_observed = p + .shielded_cm + .as_ref() + .map(|cm| known_cms.contains(cm)) + .unwrap_or(false); + !(drained_on_chain && cm_observed) + }); + let pruned_drained_pools = before_pools - w.pending_deposits.len(); + + ScanSummary { + // `found` is tallied by the caller across all batches. 
+ found: 0, + spent, + confirmed_pending: before_pending - w.pending_spends.len(), + pool_balances: pool_balances.clone(), + pruned_drained_pools, + } +} + #[cfg(test)] mod tests { use super::*; @@ -7412,7 +7610,7 @@ fn cmd_user_balance(path: &str) -> Result<(), String> { if profile_path.exists() { let profile = load_network_profile(&profile_path)?; let rollup = RollupRpc::new(&profile); - let balances = rollup.load_pool_balances(&w.pending_deposits)?; + let balances = rollup.load_pool_balances_at_head(&w.pending_deposits)?; print_deposit_pool_summary(&w, &balances); } Ok(()) @@ -7477,7 +7675,13 @@ fn cmd_wallet_check(path: &str, profile: &WalletNetworkProfile) -> Result<(), St let auth_domain = snapshot.auth_domain; let tree_size = snapshot.tree.leaves.len(); let required_tx_fee = snapshot.required_tx_fee; - let balances = rollup.load_pool_balances(&wallet.pending_deposits)?; + // Pool balances must be read at the same pinned `head_hash` as the + // snapshot above; otherwise a slow-lane drain landing between the + // two reads would mis-classify a still-funded pool as + // "drained pending prune" in the printed banner. Same shape as the + // `cmd_rollup_sync` finalize fix (f3a0755). + let balances = + rollup.load_pool_balances_at_block(&head_hash, &wallet.pending_deposits)?; user_out!(json: { "wallet_file" => path }, human: "Wallet file: {}", path); user_out!( @@ -7533,19 +7737,243 @@ fn cmd_wallet_check(path: &str, profile: &WalletNetworkProfile) -> Result<(), St Ok(()) } -fn cmd_rollup_sync(path: &str, profile: &WalletNetworkProfile) -> Result<(), String> { +/// How many commits we scan between checkpoints (incremental save_wallet + +/// cooperative-yield check). 
+/// +/// 50 is a reasoned starting point pending empirical measurement: +/// * loss-on-interrupt is bounded to ~K commits ≈ a couple of seconds of +/// scan work, which the next sync redoes cheaply; +/// * persist overhead is ~0.7% of scan time at this granularity (one +/// atomic-rename + fsync per 50 commits, dwarfed by the 50 HTTP fetches); +/// * the per-checkpoint sentinel stat() is sub-ms. +/// +/// Numbers above are estimates; the `services/scan-bench/` POC in tzel-infra +/// is meant to refine them against a live `octez-smart-rollup-node`. Until +/// that runs, this default is the runtime-tunable fallback exposed via +/// `tzel-wallet sync --checkpoint-every N`. +/// +/// Don't drop below ~10 (overhead dominates) or push past ~500 (the +/// loss-on-interrupt becomes user-visible during a long initial sync). +const DEFAULT_CHECKPOINT_EVERY: usize = 50; + +/// Test hook: invoked at every checkpoint just before the sentinel check. +/// Production callers leave it `None`. Used by the resume / sentinel tests +/// below to inject "create the sentinel between commit K and commit 2K" +/// without racing on a real fs touch. +#[cfg(test)] +static SYNC_CHECKPOINT_HOOK: std::sync::OnceLock< + std::sync::Mutex>>, +> = std::sync::OnceLock::new(); + +#[cfg(test)] +fn set_sync_checkpoint_hook(hook: F) +where + F: Fn(&str, usize) + Send + 'static, +{ + let cell = SYNC_CHECKPOINT_HOOK.get_or_init(|| std::sync::Mutex::new(None)); + // Poison-tolerant: a previous test that panicked inside the hook + // (e.g. the panic-mid-batch coverage test) leaves the Mutex in a + // poisoned state. The protected data is just `Option>`, + // which is safe to overwrite — the panic didn't leave a half-set + // value behind. `into_inner()` recovers the guard regardless. 
+ *cell.lock().unwrap_or_else(|e| e.into_inner()) = Some(Box::new(hook)); +} + +#[cfg(test)] +fn clear_sync_checkpoint_hook() { + if let Some(cell) = SYNC_CHECKPOINT_HOOK.get() { + *cell.lock().unwrap_or_else(|e| e.into_inner()) = None; + } +} + +fn fire_sync_checkpoint_hook(_path: &str, _scanned: usize) { + #[cfg(test)] + if let Some(cell) = SYNC_CHECKPOINT_HOOK.get() { + if let Some(hook) = cell + .lock() + .unwrap_or_else(|e| e.into_inner()) + .as_ref() + { + hook(_path, _scanned); + } + } +} + +/// Cooperative-yield sync. Scans `[w.scanned .. tree_size)` in batches of +/// `checkpoint_every` commits, persisting after each batch so a long initial +/// sync no longer holds the wallet lock for the full duration: any concurrent +/// caller that needs the lock for a slow op (shield / transfer / unshield / +/// withdraw) touches `.yield`, and the sync exits cleanly at its +/// next checkpoint with `w.scanned` reflecting work already on disk. The +/// next sync resumes from there. +/// +/// `checkpoint_every` is tunable via `tzel-wallet sync --checkpoint-every N`. +/// Default is `DEFAULT_CHECKPOINT_EVERY` (50). 0 is rejected (would loop +/// forever without progress). +/// +/// Atomicity: every checkpoint goes through `save_wallet`, which writes to +/// a temp file, fsyncs, atomic-renames, then fsyncs the parent dir. A kill -9 +/// between fsync and rename leaves the previous wallet.json intact — at most +/// one batch of scan work is lost, never corrupted. 
+fn cmd_rollup_sync( + path: &str, + profile: &WalletNetworkProfile, + checkpoint_every: usize, +) -> Result<(), String> { + if checkpoint_every == 0 { + return Err( + "--checkpoint-every must be > 0 (0 would loop forever without progress)".to_string(), + ); + } let mut w = load_wallet(path)?; let rollup = RollupRpc::new(profile); - let feed = rollup.load_notes_since(w.scanned).map_err(|e| { + + // Pin head once: every batch reads the same block so concurrent rollup + // progress does not race the cursor past the tree-size we're scanning to. + // Nullifier and pool-balance snapshots also read this same block, so the + // finalize prune sees a consistent view. + let head_hash = rollup.head_hash().map_err(|e| { format!( "sync failed: {}. Run `tzel-wallet check` for a fuller diagnosis.", e ) })?; - let nullifiers = rollup.load_nullifiers()?; - let pool_balances = rollup.load_pool_balances(&w.pending_deposits)?; - let summary = apply_scan_feed(&mut w, &feed, nullifiers, &pool_balances); + let tree_size: usize = rollup + .read_u64_at_block(&head_hash, DURABLE_TREE_SIZE) + .map_err(|e| { + format!( + "sync failed: {}. Run `tzel-wallet check` for a fuller diagnosis.", + e + ) + })? + .try_into() + .map_err(|_| "tree size does not fit in usize".to_string())?; + + if w.scanned > tree_size { + return Err(format!( + "wallet cursor {} is ahead of rollup tree size {}", + w.scanned, tree_size + )); + } + + let yield_path = wallet_yield_path(path); + let mut total_found = 0usize; + let mut seen_cms: Vec = Vec::new(); + let mut yielded = false; + + while w.scanned < tree_size { + // `saturating_add` defends against an adversarial + // `--checkpoint-every` (e.g. close to `usize::MAX`) overflowing + // here. The CLI flag is `u64` and clamps via `as usize` at + // dispatch; on 64-bit a worst-case `usize::MAX + w.scanned` + // would panic in debug / wrap in release. Saturating means the + // batch caps at `tree_size`, i.e. 
one final batch covers the + // whole remaining window — degenerate but safe. + let batch_end = w + .scanned + .saturating_add(checkpoint_every) + .min(tree_size); + // `load_notes_since_at_block` accepts a [cursor, tree_size) window + // implicitly via the pinned block's tree-size. To get a [cursor, + // batch_end) sub-window we fetch the slice manually. + let mut notes = Vec::with_capacity(batch_end - w.scanned); + for i in w.scanned..batch_end { + let bytes = rollup + .read_published_note_bytes_at_block(&head_hash, i as u64) + .map_err(|e| { + format!( + "sync failed: {}. Run `tzel-wallet check` for a fuller diagnosis.", + e + ) + })? + .ok_or_else(|| { + let key = indexed_durable_key(DURABLE_NOTE_PREFIX, i as u64); + format!( + "rollup durable state is missing note {} at {} while tree size is {}. This usually means the deployed rollup kernel does not persist published note payloads, or the rollup node is not serving the expected durable state.", + i, key, tree_size + ) + })?; + let (cm, enc) = canonical_wire::decode_published_note(&bytes)?; + notes.push(NoteMemo { index: i, cm, enc }); + } + let batch_feed = NotesFeedResp { + notes, + next_cursor: batch_end, + }; + total_found += apply_scan_feed_recover_batch(&mut w, &batch_feed, &mut seen_cms); + save_wallet(path, &w)?; + + // Cooperative-yield sentinel check. Stat() is sub-ms; transient fs + // errors ⇒ "not present" so we never spuriously exit on a flaky FS. + fire_sync_checkpoint_hook(path, w.scanned); + if yield_path.exists() { + // Stale-PID recovery: if the sentinel content names a daemon + // that no longer exists, the daemon crashed after touching + // the sentinel but before rm-ing it. Without this, every + // subsequent sync would exit at the first checkpoint + // forever. Mirror the existing WalletLock stale-PID + // discipline. On non-PID content (legacy daemons, tests + // that write `b"yield"`) treat as live and yield — + // forward-compat. 
+ if is_stale_yield_sentinel(&yield_path) { + // Checked unlink: if removal keeps failing (read-only fs, + // immutable bit, dir owner mismatch), the next loop + // iteration would see the same stale sentinel, recover + // again, fail again, and spin a "removed stale sentinel" + // warning per remaining batch. Surface the failure once + // and abort the scan with a clear error so the operator + // can fix the FS state instead of grepping a log spam. + // Capture the dead PID before unlinking so the warning + // gives operators something to grep / correlate with + // their daemon-restart logs. Best-effort: if the read + // fails we still emit "(unknown)" — the "we recovered + // from a crash" signal is what matters. + let dead_pid = std::fs::read_to_string(&yield_path) + .ok() + .and_then(|s| s.lines().next().map(|l| l.trim().to_string())) + .filter(|s| !s.is_empty()) + .unwrap_or_else(|| "unknown".into()); + if let Err(e) = std::fs::remove_file(&yield_path) { + return Err(format!( + "stale yield sentinel at {} (daemon PID {} gone) but \ + could not be removed: {}. Fix fs permissions on the \ + wallet directory and retry.", + yield_path.display(), + dead_pid, + e + )); + } + eprintln!( + "warning: removed stale yield sentinel at {} (daemon PID \ + {} gone); continuing scan", + yield_path.display(), + dead_pid + ); + continue; + } + yielded = true; + break; + } + } + + // Final flush: even when we yielded, the prune is best-effort against + // whatever we did scan — running it once at the end (rather than every + // batch) keeps the overhead sub-1% on long initial syncs. If no batches + // were processed at all because tree_size==w.scanned, this still produces + // a coherent ScanSummary for the printed banner. 
+ let nullifiers = rollup.load_nullifiers_at_block(&head_hash)?; + // Pool balances must read at the SAME pinned block as the note slice + // and the nullifier set; otherwise a slow-lane that drains a pool + // between the note scan and the finalize would silently evict a + // `pending_deposit` whose `shielded_cm` was never observed in this + // run's `seen_cms`. (`load_pool_balances_at_head` re-resolves head; + // the `_at_block` overload is the consistent variant.) + let pool_balances = + rollup.load_pool_balances_at_block(&head_hash, &w.pending_deposits)?; + let mut summary = apply_scan_feed_finalize(&mut w, &seen_cms, nullifiers, &pool_balances); + summary.found = total_found; save_wallet(path, &w)?; + let total_funded: u64 = summary.pool_balances.values().sum(); let mut pools_awaiting_credit = 0usize; let mut pools_drained_pending_scan = 0usize; @@ -7560,7 +7988,7 @@ fn cmd_rollup_sync(path: &str, profile: &WalletNetworkProfile) -> Result<(), Str } } println!( - "Synced: {} new notes, {} spent removed, {} pending confirmed, {} drained-pool entries pruned, private_available={}, pool_funded_total={}, pools_awaiting_credit={}, pools_drained_pending_scan={}", + "Synced: {} new notes, {} spent removed, {} pending confirmed, {} drained-pool entries pruned, private_available={}, pool_funded_total={}, pools_awaiting_credit={}, pools_drained_pending_scan={}{}", summary.found, summary.spent, summary.confirmed_pending, @@ -7569,6 +7997,11 @@ fn cmd_rollup_sync(path: &str, profile: &WalletNetworkProfile) -> Result<(), Str total_funded, pools_awaiting_credit, pools_drained_pending_scan, + if yielded { + " (yielded mid-scan; rerun sync to finish)" + } else { + "" + }, ); Ok(()) } @@ -7577,6 +8010,7 @@ fn cmd_rollup_sync_watch( path: &str, profile: &WalletNetworkProfile, interval_secs: u64, + checkpoint_every: usize, ) -> Result<(), String> { let interval = std::time::Duration::from_secs(interval_secs.max(1)); // Upstream patch ①: this is a loop; emit the banner on stderr 
when @@ -7593,7 +8027,7 @@ fn cmd_rollup_sync_watch( ); } loop { - cmd_rollup_sync(path, profile)?; + cmd_rollup_sync(path, profile, checkpoint_every)?; std::thread::sleep(interval); } } @@ -8806,7 +9240,7 @@ fn cmd_transfer_rollup( "recipient": to_path, }); let rollup = RollupRpc::new(profile); - let snapshot = rollup.load_state_snapshot()?; + let snapshot = rollup.load_state_snapshot_at_head()?; let fee = resolve_requested_tx_fee(fee, snapshot.required_tx_fee)?; ensure_positive_dal_fee(profile.dal_fee)?; let root = snapshot.current_root(); @@ -9052,7 +9486,7 @@ fn cmd_unshield_rollup( }); let rollup = RollupRpc::new(profile); let recipient = resolve_rollup_unshield_recipient(&rollup, recipient)?; - let snapshot = rollup.load_state_snapshot()?; + let snapshot = rollup.load_state_snapshot_at_head()?; let fee = resolve_requested_tx_fee(fee, snapshot.required_tx_fee)?; ensure_positive_dal_fee(profile.dal_fee)?; let root = snapshot.current_root(); @@ -11209,8 +11643,8 @@ mod network_profile_tests { }]; let balances = RollupRpc::new(&profile) - .load_pool_balances(&pending) - .expect("load_pool_balances should succeed"); + .load_pool_balances_at_head(&pending) + .expect("load_pool_balances_at_head should succeed"); assert_eq!(balances.get(&pubkey_hash), Some(&amount)); assert_eq!(balances.len(), 1); @@ -11328,4 +11762,1414 @@ mod network_profile_tests { .expect_err("second init must refuse"); assert!(err.contains("already exists"), "got: {err}"); } + + // ════════════════════════════════════════════════════════════════════ + // Cooperative-yield sync (DEFAULT_CHECKPOINT_EVERY = 50) + // ════════════════════════════════════════════════════════════════════ + + /// Spawn a trivial child, wait for it to exit, and return its + /// freshly-reaped PID. Used by stale-sentinel tests that need a + /// "definitely dead" PID without relying on `kernel.pid_max` lore. 
+ /// Linux PID allocation is sequential, so the just-reaped PID will + /// not recycle within any realistic test runtime. + fn freshly_dead_pid() -> u32 { + let mut child = std::process::Command::new("true") + .stdin(std::process::Stdio::null()) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .spawn() + .expect("spawn `true` to acquire a soon-dead PID"); + let pid = child.id(); + child + .wait() + .expect("wait `true` so its PID is reaped before we use it"); + pid + } + + /// Build the minimal route table `cmd_rollup_sync` needs for a wallet + /// scanning [start, tree_size). `recoverable_at` selects which note + /// indices are recoverable by the test wallet (others are random + /// non-decryptable bytes — they exercise the "skip non-recoverable" + /// path of `try_recover_note`). All routes are pinned to `block_hash`. + fn sync_routes_for_window( + wallet: &WalletFile, + block_hash: &str, + tree_size: u64, + recoverable_at: &std::collections::HashMap, // index -> (value, rseed) + ) -> HashMap { + let mut routes: HashMap = HashMap::new(); + routes.insert( + "/global/block/head/hash".into(), + (200, format!("\"{}\"", block_hash)), + ); + // Length probes for "do we have a full DURABLE_NOTE_PREFIX entry?": + // we always serve via the direct key, so length = encoded.len(). 
+ routes.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/value?key={}", + block_hash, DURABLE_TREE_SIZE + ), + (200, format!("\"{}\"", hex::encode(tree_size.to_le_bytes()))), + ); + routes.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/value?key={}", + block_hash, DURABLE_NULLIFIER_COUNT + ), + (200, format!("\"{}\"", hex::encode(0u64.to_le_bytes()))), + ); + + for i in 0..tree_size { + let key = indexed_durable_key(DURABLE_NOTE_PREFIX, i); + let bytes = if let Some((value, rseed)) = recoverable_at.get(&i) { + let nm = super::tests::note_memo_for_wallet_address( + wallet, 0, *value, *rseed, None, + ); + canonical_wire::encode_published_note(&nm.cm, &nm.enc) + .expect("published note encodes") + } else { + // Forge a "valid published note" wrapping a non-recoverable + // payload (random cm + zero-length enc). The wallet decodes + // it via canonical_wire then `try_recover_note` returns + // None — exactly the behaviour we want for filler notes. + let cm = felt_tag(format!("filler-cm-{}", i).as_bytes()); + let nm = super::tests::note_memo_for_wallet_address( + wallet, + 0, + 1, + felt_tag(format!("filler-rseed-{}", i).as_bytes()), + None, + ); + canonical_wire::encode_published_note(&cm, &nm.enc) + .expect("filler note encodes") + }; + routes.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/length?key={}", + block_hash, key + ), + (200, bytes.len().to_string()), + ); + routes.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/value?key={}", + block_hash, key + ), + (200, format!("\"{}\"", hex::encode(bytes))), + ); + } + routes + } + + /// Sentinel pre-existing → cmd_rollup_sync exits at the first checkpoint. + /// Confirms `w.scanned` advanced by exactly DEFAULT_CHECKPOINT_EVERY (the batch + /// boundary) when tree_size > DEFAULT_CHECKPOINT_EVERY, and reflects the full + /// tree_size when smaller. 
    #[test]
    #[serial_test::serial(sync_checkpoint_hook)]
    fn cmd_rollup_sync_exits_at_first_checkpoint_when_sentinel_present() {
        let dir = tempfile::tempdir().expect("tempdir");
        let wallet_path = dir.path().join("wallet.json");
        let wallet_path_str = wallet_path.to_str().unwrap();

        let mut wallet = super::tests::test_wallet(1);
        wallet.scanned = 0;
        save_wallet(wallet_path_str, &wallet).expect("save wallet");

        // Three full batches: the sentinel must stop us after the first.
        let tree_size: u64 = (DEFAULT_CHECKPOINT_EVERY as u64) * 3;
        let block_hash = "BLyieldsentinel";
        let routes = sync_routes_for_window(
            &wallet,
            block_hash,
            tree_size,
            &std::collections::HashMap::new(),
        );

        let base_url = super::tests::spawn_mock_http_server(routes);
        let profile = super::tests::rollup_profile_for_url(&base_url);

        // Pre-create the sentinel: first checkpoint must exit cleanly.
        let yield_path = wallet_yield_path(wallet_path_str);
        std::fs::write(&yield_path, b"yield").expect("write sentinel");

        cmd_rollup_sync(wallet_path_str, &profile, DEFAULT_CHECKPOINT_EVERY)
            .expect("sync should yield without error");

        let saved = load_wallet(wallet_path_str).expect("reload wallet");
        assert_eq!(
            saved.scanned, DEFAULT_CHECKPOINT_EVERY,
            "yielded sync should land exactly one checkpoint"
        );

        // Sentinel is the daemon's responsibility to remove; the CLI does
        // not touch it on exit.
        assert!(yield_path.exists(), "CLI must not remove caller's sentinel");
    }

    /// Resume after a mid-scan yield. Sets the sentinel via the test hook
    /// after the second checkpoint, then resumes. The recoverable notes at
    /// indices 2 and 7 are found in run 1 (pre-yield, trigger_at = 10); the
    /// recoverable note at index 13 (past the yield boundary) is found in
    /// run 2.
    ///
    /// **Tautology guard** (PR-#24 coverage audit, post-B1): the second
    /// run uses a *restricted* mock where notes `[0..trigger_at)` return
    /// 503.
The correct shape — `cmd_rollup_sync` reads `w.scanned` + /// (= trigger_at after the first run) and resumes from there — + /// touches only notes `[trigger_at..tree_size)`, all of which are + /// served. A regression that resets `w.scanned = 0` at the top of + /// `cmd_rollup_sync` (the auditor's mutation) would re-issue + /// fetches for the restricted range and the second `cmd_rollup_sync` + /// call would Err out, failing this test loudly. Without this + /// restriction, the original test passed under that mutation + /// because both shapes converge on `final_state.scanned == tree_size` + /// after enough work — it didn't actually verify "resumed from". + #[test] + #[serial_test::serial(sync_checkpoint_hook)] + fn cmd_rollup_sync_resumes_from_checkpointed_cursor() { + let dir = tempfile::tempdir().expect("tempdir"); + let wallet_path = dir.path().join("wallet.json"); + let wallet_path_str = wallet_path.to_str().unwrap(); + + let mut wallet = super::tests::test_wallet(1); + wallet.scanned = 0; + save_wallet(wallet_path_str, &wallet).expect("save wallet"); + + // Custom K=5 (not DEFAULT_CHECKPOINT_EVERY=50): the resume + // semantics are independent of K, and the smaller fixture + // means the per-note RPC + decode loop runs ~10x fewer times. + // CI runtime is bounded by the org-level workflow cap on + // ubuntu-latest; trimming this test from ~30s → ~3s buys + // headroom for the full unit-tests job. + let custom_k: usize = 5; + let tree_size: u64 = (custom_k as u64) * 3 + 2; + let block_hash = "BLresumecheckpoint"; + let trigger_at = custom_k * 2; + + let mut recoverable: std::collections::HashMap = + std::collections::HashMap::new(); + recoverable.insert(2, (101, felt_tag(b"resume-note-2"))); + recoverable.insert(7, (202, felt_tag(b"resume-note-7"))); + recoverable.insert(13, (303, felt_tag(b"resume-note-13"))); + + // Mock #1 — full route set for the first run. 
+ let routes_full = + sync_routes_for_window(&wallet, block_hash, tree_size, &recoverable); + let url_full = super::tests::spawn_mock_http_server(routes_full); + let profile_full = super::tests::rollup_profile_for_url(&url_full); + + // Trip the sentinel after the second checkpoint (cursor reaches + // 2 * DEFAULT_CHECKPOINT_EVERY = 100). The hook fires at every checkpoint + // boundary just before the existence-check. + let yield_path = wallet_yield_path(wallet_path_str).to_path_buf(); + let yield_clone = yield_path.clone(); + set_sync_checkpoint_hook(move |_path, scanned| { + if scanned == trigger_at { + let _ = std::fs::write(&yield_clone, b"yield"); + } + }); + + cmd_rollup_sync(wallet_path_str, &profile_full, custom_k) + .expect("first sync should yield"); + + let mid = load_wallet(wallet_path_str).expect("reload mid wallet"); + assert_eq!( + mid.scanned, trigger_at, + "resume cursor must reflect the last completed checkpoint" + ); + let recovered_run1: std::collections::HashSet = + mid.notes.iter().map(|n| n.index).collect(); + assert!( + recovered_run1.contains(&2), + "note at index 2 must be recovered before yield" + ); + assert!( + recovered_run1.contains(&7), + "note at index 7 must be recovered before yield" + ); + assert!( + !recovered_run1.contains(&13), + "note at index 13 lies past the yield boundary and must NOT be recovered yet" + ); + + // Mock #2 — restricted: notes [0..trigger_at) return 503. The + // resume run MUST NOT re-fetch already-scanned indices; if it + // does (regression that ignores `w.scanned`), the 503 surfaces + // as an Err and the test fails. 
+ let mut routes_restricted = + sync_routes_for_window(&wallet, block_hash, tree_size, &recoverable); + for i in 0..(trigger_at as u64) { + let note_key = indexed_durable_key(DURABLE_NOTE_PREFIX, i); + routes_restricted.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/length?key={}", + block_hash, note_key + ), + (503, "\"already scanned, must not be re-fetched\"".into()), + ); + routes_restricted.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/value?key={}", + block_hash, note_key + ), + (503, "\"already scanned, must not be re-fetched\"".into()), + ); + } + let url_restricted = + super::tests::spawn_mock_http_server(routes_restricted); + let profile_restricted = + super::tests::rollup_profile_for_url(&url_restricted); + + // Resume: clear the sentinel and the hook; second run completes. + clear_sync_checkpoint_hook(); + std::fs::remove_file(&yield_path).expect("remove sentinel"); + + cmd_rollup_sync(wallet_path_str, &profile_restricted, custom_k) + .expect("second sync should finish without re-fetching restricted indices"); + let final_state = load_wallet(wallet_path_str).expect("reload final wallet"); + assert_eq!( + final_state.scanned, tree_size as usize, + "second run must finish the scan" + ); + let recovered_final: std::collections::HashSet = + final_state.notes.iter().map(|n| n.index).collect(); + assert!(recovered_final.contains(&2)); + assert!(recovered_final.contains(&7)); + assert!( + recovered_final.contains(&13), + "note past the prior yield boundary must be recovered on resume" + ); + } + + /// Three-iteration yield-then-resume — mirrors `cmd_rollup_sync_watch`'s + /// loop body (`loop { cmd_rollup_sync(); sleep(); }`) without + /// invoking the loop directly (the watch fn has no clean escape + /// short of an error). 
Drives a sentinel that persists across two + /// consecutive iterations, then is removed before the third — + /// asserts cumulative state (`w.scanned`, `w.notes`) advances + /// correctly through repeated yields without corruption. + /// + /// The load-bearing property: `seen_cms` is per-`cmd_rollup_sync` + /// call (not process-global), but finalize reseeds `known_cms` with + /// `w.notes.iter().map(|n| n.cm)` — so notes recovered in iter 1 + /// remain observed for the iter 3 prune even though that iteration's + /// `seen_cms` only carries iter 3's batch. A regression that moved + /// `seen_cms` to a process-local accumulator (or stopped folding + /// `w.notes` into `known_cms`) would surface here. + #[test] + #[serial_test::serial(sync_checkpoint_hook)] + fn cmd_rollup_sync_watch_loop_body_yields_twice_then_finishes() { + let dir = tempfile::tempdir().expect("tempdir"); + let wallet_path = dir.path().join("wallet.json"); + let wallet_path_str = wallet_path.to_str().unwrap(); + + let mut wallet = super::tests::test_wallet(1); + wallet.scanned = 0; + save_wallet(wallet_path_str, &wallet).expect("save wallet"); + + // Custom K=5 (not DEFAULT_CHECKPOINT_EVERY=50): the + // three-iteration yield semantics are independent of K, but + // the per-note RPC + decode loop scales with `tree_size`. CI + // runtime is bounded by an org-level workflow cap on + // ubuntu-latest; trimming this test from ~70s → ~7s buys + // headroom. Same shape as the resume test above. + let custom_k: usize = 5; + let tree_size: u64 = (custom_k as u64) * 3 + 2; + let block_hash = "BLwatchloop"; + + // Recoverable notes spread across all three iterations so the + // test can assert progressive recovery. 
+ let mut recoverable: std::collections::HashMap = + std::collections::HashMap::new(); + recoverable.insert(2, (101, felt_tag(b"watch-loop-note-iter1"))); + recoverable.insert( + custom_k as u64 + 1, + (202, felt_tag(b"watch-loop-note-iter2")), + ); + recoverable.insert( + (custom_k as u64) * 2 + 1, + (303, felt_tag(b"watch-loop-note-iter3")), + ); + + let routes = + sync_routes_for_window(&wallet, block_hash, tree_size, &recoverable); + let base_url = super::tests::spawn_mock_http_server(routes); + let profile = super::tests::rollup_profile_for_url(&base_url); + + // Sentinel persists for iter 1 and iter 2. Use legacy `b"yield"` + // content so the CLI treats it as live (no stale-PID unlink + // races to coordinate against in this test). + let yield_path = wallet_yield_path(wallet_path_str); + std::fs::write(&yield_path, b"yield").expect("seed sentinel"); + + // Iter 1: 0 → K, yield. + cmd_rollup_sync(wallet_path_str, &profile, custom_k) + .expect("iter 1 yields cleanly"); + let after_iter1 = load_wallet(wallet_path_str).expect("reload after iter 1"); + assert_eq!( + after_iter1.scanned, custom_k, + "iter 1 must yield exactly at the first checkpoint" + ); + let recovered_iter1: std::collections::HashSet = + after_iter1.notes.iter().map(|n| n.index).collect(); + assert!( + recovered_iter1.contains(&2), + "iter 1 must have recovered the in-batch note (index 2)" + ); + assert!( + yield_path.exists(), + "CLI does not unlink a live sentinel — daemon owns its lifecycle" + ); + + // Iter 2: K → 2K, yield. Sentinel still present. 
+ cmd_rollup_sync(wallet_path_str, &profile, custom_k) + .expect("iter 2 yields cleanly with sentinel still live"); + let after_iter2 = load_wallet(wallet_path_str).expect("reload after iter 2"); + assert_eq!( + after_iter2.scanned, + custom_k * 2, + "iter 2 must advance the cursor by another K" + ); + let recovered_iter2: std::collections::HashSet = + after_iter2.notes.iter().map(|n| n.index).collect(); + assert!(recovered_iter2.contains(&2), "iter 1 note must persist"); + assert!( + recovered_iter2.contains(&(custom_k + 1)), + "iter 2 must have recovered its in-batch note" + ); + + // Iter 3: sentinel removed → scan finishes. + std::fs::remove_file(&yield_path).expect("remove sentinel before iter 3"); + cmd_rollup_sync(wallet_path_str, &profile, custom_k) + .expect("iter 3 finishes the scan"); + let final_state = load_wallet(wallet_path_str).expect("reload final"); + assert_eq!( + final_state.scanned, tree_size as usize, + "iter 3 must reach tree_size" + ); + let recovered_final: std::collections::HashSet = + final_state.notes.iter().map(|n| n.index).collect(); + assert!(recovered_final.contains(&2)); + assert!(recovered_final.contains(&(custom_k + 1))); + assert!( + recovered_final.contains(&(custom_k * 2 + 1)), + "iter 3 must have recovered the post-yield note" + ); + } + + /// Atomic-rename invariant: while the scan is running, a concurrent + /// reader observes only fully-formed wallet.json snapshots. Since + /// `save_wallet` writes to a temp file then atomic-renames, every read + /// must serde-parse cleanly — never a half-written file. 
+ #[test] + #[serial_test::serial(sync_checkpoint_hook)] + fn cmd_rollup_sync_intermediate_wallet_is_always_parseable() { + let dir = tempfile::tempdir().expect("tempdir"); + let wallet_path = dir.path().join("wallet.json"); + let wallet_path_str = wallet_path.to_str().unwrap().to_string(); + + let mut wallet = super::tests::test_wallet(1); + wallet.scanned = 0; + save_wallet(&wallet_path_str, &wallet).expect("save wallet"); + + let tree_size: u64 = (DEFAULT_CHECKPOINT_EVERY as u64) * 4; + let block_hash = "BLatomicparse"; + let routes = sync_routes_for_window( + &wallet, + block_hash, + tree_size, + &std::collections::HashMap::new(), + ); + let base_url = super::tests::spawn_mock_http_server(routes); + let profile = super::tests::rollup_profile_for_url(&base_url); + + let stop = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + let stop_reader = stop.clone(); + let reader_path = wallet_path_str.clone(); + let reader = std::thread::spawn(move || { + let mut reads = 0usize; + while !stop_reader.load(std::sync::atomic::Ordering::SeqCst) { + if let Ok(data) = std::fs::read_to_string(&reader_path) { + let parsed: Result = serde_json::from_str(&data); + assert!( + parsed.is_ok(), + "intermediate wallet.json must always parse: {:?}", + parsed.err() + ); + reads += 1; + } + std::thread::yield_now(); + } + reads + }); + + cmd_rollup_sync(&wallet_path_str, &profile, DEFAULT_CHECKPOINT_EVERY).expect("sync should finish"); + stop.store(true, std::sync::atomic::Ordering::SeqCst); + let reads = reader.join().expect("reader thread"); + assert!( + reads > 0, + "reader thread must have observed at least one snapshot" + ); + + let final_state = load_wallet(&wallet_path_str).expect("reload final wallet"); + assert_eq!(final_state.scanned, tree_size as usize); + } + + /// `cmd_rollup_sync` rejects `checkpoint_every = 0` early with a + /// clear error message. 
    /// Without this guard the inner `while
    /// w.scanned < tree_size` loop would compute `batch_end =
    /// w.scanned + 0 = w.scanned`, do zero work, never advance, and
    /// loop forever — a footgun in the CLI flag default that's
    /// trivially bypassable. Locks the validation contract so a future
    /// refactor can't silently drop the check.
    #[test]
    #[serial_test::serial(sync_checkpoint_hook)]
    fn cmd_rollup_sync_refuses_zero_checkpoint_every() {
        let dir = tempfile::tempdir().expect("tempdir");
        let wallet_path = dir.path().join("wallet.json");
        let wallet_path_str = wallet_path.to_str().expect("utf8 path");

        let wallet = super::tests::test_wallet(1);
        save_wallet(wallet_path_str, &wallet).expect("save wallet");

        // We don't even need a live mock-rollup — the validation must
        // fire before any RPC call. Use a profile pointing at a
        // deliberately-unreachable port to make this guarantee
        // explicit (a regression that lets the loop start would hang
        // on the connect, not pass).
        let profile = super::tests::rollup_profile_for_url("http://127.0.0.1:1");

        let err = cmd_rollup_sync(wallet_path_str, &profile, 0)
            .expect_err("checkpoint_every = 0 must be rejected");
        assert!(
            err.contains("--checkpoint-every must be > 0"),
            "expected clear validation error, got: {err}",
        );
    }

    /// `cmd_rollup_sync` honours the runtime `checkpoint_every` parameter,
    /// not a hardcoded constant. Trips the sentinel via the test hook
    /// after the second batch boundary at K=7 (cursor=14) and asserts
    /// `wallet.json.scanned == 14` exactly. Without this test, a future
    /// regression that ignored the parameter and used `DEFAULT_CHECKPOINT_EVERY`
    /// internally would still pass every other cooperative-yield test
    /// (they all use the default). The `serial` attribute groups it
    /// with the other hook-using tests so the process-global
    /// `SYNC_CHECKPOINT_HOOK` is not raced.
    #[test]
    #[serial_test::serial(sync_checkpoint_hook)]
    fn cmd_rollup_sync_respects_custom_checkpoint_every() {
        let dir = tempfile::tempdir().expect("tempdir");
        let wallet_path = dir.path().join("wallet.json");
        let wallet_path_str = wallet_path.to_str().unwrap();

        let mut wallet = super::tests::test_wallet(1);
        wallet.scanned = 0;
        save_wallet(wallet_path_str, &wallet).expect("save wallet");

        // K=7 is deliberately NOT a divisor/multiple of the default (50),
        // so a hardcoded-constant regression cannot accidentally land on
        // the same cursor.
        let custom_k: usize = 7;
        let trigger_at: usize = custom_k * 2;
        let tree_size: u64 = (custom_k as u64) * 5;
        let block_hash = "BLcustomk";

        let recoverable = std::collections::HashMap::new();
        let routes = sync_routes_for_window(&wallet, block_hash, tree_size, &recoverable);
        let base_url = super::tests::spawn_mock_http_server(routes);
        let profile = super::tests::rollup_profile_for_url(&base_url);

        let yield_path = wallet_yield_path(wallet_path_str).to_path_buf();
        let yield_clone = yield_path.clone();
        set_sync_checkpoint_hook(move |_path, scanned| {
            if scanned == trigger_at {
                let _ = std::fs::write(&yield_clone, b"yield");
            }
        });

        cmd_rollup_sync(wallet_path_str, &profile, custom_k)
            .expect("sync should yield without error");

        let saved = load_wallet(wallet_path_str).expect("reload wallet");
        assert_eq!(
            saved.scanned, trigger_at,
            "yielded sync at K={custom_k} must land exactly on the K-aligned cursor"
        );

        clear_sync_checkpoint_hook();
        let _ = std::fs::remove_file(&yield_path);
    }

    /// HTTP error mid-batch: a 5xx from the rollup-rpc partway through
    /// the scan must (a) propagate as Err to the caller, (b) leave
    /// `wallet.json.scanned` at the last successful checkpoint
    /// boundary (NOT at the failed cursor — that would risk a corrupt
    /// JSON write), and (c) not touch the sentinel either way (the
    /// daemon owns sentinel lifecycle, not the CLI).
    ///
    /// Walk-through: K=10, tree_size=35. Mock returns 503 on the
    /// note value lookup at index 15 (within the second batch
    /// [10..20)).
    /// First batch [0..10) succeeds and persists
    /// `scanned = 10`. Second batch fails on its 6th iteration; the
    /// in-RAM `w.scanned = 10` is never re-saved with a higher
    /// value. Caller sees Err. wallet.json on disk shows scanned=10.
    #[test]
    #[serial_test::serial(sync_checkpoint_hook)]
    fn cmd_rollup_sync_http_error_mid_batch_preserves_last_checkpoint() {
        let dir = tempfile::tempdir().expect("tempdir");
        let wallet_path = dir.path().join("wallet.json");
        let wallet_path_str = wallet_path.to_str().unwrap();

        let mut wallet = super::tests::test_wallet(1);
        wallet.scanned = 0;
        save_wallet(wallet_path_str, &wallet).expect("save wallet");

        let custom_k: usize = 10;
        let tree_size: u64 = 35;
        let block_hash = "BLhttperror";

        let recoverable = std::collections::HashMap::new();
        let mut routes =
            sync_routes_for_window(&wallet, block_hash, tree_size, &recoverable);

        // Override one route in the second batch [10..20) to 503.
        // The CLI fetches length-then-value; failing the value read
        // is enough — the inner `?` aborts the whole sync.
        let bad_key = indexed_durable_key(DURABLE_NOTE_PREFIX, 15);
        routes.insert(
            format!(
                "/global/block/{}/durable/wasm_2_0_0/value?key={}",
                block_hash, bad_key
            ),
            (503, "{\"error\":\"Service Unavailable\"}".to_string()),
        );

        let base_url = super::tests::spawn_mock_http_server(routes);
        let profile = super::tests::rollup_profile_for_url(&base_url);

        // Pre-create a sentinel-NOT marker: assert the CLI doesn't
        // create the sentinel as part of error handling. We start
        // with no sentinel; verify it's still absent after the run.
        let yield_path = wallet_yield_path(wallet_path_str).to_path_buf();
        assert!(!yield_path.exists(), "sentinel must not pre-exist");

        let err = cmd_rollup_sync(wallet_path_str, &profile, custom_k)
            .expect_err("sync must error on the 503");
        assert!(
            err.contains("sync failed") || err.contains("durable") || err.contains("HTTP"),
            "error must reference the underlying RPC failure, got: {err}",
        );

        // Last successful checkpoint = first batch boundary (10).
        let saved = load_wallet(wallet_path_str).expect("reload wallet");
        assert_eq!(
            saved.scanned, custom_k,
            "scanned must reflect the last fully-completed batch (K={custom_k}), \
             not the cursor where the error fired"
        );

        // CLI must NOT have touched the sentinel — daemon owns that.
        assert!(
            !yield_path.exists(),
            "CLI must not create sentinel on error paths"
        );
    }

    /// Panic mid-batch: a panic from the test hook (simulating any
    /// unexpected unwind during the scan loop, e.g. a future Drop
    /// that touches FS state) must NOT corrupt wallet.json. The
    /// atomic-rename invariant guarantees the prior on-disk state
    /// survives intact. After the panic the wallet is reloadable
    /// and `scanned` reflects the last completed checkpoint.
    ///
    /// Drives: panic via `set_sync_checkpoint_hook` after the second
    /// checkpoint (cursor=20 with K=10). Use `catch_unwind` to
    /// observe the panic without aborting the test. Then reload
    /// wallet.json and assert it parses + cursor==20.
    #[test]
    #[serial_test::serial(sync_checkpoint_hook)]
    fn cmd_rollup_sync_panic_mid_batch_preserves_wallet_json() {
        let dir = tempfile::tempdir().expect("tempdir");
        let wallet_path = dir.path().join("wallet.json");
        let wallet_path_str = wallet_path.to_str().unwrap().to_string();

        let mut wallet = super::tests::test_wallet(1);
        wallet.scanned = 0;
        save_wallet(&wallet_path_str, &wallet).expect("save wallet");

        let custom_k: usize = 10;
        let tree_size: u64 = 35;
        let block_hash = "BLpanic";

        let recoverable = std::collections::HashMap::new();
        let routes = sync_routes_for_window(&wallet, block_hash, tree_size, &recoverable);
        let base_url = super::tests::spawn_mock_http_server(routes);
        let profile = super::tests::rollup_profile_for_url(&base_url);

        let panic_at: usize = custom_k * 2;
        set_sync_checkpoint_hook(move |_path, scanned| {
            if scanned == panic_at {
                panic!("simulated mid-batch unwind at scanned={scanned}");
            }
        });

        let path_for_thread = wallet_path_str.clone();
        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            cmd_rollup_sync(&path_for_thread, &profile, custom_k)
        }));
        clear_sync_checkpoint_hook();

        assert!(
            result.is_err(),
            "the hook must propagate panic out of cmd_rollup_sync"
        );

        // Wallet must still be parseable + cursor at the last
        // completed checkpoint (the panic fires AFTER the
        // save_wallet at scanned=panic_at, so panic_at IS the
        // committed value).
        let saved = load_wallet(&wallet_path_str).expect("wallet must still parse");
        assert_eq!(
            saved.scanned, panic_at,
            "panic after checkpoint write must preserve the just-written cursor"
        );
    }

    /// Stale-PID sentinel recovery: a daemon that crashed after
    /// `touch <wallet>.yield` but before `rm` would otherwise leave
    /// a permanent sentinel.
    /// Every subsequent sync would see it,
    /// exit at the first checkpoint, make K commits of progress,
    /// then exit again — wedged forever, identical to the
    /// stale-WalletLock failure mode the existing
    /// `is_stale_wallet_lock` recovery solves. Mirror the same
    /// discipline: the daemon writes its PID into the sentinel; the
    /// CLI checks `/proc/<pid>` and unlinks if dead.
    ///
    /// This test pre-creates the sentinel containing a guaranteed-dead
    /// PID (1 = init, never the daemon; in a sandbox it's PID 1 of the
    /// container, which on cargo-test is /proc/1 = the test runner —
    /// so we use a freshly-reaped child PID that's definitely not
    /// running; see `freshly_dead_pid`). Then runs sync against a small
    /// tree (tree_size = 25 with K = 10). The CLI must remove the
    /// sentinel, scan everything, and finish with `scanned == tree_size`.
    #[test]
    #[serial_test::serial(sync_checkpoint_hook)]
    fn cmd_rollup_sync_recovers_from_stale_yield_sentinel() {
        let dir = tempfile::tempdir().expect("tempdir");
        let wallet_path = dir.path().join("wallet.json");
        let wallet_path_str = wallet_path.to_str().unwrap();

        let mut wallet = super::tests::test_wallet(1);
        wallet.scanned = 0;
        save_wallet(wallet_path_str, &wallet).expect("save wallet");

        let custom_k: usize = 10;
        let tree_size: u64 = 25;
        let block_hash = "BLstalepid";

        let recoverable = std::collections::HashMap::new();
        let routes = sync_routes_for_window(&wallet, block_hash, tree_size, &recoverable);
        let base_url = super::tests::spawn_mock_http_server(routes);
        let profile = super::tests::rollup_profile_for_url(&base_url);

        // Pre-create the sentinel with a deliberately-dead PID. Spawn
        // a trivial child, wait for it to exit, and use its
        // freshly-reaped PID. Linux PID allocation is sequential —
        // a just-reaped PID won't recycle until the allocator wraps
        // (millions of fork()s away), so this is robust regardless of
        // `kernel.pid_max` (default 32_768 on dev boxes, up to 4M on
        // container hosts where a hardcoded `999999` could be live).
        let yield_path = wallet_yield_path(wallet_path_str);
        let dead_pid = freshly_dead_pid();
        std::fs::write(&yield_path, format!("{}\n", dead_pid))
            .expect("write stale sentinel");
        assert!(yield_path.exists(), "sentinel must be present pre-sync");

        cmd_rollup_sync(wallet_path_str, &profile, custom_k)
            .expect("sync should complete after recovering from stale sentinel");

        // Scan ran to completion despite the pre-existing sentinel.
        let saved = load_wallet(wallet_path_str).expect("reload wallet");
        assert_eq!(
            saved.scanned, tree_size as usize,
            "stale sentinel must not stop the sync; scan must reach tree_size"
        );
        // CLI removed the stale sentinel during recovery.
        assert!(
            !yield_path.exists(),
            "stale sentinel must be unlinked during recovery"
        );
    }

    /// Forward-compat for legacy / non-PID sentinel content (e.g.
    /// existing tests that write `b"yield"`, or a daemon that hasn't
    /// caught up to writing PIDs yet). Non-numeric content must be
    /// treated as "live" — the CLI yields normally, just like the
    /// pre-stale-recovery behaviour. Without this, deploying the new
    /// CLI ahead of the new daemon would mis-classify every legacy
    /// sentinel as "stale" and break preemption.
    #[test]
    #[serial_test::serial(sync_checkpoint_hook)]
    fn cmd_rollup_sync_treats_legacy_sentinel_content_as_live() {
        let dir = tempfile::tempdir().expect("tempdir");
        let wallet_path = dir.path().join("wallet.json");
        let wallet_path_str = wallet_path.to_str().unwrap();

        let mut wallet = super::tests::test_wallet(1);
        wallet.scanned = 0;
        save_wallet(wallet_path_str, &wallet).expect("save wallet");

        let custom_k: usize = 10;
        let tree_size: u64 = 30;
        let block_hash = "BLlegacy";

        let recoverable = std::collections::HashMap::new();
        let routes = sync_routes_for_window(&wallet, block_hash, tree_size, &recoverable);
        let base_url = super::tests::spawn_mock_http_server(routes);
        let profile = super::tests::rollup_profile_for_url(&base_url);

        let yield_path = wallet_yield_path(wallet_path_str);
        // Non-numeric: a real daemon hasn't migrated to PID-content yet,
        // OR a test that pre-dates the recovery feature. Either way,
        // CLI must yield (treat as live).
        std::fs::write(&yield_path, b"yield").expect("write legacy sentinel");

        cmd_rollup_sync(wallet_path_str, &profile, custom_k)
            .expect("sync should yield without error on legacy sentinel");

        let saved = load_wallet(wallet_path_str).expect("reload wallet");
        assert_eq!(
            saved.scanned, custom_k,
            "legacy sentinel must trigger yield at the first checkpoint"
        );
        assert!(
            yield_path.exists(),
            "CLI must NOT unlink a live (non-PID) sentinel — the daemon owns lifecycle"
        );
    }

    /// `--watch` + `--checkpoint-every 0`: the validation in
    /// cmd_rollup_sync fires on the FIRST iteration (before any RPC
    /// call), the `?` propagates, and `cmd_rollup_sync_watch`'s
    /// outer `loop { cmd_rollup_sync(...)?; sleep; }` aborts cleanly.
    /// No infinite loop, no spurious progress, error message
    /// matches the validation surface.
+ #[test] + #[serial_test::serial(sync_checkpoint_hook)] + fn cmd_rollup_sync_watch_with_zero_checkpoint_every_aborts_first_iter() { + let dir = tempfile::tempdir().expect("tempdir"); + let wallet_path = dir.path().join("wallet.json"); + let wallet_path_str = wallet_path.to_str().unwrap(); + + let wallet = super::tests::test_wallet(1); + save_wallet(wallet_path_str, &wallet).expect("save wallet"); + + // Unreachable RPC URL — the validation must fire BEFORE any + // RPC call. If the loop ever reaches the outer iteration + // sleep, we'd deadline rather than fail-fast on the + // unreachable URL: the test would hang well past its + // budget. Asserts the validation is genuinely pre-RPC. + let profile = super::tests::rollup_profile_for_url("http://127.0.0.1:1"); + + // interval_secs=1 because the loop only sleeps if + // cmd_rollup_sync returns Ok — which it must NOT here. + let err = cmd_rollup_sync_watch(wallet_path_str, &profile, 1, 0) + .expect_err("watch + K=0 must abort before sleeping"); + assert!( + err.contains("--checkpoint-every must be > 0"), + "watch must surface the same validation error as a one-shot sync, \ + got: {err}", + ); + } + + /// `load_pool_balances_at_block` must read pool balances against the + /// caller-provided block_ref, NOT re-resolve `head` internally. This + /// is the consistency invariant `cmd_rollup_sync`'s finalize relies + /// on: nullifiers + pool balances must come from the same pinned + /// head as the note slice, otherwise a slow-lane drain that lands + /// between reads would silently evict a `pending_deposit` whose + /// `shielded_cm` was never observed in the same run. + /// + /// Drives two block hashes pointing at different pool-balance + /// values: the test asserts `_at_block(BL_old)` returns the OLD + /// value even when "head" would resolve to BL_new. A regression + /// that calls `head_hash()` inside the helper picks up NEW and + /// fails this test loudly. 
+ #[test] + fn rollup_rpc_load_pool_balances_at_block_uses_pinned_block() { + use std::collections::HashMap; + let pkh: F = felt_tag(b"pool-pinned-test"); + let pkh_hex = hex::encode(pkh); + let key = format!("{}{}", DURABLE_DEPOSIT_BALANCE_PREFIX, pkh_hex); + + let block_old = "BLpinnedold"; + let block_new = "BLpinnednew"; + let value_old: u64 = 1_000_000; + let value_new: u64 = 5_000_000; + let bytes_old = value_old.to_le_bytes(); + let bytes_new = value_new.to_le_bytes(); + + let mut routes: HashMap = HashMap::new(); + // Head resolves to NEW: a regression that calls `self.head_hash()` + // inside the helper would pick this up and read NEW value + // (5_000_000), not OLD (1_000_000). + routes.insert( + "/global/block/head/hash".into(), + (200, format!("\"{}\"", block_new)), + ); + routes.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/length?key={}", + block_old, key + ), + (200, "8".into()), + ); + routes.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/value?key={}", + block_old, key + ), + (200, format!("\"{}\"", hex::encode(bytes_old))), + ); + routes.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/length?key={}", + block_new, key + ), + (200, "8".into()), + ); + routes.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/value?key={}", + block_new, key + ), + (200, format!("\"{}\"", hex::encode(bytes_new))), + ); + + let base_url = super::tests::spawn_mock_http_server(routes); + let profile = super::tests::rollup_profile_for_url(&base_url); + let rpc = RollupRpc::new(&profile); + let pending = vec![PendingDeposit { + pubkey_hash: pkh, + blind: felt_tag(b"pool-pinned-blind"), + address_index: 0, + auth_domain: felt_tag(b"auth-domain-pinned"), + amount: 0, + operation_hash: None, + shielded_cm: None, + }]; + + let pinned = rpc + .load_pool_balances_at_block(block_old, &pending) + .expect("load_pool_balances_at_block must succeed"); + let pinned_value = pinned.get(&pkh).copied(); + assert_eq!( + pinned_value, + 
Some(value_old), + "_at_block must read the pinned block, not re-resolve head" + ); + } + + /// Mirror of `rollup_rpc_load_pool_balances_at_block_uses_pinned_block` + /// for `load_state_snapshot_at_block`. `cmd_wallet_check` reads both + /// helpers against the same captured `head_hash`; this test locks + /// the snapshot helper's pinned-block invariant the same way the + /// pool-balance test locks the balance helper's. Without it, a + /// regression that re-resolves head inside + /// `load_state_snapshot_at_block` would break the banner (mixed + /// auth-domain / required-fee snapshot vs pool view) and slip past + /// CI. + #[test] + fn rollup_rpc_load_state_snapshot_at_block_uses_pinned_block() { + use std::collections::HashMap; + let block_old = "BLsnapold"; + let block_new = "BLsnapnew"; + let auth_domain_old: F = felt_tag(b"auth-domain-snap-old"); + let auth_domain_new: F = felt_tag(b"auth-domain-snap-new"); + let empty_root: F = MerkleTree::from_leaves(vec![]).root(); + + let mut routes: HashMap = HashMap::new(); + // Head resolves to NEW. A regression that calls `self.head_hash()` + // inside `load_state_snapshot_at_block` would pick up NEW and + // surface `auth_domain_new` instead of `auth_domain_old`. + routes.insert( + "/global/block/head/hash".into(), + (200, format!("\"{}\"", block_new)), + ); + + for (block, dom) in [ + (block_old, auth_domain_old), + (block_new, auth_domain_new), + ] { + // block_level — both blocks at level 100; required_tx_fee + // chain falls back to 0 because LAST_INPUT_LEVEL is null. + routes.insert( + format!("/global/block/{}/level", block), + (200, "100".into()), + ); + // tree_size = 0 ⇒ no per-note URL routes needed. + routes.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/value?key={}", + block, DURABLE_TREE_SIZE + ), + (200, format!("\"{}\"", hex::encode(0u64.to_le_bytes()))), + ); + // tree_root = empty-tree root (matches what + // `load_state_snapshot_at_block` recomputes from zero notes). 
+ routes.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/value?key={}", + block, DURABLE_TREE_ROOT + ), + (200, format!("\"{}\"", hex::encode(empty_root))), + ); + // auth_domain — DIFFERENT between blocks (the differentiator). + routes.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/value?key={}", + block, DURABLE_AUTH_DOMAIN + ), + (200, format!("\"{}\"", hex::encode(dom))), + ); + // Optional reads return null ⇒ required_tx_fee = 0. + for opt_key in [ + DURABLE_LAST_INPUT_LEVEL, + DURABLE_PRIVATE_TX_FEE_LEVEL, + DURABLE_PRIVATE_TX_COUNT_IN_LEVEL, + ] { + routes.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/length?key={}", + block, opt_key + ), + (200, "null".into()), + ); + } + } + + let base_url = super::tests::spawn_mock_http_server(routes); + let profile = super::tests::rollup_profile_for_url(&base_url); + let rpc = RollupRpc::new(&profile); + + let snap = rpc + .load_state_snapshot_at_block(block_old) + .expect("load_state_snapshot_at_block must succeed"); + assert_eq!( + snap.auth_domain, auth_domain_old, + "_at_block must read auth_domain at the pinned block, not the re-resolved head" + ); + assert_eq!( + snap.tree.leaves.len(), + 0, + "tree_size = 0 ⇒ snapshot has no notes" + ); + // Note: `required_tx_fee` is the same for both blocks (idle + // fee-level chain ⇒ count=0 ⇒ base fee), so it can't + // distinguish pinned-vs-re-resolved here. The auth_domain + // assertion above is the load-bearing one. + } + + /// E2E regression for the `cmd_rollup_sync` finalize call site. + /// Drives a full sync against a stateful mock whose `/head/hash` + /// route returns `block_old` on the first read (so the function + /// pins to it) and `block_new` on every subsequent read. 
The + /// scan must drive at least one batch through the loop AND the + /// `pending_deposit.shielded_cm` must match a cm observed in + /// that batch's `seen_cms`, otherwise the eviction predicate + /// `drained_on_chain && cm_observed` cannot fire and the test + /// is tautological (cf. PR-#24 review B1: an earlier draft of + /// this test had `tree_size = 0` + `shielded_cm = None` and + /// passed under the regression — load-bearing assertion was + /// dead). + /// + /// Setup driving the load-bearing path: + /// - tree_size = 1, one published note at index 0 wrapping + /// `observed_cm` (a stable felt the test pre-computes). + /// - `wallet.pending_deposits[0].shielded_cm = Some(observed_cm)` + /// so finalize evaluates `cm_observed = true`. + /// - block_old: pool funded at 1 ꜩ (the truth pinned by sync). + /// - block_new: pool drained (length null, returns 0 ⇒ + /// `drained_on_chain = true`). + /// + /// Correct shape: head pinned to block_old at the top, pool + /// reads at block_old → funded → deposit retained. Counter + /// resolves head exactly once. + /// + /// Regression (`load_pool_balances_at_head` at the finalize + /// call site): head re-resolved during finalize → block_new + /// → pool drained → `drained_on_chain && cm_observed` ⇒ true ⇒ + /// deposit evicted → `pending_deposits.len() == 0` ⇒ assertion + /// fails. Verified mechanically by reverting line 7972 of the + /// commit being asserted, running this test, observing failure, + /// then restoring. 
+ #[test] + #[serial_test::serial(sync_checkpoint_hook)] + fn cmd_rollup_sync_pins_pool_reads_to_finalize_head() { + use std::collections::HashMap; + use std::net::TcpListener; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + + let block_old = "BLfinalizeold"; + let block_new = "BLfinalizenew"; + + let dir = tempfile::tempdir().expect("tempdir"); + let wallet_path = dir.path().join("wallet.json"); + let wallet_path_str = wallet_path.to_str().unwrap(); + + // Pre-compute the cm we will publish at index 0 AND wire as + // the deposit's `shielded_cm`. This is the load-bearing + // coupling: the batch scan pushes `observed_cm` into + // `seen_cms`, finalize sees `cm_observed = true`, and pool + // reads at the regression's `block_new` (drained) would then + // satisfy `drained_on_chain && cm_observed` ⇒ eviction. + let observed_cm: F = felt_tag(b"e2e-finalize-observed-cm"); + + let mut wallet = super::tests::test_wallet(1); + wallet.scanned = 0; + let pkh = felt_tag(b"finalize-pin-pool-pkh"); + wallet.pending_deposits.push(PendingDeposit { + pubkey_hash: pkh, + blind: felt_tag(b"finalize-pin-blind"), + address_index: 0, + auth_domain: felt_tag(b"finalize-pin-auth"), + amount: 1_000_000, + operation_hash: None, + shielded_cm: Some(observed_cm), + }); + save_wallet(wallet_path_str, &wallet).expect("save wallet"); + + // Encode a published-note wrapping `observed_cm`. Any enc + // payload works (this note is non-recoverable from the + // wallet's keys; what matters is the cm flowing into + // `seen_cms`, which `apply_scan_feed_recover_batch` does + // unconditionally). 
+        let filler_nm = super::tests::note_memo_for_wallet_address(
+            &wallet,
+            0,
+            1,
+            felt_tag(b"finalize-pin-filler-rseed"),
+            None,
+        );
+        let note_bytes = canonical_wire::encode_published_note(&observed_cm, &filler_nm.enc)
+            .expect("published note encodes");
+        let note_key = indexed_durable_key(DURABLE_NOTE_PREFIX, 0);
+
+        let pkh_hex = hex::encode(pkh);
+        let pool_key =
+            format!("{}{}", DURABLE_DEPOSIT_BALANCE_PREFIX, pkh_hex);
+        let value_old: u64 = 1_000_000;
+        let tree_size: u64 = 1;
+
+        let mut routes: HashMap<String, (u16, String)> = HashMap::new();
+        // Mirror the same per-block durable shape on both blocks so
+        // the regression-induced re-resolution doesn't 404 on tree /
+        // nullifier reads — the failure must surface specifically at
+        // the pool-balance read, not as a generic RPC error.
+        for block in [block_old, block_new] {
+            routes.insert(
+                format!(
+                    "/global/block/{}/durable/wasm_2_0_0/value?key={}",
+                    block, DURABLE_TREE_SIZE
+                ),
+                (200, format!("\"{}\"", hex::encode(tree_size.to_le_bytes()))),
+            );
+            routes.insert(
+                format!(
+                    "/global/block/{}/durable/wasm_2_0_0/value?key={}",
+                    block, DURABLE_NULLIFIER_COUNT
+                ),
+                (200, format!("\"{}\"", hex::encode(0u64.to_le_bytes()))),
+            );
+            routes.insert(
+                format!(
+                    "/global/block/{}/durable/wasm_2_0_0/length?key={}",
+                    block, note_key
+                ),
+                (200, note_bytes.len().to_string()),
+            );
+            routes.insert(
+                format!(
+                    "/global/block/{}/durable/wasm_2_0_0/value?key={}",
+                    block, note_key
+                ),
+                (200, format!("\"{}\"", hex::encode(&note_bytes))),
+            );
+        }
+        // Pool balance — funded at block_old (length=8 + value=1_000_000),
+        // drained at block_new (length=null, helper returns 0 ⇒
+        // `drained_on_chain = true`).
+ routes.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/length?key={}", + block_old, pool_key + ), + (200, "8".into()), + ); + routes.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/value?key={}", + block_old, pool_key + ), + (200, format!("\"{}\"", hex::encode(value_old.to_le_bytes()))), + ); + routes.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/length?key={}", + block_new, pool_key + ), + (200, "null".into()), + ); + + // Stateful listener: head/hash uses a counter; everything else + // falls back to the routes HashMap. + let listener = TcpListener::bind("127.0.0.1:0").expect("bind mock http server"); + let addr = listener.local_addr().expect("mock server addr"); + let counter = Arc::new(AtomicUsize::new(0)); + let counter_thread = counter.clone(); + let block_old_owned = block_old.to_string(); + let block_new_owned = block_new.to_string(); + std::thread::spawn(move || { + for stream in listener.incoming() { + use std::io::{Read, Write}; + let mut stream = match stream { + Ok(s) => s, + Err(_) => break, + }; + let mut buffer = [0u8; 8192]; + let read = match stream.read(&mut buffer) { + Ok(n) => n, + Err(_) => continue, + }; + if read == 0 { + continue; + } + let request = String::from_utf8_lossy(&buffer[..read]); + let path = request + .lines() + .next() + .and_then(|line| line.split_whitespace().nth(1)) + .unwrap_or("/") + .to_string(); + let (status, body) = if path == "/global/block/head/hash" { + let n = counter_thread.fetch_add(1, Ordering::SeqCst); + let block = if n == 0 { + &block_old_owned + } else { + &block_new_owned + }; + (200u16, format!("\"{}\"", block)) + } else { + routes + .get(&path) + .cloned() + .unwrap_or_else(|| (404, "null".to_string())) + }; + let response = format!( + "HTTP/1.1 {} OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + status, + body.len(), + body + ); + let _ = stream.write_all(response.as_bytes()); + } + }); + + let base_url = format!("http://{}", 
addr); + let profile = super::tests::rollup_profile_for_url(&base_url); + + cmd_rollup_sync(wallet_path_str, &profile, DEFAULT_CHECKPOINT_EVERY) + .expect("sync should finish on a stable empty tree"); + + let saved = load_wallet(wallet_path_str).expect("reload wallet"); + // The pool was funded at block_old (the pinned head). The + // pending_deposit must still be there. If a regression has + // finalize re-resolve head (which now serves block_new where + // the pool is drained), the deposit gets evicted as "drained + // pending prune" and this assertion fails. + assert_eq!( + saved.pending_deposits.len(), + 1, + "finalize must read pool balances at the pinned block_old (funded), \ + NOT a re-resolved block_new (drained); regression evicts the deposit" + ); + assert_eq!( + saved.pending_deposits[0].pubkey_hash, pkh, + "the surviving pending_deposit must be the one we set up" + ); + // Strong sanity: `cmd_rollup_sync` MUST resolve + // `head_hash()` exactly once and pin the result for every + // subsequent read. Any future regression that re-introduces + // a head-resolving call inside the loop or finalize will + // bump this counter — and a single extra resolution lands + // on `block_new` (drained), satisfying the eviction predicate + // and failing the `pending_deposits` assertion above too. + // We keep both checks because they fail differently + // (resolution count tells you WHERE to look; eviction tells + // you the SHAPE of the bug). + let resolutions = counter.load(Ordering::SeqCst); + assert_eq!( + resolutions, 1, + "cmd_rollup_sync must resolve head/hash exactly once (pinned thereafter); \ + got {} resolutions — a head-resolving helper crept back into the call path", + resolutions + ); + } + + /// Companion to `cmd_rollup_sync_pins_pool_reads_to_finalize_head`: + /// proves the fixture's failure mechanism is REAL (not tautological) + /// without touching the production call site. 
Builds the same
+    /// stateful mock + same wallet shape, then invokes
+    /// `load_pool_balances_at_head` (the head-resolving helper, what
+    /// the regression would call) followed by `apply_scan_feed_finalize`
+    /// with `seen_cms = [observed_cm]`. The pool resolves to `block_new`
+    /// (drained) → `drained_on_chain && cm_observed` ⇒ deposit evicted.
+    ///
+    /// This test exists because PR-#24 reviewers caught an earlier
+    /// version of the e2e test where the eviction predicate could
+    /// never fire (`shielded_cm = None` ⇒ `cm_observed` always
+    /// false), making the load-bearing assertion dead. By asserting
+    /// here that the same fixture, when fed through the regression
+    /// helper, DOES evict, we lock in that the e2e test's
+    /// assertion is exercising a live mechanism.
+    #[test]
+    fn fixture_for_finalize_pin_test_actually_evicts_under_regression_helper() {
+        use std::collections::HashMap;
+        use std::net::TcpListener;
+        use std::sync::atomic::{AtomicUsize, Ordering};
+        use std::sync::Arc;
+
+        let block_old = "BLfixturetestold";
+        let block_new = "BLfixturetestnew";
+        let observed_cm: F = felt_tag(b"e2e-finalize-observed-cm-fixture");
+
+        let mut wallet = super::tests::test_wallet(1);
+        let pkh = felt_tag(b"finalize-pin-pool-pkh-fixture");
+        wallet.pending_deposits.push(PendingDeposit {
+            pubkey_hash: pkh,
+            blind: felt_tag(b"finalize-pin-blind-fixture"),
+            address_index: 0,
+            auth_domain: felt_tag(b"finalize-pin-auth-fixture"),
+            amount: 1_000_000,
+            operation_hash: None,
+            shielded_cm: Some(observed_cm),
+        });
+
+        let pkh_hex = hex::encode(pkh);
+        let pool_key =
+            format!("{}{}", DURABLE_DEPOSIT_BALANCE_PREFIX, pkh_hex);
+        let value_old: u64 = 1_000_000;
+
+        let mut routes: HashMap<String, (u16, String)> = HashMap::new();
+        // block_old: pool funded.
+ routes.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/length?key={}", + block_old, pool_key + ), + (200, "8".into()), + ); + routes.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/value?key={}", + block_old, pool_key + ), + (200, format!("\"{}\"", hex::encode(value_old.to_le_bytes()))), + ); + // block_new: pool drained. + routes.insert( + format!( + "/global/block/{}/durable/wasm_2_0_0/length?key={}", + block_new, pool_key + ), + (200, "null".into()), + ); + + // Stateful listener: head_old first, head_new thereafter. + let listener = TcpListener::bind("127.0.0.1:0").expect("bind mock http server"); + let addr = listener.local_addr().expect("mock server addr"); + let counter = Arc::new(AtomicUsize::new(0)); + let counter_thread = counter.clone(); + let block_old_owned = block_old.to_string(); + let block_new_owned = block_new.to_string(); + std::thread::spawn(move || { + for stream in listener.incoming() { + use std::io::{Read, Write}; + let mut stream = match stream { + Ok(s) => s, + Err(_) => break, + }; + let mut buffer = [0u8; 8192]; + let read = match stream.read(&mut buffer) { + Ok(n) => n, + Err(_) => continue, + }; + if read == 0 { + continue; + } + let request = String::from_utf8_lossy(&buffer[..read]); + let path = request + .lines() + .next() + .and_then(|line| line.split_whitespace().nth(1)) + .unwrap_or("/") + .to_string(); + let (status, body) = if path == "/global/block/head/hash" { + let n = counter_thread.fetch_add(1, Ordering::SeqCst); + let block = if n == 0 { + &block_old_owned + } else { + &block_new_owned + }; + (200u16, format!("\"{}\"", block)) + } else { + routes + .get(&path) + .cloned() + .unwrap_or_else(|| (404, "null".to_string())) + }; + let response = format!( + "HTTP/1.1 {} OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + status, + body.len(), + body + ); + let _ = stream.write_all(response.as_bytes()); + } + }); + + let base_url = format!("http://{}", addr); + let 
profile = super::tests::rollup_profile_for_url(&base_url); + let rpc = RollupRpc::new(&profile); + + // Burn one head/hash resolution to mimic the "scan ran first + // and pinned to block_old" preamble. + let pinned = rpc.head_hash().expect("first head/hash resolves to old"); + assert_eq!(pinned, block_old, "first resolution serves block_old"); + + // Now invoke the REGRESSION helper — which re-resolves head, + // landing on block_new (drained). + let drained_pool = rpc + .load_pool_balances_at_head(&wallet.pending_deposits) + .expect("regression helper succeeds"); + assert_eq!( + drained_pool.get(&pkh).copied().unwrap_or(0), + 0, + "regression read at block_new must see drained pool" + ); + + // Feed that drained view into finalize with the observed cm + // present in seen_cms — exactly the path the e2e test + // protects against. + let seen_cms = vec![observed_cm]; + let summary = + apply_scan_feed_finalize(&mut wallet, &seen_cms, vec![], &drained_pool); + assert_eq!( + wallet.pending_deposits.len(), + 0, + "fixture must demonstrate the regression actually evicts \ + the deposit; if it doesn't, the sibling e2e test is \ + tautological and protects nothing" + ); + assert_eq!( + summary.pruned_drained_pools, 1, + "finalize must report exactly the one regression-induced eviction" + ); + } }