diff --git a/core/capabilities/registry.go b/core/capabilities/registry.go index 830c201182e..daa34cd8bc6 100644 --- a/core/capabilities/registry.go +++ b/core/capabilities/registry.go @@ -83,25 +83,34 @@ func NewRegistry(lggr logger.Logger) *Registry { // interface. It is used when ExternalCapabilitiesRegistry is not available. type TestMetadataRegistry struct { core.UnimplementedCapabilitiesRegistryMetadata + WorkflowDONF uint8 } +const ( + testWorkflowDONID = 1 + testWorkflowDONConfigVersion = 1 +) + func (t *TestMetadataRegistry) LocalNode(ctx context.Context) (capabilities.Node, error) { peerID := p2ptypes.PeerID{} - workflowDON := capabilities.DON{ - ID: 1, - ConfigVersion: 1, + return capabilities.Node{ + PeerID: &peerID, + WorkflowDON: newTestWorkflowDON(peerID, t.WorkflowDONF), + CapabilityDONs: []capabilities.DON{}, + }, nil +} + +func newTestWorkflowDON(peerID p2ptypes.PeerID, faultTolerance uint8) capabilities.DON { + return capabilities.DON{ + ID: testWorkflowDONID, + ConfigVersion: testWorkflowDONConfigVersion, Members: []p2ptypes.PeerID{ peerID, }, - F: 0, + F: faultTolerance, IsPublic: false, AcceptsWorkflows: true, } - return capabilities.Node{ - PeerID: &peerID, - WorkflowDON: workflowDON, - CapabilityDONs: []capabilities.DON{}, - }, nil } func (t *TestMetadataRegistry) NodeByPeerID(ctx context.Context, _ p2ptypes.PeerID) (capabilities.Node, error) { diff --git a/core/capabilities/registry_test.go b/core/capabilities/registry_test.go new file mode 100644 index 00000000000..3dfa7b9f547 --- /dev/null +++ b/core/capabilities/registry_test.go @@ -0,0 +1,39 @@ +package capabilities + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestTestMetadataRegistry_LocalNode_UsesConfiguredWorkflowDONF(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + registry TestMetadataRegistry + expectedDonF uint8 + }{ + { + name: "default workflow DON fault tolerance", + registry: TestMetadataRegistry{}, + expectedDonF: 0, + }, + { + name: "mock trigger workflow DON fault tolerance", + registry: TestMetadataRegistry{WorkflowDONF: 1}, + expectedDonF: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + node, err := tt.registry.LocalNode(context.Background()) + require.NoError(t, err) + require.Equal(t, tt.expectedDonF, node.WorkflowDON.F) + }) + } +} diff --git a/core/capabilities/streams/mock_trigger.go b/core/capabilities/streams/mock_trigger.go new file mode 100644 index 00000000000..7778a5d638c --- /dev/null +++ b/core/capabilities/streams/mock_trigger.go @@ -0,0 +1,363 @@ +package streams + +import ( + "context" + "crypto/ecdsa" + "fmt" + "math/big" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil" + ocrTypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/datastreams" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers/streams" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" + v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3" + + "github.com/smartcontractkit/chainlink-evm/pkg/mercury/v3/reportcodec" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +const ( + mockTriggerCapabilityName = "mock-streams-trigger" + mockTriggerCapabilityVersion = "1.0.0" + mockTriggerRegisterResolution = 100 + defaultLoopIntervalMs int64 = 1000 + mockSignerFaultTolerance = 1 + mockSignerKeyLength = 32 + mockSignerKeyLastByteIndex = mockSignerKeyLength - 1 + reportExpiryOffsetSeconds = 1_000_000 + initialMockPriceA int64 = 300_000 + initialMockPriceB int64 = 40_000 + initialMockPriceC int64 = 5_000_000 +) + +func RegisterMockTrigger(lggr logger.Logger, capRegistry core.CapabilitiesRegistry) (*MockTriggerService, error) { + ctx := context.Background() + trigger, err := NewMockTriggerService(mockTriggerRegisterResolution, lggr) + if err != nil { + return nil, err + } + if err := trigger.Start(ctx); err != nil { + return nil, err + } + if err := capRegistry.Add(ctx, trigger); err != nil { + _ = trigger.Close() + return nil, err + } + + return trigger, nil +} + +const MockTriggerCapabilityID = mockTriggerCapabilityName + "@" + mockTriggerCapabilityVersion + +var capInfo = capabilities.MustNewCapabilityInfo( + MockTriggerCapabilityID, + capabilities.CapabilityTypeTrigger, + "Mock Streams Trigger", +) + +// Wraps the MercuryTriggerService to produce a trigger with mocked data +type MockTriggerService struct { + *triggers.MercuryTriggerService + meta datastreams.Metadata + signers []*ecdsa.PrivateKey + stopCh services.StopChan + closeOnce sync.Once + wg sync.WaitGroup + loopInterval time.Duration + subscribers map[string][]streams.FeedId + subscribersMu sync.Mutex + lggr logger.Logger +} + +func NewMockTriggerService(tickerResolutionMs int64, lggr logger.Logger) (*MockTriggerService, error) { + trigger, err := triggers.NewMercuryTriggerService(tickerResolutionMs, mockTriggerCapabilityName, mockTriggerCapabilityVersion, lggr) + if err != nil { + return nil, err + } + trigger.CapabilityInfo = capInfo + + if tickerResolutionMs <= 0 { + tickerResolutionMs = defaultLoopIntervalMs + } + + meta, signers, err := newMockMetadataAndSigners() + if err != nil { + return nil, err + } + + // MercuryTrigger is typically wrapped by other modules that ignore the trigger's meta and provide a different one. + // Since we're skipping those wrappers we need to provide our own meta here. + trigger.SetMetaOverride(meta) + + return &MockTriggerService{ + MercuryTriggerService: trigger, + meta: meta, + signers: signers, + stopCh: make(services.StopChan), + loopInterval: time.Duration(tickerResolutionMs) * time.Millisecond, + subscribers: make(map[string][]streams.FeedId), + lggr: lggr, + }, nil +} + +func newMockMetadataAndSigners() (datastreams.Metadata, []*ecdsa.PrivateKey, error) { + meta := datastreams.Metadata{MinRequiredSignatures: 2*mockSignerFaultTolerance + 1} + signers := make([]*ecdsa.PrivateKey, 0, meta.MinRequiredSignatures) + for i := 0; i < meta.MinRequiredSignatures; i++ { + privKey, err := newMockSigner(i + 1) + if err != nil { + return datastreams.Metadata{}, nil, err + } + signers = append(signers, privKey) + meta.Signers = append(meta.Signers, crypto.PubkeyToAddress(privKey.PublicKey).Bytes()) + } + return meta, signers, nil +} + +func newMockSigner(index int) (*ecdsa.PrivateKey, error) { + bytes := make([]byte, mockSignerKeyLength) + lastByte, err := toUint8(index) + if err != nil { + return nil, err + } + bytes[mockSignerKeyLastByteIndex] = lastByte + privKey, err := crypto.ToECDSA(bytes) + if err != nil { + return nil, err + } + return privKey, nil +} + +func (m *MockTriggerService) Start(ctx context.Context) error { + if err := m.MercuryTriggerService.Start(ctx); err != nil { + return err + } + m.wg.Add(1) + go m.loop() + return nil +} + +func (m *MockTriggerService) Close() error { + m.closeOnce.Do(func() { + close(m.stopCh) + }) + m.wg.Wait() + return m.MercuryTriggerService.Close() +} + +func (m *MockTriggerService) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { + ch, err := m.MercuryTriggerService.RegisterTrigger(ctx, req) + if err != nil { + return nil, err + } + + config, err := m.ValidateConfig(req.Config) + if err != nil { + _ = m.MercuryTriggerService.UnregisterTrigger(ctx, req) + return nil, err + } + m.subscribersMu.Lock() + defer m.subscribersMu.Unlock() + m.subscribers[req.Metadata.WorkflowID] = config.FeedIds + return ch, nil +} + +func (m *MockTriggerService) UnregisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) error { + err := m.MercuryTriggerService.UnregisterTrigger(ctx, req) + m.subscribersMu.Lock() + defer m.subscribersMu.Unlock() + delete(m.subscribers, req.Metadata.WorkflowID) + return err +} + +const baseTimestamp = 1000000000 + +// NOTE: duplicated from trigger_test.go +func newReport(lggr logger.Logger, feedID [32]byte, price *big.Int, timestamp int64) ([]byte, error) { + uintTimestamp, err := toUint32(timestamp) + if err != nil { + return nil, err + } + expiresAt, err := toUint32(timestamp + reportExpiryOffsetSeconds) + if err != nil { + return nil, err + } + v3Codec := reportcodec.NewReportCodec(feedID, lggr) + raw, err := v3Codec.BuildReport(context.Background(), v3.ReportFields{ + BenchmarkPrice: price, + Timestamp: uintTimestamp, + ValidFromTimestamp: uintTimestamp, + Bid: price, + Ask: price, + LinkFee: price, + NativeFee: price, + ExpiresAt: expiresAt, + }) + if err != nil { + return nil, err + } + return raw, nil +} + +func toUint32(v int64) (uint32, error) { + if v < 0 || v > int64(^uint32(0)) { + return 0, fmt.Errorf("value %d out of uint32 range", v) + } + return uint32(v), nil +} + +func toUint8(v int) (uint8, error) { + if v < 0 || v > int(^uint8(0)) { + return 0, fmt.Errorf("value %d out of uint8 range", v) + } + return uint8(v), nil +} + +func rawReportContext(reportCtx ocrTypes.ReportContext) []byte { + rc := evmutil.RawReportContext(reportCtx) + flat := make([]byte, 0, len(rc)*32) + for _, r := range rc { + flat = append(flat, r[:]...) + } + return flat +} + +func (m *MockTriggerService) loop() { + defer m.wg.Done() + + ticker := time.NewTicker(m.loopInterval) + defer ticker.Stop() + + prices := []int64{initialMockPriceA, initialMockPriceB, initialMockPriceC} + iteration := 0 + + for { + select { + case <-m.stopCh: + return + case <-ticker.C: + } + + incrementPrices(prices) + iteration++ + + timestamp := time.Now().Unix() + reportCtx, err := newReportContext(iteration) + if err != nil { + m.lggr.Errorw("failed to build Mock report context", "err", err, "timestamp", timestamp) + continue + } + reports, err := m.buildReports(timestamp, prices[0], reportCtx) + if err != nil { + m.lggr.Errorw("failed to build Mock reports", "err", err, "timestamp", timestamp) + continue + } + if len(reports) == 0 { + continue + } + + m.lggr.Infow("New set of Mock reports", "timestamp", timestamp, "payload", reports) + if err := m.ProcessReport(reports); err != nil { + m.lggr.Errorw("failed to process Mock reports", "err", err, "timestamp", timestamp, "payload", reports) + } + } +} + +func incrementPrices(prices []int64) { + for i := range prices { + prices[i]++ + } +} + +func newReportContext(iteration int) (ocrTypes.ReportContext, error) { + epoch, err := toUint32(int64(baseTimestamp + iteration)) + if err != nil { + return ocrTypes.ReportContext{}, err + } + return ocrTypes.ReportContext{ + ReportTimestamp: ocrTypes.ReportTimestamp{Epoch: epoch}, + }, nil +} + +func (m *MockTriggerService) buildReports(timestamp, price int64, reportCtx ocrTypes.ReportContext) ([]datastreams.FeedReport, error) { + subscribers := m.snapshotSubscribers() + reports := make([]datastreams.FeedReport, 0, subscriberCount(subscribers)) + for _, feedIDs := range subscribers { + for _, feedID := range feedIDs { + report, err := m.newSignedReport(string(feedID), price, timestamp, reportCtx) + if err != nil { + return nil, err + } + reports = append(reports, report) + } + } + return reports, nil +} + +func (m *MockTriggerService) snapshotSubscribers() map[string][]streams.FeedId { + m.subscribersMu.Lock() + defer m.subscribersMu.Unlock() + + snapshot := make(map[string][]streams.FeedId, len(m.subscribers)) + for workflowID, feedIDs := range m.subscribers { + snapshot[workflowID] = cloneFeedIDs(feedIDs) + } + return snapshot +} + +func subscriberCount(subscribers map[string][]streams.FeedId) int { + total := 0 + for _, feedIDs := range subscribers { + total += len(feedIDs) + } + return total +} + +func cloneFeedIDs(feedIDs []streams.FeedId) []streams.FeedId { + cloned := make([]streams.FeedId, len(feedIDs)) + copy(cloned, feedIDs) + return cloned +} + +func (m *MockTriggerService) newSignedReport(feedID string, price, timestamp int64, reportCtx ocrTypes.ReportContext) (datastreams.FeedReport, error) { + fullReport, err := newReport(m.lggr, common.HexToHash(feedID), big.NewInt(price), timestamp) + if err != nil { + return datastreams.FeedReport{}, fmt.Errorf("build report for feed %s: %w", feedID, err) + } + + report := datastreams.FeedReport{ + FeedID: feedID, + FullReport: fullReport, + ReportContext: rawReportContext(reportCtx), + ObservationTimestamp: timestamp, + } + + if err := m.signReport(&report); err != nil { + return datastreams.FeedReport{}, fmt.Errorf("sign report for feed %s: %w", feedID, err) + } + + return report, nil +} + +func (m *MockTriggerService) signReport(report *datastreams.FeedReport) error { + sigData := append(crypto.Keccak256(report.FullReport), report.ReportContext...) + hash := crypto.Keccak256(sigData) + report.Signatures = make([][]byte, 0, m.meta.MinRequiredSignatures) + for n := 0; n < m.meta.MinRequiredSignatures; n++ { + sig, err := crypto.Sign(hash, m.signers[n]) + if err != nil { + return err + } + report.Signatures = append(report.Signatures, sig) + } + return nil +} diff --git a/core/capabilities/streams/mock_trigger_test.go b/core/capabilities/streams/mock_trigger_test.go new file mode 100644 index 00000000000..1b5d9376751 --- /dev/null +++ b/core/capabilities/streams/mock_trigger_test.go @@ -0,0 +1,139 @@ +package streams + +import ( + "context" + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/stretchr/testify/require" + + ocrTypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/datastreams" + triggercfg "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers/streams" + "github.com/smartcontractkit/chainlink-protos/cre/go/values" + + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +const testFeedID triggercfg.FeedId = "0x0000000000000000000000000000000000000000000000000000000000000001" + +func TestMockTriggerServiceStartClose(t *testing.T) { + t.Parallel() + + svc, err := NewMockTriggerService(10, logger.TestLogger(t)) + require.NoError(t, err) + require.NoError(t, svc.Start(context.Background())) + + done := make(chan error, 1) + go func() { + done <- svc.Close() + }() + + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timed out waiting for mock trigger service to close") + } +} + +func TestMockTriggerServiceRegisterEmitUnregister(t *testing.T) { + t.Parallel() + + ctx := context.Background() + svc, err := NewMockTriggerService(200, logger.TestLogger(t)) + require.NoError(t, err) + require.NoError(t, svc.MercuryTriggerService.Start(ctx)) + t.Cleanup(func() { + require.NoError(t, svc.MercuryTriggerService.Close()) + }) + + req := newMockTriggerRequest(t, "workflow-a", "trigger-a", 200) + ch, err := svc.RegisterTrigger(ctx, req) + require.NoError(t, err) + + svc.subscribersMu.Lock() + require.Equal(t, []triggercfg.FeedId{testFeedID}, svc.subscribers[req.Metadata.WorkflowID]) + svc.subscribersMu.Unlock() + + require.NoError(t, svc.ProcessReport([]datastreams.FeedReport{newSignedMockFeedReport(t, svc, testFeedID)})) + + var resp commoncap.TriggerResponse + select { + case resp = <-ch: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for mock trigger event") + } + + triggerEvent := datastreams.StreamsTriggerEvent{} + require.NoError(t, resp.Event.Outputs.UnwrapTo(&triggerEvent)) + require.Equal(t, MockTriggerCapabilityID, resp.Event.TriggerType) + require.Len(t, triggerEvent.Payload, 1) + require.Len(t, triggerEvent.Payload[0].Signatures, svc.meta.MinRequiredSignatures) + require.Len(t, triggerEvent.Metadata.Signers, svc.meta.MinRequiredSignatures) + require.Equal(t, svc.meta.MinRequiredSignatures, triggerEvent.Metadata.MinRequiredSignatures) + + require.NoError(t, svc.UnregisterTrigger(ctx, req)) + + svc.subscribersMu.Lock() + require.Empty(t, svc.subscribers) + svc.subscribersMu.Unlock() + + select { + case _, ok := <-ch: + require.False(t, ok) + case <-time.After(time.Second): + t.Fatal("timed out waiting for subscriber channel to close") + } +} + +func newMockTriggerRequest(t *testing.T, workflowID, triggerID string, maxFrequencyMs uint64) commoncap.TriggerRegistrationRequest { + t.Helper() + + cfg, err := values.WrapMap(triggercfg.TriggerConfig{ + FeedIds: []triggercfg.FeedId{testFeedID}, + MaxFrequencyMs: maxFrequencyMs, + }) + require.NoError(t, err) + + return commoncap.TriggerRegistrationRequest{ + TriggerID: triggerID, + Metadata: commoncap.RequestMetadata{ + WorkflowID: workflowID, + }, + Config: cfg, + } +} + +func newSignedMockFeedReport(t *testing.T, svc *MockTriggerService, feedID triggercfg.FeedId) datastreams.FeedReport { + t.Helper() + + timestamp := time.Now().Unix() + reportCtx := ocrTypes.ReportContext{ + ReportTimestamp: ocrTypes.ReportTimestamp{Epoch: uint32(baseTimestamp + 1)}, + } + + report := datastreams.FeedReport{ + FeedID: string(feedID), + ReportContext: rawReportContext(reportCtx), + ObservationTimestamp: timestamp, + } + fullReport, err := newReport(svc.lggr, common.HexToHash(string(feedID)), big.NewInt(123456), timestamp) + require.NoError(t, err) + report.FullReport = fullReport + + sigData := append(crypto.Keccak256(report.FullReport), report.ReportContext...) + hash := crypto.Keccak256(sigData) + for _, signer := range svc.signers { + sig, err := crypto.Sign(hash, signer) + require.NoError(t, err) + report.Signatures = append(report.Signatures, sig) + } + + return report +} diff --git a/core/services/cre/cre.go b/core/services/cre/cre.go index baf87584cc8..9dd8382ef82 100644 --- a/core/services/cre/cre.go +++ b/core/services/cre/cre.go @@ -36,6 +36,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/capabilities/localcapmgr" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + capStreams "github.com/smartcontractkit/chainlink/v2/core/capabilities/streams" "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" @@ -488,7 +489,7 @@ func (w *dispatcherWrapper) newSubservices( capCfg := cfg.Capabilities() if !capCfg.Peering().Enabled() && !capCfg.SharedPeering().Enabled() { - opts.CapabilitiesRegistry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) + opts.CapabilitiesRegistry.SetLocalRegistry(newLocalTestMetadataRegistry(capCfg.Local())) return nil, nil } @@ -533,6 +534,15 @@ func (w *dispatcherWrapper) newSubservices( return subs, nil } +func newLocalTestMetadataRegistry(localCfg config.LocalCapabilities) *capabilities.TestMetadataRegistry { + registry := &capabilities.TestMetadataRegistry{} + if localCfg != nil && localCfg.GetCapabilityConfig(capStreams.MockTriggerCapabilityID) != nil { + registry.WorkflowDONF = 1 + } + + return registry +} + // newDispatcherWrapper creates a new dispatcherWrapper service with peer wrappers if peering is enabled func newDispatcherWrapper( cfg Config, diff --git a/core/services/cre/cre_test.go b/core/services/cre/cre_test.go new file mode 100644 index 00000000000..acb787fe3c4 --- /dev/null +++ b/core/services/cre/cre_test.go @@ -0,0 +1,78 @@ +package cre + +import ( + "testing" + + "github.com/stretchr/testify/require" + + capStreams "github.com/smartcontractkit/chainlink/v2/core/capabilities/streams" + "github.com/smartcontractkit/chainlink/v2/core/config" +) + +type testLocalCapabilities struct { + cfgs map[string]config.CapabilityNodeConfig +} + +func (t testLocalCapabilities) RegistryBasedLaunchAllowlist() []string { + return nil +} + +func (t testLocalCapabilities) Capabilities() map[string]config.CapabilityNodeConfig { + return t.cfgs +} + +func (t testLocalCapabilities) IsAllowlisted(string) bool { + return false +} + +func (t testLocalCapabilities) GetCapabilityConfig(capabilityID string) config.CapabilityNodeConfig { + if t.cfgs == nil { + return nil + } + + return t.cfgs[capabilityID] +} + +type testCapabilityNodeConfig struct{} + +func (testCapabilityNodeConfig) BinaryPathOverride() string { + return "" +} + +func (testCapabilityNodeConfig) Config() map[string]string { + return nil +} + +func TestNewLocalTestMetadataRegistry(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + localCfg config.LocalCapabilities + expectedF uint8 + }{ + { + name: "default workflow DON fault tolerance", + localCfg: nil, + expectedF: 0, + }, + { + name: "mock trigger opt-in uses workflow DON fault tolerance one", + localCfg: testLocalCapabilities{ + cfgs: map[string]config.CapabilityNodeConfig{ + capStreams.MockTriggerCapabilityID: testCapabilityNodeConfig{}, + }, + }, + expectedF: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + registry := newLocalTestMetadataRegistry(tt.localCfg) + require.Equal(t, tt.expectedF, registry.WorkflowDONF) + }) + } +} diff --git a/core/services/standardcapabilities/delegate.go b/core/services/standardcapabilities/delegate.go index 7a59fe9e12e..6efab18b06f 100644 --- a/core/services/standardcapabilities/delegate.go +++ b/core/services/standardcapabilities/delegate.go @@ -66,6 +66,7 @@ type Delegate struct { creSettings core.SettingsBroadcaster ocrConfigService capregconfig.OCRConfigService localCfg coreconfig.LocalCapabilities + initErr error isNewlyCreatedJob bool } @@ -99,6 +100,11 @@ func NewDelegate( localCfg coreconfig.LocalCapabilities, opts ...func(*gateway.RoundRobinSelector), ) *Delegate { + initErr := registerOptionalMockStreamsTrigger(logger, localCfg, registry) + if initErr != nil { + logger.Errorw("Failed to register optional mock streams trigger", "err", initErr) + } + return &Delegate{ logger: logger, ds: ds, @@ -119,6 +125,7 @@ func NewDelegate( creSettings: creSettings, ocrConfigService: ocrConfigService, localCfg: localCfg, + initErr: initErr, selectorOpts: opts, } } @@ -133,6 +140,10 @@ func (d *Delegate) BeforeJobCreated(job job.Job) { } func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.ServiceCtx, error) { + if d.initErr != nil { + return nil, d.initErr + } + command := spec.StandardCapabilitiesSpec.Command configJSON := spec.StandardCapabilitiesSpec.Config @@ -159,6 +170,10 @@ func (d *Delegate) NewServices( externalJobID uuid.UUID, oracleFactoryConfig job.OracleFactoryConfig, ) ([]job.ServiceCtx, error) { + if d.initErr != nil { + return nil, d.initErr + } + log := d.logger.Named("StandardCapabilities").Named(strconv.Itoa(int(jobID))).Named(jobName) kvStore := job.NewKVStore(jobID, d.ds) diff --git a/core/services/standardcapabilities/local_capabilities.go b/core/services/standardcapabilities/local_capabilities.go new file mode 100644 index 00000000000..a8b9aad85c8 --- /dev/null +++ b/core/services/standardcapabilities/local_capabilities.go @@ -0,0 +1,22 @@ +package standardcapabilities + +import ( + "github.com/smartcontractkit/chainlink-common/pkg/types/core" + + capStreams "github.com/smartcontractkit/chainlink/v2/core/capabilities/streams" + coreconfig "github.com/smartcontractkit/chainlink/v2/core/config" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +func shouldRegisterMockStreamsTrigger(localCfg coreconfig.LocalCapabilities) bool { + return localCfg != nil && localCfg.GetCapabilityConfig(capStreams.MockTriggerCapabilityID) != nil +} + +func registerOptionalMockStreamsTrigger(lggr logger.Logger, localCfg coreconfig.LocalCapabilities, registry core.CapabilitiesRegistry) error { + if !shouldRegisterMockStreamsTrigger(localCfg) { + return nil + } + + _, err := capStreams.RegisterMockTrigger(lggr, registry) + return err +} diff --git a/core/services/standardcapabilities/local_capabilities_test.go b/core/services/standardcapabilities/local_capabilities_test.go new file mode 100644 index 00000000000..c3a6cc0f61c --- /dev/null +++ b/core/services/standardcapabilities/local_capabilities_test.go @@ -0,0 +1,80 @@ +package standardcapabilities + +import ( + "testing" + + "github.com/stretchr/testify/require" + + capStreams "github.com/smartcontractkit/chainlink/v2/core/capabilities/streams" + "github.com/smartcontractkit/chainlink/v2/core/config" +) + +type mockTriggerLocalCapabilities struct { + cfgs map[string]config.CapabilityNodeConfig +} + +func (s mockTriggerLocalCapabilities) RegistryBasedLaunchAllowlist() []string { + return nil +} + +func (s mockTriggerLocalCapabilities) Capabilities() map[string]config.CapabilityNodeConfig { + return s.cfgs +} + +func (s mockTriggerLocalCapabilities) IsAllowlisted(string) bool { + return false +} + +func (s mockTriggerLocalCapabilities) GetCapabilityConfig(capabilityID string) config.CapabilityNodeConfig { + if s.cfgs == nil { + return nil + } + return s.cfgs[capabilityID] +} + +type mockTriggerCapabilityNodeConfig struct{} + +func (mockTriggerCapabilityNodeConfig) BinaryPathOverride() string { + return "" +} + +func (mockTriggerCapabilityNodeConfig) Config() map[string]string { + return nil +} + +func TestShouldRegisterMockStreamsTrigger(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + localCfg config.LocalCapabilities + want bool + }{ + {name: "nil config", localCfg: nil, want: false}, + { + name: "different local capability", + localCfg: mockTriggerLocalCapabilities{ + cfgs: map[string]config.CapabilityNodeConfig{ + "cron@1.0.0": mockTriggerCapabilityNodeConfig{}, + }, + }, + want: false, + }, + { + name: "mock trigger opted in", + localCfg: mockTriggerLocalCapabilities{ + cfgs: map[string]config.CapabilityNodeConfig{ + capStreams.MockTriggerCapabilityID: mockTriggerCapabilityNodeConfig{}, + }, + }, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + require.Equal(t, tt.want, shouldRegisterMockStreamsTrigger(tt.localCfg)) + }) + } +} diff --git a/system-tests/tests/test-helpers/before_suite.go b/system-tests/tests/test-helpers/before_suite.go index 8cc18a6562d..d65815a48c5 100644 --- a/system-tests/tests/test-helpers/before_suite.go +++ b/system-tests/tests/test-helpers/before_suite.go @@ -277,6 +277,12 @@ func authorizePerTestWorkflowSignerIfNeeded(t *testing.T, sharedEnv *ttypes.Test workflowSignerAuthM.Lock() defer workflowSignerAuthM.Unlock() + allowed, err = registry.IsAllowedSigner(rootRegistryChain.SethClient.NewCallOpts(), signer) + require.NoError(t, err, "failed to re-check signer allowlist status") + if allowed { + return + } + _, err = rootRegistryChain.SethClient.Decode(registry.UpdateAllowedSigners(rootRegistryChain.SethClient.NewTXOpts(), []common.Address{signer}, true)) require.NoError(t, err, "failed to authorize per-test signer") }