Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ant-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ tower-http = { version = "0.6.8", features = ["cors"] }
# deps here or the version can skew between ant-client and ant-node.
ant-protocol = { git = "https://github.com/WithAutonomi/ant-protocol", branch = "rc-2026.5.2" }
xor_name = "5"
self_encryption = "0.35.0"
self_encryption = "0.36"
futures = "0.3"
postcard = { version = "1.1.3", features = ["use-std"] }
rmp-serde = "1"
Expand Down
279 changes: 217 additions & 62 deletions ant-core/src/data/client/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
use bytes::Bytes;
use fs2::FileExt;
use futures::stream::{self, StreamExt};
use self_encryption::{get_root_data_map_parallel, stream_encrypt, streaming_decrypt, DataMap};
use self_encryption::{
get_root_data_map_parallel, stream_decrypt_batch_size, stream_encrypt,
streaming_decrypt_with_batch_size, DataMap,
};
use std::collections::{HashMap, HashSet};
use std::io::Write;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -85,6 +88,19 @@ type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);
/// Number of chunks per upload wave (matches batch.rs PAYMENT_WAVE_SIZE).
const UPLOAD_WAVE_SIZE: usize = 64;

/// Stream decrypt batches should be larger than fetch fan-out so
/// `buffer_unordered` can keep launching new chunk GETs as earlier ones
/// complete, instead of stopping at each self-encryption batch boundary.
///
/// `adaptive_stream_decrypt_batch_size` multiplies the current fetch cap by
/// this factor to obtain its target batch size.
const DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER: usize = 4;

/// Use at most this fraction of currently usable RAM for one decrypt batch.
///
/// The value is applied as a divisor: `stream_decrypt_batch_memory_cap`
/// computes the per-batch budget as usable memory divided by this constant.
const DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR: u64 = 4;

/// A decrypt batch briefly holds encrypted chunk bytes, decrypted chunk bytes,
/// and Vec/Bytes overhead. Use a conservative multiplier rather than assuming
/// payload bytes alone.
///
/// `stream_decrypt_batch_memory_cap` multiplies
/// `self_encryption::MAX_CHUNK_SIZE` by this constant to estimate the memory
/// one chunk occupies while a batch is in flight.
const DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER: u64 = 3;

/// Maximum number of distinct chunk addresses to sample when probing for a
/// representative quote in [`Client::estimate_upload_cost`].
///
Expand Down Expand Up @@ -428,6 +444,75 @@ fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
Ok(())
}

/// Estimates how many bytes of memory are currently usable, for sizing stream
/// decrypt batches.
///
/// The host-level estimate is the largest non-zero value among the reported
/// available memory, free memory, and `total - used`. When cgroup limits with
/// a non-zero total are reported, the estimate is additionally capped at the
/// cgroup's free memory; if every host reading was zero, the cgroup figure is
/// used on its own.
///
/// Returns `None` when there is no non-zero host reading and no applicable
/// cgroup limit.
fn usable_memory_bytes() -> Option<u64> {
    let mut system = sysinfo::System::new();
    system.refresh_memory();

    let available_memory = system.available_memory();
    let free_memory = system.free_memory();
    let used_memory = system.used_memory();
    let total_memory = system.total_memory();

    // Most optimistic non-zero host reading; `None` if all of them are zero.
    let host_estimate = [
        available_memory,
        free_memory,
        total_memory.saturating_sub(used_memory),
    ]
    .into_iter()
    .filter(|&bytes| bytes != 0)
    .max();

    // Only trust the cgroup figure when the cgroup reports a real limit.
    let cgroup_free_memory = match system.cgroup_limits() {
        Some(limits) if limits.total_memory > 0 => Some(limits.free_memory),
        _ => None,
    };

    let usable = match (host_estimate, cgroup_free_memory) {
        (host, None) => host,
        (None, Some(cgroup)) => Some(cgroup),
        (Some(host), Some(cgroup)) => Some(host.min(cgroup)),
    };

    debug!(
        available_memory,
        free_memory,
        used_memory,
        total_memory,
        cgroup_free_memory,
        usable_memory = ?usable,
        "Detected usable memory for stream decrypt batch sizing"
    );

    usable
}

/// Returns the largest number of chunks one decrypt batch may hold without
/// exceeding its share of `usable_memory_bytes`.
///
/// The share is `usable_memory_bytes / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR`
/// and each chunk is costed at `self_encryption::MAX_CHUNK_SIZE` times
/// `DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER`. The result is never
/// below 1, and saturates at `usize::MAX` if the count does not fit in `usize`.
fn stream_decrypt_batch_memory_cap(usable_memory_bytes: u64) -> usize {
    // Guard against a zero divisor should either factor ever be zero.
    let per_chunk_bytes = (self_encryption::MAX_CHUNK_SIZE as u64)
        .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
        .max(1);
    let batch_budget = usable_memory_bytes / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR;

    match usize::try_from(batch_budget / per_chunk_bytes) {
        // Always allow at least one chunk so the download can make progress.
        Ok(0) => 1,
        Ok(chunks) => chunks,
        Err(_) => usize::MAX,
    }
}

