Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,12 @@ where
// Spawn a processing task
let model_clone = model.clone();
let user_id = request.data.created_by.clone();
let completion_window = request
.data
.batch_metadata
.get("completion_window")
.cloned()
.unwrap_or_default();
let storage = self.storage.clone();
let http_client = (*self.http_client).clone();
let retry_config = (&self.config).into();
Expand Down Expand Up @@ -845,7 +851,7 @@ where
.entry(user_id.clone())
.or_default()
.fetch_add(1, Ordering::Relaxed);
gauge!("fusillade_user_requests_in_flight", "user" => user_id.clone())
gauge!("fusillade_user_requests_in_flight", "user" => user_id.clone(), "completion_window" => completion_window.clone())
.increment(1.0);

let process_span = tracing::info_span!(
Expand Down Expand Up @@ -873,14 +879,15 @@ where
// Ensure we decrement the per-model and per-user counters when this task completes
let model_for_guard = model_clone.clone();
let user_for_guard = user_id.clone();
let cw_for_guard = completion_window.clone();
let in_flight_for_guard = requests_in_flight.clone();
let user_in_flight_for_guard = user_requests_in_flight.clone();
let _guard = scopeguard::guard((), move |_| {
if let Some(counter) = in_flight_for_guard.get(&model_for_guard) {
counter.value().fetch_sub(1, Ordering::Relaxed);
}
gauge!("fusillade_requests_in_flight", "model" => model_for_guard).decrement(1.0);
gauge!("fusillade_user_requests_in_flight", "user" => user_for_guard.clone()).decrement(1.0);
gauge!("fusillade_user_requests_in_flight", "user" => user_for_guard.clone(), "completion_window" => cw_for_guard).decrement(1.0);
if let Some(counter) = user_in_flight_for_guard.get(&user_for_guard) {
let prev = counter.value().fetch_sub(1, Ordering::Relaxed);
drop(counter);
Expand Down Expand Up @@ -944,7 +951,7 @@ where
failed: AtomicU64::new(0),
}).completed.fetch_add(1, Ordering::Relaxed);
counter!("fusillade_requests_completed_total", "model" => model_clone.clone(), "status" => "success").increment(1);
counter!("fusillade_user_requests_completed_total", "user" => user_id.clone(), "status" => "success").increment(1);
counter!("fusillade_user_requests_completed_total", "user" => user_id.clone(), "status" => "success", "completion_window" => completion_window.clone()).increment(1);
histogram!("fusillade_request_duration_seconds", "model" => model_clone.clone(), "status" => "success")
.record(processing_start.elapsed().as_secs_f64());
// Record how many retries it took to succeed (0 = first attempt succeeded)
Expand Down Expand Up @@ -1002,7 +1009,7 @@ where
}).failed.fetch_add(1, Ordering::Relaxed);
requests_failed.fetch_add(1, Ordering::Relaxed);
counter!("fusillade_requests_completed_total", "model" => model_clone.clone(), "status" => "failed", "reason" => failed.state.reason.metric_label(), "status_code" => failed.state.reason.status_code_label()).increment(1);
counter!("fusillade_user_requests_completed_total", "user" => user_id.clone(), "status" => "failed").increment(1);
counter!("fusillade_user_requests_completed_total", "user" => user_id.clone(), "status" => "failed", "completion_window" => completion_window.clone()).increment(1);
histogram!("fusillade_request_duration_seconds", "model" => model_clone.clone(), "status" => "failed")
.record(processing_start.elapsed().as_secs_f64());

Expand Down Expand Up @@ -1033,7 +1040,7 @@ where
failed: AtomicU64::new(0),
}).failed.fetch_add(1, Ordering::Relaxed);
counter!("fusillade_requests_completed_total", "model" => model_clone.clone(), "status" => "failed", "reason" => failed.state.reason.metric_label(), "status_code" => failed.state.reason.status_code_label()).increment(1);
counter!("fusillade_user_requests_completed_total", "user" => user_id.clone(), "status" => "failed").increment(1);
counter!("fusillade_user_requests_completed_total", "user" => user_id.clone(), "status" => "failed", "completion_window" => completion_window.clone()).increment(1);
histogram!("fusillade_request_duration_seconds", "model" => model_clone.clone(), "status" => "failed")
.record(processing_start.elapsed().as_secs_f64());

