From d80df3d39f8ec33dd1581f15ea1f31534c2e5a5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bla=C5=BE=20Hrastnik?= Date: Wed, 10 Jul 2024 15:10:27 +0900 Subject: [PATCH 01/12] keystone: Add a mock-streams-trigger --- core/capabilities/streams/mock_trigger.go | 472 ++++++++++++++++++++++ core/services/chainlink/application.go | 14 + 2 files changed, 486 insertions(+) create mode 100644 core/capabilities/streams/mock_trigger.go diff --git a/core/capabilities/streams/mock_trigger.go b/core/capabilities/streams/mock_trigger.go new file mode 100644 index 00000000000..d4d32cc94d2 --- /dev/null +++ b/core/capabilities/streams/mock_trigger.go @@ -0,0 +1,472 @@ +package streams + +// NOTE: this file is an amalgamation of MercuryTrigger and the streams trigger load tests +// the mercury trigger was modified to contain non-empty meta and sign the report with mock keys + +import ( + "context" + "crypto/ecdsa" + "fmt" + "math/big" + "strconv" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil" + ocrTypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/datastreams" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" + v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3" + "github.com/smartcontractkit/chainlink-common/pkg/values" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/reportcodec" +) + +const ( + baseTimestamp = 1000000000 +) + +func RegisterMockTrigger(lggr logger.Logger, capRegistry core.CapabilitiesRegistry) (*MockTriggerService, error) { + ctx := context.TODO() + trigger := NewMockTriggerService(100, lggr) + if err := trigger.Start(ctx); err != nil { + return nil, err + } + if err := capRegistry.Add(ctx, trigger); err != nil { + return nil, err + } + + return trigger, nil +} + +// NOTE: duplicated from trigger_test.go +func newReport(lggr logger.Logger, feedID [32]byte, price *big.Int, timestamp int64) []byte { + v3Codec := reportcodec.NewReportCodec(feedID, lggr) + raw, err := v3Codec.BuildReport(v3.ReportFields{ + BenchmarkPrice: price, + Timestamp: uint32(timestamp), + ValidFromTimestamp: uint32(timestamp), + Bid: price, + Ask: price, + LinkFee: price, + NativeFee: price, + ExpiresAt: uint32(timestamp + 1000000), + }) + if err != nil { + panic(err) + } + return raw +} + +func rawReportContext(reportCtx ocrTypes.ReportContext) []byte { + rc := evmutil.RawReportContext(reportCtx) + flat := []byte{} + for _, r := range rc { + flat = append(flat, r[:]...) + } + return flat +} + +const triggerID = "mock-streams-trigger@1.0.0" + +var capInfo = capabilities.MustNewCapabilityInfo( + triggerID, + capabilities.CapabilityTypeTrigger, + "Mock Streams Trigger", +) + +const defaultTickerResolutionMs = 1000 + +// TODO pending capabilities configuration implementation - this should be configurable with a sensible default +const defaultSendChannelBufferSize = 1000 + +type config struct { + // strings should be hex-encoded 32-byte values, prefixed with "0x", all lowercase, minimum 1 item + FeedIDs []string `json:"feedIds" jsonschema:"pattern=^0x[0-9a-f]{64}$,minItems=1"` + // must be greater than 0 + MaxFrequencyMs int `json:"maxFrequencyMs" jsonschema:"minimum=1"` +} + +type inputs struct { + TriggerID string `json:"triggerId"` +} + +var mercuryTriggerValidator = capabilities.NewValidator[config, inputs, capabilities.TriggerEvent](capabilities.ValidatorArgs{Info: capInfo}) + +// This Trigger Service allows for the registration and deregistration of triggers. You can also send reports to the service. +type MockTriggerService struct { + capabilities.Validator[config, inputs, capabilities.TriggerEvent] + capabilities.CapabilityInfo + tickerResolutionMs int64 + subscribers map[string]*subscriber + latestReports map[datastreams.FeedID]datastreams.FeedReport + mu sync.Mutex + stopCh services.StopChan + wg sync.WaitGroup + lggr logger.Logger + + // + meta datastreams.SignersMetadata + signers []*ecdsa.PrivateKey + producer *mockDataProducer + // +} + +var _ capabilities.TriggerCapability = (*MockTriggerService)(nil) +var _ services.Service = &MockTriggerService{} + +type subscriber struct { + ch chan<- capabilities.CapabilityResponse + workflowID string + config config +} + +// Mock Trigger will send events to each subscriber every MaxFrequencyMs (configurable per subscriber). +// Event generation happens whenever local unix time is a multiple of tickerResolutionMs. Therefore, +// all subscribers' MaxFrequencyMs values need to be a multiple of tickerResolutionMs. +func NewMockTriggerService(tickerResolutionMs int64, lggr logger.Logger) *MockTriggerService { + if tickerResolutionMs == 0 { + tickerResolutionMs = defaultTickerResolutionMs + } + // + f := 1 + meta := datastreams.SignersMetadata{MinRequiredSignatures: 2*f + 1} + // gen private keys for MinRequiredSignatures + signers := []*ecdsa.PrivateKey{} + for i := 0; i < meta.MinRequiredSignatures; i++ { + // test keys: need to be the same across nodes + bytes := make([]byte, 32) + bytes[31] = uint8(i + 1) + + privKey, err := crypto.ToECDSA(bytes) + if err != nil { + panic(err) + } + signers = append(signers, privKey) + + signerAddr := crypto.PubkeyToAddress(privKey.PublicKey).Bytes() + meta.Signers = append(meta.Signers, signerAddr) + } + // + return &MockTriggerService{ + Validator: mercuryTriggerValidator, + CapabilityInfo: capInfo, + tickerResolutionMs: tickerResolutionMs, + subscribers: make(map[string]*subscriber), + latestReports: make(map[datastreams.FeedID]datastreams.FeedReport), + stopCh: make(services.StopChan), + lggr: lggr.Named("MockTriggerService"), + meta: meta, + signers: signers} +} + +func (o *MockTriggerService) ProcessReport(reports []datastreams.FeedReport) error { + o.mu.Lock() + defer o.mu.Unlock() + o.lggr.Debugw("ProcessReport", "nReports", len(reports)) + for _, report := range reports { + feedID := datastreams.FeedID(report.FeedID) + o.latestReports[feedID] = report + } + return nil +} + +func (o *MockTriggerService) RegisterTrigger(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) { + wid := req.Metadata.WorkflowID + + o.mu.Lock() + defer o.mu.Unlock() + + config, err := o.ValidateConfig(req.Config) + if err != nil { + return nil, err + } + + inputs, err := o.ValidateInputs(req.Inputs) + if err != nil { + return nil, err + } + + triggerID := o.getTriggerID(inputs.TriggerID, wid) + // If triggerId is already registered, return an error + if _, ok := o.subscribers[triggerID]; ok { + return nil, fmt.Errorf("triggerId %s already registered", triggerID) + } + + if int64(config.MaxFrequencyMs)%o.tickerResolutionMs != 0 { + return nil, fmt.Errorf("MaxFrequencyMs must be a multiple of %d", o.tickerResolutionMs) + } + + ch := make(chan capabilities.CapabilityResponse, defaultSendChannelBufferSize) + o.subscribers[triggerID] = + &subscriber{ + ch: ch, + workflowID: wid, + config: *config, + } + + // Only start the producer once a workflow is registered + o.producer = NewMockDataProducer(o, o.meta, o.signers, config.FeedIDs, o.lggr) + if err := o.producer.Start(ctx); err != nil { + return nil, err + } + + return ch, nil +} + +func (o *MockTriggerService) UnregisterTrigger(ctx context.Context, req capabilities.CapabilityRequest) error { + wid := req.Metadata.WorkflowID + + o.mu.Lock() + defer o.mu.Unlock() + + inputs, err := o.ValidateInputs(req.Inputs) + if err != nil { + return err + } + triggerID := o.getTriggerID(inputs.TriggerID, wid) + + subscriber, ok := o.subscribers[triggerID] + if !ok { + return fmt.Errorf("triggerId %s not registered", triggerID) + } + close(subscriber.ch) + delete(o.subscribers, triggerID) + if len(o.subscribers) == 0 { + if err := o.producer.Close(); err != nil { + return err + } + o.producer = nil + } + return nil +} + +func (o *MockTriggerService) getTriggerID(triggerID string, wid string) string { + tid := wid + "|" + triggerID + return tid +} + +func (o *MockTriggerService) loop() { + defer o.wg.Done() + now := time.Now().UnixMilli() + nextWait := o.tickerResolutionMs - now%o.tickerResolutionMs + + for { + select { + case <-o.stopCh: + return + case <-time.After(time.Duration(nextWait) * time.Millisecond): + startTs := time.Now().UnixMilli() + // find closest timestamp that is a multiple of o.tickerResolutionMs + aligned := (startTs + o.tickerResolutionMs/2) / o.tickerResolutionMs * o.tickerResolutionMs + o.process(aligned) + endTs := time.Now().UnixMilli() + if endTs-startTs > o.tickerResolutionMs { + o.lggr.Errorw("processing took longer than ticker resolution", "duration", endTs-startTs, "tickerResolutionMs", o.tickerResolutionMs) + } + nextWait = getNextWaitIntervalMs(aligned, o.tickerResolutionMs, endTs) + } + } +} + +func getNextWaitIntervalMs(lastTs, tickerResolutionMs, currentTs int64) int64 { + desiredNext := lastTs + tickerResolutionMs + nextWait := desiredNext - currentTs + if nextWait <= 0 { + nextWait = 0 + } + return nextWait +} + +func (o *MockTriggerService) process(timestamp int64) { + o.mu.Lock() + defer o.mu.Unlock() + for _, sub := range o.subscribers { + if timestamp%int64(sub.config.MaxFrequencyMs) == 0 { + reportList := make([]datastreams.FeedReport, 0) + for _, feedID := range sub.config.FeedIDs { + if latest, ok := o.latestReports[datastreams.FeedID(feedID)]; ok { + reportList = append(reportList, latest) + } + } + + // use 32-byte-padded timestamp as EventID (human-readable) + eventID := fmt.Sprintf("streams_%024s", strconv.FormatInt(timestamp, 10)) + capabilityResponse, err := wrapReports(reportList, eventID, timestamp, o.meta) + + if err != nil { + o.lggr.Errorw("error wrapping reports", "err", err) + continue + } + + o.lggr.Debugw("ProcessReport pushing event", "nReports", len(reportList), "eventID", eventID) + select { + case sub.ch <- capabilityResponse: + default: + o.lggr.Errorw("subscriber channel full, dropping event", "eventID", eventID, "workflowID", sub.workflowID) + } + } + } +} + +func wrapReports(reportList []datastreams.FeedReport, eventID string, timestamp int64, meta datastreams.SignersMetadata) (capabilities.CapabilityResponse, error) { + val, err := values.Wrap(reportList) + if err != nil { + return capabilities.CapabilityResponse{}, err + } + + metaVal, err := values.Wrap(meta) + if err != nil { + return capabilities.CapabilityResponse{}, err + } + + triggerEvent := capabilities.TriggerEvent{ + TriggerType: triggerID, + ID: eventID, + Timestamp: strconv.FormatInt(timestamp, 10), + Metadata: metaVal, + Payload: val, + } + + eventVal, err := values.Wrap(triggerEvent) + if err != nil { + return capabilities.CapabilityResponse{}, err + } + + // Create a new CapabilityResponse with the MockTriggerEvent + return capabilities.CapabilityResponse{ + Value: eventVal.(*values.Map), + }, nil +} + +func (o *MockTriggerService) Start(ctx context.Context) error { + o.wg.Add(1) + go o.loop() + o.lggr.Info("MockTriggerService started") + return nil +} + +func (o *MockTriggerService) Close() error { + close(o.stopCh) + o.wg.Wait() + o.lggr.Info("MockTriggerService closed") + return nil +} + +func (o *MockTriggerService) Ready() error { + return nil +} + +func (o *MockTriggerService) HealthReport() map[string]error { + return nil +} + +func (o *MockTriggerService) Name() string { + return "MockTriggerService" +} + +type mockDataProducer struct { + trigger *MockTriggerService + wg sync.WaitGroup + closeCh chan struct{} + meta datastreams.SignersMetadata + signers []*ecdsa.PrivateKey + feedIDs []string + lggr logger.Logger +} + +var _ services.Service = &mockDataProducer{} + +func NewMockDataProducer(trigger *MockTriggerService, meta datastreams.SignersMetadata, signers []*ecdsa.PrivateKey, feedIDs []string, lggr logger.Logger) *mockDataProducer { + return &mockDataProducer{ + trigger: trigger, + closeCh: make(chan struct{}), + meta: meta, + signers: signers, + feedIDs: feedIDs, + lggr: lggr, + } +} + +func (m *mockDataProducer) Start(ctx context.Context) error { + m.wg.Add(1) + go m.loop() + return nil +} + +func (m *mockDataProducer) loop() { + defer m.wg.Done() + + sleepSec := 15 + ticker := time.NewTicker(time.Duration(sleepSec) * time.Second) + defer ticker.Stop() + + prices := []int64{300000, 40000, 5000000} + + j := 0 + + for range ticker.C { + for i := range prices { + prices[i] = prices[i] + 1 + } + j++ + + // https://github.com/smartcontractkit/chainlink/blob/41f9428c3aa8231e8834a230fca4c2ccffd4e6c3/core/capabilities/streams/trigger_test.go#L117-L122 + + timestamp := time.Now().Unix() + // TODO: shouldn't we increment round rather than epoch? + reportCtx := ocrTypes.ReportContext{ReportTimestamp: ocrTypes.ReportTimestamp{Epoch: uint32(baseTimestamp + j)}} + + reports := []datastreams.FeedReport{} + for _, feedID := range m.feedIDs { + report := datastreams.FeedReport{ + FeedID: feedID, + FullReport: newReport(m.lggr, common.HexToHash(feedID), big.NewInt(prices[0]), timestamp), + ReportContext: rawReportContext(reportCtx), + ObservationTimestamp: timestamp, + } + // sign report with mock signers + sigData := append(crypto.Keccak256(report.FullReport), report.ReportContext...) + hash := crypto.Keccak256(sigData) + for n := 0; n < m.meta.MinRequiredSignatures; n++ { + sig, err := crypto.Sign(hash, m.signers[n]) + if err != nil { + panic(err) + } + report.Signatures = append(report.Signatures, sig) + } + + reports = append(reports, report) + } + + m.lggr.Infow("New set of Mock reports", "timestamp", time.Now().Unix(), "payload", reports) + err := m.trigger.ProcessReport(reports) + if err != nil { + m.lggr.Errorw("failed to process Mock reports", "err", err, "timestamp", time.Now().Unix(), "payload", reports) + } + } +} + +func (m *mockDataProducer) Close() error { + close(m.closeCh) + m.wg.Wait() + return nil +} + +func (m *mockDataProducer) HealthReport() map[string]error { + return nil +} + +func (m *mockDataProducer) Ready() error { + return nil +} + +func (m *mockDataProducer) Name() string { + return "mockDataProducer" +} diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 4803a14faa1..ebfdc090fd2 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -56,6 +56,11 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/build" "github.com/smartcontractkit/chainlink/v2/core/capabilities" "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/compute" + gatewayconnector "github.com/smartcontractkit/chainlink/v2/core/capabilities/gateway_connector" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" + remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + capStreams "github.com/smartcontractkit/chainlink/v2/core/capabilities/streams" "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/logger/audit" @@ -291,6 +296,15 @@ func NewApplication(ctx context.Context, opts ApplicationOpts) (Application, err loopRegistry := plugins.NewLoopRegistry(globalLogger, cfg.AppID().String(), cfg.Feature().LogPoller(), cfg.Database(), cfg.Mercury(), cfg.Tracing(), cfg.Telemetry(), beholderAuthHeaders, csaPubKeyHex, cfg.LOOPP()) + if cfg.Capabilities().Local().GetCapabilityConfig("mock-streams-trigger@1.0.0") != nil { + // Register the mock streams trigger only for test environments that explicitly opt in. + // TODO: proper component shutdown via srvcs(). + _, err = capStreams.RegisterMockTrigger(globalLogger, opts.CapabilitiesRegistry) + if err != nil { + return nil, err + } + } + relayerFactory := RelayerFactory{ Logger: opts.Logger, Registerer: opts.Registerer, From b7d4b32b514e714d9779f9c014cee1b24a8ba687 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bla=C5=BE=20Hrastnik?= Date: Tue, 30 Jul 2024 02:30:20 +0900 Subject: [PATCH 02/12] capabilities: Set up a dummy registry if peering is disabled --- core/capabilities/registry.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/capabilities/registry.go b/core/capabilities/registry.go index 830c201182e..1c95ed0aae4 100644 --- a/core/capabilities/registry.go +++ b/core/capabilities/registry.go @@ -93,7 +93,7 @@ func (t *TestMetadataRegistry) LocalNode(ctx context.Context) (capabilities.Node Members: []p2ptypes.PeerID{ peerID, }, - F: 0, + F: 1, IsPublic: false, AcceptsWorkflows: true, } From 3fb7a6383531ca5b72c58916f3298ccc80cd9332 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bla=C5=BE=20Hrastnik?= Date: Tue, 27 Aug 2024 12:47:55 +0900 Subject: [PATCH 03/12] capabilities: Implement mock trigger by wrapping mercury trigger --- core/capabilities/streams/mock_trigger.go | 445 +++++----------------- 1 file changed, 96 insertions(+), 349 deletions(-) diff --git a/core/capabilities/streams/mock_trigger.go b/core/capabilities/streams/mock_trigger.go index d4d32cc94d2..d88c0f24eb7 100644 --- a/core/capabilities/streams/mock_trigger.go +++ b/core/capabilities/streams/mock_trigger.go @@ -1,14 +1,10 @@ package streams -// NOTE: this file is an amalgamation of MercuryTrigger and the streams trigger load tests -// the mercury trigger was modified to contain non-empty meta and sign the report with mock keys - import ( "context" "crypto/ecdsa" - "fmt" + "maps" "math/big" - "strconv" "sync" "time" @@ -19,19 +15,16 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/datastreams" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers/streams" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types/core" v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3" - "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/reportcodec" ) -const ( - baseTimestamp = 1000000000 -) - func RegisterMockTrigger(lggr logger.Logger, capRegistry core.CapabilitiesRegistry) (*MockTriggerService, error) { ctx := context.TODO() trigger := NewMockTriggerService(100, lggr) @@ -45,34 +38,6 @@ func RegisterMockTrigger(lggr logger.Logger, capRegistry core.CapabilitiesRegist return trigger, nil } -// NOTE: duplicated from trigger_test.go -func newReport(lggr logger.Logger, feedID [32]byte, price *big.Int, timestamp int64) []byte { - v3Codec := reportcodec.NewReportCodec(feedID, lggr) - raw, err := v3Codec.BuildReport(v3.ReportFields{ - BenchmarkPrice: price, - Timestamp: uint32(timestamp), - ValidFromTimestamp: uint32(timestamp), - Bid: price, - Ask: price, - LinkFee: price, - NativeFee: price, - ExpiresAt: uint32(timestamp + 1000000), - }) - if err != nil { - panic(err) - } - return raw -} - -func rawReportContext(reportCtx ocrTypes.ReportContext) []byte { - rc := evmutil.RawReportContext(reportCtx) - flat := []byte{} - for _, r := range rc { - flat = append(flat, r[:]...) - } - return flat -} - const triggerID = "mock-streams-trigger@1.0.0" var capInfo = capabilities.MustNewCapabilityInfo( @@ -81,62 +46,24 @@ var capInfo = capabilities.MustNewCapabilityInfo( "Mock Streams Trigger", ) -const defaultTickerResolutionMs = 1000 - -// TODO pending capabilities configuration implementation - this should be configurable with a sensible default -const defaultSendChannelBufferSize = 1000 - -type config struct { - // strings should be hex-encoded 32-byte values, prefixed with "0x", all lowercase, minimum 1 item - FeedIDs []string `json:"feedIds" jsonschema:"pattern=^0x[0-9a-f]{64}$,minItems=1"` - // must be greater than 0 - MaxFrequencyMs int `json:"maxFrequencyMs" jsonschema:"minimum=1"` -} - -type inputs struct { - TriggerID string `json:"triggerId"` -} - -var mercuryTriggerValidator = capabilities.NewValidator[config, inputs, capabilities.TriggerEvent](capabilities.ValidatorArgs{Info: capInfo}) - -// This Trigger Service allows for the registration and deregistration of triggers. You can also send reports to the service. +// Wraps the MercuryTriggerService to produce a trigger with mocked data type MockTriggerService struct { - capabilities.Validator[config, inputs, capabilities.TriggerEvent] - capabilities.CapabilityInfo - tickerResolutionMs int64 - subscribers map[string]*subscriber - latestReports map[datastreams.FeedID]datastreams.FeedReport - mu sync.Mutex - stopCh services.StopChan - wg sync.WaitGroup - lggr logger.Logger - - // - meta datastreams.SignersMetadata - signers []*ecdsa.PrivateKey - producer *mockDataProducer - // -} - -var _ capabilities.TriggerCapability = (*MockTriggerService)(nil) -var _ services.Service = &MockTriggerService{} - -type subscriber struct { - ch chan<- capabilities.CapabilityResponse - workflowID string - config config + *triggers.MercuryTriggerService + meta datastreams.Metadata + signers []*ecdsa.PrivateKey + stopCh services.StopChan + wg sync.WaitGroup + subscribers map[string][]streams.FeedId + subscribersMu sync.Mutex + lggr logger.Logger } -// Mock Trigger will send events to each subscriber every MaxFrequencyMs (configurable per subscriber). -// Event generation happens whenever local unix time is a multiple of tickerResolutionMs. Therefore, -// all subscribers' MaxFrequencyMs values need to be a multiple of tickerResolutionMs. func NewMockTriggerService(tickerResolutionMs int64, lggr logger.Logger) *MockTriggerService { - if tickerResolutionMs == 0 { - tickerResolutionMs = defaultTickerResolutionMs - } - // + trigger, _ := triggers.NewMercuryTriggerService(tickerResolutionMs, "mock-streams-trigger", "1.0.0", lggr) + trigger.CapabilityInfo = capInfo + f := 1 - meta := datastreams.SignersMetadata{MinRequiredSignatures: 2*f + 1} + meta := datastreams.Metadata{MinRequiredSignatures: 2*f + 1} // gen private keys for MinRequiredSignatures signers := []*ecdsa.PrivateKey{} for i := 0; i < meta.MinRequiredSignatures; i++ { @@ -153,256 +80,85 @@ func NewMockTriggerService(tickerResolutionMs int64, lggr logger.Logger) *MockTr signerAddr := crypto.PubkeyToAddress(privKey.PublicKey).Bytes() meta.Signers = append(meta.Signers, signerAddr) } - // - return &MockTriggerService{ - Validator: mercuryTriggerValidator, - CapabilityInfo: capInfo, - tickerResolutionMs: tickerResolutionMs, - subscribers: make(map[string]*subscriber), - latestReports: make(map[datastreams.FeedID]datastreams.FeedReport), - stopCh: make(services.StopChan), - lggr: lggr.Named("MockTriggerService"), - meta: meta, - signers: signers} -} - -func (o *MockTriggerService) ProcessReport(reports []datastreams.FeedReport) error { - o.mu.Lock() - defer o.mu.Unlock() - o.lggr.Debugw("ProcessReport", "nReports", len(reports)) - for _, report := range reports { - feedID := datastreams.FeedID(report.FeedID) - o.latestReports[feedID] = report - } - return nil -} - -func (o *MockTriggerService) RegisterTrigger(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) { - wid := req.Metadata.WorkflowID - - o.mu.Lock() - defer o.mu.Unlock() - - config, err := o.ValidateConfig(req.Config) - if err != nil { - return nil, err - } - - inputs, err := o.ValidateInputs(req.Inputs) - if err != nil { - return nil, err - } - triggerID := o.getTriggerID(inputs.TriggerID, wid) - // If triggerId is already registered, return an error - if _, ok := o.subscribers[triggerID]; ok { - return nil, fmt.Errorf("triggerId %s already registered", triggerID) - } + // MercuryTrigger is typically wrapped by other modules that ignore the trigger's meta and provide a different one. + // Since we're skipping those wrappers we need to provide our own meta here. + trigger.SetMetaOverride(meta) - if int64(config.MaxFrequencyMs)%o.tickerResolutionMs != 0 { - return nil, fmt.Errorf("MaxFrequencyMs must be a multiple of %d", o.tickerResolutionMs) - } - - ch := make(chan capabilities.CapabilityResponse, defaultSendChannelBufferSize) - o.subscribers[triggerID] = - &subscriber{ - ch: ch, - workflowID: wid, - config: *config, - } - - // Only start the producer once a workflow is registered - o.producer = NewMockDataProducer(o, o.meta, o.signers, config.FeedIDs, o.lggr) - if err := o.producer.Start(ctx); err != nil { - return nil, err - } - - return ch, nil + return &MockTriggerService{ + MercuryTriggerService: trigger, + meta: meta, + signers: signers, + subscribers: make(map[string][]streams.FeedId), + lggr: lggr} } -func (o *MockTriggerService) UnregisterTrigger(ctx context.Context, req capabilities.CapabilityRequest) error { - wid := req.Metadata.WorkflowID - - o.mu.Lock() - defer o.mu.Unlock() - - inputs, err := o.ValidateInputs(req.Inputs) - if err != nil { +func (m *MockTriggerService) Start(ctx context.Context) error { + if err := m.MercuryTriggerService.Start(ctx); err != nil { return err } - triggerID := o.getTriggerID(inputs.TriggerID, wid) - - subscriber, ok := o.subscribers[triggerID] - if !ok { - return fmt.Errorf("triggerId %s not registered", triggerID) - } - close(subscriber.ch) - delete(o.subscribers, triggerID) - if len(o.subscribers) == 0 { - if err := o.producer.Close(); err != nil { - return err - } - o.producer = nil - } + go m.loop() return nil } -func (o *MockTriggerService) getTriggerID(triggerID string, wid string) string { - tid := wid + "|" + triggerID - return tid +func (m *MockTriggerService) Close() error { + close(m.stopCh) + m.wg.Wait() + return m.MercuryTriggerService.Close() } -func (o *MockTriggerService) loop() { - defer o.wg.Done() - now := time.Now().UnixMilli() - nextWait := o.tickerResolutionMs - now%o.tickerResolutionMs - - for { - select { - case <-o.stopCh: - return - case <-time.After(time.Duration(nextWait) * time.Millisecond): - startTs := time.Now().UnixMilli() - // find closest timestamp that is a multiple of o.tickerResolutionMs - aligned := (startTs + o.tickerResolutionMs/2) / o.tickerResolutionMs * o.tickerResolutionMs - o.process(aligned) - endTs := time.Now().UnixMilli() - if endTs-startTs > o.tickerResolutionMs { - o.lggr.Errorw("processing took longer than ticker resolution", "duration", endTs-startTs, "tickerResolutionMs", o.tickerResolutionMs) - } - nextWait = getNextWaitIntervalMs(aligned, o.tickerResolutionMs, endTs) - } +func (o *MockTriggerService) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { + ch, err := o.MercuryTriggerService.RegisterTrigger(ctx, req) + if err != nil { + return nil, err } -} -func getNextWaitIntervalMs(lastTs, tickerResolutionMs, currentTs int64) int64 { - desiredNext := lastTs + tickerResolutionMs - nextWait := desiredNext - currentTs - if nextWait <= 0 { - nextWait = 0 - } - return nextWait + config, _ := o.MercuryTriggerService.ValidateConfig(req.Config) + o.subscribersMu.Lock() + defer o.subscribersMu.Unlock() + o.subscribers[req.Metadata.WorkflowID] = config.FeedIds + return ch, nil } -func (o *MockTriggerService) process(timestamp int64) { - o.mu.Lock() - defer o.mu.Unlock() - for _, sub := range o.subscribers { - if timestamp%int64(sub.config.MaxFrequencyMs) == 0 { - reportList := make([]datastreams.FeedReport, 0) - for _, feedID := range sub.config.FeedIDs { - if latest, ok := o.latestReports[datastreams.FeedID(feedID)]; ok { - reportList = append(reportList, latest) - } - } - - // use 32-byte-padded timestamp as EventID (human-readable) - eventID := fmt.Sprintf("streams_%024s", strconv.FormatInt(timestamp, 10)) - capabilityResponse, err := wrapReports(reportList, eventID, timestamp, o.meta) - - if err != nil { - o.lggr.Errorw("error wrapping reports", "err", err) - continue - } - - o.lggr.Debugw("ProcessReport pushing event", "nReports", len(reportList), "eventID", eventID) - select { - case sub.ch <- capabilityResponse: - default: - o.lggr.Errorw("subscriber channel full, dropping event", "eventID", eventID, "workflowID", sub.workflowID) - } - } - } +func (o *MockTriggerService) UnregisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) error { + err := o.MercuryTriggerService.UnregisterTrigger(ctx, req) + o.subscribersMu.Lock() + defer o.subscribersMu.Unlock() + delete(o.subscribers, req.Metadata.WorkflowID) + return err } -func wrapReports(reportList []datastreams.FeedReport, eventID string, timestamp int64, meta datastreams.SignersMetadata) (capabilities.CapabilityResponse, error) { - val, err := values.Wrap(reportList) - if err != nil { - return capabilities.CapabilityResponse{}, err - } - - metaVal, err := values.Wrap(meta) - if err != nil { - return capabilities.CapabilityResponse{}, err - } - - triggerEvent := capabilities.TriggerEvent{ - TriggerType: triggerID, - ID: eventID, - Timestamp: strconv.FormatInt(timestamp, 10), - Metadata: metaVal, - Payload: val, - } +const baseTimestamp = 1000000000 - eventVal, err := values.Wrap(triggerEvent) +// NOTE: duplicated from trigger_test.go +func newReport(lggr logger.Logger, feedID [32]byte, price *big.Int, timestamp int64) []byte { + v3Codec := reportcodec.NewReportCodec(feedID, lggr) + raw, err := v3Codec.BuildReport(context.Background(), v3.ReportFields{ + BenchmarkPrice: price, + Timestamp: uint32(timestamp), + ValidFromTimestamp: uint32(timestamp), + Bid: price, + Ask: price, + LinkFee: price, + NativeFee: price, + ExpiresAt: uint32(timestamp + 1000000), + }) if err != nil { - return capabilities.CapabilityResponse{}, err + panic(err) } - - // Create a new CapabilityResponse with the MockTriggerEvent - return capabilities.CapabilityResponse{ - Value: eventVal.(*values.Map), - }, nil -} - -func (o *MockTriggerService) Start(ctx context.Context) error { - o.wg.Add(1) - go o.loop() - o.lggr.Info("MockTriggerService started") - return nil -} - -func (o *MockTriggerService) Close() error { - close(o.stopCh) - o.wg.Wait() - o.lggr.Info("MockTriggerService closed") - return nil -} - -func (o *MockTriggerService) Ready() error { - return nil -} - -func (o *MockTriggerService) HealthReport() map[string]error { - return nil -} - -func (o *MockTriggerService) Name() string { - return "MockTriggerService" -} - -type mockDataProducer struct { - trigger *MockTriggerService - wg sync.WaitGroup - closeCh chan struct{} - meta datastreams.SignersMetadata - signers []*ecdsa.PrivateKey - feedIDs []string - lggr logger.Logger + return raw } -var _ services.Service = &mockDataProducer{} - -func NewMockDataProducer(trigger *MockTriggerService, meta datastreams.SignersMetadata, signers []*ecdsa.PrivateKey, feedIDs []string, lggr logger.Logger) *mockDataProducer { - return &mockDataProducer{ - trigger: trigger, - closeCh: make(chan struct{}), - meta: meta, - signers: signers, - feedIDs: feedIDs, - lggr: lggr, +func rawReportContext(reportCtx ocrTypes.ReportContext) []byte { + rc := evmutil.RawReportContext(reportCtx) + flat := []byte{} + for _, r := range rc { + flat = append(flat, r[:]...) } + return flat } -func (m *mockDataProducer) Start(ctx context.Context) error { - m.wg.Add(1) - go m.loop() - return nil -} - -func (m *mockDataProducer) loop() { - defer m.wg.Done() - +func (m *MockTriggerService) loop() { sleepSec := 15 ticker := time.NewTicker(time.Duration(sleepSec) * time.Second) defer ticker.Stop() @@ -412,6 +168,7 @@ func (m *mockDataProducer) loop() { j := 0 for range ticker.C { + // TODO: properly close for i := range prices { prices[i] = prices[i] + 1 } @@ -424,49 +181,39 @@ func (m *mockDataProducer) loop() { reportCtx := ocrTypes.ReportContext{ReportTimestamp: ocrTypes.ReportTimestamp{Epoch: uint32(baseTimestamp + j)}} reports := []datastreams.FeedReport{} - for _, feedID := range m.feedIDs { - report := datastreams.FeedReport{ - FeedID: feedID, - FullReport: newReport(m.lggr, common.HexToHash(feedID), big.NewInt(prices[0]), timestamp), - ReportContext: rawReportContext(reportCtx), - ObservationTimestamp: timestamp, - } - // sign report with mock signers - sigData := append(crypto.Keccak256(report.FullReport), report.ReportContext...) - hash := crypto.Keccak256(sigData) - for n := 0; n < m.meta.MinRequiredSignatures; n++ { - sig, err := crypto.Sign(hash, m.signers[n]) - if err != nil { - panic(err) + subscribers := map[string][]streams.FeedId{} + m.subscribersMu.Lock() + maps.Copy(subscribers, m.subscribers) + m.subscribersMu.Unlock() + for _, feedIDs := range subscribers { + for _, feedID := range feedIDs { + feedID := string(feedID) + report := datastreams.FeedReport{ + FeedID: feedID, + FullReport: newReport(m.lggr, common.HexToHash(feedID), big.NewInt(prices[0]), timestamp), + ReportContext: rawReportContext(reportCtx), + ObservationTimestamp: timestamp, } - report.Signatures = append(report.Signatures, sig) + // sign report with mock signers + sigData := append(crypto.Keccak256(report.FullReport), report.ReportContext...) + hash := crypto.Keccak256(sigData) + for n := 0; n < m.meta.MinRequiredSignatures; n++ { + sig, err := crypto.Sign(hash, m.signers[n]) + if err != nil { + panic(err) + } + report.Signatures = append(report.Signatures, sig) + } + + reports = append(reports, report) } - reports = append(reports, report) } m.lggr.Infow("New set of Mock reports", "timestamp", time.Now().Unix(), "payload", reports) - err := m.trigger.ProcessReport(reports) + err := m.MercuryTriggerService.ProcessReport(reports) if err != nil { m.lggr.Errorw("failed to process Mock reports", "err", err, "timestamp", time.Now().Unix(), "payload", reports) } } } - -func (m *mockDataProducer) Close() error { - close(m.closeCh) - m.wg.Wait() - return nil -} - -func (m *mockDataProducer) HealthReport() map[string]error { - return nil -} - -func (m *mockDataProducer) Ready() error { - return nil -} - -func (m *mockDataProducer) Name() string { - return "mockDataProducer" -} From 4e50b753d3f95ea99cbc490294f906142164b2a6 Mon Sep 17 00:00:00 2001 From: cawthorne Date: Thu, 2 Apr 2026 13:46:47 +0100 Subject: [PATCH 04/12] feat: gate mock streams trigger on develop --- core/capabilities/streams/mock_trigger.go | 35 ++++- .../capabilities/streams/mock_trigger_test.go | 135 ++++++++++++++++++ core/services/chainlink/application.go | 10 +- .../application_mock_trigger_test.go | 61 ++++++++ 4 files changed, 229 insertions(+), 12 deletions(-) create mode 100644 core/capabilities/streams/mock_trigger_test.go create mode 100644 core/services/chainlink/application_mock_trigger_test.go diff --git a/core/capabilities/streams/mock_trigger.go b/core/capabilities/streams/mock_trigger.go index d88c0f24eb7..9d1a6f20e3e 100644 --- a/core/capabilities/streams/mock_trigger.go +++ b/core/capabilities/streams/mock_trigger.go @@ -21,8 +21,8 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/core" v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3" + "github.com/smartcontractkit/chainlink-evm/pkg/mercury/v3/reportcodec" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/reportcodec" ) func RegisterMockTrigger(lggr logger.Logger, capRegistry core.CapabilitiesRegistry) (*MockTriggerService, error) { @@ -38,7 +38,9 @@ func RegisterMockTrigger(lggr logger.Logger, capRegistry core.CapabilitiesRegist return trigger, nil } -const triggerID = "mock-streams-trigger@1.0.0" +const MockTriggerCapabilityID = "mock-streams-trigger@1.0.0" + +const triggerID = MockTriggerCapabilityID var capInfo = capabilities.MustNewCapabilityInfo( triggerID, @@ -53,15 +55,23 @@ type MockTriggerService struct { signers []*ecdsa.PrivateKey stopCh services.StopChan wg sync.WaitGroup + loopInterval time.Duration subscribers map[string][]streams.FeedId subscribersMu sync.Mutex lggr logger.Logger } func NewMockTriggerService(tickerResolutionMs int64, lggr logger.Logger) *MockTriggerService { - trigger, _ := triggers.NewMercuryTriggerService(tickerResolutionMs, "mock-streams-trigger", "1.0.0", lggr) + trigger, err := triggers.NewMercuryTriggerService(tickerResolutionMs, "mock-streams-trigger", "1.0.0", lggr) + if err != nil { + panic(err) + } trigger.CapabilityInfo = capInfo + if tickerResolutionMs == 0 { + tickerResolutionMs = 1000 + } + f := 1 meta := datastreams.Metadata{MinRequiredSignatures: 2*f + 1} // gen private keys for MinRequiredSignatures @@ -89,14 +99,18 @@ func NewMockTriggerService(tickerResolutionMs int64, lggr logger.Logger) *MockTr MercuryTriggerService: trigger, meta: meta, signers: signers, + stopCh: make(services.StopChan), + loopInterval: time.Duration(tickerResolutionMs) * time.Millisecond, subscribers: make(map[string][]streams.FeedId), - lggr: lggr} + lggr: lggr, + } } func (m *MockTriggerService) Start(ctx context.Context) error { if err := m.MercuryTriggerService.Start(ctx); err != nil { return err } + m.wg.Add(1) go m.loop() return nil } @@ -159,15 +173,22 @@ func rawReportContext(reportCtx ocrTypes.ReportContext) []byte { } func (m *MockTriggerService) loop() { - sleepSec := 15 - ticker := time.NewTicker(time.Duration(sleepSec) * time.Second) + defer m.wg.Done() + + ticker := time.NewTicker(m.loopInterval) defer ticker.Stop() prices := []int64{300000, 40000, 5000000} j := 0 - for range ticker.C { + for { + select { + case <-m.stopCh: + return + case <-ticker.C: + } + // TODO: properly close for i := range prices { prices[i] = prices[i] + 1 diff --git a/core/capabilities/streams/mock_trigger_test.go b/core/capabilities/streams/mock_trigger_test.go new file mode 100644 index 00000000000..a051e703062 --- /dev/null +++ b/core/capabilities/streams/mock_trigger_test.go @@ -0,0 +1,135 @@ +package streams + +import ( + "context" + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/stretchr/testify/require" + + ocrTypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/datastreams" + triggercfg "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers/streams" + "github.com/smartcontractkit/chainlink-protos/cre/go/values" + + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +const testFeedID triggercfg.FeedId = "0x0000000000000000000000000000000000000000000000000000000000000001" + +func TestMockTriggerServiceStartClose(t *testing.T) { + t.Parallel() + + svc := NewMockTriggerService(10, logger.TestLogger(t)) + require.NoError(t, svc.Start(context.Background())) + + done := make(chan error, 1) + go func() { + done <- svc.Close() + }() + + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timed out waiting for mock trigger service to close") + } +} + +func TestMockTriggerServiceRegisterEmitUnregister(t *testing.T) { + t.Parallel() + + ctx := context.Background() + svc := NewMockTriggerService(200, logger.TestLogger(t)) + require.NoError(t, svc.MercuryTriggerService.Start(ctx)) + t.Cleanup(func() { + require.NoError(t, svc.MercuryTriggerService.Close()) + }) + + req := newMockTriggerRequest(t, "workflow-a", "trigger-a", 200) + ch, err := svc.RegisterTrigger(ctx, req) + require.NoError(t, err) + + svc.subscribersMu.Lock() + require.Equal(t, []triggercfg.FeedId{testFeedID}, svc.subscribers[req.Metadata.WorkflowID]) + svc.subscribersMu.Unlock() + + require.NoError(t, svc.ProcessReport([]datastreams.FeedReport{newSignedMockFeedReport(t, svc, testFeedID)})) + + var resp commoncap.TriggerResponse + select { + case resp = <-ch: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for mock trigger event") + } + + triggerEvent := datastreams.StreamsTriggerEvent{} + require.NoError(t, resp.Event.Outputs.UnwrapTo(&triggerEvent)) + require.Equal(t, MockTriggerCapabilityID, resp.Event.TriggerType) + require.Len(t, triggerEvent.Payload, 1) + require.Len(t, triggerEvent.Payload[0].Signatures, svc.meta.MinRequiredSignatures) + require.Len(t, triggerEvent.Metadata.Signers, svc.meta.MinRequiredSignatures) + require.Equal(t, svc.meta.MinRequiredSignatures, triggerEvent.Metadata.MinRequiredSignatures) + + require.NoError(t, svc.UnregisterTrigger(ctx, req)) + + svc.subscribersMu.Lock() + require.Empty(t, svc.subscribers) + svc.subscribersMu.Unlock() + + select { + case _, ok := <-ch: + require.False(t, ok) + case <-time.After(time.Second): + t.Fatal("timed out waiting for subscriber channel to close") + } +} + +func newMockTriggerRequest(t *testing.T, workflowID, triggerID string, maxFrequencyMs uint64) commoncap.TriggerRegistrationRequest { + t.Helper() + + cfg, err := values.WrapMap(triggercfg.TriggerConfig{ + FeedIds: []triggercfg.FeedId{testFeedID}, + MaxFrequencyMs: maxFrequencyMs, + }) + require.NoError(t, err) + + return commoncap.TriggerRegistrationRequest{ + TriggerID: triggerID, + Metadata: commoncap.RequestMetadata{ + WorkflowID: workflowID, + }, + Config: cfg, + } +} + +func newSignedMockFeedReport(t *testing.T, svc *MockTriggerService, feedID triggercfg.FeedId) datastreams.FeedReport { + t.Helper() + + timestamp := time.Now().Unix() + reportCtx := ocrTypes.ReportContext{ + ReportTimestamp: ocrTypes.ReportTimestamp{Epoch: uint32(baseTimestamp + 1)}, + } + + report := datastreams.FeedReport{ + FeedID: string(feedID), + FullReport: newReport(svc.lggr, common.HexToHash(string(feedID)), big.NewInt(123456), timestamp), + ReportContext: rawReportContext(reportCtx), + ObservationTimestamp: timestamp, + } + + sigData := append(crypto.Keccak256(report.FullReport), report.ReportContext...) + hash := crypto.Keccak256(sigData) + for _, signer := range svc.signers { + sig, err := crypto.Sign(hash, signer) + require.NoError(t, err) + report.Signatures = append(report.Signatures, sig) + } + + return report +} diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index ebfdc090fd2..d7f1b6b2441 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -56,10 +56,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/build" "github.com/smartcontractkit/chainlink/v2/core/capabilities" "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip" - "github.com/smartcontractkit/chainlink/v2/core/capabilities/compute" - gatewayconnector "github.com/smartcontractkit/chainlink/v2/core/capabilities/gateway_connector" - "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" - remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" capStreams "github.com/smartcontractkit/chainlink/v2/core/capabilities/streams" "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -220,6 +216,10 @@ type ApplicationOpts struct { DonTimeStore *dontime.Store } +func shouldRegisterMockStreamsTrigger(local config.LocalCapabilities) bool { + return local != nil && local.GetCapabilityConfig(capStreams.MockTriggerCapabilityID) != nil +} + // NewApplication initializes a new store if one is not already // present at the configured root directory (default: ~/.chainlink), // the logger at the same directory and returns the Application to @@ -296,7 +296,7 @@ func NewApplication(ctx context.Context, opts ApplicationOpts) (Application, err loopRegistry := plugins.NewLoopRegistry(globalLogger, cfg.AppID().String(), cfg.Feature().LogPoller(), cfg.Database(), cfg.Mercury(), cfg.Tracing(), cfg.Telemetry(), beholderAuthHeaders, csaPubKeyHex, cfg.LOOPP()) - if cfg.Capabilities().Local().GetCapabilityConfig("mock-streams-trigger@1.0.0") != nil { + if shouldRegisterMockStreamsTrigger(cfg.Capabilities().Local()) { // Register the mock streams trigger only for test environments that explicitly opt in. // TODO: proper component shutdown via srvcs(). _, err = capStreams.RegisterMockTrigger(globalLogger, opts.CapabilitiesRegistry) diff --git a/core/services/chainlink/application_mock_trigger_test.go b/core/services/chainlink/application_mock_trigger_test.go new file mode 100644 index 00000000000..d6df18e9173 --- /dev/null +++ b/core/services/chainlink/application_mock_trigger_test.go @@ -0,0 +1,61 @@ +package chainlink + +import ( + "testing" + + "github.com/stretchr/testify/require" + + capStreams "github.com/smartcontractkit/chainlink/v2/core/capabilities/streams" + "github.com/smartcontractkit/chainlink/v2/core/config" +) + +type stubLocalCapabilities struct { + cfgs map[string]config.CapabilityNodeConfig +} + +func (s stubLocalCapabilities) RegistryBasedLaunchAllowlist() []string { + return nil +} + +func (s stubLocalCapabilities) Capabilities() map[string]config.CapabilityNodeConfig { + return s.cfgs +} + +func (s stubLocalCapabilities) IsAllowlisted(string) bool { + return false +} + +func (s stubLocalCapabilities) GetCapabilityConfig(capabilityID string) config.CapabilityNodeConfig { + if s.cfgs == nil { + return nil + } + return s.cfgs[capabilityID] +} + +type stubCapabilityNodeConfig struct{} + +func (stubCapabilityNodeConfig) BinaryPathOverride() string { + return "" +} + +func (stubCapabilityNodeConfig) Config() map[string]string { + return nil +} + +func TestShouldRegisterMockStreamsTrigger(t *testing.T) { + t.Parallel() + + require.False(t, shouldRegisterMockStreamsTrigger(nil)) + + require.False(t, shouldRegisterMockStreamsTrigger(stubLocalCapabilities{ + cfgs: map[string]config.CapabilityNodeConfig{ + "cron@1.0.0": stubCapabilityNodeConfig{}, + }, + })) + + require.True(t, shouldRegisterMockStreamsTrigger(stubLocalCapabilities{ + cfgs: map[string]config.CapabilityNodeConfig{ + capStreams.MockTriggerCapabilityID: stubCapabilityNodeConfig{}, + }, + })) +} From 89ac6bd6f04661d9d1fc306229d5e8c98f82a65d Mon Sep 17 00:00:00 2001 From: cawthorne Date: Thu, 2 Apr 2026 14:06:02 +0100 Subject: [PATCH 05/12] fix: address mock trigger lint issues --- core/capabilities/streams/mock_trigger.go | 51 ++++++++++++++--------- 1 file changed, 32 insertions(+), 19 deletions(-) diff --git a/core/capabilities/streams/mock_trigger.go b/core/capabilities/streams/mock_trigger.go index 9d1a6f20e3e..4f893398e9e 100644 --- a/core/capabilities/streams/mock_trigger.go +++ b/core/capabilities/streams/mock_trigger.go @@ -79,7 +79,7 @@ func NewMockTriggerService(tickerResolutionMs int64, lggr logger.Logger) *MockTr for i := 0; i < meta.MinRequiredSignatures; i++ { // test keys: need to be the same across nodes bytes := make([]byte, 32) - bytes[31] = uint8(i + 1) + bytes[31] = mustUint8(i + 1) privKey, err := crypto.ToECDSA(bytes) if err != nil { @@ -121,24 +121,24 @@ func (m *MockTriggerService) Close() error { return m.MercuryTriggerService.Close() } -func (o *MockTriggerService) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { - ch, err := o.MercuryTriggerService.RegisterTrigger(ctx, req) +func (m *MockTriggerService) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { + ch, err := m.MercuryTriggerService.RegisterTrigger(ctx, req) if err != nil { return nil, err } - config, _ := o.MercuryTriggerService.ValidateConfig(req.Config) - o.subscribersMu.Lock() - defer o.subscribersMu.Unlock() - o.subscribers[req.Metadata.WorkflowID] = config.FeedIds + config, _ := m.ValidateConfig(req.Config) + m.subscribersMu.Lock() + defer m.subscribersMu.Unlock() + m.subscribers[req.Metadata.WorkflowID] = config.FeedIds return ch, nil } -func (o *MockTriggerService) UnregisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) error { - err := o.MercuryTriggerService.UnregisterTrigger(ctx, req) - o.subscribersMu.Lock() - defer o.subscribersMu.Unlock() - delete(o.subscribers, req.Metadata.WorkflowID) +func (m *MockTriggerService) UnregisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) error { + err := m.MercuryTriggerService.UnregisterTrigger(ctx, req) + m.subscribersMu.Lock() + defer m.subscribersMu.Unlock() + delete(m.subscribers, req.Metadata.WorkflowID) return err } @@ -149,13 +149,13 @@ func newReport(lggr logger.Logger, feedID [32]byte, price *big.Int, timestamp in v3Codec := reportcodec.NewReportCodec(feedID, lggr) raw, err := v3Codec.BuildReport(context.Background(), v3.ReportFields{ BenchmarkPrice: price, - Timestamp: uint32(timestamp), - ValidFromTimestamp: uint32(timestamp), + Timestamp: mustUint32(timestamp), + ValidFromTimestamp: mustUint32(timestamp), Bid: price, Ask: price, LinkFee: price, NativeFee: price, - ExpiresAt: uint32(timestamp + 1000000), + ExpiresAt: mustUint32(timestamp + 1000000), }) if err != nil { panic(err) @@ -163,6 +163,20 @@ func newReport(lggr logger.Logger, feedID [32]byte, price *big.Int, timestamp in return raw } +func mustUint32(v int64) uint32 { + if v < 0 || v > int64(^uint32(0)) { + panic("timestamp out of uint32 range") + } + return uint32(v) +} + +func mustUint8(v int) uint8 { + if v < 0 || v > int(^uint8(0)) { + panic("value out of uint8 range") + } + return uint8(v) +} + func rawReportContext(reportCtx ocrTypes.ReportContext) []byte { rc := evmutil.RawReportContext(reportCtx) flat := []byte{} @@ -191,7 +205,7 @@ func (m *MockTriggerService) loop() { // TODO: properly close for i := range prices { - prices[i] = prices[i] + 1 + prices[i]++ } j++ @@ -199,7 +213,7 @@ func (m *MockTriggerService) loop() { timestamp := time.Now().Unix() // TODO: shouldn't we increment round rather than epoch? - reportCtx := ocrTypes.ReportContext{ReportTimestamp: ocrTypes.ReportTimestamp{Epoch: uint32(baseTimestamp + j)}} + reportCtx := ocrTypes.ReportContext{ReportTimestamp: ocrTypes.ReportTimestamp{Epoch: mustUint32(int64(baseTimestamp + j))}} reports := []datastreams.FeedReport{} subscribers := map[string][]streams.FeedId{} @@ -228,11 +242,10 @@ func (m *MockTriggerService) loop() { reports = append(reports, report) } - } m.lggr.Infow("New set of Mock reports", "timestamp", time.Now().Unix(), "payload", reports) - err := m.MercuryTriggerService.ProcessReport(reports) + err := m.ProcessReport(reports) if err != nil { m.lggr.Errorw("failed to process Mock reports", "err", err, "timestamp", time.Now().Unix(), "payload", reports) } From 64432d96b4b0f0bf0867189086d5ca7adb338798 Mon Sep 17 00:00:00 2001 From: cawthorne Date: Thu, 2 Apr 2026 15:14:26 +0100 Subject: [PATCH 06/12] refactor: simplify mock streams trigger flow --- core/capabilities/streams/mock_trigger.go | 173 +++++++++++------- .../capabilities/streams/mock_trigger_test.go | 4 +- 2 files changed, 113 insertions(+), 64 deletions(-) diff --git a/core/capabilities/streams/mock_trigger.go b/core/capabilities/streams/mock_trigger.go index 4f893398e9e..6018cd7056e 100644 --- a/core/capabilities/streams/mock_trigger.go +++ b/core/capabilities/streams/mock_trigger.go @@ -3,6 +3,7 @@ package streams import ( "context" "crypto/ecdsa" + "fmt" "maps" "math/big" "sync" @@ -40,10 +41,8 @@ func RegisterMockTrigger(lggr logger.Logger, capRegistry core.CapabilitiesRegist const MockTriggerCapabilityID = "mock-streams-trigger@1.0.0" -const triggerID = MockTriggerCapabilityID - var capInfo = capabilities.MustNewCapabilityInfo( - triggerID, + MockTriggerCapabilityID, capabilities.CapabilityTypeTrigger, "Mock Streams Trigger", ) @@ -72,24 +71,7 @@ func NewMockTriggerService(tickerResolutionMs int64, lggr logger.Logger) *MockTr tickerResolutionMs = 1000 } - f := 1 - meta := datastreams.Metadata{MinRequiredSignatures: 2*f + 1} - // gen private keys for MinRequiredSignatures - signers := []*ecdsa.PrivateKey{} - for i := 0; i < meta.MinRequiredSignatures; i++ { - // test keys: need to be the same across nodes - bytes := make([]byte, 32) - bytes[31] = mustUint8(i + 1) - - privKey, err := crypto.ToECDSA(bytes) - if err != nil { - panic(err) - } - signers = append(signers, privKey) - - signerAddr := crypto.PubkeyToAddress(privKey.PublicKey).Bytes() - meta.Signers = append(meta.Signers, signerAddr) - } + meta, signers := newMockMetadataAndSigners() // MercuryTrigger is typically wrapped by other modules that ignore the trigger's meta and provide a different one. // Since we're skipping those wrappers we need to provide our own meta here. @@ -106,6 +88,28 @@ func NewMockTriggerService(tickerResolutionMs int64, lggr logger.Logger) *MockTr } } +func newMockMetadataAndSigners() (datastreams.Metadata, []*ecdsa.PrivateKey) { + f := 1 + meta := datastreams.Metadata{MinRequiredSignatures: 2*f + 1} + signers := make([]*ecdsa.PrivateKey, 0, meta.MinRequiredSignatures) + for i := 0; i < meta.MinRequiredSignatures; i++ { + privKey := mustMockSigner(i + 1) + signers = append(signers, privKey) + meta.Signers = append(meta.Signers, crypto.PubkeyToAddress(privKey.PublicKey).Bytes()) + } + return meta, signers +} + +func mustMockSigner(index int) *ecdsa.PrivateKey { + bytes := make([]byte, 32) + bytes[31] = mustUint8(index) + privKey, err := crypto.ToECDSA(bytes) + if err != nil { + panic(err) + } + return privKey +} + func (m *MockTriggerService) Start(ctx context.Context) error { if err := m.MercuryTriggerService.Start(ctx); err != nil { return err @@ -145,7 +149,7 @@ func (m *MockTriggerService) UnregisterTrigger(ctx context.Context, req capabili const baseTimestamp = 1000000000 // NOTE: duplicated from trigger_test.go -func newReport(lggr logger.Logger, feedID [32]byte, price *big.Int, timestamp int64) []byte { +func newReport(lggr logger.Logger, feedID [32]byte, price *big.Int, timestamp int64) ([]byte, error) { v3Codec := reportcodec.NewReportCodec(feedID, lggr) raw, err := v3Codec.BuildReport(context.Background(), v3.ReportFields{ BenchmarkPrice: price, @@ -158,9 +162,9 @@ func newReport(lggr logger.Logger, feedID [32]byte, price *big.Int, timestamp in ExpiresAt: mustUint32(timestamp + 1000000), }) if err != nil { - panic(err) + return nil, err } - return raw + return raw, nil } func mustUint32(v int64) uint32 { @@ -179,7 +183,7 @@ func mustUint8(v int) uint8 { func rawReportContext(reportCtx ocrTypes.ReportContext) []byte { rc := evmutil.RawReportContext(reportCtx) - flat := []byte{} + flat := make([]byte, 0, len(rc)*32) for _, r := range rc { flat = append(flat, r[:]...) } @@ -203,51 +207,94 @@ func (m *MockTriggerService) loop() { case <-ticker.C: } - // TODO: properly close - for i := range prices { - prices[i]++ - } + incrementPrices(prices) j++ - // https://github.com/smartcontractkit/chainlink/blob/41f9428c3aa8231e8834a230fca4c2ccffd4e6c3/core/capabilities/streams/trigger_test.go#L117-L122 - timestamp := time.Now().Unix() - // TODO: shouldn't we increment round rather than epoch? - reportCtx := ocrTypes.ReportContext{ReportTimestamp: ocrTypes.ReportTimestamp{Epoch: mustUint32(int64(baseTimestamp + j))}} - - reports := []datastreams.FeedReport{} - subscribers := map[string][]streams.FeedId{} - m.subscribersMu.Lock() - maps.Copy(subscribers, m.subscribers) - m.subscribersMu.Unlock() - for _, feedIDs := range subscribers { - for _, feedID := range feedIDs { - feedID := string(feedID) - report := datastreams.FeedReport{ - FeedID: feedID, - FullReport: newReport(m.lggr, common.HexToHash(feedID), big.NewInt(prices[0]), timestamp), - ReportContext: rawReportContext(reportCtx), - ObservationTimestamp: timestamp, - } - // sign report with mock signers - sigData := append(crypto.Keccak256(report.FullReport), report.ReportContext...) - hash := crypto.Keccak256(sigData) - for n := 0; n < m.meta.MinRequiredSignatures; n++ { - sig, err := crypto.Sign(hash, m.signers[n]) - if err != nil { - panic(err) - } - report.Signatures = append(report.Signatures, sig) - } - - reports = append(reports, report) + reportCtx := newReportContext(j) + reports, err := m.buildReports(timestamp, prices[0], reportCtx) + if err != nil { + m.lggr.Errorw("failed to build Mock reports", "err", err, "timestamp", timestamp) + continue + } + if len(reports) == 0 { + continue + } + + m.lggr.Infow("New set of Mock reports", "timestamp", timestamp, "payload", reports) + if err := m.ProcessReport(reports); err != nil { + m.lggr.Errorw("failed to process Mock reports", "err", err, "timestamp", timestamp, "payload", reports) + } + } +} + +func incrementPrices(prices []int64) { + for i := range prices { + prices[i]++ + } +} + +func newReportContext(iteration int) ocrTypes.ReportContext { + // We bump the epoch because this mock trigger is generating synthetic OCR rounds. + return ocrTypes.ReportContext{ + ReportTimestamp: ocrTypes.ReportTimestamp{ + Epoch: mustUint32(int64(baseTimestamp + iteration)), + }, + } +} + +func (m *MockTriggerService) buildReports(timestamp, price int64, reportCtx ocrTypes.ReportContext) ([]datastreams.FeedReport, error) { + reports := make([]datastreams.FeedReport, 0) + for _, feedIDs := range m.snapshotSubscribers() { + for _, feedID := range feedIDs { + report, err := m.newSignedReport(string(feedID), price, timestamp, reportCtx) + if err != nil { + return nil, err } + reports = append(reports, report) } + } + return reports, nil +} + +func (m *MockTriggerService) snapshotSubscribers() map[string][]streams.FeedId { + m.subscribersMu.Lock() + defer m.subscribersMu.Unlock() + + snapshot := make(map[string][]streams.FeedId, len(m.subscribers)) + maps.Copy(snapshot, m.subscribers) + return snapshot +} + +func (m *MockTriggerService) newSignedReport(feedID string, price, timestamp int64, reportCtx ocrTypes.ReportContext) (datastreams.FeedReport, error) { + fullReport, err := newReport(m.lggr, common.HexToHash(feedID), big.NewInt(price), timestamp) + if err != nil { + return datastreams.FeedReport{}, fmt.Errorf("build report for feed %s: %w", feedID, err) + } - m.lggr.Infow("New set of Mock reports", "timestamp", time.Now().Unix(), "payload", reports) - err := m.ProcessReport(reports) + report := datastreams.FeedReport{ + FeedID: feedID, + FullReport: fullReport, + ReportContext: rawReportContext(reportCtx), + ObservationTimestamp: timestamp, + } + + if err := m.signReport(&report); err != nil { + return datastreams.FeedReport{}, fmt.Errorf("sign report for feed %s: %w", feedID, err) + } + + return report, nil +} + +func (m *MockTriggerService) signReport(report *datastreams.FeedReport) error { + sigData := append(crypto.Keccak256(report.FullReport), report.ReportContext...) + hash := crypto.Keccak256(sigData) + for n := 0; n < m.meta.MinRequiredSignatures; n++ { + sig, err := crypto.Sign(hash, m.signers[n]) if err != nil { - m.lggr.Errorw("failed to process Mock reports", "err", err, "timestamp", time.Now().Unix(), "payload", reports) + return err } + report.Signatures = append(report.Signatures, sig) } + return nil } diff --git a/core/capabilities/streams/mock_trigger_test.go b/core/capabilities/streams/mock_trigger_test.go index a051e703062..e8c45c5445d 100644 --- a/core/capabilities/streams/mock_trigger_test.go +++ b/core/capabilities/streams/mock_trigger_test.go @@ -118,10 +118,12 @@ func newSignedMockFeedReport(t *testing.T, svc *MockTriggerService, feedID trigg report := datastreams.FeedReport{ FeedID: string(feedID), - FullReport: newReport(svc.lggr, common.HexToHash(string(feedID)), big.NewInt(123456), timestamp), ReportContext: rawReportContext(reportCtx), ObservationTimestamp: timestamp, } + fullReport, err := newReport(svc.lggr, common.HexToHash(string(feedID)), big.NewInt(123456), timestamp) + require.NoError(t, err) + report.FullReport = fullReport sigData := append(crypto.Keccak256(report.FullReport), report.ReportContext...) hash := crypto.Keccak256(sigData) From 9dfec363c90dce12767f8f9f0036c06ee86480d8 Mon Sep 17 00:00:00 2001 From: cawthorne Date: Thu, 2 Apr 2026 16:19:18 +0100 Subject: [PATCH 07/12] refactor: harden mock trigger setup --- core/capabilities/streams/mock_trigger.go | 90 ++++++++++++------- .../capabilities/streams/mock_trigger_test.go | 6 +- core/services/chainlink/application.go | 1 - 3 files changed, 64 insertions(+), 33 deletions(-) diff --git a/core/capabilities/streams/mock_trigger.go b/core/capabilities/streams/mock_trigger.go index 6018cd7056e..ff4413f0629 100644 --- a/core/capabilities/streams/mock_trigger.go +++ b/core/capabilities/streams/mock_trigger.go @@ -27,8 +27,11 @@ import ( ) func RegisterMockTrigger(lggr logger.Logger, capRegistry core.CapabilitiesRegistry) (*MockTriggerService, error) { - ctx := context.TODO() - trigger := NewMockTriggerService(100, lggr) + ctx := context.Background() + trigger, err := NewMockTriggerService(100, lggr) + if err != nil { + return nil, err + } if err := trigger.Start(ctx); err != nil { return nil, err } @@ -60,10 +63,10 @@ type MockTriggerService struct { lggr logger.Logger } -func NewMockTriggerService(tickerResolutionMs int64, lggr logger.Logger) *MockTriggerService { +func NewMockTriggerService(tickerResolutionMs int64, lggr logger.Logger) (*MockTriggerService, error) { trigger, err := triggers.NewMercuryTriggerService(tickerResolutionMs, "mock-streams-trigger", "1.0.0", lggr) if err != nil { - panic(err) + return nil, err } trigger.CapabilityInfo = capInfo @@ -71,7 +74,10 @@ func NewMockTriggerService(tickerResolutionMs int64, lggr logger.Logger) *MockTr tickerResolutionMs = 1000 } - meta, signers := newMockMetadataAndSigners() + meta, signers, err := newMockMetadataAndSigners() + if err != nil { + return nil, err + } // MercuryTrigger is typically wrapped by other modules that ignore the trigger's meta and provide a different one. // Since we're skipping those wrappers we need to provide our own meta here. @@ -85,29 +91,36 @@ func NewMockTriggerService(tickerResolutionMs int64, lggr logger.Logger) *MockTr loopInterval: time.Duration(tickerResolutionMs) * time.Millisecond, subscribers: make(map[string][]streams.FeedId), lggr: lggr, - } + }, nil } -func newMockMetadataAndSigners() (datastreams.Metadata, []*ecdsa.PrivateKey) { +func newMockMetadataAndSigners() (datastreams.Metadata, []*ecdsa.PrivateKey, error) { f := 1 meta := datastreams.Metadata{MinRequiredSignatures: 2*f + 1} signers := make([]*ecdsa.PrivateKey, 0, meta.MinRequiredSignatures) for i := 0; i < meta.MinRequiredSignatures; i++ { - privKey := mustMockSigner(i + 1) + privKey, err := newMockSigner(i + 1) + if err != nil { + return datastreams.Metadata{}, nil, err + } signers = append(signers, privKey) meta.Signers = append(meta.Signers, crypto.PubkeyToAddress(privKey.PublicKey).Bytes()) } - return meta, signers + return meta, signers, nil } -func mustMockSigner(index int) *ecdsa.PrivateKey { +func newMockSigner(index int) (*ecdsa.PrivateKey, error) { bytes := make([]byte, 32) - bytes[31] = mustUint8(index) + lastByte, err := toUint8(index) + if err != nil { + return nil, err + } + bytes[31] = lastByte privKey, err := crypto.ToECDSA(bytes) if err != nil { - panic(err) + return nil, err } - return privKey + return privKey, nil } func (m *MockTriggerService) Start(ctx context.Context) error { @@ -131,7 +144,11 @@ func (m *MockTriggerService) RegisterTrigger(ctx context.Context, req capabiliti return nil, err } - config, _ := m.ValidateConfig(req.Config) + config, err := m.ValidateConfig(req.Config) + if err != nil { + _ = m.MercuryTriggerService.UnregisterTrigger(ctx, req) + return nil, err + } m.subscribersMu.Lock() defer m.subscribersMu.Unlock() m.subscribers[req.Metadata.WorkflowID] = config.FeedIds @@ -150,16 +167,24 @@ const baseTimestamp = 1000000000 // NOTE: duplicated from trigger_test.go func newReport(lggr logger.Logger, feedID [32]byte, price *big.Int, timestamp int64) ([]byte, error) { + uintTimestamp, err := toUint32(timestamp) + if err != nil { + return nil, err + } + expiresAt, err := toUint32(timestamp + 1000000) + if err != nil { + return nil, err + } v3Codec := reportcodec.NewReportCodec(feedID, lggr) raw, err := v3Codec.BuildReport(context.Background(), v3.ReportFields{ BenchmarkPrice: price, - Timestamp: mustUint32(timestamp), - ValidFromTimestamp: mustUint32(timestamp), + Timestamp: uintTimestamp, + ValidFromTimestamp: uintTimestamp, Bid: price, Ask: price, LinkFee: price, NativeFee: price, - ExpiresAt: mustUint32(timestamp + 1000000), + ExpiresAt: expiresAt, }) if err != nil { return nil, err @@ -167,18 +192,18 @@ func newReport(lggr logger.Logger, feedID [32]byte, price *big.Int, timestamp in return raw, nil } -func mustUint32(v int64) uint32 { +func toUint32(v int64) (uint32, error) { if v < 0 || v > int64(^uint32(0)) { - panic("timestamp out of uint32 range") + return 0, fmt.Errorf("value %d out of uint32 range", v) } - return uint32(v) + return uint32(v), nil } -func mustUint8(v int) uint8 { +func toUint8(v int) (uint8, error) { if v < 0 || v > int(^uint8(0)) { - panic("value out of uint8 range") + return 0, fmt.Errorf("value %d out of uint8 range", v) } - return uint8(v) + return uint8(v), nil } func rawReportContext(reportCtx ocrTypes.ReportContext) []byte { @@ -211,7 +236,11 @@ func (m *MockTriggerService) loop() { j++ timestamp := time.Now().Unix() - reportCtx := newReportContext(j) + reportCtx, err := newReportContext(j) + if err != nil { + m.lggr.Errorw("failed to build Mock report context", "err", err, "timestamp", timestamp) + continue + } reports, err := m.buildReports(timestamp, prices[0], reportCtx) if err != nil { m.lggr.Errorw("failed to build Mock reports", "err", err, "timestamp", timestamp) @@ -234,13 +263,14 @@ func incrementPrices(prices []int64) { } } -func newReportContext(iteration int) ocrTypes.ReportContext { - // We bump the epoch because this mock trigger is generating synthetic OCR rounds. - return ocrTypes.ReportContext{ - ReportTimestamp: ocrTypes.ReportTimestamp{ - Epoch: mustUint32(int64(baseTimestamp + iteration)), - }, +func newReportContext(iteration int) (ocrTypes.ReportContext, error) { + epoch, err := toUint32(int64(baseTimestamp + iteration)) + if err != nil { + return ocrTypes.ReportContext{}, err } + return ocrTypes.ReportContext{ + ReportTimestamp: ocrTypes.ReportTimestamp{Epoch: epoch}, + }, nil } func (m *MockTriggerService) buildReports(timestamp, price int64, reportCtx ocrTypes.ReportContext) ([]datastreams.FeedReport, error) { diff --git a/core/capabilities/streams/mock_trigger_test.go b/core/capabilities/streams/mock_trigger_test.go index e8c45c5445d..1b5d9376751 100644 --- a/core/capabilities/streams/mock_trigger_test.go +++ b/core/capabilities/streams/mock_trigger_test.go @@ -25,7 +25,8 @@ const testFeedID triggercfg.FeedId = "0x0000000000000000000000000000000000000000 func TestMockTriggerServiceStartClose(t *testing.T) { t.Parallel() - svc := NewMockTriggerService(10, logger.TestLogger(t)) + svc, err := NewMockTriggerService(10, logger.TestLogger(t)) + require.NoError(t, err) require.NoError(t, svc.Start(context.Background())) done := make(chan error, 1) @@ -45,7 +46,8 @@ func TestMockTriggerServiceRegisterEmitUnregister(t *testing.T) { t.Parallel() ctx := context.Background() - svc := NewMockTriggerService(200, logger.TestLogger(t)) + svc, err := NewMockTriggerService(200, logger.TestLogger(t)) + require.NoError(t, err) require.NoError(t, svc.MercuryTriggerService.Start(ctx)) t.Cleanup(func() { require.NoError(t, svc.MercuryTriggerService.Close()) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index d7f1b6b2441..afd7aca6a1b 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -298,7 +298,6 @@ func NewApplication(ctx context.Context, opts ApplicationOpts) (Application, err if shouldRegisterMockStreamsTrigger(cfg.Capabilities().Local()) { // Register the mock streams trigger only for test environments that explicitly opt in. - // TODO: proper component shutdown via srvcs(). _, err = capStreams.RegisterMockTrigger(globalLogger, opts.CapabilitiesRegistry) if err != nil { return nil, err From f271b9cceb2be79efabee92e76a53c48adf5cd70 Mon Sep 17 00:00:00 2001 From: cawthorne Date: Thu, 2 Apr 2026 16:49:52 +0100 Subject: [PATCH 08/12] refactor: reduce mock trigger code smells --- core/capabilities/streams/mock_trigger.go | 73 ++++++++++++++----- .../application_mock_trigger_test.go | 42 ++++++++--- 2 files changed, 84 insertions(+), 31 deletions(-) diff --git a/core/capabilities/streams/mock_trigger.go b/core/capabilities/streams/mock_trigger.go index ff4413f0629..7778a5d638c 100644 --- a/core/capabilities/streams/mock_trigger.go +++ b/core/capabilities/streams/mock_trigger.go @@ -4,7 +4,6 @@ import ( "context" "crypto/ecdsa" "fmt" - "maps" "math/big" "sync" "time" @@ -26,9 +25,23 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" ) +const ( + mockTriggerCapabilityName = "mock-streams-trigger" + mockTriggerCapabilityVersion = "1.0.0" + mockTriggerRegisterResolution = 100 + defaultLoopIntervalMs int64 = 1000 + mockSignerFaultTolerance = 1 + mockSignerKeyLength = 32 + mockSignerKeyLastByteIndex = mockSignerKeyLength - 1 + reportExpiryOffsetSeconds = 1_000_000 + initialMockPriceA int64 = 300_000 + initialMockPriceB int64 = 40_000 + initialMockPriceC int64 = 5_000_000 +) + func RegisterMockTrigger(lggr logger.Logger, capRegistry core.CapabilitiesRegistry) (*MockTriggerService, error) { ctx := context.Background() - trigger, err := NewMockTriggerService(100, lggr) + trigger, err := NewMockTriggerService(mockTriggerRegisterResolution, lggr) if err != nil { return nil, err } @@ -36,13 +49,14 @@ func RegisterMockTrigger(lggr logger.Logger, capRegistry core.CapabilitiesRegist return nil, err } if err := capRegistry.Add(ctx, trigger); err != nil { + _ = trigger.Close() return nil, err } return trigger, nil } -const MockTriggerCapabilityID = "mock-streams-trigger@1.0.0" +const MockTriggerCapabilityID = mockTriggerCapabilityName + "@" + mockTriggerCapabilityVersion var capInfo = capabilities.MustNewCapabilityInfo( MockTriggerCapabilityID, @@ -56,6 +70,7 @@ type MockTriggerService struct { meta datastreams.Metadata signers []*ecdsa.PrivateKey stopCh services.StopChan + closeOnce sync.Once wg sync.WaitGroup loopInterval time.Duration subscribers map[string][]streams.FeedId @@ -64,14 +79,14 @@ type MockTriggerService struct { } func NewMockTriggerService(tickerResolutionMs int64, lggr logger.Logger) (*MockTriggerService, error) { - trigger, err := triggers.NewMercuryTriggerService(tickerResolutionMs, "mock-streams-trigger", "1.0.0", lggr) + trigger, err := triggers.NewMercuryTriggerService(tickerResolutionMs, mockTriggerCapabilityName, mockTriggerCapabilityVersion, lggr) if err != nil { return nil, err } trigger.CapabilityInfo = capInfo - if tickerResolutionMs == 0 { - tickerResolutionMs = 1000 + if tickerResolutionMs <= 0 { + tickerResolutionMs = defaultLoopIntervalMs } meta, signers, err := newMockMetadataAndSigners() @@ -95,8 +110,7 @@ func NewMockTriggerService(tickerResolutionMs int64, lggr logger.Logger) (*MockT } func newMockMetadataAndSigners() (datastreams.Metadata, []*ecdsa.PrivateKey, error) { - f := 1 - meta := datastreams.Metadata{MinRequiredSignatures: 2*f + 1} + meta := datastreams.Metadata{MinRequiredSignatures: 2*mockSignerFaultTolerance + 1} signers := make([]*ecdsa.PrivateKey, 0, meta.MinRequiredSignatures) for i := 0; i < meta.MinRequiredSignatures; i++ { privKey, err := newMockSigner(i + 1) @@ -110,12 +124,12 @@ func newMockMetadataAndSigners() (datastreams.Metadata, []*ecdsa.PrivateKey, err } func newMockSigner(index int) (*ecdsa.PrivateKey, error) { - bytes := make([]byte, 32) + bytes := make([]byte, mockSignerKeyLength) lastByte, err := toUint8(index) if err != nil { return nil, err } - bytes[31] = lastByte + bytes[mockSignerKeyLastByteIndex] = lastByte privKey, err := crypto.ToECDSA(bytes) if err != nil { return nil, err @@ -133,7 +147,9 @@ func (m *MockTriggerService) Start(ctx context.Context) error { } func (m *MockTriggerService) Close() error { - close(m.stopCh) + m.closeOnce.Do(func() { + close(m.stopCh) + }) m.wg.Wait() return m.MercuryTriggerService.Close() } @@ -171,7 +187,7 @@ func newReport(lggr logger.Logger, feedID [32]byte, price *big.Int, timestamp in if err != nil { return nil, err } - expiresAt, err := toUint32(timestamp + 1000000) + expiresAt, err := toUint32(timestamp + reportExpiryOffsetSeconds) if err != nil { return nil, err } @@ -221,9 +237,8 @@ func (m *MockTriggerService) loop() { ticker := time.NewTicker(m.loopInterval) defer ticker.Stop() - prices := []int64{300000, 40000, 5000000} - - j := 0 + prices := []int64{initialMockPriceA, initialMockPriceB, initialMockPriceC} + iteration := 0 for { select { @@ -233,10 +248,10 @@ func (m *MockTriggerService) loop() { } incrementPrices(prices) - j++ + iteration++ timestamp := time.Now().Unix() - reportCtx, err := newReportContext(j) + reportCtx, err := newReportContext(iteration) if err != nil { m.lggr.Errorw("failed to build Mock report context", "err", err, "timestamp", timestamp) continue @@ -274,8 +289,9 @@ func newReportContext(iteration int) (ocrTypes.ReportContext, error) { } func (m *MockTriggerService) buildReports(timestamp, price int64, reportCtx ocrTypes.ReportContext) ([]datastreams.FeedReport, error) { - reports := make([]datastreams.FeedReport, 0) - for _, feedIDs := range m.snapshotSubscribers() { + subscribers := m.snapshotSubscribers() + reports := make([]datastreams.FeedReport, 0, subscriberCount(subscribers)) + for _, feedIDs := range subscribers { for _, feedID := range feedIDs { report, err := m.newSignedReport(string(feedID), price, timestamp, reportCtx) if err != nil { @@ -292,10 +308,26 @@ func (m *MockTriggerService) snapshotSubscribers() map[string][]streams.FeedId { defer m.subscribersMu.Unlock() snapshot := make(map[string][]streams.FeedId, len(m.subscribers)) - maps.Copy(snapshot, m.subscribers) + for workflowID, feedIDs := range m.subscribers { + snapshot[workflowID] = cloneFeedIDs(feedIDs) + } return snapshot } +func subscriberCount(subscribers map[string][]streams.FeedId) int { + total := 0 + for _, feedIDs := range subscribers { + total += len(feedIDs) + } + return total +} + +func cloneFeedIDs(feedIDs []streams.FeedId) []streams.FeedId { + cloned := make([]streams.FeedId, len(feedIDs)) + copy(cloned, feedIDs) + return cloned +} + func (m *MockTriggerService) newSignedReport(feedID string, price, timestamp int64, reportCtx ocrTypes.ReportContext) (datastreams.FeedReport, error) { fullReport, err := newReport(m.lggr, common.HexToHash(feedID), big.NewInt(price), timestamp) if err != nil { @@ -319,6 +351,7 @@ func (m *MockTriggerService) newSignedReport(feedID string, price, timestamp int func (m *MockTriggerService) signReport(report *datastreams.FeedReport) error { sigData := append(crypto.Keccak256(report.FullReport), report.ReportContext...) hash := crypto.Keccak256(sigData) + report.Signatures = make([][]byte, 0, m.meta.MinRequiredSignatures) for n := 0; n < m.meta.MinRequiredSignatures; n++ { sig, err := crypto.Sign(hash, m.signers[n]) if err != nil { diff --git a/core/services/chainlink/application_mock_trigger_test.go b/core/services/chainlink/application_mock_trigger_test.go index d6df18e9173..2baa656a128 100644 --- a/core/services/chainlink/application_mock_trigger_test.go +++ b/core/services/chainlink/application_mock_trigger_test.go @@ -45,17 +45,37 @@ func (stubCapabilityNodeConfig) Config() map[string]string { func TestShouldRegisterMockStreamsTrigger(t *testing.T) { t.Parallel() - require.False(t, shouldRegisterMockStreamsTrigger(nil)) - - require.False(t, shouldRegisterMockStreamsTrigger(stubLocalCapabilities{ - cfgs: map[string]config.CapabilityNodeConfig{ - "cron@1.0.0": stubCapabilityNodeConfig{}, + tests := []struct { + name string + local config.LocalCapabilities + want bool + }{ + {name: "nil config", local: nil, want: false}, + { + name: "different local capability", + local: stubLocalCapabilities{ + cfgs: map[string]config.CapabilityNodeConfig{ + "cron@1.0.0": stubCapabilityNodeConfig{}, + }, + }, + want: false, }, - })) - - require.True(t, shouldRegisterMockStreamsTrigger(stubLocalCapabilities{ - cfgs: map[string]config.CapabilityNodeConfig{ - capStreams.MockTriggerCapabilityID: stubCapabilityNodeConfig{}, + { + name: "mock trigger opted in", + local: stubLocalCapabilities{ + cfgs: map[string]config.CapabilityNodeConfig{ + capStreams.MockTriggerCapabilityID: stubCapabilityNodeConfig{}, + }, + }, + want: true, }, - })) + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + require.Equal(t, tt.want, shouldRegisterMockStreamsTrigger(tt.local)) + }) + } } From 9dd0c0de753373127b9ef6777b54e04b9f87e99f Mon Sep 17 00:00:00 2001 From: cawthorne Date: Thu, 2 Apr 2026 16:56:02 +0100 Subject: [PATCH 09/12] fix: satisfy copyloopvar in mock trigger test --- core/services/chainlink/application_mock_trigger_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/services/chainlink/application_mock_trigger_test.go b/core/services/chainlink/application_mock_trigger_test.go index 2baa656a128..907637e4893 100644 --- a/core/services/chainlink/application_mock_trigger_test.go +++ b/core/services/chainlink/application_mock_trigger_test.go @@ -72,7 +72,6 @@ func TestShouldRegisterMockStreamsTrigger(t *testing.T) { } for _, tt := range tests { - tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() require.Equal(t, tt.want, shouldRegisterMockStreamsTrigger(tt.local)) From 172f3bbab8765026151c7da4161d0dcd0076559b Mon Sep 17 00:00:00 2001 From: cawthorne Date: Thu, 2 Apr 2026 17:30:33 +0100 Subject: [PATCH 10/12] refactor: move mock trigger opt-in out of application --- core/capabilities/registry.go | 27 ++++--- core/services/chainlink/application.go | 13 --- .../application_mock_trigger_test.go | 80 ------------------- .../services/standardcapabilities/delegate.go | 15 ++++ .../local_capabilities.go | 22 +++++ .../local_capabilities_test.go | 80 +++++++++++++++++++ 6 files changed, 135 insertions(+), 102 deletions(-) delete mode 100644 core/services/chainlink/application_mock_trigger_test.go create mode 100644 core/services/standardcapabilities/local_capabilities.go create mode 100644 core/services/standardcapabilities/local_capabilities_test.go diff --git a/core/capabilities/registry.go b/core/capabilities/registry.go index 1c95ed0aae4..7429fd9a23a 100644 --- a/core/capabilities/registry.go +++ b/core/capabilities/registry.go @@ -85,23 +85,32 @@ type TestMetadataRegistry struct { core.UnimplementedCapabilitiesRegistryMetadata } +const ( + testWorkflowDONID = 1 + testWorkflowDONConfigVersion = 1 + testWorkflowDONFaultTolerance = 1 +) + func (t *TestMetadataRegistry) LocalNode(ctx context.Context) (capabilities.Node, error) { peerID := p2ptypes.PeerID{} - workflowDON := capabilities.DON{ - ID: 1, - ConfigVersion: 1, + return capabilities.Node{ + PeerID: &peerID, + WorkflowDON: newTestWorkflowDON(peerID), + CapabilityDONs: []capabilities.DON{}, + }, nil +} + +func newTestWorkflowDON(peerID p2ptypes.PeerID) capabilities.DON { + return capabilities.DON{ + ID: testWorkflowDONID, + ConfigVersion: testWorkflowDONConfigVersion, Members: []p2ptypes.PeerID{ peerID, }, - F: 1, + F: testWorkflowDONFaultTolerance, IsPublic: false, AcceptsWorkflows: true, } - return capabilities.Node{ - PeerID: &peerID, - WorkflowDON: workflowDON, - CapabilityDONs: []capabilities.DON{}, - }, nil } func (t *TestMetadataRegistry) NodeByPeerID(ctx context.Context, _ p2ptypes.PeerID) (capabilities.Node, error) { diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index afd7aca6a1b..4803a14faa1 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -56,7 +56,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/build" "github.com/smartcontractkit/chainlink/v2/core/capabilities" "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip" - capStreams "github.com/smartcontractkit/chainlink/v2/core/capabilities/streams" "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/logger/audit" @@ -216,10 +215,6 @@ type ApplicationOpts struct { DonTimeStore *dontime.Store } -func shouldRegisterMockStreamsTrigger(local config.LocalCapabilities) bool { - return local != nil && local.GetCapabilityConfig(capStreams.MockTriggerCapabilityID) != nil -} - // NewApplication initializes a new store if one is not already // present at the configured root directory (default: ~/.chainlink), // the logger at the same directory and returns the Application to @@ -296,14 +291,6 @@ func NewApplication(ctx context.Context, opts ApplicationOpts) (Application, err loopRegistry := plugins.NewLoopRegistry(globalLogger, cfg.AppID().String(), cfg.Feature().LogPoller(), cfg.Database(), cfg.Mercury(), cfg.Tracing(), cfg.Telemetry(), beholderAuthHeaders, csaPubKeyHex, cfg.LOOPP()) - if shouldRegisterMockStreamsTrigger(cfg.Capabilities().Local()) { - // Register the mock streams trigger only for test environments that explicitly opt in. - _, err = capStreams.RegisterMockTrigger(globalLogger, opts.CapabilitiesRegistry) - if err != nil { - return nil, err - } - } - relayerFactory := RelayerFactory{ Logger: opts.Logger, Registerer: opts.Registerer, diff --git a/core/services/chainlink/application_mock_trigger_test.go b/core/services/chainlink/application_mock_trigger_test.go deleted file mode 100644 index 907637e4893..00000000000 --- a/core/services/chainlink/application_mock_trigger_test.go +++ /dev/null @@ -1,80 +0,0 @@ -package chainlink - -import ( - "testing" - - "github.com/stretchr/testify/require" - - capStreams "github.com/smartcontractkit/chainlink/v2/core/capabilities/streams" - "github.com/smartcontractkit/chainlink/v2/core/config" -) - -type stubLocalCapabilities struct { - cfgs map[string]config.CapabilityNodeConfig -} - -func (s stubLocalCapabilities) RegistryBasedLaunchAllowlist() []string { - return nil -} - -func (s stubLocalCapabilities) Capabilities() map[string]config.CapabilityNodeConfig { - return s.cfgs -} - -func (s stubLocalCapabilities) IsAllowlisted(string) bool { - return false -} - -func (s stubLocalCapabilities) GetCapabilityConfig(capabilityID string) config.CapabilityNodeConfig { - if s.cfgs == nil { - return nil - } - return s.cfgs[capabilityID] -} - -type stubCapabilityNodeConfig struct{} - -func (stubCapabilityNodeConfig) BinaryPathOverride() string { - return "" -} - -func (stubCapabilityNodeConfig) Config() map[string]string { - return nil -} - -func TestShouldRegisterMockStreamsTrigger(t *testing.T) { - t.Parallel() - - tests := []struct { - name string - local config.LocalCapabilities - want bool - }{ - {name: "nil config", local: nil, want: false}, - { - name: "different local capability", - local: stubLocalCapabilities{ - cfgs: map[string]config.CapabilityNodeConfig{ - "cron@1.0.0": stubCapabilityNodeConfig{}, - }, - }, - want: false, - }, - { - name: "mock trigger opted in", - local: stubLocalCapabilities{ - cfgs: map[string]config.CapabilityNodeConfig{ - capStreams.MockTriggerCapabilityID: stubCapabilityNodeConfig{}, - }, - }, - want: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - require.Equal(t, tt.want, shouldRegisterMockStreamsTrigger(tt.local)) - }) - } -} diff --git a/core/services/standardcapabilities/delegate.go b/core/services/standardcapabilities/delegate.go index 7a59fe9e12e..6efab18b06f 100644 --- a/core/services/standardcapabilities/delegate.go +++ b/core/services/standardcapabilities/delegate.go @@ -66,6 +66,7 @@ type Delegate struct { creSettings core.SettingsBroadcaster ocrConfigService capregconfig.OCRConfigService localCfg coreconfig.LocalCapabilities + initErr error isNewlyCreatedJob bool } @@ -99,6 +100,11 @@ func NewDelegate( localCfg coreconfig.LocalCapabilities, opts ...func(*gateway.RoundRobinSelector), ) *Delegate { + initErr := registerOptionalMockStreamsTrigger(logger, localCfg, registry) + if initErr != nil { + logger.Errorw("Failed to register optional mock streams trigger", "err", initErr) + } + return &Delegate{ logger: logger, ds: ds, @@ -119,6 +125,7 @@ func NewDelegate( creSettings: creSettings, ocrConfigService: ocrConfigService, localCfg: localCfg, + initErr: initErr, selectorOpts: opts, } } @@ -133,6 +140,10 @@ func (d *Delegate) BeforeJobCreated(job job.Job) { } func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.ServiceCtx, error) { + if d.initErr != nil { + return nil, d.initErr + } + command := spec.StandardCapabilitiesSpec.Command configJSON := spec.StandardCapabilitiesSpec.Config @@ -159,6 +170,10 @@ func (d *Delegate) NewServices( externalJobID uuid.UUID, oracleFactoryConfig job.OracleFactoryConfig, ) ([]job.ServiceCtx, error) { + if d.initErr != nil { + return nil, d.initErr + } + log := d.logger.Named("StandardCapabilities").Named(strconv.Itoa(int(jobID))).Named(jobName) kvStore := job.NewKVStore(jobID, d.ds) diff --git a/core/services/standardcapabilities/local_capabilities.go b/core/services/standardcapabilities/local_capabilities.go new file mode 100644 index 00000000000..a8b9aad85c8 --- /dev/null +++ b/core/services/standardcapabilities/local_capabilities.go @@ -0,0 +1,22 @@ +package standardcapabilities + +import ( + "github.com/smartcontractkit/chainlink-common/pkg/types/core" + + capStreams "github.com/smartcontractkit/chainlink/v2/core/capabilities/streams" + coreconfig "github.com/smartcontractkit/chainlink/v2/core/config" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +func shouldRegisterMockStreamsTrigger(localCfg coreconfig.LocalCapabilities) bool { + return localCfg != nil && localCfg.GetCapabilityConfig(capStreams.MockTriggerCapabilityID) != nil +} + +func registerOptionalMockStreamsTrigger(lggr logger.Logger, localCfg coreconfig.LocalCapabilities, registry core.CapabilitiesRegistry) error { + if !shouldRegisterMockStreamsTrigger(localCfg) { + return nil + } + + _, err := capStreams.RegisterMockTrigger(lggr, registry) + return err +} diff --git a/core/services/standardcapabilities/local_capabilities_test.go b/core/services/standardcapabilities/local_capabilities_test.go new file mode 100644 index 00000000000..c3a6cc0f61c --- /dev/null +++ b/core/services/standardcapabilities/local_capabilities_test.go @@ -0,0 +1,80 @@ +package standardcapabilities + +import ( + "testing" + + "github.com/stretchr/testify/require" + + capStreams "github.com/smartcontractkit/chainlink/v2/core/capabilities/streams" + "github.com/smartcontractkit/chainlink/v2/core/config" +) + +type mockTriggerLocalCapabilities struct { + cfgs map[string]config.CapabilityNodeConfig +} + +func (s mockTriggerLocalCapabilities) RegistryBasedLaunchAllowlist() []string { + return nil +} + +func (s mockTriggerLocalCapabilities) Capabilities() map[string]config.CapabilityNodeConfig { + return s.cfgs +} + +func (s mockTriggerLocalCapabilities) IsAllowlisted(string) bool { + return false +} + +func (s mockTriggerLocalCapabilities) GetCapabilityConfig(capabilityID string) config.CapabilityNodeConfig { + if s.cfgs == nil { + return nil + } + return s.cfgs[capabilityID] +} + +type mockTriggerCapabilityNodeConfig struct{} + +func (mockTriggerCapabilityNodeConfig) BinaryPathOverride() string { + return "" +} + +func (mockTriggerCapabilityNodeConfig) Config() map[string]string { + return nil +} + +func TestShouldRegisterMockStreamsTrigger(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + localCfg config.LocalCapabilities + want bool + }{ + {name: "nil config", localCfg: nil, want: false}, + { + name: "different local capability", + localCfg: mockTriggerLocalCapabilities{ + cfgs: map[string]config.CapabilityNodeConfig{ + "cron@1.0.0": mockTriggerCapabilityNodeConfig{}, + }, + }, + want: false, + }, + { + name: "mock trigger opted in", + localCfg: mockTriggerLocalCapabilities{ + cfgs: map[string]config.CapabilityNodeConfig{ + capStreams.MockTriggerCapabilityID: mockTriggerCapabilityNodeConfig{}, + }, + }, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + require.Equal(t, tt.want, shouldRegisterMockStreamsTrigger(tt.localCfg)) + }) + } +} From 4b05e289ba8c4b9b5233b23d101f580a6c956bba Mon Sep 17 00:00:00 2001 From: cawthorne Date: Thu, 2 Apr 2026 18:01:17 +0100 Subject: [PATCH 11/12] fix: scope local CRE workflow DON fault tolerance --- core/capabilities/registry.go | 12 ++--- core/capabilities/registry_test.go | 39 +++++++++++++++ core/services/cre/cre.go | 12 ++++- core/services/cre/cre_test.go | 78 ++++++++++++++++++++++++++++++ 4 files changed, 134 insertions(+), 7 deletions(-) create mode 100644 core/capabilities/registry_test.go create mode 100644 core/services/cre/cre_test.go diff --git a/core/capabilities/registry.go b/core/capabilities/registry.go index 7429fd9a23a..daa34cd8bc6 100644 --- a/core/capabilities/registry.go +++ b/core/capabilities/registry.go @@ -83,31 +83,31 @@ func NewRegistry(lggr logger.Logger) *Registry { // interface. It is used when ExternalCapabilitiesRegistry is not available. type TestMetadataRegistry struct { core.UnimplementedCapabilitiesRegistryMetadata + WorkflowDONF uint8 } const ( - testWorkflowDONID = 1 - testWorkflowDONConfigVersion = 1 - testWorkflowDONFaultTolerance = 1 + testWorkflowDONID = 1 + testWorkflowDONConfigVersion = 1 ) func (t *TestMetadataRegistry) LocalNode(ctx context.Context) (capabilities.Node, error) { peerID := p2ptypes.PeerID{} return capabilities.Node{ PeerID: &peerID, - WorkflowDON: newTestWorkflowDON(peerID), + WorkflowDON: newTestWorkflowDON(peerID, t.WorkflowDONF), CapabilityDONs: []capabilities.DON{}, }, nil } -func newTestWorkflowDON(peerID p2ptypes.PeerID) capabilities.DON { +func newTestWorkflowDON(peerID p2ptypes.PeerID, faultTolerance uint8) capabilities.DON { return capabilities.DON{ ID: testWorkflowDONID, ConfigVersion: testWorkflowDONConfigVersion, Members: []p2ptypes.PeerID{ peerID, }, - F: testWorkflowDONFaultTolerance, + F: faultTolerance, IsPublic: false, AcceptsWorkflows: true, } diff --git a/core/capabilities/registry_test.go b/core/capabilities/registry_test.go new file mode 100644 index 00000000000..3dfa7b9f547 --- /dev/null +++ b/core/capabilities/registry_test.go @@ -0,0 +1,39 @@ +package capabilities + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestTestMetadataRegistry_LocalNode_UsesConfiguredWorkflowDONF(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + registry TestMetadataRegistry + expectedDonF uint8 + }{ + { + name: "default workflow DON fault tolerance", + registry: TestMetadataRegistry{}, + expectedDonF: 0, + }, + { + name: "mock trigger workflow DON fault tolerance", + registry: TestMetadataRegistry{WorkflowDONF: 1}, + expectedDonF: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + node, err := tt.registry.LocalNode(context.Background()) + require.NoError(t, err) + require.Equal(t, tt.expectedDonF, node.WorkflowDON.F) + }) + } +} diff --git a/core/services/cre/cre.go b/core/services/cre/cre.go index baf87584cc8..9dd8382ef82 100644 --- a/core/services/cre/cre.go +++ b/core/services/cre/cre.go @@ -36,6 +36,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/capabilities/localcapmgr" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + capStreams "github.com/smartcontractkit/chainlink/v2/core/capabilities/streams" "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" @@ -488,7 +489,7 @@ func (w *dispatcherWrapper) newSubservices( capCfg := cfg.Capabilities() if !capCfg.Peering().Enabled() && !capCfg.SharedPeering().Enabled() { - opts.CapabilitiesRegistry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) + opts.CapabilitiesRegistry.SetLocalRegistry(newLocalTestMetadataRegistry(capCfg.Local())) return nil, nil } @@ -533,6 +534,15 @@ func (w *dispatcherWrapper) newSubservices( return subs, nil } +func newLocalTestMetadataRegistry(localCfg config.LocalCapabilities) *capabilities.TestMetadataRegistry { + registry := &capabilities.TestMetadataRegistry{} + if localCfg != nil && localCfg.GetCapabilityConfig(capStreams.MockTriggerCapabilityID) != nil { + registry.WorkflowDONF = 1 + } + + return registry +} + // newDispatcherWrapper creates a new dispatcherWrapper service with peer wrappers if peering is enabled func newDispatcherWrapper( cfg Config, diff --git a/core/services/cre/cre_test.go b/core/services/cre/cre_test.go new file mode 100644 index 00000000000..acb787fe3c4 --- /dev/null +++ b/core/services/cre/cre_test.go @@ -0,0 +1,78 @@ +package cre + +import ( + "testing" + + "github.com/stretchr/testify/require" + + capStreams "github.com/smartcontractkit/chainlink/v2/core/capabilities/streams" + "github.com/smartcontractkit/chainlink/v2/core/config" +) + +type testLocalCapabilities struct { + cfgs map[string]config.CapabilityNodeConfig +} + +func (t testLocalCapabilities) RegistryBasedLaunchAllowlist() []string { + return nil +} + +func (t testLocalCapabilities) Capabilities() map[string]config.CapabilityNodeConfig { + return t.cfgs +} + +func (t testLocalCapabilities) IsAllowlisted(string) bool { + return false +} + +func (t testLocalCapabilities) GetCapabilityConfig(capabilityID string) config.CapabilityNodeConfig { + if t.cfgs == nil { + return nil + } + + return t.cfgs[capabilityID] +} + +type testCapabilityNodeConfig struct{} + +func (testCapabilityNodeConfig) BinaryPathOverride() string { + return "" +} + +func (testCapabilityNodeConfig) Config() map[string]string { + return nil +} + +func TestNewLocalTestMetadataRegistry(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + localCfg config.LocalCapabilities + expectedF uint8 + }{ + { + name: "default workflow DON fault tolerance", + localCfg: nil, + expectedF: 0, + }, + { + name: "mock trigger opt-in uses workflow DON fault tolerance one", + localCfg: testLocalCapabilities{ + cfgs: map[string]config.CapabilityNodeConfig{ + capStreams.MockTriggerCapabilityID: testCapabilityNodeConfig{}, + }, + }, + expectedF: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + registry := newLocalTestMetadataRegistry(tt.localCfg) + require.Equal(t, tt.expectedF, registry.WorkflowDONF) + }) + } +} From 465d17c6c8067aabac0ca37afeb86b781e52cced Mon Sep 17 00:00:00 2001 From: cawthorne Date: Thu, 2 Apr 2026 20:33:02 +0100 Subject: [PATCH 12/12] test: avoid duplicate signer allowlist tx --- system-tests/tests/test-helpers/before_suite.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/system-tests/tests/test-helpers/before_suite.go b/system-tests/tests/test-helpers/before_suite.go index 8cc18a6562d..d65815a48c5 100644 --- a/system-tests/tests/test-helpers/before_suite.go +++ b/system-tests/tests/test-helpers/before_suite.go @@ -277,6 +277,12 @@ func authorizePerTestWorkflowSignerIfNeeded(t *testing.T, sharedEnv *ttypes.Test workflowSignerAuthM.Lock() defer workflowSignerAuthM.Unlock() + allowed, err = registry.IsAllowedSigner(rootRegistryChain.SethClient.NewCallOpts(), signer) + require.NoError(t, err, "failed to re-check signer allowlist status") + if allowed { + return + } + _, err = rootRegistryChain.SethClient.Decode(registry.UpdateAllowedSigners(rootRegistryChain.SethClient.NewTXOpts(), []common.Address{signer}, true)) require.NoError(t, err, "failed to authorize per-test signer") }