From 62096c459fa593623f9a8d4c3ecdbad5c149a8cf Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Fri, 10 Apr 2026 15:34:35 -0400 Subject: [PATCH] controller: clean up open alerts when a spec is deleted When a task with active alerts is deleted, the alert resolution actions were never applied, leaving orphaned rows in `alert_history` with `resolved_at = NULL`. The `alertHistory` query then returned these phantom alerts for tasks that no longer exist. * Add `cleanup_open_alerts()` that sets `resolved_at` and deletes the associated notification tasks without sending resolution emails * Call it in `Outcome::apply` before the hard delete when `live_spec_deleted` is true * Add a one-time migration to resolve existing orphaned alerts and their notification tasks Fixes #2846 --- ...81f78ea62aeba6d29d1e02ee8d3fbef96a1ff.json | 28 +++++ ...1002698c3615bcb0301917dca116f8cda0617.json | 22 ++++ crates/agent/src/controllers/executor.rs | 11 ++ .../src/integration_tests/shard_failures.rs | 111 ++++++++++++++++++ crates/control-plane-api/src/alerts.rs | 42 +++++++ ...20260410120000_resolve_orphaned_alerts.sql | 20 ++++ 6 files changed, 234 insertions(+) create mode 100644 .sqlx/query-36cf5f128777775f0a77bd1d1eb81f78ea62aeba6d29d1e02ee8d3fbef96a1ff.json create mode 100644 .sqlx/query-f09dd417c5a1dd6c8ab000d433a1002698c3615bcb0301917dca116f8cda0617.json create mode 100644 supabase/migrations/20260410120000_resolve_orphaned_alerts.sql diff --git a/.sqlx/query-36cf5f128777775f0a77bd1d1eb81f78ea62aeba6d29d1e02ee8d3fbef96a1ff.json b/.sqlx/query-36cf5f128777775f0a77bd1d1eb81f78ea62aeba6d29d1e02ee8d3fbef96a1ff.json new file mode 100644 index 00000000000..d8c98d65bcf --- /dev/null +++ b/.sqlx/query-36cf5f128777775f0a77bd1d1eb81f78ea62aeba6d29d1e02ee8d3fbef96a1ff.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n delete from internal.tasks t\n where t.task_id = any($1::flowid[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + { + "Custom": { + "name": "flowid[]", + "kind": { + "Array": { + "Custom": { + "name": "flowid", + "kind": { + "Domain": "Macaddr8" + } + } + } + } + } + } + ] + }, + "nullable": [] + }, + "hash": "36cf5f128777775f0a77bd1d1eb81f78ea62aeba6d29d1e02ee8d3fbef96a1ff" +} diff --git a/.sqlx/query-f09dd417c5a1dd6c8ab000d433a1002698c3615bcb0301917dca116f8cda0617.json b/.sqlx/query-f09dd417c5a1dd6c8ab000d433a1002698c3615bcb0301917dca116f8cda0617.json new file mode 100644 index 00000000000..ff2a4d65c78 --- /dev/null +++ b/.sqlx/query-f09dd417c5a1dd6c8ab000d433a1002698c3615bcb0301917dca116f8cda0617.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n update alert_history\n set resolved_at = now()\n where catalog_name = $1\n and resolved_at is null\n returning id as \"id: models::Id\"\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id: models::Id", + "type_info": "Macaddr8" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "f09dd417c5a1dd6c8ab000d433a1002698c3615bcb0301917dca116f8cda0617" +} diff --git a/crates/agent/src/controllers/executor.rs b/crates/agent/src/controllers/executor.rs index 9afcc4a3568..48443824439 100644 --- a/crates/agent/src/controllers/executor.rs +++ b/crates/agent/src/controllers/executor.rs @@ -57,6 +57,8 @@ pub struct State { #[derive(Debug)] pub struct Outcome { live_spec_id: models::Id, + /// The catalog name of the live spec, used for alert cleanup on deletion. + catalog_name: String, /// The next status of the controller. next_status: ControllerStatus, /// When to run the controller next. This will account for any backoff after errors. @@ -78,6 +80,7 @@ impl automations::Outcome for Outcome { async fn apply(self, txn: &mut sqlx::PgConnection) -> anyhow::Result { let Outcome { live_spec_id, + catalog_name, next_status: status, next_run, failures, @@ -87,6 +90,12 @@ impl automations::Outcome for Outcome { } = self; if live_spec_deleted && error.is_none() { + // Clean up any open alerts without sending resolution + // notifications, since the task is being deleted. + control_plane_api::alerts::cleanup_open_alerts(&catalog_name, txn) + .await + .context("resolving alerts for deleted spec")?; + // Do we need to delete the live spec? If `live_spec_id.is_zero()`, // it means that the `live_specs` row had _already_ been deleted // before this controller run began. That can happen due an edge @@ -162,6 +171,7 @@ impl Executor for LiveSpecControllerExe inbox.clear(); return Ok(Outcome { live_spec_id: models::Id::zero(), + catalog_name: String::new(), live_spec_deleted: true, failures: 0, next_run: None, @@ -198,6 +208,7 @@ impl Executor for LiveSpecControllerExe Ok(Outcome { live_spec_id: controller_state.live_spec_id, + catalog_name: controller_state.catalog_name.to_string(), next_status, failures, error, diff --git a/crates/agent/src/integration_tests/shard_failures.rs b/crates/agent/src/integration_tests/shard_failures.rs index 54a820a8e74..54cd3c9a8ab 100644 --- a/crates/agent/src/integration_tests/shard_failures.rs +++ b/crates/agent/src/integration_tests/shard_failures.rs @@ -822,6 +822,117 @@ async fn assert_status_shards_pending(harness: &mut TestHarness, task: &str) { .await; } +#[tokio::test] +async fn test_spec_deletion_cleans_up_alerts() { + let mut harness = TestHarness::init("test_spec_deletion_cleans_up_alerts").await; + let user_id = harness.setup_tenant("foxes").await; + + let draft = draft_catalog(serde_json::json!({ + "collections": { + "foxes/den": { + "schema": { + "type": "object", + "properties": { + "id": { "type": "string" } + } + }, + "key": ["/id"] + } + }, + "captures": { + "foxes/capture": { + "endpoint": { + "connector": { + "image": "source/test:test", + "config": {} + } + }, + "bindings": [ + { + "resource": { "table": "den" }, + "target": "foxes/den" + } + ] + } + } + })); + + let result = harness + .control_plane() + .publish( + Some("initial publication".to_string()), + Uuid::new_v4(), + draft, + Some("ops/dp/public/test".to_string()), + ) + .await + .expect("initial publish failed"); + assert!(result.status.is_success()); + + harness.run_pending_controllers(None).await; + harness.control_plane().reset_activations(); + + // Trigger enough shard failures to fire a ShardFailed alert (threshold is 3). + let state = harness.get_controller_state("foxes/capture").await; + let shard = shard_ref(state.last_build_id, "foxes/capture"); + for _ in 0..3 { + harness.fail_shard(&shard).await; + harness.run_pending_controller("foxes/capture").await; + } + + harness + .assert_alert_firing("foxes/capture", AlertType::ShardFailed) + .await; + + // Delete the capture. + let mut draft = tables::DraftCatalog::default(); + draft.delete("foxes/capture", CatalogType::Capture, None); + let del_result = harness + .user_publication(user_id, "delete capture", draft) + .await; + assert!(del_result.status.is_success()); + + harness.run_pending_controllers(None).await; + harness.assert_live_spec_hard_deleted("foxes/capture").await; + + // The GQL active alerts query should no longer include the shard_failed alert. + let active_after: serde_json::Value = harness + .execute_graphql_query( + user_id, + r#"query($by: AlertsBy!, $first: Int) { + alerts(by: $by, first: $first) { + edges { node { catalogName alertType resolvedAt } } + } + }"#, + &serde_json::json!({"by": {"prefix": "foxes/", "active": true}, "first": 10}), + ) + .await + .expect("graphql query failed"); + let edges = active_after["alerts"]["edges"].as_array().unwrap(); + assert!( + !edges + .iter() + .any(|e| e["node"]["catalogName"] == "foxes/capture" + && e["node"]["alertType"] == "shard_failed"), + "shard_failed alert for foxes/capture should not be active after deletion, got: {edges:?}" + ); + + // The notification tasks for foxes/capture alerts should have been deleted. + let orphaned_tasks = sqlx::query!( + r#"select t.task_id as "task_id: models::Id" + from internal.tasks t + join alert_history ah on ah.id = t.task_id + where ah.catalog_name = 'foxes/capture'"#, + ) + .fetch_all(&harness.pool) + .await + .unwrap(); + assert!( + orphaned_tasks.is_empty(), + "notification tasks should have been deleted on spec deletion" + ); +} + /// Simulates the passage of time after a series of shard failures. The /// `shard_status` must already be `Ok`, or this will panic. The timestamps of /// all failure events will have `by_duration` subtracted from them. Also pushes diff --git a/crates/control-plane-api/src/alerts.rs b/crates/control-plane-api/src/alerts.rs index e93ef53f868..54b2a99ba63 100644 --- a/crates/control-plane-api/src/alerts.rs +++ b/crates/control-plane-api/src/alerts.rs @@ -270,6 +270,48 @@ async fn resolve_alert( Ok(()) } +/// Cleans up all open alerts for a catalog name: sets `resolved_at` and deletes +/// the associated notification tasks. Unlike `resolve_alert`, this does not send +/// resolution notifications. Used when a spec is being deleted. +pub async fn cleanup_open_alerts( + catalog_name: &str, + txn: &mut sqlx::PgConnection, +) -> anyhow::Result<()> { + // Resolve open alerts and collect their IDs for notification task cleanup. + let resolved_ids: Vec = sqlx::query_scalar!( + r#" + update alert_history + set resolved_at = now() + where catalog_name = $1 + and resolved_at is null + returning id as "id: models::Id" + "#, + catalog_name as &str, + ) + .fetch_all(&mut *txn) + .await?; + + if resolved_ids.is_empty() { + return Ok(()); + } + + let count = resolved_ids.len(); + + // Delete the associated notification tasks so they don't sit suspended. + sqlx::query!( + r#" + delete from internal.tasks t + where t.task_id = any($1::flowid[]) + "#, + resolved_ids as Vec, + ) + .execute(&mut *txn) + .await?; + + tracing::info!(%catalog_name, %count, "silently resolved alerts for deleted spec"); + Ok(()) +} + pub async fn fetch_open_alerts_by_type( alert_types: &[AlertType], pool: &sqlx::PgPool, diff --git a/supabase/migrations/20260410120000_resolve_orphaned_alerts.sql b/supabase/migrations/20260410120000_resolve_orphaned_alerts.sql new file mode 100644 index 00000000000..ba982313c60 --- /dev/null +++ b/supabase/migrations/20260410120000_resolve_orphaned_alerts.sql @@ -0,0 +1,20 @@ +-- Resolve orphaned alerts for tasks that have already been deleted. +-- These are alerts in alert_history with resolved_at IS NULL where either: +-- (a) the live_specs row has been hard-deleted (no matching row), or +-- (b) the live_specs row is soft-deleted (spec IS NULL). +with resolved as ( + update public.alert_history ah + set resolved_at = now() + where ah.resolved_at is null + and not exists ( + select 1 from public.live_specs ls + where ls.catalog_name = ah.catalog_name + and ls.spec is not null + ) + returning ah.id +) +-- Clean up the orphaned notification tasks for these alerts. +-- alert_history.id is the task_id of the corresponding notification task. +delete from internal.tasks t +using resolved r +where t.task_id = r.id;