Span Buffer Multiprocess Enhancement with Health Monitoring #11
ShashankFC wants to merge 1 commit into
@coderabbitai review
✅ Actions performed
Review triggered.
📝 Walkthrough
The PR implements multi-process support for the SpanFlusher component, enabling parallel span processing across configurable process counts. A new
Changes
Sequence Diagram(s)
sequenceDiagram
participant CLI as CLI Layer
participant Factory as ProcessSpansStrategyFactory
participant Flusher as SpanFlusher
participant ProcMgr as Process Manager
participant Buf as Per-Process Buffers
CLI->>Factory: create(flusher_processes=2)
Factory->>Flusher: __init__(max_processes=2)
Flusher->>Flusher: num_processes = min(2, shard_count)
Flusher->>Flusher: process_to_shards mapping created
rect rgb(220, 240, 250)
Note over Flusher,Buf: Process Creation Phase
loop for each process group
Flusher->>ProcMgr: _create_processes(shard_group)
ProcMgr->>Buf: allocate SpansBuffer per process
ProcMgr->>ProcMgr: spawn Process(main, shards=[...])
end
end
rect rgb(240, 250, 220)
Note over Flusher,Buf: Runtime Health & Backpressure
Flusher->>ProcMgr: _ensure_processes_alive()
ProcMgr->>ProcMgr: check per_process_healthy_since
ProcMgr->>Buf: aggregate memory across buffers
ProcMgr->>ProcMgr: restart unhealthy processes as needed
end
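The runtime phase of the diagram can be sketched without spawning any real processes. This is only an illustration under stated assumptions, not the PR's code: `find_stale`, `record_restart`, the 10-second threshold, and the restart limit of 10 are hypothetical; only the ideas of a per-process "healthy since" timestamp and a restart cap come from the review.

```python
from __future__ import annotations

MAX_UNHEALTHY_SECONDS = 10  # hypothetical threshold, not taken from the PR
MAX_PROCESS_RESTARTS = 10   # hypothetical value; the PR's real limit is not shown


def find_stale(healthy_since: dict[int, int], now: int) -> list[int]:
    """Return the indices of processes whose last heartbeat is too old."""
    return [
        index
        for index, since in healthy_since.items()
        if now - since > MAX_UNHEALTHY_SECONDS
    ]


def record_restart(restarts: dict[int, int], index: int) -> None:
    """Count one restart and give up once a process restarts too often."""
    restarts[index] = restarts.get(index, 0) + 1
    if restarts[index] > MAX_PROCESS_RESTARTS:
        raise RuntimeError(f"flusher process {index} crashed repeatedly")


# Process 1 last reported healthy 15 seconds ago, so only it is stale.
heartbeats = {0: 100, 1: 85}
stale = find_stale(heartbeats, now=100)
```

Keeping the counters keyed by process index means one misbehaving shard group can hit the cap without penalising the others.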
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 3
🤖 Fix all issues with AI Agents
In @src/sentry/spans/consumers/process/flusher.py:
- Around line 185-199: The metrics tag key is inconsistent: the first
metrics.timer and metrics.timing use tags={"shard": shard_tag} but the later
metrics.timer named "spans.buffer.flusher.wait_produce" uses tags={"shards":
shard_tag}; change that tags dict to use the same key "shard" (i.e.,
tags={"shard": shard_tag}) so all three metric calls (the metrics.timer
producing loop, metrics.timing for "spans.buffer.segment_size_bytes", and the
"spans.buffer.flusher.wait_produce" timer) use the identical tag key.
- Around line 51-66: Handle the edge case where max_processes or
buffer.assigned_shards length is 0 by forcing at least one process: set
self.max_processes = max(1, max_processes or len(buffer.assigned_shards)) and
compute self.num_processes = min(self.max_processes, max(1,
len(buffer.assigned_shards))). This ensures process_to_shards_map, the
range(self.num_processes) loop, and the modulo operation (i %
self.num_processes) cannot raise ZeroDivisionError while preserving behavior
when there are shards.
In @tests/sentry/spans/consumers/process/test_consumer.py:
- Around line 84-122: The helper function add_commit in the test has an unused
parameter force; either remove the parameter entirely from add_commit(offsets)
if no external caller requires that signature, or rename it to _force (e.g.,
add_commit(offsets, _force=False)) to signal it is intentionally unused; update
any references if needed and run tests to ensure signature expectations remain
satisfied.
🧹 Nitpick comments (4)
src/sentry/consumers/__init__.py (1)
430-438: LGTM! Consider adding validation for positive values.
The new flusher_processes CLI option is properly structured and maintains backward compatibility with a default of 1.
Optional: Add validation to ensure positive values
Consider adding a callback validator to ensure flusher_processes is at least 1:
    def validate_positive(ctx, param, value):
        if value <= 0:
            raise click.BadParameter("--flusher-processes must be greater than 0")
        return value
Then apply it:
  click.Option(
      ["--flusher-processes", "flusher_processes"],
      default=1,
      type=int,
+     callback=validate_positive,
      help="Maximum number of processes for the span flusher. Defaults to 1.",
  ),
src/sentry/spans/consumers/process/flusher.py (3)
253-259: isinstance check may not match SpawnProcess correctly.
Line 254 checks isinstance(process, multiprocessing.Process), but the type annotation on line 68 is multiprocessing.context.SpawnProcess. In CPython, SpawnProcess derives from multiprocessing.process.BaseProcess rather than from multiprocessing.Process, so this check can evaluate to False for spawn-context processes and kill() would then be skipped. Consider checking against BaseProcess, or using hasattr(process, 'kill'), since threading.Thread lacks this method.
🔎 Alternative approach
  try:
-     if isinstance(process, multiprocessing.Process):
+     if hasattr(process, 'kill'):
          process.kill()
  except (ValueError, AttributeError):
      pass  # Process already closed, ignore
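The class relationship behind this comment can be checked directly. The sketch below relies on CPython's standard library layout, where `SpawnProcess` subclasses `multiprocessing.process.BaseProcess`; the helper `is_killable` and the `noop` target are hypothetical names used only for illustration, and nothing is actually started.

```python
from __future__ import annotations

import multiprocessing
import threading
from multiprocessing.process import BaseProcess


def is_killable(worker: BaseProcess | threading.Thread) -> bool:
    """Narrow the union: only process objects expose kill()."""
    return isinstance(worker, BaseProcess)


def noop() -> None:
    pass


spawn_ctx = multiprocessing.get_context("spawn")
proc = spawn_ctx.Process(target=noop)   # created but never started
thread = threading.Thread(target=noop)  # created but never started

# True for the spawn-context process, False for the thread.
process_is_killable = is_killable(proc)
thread_is_killable = is_killable(thread)

# The narrower check against multiprocessing.Process misses spawn processes.
matches_plain_process = isinstance(proc, multiprocessing.Process)
```

Checking against `BaseProcess` keeps the isinstance-based narrowing that the PR's CLAUDE.md guidance prefers while still covering every start method.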
337-347: Unused loop variable process_index.
The loop variable process_index is not used in the loop body. Per the static analysis hint, rename it to _process_index to indicate it's intentionally unused.
🔎 Proposed fix
-         for process_index, process in self.processes.items():
+         for _process_index, process in self.processes.items():
              if deadline is not None:
                  remaining_time = deadline - time.time()
                  if remaining_time <= 0:
                      break
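An alternative to renaming the variable is to iterate over the values only. The following is a rough sketch of a deadline-bounded join, using threads so that it runs anywhere; `join_all` and its parameters are hypothetical and are not the PR's `join` implementation.

```python
from __future__ import annotations

import threading
import time


def join_all(workers: dict[int, threading.Thread], timeout: float | None) -> None:
    """Join every worker, sharing one overall deadline across all of them."""
    deadline = time.time() + timeout if timeout is not None else None
    for worker in workers.values():  # the dictionary key is not needed here
        if deadline is None:
            worker.join()
            continue
        remaining = deadline - time.time()
        if remaining <= 0:
            break  # out of time; leave the remaining workers alone
        worker.join(remaining)


workers = {i: threading.Thread(target=time.sleep, args=(0.01,)) for i in range(2)}
for worker in workers.values():
    worker.start()
join_all(workers, timeout=1.0)
```

Iterating with `.values()` removes the unused name entirely, so there is nothing left for the linter to flag.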
247-251: Consider extracting the exception message to improve maintainability.
The static analysis tool flagged TRY003 for the long exception message. While the current message is helpful for debugging, you could define a constant or use a custom exception class for consistency.
🔎 Optional: Use a custom exception or constant
+ class FlusherRestartLimitExceeded(RuntimeError):
+     """Raised when a flusher process exceeds the maximum restart limit."""
+     pass
+
  # In _ensure_processes_alive:
-         if self.process_restarts[process_index] > MAX_PROCESS_RESTARTS:
-             raise RuntimeError(
-                 f"flusher process for shards {shards} crashed repeatedly ({cause}), restarting consumer"
-             )
+         if self.process_restarts[process_index] > MAX_PROCESS_RESTARTS:
+             raise FlusherRestartLimitExceeded(
+                 f"shards={shards}, cause={cause}, restarts={self.process_restarts[process_index]}"
+             )
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
CLAUDE.md
src/sentry/consumers/__init__.py
src/sentry/spans/consumers/process/factory.py
src/sentry/spans/consumers/process/flusher.py
tests/sentry/spans/consumers/process/test_consumer.py
tests/sentry/spans/consumers/process/test_flusher.py
🧰 Additional context used
🧬 Code graph analysis (3)
tests/sentry/spans/consumers/process/test_consumer.py (1)
  src/sentry/spans/consumers/process/factory.py (2)
    ProcessSpansStrategyFactory (24-120)
    create_with_partitions (58-116)
src/sentry/spans/consumers/process/flusher.py (2)
  src/sentry/processing/backpressure/memory.py (1)
    ServiceMemory (12-24)
  src/sentry/spans/buffer.py (1)
    SpansBuffer (152-541)
tests/sentry/spans/consumers/process/test_flusher.py (2)
  tests/sentry/replays/test_project_replay_summarize_breadcrumbs.py (1)
    x (122-123)
  src/sentry/api/event_search.py (1)
    value (430-437)
🪛 Ruff (0.14.10)
tests/sentry/spans/consumers/process/test_consumer.py
106-106: Unused function argument: force
(ARG001)
src/sentry/spans/consumers/process/flusher.py
248-250: Avoid specifying long messages outside the exception class
(TRY003)
337-337: Loop control variable process_index not used within loop body
Rename unused process_index to _process_index
(B007)
🔇 Additional comments (11)
tests/sentry/spans/consumers/process/test_flusher.py (1)
83-83: LGTM! Correctly adapted to per-process backpressure tracking.
The assertion now checks whether any process is experiencing backpressure, which aligns with the new multi-process architecture where backpressure state is tracked individually per process.
CLAUDE.md (1)
448-458: LGTM! Clear guidance on type narrowing for union types.
The examples correctly demonstrate that isinstance() should be used for type narrowing with union types instead of hasattr(). This is a valuable addition to the development guidelines.
src/sentry/spans/consumers/process/factory.py (2)
41-42: LGTM! Parameter properly declared with backward-compatible default.
The flusher_processes parameter is correctly typed as int | None with a None default, maintaining backward compatibility.
52-52: LGTM! Parameter correctly threaded to SpanFlusher.
The flusher_processes parameter is properly stored and passed as max_processes to the SpanFlusher, enabling process limit control.
Also applies to: 74-74
tests/sentry/spans/consumers/process/test_consumer.py (2)
1-1: LGTM! Proper test setup for multi-process scenarios.
The time import and transaction=True decorator argument are appropriate additions for testing the multi-process flusher behavior with proper timing control and database transaction handling.
Also applies to: 12-12
60-62: LGTM! Appropriate timing adjustment for multi-process flusher.
The extra poll() and brief sleep allow the flusher processes time to react to the clock advancement, which is necessary for the new multi-process architecture.
src/sentry/spans/consumers/process/flusher.py (5)
1-26: LGTM!
Imports are well-organized and appropriate for the multiprocessing enhancements. The addition of ServiceMemory aligns with the aggregated memory pressure handling across per-process buffers.
269-284: LGTM!
The submit method correctly aggregates buffer state and backpressure across all processes. The any-process-backpressure-triggers-rejection approach is appropriate since the consumer should pause if any shard group is struggling.
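The "any process under backpressure rejects the message" rule can be illustrated with plain data. This is a hedged sketch only: `BufferMemory`, `should_reject`, `memory_ratio`, and the grace period are hypothetical stand-ins, not the PR's `ServiceMemory` type or `submit` logic.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class BufferMemory:
    """Hypothetical stand-in for one buffer's memory reading."""

    used: int
    available: int


def should_reject(backpressure_since: dict[int, int], now: int, grace_seconds: int) -> bool:
    """Reject new work if any process has been under backpressure too long.

    A timestamp of 0 means that process is not under backpressure.
    """
    return any(
        since > 0 and now - since > grace_seconds
        for since in backpressure_since.values()
    )


def memory_ratio(readings: list[BufferMemory]) -> float:
    """Aggregate used/available across all per-process buffers."""
    used = sum(reading.used for reading in readings)
    available = sum(reading.available for reading in readings)
    return used / available if available else 0.0


# Process 1 has been struggling for 10 seconds, past the 5-second grace period.
reject = should_reject({0: 0, 1: 90}, now=100, grace_seconds=5)
ratio = memory_ratio([BufferMemory(used=1, available=4), BufferMemory(used=3, available=4)])
```

Summing before dividing weights each buffer by its capacity, which avoids a small, full buffer dominating the aggregate.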
318-326: LGTM!
The terminate and close methods correctly handle the multi-process lifecycle. The intentional choice to not shut down the flusher in close() to avoid Redis memory spikes during rebalancing is well-documented.
127-132: Method _create_process_for_shard is unused and should be removed or documented.
This private helper method is defined but never called anywhere in the codebase. If it's intended for future functionality, add a docstring explaining its purpose and expected usage. Otherwise, remove it as dead code.
86-126: Add logging when process is restarted to aid debugging.
When _create_process_for_shards is called on a process restart, a new SpansBuffer is created on line 93, replacing the previous buffer reference. Consider adding a log entry when a process is being restarted to help with troubleshooting process lifecycle issues.
        self.max_processes = max_processes or len(buffer.assigned_shards)

        self.mp_context = mp_context = multiprocessing.get_context("spawn")
        self.stopped = mp_context.Value("i", 0)
        self.redis_was_full = False
        self.current_drift = mp_context.Value("i", 0)
        self.backpressure_since = mp_context.Value("i", 0)
        self.healthy_since = mp_context.Value("i", 0)
        self.process_restarts = 0
        self.produce_to_pipe = produce_to_pipe

        self._create_process()

    def _create_process(self):
        # Determine which shards get their own processes vs shared processes
        self.num_processes = min(self.max_processes, len(buffer.assigned_shards))
        self.process_to_shards_map: dict[int, list[int]] = {
            i: [] for i in range(self.num_processes)
        }
        for i, shard in enumerate(buffer.assigned_shards):
            process_index = i % self.num_processes
            self.process_to_shards_map[process_index].append(shard)
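Edge case: non-positive max_processes or an empty shard list is not handled.
Because `max_processes or len(buffer.assigned_shards)` treats 0 as falsy, an explicit 0 falls back to the shard count rather than producing zero processes. A negative value, however, makes num_processes negative, which leaves process_to_shards_map empty so that the lookup on line 65 fails with a KeyError, and an empty shard list yields zero processes. Consider validating that max_processes is at least 1 or falling back properly.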
🔎 Proposed fix
- self.max_processes = max_processes or len(buffer.assigned_shards)
+ self.max_processes = max_processes if max_processes and max_processes > 0 else len(buffer.assigned_shards)
🤖 Prompt for AI Agents
In @src/sentry/spans/consumers/process/flusher.py around lines 51 - 66, Handle
the edge case where max_processes or buffer.assigned_shards length is 0 by
forcing at least one process: set self.max_processes = max(1, max_processes or
len(buffer.assigned_shards)) and compute self.num_processes =
min(self.max_processes, max(1, len(buffer.assigned_shards))). This ensures
process_to_shards_map, the range(self.num_processes) loop, and the modulo
operation (i % self.num_processes) cannot raise ZeroDivisionError while
preserving behavior when there are shards.
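A standalone sketch of the clamped round-robin assignment may make the edge cases easier to see. `plan_processes` is a hypothetical helper written for illustration, not a function from the PR. Note that in Python `0 or n` evaluates to `n`, so with the `or` fallback an explicit 0 already falls back to the shard count; the explicit clamp below additionally covers negative values and an empty shard list.

```python
from __future__ import annotations


def plan_processes(assigned_shards: list[int], max_processes: int | None) -> dict[int, list[int]]:
    """Distribute shards round-robin over at least one process."""
    requested = max_processes or len(assigned_shards)  # None and 0 both fall back
    num_processes = max(1, min(requested, len(assigned_shards)))
    mapping: dict[int, list[int]] = {i: [] for i in range(num_processes)}
    for i, shard in enumerate(assigned_shards):
        mapping[i % num_processes].append(shard)
    return mapping


# Four shards limited to two processes: each process gets two shards.
shared = plan_processes([0, 1, 2, 3], max_processes=2)
# A negative limit is clamped to a single process instead of failing.
clamped = plan_processes([0, 1, 2, 3], max_processes=-1)
# No shards at all still yields one (idle) process.
idle = plan_processes([], max_processes=None)
```

Clamping once, at the point where `num_processes` is computed, keeps the mapping loop free of special cases.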
    with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard_tag}):
        for flushed_segment in flushed_segments.values():
            if not flushed_segment.spans:
                continue

            spans = [span.payload for span in flushed_segment.spans]
            kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), [])
            metrics.timing("spans.buffer.segment_size_bytes", len(kafka_payload.value))
            metrics.timing(
                "spans.buffer.segment_size_bytes",
                len(kafka_payload.value),
                tags={"shard": shard_tag},
            )
            produce(kafka_payload)

    with metrics.timer("spans.buffer.flusher.wait_produce"):
    with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
Inconsistent metric tag key: "shard" vs "shards".
Lines 185 and 192-195 use "shard" as the tag key, but line 199 uses "shards". This inconsistency will make metric aggregation and dashboarding harder.
🔎 Proposed fix
- with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
+ with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):
📝 Committable suggestion
    with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard_tag}):
        for flushed_segment in flushed_segments.values():
            if not flushed_segment.spans:
                continue
            spans = [span.payload for span in flushed_segment.spans]
            kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), [])
            metrics.timing(
                "spans.buffer.segment_size_bytes",
                len(kafka_payload.value),
                tags={"shard": shard_tag},
            )
            produce(kafka_payload)
    with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):
🤖 Prompt for AI Agents
In @src/sentry/spans/consumers/process/flusher.py around lines 185 - 199, The
metrics tag key is inconsistent: the first metrics.timer and metrics.timing use
tags={"shard": shard_tag} but the later metrics.timer named
"spans.buffer.flusher.wait_produce" uses tags={"shards": shard_tag}; change that
tags dict to use the same key "shard" (i.e., tags={"shard": shard_tag}) so all
three metric calls (the metrics.timer producing loop, metrics.timing for
"spans.buffer.segment_size_bytes", and the "spans.buffer.flusher.wait_produce"
timer) use the identical tag key.
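One way to make this kind of drift harder to reintroduce is to build the tags dictionary once and pass the same object to every metric call. The sketch below uses small recording stand-ins, `timer` and `timing`, which are hypothetical and are not Sentry's `metrics` module; the shard tag value `"0,2"` is also just an example.

```python
from __future__ import annotations

from contextlib import contextmanager
from typing import Iterator

recorded: list[tuple[str, dict[str, str]]] = []


@contextmanager
def timer(name: str, tags: dict[str, str]) -> Iterator[None]:
    """Hypothetical stand-in for a metrics timer; it only records the call."""
    recorded.append((name, tags))
    yield


def timing(name: str, value: int, tags: dict[str, str]) -> None:
    """Hypothetical stand-in for a metrics timing call."""
    recorded.append((name, tags))


def flush(shard_tag: str, payloads: list[bytes]) -> None:
    tags = {"shard": shard_tag}  # built once, reused by every metric below
    with timer("spans.buffer.flusher.produce", tags):
        for payload in payloads:
            timing("spans.buffer.segment_size_bytes", len(payload), tags)
    with timer("spans.buffer.flusher.wait_produce", tags):
        pass  # the real code would wait for the producer here


flush("0,2", [b"abc"])
```

With a single `tags` variable there is no second literal in which a key such as "shards" could be mistyped.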
@pytest.mark.django_db(transaction=True)
def test_flusher_processes_limit(monkeypatch):
    """Test that flusher respects the max_processes limit"""
    # Flush very aggressively to make test pass instantly
    monkeypatch.setattr("time.sleep", lambda _: None)

    topic = Topic("test")
    messages: list[KafkaPayload] = []

    # Create factory with limited flusher processes
    fac = ProcessSpansStrategyFactory(
        max_batch_size=10,
        max_batch_time=10,
        num_processes=1,
        input_block_size=None,
        output_block_size=None,
        flusher_processes=2,  # Limit to 2 processes even if more shards
        produce_to_pipe=messages.append,
    )

    commits = []

    def add_commit(offsets, force=False):
        commits.append(offsets)

    # Create with 4 partitions/shards to test process sharing
    partitions = {Partition(topic, i): 0 for i in range(4)}
    step = fac.create_with_partitions(add_commit, partitions)

    # Verify that flusher uses at most 2 processes
    flusher = fac._flusher
    assert len(flusher.processes) == 2
    assert flusher.max_processes == 2
    assert flusher.num_processes == 2

    # Verify shards are distributed across processes
    total_shards = sum(len(shards) for shards in flusher.process_to_shards_map.values())
    assert total_shards == 4  # All 4 shards should be assigned
Excellent test coverage for process limiting feature!
The new test properly validates that the flusher respects the max_processes limit and correctly distributes shards across processes. The test logic is sound and the assertions are comprehensive.
Minor: Unused parameter in mock function.
Line 106: The force parameter in the add_commit function is unused. Consider either removing it or prefixing with an underscore to indicate it's intentionally unused:
🔎 Proposed fix
-     def add_commit(offsets, force=False):
+     def add_commit(offsets, _force=False):
          commits.append(offsets)
Or if the signature doesn't need to match anything specific:
-     def add_commit(offsets, force=False):
+     def add_commit(offsets):
          commits.append(offsets)
Based on static analysis hints.
📝 Committable suggestion
@pytest.mark.django_db(transaction=True)
def test_flusher_processes_limit(monkeypatch):
    """Test that flusher respects the max_processes limit"""
    # Flush very aggressively to make test pass instantly
    monkeypatch.setattr("time.sleep", lambda _: None)
    topic = Topic("test")
    messages: list[KafkaPayload] = []
    # Create factory with limited flusher processes
    fac = ProcessSpansStrategyFactory(
        max_batch_size=10,
        max_batch_time=10,
        num_processes=1,
        input_block_size=None,
        output_block_size=None,
        flusher_processes=2,  # Limit to 2 processes even if more shards
        produce_to_pipe=messages.append,
    )
    commits = []
    def add_commit(offsets, _force=False):
        commits.append(offsets)
    # Create with 4 partitions/shards to test process sharing
    partitions = {Partition(topic, i): 0 for i in range(4)}
    step = fac.create_with_partitions(add_commit, partitions)
    # Verify that flusher uses at most 2 processes
    flusher = fac._flusher
    assert len(flusher.processes) == 2
    assert flusher.max_processes == 2
    assert flusher.num_processes == 2
    # Verify shards are distributed across processes
    total_shards = sum(len(shards) for shards in flusher.process_to_shards_map.values())
    assert total_shards == 4  # All 4 shards should be assigned
🤖 Prompt for AI Agents
In @tests/sentry/spans/consumers/process/test_consumer.py around lines 84 - 122,
The helper function add_commit in the test has an unused parameter force; either
remove the parameter entirely from add_commit(offsets) if no external caller
requires that signature, or rename it to _force (e.g., add_commit(offsets,
_force=False)) to signal it is intentionally unused; update any references if
needed and run tests to ensure signature expectations remain satisfied.
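Before renaming the parameter, it is worth checking how the callback is invoked. The sketch below shows why: if some caller passes the flag by keyword, the renamed variant fails. `hypothetical_caller` is an assumption made for illustration; whether the real commit callback is called with `force=` as a keyword is not shown in this review.

```python
from __future__ import annotations

commits: list[dict[str, int]] = []


def add_commit(offsets: dict[str, int], force: bool = False) -> None:
    """Test double that keeps the callback's parameter names intact."""
    del force  # intentionally unused; kept so keyword callers still work
    commits.append(offsets)


def add_commit_renamed(offsets: dict[str, int], _force: bool = False) -> None:
    """Variant with the renamed parameter, as in the proposed fix."""
    commits.append(offsets)


def hypothetical_caller(commit) -> None:
    """Assumed caller that passes the flag by keyword."""
    commit({"partition-0": 1}, force=True)


hypothetical_caller(add_commit)  # accepted: the keyword name matches

renamed_failed = False
try:
    hypothetical_caller(add_commit_renamed)
except TypeError:
    renamed_failed = True  # the keyword "force" no longer exists
```

If the signature has to stay as it is, a targeted `# noqa: ARG001` on the `def` line is another way to acknowledge the unused argument.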
Test 6
Summary by CodeRabbit
New Features
Documentation
Tests
Replicated from ai-code-review-evaluation/sentry-coderabbit#6