diff --git a/core/services/job/models.go b/core/services/job/models.go index 20e7da2c643..2d257a33ca5 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -930,6 +930,7 @@ type WorkflowSpec struct { CreatedAt time.Time `toml:"-"` UpdatedAt time.Time `toml:"-"` SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"` + Attributes []byte `db:"attributes"` sdkWorkflow *sdk.WorkflowSpec rawSpec []byte config []byte diff --git a/core/services/standardcapabilities/conversions/conversions.go b/core/services/standardcapabilities/conversions/conversions.go index b9a233243af..d623fa9a409 100644 --- a/core/services/standardcapabilities/conversions/conversions.go +++ b/core/services/standardcapabilities/conversions/conversions.go @@ -50,6 +50,8 @@ func GetCapabilityIDFromCommand(command string, config string) string { return "http-trigger@1.0.0-alpha" case "http_action": return "http-actions@1.0.0-alpha" // plural "actions" + case "mock": + return "mock@1.0.0" default: return "" } @@ -71,6 +73,8 @@ func GetCommandFromCapabilityID(capabilityID string) string { return "http_trigger" case strings.HasPrefix(capabilityID, "http-actions"): return "http_action" + case strings.HasPrefix(capabilityID, "mock"): + return "mock" default: return "" } diff --git a/core/services/workflows/artifacts/v2/orm.go b/core/services/workflows/artifacts/v2/orm.go index 7b37afda16b..6d8887dbe06 100644 --- a/core/services/workflows/artifacts/v2/orm.go +++ b/core/services/workflows/artifacts/v2/orm.go @@ -63,7 +63,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) config_url, created_at, updated_at, - spec_type + spec_type, + attributes ) VALUES ( :workflow, :config, @@ -76,7 +77,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) :config_url, :created_at, :updated_at, - :spec_type + :spec_type, + :attributes ) ON CONFLICT (workflow_id) DO UPDATE SET workflow = EXCLUDED.workflow, @@ -89,7 +91,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) config_url = EXCLUDED.config_url, created_at = EXCLUDED.created_at, updated_at = EXCLUDED.updated_at, - spec_type = EXCLUDED.spec_type + spec_type = EXCLUDED.spec_type, + attributes = EXCLUDED.attributes RETURNING id ` diff --git a/core/services/workflows/syncer/v2/handler.go b/core/services/workflows/syncer/v2/handler.go index e78e2ec55bc..7a493ed358d 100644 --- a/core/services/workflows/syncer/v2/handler.go +++ b/core/services/workflows/syncer/v2/handler.go @@ -537,6 +537,7 @@ func (h *eventHandler) createWorkflowSpec(ctx context.Context, payload WorkflowR SpecType: job.WASMFile, BinaryURL: payload.BinaryURL, ConfigURL: payload.ConfigURL, + Attributes: payload.Attributes, } if _, err = h.workflowArtifactsStore.UpsertWorkflowSpec(ctx, entry); err != nil { @@ -628,55 +629,9 @@ func (h *eventHandler) engineFactoryFn(ctx context.Context, workflowID string, o } // V2 aka "NoDAG" - cfg := &v2.EngineConfig{ - Lggr: h.lggr, - Module: module, - WorkflowConfig: config, - CapRegistry: h.capRegistry, - DonSubscriber: h.workflowDonSubscriber, - UseLocalTimeProvider: h.useLocalTimeProvider, - DonTimeStore: h.donTimeStore, - ExecutionsStore: h.workflowStore, - WorkflowID: workflowID, - WorkflowOwner: owner, - WorkflowName: name, - WorkflowTag: tag, - WorkflowEncryptionKey: h.workflowEncryptionKey, + cfg := h.newV2EngineConfig(module, workflowID, owner, tag, sdkName, name, config) - LocalLimits: v2.EngineLimits{}, // all defaults - LocalLimiters: h.engineLimiters, - FeatureFlags: h.featureFlags, - GlobalExecutionConcurrencyLimiter: h.workflowLimits, - - BeholderEmitter: func() custmsg.MessageEmitter { - h.emitterMu.RLock() - defer h.emitterMu.RUnlock() - return h.emitter - }(), - BillingClient: h.billingClient, - - WorkflowRegistryAddress: h.workflowRegistryAddress, - WorkflowRegistryChainSelector: h.workflowRegistryChainSelector, - OrgResolver: h.orgResolver, - DebugMode: h.debugMode, - SecretsFetcher: h.secretsFetcher, - SdkName: sdkName, - } - - // Wire the initDone channel to the OnInitialized lifecycle hook. - // This will be called when the engine completes initialization (including trigger subscriptions). - // We compose with any existing hook to avoid overwriting test hooks or other user-provided hooks. - if initDone != nil { - existingHook := cfg.Hooks.OnInitialized - cfg.Hooks.OnInitialized = func(err error) { - // Signal completion to the handler first - initDone <- err - // Then call any existing hook (e.g., from tests) - if existingHook != nil { - existingHook(err) - } - } - } + h.wireInitDoneHook(cfg, initDone) return v2.NewEngine(cfg) } @@ -789,24 +744,23 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp return fmt.Errorf("invalid workflow name: %w", err) } + confidential, err := v2.IsConfidential(spec.Attributes) + if err != nil { + return fmt.Errorf("failed to parse workflow attributes: %w", err) + } + // Create a channel to receive the initialization result. // This allows us to wait for the engine to complete initialization (including trigger subscriptions) // before emitting the workflowActivated event, ensuring the event accurately reflects deployment status. initDone := make(chan error, 1) + var engine services.Service - // Scope the engineFactory call so that decodedBinary goes out of scope immediately after the factory returns - engine, err := func() (services.Service, error) { - return h.engineFactory( - ctx, - spec.WorkflowID, - spec.WorkflowOwner, - workflowName, - spec.WorkflowTag, - configBytes, - decodedBinary, - initDone, - ) - }() + if confidential { + h.lggr.Infow("routing workflow to confidential execution", "workflowID", spec.WorkflowID) + engine, err = h.confidentialEngineFactory(spec, workflowName, decodedBinary, initDone) + } else { + engine, err = h.engineFactory(ctx, spec.WorkflowID, spec.WorkflowOwner, workflowName, spec.WorkflowTag, configBytes, decodedBinary, initDone) + } if err != nil { return fmt.Errorf("failed to create workflow engine: %w", err) } @@ -863,6 +817,106 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp return nil } +// newV2EngineConfig builds the common EngineConfig shared by both the normal +// WASM engine and the confidential engine paths. Caller supplies the module. +func (h *eventHandler) newV2EngineConfig( + module host.ModuleV2, + workflowID, owner, tag, sdkName string, + name types.WorkflowName, + config []byte, +) *v2.EngineConfig { + return &v2.EngineConfig{ + Lggr: h.lggr, + Module: module, + WorkflowConfig: config, + CapRegistry: h.capRegistry, + DonSubscriber: h.workflowDonSubscriber, + UseLocalTimeProvider: h.useLocalTimeProvider, + DonTimeStore: h.donTimeStore, + ExecutionsStore: h.workflowStore, + WorkflowID: workflowID, + WorkflowOwner: owner, + WorkflowName: name, + WorkflowTag: tag, + WorkflowEncryptionKey: h.workflowEncryptionKey, + + LocalLimits: v2.EngineLimits{}, // all defaults + LocalLimiters: h.engineLimiters, + FeatureFlags: h.featureFlags, + GlobalExecutionConcurrencyLimiter: h.workflowLimits, + + BeholderEmitter: func() custmsg.MessageEmitter { + h.emitterMu.RLock() + defer h.emitterMu.RUnlock() + return h.emitter + }(), + BillingClient: h.billingClient, + + WorkflowRegistryAddress: h.workflowRegistryAddress, + WorkflowRegistryChainSelector: h.workflowRegistryChainSelector, + OrgResolver: h.orgResolver, + SecretsFetcher: h.secretsFetcher, + DebugMode: h.debugMode, + SdkName: sdkName, + } +} + +// wireInitDoneHook wires the initDone channel to the OnInitialized lifecycle hook. +// This will be called when the engine completes initialization (including trigger subscriptions). +// We compose with any existing hook to avoid overwriting test hooks or other user-provided hooks. +func (h *eventHandler) wireInitDoneHook(cfg *v2.EngineConfig, initDone chan<- error) { + if initDone == nil { + return + } + existingHook := cfg.Hooks.OnInitialized + cfg.Hooks.OnInitialized = func(err error) { + // Signal completion to the handler first + initDone <- err + // Then call any existing hook (e.g., from tests) + if existingHook != nil { + existingHook(err) + } + } +} + +// confidentialEngineFactory creates a V2 engine backed by a ConfidentialModule +// instead of a local WASM module. The ConfidentialModule delegates execution to +// the confidential-workflows capability which runs the WASM inside a TEE. +func (h *eventHandler) confidentialEngineFactory( + spec *job.WorkflowSpec, + workflowName types.WorkflowName, + decodedBinary []byte, + initDone chan<- error, +) (services.Service, error) { + attrs, err := v2.ParseWorkflowAttributes(spec.Attributes) + if err != nil { + return nil, fmt.Errorf("failed to parse workflow attributes: %w", err) + } + + binaryHash := v2.ComputeBinaryHash(decodedBinary) + + lggr := logger.Named(h.lggr, "WorkflowEngine.ConfidentialModule") + lggr = logger.With(lggr, "workflowID", spec.WorkflowID, "workflowName", spec.WorkflowName, "workflowOwner", spec.WorkflowOwner) + + // nil resolver: raw binaryURL is passed to the enclave as-is. + // PR 5/5 (#21642) wires this to the storage service retriever + // so the enclave receives a presigned URL. + module := v2.NewConfidentialModule( + h.capRegistry, + spec.BinaryURL, + binaryHash, + spec.WorkflowID, spec.WorkflowOwner, workflowName.String(), spec.WorkflowTag, + attrs.VaultDonSecrets, + nil, + lggr, + ) + + cfg := h.newV2EngineConfig(module, spec.WorkflowID, spec.WorkflowOwner, spec.WorkflowTag, "", workflowName, []byte(spec.Config)) + h.wireInitDoneHook(cfg, initDone) + + return v2.NewEngine(cfg) +} + // logCustMsg emits a custom message to the external sink and logs an error if that fails. func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log logger.Logger) { err := cma.Emit(ctx, msg) diff --git a/core/services/workflows/syncer/v2/handler_test.go b/core/services/workflows/syncer/v2/handler_test.go index ad7b80d06b9..aa0a66c6362 100644 --- a/core/services/workflows/syncer/v2/handler_test.go +++ b/core/services/workflows/syncer/v2/handler_test.go @@ -618,6 +618,264 @@ func Test_workflowRegisteredHandler(t *testing.T) { } } +func Test_workflowRegisteredHandler_confidentialRouting(t *testing.T) { + t.Run("confidential workflow bypasses engine factory and routes to confidential path", func(t *testing.T) { + var ( + ctx = testutils.Context(t) + lggr = logger.TestLogger(t) + lf = limits.Factory{Logger: lggr} + db = pgtest.NewSqlxDB(t) + orm = artifacts.NewWorkflowRegistryDS(db, lggr) + emitter = custmsg.NewLabeler() + + binary = wasmtest.CreateTestBinary(binaryCmd, true, t) + encodedBinary = []byte(base64.StdEncoding.EncodeToString(binary)) + config = []byte("") + wfOwner = []byte("0xOwner") + workflowEncryptionKey = workflowkey.MustNewXXXTestingOnly(big.NewInt(1)) + ) + + giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, "workflow-name", binary, config, "") + require.NoError(t, err) + wfIDString := hex.EncodeToString(giveWFID[:]) + + binaryURL := "http://example.com/" + wfIDString + "/binary" + configURL := "http://example.com/" + wfIDString + "/config" + signedURLParameter := "?auth=abc123" + signedBinaryURL := binaryURL + signedURLParameter + signedConfigURL := configURL + signedURLParameter + + fetcher := newMockFetcher(map[string]mockFetchResp{ + wfIDString + "-ARTIFACT_TYPE_BINARY": {Body: []byte(signedBinaryURL), Err: nil}, + wfIDString + "-ARTIFACT_TYPE_CONFIG": {Body: []byte(signedConfigURL), Err: nil}, + signedBinaryURL: {Body: encodedBinary, Err: nil}, + signedConfigURL: {Body: config, Err: nil}, + }) + artifactStore, err := artifacts.NewStore(lggr, orm, fetcher.FetcherFunc(), fetcher.RetrieverFunc(), clockwork.NewFakeClock(), workflowkey.Key{}, custmsg.NewLabeler(), lf, artifacts.WithConfig(artifacts.StoreConfig{ + ArtifactStorageHost: "example.com", + })) + require.NoError(t, err) + + er := NewEngineRegistry() + + // Track whether the engine factory is called. The confidential path + // should bypass it entirely. + factoryCalled := false + trackingFactory := func(ctx context.Context, wfid string, owner string, name types.WorkflowName, tag string, config []byte, binary []byte, initDone chan<- error) (services.Service, error) { + factoryCalled = true + if initDone != nil { + initDone <- nil + } + return &mockEngine{}, nil + } + + wfStore := store.NewInMemoryStore(lggr, clockwork.NewFakeClock()) + registry := capabilities.NewRegistry(lggr) + registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) + limiters, err := v2.NewLimiters(lf, nil) + require.NoError(t, err) + rl, err := ratelimiter.NewRateLimiter(rlConfig) + require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(lggr, syncerlimiter.Config{Global: 200, PerOwner: 200}, lf) + require.NoError(t, err) + + h, err := NewEventHandler(lggr, wfStore, nil, true, registry, er, emitter, limiters, nil, rl, workflowLimits, artifactStore, workflowEncryptionKey, &testDonNotifier{}, + WithEngineRegistry(er), + WithEngineFactoryFn(trackingFactory), + ) + require.NoError(t, err) + servicetest.Run(t, h) + + event := WorkflowRegisteredEvent{ + Status: WorkflowStatusActive, + WorkflowID: giveWFID, + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + WorkflowTag: "workflow-tag", + BinaryURL: binaryURL, + ConfigURL: configURL, + Attributes: []byte(`{"confidential":true,"vault_don_secrets":[{"key":"API_KEY"}]}`), + } + + ctx = contexts.WithCRE(ctx, contexts.CRE{Owner: hex.EncodeToString(wfOwner), Workflow: wfIDString}) + err = h.workflowRegisteredEvent(ctx, event) + + // The confidential path creates a real v2.Engine. With test data + // (non-hex owner), engine creation fails. The error comes from the + // confidential path, proving routing worked correctly. + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to create workflow engine") + + // The engine factory must NOT have been called; the confidential path + // bypasses it. + assert.False(t, factoryCalled, "engine factory should not be called for confidential workflows") + + // The engine should NOT be in the registry since init failed. + _, ok := er.Get(giveWFID) + assert.False(t, ok, "engine should not be registered after failed init") + }) + + t.Run("non-confidential workflow uses engine factory", func(t *testing.T) { + var ( + ctx = testutils.Context(t) + lggr = logger.TestLogger(t) + lf = limits.Factory{Logger: lggr} + db = pgtest.NewSqlxDB(t) + orm = artifacts.NewWorkflowRegistryDS(db, lggr) + emitter = custmsg.NewLabeler() + + binary = wasmtest.CreateTestBinary(binaryCmd, true, t) + encodedBinary = []byte(base64.StdEncoding.EncodeToString(binary)) + config = []byte("") + wfOwner = []byte("0xOwner") + workflowEncryptionKey = workflowkey.MustNewXXXTestingOnly(big.NewInt(1)) + ) + + giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, "workflow-name", binary, config, "") + require.NoError(t, err) + wfIDString := hex.EncodeToString(giveWFID[:]) + + binaryURL := "http://example.com/" + wfIDString + "/binary" + configURL := "http://example.com/" + wfIDString + "/config" + signedURLParameter := "?auth=abc123" + signedBinaryURL := binaryURL + signedURLParameter + signedConfigURL := configURL + signedURLParameter + + fetcher := newMockFetcher(map[string]mockFetchResp{ + wfIDString + "-ARTIFACT_TYPE_BINARY": {Body: []byte(signedBinaryURL), Err: nil}, + wfIDString + "-ARTIFACT_TYPE_CONFIG": {Body: []byte(signedConfigURL), Err: nil}, + signedBinaryURL: {Body: encodedBinary, Err: nil}, + signedConfigURL: {Body: config, Err: nil}, + }) + artifactStore, err := artifacts.NewStore(lggr, orm, fetcher.FetcherFunc(), fetcher.RetrieverFunc(), clockwork.NewFakeClock(), workflowkey.Key{}, custmsg.NewLabeler(), lf, artifacts.WithConfig(artifacts.StoreConfig{ + ArtifactStorageHost: "example.com", + })) + require.NoError(t, err) + + er := NewEngineRegistry() + + factoryCalled := false + trackingFactory := func(ctx context.Context, wfid string, owner string, name types.WorkflowName, tag string, config []byte, binary []byte, initDone chan<- error) (services.Service, error) { + factoryCalled = true + if initDone != nil { + initDone <- nil + } + return &mockEngine{}, nil + } + + wfStore := store.NewInMemoryStore(lggr, clockwork.NewFakeClock()) + registry := capabilities.NewRegistry(lggr) + registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) + limiters, err := v2.NewLimiters(lf, nil) + require.NoError(t, err) + rl, err := ratelimiter.NewRateLimiter(rlConfig) + require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(lggr, syncerlimiter.Config{Global: 200, PerOwner: 200}, lf) + require.NoError(t, err) + + h, err := NewEventHandler(lggr, wfStore, nil, true, registry, er, emitter, limiters, nil, rl, workflowLimits, artifactStore, workflowEncryptionKey, &testDonNotifier{}, + WithEngineRegistry(er), + WithEngineFactoryFn(trackingFactory), + ) + require.NoError(t, err) + servicetest.Run(t, h) + + event := WorkflowRegisteredEvent{ + Status: WorkflowStatusActive, + WorkflowID: giveWFID, + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + WorkflowTag: "workflow-tag", + BinaryURL: binaryURL, + ConfigURL: configURL, + // No Attributes, or non-confidential attributes. + } + + ctx = contexts.WithCRE(ctx, contexts.CRE{Owner: hex.EncodeToString(wfOwner), Workflow: wfIDString}) + err = h.workflowRegisteredEvent(ctx, event) + require.NoError(t, err) + + assert.True(t, factoryCalled, "engine factory should be called for non-confidential workflows") + + engine, ok := er.Get(giveWFID) + require.True(t, ok, "engine should be registered") + require.NoError(t, engine.Ready()) + }) + + t.Run("malformed attributes returns error", func(t *testing.T) { + var ( + ctx = testutils.Context(t) + lggr = logger.TestLogger(t) + lf = limits.Factory{Logger: lggr} + db = pgtest.NewSqlxDB(t) + orm = artifacts.NewWorkflowRegistryDS(db, lggr) + emitter = custmsg.NewLabeler() + + binary = wasmtest.CreateTestBinary(binaryCmd, true, t) + encodedBinary = []byte(base64.StdEncoding.EncodeToString(binary)) + config = []byte("") + wfOwner = []byte("0xOwner") + workflowEncryptionKey = workflowkey.MustNewXXXTestingOnly(big.NewInt(1)) + ) + + giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, "workflow-name", binary, config, "") + require.NoError(t, err) + wfIDString := hex.EncodeToString(giveWFID[:]) + + binaryURL := "http://example.com/" + wfIDString + "/binary" + configURL := "http://example.com/" + wfIDString + "/config" + signedURLParameter := "?auth=abc123" + signedBinaryURL := binaryURL + signedURLParameter + signedConfigURL := configURL + signedURLParameter + + fetcher := newMockFetcher(map[string]mockFetchResp{ + wfIDString + "-ARTIFACT_TYPE_BINARY": {Body: []byte(signedBinaryURL), Err: nil}, + wfIDString + "-ARTIFACT_TYPE_CONFIG": {Body: []byte(signedConfigURL), Err: nil}, + signedBinaryURL: {Body: encodedBinary, Err: nil}, + signedConfigURL: {Body: config, Err: nil}, + }) + artifactStore, err := artifacts.NewStore(lggr, orm, fetcher.FetcherFunc(), fetcher.RetrieverFunc(), clockwork.NewFakeClock(), workflowkey.Key{}, custmsg.NewLabeler(), lf, artifacts.WithConfig(artifacts.StoreConfig{ + ArtifactStorageHost: "example.com", + })) + require.NoError(t, err) + + er := NewEngineRegistry() + wfStore := store.NewInMemoryStore(lggr, clockwork.NewFakeClock()) + registry := capabilities.NewRegistry(lggr) + registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) + limiters, err := v2.NewLimiters(lf, nil) + require.NoError(t, err) + rl, err := ratelimiter.NewRateLimiter(rlConfig) + require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(lggr, syncerlimiter.Config{Global: 200, PerOwner: 200}, lf) + require.NoError(t, err) + + h, err := NewEventHandler(lggr, wfStore, nil, true, registry, er, emitter, limiters, nil, rl, workflowLimits, artifactStore, workflowEncryptionKey, &testDonNotifier{}, + WithEngineRegistry(er), + WithEngineFactoryFn(mockEngineFactory), + ) + require.NoError(t, err) + servicetest.Run(t, h) + + event := WorkflowRegisteredEvent{ + Status: WorkflowStatusActive, + WorkflowID: giveWFID, + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + WorkflowTag: "workflow-tag", + BinaryURL: binaryURL, + ConfigURL: configURL, + Attributes: []byte(`{not valid json`), + } + + ctx = contexts.WithCRE(ctx, contexts.CRE{Owner: hex.EncodeToString(wfOwner), Workflow: wfIDString}) + err = h.workflowRegisteredEvent(ctx, event) + + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to parse workflow attributes") + }) +} + type testCase struct { Name string BinaryURLFactory func(string) string diff --git a/core/services/workflows/v2/confidential_module.go b/core/services/workflows/v2/confidential_module.go new file mode 100644 index 00000000000..7a1983d2a27 --- /dev/null +++ b/core/services/workflows/v2/confidential_module.go @@ -0,0 +1,204 @@ +package v2 + +import ( + "context" + "crypto/sha256" + "encoding/json" + "errors" + "fmt" + + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" + + confworkflowtypes "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/actions/confidentialworkflow" + sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" +) + +const confidentialWorkflowsCapabilityID = "confidential-workflows@1.0.0-alpha" + +// WorkflowAttributes is the JSON structure stored in WorkflowSpec.Attributes. +type WorkflowAttributes struct { + Confidential bool `json:"confidential"` + VaultDonSecrets []SecretIdentifier `json:"vault_don_secrets"` +} + +// SecretIdentifier identifies a secret in VaultDON. +type SecretIdentifier struct { + Key string `json:"key"` + Namespace string `json:"namespace,omitempty"` +} + +// ParseWorkflowAttributes parses the Attributes JSON from a WorkflowSpec. +// Returns a zero-value struct if data is nil or empty. +func ParseWorkflowAttributes(data []byte) (WorkflowAttributes, error) { + var attrs WorkflowAttributes + if len(data) == 0 { + return attrs, nil + } + if err := json.Unmarshal(data, &attrs); err != nil { + return attrs, fmt.Errorf("failed to parse workflow attributes: %w", err) + } + return attrs, nil +} + +// IsConfidential returns true if the Attributes JSON has "confidential": true. +// Returns an error if the attributes contain malformed JSON, so callers can +// fail loudly rather than silently falling through to non-confidential execution. +func IsConfidential(data []byte) (bool, error) { + attrs, err := ParseWorkflowAttributes(data) + if err != nil { + return false, err + } + return attrs.Confidential, nil +} + +// BinaryURLResolver resolves a raw binary URL into an ephemeral/presigned +// URL that the enclave can fetch without authentication. In production this +// calls the CRE storage service; nil means the raw URL is used as-is. +// PR 5/5 (#21642) wires this to the storage service retriever. +type BinaryURLResolver func(ctx context.Context, workflowID string) (string, error) + +// ConfidentialModule implements host.ModuleV2 for confidential workflows. +// Instead of running WASM locally, it delegates execution to the +// confidential-workflows capability via the CapabilitiesRegistry. +type ConfidentialModule struct { + capRegistry core.CapabilitiesRegistry + binaryURL string + binaryHash []byte + workflowID string + workflowOwner string + workflowName string + workflowTag string + vaultDonSecrets []SecretIdentifier + binaryURLResolver BinaryURLResolver + lggr logger.Logger +} + +var _ host.ModuleV2 = (*ConfidentialModule)(nil) + +func NewConfidentialModule( + capRegistry core.CapabilitiesRegistry, + binaryURL string, + binaryHash []byte, + workflowID, workflowOwner, workflowName, workflowTag string, + vaultDonSecrets []SecretIdentifier, + binaryURLResolver BinaryURLResolver, + lggr logger.Logger, +) *ConfidentialModule { + return &ConfidentialModule{ + capRegistry: capRegistry, + binaryURL: binaryURL, + binaryHash: binaryHash, + workflowID: workflowID, + workflowOwner: workflowOwner, + workflowName: workflowName, + workflowTag: workflowTag, + vaultDonSecrets: vaultDonSecrets, + binaryURLResolver: binaryURLResolver, + lggr: lggr, + } +} + +func (m *ConfidentialModule) Start() {} +func (m *ConfidentialModule) Close() {} +func (m *ConfidentialModule) IsLegacyDAG() bool { return false } + +func (m *ConfidentialModule) Execute( + ctx context.Context, + request *sdkpb.ExecuteRequest, + helper host.ExecutionHelper, +) (*sdkpb.ExecutionResult, error) { + execReqBytes, err := proto.Marshal(request) + if err != nil { + return nil, fmt.Errorf("failed to marshal ExecuteRequest: %w", err) + } + + protoSecrets := make([]*confworkflowtypes.SecretIdentifier, len(m.vaultDonSecrets)) + for i, s := range m.vaultDonSecrets { + // VaultDON treats "main" as the default namespace for secrets. + ns := s.Namespace + if ns == "" { + ns = "main" + } + protoSecrets[i] = &confworkflowtypes.SecretIdentifier{ + Key: s.Key, + Namespace: &ns, + } + } + + binaryURL := m.binaryURL + if m.binaryURLResolver != nil { + resolved, resolveErr := m.binaryURLResolver(ctx, m.workflowID) + if resolveErr != nil { + return nil, fmt.Errorf("failed to resolve binary URL: %w", resolveErr) + } + binaryURL = resolved + } + + capInput := &confworkflowtypes.ConfidentialWorkflowRequest{ + VaultDonSecrets: protoSecrets, + Execution: &confworkflowtypes.WorkflowExecution{ + WorkflowId: m.workflowID, + BinaryUrl: binaryURL, + BinaryHash: m.binaryHash, + ExecuteRequest: execReqBytes, + Owner: m.workflowOwner, + ExecutionId: helper.GetWorkflowExecutionID(), + }, + } + + payload, err := anypb.New(capInput) + if err != nil { + return nil, fmt.Errorf("failed to marshal capability payload: %w", err) + } + + executable, err := m.capRegistry.GetExecutable(ctx, confidentialWorkflowsCapabilityID) + if err != nil { + return nil, fmt.Errorf("failed to get confidential-workflows capability: %w", err) + } + + capReq := capabilities.CapabilityRequest{ + Payload: payload, + Method: "Execute", + CapabilityId: confidentialWorkflowsCapabilityID, + Metadata: capabilities.RequestMetadata{ + WorkflowID: m.workflowID, + WorkflowOwner: m.workflowOwner, + WorkflowName: m.workflowName, + WorkflowTag: m.workflowTag, + WorkflowExecutionID: helper.GetWorkflowExecutionID(), + }, + } + + capResp, err := executable.Execute(ctx, capReq) + if err != nil { + return nil, fmt.Errorf("confidential-workflows capability execution failed: %w", err) + } + + if capResp.Payload == nil { + return nil, errors.New("confidential-workflows capability returned nil payload") + } + + var confResp confworkflowtypes.ConfidentialWorkflowResponse + if err := capResp.Payload.UnmarshalTo(&confResp); err != nil { + return nil, fmt.Errorf("failed to unmarshal capability response: %w", err) + } + + var result sdkpb.ExecutionResult + if err := proto.Unmarshal(confResp.ExecutionResult, &result); err != nil { + return nil, fmt.Errorf("failed to unmarshal ExecutionResult: %w", err) + } + + return &result, nil +} + +// ComputeBinaryHash returns the SHA-256 hash of the given binary. +func ComputeBinaryHash(binary []byte) []byte { + h := sha256.Sum256(binary) + return h[:] +} diff --git a/core/services/workflows/v2/confidential_module_test.go b/core/services/workflows/v2/confidential_module_test.go new file mode 100644 index 00000000000..e72d7164b29 --- /dev/null +++ b/core/services/workflows/v2/confidential_module_test.go @@ -0,0 +1,440 @@ +package v2 + +import ( + "context" + "crypto/sha256" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + regmocks "github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" + + confworkflowtypes "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/actions/confidentialworkflow" + capmocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/mocks" + "github.com/smartcontractkit/chainlink/v2/core/utils/matches" + + sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" + valuespb "github.com/smartcontractkit/chainlink-protos/cre/go/values/pb" +) + +// stubExecutionHelper implements host.ExecutionHelper for testing. +type stubExecutionHelper struct { + executionID string +} + +var _ host.ExecutionHelper = (*stubExecutionHelper)(nil) + +func (s *stubExecutionHelper) CallCapability(context.Context, *sdkpb.CapabilityRequest) (*sdkpb.CapabilityResponse, error) { + return nil, nil +} +func (s *stubExecutionHelper) GetSecrets(context.Context, *sdkpb.GetSecretsRequest) ([]*sdkpb.SecretResponse, error) { + return nil, nil +} +func (s *stubExecutionHelper) GetWorkflowExecutionID() string { return s.executionID } +func (s *stubExecutionHelper) GetNodeTime() time.Time { return time.Time{} } +func (s *stubExecutionHelper) GetDONTime() (time.Time, error) { return time.Time{}, nil } +func (s *stubExecutionHelper) EmitUserLog(string) error { return nil } + +func TestParseWorkflowAttributes(t *testing.T) { + t.Run("valid JSON with all fields", func(t *testing.T) { + data := []byte(`{"confidential":true,"vault_don_secrets":[{"key":"API_KEY"},{"key":"SIGNING_KEY","namespace":"custom-ns"}]}`) + attrs, err := ParseWorkflowAttributes(data) + require.NoError(t, err) + assert.True(t, attrs.Confidential) + require.Len(t, attrs.VaultDonSecrets, 2) + assert.Equal(t, "API_KEY", attrs.VaultDonSecrets[0].Key) + assert.Empty(t, attrs.VaultDonSecrets[0].Namespace) + assert.Equal(t, "SIGNING_KEY", attrs.VaultDonSecrets[1].Key) + assert.Equal(t, "custom-ns", attrs.VaultDonSecrets[1].Namespace) + }) + + t.Run("empty data returns zero value", func(t *testing.T) { + attrs, err := ParseWorkflowAttributes(nil) + require.NoError(t, err) + assert.False(t, attrs.Confidential) + assert.Nil(t, attrs.VaultDonSecrets) + + attrs, err = ParseWorkflowAttributes([]byte{}) + require.NoError(t, err) + assert.False(t, attrs.Confidential) + }) + + t.Run("non-confidential workflow", func(t *testing.T) { + data := []byte(`{"confidential":false}`) + attrs, err := ParseWorkflowAttributes(data) + require.NoError(t, err) + assert.False(t, attrs.Confidential) + }) + + t.Run("malformed JSON returns error", func(t *testing.T) { + _, err := ParseWorkflowAttributes([]byte(`{not json}`)) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to parse workflow attributes") + }) +} + +func TestIsConfidential(t *testing.T) { + t.Run("returns true for confidential", func(t *testing.T) { + ok, err := IsConfidential([]byte(`{"confidential":true}`)) + require.NoError(t, err) + assert.True(t, ok) + }) + + t.Run("returns false for non-confidential", func(t *testing.T) { + ok, err := IsConfidential([]byte(`{"confidential":false}`)) + require.NoError(t, err) + assert.False(t, ok) + }) + + t.Run("returns false for empty data", func(t *testing.T) { + ok, err := IsConfidential(nil) + require.NoError(t, err) + assert.False(t, ok) + }) + + t.Run("returns error for malformed JSON", func(t *testing.T) { + _, err := IsConfidential([]byte(`broken`)) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to parse workflow attributes") + }) +} + +func TestComputeBinaryHash(t *testing.T) { + binary := []byte("hello world") + hash := ComputeBinaryHash(binary) + expected := sha256.Sum256(binary) + assert.Equal(t, expected[:], hash) + + // Deterministic: same input produces same hash. + assert.Equal(t, hash, ComputeBinaryHash(binary)) +} + +func TestConfidentialModule_Execute(t *testing.T) { + ctx := context.Background() + lggr := logger.Nop() + + // Build an ExecuteRequest to send through the module. + execReq := &sdkpb.ExecuteRequest{ + Config: []byte("test-config"), + } + + // Build the expected ExecutionResult that the enclave returns. + expectedResult := &sdkpb.ExecutionResult{ + Result: &sdkpb.ExecutionResult_Value{ + Value: valuespb.NewStringValue("enclave-output"), + }, + } + + // Serialize the result into a ConfidentialWorkflowResponse, as the capability would. + resultBytes, err := proto.Marshal(expectedResult) + require.NoError(t, err) + + confResp := &confworkflowtypes.ConfidentialWorkflowResponse{ + ExecutionResult: resultBytes, + } + respPayload, err := anypb.New(confResp) + require.NoError(t, err) + + t.Run("success", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + + execCap.EXPECT().Execute(matches.AnyContext, mock.MatchedBy(func(req capabilities.CapabilityRequest) bool { + return req.Method == "Execute" && + req.CapabilityId == confidentialWorkflowsCapabilityID && + req.Metadata.WorkflowID == "wf-123" && + req.Metadata.WorkflowOwner == "owner-abc" && + req.Metadata.WorkflowExecutionID == "exec-456" && + req.Payload != nil + })).Return(capabilities.CapabilityResponse{Payload: respPayload}, nil).Once() + + mod := NewConfidentialModule( + capReg, + "https://example.com/binary.wasm", + []byte("fakehash"), + "wf-123", + "owner-abc", + "my-workflow", + "v1", + []SecretIdentifier{ + {Key: "API_KEY"}, + {Key: "SIGNING_KEY", Namespace: "custom-ns"}, + }, + nil, + lggr, + ) + + result, err := mod.Execute(ctx, execReq, &stubExecutionHelper{executionID: "exec-456"}) + require.NoError(t, err) + require.NotNil(t, result) + + val := result.GetValue() + require.NotNil(t, val) + assert.Equal(t, "enclave-output", val.GetStringValue()) + }) + + t.Run("default namespace is main", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + + // Capture the request to inspect proto secrets. + var capturedReq capabilities.CapabilityRequest + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Run(func(_ context.Context, req capabilities.CapabilityRequest) { + capturedReq = req + }). + Return(capabilities.CapabilityResponse{Payload: respPayload}, nil).Once() + + mod := NewConfidentialModule( + capReg, + "https://example.com/binary.wasm", + []byte("hash"), + "wf-1", "owner", "name", "tag", + []SecretIdentifier{{Key: "SECRET_A"}}, // no namespace + nil, + lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{executionID: "exec-1"}) + require.NoError(t, err) + + // Unmarshal the captured request payload and verify namespace defaulted to "main". + var confReq confworkflowtypes.ConfidentialWorkflowRequest + require.NoError(t, capturedReq.Payload.UnmarshalTo(&confReq)) + require.Len(t, confReq.VaultDonSecrets, 1) + assert.Equal(t, "SECRET_A", confReq.VaultDonSecrets[0].Key) + assert.Equal(t, "main", confReq.VaultDonSecrets[0].GetNamespace()) + }) + + t.Run("GetExecutable error", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(nil, errors.New("capability not found")).Once() + + mod := NewConfidentialModule( + capReg, "", nil, "wf", "owner", "name", "tag", nil, nil, lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{}) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to get confidential-workflows capability") + }) + + t.Run("capability Execute error", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Return(capabilities.CapabilityResponse{}, errors.New("enclave unavailable")).Once() + + mod := NewConfidentialModule( + capReg, "", nil, "wf", "owner", "name", "tag", nil, nil, lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{}) + require.Error(t, err) + assert.Contains(t, err.Error(), "confidential-workflows capability execution failed") + }) + + t.Run("nil payload in response", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Return(capabilities.CapabilityResponse{Payload: nil}, nil).Once() + + mod := NewConfidentialModule( + capReg, "", nil, "wf", "owner", "name", "tag", nil, nil, lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{}) + require.Error(t, err) + assert.Contains(t, err.Error(), "returned nil payload") + }) + + t.Run("request fields are forwarded correctly", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + + var capturedReq capabilities.CapabilityRequest + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Run(func(_ context.Context, req capabilities.CapabilityRequest) { + capturedReq = req + }). + Return(capabilities.CapabilityResponse{Payload: respPayload}, nil).Once() + + binaryHash := ComputeBinaryHash([]byte("some-binary")) + mod := NewConfidentialModule( + capReg, + "https://example.com/wasm", + binaryHash, + "wf-abc", + "0xowner", + "my-workflow", + "v2", + []SecretIdentifier{ + {Key: "K1", Namespace: "ns1"}, + {Key: "K2"}, + }, + nil, + lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{executionID: "exec-xyz"}) + require.NoError(t, err) + + // Verify metadata. + assert.Equal(t, "Execute", capturedReq.Method) + assert.Equal(t, confidentialWorkflowsCapabilityID, capturedReq.CapabilityId) + assert.Equal(t, "wf-abc", capturedReq.Metadata.WorkflowID) + assert.Equal(t, "0xowner", capturedReq.Metadata.WorkflowOwner) + assert.Equal(t, "my-workflow", capturedReq.Metadata.WorkflowName) + assert.Equal(t, "v2", capturedReq.Metadata.WorkflowTag) + assert.Equal(t, "exec-xyz", capturedReq.Metadata.WorkflowExecutionID) + + // Verify payload contents. + var confReq confworkflowtypes.ConfidentialWorkflowRequest + require.NoError(t, capturedReq.Payload.UnmarshalTo(&confReq)) + + assert.Equal(t, "wf-abc", confReq.Execution.WorkflowId) + assert.Equal(t, "https://example.com/wasm", confReq.Execution.BinaryUrl) + assert.Equal(t, binaryHash, confReq.Execution.BinaryHash) + + // Verify the serialized ExecuteRequest round-trips. + var roundTripped sdkpb.ExecuteRequest + require.NoError(t, proto.Unmarshal(confReq.Execution.ExecuteRequest, &roundTripped)) + assert.Equal(t, execReq.GetConfig(), roundTripped.GetConfig()) + + // Verify secrets. + require.Len(t, confReq.VaultDonSecrets, 2) + assert.Equal(t, "K1", confReq.VaultDonSecrets[0].Key) + assert.Equal(t, "ns1", confReq.VaultDonSecrets[0].GetNamespace()) + assert.Equal(t, "K2", confReq.VaultDonSecrets[1].Key) + assert.Equal(t, "main", confReq.VaultDonSecrets[1].GetNamespace()) + }) +} + +func TestConfidentialModule_BinaryURLResolver(t *testing.T) { + ctx := context.Background() + lggr := logger.Nop() + + execReq := &sdkpb.ExecuteRequest{Config: []byte("cfg")} + + expectedResult := &sdkpb.ExecutionResult{ + Result: &sdkpb.ExecutionResult_Value{ + Value: valuespb.NewStringValue("ok"), + }, + } + resultBytes, err := proto.Marshal(expectedResult) + require.NoError(t, err) + confResp := &confworkflowtypes.ConfidentialWorkflowResponse{ExecutionResult: resultBytes} + respPayload, err := anypb.New(confResp) + require.NoError(t, err) + + t.Run("resolver replaces binary URL", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + + var capturedReq capabilities.CapabilityRequest + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Run(func(_ context.Context, req capabilities.CapabilityRequest) { + capturedReq = req + }). + Return(capabilities.CapabilityResponse{Payload: respPayload}, nil).Once() + + resolver := func(_ context.Context, wfID string) (string, error) { + return "https://presigned.example.com/" + wfID + "?token=abc", nil + } + + mod := NewConfidentialModule( + capReg, "https://storage.example.com/raw", []byte("hash"), + "wf-1", "owner", "name", "tag", nil, resolver, lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{executionID: "exec-1"}) + require.NoError(t, err) + + var confReq confworkflowtypes.ConfidentialWorkflowRequest + require.NoError(t, capturedReq.Payload.UnmarshalTo(&confReq)) + assert.Equal(t, "https://presigned.example.com/wf-1?token=abc", confReq.Execution.BinaryUrl) + }) + + t.Run("nil resolver uses raw URL", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + + var capturedReq capabilities.CapabilityRequest + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Run(func(_ context.Context, req capabilities.CapabilityRequest) { + capturedReq = req + }). + Return(capabilities.CapabilityResponse{Payload: respPayload}, nil).Once() + + mod := NewConfidentialModule( + capReg, "https://storage.example.com/raw", []byte("hash"), + "wf-1", "owner", "name", "tag", nil, nil, lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{executionID: "exec-1"}) + require.NoError(t, err) + + var confReq confworkflowtypes.ConfidentialWorkflowRequest + require.NoError(t, capturedReq.Payload.UnmarshalTo(&confReq)) + assert.Equal(t, "https://storage.example.com/raw", confReq.Execution.BinaryUrl) + }) + + t.Run("resolver error propagates", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + + resolver := func(_ context.Context, _ string) (string, error) { + return "", errors.New("storage service unavailable") + } + + mod := NewConfidentialModule( + capReg, "https://storage.example.com/raw", []byte("hash"), + "wf-1", "owner", "name", "tag", nil, resolver, lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{executionID: "exec-1"}) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to resolve binary URL") + assert.Contains(t, err.Error(), "storage service unavailable") + }) +} + +func TestConfidentialModule_InterfaceMethods(t *testing.T) { + mod := &ConfidentialModule{} + + // These are no-ops but should not panic. + mod.Start() + mod.Close() + assert.False(t, mod.IsLegacyDAG()) +} diff --git a/core/store/migrate/migrations/0295_add_workflow_attributes_column.sql b/core/store/migrate/migrations/0295_add_workflow_attributes_column.sql new file mode 100644 index 00000000000..30c85db7289 --- /dev/null +++ b/core/store/migrate/migrations/0295_add_workflow_attributes_column.sql @@ -0,0 +1,5 @@ +-- +goose Up +ALTER TABLE workflow_specs_v2 ADD COLUMN attributes bytea DEFAULT ''; + +-- +goose Down +ALTER TABLE workflow_specs_v2 DROP COLUMN attributes;