Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions ant-cli/src/commands/data/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ async fn handle_file_upload(
storage_cost_atto: result.storage_cost_atto.clone(),
gas_cost_wei: result.gas_cost_wei.to_string(),
elapsed_secs: elapsed.as_secs_f64(),
chunk_attempts_total: result.chunk_attempts_total,
store_durations_ms: result.store_durations_ms.clone(),
retries_histogram: result.retries_histogram,
};
println!("{}", serde_json::to_string(&out)?);
} else {
Expand Down Expand Up @@ -331,6 +334,9 @@ async fn handle_file_upload(
storage_cost_atto: result.storage_cost_atto.clone(),
gas_cost_wei: result.gas_cost_wei.to_string(),
elapsed_secs: elapsed.as_secs_f64(),
chunk_attempts_total: result.chunk_attempts_total,
store_durations_ms: result.store_durations_ms.clone(),
retries_histogram: result.retries_histogram,
};
println!("{}", serde_json::to_string(&out)?);
} else {
Expand Down Expand Up @@ -583,6 +589,14 @@ struct UploadJsonResult {
storage_cost_atto: String,
gas_cost_wei: String,
elapsed_secs: f64,
/// Sum of chunk-store RPC attempts; `>= chunks_stored` on success.
chunk_attempts_total: usize,
/// Per-chunk store wall-clock in ms. Empty for upload paths that
/// don't run the wave store loop.
#[serde(skip_serializing_if = "Vec::is_empty")]
store_durations_ms: Vec<u64>,
/// Stored-chunk count by retry round (index 0 = first attempt).
retries_histogram: [usize; 4],
}

#[derive(Serialize)]
Expand Down
163 changes: 153 additions & 10 deletions ant-core/src/data/client/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use ant_protocol::{compute_address, XorName, DATA_TYPE_CHUNK};
use bytes::Bytes;
use futures::stream::{self, StreamExt};
use std::collections::{HashMap, HashSet};
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tracing::{debug, info, warn};

Expand Down Expand Up @@ -62,6 +62,71 @@ pub struct WaveResult {
pub stored: Vec<XorName>,
/// Chunks that failed to store after all retries.
pub failed: Vec<(XorName, String)>,
/// Sum of store-RPC attempts across all chunks in this wave (>= stored.len() + failed.len()).
pub chunk_attempts_total: usize,
/// Per-chunk wall-clock (ms) from first attempt to successful store. Only populated for stored chunks.
pub store_durations_ms: Vec<u64>,
/// Histogram of which retry-round each stored chunk succeeded on (index 0 = first attempt).
pub retries_per_chunk: Vec<u32>,
}

/// Aggregated retry / wall-clock stats across one or more [`WaveResult`]s.
///
/// Used by [`Client::batch_upload_chunks_with_events`] (which may store
/// multiple waves per call) and surfaced upward into `FileUploadResult` so
/// downstream tooling can record per-upload retry pressure and per-chunk
/// store wall-clock without needing log parsing.
#[derive(Debug, Default, Clone)]
pub struct WaveAggregateStats {
    /// Sum of store-RPC attempts across all waves (>= chunks_stored).
    pub chunk_attempts_total: usize,
    /// Per-chunk wall-clock (ms) from first attempt to successful store,
    /// concatenated across waves. Only successful stores contribute, so the
    /// length equals the number of stored chunks, not attempts.
    pub store_durations_ms: Vec<u64>,
    /// Count of stored chunks that succeeded on each retry round
    /// (index 0 = first attempt, 1 = first retry, etc.). Indices match
    /// the retry rounds emitted by `Client::store_paid_chunks_with_events`
    /// which caps at `MAX_RETRIES = 3`, so an array of 4 suffices; any
    /// higher round is folded into the last bucket by `absorb`.
    pub retries_histogram: [usize; 4],
}

impl WaveAggregateStats {
    /// Fold one [`WaveResult`]'s stats into the running aggregate.
    ///
    /// Attempt counts saturate rather than wrap, durations are appended
    /// as-is, and retry rounds past the last histogram bucket are clamped
    /// into it so out-of-range rounds are still counted.
    pub fn absorb(&mut self, wave: &WaveResult) {
        self.chunk_attempts_total = self
            .chunk_attempts_total
            .saturating_add(wave.chunk_attempts_total);
        self.store_durations_ms
            .extend(wave.store_durations_ms.iter().copied());
        // Clamp any round index beyond the histogram into the final bucket.
        let last_bucket = self.retries_histogram.len() - 1;
        for &round in wave.retries_per_chunk.iter() {
            let bucket = (round as usize).min(last_bucket);
            self.retries_histogram[bucket] =
                self.retries_histogram[bucket].saturating_add(1);
        }
    }
}

/// Compute a percentile from an unsorted slice of `u64` values.
///
/// `p` is in `[0.0, 1.0]` (values outside that range are clamped).
/// Returns 0 for an empty slice. Uses the nearest-rank method; callers
/// don't need numerical precision here — these are coarse log/metric
/// summaries.
fn percentile(values: &[u64], p: f64) -> u64 {
    let n = values.len();
    if n == 0 {
        return 0;
    }
    let mut ordered = values.to_vec();
    ordered.sort_unstable();
    let fraction = p.clamp(0.0, 1.0);
    // Nearest-rank: ceil(p * n) - 1, kept within [0, n - 1].
    #[allow(
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss,
        clippy::cast_precision_loss
    )]
    let idx = ((fraction * n as f64).ceil() as usize)
        .saturating_sub(1)
        .min(n - 1);
    ordered[idx]
}

