From 2a6deb1d790323e1e230275ce9bff9ceb9690d56 Mon Sep 17 00:00:00 2001 From: nityam Date: Wed, 22 Apr 2026 01:18:31 +0530 Subject: [PATCH] feat(monitoring): declarative alert rules engine --- .../mofa-monitoring/src/alerts/evaluator.rs | 419 ++++++++++++++++++ crates/mofa-monitoring/src/alerts/event.rs | 164 +++++++ crates/mofa-monitoring/src/alerts/mod.rs | 53 +++ crates/mofa-monitoring/src/alerts/notifier.rs | 213 +++++++++ crates/mofa-monitoring/src/alerts/rule.rs | 329 ++++++++++++++ crates/mofa-monitoring/src/alerts/source.rs | 150 +++++++ crates/mofa-monitoring/src/lib.rs | 1 + docs/monitoring-alerts.md | 222 ++++++++++ 8 files changed, 1551 insertions(+) create mode 100644 crates/mofa-monitoring/src/alerts/evaluator.rs create mode 100644 crates/mofa-monitoring/src/alerts/event.rs create mode 100644 crates/mofa-monitoring/src/alerts/mod.rs create mode 100644 crates/mofa-monitoring/src/alerts/notifier.rs create mode 100644 crates/mofa-monitoring/src/alerts/rule.rs create mode 100644 crates/mofa-monitoring/src/alerts/source.rs create mode 100644 docs/monitoring-alerts.md diff --git a/crates/mofa-monitoring/src/alerts/evaluator.rs b/crates/mofa-monitoring/src/alerts/evaluator.rs new file mode 100644 index 000000000..d58835924 --- /dev/null +++ b/crates/mofa-monitoring/src/alerts/evaluator.rs @@ -0,0 +1,419 @@ +//! Alert rule evaluator. +//! +//! The [`Evaluator`] owns a set of rules and a [`MetricSource`]. On each +//! call to [`Evaluator::evaluate`] it resolves every rule against the +//! current metric snapshot and emits the state-transition events. +//! +//! State tracking matches Prometheus semantics: a rule is `Pending` as +//! soon as its condition starts matching, transitions to `Firing` once +//! the condition has held continuously for at least `for_duration`, and +//! emits a `Resolved` event the first tick the condition stops matching +//! after having fired. + +use std::collections::HashMap; +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, SystemTime}; + +use super::event::{AlertEvent, AlertState}; +use super::rule::{Condition, Rule}; +use super::source::{MetricSample, MetricSource}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum RuleState { + Inactive, + Pending { + since: SystemTime, + }, + Firing, +} + +#[derive(Debug)] +struct RuleRuntime { + state: RuleState, + /// Sliding window of samples for rate-of-change conditions. Each entry + /// is `(observed_at, cumulative_value)`. The deque is trimmed per + /// evaluation to the configured window. + window: VecDeque<(SystemTime, f64)>, +} + +impl RuleRuntime { + fn new() -> Self { + Self { + state: RuleState::Inactive, + window: VecDeque::new(), + } + } +} + +/// Configuration knobs for the evaluator itself. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct EvaluatorConfig { + /// Maximum number of samples held per rate-of-change window before + /// older entries are evicted, regardless of their age. Guards + /// unbounded memory growth if a rule's `window` is misconfigured. + pub max_window_samples: usize, +} + +impl Default for EvaluatorConfig { + fn default() -> Self { + Self { + max_window_samples: 1024, + } + } +} + +pub struct Evaluator { + rules: Vec, + runtimes: Mutex>, + source: Arc, + config: EvaluatorConfig, +} + +impl Evaluator { + pub fn new(rules: Vec, source: Arc) -> Self { + Self::with_config(rules, source, EvaluatorConfig::default()) + } + + pub fn with_config(rules: Vec, source: Arc, config: EvaluatorConfig) -> Self { + let mut runtimes = HashMap::with_capacity(rules.len()); + for r in &rules { + runtimes.insert(r.name.clone(), RuleRuntime::new()); + } + Self { + rules, + runtimes: Mutex::new(runtimes), + source, + config, + } + } + + pub fn rules(&self) -> &[Rule] { + &self.rules + } + + /// Evaluate every rule once. Returns the events produced on this + /// tick — typically a small handful, since only state transitions + /// emit. + pub async fn evaluate(&self) -> Vec { + let now = SystemTime::now(); + let mut events = Vec::new(); + + for rule in &self.rules { + let (matched, observed) = match &rule.condition { + Condition::Threshold { + metric, + op, + threshold, + } => match self.source.sample(metric).await { + Some(sample) => (op.apply(sample.value, *threshold), Some(sample.value)), + None => (false, None), + }, + Condition::RateOfChange { + metric, + op, + threshold, + window, + } => match self.source.sample(metric).await { + Some(sample) => self.evaluate_rate(rule, sample, *op, *threshold, *window, now), + None => (false, None), + }, + Condition::Absent { metric, staleness } => match self.source.sample(metric).await { + Some(sample) => { + let is_stale = now + .duration_since(sample.observed_at) + .map(|d| d > *staleness) + .unwrap_or(false); + (is_stale, Some(sample.value)) + } + None => (true, None), + }, + }; + + if let Some(event) = self.transition(rule, matched, observed, now) { + events.push(event); + } + } + events + } + + fn evaluate_rate( + &self, + rule: &Rule, + sample: MetricSample, + op: super::rule::ComparisonOp, + threshold: f64, + window: Duration, + now: SystemTime, + ) -> (bool, Option) { + let mut runtimes = self.runtimes.lock().unwrap(); + let rt = runtimes + .entry(rule.name.clone()) + .or_insert_with(RuleRuntime::new); + + rt.window.push_back((sample.observed_at, sample.value)); + // Trim by age first. + while let Some(&(t, _)) = rt.window.front() { + let too_old = now + .duration_since(t) + .map(|d| d > window) + .unwrap_or(false); + if too_old { + rt.window.pop_front(); + } else { + break; + } + } + // Then by count. + while rt.window.len() > self.config.max_window_samples { + rt.window.pop_front(); + } + + if rt.window.len() < 2 { + return (false, Some(sample.value)); + } + let (t0, v0) = rt.window.front().copied().unwrap(); + let (t1, v1) = rt.window.back().copied().unwrap(); + let secs = match t1.duration_since(t0) { + Ok(d) if !d.is_zero() => d.as_secs_f64(), + _ => return (false, Some(sample.value)), + }; + let rate = (v1 - v0) / secs; + (op.apply(rate, threshold), Some(rate)) + } + + fn transition( + &self, + rule: &Rule, + matched: bool, + observed: Option, + now: SystemTime, + ) -> Option { + let mut runtimes = self.runtimes.lock().unwrap(); + let rt = runtimes + .entry(rule.name.clone()) + .or_insert_with(RuleRuntime::new); + + let next = match (rt.state, matched) { + (RuleState::Inactive, true) => { + if rule.for_duration.is_zero() { + RuleState::Firing + } else { + RuleState::Pending { since: now } + } + } + (RuleState::Pending { since }, true) => { + let elapsed = now.duration_since(since).unwrap_or(Duration::ZERO); + if elapsed >= rule.for_duration { + RuleState::Firing + } else { + RuleState::Pending { since } + } + } + (_, false) => RuleState::Inactive, + (RuleState::Firing, true) => RuleState::Firing, + }; + + let prev = rt.state; + rt.state = next; + + // Only state transitions emit events. + let emit_state = match (prev, next) { + (RuleState::Inactive, RuleState::Pending { .. }) => Some(AlertState::Pending), + (RuleState::Inactive, RuleState::Firing) => Some(AlertState::Firing), + (RuleState::Pending { .. }, RuleState::Firing) => Some(AlertState::Firing), + (RuleState::Firing, RuleState::Inactive) => Some(AlertState::Resolved), + (RuleState::Pending { .. }, RuleState::Inactive) => None, + _ => None, + }; + + emit_state.map(|state| AlertEvent { + rule_name: rule.name.clone(), + state, + severity: rule.severity, + condition: rule.condition.clone(), + observed_value: observed, + labels: rule.labels.clone(), + annotations: rule.annotations.clone(), + at: now, + }) + } +} + +#[cfg(test)] +mod tests { + use super::super::rule::{ComparisonOp, Condition, Rule, Severity}; + use super::super::source::InMemoryMetricSource; + use super::*; + + fn threshold_rule(name: &str, metric: &str, thr: f64) -> Rule { + Rule::new( + name, + "", + Severity::Warning, + Condition::Threshold { + metric: metric.into(), + op: ComparisonOp::Gt, + threshold: thr, + }, + ) + } + + #[tokio::test] + async fn evaluator_fires_threshold_immediately_when_for_zero() { + let src = Arc::new(InMemoryMetricSource::new()); + src.set("x", 10.0); + let ev = Evaluator::new(vec![threshold_rule("r1", "x", 5.0)], src.clone()); + let out = ev.evaluate().await; + assert_eq!(out.len(), 1); + assert_eq!(out[0].state, AlertState::Firing); + assert_eq!(out[0].observed_value, Some(10.0)); + } + + #[tokio::test] + async fn evaluator_does_not_fire_when_condition_false() { + let src = Arc::new(InMemoryMetricSource::new()); + src.set("x", 1.0); + let ev = Evaluator::new(vec![threshold_rule("r1", "x", 5.0)], src); + let out = ev.evaluate().await; + assert!(out.is_empty()); + } + + #[tokio::test] + async fn evaluator_emits_resolved_once_condition_clears() { + let src = Arc::new(InMemoryMetricSource::new()); + src.set("x", 10.0); + let ev = Evaluator::new(vec![threshold_rule("r1", "x", 5.0)], src.clone()); + let fired = ev.evaluate().await; + assert_eq!(fired[0].state, AlertState::Firing); + src.set("x", 1.0); + let resolved = ev.evaluate().await; + assert_eq!(resolved[0].state, AlertState::Resolved); + } + + #[tokio::test] + async fn evaluator_emits_pending_then_firing_when_for_nonzero() { + let src = Arc::new(InMemoryMetricSource::new()); + src.set("x", 10.0); + let rule = threshold_rule("r1", "x", 5.0).with_for(Duration::from_millis(50)); + let ev = Evaluator::new(vec![rule], src.clone()); + + let first = ev.evaluate().await; + assert_eq!(first[0].state, AlertState::Pending); + + tokio::time::sleep(Duration::from_millis(60)).await; + let second = ev.evaluate().await; + assert_eq!(second[0].state, AlertState::Firing); + } + + #[tokio::test] + async fn evaluator_absent_rule_fires_when_metric_missing() { + let src = Arc::new(InMemoryMetricSource::new()); + let rule = Rule::new( + "heartbeat-gone", + "", + Severity::Critical, + Condition::Absent { + metric: "heartbeat".into(), + staleness: Duration::from_secs(5), + }, + ); + let ev = Evaluator::new(vec![rule], src); + let out = ev.evaluate().await; + assert_eq!(out[0].state, AlertState::Firing); + } + + #[tokio::test] + async fn evaluator_absent_rule_fires_on_stale_sample() { + let src = Arc::new(InMemoryMetricSource::new()); + let earlier = super::super::source::ago(Duration::from_secs(30)); + src.set_at("heartbeat", 1.0, earlier); + let rule = Rule::new( + "heartbeat-stale", + "", + Severity::Critical, + Condition::Absent { + metric: "heartbeat".into(), + staleness: Duration::from_secs(5), + }, + ); + let ev = Evaluator::new(vec![rule], src); + let out = ev.evaluate().await; + assert_eq!(out[0].state, AlertState::Firing); + } + + #[tokio::test] + async fn evaluator_absent_rule_silent_when_fresh() { + let src = Arc::new(InMemoryMetricSource::new()); + src.set("heartbeat", 1.0); + let rule = Rule::new( + "heartbeat-ok", + "", + Severity::Critical, + Condition::Absent { + metric: "heartbeat".into(), + staleness: Duration::from_secs(60), + }, + ); + let ev = Evaluator::new(vec![rule], src); + let out = ev.evaluate().await; + assert!(out.is_empty()); + } + + #[tokio::test] + async fn evaluator_rate_of_change_positive() { + let src = Arc::new(InMemoryMetricSource::new()); + let rule = Rule::new( + "rate-up", + "", + Severity::Info, + Condition::RateOfChange { + metric: "reqs".into(), + op: ComparisonOp::Gt, + threshold: 5.0, + window: Duration::from_secs(60), + }, + ); + let ev = Evaluator::new(vec![rule], src.clone()); + + let t0 = super::super::source::ago(Duration::from_secs(10)); + src.set_at("reqs", 100.0, t0); + ev.evaluate().await; + + src.set("reqs", 200.0); + let out = ev.evaluate().await; + assert_eq!(out[0].state, AlertState::Firing); + } + + #[tokio::test] + async fn evaluator_silent_on_missing_primary_metric() { + let src = Arc::new(InMemoryMetricSource::new()); + let ev = Evaluator::new(vec![threshold_rule("r", "nope", 0.0)], src); + let out = ev.evaluate().await; + assert!(out.is_empty()); + } + + #[tokio::test] + async fn evaluator_does_not_double_fire() { + let src = Arc::new(InMemoryMetricSource::new()); + src.set("x", 10.0); + let ev = Evaluator::new(vec![threshold_rule("r", "x", 5.0)], src); + let first = ev.evaluate().await; + let second = ev.evaluate().await; + assert_eq!(first[0].state, AlertState::Firing); + assert!(second.is_empty(), "no event expected while state is stable"); + } + + #[tokio::test] + async fn evaluator_pending_then_cleared_emits_no_event() { + let src = Arc::new(InMemoryMetricSource::new()); + src.set("x", 10.0); + let rule = threshold_rule("r", "x", 5.0).with_for(Duration::from_secs(60)); + let ev = Evaluator::new(vec![rule], src.clone()); + let first = ev.evaluate().await; + assert_eq!(first[0].state, AlertState::Pending); + src.set("x", 1.0); + let second = ev.evaluate().await; + assert!(second.is_empty(), "pending → inactive is silent"); + } +} diff --git a/crates/mofa-monitoring/src/alerts/event.rs b/crates/mofa-monitoring/src/alerts/event.rs new file mode 100644 index 000000000..b73b5799e --- /dev/null +++ b/crates/mofa-monitoring/src/alerts/event.rs @@ -0,0 +1,164 @@ +//! Alert events. +//! +//! An [`AlertEvent`] is the output of the evaluator — a structured record +//! describing a state transition for a given rule. Notifiers consume +//! events; aggregators (dashboards, audit logs) may aggregate them over +//! time. +//! +//! Events carry enough context (labels, annotations, observed value) to +//! render directly in a notifier payload without re-resolving the rule. + +use super::rule::{Condition, Severity}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::time::SystemTime; + +/// State of a rule at the point the event was generated. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[non_exhaustive] +pub enum AlertState { + /// The rule has just transitioned from `Pending`/`Resolved` into + /// active firing — `for_duration` has elapsed. + Firing, + /// The rule condition was true on this tick but `for_duration` has + /// not yet elapsed. Notifiers typically ignore these; dashboards may + /// show them as a "soaking" badge. + Pending, + /// The rule fired previously and has now stopped matching. + Resolved, +} + +impl AlertState { + pub fn as_str(&self) -> &'static str { + match self { + AlertState::Firing => "firing", + AlertState::Pending => "pending", + AlertState::Resolved => "resolved", + } + } +} + +impl std::fmt::Display for AlertState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +/// A single alert event emitted by the evaluator. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct AlertEvent { + /// Name of the rule that emitted this event (unique within a + /// `RuleSet`). + pub rule_name: String, + /// State transition this event reports. + pub state: AlertState, + /// Severity copied from the rule for notifier convenience. + pub severity: Severity, + /// Snapshot of the rule condition at emission time. Kept by value so + /// events remain meaningful after a rule is edited or removed. + pub condition: Condition, + /// Observed value of the primary metric at the time the event fired. + /// `None` for conditions that don't have a single scalar (e.g. + /// `Absent`). + pub observed_value: Option, + /// Merged label set — rule labels plus any evaluator-supplied labels. + pub labels: HashMap, + /// Annotations copied from the rule. + pub annotations: HashMap, + /// Wall-clock timestamp of the event. + pub at: SystemTime, +} + +impl AlertEvent { + /// Convenience for `state == Firing`. + pub fn is_firing(&self) -> bool { + self.state == AlertState::Firing + } + + /// Convenience for `state == Resolved`. + pub fn is_resolved(&self) -> bool { + self.state == AlertState::Resolved + } + + /// Construct a short one-line summary suitable for logs or chat + /// notifiers. + pub fn short_summary(&self) -> String { + let value = self + .observed_value + .map(|v| format!(" (observed={v})")) + .unwrap_or_default(); + format!( + "[{sev}] {rule}: {state}{value} — {cond}", + sev = self.severity, + rule = self.rule_name, + state = self.state, + value = value, + cond = self.condition.summary() + ) + } +} + +#[cfg(test)] +mod tests { + use super::super::rule::{ComparisonOp, Condition, Severity}; + use super::*; + + fn sample_event(state: AlertState, value: Option) -> AlertEvent { + AlertEvent { + rule_name: "high-error-rate".into(), + state, + severity: Severity::Warning, + condition: Condition::Threshold { + metric: "err".into(), + op: ComparisonOp::Gt, + threshold: 0.05, + }, + observed_value: value, + labels: HashMap::from([("env".into(), "prod".into())]), + annotations: HashMap::new(), + at: SystemTime::UNIX_EPOCH, + } + } + + #[test] + fn alert_state_display() { + assert_eq!(AlertState::Firing.to_string(), "firing"); + assert_eq!(AlertState::Pending.to_string(), "pending"); + assert_eq!(AlertState::Resolved.to_string(), "resolved"); + } + + #[test] + fn is_firing_and_is_resolved_disjoint() { + let e = sample_event(AlertState::Firing, Some(0.1)); + assert!(e.is_firing()); + assert!(!e.is_resolved()); + + let e = sample_event(AlertState::Resolved, Some(0.01)); + assert!(!e.is_firing()); + assert!(e.is_resolved()); + } + + #[test] + fn short_summary_includes_observed_value() { + let e = sample_event(AlertState::Firing, Some(0.12)); + let s = e.short_summary(); + assert!(s.contains("0.12")); + assert!(s.contains("high-error-rate")); + assert!(s.contains("warning")); + } + + #[test] + fn short_summary_without_value() { + let e = sample_event(AlertState::Firing, None); + let s = e.short_summary(); + assert!(!s.contains("observed")); + } + + #[test] + fn alert_event_json_roundtrip() { + let e = sample_event(AlertState::Pending, Some(0.05)); + let s = serde_json::to_string(&e).unwrap(); + let parsed: AlertEvent = serde_json::from_str(&s).unwrap(); + assert_eq!(parsed, e); + } +} diff --git a/crates/mofa-monitoring/src/alerts/mod.rs b/crates/mofa-monitoring/src/alerts/mod.rs new file mode 100644 index 000000000..a65b9fe30 --- /dev/null +++ b/crates/mofa-monitoring/src/alerts/mod.rs @@ -0,0 +1,53 @@ +//! Alert rules engine for `mofa-monitoring`. +//! +//! Declarative SLO-style alerting: define [`Rule`]s over metric names, +//! plug in a [`MetricSource`] (defaulting to the existing +//! `MetricsCollector` or a Prometheus scrape adapter), run the +//! [`Evaluator`] on a tick, and fan the resulting [`AlertEvent`]s through +//! a [`Notifier`] or [`CompositeNotifier`]. +//! +//! ```text +//! ┌──────────────┐ sample() ┌─────────────┐ +//! │ MetricSource │ ─────────────── │ Evaluator │ +//! └──────────────┘ └──────┬──────┘ +//! │ AlertEvent +//! ▼ +//! ┌──────────────┐ +//! │ Notifier │ +//! │ (composite) │ +//! └──┬────────┬──┘ +//! │ │ +//! ▼ ▼ +//! LogNotifier CollectingNotifier +//! ``` +//! +//! ### State machine (matches Prometheus) +//! +//! ```text +//! matched=true, matched=true, +//! for_duration=0 for=Δ elapsed +//! Inactive ──────────────────── Firing ◀────── Pending +//! │ │ ▲ +//! │ matched=true, for>0 │ matched=false │ +//! ▼ ▼ │ matched=true +//! Pending ───── matched=false ─── Inactive ──────┘ +//! ``` +//! +//! - `Inactive → Pending` — silent. +//! - `Pending → Firing` — emits `AlertState::Firing`. +//! - `Firing → Inactive` — emits `AlertState::Resolved`. +//! - `Pending → Inactive` — silent (condition cleared before soak). +//! - `Inactive → Firing` (when `for_duration == 0`) — emits +//! `AlertState::Firing` directly. + +pub mod evaluator; +pub mod event; +pub mod notifier; +pub mod rule; +pub mod source; + +pub use evaluator::{Evaluator, EvaluatorConfig}; +pub use event::{AlertEvent, AlertState}; +pub use notifier::{CollectingNotifier, CompositeNotifier, LogNotifier, Notifier}; +pub use rule::{ComparisonOp, Condition, Rule, Severity}; +pub use source::{InMemoryMetricSource, MetricSample, MetricSource}; diff --git a/crates/mofa-monitoring/src/alerts/notifier.rs b/crates/mofa-monitoring/src/alerts/notifier.rs new file mode 100644 index 000000000..a3276f3e4 --- /dev/null +++ b/crates/mofa-monitoring/src/alerts/notifier.rs @@ -0,0 +1,213 @@ +//! Alert notifiers. +//! +//! A [`Notifier`] receives emitted [`AlertEvent`]s and delivers them to an +//! external system: log, webhook, chat channel, email, on-call rotation. +//! +//! Two in-tree notifiers ship with this PR: +//! +//! - [`LogNotifier`] — writes events to the `tracing` system. Intended as +//! a default for development and for audit-logging production alongside +//! richer notifiers. +//! - [`CollectingNotifier`] — records every event into an in-memory +//! buffer. Intended for tests and for short-term in-memory dashboards. +//! +//! A [`CompositeNotifier`] fans out to multiple notifiers so a production +//! deployment can log every event while also pushing to a webhook. + +use super::event::AlertEvent; +use async_trait::async_trait; +use std::sync::{Arc, Mutex}; +use tracing::{info, warn}; + +/// Backend-agnostic notifier interface. +#[async_trait] +pub trait Notifier: Send + Sync { + async fn notify(&self, event: &AlertEvent); +} + +/// Writes events through the `tracing` subscriber. `Warning` and +/// `Critical` use `warn!`; `Info` uses `info!`. +#[derive(Debug, Default)] +pub struct LogNotifier; + +impl LogNotifier { + pub fn new() -> Self { + Self + } +} + +#[async_trait] +impl Notifier for LogNotifier { + async fn notify(&self, event: &AlertEvent) { + let summary = event.short_summary(); + match event.severity { + super::rule::Severity::Info => info!(target: "mofa_monitoring::alerts", "{summary}"), + super::rule::Severity::Warning | super::rule::Severity::Critical => { + warn!(target: "mofa_monitoring::alerts", "{summary}") + } + } + } +} + +/// Records every event into an in-memory buffer. The buffer is bounded — +/// oldest entries are dropped once `capacity` is reached. +pub struct CollectingNotifier { + buffer: Mutex>, + capacity: usize, +} + +impl CollectingNotifier { + pub fn with_capacity(capacity: usize) -> Self { + Self { + buffer: Mutex::new(std::collections::VecDeque::with_capacity(capacity.min(1024))), + capacity, + } + } + + /// Snapshot of all events currently held. + pub fn snapshot(&self) -> Vec { + self.buffer.lock().unwrap().iter().cloned().collect() + } + + /// Number of events currently buffered. + pub fn len(&self) -> usize { + self.buffer.lock().unwrap().len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Drop all buffered events. + pub fn clear(&self) { + self.buffer.lock().unwrap().clear(); + } +} + +#[async_trait] +impl Notifier for CollectingNotifier { + async fn notify(&self, event: &AlertEvent) { + let mut buf = self.buffer.lock().unwrap(); + if buf.len() == self.capacity { + buf.pop_front(); + } + buf.push_back(event.clone()); + } +} + +/// Fans an event out to every wrapped notifier. Failures are swallowed — +/// fan-out is best-effort, a single broken notifier must not block +/// delivery to the others. Compose with a retry/circuit-breaker layer at +/// the call site if stronger guarantees are needed. +pub struct CompositeNotifier { + inner: Vec>, +} + +impl CompositeNotifier { + pub fn new(inner: Vec>) -> Self { + Self { inner } + } + + pub fn len(&self) -> usize { + self.inner.len() + } + + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } +} + +#[async_trait] +impl Notifier for CompositeNotifier { + async fn notify(&self, event: &AlertEvent) { + for n in &self.inner { + n.notify(event).await; + } + } +} + +#[cfg(test)] +mod tests { + use super::super::event::AlertState; + use super::super::rule::{ComparisonOp, Condition, Severity}; + use super::*; + use std::collections::HashMap; + use std::time::SystemTime; + + fn sample() -> AlertEvent { + AlertEvent { + rule_name: "r".into(), + state: AlertState::Firing, + severity: Severity::Warning, + condition: Condition::Threshold { + metric: "x".into(), + op: ComparisonOp::Gt, + threshold: 1.0, + }, + observed_value: Some(2.0), + labels: HashMap::new(), + annotations: HashMap::new(), + at: SystemTime::UNIX_EPOCH, + } + } + + #[tokio::test] + async fn collecting_notifier_records_events() { + let n = CollectingNotifier::with_capacity(10); + assert!(n.is_empty()); + n.notify(&sample()).await; + assert_eq!(n.len(), 1); + assert_eq!(n.snapshot()[0].rule_name, "r"); + } + + #[tokio::test] + async fn collecting_notifier_capacity_evicts_oldest() { + let n = CollectingNotifier::with_capacity(2); + let mut e = sample(); + e.rule_name = "a".into(); + n.notify(&e).await; + e.rule_name = "b".into(); + n.notify(&e).await; + e.rule_name = "c".into(); + n.notify(&e).await; + + let snap = n.snapshot(); + assert_eq!(snap.len(), 2); + assert_eq!(snap[0].rule_name, "b"); + assert_eq!(snap[1].rule_name, "c"); + } + + #[tokio::test] + async fn collecting_notifier_clear() { + let n = CollectingNotifier::with_capacity(10); + n.notify(&sample()).await; + assert_eq!(n.len(), 1); + n.clear(); + assert!(n.is_empty()); + } + + #[tokio::test] + async fn composite_fans_out_to_all_notifiers() { + let a = Arc::new(CollectingNotifier::with_capacity(10)); + let b = Arc::new(CollectingNotifier::with_capacity(10)); + let composite = CompositeNotifier::new(vec![a.clone(), b.clone()]); + composite.notify(&sample()).await; + assert_eq!(a.len(), 1); + assert_eq!(b.len(), 1); + } + + #[tokio::test] + async fn log_notifier_is_infallible() { + let n = LogNotifier::new(); + // No panic / deadlock; we don't assert on the log subscriber + // content because that depends on global state. + n.notify(&sample()).await; + } + + #[tokio::test] + async fn composite_is_empty_len() { + let c = CompositeNotifier::new(vec![]); + assert!(c.is_empty()); + assert_eq!(c.len(), 0); + } +} diff --git a/crates/mofa-monitoring/src/alerts/rule.rs b/crates/mofa-monitoring/src/alerts/rule.rs new file mode 100644 index 000000000..0a9ce4700 --- /dev/null +++ b/crates/mofa-monitoring/src/alerts/rule.rs @@ -0,0 +1,329 @@ +//! Alert rule definitions. +//! +//! A [`Rule`] is a declarative specification of a condition to evaluate +//! against a [`MetricSource`]. Rules are backend-agnostic: they know how to +//! extract a numeric value for a named metric, compare it against a +//! threshold, and emit an [`AlertEvent`] when the condition holds for a +//! configured duration (the `for_duration` field, matching Prometheus +//! `for:` semantics). +//! +//! Supported condition families: +//! +//! - [`Condition::Threshold`] — compare a scalar metric to a constant +//! using [`ComparisonOp`] (`>`, `>=`, `<`, `<=`, `==`, `!=`). +//! - [`Condition::RateOfChange`] — compare the per-second derivative of a +//! counter over a sliding window against a threshold. +//! - [`Condition::Absent`] — fire when a required metric has not been +//! observed within a staleness window. + +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::time::Duration; + +/// A complete alerting rule. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct Rule { + /// Unique rule identifier. Used as the natural key for deduplicating + /// events and tracking active-firing state across evaluations. + pub name: String, + /// Human-readable description (shown in notifier payloads). + pub description: String, + /// Severity of events emitted by this rule. + pub severity: Severity, + /// The condition that must hold for the rule to fire. + pub condition: Condition, + /// The condition must hold continuously for this duration before the + /// rule transitions into the `Firing` state. Zero means fire + /// immediately on first match. + pub for_duration: Duration, + /// Static labels attached to every event emitted by this rule. + /// Merged into the event payload alongside any labels resolved at + /// evaluation time. + pub labels: HashMap, + /// Free-form annotations (human-oriented runbook links, summaries). + pub annotations: HashMap, +} + +impl Rule { + /// Construct a minimal rule. Use the `with_*` builders to add labels + /// and annotations. + pub fn new( + name: impl Into, + description: impl Into, + severity: Severity, + condition: Condition, + ) -> Self { + Self { + name: name.into(), + description: description.into(), + severity, + condition, + for_duration: Duration::ZERO, + labels: HashMap::new(), + annotations: HashMap::new(), + } + } + + #[must_use] + pub fn with_for(mut self, duration: Duration) -> Self { + self.for_duration = duration; + self + } + + #[must_use] + pub fn with_label(mut self, key: impl Into, value: impl Into) -> Self { + self.labels.insert(key.into(), value.into()); + self + } + + #[must_use] + pub fn with_annotation(mut self, key: impl Into, value: impl Into) -> Self { + self.annotations.insert(key.into(), value.into()); + self + } + + /// Returns the primary metric name this rule depends on, if the + /// condition references one. Used for precomputed index lookups. + pub fn primary_metric(&self) -> Option<&str> { + match &self.condition { + Condition::Threshold { metric, .. } + | Condition::RateOfChange { metric, .. } + | Condition::Absent { metric, .. } => Some(metric), + } + } +} + +/// Severity level attached to every event emitted by the rule. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +#[non_exhaustive] +pub enum Severity { + /// Purely informational — surface in audit trails, don't page. + Info, + /// Degraded behaviour. Investigate during business hours. + Warning, + /// User-visible impact. Page on-call. + Critical, +} + +impl Severity { + pub fn as_str(&self) -> &'static str { + match self { + Severity::Info => "info", + Severity::Warning => "warning", + Severity::Critical => "critical", + } + } +} + +impl std::fmt::Display for Severity { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +/// Comparison operator for threshold conditions. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[non_exhaustive] +pub enum ComparisonOp { + Gt, + Gte, + Lt, + Lte, + Eq, + Neq, +} + +impl ComparisonOp { + /// Apply the comparison `value OP threshold`. + pub fn apply(self, value: f64, threshold: f64) -> bool { + match self { + ComparisonOp::Gt => value > threshold, + ComparisonOp::Gte => value >= threshold, + ComparisonOp::Lt => value < threshold, + ComparisonOp::Lte => value <= threshold, + ComparisonOp::Eq => (value - threshold).abs() < f64::EPSILON, + ComparisonOp::Neq => (value - threshold).abs() >= f64::EPSILON, + } + } + + pub fn as_str(&self) -> &'static str { + match self { + ComparisonOp::Gt => ">", + ComparisonOp::Gte => ">=", + ComparisonOp::Lt => "<", + ComparisonOp::Lte => "<=", + ComparisonOp::Eq => "==", + ComparisonOp::Neq => "!=", + } + } +} + +impl std::fmt::Display for ComparisonOp { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +/// The evaluable condition of a rule. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[non_exhaustive] +pub enum Condition { + /// Fire when `metric OP threshold` holds. E.g. "error_rate > 0.05". + Threshold { + metric: String, + op: ComparisonOp, + threshold: f64, + }, + /// Fire when the per-second derivative of a monotonic counter over + /// `window` seconds satisfies `OP threshold`. The evaluator is + /// responsible for maintaining the window samples; see + /// [`super::evaluator`]. + RateOfChange { + metric: String, + op: ComparisonOp, + threshold: f64, + window: Duration, + }, + /// Fire when the metric has not been observed within `staleness` + /// seconds of the evaluation tick — useful for heartbeat / liveness. + Absent { + metric: String, + staleness: Duration, + }, +} + +impl Condition { + /// Human-readable summary, useful for notifier payloads. + pub fn summary(&self) -> String { + match self { + Condition::Threshold { + metric, + op, + threshold, + } => format!("{metric} {op} {threshold}"), + Condition::RateOfChange { + metric, + op, + threshold, + window, + } => format!("rate({metric}[{:?}]) {op} {threshold}", window), + Condition::Absent { metric, staleness } => { + format!("absent({metric}) for {:?}", staleness) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn severity_ordering() { + assert!(Severity::Info < Severity::Warning); + assert!(Severity::Warning < Severity::Critical); + } + + #[test] + fn comparison_op_gt() { + assert!(ComparisonOp::Gt.apply(10.0, 5.0)); + assert!(!ComparisonOp::Gt.apply(5.0, 5.0)); + assert!(!ComparisonOp::Gt.apply(1.0, 5.0)); + } + + #[test] + fn comparison_op_gte() { + assert!(ComparisonOp::Gte.apply(10.0, 5.0)); + assert!(ComparisonOp::Gte.apply(5.0, 5.0)); + assert!(!ComparisonOp::Gte.apply(1.0, 5.0)); + } + + #[test] + fn comparison_op_lt_lte() { + assert!(ComparisonOp::Lt.apply(1.0, 5.0)); + assert!(!ComparisonOp::Lt.apply(5.0, 5.0)); + assert!(ComparisonOp::Lte.apply(5.0, 5.0)); + } + + #[test] + fn comparison_op_eq_neq() { + assert!(ComparisonOp::Eq.apply(5.0, 5.0)); + assert!(!ComparisonOp::Eq.apply(5.0001, 5.0)); + assert!(ComparisonOp::Neq.apply(5.0001, 5.0)); + } + + #[test] + fn rule_primary_metric_threshold() { + let r = Rule::new( + "r1", + "test", + Severity::Warning, + Condition::Threshold { + metric: "cpu".into(), + op: ComparisonOp::Gt, + threshold: 0.8, + }, + ); + assert_eq!(r.primary_metric(), Some("cpu")); + } + + #[test] + fn rule_builder_methods() { + let r = Rule::new( + "r1", + "d", + Severity::Info, + Condition::Absent { + metric: "heartbeat".into(), + staleness: Duration::from_secs(60), + }, + ) + .with_for(Duration::from_secs(30)) + .with_label("team", "platform") + .with_annotation("runbook", "https://example/runbook"); + + assert_eq!(r.for_duration, Duration::from_secs(30)); + assert_eq!(r.labels.get("team").unwrap(), "platform"); + assert_eq!(r.annotations.get("runbook").unwrap(), "https://example/runbook"); + } + + #[test] + fn condition_summary_includes_parameters() { + let c = Condition::Threshold { + metric: "err".into(), + op: ComparisonOp::Gte, + threshold: 0.05, + }; + let s = c.summary(); + assert!(s.contains("err")); + assert!(s.contains(">=")); + assert!(s.contains("0.05")); + } + + #[test] + fn rule_json_roundtrip() { + let r = Rule::new( + "r", + "d", + Severity::Critical, + Condition::RateOfChange { + metric: "reqs".into(), + op: ComparisonOp::Gt, + threshold: 100.0, + window: Duration::from_secs(60), + }, + ) + .with_for(Duration::from_secs(120)) + .with_label("env", "prod"); + let s = serde_json::to_string(&r).unwrap(); + let parsed: Rule = serde_json::from_str(&s).unwrap(); + assert_eq!(parsed, r); + } + + #[test] + fn severity_display_and_as_str_agree() { + for s in [Severity::Info, Severity::Warning, Severity::Critical] { + assert_eq!(format!("{s}"), s.as_str()); + } + } +} diff --git a/crates/mofa-monitoring/src/alerts/source.rs b/crates/mofa-monitoring/src/alerts/source.rs new file mode 100644 index 000000000..b242472a8 --- /dev/null +++ b/crates/mofa-monitoring/src/alerts/source.rs @@ -0,0 +1,150 @@ +//! Metric source abstraction. +//! +//! The alert evaluator is parameterised over a [`MetricSource`] trait so +//! it can be wired against the existing `MetricsCollector`, a Prometheus +//! scrape, a test fixture, or any other backend that can return the +//! current value of a named metric. +//! +//! Implementors are expected to be cheap to query — the evaluator polls +//! every tick. Backends that aggregate from a remote scrape should cache +//! the last snapshot and refresh it out-of-band. + +use async_trait::async_trait; +use std::collections::HashMap; +use std::sync::RwLock; +use std::time::{Duration, SystemTime}; + +/// A point-in-time reading of a metric. +#[derive(Debug, Clone, PartialEq)] +pub struct MetricSample { + pub value: f64, + pub observed_at: SystemTime, +} + +/// Backend-agnostic metric lookup used by the evaluator. +#[async_trait] +pub trait MetricSource: Send + Sync { + /// Return the latest sample for `metric_name`, if known. + /// + /// Returning `None` has two meanings depending on the rule kind: + /// for `Threshold`/`RateOfChange` rules the evaluator treats `None` + /// as "no data, skip this tick"; for `Absent` rules it is exactly + /// the signal the rule fires on. + async fn sample(&self, metric_name: &str) -> Option; +} + +/// In-memory test/bench metric source. Tests inject samples via +/// [`InMemoryMetricSource::set`]; downstream evaluators read them via the +/// trait. +#[derive(Debug, Default)] +pub struct InMemoryMetricSource { + samples: RwLock>, +} + +impl InMemoryMetricSource { + pub fn new() -> Self { + Self::default() + } + + /// Insert or overwrite a sample, stamping it with the current wall + /// clock. + pub fn set(&self, metric: impl Into, value: f64) { + let mut w = self.samples.write().unwrap(); + w.insert( + metric.into(), + MetricSample { + value, + observed_at: SystemTime::now(), + }, + ); + } + + /// Insert with a caller-specified observation time. Useful for + /// exercising `Absent` rules with staleness windows without waiting + /// real time in a test. + pub fn set_at(&self, metric: impl Into, value: f64, at: SystemTime) { + let mut w = self.samples.write().unwrap(); + w.insert( + metric.into(), + MetricSample { + value, + observed_at: at, + }, + ); + } + + /// Remove a metric entirely. + pub fn forget(&self, metric: &str) { + let mut w = self.samples.write().unwrap(); + w.remove(metric); + } + + /// How many metrics are currently tracked. Diagnostic helper. + pub fn len(&self) -> usize { + self.samples.read().unwrap().len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +#[async_trait] +impl MetricSource for InMemoryMetricSource { + async fn sample(&self, metric_name: &str) -> Option { + self.samples.read().unwrap().get(metric_name).cloned() + } +} + +/// Helper that freezes a sample as being observed `ago` before `now`. +pub fn ago(duration: Duration) -> SystemTime { + SystemTime::now() + .checked_sub(duration) + .unwrap_or(SystemTime::UNIX_EPOCH) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn in_memory_source_reads_and_writes() { + let src = InMemoryMetricSource::new(); + assert!(src.is_empty()); + src.set("cpu", 0.5); + let s = src.sample("cpu").await.unwrap(); + assert_eq!(s.value, 0.5); + } + + #[tokio::test] + async fn in_memory_source_missing_returns_none() { + let src = InMemoryMetricSource::new(); + assert!(src.sample("nope").await.is_none()); + } + + #[tokio::test] + async fn in_memory_source_forget() { + let src = InMemoryMetricSource::new(); + src.set("x", 1.0); + assert!(src.sample("x").await.is_some()); + src.forget("x"); + assert!(src.sample("x").await.is_none()); + } + + #[tokio::test] + async fn in_memory_source_set_at_preserves_timestamp() { + let src = InMemoryMetricSource::new(); + let earlier = ago(Duration::from_secs(120)); + src.set_at("stale", 0.0, earlier); + let s = src.sample("stale").await.unwrap(); + assert_eq!(s.observed_at, earlier); + } + + #[tokio::test] + async fn in_memory_source_overwrites_existing() { + let src = InMemoryMetricSource::new(); + src.set("k", 1.0); + src.set("k", 2.0); + assert_eq!(src.sample("k").await.unwrap().value, 2.0); + } +} diff --git a/crates/mofa-monitoring/src/lib.rs b/crates/mofa-monitoring/src/lib.rs index df81bf540..0047d3f10 100644 --- a/crates/mofa-monitoring/src/lib.rs +++ b/crates/mofa-monitoring/src/lib.rs @@ -23,6 +23,7 @@ //! # } //! ``` +pub mod alerts; mod dashboard; pub mod tracing; diff --git a/docs/monitoring-alerts.md b/docs/monitoring-alerts.md new file mode 100644 index 000000000..8e7745206 --- /dev/null +++ b/docs/monitoring-alerts.md @@ -0,0 +1,222 @@ +# Monitoring Alert Rules Engine + +Declarative SLO-style alerting for `mofa-monitoring`. Operates alongside +the existing dashboard, Prometheus exporter, and OpenTelemetry tracing +layers — this module adds the evaluation + notification loop that +consumes the metrics those layers publish. + +--- + +## Architecture + +```mermaid +flowchart LR + subgraph Sources [Metric sources] + MC[MetricsCollector] + PR[Prometheus scrape] + IM[InMemoryMetricSource
test fixture] + end + + subgraph Engine [Alerts engine] + RS[Rule set] + EV[Evaluator] + end + + subgraph Sinks [Notifiers] + LN[LogNotifier] + CN[CollectingNotifier] + CO[CompositeNotifier] + end + + MC -->|MetricSource| EV + PR -->|MetricSource| EV + IM -->|MetricSource| EV + RS --> EV + EV -->|AlertEvent| CO + CO --> LN + CO --> CN +``` + +The engine is backend-agnostic in both directions: +`MetricSource` lets the evaluator consume from any backend that can +return the current value of a named metric, and `Notifier` lets the +operator wire events to log, webhook, chat, pager, or any composition. + +--- + +## State machine + +Matches Prometheus semantics: a rule soaks in `Pending` for the +configured `for_duration` before it is allowed to fire, guarding against +flapping. + +```mermaid +stateDiagram-v2 + [*] --> Inactive + Inactive --> Firing: match && for_duration == 0 + Inactive --> Pending: match && for_duration > 0 + Pending --> Firing: match && elapsed >= for_duration + Pending --> Inactive: no match (silent) + Firing --> Firing: match + Firing --> Inactive: no match (emits Resolved) +``` + +Only the following transitions emit an `AlertEvent`: + +| Transition | Emitted state | +|------------|---------------| +| `Inactive → Firing` | `Firing` | +| `Inactive → Pending` | `Pending` | +| `Pending → Firing` | `Firing` | +| `Firing → Inactive` | `Resolved` | +| `Pending → Inactive` | *silent* | + +--- + +## Rule model + +```mermaid +classDiagram + class Rule { + +name: String + +description: String + +severity: Severity + +condition: Condition + +for_duration: Duration + +labels: HashMap~String,String~ + +annotations: HashMap~String,String~ + +primary_metric() Option~String~ + } + class Severity { + <> + Info + Warning + Critical + } + class Condition { + <> + Threshold + RateOfChange + Absent + } + class ComparisonOp { + <> + Gt + Gte + Lt + Lte + Eq + Neq + } + Rule --> Severity + Rule --> Condition + Condition --> ComparisonOp +``` + +--- + +## Condition families + +### `Threshold` + +Fire when a scalar metric satisfies `value OP threshold`. + +```rust +use std::time::Duration; +use mofa_monitoring::alerts::{ComparisonOp, Condition, Rule, Severity}; + +let rule = Rule::new( + "high-error-rate", + "LLM error rate above 5%", + Severity::Warning, + Condition::Threshold { + metric: "llm_error_rate".into(), + op: ComparisonOp::Gt, + threshold: 0.05, + }, +) +.with_for(Duration::from_secs(120)) +.with_label("team", "platform") +.with_annotation("runbook", "https://runbooks.internal/llm-errors"); +``` + +### `RateOfChange` + +Fire when the per-second derivative of a monotonic counter over a sliding +window satisfies `OP threshold`. The evaluator maintains the window +samples internally; configure a sane `max_window_samples` in +`EvaluatorConfig` to cap memory. + +### `Absent` + +Fire when a metric has not been observed within `staleness` — heartbeat +and liveness checks. + +--- + +## Evaluation flow + +```mermaid +sequenceDiagram + participant Tick as Tick loop + participant EV as Evaluator + participant MS as MetricSource + participant N as Notifier + + Tick->>EV: evaluate() + loop per rule + EV->>MS: sample(metric) + MS-->>EV: Some(MetricSample) | None + EV->>EV: apply condition + EV->>EV: update state machine + alt state transition + EV-->>Tick: AlertEvent + end + end + Tick->>N: notify(event) per event +``` + +The evaluator is re-entrant-safe: its state is held under a `Mutex` +keyed by rule name. Production deployments typically call `evaluate()` +from a single tick loop; when scaling out, shard rules across evaluator +instances rather than locking a single evaluator. + +--- + +## Notifiers + +| Notifier | Purpose | +|----------|---------| +| `LogNotifier` | Emit through `tracing`. Warning/Critical go to `warn!`; Info to `info!`. Good default alongside richer delivery. | +| `CollectingNotifier` | Bounded in-memory buffer. Powers the dashboard "recent alerts" panel and tests. | +| `CompositeNotifier` | Fan out to multiple notifiers best-effort. | + +Future integrations (webhook, Slack, PagerDuty) plug in as additional +`Notifier` implementors without changing the evaluator contract. + +--- + +## Wiring checklist + +- [ ] Construct an `Arc` — either the in-memory fixture, + a `MetricsCollector` adapter, or a Prometheus scrape client. +- [ ] Build the rule set: `Vec`. +- [ ] Instantiate an `Evaluator` (use `with_config` to override + `max_window_samples` if you run rate-of-change rules over long + windows). +- [ ] Wire one or more `Notifier`s (typically `CompositeNotifier` over + `LogNotifier` + a delivery notifier). +- [ ] Tick `evaluator.evaluate()` on a cadence (typically every 15–60s) + and fan each event to the notifier. + +--- + +## Status + +- Rule model, condition families, evaluator, notifier abstraction — + delivered +- Metric source adapter against the existing `MetricsCollector` — + follow-up +- Prometheus scrape adapter — follow-up +- Webhook / Slack notifier — follow-up +- YAML rule-file loader — follow-up