diff --git a/pkg/capabilities/base_trigger.go b/pkg/capabilities/base_trigger.go index 142868878..151211569 100644 --- a/pkg/capabilities/base_trigger.go +++ b/pkg/capabilities/base_trigger.go @@ -45,6 +45,13 @@ type BaseTriggerMetrics interface { IncAckError(reason string) // IncAckMemoryOutcome records how an ACK related to the in-memory pending map: hit, miss_no_trigger_bucket, miss_no_event, miss_nil_record. IncAckMemoryOutcome(outcome string) + // AddPendingEvents adjusts the live gauge of events awaiting ACK. Positive on insert, negative on ACK/unregister. + AddPendingEvents(delta int64) + // IncStuckEvent increments the live gauge of events stuck past the critical undelivered threshold. + // Keyed by (capability_id, trigger_id, event_id) so you can see exactly which events are stuck. + IncStuckEvent(triggerID, eventID string) + // DecStuckEvent decrements the stuck-event gauge when a previously-critical event is ACKed or unregistered. + DecStuckEvent(triggerID, eventID string) } type undeliveredState struct { @@ -185,6 +192,10 @@ func (b *BaseTriggerCapability[T]) Start(ctx context.Context) error { } b.mu.Unlock() + if n := int64(len(recs)); n > 0 { + b.metrics.AddPendingEvents(n) + } + b.wg.Add(1) go func() { defer b.wg.Done() @@ -212,14 +223,32 @@ func (b *BaseTriggerCapability[T]) RegisterTrigger(triggerID string, sendCh chan func (b *BaseTriggerCapability[T]) UnregisterTrigger(triggerID string) { b.mu.Lock() _, existed := b.inboxes[triggerID] + pendingCount := int64(len(b.pending[triggerID])) + + var criticalEvents []string + if m, ok := b.undeliveredAlertStates[triggerID]; ok { + for eventID, s := range m { + if s != nil && s.emittedCritical { + criticalEvents = append(criticalEvents, eventID) + } + } + } + delete(b.inboxes, triggerID) delete(b.pending, triggerID) delete(b.undeliveredAlertStates, triggerID) b.mu.Unlock() + for _, eventID := range criticalEvents { + b.metrics.DecStuckEvent(triggerID, eventID) + } + if existed { b.metrics.DecActiveTriggers() } + if pendingCount > 0 { + b.metrics.AddPendingEvents(-pendingCount) + } if err := b.store.DeleteEventsForTrigger(b.ctx, triggerID); err != nil { b.lggr.Errorf("Failed to delete events for trigger (TriggerID=%s): %v", triggerID, err) @@ -258,6 +287,7 @@ func (b *BaseTriggerCapability[T]) DeliverEvent( b.pending[triggerID][te.ID] = &rec b.mu.Unlock() + b.metrics.AddPendingEvents(1) b.trySend(rec) return nil } @@ -327,7 +357,11 @@ func (b *BaseTriggerCapability[T]) AckEvent(ctx context.Context, triggerId strin b.metrics.IncAckMemoryOutcome("miss_no_trigger_bucket") } + var wasCritical bool if m, ok := b.undeliveredAlertStates[triggerId]; ok { + if s, exists := m[eventId]; exists && s != nil && s.emittedCritical { + wasCritical = true + } delete(m, eventId) if len(m) == 0 { delete(b.undeliveredAlertStates, triggerId) @@ -335,6 +369,10 @@ func (b *BaseTriggerCapability[T]) AckEvent(ctx context.Context, triggerId strin } b.mu.Unlock() + if wasCritical { + b.metrics.DecStuckEvent(triggerId, eventId) + } + switch { case found: b.lggr.Infow("base trigger ACK matched in-memory pending event", @@ -343,6 +381,7 @@ func (b *BaseTriggerCapability[T]) AckEvent(ctx context.Context, triggerId strin b.metrics.IncAckMemoryOutcome("hit") b.metrics.IncAck(triggerId, eventId) b.metrics.ObserveTimeToAck(triggerId, eventId, time.Since(firstAt), attempts) + b.metrics.AddPendingEvents(-1) case hadNilPendingRecord: b.lggr.Warnw("base trigger ACK: pending map had nil record for event (treating as miss; reconciling store)", "capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId) @@ -396,8 +435,8 @@ func (b *BaseTriggerCapability[T]) scanPending() { return } - warnThreshold := 5 * interval - critThreshold := 20 * interval + warnThreshold := 1 * interval + critThreshold := 3 * interval b.mu.Lock() toResend := make([]PendingEvent, 0, len(b.pending)) @@ -433,6 +472,7 @@ func (b *BaseTriggerCapability[T]) scanPending() { if critThreshold > 0 && !state.emittedCritical && age >= critThreshold { b.metrics.EmitUndeliveredCritical(triggerID, eventID) + b.metrics.IncStuckEvent(triggerID, eventID) state.emittedCritical = true } } diff --git a/pkg/capabilities/base_trigger_metrics.go b/pkg/capabilities/base_trigger_metrics.go index 24466ddae..025af1db4 100644 --- a/pkg/capabilities/base_trigger_metrics.go +++ b/pkg/capabilities/base_trigger_metrics.go @@ -23,6 +23,8 @@ type BaseTriggerBeholderMetrics struct { timeToAckMs metric.Int64Histogram ackAttempts metric.Int64Histogram // attempts distribution at ACK time activeRegistrations metric.Int64UpDownCounter + pendingEvents metric.Int64UpDownCounter + stuckEvents metric.Int64UpDownCounter } var _ BaseTriggerMetrics = &BaseTriggerBeholderMetrics{} @@ -75,6 +77,16 @@ func NewBaseTriggerBeholderMetrics(capabilityID string) (BaseTriggerMetrics, err return nil, err } + pendingEvents, err := beholder.GetMeter().Int64UpDownCounter("capabilities_base_trigger_pending_events") + if err != nil { + return nil, err + } + + stuckEvents, err := beholder.GetMeter().Int64UpDownCounter("capabilities_base_trigger_stuck_events") + if err != nil { + return nil, err + } + return &BaseTriggerBeholderMetrics{ capabilityID: capabilityID, retryCount: retryCount, @@ -88,6 +100,8 @@ func NewBaseTriggerBeholderMetrics(capabilityID string) (BaseTriggerMetrics, err timeToAckMs: timeToAckMs, ackAttempts: ackAttempts, activeRegistrations: activeRegistrations, + pendingEvents: pendingEvents, + stuckEvents: stuckEvents, }, nil } @@ -180,6 +194,24 @@ func (m *BaseTriggerBeholderMetrics) EmitUndeliveredCritical(triggerID, eventID ) } +func (m *BaseTriggerBeholderMetrics) AddPendingEvents(delta int64) { + m.pendingEvents.Add(context.Background(), delta, + metric.WithAttributes(attribute.String("capability_id", m.capabilityID)), + ) +} + +func (m *BaseTriggerBeholderMetrics) IncStuckEvent(triggerID, eventID string) { + m.stuckEvents.Add(context.Background(), 1, + metric.WithAttributes(m.attrs(triggerID, eventID)...), + ) +} + +func (m *BaseTriggerBeholderMetrics) DecStuckEvent(triggerID, eventID string) { + m.stuckEvents.Add(context.Background(), -1, + metric.WithAttributes(m.attrs(triggerID, eventID)...), + ) +} + type noopBaseTriggerMetrics struct{} var _ BaseTriggerMetrics = &noopBaseTriggerMetrics{} @@ -195,3 +227,6 @@ func (noopBaseTriggerMetrics) EmitUndeliveredWarning(string, string) func (noopBaseTriggerMetrics) EmitUndeliveredCritical(string, string) {} func (noopBaseTriggerMetrics) IncAckError(string) {} func (noopBaseTriggerMetrics) IncAckMemoryOutcome(string) {} +func (noopBaseTriggerMetrics) AddPendingEvents(int64) {} +func (noopBaseTriggerMetrics) IncStuckEvent(string, string) {} +func (noopBaseTriggerMetrics) DecStuckEvent(string, string) {}