diff --git a/pkg/beholder/DurableEmitterDesign.md b/pkg/beholder/DurableEmitterDesign.md new file mode 100644 index 0000000000..7f34cca20b --- /dev/null +++ b/pkg/beholder/DurableEmitterDesign.md @@ -0,0 +1,416 @@ +# Durable Event Buffer for ChIP + +## Problem Statement + +Today there is no persistence in the ChIP pipeline. The `ChipIngressEmitter` calls `chipingress.Client.Publish()` synchronously over gRPC, and the `batch.Client` uses an in-memory channel buffer of 200 messages. If the node crashes, Chip is unreachable, or the buffer fills up, events — including billing records — are silently dropped. + +**Drop points in the current architecture:** + +``` +DualSourceEmitter.Emit() + ├── OTLP (sync) — errors returned to caller + └── ChipIngressEmitter (async goroutine) + │ + └── chipingress.Client.Publish() ← fire-and-forget gRPC + │ + ├── If Chip is down → error logged, event LOST + ├── If node crashes mid-flight → event LOST + └── batch.Client buffer full → "message buffer is full", event LOST +``` + +- `batch.Client` drops messages when its 200-message channel is full. +- `DualSourceEmitter` only logs chip-ingress failures — errors are swallowed. +- No event survives a node restart. Nothing is persisted to disk. + +**Impact:** Billing records are silently dropped, leading to inconsistent revenue reconciliation. Any customer-facing observability that flows through ChIP is unreliable. + +## Requirements + +### Functional +- Events must not be lost on node restarts. +- Events must be delivered within a reasonable period of time (seconds under normal operation). +- System must support eventually-consistent billing. +- Node databases must not bloat unboundedly. + +### Non-Functional +- Scale to 1k+ TPS per node. +- 4 nines of availability (99.99%). +- `Emit()` must not block workflow execution. 
+ +## Architecture + +### High-Level Flow + +``` +Workflow Engine / Billing / Lifecycle Events + │ + ▼ + DualSourceEmitter.Emit() + │ + ├── OTLP MessageEmitter (sync, unchanged) + │ + └── DurableEmitter.Emit() + │ + ├─ 1. ExtractSourceAndType + build CloudEventPb + ├─ 2. proto.Marshal → bytes + ├─ 3. store.Insert(payload) ← DURABLE GUARANTEE + ├─ 4. return nil (caller unblocked) + │ + └─ 5. goroutine: client.Publish(eventPb) + ├── Success → store.Delete(id) + └── Failure → no-op (retransmit loop handles it) + + ┌─────────────────────────────────────┐ + │ Background Retransmit Loop │ every 5s (configurable) + │ │ + │ store.ListPending(olderThan 10s) │ + │ → client.PublishBatch(events) │ + │ → store.Delete(ids) on success │ + └─────────────────────────────────────┘ + + ┌─────────────────────────────────────┐ + │ Background Expiry Loop │ every 1min (configurable) + │ │ + │ store.DeleteExpired(ttl=24h) │ + │ → GC events that could never be │ + │ delivered (bounds table growth) │ + └─────────────────────────────────────┘ +``` + +### Key Guarantee + +`Emit()` returns `nil` once the Postgres INSERT succeeds. Even if the node crashes immediately after, the event survives in Postgres and will be retransmitted on restart. The gRPC publish is fully asynchronous — `Emit()` latency is dominated by one DB insert (~1ms at typical payloads). + +### Design Decision: Standard `chipingress.Client`, Not `batch.Client` + +Per Hagen's guidance, we use the standard `chipingress.Client` directly (supports both `Publish` and `PublishBatch`) since we are implementing our own queuing with persistence-backed guarantees. The `batch.Client`'s in-memory buffer is redundant when we have Postgres as the durable queue. + +### Service Principal & ACK Guarantees + +CRE nodes authenticate to ChIP using the node's **CSA Key** as the `servicePrincipal`. This is NOT the `oti-telemetry-shared` principal which uses a fire-and-forget publish path. 
With the CSA Key, the gateway waits for Kafka ACKs before returning a gRPC response — a successful response means the event was durably accepted. + +## Components + +### DurableEventStore Interface (`chainlink-common`) + +```go +type DurableEvent struct { + ID int64 + Payload []byte // serialized CloudEventPb proto + CreatedAt time.Time +} + +type DurableEventStore interface { + Insert(ctx context.Context, payload []byte) (int64, error) + Delete(ctx context.Context, id int64) error + ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]DurableEvent, error) + DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error) +} +``` + +Two implementations: +- **`MemDurableEventStore`** — in-memory map, for unit/integration tests. Lives in `chainlink-common`. +- **`PgDurableEventStore`** — Postgres-backed ORM using `sqlutil.DataSource`. Lives in `chainlink`. + +### DurableEmitter (`chainlink-common`) + +Implements `beholder.Emitter` (`Emit` + `Close`). Core logic: + +```go +func (d *DurableEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { + // 1. Validate and extract source/type from attributes + sourceDomain, entityType, err := ExtractSourceAndType(attrKVs...) + + // 2. Build CloudEvent and serialize to proto bytes + event, _ := chipingress.NewEvent(sourceDomain, entityType, body, newAttributes(attrKVs...)) + eventPb, _ := chipingress.EventToProto(event) + payload, _ := proto.Marshal(eventPb) + + // 3. Persist — this is the durable guarantee + id, err := d.store.Insert(ctx, payload) + if err != nil { + return fmt.Errorf("failed to persist event: %w", err) + } + + // 4. 
Async delivery attempt + go d.publishAndDelete(id, eventPb) + return nil +} +``` + +### Configuration + +```go +type DurableEmitterConfig struct { + RetransmitInterval time.Duration // default 5s — retransmit loop tick rate + RetransmitAfter time.Duration // default 10s — min age before retry + RetransmitBatchSize int // default 100 — max events per batch + ExpiryInterval time.Duration // default 1min — expiry loop tick rate + EventTTL time.Duration // default 24h — max event age + PublishTimeout time.Duration // default 5s — per-RPC deadline +} +``` + +## Postgres Schema + +**Migration `0295_chip_durable_events.sql`** in the existing `cre` schema: + +```sql +-- +goose Up +CREATE TABLE IF NOT EXISTS cre.chip_durable_events ( + id BIGSERIAL PRIMARY KEY, + payload BYTEA NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX idx_chip_durable_events_created_at + ON cre.chip_durable_events (created_at ASC); + +-- +goose Down +DROP INDEX IF EXISTS cre.idx_chip_durable_events_created_at; +DROP TABLE IF EXISTS cre.chip_durable_events; +``` + +The table lives in each node's existing Postgres database. Under normal operation it is **transient** — events are inserted and deleted within milliseconds. Under Chip outage, events accumulate until delivery resumes. + +## Node Wiring + +### Config Flag + +```toml +[Telemetry] +DurableEmitterEnabled = true +``` + +Added to `config.Telemetry` interface, `toml.Telemetry` struct, and `telemetryConfig` implementation. 
+ +### Integration Point (`application.go`) + +Wired in `NewApplication` after the DB is available but before CRE services start: + +```go +func setupDurableEmitter(ctx context.Context, ds sqlutil.DataSource, lggr logger.SugaredLogger) error { + client := beholder.GetClient() + chipClient := client.Chip + + pgStore := beholdersvc.NewPgDurableEventStore(ds) + durableEmitter, _ := beholder.NewDurableEmitter(pgStore, chipClient, beholder.DefaultDurableEmitterConfig(), lggr) + + // Preserve OTLP path alongside durable chip delivery + messageLogger := client.MessageLoggerProvider.Logger("durable-emitter") + otlpEmitter := beholder.NewMessageEmitter(messageLogger) + dualEmitter, _ := beholder.NewDualSourceEmitter(durableEmitter, otlpEmitter) + + durableEmitter.Start(ctx) + client.Emitter = dualEmitter + return nil +} +``` + +This replaces the global beholder emitter, covering **all** emission paths: +- `events.emitProtoMessage()` — billing, workflow execution lifecycle +- `custmsg.Labeler.Emit()` — workflow user logs +- `BridgeStatusReporter` — bridge status events +- Any other `beholder.GetEmitter()` caller + +### CRE Environment Auto-Enable + +`system-tests/lib/cre/don/config/config.go` sets `DurableEmitterEnabled = true` for all Docker-based nodesets, so it activates automatically in local CRE environments. 
+ +## File Manifest + +| Repo | File | Purpose | +|------|------|---------| +| chainlink-common | `pkg/beholder/durable_event_store.go` | `DurableEventStore` interface + `MemDurableEventStore` | +| chainlink-common | `pkg/beholder/durable_emitter.go` | `DurableEmitter` — Emit, retransmit loop, expiry loop | +| chainlink-common | `pkg/beholder/durable_emitter_test.go` | Unit tests (in-memory store) | +| chainlink-common | `pkg/beholder/durable_emitter_integration_test.go` | Integration tests (mock gRPC server) | +| chainlink | `core/services/beholder/durable_event_store_orm.go` | `PgDurableEventStore` (Postgres ORM) | +| chainlink | `core/services/beholder/durable_event_store_orm_test.go` | ORM tests + Postgres benchmarks + load tests | +| chainlink | `core/services/beholder/durable_emitter_load_test.go` | TPS ramp/sustained/payload tests (Postgres + mock or external Chip via `CHIP_INGRESS_TEST_ADDR`) | +| chainlink | `core/store/migrate/migrations/0295_chip_durable_events.sql` | Postgres migration | +| chainlink | `core/config/telemetry_config.go` | `DurableEmitterEnabled()` on Telemetry interface | +| chainlink | `core/config/toml/types.go` | TOML field + setFrom merge | +| chainlink | `core/services/chainlink/config_telemetry.go` | Config accessor | +| chainlink | `core/services/chainlink/application.go` | `setupDurableEmitter` wiring | +| chainlink | `system-tests/lib/cre/don/config/config.go` | Auto-enable in CRE Docker envs | +| chainlink | `system-tests/tests/smoke/cre/v2_durable_emitter_test.go` | CRE smoke tests + load test | +| chainlink | `system-tests/tests/smoke/cre/cre_suite_test.go` | Test entry points | + +## Testing + +### Unit Tests (`chainlink-common`, in-memory store) + +| Test | What It Proves | +|------|---------------| +| `TestDurableEmitter_EmitPersistsAndPublishes` | Happy path: emit → publish → delete | +| `TestDurableEmitter_EmitReturnSuccessEvenWhenPublishFails` | Emit succeeds on DB insert even when gRPC fails | +| 
`TestDurableEmitter_RetransmitLoopDeliversFailedEvents` | Background loop retries failed events | +| `TestDurableEmitter_ExpiryLoopDeletesOldEvents` | TTL-based garbage collection | +| `TestDurableEmitter_EmitRejectsInvalidAttributes` | Validation before DB insert | +| `TestDurableEmitter_MultipleEvents` | 50 concurrent events all delivered | + +### Integration Tests (`chainlink-common`, mock gRPC server) + +Real gRPC server with controllable failure injection: + +| Test | What It Proves | +|------|---------------| +| `TestIntegration_HappyPath` | Events delivered over real gRPC + proto round-trip | +| `TestIntegration_ServerUnavailable_RetransmitRecovers` | Server returns UNAVAILABLE → retransmit delivers via PublishBatch | +| `TestIntegration_ServerDown_EventsSurvive` | **Crash recovery**: server stopped → events persist → new emitter (same store) retransmits on "restart" | +| `TestIntegration_HighThroughput` | 500 events delivered concurrently | +| `TestIntegration_EventExpiry` | Undeliverable events expired after TTL | +| `TestIntegration_RetransmitUsesBatch` | Retransmit path uses PublishBatch, not individual Publish | +| `TestIntegration_GRPCConnection` | Source/type arrive correctly on server side | + +### Postgres ORM Tests + Benchmarks (`chainlink`, real Postgres) + +| Test / Benchmark | What It Measures | +|---|---| +| `TestPgDurableEventStore_*` | ORM correctness (insert, list, delete, expiry) | +| `Benchmark_Insert` | Raw INSERT throughput | +| `Benchmark_InsertDelete` | Insert+delete cycle (happy-path hot loop) | +| `Benchmark_InsertPayloadSizes` | INSERT at 64B, 256B, 1KB, 4KB | +| `Benchmark_ListPending` | Query performance at 100 and 1000 queue depth | +| `TestLoad_SustainedInsertDelete` | 2000 events, 10-way concurrent insert+delete, measures ops/sec | +| `TestLoad_BurstThenDrain` | 1000-event burst, then drain via ListPending+Delete batches | +| `TestLoad_ConcurrentInsertWithListPending` | 3s of concurrent inserts + ListPending (real contention) | 
+ +### Full-Stack Load Tests (`chainlink`, Postgres + mock gRPC) + +| Test / Benchmark | What It Measures | +|---|---| +| `TestFullStack_SustainedThroughput` | 1000 events, 10 concurrent emitters, end-to-end rate | +| `TestFullStack_ChipOutage` | 3-phase: normal → Chip goes UNAVAILABLE → recovery. Measures accumulation and drain rate | +| `TestFullStack_SlowChip` | 50ms gRPC latency. Proves Emit() stays fast while server is slow | +| `Benchmark_FullStack_EmitThroughput` | Upper bound events/sec through full pipeline | +| `Benchmark_FullStack_EmitPayloadSizes` | Full emit at 64B, 256B, 1KB, 4KB | + +### Durable emitter TPS load tests (`chainlink/core/services/beholder/durable_emitter_load_test.go`) + +These tests exercise **Postgres + `DurableEmitter` + Chip Ingress** (in-process mock **or** a real gateway). They are heavier than the ORM benchmarks and require a **real Postgres** (not `txdb`). + +#### Prerequisites + +- **`CL_DATABASE_URL`** — must point at a Postgres instance where migration **`0295_chip_durable_events`** has been applied (`cre.chip_durable_events` exists). Same URL pattern as other chainlink DB tests. +- **Short tests skipped** — if your test runner uses `-short`, these tests are skipped (`SkipShortDB`); run **without** `-short`. + +#### Mock Chip vs real Chip Ingress + +| Mode | How | Notes | +|------|-----|--------| +| **Mock** (default) | Do **not** set `CHIP_INGRESS_TEST_ADDR` | In-process gRPC server; tests can inject failures (outage, slow Chip). | +| **Real Chip** | Set `CHIP_INGRESS_TEST_ADDR=host:port` | Dials external Chip Ingress. Optional: `CHIP_INGRESS_TEST_TLS`, `CHIP_INGRESS_TEST_BASIC_AUTH_*`, `CHIP_INGRESS_TEST_SKIP_BASIC_AUTH`, `CHIP_INGRESS_TEST_SKIP_SCHEMA_REGISTRATION`. You need Kafka/Redpanda, topic **`chip-demo`**, and schema subject **`chip-demo-pb.DemoClientPayload`** (e.g. Atlas `make create-topic-and-schema` under `atlas/chip-ingress`). 
| + +Tests that **inject** Chip failures or rely on **in-process** receive counts are **skipped** when `CHIP_INGRESS_TEST_ADDR` is set. + +#### How to run + +From the `chainlink` repo root (examples): + +```bash +# All beholder tests including TPS (requires CL_DATABASE_URL) +export CL_DATABASE_URL='postgres://...' +go test -v -count=1 ./core/services/beholder/ -run 'TestTPS_|TestChipIngressExternalPing' + +# Ramp-up only (100 → 500 → 1k → 2k TPS levels) +go test -v -count=1 ./core/services/beholder/ -run TestTPS_RampUp + +# Sustained 1k TPS for 60s + drain check +go test -v -count=1 ./core/services/beholder/ -run TestTPS_Sustained1k + +# Payload size scaling (fixed duration per size) +go test -v -count=1 ./core/services/beholder/ -run TestTPS_PayloadSizeScaling + +# External Chip smoke (with addr set) +export CHIP_INGRESS_TEST_ADDR='localhost:50051' +go test -v -count=1 ./core/services/beholder/ -run TestChipIngressExternalPing +``` + +After a full package run, **`TestMain`** prints a **TPS LOAD TEST SUMMARY** block aggregating result blocks from **`TestTPS_RampUp`**, **`TestTPS_Sustained1k`**, **`TestTPS_1k_WithChipOutage`** (mock only; skipped with external Chip), and **`TestTPS_PayloadSizeScaling`**. + +#### Reading the tables (column glossary) + +| Column | Meaning | +|--------|---------| +| **Target TPS** | Requested emit rate (token-bucket style scheduling across workers). | +| **Achieved TPS** | `Total emits ÷ window duration` — realized successful `Emit()` throughput. | +| **Total emits** | Count of **`Emit()` calls that returned `nil`** in the measurement window (successful Postgres insert path). Does not count failures. | +| **Emit p50 / p99** | Latency of successful `Emit()` calls (dominated by DB insert). | +| **Pub fail (retry)*** | Failed `Publish` / `PublishBatch` RPCs during the window: immediate failures (one row each, need retransmit) plus, when shown as `a+b`, `b` = total event count in failed `PublishBatch` calls. 
`Emit()` insert failures are logged separately if non-zero. | +| **Q max (rows)** | Peak row count in `cre.chip_durable_events` sampled during the emit window (~50ms polls). | +| **Q end (rows)** | Row count after a short settle (async publish / retransmit). | +| **Q max (KB)*** | For the peak queue sample: `sum(octet_length(payload))/1024` over queued rows (payload bytes only). **Q end** payload size is omitted from the printed table to keep it narrow. | + +With **`CHIP_INGRESS_TEST_ADDR`** set, there is no in-process mock — validate end-to-end delivery with **Kafka / Chip / gateway metrics** (or consumer checks). **Total emits** and **Achieved TPS** still reflect successful durable inserts on the node. + +### CRE Smoke Tests (live Docker environment) + +Tests connect to the node's Postgres and query `cre.chip_durable_events` directly, using `pg_stat_user_tables` for insert/delete statistics — the same pattern used by the EVM LogTrigger test for `trigger_pending_events`. + +| Test | What It Does | +|------|-------------| +| `Test_CRE_V2_DurableEmitter` | Deploys a cron workflow (every 5s), waits for 30+ insert+delete cycles, verifies queue drains to near-empty | +| `Test_CRE_V2_DurableEmitter_Load` | Deploys 5 cron workflows (every 1s each), runs for 3 minutes. 
Logs insert/delete rates, max queue depth, and prints summary table | + +**Running CRE smoke tests:** +```bash +# Basic correctness +go test -v -run Test_CRE_V2_DurableEmitter$ -timeout 10m + +# Load test (5 workflows × 1s cron, 3min observation) +go test -v -run Test_CRE_V2_DurableEmitter_Load -timeout 10m +``` + +**Example load test output:** +``` +╔════════════════════════════════════════════════╗ +║ DURABLE EMITTER LOAD TEST RESULTS ║ +╠════════════════════════════════════════════════╣ +║ Workflows deployed: 5 ║ +║ Observation period: 3m0s ║ +║ Total inserts: 1842 ║ +║ Total deletes: 1840 ║ +║ Avg insert rate: 10.2 events/sec ║ +║ Avg delete rate: 10.2 events/sec ║ +║ Max queue depth: 12 ║ +║ Final pending: 2 ║ +╚════════════════════════════════════════════════╝ +``` + +## Metrics to Instrument (Future) + +| Metric | Description | +|--------|-------------| +| `durable_emitter.queue_depth` | Current row count in `chip_durable_events` | +| `durable_emitter.insert_rate` | Events persisted per second | +| `durable_emitter.publish_rate` | Events successfully delivered per second | +| `durable_emitter.retransmit_rate` | Events retransmitted via background loop | +| `durable_emitter.publish_latency` | Time from insert to confirmed delivery | +| `durable_emitter.oldest_pending` | Age of the longest-waiting event | +| `durable_emitter.expired_count` | Events expired (dropped after TTL) | +| `durable_emitter.error_rate` | Failed publish attempts per second | + +## Open Questions & Future Work + +### 1. Chip Gateway Idempotency +Does the gateway deduplicate re-sent events? If the retransmit loop re-sends an event that the immediate path already delivered (race window), the gateway should de-dup using the CloudEvent `id` (UUID). Needs server-side confirmation. + +### 2. DB Load at Scale +At 1k TPS: ~1k inserts/sec + ~1k deletes/sec = ~2k write ops/sec on the node's Postgres. This produces dead tuples requiring autovacuum tuning. 
Potential optimizations: +- **Batch deletes** — delete by ID list instead of per-row. +- **Two-table approach** — queued + recently-sent to reduce churn on the hot table. +- **CDC streaming** — stream WAL changes directly, avoiding the insert/delete pattern entirely. Matthew Gardener and Clement can advise on CDC implementation. + +### 3. Exponential Backoff +Current PoC uses a fixed retransmit interval. Production should implement per-event exponential backoff using `attempts` and `last_sent_at` columns (schema extension). + +### 4. rmq / Redis Alternative +Patrick raised using [rmq](https://github.com/wellle/rmq) backed by our own DB instead of re-implementing a queue. Worth evaluating if the Postgres-backed approach shows scaling issues in load testing. + +### 5. CDC Streaming +Could stream WAL changes directly rather than polling the table, avoiding the insert/delete churn entirely. This would also enable real-time analytics on event flow. Requires infrastructure coordination with the data analytics pipeline team. + +### 6. DurableEmitter Lifecycle Management +Currently the `DurableEmitter` is started in `application.go` and its background loops are tied to the application context. For production, it should be registered as a proper `services.ServiceCtx` with Start/Close lifecycle management, health checks, and graceful shutdown (flush pending events before stopping). diff --git a/pkg/beholder/durable_emitter.go b/pkg/beholder/durable_emitter.go new file mode 100644 index 0000000000..cfb48b426c --- /dev/null +++ b/pkg/beholder/durable_emitter.go @@ -0,0 +1,653 @@ +package beholder + +import ( + "context" + "fmt" + "slices" + "strings" + "sync" + "time" + + cepb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" + "google.golang.org/protobuf/proto" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +// DurableEmitterConfig configures the DurableEmitter behaviour. 
+type DurableEmitterConfig struct { + // RetransmitInterval controls how often the retransmit loop ticks. + RetransmitInterval time.Duration + // RetransmitAfter is the minimum age of an event before the retransmit + // loop considers it. This gives the immediate-publish path time to succeed. + RetransmitAfter time.Duration + // RetransmitBatchSize caps how many pending rows are listed per retransmit tick + // (each row is sent with its own Publish RPC). + RetransmitBatchSize int + // ExpiryInterval controls how often the expiry loop ticks. + ExpiryInterval time.Duration + // EventTTL is the maximum age of an event before it is expired. + EventTTL time.Duration + // PublishTimeout is the per-RPC deadline for each Publish call. + PublishTimeout time.Duration + // PurgeInterval is how often the purge loop runs to batch-delete rows that + // were marked delivered (Postgres). Zero defaults to 250ms. + PurgeInterval time.Duration + // PurgeBatchSize is the maximum rows removed per PurgeDelivered call. Zero defaults to 500. + PurgeBatchSize int + // Hooks is optional instrumentation (load tests, profiling). Nil fields are skipped. + // Callbacks may run from many goroutines; implementations must be thread-safe. + Hooks *DurableEmitterHooks + // Metrics enables OpenTelemetry instruments on beholder.GetMeter() (queue, publish, store, optional process stats). + // Nil disables. + Metrics *DurableEmitterMetricsConfig + // PersistCloudEventSources limits durable persistence to these CloudEvent Source values + // (the beholder_domain / ce_source). If nil, every source is persisted (library default). + // If non-nil, only matching sources are inserted and retried; others get a single best-effort + // Publish with no store insert. An empty slice persists nothing (all best-effort only). + // A one-element slice containing only "*" is treated like nil (persist all). 
+ PersistCloudEventSources []string +} + +// DurableEmitterHooks records Publish vs Delete latency to locate pipeline bottlenecks. +type DurableEmitterHooks struct { + // OnImmediatePublish is called after each async Publish in publishAndDelete (every attempt). + OnImmediatePublish func(elapsed time.Duration, err error) + // OnImmediateDelete is called after MarkDelivered following a successful immediate Publish. + OnImmediateDelete func(elapsed time.Duration, err error) + // OnRetransmitBatchPublish is called after each retransmit Publish (one RPC per queued event). + OnRetransmitBatchPublish func(elapsed time.Duration, eventCount int, err error) + // OnRetransmitBatchDeletes is called after a retransmit tick with total time and count of + // successful MarkDelivered calls (mem store may delete rows; Postgres sets delivered_at). + OnRetransmitBatchDeletes func(elapsed time.Duration, markedDeliveredCount int) +} + +func DefaultDurableEmitterConfig() DurableEmitterConfig { + return DurableEmitterConfig{ + RetransmitInterval: 5 * time.Second, + RetransmitAfter: 10 * time.Second, + RetransmitBatchSize: 100, + ExpiryInterval: 1 * time.Minute, + EventTTL: 24 * time.Hour, + PublishTimeout: 5 * time.Second, + PurgeInterval: 250 * time.Millisecond, + PurgeBatchSize: 500, + } +} + +// DurableEmitter implements Emitter with persistence-backed delivery guarantees. +// +// On Emit the event is serialized and written to a DurableEventStore. Once the +// insert succeeds Emit returns nil — the caller has a durable guarantee. An +// immediate async Publish is attempted; on success the record is MarkDelivered +// (excluded from retries). Postgres stores then purge physical rows in batches; +// in-memory stores remove the row immediately. If Publish fails, a background +// retransmit loop retries via Publish (one RPC per pending row per tick, up to +// RetransmitBatchSize). +// +// A separate expiry loop garbage-collects events older than EventTTL to bound +// table growth. 
+type DurableEmitter struct { + store DurableEventStore + client chipingress.Client + cfg DurableEmitterConfig + log logger.Logger + + metrics *durableEmitterMetrics + persistFilter persistSourceFilter + + stopCh chan struct{} + wg sync.WaitGroup +} + +// persistSourceFilter decides whether a CloudEvent source may be written to the durable store. +type persistSourceFilter struct { + allowAll bool + allowed map[string]struct{} +} + +func newPersistSourceFilter(sources []string) persistSourceFilter { + if sources == nil { + return persistSourceFilter{allowAll: true} + } + if len(sources) == 1 && strings.TrimSpace(sources[0]) == "*" { + return persistSourceFilter{allowAll: true} + } + m := make(map[string]struct{}, len(sources)) + for _, s := range sources { + m[strings.TrimSpace(s)] = struct{}{} + } + return persistSourceFilter{allowed: m} +} + +func (f persistSourceFilter) allows(source string) bool { + if f.allowAll { + return true + } + _, ok := f.allowed[source] + return ok +} + +var _ Emitter = (*DurableEmitter)(nil) + +func NewDurableEmitter( + store DurableEventStore, + client chipingress.Client, + cfg DurableEmitterConfig, + log logger.Logger, +) (*DurableEmitter, error) { + if store == nil { + return nil, fmt.Errorf("durable event store is nil") + } + if client == nil { + return nil, fmt.Errorf("chipingress client is nil") + } + if log == nil { + return nil, fmt.Errorf("logger is nil") + } + var m *durableEmitterMetrics + if cfg.Metrics != nil { + var err error + m, err = newDurableEmitterMetrics() + if err != nil { + return nil, fmt.Errorf("durable emitter metrics: %w", err) + } + store = newMetricsInstrumentedStore(store, m) + } + return &DurableEmitter{ + store: store, + client: client, + cfg: cfg, + log: log, + metrics: m, + persistFilter: newPersistSourceFilter(cfg.PersistCloudEventSources), + stopCh: make(chan struct{}), + }, nil +} + +// Start launches the retransmit, expiry, and purge background loops. 
+// Cancel the supplied context or call Close to stop them. +func (d *DurableEmitter) Start(ctx context.Context) { + n := 3 + if d.metrics != nil && d.cfg.Metrics != nil { + n++ + } + d.wg.Add(n) + go d.retransmitLoop(ctx) + go d.expiryLoop(ctx) + go d.purgeLoop(ctx) + if d.metrics != nil && d.cfg.Metrics != nil { + go d.metricsLoop(ctx) + } +} + +// Emit persists the event then attempts async delivery when the CloudEvent source is allowed +// by PersistCloudEventSources; otherwise it performs a single best-effort Publish with no +// persistence. Returns nil once processing is accepted (insert succeeded, or non-persist path started). +func (d *DurableEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { + emitFail := func() { + if d.metrics != nil { + d.metrics.emitFail.Add(ctx, 1) + } + } + sourceDomain, entityType, err := ExtractSourceAndType(attrKVs...) + if err != nil { + emitFail() + return err + } + + event, err := chipingress.NewEvent(sourceDomain, entityType, body, newAttributes(attrKVs...)) + if err != nil { + emitFail() + return err + } + + eventPb, err := chipingress.EventToProto(event) + if err != nil { + emitFail() + return fmt.Errorf("failed to convert event to proto: %w", err) + } + + if !d.persistFilter.allows(sourceDomain) { + cl := proto.Clone(eventPb) + evCopy, ok := cl.(*chipingress.CloudEventPb) + if !ok { + emitFail() + return fmt.Errorf("proto.Clone event: got %T, want *chipingress.CloudEventPb", cl) + } + go d.publishBestEffortNoStore(evCopy) + return nil + } + + payload, err := proto.Marshal(eventPb) + if err != nil { + emitFail() + return fmt.Errorf("failed to marshal event proto: %w", err) + } + + tIns := time.Now() + id, err := d.store.Insert(ctx, payload) + if d.metrics != nil { + d.metrics.emitDuration.Record(ctx, time.Since(tIns).Seconds()) + if err != nil { + d.metrics.emitFail.Add(ctx, 1) + } else { + d.metrics.emitSuccess.Add(ctx, 1) + } + } + if err != nil { + return fmt.Errorf("failed to persist event: %w", err) 
+ } + + // Fire-and-forget immediate delivery attempt. + go d.publishAndDelete(id, eventPb) + + return nil +} + +// publishBestEffortNoStore performs one Publish without persisting or retries. +func (d *DurableEmitter) publishBestEffortNoStore(eventPb *chipingress.CloudEventPb) { + ctx, cancel := context.WithTimeout(context.Background(), d.cfg.PublishTimeout) + defer cancel() + + detailKVs := cloudEventPublishKVs(0, "best_effort_no_store", d.cfg.PublishTimeout, eventPb) + //d.log.Infow("DurableEmitter: Chip Ingress publish attempt (best-effort, not persisted)", detailKVs...) + + t0 := time.Now() + _, err := d.client.Publish(ctx, eventPb) + elapsed := time.Since(t0) + if h := d.cfg.Hooks; h != nil && h.OnImmediatePublish != nil { + h.OnImmediatePublish(elapsed, err) + } + mctx := context.Background() + d.metrics.recordPublish(mctx, elapsed, "best_effort", err) + if d.metrics != nil { + if err != nil { + d.metrics.publishImmErr.Add(mctx, 1) + } else { + d.metrics.publishImmOK.Add(mctx, 1) + } + } + if err != nil { + failKVs := append([]any{}, detailKVs...) + failKVs = append(failKVs, + "error", err, + "elapsed", elapsed.String(), + "elapsed_ms", elapsed.Milliseconds(), + ) + //d.log.Infow("DurableEmitter: best-effort Chip publish failed (not persisted, no retry)", failKVs...) + return + } + okKVs := append([]any{}, detailKVs...) + okKVs = append(okKVs, "publish_rpc_elapsed_ms", elapsed.Milliseconds()) + //d.log.Infow("DurableEmitter: best-effort Chip publish succeeded (not persisted)", okKVs...) +} + +// Close signals background loops to stop and waits for them to finish. +func (d *DurableEmitter) Close() error { + close(d.stopCh) + d.wg.Wait() + return nil +} + +// publishAndDelete attempts a single Publish and deletes the record on success. 
+func (d *DurableEmitter) publishAndDelete(id int64, eventPb *chipingress.CloudEventPb) { + ctx, cancel := context.WithTimeout(context.Background(), d.cfg.PublishTimeout) + defer cancel() + + detailKVs := cloudEventPublishKVs(id, "immediate", d.cfg.PublishTimeout, eventPb) + //d.log.Infow("DurableEmitter: Chip Ingress publish attempt (immediate)", detailKVs...) + + t0 := time.Now() + _, err := d.client.Publish(ctx, eventPb) + elapsed := time.Since(t0) + if h := d.cfg.Hooks; h != nil && h.OnImmediatePublish != nil { + h.OnImmediatePublish(elapsed, err) + } + mctx := context.Background() + d.metrics.recordPublish(mctx, elapsed, "immediate", err) + if d.metrics != nil { + if err != nil { + d.metrics.publishImmErr.Add(mctx, 1) + } else { + d.metrics.publishImmOK.Add(mctx, 1) + } + } + if err != nil { + failKVs := append([]any{}, detailKVs...) + failKVs = append(failKVs, + "error", err, + "elapsed", elapsed.String(), + "elapsed_ms", elapsed.Milliseconds(), + ) + d.log.Infow("DurableEmitter: Chip Ingress publish failed (immediate), retransmit loop will retry", failKVs...) + return + } + + pubOKKVs := append([]any{}, detailKVs...) + pubOKKVs = append(pubOKKVs, + "publish_rpc_elapsed", elapsed.String(), + "publish_rpc_elapsed_ms", elapsed.Milliseconds(), + ) + //d.log.Infow("DurableEmitter: Chip Ingress publish succeeded (immediate)", pubOKKVs...) + + t1 := time.Now() + markErr := d.store.MarkDelivered(context.Background(), id) + if h := d.cfg.Hooks; h != nil && h.OnImmediateDelete != nil { + h.OnImmediateDelete(time.Since(t1), markErr) + } + if markErr == nil && d.metrics != nil { + d.metrics.deliverComplete.Add(mctx, 1) + } + markElapsed := time.Since(t1) + if markErr != nil { + d.log.Errorw("failed to mark delivered event", "id", id, "error", markErr) + return + } + delOKKVs := append([]any{}, detailKVs...) 
+ delOKKVs = append(delOKKVs, + "publish_rpc_elapsed_ms", elapsed.Milliseconds(), + "store_mark_delivered_elapsed", markElapsed.String(), + "store_mark_delivered_elapsed_ms", markElapsed.Milliseconds(), + ) + //d.log.Infow("DurableEmitter: durable row marked delivered after successful Chip publish (immediate)", delOKKVs...) +} + +func (d *DurableEmitter) retransmitLoop(ctx context.Context) { + defer d.wg.Done() + ticker := time.NewTicker(d.cfg.RetransmitInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-d.stopCh: + return + case <-ticker.C: + d.retransmitPending(ctx) + } + } +} + +func (d *DurableEmitter) retransmitPending(ctx context.Context) { + cutoff := time.Now().Add(-d.cfg.RetransmitAfter) + pending, err := d.store.ListPending(ctx, cutoff, d.cfg.RetransmitBatchSize) + if err != nil { + d.log.Errorw("failed to list pending events", "error", err) + return + } + + if obs, ok := d.store.(DurableQueueObserver); ok { + st, obsErr := obs.ObserveDurableQueue(ctx, d.cfg.EventTTL, d.queueStatsNearExpiryLead()) + if obsErr != nil { + d.log.Warnw("DurableEmitter: retransmit scan ObserveDurableQueue failed", "error", obsErr) + } else { + d.log.Infow("DurableEmitter: retransmit pending scan", + "pending_rows", st.Depth, + "pending_payload_bytes", st.PayloadBytes, + "oldest_pending_age", st.OldestPendingAge.String(), + "near_ttl_rows", st.NearTTLCount, + "retransmit_list_batch", len(pending), + "retransmit_after", d.cfg.RetransmitAfter.String(), + "list_limit", d.cfg.RetransmitBatchSize, + ) + } + } + + if len(pending) == 0 { + return + } + + events := make([]*chipingress.CloudEventPb, 0, len(pending)) + ids := make([]int64, 0, len(pending)) + + for _, pe := range pending { + ev := new(chipingress.CloudEventPb) + if err := proto.Unmarshal(pe.Payload, ev); err != nil { + d.log.Errorw("corrupt pending event, deleting", "id", pe.ID, "error", err) + _ = d.store.Delete(ctx, pe.ID) + continue + } + if 
!d.persistFilter.allows(ev.GetSource()) { + d.log.Infow("DurableEmitter: dropping queued event (ce_source not in PersistCloudEventSources)", + "id", pe.ID, "ce_source", ev.GetSource(), "ce_type", ev.GetType()) + _ = d.store.Delete(ctx, pe.ID) + continue + } + events = append(events, ev) + ids = append(ids, pe.ID) + } + if len(events) == 0 { + return + } + + // One Publish per row so a single bad or rejected event does not block the rest of the slice. + tDel := time.Now() + var markedDelivered int + for i := range events { + detailKVs := cloudEventPublishKVs(ids[i], "retransmit", d.cfg.PublishTimeout, events[i]) + //d.log.Infow("DurableEmitter: Chip Ingress publish attempt (retransmit)", detailKVs...) + + tPub := time.Now() + pubCtx, cancel := context.WithTimeout(context.Background(), d.cfg.PublishTimeout) + _, pubErr := d.client.Publish(pubCtx, events[i]) + cancel() + elapsed := time.Since(tPub) + if h := d.cfg.Hooks; h != nil && h.OnRetransmitBatchPublish != nil { + h.OnRetransmitBatchPublish(elapsed, 1, pubErr) + } + d.metrics.recordPublish(context.Background(), elapsed, "retransmit", pubErr) + if pubErr != nil { + if d.metrics != nil { + d.metrics.publishBatchEvErr.Add(ctx, 1) + } + failKVs := append([]any{}, detailKVs...) + failKVs = append(failKVs, + "error", pubErr, + "elapsed", elapsed.String(), + "elapsed_ms", elapsed.Milliseconds(), + ) + d.log.Infow("DurableEmitter: Chip Ingress publish failed (retransmit)", failKVs...) + continue + } + pubOKKVs := append([]any{}, detailKVs...) + pubOKKVs = append(pubOKKVs, + "publish_rpc_elapsed", elapsed.String(), + "publish_rpc_elapsed_ms", elapsed.Milliseconds(), + ) + //d.log.Infow("DurableEmitter: Chip Ingress publish succeeded (retransmit)", pubOKKVs...) 
+ if d.metrics != nil { + d.metrics.publishBatchEvOK.Add(ctx, 1) + } + tMarkOne := time.Now() + if markErr := d.store.MarkDelivered(ctx, ids[i]); markErr != nil { + d.log.Errorw("failed to mark retransmitted event delivered", "id", ids[i], "error", markErr) + continue + } + markedDelivered++ + if d.metrics != nil { + d.metrics.deliverComplete.Add(ctx, 1) + } + markElapsed := time.Since(tMarkOne) + delOKKVs := append([]any{}, detailKVs...) + delOKKVs = append(delOKKVs, + "publish_rpc_elapsed_ms", elapsed.Milliseconds(), + "store_mark_delivered_elapsed", markElapsed.String(), + "store_mark_delivered_elapsed_ms", markElapsed.Milliseconds(), + ) + //d.log.Infow("DurableEmitter: durable row deleted after successful Chip publish (retransmit)", delOKKVs...) + } + if markedDelivered > 0 { + d.log.Infow("retransmitted events", + "marked_delivered", markedDelivered, + "attempted", len(events), + ) + } + if h := d.cfg.Hooks; h != nil && h.OnRetransmitBatchDeletes != nil && markedDelivered > 0 { + h.OnRetransmitBatchDeletes(time.Since(tDel), markedDelivered) + } +} + +func (d *DurableEmitter) purgeLoop(ctx context.Context) { + defer d.wg.Done() + interval := d.cfg.PurgeInterval + if interval <= 0 { + interval = 250 * time.Millisecond + } + batch := d.cfg.PurgeBatchSize + if batch <= 0 { + batch = 500 + } + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-d.stopCh: + return + case <-ticker.C: + for { + n, err := d.store.PurgeDelivered(ctx, batch) + if err != nil { + d.log.Errorw("failed to purge delivered chip durable events", "error", err) + break + } + if n == 0 { + break + } + } + } + } +} + +func (d *DurableEmitter) expiryLoop(ctx context.Context) { + defer d.wg.Done() + ticker := time.NewTicker(d.cfg.ExpiryInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-d.stopCh: + return + case <-ticker.C: + deleted, err := d.store.DeleteExpired(ctx, d.cfg.EventTTL) + if err != 
nil { + d.log.Errorw("failed to delete expired events", "error", err) + continue + } + if deleted > 0 { + if d.metrics != nil { + d.metrics.expiredPurged.Add(context.Background(), deleted) + } + d.log.Infow("purged expired events", "count", deleted) + } + } + } +} + +func (d *DurableEmitter) queueStatsNearExpiryLead() time.Duration { + lead := 5 * time.Minute + if d.cfg.Metrics != nil && d.cfg.Metrics.NearExpiryLead > 0 { + lead = d.cfg.Metrics.NearExpiryLead + } + return lead +} + +func (d *DurableEmitter) metricsLoop(ctx context.Context) { + defer d.wg.Done() + mc := d.cfg.Metrics + poll := mc.PollInterval + if poll <= 0 { + poll = 10 * time.Second + } + ticker := time.NewTicker(poll) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-d.stopCh: + return + case <-ticker.C: + if d.metrics == nil { + return + } + bctx := context.Background() + if obs, ok := d.store.(DurableQueueObserver); ok { + d.metrics.pollQueueGauges(bctx, obs, d.cfg.EventTTL, d.queueStatsNearExpiryLead(), mc.MaxQueuePayloadBytes) + } + if mc.RecordProcessStats { + d.metrics.recordProcessMem(bctx) + d.metrics.recordProcessCPU(bctx) + } + } + } +} + +// cloudEventPublishKVs returns structured fields for logging a Chip Ingress Publish RPC. 
+func cloudEventPublishKVs(durableRowID int64, phase string, timeout time.Duration, ev *chipingress.CloudEventPb) []any {
+	// A nil event still gets a minimal, phase-identifying log payload.
+	if ev == nil {
+		return []any{
+			"durable_row_id", durableRowID,
+			"publish_phase", phase,
+			"publish_timeout", timeout.String(),
+			"ce_nil", true,
+		}
+	}
+
+	attrs := ev.GetAttributes()
+	bin := ev.GetBinaryData()
+	text := ev.GetTextData()
+	pd := ev.GetProtoData()
+	var protoTypeURL string
+	if pd != nil {
+		protoTypeURL = pd.GetTypeUrl()
+	}
+
+	// Sort attribute keys so log lines are deterministic across runs.
+	attrKeys := make([]string, 0, len(attrs))
+	for k := range attrs {
+		attrKeys = append(attrKeys, k)
+	}
+	slices.Sort(attrKeys)
+
+	kvs := []any{
+		"durable_row_id", durableRowID,
+		"publish_phase", phase,
+		"publish_timeout", timeout.String(),
+		"ce_id", ev.GetId(),
+		"ce_source", ev.GetSource(),
+		"ce_type", ev.GetType(),
+		"ce_spec_version", ev.GetSpecVersion(),
+		"ce_data_binary_bytes", len(bin),
+		"ce_data_text_bytes", len(text),
+		"ce_proto_data_type_url", protoTypeURL,
+		"ce_attribute_count", len(attrs),
+		"ce_attribute_keys", strings.Join(attrKeys, ","),
+		"ce_attr_datacontenttype", cloudEventAttrString(attrs, "datacontenttype"),
+		"ce_attr_dataschema", cloudEventAttrString(attrs, "dataschema"),
+		"ce_attr_subject", cloudEventAttrString(attrs, "subject"),
+	}
+	return kvs
+}
+
+// cloudEventAttrString extracts a string-valued CloudEvent attribute,
+// preferring the ce_string representation over ce_uri; returns "" when the
+// attribute is absent or holds neither representation.
+func cloudEventAttrString(attrs map[string]*cepb.CloudEventAttributeValue, key string) string {
+	if attrs == nil {
+		return ""
+	}
+	v := attrs[key]
+	if v == nil {
+		return ""
+	}
+	if s := v.GetCeString(); s != "" {
+		return s
+	}
+	if s := v.GetCeUri(); s != "" {
+		return s
+	}
+	return ""
+}
diff --git a/pkg/beholder/durable_emitter_cpu_other.go b/pkg/beholder/durable_emitter_cpu_other.go
new file mode 100644
index 0000000000..2b7a7f5206
--- /dev/null
+++ b/pkg/beholder/durable_emitter_cpu_other.go
@@ -0,0 +1,9 @@
+//go:build !unix
+
+package beholder
+
+import "context"
+
+// recordProcessCPU is a no-op on non-Unix platforms (no getrusage available).
+func (m *durableEmitterMetrics) recordProcessCPU(ctx context.Context) {
+	// ctx intentionally unused; signature kept for parity with the unix build.
+	_ = ctx
+}
diff --git a/pkg/beholder/durable_emitter_cpu_unix.go b/pkg/beholder/durable_emitter_cpu_unix.go
new file mode 100644
index 0000000000..ec46802a63
--- /dev/null
+++ b/pkg/beholder/durable_emitter_cpu_unix.go
@@ -0,0 +1,22 @@
+//go:build unix
+
+package beholder
+
+import (
+	"context"
+	"syscall"
+)
+
+// recordProcessCPU records cumulative user/system CPU seconds for this process
+// via getrusage(RUSAGE_SELF). Errors are ignored: the gauges simply keep their
+// previous values.
+func (m *durableEmitterMetrics) recordProcessCPU(ctx context.Context) {
+	if m == nil {
+		return
+	}
+	var r syscall.Rusage
+	if err := syscall.Getrusage(syscall.RUSAGE_SELF, &r); err != nil {
+		return
+	}
+	u := float64(r.Utime.Sec) + float64(r.Utime.Usec)/1e6
+	s := float64(r.Stime.Sec) + float64(r.Stime.Usec)/1e6
+	m.procCPUUser.Record(ctx, u)
+	m.procCPUSys.Record(ctx, s)
+}
diff --git a/pkg/beholder/durable_emitter_integration_test.go b/pkg/beholder/durable_emitter_integration_test.go
new file mode 100644
index 0000000000..7c9fdff252
--- /dev/null
+++ b/pkg/beholder/durable_emitter_integration_test.go
@@ -0,0 +1,387 @@
+package beholder_test
+
+import (
+	"context"
+	"net"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	cepb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/status"
+
+	"github.com/smartcontractkit/chainlink-common/pkg/beholder"
+	"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
+	"github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb"
+	"github.com/smartcontractkit/chainlink-common/pkg/logger"
+)
+
+// mockChipServer implements ChipIngressServer with controllable behaviour.
+type mockChipServer struct { + pb.UnimplementedChipIngressServer + + mu sync.Mutex + publishErr error + batchErr error + received []*cepb.CloudEvent + batchReceived [][]*cepb.CloudEvent + publishCount atomic.Int64 + batchCount atomic.Int64 + publishDelay time.Duration +} + +func (s *mockChipServer) Publish(_ context.Context, in *cepb.CloudEvent) (*pb.PublishResponse, error) { + if s.publishDelay > 0 { + time.Sleep(s.publishDelay) + } + s.publishCount.Add(1) + s.mu.Lock() + defer s.mu.Unlock() + if s.publishErr != nil { + return nil, s.publishErr + } + s.received = append(s.received, in) + return &pb.PublishResponse{}, nil +} + +func (s *mockChipServer) PublishBatch(_ context.Context, in *pb.CloudEventBatch) (*pb.PublishResponse, error) { + s.batchCount.Add(1) + s.mu.Lock() + defer s.mu.Unlock() + if s.batchErr != nil { + return nil, s.batchErr + } + s.batchReceived = append(s.batchReceived, in.Events) + s.received = append(s.received, in.Events...) + return &pb.PublishResponse{}, nil +} + +func (s *mockChipServer) Ping(context.Context, *pb.EmptyRequest) (*pb.PingResponse, error) { + return &pb.PingResponse{Message: "pong"}, nil +} + +func (s *mockChipServer) setPublishErr(err error) { + s.mu.Lock() + defer s.mu.Unlock() + s.publishErr = err +} + +func (s *mockChipServer) setBatchErr(err error) { + s.mu.Lock() + defer s.mu.Unlock() + s.batchErr = err +} + +func (s *mockChipServer) receivedCount() int { + s.mu.Lock() + defer s.mu.Unlock() + return len(s.received) +} + +func (s *mockChipServer) batchCallCount() int { + s.mu.Lock() + defer s.mu.Unlock() + return len(s.batchReceived) +} + +// startMockServer starts a gRPC server on a random port and returns the +// server, address, and a cleanup function. 
+func startMockServer(t *testing.T, srv *mockChipServer) (*grpc.Server, string) {
+	t.Helper()
+	lis, err := net.Listen("tcp", "127.0.0.1:0")
+	require.NoError(t, err)
+
+	gs := grpc.NewServer()
+	pb.RegisterChipIngressServer(gs, srv)
+
+	go func() {
+		// NOTE(review): empty branch (staticcheck SA9003); prefer
+		// `_ = gs.Serve(lis)` with a comment.
+		if err := gs.Serve(lis); err != nil {
+			// Ignore errors from server being stopped during cleanup.
+		}
+	}()
+
+	t.Cleanup(func() { gs.GracefulStop() })
+	return gs, lis.Addr().String()
+}
+
+// newChipClient dials the mock server without TLS and closes it on cleanup.
+func newChipClient(t *testing.T, addr string) chipingress.Client {
+	t.Helper()
+	c, err := chipingress.NewClient(addr, chipingress.WithInsecureConnection())
+	require.NoError(t, err)
+	t.Cleanup(func() { _ = c.Close() })
+	return c
+}
+
+// emitAttrs supplies the source/type attributes every test event carries.
+func emitAttrs() []any {
+	return []any{"source", "test-domain", "type", "test-entity"}
+}
+
+// fastCfg shrinks all intervals so retransmit/expiry behaviour is observable
+// within a few hundred milliseconds.
+func fastCfg() beholder.DurableEmitterConfig {
+	return beholder.DurableEmitterConfig{
+		RetransmitInterval:  100 * time.Millisecond,
+		RetransmitAfter:     50 * time.Millisecond,
+		RetransmitBatchSize: 50,
+		ExpiryInterval:      200 * time.Millisecond,
+		EventTTL:            500 * time.Millisecond,
+		PublishTimeout:      2 * time.Second,
+	}
+}
+
+// ---------- Test cases ----------
+
+func TestIntegration_HappyPath(t *testing.T) {
+	srv := &mockChipServer{}
+	_, addr := startMockServer(t, srv)
+	client := newChipClient(t, addr)
+	store := beholder.NewMemDurableEventStore()
+
+	em, err := beholder.NewDurableEmitter(store, client, fastCfg(), logger.Test(t))
+	require.NoError(t, err)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	em.Start(ctx)
+	defer em.Close()
+
+	require.NoError(t, em.Emit(ctx, []byte("billing-record-1"), emitAttrs()...))
+	require.NoError(t, em.Emit(ctx, []byte("billing-record-2"), emitAttrs()...))
+
+	require.Eventually(t, func() bool {
+		return srv.receivedCount() == 2
+	}, 3*time.Second, 10*time.Millisecond, "server should receive both events")
+
+	require.Eventually(t, func() bool {
+		return store.Len() == 0
+	}, 3*time.Second, 10*time.Millisecond, "store should be empty after delivery")
+}
+
+func TestIntegration_ServerUnavailable_RetransmitRecovers(t *testing.T) {
+	// Start with server returning UNAVAILABLE.
+	srv := &mockChipServer{}
+	srv.setPublishErr(status.Error(codes.Unavailable, "chip down"))
+	_, addr := startMockServer(t, srv)
+	client := newChipClient(t, addr)
+	store := beholder.NewMemDurableEventStore()
+
+	em, err := beholder.NewDurableEmitter(store, client, fastCfg(), logger.Test(t))
+	require.NoError(t, err)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	em.Start(ctx)
+	defer em.Close()
+
+	require.NoError(t, em.Emit(ctx, []byte("will-retry"), emitAttrs()...))
+
+	// Event should be in the store, not delivered.
+	time.Sleep(200 * time.Millisecond)
+	assert.Equal(t, 1, store.Len(), "event persists while server is unavailable")
+
+	// "Recover" the server.
+	srv.setPublishErr(nil)
+
+	require.Eventually(t, func() bool {
+		return store.Len() == 0
+	}, 5*time.Second, 50*time.Millisecond, "retransmit loop should deliver after recovery")
+
+	assert.GreaterOrEqual(t, srv.publishCount.Load(), int64(2),
+		"one failed immediate Publish then one retransmit Publish")
+	assert.Equal(t, int64(0), srv.batchCount.Load(), "retransmit should not use PublishBatch")
+}
+
+func TestIntegration_ServerDown_EventsSurvive(t *testing.T) {
+	// Start server, then stop it to simulate total outage.
+	srv := &mockChipServer{}
+	gs, addr := startMockServer(t, srv)
+	client := newChipClient(t, addr)
+	store := beholder.NewMemDurableEventStore()
+
+	cfg := fastCfg()
+	cfg.PublishTimeout = 500 * time.Millisecond
+	em, err := beholder.NewDurableEmitter(store, client, cfg, logger.Test(t))
+	require.NoError(t, err)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	em.Start(ctx)
+
+	// Stop the gRPC server entirely.
+	gs.Stop()
+	time.Sleep(100 * time.Millisecond)
+
+	// Emit while server is down — Emit() itself must succeed (DB insert works).
+	require.NoError(t, em.Emit(ctx, []byte("server-is-down"), emitAttrs()...))
+	assert.Equal(t, 1, store.Len(), "event should be persisted even with server down")
+
+	// Stop the emitter to simulate a "node shutdown".
+	em.Close()
+
+	// Bring up a new server on the same address.
+	// NOTE(review): rebinding the exact address can flake if the OS has not
+	// released the port yet — consider retrying the Listen.
+	srv2 := &mockChipServer{}
+	lis, err := net.Listen("tcp", addr)
+	require.NoError(t, err)
+	gs2 := grpc.NewServer()
+	pb.RegisterChipIngressServer(gs2, srv2)
+	go func() { _ = gs2.Serve(lis) }()
+	t.Cleanup(func() { gs2.GracefulStop() })
+
+	// Create a new client and DurableEmitter re-using the same store
+	// (simulating node restart with Postgres).
+	client2, err := chipingress.NewClient(addr, chipingress.WithInsecureConnection())
+	require.NoError(t, err)
+	t.Cleanup(func() { _ = client2.Close() })
+
+	em2, err := beholder.NewDurableEmitter(store, client2, cfg, logger.Test(t))
+	require.NoError(t, err)
+	em2.Start(ctx)
+	defer em2.Close()
+
+	require.Eventually(t, func() bool {
+		return srv2.receivedCount() == 1
+	}, 5*time.Second, 50*time.Millisecond, "new emitter should retransmit the surviving event")
+
+	require.Eventually(t, func() bool {
+		return store.Len() == 0
+	}, 5*time.Second, 50*time.Millisecond, "store should be empty after retransmit")
+}
+
+func TestIntegration_HighThroughput(t *testing.T) {
+	srv := &mockChipServer{}
+	_, addr := startMockServer(t, srv)
+	client := newChipClient(t, addr)
+	store := beholder.NewMemDurableEventStore()
+
+	cfg := fastCfg()
+	cfg.RetransmitBatchSize = 200
+	em, err := beholder.NewDurableEmitter(store, client, cfg, logger.Test(t))
+	require.NoError(t, err)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	em.Start(ctx)
+	defer em.Close()
+
+	const n = 500
+	for i := 0; i < n; i++ {
+		require.NoError(t, em.Emit(ctx, []byte("event"), emitAttrs()...))
+	}
+
+	// >= rather than == because retransmit may duplicate-deliver (at-least-once).
+	require.Eventually(t, func() bool {
+		return srv.receivedCount() >= n
+	}, 10*time.Second, 50*time.Millisecond, "all %d events should be received", n)
+
+	require.Eventually(t, func() bool {
+		return store.Len() == 0
+	}, 10*time.Second, 50*time.Millisecond, "store should drain completely")
+}
+
+func TestIntegration_EventExpiry(t *testing.T) {
+	// Server always rejects — events can never be delivered.
+	srv := &mockChipServer{}
+	srv.setPublishErr(status.Error(codes.Internal, "permanent failure"))
+	_, addr := startMockServer(t, srv)
+	client := newChipClient(t, addr)
+	store := beholder.NewMemDurableEventStore()
+
+	cfg := fastCfg()
+	cfg.EventTTL = 100 * time.Millisecond
+	cfg.ExpiryInterval = 100 * time.Millisecond
+	em, err := beholder.NewDurableEmitter(store, client, cfg, logger.Test(t))
+	require.NoError(t, err)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	em.Start(ctx)
+	defer em.Close()
+
+	require.NoError(t, em.Emit(ctx, []byte("will-expire"), emitAttrs()...))
+	assert.Equal(t, 1, store.Len())
+
+	require.Eventually(t, func() bool {
+		return store.Len() == 0
+	}, 5*time.Second, 50*time.Millisecond,
+		"expiry loop should purge undeliverable events after TTL")
+}
+
+func TestIntegration_RetransmitUsesSerialPublish(t *testing.T) {
+	// Immediate Publish fails; retransmit uses one Publish per queued row.
+	srv := &mockChipServer{}
+	srv.setPublishErr(status.Error(codes.Unavailable, "reject immediate"))
+	_, addr := startMockServer(t, srv)
+	client := newChipClient(t, addr)
+	store := beholder.NewMemDurableEventStore()
+
+	em, err := beholder.NewDurableEmitter(store, client, fastCfg(), logger.Test(t))
+	require.NoError(t, err)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	em.Start(ctx)
+	defer em.Close()
+
+	for i := 0; i < 5; i++ {
+		require.NoError(t, em.Emit(ctx, []byte("retry-me"), emitAttrs()...))
+	}
+
+	srv.setPublishErr(nil)
+
+	require.Eventually(t, func() bool {
+		return store.Len() == 0
+	}, 5*time.Second, 50*time.Millisecond,
+		"retransmit should deliver each event with its own Publish RPC")
+
+	assert.Equal(t, 0, srv.batchCallCount(), "retransmit should not call PublishBatch")
+	assert.GreaterOrEqual(t, srv.publishCount.Load(), int64(10),
+		"five failed immediate attempts plus five retransmit publishes")
+}
+
+// TestIntegration_GRPCConnection verifies the emitter works over a real gRPC
+// connection with proper proto serialization round-trip.
+func TestIntegration_GRPCConnection(t *testing.T) {
+	srv := &mockChipServer{}
+	_, addr := startMockServer(t, srv)
+
+	// Use a raw gRPC dial to prove we're going over the wire.
+	conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	require.NoError(t, err)
+	t.Cleanup(func() { _ = conn.Close() })
+
+	// Ping to verify connectivity.
+	grpcClient := pb.NewChipIngressClient(conn)
+	pong, err := grpcClient.Ping(context.Background(), &pb.EmptyRequest{})
+	require.NoError(t, err)
+	assert.Equal(t, "pong", pong.Message)
+
+	// Now use the chipingress.Client wrapper with DurableEmitter.
+	client := newChipClient(t, addr)
+	store := beholder.NewMemDurableEventStore()
+
+	em, err := beholder.NewDurableEmitter(store, client, fastCfg(), logger.Test(t))
+	require.NoError(t, err)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	em.Start(ctx)
+	defer em.Close()
+
+	payload := []byte("proto-round-trip-test")
+	require.NoError(t, em.Emit(ctx, payload, emitAttrs()...))
+
+	require.Eventually(t, func() bool {
+		return srv.receivedCount() == 1
+	}, 3*time.Second, 10*time.Millisecond)
+
+	// Verify the CloudEvent arrived with correct source/type.
+	srv.mu.Lock()
+	received := srv.received[0]
+	srv.mu.Unlock()
+
+	assert.Equal(t, "test-domain", received.Source)
+	assert.Equal(t, "test-entity", received.Type)
+}
diff --git a/pkg/beholder/durable_emitter_metric_info.go b/pkg/beholder/durable_emitter_metric_info.go
new file mode 100644
index 0000000000..7cd92e127e
--- /dev/null
+++ b/pkg/beholder/durable_emitter_metric_info.go
@@ -0,0 +1,122 @@
+package beholder
+
+// Durable emitter OTel instruments (registered via beholder.GetMeter), matching the
+// MetricInfo pattern used with beholder elsewhere in chainlink-common.
+ +var ( + durableEmitterMetricEmitSuccess = MetricInfo{ + Name: "beholder.durable_emitter.emit.success", + Unit: "{call}", + Description: "Successful durable Emit calls (insert returned)", + } + durableEmitterMetricEmitFailure = MetricInfo{ + Name: "beholder.durable_emitter.emit.failure", + Unit: "{call}", + Description: "Failed Emit calls (before or during insert)", + } + durableEmitterMetricEmitDuration = MetricInfo{ + Name: "beholder.durable_emitter.emit.duration", + Unit: "s", + Description: "Emit insert path duration (seconds, fractional; aligns with Prometheus _duration_seconds)", + } + durableEmitterMetricPublishImmSuccess = MetricInfo{ + Name: "beholder.durable_emitter.publish.immediate.success", + Unit: "{call}", + Description: "Immediate Publish RPC successes", + } + durableEmitterMetricPublishImmFailure = MetricInfo{ + Name: "beholder.durable_emitter.publish.immediate.failure", + Unit: "{call}", + Description: "Immediate Publish RPC failures (events await retransmit)", + } + durableEmitterMetricPublishDuration = MetricInfo{ + Name: "beholder.durable_emitter.publish.duration", + Unit: "s", + Description: "Chip Ingress Publish RPC duration (seconds); labels: phase={immediate,retransmit,best_effort}, error={true,false}", + } + durableEmitterMetricPublishBatchSuccess = MetricInfo{ + Name: "beholder.durable_emitter.publish.retransmit.batch.success", + Unit: "{call}", + Description: "Unused; retransmit uses serial Publish (see retransmit.events.*)", + } + durableEmitterMetricPublishBatchFailure = MetricInfo{ + Name: "beholder.durable_emitter.publish.retransmit.batch.failure", + Unit: "{call}", + Description: "Unused; retransmit uses serial Publish (see retransmit.events.*)", + } + durableEmitterMetricPublishBatchEvSuccess = MetricInfo{ + Name: "beholder.durable_emitter.publish.retransmit.events.success", + Unit: "{event}", + Description: "Retransmit Publish RPC successes (one RPC per queued event)", + } + durableEmitterMetricPublishBatchEvFailure = 
MetricInfo{ + Name: "beholder.durable_emitter.publish.retransmit.events.failure", + Unit: "{event}", + Description: "Retransmit Publish RPC failures (event stays queued)", + } + durableEmitterMetricDeliveryCompleted = MetricInfo{ + Name: "beholder.durable_emitter.delivery.completed", + Unit: "{event}", + Description: "Events removed from store after successful publish (immediate or retransmit)", + } + durableEmitterMetricExpiredPurged = MetricInfo{ + Name: "beholder.durable_emitter.expired_purged", + Unit: "{event}", + Description: "Events deleted by TTL expiry loop", + } + durableEmitterMetricStoreOperations = MetricInfo{ + Name: "beholder.durable_emitter.store.operations", + Unit: "{op}", + Description: "Durable store operations (proxy for DB load / IOPs)", + } + durableEmitterMetricStoreOpDuration = MetricInfo{ + Name: "beholder.durable_emitter.store.operation.duration", + Unit: "s", + Description: "Durable store operation latency (seconds, fractional)", + } + durableEmitterMetricQueueDepth = MetricInfo{ + Name: "beholder.durable_emitter.queue.depth", + Unit: "{row}", + Description: "Pending rows in durable queue", + } + durableEmitterMetricQueuePayloadBytes = MetricInfo{ + Name: "beholder.durable_emitter.queue.payload_bytes", + Unit: "By", + Description: "Sum of payload bytes for pending rows", + } + durableEmitterMetricQueueOldestAgeSec = MetricInfo{ + Name: "beholder.durable_emitter.queue.oldest_pending_age_seconds", + Unit: "s", + Description: "Age of oldest pending row at last poll (longest wait)", + } + durableEmitterMetricQueueNearTTL = MetricInfo{ + Name: "beholder.durable_emitter.queue.near_ttl", + Unit: "{row}", + Description: "Rows within near-expiry window of EventTTL (DLQ pressure proxy; no separate DLQ table)", + } + durableEmitterMetricQueueCapacityRatio = MetricInfo{ + Name: "beholder.durable_emitter.queue.capacity_usage_ratio", + Unit: "1", + Description: "queue.payload_bytes / MaxQueuePayloadBytes when max > 0", + } + 
durableEmitterMetricProcHeapInuse = MetricInfo{ + Name: "beholder.durable_emitter.process.memory.heap_inuse_bytes", + Unit: "By", + Description: "Go runtime MemStats HeapInuse", + } + durableEmitterMetricProcHeapSys = MetricInfo{ + Name: "beholder.durable_emitter.process.memory.heap_sys_bytes", + Unit: "By", + Description: "Go runtime MemStats HeapSys", + } + durableEmitterMetricProcCPUUser = MetricInfo{ + Name: "beholder.durable_emitter.process.cpu.user_seconds", + Unit: "s", + Description: "Cumulative user CPU seconds (getrusage; Unix only)", + } + durableEmitterMetricProcCPUSys = MetricInfo{ + Name: "beholder.durable_emitter.process.cpu.system_seconds", + Unit: "s", + Description: "Cumulative system CPU seconds (getrusage; Unix only)", + } +) diff --git a/pkg/beholder/durable_emitter_metrics.go b/pkg/beholder/durable_emitter_metrics.go new file mode 100644 index 0000000000..c9c11a96b3 --- /dev/null +++ b/pkg/beholder/durable_emitter_metrics.go @@ -0,0 +1,184 @@ +package beholder + +import ( + "context" + "runtime" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// DurableEmitterMetricsConfig enables OpenTelemetry metrics for DurableEmitter. +// Set on DurableEmitterConfig.Metrics; nil disables instrumentation. +// +// Instruments are registered on beholder.GetMeter() (same path as capabilities +// and monitoring metrics). Ensure beholder.SetClient has been called with a +// configured client before NewDurableEmitter when metrics are enabled. +type DurableEmitterMetricsConfig struct { + // PollInterval is how often queue and optional process gauges refresh. Zero = 10s. + PollInterval time.Duration + // NearExpiryLead is the window before EventTTL used for queue.near_ttl (DLQ pressure proxy). Zero = 5m. + NearExpiryLead time.Duration + // MaxQueuePayloadBytes, if > 0, records capacity_usage_ratio = queue_payload_bytes / max. 
+ MaxQueuePayloadBytes int64 + // RecordProcessStats records Go heap gauges and, on Unix, cumulative CPU seconds (getrusage). + RecordProcessStats bool +} + +type durableEmitterMetrics struct { + emitSuccess metric.Int64Counter + emitFail metric.Int64Counter + emitDuration metric.Float64Histogram + publishImmOK metric.Int64Counter + publishImmErr metric.Int64Counter + publishDuration metric.Float64Histogram + publishBatchOK metric.Int64Counter + publishBatchErr metric.Int64Counter + publishBatchEvOK metric.Int64Counter + publishBatchEvErr metric.Int64Counter + deliverComplete metric.Int64Counter + expiredPurged metric.Int64Counter + storeOps metric.Int64Counter + storeOpDuration metric.Float64Histogram + queueDepth metric.Int64Gauge + queuePayloadBytes metric.Int64Gauge + queueOldestAgeSec metric.Float64Gauge + queueNearTTL metric.Int64Gauge + queueCapacityRatio metric.Float64Gauge + procHeapInuse metric.Int64Gauge + procHeapSys metric.Int64Gauge + procCPUUser metric.Float64Gauge + procCPUSys metric.Float64Gauge +} + +func newDurableEmitterMetrics() (*durableEmitterMetrics, error) { + meter := GetMeter() + m := &durableEmitterMetrics{} + var err error + if m.emitSuccess, err = durableEmitterMetricEmitSuccess.NewInt64Counter(meter); err != nil { + return nil, err + } + if m.emitFail, err = durableEmitterMetricEmitFailure.NewInt64Counter(meter); err != nil { + return nil, err + } + if m.emitDuration, err = durableEmitterMetricEmitDuration.NewFloat64Histogram(meter); err != nil { + return nil, err + } + if m.publishImmOK, err = durableEmitterMetricPublishImmSuccess.NewInt64Counter(meter); err != nil { + return nil, err + } + if m.publishImmErr, err = durableEmitterMetricPublishImmFailure.NewInt64Counter(meter); err != nil { + return nil, err + } + if m.publishDuration, err = durableEmitterMetricPublishDuration.NewFloat64Histogram(meter); err != nil { + return nil, err + } + if m.publishBatchOK, err = durableEmitterMetricPublishBatchSuccess.NewInt64Counter(meter); err 
!= nil { + return nil, err + } + if m.publishBatchErr, err = durableEmitterMetricPublishBatchFailure.NewInt64Counter(meter); err != nil { + return nil, err + } + if m.publishBatchEvOK, err = durableEmitterMetricPublishBatchEvSuccess.NewInt64Counter(meter); err != nil { + return nil, err + } + if m.publishBatchEvErr, err = durableEmitterMetricPublishBatchEvFailure.NewInt64Counter(meter); err != nil { + return nil, err + } + if m.deliverComplete, err = durableEmitterMetricDeliveryCompleted.NewInt64Counter(meter); err != nil { + return nil, err + } + if m.expiredPurged, err = durableEmitterMetricExpiredPurged.NewInt64Counter(meter); err != nil { + return nil, err + } + if m.storeOps, err = durableEmitterMetricStoreOperations.NewInt64Counter(meter); err != nil { + return nil, err + } + if m.storeOpDuration, err = durableEmitterMetricStoreOpDuration.NewFloat64Histogram(meter); err != nil { + return nil, err + } + if m.queueDepth, err = durableEmitterMetricQueueDepth.NewInt64Gauge(meter); err != nil { + return nil, err + } + if m.queuePayloadBytes, err = durableEmitterMetricQueuePayloadBytes.NewInt64Gauge(meter); err != nil { + return nil, err + } + if m.queueOldestAgeSec, err = durableEmitterMetricQueueOldestAgeSec.NewFloat64Gauge(meter); err != nil { + return nil, err + } + if m.queueNearTTL, err = durableEmitterMetricQueueNearTTL.NewInt64Gauge(meter); err != nil { + return nil, err + } + if m.queueCapacityRatio, err = durableEmitterMetricQueueCapacityRatio.NewFloat64Gauge(meter); err != nil { + return nil, err + } + if m.procHeapInuse, err = durableEmitterMetricProcHeapInuse.NewInt64Gauge(meter); err != nil { + return nil, err + } + if m.procHeapSys, err = durableEmitterMetricProcHeapSys.NewInt64Gauge(meter); err != nil { + return nil, err + } + if m.procCPUUser, err = durableEmitterMetricProcCPUUser.NewFloat64Gauge(meter); err != nil { + return nil, err + } + if m.procCPUSys, err = durableEmitterMetricProcCPUSys.NewFloat64Gauge(meter); err != nil { + return nil, err 
	}
	return m, nil
}

