Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,7 @@ type WorkflowSpec struct {
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"`
Attributes []byte `db:"attributes"`
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's figure out if we need a new Attributes field here or can overload a different existing field.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any ideas?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably open a thread with dev services today.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: this is optimal.

sdkWorkflow *sdk.WorkflowSpec
rawSpec []byte
config []byte
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func GetCapabilityIDFromCommand(command string, config string) string {
return "http-trigger@1.0.0-alpha"
case "http_action":
return "http-actions@1.0.0-alpha" // plural "actions"
case "mock":
return "mock@1.0.0"
default:
return ""
}
Expand All @@ -71,6 +73,8 @@ func GetCommandFromCapabilityID(capabilityID string) string {
return "http_trigger"
case strings.HasPrefix(capabilityID, "http-actions"):
return "http_action"
case strings.HasPrefix(capabilityID, "mock"):
return "mock"
default:
return ""
}
Expand Down
9 changes: 6 additions & 3 deletions core/services/workflows/artifacts/v2/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec)
config_url,
created_at,
updated_at,
spec_type
spec_type,
attributes
) VALUES (
:workflow,
:config,
Expand All @@ -76,7 +77,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec)
:config_url,
:created_at,
:updated_at,
:spec_type
:spec_type,
:attributes
) ON CONFLICT (workflow_id) DO UPDATE
SET
workflow = EXCLUDED.workflow,
Expand All @@ -89,7 +91,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec)
config_url = EXCLUDED.config_url,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
spec_type = EXCLUDED.spec_type
spec_type = EXCLUDED.spec_type,
attributes = EXCLUDED.attributes
RETURNING id
`

Expand Down
176 changes: 115 additions & 61 deletions core/services/workflows/syncer/v2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ func (h *eventHandler) createWorkflowSpec(ctx context.Context, payload WorkflowR
SpecType: job.WASMFile,
BinaryURL: payload.BinaryURL,
ConfigURL: payload.ConfigURL,
Attributes: payload.Attributes,
}

if _, err = h.workflowArtifactsStore.UpsertWorkflowSpec(ctx, entry); err != nil {
Expand Down Expand Up @@ -628,55 +629,9 @@ func (h *eventHandler) engineFactoryFn(ctx context.Context, workflowID string, o
}

// V2 aka "NoDAG"
cfg := &v2.EngineConfig{
Lggr: h.lggr,
Module: module,
WorkflowConfig: config,
CapRegistry: h.capRegistry,
DonSubscriber: h.workflowDonSubscriber,
UseLocalTimeProvider: h.useLocalTimeProvider,
DonTimeStore: h.donTimeStore,
ExecutionsStore: h.workflowStore,
WorkflowID: workflowID,
WorkflowOwner: owner,
WorkflowName: name,
WorkflowTag: tag,
WorkflowEncryptionKey: h.workflowEncryptionKey,
cfg := h.newV2EngineConfig(module, workflowID, owner, tag, sdkName, name, config)

LocalLimits: v2.EngineLimits{}, // all defaults
LocalLimiters: h.engineLimiters,
FeatureFlags: h.featureFlags,
GlobalExecutionConcurrencyLimiter: h.workflowLimits,

BeholderEmitter: func() custmsg.MessageEmitter {
h.emitterMu.RLock()
defer h.emitterMu.RUnlock()
return h.emitter
}(),
BillingClient: h.billingClient,

WorkflowRegistryAddress: h.workflowRegistryAddress,
WorkflowRegistryChainSelector: h.workflowRegistryChainSelector,
OrgResolver: h.orgResolver,
DebugMode: h.debugMode,
SecretsFetcher: h.secretsFetcher,
SdkName: sdkName,
}

// Wire the initDone channel to the OnInitialized lifecycle hook.
// This will be called when the engine completes initialization (including trigger subscriptions).
// We compose with any existing hook to avoid overwriting test hooks or other user-provided hooks.
if initDone != nil {
existingHook := cfg.Hooks.OnInitialized
cfg.Hooks.OnInitialized = func(err error) {
// Signal completion to the handler first
initDone <- err
// Then call any existing hook (e.g., from tests)
if existingHook != nil {
existingHook(err)
}
}
}
h.wireInitDoneHook(cfg, initDone)

return v2.NewEngine(cfg)
}
Expand Down Expand Up @@ -789,24 +744,23 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp
return fmt.Errorf("invalid workflow name: %w", err)
}

confidential, err := v2.IsConfidential(spec.Attributes)
if err != nil {
return fmt.Errorf("failed to parse workflow attributes: %w", err)
}

// Create a channel to receive the initialization result.
// This allows us to wait for the engine to complete initialization (including trigger subscriptions)
// before emitting the workflowActivated event, ensuring the event accurately reflects deployment status.
initDone := make(chan error, 1)
var engine services.Service

// Scope the engineFactory call so that decodedBinary goes out of scope immediately after the factory returns
engine, err := func() (services.Service, error) {
return h.engineFactory(
ctx,
spec.WorkflowID,
spec.WorkflowOwner,
workflowName,
spec.WorkflowTag,
configBytes,
decodedBinary,
initDone,
)
}()
if confidential {
h.lggr.Infow("routing workflow to confidential execution", "workflowID", spec.WorkflowID)
engine, err = h.confidentialEngineFactory(spec, workflowName, decodedBinary, initDone)
} else {
engine, err = h.engineFactory(ctx, spec.WorkflowID, spec.WorkflowOwner, workflowName, spec.WorkflowTag, configBytes, decodedBinary, initDone)
}
if err != nil {
return fmt.Errorf("failed to create workflow engine: %w", err)
}
Expand Down Expand Up @@ -863,6 +817,106 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp
return nil
}

// newV2EngineConfig builds the common EngineConfig shared by both the normal
// WASM engine and the confidential engine paths. Caller supplies the module.
func (h *eventHandler) newV2EngineConfig(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice ty for doing this

module host.ModuleV2,
workflowID, owner, tag, sdkName string,
name types.WorkflowName,
config []byte,
) *v2.EngineConfig {
return &v2.EngineConfig{
Lggr: h.lggr,
Module: module,
WorkflowConfig: config,
CapRegistry: h.capRegistry,
DonSubscriber: h.workflowDonSubscriber,
UseLocalTimeProvider: h.useLocalTimeProvider,
DonTimeStore: h.donTimeStore,
ExecutionsStore: h.workflowStore,
WorkflowID: workflowID,
WorkflowOwner: owner,
WorkflowName: name,
WorkflowTag: tag,
WorkflowEncryptionKey: h.workflowEncryptionKey,

LocalLimits: v2.EngineLimits{}, // all defaults
LocalLimiters: h.engineLimiters,
FeatureFlags: h.featureFlags,
GlobalExecutionConcurrencyLimiter: h.workflowLimits,

BeholderEmitter: func() custmsg.MessageEmitter {
h.emitterMu.RLock()
defer h.emitterMu.RUnlock()
return h.emitter
}(),
BillingClient: h.billingClient,

WorkflowRegistryAddress: h.workflowRegistryAddress,
WorkflowRegistryChainSelector: h.workflowRegistryChainSelector,
OrgResolver: h.orgResolver,
SecretsFetcher: h.secretsFetcher,
DebugMode: h.debugMode,
SdkName: sdkName,
}
}

// wireInitDoneHook wires the initDone channel to the OnInitialized lifecycle hook.
// This will be called when the engine completes initialization (including trigger subscriptions).
// We compose with any existing hook to avoid overwriting test hooks or other user-provided hooks.
func (h *eventHandler) wireInitDoneHook(cfg *v2.EngineConfig, initDone chan<- error) {
if initDone == nil {
return
}
existingHook := cfg.Hooks.OnInitialized
cfg.Hooks.OnInitialized = func(err error) {
// Signal completion to the handler first
initDone <- err
// Then call any existing hook (e.g., from tests)
if existingHook != nil {
existingHook(err)
}
}
}

// confidentialEngineFactory creates a V2 engine backed by a ConfidentialModule
// instead of a local WASM module. The ConfidentialModule delegates execution to
// the confidential-workflows capability which runs the WASM inside a TEE.
func (h *eventHandler) confidentialEngineFactory(
spec *job.WorkflowSpec,
workflowName types.WorkflowName,
decodedBinary []byte,
initDone chan<- error,
) (services.Service, error) {
attrs, err := v2.ParseWorkflowAttributes(spec.Attributes)
if err != nil {
return nil, fmt.Errorf("failed to parse workflow attributes: %w", err)
}

binaryHash := v2.ComputeBinaryHash(decodedBinary)

lggr := logger.Named(h.lggr, "WorkflowEngine.ConfidentialModule")
lggr = logger.With(lggr, "workflowID", spec.WorkflowID, "workflowName", spec.WorkflowName, "workflowOwner", spec.WorkflowOwner)

// nil resolver: raw binaryURL is passed to the enclave as-is.
// PR 5/5 (#21642) wires this to the storage service retriever
// so the enclave receives a presigned URL.
module := v2.NewConfidentialModule(
h.capRegistry,
spec.BinaryURL,
binaryHash,
spec.WorkflowID, spec.WorkflowOwner, workflowName.String(), spec.WorkflowTag,
attrs.VaultDonSecrets,
nil,
lggr,
)

cfg := h.newV2EngineConfig(module, spec.WorkflowID, spec.WorkflowOwner, spec.WorkflowTag, "", workflowName, []byte(spec.Config))
h.wireInitDoneHook(cfg, initDone)

return v2.NewEngine(cfg)
}

// logCustMsg emits a custom message to the external sink and logs an error if that fails.
func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log logger.Logger) {
err := cma.Emit(ctx, msg)
Expand Down
Loading
Loading