Span Buffer Multiprocess Enhancement with Health Monitoring #5
Pull Request Overview
This PR enhances the span buffer flusher from a single-threaded model to a multiprocess architecture with health monitoring capabilities. The changes enable the flusher to distribute work across multiple processes based on assigned shards while maintaining proper health checks and resource monitoring.
- Adds multiprocess support to the span flusher with configurable process limits
- Implements per-process health monitoring and backpressure tracking
- Adds a new command-line option for controlling flusher process count
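The shard distribution described above could be sketched as follows. This is a minimal illustration, not the PR's implementation: the function name `assign_shards` and the round-robin strategy are assumptions, and only the name `process_to_shards_map` comes from the reviewed code.

```python
from collections import defaultdict
from typing import Optional


def assign_shards(shards: list[int], max_processes: Optional[int] = None) -> dict[int, list[int]]:
    """Hypothetical helper: spread shards round-robin over at most max_processes processes."""
    if not shards:
        return {}
    # Never start more processes than there are shards to work on.
    num_processes = len(shards) if max_processes is None else min(max_processes, len(shards))
    if num_processes < 1:
        raise ValueError("max_processes must be at least 1")
    process_to_shards_map: dict[int, list[int]] = defaultdict(list)
    for position, shard in enumerate(shards):
        process_to_shards_map[position % num_processes].append(shard)
    return dict(process_to_shards_map)
```

With four shards and a limit of two processes, each process would own two shards; with a limit above the shard count, each shard gets its own process.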
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| src/sentry/spans/consumers/process/flusher.py | Major refactor to support multiprocess architecture with shard distribution and per-process health monitoring |
| src/sentry/spans/consumers/process/factory.py | Adds flusher_processes parameter to control maximum flusher processes |
| src/sentry/consumers/__init__.py | Adds CLI option for configuring flusher processes |
| tests/sentry/spans/consumers/process/test_flusher.py | Updates test assertion to work with new multiprocess structure |
| tests/sentry/spans/consumers/process/test_consumer.py | Adds test coverage for process limiting and improves test timing |
| CLAUDE.md | Adds style guideline for union type checking |
| shards: list[int], | ||
| stopped, | ||
| current_drift, | ||
| backpressure_since, | ||
| healthy_since, | ||
| produce_to_pipe: Callable[[KafkaPayload], None] | None, | ||
| ) -> None: | ||
| shard_tag = ",".join(map(str, shards)) |
[nitpick] The `main` method signature has grown complex, with 7 parameters. Consider grouping related parameters into a configuration object or data class to improve maintainability and readability.
| shards: list[int], | |
| stopped, | |
| current_drift, | |
| backpressure_since, | |
| healthy_since, | |
| produce_to_pipe: Callable[[KafkaPayload], None] | None, | |
| ) -> None: | |
| shard_tag = ",".join(map(str, shards)) | |
| config: "FlusherConfig", | |
| ) -> None: | |
| shard_tag = ",".join(map(str, config.shards)) |
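One possible shape for the `FlusherConfig` named in the suggestion is sketched below. The class body is an assumption: the field names mirror the parameters visible in the diff, but their real types are not shown in this review, so they are typed as `Any` here, and the `shard_tag` property is a hypothetical convenience.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class FlusherConfig:
    """Hypothetical grouping of the per-process flusher parameters."""

    shards: list[int]
    # The types of the shared-state values are not visible in the diff,
    # so they are left as Any in this sketch.
    stopped: Any
    current_drift: Any
    backpressure_since: Any
    healthy_since: Any
    # In the reviewed code the callable takes a KafkaPayload; Any keeps this self-contained.
    produce_to_pipe: Optional[Callable[[Any], None]] = None

    @property
    def shard_tag(self) -> str:
        # Same expression the diff uses to build the metrics tag value.
        return ",".join(map(str, self.shards))
```

A frozen dataclass keeps the grouped parameters read-only once the process starts, which fits values that are passed in once and never reassigned.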
| with metrics.timer("spans.buffer.flusher.produce"): | ||
| for _, flushed_segment in flushed_segments.items(): | ||
| with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard_tag}): | ||
| for flushed_segment in flushed_segments.values(): |
The unused `_` key was dropped from the loop: `for _, flushed_segment in flushed_segments.items()` became `for flushed_segment in flushed_segments.values()`. The change is correct, but verify that the key is not needed elsewhere in the loop body.
| for flushed_segment in flushed_segments.values(): | |
| for _, flushed_segment in flushed_segments.items(): |
| produce(kafka_payload) | ||
|
|
||
| with metrics.timer("spans.buffer.flusher.wait_produce"): | ||
| with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}): |
Inconsistent tag naming: line 185 uses "shard" while line 199 uses "shards" for the same data. This should be consistent throughout the codebase.
| with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}): | |
| with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}): |
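One way to keep the tag key from drifting between call sites is to build the tags in a single place. The helper below is a hypothetical sketch; `shard_tags` is not part of the reviewed code, and the choice of `"shard"` as the key simply follows the suggestion above.

```python
def shard_tags(shards: list[int]) -> dict[str, str]:
    """Hypothetical helper: the one place that decides the metrics tag key."""
    return {"shard": ",".join(map(str, shards))}
```

Each timer call would then pass `tags=shard_tags(shards)` instead of spelling out the dictionary.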
| # Create a buffer for these specific shards | ||
| shard_buffer = SpansBuffer(shards) |
[nitpick] Creating multiple `SpansBuffer` instances for different shards may lead to resource duplication. Consider whether the original buffer can be shared, or whether there is a more efficient way to partition the work.
| # Create a buffer for these specific shards | |
| shard_buffer = SpansBuffer(shards) | |
| # Use the shared buffer for all shards | |
| shard_buffer = self.buffer |
| # Give flusher threads time to process after drift change | ||
| time.sleep(0.1) |
Adding a fixed sleep in tests can make them flaky. Consider using a more deterministic approach like waiting for a specific condition or mocking the timing mechanism.
| # Give flusher threads time to process after drift change | |
| time.sleep(0.1) | |
| # Wait for flusher threads to process after drift change | |
| start_time = time.time() | |
| while not messages and time.time() - start_time < 5: # Timeout after 5 seconds | |
| time.sleep(0.01) # Check every 10ms |
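The polling loop in the suggestion could be factored into a reusable test helper. This is a minimal sketch; the name `wait_until` and its defaults are assumptions, and it uses `time.monotonic()` rather than `time.time()` so that wall-clock adjustments cannot shorten or lengthen the timeout.

```python
import time
from typing import Callable


def wait_until(condition: Callable[[], bool], timeout: float = 5.0, interval: float = 0.01) -> bool:
    """Hypothetical helper: poll condition until it is true or timeout seconds pass."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(interval)
    # One last check so a condition that became true during the final sleep still counts.
    return condition()
```

In the test this might read `assert wait_until(lambda: bool(messages))`, which fails with a clear assertion on timeout instead of silently continuing with an empty list.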
| def _create_process_for_shard(self, shard: int): | ||
| # Find which process this shard belongs to and restart that process | ||
| for process_index, shards in self.process_to_shards_map.items(): | ||
| if shard in shards: | ||
| self._create_process_for_shards(process_index, shards) | ||
| break |
The method _create_process_for_shard appears to be unused and may be dead code. If it's intended for future use, consider adding a comment explaining its purpose or removing it if not needed.
| def _create_process_for_shard(self, shard: int): | |
| # Find which process this shard belongs to and restart that process | |
| for process_index, shards in self.process_to_shards_map.items(): | |
| if shard in shards: | |
| self._create_process_for_shards(process_index, shards) | |
| break | |
| # Removed the `_create_process_for_shard` method as it is unused and redundant. |
Test 6