diff --git a/go.mod b/go.mod index 644e6bb1c9..121f0dfede 100644 --- a/go.mod +++ b/go.mod @@ -157,3 +157,5 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20251029180050-ab9386a59fda // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) + +replace github.com/smartcontractkit/chainlink-common/pkg/chipingress => ./pkg/chipingress diff --git a/go.sum b/go.sum index efe63f2888..6be61147d4 100644 --- a/go.sum +++ b/go.sum @@ -328,8 +328,6 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/smartcontractkit/chain-selectors v1.0.89 h1:L9oWZGqQXWyTPnC6ODXgu3b0DFyLmJ9eHv+uJrE9IZY= github.com/smartcontractkit/chain-selectors v1.0.89/go.mod h1:qy7whtgG5g+7z0jt0nRyii9bLND9m15NZTzuQPkMZ5w= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 h1:GCzrxDWn3b7jFfEA+WiYRi8CKoegsayiDoJBCjYkneE= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260320153346-314ec8dbe5a4 h1:fkS5FJpSozwxL2FA6OJDi7az2DrtMNiK1X5DWuHDyfA= diff --git a/pkg/beholder/chip_ingress_batch_emitter_service.go b/pkg/beholder/chip_ingress_batch_emitter_service.go new file mode 100644 index 0000000000..427941e3f5 --- /dev/null +++ b/pkg/beholder/chip_ingress_batch_emitter_service.go @@ -0,0 +1,197 @@ +package beholder + +import ( + "context" + "fmt" + "sync" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + otelmetric "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/batch" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" +) + +// ChipIngressBatchEmitterService batches events and sends them via chipingress.Client.PublishBatch. +// It implements the Emitter interface. +type ChipIngressBatchEmitterService struct { + services.Service + eng *services.Engine + + batchClient *batch.Client + + metricAttrsCache sync.Map // map[string]otelmetric.MeasurementOption + metrics batchEmitterMetrics +} + +type batchEmitterMetrics struct { + eventsSent otelmetric.Int64Counter + eventsDropped otelmetric.Int64Counter +} + +// NewChipIngressBatchEmitterService creates a batch emitter service backed by the given chipingress client. 
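+// Zero-valued Config fields fall back to the documented defaults (buffer size
+// 1000, max batch size 500, 100ms send interval, 3s send timeout, 10s drain
+// timeout, 10 concurrent sends).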
+func NewChipIngressBatchEmitterService(client chipingress.Client, cfg Config, lggr logger.Logger) (*ChipIngressBatchEmitterService, error) { + if client == nil { + return nil, fmt.Errorf("chip ingress client is nil") + } + + bufferSize := int(cfg.ChipIngressBufferSize) + if bufferSize == 0 { + bufferSize = 1000 + } + maxBatchSize := int(cfg.ChipIngressMaxBatchSize) + if maxBatchSize == 0 { + maxBatchSize = 500 + } + maxConcurrentSends := cfg.ChipIngressMaxConcurrentSends + if maxConcurrentSends == 0 { + maxConcurrentSends = defaultMaxConcurrentSends + } + sendInterval := cfg.ChipIngressSendInterval + if sendInterval == 0 { + sendInterval = 100 * time.Millisecond + } + sendTimeout := cfg.ChipIngressSendTimeout + if sendTimeout == 0 { + sendTimeout = 3 * time.Second + } + drainTimeout := cfg.ChipIngressDrainTimeout + if drainTimeout == 0 { + drainTimeout = 10 * time.Second + } + + meter := otel.Meter("beholder/chip_ingress_batch_emitter") + metrics, err := newBatchEmitterMetrics(meter) + if err != nil { + return nil, fmt.Errorf("failed to create batch emitter metrics: %w", err) + } + + batchClient, err := batch.NewBatchClient(client, + batch.WithBatchSize(maxBatchSize), + batch.WithMessageBuffer(bufferSize), + batch.WithBatchInterval(sendInterval), + batch.WithMaxPublishTimeout(sendTimeout), + batch.WithShutdownTimeout(drainTimeout), + batch.WithMaxConcurrentSends(maxConcurrentSends), + batch.WithEventClone(false), + ) + if err != nil { + return nil, fmt.Errorf("failed to create batch client: %w", err) + } + + e := &ChipIngressBatchEmitterService{ + batchClient: batchClient, + metrics: metrics, + } + + e.Service, e.eng = services.Config{ + Name: "ChipIngressBatchEmitterService", + }.NewServiceEngine(lggr) + + e.eng.Go(func(ctx context.Context) { + batchClient.Start(ctx) + <-ctx.Done() + batchClient.Stop() + }) + + return e, nil +} + +// Emit queues an event for batched delivery without blocking. +// Returns an error if the emitter is stopped or the context is cancelled. +// If the buffer is full, the event is silently dropped. +func (e *ChipIngressBatchEmitterService) Emit(ctx context.Context, body []byte, attrKVs ...any) error { + return e.emitInternal(ctx, body, nil, attrKVs...) +} + +// EmitWithCallback works like Emit but invokes callback once the event's fate +// is determined (nil on success, non-nil on failure or buffer-full drop). +// +// If EmitWithCallback returns a non-nil error, the callback will NOT be invoked. +// If it returns nil, the callback is guaranteed to fire exactly once. +func (e *ChipIngressBatchEmitterService) EmitWithCallback(ctx context.Context, body []byte, callback func(error), attrKVs ...any) error { + return e.emitInternal(ctx, body, callback, attrKVs...) +} + +func (e *ChipIngressBatchEmitterService) emitInternal(ctx context.Context, body []byte, callback func(error), attrKVs ...any) error { + return e.eng.IfNotStopped(func() error { + domain, entity, err := ExtractSourceAndType(attrKVs...) + if err != nil { + return err + } + + attributes := newAttributes(attrKVs...) 
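+		// domain and entity become the CloudEvent's source and type; the full
+		// attribute map is passed along to event construction.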
+ + event, err := chipingress.NewEvent(domain, entity, body, attributes) + if err != nil { + return fmt.Errorf("failed to create CloudEvent: %w", err) + } + eventPb, err := chipingress.EventToProto(event) + if err != nil { + return fmt.Errorf("failed to convert to proto: %w", err) + } + + if err := ctx.Err(); err != nil { + return err + } + + metricAttrs := e.metricAttrsFor(domain, entity) + + queueErr := e.batchClient.QueueMessage(eventPb, func(sendErr error) { + if sendErr != nil { + e.metrics.eventsDropped.Add(ctx, 1, metricAttrs) + } else { + e.metrics.eventsSent.Add(ctx, 1, metricAttrs) + } + if callback != nil { + callback(sendErr) + } + }) + if queueErr != nil { + e.metrics.eventsDropped.Add(ctx, 1, metricAttrs) + if callback != nil { + callback(queueErr) + } + } + + return nil + }) +} + +func (e *ChipIngressBatchEmitterService) metricAttrsFor(domain, entity string) otelmetric.MeasurementOption { + key := domain + "\x00" + entity + if v, ok := e.metricAttrsCache.Load(key); ok { + return v.(otelmetric.MeasurementOption) + } + attrs := otelmetric.WithAttributeSet(attribute.NewSet( + attribute.String("domain", domain), + attribute.String("entity", entity), + )) + v, _ := e.metricAttrsCache.LoadOrStore(key, attrs) + return v.(otelmetric.MeasurementOption) +} + +func newBatchEmitterMetrics(meter otelmetric.Meter) (batchEmitterMetrics, error) { + eventsSent, err := meter.Int64Counter("chip_ingress.events_sent", + otelmetric.WithDescription("Total events successfully sent via PublishBatch"), + otelmetric.WithUnit("{event}")) + if err != nil { + return batchEmitterMetrics{}, err + } + + eventsDropped, err := meter.Int64Counter("chip_ingress.events_dropped", + otelmetric.WithDescription("Total events dropped (buffer full or send failure)"), + otelmetric.WithUnit("{event}")) + if err != nil { + return batchEmitterMetrics{}, err + } + + return batchEmitterMetrics{ + eventsSent: eventsSent, + eventsDropped: eventsDropped, + }, nil +} diff --git a/pkg/beholder/chip_ingress_batch_emitter_service_test.go b/pkg/beholder/chip_ingress_batch_emitter_service_test.go new file mode 100644 index 0000000000..988647a56d --- /dev/null +++ b/pkg/beholder/chip_ingress_batch_emitter_service_test.go @@ -0,0 +1,429 @@ +package beholder_test + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +func newTestConfig() beholder.Config { + return beholder.Config{ + ChipIngressBufferSize: 10, + ChipIngressMaxBatchSize: 5, + ChipIngressMaxConcurrentSends: 3, + ChipIngressSendInterval: 50 * time.Millisecond, + ChipIngressSendTimeout: 5 * time.Second, + ChipIngressDrainTimeout: 5 * time.Second, + } +} + +func newTestLogger(t *testing.T) logger.Logger { + t.Helper() + lggr, err := logger.New() + require.NoError(t, err) + t.Cleanup(func() { _ = lggr.Sync() }) + return lggr +} + +func TestNewChipIngressBatchEmitterService(t *testing.T) { + t.Run("happy path", func(t *testing.T) { + clientMock := mocks.NewClient(t) + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, newTestConfig(), newTestLogger(t)) + require.NoError(t, err) + assert.NotNil(t, emitter) + }) + + t.Run("returns error when client is nil", func(t *testing.T) { + 
emitter, err := beholder.NewChipIngressBatchEmitterService(nil, newTestConfig(), newTestLogger(t)) + assert.Error(t, err) + assert.Nil(t, emitter) + }) +} + +func TestChipIngressBatchEmitterService_Emit(t *testing.T) { + t.Run("returns error when domain/entity missing", func(t *testing.T) { + clientMock := mocks.NewClient(t) + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, newTestConfig(), newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + defer emitter.Close() //nolint:errcheck + + err = emitter.Emit(t.Context(), []byte("test"), "bad_key", "bad_value") + assert.Error(t, err) + }) + + t.Run("events are batched and sent via PublishBatch", func(t *testing.T) { + clientMock := mocks.NewClient(t) + + var mu sync.Mutex + var receivedBatches []*chipingress.CloudEventBatch + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + mu.Lock() + defer mu.Unlock() + batch := args.Get(1).(*chipingress.CloudEventBatch) + receivedBatches = append(receivedBatches, batch) + }). + Return(nil, nil) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + for i := 0; i < 3; i++ { + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + } + + assert.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return len(receivedBatches) > 0 + }, 2*time.Second, 10*time.Millisecond) + + require.NoError(t, emitter.Close()) + + mu.Lock() + defer mu.Unlock() + + totalEvents := 0 + for _, batch := range receivedBatches { + totalEvents += len(batch.Events) + } + assert.Equal(t, 3, totalEvents) + }) +} + +func TestChipIngressBatchEmitterService_CloudEventFormat(t *testing.T) { + clientMock := mocks.NewClient(t) + + var mu sync.Mutex + var receivedBatch *chipingress.CloudEventBatch + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + mu.Lock() + defer mu.Unlock() + receivedBatch = args.Get(1).(*chipingress.CloudEventBatch) + }). + Return(nil, nil) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + err = emitter.Emit(t.Context(), []byte("test-payload"), + beholder.AttrKeyDomain, "my-domain", + beholder.AttrKeyEntity, "my-entity", + ) + require.NoError(t, err) + + assert.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return receivedBatch != nil + }, 2*time.Second, 10*time.Millisecond) + + require.NoError(t, emitter.Close()) + + mu.Lock() + defer mu.Unlock() + require.Len(t, receivedBatch.Events, 1) + + event := receivedBatch.Events[0] + assert.Equal(t, "my-domain", event.Source) + assert.Equal(t, "my-entity", event.Type) + assert.NotEmpty(t, event.Id) +} + +func TestChipIngressBatchEmitterService_PublishBatchError(t *testing.T) { + clientMock := mocks.NewClient(t) + + var mu sync.Mutex + callCount := 0 + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Run(func(_ mock.Arguments) { + mu.Lock() + defer mu.Unlock() + callCount++ + }). 
+ Return(nil, assert.AnError) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + for i := 0; i < 3; i++ { + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + } + + assert.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return callCount > 0 + }, 2*time.Second, 10*time.Millisecond) + + require.NoError(t, emitter.Close()) +} + +func TestChipIngressBatchEmitterService_ContextCancellation(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, nil). + Maybe() + + cfg := newTestConfig() + cfg.ChipIngressBufferSize = 1 + cfg.ChipIngressSendInterval = 10 * time.Second + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + defer emitter.Close() //nolint:errcheck + + ctx, cancel := context.WithCancel(t.Context()) + cancel() + + err = emitter.Emit(ctx, []byte("should-fail"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + assert.ErrorIs(t, err, context.Canceled) +} + +func TestChipIngressBatchEmitterService_DefaultConfig(t *testing.T) { + clientMock := mocks.NewClient(t) + + var mu sync.Mutex + var receivedBatch *chipingress.CloudEventBatch + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + mu.Lock() + defer mu.Unlock() + receivedBatch = args.Get(1).(*chipingress.CloudEventBatch) + }). + Return(nil, nil) + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, beholder.Config{}, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + + assert.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return receivedBatch != nil + }, 3*time.Second, 50*time.Millisecond) + + require.NoError(t, emitter.Close()) + + mu.Lock() + defer mu.Unlock() + require.Len(t, receivedBatch.Events, 1) +} + +func TestChipIngressBatchEmitterService_EmitAfterClose(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, nil). + Maybe() + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, newTestConfig(), newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + require.NoError(t, emitter.Close()) + + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + assert.Error(t, err) +} + +func TestChipIngressBatchEmitterService_EmitWithCallback(t *testing.T) { + t.Run("callback receives nil on success", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). 
+ Return(nil, nil) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + done := make(chan error, 1) + err = emitter.EmitWithCallback(t.Context(), []byte("body"), func(sendErr error) { + done <- sendErr + }, + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + + select { + case sendErr := <-done: + assert.NoError(t, sendErr) + case <-time.After(3 * time.Second): + t.Fatal("callback was not invoked within timeout") + } + + require.NoError(t, emitter.Close()) + }) + + t.Run("callback receives error on PublishBatch failure", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, assert.AnError) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + done := make(chan error, 1) + err = emitter.EmitWithCallback(t.Context(), []byte("body"), func(sendErr error) { + done <- sendErr + }, + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + + select { + case sendErr := <-done: + assert.Error(t, sendErr) + case <-time.After(3 * time.Second): + t.Fatal("callback was not invoked within timeout") + } + + require.NoError(t, emitter.Close()) + }) + + t.Run("callback receives error when buffer is full", func(t *testing.T) { + clientMock := mocks.NewClient(t) + + sendBlocked := make(chan struct{}) + firstCallSignal := make(chan struct{}, 1) + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Run(func(_ mock.Arguments) { + select { + case firstCallSignal <- struct{}{}: + default: + } + <-sendBlocked + }). + Return(nil, nil). + Maybe() + + cfg := newTestConfig() + cfg.ChipIngressBufferSize = 2 + cfg.ChipIngressMaxBatchSize = 1 + cfg.ChipIngressMaxConcurrentSends = 1 + cfg.ChipIngressSendInterval = 50 * time.Millisecond + cfg.ChipIngressDrainTimeout = 200 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + defer close(sendBlocked) + defer emitter.Close() //nolint:errcheck + + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + + <-firstCallSignal + time.Sleep(100 * time.Millisecond) + + for i := 0; i < 10; i++ { + _ = emitter.Emit(t.Context(), []byte("filler"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + } + + dropped := make(chan error, 1) + err = emitter.EmitWithCallback(t.Context(), []byte("overflow"), func(sendErr error) { + dropped <- sendErr + }, + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + assert.NoError(t, err) + + select { + case dropErr := <-dropped: + assert.Error(t, dropErr) + case <-time.After(time.Second): + t.Fatal("callback was not invoked for dropped event") + } + }) + + t.Run("nil callback behaves like Emit", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, nil). 
+ Maybe() + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + err = emitter.EmitWithCallback(t.Context(), []byte("body"), nil, + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + assert.NoError(t, err) + + require.NoError(t, emitter.Close()) + }) +} diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index 328d877c49..aec93bf046 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -26,6 +26,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/services" ) const defaultGRPCCompressor = "gzip" @@ -60,6 +61,9 @@ type Client struct { // OnClose OnClose func() error + + // managedServices holds services started/stopped by the application, not by the Client. + managedServices []services.Service } // NewClient creates a new Client with initialized OpenTelemetry components @@ -187,7 +191,7 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro // This will eventually be removed in favor of chip-ingress emitter // and logs will be sent via OTLP using the regular Logger instead of calling Emit emitter := NewMessageEmitter(messageLogger) - + var batchEmitterService *ChipIngressBatchEmitterService var chipIngressClient chipingress.Client = &chipingress.NoopClient{} // if chip ingress is enabled, create dual source emitter that sends to both otel collector and chip ingress // eventually we will remove the dual source emitter and just use chip ingress @@ -223,12 +227,24 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro return nil, err } - chipIngressEmitter, err := NewChipIngressEmitter(chipIngressClient) - if err != nil { - return nil, fmt.Errorf("failed to create chip ingress emitter: %w", err) + var chipIngressEmitter Emitter + if cfg.ChipIngressBatchEmitterEnabled { + if cfg.ChipIngressLogger == nil { + return nil, fmt.Errorf("ChipIngressLogger is required when ChipIngressBatchEmitterEnabled is true") + } + batchEmitterService, err = NewChipIngressBatchEmitterService(chipIngressClient, cfg, cfg.ChipIngressLogger) + if err != nil { + return nil, fmt.Errorf("failed to create chip ingress batch emitter: %w", err) + } + chipIngressEmitter = batchEmitterService + } else { + chipIngressEmitter, err = NewChipIngressEmitter(chipIngressClient) + if err != nil { + return nil, fmt.Errorf("failed to create chip ingress emitter: %w", err) + } } - emitter, err = NewDualSourceEmitter(chipIngressEmitter, emitter) + emitter, err = NewDualSourceEmitter(chipIngressEmitter, emitter, cfg.ChipIngressBatchEmitterEnabled) if err != nil { return nil, fmt.Errorf("failed to create dual source emitter: %w", err) } @@ -240,20 +256,36 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro } return } - return &Client{cfg, logger, tracer, meter, emitter, chipIngressClient, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, signer, onClose}, nil + var managed []services.Service + if batchEmitterService != nil { + managed = append(managed, batchEmitterService) + } + + return &Client{cfg, logger, tracer, meter, emitter, chipIngressClient, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, signer, onClose, managed}, nil } -// Closes 
all providers, flushes all data and stops all background processes -func (c Client) Close() (err error) { - if c.Chip != nil { - err = errors.Join(err, c.Chip.Close()) +// ManagedServices returns services whose lifecycle is owned by the application +// (start, stop, health). Returns nil when the receiver is nil or has none. +func (c *Client) ManagedServices() []services.Service { + if c == nil { + return nil } + return c.managedServices +} + +// Close shuts down OTel providers and the chip ingress connection. +// The batch emitter service may already have been closed by the application; +// the duplicate Close returns an "already stopped" error which is harmless. +func (c Client) Close() (err error) { if c.Emitter != nil { err = errors.Join(err, c.Emitter.Close()) } if c.OnClose != nil { err = errors.Join(err, c.OnClose()) } + if c.Chip != nil { + err = errors.Join(err, c.Chip.Close()) + } return } diff --git a/pkg/beholder/config.go b/pkg/beholder/config.go index f3e4561b68..deeed041fd 100644 --- a/pkg/beholder/config.go +++ b/pkg/beholder/config.go @@ -7,6 +7,8 @@ import ( "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/trace" "go.uber.org/zap/zapcore" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" ) type Config struct { @@ -44,6 +46,16 @@ type Config struct { ChipIngressEmitterGRPCEndpoint string ChipIngressInsecureConnection bool // Disables TLS for Chip Ingress Emitter + // Chip Ingress Batch Emitter + ChipIngressBatchEmitterEnabled bool // When true, use batch emitter; when false (default), use legacy per-event emitter + ChipIngressBufferSize uint // Message buffer size (default 1000) + ChipIngressMaxBatchSize uint // Max events per PublishBatch call (default 500) + ChipIngressSendInterval time.Duration // Flush interval (default 100ms) + ChipIngressSendTimeout time.Duration // Timeout per PublishBatch call (default 3s) + ChipIngressDrainTimeout time.Duration // Max time to flush remaining events on shutdown (default 10s) + ChipIngressMaxConcurrentSends int // Max concurrent PublishBatch calls (default 10) + ChipIngressLogger logger.Logger // Required when ChipIngressBatchEmitterEnabled is true + // OTel Log LogExportTimeout time.Duration LogExportInterval time.Duration @@ -91,7 +103,8 @@ var defaultRetryConfig = RetryConfig{ } const ( - defaultPackageName = "beholder" + defaultPackageName = "beholder" + defaultMaxConcurrentSends = 10 ) var defaultOtelAttributes = []attribute.KeyValue{ @@ -131,8 +144,16 @@ func DefaultConfig() Config { LogMaxQueueSize: 2048, LogBatchProcessor: true, LogStreamingEnabled: true, // Enable logs streaming by default - LogLevel: zapcore.InfoLevel, - LogCompressor: "gzip", + LogLevel: zapcore.InfoLevel, + LogCompressor: "gzip", + // Chip Ingress Batch Emitter + ChipIngressBatchEmitterEnabled: false, + ChipIngressBufferSize: 1000, + ChipIngressMaxBatchSize: 500, + ChipIngressSendInterval: 100 * time.Millisecond, + ChipIngressSendTimeout: 3 * time.Second, + ChipIngressDrainTimeout: 10 * time.Second, + ChipIngressMaxConcurrentSends: defaultMaxConcurrentSends, // Auth (defaults to static auth mode with TTL=0) AuthHeadersTTL: 0, } diff --git a/pkg/beholder/config_test.go b/pkg/beholder/config_test.go index ae156db9c9..80bef78d3a 100644 --- a/pkg/beholder/config_test.go +++ b/pkg/beholder/config_test.go @@ -67,6 +67,6 @@ func ExampleConfig() { } fmt.Printf("%+v\n", *config.LogRetryConfig) // Output: - // {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 
ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter: TraceRetryConfig: TraceCompressor:gzip MetricReaderInterval:1s MetricRetryConfig: MetricViews:[] MetricCompressor:gzip ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig: LogStreamingEnabled:false LogLevel:info LogCompressor:gzip AuthHeaders:map[] AuthHeadersTTL:0s AuthKeySigner: AuthPublicKeyHex:} + // {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter: TraceRetryConfig: TraceCompressor:gzip MetricReaderInterval:1s MetricRetryConfig: MetricViews:[] MetricCompressor:gzip ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false ChipIngressBatchEmitterEnabled:false ChipIngressBufferSize:0 ChipIngressMaxBatchSize:0 ChipIngressSendInterval:0s ChipIngressSendTimeout:0s ChipIngressDrainTimeout:0s ChipIngressMaxConcurrentSends:0 ChipIngressLogger: LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig: LogStreamingEnabled:false LogLevel:info LogCompressor:gzip AuthHeaders:map[] AuthHeadersTTL:0s AuthKeySigner: AuthPublicKeyHex:} // {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s} } diff --git a/pkg/beholder/dual_source_emitter.go b/pkg/beholder/dual_source_emitter.go index 5167efaece..7d4cdc7748 100644 --- a/pkg/beholder/dual_source_emitter.go +++ b/pkg/beholder/dual_source_emitter.go @@ -18,12 +18,13 @@ type DualSourceEmitter struct { chipIngressEmitter Emitter otelCollectorEmitter Emitter log logger.Logger + nonBlockingEmitter bool stopCh services.StopChan wg services.WaitGroup closed atomic.Bool } -func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitter) (Emitter, error) { +func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitter, nonBlockingChipIngress bool) (Emitter, error) { if chipIngressEmitter == nil { return nil, fmt.Errorf("chip ingress emitter is nil") @@ -42,6 +43,7 @@ func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitt chipIngressEmitter: chipIngressEmitter, otelCollectorEmitter: otelCollectorEmitter, log: logger, + nonBlockingEmitter: nonBlockingChipIngress, stopCh: make(services.StopChan), }, nil } @@ -56,28 +58,31 @@ func (d *DualSourceEmitter) Close() error { } func (d *DualSourceEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { - - // Emit via OTLP first if err := d.otelCollectorEmitter.Emit(ctx, body, attrKVs...); err != nil { return err } - // Emit via chip ingress async - if err := d.wg.TryAdd(1); err != nil { - return err - } - go func(ctx context.Context) { - defer d.wg.Done() - var cancel context.CancelFunc - ctx, cancel = 
d.stopCh.Ctx(ctx) - defer cancel() - + if d.nonBlockingEmitter { if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil { - // If the chip ingress emitter fails, we ONLY log the error - // because we still want to send the data to the OTLP collector and not cause disruption d.log.Infof("failed to emit to chip ingress: %v", err) } - }(context.WithoutCancel(ctx)) + } else { + // Legacy ChipIngressEmitter.Emit is a synchronous gRPC call; + // fire-and-forget via goroutine to avoid blocking the caller. + if err := d.wg.TryAdd(1); err != nil { + return err + } + go func(ctx context.Context) { + defer d.wg.Done() + var cancel context.CancelFunc + ctx, cancel = d.stopCh.Ctx(ctx) + defer cancel() + + if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil { + d.log.Infof("failed to emit to chip ingress: %v", err) + } + }(context.WithoutCancel(ctx)) + } return nil } diff --git a/pkg/beholder/dual_source_emitter_test.go b/pkg/beholder/dual_source_emitter_test.go index 3429ab1648..314d82348f 100644 --- a/pkg/beholder/dual_source_emitter_test.go +++ b/pkg/beholder/dual_source_emitter_test.go @@ -3,7 +3,9 @@ package beholder_test import ( "context" "fmt" + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -18,7 +20,7 @@ func TestNewDualSourceEmitter(t *testing.T) { chipEmitter := &mockEmitter{} otelEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, false) require.NoError(t, err) assert.NotNil(t, emitter) @@ -29,7 +31,7 @@ func TestNewDualSourceEmitter(t *testing.T) { t.Run("nil chip ingress emitter", func(t *testing.T) { otelEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(nil, otelEmitter) + emitter, err := beholder.NewDualSourceEmitter(nil, otelEmitter, false) assert.Error(t, err) assert.Nil(t, emitter) @@ -39,7 +41,7 @@ func TestNewDualSourceEmitter(t *testing.T) { t.Run("nil otel collector emitter", func(t *testing.T) { chipEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, nil) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, nil, false) assert.Error(t, err) assert.Nil(t, emitter) @@ -51,7 +53,7 @@ func TestDualSourceEmitterEmit(t *testing.T) { chipEmitter := &mockEmitter{} otelEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, false) require.NoError(t, err) err = emitter.Emit(t.Context(), []byte("test message"), "key", "value") @@ -67,7 +69,7 @@ func TestDualSourceEmitterEmit(t *testing.T) { }, } - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, false) require.NoError(t, err) err = emitter.Emit(t.Context(), []byte("test message")) @@ -76,6 +78,58 @@ func TestDualSourceEmitterEmit(t *testing.T) { }) } +func TestDualSourceEmitterBlockingBehavior(t *testing.T) { + t.Run("legacy mode does not block caller", func(t *testing.T) { + var chipCalled atomic.Bool + chipEmitter := &mockEmitter{ + emitFunc: func(ctx context.Context, body []byte, attrKVs ...any) error { + time.Sleep(200 * time.Millisecond) + chipCalled.Store(true) + return nil + }, + } + otelEmitter := &mockEmitter{} + + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, false) + require.NoError(t, err) + + start := 
time.Now() + err = emitter.Emit(t.Context(), []byte("test")) + elapsed := time.Since(start) + + assert.NoError(t, err) + assert.Less(t, elapsed, 100*time.Millisecond, + "Emit should return immediately; chip ingress runs in a goroutine") + assert.False(t, chipCalled.Load(), + "chip ingress emit should still be in-flight when Emit returns") + + require.NoError(t, emitter.Close()) + assert.True(t, chipCalled.Load(), + "Close should wait for the in-flight chip ingress emit to finish") + }) + + t.Run("non-blocking mode emits inline", func(t *testing.T) { + var chipCalled atomic.Bool + chipEmitter := &mockEmitter{ + emitFunc: func(ctx context.Context, body []byte, attrKVs ...any) error { + chipCalled.Store(true) + return nil + }, + } + otelEmitter := &mockEmitter{} + + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, true) + require.NoError(t, err) + + err = emitter.Emit(t.Context(), []byte("test")) + assert.NoError(t, err) + assert.True(t, chipCalled.Load(), + "chip ingress emit should complete before Emit returns") + + require.NoError(t, emitter.Close()) + }) +} + // Mock emitter for testing type mockEmitter struct { emitFunc func(ctx context.Context, body []byte, attrKVs ...any) error diff --git a/pkg/beholder/global_test.go b/pkg/beholder/global_test.go index df57d0a41b..0867609f78 100644 --- a/pkg/beholder/global_test.go +++ b/pkg/beholder/global_test.go @@ -77,6 +77,9 @@ func TestClient_SetGlobalOtelProviders(t *testing.T) { var b strings.Builder client, err := beholder.NewWriterClient(&b) require.NoError(t, err) + defer func() { + require.NoError(t, client.Close()) + }() // Set global Otel Client beholder.SetClient(client) diff --git a/pkg/beholder/httpclient.go b/pkg/beholder/httpclient.go index 693f581452..7bea69a424 100644 --- a/pkg/beholder/httpclient.go +++ b/pkg/beholder/httpclient.go @@ -187,7 +187,7 @@ func NewHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro return } // HTTP client doesn't currently support rotating auth, so lazySigner is always nil - return &Client{cfg, logger, tracer, meter, emitter, nil, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, nil, onClose}, nil + return &Client{cfg, logger, tracer, meter, emitter, nil, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, nil, onClose, nil}, nil } func newHTTPTracerProvider(config Config, resource *sdkresource.Resource, tlsConfig *tls.Config) (*sdktrace.TracerProvider, error) { diff --git a/pkg/beholder/managed_services_test.go b/pkg/beholder/managed_services_test.go new file mode 100644 index 0000000000..476cd4dc44 --- /dev/null +++ b/pkg/beholder/managed_services_test.go @@ -0,0 +1,105 @@ +package beholder_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/services" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" +) + +func TestManagedServices_NilReceiver(t *testing.T) { + var c *beholder.Client + assert.Nil(t, c.ManagedServices()) +} + +func TestManagedServices_NoopClient(t *testing.T) { + c := beholder.NewNoopClient() + assert.Empty(t, c.ManagedServices()) +} + +func TestManagedServices_BatchDisabled(t *testing.T) { + client, err := beholder.NewClient(beholder.Config{ + OtelExporterGRPCEndpoint: "localhost:4317", + ChipIngressEmitterEnabled: true, + ChipIngressEmitterGRPCEndpoint: "localhost:9090", + ChipIngressInsecureConnection: true, + ChipIngressBatchEmitterEnabled: false, + }) + 
require.NoError(t, err) + defer func() { _ = client.Close() }() + + assert.Empty(t, client.ManagedServices()) +} + +func TestManagedServices_BatchEnabled(t *testing.T) { + lggr := newTestLogger(t) + + client, err := beholder.NewClient(beholder.Config{ + OtelExporterGRPCEndpoint: "localhost:4317", + ChipIngressEmitterEnabled: true, + ChipIngressEmitterGRPCEndpoint: "localhost:9090", + ChipIngressInsecureConnection: true, + ChipIngressBatchEmitterEnabled: true, + ChipIngressLogger: lggr, + ChipIngressBufferSize: 10, + ChipIngressMaxBatchSize: 5, + ChipIngressSendInterval: 50 * time.Millisecond, + ChipIngressSendTimeout: 1 * time.Second, + ChipIngressDrainTimeout: 1 * time.Second, + }) + require.NoError(t, err) + + managed := client.ManagedServices() + require.Len(t, managed, 1) + assert.Equal(t, "ChipIngressBatchEmitterService", managed[0].Name()) + + // Service should be startable (not already started) + err = managed[0].Start(context.Background()) + require.NoError(t, err) + + err = managed[0].Close() + require.NoError(t, err) + + // Client.Close calls DualSourceEmitter.Close which calls the batch emitter's + // Close a second time. StopOnce returns "already stopped" — harmless. + err = client.Close() + require.Error(t, err) + assert.ErrorIs(t, err, services.ErrAlreadyStopped) +} + +func TestManagedServices_BatchEmitterNotAutoStarted(t *testing.T) { + lggr := newTestLogger(t) + + client, err := beholder.NewClient(beholder.Config{ + OtelExporterGRPCEndpoint: "localhost:4317", + ChipIngressEmitterEnabled: true, + ChipIngressEmitterGRPCEndpoint: "localhost:9090", + ChipIngressInsecureConnection: true, + ChipIngressBatchEmitterEnabled: true, + ChipIngressLogger: lggr, + ChipIngressBufferSize: 10, + ChipIngressMaxBatchSize: 5, + ChipIngressSendInterval: 50 * time.Millisecond, + ChipIngressSendTimeout: 1 * time.Second, + ChipIngressDrainTimeout: 1 * time.Second, + }) + require.NoError(t, err) + + managed := client.ManagedServices() + require.Len(t, managed, 1) + + // The service should not be ready yet (not started) + err = managed[0].Ready() + assert.Error(t, err, "service should not be ready before Start()") + + // Start, verify ready, then close + require.NoError(t, managed[0].Start(context.Background())) + require.NoError(t, managed[0].Ready()) + require.NoError(t, managed[0].Close()) +} diff --git a/pkg/beholder/noop.go b/pkg/beholder/noop.go index 67580ec15c..1ed0a0144a 100644 --- a/pkg/beholder/noop.go +++ b/pkg/beholder/noop.go @@ -39,7 +39,7 @@ func NewNoopClient() *Client { // ChipIngress chipClient := &chipingress.NoopClient{} - return &Client{cfg, logger, tracer, meter, messageEmitter, chipClient, loggerProvider, tracerProvider, meterProvider, loggerProvider, nil, noopOnClose} + return &Client{cfg, logger, tracer, meter, messageEmitter, chipClient, loggerProvider, tracerProvider, meterProvider, loggerProvider, nil, noopOnClose, nil} } // NewStdoutClient creates a new Client with exporters which send telemetry data to standard output diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index e02704b26b..af97c0684b 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -110,7 +110,9 @@ func (b *Client) Start(ctx context.Context) { // Forcibly shutdowns down after timeout if not completed. func (b *Client) Stop() { b.shutdownOnce.Do(func() { - ctx, cancel := b.stopCh.CtxWithTimeout(b.shutdownTimeout) + // Use a standalone timeout context so the shutdown wait isn't cancelled + // by close(b.stopCh) below. 
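+		// (Previously b.stopCh.CtxWithTimeout was used, which was cancelled as
+		// soon as Stop closed stopCh, cutting the drain short of shutdownTimeout.)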
+ ctx, cancel := context.WithTimeout(context.Background(), b.shutdownTimeout) defer cancel() if b.cancelBatcher != nil { diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go index 7f8c356fb3..dab751e06c 100644 --- a/pkg/chipingress/batch/client_test.go +++ b/pkg/chipingress/batch/client_test.go @@ -837,6 +837,11 @@ func TestStop(t *testing.T) { t.Run("QueueMessage returns error after Stop", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient. + On("PublishBatch", mock.Anything, mock.Anything). + Return(&chipingress.PublishResponse{}, nil). + Maybe() + client, err := NewBatchClient(mockClient, WithBatchSize(10)) require.NoError(t, err) @@ -853,7 +858,7 @@ func TestStop(t *testing.T) { }, nil) require.NoError(t, err) - // Stop the client + // Stop the client — drains any buffered messages client.Stop() // Queue message after stop - should fail diff --git a/pkg/loop/config.go b/pkg/loop/config.go index 19e1ab7f10..1a0f71e26d 100644 --- a/pkg/loop/config.go +++ b/pkg/loop/config.go @@ -77,8 +77,9 @@ const ( envTelemetryMetricCompressor = "CL_TELEMETRY_METRIC_COMPRESSOR" envTelemetryLogCompressor = "CL_TELEMETRY_LOG_COMPRESSOR" - envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" - envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION" + envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" + envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION" + envChipIngressBatchEmitterEnabled = "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED" envCRESettings = cresettings.EnvNameSettings envCRESettingsDefault = cresettings.EnvNameSettingsDefault @@ -148,8 +149,9 @@ type EnvConfig struct { TelemetryMetricCompressor string TelemetryLogCompressor string - ChipIngressEndpoint string - ChipIngressInsecureConnection bool + ChipIngressEndpoint string + ChipIngressInsecureConnection bool + ChipIngressBatchEmitterEnabled bool CRESettings string CRESettingsDefault string @@ -234,6 +236,7 @@ func (e *EnvConfig) AsCmdEnv() (env []string) { add(envChipIngressEndpoint, e.ChipIngressEndpoint) add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection)) + add(envChipIngressBatchEmitterEnabled, strconv.FormatBool(e.ChipIngressBatchEmitterEnabled)) if e.CRESettings != "" { add(envCRESettings, e.CRESettings) @@ -449,6 +452,10 @@ func (e *EnvConfig) parse() error { if err != nil { return fmt.Errorf("failed to parse %s: %w", envChipIngressInsecureConnection, err) } + e.ChipIngressBatchEmitterEnabled, err = getBool(envChipIngressBatchEmitterEnabled) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envChipIngressBatchEmitterEnabled, err) + } } e.CRESettings = os.Getenv(envCRESettings) diff --git a/pkg/loop/config_test.go b/pkg/loop/config_test.go index a0b5ef83d9..5207dbbeaa 100644 --- a/pkg/loop/config_test.go +++ b/pkg/loop/config_test.go @@ -78,8 +78,9 @@ func TestEnvConfig_parse(t *testing.T) { envTelemetryEmitterMaxQueueSize: "1000", envTelemetryLogStreamingEnabled: "false", - envChipIngressEndpoint: "chip-ingress.example.com:50051", - envChipIngressInsecureConnection: "true", + envChipIngressEndpoint: "chip-ingress.example.com:50051", + envChipIngressInsecureConnection: "true", + envChipIngressBatchEmitterEnabled: "false", envCRESettings: `{"global":{}}`, envCRESettingsDefault: `{"foo":"bar"}`, @@ -182,8 +183,9 @@ var envCfgFull = EnvConfig{ TelemetryEmitterMaxQueueSize: 1000, TelemetryLogStreamingEnabled: false, - ChipIngressEndpoint: "chip-ingress.example.com:50051", - ChipIngressInsecureConnection: 
true, + ChipIngressEndpoint: "chip-ingress.example.com:50051", + ChipIngressInsecureConnection: true, + ChipIngressBatchEmitterEnabled: false, CRESettings: `{"global":{}}`, CRESettingsDefault: `{"foo":"bar"}`, @@ -239,6 +241,7 @@ func TestEnvConfig_AsCmdEnv(t *testing.T) { // Assert ChipIngress environment variables assert.Equal(t, "chip-ingress.example.com:50051", got[envChipIngressEndpoint]) assert.Equal(t, "true", got[envChipIngressInsecureConnection]) + assert.Equal(t, "false", got[envChipIngressBatchEmitterEnabled]) assert.Equal(t, `{"global":{}}`, got[envCRESettings]) assert.Equal(t, `{"foo":"bar"}`, got[envCRESettingsDefault]) diff --git a/pkg/loop/server.go b/pkg/loop/server.go index ae61c9d6c4..3fa83da4c9 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -94,6 +94,7 @@ type Server struct { webServer *webServer checker *services.HealthChecker LimitsFactory limits.Factory + managedServices []services.Service } func newServer(loggerName string) (*Server, error) { @@ -173,6 +174,8 @@ func (s *Server) start(opts ...ServerOpt) error { ChipIngressEmitterEnabled: s.EnvConfig.ChipIngressEndpoint != "", ChipIngressEmitterGRPCEndpoint: s.EnvConfig.ChipIngressEndpoint, ChipIngressInsecureConnection: s.EnvConfig.ChipIngressInsecureConnection, + ChipIngressBatchEmitterEnabled: s.EnvConfig.ChipIngressBatchEmitterEnabled, + ChipIngressLogger: s.Logger, MetricCompressor: s.EnvConfig.TelemetryMetricCompressor, } @@ -212,6 +215,13 @@ func (s *Server) start(opts ...ServerOpt) error { beholder.SetClient(beholderClient) beholder.SetGlobalOtelProviders() + for _, svc := range beholderClient.ManagedServices() { + if err := svc.Start(ctx); err != nil { + return fmt.Errorf("failed to start beholder managed service %s: %w", svc.Name(), err) + } + s.managedServices = append(s.managedServices, svc) + } + if beholderCfg.LogStreamingEnabled { otelLogger, err := NewOtelLogger(beholderClient.Logger, beholderCfg.LogLevel) if err != nil { @@ -283,6 +293,9 @@ func (s *Server) Register(c services.HealthReporter) error { return s.checker.Re // Stop closes resources and flushes logs. func (s *Server) Stop() { + for i := len(s.managedServices) - 1; i >= 0; i-- { + s.Logger.ErrorIfFn(s.managedServices[i].Close, "Failed to close managed service") + } if s.dbStatsReporter != nil { s.dbStatsReporter.Stop() }
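
Usage sketch (illustrative, not part of the diff): how a host application would wire the batch emitter introduced here, mirroring the pkg/loop/server.go changes above. Endpoints are placeholders and error handling is abbreviated.

package main

import (
	"context"
	"log"
	"time"

	"github.com/smartcontractkit/chainlink-common/pkg/beholder"
	"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

func main() {
	lggr, err := logger.New()
	if err != nil {
		log.Fatal(err)
	}

	client, err := beholder.NewClient(beholder.Config{
		OtelExporterGRPCEndpoint:       "localhost:4317", // placeholder
		ChipIngressEmitterEnabled:      true,
		ChipIngressEmitterGRPCEndpoint: "localhost:9090", // placeholder
		ChipIngressInsecureConnection:  true,
		ChipIngressBatchEmitterEnabled: true,
		ChipIngressLogger:              lggr, // required when the batch emitter is enabled
	})
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()

	// The batch emitter is not auto-started: the application owns its lifecycle.
	for _, svc := range client.ManagedServices() {
		if err := svc.Start(ctx); err != nil {
			log.Fatal(err)
		}
	}

	// Emit queues without blocking; domain/entity become the CloudEvent source/type.
	if err := client.Emitter.Emit(ctx, []byte(`{"hello":"beholder"}`),
		beholder.AttrKeyDomain, "platform",
		beholder.AttrKeyEntity, "ExampleEvent",
	); err != nil {
		log.Fatal(err)
	}

	time.Sleep(200 * time.Millisecond) // allow a 100ms flush interval to elapse

	// Stop managed services first, then the client; the duplicate Close of the
	// batch emitter inside client.Close returns a harmless "already stopped" error.
	for _, svc := range client.ManagedServices() {
		_ = svc.Close()
	}
	_ = client.Close()
}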