Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 13 additions & 17 deletions design/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,28 +177,24 @@ attributes:
### data.findBySpec(modelName, specName)

Returns all data records for a model that match a given output spec name.
Commonly used in `forEach` expressions to iterate over variable-length output.
Commonly used in `task.inputs` to pass variable-length output into a step.

**Workflow run scoping:** When called inside a workflow run, `findBySpec` only
returns data produced during the current run. This prevents stale data from
previous runs leaking into `forEach` iteration. Outside a workflow context, it
returns all data globally. The same run-scoping applies to
`context.readModelData()` and `context.queryData()` in extension model methods
(see `design/data-query.md`).
previous runs leaking into iteration. Outside a workflow context, it returns all
data globally. The same run-scoping applies to `context.readModelData()` and
`context.queryData()` in extension model methods (see `design/data-query.md`).

**Async limitation:** `data.findBySpec()` is async and cannot be used directly in
`forEach.in` (which is evaluated synchronously). To iterate over `findBySpec`
results, resolve the call in a parent workflow's `task.inputs` and pass the array
to a child workflow. See
`.claude/skills/swamp-workflow/references/nested-workflows.md`.

```yaml
# In a forEach step — only sees data from the current workflow run:
steps:
- name: dl-${{ self.ep.attributes.title }}
forEach:
item: ep
in: ${{ data.findBySpec("dedup-model", "episode") }}
task:
type: model_method
modelIdOrName: downloader
methodName: download
inputs:
uri: ${{ self.ep.attributes.url }}
# In task.inputs — resolves the async call before passing to the step:
inputs:
episodes: ${{ data.findBySpec("dedup-model", "episode") }}
```

### data.findByTag(tagKey, tagValue)
Expand Down
25 changes: 24 additions & 1 deletion src/domain/workflows/execution_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ import {
} from "../expressions/model_resolver.ts";
import { CelEvaluator } from "../../infrastructure/cel/cel_evaluator.ts";
import { UserError } from "../errors.ts";
import { InvalidExpressionError } from "../expressions/errors.ts";
import {
getRunLogger,
runFileSink,
Expand Down Expand Up @@ -1550,7 +1551,29 @@ export class WorkflowExecutionService {
}

const celExpr = match[1];
const items = celEvaluator.evaluate(celExpr, context);
let items: unknown;
try {
items = celEvaluator.evaluate(celExpr, context);
} catch (error) {
if (
error instanceof InvalidExpressionError &&
error.message.includes("unresolved Promise")
) {
throw new UserError(
`forEach.in expression '$\{{ ${celExpr} }}' returned an ` +
`unresolved Promise.\n\n` +
`forEach.in is evaluated synchronously and cannot await ` +
`async CEL functions (data.latest, data.findByTag, ` +
`data.findBySpec, data.query).\n\n` +
`Fix: move the async call into a parent workflow's ` +
`task.inputs (which IS awaited) and have the child ` +
`iterate over inputs.<name>. See:\n` +
`.claude/skills/swamp-workflow/references/` +
`nested-workflows.md#when-to-use-nested-workflows`,
);
}
throw error;
}

// Handle both arrays and objects
const expandedSteps: ExpandedStep[] = [];
Expand Down
76 changes: 75 additions & 1 deletion src/domain/workflows/execution_service_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
// You should have received a copy of the GNU Affero General Public License
// along with Swamp. If not, see <https://www.gnu.org/licenses/>.

import { assertEquals, assertNotEquals, assertRejects } from "@std/assert";
import {
assertEquals,
assertNotEquals,
assertRejects,
assertStringIncludes,
} from "@std/assert";
import { join } from "@std/path";
import {
coerceToSuffix,
Expand Down Expand Up @@ -2683,3 +2688,72 @@ Deno.test("DefaultStepExecutor wires dataQueryService into MethodContext", async
}
});
});

// --- forEach.in async Promise detection (Issue #88) ---

