From 356b83c2c3b556218478d40af7c2d1786ae3ef35 Mon Sep 17 00:00:00 2001 From: Elliott de Launay Date: Sat, 20 Jun 2026 20:57:15 +0000 Subject: [PATCH 1/2] feat(TaskSemaphore): light weight mutex wrapper --- src/utils/TaskSemaphore.ts | 40 ++++++ src/utils/__tests__/TaskSemaphore.spec.ts | 144 ++++++++++++++++++++++ 2 files changed, 184 insertions(+) create mode 100644 src/utils/TaskSemaphore.ts create mode 100644 src/utils/__tests__/TaskSemaphore.spec.ts diff --git a/src/utils/TaskSemaphore.ts b/src/utils/TaskSemaphore.ts new file mode 100644 index 0000000000..8f64be57fc --- /dev/null +++ b/src/utils/TaskSemaphore.ts @@ -0,0 +1,40 @@ +import { Semaphore } from "async-mutex" + +export class TaskSemaphore { + private sem: Semaphore + private _waiting = 0 + private _generation = 0 + + constructor(permits: number) { + this.sem = new Semaphore(permits) + } + + get available(): number { + return this.sem.getValue() + } + + get waiting(): number { + return this._waiting + } + + async acquire(): Promise<() => void> { + // Only count as waiting if the permit won't be granted immediately. + const willQueue = this.sem.isLocked() + const gen = this._generation + if (willQueue) this._waiting++ + try { + const [, release] = await this.sem.acquire() + if (willQueue && gen === this._generation) this._waiting-- + return release + } catch (e) { + if (willQueue && gen === this._generation) this._waiting-- + throw e + } + } + + cancel(): void { + this._waiting = 0 + this._generation++ + this.sem.cancel() + } +} diff --git a/src/utils/__tests__/TaskSemaphore.spec.ts b/src/utils/__tests__/TaskSemaphore.spec.ts new file mode 100644 index 0000000000..eee967e20f --- /dev/null +++ b/src/utils/__tests__/TaskSemaphore.spec.ts @@ -0,0 +1,144 @@ +import { TaskSemaphore } from "../TaskSemaphore" + +describe("TaskSemaphore", () => { + beforeEach(() => { + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + it("acquire() resolves immediately when permits are available", async () => { + const sem = new TaskSemaphore(2) + const release = await sem.acquire() + expect(sem.available).toBe(1) + expect(sem.waiting).toBe(0) + release() + }) + + it("second acquire() queues when no permits remain; resolves after release", async () => { + const sem = new TaskSemaphore(1) + const release1 = await sem.acquire() + expect(sem.available).toBe(0) + + let acquired = false + const p = sem.acquire().then((r) => { + acquired = true + return r + }) + + await Promise.resolve() + expect(sem.waiting).toBe(1) + expect(acquired).toBe(false) + + release1() + const release2 = await p + expect(acquired).toBe(true) + expect(sem.waiting).toBe(0) + release2() + }) + + it("release restores exactly one permit and unblocks one waiter", async () => { + const sem = new TaskSemaphore(1) + const release1 = await sem.acquire() + + const results: number[] = [] + const p1 = sem.acquire().then((r) => { + results.push(1) + return r + }) + const p2 = sem.acquire().then((r) => { + results.push(2) + return r + }) + + await Promise.resolve() + expect(sem.waiting).toBe(2) + + release1() + const r1 = await p1 + expect(results).toEqual([1]) + expect(sem.waiting).toBe(1) + + r1() + const r2 = await p2 + expect(results).toEqual([1, 2]) + expect(sem.waiting).toBe(0) + r2() + }) + + it("available and waiting return correct values at each step", async () => { + const sem = new TaskSemaphore(2) + expect(sem.available).toBe(2) + expect(sem.waiting).toBe(0) + + const r1 = await sem.acquire() + expect(sem.available).toBe(1) + expect(sem.waiting).toBe(0) + + const r2 = await sem.acquire() + expect(sem.available).toBe(0) + expect(sem.waiting).toBe(0) + + const p = sem.acquire() + await Promise.resolve() + expect(sem.waiting).toBe(1) + + r1() + await p.then((r) => r()) + expect(sem.available).toBe(1) + expect(sem.waiting).toBe(0) + + r2() + expect(sem.available).toBe(2) + }) + + it("cancel() rejects all queued waiters", async () => { + const sem = new TaskSemaphore(1) + const release = await sem.acquire() + + const errors: unknown[] = [] + const p1 = sem.acquire().catch((e) => errors.push(e)) + const p2 = sem.acquire().catch((e) => errors.push(e)) + + await Promise.resolve() + expect(sem.waiting).toBe(2) + + sem.cancel() + await Promise.all([p1, p2]) + + expect(errors).toHaveLength(2) + release() + }) + + it("waiting is 0 while an immediate acquire is in flight (permit available)", async () => { + const sem = new TaskSemaphore(2) + // Do NOT await — capture the promise before it settles. + const p = sem.acquire() + // Permit was available so nothing should be queued. + expect(sem.waiting).toBe(0) + const release = await p + expect(sem.waiting).toBe(0) + release() + }) + + it("cancel() resets waiting count to 0 synchronously", async () => { + const sem = new TaskSemaphore(1) + const release = await sem.acquire() + + const p1 = sem.acquire().catch(() => {}) + const p2 = sem.acquire().catch(() => {}) + + await Promise.resolve() + expect(sem.waiting).toBe(2) + + sem.cancel() + // Synchronous check — waiting must be 0 before any promise callbacks run. + expect(sem.waiting).toBe(0) + await Promise.all([p1, p2]) + + expect(sem.waiting).toBe(0) + release() + }) +}) From 33fa87ad200dd3ddc8101f0c134174ce40a79e1e Mon Sep 17 00:00:00 2001 From: Elliott de Launay Date: Mon, 22 Jun 2026 02:07:08 +0000 Subject: [PATCH 2/2] test(TaskSemaphore): increasing coverage --- src/utils/TaskSemaphore.ts | 25 +++++++++++++++++++++++ src/utils/__tests__/TaskSemaphore.spec.ts | 24 ++++++++++++++-------- 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/src/utils/TaskSemaphore.ts b/src/utils/TaskSemaphore.ts index 8f64be57fc..15db11c2e1 100644 --- a/src/utils/TaskSemaphore.ts +++ b/src/utils/TaskSemaphore.ts @@ -1,5 +1,24 @@ import { Semaphore } from "async-mutex" +/** + * A thin wrapper around `async-mutex`'s `Semaphore` that adds observable + * queue-depth (`waiting`) and safe bulk-cancellation (`cancel()`). + * + * **Why not use `Semaphore` directly?** + * `Semaphore` has no way to inspect how many callers are blocked waiting for a + * permit. `TaskSemaphore` tracks that count so callers can make scheduling + * decisions (e.g. "don't enqueue more work when the queue is already deep"). + * + * **`_waiting`** is incremented before `sem.acquire()` is awaited (only when + * the semaphore is already locked, i.e. the caller will actually block) and + * decremented once the permit is granted or the acquire is rejected. + * + * **`_generation`** is a monotonically-increasing counter bumped on every + * `cancel()` call. Each in-flight `acquire()` captures the generation at + * enqueue time; when the acquire settles it only adjusts `_waiting` if the + * generation hasn't changed, preventing stale decrements after a cancel has + * already reset the counter to 0. + */ export class TaskSemaphore { private sem: Semaphore private _waiting = 0 @@ -32,6 +51,12 @@ export class TaskSemaphore { } } + /** + * Rejects all queued waiters and resets the waiting count to 0. + * Does NOT release or alter any held permits — callers that already + * received a release function must still call it. + * The semaphore remains usable after cancellation. + */ cancel(): void { this._waiting = 0 this._generation++ diff --git a/src/utils/__tests__/TaskSemaphore.spec.ts b/src/utils/__tests__/TaskSemaphore.spec.ts index eee967e20f..8c8873b025 100644 --- a/src/utils/__tests__/TaskSemaphore.spec.ts +++ b/src/utils/__tests__/TaskSemaphore.spec.ts @@ -1,14 +1,6 @@ import { TaskSemaphore } from "../TaskSemaphore" describe("TaskSemaphore", () => { - beforeEach(() => { - vi.useFakeTimers() - }) - - afterEach(() => { - vi.useRealTimers() - }) - it("acquire() resolves immediately when permits are available", async () => { const sem = new TaskSemaphore(2) const release = await sem.acquire() @@ -141,4 +133,20 @@ describe("TaskSemaphore", () => { expect(sem.waiting).toBe(0) release() }) + + it("acquire() works after cancel() with permits still available", async () => { + const sem = new TaskSemaphore(1) + sem.cancel() // no waiters, no holders + const release = await sem.acquire() + expect(sem.available).toBe(0) + release() + expect(sem.available).toBe(1) + }) + + it("cancel() on an idle semaphore is a safe no-op", () => { + const sem = new TaskSemaphore(2) + expect(() => sem.cancel()).not.toThrow() + expect(sem.waiting).toBe(0) + expect(sem.available).toBe(2) + }) })