Span Buffer Multiprocess Enhancement with Health Monitoring #10
akshayutture-augment wants to merge 1 commit into
Conversation
@coderabbitai review
✅ Actions performed
Review triggered.
Walkthrough
The changes implement multi-process shard parallelism for the span flusher consumer. A new
Changes
Sequence Diagram(s)
sequenceDiagram
participant Config as CLI Config
participant Factory as ProcessSpansStrategyFactory
participant Flusher as SpanFlusher
participant Processes as Worker Processes
Config->>Factory: Create with flusher_processes=N
Factory->>Flusher: __init__(max_processes=N)
activate Flusher
Flusher->>Flusher: Compute shard-to-process mapping
Flusher->>Flusher: Create process_to_shards_map<br/>per-process state trackers
deactivate Flusher
rect rgb(200, 240, 255)
note over Factory,Flusher: Process Creation Phase
loop For each shard subset
Flusher->>Processes: _create_process_for_shards(shards)
Processes->>Processes: Start worker (main method)
end
end
rect rgb(240, 200, 255)
note over Flusher,Processes: Runtime: Per-Process Shard Handling
Flusher->>Processes: submit() accumulates per-process buffers
Processes->>Processes: main(shards=[...]) processes shard subset
Processes->>Processes: Tag metrics with shard_tag
Processes->>Flusher: Report per-process backpressure/health
end
rect rgb(240, 220, 200)
note over Flusher,Processes: Lifecycle Management
Flusher->>Processes: _ensure_processes_alive checks all processes
Flusher->>Processes: On failure: restart via _create_process_for_shards
Flusher->>Processes: join() terminates all worker processes
end
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~45–60 minutes
Areas requiring close attention:
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 1
🧹 Nitpick comments (7)
src/sentry/spans/consumers/process/flusher.py (5)
43-80: Max-processes and shard mapping logic mostly sound; consider minor edge-case guard
The per-process shard mapping and state dictionaries look coherent, and num_processes = min(max_processes, len(buffer.assigned_shards)) correctly bounds process count by shard count.
One small defensive improvement you might want to consider:
- If buffer.assigned_shards were ever empty (e.g. factory called with an empty partition set), self.max_processes and self.num_processes would both become 0 and _create_processes() would create no workers. If that state is possible, explicitly asserting or handling the "no shards" case at construction time (e.g. early-return or a clear error) would make the behavior easier to reason about.
Otherwise, the mapping and per-process state initializations (process_* dicts, self.buffers) are consistent with the multi-process design.
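The guard described above can be sketched as follows. This is a minimal illustration, not the code in flusher.py: build_shard_map is a hypothetical helper, and the round-robin distribution is an assumption, since the review does not show how shards are actually assigned to processes.

```python
from __future__ import annotations


def build_shard_map(
    assigned_shards: list[int], max_processes: int | None
) -> dict[int, list[int]]:
    """Hypothetical helper: map process index -> shards (round-robin, assumed)."""
    if not assigned_shards:
        # Fail loudly instead of silently creating zero workers.
        raise ValueError("SpanFlusher requires at least one assigned shard")

    # None (or 0) means "no explicit limit": one process per shard.
    limit = max_processes or len(assigned_shards)
    num_processes = min(limit, len(assigned_shards))

    mapping: dict[int, list[int]] = {i: [] for i in range(num_processes)}
    for position, shard in enumerate(assigned_shards):
        mapping[position % num_processes].append(shard)
    return mapping
```

Raising at construction time makes the empty-partition case visible in logs rather than leaving a flusher that never flushes; an early return would be an equally valid choice if that state is expected.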
81-133: Process/thread creation helpers are correct; _create_process_for_shard appears unused
The split into _create_processes, _create_process_for_shards, and _create_process_for_shard makes the lifecycle clearer, and the way buffer/shards are bound into the target callable via run_with_initialized_sentry vs functools.partial is correct for both process and thread modes.
Two small observations:
- _create_process_for_shard is not referenced anywhere in this file; unless it's used via indirection elsewhere, it looks like dead code. Removing it (or adding a call site) would reduce confusion.
- The process_healthy_since reset before (re)starting a worker is a sensible race-avoidance tactic and matches the health checking below.
134-200: Shard-aware tagging and per-process backpressure look correct; align metric tag keys
The updated main signature and usage (buffer, shards, shared Values) is consistent, and the shard tag construction plus:
- per-process backpressure (backpressure_since) and
- per-process health (healthy_since)
all line up with the new dictionaries on the parent side.
Minor metrics nits:
- You use tags={"shard": shard_tag} for "spans.buffer.flusher.produce" and "spans.buffer.segment_size_bytes", but tags={"shards": shard_tag} for "spans.buffer.flusher.wait_produce". For downstream metrics consumers, it will likely be simpler if the tag key is consistent across all these timers (either all shard or all shards).
- shard_tag itself is a comma-separated list of shards, which is fine, but the mix of singular/plural keys can be confusing when this value represents multiple shards.
These are cosmetic, but tightening them now will avoid metrics churn later.
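One way to keep the tag key consistent is to build the tags in a single place and reuse the result for every metric. A minimal sketch, assuming a hypothetical helper named shard_tags and the plural key (the PR may equally settle on the singular one):

```python
from __future__ import annotations

from collections.abc import Iterable

# Assumed choice of key; the point is only that it is defined once.
SHARD_TAG_KEY = "shards"


def shard_tags(shards: Iterable[int]) -> dict[str, str]:
    """Hypothetical helper: one tag dict shared by all flusher metrics."""
    return {SHARD_TAG_KEY: ",".join(str(shard) for shard in shards)}
```

Each metrics call in the worker would then pass the same dict, so a later rename touches one constant instead of several call sites.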
267-315: submit/backpressure/memory logic matches new multi-buffer design
The changes in submit are aligned with the per-process structure:
- _ensure_processes_alive() is called before any work, which is a reasonable place for health checks.
- buffer.record_stored_segments() and buffer.get_memory_info() are invoked across self.buffers.values(), covering all shard groups.
- Backpressure is now computed across self.process_backpressure_since.values(), which correctly generalizes the previous single-Value approach.
One subtle point on memory accounting:
- If SpansBuffer.get_memory_info() returns per-service (ServiceMemory) entries that are identical across buffers (e.g., each buffer returns overall Redis memory for the same service), summing used and available over all buffers will scale both numerator and denominator by the same factor, leaving the used / available ratio unchanged. That's logically safe but does redundant work.
- If get_memory_info() is truly per-shard or per-service, the aggregate sum is also fine.
This is more about efficiency than correctness; if this path is hot and get_memory_info() is expensive, you might later want to deduplicate calls per underlying service/cluster.
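The scaling argument can be checked with a few lines of arithmetic. In this sketch, ServiceMemory is a stand-in with assumed used and available fields, and used_ratio is a hypothetical helper, not the aggregation in flusher.py:

```python
from __future__ import annotations

from typing import NamedTuple


class ServiceMemory(NamedTuple):
    # Stand-in for the entries returned by get_memory_info(); fields are assumed.
    used: int
    available: int


def used_ratio(infos: list[ServiceMemory]) -> float:
    """Aggregate used / available across all reported entries."""
    used = sum(info.used for info in infos)
    available = sum(info.available for info in infos)
    return used / available if available else 0.0


# One buffer reporting a service, versus four buffers reporting the same service.
single = [ServiceMemory(used=300, available=1000)]
duplicated = single * 4
```

Both used_ratio(single) and used_ratio(duplicated) evaluate to the same ratio, because the duplicate entries multiply numerator and denominator by the same factor of four.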
328-347: join implementation works but can be simplified and made more lint-friendly
The new join correctly:
- sets stopped before joining next_step, and
- then waits for each background worker to finish within the optional timeout.
A couple of minor cleanups:
- You don't use process_index in for process_index, process in self.processes.items():, which is what Ruff is flagging. You can iterate values directly:
      for process in self.processes.values():
          ...
- deadline = time.time() + timeout if timeout else None treats timeout=0 as "no deadline"; if consumers rely on timeout=0 meaning "don't wait", they won't get that behavior. If this is a public API and 0 is a realistic input, consider distinguishing None from 0 explicitly.
These are small polish items; behavior as written is otherwise consistent.
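The None-versus-0 distinction can be sketched as below. compute_deadline and remaining are hypothetical helper names used only for illustration; the comparison against None is the part that matters.

```python
from __future__ import annotations

import time


def compute_deadline(timeout: float | None) -> float | None:
    """Return an absolute deadline, or None to wait indefinitely."""
    # `is not None` keeps timeout=0 meaningful ("do not wait at all"),
    # whereas a truthiness check would treat 0 like None.
    return time.time() + timeout if timeout is not None else None


def remaining(deadline: float | None) -> float | None:
    """Seconds left until the deadline, clamped at zero; None means no limit."""
    if deadline is None:
        return None
    return max(deadline - time.time(), 0.0)
```

With this shape, join(timeout=0) would get a deadline that has already passed, so each per-worker wait receives a remaining time of zero instead of blocking.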
tests/sentry/spans/consumers/process/test_consumer.py (2)
12-63: Basic consumer test adjustments: behavior OK, but sleep comment is misleading
- Marking test_basic as @pytest.mark.django_db(transaction=True) is consistent with the use of real DB-backed components in this path.
- The extra step.poll() after updating current_drift is reasonable to give the pipeline another chance to drive the flusher.
However:
    # Give flusher threads time to process after drift change
    time.sleep(0.1)
is a no-op because you previously did:
    monkeypatch.setattr("time.sleep", lambda _: None)
So the comment doesn't match reality, and the test is not actually sleeping here. If the test does not need real time to elapse, you can simply drop the comment and the time.sleep call; if you do need actual sleeping, patch only the module(s) under test (e.g., sentry.spans.consumers.process.flusher.time.sleep) rather than the global time.sleep.
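The no-op effect is easy to reproduce outside the test suite. This sketch uses unittest.mock.patch as a stand-in for pytest's monkeypatch so that it runs on its own; timed_sleep is a hypothetical helper, and the durations are arbitrary.

```python
import time
from unittest import mock


def timed_sleep(seconds: float) -> float:
    """Call time.sleep and return how long the call actually took."""
    start = time.perf_counter()
    time.sleep(seconds)  # looked up on the time module at call time
    return time.perf_counter() - start


# While the global time.sleep is replaced, every caller gets the no-op.
with mock.patch("time.sleep", lambda _seconds: None):
    patched_elapsed = timed_sleep(0.2)

# Outside the patch, the real sleep is restored.
real_elapsed = timed_sleep(0.05)
```

One caveat worth keeping in mind: if a module does import time and calls time.sleep, a dotted target that goes through that module's time attribute still resolves to the shared time module, so whether such a patch is truly scoped depends on how the module under test references sleep.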
84-123: Rename force to _force to suppress Ruff ARG001 lint warning in test_flusher_processes_limit
The test correctly verifies flusher process limits and shard distribution. The suggestion to rename the unused force parameter to _force in the add_commit function (line 106) is valid: the parameter is never used in the function body, and the underscore prefix is the standard Python convention for intentionally unused parameters. This change preserves caller compatibility while suppressing the lint warning.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- CLAUDE.md (1 hunks)
- src/sentry/consumers/__init__.py (1 hunks)
- src/sentry/spans/consumers/process/factory.py (3 hunks)
- src/sentry/spans/consumers/process/flusher.py (8 hunks)
- tests/sentry/spans/consumers/process/test_consumer.py (4 hunks)
- tests/sentry/spans/consumers/process/test_flusher.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.14.5)
src/sentry/spans/consumers/process/flusher.py
248-250: Avoid specifying long messages outside the exception class
(TRY003)
337-337: Loop control variable process_index not used within loop body
Rename unused process_index to _process_index
(B007)
tests/sentry/spans/consumers/process/test_consumer.py
106-106: Unused function argument: force
(ARG001)
🔇 Additional comments (4)
CLAUDE.md (1)
448-457: Type-checking example is correct and reinforces best practice
The new hasattr vs isinstance example is accurate and aligns with how unions should be handled; good addition to the anti-patterns section.
tests/sentry/spans/consumers/process/test_flusher.py (1)
78-83: Backpressure assertion correctly updated for per-process tracking
Switching the assertion to:
    assert any(x.value for x in flusher.process_backpressure_since.values())
matches the new process_backpressure_since: dict[int, Value] shape and still validates that at least one flusher worker has entered backpressure. This is the right adaptation of the test.
src/sentry/spans/consumers/process/factory.py (1)
34-76: Factory wiring of flusher_processes into SpanFlusher looks correct
- The new flusher_processes: int | None = None parameter is stored as self.flusher_processes.
- In create_with_partitions, you pass max_processes=self.flusher_processes into SpanFlusher, preserving prior behavior when it is None.
- Because SpanFlusher internally bounds num_processes by len(buffer.assigned_shards), you won't oversubscribe workers relative to shards even if a higher flusher_processes is configured.
This is a clean, backwards-compatible extension of the factory.
src/sentry/consumers/__init__.py (1)
430-438: All parameter wiring verified and correct.
The verification confirms that the --flusher-processes parameter is properly implemented:
- ProcessSpansStrategyFactory correctly accepts flusher_processes: int | None = None (factory.py, line 41)
- The parameter is properly passed through to SpanFlusher instantiation via max_processes=self.flusher_processes (factory.py, line 74)
- The implementation is complete and correctly wired from CLI option to flusher
The original review's suggestions for validation and clarified help text remain as optional enhancements; the code functions correctly as-is.
     def _ensure_processes_alive(self) -> None:
         max_unhealthy_seconds = options.get("spans.buffer.flusher.max-unhealthy-seconds")
-        if not self.process.is_alive():
-            exitcode = getattr(self.process, "exitcode", "unknown")
-            cause = f"no_process_{exitcode}"
-        elif int(time.time()) - self.healthy_since.value > max_unhealthy_seconds:
-            cause = "hang"
-        else:
-            return  # healthy
 
-        metrics.incr("spans.buffer.flusher_unhealthy", tags={"cause": cause})
-        if self.process_restarts > MAX_PROCESS_RESTARTS:
-            raise RuntimeError(f"flusher process crashed repeatedly ({cause}), restarting consumer")
+        for process_index, process in self.processes.items():
+            if not process:
+                continue
 
+            shards = self.process_to_shards_map[process_index]
 
+            cause = None
+            if not process.is_alive():
+                exitcode = getattr(process, "exitcode", "unknown")
+                cause = f"no_process_{exitcode}"
+            elif (
+                int(time.time()) - self.process_healthy_since[process_index].value
+                > max_unhealthy_seconds
+            ):
+                # Check if any shard handled by this process is unhealthy
+                cause = "hang"
 
+            if cause is None:
+                continue  # healthy
 
+            # Report unhealthy for all shards handled by this process
+            for shard in shards:
+                metrics.incr(
+                    "spans.buffer.flusher_unhealthy", tags={"cause": cause, "shard": shard}
+                )
 
-        try:
-            self.process.kill()
-        except ValueError:
-            pass  # Process already closed, ignore
+            if self.process_restarts[process_index] > MAX_PROCESS_RESTARTS:
+                raise RuntimeError(
+                    f"flusher process for shards {shards} crashed repeatedly ({cause}), restarting consumer"
+                )
+            self.process_restarts[process_index] += 1
 
-        self.process_restarts += 1
-        self._create_process()
+            try:
+                if isinstance(process, multiprocessing.Process):
+                    process.kill()
+            except (ValueError, AttributeError):
+                pass  # Process already closed, ignore
 
+            self._create_process_for_shards(process_index, shards)
🧩 Analysis chain
Health monitoring and restart logic: watch process type check and thread-mode restarts
The per-process health logic is a good upgrade, but a couple of details are worth revisiting:
- Process type check before kill()
      if isinstance(process, multiprocessing.Process):
          process.kill()
  Here process instances are created via self.mp_context.Process(...), whose type is multiprocessing.context.SpawnProcess. Relying on isinstance(process, multiprocessing.Process) may or may not match depending on the exact class hierarchy and could silently skip the kill step in some environments.
  To make this robust and independent of the concrete multiprocessing implementation, consider checking capabilities instead of type, e.g.:
      if hasattr(process, "kill"):
          process.kill()
  while explicitly excluding threads if you want:
      import threading
      if not isinstance(process, threading.Thread) and hasattr(process, "kill"):
          process.kill()
- Thread mode (produce_to_pipe) restart behavior
  In thread mode, process is a threading.Thread, so the kill() branch is skipped. If a thread is considered "hung" (health exceeds threshold), _create_process_for_shards will start an additional thread for the same shard set without stopping the old one. That's acceptable in tests but could lead to multiple flusher threads per shard if produce_to_pipe is ever used outside unit tests.
  If you expect thread mode beyond tests, it would be safer to special-case threads (e.g., don't auto-restart on hang, or track/join and only create one replacement).
Given the current docstring says produce_to_pipe is for unit testing, treating these as future-proofing rather than critical bugs seems reasonable.
🏁 Script executed:
#!/bin/bash
# Check actual types used for SpanFlusher processes to validate the isinstance check.
rg -n "class SpanFlusher" -C3
rg -n "mp_context.Process" -C3
python - << 'PY'
import multiprocessing as mp, inspect
ctx = mp.get_context("spawn")
p = ctx.Process(target=print, args=("x",))
print("Process type:", type(p))
print("Is instance of multiprocessing.Process:", isinstance(p, mp.Process))
PY
Length of output: 1908
🏁 Script executed:
#!/bin/bash
# Find _create_process_for_shards implementation and process creation/tracking logic
rg -n "_create_process_for_shards" -A20 src/sentry/spans/consumers/process/flusher.py
# Check how processes are stored and if there's any cleanup/replacement logic
rg -n "self.processes\[" -C2 src/sentry/spans/consumers/process/flusher.py
# Look for thread-specific handling or guards
rg -n "threading.Thread" -C3 src/sentry/spans/consumers/process/flusher.py
Length of output: 4162
Fix broken isinstance check for process type and implement proper thread cleanup
Your concerns are confirmed by the code. Both issues are real:
- Line 249: isinstance(process, multiprocessing.Process) fails for SpawnProcess
  The Python test confirms that multiprocessing.get_context("spawn").Process(...) returns a SpawnProcess, which is not an instance of multiprocessing.Process. This means process.kill() at line 253 is never executed, so a hung worker process is never killed and keeps running alongside its replacement.
  Fix: Use hasattr(process, "kill") instead:
      if hasattr(process, "kill"):
          process.kill()
- Line 124 assigns new thread without stopping old one
  In thread mode, _create_process_for_shards creates a new thread and overwrites the dict entry at line 124 without stopping the old thread. Both threads continue running for the same shard set, creating duplicates.
  Fix: For thread mode, either (a) join the old thread before creating a new one, or (b) prevent auto-restart on hang for threads, or (c) explicitly exclude threads from kill logic and don't auto-restart them.
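A capability-based check that also leaves threads alone could look like the sketch below. stop_worker is a hypothetical helper, not part of the PR; it only illustrates the hasattr-style check and the thread exclusion discussed in this comment.

```python
import multiprocessing
import threading


def stop_worker(worker: object) -> bool:
    """Hypothetical helper: kill process-like workers, never threads.

    Returns True only if kill() was called without raising.
    """
    if isinstance(worker, threading.Thread):
        # Threads cannot be killed; the caller decides whether to restart.
        return False
    kill = getattr(worker, "kill", None)
    if kill is None:
        return False
    try:
        kill()
    except (ValueError, AttributeError):
        # Process object already closed, or it was never started.
        return False
    return True


# A spawn-context process is not an instance of multiprocessing.Process,
# but it does expose kill(), which is why the capability check is safer.
spawn_process = multiprocessing.get_context("spawn").Process(target=print)
```

An alternative with the same effect would be an isinstance check against multiprocessing.process.BaseProcess, the common base class of the context-specific process types.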
🧰 Tools
🪛 Ruff (0.14.5)
248-250: Avoid specifying long messages outside the exception class
(TRY003)
Test 6
Summary by CodeRabbit
Release Notes
New Features
--flusher-processes CLI option
Documentation
Tests
Replicated from ai-code-review-evaluation/sentry-coderabbit#6