Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions lib/integrations/bullmq.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { RequireIntegration } from "../types/integrations";
import { ScoutContextName, ScoutSpanOperation } from "../types";
import { trackJobQueueTime } from "../job-queue-time";

export class BullMQIntegration extends RequireIntegration {
protected readonly packageName: string = "bullmq";

protected shim(bullmqExport: any) {
return this.shimWorker(bullmqExport);
}

private shimWorker(bullmqExport: any): any {
const Worker = bullmqExport.Worker;
if (!Worker || !Worker.prototype) { return bullmqExport; }

const originalCallProcessJob = Worker.prototype.callProcessJob;
if (!originalCallProcessJob) { return bullmqExport; }

const integration = this;

Worker.prototype.callProcessJob = function(job: any, token: string) {
if (!integration.scout) {
return originalCallProcessJob.apply(this, [job, token]);
}

const opName = `${ScoutSpanOperation.BullMQJob}/${job.name || "unknown"}`;
const self = this;

return integration.scout.transaction(opName, (finishRequest) => {
return integration.scout!.instrument(opName, (_, { span }) => {
if (span) {
span.addContextSync(ScoutContextName.TaskId, job.id || "");
span.addContextSync(ScoutContextName.Queue, job.queueName || "");
span.addContextSync(
ScoutContextName.Priority,
job.opts?.priority != null ? String(job.opts.priority) : "unknown",
);
if (job.timestamp) { trackJobQueueTime(span, job.timestamp); }
}

return originalCallProcessJob.apply(self, [job, token])
.then((result: any) => {
finishRequest();
return result;
})
.catch((err: any) => {
if (span) { span.addContextSync(ScoutContextName.Error, "true"); }
finishRequest();
throw err;
});
});
});
};

return bullmqExport;
}
}

export default new BullMQIntegration();
3 changes: 3 additions & 0 deletions lib/integrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import prismaIntegration from "./prisma";
import fetchIntegration from "./fetch";
import redisIntegration from "./redis";
import mongodbIntegration from "./mongodb";
import bullmqIntegration from "./bullmq";
import { doNothingRequireIntegration, RequireIntegration } from "../types/integrations";

