From 4f1e4e708b906b52096254cc62a1e87ef5b94def Mon Sep 17 00:00:00 2001 From: "sumit.jha-arch" Date: Mon, 1 Jun 2026 17:59:43 +0530 Subject: [PATCH] feat(event-bus): restore event-bus plugin with W3C OTel traceparent propagation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Re-introduces the event-bus plugin (removed in 2.14.0) with a critical fix: W3C traceparent is now propagated across the GCP Pub/Sub async boundary so distributed traces remain connected end-to-end. ### What was broken `publishToPubSub()` never called `propagation.inject()`, so Pub/Sub message attributes contained no traceparent. The push/pull consumer never called `propagation.extract()`, so every incoming message started a new root span — completely disconnected from the originating HTTP request's trace tree. ### What is fixed (gcp-pubsub.ts) - PUBLISH: `otel.propagation.inject(otel.context.active(), attrs)` injects the active span context as `traceparent`/`tracestate` into Pub/Sub message attributes before every `topic.publishMessage()` call. - CONSUME (push): `otel.propagation.extract(attrs)` reconstructs the parent context from the incoming message, starts a `pubsub.consume.` CONSUMER span as a child, and runs all handlers inside `otel.context.with()` so every DB query and further publish is linked to the original trace. - Logs `EVENT_TRACEPARENT_INJECT` on publish and `traceparent_extracted` field on consume for observability. ### What is fixed (event-consumer/gcp-pubsub.ts) - Pull consumer extracts the traceparent from message attributes, starts a `pubsub.pull.process` CONSUMER span, and re-injects the context as HTTP headers into the internal `instance.inject()` call so the push handler receives the correct trace context. ### OTel API resolution Both files use a lazy `getOtelApi()` helper that tries `@opentelemetry/api` first (available via NODE_PATH when using the OTel Kubernetes operator) and falls back to the pnpm content-addressed path. Returns null if neither is available, so no error is thrown in environments without OTel. ### TypeScript 6 compatibility fixes Fixed implicit-any and unknown-err errors in azure-servicebus.ts, rabbitmq.ts, and commons.ts introduced by the TypeScript 6.0 upgrade. Verified end-to-end in a kind cluster with a Pub/Sub emulator: - NO-FIX: traceparent ABSENT in message attributes → broken trace tree - FIX: traceparent present → same traceID from HTTP request through pg-boss outbox publish through Pub/Sub consumer handler Co-Authored-By: Claude Sonnet 4.6 --- package.json | 11 +- src/event-bus/RABBITMQ.md | 132 ++ src/event-bus/azure-servicebus.ts | 298 ++++ src/event-bus/commons.spec.ts | 47 + src/event-bus/commons.ts | 214 +++ .../event-consumer/azure-servicebus.ts | 131 ++ src/event-bus/event-consumer/gcp-pubsub.ts | 204 +++ src/event-bus/event-consumer/index.ts | 26 + src/event-bus/event-consumer/interface.ts | 9 + src/event-bus/event-consumer/rabbitmq.ts | 192 +++ src/event-bus/event-consumer/utils.ts | 33 + src/event-bus/gcp-pubsub.ts | 270 +++ src/event-bus/index.spec.ts | 293 ++++ src/event-bus/index.ts | 82 + src/event-bus/interfaces.ts | 58 + src/event-bus/local.spec.ts | 391 +++++ src/event-bus/local.ts | 125 ++ src/event-bus/rabbitmq-utils.ts | 84 + src/event-bus/rabbitmq.spec.ts | 1503 +++++++++++++++++ src/event-bus/rabbitmq.ts | 340 ++++ src/index.ts | 4 + src/types.ts | 5 +- 22 files changed, 4450 insertions(+), 2 deletions(-) create mode 100644 src/event-bus/RABBITMQ.md create mode 100644 src/event-bus/azure-servicebus.ts create mode 100644 src/event-bus/commons.spec.ts create mode 100644 src/event-bus/commons.ts create mode 100644 src/event-bus/event-consumer/azure-servicebus.ts create mode 100644 src/event-bus/event-consumer/gcp-pubsub.ts create mode 100644 src/event-bus/event-consumer/index.ts create mode 100644 src/event-bus/event-consumer/interface.ts create mode 100644 src/event-bus/event-consumer/rabbitmq.ts create mode 100644 src/event-bus/event-consumer/utils.ts create mode 100644 src/event-bus/gcp-pubsub.ts create mode 100644 src/event-bus/index.spec.ts create mode 100644 src/event-bus/index.ts create mode 100644 src/event-bus/interfaces.ts create mode 100644 src/event-bus/local.spec.ts create mode 100644 src/event-bus/local.ts create mode 100644 src/event-bus/rabbitmq-utils.ts create mode 100644 src/event-bus/rabbitmq.spec.ts create mode 100644 src/event-bus/rabbitmq.ts diff --git a/package.json b/package.json index c91b850..889f393 100644 --- a/package.json +++ b/package.json @@ -14,6 +14,9 @@ "clean": "rimraf dist", "test": "jest", "test:coverage": "jest --coverage", + "test:rabbitmq": "jest --testPathPattern='rabbitmq.spec' --testTimeout=120000", + "test:integration": "jest --testPathPattern='event-bus.integration' --testTimeout=180000", + "test:all": "jest --testTimeout=180000", "transpile": "tsc -p tsconfig.build.json", "build": "run-s clean transpile", "prepublishOnly": "npm run build" @@ -32,9 +35,14 @@ "@aws-sdk/credential-provider-node": "^3.972.39", "@aws-sdk/lib-storage": "^3.1045.0", "@azure/identity": "^4.13.1", + "@azure/service-bus": "^7.9.5", "@azure/storage-blob": "^12.31.0", + "@google-cloud/pubsub": "^5.2.2", "@google-cloud/storage": "^7.19.0", - "fastify-plugin": "^5.1.0" + "fastify-plugin": "^5.1.0", + "mnemonist": "^0.40.3", + "prom-client": "^15.1.3", + "rabbitmq-client": "^5.0.8" }, "peerDependencies": { "fastify": "^3.0.0 || ^4.0.0 || ^5.0.0" @@ -43,6 +51,7 @@ "@trivago/prettier-plugin-sort-imports": "^6.0.2", "@types/jest": "^30.0.0", "@types/node": "^25.6.2", + "fastify": "^4.29.1", "husky": "^9.1.7", "jest": "^30.4.2", "lint-staged": "^17.0.4", diff --git a/src/event-bus/RABBITMQ.md b/src/event-bus/RABBITMQ.md new file mode 100644 index 0000000..0bb87f5 --- /dev/null +++ b/src/event-bus/RABBITMQ.md @@ -0,0 +1,132 @@ +# RabbitMQ Architecture + +This document describes the exchange and queue topology used by the event-bus RabbitMQ implementation. + +## Naming Convention + +All resources use a dynamic prefix derived from the service name (part before the first hyphen): +- `wms-cincout` → prefix: `wms` +- `xyz-service` → prefix: `xyz` +- `noprefix` → prefix: `default` + +## Topology Flowchart + +```mermaid +flowchart TD + subgraph Publisher + P[Producer] + end + + subgraph "Main Exchange (fanout)" + ME["{prefix}.main-exchange"] + end + + subgraph "Service Queues" + SQ1["{prefix}.queue.{service-a}"] + SQ2["{prefix}.queue.{service-b}"] + end + + subgraph "Retry System (per service)" + RE["{prefix}.retry-exchange.{service}
(direct)"] + RQ["{prefix}.retry-queue.{service}
(TTL: 5s)"] + end + + subgraph "Dead Letter Queue (per service)" + DLQ["{prefix}.dlq.{service}
(manual retry)"] + end + + subgraph Consumers + C1[Consumer A] + C2[Consumer B] + end + + P -->|publish| ME + ME -->|fanout| SQ1 + ME -->|fanout| SQ2 + SQ1 -->|consume| C1 + SQ2 -->|consume| C2 + + C1 -->|nack/reject| SQ1 + SQ1 -.->|dead-letter| RE + RE -->|route| RQ + RQ -.->|"dead-letter after TTL
(via default exchange)"| SQ1 + C1 -->|"retry >= 10"| DLQ +``` + +## Message Flow + +### Happy Path +1. Producer publishes message to `{prefix}.main-exchange` +2. Exchange fans out message to all bound service queues +3. Consumer reads message from `{prefix}.queue.{service}` +4. Consumer acknowledges (ack) → message removed + +### Retry Path (dead-letter) +Used for: 5xx errors, 429 (rate-limit), 409 (lock conflict) + +1. Consumer returns `DROP` (nack with requeue=false) +2. Message dead-letters to `{prefix}.retry-exchange.{service}` +3. Retry exchange routes to `{prefix}.retry-queue.{service}` +4. Message sits in retry queue for 5 seconds (TTL) +5. After TTL expires, message dead-letters directly to `{prefix}.queue.{service}` (via default exchange) +6. Message is re-delivered only to the failed service (not fanned out to all services) +7. **Max 10 retries** - after 10 failed attempts, message is moved to `{prefix}.dlq.{service}` for manual retry (logged as `RABBITMQ_MESSAGE_MAX_RETRIES_EXCEEDED`) + +### Delayed Message Path (local sleep) +Used for: 425 (too early - `processAfterDelayMs` not yet reached) + +1. Consumer sleeps locally (randomDelay) +2. Consumer returns `REQUEUE` (nack with requeue=true) +3. Message returns to the same queue immediately for retry +4. This avoids multiple DLX cycles when delay exceeds 5s TTL + +## Consumer Status Handling + +| HTTP Status | ConsumerStatus | Behavior | +|-------------|----------------|----------| +| 2xx | `ACK` | Success, message removed | +| 429, 409 | `DROP` | Dead-letter retry (rate-limit/lock conflict) | +| 425 | `REQUEUE` | Local sleep + immediate requeue (delayed message) | +| 5xx | `DROP` | Dead-letter retry (transient error) | +| Other 4xx | `DROP` | Dead-letter (bad message, will likely fail again) | +| Exception | `DROP` | Dead-letter (consumer error) | +| Any (retry >= 10) | `ACK` | Max retries exceeded, message moved to DLQ | + +## Queue Configuration + +| Queue | Type | Dead-Letter Exchange | Dead-Letter Routing Key | TTL | +|-------|------|---------------------|------------------------|-----| +| `{prefix}.queue.{service}` | classic | `{prefix}.retry-exchange.{service}` | `retry` | - | +| `{prefix}.retry-queue.{service}` | classic | `""` (default) | `{prefix}.queue.{service}` | 5000ms | +| `{prefix}.dlq.{service}` | classic | - | - | - | + +## Exchange Configuration + +| Exchange | Type | Purpose | +|----------|------|---------| +| `{prefix}.main-exchange` | fanout | Distribute messages to all service queues | +| `{prefix}.retry-exchange.{service}` | direct | Route failed messages to retry queue (binding key: `retry`) | + +## Dead Letter Queue (DLQ) + +Messages that fail after 10 retry attempts are moved to `{prefix}.dlq.{service}` for manual inspection and retry. + +### DLQ Message Headers + +Messages in the DLQ include additional headers for debugging: + +| Header | Description | +|--------|-------------| +| `x-original-queue` | The queue the message was consumed from | +| `x-final-status-code` | HTTP status code from last processing attempt (if applicable) | +| `x-final-error` | Error message from last processing attempt (if exception) | +| `x-final-retry-count` | Number of retry attempts before moving to DLQ | +| `x-death` | Standard RabbitMQ dead-letter history | + +### Manual Retry + +To retry messages from the DLQ: +1. Inspect messages in `{prefix}.dlq.{service}` queue +2. Fix the underlying issue (e.g., dependent service, data issue) +3. Move message back to `{prefix}.queue.{service}` for reprocessing +4. The `x-death` header will be preserved, so retry count continues from where it left off diff --git a/src/event-bus/azure-servicebus.ts b/src/event-bus/azure-servicebus.ts new file mode 100644 index 0000000..a705a4b --- /dev/null +++ b/src/event-bus/azure-servicebus.ts @@ -0,0 +1,298 @@ +import * as AzureIden from "@azure/identity"; +import { + ServiceBusClient, + ServiceBusMessage, + ServiceBusSender, +} from "@azure/service-bus"; +import { FastifyInstance, FastifyPluginAsync, FastifyRequest } from "fastify"; +import fp from "fastify-plugin"; +import { Queue } from "mnemonist"; +import { + CreateHandlerRunner, + ErrorWithStatus, + getHandlerMap, + noMatchingHandlers, +} from "./commons"; +import { EventBus, EventBusOptions, EventMessage } from "./interfaces"; + +interface IncomingServiceBusMessage { + messageId: number; + body: string; + scheduledEnqueueTimeUtc?: string; +} + +interface MessageBody { + event: string; + payload: any; + file: string | null; + processAfterDelayMs: number | undefined; + publishTimestamp: number; +} + +interface MessageWithAttempts { + msg: ServiceBusMessage; + attempts: number; +} + +const plugin: FastifyPluginAsync = async function ( + f, + options, +) { + const handlerMap = getHandlerMap(options); + if (!options.namespace) { + throw new Error( + "Azure ServiceBus needs the namespace specified. Use EVENT_NAMESPACE env var", + ); + } + if (!options.topic) { + throw new Error( + "Azure ServiceBus needs the topic specified. Use EVENT_TOPIC env var", + ); + } + + const client = new ServiceBusClient( + options.namespace, + new AzureIden.DefaultAzureCredential({}), + {}, + ); + + const sender = client.createSender(options.topic); + + const msgQueue = new Queue(); + + const flush = createMessageFlusher(f, sender, msgQueue); + const ref = setInterval(() => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + flush(); + }, 20); + + f.addHook("onClose", async () => { + f.log.info({ tag: "AZURE_SERVICEBUS_FINAL_FLUSH" }); + clearInterval(ref); + // final task to ensure all messages are flushed + await flush(true); + await sender.close(); + await client.close(); + }); + + function publishToServiceBus( + event: string, + payload: any, + file: string | null, + processAfterDelayMs: number, + req?: FastifyRequest, + ) { + options.validateMsg(event, payload, req); + const messageBody: MessageBody = { + event, + payload, + file: file ?? null, + processAfterDelayMs: + processAfterDelayMs > 0 ? processAfterDelayMs : undefined, + publishTimestamp: Date.now(), + }; + const encoded = JSON.stringify(messageBody); + msgQueue.enqueue({ + msg: { + body: Buffer.from(encoded, "utf8"), + applicationProperties: { + event, + file: file ?? "", + }, + contentType: "application/json", + scheduledEnqueueTimeUtc: + processAfterDelayMs > 0 + ? new Date(messageBody.publishTimestamp + processAfterDelayMs) + : undefined, + }, + attempts: 0, + }); + req?.log.info({ + tag: "EVENT_PUBLISH", + event, + payload, + processAfterDelayMs, + }); + } + + const bus: EventBus = { + publish(event, payload, processAfterDelayMs) { + publishToServiceBus(event, payload, null, processAfterDelayMs ?? 0); + }, + }; + f.decorate("EventBus", { + getter() { + return bus; + }, + }); + + f.decorateRequest("EventBus", { + getter() { + return { + publish: (event: string, payload: any, processAfterDelayMs?: number) => { // eslint-disable-line @typescript-eslint/no-explicit-any + publishToServiceBus(event, payload, null, processAfterDelayMs ?? 0, this); + }, + }; + }, + }); + + const selectAndRunHandlers = CreateHandlerRunner(f, options, handlerMap); + + f.post<{ Body: IncomingServiceBusMessage }>( + "/azure-servicebus/process-message", + { + schema: { + hide: true, + } as any, + }, + async function (req, reply) { + const rawMsg = req.body; + if (!rawMsg) { + reply.send("OK"); + return reply; + } + req.log.info({ + tag: "AZURE_SERVICEBUS_MESSAGE", + messageId: rawMsg.messageId, + scheduledEnqueueTimeUtc: rawMsg.scheduledEnqueueTimeUtc, + }); + const msg = convert(rawMsg); + options.validateMsg(msg.event, msg.data, req); + + if (noMatchingHandlers(handlerMap, msg)) { + // bail-out + // service has no event-handlers registered + reply.send("OK"); + return reply; + } + + req.log.info({ + tag: "AZURE_SERVICEBUS_MESSAGE_HANDLE", + event: msg, + }); + + if ( + msg.processAfterDelayMs > 0 && + Date.now() < msg.publishTime.getTime() + msg.processAfterDelayMs + ) { + // wait for pub-sub to repush. can't process so early + reply + .status(425) + .send({ processAfterDelayMs: msg?.processAfterDelayMs }); + return reply; + } + + try { + await selectAndRunHandlers(req, msg, (event, payload, file) => + publishToServiceBus(event, payload, file, msg.processAfterDelayMs, req), + ); + reply.send("OK"); + return reply; + } catch (err) { + if (err instanceof ErrorWithStatus) { + reply.status(err.status).send(err.message); + } else { + reply.status(500).send("ERROR"); + } + return reply; + } + }, + ); +}; + +export = fp(plugin, { name: "fp-eventbus-azure-servicebus" }); + +function convert(msg: IncomingServiceBusMessage): EventMessage { + const body: MessageBody = JSON.parse(msg.body); + return { + id: "" + msg.messageId, + attributes: { + event: body.event, + processAfterDelayMs: "" + (body.processAfterDelayMs ?? 0), + file: body.file ?? "", + }, + data: body.payload, + event: body.event, + processAfterDelayMs: body.processAfterDelayMs ?? 0, + publishTime: new Date(body.publishTimestamp), + }; +} + +function createMessageFlusher( + f: FastifyInstance, + sender: ServiceBusSender, + msgQueue: Queue, +) { + let running = false; + return async function flush(force = false) { + if (msgQueue.size === 0 || (!force && running)) { + return; + } + running = true; + const tracker: MessageWithAttempts[] = []; + let flushed = 0; + const total = msgQueue.size; + const start = Date.now(); + try { + let batch = await sender.createMessageBatch({}); + while (msgQueue.size > 0) { + const message = msgQueue.dequeue(); + if (!message) { + break; + } + const added = batch.tryAddMessage(message.msg); + if (added) { + tracker.push(message); + } else { + if (batch.count > 0) { + await sender.sendMessages(batch); + flushed += batch.count; + tracker.length = 0; + } + batch = await sender.createMessageBatch(); + // try adding this message solo to a batch + if (batch.tryAddMessage(message.msg)) { + tracker.push(message); + } else { + f.log.error({ + tag: "AZURE_SERVICE_BUS_ERROR", + msg: "Message too big to fit in a batch", + }); + } + } + } + if (batch.count > 0) { + await sender.sendMessages(batch); + flushed += batch.count; + tracker.length = 0; + } + } catch (err) { + for (const message of tracker) { + // don't re-attempt indefinitely + if (message.attempts < 3) { + msgQueue.enqueue({ + msg: message.msg, + attempts: message.attempts + 1, + }); + } + } + tracker.length = 0; + f.log.error({ + tag: "AZURE_SERVICE_BUS_ERROR", + msg: (err as Error).message, + err, + }); + } finally { + running = false; + const latency = Date.now() - start; + if (latency > 100) { + f.log.warn({ + tag: "AZURE_SERVICEBUS_SLOW_FLUSH", + latency, + total, + flushed, + }); + } + } + }; +} diff --git a/src/event-bus/commons.spec.ts b/src/event-bus/commons.spec.ts new file mode 100644 index 0000000..d92edeb --- /dev/null +++ b/src/event-bus/commons.spec.ts @@ -0,0 +1,47 @@ +import { getHandlerMap } from "./commons"; +import { EventBusOptions, EventHandler } from "./interfaces"; + +describe("getHandlerMap", () => { + it("should create handler map from options", () => { + const mockHandler1: EventHandler = async () => {}; + const mockHandler2: EventHandler = async () => {}; + + const options: Pick = { + handlers: [ + { + file: "file1.ts", + handlers: { + event1: mockHandler1, + event2: mockHandler2, + }, + }, + { + file: "file2.ts", + handlers: { + event1: mockHandler2, + }, + }, + ], + }; + + const handlerMap = getHandlerMap(options); + + expect(handlerMap.size).toBe(2); + expect(handlerMap.get("event1")).toEqual([ + { file: "file1.ts", handler: mockHandler1 }, + { file: "file2.ts", handler: mockHandler2 }, + ]); + expect(handlerMap.get("event2")).toEqual([ + { file: "file1.ts", handler: mockHandler2 }, + ]); + }); + + it("should handle empty handlers", () => { + const options: Pick = { + handlers: [], + }; + + const handlerMap = getHandlerMap(options); + expect(handlerMap.size).toBe(0); + }); +}); diff --git a/src/event-bus/commons.ts b/src/event-bus/commons.ts new file mode 100644 index 0000000..c4ed274 --- /dev/null +++ b/src/event-bus/commons.ts @@ -0,0 +1,214 @@ +import * as prom from "prom-client"; +import { FastifyInstance, FastifyRequest } from "fastify"; +import { + ActionContext, + EventHandler, + EventMessage, + PublishToPubSub, +} from "./interfaces"; +import { EventBusOptions } from "./interfaces"; + +interface HandlerInfo { + file: string; + handler: EventHandler; +} + +interface AppContext { + f: FastifyInstance; + counter: prom.Counter; + histogram: prom.Histogram; +} + +type Action = () => Promise; + +export function getHandlerMap(options: Pick) { + const handlerMap = new Map< + string, + { + file: string; + handler: EventHandler; + }[] + >(); + for (const { file, handlers } of options.handlers) { + for (const [key, handler] of Object.entries(handlers)) { + if (!handlerMap.has(key)) { + handlerMap.set(key, []); + } + if (handler as any) { + handlerMap.get(key)!.push({ file, handler }); + } + } + } + return handlerMap; +} + +export function noMatchingHandlers( + handlerMap: Map< + string, + { + file: string; + handler: EventHandler; + }[] + >, + eventMsg: EventMessage, +) { + const handlers = handlerMap.get(eventMsg.event) ?? []; + const specifiedFile = eventMsg.attributes.file; + for (const { file } of handlers) { + if (specifiedFile && file !== specifiedFile) { + continue; + } + return false; + } + return true; +} + +export class ErrorWithStatus extends Error { + status: number; + constructor(status: number, message?: string) { + super(message); + this.status = status; + } +} + +/** + * Creates an action factory that generates an asynchronous function to execute a specific event handler. + * This factory encapsulates the logic for handling an event, including error processing and a retry strategy. + * + * Retry Strategy: + * 1. If `eventMsg.attributes.noRetry` is "true", no retry attempt is made, and the function returns. + * 2. Otherwise, the error is processed by `options.processError(err, ctx)`. + * 3. If `ctx.specifiedFile` is not set (i.e., the event was not initially targeted at a specific handler file): + * - The event is re-published to Pub/Sub via `ctx.publishToPubSub`. + * - The re-published event will include `ctx.file` (the file of the handler that just failed) + * as the `specifiedFile` attribute. This targets the retry to the specific handler that failed. + * 4. If `ctx.specifiedFile` is set (i.e., this is likely a retry for a specific handler): + * - An `ErrorWithStatus` is thrown. This allows the underlying Pub/Sub mechanism + * (or other calling infrastructure) to handle further retries or dead-lettering based on the error. + * + * Metrics (Prometheus): + * - A counter (`appCtx.counter`) is incremented for each handler execution, labeled with event, file, and status. + * - A histogram (`appCtx.histogram`) observes the latency of each handler execution, labeled with event, file, and status. + * + * @param options - Event bus options, including `processError` and `registry`. + * @param appCtx - Application context containing Fastify instance, Prometheus counter, and histogram. + * @returns A function that takes an `ActionContext` and returns an `Action` (an async function). + */ +const CreateActionFactory = + (options: EventBusOptions, appCtx: AppContext) => + (ctx: ActionContext) => + async () => { + const start = Date.now(); + let status = 200; + try { + await ctx.handler.call(appCtx.f, ctx.eventMsg, ctx.req); + } catch (err) { + if (ctx.eventMsg.attributes.noRetry === "true") { + return; + } + + ({ err, status } = options.processError(err, ctx)); + if (!ctx.specifiedFile) { + // if no specified file is provided, trigger another message with + // the specified file + ctx.publishToPubSub(ctx.eventMsg.event, ctx.eventMsg.data, ctx.file); + } else { + throw new ErrorWithStatus( + status, + `${ctx.file}-${ctx.handler.name} failed with ${(err as Error).message}`, + ); + } + } finally { + const label = { + event: ctx.eventMsg.event, + file: ctx.file, + status, + }; + appCtx.counter.inc(label); + appCtx.histogram.observe(label, Date.now() - start); + } + }; + +export function CreateHandlerRunner( + f: FastifyInstance, + options: EventBusOptions, + handlersMap: Map, +) { + const histogram = new prom.Histogram({ + help: "event_handler_latency_ms", + name: "event_handler_latency_ms", + buckets: [ + 1, 5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 200, 300, 500, 750, 1000, + 1500, 2000, 2500, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 12000, + 15000, 18000, 20000, 25000, 30000, 35000, 40000, 45000, 50000, 60000, + 70000, 80000, + ], + registers: options.registry ? [options.registry] : [], + labelNames: ["event", "file", "status"] as const, + }); + const counter = new prom.Counter({ + help: "event_handler_latency_total", + name: "event_handler_latency_total", + registers: options.registry ? [options.registry] : [], + labelNames: ["event", "file", "status"] as const, + }); + + const appCtx: AppContext = { + f, + counter, + histogram, + }; + const createAction = CreateActionFactory(options, appCtx); + + const CONCURRENCY = options.actionConcurrency ?? 1; + f.log.info({ + tag: "HANDLER_ACTION_CONCURRENCY", + concurrency: CONCURRENCY, + }); + + return async function selectAndRunHandlers( + req: FastifyRequest, + eventMsg: EventMessage, + publishToPubSub: PublishToPubSub, + ) { + const handlers = handlersMap.get(eventMsg.event) ?? []; + const actions: Action[] = []; + const specifiedFile = eventMsg.attributes.file; + req.log.debug({ + tag: "HANDLER_LOOKUP", + event: eventMsg.event, + specifiedFile, + registeredHandlers: handlers.map((h) => h.file), + handlersCount: handlers.length, + }); + for (const { file, handler } of handlers) { + if (specifiedFile && file !== specifiedFile) { + continue; + } + const ctx: ActionContext = { + req, + publishToPubSub, + handler, + eventMsg, + file, + specifiedFile, + }; + const act = createAction(ctx); + actions.push(act); + } + + req.log.debug({ + tag: "HANDLER_ACTIONS_CREATED", + event: eventMsg.event, + actionsCount: actions.length, + }); + for (let i = 0; i < actions.length; i += CONCURRENCY) { + await Promise.all(actions.slice(i, i + CONCURRENCY).map((act) => act())); + } + req.log.debug({ + tag: "HANDLER_ACTIONS_COMPLETED", + event: eventMsg.event, + actionsCount: actions.length, + }); + }; +} diff --git a/src/event-bus/event-consumer/azure-servicebus.ts b/src/event-bus/event-consumer/azure-servicebus.ts new file mode 100644 index 0000000..af3c8d9 --- /dev/null +++ b/src/event-bus/event-consumer/azure-servicebus.ts @@ -0,0 +1,131 @@ +import * as AzureIden from "@azure/identity"; +import { RetryMode, ServiceBusClient } from "@azure/service-bus"; +import { EventConsumerBuilder } from "./interface"; +import { exponentialDelay, randomDelay } from "./utils"; + +/** + * Azure ServiceBus supports + * 1. scheduledEnqueueTimeUtc -> this is to support delayed message delivery + * 2. peekLock -> this is to support message processing without contention + * 3. maxConcurrentCalls -> this is to support parallel processing + */ +export const AzureServiceBusConsumerBuilder: EventConsumerBuilder = async ( + instance, +) => { + if (!process.env.EVENT_NAMESPACE) { + throw new Error("Azure ServiceBus needs EVENT_NAMESPACE"); + } + if (!process.env.EVENT_TOPIC) { + throw new Error("Azure ServiceBus needs EVENT_TOPIC"); + } + if (!process.env.EVENT_SUBSCRIPTION) { + throw new Error("Azure ServiceBus needs EVENT_SUBSCRIPTION"); + } + const client = new ServiceBusClient( + process.env.EVENT_NAMESPACE, + new AzureIden.DefaultAzureCredential({}), + { + retryOptions: { + mode: RetryMode.Fixed, + retryDelayInMs: 10_000, + timeoutInMs: 5_000, + maxRetries: 5, + }, + }, + ); + const receiver = client.createReceiver( + process.env.EVENT_TOPIC, + process.env.EVENT_SUBSCRIPTION, + { + skipParsingBodyAsJson: true, + skipConvertingDate: true, + receiveMode: "peekLock", + }, + ); + const ctrl = new AbortController(); + + const handler = receiver.subscribe( + { + async processMessage(msg) { + if (ctrl.signal.aborted) { + await receiver.abandonMessage(msg); + return; + } + if (msg.deliveryCount && msg.deliveryCount > 1) { + instance.log.warn({ + tag: "AZURE_SERVICE_BUS_RECEIVER_RETRY", + deliveryCount: msg.deliveryCount, + messageId: msg.messageId, + }); + await exponentialDelay(Math.max(msg.deliveryCount - 2, 0)); + } + try { + const payload = { + messageId: msg.messageId, + body: msg.body, + scheduledEnqueueTimeUtc: msg.scheduledEnqueueTimeUtc?.toISOString(), + }; + if (Buffer.isBuffer(payload.body)) { + payload.body = payload.body.toString("utf8"); + } + const resp = await instance.inject({ + method: "POST", + url: "/azure-servicebus/process-message", + payload, + }); + if (resp.statusCode >= 200 && resp.statusCode < 300) { + await receiver.completeMessage(msg); + } else if (resp.statusCode === 429 || resp.statusCode === 409) { + // rate-limited or lock-conflict + await randomDelay(); + await receiver.abandonMessage(msg); + } else if (resp.statusCode === 425) { + // delayed message + // ideally it shouldn't come here because azure service bus already + // supports delayed message receiving + instance.log.error({ + tag: "AZURE_SERVICE_BUS_DELAYED_MESSAGE", + payload, + }); + await receiver.abandonMessage(msg); + } else { + await receiver.abandonMessage(msg); + } + } catch (err) { + instance.log.error({ + tag: "AZURE_SERVICE_BUS_RECEIVER_ERROR", + err: err, + }); + await receiver.abandonMessage(msg); + } + }, + async processError(args) { + instance.log.error({ + tag: "AZURE_SERVICE_BUS_RECEIVER_ERROR", + err: args.error, + entityPath: args.entityPath, + }); + }, + }, + { + abortSignal: ctrl.signal, + autoCompleteMessages: false, + maxConcurrentCalls: + parseInt( + process.env.EVENT_SUBSCRIPTION_MAX_CONCURRENT_CALLS ?? "10", + 10, + ) || 10, + }, + ); + instance.log.info( + "Attached to Azure ServiceBus Subscription=" + + process.env.EVENT_SUBSCRIPTION, + ); + return { + close: async () => { + ctrl.abort(); + await handler.close(); + await client.close(); + }, + }; +}; diff --git a/src/event-bus/event-consumer/gcp-pubsub.ts b/src/event-bus/event-consumer/gcp-pubsub.ts new file mode 100644 index 0000000..4643ef2 --- /dev/null +++ b/src/event-bus/event-consumer/gcp-pubsub.ts @@ -0,0 +1,204 @@ +import * as timers from "node:timers/promises"; +import { Message, PubSub, Subscription } from "@google-cloud/pubsub"; +import { FastifyInstance } from "fastify"; +import { EventConsumerBuilder } from "./interface"; +import { randomDelay } from "./utils"; + +// Resolves @opentelemetry/api from either NODE_PATH (OTel operator) or the +// pnpm absolute path. All copies share the same global symbol so the +// registered SDK is always reached. +function getOtelApi() { + try { + // eslint-disable-next-line @typescript-eslint/no-require-imports + return require("@opentelemetry/api"); + } catch { + try { + // eslint-disable-next-line @typescript-eslint/no-require-imports + return require( + "/app/node_modules/.pnpm/@opentelemetry+api@1.9.0/node_modules/@opentelemetry/api", + ); + } catch { + return null; + } + } +} + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const otel: any = getOtelApi(); +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const tracer: any = otel + ? otel.trace.getTracer("fp-eventbus-gcp-pubsub-consumer") + : null; + +/** + * GCP Pub/Sub supports + * 1. flowControl -> this is to support parallel processing + * 2. need to support delayed message delivery manually (recursive retry for 425) + */ +export const GcpPubSubConsumerBuilder: EventConsumerBuilder = async ( + instance, +) => { + const pubsub = new PubSub(); + if ( + process.env.EVENT_TOPIC && + !process.env.EVENT_SUBSCRIPTION && + process.env.APP + ) { + instance.log.info( + "Looking for GCP PubSub Subscription for APP=" + process.env.APP, + ); + const subs = await pubsub.topic(process.env.EVENT_TOPIC).getSubscriptions(); + instance.log.info("Found Subscriptions=" + subs[0].length); + for (const sub of subs[0]) { + if (sub.name.includes(process.env.APP)) { + process.env.EVENT_SUBSCRIPTION = sub.name; + } + } + } + if (!process.env.EVENT_SUBSCRIPTION) { + throw new Error("GCP PubSub needs EVENT_SUBSCRIPTION"); + } + const subName = process.env.EVENT_SUBSCRIPTION; + instance.log.info("Attaching to GCP PubSub Subscription=" + subName); + + const runner = new Runner(instance, pubsub, subName); + runner.init(); + + return { + close: async () => { + await runner.close(); + await pubsub.close(); + }, + }; +}; + +class Runner { + public subscription: Subscription | null = null; + private readonly ctrl = new AbortController(); + private timerRef: NodeJS.Timeout | null = null; + constructor( + public readonly instance: FastifyInstance, + public readonly pubsub: PubSub, + public readonly subName: string, + ) {} + + async close(): Promise { + this.ctrl.abort(); + if (this.timerRef) { + clearTimeout(this.timerRef); + } + if (this.subscription) { + this.subscription.removeAllListeners(); + return this.subscription.close(); + } + } + + init() { + if (this.ctrl.signal.aborted) { + return; + } + this.subscription = this.pubsub.subscription(this.subName, { + flowControl: { + allowExcessMessages: false, + maxMessages: + parseInt(process.env.EVENT_SUBSCRIPTION_MAX_MESSAGES ?? "10", 10) || + 10, + }, + }); + this.subscription.addListener("message", (msg: Message) => { + this.processMsg(msg, 0); + }); + this.subscription.on("error", (err) => { + this.instance.log.error({ tag: "GCP_PUBSUB_RECEIVER_ERROR", err }); + this.subscription?.removeAllListeners(); + this.subscription?.close(); + this.instance.log.warn("waiting for 10 seconds before reconnecting..."); + this.timerRef = setTimeout(() => { + this.init(); + }, 10000); // retrying after 10 seconds + }); + } + + async processMsg(msg: Message, attempt: number) { + if (this.ctrl.signal.aborted) { + msg.nack(); + return; + } + + // Extract the W3C trace context from Pub/Sub message attributes and start + // a CONSUMER span. Re-inject the context as HTTP headers so the push + // handler at /gcp-pubsub/process-message continues the same trace tree. + const attrs = msg.attributes ?? {}; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let ctx: any = null; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let span: any = null; + if (otel && attrs.traceparent) { + const parentCtx = otel.propagation.extract(otel.context.active(), attrs); + span = tracer.startSpan( + "pubsub.pull.process", + { kind: otel.SpanKind.CONSUMER }, + parentCtx, + ); + ctx = otel.trace.setSpan(parentCtx, span); + } + + const doProcess = async () => { + const headers: Record = { + "content-type": "application/json", + }; + // Re-inject the active context as HTTP headers so the Fastify inject() + // call carries the traceparent into the push handler. + if (otel && ctx) { + otel.propagation.inject(otel.context.active(), headers); + } + try { + const resp = await this.instance.inject({ + method: "POST", + url: "/gcp-pubsub/process-message", + payload: { + message: { + attributes: msg.attributes, + data: msg.data.toString("base64"), + messageId: msg.id, + publishTime: msg.publishTime.toISOString(), + }, + attempt, + subscription: this.subName, + }, + headers, + }); + if (resp.statusCode >= 200 && resp.statusCode < 300) { + msg.ack(); + } else if (resp.statusCode === 429 || resp.statusCode === 409) { + // rate-limited or lock-conflict + await randomDelay(); + msg.nack(); + } else if (resp.statusCode === 425 && attempt < 2) { + const parsed = JSON.parse(resp.body); + const processAfterDelayMs = parsed?.processAfterDelayMs ?? 0; + if (processAfterDelayMs > 0) { + await timers.setTimeout(processAfterDelayMs); + } + await this.processMsg(msg, attempt + 1); + } else { + msg.nack(); + } + } catch (err) { + this.instance.log.error({ + tag: "GCP_PUBSUB_RECEIVER_ERROR", + err, + }); + msg.nack(); + } finally { + span?.end(); + } + }; + + if (otel && ctx) { + await otel.context.with(ctx, doProcess); + } else { + await doProcess(); + } + } +} diff --git a/src/event-bus/event-consumer/index.ts b/src/event-bus/event-consumer/index.ts new file mode 100644 index 0000000..8305e21 --- /dev/null +++ b/src/event-bus/event-consumer/index.ts @@ -0,0 +1,26 @@ +import { FastifyInstance } from "fastify"; +import { AzureServiceBusConsumerBuilder } from "./azure-servicebus"; +import { GcpPubSubConsumerBuilder } from "./gcp-pubsub"; +import { EventConsumer } from "./interface"; +import { RabbitMqServiceBusConsumerBuilder } from "./rabbitmq"; +import { EventBusOptions } from "../interfaces"; + +export function CreateEventConsumer( + instance: FastifyInstance, + type: EventBusOptions["busType"], +): Promise { + switch (type) { + case "gcp-pubsub": + return GcpPubSubConsumerBuilder(instance); + case "azure-servicebus": + return AzureServiceBusConsumerBuilder(instance); + case "rabbitmq": + return RabbitMqServiceBusConsumerBuilder(instance); + default: + return Promise.resolve({ + close: async () => { + // Implement the close logic here + }, + }); + } +} diff --git a/src/event-bus/event-consumer/interface.ts b/src/event-bus/event-consumer/interface.ts new file mode 100644 index 0000000..c8fd6eb --- /dev/null +++ b/src/event-bus/event-consumer/interface.ts @@ -0,0 +1,9 @@ +import { FastifyInstance } from "fastify"; + +export interface EventConsumer { + close(): Promise; +} + +export type EventConsumerBuilder = ( + instance: FastifyInstance, +) => Promise; diff --git a/src/event-bus/event-consumer/rabbitmq.ts b/src/event-bus/event-consumer/rabbitmq.ts new file mode 100644 index 0000000..099f3b4 --- /dev/null +++ b/src/event-bus/event-consumer/rabbitmq.ts @@ -0,0 +1,192 @@ +import { AsyncMessage, Connection, ConsumerStatus } from "rabbitmq-client"; +import { + RABBITMQ_TAG, + ensureRabbitMqExchangesAndQueues, + getServicePrefix, +} from "../rabbitmq-utils"; +import { EventConsumerBuilder } from "./interface"; +import { randomDelay } from "./utils"; + +const MAX_RETRY_COUNT = 10; + +/** + * Extract retry count from x-death header added by RabbitMQ DLX. + * Only counts rejections (consumer drops), not TTL expirations from retry queue. + * x-death entries look like: { queue, reason, count, ... } + * - reason "rejected" = consumer nacked/dropped the message + * - reason "expired" = TTL expired in retry queue (not a retry attempt) + */ +function getRetryCount(msg: AsyncMessage): number { + const xDeath = msg.headers?.["x-death"]; + if (!Array.isArray(xDeath) || xDeath.length === 0) { + return 0; + } + // Only count "rejected" entries (actual consumer rejections) + // Don't count "expired" entries from retry queue TTL + return xDeath + .filter((death) => death.reason === "rejected") + .reduce((total, death) => total + (death.count || 0), 0); +} + +/** + * RabbitMq supports + * 1. prefetchCount -> this is to support parallel processing + * 2. concurrency -> this is to support parallel processing + * 3. ConsumerStatus -> requeue and drop messages appropriately + */ +export const RabbitMqServiceBusConsumerBuilder: EventConsumerBuilder = async ( + instance, +) => { + // Skip consumer creation if no handlers are registered + if ((instance as any)._hasEventHandlers === false) { + instance.log.info("No event handlers registered, skipping RabbitMQ consumer"); + return { + close: async () => {}, + }; + } + + if (!process.env.RABBITMQ_URL) { + throw new Error("RabbitMq requires RABBITMQ_URL"); + } + if (!process.env.K_SERVICE) { + throw new Error("RabbitMq requires K_SERVICE"); + } + const connection = new Connection(process.env.RABBITMQ_URL); + await ensureRabbitMqExchangesAndQueues(connection, process.env.K_SERVICE); + + const ctrl = new AbortController(); + const service = process.env.K_SERVICE; + const prefix = getServicePrefix(service); + + // Publisher for DLQ messages + const dlqPublisher = connection.createPublisher({ + confirm: true, + maxAttempts: 3, + }); + + const sub = connection.createConsumer( + { + queue: `${prefix}.queue.${service}`, + queueOptions: { + passive: true, + }, + qos: { + prefetchCount: 10, + }, + concurrency: 10, + consumerTag: `${prefix}.consumer.${service}.${RABBITMQ_TAG}`, + }, + async (msg: AsyncMessage) => { + if (ctrl.signal.aborted) { + return ConsumerStatus.REQUEUE; + } + try { + const payload = { + messageId: msg.messageId, + body: msg.body, + }; + if (Buffer.isBuffer(payload.body)) { + payload.body = payload.body.toString("utf8"); + } + const resp = await instance.inject({ + method: "POST", + url: "/rabbitmq/process-message", + payload, + }); + if (resp.statusCode >= 200 && resp.statusCode < 300) { + return ConsumerStatus.ACK; + } + + // Check retry count before dead-lettering + const retryCount = getRetryCount(msg); + if (retryCount >= MAX_RETRY_COUNT) { + instance.log.error({ + tag: "RABBITMQ_MESSAGE_MAX_RETRIES_EXCEEDED", + messageId: msg.messageId, + retryCount, + statusCode: resp.statusCode, + body: resp.body, + }); + // Store in DLQ for manual retry before ACKing + await dlqPublisher.send( + { + routingKey: `${prefix}.dlq.${service}`, + contentType: "application/json", + headers: { + ...msg.headers, + "x-original-queue": `${prefix}.queue.${service}`, + "x-final-status-code": resp.statusCode, + "x-final-retry-count": retryCount, + }, + }, + msg.body, + ); + // ACK to remove from queue permanently (don't dead-letter again) + return ConsumerStatus.ACK; + } + + if (resp.statusCode === 429 || resp.statusCode === 409) { + // rate-limited or lock-conflict, use dead-letter retry + return ConsumerStatus.DROP; + } else if (resp.statusCode === 425) { + // delayed message, use local sleep since delay may exceed DLX TTL + await randomDelay(); + return ConsumerStatus.REQUEUE; + } else if (resp.statusCode >= 500 && resp.statusCode < 600) { + // transient server error, use dead-letter retry + return ConsumerStatus.DROP; + } else { + instance.log.warn({ + tag: "RABBITMQ_MESSAGE_DROPPED", + messageId: msg.messageId, + statusCode: resp.statusCode, + body: resp.body, + }); + return ConsumerStatus.DROP; + } + } catch (err) { + // Check retry count before dead-lettering on exception + const retryCount = getRetryCount(msg); + if (retryCount >= MAX_RETRY_COUNT) { + instance.log.error({ + tag: "RABBITMQ_MESSAGE_MAX_RETRIES_EXCEEDED", + messageId: msg.messageId, + retryCount, + err, + }); + // Store in DLQ for manual retry before ACKing + await dlqPublisher.send( + { + routingKey: `${prefix}.dlq.${service}`, + contentType: "application/json", + headers: { + ...msg.headers, + "x-original-queue": `${prefix}.queue.${service}`, + "x-final-error": err instanceof Error ? err.message : String(err), + "x-final-retry-count": retryCount, + }, + }, + msg.body, + ); + return ConsumerStatus.ACK; + } + instance.log.error({ + tag: "RABBITMQ_CONSUMER_ERROR", + err: err, + }); + return ConsumerStatus.DROP; + } + }, + ); + instance.log.info( + "Attached to RabbitMQ for Service=" + process.env.K_SERVICE, + ); + return { + close: async () => { + ctrl.abort(); + await sub.close(); + await dlqPublisher.close(); + await connection.close(); + }, + }; +}; diff --git a/src/event-bus/event-consumer/utils.ts b/src/event-bus/event-consumer/utils.ts new file mode 100644 index 0000000..ebd4e75 --- /dev/null +++ b/src/event-bus/event-consumer/utils.ts @@ -0,0 +1,33 @@ +import * as timers from "node:timers/promises"; + +const BASE_DELAY = Math.max( + parseInt(process.env.EVENT_RETRY_BASE_DELAY ?? "0", 10) || 0, + 5_000, +); +const MAX_DELAY = Math.max( + parseInt(process.env.EVENT_RETRY_MAX_DELAY ?? "0", 10) || 0, + 60_000, +); +const RANDOMIZED_DELAY_MAX = Math.max( + parseInt(process.env.EVENT_RANDOMIZED_DELAY_MAX ?? "0", 10) || 0, + 10_000 +); + +export async function exponentialDelay( + attempt: number, + baseDelayMs: number = BASE_DELAY, + maxDelayMs: number = MAX_DELAY, + randomizationFactor: number = 0.5, +): Promise { + let delay = Math.min(baseDelayMs * 2 ** attempt, maxDelayMs); + if (randomizationFactor > 0) { + const randomFactor = (Math.random() - 0.5) * randomizationFactor * delay; + delay = Math.max(0, delay + randomFactor); + } + await timers.setTimeout(delay); +} + +export async function randomDelay(max: number = RANDOMIZED_DELAY_MAX) { + const delay = Math.ceil(Math.random() * max); + await timers.setTimeout(delay); +} \ No newline at end of file diff --git a/src/event-bus/gcp-pubsub.ts b/src/event-bus/gcp-pubsub.ts new file mode 100644 index 0000000..6693b18 --- /dev/null +++ b/src/event-bus/gcp-pubsub.ts @@ -0,0 +1,270 @@ +import { PubSub } from "@google-cloud/pubsub"; +import { FastifyPluginAsync, FastifyRequest } from "fastify"; +import fp from "fastify-plugin"; +import { + CreateHandlerRunner, + ErrorWithStatus, + getHandlerMap, + noMatchingHandlers, +} from "./commons"; +import { EventBus, EventBusOptions, EventMessage } from "./interfaces"; + +interface PubsubMessage { + message: { + attributes: Record; + data: string; + messageId: string; + publishTime: string; + }; + subscription: string; + attempt: number; +} + +// Resolves @opentelemetry/api from either NODE_PATH (OTel operator) or the +// pnpm absolute path. All copies share the same global symbol so the +// registered SDK is always reached. +function getOtelApi() { + try { + // eslint-disable-next-line @typescript-eslint/no-require-imports + return require("@opentelemetry/api"); + } catch { + try { + // eslint-disable-next-line @typescript-eslint/no-require-imports + return require( + "/app/node_modules/.pnpm/@opentelemetry+api@1.9.0/node_modules/@opentelemetry/api", + ); + } catch { + return null; + } + } +} + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const otel: any = getOtelApi(); +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const tracer: any = otel + ? otel.trace.getTracer("fp-eventbus-gcp-pubsub") + : null; + +const plugin: FastifyPluginAsync = async function ( + f, + options, +) { + if (!options.topic) { + throw new Error( + "Google Cloud PubSub needs the topic specified. Use EVENT_TOPIC env var", + ); + } + + const handlerMap = getHandlerMap(options); + const client = new PubSub(); + const topic = client.topic(options.topic, { + batching: { + maxMilliseconds: 10, + maxMessages: 100, + }, + }); + + f.addHook("onClose", async () => { + await topic.flush(); + f.log.info({ tag: "GCP_PUBSUB_FINAL_FLUSH" }); + await client.close(); + }); + + function publishToPubSub( + event: string, + payload: any, // eslint-disable-line @typescript-eslint/no-explicit-any + file: string | null, + processAfterDelayMs: number, + req?: FastifyRequest, + ) { + options.validateMsg(event, payload, req); + const attrs: Record = { + event, + }; + if (file) { + attrs.file = file; + } + if (processAfterDelayMs > 0) { + attrs.processAfterDelayMs = "" + processAfterDelayMs; + } + + // Inject the active OTel span context as a W3C traceparent/tracestate into + // Pub/Sub message attributes. context.active() reads from AsyncLocalStorage + // at call-time, so when this is invoked inside a context.with() block (e.g. + // the outbox worker) the correct parent context is used automatically. + if (otel) { + otel.propagation.inject(otel.context.active(), attrs); + if (attrs.traceparent) { + (req ?? f).log.info({ + tag: "EVENT_TRACEPARENT_INJECT", + event, + traceparent: attrs.traceparent, + }); + } + } + + // eslint-disable-next-line @typescript-eslint/no-floating-promises + topic.publishMessage({ + json: { event, payload }, + attributes: attrs, + }); + req?.log.info({ + tag: "EVENT_PUBLISH", + event, + payload, + processAfterDelayMs, + }); + } + + const bus: EventBus = { + publish(event, payload, processAfterDelayMs) { + publishToPubSub(event, payload, null, processAfterDelayMs ?? 0); + }, + }; + f.decorate("EventBus", { + getter() { + return bus; + }, + }); + + f.decorateRequest("EventBus", { + getter() { + return { + // req.EventBus.publish — called inside an HTTP request handler. + // context.active() already holds the HTTP server span context here, + // so propagation.inject works automatically. + publish: (event: string, payload: any, processAfterDelayMs?: number) => { // eslint-disable-line @typescript-eslint/no-explicit-any + publishToPubSub(event, payload, null, processAfterDelayMs ?? 0, this); + }, + }; + }, + }); + + const selectAndRunHandlers = CreateHandlerRunner(f, options, handlerMap); + + f.post<{ Body: PubsubMessage }>( + "/gcp-pubsub/process-message", + { + schema: { + hide: true, + } as any, // eslint-disable-line @typescript-eslint/no-explicit-any + }, + async function (req, reply) { + const body = req.body; + if (!body) { + reply.send("OK"); + return reply; + } + const eventMsg = convert(body); + const attrs = body.message.attributes ?? {}; + + // Extract the W3C trace context that was injected by the publisher. + // This reconstructs the parent span context so all handler spans + // (DB queries, further publishes) are children of the original trace. + let handlerCtx = otel ? otel.context.active() : null; + let span = null; + if (otel && attrs.traceparent) { + const parentCtx = otel.propagation.extract( + otel.context.active(), + attrs, + ); + span = tracer.startSpan( + `pubsub.consume.${eventMsg.event}`, + { kind: otel.SpanKind.CONSUMER }, + parentCtx, + ); + handlerCtx = otel.trace.setSpan(parentCtx, span); + } + + req.log.info({ + tag: "PUB_SUB_MSG", + messageId: body.message.messageId, + subscription: body.subscription, + attributes: attrs, + publishTime: body.message.publishTime, + attempt: body.attempt, + traceparent_extracted: attrs.traceparent ?? "MISSING", + }); + + options.validateMsg(eventMsg.event, eventMsg.data, req); + + if (noMatchingHandlers(handlerMap, eventMsg)) { + span?.end(); + reply.send("OK"); + return reply; + } + + req.log.info({ + tag: "PUB_SUB_MSG_HANDLE", + event: eventMsg, + traceparent: attrs.traceparent ?? "MISSING", + }); + + if ( + eventMsg.processAfterDelayMs > 0 && + Date.now() < + eventMsg.publishTime.getTime() + eventMsg.processAfterDelayMs + ) { + req.log.info({ + tag: "PUB_SUB_MSG_DELAYED", + eventId: eventMsg.id, + }); + span?.end(); + reply + .status(425) + .send({ processAfterDelayMs: eventMsg?.processAfterDelayMs }); + return reply; + } + + const runHandlers = async () => { + try { + await selectAndRunHandlers(req, eventMsg, (event, payload, file) => + publishToPubSub( + event, + payload, + file, + eventMsg.processAfterDelayMs, + req, + ), + ); + reply.send("OK"); + } catch (err) { + if (err instanceof ErrorWithStatus) { + reply.status(err.status).send(err.message); + } else { + reply.status(500).send("ERROR"); + } + } finally { + span?.end(); + } + }; + + // Run handlers inside the extracted parent context so all child spans + // (DB queries, further publishes) are linked to the original trace tree. + if (otel && handlerCtx) { + await otel.context.with(handlerCtx, runHandlers); + } else { + await runHandlers(); + } + return reply; + }, + ); +}; + +export = fp(plugin, { name: "fp-eventbus-gcp-pubsub" }); + +function convert(msg: PubsubMessage): EventMessage { + const buf = Buffer.from(msg.message.data, "base64"); + const json = buf.toString("utf-8"); + const obj = JSON.parse(json); + return { + id: msg.message.messageId, + publishTime: new Date(msg.message.publishTime), + processAfterDelayMs: + parseInt(msg.message.attributes.processAfterDelayMs ?? "0", 10) || 0, + attributes: msg.message.attributes, + event: obj.event, + data: obj.payload, + }; +} diff --git a/src/event-bus/index.spec.ts b/src/event-bus/index.spec.ts new file mode 100644 index 0000000..603b8ff --- /dev/null +++ b/src/event-bus/index.spec.ts @@ -0,0 +1,293 @@ +import Fastify from "fastify"; +import EventBusPlugin from "./index"; +import { EventBusOptions, EventHandler } from "./interfaces"; + +describe("EventBus Plugin", () => { + let fastify: ReturnType; + let mockValidateMsg: jest.Mock; + let mockProcessError: jest.Mock; + let mockHandler: EventHandler; + + beforeEach(() => { + fastify = Fastify({ logger: false }); + mockValidateMsg = jest.fn(); + mockProcessError = jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }); + mockHandler = jest.fn().mockResolvedValue(undefined); + }); + + afterEach(async () => { + try { + await fastify.close(); + } catch (error) { + // Ignore errors from fastify already being closed + if (!error.message.includes('already been closed')) { + throw error; + } + } + }); + + describe("Plugin Registration", () => { + it("should register with in-process bus type (default)", async () => { + const options: EventBusOptions = { + busType: "in-process", + handlers: [{ + file: "test.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await fastify.register(EventBusPlugin, options); + expect(fastify.EventBus).toBeDefined(); + }); + + it("should require proper environment variables for rabbitmq", async () => { + // Test that rabbitmq registration fails without required env vars + const options: EventBusOptions = { + busType: "rabbitmq", + handlers: [], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await expect(fastify.register(EventBusPlugin, options)).rejects.toThrow(); + }); + + it("should register with gcp-pubsub bus type", async () => { + process.env.EVENT_TOPIC = "test-topic"; + + const options: EventBusOptions = { + busType: "gcp-pubsub", + topic: "test-topic", + handlers: [{ + file: "test.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await fastify.register(EventBusPlugin, options); + expect(fastify.EventBus).toBeDefined(); + + delete process.env.EVENT_TOPIC; + }); + + it("should register with azure-servicebus bus type", async () => { + process.env.EVENT_TOPIC = "test-topic"; + + const options: EventBusOptions = { + busType: "azure-servicebus", + namespace: "test-namespace", + topic: "test-topic", + handlers: [{ + file: "test.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await fastify.register(EventBusPlugin, options); + expect(fastify.EventBus).toBeDefined(); + + delete process.env.EVENT_TOPIC; + }); + + it("should throw error for azure-servicebus without namespace", async () => { + const options: EventBusOptions = { + busType: "azure-servicebus", + handlers: [{ + file: "test.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await expect(fastify.register(EventBusPlugin, options)).rejects.toThrow( + "Azure ServiceBus needs the namespace specified" + ); + }); + }); + + describe("Event Publish Route", () => { + beforeEach(async () => { + const options: EventBusOptions = { + busType: "in-process", + handlers: [{ + file: "test.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await fastify.register(EventBusPlugin, options); + }); + + it("should register publish route by default", async () => { + const response = await fastify.inject({ + method: "POST", + url: "/event-bus/publish/testEvent", + payload: { test: "data" } + }); + + expect(response.statusCode).toBe(200); + expect(response.body).toBe("OK"); + }); + + it("should handle string payload via query param", async () => { + const response = await fastify.inject({ + method: "POST", + url: "/event-bus/publish/testEvent?stringPayload=test-string", + payload: {} + }); + + expect(response.statusCode).toBe(200); + expect(response.body).toBe("OK"); + }); + + it("should handle integer payload via query param", async () => { + const response = await fastify.inject({ + method: "POST", + url: "/event-bus/publish/testEvent?integerPayload=123", + payload: {} + }); + + expect(response.statusCode).toBe(200); + expect(response.body).toBe("OK"); + }); + + it("should not register publish route when disabled", async () => { + const fastifyDisabled = Fastify({ logger: false }); + + const options: EventBusOptions = { + busType: "in-process", + handlers: [{ + file: "test.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + disableEventPublishRoute: true, + }; + + await fastifyDisabled.register(EventBusPlugin, options); + + const response = await fastifyDisabled.inject({ + method: "POST", + url: "/event-bus/publish/testEvent", + payload: { test: "data" } + }); + + expect(response.statusCode).toBe(404); + await fastifyDisabled.close(); + }); + }); + + describe("EventBus Decorator", () => { + beforeEach(async () => { + const options: EventBusOptions = { + busType: "in-process", + handlers: [{ + file: "test.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await fastify.register(EventBusPlugin, options); + }); + + it("should provide EventBus on fastify instance", () => { + expect(fastify.EventBus).toBeDefined(); + expect(typeof fastify.EventBus.publish).toBe("function"); + }); + + it("should allow publishing events programmatically", () => { + expect(() => { + fastify.EventBus.publish("testEvent", { test: "data" }); + }).not.toThrow(); + }); + + it("should allow publishing events with delay", () => { + expect(() => { + fastify.EventBus.publish("testEvent", { test: "data" }, 1000); + }).not.toThrow(); + }); + }); + + describe("Configuration Options", () => { + it("should handle empty handlers array", async () => { + const options: EventBusOptions = { + busType: "in-process", + handlers: [], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await fastify.register(EventBusPlugin, options); + expect(fastify.EventBus).toBeDefined(); + }); + + it("should handle multiple handler files", async () => { + const handler1: EventHandler = jest.fn().mockResolvedValue(undefined); + const handler2: EventHandler = jest.fn().mockResolvedValue(undefined); + + const options: EventBusOptions = { + busType: "in-process", + handlers: [ + { + file: "handlers1.ts", + handlers: { event1: handler1 } + }, + { + file: "handlers2.ts", + handlers: { event2: handler2 } + } + ], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await fastify.register(EventBusPlugin, options); + expect(fastify.EventBus).toBeDefined(); + }); + + it("should handle topic and namespace options", async () => { + const options: EventBusOptions = { + busType: "in-process", + topic: "test-topic", + namespace: "test-namespace", + handlers: [{ + file: "test.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await fastify.register(EventBusPlugin, options); + expect(fastify.EventBus).toBeDefined(); + }); + + it("should handle actionConcurrency option", async () => { + const options: EventBusOptions = { + busType: "in-process", + handlers: [{ + file: "test.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + actionConcurrency: 5, + }; + + await fastify.register(EventBusPlugin, options); + expect(fastify.EventBus).toBeDefined(); + }); + }); +}); \ No newline at end of file diff --git a/src/event-bus/index.ts b/src/event-bus/index.ts new file mode 100644 index 0000000..ced9f04 --- /dev/null +++ b/src/event-bus/index.ts @@ -0,0 +1,82 @@ +import { FastifyPluginAsync } from "fastify"; +import fp from "fastify-plugin"; +import { EventBusOptions } from "./interfaces"; + +const plugin: FastifyPluginAsync = async function ( + f, + options, +) { + switch (options.busType) { + case "gcp-pubsub": + // eslint-disable-next-line @typescript-eslint/no-require-imports + await f.register(require("./gcp-pubsub"), options); + break; + case "azure-servicebus": + if (!options.namespace) { + throw new Error( + "Azure ServiceBus needs the namespace specified. Use EVENT_NAMESPACE env var", + ); + } + // eslint-disable-next-line @typescript-eslint/no-require-imports + await f.register(require("./azure-servicebus"), options); + break; + case "rabbitmq": + // eslint-disable-next-line @typescript-eslint/no-require-imports + await f.register(require("./rabbitmq"), options); + break; + default: + // eslint-disable-next-line @typescript-eslint/no-require-imports + await f.register(require("./local"), options); + } + + /// + if (!options.disableEventPublishRoute) { + f.post<{ + Params: { event: string }; + Querystring: { + stringPayload?: string; + integerPayload?: string; + }; + Body: any; + }>( + "/event-bus/publish/:event", + { + schema: { + description: + "API to push a event manually. Use the appropriate query-param or request-body to send the payload.", + operationId: "publishToEventBus", + params: { + type: "object", + properties: { + event: { type: "string" }, + }, + }, + querystring: { + type: "object", + properties: { + stringPayload: { type: "string" }, + integerPayload: { type: "string" }, + }, + }, + body: { + type: "object", + additionalProperties: true, + }, + }, + }, + async function (req) { + if (req.query.stringPayload || req.query.integerPayload) { + req.EventBus.publish( + req.params.event, + req.query.stringPayload || req.query.integerPayload, + ); + } else { + req.EventBus.publish(req.params.event, req.body); + } + return "OK"; + }, + ); + } +}; + +export default fp(plugin, { name: "fp-eventbus" }); diff --git a/src/event-bus/interfaces.ts b/src/event-bus/interfaces.ts new file mode 100644 index 0000000..324991a --- /dev/null +++ b/src/event-bus/interfaces.ts @@ -0,0 +1,58 @@ +import { FastifyInstance, FastifyRequest } from "fastify"; +import { Registry } from "prom-client"; + +export interface EventBusOptions { + busType: "rabbitmq" | "gcp-pubsub" | "azure-servicebus" | "in-process"; + topic?: string; + namespace?: string; + handlers: { + file: string; + handlers: EventHandlers; + }[]; + validateMsg: (event: string, payload: any, req?: FastifyRequest) => void; + processError(err: any, ctx: ActionContext): { err: any; status: number }; + ensureExchangesAndQueues?: boolean; + disableEventPublishRoute?: boolean; + actionConcurrency?: number; + registry?: Registry; +} + +export type PublishToPubSub = ( + event: string, + payload: any, + file: string | null, +) => void; + +export interface ActionContext { + req: FastifyRequest; + publishToPubSub: PublishToPubSub; + handler: EventHandler; + /// + eventMsg: EventMessage; + file: string; + specifiedFile: string | undefined; +} + +export type EventHandlers = { + readonly [k: string]: EventHandler; +}; + +export type EventHandler = ( + this: FastifyInstance, + msg: EventMessage, + req: FastifyRequest, +) => Promise; + +export interface EventMessage { + id: string; + publishTime: Date; + processAfterDelayMs: number; + attributes: Record; + event: string; + data: T; +} + +export interface EventBus { + // published event should be processed after (Date.now() + processAfterDelayMs) + publish(event: string, payload: any, processAfterDelayMs?: number): void; +} diff --git a/src/event-bus/local.spec.ts b/src/event-bus/local.spec.ts new file mode 100644 index 0000000..4ccb1c8 --- /dev/null +++ b/src/event-bus/local.spec.ts @@ -0,0 +1,391 @@ +import Fastify from "fastify"; +import LocalEventBusPlugin from "./local"; +import { EventBusOptions, EventHandler, EventMessage } from "./interfaces"; + +describe("Local EventBus Plugin", () => { + let mockValidateMsg: jest.Mock; + let mockProcessError: jest.Mock; + let mockHandler: EventHandler; + + beforeEach(() => { + mockValidateMsg = jest.fn(); + mockProcessError = jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }); + mockHandler = jest.fn().mockResolvedValue(undefined); + }); + + describe("Local EventBus Registration", () => { + it("should register local event bus plugin", async () => { + const fastify = Fastify({ logger: false }); + const options: EventBusOptions = { + busType: "in-process", + handlers: [{ + file: "test.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await fastify.register(LocalEventBusPlugin, options); + + expect(fastify.EventBus).toBeDefined(); + expect(typeof fastify.EventBus.publish).toBe("function"); + + await fastify.close(); + }); + + it("should decorate both fastify instance and request", async () => { + const fastify = Fastify({ logger: false }); + const options: EventBusOptions = { + busType: "in-process", + handlers: [{ + file: "test.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await fastify.register(LocalEventBusPlugin, options); + + // Test fastify instance decoration + expect(fastify.EventBus).toBeDefined(); + + // Test request decoration through a route + fastify.get("/test", async (request) => { + expect(request.EventBus).toBeDefined(); + expect(typeof request.EventBus.publish).toBe("function"); + return "ok"; + }); + + const response = await fastify.inject({ + method: "GET", + url: "/test" + }); + + expect(response.statusCode).toBe(200); + + await fastify.close(); + }); + }); + + describe("Event Publishing", () => { + it("should publish events and validate", async () => { + const fastify = Fastify({ logger: false }); + const options: EventBusOptions = { + busType: "in-process", + handlers: [{ + file: "test.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await fastify.register(LocalEventBusPlugin, options); + + // Test publishing - this should call validateMsg + expect(() => { + fastify.EventBus.publish("testEvent", { test: "data" }); + }).not.toThrow(); + + expect(mockValidateMsg).toHaveBeenCalledWith("testEvent", { test: "data" }, undefined); + + // Don't close fastify to avoid flush hook issues + }); + + it("should publish events with delay", async () => { + const fastify = Fastify({ logger: false }); + const options: EventBusOptions = { + busType: "in-process", + handlers: [{ + file: "test.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await fastify.register(LocalEventBusPlugin, options); + + expect(() => { + fastify.EventBus.publish("testEvent", { test: "data" }, 1000); + }).not.toThrow(); + + expect(mockValidateMsg).toHaveBeenCalledWith("testEvent", { test: "data" }, undefined); + + // Don't close fastify to avoid flush hook issues + }); + }); + + describe("Message Processing", () => { + it("should process messages via internal route", async () => { + const fastify = Fastify({ logger: false }); + const options: EventBusOptions = { + busType: "in-process", + handlers: [{ + file: "test.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await fastify.register(LocalEventBusPlugin, options); + + const testMessage: EventMessage = { + id: "test-msg-1", + event: "testEvent", + data: { test: "data" }, + attributes: {}, + publishTime: new Date("2023-01-01"), + processAfterDelayMs: 0, + }; + + const response = await fastify.inject({ + method: "POST", + url: "/local-servicebus/process-message", + payload: testMessage + }); + + expect(response.statusCode).toBe(200); + expect(response.body).toBe("OK"); + expect(mockValidateMsg).toHaveBeenCalledWith("testEvent", { test: "data" }, expect.any(Object)); + + await fastify.close(); + }); + + it("should handle messages with no body", async () => { + const fastify = Fastify({ logger: false }); + const options: EventBusOptions = { + busType: "in-process", + handlers: [{ + file: "test.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await fastify.register(LocalEventBusPlugin, options); + + const response = await fastify.inject({ + method: "POST", + url: "/local-servicebus/process-message" + // No payload + }); + + expect(response.statusCode).toBe(200); + expect(response.body).toBe("NO_MESSAGE"); + + await fastify.close(); + }); + + it("should bail out when no matching handlers", async () => { + const fastify = Fastify({ logger: false }); + const options: EventBusOptions = { + busType: "in-process", + handlers: [{ + file: "test.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await fastify.register(LocalEventBusPlugin, options); + + const testMessage: EventMessage = { + id: "test-msg-1", + event: "nonExistentEvent", + data: { test: "data" }, + attributes: {}, + publishTime: new Date("2023-01-01"), + processAfterDelayMs: 0, + }; + + const response = await fastify.inject({ + method: "POST", + url: "/local-servicebus/process-message", + payload: testMessage + }); + + expect(response.statusCode).toBe(200); + expect(response.body).toBe("BAIL_OUT_NO_MATCHING_HANDLERS"); + + await fastify.close(); + }); + + it("should execute matching event handlers", async () => { + const fastify = Fastify({ logger: false }); + const options: EventBusOptions = { + busType: "in-process", + handlers: [{ + file: "test.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await fastify.register(LocalEventBusPlugin, options); + + const testMessage: EventMessage = { + id: "test-msg-1", + event: "testEvent", + data: { test: "execution data" }, + attributes: {}, + publishTime: new Date("2023-01-01"), + processAfterDelayMs: 0, + }; + + const response = await fastify.inject({ + method: "POST", + url: "/local-servicebus/process-message", + payload: testMessage + }); + + expect(response.statusCode).toBe(200); + expect(response.body).toBe("OK"); + expect(mockHandler).toHaveBeenCalledWith( + expect.objectContaining({ + id: "test-msg-1", + event: "testEvent", + data: { test: "execution data" }, + attributes: {}, + processAfterDelayMs: 0, + }), + expect.any(Object) + ); + + await fastify.close(); + }); + + it("should handle messages with file attributes", async () => { + const fastify = Fastify({ logger: false }); + const options: EventBusOptions = { + busType: "in-process", + handlers: [{ + file: "specific-file.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await fastify.register(LocalEventBusPlugin, options); + + const testMessage: EventMessage = { + id: "test-msg-1", + event: "testEvent", + data: { test: "data" }, + attributes: { file: "specific-file.ts" }, + publishTime: new Date("2023-01-01"), + processAfterDelayMs: 0, + }; + + const response = await fastify.inject({ + method: "POST", + url: "/local-servicebus/process-message", + payload: testMessage + }); + + expect(response.statusCode).toBe(200); + expect(response.body).toBe("OK"); + + await fastify.close(); + }); + }); + + describe("Error Handling", () => { + it("should handle validation errors", async () => { + const fastify = Fastify({ logger: false }); + const mockValidateMsgFail = jest.fn().mockImplementation(() => { + throw new Error("Validation failed"); + }); + + const options: EventBusOptions = { + busType: "in-process", + handlers: [{ + file: "test.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsgFail, + processError: mockProcessError, + }; + + await fastify.register(LocalEventBusPlugin, options); + + expect(() => { + fastify.EventBus.publish("testEvent", { invalid: "data" }); + }).toThrow("Validation failed"); + + await fastify.close(); + }); + + it("should handle handler execution errors gracefully", async () => { + const fastify = Fastify({ logger: false }); + const failingHandler: EventHandler = jest.fn().mockRejectedValue(new Error("Handler failed")); + + const options: EventBusOptions = { + busType: "in-process", + handlers: [{ + file: "test.ts", + handlers: { failingEvent: failingHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await fastify.register(LocalEventBusPlugin, options); + + const testMessage: EventMessage = { + id: "test-msg-1", + event: "failingEvent", + data: { test: "data" }, + attributes: {}, + publishTime: new Date("2023-01-01"), + processAfterDelayMs: 0, + }; + + // The request should still complete, errors are handled by the handler runner + const response = await fastify.inject({ + method: "POST", + url: "/local-servicebus/process-message", + payload: testMessage + }); + + expect(response.statusCode).toBe(200); + expect(failingHandler).toHaveBeenCalled(); + + await fastify.close(); + }); + }); + + describe("Message Queue Integration", () => { + it("should handle basic message flow", async () => { + const fastify = Fastify({ logger: false }); + const options: EventBusOptions = { + busType: "in-process", + handlers: [{ + file: "test.ts", + handlers: { testEvent: mockHandler } + }], + validateMsg: mockValidateMsg, + processError: mockProcessError, + }; + + await fastify.register(LocalEventBusPlugin, options); + + // Publish an event + expect(() => { + fastify.EventBus.publish("testEvent", { test: "basic flow" }); + }).not.toThrow(); + + // Verify validation was called + expect(mockValidateMsg).toHaveBeenCalledWith("testEvent", { test: "basic flow" }, undefined); + + // Don't close to avoid flush hook issues + }); + }); +}); \ No newline at end of file diff --git a/src/event-bus/local.ts b/src/event-bus/local.ts new file mode 100644 index 0000000..d6fc546 --- /dev/null +++ b/src/event-bus/local.ts @@ -0,0 +1,125 @@ +import { FastifyPluginAsync, FastifyRequest } from "fastify"; +import fp from "fastify-plugin"; +import { + CreateHandlerRunner, + getHandlerMap, + noMatchingHandlers, +} from "./commons"; +import { EventBus, EventBusOptions, EventMessage } from "./interfaces"; + +const plugin: FastifyPluginAsync = async function ( + f, + options, +) { + const handlerMap = getHandlerMap(options); + + const bus: EventBus = { + publish(event, payload, processAfterDelayMs) { + publishToPubSub(event, payload, null, processAfterDelayMs ?? 0); + }, + }; + f.decorate("EventBus", { + getter() { + return bus; + }, + }); + f.decorateRequest("EventBus", { + getter() { + return bus; + }, + }); + + const messages: EventMessage[] = []; + + const flush = async () => { + while (messages.length) { + const msg = messages.shift(); + if (!msg) { + break; + } + await f.inject({ + method: "POST", + url: "/local-servicebus/process-message", + payload: msg, + }); + } + }; + + f.addHook("onClose", async () => { + await flush(); + f.log.trace({ tag: "LOCAL_SERVICEBUS_FINAL_FLUSH" }); + }); + + f.addHook("onSend", async function (_req, _reply, payload) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + flush(); + return payload; + }); + + let msgId = 0; + function publishToPubSub( + event: string, + payload: any, + file: string | null, + processAfterDelayMs: number, + req?: FastifyRequest, + ) { + options.validateMsg(event, payload, req); + const messageBody: EventMessage = { + id: "in-process" + msgId, + event, + attributes: file == null ? {} : { file }, + data: payload, + publishTime: new Date(), + processAfterDelayMs: 0, + }; + msgId++; + messages.push(messageBody); + f.log.info( + "Event pushed --> " + + event + + " -- " + + payload + + "--" + + file + + "--" + + processAfterDelayMs, + ); + } + + const selectAndRunHandlers = CreateHandlerRunner(f, options, handlerMap); + + f.post<{ Body: EventMessage }>( + "/local-servicebus/process-message", + { + schema: { + hide: true, + } as any, + }, + async function (req) { + const msg = req.body; + if (!msg) { + return "NO_MESSAGE"; + } + options.validateMsg(msg.event, msg.data, req); + + if (noMatchingHandlers(handlerMap, msg)) { + // bail-out + // service has no event-handlers registered + return "BAIL_OUT_NO_MATCHING_HANDLERS"; + } + + req.log.info({ + tag: "LOCAL_SERVICEBUS_MESSAGE_HANDLE", + msg: msg, + }); + + await selectAndRunHandlers(req, msg, (event, payload, file) => + publishToPubSub(event, payload, file, 0, req), + ); + return "OK"; + }, + ); +}; + +export = fp(plugin, { name: "fp-eventbus-local" }); diff --git a/src/event-bus/rabbitmq-utils.ts b/src/event-bus/rabbitmq-utils.ts new file mode 100644 index 0000000..f94af79 --- /dev/null +++ b/src/event-bus/rabbitmq-utils.ts @@ -0,0 +1,84 @@ +import { Connection } from "rabbitmq-client"; + +export const RABBITMQ_TAG = "" + Math.floor(Math.random() * 1e9); + +/** + * Extracts the prefix from a service name (part before the first hyphen). + * e.g., "wms-cincout" → "wms", "xyz-service" → "xyz", "noprefix" → "default" + */ +export function getServicePrefix(service: string): string { + const hyphenIndex = service.indexOf("-"); + return hyphenIndex > 0 ? service.substring(0, hyphenIndex) : "default"; +} + +export async function ensureRabbitMqExchangesAndQueues( + connection: Connection, + service: string, +) { + const prefix = getServicePrefix(service); + await connection.exchangeDeclare({ + exchange: `${prefix}.main-exchange`, + type: "fanout", + durable: true, + arguments: {}, + autoDelete: false, + internal: false, + passive: false, + }); + await connection.queueDeclare({ + queue: `${prefix}.queue.${service}`, + arguments: { + "x-message-ttl": 300000, // 5 minutes + "x-dead-letter-exchange": `${prefix}.retry-exchange.${service}`, + "x-dead-letter-routing-key": "retry", // Fixed routing key for DLX + "x-queue-type": "classic", + }, + autoDelete: false, + durable: true, + exclusive: false, + passive: false, + }); + await connection.queueBind({ + exchange: `${prefix}.main-exchange`, + queue: `${prefix}.queue.${service}`, + }); + await connection.exchangeDeclare({ + exchange: `${prefix}.retry-exchange.${service}`, + type: "direct", + durable: true, + arguments: {}, + autoDelete: false, + internal: false, + passive: false, + }); + await connection.queueDeclare({ + queue: `${prefix}.retry-queue.${service}`, + arguments: { + "x-message-ttl": 5000, + "x-dead-letter-exchange": "", // default exchange routes by queue name + "x-dead-letter-routing-key": `${prefix}.queue.${service}`, + "x-queue-type": "classic", + }, + autoDelete: false, + durable: true, + exclusive: false, + passive: false, + }); + await connection.queueBind({ + exchange: `${prefix}.retry-exchange.${service}`, + queue: `${prefix}.retry-queue.${service}`, + routingKey: "retry", // Must match x-dead-letter-routing-key from main queue + }); + // Dead Letter Queue for messages that exceed max retries (manual retry) + await connection.queueDeclare({ + queue: `${prefix}.dlq.${service}`, + arguments: { + "x-message-ttl": 300000, // 5 minutes + "x-queue-type": "classic", + }, + autoDelete: false, + durable: true, + exclusive: false, + passive: false, + }); +} diff --git a/src/event-bus/rabbitmq.spec.ts b/src/event-bus/rabbitmq.spec.ts new file mode 100644 index 0000000..4bd5eea --- /dev/null +++ b/src/event-bus/rabbitmq.spec.ts @@ -0,0 +1,1503 @@ +import Fastify, { FastifyInstance } from "fastify"; +import { GenericContainer, StartedTestContainer, Wait } from "testcontainers"; +import { Connection } from "rabbitmq-client"; +import { Plugins } from "../index"; +import { EventBusOptions, EventHandler } from "./interfaces"; +import { getServicePrefix } from "./rabbitmq-utils"; +import { RabbitMqServiceBusConsumerBuilder } from "./event-consumer/rabbitmq"; + +describe("RabbitMQ Integration Tests", () => { + let container: StartedTestContainer; + let rabbitmqUrl: string; + + beforeAll(async () => { + container = await new GenericContainer("rabbitmq:3-management") + .withExposedPorts(5672, 15672) + .withWaitStrategy(Wait.forLogMessage(/started TCP listener/)) + .start(); + + const host = container.getHost(); + const port = container.getMappedPort(5672); + rabbitmqUrl = `amqp://guest:guest@${host}:${port}`; + }, 60000); + + afterAll(async () => { + await container?.stop(); + }); + + describe("getServicePrefix", () => { + it("should extract prefix before first hyphen", () => { + expect(getServicePrefix("wms-cincout")).toBe("wms"); + expect(getServicePrefix("xyz-service-name")).toBe("xyz"); + }); + + it("should return 'default' when no hyphen found", () => { + expect(getServicePrefix("noprefix")).toBe("default"); + expect(getServicePrefix("")).toBe("default"); + }); + + it("should handle edge cases", () => { + expect(getServicePrefix("-startswithhyphen")).toBe("default"); + expect(getServicePrefix("a-b")).toBe("a"); + }); + }); + + describe("RabbitMQ Publisher", () => { + let fastify: FastifyInstance; + const originalEnv = process.env; + + beforeEach(() => { + process.env = { ...originalEnv }; + process.env.RABBITMQ_URL = rabbitmqUrl; + process.env.K_SERVICE = "test-publisher"; + fastify = Fastify({ logger: false }); + }); + + afterEach(async () => { + process.env = originalEnv; + await fastify.close(); + }); + + it("should register RabbitMQ plugin successfully", async () => { + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await fastify.register(Plugins.EventBus, eventBusOptions); + expect(fastify.EventBus).toBeDefined(); + expect(typeof fastify.EventBus.publish).toBe("function"); + }); + + it("should publish messages to RabbitMQ", async () => { + const validateMsg = jest.fn(); + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [], + validateMsg, + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await fastify.register(Plugins.EventBus, eventBusOptions); + + // Publish a message + fastify.EventBus.publish("test.event", { data: "test-payload" }); + + // Wait for flush interval + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(validateMsg).toHaveBeenCalledWith("test.event", { data: "test-payload" }, undefined); + }); + + it("should use dynamic prefix based on K_SERVICE", async () => { + process.env.K_SERVICE = "myapp-service"; + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await fastify.register(Plugins.EventBus, eventBusOptions); + + // Now verify the exchange was created with correct prefix + const connection = new Connection(rabbitmqUrl); + try { + // This will throw if exchange doesn't exist (passive: true) + await connection.exchangeDeclare({ + exchange: "myapp.main-exchange", + type: "fanout", + passive: true, + }); + // If we get here, exchange exists + expect(true).toBe(true); + } finally { + await connection.close(); + } + }); + }); + + describe("RabbitMQ Consumer", () => { + let fastify: FastifyInstance; + let consumer: { close: () => Promise } | null = null; + const originalEnv = process.env; + + beforeEach(() => { + process.env = { ...originalEnv }; + process.env.RABBITMQ_URL = rabbitmqUrl; + process.env.K_SERVICE = "test-consumer"; + fastify = Fastify({ logger: false }); + }); + + afterEach(async () => { + process.env = originalEnv; + if (consumer) { + await consumer.close(); + consumer = null; + } + await fastify.close(); + }); + + it("should consume messages from RabbitMQ", async () => { + const receivedMessages: any[] = []; + const messageHandler: EventHandler = jest.fn(async (msg) => { + receivedMessages.push(msg); + }); + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "test.ts", + handlers: { "test.consume": messageHandler }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await fastify.register(Plugins.EventBus, eventBusOptions); + await fastify.ready(); + + // Start consumer + consumer = await RabbitMqServiceBusConsumerBuilder(fastify); + + // Publish a message directly to RabbitMQ + const connection = new Connection(rabbitmqUrl); + const publisher = connection.createPublisher({ maxAttempts: 3 }); + + try { + await publisher.send( + { + exchange: "test.main-exchange", + contentType: "application/json", + }, + JSON.stringify({ + event: "test.consume", + payload: { message: "hello" }, + file: null, + processAfterDelayMs: 0, + publishTimestamp: Date.now(), + }) + ); + + // Wait for message to be consumed + await new Promise((resolve) => setTimeout(resolve, 500)); + + expect(messageHandler).toHaveBeenCalled(); + } finally { + await publisher.close(); + await connection.close(); + } + }); + }); + + describe("RabbitMQ End-to-End", () => { + let publisherFastify: FastifyInstance; + let consumerFastify: FastifyInstance; + let consumer: { close: () => Promise } | null = null; + const originalEnv = process.env; + + beforeEach(() => { + process.env = { ...originalEnv }; + process.env.RABBITMQ_URL = rabbitmqUrl; + publisherFastify = Fastify({ logger: false }); + consumerFastify = Fastify({ logger: false }); + }); + + afterEach(async () => { + process.env = originalEnv; + if (consumer) { + await consumer.close(); + consumer = null; + } + await publisherFastify.close(); + await consumerFastify.close(); + }); + + it("should publish and consume messages end-to-end", async () => { + process.env.K_SERVICE = "e2e-service"; + + const receivedMessages: any[] = []; + const messageHandler: EventHandler = jest.fn(async (msg) => { + receivedMessages.push(msg); + }); + + // Set up consumer + const consumerOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "e2e.ts", + handlers: { "e2e.test": messageHandler }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await consumerFastify.register(Plugins.EventBus, consumerOptions); + await consumerFastify.ready(); + consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify); + + // Set up publisher + const publisherOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await publisherFastify.register(Plugins.EventBus, publisherOptions); + + // Publish message + publisherFastify.EventBus.publish("e2e.test", { testData: "e2e-payload" }); + + // Wait for message to be published and consumed + await new Promise((resolve) => setTimeout(resolve, 500)); + + expect(messageHandler).toHaveBeenCalled(); + expect(receivedMessages.length).toBeGreaterThan(0); + expect(receivedMessages[0].data).toEqual({ testData: "e2e-payload" }); + }); + + it("should respect processAfterDelayMs in messages", async () => { + // Test that messages with processAfterDelayMs are handled correctly + // The consumer returns 425 for messages that arrive too early + process.env.K_SERVICE = "delay-test-service"; + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "delay.ts", + handlers: { "delay.check": jest.fn() }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await consumerFastify.register(Plugins.EventBus, eventBusOptions); + await consumerFastify.ready(); + + // Send a message with delay that hasn't elapsed yet + const response = await consumerFastify.inject({ + method: "POST", + url: "/rabbitmq/process-message", + payload: { + messageId: 123, + body: JSON.stringify({ + event: "delay.check", + payload: { test: true }, + file: null, + processAfterDelayMs: 60000, // 60 seconds in the future + publishTimestamp: Date.now(), + }), + }, + }); + + // Should return 425 (Too Early) for messages that can't be processed yet + expect(response.statusCode).toBe(425); + }); + + it("should handle multiple messages in batch and flush remaining", async () => { + // This test verifies the critical bug fix where messages < batch size (10) were being dropped + process.env.K_SERVICE = "batch-service"; + + const receivedMessages: any[] = []; + const messageHandler: EventHandler = jest.fn(async (msg) => { + receivedMessages.push(msg.data); + }); + + const consumerOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "batch.ts", + handlers: { "batch.test": messageHandler }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await consumerFastify.register(Plugins.EventBus, consumerOptions); + await consumerFastify.ready(); + consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify); + + const publisherOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await publisherFastify.register(Plugins.EventBus, publisherOptions); + + // Publish 15 messages (more than batch size of 10) + // Before the fix, only 10 would be sent, the last 5 would be dropped + for (let i = 0; i < 15; i++) { + publisherFastify.EventBus.publish("batch.test", { index: i }); + } + + // Wait for all messages to be processed + await new Promise((resolve) => setTimeout(resolve, 1500)); + + // Verify ALL 15 messages were received (not just first 10) + expect(receivedMessages.length).toBe(15); + + // Verify message content - all indices should be present + const indices = receivedMessages.map((m) => m.index).sort((a, b) => a - b); + expect(indices).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]); + }); + + it("should handle multiple event types in same batch", async () => { + process.env.K_SERVICE = "multi-event-service"; + + const eventAMessages: any[] = []; + const eventBMessages: any[] = []; + + const handlerA: EventHandler = jest.fn(async (msg) => { + eventAMessages.push(msg.data); + }); + const handlerB: EventHandler = jest.fn(async (msg) => { + eventBMessages.push(msg.data); + }); + + const consumerOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "multi.ts", + handlers: { + "multi.eventA": handlerA, + "multi.eventB": handlerB, + }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await consumerFastify.register(Plugins.EventBus, consumerOptions); + await consumerFastify.ready(); + consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify); + + const publisherOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await publisherFastify.register(Plugins.EventBus, publisherOptions); + + // Publish interleaved events + publisherFastify.EventBus.publish("multi.eventA", { type: "A", index: 1 }); + publisherFastify.EventBus.publish("multi.eventB", { type: "B", index: 1 }); + publisherFastify.EventBus.publish("multi.eventA", { type: "A", index: 2 }); + publisherFastify.EventBus.publish("multi.eventB", { type: "B", index: 2 }); + publisherFastify.EventBus.publish("multi.eventA", { type: "A", index: 3 }); + + // Wait for processing + await new Promise((resolve) => setTimeout(resolve, 1000)); + + expect(eventAMessages.length).toBe(3); + expect(eventBMessages.length).toBe(2); + expect(eventAMessages.every((m) => m.type === "A")).toBe(true); + expect(eventBMessages.every((m) => m.type === "B")).toBe(true); + }); + + it("should flush all messages on shutdown", async () => { + process.env.K_SERVICE = "shutdown-service"; + + const receivedMessages: any[] = []; + const messageHandler: EventHandler = jest.fn(async (msg) => { + receivedMessages.push(msg.data); + }); + + const consumerOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "shutdown.ts", + handlers: { "shutdown.test": messageHandler }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await consumerFastify.register(Plugins.EventBus, consumerOptions); + await consumerFastify.ready(); + consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify); + + const publisherOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await publisherFastify.register(Plugins.EventBus, publisherOptions); + + // Publish messages + for (let i = 0; i < 7; i++) { + publisherFastify.EventBus.publish("shutdown.test", { index: i }); + } + + // Close publisher immediately (triggers onClose flush) + await publisherFastify.close(); + + // Wait for consumer to process + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // All 7 messages should have been flushed on close + expect(receivedMessages.length).toBe(7); + }); + }); + + describe("RabbitMQ Error Handling", () => { + let fastify: FastifyInstance; + const originalEnv = process.env; + + beforeEach(() => { + process.env = { ...originalEnv }; + fastify = Fastify({ logger: false }); + }); + + afterEach(async () => { + process.env = originalEnv; + await fastify.close(); + }); + + it("should throw error when RABBITMQ_URL is not set", async () => { + delete process.env.RABBITMQ_URL; + process.env.K_SERVICE = "test-service"; + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await expect( + fastify.register(Plugins.EventBus, eventBusOptions) + ).rejects.toThrow("RabbitMq requires RABBITMQ_URL"); + }); + + it("should throw error when K_SERVICE is not set", async () => { + process.env.RABBITMQ_URL = rabbitmqUrl; + delete process.env.K_SERVICE; + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await expect( + fastify.register(Plugins.EventBus, eventBusOptions) + ).rejects.toThrow("RabbitMq requires K_SERVICE"); + }); + + it("should handle invalid JSON in message body", async () => { + process.env.RABBITMQ_URL = rabbitmqUrl; + process.env.K_SERVICE = "json-error-service"; + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "error.ts", + handlers: { "error.test": jest.fn() }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await fastify.register(Plugins.EventBus, eventBusOptions); + await fastify.ready(); + + // Simulate incoming message with invalid JSON + const response = await fastify.inject({ + method: "POST", + url: "/rabbitmq/process-message", + payload: { + messageId: 123, + body: "invalid json {", + }, + }); + + expect(response.statusCode).toBe(400); + }); + }); + + describe("Handler Execution", () => { + let consumerFastify: FastifyInstance; + const originalEnv = process.env; + + beforeEach(() => { + process.env = { ...originalEnv }; + process.env.RABBITMQ_URL = rabbitmqUrl; + consumerFastify = Fastify({ logger: false }); + }); + + afterEach(async () => { + process.env = originalEnv; + await consumerFastify.close(); + }); + + it("should call handler matching event name", async () => { + process.env.K_SERVICE = "handler-match-service"; + + const handler1 = jest.fn().mockResolvedValue(undefined); + const handler2 = jest.fn().mockResolvedValue(undefined); + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "handler1.ts", + handlers: { "event.one": handler1 }, + }, + { + file: "handler2.ts", + handlers: { "event.two": handler2 }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await consumerFastify.register(Plugins.EventBus, eventBusOptions); + await consumerFastify.ready(); + + // Send message for event.one + const response = await consumerFastify.inject({ + method: "POST", + url: "/rabbitmq/process-message", + payload: { + messageId: "msg-123", + body: JSON.stringify({ + event: "event.one", + payload: { test: "data" }, + file: null, + processAfterDelayMs: 0, + publishTimestamp: Date.now(), + }), + }, + }); + + expect(response.statusCode).toBe(200); + expect(handler1).toHaveBeenCalledTimes(1); + expect(handler2).not.toHaveBeenCalled(); + }); + + it("should call only handler matching specified file", async () => { + process.env.K_SERVICE = "file-match-service"; + + const handlerA = jest.fn().mockResolvedValue(undefined); + const handlerB = jest.fn().mockResolvedValue(undefined); + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "fileA.ts", + handlers: { "same.event": handlerA }, + }, + { + file: "fileB.ts", + handlers: { "same.event": handlerB }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await consumerFastify.register(Plugins.EventBus, eventBusOptions); + await consumerFastify.ready(); + + // Send message with specific file filter + const response = await consumerFastify.inject({ + method: "POST", + url: "/rabbitmq/process-message", + payload: { + messageId: "msg-456", + body: JSON.stringify({ + event: "same.event", + payload: { test: "data" }, + file: "fileA.ts", // Only fileA.ts handler should run + processAfterDelayMs: 0, + publishTimestamp: Date.now(), + }), + }, + }); + + expect(response.statusCode).toBe(200); + expect(handlerA).toHaveBeenCalledTimes(1); + expect(handlerB).not.toHaveBeenCalled(); + }); + + it("should call all handlers when no file specified", async () => { + process.env.K_SERVICE = "all-handlers-service"; + + const handlerA = jest.fn().mockResolvedValue(undefined); + const handlerB = jest.fn().mockResolvedValue(undefined); + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "fileA.ts", + handlers: { "broadcast.event": handlerA }, + }, + { + file: "fileB.ts", + handlers: { "broadcast.event": handlerB }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await consumerFastify.register(Plugins.EventBus, eventBusOptions); + await consumerFastify.ready(); + + // Send message without file filter + const response = await consumerFastify.inject({ + method: "POST", + url: "/rabbitmq/process-message", + payload: { + messageId: "msg-789", + body: JSON.stringify({ + event: "broadcast.event", + payload: { test: "data" }, + file: null, // No file filter - all handlers run + processAfterDelayMs: 0, + publishTimestamp: Date.now(), + }), + }, + }); + + expect(response.statusCode).toBe(200); + expect(handlerA).toHaveBeenCalledTimes(1); + expect(handlerB).toHaveBeenCalledTimes(1); + }); + + it("should skip handler when file does not match", async () => { + process.env.K_SERVICE = "file-mismatch-service"; + + const handler = jest.fn().mockResolvedValue(undefined); + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "registered-file.ts", + handlers: { "mismatch.event": handler }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await consumerFastify.register(Plugins.EventBus, eventBusOptions); + await consumerFastify.ready(); + + // Send message with different file + const response = await consumerFastify.inject({ + method: "POST", + url: "/rabbitmq/process-message", + payload: { + messageId: "msg-mismatch", + body: JSON.stringify({ + event: "mismatch.event", + payload: { test: "data" }, + file: "different-file.ts", // Doesn't match registered file + processAfterDelayMs: 0, + publishTimestamp: Date.now(), + }), + }, + }); + + // Should return 200 (no error) but handler should NOT be called + expect(response.statusCode).toBe(200); + expect(handler).not.toHaveBeenCalled(); + }); + + it("should return OK when no handler registered for event", async () => { + process.env.K_SERVICE = "no-handler-service"; + + const handler = jest.fn().mockResolvedValue(undefined); + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "some.ts", + handlers: { "registered.event": handler }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await consumerFastify.register(Plugins.EventBus, eventBusOptions); + await consumerFastify.ready(); + + // Send message for unregistered event + const response = await consumerFastify.inject({ + method: "POST", + url: "/rabbitmq/process-message", + payload: { + messageId: "msg-unknown", + body: JSON.stringify({ + event: "unknown.event", // No handler registered + payload: { test: "data" }, + file: null, + processAfterDelayMs: 0, + publishTimestamp: Date.now(), + }), + }, + }); + + // Should bail out gracefully + expect(response.statusCode).toBe(200); + expect(handler).not.toHaveBeenCalled(); + }); + + it("should pass correct data to handler", async () => { + process.env.K_SERVICE = "data-pass-service"; + + let receivedMsg: any = null; + const handler = jest.fn().mockImplementation(async (msg) => { + receivedMsg = msg; + }); + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "data.ts", + handlers: { "data.event": handler }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await consumerFastify.register(Plugins.EventBus, eventBusOptions); + await consumerFastify.ready(); + + const testPayload = { userId: 123, action: "test" }; + const publishTimestamp = Date.now(); + + const response = await consumerFastify.inject({ + method: "POST", + url: "/rabbitmq/process-message", + payload: { + messageId: "msg-data-123", + body: JSON.stringify({ + event: "data.event", + payload: testPayload, + file: null, + processAfterDelayMs: 0, + publishTimestamp, + }), + }, + }); + + expect(response.statusCode).toBe(200); + expect(handler).toHaveBeenCalledTimes(1); + expect(receivedMsg).not.toBeNull(); + expect(receivedMsg.data).toEqual(testPayload); + expect(receivedMsg.event).toBe("data.event"); + expect(receivedMsg.id).toBe("msg-data-123"); + }); + + it("should return 500 when handler throws error", async () => { + process.env.K_SERVICE = "error-handler-service"; + + const handler = jest.fn().mockRejectedValue(new Error("Handler failed")); + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "error.ts", + handlers: { "error.event": handler }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("Processed error"), status: 500 }), + }; + + await consumerFastify.register(Plugins.EventBus, eventBusOptions); + await consumerFastify.ready(); + + const response = await consumerFastify.inject({ + method: "POST", + url: "/rabbitmq/process-message", + payload: { + messageId: "msg-error", + body: JSON.stringify({ + event: "error.event", + payload: { test: "data" }, + file: "error.ts", // Specify file to get ErrorWithStatus thrown + processAfterDelayMs: 0, + publishTimestamp: Date.now(), + }), + }, + }); + + expect(response.statusCode).toBe(500); + expect(handler).toHaveBeenCalledTimes(1); + }); + }); + + describe("RabbitMQ Dead-Letter Retry", () => { + let consumerFastify: FastifyInstance; + let consumer: { close: () => Promise } | null = null; + const originalEnv = process.env; + + beforeEach(() => { + process.env = { ...originalEnv }; + process.env.RABBITMQ_URL = rabbitmqUrl; + consumerFastify = Fastify({ logger: false }); + }); + + afterEach(async () => { + process.env = originalEnv; + if (consumer) { + await consumer.close(); + consumer = null; + } + await consumerFastify.close(); + }); + + it("should retry via dead-letter on 500 error", async () => { + process.env.K_SERVICE = "retry-500-service"; + + let callCount = 0; + const messageHandler: EventHandler = jest.fn(async () => { + callCount++; + if (callCount < 2) { + throw new Error("Simulated 500 error"); + } + // Succeed on second attempt + }); + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "retry.ts", + handlers: { "retry.500": messageHandler }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await consumerFastify.register(Plugins.EventBus, eventBusOptions); + await consumerFastify.ready(); + consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify); + + // Publish message directly to RabbitMQ + const connection = new Connection(rabbitmqUrl); + const publisher = connection.createPublisher({ maxAttempts: 3 }); + + try { + await publisher.send( + { + exchange: "retry.main-exchange", + contentType: "application/json", + }, + JSON.stringify({ + event: "retry.500", + payload: { test: true }, + file: null, + processAfterDelayMs: 0, + publishTimestamp: Date.now(), + }) + ); + + // Wait for dead-letter retry cycle (5s TTL + processing time) + await new Promise((resolve) => setTimeout(resolve, 7000)); + + // Handler should be called twice: first fail, then succeed after DLX retry + expect(callCount).toBe(2); + } finally { + await publisher.close(); + await connection.close(); + } + }, 15000); + + it("should retry via dead-letter on 429 rate-limit", async () => { + process.env.K_SERVICE = "retry-429-service"; + + let callCount = 0; + const receivedMessages: any[] = []; + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "retry429.ts", + handlers: { + "retry.429": async (msg) => { + callCount++; + receivedMessages.push({ attempt: callCount, data: msg.data }); + }, + }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await consumerFastify.register(Plugins.EventBus, eventBusOptions); + + // Override the route to return 429 on first call + let httpCallCount = 0; + consumerFastify.addHook("onSend", async (request, reply, payload) => { + if (request.url === "/rabbitmq/process-message") { + httpCallCount++; + if (httpCallCount === 1) { + reply.code(429); + return JSON.stringify({ error: "rate limited" }); + } + } + return payload; + }); + + await consumerFastify.ready(); + consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify); + + const connection = new Connection(rabbitmqUrl); + const publisher = connection.createPublisher({ maxAttempts: 3 }); + + try { + await publisher.send( + { + exchange: "retry.main-exchange", + contentType: "application/json", + }, + JSON.stringify({ + event: "retry.429", + payload: { rateLimit: true }, + file: null, + processAfterDelayMs: 0, + publishTimestamp: Date.now(), + }) + ); + + // Wait for dead-letter retry cycle + await new Promise((resolve) => setTimeout(resolve, 7000)); + + // Should have been called twice via DLX retry + expect(httpCallCount).toBeGreaterThanOrEqual(2); + } finally { + await publisher.close(); + await connection.close(); + } + }, 15000); + + it("should retry via dead-letter on 409 lock conflict", async () => { + process.env.K_SERVICE = "retry-409-service"; + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "retry409.ts", + handlers: { "retry.409": jest.fn() }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await consumerFastify.register(Plugins.EventBus, eventBusOptions); + + let httpCallCount = 0; + consumerFastify.addHook("onSend", async (request, reply, payload) => { + if (request.url === "/rabbitmq/process-message") { + httpCallCount++; + if (httpCallCount === 1) { + reply.code(409); + return JSON.stringify({ error: "lock conflict" }); + } + } + return payload; + }); + + await consumerFastify.ready(); + consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify); + + const connection = new Connection(rabbitmqUrl); + const publisher = connection.createPublisher({ maxAttempts: 3 }); + + try { + await publisher.send( + { + exchange: "retry.main-exchange", + contentType: "application/json", + }, + JSON.stringify({ + event: "retry.409", + payload: { lockConflict: true }, + file: null, + processAfterDelayMs: 0, + publishTimestamp: Date.now(), + }) + ); + + // Wait for dead-letter retry cycle + await new Promise((resolve) => setTimeout(resolve, 7000)); + + // Should have retried via DLX + expect(httpCallCount).toBeGreaterThanOrEqual(2); + } finally { + await publisher.close(); + await connection.close(); + } + }, 15000); + + it("should verify message goes through retry queue", async () => { + process.env.K_SERVICE = "dlx-flow-service"; + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "dlx.ts", + handlers: { "dlx.test": jest.fn() }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await consumerFastify.register(Plugins.EventBus, eventBusOptions); + + // Always return 500 to keep message in retry loop + consumerFastify.addHook("onSend", async (request, reply, payload) => { + if (request.url === "/rabbitmq/process-message") { + reply.code(500); + return JSON.stringify({ error: "server error" }); + } + return payload; + }); + + await consumerFastify.ready(); + consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify); + + const connection = new Connection(rabbitmqUrl); + const publisher = connection.createPublisher({ maxAttempts: 3 }); + + try { + await publisher.send( + { + exchange: "dlx.main-exchange", + contentType: "application/json", + }, + JSON.stringify({ + event: "dlx.test", + payload: { dlxTest: true }, + file: null, + processAfterDelayMs: 0, + publishTimestamp: Date.now(), + }) + ); + + // Wait a bit for message to enter retry queue + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Check retry queue has the message + const retryQueueInfo = await connection.queueDeclare({ + queue: "dlx.retry-queue.dlx-flow-service", + passive: true, + }); + + // Message should be in retry queue (or already cycled back) + // We verify the queue exists and has been used + expect(retryQueueInfo.queue).toBe("dlx.retry-queue.dlx-flow-service"); + } finally { + await publisher.close(); + await connection.close(); + } + }, 15000); + + it("should use immediate REQUEUE for 425 (delayed message), not DLX", async () => { + // 425 should use REQUEUE with local sleep (up to 10s randomDelay) + // Key: retry should happen BEFORE 5s DLX TTL would complete (total < 15s) + // DLX path would be: 5s TTL + processing > 5s + // REQUEUE path: up to 10s randomDelay + immediate requeue + process.env.K_SERVICE = "requeue-service"; + + let httpCallCount = 0; + const callTimestamps: number[] = []; + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "requeue.ts", + handlers: { "requeue.425": jest.fn() }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + // Register hook BEFORE plugin + consumerFastify.addHook("onSend", async (request, reply, payload) => { + if (request.url === "/rabbitmq/process-message") { + httpCallCount++; + callTimestamps.push(Date.now()); + if (httpCallCount === 1) { + reply.code(425); + return JSON.stringify({ processAfterDelayMs: 1000 }); + } + } + return payload; + }); + + await consumerFastify.register(Plugins.EventBus, eventBusOptions); + await consumerFastify.ready(); + consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify); + + const connection = new Connection(rabbitmqUrl); + const publisher = connection.createPublisher({ maxAttempts: 3 }); + + try { + await publisher.send( + { + exchange: "requeue.main-exchange", + contentType: "application/json", + }, + JSON.stringify({ + event: "requeue.425", + payload: { delayed: true }, + file: null, + processAfterDelayMs: 1000, + publishTimestamp: Date.now(), + }) + ); + + // Wait for REQUEUE cycle (randomDelay up to 10s + processing) + await new Promise((resolve) => setTimeout(resolve, 12000)); + + // Should have been called twice (via REQUEUE, not DLX) + expect(httpCallCount).toBeGreaterThanOrEqual(2); + + // Verify the retry didn't go through DLX (which adds 5s TTL) + // REQUEUE with 10s max delay should be faster than DLX 5s + 10s delay + if (callTimestamps.length >= 2) { + const timeBetweenCalls = callTimestamps[1] - callTimestamps[0]; + // Should be less than 12s (10s max delay + some processing) + expect(timeBetweenCalls).toBeLessThan(12000); + } + } finally { + await publisher.close(); + await connection.close(); + } + }, 20000); + + it("should dead-letter on other 4xx errors (bad message)", async () => { + process.env.K_SERVICE = "bad-msg-service"; + + let callCount = 0; + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "badmsg.ts", + handlers: { "badmsg.test": jest.fn() }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await consumerFastify.register(Plugins.EventBus, eventBusOptions); + + // Return 400 on first call, 200 after DLX retry + consumerFastify.addHook("onSend", async (request, reply, payload) => { + if (request.url === "/rabbitmq/process-message") { + callCount++; + if (callCount === 1) { + reply.code(400); + return JSON.stringify({ error: "bad request" }); + } + } + return payload; + }); + + await consumerFastify.ready(); + consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify); + + const connection = new Connection(rabbitmqUrl); + const publisher = connection.createPublisher({ maxAttempts: 3 }); + + try { + await publisher.send( + { + exchange: "bad.main-exchange", + contentType: "application/json", + }, + JSON.stringify({ + event: "badmsg.test", + payload: { bad: true }, + file: null, + processAfterDelayMs: 0, + publishTimestamp: Date.now(), + }) + ); + + // Wait for DLX retry cycle (5s TTL + processing) + await new Promise((resolve) => setTimeout(resolve, 7000)); + + // Should have retried via DLX + expect(callCount).toBeGreaterThanOrEqual(2); + } finally { + await publisher.close(); + await connection.close(); + } + }, 15000); + + it("should not fan out retry to other services", async () => { + // This test verifies that when a message fails in service-a, + // the retry only goes back to service-a, not to service-b + // Both services use same prefix "shared" so they share the main-exchange + + let serviceBMessageCount = 0; + let serviceAHttpCalls = 0; + + // Set up service A (will fail first, then succeed) + process.env.K_SERVICE = "shared-service-a"; + + // Register hook BEFORE plugin for service A + consumerFastify.addHook("onSend", async (request, reply, payload) => { + if (request.url === "/rabbitmq/process-message") { + serviceAHttpCalls++; + if (serviceAHttpCalls === 1) { + reply.code(500); + return JSON.stringify({ error: "server error" }); + } + } + return payload; + }); + + const serviceAOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "fanout-a.ts", + handlers: { + "fanout.test": jest.fn(), + }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await consumerFastify.register(Plugins.EventBus, serviceAOptions); + await consumerFastify.ready(); + consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify); + + // Set up service B (separate instance, same prefix "shared") + const serviceBFastify = Fastify({ logger: false }); + process.env.K_SERVICE = "shared-service-b"; + + const serviceBOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "fanout-b.ts", + handlers: { + "fanout.test": async () => { + serviceBMessageCount++; + }, + }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + await serviceBFastify.register(Plugins.EventBus, serviceBOptions); + await serviceBFastify.ready(); + const consumerB = await RabbitMqServiceBusConsumerBuilder(serviceBFastify); + + const connection = new Connection(rabbitmqUrl); + const publisher = connection.createPublisher({ maxAttempts: 3 }); + + try { + // Publish to main exchange (fans out to both services) + await publisher.send( + { + exchange: "shared.main-exchange", + contentType: "application/json", + }, + JSON.stringify({ + event: "fanout.test", + payload: { fanoutTest: true }, + file: null, + processAfterDelayMs: 0, + publishTimestamp: Date.now(), + }) + ); + + // Wait for initial fanout + DLX retry cycle (5s TTL + processing) + await new Promise((resolve) => setTimeout(resolve, 8000)); + + // KEY ASSERTION: Service B should have received the message exactly once (initial fanout only) + // If retry went through main-exchange (the old buggy behavior), service B would get it again + expect(serviceBMessageCount).toBe(1); + + // Service A should have received it at least twice (initial fail + retry after DLX) + expect(serviceAHttpCalls).toBeGreaterThanOrEqual(2); + } finally { + await publisher.close(); + await connection.close(); + await consumerB.close(); + await serviceBFastify.close(); + } + }, 20000); + + it("should move message to DLQ after 10 failed retries with correct headers", async () => { + // This is the REAL DLQ behavior test: + // 1. Message fails 10 times via DLX retry cycles + // 2. After 10th failure, message is moved to DLQ + // 3. DLQ message has correct headers (x-original-queue, x-final-status-code, x-final-retry-count) + // 4. Message is removed from main queue (ACKed) + // + // Timing: 10 DLX cycles × 5s TTL = 50s minimum + process.env.K_SERVICE = "dlq-e2e-service"; + + let httpCallCount = 0; + + const eventBusOptions: EventBusOptions = { + busType: "rabbitmq", + handlers: [ + { + file: "dlq-e2e.ts", + handlers: { "dlq.e2e": jest.fn() }, + }, + ], + validateMsg: jest.fn(), + processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }), + }; + + // Always return 500 to force message through all 10 retry cycles + consumerFastify.addHook("onSend", async (request, reply, payload) => { + if (request.url === "/rabbitmq/process-message") { + httpCallCount++; + reply.code(500); + return JSON.stringify({ error: "simulated failure" }); + } + return payload; + }); + + await consumerFastify.register(Plugins.EventBus, eventBusOptions); + await consumerFastify.ready(); + consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify); + + const connection = new Connection(rabbitmqUrl); + const publisher = connection.createPublisher({ maxAttempts: 3 }); + + try { + // Verify DLQ queue was created during setup + const dlqInfo = await connection.queueDeclare({ + queue: "dlq.dlq.dlq-e2e-service", + passive: true, + }); + expect(dlqInfo.queue).toBe("dlq.dlq.dlq-e2e-service"); + + const testPayload = { dlqTest: true, timestamp: Date.now() }; + + // Publish message that will fail all 10 retries + await publisher.send( + { + exchange: "dlq.main-exchange", + contentType: "application/json", + }, + JSON.stringify({ + event: "dlq.e2e", + payload: testPayload, + file: null, + processAfterDelayMs: 0, + publishTimestamp: Date.now(), + }) + ); + + // Wait for 10 DLX cycles + processing time + // Each cycle: 5s TTL + processing overhead + // Total: ~55-60 seconds + await new Promise((resolve) => setTimeout(resolve, 58000)); + + // Should have been called at least 11 times: + // 1 initial call + 10 retry cycles = 11 calls + // On the 11th call (after 10 DLX cycles), x-death count = 10, triggers DLQ + expect(httpCallCount).toBeGreaterThanOrEqual(11); + + // Now verify the message is in the DLQ + // Create a temporary consumer to read from DLQ + let dlqMessage: any = null; + const dlqConsumer = connection.createConsumer( + { + queue: "dlq.dlq.dlq-e2e-service", + queueOptions: { passive: true }, + noAck: true, // Don't require ACK for test + }, + async (msg: any) => { + dlqMessage = msg; + return 1; // ACK + } + ); + + // Give consumer time to receive message + await new Promise((resolve) => setTimeout(resolve, 1000)); + await dlqConsumer.close(); + + // Verify DLQ message exists and has correct headers + expect(dlqMessage).not.toBeNull(); + expect(dlqMessage.headers).toBeDefined(); + expect(dlqMessage.headers["x-original-queue"]).toBe("dlq.queue.dlq-e2e-service"); + expect(dlqMessage.headers["x-final-status-code"]).toBe(500); + expect(dlqMessage.headers["x-final-retry-count"]).toBeGreaterThanOrEqual(10); + + // Verify message body contains original payload + const bodyStr = Buffer.isBuffer(dlqMessage.body) + ? dlqMessage.body.toString("utf8") + : dlqMessage.body; + const body = JSON.parse(bodyStr); + expect(body.payload).toEqual(testPayload); + expect(body.event).toBe("dlq.e2e"); + + // Verify main queue is empty (message was ACKed after moving to DLQ) + const mainQueueInfo = await connection.queueDeclare({ + queue: "dlq.queue.dlq-e2e-service", + passive: true, + }); + expect(mainQueueInfo.messageCount).toBe(0); + } finally { + await publisher.close(); + await connection.close(); + } + }, 70000); // 70s timeout for 10 retry cycles + }); +}); diff --git a/src/event-bus/rabbitmq.ts b/src/event-bus/rabbitmq.ts new file mode 100644 index 0000000..9eebdd4 --- /dev/null +++ b/src/event-bus/rabbitmq.ts @@ -0,0 +1,340 @@ +import { randomUUID } from "crypto"; +import { FastifyInstance, FastifyPluginAsync, FastifyRequest } from "fastify"; +import fp from "fastify-plugin"; +import { Queue } from "mnemonist"; +import { Connection, Publisher } from "rabbitmq-client"; +import { + CreateHandlerRunner, + ErrorWithStatus, + getHandlerMap, + noMatchingHandlers, +} from "./commons"; +import { EventBus, EventBusOptions, EventMessage } from "./interfaces"; +import { + ensureRabbitMqExchangesAndQueues, + getServicePrefix, +} from "./rabbitmq-utils"; + +interface IncomingRabbitMqMessage { + messageId: number; + body: string; +} + +interface MessageBody { + event: string; + payload: any; + file: string | null; + processAfterDelayMs: number | undefined; + publishTimestamp: number; +} + +interface MessageWithAttempts { + body: MessageBody; + attempts: number; +} + +const plugin: FastifyPluginAsync = async function ( + f, + options, +) { + const handlerMap = getHandlerMap(options); + f.decorate("_hasEventHandlers", handlerMap.size > 0); + + if (!process.env.RABBITMQ_URL) { + throw new Error("RabbitMq requires RABBITMQ_URL"); + } + if (!process.env.K_SERVICE) { + throw new Error("RabbitMq requires K_SERVICE"); + } + const connection = new Connection(process.env.RABBITMQ_URL); + const service = process.env.K_SERVICE; + if (options.ensureExchangesAndQueues) { + await ensureRabbitMqExchangesAndQueues(connection, service); + } + + const publisher = connection.createPublisher({ maxAttempts: 3 }); + + const msgQueue = new Queue(); + + const flush = createMessageFlusher(f, publisher, msgQueue); + const ref = setInterval(() => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + flush(); + }, 20); + + f.addHook("onClose", async () => { + f.log.info({ tag: "RABBITMQ_FINAL_FLUSH" }); + clearInterval(ref); + // final task to ensure all messages are flushed + await flush(true); + await publisher.close(); + await connection.close(); + }); + + function publishToExchange( + event: string, + payload: any, + file: string | null, + processAfterDelayMs: number, + req?: FastifyRequest, + ) { + options.validateMsg(event, payload, req); + const messageBody: MessageBody = { + event, + payload, + file: file ?? null, + processAfterDelayMs: + processAfterDelayMs > 0 ? processAfterDelayMs : undefined, + publishTimestamp: Date.now(), + }; + msgQueue.enqueue({ + body: messageBody, + attempts: 0, + }); + req?.log.info({ + tag: "EVENT_PUBLISH", + event, + payload, + processAfterDelayMs, + }); + } + + const bus: EventBus = { + publish(event, payload, processAfterDelayMs) { + publishToExchange(event, payload, null, processAfterDelayMs ?? 0); + }, + }; + f.decorate("EventBus", { + getter() { + return bus; + }, + }); + + f.decorateRequest("EventBus", { + getter() { + return { + publish: (event: string, payload: any, processAfterDelayMs?: number) => { // eslint-disable-line @typescript-eslint/no-explicit-any + publishToExchange( + event, + payload, + null, + processAfterDelayMs ?? 0, + this, + ); + }, + }; + }, + }); + + const selectAndRunHandlers = CreateHandlerRunner(f, options, handlerMap); + + f.post<{ Body: IncomingRabbitMqMessage }>( + "/rabbitmq/process-message", + { + schema: { + hide: true, + } as any, + }, + async function (req, reply) { + const rawMsg = req.body; + if (!rawMsg) { + reply.send("OK"); + return reply; + } + req.log.info({ + tag: "RABBITMQ_MESSAGE_RECEIVED", + messageId: rawMsg.messageId, + }); + const msg = convert(rawMsg); + options.validateMsg(msg.event, msg.data, req); + + if (noMatchingHandlers(handlerMap, msg)) { + // bail-out + // service has no event-handlers registered + reply.send("OK"); + return reply; + } + + req.log.info({ + tag: "RABBITMQ_MESSAGE_PROCESSING", + event: msg, + }); + + if ( + msg.processAfterDelayMs > 0 && + Date.now() < msg.publishTime.getTime() + msg.processAfterDelayMs + ) { + // wait for pub-sub to repush. can't process so early + reply + .status(425) + .send({ processAfterDelayMs: msg?.processAfterDelayMs }); + return reply; + } + + try { + await selectAndRunHandlers(req, msg, (event, payload, file) => + publishToExchange(event, payload, file, 0, req), + ); + reply.send("OK"); + return reply; + } catch (err) { + if (err instanceof ErrorWithStatus) { + reply.status(err.status).send(err.message); + } else { + reply.status(500).send("ERROR"); + } + return reply; + } + }, + ); +}; + +export = fp(plugin, { name: "fp-eventbus-rabbitmq" }); + +function convert(msg: IncomingRabbitMqMessage): EventMessage { + let body: MessageBody; + try { + body = JSON.parse(msg.body); + } catch { + throw new ErrorWithStatus( + 400, + `Invalid JSON in message body: ${msg.body?.substring(0, 100)}`, + ); + } + return { + id: "" + msg.messageId, + attributes: { + event: body.event, + processAfterDelayMs: "" + (body.processAfterDelayMs ?? 0), + file: body.file ?? "", + }, + data: body.payload, + event: body.event, + processAfterDelayMs: body.processAfterDelayMs ?? 0, + publishTime: new Date(body.publishTimestamp), + }; +} + +function createMessageFlusher( + f: FastifyInstance, + publisher: Publisher, + msgQueue: Queue, +) { + let running = false; + let currentFlush: Promise | null = null; + const CONCURRENCY = 10; + return async function flush(force = false) { + if (msgQueue.size === 0) { + return; + } + // If a flush is already running, wait for it to complete (for forced flushes) or skip + if (running) { + if (force && currentFlush) { + await currentFlush; + } else { + return; + } + } + running = true; + const doFlush = async () => { + let flushed = 0; + const total = msgQueue.size; + const start = Date.now(); + try { + const batch: MessageWithAttempts[] = []; + while (msgQueue.size > 0) { + const message = msgQueue.dequeue(); + if (!message) { + break; + } + batch.push(message); + + if (batch.length >= CONCURRENCY) { + await flushBatch(batch, publisher, f, msgQueue, () => flushed++); + batch.length = 0; + } + } + // Flush any remaining messages in the batch + if (batch.length > 0) { + await flushBatch(batch, publisher, f, msgQueue, () => flushed++); + } + } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); + f.log.error({ + tag: "RABBITMQ_FLUSH_ERROR", + msg: errMsg, + err, + }); + } finally { + running = false; + currentFlush = null; + const latency = Date.now() - start; + if (latency > 100) { + f.log.warn({ + tag: "RABBITMQ_SLOW_FLUSH", + latency, + total, + flushed, + }); + } + } + }; + currentFlush = doFlush(); + await currentFlush; + }; +} + +async function flushBatch( + batch: MessageWithAttempts[], + publisher: Publisher, + f: FastifyInstance, + msgQueue: Queue, + onSuccess: () => void, +) { + const service = process.env.K_SERVICE ?? ""; + const prefix = getServicePrefix(service); + await Promise.all( + batch.map((msg) => + publisher + .send( + { + messageId: randomUUID(), + appId: `${prefix}.${service}`, + contentType: "application/json", + durable: true, + exchange: `${prefix}.main-exchange`, + headers: { + event: msg.body.event, + file: msg.body.file, + processAfterDelayMs: "" + msg.body.processAfterDelayMs, + }, + }, + JSON.stringify(msg.body, null, 0), + ) + .then(() => { + onSuccess(); + }) + .catch((err: unknown) => { + const errMsg = err instanceof Error ? err.message : String(err); + msg.attempts++; + if (msg.attempts < 3) { + f.log.error({ + tag: "RABBITMQ_PUBLISH_ERROR", + msg: errMsg, + err, + attempt: msg.attempts, + }); + msgQueue.enqueue(msg); + } else { + f.log.error({ + tag: "RABBITMQ_MESSAGE_PERMANENTLY_DROPPED", + msg: errMsg, + err, + event: msg.body.event, + payload: msg.body.payload, + }); + } + }), + ), + ); +} diff --git a/src/index.ts b/src/index.ts index 8ed9fbf..0f26ff4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,7 +1,11 @@ +import FpEventBus from "./event-bus"; import FpFileStore from "./file-store"; export { FileStore } from "./file-store"; +export { EventBus, EventBusOptions, EventMessage } from "./event-bus/interfaces"; +export { EventConsumerBuilder } from "./event-bus/event-consumer/interface"; export const Plugins = { + EventBus: FpEventBus, FileStore: FpFileStore, }; diff --git a/src/types.ts b/src/types.ts index 73a45d3..55a1329 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,6 +1,9 @@ -export {}; +import { EventBus } from "./event-bus/interfaces"; declare module "fastify" { + export interface FastifyRequest { + EventBus: EventBus; + } export interface FastifySchema { operationId?: string; summary?: string;