Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
6163b6a
1st commit
thomaska Feb 27, 2026
5ec2dec
Amend chip_ingress_batch_emitter_test.go
thomaska Feb 27, 2026
b4bc8fe
Merge branch 'main' into infoplat-3436-chipingress-publishBatch
thomaska Feb 27, 2026
ab95437
Fix comment
thomaska Feb 27, 2026
f31d3e8
batchEmitter -> batchEmitterService
thomaska Mar 2, 2026
ba186b7
Add feature flag
thomaska Mar 2, 2026
dc9e235
Make logger last parameter
thomaska Mar 2, 2026
38c7ca3
Omit ChipIngressBatchEmitter.start
thomaska Mar 2, 2026
7925a67
Merge branch 'main' into infoplat-3436-chipingress-publishBatch
thomaska Mar 2, 2026
616ac1e
Add retries and drain
thomaska Mar 4, 2026
c57a9b0
Use idiomatic select
thomaska Mar 4, 2026
c936cda
Merge branch 'main' into infoplat-3436-chipingress-publishBatch
thomaska Mar 4, 2026
b55aafd
Various fixes
thomaska Mar 4, 2026
58a8dd9
Update defaults
thomaska Mar 4, 2026
7871a2c
Change default timeout to 5s
thomaska Mar 4, 2026
d7f42dc
Remove retries, add max workers
thomaska Mar 5, 2026
af757a1
Fix draining, remove unused
thomaska Mar 5, 2026
32ebc25
Change defaults
thomaska Mar 5, 2026
d458126
Use batch emitter client
thomaska Mar 6, 2026
314b0f5
Increase defaultMaxConcurrentSends 3->10
thomaska Mar 6, 2026
689ff49
Use application logger
thomaska Mar 6, 2026
86ca238
Small fixes
thomaska Mar 6, 2026
ac45d12
Amend comment
thomaska Mar 6, 2026
ffad1ed
Add missing logger
thomaska Mar 9, 2026
71db03e
Fix regression
thomaska Mar 12, 2026
a66147b
Switch to single client
thomaska Mar 12, 2026
16aa709
Use caller ctx
thomaska Mar 16, 2026
3048466
Add batch emitter in services
thomaska Mar 18, 2026
7257a19
BatchEmitter + Service
thomaska Mar 18, 2026
e82a57e
remove emit_only_adapter - amend loop server
thomaska Mar 18, 2026
0ea9837
Merge branch 'main' into infoplat-3436-chipingress-publishBatch
thomaska Mar 20, 2026
2c481d0
test(beholder): close writer client in global test
pkcll Mar 26, 2026
08cc1e6
Merge branch 'upstream-main-2026-03-25' into infoplat-3436-chipingres…
pkcll Mar 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,5 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20251029180050-ab9386a59fda // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

replace github.com/smartcontractkit/chainlink-common/pkg/chipingress => ./pkg/chipingress
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove

2 changes: 0 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

197 changes: 197 additions & 0 deletions pkg/beholder/chip_ingress_batch_emitter_service.go
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pkg/beholder/chip_ingress_batch_emitter.go -> pkg/beholder/chip_ingress_batch_emitter_service.go ?

Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package beholder

import (
"context"
"fmt"
"sync"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"

"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/batch"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
)

// ChipIngressBatchEmitterService batches events and sends them via chipingress.Client.PublishBatch.
// It implements the Emitter interface.
type ChipIngressBatchEmitterService struct {
services.Service
eng *services.Engine

batchClient *batch.Client

metricAttrsCache sync.Map // map[string]otelmetric.MeasurementOption
metrics batchEmitterMetrics
}

type batchEmitterMetrics struct {
eventsSent otelmetric.Int64Counter
eventsDropped otelmetric.Int64Counter
}

