Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions src/storage/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,38 @@ impl AntProtocol {
}

/// Handle a PUT request.
///
/// Wraps `handle_put_inner` to emit a single structured tracing event per
/// PUT RPC at every exit path, including early-return validation paths.
/// The event uses `target: "ant_node::storage::rpc_latency"` so that
/// Elasticsearch / Kibana can build p50/p95/p99 store-RPC latency
/// histograms from the existing telegraf log forwarding.
async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
let start = std::time::Instant::now();
let addr_hex = hex::encode(request.address);
let chunk_size = request.content.len();
let response = self.handle_put_inner(request).await;
let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
let outcome: &'static str = match &response {
ChunkPutResponse::Success { .. } => "success",
ChunkPutResponse::AlreadyExists { .. } => "already_exists",
ChunkPutResponse::PaymentRequired { .. } => "payment_required",
ChunkPutResponse::Error(_) => "error",
_ => "unknown",
};
info!(
target: "ant_node::storage::rpc_latency",
duration_ms,
chunk_size,
outcome,
addr = %addr_hex,
"put_rpc"
);
response
Comment on lines +174 to +194
}

/// Inner body of `handle_put` — see the wrapper for the per-RPC latency log.
async fn handle_put_inner(&self, request: ChunkPutRequest) -> ChunkPutResponse {
let address = request.address;
let addr_hex = hex::encode(address);
debug!("Handling PUT request for {addr_hex}");
Expand Down Expand Up @@ -254,7 +285,32 @@ impl AntProtocol {
}

/// Handle a GET request.
///
/// Wraps `handle_get_inner` to emit a single structured tracing event per
/// GET RPC at every exit path. See `handle_put` for the rationale.
async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
let start = std::time::Instant::now();
let addr_hex = hex::encode(request.address);
let response = self.handle_get_inner(request).await;
let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
let outcome: &'static str = match &response {
ChunkGetResponse::Success { .. } => "success",
ChunkGetResponse::NotFound { .. } => "not_found",
ChunkGetResponse::Error(_) => "error",
_ => "unknown",
};
info!(
target: "ant_node::storage::rpc_latency",
duration_ms,
outcome,
addr = %addr_hex,
"get_rpc"
);
response
}

/// Inner body of `handle_get` — see the wrapper for the per-RPC latency log.
async fn handle_get_inner(&self, request: ChunkGetRequest) -> ChunkGetResponse {
let address = request.address;
let addr_hex = hex::encode(address);
debug!("Handling GET request for {addr_hex}");
Expand Down
Loading