From 4524f55faef9074270bec851cbd0c36c65cde649 Mon Sep 17 00:00:00 2001 From: "Paulo R. Lima" Date: Mon, 16 Jun 2025 13:15:06 -0300 Subject: [PATCH 1/2] feat: add support for BullMQ 5.x while maintaining backwards compatibility --- package-lock.json | 2 +- package.json | 2 +- src/index.ts | 47 ++++++++++++++++++++++++++++++++++++----------- tsconfig.json | 3 ++- 4 files changed, 40 insertions(+), 14 deletions(-) diff --git a/package-lock.json b/package-lock.json index b69d5b1..0b64ad2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18,7 +18,7 @@ "typescript": "^4.5.2" }, "peerDependencies": { - "bullmq": "^1.59.4", + "bullmq": "^1.59.4 || ^5.0.0", "fastify": ">=3.24.1" } }, diff --git a/package.json b/package.json index 0c5b7a7..f171e65 100644 --- a/package.json +++ b/package.json @@ -34,7 +34,7 @@ "license": "MIT", "peerDependencies": { "fastify": ">=3.24.1", - "bullmq": "^1.59.4" + "bullmq": "^1.59.4 || ^5.0.0" }, "devDependencies": { "@types/node": "^16.11.12", diff --git a/src/index.ts b/src/index.ts index 3222877..bb32b50 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,6 @@ import { FastifyInstance } from 'fastify'; import fp from 'fastify-plugin'; -import { Queue, Worker, ConnectionOptions } from 'bullmq'; +import { Queue, Worker, ConnectionOptions, Job } from 'bullmq'; import * as fg from 'fast-glob'; import path from 'path'; @@ -20,10 +20,14 @@ const fastifyBullMQ = async ( ) => { const queues = {}; const workers = {}; + // Detectar versão do BullMQ verificando se Worker suporta certas propriedades + const isBullMQ5 = Queue.prototype.hasOwnProperty('isPaused'); + + fastify.log.info(`Using BullMQ version ${isBullMQ5 ? '5.x' : '1.x'} compatibility mode`); const files = fg.sync(opts.bullPath); - files.forEach(async (filePath) => { + for (const filePath of files) { const parts = filePath.split('/'); // the queue name is defined by the name of the directory in which the files are const queueName = parts[parts.length - 2]; @@ -45,17 +49,38 @@ const fastifyBullMQ = async ( `The queue ${queueName} does not have a worker function` ); } else { - (workers as any)[queueName] = new Worker( - queueName, - (job) => worker(fastify, job), - { - connection: opts.connection, - ...(workerConfig && workerConfig), - } - ); + if (isBullMQ5) { + // BullMQ 5.x compatible worker + (workers as any)[queueName] = new Worker( + queueName, + async (job: Job) => { + try { + return await worker(fastify, job); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + fastify.log.error(`Error processing job ${job.id} in queue ${queueName}: ${errorMessage}`); + throw error; + } + }, + { + connection: opts.connection, + ...(workerConfig && workerConfig), + } + ); + } else { + // BullMQ 1.x compatible worker + (workers as any)[queueName] = new Worker( + queueName, + (job) => worker(fastify, job), + { + connection: opts.connection, + ...(workerConfig && workerConfig), + } + ); + } fastify.log.info(`Created a worker for the queue ${queueName}`); } - }); + } fastify.decorate('queues', queues); fastify.decorate('workers', workers); diff --git a/tsconfig.json b/tsconfig.json index 62871e4..e231e05 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -5,7 +5,8 @@ "compilerOptions": { "module": "commonjs", "lib": ["dom", "esnext"], - "importHelpers": true, + "importHelpers": false, + "types": ["node"], // output .d.ts declaration files for consumers "declaration": true, // output .js.map sourcemap files for consumers From ee6b1b638c00fea337b922f9ca436beb238beaf6 Mon Sep 17 00:00:00 2001 From: "Paulo R. Lima" Date: Wed, 18 Jun 2025 16:50:33 -0300 Subject: [PATCH 2/2] refactor: unify worker implementation for BullMQ version compatibility --- src/index.ts | 46 +++++++++++++++++----------------------------- 1 file changed, 17 insertions(+), 29 deletions(-) diff --git a/src/index.ts b/src/index.ts index bb32b50..8727a1e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -20,7 +20,7 @@ const fastifyBullMQ = async ( ) => { const queues = {}; const workers = {}; - // Detectar versão do BullMQ verificando se Worker suporta certas propriedades + // Detect BullMQ version by checking if Queue supports certain properties const isBullMQ5 = Queue.prototype.hasOwnProperty('isPaused'); fastify.log.info(`Using BullMQ version ${isBullMQ5 ? '5.x' : '1.x'} compatibility mode`); @@ -49,35 +49,23 @@ const fastifyBullMQ = async ( `The queue ${queueName} does not have a worker function` ); } else { - if (isBullMQ5) { - // BullMQ 5.x compatible worker - (workers as any)[queueName] = new Worker( - queueName, - async (job: Job) => { - try { - return await worker(fastify, job); - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - fastify.log.error(`Error processing job ${job.id} in queue ${queueName}: ${errorMessage}`); - throw error; - } - }, - { - connection: opts.connection, - ...(workerConfig && workerConfig), + // Worker implementation compatible with both BullMQ 1.x and 5.x + (workers as any)[queueName] = new Worker( + queueName, + async (job: Job) => { + try { + return await worker(fastify, job); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + fastify.log.error(`Error processing job ${job.id} in queue ${queueName}: ${errorMessage}`); + throw error; } - ); - } else { - // BullMQ 1.x compatible worker - (workers as any)[queueName] = new Worker( - queueName, - (job) => worker(fastify, job), - { - connection: opts.connection, - ...(workerConfig && workerConfig), - } - ); - } + }, + { + connection: opts.connection, + ...(workerConfig && workerConfig), + } + ); fastify.log.info(`Created a worker for the queue ${queueName}`); } }