From fd6121bfa2c784d4371d66bbf0f10ae47fe148bc Mon Sep 17 00:00:00 2001 From: Chris O'Neil Date: Tue, 12 May 2026 20:42:37 +0100 Subject: [PATCH] feat(upload): instrument per-chunk retries and store wall-clock Track per-chunk attempt counts and store-RPC wall-clock through the upload pipeline so testnet runs can identify when slowdowns are client-side quorum/retry cost vs network or storage cost. Surface on FileUploadResult and ant-cli --json output: - chunk_attempts_total: sum of store-RPC attempts (>= chunks_stored) - store_durations_ms: per-chunk wall-clock from first attempt to success - retries_histogram: how many stored chunks needed N retries Also emit a structured "chunk_store_wave_complete" info log per wave with p50/p95/max durations and per-round retry counts, for log-based analysis without --json parsing. Co-Authored-By: Claude Opus 4.7 (1M context) --- ant-cli/src/commands/data/file.rs | 14 +++ ant-core/src/data/client/batch.rs | 163 +++++++++++++++++++++++++++-- ant-core/src/data/client/data.rs | 2 +- ant-core/src/data/client/file.rs | 98 ++++++++++++++--- ant-core/src/data/client/merkle.rs | 13 ++- 5 files changed, 259 insertions(+), 31 deletions(-) diff --git a/ant-cli/src/commands/data/file.rs b/ant-cli/src/commands/data/file.rs index 34d69a9..c910501 100644 --- a/ant-cli/src/commands/data/file.rs +++ b/ant-cli/src/commands/data/file.rs @@ -279,6 +279,9 @@ async fn handle_file_upload( storage_cost_atto: result.storage_cost_atto.clone(), gas_cost_wei: result.gas_cost_wei.to_string(), elapsed_secs: elapsed.as_secs_f64(), + chunk_attempts_total: result.chunk_attempts_total, + store_durations_ms: result.store_durations_ms.clone(), + retries_histogram: result.retries_histogram, }; println!("{}", serde_json::to_string(&out)?); } else { @@ -331,6 +334,9 @@ async fn handle_file_upload( storage_cost_atto: result.storage_cost_atto.clone(), gas_cost_wei: result.gas_cost_wei.to_string(), elapsed_secs: elapsed.as_secs_f64(), + chunk_attempts_total: 
result.chunk_attempts_total, + store_durations_ms: result.store_durations_ms.clone(), + retries_histogram: result.retries_histogram, }; println!("{}", serde_json::to_string(&out)?); } else { @@ -583,6 +589,14 @@ struct UploadJsonResult { storage_cost_atto: String, gas_cost_wei: String, elapsed_secs: f64, + /// Sum of chunk-store RPC attempts; `>= chunks_stored` on success. + chunk_attempts_total: usize, + /// Per-chunk store wall-clock in ms. Empty for upload paths that + /// don't run the wave store loop. + #[serde(skip_serializing_if = "Vec::is_empty")] + store_durations_ms: Vec, + /// Stored-chunk count by retry round (index 0 = first attempt). + retries_histogram: [usize; 4], } #[derive(Serialize)] diff --git a/ant-core/src/data/client/batch.rs b/ant-core/src/data/client/batch.rs index d1ff614..7f1fc89 100644 --- a/ant-core/src/data/client/batch.rs +++ b/ant-core/src/data/client/batch.rs @@ -20,7 +20,7 @@ use ant_protocol::{compute_address, XorName, DATA_TYPE_CHUNK}; use bytes::Bytes; use futures::stream::{self, StreamExt}; use std::collections::{HashMap, HashSet}; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::sync::mpsc; use tracing::{debug, info, warn}; @@ -62,6 +62,71 @@ pub struct WaveResult { pub stored: Vec, /// Chunks that failed to store after all retries. pub failed: Vec<(XorName, String)>, + /// Sum of store-RPC attempts across all chunks in this wave (>= stored.len() + failed.len()). + pub chunk_attempts_total: usize, + /// Per-chunk wall-clock (ms) from first attempt to successful store. Only populated for stored chunks. + pub store_durations_ms: Vec, + /// Retry round on which each stored chunk succeeded, one entry per stored chunk (0 = first attempt). + pub retries_per_chunk: Vec, +} + +/// Aggregated retry / wall-clock stats across one or more [`WaveResult`]s.
+/// +/// Used by [`Client::batch_upload_chunks_with_events`] (which may store +/// multiple waves per call) and surfaced upward into `FileUploadResult` so +/// downstream tooling can record per-upload retry pressure and per-chunk +/// store wall-clock without needing log parsing. +#[derive(Debug, Default, Clone)] +pub struct WaveAggregateStats { + /// Sum of store-RPC attempts across all waves (>= chunks_stored). + pub chunk_attempts_total: usize, + /// Per-chunk wall-clock (ms) from first attempt to successful store, + /// concatenated across waves. + pub store_durations_ms: Vec, + /// Count of stored chunks that succeeded on each retry round + /// (index 0 = first attempt, 1 = first retry, etc.). Indices match + /// the retry rounds emitted by `Client::store_paid_chunks_with_events` + /// which caps at `MAX_RETRIES = 3`, so an array of 4 suffices. + pub retries_histogram: [usize; 4], +} + +impl WaveAggregateStats { + /// Fold one [`WaveResult`]'s stats into the running aggregate. + pub fn absorb(&mut self, wave: &WaveResult) { + self.chunk_attempts_total = self + .chunk_attempts_total + .saturating_add(wave.chunk_attempts_total); + self.store_durations_ms.extend(&wave.store_durations_ms); + for &r in &wave.retries_per_chunk { + let idx = (r as usize).min(self.retries_histogram.len() - 1); + self.retries_histogram[idx] = self.retries_histogram[idx].saturating_add(1); + } + } +} + +/// Compute a percentile from an unsorted slice of `u64` values. +/// +/// `p` is in `[0.0, 1.0]`. Returns 0 for an empty slice. Uses nearest-rank; +/// callers don't need numerical precision here — these are coarse log/metric +/// summaries. +fn percentile(values: &[u64], p: f64) -> u64 { + if values.is_empty() { + return 0; + } + let mut sorted = values.to_vec(); + sorted.sort_unstable(); + let p = p.clamp(0.0, 1.0); + // Nearest-rank: ceil(p * n) - 1, clamped to [0, n-1]. 
+ let n = sorted.len(); + #[allow( + clippy::cast_possible_truncation, + clippy::cast_sign_loss, + clippy::cast_precision_loss + )] + let rank = ((p * n as f64).ceil() as usize) + .saturating_sub(1) + .min(n - 1); + sorted[rank] } /// Payment data for external signing. @@ -286,8 +351,10 @@ impl Client { &self, chunks: Vec, ) -> Result<(Vec, String, u128)> { - self.batch_upload_chunks_with_events(chunks, None, 0, 0) - .await + let (addresses, storage, gas, _stats) = self + .batch_upload_chunks_with_events(chunks, None, 0, 0) + .await?; + Ok((addresses, storage, gas)) } /// Same as [`Client::batch_upload_chunks`] but sends [`UploadEvent::ChunkStored`] @@ -302,9 +369,14 @@ impl Client { progress: Option<&mpsc::Sender>, stored_offset: usize, file_total: usize, - ) -> Result<(Vec, String, u128)> { + ) -> Result<(Vec, String, u128, WaveAggregateStats)> { if chunks.is_empty() { - return Ok((Vec::new(), "0".to_string(), 0)); + return Ok(( + Vec::new(), + "0".to_string(), + 0, + WaveAggregateStats::default(), + )); } let total_chunks = chunks.len(); @@ -321,6 +393,7 @@ impl Client { // Accumulate costs across waves. let mut total_storage = Amount::ZERO; let mut total_gas: u128 = 0; + let mut agg_stats = WaveAggregateStats::default(); // Deduplicate chunks by content address. let mut unique_chunks = Vec::with_capacity(total_chunks); @@ -388,6 +461,7 @@ impl Client { // Track partial progress from previous wave. 
if let Some(wave_result) = store_result { all_addresses.extend(&wave_result.stored); + agg_stats.absorb(&wave_result); if !wave_result.failed.is_empty() { let failed_count = wave_result.failed.len(); warn!("{failed_count} chunks failed to store after retries"); @@ -437,6 +511,7 @@ impl Client { .store_paid_chunks_with_events(paid_chunks, progress, store_offset, file_total) .await; all_addresses.extend(&wave_result.stored); + agg_stats.absorb(&wave_result); if !wave_result.failed.is_empty() { let failed_count = wave_result.failed.len(); warn!("{failed_count} chunks failed to store after retries (final wave)"); @@ -452,7 +527,12 @@ impl Client { } debug!("Batch upload complete: {} addresses", all_addresses.len()); - Ok((all_addresses, total_storage.to_string(), total_gas)) + Ok(( + all_addresses, + total_storage.to_string(), + total_gas, + agg_stats, + )) } /// Prepare a wave of chunks by collecting quotes concurrently. @@ -550,6 +630,18 @@ impl Client { let mut stored = Vec::new(); let mut to_retry = paid_chunks; + // Per-chunk first-seen timestamps, keyed by chunk address. + // Inserted on first sight; never overwritten so wall-clock spans + // first attempt → eventual success across all retry rounds. + let mut first_seen: HashMap = HashMap::with_capacity(to_retry.len()); + for chunk in &to_retry { + first_seen.entry(chunk.address).or_insert_with(Instant::now); + } + + let mut chunk_attempts_total: usize = 0; + let mut store_durations_ms: Vec = Vec::new(); + let mut retries_per_chunk: Vec = Vec::new(); + for attempt in 0..=MAX_RETRIES { if attempt > 0 { let delay = Duration::from_millis(BASE_DELAY_MS * 2u64.pow(attempt - 1)); @@ -560,6 +652,9 @@ impl Client { ); } + // Each chunk in this round counts as one store-RPC attempt. 
+ chunk_attempts_total = chunk_attempts_total.saturating_add(to_retry.len()); + let store_limiter = self.controller().store.clone(); let store_concurrency = store_limiter.current().min(to_retry.len().max(1)); let mut upload_stream = stream::iter(to_retry) @@ -589,6 +684,12 @@ impl Client { while let Some((chunk, result)) = upload_stream.next().await { match result { Ok(name) => { + let duration_ms = first_seen + .get(&chunk.address) + .map(|t| u64::try_from(t.elapsed().as_millis()).unwrap_or(u64::MAX)) + .unwrap_or(0); + store_durations_ms.push(duration_ms); + retries_per_chunk.push(attempt); stored.push(name); let stored_num = stored_before + stored.len(); if total_chunks > 0 { @@ -606,10 +707,15 @@ impl Client { } if failed_this_round.is_empty() { - return WaveResult { + let result = WaveResult { stored, failed: Vec::new(), + chunk_attempts_total, + store_durations_ms, + retries_per_chunk, }; + log_wave_summary(&result); + return result; } if attempt == MAX_RETRIES { @@ -617,7 +723,15 @@ impl Client { .into_iter() .map(|(c, e)| (c.address, e)) .collect(); - return WaveResult { stored, failed }; + let result = WaveResult { + stored, + failed, + chunk_attempts_total, + store_durations_ms, + retries_per_chunk, + }; + log_wave_summary(&result); + return result; } warn!( @@ -629,13 +743,42 @@ impl Client { } // Unreachable due to loop structure, but satisfy the compiler. - WaveResult { + let result = WaveResult { stored, failed: Vec::new(), - } + chunk_attempts_total, + store_durations_ms, + retries_per_chunk, + }; + log_wave_summary(&result); + result } } +/// Emit one structured info line summarising a wave's store-side stats. +/// +/// Surfaces p50/p95/max chunk wall-clock and per-round retry counts so +/// log-based analysis tooling (Elasticsearch / Kibana) can identify +/// client-side quorum or retry cost without needing the `--json` output. 
+fn log_wave_summary(result: &WaveResult) { + let retries_round_1 = result.retries_per_chunk.iter().filter(|&&r| r == 1).count(); + let retries_round_2 = result.retries_per_chunk.iter().filter(|&&r| r == 2).count(); + let retries_round_3 = result.retries_per_chunk.iter().filter(|&&r| r == 3).count(); + let chunk_attempts_total = result.chunk_attempts_total; + info!( + chunks_stored = result.stored.len(), + chunks_failed = result.failed.len(), + chunk_attempts_total, + retries_round_1, + retries_round_2, + retries_round_3, + store_duration_p50_ms = percentile(&result.store_durations_ms, 0.50), + store_duration_p95_ms = percentile(&result.store_durations_ms, 0.95), + store_duration_max_ms = result.store_durations_ms.iter().max().copied().unwrap_or(0), + "chunk_store_wave_complete" + ); +} + /// Compile-time assertions that batch method futures are Send. #[cfg(test)] mod send_assertions { diff --git a/ant-core/src/data/client/data.rs b/ant-core/src/data/client/data.rs index 04e783e..a42dc25 100644 --- a/ant-core/src/data/client/data.rs +++ b/ant-core/src/data/client/data.rs @@ -125,7 +125,7 @@ impl Client { Err(e) => return Err(e), }; - let chunks_stored = self + let (chunks_stored, _stats) = self .merkle_upload_chunks(chunk_contents, addresses, &batch_result, None) .await?; diff --git a/ant-core/src/data/client/file.rs b/ant-core/src/data/client/file.rs index cfedec6..825168b 100644 --- a/ant-core/src/data/client/file.rs +++ b/ant-core/src/data/client/file.rs @@ -11,7 +11,9 @@ //! For in-memory data uploads, see the `data` module. 
use crate::data::client::adaptive::observe_op; -use crate::data::client::batch::{finalize_batch_payment, PaymentIntent, PreparedChunk}; +use crate::data::client::batch::{ + finalize_batch_payment, PaymentIntent, PreparedChunk, WaveAggregateStats, +}; use crate::data::client::classify_error; use crate::data::client::merkle::{ finalize_merkle_batch, should_use_merkle, MerkleBatchPaymentResult, PaymentMode, @@ -480,6 +482,17 @@ pub struct FileUploadResult { /// file uploaded before; deterministic via self-encryption), the address /// is still returned but no storage payment was made for it. pub data_map_address: Option<[u8; 32]>, + /// Sum of chunk-store RPC attempts across the upload + /// (`>= chunks_stored` on full success; more if any chunk retried). + /// `0` for paths that don't run the wave store loop. + pub chunk_attempts_total: usize, + /// Per-chunk store wall-clock in ms (length == `chunks_stored` on full + /// success, empty for paths that don't run the wave store loop). + pub store_durations_ms: Vec, + /// Count of stored chunks that succeeded on each retry round + /// (index 0 = first attempt, 1 = first retry, etc.). All zeros for + /// paths that don't run the wave store loop. + pub retries_histogram: [usize; 4], } /// Payment information for external signing — either wave-batch or merkle. @@ -1095,6 +1108,9 @@ impl Client { info!("External-signer upload finalized: {chunks_stored} chunks stored"); + let mut stats = WaveAggregateStats::default(); + stats.absorb(&wave_result); + Ok(FileUploadResult { data_map: prepared.data_map, chunks_stored, @@ -1104,6 +1120,9 @@ impl Client { storage_cost_atto: "0".into(), gas_cost_wei: 0, data_map_address, + chunk_attempts_total: stats.chunk_attempts_total, + store_durations_ms: stats.store_durations_ms, + retries_histogram: stats.retries_histogram, }) } ExternalPaymentInfo::Merkle { .. 
} => Err(Error::Payment( @@ -1156,7 +1175,7 @@ impl Client { chunk_addresses, } => { let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?; - let chunks_stored = self + let (chunks_stored, stats) = self .merkle_upload_chunks( chunk_contents, chunk_addresses, @@ -1176,6 +1195,9 @@ impl Client { storage_cost_atto: "0".into(), gas_cost_wei: 0, data_map_address, + chunk_attempts_total: stats.chunk_attempts_total, + store_durations_ms: stats.store_durations_ms, + retries_histogram: stats.retries_histogram, }) } ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment( @@ -1246,7 +1268,7 @@ impl Client { } // Phase 2: Decide payment mode and upload in waves from disk. - let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei) = + let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = if self.should_use_merkle(chunk_count, mode) { info!("Using merkle batch payment for {chunk_count} file chunks"); @@ -1257,7 +1279,7 @@ impl Client { Ok(result) => result, Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => { info!("Merkle needs more peers ({msg}), falling back to wave-batch"); - let (stored, sc, gc) = + let (stored, sc, gc, fb_stats) = self.upload_waves_single(&spill, progress.as_ref()).await?; return Ok(FileUploadResult { data_map, @@ -1268,18 +1290,22 @@ impl Client { storage_cost_atto: sc, gas_cost_wei: gc, data_map_address: None, + chunk_attempts_total: fb_stats.chunk_attempts_total, + store_durations_ms: fb_stats.store_durations_ms, + retries_histogram: fb_stats.retries_histogram, }); } Err(e) => return Err(e), }; - let (stored, sc, gc) = self + let (stored, sc, gc, stats) = self .upload_waves_merkle(&spill, &batch_result, progress.as_ref()) .await?; - (stored, PaymentMode::Merkle, sc, gc) + (stored, PaymentMode::Merkle, sc, gc, stats) } else { - let (stored, sc, gc) = self.upload_waves_single(&spill, progress.as_ref()).await?; - (stored, PaymentMode::Single, sc, gc) + let (stored, sc, gc, stats) 
= + self.upload_waves_single(&spill, progress.as_ref()).await?; + (stored, PaymentMode::Single, sc, gc, stats) }; info!( @@ -1296,6 +1322,9 @@ impl Client { storage_cost_atto, gas_cost_wei, data_map_address: None, + chunk_attempts_total: stats.chunk_attempts_total, + store_durations_ms: stats.store_durations_ms, + retries_histogram: stats.retries_histogram, }) } @@ -1353,10 +1382,11 @@ impl Client { &self, spill: &ChunkSpill, progress: Option<&mpsc::Sender>, - ) -> Result<(usize, String, u128)> { + ) -> Result<(usize, String, u128, WaveAggregateStats)> { let mut total_stored = 0usize; let mut total_storage = Amount::ZERO; let mut total_gas: u128 = 0; + let mut agg_stats = WaveAggregateStats::default(); let total_chunks = spill.len(); let waves: Vec<&[[u8; 32]]> = spill.waves().collect(); let wave_count = waves.len(); @@ -1381,7 +1411,7 @@ impl Client { }) .await; } - let (addresses, wave_storage, wave_gas) = self + let (addresses, wave_storage, wave_gas, wave_stats) = self .batch_upload_chunks_with_events(wave_data, progress, total_stored, total_chunks) .await?; total_stored += addresses.len(); @@ -1389,6 +1419,21 @@ impl Client { total_storage += cost; } total_gas = total_gas.saturating_add(wave_gas); + // Merge per-call stats (each call already aggregates across the + // waves it ran internally, so a simple sum/extend is correct). 
+ agg_stats.chunk_attempts_total = agg_stats + .chunk_attempts_total + .saturating_add(wave_stats.chunk_attempts_total); + agg_stats + .store_durations_ms + .extend(wave_stats.store_durations_ms); + for (slot, count) in agg_stats + .retries_histogram + .iter_mut() + .zip(wave_stats.retries_histogram.iter()) + { + *slot = slot.saturating_add(*count); + } if let Some(tx) = progress { let _ = tx .send(UploadEvent::WaveComplete { @@ -1401,7 +1446,12 @@ impl Client { } } - Ok((total_stored, total_storage.to_string(), total_gas)) + Ok(( + total_stored, + total_storage.to_string(), + total_gas, + agg_stats, + )) } /// Upload chunks from a spill using pre-computed merkle proofs. @@ -1416,12 +1466,13 @@ impl Client { spill: &ChunkSpill, batch_result: &MerkleBatchPaymentResult, progress: Option<&mpsc::Sender>, - ) -> Result<(usize, String, u128)> { + ) -> Result<(usize, String, u128, WaveAggregateStats)> { let mut total_stored = 0usize; let total_chunks = spill.len(); let waves: Vec<&[[u8; 32]]> = spill.waves().collect(); let wave_count = waves.len(); let mut stored_addresses: Vec<[u8; 32]> = Vec::new(); + let mut agg_stats = WaveAggregateStats::default(); for (wave_idx, wave_addrs) in waves.into_iter().enumerate() { let wave_num = wave_idx + 1; @@ -1440,6 +1491,7 @@ impl Client { let proof_bytes = batch_result.proofs.get(&addr).cloned(); let limiter = store_limiter.clone(); async move { + let started = std::time::Instant::now(); let proof = proof_bytes.ok_or_else(|| { ( addr, @@ -1447,9 +1499,13 @@ impl Client { "Missing merkle proof for chunk {}", hex::encode(addr) )), + started, ) })?; - let peers = self.close_group_peers(&addr).await.map_err(|e| (addr, e))?; + let peers = self + .close_group_peers(&addr) + .await + .map_err(|e| (addr, e, started))?; observe_op( &limiter, || async move { @@ -1458,15 +1514,22 @@ impl Client { classify_error, ) .await - .map(|_| addr) - .map_err(|e| (addr, e)) + .map(|_| (addr, started)) + .map_err(|e| (addr, e, started)) } })) 
.buffer_unordered(store_concurrency); while let Some(result) = upload_stream.next().await { match result { - Ok(addr) => { + Ok((addr, started)) => { + let duration_ms = + u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX); + agg_stats.store_durations_ms.push(duration_ms); + agg_stats.chunk_attempts_total = + agg_stats.chunk_attempts_total.saturating_add(1); + agg_stats.retries_histogram[0] = + agg_stats.retries_histogram[0].saturating_add(1); stored_addresses.push(addr); total_stored += 1; info!("Stored {total_stored}/{total_chunks}"); @@ -1479,7 +1542,7 @@ impl Client { .await; } } - Err((addr, e)) => { + Err((addr, e, _started)) => { warn!("merkle upload failed for chunk {}: {e}", hex::encode(addr)); return Err(Error::PartialUpload { stored: stored_addresses, @@ -1509,6 +1572,7 @@ impl Client { total_stored, batch_result.storage_cost_atto.clone(), batch_result.gas_cost_wei, + agg_stats, )) } diff --git a/ant-core/src/data/client/merkle.rs b/ant-core/src/data/client/merkle.rs index ac2e9a1..8f4fea2 100644 --- a/ant-core/src/data/client/merkle.rs +++ b/ant-core/src/data/client/merkle.rs @@ -534,8 +534,9 @@ impl Client { addresses: Vec<[u8; 32]>, batch_result: &MerkleBatchPaymentResult, progress: Option<&mpsc::Sender>, - ) -> Result { + ) -> Result<(usize, crate::data::client::batch::WaveAggregateStats)> { let mut stored = 0usize; + let mut stats = crate::data::client::batch::WaveAggregateStats::default(); let store_limiter = self.controller().store.clone(); // Clamp fan-out to batch size — partial batches should not // pay for unused slots (see PERF-RESULTS.md). 
@@ -546,6 +547,7 @@ impl Client { let proof_bytes = batch_result.proofs.get(&addr).cloned(); let limiter = store_limiter.clone(); async move { + let started = std::time::Instant::now(); let proof = proof_bytes.ok_or_else(|| { Error::Payment(format!( "Missing merkle proof for chunk {}", @@ -561,13 +563,18 @@ impl Client { classify_error, ) .await + .map(|_| started) } }, )) .buffer_unordered(store_concurrency); while let Some(result) = upload_stream.next().await { - result?; + let started = result?; + let duration_ms = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX); + stats.store_durations_ms.push(duration_ms); + stats.chunk_attempts_total = stats.chunk_attempts_total.saturating_add(1); + stats.retries_histogram[0] = stats.retries_histogram[0].saturating_add(1); stored += 1; if let Some(tx) = progress { let _ = tx.try_send(UploadEvent::ChunkStored { @@ -577,7 +584,7 @@ impl Client { } } - Ok(stored) + Ok((stored, stats)) } }