/// Picks the stream decrypt batch size for a single download.
///
/// * `total_chunks` - number of chunks in the download; the result never
///   exceeds it (treated as 1 when zero).
/// * `fetch_cap` - current fetch fan-out; multiplied by
///   `DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER` to form the target batch size.
/// * `configured_batch_floor` - configured batch size, used as a lower bound
///   on the request before the memory cap is applied.
/// * `usable_memory_bytes` - usable memory reading, if one is available.
///
/// With a memory reading, the batch is the larger of the floor and the fetch
/// target, limited by `stream_decrypt_batch_memory_cap`. Without one, the
/// batch stays at the configured floor and is not expanded. The result is
/// always at least 1.
fn adaptive_stream_decrypt_batch_size(
    total_chunks: usize,
    fetch_cap: usize,
    configured_batch_floor: usize,
    usable_memory_bytes: Option<u64>,
) -> usize {
    let floor = configured_batch_floor.max(1);

    let wanted = if let Some(bytes) = usable_memory_bytes {
        let fetch_target = fetch_cap
            .max(1)
            .saturating_mul(DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
        // The memory cap wins over both the floor and the fetch target.
        floor
            .max(fetch_target)
            .min(stream_decrypt_batch_memory_cap(bytes))
    } else {
        floor
    };

    wanted.clamp(1, total_chunks.max(1))
}

/// Whether the data map is published to the network for address-based retrieval.
///
/// A private upload stores only the data chunks and returns the `DataMap` to
Expand Down Expand Up @@ -1793,70 +1878,92 @@ impl Client {
let progress_for_closure = progress.clone();

let fetch_limiter_outer = self.controller().fetch.clone();
let stream = streaming_decrypt(&root_map, |batch: &[(usize, XorName)]| {
let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
let fetched_ref = fetched_for_closure.clone();
let progress_ref = progress_for_closure.clone();
let fetch_limiter = fetch_limiter_outer.clone();

tokio::task::block_in_place(|| {
handle.block_on(async {
// Clamp fan-out to batch size — see PERF-RESULTS.md.
let cap = fetch_limiter.current().min(batch_owned.len().max(1));
let mut stream = futures::stream::iter(batch_owned)
.map(|(idx, hash)| {
let addr = hash.0;
let limiter = fetch_limiter.clone();
async move {
let result = observe_op(
&limiter,
|| async move { self.chunk_get(&addr).await },
classify_error,
)
.await;
(idx, hash, result)
let usable_memory = usable_memory_bytes();
let configured_batch_floor = stream_decrypt_batch_size();
let fetch_cap = fetch_limiter_outer.current();
let decrypt_batch_size = adaptive_stream_decrypt_batch_size(
total_chunks,
fetch_cap,
configured_batch_floor,
usable_memory,
);
info!(
total_chunks,
fetch_cap,
configured_batch_floor,
?usable_memory,
decrypt_batch_size,
"Selected adaptive stream decrypt batch size"
);

let stream = streaming_decrypt_with_batch_size(
&root_map,
|batch: &[(usize, XorName)]| {
let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
let fetched_ref = fetched_for_closure.clone();
let progress_ref = progress_for_closure.clone();
let fetch_limiter = fetch_limiter_outer.clone();

tokio::task::block_in_place(|| {
handle.block_on(async {
// Clamp fan-out to batch size — see PERF-RESULTS.md.
let cap = fetch_limiter.current().min(batch_owned.len().max(1));
let mut stream = futures::stream::iter(batch_owned)
.map(|(idx, hash)| {
let addr = hash.0;
let limiter = fetch_limiter.clone();
async move {
let result = observe_op(
&limiter,
|| async move { self.chunk_get(&addr).await },
classify_error,
)
.await;
(idx, hash, result)
}
})
.buffer_unordered(cap);

let mut results = Vec::new();
while let Some((idx, hash, result)) =
futures::StreamExt::next(&mut stream).await
{
let addr_hex = hex::encode(hash.0);
let chunk = result
.map_err(|e| {
self_encryption::Error::Generic(format!(
"Network fetch failed for {addr_hex}: {e}"
))
})?
.ok_or_else(|| {
self_encryption::Error::Generic(format!(
"Chunk not found: {addr_hex}"
))
})?;
results.push((idx, chunk.content));
let fetched =
fetched_ref.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
info!("Downloaded {fetched}/{total_chunks}");
if let Some(ref tx) = progress_ref {
let _ = tx.try_send(DownloadEvent::ChunksFetched {
fetched,
total: total_chunks,
});
}
})
.buffer_unordered(cap);

let mut results = Vec::new();
while let Some((idx, hash, result)) =
futures::StreamExt::next(&mut stream).await
{
let addr_hex = hex::encode(hash.0);
let chunk = result
.map_err(|e| {
self_encryption::Error::Generic(format!(
"Network fetch failed for {addr_hex}: {e}"
))
})?
.ok_or_else(|| {
self_encryption::Error::Generic(format!(
"Chunk not found: {addr_hex}"
))
})?;
results.push((idx, chunk.content));
let fetched =
fetched_ref.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
info!("Downloaded {fetched}/{total_chunks}");
if let Some(ref tx) = progress_ref {
let _ = tx.try_send(DownloadEvent::ChunksFetched {
fetched,
total: total_chunks,
});
}
}
// streaming_decrypt itself sort_by_keys before
// zipping, but the same closure is also passed
// through get_root_data_map_parallel internally
// (see self_encryption::stream_decrypt.rs::new), and
// THAT path zips positionally without sorting. Sort
// here so both consumers see input order.
results.sort_by_key(|(idx, _)| *idx);
Ok(results)
// streaming_decrypt itself sort_by_keys before
// zipping, but the same closure is also passed
// through get_root_data_map_parallel internally
// (see self_encryption::stream_decrypt.rs::new), and
// THAT path zips positionally without sorting. Sort
// here so both consumers see input order.
results.sort_by_key(|(idx, _)| *idx);
Ok(results)
})
})
})
})
},
decrypt_batch_size,
)
.map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;

// Write decrypted chunks to a temp file, then rename atomically.
Expand Down Expand Up @@ -1932,6 +2039,54 @@ mod tests {
);
}

#[test]
fn adaptive_stream_decrypt_batch_size_tracks_fetch_headroom() {
    // With the largest possible memory reading and plenty of chunks, the batch
    // should grow to the fetch cap times the fetch multiplier.
    let fetch_cap = 64;
    let expected = fetch_cap * DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER;

    assert_eq!(
        adaptive_stream_decrypt_batch_size(1_000, fetch_cap, 10, Some(u64::MAX)),
        expected
    );
}

#[test]
fn adaptive_stream_decrypt_batch_size_caps_to_total_chunks() {
    // A batch can never be larger than the number of chunks in the download.
    let total_chunks = 12;

    assert_eq!(
        adaptive_stream_decrypt_batch_size(total_chunks, 64, 10, Some(u64::MAX)),
        total_chunks
    );
}

#[test]
fn adaptive_stream_decrypt_batch_size_honours_configured_floor() {
    // Without a memory reading the configured floor is used as-is, even when
    // the fetch cap is tiny.
    let configured_floor = 32;

    assert_eq!(
        adaptive_stream_decrypt_batch_size(1_000, 1, configured_floor, None),
        configured_floor
    );
}

#[test]
fn adaptive_stream_decrypt_batch_size_does_not_expand_without_memory_reading() {
    // A large fetch cap must not inflate the batch when usable memory is
    // unknown; the configured floor stays in effect.
    let configured_floor = 10;

    assert_eq!(
        adaptive_stream_decrypt_batch_size(1_000, 64, configured_floor, None),
        configured_floor
    );
}

#[test]
fn adaptive_stream_decrypt_batch_size_caps_to_memory_budget() {
    // Build a memory reading whose per-batch budget fits exactly 16 chunks,
    // mirroring the arithmetic in `stream_decrypt_batch_memory_cap`.
    const AFFORDABLE_CHUNKS: u64 = 16;
    let per_chunk_bytes = (self_encryption::MAX_CHUNK_SIZE as u64)
        .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
        .max(1);
    let usable_memory = per_chunk_bytes
        .saturating_mul(AFFORDABLE_CHUNKS)
        .saturating_mul(DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR);

    assert_eq!(
        adaptive_stream_decrypt_batch_size(1_000, 256, 10, Some(usable_memory)),
        16
    );
}

#[test]
fn adaptive_stream_decrypt_batch_size_keeps_one_chunk_when_memory_is_tight() {
    // Even with a single usable byte the batch must still hold one chunk so
    // the download can make progress.
    let one_byte = Some(1);

    assert_eq!(
        adaptive_stream_decrypt_batch_size(1_000, 64, 10, one_byte),
        1
    );
}

#[test]
fn chunk_spill_round_trip() {
let mut spill = ChunkSpill::new().unwrap();
Expand Down
18 changes: 5 additions & 13 deletions ant-core/src/data/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,19 +283,11 @@ fn build_controller(config: &ClientConfig) -> (AdaptiveController, Option<PathBu
adaptive::default_persist_path()
};

// Note: self_encryption's `STREAM_DECRYPT_BATCH_SIZE` is a
// `LazyLock<usize>` populated from the env var at first access
// and frozen for the process lifetime. Setting the env var from
// Rust would require `std::env::set_var`, which is `unsafe`
// since Rust 1.80 (it races against concurrent reads in any
// other thread); per project policy, `unsafe` is banned.
//
// The adaptive controller still drives fan-out *inside* each
// batch — we re-read `controller.fetch.current()` in the
// `streaming_decrypt` callback. The upstream batch size only
// controls how many chunks `self_encryption` asks us for at a
// time (default 10). For larger batch sizes export
// `STREAM_DECRYPT_BATCH_SIZE` before launching the process.
// File downloads choose a stream-decrypt batch size per download
// from the current fetch cap and usable RAM, then pass it into
// self_encryption's runtime batch-size API. The adaptive controller
// still drives fan-out inside each batch by re-reading
// `controller.fetch.current()` in the decrypt callback.

(controller, persist_path)
}
Expand Down