// NewChipIngressBatchEmitterService creates a batch emitter service backed by the given chipingress client.
func NewChipIngressBatchEmitterService(client chipingress.Client, cfg Config, lggr logger.Logger) (*ChipIngressBatchEmitterService, error) {
if client == nil {
return nil, fmt.Errorf("chip ingress client is nil")
}

bufferSize := int(cfg.ChipIngressBufferSize)
if bufferSize == 0 {
bufferSize = 1000
}
maxBatchSize := int(cfg.ChipIngressMaxBatchSize)
if maxBatchSize == 0 {
maxBatchSize = 500
}
maxConcurrentSends := cfg.ChipIngressMaxConcurrentSends
if maxConcurrentSends == 0 {
maxConcurrentSends = defaultMaxConcurrentSends
}
sendInterval := cfg.ChipIngressSendInterval
if sendInterval == 0 {
sendInterval = 100 * time.Millisecond
}
sendTimeout := cfg.ChipIngressSendTimeout
if sendTimeout == 0 {
sendTimeout = 3 * time.Second
}
drainTimeout := cfg.ChipIngressDrainTimeout
if drainTimeout == 0 {
drainTimeout = 10 * time.Second
}

meter := otel.Meter("beholder/chip_ingress_batch_emitter")
metrics, err := newBatchEmitterMetrics(meter)
if err != nil {
return nil, fmt.Errorf("failed to create batch emitter metrics: %w", err)
}

batchClient, err := batch.NewBatchClient(client,
batch.WithBatchSize(maxBatchSize),
batch.WithMessageBuffer(bufferSize),
batch.WithBatchInterval(sendInterval),
batch.WithMaxPublishTimeout(sendTimeout),
batch.WithShutdownTimeout(drainTimeout),
batch.WithMaxConcurrentSends(maxConcurrentSends),
batch.WithEventClone(false),
)
if err != nil {
return nil, fmt.Errorf("failed to create batch client: %w", err)
}

e := &ChipIngressBatchEmitterService{
batchClient: batchClient,
metrics: metrics,
}

e.Service, e.eng = services.Config{
Name: "ChipIngressBatchEmitterService",
}.NewServiceEngine(lggr)

e.eng.Go(func(ctx context.Context) {
batchClient.Start(ctx)
<-ctx.Done()
batchClient.Stop()
})

return e, nil
}

// Emit queues an event for batched delivery without blocking.
// Returns an error if the emitter is stopped or the context is cancelled.
// If the buffer is full, the event is silently dropped.
func (e *ChipIngressBatchEmitterService) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
return e.emitInternal(ctx, body, nil, attrKVs...)
}

// EmitWithCallback works like Emit but invokes callback once the event's fate
// is determined (nil on success, non-nil on failure or buffer-full drop).
//
// If EmitWithCallback returns a non-nil error, the callback will NOT be invoked.
// If it returns nil, the callback is guaranteed to fire exactly once.
func (e *ChipIngressBatchEmitterService) EmitWithCallback(ctx context.Context, body []byte, callback func(error), attrKVs ...any) error {
return e.emitInternal(ctx, body, callback, attrKVs...)
}

func (e *ChipIngressBatchEmitterService) emitInternal(ctx context.Context, body []byte, callback func(error), attrKVs ...any) error {
return e.eng.IfNotStopped(func() error {
domain, entity, err := ExtractSourceAndType(attrKVs...)
if err != nil {
return err
}

attributes := newAttributes(attrKVs...)

event, err := chipingress.NewEvent(domain, entity, body, attributes)
if err != nil {
return fmt.Errorf("failed to create CloudEvent: %w", err)
}
eventPb, err := chipingress.EventToProto(event)
if err != nil {
return fmt.Errorf("failed to convert to proto: %w", err)
}

if err := ctx.Err(); err != nil {
return err
}

metricAttrs := e.metricAttrsFor(domain, entity)

queueErr := e.batchClient.QueueMessage(eventPb, func(sendErr error) {
if sendErr != nil {
e.metrics.eventsDropped.Add(ctx, 1, metricAttrs)
} else {
e.metrics.eventsSent.Add(ctx, 1, metricAttrs)
}
if callback != nil {
callback(sendErr)
}
})
if queueErr != nil {
e.metrics.eventsDropped.Add(ctx, 1, metricAttrs)
if callback != nil {
callback(queueErr)
}
}

return nil
})
}

func (e *ChipIngressBatchEmitterService) metricAttrsFor(domain, entity string) otelmetric.MeasurementOption {
key := domain + "\x00" + entity
if v, ok := e.metricAttrsCache.Load(key); ok {
return v.(otelmetric.MeasurementOption)
}
attrs := otelmetric.WithAttributeSet(attribute.NewSet(
attribute.String("domain", domain),
attribute.String("entity", entity),
))
v, _ := e.metricAttrsCache.LoadOrStore(key, attrs)
return v.(otelmetric.MeasurementOption)
}

func newBatchEmitterMetrics(meter otelmetric.Meter) (batchEmitterMetrics, error) {
eventsSent, err := meter.Int64Counter("chip_ingress.events_sent",
otelmetric.WithDescription("Total events successfully sent via PublishBatch"),
otelmetric.WithUnit("{event}"))
if err != nil {
return batchEmitterMetrics{}, err
}

eventsDropped, err := meter.Int64Counter("chip_ingress.events_dropped",
otelmetric.WithDescription("Total events dropped (buffer full or send failure)"),
otelmetric.WithUnit("{event}"))
if err != nil {
return batchEmitterMetrics{}, err
}

return batchEmitterMetrics{
eventsSent: eventsSent,
eventsDropped: eventsDropped,
}, nil
}
Loading
Loading