Deno.test({
  name:
    "expandForEachSteps: throws UserError when forEach.in uses async data function",
  // Sanitizers are switched off because the failure path cuts short the
  // normal cleanup of files opened internally (e.g. the CatalogStore WAL).
  sanitizeResources: false,
  sanitizeOps: false,
  fn: async () => {
    await withTempDir(async (tempDir) => {
      const workflows = new InMemoryWorkflowRepository();
      const runs = new InMemoryWorkflowRunRepository();
      const stepExecutor = new MockStepExecutor();

      // A single forEach step whose `in` expression calls an async data
      // function directly.
      const forEachStep = Step.create({
        name: "process-${{ self.ep.name }}",
        task: StepTask.model("test-model", "run"),
        forEach: {
          item: "ep",
          in: '${{ data.findBySpec("producer", "result") }}',
        },
      });
      const downloadJob = Job.create({
        name: "download",
        steps: [forEachStep],
      });
      const workflow = Workflow.create({
        name: "async-foreach",
        jobs: [downloadJob],
      });
      await workflows.save(workflow);

      const catalog = new CatalogStore(join(tempDir, "_catalog.db"));
      try {
        const service = new WorkflowExecutionService(
          workflows,
          runs,
          tempDir,
          stepExecutor,
          undefined,
          catalog,
        );

        // findBySpec is wired up as an async delegate (it returns a Promise),
        // while forEach expansion evaluates synchronously. The evaluator's
        // Promise guard raises InvalidExpressionError, which the forEach
        // catch-and-wrap is expected to convert into a UserError.
        const rejection = await assertRejects(
          () => service.execute(workflow.name),
        );

        const { message } = rejection as Error;
        for (const expected of ["unresolved Promise", "nested-workflows.md"]) {
          assertStringIncludes(message, expected);
        }
      } finally {
        // Always release the catalog handle, even when an assertion fails.
        catalog.close();
      }
    });
  },
});
29 changes: 29 additions & 0 deletions src/infrastructure/cel/cel_evaluator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,37 @@ export class CelEvaluator {
const wrappedContext = this.wrapNamespaces(context);

const result = this.env.evaluate(transformedExpr, wrappedContext);

// Detect unresolved Promises from async CEL functions (data.latest,
// data.findByTag, data.findBySpec, data.query). These functions are
// async but the sync evaluate path cannot await them, so the raw
// Promise leaks through. Catch it here — before coerceBigInts
// silently converts the Promise to {} — and throw a clear error.
if (result instanceof Promise) {
// Suppress the dangling rejection — the async operation is still
// in flight but we are about to throw synchronously. Without
// this, the eventual rejection surfaces as an unhandled promise
// rejection in the runtime.
result.catch(() => {});

throw new InvalidExpressionError(
"Expression returned an unresolved Promise. " +
"Async CEL functions (data.latest, data.findByTag, " +
"data.findBySpec, data.query) cannot be used in " +
"synchronously-evaluated contexts like forEach.in. " +
"Move the async call into a parent workflow's task.inputs " +
"(which IS awaited) and pass the resolved value to a child " +
"workflow. See: .claude/skills/swamp-workflow/references/" +
"nested-workflows.md#when-to-use-nested-workflows",
expression,
);
}

return coerceBigInts(result);
} catch (error) {
// Re-throw InvalidExpressionError directly to avoid double-wrapping
// (the Promise detection above already throws this type).
if (error instanceof InvalidExpressionError) throw error;
throw new InvalidExpressionError(
error instanceof Error ? error.message : String(error),
expression,
Expand Down
69 changes: 68 additions & 1 deletion src/infrastructure/cel/cel_evaluator_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with Swamp. If not, see <https://www.gnu.org/licenses/>.

import { assertEquals, assertThrows } from "@std/assert";
import { assertEquals, assertStringIncludes, assertThrows } from "@std/assert";
import { CelEvaluator } from "./cel_evaluator.ts";
import { InvalidExpressionError } from "../../domain/expressions/errors.ts";
import { transformHyphenatedModelRefs } from "../../domain/expressions/expression_parser.ts";
Expand Down Expand Up @@ -924,3 +924,70 @@ Deno.test("CelEvaluator: directly-missing input throws InvalidExpressionError",
InvalidExpressionError,
);
});

// --- Promise detection in sync evaluate (Issue #88) ---