// recordStoreOp records one durable-store operation: a counter sample tagged
// with the operation name and whether it errored, plus a duration histogram
// sample tagged with the operation name only (keeps histogram cardinality low).
func (m *durableEmitterMetrics) recordStoreOp(ctx context.Context, op string, elapsed time.Duration, opErr error) {
	if m == nil {
		// Metrics disabled: a nil receiver is a silent no-op.
		return
	}
	attrs := metric.WithAttributes(
		attribute.String("operation", op),
		attribute.Bool("error", opErr != nil),
	)
	m.storeOps.Add(ctx, 1, attrs)
	m.storeOpDuration.Record(ctx, elapsed.Seconds(), metric.WithAttributes(attribute.String("operation", op)))
}

// pollQueueGauges samples the pending-queue gauges (depth, payload bytes,
// oldest pending age, near-TTL count, capacity ratio) from obs. Sampling is
// best-effort: observation errors are dropped so a flaky store cannot break
// the metrics poll loop.
func (m *durableEmitterMetrics) pollQueueGauges(ctx context.Context, obs DurableQueueObserver, ttl, lead time.Duration, maxBytes int64) {
	if m == nil || obs == nil {
		return
	}
	st, err := obs.ObserveDurableQueue(ctx, ttl, lead)
	if err != nil {
		// Best-effort: skip this sample rather than surface the error.
		return
	}
	m.queueDepth.Record(ctx, st.Depth)
	m.queuePayloadBytes.Record(ctx, st.PayloadBytes)
	if st.Depth == 0 {
		// Empty queue reports age 0 rather than a stale OldestPendingAge.
		m.queueOldestAgeSec.Record(ctx, 0)
	} else {
		m.queueOldestAgeSec.Record(ctx, st.OldestPendingAge.Seconds())
	}
	m.queueNearTTL.Record(ctx, st.NearTTLCount)
	if maxBytes > 0 {
		// Ratio is only meaningful when a byte budget is configured.
		m.queueCapacityRatio.Record(ctx, float64(st.PayloadBytes)/float64(maxBytes))
	}
}