/// Payment data for external signing.
Expand Down Expand Up @@ -286,8 +351,10 @@ impl Client {
&self,
chunks: Vec<Bytes>,
) -> Result<(Vec<XorName>, String, u128)> {
self.batch_upload_chunks_with_events(chunks, None, 0, 0)
.await
let (addresses, storage, gas, _stats) = self
.batch_upload_chunks_with_events(chunks, None, 0, 0)
.await?;
Ok((addresses, storage, gas))
}

/// Same as [`Client::batch_upload_chunks`] but sends [`UploadEvent::ChunkStored`]
Expand All @@ -302,9 +369,14 @@ impl Client {
progress: Option<&mpsc::Sender<UploadEvent>>,
stored_offset: usize,
file_total: usize,
) -> Result<(Vec<XorName>, String, u128)> {
) -> Result<(Vec<XorName>, String, u128, WaveAggregateStats)> {
if chunks.is_empty() {
return Ok((Vec::new(), "0".to_string(), 0));
return Ok((
Vec::new(),
"0".to_string(),
0,
WaveAggregateStats::default(),
));
}

let total_chunks = chunks.len();
Expand All @@ -321,6 +393,7 @@ impl Client {
// Accumulate costs across waves.
let mut total_storage = Amount::ZERO;
let mut total_gas: u128 = 0;
let mut agg_stats = WaveAggregateStats::default();

// Deduplicate chunks by content address.
let mut unique_chunks = Vec::with_capacity(total_chunks);
Expand Down Expand Up @@ -388,6 +461,7 @@ impl Client {
// Track partial progress from previous wave.
if let Some(wave_result) = store_result {
all_addresses.extend(&wave_result.stored);
agg_stats.absorb(&wave_result);
if !wave_result.failed.is_empty() {
let failed_count = wave_result.failed.len();
warn!("{failed_count} chunks failed to store after retries");
Expand Down Expand Up @@ -437,6 +511,7 @@ impl Client {
.store_paid_chunks_with_events(paid_chunks, progress, store_offset, file_total)
.await;
all_addresses.extend(&wave_result.stored);
agg_stats.absorb(&wave_result);
if !wave_result.failed.is_empty() {
let failed_count = wave_result.failed.len();
warn!("{failed_count} chunks failed to store after retries (final wave)");
Expand All @@ -452,7 +527,12 @@ impl Client {
}

debug!("Batch upload complete: {} addresses", all_addresses.len());
Ok((all_addresses, total_storage.to_string(), total_gas))
Ok((
all_addresses,
total_storage.to_string(),
total_gas,
agg_stats,
))
}

/// Prepare a wave of chunks by collecting quotes concurrently.
Expand Down Expand Up @@ -550,6 +630,18 @@ impl Client {
let mut stored = Vec::new();
let mut to_retry = paid_chunks;

// Per-chunk first-seen timestamps, keyed by chunk address.
// Inserted on first sight; never overwritten so wall-clock spans
// first attempt → eventual success across all retry rounds.
let mut first_seen: HashMap<XorName, Instant> = HashMap::with_capacity(to_retry.len());
for chunk in &to_retry {
first_seen.entry(chunk.address).or_insert_with(Instant::now);
}

let mut chunk_attempts_total: usize = 0;
let mut store_durations_ms: Vec<u64> = Vec::new();
let mut retries_per_chunk: Vec<u32> = Vec::new();

for attempt in 0..=MAX_RETRIES {
if attempt > 0 {
let delay = Duration::from_millis(BASE_DELAY_MS * 2u64.pow(attempt - 1));
Expand All @@ -560,6 +652,9 @@ impl Client {
);
}

// Each chunk in this round counts as one store-RPC attempt.
chunk_attempts_total = chunk_attempts_total.saturating_add(to_retry.len());

let store_limiter = self.controller().store.clone();
let store_concurrency = store_limiter.current().min(to_retry.len().max(1));
let mut upload_stream = stream::iter(to_retry)
Expand Down Expand Up @@ -589,6 +684,12 @@ impl Client {
while let Some((chunk, result)) = upload_stream.next().await {
match result {
Ok(name) => {
let duration_ms = first_seen
.get(&chunk.address)
.map(|t| u64::try_from(t.elapsed().as_millis()).unwrap_or(u64::MAX))
.unwrap_or(0);
store_durations_ms.push(duration_ms);
retries_per_chunk.push(attempt);
stored.push(name);
let stored_num = stored_before + stored.len();
if total_chunks > 0 {
Expand All @@ -606,18 +707,31 @@ impl Client {
}

if failed_this_round.is_empty() {
return WaveResult {
let result = WaveResult {
stored,
failed: Vec::new(),
chunk_attempts_total,
store_durations_ms,
retries_per_chunk,
};
log_wave_summary(&result);
return result;
}

if attempt == MAX_RETRIES {
let failed = failed_this_round
.into_iter()
.map(|(c, e)| (c.address, e))
.collect();
return WaveResult { stored, failed };
let result = WaveResult {
stored,
failed,
chunk_attempts_total,
store_durations_ms,
retries_per_chunk,
};
log_wave_summary(&result);
return result;
}

warn!(
Expand All @@ -629,13 +743,42 @@ impl Client {
}

// Unreachable due to loop structure, but satisfy the compiler.
WaveResult {
let result = WaveResult {
stored,
failed: Vec::new(),
}
chunk_attempts_total,
store_durations_ms,
retries_per_chunk,
};
log_wave_summary(&result);
result
}
}

/// Emit one structured info line summarising a wave's store-side stats.
///
/// Surfaces p50/p95/max chunk wall-clock and per-round retry counts so
/// log-based analysis tooling (Elasticsearch / Kibana) can identify
/// client-side quorum or retry cost without needing the `--json` output.
fn log_wave_summary(result: &WaveResult) {
    // Count how many stored chunks succeeded on a given retry round.
    let round_count =
        |round: u32| result.retries_per_chunk.iter().filter(|&&r| r == round).count();
    let max_duration_ms = result.store_durations_ms.iter().max().copied().unwrap_or(0);
    info!(
        chunks_stored = result.stored.len(),
        chunks_failed = result.failed.len(),
        chunk_attempts_total = result.chunk_attempts_total,
        retries_round_1 = round_count(1),
        retries_round_2 = round_count(2),
        retries_round_3 = round_count(3),
        store_duration_p50_ms = percentile(&result.store_durations_ms, 0.50),
        store_duration_p95_ms = percentile(&result.store_durations_ms, 0.95),
        store_duration_max_ms = max_duration_ms,
        "chunk_store_wave_complete"
    );
}

/// Compile-time assertions that batch method futures are Send.
#[cfg(test)]
mod send_assertions {
Expand Down
2 changes: 1 addition & 1 deletion ant-core/src/data/client/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl Client {
Err(e) => return Err(e),
};

let chunks_stored = self
let (chunks_stored, _stats) = self
.merkle_upload_chunks(chunk_contents, addresses, &batch_result, None)
.await?;

Expand Down
Loading
Loading