From 0c0699df3d128d40edd94e52547e32e76fbc477d Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 8 Apr 2026 17:22:24 +0200 Subject: [PATCH 1/2] Playing with map returning Promise --- packages/libs/restate-sdk/src/context.ts | 2 +- packages/libs/restate-sdk/src/promises.ts | 49 ++++++---- .../src/promise_combinators.ts | 90 +++++++++++++++++++ .../test/promise_combinators.test.ts | 50 +++++++++++ 4 files changed, 173 insertions(+), 18 deletions(-) diff --git a/packages/libs/restate-sdk/src/context.ts b/packages/libs/restate-sdk/src/context.ts index 4919c75a..6eca65de 100644 --- a/packages/libs/restate-sdk/src/context.ts +++ b/packages/libs/restate-sdk/src/context.ts @@ -691,7 +691,7 @@ export type RestatePromise = Promise & { * If this mapper returns a value, this value will be used to resolve the returned {@link RestatePromise}. * If the mapper throws a {@link TerminalError}, this error will be used to reject the returned {@link RestatePromise}. */ - map(mapper: (value?: T, failure?: TerminalError) => U): RestatePromise; + map(mapper: (value?: T, failure?: TerminalError) => U | Promise): RestatePromise; }; /** diff --git a/packages/libs/restate-sdk/src/promises.ts b/packages/libs/restate-sdk/src/promises.ts index d5ac5ff8..2a9f940a 100644 --- a/packages/libs/restate-sdk/src/promises.ts +++ b/packages/libs/restate-sdk/src/promises.ts @@ -73,7 +73,7 @@ export abstract class InternalRestatePromise implements RestatePromise { abstract finally(onfinally: (() => void) | undefined | null): Promise; abstract map( - mapper: (value?: T, failure?: TerminalError) => U + mapper: (value?: T, failure?: TerminalError) => U | Promise ): RestatePromise; abstract orTimeout(millis: Duration | number): RestatePromise; @@ -170,7 +170,7 @@ abstract class BaseRestatePromise extends InternalRestatePromise { ) as RestatePromise; } - map(mapper: (value?: T, failure?: TerminalError) => U): RestatePromise { + map(mapper: (value?: T, failure?: TerminalError) => U | Promise): RestatePromise { return new MappedRestatePromise(this[RESTATE_CTX_SYMBOL], this, mapper); } @@ -338,15 +338,15 @@ export class MappedRestatePromise extends BaseRestatePromise { constructor( ctx: ContextImpl, readonly inner: InternalRestatePromise, - mapper: (value?: T, failure?: TerminalError) => U + mapper: (value?: T, failure?: TerminalError) => U | Promise ) { super(ctx); - this.publicPromiseMapper = (value?: T, failure?: TerminalError) => { + this.publicPromiseMapper = async (value?: T, failure?: TerminalError) => { try { - return Promise.resolve(mapper(value, failure)); + return await mapper(value, failure); } catch (e) { if (e instanceof TerminalError) { - return Promise.reject(e); + throw e; } else { ctx.abortAttempt(e); return pendingPromise(); @@ -382,30 +382,40 @@ export class MappedRestatePromise extends BaseRestatePromise { } export class ConstRestatePromise extends InternalRestatePromise { + private _constPromise?: Promise; + private constructor( - private readonly constPromise: Promise, + // Factory for the underlying promise. Called at most once, memoized in + // `_constPromise`. This lets `map` be lazy: the mapper is only invoked + // when someone actually awaits the result (via then/catch/finally/publicPromise), + // matching the contract documented on RestatePromise.map. + private readonly promiseFactory: () => Promise, private readonly settled: boolean ) { super(); } + private get constPromise(): Promise { + return (this._constPromise ??= this.promiseFactory()); + } + static resolve(value: T): ConstRestatePromise> { - return new ConstRestatePromise(Promise.resolve(value), true); + return new ConstRestatePromise(() => Promise.resolve(value), true); } static reject(reason: TerminalError): ConstRestatePromise { - return new ConstRestatePromise(Promise.reject(reason), true); + return new ConstRestatePromise(() => Promise.reject(reason), true); } static pending(): ConstRestatePromise { - return new ConstRestatePromise(pendingPromise(), false); + return new ConstRestatePromise(() => pendingPromise(), false); } static fromPromise( promise: Promise, settled: boolean ): ConstRestatePromise { - return new ConstRestatePromise(promise, settled); + return new ConstRestatePromise(() => promise, settled); } // --- Promise methods @@ -434,12 +444,17 @@ export class ConstRestatePromise extends InternalRestatePromise { return ConstRestatePromise.reject(new TimeoutError()); } - map(mapper: (value?: T, failure?: TerminalError) => U): RestatePromise { - return ConstRestatePromise.fromPromise( - this.constPromise.then( - (value) => mapper(value, undefined), - (reason) => mapper(undefined, reason as TerminalError) - ), + map( + mapper: (value?: T, failure?: TerminalError) => U | Promise + ): RestatePromise { + if (!this.settled) return this as unknown as RestatePromise; + const selfConstPromise = this.constPromise; + return new ConstRestatePromise( + () => + selfConstPromise.then( + (value) => mapper(value, undefined), + (reason) => mapper(undefined, reason as TerminalError) + ), this.settled ); } diff --git a/packages/tests/restate-e2e-services/src/promise_combinators.ts b/packages/tests/restate-e2e-services/src/promise_combinators.ts index f0477264..277df7c6 100644 --- a/packages/tests/restate-e2e-services/src/promise_combinators.ts +++ b/packages/tests/restate-e2e-services/src/promise_combinators.ts @@ -9,6 +9,7 @@ import * as restate from "@restatedev/restate-sdk"; import { REGISTRY } from "./services.js"; +import {setTimeout} from "node:timers/promises" const promiseCombinators = restate.service({ name: "PromiseCombinators", @@ -158,6 +159,95 @@ const promiseCombinators = restate.service({ return "unexpected"; }); }, + + // --- Async map on ConstRestatePromise --- + + resolveAsyncMap: async ( + _ctx: restate.Context, + value: string + ): Promise => { + // async mapper on a resolved const promise + return RestatePromise.resolve(value).map(async (v) => { + return `mapped:${v ?? ""}`; + }); + }, + + rejectAsyncMapRecover: async ( + _ctx: restate.Context, + message: string + ): Promise => { + // async mapper recovers from a rejected const promise + return RestatePromise.reject( + new restate.TerminalError(message) + ).map(async (_v, err) => { + return `recovered:${err?.message ?? ""}`; + }); + }, + + resolveAsyncMapChained: async ( + _ctx: restate.Context, + value: string + ): Promise => { + // chained async maps on a resolved const promise + return RestatePromise.resolve(value) + .map(async (v) => `${v ?? ""}-a`) + .map(async (v) => `${v ?? ""}-b`) + .map(async (v) => `${v ?? ""}-c`); + }, + + resolveAsyncMapWithCtxRun: async ( + ctx: restate.Context, + value: string + ): Promise => { + // async mapper that performs a ctx.run inside — verifies determinism: + // the ctx.run must be journaled exactly once across replays even though + // the mapper is a microtask-deferred async closure. + return RestatePromise.resolve(value).map(async (v) => { + const suffix = await ctx.run("append", () => "ran"); + return `${v ?? ""}-${suffix}`; + }); + }, + + resolveAsyncMapThrows: async ( + _ctx: restate.Context, + input: { value: string; errorMessage: string } + ): Promise => { + // async mapper throws TerminalError — must propagate as rejection + return RestatePromise.resolve(input.value).map(async () => { + throw new restate.TerminalError(input.errorMessage); + }); + }, + + resolveAsyncMapOrTimeout: async ( + _ctx: restate.Context, + value: string + ): Promise => { + // resolve().map(async).orTimeout() — mapped promise inherits settled=true, + // so orTimeout returns `this` and the async mapper still runs to completion. + return RestatePromise.resolve(value) + .map(async (v) => `mapped:${v ?? ""}`) + .orTimeout(1); + }, + + allSettledAsyncMapWithCtxRun: async ( + ctx: restate.Context, + values: string[] + ): Promise => { + // Build N const RestatePromises, each with an async mapper that calls ctx.run, + // then await them together via RestatePromise.allSettled. + // Verifies: (a) mappers fire lazily (only when allSettled consumes them), + // (b) each ctx.run is journaled deterministically, (c) results come back in order. + const promises = values.map((v, i) => + RestatePromise.resolve(v).map(async (inner) => { + const suffix = await ctx.run(`run-${i}`, async () => { + await setTimeout(Math.random() *1000) + return `ran-${i}`; + }); + return `${inner ?? ""}:${suffix}`; + }) + ); + return RestatePromise.all(promises); + }, }, }); diff --git a/packages/tests/restate-e2e-services/test/promise_combinators.test.ts b/packages/tests/restate-e2e-services/test/promise_combinators.test.ts index 14a2d6b1..55310da4 100644 --- a/packages/tests/restate-e2e-services/test/promise_combinators.test.ts +++ b/packages/tests/restate-e2e-services/test/promise_combinators.test.ts @@ -130,4 +130,54 @@ describe("PromiseCombinators", () => { const result = await client.raceEmptyOrTimeoutMapped(); expect(result).toBe("timeout"); }); + + // --- Async map on ConstRestatePromise --- + + it("resolve().map(async) returns the mapped value", async () => { + const result = await client.resolveAsyncMap("hello"); + expect(result).toBe("mapped:hello"); + }); + + it("reject().map(async) can recover from rejection", async () => { + const result = await client.rejectAsyncMapRecover("boom"); + expect(result).toBe("recovered:boom"); + }); + + it("resolve().map(async).map(async).map(async) chains correctly", async () => { + const result = await client.resolveAsyncMapChained("start"); + expect(result).toBe("start-a-b-c"); + }); + + it("resolve().map(async) can perform ctx.run inside the mapper", async () => { + const result = await client.resolveAsyncMapWithCtxRun("val"); + expect(result).toBe("val-ran"); + }); + + it("resolve().map(async) propagates TerminalError thrown in mapper", async () => { + await expect( + client.resolveAsyncMapThrows({ value: "x", errorMessage: "mapper fail" }) + ).rejects.toThrow("mapper fail"); + }); + + it("resolve().map(async).orTimeout() returns the mapped value", async () => { + const result = await client.resolveAsyncMapOrTimeout("hello"); + expect(result).toBe("mapped:hello"); + }); + + it("allSettled over many resolve().map(async ctx.run) preserves order and journals each run", async () => { + const result = await client.allSettledAsyncMapWithCtxRun([ + "a", + "b", + "c", + "d", + "e", + ]); + expect(result).toEqual([ + "a:ran-0", + "b:ran-1", + "c:ran-2", + "d:ran-3", + "e:ran-4", + ]); + }); }); From 09d1fb8216dfdf0e17af39dde122c0351c2c0d1e Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 13 Apr 2026 12:38:49 +0200 Subject: [PATCH 2/2] stuff --- packages/libs/restate-sdk/src/context.ts | 4 +++- packages/libs/restate-sdk/src/promises.ts | 4 +++- .../tests/restate-e2e-services/src/promise_combinators.ts | 6 +++--- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/packages/libs/restate-sdk/src/context.ts b/packages/libs/restate-sdk/src/context.ts index 6eca65de..0e2d1110 100644 --- a/packages/libs/restate-sdk/src/context.ts +++ b/packages/libs/restate-sdk/src/context.ts @@ -691,7 +691,9 @@ export type RestatePromise = Promise & { * If this mapper returns a value, this value will be used to resolve the returned {@link RestatePromise}. * If the mapper throws a {@link TerminalError}, this error will be used to reject the returned {@link RestatePromise}. */ - map(mapper: (value?: T, failure?: TerminalError) => U | Promise): RestatePromise; + map( + mapper: (value?: T, failure?: TerminalError) => U | Promise + ): RestatePromise; }; /** diff --git a/packages/libs/restate-sdk/src/promises.ts b/packages/libs/restate-sdk/src/promises.ts index 2a9f940a..f42dc210 100644 --- a/packages/libs/restate-sdk/src/promises.ts +++ b/packages/libs/restate-sdk/src/promises.ts @@ -170,7 +170,9 @@ abstract class BaseRestatePromise extends InternalRestatePromise { ) as RestatePromise; } - map(mapper: (value?: T, failure?: TerminalError) => U | Promise): RestatePromise { + map( + mapper: (value?: T, failure?: TerminalError) => U | Promise + ): RestatePromise { return new MappedRestatePromise(this[RESTATE_CTX_SYMBOL], this, mapper); } diff --git a/packages/tests/restate-e2e-services/src/promise_combinators.ts b/packages/tests/restate-e2e-services/src/promise_combinators.ts index 277df7c6..ebb1b4a7 100644 --- a/packages/tests/restate-e2e-services/src/promise_combinators.ts +++ b/packages/tests/restate-e2e-services/src/promise_combinators.ts @@ -9,7 +9,7 @@ import * as restate from "@restatedev/restate-sdk"; import { REGISTRY } from "./services.js"; -import {setTimeout} from "node:timers/promises" +import { setTimeout } from "node:timers/promises"; const promiseCombinators = restate.service({ name: "PromiseCombinators", @@ -240,8 +240,8 @@ const promiseCombinators = restate.service({ const promises = values.map((v, i) => RestatePromise.resolve(v).map(async (inner) => { const suffix = await ctx.run(`run-${i}`, async () => { - await setTimeout(Math.random() *1000) - return `ran-${i}`; + await setTimeout(Math.random() * 1000); + return `ran-${i}`; }); return `${inner ?? ""}:${suffix}`; })