feat: add ChipIngress batch emitter support #21327
👋 thomaska, thanks for creating this pull request! To help reviewers, please consider creating future PRs as drafts first. This allows you to self-review and make any final changes before notifying the team. Once you're ready, you can mark it as "Ready for review" to request feedback. Thanks!

✅ No conflicts with other open PRs targeting
Pull request overview
Adds PublishBatch support to the chip-testsink gRPC server so CRE/system tests don’t fail with UNIMPLEMENTED when nodes emit batched ChIP ingress events.
Changes:
- Implement `PublishBatch` on the chip-testsink `ChipIngressServer`.
- Delegate batch handling to the existing `Publish` flow (including configured `PublishFunc` and optional upstream forwarding).
        }

        for _, event := range batch.Events {
            if _, err := s.Publish(ctx, event); err != nil {
Calling s.Publish() inside the batch loop triggers the per-event async upstream forwarding goroutine in Publish(). For large batches this can create a burst of goroutines and N upstream RPCs. Consider handling upstream forwarding in PublishBatch with a single PublishBatch call (or at least a bounded worker/pool), and calling the configured PublishFunc directly for local handling to avoid unbounded goroutine/RPC fan-out per batch.
    - if _, err := s.Publish(ctx, event); err != nil {
    + // Forward upstream synchronously to avoid spawning a goroutine per event.
    + if s.cfg.UpstreamEndpoint != "" {
    +     forwardCtx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
    +     _, err := s.upstream.Publish(forwardCtx, event)
    +     cancelFn()
    +     if err != nil {
    +         log.Printf("failed to forward to upstream: %v", err)
    +     }
    + }
    + if _, err := s.cfg.PublishFunc(ctx, event); err != nil {
    // It delegates each event in the batch to the configured PublishFunc,
    // mirroring how the real ChIP Ingress processes batches atomically.
The doc comment claims this mirrors how the real ChIP ingress processes batches "atomically", but this implementation is not atomic: it publishes events one-by-one and can return an error after earlier events have already been accepted/forwarded. Please either adjust the comment to reflect best-effort sequential processing, or change the implementation to provide the atomicity guarantees being documented.
    - // It delegates each event in the batch to the configured PublishFunc,
    - // mirroring how the real ChIP Ingress processes batches atomically.
    + // It delegates each event in the batch to the configured PublishFunc
    + // sequentially, returning an error on the first failure. Earlier events
    + // in the batch may already have been published or forwarded when an error
    + // is returned, so processing is best-effort rather than atomic.
I see you updated files related to
# Conflicts: # core/scripts/go.mod # core/scripts/go.sum # deployment/go.mod # deployment/go.sum # go.mod # go.sum # integration-tests/go.mod # integration-tests/go.sum # integration-tests/load/go.sum # system-tests/lib/go.mod # system-tests/lib/go.sum # system-tests/tests/go.sum
Based on my thorough review of both PRs, here is a concrete implementation plan for registering
Implementation Plan
Problem
The
Step 1: Expose the batch emitter from
| Concern | How it's addressed |
|---|---|
| Initialization order | initGlobals → beholder.NewGRPCClient runs in beforeNode() before NewApplication, so beholder.GetClient() is already populated |
| Double-start | services.Engine's Start() is idempotent — the service framework calling Start() again is a no-op since it's already running |
| Nil safety | When ChipIngressBatchEmitterEnabled = false (default), batchEmitterService remains nil, and both nil checks protect against it |
| Shutdown order | Services in srvcs are stopped in reverse order. The emitter is closed before beholder's own Client.Close() (which closes the gRPC connection), matching the drain-before-disconnect requirement already implemented in PR #1862's reordered Client.Close() |
| Health checks | ChipIngressBatchEmitter implements services.Service via services.Engine, exposing Name(), Ready(), HealthReport() — all needed for /health |
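The double-start, nil-safety, and shutdown-order rows can be illustrated with a small self-contained sketch. Everything in it is hypothetical: `Service`, `emitter`, `conn`, `buildServices`, and `closeReverse` are stand-ins, and the `sync.Once` guard only imitates the idempotent `Start()` behavior the table attributes to `services.Engine`. It is not the real implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// Service is a reduced, hypothetical version of a start/stop service.
type Service interface {
	Name() string
	Start() error
	Close() error
}

// emitter stands in for the batch emitter. sync.Once makes Start idempotent
// (an assumption that imitates the behavior described in the table).
type emitter struct {
	once   sync.Once
	starts int
	log    *[]string
}

func (e *emitter) Name() string { return "BatchEmitter" }

func (e *emitter) Start() error {
	e.once.Do(func() { e.starts++ })
	return nil
}

func (e *emitter) Close() error {
	*e.log = append(*e.log, "close "+e.Name())
	return nil
}

// conn stands in for the client that owns the gRPC connection.
type conn struct{ log *[]string }

func (c *conn) Name() string { return "GRPCConn" }
func (c *conn) Start() error { return nil }
func (c *conn) Close() error {
	*c.log = append(*c.log, "close "+c.Name())
	return nil
}

// buildServices appends the emitter only when the flag is enabled. The nil
// check is done on the concrete pointer, before it is stored in the
// interface slice.
func buildServices(enabled bool, log *[]string) ([]Service, *emitter) {
	srvcs := []Service{&conn{log: log}}
	var em *emitter // stays nil when the flag is off
	if enabled {
		em = &emitter{log: log}
		_ = em.Start() // already started before registration
	}
	if em != nil {
		srvcs = append(srvcs, em)
	}
	return srvcs, em
}

// closeReverse stops services in reverse registration order, so the emitter
// drains before the connection it depends on is closed.
func closeReverse(srvcs []Service) {
	for i := len(srvcs) - 1; i >= 0; i-- {
		_ = srvcs[i].Close()
	}
}

func main() {
	var log []string
	srvcs, em := buildServices(true, &log)
	for _, s := range srvcs {
		_ = s.Start() // second Start on the emitter is a no-op
	}
	closeReverse(srvcs)
	fmt.Println(em.starts, log)
}
```

In this sketch the emitter's start logic runs once even though `Start()` is called twice, and the emitter closes before the connection because it was registered after it.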
Files summary
| File | Repo | Change |
|---|---|---|
| pkg/beholder/client.go | chainlink-common (PR #1862) | Add BatchEmitter field to Client, store it in constructor, add GetChipIngressBatchEmitter() getter |
| core/services/chainlink/application.go | chainlink (PR #21327) | After telemetryManager, retrieve and append batch emitter to srvcs |
Note: The PR #21327 file list may be incomplete (API results limited to 30 files). You can view the full file list here.
…ingress-publishBatch
# Conflicts: # core/scripts/cre/environment/examples/workflows/v1/proof-of-reserve/cron-based/go.mod # core/scripts/cre/environment/examples/workflows/v1/proof-of-reserve/cron-based/go.sum # core/scripts/cre/environment/examples/workflows/v1/proof-of-reserve/web-trigger-based/go.mod # core/scripts/cre/environment/examples/workflows/v1/proof-of-reserve/web-trigger-based/go.sum # core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based/go.mod # core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based/go.sum # core/scripts/go.mod # core/scripts/go.sum # deployment/go.mod # deployment/go.sum # devenv/go.mod # devenv/go.sum # go.mod # go.sum # integration-tests/go.mod # integration-tests/go.sum # integration-tests/load/go.mod # integration-tests/load/go.sum # system-tests/lib/go.mod # system-tests/lib/go.sum # system-tests/tests/canaries_sentinels/proof-of-reserve/cron-based/go.mod # system-tests/tests/canaries_sentinels/proof-of-reserve/cron-based/go.sum # system-tests/tests/go.mod # system-tests/tests/go.sum
…ingress-publishBatch # Conflicts: # core/scripts/go.mod # core/scripts/go.sum # deployment/go.mod # deployment/go.sum # devenv/go.mod # go.mod # go.sum # integration-tests/go.mod # integration-tests/go.sum # integration-tests/load/go.mod # integration-tests/load/go.sum # system-tests/lib/go.mod # system-tests/lib/go.sum # system-tests/tests/go.mod # system-tests/tests/go.sum
…ngress-publishBatch' into infoplat-3436-chipingress-publishBatch
        name: Validate go.mod dependencies
        runs-on: ubuntu-latest
        if: ${{ github.event_name == 'pull_request' }}
        steps:
          - uses: actions/checkout@v6

          - name: Validate go.mod
            uses: smartcontractkit/.github/apps/go-mod-validator@go-mod-validator/v1
            with:
              repo-branch-exceptions: |
                smartcontractkit/chainlink-ccip:develop
                smartcontractkit/chainlink-sui:2.31.0-cherry-picked
                smartcontractkit/chainlink-protos:capabilities-development
                smartcontractkit/cre-sdk-go:capabilities-development
                smartcontractkit/cre-sdk-typescript:capabilities-development
                smartcontractkit/chainlink-common:infoplat-3436-chipingress-publishBatch
Check warning
Code scanning / CodeQL
Workflow does not contain permissions Medium
Copilot Autofix
AI 7 days ago
To fix this, explicitly restrict the GITHUB_TOKEN permissions used by the workflow so it doesn’t rely on potentially broad repository defaults. The most direct and non-breaking approach is to add a permissions block setting contents: read, which is sufficient for checking out code and reading go.mod files.
The single best fix here is to add a job-level permissions block under go-mod-validation: in .github/workflows/go-mod-validation.yml, right after the job name line. This ensures only this job is affected and does not change any triggers, steps, or existing behavior beyond tightening token permissions. No imports, methods, or additional definitions are needed.
Concretely, in .github/workflows/go-mod-validation.yml, modify the go-mod-validation job definition to include:
    permissions:
      contents: read

so that the job explicitly documents and enforces read-only access to repository contents for the GITHUB_TOKEN.
    @@ -8,6 +8,8 @@
       go-mod-validation:
         name: Validate go.mod dependencies
         runs-on: ubuntu-latest
    +    permissions:
    +      contents: read
         if: ${{ github.event_name == 'pull_request' }}
         steps:
           - uses: actions/checkout@v6
Ticket: https://smartcontract-it.atlassian.net/browse/INFOPLAT-3436
Summary
- Add `ChipIngressBatchEmitterEnabled` telemetry config flag (default `false`) to toggle batch mode per-node without a code change
- Implement `PublishBatch` on the chip-testsink gRPC server so CRE system tests work when batch mode is enabled
- Bump `chainlink-common` to include batch-emitter feature-flag support

Detail
Config flag – new `ChipIngressBatchEmitterEnabled` boolean in `[Telemetry]`. Wired through the config interface, TOML types, beholder globals, docs, and test fixtures.

chip-testsink – the test-helper server only had single-event `Publish`, inheriting `UNIMPLEMENTED` for `PublishBatch`. Now delegates each batch event to the configured `PublishFunc` and forwards the full batch upstream in one RPC.

Why
9 CRE system tests depend on chip-testsink. Without this change they get gRPC `UNIMPLEMENTED` errors once batch mode is the default.

Requires
smartcontractkit/chainlink-common#1862