// recordProcessMem samples process heap gauges from runtime.MemStats.
// NOTE(review): runtime.ReadMemStats briefly stops the world; callers should
// invoke this at a modest poll interval.
func (m *durableEmitterMetrics) recordProcessMem(ctx context.Context) {
	if m == nil {
		return
	}
	var ms runtime.MemStats
	runtime.ReadMemStats(&ms)
	m.procHeapInuse.Record(ctx, int64(ms.HeapInuse))
	m.procHeapSys.Record(ctx, int64(ms.HeapSys))
}

// recordPublish records one Chip publish attempt's duration, tagged with the
// caller-supplied phase label and whether the attempt errored.
func (m *durableEmitterMetrics) recordPublish(ctx context.Context, elapsed time.Duration, phase string, err error) {
	if m == nil {
		return
	}
	m.publishDuration.Record(ctx, elapsed.Seconds(),
		metric.WithAttributes(
			attribute.String("phase", phase),
			attribute.Bool("error", err != nil),
		),
	)
}
diff --git a/pkg/beholder/durable_emitter_store_wrap.go b/pkg/beholder/durable_emitter_store_wrap.go
new file mode 100644
index 0000000000..9f68047a76
--- /dev/null
+++ b/pkg/beholder/durable_emitter_store_wrap.go
@@ -0,0 +1,73 @@
package beholder

