Skip to content
15 changes: 10 additions & 5 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ func (w *launcher) addRemoteCapability(ctx context.Context, cid string, capabili

methodConfig := capabilityConfig.CapabilityMethodConfig
if methodConfig != nil { // v2 capability - handle via CombinedClient
errAdd := w.addRemoteCapabilityV2(ctx, capability.ID, methodConfig, myDON, remoteDON)
errAdd := w.addRemoteCapabilityV2(ctx, capability.ID, methodConfig, myDON, remoteDON, localRegistry)
if errAdd != nil {
return fmt.Errorf("failed to add remote v2 capability %s: %w", capability.ID, errAdd)
}
Expand Down Expand Up @@ -581,7 +581,7 @@ func (w *launcher) addRemoteCapability(ctx context.Context, cid string, capabili
w.cachedShims.executableClients[shimKey] = execCap
}
// V1 capabilities read transmission schedule from every request
if errCfg := execCap.SetConfig(info, myDON.DON, defaultTargetRequestTimeout, nil); errCfg != nil {
if errCfg := execCap.SetConfig(info, myDON.DON, defaultTargetRequestTimeout, nil, nil); errCfg != nil {
return nil, fmt.Errorf("failed to set trigger config: %w", errCfg)
}
return execCap.(capabilityService), nil
Expand All @@ -607,7 +607,7 @@ func (w *launcher) addRemoteCapability(ctx context.Context, cid string, capabili
w.cachedShims.executableClients[shimKey] = execCap
}
// V1 capabilities read transmission schedule from every request
if errCfg := execCap.SetConfig(info, myDON.DON, defaultTargetRequestTimeout, nil); errCfg != nil {
if errCfg := execCap.SetConfig(info, myDON.DON, defaultTargetRequestTimeout, nil, nil); errCfg != nil {
return nil, fmt.Errorf("failed to set trigger config: %w", errCfg)
}
return execCap.(capabilityService), nil
Expand Down Expand Up @@ -914,7 +914,7 @@ func signersFor(don registrysyncer.DON, localRegistry *registrysyncer.LocalRegis
}

// Add a V2 capability with multiple methods, using CombinedClient.
func (w *launcher) addRemoteCapabilityV2(ctx context.Context, capID string, methodConfig map[string]capabilities.CapabilityMethodConfig, myDON registrysyncer.DON, remoteDON registrysyncer.DON) error {
func (w *launcher) addRemoteCapabilityV2(ctx context.Context, capID string, methodConfig map[string]capabilities.CapabilityMethodConfig, myDON registrysyncer.DON, remoteDON registrysyncer.DON, localRegistry *registrysyncer.LocalRegistry) error {
info, err := capabilities.NewRemoteCapabilityInfo(
capID,
capabilities.CapabilityTypeCombined,
Expand Down Expand Up @@ -969,7 +969,12 @@ func (w *launcher) addRemoteCapabilityV2(ctx context.Context, capID string, meth
Schedule: transmission.EnumToString(config.RemoteExecutableConfig.TransmissionSchedule),
DeltaStage: config.RemoteExecutableConfig.DeltaStage,
}
err := client.SetConfig(info, myDON.DON, config.RemoteExecutableConfig.RequestTimeout, transmissionConfig)

signers, err := signersFor(remoteDON, localRegistry)
if err != nil {
return fmt.Errorf("failed to get signers for executable client: %w", err)
}
err = client.SetConfig(info, myDON.DON, config.RemoteExecutableConfig.RequestTimeout, transmissionConfig, signers)
if err != nil {
w.lggr.Errorw("failed to update client config", "capID", capID, "method", method, "error", err)
continue
Expand Down
9 changes: 6 additions & 3 deletions core/capabilities/remote/executable/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ type dynamicConfig struct {
requestTimeout time.Duration
// Has to be set only for V2 capabilities. V1 capabilities read transmission schedule from every request.
transmissionConfig *transmission.TransmissionConfig
// Has to be set only for V2 capabilities using OCR.
signers [][]byte
}

type Client interface {
commoncap.ExecutableCapability
Receive(ctx context.Context, msg *types.MessageBody)
SetConfig(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig) error
SetConfig(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig, signers [][]byte) error
}

var _ Client = &client{}
Expand All @@ -78,7 +80,7 @@ func NewClient(capabilityID string, capMethodName string, dispatcher types.Dispa

// SetConfig sets the remote capability configuration dynamically
// TransmissionConfig has to be set only for V2 capabilities. V1 capabilities read transmission schedule from every request.
func (c *client) SetConfig(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig) error {
func (c *client) SetConfig(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig, signers [][]byte) error {
if remoteCapabilityInfo.ID == "" || remoteCapabilityInfo.ID != c.capabilityID {
return fmt.Errorf("capability info provided does not match the client's capabilityID: %s != %s", remoteCapabilityInfo.ID, c.capabilityID)
}
Expand All @@ -98,6 +100,7 @@ func (c *client) SetConfig(remoteCapabilityInfo commoncap.CapabilityInfo, localD
localDONInfo: localDonInfo,
requestTimeout: requestTimeout,
transmissionConfig: transmissionConfig,
signers: signers,
})
c.lggr.Infow("SetConfig", "remoteDONName", remoteCapabilityInfo.DON.Name, "remoteDONID", remoteCapabilityInfo.DON.ID, "requestTimeout", requestTimeout, "transmissionConfig", transmissionConfig)
return nil
Expand Down Expand Up @@ -234,7 +237,7 @@ func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest
}

req, err := request.NewClientExecuteRequest(ctx, c.lggr, capReq, cfg.remoteCapabilityInfo, cfg.localDONInfo, c.dispatcher,
cfg.requestTimeout, cfg.transmissionConfig, c.capMethodName)
cfg.requestTimeout, cfg.transmissionConfig, c.capMethodName, cfg.signers)
if err != nil {
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to create client request: %w", err)
}
Expand Down
18 changes: 9 additions & 9 deletions core/capabilities/remote/executable/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func testClient(t *testing.T, numWorkflowPeers int, workflowNodeResponseTimeout
for i := range numWorkflowPeers {
workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i])
caller := executable.NewClient(capInfo.ID, "", workflowPeerDispatcher, lggr)
err := caller.SetConfig(capInfo, workflowDonInfo, workflowNodeResponseTimeout, nil)
err := caller.SetConfig(capInfo, workflowDonInfo, workflowNodeResponseTimeout, nil, nil)
require.NoError(t, err)
servicetest.Run(t, caller)
broker.RegisterReceiverNode(workflowPeers[i], caller)
Expand Down Expand Up @@ -403,7 +403,7 @@ func TestClient_SetConfig(t *testing.T) {
DeltaStage: 10 * time.Millisecond,
}

err := client.SetConfig(validCapInfo, validDonInfo, validTimeout, transmissionConfig)
err := client.SetConfig(validCapInfo, validDonInfo, validTimeout, transmissionConfig, nil)
require.NoError(t, err)

// Verify config was set
Expand All @@ -418,7 +418,7 @@ func TestClient_SetConfig(t *testing.T) {
CapabilityType: commoncap.CapabilityTypeAction,
}

err := client.SetConfig(invalidCapInfo, validDonInfo, validTimeout, nil)
err := client.SetConfig(invalidCapInfo, validDonInfo, validTimeout, nil, nil)
require.Error(t, err)
assert.Contains(t, err.Error(), "capability info provided does not match the client's capabilityID")
assert.Contains(t, err.Error(), "different_capability@1.0.0 != test_capability@1.0.0")
Expand All @@ -431,15 +431,15 @@ func TestClient_SetConfig(t *testing.T) {
F: 0,
}

err := client.SetConfig(validCapInfo, invalidDonInfo, validTimeout, nil)
err := client.SetConfig(validCapInfo, invalidDonInfo, validTimeout, nil, nil)
require.Error(t, err)
assert.Contains(t, err.Error(), "empty localDonInfo provided")
})

t.Run("successful config update", func(t *testing.T) {
// Set initial config
initialTimeout := 10 * time.Second
err := client.SetConfig(validCapInfo, validDonInfo, initialTimeout, nil)
err := client.SetConfig(validCapInfo, validDonInfo, initialTimeout, nil, nil)
require.NoError(t, err)

// Replace with new config
Expand All @@ -450,7 +450,7 @@ func TestClient_SetConfig(t *testing.T) {
F: 1,
}

err = client.SetConfig(validCapInfo, newDonInfo, newTimeout, nil)
err = client.SetConfig(validCapInfo, newDonInfo, newTimeout, nil, nil)
require.NoError(t, err)

// Verify the config was completely replaced
Expand Down Expand Up @@ -494,7 +494,7 @@ func TestClient_SetConfig_StartClose(t *testing.T) {
})

t.Run("start succeeds after config set", func(t *testing.T) {
require.NoError(t, client.SetConfig(validCapInfo, validDonInfo, validTimeout, nil))
require.NoError(t, client.SetConfig(validCapInfo, validDonInfo, validTimeout, nil, nil))
require.NoError(t, client.Start(ctx))
require.NoError(t, client.Close())
})
Expand All @@ -504,12 +504,12 @@ func TestClient_SetConfig_StartClose(t *testing.T) {
freshClient := executable.NewClient(capabilityID, "execute", dispatcher, lggr)

// Set initial config and start
require.NoError(t, freshClient.SetConfig(validCapInfo, validDonInfo, validTimeout, nil))
require.NoError(t, freshClient.SetConfig(validCapInfo, validDonInfo, validTimeout, nil, nil))
require.NoError(t, freshClient.Start(ctx))

// Update config while running
validCapInfo.Description = "new description"
require.NoError(t, freshClient.SetConfig(validCapInfo, validDonInfo, validTimeout, nil))
require.NoError(t, freshClient.SetConfig(validCapInfo, validDonInfo, validTimeout, nil, nil))

// Verify config was updated
info, err := freshClient.Info(ctx)
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/remote/executable/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func testRemoteExecutableCapability(ctx context.Context, t *testing.T, underlyin
for i := range numWorkflowPeers {
workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i])
workflowNode := executable.NewClient(capInfo.ID, "", workflowPeerDispatcher, lggr)
err := workflowNode.SetConfig(capInfo, workflowDonInfo, workflowNodeTimeout, nil)
err := workflowNode.SetConfig(capInfo, workflowDonInfo, workflowNodeTimeout, nil, nil)
require.NoError(t, err)
servicetest.Run(t, workflowNode)
broker.RegisterReceiverNode(workflowPeers[i], workflowNode)
Expand Down
Loading
Loading