diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e8d873b..39b3fa01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,17 @@ project adheres to [Semantic Versioning](http://semver.org/). ### Breaking +- When using the cluster module, you must now call + `promclient.setupClusterWorker()` from each cluster worker. + + Long explanation: v13.2.0 introduced a change from v13.1.0 that broke cluster + metrics if an `AggregatorRegistry` was not instantiated on each cluster + worker. The example in `examples/cluster.js` shows instantiation of an + `AggregatorRegistry` in the workers, so users following that example were not + affected by this change. However, the example was not written as intended: + `new AggregatorRegistry()` should only be called on the cluster master. + `examples/cluster.js` has been updated to show the full, correct usage. + ### Changed ### Added diff --git a/README.md b/README.md index eaccf1a7..d04abe86 100644 --- a/README.md +++ b/README.md @@ -481,6 +481,8 @@ register }); ``` +Note that you must call `client.setupClusterWorker()` in each worker. + ### Pushgateway It is possible to push metrics via a diff --git a/example/cluster.js b/example/cluster.js index b91e1ceb..18ede7a4 100644 --- a/example/cluster.js +++ b/example/cluster.js @@ -2,17 +2,20 @@ const cluster = require('cluster'); const express = require('express'); -const metricsServer = express(); -const AggregatorRegistry = require('../').AggregatorRegistry; -const aggregatorRegistry = new AggregatorRegistry(); +const prometheus = require('../'); if (cluster.isMaster) { + // Instantiate an AggregatorRegistry in the cluster master. + const aggregatorRegistry = new prometheus.AggregatorRegistry(); + for (let i = 0; i < 4; i++) { cluster.fork(); } + const metricsServer = express(); metricsServer.get('/cluster_metrics', async (req, res) => { try { + // Aggregate metrics across all workers. const metrics = await aggregatorRegistry.clusterMetrics(); res.set('Content-Type', aggregatorRegistry.contentType); res.send(metrics); @@ -27,5 +30,8 @@ if (cluster.isMaster) { 'Cluster metrics server listening to 3001, metrics exposed on /cluster_metrics', ); } else { + // Set up the cluster worker. + prometheus.setupClusterWorker(); + // Register metrics as usual. require('./server.js'); } diff --git a/index.d.ts b/index.d.ts index 4d9655d0..be07ccfd 100644 --- a/index.d.ts +++ b/index.d.ts @@ -113,6 +113,12 @@ export class AggregatorRegistry extends Registry { static setRegistries(regs: Array | Registry): void; } +/** + * Sets up the cluster worker for cluster aggregation. Idempotent (safe to call + * more than once). + */ +export function setupClusterWorker(): void; + /** * General metric type */ diff --git a/index.js b/index.js index 1c913138..f762fded 100644 --- a/index.js +++ b/index.js @@ -22,4 +22,5 @@ exports.exponentialBuckets = require('./lib/bucketGenerators').exponentialBucket exports.collectDefaultMetrics = require('./lib/defaultMetrics'); exports.aggregators = require('./lib/metricAggregators').aggregators; -exports.AggregatorRegistry = require('./lib/cluster'); +exports.AggregatorRegistry = require('./lib/cluster').AggregatorRegistry; +exports.setupClusterWorker = require('./lib/cluster').setupClusterWorker; diff --git a/lib/cluster.js b/lib/cluster.js index cb564ede..e85a3156 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -24,13 +24,14 @@ const GET_METRICS_RES = 'prom-client:getMetricsRes'; let registries = [Registry.globalRegistry]; let requestCtr = 0; // Concurrency control -let listenersAdded = false; +let masterListenersAdded = false; +let workerListenersAdded = false; const requests = new Map(); // Pending requests for workers' local metrics. class AggregatorRegistry extends Registry { constructor() { super(); - addListeners(); + setupMasterListeners(); } /** @@ -146,63 +147,72 @@ class AggregatorRegistry extends Registry { } /** - * Adds event listeners for cluster aggregation. Idempotent (safe to call more - * than once). + * Adds the cluster master's event listeners for cluster aggregation. Idempotent + * (safe to call more than once). * @return {void} */ -function addListeners() { - if (listenersAdded) return; - listenersAdded = true; - - if (cluster().isMaster) { - // Listen for worker responses to requests for local metrics - cluster().on('message', (worker, message) => { - if (message.type === GET_METRICS_RES) { - const request = requests.get(message.requestId); - - if (message.error) { - request.done(new Error(message.error)); - return; - } +function setupMasterListeners() { + if (masterListenersAdded) return; + masterListenersAdded = true; + + // Listen for worker responses to requests for local metrics + cluster().on('message', (worker, message) => { + if (message.type === GET_METRICS_RES) { + const request = requests.get(message.requestId); + + if (message.error) { + request.done(new Error(message.error)); + return; + } - message.metrics.forEach(registry => request.responses.push(registry)); - request.pending--; + message.metrics.forEach(registry => request.responses.push(registry)); + request.pending--; - if (request.pending === 0) { - // finalize - requests.delete(message.requestId); - clearTimeout(request.errorTimeout); + if (request.pending === 0) { + // finalize + requests.delete(message.requestId); + clearTimeout(request.errorTimeout); - const registry = AggregatorRegistry.aggregate(request.responses); - const promString = registry.metrics(); - request.done(null, promString); - } + const registry = AggregatorRegistry.aggregate(request.responses); + const promString = registry.metrics(); + request.done(null, promString); } - }); - } + } + }); +} - if (cluster().isWorker) { - // Respond to master's requests for worker's local metrics. - process.on('message', message => { - if (message.type === GET_METRICS_REQ) { - Promise.all(registries.map(r => r.getMetricsAsJSON())) - .then(metrics => { - process.send({ - type: GET_METRICS_RES, - requestId: message.requestId, - metrics, - }); - }) - .catch(error => { - process.send({ - type: GET_METRICS_RES, - requestId: message.requestId, - error: error.message, - }); +/** + * Adds the cluster worker's event listeners for cluster aggregation. Idempotent + * (safe to call more than once). + * @return {void} + */ +function setupClusterWorker() { + if (workerListenersAdded) return; + workerListenersAdded = true; + + // Respond to master's requests for worker's local metrics. + process.on('message', message => { + if (message.type === GET_METRICS_REQ) { + Promise.all(registries.map(r => r.getMetricsAsJSON())) + .then(metrics => { + process.send({ + type: GET_METRICS_RES, + requestId: message.requestId, + metrics, }); - } - }); - } + }) + .catch(error => { + process.send({ + type: GET_METRICS_RES, + requestId: message.requestId, + error: error.message, + }); + }); + } + }); } -module.exports = AggregatorRegistry; +module.exports = { + AggregatorRegistry, + setupClusterWorker, +}; diff --git a/test/clusterTest.js b/test/clusterTest.js index 8569ea99..d5ead712 100644 --- a/test/clusterTest.js +++ b/test/clusterTest.js @@ -34,7 +34,7 @@ describe('AggregatorRegistry', () => { describe('aggregatorRegistry.clusterMetrics()', () => { it('works properly if there are no cluster workers', async () => { - const AggregatorRegistry = require('../lib/cluster'); + const { AggregatorRegistry } = require('../lib/cluster'); const ar = new AggregatorRegistry(); const metrics = await ar.clusterMetrics(); expect(metrics).toEqual(''); @@ -42,7 +42,7 @@ describe('AggregatorRegistry', () => { }); describe('AggregatorRegistry.aggregate()', () => { - const Registry = require('../lib/cluster'); + const { AggregatorRegistry } = require('../lib/cluster'); // These mimic the output of `getMetricsAsJSON`. const metricsArr1 = [ { @@ -159,7 +159,7 @@ describe('AggregatorRegistry', () => { }, ]; - const aggregated = Registry.aggregate([metricsArr1, metricsArr2]); + const aggregated = AggregatorRegistry.aggregate([metricsArr1, metricsArr2]); it('defaults to summation, preserves histogram bins', async () => { const histogram = aggregated.getSingleMetric('test_histogram').get();