From c31f1d08f8b2c4bf644729a9cea9a81f5ad11acb Mon Sep 17 00:00:00 2001 From: stack72 Date: Mon, 13 Apr 2026 18:34:54 +0100 Subject: [PATCH 1/2] fix: detect async CEL functions in forEach.in and throw actionable error forEach.in is evaluated synchronously, but async CEL functions (data.latest, data.findByTag, data.findBySpec, data.query) return Promises that coerceBigInts silently converts to {}, causing forEach to expand zero steps with no error. Add Promise detection in CelEvaluator.evaluate before coerceBigInts runs, and wrap the error in expandForEachSteps and libswamp evaluate.ts with a UserError that names the expression and points to the nested workflow workaround. Closes: swamp-club #88 Co-Authored-By: Claude Opus 4.6 (1M context) --- design/expressions.md | 30 ++++---- src/domain/workflows/execution_service.ts | 22 +++++- .../workflows/execution_service_test.ts | 76 ++++++++++++++++++- src/infrastructure/cel/cel_evaluator.ts | 26 +++++++ src/infrastructure/cel/cel_evaluator_test.ts | 69 ++++++++++++++++- src/libswamp/workflows/evaluate.ts | 22 +++++- 6 files changed, 224 insertions(+), 21 deletions(-) diff --git a/design/expressions.md b/design/expressions.md index 751aaa78..bea853d9 100644 --- a/design/expressions.md +++ b/design/expressions.md @@ -177,28 +177,24 @@ attributes: ### data.findBySpec(modelName, specName) Returns all data records for a model that match a given output spec name. -Commonly used in `forEach` expressions to iterate over variable-length output. +Commonly used in `task.inputs` to pass variable-length output into a step. **Workflow run scoping:** When called inside a workflow run, `findBySpec` only returns data produced during the current run. This prevents stale data from -previous runs leaking into `forEach` iteration. Outside a workflow context, it -returns all data globally. The same run-scoping applies to -`context.readModelData()` and `context.queryData()` in extension model methods -(see `design/data-query.md`). +previous runs leaking into iteration. Outside a workflow context, it returns all +data globally. The same run-scoping applies to `context.readModelData()` and +`context.queryData()` in extension model methods (see `design/data-query.md`). + +**Async limitation:** `data.findBySpec()` is async and cannot be used directly in +`forEach.in` (which is evaluated synchronously). To iterate over findBySpec +results, resolve the call in a parent workflow's `task.inputs` and pass the array +to a child workflow. See +`.claude/skills/swamp-workflow/references/nested-workflows.md`. ```yaml -# In a forEach step — only sees data from the current workflow run: -steps: - - name: dl-${{ self.ep.attributes.title }} - forEach: - item: ep - in: ${{ data.findBySpec("dedup-model", "episode") }} - task: - type: model_method - modelIdOrName: downloader - methodName: download - inputs: - uri: ${{ self.ep.attributes.url }} +# In task.inputs — resolves the async call before passing to the step: +inputs: + episodes: ${{ data.findBySpec("dedup-model", "episode") }} ``` ### data.findByTag(tagKey, tagValue) diff --git a/src/domain/workflows/execution_service.ts b/src/domain/workflows/execution_service.ts index c5d78738..92ca0c63 100644 --- a/src/domain/workflows/execution_service.ts +++ b/src/domain/workflows/execution_service.ts @@ -73,6 +73,7 @@ import { } from "../expressions/model_resolver.ts"; import { CelEvaluator } from "../../infrastructure/cel/cel_evaluator.ts"; import { UserError } from "../errors.ts"; +import { InvalidExpressionError } from "../expressions/errors.ts"; import { getRunLogger, runFileSink, @@ -1550,7 +1551,26 @@ export class WorkflowExecutionService { } const celExpr = match[1]; - const items = celEvaluator.evaluate(celExpr, context); + let items: unknown; + try { + items = celEvaluator.evaluate(celExpr, context); + } catch (error) { + if (error instanceof InvalidExpressionError) { + throw new UserError( + `forEach.in expression '$\{{ ${celExpr} }}' returned an ` + + `unresolved Promise.\n\n` + + `forEach.in is evaluated synchronously and cannot await ` + + `async CEL functions (data.latest, data.findByTag, ` + + `data.findBySpec, data.query).\n\n` + + `Fix: move the async call into a parent workflow's ` + + `task.inputs (which IS awaited) and have the child ` + + `iterate over inputs.. See:\n` + + `.claude/skills/swamp-workflow/references/` + + `nested-workflows.md#when-to-use-nested-workflows`, + ); + } + throw error; + } // Handle both arrays and objects const expandedSteps: ExpandedStep[] = []; diff --git a/src/domain/workflows/execution_service_test.ts b/src/domain/workflows/execution_service_test.ts index 857aabe6..7756d5ba 100644 --- a/src/domain/workflows/execution_service_test.ts +++ b/src/domain/workflows/execution_service_test.ts @@ -17,7 +17,12 @@ // You should have received a copy of the GNU Affero General Public License // along with Swamp. If not, see . -import { assertEquals, assertNotEquals, assertRejects } from "@std/assert"; +import { + assertEquals, + assertNotEquals, + assertRejects, + assertStringIncludes, +} from "@std/assert"; import { join } from "@std/path"; import { coerceToSuffix, @@ -2683,3 +2688,72 @@ Deno.test("DefaultStepExecutor wires dataQueryService into MethodContext", async } }); }); + +// --- forEach.in async Promise detection (Issue #88) --- + +Deno.test({ + name: + "expandForEachSteps: throws UserError when forEach.in uses async data function", + // The error path interrupts normal cleanup of internally-opened files + // (e.g. the CatalogStore WAL), so resource sanitization is disabled. + sanitizeResources: false, + sanitizeOps: false, + fn: async () => { + await withTempDir(async (tempDir) => { + const workflowRepo = new InMemoryWorkflowRepository(); + const runRepo = new InMemoryWorkflowRunRepository(); + const executor = new MockStepExecutor(); + + const workflow = Workflow.create({ + name: "async-foreach", + jobs: [ + Job.create({ + name: "download", + steps: [ + Step.create({ + name: "process-${{ self.ep.name }}", + task: StepTask.model("test-model", "run"), + forEach: { + item: "ep", + in: '${{ data.findBySpec("producer", "result") }}', + }, + }), + ], + }), + ], + }); + await workflowRepo.save(workflow); + + const catalogStore = new CatalogStore(join(tempDir, "_catalog.db")); + try { + const service = new WorkflowExecutionService( + workflowRepo, + runRepo, + tempDir, + executor, + undefined, + catalogStore, + ); + + // The ModelResolver wires up async data delegates (findBySpec returns + // a Promise). expandForEachSteps uses sync evaluate, which hits the + // Promise detection guard and throws InvalidExpressionError. The + // forEach catch-and-wrap turns it into a UserError. + const error = await assertRejects( + () => service.execute(workflow.name), + ); + + assertStringIncludes( + (error as Error).message, + "unresolved Promise", + ); + assertStringIncludes( + (error as Error).message, + "nested-workflows.md", + ); + } finally { + catalogStore.close(); + } + }); + }, +}); diff --git a/src/infrastructure/cel/cel_evaluator.ts b/src/infrastructure/cel/cel_evaluator.ts index 1c73e4c9..c6cf2ac6 100644 --- a/src/infrastructure/cel/cel_evaluator.ts +++ b/src/infrastructure/cel/cel_evaluator.ts @@ -348,6 +348,32 @@ export class CelEvaluator { const wrappedContext = this.wrapNamespaces(context); const result = this.env.evaluate(transformedExpr, wrappedContext); + + // Detect unresolved Promises from async CEL functions (data.latest, + // data.findByTag, data.findBySpec, data.query). These functions are + // async but the sync evaluate path cannot await them, so the raw + // Promise leaks through. Catch it here — before coerceBigInts + // silently converts the Promise to {} — and throw a clear error. + if (result instanceof Promise) { + // Suppress the dangling rejection — the async operation is still + // in flight but we are about to throw synchronously. Without + // this, the eventual rejection surfaces as an unhandled promise + // rejection in the runtime. + result.catch(() => {}); + + throw new InvalidExpressionError( + "Expression returned an unresolved Promise. " + + "Async CEL functions (data.latest, data.findByTag, " + + "data.findBySpec, data.query) cannot be used in " + + "synchronously-evaluated contexts like forEach.in. " + + "Move the async call into a parent workflow's task.inputs " + + "(which IS awaited) and pass the resolved value to a child " + + "workflow. See: .claude/skills/swamp-workflow/references/" + + "nested-workflows.md#when-to-use-nested-workflows", + expression, + ); + } + return coerceBigInts(result); } catch (error) { throw new InvalidExpressionError( diff --git a/src/infrastructure/cel/cel_evaluator_test.ts b/src/infrastructure/cel/cel_evaluator_test.ts index c7606433..ec6967b1 100644 --- a/src/infrastructure/cel/cel_evaluator_test.ts +++ b/src/infrastructure/cel/cel_evaluator_test.ts @@ -17,7 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with Swamp. If not, see . -import { assertEquals, assertThrows } from "@std/assert"; +import { assertEquals, assertStringIncludes, assertThrows } from "@std/assert"; import { CelEvaluator } from "./cel_evaluator.ts"; import { InvalidExpressionError } from "../../domain/expressions/errors.ts"; import { transformHyphenatedModelRefs } from "../../domain/expressions/expression_parser.ts"; @@ -924,3 +924,70 @@ Deno.test("CelEvaluator: directly-missing input throws InvalidExpressionError", InvalidExpressionError, ); }); + +// --- Promise detection in sync evaluate (Issue #88) --- + +Deno.test("evaluate: throws InvalidExpressionError when data.latest returns a Promise", () => { + const evaluator = new CelEvaluator(); + const context = { + data: { + latest: (_m: string, _d: string) => + Promise.resolve({ attributes: { episodes: [] } }), + }, + }; + + const error = assertThrows( + () => evaluator.evaluate('data.latest("dedup", "current")', context), + InvalidExpressionError, + ); + assertStringIncludes(error.message, "unresolved Promise"); + assertStringIncludes(error.message, "nested-workflows.md"); +}); + +Deno.test("evaluate: throws InvalidExpressionError when data.findBySpec returns a Promise", () => { + const evaluator = new CelEvaluator(); + const context = { + data: { + findBySpec: (_m: string, _s: string) => Promise.resolve([]), + }, + }; + + const error = assertThrows( + () => evaluator.evaluate('data.findBySpec("model", "spec")', context), + InvalidExpressionError, + ); + assertStringIncludes(error.message, "unresolved Promise"); +}); + +Deno.test("evaluate: throws InvalidExpressionError when data.findByTag returns a Promise", () => { + const evaluator = new CelEvaluator(); + const context = { + data: { + findByTag: (_k: string, _v: string) => Promise.resolve([]), + }, + }; + + const error = assertThrows( + () => evaluator.evaluate('data.findByTag("key", "value")', context), + InvalidExpressionError, + ); + assertStringIncludes(error.message, "unresolved Promise"); +}); + +Deno.test("evaluate: sync data functions that return arrays still work", () => { + const evaluator = new CelEvaluator(); + const context = { + data: { + findBySpec: (_m: string, _s: string) => [ + { name: "a", attributes: { x: 1 } }, + ], + }, + }; + + const result = evaluator.evaluate( + 'data.findBySpec("model", "spec")', + context, + ); + assertEquals(Array.isArray(result), true); + assertEquals((result as unknown[]).length, 1); +}); diff --git a/src/libswamp/workflows/evaluate.ts b/src/libswamp/workflows/evaluate.ts index f5cff95f..0132ff83 100644 --- a/src/libswamp/workflows/evaluate.ts +++ b/src/libswamp/workflows/evaluate.ts @@ -46,6 +46,7 @@ import { DataQueryService } from "../../domain/data/data_query_service.ts"; import type { DatastorePathResolver } from "../../domain/datastore/datastore_path_resolver.ts"; import type { LibSwampContext } from "../context.ts"; import { notFound, type SwampError } from "../errors.ts"; +import { InvalidExpressionError } from "../../domain/expressions/errors.ts"; /** Evaluation result for a single workflow. */ export interface WorkflowEvaluateItemData { @@ -239,7 +240,26 @@ async function evaluateWorkflowInternal( continue; } - const items = deps.evaluateCel(inMatch[1], context); + let items: unknown; + try { + items = deps.evaluateCel(inMatch[1], context); + } catch (error) { + if (error instanceof InvalidExpressionError) { + throw new Error( + `forEach.in expression '$\{{ ${inMatch[1]} }}' returned an ` + + `unresolved Promise.\n\n` + + `forEach.in is evaluated synchronously and cannot await ` + + `async CEL functions (data.latest, data.findByTag, ` + + `data.findBySpec, data.query).\n\n` + + `Fix: move the async call into a parent workflow's ` + + `task.inputs (which IS awaited) and have the child ` + + `iterate over inputs.. See:\n` + + `.claude/skills/swamp-workflow/references/` + + `nested-workflows.md#when-to-use-nested-workflows`, + ); + } + throw error; + } const itemName = stepData.forEach.item; const nameHasExpression = /\$\{\{.+?\}\}/.test(stepData.name); From 7627a7c91c07bf8ea0dfc918fcd2e03d1c2d0062 Mon Sep 17 00:00:00 2001 From: stack72 Date: Mon, 13 Apr 2026 18:44:12 +0100 Subject: [PATCH 2/2] =?UTF-8?q?fix:=20address=20review=20feedback=20?= =?UTF-8?q?=E2=80=94=20narrow=20catch,=20fix=20double-wrap,=20use=20UserEr?= =?UTF-8?q?ror?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Narrow catch blocks to only match Promise-specific errors by checking error.message.includes("unresolved Promise"). Other CEL errors (syntax, missing vars) now propagate with their original message. - Add `if (error instanceof InvalidExpressionError) throw error` in CelEvaluator.evaluate catch to prevent double-wrapping. - Change evaluate.ts from plain Error to UserError for clean output without stack traces. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/domain/workflows/execution_service.ts | 5 ++++- src/infrastructure/cel/cel_evaluator.ts | 3 +++ src/libswamp/workflows/evaluate.ts | 8 ++++++-- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/domain/workflows/execution_service.ts b/src/domain/workflows/execution_service.ts index 92ca0c63..f9eaff63 100644 --- a/src/domain/workflows/execution_service.ts +++ b/src/domain/workflows/execution_service.ts @@ -1555,7 +1555,10 @@ export class WorkflowExecutionService { try { items = celEvaluator.evaluate(celExpr, context); } catch (error) { - if (error instanceof InvalidExpressionError) { + if ( + error instanceof InvalidExpressionError && + error.message.includes("unresolved Promise") + ) { throw new UserError( `forEach.in expression '$\{{ ${celExpr} }}' returned an ` + `unresolved Promise.\n\n` + diff --git a/src/infrastructure/cel/cel_evaluator.ts b/src/infrastructure/cel/cel_evaluator.ts index c6cf2ac6..4caead4d 100644 --- a/src/infrastructure/cel/cel_evaluator.ts +++ b/src/infrastructure/cel/cel_evaluator.ts @@ -376,6 +376,9 @@ export class CelEvaluator { return coerceBigInts(result); } catch (error) { + // Re-throw InvalidExpressionError directly to avoid double-wrapping + // (the Promise detection above already throws this type). + if (error instanceof InvalidExpressionError) throw error; throw new InvalidExpressionError( error instanceof Error ? error.message : String(error), expression, diff --git a/src/libswamp/workflows/evaluate.ts b/src/libswamp/workflows/evaluate.ts index 0132ff83..8093204f 100644 --- a/src/libswamp/workflows/evaluate.ts +++ b/src/libswamp/workflows/evaluate.ts @@ -47,6 +47,7 @@ import type { DatastorePathResolver } from "../../domain/datastore/datastore_pat import type { LibSwampContext } from "../context.ts"; import { notFound, type SwampError } from "../errors.ts"; import { InvalidExpressionError } from "../../domain/expressions/errors.ts"; +import { UserError } from "../../domain/errors.ts"; /** Evaluation result for a single workflow. */ export interface WorkflowEvaluateItemData { @@ -244,8 +245,11 @@ async function evaluateWorkflowInternal( try { items = deps.evaluateCel(inMatch[1], context); } catch (error) { - if (error instanceof InvalidExpressionError) { - throw new Error( + if ( + error instanceof InvalidExpressionError && + error.message.includes("unresolved Promise") + ) { + throw new UserError( `forEach.in expression '$\{{ ${inMatch[1]} }}' returned an ` + `unresolved Promise.\n\n` + `forEach.in is evaluated synchronously and cannot await ` +