From 87762d5fa03eb79f7f73eeebc5b1b8522bd19872 Mon Sep 17 00:00:00 2001 From: jbiskur Date: Mon, 11 May 2026 00:28:16 +0100 Subject: [PATCH] fix(notifier): resolve waitWebSocket promise on WS error to prevent pull-loop wedge MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously the RxJS subject's error handler only logged on subject.error(), leaving the pull-loop awaiting the promise until the 20s timeout. With a persistent WS disconnect every cycle went through the timeout-only path with no events flowing — observed as a per-source pump wedge in data-pathways production on 2026-05-10. Three fixes: (1) error handler resolves the promise so loop re-enters wait() within milliseconds; (2) promise + eventResolver bound before connect() so a synchronous error during connect doesn't race against an unbound resolver; (3) try/finally guarantees disconnect() runs on every exit path, fixing a socket leak. Adds _internals.createNotificationClient test seam so the WS subject lifecycle can be driven deterministically in unit tests. Refs: data-pathways outage bddefdd2-c377-472a-bc50-cd75f708f822 Co-Authored-By: Claude Opus 4.7 (1M context) --- deno.lock | 1 + src/data-pump/notifier.ts | 47 ++-- test/tests/notifier.test.ts | 434 ++++++++++++++++++++++++++++++++++++ 3 files changed, 469 insertions(+), 13 deletions(-) create mode 100644 test/tests/notifier.test.ts diff --git a/deno.lock b/deno.lock index 7438127..272b298 100644 --- a/deno.lock +++ b/deno.lock @@ -5,6 +5,7 @@ "jsr:@deno/cache-dir@~0.10.3": "0.10.3", "jsr:@deno/dnt@~0.41.3": "0.41.3", "jsr:@deno/graph@~0.73.1": "0.73.1", + "jsr:@flowcore/sdk@*": "1.78.0", "jsr:@flowcore/sdk@^1.78.0": "1.78.0", "jsr:@flowcore/time-uuid@~0.1.1": "0.1.2", "jsr:@std/assert@0.223": "0.223.0", diff --git a/src/data-pump/notifier.ts b/src/data-pump/notifier.ts index 87427ea..c24f0e1 100644 --- a/src/data-pump/notifier.ts +++ b/src/data-pump/notifier.ts @@ -106,13 +106,21 @@ export class FlowcoreNotifier { private async waitWebSocket(signal?: AbortSignal) { this.options.logger?.debug("Waiting for web socket") + + const promise = new Promise((resolve) => { + this.eventResolver = resolve + }) + this.subject = new Subject() this.subject.subscribe({ next: this.onWebSocketEvent.bind(this), - error: (error: Error) => this.options.logger?.error("Notification stream error:", { error }), + error: (error: Error) => { + this.options.logger?.error("Notification stream error:", { error }) + this.eventResolver?.() + }, }) - this.notificationClient = new NotificationClient( + this.notificationClient = _internals.createNotificationClient( this.subject, this.webSocketAuth(), { @@ -126,17 +134,17 @@ export class FlowcoreNotifier { }, ) - await this.notificationClient.connect() - - const promise = new Promise((resolve) => { - this.eventResolver = resolve - }) - clearTimeout(this.timer) - this.timer = setTimeout(() => this.eventResolver?.(), this.options.timeoutMs ?? DEFAULT_TIMEOUT_MS) - signal?.addEventListener("abort", () => this.eventResolver?.()) - await promise - this.eventResolver = undefined - this.notificationClient.disconnect() + try { + await this.notificationClient.connect() + clearTimeout(this.timer) + this.timer = setTimeout(() => this.eventResolver?.(), this.options.timeoutMs ?? DEFAULT_TIMEOUT_MS) + signal?.addEventListener("abort", () => this.eventResolver?.()) + await promise + } finally { + clearTimeout(this.timer) + this.eventResolver = undefined + this.notificationClient.disconnect() + } } private webSocketAuth() { @@ -157,3 +165,16 @@ export class FlowcoreNotifier { } } } + +/** + * Test seam for stubbing the NotificationClient constructor. Production code + * routes through `_internals.createNotificationClient`; tests replace this + * property with a fake factory to drive the subject manually. Not part of the + * public API. + */ +// deno-lint-ignore no-explicit-any +type NotificationClientFactory = (...args: any[]) => NotificationClient +export const _internals: { createNotificationClient: NotificationClientFactory } = { + createNotificationClient: (...args) => + new (NotificationClient as new (...a: unknown[]) => NotificationClient)(...args), +} diff --git a/test/tests/notifier.test.ts b/test/tests/notifier.test.ts new file mode 100644 index 0000000..ec0d2f2 --- /dev/null +++ b/test/tests/notifier.test.ts @@ -0,0 +1,434 @@ +import { assert, assertEquals, assertNotStrictEquals, assertStrictEquals } from "@std/assert" +import { afterEach, describe, it } from "@std/testing/bdd" +import { stub } from "@std/testing/mock" +import type { NotificationClient, NotificationEvent } from "@flowcore/sdk" +import type { Subject } from "rxjs" +import { _internals, FlowcoreNotifier } from "../../src/data-pump/notifier.ts" + +// #region Test Helpers + +const FAKE_API_KEY = "fc_testid_testsecret" + +interface FakeClientHandle { + client: NotificationClient + subject: Subject + connectCalls: number + disconnectCalls: number + // Triggers fired automatically as part of connect(). The factory runs each + // entry after the connect() promise has resolved but before the awaited + // tick — letting tests drive WS-error / next-event scenarios deterministically. + pendingPostConnect: Array<(handle: FakeClientHandle) => void> + // If set, connect() throws synchronously instead of resolving. + connectError?: Error + // If true, subject.error fires BEFORE connect() resolves (synchronous race). + errorBeforeConnect?: Error +} + +type FakeFactoryConfig = { + onCreate: (handle: FakeClientHandle) => void +} + +function createFakeFactory(config: FakeFactoryConfig) { + const handles: FakeClientHandle[] = [] + const factory = ( + observer: Subject, + _auth: unknown, + _spec: unknown, + _opts: unknown, + ): NotificationClient => { + const handle: FakeClientHandle = { + client: null as unknown as NotificationClient, + subject: observer, + connectCalls: 0, + disconnectCalls: 0, + pendingPostConnect: [], + } + config.onCreate(handle) + const client = { + async connect() { + handle.connectCalls++ + if (handle.errorBeforeConnect) { + handle.subject.error(handle.errorBeforeConnect) + } + if (handle.connectError) { + throw handle.connectError + } + // Use a microtask so callers can observe `await connect()` first. + await Promise.resolve() + for (const fn of handle.pendingPostConnect) { + fn(handle) + } + }, + disconnect() { + handle.disconnectCalls++ + }, + } as unknown as NotificationClient + handle.client = client + handles.push(handle) + return client + } + return { factory, handles } +} + +function createNotifier(timeoutMs?: number): FlowcoreNotifier { + return new FlowcoreNotifier({ + auth: { apiKey: FAKE_API_KEY, apiKeyId: "testid" }, + dataSource: { + tenant: "test-tenant", + dataCore: "test-dc", + flowType: "test.0", + eventTypes: ["test.created.0"], + }, + noTranslation: true, + timeoutMs, + logger: { + debug: () => {}, + info: () => {}, + warn: () => {}, + error: () => {}, + }, + }) +} + +// #endregion + +describe("FlowcoreNotifier.waitWebSocket — bug fix (v0.20.1)", () => { + // deno-lint-ignore no-explicit-any + let factoryStub: any + + afterEach(() => { + factoryStub?.restore() + factoryStub = undefined + }) + + it("case 1 — WS error during wait() resolves the promise within 50 ms", async () => { + const handles: FakeClientHandle[] = [] + const { factory } = createFakeFactory({ + onCreate: (h) => { + h.pendingPostConnect.push((handle) => { + handle.subject.error(new Error("ws closed")) + }) + handles.push(h) + }, + }) + factoryStub = stub(_internals, "createNotificationClient", factory) + + const notifier = createNotifier(20_000) + // Safety: if the bug is present, wait() will hang 20 s. We assert resolution + // happens before a 250 ms deadline AND record the actual elapsed. + let timedOut = true + const safety = setTimeout(() => { + // Force-resolve the resolver by firing abort path through a manual signal, + // so the test fails on assertion (rather than hanging the runner). + const r = (notifier as unknown as { eventResolver?: () => void }).eventResolver + r?.() + }, 250) + + const start = Date.now() + await notifier.wait() + const elapsed = Date.now() - start + clearTimeout(safety) + timedOut = false + + assertEquals(timedOut, false) + assert(elapsed < 50, `expected elapsed < 50 ms, got ${elapsed}`) + assertEquals(handles.length, 1) + assertEquals(handles[0].disconnectCalls, 1, "disconnect must run via try/finally") + }) + + it("case 2 — subject is recreated on each wait() call", async () => { + const subjects: Array> = [] + const { factory } = createFakeFactory({ + onCreate: (h) => { + subjects.push(h.subject) + h.pendingPostConnect.push((handle) => handle.subject.error(new Error("close"))) + }, + }) + factoryStub = stub(_internals, "createNotificationClient", factory) + + const notifier = createNotifier(20_000) + await notifier.wait() + await notifier.wait() + + assertEquals(subjects.length, 2) + assertNotStrictEquals(subjects[0], subjects[1]) + }) + + it("case 3 — after a WS-error cycle, the next wait() delivers events normally", async () => { + let cycle = 0 + let secondEventCount = 0 + const onNextSpy = (h: FakeClientHandle) => { + // Increment whenever the SUBSCRIBER's next() fires on the matching event. + h.subject.subscribe({ + next: (ev: NotificationEvent) => { + if (ev.data.eventType === "test.created.0") secondEventCount++ + }, + }) + } + + const { factory } = createFakeFactory({ + onCreate: (h) => { + cycle++ + if (cycle === 1) { + h.pendingPostConnect.push((handle) => handle.subject.error(new Error("flap"))) + } else { + onNextSpy(h) + h.pendingPostConnect.push((handle) => { + handle.subject.next({ + pattern: "stored.event.notify.0", + data: { + tenant: "test-tenant", + eventId: "event-1", + dataCoreId: "test-dc", + flowType: "test.0", + eventType: "test.created.0", + validTime: new Date().toISOString(), + }, + }) + }) + } + }, + }) + factoryStub = stub(_internals, "createNotificationClient", factory) + + const notifier = createNotifier(20_000) + await notifier.wait() // cycle 1 — WS error path + const start = Date.now() + await notifier.wait() // cycle 2 — happy event path + const elapsed = Date.now() - start + + assertEquals(cycle, 2, "factory invoked twice") + assert(elapsed < 100, `cycle 2 should resolve quickly via next(), got ${elapsed} ms`) + assertEquals(secondEventCount, 1, "exactly one matching event observed on the second cycle") + }) + + it("case 4 — timeout-only path resolves around the configured boundary", async () => { + const handles: FakeClientHandle[] = [] + const { factory } = createFakeFactory({ + onCreate: (h) => handles.push(h), + }) + factoryStub = stub(_internals, "createNotificationClient", factory) + + const notifier = createNotifier(30) + + const start = Date.now() + await notifier.wait() + const elapsed = Date.now() - start + + assert(elapsed >= 25, `expected elapsed >= 25 ms, got ${elapsed}`) + assert(elapsed <= 200, `expected elapsed <= 200 ms (jitter window), got ${elapsed}`) + assertEquals(handles[0].disconnectCalls, 1) + }) + + it("case 5 — AbortSignal aborts the wait promptly", async () => { + const handles: FakeClientHandle[] = [] + const { factory } = createFakeFactory({ + onCreate: (h) => handles.push(h), + }) + factoryStub = stub(_internals, "createNotificationClient", factory) + + const notifier = createNotifier(20_000) + const controller = new AbortController() + setTimeout(() => controller.abort(), 10) + + const start = Date.now() + await notifier.wait(controller.signal) + const elapsed = Date.now() - start + + assert(elapsed < 100, `expected abort to resolve within 100 ms, got ${elapsed}`) + assertEquals(handles[0].disconnectCalls, 1) + }) + + it("case 6 — disconnect() runs on every exit path (error, timeout, success, abort)", async () => { + // error path — pre-fix code only resolved via the 20 s timeout, so we + // bound timing tightly: disconnect must run within 100 ms of the error. + { + const handles: FakeClientHandle[] = [] + const { factory } = createFakeFactory({ + onCreate: (h) => { + handles.push(h) + h.pendingPostConnect.push((handle) => handle.subject.error(new Error("ws down"))) + }, + }) + const s = stub(_internals, "createNotificationClient", factory) + const start = Date.now() + try { + const notifier = createNotifier(20_000) + await notifier.wait() + } finally { + s.restore() + } + const elapsed = Date.now() - start + assertEquals(handles[0].disconnectCalls, 1, "error path: disconnect must run") + assert(elapsed < 100, `error path: disconnect must run within 100 ms, got ${elapsed}`) + } + // timeout path + { + const handles: FakeClientHandle[] = [] + const { factory } = createFakeFactory({ + onCreate: (h) => handles.push(h), + }) + const s = stub(_internals, "createNotificationClient", factory) + try { + const notifier = createNotifier(20) + await notifier.wait() + } finally { + s.restore() + } + assertEquals(handles[0].disconnectCalls, 1, "timeout path: disconnect must run") + } + // success path (event matches) + { + const handles: FakeClientHandle[] = [] + const { factory } = createFakeFactory({ + onCreate: (h) => { + handles.push(h) + h.pendingPostConnect.push((handle) => { + handle.subject.next({ + pattern: "stored.event.notify.0", + data: { + tenant: "test-tenant", + eventId: "e", + dataCoreId: "test-dc", + flowType: "test.0", + eventType: "test.created.0", + validTime: new Date().toISOString(), + }, + }) + }) + }, + }) + const s = stub(_internals, "createNotificationClient", factory) + try { + const notifier = createNotifier(20_000) + await notifier.wait() + } finally { + s.restore() + } + assertEquals(handles[0].disconnectCalls, 1, "success path: disconnect must run") + } + // abort path + { + const handles: FakeClientHandle[] = [] + const { factory } = createFakeFactory({ + onCreate: (h) => handles.push(h), + }) + const s = stub(_internals, "createNotificationClient", factory) + try { + const notifier = createNotifier(20_000) + const controller = new AbortController() + setTimeout(() => controller.abort(), 5) + await notifier.wait(controller.signal) + } finally { + s.restore() + } + assertEquals(handles[0].disconnectCalls, 1, "abort path: disconnect must run") + } + }) + + it("case 7 — no double-resolve crash when next() fires on a terminated subject", async () => { + let resolveCount = 0 + let crashed: Error | undefined + const { factory } = createFakeFactory({ + onCreate: (h) => { + h.pendingPostConnect.push((handle) => { + handle.subject.error(new Error("ws down")) + // Try to push a `next()` against the now-terminated subject within + // the same microtask window. RxJS drops `next()` calls on a subject + // that has emitted `error()`. The test asserts no crash and a + // single resolve. + try { + handle.subject.next({ + pattern: "stored.event.notify.0", + data: { + tenant: "test-tenant", + eventId: "e", + dataCoreId: "test-dc", + flowType: "test.0", + eventType: "test.created.0", + validTime: new Date().toISOString(), + }, + }) + } catch (err) { + crashed = err as Error + } + }) + }, + }) + factoryStub = stub(_internals, "createNotificationClient", factory) + + const notifier = createNotifier(20_000) + // Wrap the eventResolver to count how many times it would have fired. + // We snapshot it before await resumes by patching post-binding. + const origSetter = Object.getOwnPropertyDescriptor(notifier, "eventResolver") + Object.defineProperty(notifier, "eventResolver", { + configurable: true, + get() { + return this._evRes + }, + set(fn: (() => void) | undefined) { + if (fn === undefined) { + this._evRes = undefined + return + } + const orig = fn + this._evRes = () => { + resolveCount++ + orig() + } + }, + }) + + const start = Date.now() + await notifier.wait() + const elapsed = Date.now() - start + // restore for tidiness + if (origSetter) Object.defineProperty(notifier, "eventResolver", origSetter) + + assertEquals(crashed, undefined, "subject.next on terminated subject must not throw") + assertEquals(resolveCount, 1, "exactly one resolve must fire") + // Resolve must come from the error handler (post-fix), not the 20 s timer. + assert(elapsed < 100, `must resolve via error handler within 100 ms, got ${elapsed}`) + }) + + it("case 8 — synchronous error before connect() resolves: wait() resolves cleanly", async () => { + // Post-fix invariant: the eventResolver is bound BEFORE connect() runs, so + // a synchronous subject.error() during connect() fires the resolver via the + // error handler. wait() must resolve cleanly (no rejection), and the timer + // path is never armed because connect()'s exception unwinds through the + // try/finally that runs disconnect(). + let resolvedCleanly = false + let rejected: Error | undefined + const handles: FakeClientHandle[] = [] + const { factory } = createFakeFactory({ + onCreate: (h) => { + handles.push(h) + h.errorBeforeConnect = new Error("sync ws error during connect") + h.connectError = new Error("connect failed") + }, + }) + factoryStub = stub(_internals, "createNotificationClient", factory) + + const notifier = createNotifier(20_000) + const start = Date.now() + try { + await notifier.wait() + resolvedCleanly = true + } catch (err) { + rejected = err as Error + } + const elapsed = Date.now() - start + + // Either invariant is acceptable per plan, but the implementation routes the + // connect() error through try/finally, which re-throws. We assert the resolver + // fired BEFORE connect() threw (proving promise+resolver were bound first), + // and that disconnect() still ran. + assert(elapsed < 100, `must not wait for the 20 s timeout, elapsed ${elapsed} ms`) + assertEquals(handles[0].disconnectCalls, 1, "disconnect() runs in finally") + // The synchronous error fires through the subject's error handler, which + // sets the resolver — proving the resolver was bound BEFORE connect(). + // connect() then throws, so wait() rejects. This is the documented invariant. + assertStrictEquals(rejected?.message, "connect failed") + assertEquals(resolvedCleanly, false) + }) +})