diff --git a/Cargo.lock b/Cargo.lock index 5833ccf..a5b5246 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -871,7 +871,7 @@ dependencies = [ "rand 0.8.6", "reqwest 0.12.28", "rmp-serde", - "saorsa-core 0.24.2 (git+https://github.com/saorsa-labs/saorsa-core?branch=rc-2026.4.4)", + "saorsa-core", "self-replace", "self_encryption", "semver 1.0.28", @@ -933,7 +933,7 @@ dependencies = [ "rand 0.8.6", "reqwest 0.13.3", "rmp-serde", - "saorsa-core 0.24.2 (git+https://github.com/saorsa-labs/saorsa-core?branch=fix%2Fstability-improvements)", + "saorsa-core", "saorsa-pqc 0.5.1", "self-replace", "self_encryption", @@ -965,7 +965,7 @@ dependencies = [ "hex", "postcard", "rmp-serde", - "saorsa-core 0.24.2 (git+https://github.com/saorsa-labs/saorsa-core?branch=fix%2Fstability-improvements)", + "saorsa-core", "saorsa-pqc 0.5.1", "serde", "tokio", @@ -983,7 +983,7 @@ dependencies = [ "hex", "postcard", "rmp-serde", - "saorsa-core 0.24.2 (git+https://github.com/saorsa-labs/saorsa-core?branch=fix%2Fstability-improvements)", + "saorsa-core", "saorsa-pqc 0.5.1", "serde", "tokio", @@ -2616,9 +2616,9 @@ checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" [[package]] name = "filetime" -version = "0.2.28" +version = "0.2.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d5b2eef6fafbf69f877e55509ce5b11a760690ac9700a2921be067aa6afaef6" +checksum = "5c287a33c7f0a620c38e641e7f60827713987b3c0f26e8ddc9462cc69cf75759" dependencies = [ "cfg-if", "libc", @@ -3252,7 +3252,7 @@ dependencies = [ "js-sys", "log", "wasm-bindgen", - "windows-core 0.58.0", + "windows-core 0.62.2", ] [[package]] @@ -4573,7 +4573,7 @@ dependencies = [ "once_cell", "socket2 0.6.3", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -5230,38 +5230,7 @@ dependencies = [ "postcard", "rand 0.8.6", "saorsa-pqc 0.5.1", - "saorsa-transport 0.34.1 (git+https://github.com/saorsa-labs/saorsa-transport?branch=fix%2Fstability-improvements)", - "serde", - 
"serde_json", - "tempfile", - "thiserror 2.0.18", - "tokio", - "tokio-util", - "tracing", - "uuid", - "wyz", -] - -[[package]] -name = "saorsa-core" -version = "0.24.2" -source = "git+https://github.com/saorsa-labs/saorsa-core?branch=rc-2026.4.4#9ed219c687c7747a7842464d48d51d4ee5c0f0f1" -dependencies = [ - "anyhow", - "async-trait", - "blake3", - "bytes", - "dashmap", - "dirs 6.0.0", - "futures", - "hex", - "lru", - "once_cell", - "parking_lot", - "postcard", - "rand 0.8.6", - "saorsa-pqc 0.5.1", - "saorsa-transport 0.34.1 (registry+https://github.com/rust-lang/crates.io-index)", + "saorsa-transport", "serde", "serde_json", "tempfile", @@ -5356,66 +5325,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "saorsa-transport" -version = "0.34.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31ebc09fb51e2c325e61ff09892f1256c60ef4f3beb881fd2980befe3e53e9bc" -dependencies = [ - "anyhow", - "async-trait", - "aws-lc-rs", - "blake3", - "bytes", - "chrono", - "clap", - "core-foundation 0.9.4", - "dashmap", - "dirs 5.0.1", - "enum_dispatch", - "futures-util", - "hex", - "igd-next", - "indexmap 2.14.0", - "keyring", - "libc", - "lru-slab", - "nix", - "once_cell", - "parking_lot", - "pin-project-lite", - "quinn-udp 0.6.1", - "rand 0.8.6", - "rcgen", - "regex", - "reqwest 0.13.3", - "rustc-hash", - "rustls", - "rustls-native-certs", - "rustls-pemfile", - "rustls-platform-verifier 0.6.2", - "rustls-post-quantum", - "saorsa-pqc 0.4.2", - "serde", - "serde_json", - "serde_yaml", - "slab", - "socket2 0.5.10", - "system-configuration 0.6.1", - "thiserror 2.0.18", - "time", - "tinyvec", - "tokio", - "tokio-util", - "tracing", - "tracing-subscriber", - "unicode-width", - "uuid", - "windows 0.58.0", - "x25519-dalek", - "zeroize", -] - [[package]] name = "saorsa-transport" version = "0.34.1" @@ -7028,6 +6937,19 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = "0.62.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" +dependencies = [ + "windows-implement 0.60.2", + "windows-interface 0.59.3", + "windows-link", + "windows-result 0.4.1", + "windows-strings 0.5.1", +] + [[package]] name = "windows-implement" version = "0.57.0" @@ -7050,6 +6972,17 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "windows-implement" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "windows-interface" version = "0.57.0" @@ -7072,6 +7005,17 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "windows-interface" +version = "0.59.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "windows-link" version = "0.2.1" @@ -7171,6 +7115,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets 0.53.5", +] + [[package]] name = "windows-sys" version = "0.61.2" @@ -7219,13 +7172,30 @@ dependencies = [ "windows_aarch64_gnullvm 0.52.6", "windows_aarch64_msvc 0.52.6", "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm", + "windows_i686_gnullvm 0.52.6", "windows_i686_msvc 0.52.6", "windows_x86_64_gnu 0.52.6", "windows_x86_64_gnullvm 0.52.6", "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows-targets" +version = "0.53.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" +dependencies = [ + "windows-link", + "windows_aarch64_gnullvm 0.53.1", + "windows_aarch64_msvc 0.53.1", + "windows_i686_gnu 0.53.1", + "windows_i686_gnullvm 0.53.1", + "windows_i686_msvc 0.53.1", + "windows_x86_64_gnu 0.53.1", + "windows_x86_64_gnullvm 0.53.1", + "windows_x86_64_msvc 0.53.1", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.42.2" @@ -7244,6 +7214,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" + [[package]] name = "windows_aarch64_msvc" version = "0.42.2" @@ -7262,6 +7238,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" + [[package]] name = "windows_i686_gnu" version = "0.42.2" @@ -7280,12 +7262,24 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" +[[package]] +name = "windows_i686_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" + [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_gnullvm" +version = 
"0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" + [[package]] name = "windows_i686_msvc" version = "0.42.2" @@ -7304,6 +7298,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_i686_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" + [[package]] name = "windows_x86_64_gnu" version = "0.42.2" @@ -7322,6 +7322,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" + [[package]] name = "windows_x86_64_gnullvm" version = "0.42.2" @@ -7340,6 +7346,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" + [[package]] name = "windows_x86_64_msvc" version = "0.42.2" @@ -7358,6 +7370,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + [[package]] name = 
"winnow" version = "0.7.15" @@ -7618,9 +7636,9 @@ dependencies = [ [[package]] name = "zerofrom" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69faa1f2a1ea75661980b013019ed6687ed0e83d069bc1114e2cc74c6c04c4df" +checksum = "0ec05a11813ea801ff6d75110ad09cd0824ddba17dfe17128ea0d5f68e6c5272" dependencies = [ "zerofrom-derive", ] diff --git a/Cargo.toml b/Cargo.toml index 8c4c906..b3bbb58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,3 +16,4 @@ resolver = "2" [patch.crates-io] ant-protocol = { git = "https://github.com/WithAutonomi/ant-protocol.git", rev = "93e63b8a41a97c37c24d1164a3ee5525e002ddcd" } ant-node = { git = "https://github.com/WithAutonomi/ant-node.git", rev = "8b68b2d7f4662faf67ed7812dc6cb37de0c74a8b" } + diff --git a/ant-cli/src/cli.rs b/ant-cli/src/cli.rs index ea23132..109ddb6 100644 --- a/ant-cli/src/cli.rs +++ b/ant-cli/src/cli.rs @@ -43,11 +43,17 @@ pub struct Cli { #[arg(long, default_value_t = 10, hide = true)] pub quote_timeout_secs: u64, - /// Per-op timeout for chunk store / retrieve operations (seconds). + /// Per-op timeout for chunk store operations (seconds). /// Static knob; the adaptive controller does not currently size /// timeouts. - #[arg(long, default_value_t = 60, hide = true)] - pub store_timeout_secs: u64, + #[arg(long, hide = true)] + pub store_timeout_secs: Option, + + /// Per-peer timeout for chunk retrieve operations (seconds). + /// Static knob; the adaptive controller does not currently size + /// timeouts. + #[arg(long, hide = true)] + pub chunk_get_timeout_secs: Option, /// **Deprecated.** Adaptive controller now sizes quote /// concurrency from observed network signals. 
Setting this caps diff --git a/ant-cli/src/main.rs b/ant-cli/src/main.rs index 26e849a..0430047 100644 --- a/ant-cli/src/main.rs +++ b/ant-cli/src/main.rs @@ -78,6 +78,7 @@ async fn run() -> anyhow::Result<()> { ipv4_only, quote_timeout_secs, store_timeout_secs, + chunk_get_timeout_secs, verbose: _, evm_network, quote_concurrency, @@ -92,6 +93,7 @@ async fn run() -> anyhow::Result<()> { ipv4_only, quote_timeout_secs, store_timeout_secs, + chunk_get_timeout_secs, evm_network, quote_concurrency, store_concurrency, @@ -170,7 +172,8 @@ struct DataCliContext { allow_loopback: bool, ipv4_only: bool, quote_timeout_secs: u64, - store_timeout_secs: u64, + store_timeout_secs: Option, + chunk_get_timeout_secs: Option, evm_network: String, quote_concurrency: Option, store_concurrency: Option, @@ -256,9 +259,14 @@ async fn build_data_client( let mut config = ClientConfig { quote_timeout_secs: ctx.quote_timeout_secs, - store_timeout_secs: ctx.store_timeout_secs, ..Default::default() }; + if let Some(t) = ctx.store_timeout_secs { + config.store_timeout_secs = t; + } + if let Some(t) = ctx.chunk_get_timeout_secs { + config.chunk_get_timeout_secs = t; + } // Legacy default values are treated as "not pinned" by build_controller // (so the default ClientConfig doesn't silently lower the new // adaptive ceilings). Mirror that here so the deprecation warning diff --git a/ant-core/Cargo.toml b/ant-core/Cargo.toml index 5d9c881..073b354 100644 --- a/ant-core/Cargo.toml +++ b/ant-core/Cargo.toml @@ -36,7 +36,7 @@ tower-http = { version = "0.6.8", features = ["cors"] } # under `ant_protocol::{evm, transport, pqc}`. This is the ONE pin for # those three deps — do not add direct evmlib/saorsa-core/saorsa-pqc # deps here or the version can skew between ant-client and ant-node. 
-ant-protocol = "2.0.3" +ant-protocol = { git = "https://github.com/WithAutonomi/ant-protocol", rev = "93e63b8a41a97c37c24d1164a3ee5525e002ddcd" } xor_name = "5" self_encryption = "0.35.0" futures = "0.3" @@ -56,7 +56,7 @@ sysinfo = { version = "0.32", default-features = false, features = ["system"] } # ant-node is optional. It is only linked for the `LocalDevnet` wrapper # that spawns a local in-process network for development and testing. # Enable with `--features devnet`. -ant-node = { version = "0.11.1", optional = true } +ant-node = { git = "https://github.com/WithAutonomi/ant-node", rev = "8b68b2d7f4662faf67ed7812dc6cb37de0c74a8b", optional = true } tracing-subscriber = { version = "0.3", features = ["env-filter"] } [target.'cfg(unix)'.dependencies] @@ -82,7 +82,7 @@ devnet = ["dep:ant-node"] # in-process to exercise the wire protocol end-to-end. ant-node is a # dev-dep here (separate from the optional runtime dep above) so tests # always compile even without the `devnet` feature. -ant-node = "0.11.1" +ant-node = { git = "https://github.com/WithAutonomi/ant-node", rev = "8b68b2d7f4662faf67ed7812dc6cb37de0c74a8b" } serial_test = "3" anyhow = "1" alloy = { version = "1.6", features = ["node-bindings"] } @@ -92,7 +92,7 @@ rmp-serde = "1" # which populates a cache via `add_peer_trusted` (bypasses Sybil rate limits) # and then verifies reload after save. Test-only — no runtime version-pin # concern. Tracks ant-node's transitive saorsa-core dep. -saorsa-core = { git = "https://github.com/saorsa-labs/saorsa-core", branch = "rc-2026.4.4" } +saorsa-core = { git = "https://github.com/saorsa-labs/saorsa-core", branch = "fix/stability-improvements" } [[example]] name = "start-local-devnet" diff --git a/ant-core/src/data/client/cached_merkle.rs b/ant-core/src/data/client/cached_merkle.rs new file mode 100644 index 0000000..78b83be --- /dev/null +++ b/ant-core/src/data/client/cached_merkle.rs @@ -0,0 +1,372 @@ +//! On-disk cache for merkle batch payment receipts. +//! +//! 
Why this exists +//! --------------- +//! A merkle batch upload pays for *all* chunks in one on-chain transaction +//! up-front, then stores each chunk to its close-group. If the store phase +//! fails partway through (network flake, slow close-K, client crash), the +//! on-chain payment is gone but the proofs needed to re-attempt the store +//! are lost too — the user has to pay again from scratch. +//! +//! By persisting the [`MerkleBatchPaymentResult`] to disk **immediately after +//! the on-chain payment lands**, the next invocation can resume the upload +//! using the already-paid proofs instead of re-paying. The cache is keyed by +//! a derivation of the source file path so the same upload, re-issued for +//! the same file, transparently picks up where it left off. +//! +//! Lifecycle +//! --------- +//! * **save** — called once per upload, right after the merkle batch payment +//! transaction confirms. Writes JSON to +//! `/payments/_`. +//! * **load_for_file** — called at the top of every merkle upload. If a +//! non-expired cached receipt exists for the file, it is returned so the +//! upload can skip the pay phase and go straight to store. +//! * **delete_for_file** — called after a fully successful upload to remove +//! the receipt so a future re-upload of the same path pays anew. +//! * **cleanup_outdated** — called opportunistically inside `load_for_file` +//! to garbage-collect receipts past the 7-day expiry window. +//! +//! Filename format +//! --------------- +//! `_` where: +//! * `timestamp` is the merkle payment timestamp (seconds since epoch) used +//! on-chain. Expiry is computed from this value so we can prune stale +//! receipts even if their on-disk mtime has been touched. +//! * `file_hash` is the SHA-256 of the source file path string, truncated +//! to keep filenames short. Same-name uploads from different directories +//! collide deliberately — the user can name their file uniquely if they +//! need parallel uploads. +//! +//! 
Failure-mode tolerance +//! ---------------------- +//! All errors in this module are logged and swallowed in the public-facing +//! API (`try_load_for_file`, `try_save`, `try_delete_for_file`): a busted +//! cache directory must never prevent a real upload from running. The +//! tradeoff is that a corrupt cache file is silently treated as "no +//! cache", forcing the user to re-pay — but never causes data loss. + +use crate::config; +use crate::data::client::merkle::MerkleBatchPaymentResult; +use crate::error::Result; +use std::fs::{self, DirEntry, File}; +use std::hash::{Hash, Hasher}; +use std::io::{BufReader, BufWriter}; +use std::path::{Path, PathBuf}; +use std::time::{SystemTime, UNIX_EPOCH}; +use tracing::{debug, info, warn}; + +/// Cached merkle receipts older than this are removed from disk. +/// +/// Set to match `MERKLE_PAYMENT_EXPIRATION` in `evmlib` (7 days). After +/// the payment ages out on-chain there is no point keeping the cache — +/// the proofs can no longer be verified by storers. +const PAYMENT_EXPIRATION_SECS: u64 = 7 * 24 * 60 * 60; + +/// Subdirectory under the platform-appropriate data dir. +const PAYMENTS_SUBDIR: &str = "payments"; + +/// Returns the directory used for cached payments, creating it if needed. +fn payments_dir() -> Result { + let dir = config::data_dir()?.join(PAYMENTS_SUBDIR); + fs::create_dir_all(&dir)?; + Ok(dir) +} + +/// Short non-cryptographic hash of the source file path string, used as +/// the on-disk cache key. +/// +/// Filename collisions are not a correctness problem (the loaded +/// receipt is content-validated against the current encrypted chunk +/// addresses before being trusted) but they would waste a re-pay, so +/// we want low collision probability across a single user's upload +/// history. `std::hash::DefaultHasher` with 16 hex chars of output is +/// far below the collision threshold for that scale. 
+fn file_hash_key(file_path: &str) -> String { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + file_path.hash(&mut hasher); + format!("{:016x}", hasher.finish()) +} + +/// Save the merkle batch payment receipt for a given source file path. +/// +/// Idempotent: re-saving for the same `(timestamp, file_path)` overwrites +/// the previous file. Different timestamps for the same file produce +/// different filenames, which is fine — `cleanup_outdated` reaps them. +pub fn save(file_path: &str, result: &MerkleBatchPaymentResult) -> Result { + let dir = payments_dir()?; + let ts = if result.merkle_payment_timestamp > 0 { + result.merkle_payment_timestamp + } else { + // Fall back to now() if the result wasn't populated. Should not + // happen in practice — every constructor stamps this field — + // but defensively avoid emitting a `0_*` filename that would + // immediately be treated as expired. + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0) + }; + let path = dir.join(format!("{ts}_{}", file_hash_key(file_path))); + let handle = File::create(&path)?; + // msgpack (rmp-serde) rather than JSON because `proofs` is keyed by + // `[u8; 32]` which JSON cannot represent as a map key. + rmp_serde::encode::write(&mut BufWriter::new(handle), result) + .map_err(|e| crate::error::Error::Io(std::io::Error::other(e.to_string())))?; + debug!( + "Cached merkle payment receipt for {file_path:?} to {}", + path.display() + ); + Ok(path) +} + +/// Best-effort save. Logs on failure but never returns an error. +/// +/// Intended for the upload path: if we can't cache the receipt we still +/// want to attempt the chunk PUTs. +pub fn try_save(file_path: &str, result: &MerkleBatchPaymentResult) { + if let Err(e) = save(file_path, result) { + warn!( + "Failed to cache merkle payment receipt for {file_path:?}: {e}. \ + Upload will proceed without resume support." 
+ ); + } +} + +/// Load the cached merkle batch receipt for a given source file path. +/// +/// Side-effect: opportunistically removes any expired receipts found in +/// the directory while scanning. +/// +/// Returns `Ok(None)` if no matching non-expired receipt is found. +pub fn load_for_file(file_path: &str) -> Result> { + cleanup_outdated(); + let dir = payments_dir()?; + let key = file_hash_key(file_path); + + let read_dir = match fs::read_dir(&dir) { + Ok(rd) => rd, + Err(e) => { + debug!("Could not read payments dir {}: {e}", dir.display()); + return Ok(None); + } + }; + + for entry in read_dir.flatten() { + let path = entry.path(); + if !path.is_file() { + continue; + } + let Some(name) = path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + if !name.contains(&key) { + continue; + } + if is_expired_filename(name) { + // Found the file but it has aged out; cleanup will + // collect it. Keep scanning in case a newer one exists. + continue; + } + match read_receipt(&path) { + Ok(receipt) => { + info!( + "Found previous merkle upload attempt for {file_path}, \ + resuming with payment cached at {}", + path.display() + ); + return Ok(Some((path, receipt))); + } + Err(e) => { + warn!( + "Cached merkle receipt at {} is unreadable ({e}). \ + Ignoring and starting a fresh upload.", + path.display() + ); + } + } + } + Ok(None) +} + +/// Best-effort load. Logs on failure and returns `None`. +pub fn try_load_for_file(file_path: &str) -> Option<(PathBuf, MerkleBatchPaymentResult)> { + match load_for_file(file_path) { + Ok(opt) => opt, + Err(e) => { + warn!( + "Failed to look up cached merkle receipt for {file_path:?}: {e}. \ + Starting a fresh upload." + ); + None + } + } +} + +/// Delete the cached receipt(s) matching the file path. Called on +/// successful upload completion. 
+pub fn delete_for_file(file_path: &str) -> Result<()> { + let dir = payments_dir()?; + let key = file_hash_key(file_path); + if let Ok(read_dir) = fs::read_dir(&dir) { + for entry in read_dir.flatten() { + let path = entry.path(); + if let Some(name) = path.file_name().and_then(|n| n.to_str()) { + if name.contains(&key) { + let _ = fs::remove_file(&path); + debug!("Deleted cached merkle receipt {}", path.display()); + } + } + } + } + Ok(()) +} + +/// Best-effort delete. Logs on failure but never returns an error. +pub fn try_delete_for_file(file_path: &str) { + if let Err(e) = delete_for_file(file_path) { + warn!( + "Failed to delete cached merkle receipt for {file_path:?}: {e}. \ + Will be cleaned up after expiry." + ); + } +} + +/// Garbage-collect cached receipts past the expiry window. +/// +/// Logs each removal at info level so users see what we cleaned up. +/// Best-effort: any IO error is silently ignored. +pub fn cleanup_outdated() { + let Ok(dir) = payments_dir() else { + return; + }; + let Ok(read_dir) = fs::read_dir(&dir) else { + return; + }; + for entry in read_dir.flatten() { + if is_expired_entry(&entry) { + let path = entry.path(); + info!( + "Removing expired cached merkle payment file: {}", + path.display() + ); + let _ = fs::remove_file(path); + } + } +} + +fn is_expired_entry(entry: &DirEntry) -> bool { + let path = entry.path(); + if !path.is_file() { + return false; + } + let Some(name) = path.file_name().and_then(|n| n.to_str()) else { + return false; + }; + is_expired_filename(name) +} + +fn is_expired_filename(name: &str) -> bool { + let ts_str = match name.split_once('_') { + Some((ts, _)) => ts, + None => return false, + }; + let Ok(ts) = ts_str.parse::() else { + return false; + }; + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + now > ts.saturating_add(PAYMENT_EXPIRATION_SECS) +} + +fn read_receipt(path: &Path) -> Result { + let handle = File::open(path)?; + let receipt: 
MerkleBatchPaymentResult = rmp_serde::decode::from_read(BufReader::new(handle)) + .map_err(|e| crate::error::Error::Io(std::io::Error::other(e.to_string())))?; + Ok(receipt) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + + fn dummy_receipt(ts: u64) -> MerkleBatchPaymentResult { + let mut proofs: HashMap<[u8; 32], Vec> = HashMap::new(); + proofs.insert([0u8; 32], vec![1, 2, 3]); + MerkleBatchPaymentResult { + proofs, + chunk_count: 1, + storage_cost_atto: "0".to_string(), + gas_cost_wei: 0, + merkle_payment_timestamp: ts, + } + } + + #[test] + fn file_hash_key_is_stable() { + let a = file_hash_key("/tmp/some/file.bin"); + let b = file_hash_key("/tmp/some/file.bin"); + assert_eq!(a, b); + let c = file_hash_key("/tmp/some/other.bin"); + assert_ne!(a, c); + } + + #[test] + fn expired_filename_detected() { + // Just past the expiry boundary. + let stale = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + .saturating_sub(PAYMENT_EXPIRATION_SECS + 60); + let name = format!("{stale}_abcd1234"); + assert!(is_expired_filename(&name)); + + // Within the window. + let fresh = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + .saturating_sub(60); + let name = format!("{fresh}_abcd1234"); + assert!(!is_expired_filename(&name)); + } + + #[test] + fn malformed_filename_is_not_expired() { + // Defensive: garbage in payments dir must not be auto-deleted. 
+ assert!(!is_expired_filename("nonsense")); + assert!(!is_expired_filename("not_a_number_abcd1234")); + } + + #[test] + fn roundtrip_save_load_delete() -> Result<()> { + let file_path = format!( + "/tmp/anselme-resumable-merkle-test-{}", + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() + ); + let ts = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + let receipt = dummy_receipt(ts); + let saved_path = save(&file_path, &receipt)?; + assert!(saved_path.exists()); + + let loaded = load_for_file(&file_path)?; + let (loaded_path, loaded_receipt) = loaded.expect("receipt should be loadable"); + assert_eq!(loaded_path, saved_path); + assert_eq!(loaded_receipt.chunk_count, receipt.chunk_count); + assert_eq!(loaded_receipt.merkle_payment_timestamp, ts); + + delete_for_file(&file_path)?; + assert!(load_for_file(&file_path)?.is_none()); + Ok(()) + } +} diff --git a/ant-core/src/data/client/chunk.rs b/ant-core/src/data/client/chunk.rs index 8a4226b..c6a2ff7 100644 --- a/ant-core/src/data/client/chunk.rs +++ b/ant-core/src/data/client/chunk.rs @@ -22,7 +22,7 @@ use tracing::{debug, info, warn}; const CHUNK_DATA_TYPE: u32 = 0; /// Store-response timeout for non-merkle chunk PUTs. 
-const STORE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(30); +const STORE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10); fn store_response_timeout_for_proof(proof: &[u8], merkle_timeout_secs: u64) -> Duration { match detect_proof_type(proof) { @@ -313,9 +313,9 @@ impl Client { .encode() .map_err(|e| Error::Protocol(format!("Failed to encode GET request: {e}")))?; - let timeout = Duration::from_secs(self.config().store_timeout_secs); + let timeout = Duration::from_secs(self.config().chunk_get_timeout_secs); let addr_hex = hex::encode(address); - let timeout_secs = self.config().store_timeout_secs; + let timeout_secs = self.config().chunk_get_timeout_secs; let start = Instant::now(); let result = send_and_await_chunk_response( diff --git a/ant-core/src/data/client/file.rs b/ant-core/src/data/client/file.rs index bf26cf0..70f0a88 100644 --- a/ant-core/src/data/client/file.rs +++ b/ant-core/src/data/client/file.rs @@ -1282,46 +1282,100 @@ impl Client { } // Phase 2: Decide payment mode and upload in waves from disk. - let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = - if self.should_use_merkle(chunk_count, mode) { - info!("Using merkle batch payment for {chunk_count} file chunks"); + // + // For the merkle path, attempt to resume from a cached + // receipt before paying again. The cache is keyed by the + // source file path; a successful upload deletes the cache so + // a subsequent re-upload of the same path will pay anew. 
+ let file_path_key = path.display().to_string(); + let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = if self + .should_use_merkle(chunk_count, mode) + { + info!("Using merkle batch payment for {chunk_count} file chunks"); - let batch_result = match self - .pay_for_merkle_batch(&spill.addresses, DATA_TYPE_CHUNK, spill.avg_chunk_size()) + let batch_result = if let Some((_cache_path, cached)) = + crate::data::client::cached_merkle::try_load_for_file(&file_path_key) + { + // Validate the cache matches this upload. If the + // file was edited between attempts the cached + // proofs would no longer be valid for the new + // chunk addresses; in that case drop the cache + // and pay fresh. + let addresses_match = spill + .addresses + .iter() + .all(|addr| cached.proofs.contains_key(addr)); + if addresses_match && cached.proofs.len() == chunk_count { + info!( + "Skipping merkle payment phase; resuming with \ + cached proofs ({} chunks)", + cached.proofs.len() + ); + Ok(cached) + } else { + info!( + "Cached merkle receipt does not match current file \ + content (cached={}, file={chunk_count}). 
\ + Discarding cache and paying fresh.", + cached.proofs.len() + ); + crate::data::client::cached_merkle::try_delete_for_file(&file_path_key); + self.pay_for_merkle_batch( + &spill.addresses, + DATA_TYPE_CHUNK, + spill.avg_chunk_size(), + ) .await - { - Ok(result) => result, - Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => { - info!("Merkle needs more peers ({msg}), falling back to wave-batch"); - let (stored, sc, gc, fb_stats) = - self.upload_waves_single(&spill, progress.as_ref()).await?; - return Ok(FileUploadResult { - data_map, - chunks_stored: stored, - chunks_failed: 0, - total_chunks: chunk_count, - payment_mode_used: PaymentMode::Single, - storage_cost_atto: sc, - gas_cost_wei: gc, - data_map_address: None, - chunk_attempts_total: fb_stats.chunk_attempts_total, - store_durations_ms: fb_stats.store_durations_ms, - retries_histogram: fb_stats.retries_histogram, - }); - } - Err(e) => return Err(e), - }; - - let (stored, sc, gc, stats) = self - .upload_waves_merkle(&spill, &batch_result, progress.as_ref()) - .await?; - (stored, PaymentMode::Merkle, sc, gc, stats) + .inspect(|result| { + crate::data::client::cached_merkle::try_save(&file_path_key, result); + }) + } } else { - let (stored, sc, gc, stats) = - self.upload_waves_single(&spill, progress.as_ref()).await?; - (stored, PaymentMode::Single, sc, gc, stats) + self.pay_for_merkle_batch(&spill.addresses, DATA_TYPE_CHUNK, spill.avg_chunk_size()) + .await + .inspect(|result| { + // Save BEFORE the store phase so a crash + // mid-upload leaves a resumable receipt. 
+ crate::data::client::cached_merkle::try_save(&file_path_key, result); + }) }; + let batch_result = match batch_result { + Ok(result) => result, + Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => { + info!("Merkle needs more peers ({msg}), falling back to wave-batch"); + let (stored, sc, gc, fb_stats) = + self.upload_waves_single(&spill, progress.as_ref()).await?; + return Ok(FileUploadResult { + data_map, + chunks_stored: stored, + chunks_failed: 0, + total_chunks: chunk_count, + payment_mode_used: PaymentMode::Single, + storage_cost_atto: sc, + gas_cost_wei: gc, + data_map_address: None, + chunk_attempts_total: fb_stats.chunk_attempts_total, + store_durations_ms: fb_stats.store_durations_ms, + retries_histogram: fb_stats.retries_histogram, + }); + } + Err(e) => return Err(e), + }; + + let (stored, sc, gc, stats) = self + .upload_waves_merkle(&spill, &batch_result, progress.as_ref()) + .await?; + // Upload succeeded end-to-end; the cached receipt is + // no longer needed. + crate::data::client::cached_merkle::try_delete_for_file(&file_path_key); + (stored, PaymentMode::Merkle, sc, gc, stats) + } else { + let (stored, sc, gc, stats) = + self.upload_waves_single(&spill, progress.as_ref()).await?; + (stored, PaymentMode::Single, sc, gc, stats) + }; + info!( "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})", path.display() diff --git a/ant-core/src/data/client/merkle.rs b/ant-core/src/data/client/merkle.rs index 8f4fea2..b673125 100644 --- a/ant-core/src/data/client/merkle.rs +++ b/ant-core/src/data/client/merkle.rs @@ -44,7 +44,10 @@ pub enum PaymentMode { } /// Result of a merkle batch payment. -#[derive(Debug)] +/// +/// Serializable so it can be persisted across runs for resume after a +/// partial-upload failure. See `crate::data::client::cached_merkle`. 
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct MerkleBatchPaymentResult { /// Map of `XorName` to serialized tagged proof bytes (ready to use in PUT requests). pub proofs: HashMap<[u8; 32], Vec<u8>>, @@ -54,6 +57,12 @@ pub struct MerkleBatchPaymentResult { pub storage_cost_atto: String, /// Total gas cost in wei. pub gas_cost_wei: u128, + /// Unix timestamp (seconds) used for the on-chain merkle payment. + /// Persisted so resume can check whether the on-chain payment has + /// aged out beyond the merkle expiration window and the cached + /// receipt must be discarded. + #[serde(default)] + pub merkle_payment_timestamp: u64, } /// Prepared merkle batch ready for external payment. @@ -252,6 +261,10 @@ impl Client { let mut all_proofs = HashMap::with_capacity(addresses.len()); let mut total_storage = Amount::ZERO; let mut total_gas: u128 = 0; + // Track the oldest sub-batch timestamp so the overall receipt + // expires when the *first* sub-batch's on-chain payment ages + // out (worst case for resume). 
+ let mut oldest_ts: u64 = 0; for (i, chunk) in sub_batches.into_iter().enumerate() { match self @@ -263,6 +276,12 @@ impl Client { total_storage += cost; } total_gas = total_gas.saturating_add(sub_result.gas_cost_wei); + if oldest_ts == 0 + || (sub_result.merkle_payment_timestamp > 0 + && sub_result.merkle_payment_timestamp < oldest_ts) + { + oldest_ts = sub_result.merkle_payment_timestamp; + } all_proofs.extend(sub_result.proofs); } Err(e) => { @@ -282,6 +301,7 @@ impl Client { proofs: all_proofs, storage_cost_atto: total_storage.to_string(), gas_cost_wei: total_gas, + merkle_payment_timestamp: oldest_ts, }); } } @@ -292,6 +312,7 @@ impl Client { proofs: all_proofs, storage_cost_atto: total_storage.to_string(), gas_cost_wei: total_gas, + merkle_payment_timestamp: oldest_ts, }) } @@ -640,6 +661,7 @@ pub fn finalize_merkle_batch( chunk_count, storage_cost_atto: "0".to_string(), gas_cost_wei: 0, + merkle_payment_timestamp: prepared.merkle_payment_timestamp, }) } diff --git a/ant-core/src/data/client/mod.rs b/ant-core/src/data/client/mod.rs index e2f3fb4..5e23ccb 100644 --- a/ant-core/src/data/client/mod.rs +++ b/ant-core/src/data/client/mod.rs @@ -6,6 +6,7 @@ pub mod adaptive; pub mod batch; pub mod cache; +pub(crate) mod cached_merkle; pub mod chunk; pub mod data; pub mod file; @@ -111,6 +112,9 @@ const DEFAULT_STORE_TIMEOUT_SECS: u64 = 10; /// 2026-05-12; bumping to 270 s flipped that 0/22 -> 9/9 pass rate. const DEFAULT_MERKLE_STORE_TIMEOUT_SECS: u64 = 270; +/// Default timeout for chunk GET response operations in seconds. +const DEFAULT_CHUNK_GET_TIMEOUT_SECS: u64 = 10; + /// Default quote concurrency: high because quoting is pure network I/O /// (DHT lookups + small request/response messages) with no CPU-bound work. const DEFAULT_QUOTE_CONCURRENCY: usize = 32; @@ -150,6 +154,10 @@ pub struct ClientConfig { /// plus padding, or the storer wastes work on a chunk the client /// has already given up on. Default 270 s. 
pub merkle_store_timeout_secs: u64, + /// Per-peer response timeout for chunk GET operations, in seconds. + /// This is intentionally independent from `store_timeout_secs`: PUTs + /// and GETs have different payload direction and performance profiles. + pub chunk_get_timeout_secs: u64, /// Number of closest peers to consider for routing. pub close_group_size: usize, /// **Deprecated.** Pre-adaptive ceiling for quote concurrency. @@ -198,6 +206,7 @@ impl Default for ClientConfig { quote_timeout_secs: DEFAULT_QUOTE_TIMEOUT_SECS, store_timeout_secs: DEFAULT_STORE_TIMEOUT_SECS, merkle_store_timeout_secs: DEFAULT_MERKLE_STORE_TIMEOUT_SECS, + chunk_get_timeout_secs: DEFAULT_CHUNK_GET_TIMEOUT_SECS, close_group_size: CLOSE_GROUP_SIZE, quote_concurrency: DEFAULT_QUOTE_CONCURRENCY, store_concurrency: DEFAULT_STORE_CONCURRENCY,