import (
	"context"
	"errors"
	"time"
)

// metricsInstrumentedStore wraps DurableEventStore to record store operation metrics.
type metricsInstrumentedStore struct {
	inner DurableEventStore
	m     *durableEmitterMetrics
}

var _ DurableEventStore = (*metricsInstrumentedStore)(nil)
var _ DurableQueueObserver = (*metricsInstrumentedStore)(nil)

// newMetricsInstrumentedStore wraps inner with per-operation metrics. When
// metrics are disabled (m == nil) it returns inner unwrapped so the hot path
// pays no extra indirection.
func newMetricsInstrumentedStore(inner DurableEventStore, m *durableEmitterMetrics) DurableEventStore {
	if m == nil {
		return inner
	}
	return &metricsInstrumentedStore{inner: inner, m: m}
}

// Insert delegates to the inner store and records an "insert" store-op sample.
func (s *metricsInstrumentedStore) Insert(ctx context.Context, payload []byte) (int64, error) {
	t0 := time.Now()
	id, err := s.inner.Insert(ctx, payload)
	s.m.recordStoreOp(ctx, "insert", time.Since(t0), err)
	return id, err
}

// Delete delegates to the inner store and records a "delete" store-op sample.
func (s *metricsInstrumentedStore) Delete(ctx context.Context, id int64) error {
	t0 := time.Now()
	err := s.inner.Delete(ctx, id)
	s.m.recordStoreOp(ctx, "delete", time.Since(t0), err)
	return err
}

