Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d3eed64
feat(consumoor): bump default ClickHouse maxConns from 8 to 32 (#790)
samcm Feb 25, 2026
49cf6c8
feat(consumoor): add per-table adaptive concurrency limiting for Clic…
samcm Feb 25, 2026
36a903d
feat(consumoor): add configurable TCP connect timeout for kafka broke…
samcm Feb 25, 2026
27fefae
fix(consumoor): query lag for per-topic consumer groups instead of ba…
samcm Feb 25, 2026
13bdc19
feat(consumoor): add Benthos output batching to fix 1-row INSERT bott…
samcm Feb 25, 2026
308d033
feat(consumoor): remove per-table serialization, enable concurrent IN…
samcm Feb 25, 2026
7a0b53d
feat(consumoor): add per-topic batch config overrides
samcm Feb 25, 2026
e974d8c
perf(consumoor): reduce GC pressure via allocation optimizations
samcm Feb 25, 2026
8fa34c2
perf(consumoor): use vtprotobuf for reflection-free proto deserializa…
samcm Feb 25, 2026
67c9da4
feat(consumoor): use per-topic consumer groups for lag monitoring
samcm Feb 25, 2026
b5132b9
feat(consumoor): add configurable rebalance timeout for faster partit…
samcm Feb 25, 2026
1e404a8
perf(consumoor): use sync.Pool for DecoratedEvent via vtprotobuf pool…
samcm Feb 25, 2026
f317b9a
feat(consumoor): add event_lag_seconds histogram metric
samcm Feb 25, 2026
9b14199
feat(consumoor): increase default outputBatchPeriod to 5s
samcm Feb 26, 2026
43d4948
feat(consumoor): add missing observability metrics and dashboard panels
samcm Feb 26, 2026
0a50359
fix(consumoor): align data transformations with Vector pipeline
samcm Feb 27, 2026
643cc79
consumoor: add rate-limited logging, unknown event warnings, and inva…
samcm Mar 2, 2026
794abd9
consumoor: log bad events in JSON form for invalid events and flatten…
samcm Mar 2, 2026
6cc72ce
consumoor: fix error handling gaps — DLQ invalid events, permanent fl…
samcm Mar 2, 2026
d966f0e
consumoor: drop intentionally unsupported events; NAK unknown routes
samcm Mar 2, 2026
975d737
yeah (#804)
Savid Apr 1, 2026
f45bf97
consumoor: fix review findings — config strictness, failure isolation…
samcm Apr 1, 2026
e8ad155
consumoor: fix lint issues from CI
samcm Apr 1, 2026
a9184a5
fix: bump grpc to v1.79.3 for GO-2026-4762, fix buf action hash
samcm Apr 1, 2026
902beb9
fix attestation order by
Savid Apr 1, 2026
fbf30c6
consumoor: add clientId config for shared Kafka client-id
samcm Apr 1, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion .github/workflows/buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0

- name: Set up buf
uses: bufbuild/buf-setup-action@a47c93e0b1648769eb9a2e1f98e7b4e3e13089f0 # v1.50.0
uses: bufbuild/buf-setup-action@a47c93e0b1648d5651a065437926377d060baa99 # v1.50.0

- name: Lint protos
run: buf lint
Expand Down
12 changes: 12 additions & 0 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,15 @@ plugins:
- remote: buf.build/grpc/go:v1.5.1
out: .
opt: paths=source_relative
- remote: buf.build/community/planetscale-vtprotobuf
out: .
opt:
- paths=source_relative
- features=marshal+unmarshal+size+pool
- pool=github.com/ethpandaops/xatu/pkg/proto/xatu.*
- pool=github.com/ethpandaops/xatu/pkg/proto/eth/v1.*
- pool=github.com/ethpandaops/xatu/pkg/proto/eth/v2.*
- pool=github.com/ethpandaops/xatu/pkg/proto/libp2p.*
- pool=github.com/ethpandaops/xatu/pkg/proto/libp2p/gossipsub.*
- pool=github.com/ethpandaops/xatu/pkg/proto/mevrelay.*
- pool=github.com/ethpandaops/xatu/pkg/proto/noderecord.*
15 changes: 10 additions & 5 deletions cmd/consumoor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package cmd

import (
"bytes"
"os"

"github.com/creasty/defaults"
Expand Down Expand Up @@ -36,14 +37,15 @@ func createConsumoorOverride(config ConsumoorOverrideConfig) ConsumoorOverride {
Setter: func(cmd *cobra.Command, overrides *consumoor.Override) error {
val := ""

if cmd.Flags().Changed(config.FlagName) {
val = cmd.Flags().Lookup(config.FlagName).Value.String()
}

// Precedence: flag > env > config file.
if os.Getenv(config.EnvName) != "" {
val = os.Getenv(config.EnvName)
}

if cmd.Flags().Changed(config.FlagName) {
val = cmd.Flags().Lookup(config.FlagName).Value.String()
}

if val == "" {
return nil
}
Expand Down Expand Up @@ -134,7 +136,10 @@ func loadConsumoorConfigFromFile(file string) (*consumoor.Config, error) {

type plain consumoor.Config

if err := yaml.Unmarshal(yamlFile, (*plain)(config)); err != nil {
dec := yaml.NewDecoder(bytes.NewReader(yamlFile))
dec.KnownFields(true)

if err := dec.Decode((*plain)(config)); err != nil {
return nil, err
}

Expand Down
Loading
Loading