Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions pkg/capabilities/base_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ type BaseTriggerMetrics interface {
IncAckError(reason string)
// IncAckMemoryOutcome records how an ACK related to the in-memory pending map: hit, miss_no_trigger_bucket, miss_no_event, miss_nil_record.
IncAckMemoryOutcome(outcome string)
// AddPendingEvents adjusts the live gauge of events awaiting ACK. Positive on insert, negative on ACK/unregister.
AddPendingEvents(delta int64)
// IncStuckEvent increments the live gauge of events stuck past the critical undelivered threshold.
// Keyed by (capability_id, trigger_id, event_id) so you can see exactly which events are stuck.
IncStuckEvent(triggerID, eventID string)
// DecStuckEvent decrements the stuck-event gauge when a previously-critical event is ACKed or unregistered.
DecStuckEvent(triggerID, eventID string)
}

type undeliveredState struct {
Expand Down Expand Up @@ -185,6 +192,10 @@ func (b *BaseTriggerCapability[T]) Start(ctx context.Context) error {
}
b.mu.Unlock()

if n := int64(len(recs)); n > 0 {
b.metrics.AddPendingEvents(n)
}

b.wg.Add(1)
go func() {
defer b.wg.Done()
Expand Down Expand Up @@ -212,14 +223,32 @@ func (b *BaseTriggerCapability[T]) RegisterTrigger(triggerID string, sendCh chan
func (b *BaseTriggerCapability[T]) UnregisterTrigger(triggerID string) {
b.mu.Lock()
_, existed := b.inboxes[triggerID]
pendingCount := int64(len(b.pending[triggerID]))

var criticalEvents []string
if m, ok := b.undeliveredAlertStates[triggerID]; ok {
for eventID, s := range m {
if s != nil && s.emittedCritical {
criticalEvents = append(criticalEvents, eventID)
}
}
}

delete(b.inboxes, triggerID)
delete(b.pending, triggerID)
delete(b.undeliveredAlertStates, triggerID)
b.mu.Unlock()

for _, eventID := range criticalEvents {
b.metrics.DecStuckEvent(triggerID, eventID)
}

if existed {
b.metrics.DecActiveTriggers()
}
if pendingCount > 0 {
b.metrics.AddPendingEvents(-pendingCount)
}

if err := b.store.DeleteEventsForTrigger(b.ctx, triggerID); err != nil {
b.lggr.Errorf("Failed to delete events for trigger (TriggerID=%s): %v", triggerID, err)
Expand Down Expand Up @@ -258,6 +287,7 @@ func (b *BaseTriggerCapability[T]) DeliverEvent(
b.pending[triggerID][te.ID] = &rec
b.mu.Unlock()

b.metrics.AddPendingEvents(1)
b.trySend(rec)
return nil
}
Expand Down Expand Up @@ -327,14 +357,22 @@ func (b *BaseTriggerCapability[T]) AckEvent(ctx context.Context, triggerId strin
b.metrics.IncAckMemoryOutcome("miss_no_trigger_bucket")
}

var wasCritical bool
if m, ok := b.undeliveredAlertStates[triggerId]; ok {
if s, exists := m[eventId]; exists && s != nil && s.emittedCritical {
wasCritical = true
}
delete(m, eventId)
if len(m) == 0 {
delete(b.undeliveredAlertStates, triggerId)
}
}
b.mu.Unlock()

if wasCritical {
b.metrics.DecStuckEvent(triggerId, eventId)
}

switch {
case found:
b.lggr.Infow("base trigger ACK matched in-memory pending event",
Expand All @@ -343,6 +381,7 @@ func (b *BaseTriggerCapability[T]) AckEvent(ctx context.Context, triggerId strin
b.metrics.IncAckMemoryOutcome("hit")
b.metrics.IncAck(triggerId, eventId)
b.metrics.ObserveTimeToAck(triggerId, eventId, time.Since(firstAt), attempts)
b.metrics.AddPendingEvents(-1)
case hadNilPendingRecord:
b.lggr.Warnw("base trigger ACK: pending map had nil record for event (treating as miss; reconciling store)",
"capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId)
Expand Down Expand Up @@ -396,8 +435,8 @@ func (b *BaseTriggerCapability[T]) scanPending() {
return
}

warnThreshold := 5 * interval
critThreshold := 20 * interval
warnThreshold := 1 * interval
critThreshold := 3 * interval

b.mu.Lock()
toResend := make([]PendingEvent, 0, len(b.pending))
Expand Down Expand Up @@ -433,6 +472,7 @@ func (b *BaseTriggerCapability[T]) scanPending() {

if critThreshold > 0 && !state.emittedCritical && age >= critThreshold {
b.metrics.EmitUndeliveredCritical(triggerID, eventID)
b.metrics.IncStuckEvent(triggerID, eventID)
state.emittedCritical = true
}
}
Expand Down
35 changes: 35 additions & 0 deletions pkg/capabilities/base_trigger_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type BaseTriggerBeholderMetrics struct {
timeToAckMs metric.Int64Histogram
ackAttempts metric.Int64Histogram // attempts distribution at ACK time
activeRegistrations metric.Int64UpDownCounter
pendingEvents metric.Int64UpDownCounter
stuckEvents metric.Int64UpDownCounter
}

var _ BaseTriggerMetrics = &BaseTriggerBeholderMetrics{}
Expand Down Expand Up @@ -75,6 +77,16 @@ func NewBaseTriggerBeholderMetrics(capabilityID string) (BaseTriggerMetrics, err
return nil, err
}

pendingEvents, err := beholder.GetMeter().Int64UpDownCounter("capabilities_base_trigger_pending_events")
if err != nil {
return nil, err
}

stuckEvents, err := beholder.GetMeter().Int64UpDownCounter("capabilities_base_trigger_stuck_events")
if err != nil {
return nil, err
}

return &BaseTriggerBeholderMetrics{
capabilityID: capabilityID,
retryCount: retryCount,
Expand All @@ -88,6 +100,8 @@ func NewBaseTriggerBeholderMetrics(capabilityID string) (BaseTriggerMetrics, err
timeToAckMs: timeToAckMs,
ackAttempts: ackAttempts,
activeRegistrations: activeRegistrations,
pendingEvents: pendingEvents,
stuckEvents: stuckEvents,
}, nil
}

Expand Down Expand Up @@ -180,6 +194,24 @@ func (m *BaseTriggerBeholderMetrics) EmitUndeliveredCritical(triggerID, eventID
)
}

func (m *BaseTriggerBeholderMetrics) AddPendingEvents(delta int64) {
m.pendingEvents.Add(context.Background(), delta,
metric.WithAttributes(attribute.String("capability_id", m.capabilityID)),
)
}

func (m *BaseTriggerBeholderMetrics) IncStuckEvent(triggerID, eventID string) {
m.stuckEvents.Add(context.Background(), 1,
metric.WithAttributes(m.attrs(triggerID, eventID)...),
)
}

func (m *BaseTriggerBeholderMetrics) DecStuckEvent(triggerID, eventID string) {
m.stuckEvents.Add(context.Background(), -1,
metric.WithAttributes(m.attrs(triggerID, eventID)...),
)
}

type noopBaseTriggerMetrics struct{}

var _ BaseTriggerMetrics = &noopBaseTriggerMetrics{}
Expand All @@ -195,3 +227,6 @@ func (noopBaseTriggerMetrics) EmitUndeliveredWarning(string, string)
func (noopBaseTriggerMetrics) EmitUndeliveredCritical(string, string) {}
func (noopBaseTriggerMetrics) IncAckError(string) {}
func (noopBaseTriggerMetrics) IncAckMemoryOutcome(string) {}
func (noopBaseTriggerMetrics) AddPendingEvents(int64) {}
func (noopBaseTriggerMetrics) IncStuckEvent(string, string) {}
func (noopBaseTriggerMetrics) DecStuckEvent(string, string) {}
Loading