// MarkDelivered delegates to the inner store and records a "mark_delivered" sample.
func (s *metricsInstrumentedStore) MarkDelivered(ctx context.Context, id int64) error {
	t0 := time.Now()
	err := s.inner.MarkDelivered(ctx, id)
	s.m.recordStoreOp(ctx, "mark_delivered", time.Since(t0), err)
	return err
}

// PurgeDelivered delegates to the inner store and records a "purge_delivered" sample.
func (s *metricsInstrumentedStore) PurgeDelivered(ctx context.Context, batchLimit int) (int64, error) {
	t0 := time.Now()
	n, err := s.inner.PurgeDelivered(ctx, batchLimit)
	s.m.recordStoreOp(ctx, "purge_delivered", time.Since(t0), err)
	return n, err
}

// ListPending delegates to the inner store and records a "list_pending" sample.
func (s *metricsInstrumentedStore) ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]DurableEvent, error) {
	t0 := time.Now()
	evs, err := s.inner.ListPending(ctx, createdBefore, limit)
	s.m.recordStoreOp(ctx, "list_pending", time.Since(t0), err)
	return evs, err
}

// DeleteExpired delegates to the inner store and records a "delete_expired" sample.
func (s *metricsInstrumentedStore) DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error) {
	t0 := time.Now()
	n, err := s.inner.DeleteExpired(ctx, ttl)
	s.m.recordStoreOp(ctx, "delete_expired", time.Since(t0), err)
	return n, err
}

