From 2aa43ed15a5bc9dc29e98956eb8a7d05f666aa72 Mon Sep 17 00:00:00 2001 From: Shirly Radco Date: Thu, 12 Mar 2026 20:42:50 +0200 Subject: [PATCH] k8s: add orphan AlertRelabelConfig GC Detect and remove orphan AlertRelabelConfig resources that no longer have a matching PrometheusRule, preventing stale relabel configs from accumulating. Signed-off-by: Shirly Radco Co-authored-by: AI Assistant --- go.mod | 4 +- pkg/k8s/alert_relabel_config_gc.go | 52 ++++ pkg/k8s/alert_relabel_config_gc_test.go | 168 +++++++++++ pkg/k8s/relabeled_rules.go | 19 +- pkg/metrics/alerts_collector.go | 223 ++++++++++++++ pkg/metrics/alerts_collector_test.go | 354 +++++++++++++++++++++++ pkg/metrics/leader_election.go | 87 ++++++ test/e2e/alerts_effective_metric_test.go | 313 ++++++++++++++++++++ 8 files changed, 1212 insertions(+), 8 deletions(-) create mode 100644 pkg/k8s/alert_relabel_config_gc.go create mode 100644 pkg/k8s/alert_relabel_config_gc_test.go create mode 100644 pkg/metrics/alerts_collector.go create mode 100644 pkg/metrics/alerts_collector_test.go create mode 100644 pkg/metrics/leader_election.go create mode 100644 test/e2e/alerts_effective_metric_test.go diff --git a/go.mod b/go.mod index 1e2bae37b..05de94d9d 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,8 @@ require ( github.com/openshift/library-go v0.0.0-20240905123346-5bdbfe35a6f5 github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.87.0 github.com/prometheus-operator/prometheus-operator/pkg/client v0.87.0 + github.com/prometheus/client_golang v1.23.2 + github.com/prometheus/client_model v0.6.2 github.com/prometheus/common v0.67.4 github.com/prometheus/prometheus v0.308.0 github.com/sirupsen/logrus v1.9.3 @@ -57,8 +59,6 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/client_golang v1.23.2 // indirect - 
github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/procfs v0.16.1 // indirect github.com/spf13/pflag v1.0.6 // indirect github.com/x448/float16 v0.8.4 // indirect diff --git a/pkg/k8s/alert_relabel_config_gc.go b/pkg/k8s/alert_relabel_config_gc.go new file mode 100644 index 000000000..7c9e92a2e --- /dev/null +++ b/pkg/k8s/alert_relabel_config_gc.go @@ -0,0 +1,52 @@ +package k8s + +import ( + "context" + + "github.com/openshift/monitoring-plugin/pkg/managementlabels" +) + +// gcOrphanedARCs deletes AlertRelabelConfigs whose associated alert rule no +// longer exists. This handles the case where an operator (or manual action) +// removes rules from a PrometheusRule or deletes the CR entirely — the ARCs +// that were created by the plugin for classification/drop/stamp become orphans. +// +// Only ARCs carrying the plugin's alertRuleId annotation are considered. +// GitOps-managed ARCs are never deleted automatically; a warning is logged +// so that operators can clean them up manually. 
+func (rrm *relabeledRulesManager) gcOrphanedARCs(ctx context.Context, liveRuleIDs map[string]struct{}) {
+	// No ARC client configured: nothing can be listed or deleted.
+	client := rrm.alertRelabelConfigs
+	if client == nil {
+		return
+	}
+
+	// "" is passed as the namespace argument (presumably "all namespaces" —
+	// confirm against the ARC client implementation).
+	items, listErr := client.List(ctx, "")
+	if listErr != nil {
+		log.Errorf("orphan ARC GC: failed to list ARCs: %v", listErr)
+		return
+	}
+
+	for idx := range items {
+		candidate := &items[idx]
+
+		// A missing annotation reads back as "", so one check covers both
+		// "absent" and "empty": such ARCs are never touched.
+		id := candidate.Annotations[managementlabels.ARCAnnotationAlertRuleIDKey]
+		if id == "" {
+			continue
+		}
+
+		// The referenced rule is still present: keep the ARC.
+		if _, live := liveRuleIDs[id]; live {
+			continue
+		}
+
+		// Orphaned but GitOps-managed: only warn, never delete.
+		if IsManagedByGitOps(candidate.Annotations, candidate.Labels) {
+			log.Warnf("orphan ARC GC: ARC %s/%s (ruleId=%s) is orphaned but GitOps-managed — skipping deletion, manual cleanup required", candidate.Namespace, candidate.Name, id)
+			continue
+		}
+
+		// A failed delete is logged and the remaining ARCs are still processed.
+		delErr := client.Delete(ctx, candidate.Namespace, candidate.Name)
+		if delErr != nil {
+			log.Errorf("orphan ARC GC: failed to delete ARC %s/%s: %v", candidate.Namespace, candidate.Name, delErr)
+			continue
+		}
+
+		log.Infof("orphan ARC GC: deleted orphaned ARC %s/%s (ruleId=%s)", candidate.Namespace, candidate.Name, id)
+	}
+}
diff --git a/pkg/k8s/alert_relabel_config_gc_test.go b/pkg/k8s/alert_relabel_config_gc_test.go
new file mode 100644
index 000000000..e139ad965
--- /dev/null
+++ b/pkg/k8s/alert_relabel_config_gc_test.go
@@ -0,0 +1,168 @@
+package k8s
+
+import (
+	"context"
+	"testing"
+
+	osmv1 "github.com/openshift/api/monitoring/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	"github.com/openshift/monitoring-plugin/pkg/managementlabels"
+)
+
+type mockARCInterface struct {
+	arcs    map[string]*osmv1.AlertRelabelConfig
+	deleted []string
+}
+
+func (m *mockARCInterface) List(_ context.Context, _ string) ([]osmv1.AlertRelabelConfig, error) {
+	var result []osmv1.AlertRelabelConfig
+	for _, arc := range m.arcs {
+		result = append(result, *arc)
+	}
+	return result, nil
+}
+
+func (m *mockARCInterface) Get(_ context.Context, ns, name string) (*osmv1.AlertRelabelConfig, bool, error) {
+	if arc, ok := m.arcs[ns+"/"+name]; ok {
+		
return arc, true, nil + } + return nil, false, nil +} + +func (m *mockARCInterface) Create(_ context.Context, arc osmv1.AlertRelabelConfig) (*osmv1.AlertRelabelConfig, error) { + return &arc, nil +} + +func (m *mockARCInterface) Update(_ context.Context, _ osmv1.AlertRelabelConfig) error { return nil } + +func (m *mockARCInterface) Delete(_ context.Context, ns, name string) error { + m.deleted = append(m.deleted, ns+"/"+name) + delete(m.arcs, ns+"/"+name) + return nil +} + +func newARC(ns, name, ruleID string, annotations, labels map[string]string) *osmv1.AlertRelabelConfig { + if annotations == nil { + annotations = map[string]string{} + } + if ruleID != "" { + annotations[managementlabels.ARCAnnotationAlertRuleIDKey] = ruleID + } + return &osmv1.AlertRelabelConfig{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ns, + Annotations: annotations, + Labels: labels, + }, + } +} + +func TestGCOrphanedARCs_DeletesOrphan(t *testing.T) { + mock := &mockARCInterface{ + arcs: map[string]*osmv1.AlertRelabelConfig{ + "openshift-monitoring/arc-orphan": newARC("openshift-monitoring", "arc-orphan", "rule-gone", nil, nil), + }, + } + rrm := &relabeledRulesManager{alertRelabelConfigs: mock} + + rrm.gcOrphanedARCs(context.Background(), map[string]struct{}{}) + + if len(mock.deleted) != 1 || mock.deleted[0] != "openshift-monitoring/arc-orphan" { + t.Fatalf("expected orphan ARC to be deleted, got deleted=%v", mock.deleted) + } +} + +func TestGCOrphanedARCs_KeepsLiveRule(t *testing.T) { + mock := &mockARCInterface{ + arcs: map[string]*osmv1.AlertRelabelConfig{ + "openshift-monitoring/arc-live": newARC("openshift-monitoring", "arc-live", "rule-alive", nil, nil), + }, + } + rrm := &relabeledRulesManager{alertRelabelConfigs: mock} + + rrm.gcOrphanedARCs(context.Background(), map[string]struct{}{"rule-alive": {}}) + + if len(mock.deleted) != 0 { + t.Fatalf("expected no deletions, got deleted=%v", mock.deleted) + } +} + +func TestGCOrphanedARCs_SkipsGitOpsManaged(t *testing.T) 
{ + mock := &mockARCInterface{ + arcs: map[string]*osmv1.AlertRelabelConfig{ + "openshift-monitoring/arc-gitops": newARC("openshift-monitoring", "arc-gitops", "rule-gone", + map[string]string{"argocd.argoproj.io/tracking-id": "some-id"}, nil), + }, + } + rrm := &relabeledRulesManager{alertRelabelConfigs: mock} + + rrm.gcOrphanedARCs(context.Background(), map[string]struct{}{}) + + if len(mock.deleted) != 0 { + t.Fatalf("expected GitOps-managed ARC to be preserved, got deleted=%v", mock.deleted) + } +} + +func TestGCOrphanedARCs_SkipsARCWithoutAnnotation(t *testing.T) { + mock := &mockARCInterface{ + arcs: map[string]*osmv1.AlertRelabelConfig{ + "openshift-monitoring/arc-manual": newARC("openshift-monitoring", "arc-manual", "", nil, nil), + }, + } + rrm := &relabeledRulesManager{alertRelabelConfigs: mock} + + rrm.gcOrphanedARCs(context.Background(), map[string]struct{}{}) + + if len(mock.deleted) != 0 { + t.Fatalf("expected ARC without annotation to be preserved, got deleted=%v", mock.deleted) + } +} + +func TestGCOrphanedARCs_MixedScenario(t *testing.T) { + mock := &mockARCInterface{ + arcs: map[string]*osmv1.AlertRelabelConfig{ + "openshift-monitoring/arc-live": newARC("openshift-monitoring", "arc-live", "rule-1", nil, nil), + "openshift-monitoring/arc-orphan1": newARC("openshift-monitoring", "arc-orphan1", "rule-deleted-1", nil, nil), + "openshift-monitoring/arc-orphan2": newARC("openshift-monitoring", "arc-orphan2", "rule-deleted-2", nil, nil), + "openshift-monitoring/arc-gitops": newARC("openshift-monitoring", "arc-gitops", "rule-deleted-3", + map[string]string{"argocd.argoproj.io/tracking-id": "t"}, nil), + "openshift-monitoring/arc-manual": newARC("openshift-monitoring", "arc-manual", "", nil, nil), + }, + } + rrm := &relabeledRulesManager{alertRelabelConfigs: mock} + + liveIDs := map[string]struct{}{"rule-1": {}} + rrm.gcOrphanedARCs(context.Background(), liveIDs) + + deletedSet := map[string]bool{} + for _, d := range mock.deleted { + deletedSet[d] = true + 
} + + if len(mock.deleted) != 2 { + t.Fatalf("expected 2 deletions, got %d: %v", len(mock.deleted), mock.deleted) + } + if !deletedSet["openshift-monitoring/arc-orphan1"] { + t.Error("expected arc-orphan1 to be deleted") + } + if !deletedSet["openshift-monitoring/arc-orphan2"] { + t.Error("expected arc-orphan2 to be deleted") + } + if deletedSet["openshift-monitoring/arc-live"] { + t.Error("arc-live should not have been deleted") + } + if deletedSet["openshift-monitoring/arc-gitops"] { + t.Error("arc-gitops should not have been deleted (GitOps-managed)") + } + if deletedSet["openshift-monitoring/arc-manual"] { + t.Error("arc-manual should not have been deleted (no annotation)") + } +} + +func TestGCOrphanedARCs_NilInterface(t *testing.T) { + rrm := &relabeledRulesManager{alertRelabelConfigs: nil} + // Should not panic + rrm.gcOrphanedARCs(context.Background(), map[string]struct{}{}) +} diff --git a/pkg/k8s/relabeled_rules.go b/pkg/k8s/relabeled_rules.go index a853630ea..9ecec5325 100644 --- a/pkg/k8s/relabeled_rules.go +++ b/pkg/k8s/relabeled_rules.go @@ -148,7 +148,7 @@ func newRelabeledRulesManager(ctx context.Context, namespaceManager NamespaceInt return nil, fmt.Errorf("failed to sync RelabeledRulesConfig informer") } - if err := rrm.sync(ctx); err != nil { + if err := rrm.sync(ctx, "initial-sync"); err != nil { return nil, fmt.Errorf("initial relabeled rules sync failed: %w", err) } @@ -179,7 +179,7 @@ func (rrm *relabeledRulesManager) processNextWorkItem(ctx context.Context) bool defer rrm.queue.Done(key) - if err := rrm.sync(ctx); err != nil { + if err := rrm.sync(ctx, key); err != nil { log.Errorf("error syncing relabeled rules: %v", err) rrm.queue.AddRateLimited(key) return true @@ -190,7 +190,7 @@ func (rrm *relabeledRulesManager) processNextWorkItem(ctx context.Context) bool return true } -func (rrm *relabeledRulesManager) sync(ctx context.Context) error { +func (rrm *relabeledRulesManager) sync(ctx context.Context, key string) error { relabelConfigs, 
err := rrm.loadRelabelConfigs() if err != nil { return fmt.Errorf("failed to load relabel configs: %w", err) @@ -200,13 +200,20 @@ func (rrm *relabeledRulesManager) sync(ctx context.Context) error { rrm.relabelConfigs = relabelConfigs rrm.mu.Unlock() - alerts := rrm.collectAlerts(ctx, relabelConfigs) + alerts, allRuleIDs := rrm.collectAlerts(ctx, relabelConfigs) rrm.mu.Lock() rrm.relabeledRules = alerts rrm.mu.Unlock() log.Infof("Synced %d relabeled rules in memory", len(alerts)) + + // GC orphaned ARCs only when triggered by PrometheusRule events or + // initial sync — secret-only changes cannot create orphans. + if key == "prometheus-rule-sync" || key == "initial-sync" { + rrm.gcOrphanedARCs(ctx, allRuleIDs) + } + return nil } @@ -255,7 +262,7 @@ func (rrm *relabeledRulesManager) loadRelabelConfigs() ([]*relabel.Config, error return configs, nil } -func (rrm *relabeledRulesManager) collectAlerts(ctx context.Context, relabelConfigs []*relabel.Config) map[string]monitoringv1.Rule { +func (rrm *relabeledRulesManager) collectAlerts(ctx context.Context, relabelConfigs []*relabel.Config) (map[string]monitoringv1.Rule, map[string]struct{}) { alerts := make(map[string]monitoringv1.Rule) seenIDs := make(map[string]struct{}) @@ -330,7 +337,7 @@ func (rrm *relabeledRulesManager) collectAlerts(ctx context.Context, relabelConf } log.Debugf("Collected %d alerts", len(alerts)) - return alerts + return alerts, seenIDs } // alertingRuleOwner returns the name of the AlertingRule CR that generated diff --git a/pkg/metrics/alerts_collector.go b/pkg/metrics/alerts_collector.go new file mode 100644 index 000000000..63aa18e83 --- /dev/null +++ b/pkg/metrics/alerts_collector.go @@ -0,0 +1,223 @@ +package metrics + +import ( + "context" + "fmt" + "net/http" + "sort" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" + + "github.com/openshift/monitoring-plugin/pkg/k8s" + 
"k8s.io/client-go/rest"
+)
+
+// metricsLog is the logger used by this package; every entry carries the
+// field module=metrics.
+var metricsLog = logrus.WithField("module", "metrics")
+
+const (
+	// MetricName is the fully-qualified name of the gauge exposed by
+	// AlertsCollector.
+	MetricName = "alerts_effective_active_at_timestamp_seconds"
+	metricHelp = "The activeAt timestamp of effective (post-ARC) alerts. " +
+		"Value is the Unix timestamp when the alert became active."
+
+	// DefaultSyncInterval is the refresh period NewHandler passes to
+	// NewAlertsCollector.
+	DefaultSyncInterval = 30 * time.Second
+
+	// labelAlertState is the extra label carrying the alert's State; it is
+	// appended after the alert's own labels when a metric is built.
+	labelAlertState = "alertstate"
+)
+
+// AlertsFetcher retrieves enriched alerts for the metric. The management.Client
+// satisfies this interface — it applies ARC relabeling and computes
+// classification (AlertComponent / AlertLayer) on every alert.
+type AlertsFetcher interface {
+	GetAlerts(ctx context.Context, req k8s.GetAlertsRequest) ([]k8s.PrometheusAlert, error)
+}
+
+// alertMetric holds a single alert's pre-built metric data.
+// The prometheus.Desc is created once during sync, not on every scrape.
+type alertMetric struct {
+	desc        *prometheus.Desc
+	labelValues []string
+	activeAtSec float64
+}
+
+// AlertsCollector implements prometheus.Collector. It periodically fetches
+// alerts via the management client's GetAlerts (which applies ARC relabeling
+// and computes classification) and exposes them as the
+// alerts_effective_active_at_timestamp_seconds gauge.
+//
+// Only the leader pod (determined via Lease-based leader election) runs the
+// sync loop and exposes metrics. Follower pods return nothing on Collect,
+// ensuring each alert appears exactly once in Prometheus.
+//
+// Each alert produces one time series whose value is the alert's activeAt
+// Unix timestamp. Labels are the alert's enriched labels (post-ARC, source,
+// backend, component, layer) plus "alertstate". Thanos-sourced alerts are
+// filtered out to avoid duplicates. Annotations are excluded because they
+// are available from the alert rule definition.
+type AlertsCollector struct {
+	// fetcher supplies the alerts turned into metrics on every sync.
+	fetcher AlertsFetcher
+	// syncInterval is the period of the background sync ticker.
+	syncInterval time.Duration
+	// isLeader gates both sync and Collect; it is called from the sync
+	// goroutine and from scrapes.
+	isLeader func() bool
+
+	// mu guards metrics: sync replaces the slice, Collect reads it.
+	mu      sync.RWMutex
+	metrics []alertMetric
+
+	// sentinelDesc is the label-less descriptor sent by Describe.
+	sentinelDesc *prometheus.Desc
+}
+
+// NewHandler creates a metrics HTTP handler that exposes the alerts effective
+// metric. It sets up Lease-based leader election internally so that only one
+// replica produces metrics, then wires the collector, registry and promhttp
+// handler. Callers receive a ready-to-use http.Handler.
+//
+// An error is returned only when leader election cannot be set up. The
+// collector's background sync goroutine stops when ctx is cancelled.
+func NewHandler(ctx context.Context, fetcher AlertsFetcher, kubeConfig *rest.Config) (http.Handler, error) {
+	isLeader, err := startLeaderElection(ctx, kubeConfig, k8s.ClusterMonitoringNamespace)
+	if err != nil {
+		return nil, fmt.Errorf("start metrics leader election: %w", err)
+	}
+
+	collector := NewAlertsCollector(ctx, fetcher, DefaultSyncInterval, isLeader)
+	// A dedicated registry keeps this endpoint limited to the one metric.
+	registry := prometheus.NewRegistry()
+	registry.MustRegister(collector)
+	return promhttp.HandlerFor(registry, promhttp.HandlerOpts{}), nil
+}
+
+// NewAlertsCollector creates a collector that periodically syncs alerts and
+// exposes them as Prometheus metrics. The isLeader callback controls whether
+// this replica actively syncs and exposes metrics (follower pods return nothing).
+//
+// A non-positive syncInterval falls back to DefaultSyncInterval. The sync
+// goroutine is started here and runs until ctx is cancelled.
+func NewAlertsCollector(ctx context.Context, fetcher AlertsFetcher, syncInterval time.Duration, isLeader func() bool) *AlertsCollector {
+	// time.NewTicker panics on a non-positive duration, and that panic would
+	// be raised inside the background sync goroutine rather than here.
+	if syncInterval <= 0 {
+		syncInterval = DefaultSyncInterval
+	}
+
+	c := &AlertsCollector{
+		fetcher:      fetcher,
+		syncInterval: syncInterval,
+		isLeader:     isLeader,
+		sentinelDesc: prometheus.NewDesc(MetricName, metricHelp, nil, nil),
+	}
+	go c.syncLoop(ctx)
+	return c
+}
+
+// Describe sends a sentinel descriptor to satisfy the Collector contract.
+func (c *AlertsCollector) Describe(ch chan<- *prometheus.Desc) {
+	ch <- c.sentinelDesc
+}
+
+// Collect emits the current set of alert metrics using pre-built Descs.
+// Returns nothing if this replica is not the leader.
+func (c *AlertsCollector) Collect(ch chan<- prometheus.Metric) {
+	if !c.isLeader() {
+		return
+	}
+
+	// The read lock is held for the whole emission so a concurrent sync
+	// cannot swap the slice halfway through a scrape.
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	for i := range c.metrics {
+		m := &c.metrics[i]
+		metric, err := prometheus.NewConstMetric(m.desc, prometheus.GaugeValue, m.activeAtSec, m.labelValues...)
+		if err != nil {
+			// One bad series is logged and skipped; the rest are still emitted.
+			metricsLog.WithError(err).Warn("failed to create metric")
+			continue
+		}
+		ch <- metric
+	}
+}
+
+// syncLoop runs one sync immediately and then one per syncInterval tick until
+// ctx is cancelled.
+func (c *AlertsCollector) syncLoop(ctx context.Context) {
+	c.sync(ctx)
+
+	ticker := time.NewTicker(c.syncInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			c.sync(ctx)
+		}
+	}
+}
+
+// sync rebuilds the cached metric set from the fetcher's current alerts.
+//
+// On a non-leader replica the cache is cleared instead, so that regaining
+// leadership later cannot expose a snapshot taken during an earlier
+// leadership term. When the fetch fails the previous cache is kept.
+func (c *AlertsCollector) sync(ctx context.Context) {
+	if !c.isLeader() {
+		c.mu.Lock()
+		c.metrics = nil
+		c.mu.Unlock()
+		return
+	}
+
+	alerts, err := c.fetcher.GetAlerts(ctx, k8s.GetAlertsRequest{})
+	if err != nil {
+		metricsLog.WithError(err).Warn("failed to fetch alerts for effective metric")
+		return
+	}
+
+	built := make([]alertMetric, 0, len(alerts))
+	for i := range alerts {
+		alert := &alerts[i]
+
+		// Drop Thanos-sourced alerts: they duplicate what Alertmanager and
+		// Prometheus already provide and would inflate the metric cardinality.
+		if alert.Labels[k8s.AlertBackendLabel] == k8s.AlertBackendThanos {
+			continue
+		}
+
+		enrichClassificationLabels(alert)
+
+		// buildAlertMetric returns nil for alerts that must not be exported.
+		m := buildAlertMetric(alert)
+		if m != nil {
+			built = append(built, *m)
+		}
+	}
+
+	// Swap the whole slice under the write lock; Collect never sees a
+	// partially built set.
+	c.mu.Lock()
+	c.metrics = built
+	c.mu.Unlock()
+
+	metricsLog.Debugf("synced %d alerts for effective metric", len(built))
+}
+
+// enrichClassificationLabels copies the management-computed AlertComponent and
+// AlertLayer into the alert's Labels map so they appear on the metric. Labels
+// already set (e.g. via ARC) take precedence.
+func enrichClassificationLabels(alert *k8s.PrometheusAlert) {
+	// setIfAbsent adds key=value unless value is empty or key is already
+	// present. The map is allocated on demand because assigning into a nil
+	// map panics.
+	setIfAbsent := func(key, value string) {
+		if value == "" {
+			return
+		}
+		if _, exists := alert.Labels[key]; exists {
+			return
+		}
+		if alert.Labels == nil {
+			alert.Labels = make(map[string]string)
+		}
+		alert.Labels[key] = value
+	}
+
+	setIfAbsent(k8s.AlertRuleClassificationComponentKey, alert.AlertComponent)
+	setIfAbsent(k8s.AlertRuleClassificationLayerKey, alert.AlertLayer)
+}
+
+// buildAlertMetric converts a PrometheusAlert into an alertMetric with a
+// pre-built prometheus.Desc. Uses the alert's labels plus the alertstate label.
+//
+// It returns nil when the alert has a zero ActiveAt. Label names are sorted
+// and alertstate is appended last; its value is always alert.State.
+func buildAlertMetric(alert *k8s.PrometheusAlert) *alertMetric {
+	if alert.ActiveAt.IsZero() {
+		return nil
+	}
+
+	labelNames := make([]string, 0, len(alert.Labels)+1)
+	for k := range alert.Labels {
+		// alertstate is appended below from alert.State. Keeping a same-named
+		// alert label as well would give the Desc a duplicate label name and
+		// the metric would be rejected on every scrape.
+		if k == labelAlertState {
+			continue
+		}
+		labelNames = append(labelNames, k)
+	}
+	sort.Strings(labelNames)
+	labelNames = append(labelNames, labelAlertState)
+
+	labelValues := make([]string, 0, len(labelNames))
+	for _, name := range labelNames {
+		if name == labelAlertState {
+			labelValues = append(labelValues, alert.State)
+		} else {
+			labelValues = append(labelValues, alert.Labels[name])
+		}
+	}
+
+	return &alertMetric{
+		desc:        prometheus.NewDesc(MetricName, metricHelp, labelNames, nil),
+		labelValues: labelValues,
+		activeAtSec: float64(alert.ActiveAt.Unix()),
+	}
+}
diff --git a/pkg/metrics/alerts_collector_test.go b/pkg/metrics/alerts_collector_test.go
new file mode 100644
index 000000000..6cc30efbe
--- /dev/null
+++ b/pkg/metrics/alerts_collector_test.go
@@ -0,0 +1,354 @@
+package metrics_test
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	dto "github.com/prometheus/client_model/go"
+
+	"github.com/openshift/monitoring-plugin/pkg/k8s"
+	"github.com/openshift/monitoring-plugin/pkg/metrics"
+)
+
+type mockAlertsFetcher struct {
+	alerts []k8s.PrometheusAlert
+	err    error
+}
+
+func (m *mockAlertsFetcher) GetAlerts(_ 
context.Context, _ k8s.GetAlertsRequest) ([]k8s.PrometheusAlert, error) { + return m.alerts, m.err +} + +func collectMetrics(t *testing.T, collector prometheus.Collector) []*dto.MetricFamily { + t.Helper() + reg := prometheus.NewRegistry() + reg.MustRegister(collector) + families, err := reg.Gather() + if err != nil { + t.Fatalf("gather metrics: %v", err) + } + return families +} + +func findFamily(families []*dto.MetricFamily, name string) *dto.MetricFamily { + for _, f := range families { + if f.GetName() == name { + return f + } + } + return nil +} + +func labelValue(m *dto.Metric, name string) string { + for _, lp := range m.GetLabel() { + if lp.GetName() == name { + return lp.GetValue() + } + } + return "" +} + +func newCollector(t *testing.T, mock *mockAlertsFetcher) (prometheus.Collector, context.CancelFunc) { + t.Helper() + ctx, cancel := context.WithCancel(context.Background()) + collector := metrics.NewAlertsCollector(ctx, mock, 1*time.Hour, func() bool { return true }) + time.Sleep(100 * time.Millisecond) + t.Cleanup(cancel) + return collector, cancel +} + +func TestAlertsCollector_FiringAndSilenced(t *testing.T) { + activeAt := time.Date(2026, 3, 5, 10, 0, 0, 0, time.UTC) + mock := &mockAlertsFetcher{ + alerts: []k8s.PrometheusAlert{ + { + Labels: map[string]string{"alertname": "HighCPU", "severity": "critical", "namespace": "production"}, + State: "firing", + ActiveAt: activeAt, + }, + { + Labels: map[string]string{"alertname": "DiskFull", "severity": "warning", "namespace": "storage"}, + State: "silenced", + ActiveAt: activeAt.Add(-1 * time.Hour), + }, + }, + } + collector, _ := newCollector(t, mock) + families := collectMetrics(t, collector) + family := findFamily(families, metrics.MetricName) + if family == nil { + t.Fatal("expected metric family, got nil") + } + if len(family.GetMetric()) != 2 { + t.Fatalf("expected 2 metrics, got %d", len(family.GetMetric())) + } + + var firing, silenced *dto.Metric + for _, m := range family.GetMetric() { + 
switch labelValue(m, "alertname") { + case "HighCPU": + firing = m + case "DiskFull": + silenced = m + } + } + + if firing == nil { + t.Fatal("expected HighCPU metric") + } + if labelValue(firing, "alertstate") != "firing" { + t.Errorf("alertstate: want firing, got %q", labelValue(firing, "alertstate")) + } + if labelValue(firing, "severity") != "critical" { + t.Errorf("severity: want critical, got %q", labelValue(firing, "severity")) + } + if labelValue(firing, "namespace") != "production" { + t.Errorf("namespace: want production, got %q", labelValue(firing, "namespace")) + } + if firing.GetGauge().GetValue() != float64(activeAt.Unix()) { + t.Errorf("gauge value: want %v, got %v", float64(activeAt.Unix()), firing.GetGauge().GetValue()) + } + + if silenced == nil { + t.Fatal("expected DiskFull metric") + } + if labelValue(silenced, "alertstate") != "silenced" { + t.Errorf("alertstate: want silenced, got %q", labelValue(silenced, "alertstate")) + } + if silenced.GetGauge().GetValue() != float64(activeAt.Add(-1*time.Hour).Unix()) { + t.Errorf("silenced gauge value mismatch") + } +} + +func TestAlertsCollector_NoAnnotationLabels(t *testing.T) { + mock := &mockAlertsFetcher{ + alerts: []k8s.PrometheusAlert{ + {Labels: map[string]string{"alertname": "TestAlert"}, State: "firing", ActiveAt: time.Now()}, + }, + } + collector, _ := newCollector(t, mock) + families := collectMetrics(t, collector) + family := findFamily(families, metrics.MetricName) + if family == nil { + t.Fatal("expected metric family") + } + if len(family.GetMetric()) != 1 { + t.Fatalf("expected 1 metric, got %d", len(family.GetMetric())) + } + for _, lp := range family.GetMetric()[0].GetLabel() { + switch lp.GetName() { + case "summary", "description", "runbook_url": + t.Errorf("unexpected annotation label: %s", lp.GetName()) + } + } +} + +func TestAlertsCollector_SkipsZeroActiveAt(t *testing.T) { + mock := &mockAlertsFetcher{ + alerts: []k8s.PrometheusAlert{ + {Labels: map[string]string{"alertname": 
"NoActiveAt"}, State: "firing", ActiveAt: time.Time{}}, + }, + } + collector, _ := newCollector(t, mock) + families := collectMetrics(t, collector) + family := findFamily(families, metrics.MetricName) + if family != nil && len(family.GetMetric()) != 0 { + t.Errorf("expected no metrics for zero ActiveAt, got %d", len(family.GetMetric())) + } +} + +func TestAlertsCollector_EmptyAlerts(t *testing.T) { + mock := &mockAlertsFetcher{alerts: []k8s.PrometheusAlert{}} + collector, _ := newCollector(t, mock) + families := collectMetrics(t, collector) + family := findFamily(families, metrics.MetricName) + if family != nil && len(family.GetMetric()) != 0 { + t.Errorf("expected no metrics for empty alerts, got %d", len(family.GetMetric())) + } +} + +func TestAlertsCollector_FetcherErrorProducesNoMetrics(t *testing.T) { + mock := &mockAlertsFetcher{err: errors.New("connection refused")} + collector, _ := newCollector(t, mock) + families := collectMetrics(t, collector) + family := findFamily(families, metrics.MetricName) + if family != nil && len(family.GetMetric()) != 0 { + t.Errorf("expected no metrics on initial failure, got %d", len(family.GetMetric())) + } +} + +func TestAlertsCollector_ClassificationLabels(t *testing.T) { + mock := &mockAlertsFetcher{ + alerts: []k8s.PrometheusAlert{ + { + Labels: map[string]string{ + "alertname": "KubePodCrashLooping", + "severity": "warning", + "namespace": "kube-system", + k8s.AlertRuleLabelId: "abc123", + k8s.AlertRuleClassificationComponentKey: "kube-controller-manager", + k8s.AlertRuleClassificationLayerKey: "cluster", + }, + State: "firing", + ActiveAt: time.Now(), + }, + }, + } + collector, _ := newCollector(t, mock) + families := collectMetrics(t, collector) + family := findFamily(families, metrics.MetricName) + if family == nil || len(family.GetMetric()) != 1 { + t.Fatalf("expected 1 metric, got family=%v", family) + } + m := family.GetMetric()[0] + checks := map[string]string{ + k8s.AlertRuleLabelId: "abc123", + 
k8s.AlertRuleClassificationComponentKey: "kube-controller-manager", + k8s.AlertRuleClassificationLayerKey: "cluster", + "alertstate": "firing", + } + for k, want := range checks { + if got := labelValue(m, k); got != want { + t.Errorf("label[%s]: want %q, got %q", k, want, got) + } + } +} + +func TestAlertsCollector_IncludesPendingAlerts(t *testing.T) { + mock := &mockAlertsFetcher{ + alerts: []k8s.PrometheusAlert{ + {Labels: map[string]string{"alertname": "Firing"}, State: "firing", ActiveAt: time.Now()}, + {Labels: map[string]string{"alertname": "Silenced"}, State: "silenced", ActiveAt: time.Now()}, + {Labels: map[string]string{"alertname": "Pending"}, State: "pending", ActiveAt: time.Now()}, + }, + } + collector, _ := newCollector(t, mock) + families := collectMetrics(t, collector) + family := findFamily(families, metrics.MetricName) + if family == nil || len(family.GetMetric()) != 3 { + t.Fatalf("expected 3 metrics, got %v", family) + } + states := map[string]bool{} + for _, m := range family.GetMetric() { + states[labelValue(m, "alertstate")] = true + } + for _, s := range []string{"firing", "silenced", "pending"} { + if !states[s] { + t.Errorf("expected state %q in metrics", s) + } + } +} + +func TestAlertsCollector_SourceAndBackendLabels(t *testing.T) { + mock := &mockAlertsFetcher{ + alerts: []k8s.PrometheusAlert{ + { + Labels: map[string]string{ + "alertname": "HighCPU", + "severity": "critical", + k8s.AlertSourceLabel: k8s.AlertSourcePlatform, + k8s.AlertBackendLabel: k8s.AlertBackendAM, + }, + State: "firing", + ActiveAt: time.Now(), + }, + }, + } + collector, _ := newCollector(t, mock) + families := collectMetrics(t, collector) + family := findFamily(families, metrics.MetricName) + if family == nil || len(family.GetMetric()) != 1 { + t.Fatalf("expected 1 metric") + } + m := family.GetMetric()[0] + if got := labelValue(m, k8s.AlertSourceLabel); got != k8s.AlertSourcePlatform { + t.Errorf("source: want %q, got %q", k8s.AlertSourcePlatform, got) + } + if 
got := labelValue(m, k8s.AlertBackendLabel); got != k8s.AlertBackendAM { + t.Errorf("backend: want %q, got %q", k8s.AlertBackendAM, got) + } +} + +func TestAlertsCollector_FiltersThanosBackend(t *testing.T) { + now := time.Now() + mock := &mockAlertsFetcher{ + alerts: []k8s.PrometheusAlert{ + {Labels: map[string]string{"alertname": "HighCPU", k8s.AlertBackendLabel: k8s.AlertBackendAM, k8s.AlertSourceLabel: k8s.AlertSourcePlatform}, State: "firing", ActiveAt: now}, + {Labels: map[string]string{"alertname": "HighCPU", k8s.AlertBackendLabel: k8s.AlertBackendThanos, k8s.AlertSourceLabel: k8s.AlertSourceUser}, State: "firing", ActiveAt: now}, + {Labels: map[string]string{"alertname": "PendingAlert", k8s.AlertBackendLabel: k8s.AlertBackendProm, k8s.AlertSourceLabel: k8s.AlertSourcePlatform}, State: "pending", ActiveAt: now}, + }, + } + collector, _ := newCollector(t, mock) + families := collectMetrics(t, collector) + family := findFamily(families, metrics.MetricName) + if family == nil || len(family.GetMetric()) != 2 { + t.Fatalf("expected 2 metrics (thanos filtered), got %v", family) + } + for _, m := range family.GetMetric() { + if labelValue(m, k8s.AlertBackendLabel) == k8s.AlertBackendThanos { + t.Error("thanos duplicate should be filtered out") + } + } +} + +func TestAlertsCollector_InjectsClassificationFromFields(t *testing.T) { + mock := &mockAlertsFetcher{ + alerts: []k8s.PrometheusAlert{ + { + Labels: map[string]string{"alertname": "TestAlert", k8s.AlertBackendLabel: k8s.AlertBackendAM}, + State: "firing", + ActiveAt: time.Now(), + AlertComponent: "networking", + AlertLayer: "cluster", + }, + }, + } + collector, _ := newCollector(t, mock) + families := collectMetrics(t, collector) + family := findFamily(families, metrics.MetricName) + if family == nil || len(family.GetMetric()) != 1 { + t.Fatalf("expected 1 metric") + } + m := family.GetMetric()[0] + if got := labelValue(m, k8s.AlertRuleClassificationComponentKey); got != "networking" { + t.Errorf("component: 
want networking, got %q", got) + } + if got := labelValue(m, k8s.AlertRuleClassificationLayerKey); got != "cluster" { + t.Errorf("layer: want cluster, got %q", got) + } +} + +func TestAlertsCollector_DoesNotOverwriteARCLabels(t *testing.T) { + mock := &mockAlertsFetcher{ + alerts: []k8s.PrometheusAlert{ + { + Labels: map[string]string{ + "alertname": "TestAlert", + k8s.AlertBackendLabel: k8s.AlertBackendAM, + k8s.AlertRuleClassificationComponentKey: "arc-component", + k8s.AlertRuleClassificationLayerKey: "namespace", + }, + State: "firing", + ActiveAt: time.Now(), + AlertComponent: "default-component", + AlertLayer: "cluster", + }, + }, + } + collector, _ := newCollector(t, mock) + families := collectMetrics(t, collector) + family := findFamily(families, metrics.MetricName) + if family == nil || len(family.GetMetric()) != 1 { + t.Fatalf("expected 1 metric") + } + m := family.GetMetric()[0] + if got := labelValue(m, k8s.AlertRuleClassificationComponentKey); got != "arc-component" { + t.Errorf("component: want arc-component, got %q", got) + } + if got := labelValue(m, k8s.AlertRuleClassificationLayerKey); got != "namespace" { + t.Errorf("layer: want namespace, got %q", got) + } +} diff --git a/pkg/metrics/leader_election.go b/pkg/metrics/leader_election.go new file mode 100644 index 000000000..2a71f9231 --- /dev/null +++ b/pkg/metrics/leader_election.go @@ -0,0 +1,87 @@ +package metrics + +import ( + "context" + "fmt" + "os" + "sync" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" +) + +const ( + leaseName = "monitoring-plugin-metrics" + leaseDuration = 15 * time.Second + leaseRenew = 10 * time.Second + leaseRetry = 2 * time.Second +) + +// startLeaderElection sets up Lease-based leader election for the alerts +// effective metric. 
Returns a thread-safe isLeader callback.
+//
+// The election is driven from a background goroutine that keeps campaigning
+// until ctx is cancelled: whenever the lease is lost the elector is started
+// again, so this replica can regain leadership later.
+//
+// An error is returned when the coordination client cannot be created, the
+// host name cannot be read, or the elector configuration is rejected.
+func startLeaderElection(ctx context.Context, kubeConfig *rest.Config, namespace string) (func() bool, error) {
+	coordClient, err := coordinationv1client.NewForConfig(kubeConfig)
+	if err != nil {
+		return nil, fmt.Errorf("create coordination client: %w", err)
+	}
+
+	// The host name is this candidate's identity in the Lease.
+	identity, err := os.Hostname()
+	if err != nil {
+		return nil, fmt.Errorf("get hostname: %w", err)
+	}
+
+	lock := &resourcelock.LeaseLock{
+		LeaseMeta: metav1.ObjectMeta{
+			Name:      leaseName,
+			Namespace: namespace,
+		},
+		Client: coordClient,
+		LockConfig: resourcelock.ResourceLockConfig{
+			Identity: identity,
+		},
+	}
+
+	// isLeading is written by the election callbacks and read by isLeader;
+	// mu guards both sides.
+	var mu sync.Mutex
+	isLeading := false
+
+	isLeader := func() bool {
+		mu.Lock()
+		defer mu.Unlock()
+		return isLeading
+	}
+
+	le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
+		Lock:            lock,
+		LeaseDuration:   leaseDuration,
+		RenewDeadline:   leaseRenew,
+		RetryPeriod:     leaseRetry,
+		ReleaseOnCancel: true,
+		Callbacks: leaderelection.LeaderCallbacks{
+			OnStartedLeading: func(_ context.Context) {
+				mu.Lock()
+				isLeading = true
+				mu.Unlock()
+				metricsLog.Info("became leader for alert management metrics")
+			},
+			OnStoppedLeading: func() {
+				mu.Lock()
+				isLeading = false
+				mu.Unlock()
+				metricsLog.Info("lost leadership for alert management metrics")
+			},
+			OnNewLeader: func(identity string) {
+				metricsLog.Infof("new leader for alert management metrics: %s", identity)
+			},
+		},
+	})
+	if err != nil {
+		return nil, fmt.Errorf("create leader elector: %w", err)
+	}
+
+	// LeaderElector.Run returns as soon as the lease is lost. Running it only
+	// once would permanently remove this replica from the election after a
+	// single failed renewal, so restart it until ctx is cancelled.
+	go func() {
+		for ctx.Err() == nil {
+			le.Run(ctx)
+		}
+	}()
+	return isLeader, nil
+}
diff --git a/test/e2e/alerts_effective_metric_test.go b/test/e2e/alerts_effective_metric_test.go
new file mode 100644
index 000000000..cd59897f7
--- /dev/null
+++ b/test/e2e/alerts_effective_metric_test.go
@@ -0,0 +1,313 @@
+package e2e
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"strings"
+	"testing"
+	"time"
+
+	"k8s.io/apimachinery/pkg/util/wait"
+
+	"github.com/openshift/monitoring-plugin/pkg/k8s"
+	"github.com/openshift/monitoring-plugin/pkg/metrics"
+	"github.com/openshift/monitoring-plugin/test/e2e/framework"
+)
+
+// fetchMetrics GETs <PluginURL>/metrics with the framework's HTTP client and
+// returns the response body as a string. Any status other than 200 OK is
+// reported as an error.
+func fetchMetrics(f *framework.Framework) (string, error) {
+	resp, err := f.HTTPClient().Get(f.PluginURL + "/metrics")
+	if err != nil {
+		return "", err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+	}
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return "", err
+	}
+	return string(body), nil
+}
+
+// parseMetricLines returns the lines of body that are labelled samples of
+// metrics.MetricName, i.e. the lines starting with the metric name followed
+// by "{". HELP/TYPE comment lines and other metrics are dropped.
+func parseMetricLines(body string) []string {
+	var lines []string
+	for _, line := range strings.Split(body, "\n") {
+		if strings.HasPrefix(line, metrics.MetricName+"{") {
+			lines = append(lines, line)
+		}
+	}
+	return lines
+}
+
+// extractLabel returns the value of labelName in a text-format sample line,
+// or "" when the label is absent or the label set is malformed.
+//
+// The label set is tokenized left to right so that a name only matches as a
+// whole: asking for "summary" must not match "alert_summary". Quoted values
+// are read up to their closing unescaped quote, so values containing \" are
+// returned complete and unescaped.
+func extractLabel(metricLine, labelName string) string {
+	rest := metricLine
+	// Skip the metric name when the label set is introduced by "{". A bare
+	// `name="value",...` list (no brace before the first pair) is accepted too.
+	open := strings.IndexByte(rest, '{')
+	if first := strings.Index(rest, `="`); open >= 0 && (first < 0 || open < first) {
+		rest = rest[open+1:]
+	}
+
+	for {
+		rest = strings.TrimLeft(rest, ", ")
+		if rest == "" || rest[0] == '}' {
+			return ""
+		}
+		eq := strings.Index(rest, `="`)
+		if eq < 0 {
+			return ""
+		}
+		name := strings.TrimSpace(rest[:eq])
+		value, remainder, ok := readQuotedValue(rest[eq+2:])
+		if !ok {
+			return ""
+		}
+		if name == labelName {
+			return value
+		}
+		rest = remainder
+	}
+}
+
+// readQuotedValue reads a label value whose opening quote has already been
+// consumed. It returns the unescaped value, the input following the closing
+// quote, and false when the closing quote is missing.
+func readQuotedValue(s string) (string, string, bool) {
+	var value strings.Builder
+	for i := 0; i < len(s); i++ {
+		switch c := s[i]; {
+		case c == '"':
+			return value.String(), s[i+1:], true
+		case c == '\\' && i+1 < len(s):
+			// Escape sequence: \n is a newline, anything else (\" or \\)
+			// stands for the escaped byte itself.
+			i++
+			if s[i] == 'n' {
+				value.WriteByte('\n')
+			} else {
+				value.WriteByte(s[i])
+			}
+		default:
+			value.WriteByte(c)
+		}
+	}
+	return "", "", false
+}
+
+// TestMetricEndpointExposesEffectiveMetric
+// Verifies that the /metrics endpoint exposes alerts_effective_active_at_timestamp_seconds.
+func TestMetricEndpointExposesEffectiveMetric(t *testing.T) {
+	f, err := framework.New()
+	if err != nil {
+		t.Fatalf("Failed to create framework: %v", err)
+	}
+
+	// Poll every 5s for up to 2m until the scrape output mentions the metric,
+	// keeping the first body that does.
+	var exposition string
+	pollErr := wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 2*time.Minute, true,
+		func(context.Context) (bool, error) {
+			body, fetchErr := fetchMetrics(f)
+			switch {
+			case fetchErr != nil:
+				t.Logf("Failed to fetch metrics: %v", fetchErr)
+				return false, nil
+			case !strings.Contains(body, metrics.MetricName):
+				t.Logf("Metric %s not found yet (leader election may be in progress)", metrics.MetricName)
+				return false, nil
+			}
+			exposition = body
+			return true, nil
+		})
+	if pollErr != nil {
+		t.Fatalf("Timeout waiting for metric to appear: %v", pollErr)
+	}
+
+	// The metadata lines must be present and declare the metric as a gauge.
+	headers := []struct{ want, failure string }{
+		{"# HELP " + metrics.MetricName, "Missing HELP line for metric"},
+		{"# TYPE " + metrics.MetricName + " gauge", "Missing or incorrect TYPE line for metric (expected gauge)"},
+	}
+	for _, hdr := range headers {
+		if !strings.Contains(exposition, hdr.want) {
+			t.Error(hdr.failure)
+		}
+	}
+
+	series := parseMetricLines(exposition)
+	if len(series) == 0 {
+		t.Fatal("Expected at least one metric series, got none")
+	}
+
+	t.Logf("Found %d metric series for %s", len(series), metrics.MetricName)
+}
+
+// TestMetricSeriesHaveRequiredLabels
+// Verifies every metric series has alertname, alertstate, openshift_io_alert_source,
+// openshift_io_alert_backend, and a valid timestamp value.
+func TestMetricSeriesHaveRequiredLabels(t *testing.T) {
+	f, err := framework.New()
+	if err != nil {
+		t.Fatalf("Failed to create framework: %v", err)
+	}
+
+	// Poll every 5s for up to 2m until at least one series is exposed.
+	var series []string
+	pollErr := wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 2*time.Minute, true,
+		func(context.Context) (bool, error) {
+			body, fetchErr := fetchMetrics(f)
+			if fetchErr != nil {
+				t.Logf("Failed to fetch metrics: %v", fetchErr)
+				return false, nil
+			}
+			series = parseMetricLines(body)
+			return len(series) > 0, nil
+		})
+	if pollErr != nil {
+		t.Fatalf("Timeout waiting for metric series: %v", pollErr)
+	}
+
+	for idx, sample := range series {
+		// Every series must carry a non-empty value for each of these labels.
+		for _, name := range []string{"alertname", "alertstate", k8s.AlertSourceLabel, k8s.AlertBackendLabel} {
+			if extractLabel(sample, name) == "" {
+				t.Errorf("Series %d missing required label %q: %s", idx, name, sample)
+			}
+		}
+
+		switch state := extractLabel(sample, "alertstate"); state {
+		case "firing", "pending", "silenced", "suppressed":
+			// accepted state
+		default:
+			t.Errorf("Series %d has unexpected alertstate=%q: %s", idx, state, sample)
+		}
+
+		// The sample value is whatever follows the last space on the line.
+		sep := strings.LastIndex(sample, " ")
+		if sep < 0 {
+			t.Errorf("Series %d has no value: %s", idx, sample)
+			continue
+		}
+		raw := sample[sep+1:]
+
+		var ts float64
+		if _, scanErr := fmt.Sscanf(raw, "%g", &ts); scanErr != nil {
+			t.Errorf("Series %d has unparseable value %q: %v", idx, raw, scanErr)
+			continue
+		}
+		// 9.46e+08 seconds is roughly the start of the year 2000; anything
+		// below it cannot be a plausible Unix timestamp.
+		if ts < 9.46e+08 {
+			t.Errorf("Series %d has suspiciously low timestamp value: %g (before year 2000)", idx, ts)
+		}
+	}
+
+	t.Logf("All %d series have required labels and valid values", len(series))
+}
+
+// TestMetricIncludesClassificationLabels
+// Verifies that all metric series have classification labels
+// (openshift_io_alert_rule_component and openshift_io_alert_rule_layer).
+func TestMetricIncludesClassificationLabels(t *testing.T) {
+	f, err := framework.New()
+	if err != nil {
+		t.Fatalf("Failed to create framework: %v", err)
+	}
+
+	ctx := context.Background()
+	var lines []string
+
+	// Poll every 5s for up to 2m until at least one series is exposed.
+	err = wait.PollUntilContextTimeout(ctx, 5*time.Second, 2*time.Minute, true, func(ctx context.Context) (bool, error) {
+		body, err := fetchMetrics(f)
+		if err != nil {
+			// Keep polling, but leave a trace so a timeout can be diagnosed.
+			t.Logf("Failed to fetch metrics: %v", err)
+			return false, nil
+		}
+		lines = parseMetricLines(body)
+		return len(lines) > 0, nil
+	})
+	if err != nil {
+		t.Fatalf("Timeout waiting for metric series: %v", err)
+	}
+
+	// Both classification labels must be present and non-empty on every series.
+	for i, line := range lines {
+		if extractLabel(line, k8s.AlertRuleClassificationComponentKey) == "" {
+			t.Errorf("Series %d missing %s label: %s", i, k8s.AlertRuleClassificationComponentKey, line)
+		}
+		if extractLabel(line, k8s.AlertRuleClassificationLayerKey) == "" {
+			t.Errorf("Series %d missing %s label: %s", i, k8s.AlertRuleClassificationLayerKey, line)
+		}
+	}
+
+	t.Logf("All %d series have classification labels (component + layer)", len(lines))
+}
+
+// TestMetricExcludesAnnotations
+// Verifies that annotations (summary, description, runbook_url) are not
+// included as metric labels.
+func TestMetricExcludesAnnotations(t *testing.T) {
+	f, err := framework.New()
+	if err != nil {
+		t.Fatalf("Failed to create framework: %v", err)
+	}
+
+	ctx := context.Background()
+	var lines []string
+
+	// Poll every 5s for up to 2m until at least one series is exposed.
+	err = wait.PollUntilContextTimeout(ctx, 5*time.Second, 2*time.Minute, true, func(ctx context.Context) (bool, error) {
+		body, err := fetchMetrics(f)
+		if err != nil {
+			// Keep polling, but leave a trace so a timeout can be diagnosed.
+			t.Logf("Failed to fetch metrics: %v", err)
+			return false, nil
+		}
+		lines = parseMetricLines(body)
+		return len(lines) > 0, nil
+	})
+	if err != nil {
+		t.Fatalf("Timeout waiting for metric series: %v", err)
+	}
+
+	// Annotation keys that must never show up as label names on a series.
+	annotationLabels := []string{"summary", "description", "runbook_url"}
+
+	for i, line := range lines {
+		for _, annLabel := range annotationLabels {
+			if extractLabel(line, annLabel) != "" {
+				t.Errorf("Series %d contains annotation label %q (annotations should be excluded): %s",
+					i, annLabel, line)
+			}
+		}
+	}
+
+	t.Logf("Verified %d series - none contain annotation labels", len(lines))
+}
+
+// TestMetricActiveAtTimestampsAreReasonable
+// Verifies that activeAt timestamps are not too recent.
+func TestMetricActiveAtTimestampsAreReasonable(t *testing.T) {
+	f, err := framework.New()
+	if err != nil {
+		t.Fatalf("Failed to create framework: %v", err)
+	}
+
+	ctx := context.Background()
+	var lines []string
+
+	// Poll every 5s for up to 2m until at least one series is exposed.
+	err = wait.PollUntilContextTimeout(ctx, 5*time.Second, 2*time.Minute, true, func(ctx context.Context) (bool, error) {
+		body, err := fetchMetrics(f)
+		if err != nil {
+			// Keep polling, but leave a trace so a timeout can be diagnosed.
+			t.Logf("Failed to fetch metrics: %v", err)
+			return false, nil
+		}
+		lines = parseMetricLines(body)
+		return len(lines) > 0, nil
+	})
+	if err != nil {
+		t.Fatalf("Timeout waiting for metric series: %v", err)
+	}
+
+	now := float64(time.Now().Unix())
+	fiveMinutesAgo := now - 300
+
+	// Single pass: a series only enters the sample (and therefore the
+	// denominator of the ratio below) once its value has been parsed, so
+	// malformed lines cannot dilute the percentage.
+	recentCount, totalNonWatchdog := 0, 0
+	for _, line := range lines {
+		alertname := extractLabel(line, "alertname")
+		// Watchdog is deliberately left out of the sample.
+		if alertname == "Watchdog" {
+			continue
+		}
+
+		// The sample value is whatever follows the last space on the line.
+		sep := strings.LastIndex(line, " ")
+		if sep < 0 {
+			t.Logf("Skipping series without a value: %s", line)
+			continue
+		}
+		valueStr := line[sep+1:]
+
+		// fmt scans floats identically for %e, %f and %g, so one attempt is enough.
+		var ts float64
+		if _, err := fmt.Sscanf(valueStr, "%g", &ts); err != nil {
+			t.Logf("Skipping %s: unparseable value %q: %v", alertname, valueStr, err)
+			continue
+		}
+		totalNonWatchdog++
+
+		if ts > fiveMinutesAgo {
+			recentCount++
+			t.Logf("WARN: %s has activeAt within last 5 minutes (ts=%.0f, now=%.0f)", alertname, ts, now)
+		}
+	}
+
+	// Individual recent alerts are only logged; the test fails when more
+	// than 80% of the sampled alerts became active in the last 5 minutes.
+	if totalNonWatchdog > 0 {
+		recentPct := float64(recentCount) / float64(totalNonWatchdog) * 100
+		if recentPct > 80 {
+			t.Errorf("%.0f%% of alerts (%d/%d) have activeAt within last 5 minutes — "+
+				"likely using Alertmanager startsAt instead of Prometheus activeAt",
+				recentPct, recentCount, totalNonWatchdog)
+		}
+	}
+
+	t.Logf("Timestamp check: %d/%d non-Watchdog alerts have recent activeAt", recentCount, totalNonWatchdog)
+}