Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions crates/agent/src/controllers/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pub struct State {
#[derive(Debug)]
pub struct Outcome {
live_spec_id: models::Id,
/// The catalog name of the live spec, used for alert cleanup on deletion.
catalog_name: String,
/// The next status of the controller.
next_status: ControllerStatus,
/// When to run the controller next. This will account for any backoff after errors.
Expand All @@ -78,6 +80,7 @@ impl automations::Outcome for Outcome {
async fn apply(self, txn: &mut sqlx::PgConnection) -> anyhow::Result<Action> {
let Outcome {
live_spec_id,
catalog_name,
next_status: status,
next_run,
failures,
Expand All @@ -87,6 +90,12 @@ impl automations::Outcome for Outcome {
} = self;

if live_spec_deleted && error.is_none() {
// Clean up any open alerts without sending resolution
// notifications, since the task is being deleted.
control_plane_api::alerts::cleanup_open_alerts(&catalog_name, txn)
.await
.context("resolving alerts for deleted spec")?;

// Do we need to delete the live spec? If `live_spec_id.is_zero()`,
// it means that the `live_specs` row had _already_ been deleted
before this controller run began. That can happen due to an edge
Expand Down Expand Up @@ -162,6 +171,7 @@ impl<C: ControlPlane + Send + Sync + 'static> Executor for LiveSpecControllerExe
inbox.clear();
return Ok(Outcome {
live_spec_id: models::Id::zero(),
catalog_name: String::new(),
live_spec_deleted: true,
failures: 0,
next_run: None,
Expand Down Expand Up @@ -198,6 +208,7 @@ impl<C: ControlPlane + Send + Sync + 'static> Executor for LiveSpecControllerExe

Ok(Outcome {
live_spec_id: controller_state.live_spec_id,
catalog_name: controller_state.catalog_name.to_string(),
next_status,
failures,
error,
Expand Down
111 changes: 111 additions & 0 deletions crates/agent/src/integration_tests/shard_failures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,117 @@ async fn assert_status_shards_pending(harness: &mut TestHarness, task: &str) {
.await;
}

/// Verifies that hard-deleting a live spec also cleans up its open alerts:
/// after deletion, the `shard_failed` alert must no longer appear as active
/// in the GraphQL alerts query, and the alert's notification task rows in
/// `internal.tasks` must have been removed.
#[tokio::test]
async fn test_spec_deletion_cleans_up_alerts() {
    let mut harness = TestHarness::init("test_spec_deletion_cleans_up_alerts").await;
    let user_id = harness.setup_tenant("foxes").await;

    // Minimal catalog: one collection plus one capture bound to it.
    let draft = draft_catalog(serde_json::json!({
        "collections": {
            "foxes/den": {
                "schema": {
                    "type": "object",
                    "properties": {
                        "id": { "type": "string" }
                    }
                },
                "key": ["/id"]
            }
        },
        "captures": {
            "foxes/capture": {
                "endpoint": {
                    "connector": {
                        "image": "source/test:test",
                        "config": {}
                    }
                },
                "bindings": [
                    {
                        "resource": { "table": "den" },
                        "target": "foxes/den"
                    }
                ]
            }
        }
    }));

    // Publish the catalog so controllers have live specs to manage.
    let result = harness
        .control_plane()
        .publish(
            Some("initial publication".to_string()),
            Uuid::new_v4(),
            draft,
            Some("ops/dp/public/test".to_string()),
        )
        .await
        .expect("initial publish failed");
    assert!(result.status.is_success());

    harness.run_pending_controllers(None).await;
    harness.control_plane().reset_activations();

    // Trigger enough shard failures to fire a ShardFailed alert (threshold is 3).
    let state = harness.get_controller_state("foxes/capture").await;
    let shard = shard_ref(state.last_build_id, "foxes/capture");
    for _ in 0..3 {
        harness.fail_shard(&shard).await;
        harness.run_pending_controller("foxes/capture").await;
    }

    // Precondition: the alert must actually be open before we delete the spec.
    harness
        .assert_alert_firing("foxes/capture", AlertType::ShardFailed)
        .await;

    // Delete the capture.
    let mut draft = tables::DraftCatalog::default();
    draft.delete("foxes/capture", CatalogType::Capture, None);
    let del_result = harness
        .user_publication(user_id, "delete capture", draft)
        .await;
    assert!(del_result.status.is_success());

    // Run controllers so the deletion is fully processed (hard delete).
    harness.run_pending_controllers(None).await;
    harness.assert_live_spec_hard_deleted("foxes/capture").await;

    // The GQL active alerts query should no longer include the shard_failed alert.
    let active_after: serde_json::Value = harness
        .execute_graphql_query(
            user_id,
            r#"query($by: AlertsBy!, $first: Int) {
                alerts(by: $by, first: $first) {
                    edges { node { catalogName alertType resolvedAt } }
                }
            }"#,
            &serde_json::json!({"by": {"prefix": "foxes/", "active": true}, "first": 10}),
        )
        .await
        .expect("graphql query failed");
    let edges = active_after["alerts"]["edges"].as_array().unwrap();
    assert!(
        !edges
            .iter()
            .any(|e| e["node"]["catalogName"] == "foxes/capture"
                && e["node"]["alertType"] == "shard_failed"),
        "shard_failed alert for foxes/capture should not be active after deletion, got: {edges:?}"
    );

    // The notification tasks for foxes/capture alerts should have been deleted.
    // alert_history.id doubles as the notification task's task_id, so a join
    // finds any tasks that survived the cleanup.
    let orphaned_tasks = sqlx::query!(
        r#"select t.task_id as "task_id: models::Id"
        from internal.tasks t
        join alert_history ah on ah.id = t.task_id
        where ah.catalog_name = 'foxes/capture'"#,
    )
    .fetch_all(&harness.pool)
    .await
    .unwrap();
    assert!(
        orphaned_tasks.is_empty(),
        "notification tasks should have been deleted on spec deletion"
    );
}

/// Simulates the passage of time after a series of shard failures. The
/// `shard_status` must already be `Ok`, or this will panic. The timestamps of
/// all failure events will have `by_duration` subtracted from them. Also pushes
Expand Down
42 changes: 42 additions & 0 deletions crates/control-plane-api/src/alerts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,48 @@ async fn resolve_alert(
Ok(())
}

/// Silently resolves every open alert for `catalog_name`: stamps `resolved_at`
/// on each open `alert_history` row and deletes the associated notification
/// tasks. Unlike `resolve_alert`, no resolution notifications are sent — this
/// is used when the spec itself is being deleted.
pub async fn cleanup_open_alerts(
    catalog_name: &str,
    txn: &mut sqlx::PgConnection,
) -> anyhow::Result<()> {
    // Mark all open alerts resolved, capturing their ids so the matching
    // notification tasks can be removed below.
    let alert_ids: Vec<models::Id> = sqlx::query_scalar!(
        r#"
        update alert_history
        set resolved_at = now()
        where catalog_name = $1
        and resolved_at is null
        returning id as "id: models::Id"
        "#,
        catalog_name as &str,
    )
    .fetch_all(&mut *txn)
    .await?;

    // Nothing was open, so there are no tasks to clean up either.
    if alert_ids.is_empty() {
        return Ok(());
    }
    let resolved = alert_ids.len();

    // Each alert's notification task shares the alert's id as its task_id;
    // remove those tasks so they don't sit suspended forever.
    sqlx::query!(
        r#"
        delete from internal.tasks t
        where t.task_id = any($1::flowid[])
        "#,
        alert_ids as Vec<models::Id>,
    )
    .execute(&mut *txn)
    .await?;

    tracing::info!(%catalog_name, count = %resolved, "silently resolved alerts for deleted spec");
    Ok(())
}

pub async fn fetch_open_alerts_by_type(
alert_types: &[AlertType],
pool: &sqlx::PgPool,
Expand Down
20 changes: 20 additions & 0 deletions supabase/migrations/20260410120000_resolve_orphaned_alerts.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- Resolve orphaned alerts for tasks that have already been deleted.
-- These are alerts in alert_history with resolved_at IS NULL where either:
--   (a) the live_specs row has been hard-deleted (no matching row), or
--   (b) the live_specs row is soft-deleted (spec IS NULL).
-- Note: rows are resolved directly, without sending resolution notifications,
-- matching how controller-driven spec deletion cleans up open alerts.
with resolved as (
update public.alert_history ah
set resolved_at = now()
where ah.resolved_at is null
and not exists (
-- A spec is still live only if a live_specs row with a non-null spec exists;
-- NOT EXISTS therefore covers both the hard- and soft-deleted cases above.
select 1 from public.live_specs ls
where ls.catalog_name = ah.catalog_name
and ls.spec is not null
)
returning ah.id
)
-- Clean up the orphaned notification tasks for these alerts.
-- alert_history.id is the task_id of the corresponding notification task.
delete from internal.tasks t
using resolved r
where t.task_id = r.id;
Loading