// ObserveDurableQueue forwards to the inner store when it implements
// DurableQueueObserver; otherwise it returns an error. No store-op sample is
// recorded here — queue gauges are polled separately by the metrics loop.
func (s *metricsInstrumentedStore) ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (DurableQueueStats, error) {
	o, ok := s.inner.(DurableQueueObserver)
	if !ok {
		return DurableQueueStats{}, errors.New("inner DurableEventStore does not implement DurableQueueObserver")
	}
	return o.ObserveDurableQueue(ctx, eventTTL, nearExpiryLead)
}
diff --git a/pkg/beholder/durable_emitter_test.go b/pkg/beholder/durable_emitter_test.go
new file mode 100644
index 0000000000..7846e63031
--- /dev/null
+++ b/pkg/beholder/durable_emitter_test.go
@@ -0,0 +1,435 @@
package beholder

import (
	"context"
	"errors"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/proto"

	"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
	"github.com/smartcontractkit/chainlink-common/pkg/logger"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// withTestBeholderMeter swaps the global beholder client meter for t's lifetime (for metrics assertions).
// It installs a noop client backed by a ManualReader so tests can Collect()
// exported metrics, and restores the previous client on cleanup.
func withTestBeholderMeter(t *testing.T) *sdkmetric.ManualReader {
	t.Helper()
	prev := GetClient()
	reader := sdkmetric.NewManualReader()
	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	c := NewNoopClient()
	c.MeterProvider = mp
	c.Meter = mp.Meter(defaultPackageName)
	SetClient(c)
	t.Cleanup(func() {
		SetClient(prev)
		_ = mp.Shutdown(context.Background())
	})
	return reader
}

// testChipClient is a minimal chipingress.Client for tests.
+type testChipClient struct { + chipingress.NoopClient + + mu sync.Mutex + publishErr error + publishCount atomic.Int64 + publishedIDs []string +} + +func (c *testChipClient) Publish(_ context.Context, ev *chipingress.CloudEventPb, _ ...grpc.CallOption) (*chipingress.PublishResponse, error) { + c.publishCount.Add(1) + c.mu.Lock() + if ev != nil { + c.publishedIDs = append(c.publishedIDs, ev.Id) + } + err := c.publishErr + c.mu.Unlock() + return &chipingress.PublishResponse{}, err +} + +func (c *testChipClient) PublishBatch(_ context.Context, _ *chipingress.CloudEventBatch, _ ...grpc.CallOption) (*chipingress.PublishResponse, error) { + return &chipingress.PublishResponse{}, nil +} + +func (c *testChipClient) setPublishErr(err error) { + c.mu.Lock() + defer c.mu.Unlock() + c.publishErr = err +} + +func (c *testChipClient) getPublishedIDs() []string { + c.mu.Lock() + defer c.mu.Unlock() + out := make([]string, len(c.publishedIDs)) + copy(out, c.publishedIDs) + return out +} + +func testEmitAttrs() []any { + return []any{"source", "test-source", "type", "test-type"} +} + +func newTestDurableEmitter(t *testing.T, store DurableEventStore, client chipingress.Client, cfgOverride *DurableEmitterConfig) *DurableEmitter { + t.Helper() + cfg := DefaultDurableEmitterConfig() + if cfgOverride != nil { + cfg = *cfgOverride + } + em, err := NewDurableEmitter(store, client, cfg, logger.Test(t)) + require.NoError(t, err) + return em +} + +func TestDurableEmitter_HooksImmediatePath(t *testing.T) { + store := NewMemDurableEventStore() + client := &testChipClient{} + var pubCalls, delCalls atomic.Int32 + cfg := DefaultDurableEmitterConfig() + cfg.Hooks = &DurableEmitterHooks{ + OnImmediatePublish: func(time.Duration, error) { pubCalls.Add(1) }, + OnImmediateDelete: func(time.Duration, error) { delCalls.Add(1) }, + } + em, err := NewDurableEmitter(store, client, cfg, logger.Test(t)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + 
em.Start(ctx) + defer em.Close() + + require.NoError(t, em.Emit(ctx, []byte("hello"), testEmitAttrs()...)) + require.Eventually(t, func() bool { return store.Len() == 0 }, 2*time.Second, 10*time.Millisecond) + assert.Equal(t, int32(1), pubCalls.Load()) + assert.Equal(t, int32(1), delCalls.Load()) +} + +func TestDurableEmitter_HooksPublishFailureSkipsDeleteHook(t *testing.T) { + store := NewMemDurableEventStore() + client := &testChipClient{} + client.setPublishErr(errors.New("down")) + var pubCalls, delCalls atomic.Int32 + cfg := DefaultDurableEmitterConfig() + cfg.Hooks = &DurableEmitterHooks{ + OnImmediatePublish: func(time.Duration, error) { pubCalls.Add(1) }, + OnImmediateDelete: func(time.Duration, error) { delCalls.Add(1) }, + } + em, err := NewDurableEmitter(store, client, cfg, logger.Test(t)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + em.Start(ctx) + defer em.Close() + + require.NoError(t, em.Emit(ctx, []byte("hello"), testEmitAttrs()...)) + require.Eventually(t, func() bool { return pubCalls.Load() == 1 }, 2*time.Second, 10*time.Millisecond) + assert.Equal(t, int32(0), delCalls.Load()) +} + +func TestDurableEmitter_EmitPersistsAndPublishes(t *testing.T) { + store := NewMemDurableEventStore() + client := &testChipClient{} + em := newTestDurableEmitter(t, store, client, nil) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + em.Start(ctx) + defer em.Close() + + err := em.Emit(ctx, []byte("hello"), testEmitAttrs()...) + require.NoError(t, err) + + // Immediate async publish should fire and delete the record. 
	require.Eventually(t, func() bool {
		return client.publishCount.Load() == 1
	}, 2*time.Second, 10*time.Millisecond)

	require.Eventually(t, func() bool {
		return store.Len() == 0
	}, 2*time.Second, 10*time.Millisecond)
}

// TestDurableEmitter_EmitReturnSuccessEvenWhenPublishFails verifies the core
// durability contract: Emit succeeds once the insert succeeds, and a failed
// publish leaves the row in the store for the retransmit loop.
func TestDurableEmitter_EmitReturnSuccessEvenWhenPublishFails(t *testing.T) {
	store := NewMemDurableEventStore()
	client := &testChipClient{}
	client.setPublishErr(errors.New("connection refused"))

	em := newTestDurableEmitter(t, store, client, nil)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	em.Start(ctx)
	defer em.Close()

	err := em.Emit(ctx, []byte("hello"), testEmitAttrs()...)
	require.NoError(t, err, "Emit must succeed once the DB insert succeeds")

	// Wait for the async publish attempt to complete.
	require.Eventually(t, func() bool {
		return client.publishCount.Load() == 1
	}, 2*time.Second, 10*time.Millisecond)

	// Event must remain in the store for retransmit.
	assert.Equal(t, 1, store.Len())
}

// TestDurableEmitter_RetransmitLoopDeliversFailedEvents verifies that an event
// whose immediate publish failed is later delivered by the retransmit loop
// once the client recovers.
func TestDurableEmitter_RetransmitLoopDeliversFailedEvents(t *testing.T) {
	store := NewMemDurableEventStore()
	client := &testChipClient{}
	client.setPublishErr(errors.New("connection refused"))

	cfg := DefaultDurableEmitterConfig()
	cfg.RetransmitInterval = 100 * time.Millisecond
	cfg.RetransmitAfter = 50 * time.Millisecond

	em := newTestDurableEmitter(t, store, client, &cfg)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	em.Start(ctx)
	defer em.Close()

	err := em.Emit(ctx, []byte("retry-me"), testEmitAttrs()...)
	require.NoError(t, err)
	// Insert happens synchronously in Emit, so the row is present regardless
	// of whether the async publish has fired yet.
	assert.Equal(t, 1, store.Len())

	client.setPublishErr(nil)

	require.Eventually(t, func() bool {
		return store.Len() == 0
	}, 5*time.Second, 50*time.Millisecond, "retransmit loop should eventually deliver and delete the event")

	// At least the failed immediate attempt plus one retransmit attempt.
	assert.GreaterOrEqual(t, client.publishCount.Load(), int64(2))
}

// TestDurableEmitter_RetransmitSerialDistinctCloudEvents guards against a
// pointer-reuse bug: each retransmitted row must become its own CloudEvent.
func TestDurableEmitter_RetransmitSerialDistinctCloudEvents(t *testing.T) {
	store := NewMemDurableEventStore()
	client := &testChipClient{}
	client.setPublishErr(errors.New("immediate fail"))

	cfg := DefaultDurableEmitterConfig()
	cfg.RetransmitInterval = 100 * time.Millisecond
	cfg.RetransmitAfter = 50 * time.Millisecond

	em := newTestDurableEmitter(t, store, client, &cfg)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	em.Start(ctx)
	defer em.Close()

	require.NoError(t, em.Emit(ctx, []byte("first"), testEmitAttrs()...))
	require.NoError(t, em.Emit(ctx, []byte("second"), testEmitAttrs()...))

	client.setPublishErr(nil)

	require.Eventually(t, func() bool { return store.Len() == 0 }, 5*time.Second, 50*time.Millisecond)

	ids := client.getPublishedIDs()
	require.GreaterOrEqual(t, len(ids), 4, "two immediate fails then two retransmit publishes")
	// The last two attempts are the successful retransmits.
	a, b := ids[len(ids)-2], ids[len(ids)-1]
	assert.NotEmpty(t, a)
	assert.NotEmpty(t, b)
	assert.NotEqualf(t, a, b, "retransmit must publish two distinct CloudEvents, not one pointer reused for every row")
}

// TestDurableEmitter_ExpiryLoopDeletesOldEvents verifies that undeliverable
// events are eventually purged by the TTL expiry loop (bounds table growth).
func TestDurableEmitter_ExpiryLoopDeletesOldEvents(t *testing.T) {
	store := NewMemDurableEventStore()
	client := &testChipClient{}
	client.setPublishErr(errors.New("always fail"))

	cfg := DefaultDurableEmitterConfig()
	cfg.ExpiryInterval = 100 * time.Millisecond
	cfg.EventTTL = 50 * time.Millisecond
	cfg.RetransmitInterval = 10 * time.Minute // effectively disable retransmit

	em := newTestDurableEmitter(t, store, client, &cfg)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	em.Start(ctx)
	defer em.Close()

	err := em.Emit(ctx, []byte("will-expire"), testEmitAttrs()...)
	require.NoError(t, err)
	assert.Equal(t, 1, store.Len())

	require.Eventually(t, func() bool {
		return store.Len() == 0
	}, 5*time.Second, 50*time.Millisecond, "expiry loop should purge the event")
}

// TestDurableEmitter_PersistSourceFilter_skipsStoreBestEffortPublish verifies
// that an event whose source is not in the persist allowlist is published
// best-effort without ever touching the store.
func TestDurableEmitter_PersistSourceFilter_skipsStoreBestEffortPublish(t *testing.T) {
	store := NewMemDurableEventStore()
	client := &testChipClient{}
	cfg := DefaultDurableEmitterConfig()
	cfg.PersistCloudEventSources = []string{"only-this"}
	em := newTestDurableEmitter(t, store, client, &cfg)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	em.Start(ctx)
	defer em.Close()

	require.NoError(t, em.Emit(ctx, []byte("x"), testEmitAttrs()...))
	require.Eventually(t, func() bool { return client.publishCount.Load() == 1 }, 2*time.Second, 10*time.Millisecond)
	assert.Equal(t, 0, store.Len())
}

// TestDurableEmitter_PersistSourceFilter_persistsAllowedSource verifies that an
// allowlisted source goes through the persist-then-publish-then-delete path.
func TestDurableEmitter_PersistSourceFilter_persistsAllowedSource(t *testing.T) {
	store := NewMemDurableEventStore()
	client := &testChipClient{}
	cfg := DefaultDurableEmitterConfig()
	cfg.PersistCloudEventSources = []string{"test-source"}
	em := newTestDurableEmitter(t, store, client, &cfg)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	em.Start(ctx)
	defer em.Close()

	require.NoError(t, em.Emit(ctx, []byte("x"), testEmitAttrs()...))
	require.Eventually(t, func() bool { return client.publishCount.Load() == 1 }, 2*time.Second, 10*time.Millisecond)
	require.Eventually(t, func() bool { return store.Len() == 0 }, 2*time.Second, 10*time.Millisecond)
}

// TestDurableEmitter_PersistSourceWildcardStarAllowsAll verifies "*" in the
// allowlist persists every source.
func TestDurableEmitter_PersistSourceWildcardStarAllowsAll(t *testing.T) {
	store := NewMemDurableEventStore()
	client := &testChipClient{}
	cfg := DefaultDurableEmitterConfig()
	cfg.PersistCloudEventSources = []string{"*"}
	em := newTestDurableEmitter(t, store, client, &cfg)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	em.Start(ctx)
	defer em.Close()

	require.NoError(t, em.Emit(ctx, []byte("x"), testEmitAttrs()...))
	require.Eventually(t, func() bool { return store.Len() == 0 }, 2*time.Second, 10*time.Millisecond)
}

// TestDurableEmitter_RetransmitDropsDisallowedSource verifies that a persisted
// row whose source is no longer allowlisted is deleted by the retransmit loop
// without being published (policy drop).
func TestDurableEmitter_RetransmitDropsDisallowedSource(t *testing.T) {
	store := NewMemDurableEventStore()
	client := &testChipClient{}

	ev, err := chipingress.NewEvent("unknown-domain", "t", []byte("b"), nil)
	require.NoError(t, err)
	evPb, err := chipingress.EventToProto(ev)
	require.NoError(t, err)
	payload, err := proto.Marshal(evPb)
	require.NoError(t, err)

	// Seed the store directly, bypassing Emit's own source filter.
	_, err = store.Insert(context.Background(), payload)
	require.NoError(t, err)

	cfg := DefaultDurableEmitterConfig()
	cfg.PersistCloudEventSources = []string{"test-source"}
	cfg.RetransmitInterval = 50 * time.Millisecond
	cfg.RetransmitAfter = 30 * time.Millisecond

	em := newTestDurableEmitter(t, store, client, &cfg)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	em.Start(ctx)
	defer em.Close()

	require.Eventually(t, func() bool {
		return store.Len() == 0 && client.publishCount.Load() == 0
	}, 3*time.Second, 20*time.Millisecond, "disallowed row should be deleted without Publish")
}

// TestDurableEmitter_EmitRejectsInvalidAttributes verifies that Emit fails fast
// (and persists nothing) when the required source/type attributes are missing.
func TestDurableEmitter_EmitRejectsInvalidAttributes(t *testing.T) {
	store := NewMemDurableEventStore()
	client := &testChipClient{}
	em := newTestDurableEmitter(t, store, client, nil)

	err := em.Emit(context.Background(), []byte("no-attrs"))
	require.Error(t, err)
	assert.Equal(t, 0, store.Len(), "nothing should be persisted when attributes are invalid")
}

// TestDurableEmitter_MultipleEvents verifies delivery and cleanup of a burst of events.
func TestDurableEmitter_MultipleEvents(t *testing.T) {
	store := NewMemDurableEventStore()
	client := &testChipClient{}
	em := newTestDurableEmitter(t, store, client, nil)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	em.Start(ctx)
	defer em.Close()

	const n = 50
	for i := 0; i < n; i++ {
		err := em.Emit(ctx, []byte("event"), testEmitAttrs()...)
		require.NoError(t, err)
	}

	require.Eventually(t, func() bool {
		return client.publishCount.Load() == int64(n)
	}, 5*time.Second, 10*time.Millisecond)

	require.Eventually(t, func() bool {
		return store.Len() == 0
	}, 5*time.Second, 10*time.Millisecond, "all events should be delivered and deleted")
}

// TestNewDurableEmitter_ValidationErrors verifies constructor argument validation.
func TestNewDurableEmitter_ValidationErrors(t *testing.T) {
	log := logger.Test(t)
	cfg := DefaultDurableEmitterConfig()

	_, err := NewDurableEmitter(nil, &testChipClient{}, cfg, log)
	assert.ErrorContains(t, err, "store")

	_, err = NewDurableEmitter(NewMemDurableEventStore(), nil, cfg, log)
	assert.ErrorContains(t, err, "client")

	_, err = NewDurableEmitter(NewMemDurableEventStore(), &testChipClient{}, cfg, nil)
	assert.ErrorContains(t, err, "logger")
}

// TestDurableEmitter_MetricsRegistersEmitSuccess verifies that a successful
// Emit exports the beholder.durable_emitter.emit.success metric.
func TestDurableEmitter_MetricsRegistersEmitSuccess(t *testing.T) {
	reader := withTestBeholderMeter(t)

	store := NewMemDurableEventStore()
	client := &testChipClient{}
	cfg := DefaultDurableEmitterConfig()
	cfg.RetransmitInterval = time.Hour
	cfg.Metrics = &DurableEmitterMetricsConfig{PollInterval: 25 * time.Millisecond}

	em, err := NewDurableEmitter(store, client, cfg, logger.Test(t))
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	em.Start(ctx)
	defer func() { _ = em.Close() }()

	require.NoError(t, em.Emit(ctx, []byte("m"), testEmitAttrs()...))
	require.Eventually(t, func() bool { return store.Len() == 0 }, 2*time.Second, 10*time.Millisecond)
	// Give the counter a moment to be recorded before collecting.
	time.Sleep(50 * time.Millisecond)

	var rm metricdata.ResourceMetrics
	require.NoError(t, reader.Collect(ctx, &rm))

	var found bool
	for _, sm := range rm.ScopeMetrics {
		for _, m := range sm.Metrics {
			if m.Name == "beholder.durable_emitter.emit.success" {
				found = true
			}
		}
	}
	assert.True(t, found, "expected beholder.durable_emitter.emit.success in exported metrics")
}
diff --git a/pkg/beholder/durable_event_store.go
b/pkg/beholder/durable_event_store.go
new file mode 100644
index 0000000000..86b46ad7b0
--- /dev/null
+++ b/pkg/beholder/durable_event_store.go
@@ -0,0 +1,173 @@
package beholder

import (
	"context"
	"sort"
	"sync"
	"sync/atomic"
	"time"
)

// DurableEvent represents a persisted event awaiting delivery to Chip.
type DurableEvent struct {
	ID        int64
	Payload   []byte // serialized CloudEventPb proto
	CreatedAt time.Time
}

// DurableQueueStats is a point-in-time snapshot of the pending queue for metrics.
type DurableQueueStats struct {
	Depth            int64
	PayloadBytes     int64
	OldestPendingAge time.Duration // 0 if the queue is empty
	// NearTTLCount is the number of rows within nearExpiryLead of EventTTL (still
	// pending, not yet removed by expiry). Serves as a DLQ-pressure proxy; there is
	// no separate dead-letter table in the default design.
	NearTTLCount int64
}

// DurableQueueObserver is optionally implemented by DurableEventStore implementations
// so DurableEmitter can export queue depth and age gauges when metrics are enabled.
type DurableQueueObserver interface {
	// ObserveDurableQueue returns live queue statistics. eventTTL and nearExpiryLead
	// match DurableEmitterConfig (nearExpiryLead should be << eventTTL).
	ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (DurableQueueStats, error)
}

// DurableEventStore abstracts the persistence layer for durable chip events.
// Implementations must be safe for concurrent use.
type DurableEventStore interface {
	// Insert persists a serialized event and returns its assigned ID.
	Insert(ctx context.Context, payload []byte) (int64, error)
	// Delete physically removes a row (corrupt payloads, policy drops, tests).
	Delete(ctx context.Context, id int64) error
	// MarkDelivered records successful delivery to Chip. The row must no longer
	// appear in ListPending. Postgres implementations typically set delivered_at;
	// a background PurgeDelivered removes rows later. MemDurableEventStore removes
	// the row immediately (same as Delete).
	MarkDelivered(ctx context.Context, id int64) error
	// PurgeDelivered deletes up to batchLimit rows already marked delivered.
	// Implementations that remove rows in MarkDelivered may return 0, nil always.
	PurgeDelivered(ctx context.Context, batchLimit int) (deleted int64, err error)
	// ListPending returns events created before the given cutoff, ordered by
	// creation time ascending, up to limit rows.
	ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]DurableEvent, error)
	// DeleteExpired removes events older than ttl and returns the count deleted.
	DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error)
}

