diff --git a/package-lock.json b/package-lock.json index b69d5b1..0b64ad2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18,7 +18,7 @@ "typescript": "^4.5.2" }, "peerDependencies": { - "bullmq": "^1.59.4", + "bullmq": "^1.59.4 || ^5.0.0", "fastify": ">=3.24.1" } }, diff --git a/package.json b/package.json index 0c5b7a7..f171e65 100644 --- a/package.json +++ b/package.json @@ -34,7 +34,7 @@ "license": "MIT", "peerDependencies": { "fastify": ">=3.24.1", - "bullmq": "^1.59.4" + "bullmq": "^1.59.4 || ^5.0.0" }, "devDependencies": { "@types/node": "^16.11.12", diff --git a/src/index.ts b/src/index.ts index 3222877..8727a1e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,6 @@ import { FastifyInstance } from 'fastify'; import fp from 'fastify-plugin'; -import { Queue, Worker, ConnectionOptions } from 'bullmq'; +import { Queue, Worker, ConnectionOptions, Job } from 'bullmq'; import * as fg from 'fast-glob'; import path from 'path'; @@ -20,10 +20,14 @@ const fastifyBullMQ = async ( ) => { const queues = {}; const workers = {}; + // Detect BullMQ version by checking if Queue supports certain properties + const isBullMQ5 = Queue.prototype.hasOwnProperty('isPaused'); + + fastify.log.info(`Using BullMQ version ${isBullMQ5 ? '5.x' : '1.x'} compatibility mode`); const files = fg.sync(opts.bullPath); - files.forEach(async (filePath) => { + for (const filePath of files) { const parts = filePath.split('/'); // the queue name is defined by the name of the directory in which the files are const queueName = parts[parts.length - 2]; @@ -45,9 +49,18 @@ const fastifyBullMQ = async ( `The queue ${queueName} does not have a worker function` ); } else { + // Worker implementation compatible with both BullMQ 1.x and 5.x (workers as any)[queueName] = new Worker( queueName, - (job) => worker(fastify, job), + async (job: Job) => { + try { + return await worker(fastify, job); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + fastify.log.error(`Error processing job ${job.id} in queue ${queueName}: ${errorMessage}`); + throw error; + } + }, { connection: opts.connection, ...(workerConfig && workerConfig), @@ -55,7 +68,7 @@ const fastifyBullMQ = async ( ); fastify.log.info(`Created a worker for the queue ${queueName}`); } - }); + } fastify.decorate('queues', queues); fastify.decorate('workers', workers); diff --git a/tsconfig.json b/tsconfig.json index 62871e4..e231e05 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -5,7 +5,8 @@ "compilerOptions": { "module": "commonjs", "lib": ["dom", "esnext"], - "importHelpers": true, + "importHelpers": false, + "types": ["node"], // output .d.ts declaration files for consumers "declaration": true, // output .js.map sourcemap files for consumers