Expand All @@ -1059,7 +1066,7 @@ where
Ok(RequestCompletionResult::Canceled(_canceled)) => {
tracing::Span::current().record("outcome", "canceled");
counter!("fusillade_requests_completed_total", "model" => model_clone.clone(), "status" => "cancelled").increment(1);
counter!("fusillade_user_requests_completed_total", "user" => user_id.clone(), "status" => "cancelled").increment(1);
counter!("fusillade_user_requests_completed_total", "user" => user_id.clone(), "status" => "cancelled", "completion_window" => completion_window.clone()).increment(1);
tracing::debug!(request_id = %request_id, "Request canceled by user");
}
Err(FusilladeError::Shutdown) => {
Expand Down
7 changes: 7 additions & 0 deletions src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,13 @@ pub trait Storage: Send + Sync {
/// This is a destructive operation that removes the batch and all request data.
async fn delete_batch(&self, batch_id: BatchId) -> Result<()>;

/// Soft-delete batches and files belonging to a creator, nullifying metadata.
///
/// Processes up to `batch_size` batches and `batch_size` files per call,
/// so a single invocation touches at most `2 * batch_size` rows.
/// Returns the total number of rows updated. Callers should loop until
/// the return value is 0 to ensure all records are processed.
///
/// # Errors
///
/// Returns an error if the underlying storage operation fails.
async fn bulk_delete_data(&self, creator_id: &str, batch_size: i64) -> Result<u64>;

/// Retry failed requests by resetting them to pending state.
///
/// This resets the specified failed requests to pending state with retry_attempt = 0,
Expand Down
74 changes: 74 additions & 0 deletions src/manager/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2855,6 +2855,80 @@ impl<P: PoolProvider, H: HttpClient + 'static> Storage for PostgresRequestManage
Ok(())
}

/// Soft-delete all batches and files created by `creator_id`, in two
/// bounded stages (at most `batch_size` rows each per call).
///
/// Returns the total number of rows updated across both stages; callers
/// loop until 0 to drain everything. The two stages run as separate
/// statements (no shared transaction), which is safe because each stage
/// is idempotent: already-deleted rows are filtered out by
/// `deleted_at IS NULL`.
async fn bulk_delete_data(&self, creator_id: &str, batch_size: i64) -> Result<u64> {
    // Stage 1: Soft-delete batches, cancel active ones, and nullify metadata.
    // FOR UPDATE SKIP LOCKED lets concurrent deletion workers (or other
    // writers holding row locks) proceed without blocking each other;
    // skipped rows are picked up by a later call of the drain loop.
    let batches_affected = sqlx::query_scalar!(
        r#"
        WITH to_delete AS (
            SELECT id FROM batches
            WHERE created_by = $1
              AND deleted_at IS NULL
            LIMIT $2
            FOR UPDATE SKIP LOCKED
        )
        UPDATE batches b
        SET deleted_at = NOW(),
            metadata = NULL,
            cancelling_at = CASE
                WHEN b.completed_at IS NULL AND b.failed_at IS NULL AND b.cancelled_at IS NULL
                THEN COALESCE(b.cancelling_at, NOW())
                ELSE b.cancelling_at
            END,
            cancelled_at = CASE
                WHEN b.completed_at IS NULL AND b.failed_at IS NULL AND b.cancelled_at IS NULL
                THEN COALESCE(b.cancelled_at, NOW())
                ELSE b.cancelled_at
            END
        FROM to_delete td
        WHERE b.id = td.id
        RETURNING b.id
        "#,
        creator_id,
        batch_size,
    )
    .fetch_all(self.pools.write())
    .await
    .map_err(|e| FusilladeError::Other(anyhow!("Failed to soft-delete user batches: {}", e)))?
    // RETURNING gives one row per updated batch; the row count is the
    // number of batches affected.
    .len() as u64;

    // Stage 2: Soft-delete files uploaded by this creator. Same bounded,
    // lock-skipping pattern as stage 1.
    let files_affected = sqlx::query_scalar!(
        r#"
        WITH to_delete AS (
            SELECT id FROM files
            WHERE uploaded_by = $1
              AND deleted_at IS NULL
            LIMIT $2
            FOR UPDATE SKIP LOCKED
        )
        UPDATE files f
        SET deleted_at = NOW(),
            status = 'deleted'
        FROM to_delete td
        WHERE f.id = td.id
        RETURNING f.id
        "#,
        creator_id,
        batch_size,
    )
    .fetch_all(self.pools.write())
    .await
    .map_err(|e| FusilladeError::Other(anyhow!("Failed to soft-delete user files: {}", e)))?
    .len() as u64;

    let total = batches_affected + files_affected;
    // Only log when something actually changed, to keep the drain loop's
    // final (zero-row) iteration quiet.
    if total > 0 {
        tracing::info!(
            creator_id = %creator_id,
            batches = batches_affected,
            files = files_affected,
            "Soft-deleted creator data"
        );
    }

    Ok(total)
}

async fn retry_failed_requests(&self, ids: Vec<RequestId>) -> Result<Vec<Result<()>>> {
tracing::debug!(count = ids.len(), "Retrying failed requests");

Expand Down
Loading