Span Buffer Multiprocess Enhancement with Health Monitoring#5
Span Buffer Multiprocess Enhancement with Health Monitoring#5ShashankFC wants to merge 1 commit into
Conversation
|
@cubic-dev-ai review this pull request |
@ShashankFC I have started the AI code review. It will take a few minutes to complete. |
There was a problem hiding this comment.
3 issues found across 6 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="tests/sentry/spans/consumers/process/test_consumer.py">
<violation number="1" location="tests/sentry/spans/consumers/process/test_consumer.py:62">
P2: `time.sleep(0.1)` is a no-op here because `monkeypatch.setattr("time.sleep", lambda _: None)` on line 15 patches `time.sleep` globally on the module. This call does nothing, making the comment about giving flusher threads time to process misleading. If synchronization is actually needed, use a different mechanism (e.g., a threading `Event` or condition variable). If it's not needed, remove this dead code.</violation>
</file>
<file name="src/sentry/spans/consumers/process/flusher.py">
<violation number="1" location="src/sentry/spans/consumers/process/flusher.py:127">
P2: `_create_process_for_shard` is unused (dead code) and, if it were called, would fail to kill the existing process before creating a replacement — causing duplicate processing for the same shards. Consider removing this method or adding the missing `process.kill()` logic.</violation>
<violation number="2" location="src/sentry/spans/consumers/process/flusher.py:199">
P2: Inconsistent metric tag key: `"shards"` (plural) is used here while all other metrics in this method use `"shard"` (singular). This will cause this metric to be unqueryable alongside the others in dashboards/alerts.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
|
|
||
| step.poll() | ||
| # Give flusher threads time to process after drift change | ||
| time.sleep(0.1) |
There was a problem hiding this comment.
P2: time.sleep(0.1) is a no-op here because monkeypatch.setattr("time.sleep", lambda _: None) on line 15 patches time.sleep globally on the module. This call does nothing, making the comment about giving flusher threads time to process misleading. If synchronization is actually needed, use a different mechanism (e.g., a threading Event or condition variable). If it's not needed, remove this dead code.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At tests/sentry/spans/consumers/process/test_consumer.py, line 62:
<comment>`time.sleep(0.1)` is a no-op here because `monkeypatch.setattr("time.sleep", lambda _: None)` on line 15 patches `time.sleep` globally on the module. This call does nothing, making the comment about giving flusher threads time to process misleading. If synchronization is actually needed, use a different mechanism (e.g., a threading `Event` or condition variable). If it's not needed, remove this dead code.</comment>
<file context>
@@ -56,6 +57,10 @@ def add_commit(offsets, force=False):
+ step.poll()
+ # Give flusher threads time to process after drift change
+ time.sleep(0.1)
+
step.join()
</file context>
| self.processes[process_index] = process | ||
| self.buffers[process_index] = shard_buffer | ||
|
|
||
| def _create_process_for_shard(self, shard: int): |
There was a problem hiding this comment.
P2: _create_process_for_shard is unused (dead code) and, if it were called, would fail to kill the existing process before creating a replacement — causing duplicate processing for the same shards. Consider removing this method or adding the missing process.kill() logic.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/sentry/spans/consumers/process/flusher.py, line 127:
<comment>`_create_process_for_shard` is unused (dead code) and, if it were called, would fail to kill the existing process before creating a replacement — causing duplicate processing for the same shards. Consider removing this method or adding the missing `process.kill()` logic.</comment>
<file context>
@@ -72,37 +100,50 @@ def _create_process(self):
+ self.processes[process_index] = process
+ self.buffers[process_index] = shard_buffer
+
+ def _create_process_for_shard(self, shard: int):
+ # Find which process this shard belongs to and restart that process
+ for process_index, shards in self.process_to_shards_map.items():
</file context>
| produce(kafka_payload) | ||
|
|
||
| with metrics.timer("spans.buffer.flusher.wait_produce"): | ||
| with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}): |
There was a problem hiding this comment.
P2: Inconsistent metric tag key: "shards" (plural) is used here while all other metrics in this method use "shard" (singular). This will cause this metric to be unqueryable alongside the others in dashboards/alerts.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/sentry/spans/consumers/process/flusher.py, line 199:
<comment>Inconsistent metric tag key: `"shards"` (plural) is used here while all other metrics in this method use `"shard"` (singular). This will cause this metric to be unqueryable alongside the others in dashboards/alerts.</comment>
<file context>
@@ -134,23 +175,28 @@ def produce(payload: KafkaPayload) -> None:
produce(kafka_payload)
- with metrics.timer("spans.buffer.flusher.wait_produce"):
+ with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
for future in producer_futures:
future.result()
</file context>
| with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}): | |
| with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}): |
Test 6
Summary by CodeRabbit
New Features
Documentation
Tests
✏️ Tip: You can customize this high-level summary in your review settings.
Replicated from ai-code-review-evaluation/sentry-coderabbit#6