Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions pkg/workflows/dontime/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package dontime

import (
"context"
"sync"

"go.opentelemetry.io/otel/metric"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

// BeholderMetrics bundles the metric instruments used by the dontime package.
// Instances are built by NewBeholderMetrics; the methods on *BeholderMetrics
// accept a nil receiver and do nothing in that case.
type BeholderMetrics struct {
// responsesDelivered is the "dontime_responses_delivered_total" counter.
responsesDelivered metric.Int64Counter
// pendingRequests is the "dontime_pending_requests_in_store" gauge.
pendingRequests metric.Int64Gauge
}

// sharedDontimeMetrics holds the package-wide BeholderMetrics instance together
// with the sync.Once that guards its one-time creation in sharedBeholderMetrics.
var sharedDontimeMetrics struct {
// once makes sure the initialization closure runs a single time.
once sync.Once
// m is the shared instance; it stays nil when NewBeholderMetrics returned an error.
m *BeholderMetrics
}

// sharedBeholderMetrics returns the package-wide BeholderMetrics, creating it
// on the first call. When creation fails, a warning is written to the lggr of
// that first caller and nil is handed back to it and to every later caller;
// the BeholderMetrics methods treat a nil receiver as a no-op, so callers can
// use the result without checking it.
func sharedBeholderMetrics(lggr logger.Logger) *BeholderMetrics {
	shared := &sharedDontimeMetrics
	shared.once.Do(func() {
		created, initErr := NewBeholderMetrics()
		// Store the result first (nil on failure), then report the failure.
		shared.m = created
		if initErr != nil {
			lggr.Warnw("failed to initialize dontime beholder metrics; continuing without", "err", initErr)
		}
	})
	return shared.m
}

// NewBeholderMetrics creates the dontime instruments on the beholder meter:
// the "dontime_responses_delivered_total" counter and the
// "dontime_pending_requests_in_store" gauge. If either instrument cannot be
// created, that error is returned unchanged together with a nil result.
func NewBeholderMetrics() (*BeholderMetrics, error) {
	deliveredDesc := metric.WithDescription("Total DON time responses delivered to callers after OCR3 transmit")
	counter, counterErr := beholder.GetMeter().Int64Counter("dontime_responses_delivered_total", deliveredDesc)
	if counterErr != nil {
		return nil, counterErr
	}

	pendingDesc := metric.WithDescription("Pending DON time requests in the local store")
	gauge, gaugeErr := beholder.GetMeter().Int64Gauge("dontime_pending_requests_in_store", pendingDesc)
	if gaugeErr != nil {
		return nil, gaugeErr
	}

	metrics := new(BeholderMetrics)
	metrics.responsesDelivered = counter
	metrics.pendingRequests = gauge
	return metrics, nil
}

// AddResponsesDelivered adds n to the responses-delivered counter.
//
// The call is a no-op when m is nil (metrics were never initialized) or when
// n is not positive. The instrument is a counter, which is meant to record
// only increasing values, so a zero or negative delta is dropped here rather
// than forwarded to the instrument.
func (m *BeholderMetrics) AddResponsesDelivered(ctx context.Context, n int64) {
	if m == nil || n <= 0 {
		return
	}
	m.responsesDelivered.Add(ctx, n)
}

// SetPendingRequestsInStore records n as the current value of the
// pending-requests gauge. A nil receiver is accepted and makes the call a
// no-op.
func (m *BeholderMetrics) SetPendingRequestsInStore(ctx context.Context, n int64) {
	if m != nil {
		m.pendingRequests.Record(ctx, n)
	}
}
13 changes: 10 additions & 3 deletions pkg/workflows/dontime/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Plugin struct {
config ocr3types.ReportingPluginConfig
offChainConfig *pb.Config
lggr logger.Logger
metrics *BeholderMetrics

batchSize int
minTimeIncrease int64
Expand All @@ -45,11 +46,13 @@ func NewPlugin(store *Store, config ocr3types.ReportingPluginConfig, offchainCfg
return nil, errors.New("execution removal time must be positive")
}

namedLggr := logger.Named(lggr, "DONTimePlugin")
return &Plugin{
store: store,
config: config,
offChainConfig: offchainCfg,
lggr: logger.Named(lggr, "DONTimePlugin"),
lggr: namedLggr,
metrics: sharedBeholderMetrics(namedLggr),
batchSize: int(offchainCfg.MaxBatchSize),
minTimeIncrease: offchainCfg.MinTimeIncrease / int64(time.Millisecond),
}, nil
Expand All @@ -59,7 +62,7 @@ func (p *Plugin) Query(_ context.Context, _ ocr3types.OutcomeContext) (types.Que
return nil, nil
}

func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext, query types.Query) (types.Observation, error) {
func (p *Plugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContext, query types.Query) (types.Observation, error) {
previousOutcome := &pb.Outcome{}
if err := proto.Unmarshal(outctx.PreviousOutcome, previousOutcome); err != nil {
p.lggr.Errorf("failed to unmarshal previous outcome in Observation phase")
Expand Down Expand Up @@ -99,6 +102,9 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext,
requests[req.WorkflowExecutionID] = int64(req.SeqNum)
}

p.lggr.Infow("Observation built", "requestsIncluded", len(requests))
p.metrics.SetPendingRequestsInStore(ctx, int64(len(p.store.GetRequests())))

observation := &pb.Observation{
Timestamp: time.Now().UTC().UnixMilli(),
Requests: requests,
Expand All @@ -115,7 +121,7 @@ func (p *Plugin) ObservationQuorum(_ context.Context, _ ocr3types.OutcomeContext
return quorumhelper.ObservationCountReachesObservationQuorum(quorumhelper.QuorumTwoFPlusOne, p.config.N, p.config.F, aos), nil
}

func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) {
func (p *Plugin) Outcome(ctx context.Context, outctx ocr3types.OutcomeContext, _ types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) {
observationCounts := map[string]int64{} // counts how many nodes reported where a new DON timestamp might be needed
type timestampNodePair struct {
Timestamp int64
Expand Down Expand Up @@ -215,6 +221,7 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
"observedDonTimesEntries", len(outcome.ObservedDonTimes),
"outcomeSizeBytes", len(outcomeBytes),
)
p.metrics.SetPendingRequestsInStore(ctx, int64(len(p.store.GetRequests())))
return outcomeBytes, err
}

Expand Down
22 changes: 18 additions & 4 deletions pkg/workflows/dontime/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@ type Transmitter struct {
lggr logger.Logger
store *Store
fromAccount types.Account
metrics *BeholderMetrics
}

// NewTransmitter builds a Transmitter from the given logger, store and
// account, and attaches the package-wide metrics obtained from
// sharedBeholderMetrics (which may be nil if metrics could not be created).
//
// The constructor has a single return: a leftover field-less return placed
// before this literal would make the metrics assignment unreachable and leave
// Transmitter.metrics nil.
func NewTransmitter(lggr logger.Logger, store *Store, fromAccount types.Account) *Transmitter {
	return &Transmitter{
		lggr:        lggr,
		store:       store,
		fromAccount: fromAccount,
		metrics:     sharedBeholderMetrics(lggr),
	}
}

func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64, r ocr3types.ReportWithInfo[[]byte], _ []types.AttributedOnchainSignature) error {
func (t *Transmitter) Transmit(ctx context.Context, _ types.ConfigDigest, _ uint64, r ocr3types.ReportWithInfo[[]byte], _ []types.AttributedOnchainSignature) error {
outcome := &pb.Outcome{}
if err := proto.Unmarshal(r.Report, outcome); err != nil {
t.lggr.Errorf("failed to unmarshal report")
Expand All @@ -40,8 +46,7 @@ func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64
t.store.replaceDonTimes(currentDonTimes)
t.store.setLastObservedDonTime(outcome.Timestamp)

t.lggr.Infow("Transmitting timestamps", "lastObservedDonTime", outcome.Timestamp)

var responsesDelivered int64
for executionID, donTimes := range outcome.ObservedDonTimes {
request := t.store.GetRequest(executionID)
if request == nil {
Expand All @@ -59,9 +64,18 @@ func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64
Timestamp: donTime,
Err: nil,
})
responsesDelivered++
}
}

t.metrics.AddResponsesDelivered(ctx, responsesDelivered)
t.metrics.SetPendingRequestsInStore(ctx, int64(len(t.store.GetRequests())))

t.lggr.Infow("Transmitting timestamps",
"lastObservedDonTime", outcome.Timestamp,
"observedDonTimesEntries", len(outcome.ObservedDonTimes),
)

return nil
}

Expand Down
Loading