Span Buffer Multiprocess Enhancement with Health Monitoring#5
Span Buffer Multiprocess Enhancement with Health Monitoring#5ShashankFC wants to merge 1 commit into
Conversation
|
@CodeAnt-AI: review |
|
CodeAnt AI is running the review. |
Sequence DiagramShows how the consumer/factory configures a multiprocess SpanFlusher, how worker processes flush span segments to Kafka, and how the manager monitors health and enforces backpressure/restarts when needed. sequenceDiagram
participant Consumer
participant Factory
participant SpanFlusher
participant FlusherProcess as WorkerProcess
participant Kafka
Consumer->>Factory: create_with_partitions(commit, partitions, flusher_processes=N)
Factory->>SpanFlusher: instantiate manager (max_processes=N) and start N worker processes
SpanFlusher->>WorkerProcess: worker polls assigned shard buffer and flushes segments
WorkerProcess->>Kafka: produce flushed segment payloads
WorkerProcess-->>SpanFlusher: update healthy/backpressure flags
SpanFlusher->>Consumer: on unhealthy/backpressure -> raise MessageRejected (pause consumer) and restart worker process
Generated by CodeAnt AI |
| produce(kafka_payload) | ||
|
|
||
| with metrics.timer("spans.buffer.flusher.wait_produce"): | ||
| with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}): |
There was a problem hiding this comment.
Suggestion: The metrics for flusher wait time use the tag key "shards" while the corresponding produce timer uses "shard", so dashboards and alerts that expect a consistent tag name per shard will not be able to correlate produce and wait metrics correctly; the tag key should be made consistent (e.g. always "shard"). [logic error]
Severity Level: Major ⚠️
- ⚠️ Harder correlating wait and produce metrics per shard.
- ⚠️ Dashboards grouping by shard miss wait_produce timer.| with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}): | |
| with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}): |
Steps of Reproduction ✅
1. Start the spans consumer using `ProcessSpansStrategyFactory.create_with_partitions` in
`src/sentry/spans/consumers/process/factory.py:58-76`, which constructs a `SpanFlusher`
that manages shard-specific buffers and child processes.
2. Allow spans to be ingested so that `SpanFlusher.main` (child process entrypoint at
`src/sentry/spans/consumers/process/flusher.py:134-213`) begins flushing segments. For
each non-empty `flushed_segments` batch, it records a produce timer at `flusher.py:185`:
`with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard_tag}): ...`.
3. Immediately after producing Kafka messages, it records a wait timer at
`flusher.py:199`:
`with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
...`.
Note the different tag keys: `"shard"` for the produce timer vs `"shards"` for the wait
timer.
4. In the metrics backend used by `sentry.utils.metrics` (invoked in this same module),
metric series are distinguished by tag keys. As a result, when an operator groups
`spans.buffer.flusher.*` metrics by `shard`, the produce timer
(`spans.buffer.flusher.produce` with `{"shard": shard_tag}`) will be grouped correctly per
shard, but the wait timer (`spans.buffer.flusher.wait_produce` with `{"shards":
shard_tag}`) will not appear under the same `shard` dimension. This makes per-shard
correlation between produce time and wait time difficult or impossible without custom
handling, demonstrating the impact of the inconsistent tag key.Prompt for AI Agent 🤖
This is a comment left during a code review.
**Path:** src/sentry/spans/consumers/process/flusher.py
**Line:** 199:199
**Comment:**
*Logic Error: The metrics for flusher wait time use the tag key `"shards"` while the corresponding produce timer uses `"shard"`, so dashboards and alerts that expect a consistent tag name per shard will not be able to correlate produce and wait metrics correctly; the tag key should be made consistent (e.g. always `"shard"`).
Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.|
|
||
| step.poll() | ||
| # Give flusher threads time to process after drift change | ||
| time.sleep(0.1) |
There was a problem hiding this comment.
Suggestion: The call to time.sleep(0.1) is rendered a no-op by the earlier monkeypatch.setattr("time.sleep", lambda _: None), so there is no guaranteed delay between advancing the flusher's drift and calling step.join(), which can cause a race where the flusher thread has not yet flushed any spans and the subsequent unpacking of messages fails intermittently. Replacing this with a wait mechanism that does not rely on the patched time.sleep ensures the background flusher has a real opportunity to run before join() shuts it down. [race condition]
Severity Level: Major ⚠️
- ❌ `test_basic` for span consumer can fail intermittently.
- ⚠️ CI reliability reduced due to nondeterministic flakiness.
- ⚠️ Flusher timing behavior insufficiently validated under concurrent conditions.| time.sleep(0.1) | |
| start = time.time() | |
| while time.time() - start < 0.1: | |
| pass |
Steps of Reproduction ✅
1. Run the test directly with pytest: `pytest
tests/sentry/spans/consumers/process/test_consumer.py::test_basic` (entry point in
`tests/sentry/spans/consumers/process/test_consumer.py:12`).
2. In `test_basic` (`test_consumer.py:13-16`), `monkeypatch.setattr("time.sleep", lambda
_: None)` globally replaces `time.sleep` with a no-op, affecting both the test and
`SpanFlusher` implementation (`src/sentry/spans/consumers/process/flusher.py:1-214`).
3. The test submits a span and calls `step.poll()` then manually advances the flusher
clock via `fac._flusher.current_drift.value = 9000` (`test_consumer.py:57-58`), followed
by another `step.poll()` and the comment-intended delay `time.sleep(0.1)` at
`test_consumer.py:60-62`. Because of the monkeypatch, that `time.sleep(0.1)` does not
actually wait.
4. Immediately after the no-op sleep, `step.join()` is invoked (`test_consumer.py:64`),
which ultimately calls `SpanFlusher.join()` (`flusher.py:328-345`), setting
`self.stopped.value = True` before waiting for background flusher threads to stop. If, at
this point, a flusher thread is in the 1-second sleep path of its main loop
(`time.sleep(1)` at `flusher.py:181-183`, also monkeypatched to no-op) and has not yet
executed `buffer.flush_segments(now=now)` with the updated `current_drift`, it will exit
without ever producing to `messages.append`. The subsequent destructuring `(msg,) =
messages` at `test_consumer.py:66` then raises `ValueError: not enough values to unpack
(expected 1, got 0)`, making `test_basic` flaky depending on timing between the drift
update and `step.join()`.Prompt for AI Agent 🤖
This is a comment left during a code review.
**Path:** tests/sentry/spans/consumers/process/test_consumer.py
**Line:** 62:62
**Comment:**
*Race Condition: The call to `time.sleep(0.1)` is rendered a no-op by the earlier `monkeypatch.setattr("time.sleep", lambda _: None)`, so there is no guaranteed delay between advancing the flusher's drift and calling `step.join()`, which can cause a race where the flusher thread has not yet flushed any spans and the subsequent unpacking of `messages` fails intermittently. Replacing this with a wait mechanism that does not rely on the patched `time.sleep` ensures the background flusher has a real opportunity to run before `join()` shuts it down.
Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.|
CodeAnt AI finished running the review. |
User description
Test 6
Summary by CodeRabbit
New Features
Documentation
Tests
✏️ Tip: You can customize this high-level summary in your review settings.
Replicated from ai-code-review-evaluation/sentry-coderabbit#6
CodeAnt-AI Description
Run span flusher with configurable per-shard multiprocessing and per-process health/backpressure
What Changed
Impact
✅ Controlled flusher parallelism across shards✅ Fewer consumer restarts caused by a single flusher crash✅ Clearer flusher health and backpressure metrics per shard💡 Usage Guide
Checking Your Pull Request
Every time you make a pull request, our system automatically looks through it. We check for security issues, mistakes in how you're setting up your infrastructure, and common code problems. We do this to make sure your changes are solid and won't cause any trouble later.
Talking to CodeAnt AI
Got a question or need a hand with something in your pull request? You can easily get in touch with CodeAnt AI right here. Just type the following in a comment on your pull request, and replace "Your question here" with whatever you want to ask:
This lets you have a chat with CodeAnt AI about your pull request, making it easier to understand and improve your code.
Example
Preserve Org Learnings with CodeAnt
You can record team preferences so CodeAnt AI applies them in future reviews. Reply directly to the specific CodeAnt AI suggestion (in the same thread) and replace "Your feedback here" with your input:
This helps CodeAnt AI learn and adapt to your team's coding style and standards.
Example
Retrigger review
Ask CodeAnt AI to review the PR again, by typing:
Check Your Repository Health
To analyze the health of your code repository, visit our dashboard at https://app.codeant.ai. This tool helps you identify potential issues and areas for improvement in your codebase, ensuring your repository maintains high standards of code health.