// MemDurableEventStore is an in-memory DurableEventStore for unit tests.
type MemDurableEventStore struct {
	mu     sync.Mutex       // guards events
	events map[int64]*DurableEvent
	nextID atomic.Int64 // ID allocator, independent of mu
}

var (
	_ DurableEventStore    = (*MemDurableEventStore)(nil)
	_ DurableQueueObserver = (*MemDurableEventStore)(nil)
)

// NewMemDurableEventStore returns an empty in-memory store.
func NewMemDurableEventStore() *MemDurableEventStore {
	return &MemDurableEventStore{
		events: make(map[int64]*DurableEvent),
	}
}

// Insert stores a defensive copy of payload under a freshly allocated ID.
func (m *MemDurableEventStore) Insert(_ context.Context, payload []byte) (int64, error) {
	id := m.nextID.Add(1)
	m.mu.Lock()
	defer m.mu.Unlock()
	m.events[id] = &DurableEvent{
		ID:        id,
		Payload:   append([]byte(nil), payload...), // defensive copy
		CreatedAt: time.Now(),
	}
	return id, nil
}

// Delete removes the row; deleting an unknown ID is a no-op.
func (m *MemDurableEventStore) Delete(_ context.Context, id int64) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.events, id)
	return nil
}

// MarkDelivered removes the row immediately — the in-memory store keeps no
// delivered_at bookkeeping (see the DurableEventStore interface docs).
func (m *MemDurableEventStore) MarkDelivered(ctx context.Context, id int64) error {
	return m.Delete(ctx, id)
}

