diff --git a/.changeset/confidential-workflow-execution.md b/.changeset/confidential-workflow-execution.md new file mode 100644 index 00000000000..952e5207d47 --- /dev/null +++ b/.changeset/confidential-workflow-execution.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +Add confidential workflow execution: ConfidentialModule, relay handler, gateway wiring, single-DON capability fix #added #db_update diff --git a/core/scripts/cre/environment/environment/workflow.go b/core/scripts/cre/environment/environment/workflow.go index 407c25f7eb0..631e6e226a1 100644 --- a/core/scripts/cre/environment/environment/workflow.go +++ b/core/scripts/cre/environment/environment/workflow.go @@ -475,7 +475,7 @@ func deployWorkflow( fmt.Printf("\n⚙️ Registering workflow '%s' with the workflow registry\n\n", workflowNameFlag) - workflowID, registerErr := creworkflow.RegisterWithContract(ctx, sethClient, common.HexToAddress(workflowRegistryAddress), workflowRegistryVersion, uint64(donIDFlag), workflowNameFlag, "file://"+wasmWorkflowFilePathFlag, configPath, secretsPath, &containerTargetDirFlag) + workflowID, registerErr := creworkflow.RegisterWithContract(ctx, sethClient, common.HexToAddress(workflowRegistryAddress), workflowRegistryVersion, uint64(donIDFlag), workflowNameFlag, "file://"+wasmWorkflowFilePathFlag, configPath, secretsPath, nil, &containerTargetDirFlag) if registerErr != nil { return errors.Wrapf(registerErr, "❌ failed to register workflow %s", workflowNameFlag) } diff --git a/core/services/cre/cre.go b/core/services/cre/cre.go index baf87584cc8..70b8dc46024 100644 --- a/core/services/cre/cre.go +++ b/core/services/cre/cre.go @@ -32,6 +32,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/capabilities" "github.com/smartcontractkit/chainlink/v2/core/capabilities/compute" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/confidentialrelay" gatewayconnector "github.com/smartcontractkit/chainlink/v2/core/capabilities/gateway_connector" 
"github.com/smartcontractkit/chainlink/v2/core/capabilities/localcapmgr" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" @@ -169,6 +170,13 @@ func (s *Services) newSubservices( } s.GatewayConnectorWrapper = gatewayConnectorWrapper srvs = append(srvs, gatewayConnectorWrapper) + + relayService := confidentialrelay.NewService( + gatewayConnectorWrapper, + opts.CapabilitiesRegistry, + lggr, + ) + srvs = append(srvs, relayService) } if cfg.CRE().Linking().URL() != "" { diff --git a/core/services/gateway/handler_factory.go b/core/services/gateway/handler_factory.go index 76172b3dc9b..c2338503eb7 100644 --- a/core/services/gateway/handler_factory.go +++ b/core/services/gateway/handler_factory.go @@ -18,6 +18,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities" v2 "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities/v2" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/confidentialrelay" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/vault" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/network" @@ -25,11 +26,12 @@ import ( ) const ( - FunctionsHandlerType HandlerType = "functions" - DummyHandlerType HandlerType = "dummy" - WebAPICapabilitiesType HandlerType = "web-api-capabilities" // Handler for v0.1 HTTP capabilities for DAG workflows - HTTPCapabilityType HandlerType = "http-capabilities" // Handler for v1.0 HTTP capabilities for NoDAG workflows - VaultHandlerType HandlerType = "vault" + FunctionsHandlerType HandlerType = "functions" + DummyHandlerType HandlerType = "dummy" + WebAPICapabilitiesType HandlerType = "web-api-capabilities" // Handler for v0.1 HTTP capabilities for DAG workflows + HTTPCapabilityType HandlerType = "http-capabilities" // 
Handler for v1.0 HTTP capabilities for NoDAG workflows + VaultHandlerType HandlerType = "vault" + ConfidentialRelayHandlerType HandlerType = "confidential-compute-relay" ) type handlerFactory struct { @@ -87,6 +89,8 @@ func (hf *handlerFactory) NewHandler( case VaultHandlerType: requestAuthorizer := vaultcap.NewRequestAuthorizer(hf.lggr, hf.workflowRegistrySyncer) return vault.NewHandler(handlerConfig, donConfig, don, hf.capabilitiesRegistry, requestAuthorizer, hf.lggr, clockwork.NewRealClock(), hf.lf) + case ConfidentialRelayHandlerType: + return confidentialrelay.NewHandler(handlerConfig, donConfig, don, hf.lggr, clockwork.NewRealClock(), hf.lf) default: return nil, fmt.Errorf("unsupported handler type %s", handlerType) } diff --git a/core/services/gateway/handlers/confidentialrelay/aggregator.go b/core/services/gateway/handlers/confidentialrelay/aggregator.go new file mode 100644 index 00000000000..5eb00664571 --- /dev/null +++ b/core/services/gateway/handlers/confidentialrelay/aggregator.go @@ -0,0 +1,56 @@ +package confidentialrelay + +import ( + "encoding/json" + "errors" + "fmt" + + jsonrpc "github.com/smartcontractkit/chainlink-common/pkg/jsonrpc2" + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +var ( + errInsufficientResponsesForQuorum = errors.New("insufficient valid responses to reach quorum") + errQuorumUnobtainable = errors.New("quorum unobtainable") +) + +type aggregator struct{} + +func (a *aggregator) Aggregate(resps map[string]jsonrpc.Response[json.RawMessage], donF int, donMembersCount int, l logger.Logger) (*jsonrpc.Response[json.RawMessage], error) { + // F+1 (QuorumFPlusOne) is sufficient because each relay node calls the + // target DON (Vault or capability) through CRE's standard capability + // dispatch, which includes DON-level consensus. Every honest relay node + // receives the same consensus-aggregated response and performs deterministic + // translation, producing byte-identical outputs. 
F+1 matching responses + // therefore guarantees at least one honest node vouched for the result. + requiredQuorum := donF + 1 + + if len(resps) < requiredQuorum { + return nil, errInsufficientResponsesForQuorum + } + + shaToCount := map[string]int{} + maxShaToCount := 0 + for _, r := range resps { + sha, err := r.Digest() + if err != nil { + l.Errorw("failed to compute digest of response during quorum validation, skipping...", "error", err) + continue + } + shaToCount[sha]++ + if shaToCount[sha] > maxShaToCount { + maxShaToCount = shaToCount[sha] + } + if shaToCount[sha] >= requiredQuorum { + return &r, nil + } + } + + remainingResponses := donMembersCount - len(resps) + if maxShaToCount+remainingResponses < requiredQuorum { + l.Warnw("quorum unattainable for request", "requiredQuorum", requiredQuorum, "remainingResponses", remainingResponses, "maxShaToCount", maxShaToCount) + return nil, fmt.Errorf("%w: requiredQuorum=%d, maxShaToCount=%d, remainingResponses=%d", errQuorumUnobtainable, requiredQuorum, maxShaToCount, remainingResponses) + } + + return nil, errInsufficientResponsesForQuorum +} diff --git a/core/services/gateway/handlers/confidentialrelay/handler.go b/core/services/gateway/handlers/confidentialrelay/handler.go new file mode 100644 index 00000000000..294a960d77e --- /dev/null +++ b/core/services/gateway/handlers/confidentialrelay/handler.go @@ -0,0 +1,461 @@ +package confidentialrelay + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strconv" + "sync" + "time" + + "github.com/jonboulle/clockwork" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "golang.org/x/sync/errgroup" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + relaytypes "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/actions/confidentialrelay" + jsonrpc "github.com/smartcontractkit/chainlink-common/pkg/jsonrpc2" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + 
"github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/config" + gwhandlers "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers" +) + +const ( + defaultCleanUpPeriod = 5 * time.Second + + // Re-exported from chainlink-common for local use and test convenience. + MethodSecretsGet = relaytypes.MethodSecretsGet + MethodCapabilityExec = relaytypes.MethodCapabilityExec +) + +var _ gwhandlers.Handler = (*handler)(nil) + +type metrics struct { + requestInternalError metric.Int64Counter + requestUserError metric.Int64Counter + requestSuccess metric.Int64Counter +} + +func newMetrics() (*metrics, error) { + requestInternalError, err := beholder.GetMeter().Int64Counter("confidential_relay_gateway_request_internal_error") + if err != nil { + return nil, fmt.Errorf("failed to register internal error counter: %w", err) + } + + requestUserError, err := beholder.GetMeter().Int64Counter("confidential_relay_gateway_request_user_error") + if err != nil { + return nil, fmt.Errorf("failed to register user error counter: %w", err) + } + + requestSuccess, err := beholder.GetMeter().Int64Counter("confidential_relay_gateway_request_success") + if err != nil { + return nil, fmt.Errorf("failed to register success counter: %w", err) + } + + return &metrics{ + requestInternalError: requestInternalError, + requestUserError: requestUserError, + requestSuccess: requestSuccess, + }, nil +} + +type activeRequest struct { + req jsonrpc.Request[json.RawMessage] + responses map[string]*jsonrpc.Response[json.RawMessage] + mu sync.Mutex + + createdAt time.Time + gwhandlers.Callback +} + +func (ar *activeRequest) addResponseForNode(nodeAddr string, resp *jsonrpc.Response[json.RawMessage]) bool { + 
ar.mu.Lock() + defer ar.mu.Unlock() + _, exists := ar.responses[nodeAddr] + if exists { + return false + } + + ar.responses[nodeAddr] = resp + return true +} + +func (ar *activeRequest) copiedResponses() map[string]jsonrpc.Response[json.RawMessage] { + ar.mu.Lock() + defer ar.mu.Unlock() + copied := make(map[string]jsonrpc.Response[json.RawMessage], len(ar.responses)) + for k, response := range ar.responses { + var copiedResponse jsonrpc.Response[json.RawMessage] + if response != nil { + copiedResponse = *response + if response.Result != nil { + copiedResult := *response.Result + copiedResponse.Result = &copiedResult + } + if response.Error != nil { + copiedError := *response.Error + copiedResponse.Error = &copiedError + } + } + copied[k] = copiedResponse + } + return copied +} + +type relayAggregator interface { + Aggregate(resps map[string]jsonrpc.Response[json.RawMessage], donF int, donMembersCount int, l logger.Logger) (*jsonrpc.Response[json.RawMessage], error) +} + +type Config struct { + RequestTimeoutSec int `json:"requestTimeoutSec"` +} + +type handler struct { + services.StateMachine + donConfig *config.DONConfig + don gwhandlers.DON + codec api.JsonRPCCodec + lggr logger.Logger + mu sync.RWMutex + stopCh services.StopChan + + globalNodeRateLimiter limits.RateLimiter + perNodeRateLimiters map[string]limits.RateLimiter + requestTimeout time.Duration + + activeRequests map[string]*activeRequest + metrics *metrics + + aggregator relayAggregator + + clock clockwork.Clock +} + +func (h *handler) HealthReport() map[string]error { + return map[string]error{h.Name(): h.Healthy()} +} + +func (h *handler) Name() string { + return h.lggr.Name() +} + +func NewHandler(methodConfig json.RawMessage, donConfig *config.DONConfig, don gwhandlers.DON, lggr logger.Logger, clock clockwork.Clock, limitsFactory limits.Factory) (*handler, error) { + var cfg Config + if err := json.Unmarshal(methodConfig, &cfg); err != nil { + return nil, fmt.Errorf("failed to unmarshal method 
config: %w", err) + } + + if cfg.RequestTimeoutSec == 0 { + cfg.RequestTimeoutSec = 30 + } + + globalNodeRateLimiter, err := limitsFactory.MakeRateLimiter(cresettings.Default.GatewayConfidentialRelayGlobalRate) + if err != nil { + return nil, fmt.Errorf("failed to create global node rate limiter: %w", err) + } + + perNodeRateLimiters := make(map[string]limits.RateLimiter, len(donConfig.Members)) + for _, member := range donConfig.Members { + rl, makeErr := limitsFactory.MakeRateLimiter(cresettings.Default.GatewayConfidentialRelayPerNodeRate) + if makeErr != nil { + return nil, fmt.Errorf("failed to create per-node rate limiter for %s: %w", member.Address, makeErr) + } + perNodeRateLimiters[member.Address] = rl + } + + metrics, err := newMetrics() + if err != nil { + return nil, fmt.Errorf("failed to create metrics: %w", err) + } + + return &handler{ + donConfig: donConfig, + don: don, + lggr: logger.Named(lggr, "ConfidentialRelayHandler:"+donConfig.DonId), + requestTimeout: time.Duration(cfg.RequestTimeoutSec) * time.Second, + globalNodeRateLimiter: globalNodeRateLimiter, + perNodeRateLimiters: perNodeRateLimiters, + activeRequests: make(map[string]*activeRequest), + mu: sync.RWMutex{}, + stopCh: make(services.StopChan), + metrics: metrics, + aggregator: &aggregator{}, + clock: clock, + }, nil +} + +func (h *handler) Start(_ context.Context) error { + return h.StartOnce("ConfidentialRelayHandler", func() error { + h.lggr.Info("starting confidential relay handler") + go func() { + ctx, cancel := h.stopCh.NewCtx() + defer cancel() + ticker := h.clock.NewTicker(defaultCleanUpPeriod) + defer ticker.Stop() + for { + select { + case <-ticker.Chan(): + h.removeExpiredRequests(ctx) + case <-h.stopCh: + return + } + } + }() + return nil + }) +} + +func (h *handler) Close() error { + return h.StopOnce("ConfidentialRelayHandler", func() error { + h.lggr.Info("closing confidential relay handler") + close(h.stopCh) + var err error + if h.globalNodeRateLimiter != nil { + err = 
errors.Join(err, h.globalNodeRateLimiter.Close()) + } + for _, rl := range h.perNodeRateLimiters { + err = errors.Join(err, rl.Close()) + } + return err + }) +} + +func (h *handler) removeExpiredRequests(ctx context.Context) { + h.mu.RLock() + var expiredRequests []*activeRequest + now := h.clock.Now() + for _, userRequest := range h.activeRequests { + if now.Sub(userRequest.createdAt) > h.requestTimeout { + expiredRequests = append(expiredRequests, userRequest) + } + } + h.mu.RUnlock() + + for _, er := range expiredRequests { + responses := er.copiedResponses() + h.lggr.Debugw("request expired without quorum", "requestID", er.req.ID, "responseCount", len(responses), "required", h.donConfig.F+1) + err := h.sendResponseAndCleanup(ctx, er, h.constructErrorResponse(er.req, api.RequestTimeoutError, fmt.Errorf("request expired: got %d/%d responses", len(responses), h.donConfig.F+1))) + if err != nil { + h.lggr.Errorw("error sending response to user", "requestID", er.req.ID, "error", err) + } + } +} + +func (h *handler) Methods() []string { + return []string{MethodSecretsGet, MethodCapabilityExec} +} + +func (h *handler) HandleLegacyUserMessage(_ context.Context, _ *api.Message, _ gwhandlers.Callback) error { + return errors.New("confidential relay handler does not support legacy messages") +} + +func (h *handler) HandleJSONRPCUserMessage(ctx context.Context, req jsonrpc.Request[json.RawMessage], callback gwhandlers.Callback) error { + if req.ID == "" { + return errors.New("request ID cannot be empty") + } + if len(req.ID) > 200 { + return errors.New("request ID is too long: " + strconv.Itoa(len(req.ID)) + ". 
max is 200 characters") + } + + l := logger.With(h.lggr, "method", req.Method, "requestID", req.ID) + l.Debugw("handling confidential relay request") + + ar, err := h.newActiveRequest(req, callback) + if err != nil { + return err + } + + return h.fanOutToNodes(ctx, l, ar) +} + +func (h *handler) newActiveRequest(req jsonrpc.Request[json.RawMessage], callback gwhandlers.Callback) (*activeRequest, error) { + h.mu.Lock() + defer h.mu.Unlock() + if h.activeRequests[req.ID] != nil { + h.lggr.Errorw("request id already exists", "requestID", req.ID) + return nil, errors.New("request ID already exists: " + req.ID) + } + ar := &activeRequest{ + Callback: callback, + req: req, + createdAt: h.clock.Now(), + responses: map[string]*jsonrpc.Response[json.RawMessage]{}, + } + h.activeRequests[req.ID] = ar + return ar, nil +} + +func (h *handler) getActiveRequest(requestID string) *activeRequest { + h.mu.RLock() + defer h.mu.RUnlock() + return h.activeRequests[requestID] +} + +func (h *handler) HandleNodeMessage(ctx context.Context, resp *jsonrpc.Response[json.RawMessage], nodeAddr string) error { + l := logger.With(h.lggr, "method", resp.Method, "requestID", resp.ID, "nodeAddr", nodeAddr) + l.Debugw("handling node response") + + nodeRateLimiter, ok := h.perNodeRateLimiters[nodeAddr] + if !ok { + return fmt.Errorf("received message from unexpected node %s", nodeAddr) + } + if !nodeRateLimiter.Allow(ctx) { + l.Debugw("node is rate limited", "nodeAddr", nodeAddr) + return nil + } + if !h.globalNodeRateLimiter.Allow(ctx) { + l.Debug("global relay rate limit exceeded") + return nil + } + + ar := h.getActiveRequest(resp.ID) + if ar == nil { + l.Debugw("no pending request found for ID") + return nil + } + + added := ar.addResponseForNode(nodeAddr, resp) + if !added { + l.Errorw("duplicate response from node, ignoring", "nodeAddr", nodeAddr) + return nil + } + + copiedResponses := ar.copiedResponses() + aggregatedResp, err := h.aggregator.Aggregate(copiedResponses, h.donConfig.F, 
len(h.donConfig.Members), l) + switch { + case errors.Is(err, errInsufficientResponsesForQuorum): + l.Debugw("aggregating responses, waiting for other nodes...", "error", err) + return nil + case errors.Is(err, errQuorumUnobtainable): + l.Errorw("quorum unobtainable, returning error to user", "error", err) + return h.sendResponseAndCleanup(ctx, ar, h.constructErrorResponse(ar.req, api.FatalError, err)) + case err != nil: + l.Errorw("unexpected aggregation error", "error", err) + return h.sendResponseAndCleanup(ctx, ar, h.constructErrorResponse(ar.req, api.FatalError, err)) + } + + rawResponse, err := jsonrpc.EncodeResponse(aggregatedResp) + if err != nil { + h.lggr.Errorw("failed to encode response", "requestID", ar.req.ID, "error", err) + return h.sendResponseAndCleanup(ctx, ar, h.constructErrorResponse(ar.req, api.NodeReponseEncodingError, err)) + } + return h.sendResponseAndCleanup(ctx, ar, gwhandlers.UserCallbackPayload{ + RawResponse: rawResponse, + ErrorCode: api.NoError, + }) +} + +func (h *handler) fanOutToNodes(ctx context.Context, l logger.Logger, ar *activeRequest) error { + var ( + group errgroup.Group + nodeErrors int + nodeErrorsMu sync.Mutex + ) + + for _, node := range h.donConfig.Members { + node := node + group.Go(func() error { + err := h.don.SendToNode(ctx, node.Address, &ar.req) + if err != nil { + nodeErrorsMu.Lock() + nodeErrors++ + nodeErrorsMu.Unlock() + l.Errorw("error sending request to node", "node", node.Address, "error", err) + } + return nil + }) + } + + _ = group.Wait() + + if nodeErrors == len(h.donConfig.Members) && nodeErrors > 0 { + return h.sendResponseAndCleanup(ctx, ar, h.constructErrorResponse(ar.req, api.FatalError, errors.New("failed to forward user request to nodes"))) + } + + l.Debugw("successfully forwarded request to relay nodes") + return nil +} + +// sendResponseAndCleanup sends payload. 
+// The request is always removed +// regardless of whether the send succeeds, since a failed callback cannot +// be retried. +func (h *handler) sendResponseAndCleanup(ctx context.Context, ar *activeRequest, payload gwhandlers.UserCallbackPayload) error { + h.recordMetrics(ctx, payload.ErrorCode) + sendErr := ar.SendResponse(payload) + + h.mu.Lock() + delete(h.activeRequests, ar.req.ID) + h.mu.Unlock() + + if sendErr != nil { + h.lggr.Errorw("error sending response to user", "requestID", ar.req.ID, "error", sendErr) + return sendErr + } + + h.lggr.Debugw("response sent to user", "requestID", ar.req.ID, "errorCode", payload.ErrorCode) + return nil +} + +// recordMetrics classifies the terminal error code of a request and bumps the +// matching counter. Go switch cases do not fall through, so related codes are +// grouped into a single case list; previously the empty cases recorded nothing. +func (h *handler) recordMetrics(ctx context.Context, errorCode api.ErrorCode) { + switch errorCode { + case api.StaleNodeResponseError, + api.FatalError, + api.NodeReponseEncodingError, + api.RequestTimeoutError, + api.HandlerError: + h.metrics.requestInternalError.Add(ctx, 1, metric.WithAttributes( + attribute.String("don_id", h.donConfig.DonId), + attribute.String("error", errorCode.String()), + )) + case api.InvalidParamsError, + api.UnsupportedMethodError, + api.UserMessageParseError, + api.UnsupportedDONIdError: + h.metrics.requestUserError.Add(ctx, 1, metric.WithAttributes( + attribute.String("don_id", h.donConfig.DonId), + )) + case api.NoError: + h.metrics.requestSuccess.Add(ctx, 1, metric.WithAttributes( + attribute.String("don_id", h.donConfig.DonId), + )) + case api.ConflictError: + case api.LimitExceededError: + } +} + +func (h *handler) constructErrorResponse(req jsonrpc.Request[json.RawMessage], errorCode api.ErrorCode, err error) gwhandlers.UserCallbackPayload { + switch errorCode { + case api.NodeReponseEncodingError: + err = errors.New(errorCode.String()) + case api.InvalidParamsError: + err = fmt.Errorf("invalid params error: %w", err) + case api.UnsupportedMethodError: + err = fmt.Errorf("unsupported method(%s): %w", req.Method, err) + case 
api.UserMessageParseError: + err = fmt.Errorf("user message parse error: %w", err) + case api.NoError: + case api.UnsupportedDONIdError: + case api.HandlerError: + case api.FatalError: + case api.RequestTimeoutError: + case api.StaleNodeResponseError: + case api.ConflictError: + case api.LimitExceededError: + } + return gwhandlers.UserCallbackPayload{ + RawResponse: h.codec.EncodeNewErrorResponse( + req.ID, + api.ToJSONRPCErrorCode(errorCode), + err.Error(), + nil, + ), + ErrorCode: errorCode, + } +} diff --git a/core/services/gateway/handlers/confidentialrelay/handler_test.go b/core/services/gateway/handlers/confidentialrelay/handler_test.go new file mode 100644 index 00000000000..c3c4ba20690 --- /dev/null +++ b/core/services/gateway/handlers/confidentialrelay/handler_test.go @@ -0,0 +1,607 @@ +package confidentialrelay + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "sync" + "testing" + "time" + + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" + + jsonrpc "github.com/smartcontractkit/chainlink-common/pkg/jsonrpc2" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" + + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/config" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/mocks" +) + +type barrierDON struct { + total int + mu sync.Mutex + started int + allStarted chan struct{} + releaseOnce sync.Once +} + +func newBarrierDON(total int) *barrierDON { + return &barrierDON{ + total: total, + allStarted: make(chan struct{}), + } +} + +func (d *barrierDON) SendToNode(_ 
context.Context, _ string, _ *jsonrpc.Request[json.RawMessage]) error { + d.mu.Lock() + d.started++ + if d.started == d.total { + d.releaseOnce.Do(func() { close(d.allStarted) }) + } + ch := d.allStarted + d.mu.Unlock() + + <-ch + return nil +} + +func (d *barrierDON) forceRelease() { + d.releaseOnce.Do(func() { close(d.allStarted) }) +} + +var nodeOne = config.NodeConfig{ + Name: "node1", + Address: "0x1234", +} + +func setupHandler(t *testing.T, numNodes int) (*handler, *common.Callback, *mocks.DON, *clockwork.FakeClock) { + t.Helper() + lggr := logger.Test(t) + don := mocks.NewDON(t) + + members := make([]config.NodeConfig, numNodes) + for i := range numNodes { + members[i] = config.NodeConfig{ + Name: fmt.Sprintf("node%d", i), + Address: fmt.Sprintf("0x%04d", i), + } + } + + donConfig := &config.DONConfig{ + DonId: "test_relay_don", + F: 1, + Members: members, + } + handlerConfig := Config{ + RequestTimeoutSec: 30, + } + methodConfig, err := json.Marshal(handlerConfig) + require.NoError(t, err) + + clock := clockwork.NewFakeClock() + limitsFactory := limits.Factory{Settings: cresettings.DefaultGetter, Logger: lggr} + h, err := NewHandler(methodConfig, donConfig, don, lggr, clock, limitsFactory) + require.NoError(t, err) + h.aggregator = &mockAggregator{} + cb := common.NewCallback() + return h, cb, don, clock +} + +type mockAggregator struct { + err error +} + +func (m *mockAggregator) Aggregate(_ map[string]jsonrpc.Response[json.RawMessage], _ int, _ int, _ logger.Logger) (*jsonrpc.Response[json.RawMessage], error) { + return nil, m.err +} + +type respondingMockAggregator struct{} + +func (m *respondingMockAggregator) Aggregate(resps map[string]jsonrpc.Response[json.RawMessage], _ int, _ int, _ logger.Logger) (*jsonrpc.Response[json.RawMessage], error) { + if len(resps) == 0 { + return nil, errInsufficientResponsesForQuorum + } + // Return the first response we find. 
+ for _, r := range resps { + return &r, nil + } + return nil, errInsufficientResponsesForQuorum +} + +func TestConfidentialRelayHandler_Methods(t *testing.T) { + h, _, _, _ := setupHandler(t, 4) + methods := h.Methods() + assert.Equal(t, []string{MethodSecretsGet, MethodCapabilityExec}, methods) +} + +func TestConfidentialRelayHandler_HandleLegacyUserMessage(t *testing.T) { + h, cb, _, _ := setupHandler(t, 4) + err := h.HandleLegacyUserMessage(t.Context(), nil, cb) + require.ErrorContains(t, err, "confidential relay handler does not support legacy messages") +} + +func TestConfidentialRelayHandler_RequestIDTooLong(t *testing.T) { + h, cb, _, _ := setupHandler(t, 4) + + longID := strings.Repeat("x", 201) + req := jsonrpc.Request[json.RawMessage]{ + ID: longID, + Method: MethodCapabilityExec, + } + + err := h.HandleJSONRPCUserMessage(t.Context(), req, cb) + expected := fmt.Sprintf("request ID is too long: %d. max is 200 characters", len(longID)) + require.EqualError(t, err, expected) +} + +func TestConfidentialRelayHandler_EmptyRequestID(t *testing.T) { + h, cb, _, _ := setupHandler(t, 4) + + req := jsonrpc.Request[json.RawMessage]{ + ID: "", + Method: MethodCapabilityExec, + } + + err := h.HandleJSONRPCUserMessage(t.Context(), req, cb) + require.EqualError(t, err, "request ID cannot be empty") +} + +func TestConfidentialRelayHandler_FanOutAndQuorumSuccess(t *testing.T) { + h, cb, don, _ := setupHandler(t, 4) + h.aggregator = &respondingMockAggregator{} + don.On("SendToNode", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + params := json.RawMessage(`{"workflow_id":"wf1","secrets":[{"key":"k","namespace":"ns"}],"enclave_public_key":"pk"}`) + req := jsonrpc.Request[json.RawMessage]{ + ID: "req-1", + Method: MethodCapabilityExec, + Params: ¶ms, + } + + resultData := json.RawMessage(`{"secrets":[],"master_public_key":"mpk","threshold":1}`) + response := jsonrpc.Response[json.RawMessage]{ + Version: jsonrpc.JsonRpcVersion, + ID: "req-1", + Method: 
MethodCapabilityExec, + Result: &resultData, + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + resp, err := cb.Wait(t.Context()) + assert.NoError(t, err) + assert.Equal(t, api.NoError, resp.ErrorCode) + var jsonResp jsonrpc.Response[json.RawMessage] + err = json.Unmarshal(resp.RawResponse, &jsonResp) + assert.NoError(t, err) + assert.Equal(t, "req-1", jsonResp.ID) + }() + + err := h.HandleJSONRPCUserMessage(t.Context(), req, cb) + require.NoError(t, err) + + err = h.HandleNodeMessage(t.Context(), &response, "0x0000") + require.NoError(t, err) + wg.Wait() +} + +func TestConfidentialRelayHandler_QuorumWithRealAggregator(t *testing.T) { + h, cb, don, _ := setupHandler(t, 4) + // Use the real aggregator; DON F=1 so quorum = F+1 = 2 + h.aggregator = &aggregator{} + don.On("SendToNode", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + params := json.RawMessage(`{"workflow_id":"wf1"}`) + req := jsonrpc.Request[json.RawMessage]{ + ID: "req-quorum", + Method: MethodCapabilityExec, + Params: ¶ms, + } + + resultData := json.RawMessage(`{"payload":"result"}`) + makeResp := func() *jsonrpc.Response[json.RawMessage] { + rd := make(json.RawMessage, len(resultData)) + copy(rd, resultData) + return &jsonrpc.Response[json.RawMessage]{ + Version: jsonrpc.JsonRpcVersion, + ID: "req-quorum", + Method: MethodCapabilityExec, + Result: &rd, + } + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + resp, err := cb.Wait(t.Context()) + assert.NoError(t, err) + assert.Equal(t, api.NoError, resp.ErrorCode) + }() + + err := h.HandleJSONRPCUserMessage(t.Context(), req, cb) + require.NoError(t, err) + + // Send 2 matching responses (F+1 = 2) + for i := range 2 { + err = h.HandleNodeMessage(t.Context(), makeResp(), fmt.Sprintf("0x%04d", i)) + require.NoError(t, err) + } + wg.Wait() +} + +func TestConfidentialRelayHandler_QuorumWithDivergentResponses(t *testing.T) { + h, cb, don, _ := setupHandler(t, 4) + h.aggregator = &aggregator{} + 
don.On("SendToNode", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + params := json.RawMessage(`{"workflow_id":"wf1"}`) + req := jsonrpc.Request[json.RawMessage]{ + ID: "req-diverge", + Method: MethodCapabilityExec, + Params: ¶ms, + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + resp, err := cb.Wait(t.Context()) + assert.NoError(t, err) + assert.Equal(t, api.NoError, resp.ErrorCode) + }() + + err := h.HandleJSONRPCUserMessage(t.Context(), req, cb) + require.NoError(t, err) + + // One divergent response + divergentResult := json.RawMessage(`{"secrets":[],"master_public_key":"DIFFERENT","threshold":1}`) + divergentResp := &jsonrpc.Response[json.RawMessage]{ + Version: jsonrpc.JsonRpcVersion, + ID: "req-diverge", + Method: MethodCapabilityExec, + Result: &divergentResult, + } + err = h.HandleNodeMessage(t.Context(), divergentResp, "0x0000") + require.NoError(t, err) + + // Two matching responses (quorum = F+1 = 2) + matchingResult := json.RawMessage(`{"secrets":[],"master_public_key":"mpk","threshold":1}`) + for i := 1; i <= 2; i++ { + rd := make(json.RawMessage, len(matchingResult)) + copy(rd, matchingResult) + resp := &jsonrpc.Response[json.RawMessage]{ + Version: jsonrpc.JsonRpcVersion, + ID: "req-diverge", + Method: MethodCapabilityExec, + Result: &rd, + } + err = h.HandleNodeMessage(t.Context(), resp, fmt.Sprintf("0x%04d", i)) + require.NoError(t, err) + } + wg.Wait() +} + +func TestConfidentialRelayHandler_QuorumUnobtainable(t *testing.T) { + h, cb, don, _ := setupHandler(t, 4) + h.aggregator = &mockAggregator{err: errQuorumUnobtainable} + don.On("SendToNode", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + params := json.RawMessage(`{"workflow_id":"wf1"}`) + req := jsonrpc.Request[json.RawMessage]{ + ID: "req-unobtainable", + Method: MethodCapabilityExec, + Params: ¶ms, + } + + response := jsonrpc.Response[json.RawMessage]{ + Version: jsonrpc.JsonRpcVersion, + ID: "req-unobtainable", + Method: 
MethodCapabilityExec, + Error: &jsonrpc.WireError{ + Code: -32603, + Message: errQuorumUnobtainable.Error(), + }, + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + resp, err := cb.Wait(t.Context()) + assert.NoError(t, err) + var jsonResp jsonrpc.Response[json.RawMessage] + err = json.Unmarshal(resp.RawResponse, &jsonResp) + assert.NoError(t, err) + assert.Equal(t, "req-unobtainable", jsonResp.ID) + assert.NotNil(t, jsonResp.Error) + assert.Contains(t, jsonResp.Error.Message, "quorum unobtainable") + }() + + err := h.HandleJSONRPCUserMessage(t.Context(), req, cb) + require.NoError(t, err) + + err = h.HandleNodeMessage(t.Context(), &response, "0x0000") + require.NoError(t, err) + wg.Wait() +} + +func TestConfidentialRelayHandler_RequestTimeout(t *testing.T) { + h, cb, don, clock := setupHandler(t, 4) + don.On("SendToNode", mock.Anything, mock.Anything, mock.Anything).Return(nil) + // Use the real aggregator so responses are not immediately satisfied + h.aggregator = &aggregator{} + + params := json.RawMessage(`{"workflow_id":"wf1"}`) + req := jsonrpc.Request[json.RawMessage]{ + ID: "req-timeout", + Method: MethodCapabilityExec, + Params: ¶ms, + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + resp, err := cb.Wait(t.Context()) + assert.NoError(t, err) + assert.Equal(t, api.RequestTimeoutError, resp.ErrorCode) + }() + + err := h.HandleJSONRPCUserMessage(t.Context(), req, cb) + require.NoError(t, err) + + // Advance clock past the request timeout and trigger cleanup + clock.Advance(31 * time.Second) + h.removeExpiredRequests(t.Context()) + wg.Wait() +} + +func TestConfidentialRelayHandler_DuplicateRequestID(t *testing.T) { + h, cb, don, _ := setupHandler(t, 4) + don.On("SendToNode", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + params := json.RawMessage(`{"workflow_id":"wf1"}`) + req := jsonrpc.Request[json.RawMessage]{ + ID: "req-dup", + Method: MethodCapabilityExec, + Params: ¶ms, + } + + err := 
h.HandleJSONRPCUserMessage(t.Context(), req, cb) + require.NoError(t, err) + + cb2 := common.NewCallback() + err = h.HandleJSONRPCUserMessage(t.Context(), req, cb2) + require.ErrorContains(t, err, "request ID already exists") +} + +func TestConfidentialRelayHandler_RateLimitedNode(t *testing.T) { + handlerConfig := Config{ + RequestTimeoutSec: 30, + } + methodConfig, err := json.Marshal(handlerConfig) + require.NoError(t, err) + + lggr := logger.Test(t) + don := mocks.NewDON(t) + donConfig := &config.DONConfig{ + DonId: "test_relay_don", + F: 1, + Members: []config.NodeConfig{nodeOne}, + } + clock := clockwork.NewFakeClock() + limitsFactory := limits.Factory{Settings: cresettings.DefaultGetter, Logger: lggr} + h, err := NewHandler(methodConfig, donConfig, don, lggr, clock, limitsFactory) + require.NoError(t, err) + h.aggregator = &respondingMockAggregator{} + h.globalNodeRateLimiter = limits.GlobalRateLimiter(rate.Limit(100), 100) + h.perNodeRateLimiters[nodeOne.Address] = limits.GlobalRateLimiter(rate.Limit(0.001), 1) + + don.On("SendToNode", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + cb := common.NewCallback() + params := json.RawMessage(`{"workflow_id":"wf1"}`) + req := jsonrpc.Request[json.RawMessage]{ + ID: "req-ratelimit", + Method: MethodCapabilityExec, + Params: ¶ms, + } + + err = h.HandleJSONRPCUserMessage(t.Context(), req, cb) + require.NoError(t, err) + + resultData := json.RawMessage(`{"secrets":[]}`) + response := jsonrpc.Response[json.RawMessage]{ + Version: jsonrpc.JsonRpcVersion, + ID: "req-ratelimit", + Method: MethodCapabilityExec, + Result: &resultData, + } + + // First response from node uses the burst allowance + err = h.HandleNodeMessage(t.Context(), &response, nodeOne.Address) + require.NoError(t, err) + + // Verify callback was called + ctx, cancel := context.WithTimeout(t.Context(), 100*time.Millisecond) + defer cancel() + resp, err := cb.Wait(ctx) + require.NoError(t, err) + assert.Equal(t, api.NoError, resp.ErrorCode) + 
+ // Start a new request + cb2 := common.NewCallback() + req2 := jsonrpc.Request[json.RawMessage]{ + ID: "req-ratelimit-2", + Method: MethodCapabilityExec, + Params: ¶ms, + } + err = h.HandleJSONRPCUserMessage(t.Context(), req2, cb2) + require.NoError(t, err) + + response2 := jsonrpc.Response[json.RawMessage]{ + Version: jsonrpc.JsonRpcVersion, + ID: "req-ratelimit-2", + Method: MethodCapabilityExec, + Result: &resultData, + } + + // Second response should be rate limited (silently dropped) + err = h.HandleNodeMessage(t.Context(), &response2, nodeOne.Address) + require.NoError(t, err) + + // Callback should NOT be called - verify with timeout + ctx2, cancel2 := context.WithTimeout(t.Context(), 50*time.Millisecond) + defer cancel2() + _, err = cb2.Wait(ctx2) + require.Error(t, err) // Should timeout +} + +func TestConfidentialRelayHandler_LateNodeResponse(t *testing.T) { + h, cb, _, _ := setupHandler(t, 4) + + resultData := json.RawMessage(`{"secrets":[]}`) + staleResponse := jsonrpc.Response[json.RawMessage]{ + Version: jsonrpc.JsonRpcVersion, + ID: "nonexistent-request", + Method: MethodCapabilityExec, + Result: &resultData, + } + + // This should not error, just silently ignore + err := h.HandleNodeMessage(t.Context(), &staleResponse, "0x0000") + require.NoError(t, err) + + // Verify callback was not triggered + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Millisecond) + defer cancel() + _, err = cb.Wait(ctx) + require.Error(t, err) +} + +func TestConfidentialRelayHandler_AllNodesFanOutFail(t *testing.T) { + h, cb, don, _ := setupHandler(t, 4) + don.On("SendToNode", mock.Anything, mock.Anything, mock.Anything).Return(errors.New("connection refused")) + + params := json.RawMessage(`{"workflow_id":"wf1"}`) + req := jsonrpc.Request[json.RawMessage]{ + ID: "req-allfail", + Method: MethodCapabilityExec, + Params: ¶ms, + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + resp, err := cb.Wait(t.Context()) + assert.NoError(t, err) + 
assert.Equal(t, api.FatalError, resp.ErrorCode) + var jsonResp jsonrpc.Response[json.RawMessage] + err = json.Unmarshal(resp.RawResponse, &jsonResp) + assert.NoError(t, err) + assert.Contains(t, jsonResp.Error.Message, "failed to forward user request to nodes") + }() + + err := h.HandleJSONRPCUserMessage(t.Context(), req, cb) + require.NoError(t, err) + wg.Wait() +} + +func TestConfidentialRelayHandler_FanOutToNodes_IsConcurrent(t *testing.T) { + lggr := logger.Test(t) + don := newBarrierDON(2) + donConfig := &config.DONConfig{ + DonId: "test_relay_don", + F: 1, + Members: []config.NodeConfig{ + {Name: "node0", Address: "0x0000"}, + {Name: "node1", Address: "0x0001"}, + }, + } + + methodConfig, err := json.Marshal(Config{ + RequestTimeoutSec: 30, + }) + require.NoError(t, err) + + limitsFactory := limits.Factory{Settings: cresettings.DefaultGetter, Logger: lggr} + h, err := NewHandler(methodConfig, donConfig, don, lggr, clockwork.NewFakeClock(), limitsFactory) + require.NoError(t, err) + + cb := common.NewCallback() + params := json.RawMessage(`{"workflow_id":"wf1"}`) + req := jsonrpc.Request[json.RawMessage]{ + ID: "req-concurrent-fanout", + Method: MethodCapabilityExec, + Params: ¶ms, + } + + done := make(chan error, 1) + go func() { + done <- h.HandleJSONRPCUserMessage(t.Context(), req, cb) + }() + + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(100 * time.Millisecond): + don.forceRelease() + t.Fatal("HandleJSONRPCUserMessage did not fan out to nodes concurrently") + } + + don.mu.Lock() + started := don.started + don.mu.Unlock() + assert.Equal(t, 2, started) +} + +func TestConfidentialRelayHandler_CapabilityExecMethod(t *testing.T) { + h, cb, don, _ := setupHandler(t, 4) + h.aggregator = &respondingMockAggregator{} + don.On("SendToNode", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + params := json.RawMessage(`{"workflow_id":"wf1","capability_id":"cap1","payload":"data"}`) + req := jsonrpc.Request[json.RawMessage]{ + 
ID: "req-cap", + Method: MethodCapabilityExec, + Params: ¶ms, + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + resp, err := cb.Wait(t.Context()) + assert.NoError(t, err) + assert.Equal(t, api.NoError, resp.ErrorCode) + }() + + err := h.HandleJSONRPCUserMessage(t.Context(), req, cb) + require.NoError(t, err) + + resultData := json.RawMessage(`{"payload":"result"}`) + response := jsonrpc.Response[json.RawMessage]{ + Version: jsonrpc.JsonRpcVersion, + ID: "req-cap", + Method: MethodCapabilityExec, + Result: &resultData, + } + err = h.HandleNodeMessage(t.Context(), &response, "0x0000") + require.NoError(t, err) + wg.Wait() + don.AssertCalled(t, "SendToNode", mock.Anything, mock.Anything, mock.Anything) +} diff --git a/core/services/job/models.go b/core/services/job/models.go index 20e7da2c643..2d257a33ca5 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -930,6 +930,7 @@ type WorkflowSpec struct { CreatedAt time.Time `toml:"-"` UpdatedAt time.Time `toml:"-"` SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"` + Attributes []byte `db:"attributes"` sdkWorkflow *sdk.WorkflowSpec rawSpec []byte config []byte diff --git a/core/services/standardcapabilities/conversions/conversions.go b/core/services/standardcapabilities/conversions/conversions.go index b9a233243af..d623fa9a409 100644 --- a/core/services/standardcapabilities/conversions/conversions.go +++ b/core/services/standardcapabilities/conversions/conversions.go @@ -50,6 +50,8 @@ func GetCapabilityIDFromCommand(command string, config string) string { return "http-trigger@1.0.0-alpha" case "http_action": return "http-actions@1.0.0-alpha" // plural "actions" + case "mock": + return "mock@1.0.0" default: return "" } @@ -71,6 +73,8 @@ func GetCommandFromCapabilityID(capabilityID string) string { return "http_trigger" case strings.HasPrefix(capabilityID, "http-actions"): return "http_action" + case strings.HasPrefix(capabilityID, "mock"): + return "mock" default: 
return "" } diff --git a/core/services/workflows/artifacts/v2/orm.go b/core/services/workflows/artifacts/v2/orm.go index 7b37afda16b..6d8887dbe06 100644 --- a/core/services/workflows/artifacts/v2/orm.go +++ b/core/services/workflows/artifacts/v2/orm.go @@ -63,7 +63,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) config_url, created_at, updated_at, - spec_type + spec_type, + attributes ) VALUES ( :workflow, :config, @@ -76,7 +77,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) :config_url, :created_at, :updated_at, - :spec_type + :spec_type, + :attributes ) ON CONFLICT (workflow_id) DO UPDATE SET workflow = EXCLUDED.workflow, @@ -89,7 +91,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) config_url = EXCLUDED.config_url, created_at = EXCLUDED.created_at, updated_at = EXCLUDED.updated_at, - spec_type = EXCLUDED.spec_type + spec_type = EXCLUDED.spec_type, + attributes = EXCLUDED.attributes RETURNING id ` diff --git a/core/services/workflows/syncer/fetcher.go b/core/services/workflows/syncer/fetcher.go index c959b6b4da7..01de212507a 100644 --- a/core/services/workflows/syncer/fetcher.go +++ b/core/services/workflows/syncer/fetcher.go @@ -177,6 +177,9 @@ func newFileFetcher(basePath string, lggr logger.Logger) types.FetcherFunc { if err != nil { return nil, fmt.Errorf("invalid URL: %w", err) } + if u.Scheme == "http" || u.Scheme == "https" { + u.Path = filepath.Base(u.Path) + } fullPath := filepath.Clean(u.Path) // ensure that the incoming request URL is either relative or absolute but within the basePath diff --git a/core/services/workflows/syncer/v2/fetcher.go b/core/services/workflows/syncer/v2/fetcher.go index e86f998f6bb..cb334ff8fb8 100644 --- a/core/services/workflows/syncer/v2/fetcher.go +++ b/core/services/workflows/syncer/v2/fetcher.go @@ -211,6 +211,11 @@ func newFileFetcher(basePath string, lggr logger.Logger) types.FetcherFunc { if err != nil { return 
nil, fmt.Errorf("invalid URL: %w", err) } + // Confidential workflows register with HTTP URLs (for the enclave). + // Extract the filename so the file fetcher can find the local copy. + if u.Scheme == "http" || u.Scheme == "https" { + u.Path = filepath.Base(u.Path) + } fullPath := filepath.Clean(u.Path) // ensure that the incoming request URL is either relative or absolute but within the basePath diff --git a/core/services/workflows/syncer/v2/handler.go b/core/services/workflows/syncer/v2/handler.go index e78e2ec55bc..ba5ee8d3909 100644 --- a/core/services/workflows/syncer/v2/handler.go +++ b/core/services/workflows/syncer/v2/handler.go @@ -537,6 +537,7 @@ func (h *eventHandler) createWorkflowSpec(ctx context.Context, payload WorkflowR SpecType: job.WASMFile, BinaryURL: payload.BinaryURL, ConfigURL: payload.ConfigURL, + Attributes: payload.Attributes, } if _, err = h.workflowArtifactsStore.UpsertWorkflowSpec(ctx, entry); err != nil { @@ -628,55 +629,9 @@ func (h *eventHandler) engineFactoryFn(ctx context.Context, workflowID string, o } // V2 aka "NoDAG" - cfg := &v2.EngineConfig{ - Lggr: h.lggr, - Module: module, - WorkflowConfig: config, - CapRegistry: h.capRegistry, - DonSubscriber: h.workflowDonSubscriber, - UseLocalTimeProvider: h.useLocalTimeProvider, - DonTimeStore: h.donTimeStore, - ExecutionsStore: h.workflowStore, - WorkflowID: workflowID, - WorkflowOwner: owner, - WorkflowName: name, - WorkflowTag: tag, - WorkflowEncryptionKey: h.workflowEncryptionKey, - - LocalLimits: v2.EngineLimits{}, // all defaults - LocalLimiters: h.engineLimiters, - FeatureFlags: h.featureFlags, - GlobalExecutionConcurrencyLimiter: h.workflowLimits, - - BeholderEmitter: func() custmsg.MessageEmitter { - h.emitterMu.RLock() - defer h.emitterMu.RUnlock() - return h.emitter - }(), - BillingClient: h.billingClient, + cfg := h.newV2EngineConfig(module, workflowID, owner, tag, sdkName, name, config) - WorkflowRegistryAddress: h.workflowRegistryAddress, - WorkflowRegistryChainSelector: 
h.workflowRegistryChainSelector, - OrgResolver: h.orgResolver, - DebugMode: h.debugMode, - SecretsFetcher: h.secretsFetcher, - SdkName: sdkName, - } - - // Wire the initDone channel to the OnInitialized lifecycle hook. - // This will be called when the engine completes initialization (including trigger subscriptions). - // We compose with any existing hook to avoid overwriting test hooks or other user-provided hooks. - if initDone != nil { - existingHook := cfg.Hooks.OnInitialized - cfg.Hooks.OnInitialized = func(err error) { - // Signal completion to the handler first - initDone <- err - // Then call any existing hook (e.g., from tests) - if existingHook != nil { - existingHook(err) - } - } - } + h.wireInitDoneHook(cfg, initDone) return v2.NewEngine(cfg) } @@ -789,24 +744,35 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp return fmt.Errorf("invalid workflow name: %w", err) } + confidential, err := v2.IsConfidential(spec.Attributes) + if err != nil { + return fmt.Errorf("failed to parse workflow attributes: %w", err) + } + // Create a channel to receive the initialization result. // This allows us to wait for the engine to complete initialization (including trigger subscriptions) // before emitting the workflowActivated event, ensuring the event accurately reflects deployment status. 
initDone := make(chan error, 1) - - // Scope the engineFactory call so that decodedBinary goes out of scope immediately after the factory returns - engine, err := func() (services.Service, error) { - return h.engineFactory( - ctx, - spec.WorkflowID, - spec.WorkflowOwner, - workflowName, - spec.WorkflowTag, - configBytes, - decodedBinary, - initDone, - ) - }() + var engine services.Service + + if confidential { + h.lggr.Infow("routing workflow to confidential execution", "workflowID", spec.WorkflowID) + engine, err = h.confidentialEngineFactory(spec, workflowName, decodedBinary, initDone) + } else { + // Scope the engineFactory call so that decodedBinary goes out of scope immediately after the factory returns + engine, err = func() (services.Service, error) { + return h.engineFactory( + ctx, + spec.WorkflowID, + spec.WorkflowOwner, + workflowName, + spec.WorkflowTag, + configBytes, + decodedBinary, + initDone, + ) + }() + } if err != nil { return fmt.Errorf("failed to create workflow engine: %w", err) } @@ -863,6 +829,106 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp return nil } +// newV2EngineConfig builds the common EngineConfig shared by both the normal +// WASM engine and the confidential engine paths. Caller supplies the module. 
+func (h *eventHandler) newV2EngineConfig( + module host.ModuleV2, + workflowID, owner, tag, sdkName string, + name types.WorkflowName, + config []byte, +) *v2.EngineConfig { + return &v2.EngineConfig{ + Lggr: h.lggr, + Module: module, + WorkflowConfig: config, + CapRegistry: h.capRegistry, + DonSubscriber: h.workflowDonSubscriber, + UseLocalTimeProvider: h.useLocalTimeProvider, + DonTimeStore: h.donTimeStore, + ExecutionsStore: h.workflowStore, + WorkflowID: workflowID, + WorkflowOwner: owner, + WorkflowName: name, + WorkflowTag: tag, + WorkflowEncryptionKey: h.workflowEncryptionKey, + + LocalLimits: v2.EngineLimits{}, // all defaults + LocalLimiters: h.engineLimiters, + FeatureFlags: h.featureFlags, + GlobalExecutionConcurrencyLimiter: h.workflowLimits, + + BeholderEmitter: func() custmsg.MessageEmitter { + h.emitterMu.RLock() + defer h.emitterMu.RUnlock() + return h.emitter + }(), + BillingClient: h.billingClient, + + WorkflowRegistryAddress: h.workflowRegistryAddress, + WorkflowRegistryChainSelector: h.workflowRegistryChainSelector, + OrgResolver: h.orgResolver, + SecretsFetcher: h.secretsFetcher, + DebugMode: h.debugMode, + SdkName: sdkName, + } +} + +// wireInitDoneHook wires the initDone channel to the OnInitialized lifecycle hook. +// This will be called when the engine completes initialization (including trigger subscriptions). +// We compose with any existing hook to avoid overwriting test hooks or other user-provided hooks. +func (h *eventHandler) wireInitDoneHook(cfg *v2.EngineConfig, initDone chan<- error) { + if initDone == nil { + return + } + existingHook := cfg.Hooks.OnInitialized + cfg.Hooks.OnInitialized = func(err error) { + // Signal completion to the handler first + initDone <- err + // Then call any existing hook (e.g., from tests) + if existingHook != nil { + existingHook(err) + } + } +} + +// confidentialEngineFactory creates a V2 engine backed by a ConfidentialModule +// instead of a local WASM module. 
The ConfidentialModule delegates execution to +// the confidential-workflows capability which runs the WASM inside a TEE. +func (h *eventHandler) confidentialEngineFactory( + spec *job.WorkflowSpec, + workflowName types.WorkflowName, + decodedBinary []byte, + initDone chan<- error, +) (services.Service, error) { + attrs, err := v2.ParseWorkflowAttributes(spec.Attributes) + if err != nil { + return nil, fmt.Errorf("failed to parse workflow attributes: %w", err) + } + + binaryHash := v2.ComputeBinaryHash(decodedBinary) + + lggr := logger.Named(h.lggr, "WorkflowEngine.ConfidentialModule") + lggr = logger.With(lggr, "workflowID", spec.WorkflowID, "workflowName", spec.WorkflowName, "workflowOwner", spec.WorkflowOwner) + + // nil resolver: raw binaryURL is passed to the enclave as-is. + // PR 5/5 (#21642) wires this to the storage service retriever + // so the enclave receives a presigned URL. + module := v2.NewConfidentialModule( + h.capRegistry, + spec.BinaryURL, + binaryHash, + spec.WorkflowID, spec.WorkflowOwner, workflowName.String(), spec.WorkflowTag, + attrs.VaultDonSecrets, + nil, + lggr, + ) + + cfg := h.newV2EngineConfig(module, spec.WorkflowID, spec.WorkflowOwner, spec.WorkflowTag, "", workflowName, []byte(spec.Config)) + h.wireInitDoneHook(cfg, initDone) + + return v2.NewEngine(cfg) +} + // logCustMsg emits a custom message to the external sink and logs an error if that fails. 
func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log logger.Logger) { err := cma.Emit(ctx, msg) diff --git a/core/services/workflows/syncer/v2/handler_test.go b/core/services/workflows/syncer/v2/handler_test.go index ad7b80d06b9..aa0a66c6362 100644 --- a/core/services/workflows/syncer/v2/handler_test.go +++ b/core/services/workflows/syncer/v2/handler_test.go @@ -618,6 +618,264 @@ func Test_workflowRegisteredHandler(t *testing.T) { } } +func Test_workflowRegisteredHandler_confidentialRouting(t *testing.T) { + t.Run("confidential workflow bypasses engine factory and routes to confidential path", func(t *testing.T) { + var ( + ctx = testutils.Context(t) + lggr = logger.TestLogger(t) + lf = limits.Factory{Logger: lggr} + db = pgtest.NewSqlxDB(t) + orm = artifacts.NewWorkflowRegistryDS(db, lggr) + emitter = custmsg.NewLabeler() + + binary = wasmtest.CreateTestBinary(binaryCmd, true, t) + encodedBinary = []byte(base64.StdEncoding.EncodeToString(binary)) + config = []byte("") + wfOwner = []byte("0xOwner") + workflowEncryptionKey = workflowkey.MustNewXXXTestingOnly(big.NewInt(1)) + ) + + giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, "workflow-name", binary, config, "") + require.NoError(t, err) + wfIDString := hex.EncodeToString(giveWFID[:]) + + binaryURL := "http://example.com/" + wfIDString + "/binary" + configURL := "http://example.com/" + wfIDString + "/config" + signedURLParameter := "?auth=abc123" + signedBinaryURL := binaryURL + signedURLParameter + signedConfigURL := configURL + signedURLParameter + + fetcher := newMockFetcher(map[string]mockFetchResp{ + wfIDString + "-ARTIFACT_TYPE_BINARY": {Body: []byte(signedBinaryURL), Err: nil}, + wfIDString + "-ARTIFACT_TYPE_CONFIG": {Body: []byte(signedConfigURL), Err: nil}, + signedBinaryURL: {Body: encodedBinary, Err: nil}, + signedConfigURL: {Body: config, Err: nil}, + }) + artifactStore, err := artifacts.NewStore(lggr, orm, fetcher.FetcherFunc(), fetcher.RetrieverFunc(), 
clockwork.NewFakeClock(), workflowkey.Key{}, custmsg.NewLabeler(), lf, artifacts.WithConfig(artifacts.StoreConfig{ + ArtifactStorageHost: "example.com", + })) + require.NoError(t, err) + + er := NewEngineRegistry() + + // Track whether the engine factory is called. The confidential path + // should bypass it entirely. + factoryCalled := false + trackingFactory := func(ctx context.Context, wfid string, owner string, name types.WorkflowName, tag string, config []byte, binary []byte, initDone chan<- error) (services.Service, error) { + factoryCalled = true + if initDone != nil { + initDone <- nil + } + return &mockEngine{}, nil + } + + wfStore := store.NewInMemoryStore(lggr, clockwork.NewFakeClock()) + registry := capabilities.NewRegistry(lggr) + registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) + limiters, err := v2.NewLimiters(lf, nil) + require.NoError(t, err) + rl, err := ratelimiter.NewRateLimiter(rlConfig) + require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(lggr, syncerlimiter.Config{Global: 200, PerOwner: 200}, lf) + require.NoError(t, err) + + h, err := NewEventHandler(lggr, wfStore, nil, true, registry, er, emitter, limiters, nil, rl, workflowLimits, artifactStore, workflowEncryptionKey, &testDonNotifier{}, + WithEngineRegistry(er), + WithEngineFactoryFn(trackingFactory), + ) + require.NoError(t, err) + servicetest.Run(t, h) + + event := WorkflowRegisteredEvent{ + Status: WorkflowStatusActive, + WorkflowID: giveWFID, + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + WorkflowTag: "workflow-tag", + BinaryURL: binaryURL, + ConfigURL: configURL, + Attributes: []byte(`{"confidential":true,"vault_don_secrets":[{"key":"API_KEY"}]}`), + } + + ctx = contexts.WithCRE(ctx, contexts.CRE{Owner: hex.EncodeToString(wfOwner), Workflow: wfIDString}) + err = h.workflowRegisteredEvent(ctx, event) + + // The confidential path creates a real v2.Engine. With test data + // (non-hex owner), engine creation fails. 
The error comes from the + // confidential path, proving routing worked correctly. + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to create workflow engine") + + // The engine factory must NOT have been called; the confidential path + // bypasses it. + assert.False(t, factoryCalled, "engine factory should not be called for confidential workflows") + + // The engine should NOT be in the registry since init failed. + _, ok := er.Get(giveWFID) + assert.False(t, ok, "engine should not be registered after failed init") + }) + + t.Run("non-confidential workflow uses engine factory", func(t *testing.T) { + var ( + ctx = testutils.Context(t) + lggr = logger.TestLogger(t) + lf = limits.Factory{Logger: lggr} + db = pgtest.NewSqlxDB(t) + orm = artifacts.NewWorkflowRegistryDS(db, lggr) + emitter = custmsg.NewLabeler() + + binary = wasmtest.CreateTestBinary(binaryCmd, true, t) + encodedBinary = []byte(base64.StdEncoding.EncodeToString(binary)) + config = []byte("") + wfOwner = []byte("0xOwner") + workflowEncryptionKey = workflowkey.MustNewXXXTestingOnly(big.NewInt(1)) + ) + + giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, "workflow-name", binary, config, "") + require.NoError(t, err) + wfIDString := hex.EncodeToString(giveWFID[:]) + + binaryURL := "http://example.com/" + wfIDString + "/binary" + configURL := "http://example.com/" + wfIDString + "/config" + signedURLParameter := "?auth=abc123" + signedBinaryURL := binaryURL + signedURLParameter + signedConfigURL := configURL + signedURLParameter + + fetcher := newMockFetcher(map[string]mockFetchResp{ + wfIDString + "-ARTIFACT_TYPE_BINARY": {Body: []byte(signedBinaryURL), Err: nil}, + wfIDString + "-ARTIFACT_TYPE_CONFIG": {Body: []byte(signedConfigURL), Err: nil}, + signedBinaryURL: {Body: encodedBinary, Err: nil}, + signedConfigURL: {Body: config, Err: nil}, + }) + artifactStore, err := artifacts.NewStore(lggr, orm, fetcher.FetcherFunc(), fetcher.RetrieverFunc(), clockwork.NewFakeClock(), 
workflowkey.Key{}, custmsg.NewLabeler(), lf, artifacts.WithConfig(artifacts.StoreConfig{ + ArtifactStorageHost: "example.com", + })) + require.NoError(t, err) + + er := NewEngineRegistry() + + factoryCalled := false + trackingFactory := func(ctx context.Context, wfid string, owner string, name types.WorkflowName, tag string, config []byte, binary []byte, initDone chan<- error) (services.Service, error) { + factoryCalled = true + if initDone != nil { + initDone <- nil + } + return &mockEngine{}, nil + } + + wfStore := store.NewInMemoryStore(lggr, clockwork.NewFakeClock()) + registry := capabilities.NewRegistry(lggr) + registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) + limiters, err := v2.NewLimiters(lf, nil) + require.NoError(t, err) + rl, err := ratelimiter.NewRateLimiter(rlConfig) + require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(lggr, syncerlimiter.Config{Global: 200, PerOwner: 200}, lf) + require.NoError(t, err) + + h, err := NewEventHandler(lggr, wfStore, nil, true, registry, er, emitter, limiters, nil, rl, workflowLimits, artifactStore, workflowEncryptionKey, &testDonNotifier{}, + WithEngineRegistry(er), + WithEngineFactoryFn(trackingFactory), + ) + require.NoError(t, err) + servicetest.Run(t, h) + + event := WorkflowRegisteredEvent{ + Status: WorkflowStatusActive, + WorkflowID: giveWFID, + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + WorkflowTag: "workflow-tag", + BinaryURL: binaryURL, + ConfigURL: configURL, + // No Attributes, or non-confidential attributes. 
+ } + + ctx = contexts.WithCRE(ctx, contexts.CRE{Owner: hex.EncodeToString(wfOwner), Workflow: wfIDString}) + err = h.workflowRegisteredEvent(ctx, event) + require.NoError(t, err) + + assert.True(t, factoryCalled, "engine factory should be called for non-confidential workflows") + + engine, ok := er.Get(giveWFID) + require.True(t, ok, "engine should be registered") + require.NoError(t, engine.Ready()) + }) + + t.Run("malformed attributes returns error", func(t *testing.T) { + var ( + ctx = testutils.Context(t) + lggr = logger.TestLogger(t) + lf = limits.Factory{Logger: lggr} + db = pgtest.NewSqlxDB(t) + orm = artifacts.NewWorkflowRegistryDS(db, lggr) + emitter = custmsg.NewLabeler() + + binary = wasmtest.CreateTestBinary(binaryCmd, true, t) + encodedBinary = []byte(base64.StdEncoding.EncodeToString(binary)) + config = []byte("") + wfOwner = []byte("0xOwner") + workflowEncryptionKey = workflowkey.MustNewXXXTestingOnly(big.NewInt(1)) + ) + + giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, "workflow-name", binary, config, "") + require.NoError(t, err) + wfIDString := hex.EncodeToString(giveWFID[:]) + + binaryURL := "http://example.com/" + wfIDString + "/binary" + configURL := "http://example.com/" + wfIDString + "/config" + signedURLParameter := "?auth=abc123" + signedBinaryURL := binaryURL + signedURLParameter + signedConfigURL := configURL + signedURLParameter + + fetcher := newMockFetcher(map[string]mockFetchResp{ + wfIDString + "-ARTIFACT_TYPE_BINARY": {Body: []byte(signedBinaryURL), Err: nil}, + wfIDString + "-ARTIFACT_TYPE_CONFIG": {Body: []byte(signedConfigURL), Err: nil}, + signedBinaryURL: {Body: encodedBinary, Err: nil}, + signedConfigURL: {Body: config, Err: nil}, + }) + artifactStore, err := artifacts.NewStore(lggr, orm, fetcher.FetcherFunc(), fetcher.RetrieverFunc(), clockwork.NewFakeClock(), workflowkey.Key{}, custmsg.NewLabeler(), lf, artifacts.WithConfig(artifacts.StoreConfig{ + ArtifactStorageHost: "example.com", + })) + require.NoError(t, 
err) + + er := NewEngineRegistry() + wfStore := store.NewInMemoryStore(lggr, clockwork.NewFakeClock()) + registry := capabilities.NewRegistry(lggr) + registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) + limiters, err := v2.NewLimiters(lf, nil) + require.NoError(t, err) + rl, err := ratelimiter.NewRateLimiter(rlConfig) + require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(lggr, syncerlimiter.Config{Global: 200, PerOwner: 200}, lf) + require.NoError(t, err) + + h, err := NewEventHandler(lggr, wfStore, nil, true, registry, er, emitter, limiters, nil, rl, workflowLimits, artifactStore, workflowEncryptionKey, &testDonNotifier{}, + WithEngineRegistry(er), + WithEngineFactoryFn(mockEngineFactory), + ) + require.NoError(t, err) + servicetest.Run(t, h) + + event := WorkflowRegisteredEvent{ + Status: WorkflowStatusActive, + WorkflowID: giveWFID, + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + WorkflowTag: "workflow-tag", + BinaryURL: binaryURL, + ConfigURL: configURL, + Attributes: []byte(`{not valid json`), + } + + ctx = contexts.WithCRE(ctx, contexts.CRE{Owner: hex.EncodeToString(wfOwner), Workflow: wfIDString}) + err = h.workflowRegisteredEvent(ctx, event) + + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to parse workflow attributes") + }) +} + type testCase struct { Name string BinaryURLFactory func(string) string diff --git a/core/services/workflows/v2/confidential_module.go b/core/services/workflows/v2/confidential_module.go new file mode 100644 index 00000000000..7a1983d2a27 --- /dev/null +++ b/core/services/workflows/v2/confidential_module.go @@ -0,0 +1,204 @@ +package v2 + +import ( + "context" + "crypto/sha256" + "encoding/json" + "errors" + "fmt" + + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + 
"github.com/smartcontractkit/chainlink-common/pkg/types/core" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" + + confworkflowtypes "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/actions/confidentialworkflow" + sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" +) + +const confidentialWorkflowsCapabilityID = "confidential-workflows@1.0.0-alpha" + +// WorkflowAttributes is the JSON structure stored in WorkflowSpec.Attributes. +type WorkflowAttributes struct { + Confidential bool `json:"confidential"` + VaultDonSecrets []SecretIdentifier `json:"vault_don_secrets"` +} + +// SecretIdentifier identifies a secret in VaultDON. +type SecretIdentifier struct { + Key string `json:"key"` + Namespace string `json:"namespace,omitempty"` +} + +// ParseWorkflowAttributes parses the Attributes JSON from a WorkflowSpec. +// Returns a zero-value struct if data is nil or empty. +func ParseWorkflowAttributes(data []byte) (WorkflowAttributes, error) { + var attrs WorkflowAttributes + if len(data) == 0 { + return attrs, nil + } + if err := json.Unmarshal(data, &attrs); err != nil { + return attrs, fmt.Errorf("failed to parse workflow attributes: %w", err) + } + return attrs, nil +} + +// IsConfidential returns true if the Attributes JSON has "confidential": true. +// Returns an error if the attributes contain malformed JSON, so callers can +// fail loudly rather than silently falling through to non-confidential execution. +func IsConfidential(data []byte) (bool, error) { + attrs, err := ParseWorkflowAttributes(data) + if err != nil { + return false, err + } + return attrs.Confidential, nil +} + +// BinaryURLResolver resolves a raw binary URL into an ephemeral/presigned +// URL that the enclave can fetch without authentication. In production this +// calls the CRE storage service; nil means the raw URL is used as-is. +// PR 5/5 (#21642) wires this to the storage service retriever. 
+type BinaryURLResolver func(ctx context.Context, workflowID string) (string, error) + +// ConfidentialModule implements host.ModuleV2 for confidential workflows. +// Instead of running WASM locally, it delegates execution to the +// confidential-workflows capability via the CapabilitiesRegistry. +type ConfidentialModule struct { + capRegistry core.CapabilitiesRegistry + binaryURL string + binaryHash []byte + workflowID string + workflowOwner string + workflowName string + workflowTag string + vaultDonSecrets []SecretIdentifier + binaryURLResolver BinaryURLResolver + lggr logger.Logger +} + +var _ host.ModuleV2 = (*ConfidentialModule)(nil) + +func NewConfidentialModule( + capRegistry core.CapabilitiesRegistry, + binaryURL string, + binaryHash []byte, + workflowID, workflowOwner, workflowName, workflowTag string, + vaultDonSecrets []SecretIdentifier, + binaryURLResolver BinaryURLResolver, + lggr logger.Logger, +) *ConfidentialModule { + return &ConfidentialModule{ + capRegistry: capRegistry, + binaryURL: binaryURL, + binaryHash: binaryHash, + workflowID: workflowID, + workflowOwner: workflowOwner, + workflowName: workflowName, + workflowTag: workflowTag, + vaultDonSecrets: vaultDonSecrets, + binaryURLResolver: binaryURLResolver, + lggr: lggr, + } +} + +func (m *ConfidentialModule) Start() {} +func (m *ConfidentialModule) Close() {} +func (m *ConfidentialModule) IsLegacyDAG() bool { return false } + +func (m *ConfidentialModule) Execute( + ctx context.Context, + request *sdkpb.ExecuteRequest, + helper host.ExecutionHelper, +) (*sdkpb.ExecutionResult, error) { + execReqBytes, err := proto.Marshal(request) + if err != nil { + return nil, fmt.Errorf("failed to marshal ExecuteRequest: %w", err) + } + + protoSecrets := make([]*confworkflowtypes.SecretIdentifier, len(m.vaultDonSecrets)) + for i, s := range m.vaultDonSecrets { + // VaultDON treats "main" as the default namespace for secrets. 
+ ns := s.Namespace + if ns == "" { + ns = "main" + } + protoSecrets[i] = &confworkflowtypes.SecretIdentifier{ + Key: s.Key, + Namespace: &ns, + } + } + + binaryURL := m.binaryURL + if m.binaryURLResolver != nil { + resolved, resolveErr := m.binaryURLResolver(ctx, m.workflowID) + if resolveErr != nil { + return nil, fmt.Errorf("failed to resolve binary URL: %w", resolveErr) + } + binaryURL = resolved + } + + capInput := &confworkflowtypes.ConfidentialWorkflowRequest{ + VaultDonSecrets: protoSecrets, + Execution: &confworkflowtypes.WorkflowExecution{ + WorkflowId: m.workflowID, + BinaryUrl: binaryURL, + BinaryHash: m.binaryHash, + ExecuteRequest: execReqBytes, + Owner: m.workflowOwner, + ExecutionId: helper.GetWorkflowExecutionID(), + }, + } + + payload, err := anypb.New(capInput) + if err != nil { + return nil, fmt.Errorf("failed to marshal capability payload: %w", err) + } + + executable, err := m.capRegistry.GetExecutable(ctx, confidentialWorkflowsCapabilityID) + if err != nil { + return nil, fmt.Errorf("failed to get confidential-workflows capability: %w", err) + } + + capReq := capabilities.CapabilityRequest{ + Payload: payload, + Method: "Execute", + CapabilityId: confidentialWorkflowsCapabilityID, + Metadata: capabilities.RequestMetadata{ + WorkflowID: m.workflowID, + WorkflowOwner: m.workflowOwner, + WorkflowName: m.workflowName, + WorkflowTag: m.workflowTag, + WorkflowExecutionID: helper.GetWorkflowExecutionID(), + }, + } + + capResp, err := executable.Execute(ctx, capReq) + if err != nil { + return nil, fmt.Errorf("confidential-workflows capability execution failed: %w", err) + } + + if capResp.Payload == nil { + return nil, errors.New("confidential-workflows capability returned nil payload") + } + + var confResp confworkflowtypes.ConfidentialWorkflowResponse + if err := capResp.Payload.UnmarshalTo(&confResp); err != nil { + return nil, fmt.Errorf("failed to unmarshal capability response: %w", err) + } + + var result sdkpb.ExecutionResult + if err := 
proto.Unmarshal(confResp.ExecutionResult, &result); err != nil { + return nil, fmt.Errorf("failed to unmarshal ExecutionResult: %w", err) + } + + return &result, nil +} + +// ComputeBinaryHash returns the SHA-256 hash of the given binary. +func ComputeBinaryHash(binary []byte) []byte { + h := sha256.Sum256(binary) + return h[:] +} diff --git a/core/services/workflows/v2/confidential_module_test.go b/core/services/workflows/v2/confidential_module_test.go new file mode 100644 index 00000000000..e72d7164b29 --- /dev/null +++ b/core/services/workflows/v2/confidential_module_test.go @@ -0,0 +1,440 @@ +package v2 + +import ( + "context" + "crypto/sha256" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + regmocks "github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" + + confworkflowtypes "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/actions/confidentialworkflow" + capmocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/mocks" + "github.com/smartcontractkit/chainlink/v2/core/utils/matches" + + sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" + valuespb "github.com/smartcontractkit/chainlink-protos/cre/go/values/pb" +) + +// stubExecutionHelper implements host.ExecutionHelper for testing. 
+type stubExecutionHelper struct { + executionID string +} + +var _ host.ExecutionHelper = (*stubExecutionHelper)(nil) + +func (s *stubExecutionHelper) CallCapability(context.Context, *sdkpb.CapabilityRequest) (*sdkpb.CapabilityResponse, error) { + return nil, nil +} +func (s *stubExecutionHelper) GetSecrets(context.Context, *sdkpb.GetSecretsRequest) ([]*sdkpb.SecretResponse, error) { + return nil, nil +} +func (s *stubExecutionHelper) GetWorkflowExecutionID() string { return s.executionID } +func (s *stubExecutionHelper) GetNodeTime() time.Time { return time.Time{} } +func (s *stubExecutionHelper) GetDONTime() (time.Time, error) { return time.Time{}, nil } +func (s *stubExecutionHelper) EmitUserLog(string) error { return nil } + +func TestParseWorkflowAttributes(t *testing.T) { + t.Run("valid JSON with all fields", func(t *testing.T) { + data := []byte(`{"confidential":true,"vault_don_secrets":[{"key":"API_KEY"},{"key":"SIGNING_KEY","namespace":"custom-ns"}]}`) + attrs, err := ParseWorkflowAttributes(data) + require.NoError(t, err) + assert.True(t, attrs.Confidential) + require.Len(t, attrs.VaultDonSecrets, 2) + assert.Equal(t, "API_KEY", attrs.VaultDonSecrets[0].Key) + assert.Empty(t, attrs.VaultDonSecrets[0].Namespace) + assert.Equal(t, "SIGNING_KEY", attrs.VaultDonSecrets[1].Key) + assert.Equal(t, "custom-ns", attrs.VaultDonSecrets[1].Namespace) + }) + + t.Run("empty data returns zero value", func(t *testing.T) { + attrs, err := ParseWorkflowAttributes(nil) + require.NoError(t, err) + assert.False(t, attrs.Confidential) + assert.Nil(t, attrs.VaultDonSecrets) + + attrs, err = ParseWorkflowAttributes([]byte{}) + require.NoError(t, err) + assert.False(t, attrs.Confidential) + }) + + t.Run("non-confidential workflow", func(t *testing.T) { + data := []byte(`{"confidential":false}`) + attrs, err := ParseWorkflowAttributes(data) + require.NoError(t, err) + assert.False(t, attrs.Confidential) + }) + + t.Run("malformed JSON returns error", func(t *testing.T) { + _, err 
:= ParseWorkflowAttributes([]byte(`{not json}`)) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to parse workflow attributes") + }) +} + +func TestIsConfidential(t *testing.T) { + t.Run("returns true for confidential", func(t *testing.T) { + ok, err := IsConfidential([]byte(`{"confidential":true}`)) + require.NoError(t, err) + assert.True(t, ok) + }) + + t.Run("returns false for non-confidential", func(t *testing.T) { + ok, err := IsConfidential([]byte(`{"confidential":false}`)) + require.NoError(t, err) + assert.False(t, ok) + }) + + t.Run("returns false for empty data", func(t *testing.T) { + ok, err := IsConfidential(nil) + require.NoError(t, err) + assert.False(t, ok) + }) + + t.Run("returns error for malformed JSON", func(t *testing.T) { + _, err := IsConfidential([]byte(`broken`)) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to parse workflow attributes") + }) +} + +func TestComputeBinaryHash(t *testing.T) { + binary := []byte("hello world") + hash := ComputeBinaryHash(binary) + expected := sha256.Sum256(binary) + assert.Equal(t, expected[:], hash) + + // Deterministic: same input produces same hash. + assert.Equal(t, hash, ComputeBinaryHash(binary)) +} + +func TestConfidentialModule_Execute(t *testing.T) { + ctx := context.Background() + lggr := logger.Nop() + + // Build an ExecuteRequest to send through the module. + execReq := &sdkpb.ExecuteRequest{ + Config: []byte("test-config"), + } + + // Build the expected ExecutionResult that the enclave returns. + expectedResult := &sdkpb.ExecutionResult{ + Result: &sdkpb.ExecutionResult_Value{ + Value: valuespb.NewStringValue("enclave-output"), + }, + } + + // Serialize the result into a ConfidentialWorkflowResponse, as the capability would. 
+ resultBytes, err := proto.Marshal(expectedResult) + require.NoError(t, err) + + confResp := &confworkflowtypes.ConfidentialWorkflowResponse{ + ExecutionResult: resultBytes, + } + respPayload, err := anypb.New(confResp) + require.NoError(t, err) + + t.Run("success", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + + execCap.EXPECT().Execute(matches.AnyContext, mock.MatchedBy(func(req capabilities.CapabilityRequest) bool { + return req.Method == "Execute" && + req.CapabilityId == confidentialWorkflowsCapabilityID && + req.Metadata.WorkflowID == "wf-123" && + req.Metadata.WorkflowOwner == "owner-abc" && + req.Metadata.WorkflowExecutionID == "exec-456" && + req.Payload != nil + })).Return(capabilities.CapabilityResponse{Payload: respPayload}, nil).Once() + + mod := NewConfidentialModule( + capReg, + "https://example.com/binary.wasm", + []byte("fakehash"), + "wf-123", + "owner-abc", + "my-workflow", + "v1", + []SecretIdentifier{ + {Key: "API_KEY"}, + {Key: "SIGNING_KEY", Namespace: "custom-ns"}, + }, + nil, + lggr, + ) + + result, err := mod.Execute(ctx, execReq, &stubExecutionHelper{executionID: "exec-456"}) + require.NoError(t, err) + require.NotNil(t, result) + + val := result.GetValue() + require.NotNil(t, val) + assert.Equal(t, "enclave-output", val.GetStringValue()) + }) + + t.Run("default namespace is main", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + + // Capture the request to inspect proto secrets. + var capturedReq capabilities.CapabilityRequest + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). 
+ Run(func(_ context.Context, req capabilities.CapabilityRequest) { + capturedReq = req + }). + Return(capabilities.CapabilityResponse{Payload: respPayload}, nil).Once() + + mod := NewConfidentialModule( + capReg, + "https://example.com/binary.wasm", + []byte("hash"), + "wf-1", "owner", "name", "tag", + []SecretIdentifier{{Key: "SECRET_A"}}, // no namespace + nil, + lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{executionID: "exec-1"}) + require.NoError(t, err) + + // Unmarshal the captured request payload and verify namespace defaulted to "main". + var confReq confworkflowtypes.ConfidentialWorkflowRequest + require.NoError(t, capturedReq.Payload.UnmarshalTo(&confReq)) + require.Len(t, confReq.VaultDonSecrets, 1) + assert.Equal(t, "SECRET_A", confReq.VaultDonSecrets[0].Key) + assert.Equal(t, "main", confReq.VaultDonSecrets[0].GetNamespace()) + }) + + t.Run("GetExecutable error", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(nil, errors.New("capability not found")).Once() + + mod := NewConfidentialModule( + capReg, "", nil, "wf", "owner", "name", "tag", nil, nil, lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{}) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to get confidential-workflows capability") + }) + + t.Run("capability Execute error", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). 
+ Return(capabilities.CapabilityResponse{}, errors.New("enclave unavailable")).Once() + + mod := NewConfidentialModule( + capReg, "", nil, "wf", "owner", "name", "tag", nil, nil, lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{}) + require.Error(t, err) + assert.Contains(t, err.Error(), "confidential-workflows capability execution failed") + }) + + t.Run("nil payload in response", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Return(capabilities.CapabilityResponse{Payload: nil}, nil).Once() + + mod := NewConfidentialModule( + capReg, "", nil, "wf", "owner", "name", "tag", nil, nil, lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{}) + require.Error(t, err) + assert.Contains(t, err.Error(), "returned nil payload") + }) + + t.Run("request fields are forwarded correctly", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + + var capturedReq capabilities.CapabilityRequest + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Run(func(_ context.Context, req capabilities.CapabilityRequest) { + capturedReq = req + }). 
+ Return(capabilities.CapabilityResponse{Payload: respPayload}, nil).Once() + + binaryHash := ComputeBinaryHash([]byte("some-binary")) + mod := NewConfidentialModule( + capReg, + "https://example.com/wasm", + binaryHash, + "wf-abc", + "0xowner", + "my-workflow", + "v2", + []SecretIdentifier{ + {Key: "K1", Namespace: "ns1"}, + {Key: "K2"}, + }, + nil, + lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{executionID: "exec-xyz"}) + require.NoError(t, err) + + // Verify metadata. + assert.Equal(t, "Execute", capturedReq.Method) + assert.Equal(t, confidentialWorkflowsCapabilityID, capturedReq.CapabilityId) + assert.Equal(t, "wf-abc", capturedReq.Metadata.WorkflowID) + assert.Equal(t, "0xowner", capturedReq.Metadata.WorkflowOwner) + assert.Equal(t, "my-workflow", capturedReq.Metadata.WorkflowName) + assert.Equal(t, "v2", capturedReq.Metadata.WorkflowTag) + assert.Equal(t, "exec-xyz", capturedReq.Metadata.WorkflowExecutionID) + + // Verify payload contents. + var confReq confworkflowtypes.ConfidentialWorkflowRequest + require.NoError(t, capturedReq.Payload.UnmarshalTo(&confReq)) + + assert.Equal(t, "wf-abc", confReq.Execution.WorkflowId) + assert.Equal(t, "https://example.com/wasm", confReq.Execution.BinaryUrl) + assert.Equal(t, binaryHash, confReq.Execution.BinaryHash) + + // Verify the serialized ExecuteRequest round-trips. + var roundTripped sdkpb.ExecuteRequest + require.NoError(t, proto.Unmarshal(confReq.Execution.ExecuteRequest, &roundTripped)) + assert.Equal(t, execReq.GetConfig(), roundTripped.GetConfig()) + + // Verify secrets. 
+ require.Len(t, confReq.VaultDonSecrets, 2) + assert.Equal(t, "K1", confReq.VaultDonSecrets[0].Key) + assert.Equal(t, "ns1", confReq.VaultDonSecrets[0].GetNamespace()) + assert.Equal(t, "K2", confReq.VaultDonSecrets[1].Key) + assert.Equal(t, "main", confReq.VaultDonSecrets[1].GetNamespace()) + }) +} + +func TestConfidentialModule_BinaryURLResolver(t *testing.T) { + ctx := context.Background() + lggr := logger.Nop() + + execReq := &sdkpb.ExecuteRequest{Config: []byte("cfg")} + + expectedResult := &sdkpb.ExecutionResult{ + Result: &sdkpb.ExecutionResult_Value{ + Value: valuespb.NewStringValue("ok"), + }, + } + resultBytes, err := proto.Marshal(expectedResult) + require.NoError(t, err) + confResp := &confworkflowtypes.ConfidentialWorkflowResponse{ExecutionResult: resultBytes} + respPayload, err := anypb.New(confResp) + require.NoError(t, err) + + t.Run("resolver replaces binary URL", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + + var capturedReq capabilities.CapabilityRequest + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Run(func(_ context.Context, req capabilities.CapabilityRequest) { + capturedReq = req + }). 
+ Return(capabilities.CapabilityResponse{Payload: respPayload}, nil).Once() + + resolver := func(_ context.Context, wfID string) (string, error) { + return "https://presigned.example.com/" + wfID + "?token=abc", nil + } + + mod := NewConfidentialModule( + capReg, "https://storage.example.com/raw", []byte("hash"), + "wf-1", "owner", "name", "tag", nil, resolver, lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{executionID: "exec-1"}) + require.NoError(t, err) + + var confReq confworkflowtypes.ConfidentialWorkflowRequest + require.NoError(t, capturedReq.Payload.UnmarshalTo(&confReq)) + assert.Equal(t, "https://presigned.example.com/wf-1?token=abc", confReq.Execution.BinaryUrl) + }) + + t.Run("nil resolver uses raw URL", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + + var capturedReq capabilities.CapabilityRequest + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Run(func(_ context.Context, req capabilities.CapabilityRequest) { + capturedReq = req + }). 
+ Return(capabilities.CapabilityResponse{Payload: respPayload}, nil).Once() + + mod := NewConfidentialModule( + capReg, "https://storage.example.com/raw", []byte("hash"), + "wf-1", "owner", "name", "tag", nil, nil, lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{executionID: "exec-1"}) + require.NoError(t, err) + + var confReq confworkflowtypes.ConfidentialWorkflowRequest + require.NoError(t, capturedReq.Payload.UnmarshalTo(&confReq)) + assert.Equal(t, "https://storage.example.com/raw", confReq.Execution.BinaryUrl) + }) + + t.Run("resolver error propagates", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + + resolver := func(_ context.Context, _ string) (string, error) { + return "", errors.New("storage service unavailable") + } + + mod := NewConfidentialModule( + capReg, "https://storage.example.com/raw", []byte("hash"), + "wf-1", "owner", "name", "tag", nil, resolver, lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{executionID: "exec-1"}) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to resolve binary URL") + assert.Contains(t, err.Error(), "storage service unavailable") + }) +} + +func TestConfidentialModule_InterfaceMethods(t *testing.T) { + mod := &ConfidentialModule{} + + // These are no-ops but should not panic. 
+ mod.Start() + mod.Close() + assert.False(t, mod.IsLegacyDAG()) +} diff --git a/core/store/migrate/migrations/0295_add_workflow_attributes_column.sql b/core/store/migrate/migrations/0295_add_workflow_attributes_column.sql new file mode 100644 index 00000000000..30c85db7289 --- /dev/null +++ b/core/store/migrate/migrations/0295_add_workflow_attributes_column.sql @@ -0,0 +1,5 @@ +-- +goose Up +ALTER TABLE workflow_specs_v2 ADD COLUMN attributes bytea DEFAULT ''; + +-- +goose Down +ALTER TABLE workflow_specs_v2 DROP COLUMN attributes; diff --git a/deployment/cre/jobs/pkg/gateway_job.go b/deployment/cre/jobs/pkg/gateway_job.go index 950ebc79b41..3bc6828cac1 100644 --- a/deployment/cre/jobs/pkg/gateway_job.go +++ b/deployment/cre/jobs/pkg/gateway_job.go @@ -14,9 +14,11 @@ const ( GatewayHandlerTypeWebAPICapabilities = "web-api-capabilities" GatewayHandlerTypeHTTPCapabilities = "http-capabilities" GatewayHandlerTypeVault = "vault" + GatewayHandlerTypeConfidentialRelay = "confidential-compute-relay" - ServiceNameWorkflows = "workflows" - ServiceNameVault = "vault" + ServiceNameWorkflows = "workflows" + ServiceNameVault = "vault" + ServiceNameConfidential = "confidential" minimumRequestTimeoutSec = 5 ) @@ -28,6 +30,8 @@ func HandlerServiceName(handlerType string) string { return ServiceNameVault case GatewayHandlerTypeHTTPCapabilities, GatewayHandlerTypeWebAPICapabilities: return ServiceNameWorkflows + case GatewayHandlerTypeConfidentialRelay: + return ServiceNameConfidential default: return handlerType } @@ -226,6 +230,10 @@ func (g GatewayJob) buildLegacyDons() ([]legacyDON, error) { hs = append(hs, newDefaultVaultHandler(g.RequestTimeoutSec)) case GatewayHandlerTypeHTTPCapabilities: hs = append(hs, newDefaultHTTPCapabilitiesHandler()) + case GatewayHandlerTypeConfidentialRelay: + // -1 so the handler times out before the gateway, allowing a clean error response. + // TODO: the vault handler does the same -1 internally; unify both to use this pattern. 
+ hs = append(hs, newDefaultConfidentialRelayHandler(g.RequestTimeoutSec-1)) default: return nil, errors.New("unknown handler type: " + ht) } @@ -266,6 +274,9 @@ func (g GatewayJob) buildServicesAndShardedDONs() ([]shardedDON, []service, erro handlers = append(handlers, newDefaultVaultHandler(g.RequestTimeoutSec)) case GatewayHandlerTypeHTTPCapabilities: handlers = append(handlers, newDefaultHTTPCapabilitiesHandler()) + case GatewayHandlerTypeConfidentialRelay: + // -1 so the handler times out before the gateway, allowing a clean error response. + handlers = append(handlers, newDefaultConfidentialRelayHandler(g.RequestTimeoutSec-1)) default: return nil, nil, errors.New("unknown handler type: " + ht) } @@ -307,8 +318,8 @@ type vaultHandlerConfig struct { func newDefaultVaultHandler(requestTimeoutSec int) handler { return handler{ - Name: "vault", - ServiceName: "vault", + Name: GatewayHandlerTypeVault, + ServiceName: ServiceNameVault, Config: vaultHandlerConfig{ // must be lower than the overall gateway request timeout. // so we allow for the response to be sent back. 
@@ -432,7 +443,7 @@ type httpCapabilitiesHandlerConfig struct { func newDefaultHTTPCapabilitiesHandler() handler { return handler{ Name: GatewayHandlerTypeHTTPCapabilities, - ServiceName: "workflows", + ServiceName: ServiceNameWorkflows, Config: httpCapabilitiesHandlerConfig{ CleanUpPeriodMs: 10 * 60 * 1000, // 10 minutes NodeRateLimiter: nodeRateLimiterConfig{ @@ -444,3 +455,17 @@ func newDefaultHTTPCapabilitiesHandler() handler { }, } } + +type confidentialRelayHandlerConfig struct { + RequestTimeoutSec int `toml:"requestTimeoutSec"` +} + +func newDefaultConfidentialRelayHandler(requestTimeoutSec int) handler { + return handler{ + Name: GatewayHandlerTypeConfidentialRelay, + ServiceName: ServiceNameConfidential, + Config: confidentialRelayHandlerConfig{ + RequestTimeoutSec: requestTimeoutSec, + }, + } +} diff --git a/deployment/cre/jobs/pkg/gateway_job_test.go b/deployment/cre/jobs/pkg/gateway_job_test.go index 90d438284b3..d10f56b2df1 100644 --- a/deployment/cre/jobs/pkg/gateway_job_test.go +++ b/deployment/cre/jobs/pkg/gateway_job_test.go @@ -30,6 +30,16 @@ func TestGateway_Validate_ServiceCentric(t *testing.T) { require.ErrorContains(t, g.Validate(), "must provide at least one service") } +func TestNewDefaultConfidentialRelayHandler(t *testing.T) { + t.Parallel() + + got := newDefaultConfidentialRelayHandler(14) + + assert.Equal(t, GatewayHandlerTypeConfidentialRelay, got.Name) + assert.Equal(t, ServiceNameConfidential, got.ServiceName) + assert.Equal(t, confidentialRelayHandlerConfig{RequestTimeoutSec: 14}, got.Config) +} + const ( expected = `type = 'gateway' schemaVersion = 1 diff --git a/deployment/go.mod b/deployment/go.mod index d743293223e..550136bfd35 100644 --- a/deployment/go.mod +++ b/deployment/go.mod @@ -44,7 +44,7 @@ require ( github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20260224214816-cb23ec38649f github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20260310183131-8d0f0e383288 
github.com/smartcontractkit/chainlink-ccip/deployment v0.0.0-20260317175207-e9ff89561326 - github.com/smartcontractkit/chainlink-common v0.11.2-0.20260331163339-a3c0d217e843 + github.com/smartcontractkit/chainlink-common v0.11.2-0.20260402120824-48154c0c65a6 github.com/smartcontractkit/chainlink-common/keystore v1.0.2 github.com/smartcontractkit/chainlink-deployments-framework v0.86.3 github.com/smartcontractkit/chainlink-evm v0.3.4-0.20260330133421-5151ea0c3b05 diff --git a/deployment/go.sum b/deployment/go.sum index ae4f6d0d6f3..ee90c55a682 100644 --- a/deployment/go.sum +++ b/deployment/go.sum @@ -1387,8 +1387,8 @@ github.com/smartcontractkit/chainlink-ccip/deployment v0.0.0-20260317175207-e9ff github.com/smartcontractkit/chainlink-ccip/deployment v0.0.0-20260317175207-e9ff89561326/go.mod h1:P0/tjeeIIxfsBupk5MneRjq5uI9mj+ZQpMpYnFla6WM= github.com/smartcontractkit/chainlink-ccv v0.0.0-20260324000441-d4cfddc9f7d2 h1:5HdH/A6yn8INZAltYDLb7UkUi5IKemhJzJkDW4Bgxyg= github.com/smartcontractkit/chainlink-ccv v0.0.0-20260324000441-d4cfddc9f7d2/go.mod h1:wDHq2E0KwUWG0lQ9f5frW1a7CKVW17MJLPuvKmtSRDg= -github.com/smartcontractkit/chainlink-common v0.11.2-0.20260331163339-a3c0d217e843 h1:yzkeWzWoPTbpDvVIz0ohmNVqAkvE8UwuLqqcUt47gYk= -github.com/smartcontractkit/chainlink-common v0.11.2-0.20260331163339-a3c0d217e843/go.mod h1:6tlxlsiWypGdpaZI+Kz5gFm53gCAcU/pTU3PR9CiFB8= +github.com/smartcontractkit/chainlink-common v0.11.2-0.20260402120824-48154c0c65a6 h1:oaXslIvcy5HD3zkWhx3nu8vRGdWGedYJ+XCsBD8mYkA= +github.com/smartcontractkit/chainlink-common v0.11.2-0.20260402120824-48154c0c65a6/go.mod h1:Ea94/OgfFPRTByGO2Qo+uZ7/4sWhwE2HKu7dDwdojME= github.com/smartcontractkit/chainlink-common/keystore v1.0.2 h1:AWisx4JT3QV8tcgh6J5NCrex+wAgTYpWyHsyNPSXzsQ= github.com/smartcontractkit/chainlink-common/keystore v1.0.2/go.mod h1:rSkIHdomyak3YnUtXLenl6poIq8q0V3UZPiiyYqPdGA= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg= diff 
--git a/plugins/plugins.private.yaml b/plugins/plugins.private.yaml index 01c184d0132..7276869a68f 100644 --- a/plugins/plugins.private.yaml +++ b/plugins/plugins.private.yaml @@ -56,5 +56,9 @@ plugins: installPath: "." confidential-http: - moduleURI: "github.com/smartcontractkit/confidential-compute/enclave/apps/confidential-http/capability" - gitRef: "187018af88ce265ca70b86222f8587e3fcb7ff32" + gitRef: "526a251" installPath: "./cmd/confidential-http" + confidential-workflows: + - moduleURI: "github.com/smartcontractkit/confidential-compute/enclave/apps/confidential-workflows/capability" + gitRef: "526a251" + installPath: "./cmd/confidential-workflows" diff --git a/system-tests/lib/cre/features/confidential_relay/confidential_relay.go b/system-tests/lib/cre/features/confidential_relay/confidential_relay.go new file mode 100644 index 00000000000..f6f72802ac3 --- /dev/null +++ b/system-tests/lib/cre/features/confidential_relay/confidential_relay.go @@ -0,0 +1,86 @@ +package confidentialrelay + +import ( + "context" + + tomlser "github.com/pelletier/go-toml/v2" + "github.com/pkg/errors" + "github.com/rs/zerolog" + + chainselectors "github.com/smartcontractkit/chain-selectors" + + "github.com/smartcontractkit/chainlink/deployment/cre/jobs/pkg" + "github.com/smartcontractkit/chainlink/system-tests/lib/cre" + coretoml "github.com/smartcontractkit/chainlink/v2/core/config/toml" + corechainlink "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" +) + +const flag = cre.ConfidentialRelayCapability + +type ConfidentialRelay struct{} + +func (o *ConfidentialRelay) Flag() cre.CapabilityFlag { + return flag +} + +func (o *ConfidentialRelay) PreEnvStartup( + ctx context.Context, + testLogger zerolog.Logger, + don *cre.DonMetadata, + topology *cre.Topology, + creEnv *cre.Environment, +) (*cre.PreEnvStartupOutput, error) { + registryChainID, chErr := chainselectors.ChainIdFromSelector(creEnv.RegistryChainSelector) + if chErr != nil { + return nil, 
errors.Wrapf(chErr, "failed to get chain ID from selector %d", creEnv.RegistryChainSelector) + } + + hErr := topology.AddGatewayHandlers(*don, []string{pkg.GatewayHandlerTypeConfidentialRelay}) + if hErr != nil { + return nil, errors.Wrapf(hErr, "failed to add gateway handlers to gateway config for don %s", don.Name) + } + + cErr := don.ConfigureForGatewayAccess(registryChainID, *topology.GatewayConnectors) + if cErr != nil { + return nil, errors.Wrapf(cErr, "failed to add gateway connectors to node's TOML config for don %s", don.Name) + } + + // Set TOML config to activate the confidential relay handler on DON nodes. + capConfig, ok := don.CapabilityConfigs[flag] + if ok && capConfig.Values != nil { + ns := don.MustNodeSet() + for i := range ns.NodeSpecs { + currentConfig := ns.NodeSpecs[i].Node.TestConfigOverrides + var typedConfig corechainlink.Config + if currentConfig != "" { + if err := tomlser.Unmarshal([]byte(currentConfig), &typedConfig); err != nil { + return nil, errors.Wrapf(err, "failed to unmarshal node TOML config for node %d", i) + } + } + + enabled := true + typedConfig.CRE.ConfidentialRelay = &coretoml.ConfidentialRelayConfig{Enabled: &enabled} + + out, err := tomlser.Marshal(typedConfig) + if err != nil { + return nil, errors.Wrapf(err, "failed to marshal node TOML config for node %d", i) + } + ns.NodeSpecs[i].Node.TestConfigOverrides = string(out) + } + } + + // No on-chain capability registration needed. The relay handler is a CRE subservice, + // not a registered capability. The mock capability that runs on the relay DON is + // registered separately via the mock flag. 
+ return &cre.PreEnvStartupOutput{}, nil +} + +func (o *ConfidentialRelay) PostEnvStartup( + ctx context.Context, + testLogger zerolog.Logger, + don *cre.Don, + dons *cre.Dons, + creEnv *cre.Environment, +) error { + return nil +} diff --git a/system-tests/lib/cre/features/mock/mock.go b/system-tests/lib/cre/features/mock/mock.go index 4166243f7ff..a504a04d220 100644 --- a/system-tests/lib/cre/features/mock/mock.go +++ b/system-tests/lib/cre/features/mock/mock.go @@ -43,7 +43,7 @@ func (o *Mock) PreEnvStartup( Capability: kcr.CapabilitiesRegistryCapability{ LabelledName: "mock", Version: "1.0.0", - CapabilityType: 0, // TRIGGER + CapabilityType: 1, // ACTION }, Config: &capabilitiespb.CapabilityConfig{ LocalOnly: don.HasOnlyLocalCapabilities(), diff --git a/system-tests/lib/cre/types.go b/system-tests/lib/cre/types.go index 6402d41e2a8..f1c0dec70eb 100644 --- a/system-tests/lib/cre/types.go +++ b/system-tests/lib/cre/types.go @@ -57,23 +57,24 @@ const ( // Capabilities const ( - ConsensusCapability CapabilityFlag = "ocr3" - DONTimeCapability CapabilityFlag = "don-time" - ConsensusCapabilityV2 CapabilityFlag = "consensus" // v2 - CronCapability CapabilityFlag = "cron" - EVMCapability CapabilityFlag = "evm" - CustomComputeCapability CapabilityFlag = "custom-compute" - WriteEVMCapability CapabilityFlag = "write-evm" - ReadContractCapability CapabilityFlag = "read-contract" - LogEventTriggerCapability CapabilityFlag = "log-event-trigger" - WebAPITargetCapability CapabilityFlag = "web-api-target" - WebAPITriggerCapability CapabilityFlag = "web-api-trigger" - MockCapability CapabilityFlag = "mock" - VaultCapability CapabilityFlag = "vault" - HTTPTriggerCapability CapabilityFlag = "http-trigger" - HTTPActionCapability CapabilityFlag = "http-action" - SolanaCapability CapabilityFlag = "solana" - AptosCapability CapabilityFlag = "aptos" + ConsensusCapability CapabilityFlag = "ocr3" + DONTimeCapability CapabilityFlag = "don-time" + ConsensusCapabilityV2 CapabilityFlag = 
"consensus" // v2 + CronCapability CapabilityFlag = "cron" + EVMCapability CapabilityFlag = "evm" + CustomComputeCapability CapabilityFlag = "custom-compute" + WriteEVMCapability CapabilityFlag = "write-evm" + ReadContractCapability CapabilityFlag = "read-contract" + LogEventTriggerCapability CapabilityFlag = "log-event-trigger" + WebAPITargetCapability CapabilityFlag = "web-api-target" + WebAPITriggerCapability CapabilityFlag = "web-api-trigger" + MockCapability CapabilityFlag = "mock" + VaultCapability CapabilityFlag = "vault" + HTTPTriggerCapability CapabilityFlag = "http-trigger" + HTTPActionCapability CapabilityFlag = "http-action" + SolanaCapability CapabilityFlag = "solana" + ConfidentialRelayCapability CapabilityFlag = "confidential-relay" + AptosCapability CapabilityFlag = "aptos" // Add more capabilities as needed ) diff --git a/system-tests/lib/cre/workflow/workflow.go b/system-tests/lib/cre/workflow/workflow.go index 2e7df738450..6560a9a4cfe 100644 --- a/system-tests/lib/cre/workflow/workflow.go +++ b/system-tests/lib/cre/workflow/workflow.go @@ -55,6 +55,7 @@ func RegisterWithContract( version *semver.Version, donID uint64, workflowName, binaryURL string, configURL, secretsURL *string, + attributes []byte, artifactsDirInContainer *string, ) (string, error) { // Download and decode workflow binary @@ -92,7 +93,7 @@ func RegisterWithContract( // Register workflow based on version switch version.Major() { case 2: - if err := registerWorkflowV2(sc, workflowRegistryAddr, version, workflowName, workflowID, binaryURLToUse, configURLToUse); err != nil { + if err := registerWorkflowV2(sc, workflowRegistryAddr, version, workflowName, workflowID, binaryURLToUse, configURLToUse, attributes); err != nil { return "", err } default: @@ -227,6 +228,7 @@ func registerWorkflowV2( workflowRegistryAddr common.Address, version *semver.Version, workflowName, workflowID, binaryURL, configURL string, + attributes []byte, ) error { registry, err := getRegistryV2Instance(sc, 
workflowRegistryAddr, version) if err != nil { @@ -251,7 +253,7 @@ func registerWorkflowV2( contracts.DonFamily, binaryURL, configURL, - nil, + attributes, false, )) if err != nil { diff --git a/system-tests/tests/test-helpers/t_helpers.go b/system-tests/tests/test-helpers/t_helpers.go index 830d071fb21..19fe5eba175 100644 --- a/system-tests/tests/test-helpers/t_helpers.go +++ b/system-tests/tests/test-helpers/t_helpers.go @@ -17,8 +17,6 @@ package helpers import ( "context" - "crypto/sha256" - "encoding/hex" "encoding/json" "fmt" "math/big" @@ -28,7 +26,6 @@ import ( "slices" "strconv" "strings" - "sync" "testing" "time" @@ -77,9 +74,6 @@ import ( ) const WorkflowEngineInitErrorLog = "Workflow Engine initialization failed" -const maxWorkflowNameLen = 64 - -var deleteWorkflowsMu sync.Mutex ///////////////////////// // ENVIRONMENT HELPERS // @@ -330,12 +324,13 @@ type WorkflowRegistrationConfig struct { ConfigFilePath string CompressedWasmPath string SecretsURL string + Attributes []byte WorkflowRegistryAddr common.Address WorkflowRegistryVersion *semver.Version ChainID uint64 DonID uint64 ContainerTargetDir string - SethClient *seth.Client + Blockchains []blockchains.Blockchain } /* @@ -348,11 +343,11 @@ It returns the paths to: 1. the compressed WASM file; 2. the workflow config file. 
*/ -func createWorkflowArtifacts[T WorkflowConfig](t *testing.T, testLogger zerolog.Logger, workflowName string, workflowDONs []*cre.Don, workflowConfig *T, workflowFileLocation, artifactDir string) (string, string) { +func createWorkflowArtifacts[T WorkflowConfig](t *testing.T, testLogger zerolog.Logger, workflowName string, workflowDONs []*cre.Don, workflowConfig *T, workflowFileLocation string) (string, string) { t.Helper() - workflowConfigFilePath := workflowConfigFactory(t, testLogger, workflowName, workflowConfig, artifactDir) - compressedWorkflowWasmPath, compileErr := creworkflow.CompileWorkflowToDir(t.Context(), workflowFileLocation, workflowName, artifactDir) + workflowConfigFilePath := workflowConfigFactory(t, testLogger, workflowName, workflowConfig) + compressedWorkflowWasmPath, compileErr := creworkflow.CompileWorkflow(t.Context(), workflowFileLocation, workflowName) require.NoError(t, compileErr, "failed to compile workflow '%s'", workflowFileLocation) testLogger.Info().Msg("Workflow compiled successfully.") @@ -373,7 +368,7 @@ Pass `nil` to skip workflow config file creation. Returns the path to the workflow config file. 
*/ -func workflowConfigFactory[T WorkflowConfig](t *testing.T, testLogger zerolog.Logger, workflowName string, workflowConfig *T, outputDir string) (filePath string) { +func workflowConfigFactory[T WorkflowConfig](t *testing.T, testLogger zerolog.Logger, workflowName string, workflowConfig *T) (filePath string) { t.Helper() var workflowConfigFilePath string @@ -386,7 +381,7 @@ func workflowConfigFactory[T WorkflowConfig](t *testing.T, testLogger zerolog.Lo testLogger.Info().Msg("Workflow config file is not requested and will not be created.") case *portypes.WorkflowConfig: - workflowCfgFilePath, configErr := createPoRWorkflowConfigFile(workflowName, cfg, outputDir) + workflowCfgFilePath, configErr := createPoRWorkflowConfigFile(workflowName, cfg) workflowConfigFilePath = workflowCfgFilePath require.NoError(t, configErr, "failed to create PoR workflow config file") testLogger.Info().Msg("PoR workflow config file created.") @@ -401,7 +396,7 @@ func workflowConfigFactory[T WorkflowConfig](t *testing.T, testLogger zerolog.Lo cleanID = cleanID[:32] } cfg.FeedID = "0x" + cleanID - workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg, outputDir) + workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg) workflowConfigFilePath = workflowCfgFilePath require.NoError(t, configErr, "failed to create PoR v2 workflow config file") testLogger.Info().Msg("PoR v2 workflow config file created.") @@ -425,72 +420,72 @@ func workflowConfigFactory[T WorkflowConfig](t *testing.T, testLogger zerolog.Lo testLogger.Info().Msg("Aptos write roundtrip workflow config file created.") case *HTTPWorkflowConfig: - workflowCfgFilePath, configErr := createHTTPWorkflowConfigFile(workflowName, cfg, outputDir) + workflowCfgFilePath, configErr := createHTTPWorkflowConfigFile(workflowName, cfg) workflowConfigFilePath = workflowCfgFilePath require.NoError(t, configErr, "failed to create HTTP workflow config file") testLogger.Info().Msg("HTTP workflow 
config file created.") case *crontypes.WorkflowConfig: - workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg, outputDir) + workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg) workflowConfigFilePath = workflowCfgFilePath require.NoError(t, configErr, "failed to create Cron workflow config file") testLogger.Info().Msg("Cron workflow config file created.") case *consensus_negative_config.Config: - workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg, outputDir) + workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg) workflowConfigFilePath = workflowCfgFilePath require.NoError(t, configErr, "failed to create consensus workflow config file") testLogger.Info().Msg("Consensus workflow config file created.") case *evmread_config.Config: - workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg, outputDir) + workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg) workflowConfigFilePath = workflowCfgFilePath require.NoError(t, configErr, "failed to create evmread workflow config file") testLogger.Info().Msg("EVM Read workflow config file created.") case *logtrigger_config.Config: - workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg, outputDir) + workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg) workflowConfigFilePath = workflowCfgFilePath require.NoError(t, configErr, "failed to create logtrigger workflow config file") testLogger.Info().Msg("EVM LogTrigger workflow config file created.") case *evmread_negative_config.Config: - workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg, outputDir) + workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg) workflowConfigFilePath = workflowCfgFilePath require.NoError(t, configErr, "failed to create evmread-negative workflow config file") 
testLogger.Info().Msg("EVM Read negative workflow config file created.") case *evmwrite_negative_config.Config: - workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg, outputDir) + workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg) workflowConfigFilePath = workflowCfgFilePath require.NoError(t, configErr, "failed to create evmwrite-negative workflow config file") testLogger.Info().Msg("EVM Write negative workflow config file created.") case *logtrigger_negative_config.Config: - workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg, outputDir) + workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg) workflowConfigFilePath = workflowCfgFilePath require.NoError(t, configErr, "failed to create logtrigger-negative workflow config file") testLogger.Info().Msg("EVM LogTrigger negative workflow config file created.") case *http_config.Config: - workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg, outputDir) + workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg) workflowConfigFilePath = workflowCfgFilePath require.NoError(t, configErr, "failed to create http-negative workflow config file") testLogger.Info().Msg("HTTP negative workflow config file created.") case *httpaction_smoke_config.Config: - workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg, outputDir) + workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg) workflowConfigFilePath = workflowCfgFilePath require.NoError(t, configErr, "failed to create httpaction smoke workflow config file") testLogger.Info().Msg("HTTP Action smoke workflow config file created.") case *httpaction_negative_config.Config: - workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg, outputDir) + workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg) 
workflowConfigFilePath = workflowCfgFilePath require.NoError(t, configErr, "failed to create httpaction negative workflow config file") testLogger.Info().Msg("HTTP Action negative workflow config file created.") case *solwrite_config.Config: - workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg, outputDir) + workflowCfgFilePath, configErr := CreateWorkflowYamlConfigFile(workflowName, cfg) workflowConfigFilePath = workflowCfgFilePath require.NoError(t, configErr, "failed to create solwrite workflow config file") testLogger.Info().Msg("Solana write workflow config file created.") @@ -515,14 +510,14 @@ It stores the values used by a workflow (main.go), The values are written to types.WorkflowConfig. The method returns the absolute path to the created config file. */ -func createPoRWorkflowConfigFile(workflowName string, workflowConfig *portypes.WorkflowConfig, outputDir string) (string, error) { +func createPoRWorkflowConfigFile(workflowName string, workflowConfig *portypes.WorkflowConfig) (string, error) { feedIDToUse, fIDerr := validateAndFormatFeedID(workflowConfig) if fIDerr != nil { return "", errors.Wrap(fIDerr, "failed to validate and format feed ID") } workflowConfig.FeedID = feedIDToUse - return CreateWorkflowYamlConfigFile(workflowName, workflowConfig, outputDir) + return CreateWorkflowYamlConfigFile(workflowName, workflowConfig) } func validateAndFormatFeedID(workflowConfig *portypes.WorkflowConfig) (string, error) { @@ -544,7 +539,7 @@ func validateAndFormatFeedID(workflowConfig *portypes.WorkflowConfig) (string, e return feedIDToUse, nil } -func createHTTPWorkflowConfigFile(workflowName string, cfg *HTTPWorkflowConfig, outputDir string) (string, error) { +func createHTTPWorkflowConfigFile(workflowName string, cfg *HTTPWorkflowConfig) (string, error) { testLogger := framework.L mockServerURL := cfg.URL parsedURL, urlErr := url.Parse(mockServerURL) @@ -564,7 +559,7 @@ func createHTTPWorkflowConfigFile(workflowName string, cfg 
*HTTPWorkflowConfig, } configFileName := fmt.Sprintf("test_http_workflow_config_%s.json", workflowName) - configPath := filepath.Join(outputDir, configFileName) + configPath := filepath.Join(os.TempDir(), configFileName) writeErr := os.WriteFile(configPath, configBytes, 0o644) //nolint:gosec // this is a test file if writeErr != nil { @@ -577,23 +572,21 @@ func createHTTPWorkflowConfigFile(workflowName string, cfg *HTTPWorkflowConfig, /* Creates .yaml workflow configuration file and returns the absolute path to the created config file. */ -func CreateWorkflowYamlConfigFile(workflowName string, workflowConfig any, outputDir string) (string, error) { +func CreateWorkflowYamlConfigFile(workflowName string, workflowConfig any) (string, error) { // Write workflow config to a .yaml file configMarshalled, err := yaml.Marshal(workflowConfig) if err != nil { return "", errors.Wrap(err, "failed to marshal workflow config") } - if mkErr := os.MkdirAll(outputDir, 0o755); mkErr != nil { - return "", errors.Wrap(mkErr, "failed to create output directory") - } + workflowSuffix := "_config.yaml" + workflowConfigOutputFile := workflowName + workflowSuffix - workflowConfigFile, tempErr := os.CreateTemp(outputDir, workflowName+"-*_config.yaml") - if tempErr != nil { - return "", errors.Wrap(tempErr, "failed to create workflow config file") - } - workflowConfigOutputFile := workflowConfigFile.Name() - if closeErr := workflowConfigFile.Close(); closeErr != nil { - return "", errors.Wrap(closeErr, "failed to close workflow config file") + // remove the duplicate if it already exists + _, statErr := os.Stat(workflowConfigOutputFile) + if statErr == nil { + if err := os.Remove(workflowConfigOutputFile); err != nil { + return "", errors.Wrap(err, "failed to remove existing output file") + } } if err := os.WriteFile(workflowConfigOutputFile, configMarshalled, 0o644); err != nil { //nolint:gosec // G306: we want it to be readable by everyone @@ -619,8 +612,8 @@ func registerWorkflow(ctx 
context.Context, t *testing.T, t.Cleanup(func() { deleteWorkflows(t, wfRegCfg.WorkflowName, wfRegCfg.ConfigFilePath, - wfRegCfg.CompressedWasmPath, - wfRegCfg.WorkflowRegistryAddr, wfRegCfg.WorkflowRegistryVersion, wfRegCfg.SethClient, + wfRegCfg.CompressedWasmPath, wfRegCfg.Blockchains, + wfRegCfg.WorkflowRegistryAddr, wfRegCfg.WorkflowRegistryVersion, ) }) @@ -644,6 +637,7 @@ func registerWorkflow(ctx context.Context, t *testing.T, binaryURL, configURL, nil, // no secrets yet + wfRegCfg.Attributes, containerTargetDir, ) require.NoError(t, registerErr, "failed to register workflow '%s'", wfRegCfg.WorkflowName) @@ -664,9 +658,9 @@ func deleteWorkflows( uniqueWorkflowName string, workflowConfigFilePath string, compressedWorkflowWasmPath string, + blockchains []blockchains.Blockchain, workflowRegistryAddress common.Address, version *semver.Version, - sethClient *seth.Client, ) { t.Helper() @@ -675,10 +669,8 @@ func deleteWorkflows( localEnvErr := creworkflow.RemoveWorkflowArtifactsFromLocalEnv(workflowConfigFilePath, compressedWorkflowWasmPath) require.NoError(t, localEnvErr, "failed to remove workflow artifacts from local environment") - deleteWorkflowsMu.Lock() - defer deleteWorkflowsMu.Unlock() - - deleteErr := creworkflow.DeleteWithContract(t.Context(), sethClient, workflowRegistryAddress, version, uniqueWorkflowName) + require.IsType(t, &evm.Blockchain{}, blockchains[0], "expected EVM blockchain type") + deleteErr := creworkflow.DeleteWithContract(t.Context(), blockchains[0].(*evm.Blockchain).SethClient, workflowRegistryAddress, version, uniqueWorkflowName) require.NoError(t, deleteErr, "failed to delete workflow '%s'. 
Please delete/unregister it manually.", uniqueWorkflowName) testLogger.Info().Msgf("Workflow '%s' deleted successfully from the registry.", uniqueWorkflowName) } @@ -686,26 +678,24 @@ func deleteWorkflows( func CompileAndDeployWorkflow[T WorkflowConfig](t *testing.T, testEnv *ttypes.TestEnvironment, testLogger zerolog.Logger, workflowName string, workflowConfig *T, workflowFileLocation string, - opts ...CompileAndDeployWorkflowOpt, ) string { t.Helper() - cfg := compileAndDeployWorkflowCfg{ - artifactCopyDONTypes: []cre.CapabilityFlag{cre.WorkflowDON}, - } - for _, opt := range opts { - opt(&cfg) - } testLogger.Info(). Str("workflow_name", workflowName). Str("workflow_file_location", workflowFileLocation). Msgf("compiling and registering workflow '%s'", workflowName) - artifactDir := workflowArtifactsDir(t, testEnv) registryChainSelector := testEnv.CreEnvironment.Blockchains[0].ChainSelector() - workflowDONs := selectArtifactTargetDONs(testEnv, cfg.artifactCopyDONTypes) + workflowDONs := make([]*cre.Don, 0) + for _, don := range testEnv.Dons.List() { + if !don.HasFlag(cre.WorkflowDON) { + continue + } + workflowDONs = append(workflowDONs, don) + } - compressedWorkflowWasmPath, workflowConfigPath := createWorkflowArtifacts(t, testLogger, workflowName, workflowDONs, workflowConfig, workflowFileLocation, artifactDir) + compressedWorkflowWasmPath, workflowConfigPath := createWorkflowArtifacts(t, testLogger, workflowName, workflowDONs, workflowConfig, workflowFileLocation) require.NotEmpty(t, compressedWorkflowWasmPath, "failed to find workflow DON in the topology") workflowRegistryAddress := crecontracts.MustGetAddressRefFromDataStore(testEnv.CreEnvironment.CldfEnvironment.DataStore, testEnv.CreEnvironment.Blockchains[0].ChainSelector(), keystone_changeset.WorkflowRegistry.String(), testEnv.CreEnvironment.ContractVersions[keystone_changeset.WorkflowRegistry.String()], "") @@ -720,93 +710,54 @@ func CompileAndDeployWorkflow[T WorkflowConfig](t *testing.T, ChainID: 
registryChainSelector, DonID: testEnv.Dons.MustWorkflowDON().ID, ContainerTargetDir: creworkflow.DefaultWorkflowTargetDir, - SethClient: testEnv.CreEnvironment.Blockchains[0].(*evm.Blockchain).SethClient, + Blockchains: testEnv.CreEnvironment.Blockchains, } require.IsType(t, &evm.Blockchain{}, testEnv.CreEnvironment.Blockchains[0], "expected EVM blockchain type") workflowID := registerWorkflow(t.Context(), t, workflowRegConfig, testEnv.CreEnvironment.Blockchains[0].(*evm.Blockchain).SethClient, testLogger) return workflowID } -type compileAndDeployWorkflowCfg struct { - artifactCopyDONTypes []cre.CapabilityFlag -} - -// CompileAndDeployWorkflowOpt customizes workflow compilation/deployment behavior. -type CompileAndDeployWorkflowOpt func(*compileAndDeployWorkflowCfg) - -// WithArtifactCopyDONTypes sets DON types where workflow artifacts should be copied. -func WithArtifactCopyDONTypes(donTypes ...cre.CapabilityFlag) CompileAndDeployWorkflowOpt { - return func(cfg *compileAndDeployWorkflowCfg) { - if len(donTypes) == 0 { - return - } - cfg.artifactCopyDONTypes = append([]cre.CapabilityFlag{}, donTypes...) - } -} +// CompileAndDeployConfidentialWorkflow compiles a workflow WASM binary, copies it to Docker +// containers, and registers it with confidential attributes on the workflow registry. +func CompileAndDeployConfidentialWorkflow[T WorkflowConfig](t *testing.T, + testEnv *ttypes.TestEnvironment, testLogger zerolog.Logger, workflowName string, + workflowConfig *T, workflowFileLocation string, attributes []byte, +) string { + t.Helper() -func selectArtifactTargetDONs(testEnv *ttypes.TestEnvironment, donTypes []cre.CapabilityFlag) []*cre.Don { - if len(donTypes) == 0 { - donTypes = []cre.CapabilityFlag{cre.WorkflowDON} - } - allow := make(map[cre.CapabilityFlag]struct{}, len(donTypes)) - for _, donType := range donTypes { - allow[donType] = struct{}{} - } + testLogger.Info(). + Str("workflow_name", workflowName). 
+ Str("workflow_file_location", workflowFileLocation). + Msgf("compiling and registering confidential workflow '%s'", workflowName) + registryChainSelector := testEnv.CreEnvironment.Blockchains[0].ChainSelector() - targetDONs := make([]*cre.Don, 0) + workflowDONs := make([]*cre.Don, 0) for _, don := range testEnv.Dons.List() { - for donType := range allow { - if don.HasFlag(donType) { - targetDONs = append(targetDONs, don) - break - } + if !don.HasFlag(cre.WorkflowDON) { + continue } - } - return targetDONs -} - -func workflowArtifactsDir(t *testing.T, testEnv *ttypes.TestEnvironment) string { - t.Helper() - if testEnv.Execution == nil || testEnv.Execution.TestID == "" { - dir, err := os.MkdirTemp("", "cre-workflow-artifacts-*") - require.NoError(t, err, "failed to create artifacts directory") - return dir + workflowDONs = append(workflowDONs, don) } - dir := filepath.Join(os.TempDir(), "cre-workflow-artifacts", testEnv.Execution.TestID) - require.NoError(t, os.MkdirAll(dir, 0o755), "failed to create artifacts directory") - return dir -} - -func UniqueWorkflowName(testEnv *ttypes.TestEnvironment, baseName string) string { - testID := "" - if testEnv != nil && testEnv.Execution != nil { - testID = testEnv.Execution.TestID - } - if testID == "" { - return truncateWorkflowName(baseName, baseName) - } - return truncateWorkflowName(fmt.Sprintf("%s-%s", baseName, testID), fmt.Sprintf("%s:%s", baseName, testID)) -} + compressedWorkflowWasmPath, workflowConfigPath := createWorkflowArtifacts(t, testLogger, workflowName, workflowDONs, workflowConfig, workflowFileLocation) + require.NotEmpty(t, compressedWorkflowWasmPath, "failed to find workflow DON in the topology") -func truncateWorkflowName(name, uniquenessSeed string) string { - if len(name) <= maxWorkflowNameLen { - return name - } + workflowRegistryAddress := crecontracts.MustGetAddressRefFromDataStore(testEnv.CreEnvironment.CldfEnvironment.DataStore, testEnv.CreEnvironment.Blockchains[0].ChainSelector(), 
keystone_changeset.WorkflowRegistry.String(), testEnv.CreEnvironment.ContractVersions[keystone_changeset.WorkflowRegistry.String()], "") - sum := sha256.Sum256([]byte(uniquenessSeed)) - suffix := hex.EncodeToString(sum[:])[:8] - prefixLen := maxWorkflowNameLen - len(suffix) - 1 // include hyphen - if prefixLen < 1 { - return suffix[:maxWorkflowNameLen] - } - if prefixLen > len(name) { - prefixLen = len(name) + workflowRegConfig := &WorkflowRegistrationConfig{ + WorkflowName: workflowName, + WorkflowLocation: workflowFileLocation, + ConfigFilePath: workflowConfigPath, + CompressedWasmPath: compressedWorkflowWasmPath, + Attributes: attributes, + WorkflowRegistryAddr: common.HexToAddress(workflowRegistryAddress.Address), + WorkflowRegistryVersion: workflowRegistryAddress.Version, + ChainID: registryChainSelector, + DonID: testEnv.Dons.MustWorkflowDON().ID, + ContainerTargetDir: creworkflow.DefaultWorkflowTargetDir, + Blockchains: testEnv.CreEnvironment.Blockchains, } - return fmt.Sprintf("%s-%s", name[:prefixLen], suffix) -} - -func ParallelEnabled() bool { - v := strings.TrimSpace(strings.ToLower(os.Getenv("CRE_TEST_PARALLEL_ENABLED"))) - return v == "1" || v == "true" || v == "yes" + require.IsType(t, &evm.Blockchain{}, testEnv.CreEnvironment.Blockchains[0], "expected EVM blockchain type") + workflowID := registerWorkflow(t.Context(), t, workflowRegConfig, testEnv.CreEnvironment.Blockchains[0].(*evm.Blockchain).SethClient, testLogger) + return workflowID }