From b1eea1e82a244cfaf168d4df427b28cc77cbcfd1 Mon Sep 17 00:00:00 2001 From: Michael Pfeiffer Date: Thu, 18 Jun 2026 21:06:44 -0500 Subject: [PATCH 1/4] =?UTF-8?q?feat:=20add=20MergeboxCollector=20=E2=80=94?= =?UTF-8?q?=20attribute=20Meteor=20mergebox=20RAM=20residency?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a new collector that measures Meteor's mergebox (the per-session, server-side cache of published documents) and reports per-(publication, collection) RAM residency to POST /api/v1/metrics/mergebox. The mergebox is the single largest and least-visible consumer of server RAM in pub/sub-heavy apps, and nothing measured it before. Where the bytes come from (ddp-server internals, verified against 3.x): Meteor.server.sessions -> session.collectionViews -> SessionCollectionView.documents -> SessionDocumentView.dataByKey (the resident field values) + existsIn (the subs referencing each doc). Subscription._documents is refcount-only and is NOT used. Attribution is pure even-split: each resident doc's bytes/docCount are divided across its existsIn handles and bucketed by (publicationName, collectionName). Because the split divides by existsIn.size, the rows for a collection sum back to its true residency (sum-preserving) — so the server's existing $sum aggregations reconstruct exact per-collection/strategy/site totals with no special "truth" row. Per-publication bytes are therefore an attribution (shared docs), per collection is exact. publicationStrategy is read via getPublicationStrategy() and reverse-mapped by identity (SERVER_MERGE / NO_MERGE / NO_MERGE_NO_HISTORY; NO_MERGE_MULTI and anything else -> unknown). NO_MERGE / NO_MERGE_NO_HISTORY keep no collectionView, so they correctly read ~0 residency — that absence is the optimization signal. Safety: read-only snapshot (never wraps session.send/processMessage, avoiding the bug #7 double-wrap), default OFF (collectMergebox), low 60s cadence with staggered start, per-session sampling (mergeboxSampleRate, extrapolated server-side), maxSessions / maxDocsPerSession caps, top-N row cap aligned to the 500/POST limit, per-session try/catch, and feature-detection that degrades to zero rows on any Meteor shape mismatch. Wires collectMergebox / mergeboxInterval / mergeboxSampleRate / cap options through config.js + env.js, adds SkySignalClient.addMergeboxMetric + /api/v1/metrics/mergebox batching, gates startup in skysignal-agent.js, and bumps AGENT_VERSION + package version to 1.1.0. The core ingest endpoint, service, and System > Mergebox UI already exist (SkySignalAPM/core PR #55). --- CHANGELOG.md | 7 + lib/SkySignalClient.js | 27 +- lib/collectors/MergeboxCollector.js | 456 ++++++++++++++ lib/collectors/SystemMetricsCollector.js | 2 +- lib/config.js | 31 + lib/env.js | 7 + package.js | 2 +- skysignal-agent.js | 31 + tests/unit/client/SkySignalClient.test.js | 6 +- .../unit/collectors/MergeboxCollector.test.js | 555 ++++++++++++++++++ 10 files changed, 1119 insertions(+), 5 deletions(-) create mode 100644 lib/collectors/MergeboxCollector.js create mode 100644 tests/unit/collectors/MergeboxCollector.test.js diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a048b6..1951680 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,13 @@ # Changelog +### v1.1.0 (Mergebox RAM Residency Collector) + +- **New `MergeboxCollector`** - Measures Meteor's MERGEBOX RAM residency (the per-session, server-side cache of published documents) and posts per-(publication, collection) rollups to `POST /api/v1/metrics/mergebox`. The collector walks `Meteor.server.sessions` read-only, estimates the resident bytes each session's mergebox holds per published collection (sizing each `SessionDocumentView.dataByKey` field value directly), reads the publication strategy via `Meteor.server.getPublicationStrategy()` (reverse-mapped to `SERVER_MERGE` / `NO_MERGE` / `NO_MERGE_NO_HISTORY` / `unknown`), and attributes residency to subscriptions via a pure even-split across `existsIn`. The even-split is sum-preserving: the rows for a collection sum back to that collection's true residency. `connectionCount` is a count of distinct DDP sessions (never a list of connection ids). + - **Ships dark / opt-in** - `collectMergebox` defaults to **false**. Enable via `SkySignalAgent.configure({ collectMergebox: true })` or `SKYSIGNAL_COLLECT_MERGEBOX=true`. + - **Performance-bounded** - 60s default cadence (`mergeboxInterval`), per-session sampling (`mergeboxSampleRate`, every row stamps the rate for server-side extrapolation), `mergeboxMaxSessions` / `mergeboxMaxDocsPerSession` caps to bound a single synchronous tick, per-session try/catch, feature-detection of internal shapes, and a top-N output cap aligned with the server's 500-rows-per-POST limit. Read-only — never wraps `session.send` / `processMessage`. + - Requires platform-side support for the mergebox ingest endpoint (gated on the `ddp` feature / pro tier). + ### v1.0.33 (App Version on Browser Errors) - **Browser errors now carry the app version** - The client `ErrorTracker` stamps an `appVersion` onto every captured error (via a new `errorTracking.appVersion` setting, falling back to `Meteor.settings.public.skysignal.appVersion`, `Meteor.settings.public.appVersion`, then `__meteor_runtime_config__.appVersion`). The SkySignal Error Details screen shows this version, making it possible to tell whether a reported bug originates from an old or a current build. Errors reported without a version are unaffected. (fixes [#17](https://github.com/SkySignalAPM/agent/issues/17)) diff --git a/lib/SkySignalClient.js b/lib/SkySignalClient.js index 4b13ad1..d7630ea 100644 --- a/lib/SkySignalClient.js +++ b/lib/SkySignalClient.js @@ -35,6 +35,7 @@ const BATCH_ENDPOINTS = { publications: "/api/v1/metrics/publications", environment: "/api/v1/metrics/environment", vulnerabilities: "/api/v1/metrics/vulnerabilities", + mergebox: "/api/v1/metrics/mergebox", liveQueryAggregates: "/api/v1/live-queries/aggregates", subscriptionAggregates: "/api/v1/subscriptions/aggregates" }; @@ -64,6 +65,7 @@ const BATCH_PAYLOAD_KEYS = { publications: "metrics", environment: "metrics", vulnerabilities: "metrics", + mergebox: "metrics", liveQueryAggregates: "aggregates", subscriptionAggregates: "aggregates" }; @@ -171,7 +173,8 @@ export default class SkySignalClient { deprecatedApis: [], publications: [], environment: [], - vulnerabilities: [] + vulnerabilities: [], + mergebox: [] }; // Track batch sizes incrementally (O(1) add instead of O(n)) @@ -438,6 +441,28 @@ export default class SkySignalClient { this._addToBatch("vulnerabilities", metric, "/api/v1/metrics/vulnerabilities"); } + /** + * Add a mergebox RAM residency rollup row to the batch. + * + * Each row is a pre-aggregated per-(publication, collection) residency rollup + * for a flush window (the MergeboxCollector does the even-split attribution + * agent-side). Rows are batched per-item like other per-window metrics; the + * server reads them under the "metrics" key at POST /api/v1/metrics/mergebox. + * + * @param {Object} metric - Mergebox residency rollup row + * @param {string} metric.collectionName - Published collection name + * @param {string} [metric.publicationName] - Publication name (omitted for auto-publish) + * @param {string} metric.strategy - SERVER_MERGE | NO_MERGE | NO_MERGE_NO_HISTORY | unknown + * @param {number} metric.bytesHeld - Estimated mergebox residency bytes for this group + * @param {number} metric.docCount - Documents resident for this group + * @param {number} metric.connectionCount - Distinct DDP connections holding this group + * @param {number} metric.sampleRate - Per-session sample rate used by the collector + * @returns {void} + */ + addMergeboxMetric(metric) { + this._addToBatch("mergebox", metric, "/api/v1/metrics/mergebox"); + } + /** * Add a Real User Monitoring (RUM) measurement to the batch. * diff --git a/lib/collectors/MergeboxCollector.js b/lib/collectors/MergeboxCollector.js new file mode 100644 index 0000000..5a0ca12 --- /dev/null +++ b/lib/collectors/MergeboxCollector.js @@ -0,0 +1,456 @@ +import { Meteor } from "meteor/meteor"; +import { estimateObjectSize } from "../sizeEstimator.js"; + +/** + * MergeboxCollector + * + * Measures Meteor's MERGEBOX RAM residency — the per-session, server-side cache + * of published documents — and emits per-(publication, collection) rollups to + * POST /api/v1/metrics/mergebox. + * + * WHERE THE BYTES LIVE (all ddp-server INTERNALS, source-verified against + * ddp-server 3.x — packages/ddp-server/livedata_server.js + session_*_view.ts): + * - Meteor.server.sessions : Map(sessionId -> Session) + * - session.collectionViews : Map(collectionName -> SessionCollectionView) + * - SessionCollectionView.documents : Map(docId -> SessionDocumentView | DummyDocumentView) + * - SessionDocumentView.dataByKey : Map(field -> PrecedenceItem[]) + * where PrecedenceItem = { subscriptionHandle, value }. The RESIDENT value + * is precedenceList[0].value (see SessionDocumentView.getFields()). + * - SessionDocumentView.existsIn : Set(subscriptionHandle) — the subs referencing + * this doc. SessionCollectionView.removed() deletes the doc once existsIn is + * empty, so a resident doc always has >= 1 handle (we still guard for 0). + * + * We DO NOT use Subscription._documents — that is Map(collectionName -> Set(docId)), + * a refcount-only structure with NO field values. It is not the byte store. + * + * ATTRIBUTION — PURE EVEN-SPLIT (NO per-collection "truth row"): + * The shipped server MergeboxService $sums bytesHeld/docCount across ALL rows per + * collection with no discriminator (getByCollection / getStrategyBreakdown), so + * emitting a separate per-collection total alongside even-split rows would + * double-count. We emit ONLY even-split per-(publicationName, collectionName) rows. + * For a doc referenced by n handles, each referencing handle's bucket receives + * docBytes/n and 1/n docCount. Because the split divides by existsIn.size, the + * even-split rows for a collection SUM BACK to that collection's true residency + * (sum-preserving). That property is the load-bearing correctness invariant. + * + * STRATEGY: + * Read via Meteor.server.getPublicationStrategy(collectionName) — an OBJECT + * { useCollectionView, doAccountingForCollection, useDummyDocumentView }, NOT a + * string. We reverse-map by identity against DDPServer.publicationStrategies + * (SERVER_MERGE / NO_MERGE / NO_MERGE_NO_HISTORY). NO_MERGE / NO_MERGE_NO_HISTORY + * keep NO collectionView entry at all (residency ~0); that absence is the whole + * signal and is never synthesized. NO_MERGE_MULTI uses DummyDocumentView (empty + * dataByKey) and maps to "unknown". Anything unrecognized / any throw -> "unknown". + * + * READ-ONLY: this collector never wraps session.send / processMessage (DDPCollector + * already chains those; double-wrapping risks the stack-overflow regression from + * bug #7). It only reads in a low-cadence setInterval tick. + */ +export default class MergeboxCollector { + constructor(options = {}) { + this.client = options.client; // SkySignalClient instance + this.host = options.host || "unknown-host"; + this.appVersion = options.appVersion || "unknown"; + this.buildHash = options.buildHash || null; + this.interval = options.interval || 60000; // 60s default — low cadence + + // Per-session sampling. < 1.0 samples a fraction of sessions; every emitted + // row carries sampleRate so the server can extrapolate. + this.sampleRate = typeof options.sampleRate === "number" ? options.sampleRate : 1.0; + + // Bounds for a single synchronous tick (mirror LiveQueriesCollector's caps). + this.maxSessions = options.maxSessions || 2000; + this.maxDocsPerSession = options.maxDocsPerSession || 50000; + + // Server accepts up to 500 rows per POST; cap output (top-N by bytesHeld). + this.maxRows = options.maxRows || 500; + + this.debug = options.debug || false; + this.intervalId = null; + this.windowStartTime = Date.now(); + } + + /** @private */ + _log(...args) { + if (this.debug) { + console.log("[SkySignal:Mergebox]", ...args); + } + } + + /** @private */ + _warn(...args) { + console.warn("[SkySignal:Mergebox]", ...args); + } + + start() { + if (this.intervalId) { + this._warn("Already started"); + return; + } + + this.windowStartTime = Date.now(); + this.intervalId = setInterval(() => { + try { + this._collect(); + } catch (err) { + // A single bad tick must never crash the host app. + this._warn("Collect tick failed:", err.message); + } + }, this.interval); + + this._log(`Started (interval: ${this.interval}ms, sampleRate: ${this.sampleRate})`); + } + + stop() { + if (this.intervalId) { + clearInterval(this.intervalId); + this.intervalId = null; + } + this._log("Stopped"); + } + + /** + * One snapshot tick: walk sampled sessions, attribute mergebox residency to + * (publicationName, collectionName) buckets via even-split, and POST one + * rollup payload for the window. + * @private + */ + _collect() { + const server = Meteor.server; + const sessions = server && server.sessions; + + // Feature-detect the residency store. Degrade to zero rows on shape + // mismatch (older/newer Meteor) rather than throwing. + if (!sessions || typeof sessions.forEach !== "function") { + this._log("Meteor.server.sessions not a Map — skipping"); + return; + } + + // buckets: key (publicationName || "" + "|" + collectionName) -> bucket + const buckets = new Map(); + + let sessionsWalked = 0; + let sessionsSampledOut = 0; + let sessionsSkipped = 0; + + for (const session of sessions.values()) { + if (sessionsWalked >= this.maxSessions) { + // Bound a single synchronous tick. Reflect the skipped work honestly. + this._log(`maxSessions (${this.maxSessions}) reached — stopping walk`); + break; + } + + // Per-session sampling (NOT per-doc) so a doc's even-split stays intact + // within a sampled session. Skipped sessions just lower the population. + if (this.sampleRate < 1.0 && Math.random() > this.sampleRate) { + sessionsSampledOut++; + continue; + } + + try { + this._attributeSession(session, server, buckets); + sessionsWalked++; + } catch (err) { + // One malformed session never aborts the whole snapshot. + sessionsSkipped++; + this._log("Skipped session due to error:", err.message); + } + } + + if (buckets.size === 0) { + this._log( + `No mergebox residency this tick (walked=${sessionsWalked}, ` + + `sampledOut=${sessionsSampledOut}, skipped=${sessionsSkipped})` + ); + this.windowStartTime = Date.now(); + return; + } + + const windowEnd = Date.now(); + const windowStart = this.windowStartTime; + const metrics = this._buildRows(buckets, windowStart, windowEnd); + + if (this.client && typeof this.client.addMergeboxMetric === "function" && metrics.length > 0) { + for (const m of metrics) { + this.client.addMergeboxMetric(m); + } + this._log( + `Emitted ${metrics.length} mergebox rows ` + + `(walked=${sessionsWalked}, sampledOut=${sessionsSampledOut}, skipped=${sessionsSkipped})` + ); + } + + this.windowStartTime = windowEnd; + } + + /** + * Walk one session's collectionViews and accumulate even-split contributions + * into the shared `buckets` map. + * @private + */ + _attributeSession(session, server, buckets) { + // Feature-detect: session.collectionViews must be a Map-like. + const collectionViews = session && session.collectionViews; + if (!collectionViews || typeof collectionViews.forEach !== "function") { + return; // older/newer Meteor or a session without a mergebox + } + + let docsWalked = 0; + + collectionViews.forEach((cview, collectionName) => { + const documents = cview && cview.documents; + if (!documents || typeof documents.forEach !== "function") { + return; // shape mismatch — skip this collection view + } + + const strategy = this._resolveStrategyName(server, collectionName); + + documents.forEach((docView) => { + if (docsWalked >= this.maxDocsPerSession) { + // Bound the tick; remaining docs in this session are skipped. + return; + } + docsWalked++; + + // DummyDocumentView (NO_MERGE_MULTI) has no dataByKey -> 0 bytes but + // docCount > 0. SessionDocumentView has the real field values. + const dataByKey = docView && docView.dataByKey; + + let docBytes = 0; + let fieldCount = 0; + if (dataByKey && typeof dataByKey.forEach === "function") { + dataByKey.forEach((precedenceList, fieldName) => { + // estimateObjectSize treats a Map/Set as a plain ~8-byte object, + // so we MUST iterate dataByKey ourselves and size each value. + if (!Array.isArray(precedenceList) || precedenceList.length === 0) { + return; // guard empty/missing precedence arrays + } + const resident = precedenceList[0]; + if (!resident) return; + fieldCount++; + docBytes += + (typeof fieldName === "string" ? fieldName.length * 2 : 0) + + estimateObjectSize(resident.value); + }); + } + + // existsIn = Set(subscriptionHandle). The doc is referenced by these + // subs; even-split docBytes/docCount across them. + const existsIn = docView && docView.existsIn; + const handles = + existsIn && typeof existsIn.forEach === "function" + ? Array.from(existsIn) + : []; + const n = handles.length; + if (n === 0) { + // removed() deletes such docs; guard anyway against torn state. + return; + } + + const bytesShare = docBytes / n; + const docShare = 1 / n; + const fieldShare = fieldCount / n; + + for (const handle of handles) { + const publicationName = this._resolvePublicationName(session, handle); + this._addToBucket(buckets, { + publicationName, + collectionName, + strategy, + bytesShare, + docShare, + fieldShare, + session + }); + } + }); + }); + } + + /** + * Accumulate an even-split contribution into the (publicationName, collectionName) + * bucket. Sessions are unioned into a Set for an honest distinct connectionCount. + * @private + */ + _addToBucket(buckets, contribution) { + const { + publicationName, + collectionName, + strategy, + bytesShare, + docShare, + fieldShare, + session + } = contribution; + + // publicationName is null for auto-publish/universal subs. Use "" in the key. + const key = `${publicationName || ""}|${collectionName}`; + + let bucket = buckets.get(key); + if (!bucket) { + bucket = { + publicationName: publicationName || null, + collectionName, + strategy, + bytesHeld: 0, + docCount: 0, + fieldCount: 0, + _sessions: new Set() + }; + buckets.set(key, bucket); + } + + bucket.bytesHeld += bytesShare; + bucket.docCount += docShare; + bucket.fieldCount += fieldShare; + // connectionCount = distinct sessions (NEVER a list of ids — privacy + size). + bucket._sessions.add(session); + } + + /** + * Materialize bucket map -> emit rows. Rounds fractional even-split values at + * emit time, computes connectionCount + avgBytesPerConnection, and caps to the + * top-N rows by bytesHeld (aligns with the server's 500/POST limit). + * @private + */ + _buildRows(buckets, windowStart, windowEnd) { + const timestamp = new Date(windowEnd); + const windowStartDate = new Date(windowStart); + const windowEndDate = new Date(windowEnd); + + let rows = Array.from(buckets.values()).map((b) => { + const bytesHeld = Math.round(b.bytesHeld); + const connectionCount = b._sessions.size; + const row = { + host: this.host, + appVersion: this.appVersion, + timestamp, + windowStart: windowStartDate, + windowEnd: windowEndDate, + collectionName: b.collectionName, + strategy: b.strategy, + bytesHeld, + docCount: Math.round(b.docCount), + fieldCount: Math.round(b.fieldCount), + connectionCount, + sampleRate: this.sampleRate + }; + // Server strips null publicationName (auto-publish). Only emit when named. + if (b.publicationName) { + row.publicationName = b.publicationName; + } + if (connectionCount > 0) { + row.avgBytesPerConnection = Math.round(bytesHeld / connectionCount); + } + return row; + }); + + // Cap output rows per tick (top-N by bytesHeld) to the server POST limit. + if (rows.length > this.maxRows) { + rows.sort((a, b) => b.bytesHeld - a.bytesHeld); + rows = rows.slice(0, this.maxRows); + this._log(`Capped output to top ${this.maxRows} rows by bytesHeld`); + } + + return rows; + } + + /** + * Resolve the schema's strategy enum for a collection by reverse-mapping the + * getPublicationStrategy() object against DDPServer.publicationStrategies by + * identity. Returns one of SERVER_MERGE | NO_MERGE | NO_MERGE_NO_HISTORY | + * unknown. NO_MERGE_MULTI (and anything unrecognized, or any throw) -> "unknown". + * @private + */ + _resolveStrategyName(server, collectionName) { + try { + if (typeof server.getPublicationStrategy !== "function") { + return "unknown"; + } + const strategyObj = server.getPublicationStrategy(collectionName); + if (!strategyObj) return "unknown"; + + // DDPServer is a Meteor package global; the constant table holds the + // canonical strategy objects we compare against by identity. + const table = + typeof DDPServer !== "undefined" && DDPServer.publicationStrategies; + if (!table) { + // Fall back to a structural match if the global table isn't present. + return this._structuralStrategyName(strategyObj); + } + + // Identity reverse-map (only the three the schema accepts). + if (strategyObj === table.SERVER_MERGE) return "SERVER_MERGE"; + if (strategyObj === table.NO_MERGE) return "NO_MERGE"; + if (strategyObj === table.NO_MERGE_NO_HISTORY) return "NO_MERGE_NO_HISTORY"; + // NO_MERGE_MULTI and anything else are intentionally "unknown". + return "unknown"; + } catch (_err) { + return "unknown"; + } + } + + /** + * Structural fallback when DDPServer.publicationStrategies isn't reachable. + * Maps the { useCollectionView, doAccountingForCollection, useDummyDocumentView } + * shape to the schema enum. Only the three the schema accepts are returned. + * @private + */ + _structuralStrategyName(s) { + const cv = !!s.useCollectionView; + const acct = !!s.doAccountingForCollection; + const dummy = !!s.useDummyDocumentView; + + // SERVER_MERGE: collectionView + accounting + real document view + if (cv && acct && !dummy) return "SERVER_MERGE"; + // NO_MERGE: no collectionView, but still accounts ids + if (!cv && acct && !dummy) return "NO_MERGE"; + // NO_MERGE_NO_HISTORY: no collectionView, no accounting + if (!cv && !acct && !dummy) return "NO_MERGE_NO_HISTORY"; + // NO_MERGE_MULTI (dummy view) and anything else -> unknown + return "unknown"; + } + + /** + * Resolve a subscriptionHandle to its publication name. + * Named subs: handle = 'N' + subscriptionId -> session._namedSubs.get(subId)._name. + * Universal / auto-publish: handle = 'U' + Random.id() -> null (no name; the row's + * publicationName is omitted so the server treats it as auto-publish). + * @private + */ + _resolvePublicationName(session, handle) { + if (typeof handle !== "string" || handle.length === 0) return null; + + const kind = handle[0]; + if (kind === "U") { + // Universal / auto-publish subscription — no publication name. + return null; + } + if (kind === "N") { + const subId = handle.slice(1); + const namedSubs = session && session._namedSubs; + if (namedSubs && typeof namedSubs.get === "function") { + const sub = namedSubs.get(subId); + if (sub && typeof sub._name === "string" && sub._name) { + return sub._name; + } + } + return null; + } + return null; + } + + /** + * Lightweight stats for debugging. + */ + getStats() { + return { + interval: this.interval, + sampleRate: this.sampleRate, + maxSessions: this.maxSessions, + maxDocsPerSession: this.maxDocsPerSession, + maxRows: this.maxRows, + running: this.intervalId !== null + }; + } +} diff --git a/lib/collectors/SystemMetricsCollector.js b/lib/collectors/SystemMetricsCollector.js index c3400ed..8167993 100644 --- a/lib/collectors/SystemMetricsCollector.js +++ b/lib/collectors/SystemMetricsCollector.js @@ -10,7 +10,7 @@ import v8 from "v8"; const execAsync = promisify(exec); // Agent version - must be updated alongside package.js on each release -const AGENT_VERSION = '1.0.33'; +const AGENT_VERSION = '1.1.0'; // cgroup v1 "unlimited" sentinel: values >= 2^62 mean no limit is set const CGROUP_V1_UNLIMITED = 2 ** 62; diff --git a/lib/config.js b/lib/config.js index 810460d..a985ee5 100644 --- a/lib/config.js +++ b/lib/config.js @@ -193,6 +193,13 @@ export const DEFAULT_CONFIG = { collectVulnerabilities: true, // Enable npm audit vulnerability scanning vulnerabilitiesInterval: 3600000, // 1 hour + // Mergebox RAM Residency (ships dark — opt-in) + collectMergebox: false, // Enable mergebox residency attribution (default OFF) + mergeboxInterval: 60000, // 60 seconds (low cadence — a full session walk) + mergeboxSampleRate: 1.0, // Per-session sample rate, 0 < rate <= 1 + mergeboxMaxSessions: 2000, // Cap sessions walked per tick (bounds the sync tick) + mergeboxMaxDocsPerSession: 50000, // Cap docs walked per session per tick + // Aggregation Ingest (requires platform v1.0.30+) ingestAggregation: true // Roll up live query / subscription telemetry into fixed-shape aggregates before shipping }; @@ -352,6 +359,13 @@ export function validateConfig(config) { collectVulnerabilities: Match.Optional(Boolean), vulnerabilitiesInterval: Match.Optional(Match.Integer), + // Mergebox RAM Residency + collectMergebox: Match.Optional(Boolean), + mergeboxInterval: Match.Optional(Match.Integer), + mergeboxSampleRate: Match.Optional(Number), + mergeboxMaxSessions: Match.Optional(Match.Integer), + mergeboxMaxDocsPerSession: Match.Optional(Match.Integer), + // Aggregation Ingest ingestAggregation: Match.Optional(Boolean) }); @@ -388,6 +402,23 @@ export function validateConfig(config) { throw new Error("liveQueriesMaxObservers must be at least 500"); } + // Mergebox: sampleRate in (0, 1]; interval and caps must be positive ints. + if (config.mergeboxSampleRate !== undefined && (config.mergeboxSampleRate <= 0 || config.mergeboxSampleRate > 1)) { + throw new Error("mergeboxSampleRate must be greater than 0 and at most 1"); + } + + if (config.mergeboxInterval !== undefined && config.mergeboxInterval < 1000) { + throw new Error("mergeboxInterval must be at least 1000ms"); + } + + if (config.mergeboxMaxSessions !== undefined && config.mergeboxMaxSessions < 1) { + throw new Error("mergeboxMaxSessions must be at least 1"); + } + + if (config.mergeboxMaxDocsPerSession !== undefined && config.mergeboxMaxDocsPerSession < 1) { + throw new Error("mergeboxMaxDocsPerSession must be at least 1"); + } + if (config.logSampleRate !== undefined && (config.logSampleRate < 0 || config.logSampleRate > 1)) { throw new Error("logSampleRate must be between 0 and 1"); } diff --git a/lib/env.js b/lib/env.js index 34742c0..e781ccb 100644 --- a/lib/env.js +++ b/lib/env.js @@ -65,6 +65,7 @@ export const ENV_MAP = [ { env: 'SKYSIGNAL_RUM_SAMPLE_RATE', key: 'rumSampleRate', type: 'float' }, { env: 'SKYSIGNAL_INDEX_USAGE_SAMPLE_RATE', key: 'indexUsageSampleRate', type: 'float' }, { env: 'SKYSIGNAL_LOG_SAMPLE_RATE', key: 'logSampleRate', type: 'float' }, + { env: 'SKYSIGNAL_MERGEBOX_SAMPLE_RATE', key: 'mergeboxSampleRate', type: 'float' }, // Collection intervals { env: 'SKYSIGNAL_SYSTEM_METRICS_INTERVAL', key: 'systemMetricsInterval', type: 'int' }, @@ -75,6 +76,7 @@ export const ENV_MAP = [ { env: 'SKYSIGNAL_DNS_TIMINGS_INTERVAL', key: 'dnsTimingsInterval', type: 'int' }, { env: 'SKYSIGNAL_OUTBOUND_HTTP_INTERVAL', key: 'outboundHttpInterval', type: 'int' }, { env: 'SKYSIGNAL_LIVE_QUERIES_INTERVAL', key: 'liveQueriesInterval', type: 'int' }, + { env: 'SKYSIGNAL_MERGEBOX_INTERVAL', key: 'mergeboxInterval', type: 'int' }, // Feature flags { env: 'SKYSIGNAL_COLLECT_SYSTEM_METRICS', key: 'collectSystemMetrics', type: 'bool' }, @@ -95,6 +97,7 @@ export const ENV_MAP = [ { env: 'SKYSIGNAL_COLLECT_ENVIRONMENT', key: 'collectEnvironment', type: 'bool' }, { env: 'SKYSIGNAL_COLLECT_VULNERABILITIES', key: 'collectVulnerabilities', type: 'bool' }, { env: 'SKYSIGNAL_COLLECT_DEPRECATED_APIS', key: 'collectDeprecatedApis', type: 'bool' }, + { env: 'SKYSIGNAL_COLLECT_MERGEBOX', key: 'collectMergebox', type: 'bool' }, // Log config { env: 'SKYSIGNAL_LOG_LEVELS', key: 'logLevels', type: 'array' }, @@ -115,6 +118,10 @@ export const ENV_MAP = [ // Live queries { env: 'SKYSIGNAL_LIVE_QUERIES_MAX_OBSERVERS', key: 'liveQueriesMaxObservers', type: 'int' }, + // Mergebox + { env: 'SKYSIGNAL_MERGEBOX_MAX_SESSIONS', key: 'mergeboxMaxSessions', type: 'int' }, + { env: 'SKYSIGNAL_MERGEBOX_MAX_DOCS_PER_SESSION', key: 'mergeboxMaxDocsPerSession', type: 'int' }, + // Worker { env: 'SKYSIGNAL_USE_WORKER_THREAD', key: 'useWorkerThread', type: 'bool' }, { env: 'SKYSIGNAL_WORKER_THRESHOLD', key: 'workerThreshold', type: 'int' }, diff --git a/package.js b/package.js index 947588b..3ea8a14 100644 --- a/package.js +++ b/package.js @@ -1,6 +1,6 @@ Package.describe({ name: "skysignal:agent", - version: "1.0.33", + version: "1.1.0", summary: "SkySignal APM agent for Meteor applications - monitors performance, errors, and system metrics", git: "https://github.com/skysignalapm/agent.git", diff --git a/skysignal-agent.js b/skysignal-agent.js index 55fcbaa..9b905e0 100644 --- a/skysignal-agent.js +++ b/skysignal-agent.js @@ -16,6 +16,7 @@ import DeprecatedApiCollector from "./lib/collectors/DeprecatedApiCollector.js"; import PublicationTracer from "./lib/collectors/PublicationTracer.js"; import EnvironmentCollector from "./lib/collectors/EnvironmentCollector.js"; import VulnerabilityCollector from "./lib/collectors/VulnerabilityCollector.js"; +import MergeboxCollector from "./lib/collectors/MergeboxCollector.js"; import JobCollector from "./lib/collectors/jobs/index.js"; import SkySignalClient from "./lib/SkySignalClient.js"; import { mergeConfig } from "./lib/config.js"; @@ -145,6 +146,12 @@ class SkySignalAgentClass { // Vulnerability scanning collectVulnerabilities: true, vulnerabilitiesInterval: 3600000, // 1 hour + // Mergebox RAM residency (ships dark — opt-in) + collectMergebox: false, + mergeboxInterval: 60000, // 60s default + mergeboxSampleRate: 1.0, // per-session sample rate + mergeboxMaxSessions: 2000, // cap sessions walked per tick + mergeboxMaxDocsPerSession: 50000, // cap docs walked per session per tick // Aggregation ingest (reduces writes on Subscriptions / LiveQueries) ingestAggregation: true }; @@ -764,6 +771,30 @@ class SkySignalAgentClass { } } + if (this.config.collectMergebox) { + try { + this.collectors.mergebox = new MergeboxCollector({ + client: this.client, + host: this.config.host, + appVersion: this.config.appVersion, + buildHash: this.config.buildHash, + interval: this.config.mergeboxInterval, + sampleRate: this.config.mergeboxSampleRate, + maxSessions: this.config.mergeboxMaxSessions, + maxDocsPerSession: this.config.mergeboxMaxDocsPerSession, + debug: this.config.debug + }); + setTimeout(() => { + if (this.started && this.collectors.mergebox) { + this.collectors.mergebox.start(); + this._log("Mergebox residency collector started"); + } + }, this._getStaggerDelay()); + } catch (error) { + this._warn("Failed to start mergebox collector:", error.message); + } + } + if (this.config.collectJobs) { // Defer job monitoring to after all Meteor.startup callbacks have run // Job packages like msavin:sjobs register their own Meteor.startup callbacks diff --git a/tests/unit/client/SkySignalClient.test.js b/tests/unit/client/SkySignalClient.test.js index 8200163..46da178 100644 --- a/tests/unit/client/SkySignalClient.test.js +++ b/tests/unit/client/SkySignalClient.test.js @@ -70,7 +70,8 @@ describe('SkySignalClient', function () { 'dependencies', 'mongoPoolMetrics', 'collectionStats', 'ddpConnections', 'subscriptions', 'liveQueries', 'rum', 'logs', 'dnsMetrics', 'outboundHttp', 'cpuProfiles', - 'deprecatedApis', 'publications', 'environment', 'vulnerabilities' + 'deprecatedApis', 'publications', 'environment', 'vulnerabilities', + 'mergebox' ]; for (const type of types) { expect(client.batches[type], `batches.${type}`).to.be.an('array').that.is.empty; @@ -179,7 +180,8 @@ describe('SkySignalClient', function () { deprecatedApis: '/api/v1/metrics/deprecated-apis', publications: '/api/v1/metrics/publications', environment: '/api/v1/metrics/environment', - vulnerabilities: '/api/v1/metrics/vulnerabilities' + vulnerabilities: '/api/v1/metrics/vulnerabilities', + mergebox: '/api/v1/metrics/mergebox' }; for (const [type, endpoint] of Object.entries(expected)) { diff --git a/tests/unit/collectors/MergeboxCollector.test.js b/tests/unit/collectors/MergeboxCollector.test.js new file mode 100644 index 0000000..b987efb --- /dev/null +++ b/tests/unit/collectors/MergeboxCollector.test.js @@ -0,0 +1,555 @@ +/** + * MergeboxCollector tests. + * + * Fabricates Meteor ddp-server internals (Session.collectionViews, + * SessionCollectionView.documents, SessionDocumentView.dataByKey/existsIn, + * Session._namedSubs) to drive the collector's read-only snapshot logic without a + * real Meteor runtime. All structures mirror ddp-server 3.x source. + * + * Asserts: + * (a) SERVER_MERGE doc shared by 2 named subs -> even-split halves bytes per pub, + * and the two rows SUM to the full doc bytes (sum-preserving invariant). + * (b) auto-publish ('U') handle -> row with publicationName omitted. + * (c) NO_MERGE collection absent from collectionViews -> no row. + * (d) connectionCount counts distinct SESSIONS, not handles. + * (e) sampleRate < 1 path doesn't crash and stamps sampleRate on rows. + * (f) feature-detect: a session missing collectionViews is skipped without throwing. + * (g) strategy reverse-map: NO_MERGE object -> "NO_MERGE", unknown object -> "unknown". + * + * Does NOT wrap session.send / processMessage — read-only collector. + */ + +import { expect } from 'chai'; +import sinon from 'sinon'; +import { Meteor } from 'meteor/meteor'; +import MergeboxCollector from '../../../lib/collectors/MergeboxCollector.js'; +import { estimateObjectSize } from '../../../lib/sizeEstimator.js'; + +// --------------------------------------------------------------------------- +// Fabricated ddp-server internals (source-shaped replicas) +// --------------------------------------------------------------------------- + +// Canonical publication strategy objects (identity-compared by the collector). +const STRATEGIES = { + SERVER_MERGE: { useDummyDocumentView: false, useCollectionView: true, doAccountingForCollection: true }, + NO_MERGE_NO_HISTORY: { useDummyDocumentView: false, useCollectionView: false, doAccountingForCollection: false }, + NO_MERGE: { useDummyDocumentView: false, useCollectionView: false, doAccountingForCollection: true }, + NO_MERGE_MULTI: { useDummyDocumentView: true, useCollectionView: true, doAccountingForCollection: true } +}; + +/** SessionDocumentView replica: dataByKey (Map) + existsIn (Set). */ +function makeDocView(fields, handles) { + const dataByKey = new Map(); + for (const [field, value] of Object.entries(fields)) { + // PrecedenceItem[] — resident value is precedenceList[0].value + dataByKey.set(field, [{ subscriptionHandle: handles[0], value }]); + } + return { dataByKey, existsIn: new Set(handles) }; +} + +/** SessionCollectionView replica: documents (Map of docId -> docView). */ +function makeCollectionView(docs) { + const documents = new Map(); + for (const [docId, docView] of Object.entries(docs)) { + documents.set(docId, docView); + } + return { documents }; +} + +/** + * Session replica. + * @param collectionViews Map(collectionName -> SessionCollectionView) + * @param namedSubs Map(subscriptionId -> { _name }) + */ +function makeSession(id, collectionViews, namedSubs = new Map()) { + return { id, collectionViews, _namedSubs: namedSubs }; +} + +/** Build a fake Meteor.server with a sessions Map + strategy resolver. */ +function makeServer(sessionsArray, strategyByCollection = {}) { + const sessions = new Map(); + for (const s of sessionsArray) sessions.set(s.id, s); + return { + sessions, + getPublicationStrategy(collectionName) { + return strategyByCollection[collectionName] || STRATEGIES.SERVER_MERGE; + } + }; +} + +// Compute the exact resident bytes for a docView the way the collector does. +function docBytesOf(fields) { + let bytes = 0; + for (const [field, value] of Object.entries(fields)) { + bytes += field.length * 2 + estimateObjectSize(value); + } + return bytes; +} + +describe('MergeboxCollector', function () { + let collector; + let mockClient; + let originalServer; + + beforeEach(function () { + mockClient = { addMergeboxMetric: sinon.stub() }; + originalServer = Meteor.server; + collector = new MergeboxCollector({ + client: mockClient, + host: 'test-host', + appVersion: '1.0.0' + }); + }); + + afterEach(function () { + if (collector.intervalId) { + clearInterval(collector.intervalId); + collector.intervalId = null; + } + Meteor.server = originalServer; + delete global.DDPServer; + }); + + function emittedRows() { + return mockClient.addMergeboxMetric.getCalls().map(c => c.args[0]); + } + + // ========================================== + // constructor + // ========================================== + describe('constructor', function () { + it('sets default values', function () { + expect(collector.host).to.equal('test-host'); + expect(collector.interval).to.equal(60000); + expect(collector.sampleRate).to.equal(1.0); + expect(collector.maxSessions).to.equal(2000); + expect(collector.maxDocsPerSession).to.equal(50000); + expect(collector.maxRows).to.equal(500); + expect(collector.intervalId).to.be.null; + }); + + it('honors overrides', function () { + const c = new MergeboxCollector({ + interval: 30000, + sampleRate: 0.25, + maxSessions: 10, + maxDocsPerSession: 100, + maxRows: 50 + }); + expect(c.interval).to.equal(30000); + expect(c.sampleRate).to.equal(0.25); + expect(c.maxSessions).to.equal(10); + expect(c.maxDocsPerSession).to.equal(100); + expect(c.maxRows).to.equal(50); + }); + }); + + // ========================================== + // (a) even-split + sum-preserving (LOAD-BEARING) + // ========================================== + describe('even-split attribution (sum-preserving)', function () { + it('halves a SERVER_MERGE doc shared by 2 named subs; rows sum to full bytes', function () { + // One doc in "messages", referenced by two named subs. + const fields = { text: 'hello world', count: 42 }; + const handles = ['Nsub1', 'Nsub2']; + const docView = makeDocView(fields, handles); + + const namedSubs = new Map([ + ['sub1', { _name: 'pubA' }], + ['sub2', { _name: 'pubB' }] + ]); + + const session = makeSession( + 'sess1', + new Map([['messages', makeCollectionView({ doc1: docView })]]), + namedSubs + ); + + Meteor.server = makeServer([session], { messages: STRATEGIES.SERVER_MERGE }); + + collector._collect(); + + const rows = emittedRows(); + expect(rows).to.have.lengthOf(2); + + const fullBytes = docBytesOf(fields); + + const rowA = rows.find(r => r.publicationName === 'pubA'); + const rowB = rows.find(r => r.publicationName === 'pubB'); + expect(rowA, 'pubA row').to.exist; + expect(rowB, 'pubB row').to.exist; + + // Each half (rounded). Their sum reconstructs the collection's true residency. + expect(rowA.bytesHeld + rowB.bytesHeld).to.equal(fullBytes); + expect(rowA.bytesHeld).to.equal(Math.round(fullBytes / 2)); + expect(rowB.bytesHeld).to.equal(Math.round(fullBytes / 2)); + + // docCount even-split: 1/2 each -> rounded. + expect(rowA.docCount).to.equal(Math.round(0.5)); + expect(rowB.docCount).to.equal(Math.round(0.5)); + + // Both are SERVER_MERGE on the right collection. + expect(rowA.strategy).to.equal('SERVER_MERGE'); + expect(rowA.collectionName).to.equal('messages'); + expect(rowB.collectionName).to.equal('messages'); + }); + + it('does not emit a separate per-collection truth row (avoids double-count)', function () { + const fields = { text: 'x' }; + const handles = ['Nsub1', 'Nsub2']; + const docView = makeDocView(fields, handles); + const namedSubs = new Map([ + ['sub1', { _name: 'pubA' }], + ['sub2', { _name: 'pubB' }] + ]); + const session = makeSession( + 'sess1', + new Map([['messages', makeCollectionView({ doc1: docView })]]), + namedSubs + ); + Meteor.server = makeServer([session], { messages: STRATEGIES.SERVER_MERGE }); + + collector._collect(); + + const rows = emittedRows(); + // Exactly the two even-split rows — no extra collection-total row. + expect(rows).to.have.lengthOf(2); + // No row may be missing a publicationName here (all handles are named). + expect(rows.every(r => r.publicationName)).to.be.true; + }); + + it('aggregates the same (pub, collection) across multiple sessions', function () { + const fields = { v: 1 }; + const namedSubs = new Map([['s', { _name: 'pubA' }]]); + const mkSession = (id) => + makeSession( + id, + new Map([['c', makeCollectionView({ d: makeDocView(fields, ['Ns']) })]]), + namedSubs + ); + + Meteor.server = makeServer([mkSession('a'), mkSession('b')], { c: STRATEGIES.SERVER_MERGE }); + collector._collect(); + + const rows = emittedRows(); + expect(rows).to.have.lengthOf(1); + const row = rows[0]; + // bytesHeld = 2 * full doc bytes (one doc per session, single handle each) + expect(row.bytesHeld).to.equal(docBytesOf(fields) * 2); + expect(row.docCount).to.equal(2); + expect(row.connectionCount).to.equal(2); + }); + }); + + // ========================================== + // (b) auto-publish handle -> publicationName omitted + // ========================================== + describe('auto-publish attribution', function () { + it("omits publicationName for a 'U' (universal) handle", function () { + const fields = { a: 'b' }; + const handles = ['Uxyz']; // universal / auto-publish + const docView = makeDocView(fields, handles); + const session = makeSession( + 'sess1', + new Map([['posts', makeCollectionView({ d: docView })]]) + // no _namedSubs needed for a universal handle + ); + Meteor.server = makeServer([session], { posts: STRATEGIES.SERVER_MERGE }); + + collector._collect(); + + const rows = emittedRows(); + expect(rows).to.have.lengthOf(1); + expect(rows[0]).to.not.have.property('publicationName'); + expect(rows[0].collectionName).to.equal('posts'); + expect(rows[0].bytesHeld).to.equal(docBytesOf(fields)); + }); + }); + + // ========================================== + // (c) NO_MERGE collection absent -> no row + // ========================================== + describe('NO_MERGE residency absence', function () { + it('emits no row when a NO_MERGE collection has no collectionView entry', function () { + // NO_MERGE keeps no SessionCollectionView, so collectionViews is empty. + const session = makeSession('sess1', new Map(), new Map()); + Meteor.server = makeServer([session], { feed: STRATEGIES.NO_MERGE }); + + collector._collect(); + + expect(mockClient.addMergeboxMetric.called).to.be.false; + }); + }); + + // ========================================== + // (d) connectionCount = distinct sessions, not handles + // ========================================== + describe('connectionCount', function () { + it('counts distinct sessions even when many handles reference a doc', function () { + // A single session, one doc referenced by THREE handles. + const fields = { x: 'y' }; + const handles = ['Nsub1', 'Nsub2', 'Nsub3']; + const docView = makeDocView(fields, handles); + const namedSubs = new Map([ + ['sub1', { _name: 'p' }], + ['sub2', { _name: 'p' }], + ['sub3', { _name: 'p' }] + ]); + const session = makeSession( + 'sess1', + new Map([['c', makeCollectionView({ d: docView })]]), + namedSubs + ); + Meteor.server = makeServer([session], { c: STRATEGIES.SERVER_MERGE }); + + collector._collect(); + + const rows = emittedRows(); + // All three handles resolve to pub "p" -> single bucket. + expect(rows).to.have.lengthOf(1); + // Three handles but ONE session. + expect(rows[0].connectionCount).to.equal(1); + // Sum-preserving: three thirds round-trip to the full doc bytes. + expect(rows[0].bytesHeld).to.equal(docBytesOf(fields)); + }); + }); + + // ========================================== + // (e) sampleRate < 1 + // ========================================== + describe('sampleRate < 1', function () { + it('does not crash and stamps sampleRate on every row', function () { + const c = new MergeboxCollector({ + client: mockClient, + host: 'test-host', + sampleRate: 0.5 + }); + // Force the sampler to ALWAYS include the session (Math.random() returns 0). + const rndStub = sinon.stub(Math, 'random').returns(0); + + const fields = { a: 1 }; + const session = makeSession( + 's', + new Map([['c', makeCollectionView({ d: makeDocView(fields, ['Ux']) })]]) + ); + Meteor.server = makeServer([session], { c: STRATEGIES.SERVER_MERGE }); + + expect(() => c._collect()).to.not.throw(); + rndStub.restore(); + + const rows = emittedRows(); + expect(rows).to.have.lengthOf(1); + expect(rows[0].sampleRate).to.equal(0.5); + }); + + it('skips sessions when the sampler excludes them', function () { + const c = new MergeboxCollector({ + client: mockClient, + host: 'test-host', + sampleRate: 0.5 + }); + // Math.random() > sampleRate -> session is sampled out. + const rndStub = sinon.stub(Math, 'random').returns(0.99); + + const session = makeSession( + 's', + new Map([['c', makeCollectionView({ d: makeDocView({ a: 1 }, ['Ux']) })]]) + ); + Meteor.server = makeServer([session], { c: STRATEGIES.SERVER_MERGE }); + + c._collect(); + rndStub.restore(); + + expect(mockClient.addMergeboxMetric.called).to.be.false; + }); + }); + + // ========================================== + // (f) feature-detect malformed sessions + // ========================================== + describe('feature detection', function () { + it('skips a session missing collectionViews without throwing', function () { + const goodFields = { a: 1 }; + const good = makeSession( + 'good', + new Map([['c', makeCollectionView({ d: makeDocView(goodFields, ['Ux']) })]]) + ); + const broken = { id: 'broken' }; // no collectionViews at all + + Meteor.server = makeServer([broken, good], { c: STRATEGIES.SERVER_MERGE }); + + expect(() => collector._collect()).to.not.throw(); + + // The good session still produces its row. + const rows = emittedRows(); + expect(rows).to.have.lengthOf(1); + expect(rows[0].bytesHeld).to.equal(docBytesOf(goodFields)); + }); + + it('degrades to zero rows when Meteor.server.sessions is not a Map', function () { + Meteor.server = { sessions: null, getPublicationStrategy() { return STRATEGIES.SERVER_MERGE; } }; + expect(() => collector._collect()).to.not.throw(); + expect(mockClient.addMergeboxMetric.called).to.be.false; + }); + }); + + // ========================================== + // (g) strategy reverse-map + // ========================================== + describe('_resolveStrategyName', function () { + it('identity-maps NO_MERGE object -> "NO_MERGE" via DDPServer table', function () { + global.DDPServer = { publicationStrategies: STRATEGIES }; + const server = { + getPublicationStrategy() { return STRATEGIES.NO_MERGE; } + }; + expect(collector._resolveStrategyName(server, 'c')).to.equal('NO_MERGE'); + }); + + it('identity-maps SERVER_MERGE and NO_MERGE_NO_HISTORY', function () { + global.DDPServer = { publicationStrategies: STRATEGIES }; + const sm = { getPublicationStrategy() { return STRATEGIES.SERVER_MERGE; } }; + const nh = { getPublicationStrategy() { return STRATEGIES.NO_MERGE_NO_HISTORY; } }; + expect(collector._resolveStrategyName(sm, 'c')).to.equal('SERVER_MERGE'); + expect(collector._resolveStrategyName(nh, 'c')).to.equal('NO_MERGE_NO_HISTORY'); + }); + + it('maps NO_MERGE_MULTI -> "unknown"', function () { + global.DDPServer = { publicationStrategies: STRATEGIES }; + const server = { getPublicationStrategy() { return STRATEGIES.NO_MERGE_MULTI; } }; + expect(collector._resolveStrategyName(server, 'c')).to.equal('unknown'); + }); + + it('maps an unrecognized object -> "unknown"', function () { + global.DDPServer = { publicationStrategies: STRATEGIES }; + const server = { getPublicationStrategy() { return { foo: true }; } }; + expect(collector._resolveStrategyName(server, 'c')).to.equal('unknown'); + }); + + it('returns "unknown" when getPublicationStrategy throws', function () { + const server = { getPublicationStrategy() { throw new Error('boom'); } }; + expect(collector._resolveStrategyName(server, 'c')).to.equal('unknown'); + }); + + it('falls back to structural matching when DDPServer global is absent', function () { + // No global.DDPServer -> structural fallback by shape. + const server = { getPublicationStrategy() { return { ...STRATEGIES.NO_MERGE }; } }; + expect(collector._resolveStrategyName(server, 'c')).to.equal('NO_MERGE'); + }); + }); + + // ========================================== + // _resolvePublicationName + // ========================================== + describe('_resolvePublicationName', function () { + it("resolves 'N'+subId via session._namedSubs._name", function () { + const session = makeSession('s', new Map(), new Map([['abc', { _name: 'myPub' }]])); + expect(collector._resolvePublicationName(session, 'Nabc')).to.equal('myPub'); + }); + + it("returns null for 'U' (universal) handles", function () { + const session = makeSession('s', new Map(), new Map()); + expect(collector._resolvePublicationName(session, 'Uxyz')).to.be.null; + }); + + it('returns null when the named sub is not found', function () { + const session = makeSession('s', new Map(), new Map()); + expect(collector._resolvePublicationName(session, 'Nmissing')).to.be.null; + }); + }); + + // ========================================== + // DummyDocumentView (NO_MERGE_MULTI): docCount>0, bytesHeld~0 + // ========================================== + describe('DummyDocumentView (empty dataByKey)', function () { + it('counts the doc but holds ~0 bytes and maps strategy to "unknown"', function () { + // DummyDocumentView has existsIn but no dataByKey field values. + const dummy = { existsIn: new Set(['Ux']) }; // no dataByKey + const session = makeSession( + 's', + new Map([['c', makeCollectionView({ d: dummy })]]) + ); + global.DDPServer = { publicationStrategies: STRATEGIES }; + Meteor.server = makeServer([session], { c: STRATEGIES.NO_MERGE_MULTI }); + + collector._collect(); + + const rows = emittedRows(); + expect(rows).to.have.lengthOf(1); + expect(rows[0].docCount).to.equal(1); + expect(rows[0].bytesHeld).to.equal(0); + expect(rows[0].strategy).to.equal('unknown'); + }); + }); + + // ========================================== + // row shape + avgBytesPerConnection + window + // ========================================== + describe('row shape', function () { + it('stamps window bounds, timestamp, host and avgBytesPerConnection', function () { + const fields = { a: 'hello' }; + const session = makeSession( + 's', + new Map([['c', makeCollectionView({ d: makeDocView(fields, ['Ux']) })]]) + ); + Meteor.server = makeServer([session], { c: STRATEGIES.SERVER_MERGE }); + + collector._collect(); + const row = emittedRows()[0]; + + expect(row.host).to.equal('test-host'); + expect(row.appVersion).to.equal('1.0.0'); + expect(row.timestamp).to.be.instanceOf(Date); + expect(row.windowStart).to.be.instanceOf(Date); + expect(row.windowEnd).to.be.instanceOf(Date); + expect(row.connectionCount).to.equal(1); + expect(row.avgBytesPerConnection).to.equal(row.bytesHeld); + expect(row.fieldCount).to.equal(1); + }); + }); + + // ========================================== + // maxRows cap + // ========================================== + describe('maxRows cap', function () { + it('keeps only the top-N rows by bytesHeld', function () { + const c = new MergeboxCollector({ client: mockClient, host: 'h', maxRows: 2 }); + + // Three collections with distinct, increasing byte sizes -> 3 buckets. + const collectionViews = new Map([ + ['small', makeCollectionView({ d: makeDocView({ a: 'x' }, ['Ux']) })], + ['medium', makeCollectionView({ d: makeDocView({ a: 'xxxxxxxxxx' }, ['Ux']) })], + ['large', makeCollectionView({ d: makeDocView({ a: 'x'.repeat(200) }, ['Ux']) })] + ]); + const session = makeSession('s', collectionViews); + Meteor.server = makeServer([session], { + small: STRATEGIES.SERVER_MERGE, + medium: STRATEGIES.SERVER_MERGE, + large: STRATEGIES.SERVER_MERGE + }); + + c._collect(); + const rows = emittedRows(); + expect(rows).to.have.lengthOf(2); + const names = rows.map(r => r.collectionName); + expect(names).to.include('large'); + expect(names).to.include('medium'); + expect(names).to.not.include('small'); + }); + }); + + // ========================================== + // start / stop + // ========================================== + describe('start / stop', function () { + it('start sets interval; idempotent; stop clears it', function () { + Meteor.server = makeServer([]); + collector.start(); + expect(collector.intervalId).to.not.be.null; + const id = collector.intervalId; + collector.start(); + expect(collector.intervalId).to.equal(id); + collector.stop(); + expect(collector.intervalId).to.be.null; + }); + }); +}); From 6e837541285032a27bef3ae33738e459bdc33e85 Mon Sep 17 00:00:00 2001 From: Michael Pfeiffer Date: Fri, 19 Jun 2026 15:44:44 -0500 Subject: [PATCH 2/4] feat: emit NO_MERGE_MULTI as a first-class strategy The collector was mapping NO_MERGE_MULTI to "unknown", which conflated a known Meteor strategy with "couldn't read the strategy". Resolve all four publication strategies: add the NO_MERGE_MULTI identity match and the structural shape (useCollectionView + doAccountingForCollection + useDummyDocumentView). "unknown" is now reserved for a genuinely unreadable/unrecognized strategy. Tests updated. --- lib/collectors/MergeboxCollector.js | 28 +++++++++++-------- .../unit/collectors/MergeboxCollector.test.js | 11 +++++--- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/lib/collectors/MergeboxCollector.js b/lib/collectors/MergeboxCollector.js index 5a0ca12..d3ba488 100644 --- a/lib/collectors/MergeboxCollector.js +++ b/lib/collectors/MergeboxCollector.js @@ -36,11 +36,12 @@ import { estimateObjectSize } from "../sizeEstimator.js"; * STRATEGY: * Read via Meteor.server.getPublicationStrategy(collectionName) — an OBJECT * { useCollectionView, doAccountingForCollection, useDummyDocumentView }, NOT a - * string. We reverse-map by identity against DDPServer.publicationStrategies - * (SERVER_MERGE / NO_MERGE / NO_MERGE_NO_HISTORY). NO_MERGE / NO_MERGE_NO_HISTORY - * keep NO collectionView entry at all (residency ~0); that absence is the whole - * signal and is never synthesized. NO_MERGE_MULTI uses DummyDocumentView (empty - * dataByKey) and maps to "unknown". Anything unrecognized / any throw -> "unknown". + * string. We reverse-map by identity against DDPServer.publicationStrategies to + * all four Meteor strategies (SERVER_MERGE / NO_MERGE / NO_MERGE_NO_HISTORY / + * NO_MERGE_MULTI). NO_MERGE / NO_MERGE_NO_HISTORY keep NO collectionView entry at + * all (residency ~0); that absence is the whole signal and is never synthesized. + * NO_MERGE_MULTI uses DummyDocumentView (empty dataByKey) — doc counts with ~0 + * field bytes. Only a genuinely unrecognized strategy / any throw -> "unknown". * * READ-ONLY: this collector never wraps session.send / processMessage (DDPCollector * already chains those; double-wrapping risks the stack-overflow regression from @@ -358,8 +359,9 @@ export default class MergeboxCollector { /** * Resolve the schema's strategy enum for a collection by reverse-mapping the * getPublicationStrategy() object against DDPServer.publicationStrategies by - * identity. Returns one of SERVER_MERGE | NO_MERGE | NO_MERGE_NO_HISTORY | - * unknown. NO_MERGE_MULTI (and anything unrecognized, or any throw) -> "unknown". + * identity. Returns one of the four Meteor strategies — SERVER_MERGE | + * NO_MERGE | NO_MERGE_NO_HISTORY | NO_MERGE_MULTI — or "unknown" only when the + * strategy genuinely can't be read (no getPublicationStrategy, or any throw). * @private */ _resolveStrategyName(server, collectionName) { @@ -379,11 +381,12 @@ export default class MergeboxCollector { return this._structuralStrategyName(strategyObj); } - // Identity reverse-map (only the three the schema accepts). + // Identity reverse-map across all four Meteor publication strategies. if (strategyObj === table.SERVER_MERGE) return "SERVER_MERGE"; if (strategyObj === table.NO_MERGE) return "NO_MERGE"; if (strategyObj === table.NO_MERGE_NO_HISTORY) return "NO_MERGE_NO_HISTORY"; - // NO_MERGE_MULTI and anything else are intentionally "unknown". + if (strategyObj === table.NO_MERGE_MULTI) return "NO_MERGE_MULTI"; + // Only a genuinely unrecognized (e.g. future) strategy stays "unknown". return "unknown"; } catch (_err) { return "unknown"; @@ -393,7 +396,7 @@ export default class MergeboxCollector { /** * Structural fallback when DDPServer.publicationStrategies isn't reachable. * Maps the { useCollectionView, doAccountingForCollection, useDummyDocumentView } - * shape to the schema enum. Only the three the schema accepts are returned. + * shape to the schema enum (all four Meteor strategies). * @private */ _structuralStrategyName(s) { @@ -403,11 +406,14 @@ export default class MergeboxCollector { // SERVER_MERGE: collectionView + accounting + real document view if (cv && acct && !dummy) return "SERVER_MERGE"; + // NO_MERGE_MULTI: collectionView + accounting + DUMMY document view + // (multi-publication safe, but stores no field values) + if (cv && acct && dummy) return "NO_MERGE_MULTI"; // NO_MERGE: no collectionView, but still accounts ids if (!cv && acct && !dummy) return "NO_MERGE"; // NO_MERGE_NO_HISTORY: no collectionView, no accounting if (!cv && !acct && !dummy) return "NO_MERGE_NO_HISTORY"; - // NO_MERGE_MULTI (dummy view) and anything else -> unknown + // Anything else (e.g. a future strategy shape) -> unknown return "unknown"; } diff --git a/tests/unit/collectors/MergeboxCollector.test.js b/tests/unit/collectors/MergeboxCollector.test.js index b987efb..f912b59 100644 --- a/tests/unit/collectors/MergeboxCollector.test.js +++ b/tests/unit/collectors/MergeboxCollector.test.js @@ -413,10 +413,10 @@ describe('MergeboxCollector', function () { expect(collector._resolveStrategyName(nh, 'c')).to.equal('NO_MERGE_NO_HISTORY'); }); - it('maps NO_MERGE_MULTI -> "unknown"', function () { + it('identity-maps NO_MERGE_MULTI', function () { global.DDPServer = { publicationStrategies: STRATEGIES }; const server = { getPublicationStrategy() { return STRATEGIES.NO_MERGE_MULTI; } }; - expect(collector._resolveStrategyName(server, 'c')).to.equal('unknown'); + expect(collector._resolveStrategyName(server, 'c')).to.equal('NO_MERGE_MULTI'); }); it('maps an unrecognized object -> "unknown"', function () { @@ -434,6 +434,9 @@ describe('MergeboxCollector', function () { // No global.DDPServer -> structural fallback by shape. const server = { getPublicationStrategy() { return { ...STRATEGIES.NO_MERGE }; } }; expect(collector._resolveStrategyName(server, 'c')).to.equal('NO_MERGE'); + // a dummy-document-view shape resolves structurally to NO_MERGE_MULTI + const multi = { getPublicationStrategy() { return { ...STRATEGIES.NO_MERGE_MULTI }; } }; + expect(collector._resolveStrategyName(multi, 'c')).to.equal('NO_MERGE_MULTI'); }); }); @@ -461,7 +464,7 @@ describe('MergeboxCollector', function () { // DummyDocumentView (NO_MERGE_MULTI): docCount>0, bytesHeld~0 // ========================================== describe('DummyDocumentView (empty dataByKey)', function () { - it('counts the doc but holds ~0 bytes and maps strategy to "unknown"', function () { + it('counts the doc but holds ~0 bytes and maps strategy to NO_MERGE_MULTI', function () { // DummyDocumentView has existsIn but no dataByKey field values. const dummy = { existsIn: new Set(['Ux']) }; // no dataByKey const session = makeSession( @@ -477,7 +480,7 @@ describe('MergeboxCollector', function () { expect(rows).to.have.lengthOf(1); expect(rows[0].docCount).to.equal(1); expect(rows[0].bytesHeld).to.equal(0); - expect(rows[0].strategy).to.equal('unknown'); + expect(rows[0].strategy).to.equal('NO_MERGE_MULTI'); }); }); From 84f8eb1de523229a356840b06a23edae9fc1b78d Mon Sep 17 00:00:00 2001 From: Michael Pfeiffer Date: Fri, 19 Jun 2026 15:50:57 -0500 Subject: [PATCH 3/4] docs: document the mergebox agent configuration options The README config reference omitted the new collector. Add collectMergebox to the environment-variable and feature-flag tables (flagged opt-in / default false), and a dedicated "Mergebox Residency (opt-in)" section with enable snippets and a tuning table for mergeboxInterval / mergeboxSampleRate / mergeboxMaxSessions / mergeboxMaxDocsPerSession (and their SKYSIGNAL_* env vars). Also corrects the v1.1.0 CHANGELOG strategy list to include NO_MERGE_MULTI. --- CHANGELOG.md | 2 +- README.md | 27 +++++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1951680..bce2cd6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ ### v1.1.0 (Mergebox RAM Residency Collector) -- **New `MergeboxCollector`** - Measures Meteor's MERGEBOX RAM residency (the per-session, server-side cache of published documents) and posts per-(publication, collection) rollups to `POST /api/v1/metrics/mergebox`. The collector walks `Meteor.server.sessions` read-only, estimates the resident bytes each session's mergebox holds per published collection (sizing each `SessionDocumentView.dataByKey` field value directly), reads the publication strategy via `Meteor.server.getPublicationStrategy()` (reverse-mapped to `SERVER_MERGE` / `NO_MERGE` / `NO_MERGE_NO_HISTORY` / `unknown`), and attributes residency to subscriptions via a pure even-split across `existsIn`. The even-split is sum-preserving: the rows for a collection sum back to that collection's true residency. `connectionCount` is a count of distinct DDP sessions (never a list of connection ids). +- **New `MergeboxCollector`** - Measures Meteor's MERGEBOX RAM residency (the per-session, server-side cache of published documents) and posts per-(publication, collection) rollups to `POST /api/v1/metrics/mergebox`. The collector walks `Meteor.server.sessions` read-only, estimates the resident bytes each session's mergebox holds per published collection (sizing each `SessionDocumentView.dataByKey` field value directly), reads the publication strategy via `Meteor.server.getPublicationStrategy()` (reverse-mapped to all four Meteor strategies — `SERVER_MERGE` / `NO_MERGE` / `NO_MERGE_NO_HISTORY` / `NO_MERGE_MULTI`; `unknown` only when the strategy genuinely can't be read), and attributes residency to subscriptions via a pure even-split across `existsIn`. The even-split is sum-preserving: the rows for a collection sum back to that collection's true residency. `connectionCount` is a count of distinct DDP sessions (never a list of connection ids). - **Ships dark / opt-in** - `collectMergebox` defaults to **false**. Enable via `SkySignalAgent.configure({ collectMergebox: true })` or `SKYSIGNAL_COLLECT_MERGEBOX=true`. - **Performance-bounded** - 60s default cadence (`mergeboxInterval`), per-session sampling (`mergeboxSampleRate`, every row stamps the rate for server-side extrapolation), `mergeboxMaxSessions` / `mergeboxMaxDocsPerSession` caps to bound a single synchronous tick, per-session try/catch, feature-detection of internal shapes, and a top-N output cap aligned with the server's 500-rows-per-POST limit. Read-only — never wraps `session.send` / `processMessage`. - Requires platform-side support for the mergebox ingest endpoint (gated on the `ddp` feature / pro tier). diff --git a/README.md b/README.md index ed35872..d3b31d1 100644 --- a/README.md +++ b/README.md @@ -134,6 +134,7 @@ Every server-side configuration option has a corresponding environment variable. | `SKYSIGNAL_COLLECT_OUTBOUND_HTTP` | `collectOutboundHttp` | Boolean | | `SKYSIGNAL_COLLECT_CPU_PROFILES` | `collectCpuProfiles` | Boolean | | `SKYSIGNAL_COLLECT_LIVE_QUERIES` | `collectLiveQueries` | Boolean | +| `SKYSIGNAL_COLLECT_MERGEBOX` | `collectMergebox` | Boolean | | `SKYSIGNAL_COLLECT_PUBLICATIONS` | `collectPublications` | Boolean | | `SKYSIGNAL_COLLECT_ENVIRONMENT` | `collectEnvironment` | Boolean | | `SKYSIGNAL_COLLECT_VULNERABILITIES` | `collectVulnerabilities` | Boolean | @@ -216,8 +217,34 @@ All collection interval and performance options also have corresponding `SKYSIGN | `collectPublications` | Boolean | `true` | Detect publication over-fetching and missing projections | | `collectEnvironment` | Boolean | `true` | Capture environment metadata (packages, flags, OS info) | | `collectVulnerabilities` | Boolean | `true` | Run `npm audit` scans and report high/critical CVEs | +| `collectMergebox` | Boolean | `false` | **Opt-in.** Attribute server-side mergebox RAM residency to publications/collections. Off by default; see [Mergebox Residency](#mergebox-residency-opt-in) for details and tuning | | `ingestAggregation` | Boolean | `true` | Roll up live query / subscription telemetry into fixed-shape aggregates before shipping. Reduces server ingest row counts 10-100× on high-volume apps. Requires a platform version that supports the aggregate ingest endpoints (v1.0.30+). Set to `false` to post per-observer / per-subscription records instead. | +### Mergebox Residency (opt-in) + +`collectMergebox` is **disabled by default** — it is the one collector you must explicitly enable. When on, the agent takes a low-frequency, read-only snapshot of each DDP session's mergebox (Meteor's per-connection, server-side document cache) and reports estimated RAM residency per publication and collection, along with the active publication strategy (`SERVER_MERGE` / `NO_MERGE` / `NO_MERGE_NO_HISTORY` / `NO_MERGE_MULTI`). + +Enable it via settings or environment variable: + +```json +{ "skysignal": { "collectMergebox": true } } +``` + +```bash +SKYSIGNAL_COLLECT_MERGEBOX=true +``` + +Snapshot cost scales with sessions × collections × documents, so the collector is bounded by these safety knobs. Each also has a `SKYSIGNAL_*` environment variable (`SKYSIGNAL_MERGEBOX_INTERVAL`, `SKYSIGNAL_MERGEBOX_SAMPLE_RATE`, `SKYSIGNAL_MERGEBOX_MAX_SESSIONS`, `SKYSIGNAL_MERGEBOX_MAX_DOCS_PER_SESSION`): + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `mergeboxInterval` | Number | `60000` | Residency snapshot interval in ms (1 minute) | +| `mergeboxSampleRate` | Number | `1.0` | Fraction of DDP sessions sampled per tick (0-1). Lower on very large fleets; stamped on each row so the server extrapolates | +| `mergeboxMaxSessions` | Number | `2000` | Max sessions walked per snapshot tick | +| `mergeboxMaxDocsPerSession` | Number | `50000` | Max documents inspected per session per tick | + +Requires a SkySignal plan with the DDP feature and a platform version that exposes the mergebox ingest endpoint. + ### Agent-Side Aggregation When `ingestAggregation` is enabled (the default), the `LiveQueriesCollector` and `DDPCollector` pre-aggregate telemetry on the agent into two compact payload shapes (one per live query signature, one per publication + params signature) and POST them to: From 28987d67b0d6647dc10860c63458d42511aa3553 Mon Sep 17 00:00:00 2001 From: Michael Pfeiffer Date: Fri, 19 Jun 2026 16:00:55 -0500 Subject: [PATCH 4/4] =?UTF-8?q?fix(mergebox):=20address=20PR=20review=20?= =?UTF-8?q?=E2=80=94=20exact=20byte=20split,=20doc=20cap,=20buildHash?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Byte attribution now uses an integer largest-remainder split per doc (the first docBytes % n handles get +1 byte) instead of float docBytes/n with per-bucket rounding. Per-handle shares sum back to docBytes exactly, so per-collection residency is now EXACTLY sum-preserving for any byte total (previously off by rounding when not divisible). - maxDocsPerSession is enforced with for...of + break over collectionViews / documents.values() instead of Map#forEach (where return doesn't stop iteration), so the cap actually bounds the walk on large sessions. - Emit buildHash on rows when configured (omitted otherwise), mirroring other collectors so residency correlates to a deployed build; documented in the addMergeboxMetric JSDoc (which also now lists NO_MERGE_MULTI). Tests: exact 3-way non-divisible split, buildHash present/omitted, and the maxDocsPerSession cap stopping the walk. --- lib/SkySignalClient.js | 3 +- lib/collectors/MergeboxCollector.js | 60 ++++++++----- .../unit/collectors/MergeboxCollector.test.js | 88 +++++++++++++++++++ 3 files changed, 129 insertions(+), 22 deletions(-) diff --git a/lib/SkySignalClient.js b/lib/SkySignalClient.js index d7630ea..394d485 100644 --- a/lib/SkySignalClient.js +++ b/lib/SkySignalClient.js @@ -452,11 +452,12 @@ export default class SkySignalClient { * @param {Object} metric - Mergebox residency rollup row * @param {string} metric.collectionName - Published collection name * @param {string} [metric.publicationName] - Publication name (omitted for auto-publish) - * @param {string} metric.strategy - SERVER_MERGE | NO_MERGE | NO_MERGE_NO_HISTORY | unknown + * @param {string} metric.strategy - SERVER_MERGE | NO_MERGE | NO_MERGE_NO_HISTORY | NO_MERGE_MULTI | unknown * @param {number} metric.bytesHeld - Estimated mergebox residency bytes for this group * @param {number} metric.docCount - Documents resident for this group * @param {number} metric.connectionCount - Distinct DDP connections holding this group * @param {number} metric.sampleRate - Per-session sample rate used by the collector + * @param {string} [metric.buildHash] - Deployed build hash (omitted when unresolved) * @returns {void} */ addMergeboxMetric(metric) { diff --git a/lib/collectors/MergeboxCollector.js b/lib/collectors/MergeboxCollector.js index d3ba488..0c0dc2c 100644 --- a/lib/collectors/MergeboxCollector.js +++ b/lib/collectors/MergeboxCollector.js @@ -28,10 +28,12 @@ import { estimateObjectSize } from "../sizeEstimator.js"; * collection with no discriminator (getByCollection / getStrategyBreakdown), so * emitting a separate per-collection total alongside even-split rows would * double-count. We emit ONLY even-split per-(publicationName, collectionName) rows. - * For a doc referenced by n handles, each referencing handle's bucket receives - * docBytes/n and 1/n docCount. Because the split divides by existsIn.size, the - * even-split rows for a collection SUM BACK to that collection's true residency - * (sum-preserving). That property is the load-bearing correctness invariant. + * For a doc referenced by n handles, its bytes are divided across the handles via + * an INTEGER largest-remainder split (the first `docBytes % n` handles get +1 + * byte) so the per-handle shares sum back to docBytes EXACTLY — no per-bucket + * rounding drift. docCount is a 1/n fractional estimate. Because the byte split + * sums to docBytes, the rows for a collection SUM BACK to its true residency + * (exactly sum-preserving). That property is the load-bearing correctness invariant. * * STRATEGY: * Read via Meteor.server.getPublicationStrategy(collectionName) — an OBJECT @@ -198,19 +200,20 @@ export default class MergeboxCollector { let docsWalked = 0; - collectionViews.forEach((cview, collectionName) => { + // for...of (not Map#forEach) so the maxDocsPerSession cap can actually + // `break` — a `return` inside forEach keeps iterating every remaining entry, + // defeating the bound on large sessions/collections. + for (const [collectionName, cview] of collectionViews) { + if (docsWalked >= this.maxDocsPerSession) break; // stop scanning further collections const documents = cview && cview.documents; - if (!documents || typeof documents.forEach !== "function") { - return; // shape mismatch — skip this collection view + if (!documents || typeof documents.values !== "function") { + continue; // shape mismatch — skip this collection view } const strategy = this._resolveStrategyName(server, collectionName); - documents.forEach((docView) => { - if (docsWalked >= this.maxDocsPerSession) { - // Bound the tick; remaining docs in this session are skipped. - return; - } + for (const docView of documents.values()) { + if (docsWalked >= this.maxDocsPerSession) break; // tick bound reached docsWalked++; // DummyDocumentView (NO_MERGE_MULTI) has no dataByKey -> 0 bytes but @@ -236,7 +239,7 @@ export default class MergeboxCollector { } // existsIn = Set(subscriptionHandle). The doc is referenced by these - // subs; even-split docBytes/docCount across them. + // subs; split docBytes across them. const existsIn = docView && docView.existsIn; const handles = existsIn && typeof existsIn.forEach === "function" @@ -245,15 +248,24 @@ export default class MergeboxCollector { const n = handles.length; if (n === 0) { // removed() deletes such docs; guard anyway against torn state. - return; + continue; } - const bytesShare = docBytes / n; + // Integer largest-remainder split of the (rounded) byte total: the + // first `remBytes` handles get +1 byte so the per-handle shares sum + // back to docBytes EXACTLY. Accumulating integers (no per-bucket + // rounding drift) keeps per-collection residency exactly sum-preserving. + const docBytesInt = Math.round(docBytes); + const baseBytes = Math.floor(docBytesInt / n); + const remBytes = docBytesInt - baseBytes * n; + // docCount/fieldCount remain fractional even-split estimates (a shared + // doc isn't a whole doc for any one pub); they're rounded at emit time. const docShare = 1 / n; const fieldShare = fieldCount / n; - for (const handle of handles) { + handles.forEach((handle, i) => { const publicationName = this._resolvePublicationName(session, handle); + const bytesShare = baseBytes + (i < remBytes ? 1 : 0); this._addToBucket(buckets, { publicationName, collectionName, @@ -263,9 +275,9 @@ export default class MergeboxCollector { fieldShare, session }); - } - }); - }); + }); + } + } } /** @@ -309,8 +321,9 @@ export default class MergeboxCollector { } /** - * Materialize bucket map -> emit rows. Rounds fractional even-split values at - * emit time, computes connectionCount + avgBytesPerConnection, and caps to the + * Materialize bucket map -> emit rows. bytesHeld is already an exact integer + * sum (largest-remainder split); docCount/fieldCount fractional estimates are + * rounded here. Computes connectionCount + avgBytesPerConnection and caps to the * top-N rows by bytesHeld (aligns with the server's 500/POST limit). * @private */ @@ -340,6 +353,11 @@ export default class MergeboxCollector { if (b.publicationName) { row.publicationName = b.publicationName; } + // buildHash correlates residency to a specific deployed build (omitted + // when the agent couldn't resolve one), mirroring other collectors. + if (this.buildHash) { + row.buildHash = this.buildHash; + } if (connectionCount > 0) { row.avgBytesPerConnection = Math.round(bytesHeld / connectionCount); } diff --git a/tests/unit/collectors/MergeboxCollector.test.js b/tests/unit/collectors/MergeboxCollector.test.js index f912b59..2e462e8 100644 --- a/tests/unit/collectors/MergeboxCollector.test.js +++ b/tests/unit/collectors/MergeboxCollector.test.js @@ -194,6 +194,36 @@ describe('MergeboxCollector', function () { expect(rowB.collectionName).to.equal('messages'); }); + it('splits across 3 subs with an exact integer remainder (non-divisible bytes)', function () { + const fields = { a: 'alpha', b: 'bravo', c: 'charlie payload here' }; + const handles = ['Nsub1', 'Nsub2', 'Nsub3']; + const docView = makeDocView(fields, handles); + const namedSubs = new Map([ + ['sub1', { _name: 'pubA' }], + ['sub2', { _name: 'pubB' }], + ['sub3', { _name: 'pubC' }] + ]); + const session = makeSession( + 's', + new Map([['messages', makeCollectionView({ d: docView })]]), + namedSubs + ); + Meteor.server = makeServer([session], { messages: STRATEGIES.SERVER_MERGE }); + + collector._collect(); + + const rows = emittedRows(); + expect(rows).to.have.lengthOf(3); + + const fullBytes = docBytesOf(fields); + const sum = rows.reduce((acc, r) => acc + r.bytesHeld, 0); + // EXACTLY sum-preserving whether or not fullBytes divides evenly by 3. + expect(sum).to.equal(fullBytes); + // Largest-remainder split -> per-handle shares differ by at most 1 byte. + const vals = rows.map(r => r.bytesHeld).sort((x, y) => x - y); + expect(vals[vals.length - 1] - vals[0]).to.be.at.most(1); + }); + it('does not emit a separate per-collection truth row (avoids double-count)', function () { const fields = { text: 'x' }; const handles = ['Nsub1', 'Nsub2']; @@ -510,6 +540,38 @@ describe('MergeboxCollector', function () { }); }); + // ========================================== + // buildHash + // ========================================== + describe('buildHash', function () { + it('omits buildHash when not configured', function () { + const session = makeSession( + 's', + new Map([['c', makeCollectionView({ d: makeDocView({ a: 'x' }, ['Ux']) })]]) + ); + Meteor.server = makeServer([session]); + collector._collect(); + expect(emittedRows()[0]).to.not.have.property('buildHash'); + }); + + it('stamps buildHash on rows when configured', function () { + const stub = sinon.stub(); + const c = new MergeboxCollector({ + client: { addMergeboxMetric: stub }, + host: 'h', + buildHash: 'abc123' + }); + const session = makeSession( + 's', + new Map([['c', makeCollectionView({ d: makeDocView({ a: 'x' }, ['Ux']) })]]) + ); + Meteor.server = makeServer([session]); + c._collect(); + const row = stub.getCalls().map(x => x.args[0])[0]; + expect(row.buildHash).to.equal('abc123'); + }); + }); + // ========================================== // maxRows cap // ========================================== @@ -540,6 +602,32 @@ describe('MergeboxCollector', function () { }); }); + // ========================================== + // maxDocsPerSession cap + // ========================================== + describe('maxDocsPerSession cap', function () { + it('stops walking a session once the cap is reached (for...of break)', function () { + const stub = sinon.stub(); + const c = new MergeboxCollector({ + client: { addMergeboxMetric: stub }, + host: 'h', + maxDocsPerSession: 2 + }); + const docs = {}; + const namedSubs = new Map(); + for (let i = 0; i < 5; i++) { + docs[`d${i}`] = makeDocView({ v: `value-${i}` }, [`Nsub${i}`]); + namedSubs.set(`sub${i}`, { _name: `pub${i}` }); + } + const session = makeSession('s', new Map([['c', makeCollectionView(docs)]]), namedSubs); + Meteor.server = makeServer([session]); + c._collect(); + const rows = stub.getCalls().map(x => x.args[0]); + // cap = 2 -> only the first 2 docs walked -> 2 distinct-pub rows, not 5 + expect(rows).to.have.lengthOf(2); + }); + }); + // ========================================== // start / stop // ==========================================