// PurgeDelivered is a no-op here: MarkDelivered already removed the rows.
func (m *MemDurableEventStore) PurgeDelivered(_ context.Context, _ int) (int64, error) {
	return 0, nil
}

// ListPending returns up to limit events created strictly before createdBefore,
// oldest first.
func (m *MemDurableEventStore) ListPending(_ context.Context, createdBefore time.Time, limit int) ([]DurableEvent, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	var result []DurableEvent
	for _, e := range m.events {
		if e.CreatedAt.Before(createdBefore) {
			result = append(result, *e)
		}
	}
	sort.Slice(result, func(i, j int) bool {
		return result[i].CreatedAt.Before(result[j].CreatedAt)
	})
	if len(result) > limit {
		result = result[:limit]
	}
	return result, nil
}

// DeleteExpired removes events older than ttl and reports how many were removed.
func (m *MemDurableEventStore) DeleteExpired(_ context.Context, ttl time.Duration) (int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	cutoff := time.Now().Add(-ttl)
	var deleted int64
	for id, e := range m.events {
		if e.CreatedAt.Before(cutoff) {
			delete(m.events, id)
			deleted++
		}
	}
	return deleted, nil
}

// Len returns the number of events in the store (test helper).
func (m *MemDurableEventStore) Len() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.events)
}

// ObserveDurableQueue implements DurableQueueObserver.
// It scans all rows under the lock: depth, total payload bytes, oldest age,
// and the count of rows within nearExpiryLead of eventTTL.
func (m *MemDurableEventStore) ObserveDurableQueue(_ context.Context, eventTTL, nearExpiryLead time.Duration) (DurableQueueStats, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	now := time.Now()
	var st DurableQueueStats
	if len(m.events) == 0 {
		return st, nil
	}
	var oldest time.Time
	first := true
	for _, e := range m.events {
		st.Depth++
		st.PayloadBytes += int64(len(e.Payload))
		if first || e.CreatedAt.Before(oldest) {
			oldest = e.CreatedAt
			first = false
		}
		age := now.Sub(e.CreatedAt)
		// Count rows approaching expiry; only meaningful when both windows are
		// set and nearExpiryLead is inside eventTTL.
		if eventTTL > 0 && nearExpiryLead > 0 && nearExpiryLead < eventTTL {
			threshold := eventTTL - nearExpiryLead
			if age >= threshold && age < eventTTL {
				st.NearTTLCount++
			}
		}
	}
	st.OldestPendingAge = now.Sub(oldest)
	return st, nil
}
diff --git a/pkg/chipingress/client.go b/pkg/chipingress/client.go
index d22b1807d1..e3d833284b 100644
--- a/pkg/chipingress/client.go
+++ b/pkg/chipingress/client.go
@@ -262,6 +262,8 @@ func newHeaderInterceptor(provider HeaderProvider) grpc.UnaryClientInterceptor {
 }
 
 // NewEvent creates a new CloudEvent with the specified domain, entity, payload, and optional attributes.
+// Recognized optional keys include CloudEvents names (dataschema, subject, time, …) and Beholder's
+// beholder_data_schema, which is mapped to the CloudEvent dataschema when dataschema is not set.
 func NewEvent(domain, entity string, payload []byte, attributes map[string]any) (CloudEvent, error) {
 	event := ce.NewEvent()
@@ -274,6 +276,8 @@ func NewEvent(domain, entity string, payload []byte, attributes map[string]any)
 		attributes = make(map[string]any)
 	}
 
+	const beholderDataSchemaKey = "beholder_data_schema"
+
 	recordedTime := time.Now()
 	if val, ok := attributes["recordedtime"].(time.Time); ok && !val.IsZero() {
 		recordedTime = val
@@ -289,6 +293,8 @@ func NewEvent(domain, entity string, payload []byte, attributes map[string]any)
 	}
 	if val, ok := attributes["dataschema"].(string); ok {
 		event.SetDataSchema(val)
+	} else if val, ok := attributes[beholderDataSchemaKey].(string); ok {
+		event.SetDataSchema(val)
 	}
 	if val, ok := attributes["subject"].(string); ok {
 		event.SetSubject(val)
diff --git a/pkg/chipingress/client_test.go b/pkg/chipingress/client_test.go
index 6b259460a6..82f32e749c 100644
--- a/pkg/chipingress/client_test.go
+++ b/pkg/chipingress/client_test.go
@@ -126,6 +126,29 @@ func TestNewEvent(t *testing.T) {
 	assert.Equal(t, testProto.Message, resultProto.Message)
 }
 
+// TestNewEventBeholderDataSchema verifies the beholder_data_schema fallback in
+// NewEvent: it sets the CloudEvent dataschema, but an explicit dataschema wins.
+func TestNewEventBeholderDataSchema(t *testing.T) {
+	testProto := pb.PingResponse{Message: "x"}
+	protoBytes, err := proto.Marshal(&testProto)
+	require.NoError(t, err)
+
+	t.Run("beholder_data_schema sets CloudEvent dataschema", func(t *testing.T) {
+		event, err := NewEvent("platform", "workflows.v2.WorkflowUserLog", protoBytes, map[string]any{
+			"beholder_data_schema": "/cre-events-user-logs/v2",
+		})
+		require.NoError(t, err)
+		assert.Equal(t, "/cre-events-user-logs/v2", event.DataSchema())
+	})
+
+	t.Run("dataschema takes precedence over beholder_data_schema", func(t *testing.T) {
+		event, err := NewEvent("platform", "workflows.v2.WorkflowUserLog", protoBytes, map[string]any{
+			"dataschema":           "https://explicit.example/schema",
+			"beholder_data_schema": "/ignored",
+		})
+		require.NoError(t, err)
+		assert.Equal(t, "https://explicit.example/schema", event.DataSchema())
+	})
+}
+
 func
TestEventToProto(t *testing.T) { // Create a test protobuf message testProto := pb.PingResponse{Message: "test message"}