feat: Omadia Conductor — deterministic workflow engine (Spec 005, US1–US9 + waves 1–6 + channel event-emit)#388
Merged
…er & human-in-the-loop
Conception for Omadia Conductor: a process layer where the runtime (not the LLM) owns step progression and hand-offs, promoting the existing per-tool postcondition / tool-obligation / deterministic-action atoms to process scope.
- spec.md: 9 user stories (engine, durable run lifecycle/resume, triggers, event triggers + connector "Conductor Surface", human steps with durable awaits/reminders/deadline, principals & role resolver seam, visual+conversational Designer, dry-run, audit), 32 FRs, 10 SCs.
- data-model.md: workflow/version/draft, run/run-step, conductor_awaits (+responses), roles/role-assignments (the "baton"), manifest emits: extension, LISTEN/NOTIFY resume, run/await state machines.
Status: Draft. In-repo modular (@omadia/conductor-core + kernel wiring + Designer under web-ui/app/admin/conductor/). No connectors built here, only the contract.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Grounded against the live codebase: phase-by-phase build sequence (US1-US9), reuse map, net-new substrate, 16 resolved integration landmines (A-P), test strategy mapped to SC-001..010, and the 4 owner design decisions (conductor_channel_bindings, conductor_schedules, FOR UPDATE SKIP LOCKED claim, serializable predicate AST).
Phase 0+1 of the Spec 005 integration plan: the deterministic, I/O-free
workflow engine — sibling of @omadia/canvas-core.
- Predicate AST (serializable guards + exit postconditions; no eval) with a
total, deterministic evaluator over {ctx, stepResult}.
- validate(graph, knownRefs?): shape gate (ajv 2020-12) + reachability,
unguarded-cycle detection, deadline-without-fallback, duplicate ids,
fallback-origin, and optional agent/action/role/event reference checks —
each error names the offending node (FR-003).
- nextStep(): postcondition verdict -> matching guarded transition -> declared
fallback -> complete/stuck; identical inputs yield identical decisions
(FR-001/002/006, SC-009).
- Published JSON schema (schema/) kept in parity with the runtime schema by test.
- 46 vitest cases (predicate, engine US1 acceptance, validation, fixtures);
builds clean (tsc) and typechecks. Wired into the workspace build order.
ajv-only runtime dep; engine has no DB/network/LLM I/O (FR-032).
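As an illustration of the two engine pieces described above, here is a minimal sketch of a serializable predicate AST with a total evaluator, and a `nextStep()` that follows the stated order (postcondition verdict, then first matching guarded transition, then declared fallback, then complete/stuck). The type shapes, operator names (`eq`, `exists`, `not`, `and`, `or`) and the dotted-path lookup are assumptions made for this example; the real `@omadia/conductor-core` types are not shown in this PR text.

```typescript
// Hypothetical shapes for illustration only; not the actual conductor-core API.
type Scalar = string | number | boolean | null;
type Json = Scalar | Json[] | { [key: string]: Json };

type Predicate =
  | { op: "eq"; path: string; value: Scalar }
  | { op: "exists"; path: string }
  | { op: "not"; arg: Predicate }
  | { op: "and"; args: Predicate[] }
  | { op: "or"; args: Predicate[] };

interface Scope { ctx: Json; stepResult: Json }

// Resolve a dotted path such as "stepResult.approved". Missing segments yield
// undefined instead of throwing, which keeps the evaluator total.
function lookup(scope: Scope, path: string): Json | undefined {
  let cur: Json | undefined = { ctx: scope.ctx, stepResult: scope.stepResult };
  for (const key of path.split(".")) {
    if (cur === null || typeof cur !== "object" || Array.isArray(cur)) return undefined;
    if (!Object.prototype.hasOwnProperty.call(cur, key)) return undefined;
    cur = cur[key];
  }
  return cur;
}

// Pure data in, boolean out: no eval, no I/O, so identical inputs give identical verdicts.
function evaluate(p: Predicate, scope: Scope): boolean {
  switch (p.op) {
    case "eq": return lookup(scope, p.path) === p.value;
    case "exists": return lookup(scope, p.path) !== undefined;
    case "not": return !evaluate(p.arg, scope);
    case "and": return p.args.every((a) => evaluate(a, scope));
    case "or": return p.args.some((a) => evaluate(a, scope));
  }
}

interface Transition { to: string; guard?: Predicate }
interface Step { id: string; postcondition?: Predicate; transitions: Transition[]; fallback?: string }

type Decision =
  | { kind: "goto"; stepId: string; via: "transition" | "fallback" }
  | { kind: "complete" }
  | { kind: "stuck" };

// Assumption: "complete" means the postcondition held and the step has no outgoing transitions.
function nextStep(step: Step, scope: Scope): Decision {
  const ok = step.postcondition ? evaluate(step.postcondition, scope) : true;
  if (ok) {
    const hit = step.transitions.find((t) => !t.guard || evaluate(t.guard, scope));
    if (hit) return { kind: "goto", stepId: hit.to, via: "transition" };
  }
  if (step.fallback) return { kind: "goto", stepId: step.fallback, via: "fallback" };
  return ok && step.transitions.length === 0 ? { kind: "complete" } : { kind: "stuck" };
}
```

Because guards are plain JSON, the same graph can be stored, validated against a schema, and replayed; the precedence between fallback and completion shown here is one reading of the commit message, not a confirmed rule.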
…l slice)
Deterministic engine now drives real, persisted runs end to end:
- migrations/0001_conductor.sql: full conductor_* schema (workflows/versions/drafts/runs/run_steps/awaits/await_responses/roles/role_assignments) + the resolved-decision tables (conductor_channel_bindings, conductor_schedules) + claim columns + pg_notify triggers. TEXT+CHECK enums, idempotent.
- runConductorMigrations (per-subsystem migrator, mirrors runAuthMigrations).
- ConductorWorkflowStore (create + versioned publish) and ConductorRunStore (run + durable per-step record, recordStepAndAdvance persists before advance).
- StepEffects seam (so US8 preview reuses this executor) + StubStepEffects.
- ConductorRunExecutor.startRun: validate-active-version -> create run -> engine nextStep loop -> persist each step + accumulated context; human steps park as waiting (durable awaits land later), agent/action run to completion.
- Operator API /api/v1/operator/conductors (publish/list/status/start-run/trace), graph validated by conductor-core before persist; mounted behind requireAuth.
- wireConductor() called from boot inside the graphPool block (inert in-memory).
tsc --noEmit clean across the middleware; conductor-core builds.
…pm ci) The Docker image build runs `npm ci`, which requires the lockfile to include the new conductor-core workspace package. Regenerated with node 22.22.3; minimal diff (one workspace entry, no other dep churn).
- routes: coerce req.params (string | string[]) via paramStr(); cast req.body graph through unknown before WorkflowGraph (TS2352).
- runExecutor: human-step actor uses primitive principalKind/principalRef fields instead of the Principal interface (not assignable to JsonValue).
Full workspace build (node 22.22.3) clean; the errors surface only under the stricter lockfile-pinned deps the Docker image uses.
tsc does not emit .sql; runConductorMigrations scans dist/conductor/migrations at boot. Add the explicit COPY (same pattern as auth/routines/profile migrations) so the production image ships 0001_conductor.sql. Fixes ENOENT crash-loop on boot.
Engine semantics corrected: an 'agent' step targets an **Agent (orchestrator
instance)** resolved by slug from the multi-orchestrator registry — not a stub,
not a bare model. Backend:
- conductor-core: add step.prompt (agent message, {{ctx.x}} interpolation) and
step.input (action input); agentId doc'd as the Agent slug.
- RealStepEffects: agent step runs a genuine turn via
registry.get(slug).built.bundle.agent.chat(...) (the schedule-worker path),
answer.text becomes the step result; action step invokes the real connector
tool via dynamicAgentRuntime.invokeAgentTool. StepMeta threads runId for
session scoping. wireConductor now wires RealStepEffects (stub kept for tests).
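The `{{ctx.x}}` interpolation on `step.prompt` could look roughly like the sketch below. The function name `interpolate`, the tolerance for whitespace inside the braces, and the choice to render unknown paths as an empty string are all assumptions for illustration; the PR text only states that prompts support `{{ctx.x}}` placeholders.

```typescript
// Replace {{ctx.some.path}} placeholders with values from the run context.
// Assumption: missing or null values render as "", and non-string values are
// JSON-encoded. The real engine may choose differently.
function interpolate(template: string, ctx: Record<string, unknown>): string {
  return template.replace(/\{\{\s*ctx\.([A-Za-z0-9_.]+)\s*\}\}/g, (_match, path: string) => {
    let cur: unknown = ctx;
    for (const key of path.split(".")) {
      if (cur === null || typeof cur !== "object") return "";
      cur = (cur as Record<string, unknown>)[key];
    }
    if (cur === undefined || cur === null) return "";
    return typeof cur === "string" ? cur : JSON.stringify(cur);
  });
}

// Usage: build the agent message from accumulated run context.
const prompt = interpolate("Summarize PR {{ctx.pr.number}} opened by {{ ctx.author }}.", {
  pr: { number: 388 },
  author: "ana",
});
```

Keeping interpolation a pure string function means the same prompt is produced on resume or dry-run as on the first attempt.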
Operator UI (the missing entry point):
- web-ui/app/conductor/page.tsx: list/publish workflows (graph validated by the
engine before save), start runs, render the durable step trace + result context.
- Nav: top-level 'Conductor' link; api.ts conductor client; en/de i18n (parity OK).
Full middleware build + web-ui typecheck clean; conductor-core 46 tests green.
A real Agent turn on the fallback orchestrator takes ~tens of seconds; driving the run synchronously inside the HTTP request hit a gateway timeout (500) even though the run completed durably in the background. Now:
- startRun drives the run in the background and returns immediately (202); the run is durable and observed via the run/trace API. awaitCompletion keeps a synchronous path for tests/fast steps.
- the Conductor UI polls GET /:slug/runs/:runId until the run leaves 'running', then renders the final trace + result context (the real agent text).
- log the actual error in the publish/run 500 catch blocks.
Verified live: fallback Agent produced a real gpt-5.5 answer persisted in conductor_runs.context (no stub).
A double-submitted publish of a new slug raced SELECT-then-INSERT and one request hit conductor_workflows_slug_key (500). Replace with INSERT ... ON CONFLICT (slug) DO UPDATE + a FOR UPDATE lock to serialize version numbering. Status is set only on first create.
Replace the raw-JSON publish form with a visual canvas (@xyflow/react):
- ConductorCanvas: add agent/action/human step nodes, drag to wire transitions,
per-kind inspector (agent slug+prompt, action id+input, human principal/
channel/message/reminder/deadline, optional postcondition + fallback + entry
toggle; edge guard), trigger config (manual/event/cron), load an existing
workflow's graph, Save→validate+publish, Run+poll. node.id stays stable while
data.stepId is renameable (edge-safe).
- backend GET /:slug returns {workflow, graph} for the editor to load;
getConductorWorkflowGraph client; en/de i18n (parity OK).
- page keeps the workflow list + quick-run and mounts the designer below.
web-ui typecheck + i18n clean; conductor middleware typecheck clean.
A double-fired Add-step click created two nodes sharing one generated id (React rendered one, state held two) → duplicate_step_id on publish. Use a monotonic ref counter for node/edge ids and swallow sub-350ms repeat add clicks; bump the counter past loaded ids.
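The fix described above combines two small ideas: ids come from a monotonic counter (so two rapid clicks can never mint the same id), and repeat clicks inside a short window are ignored. A framework-free sketch follows; in the React component these would presumably live in refs, and the names `createIdSource` and `createClickGuard` are hypothetical.

```typescript
// Monotonic id source: every call to next() yields a fresh id.
function createIdSource(prefix: string) {
  let counter = 0;
  return {
    next: (): string => `${prefix}-${++counter}`,
    // After loading a saved graph, move the counter past every id already in use.
    bumpPast(ids: string[]): void {
      for (const id of ids) {
        if (!id.startsWith(`${prefix}-`)) continue;
        const n = Number(id.slice(prefix.length + 1));
        if (Number.isInteger(n)) counter = Math.max(counter, n);
      }
    },
  };
}

// Returns a function that answers "should this click be handled?".
// Clicks arriving less than windowMs after the last accepted click are swallowed.
// The clock is injectable so the behaviour can be tested without timers.
function createClickGuard(windowMs: number, now: () => number = Date.now) {
  let lastAccepted = -Infinity;
  return (): boolean => {
    const t = now();
    if (t - lastAccepted < windowMs) return false;
    lastAccepted = t;
    return true;
  };
}
```

Only accepted clicks move the window forward, so a burst of synthetic repeats cannot postpone the next legitimate click indefinitely.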
The headline human-in-the-loop substrate. A human step now opens a durable conductor_awaits row and parks the run as 'waiting' (instead of a dead park):
- ConductorAwaitStore (create/get/listWaiting/listDue/recordResponse/atomic close).
- executor.resolveAwait: records the response, atomically closes the await, then resumes the run, feeding the response as the step result into the engine and driving onward. expireAwait: on deadline, fires the step's in-graph fallback (FR-017).
- ConductorAwaitWorker: minute-tick poll of due awaits → expire (deadline path).
- ISO-8601 duration parsing (PT6H/PT24H/P1D) for reminder/deadline.
- routes: GET /awaits/pending (operator inbox), POST /awaits/:id/respond.
- migration 0002: responder_id → TEXT (session identity, no users join for MVP).
- UI: 'Pending human steps' inbox with Approve/Reject that resolves + resumes.
Atomic resolution via UPDATE ... WHERE status='waiting' (FR-018). MVP: quorum 'any' (first response wins); holder-at-access auth deferred.
backend + web-ui typecheck clean; i18n parity ok (1263 keys).
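For the reminder/deadline durations mentioned above (PT6H, PT24H, P1D), a parser for the fixed-length subset of ISO-8601 durations is enough. The sketch below is an assumption about how such parsing might be done, not the code in this PR; it deliberately rejects years, months and weeks, whose length in milliseconds is not fixed.

```typescript
// Parse a subset of ISO-8601 durations (days, hours, minutes, seconds) into milliseconds.
// Examples accepted: "P1D", "PT6H", "PT24H", "P1DT12H30M". Anything else throws.
function parseIsoDurationMs(input: string): number {
  const m = /^P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?$/.exec(input);
  // Reject strings with no components at all ("P", "PT") or a dangling "T" ("P1DT").
  if (!m || input === "P" || input.endsWith("T")) {
    throw new Error(`unsupported duration: ${input}`);
  }
  const part = (v: string | undefined): number => (v ? Number(v) : 0);
  const seconds = part(m[1]) * 86_400 + part(m[2]) * 3_600 + part(m[3]) * 60 + part(m[4]);
  return seconds * 1000;
}

// Usage: compute when an await created "now" would hit its deadline.
const createdAt = Date.UTC(2025, 0, 1, 9, 0, 0);
const deadlineAt = createdAt + parseIsoDurationMs("PT24H");
```

Rejecting unsupported forms at publish time is usually preferable to guessing, since a silently misread deadline would only surface much later as a missed escalation.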
The recurring synthetic/double-click double-fire (already fixed for add-step) also started two runs / two responses. Guard handleRun, handleRespond, and the canvas run with a 600ms ref debounce so one click is one action.
…US4)
The trigger class the real use cases depend on (merge/RC-build, ATS invite, calendar).
ConductorEventRouter.emit(eventId, payload): scans enabled workflows, matches each active version's event trigger (eventId + optional payload-filter predicate, evaluated by conductor-core), and starts a run per match with the payload as initial context (FR-013).
Operator route POST /emit + a UI 'Emit an event' form to fire/test it. (This is the kernel-side seam a connector calls; plugin ctx.events.emit + manifest emits-autodiscovery is the remaining connector half.)
backend + web-ui typecheck clean; i18n parity ok (1268 keys).
Confidence before go-live. executor.previewRun(slug, payload, humanResponses):
simulates the workflow path in-memory with NO persistence and NO side effects —
no conductor_runs/awaits rows, no notification, no durable await. Human steps are
answered inline (supplied response, default {approved:true}); agent steps run a
real turn; action steps are stubbed (irreversible connector actions not executed).
Returns the full simulated step path (actor, postcondition, transition per step).
Route POST /:slug/preview + a 'Dry-run' button in the designer that renders the
simulated path. Reuses the StepEffects seam built for exactly this.
backend + web-ui typecheck clean; i18n parity ok (1272 keys).
A human step can address role:<key> instead of a fixed person; the role resolves live to its current holder(s).
- ConductorRoleStore (createRole, addHolder/removeHolder = the baton, resolve() = current open assignments, never frozen, FR-022).
- Migration 0003: holder_id/delegate_id -> TEXT (assign by session identity, no users join).
- Routes: GET/POST /roles, POST /roles/:key/holders (add|remove).
- The awaits inbox resolves role principals live to their current holders (FR-023).
- UI 'Roles & the baton' section: create role, assign/move holders; inbox shows role -> resolved holders.
MVP: default resolver only (external RoleResolver registry is a follow-up); quorum 'any'.
backend + web-ui typecheck clean; i18n parity ok (1278 keys).
Add an 'Edit' button to each workflow row in the operator list that loads the workflow into the visual designer and scrolls there. ConductorCanvas gains an editRequest prop (slug + per-click nonce) and loads on change, reusing the existing getConductorWorkflowGraph + versioned publish path.
…hases
# Conflicts:
#	middleware/package-lock.json
#	middleware/package.json
#	web-ui/app/_lib/api.ts
Re-drive runs orphaned by a process restart (US2/SC-002) and surface the durable run trace in the operator UI (US9).
Resume worker:
- ConductorRunResumeWorker reconciles 'running' runs on a 60s tick, claiming stale rows atomically (FOR UPDATE SKIP LOCKED) and re-driving from current_step_id. Lease fencing (claimed_by) on every step/park write makes a superseded driver abort (RunLeaseLostError) instead of double-driving.
- staleMs default 900s (> orchestrator wall-clock), per-step claimed_at heartbeat, in-flight set + concurrency cap.
- Idempotent human-await re-open (partial-unique conductor_awaits(run_id,step_id) WHERE waiting + ON CONFLICT) so a crash between create() and park() never doubles an await.
- migration 0004: claimed_by/claimed_at + covering indexes.
Audit viewer:
- ConductorRunTrace / ConductorRunHistory (web-ui) over the existing GET /:slug/runs(/:runId) routes: run history + ordered step trace.
Tests: ConductorRunResumeWorker tick (4/4). Gates green: build, typecheck, i18n parity, web build.
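The claim-plus-fencing idea can be shown with an in-memory model. This is a sketch under stated assumptions: `RunRow`, `claimStale` and `recordStep` are hypothetical names, and the array stands in for the `conductor_runs` table, where the claim would be a `SELECT ... FOR UPDATE SKIP LOCKED` followed by an update of `claimed_by`/`claimed_at` in one transaction.

```typescript
interface RunRow {
  id: string;
  status: "running" | "waiting" | "completed";
  claimedBy: string | null;
  claimedAt: number; // ms timestamp, refreshed as a heartbeat on every step write
  currentStepId: string;
}

class RunLeaseLostError extends Error {}

// Claim one stale 'running' run for this worker, or return undefined if none is stale.
function claimStale(runs: RunRow[], workerId: string, now: number, staleMs: number): RunRow | undefined {
  const row = runs.find((r) => r.status === "running" && now - r.claimedAt >= staleMs);
  if (!row) return undefined;
  row.claimedBy = workerId;
  row.claimedAt = now;
  return row;
}

// Every step write is fenced on the lease: a driver that was superseded
// aborts with RunLeaseLostError instead of writing over the new owner's progress.
function recordStep(run: RunRow, workerId: string, nextStepId: string, now: number): void {
  if (run.claimedBy !== workerId) {
    throw new RunLeaseLostError(`run ${run.id} is now owned by ${String(run.claimedBy)}`);
  }
  run.currentStepId = nextStepId;
  run.claimedAt = now; // heartbeat keeps the run from looking stale while it progresses
}
```

The fence is what turns "two workers might both think they own the run" into a detectable error for the older one; the staleness threshold alone only decides when a takeover may begin.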
US4 — make Conductor workflows start automatically: on a cron schedule, and on
domain events a plugin emits.
Cron:
- ConductorScheduleStore + ConductorScheduleWorker: a minute tick fires each
enabled workflow whose cron matches the current UTC minute. Exactly-once per
minute via an atomic DB claim (date_trunc('minute', now()) on both compare and
write, so replica/clock skew can't double-fire).
- Cron schedules reconcile from the graph's cron triggers ATOMICALLY inside the
publish transaction (onPublished hook) so a failed reconcile rolls the publish
back rather than leaving stale schedules firing.
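The exactly-once-per-minute claim can be illustrated without a database. In this sketch the `Schedule` shape and the `lastFiredMinute` field are hypothetical; the point is that both the comparison and the stored value use the same minute-truncated timestamp, which is what `date_trunc('minute', now())` provides on the SQL side.

```typescript
// Truncate a millisecond timestamp to the start of its UTC minute.
function truncToMinute(ms: number): number {
  return Math.floor(ms / 60_000) * 60_000;
}

interface Schedule {
  workflowSlug: string;
  lastFiredMinute: number | null; // truncated timestamp of the last claimed minute
}

// Returns true for exactly one caller per schedule per minute.
// In a real store the compare and the write would be a single conditional UPDATE,
// so two replicas ticking in the same minute cannot both win.
function claimMinute(schedule: Schedule, nowMs: number): boolean {
  const minute = truncToMinute(nowMs);
  if (schedule.lastFiredMinute !== null && schedule.lastFiredMinute >= minute) return false;
  schedule.lastFiredMinute = minute;
  return true;
}
```

Using `>=` rather than `!==` means a replica whose clock lags behind cannot re-fire a minute that another replica has already moved past.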
Connector event-emit (the Conductor Surface):
- EventCatalogRegistry + eventEmitIds extractor; resolve hooks on BOTH activation
runtimes (dynamic + tool) so built-in plugins resolve too.
- ctx.events.emit (plugin-api contract): gated on permissions.events.emit,
deny-by-default fail-closed against declared { id, event_emit:true } capabilities,
router resolved lazily; throws typed ConductorUnavailableError when Conductor
is not wired in the host.
- GET /events/catalog for the Designer's trigger picker.
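The gating order described for `ctx.events.emit` (permission, then declared capability, then lazy router resolution) can be sketched as follows. `EventCatalog`, `makeEmit`, `EventNotAllowedError` and the `Router` shape are stand-ins invented for this example; only `ConductorUnavailableError` and the deny-by-default behaviour are named in the commit message.

```typescript
// Stand-in for the registry of events each plugin has declared in its manifest.
class EventCatalog {
  private readonly byPlugin = new Map<string, Set<string>>();
  register(pluginId: string, eventIds: string[]): void {
    this.byPlugin.set(pluginId, new Set(eventIds));
  }
  unregister(pluginId: string): void {
    this.byPlugin.delete(pluginId);
  }
  declares(pluginId: string, eventId: string): boolean {
    return this.byPlugin.get(pluginId)?.has(eventId) ?? false;
  }
}

class EventNotAllowedError extends Error {}
class ConductorUnavailableError extends Error {}

interface Router { emit(eventId: string, payload: unknown): number }

// Build the emit function handed to one plugin. Anything not explicitly allowed is refused.
function makeEmit(
  pluginId: string,
  hasEmitPermission: boolean,
  catalog: EventCatalog,
  getRouter: () => Router | undefined,
) {
  return (eventId: string, payload: unknown): number => {
    if (!hasEmitPermission) throw new EventNotAllowedError("permissions.events.emit not granted");
    if (!catalog.declares(pluginId, eventId)) {
      throw new EventNotAllowedError(`${pluginId} did not declare event ${eventId}`);
    }
    const router = getRouter(); // resolved lazily, at emit time
    if (!router) throw new ConductorUnavailableError("Conductor is not wired in this host");
    return router.emit(eventId, payload); // here: number of runs started
  };
}
```

Resolving the router lazily lets a plugin be activated before Conductor is wired, while still failing with a typed error if it emits into a host that never wires it.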
Reviewed in two passes (Claude agents + Codex/Forge); all findings folded in.
Tests: schedule worker (4) + event catalog (5) + resume worker (4). Gates green:
build, typecheck.
…ve 3)
US5 quorum 'all' + US2 resume-safety hardening.
Quorum 'all':
- resolveAwait records each response and resumes only once every CURRENT holder
(resolveRoleHolders, late-bound, now a required dep) has answered; the aggregate
{quorum:'all', approved, responses} is built over the current required set only,
so a holder who lost the baton cannot skew it. close() atomicity gives exactly-once
resume under concurrent final responses.
- A role with no current holder no longer parks (which would hang) — the step takes
its in-graph fallback at dispatch (FR-024).
- Publish guard: a quorum 'all' human step must declare a deadline + fallback
(validate: quorum_all_requires_deadline_fallback) so a non-responding holder
escalates instead of hanging forever.
- recordResponse is gated on the await still being 'waiting' — a late/double-click
response cannot rewrite the audit row the decision was based on.
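A sketch of the quorum 'all' aggregation described above: the decision is computed over the current holder set only, so a response from someone who has since lost the baton is ignored. The function name, the result shape beyond `{quorum:'all', approved, responses}`, and the rule that the aggregate is approved only when every counted response approves are assumptions for illustration.

```typescript
interface AwaitResponse { responderId: string; approved: boolean }

type QuorumResult =
  | { done: false; missing: string[] }
  | { done: true; aggregate: { quorum: "all"; approved: boolean; responses: AwaitResponse[] } };

// currentHolders must be resolved at evaluation time (late-bound), not when the await was opened.
function evaluateQuorumAll(currentHolders: string[], responses: AwaitResponse[]): QuorumResult {
  // The commit message says a role with no current holder takes the fallback at dispatch,
  // so an empty holder set is treated as a caller error here.
  if (currentHolders.length === 0) throw new Error("no current holder: take the fallback at dispatch");
  const holders = currentHolders.filter((h, i) => currentHolders.indexOf(h) === i); // dedupe
  const counted: AwaitResponse[] = [];
  const missing: string[] = [];
  for (const holder of holders) {
    const response = responses.find((r) => r.responderId === holder);
    if (response) counted.push(response);
    else missing.push(holder);
  }
  if (missing.length > 0) return { done: false, missing };
  return {
    done: true,
    aggregate: { quorum: "all", approved: counted.every((r) => r.approved), responses: counted },
  };
}
```

Exactly-once resume still depends on the atomic close of the await; this function only answers whether the final response has arrived.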
Per-step hard timeout:
- RealStepEffects wraps agent.chat + invokeAction in withTimeout (StepTimeoutError);
DEFAULT_STEP_TIMEOUT_MS=600_000 is strictly < the resume worker's
DEFAULT_RESUME_STALE_MS=900_000 (invariant asserted by a test), so a step always
settles before its run could be claimed — closing the last single-step
at-least-once window from Wave 1.
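The timeout wrapper and the invariant between the two constants might look like the sketch below. The constant names and values and the `StepTimeoutError` name come from the commit message; the `withTimeout` signature and its `Promise.race` implementation are assumptions.

```typescript
const DEFAULT_STEP_TIMEOUT_MS = 600_000;
const DEFAULT_RESUME_STALE_MS = 900_000;

class StepTimeoutError extends Error {
  constructor(readonly label: string, readonly ms: number) {
    super(`${label} exceeded ${ms}ms`);
  }
}

// Settle with the work's outcome, or reject with StepTimeoutError after ms, whichever is first.
// Note: this does not cancel the underlying work; it only stops waiting for it.
function withTimeout<T>(work: Promise<T>, ms: number, label: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => reject(new StepTimeoutError(label, ms)), ms);
  });
  return Promise.race([work, timeout]).then(
    (value) => { clearTimeout(timer); return value; },
    (error) => { clearTimeout(timer); throw error; },
  );
}

// The invariant the commit describes: a step must settle before its run can look stale.
function assertTimeoutInvariant(stepMs: number, staleMs: number): void {
  if (!(stepMs < staleMs)) {
    throw new Error(`step timeout ${stepMs}ms must be strictly below resume-stale ${staleMs}ms`);
  }
}
```

Because the race only abandons the wait, an action that already reached an external system may still complete after the timeout, which is why action idempotency keys appear among the documented follow-ups.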
Reviewed in two passes (Claude agents + Codex/Forge); all HIGH/MED findings folded in.
Tests: quorum+timeout + invariant (kernel) + conductor-core validate (2). Gates green:
build, typecheck, conductor-core (48), kernel conductor (18).
Documented follow-ups: shared withTimeout util; action idempotency keys for
irreversible writes on timeout; respond-endpoint authz for channel-based responses.
US5 reminders: proactively nudge a waiting human-await's current holders on their channel. Kernel-only (no private channel-plugin edit required).
- ConductorChannelBindingStore: user -> conversationRef (upsert/get/getMany). Populated per inbound turn via an onTurnCaptured hook on the routines turn-capture seam (lazy serviceRegistry resolve, fire-and-forget, never breaks a turn). Migration 0005 aligns user_id to TEXT (holder_id/responder_id were already migrated to TEXT in 0002/0003, giving one consistent identity space).
- awaitWorker reminder branch, CLAIM-THEN-SEND: claimReminderDue atomically advances last_reminder_at (gated on COALESCE(last_reminder_at, created_at) + interval, so the first reminder waits a full interval) BEFORE delivery, so two replicas, or a failed send/record, re-nudge at most once per interval (never a per-minute storm). Holders resolved LIVE (FR-022) + deduped; quorum='all' skips holders who already responded; each send isolated in try/catch; unreachable set iff someone still needs nudging but none reachable (cleared on success), surfaced in the operator inbox via /awaits/pending.
- getMany batches the binding fetch; resolveAwaitHolders shared worker+inbox.
IDENTITY NOTE: delivery resolves only when a human principal's id equals the binding key (the channel-native turn user id). Mismatches degrade gracefully to `unreachable` (never hang, never spam). A cross-id-space identity bridge and a kernel-owned turn-capture seam are documented follow-ups.
Reviewed in two passes (Claude agents + Codex/Forge); all findings folded in: per-holder send isolation, unreachable reset, batch fetch, shared holder helper (pass 1); claim-then-send for first-interval + replica + storm safety, quorum=all re-nudge filter (pass 2). Forge H1 (identity-type mismatch) verified REFUTED: holder_id/responder_id are TEXT via migrations 0002/0003.
Tests: reminder worker (6) + existing conductor suites. Gates green: build, typecheck, kernel tests (24).
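The claim-then-send ordering can be modelled in memory. In this sketch `AwaitRow`, `remind` and the synchronous `send` callback are hypothetical; `claimReminderDue` is named in the commit message, but its real signature is not shown, and here the row mutation stands in for a single conditional UPDATE.

```typescript
interface AwaitRow {
  id: string;
  status: "waiting" | "closed";
  createdAt: number;
  lastReminderAt: number | null;
  reminderIntervalMs: number;
}

// Advance lastReminderAt if, and only if, a full interval has elapsed since the
// last reminder (or since creation when none was sent yet). Returns whether the claim won.
function claimReminderDue(row: AwaitRow, now: number): boolean {
  if (row.status !== "waiting") return false;
  const anchor = row.lastReminderAt ?? row.createdAt; // COALESCE(last_reminder_at, created_at)
  if (now < anchor + row.reminderIntervalMs) return false;
  row.lastReminderAt = now;
  return true;
}

// Claim first, deliver second. A failed send is recorded but never rolls the claim back,
// so the worst case is one missed nudge per interval rather than a retry storm.
function remind(
  row: AwaitRow,
  now: number,
  holders: string[],
  send: (holderId: string) => void,
): { sent: string[]; failed: string[] } | null {
  if (!claimReminderDue(row, now)) return null;
  const sent: string[] = [];
  const failed: string[] = [];
  for (const holder of holders) {
    try {
      send(holder);
      sent.push(holder);
    } catch {
      failed.push(holder); // each send is isolated: one bad channel does not block the others
    }
  }
  return { sent, failed };
}
```

The trade-off is deliberate: claiming before sending gives at-most-once delivery per interval, which suits a nudge better than at-least-once would.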
US4/FR-028 — the visual Designer's event trigger now sources declared events
from the live catalog instead of blind free-text.
- api.ts getConductorEventCatalog() -> GET /operator/conductors/events/catalog.
- ConductorCanvas fetches the catalog once on mount (cancelled-flag cleanup,
Array.isArray guard, errors degrade to empty hint + free-text). The event-id
input gains a useId()-keyed <datalist> of the declared events (deduped); a hint
shows the count or a "none declared yet" message. Free-text custom ids remain
fully allowed (datalist is suggestion-only); publish serialization unchanged.
- i18n en+de (eventCatalogHint {count}, eventCatalogEmpty).
Reviewed in two passes (Claude + Codex/Forge): pass 1 -> Array guard, useId,
dedupe; pass 2 (Forge) -> APPROVE (only a comment-accuracy nit, folded in).
Gates green: i18n parity, web build.
Add a chat surface that co-designs a Conductor workflow graph, parallel to the visual Designer. A builder turn is stateless: the client sends the current draft graph + message + history; the kernel runs an orchestrator chat turn that returns JSON patches, applies them via a pure 7-op patch algebra, validates the result with @omadia/conductor-core, and returns the patched draft + reply + validation. The draft stays client-side (one shared WorkflowGraph), so chat and canvas are two views of the same graph; "Show in designer" loads the chat draft into the visual canvas on explicit action (no silent overwrite of manual edits).
Backend (middleware/src/conductor):
- graphPatch.ts: applyGraphPatches over add/update/remove step, add/remove transition, set_trigger, set_entry; pure, per-patch error isolation, drops dangling transitions and orphaned fallback pointers on remove.
- builderAgent.ts: ConductorBuilderAgent.runTurn, a best-of-two self-correcting turn via bundle.agent.chat, robust multi-candidate JSON extraction, unique per-turn sessionScope (no cross-operator memory bleed), bounded chat timeout.
- routes.ts: POST /builder/turn (operator-auth, message/history/graph input caps + history element filtering); wired in wireConductor.
- 20 new kernel tests.
Web-ui (app/conductor):
- ConductorChatPane: transcript, patch chips, validation + apply-error surfacing, publish via the existing versioned path, explicit "Show in designer".
- ConductorCanvas: additive loadGraphRequest prop (hydrateFromGraph refactor, existing load path unchanged).
- en/de i18n.
Gates green: middleware build+typecheck, conductor-core 48 + kernel 44 tests, web-ui i18n + build.
Reviewed (2 Claude passes + Forge APPROVE-WITH-CHANGES): no HIGH; all MED folded.
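To make the patch algebra concrete, here is a reduced sketch of a pure `applyGraphPatches` with per-patch error isolation. The graph and patch shapes are simplified assumptions, and the op identifiers other than `set_trigger` and `set_entry` (which appear in the commit message) are guesses at naming; the real `graphPatch.ts` is not shown in this PR text.

```typescript
interface PStep { id: string; kind: "agent" | "action" | "human"; fallback?: string }
interface PTransition { from: string; to: string }
interface PGraph { entry?: string; trigger: { kind: string }; steps: PStep[]; transitions: PTransition[] }

type Patch =
  | { op: "add_step"; step: PStep }
  | { op: "update_step"; id: string; changes: Partial<Omit<PStep, "id">> }
  | { op: "remove_step"; id: string }
  | { op: "add_transition"; transition: PTransition }
  | { op: "remove_transition"; from: string; to: string }
  | { op: "set_trigger"; trigger: PGraph["trigger"] }
  | { op: "set_entry"; id: string };

// Apply one patch and return a NEW graph; the input graph is never mutated.
function applyOne(g: PGraph, p: Patch): PGraph {
  const has = (id: string): boolean => g.steps.some((s) => s.id === id);
  const need = (id: string): void => { if (!has(id)) throw new Error(`unknown step ${id}`); };
  switch (p.op) {
    case "add_step":
      if (has(p.step.id)) throw new Error(`duplicate step ${p.step.id}`);
      return { ...g, steps: [...g.steps, p.step] };
    case "update_step":
      need(p.id);
      return { ...g, steps: g.steps.map((s) => (s.id === p.id ? { ...s, ...p.changes } : s)) };
    case "remove_step":
      need(p.id);
      return {
        ...g,
        // Drop the step, clear fallback pointers that targeted it, and drop dangling transitions.
        steps: g.steps
          .filter((s) => s.id !== p.id)
          .map((s) => (s.fallback === p.id ? { ...s, fallback: undefined } : s)),
        transitions: g.transitions.filter((t) => t.from !== p.id && t.to !== p.id),
      };
    case "add_transition":
      need(p.transition.from);
      need(p.transition.to);
      return { ...g, transitions: [...g.transitions, p.transition] };
    case "remove_transition":
      return { ...g, transitions: g.transitions.filter((t) => !(t.from === p.from && t.to === p.to)) };
    case "set_trigger":
      return { ...g, trigger: p.trigger };
    case "set_entry":
      need(p.id);
      return { ...g, entry: p.id };
  }
}

// A failing patch is reported and skipped; later patches still apply to the last good graph.
function applyGraphPatches(graph: PGraph, patches: Patch[]): { graph: PGraph; errors: string[] } {
  let current = graph;
  const errors: string[] = [];
  patches.forEach((patch, i) => {
    try {
      current = applyOne(current, patch);
    } catch (e) {
      errors.push(`patch ${i} (${patch.op}): ${(e as Error).message}`);
    }
  });
  return { graph: current, errors };
}
```

Purity is what lets the chat pane and the canvas share one client-side draft: the server returns a new graph plus errors, and the client decides whether to adopt it.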
…l-world P1a)
Channel plugins (e.g. Microsoft Teams) can now declare `event_emit` capabilities that trigger Conductor workflows. The tool and dynamic-agent runtimes already register a plugin's eventEmitIds() into the EventCatalogRegistry on activation, but the channel activation path (DefaultChannelRegistry) never did, so a channel-declared event would be rejected deny-by-default at ctx.events.emit time.
- channelRegistry.activate: register eventEmitIds(catalogEntry.manifest) after a successful activation; deactivate: unregister (runs on every teardown path, before the no-handle early return, so no catalog leak).
- index.ts: thread the shared eventCatalogRegistry singleton into the channel registry (same instance the tool/dynamic runtimes + event router already use).
- Test drives the real activate/deactivate against a live catalog, so reverting the wiring fails it.
This is Phase 1a of the Conductor real-world plan (the OSS enabler); the Teams plugin declaring + emitting events, and the email identity-bridge for reminders, follow.
Reviewed (Claude + Forge APPROVE-WITH-CHANGES): production code clean; folded the test-quality finding (now exercises the wiring) + dropped a misleading fixture comment.
CI runs `lint + typecheck + test`; the local gates had run only typecheck + test, so two eslint errors slipped through and turned the PR checks red:
- routes.ts: `ConductorRunExecutor` is used only as a type -> `import type` (@typescript-eslint/consistent-type-imports).
- Four conductor effects (canvas edit-request + loadGraph-request, conductor page mount-load, run-trace mount-load) tripped react-hooks/set-state-in-effect. Disabled per the codebase's established pattern: app/memory and app/graph use the same inline disable for legitimate fetch-on-mount / prop-trigger effects.
Both packages now lint clean (0 errors, warnings only); tsc + 47 conductor tests green.
Omadia Conductor — deterministic workflow engine (Spec 005)
This PR lands the complete Conductor: a deterministic, durable workflow engine for Omadia. Operators compose multi-step processes (agent / action / human steps) that run reliably across restarts, pause for human approval, react to real-world events, and can be built either visually or by chatting with a builder agent.
It supersedes #321 (Spec 005): that branch's spec + first implementation are included here, plus six follow-on waves and the channel event-emit enabler. Merging this brings the full conductor subsystem (including the `ctx.events` plugin contract) into `main`.
What's included
Core engine (US1–US3)
- `@omadia/conductor-core`, a pure, dependency-light engine: workflow graph types, a serializable predicate AST for guards/postconditions (evaluated, never `eval`'d), `validate(graph)` (ajv shape gate + reachability / unguarded-cycle / dangling-ref checks), and a deterministic `nextStep()`. 48 vitest cases.
- … (`middleware/src/conductor/`): per-subsystem migrator, workflow/run/await/role/schedule/binding stores over the graph pool, a `StepEffects` seam (real orchestrator turns vs. preview stub), versioned race-safe publish, and the operator API mounted behind `requireAuth`.
Triggers (US4)
- `ctx.events.emit(eventId, payload)` provisioned for plugins, gated on `permissions.events.emit`, deny-by-default via an `EventCatalogRegistry`; manifests declare emittable events. (Phase 1a in this PR extends catalog registration to the channel runtime; see below.)
Human-in-the-loop (US5, US6)
- `conductor_awaits`: a human step parks the run, an operator inbox approves/rejects, the run resumes. Deadlines fire the in-graph fallback; quorum `any`/`all`; proactive reminders for waiting holders.
Designer & preview (US7, US8)
Durability & audit (US2, US9)
Channel event-emit enabler (Phase 1a)
`DefaultChannelRegistry.activate` now registers a channel plugin's declared `event_emit` ids into the `EventCatalogRegistry` (and unregisters on deactivate). The tool and dynamic-agent runtimes already did this; the channel activation path was the missing third. This lets channel plugins (e.g. Microsoft Teams) declare events that trigger Conductor workflows.
Wave history (each independently reviewed)
- `9e06d1e`
- `84f74c4`: … (`ctx.events.emit`, catalog on both runtimes)
- `6e5d33e`: … `all` + per-step hard timeout (< resume-stale window)
- `0d46339`
- `bb66f05`
- `59d0651`
- `df2835f`
Data model
New `conductor_*` tables via a dedicated migrator (`middleware/src/conductor/migrations/` 0001–0005): workflows + versions, runs + per-step trace (with run-claim/lease columns), durable awaits (responder/holder TEXT ids, reminder + `unreachable` columns), roles + holders, schedules, and channel bindings. The Dockerfile copies the migrations dir into the runtime image.
Testing
- `@omadia/conductor-core`: 48 vitest.
- … `i18n:check` (en/de parity) + production build clean.
- … `build` + `typecheck` clean.
Notes for review
- … `packages/conductor-core`; kernel I/O is behind stores + the `StepEffects` seam.
- No `eval` anywhere: guards/postconditions are a serializable AST.
Follow-ups (not in this PR)
- … `main` so the plugin builds against the `ctx.events` contract.