From ee36faef8c2f36e05b1a2b1802e13323aaee86bd Mon Sep 17 00:00:00 2001 From: Tejaswi Nadahalli Date: Tue, 24 Mar 2026 12:40:11 +0100 Subject: [PATCH 01/12] resolve go.mod conflicts --- core/config/cre_config.go | 8 + core/config/toml/types.go | 27 +- core/services/chainlink/config_cre.go | 29 ++ core/services/job/models.go | 1 + .../conversions/conversions.go | 4 + core/services/workflows/artifacts/v2/orm.go | 9 +- core/services/workflows/syncer/fetcher.go | 18 +- core/services/workflows/syncer/v2/fetcher.go | 18 +- core/services/workflows/syncer/v2/handler.go | 119 +++++- .../workflows/syncer/v2/handler_test.go | 258 +++++++++++++ .../workflows/v2/confidential_module.go | 188 ++++++++++ .../workflows/v2/confidential_module_test.go | 343 ++++++++++++++++++ .../0295_add_workflow_attributes_column.sql | 5 + 13 files changed, 1001 insertions(+), 26 deletions(-) create mode 100644 core/services/workflows/v2/confidential_module.go create mode 100644 core/services/workflows/v2/confidential_module_test.go create mode 100644 core/store/migrate/migrations/0295_add_workflow_attributes_column.sql diff --git a/core/config/cre_config.go b/core/config/cre_config.go index 57bc4ad31a9..78a600d1490 100644 --- a/core/config/cre_config.go +++ b/core/config/cre_config.go @@ -13,6 +13,7 @@ type CRE interface { // When enabled, additional OTel tracing and logging is performed. DebugMode() bool LocalSecrets() map[string]string + ConfidentialRelay() CREConfidentialRelay } // WorkflowFetcher defines configuration for fetching workflow files @@ -21,6 +22,13 @@ type WorkflowFetcher interface { URL() string } +// CREConfidentialRelay defines configuration for the confidential relay handler. +type CREConfidentialRelay interface { + Enabled() bool + TrustedPCRs() string + CARootsPEM() string +} + // CRELinking defines configuration for connecting to the CRE linking service type CRELinking interface { URL() string diff --git a/core/config/toml/types.go b/core/config/toml/types.go index 0865babfc96..8b4d8cd617e 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -1985,7 +1985,8 @@ type CreConfig struct { // Requires [Tracing].Enabled = true for traces to be exported (trace export is gated by // Tracing.Enabled in initGlobals; Telemetry.Enabled is optional—traces work with or without it). // WARNING: This is not suitable for production use due to performance overhead. - DebugMode *bool `toml:",omitempty"` + DebugMode *bool `toml:",omitempty"` + ConfidentialRelay *ConfidentialRelayConfig `toml:",omitempty"` } // WorkflowFetcherConfig holds the configuration for fetching workflow files @@ -1993,6 +1994,15 @@ type WorkflowFetcherConfig struct { URL *string `toml:",omitempty"` } +// ConfidentialRelayConfig holds the configuration for the confidential relay handler. +// When Enabled is true, the node participates in the confidential relay DON, +// validating enclave attestations and proxying capability requests. +type ConfidentialRelayConfig struct { + Enabled *bool `toml:",omitempty"` + TrustedPCRs *string `toml:",omitempty"` + CARootsPEM *string `toml:",omitempty"` +} + // LinkingConfig holds the configuration for connecting to the CRE linking service type LinkingConfig struct { URL *string `toml:",omitempty"` @@ -2046,6 +2056,21 @@ func (c *CreConfig) setFrom(f *CreConfig) { if f.DebugMode != nil { c.DebugMode = f.DebugMode } + + if f.ConfidentialRelay != nil { + if c.ConfidentialRelay == nil { + c.ConfidentialRelay = &ConfidentialRelayConfig{} + } + if v := f.ConfidentialRelay.Enabled; v != nil { + c.ConfidentialRelay.Enabled = v + } + if v := f.ConfidentialRelay.TrustedPCRs; v != nil { + c.ConfidentialRelay.TrustedPCRs = v + } + if v := f.ConfidentialRelay.CARootsPEM; v != nil { + c.ConfidentialRelay.CARootsPEM = v + } + } } func (w *WorkflowFetcherConfig) ValidateConfig() error { diff --git a/core/services/chainlink/config_cre.go b/core/services/chainlink/config_cre.go index 6714d6a63d5..dbb9c404f21 100644 --- a/core/services/chainlink/config_cre.go +++ b/core/services/chainlink/config_cre.go @@ -105,6 +105,35 @@ func (c *creConfig) Linking() config.CRELinking { return &linkingConfig{url: url, tlsEnabled: tlsEnabled} } +type confidentialRelayConfig struct { + enabled bool + trustedPCRs string + caRootsPEM string +} + +func (cr *confidentialRelayConfig) Enabled() bool { return cr.enabled } +func (cr *confidentialRelayConfig) TrustedPCRs() string { return cr.trustedPCRs } +func (cr *confidentialRelayConfig) CARootsPEM() string { return cr.caRootsPEM } + +func (c *creConfig) ConfidentialRelay() config.CREConfidentialRelay { + if c.c.ConfidentialRelay == nil { + return &confidentialRelayConfig{} + } + enabled := false + if c.c.ConfidentialRelay.Enabled != nil { + enabled = *c.c.ConfidentialRelay.Enabled + } + trustedPCRs := "" + if c.c.ConfidentialRelay.TrustedPCRs != nil { + trustedPCRs = *c.c.ConfidentialRelay.TrustedPCRs + } + caRootsPEM := "" + if c.c.ConfidentialRelay.CARootsPEM != nil { + caRootsPEM = *c.c.ConfidentialRelay.CARootsPEM + } + return &confidentialRelayConfig{enabled: enabled, trustedPCRs: trustedPCRs, caRootsPEM: caRootsPEM} +} + func (c *creConfig) LocalSecrets() map[string]string { return c.s.LocalSecrets } diff --git a/core/services/job/models.go b/core/services/job/models.go index 20e7da2c643..2d257a33ca5 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -930,6 +930,7 @@ type WorkflowSpec struct { CreatedAt time.Time `toml:"-"` UpdatedAt time.Time `toml:"-"` SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"` + Attributes []byte `db:"attributes"` sdkWorkflow *sdk.WorkflowSpec rawSpec []byte config []byte diff --git a/core/services/standardcapabilities/conversions/conversions.go b/core/services/standardcapabilities/conversions/conversions.go index b9a233243af..d623fa9a409 100644 --- a/core/services/standardcapabilities/conversions/conversions.go +++ b/core/services/standardcapabilities/conversions/conversions.go @@ -50,6 +50,8 @@ func GetCapabilityIDFromCommand(command string, config string) string { return "http-trigger@1.0.0-alpha" case "http_action": return "http-actions@1.0.0-alpha" // plural "actions" + case "mock": + return "mock@1.0.0" default: return "" } @@ -71,6 +73,8 @@ func GetCommandFromCapabilityID(capabilityID string) string { return "http_trigger" case strings.HasPrefix(capabilityID, "http-actions"): return "http_action" + case strings.HasPrefix(capabilityID, "mock"): + return "mock" default: return "" } diff --git a/core/services/workflows/artifacts/v2/orm.go b/core/services/workflows/artifacts/v2/orm.go index 7b37afda16b..6d8887dbe06 100644 --- a/core/services/workflows/artifacts/v2/orm.go +++ b/core/services/workflows/artifacts/v2/orm.go @@ -63,7 +63,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) config_url, created_at, updated_at, - spec_type + spec_type, + attributes ) VALUES ( :workflow, :config, @@ -76,7 +77,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) :config_url, :created_at, :updated_at, - :spec_type + :spec_type, + :attributes ) ON CONFLICT (workflow_id) DO UPDATE SET workflow = EXCLUDED.workflow, @@ -89,7 +91,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) config_url = EXCLUDED.config_url, created_at = EXCLUDED.created_at, updated_at = EXCLUDED.updated_at, - spec_type = EXCLUDED.spec_type + spec_type = EXCLUDED.spec_type, + attributes = EXCLUDED.attributes RETURNING id ` diff --git a/core/services/workflows/syncer/fetcher.go b/core/services/workflows/syncer/fetcher.go index c959b6b4da7..33ba4b10a2e 100644 --- a/core/services/workflows/syncer/fetcher.go +++ b/core/services/workflows/syncer/fetcher.go @@ -177,12 +177,20 @@ func newFileFetcher(basePath string, lggr logger.Logger) types.FetcherFunc { if err != nil { return nil, fmt.Errorf("invalid URL: %w", err) } - fullPath := filepath.Clean(u.Path) - // ensure that the incoming request URL is either relative or absolute but within the basePath - if !filepath.IsAbs(fullPath) { - // If it's not absolute, we assume it's relative to the basePath - fullPath = filepath.Join(basePath, fullPath) + var fullPath string + if u.Scheme == "http" || u.Scheme == "https" { + // For HTTP(S) URLs, extract just the filename and resolve against basePath. + // This supports confidential workflows where the on-chain URL must be HTTP + // (so the enclave can fetch the binary), but the syncer reads from the local filesystem. + fullPath = filepath.Join(basePath, filepath.Base(u.Path)) + } else { + fullPath = filepath.Clean(u.Path) + // ensure that the incoming request URL is either relative or absolute but within the basePath + if !filepath.IsAbs(fullPath) { + // If it's not absolute, we assume it's relative to the basePath + fullPath = filepath.Join(basePath, fullPath) + } } if !strings.HasPrefix(fullPath, basePath) { return nil, fmt.Errorf("request URL %s is not within the basePath %s", fullPath, basePath) diff --git a/core/services/workflows/syncer/v2/fetcher.go b/core/services/workflows/syncer/v2/fetcher.go index e86f998f6bb..4c79fffbe24 100644 --- a/core/services/workflows/syncer/v2/fetcher.go +++ b/core/services/workflows/syncer/v2/fetcher.go @@ -211,12 +211,20 @@ func newFileFetcher(basePath string, lggr logger.Logger) types.FetcherFunc { if err != nil { return nil, fmt.Errorf("invalid URL: %w", err) } - fullPath := filepath.Clean(u.Path) - // ensure that the incoming request URL is either relative or absolute but within the basePath - if !filepath.IsAbs(fullPath) { - // If it's not absolute, we assume it's relative to the basePath - fullPath = filepath.Join(basePath, fullPath) + var fullPath string + if u.Scheme == "http" || u.Scheme == "https" { + // For HTTP(S) URLs, extract just the filename and resolve against basePath. + // This supports confidential workflows where the on-chain URL must be HTTP + // (so the enclave can fetch the binary), but the syncer reads from the local filesystem. + fullPath = filepath.Join(basePath, filepath.Base(u.Path)) + } else { + fullPath = filepath.Clean(u.Path) + // ensure that the incoming request URL is either relative or absolute but within the basePath + if !filepath.IsAbs(fullPath) { + // If it's not absolute, we assume it's relative to the basePath + fullPath = filepath.Join(basePath, fullPath) + } } if !strings.HasPrefix(fullPath, basePath) { return nil, fmt.Errorf("request URL %s is not within the basePath %s", fullPath, basePath) diff --git a/core/services/workflows/syncer/v2/handler.go b/core/services/workflows/syncer/v2/handler.go index e78e2ec55bc..6f190b4d457 100644 --- a/core/services/workflows/syncer/v2/handler.go +++ b/core/services/workflows/syncer/v2/handler.go @@ -537,6 +537,7 @@ func (h *eventHandler) createWorkflowSpec(ctx context.Context, payload WorkflowR SpecType: job.WASMFile, BinaryURL: payload.BinaryURL, ConfigURL: payload.ConfigURL, + Attributes: payload.Attributes, } if _, err = h.workflowArtifactsStore.UpsertWorkflowSpec(ctx, entry); err != nil { @@ -789,6 +790,15 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp return fmt.Errorf("invalid workflow name: %w", err) } + confidential, err := v2.IsConfidential(spec.Attributes) + if err != nil { + return fmt.Errorf("failed to parse workflow attributes: %w", err) + } + if confidential { + h.lggr.Infow("routing workflow to confidential execution", "workflowID", spec.WorkflowID) + return h.tryConfidentialEngineCreate(ctx, spec, wid, workflowName, decodedBinary, source) + } + // Create a channel to receive the initialization result. // This allows us to wait for the engine to complete initialization (including trigger subscriptions) // before emitting the workflowActivated event, ensuring the event accurately reflects deployment status. @@ -811,7 +821,21 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp return fmt.Errorf("failed to create workflow engine: %w", err) } - if err = engine.Start(ctx); err != nil { + return h.startAndRegisterEngine(ctx, engine, initDone, wid, spec.WorkflowID, source) +} + +// startAndRegisterEngine starts a workflow engine, waits for initialization to +// complete, and registers it in the engine registry. Used by both the normal +// and confidential engine creation paths. +func (h *eventHandler) startAndRegisterEngine( + ctx context.Context, + engine services.Service, + initDone <-chan error, + wid types.WorkflowID, + workflowID string, + source string, +) error { + if err := engine.Start(ctx); err != nil { return fmt.Errorf("failed to start workflow engine: %w", err) } @@ -819,32 +843,25 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp // This ensures we don't emit workflowActivated events before the engine initializes successfully. select { case <-ctx.Done(): - // Context cancelled while waiting for initialization if closeErr := engine.Close(); closeErr != nil { - h.lggr.Errorw("failed to close engine after context cancellation", "error", closeErr, "workflowID", spec.WorkflowID) + h.lggr.Errorw("failed to close engine after context cancellation", "error", closeErr, "workflowID", workflowID) } return fmt.Errorf("context cancelled while waiting for engine initialization: %w", ctx.Err()) case initErr := <-initDone: if initErr != nil { - // Engine initialization failed (e.g., trigger subscription failed) // TODO (cre-1482) add logic to mark a deployment as failed to avoid churn. - // Currently, failed deployments will be retried on each poll cycle (with exponential backoff). - // If the failure is due to user error (e.g., invalid trigger config), this causes unnecessary retries. - // Consider marking the workflow spec as "failed" in the database and requiring workflow redeployment. if closeErr := engine.Close(); closeErr != nil { - h.lggr.Errorw("failed to close engine after initialization failure", "error", closeErr, "workflowID", spec.WorkflowID) + h.lggr.Errorw("failed to close engine after initialization failure", "error", closeErr, "workflowID", workflowID) } return fmt.Errorf("engine initialization failed: %w", initErr) } } - // Engine is fully initialized, add to registry with source tracking if err := h.engineRegistry.Add(wid, source, engine); err != nil { if closeErr := engine.Close(); closeErr != nil { return fmt.Errorf("failed to close workflow engine: %w during invariant violation: %w", closeErr, err) } - // Check for WorkflowID collision across sources if errors.Is(err, ErrAlreadyExists) { existingEntry, found := h.engineRegistry.Get(wid) if found { @@ -856,13 +873,91 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp } } - // This shouldn't happen because we call the handler serially and - // check for running engines above, see the call to engineRegistry.Contains. return fmt.Errorf("invariant violation: %w", err) } return nil } +// tryConfidentialEngineCreate creates a V2 engine backed by a ConfidentialModule +// instead of a local WASM module. The ConfidentialModule delegates execution to +// the confidential-workflows capability which runs the WASM inside a TEE. +func (h *eventHandler) tryConfidentialEngineCreate( + ctx context.Context, + spec *job.WorkflowSpec, + wid types.WorkflowID, + workflowName types.WorkflowName, + decodedBinary []byte, + source string, +) error { + attrs, err := v2.ParseWorkflowAttributes(spec.Attributes) + if err != nil { + return fmt.Errorf("failed to parse workflow attributes: %w", err) + } + + binaryHash := v2.ComputeBinaryHash(decodedBinary) + + lggr := logger.Named(h.lggr, "WorkflowEngine.ConfidentialModule") + lggr = logger.With(lggr, "workflowID", spec.WorkflowID, "workflowName", spec.WorkflowName, "workflowOwner", spec.WorkflowOwner) + + module := v2.NewConfidentialModule( + h.capRegistry, + spec.BinaryURL, + binaryHash, + spec.WorkflowID, + spec.WorkflowOwner, + workflowName.String(), + spec.WorkflowTag, + attrs.VaultDonSecrets, + lggr, + ) + + initDone := make(chan error, 1) + + cfg := &v2.EngineConfig{ + Lggr: h.lggr, + Module: module, + WorkflowConfig: []byte(spec.Config), + CapRegistry: h.capRegistry, + DonSubscriber: h.workflowDonSubscriber, + UseLocalTimeProvider: h.useLocalTimeProvider, + DonTimeStore: h.donTimeStore, + ExecutionsStore: h.workflowStore, + WorkflowID: spec.WorkflowID, + WorkflowOwner: spec.WorkflowOwner, + WorkflowName: workflowName, + WorkflowTag: spec.WorkflowTag, + WorkflowEncryptionKey: h.workflowEncryptionKey, + + LocalLimits: v2.EngineLimits{}, + LocalLimiters: h.engineLimiters, + GlobalExecutionConcurrencyLimiter: h.workflowLimits, + + BeholderEmitter: h.emitter, + BillingClient: h.billingClient, + + WorkflowRegistryAddress: h.workflowRegistryAddress, + WorkflowRegistryChainSelector: h.workflowRegistryChainSelector, + OrgResolver: h.orgResolver, + SecretsFetcher: h.secretsFetcher, + FeatureFlags: h.featureFlags, + } + + existingHook := cfg.Hooks.OnInitialized + cfg.Hooks.OnInitialized = func(err error) { + initDone <- err + if existingHook != nil { + existingHook(err) + } + } + + engine, err := v2.NewEngine(cfg) + if err != nil { + return fmt.Errorf("failed to create confidential workflow engine: %w", err) + } + + return h.startAndRegisterEngine(ctx, engine, initDone, wid, spec.WorkflowID, source) +} + // logCustMsg emits a custom message to the external sink and logs an error if that fails. func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log logger.Logger) { err := cma.Emit(ctx, msg) diff --git a/core/services/workflows/syncer/v2/handler_test.go b/core/services/workflows/syncer/v2/handler_test.go index ad7b80d06b9..775f059613e 100644 --- a/core/services/workflows/syncer/v2/handler_test.go +++ b/core/services/workflows/syncer/v2/handler_test.go @@ -618,6 +618,264 @@ func Test_workflowRegisteredHandler(t *testing.T) { } } +func Test_workflowRegisteredHandler_confidentialRouting(t *testing.T) { + t.Run("confidential workflow bypasses engine factory and routes to confidential path", func(t *testing.T) { + var ( + ctx = testutils.Context(t) + lggr = logger.TestLogger(t) + lf = limits.Factory{Logger: lggr} + db = pgtest.NewSqlxDB(t) + orm = artifacts.NewWorkflowRegistryDS(db, lggr) + emitter = custmsg.NewLabeler() + + binary = wasmtest.CreateTestBinary(binaryCmd, true, t) + encodedBinary = []byte(base64.StdEncoding.EncodeToString(binary)) + config = []byte("") + wfOwner = []byte("0xOwner") + workflowEncryptionKey = workflowkey.MustNewXXXTestingOnly(big.NewInt(1)) + ) + + giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, "workflow-name", binary, config, "") + require.NoError(t, err) + wfIDString := hex.EncodeToString(giveWFID[:]) + + binaryURL := "http://example.com/" + wfIDString + "/binary" + configURL := "http://example.com/" + wfIDString + "/config" + signedURLParameter := "?auth=abc123" + signedBinaryURL := binaryURL + signedURLParameter + signedConfigURL := configURL + signedURLParameter + + fetcher := newMockFetcher(map[string]mockFetchResp{ + wfIDString + "-ARTIFACT_TYPE_BINARY": {Body: []byte(signedBinaryURL), Err: nil}, + wfIDString + "-ARTIFACT_TYPE_CONFIG": {Body: []byte(signedConfigURL), Err: nil}, + signedBinaryURL: {Body: encodedBinary, Err: nil}, + signedConfigURL: {Body: config, Err: nil}, + }) + artifactStore, err := artifacts.NewStore(lggr, orm, fetcher.FetcherFunc(), fetcher.RetrieverFunc(), clockwork.NewFakeClock(), workflowkey.Key{}, custmsg.NewLabeler(), lf, artifacts.WithConfig(artifacts.StoreConfig{ + ArtifactStorageHost: "example.com", + })) + require.NoError(t, err) + + er := NewEngineRegistry() + + // Track whether the engine factory is called. The confidential path + // should bypass it entirely. + factoryCalled := false + trackingFactory := func(ctx context.Context, wfid string, owner string, name types.WorkflowName, tag string, config []byte, binary []byte, initDone chan<- error) (services.Service, error) { + factoryCalled = true + if initDone != nil { + initDone <- nil + } + return &mockEngine{}, nil + } + + wfStore := store.NewInMemoryStore(lggr, clockwork.NewFakeClock()) + registry := capabilities.NewRegistry(lggr) + registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) + limiters, err := v2.NewLimiters(lf, nil) + require.NoError(t, err) + rl, err := ratelimiter.NewRateLimiter(rlConfig) + require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(lggr, syncerlimiter.Config{Global: 200, PerOwner: 200}, lf) + require.NoError(t, err) + + h, err := NewEventHandler(lggr, wfStore, nil, true, registry, er, emitter, limiters, nil, rl, workflowLimits, artifactStore, workflowEncryptionKey, &testDonNotifier{}, + WithEngineRegistry(er), + WithEngineFactoryFn(trackingFactory), + ) + require.NoError(t, err) + servicetest.Run(t, h) + + event := WorkflowRegisteredEvent{ + Status: WorkflowStatusActive, + WorkflowID: giveWFID, + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + WorkflowTag: "workflow-tag", + BinaryURL: binaryURL, + ConfigURL: configURL, + Attributes: []byte(`{"confidential":true,"vault_don_secrets":[{"key":"API_KEY"}]}`), + } + + ctx = contexts.WithCRE(ctx, contexts.CRE{Owner: hex.EncodeToString(wfOwner), Workflow: wfIDString}) + err = h.workflowRegisteredEvent(ctx, event) + + // The confidential path creates a real v2.Engine. With test data + // (non-hex owner), engine creation fails. The error comes from the + // confidential path, proving routing worked correctly. + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to create confidential workflow engine") + + // The engine factory must NOT have been called; the confidential path + // bypasses it. + assert.False(t, factoryCalled, "engine factory should not be called for confidential workflows") + + // The engine should NOT be in the registry since init failed. + _, ok := er.Get(giveWFID) + assert.False(t, ok, "engine should not be registered after failed init") + }) + + t.Run("non-confidential workflow uses engine factory", func(t *testing.T) { + var ( + ctx = testutils.Context(t) + lggr = logger.TestLogger(t) + lf = limits.Factory{Logger: lggr} + db = pgtest.NewSqlxDB(t) + orm = artifacts.NewWorkflowRegistryDS(db, lggr) + emitter = custmsg.NewLabeler() + + binary = wasmtest.CreateTestBinary(binaryCmd, true, t) + encodedBinary = []byte(base64.StdEncoding.EncodeToString(binary)) + config = []byte("") + wfOwner = []byte("0xOwner") + workflowEncryptionKey = workflowkey.MustNewXXXTestingOnly(big.NewInt(1)) + ) + + giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, "workflow-name", binary, config, "") + require.NoError(t, err) + wfIDString := hex.EncodeToString(giveWFID[:]) + + binaryURL := "http://example.com/" + wfIDString + "/binary" + configURL := "http://example.com/" + wfIDString + "/config" + signedURLParameter := "?auth=abc123" + signedBinaryURL := binaryURL + signedURLParameter + signedConfigURL := configURL + signedURLParameter + + fetcher := newMockFetcher(map[string]mockFetchResp{ + wfIDString + "-ARTIFACT_TYPE_BINARY": {Body: []byte(signedBinaryURL), Err: nil}, + wfIDString + "-ARTIFACT_TYPE_CONFIG": {Body: []byte(signedConfigURL), Err: nil}, + signedBinaryURL: {Body: encodedBinary, Err: nil}, + signedConfigURL: {Body: config, Err: nil}, + }) + artifactStore, err := artifacts.NewStore(lggr, orm, fetcher.FetcherFunc(), fetcher.RetrieverFunc(), clockwork.NewFakeClock(), workflowkey.Key{}, custmsg.NewLabeler(), lf, artifacts.WithConfig(artifacts.StoreConfig{ + ArtifactStorageHost: "example.com", + })) + require.NoError(t, err) + + er := NewEngineRegistry() + + factoryCalled := false + trackingFactory := func(ctx context.Context, wfid string, owner string, name types.WorkflowName, tag string, config []byte, binary []byte, initDone chan<- error) (services.Service, error) { + factoryCalled = true + if initDone != nil { + initDone <- nil + } + return &mockEngine{}, nil + } + + wfStore := store.NewInMemoryStore(lggr, clockwork.NewFakeClock()) + registry := capabilities.NewRegistry(lggr) + registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) + limiters, err := v2.NewLimiters(lf, nil) + require.NoError(t, err) + rl, err := ratelimiter.NewRateLimiter(rlConfig) + require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(lggr, syncerlimiter.Config{Global: 200, PerOwner: 200}, lf) + require.NoError(t, err) + + h, err := NewEventHandler(lggr, wfStore, nil, true, registry, er, emitter, limiters, nil, rl, workflowLimits, artifactStore, workflowEncryptionKey, &testDonNotifier{}, + WithEngineRegistry(er), + WithEngineFactoryFn(trackingFactory), + ) + require.NoError(t, err) + servicetest.Run(t, h) + + event := WorkflowRegisteredEvent{ + Status: WorkflowStatusActive, + WorkflowID: giveWFID, + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + WorkflowTag: "workflow-tag", + BinaryURL: binaryURL, + ConfigURL: configURL, + // No Attributes, or non-confidential attributes. + } + + ctx = contexts.WithCRE(ctx, contexts.CRE{Owner: hex.EncodeToString(wfOwner), Workflow: wfIDString}) + err = h.workflowRegisteredEvent(ctx, event) + require.NoError(t, err) + + assert.True(t, factoryCalled, "engine factory should be called for non-confidential workflows") + + engine, ok := er.Get(giveWFID) + require.True(t, ok, "engine should be registered") + require.NoError(t, engine.Ready()) + }) + + t.Run("malformed attributes returns error", func(t *testing.T) { + var ( + ctx = testutils.Context(t) + lggr = logger.TestLogger(t) + lf = limits.Factory{Logger: lggr} + db = pgtest.NewSqlxDB(t) + orm = artifacts.NewWorkflowRegistryDS(db, lggr) + emitter = custmsg.NewLabeler() + + binary = wasmtest.CreateTestBinary(binaryCmd, true, t) + encodedBinary = []byte(base64.StdEncoding.EncodeToString(binary)) + config = []byte("") + wfOwner = []byte("0xOwner") + workflowEncryptionKey = workflowkey.MustNewXXXTestingOnly(big.NewInt(1)) + ) + + giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, "workflow-name", binary, config, "") + require.NoError(t, err) + wfIDString := hex.EncodeToString(giveWFID[:]) + + binaryURL := "http://example.com/" + wfIDString + "/binary" + configURL := "http://example.com/" + wfIDString + "/config" + signedURLParameter := "?auth=abc123" + signedBinaryURL := binaryURL + signedURLParameter + signedConfigURL := configURL + signedURLParameter + + fetcher := newMockFetcher(map[string]mockFetchResp{ + wfIDString + "-ARTIFACT_TYPE_BINARY": {Body: []byte(signedBinaryURL), Err: nil}, + wfIDString + "-ARTIFACT_TYPE_CONFIG": {Body: []byte(signedConfigURL), Err: nil}, + signedBinaryURL: {Body: encodedBinary, Err: nil}, + signedConfigURL: {Body: config, Err: nil}, + }) + artifactStore, err := artifacts.NewStore(lggr, orm, fetcher.FetcherFunc(), fetcher.RetrieverFunc(), clockwork.NewFakeClock(), workflowkey.Key{}, custmsg.NewLabeler(), lf, artifacts.WithConfig(artifacts.StoreConfig{ + ArtifactStorageHost: "example.com", + })) + require.NoError(t, err) + + er := NewEngineRegistry() + wfStore := store.NewInMemoryStore(lggr, clockwork.NewFakeClock()) + registry := capabilities.NewRegistry(lggr) + registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) + limiters, err := v2.NewLimiters(lf, nil) + require.NoError(t, err) + rl, err := ratelimiter.NewRateLimiter(rlConfig) + require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(lggr, syncerlimiter.Config{Global: 200, PerOwner: 200}, lf) + require.NoError(t, err) + + h, err := NewEventHandler(lggr, wfStore, nil, true, registry, er, emitter, limiters, nil, rl, workflowLimits, artifactStore, workflowEncryptionKey, &testDonNotifier{}, + WithEngineRegistry(er), + WithEngineFactoryFn(mockEngineFactory), + ) + require.NoError(t, err) + servicetest.Run(t, h) + + event := WorkflowRegisteredEvent{ + Status: WorkflowStatusActive, + WorkflowID: giveWFID, + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + WorkflowTag: "workflow-tag", + BinaryURL: binaryURL, + ConfigURL: configURL, + Attributes: []byte(`{not valid json`), + } + + ctx = contexts.WithCRE(ctx, contexts.CRE{Owner: hex.EncodeToString(wfOwner), Workflow: wfIDString}) + err = h.workflowRegisteredEvent(ctx, event) + + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to parse workflow attributes") + }) +} + type testCase struct { Name string BinaryURLFactory func(string) string diff --git a/core/services/workflows/v2/confidential_module.go b/core/services/workflows/v2/confidential_module.go new file mode 100644 index 00000000000..4cc3761e18a --- /dev/null +++ b/core/services/workflows/v2/confidential_module.go @@ -0,0 +1,188 @@ +package v2 + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" + + confworkflowtypes "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/actions/confidentialworkflow" + sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" +) + +const confidentialWorkflowsCapabilityID = "confidential-workflows@1.0.0-alpha" + +// WorkflowAttributes is the JSON structure stored in WorkflowSpec.Attributes. +type WorkflowAttributes struct { + Confidential bool `json:"confidential"` + VaultDonSecrets []SecretIdentifier `json:"vault_don_secrets"` +} + +// SecretIdentifier identifies a secret in VaultDON. +type SecretIdentifier struct { + Key string `json:"key"` + Namespace string `json:"namespace,omitempty"` +} + +// ParseWorkflowAttributes parses the Attributes JSON from a WorkflowSpec. +// Returns a zero-value struct if data is nil or empty. +func ParseWorkflowAttributes(data []byte) (WorkflowAttributes, error) { + var attrs WorkflowAttributes + if len(data) == 0 { + return attrs, nil + } + if err := json.Unmarshal(data, &attrs); err != nil { + return attrs, fmt.Errorf("failed to parse workflow attributes: %w", err) + } + return attrs, nil +} + +// IsConfidential returns true if the Attributes JSON has "confidential": true. +// Returns an error if the attributes contain malformed JSON, so callers can +// fail loudly rather than silently falling through to non-confidential execution. +func IsConfidential(data []byte) (bool, error) { + attrs, err := ParseWorkflowAttributes(data) + if err != nil { + return false, err + } + return attrs.Confidential, nil +} + +// ConfidentialModule implements host.ModuleV2 for confidential workflows. +// Instead of running WASM locally, it delegates execution to the +// confidential-workflows capability via the CapabilitiesRegistry. +type ConfidentialModule struct { + capRegistry core.CapabilitiesRegistry + binaryURL string + binaryHash []byte + workflowID string + workflowOwner string + workflowName string + workflowTag string + vaultDonSecrets []SecretIdentifier + lggr logger.Logger +} + +var _ host.ModuleV2 = (*ConfidentialModule)(nil) + +func NewConfidentialModule( + capRegistry core.CapabilitiesRegistry, + binaryURL string, + binaryHash []byte, + workflowID string, + workflowOwner string, + workflowName string, + workflowTag string, + vaultDonSecrets []SecretIdentifier, + lggr logger.Logger, +) *ConfidentialModule { + return &ConfidentialModule{ + capRegistry: capRegistry, + binaryURL: binaryURL, + binaryHash: binaryHash, + workflowID: workflowID, + workflowOwner: workflowOwner, + workflowName: workflowName, + workflowTag: workflowTag, + vaultDonSecrets: vaultDonSecrets, + lggr: lggr, + } +} + +func (m *ConfidentialModule) Start() {} +func (m *ConfidentialModule) Close() {} +func (m *ConfidentialModule) IsLegacyDAG() bool { return false } + +func (m *ConfidentialModule) Execute( + ctx context.Context, + request *sdkpb.ExecuteRequest, + helper host.ExecutionHelper, +) (*sdkpb.ExecutionResult, error) { + execReqBytes, err := proto.Marshal(request) + if err != nil { + return nil, fmt.Errorf("failed to marshal ExecuteRequest: %w", err) + } + + protoSecrets := make([]*confworkflowtypes.SecretIdentifier, len(m.vaultDonSecrets)) + for i, s := range m.vaultDonSecrets { + // VaultDON treats "main" as the default namespace for secrets. + ns := s.Namespace + if ns == "" { + ns = "main" + } + protoSecrets[i] = &confworkflowtypes.SecretIdentifier{ + Key: s.Key, + Namespace: &ns, + } + } + + capInput := &confworkflowtypes.ConfidentialWorkflowRequest{ + VaultDonSecrets: protoSecrets, + Execution: &confworkflowtypes.WorkflowExecution{ + WorkflowId: m.workflowID, + BinaryUrl: m.binaryURL, + BinaryHash: m.binaryHash, + ExecuteRequest: execReqBytes, + Owner: m.workflowOwner, + ExecutionId: helper.GetWorkflowExecutionID(), + }, + } + + payload, err := anypb.New(capInput) + if err != nil { + return nil, fmt.Errorf("failed to marshal capability payload: %w", err) + } + + executable, err := m.capRegistry.GetExecutable(ctx, confidentialWorkflowsCapabilityID) + if err != nil { + return nil, fmt.Errorf("failed to get confidential-workflows capability: %w", err) + } + + capReq := capabilities.CapabilityRequest{ + Payload: payload, + Method: "Execute", + CapabilityId: confidentialWorkflowsCapabilityID, + Metadata: capabilities.RequestMetadata{ + WorkflowID: m.workflowID, + WorkflowOwner: m.workflowOwner, + WorkflowName: m.workflowName, + WorkflowTag: m.workflowTag, + WorkflowExecutionID: helper.GetWorkflowExecutionID(), + }, + } + + capResp, err := executable.Execute(ctx, capReq) + if err != nil { + return nil, fmt.Errorf("confidential-workflows capability execution failed: %w", err) + } + + if capResp.Payload == nil { + return nil, fmt.Errorf("confidential-workflows capability returned nil payload") + } + + var confResp confworkflowtypes.ConfidentialWorkflowResponse + if err := capResp.Payload.UnmarshalTo(&confResp); err != nil { + return nil, fmt.Errorf("failed to unmarshal capability response: %w", err) + } + + var result sdkpb.ExecutionResult + if err := proto.Unmarshal(confResp.ExecutionResult, &result); err != nil { + return nil, fmt.Errorf("failed to unmarshal ExecutionResult: %w", err) + } + + return &result, nil +} + +// ComputeBinaryHash returns the SHA-256 hash of the given binary. +func ComputeBinaryHash(binary []byte) []byte { + h := sha256.Sum256(binary) + return h[:] +} diff --git a/core/services/workflows/v2/confidential_module_test.go b/core/services/workflows/v2/confidential_module_test.go new file mode 100644 index 00000000000..87dab426022 --- /dev/null +++ b/core/services/workflows/v2/confidential_module_test.go @@ -0,0 +1,343 @@ +package v2 + +import ( + "context" + "crypto/sha256" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + regmocks "github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" + + confworkflowtypes "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/actions/confidentialworkflow" + capmocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/mocks" + "github.com/smartcontractkit/chainlink/v2/core/utils/matches" + + sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" + valuespb "github.com/smartcontractkit/chainlink-protos/cre/go/values/pb" +) + +// stubExecutionHelper implements host.ExecutionHelper for testing. +type stubExecutionHelper struct { + executionID string +} + +var _ host.ExecutionHelper = (*stubExecutionHelper)(nil) + +func (s *stubExecutionHelper) CallCapability(context.Context, *sdkpb.CapabilityRequest) (*sdkpb.CapabilityResponse, error) { + return nil, nil +} +func (s *stubExecutionHelper) GetSecrets(context.Context, *sdkpb.GetSecretsRequest) ([]*sdkpb.SecretResponse, error) { + return nil, nil +} +func (s *stubExecutionHelper) GetWorkflowExecutionID() string { return s.executionID } +func (s *stubExecutionHelper) GetNodeTime() time.Time { return time.Time{} } +func (s *stubExecutionHelper) GetDONTime() (time.Time, error) { return time.Time{}, nil } +func (s *stubExecutionHelper) EmitUserLog(string) error { return nil } + +func TestParseWorkflowAttributes(t *testing.T) { + t.Run("valid JSON with all fields", func(t *testing.T) { + data := []byte(`{"confidential":true,"vault_don_secrets":[{"key":"API_KEY"},{"key":"SIGNING_KEY","namespace":"custom-ns"}]}`) + attrs, err := ParseWorkflowAttributes(data) + require.NoError(t, err) + assert.True(t, attrs.Confidential) + require.Len(t, attrs.VaultDonSecrets, 2) + assert.Equal(t, "API_KEY", attrs.VaultDonSecrets[0].Key) + assert.Equal(t, "", attrs.VaultDonSecrets[0].Namespace) + assert.Equal(t, "SIGNING_KEY", attrs.VaultDonSecrets[1].Key) + assert.Equal(t, "custom-ns", attrs.VaultDonSecrets[1].Namespace) + }) + + t.Run("empty data returns zero value", func(t *testing.T) { + attrs, err := ParseWorkflowAttributes(nil) + require.NoError(t, err) + assert.False(t, attrs.Confidential) + assert.Nil(t, attrs.VaultDonSecrets) + + attrs, err = ParseWorkflowAttributes([]byte{}) + require.NoError(t, err) + assert.False(t, attrs.Confidential) + }) + + t.Run("non-confidential workflow", func(t *testing.T) { + data := []byte(`{"confidential":false}`) + attrs, err := ParseWorkflowAttributes(data) + require.NoError(t, err) + assert.False(t, attrs.Confidential) + }) + + t.Run("malformed JSON returns error", func(t *testing.T) { + _, err := ParseWorkflowAttributes([]byte(`{not json}`)) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to parse workflow attributes") + }) +} + +func TestIsConfidential(t *testing.T) { + t.Run("returns true for confidential", func(t *testing.T) { + ok, err := IsConfidential([]byte(`{"confidential":true}`)) + require.NoError(t, err) + assert.True(t, ok) + }) + + t.Run("returns false for non-confidential", func(t *testing.T) { + ok, err := IsConfidential([]byte(`{"confidential":false}`)) + require.NoError(t, err) + assert.False(t, ok) + }) + + t.Run("returns false for empty data", func(t *testing.T) { + ok, err := IsConfidential(nil) + require.NoError(t, err) + assert.False(t, ok) + }) + + t.Run("returns error for malformed JSON", func(t *testing.T) { + _, err := IsConfidential([]byte(`broken`)) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to parse workflow attributes") + }) +} + +func TestComputeBinaryHash(t *testing.T) { + binary := []byte("hello world") + hash := ComputeBinaryHash(binary) + expected := sha256.Sum256(binary) + assert.Equal(t, expected[:], hash) + + // Deterministic: same input produces same hash. + assert.Equal(t, hash, ComputeBinaryHash(binary)) +} + +func TestConfidentialModule_Execute(t *testing.T) { + ctx := context.Background() + lggr := logger.Nop() + + // Build an ExecuteRequest to send through the module. + execReq := &sdkpb.ExecuteRequest{ + Config: []byte("test-config"), + } + + // Build the expected ExecutionResult that the enclave returns. + expectedResult := &sdkpb.ExecutionResult{ + Result: &sdkpb.ExecutionResult_Value{ + Value: valuespb.NewStringValue("enclave-output"), + }, + } + + // Serialize the result into a ConfidentialWorkflowResponse, as the capability would. + resultBytes, err := proto.Marshal(expectedResult) + require.NoError(t, err) + + confResp := &confworkflowtypes.ConfidentialWorkflowResponse{ + ExecutionResult: resultBytes, + } + respPayload, err := anypb.New(confResp) + require.NoError(t, err) + + t.Run("success", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + + execCap.EXPECT().Execute(matches.AnyContext, mock.MatchedBy(func(req capabilities.CapabilityRequest) bool { + return req.Method == "Execute" && + req.CapabilityId == confidentialWorkflowsCapabilityID && + req.Metadata.WorkflowID == "wf-123" && + req.Metadata.WorkflowOwner == "owner-abc" && + req.Metadata.WorkflowExecutionID == "exec-456" && + req.Payload != nil + })).Return(capabilities.CapabilityResponse{Payload: respPayload}, nil).Once() + + mod := NewConfidentialModule( + capReg, + "https://example.com/binary.wasm", + []byte("fakehash"), + "wf-123", + "owner-abc", + "my-workflow", + "v1", + []SecretIdentifier{ + {Key: "API_KEY"}, + {Key: "SIGNING_KEY", Namespace: "custom-ns"}, + }, + lggr, + ) + + result, err := mod.Execute(ctx, execReq, &stubExecutionHelper{executionID: "exec-456"}) + require.NoError(t, err) + require.NotNil(t, result) + + val := result.GetValue() + require.NotNil(t, val) + assert.Equal(t, "enclave-output", val.GetStringValue()) + }) + + t.Run("default namespace is main", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + + // Capture the request to inspect proto secrets. + var capturedReq capabilities.CapabilityRequest + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Run(func(_ context.Context, req capabilities.CapabilityRequest) { + capturedReq = req + }). + Return(capabilities.CapabilityResponse{Payload: respPayload}, nil).Once() + + mod := NewConfidentialModule( + capReg, + "https://example.com/binary.wasm", + []byte("hash"), + "wf-1", "owner", "name", "tag", + []SecretIdentifier{{Key: "SECRET_A"}}, // no namespace + lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{executionID: "exec-1"}) + require.NoError(t, err) + + // Unmarshal the captured request payload and verify namespace defaulted to "main". + var confReq confworkflowtypes.ConfidentialWorkflowRequest + require.NoError(t, capturedReq.Payload.UnmarshalTo(&confReq)) + require.Len(t, confReq.VaultDonSecrets, 1) + assert.Equal(t, "SECRET_A", confReq.VaultDonSecrets[0].Key) + assert.Equal(t, "main", confReq.VaultDonSecrets[0].GetNamespace()) + }) + + t.Run("GetExecutable error", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(nil, fmt.Errorf("capability not found")).Once() + + mod := NewConfidentialModule( + capReg, "", nil, "wf", "owner", "name", "tag", nil, lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{}) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to get confidential-workflows capability") + }) + + t.Run("capability Execute error", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Return(capabilities.CapabilityResponse{}, fmt.Errorf("enclave unavailable")).Once() + + mod := NewConfidentialModule( + capReg, "", nil, "wf", "owner", "name", "tag", nil, lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{}) + require.Error(t, err) + assert.Contains(t, err.Error(), "confidential-workflows capability execution failed") + }) + + t.Run("nil payload in response", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Return(capabilities.CapabilityResponse{Payload: nil}, nil).Once() + + mod := NewConfidentialModule( + capReg, "", nil, "wf", "owner", "name", "tag", nil, lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{}) + require.Error(t, err) + assert.Contains(t, err.Error(), "returned nil payload") + }) + + t.Run("request fields are forwarded correctly", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + + var capturedReq capabilities.CapabilityRequest + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Run(func(_ context.Context, req capabilities.CapabilityRequest) { + capturedReq = req + }). + Return(capabilities.CapabilityResponse{Payload: respPayload}, nil).Once() + + binaryHash := ComputeBinaryHash([]byte("some-binary")) + mod := NewConfidentialModule( + capReg, + "https://example.com/wasm", + binaryHash, + "wf-abc", + "0xowner", + "my-workflow", + "v2", + []SecretIdentifier{ + {Key: "K1", Namespace: "ns1"}, + {Key: "K2"}, + }, + lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{executionID: "exec-xyz"}) + require.NoError(t, err) + + // Verify metadata. + assert.Equal(t, "Execute", capturedReq.Method) + assert.Equal(t, confidentialWorkflowsCapabilityID, capturedReq.CapabilityId) + assert.Equal(t, "wf-abc", capturedReq.Metadata.WorkflowID) + assert.Equal(t, "0xowner", capturedReq.Metadata.WorkflowOwner) + assert.Equal(t, "my-workflow", capturedReq.Metadata.WorkflowName) + assert.Equal(t, "v2", capturedReq.Metadata.WorkflowTag) + assert.Equal(t, "exec-xyz", capturedReq.Metadata.WorkflowExecutionID) + + // Verify payload contents. + var confReq confworkflowtypes.ConfidentialWorkflowRequest + require.NoError(t, capturedReq.Payload.UnmarshalTo(&confReq)) + + assert.Equal(t, "wf-abc", confReq.Execution.WorkflowId) + assert.Equal(t, "https://example.com/wasm", confReq.Execution.BinaryUrl) + assert.Equal(t, binaryHash, confReq.Execution.BinaryHash) + + // Verify the serialized ExecuteRequest round-trips. + var roundTripped sdkpb.ExecuteRequest + require.NoError(t, proto.Unmarshal(confReq.Execution.ExecuteRequest, &roundTripped)) + assert.Equal(t, execReq.GetConfig(), roundTripped.GetConfig()) + + // Verify secrets. + require.Len(t, confReq.VaultDonSecrets, 2) + assert.Equal(t, "K1", confReq.VaultDonSecrets[0].Key) + assert.Equal(t, "ns1", confReq.VaultDonSecrets[0].GetNamespace()) + assert.Equal(t, "K2", confReq.VaultDonSecrets[1].Key) + assert.Equal(t, "main", confReq.VaultDonSecrets[1].GetNamespace()) + }) +} + +func TestConfidentialModule_InterfaceMethods(t *testing.T) { + mod := &ConfidentialModule{} + + // These are no-ops but should not panic. + mod.Start() + mod.Close() + assert.False(t, mod.IsLegacyDAG()) +} diff --git a/core/store/migrate/migrations/0295_add_workflow_attributes_column.sql b/core/store/migrate/migrations/0295_add_workflow_attributes_column.sql new file mode 100644 index 00000000000..30c85db7289 --- /dev/null +++ b/core/store/migrate/migrations/0295_add_workflow_attributes_column.sql @@ -0,0 +1,5 @@ +-- +goose Up +ALTER TABLE workflow_specs_v2 ADD COLUMN attributes bytea DEFAULT ''; + +-- +goose Down +ALTER TABLE workflow_specs_v2 DROP COLUMN attributes; From bfe65ca2c36d38ad4491f48a4292c870c247c83a Mon Sep 17 00:00:00 2001 From: Tejaswi Nadahalli Date: Tue, 24 Mar 2026 13:22:16 +0100 Subject: [PATCH 02/12] Revert file fetcher HTTP URL handling The filepath.Base() code for HTTP URLs in newFileFetcher was dead code. No test or production path sends an HTTP URL to a file-based fetcher. The enclave fetches binaries via its own BinaryFetcher, independent of the node syncer's fetcher. --- core/services/workflows/syncer/fetcher.go | 18 +++++------------- core/services/workflows/syncer/v2/fetcher.go | 18 +++++------------- 2 files changed, 10 insertions(+), 26 deletions(-) diff --git a/core/services/workflows/syncer/fetcher.go b/core/services/workflows/syncer/fetcher.go index 33ba4b10a2e..c959b6b4da7 100644 --- a/core/services/workflows/syncer/fetcher.go +++ b/core/services/workflows/syncer/fetcher.go @@ -177,20 +177,12 @@ func newFileFetcher(basePath string, lggr logger.Logger) types.FetcherFunc { if err != nil { return nil, fmt.Errorf("invalid URL: %w", err) } + fullPath := filepath.Clean(u.Path) - var fullPath string - if u.Scheme == "http" || u.Scheme == "https" { - // For HTTP(S) URLs, extract just the filename and resolve against basePath. - // This supports confidential workflows where the on-chain URL must be HTTP - // (so the enclave can fetch the binary), but the syncer reads from the local filesystem. - fullPath = filepath.Join(basePath, filepath.Base(u.Path)) - } else { - fullPath = filepath.Clean(u.Path) - // ensure that the incoming request URL is either relative or absolute but within the basePath - if !filepath.IsAbs(fullPath) { - // If it's not absolute, we assume it's relative to the basePath - fullPath = filepath.Join(basePath, fullPath) - } + // ensure that the incoming request URL is either relative or absolute but within the basePath + if !filepath.IsAbs(fullPath) { + // If it's not absolute, we assume it's relative to the basePath + fullPath = filepath.Join(basePath, fullPath) } if !strings.HasPrefix(fullPath, basePath) { return nil, fmt.Errorf("request URL %s is not within the basePath %s", fullPath, basePath) diff --git a/core/services/workflows/syncer/v2/fetcher.go b/core/services/workflows/syncer/v2/fetcher.go index 4c79fffbe24..e86f998f6bb 100644 --- a/core/services/workflows/syncer/v2/fetcher.go +++ b/core/services/workflows/syncer/v2/fetcher.go @@ -211,20 +211,12 @@ func newFileFetcher(basePath string, lggr logger.Logger) types.FetcherFunc { if err != nil { return nil, fmt.Errorf("invalid URL: %w", err) } + fullPath := filepath.Clean(u.Path) - var fullPath string - if u.Scheme == "http" || u.Scheme == "https" { - // For HTTP(S) URLs, extract just the filename and resolve against basePath. - // This supports confidential workflows where the on-chain URL must be HTTP - // (so the enclave can fetch the binary), but the syncer reads from the local filesystem. - fullPath = filepath.Join(basePath, filepath.Base(u.Path)) - } else { - fullPath = filepath.Clean(u.Path) - // ensure that the incoming request URL is either relative or absolute but within the basePath - if !filepath.IsAbs(fullPath) { - // If it's not absolute, we assume it's relative to the basePath - fullPath = filepath.Join(basePath, fullPath) - } + // ensure that the incoming request URL is either relative or absolute but within the basePath + if !filepath.IsAbs(fullPath) { + // If it's not absolute, we assume it's relative to the basePath + fullPath = filepath.Join(basePath, fullPath) } if !strings.HasPrefix(fullPath, basePath) { return nil, fmt.Errorf("request URL %s is not within the basePath %s", fullPath, basePath) From da4c9fdd6e6b89f04583841ce91e8da7f36a0ca0 Mon Sep 17 00:00:00 2001 From: Tejaswi Nadahalli Date: Tue, 24 Mar 2026 13:28:25 +0100 Subject: [PATCH 03/12] Remove ConfidentialRelay config (belongs in cw-2 relay handler PR) --- core/config/cre_config.go | 8 -------- core/config/toml/types.go | 27 +------------------------ core/services/chainlink/config_cre.go | 29 --------------------------- 3 files changed, 1 insertion(+), 63 deletions(-) diff --git a/core/config/cre_config.go b/core/config/cre_config.go index 78a600d1490..57bc4ad31a9 100644 --- a/core/config/cre_config.go +++ b/core/config/cre_config.go @@ -13,7 +13,6 @@ type CRE interface { // When enabled, additional OTel tracing and logging is performed. DebugMode() bool LocalSecrets() map[string]string - ConfidentialRelay() CREConfidentialRelay } // WorkflowFetcher defines configuration for fetching workflow files @@ -22,13 +21,6 @@ type WorkflowFetcher interface { URL() string } -// CREConfidentialRelay defines configuration for the confidential relay handler. -type CREConfidentialRelay interface { - Enabled() bool - TrustedPCRs() string - CARootsPEM() string -} - // CRELinking defines configuration for connecting to the CRE linking service type CRELinking interface { URL() string diff --git a/core/config/toml/types.go b/core/config/toml/types.go index 8b4d8cd617e..0865babfc96 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -1985,8 +1985,7 @@ type CreConfig struct { // Requires [Tracing].Enabled = true for traces to be exported (trace export is gated by // Tracing.Enabled in initGlobals; Telemetry.Enabled is optional—traces work with or without it). // WARNING: This is not suitable for production use due to performance overhead. - DebugMode *bool `toml:",omitempty"` - ConfidentialRelay *ConfidentialRelayConfig `toml:",omitempty"` + DebugMode *bool `toml:",omitempty"` } // WorkflowFetcherConfig holds the configuration for fetching workflow files @@ -1994,15 +1993,6 @@ type WorkflowFetcherConfig struct { URL *string `toml:",omitempty"` } -// ConfidentialRelayConfig holds the configuration for the confidential relay handler. -// When Enabled is true, the node participates in the confidential relay DON, -// validating enclave attestations and proxying capability requests. -type ConfidentialRelayConfig struct { - Enabled *bool `toml:",omitempty"` - TrustedPCRs *string `toml:",omitempty"` - CARootsPEM *string `toml:",omitempty"` -} - // LinkingConfig holds the configuration for connecting to the CRE linking service type LinkingConfig struct { URL *string `toml:",omitempty"` @@ -2056,21 +2046,6 @@ func (c *CreConfig) setFrom(f *CreConfig) { if f.DebugMode != nil { c.DebugMode = f.DebugMode } - - if f.ConfidentialRelay != nil { - if c.ConfidentialRelay == nil { - c.ConfidentialRelay = &ConfidentialRelayConfig{} - } - if v := f.ConfidentialRelay.Enabled; v != nil { - c.ConfidentialRelay.Enabled = v - } - if v := f.ConfidentialRelay.TrustedPCRs; v != nil { - c.ConfidentialRelay.TrustedPCRs = v - } - if v := f.ConfidentialRelay.CARootsPEM; v != nil { - c.ConfidentialRelay.CARootsPEM = v - } - } } func (w *WorkflowFetcherConfig) ValidateConfig() error { diff --git a/core/services/chainlink/config_cre.go b/core/services/chainlink/config_cre.go index dbb9c404f21..6714d6a63d5 100644 --- a/core/services/chainlink/config_cre.go +++ b/core/services/chainlink/config_cre.go @@ -105,35 +105,6 @@ func (c *creConfig) Linking() config.CRELinking { return &linkingConfig{url: url, tlsEnabled: tlsEnabled} } -type confidentialRelayConfig struct { - enabled bool - trustedPCRs string - caRootsPEM string -} - -func (cr *confidentialRelayConfig) Enabled() bool { return cr.enabled } -func (cr *confidentialRelayConfig) TrustedPCRs() string { return cr.trustedPCRs } -func (cr *confidentialRelayConfig) CARootsPEM() string { return cr.caRootsPEM } - -func (c *creConfig) ConfidentialRelay() config.CREConfidentialRelay { - if c.c.ConfidentialRelay == nil { - return &confidentialRelayConfig{} - } - enabled := false - if c.c.ConfidentialRelay.Enabled != nil { - enabled = *c.c.ConfidentialRelay.Enabled - } - trustedPCRs := "" - if c.c.ConfidentialRelay.TrustedPCRs != nil { - trustedPCRs = *c.c.ConfidentialRelay.TrustedPCRs - } - caRootsPEM := "" - if c.c.ConfidentialRelay.CARootsPEM != nil { - caRootsPEM = *c.c.ConfidentialRelay.CARootsPEM - } - return &confidentialRelayConfig{enabled: enabled, trustedPCRs: trustedPCRs, caRootsPEM: caRootsPEM} -} - func (c *creConfig) LocalSecrets() map[string]string { return c.s.LocalSecrets } From 15579b67db5226d6b55d729628158081646403f4 Mon Sep 17 00:00:00 2001 From: Tejaswi Nadahalli Date: Tue, 24 Mar 2026 13:37:43 +0100 Subject: [PATCH 04/12] Restore comments in startAndRegisterEngine, extract newV2EngineConfig Restore comments that were dropped when extracting startAndRegisterEngine from tryEngineCreate. Extract common EngineConfig construction into newV2EngineConfig and initDone hook wiring into wireInitDoneHook, reducing duplication between the normal and confidential engine paths. --- core/services/workflows/syncer/v2/handler.go | 156 +++++++++---------- 1 file changed, 71 insertions(+), 85 deletions(-) diff --git a/core/services/workflows/syncer/v2/handler.go b/core/services/workflows/syncer/v2/handler.go index 6f190b4d457..95fe867d3ab 100644 --- a/core/services/workflows/syncer/v2/handler.go +++ b/core/services/workflows/syncer/v2/handler.go @@ -629,55 +629,11 @@ func (h *eventHandler) engineFactoryFn(ctx context.Context, workflowID string, o } // V2 aka "NoDAG" - cfg := &v2.EngineConfig{ - Lggr: h.lggr, - Module: module, - WorkflowConfig: config, - CapRegistry: h.capRegistry, - DonSubscriber: h.workflowDonSubscriber, - UseLocalTimeProvider: h.useLocalTimeProvider, - DonTimeStore: h.donTimeStore, - ExecutionsStore: h.workflowStore, - WorkflowID: workflowID, - WorkflowOwner: owner, - WorkflowName: name, - WorkflowTag: tag, - WorkflowEncryptionKey: h.workflowEncryptionKey, - - LocalLimits: v2.EngineLimits{}, // all defaults - LocalLimiters: h.engineLimiters, - FeatureFlags: h.featureFlags, - GlobalExecutionConcurrencyLimiter: h.workflowLimits, - - BeholderEmitter: func() custmsg.MessageEmitter { - h.emitterMu.RLock() - defer h.emitterMu.RUnlock() - return h.emitter - }(), - BillingClient: h.billingClient, + cfg := h.newV2EngineConfig(module, workflowID, owner, name, tag, config) + cfg.DebugMode = h.debugMode + cfg.SdkName = sdkName - WorkflowRegistryAddress: h.workflowRegistryAddress, - WorkflowRegistryChainSelector: h.workflowRegistryChainSelector, - OrgResolver: h.orgResolver, - DebugMode: h.debugMode, - SecretsFetcher: h.secretsFetcher, - SdkName: sdkName, - } - - // Wire the initDone channel to the OnInitialized lifecycle hook. - // This will be called when the engine completes initialization (including trigger subscriptions). - // We compose with any existing hook to avoid overwriting test hooks or other user-provided hooks. - if initDone != nil { - existingHook := cfg.Hooks.OnInitialized - cfg.Hooks.OnInitialized = func(err error) { - // Signal completion to the handler first - initDone <- err - // Then call any existing hook (e.g., from tests) - if existingHook != nil { - existingHook(err) - } - } - } + h.wireInitDoneHook(cfg, initDone) return v2.NewEngine(cfg) } @@ -843,13 +799,18 @@ func (h *eventHandler) startAndRegisterEngine( // This ensures we don't emit workflowActivated events before the engine initializes successfully. select { case <-ctx.Done(): + // Context cancelled while waiting for initialization if closeErr := engine.Close(); closeErr != nil { h.lggr.Errorw("failed to close engine after context cancellation", "error", closeErr, "workflowID", workflowID) } return fmt.Errorf("context cancelled while waiting for engine initialization: %w", ctx.Err()) case initErr := <-initDone: if initErr != nil { + // Engine initialization failed (e.g., trigger subscription failed) // TODO (cre-1482) add logic to mark a deployment as failed to avoid churn. + // Currently, failed deployments will be retried on each poll cycle (with exponential backoff). + // If the failure is due to user error (e.g., invalid trigger config), this causes unnecessary retries. + // Consider marking the workflow spec as "failed" in the database and requiring workflow redeployment. if closeErr := engine.Close(); closeErr != nil { h.lggr.Errorw("failed to close engine after initialization failure", "error", closeErr, "workflowID", workflowID) } @@ -857,11 +818,13 @@ func (h *eventHandler) startAndRegisterEngine( } } + // Engine is fully initialized, add to registry with source tracking if err := h.engineRegistry.Add(wid, source, engine); err != nil { if closeErr := engine.Close(); closeErr != nil { return fmt.Errorf("failed to close workflow engine: %w during invariant violation: %w", closeErr, err) } + // Check for WorkflowID collision across sources if errors.Is(err, ErrAlreadyExists) { existingEntry, found := h.engineRegistry.Get(wid) if found { @@ -873,11 +836,68 @@ func (h *eventHandler) startAndRegisterEngine( } } + // This shouldn't happen because we call the handler serially and + // check for running engines above, see the call to engineRegistry.Contains. return fmt.Errorf("invariant violation: %w", err) } return nil } +// newV2EngineConfig builds the common EngineConfig shared by both the normal +// WASM engine and the confidential engine paths. Caller supplies the module. +func (h *eventHandler) newV2EngineConfig( + module host.ModuleV2, + workflowID string, owner string, name types.WorkflowName, tag string, config []byte, +) *v2.EngineConfig { + h.emitterMu.RLock() + emitter := h.emitter + h.emitterMu.RUnlock() + + return &v2.EngineConfig{ + Lggr: h.lggr, + Module: module, + WorkflowConfig: config, + CapRegistry: h.capRegistry, + DonSubscriber: h.workflowDonSubscriber, + UseLocalTimeProvider: h.useLocalTimeProvider, + DonTimeStore: h.donTimeStore, + ExecutionsStore: h.workflowStore, + WorkflowID: workflowID, + WorkflowOwner: owner, + WorkflowName: name, + WorkflowTag: tag, + WorkflowEncryptionKey: h.workflowEncryptionKey, + + LocalLimits: v2.EngineLimits{}, + LocalLimiters: h.engineLimiters, + FeatureFlags: h.featureFlags, + GlobalExecutionConcurrencyLimiter: h.workflowLimits, + + BeholderEmitter: emitter, + BillingClient: h.billingClient, + + WorkflowRegistryAddress: h.workflowRegistryAddress, + WorkflowRegistryChainSelector: h.workflowRegistryChainSelector, + OrgResolver: h.orgResolver, + SecretsFetcher: h.secretsFetcher, + } +} + +// wireInitDoneHook wires the initDone channel to the OnInitialized lifecycle hook. +// It composes with any existing hook to avoid overwriting test hooks. +func (h *eventHandler) wireInitDoneHook(cfg *v2.EngineConfig, initDone chan<- error) { + if initDone == nil { + return + } + existingHook := cfg.Hooks.OnInitialized + cfg.Hooks.OnInitialized = func(err error) { + initDone <- err + if existingHook != nil { + existingHook(err) + } + } +} + // tryConfidentialEngineCreate creates a V2 engine backed by a ConfidentialModule // instead of a local WASM module. The ConfidentialModule delegates execution to // the confidential-workflows capability which runs the WASM inside a TEE. @@ -911,44 +931,10 @@ func (h *eventHandler) tryConfidentialEngineCreate( lggr, ) - initDone := make(chan error, 1) - - cfg := &v2.EngineConfig{ - Lggr: h.lggr, - Module: module, - WorkflowConfig: []byte(spec.Config), - CapRegistry: h.capRegistry, - DonSubscriber: h.workflowDonSubscriber, - UseLocalTimeProvider: h.useLocalTimeProvider, - DonTimeStore: h.donTimeStore, - ExecutionsStore: h.workflowStore, - WorkflowID: spec.WorkflowID, - WorkflowOwner: spec.WorkflowOwner, - WorkflowName: workflowName, - WorkflowTag: spec.WorkflowTag, - WorkflowEncryptionKey: h.workflowEncryptionKey, - - LocalLimits: v2.EngineLimits{}, - LocalLimiters: h.engineLimiters, - GlobalExecutionConcurrencyLimiter: h.workflowLimits, - - BeholderEmitter: h.emitter, - BillingClient: h.billingClient, - - WorkflowRegistryAddress: h.workflowRegistryAddress, - WorkflowRegistryChainSelector: h.workflowRegistryChainSelector, - OrgResolver: h.orgResolver, - SecretsFetcher: h.secretsFetcher, - FeatureFlags: h.featureFlags, - } + cfg := h.newV2EngineConfig(module, spec.WorkflowID, spec.WorkflowOwner, workflowName, spec.WorkflowTag, []byte(spec.Config)) - existingHook := cfg.Hooks.OnInitialized - cfg.Hooks.OnInitialized = func(err error) { - initDone <- err - if existingHook != nil { - existingHook(err) - } - } + initDone := make(chan error, 1) + h.wireInitDoneHook(cfg, initDone) engine, err := v2.NewEngine(cfg) if err != nil { From 03fc9df1c11125caa1f1dda95db87522d3b3cfe7 Mon Sep 17 00:00:00 2001 From: Tejaswi Nadahalli Date: Wed, 1 Apr 2026 14:21:35 +0200 Subject: [PATCH 05/12] Unify engine creation flow for confidential and normal paths Replace the early-return pattern with a symmetric if/else that picks the factory, then converges on a single startAndRegisterEngine call. Rename tryConfidentialEngineCreate to confidentialEngineFactory and change its signature to return (services.Service, error). # Conflicts: # core/services/workflows/syncer/v2/handler.go --- core/services/workflows/syncer/v2/handler.go | 47 +++++-------------- .../workflows/syncer/v2/handler_test.go | 2 +- 2 files changed, 14 insertions(+), 35 deletions(-) diff --git a/core/services/workflows/syncer/v2/handler.go b/core/services/workflows/syncer/v2/handler.go index 95fe867d3ab..7cab85cc575 100644 --- a/core/services/workflows/syncer/v2/handler.go +++ b/core/services/workflows/syncer/v2/handler.go @@ -750,29 +750,16 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp if err != nil { return fmt.Errorf("failed to parse workflow attributes: %w", err) } - if confidential { - h.lggr.Infow("routing workflow to confidential execution", "workflowID", spec.WorkflowID) - return h.tryConfidentialEngineCreate(ctx, spec, wid, workflowName, decodedBinary, source) - } - // Create a channel to receive the initialization result. - // This allows us to wait for the engine to complete initialization (including trigger subscriptions) - // before emitting the workflowActivated event, ensuring the event accurately reflects deployment status. initDone := make(chan error, 1) + var engine services.Service - // Scope the engineFactory call so that decodedBinary goes out of scope immediately after the factory returns - engine, err := func() (services.Service, error) { - return h.engineFactory( - ctx, - spec.WorkflowID, - spec.WorkflowOwner, - workflowName, - spec.WorkflowTag, - configBytes, - decodedBinary, - initDone, - ) - }() + if confidential { + h.lggr.Infow("routing workflow to confidential execution", "workflowID", spec.WorkflowID) + engine, err = h.confidentialEngineFactory(spec, wid, workflowName, decodedBinary, initDone) + } else { + engine, err = h.engineFactory(ctx, spec.WorkflowID, spec.WorkflowOwner, workflowName, spec.WorkflowTag, configBytes, decodedBinary, initDone) + } if err != nil { return fmt.Errorf("failed to create workflow engine: %w", err) } @@ -898,20 +885,19 @@ func (h *eventHandler) wireInitDoneHook(cfg *v2.EngineConfig, initDone chan<- er } } -// tryConfidentialEngineCreate creates a V2 engine backed by a ConfidentialModule +// confidentialEngineFactory creates a V2 engine backed by a ConfidentialModule // instead of a local WASM module. The ConfidentialModule delegates execution to // the confidential-workflows capability which runs the WASM inside a TEE. -func (h *eventHandler) tryConfidentialEngineCreate( - ctx context.Context, +func (h *eventHandler) confidentialEngineFactory( spec *job.WorkflowSpec, wid types.WorkflowID, workflowName types.WorkflowName, decodedBinary []byte, - source string, -) error { + initDone chan<- error, +) (services.Service, error) { attrs, err := v2.ParseWorkflowAttributes(spec.Attributes) if err != nil { - return fmt.Errorf("failed to parse workflow attributes: %w", err) + return nil, fmt.Errorf("failed to parse workflow attributes: %w", err) } binaryHash := v2.ComputeBinaryHash(decodedBinary) @@ -932,16 +918,9 @@ func (h *eventHandler) tryConfidentialEngineCreate( ) cfg := h.newV2EngineConfig(module, spec.WorkflowID, spec.WorkflowOwner, workflowName, spec.WorkflowTag, []byte(spec.Config)) - - initDone := make(chan error, 1) h.wireInitDoneHook(cfg, initDone) - engine, err := v2.NewEngine(cfg) - if err != nil { - return fmt.Errorf("failed to create confidential workflow engine: %w", err) - } - - return h.startAndRegisterEngine(ctx, engine, initDone, wid, spec.WorkflowID, source) + return v2.NewEngine(cfg) } // logCustMsg emits a custom message to the external sink and logs an error if that fails. diff --git a/core/services/workflows/syncer/v2/handler_test.go b/core/services/workflows/syncer/v2/handler_test.go index 775f059613e..aa0a66c6362 100644 --- a/core/services/workflows/syncer/v2/handler_test.go +++ b/core/services/workflows/syncer/v2/handler_test.go @@ -704,7 +704,7 @@ func Test_workflowRegisteredHandler_confidentialRouting(t *testing.T) { // (non-hex owner), engine creation fails. The error comes from the // confidential path, proving routing worked correctly. require.Error(t, err) - assert.Contains(t, err.Error(), "failed to create confidential workflow engine") + assert.Contains(t, err.Error(), "failed to create workflow engine") // The engine factory must NOT have been called; the confidential path // bypasses it. From 36498392e022bacfb2d894da49aff59907b8808a Mon Sep 17 00:00:00 2001 From: Tejaswi Nadahalli Date: Tue, 24 Mar 2026 13:50:56 +0100 Subject: [PATCH 06/12] Restore initDone comment in tryEngineCreate --- core/services/workflows/syncer/v2/handler.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/services/workflows/syncer/v2/handler.go b/core/services/workflows/syncer/v2/handler.go index 7cab85cc575..3255040656c 100644 --- a/core/services/workflows/syncer/v2/handler.go +++ b/core/services/workflows/syncer/v2/handler.go @@ -751,6 +751,9 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp return fmt.Errorf("failed to parse workflow attributes: %w", err) } + // Create a channel to receive the initialization result. + // This allows us to wait for the engine to complete initialization (including trigger subscriptions) + // before emitting the workflowActivated event, ensuring the event accurately reflects deployment status. initDone := make(chan error, 1) var engine services.Service From 8690d717103cd809ffc6470565d9e8686023d2f5 Mon Sep 17 00:00:00 2001 From: Tejaswi Nadahalli Date: Tue, 24 Mar 2026 13:53:28 +0100 Subject: [PATCH 07/12] Inline startAndRegisterEngine back into tryEngineCreate No longer needed as a separate method now that both engine paths converge in tryEngineCreate. --- core/services/workflows/syncer/v2/handler.go | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/core/services/workflows/syncer/v2/handler.go b/core/services/workflows/syncer/v2/handler.go index 3255040656c..42f90b0a43e 100644 --- a/core/services/workflows/syncer/v2/handler.go +++ b/core/services/workflows/syncer/v2/handler.go @@ -767,21 +767,7 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp return fmt.Errorf("failed to create workflow engine: %w", err) } - return h.startAndRegisterEngine(ctx, engine, initDone, wid, spec.WorkflowID, source) -} - -// startAndRegisterEngine starts a workflow engine, waits for initialization to -// complete, and registers it in the engine registry. Used by both the normal -// and confidential engine creation paths. -func (h *eventHandler) startAndRegisterEngine( - ctx context.Context, - engine services.Service, - initDone <-chan error, - wid types.WorkflowID, - workflowID string, - source string, -) error { - if err := engine.Start(ctx); err != nil { + if err = engine.Start(ctx); err != nil { return fmt.Errorf("failed to start workflow engine: %w", err) } @@ -791,7 +777,7 @@ func (h *eventHandler) startAndRegisterEngine( case <-ctx.Done(): // Context cancelled while waiting for initialization if closeErr := engine.Close(); closeErr != nil { - h.lggr.Errorw("failed to close engine after context cancellation", "error", closeErr, "workflowID", workflowID) + h.lggr.Errorw("failed to close engine after context cancellation", "error", closeErr, "workflowID", spec.WorkflowID) } return fmt.Errorf("context cancelled while waiting for engine initialization: %w", ctx.Err()) case initErr := <-initDone: @@ -802,7 +788,7 @@ func (h *eventHandler) startAndRegisterEngine( // If the failure is due to user error (e.g., invalid trigger config), this causes unnecessary retries. // Consider marking the workflow spec as "failed" in the database and requiring workflow redeployment. if closeErr := engine.Close(); closeErr != nil { - h.lggr.Errorw("failed to close engine after initialization failure", "error", closeErr, "workflowID", workflowID) + h.lggr.Errorw("failed to close engine after initialization failure", "error", closeErr, "workflowID", spec.WorkflowID) } return fmt.Errorf("engine initialization failed: %w", initErr) } From 563e8a3d73050136b7e17f01a4b002d30aeb81be Mon Sep 17 00:00:00 2001 From: Tejaswi Nadahalli Date: Tue, 24 Mar 2026 13:55:49 +0100 Subject: [PATCH 08/12] Restore inline comments in wireInitDoneHook --- core/services/workflows/syncer/v2/handler.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/services/workflows/syncer/v2/handler.go b/core/services/workflows/syncer/v2/handler.go index 42f90b0a43e..2129d81c00f 100644 --- a/core/services/workflows/syncer/v2/handler.go +++ b/core/services/workflows/syncer/v2/handler.go @@ -860,14 +860,17 @@ func (h *eventHandler) newV2EngineConfig( } // wireInitDoneHook wires the initDone channel to the OnInitialized lifecycle hook. -// It composes with any existing hook to avoid overwriting test hooks. +// This will be called when the engine completes initialization (including trigger subscriptions). +// We compose with any existing hook to avoid overwriting test hooks or other user-provided hooks. func (h *eventHandler) wireInitDoneHook(cfg *v2.EngineConfig, initDone chan<- error) { if initDone == nil { return } existingHook := cfg.Hooks.OnInitialized cfg.Hooks.OnInitialized = func(err error) { + // Signal completion to the handler first initDone <- err + // Then call any existing hook (e.g., from tests) if existingHook != nil { existingHook(err) } From 9e2aef8a90176289fded1c13a0b903e75f41c912 Mon Sep 17 00:00:00 2001 From: Tejaswi Nadahalli Date: Tue, 24 Mar 2026 13:59:20 +0100 Subject: [PATCH 09/12] Restore original BeholderEmitter closure pattern in newV2EngineConfig --- core/services/workflows/syncer/v2/handler.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/services/workflows/syncer/v2/handler.go b/core/services/workflows/syncer/v2/handler.go index 2129d81c00f..fa03d29e81c 100644 --- a/core/services/workflows/syncer/v2/handler.go +++ b/core/services/workflows/syncer/v2/handler.go @@ -825,10 +825,6 @@ func (h *eventHandler) newV2EngineConfig( module host.ModuleV2, workflowID string, owner string, name types.WorkflowName, tag string, config []byte, ) *v2.EngineConfig { - h.emitterMu.RLock() - emitter := h.emitter - h.emitterMu.RUnlock() - return &v2.EngineConfig{ Lggr: h.lggr, Module: module, @@ -844,13 +840,17 @@ func (h *eventHandler) newV2EngineConfig( WorkflowTag: tag, WorkflowEncryptionKey: h.workflowEncryptionKey, - LocalLimits: v2.EngineLimits{}, + LocalLimits: v2.EngineLimits{}, // all defaults LocalLimiters: h.engineLimiters, FeatureFlags: h.featureFlags, GlobalExecutionConcurrencyLimiter: h.workflowLimits, - BeholderEmitter: emitter, - BillingClient: h.billingClient, + BeholderEmitter: func() custmsg.MessageEmitter { + h.emitterMu.RLock() + defer h.emitterMu.RUnlock() + return h.emitter + }(), + BillingClient: h.billingClient, WorkflowRegistryAddress: h.workflowRegistryAddress, WorkflowRegistryChainSelector: h.workflowRegistryChainSelector, From 973f35ce6703a6db9c3886ea7b5d21053d9c9772 Mon Sep 17 00:00:00 2001 From: Tejaswi Nadahalli Date: Wed, 1 Apr 2026 14:24:49 +0200 Subject: [PATCH 10/12] Clean up factory signatures and newV2EngineConfig param ordering Group string params together in newV2EngineConfig, move SdkName and DebugMode into the constructor, drop unused wid param from confidentialEngineFactory. # Conflicts: # core/services/workflows/syncer/v2/handler.go --- core/services/workflows/syncer/v2/handler.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/core/services/workflows/syncer/v2/handler.go b/core/services/workflows/syncer/v2/handler.go index fa03d29e81c..ccc081d40b6 100644 --- a/core/services/workflows/syncer/v2/handler.go +++ b/core/services/workflows/syncer/v2/handler.go @@ -629,9 +629,7 @@ func (h *eventHandler) engineFactoryFn(ctx context.Context, workflowID string, o } // V2 aka "NoDAG" - cfg := h.newV2EngineConfig(module, workflowID, owner, name, tag, config) - cfg.DebugMode = h.debugMode - cfg.SdkName = sdkName + cfg := h.newV2EngineConfig(module, workflowID, owner, tag, sdkName, name, config) h.wireInitDoneHook(cfg, initDone) @@ -759,7 +757,7 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp if confidential { h.lggr.Infow("routing workflow to confidential execution", "workflowID", spec.WorkflowID) - engine, err = h.confidentialEngineFactory(spec, wid, workflowName, decodedBinary, initDone) + engine, err = h.confidentialEngineFactory(spec, workflowName, decodedBinary, initDone) } else { engine, err = h.engineFactory(ctx, spec.WorkflowID, spec.WorkflowOwner, workflowName, spec.WorkflowTag, configBytes, decodedBinary, initDone) } @@ -823,7 +821,9 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp // WASM engine and the confidential engine paths. Caller supplies the module. func (h *eventHandler) newV2EngineConfig( module host.ModuleV2, - workflowID string, owner string, name types.WorkflowName, tag string, config []byte, + workflowID, owner, tag, sdkName string, + name types.WorkflowName, + config []byte, ) *v2.EngineConfig { return &v2.EngineConfig{ Lggr: h.lggr, @@ -856,6 +856,8 @@ func (h *eventHandler) newV2EngineConfig( WorkflowRegistryChainSelector: h.workflowRegistryChainSelector, OrgResolver: h.orgResolver, SecretsFetcher: h.secretsFetcher, + DebugMode: h.debugMode, + SdkName: sdkName, } } @@ -882,7 +884,6 @@ func (h *eventHandler) wireInitDoneHook(cfg *v2.EngineConfig, initDone chan<- er // the confidential-workflows capability which runs the WASM inside a TEE. func (h *eventHandler) confidentialEngineFactory( spec *job.WorkflowSpec, - wid types.WorkflowID, workflowName types.WorkflowName, decodedBinary []byte, initDone chan<- error, @@ -909,7 +910,7 @@ func (h *eventHandler) confidentialEngineFactory( lggr, ) - cfg := h.newV2EngineConfig(module, spec.WorkflowID, spec.WorkflowOwner, workflowName, spec.WorkflowTag, []byte(spec.Config)) + cfg := h.newV2EngineConfig(module, spec.WorkflowID, spec.WorkflowOwner, spec.WorkflowTag, "", workflowName, []byte(spec.Config)) h.wireInitDoneHook(cfg, initDone) return v2.NewEngine(cfg) From eaaaaf88528441c35a018f0b455ea11653b209a7 Mon Sep 17 00:00:00 2001 From: Tejaswi Nadahalli Date: Wed, 25 Mar 2026 13:47:48 +0100 Subject: [PATCH 11/12] Fix lint: errors.New, assert.Empty --- core/services/workflows/v2/confidential_module.go | 3 ++- core/services/workflows/v2/confidential_module_test.go | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/services/workflows/v2/confidential_module.go b/core/services/workflows/v2/confidential_module.go index 4cc3761e18a..16bbb83052c 100644 --- a/core/services/workflows/v2/confidential_module.go +++ b/core/services/workflows/v2/confidential_module.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "encoding/json" + "errors" "fmt" "google.golang.org/protobuf/proto" @@ -165,7 +166,7 @@ func (m *ConfidentialModule) Execute( } if capResp.Payload == nil { - return nil, fmt.Errorf("confidential-workflows capability returned nil payload") + return nil, errors.New("confidential-workflows capability returned nil payload") } var confResp confworkflowtypes.ConfidentialWorkflowResponse diff --git a/core/services/workflows/v2/confidential_module_test.go b/core/services/workflows/v2/confidential_module_test.go index 87dab426022..598f82c0119 100644 --- a/core/services/workflows/v2/confidential_module_test.go +++ b/core/services/workflows/v2/confidential_module_test.go @@ -3,7 +3,7 @@ package v2 import ( "context" "crypto/sha256" - "fmt" + "errors" "testing" "time" @@ -52,7 +52,7 @@ func TestParseWorkflowAttributes(t *testing.T) { assert.True(t, attrs.Confidential) require.Len(t, attrs.VaultDonSecrets, 2) assert.Equal(t, "API_KEY", attrs.VaultDonSecrets[0].Key) - assert.Equal(t, "", attrs.VaultDonSecrets[0].Namespace) + assert.Empty(t, attrs.VaultDonSecrets[0].Namespace) assert.Equal(t, "SIGNING_KEY", attrs.VaultDonSecrets[1].Key) assert.Equal(t, "custom-ns", attrs.VaultDonSecrets[1].Namespace) }) @@ -222,7 +222,7 @@ func TestConfidentialModule_Execute(t *testing.T) { t.Run("GetExecutable error", func(t *testing.T) { capReg := regmocks.NewCapabilitiesRegistry(t) capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). - Return(nil, fmt.Errorf("capability not found")).Once() + Return(nil, errors.New("capability not found")).Once() mod := NewConfidentialModule( capReg, "", nil, "wf", "owner", "name", "tag", nil, lggr, @@ -240,7 +240,7 @@ func TestConfidentialModule_Execute(t *testing.T) { capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). Return(execCap, nil).Once() execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). - Return(capabilities.CapabilityResponse{}, fmt.Errorf("enclave unavailable")).Once() + Return(capabilities.CapabilityResponse{}, errors.New("enclave unavailable")).Once() mod := NewConfidentialModule( capReg, "", nil, "wf", "owner", "name", "tag", nil, lggr, From 764834ec2e30d39a9e9f109ad32317d2bf9e9b50 Mon Sep 17 00:00:00 2001 From: Tejaswi Nadahalli Date: Thu, 26 Mar 2026 17:36:46 +0100 Subject: [PATCH 12/12] Add BinaryURLResolver to ConfidentialModule for presigned URL support The enclave needs an authenticated URL to download WASM binaries from the CRE storage service. BinaryURLResolver resolves the raw on-chain URL into a presigned/ephemeral URL per execution. Nil-safe: falls back to the raw URL when no resolver is set. PR 5/5 (#21642) wires this to the storage service retriever. --- core/services/workflows/syncer/v2/handler.go | 9 +- .../workflows/v2/confidential_module.go | 61 +++++++---- .../workflows/v2/confidential_module_test.go | 103 +++++++++++++++++- 3 files changed, 143 insertions(+), 30 deletions(-) diff --git a/core/services/workflows/syncer/v2/handler.go b/core/services/workflows/syncer/v2/handler.go index ccc081d40b6..7a493ed358d 100644 --- a/core/services/workflows/syncer/v2/handler.go +++ b/core/services/workflows/syncer/v2/handler.go @@ -898,15 +898,16 @@ func (h *eventHandler) confidentialEngineFactory( lggr := logger.Named(h.lggr, "WorkflowEngine.ConfidentialModule") lggr = logger.With(lggr, "workflowID", spec.WorkflowID, "workflowName", spec.WorkflowName, "workflowOwner", spec.WorkflowOwner) + // nil resolver: raw binaryURL is passed to the enclave as-is. + // PR 5/5 (#21642) wires this to the storage service retriever + // so the enclave receives a presigned URL. module := v2.NewConfidentialModule( h.capRegistry, spec.BinaryURL, binaryHash, - spec.WorkflowID, - spec.WorkflowOwner, - workflowName.String(), - spec.WorkflowTag, + spec.WorkflowID, spec.WorkflowOwner, workflowName.String(), spec.WorkflowTag, attrs.VaultDonSecrets, + nil, lggr, ) diff --git a/core/services/workflows/v2/confidential_module.go b/core/services/workflows/v2/confidential_module.go index 16bbb83052c..7a1983d2a27 100644 --- a/core/services/workflows/v2/confidential_module.go +++ b/core/services/workflows/v2/confidential_module.go @@ -57,19 +57,26 @@ func IsConfidential(data []byte) (bool, error) { return attrs.Confidential, nil } +// BinaryURLResolver resolves a raw binary URL into an ephemeral/presigned +// URL that the enclave can fetch without authentication. In production this +// calls the CRE storage service; nil means the raw URL is used as-is. +// PR 5/5 (#21642) wires this to the storage service retriever. +type BinaryURLResolver func(ctx context.Context, workflowID string) (string, error) + // ConfidentialModule implements host.ModuleV2 for confidential workflows. // Instead of running WASM locally, it delegates execution to the // confidential-workflows capability via the CapabilitiesRegistry. type ConfidentialModule struct { - capRegistry core.CapabilitiesRegistry - binaryURL string - binaryHash []byte - workflowID string - workflowOwner string - workflowName string - workflowTag string - vaultDonSecrets []SecretIdentifier - lggr logger.Logger + capRegistry core.CapabilitiesRegistry + binaryURL string + binaryHash []byte + workflowID string + workflowOwner string + workflowName string + workflowTag string + vaultDonSecrets []SecretIdentifier + binaryURLResolver BinaryURLResolver + lggr logger.Logger } var _ host.ModuleV2 = (*ConfidentialModule)(nil) @@ -78,23 +85,22 @@ func NewConfidentialModule( capRegistry core.CapabilitiesRegistry, binaryURL string, binaryHash []byte, - workflowID string, - workflowOwner string, - workflowName string, - workflowTag string, + workflowID, workflowOwner, workflowName, workflowTag string, vaultDonSecrets []SecretIdentifier, + binaryURLResolver BinaryURLResolver, lggr logger.Logger, ) *ConfidentialModule { return &ConfidentialModule{ - capRegistry: capRegistry, - binaryURL: binaryURL, - binaryHash: binaryHash, - workflowID: workflowID, - workflowOwner: workflowOwner, - workflowName: workflowName, - workflowTag: workflowTag, - vaultDonSecrets: vaultDonSecrets, - lggr: lggr, + capRegistry: capRegistry, + binaryURL: binaryURL, + binaryHash: binaryHash, + workflowID: workflowID, + workflowOwner: workflowOwner, + workflowName: workflowName, + workflowTag: workflowTag, + vaultDonSecrets: vaultDonSecrets, + binaryURLResolver: binaryURLResolver, + lggr: lggr, } } @@ -125,11 +131,20 @@ func (m *ConfidentialModule) Execute( } } + binaryURL := m.binaryURL + if m.binaryURLResolver != nil { + resolved, resolveErr := m.binaryURLResolver(ctx, m.workflowID) + if resolveErr != nil { + return nil, fmt.Errorf("failed to resolve binary URL: %w", resolveErr) + } + binaryURL = resolved + } + capInput := &confworkflowtypes.ConfidentialWorkflowRequest{ VaultDonSecrets: protoSecrets, Execution: &confworkflowtypes.WorkflowExecution{ WorkflowId: m.workflowID, - BinaryUrl: m.binaryURL, + BinaryUrl: binaryURL, BinaryHash: m.binaryHash, ExecuteRequest: execReqBytes, Owner: m.workflowOwner, diff --git a/core/services/workflows/v2/confidential_module_test.go b/core/services/workflows/v2/confidential_module_test.go index 598f82c0119..e72d7164b29 100644 --- a/core/services/workflows/v2/confidential_module_test.go +++ b/core/services/workflows/v2/confidential_module_test.go @@ -172,6 +172,7 @@ func TestConfidentialModule_Execute(t *testing.T) { {Key: "API_KEY"}, {Key: "SIGNING_KEY", Namespace: "custom-ns"}, }, + nil, lggr, ) @@ -205,6 +206,7 @@ func TestConfidentialModule_Execute(t *testing.T) { []byte("hash"), "wf-1", "owner", "name", "tag", []SecretIdentifier{{Key: "SECRET_A"}}, // no namespace + nil, lggr, ) @@ -225,7 +227,7 @@ func TestConfidentialModule_Execute(t *testing.T) { Return(nil, errors.New("capability not found")).Once() mod := NewConfidentialModule( - capReg, "", nil, "wf", "owner", "name", "tag", nil, lggr, + capReg, "", nil, "wf", "owner", "name", "tag", nil, nil, lggr, ) _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{}) @@ -243,7 +245,7 @@ func TestConfidentialModule_Execute(t *testing.T) { Return(capabilities.CapabilityResponse{}, errors.New("enclave unavailable")).Once() mod := NewConfidentialModule( - capReg, "", nil, "wf", "owner", "name", "tag", nil, lggr, + capReg, "", nil, "wf", "owner", "name", "tag", nil, nil, lggr, ) _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{}) @@ -261,7 +263,7 @@ func TestConfidentialModule_Execute(t *testing.T) { Return(capabilities.CapabilityResponse{Payload: nil}, nil).Once() mod := NewConfidentialModule( - capReg, "", nil, "wf", "owner", "name", "tag", nil, lggr, + capReg, "", nil, "wf", "owner", "name", "tag", nil, nil, lggr, ) _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{}) @@ -296,6 +298,7 @@ func TestConfidentialModule_Execute(t *testing.T) { {Key: "K1", Namespace: "ns1"}, {Key: "K2"}, }, + nil, lggr, ) @@ -333,6 +336,100 @@ func TestConfidentialModule_Execute(t *testing.T) { }) } +func TestConfidentialModule_BinaryURLResolver(t *testing.T) { + ctx := context.Background() + lggr := logger.Nop() + + execReq := &sdkpb.ExecuteRequest{Config: []byte("cfg")} + + expectedResult := &sdkpb.ExecutionResult{ + Result: &sdkpb.ExecutionResult_Value{ + Value: valuespb.NewStringValue("ok"), + }, + } + resultBytes, err := proto.Marshal(expectedResult) + require.NoError(t, err) + confResp := &confworkflowtypes.ConfidentialWorkflowResponse{ExecutionResult: resultBytes} + respPayload, err := anypb.New(confResp) + require.NoError(t, err) + + t.Run("resolver replaces binary URL", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + + var capturedReq capabilities.CapabilityRequest + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Run(func(_ context.Context, req capabilities.CapabilityRequest) { + capturedReq = req + }). + Return(capabilities.CapabilityResponse{Payload: respPayload}, nil).Once() + + resolver := func(_ context.Context, wfID string) (string, error) { + return "https://presigned.example.com/" + wfID + "?token=abc", nil + } + + mod := NewConfidentialModule( + capReg, "https://storage.example.com/raw", []byte("hash"), + "wf-1", "owner", "name", "tag", nil, resolver, lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{executionID: "exec-1"}) + require.NoError(t, err) + + var confReq confworkflowtypes.ConfidentialWorkflowRequest + require.NoError(t, capturedReq.Payload.UnmarshalTo(&confReq)) + assert.Equal(t, "https://presigned.example.com/wf-1?token=abc", confReq.Execution.BinaryUrl) + }) + + t.Run("nil resolver uses raw URL", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + + var capturedReq capabilities.CapabilityRequest + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Run(func(_ context.Context, req capabilities.CapabilityRequest) { + capturedReq = req + }). + Return(capabilities.CapabilityResponse{Payload: respPayload}, nil).Once() + + mod := NewConfidentialModule( + capReg, "https://storage.example.com/raw", []byte("hash"), + "wf-1", "owner", "name", "tag", nil, nil, lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{executionID: "exec-1"}) + require.NoError(t, err) + + var confReq confworkflowtypes.ConfidentialWorkflowRequest + require.NoError(t, capturedReq.Payload.UnmarshalTo(&confReq)) + assert.Equal(t, "https://storage.example.com/raw", confReq.Execution.BinaryUrl) + }) + + t.Run("resolver error propagates", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + + resolver := func(_ context.Context, _ string) (string, error) { + return "", errors.New("storage service unavailable") + } + + mod := NewConfidentialModule( + capReg, "https://storage.example.com/raw", []byte("hash"), + "wf-1", "owner", "name", "tag", nil, resolver, lggr, + ) + + _, err := mod.Execute(ctx, execReq, &stubExecutionHelper{executionID: "exec-1"}) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to resolve binary URL") + assert.Contains(t, err.Error(), "storage service unavailable") + }) +} + func TestConfidentialModule_InterfaceMethods(t *testing.T) { mod := &ConfidentialModule{}