diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index eb0a135..a27e33f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -64,10 +64,33 @@ jobs: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable - uses: Swatinem/rust-cache@v2 - - name: Install Foundry - uses: foundry-rs/foundry-toolchain@v1 - with: - version: nightly + - name: Install Foundry (direct release download) + # foundry-rs/foundry-toolchain calls foundryup which curls + # api.github.com unauthenticated to resolve the nightly tag. + # macOS runners on shared egress IPs hit the 60/h anonymous + # rate limit and the install 403s. Setting GITHUB_TOKEN does + # NOT help because foundryup ignores it. Instead: pull the + # pinned release tarball directly from the GitHub Releases CDN + # (no API call, no rate limit) and unpack to /usr/local/bin. curl -f makes an HTTP error fail this step instead of saving the error page as the tarball. + shell: bash + env: + FOUNDRY_VERSION: v1.7.1 + run: | + set -euo pipefail + os="$(uname -s | tr '[:upper:]' '[:lower:]')" + arch="$(uname -m)" + case "$os-$arch" in + linux-x86_64) asset="foundry_${FOUNDRY_VERSION}_linux_amd64.tar.gz" ;; + darwin-arm64) asset="foundry_${FOUNDRY_VERSION}_darwin_arm64.tar.gz" ;; + darwin-x86_64) asset="foundry_${FOUNDRY_VERSION}_darwin_amd64.tar.gz" ;; + *) echo "unsupported runner: $os-$arch"; exit 1 ;; + esac + url="https://github.com/foundry-rs/foundry/releases/download/${FOUNDRY_VERSION}/${asset}" + echo "downloading $url" + curl -fsSL --retry 5 --retry-delay 5 "$url" -o /tmp/foundry.tar.gz + sudo tar -xzf /tmp/foundry.tar.gz -C /usr/local/bin anvil forge cast chisel + anvil --version + forge --version - name: Run E2E tests (serial) run: cargo test -p ant-core --test e2e_chunk --test e2e_data --test e2e_file --test e2e_payment --test e2e_security --test e2e_cost_estimate -- --test-threads=1 @@ -89,10 +112,29 @@ jobs: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable - uses: Swatinem/rust-cache@v2 - - name: Install Foundry - uses: foundry-rs/foundry-toolchain@v1 - 
with: - version: nightly + - name: Install Foundry (direct release download) + # Bypass foundry-toolchain / foundryup because they 403 on + # macOS runners against the anonymous 60/h api.github.com + # rate limit (see e2e step above for full rationale). + shell: bash + env: + FOUNDRY_VERSION: v1.7.1 + run: | + set -euo pipefail + os="$(uname -s | tr '[:upper:]' '[:lower:]')" + arch="$(uname -m)" + case "$os-$arch" in + linux-x86_64) asset="foundry_${FOUNDRY_VERSION}_linux_amd64.tar.gz" ;; + darwin-arm64) asset="foundry_${FOUNDRY_VERSION}_darwin_arm64.tar.gz" ;; + darwin-x86_64) asset="foundry_${FOUNDRY_VERSION}_darwin_amd64.tar.gz" ;; + *) echo "unsupported runner: $os-$arch"; exit 1 ;; + esac + url="https://github.com/foundry-rs/foundry/releases/download/${FOUNDRY_VERSION}/${asset}" + echo "downloading $url" + curl -fsSL --retry 5 --retry-delay 5 "$url" -o /tmp/foundry.tar.gz + sudo tar -xzf /tmp/foundry.tar.gz -C /usr/local/bin anvil forge cast chisel + anvil --version + forge --version - name: Run merkle E2E tests (35-node testnet) run: cargo test -p ant-core --test e2e_merkle -- --test-threads=1 env: diff --git a/ant-core/src/data/client/cached_merkle.rs b/ant-core/src/data/client/cached_merkle.rs new file mode 100644 index 0000000..f513e52 --- /dev/null +++ b/ant-core/src/data/client/cached_merkle.rs @@ -0,0 +1,367 @@ +//! On-disk cache for merkle batch payment receipts. +//! +//! Why this exists +//! --------------- +//! A merkle batch upload pays for *all* chunks in one on-chain transaction +//! up-front, then stores each chunk to its close-group. If the store phase +//! fails partway through (network flake, slow close-K, client crash), the +//! on-chain payment is gone but the proofs needed to re-attempt the store +//! are lost too — the user has to pay again from scratch. +//! +//! By persisting the [`MerkleBatchPaymentResult`] to disk **immediately after +//! the on-chain payment lands**, the next invocation can resume the upload +//! 
using the already-paid proofs instead of re-paying. The cache is keyed by +//! a derivation of the source file path so the same upload, re-issued for +//! the same file, transparently picks up where it left off. +//! +//! Lifecycle +//! --------- +//! * **save** — called once per upload, right after the merkle batch payment +//! transaction confirms. Writes JSON to +//! `<data_dir>/payments/<timestamp>_<file_hash>`. +//! * **load_for_file** — called at the top of every merkle upload. If a +//! non-expired cached receipt exists for the file, it is returned so the +//! upload can skip the pay phase and go straight to store. +//! * **delete_for_file** — called after a fully successful upload to remove +//! the receipt so a future re-upload of the same path pays anew. +//! * **cleanup_outdated** — called opportunistically inside `load_for_file` +//! to garbage-collect receipts past the 7-day expiry window. +//! +//! Filename format +//! --------------- +//! `<timestamp>_<file_hash>` where: +//! * `timestamp` is the merkle payment timestamp (seconds since epoch) used +//! on-chain. Expiry is computed from this value so we can prune stale +//! receipts even if their on-disk mtime has been touched. +//! * `file_hash` is a 64-bit non-cryptographic hash of the source file path +//! string, rendered as 16 hex digits. Uploads issued with the same path +//! string share one cache entry — use distinct paths if you need +//! parallel uploads. +//! +//! Failure-mode tolerance +//! ---------------------- +//! All errors in this module are logged and swallowed in the public-facing +//! API (`try_load_for_file`, `try_save`, `try_delete_for_file`): a busted +//! cache directory must never prevent a real upload from running. The +//! tradeoff is that a corrupt cache file is logged and treated as "no +//! cache", forcing the user to re-pay — but never causes data loss. 
+
+use crate::config;
+use crate::data::client::merkle::MerkleBatchPaymentResult;
+use crate::error::Result;
+use std::fs::{self, DirEntry, File};
+use std::hash::{Hash, Hasher};
+use std::io::{BufReader, BufWriter};
+use std::path::{Path, PathBuf};
+use std::time::{SystemTime, UNIX_EPOCH};
+use tracing::{debug, info, warn};
+
+/// Cached merkle receipts older than this are removed from disk.
+///
+/// Set to match `MERKLE_PAYMENT_EXPIRATION` in `evmlib` (7 days). After
+/// the payment ages out on-chain there is no point keeping the cache —
+/// the proofs can no longer be verified by storers.
+const PAYMENT_EXPIRATION_SECS: u64 = 7 * 24 * 60 * 60;
+
+/// Subdirectory under the platform-appropriate data dir.
+const PAYMENTS_SUBDIR: &str = "payments";
+
+/// Returns the directory used for cached payments, creating it if needed.
+///
+/// # Errors
+///
+/// Propagates the failure if `config::data_dir()` cannot resolve the data
+/// directory or if the `payments` subdirectory cannot be created.
+fn payments_dir() -> Result<PathBuf> {
+    let dir = config::data_dir()?.join(PAYMENTS_SUBDIR);
+    fs::create_dir_all(&dir)?;
+    Ok(dir)
+}
+
+/// Short non-cryptographic hash of the source file path string, used as
+/// the on-disk cache key.
+///
+/// The key depends only on the bytes of `file_path` as passed in — not on
+/// the file's contents — and is rendered as exactly 16 lowercase hex
+/// digits (zero-padded 64-bit value).
+///
+/// Filename collisions are not a correctness problem (the loaded
+/// receipt is content-validated against the current encrypted chunk
+/// addresses before being trusted) but they would waste a re-pay, so
+/// we want low collision probability across a single user's upload
+/// history. `std::hash::DefaultHasher` with 16 hex chars of output is
+/// far below the collision threshold for that scale.
+///
+/// NOTE(review): the std docs leave `DefaultHasher`'s algorithm
+/// unspecified across Rust releases. A toolchain upgrade could therefore
+/// change the key for a given path and orphan a receipt that is still
+/// inside its expiry window (cost: one re-pay, never data loss). Consider
+/// a hash with a pinned algorithm before relying on long-lived keys.
+fn file_hash_key(file_path: &str) -> String {
+    let mut hasher = std::collections::hash_map::DefaultHasher::new();
+    file_path.hash(&mut hasher);
+    format!("{:016x}", hasher.finish())
+}
+
+/// Save the merkle batch payment receipt for a given source file path.
+///
+/// Idempotent: re-saving for the same `(timestamp, file_path)` overwrites
+/// the previous file. Different timestamps for the same file produce
+/// different filenames, which is fine — `cleanup_outdated` reaps them.
+pub fn save(file_path: &str, result: &MerkleBatchPaymentResult) -> Result<PathBuf> {
+    let dir = payments_dir()?;
+    let ts = if result.merkle_payment_timestamp > 0 {
+        result.merkle_payment_timestamp
+    } else {
+        // Fall back to now() if the result wasn't populated, so we
+        // defensively avoid emitting a `0_*` filename that would
+        // immediately be treated as expired.
+        SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .map(|d| d.as_secs())
+            .unwrap_or(0)
+    };
+    let path = dir.join(format!("{ts}_{}", file_hash_key(file_path)));
+    let handle = File::create(&path)?;
+    let mut writer = BufWriter::new(handle);
+    serde_json::to_writer(&mut writer, result)
+        .map_err(|e| crate::error::Error::Io(std::io::Error::other(e.to_string())))?;
+    // Flush explicitly: dropping a `BufWriter` discards any flush error,
+    // which would report success for a truncated (unreadable) receipt.
+    // `into_inner` flushes the buffer and surfaces the I/O error instead.
+    writer
+        .into_inner()
+        .map_err(|e| crate::error::Error::Io(e.into_error()))?;
+    debug!(
+        "Cached merkle payment receipt for {file_path:?} to {}",
+        path.display()
+    );
+    Ok(path)
+}
+
+/// Best-effort save. Logs on failure but never returns an error.
+///
+/// Intended for the upload path: if we can't cache the receipt we still
+/// want to attempt the chunk PUTs.
+pub fn try_save(file_path: &str, result: &MerkleBatchPaymentResult) {
+    if let Err(e) = save(file_path, result) {
+        warn!(
+            "Failed to cache merkle payment receipt for {file_path:?}: {e}. \
+             Upload will proceed without resume support."
+        );
+    }
+}
+
+/// Load the cached merkle batch receipt for a given source file path.
+///
+/// Side-effect: opportunistically removes any expired receipts found in
+/// the directory while scanning.
+///
+/// Returns `Ok(None)` if no matching non-expired receipt is found.
+pub fn load_for_file(file_path: &str) -> Result<Option<(PathBuf, MerkleBatchPaymentResult)>> {
+    cleanup_outdated();
+    let dir = payments_dir()?;
+    let key = file_hash_key(file_path);
+
+    // An unreadable directory is treated as "no cache", not as an error.
+    let read_dir = match fs::read_dir(&dir) {
+        Ok(rd) => rd,
+        Err(e) => {
+            debug!("Could not read payments dir {}: {e}", dir.display());
+            return Ok(None);
+        }
+    };
+
+    // `flatten()` skips directory entries that failed to read.
+    for entry in read_dir.flatten() {
+        let path = entry.path();
+        if !path.is_file() {
+            continue;
+        }
+        // Non-UTF-8 filenames cannot be ours; skip them.
+        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
+            continue;
+        };
+        if !name.contains(&key) {
+            continue;
+        }
+        if is_expired_filename(name) {
+            // Found the file but it has aged out; cleanup will
+            // collect it. Keep scanning in case a newer one exists.
+            continue;
+        }
+        // The first readable, non-expired match (in directory order) wins.
+        match read_receipt(&path) {
+            Ok(receipt) => {
+                info!(
+                    "Found previous merkle upload attempt for {file_path}, \
+                     resuming with payment cached at {}",
+                    path.display()
+                );
+                return Ok(Some((path, receipt)));
+            }
+            Err(e) => {
+                warn!(
+                    "Cached merkle receipt at {} is unreadable ({e}). \
+                     Ignoring and starting a fresh upload.",
+                    path.display()
+                );
+            }
+        }
+    }
+    Ok(None)
+}
+
+/// Best-effort load. Logs on failure and returns `None`.
+pub fn try_load_for_file(file_path: &str) -> Option<(PathBuf, MerkleBatchPaymentResult)> {
+    match load_for_file(file_path) {
+        Ok(opt) => opt,
+        Err(e) => {
+            warn!(
+                "Failed to look up cached merkle receipt for {file_path:?}: {e}. \
+                 Starting a fresh upload."
+            );
+            None
+        }
+    }
+}
+
+/// Delete the cached receipt(s) matching the file path. Called on
+/// successful upload completion.
+pub fn delete_for_file(file_path: &str) -> Result<()> {
+    let dir = payments_dir()?;
+    let key = file_hash_key(file_path);
+    if let Ok(read_dir) = fs::read_dir(&dir) {
+        for entry in read_dir.flatten() {
+            let path = entry.path();
+            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
+                if name.contains(&key) {
+                    // Still best-effort, but only claim success when the
+                    // file was actually removed; a leftover receipt is
+                    // reaped later by `cleanup_outdated`.
+                    match fs::remove_file(&path) {
+                        Ok(()) => debug!("Deleted cached merkle receipt {}", path.display()),
+                        Err(e) => warn!(
+                            "Could not delete cached merkle receipt {}: {e}",
+                            path.display()
+                        ),
+                    }
+                }
+            }
+        }
+    }
+    Ok(())
+}
+
+/// Best-effort delete. Logs on failure but never returns an error.
+pub fn try_delete_for_file(file_path: &str) {
+    if let Err(e) = delete_for_file(file_path) {
+        warn!(
+            "Failed to delete cached merkle receipt for {file_path:?}: {e}. \
+             Will be cleaned up after expiry."
+        );
+    }
+}
+
+/// Garbage-collect cached receipts past the expiry window.
+///
+/// Logs each removal at info level so users see what we cleaned up.
+/// Best-effort: any IO error is silently ignored.
+pub fn cleanup_outdated() {
+    let Ok(dir) = payments_dir() else {
+        return;
+    };
+    let Ok(read_dir) = fs::read_dir(&dir) else {
+        return;
+    };
+    for entry in read_dir.flatten() {
+        if is_expired_entry(&entry) {
+            let path = entry.path();
+            info!(
+                "Removing expired cached merkle payment file: {}",
+                path.display()
+            );
+            let _ = fs::remove_file(path);
+        }
+    }
+}
+
+/// Returns `true` when `entry` is a regular file whose name carries an
+/// expired timestamp prefix. Directories and non-UTF-8 names are never
+/// reported as expired.
+fn is_expired_entry(entry: &DirEntry) -> bool {
+    let path = entry.path();
+    if !path.is_file() {
+        return false;
+    }
+    let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
+        return false;
+    };
+    is_expired_filename(name)
+}
+
+/// Returns `true` when the `<timestamp>_…` prefix of `name` is more than
+/// `PAYMENT_EXPIRATION_SECS` in the past.
+///
+/// Names without an underscore, or whose prefix is not a `u64`, are
+/// reported as not expired so unrelated files are never auto-deleted.
+fn is_expired_filename(name: &str) -> bool {
+    let ts_str = match name.split_once('_') {
+        Some((ts, _)) => ts,
+        None => return false,
+    };
+    let Ok(ts) = ts_str.parse::<u64>() else {
+        return false;
+    };
+    // A clock before the epoch yields 0, which makes nothing look expired.
+    let now = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .map(|d| d.as_secs())
+        .unwrap_or(0);
+    now > ts.saturating_add(PAYMENT_EXPIRATION_SECS)
+}
+
+/// Reads and deserializes one JSON receipt file.
+///
+/// # Errors
+///
+/// Returns `Error::Io` if the file cannot be opened or its contents do
+/// not deserialize into a `MerkleBatchPaymentResult`.
+fn read_receipt(path: &Path) -> Result<MerkleBatchPaymentResult> {
+    let handle = File::open(path)?;
+    let receipt: MerkleBatchPaymentResult = serde_json::from_reader(BufReader::new(handle))
+        .map_err(|e| crate::error::Error::Io(std::io::Error::other(e.to_string())))?;
+    Ok(receipt)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::collections::HashMap;
+
+    fn dummy_receipt(ts: u64) -> MerkleBatchPaymentResult {
+        let mut proofs: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
+        proofs.insert([0u8; 32], vec![1, 2, 3]);
+        MerkleBatchPaymentResult {
+            proofs,
+            chunk_count: 1,
+            storage_cost_atto: "0".to_string(),
+            gas_cost_wei: 0,
+            merkle_payment_timestamp: ts,
+        }
+    }
+
+    #[test]
+    fn file_hash_key_is_stable() {
+        let a = file_hash_key("/tmp/some/file.bin");
+        let b = file_hash_key("/tmp/some/file.bin");
+        assert_eq!(a, b);
+        let c = file_hash_key("/tmp/some/other.bin");
+        assert_ne!(a, c);
+    }
+
+    #[test]
+    fn expired_filename_detected() {
+        // Just past the expiry boundary.
+        let stale = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_secs()
+            .saturating_sub(PAYMENT_EXPIRATION_SECS + 60);
+        let name = format!("{stale}_abcd1234");
+        assert!(is_expired_filename(&name));
+
+        // Within the window.
+        let fresh = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_secs()
+            .saturating_sub(60);
+        let name = format!("{fresh}_abcd1234");
+        assert!(!is_expired_filename(&name));
+    }
+
+    #[test]
+    fn malformed_filename_is_not_expired() {
+        // Defensive: garbage in payments dir must not be auto-deleted.
+        assert!(!is_expired_filename("nonsense"));
+        assert!(!is_expired_filename("not_a_number_abcd1234"));
+    }
+
+    // NOTE(review): this test goes through `payments_dir()`, so it reads and
+    // writes the real data directory resolved by `config::data_dir()`.
+    #[test]
+    fn roundtrip_save_load_delete() -> Result<()> {
+        let file_path = format!(
+            "/tmp/anselme-resumable-merkle-test-{}",
+            SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos()
+        );
+        let ts = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_secs();
+        let receipt = dummy_receipt(ts);
+        let saved_path = save(&file_path, &receipt)?;
+        assert!(saved_path.exists());
+
+        let loaded = load_for_file(&file_path)?;
+        let (loaded_path, loaded_receipt) = loaded.expect("receipt should be loadable");
+        assert_eq!(loaded_path, saved_path);
+        assert_eq!(loaded_receipt.chunk_count, receipt.chunk_count);
+        assert_eq!(loaded_receipt.merkle_payment_timestamp, ts);
+
+        delete_for_file(&file_path)?;
+        assert!(load_for_file(&file_path)?.is_none());
+        Ok(())
+    }
+}
diff --git a/ant-core/src/data/client/chunk.rs b/ant-core/src/data/client/chunk.rs index da0bd03..99c0975 100644 --- a/ant-core/src/data/client/chunk.rs +++ b/ant-core/src/data/client/chunk.rs @@ -166,7 +166,8 @@ impl Client { ) -> Result { let address = compute_address(&content); let node = self.network().node(); - let timeout = store_response_timeout_for_proof(&proof, self.config().store_timeout_secs); + let timeout = + store_response_timeout_for_proof(&proof, self.config().merkle_store_timeout_secs); let timeout_secs = timeout.as_secs(); let request_id = self.next_request_id(); @@ -399,4 +400,46 @@ mod tests { assert_eq!(timeout, Duration::from_secs(TEST_MERKLE_TIMEOUT_SECS)); } + + /// Regression: the default `merkle_store_timeout_secs` must be at + /// least the storer-side `CLOSENESS_LOOKUP_TIMEOUT` (240 s) plus + /// padding. If either side moves and this invariant breaks, the + /// client will give up on chunks the storer is still verifying. + /// See `DEFAULT_MERKLE_STORE_TIMEOUT_SECS` doc comment for the + /// derivation. 
+ #[test] + fn default_merkle_store_timeout_satisfies_storer_invariant() { + use crate::data::client::ClientConfig; + const STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS: u64 = 240; + const MIN_PADDING_SECS: u64 = 30; + let config = ClientConfig::default(); + assert!( + config.merkle_store_timeout_secs + >= STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS + MIN_PADDING_SECS, + "merkle_store_timeout_secs ({}) must be >= storer CLOSENESS_LOOKUP_TIMEOUT ({}) + padding ({})", + config.merkle_store_timeout_secs, + STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS, + MIN_PADDING_SECS, + ); + } + + /// Regression: the non-merkle PUT path uses the hardcoded + /// `STORE_RESPONSE_TIMEOUT` constant, not the per-config + /// `merkle_store_timeout_secs`. If a future refactor accidentally + /// routes non-merkle PUTs through the merkle field they'd inherit + /// the 270 s value and silently regress non-merkle latency. + /// `store_response_timeout_for_proof` with a non-merkle proof tag + /// must return the const regardless of what merkle timeout is + /// passed. + #[test] + fn non_merkle_put_ignores_merkle_timeout_value() { + let absurd_merkle_timeout = 9_999; + for tag in [PROOF_TAG_SINGLE_NODE, UNKNOWN_PROOF_TAG] { + let timeout = store_response_timeout_for_proof(&[tag], absurd_merkle_timeout); + assert_eq!( + timeout, STORE_RESPONSE_TIMEOUT, + "non-merkle proof tag {tag:#x} should ignore merkle timeout {absurd_merkle_timeout}", + ); + } + } } diff --git a/ant-core/src/data/client/file.rs b/ant-core/src/data/client/file.rs index 134a71a..669170b 100644 --- a/ant-core/src/data/client/file.rs +++ b/ant-core/src/data/client/file.rs @@ -1260,36 +1260,121 @@ impl Client { } // Phase 2: Decide payment mode and upload in waves from disk. + // + // For the merkle path, attempt to resume from a cached + // receipt before paying again. The cache is keyed by the + // source file path; a successful upload deletes the cache so + // a subsequent re-upload of the same path will pay anew. 
+ let file_path_key = path.display().to_string(); let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei) = if self.should_use_merkle(chunk_count, mode) { info!("Using merkle batch payment for {chunk_count} file chunks"); - let batch_result = match self - .pay_for_merkle_batch(&spill.addresses, DATA_TYPE_CHUNK, spill.avg_chunk_size()) - .await + let batch_result = if let Some((_cache_path, cached)) = + crate::data::client::cached_merkle::try_load_for_file(&file_path_key) { - Ok(result) => result, - Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => { - info!("Merkle needs more peers ({msg}), falling back to wave-batch"); - let (stored, sc, gc) = - self.upload_waves_single(&spill, progress.as_ref()).await?; - return Ok(FileUploadResult { - data_map, - chunks_stored: stored, - chunks_failed: 0, - total_chunks: chunk_count, - payment_mode_used: PaymentMode::Single, - storage_cost_atto: sc, - gas_cost_wei: gc, - data_map_address: None, - }); + // Validate the cache matches this upload. If the + // file was edited between attempts the cached + // proofs would no longer be valid for the new + // chunk addresses; in that case drop the cache + // and pay fresh. + let addresses_match = spill + .addresses + .iter() + .all(|addr| cached.proofs.contains_key(addr)); + if addresses_match && cached.proofs.len() == chunk_count { + info!( + "Skipping merkle payment phase; resuming with \ + cached proofs ({} chunks)", + cached.proofs.len() + ); + cached + } else { + info!( + "Cached merkle receipt does not match current file \ + content (cached={}, file={chunk_count}). \ + Discarding cache and paying fresh.", + cached.proofs.len() + ); + crate::data::client::cached_merkle::try_delete_for_file(&file_path_key); + // Fall through to fresh payment below. 
+ match self + .pay_for_merkle_batch( + &spill.addresses, + DATA_TYPE_CHUNK, + spill.avg_chunk_size(), + ) + .await + { + Ok(result) => { + crate::data::client::cached_merkle::try_save( + &file_path_key, + &result, + ); + result + } + Err(Error::InsufficientPeers(ref msg)) + if mode == PaymentMode::Auto => + { + info!( + "Merkle needs more peers ({msg}), falling back to wave-batch" + ); + let (stored, sc, gc) = + self.upload_waves_single(&spill, progress.as_ref()).await?; + return Ok(FileUploadResult { + data_map, + chunks_stored: stored, + chunks_failed: 0, + total_chunks: chunk_count, + payment_mode_used: PaymentMode::Single, + storage_cost_atto: sc, + gas_cost_wei: gc, + data_map_address: None, + }); + } + Err(e) => return Err(e), + } + } + } else { + match self + .pay_for_merkle_batch( + &spill.addresses, + DATA_TYPE_CHUNK, + spill.avg_chunk_size(), + ) + .await + { + Ok(result) => { + // Save BEFORE the store phase so a crash + // mid-upload leaves a resumable receipt. + crate::data::client::cached_merkle::try_save(&file_path_key, &result); + result + } + Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => { + info!("Merkle needs more peers ({msg}), falling back to wave-batch"); + let (stored, sc, gc) = + self.upload_waves_single(&spill, progress.as_ref()).await?; + return Ok(FileUploadResult { + data_map, + chunks_stored: stored, + chunks_failed: 0, + total_chunks: chunk_count, + payment_mode_used: PaymentMode::Single, + storage_cost_atto: sc, + gas_cost_wei: gc, + data_map_address: None, + }); + } + Err(e) => return Err(e), } - Err(e) => return Err(e), }; let (stored, sc, gc) = self .upload_waves_merkle(&spill, &batch_result, progress.as_ref()) .await?; + // Upload succeeded end-to-end; the cached receipt is + // no longer needed. 
+ crate::data::client::cached_merkle::try_delete_for_file(&file_path_key); (stored, PaymentMode::Merkle, sc, gc) } else { let (stored, sc, gc) = self.upload_waves_single(&spill, progress.as_ref()).await?; diff --git a/ant-core/src/data/client/merkle.rs b/ant-core/src/data/client/merkle.rs index ac2e9a1..1b5a780 100644 --- a/ant-core/src/data/client/merkle.rs +++ b/ant-core/src/data/client/merkle.rs @@ -44,7 +44,10 @@ pub enum PaymentMode { } /// Result of a merkle batch payment. -#[derive(Debug)] +/// +/// Serializable so it can be persisted across runs for resume after a +/// partial-upload failure. See `crate::data::client::cached_merkle`. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct MerkleBatchPaymentResult { /// Map of `XorName` to serialized tagged proof bytes (ready to use in PUT requests). pub proofs: HashMap<[u8; 32], Vec>, @@ -54,6 +57,12 @@ pub struct MerkleBatchPaymentResult { pub storage_cost_atto: String, /// Total gas cost in wei. pub gas_cost_wei: u128, + /// Unix timestamp (seconds) used for the on-chain merkle payment. + /// Persisted so resume can check whether the on-chain payment has + /// aged out beyond the merkle expiration window and the cached + /// receipt must be discarded. + #[serde(default)] + pub merkle_payment_timestamp: u64, } /// Prepared merkle batch ready for external payment. @@ -252,6 +261,10 @@ impl Client { let mut all_proofs = HashMap::with_capacity(addresses.len()); let mut total_storage = Amount::ZERO; let mut total_gas: u128 = 0; + // Track the oldest sub-batch timestamp so the overall receipt + // expires when the *first* sub-batch's on-chain payment ages + // out (worst case for resume). 
+ let mut oldest_ts: u64 = 0; for (i, chunk) in sub_batches.into_iter().enumerate() { match self @@ -263,6 +276,12 @@ impl Client { total_storage += cost; } total_gas = total_gas.saturating_add(sub_result.gas_cost_wei); + if oldest_ts == 0 + || (sub_result.merkle_payment_timestamp > 0 + && sub_result.merkle_payment_timestamp < oldest_ts) + { + oldest_ts = sub_result.merkle_payment_timestamp; + } all_proofs.extend(sub_result.proofs); } Err(e) => { @@ -282,6 +301,7 @@ impl Client { proofs: all_proofs, storage_cost_atto: total_storage.to_string(), gas_cost_wei: total_gas, + merkle_payment_timestamp: oldest_ts, }); } } @@ -292,6 +312,7 @@ impl Client { proofs: all_proofs, storage_cost_atto: total_storage.to_string(), gas_cost_wei: total_gas, + merkle_payment_timestamp: oldest_ts, }) } @@ -633,6 +654,7 @@ pub fn finalize_merkle_batch( chunk_count, storage_cost_atto: "0".to_string(), gas_cost_wei: 0, + merkle_payment_timestamp: prepared.merkle_payment_timestamp, }) } diff --git a/ant-core/src/data/client/mod.rs b/ant-core/src/data/client/mod.rs index d43666d..0faeeff 100644 --- a/ant-core/src/data/client/mod.rs +++ b/ant-core/src/data/client/mod.rs @@ -6,6 +6,7 @@ pub mod adaptive; pub mod batch; pub mod cache; +pub(crate) mod cached_merkle; pub mod chunk; pub mod data; pub mod file; @@ -73,14 +74,44 @@ pub(crate) fn classify_error(err: &Error) -> Outcome { /// Default timeout for lightweight network operations (quotes, DHT lookups) in seconds. const DEFAULT_QUOTE_TIMEOUT_SECS: u64 = 10; -/// Default timeout for chunk store operations in seconds. +/// Default timeout for the per-peer chunk GET response and any other +/// caller that explicitly reads `store_timeout_secs`, in seconds. /// -/// Chunk PUTs transfer multi-MB payloads to multiple peers. 
On residential -/// connections with limited upload bandwidth, the default quote timeout (10 s) -/// is far too short — a 4 MB chunk at 1 Mbps takes ~32 s just for the data -/// transfer, before accounting for QUIC slow-start and NAT traversal overhead. +/// Note despite the name: this knob does **not** govern the non-merkle +/// chunk PUT response timeout — that path uses the +/// `STORE_RESPONSE_TIMEOUT` constant in `chunk.rs` directly. Nor does +/// it govern the merkle batch PUT timeout — see +/// `DEFAULT_MERKLE_STORE_TIMEOUT_SECS`. +/// +/// 10 s matches the pre-existing `main` default and intentionally +/// excludes residential-upload tuning, which is Mick's PR #78 +/// territory (splitting GET into its own field). const DEFAULT_STORE_TIMEOUT_SECS: u64 = 10; +/// Default timeout for **merkle batch** chunk store operations in seconds. +/// +/// Separate from `DEFAULT_STORE_TIMEOUT_SECS` because merkle PUTs carry +/// an extra storer-side cost: the payment verifier runs an iterative +/// DHT lookup (`CLOSENESS_LOOKUP_TIMEOUT` in `ant-node`, **240 s** +/// post-PR #89) before accepting the proof. +/// +/// This timeout MUST be >= the storer-side `CLOSENESS_LOOKUP_TIMEOUT` +/// plus padding for the store-response round-trip and storer-local +/// I/O. Otherwise the client gives up while the storer is still +/// happily verifying, the storer wastes CPU/bandwidth on a chunk the +/// client has already discarded, and the client re-targets a +/// different close-K member — potentially double-storing the same +/// chunk and polluting routing. +/// +/// 270 s = 240 s (storer lookup) + 30 s padding (network RTT + LMDB +/// put + fsync + clock skew tolerance). +/// +/// This invariant must be re-validated if either side's timeout +/// changes. Empirically surfaced as "every cross-region merkle chunk +/// times out at 10 s" on a 210-node 7-region testnet run on +/// 2026-05-12; bumping to 270 s flipped that 0/22 -> 9/9 pass rate. 
+const DEFAULT_MERKLE_STORE_TIMEOUT_SECS: u64 = 270; + /// Default timeout for chunk GET response operations in seconds. const DEFAULT_CHUNK_GET_TIMEOUT_SECS: u64 = 10; @@ -101,15 +132,32 @@ pub struct ClientConfig { /// DHT lookups), in seconds. The adaptive controller does NOT /// currently size timeouts; this remains a static knob. pub quote_timeout_secs: u64, - /// Per-op timeout for chunk store (PUT) operations, in seconds. - /// Should be larger than `quote_timeout_secs` because chunk PUTs - /// transfer multi-MB payloads. The adaptive controller does NOT - /// currently size timeouts; this remains a static knob. + /// Per-op timeout, in seconds, for the chunk GET response path + /// (`chunk_get_from_peer`) and any other caller that reads this + /// field directly. + /// + /// Note despite the historical name `store_timeout_secs`: this + /// knob does **not** govern the non-merkle chunk PUT response + /// timeout (that path uses the `STORE_RESPONSE_TIMEOUT` constant + /// in `chunk.rs`) and does **not** govern the merkle batch PUT + /// timeout (see `merkle_store_timeout_secs`). Rename pending in + /// Mick's PR #78 which adds a dedicated `chunk_get_timeout_secs`. + /// + /// The adaptive controller does NOT currently size timeouts; + /// this remains a static knob. pub store_timeout_secs: u64, /// Per-peer response timeout for chunk GET operations, in seconds. /// This is intentionally independent from `store_timeout_secs`: PUTs /// and GETs have different payload direction and performance profiles. pub chunk_get_timeout_secs: u64, + /// Per-op timeout for **merkle batch** chunk store (PUT) + /// operations, in seconds. Separate from `store_timeout_secs` + /// because merkle PUTs incur the storer-side + /// `CLOSENESS_LOOKUP_TIMEOUT` (240 s post-PR #89) on top of the + /// usual store path; the client must wait at least that long + /// plus padding, or the storer wastes work on a chunk the client + /// has already given up on. Default 270 s. 
+ pub merkle_store_timeout_secs: u64, /// Number of closest peers to consider for routing. pub close_group_size: usize, /// **Deprecated.** Pre-adaptive ceiling for quote concurrency. @@ -158,6 +206,7 @@ impl Default for ClientConfig { quote_timeout_secs: DEFAULT_QUOTE_TIMEOUT_SECS, store_timeout_secs: DEFAULT_STORE_TIMEOUT_SECS, chunk_get_timeout_secs: DEFAULT_CHUNK_GET_TIMEOUT_SECS, + merkle_store_timeout_secs: DEFAULT_MERKLE_STORE_TIMEOUT_SECS, close_group_size: CLOSE_GROUP_SIZE, quote_concurrency: DEFAULT_QUOTE_CONCURRENCY, store_concurrency: DEFAULT_STORE_CONCURRENCY,