export const KNOWN_PACKAGES: string[] = [
Expand All @@ -29,6 +30,7 @@ export const KNOWN_PACKAGES: string[] = [
fetchIntegration.getPackageName(),
redisIntegration.getPackageName(),
mongodbIntegration.getPackageName(),
bullmqIntegration.getPackageName(),
];

export function getIntegrationForPackage(pkg: string): RequireIntegration {
Expand All @@ -47,6 +49,7 @@ export function getIntegrationForPackage(pkg: string): RequireIntegration {
case fetchIntegration.getPackageName(): return fetchIntegration;
case redisIntegration.getPackageName(): return redisIntegration;
case mongodbIntegration.getPackageName(): return mongodbIntegration;
case bullmqIntegration.getPackageName(): return bullmqIntegration;
default: return doNothingRequireIntegration;
}
}
3 changes: 3 additions & 0 deletions lib/types/enum.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ export enum ScoutContextName {
QueueTimeNS = "scout.queue_time_ns",
JobQueueTimeNS = "scout.job_queue_time_ns",
Queue = "queue",
TaskId = "task_id",
Priority = "priority",
}

export enum ScoutSpanOperation {
Expand All @@ -159,4 +161,5 @@ export enum ScoutSpanOperation {
HTTPDelete = "HTTP/DELETE",
HTTPPut = "HTTP/PUT",
HTTPPatch = "HTTP/PATCH",
BullMQJob = "Job",
}
11 changes: 6 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,18 @@
"@nestjs/common": "^10.0.0",
"@nestjs/core": "^10.0.0",
"@nestjs/platform-express": "^10.0.0",
"@prisma/client": "^6.0.0",
"@scout_apm/scout-apm": "file:.",
"@types/express": "^4.17.21",
"@types/mysql": "^2.15.26",
"@types/node": "^18.19.0",
"@types/supertest": "^6.0.2",
"@typescript-eslint/eslint-plugin": "^7.0.0",
"@typescript-eslint/parser": "^7.0.0",
"app-root-dir": "^1.0.2",
"bullmq": "^5.79.1",
"ejs": "^3.1.10",
"eslint": "^8.57.0",
"express": "^4.21.2",
"get-port": "^5.1.1",
"http-proxy-middleware": "^2.0.7",
Expand All @@ -77,7 +82,6 @@
"mysql": "^2.18.1",
"mysql2": "^3.11.5",
"pg": "^8.13.1",
"@prisma/client": "^6.0.0",
"prisma": "^6.0.0",
"pug": "^3.0.3",
"randomstring": "^1.3.0",
Expand All @@ -89,9 +93,6 @@
"supertest": "^4.0.2",
"tape": "^4.12.1",
"tempfile": "^3.0.0",
"@typescript-eslint/eslint-plugin": "^7.0.0",
"@typescript-eslint/parser": "^7.0.0",
"eslint": "^8.57.0",
"typescript": "^5.0.0"
},
"resolutions": {
Expand All @@ -111,7 +112,6 @@
"@types/tape": "4.2.33",
"@types/tmp": "0.0.34",
"app-root-path": "^3.0.0",
"tslib": "^2.8.1",
"big-number": "^2.0.0",
"cpu-percentage": "^1.0.3",
"detect-libc": "1.0.3",
Expand All @@ -131,6 +131,7 @@
"tcp-port-used": "^1.0.2",
"tmp": "0.0.33",
"tmp-promise": "1.0.5",
"tslib": "^2.8.1",
"uuid": "^11.0.0",
"winston": "^3.2.1"
}
Expand Down
195 changes: 195 additions & 0 deletions test/integrations/bullmq.e2e.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
import { setupRequireIntegrations } from "../../lib";
setupRequireIntegrations(["bullmq"]);

import { Queue, Worker } from "bullmq";
import * as test from "tape";
import * as TestUtil from "../util";
import { getIntegrationSymbol } from "../../lib/types/integrations";
import { ScoutEvent, buildScoutConfiguration } from "../../lib/types";
import { Scout, ScoutEventRequestSentData } from "../../lib/scout";
import { MockAgent } from "../integration/mock-agent";

const REDIS_HOST = process.env.REDIS_HOST || "127.0.0.1";
const REDIS_PORT = parseInt(process.env.REDIS_PORT || "6379", 10);
const TIMEOUT_MS = 15000;
const QUEUE_NAME = "scout-bullmq-test";

const redisConn = { host: REDIS_HOST, port: REDIS_PORT };
const sharedMock = new MockAgent();

test("setup: start shared mock agent", (t) => {
sharedMock.start().then(() => t.end()).catch(t.end);
});

test("bullmq Worker shim is applied", (t) => {
t.ok((Worker as any)[getIntegrationSymbol()], "Worker class has integration symbol");
t.end();
});

test("Job/{name} span is created for a processed job", { timeout: TIMEOUT_MS }, (t) => {
const scout = new Scout(buildScoutConfiguration({
monitor: true,
coreAgentDownload: false,
coreAgentLaunch: false,
socketPath: sharedMock.socketPath(),
}));

const queue = new Queue(QUEUE_NAME, { connection: redisConn });
let worker: Worker | undefined;

const cleanup = (err?: any) => {
const closeQueue = () => queue.close().catch(() => undefined);
const closeWorker = () => worker ? worker.close().catch(() => undefined) : Promise.resolve();
return closeWorker()
.then(closeQueue)
.then(() => TestUtil.shutdownScout(t, scout, err));
};

const listener = (data: ScoutEventRequestSentData) => {
const spans = data.request.getChildSpansSync();
const jobSpan = spans.find((s) => s.operation === "Job/TestJob");
if (!jobSpan) { return; }

scout.removeListener(ScoutEvent.RequestSent, listener);

t.ok(jobSpan, "Job/TestJob span present");
t.equal(jobSpan.operation, "Job/TestJob", "operation is Job/TestJob");

const queue = jobSpan.getContextValue("queue");
t.ok(queue, "queue context is set");
t.equal(queue, QUEUE_NAME, `queue context matches queue name (${QUEUE_NAME})`);

const taskId = jobSpan.getContextValue("task_id");
t.ok(taskId !== undefined && taskId !== "", "task_id context is set");

const queueTimeNs = jobSpan.getContextValue("scout.job_queue_time_ns");
t.ok(typeof queueTimeNs === "number", "scout.job_queue_time_ns is a number");
t.ok((queueTimeNs as number) >= 0, "scout.job_queue_time_ns is >= 0");

const priority = jobSpan.getContextValue("priority");
t.ok(priority !== undefined, "priority context is set");

cleanup().catch((err2) => t.end(err2));
};

scout.on(ScoutEvent.RequestSent, listener);

scout.setup()
.then(() => {
worker = new Worker(
QUEUE_NAME,
async (_job) => {
await new Promise((r) => setTimeout(r, 50));
},
{ connection: redisConn },
);
return queue.add("TestJob", { hello: "world" });
})
.catch(cleanup);
});

test("Job/{name} span with priority context", { timeout: TIMEOUT_MS }, (t) => {
const scout = new Scout(buildScoutConfiguration({
monitor: true,
coreAgentDownload: false,
coreAgentLaunch: false,
socketPath: sharedMock.socketPath(),
}));

const queue = new Queue(QUEUE_NAME, { connection: redisConn });
let worker: Worker | undefined;

const cleanup = (err?: any) => {
const closeQueue = () => queue.close().catch(() => undefined);
const closeWorker = () => worker ? worker.close().catch(() => undefined) : Promise.resolve();
return closeWorker()
.then(closeQueue)
.then(() => TestUtil.shutdownScout(t, scout, err));
};

const listener = (data: ScoutEventRequestSentData) => {
const spans = data.request.getChildSpansSync();
const jobSpan = spans.find((s) => s.operation === "Job/PriorityJob");
if (!jobSpan) { return; }

scout.removeListener(ScoutEvent.RequestSent, listener);

const priority = jobSpan.getContextValue("priority");
t.equal(priority, "5", "priority context reflects enqueued priority (5)");

cleanup().catch((err2) => t.end(err2));
};

scout.on(ScoutEvent.RequestSent, listener);

scout.setup()
.then(() => {
worker = new Worker(
QUEUE_NAME,
async (_job) => {
await new Promise((r) => setTimeout(r, 50));
},
{ connection: redisConn },
);
return queue.add("PriorityJob", {}, { priority: 5 });
})
.catch(cleanup);
});

test("error flag set when processor throws", { timeout: TIMEOUT_MS }, (t) => {
const scout = new Scout(buildScoutConfiguration({
monitor: true,
coreAgentDownload: false,
coreAgentLaunch: false,
socketPath: sharedMock.socketPath(),
}));

const queue = new Queue(QUEUE_NAME, { connection: redisConn });
let worker: Worker | undefined;

const cleanup = (err?: any) => {
const closeQueue = () => queue.close().catch(() => undefined);
const closeWorker = () => worker ? worker.close().catch(() => undefined) : Promise.resolve();
return closeWorker()
.then(closeQueue)
.then(() => TestUtil.shutdownScout(t, scout, err));
};

const listener = (data: ScoutEventRequestSentData) => {
const spans = data.request.getChildSpansSync();
const jobSpan = spans.find((s) => s.operation === "Job/FailingJob");
if (!jobSpan) { return; }

scout.removeListener(ScoutEvent.RequestSent, listener);

const errorFlag = jobSpan.getContextValue("error");
t.equal(errorFlag, "true", "error context is 'true' when processor throws");

cleanup().catch((err2) => t.end(err2));
};

scout.on(ScoutEvent.RequestSent, listener);

scout.setup()
.then(() => {
worker = new Worker(
QUEUE_NAME,
async (_job) => {
throw new Error("intentional test failure");
},
{
connection: redisConn,
// Don't retry — fail immediately
settings: { backoffStrategy: () => 0 },
},
);
// BullMQ won't re-throw to callProcessJob on retry by default;
// use attempts:1 so it fails without retrying
return queue.add("FailingJob", {}, { attempts: 1 });
})
.catch(cleanup);
});

test("teardown: stop shared mock agent", (t) => {
sharedMock.stop().then(() => t.end()).catch(t.end);
});
Loading
Loading