Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/config/telemetry_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ type Telemetry interface {
EmitterExportTimeout() time.Duration
ChipIngressEndpoint() string
ChipIngressInsecureConnection() bool
DurableEmitterEnabled() bool
// DurableEmitterPersistSources lists CloudEvent Source values (beholder_domain) that may be
// written to the durable Chip queue. See chainlink telemetry config for defaults and wildcards.
DurableEmitterPersistSources() []string
HeartbeatInterval() time.Duration
LogStreamingEnabled() bool
LogLevel() zapcore.Level
Expand Down
8 changes: 8 additions & 0 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2762,6 +2762,8 @@ type Telemetry struct {
AuthHeadersTTL *commonconfig.Duration
ChipIngressEndpoint *string
ChipIngressInsecureConnection *bool
DurableEmitterEnabled *bool
DurableEmitterPersistSources []string `toml:",omitempty"`
HeartbeatInterval *commonconfig.Duration
LogLevel *string
LogStreamingEnabled *bool
Expand Down Expand Up @@ -2806,6 +2808,12 @@ func (b *Telemetry) setFrom(f *Telemetry) {
if v := f.ChipIngressInsecureConnection; v != nil {
b.ChipIngressInsecureConnection = v
}
if v := f.DurableEmitterEnabled; v != nil {
b.DurableEmitterEnabled = v
}
if f.DurableEmitterPersistSources != nil {
b.DurableEmitterPersistSources = f.DurableEmitterPersistSources
}
if v := f.HeartbeatInterval; v != nil {
b.HeartbeatInterval = v
}
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/cre/environment/.gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# ctf binary artifacts
# ctf binary artifacts — fully managed by the ctf binary, do not hand-edit
compose/
blockscout/
binaries/
Expand Down
5 changes: 4 additions & 1 deletion core/scripts/cre/environment/configs/chip-ingress.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,7 @@
# compose_file='https://raw.githubusercontent.com/smartcontractkit/chainlink-testing-framework/refs/tags/framework/components/dockercompose/v0.1.19/framework/components/dockercompose/chip_ingress_set/docker-compose.yml'

[kafka]
topics=['cre']
# `cre` — workflow telemetry (source platform→cre shim). `chip-demo` — Atlas demo / DurableEmitter load tests
# (CHIP_INGRESS_TEST_ADDR; see core/services/beholder/durable_emitter_load_test.go).
# `node-platform` — PluginRelayerConfigEmitter / common.v1.ChainPluginConfig (chainlink-protos node-platform/chip-schemas.json).
topics=['cre', 'chip-demo', 'node-platform']
87 changes: 54 additions & 33 deletions core/scripts/cre/environment/environment/beholder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,56 +37,71 @@ type moduleInfo struct {
Version string `json:"Version"`
}

// getSchemaSetFromGoMod uses `go list` to extract the version/commit ref
// from the github.com/smartcontractkit/chainlink-protos/workflows/go dependency.
// It returns a SchemaSet with hardcoded values matching default.toml config.
func getSchemaSetFromGoMod(ctx context.Context) ([]chipingressset.SchemaSet, error) {
const targetModule = "github.com/smartcontractkit/chainlink-protos/workflows/go"
const chainlinkProtosGitURI = "https://github.com/smartcontractkit/chainlink-protos"

// Get the absolute path to the repository root (where go.mod is located)
repoRoot, err := filepath.Abs(relativePathToRepoRoot)
if err != nil {
return nil, errors.Wrap(err, "failed to get absolute path to repository root")
}

// Use `go list -m -json` to get module information
// schemaCommitRefFromGoMod runs `go list -m -json` for targetModule from repoRoot and returns the ref for FetchAndRegisterProtos.
func schemaCommitRefFromGoMod(ctx context.Context, repoRoot, targetModule string) (ref string, rawVersion string, err error) {
cmd := exec.CommandContext(ctx, "go", "list", "-m", "-json", targetModule)
cmd.Dir = repoRoot

output, err := cmd.Output()
if err != nil {
return nil, errors.Wrapf(err, "failed to run 'go list -m -json %s'", targetModule)
output, cmdErr := cmd.Output()
if cmdErr != nil {
return "", "", errors.Wrapf(cmdErr, "failed to run 'go list -m -json %s'", targetModule)
}

// Parse JSON output
var modInfo moduleInfo
if err := json.Unmarshal(output, &modInfo); err != nil {
return nil, errors.Wrap(err, "failed to parse go list JSON output")
if unmarshalErr := json.Unmarshal(output, &modInfo); unmarshalErr != nil {
return "", "", errors.Wrap(unmarshalErr, "failed to parse go list JSON output")
}

if modInfo.Version == "" {
return nil, errors.Errorf("no version found for module %s", targetModule)
return "", "", errors.Errorf("no version found for module %s", targetModule)
}

// Extract commit ref from version string
// Support various formats:
// 1. v1.2.1 -> use as-is
// 2. v0.0.0-20211026045750-20ab5afb07e3 -> extract short hash (20ab5afb07e3)
// 3. 2a35b54f48ae06be4cc81c768dc9cc9e92249571 -> full commit hash, use as-is
// 4. v0.0.0-YYYYMMDDHHMMSS-SHORTHASH -> extract short hash
commitRef := extractCommitRef(modInfo.Version)
return commitRef, modInfo.Version, nil
}

// getSchemaSetFromGoMod resolves SchemaSets from chainlink-protos commits pinned in go.mod:
// - workflows (chip-cre.json) for CRE/workflow telemetry
// - node-platform (chip-schemas.json) for PluginRelayerConfigEmitter / common.v1.ChainPluginConfig
func getSchemaSetFromGoMod(ctx context.Context) ([]chipingressset.SchemaSet, error) {
const (
workflowsModule = "github.com/smartcontractkit/chainlink-protos/workflows/go"
nodePlatformModule = "github.com/smartcontractkit/chainlink-protos/node-platform"
)

framework.L.Info().Msgf("Extracted commit ref for %s: %s (from version: %s)", targetModule, commitRef, modInfo.Version)
repoRoot, err := filepath.Abs(relativePathToRepoRoot)
if err != nil {
return nil, errors.Wrap(err, "failed to get absolute path to repository root")
}

wfRef, wfVer, err := schemaCommitRefFromGoMod(ctx, repoRoot, workflowsModule)
if err != nil {
return nil, err
}
framework.L.Info().Msgf("Extracted commit ref for %s: %s (from version: %s)", workflowsModule, wfRef, wfVer)

// Return SchemaSet with hardcoded values from default.toml
schemaSet := chipingressset.SchemaSet{
URI: "https://github.com/smartcontractkit/chainlink-protos",
Ref: commitRef,
SchemaDir: "workflows",
ConfigFile: "chip-cre.json", // file with mappings of protobufs to subjects, together with references
npRef, npVer, err := schemaCommitRefFromGoMod(ctx, repoRoot, nodePlatformModule)
if err != nil {
return nil, err
}
framework.L.Info().Msgf("Extracted commit ref for %s: %s (from version: %s)", nodePlatformModule, npRef, npVer)

return []chipingressset.SchemaSet{schemaSet}, nil
return []chipingressset.SchemaSet{
{
URI: chainlinkProtosGitURI,
Ref: wfRef,
SchemaDir: "workflows",
ConfigFile: "chip-cre.json",
},
{
URI: chainlinkProtosGitURI,
Ref: npRef,
SchemaDir: "node-platform",
ConfigFile: "chip-schemas.json",
},
}, nil
}

// extractCommitRef extracts a commit reference from various version formats
Expand Down Expand Up @@ -701,6 +716,12 @@ and make sure that the sink is pointing to correct upstream endpoint ('localhost
return errors.Wrap(topicsErr, "failed to create topics")
}

if out.ChipIngress != nil && out.ChipIngress.GRPCExternalURL != "" {
if regErr := registerChipDemoLoadTestSchema(cmdContext, out.ChipIngress.GRPCExternalURL); regErr != nil {
framework.L.Warn().Err(regErr).Msg("chip-demo schema registration failed (durable emitter load tests with CHIP_INGRESS_TEST_ADDR may not drain until this succeeds; check Chip / auth)")
}
}

fmt.Print(libformat.PurpleText("%s", stageGen.WrapAndNext("Created topics in %.2f seconds", stageGen.Elapsed().Seconds())))

for _, topic := range in.Kafka.Topics {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package environment

import (
"context"
"strings"
"time"

"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink-testing-framework/framework"
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
chipingresspb "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb"
)

// chipDemoLoadTestProto is the raw .proto for schema subject chip-demo-pb.DemoClientPayload.
// Keep in sync with core/services/beholder/chip_load_test_demo.proto and atlas chip-ingress demo client.
const chipDemoLoadTestProto = `syntax = "proto3";

option go_package = "github.com/smartcontractkit/chainlink/v2/core/services/beholder;beholder";

package pb;

message DemoClientPayload {
string id = 1;
string domain = 2;
string entity = 3;
int64 batch_num = 4;
int64 message_num = 5;
int64 batch_position = 6;
}
`

// registerChipDemoLoadTestSchema registers the chip-demo protobuf used by DurableEmitter load tests
// (TestTPS_* with CHIP_INGRESS_TEST_ADDR) against the local CRE Beholder Chip Ingress.
// It uses the same demo basic-auth account as atlas/chip-ingress docker-compose (CE_SA_CHIP_INGRESS_DEMO_CLIENT).
func registerChipDemoLoadTestSchema(ctx context.Context, chipGRPCAddress string) error {
if strings.TrimSpace(chipGRPCAddress) == "" {
return errors.New("chip gRPC address is empty")
}

opts := []chipingress.Opt{
chipingress.WithInsecureConnection(),
chipingress.WithBasicAuth("chip-ingress-demo-client", "password"),
}
c, err := chipingress.NewClient(chipGRPCAddress, opts...)
if err != nil {
return errors.Wrap(err, "chipingress.NewClient")
}
defer func() { _ = c.Close() }()

regCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

_, err = c.RegisterSchemas(regCtx, &chipingresspb.Schema{
Subject: "chip-demo-pb.DemoClientPayload",
Schema: chipDemoLoadTestProto,
Format: chipingresspb.SchemaType_PROTOBUF,
})
if err != nil {
msg := strings.ToLower(err.Error())
if strings.Contains(msg, "already") || strings.Contains(msg, "exists") || strings.Contains(msg, "duplicate") {
framework.L.Info().Msg("chip-demo load-test schema already registered (chip-demo-pb.DemoClientPayload)")
return nil
}
return errors.Wrap(err, "RegisterSchemas chip-demo-pb.DemoClientPayload")
}
framework.L.Info().Msg("registered chip-demo load-test schema (chip-demo-pb.DemoClientPayload) for durable emitter / external Chip tests")
return nil
}
76 changes: 76 additions & 0 deletions core/scripts/cre/environment/obs-up.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#!/usr/bin/env bash
# obs-up.sh — wrapper around `./bin/ctf obs up` that applies local overrides.
#
# CTF regenerates the compose/ directory on every `obs up`, overwriting any
# manual edits. This script re-applies our customisations afterwards and
# restarts only the affected containers so Grafana and the OTel collector pick
# them up without a full stack restart.
#
# Usage:
# ./obs-up.sh # bring the stack up (or recreate it) with patches
# ./obs-up.sh --down # tear down first, then bring up with patches
#
# Tracked overrides live in observability-overrides/ and are applied here.

set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
OVERRIDES="$SCRIPT_DIR/observability-overrides"
COMPOSE_DIR="$SCRIPT_DIR/compose"

# ── Optionally tear down first ────────────────────────────────────────────────
if [[ "${1:-}" == "--down" ]]; then
echo "▼ Tearing down obs stack..."
"$SCRIPT_DIR/bin/ctf" obs down || true
fi

# ── Bring up the stack (CTF regenerates compose/ here) ───────────────────────
echo "▲ Starting obs stack..."
"$SCRIPT_DIR/bin/ctf" obs up

# ── Apply otel-collector override ─────────────────────────────────────────────
echo "⚙ Applying otel.yaml override (resource_to_telemetry_conversion)..."
cp "$OVERRIDES/otel.yaml" "$COMPOSE_DIR/otel.yaml"

# ── Apply dashboard ───────────────────────────────────────────────────────────
echo "⚙ Copying Durable Emitter Load Test dashboard..."
mkdir -p "$COMPOSE_DIR/conf/provisioning/dashboards/beholder"
cp "$OVERRIDES/dashboards/beholder/load_dashboard.json" \
"$COMPOSE_DIR/conf/provisioning/dashboards/beholder/load_dashboard.json"

# ── Patch docker-compose.yaml to add beholder dashboard volume mount ──────────
echo "⚙ Patching docker-compose.yaml to add dashboard volume mount..."
python3 - "$COMPOSE_DIR/docker-compose.yaml" <<'PYEOF'
import sys, re

path = sys.argv[1]
with open(path) as f:
content = f.read()

marker = "./conf/provisioning/dashboards/beholder/load_dashboard.json:/var/lib/grafana/dashboards/beholder/load_dashboard.json"
if marker in content:
print(" dashboard volume mount already present, skipping.")
sys.exit(0)

# Insert our mount after the last existing dashboard volume line.
content = re.sub(
r"([ \t]+- \./conf/provisioning/dashboards/workflow-engine/engine\.json:/var/lib/grafana/dashboards/workflow-engine/engine\.json)",
r"\1\n - ./conf/provisioning/dashboards/beholder/load_dashboard.json:/var/lib/grafana/dashboards/beholder/load_dashboard.json",
content,
)

with open(path, "w") as f:
f.write(content)
print(" done.")
PYEOF

# ── Recreate affected containers so new volume mounts and config take effect ──
# `restart` reuses the existing container spec (no new mounts); `up --force-recreate`
# rebuilds the container from the patched docker-compose.yaml.
echo "↺ Recreating otel-collector and grafana with updated config..."
docker compose -f "$COMPOSE_DIR/docker-compose.yaml" up -d --force-recreate otel-collector grafana

echo ""
echo "✓ Obs stack is up with all overrides applied."
echo " Grafana: http://localhost:3000"
echo " Dashboard: http://localhost:3000/d/durable-emitter-load-test"
Loading
Loading