From f0a17601b3956dfe08fa5e5c8a81c65ba2b9cd86 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Tue, 12 May 2026 14:05:41 +0200 Subject: [PATCH 1/2] feat: adapt stream decrypt batch sizing --- Cargo.lock | 24 ++- ant-core/Cargo.toml | 2 +- ant-core/src/data/client/file.rs | 279 ++++++++++++++++++++++++------- ant-core/src/data/client/mod.rs | 18 +- 4 files changed, 245 insertions(+), 78 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8bd6b16..9a3e9fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -873,7 +873,7 @@ dependencies = [ "rmp-serde", "saorsa-core", "self-replace", - "self_encryption", + "self_encryption 0.36.0", "semver 1.0.28", "serde", "serde_json", @@ -936,7 +936,7 @@ dependencies = [ "saorsa-core", "saorsa-pqc 0.5.1", "self-replace", - "self_encryption", + "self_encryption 0.35.0", "semver 1.0.28", "serde", "serde_json", @@ -5512,6 +5512,26 @@ dependencies = [ "xor_name", ] +[[package]] +name = "self_encryption" +version = "0.36.0" +dependencies = [ + "bincode", + "blake3", + "brotli", + "bytes", + "chacha20poly1305", + "hex", + "rand 0.8.6", + "rand_chacha 0.3.1", + "rayon", + "serde", + "tempfile", + "thiserror 1.0.69", + "tokio", + "xor_name", +] + [[package]] name = "semver" version = "0.11.0" diff --git a/ant-core/Cargo.toml b/ant-core/Cargo.toml index e4e18c5..6c00ce1 100644 --- a/ant-core/Cargo.toml +++ b/ant-core/Cargo.toml @@ -38,7 +38,7 @@ tower-http = { version = "0.6.8", features = ["cors"] } # deps here or the version can skew between ant-client and ant-node. ant-protocol = { git = "https://github.com/WithAutonomi/ant-protocol", branch = "rc-2026.5.2" } xor_name = "5" -self_encryption = "0.35.0" +self_encryption = { path = "../../self_encryption" } futures = "0.3" postcard = { version = "1.1.3", features = ["use-std"] } rmp-serde = "1" diff --git a/ant-core/src/data/client/file.rs b/ant-core/src/data/client/file.rs index 70f0a88..2bf6544 100644 --- a/ant-core/src/data/client/file.rs +++ b/ant-core/src/data/client/file.rs @@ -27,7 +27,10 @@ use ant_protocol::{compute_address, DATA_TYPE_CHUNK}; use bytes::Bytes; use fs2::FileExt; use futures::stream::{self, StreamExt}; -use self_encryption::{get_root_data_map_parallel, stream_encrypt, streaming_decrypt, DataMap}; +use self_encryption::{ + get_root_data_map_parallel, stream_decrypt_batch_size, stream_encrypt, + streaming_decrypt_with_batch_size, DataMap, +}; use std::collections::{HashMap, HashSet}; use std::io::Write; use std::path::{Path, PathBuf}; @@ -85,6 +88,19 @@ type QuoteEntry = (PeerId, Vec, PaymentQuote, Amount); /// Number of chunks per upload wave (matches batch.rs PAYMENT_WAVE_SIZE). const UPLOAD_WAVE_SIZE: usize = 64; +/// Stream decrypt batches should be larger than fetch fan-out so +/// `buffer_unordered` can keep launching new chunk GETs as earlier ones +/// complete, instead of stopping at each self-encryption batch boundary. +const DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER: usize = 4; + +/// Use at most this fraction of currently usable RAM for one decrypt batch. +const DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR: u64 = 4; + +/// A decrypt batch briefly holds encrypted chunk bytes, decrypted chunk bytes, +/// and Vec/Bytes overhead. Use a conservative multiplier rather than assuming +/// payload bytes alone. +const DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER: u64 = 3; + /// Maximum number of distinct chunk addresses to sample when probing for a /// representative quote in [`Client::estimate_upload_cost`]. /// @@ -428,6 +444,75 @@ fn check_disk_space_for_spill(file_size: u64) -> Result<()> { Ok(()) } +fn usable_memory_bytes() -> Option { + let mut system = sysinfo::System::new(); + system.refresh_memory(); + + let available_memory = system.available_memory(); + let free_memory = system.free_memory(); + let used_memory = system.used_memory(); + let total_memory = system.total_memory(); + let unused_memory = total_memory.saturating_sub(used_memory); + + let mut usable = [available_memory, free_memory, unused_memory] + .into_iter() + .filter(|bytes| *bytes > 0) + .max(); + + let cgroup_free_memory = system + .cgroup_limits() + .filter(|limits| limits.total_memory > 0) + .map(|limits| limits.free_memory); + if let Some(cgroup_free_memory) = cgroup_free_memory { + usable = Some(usable.unwrap_or(u64::MAX).min(cgroup_free_memory)); + } + + debug!( + available_memory, + free_memory, + used_memory, + total_memory, + cgroup_free_memory, + usable_memory = ?usable, + "Detected usable memory for stream decrypt batch sizing" + ); + + usable +} + +fn stream_decrypt_batch_memory_cap(usable_memory_bytes: u64) -> usize { + let budget = usable_memory_bytes / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR; + let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64) + .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER) + .max(1); + let cap = (budget / estimated_bytes_per_chunk).max(1); + + usize::try_from(cap).unwrap_or(usize::MAX) +} + +fn adaptive_stream_decrypt_batch_size( + total_chunks: usize, + fetch_cap: usize, + configured_batch_floor: usize, + usable_memory_bytes: Option, +) -> usize { + let fetch_target = fetch_cap + .max(1) + .saturating_mul(DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER); + let requested = match usable_memory_bytes { + Some(bytes) => { + let memory_cap = stream_decrypt_batch_memory_cap(bytes); + configured_batch_floor + .max(fetch_target) + .max(1) + .min(memory_cap) + } + None => configured_batch_floor.max(1), + }; + + requested.min(total_chunks.max(1)).max(1) +} + /// Whether the data map is published to the network for address-based retrieval. /// /// A private upload stores only the data chunks and returns the `DataMap` to @@ -1793,70 +1878,92 @@ impl Client { let progress_for_closure = progress.clone(); let fetch_limiter_outer = self.controller().fetch.clone(); - let stream = streaming_decrypt(&root_map, |batch: &[(usize, XorName)]| { - let batch_owned: Vec<(usize, XorName)> = batch.to_vec(); - let fetched_ref = fetched_for_closure.clone(); - let progress_ref = progress_for_closure.clone(); - let fetch_limiter = fetch_limiter_outer.clone(); - - tokio::task::block_in_place(|| { - handle.block_on(async { - // Clamp fan-out to batch size — see PERF-RESULTS.md. - let cap = fetch_limiter.current().min(batch_owned.len().max(1)); - let mut stream = futures::stream::iter(batch_owned) - .map(|(idx, hash)| { - let addr = hash.0; - let limiter = fetch_limiter.clone(); - async move { - let result = observe_op( - &limiter, - || async move { self.chunk_get(&addr).await }, - classify_error, - ) - .await; - (idx, hash, result) + let usable_memory = usable_memory_bytes(); + let configured_batch_floor = stream_decrypt_batch_size(); + let fetch_cap = fetch_limiter_outer.current(); + let decrypt_batch_size = adaptive_stream_decrypt_batch_size( + total_chunks, + fetch_cap, + configured_batch_floor, + usable_memory, + ); + info!( + total_chunks, + fetch_cap, + configured_batch_floor, + ?usable_memory, + decrypt_batch_size, + "Selected adaptive stream decrypt batch size" + ); + + let stream = streaming_decrypt_with_batch_size( + &root_map, + |batch: &[(usize, XorName)]| { + let batch_owned: Vec<(usize, XorName)> = batch.to_vec(); + let fetched_ref = fetched_for_closure.clone(); + let progress_ref = progress_for_closure.clone(); + let fetch_limiter = fetch_limiter_outer.clone(); + + tokio::task::block_in_place(|| { + handle.block_on(async { + // Clamp fan-out to batch size — see PERF-RESULTS.md. + let cap = fetch_limiter.current().min(batch_owned.len().max(1)); + let mut stream = futures::stream::iter(batch_owned) + .map(|(idx, hash)| { + let addr = hash.0; + let limiter = fetch_limiter.clone(); + async move { + let result = observe_op( + &limiter, + || async move { self.chunk_get(&addr).await }, + classify_error, + ) + .await; + (idx, hash, result) + } + }) + .buffer_unordered(cap); + + let mut results = Vec::new(); + while let Some((idx, hash, result)) = + futures::StreamExt::next(&mut stream).await + { + let addr_hex = hex::encode(hash.0); + let chunk = result + .map_err(|e| { + self_encryption::Error::Generic(format!( + "Network fetch failed for {addr_hex}: {e}" + )) + })? + .ok_or_else(|| { + self_encryption::Error::Generic(format!( + "Chunk not found: {addr_hex}" + )) + })?; + results.push((idx, chunk.content)); + let fetched = + fetched_ref.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; + info!("Downloaded {fetched}/{total_chunks}"); + if let Some(ref tx) = progress_ref { + let _ = tx.try_send(DownloadEvent::ChunksFetched { + fetched, + total: total_chunks, + }); } - }) - .buffer_unordered(cap); - - let mut results = Vec::new(); - while let Some((idx, hash, result)) = - futures::StreamExt::next(&mut stream).await - { - let addr_hex = hex::encode(hash.0); - let chunk = result - .map_err(|e| { - self_encryption::Error::Generic(format!( - "Network fetch failed for {addr_hex}: {e}" - )) - })? - .ok_or_else(|| { - self_encryption::Error::Generic(format!( - "Chunk not found: {addr_hex}" - )) - })?; - results.push((idx, chunk.content)); - let fetched = - fetched_ref.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; - info!("Downloaded {fetched}/{total_chunks}"); - if let Some(ref tx) = progress_ref { - let _ = tx.try_send(DownloadEvent::ChunksFetched { - fetched, - total: total_chunks, - }); } - } - // streaming_decrypt itself sort_by_keys before - // zipping, but the same closure is also passed - // through get_root_data_map_parallel internally - // (see self_encryption::stream_decrypt.rs::new), and - // THAT path zips positionally without sorting. Sort - // here so both consumers see input order. - results.sort_by_key(|(idx, _)| *idx); - Ok(results) + // streaming_decrypt itself sort_by_keys before + // zipping, but the same closure is also passed + // through get_root_data_map_parallel internally + // (see self_encryption::stream_decrypt.rs::new), and + // THAT path zips positionally without sorting. Sort + // here so both consumers see input order. + results.sort_by_key(|(idx, _)| *idx); + Ok(results) + }) }) - }) - }) + }, + decrypt_batch_size, + ) .map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?; // Write decrypted chunks to a temp file, then rename atomically. @@ -1932,6 +2039,54 @@ mod tests { ); } + #[test] + fn adaptive_stream_decrypt_batch_size_tracks_fetch_headroom() { + let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(u64::MAX)); + + assert_eq!(batch_size, 64 * DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER); + } + + #[test] + fn adaptive_stream_decrypt_batch_size_caps_to_total_chunks() { + let batch_size = adaptive_stream_decrypt_batch_size(12, 64, 10, Some(u64::MAX)); + + assert_eq!(batch_size, 12); + } + + #[test] + fn adaptive_stream_decrypt_batch_size_honours_configured_floor() { + let batch_size = adaptive_stream_decrypt_batch_size(1_000, 1, 32, None); + + assert_eq!(batch_size, 32); + } + + #[test] + fn adaptive_stream_decrypt_batch_size_does_not_expand_without_memory_reading() { + let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, None); + + assert_eq!(batch_size, 10); + } + + #[test] + fn adaptive_stream_decrypt_batch_size_caps_to_memory_budget() { + let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64) + .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER) + .max(1); + let usable_memory = estimated_bytes_per_chunk + .saturating_mul(16) + .saturating_mul(DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR); + let batch_size = adaptive_stream_decrypt_batch_size(1_000, 256, 10, Some(usable_memory)); + + assert_eq!(batch_size, 16); + } + + #[test] + fn adaptive_stream_decrypt_batch_size_keeps_one_chunk_when_memory_is_tight() { + let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(1)); + + assert_eq!(batch_size, 1); + } + #[test] fn chunk_spill_round_trip() { let mut spill = ChunkSpill::new().unwrap(); diff --git a/ant-core/src/data/client/mod.rs b/ant-core/src/data/client/mod.rs index 5e23ccb..d2436ca 100644 --- a/ant-core/src/data/client/mod.rs +++ b/ant-core/src/data/client/mod.rs @@ -283,19 +283,11 @@ fn build_controller(config: &ClientConfig) -> (AdaptiveController, Option` populated from the env var at first access - // and frozen for the process lifetime. Setting the env var from - // Rust would require `std::env::set_var`, which is `unsafe` - // since Rust 1.80 (it races against concurrent reads in any - // other thread); per project policy, `unsafe` is banned. - // - // The adaptive controller still drives fan-out *inside* each - // batch — we re-read `controller.fetch.current()` in the - // `streaming_decrypt` callback. The upstream batch size only - // controls how many chunks `self_encryption` asks us for at a - // time (default 10). For larger batch sizes export - // `STREAM_DECRYPT_BATCH_SIZE` before launching the process. + // File downloads choose a stream-decrypt batch size per download + // from the current fetch cap and usable RAM, then pass it into + // self_encryption's runtime batch-size API. The adaptive controller + // still drives fan-out inside each batch by re-reading + // `controller.fetch.current()` in the decrypt callback. (controller, persist_path) } From 5cb4f183e18f2e9d3e9989db7f04b913a00d4aa6 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Wed, 13 May 2026 17:36:16 +0200 Subject: [PATCH 2/2] chore: update self_encryption dependency to version 0.36 in Cargo.toml --- ant-core/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ant-core/Cargo.toml b/ant-core/Cargo.toml index 6c00ce1..fe550e9 100644 --- a/ant-core/Cargo.toml +++ b/ant-core/Cargo.toml @@ -38,7 +38,7 @@ tower-http = { version = "0.6.8", features = ["cors"] } # deps here or the version can skew between ant-client and ant-node. ant-protocol = { git = "https://github.com/WithAutonomi/ant-protocol", branch = "rc-2026.5.2" } xor_name = "5" -self_encryption = { path = "../../self_encryption" } +self_encryption = "0.36" futures = "0.3" postcard = { version = "1.1.3", features = ["use-std"] } rmp-serde = "1"