Span Buffer Multiprocess Enhancement with Health Monitoring#15
Span Buffer Multiprocess Enhancement with Health Monitoring#15ShashankFC wants to merge 1 commit into
Conversation
|
bugbot run |
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 3 potential issues.
Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.
| if isinstance(self.process, multiprocessing.Process): | ||
| self.process.terminate() | ||
| if isinstance(process, multiprocessing.Process): | ||
| process.terminate() |
There was a problem hiding this comment.
Processes not terminated when join deadline expires
High Severity
When the deadline expires during join, the break statement exits the loop, causing remaining processes to never be terminated. The old single-process code always called terminate() regardless of deadline expiry. Now, if the deadline runs out while waiting for an early process, later processes in the dict are completely skipped — no wait and no terminate() call — leaving them running as orphaned daemon processes.
| for process_index, shards in self.process_to_shards_map.items(): | ||
| if shard in shards: | ||
| self._create_process_for_shards(process_index, shards) | ||
| break |
There was a problem hiding this comment.
Unused method _create_process_for_shard is dead code
Low Severity
The method _create_process_for_shard (singular) is defined but never called anywhere in the codebase. All actual callers use _create_process_for_shards (plural) instead. This appears to be leftover code that adds confusion since its name is nearly identical to the actively-used method.
| produce(kafka_payload) | ||
|
|
||
| with metrics.timer("spans.buffer.flusher.wait_produce"): | ||
| with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}): |
There was a problem hiding this comment.
Inconsistent metrics tag key name for shard
Low Severity
The spans.buffer.flusher.wait_produce timer uses tags={"shards": shard_tag} (plural), while the sibling spans.buffer.flusher.produce timer and spans.buffer.segment_size_bytes timing both use tags={"shard": shard_tag} (singular). This inconsistency means the wait_produce metric can't be correlated with the other flusher metrics using the same tag filter in dashboards/queries.


Test 6
Summary by CodeRabbit
New Features
Documentation
Tests
✏️ Tip: You can customize this high-level summary in your review settings.
Replicated from ai-code-review-evaluation/sentry-coderabbit#6
Note
Medium Risk
Touches the span ingestion/flushing pipeline and changes process/thread lifecycle, health monitoring, and backpressure behavior, which could impact throughput or consumer stability if misconfigured.
Overview
Updates the
process-spansconsumer to support a configurable number of span flusher workers via a new--flusher-processesoption, and wires this throughProcessSpansStrategyFactoryintoSpanFlusher.Refactors
SpanFlusherfrom a single background worker into a manager that spawns multiple workers and distributes assigned shards across them, tracking per-processhealthy_since, backpressure, restarts, and adding shard-tagged metrics.Tests are updated/added to cover the new multi-worker behavior (including a limit test) and to make the basic consumer test deterministic; docs add a small type-checking example for union types in
CLAUDE.md.Written by Cursor Bugbot for commit 9bf83b0. Configure here.