From 2c8e81633876d2bddfa85473fc24fde7d94c24b8 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Thu, 26 Mar 2026 11:03:11 -0400 Subject: [PATCH 1/2] Add state logging --- pkg/workflows/dontime/plugin.go | 6 ++++++ pkg/workflows/dontime/transmitter.go | 10 ++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/pkg/workflows/dontime/plugin.go b/pkg/workflows/dontime/plugin.go index 4cd521c630..3fa5d2f928 100644 --- a/pkg/workflows/dontime/plugin.go +++ b/pkg/workflows/dontime/plugin.go @@ -99,6 +99,11 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext, requests[req.WorkflowExecutionID] = int64(req.SeqNum) } + p.lggr.Infow("Observation built", + "requestsIncluded", len(requests), + "pendingRequestsInStore", len(p.store.GetRequests()), + ) + observation := &pb.Observation{ Timestamp: time.Now().UTC().UnixMilli(), Requests: requests, @@ -214,6 +219,7 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t p.lggr.Infow("Outcome computed", "observedDonTimesEntries", len(outcome.ObservedDonTimes), "outcomeSizeBytes", len(outcomeBytes), + "pendingRequestsInStore", len(p.store.GetRequests()), ) return outcomeBytes, err } diff --git a/pkg/workflows/dontime/transmitter.go b/pkg/workflows/dontime/transmitter.go index c9d30c3a5f..9ecd46248e 100644 --- a/pkg/workflows/dontime/transmitter.go +++ b/pkg/workflows/dontime/transmitter.go @@ -40,8 +40,7 @@ func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64 t.store.replaceDonTimes(currentDonTimes) t.store.setLastObservedDonTime(outcome.Timestamp) - t.lggr.Infow("Transmitting timestamps", "lastObservedDonTime", outcome.Timestamp) - + responsesDelivered := 0 for executionID, donTimes := range outcome.ObservedDonTimes { request := t.store.GetRequest(executionID) if request == nil { @@ -59,9 +58,16 @@ func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64 Timestamp: donTime, Err: nil, }) + responsesDelivered++ } } + t.lggr.Infow("Transmitting timestamps", + "lastObservedDonTime", outcome.Timestamp, + "observedDonTimesEntries", len(outcome.ObservedDonTimes), + "responsesDelivered", responsesDelivered, + ) + return nil } From e4e5b82ce66a9ded308407e9a3ff3aebbccdd0f7 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Mon, 30 Mar 2026 10:39:14 -0400 Subject: [PATCH 2/2] Add beholder metrics --- pkg/workflows/dontime/metrics.go | 67 ++++++++++++++++++++++++++++ pkg/workflows/dontime/plugin.go | 17 +++---- pkg/workflows/dontime/transmitter.go | 16 +++++-- 3 files changed, 88 insertions(+), 12 deletions(-) create mode 100644 pkg/workflows/dontime/metrics.go diff --git a/pkg/workflows/dontime/metrics.go b/pkg/workflows/dontime/metrics.go new file mode 100644 index 0000000000..3a36b13f68 --- /dev/null +++ b/pkg/workflows/dontime/metrics.go @@ -0,0 +1,67 @@ +package dontime + +import ( + "context" + "sync" + + "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +type BeholderMetrics struct { + responsesDelivered metric.Int64Counter + pendingRequests metric.Int64Gauge +} + +var sharedDontimeMetrics struct { + once sync.Once + m *BeholderMetrics +} + +func sharedBeholderMetrics(lggr logger.Logger) *BeholderMetrics { + sharedDontimeMetrics.once.Do(func() { + var err error + sharedDontimeMetrics.m, err = NewBeholderMetrics() + if err != nil { + lggr.Warnw("failed to initialize dontime beholder metrics; continuing without", "err", err) + } + }) + return sharedDontimeMetrics.m +} + +func NewBeholderMetrics() (*BeholderMetrics, error) { + responsesDelivered, err := beholder.GetMeter().Int64Counter( + "dontime_responses_delivered_total", + metric.WithDescription("Total DON time responses delivered to callers after OCR3 transmit"), + ) + if err != nil { + return nil, err + } + pendingRequests, err := beholder.GetMeter().Int64Gauge( + "dontime_pending_requests_in_store", + metric.WithDescription("Pending DON time requests in the local store"), + ) + if err != nil { + return nil, err + } + return &BeholderMetrics{ + responsesDelivered: responsesDelivered, + pendingRequests: pendingRequests, + }, nil +} + +func (m *BeholderMetrics) AddResponsesDelivered(ctx context.Context, n int64) { + if m == nil || n == 0 { + return + } + m.responsesDelivered.Add(ctx, n) +} + +func (m *BeholderMetrics) SetPendingRequestsInStore(ctx context.Context, n int64) { + if m == nil { + return + } + m.pendingRequests.Record(ctx, n) +} diff --git a/pkg/workflows/dontime/plugin.go b/pkg/workflows/dontime/plugin.go index 3fa5d2f928..ec57e3c20b 100644 --- a/pkg/workflows/dontime/plugin.go +++ b/pkg/workflows/dontime/plugin.go @@ -27,6 +27,7 @@ type Plugin struct { config ocr3types.ReportingPluginConfig offChainConfig *pb.Config lggr logger.Logger + metrics *BeholderMetrics batchSize int minTimeIncrease int64 @@ -45,11 +46,13 @@ func NewPlugin(store *Store, config ocr3types.ReportingPluginConfig, offchainCfg return nil, errors.New("execution removal time must be positive") } + namedLggr := logger.Named(lggr, "DONTimePlugin") return &Plugin{ store: store, config: config, offChainConfig: offchainCfg, - lggr: logger.Named(lggr, "DONTimePlugin"), + lggr: namedLggr, + metrics: sharedBeholderMetrics(namedLggr), batchSize: int(offchainCfg.MaxBatchSize), minTimeIncrease: offchainCfg.MinTimeIncrease / int64(time.Millisecond), }, nil @@ -59,7 +62,7 @@ func (p *Plugin) Query(_ context.Context, _ ocr3types.OutcomeContext) (types.Que return nil, nil } -func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext, query types.Query) (types.Observation, error) { +func (p *Plugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContext, query types.Query) (types.Observation, error) { previousOutcome := &pb.Outcome{} if err := proto.Unmarshal(outctx.PreviousOutcome, previousOutcome); err != nil { p.lggr.Errorf("failed to unmarshal previous outcome in Observation phase") @@ -99,10 +102,8 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext, requests[req.WorkflowExecutionID] = int64(req.SeqNum) } - p.lggr.Infow("Observation built", - "requestsIncluded", len(requests), - "pendingRequestsInStore", len(p.store.GetRequests()), - ) + p.lggr.Infow("Observation built", "requestsIncluded", len(requests)) + p.metrics.SetPendingRequestsInStore(ctx, int64(len(p.store.GetRequests()))) observation := &pb.Observation{ Timestamp: time.Now().UTC().UnixMilli(), @@ -120,7 +121,7 @@ func (p *Plugin) ObservationQuorum(_ context.Context, _ ocr3types.OutcomeContext return quorumhelper.ObservationCountReachesObservationQuorum(quorumhelper.QuorumTwoFPlusOne, p.config.N, p.config.F, aos), nil } -func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) { +func (p *Plugin) Outcome(ctx context.Context, outctx ocr3types.OutcomeContext, _ types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) { observationCounts := map[string]int64{} // counts how many nodes reported where a new DON timestamp might be needed type timestampNodePair struct { Timestamp int64 @@ -219,8 +220,8 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t p.lggr.Infow("Outcome computed", "observedDonTimesEntries", len(outcome.ObservedDonTimes), "outcomeSizeBytes", len(outcomeBytes), - "pendingRequestsInStore", len(p.store.GetRequests()), ) + p.metrics.SetPendingRequestsInStore(ctx, int64(len(p.store.GetRequests()))) return outcomeBytes, err } diff --git a/pkg/workflows/dontime/transmitter.go b/pkg/workflows/dontime/transmitter.go index 9ecd46248e..ce80fea369 100644 --- a/pkg/workflows/dontime/transmitter.go +++ b/pkg/workflows/dontime/transmitter.go @@ -20,13 +20,19 @@ type Transmitter struct { lggr logger.Logger store *Store fromAccount types.Account + metrics *BeholderMetrics } func NewTransmitter(lggr logger.Logger, store *Store, fromAccount types.Account) *Transmitter { - return &Transmitter{lggr: lggr, store: store, fromAccount: fromAccount} + return &Transmitter{ + lggr: lggr, + store: store, + fromAccount: fromAccount, + metrics: sharedBeholderMetrics(lggr), + } } -func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64, r ocr3types.ReportWithInfo[[]byte], _ []types.AttributedOnchainSignature) error { +func (t *Transmitter) Transmit(ctx context.Context, _ types.ConfigDigest, _ uint64, r ocr3types.ReportWithInfo[[]byte], _ []types.AttributedOnchainSignature) error { outcome := &pb.Outcome{} if err := proto.Unmarshal(r.Report, outcome); err != nil { t.lggr.Errorf("failed to unmarshal report") @@ -40,7 +46,7 @@ func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64 t.store.replaceDonTimes(currentDonTimes) t.store.setLastObservedDonTime(outcome.Timestamp) - responsesDelivered := 0 + var responsesDelivered int64 for executionID, donTimes := range outcome.ObservedDonTimes { request := t.store.GetRequest(executionID) if request == nil { @@ -62,10 +68,12 @@ func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64 } } + t.metrics.AddResponsesDelivered(ctx, responsesDelivered) + t.metrics.SetPendingRequestsInStore(ctx, int64(len(t.store.GetRequests()))) + t.lggr.Infow("Transmitting timestamps", "lastObservedDonTime", outcome.Timestamp, "observedDonTimesEntries", len(outcome.ObservedDonTimes), - "responsesDelivered", responsesDelivered, ) return nil