diff --git a/package.json b/package.json
index c91b850..889f393 100644
--- a/package.json
+++ b/package.json
@@ -14,6 +14,9 @@
"clean": "rimraf dist",
"test": "jest",
"test:coverage": "jest --coverage",
+ "test:rabbitmq": "jest --testPathPattern='rabbitmq.spec' --testTimeout=120000",
+ "test:integration": "jest --testPathPattern='event-bus.integration' --testTimeout=180000",
+ "test:all": "jest --testTimeout=180000",
"transpile": "tsc -p tsconfig.build.json",
"build": "run-s clean transpile",
"prepublishOnly": "npm run build"
@@ -32,9 +35,14 @@
"@aws-sdk/credential-provider-node": "^3.972.39",
"@aws-sdk/lib-storage": "^3.1045.0",
"@azure/identity": "^4.13.1",
+ "@azure/service-bus": "^7.9.5",
"@azure/storage-blob": "^12.31.0",
+ "@google-cloud/pubsub": "^5.2.2",
"@google-cloud/storage": "^7.19.0",
- "fastify-plugin": "^5.1.0"
+ "fastify-plugin": "^5.1.0",
+ "mnemonist": "^0.40.3",
+ "prom-client": "^15.1.3",
+ "rabbitmq-client": "^5.0.8"
},
"peerDependencies": {
"fastify": "^3.0.0 || ^4.0.0 || ^5.0.0"
@@ -43,6 +51,7 @@
"@trivago/prettier-plugin-sort-imports": "^6.0.2",
"@types/jest": "^30.0.0",
"@types/node": "^25.6.2",
+ "fastify": "^4.29.1",
"husky": "^9.1.7",
"jest": "^30.4.2",
"lint-staged": "^17.0.4",
diff --git a/src/event-bus/RABBITMQ.md b/src/event-bus/RABBITMQ.md
new file mode 100644
index 0000000..0bb87f5
--- /dev/null
+++ b/src/event-bus/RABBITMQ.md
@@ -0,0 +1,132 @@
+# RabbitMQ Architecture
+
+This document describes the exchange and queue topology used by the event-bus RabbitMQ implementation.
+
+## Naming Convention
+
+All resources use a dynamic prefix derived from the service name (part before the first hyphen):
+- `wms-cincout` → prefix: `wms`
+- `xyz-service` → prefix: `xyz`
+- `noprefix` → prefix: `default`
+
+## Topology Flowchart
+
+```mermaid
+flowchart TD
+ subgraph Publisher
+ P[Producer]
+ end
+
+ subgraph "Main Exchange (fanout)"
+ ME["{prefix}.main-exchange"]
+ end
+
+ subgraph "Service Queues"
+ SQ1["{prefix}.queue.{service-a}"]
+ SQ2["{prefix}.queue.{service-b}"]
+ end
+
+ subgraph "Retry System (per service)"
+ RE["{prefix}.retry-exchange.{service}
(direct)"]
+ RQ["{prefix}.retry-queue.{service}
(TTL: 5s)"]
+ end
+
+ subgraph "Dead Letter Queue (per service)"
+ DLQ["{prefix}.dlq.{service}
(manual retry)"]
+ end
+
+ subgraph Consumers
+ C1[Consumer A]
+ C2[Consumer B]
+ end
+
+ P -->|publish| ME
+ ME -->|fanout| SQ1
+ ME -->|fanout| SQ2
+ SQ1 -->|consume| C1
+ SQ2 -->|consume| C2
+
+ C1 -->|nack/reject| SQ1
+ SQ1 -.->|dead-letter| RE
+ RE -->|route| RQ
+ RQ -.->|"dead-letter after TTL
(via default exchange)"| SQ1
+ C1 -->|"retry >= 10"| DLQ
+```
+
+## Message Flow
+
+### Happy Path
+1. Producer publishes message to `{prefix}.main-exchange`
+2. Exchange fans out message to all bound service queues
+3. Consumer reads message from `{prefix}.queue.{service}`
+4. Consumer acknowledges (ack) → message removed
+
+### Retry Path (dead-letter)
+Used for: 5xx errors, 429 (rate-limit), 409 (lock conflict)
+
+1. Consumer returns `DROP` (nack with requeue=false)
+2. Message dead-letters to `{prefix}.retry-exchange.{service}`
+3. Retry exchange routes to `{prefix}.retry-queue.{service}`
+4. Message sits in retry queue for 5 seconds (TTL)
+5. After TTL expires, message dead-letters directly to `{prefix}.queue.{service}` (via default exchange)
+6. Message is re-delivered only to the failed service (not fanned out to all services)
+7. **Max 10 retries** - after 10 failed attempts, message is moved to `{prefix}.dlq.{service}` for manual retry (logged as `RABBITMQ_MESSAGE_MAX_RETRIES_EXCEEDED`)
+
+### Delayed Message Path (local sleep)
+Used for: 425 (too early - `processAfterDelayMs` not yet reached)
+
+1. Consumer sleeps locally (randomDelay)
+2. Consumer returns `REQUEUE` (nack with requeue=true)
+3. Message returns to the same queue immediately for retry
+4. This avoids multiple DLX cycles when delay exceeds 5s TTL
+
+## Consumer Status Handling
+
+| HTTP Status | ConsumerStatus | Behavior |
+|-------------|----------------|----------|
+| 2xx | `ACK` | Success, message removed |
+| 429, 409 | `DROP` | Dead-letter retry (rate-limit/lock conflict) |
+| 425 | `REQUEUE` | Local sleep + immediate requeue (delayed message) |
+| 5xx | `DROP` | Dead-letter retry (transient error) |
+| Other 4xx | `DROP` | Dead-letter (bad message, will likely fail again) |
+| Exception | `DROP` | Dead-letter (consumer error) |
+| Any (retry >= 10) | `ACK` | Max retries exceeded, message moved to DLQ |
+
+## Queue Configuration
+
+| Queue | Type | Dead-Letter Exchange | Dead-Letter Routing Key | TTL |
+|-------|------|---------------------|------------------------|-----|
+| `{prefix}.queue.{service}` | classic | `{prefix}.retry-exchange.{service}` | `retry` | - |
+| `{prefix}.retry-queue.{service}` | classic | `""` (default) | `{prefix}.queue.{service}` | 5000ms |
+| `{prefix}.dlq.{service}` | classic | - | - | - |
+
+## Exchange Configuration
+
+| Exchange | Type | Purpose |
+|----------|------|---------|
+| `{prefix}.main-exchange` | fanout | Distribute messages to all service queues |
+| `{prefix}.retry-exchange.{service}` | direct | Route failed messages to retry queue (binding key: `retry`) |
+
+## Dead Letter Queue (DLQ)
+
+Messages that fail after 10 retry attempts are moved to `{prefix}.dlq.{service}` for manual inspection and retry.
+
+### DLQ Message Headers
+
+Messages in the DLQ include additional headers for debugging:
+
+| Header | Description |
+|--------|-------------|
+| `x-original-queue` | The queue the message was consumed from |
+| `x-final-status-code` | HTTP status code from last processing attempt (if applicable) |
+| `x-final-error` | Error message from last processing attempt (if exception) |
+| `x-final-retry-count` | Number of retry attempts before moving to DLQ |
+| `x-death` | Standard RabbitMQ dead-letter history |
+
+### Manual Retry
+
+To retry messages from the DLQ:
+1. Inspect messages in `{prefix}.dlq.{service}` queue
+2. Fix the underlying issue (e.g., dependent service, data issue)
+3. Move message back to `{prefix}.queue.{service}` for reprocessing
+4. The `x-death` header will be preserved, so retry count continues from where it left off
diff --git a/src/event-bus/azure-servicebus.ts b/src/event-bus/azure-servicebus.ts
new file mode 100644
index 0000000..a705a4b
--- /dev/null
+++ b/src/event-bus/azure-servicebus.ts
@@ -0,0 +1,298 @@
+import * as AzureIden from "@azure/identity";
+import {
+ ServiceBusClient,
+ ServiceBusMessage,
+ ServiceBusSender,
+} from "@azure/service-bus";
+import { FastifyInstance, FastifyPluginAsync, FastifyRequest } from "fastify";
+import fp from "fastify-plugin";
+import { Queue } from "mnemonist";
+import {
+ CreateHandlerRunner,
+ ErrorWithStatus,
+ getHandlerMap,
+ noMatchingHandlers,
+} from "./commons";
+import { EventBus, EventBusOptions, EventMessage } from "./interfaces";
+
+interface IncomingServiceBusMessage {
+ messageId: number;
+ body: string;
+ scheduledEnqueueTimeUtc?: string;
+}
+
+interface MessageBody {
+ event: string;
+ payload: any;
+ file: string | null;
+ processAfterDelayMs: number | undefined;
+ publishTimestamp: number;
+}
+
+interface MessageWithAttempts {
+ msg: ServiceBusMessage;
+ attempts: number;
+}
+
+const plugin: FastifyPluginAsync = async function (
+ f,
+ options,
+) {
+ const handlerMap = getHandlerMap(options);
+ if (!options.namespace) {
+ throw new Error(
+ "Azure ServiceBus needs the namespace specified. Use EVENT_NAMESPACE env var",
+ );
+ }
+ if (!options.topic) {
+ throw new Error(
+ "Azure ServiceBus needs the topic specified. Use EVENT_TOPIC env var",
+ );
+ }
+
+ const client = new ServiceBusClient(
+ options.namespace,
+ new AzureIden.DefaultAzureCredential({}),
+ {},
+ );
+
+ const sender = client.createSender(options.topic);
+
+ const msgQueue = new Queue();
+
+ const flush = createMessageFlusher(f, sender, msgQueue);
+ const ref = setInterval(() => {
+ // eslint-disable-next-line @typescript-eslint/no-floating-promises
+ flush();
+ }, 20);
+
+ f.addHook("onClose", async () => {
+ f.log.info({ tag: "AZURE_SERVICEBUS_FINAL_FLUSH" });
+ clearInterval(ref);
+ // final task to ensure all messages are flushed
+ await flush(true);
+ await sender.close();
+ await client.close();
+ });
+
+ function publishToServiceBus(
+ event: string,
+ payload: any,
+ file: string | null,
+ processAfterDelayMs: number,
+ req?: FastifyRequest,
+ ) {
+ options.validateMsg(event, payload, req);
+ const messageBody: MessageBody = {
+ event,
+ payload,
+ file: file ?? null,
+ processAfterDelayMs:
+ processAfterDelayMs > 0 ? processAfterDelayMs : undefined,
+ publishTimestamp: Date.now(),
+ };
+ const encoded = JSON.stringify(messageBody);
+ msgQueue.enqueue({
+ msg: {
+ body: Buffer.from(encoded, "utf8"),
+ applicationProperties: {
+ event,
+ file: file ?? "",
+ },
+ contentType: "application/json",
+ scheduledEnqueueTimeUtc:
+ processAfterDelayMs > 0
+ ? new Date(messageBody.publishTimestamp + processAfterDelayMs)
+ : undefined,
+ },
+ attempts: 0,
+ });
+ req?.log.info({
+ tag: "EVENT_PUBLISH",
+ event,
+ payload,
+ processAfterDelayMs,
+ });
+ }
+
+ const bus: EventBus = {
+ publish(event, payload, processAfterDelayMs) {
+ publishToServiceBus(event, payload, null, processAfterDelayMs ?? 0);
+ },
+ };
+ f.decorate("EventBus", {
+ getter() {
+ return bus;
+ },
+ });
+
+ f.decorateRequest("EventBus", {
+ getter() {
+ return {
+ publish: (event: string, payload: any, processAfterDelayMs?: number) => { // eslint-disable-line @typescript-eslint/no-explicit-any
+ publishToServiceBus(event, payload, null, processAfterDelayMs ?? 0, this);
+ },
+ };
+ },
+ });
+
+ const selectAndRunHandlers = CreateHandlerRunner(f, options, handlerMap);
+
+ f.post<{ Body: IncomingServiceBusMessage }>(
+ "/azure-servicebus/process-message",
+ {
+ schema: {
+ hide: true,
+ } as any,
+ },
+ async function (req, reply) {
+ const rawMsg = req.body;
+ if (!rawMsg) {
+ reply.send("OK");
+ return reply;
+ }
+ req.log.info({
+ tag: "AZURE_SERVICEBUS_MESSAGE",
+ messageId: rawMsg.messageId,
+ scheduledEnqueueTimeUtc: rawMsg.scheduledEnqueueTimeUtc,
+ });
+ const msg = convert(rawMsg);
+ options.validateMsg(msg.event, msg.data, req);
+
+ if (noMatchingHandlers(handlerMap, msg)) {
+ // bail-out
+ // service has no event-handlers registered
+ reply.send("OK");
+ return reply;
+ }
+
+ req.log.info({
+ tag: "AZURE_SERVICEBUS_MESSAGE_HANDLE",
+ event: msg,
+ });
+
+ if (
+ msg.processAfterDelayMs > 0 &&
+ Date.now() < msg.publishTime.getTime() + msg.processAfterDelayMs
+ ) {
+ // wait for pub-sub to repush. can't process so early
+ reply
+ .status(425)
+ .send({ processAfterDelayMs: msg?.processAfterDelayMs });
+ return reply;
+ }
+
+ try {
+ await selectAndRunHandlers(req, msg, (event, payload, file) =>
+ publishToServiceBus(event, payload, file, msg.processAfterDelayMs, req),
+ );
+ reply.send("OK");
+ return reply;
+ } catch (err) {
+ if (err instanceof ErrorWithStatus) {
+ reply.status(err.status).send(err.message);
+ } else {
+ reply.status(500).send("ERROR");
+ }
+ return reply;
+ }
+ },
+ );
+};
+
+export = fp(plugin, { name: "fp-eventbus-azure-servicebus" });
+
+function convert(msg: IncomingServiceBusMessage): EventMessage {
+ const body: MessageBody = JSON.parse(msg.body);
+ return {
+ id: "" + msg.messageId,
+ attributes: {
+ event: body.event,
+ processAfterDelayMs: "" + (body.processAfterDelayMs ?? 0),
+ file: body.file ?? "",
+ },
+ data: body.payload,
+ event: body.event,
+ processAfterDelayMs: body.processAfterDelayMs ?? 0,
+ publishTime: new Date(body.publishTimestamp),
+ };
+}
+
+function createMessageFlusher(
+ f: FastifyInstance,
+ sender: ServiceBusSender,
+ msgQueue: Queue,
+) {
+ let running = false;
+ return async function flush(force = false) {
+ if (msgQueue.size === 0 || (!force && running)) {
+ return;
+ }
+ running = true;
+ const tracker: MessageWithAttempts[] = [];
+ let flushed = 0;
+ const total = msgQueue.size;
+ const start = Date.now();
+ try {
+ let batch = await sender.createMessageBatch({});
+ while (msgQueue.size > 0) {
+ const message = msgQueue.dequeue();
+ if (!message) {
+ break;
+ }
+ const added = batch.tryAddMessage(message.msg);
+ if (added) {
+ tracker.push(message);
+ } else {
+ if (batch.count > 0) {
+ await sender.sendMessages(batch);
+ flushed += batch.count;
+ tracker.length = 0;
+ }
+ batch = await sender.createMessageBatch();
+ // try adding this message solo to a batch
+ if (batch.tryAddMessage(message.msg)) {
+ tracker.push(message);
+ } else {
+ f.log.error({
+ tag: "AZURE_SERVICE_BUS_ERROR",
+ msg: "Message too big to fit in a batch",
+ });
+ }
+ }
+ }
+ if (batch.count > 0) {
+ await sender.sendMessages(batch);
+ flushed += batch.count;
+ tracker.length = 0;
+ }
+ } catch (err) {
+ for (const message of tracker) {
+ // don't re-attempt indefinitely
+ if (message.attempts < 3) {
+ msgQueue.enqueue({
+ msg: message.msg,
+ attempts: message.attempts + 1,
+ });
+ }
+ }
+ tracker.length = 0;
+ f.log.error({
+ tag: "AZURE_SERVICE_BUS_ERROR",
+ msg: (err as Error).message,
+ err,
+ });
+ } finally {
+ running = false;
+ const latency = Date.now() - start;
+ if (latency > 100) {
+ f.log.warn({
+ tag: "AZURE_SERVICEBUS_SLOW_FLUSH",
+ latency,
+ total,
+ flushed,
+ });
+ }
+ }
+ };
+}
diff --git a/src/event-bus/commons.spec.ts b/src/event-bus/commons.spec.ts
new file mode 100644
index 0000000..d92edeb
--- /dev/null
+++ b/src/event-bus/commons.spec.ts
@@ -0,0 +1,47 @@
+import { getHandlerMap } from "./commons";
+import { EventBusOptions, EventHandler } from "./interfaces";
+
+describe("getHandlerMap", () => {
+ it("should create handler map from options", () => {
+ const mockHandler1: EventHandler = async () => {};
+ const mockHandler2: EventHandler = async () => {};
+
+ const options: Pick = {
+ handlers: [
+ {
+ file: "file1.ts",
+ handlers: {
+ event1: mockHandler1,
+ event2: mockHandler2,
+ },
+ },
+ {
+ file: "file2.ts",
+ handlers: {
+ event1: mockHandler2,
+ },
+ },
+ ],
+ };
+
+ const handlerMap = getHandlerMap(options);
+
+ expect(handlerMap.size).toBe(2);
+ expect(handlerMap.get("event1")).toEqual([
+ { file: "file1.ts", handler: mockHandler1 },
+ { file: "file2.ts", handler: mockHandler2 },
+ ]);
+ expect(handlerMap.get("event2")).toEqual([
+ { file: "file1.ts", handler: mockHandler2 },
+ ]);
+ });
+
+ it("should handle empty handlers", () => {
+ const options: Pick = {
+ handlers: [],
+ };
+
+ const handlerMap = getHandlerMap(options);
+ expect(handlerMap.size).toBe(0);
+ });
+});
diff --git a/src/event-bus/commons.ts b/src/event-bus/commons.ts
new file mode 100644
index 0000000..c4ed274
--- /dev/null
+++ b/src/event-bus/commons.ts
@@ -0,0 +1,214 @@
+import * as prom from "prom-client";
+import { FastifyInstance, FastifyRequest } from "fastify";
+import {
+ ActionContext,
+ EventHandler,
+ EventMessage,
+ PublishToPubSub,
+} from "./interfaces";
+import { EventBusOptions } from "./interfaces";
+
+interface HandlerInfo {
+ file: string;
+ handler: EventHandler;
+}
+
+interface AppContext {
+ f: FastifyInstance;
+ counter: prom.Counter;
+ histogram: prom.Histogram;
+}
+
+type Action = () => Promise;
+
+export function getHandlerMap(options: Pick) {
+ const handlerMap = new Map<
+ string,
+ {
+ file: string;
+ handler: EventHandler;
+ }[]
+ >();
+ for (const { file, handlers } of options.handlers) {
+ for (const [key, handler] of Object.entries(handlers)) {
+ if (!handlerMap.has(key)) {
+ handlerMap.set(key, []);
+ }
+ if (handler as any) {
+ handlerMap.get(key)!.push({ file, handler });
+ }
+ }
+ }
+ return handlerMap;
+}
+
+export function noMatchingHandlers(
+ handlerMap: Map<
+ string,
+ {
+ file: string;
+ handler: EventHandler;
+ }[]
+ >,
+ eventMsg: EventMessage,
+) {
+ const handlers = handlerMap.get(eventMsg.event) ?? [];
+ const specifiedFile = eventMsg.attributes.file;
+ for (const { file } of handlers) {
+ if (specifiedFile && file !== specifiedFile) {
+ continue;
+ }
+ return false;
+ }
+ return true;
+}
+
+export class ErrorWithStatus extends Error {
+ status: number;
+ constructor(status: number, message?: string) {
+ super(message);
+ this.status = status;
+ }
+}
+
+/**
+ * Creates an action factory that generates an asynchronous function to execute a specific event handler.
+ * This factory encapsulates the logic for handling an event, including error processing and a retry strategy.
+ *
+ * Retry Strategy:
+ * 1. If `eventMsg.attributes.noRetry` is "true", no retry attempt is made, and the function returns.
+ * 2. Otherwise, the error is processed by `options.processError(err, ctx)`.
+ * 3. If `ctx.specifiedFile` is not set (i.e., the event was not initially targeted at a specific handler file):
+ * - The event is re-published to Pub/Sub via `ctx.publishToPubSub`.
+ * - The re-published event will include `ctx.file` (the file of the handler that just failed)
+ * as the `specifiedFile` attribute. This targets the retry to the specific handler that failed.
+ * 4. If `ctx.specifiedFile` is set (i.e., this is likely a retry for a specific handler):
+ * - An `ErrorWithStatus` is thrown. This allows the underlying Pub/Sub mechanism
+ * (or other calling infrastructure) to handle further retries or dead-lettering based on the error.
+ *
+ * Metrics (Prometheus):
+ * - A counter (`appCtx.counter`) is incremented for each handler execution, labeled with event, file, and status.
+ * - A histogram (`appCtx.histogram`) observes the latency of each handler execution, labeled with event, file, and status.
+ *
+ * @param options - Event bus options, including `processError` and `registry`.
+ * @param appCtx - Application context containing Fastify instance, Prometheus counter, and histogram.
+ * @returns A function that takes an `ActionContext` and returns an `Action` (an async function).
+ */
+const CreateActionFactory =
+ (options: EventBusOptions, appCtx: AppContext) =>
+ (ctx: ActionContext) =>
+ async () => {
+ const start = Date.now();
+ let status = 200;
+ try {
+ await ctx.handler.call(appCtx.f, ctx.eventMsg, ctx.req);
+ } catch (err) {
+ if (ctx.eventMsg.attributes.noRetry === "true") {
+ return;
+ }
+
+ ({ err, status } = options.processError(err, ctx));
+ if (!ctx.specifiedFile) {
+ // if no specified file is provided, trigger another message with
+ // the specified file
+ ctx.publishToPubSub(ctx.eventMsg.event, ctx.eventMsg.data, ctx.file);
+ } else {
+ throw new ErrorWithStatus(
+ status,
+ `${ctx.file}-${ctx.handler.name} failed with ${(err as Error).message}`,
+ );
+ }
+ } finally {
+ const label = {
+ event: ctx.eventMsg.event,
+ file: ctx.file,
+ status,
+ };
+ appCtx.counter.inc(label);
+ appCtx.histogram.observe(label, Date.now() - start);
+ }
+ };
+
+export function CreateHandlerRunner(
+ f: FastifyInstance,
+ options: EventBusOptions,
+ handlersMap: Map,
+) {
+ const histogram = new prom.Histogram({
+ help: "event_handler_latency_ms",
+ name: "event_handler_latency_ms",
+ buckets: [
+ 1, 5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 200, 300, 500, 750, 1000,
+ 1500, 2000, 2500, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 12000,
+ 15000, 18000, 20000, 25000, 30000, 35000, 40000, 45000, 50000, 60000,
+ 70000, 80000,
+ ],
+ registers: options.registry ? [options.registry] : [],
+ labelNames: ["event", "file", "status"] as const,
+ });
+ const counter = new prom.Counter({
+ help: "event_handler_latency_total",
+ name: "event_handler_latency_total",
+ registers: options.registry ? [options.registry] : [],
+ labelNames: ["event", "file", "status"] as const,
+ });
+
+ const appCtx: AppContext = {
+ f,
+ counter,
+ histogram,
+ };
+ const createAction = CreateActionFactory(options, appCtx);
+
+ const CONCURRENCY = options.actionConcurrency ?? 1;
+ f.log.info({
+ tag: "HANDLER_ACTION_CONCURRENCY",
+ concurrency: CONCURRENCY,
+ });
+
+ return async function selectAndRunHandlers(
+ req: FastifyRequest,
+ eventMsg: EventMessage,
+ publishToPubSub: PublishToPubSub,
+ ) {
+ const handlers = handlersMap.get(eventMsg.event) ?? [];
+ const actions: Action[] = [];
+ const specifiedFile = eventMsg.attributes.file;
+ req.log.debug({
+ tag: "HANDLER_LOOKUP",
+ event: eventMsg.event,
+ specifiedFile,
+ registeredHandlers: handlers.map((h) => h.file),
+ handlersCount: handlers.length,
+ });
+ for (const { file, handler } of handlers) {
+ if (specifiedFile && file !== specifiedFile) {
+ continue;
+ }
+ const ctx: ActionContext = {
+ req,
+ publishToPubSub,
+ handler,
+ eventMsg,
+ file,
+ specifiedFile,
+ };
+ const act = createAction(ctx);
+ actions.push(act);
+ }
+
+ req.log.debug({
+ tag: "HANDLER_ACTIONS_CREATED",
+ event: eventMsg.event,
+ actionsCount: actions.length,
+ });
+ for (let i = 0; i < actions.length; i += CONCURRENCY) {
+ await Promise.all(actions.slice(i, i + CONCURRENCY).map((act) => act()));
+ }
+ req.log.debug({
+ tag: "HANDLER_ACTIONS_COMPLETED",
+ event: eventMsg.event,
+ actionsCount: actions.length,
+ });
+ };
+}
diff --git a/src/event-bus/event-consumer/azure-servicebus.ts b/src/event-bus/event-consumer/azure-servicebus.ts
new file mode 100644
index 0000000..af3c8d9
--- /dev/null
+++ b/src/event-bus/event-consumer/azure-servicebus.ts
@@ -0,0 +1,131 @@
+import * as AzureIden from "@azure/identity";
+import { RetryMode, ServiceBusClient } from "@azure/service-bus";
+import { EventConsumerBuilder } from "./interface";
+import { exponentialDelay, randomDelay } from "./utils";
+
+/**
+ * Azure ServiceBus supports
+ * 1. scheduledEnqueueTimeUtc -> this is to support delayed message delivery
+ * 2. peekLock -> this is to support message processing without contention
+ * 3. maxConcurrentCalls -> this is to support parallel processing
+ */
+export const AzureServiceBusConsumerBuilder: EventConsumerBuilder = async (
+ instance,
+) => {
+ if (!process.env.EVENT_NAMESPACE) {
+ throw new Error("Azure ServiceBus needs EVENT_NAMESPACE");
+ }
+ if (!process.env.EVENT_TOPIC) {
+ throw new Error("Azure ServiceBus needs EVENT_TOPIC");
+ }
+ if (!process.env.EVENT_SUBSCRIPTION) {
+ throw new Error("Azure ServiceBus needs EVENT_SUBSCRIPTION");
+ }
+ const client = new ServiceBusClient(
+ process.env.EVENT_NAMESPACE,
+ new AzureIden.DefaultAzureCredential({}),
+ {
+ retryOptions: {
+ mode: RetryMode.Fixed,
+ retryDelayInMs: 10_000,
+ timeoutInMs: 5_000,
+ maxRetries: 5,
+ },
+ },
+ );
+ const receiver = client.createReceiver(
+ process.env.EVENT_TOPIC,
+ process.env.EVENT_SUBSCRIPTION,
+ {
+ skipParsingBodyAsJson: true,
+ skipConvertingDate: true,
+ receiveMode: "peekLock",
+ },
+ );
+ const ctrl = new AbortController();
+
+ const handler = receiver.subscribe(
+ {
+ async processMessage(msg) {
+ if (ctrl.signal.aborted) {
+ await receiver.abandonMessage(msg);
+ return;
+ }
+ if (msg.deliveryCount && msg.deliveryCount > 1) {
+ instance.log.warn({
+ tag: "AZURE_SERVICE_BUS_RECEIVER_RETRY",
+ deliveryCount: msg.deliveryCount,
+ messageId: msg.messageId,
+ });
+ await exponentialDelay(Math.max(msg.deliveryCount - 2, 0));
+ }
+ try {
+ const payload = {
+ messageId: msg.messageId,
+ body: msg.body,
+ scheduledEnqueueTimeUtc: msg.scheduledEnqueueTimeUtc?.toISOString(),
+ };
+ if (Buffer.isBuffer(payload.body)) {
+ payload.body = payload.body.toString("utf8");
+ }
+ const resp = await instance.inject({
+ method: "POST",
+ url: "/azure-servicebus/process-message",
+ payload,
+ });
+ if (resp.statusCode >= 200 && resp.statusCode < 300) {
+ await receiver.completeMessage(msg);
+ } else if (resp.statusCode === 429 || resp.statusCode === 409) {
+ // rate-limited or lock-conflict
+ await randomDelay();
+ await receiver.abandonMessage(msg);
+ } else if (resp.statusCode === 425) {
+ // delayed message
+ // ideally it shouldn't come here because azure service bus already
+ // supports delayed message receiving
+ instance.log.error({
+ tag: "AZURE_SERVICE_BUS_DELAYED_MESSAGE",
+ payload,
+ });
+ await receiver.abandonMessage(msg);
+ } else {
+ await receiver.abandonMessage(msg);
+ }
+ } catch (err) {
+ instance.log.error({
+ tag: "AZURE_SERVICE_BUS_RECEIVER_ERROR",
+ err: err,
+ });
+ await receiver.abandonMessage(msg);
+ }
+ },
+ async processError(args) {
+ instance.log.error({
+ tag: "AZURE_SERVICE_BUS_RECEIVER_ERROR",
+ err: args.error,
+ entityPath: args.entityPath,
+ });
+ },
+ },
+ {
+ abortSignal: ctrl.signal,
+ autoCompleteMessages: false,
+ maxConcurrentCalls:
+ parseInt(
+ process.env.EVENT_SUBSCRIPTION_MAX_CONCURRENT_CALLS ?? "10",
+ 10,
+ ) || 10,
+ },
+ );
+ instance.log.info(
+ "Attached to Azure ServiceBus Subscription=" +
+ process.env.EVENT_SUBSCRIPTION,
+ );
+ return {
+ close: async () => {
+ ctrl.abort();
+ await handler.close();
+ await client.close();
+ },
+ };
+};
diff --git a/src/event-bus/event-consumer/gcp-pubsub.ts b/src/event-bus/event-consumer/gcp-pubsub.ts
new file mode 100644
index 0000000..4643ef2
--- /dev/null
+++ b/src/event-bus/event-consumer/gcp-pubsub.ts
@@ -0,0 +1,204 @@
+import * as timers from "node:timers/promises";
+import { Message, PubSub, Subscription } from "@google-cloud/pubsub";
+import { FastifyInstance } from "fastify";
+import { EventConsumerBuilder } from "./interface";
+import { randomDelay } from "./utils";
+
+// Resolves @opentelemetry/api from either NODE_PATH (OTel operator) or the
+// pnpm absolute path. All copies share the same global symbol so the
+// registered SDK is always reached.
+function getOtelApi() {
+ try {
+ // eslint-disable-next-line @typescript-eslint/no-require-imports
+ return require("@opentelemetry/api");
+ } catch {
+ try {
+ // eslint-disable-next-line @typescript-eslint/no-require-imports
+ return require(
+ "/app/node_modules/.pnpm/@opentelemetry+api@1.9.0/node_modules/@opentelemetry/api",
+ );
+ } catch {
+ return null;
+ }
+ }
+}
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+const otel: any = getOtelApi();
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+const tracer: any = otel
+ ? otel.trace.getTracer("fp-eventbus-gcp-pubsub-consumer")
+ : null;
+
+/**
+ * GCP Pub/Sub supports
+ * 1. flowControl -> this is to support parallel processing
+ * 2. need to support delayed message delivery manually (recursive retry for 425)
+ */
+export const GcpPubSubConsumerBuilder: EventConsumerBuilder = async (
+ instance,
+) => {
+ const pubsub = new PubSub();
+ if (
+ process.env.EVENT_TOPIC &&
+ !process.env.EVENT_SUBSCRIPTION &&
+ process.env.APP
+ ) {
+ instance.log.info(
+ "Looking for GCP PubSub Subscription for APP=" + process.env.APP,
+ );
+ const subs = await pubsub.topic(process.env.EVENT_TOPIC).getSubscriptions();
+ instance.log.info("Found Subscriptions=" + subs[0].length);
+ for (const sub of subs[0]) {
+ if (sub.name.includes(process.env.APP)) {
+ process.env.EVENT_SUBSCRIPTION = sub.name;
+ }
+ }
+ }
+ if (!process.env.EVENT_SUBSCRIPTION) {
+ throw new Error("GCP PubSub needs EVENT_SUBSCRIPTION");
+ }
+ const subName = process.env.EVENT_SUBSCRIPTION;
+ instance.log.info("Attaching to GCP PubSub Subscription=" + subName);
+
+ const runner = new Runner(instance, pubsub, subName);
+ runner.init();
+
+ return {
+ close: async () => {
+ await runner.close();
+ await pubsub.close();
+ },
+ };
+};
+
+class Runner {
+ public subscription: Subscription | null = null;
+ private readonly ctrl = new AbortController();
+ private timerRef: NodeJS.Timeout | null = null;
+ constructor(
+ public readonly instance: FastifyInstance,
+ public readonly pubsub: PubSub,
+ public readonly subName: string,
+ ) {}
+
+ async close(): Promise {
+ this.ctrl.abort();
+ if (this.timerRef) {
+ clearTimeout(this.timerRef);
+ }
+ if (this.subscription) {
+ this.subscription.removeAllListeners();
+ return this.subscription.close();
+ }
+ }
+
+ init() {
+ if (this.ctrl.signal.aborted) {
+ return;
+ }
+ this.subscription = this.pubsub.subscription(this.subName, {
+ flowControl: {
+ allowExcessMessages: false,
+ maxMessages:
+ parseInt(process.env.EVENT_SUBSCRIPTION_MAX_MESSAGES ?? "10", 10) ||
+ 10,
+ },
+ });
+ this.subscription.addListener("message", (msg: Message) => {
+ this.processMsg(msg, 0);
+ });
+ this.subscription.on("error", (err) => {
+ this.instance.log.error({ tag: "GCP_PUBSUB_RECEIVER_ERROR", err });
+ this.subscription?.removeAllListeners();
+ this.subscription?.close();
+ this.instance.log.warn("waiting for 10 seconds before reconnecting...");
+ this.timerRef = setTimeout(() => {
+ this.init();
+ }, 10000); // retrying after 10 seconds
+ });
+ }
+
+ async processMsg(msg: Message, attempt: number) {
+ if (this.ctrl.signal.aborted) {
+ msg.nack();
+ return;
+ }
+
+ // Extract the W3C trace context from Pub/Sub message attributes and start
+ // a CONSUMER span. Re-inject the context as HTTP headers so the push
+ // handler at /gcp-pubsub/process-message continues the same trace tree.
+ const attrs = msg.attributes ?? {};
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ let ctx: any = null;
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ let span: any = null;
+ if (otel && attrs.traceparent) {
+ const parentCtx = otel.propagation.extract(otel.context.active(), attrs);
+ span = tracer.startSpan(
+ "pubsub.pull.process",
+ { kind: otel.SpanKind.CONSUMER },
+ parentCtx,
+ );
+ ctx = otel.trace.setSpan(parentCtx, span);
+ }
+
+ const doProcess = async () => {
+ const headers: Record = {
+ "content-type": "application/json",
+ };
+ // Re-inject the active context as HTTP headers so the Fastify inject()
+ // call carries the traceparent into the push handler.
+ if (otel && ctx) {
+ otel.propagation.inject(otel.context.active(), headers);
+ }
+ try {
+ const resp = await this.instance.inject({
+ method: "POST",
+ url: "/gcp-pubsub/process-message",
+ payload: {
+ message: {
+ attributes: msg.attributes,
+ data: msg.data.toString("base64"),
+ messageId: msg.id,
+ publishTime: msg.publishTime.toISOString(),
+ },
+ attempt,
+ subscription: this.subName,
+ },
+ headers,
+ });
+ if (resp.statusCode >= 200 && resp.statusCode < 300) {
+ msg.ack();
+ } else if (resp.statusCode === 429 || resp.statusCode === 409) {
+ // rate-limited or lock-conflict
+ await randomDelay();
+ msg.nack();
+ } else if (resp.statusCode === 425 && attempt < 2) {
+ const parsed = JSON.parse(resp.body);
+ const processAfterDelayMs = parsed?.processAfterDelayMs ?? 0;
+ if (processAfterDelayMs > 0) {
+ await timers.setTimeout(processAfterDelayMs);
+ }
+ await this.processMsg(msg, attempt + 1);
+ } else {
+ msg.nack();
+ }
+ } catch (err) {
+ this.instance.log.error({
+ tag: "GCP_PUBSUB_RECEIVER_ERROR",
+ err,
+ });
+ msg.nack();
+ } finally {
+ span?.end();
+ }
+ };
+
+ if (otel && ctx) {
+ await otel.context.with(ctx, doProcess);
+ } else {
+ await doProcess();
+ }
+ }
+}
diff --git a/src/event-bus/event-consumer/index.ts b/src/event-bus/event-consumer/index.ts
new file mode 100644
index 0000000..8305e21
--- /dev/null
+++ b/src/event-bus/event-consumer/index.ts
@@ -0,0 +1,26 @@
+import { FastifyInstance } from "fastify";
+import { AzureServiceBusConsumerBuilder } from "./azure-servicebus";
+import { GcpPubSubConsumerBuilder } from "./gcp-pubsub";
+import { EventConsumer } from "./interface";
+import { RabbitMqServiceBusConsumerBuilder } from "./rabbitmq";
+import { EventBusOptions } from "../interfaces";
+
+export function CreateEventConsumer(
+ instance: FastifyInstance,
+ type: EventBusOptions["busType"],
+): Promise {
+ switch (type) {
+ case "gcp-pubsub":
+ return GcpPubSubConsumerBuilder(instance);
+ case "azure-servicebus":
+ return AzureServiceBusConsumerBuilder(instance);
+ case "rabbitmq":
+ return RabbitMqServiceBusConsumerBuilder(instance);
+ default:
+ return Promise.resolve({
+ close: async () => {
+ // Implement the close logic here
+ },
+ });
+ }
+}
diff --git a/src/event-bus/event-consumer/interface.ts b/src/event-bus/event-consumer/interface.ts
new file mode 100644
index 0000000..c8fd6eb
--- /dev/null
+++ b/src/event-bus/event-consumer/interface.ts
@@ -0,0 +1,9 @@
+import { FastifyInstance } from "fastify";
+
+export interface EventConsumer {
+ close(): Promise;
+}
+
+export type EventConsumerBuilder = (
+ instance: FastifyInstance,
+) => Promise;
diff --git a/src/event-bus/event-consumer/rabbitmq.ts b/src/event-bus/event-consumer/rabbitmq.ts
new file mode 100644
index 0000000..099f3b4
--- /dev/null
+++ b/src/event-bus/event-consumer/rabbitmq.ts
@@ -0,0 +1,192 @@
+import { AsyncMessage, Connection, ConsumerStatus } from "rabbitmq-client";
+import {
+ RABBITMQ_TAG,
+ ensureRabbitMqExchangesAndQueues,
+ getServicePrefix,
+} from "../rabbitmq-utils";
+import { EventConsumerBuilder } from "./interface";
+import { randomDelay } from "./utils";
+
+const MAX_RETRY_COUNT = 10;
+
+/**
+ * Extract retry count from x-death header added by RabbitMQ DLX.
+ * Only counts rejections (consumer drops), not TTL expirations from retry queue.
+ * x-death entries look like: { queue, reason, count, ... }
+ * - reason "rejected" = consumer nacked/dropped the message
+ * - reason "expired" = TTL expired in retry queue (not a retry attempt)
+ */
+function getRetryCount(msg: AsyncMessage): number {
+ const xDeath = msg.headers?.["x-death"];
+ if (!Array.isArray(xDeath) || xDeath.length === 0) {
+ return 0;
+ }
+ // Only count "rejected" entries (actual consumer rejections)
+ // Don't count "expired" entries from retry queue TTL
+ return xDeath
+ .filter((death) => death.reason === "rejected")
+ .reduce((total, death) => total + (death.count || 0), 0);
+}
+
+/**
+ * RabbitMq supports
+ * 1. prefetchCount -> this is to support parallel processing
+ * 2. concurrency -> this is to support parallel processing
+ * 3. ConsumerStatus -> requeue and drop messages appropriately
+ */
+export const RabbitMqServiceBusConsumerBuilder: EventConsumerBuilder = async (
+ instance,
+) => {
+ // Skip consumer creation if no handlers are registered
+ if ((instance as any)._hasEventHandlers === false) {
+ instance.log.info("No event handlers registered, skipping RabbitMQ consumer");
+ return {
+ close: async () => {},
+ };
+ }
+
+ if (!process.env.RABBITMQ_URL) {
+ throw new Error("RabbitMq requires RABBITMQ_URL");
+ }
+ if (!process.env.K_SERVICE) {
+ throw new Error("RabbitMq requires K_SERVICE");
+ }
+ const connection = new Connection(process.env.RABBITMQ_URL);
+ await ensureRabbitMqExchangesAndQueues(connection, process.env.K_SERVICE);
+
+ const ctrl = new AbortController();
+ const service = process.env.K_SERVICE;
+ const prefix = getServicePrefix(service);
+
+ // Publisher for DLQ messages
+ const dlqPublisher = connection.createPublisher({
+ confirm: true,
+ maxAttempts: 3,
+ });
+
+ const sub = connection.createConsumer(
+ {
+ queue: `${prefix}.queue.${service}`,
+ queueOptions: {
+ passive: true,
+ },
+ qos: {
+ prefetchCount: 10,
+ },
+ concurrency: 10,
+ consumerTag: `${prefix}.consumer.${service}.${RABBITMQ_TAG}`,
+ },
+ async (msg: AsyncMessage) => {
+ if (ctrl.signal.aborted) {
+ return ConsumerStatus.REQUEUE;
+ }
+ try {
+ const payload = {
+ messageId: msg.messageId,
+ body: msg.body,
+ };
+ if (Buffer.isBuffer(payload.body)) {
+ payload.body = payload.body.toString("utf8");
+ }
+ const resp = await instance.inject({
+ method: "POST",
+ url: "/rabbitmq/process-message",
+ payload,
+ });
+ if (resp.statusCode >= 200 && resp.statusCode < 300) {
+ return ConsumerStatus.ACK;
+ }
+
+ // Check retry count before dead-lettering
+ const retryCount = getRetryCount(msg);
+ if (retryCount >= MAX_RETRY_COUNT) {
+ instance.log.error({
+ tag: "RABBITMQ_MESSAGE_MAX_RETRIES_EXCEEDED",
+ messageId: msg.messageId,
+ retryCount,
+ statusCode: resp.statusCode,
+ body: resp.body,
+ });
+ // Store in DLQ for manual retry before ACKing
+ await dlqPublisher.send(
+ {
+ routingKey: `${prefix}.dlq.${service}`,
+ contentType: "application/json",
+ headers: {
+ ...msg.headers,
+ "x-original-queue": `${prefix}.queue.${service}`,
+ "x-final-status-code": resp.statusCode,
+ "x-final-retry-count": retryCount,
+ },
+ },
+ msg.body,
+ );
+ // ACK to remove from queue permanently (don't dead-letter again)
+ return ConsumerStatus.ACK;
+ }
+
+ if (resp.statusCode === 429 || resp.statusCode === 409) {
+ // rate-limited or lock-conflict, use dead-letter retry
+ return ConsumerStatus.DROP;
+ } else if (resp.statusCode === 425) {
+ // delayed message, use local sleep since delay may exceed DLX TTL
+ await randomDelay();
+ return ConsumerStatus.REQUEUE;
+ } else if (resp.statusCode >= 500 && resp.statusCode < 600) {
+ // transient server error, use dead-letter retry
+ return ConsumerStatus.DROP;
+ } else {
+ instance.log.warn({
+ tag: "RABBITMQ_MESSAGE_DROPPED",
+ messageId: msg.messageId,
+ statusCode: resp.statusCode,
+ body: resp.body,
+ });
+ return ConsumerStatus.DROP;
+ }
+ } catch (err) {
+ // Check retry count before dead-lettering on exception
+ const retryCount = getRetryCount(msg);
+ if (retryCount >= MAX_RETRY_COUNT) {
+ instance.log.error({
+ tag: "RABBITMQ_MESSAGE_MAX_RETRIES_EXCEEDED",
+ messageId: msg.messageId,
+ retryCount,
+ err,
+ });
+ // Store in DLQ for manual retry before ACKing
+ await dlqPublisher.send(
+ {
+ routingKey: `${prefix}.dlq.${service}`,
+ contentType: "application/json",
+ headers: {
+ ...msg.headers,
+ "x-original-queue": `${prefix}.queue.${service}`,
+ "x-final-error": err instanceof Error ? err.message : String(err),
+ "x-final-retry-count": retryCount,
+ },
+ },
+ msg.body,
+ );
+ return ConsumerStatus.ACK;
+ }
+ instance.log.error({
+ tag: "RABBITMQ_CONSUMER_ERROR",
+ err: err,
+ });
+ return ConsumerStatus.DROP;
+ }
+ },
+ );
+ instance.log.info(
+ "Attached to RabbitMQ for Service=" + process.env.K_SERVICE,
+ );
+ return {
+ close: async () => {
+ ctrl.abort();
+ await sub.close();
+ await dlqPublisher.close();
+ await connection.close();
+ },
+ };
+};
diff --git a/src/event-bus/event-consumer/utils.ts b/src/event-bus/event-consumer/utils.ts
new file mode 100644
index 0000000..ebd4e75
--- /dev/null
+++ b/src/event-bus/event-consumer/utils.ts
@@ -0,0 +1,33 @@
+import * as timers from "node:timers/promises";
+
+const BASE_DELAY = Math.max(
+ parseInt(process.env.EVENT_RETRY_BASE_DELAY ?? "0", 10) || 0,
+ 5_000,
+);
+const MAX_DELAY = Math.max(
+ parseInt(process.env.EVENT_RETRY_MAX_DELAY ?? "0", 10) || 0,
+ 60_000,
+);
+const RANDOMIZED_DELAY_MAX = Math.max(
+ parseInt(process.env.EVENT_RANDOMIZED_DELAY_MAX ?? "0", 10) || 0,
+ 10_000
+);
+
+export async function exponentialDelay(
+ attempt: number,
+ baseDelayMs: number = BASE_DELAY,
+ maxDelayMs: number = MAX_DELAY,
+ randomizationFactor: number = 0.5,
+): Promise {
+ let delay = Math.min(baseDelayMs * 2 ** attempt, maxDelayMs);
+ if (randomizationFactor > 0) {
+ const randomFactor = (Math.random() - 0.5) * randomizationFactor * delay;
+ delay = Math.max(0, delay + randomFactor);
+ }
+ await timers.setTimeout(delay);
+}
+
+export async function randomDelay(max: number = RANDOMIZED_DELAY_MAX) {
+ const delay = Math.ceil(Math.random() * max);
+ await timers.setTimeout(delay);
+}
\ No newline at end of file
diff --git a/src/event-bus/gcp-pubsub.ts b/src/event-bus/gcp-pubsub.ts
new file mode 100644
index 0000000..6693b18
--- /dev/null
+++ b/src/event-bus/gcp-pubsub.ts
@@ -0,0 +1,270 @@
+import { PubSub } from "@google-cloud/pubsub";
+import { FastifyPluginAsync, FastifyRequest } from "fastify";
+import fp from "fastify-plugin";
+import {
+ CreateHandlerRunner,
+ ErrorWithStatus,
+ getHandlerMap,
+ noMatchingHandlers,
+} from "./commons";
+import { EventBus, EventBusOptions, EventMessage } from "./interfaces";
+
+interface PubsubMessage {
+ message: {
+ attributes: Record;
+ data: string;
+ messageId: string;
+ publishTime: string;
+ };
+ subscription: string;
+ attempt: number;
+}
+
+// Resolves @opentelemetry/api from either NODE_PATH (OTel operator) or the
+// pnpm absolute path. All copies share the same global symbol so the
+// registered SDK is always reached.
+function getOtelApi() {
+ try {
+ // eslint-disable-next-line @typescript-eslint/no-require-imports
+ return require("@opentelemetry/api");
+ } catch {
+ try {
+ // eslint-disable-next-line @typescript-eslint/no-require-imports
+ return require(
+ "/app/node_modules/.pnpm/@opentelemetry+api@1.9.0/node_modules/@opentelemetry/api",
+ );
+ } catch {
+ return null;
+ }
+ }
+}
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+const otel: any = getOtelApi();
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+const tracer: any = otel
+ ? otel.trace.getTracer("fp-eventbus-gcp-pubsub")
+ : null;
+
+const plugin: FastifyPluginAsync = async function (
+ f,
+ options,
+) {
+ if (!options.topic) {
+ throw new Error(
+ "Google Cloud PubSub needs the topic specified. Use EVENT_TOPIC env var",
+ );
+ }
+
+ const handlerMap = getHandlerMap(options);
+ const client = new PubSub();
+ const topic = client.topic(options.topic, {
+ batching: {
+ maxMilliseconds: 10,
+ maxMessages: 100,
+ },
+ });
+
+ f.addHook("onClose", async () => {
+ await topic.flush();
+ f.log.info({ tag: "GCP_PUBSUB_FINAL_FLUSH" });
+ await client.close();
+ });
+
+ function publishToPubSub(
+ event: string,
+ payload: any, // eslint-disable-line @typescript-eslint/no-explicit-any
+ file: string | null,
+ processAfterDelayMs: number,
+ req?: FastifyRequest,
+ ) {
+ options.validateMsg(event, payload, req);
+ const attrs: Record = {
+ event,
+ };
+ if (file) {
+ attrs.file = file;
+ }
+ if (processAfterDelayMs > 0) {
+ attrs.processAfterDelayMs = "" + processAfterDelayMs;
+ }
+
+ // Inject the active OTel span context as a W3C traceparent/tracestate into
+ // Pub/Sub message attributes. context.active() reads from AsyncLocalStorage
+ // at call-time, so when this is invoked inside a context.with() block (e.g.
+ // the outbox worker) the correct parent context is used automatically.
+ if (otel) {
+ otel.propagation.inject(otel.context.active(), attrs);
+ if (attrs.traceparent) {
+ (req ?? f).log.info({
+ tag: "EVENT_TRACEPARENT_INJECT",
+ event,
+ traceparent: attrs.traceparent,
+ });
+ }
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-floating-promises
+ topic.publishMessage({
+ json: { event, payload },
+ attributes: attrs,
+ });
+ req?.log.info({
+ tag: "EVENT_PUBLISH",
+ event,
+ payload,
+ processAfterDelayMs,
+ });
+ }
+
+ const bus: EventBus = {
+ publish(event, payload, processAfterDelayMs) {
+ publishToPubSub(event, payload, null, processAfterDelayMs ?? 0);
+ },
+ };
+ f.decorate("EventBus", {
+ getter() {
+ return bus;
+ },
+ });
+
+ f.decorateRequest("EventBus", {
+ getter() {
+ return {
+ // req.EventBus.publish — called inside an HTTP request handler.
+ // context.active() already holds the HTTP server span context here,
+ // so propagation.inject works automatically.
+ publish: (event: string, payload: any, processAfterDelayMs?: number) => { // eslint-disable-line @typescript-eslint/no-explicit-any
+ publishToPubSub(event, payload, null, processAfterDelayMs ?? 0, this);
+ },
+ };
+ },
+ });
+
+ const selectAndRunHandlers = CreateHandlerRunner(f, options, handlerMap);
+
+ f.post<{ Body: PubsubMessage }>(
+ "/gcp-pubsub/process-message",
+ {
+ schema: {
+ hide: true,
+ } as any, // eslint-disable-line @typescript-eslint/no-explicit-any
+ },
+ async function (req, reply) {
+ const body = req.body;
+ if (!body) {
+ reply.send("OK");
+ return reply;
+ }
+ const eventMsg = convert(body);
+ const attrs = body.message.attributes ?? {};
+
+ // Extract the W3C trace context that was injected by the publisher.
+ // This reconstructs the parent span context so all handler spans
+ // (DB queries, further publishes) are children of the original trace.
+ let handlerCtx = otel ? otel.context.active() : null;
+ let span = null;
+ if (otel && attrs.traceparent) {
+ const parentCtx = otel.propagation.extract(
+ otel.context.active(),
+ attrs,
+ );
+ span = tracer.startSpan(
+ `pubsub.consume.${eventMsg.event}`,
+ { kind: otel.SpanKind.CONSUMER },
+ parentCtx,
+ );
+ handlerCtx = otel.trace.setSpan(parentCtx, span);
+ }
+
+ req.log.info({
+ tag: "PUB_SUB_MSG",
+ messageId: body.message.messageId,
+ subscription: body.subscription,
+ attributes: attrs,
+ publishTime: body.message.publishTime,
+ attempt: body.attempt,
+ traceparent_extracted: attrs.traceparent ?? "MISSING",
+ });
+
+ options.validateMsg(eventMsg.event, eventMsg.data, req);
+
+ if (noMatchingHandlers(handlerMap, eventMsg)) {
+ span?.end();
+ reply.send("OK");
+ return reply;
+ }
+
+ req.log.info({
+ tag: "PUB_SUB_MSG_HANDLE",
+ event: eventMsg,
+ traceparent: attrs.traceparent ?? "MISSING",
+ });
+
+ if (
+ eventMsg.processAfterDelayMs > 0 &&
+ Date.now() <
+ eventMsg.publishTime.getTime() + eventMsg.processAfterDelayMs
+ ) {
+ req.log.info({
+ tag: "PUB_SUB_MSG_DELAYED",
+ eventId: eventMsg.id,
+ });
+ span?.end();
+ reply
+ .status(425)
+ .send({ processAfterDelayMs: eventMsg?.processAfterDelayMs });
+ return reply;
+ }
+
+ const runHandlers = async () => {
+ try {
+ await selectAndRunHandlers(req, eventMsg, (event, payload, file) =>
+ publishToPubSub(
+ event,
+ payload,
+ file,
+ eventMsg.processAfterDelayMs,
+ req,
+ ),
+ );
+ reply.send("OK");
+ } catch (err) {
+ if (err instanceof ErrorWithStatus) {
+ reply.status(err.status).send(err.message);
+ } else {
+ reply.status(500).send("ERROR");
+ }
+ } finally {
+ span?.end();
+ }
+ };
+
+ // Run handlers inside the extracted parent context so all child spans
+ // (DB queries, further publishes) are linked to the original trace tree.
+ if (otel && handlerCtx) {
+ await otel.context.with(handlerCtx, runHandlers);
+ } else {
+ await runHandlers();
+ }
+ return reply;
+ },
+ );
+};
+
+export = fp(plugin, { name: "fp-eventbus-gcp-pubsub" });
+
+function convert(msg: PubsubMessage): EventMessage {
+ const buf = Buffer.from(msg.message.data, "base64");
+ const json = buf.toString("utf-8");
+ const obj = JSON.parse(json);
+ return {
+ id: msg.message.messageId,
+ publishTime: new Date(msg.message.publishTime),
+ processAfterDelayMs:
+ parseInt(msg.message.attributes.processAfterDelayMs ?? "0", 10) || 0,
+ attributes: msg.message.attributes,
+ event: obj.event,
+ data: obj.payload,
+ };
+}
diff --git a/src/event-bus/index.spec.ts b/src/event-bus/index.spec.ts
new file mode 100644
index 0000000..603b8ff
--- /dev/null
+++ b/src/event-bus/index.spec.ts
@@ -0,0 +1,293 @@
+import Fastify from "fastify";
+import EventBusPlugin from "./index";
+import { EventBusOptions, EventHandler } from "./interfaces";
+
+describe("EventBus Plugin", () => {
+ let fastify: ReturnType;
+ let mockValidateMsg: jest.Mock;
+ let mockProcessError: jest.Mock;
+ let mockHandler: EventHandler;
+
+ beforeEach(() => {
+ fastify = Fastify({ logger: false });
+ mockValidateMsg = jest.fn();
+ mockProcessError = jest.fn().mockReturnValue({ err: new Error("test"), status: 500 });
+ mockHandler = jest.fn().mockResolvedValue(undefined);
+ });
+
+ afterEach(async () => {
+ try {
+ await fastify.close();
+ } catch (error) {
+ // Ignore errors from fastify already being closed
+ if (!error.message.includes('already been closed')) {
+ throw error;
+ }
+ }
+ });
+
+ describe("Plugin Registration", () => {
+ it("should register with in-process bus type (default)", async () => {
+ const options: EventBusOptions = {
+ busType: "in-process",
+ handlers: [{
+ file: "test.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(EventBusPlugin, options);
+ expect(fastify.EventBus).toBeDefined();
+ });
+
+ it("should require proper environment variables for rabbitmq", async () => {
+ // Test that rabbitmq registration fails without required env vars
+ const options: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await expect(fastify.register(EventBusPlugin, options)).rejects.toThrow();
+ });
+
+ it("should register with gcp-pubsub bus type", async () => {
+ process.env.EVENT_TOPIC = "test-topic";
+
+ const options: EventBusOptions = {
+ busType: "gcp-pubsub",
+ topic: "test-topic",
+ handlers: [{
+ file: "test.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(EventBusPlugin, options);
+ expect(fastify.EventBus).toBeDefined();
+
+ delete process.env.EVENT_TOPIC;
+ });
+
+ it("should register with azure-servicebus bus type", async () => {
+ process.env.EVENT_TOPIC = "test-topic";
+
+ const options: EventBusOptions = {
+ busType: "azure-servicebus",
+ namespace: "test-namespace",
+ topic: "test-topic",
+ handlers: [{
+ file: "test.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(EventBusPlugin, options);
+ expect(fastify.EventBus).toBeDefined();
+
+ delete process.env.EVENT_TOPIC;
+ });
+
+ it("should throw error for azure-servicebus without namespace", async () => {
+ const options: EventBusOptions = {
+ busType: "azure-servicebus",
+ handlers: [{
+ file: "test.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await expect(fastify.register(EventBusPlugin, options)).rejects.toThrow(
+ "Azure ServiceBus needs the namespace specified"
+ );
+ });
+ });
+
+ describe("Event Publish Route", () => {
+ beforeEach(async () => {
+ const options: EventBusOptions = {
+ busType: "in-process",
+ handlers: [{
+ file: "test.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(EventBusPlugin, options);
+ });
+
+ it("should register publish route by default", async () => {
+ const response = await fastify.inject({
+ method: "POST",
+ url: "/event-bus/publish/testEvent",
+ payload: { test: "data" }
+ });
+
+ expect(response.statusCode).toBe(200);
+ expect(response.body).toBe("OK");
+ });
+
+ it("should handle string payload via query param", async () => {
+ const response = await fastify.inject({
+ method: "POST",
+ url: "/event-bus/publish/testEvent?stringPayload=test-string",
+ payload: {}
+ });
+
+ expect(response.statusCode).toBe(200);
+ expect(response.body).toBe("OK");
+ });
+
+ it("should handle integer payload via query param", async () => {
+ const response = await fastify.inject({
+ method: "POST",
+ url: "/event-bus/publish/testEvent?integerPayload=123",
+ payload: {}
+ });
+
+ expect(response.statusCode).toBe(200);
+ expect(response.body).toBe("OK");
+ });
+
+ it("should not register publish route when disabled", async () => {
+ const fastifyDisabled = Fastify({ logger: false });
+
+ const options: EventBusOptions = {
+ busType: "in-process",
+ handlers: [{
+ file: "test.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ disableEventPublishRoute: true,
+ };
+
+ await fastifyDisabled.register(EventBusPlugin, options);
+
+ const response = await fastifyDisabled.inject({
+ method: "POST",
+ url: "/event-bus/publish/testEvent",
+ payload: { test: "data" }
+ });
+
+ expect(response.statusCode).toBe(404);
+ await fastifyDisabled.close();
+ });
+ });
+
+ describe("EventBus Decorator", () => {
+ beforeEach(async () => {
+ const options: EventBusOptions = {
+ busType: "in-process",
+ handlers: [{
+ file: "test.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(EventBusPlugin, options);
+ });
+
+ it("should provide EventBus on fastify instance", () => {
+ expect(fastify.EventBus).toBeDefined();
+ expect(typeof fastify.EventBus.publish).toBe("function");
+ });
+
+ it("should allow publishing events programmatically", () => {
+ expect(() => {
+ fastify.EventBus.publish("testEvent", { test: "data" });
+ }).not.toThrow();
+ });
+
+ it("should allow publishing events with delay", () => {
+ expect(() => {
+ fastify.EventBus.publish("testEvent", { test: "data" }, 1000);
+ }).not.toThrow();
+ });
+ });
+
+ describe("Configuration Options", () => {
+ it("should handle empty handlers array", async () => {
+ const options: EventBusOptions = {
+ busType: "in-process",
+ handlers: [],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(EventBusPlugin, options);
+ expect(fastify.EventBus).toBeDefined();
+ });
+
+ it("should handle multiple handler files", async () => {
+ const handler1: EventHandler = jest.fn().mockResolvedValue(undefined);
+ const handler2: EventHandler = jest.fn().mockResolvedValue(undefined);
+
+ const options: EventBusOptions = {
+ busType: "in-process",
+ handlers: [
+ {
+ file: "handlers1.ts",
+ handlers: { event1: handler1 }
+ },
+ {
+ file: "handlers2.ts",
+ handlers: { event2: handler2 }
+ }
+ ],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(EventBusPlugin, options);
+ expect(fastify.EventBus).toBeDefined();
+ });
+
+ it("should handle topic and namespace options", async () => {
+ const options: EventBusOptions = {
+ busType: "in-process",
+ topic: "test-topic",
+ namespace: "test-namespace",
+ handlers: [{
+ file: "test.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(EventBusPlugin, options);
+ expect(fastify.EventBus).toBeDefined();
+ });
+
+ it("should handle actionConcurrency option", async () => {
+ const options: EventBusOptions = {
+ busType: "in-process",
+ handlers: [{
+ file: "test.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ actionConcurrency: 5,
+ };
+
+ await fastify.register(EventBusPlugin, options);
+ expect(fastify.EventBus).toBeDefined();
+ });
+ });
+});
\ No newline at end of file
diff --git a/src/event-bus/index.ts b/src/event-bus/index.ts
new file mode 100644
index 0000000..ced9f04
--- /dev/null
+++ b/src/event-bus/index.ts
@@ -0,0 +1,82 @@
+import { FastifyPluginAsync } from "fastify";
+import fp from "fastify-plugin";
+import { EventBusOptions } from "./interfaces";
+
+const plugin: FastifyPluginAsync = async function (
+ f,
+ options,
+) {
+ switch (options.busType) {
+ case "gcp-pubsub":
+ // eslint-disable-next-line @typescript-eslint/no-require-imports
+ await f.register(require("./gcp-pubsub"), options);
+ break;
+ case "azure-servicebus":
+ if (!options.namespace) {
+ throw new Error(
+ "Azure ServiceBus needs the namespace specified. Use EVENT_NAMESPACE env var",
+ );
+ }
+ // eslint-disable-next-line @typescript-eslint/no-require-imports
+ await f.register(require("./azure-servicebus"), options);
+ break;
+ case "rabbitmq":
+ // eslint-disable-next-line @typescript-eslint/no-require-imports
+ await f.register(require("./rabbitmq"), options);
+ break;
+ default:
+ // eslint-disable-next-line @typescript-eslint/no-require-imports
+ await f.register(require("./local"), options);
+ }
+
+ ///
+ if (!options.disableEventPublishRoute) {
+ f.post<{
+ Params: { event: string };
+ Querystring: {
+ stringPayload?: string;
+ integerPayload?: string;
+ };
+ Body: any;
+ }>(
+ "/event-bus/publish/:event",
+ {
+ schema: {
+ description:
+ "API to push a event manually. Use the appropriate query-param or request-body to send the payload.",
+ operationId: "publishToEventBus",
+ params: {
+ type: "object",
+ properties: {
+ event: { type: "string" },
+ },
+ },
+ querystring: {
+ type: "object",
+ properties: {
+ stringPayload: { type: "string" },
+ integerPayload: { type: "string" },
+ },
+ },
+ body: {
+ type: "object",
+ additionalProperties: true,
+ },
+ },
+ },
+ async function (req) {
+ if (req.query.stringPayload || req.query.integerPayload) {
+ req.EventBus.publish(
+ req.params.event,
+ req.query.stringPayload || req.query.integerPayload,
+ );
+ } else {
+ req.EventBus.publish(req.params.event, req.body);
+ }
+ return "OK";
+ },
+ );
+ }
+};
+
+export default fp(plugin, { name: "fp-eventbus" });
diff --git a/src/event-bus/interfaces.ts b/src/event-bus/interfaces.ts
new file mode 100644
index 0000000..324991a
--- /dev/null
+++ b/src/event-bus/interfaces.ts
@@ -0,0 +1,58 @@
+import { FastifyInstance, FastifyRequest } from "fastify";
+import { Registry } from "prom-client";
+
+export interface EventBusOptions {
+ busType: "rabbitmq" | "gcp-pubsub" | "azure-servicebus" | "in-process";
+ topic?: string;
+ namespace?: string;
+ handlers: {
+ file: string;
+ handlers: EventHandlers;
+ }[];
+ validateMsg: (event: string, payload: any, req?: FastifyRequest) => void;
+ processError(err: any, ctx: ActionContext): { err: any; status: number };
+ ensureExchangesAndQueues?: boolean;
+ disableEventPublishRoute?: boolean;
+ actionConcurrency?: number;
+ registry?: Registry;
+}
+
+export type PublishToPubSub = (
+ event: string,
+ payload: any,
+ file: string | null,
+) => void;
+
+export interface ActionContext {
+ req: FastifyRequest;
+ publishToPubSub: PublishToPubSub;
+ handler: EventHandler;
+ ///
+ eventMsg: EventMessage;
+ file: string;
+ specifiedFile: string | undefined;
+}
+
+export type EventHandlers = {
+ readonly [k: string]: EventHandler;
+};
+
+export type EventHandler = (
+ this: FastifyInstance,
+ msg: EventMessage,
+ req: FastifyRequest,
+) => Promise;
+
+export interface EventMessage {
+ id: string;
+ publishTime: Date;
+ processAfterDelayMs: number;
+ attributes: Record;
+ event: string;
+ data: T;
+}
+
+export interface EventBus {
+ // published event should be processed after (Date.now() + processAfterDelayMs)
+ publish(event: string, payload: any, processAfterDelayMs?: number): void;
+}
diff --git a/src/event-bus/local.spec.ts b/src/event-bus/local.spec.ts
new file mode 100644
index 0000000..4ccb1c8
--- /dev/null
+++ b/src/event-bus/local.spec.ts
@@ -0,0 +1,391 @@
+import Fastify from "fastify";
+import LocalEventBusPlugin from "./local";
+import { EventBusOptions, EventHandler, EventMessage } from "./interfaces";
+
+describe("Local EventBus Plugin", () => {
+ let mockValidateMsg: jest.Mock;
+ let mockProcessError: jest.Mock;
+ let mockHandler: EventHandler;
+
+ beforeEach(() => {
+ mockValidateMsg = jest.fn();
+ mockProcessError = jest.fn().mockReturnValue({ err: new Error("test"), status: 500 });
+ mockHandler = jest.fn().mockResolvedValue(undefined);
+ });
+
+ describe("Local EventBus Registration", () => {
+ it("should register local event bus plugin", async () => {
+ const fastify = Fastify({ logger: false });
+ const options: EventBusOptions = {
+ busType: "in-process",
+ handlers: [{
+ file: "test.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(LocalEventBusPlugin, options);
+
+ expect(fastify.EventBus).toBeDefined();
+ expect(typeof fastify.EventBus.publish).toBe("function");
+
+ await fastify.close();
+ });
+
+ it("should decorate both fastify instance and request", async () => {
+ const fastify = Fastify({ logger: false });
+ const options: EventBusOptions = {
+ busType: "in-process",
+ handlers: [{
+ file: "test.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(LocalEventBusPlugin, options);
+
+ // Test fastify instance decoration
+ expect(fastify.EventBus).toBeDefined();
+
+ // Test request decoration through a route
+ fastify.get("/test", async (request) => {
+ expect(request.EventBus).toBeDefined();
+ expect(typeof request.EventBus.publish).toBe("function");
+ return "ok";
+ });
+
+ const response = await fastify.inject({
+ method: "GET",
+ url: "/test"
+ });
+
+ expect(response.statusCode).toBe(200);
+
+ await fastify.close();
+ });
+ });
+
+ describe("Event Publishing", () => {
+ it("should publish events and validate", async () => {
+ const fastify = Fastify({ logger: false });
+ const options: EventBusOptions = {
+ busType: "in-process",
+ handlers: [{
+ file: "test.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(LocalEventBusPlugin, options);
+
+ // Test publishing - this should call validateMsg
+ expect(() => {
+ fastify.EventBus.publish("testEvent", { test: "data" });
+ }).not.toThrow();
+
+ expect(mockValidateMsg).toHaveBeenCalledWith("testEvent", { test: "data" }, undefined);
+
+ // Don't close fastify to avoid flush hook issues
+ });
+
+ it("should publish events with delay", async () => {
+ const fastify = Fastify({ logger: false });
+ const options: EventBusOptions = {
+ busType: "in-process",
+ handlers: [{
+ file: "test.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(LocalEventBusPlugin, options);
+
+ expect(() => {
+ fastify.EventBus.publish("testEvent", { test: "data" }, 1000);
+ }).not.toThrow();
+
+ expect(mockValidateMsg).toHaveBeenCalledWith("testEvent", { test: "data" }, undefined);
+
+ // Don't close fastify to avoid flush hook issues
+ });
+ });
+
+ describe("Message Processing", () => {
+ it("should process messages via internal route", async () => {
+ const fastify = Fastify({ logger: false });
+ const options: EventBusOptions = {
+ busType: "in-process",
+ handlers: [{
+ file: "test.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(LocalEventBusPlugin, options);
+
+ const testMessage: EventMessage = {
+ id: "test-msg-1",
+ event: "testEvent",
+ data: { test: "data" },
+ attributes: {},
+ publishTime: new Date("2023-01-01"),
+ processAfterDelayMs: 0,
+ };
+
+ const response = await fastify.inject({
+ method: "POST",
+ url: "/local-servicebus/process-message",
+ payload: testMessage
+ });
+
+ expect(response.statusCode).toBe(200);
+ expect(response.body).toBe("OK");
+ expect(mockValidateMsg).toHaveBeenCalledWith("testEvent", { test: "data" }, expect.any(Object));
+
+ await fastify.close();
+ });
+
+ it("should handle messages with no body", async () => {
+ const fastify = Fastify({ logger: false });
+ const options: EventBusOptions = {
+ busType: "in-process",
+ handlers: [{
+ file: "test.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(LocalEventBusPlugin, options);
+
+ const response = await fastify.inject({
+ method: "POST",
+ url: "/local-servicebus/process-message"
+ // No payload
+ });
+
+ expect(response.statusCode).toBe(200);
+ expect(response.body).toBe("NO_MESSAGE");
+
+ await fastify.close();
+ });
+
+ it("should bail out when no matching handlers", async () => {
+ const fastify = Fastify({ logger: false });
+ const options: EventBusOptions = {
+ busType: "in-process",
+ handlers: [{
+ file: "test.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(LocalEventBusPlugin, options);
+
+ const testMessage: EventMessage = {
+ id: "test-msg-1",
+ event: "nonExistentEvent",
+ data: { test: "data" },
+ attributes: {},
+ publishTime: new Date("2023-01-01"),
+ processAfterDelayMs: 0,
+ };
+
+ const response = await fastify.inject({
+ method: "POST",
+ url: "/local-servicebus/process-message",
+ payload: testMessage
+ });
+
+ expect(response.statusCode).toBe(200);
+ expect(response.body).toBe("BAIL_OUT_NO_MATCHING_HANDLERS");
+
+ await fastify.close();
+ });
+
+ it("should execute matching event handlers", async () => {
+ const fastify = Fastify({ logger: false });
+ const options: EventBusOptions = {
+ busType: "in-process",
+ handlers: [{
+ file: "test.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(LocalEventBusPlugin, options);
+
+ const testMessage: EventMessage = {
+ id: "test-msg-1",
+ event: "testEvent",
+ data: { test: "execution data" },
+ attributes: {},
+ publishTime: new Date("2023-01-01"),
+ processAfterDelayMs: 0,
+ };
+
+ const response = await fastify.inject({
+ method: "POST",
+ url: "/local-servicebus/process-message",
+ payload: testMessage
+ });
+
+ expect(response.statusCode).toBe(200);
+ expect(response.body).toBe("OK");
+ expect(mockHandler).toHaveBeenCalledWith(
+ expect.objectContaining({
+ id: "test-msg-1",
+ event: "testEvent",
+ data: { test: "execution data" },
+ attributes: {},
+ processAfterDelayMs: 0,
+ }),
+ expect.any(Object)
+ );
+
+ await fastify.close();
+ });
+
+ it("should handle messages with file attributes", async () => {
+ const fastify = Fastify({ logger: false });
+ const options: EventBusOptions = {
+ busType: "in-process",
+ handlers: [{
+ file: "specific-file.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(LocalEventBusPlugin, options);
+
+ const testMessage: EventMessage = {
+ id: "test-msg-1",
+ event: "testEvent",
+ data: { test: "data" },
+ attributes: { file: "specific-file.ts" },
+ publishTime: new Date("2023-01-01"),
+ processAfterDelayMs: 0,
+ };
+
+ const response = await fastify.inject({
+ method: "POST",
+ url: "/local-servicebus/process-message",
+ payload: testMessage
+ });
+
+ expect(response.statusCode).toBe(200);
+ expect(response.body).toBe("OK");
+
+ await fastify.close();
+ });
+ });
+
+ describe("Error Handling", () => {
+ it("should handle validation errors", async () => {
+ const fastify = Fastify({ logger: false });
+ const mockValidateMsgFail = jest.fn().mockImplementation(() => {
+ throw new Error("Validation failed");
+ });
+
+ const options: EventBusOptions = {
+ busType: "in-process",
+ handlers: [{
+ file: "test.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsgFail,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(LocalEventBusPlugin, options);
+
+ expect(() => {
+ fastify.EventBus.publish("testEvent", { invalid: "data" });
+ }).toThrow("Validation failed");
+
+ await fastify.close();
+ });
+
+ it("should handle handler execution errors gracefully", async () => {
+ const fastify = Fastify({ logger: false });
+ const failingHandler: EventHandler = jest.fn().mockRejectedValue(new Error("Handler failed"));
+
+ const options: EventBusOptions = {
+ busType: "in-process",
+ handlers: [{
+ file: "test.ts",
+ handlers: { failingEvent: failingHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(LocalEventBusPlugin, options);
+
+ const testMessage: EventMessage = {
+ id: "test-msg-1",
+ event: "failingEvent",
+ data: { test: "data" },
+ attributes: {},
+ publishTime: new Date("2023-01-01"),
+ processAfterDelayMs: 0,
+ };
+
+ // The request should still complete, errors are handled by the handler runner
+ const response = await fastify.inject({
+ method: "POST",
+ url: "/local-servicebus/process-message",
+ payload: testMessage
+ });
+
+ expect(response.statusCode).toBe(200);
+ expect(failingHandler).toHaveBeenCalled();
+
+ await fastify.close();
+ });
+ });
+
+ describe("Message Queue Integration", () => {
+ it("should handle basic message flow", async () => {
+ const fastify = Fastify({ logger: false });
+ const options: EventBusOptions = {
+ busType: "in-process",
+ handlers: [{
+ file: "test.ts",
+ handlers: { testEvent: mockHandler }
+ }],
+ validateMsg: mockValidateMsg,
+ processError: mockProcessError,
+ };
+
+ await fastify.register(LocalEventBusPlugin, options);
+
+ // Publish an event
+ expect(() => {
+ fastify.EventBus.publish("testEvent", { test: "basic flow" });
+ }).not.toThrow();
+
+ // Verify validation was called
+ expect(mockValidateMsg).toHaveBeenCalledWith("testEvent", { test: "basic flow" }, undefined);
+
+ // Don't close to avoid flush hook issues
+ });
+ });
+});
\ No newline at end of file
diff --git a/src/event-bus/local.ts b/src/event-bus/local.ts
new file mode 100644
index 0000000..d6fc546
--- /dev/null
+++ b/src/event-bus/local.ts
@@ -0,0 +1,125 @@
+import { FastifyPluginAsync, FastifyRequest } from "fastify";
+import fp from "fastify-plugin";
+import {
+ CreateHandlerRunner,
+ getHandlerMap,
+ noMatchingHandlers,
+} from "./commons";
+import { EventBus, EventBusOptions, EventMessage } from "./interfaces";
+
+const plugin: FastifyPluginAsync = async function (
+ f,
+ options,
+) {
+ const handlerMap = getHandlerMap(options);
+
+ const bus: EventBus = {
+ publish(event, payload, processAfterDelayMs) {
+ publishToPubSub(event, payload, null, processAfterDelayMs ?? 0);
+ },
+ };
+ f.decorate("EventBus", {
+ getter() {
+ return bus;
+ },
+ });
+ f.decorateRequest("EventBus", {
+ getter() {
+ return bus;
+ },
+ });
+
+ const messages: EventMessage[] = [];
+
+ const flush = async () => {
+ while (messages.length) {
+ const msg = messages.shift();
+ if (!msg) {
+ break;
+ }
+ await f.inject({
+ method: "POST",
+ url: "/local-servicebus/process-message",
+ payload: msg,
+ });
+ }
+ };
+
+ f.addHook("onClose", async () => {
+ await flush();
+ f.log.trace({ tag: "LOCAL_SERVICEBUS_FINAL_FLUSH" });
+ });
+
+ f.addHook("onSend", async function (_req, _reply, payload) {
+ // eslint-disable-next-line @typescript-eslint/no-floating-promises
+ flush();
+ return payload;
+ });
+
+ let msgId = 0;
+ function publishToPubSub(
+ event: string,
+ payload: any,
+ file: string | null,
+ processAfterDelayMs: number,
+ req?: FastifyRequest,
+ ) {
+ options.validateMsg(event, payload, req);
+ const messageBody: EventMessage = {
+ id: "in-process" + msgId,
+ event,
+ attributes: file == null ? {} : { file },
+ data: payload,
+ publishTime: new Date(),
+ processAfterDelayMs: 0,
+ };
+ msgId++;
+ messages.push(messageBody);
+ f.log.info(
+ "Event pushed --> " +
+ event +
+ " -- " +
+ payload +
+ "--" +
+ file +
+ "--" +
+ processAfterDelayMs,
+ );
+ }
+
+ const selectAndRunHandlers = CreateHandlerRunner(f, options, handlerMap);
+
+ f.post<{ Body: EventMessage }>(
+ "/local-servicebus/process-message",
+ {
+ schema: {
+ hide: true,
+ } as any,
+ },
+ async function (req) {
+ const msg = req.body;
+ if (!msg) {
+ return "NO_MESSAGE";
+ }
+ options.validateMsg(msg.event, msg.data, req);
+
+ if (noMatchingHandlers(handlerMap, msg)) {
+ // bail-out
+ // service has no event-handlers registered
+ return "BAIL_OUT_NO_MATCHING_HANDLERS";
+ }
+
+ req.log.info({
+ tag: "LOCAL_SERVICEBUS_MESSAGE_HANDLE",
+ msg: msg,
+ });
+
+ await selectAndRunHandlers(req, msg, (event, payload, file) =>
+ publishToPubSub(event, payload, file, 0, req),
+ );
+ return "OK";
+ },
+ );
+};
+
+export = fp(plugin, { name: "fp-eventbus-local" });
diff --git a/src/event-bus/rabbitmq-utils.ts b/src/event-bus/rabbitmq-utils.ts
new file mode 100644
index 0000000..f94af79
--- /dev/null
+++ b/src/event-bus/rabbitmq-utils.ts
@@ -0,0 +1,84 @@
+import { Connection } from "rabbitmq-client";
+
+export const RABBITMQ_TAG = "" + Math.floor(Math.random() * 1e9);
+
+/**
+ * Extracts the prefix from a service name (part before the first hyphen).
+ * e.g., "wms-cincout" → "wms", "xyz-service" → "xyz", "noprefix" → "default"
+ */
+export function getServicePrefix(service: string): string {
+ const hyphenIndex = service.indexOf("-");
+ return hyphenIndex > 0 ? service.substring(0, hyphenIndex) : "default";
+}
+
+export async function ensureRabbitMqExchangesAndQueues(
+ connection: Connection,
+ service: string,
+) {
+ const prefix = getServicePrefix(service);
+ await connection.exchangeDeclare({
+ exchange: `${prefix}.main-exchange`,
+ type: "fanout",
+ durable: true,
+ arguments: {},
+ autoDelete: false,
+ internal: false,
+ passive: false,
+ });
+ await connection.queueDeclare({
+ queue: `${prefix}.queue.${service}`,
+ arguments: {
+ "x-message-ttl": 300000, // 5 minutes
+ "x-dead-letter-exchange": `${prefix}.retry-exchange.${service}`,
+ "x-dead-letter-routing-key": "retry", // Fixed routing key for DLX
+ "x-queue-type": "classic",
+ },
+ autoDelete: false,
+ durable: true,
+ exclusive: false,
+ passive: false,
+ });
+ await connection.queueBind({
+ exchange: `${prefix}.main-exchange`,
+ queue: `${prefix}.queue.${service}`,
+ });
+ await connection.exchangeDeclare({
+ exchange: `${prefix}.retry-exchange.${service}`,
+ type: "direct",
+ durable: true,
+ arguments: {},
+ autoDelete: false,
+ internal: false,
+ passive: false,
+ });
+ await connection.queueDeclare({
+ queue: `${prefix}.retry-queue.${service}`,
+ arguments: {
+ "x-message-ttl": 5000,
+ "x-dead-letter-exchange": "", // default exchange routes by queue name
+ "x-dead-letter-routing-key": `${prefix}.queue.${service}`,
+ "x-queue-type": "classic",
+ },
+ autoDelete: false,
+ durable: true,
+ exclusive: false,
+ passive: false,
+ });
+ await connection.queueBind({
+ exchange: `${prefix}.retry-exchange.${service}`,
+ queue: `${prefix}.retry-queue.${service}`,
+ routingKey: "retry", // Must match x-dead-letter-routing-key from main queue
+ });
+ // Dead Letter Queue for messages that exceed max retries (manual retry)
+ await connection.queueDeclare({
+ queue: `${prefix}.dlq.${service}`,
+ arguments: {
+ "x-message-ttl": 300000, // 5 minutes
+ "x-queue-type": "classic",
+ },
+ autoDelete: false,
+ durable: true,
+ exclusive: false,
+ passive: false,
+ });
+}
diff --git a/src/event-bus/rabbitmq.spec.ts b/src/event-bus/rabbitmq.spec.ts
new file mode 100644
index 0000000..4bd5eea
--- /dev/null
+++ b/src/event-bus/rabbitmq.spec.ts
@@ -0,0 +1,1503 @@
+import Fastify, { FastifyInstance } from "fastify";
+import { GenericContainer, StartedTestContainer, Wait } from "testcontainers";
+import { Connection } from "rabbitmq-client";
+import { Plugins } from "../index";
+import { EventBusOptions, EventHandler } from "./interfaces";
+import { getServicePrefix } from "./rabbitmq-utils";
+import { RabbitMqServiceBusConsumerBuilder } from "./event-consumer/rabbitmq";
+
+describe("RabbitMQ Integration Tests", () => {
+ let container: StartedTestContainer;
+ let rabbitmqUrl: string;
+
+ beforeAll(async () => {
+ container = await new GenericContainer("rabbitmq:3-management")
+ .withExposedPorts(5672, 15672)
+ .withWaitStrategy(Wait.forLogMessage(/started TCP listener/))
+ .start();
+
+ const host = container.getHost();
+ const port = container.getMappedPort(5672);
+ rabbitmqUrl = `amqp://guest:guest@${host}:${port}`;
+ }, 60000);
+
+ afterAll(async () => {
+ await container?.stop();
+ });
+
+ describe("getServicePrefix", () => {
+ it("should extract prefix before first hyphen", () => {
+ expect(getServicePrefix("wms-cincout")).toBe("wms");
+ expect(getServicePrefix("xyz-service-name")).toBe("xyz");
+ });
+
+ it("should return 'default' when no hyphen found", () => {
+ expect(getServicePrefix("noprefix")).toBe("default");
+ expect(getServicePrefix("")).toBe("default");
+ });
+
+ it("should handle edge cases", () => {
+ expect(getServicePrefix("-startswithhyphen")).toBe("default");
+ expect(getServicePrefix("a-b")).toBe("a");
+ });
+ });
+
+ describe("RabbitMQ Publisher", () => {
+ let fastify: FastifyInstance;
+ const originalEnv = process.env;
+
+ beforeEach(() => {
+ process.env = { ...originalEnv };
+ process.env.RABBITMQ_URL = rabbitmqUrl;
+ process.env.K_SERVICE = "test-publisher";
+ fastify = Fastify({ logger: false });
+ });
+
+ afterEach(async () => {
+ process.env = originalEnv;
+ await fastify.close();
+ });
+
+ it("should register RabbitMQ plugin successfully", async () => {
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await fastify.register(Plugins.EventBus, eventBusOptions);
+ expect(fastify.EventBus).toBeDefined();
+ expect(typeof fastify.EventBus.publish).toBe("function");
+ });
+
+ it("should publish messages to RabbitMQ", async () => {
+ const validateMsg = jest.fn();
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [],
+ validateMsg,
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await fastify.register(Plugins.EventBus, eventBusOptions);
+
+ // Publish a message
+ fastify.EventBus.publish("test.event", { data: "test-payload" });
+
+ // Wait for flush interval
+ await new Promise((resolve) => setTimeout(resolve, 100));
+
+ expect(validateMsg).toHaveBeenCalledWith("test.event", { data: "test-payload" }, undefined);
+ });
+
+ it("should use dynamic prefix based on K_SERVICE", async () => {
+ process.env.K_SERVICE = "myapp-service";
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await fastify.register(Plugins.EventBus, eventBusOptions);
+
+ // Now verify the exchange was created with correct prefix
+ const connection = new Connection(rabbitmqUrl);
+ try {
+ // This will throw if exchange doesn't exist (passive: true)
+ await connection.exchangeDeclare({
+ exchange: "myapp.main-exchange",
+ type: "fanout",
+ passive: true,
+ });
+ // If we get here, exchange exists
+ expect(true).toBe(true);
+ } finally {
+ await connection.close();
+ }
+ });
+ });
+
+ describe("RabbitMQ Consumer", () => {
+ let fastify: FastifyInstance;
+ let consumer: { close: () => Promise } | null = null;
+ const originalEnv = process.env;
+
+ beforeEach(() => {
+ process.env = { ...originalEnv };
+ process.env.RABBITMQ_URL = rabbitmqUrl;
+ process.env.K_SERVICE = "test-consumer";
+ fastify = Fastify({ logger: false });
+ });
+
+ afterEach(async () => {
+ process.env = originalEnv;
+ if (consumer) {
+ await consumer.close();
+ consumer = null;
+ }
+ await fastify.close();
+ });
+
+ it("should consume messages from RabbitMQ", async () => {
+ const receivedMessages: any[] = [];
+ const messageHandler: EventHandler = jest.fn(async (msg) => {
+ receivedMessages.push(msg);
+ });
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "test.ts",
+ handlers: { "test.consume": messageHandler },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await fastify.register(Plugins.EventBus, eventBusOptions);
+ await fastify.ready();
+
+ // Start consumer
+ consumer = await RabbitMqServiceBusConsumerBuilder(fastify);
+
+ // Publish a message directly to RabbitMQ
+ const connection = new Connection(rabbitmqUrl);
+ const publisher = connection.createPublisher({ maxAttempts: 3 });
+
+ try {
+ await publisher.send(
+ {
+ exchange: "test.main-exchange",
+ contentType: "application/json",
+ },
+ JSON.stringify({
+ event: "test.consume",
+ payload: { message: "hello" },
+ file: null,
+ processAfterDelayMs: 0,
+ publishTimestamp: Date.now(),
+ })
+ );
+
+ // Wait for message to be consumed
+ await new Promise((resolve) => setTimeout(resolve, 500));
+
+ expect(messageHandler).toHaveBeenCalled();
+ } finally {
+ await publisher.close();
+ await connection.close();
+ }
+ });
+ });
+
+ describe("RabbitMQ End-to-End", () => {
+ let publisherFastify: FastifyInstance;
+ let consumerFastify: FastifyInstance;
+ let consumer: { close: () => Promise } | null = null;
+ const originalEnv = process.env;
+
+ beforeEach(() => {
+ process.env = { ...originalEnv };
+ process.env.RABBITMQ_URL = rabbitmqUrl;
+ publisherFastify = Fastify({ logger: false });
+ consumerFastify = Fastify({ logger: false });
+ });
+
+ afterEach(async () => {
+ process.env = originalEnv;
+ if (consumer) {
+ await consumer.close();
+ consumer = null;
+ }
+ await publisherFastify.close();
+ await consumerFastify.close();
+ });
+
+ it("should publish and consume messages end-to-end", async () => {
+ process.env.K_SERVICE = "e2e-service";
+
+ const receivedMessages: any[] = [];
+ const messageHandler: EventHandler = jest.fn(async (msg) => {
+ receivedMessages.push(msg);
+ });
+
+ // Set up consumer
+ const consumerOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "e2e.ts",
+ handlers: { "e2e.test": messageHandler },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await consumerFastify.register(Plugins.EventBus, consumerOptions);
+ await consumerFastify.ready();
+ consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify);
+
+ // Set up publisher
+ const publisherOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await publisherFastify.register(Plugins.EventBus, publisherOptions);
+
+ // Publish message
+ publisherFastify.EventBus.publish("e2e.test", { testData: "e2e-payload" });
+
+ // Wait for message to be published and consumed
+ await new Promise((resolve) => setTimeout(resolve, 500));
+
+ expect(messageHandler).toHaveBeenCalled();
+ expect(receivedMessages.length).toBeGreaterThan(0);
+ expect(receivedMessages[0].data).toEqual({ testData: "e2e-payload" });
+ });
+
+ it("should respect processAfterDelayMs in messages", async () => {
+ // Test that messages with processAfterDelayMs are handled correctly
+ // The consumer returns 425 for messages that arrive too early
+ process.env.K_SERVICE = "delay-test-service";
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "delay.ts",
+ handlers: { "delay.check": jest.fn() },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await consumerFastify.register(Plugins.EventBus, eventBusOptions);
+ await consumerFastify.ready();
+
+ // Send a message with delay that hasn't elapsed yet
+ const response = await consumerFastify.inject({
+ method: "POST",
+ url: "/rabbitmq/process-message",
+ payload: {
+ messageId: 123,
+ body: JSON.stringify({
+ event: "delay.check",
+ payload: { test: true },
+ file: null,
+ processAfterDelayMs: 60000, // 60 seconds in the future
+ publishTimestamp: Date.now(),
+ }),
+ },
+ });
+
+ // Should return 425 (Too Early) for messages that can't be processed yet
+ expect(response.statusCode).toBe(425);
+ });
+
+ it("should handle multiple messages in batch and flush remaining", async () => {
+ // This test verifies the critical bug fix where messages < batch size (10) were being dropped
+ process.env.K_SERVICE = "batch-service";
+
+ const receivedMessages: any[] = [];
+ const messageHandler: EventHandler = jest.fn(async (msg) => {
+ receivedMessages.push(msg.data);
+ });
+
+ const consumerOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "batch.ts",
+ handlers: { "batch.test": messageHandler },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await consumerFastify.register(Plugins.EventBus, consumerOptions);
+ await consumerFastify.ready();
+ consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify);
+
+ const publisherOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await publisherFastify.register(Plugins.EventBus, publisherOptions);
+
+ // Publish 15 messages (more than batch size of 10)
+ // Before the fix, only 10 would be sent, the last 5 would be dropped
+ for (let i = 0; i < 15; i++) {
+ publisherFastify.EventBus.publish("batch.test", { index: i });
+ }
+
+ // Wait for all messages to be processed
+ await new Promise((resolve) => setTimeout(resolve, 1500));
+
+ // Verify ALL 15 messages were received (not just first 10)
+ expect(receivedMessages.length).toBe(15);
+
+ // Verify message content - all indices should be present
+ const indices = receivedMessages.map((m) => m.index).sort((a, b) => a - b);
+ expect(indices).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]);
+ });
+
+ it("should handle multiple event types in same batch", async () => {
+ process.env.K_SERVICE = "multi-event-service";
+
+ const eventAMessages: any[] = [];
+ const eventBMessages: any[] = [];
+
+ const handlerA: EventHandler = jest.fn(async (msg) => {
+ eventAMessages.push(msg.data);
+ });
+ const handlerB: EventHandler = jest.fn(async (msg) => {
+ eventBMessages.push(msg.data);
+ });
+
+ const consumerOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "multi.ts",
+ handlers: {
+ "multi.eventA": handlerA,
+ "multi.eventB": handlerB,
+ },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await consumerFastify.register(Plugins.EventBus, consumerOptions);
+ await consumerFastify.ready();
+ consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify);
+
+ const publisherOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await publisherFastify.register(Plugins.EventBus, publisherOptions);
+
+ // Publish interleaved events
+ publisherFastify.EventBus.publish("multi.eventA", { type: "A", index: 1 });
+ publisherFastify.EventBus.publish("multi.eventB", { type: "B", index: 1 });
+ publisherFastify.EventBus.publish("multi.eventA", { type: "A", index: 2 });
+ publisherFastify.EventBus.publish("multi.eventB", { type: "B", index: 2 });
+ publisherFastify.EventBus.publish("multi.eventA", { type: "A", index: 3 });
+
+ // Wait for processing
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+
+ expect(eventAMessages.length).toBe(3);
+ expect(eventBMessages.length).toBe(2);
+ expect(eventAMessages.every((m) => m.type === "A")).toBe(true);
+ expect(eventBMessages.every((m) => m.type === "B")).toBe(true);
+ });
+
+ it("should flush all messages on shutdown", async () => {
+ process.env.K_SERVICE = "shutdown-service";
+
+ const receivedMessages: any[] = [];
+ const messageHandler: EventHandler = jest.fn(async (msg) => {
+ receivedMessages.push(msg.data);
+ });
+
+ const consumerOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "shutdown.ts",
+ handlers: { "shutdown.test": messageHandler },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await consumerFastify.register(Plugins.EventBus, consumerOptions);
+ await consumerFastify.ready();
+ consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify);
+
+ const publisherOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await publisherFastify.register(Plugins.EventBus, publisherOptions);
+
+ // Publish messages
+ for (let i = 0; i < 7; i++) {
+ publisherFastify.EventBus.publish("shutdown.test", { index: i });
+ }
+
+ // Close publisher immediately (triggers onClose flush)
+ await publisherFastify.close();
+
+ // Wait for consumer to process
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+
+ // All 7 messages should have been flushed on close
+ expect(receivedMessages.length).toBe(7);
+ });
+ });
+
+ describe("RabbitMQ Error Handling", () => {
+ let fastify: FastifyInstance;
+ const originalEnv = process.env;
+
+ beforeEach(() => {
+ process.env = { ...originalEnv };
+ fastify = Fastify({ logger: false });
+ });
+
+ afterEach(async () => {
+ process.env = originalEnv;
+ await fastify.close();
+ });
+
+ it("should throw error when RABBITMQ_URL is not set", async () => {
+ delete process.env.RABBITMQ_URL;
+ process.env.K_SERVICE = "test-service";
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await expect(
+ fastify.register(Plugins.EventBus, eventBusOptions)
+ ).rejects.toThrow("RabbitMq requires RABBITMQ_URL");
+ });
+
+ it("should throw error when K_SERVICE is not set", async () => {
+ process.env.RABBITMQ_URL = rabbitmqUrl;
+ delete process.env.K_SERVICE;
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await expect(
+ fastify.register(Plugins.EventBus, eventBusOptions)
+ ).rejects.toThrow("RabbitMq requires K_SERVICE");
+ });
+
+ it("should handle invalid JSON in message body", async () => {
+ process.env.RABBITMQ_URL = rabbitmqUrl;
+ process.env.K_SERVICE = "json-error-service";
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "error.ts",
+ handlers: { "error.test": jest.fn() },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await fastify.register(Plugins.EventBus, eventBusOptions);
+ await fastify.ready();
+
+ // Simulate incoming message with invalid JSON
+ const response = await fastify.inject({
+ method: "POST",
+ url: "/rabbitmq/process-message",
+ payload: {
+ messageId: 123,
+ body: "invalid json {",
+ },
+ });
+
+ expect(response.statusCode).toBe(400);
+ });
+ });
+
+ describe("Handler Execution", () => {
+ let consumerFastify: FastifyInstance;
+ const originalEnv = process.env;
+
+ beforeEach(() => {
+ process.env = { ...originalEnv };
+ process.env.RABBITMQ_URL = rabbitmqUrl;
+ consumerFastify = Fastify({ logger: false });
+ });
+
+ afterEach(async () => {
+ process.env = originalEnv;
+ await consumerFastify.close();
+ });
+
+ it("should call handler matching event name", async () => {
+ process.env.K_SERVICE = "handler-match-service";
+
+ const handler1 = jest.fn().mockResolvedValue(undefined);
+ const handler2 = jest.fn().mockResolvedValue(undefined);
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "handler1.ts",
+ handlers: { "event.one": handler1 },
+ },
+ {
+ file: "handler2.ts",
+ handlers: { "event.two": handler2 },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await consumerFastify.register(Plugins.EventBus, eventBusOptions);
+ await consumerFastify.ready();
+
+ // Send message for event.one
+ const response = await consumerFastify.inject({
+ method: "POST",
+ url: "/rabbitmq/process-message",
+ payload: {
+ messageId: "msg-123",
+ body: JSON.stringify({
+ event: "event.one",
+ payload: { test: "data" },
+ file: null,
+ processAfterDelayMs: 0,
+ publishTimestamp: Date.now(),
+ }),
+ },
+ });
+
+ expect(response.statusCode).toBe(200);
+ expect(handler1).toHaveBeenCalledTimes(1);
+ expect(handler2).not.toHaveBeenCalled();
+ });
+
+ it("should call only handler matching specified file", async () => {
+ process.env.K_SERVICE = "file-match-service";
+
+ const handlerA = jest.fn().mockResolvedValue(undefined);
+ const handlerB = jest.fn().mockResolvedValue(undefined);
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "fileA.ts",
+ handlers: { "same.event": handlerA },
+ },
+ {
+ file: "fileB.ts",
+ handlers: { "same.event": handlerB },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await consumerFastify.register(Plugins.EventBus, eventBusOptions);
+ await consumerFastify.ready();
+
+ // Send message with specific file filter
+ const response = await consumerFastify.inject({
+ method: "POST",
+ url: "/rabbitmq/process-message",
+ payload: {
+ messageId: "msg-456",
+ body: JSON.stringify({
+ event: "same.event",
+ payload: { test: "data" },
+ file: "fileA.ts", // Only fileA.ts handler should run
+ processAfterDelayMs: 0,
+ publishTimestamp: Date.now(),
+ }),
+ },
+ });
+
+ expect(response.statusCode).toBe(200);
+ expect(handlerA).toHaveBeenCalledTimes(1);
+ expect(handlerB).not.toHaveBeenCalled();
+ });
+
+ it("should call all handlers when no file specified", async () => {
+ process.env.K_SERVICE = "all-handlers-service";
+
+ const handlerA = jest.fn().mockResolvedValue(undefined);
+ const handlerB = jest.fn().mockResolvedValue(undefined);
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "fileA.ts",
+ handlers: { "broadcast.event": handlerA },
+ },
+ {
+ file: "fileB.ts",
+ handlers: { "broadcast.event": handlerB },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await consumerFastify.register(Plugins.EventBus, eventBusOptions);
+ await consumerFastify.ready();
+
+ // Send message without file filter
+ const response = await consumerFastify.inject({
+ method: "POST",
+ url: "/rabbitmq/process-message",
+ payload: {
+ messageId: "msg-789",
+ body: JSON.stringify({
+ event: "broadcast.event",
+ payload: { test: "data" },
+ file: null, // No file filter - all handlers run
+ processAfterDelayMs: 0,
+ publishTimestamp: Date.now(),
+ }),
+ },
+ });
+
+ expect(response.statusCode).toBe(200);
+ expect(handlerA).toHaveBeenCalledTimes(1);
+ expect(handlerB).toHaveBeenCalledTimes(1);
+ });
+
+ it("should skip handler when file does not match", async () => {
+ process.env.K_SERVICE = "file-mismatch-service";
+
+ const handler = jest.fn().mockResolvedValue(undefined);
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "registered-file.ts",
+ handlers: { "mismatch.event": handler },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await consumerFastify.register(Plugins.EventBus, eventBusOptions);
+ await consumerFastify.ready();
+
+ // Send message with different file
+ const response = await consumerFastify.inject({
+ method: "POST",
+ url: "/rabbitmq/process-message",
+ payload: {
+ messageId: "msg-mismatch",
+ body: JSON.stringify({
+ event: "mismatch.event",
+ payload: { test: "data" },
+ file: "different-file.ts", // Doesn't match registered file
+ processAfterDelayMs: 0,
+ publishTimestamp: Date.now(),
+ }),
+ },
+ });
+
+ // Should return 200 (no error) but handler should NOT be called
+ expect(response.statusCode).toBe(200);
+ expect(handler).not.toHaveBeenCalled();
+ });
+
+ it("should return OK when no handler registered for event", async () => {
+ process.env.K_SERVICE = "no-handler-service";
+
+ const handler = jest.fn().mockResolvedValue(undefined);
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "some.ts",
+ handlers: { "registered.event": handler },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await consumerFastify.register(Plugins.EventBus, eventBusOptions);
+ await consumerFastify.ready();
+
+ // Send message for unregistered event
+ const response = await consumerFastify.inject({
+ method: "POST",
+ url: "/rabbitmq/process-message",
+ payload: {
+ messageId: "msg-unknown",
+ body: JSON.stringify({
+ event: "unknown.event", // No handler registered
+ payload: { test: "data" },
+ file: null,
+ processAfterDelayMs: 0,
+ publishTimestamp: Date.now(),
+ }),
+ },
+ });
+
+ // Should bail out gracefully
+ expect(response.statusCode).toBe(200);
+ expect(handler).not.toHaveBeenCalled();
+ });
+
+ it("should pass correct data to handler", async () => {
+ process.env.K_SERVICE = "data-pass-service";
+
+ let receivedMsg: any = null;
+ const handler = jest.fn().mockImplementation(async (msg) => {
+ receivedMsg = msg;
+ });
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "data.ts",
+ handlers: { "data.event": handler },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await consumerFastify.register(Plugins.EventBus, eventBusOptions);
+ await consumerFastify.ready();
+
+ const testPayload = { userId: 123, action: "test" };
+ const publishTimestamp = Date.now();
+
+ const response = await consumerFastify.inject({
+ method: "POST",
+ url: "/rabbitmq/process-message",
+ payload: {
+ messageId: "msg-data-123",
+ body: JSON.stringify({
+ event: "data.event",
+ payload: testPayload,
+ file: null,
+ processAfterDelayMs: 0,
+ publishTimestamp,
+ }),
+ },
+ });
+
+ expect(response.statusCode).toBe(200);
+ expect(handler).toHaveBeenCalledTimes(1);
+ expect(receivedMsg).not.toBeNull();
+ expect(receivedMsg.data).toEqual(testPayload);
+ expect(receivedMsg.event).toBe("data.event");
+ expect(receivedMsg.id).toBe("msg-data-123");
+ });
+
+ it("should return 500 when handler throws error", async () => {
+ process.env.K_SERVICE = "error-handler-service";
+
+ const handler = jest.fn().mockRejectedValue(new Error("Handler failed"));
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "error.ts",
+ handlers: { "error.event": handler },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("Processed error"), status: 500 }),
+ };
+
+ await consumerFastify.register(Plugins.EventBus, eventBusOptions);
+ await consumerFastify.ready();
+
+ const response = await consumerFastify.inject({
+ method: "POST",
+ url: "/rabbitmq/process-message",
+ payload: {
+ messageId: "msg-error",
+ body: JSON.stringify({
+ event: "error.event",
+ payload: { test: "data" },
+ file: "error.ts", // Specify file to get ErrorWithStatus thrown
+ processAfterDelayMs: 0,
+ publishTimestamp: Date.now(),
+ }),
+ },
+ });
+
+ expect(response.statusCode).toBe(500);
+ expect(handler).toHaveBeenCalledTimes(1);
+ });
+ });
+
+ describe("RabbitMQ Dead-Letter Retry", () => {
+ let consumerFastify: FastifyInstance;
+ let consumer: { close: () => Promise } | null = null;
+ const originalEnv = process.env;
+
+ beforeEach(() => {
+ process.env = { ...originalEnv };
+ process.env.RABBITMQ_URL = rabbitmqUrl;
+ consumerFastify = Fastify({ logger: false });
+ });
+
+ afterEach(async () => {
+ process.env = originalEnv;
+ if (consumer) {
+ await consumer.close();
+ consumer = null;
+ }
+ await consumerFastify.close();
+ });
+
+ it("should retry via dead-letter on 500 error", async () => {
+ process.env.K_SERVICE = "retry-500-service";
+
+ let callCount = 0;
+ const messageHandler: EventHandler = jest.fn(async () => {
+ callCount++;
+ if (callCount < 2) {
+ throw new Error("Simulated 500 error");
+ }
+ // Succeed on second attempt
+ });
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "retry.ts",
+ handlers: { "retry.500": messageHandler },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await consumerFastify.register(Plugins.EventBus, eventBusOptions);
+ await consumerFastify.ready();
+ consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify);
+
+ // Publish message directly to RabbitMQ
+ const connection = new Connection(rabbitmqUrl);
+ const publisher = connection.createPublisher({ maxAttempts: 3 });
+
+ try {
+ await publisher.send(
+ {
+ exchange: "retry.main-exchange",
+ contentType: "application/json",
+ },
+ JSON.stringify({
+ event: "retry.500",
+ payload: { test: true },
+ file: null,
+ processAfterDelayMs: 0,
+ publishTimestamp: Date.now(),
+ })
+ );
+
+ // Wait for dead-letter retry cycle (5s TTL + processing time)
+ await new Promise((resolve) => setTimeout(resolve, 7000));
+
+ // Handler should be called twice: first fail, then succeed after DLX retry
+ expect(callCount).toBe(2);
+ } finally {
+ await publisher.close();
+ await connection.close();
+ }
+ }, 15000);
+
+ it("should retry via dead-letter on 429 rate-limit", async () => {
+ process.env.K_SERVICE = "retry-429-service";
+
+ let callCount = 0;
+ const receivedMessages: any[] = [];
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "retry429.ts",
+ handlers: {
+ "retry.429": async (msg) => {
+ callCount++;
+ receivedMessages.push({ attempt: callCount, data: msg.data });
+ },
+ },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await consumerFastify.register(Plugins.EventBus, eventBusOptions);
+
+ // Override the route to return 429 on first call
+ let httpCallCount = 0;
+ consumerFastify.addHook("onSend", async (request, reply, payload) => {
+ if (request.url === "/rabbitmq/process-message") {
+ httpCallCount++;
+ if (httpCallCount === 1) {
+ reply.code(429);
+ return JSON.stringify({ error: "rate limited" });
+ }
+ }
+ return payload;
+ });
+
+ await consumerFastify.ready();
+ consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify);
+
+ const connection = new Connection(rabbitmqUrl);
+ const publisher = connection.createPublisher({ maxAttempts: 3 });
+
+ try {
+ await publisher.send(
+ {
+ exchange: "retry.main-exchange",
+ contentType: "application/json",
+ },
+ JSON.stringify({
+ event: "retry.429",
+ payload: { rateLimit: true },
+ file: null,
+ processAfterDelayMs: 0,
+ publishTimestamp: Date.now(),
+ })
+ );
+
+ // Wait for dead-letter retry cycle
+ await new Promise((resolve) => setTimeout(resolve, 7000));
+
+ // Should have been called twice via DLX retry
+ expect(httpCallCount).toBeGreaterThanOrEqual(2);
+ } finally {
+ await publisher.close();
+ await connection.close();
+ }
+ }, 15000);
+
+ it("should retry via dead-letter on 409 lock conflict", async () => {
+ process.env.K_SERVICE = "retry-409-service";
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "retry409.ts",
+ handlers: { "retry.409": jest.fn() },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await consumerFastify.register(Plugins.EventBus, eventBusOptions);
+
+ let httpCallCount = 0;
+ consumerFastify.addHook("onSend", async (request, reply, payload) => {
+ if (request.url === "/rabbitmq/process-message") {
+ httpCallCount++;
+ if (httpCallCount === 1) {
+ reply.code(409);
+ return JSON.stringify({ error: "lock conflict" });
+ }
+ }
+ return payload;
+ });
+
+ await consumerFastify.ready();
+ consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify);
+
+ const connection = new Connection(rabbitmqUrl);
+ const publisher = connection.createPublisher({ maxAttempts: 3 });
+
+ try {
+ await publisher.send(
+ {
+ exchange: "retry.main-exchange",
+ contentType: "application/json",
+ },
+ JSON.stringify({
+ event: "retry.409",
+ payload: { lockConflict: true },
+ file: null,
+ processAfterDelayMs: 0,
+ publishTimestamp: Date.now(),
+ })
+ );
+
+ // Wait for dead-letter retry cycle
+ await new Promise((resolve) => setTimeout(resolve, 7000));
+
+ // Should have retried via DLX
+ expect(httpCallCount).toBeGreaterThanOrEqual(2);
+ } finally {
+ await publisher.close();
+ await connection.close();
+ }
+ }, 15000);
+
+ it("should verify message goes through retry queue", async () => {
+ process.env.K_SERVICE = "dlx-flow-service";
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "dlx.ts",
+ handlers: { "dlx.test": jest.fn() },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await consumerFastify.register(Plugins.EventBus, eventBusOptions);
+
+ // Always return 500 to keep message in retry loop
+ consumerFastify.addHook("onSend", async (request, reply, payload) => {
+ if (request.url === "/rabbitmq/process-message") {
+ reply.code(500);
+ return JSON.stringify({ error: "server error" });
+ }
+ return payload;
+ });
+
+ await consumerFastify.ready();
+ consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify);
+
+ const connection = new Connection(rabbitmqUrl);
+ const publisher = connection.createPublisher({ maxAttempts: 3 });
+
+ try {
+ await publisher.send(
+ {
+ exchange: "dlx.main-exchange",
+ contentType: "application/json",
+ },
+ JSON.stringify({
+ event: "dlx.test",
+ payload: { dlxTest: true },
+ file: null,
+ processAfterDelayMs: 0,
+ publishTimestamp: Date.now(),
+ })
+ );
+
+ // Wait a bit for message to enter retry queue
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+
+ // Check retry queue has the message
+ const retryQueueInfo = await connection.queueDeclare({
+ queue: "dlx.retry-queue.dlx-flow-service",
+ passive: true,
+ });
+
+ // Message should be in retry queue (or already cycled back)
+ // We verify the queue exists and has been used
+ expect(retryQueueInfo.queue).toBe("dlx.retry-queue.dlx-flow-service");
+ } finally {
+ await publisher.close();
+ await connection.close();
+ }
+ }, 15000);
+
+ it("should use immediate REQUEUE for 425 (delayed message), not DLX", async () => {
+ // 425 should use REQUEUE with local sleep (up to 10s randomDelay)
+ // Key: retry should happen BEFORE 5s DLX TTL would complete (total < 15s)
+ // DLX path would be: 5s TTL + processing > 5s
+ // REQUEUE path: up to 10s randomDelay + immediate requeue
+ process.env.K_SERVICE = "requeue-service";
+
+ let httpCallCount = 0;
+ const callTimestamps: number[] = [];
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "requeue.ts",
+ handlers: { "requeue.425": jest.fn() },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ // Register hook BEFORE plugin
+ consumerFastify.addHook("onSend", async (request, reply, payload) => {
+ if (request.url === "/rabbitmq/process-message") {
+ httpCallCount++;
+ callTimestamps.push(Date.now());
+ if (httpCallCount === 1) {
+ reply.code(425);
+ return JSON.stringify({ processAfterDelayMs: 1000 });
+ }
+ }
+ return payload;
+ });
+
+ await consumerFastify.register(Plugins.EventBus, eventBusOptions);
+ await consumerFastify.ready();
+ consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify);
+
+ const connection = new Connection(rabbitmqUrl);
+ const publisher = connection.createPublisher({ maxAttempts: 3 });
+
+ try {
+ await publisher.send(
+ {
+ exchange: "requeue.main-exchange",
+ contentType: "application/json",
+ },
+ JSON.stringify({
+ event: "requeue.425",
+ payload: { delayed: true },
+ file: null,
+ processAfterDelayMs: 1000,
+ publishTimestamp: Date.now(),
+ })
+ );
+
+ // Wait for REQUEUE cycle (randomDelay up to 10s + processing)
+ await new Promise((resolve) => setTimeout(resolve, 12000));
+
+ // Should have been called twice (via REQUEUE, not DLX)
+ expect(httpCallCount).toBeGreaterThanOrEqual(2);
+
+ // Verify the retry didn't go through DLX (which adds 5s TTL)
+ // REQUEUE with 10s max delay should be faster than DLX 5s + 10s delay
+ if (callTimestamps.length >= 2) {
+ const timeBetweenCalls = callTimestamps[1] - callTimestamps[0];
+ // Should be less than 12s (10s max delay + some processing)
+ expect(timeBetweenCalls).toBeLessThan(12000);
+ }
+ } finally {
+ await publisher.close();
+ await connection.close();
+ }
+ }, 20000);
+
+ it("should dead-letter on other 4xx errors (bad message)", async () => {
+ process.env.K_SERVICE = "bad-msg-service";
+
+ let callCount = 0;
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "badmsg.ts",
+ handlers: { "badmsg.test": jest.fn() },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await consumerFastify.register(Plugins.EventBus, eventBusOptions);
+
+ // Return 400 on first call, 200 after DLX retry
+ consumerFastify.addHook("onSend", async (request, reply, payload) => {
+ if (request.url === "/rabbitmq/process-message") {
+ callCount++;
+ if (callCount === 1) {
+ reply.code(400);
+ return JSON.stringify({ error: "bad request" });
+ }
+ }
+ return payload;
+ });
+
+ await consumerFastify.ready();
+ consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify);
+
+ const connection = new Connection(rabbitmqUrl);
+ const publisher = connection.createPublisher({ maxAttempts: 3 });
+
+ try {
+ await publisher.send(
+ {
+ exchange: "bad.main-exchange",
+ contentType: "application/json",
+ },
+ JSON.stringify({
+ event: "badmsg.test",
+ payload: { bad: true },
+ file: null,
+ processAfterDelayMs: 0,
+ publishTimestamp: Date.now(),
+ })
+ );
+
+ // Wait for DLX retry cycle (5s TTL + processing)
+ await new Promise((resolve) => setTimeout(resolve, 7000));
+
+ // Should have retried via DLX
+ expect(callCount).toBeGreaterThanOrEqual(2);
+ } finally {
+ await publisher.close();
+ await connection.close();
+ }
+ }, 15000);
+
+ it("should not fan out retry to other services", async () => {
+ // This test verifies that when a message fails in service-a,
+ // the retry only goes back to service-a, not to service-b
+ // Both services use same prefix "shared" so they share the main-exchange
+
+ let serviceBMessageCount = 0;
+ let serviceAHttpCalls = 0;
+
+ // Set up service A (will fail first, then succeed)
+ process.env.K_SERVICE = "shared-service-a";
+
+ // Register hook BEFORE plugin for service A
+ consumerFastify.addHook("onSend", async (request, reply, payload) => {
+ if (request.url === "/rabbitmq/process-message") {
+ serviceAHttpCalls++;
+ if (serviceAHttpCalls === 1) {
+ reply.code(500);
+ return JSON.stringify({ error: "server error" });
+ }
+ }
+ return payload;
+ });
+
+ const serviceAOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "fanout-a.ts",
+ handlers: {
+ "fanout.test": jest.fn(),
+ },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await consumerFastify.register(Plugins.EventBus, serviceAOptions);
+ await consumerFastify.ready();
+ consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify);
+
+ // Set up service B (separate instance, same prefix "shared")
+ const serviceBFastify = Fastify({ logger: false });
+ process.env.K_SERVICE = "shared-service-b";
+
+ const serviceBOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "fanout-b.ts",
+ handlers: {
+ "fanout.test": async () => {
+ serviceBMessageCount++;
+ },
+ },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ await serviceBFastify.register(Plugins.EventBus, serviceBOptions);
+ await serviceBFastify.ready();
+ const consumerB = await RabbitMqServiceBusConsumerBuilder(serviceBFastify);
+
+ const connection = new Connection(rabbitmqUrl);
+ const publisher = connection.createPublisher({ maxAttempts: 3 });
+
+ try {
+ // Publish to main exchange (fans out to both services)
+ await publisher.send(
+ {
+ exchange: "shared.main-exchange",
+ contentType: "application/json",
+ },
+ JSON.stringify({
+ event: "fanout.test",
+ payload: { fanoutTest: true },
+ file: null,
+ processAfterDelayMs: 0,
+ publishTimestamp: Date.now(),
+ })
+ );
+
+ // Wait for initial fanout + DLX retry cycle (5s TTL + processing)
+ await new Promise((resolve) => setTimeout(resolve, 8000));
+
+ // KEY ASSERTION: Service B should have received the message exactly once (initial fanout only)
+ // If retry went through main-exchange (the old buggy behavior), service B would get it again
+ expect(serviceBMessageCount).toBe(1);
+
+ // Service A should have received it at least twice (initial fail + retry after DLX)
+ expect(serviceAHttpCalls).toBeGreaterThanOrEqual(2);
+ } finally {
+ await publisher.close();
+ await connection.close();
+ await consumerB.close();
+ await serviceBFastify.close();
+ }
+ }, 20000);
+
+ it("should move message to DLQ after 10 failed retries with correct headers", async () => {
+ // This is the REAL DLQ behavior test:
+ // 1. Message fails 10 times via DLX retry cycles
+ // 2. After 10th failure, message is moved to DLQ
+ // 3. DLQ message has correct headers (x-original-queue, x-final-status-code, x-final-retry-count)
+ // 4. Message is removed from main queue (ACKed)
+ //
+ // Timing: 10 DLX cycles × 5s TTL = 50s minimum
+ process.env.K_SERVICE = "dlq-e2e-service";
+
+ let httpCallCount = 0;
+
+ const eventBusOptions: EventBusOptions = {
+ busType: "rabbitmq",
+ handlers: [
+ {
+ file: "dlq-e2e.ts",
+ handlers: { "dlq.e2e": jest.fn() },
+ },
+ ],
+ validateMsg: jest.fn(),
+ processError: jest.fn().mockReturnValue({ err: new Error("test"), status: 500 }),
+ };
+
+ // Always return 500 to force message through all 10 retry cycles
+ consumerFastify.addHook("onSend", async (request, reply, payload) => {
+ if (request.url === "/rabbitmq/process-message") {
+ httpCallCount++;
+ reply.code(500);
+ return JSON.stringify({ error: "simulated failure" });
+ }
+ return payload;
+ });
+
+ await consumerFastify.register(Plugins.EventBus, eventBusOptions);
+ await consumerFastify.ready();
+ consumer = await RabbitMqServiceBusConsumerBuilder(consumerFastify);
+
+ const connection = new Connection(rabbitmqUrl);
+ const publisher = connection.createPublisher({ maxAttempts: 3 });
+
+ try {
+ // Verify DLQ queue was created during setup
+ const dlqInfo = await connection.queueDeclare({
+ queue: "dlq.dlq.dlq-e2e-service",
+ passive: true,
+ });
+ expect(dlqInfo.queue).toBe("dlq.dlq.dlq-e2e-service");
+
+ const testPayload = { dlqTest: true, timestamp: Date.now() };
+
+ // Publish message that will fail all 10 retries
+ await publisher.send(
+ {
+ exchange: "dlq.main-exchange",
+ contentType: "application/json",
+ },
+ JSON.stringify({
+ event: "dlq.e2e",
+ payload: testPayload,
+ file: null,
+ processAfterDelayMs: 0,
+ publishTimestamp: Date.now(),
+ })
+ );
+
+ // Wait for 10 DLX cycles + processing time
+ // Each cycle: 5s TTL + processing overhead
+ // Total: ~55-60 seconds
+ await new Promise((resolve) => setTimeout(resolve, 58000));
+
+ // Should have been called at least 11 times:
+ // 1 initial call + 10 retry cycles = 11 calls
+ // On the 11th call (after 10 DLX cycles), x-death count = 10, triggers DLQ
+ expect(httpCallCount).toBeGreaterThanOrEqual(11);
+
+ // Now verify the message is in the DLQ
+ // Create a temporary consumer to read from DLQ
+ let dlqMessage: any = null;
+ const dlqConsumer = connection.createConsumer(
+ {
+ queue: "dlq.dlq.dlq-e2e-service",
+ queueOptions: { passive: true },
+ noAck: true, // Don't require ACK for test
+ },
+ async (msg: any) => {
+ dlqMessage = msg;
+ return 1; // ACK
+ }
+ );
+
+ // Give consumer time to receive message
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ await dlqConsumer.close();
+
+ // Verify DLQ message exists and has correct headers
+ expect(dlqMessage).not.toBeNull();
+ expect(dlqMessage.headers).toBeDefined();
+ expect(dlqMessage.headers["x-original-queue"]).toBe("dlq.queue.dlq-e2e-service");
+ expect(dlqMessage.headers["x-final-status-code"]).toBe(500);
+ expect(dlqMessage.headers["x-final-retry-count"]).toBeGreaterThanOrEqual(10);
+
+ // Verify message body contains original payload
+ const bodyStr = Buffer.isBuffer(dlqMessage.body)
+ ? dlqMessage.body.toString("utf8")
+ : dlqMessage.body;
+ const body = JSON.parse(bodyStr);
+ expect(body.payload).toEqual(testPayload);
+ expect(body.event).toBe("dlq.e2e");
+
+ // Verify main queue is empty (message was ACKed after moving to DLQ)
+ const mainQueueInfo = await connection.queueDeclare({
+ queue: "dlq.queue.dlq-e2e-service",
+ passive: true,
+ });
+ expect(mainQueueInfo.messageCount).toBe(0);
+ } finally {
+ await publisher.close();
+ await connection.close();
+ }
+ }, 70000); // 70s timeout for 10 retry cycles
+ });
+});
diff --git a/src/event-bus/rabbitmq.ts b/src/event-bus/rabbitmq.ts
new file mode 100644
index 0000000..9eebdd4
--- /dev/null
+++ b/src/event-bus/rabbitmq.ts
@@ -0,0 +1,340 @@
+import { randomUUID } from "crypto";
+import { FastifyInstance, FastifyPluginAsync, FastifyRequest } from "fastify";
+import fp from "fastify-plugin";
+import { Queue } from "mnemonist";
+import { Connection, Publisher } from "rabbitmq-client";
+import {
+ CreateHandlerRunner,
+ ErrorWithStatus,
+ getHandlerMap,
+ noMatchingHandlers,
+} from "./commons";
+import { EventBus, EventBusOptions, EventMessage } from "./interfaces";
+import {
+ ensureRabbitMqExchangesAndQueues,
+ getServicePrefix,
+} from "./rabbitmq-utils";
+
+interface IncomingRabbitMqMessage {
+ messageId: number;
+ body: string;
+}
+
+interface MessageBody {
+ event: string;
+ payload: any;
+ file: string | null;
+ processAfterDelayMs: number | undefined;
+ publishTimestamp: number;
+}
+
+interface MessageWithAttempts {
+ body: MessageBody;
+ attempts: number;
+}
+
+const plugin: FastifyPluginAsync = async function (
+ f,
+ options,
+) {
+ const handlerMap = getHandlerMap(options);
+ f.decorate("_hasEventHandlers", handlerMap.size > 0);
+
+ if (!process.env.RABBITMQ_URL) {
+ throw new Error("RabbitMq requires RABBITMQ_URL");
+ }
+ if (!process.env.K_SERVICE) {
+ throw new Error("RabbitMq requires K_SERVICE");
+ }
+ const connection = new Connection(process.env.RABBITMQ_URL);
+ const service = process.env.K_SERVICE;
+ if (options.ensureExchangesAndQueues) {
+ await ensureRabbitMqExchangesAndQueues(connection, service);
+ }
+
+ const publisher = connection.createPublisher({ maxAttempts: 3 });
+
+ const msgQueue = new Queue();
+
+ const flush = createMessageFlusher(f, publisher, msgQueue);
+ const ref = setInterval(() => {
+ // eslint-disable-next-line @typescript-eslint/no-floating-promises
+ flush();
+ }, 20);
+
+ f.addHook("onClose", async () => {
+ f.log.info({ tag: "RABBITMQ_FINAL_FLUSH" });
+ clearInterval(ref);
+ // final task to ensure all messages are flushed
+ await flush(true);
+ await publisher.close();
+ await connection.close();
+ });
+
+ function publishToExchange(
+ event: string,
+ payload: any,
+ file: string | null,
+ processAfterDelayMs: number,
+ req?: FastifyRequest,
+ ) {
+ options.validateMsg(event, payload, req);
+ const messageBody: MessageBody = {
+ event,
+ payload,
+ file: file ?? null,
+ processAfterDelayMs:
+ processAfterDelayMs > 0 ? processAfterDelayMs : undefined,
+ publishTimestamp: Date.now(),
+ };
+ msgQueue.enqueue({
+ body: messageBody,
+ attempts: 0,
+ });
+ req?.log.info({
+ tag: "EVENT_PUBLISH",
+ event,
+ payload,
+ processAfterDelayMs,
+ });
+ }
+
+ const bus: EventBus = {
+ publish(event, payload, processAfterDelayMs) {
+ publishToExchange(event, payload, null, processAfterDelayMs ?? 0);
+ },
+ };
+ f.decorate("EventBus", {
+ getter() {
+ return bus;
+ },
+ });
+
+ f.decorateRequest("EventBus", {
+ getter() {
+ return {
+ publish: (event: string, payload: any, processAfterDelayMs?: number) => { // eslint-disable-line @typescript-eslint/no-explicit-any
+ publishToExchange(
+ event,
+ payload,
+ null,
+ processAfterDelayMs ?? 0,
+ this,
+ );
+ },
+ };
+ },
+ });
+
+ const selectAndRunHandlers = CreateHandlerRunner(f, options, handlerMap);
+
+ f.post<{ Body: IncomingRabbitMqMessage }>(
+ "/rabbitmq/process-message",
+ {
+ schema: {
+ hide: true,
+ } as any,
+ },
+ async function (req, reply) {
+ const rawMsg = req.body;
+ if (!rawMsg) {
+ reply.send("OK");
+ return reply;
+ }
+ req.log.info({
+ tag: "RABBITMQ_MESSAGE_RECEIVED",
+ messageId: rawMsg.messageId,
+ });
+ const msg = convert(rawMsg);
+ options.validateMsg(msg.event, msg.data, req);
+
+ if (noMatchingHandlers(handlerMap, msg)) {
+ // bail-out
+ // service has no event-handlers registered
+ reply.send("OK");
+ return reply;
+ }
+
+ req.log.info({
+ tag: "RABBITMQ_MESSAGE_PROCESSING",
+ event: msg,
+ });
+
+ if (
+ msg.processAfterDelayMs > 0 &&
+ Date.now() < msg.publishTime.getTime() + msg.processAfterDelayMs
+ ) {
+ // wait for pub-sub to repush. can't process so early
+ reply
+ .status(425)
+ .send({ processAfterDelayMs: msg?.processAfterDelayMs });
+ return reply;
+ }
+
+ try {
+ await selectAndRunHandlers(req, msg, (event, payload, file) =>
+ publishToExchange(event, payload, file, 0, req),
+ );
+ reply.send("OK");
+ return reply;
+ } catch (err) {
+ if (err instanceof ErrorWithStatus) {
+ reply.status(err.status).send(err.message);
+ } else {
+ reply.status(500).send("ERROR");
+ }
+ return reply;
+ }
+ },
+ );
+};
+
+export = fp(plugin, { name: "fp-eventbus-rabbitmq" });
+
+function convert(msg: IncomingRabbitMqMessage): EventMessage {
+ let body: MessageBody;
+ try {
+ body = JSON.parse(msg.body);
+ } catch {
+ throw new ErrorWithStatus(
+ 400,
+ `Invalid JSON in message body: ${msg.body?.substring(0, 100)}`,
+ );
+ }
+ return {
+ id: "" + msg.messageId,
+ attributes: {
+ event: body.event,
+ processAfterDelayMs: "" + (body.processAfterDelayMs ?? 0),
+ file: body.file ?? "",
+ },
+ data: body.payload,
+ event: body.event,
+ processAfterDelayMs: body.processAfterDelayMs ?? 0,
+ publishTime: new Date(body.publishTimestamp),
+ };
+}
+
+function createMessageFlusher(
+ f: FastifyInstance,
+ publisher: Publisher,
+ msgQueue: Queue,
+) {
+ let running = false;
+ let currentFlush: Promise | null = null;
+ const CONCURRENCY = 10;
+ return async function flush(force = false) {
+ if (msgQueue.size === 0) {
+ return;
+ }
+ // If a flush is already running, wait for it to complete (for forced flushes) or skip
+ if (running) {
+ if (force && currentFlush) {
+ await currentFlush;
+ } else {
+ return;
+ }
+ }
+ running = true;
+ const doFlush = async () => {
+ let flushed = 0;
+ const total = msgQueue.size;
+ const start = Date.now();
+ try {
+ const batch: MessageWithAttempts[] = [];
+ while (msgQueue.size > 0) {
+ const message = msgQueue.dequeue();
+ if (!message) {
+ break;
+ }
+ batch.push(message);
+
+ if (batch.length >= CONCURRENCY) {
+ await flushBatch(batch, publisher, f, msgQueue, () => flushed++);
+ batch.length = 0;
+ }
+ }
+ // Flush any remaining messages in the batch
+ if (batch.length > 0) {
+ await flushBatch(batch, publisher, f, msgQueue, () => flushed++);
+ }
+ } catch (err) {
+ const errMsg = err instanceof Error ? err.message : String(err);
+ f.log.error({
+ tag: "RABBITMQ_FLUSH_ERROR",
+ msg: errMsg,
+ err,
+ });
+ } finally {
+ running = false;
+ currentFlush = null;
+ const latency = Date.now() - start;
+ if (latency > 100) {
+ f.log.warn({
+ tag: "RABBITMQ_SLOW_FLUSH",
+ latency,
+ total,
+ flushed,
+ });
+ }
+ }
+ };
+ currentFlush = doFlush();
+ await currentFlush;
+ };
+}
+
+async function flushBatch(
+ batch: MessageWithAttempts[],
+ publisher: Publisher,
+ f: FastifyInstance,
+ msgQueue: Queue,
+ onSuccess: () => void,
+) {
+ const service = process.env.K_SERVICE ?? "";
+ const prefix = getServicePrefix(service);
+ await Promise.all(
+ batch.map((msg) =>
+ publisher
+ .send(
+ {
+ messageId: randomUUID(),
+ appId: `${prefix}.${service}`,
+ contentType: "application/json",
+ durable: true,
+ exchange: `${prefix}.main-exchange`,
+ headers: {
+ event: msg.body.event,
+ file: msg.body.file,
+ processAfterDelayMs: "" + msg.body.processAfterDelayMs,
+ },
+ },
+ JSON.stringify(msg.body, null, 0),
+ )
+ .then(() => {
+ onSuccess();
+ })
+ .catch((err: unknown) => {
+ const errMsg = err instanceof Error ? err.message : String(err);
+ msg.attempts++;
+ if (msg.attempts < 3) {
+ f.log.error({
+ tag: "RABBITMQ_PUBLISH_ERROR",
+ msg: errMsg,
+ err,
+ attempt: msg.attempts,
+ });
+ msgQueue.enqueue(msg);
+ } else {
+ f.log.error({
+ tag: "RABBITMQ_MESSAGE_PERMANENTLY_DROPPED",
+ msg: errMsg,
+ err,
+ event: msg.body.event,
+ payload: msg.body.payload,
+ });
+ }
+ }),
+ ),
+ );
+}
diff --git a/src/index.ts b/src/index.ts
index 8ed9fbf..0f26ff4 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -1,7 +1,11 @@
+import FpEventBus from "./event-bus";
import FpFileStore from "./file-store";
export { FileStore } from "./file-store";
+export { EventBus, EventBusOptions, EventMessage } from "./event-bus/interfaces";
+export { EventConsumerBuilder } from "./event-bus/event-consumer/interface";
export const Plugins = {
+ EventBus: FpEventBus,
FileStore: FpFileStore,
};
diff --git a/src/types.ts b/src/types.ts
index 73a45d3..55a1329 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -1,6 +1,9 @@
-export {};
+import { EventBus } from "./event-bus/interfaces";
declare module "fastify" {
+ export interface FastifyRequest {
+ EventBus: EventBus;
+ }
export interface FastifySchema {
operationId?: string;
summary?: string;