Deno.test("evaluate: throws InvalidExpressionError when data.latest returns a Promise", () => {
  // Stand-in for an async delegate: it hands back a Promise instead of a value.
  const asyncLatest = (_m: string, _d: string) =>
    Promise.resolve({ attributes: { episodes: [] } });
  const ctx = { data: { latest: asyncLatest } };
  const celEvaluator = new CelEvaluator();

  const thrown = assertThrows(
    () => celEvaluator.evaluate('data.latest("dedup", "current")', ctx),
    InvalidExpressionError,
  );

  // The message must name the problem and point at the reference document.
  for (const expected of ["unresolved Promise", "nested-workflows.md"]) {
    assertStringIncludes(thrown.message, expected);
  }
});

Deno.test("evaluate: throws InvalidExpressionError when data.findBySpec returns a Promise", () => {
  // Stand-in for an async delegate: resolves to an empty list.
  const asyncFindBySpec = (_m: string, _s: string) => Promise.resolve([]);
  const ctx = { data: { findBySpec: asyncFindBySpec } };
  const celEvaluator = new CelEvaluator();

  const thrown = assertThrows(
    () => celEvaluator.evaluate('data.findBySpec("model", "spec")', ctx),
    InvalidExpressionError,
  );

  assertStringIncludes(thrown.message, "unresolved Promise");
});

Deno.test("evaluate: throws InvalidExpressionError when data.findByTag returns a Promise", () => {
  // Stand-in for an async delegate: resolves to an empty list.
  const asyncFindByTag = (_k: string, _v: string) => Promise.resolve([]);
  const ctx = { data: { findByTag: asyncFindByTag } };
  const celEvaluator = new CelEvaluator();

  const thrown = assertThrows(
    () => celEvaluator.evaluate('data.findByTag("key", "value")', ctx),
    InvalidExpressionError,
  );

  assertStringIncludes(thrown.message, "unresolved Promise");
});

Deno.test("evaluate: sync data functions that return arrays still work", () => {
  // A synchronous delegate returns its array directly, so the Promise guard
  // must not fire.
  const syncFindBySpec = (_m: string, _s: string) => [
    { name: "a", attributes: { x: 1 } },
  ];
  const ctx = { data: { findBySpec: syncFindBySpec } };
  const celEvaluator = new CelEvaluator();

  const evaluated = celEvaluator.evaluate(
    'data.findBySpec("model", "spec")',
    ctx,
  );

  const isArray = Array.isArray(evaluated);
  assertEquals(isArray, true);
  const itemCount = (evaluated as unknown[]).length;
  assertEquals(itemCount, 1);
});
26 changes: 25 additions & 1 deletion src/libswamp/workflows/evaluate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ import { DataQueryService } from "../../domain/data/data_query_service.ts";
import type { DatastorePathResolver } from "../../domain/datastore/datastore_path_resolver.ts";
import type { LibSwampContext } from "../context.ts";
import { notFound, type SwampError } from "../errors.ts";
import { InvalidExpressionError } from "../../domain/expressions/errors.ts";
import { UserError } from "../../domain/errors.ts";

/** Evaluation result for a single workflow. */
export interface WorkflowEvaluateItemData {
Expand Down Expand Up @@ -239,7 +241,29 @@ async function evaluateWorkflowInternal(
continue;
}

const items = deps.evaluateCel(inMatch[1], context);
let items: unknown;
try {
items = deps.evaluateCel(inMatch[1], context);
} catch (error) {
if (
error instanceof InvalidExpressionError &&
error.message.includes("unresolved Promise")
) {
throw new UserError(
`forEach.in expression '$\{{ ${inMatch[1]} }}' returned an ` +
`unresolved Promise.\n\n` +
`forEach.in is evaluated synchronously and cannot await ` +
`async CEL functions (data.latest, data.findByTag, ` +
`data.findBySpec, data.query).\n\n` +
`Fix: move the async call into a parent workflow's ` +
`task.inputs (which IS awaited) and have the child ` +
`iterate over inputs.<name>. See:\n` +
`.claude/skills/swamp-workflow/references/` +
`nested-workflows.md#when-to-use-nested-workflows`,
);
}
throw error;
}
const itemName = stepData.forEach.item;
const nameHasExpression = /\$\{\{.+?\}\}/.test(stepData.name);

Expand Down
Loading