Span Buffer Multiprocess Enhancement with Health Monitoring#5

Open
Frawatson wants to merge 1 commit into span-flusher-stable from span-flusher-multiprocess

Conversation

@Frawatson

Owner

Benchmark test PR #6 - Replicating Greptile benchmark for Grapple PR evaluation.

@grapple-pr grapple-pr Bot left a comment

🤖 Grapple PR Review

Summary

This PR refactors the SpanFlusher from a single-process model to a multiprocess architecture where each shard (or group of shards) gets its own dedicated process, with a configurable --flusher-processes CLI option to cap the number of spawned processes. Health monitoring state (backpressure, healthy-since) is also migrated from single shared values to per-process dictionaries.

Changes: 6 files (+199 / -50 lines)

  • 🟡 6 files modified: CLAUDE.md, src/sentry/consumers/__init__.py, src/sentry/spans/consumers/process/factory.py, src/sentry/spans/consumers/process/flusher.py, tests/sentry/spans/consumers/process/test_consumer.py +1 more

Resolves: #6 (Replays Self-Serve Bulk Delete System)

CI: ⏳ 0 passed, 0 failed

Impact Diagram

graph TD
  pr_0[/"🔀 PR Changes: 6 files"/]
  f1["🟡 CLAUDE.md +11/-0"]
  pr_0 --> f1
  f2["🟡 __init__.py +9/-1"]
  pr_0 --> f2
  f3["🟡 factory.py +3/-0"]
  pr_0 --> f3
  f4["🟡 flusher.py +127/-47"]
  pr_0 --> f4
  f5["🟡 test_consumer.py +48/-1"]
  pr_0 --> f5
  f6["🟡 test_flusher.py +1/-1"]
  pr_0 --> f6
  f3 -.- f4
  f5 -.- f6
  style f1 fill:#7c3aed,color:#fff,stroke:#7c3aed
  style f2 fill:#7c3aed,color:#fff,stroke:#7c3aed
  style f3 fill:#7c3aed,color:#fff,stroke:#7c3aed
  style f4 fill:#7c3aed,color:#fff,stroke:#7c3aed
  style f5 fill:#7c3aed,color:#fff,stroke:#7c3aed
  style f6 fill:#7c3aed,color:#fff,stroke:#7c3aed

4 critical issues found — please address before merging.

Severity Count
🔴 Critical 4
🟠 Major 7
🟡 Minor 4
🔵 Info 4
Review details
  • Reviewed at: 2026-04-07T19:01:24.567Z
  • Agents used: security, logic, style, architecture, performance, verification
  • Context level: full
  • Execution time: 114.1s
  • Low-confidence findings suppressed: 3
  • Blast radius: 0 (contained)

Powered by Grapple PR — AI that understands your code.

process.start()
self.processes[process_index] = process
self.buffers[process_index] = shard_buffer

🔴 CRITICAL — API contract violation (confidence: 100%)

The main static method signature changed to include a new shards parameter after buffer, but this is a positional argument change that breaks the calling convention. In _create_process_for_shards, shards is passed in the args tuple alongside stopped, current_drift, etc. However, main is invoked via partial(SpanFlusher.main, shard_buffer) for the threading path (line 103) and via _flusher_target for the multiprocessing path (line 98). The _flusher_target function is not shown in the diff — if it calls SpanFlusher.main with the old signature (without shards), this will crash at runtime. Additionally, shards is passed as the first positional arg in args (line 109), so the call becomes main(buffer, shards, stopped, ...) which matches the new signature, but any other caller of main using the old signature will break.

Evidence:

  • Line 109: args=(shards, self.stopped, self.current_drift, ...), so shards is the first positional arg after buffer (which is bound via partial)
  • Line 118: def main(buffer, shards, stopped, ...) — new positional parameter shards added
  • The _flusher_target function referenced at line 98 is not visible in the diff — if it wraps main with the old signature, it will fail

Agent: architecture
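
A minimal sketch of the calling convention described above, using a stand-in function rather than the real SpanFlusher.main. The parameter names mirror the comment; the function body and the argument values are hypothetical. It shows that binding buffer with functools.partial and passing the remaining arguments positionally works with the new signature, while a caller that still omits shards fails with TypeError.

```python
from functools import partial


def main(buffer, shards, stopped, current_drift):
    """Stand-in for the new signature: shards comes right after buffer."""
    return {"buffer": buffer, "shards": shards, "stopped": stopped, "drift": current_drift}


# New-style call: buffer is bound via partial, the rest arrive positionally.
target = partial(main, "shard_buffer")
new_style = target([0, 1], False, 0)

# Old-style call: a wrapper that was never updated still omits shards,
# so one required positional argument is missing.
try:
    target(False, 0)
    old_style_error = None
except TypeError as exc:
    old_style_error = exc
```

Passing these arguments by keyword would make this kind of drift fail loudly at the call site instead of depending on tuple order.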

self.buffer = buffer
self.next_step = next_step
self.max_processes = max_processes or len(buffer.assigned_shards)

🔴 CRITICAL — Shared state not multiprocess-safe (confidence: 100%)

self.redis_was_full is a plain Python bool, not a multiprocessing.Value. In the new multiprocess architecture, each spawned process has its own memory, so this flag is never shared between the parent process and the flusher worker processes. If backpressure detection relies on redis_was_full being set by the worker and read by the parent (or vice versa), it will silently fail. The old single-process model may have worked because it used threading, but multiprocessing.get_context('spawn') starts a fresh interpreter for each child and passes it pickled copies of its arguments, so nothing is shared unless it is an explicit shared-memory object.

Evidence:

  • Line 52: self.redis_was_full = False — plain bool, not mp_context.Value
  • The stopped and current_drift fields correctly use mp_context.Value for cross-process sharing
  • With spawn context, child processes get independent copies of non-shared memory

Agent: architecture
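
A small sketch of why the plain bool is not shared. It does not start a process: a pickle round trip stands in for the copy that a spawned child receives, and the dict named parent_state is a hypothetical stand-in for the flusher's attributes. The second half shows the shape of the shared alternative, a Value created from the spawn context.

```python
import multiprocessing
import pickle

# Hypothetical stand-in for state holding a plain bool flag.
parent_state = {"redis_was_full": False}

# Spawned children receive pickled copies of their arguments; a pickle
# round trip reproduces that copy here without starting a process.
child_state = pickle.loads(pickle.dumps(parent_state))
child_state["redis_was_full"] = True  # the "child" flips its own copy

parent_sees = parent_state["redis_was_full"]  # unchanged in the "parent"

# A flag meant to cross the process boundary has to be an explicit
# shared-memory object that is handed to the child when it is created.
ctx = multiprocessing.get_context("spawn")
shared_flag = ctx.Value("b", 0)
with shared_flag.get_lock():
    shared_flag.value = 1
```

Whether redis_was_full actually needs to cross the process boundary depends on where it is read and written, which the diff excerpt does not show.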


self.mp_context = mp_context = multiprocessing.get_context("spawn")
self.stopped = mp_context.Value("i", 0)
self.redis_was_full = False

🔴 CRITICAL — Concurrency issues (confidence: 100%)

self.redis_was_full is a plain Python bool (not a multiprocessing.Value). In the new multiprocess model, each spawned child process gets its own copy of memory — mutations to redis_was_full in a child process are invisible to the parent and to other child processes. Backpressure coordination that depends on this flag will silently break.

Evidence:

  • self.redis_was_full = False is assigned as a plain bool, not mp_context.Value('b', 0)
  • All other shared state (stopped, current_drift, process_backpressure_since, process_healthy_since) was correctly converted to mp_context.Value, making the omission of redis_was_full inconsistent and almost certainly unintentional
  • The PR risk areas explicitly call this out: 'self.redis_was_full remains a plain Python bool (not a multiprocessing.Value) — in a multiprocess model this will not be shared across processes, potentially breaking backpressure coordination'

Agent: performance

process_index = i % self.num_processes
self.process_to_shards_map[process_index].append(shard)

self.processes: dict[int, multiprocessing.context.SpawnProcess | threading.Thread] = {}

🔴 CRITICAL — Concurrency issues (confidence: 100%)

When buffer.assigned_shards is empty, num_processes = min(self.max_processes, 0) = 0 for any non-negative max_processes (and max_processes or len(buffer.assigned_shards) is itself 0 when max_processes is None or 0). The dict comprehension {i: [] for i in range(0)} produces an empty map, so no processes are created and the subsequent _create_processes() call silently does nothing. More dangerously, process_healthy_since and process_backpressure_since are also empty dicts, so the health-monitoring loop in _ensure_processes_alive is a no-op and any consumer using this instance will never detect that it is broken.

Evidence:

  • self.num_processes = min(self.max_processes, len(buffer.assigned_shards)) — when both operands are 0 the result is 0
  • range(0) produces an empty sequence, so process_to_shards_map, process_healthy_since, process_backpressure_since, and process_restarts are all empty
  • The edge-case section of the PR description flags this: 'verify this is handled gracefully' — no guard or early-return is present in the code

Agent: performance
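
One way to make the empty case explicit, sketched as a standalone helper. The function name assign_shards and the choice to raise ValueError are assumptions for illustration; the round-robin rule (i % num_processes) is the one shown in the diff excerpt.

```python
def assign_shards(assigned_shards, max_processes=None):
    """Round-robin shards over processes, rejecting an empty assignment."""
    shards = list(assigned_shards)
    if not shards:
        raise ValueError("no assigned shards; refusing to build an empty flusher")
    limit = len(shards) if max_processes is None else max_processes
    if limit < 1:
        raise ValueError(f"max_processes must be at least 1, got {limit}")
    num_processes = min(limit, len(shards))
    mapping = {i: [] for i in range(num_processes)}
    for i, shard in enumerate(shards):
        mapping[i % num_processes].append(shard)
    return mapping
```

For example, assign_shards([0, 1, 2, 3, 4], 2) groups the shards as {0: [0, 2, 4], 1: [1, 3]}, while an empty list raises instead of producing a flusher that does nothing.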


def _create_process(self):
# Determine which shards get their own processes vs shared processes
self.num_processes = min(self.max_processes, len(buffer.assigned_shards))

🟠 MAJOR — Edge case handling (confidence: 100%)

When buffer.assigned_shards is empty and max_processes is None or 0, max_processes or len(buffer.assigned_shards) evaluates to 0, and min(0, 0) = 0, so num_processes = 0. A positive max_processes gives the same result, since min(max_processes, 0) = 0. This means process_to_shards_map, process_healthy_since, process_backpressure_since, and process_restarts are all empty dicts. No processes would be created, downstream code in submit() that iterates over self.buffers.values() would do nothing, and the backpressure check loop would never raise MessageRejected. _ensure_processes_alive would also be a no-op, meaning no health monitoring. This silent degradation should be handled explicitly: either raise an error or log a warning.

Evidence:

  • Line 49: self.max_processes = max_processes or len(buffer.assigned_shards) — if both are 0, max_processes is 0
  • Line 56: self.num_processes = min(self.max_processes, len(buffer.assigned_shards)) — min(0, 0) = 0
  • Empty assigned_shards would create a flusher that does nothing silently

Agent: architecture


step.poll()
# Give flusher threads time to process after drift change
time.sleep(0.1)

🟡 MINOR — Concurrency issues (confidence: 76%)

The time.sleep(0.1) added to test_basic is a timing-dependent stabilization hack. On a heavily loaded CI machine 100 ms may be insufficient, making the test flaky. On fast machines it is wasted time.

Evidence:

  • The PR description itself flags this: 'The time.sleep(0.1) added to test_basic is a timing-dependent fix — verify this is sufficient on slow CI machines and doesn't introduce flakiness'
  • The test patches time.sleep with a no-op via monkeypatch, but the production time.sleep(0.1) here uses the real time.sleep — this inconsistency means the wait is real but unpredictably sufficient

Agent: performance
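
A common alternative to a fixed sleep is to poll for the expected condition with a deadline. The helper below is a generic sketch, not code from this PR; wait_until and its parameters are hypothetical names.

```python
import time


def wait_until(predicate, timeout=5.0, interval=0.01):
    """Poll predicate until it returns True or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

A test would then assert on something like wait_until(lambda: len(messages) > 0), where messages is whatever the test already inspects. The wait ends as soon as the condition holds, and the timeout only bounds the failure case. If time.sleep is monkeypatched to a no-op, the loop turns into a busy-wait but is still bounded by the monotonic deadline.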

self.process_healthy_since[process_index],
self.produce_to_pipe,
),
daemon=True,

🔵 INFO — Unused method (confidence: 94%)

_create_process_for_shard (singular) is defined but never called anywhere in the diff. It appears to be a convenience method for restarting a single shard's process, but _ensure_processes_alive already calls _create_process_for_shards (plural) directly. This dead code adds confusion.

Evidence:

  • Lines 116-120: def _create_process_for_shard(self, shard: int) defined
  • No call to _create_process_for_shard found in the diff
  • _ensure_processes_alive at line 258 calls _create_process_for_shards directly

Agent: architecture

self.buffers[process_index] = shard_buffer

def _create_process_for_shard(self, shard: int):
# Find which process this shard belongs to and restart that process

🔵 INFO — Naming Conventions (confidence: 89%)

Method name _create_process_for_shard is inconsistent with the actual implementation which takes a shard and calls _create_process_for_shards (plural).

Evidence:

  • Method signature is _create_process_for_shard(self, shard: int) (singular 'shard') but it calls _create_process_for_shards(process_index, shards) with multiple shards
  • The method finds the process_index for the shard and restarts the entire process handling all its shards, not just one shard
  • This is confusing because the name suggests it handles a single shard, but it actually restarts the process managing that shard's process_index

Agent: style

raise RuntimeError(f"flusher process crashed repeatedly ({cause}), restarting consumer")
for process_index, process in self.processes.items():
if not process:
continue

🔵 INFO — Code Organization (confidence: 84%)

Inconsistent type checking pattern: using isinstance() for multiprocessing.Process check, but elsewhere in codebase hasattr() patterns exist. Per CLAUDE.md update, isinstance() is now the preferred pattern.

Evidence:

  • Line 221 uses isinstance(process, multiprocessing.Process) which is correct per the new CLAUDE.md guideline
  • However, the check should also handle the case where process is a threading.Thread (the else branch already exists, so this is fine)
  • This is consistent with the PR intent to prefer isinstance() over hasattr() for union type checks

Agent: style

stopped,
current_drift,
backpressure_since,
healthy_since,

🔵 INFO — Documentation (confidence: 84%)

Comment on line 216 mentions 'flush the background threads' but the context is about setting the stopped flag. The comment could be clearer about the purpose of the flag in the multiprocess model.

Evidence:

  • Comment says 'set stopped flag first so we can "flush" the background threads while next_step is also shutting down'
  • In a multiprocess model, each process polls the stopped flag to know when to exit gracefully
  • A brief explanation of how each process uses the stopped flag would improve clarity

Agent: style

@grapple-pr grapple-pr Bot left a comment

🔧 [Grapple PR] 7 auto-fix suggestions available

Apply individually, or use GitHub's "Add suggestion to batch" and then "Commit suggestions" to apply all at once.

Commits from Grapple PR are prefixed with [Grapple PR] for easy identification in merge conflicts.

process.start()
self.processes[process_index] = process
self.buffers[process_index] = shard_buffer

[Grapple PR] Auto-fix — architecture agent (Small fix (3 lines, 1 file))

The main static method signature changed to include a new shards parameter after buffer, but this is a positional argument change that breaks the calling convention. In _create_process_for_shards, shards is passed in the args tuple alongside stopped, current_drift, etc. However, main is invoked via partial(SpanFlusher.main, shard_buffer) for the threading path (line 103) and via _flusher_target for the multiprocessing path (line 98). The _flusher_target function is not shown in the diff — if it calls SpanFlusher.main with the old signature (without shards), this will crash at runtime. Additionally, shards is passed as the first positional arg in args (line 109), so the call becomes main(buffer, shards, stopped, ...) which matches the new signature, but any other caller of main using the old signature will break.

Suggested change
shards: list[int],
run_with_initialized_sentry(SpanFlusher.main, buffer, shards, stopped, current_drift, backpressure_since, healthy_since, produce_to_pipe)

🤖 Grapple PR auto-fix • critical • confidence: 100%

self.buffer = buffer
self.next_step = next_step
self.max_processes = max_processes or len(buffer.assigned_shards)

[Grapple PR] Auto-fix — architecture agent (Small fix (2 lines, 1 file))

self.redis_was_full is a plain Python bool, not a multiprocessing.Value. In the new multiprocess architecture, each spawned process has its own memory, so this flag is never shared between the parent process and the flusher worker processes. If backpressure detection relies on redis_was_full being set by the worker and read by the parent (or vice versa), it will silently fail. The old single-process model may have worked because it used threading, but multiprocessing.get_context('spawn') starts a fresh interpreter for each child and passes it pickled copies of its arguments, so nothing is shared unless it is an explicit shared-memory object.

Suggested change
self.redis_was_full = mp_context.Value("b", 0)

🤖 Grapple PR auto-fix • critical • confidence: 100%


self.mp_context = mp_context = multiprocessing.get_context("spawn")
self.stopped = mp_context.Value("i", 0)
self.redis_was_full = False

[Grapple PR] Auto-fix — performance agent (Small fix (2 lines, 1 file))

self.redis_was_full is a plain Python bool (not a multiprocessing.Value). In the new multiprocess model, each spawned child process gets its own copy of memory — mutations to redis_was_full in a child process are invisible to the parent and to other child processes. Backpressure coordination that depends on this flag will silently break.

Suggested change
- self.redis_was_full = False
+ self.redis_was_full = mp_context.Value("b", 0)

🤖 Grapple PR auto-fix • critical • confidence: 100%

next_step: ProcessingStrategy[FilteredPayload | int],
max_processes: int | None = None,
produce_to_pipe: Callable[[KafkaPayload], None] | None = None,
):

[Grapple PR] Auto-fix — architecture agent (Small fix (2 lines, 1 file))

max_processes or len(buffer.assigned_shards) uses Python truthiness, meaning max_processes=0 is treated identically to max_processes=None. If a caller explicitly passes max_processes=0 intending 'zero processes' (which would be an error) or 'no limit', the behavior is ambiguous. The or operator cannot distinguish between None (not provided) and 0 (explicitly zero).

Suggested change
):
self.max_processes = max_processes if max_processes is not None else len(buffer.assigned_shards)

🤖 Grapple PR auto-fix • major • confidence: 100%
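
The difference between the two forms can be seen in isolation. The two helper names below are hypothetical; each body is one of the two expressions from the comment above.

```python
def resolve_with_or(max_processes, shard_count):
    # Truthiness: None and 0 both fall back to shard_count.
    return max_processes or shard_count


def resolve_with_none_check(max_processes, shard_count):
    # Only a missing value (None) falls back; an explicit 0 is preserved.
    return max_processes if max_processes is not None else shard_count
```

With shard_count=4, both return 4 for None, but for an explicit 0 the first returns 4 and the second returns 0. The second form therefore needs its own validation if zero processes is not a meaningful setting.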


self._create_process_for_shards(process_index, shards)

def submit(self, message: Message[FilteredPayload | int]) -> None:

[Grapple PR] Auto-fix — architecture agent (Small fix (5 lines, 1 file))

In _create_process_for_shards, when restarting a process (called from _ensure_processes_alive), the old process is killed but process.join() is never called before creating a new one. This can lead to zombie processes on Linux. The kill() sends SIGKILL but doesn't wait for the process to actually terminate. The new process is then immediately created and stored in self.processes[process_index], losing the reference to the old process without ensuring it has been reaped.

Suggested change
def submit(self, message: Message[FilteredPayload | int]) -> None:
# Join after kill to reap the child process and prevent
# zombie processes on Linux. SIGKILL cannot be caught, blocked,
# or ignored, so a short timeout is sufficient; if the join still
# times out we proceed anyway (the reference is overwritten below).
process.join(timeout=1.0)

🤖 Grapple PR auto-fix • major • confidence: 100%
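
The kill-then-reap pattern can be illustrated without the flusher. This sketch uses subprocess.Popen as a stand-in for the multiprocessing process object, so wait(timeout=...) plays the role of join(timeout=...). The sleeping child is a hypothetical placeholder for a stuck worker.

```python
import subprocess
import sys

# Hypothetical long-running child standing in for a stuck flusher process.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])

child.kill()  # SIGKILL on POSIX, TerminateProcess on Windows
try:
    # wait() collects the exit status so the child does not linger as a zombie.
    child.wait(timeout=5.0)
    reaped = True
except subprocess.TimeoutExpired:
    reaped = False
```

Once the wait returns, child.returncode is set, which is the signal that the operating system has released the process entry.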

):
metrics.incr("spans.buffer.flusher.backpressure")
raise MessageRejected()

[Grapple PR] Auto-fix — performance agent (Small fix (10 lines, 1 file))

In submit(), buffer.record_stored_segments() is now called once per buffer (i.e., once per process). In the original code it was called once total. If the method performs a Redis round-trip or any non-trivial work, calling it N times per submit will multiply the overhead by num_processes, turning it into an O(N) operation on every message.

Suggested change
# Call record_stored_segments() exactly once per submit() — this is a
# global bookkeeping operation (e.g. a Redis metric write) that should
# not be multiplied by num_processes. Using the first available buffer
# is sufficient; all buffers share the same underlying Redis pool.
if self.buffers:
next(iter(self.buffers.values())).record_stored_segments()

🤖 Grapple PR auto-fix • major • confidence: 99%

metrics.timing(
"spans.buffer.segment_size_bytes",
len(kafka_payload.value),
tags={"shard": shard_tag},

[Grapple PR] Auto-fix — architecture agent (Small fix (2 lines, 1 file))

Inconsistent metrics tag key: tags={"shards": shard_tag} (plural) vs tags={"shard": shard_tag} (singular) used elsewhere in the same method. This will create separate metric series for what should be the same dimension.

Suggested change
tags={"shard": shard_tag},
with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):

🤖 Grapple PR auto-fix • minor • confidence: 88%

process_index = i % self.num_processes
self.process_to_shards_map[process_index].append(shard)

self.processes: dict[int, multiprocessing.context.SpawnProcess | threading.Thread] = {}

🟡 [Grapple PR] Suggested fix — performance agent (Larger fix (12 lines, 1 file) — review recommended)

When buffer.assigned_shards is empty, num_processes = min(self.max_processes, 0) = 0 for any non-negative max_processes (and max_processes or len(buffer.assigned_shards) is itself 0 when max_processes is None or 0). The dict comprehension {i: [] for i in range(0)} produces an empty map, so no processes are created and the subsequent _create_processes() call silently does nothing. More dangerously, process_healthy_since and process_backpressure_since are also empty dicts, so the health-monitoring loop in _ensure_processes_alive is a no-op and any consumer using this instance will never detect that it is broken.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -57,6 +57,16 @@ def __init__(
         self.produce_to_pipe = produce_to_pipe
 
         # Determine which shards get their own processes vs shared processes
+        # Guard against the case where no shards are assigned (e.g. during
+        # rebalance or misconfiguration). With zero shards, num_processes would
+        # be 0, all tracking dicts would be empty, _create_processes() would be
+        # a silent no-op, and _ensure_processes_alive() would never fire —
+        # leaving the consumer permanently broken with no signal.
+        if not buffer.assigned_shards:
+            raise RuntimeError(
+                "SpanFlusher initialised with no assigned shards. "
+                "Ensure the consumer has completed partition assignment before "
+                "constructing SpanFlusher."
+            )
+
         self.num_processes = min(self.max_processes, len(buffer.assigned_shards))
         self.process_to_shards_map: dict[int, list[int]] = {
             i: [] for i in range(self.num_processes)

🤖 Grapple PR auto-fix • critical • Review this diff before applying


def _create_process(self):
# Determine which shards get their own processes vs shared processes
self.num_processes = min(self.max_processes, len(buffer.assigned_shards))

🟡 [Grapple PR] Suggested fix — architecture agent (Larger fix (18 lines, 1 file) — review recommended)

When buffer.assigned_shards is empty and max_processes is None or 0, max_processes or len(buffer.assigned_shards) evaluates to 0, and min(0, 0) = 0, so num_processes = 0. A positive max_processes gives the same result, since min(max_processes, 0) = 0. This means process_to_shards_map, process_healthy_since, process_backpressure_since, and process_restarts are all empty dicts. No processes would be created, downstream code in submit() that iterates over self.buffers.values() would do nothing, and the backpressure check loop would never raise MessageRejected. _ensure_processes_alive would also be a no-op, meaning no health monitoring. This silent degradation should be handled explicitly: either raise an error or log a warning.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -44,6 +44,8 @@ class SpanFlusher(ProcessingStrategy[FilteredPayload | int]):
 
+logger = logging.getLogger(__name__)
+
 
     def __init__(
         self,
         buffer: SpansBuffer,
         next_step: ProcessingStrategy[FilteredPayload | int],
         max_processes: int | None = None,
         produce_to_pipe: Callable[[KafkaPayload], None] | None = None,
     ):
         self.next_step = next_step
         self.max_processes = max_processes or len(buffer.assigned_shards)
 
         self.mp_context = mp_context = multiprocessing.get_context("spawn")
         self.stopped = mp_context.Value("i", 0)
         self.redis_was_full = False
         self.current_drift = mp_context.Value("i", 0)
         self.produce_to_pipe = produce_to_pipe
 
         # Determine which shards get their own processes vs shared processes
         self.num_processes = min(self.max_processes, len(buffer.assigned_shards))
+
+        # NOTE: If assigned_shards is empty (e.g., during a rebalance) and max_processes
+        # is not explicitly set, num_processes will be 0. This means no flusher processes
+        # will be created, _ensure_processes_alive() will be a no-op, backpressure checks
+        # will never fire, and submit() will silently do nothing. This is safe during
+        # transient rebalancing windows, but if it persists it indicates a misconfiguration.
+        if self.num_processes == 0:
+            logger.warning(
+                "SpanFlusher initialized with no assigned shards — flusher will be a no-op. "
+                "This is expected during rebalancing but should resolve quickly. "
+                "If this persists, check that assigned_shards is populated before constructing SpanFlusher. "
+                "max_processes=%r, assigned_shards=%r",
+                max_processes,
+                list(buffer.assigned_shards),
+            )
+
         self.process_to_shards_map: dict[int, list[int]] = {

🤖 Grapple PR auto-fix • major • Review this diff before applying

backpressure_since,
healthy_since,
produce_to_pipe: Callable[[KafkaPayload], None] | None,
) -> None:

🟡 [Grapple PR] Suggested fix — performance agent (Larger fix (12 lines, 1 file) — review recommended)

_create_process_for_shards creates a new SpansBuffer(shards) object for each process and stores it in self.buffers[process_index], but the main() static method receives shard_buffer (the locally-created buffer) via partial. The parent-process copy stored in self.buffers is a separate Python object. If the parent ever mutates a buffer (e.g., record_stored_segments) expecting the child to see the change, it won't — the child operates on a pickled copy received at spawn time.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -163,6 +163,10 @@ class SpanFlusher(ProcessingStrategy[FilteredPayload | int]):
                 else:
                     backpressure_since.value = 0
 
+                # record_stored_segments must be called in the child process
+                # that owns this buffer. The parent holds a separate copy after
+                # spawn-pickling and any mutation there would never be visible here.
+                buffer.record_stored_segments()
+
                 # Update healthy_since for all shards handled by this process
                 healthy_since.value = system_now
 
@@ -270,8 +274,10 @@ class SpanFlusher(ProcessingStrategy[FilteredPayload | int]):
 
         self._ensure_processes_alive()
 
-        for buffer in self.buffers.values():
-            buffer.record_stored_segments()
+        # NOTE: record_stored_segments() is intentionally NOT called here.
+        # self.buffers contains parent-side SpansBuffer objects that diverge
+        # from the child's copy immediately after process.start() (spawn
+        # pickling creates independent objects). Mutations here are invisible
+        # to the child. record_stored_segments() is called inside main() instead.
 
         # We pause insertion into Redis if the flusher is not making progress
         # fast enough. We could backlog into Redis, but we assume, despite best

🤖 Grapple PR auto-fix • major • Review this diff before applying

self.buffers: dict[int, SpansBuffer] = {}

self._create_processes()

🟡 [Grapple PR] Suggested fix — architecture agent (Larger fix (11 lines, 1 file) — review recommended)

SpansBuffer(shards) is constructed directly inside the flusher for each process index. This assumes SpansBuffer can be constructed with just a list of shard IDs and will connect to the correct Redis instances. The original code received a fully-configured buffer from the factory. Creating new SpansBuffer instances inside the flusher breaks the inversion of control pattern — the factory should be responsible for constructing buffers. If SpansBuffer requires additional configuration (Redis connection params, cluster config, etc.), these per-process buffers may be misconfigured.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -44,6 +44,7 @@ def __init__(
         self,
         buffer: SpansBuffer,
         next_step: ProcessingStrategy[FilteredPayload | int],
+        max_processes: int | None = None,
         produce_to_pipe: Callable[[KafkaPayload], None] | None = None,
     ):
+        # Preserve the original fully-configured buffer so per-shard buffers
+        # can be constructed with the same configuration (Redis params, cluster
+        # config, etc.) rather than bare shard IDs.
+        self._source_buffer = buffer
         self.next_step = next_step
         self.max_processes = max_processes or len(buffer.assigned_shards)
 
@@ -86,8 +90,9 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]):
         self.process_healthy_since[process_index].value = int(time.time())
 
-        # Create a buffer for these specific shards
-        shard_buffer = SpansBuffer(shards)
+        # Delegate buffer construction to the source buffer so that all
+        # configuration (Redis connection, cluster params, etc.) is inherited
+        # from the factory-provided buffer rather than re-specified here.
+        shard_buffer = self._source_buffer.for_shards(shards)
 
         make_process: Callable[..., multiprocessing.context.SpawnProcess | threading.Thread]

🤖 Grapple PR auto-fix • major • Review this diff before applying

max_memory_percentage = options.get("spans.buffer.max-memory-percentage")
if max_memory_percentage < 1.0:
memory_infos = list(self.buffer.get_memory_info())
memory_infos: list[ServiceMemory] = []

🟡 [Grapple PR] Suggested fix — performance agent (Small fix (5 lines, 1 file))

In submit(), options.get('spans.buffer.flusher.backpressure-seconds') is fetched inside a loop over process_backpressure_since. This option lookup (which may hit a cache or do string hashing) is repeated N times per submit call when it only needs to be read once.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -275,9 +275,10 @@ def submit(self, message: Message[FilteredPayload | int]) -> None:
         backpressure_secs = options.get("spans.buffer.flusher.backpressure-seconds")
+        now = int(time.time())
         for backpressure_since in self.process_backpressure_since.values():
             if (
                 backpressure_since.value > 0
-                and int(time.time()) - backpressure_since.value > backpressure_secs
+                and now - backpressure_since.value > backpressure_secs
             ):
                 metrics.incr("spans.buffer.flusher.backpressure")
                 raise MessageRejected()
@@ -289,7 +290,7 @@ def submit(self, message: Message[FilteredPayload | int]) -> None:
         if isinstance(message.payload, int):
-            self.current_drift.value = drift = message.payload - int(time.time())
+            self.current_drift.value = drift = message.payload - now
             metrics.timing("spans.buffer.flusher.drift", drift)

🤖 Grapple PR auto-fix • minor • Review this diff before applying
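
The hoisting in the diff above can be illustrated in isolation. This is a minimal sketch rather than the flusher's code: `any_backpressured` and its arguments are hypothetical names, and plain integers stand in for the shared `multiprocessing.Value` timestamps.

```python
import time


def any_backpressured(since_timestamps, backpressure_secs, now=None):
    """Return True if any process has been backpressured for too long.

    `since_timestamps` holds epoch seconds; 0 means "not backpressured".
    """
    # Read the clock once so every process is compared against the same instant.
    if now is None:
        now = int(time.time())
    return any(
        since > 0 and now - since > backpressure_secs
        for since in since_timestamps
    )
```

Passing `now` explicitly also makes the check easy to exercise in a test without patching the clock.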


self.process_restarts += 1
self._create_process()
try:

🟡 [Grapple PR] Suggested fix — architecture agent (Small fix (4 lines, 1 file))

isinstance(process, multiprocessing.Process) does not match the declared type. The process is created via self.mp_context.Process where mp_context = multiprocessing.get_context('spawn'), so the resulting object is a multiprocessing.context.SpawnProcess. In CPython, SpawnProcess and multiprocessing.Process both derive from multiprocessing.process.BaseProcess and neither is a subclass of the other, so this isinstance check is expected to be False for spawn-context processes, and the kill() and terminate() branches would be skipped. The type annotation already says multiprocessing.context.SpawnProcess | threading.Thread, and the CLAUDE.md update in this very PR advocates for using isinstance() over hasattr(), so the type used in isinstance should match the declared types.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -251,7 +251,7 @@ def _ensure_processes_alive(self) -> None:
 
             try:
-                if isinstance(process, multiprocessing.Process):
+                if isinstance(process, multiprocessing.context.SpawnProcess):
                     process.kill()
             except (ValueError, AttributeError):
                 pass  # Process already closed, ignore
@@ -323,7 +323,7 @@ def join(self, timeout: float | None = None):
 
-            if isinstance(process, multiprocessing.Process):
+            if isinstance(process, multiprocessing.context.SpawnProcess):
                 process.terminate()

🤖 Grapple PR auto-fix • minor • Review this diff before applying
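
The class relationship behind this suggestion can be checked directly. The snippet below is a small standalone sketch, assuming CPython's standard `multiprocessing` layout; it only constructs a process object and never starts it.

```python
import multiprocessing
import multiprocessing.context
import multiprocessing.process

ctx = multiprocessing.get_context("spawn")
# Constructing the object does not start a child process.
proc = ctx.Process(target=print)

# The spawn context hands out SpawnProcess instances.
is_spawn = isinstance(proc, multiprocessing.context.SpawnProcess)
# Every process flavour shares BaseProcess as its common base class.
is_base = isinstance(proc, multiprocessing.process.BaseProcess)
# multiprocessing.Process is a sibling class, not a base class of SpawnProcess.
is_default = isinstance(proc, multiprocessing.Process)

print(is_spawn, is_base, is_default)
```

On CPython the last check is expected to be False, which is why the check should name `SpawnProcess` (or `BaseProcess`, if any start method should match).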


step.poll()
# Give flusher threads time to process after drift change
time.sleep(0.1)

🟡 [Grapple PR] Suggested fix — performance agent (Small fix (10 lines, 1 file))

The time.sleep(0.1) added to test_basic is a timing-dependent stabilization hack. On a heavily loaded CI machine 100 ms may be insufficient, making the test flaky. On fast machines it is wasted time.

--- a/tests/sentry/spans/consumers/process/test_consumer.py
+++ b/tests/sentry/spans/consumers/process/test_consumer.py
@@ -1,4 +1,3 @@
-import time
 from datetime import datetime
 
 import pytest
@@ -57,9 +57,13 @@ def add_commit(offsets, force=False):
     step.poll()
     fac._flusher.current_drift.value = 9000  # "advance" our "clock"
 
     step.poll()
-    # Give flusher threads time to process after drift change
-    time.sleep(0.1)
+    # Poll until the flusher produces output or we exceed a generous timeout.
+    # This replaces the fixed time.sleep(0.1) which is flaky on loaded CI machines.
+    import time as _real_time
+    deadline = _real_time.time() + 10  # 10-second hard ceiling
+    while not messages and _real_time.time() < deadline:
+        step.poll()
+        _real_time.sleep(0.01)
 
     step.join()

🤖 Grapple PR auto-fix • minor • Review this diff before applying
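
The polling loop in the diff above can be factored into a reusable helper. This is a sketch under assumptions: `wait_until` is a hypothetical name, not an existing test utility in this repository, and it uses `time.monotonic()` so wall-clock adjustments cannot shorten or extend the deadline.

```python
import time


def wait_until(predicate, timeout=5.0, interval=0.01):
    """Poll `predicate` until it returns a truthy value or `timeout` elapses.

    Returns True if the predicate succeeded, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

In the test this could read `assert wait_until(lambda: bool(messages)), "flusher produced no message"`. If `step.poll()` must run on each attempt, the predicate can call it before checking `messages`.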

@Frawatson Frawatson closed this Apr 7, 2026
@Frawatson Frawatson reopened this Apr 7, 2026
@Frawatson Frawatson closed this Apr 8, 2026
@Frawatson Frawatson reopened this Apr 8, 2026
@Frawatson Frawatson closed this Apr 8, 2026
@Frawatson Frawatson reopened this Apr 8, 2026
@Frawatson Frawatson closed this Apr 8, 2026
@Frawatson Frawatson reopened this Apr 8, 2026

@grapple-pr grapple-pr Bot left a comment

🤖 Grapple PR Review

Summary

This PR enhances the SpanFlusher to support multiprocess parallelism by creating one process per shard (up to a configurable maximum), replacing the single-process model with a pool of processes distributed across assigned shards. It also adds per-process health monitoring state (healthy_since, backpressure_since, restarts) and exposes a new --flusher-processes CLI option.

Changes: 6 files (+199 / -50 lines)

  • 🟡 6 files modified: CLAUDE.md, src/sentry/consumers/__init__.py, src/sentry/spans/consumers/process/factory.py, src/sentry/spans/consumers/process/flusher.py, tests/sentry/spans/consumers/process/test_consumer.py +1 more

Resolves: #6 (Replays Self-Serve Bulk Delete System)

CI: ⏳ 0 passed, 0 failed

Impact Diagram

graph TD
  pr_0[/"🔀 PR Changes: 6 files"/]
  f1["🟡 CLAUDE.md +11/-0"]
  pr_0 --> f1
  f2["🟡 __init__.py +9/-1"]
  pr_0 --> f2
  f3["🟡 factory.py +3/-0"]
  pr_0 --> f3
  f4["🟡 flusher.py +127/-47"]
  pr_0 --> f4
  f5["🟡 test_consumer.py +48/-1"]
  pr_0 --> f5
  f6["🟡 test_flusher.py +1/-1"]
  pr_0 --> f6
  f3 -.- f4
  f5 -.- f6
  style f1 fill:#7c3aed,color:#fff,stroke:#7c3aed
  style f2 fill:#7c3aed,color:#fff,stroke:#7c3aed
  style f3 fill:#7c3aed,color:#fff,stroke:#7c3aed
  style f4 fill:#7c3aed,color:#fff,stroke:#7c3aed
  style f5 fill:#7c3aed,color:#fff,stroke:#7c3aed
  style f6 fill:#7c3aed,color:#fff,stroke:#7c3aed

2 critical issues found — please address before merging.

Severity Count
🔴 Critical 2
🟠 Major 10
🟡 Minor 11
🔵 Info 5
Review details
  • Reviewed at: 2026-04-08T03:29:00.040Z
  • Agents used: security, logic, style, architecture, performance, verification
  • Context level: full
  • Execution time: 120.8s
  • Blast radius: 0 (contained)

Powered by Grapple PR — AI that understands your code.

self.next_step = next_step
self.max_processes = max_processes or len(buffer.assigned_shards)

self.mp_context = mp_context = multiprocessing.get_context("spawn")

🔴 CRITICAL — API contract violation (confidence: 100%)

When buffer.assigned_shards is empty and max_processes is None, the expression max_processes or len(buffer.assigned_shards) evaluates to 0. This causes self.num_processes = min(self.max_processes, 0) to be 0, which then leads to a ZeroDivisionError at line 61 (i % self.num_processes) if any shards were somehow iterated. More practically, max_processes=0 passed explicitly would also resolve to len(buffer.assigned_shards) because 0 is falsy in Python, which is semantically incorrect — an explicit 0 should arguably mean 'disable' or raise an error, not 'use all shards'.

Evidence:

  • Line 53: self.max_processes = max_processes or len(buffer.assigned_shards) — the or operator treats 0 and None identically
  • Intent spec edge case explicitly calls this out: 'max_processes=0 explicitly passed: the max_processes or len(buffer.assigned_shards) expression would resolve to len(buffer.assigned_shards) due to 0 being falsy — this may be unintended and should be guarded'
  • If buffer.assigned_shards is also empty, max_processes resolves to 0, and line 61 i % self.num_processes would be a ZeroDivisionError

Agent: architecture
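
The difference between `or` and an explicit `None` check can be shown with a small helper. This is a sketch only: `resolve_max_processes` is a hypothetical function, and rejecting an empty shard list is one possible policy, not something the PR specifies.

```python
def resolve_max_processes(max_processes, shard_count):
    """Resolve the effective process count from an optional cap.

    None means "one process per shard"; an explicit value below 1 is rejected
    instead of being silently treated like None.
    """
    if max_processes is not None and max_processes < 1:
        raise ValueError(
            f"max_processes must be a positive integer or None, got {max_processes!r}"
        )
    if shard_count < 1:
        # Assumption: a flusher without shards is a configuration error.
        raise ValueError("at least one assigned shard is required")
    requested = shard_count if max_processes is None else max_processes
    return min(requested, shard_count)
```

For comparison, `0 or 4` evaluates to `4` in Python, which is exactly the silent fallback described above.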

}
self.process_backpressure_since = {
process_index: mp_context.Value("i", 0) for process_index in range(self.num_processes)
}

🔴 CRITICAL — Module boundaries / Missing state (confidence: 100%)

The old self.buffer attribute has been removed and replaced by self.buffers (a dict), but the SpansBuffer constructor is called with just SpansBuffer(shards) at line 109. The original self.buffer was passed in from the factory (constructed with Redis connection info, partition assignments, etc.), while the new per-process SpansBuffer(shards) is constructed with only a shard list. This means the new per-process buffers may not be properly configured with Redis connection parameters, TTL settings, or other configuration that the factory-provided buffer had. Additionally, any code outside this diff that still references self.buffer (singular) will raise AttributeError.

Evidence:

  • Old code: self.buffer = buffer where buffer was passed from factory.py line 68-69
  • New code at line 109: shard_buffer = SpansBuffer(shards) — constructs a new buffer with only shards, losing any configuration from the original buffer
  • The factory passes a fully-configured buffer object (line 68 of factory.py), but the flusher never stores it and instead creates new SpansBuffer instances
  • The buffer parameter in __init__ is only used for len(buffer.assigned_shards) — all other buffer config is lost

Agent: architecture
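
One way to keep configuration when deriving per-shard buffers is to copy the configured object and change only its shard list. The sketch below uses a toy class: `ToyBuffer`, its fields, and `for_shards` are all hypothetical, and nothing in this page confirms that `SpansBuffer` carries extra configuration or exposes such a method.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ToyBuffer:
    """Hypothetical stand-in for a configured buffer; the fields are invented."""

    assigned_shards: tuple
    redis_cluster: str = "default"
    segment_ttl_secs: int = 300

    def for_shards(self, shards):
        """Return a copy restricted to `shards`, keeping every other setting."""
        unknown = set(shards) - set(self.assigned_shards)
        if unknown:
            raise ValueError(f"shards not assigned to this buffer: {sorted(unknown)}")
        return replace(self, assigned_shards=tuple(shards))
```

Deriving from the parent keeps the child in sync with any setting added later, whereas calling the constructor with only a shard list would fall back to defaults.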

buffer: SpansBuffer,
next_step: ProcessingStrategy[FilteredPayload | int],
max_processes: int | None = None,
produce_to_pipe: Callable[[KafkaPayload], None] | None = None,

🟠 MAJOR — Logic Error / Edge Case (confidence: 100%)

When max_processes=0 is explicitly passed, the expression max_processes or len(buffer.assigned_shards) treats 0 as falsy and silently falls back to the shard count. If 0 is intended to mean 'no processes' or 'disabled', this is a logic error. Even if 0 is not a meaningful value, the silent coercion masks misconfiguration.

Evidence:

  • Line: self.max_processes = max_processes or len(buffer.assigned_shards)
  • Python's or short-circuits on falsy values, so 0 or N evaluates to N.
  • A caller explicitly passing flusher_processes=0 (e.g. from CLI) would get all shards as processes instead of an error or zero processes.
  • The --flusher-processes CLI option is typed as int with no minimum constraint, making 0 a reachable value.

Agent: security
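
A minimum can also be enforced where the option is parsed, so that 0 never reaches the flusher. The consumer's real CLI framework is not shown on this page, so the sketch below uses the standard library's `argparse` as a stand-in; `positive_int` and `build_parser` are hypothetical names.

```python
import argparse


def positive_int(text):
    """Parse `text` as an integer and reject values below 1."""
    value = int(text)
    if value < 1:
        raise argparse.ArgumentTypeError(f"expected an integer >= 1, got {value}")
    return value


def build_parser():
    parser = argparse.ArgumentParser(prog="consumer-sketch")
    parser.add_argument(
        "--flusher-processes",
        type=positive_int,
        default=1,
        help="Maximum number of processes for the span flusher. Defaults to 1.",
    )
    return parser
```

With this in place, `--flusher-processes 0` fails at argument parsing with a usage error instead of silently falling back to the shard count.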

self.backpressure_since = mp_context.Value("i", 0)
self.healthy_since = mp_context.Value("i", 0)
self.process_restarts = 0
self.produce_to_pipe = produce_to_pipe

🟠 MAJOR — Division by Zero / Infinite Loop (confidence: 100%)

When buffer.assigned_shards is empty, self.num_processes becomes min(self.max_processes, 0) == 0. The subsequent loop for i, shard in enumerate(buffer.assigned_shards) performs no iterations, so process_index = i % self.num_processes is never evaluated; it would raise ZeroDivisionError if the loop body were ever reached with num_processes == 0. With max_processes unset, self.max_processes or len(buffer.assigned_shards) on line 48 also resolves to 0, because both operands are 0. This is safe in practice only because no shards are iterated, but the code contains no explicit guard and relies on the loop not executing, which is a fragile assumption.

Evidence:

  • self.num_processes = min(self.max_processes, len(buffer.assigned_shards)) → 0 when shards is empty
  • process_index = i % self.num_processes would be a ZeroDivisionError if this loop body were reached with num_processes==0
  • The loop is safe today only because empty assigned_shards means zero iterations, but a code refactor could break this invariant
  • No explicit guard like if self.num_processes == 0: return exists

Agent: security
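
The round-robin distribution with an explicit guard can be sketched as a pure function. `assign_shards` is a hypothetical helper that mirrors the `i % num_processes` logic quoted above; it is not the flusher's actual method.

```python
def assign_shards(shards, max_processes):
    """Distribute shards round-robin over at most `max_processes` processes.

    Returns a mapping of process index to the shards that process owns.
    """
    num_processes = min(max_processes, len(shards))
    # Explicit guard: with no processes there is nothing to assign, and the
    # modulo below would otherwise divide by zero.
    if num_processes <= 0:
        return {}
    mapping = {index: [] for index in range(num_processes)}
    for i, shard in enumerate(shards):
        mapping[i % num_processes].append(shard)
    return mapping
```

Because the guard sits next to the modulo, the function stays safe even if a later refactor changes how the loop is driven.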

f"flusher process for shards {shards} crashed repeatedly ({cause}), restarting consumer"
)
self.process_restarts[process_index] += 1

🟠 MAJOR — State Integrity / AttributeError Risk (confidence: 100%)

The submit method iterates self.buffers.values() to call record_stored_segments(). If _create_processes() partially fails (e.g., SpansBuffer construction throws for one shard group), self.buffers may be partially populated while self.processes and self.process_to_shards_map are out of sync. There is no transactional initialization, so a partially initialized flusher could be used.

Evidence:

  • self.buffers: dict[int, SpansBuffer] = {} initialized empty on line 65
  • _create_process_for_shards populates self.buffers[process_index] after calling SpansBuffer(shards) and spawning the process
  • If process spawning fails mid-loop in _create_processes, some process_indices will be in process_to_shards_map but absent from self.buffers
  • submit iterating self.buffers.values() would silently skip those shards rather than raising an error, leading to missing record_stored_segments calls

Agent: security
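
An all-or-nothing start-up can be sketched by collecting workers in a local dictionary and publishing it only once every worker has started. `start_all`, `start_one`, and `stop_one` are hypothetical names used for illustration; the flusher's real methods may need a different shape.

```python
def start_all(shard_groups, start_one, stop_one):
    """Start one worker per shard group, or none at all.

    `shard_groups` maps a process index to its shards. If any start fails,
    the workers started so far are stopped and the error is re-raised.
    """
    started = {}
    try:
        for index, shards in shard_groups.items():
            started[index] = start_one(index, shards)
    except Exception:
        # Roll back so the caller never sees a half-initialised pool.
        for worker in started.values():
            stop_one(worker)
        raise
    return started
```

The caller would assign the returned dictionary to its own state in a single step, so `submit` can never observe buffers and processes that disagree.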

self.backpressure_since = mp_context.Value("i", 0)
self.healthy_since = mp_context.Value("i", 0)
self.process_restarts = 0
self.produce_to_pipe = produce_to_pipe

🔵 INFO — Determinism / Shard Distribution (confidence: 100%)

Round-robin shard distribution using i % num_processes assumes buffer.assigned_shards has a deterministic, stable ordering across process restarts and rebalances. If the ordering varies (e.g., from a set or dict), shards may be reassigned to different processes on restart, breaking per-process locality assumptions.

Evidence:

  • for i, shard in enumerate(buffer.assigned_shards) — ordering depends on the container type of assigned_shards
  • If assigned_shards is a set, iteration order is not guaranteed across Python versions or hash seeds
  • Shard reassignment on restart could cause a process to flush segments it has no warm cache for

Agent: security
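
If ordering is a concern, sorting the shards before the round-robin step makes the mapping independent of the container's iteration order. A minimal sketch, with `stable_assignment` as a hypothetical helper:

```python
def stable_assignment(shards, num_processes):
    """Map each shard to a process index, independent of input order."""
    if num_processes < 1:
        raise ValueError("num_processes must be at least 1")
    # Sorting (and de-duplicating) fixes the order before indices are handed out.
    ordered = sorted(set(shards))
    return {shard: position % num_processes for position, shard in enumerate(ordered)}
```

This only stabilises the mapping for a fixed set of shards; a rebalance that adds or removes shards can still move the remaining ones to a different process.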


step.join()

(msg,) = messages

🔵 INFO — Code organization (confidence: 100%)

Timing-dependent test workaround: the added time.sleep(0.1) on line 66 is a hardcoded sleep to ensure drift changes are observed. This may be flaky in slow CI environments or on under-resourced machines, causing intermittent failures.

Evidence:

  • The comment says 'Give flusher threads time to process after drift change', which is a timing assumption
  • 0.1 second is arbitrary and may not be sufficient on slow CI systems
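  • The monkeypatch on line 11 already mocks time.sleep; if that patch replaces sleep on the time module itself, the added time.sleep(0.1) is also a no-op rather than a real pause, so the test passes only when thread scheduling happens to cooperate, which points to a race condition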

Agent: style

default=1,
type=int,
help="Maximum number of processes for the span flusher. Defaults to 1.",
),

🔵 INFO — Cross-service impact (confidence: 89%)

The new --flusher-processes CLI option defaults to 1, which maintains backward compatibility. However, the ProcessSpansStrategyFactory passes this as flusher_processes (keyword arg), while SpanFlusher receives it as max_processes. The naming inconsistency between CLI (flusher_processes), factory (flusher_processes), and flusher (max_processes) could cause confusion in future maintenance.

Evidence:

  • Line 433: CLI option named flusher_processes
  • factory.py line 41: stored as self.flusher_processes
  • factory.py line 73: passed as max_processes=self.flusher_processes
  • flusher.py line 48: received as max_processes

Agent: architecture

self.buffers[process_index] = shard_buffer

def _create_process_for_shard(self, shard: int):
# Find which process this shard belongs to and restart that process

🔵 INFO — Naming conventions (confidence: 84%)

Method naming inconsistency: _create_process_for_shard() (singular) on line 128 is never called and appears to be dead code, while the working method is _create_process_for_shards() (plural) on line 91. This is confusing and the unused method should be removed or clarified.

Evidence:

  • Line 91: def _create_process_for_shards(self, process_index: int, shards: list[int]): — used in _create_processes() and restart logic
  • Line 128: def _create_process_for_shard(self, shard: int): — never called, only iterates and calls _create_process_for_shards()
  • The _create_process_for_shard method is a thin wrapper that appears to be leftover from refactoring

Agent: style

"""
- A background thread that polls Redis for new segments to flush and to produce to Kafka.
+ A background multiprocessing manager that polls Redis for new segments to flush and to produce to Kafka.
+ Creates one process per shard for parallel processing.

🔵 INFO — Documentation (confidence: 75%)

Docstring is updated to mention multiprocessing, but does not document the new max_processes parameter or explain the per-process health monitoring behavior. This makes it unclear how the new feature works for future maintainers.

Evidence:

  • Docstring now says 'Creates one process per shard for parallel processing' but does not document when this is limited by max_processes
  • New parameter max_processes and per-process health state (process_healthy_since, process_backpressure_since, process_restarts) are not documented in the docstring

Agent: style

@grapple-pr grapple-pr Bot left a comment

🔧 [Grapple PR] 6 auto-fix suggestions available

Apply individually, or use GitHub's "Add suggestion to batch" and then "Commit suggestions" to apply all at once.

Commits from Grapple PR are prefixed with [Grapple PR] for easy identification in merge conflicts.

buffer: SpansBuffer,
next_step: ProcessingStrategy[FilteredPayload | int],
max_processes: int | None = None,
produce_to_pipe: Callable[[KafkaPayload], None] | None = None,

[Grapple PR] Auto-fix — security agent (Small fix (7 lines, 1 file))

When max_processes=0 is explicitly passed, the expression max_processes or len(buffer.assigned_shards) treats 0 as falsy and silently falls back to the shard count. If 0 is intended to mean 'no processes' or 'disabled', this is a logic error. Even if 0 is not a meaningful value, the silent coercion masks misconfiguration.

Suggested change
produce_to_pipe: Callable[[KafkaPayload], None] | None = None,
if max_processes is not None and max_processes < 1:
raise ValueError(
f"max_processes must be a positive integer or None, got {max_processes!r}. "
"Pass None to default to one process per assigned shard."
)
self.max_processes = max_processes if max_processes is not None else len(buffer.assigned_shards)

🤖 Grapple PR auto-fix • major • confidence: 100%

):
metrics.incr("spans.buffer.flusher.backpressure")
raise MessageRejected()

[Grapple PR] Auto-fix — performance agent (Small fix (9 lines, 1 file))

record_stored_segments() is called on every buffer in the pool on every submit(), even when most buffers may have had no activity. With N processes, this multiplies the Redis/storage calls by N on every message submission.

Suggested change
for process_index, buffer in self.buffers.items():
process = self.processes.get(process_index)
# Only record stored segments for buffers whose process is alive;
# a dead or restarting process cannot have produced new stored segments,
# so calling record_stored_segments() on it would be unnecessary Redis work.
if process is not None and process.is_alive():
buffer.record_stored_segments()

🤖 Grapple PR auto-fix • major • confidence: 100%

):
self.buffer = buffer
self.next_step = next_step
self.max_processes = max_processes or len(buffer.assigned_shards)

[Grapple PR] Auto-fix — style agent (Small fix (2 lines, 1 file))

Logic error in max_processes default: max_processes or len(buffer.assigned_shards) treats explicit 0 as falsy and falls back to all shards. If 0 is intended to disable multiprocessing, this silently converts it to the opposite behavior.

Suggested change
- self.max_processes = max_processes or len(buffer.assigned_shards)
+ self.max_processes = len(buffer.assigned_shards) if max_processes is None else max_processes

🤖 Grapple PR auto-fix • minor • confidence: 100%

except ValueError:
pass # Process already closed, ignore
if self.process_restarts[process_index] > MAX_PROCESS_RESTARTS:
raise RuntimeError(

[Grapple PR] Auto-fix — performance agent (Small fix (5 lines, 1 file))

When a process is restarted via _create_process_for_shards(), the old process object is replaced in self.processes[process_index] without joining or cleaning up the old process. Dead process objects accumulate OS resources (file descriptors, zombie process entries) until the OS reaps them.

Suggested change
raise RuntimeError(
# Join after kill to release OS resources (file descriptors,
# zombie process entries). Python's multiprocessing programming
# guidelines recommend joining every process that was started.
# Use a short timeout since SIGKILL should terminate immediately.
process.join(timeout=1)

🤖 Grapple PR auto-fix • minor • confidence: 100%
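
The kill-then-join sequence can be wrapped in a small helper so every restart path reaps the old process the same way. This is a sketch: `kill_and_reap` is a hypothetical name, and it assumes the argument is a `multiprocessing` process object (threads have no `kill()` and would need separate handling).

```python
def kill_and_reap(process, timeout=1.0):
    """Kill `process`, then join it so the OS can release its resources.

    Returns True if the process is gone afterwards, False if it is still
    alive once `timeout` seconds have passed.
    """
    try:
        process.kill()
        # join() reaps the child; without it the entry can linger as a zombie.
        process.join(timeout)
        return not process.is_alive()
    except ValueError:
        # multiprocessing raises ValueError when the process object was
        # already closed, in which case there is nothing left to reap.
        return True
```

A False return value gives the caller a chance to log or escalate instead of silently replacing a process that is still running.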

@@ -134,23 +175,28 @@ def produce(payload: KafkaPayload) -> None:
else:

[Grapple PR] Auto-fix — style agent (Small fix (2 lines, 1 file))

Inconsistent metric tag key: 'shards' vs 'shard'. The metrics.timer() call on line 240 uses 'shards' (plural) while other metrics use 'shard' (singular), creating inconsistency in metric dimensions.

Suggested change
else:
with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):

🤖 Grapple PR auto-fix • minor • confidence: 98%


step.poll()
# Give flusher threads time to process after drift change
time.sleep(0.1)

[Grapple PR] Auto-fix — architecture agent (Small fix (9 lines, 1 file))

The time.sleep(0.1) added to test_basic is a timing-dependent workaround that may be flaky in slow CI environments. The test relies on monkeypatch.setattr('time.sleep', lambda _: None) to make sleep a no-op. That call replaces the sleep attribute on the time module itself, so it affects every caller that looks up time.sleep at call time, including this test file's own import time; under that patch the new time.sleep(0.1) returns immediately instead of pausing. Either way the wait is fragile, and a better approach is to poll for the expected condition with a timeout.

Suggested change
time.sleep(0.1)
# Poll until the flusher thread produces a message, with a timeout.
# If time.sleep is monkeypatched to a no-op for this test, the sleep
# below returns immediately and the loop spins until the deadline.
deadline = time.time() + 5.0
while not messages and time.time() < deadline:
time.sleep(0.01)
assert messages, "Timed out waiting for flusher to produce a message"

🤖 Grapple PR auto-fix • minor • confidence: 98%
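
How patching `time.sleep` interacts with `import time` can be checked in isolation. The sketch below uses the standard library's `unittest.mock.patch` as a stand-in for pytest's `monkeypatch.setattr("time.sleep", ...)`; both replace the attribute on the `time` module. Whether the test in this PR applies such a patch is taken from the review text, not verified here.

```python
import time
from unittest import mock

recorded = []

# Replace time.sleep with a fake that only records its argument.
with mock.patch("time.sleep", recorded.append):
    time.sleep(5)  # returns immediately; nothing actually sleeps
    import time as alias  # a second import yields the same module object
    alias.sleep(7)  # also hits the fake

print(recorded)
```

Both calls end up in `recorded`, so an alias or a fresh `import time` does not bypass the patch. Only a reference bound before the patch was applied (for example `from time import sleep` at module import) would still point at the real function.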

self.next_step = next_step
self.max_processes = max_processes or len(buffer.assigned_shards)

self.mp_context = mp_context = multiprocessing.get_context("spawn")

🟡 [Grapple PR] Suggested fix — architecture agent (Larger fix (17 lines, 1 file) — review recommended)

When buffer.assigned_shards is empty and max_processes is None, the expression max_processes or len(buffer.assigned_shards) evaluates to 0. This causes self.num_processes = min(self.max_processes, 0) to be 0, which then leads to a ZeroDivisionError at line 61 (i % self.num_processes) if any shards were somehow iterated. More practically, max_processes=0 passed explicitly would also resolve to len(buffer.assigned_shards) because 0 is falsy in Python, which is semantically incorrect — an explicit 0 should arguably mean 'disable' or raise an error, not 'use all shards'.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -49,7 +49,18 @@ class SpanFlusher(ProcessingStrategy[FilteredPayload | int]):
         produce_to_pipe: Callable[[KafkaPayload], None] | None = None,
     ):
         self.next_step = next_step
-        self.max_processes = max_processes or len(buffer.assigned_shards)
+
+        if max_processes is not None and max_processes <= 0:
+            raise ValueError(
+                f"max_processes must be a positive integer or None, got {max_processes!r}"
+            )
+
+        # Use max_processes if explicitly provided; otherwise default to the number of assigned shards.
+        # We cannot use `max_processes or len(...)` here because 0 is falsy in Python and would
+        # be treated the same as None, silently bypassing the explicit argument.
+        self.max_processes = max_processes if max_processes is not None else len(buffer.assigned_shards)
+
+        if self.max_processes == 0:
+            raise ValueError(
+                "max_processes resolved to 0 (no assigned shards and max_processes not provided). "
+                "Ensure the buffer has at least one assigned shard before constructing SpanFlusher."
+            )
 
         self.mp_context = mp_context = multiprocessing.get_context("spawn")
         self.stopped = mp_context.Value("i", 0)

🤖 Grapple PR auto-fix • critical • Review this diff before applying

}
self.process_backpressure_since = {
process_index: mp_context.Value("i", 0) for process_index in range(self.num_processes)
}

🟡 [Grapple PR] Suggested fix — architecture agent (Small fix (6 lines, 1 file))

The old self.buffer attribute has been removed and replaced by self.buffers (a dict), but the SpansBuffer constructor is called with just SpansBuffer(shards) at line 109. The original self.buffer was passed in from the factory (constructed with Redis connection info, partition assignments, etc.), while the new per-process SpansBuffer(shards) is constructed with only a shard list. This means the new per-process buffers may not be properly configured with Redis connection parameters, TTL settings, or other configuration that the factory-provided buffer had. Additionally, any code outside this diff that still references self.buffer (singular) will raise AttributeError.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -43,6 +43,7 @@ def __init__(
         buffer: SpansBuffer,
         next_step: ProcessingStrategy[FilteredPayload | int],
         max_processes: int | None = None,
         produce_to_pipe: Callable[[KafkaPayload], None] | None = None,
     ):
+        self.buffer = buffer  # Keep reference to factory-configured buffer for config reuse
         self.next_step = next_step
         self.max_processes = max_processes or len(buffer.assigned_shards)
 
@@ -77,6 +78,7 @@ def __init__(
         self.process_restarts = {process_index: 0 for process_index in range(self.num_processes)}
         self.buffers: dict[int, SpansBuffer] = {}
 
         self._create_processes()
 
     def _create_processes(self):
@@ -88,8 +90,8 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]):
         self.process_healthy_since[process_index].value = int(time.time())
 
-        # Create a buffer for these specific shards
-        shard_buffer = SpansBuffer(shards)
+        # Derive a per-shard buffer from the factory-configured buffer to preserve
+        # Redis connection parameters, TTL settings, and other configuration.
+        shard_buffer = self.buffer.slice_for_shards(shards)
 
         make_process: Callable[..., multiprocessing.context.SpawnProcess | threading.Thread]
         if self.produce_to_pipe is None:

🤖 Grapple PR auto-fix • critical • Review this diff before applying

self.backpressure_since = mp_context.Value("i", 0)
self.healthy_since = mp_context.Value("i", 0)
self.process_restarts = 0
self.produce_to_pipe = produce_to_pipe

🟡 [Grapple PR] Suggested fix — security agent (Larger fix (15 lines, 1 file) — review recommended)

When buffer.assigned_shards is empty, self.num_processes becomes min(self.max_processes, 0) == 0. The subsequent loop for i, shard in enumerate(buffer.assigned_shards) performs no iterations, so process_index = i % self.num_processes is never evaluated; it would raise ZeroDivisionError if the loop body were ever reached with num_processes == 0. With max_processes unset, self.max_processes or len(buffer.assigned_shards) on line 48 also resolves to 0, because both operands are 0. This is safe in practice only because no shards are iterated, but the code contains no explicit guard and relies on the loop not executing, which is a fragile assumption.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -58,6 +58,14 @@ class SpanFlusher(ProcessingStrategy[FilteredPayload | int]):
         # Determine which shards get their own processes vs shared processes
         self.num_processes = min(self.max_processes, len(buffer.assigned_shards))
+
+        # Explicit guard: if there are no assigned shards (e.g. during rebalance or
+        # initial startup), num_processes will be 0. The loop below would raise
+        # ZeroDivisionError on `i % self.num_processes` if any shard existed with a
+        # zero denominator. We initialise empty maps and return early so callers
+        # (_create_processes, submit, join) all operate on empty collections safely.
+        if self.num_processes == 0:
+            self.process_to_shards_map: dict[int, list[int]] = {}
+            self.processes: dict[int, multiprocessing.context.SpawnProcess | threading.Thread] = {}
+            self.process_healthy_since = {}
+            self.process_backpressure_since = {}
+            self.process_restarts: dict[int, int] = {}
+            self.buffers: dict[int, SpansBuffer] = {}
+            return
+
         self.process_to_shards_map: dict[int, list[int]] = {
             i: [] for i in range(self.num_processes)
         }

🤖 Grapple PR auto-fix • major • Review this diff before applying

f"flusher process for shards {shards} crashed repeatedly ({cause}), restarting consumer"
)
self.process_restarts[process_index] += 1

🟡 [Grapple PR] Suggested fix — security agent (Larger fix (36 lines, 1 file) — review recommended)

The submit method iterates self.buffers.values() to call record_stored_segments(). If _create_processes() partially fails (e.g., SpansBuffer construction throws for one shard group), self.buffers may be partially populated while self.processes and self.process_to_shards_map are out of sync. There is no transactional initialization, so a partially initialized flusher could be used.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -100,13 +100,22 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]):
         # pickled separately. at the same time, pickling
         # synchronization primitives like multiprocessing.Value can
         # only be done by the Process
-        shard_buffer = SpansBuffer(shards)
+
+        # Construct the buffer first — if this raises, we haven't touched
+        # self.buffers or self.processes yet, so state stays consistent.
+        shard_buffer = SpansBuffer(shards)  # may raise; intentionally before any mutation
 
         make_process: Callable[..., multiprocessing.context.SpawnProcess | threading.Thread]
         if self.produce_to_pipe is None:
             target = partial(
                 run_with_initialized_sentry,
                 SpanFlusher.main,
+                # buffer is passed here so it can be
+                # pickled separately. at the same time, pickling
+                # synchronization primitives like multiprocessing.Value can
+                # only be done by the Process
                 shard_buffer,
             )
             make_process = self.mp_context.Process
         else:
             target = partial(SpanFlusher.main, shard_buffer)
             make_process = threading.Thread
 
-        process = make_process(
+        # Build the process object before mutating shared state.
+        process = make_process(
             target=target,
             args=(
                 shards,
                 self.stopped,
                 self.current_drift,
                 self.process_backpressure_since[process_index],
                 self.process_healthy_since[process_index],
                 self.produce_to_pipe,
             ),
             daemon=True,
         )
 
-        process.start()
-        self.processes[process_index] = process
-        self.buffers[process_index] = shard_buffer
+        # start() is the last thing that can fail before we commit.
+        # If start() raises, neither dict is updated, keeping them in sync.
+        process.start()
+
+        # Commit both atomically (single-threaded context — no lock needed).
+        # Both dicts must always have the same set of keys.
+        self.processes[process_index] = process
+        self.buffers[process_index] = shard_buffer
 
     def _create_process_for_shard(self, shard: int):
         # Find which process this shard belongs to and restart that process
@@ -261,6 +280,17 @@ def submit(self, message: Message[FilteredPayload | int]) -> None:
 
         self._ensure_processes_alive()
 
+        # Guard against partial initialization: every process_index in
+        # process_to_shards_map must have a corresponding buffer.  If not,
+        # _create_processes() failed mid-loop and the flusher is unusable.
+        # Raise loudly rather than silently skipping record_stored_segments()
+        # for the missing shards (which would cause data-loss style bugs).
+        missing = self.process_to_shards_map.keys() - self.buffers.keys()
+        if missing:
+            raise RuntimeError(
+                f"SpanFlusher is partially initialized: buffers missing for "
+                f"process indices {sorted(missing)}. Initialization must have "
+                f"failed mid-loop in _create_processes()."
+            )
+
         for buffer in self.buffers.values():
             buffer.record_stored_segments()

🤖 Grapple PR auto-fix • major • Review this diff before applying
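
One way to keep the two maps in sync is to build into local dictionaries and hand them back only after every step has succeeded. The sketch below is an illustration under assumptions: `build_workers`, `make_buffer` and `start_worker` are hypothetical names, and workers started before a failure are not cleaned up here.

```python
from typing import Any, Callable


def build_workers(
    groups: dict[int, list[int]],
    make_buffer: Callable[[list[int]], Any],
    start_worker: Callable[[Any], Any],
) -> tuple[dict[int, Any], dict[int, Any]]:
    """Build buffers and workers locally; the caller commits only on success."""
    buffers: dict[int, Any] = {}
    workers: dict[int, Any] = {}
    for index, shards in groups.items():
        buffer = make_buffer(shards)   # may raise; nothing has been published yet
        worker = start_worker(buffer)  # may raise; nothing has been published yet
        buffers[index] = buffer
        workers[index] = worker
    return buffers, workers
```

A caller would assign both attributes from the returned tuple in one statement, so a failure partway through leaves the previous state untouched.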

f"flusher process for shards {shards} crashed repeatedly ({cause}), restarting consumer"
)
self.process_restarts[process_index] += 1

🟡 [Grapple PR] Suggested fix — architecture agent (Larger fix (26 lines, 1 file) — review recommended)

The _ensure_processes_alive method checks isinstance(process, multiprocessing.Process) before calling .kill(). The spawned processes are actually multiprocessing.context.SpawnProcess instances (created by self.mp_context.Process, where mp_context is get_context('spawn')), but SpawnProcess is a subclass of multiprocessing.Process, so the isinstance check works. The CLAUDE.md addition in this same PR recommends isinstance() over hasattr(), and join() at line 269 uses the same isinstance(process, multiprocessing.Process) check, so that part is at most a minor style point. The correctness issue is the threading.Thread case: .kill() is silently skipped and nothing else stops the thread. A thread only exits once self.stopped is set, and stopped is NOT set during a health-check restart, so the old thread keeps running alongside the new one.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -44,6 +44,7 @@ def __init__(
         self.processes: dict[int, multiprocessing.context.SpawnProcess | threading.Thread] = {}
         self.process_healthy_since = {
             process_index: mp_context.Value("i", int(time.time()))
             for process_index in range(self.num_processes)
         }
         self.process_backpressure_since = {
             process_index: mp_context.Value("i", 0) for process_index in range(self.num_processes)
         }
         self.process_restarts = {process_index: 0 for process_index in range(self.num_processes)}
         self.buffers: dict[int, SpansBuffer] = {}
+        # Per-process stop events for threading.Thread workers (mp workers use .kill() instead)
+        self.process_stop_events: dict[int, threading.Event] = {}
 
         self._create_processes()
 
@@ -100,6 +101,14 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]):
         self.process_healthy_since[process_index].value = int(time.time())
 
         # Create a buffer for these specific shards
         shard_buffer = SpansBuffer(shards)
 
+        # Create a per-process stop event for thread workers so old threads
+        # can be signaled to exit independently during restarts (without
+        # touching the global self.stopped flag, which would kill all workers).
+        process_stop_event = threading.Event()
+        self.process_stop_events[process_index] = process_stop_event
+
         make_process: Callable[..., multiprocessing.context.SpawnProcess | threading.Thread]
         if self.produce_to_pipe is None:
             target = partial(
@@ -117,6 +126,7 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]):
         process = make_process(
             target=target,
             args=(
                 shards,
                 self.stopped,
                 self.current_drift,
                 self.process_backpressure_since[process_index],
                 self.process_healthy_since[process_index],
                 self.produce_to_pipe,
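+                # threading.Event is not picklable, so spawned processes get None.
+                process_stop_event if self.produce_to_pipe is not None else None,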
             ),
             daemon=True,
         )
@@ -143,6 +153,7 @@ def main(
         stopped,
         current_drift,
         backpressure_since,
         healthy_since,
         produce_to_pipe: Callable[[KafkaPayload], None] | None,
+        process_stop_event: threading.Event | None = None,
     ) -> None:
         shard_tag = ",".join(map(str, shards))
         sentry_sdk.set_tag("sentry_spans_buffer_component", "flusher")
@@ -163,7 +174,10 @@ def produce(payload: KafkaPayload) -> None:
             while True:
-                if stopped.value:
+                # For thread workers, also check the per-process stop event so
+                # that a single thread can be stopped during a health-check
+                # restart without affecting the global stopped flag.
+                if stopped.value or (process_stop_event is not None and process_stop_event.is_set()):
                     break
 
                 system_now = int(time.time())
@@ -253,6 +267,15 @@ def _ensure_processes_alive(self) -> None:
             try:
                 if isinstance(process, multiprocessing.Process):
                     process.kill()
+                elif isinstance(process, threading.Thread):
+                    # Signal the old thread to stop via its per-process event.
+                    # We cannot use self.stopped here — it would stop ALL workers.
+                    # The thread's main loop checks process_stop_event and will
+                    # exit cleanly on the next iteration (within ~1s sleep).
+                    stop_event = self.process_stop_events.get(process_index)
+                    if stop_event is not None:
+                        stop_event.set()
+                    # Note: we do NOT join here to avoid blocking the consumer
+                    # thread. The daemon=True flag ensures the old thread won't
+                    # prevent interpreter shutdown if it lingers briefly.
             except (ValueError, AttributeError):
                 pass  # Process already closed, ignore

🤖 Grapple PR auto-fix • major • Review this diff before applying
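
The per-worker stop signal can be tried out with plain threads. This is a minimal sketch under assumptions: `worker_loop` and `start_worker` are hypothetical, and the global flag is modelled as a `threading.Event` rather than the `multiprocessing.Value` used by the flusher.

```python
import threading


def worker_loop(global_stop: threading.Event, own_stop: threading.Event) -> None:
    # Event.wait doubles as an interruptible sleep: it returns True once own_stop is set.
    while not global_stop.is_set():
        if own_stop.wait(timeout=0.01):
            break


def start_worker(global_stop: threading.Event) -> tuple[threading.Thread, threading.Event]:
    """Start one daemon thread with its own stop event (hypothetical helper)."""
    own_stop = threading.Event()
    thread = threading.Thread(target=worker_loop, args=(global_stop, own_stop), daemon=True)
    thread.start()
    return thread, own_stop
```

Setting one worker's event stops only that worker; the others keep running until the global flag is set. This applies to thread workers only, since a `threading.Event` cannot be pickled into a spawned process.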


self.mp_context = mp_context = multiprocessing.get_context("spawn")
self.stopped = mp_context.Value("i", 0)
self.redis_was_full = False

🟡 [Grapple PR] Suggested fix — architecture agent (Larger fix (18 lines, 1 file) — review recommended)

The current_drift value is shared as a single multiprocessing.Value across all processes. All flusher processes read from this same value to determine what to flush. While this is intentional for coordinating the clock, it means all processes flush at the same drift point simultaneously. Combined with the fact that each process independently calls buffer.flush_segments() on its own shards, there could be a thundering herd effect where all processes hit Redis simultaneously at each drift tick. This was not a concern with a single process but becomes relevant with N processes.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -95,6 +95,8 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]):
         make_process: Callable[..., multiprocessing.context.SpawnProcess | threading.Thread]
         if self.produce_to_pipe is None:
             target = partial(
+                # process_index and num_processes are passed so each process
+                # can stagger its flush relative to other processes.
                 SpanFlusher._main_with_sentry,
                 shard_buffer,
             )
@@ -106,6 +108,8 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]):
         process = make_process(
             target=target,
             args=(
+                process_index,
+                self.num_processes,
                 shards,
                 self.stopped,
                 self.current_drift,
@@ -121,6 +125,8 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]):
     @staticmethod
     def main(
         buffer: SpansBuffer,
+        process_index: int,
+        num_processes: int,
         shards: list[int],
         stopped,
         current_drift,
@@ -129,6 +135,8 @@ def main(
         produce_to_pipe: Callable[[KafkaPayload], None] | None,
     ) -> None:
         shard_tag = ",".join(map(str, shards))
+        # Each process uses a staggered flush offset to avoid thundering herd
+        # on Redis when all processes share the same current_drift value.
         sentry_sdk.set_tag("sentry_spans_buffer_component", "flusher")
         sentry_sdk.set_tag("sentry_spans_buffer_shards", shard_tag)
 
@@ -148,7 +156,12 @@ def produce(payload: KafkaPayload) -> None:
                 last_drift = current_drift.value
 
             while True:
-                if current_drift.value == last_drift:
+                # Stagger flushes across processes: process i only flushes when
+                # drift has advanced AND (drift % num_processes == process_index % num_processes).
+                # This distributes Redis load evenly across the drift interval
+                # instead of all N processes flushing simultaneously on each tick.
+                new_drift = current_drift.value
+                flush_slot = new_drift % num_processes == process_index % num_processes
+                if new_drift == last_drift or not flush_slot:
                     break
-                last_drift = current_drift.value
+                last_drift = new_drift
 
                 system_now = int(time.time())

🤖 Grapple PR auto-fix • major • Review this diff before applying
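
An alternative way to spread the load, shown here only as a sketch, is to give each process a fixed phase offset inside the flush interval instead of gating on the drift value. `stagger_offset` and `interval_seconds` are hypothetical names, not part of the flusher.

```python
def stagger_offset(process_index: int, num_processes: int, interval_seconds: float) -> float:
    """Return how long process_index should wait after a tick before flushing."""
    if num_processes <= 0:
        raise ValueError("num_processes must be positive")
    # Spread the N processes evenly across one interval.
    return (process_index % num_processes) * interval_seconds / num_processes
```

With four processes and a one-second interval, the offsets are 0.0, 0.25, 0.5 and 0.75 seconds, so every process still flushes once per tick.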

pass # Process already closed, ignore

self._create_process_for_shards(process_index, shards)

🟡 [Grapple PR] Suggested fix — performance agent (Larger fix (14 lines, 1 file) — review recommended)

_ensure_processes_alive() iterates over all processes on every submit() call, performing time.time() syscalls and shared-memory reads (Value.value) for each process. With a large number of processes this becomes O(N) overhead on the hot path.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -44,6 +44,7 @@ def __init__(
         self.redis_was_full = False
         self.current_drift = mp_context.Value("i", 0)
         self.produce_to_pipe = produce_to_pipe
+        self._last_process_health_check: float = 0.0
 
         # Determine which shards get their own processes vs shared processes
         self.num_processes = min(self.max_processes, len(buffer.assigned_shards))
@@ -218,6 +218,14 @@ def poll(self) -> None:
 
     def _ensure_processes_alive(self) -> None:
         max_unhealthy_seconds = options.get("spans.buffer.flusher.max-unhealthy-seconds")
+        now = time.time()
+        # Throttle health checks to once per second. Checking on every submit()
+        # call is unnecessary: each check crosses a shared-memory boundary
+        # (mmap/IPC) for every process's Value.value read and calls is_alive()
+        # (a syscall) per process. A 1-second interval is fine because
+        # max_unhealthy_seconds is O(tens of seconds) in practice, so we lose
+        # at most 1 second of detection accuracy while eliminating O(N) IPC
+        # overhead from the submit() hot path.
+        if now - self._last_process_health_check < 1.0:
+            return
+        self._last_process_health_check = now
 
         for process_index, process in self.processes.items():
             if not process:
@@ -226,7 +234,7 @@ def _ensure_processes_alive(self) -> None:
             shards = self.process_to_shards_map[process_index]
 
             cause = None
             if not process.is_alive():
                 exitcode = getattr(process, "exitcode", "unknown")
                 cause = f"no_process_{exitcode}"
             elif (
-                int(time.time()) - self.process_healthy_since[process_index].value
+                int(now) - self.process_healthy_since[process_index].value
                 > max_unhealthy_seconds
             ):
                 # Check if any shard handled by this process is unhealthy

🤖 Grapple PR auto-fix • major • Review this diff before applying
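
The throttling idea can be isolated into a small helper with an injectable clock, which makes it easy to test. This is a sketch under assumptions: the `Throttle` class is hypothetical and uses `time.monotonic` by default, whereas the diff above compares `time.time()` values.

```python
import time
from typing import Callable, Optional


class Throttle:
    """Allow an action at most once per min_interval seconds (hypothetical helper)."""

    def __init__(self, min_interval: float, clock: Callable[[], float] = time.monotonic) -> None:
        self._min_interval = min_interval
        self._clock = clock
        self._last_run: Optional[float] = None

    def ready(self) -> bool:
        now = self._clock()
        if self._last_run is not None and now - self._last_run < self._min_interval:
            return False  # too soon since the last run; skip this round
        self._last_run = now
        return True
```

A health check would start with `if not throttle.ready(): return`, so the per-process loop runs at most once per interval.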

@@ -225,7 +296,9 @@ def submit(self, message: Message[FilteredPayload | int]) -> None:
# wait until the situation is improved manually.
max_memory_percentage = options.get("spans.buffer.max-memory-percentage")
if max_memory_percentage < 1.0:

🟡 [Grapple PR] Suggested fix — performance agent (Larger fix (21 lines, 1 file) — review recommended)

Backpressure detection iterates over all process_backpressure_since values on every submit() call, reading a shared multiprocessing.Value (a lock-protected shared-memory read) for each process even when no backpressure exists.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -270,12 +270,15 @@ def submit(self, message: Message[FilteredPayload | int]) -> None:
         # Minimizing our Redis memory usage also makes COGS easier to reason
         # about.
-        backpressure_secs = options.get("spans.buffer.flusher.backpressure-seconds")
-        for backpressure_since in self.process_backpressure_since.values():
-            if (
-                backpressure_since.value > 0
-                and int(time.time()) - backpressure_since.value > backpressure_secs
-            ):
-                metrics.incr("spans.buffer.flusher.backpressure")
-                raise MessageRejected()
+        # Fast path: only enter the full check if at least one process has
+        # reported backpressure. This avoids reading options and computing
+        # time.time() on every submit() in the common (no-backpressure) case.
+        if any(bp.value > 0 for bp in self.process_backpressure_since.values()):
+            backpressure_secs = options.get("spans.buffer.flusher.backpressure-seconds")
+            now = int(time.time())
+            for backpressure_since in self.process_backpressure_since.values():
+                if (
+                    backpressure_since.value > 0
+                    and now - backpressure_since.value > backpressure_secs
+                ):
+                    metrics.incr("spans.buffer.flusher.backpressure")
+                    raise MessageRejected()

🤖 Grapple PR auto-fix • major • Review this diff before applying
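
The check itself reduces to a single pass over plain integers once the shared values have been read. The sketch below assumes the convention from the diff that 0 means "no backpressure reported"; `backpressure_exceeded` is a hypothetical name.

```python
def backpressure_exceeded(since_values: list[int], now: int, limit_seconds: int) -> bool:
    """True if any worker has reported backpressure for longer than limit_seconds."""
    # A value of 0 means the worker has not reported backpressure.
    return any(since > 0 and now - since > limit_seconds for since in since_values)
```

Reading each shared value once into a list and passing `now` in also keeps the timestamp consistent across all processes within one submit() call.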

process = make_process(
target=target,
args=(
shards,

🟡 [Grapple PR] Suggested fix — architecture agent (Larger fix (17 lines, 1 file) — review recommended)

The main static method signature was changed to add shards as a new positional parameter (line 131), but the _create_process_for_shards method passes shards in args tuple (line 113). Looking at the two code paths: when produce_to_pipe is None (multiprocessing), the target is SpanFlusher.main_target with shard_buffer as the first arg in partial, while for threads, the target is partial(SpanFlusher.main, shard_buffer). In both cases, the args tuple starts with shards (line 108-115). However, main_target is a staticmethod that presumably unpickles the buffer and calls main — its signature is not shown but it must also be updated to accept shards. If main_target was not updated to forward the shards parameter, the process will crash on startup.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -96,7 +96,7 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]):
         make_process: Callable[..., multiprocessing.context.SpawnProcess | threading.Thread]
         if self.produce_to_pipe is None:
             target = run_with_initialized_sentry(
-                SpanFlusher.main,
+                SpanFlusher.main_target,
                 # unpickling buffer will import sentry, so it needs to be
                 # pickled separately. at the same time, pickling
                 # synchronization primitives like multiprocessing.Value can
@@ -130,6 +130,19 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]):
         self.buffers[process_index] = shard_buffer
 
     @staticmethod
+    def main_target(
+        buffer: SpansBuffer,
+        shards: list[int],
+        stopped,
+        current_drift,
+        backpressure_since,
+        healthy_since,
+        produce_to_pipe: Callable[[KafkaPayload], None] | None,
+    ) -> None:
+        # Entry point for spawned processes. Buffer is unpickled here (after
+        # sentry is initialized by run_with_initialized_sentry), then delegates
+        # to main with the full argument set including shards.
+        SpanFlusher.main(buffer, shards, stopped, current_drift, backpressure_since, healthy_since, produce_to_pipe)
+
+    @staticmethod
     def main(
         buffer: SpansBuffer,
         shards: list[int],

🤖 Grapple PR auto-fix • major • Review this diff before applying
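
How `partial` and the `args` tuple combine into one positional call can be checked without starting a process. This is an illustrative sketch: `main` here is a stand-in with a shortened signature, and `binds` is a hypothetical helper built on `inspect.signature`.

```python
import inspect
from functools import partial


def main(buffer, shards, stopped, produce_to_pipe=None):
    # Stand-in for the worker entry point; it just echoes what it received.
    return {"buffer": buffer, "shards": shards, "stopped": stopped}


def binds(func, *args) -> bool:
    """Return True if func can be called with these positional arguments."""
    try:
        inspect.signature(func).bind(*args)
        return True
    except TypeError:
        return False


# partial supplies the first positional argument; args supplies the rest in order.
target = partial(main, "shard-buffer")
result = target([0, 1], False, None)
```

A check like `binds(target, *args)` in a unit test would catch an entry point whose signature was not updated to accept `shards`, before the mismatch shows up as a crash at process startup.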

backpressure_secs = options.get("spans.buffer.flusher.backpressure-seconds")
if int(time.time()) - self.backpressure_since.value > backpressure_secs:
backpressure_secs = options.get("spans.buffer.flusher.backpressure-seconds")
for backpressure_since in self.process_backpressure_since.values():

🟡 [Grapple PR] Suggested fix — security agent (Larger fix (32 lines, 1 file) — review recommended)

In _ensure_processes_alive, the kill call is guarded with isinstance(process, multiprocessing.Process), but Thread objects are silently skipped without any attempt to stop them. A hung Thread cannot be killed, but there is no metric or log emitted for this case — the health check would continuously detect the hang and increment restarts without ever resolving it.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -245,18 +245,40 @@ def _ensure_processes_alive(self) -> None:
             if cause is None:
                 continue  # healthy
 
+            process_type = (
+                "process" if isinstance(process, multiprocessing.Process) else "thread"
+            )
+
             # Report unhealthy for all shards handled by this process
             for shard in shards:
                 metrics.incr(
-                    "spans.buffer.flusher_unhealthy", tags={"cause": cause, "shard": shard}
+                    "spans.buffer.flusher_unhealthy",
+                    tags={"cause": cause, "shard": shard, "process_type": process_type},
                 )
 
             if self.process_restarts[process_index] > MAX_PROCESS_RESTARTS:
                 raise RuntimeError(
                     f"flusher process for shards {shards} crashed repeatedly ({cause}), restarting consumer"
                 )
-            self.process_restarts[process_index] += 1
 
             try:
                 if isinstance(process, multiprocessing.Process):
                     process.kill()
+                else:
+                    # Threading.Thread cannot be forcibly killed. If the thread
+                    # is still alive and hung, we cannot recover it — spawning a
+                    # new thread would create a second concurrent writer to the
+                    # pipe (in test/pipe mode), corrupting output. Log and
+                    # increment the restart counter so MAX_PROCESS_RESTARTS
+                    # eventually surfaces this as a fatal error rather than
+                    # silently looping forever.
+                    if process.is_alive():
+                        logger.warning(
+                            "spans.buffer.flusher: hung thread cannot be killed, "
+                            "cannot restart (shards=%s, cause=%s). "
+                            "Will raise after %d more occurrences.",
+                            shards,
+                            cause,
+                            MAX_PROCESS_RESTARTS - self.process_restarts[process_index],
+                        )
+                        self.process_restarts[process_index] += 1
+                        # Do NOT spawn a replacement thread — the existing hung
+                        # thread is still alive and would race with any new one.
+                        continue
             except (ValueError, AttributeError):
                 pass  # Process already closed, ignore
 
-            self._create_process_for_shards(process_index, shards)
+            self.process_restarts[process_index] += 1
+            self._create_process_for_shards(process_index, shards)

🤖 Grapple PR auto-fix • minor • Review this diff before applying

backpressure_since,
healthy_since,
produce_to_pipe: Callable[[KafkaPayload], None] | None,
) -> None:

🟡 [Grapple PR] Suggested fix — security agent (Small fix (7 lines, 1 file))

_create_process_for_shard(shard) searches process_to_shards_map to find which process owns the shard, then restarts that process. However, this method is not called anywhere in the visible diff; only _create_process_for_shards is called from _ensure_processes_alive. The method appears to be dead code. It may cause confusion, or be called incorrectly from restart logic that is not visible in the truncated diff.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -136,12 +136,6 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]):
         process.start()
         self.processes[process_index] = process
         self.buffers[process_index] = shard_buffer
-
-    def _create_process_for_shard(self, shard: int):
-        # Find which process this shard belongs to and restart that process
-        for process_index, shards in self.process_to_shards_map.items():
-            if shard in shards:
-                self._create_process_for_shards(process_index, shards)
-                break
 
     @staticmethod
     def main(

🤖 Grapple PR auto-fix • minor • Review this diff before applying

if isinstance(self.process, multiprocessing.Process):
self.process.terminate()
if isinstance(process, multiprocessing.Process):
process.terminate()

🟡 [Grapple PR] Suggested fix — security agent (Larger fix (12 lines, 1 file) — review recommended)

In join(), the per-process wait loop mishandles an exhausted timeout when iterating over multiple processes. deadline is computed once, and next_step.join(timeout) can consume most of that budget before the loop starts. The inner while loop's deadline > time.time() check is correct, but the outer if remaining_time <= 0: break exits the loop before process.terminate() is called on the remaining processes, potentially leaving them running without ever being terminated.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -326,18 +326,19 @@ def join(self, timeout: float | None = None):
         # set stopped flag first so we can "flush" the background threads while
         # next_step is also shutting down. we can do two things at once!
         self.stopped.value = True
         deadline = time.time() + timeout if timeout else None
 
         self.next_step.join(timeout)
 
         # Wait for all processes to finish
         for process_index, process in self.processes.items():
             if deadline is not None:
                 remaining_time = deadline - time.time()
-                if remaining_time <= 0:
-                    break
+                # Even if deadline exceeded, skip waiting but still terminate below
+                if remaining_time > 0:
+                    while process.is_alive() and deadline > time.time():
+                        time.sleep(0.1)
+            else:
+                while process.is_alive():
+                    time.sleep(0.1)
 
-            while process.is_alive() and (deadline is None or deadline > time.time()):
-                time.sleep(0.1)
-
             if isinstance(process, multiprocessing.Process):
                 process.terminate()

🤖 Grapple PR auto-fix • minor • Review this diff before applying

if isinstance(self.process, multiprocessing.Process):
self.process.terminate()
if isinstance(process, multiprocessing.Process):
process.terminate()

🟡 [Grapple PR] Suggested fix — performance agent (Larger fix (39 lines, 1 file) — review recommended)

join() waits for processes sequentially, meaning the total join time can be up to num_processes * per_process_wait_time rather than max(per_process_wait_time). With a hard timeout, later processes in the iteration may get zero remaining time.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -326,22 +326,34 @@ class SpanFlusher(ProcessingStrategy[FilteredPayload | int]):
     def join(self, timeout: float | None = None):
         # set stopped flag first so we can "flush" the background threads while
         # next_step is also shutting down. we can do two things at once!
         self.stopped.value = True
         deadline = time.time() + timeout if timeout else None
 
         self.next_step.join(timeout)
 
-        # Wait for all processes to finish
-        for process_index, process in self.processes.items():
-            if deadline is not None:
-                remaining_time = deadline - time.time()
-                if remaining_time <= 0:
-                    break
-
-            while process.is_alive() and (deadline is None or deadline > time.time()):
-                time.sleep(0.1)
-
-            if isinstance(process, multiprocessing.Process):
-                process.terminate()
+        # Wait for all processes to finish concurrently so that the total wait
+        # time is max(per_process_wait_time) rather than sum(per_process_wait_time).
+        # Using daemon threads means they won't block interpreter shutdown if we
+        # somehow exceed the deadline.
+        def _wait_for_process(process: multiprocessing.context.SpawnProcess | threading.Thread) -> None:
+            while process.is_alive() and (deadline is None or deadline > time.time()):
+                time.sleep(0.1)
+
+        wait_threads = []
+        for process in self.processes.values():
+            t = threading.Thread(target=_wait_for_process, args=(process,), daemon=True)
+            t.start()
+            wait_threads.append((t, process))
+
+        # Join each watcher thread with whatever budget remains, then terminate
+        # any process that is still alive (handles zero-remaining-time correctly
+        # since join(0) or join(negative) returns immediately).
+        for t, process in wait_threads:
+            remaining = (deadline - time.time()) if deadline is not None else None
+            if remaining is not None and remaining <= 0:
+                # No time left; fall through to terminate immediately
+                pass
+            else:
+                t.join(timeout=remaining)
+
+            if isinstance(process, multiprocessing.Process) and process.is_alive():
+                process.terminate()

🤖 Grapple PR auto-fix • minor • Review this diff before applying
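
Waiting on all workers against one shared deadline does not strictly need watcher threads; a single polling loop gives the same max-rather-than-sum behaviour. The sketch below is an illustration under assumptions: `wait_all` is a hypothetical helper, and it treats `timeout=None` as "wait indefinitely".

```python
import multiprocessing.process
import threading
import time


def wait_all(workers, timeout=None, poll_interval=0.05):
    """Poll all workers against one deadline; terminate processes still alive."""
    deadline = None if timeout is None else time.monotonic() + timeout
    pending = list(workers)
    while pending:
        pending = [w for w in pending if w.is_alive()]
        if not pending or (deadline is not None and time.monotonic() >= deadline):
            break
        time.sleep(poll_interval)
    for worker in pending:
        # Threads cannot be terminated; only real processes are.
        if isinstance(worker, multiprocessing.process.BaseProcess):
            worker.terminate()
    return pending  # workers that were still alive at the deadline
```

Every worker is checked on each pass, so a slow first worker no longer uses up the budget of the ones after it, and the terminate step always runs.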


from sentry import options
from sentry.conf.types.kafka_definition import Topic
from sentry.processing.backpressure.memory import ServiceMemory

🟡 [Grapple PR] Suggested fix — architecture agent (Small fix (6 lines, 1 file))

The new import of ServiceMemory from sentry.processing.backpressure.memory is used only as a type annotation in the submit method (line 299: memory_infos: list[ServiceMemory] = []). It adds a runtime import dependency on the backpressure module that the flusher did not previously need. Since the name is needed only for annotations, the import can be placed behind TYPE_CHECKING to avoid the runtime dependency.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -1,4 +1,6 @@
+from __future__ import annotations
+
 import logging
 import multiprocessing
 import multiprocessing.context
 import threading
@@ -5,6 +6,7 @@
 import time
 from collections.abc import Callable
 from functools import partial
+from typing import TYPE_CHECKING
 
 import orjson
 import sentry_sdk
@@ -15,7 +17,9 @@ from arroyo.types import FilteredPayload, Message
 
 from sentry import options
 from sentry.conf.types.kafka_definition import Topic
-from sentry.processing.backpressure.memory import ServiceMemory
+if TYPE_CHECKING:
+    from sentry.processing.backpressure.memory import ServiceMemory
 from sentry.spans.buffer import SpansBuffer
 from sentry.utils import metrics
 from sentry.utils.arroyo import run_with_initialized_sentry

🤖 Grapple PR auto-fix • minor • Review this diff before applying
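
To illustrate the pattern this diff applies, here is a small self-contained sketch. `decimal.Decimal` stands in for ServiceMemory and `count_infos` is a hypothetical helper; neither comes from the PR. The signature annotation is written as a string so that nothing is evaluated at runtime, and annotations on local variables inside a function body are never evaluated.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen by type checkers only; this import never runs at runtime.
    from decimal import Decimal  # stand-in for ServiceMemory


def count_infos(infos: "list[Decimal]") -> int:
    # Annotations on local variables are not evaluated at runtime,
    # so Decimal does not need to be importable here.
    collected: list[Decimal] = []
    collected.extend(infos)
    return len(collected)
```

If `from __future__ import annotations` is used instead of string annotations, it has to come before any other import in the module.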

self.processes[process_index] = process
self.buffers[process_index] = shard_buffer

def _create_process_for_shard(self, shard: int):

🟡 [Grapple PR] Suggested fix — architecture agent (Small fix (7 lines, 1 file))

The _create_process_for_shard method (singular) is dead code — it's never called anywhere in the diff and its functionality overlaps with _create_process_for_shards (plural). It performs a linear search through process_to_shards_map to find which process a shard belongs to, then delegates to _create_process_for_shards. This appears to be an unused helper that adds confusion between the two similarly-named methods.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -122,13 +122,6 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]):
         process.start()
         self.processes[process_index] = process
         self.buffers[process_index] = shard_buffer
-
-    def _create_process_for_shard(self, shard: int):
-        # Find which process this shard belongs to and restart that process
-        for process_index, shards in self.process_to_shards_map.items():
-            if shard in shards:
-                self._create_process_for_shards(process_index, shards)
-                break
 
     @staticmethod
     def main(

🤖 Grapple PR auto-fix • minor • Review this diff before applying

@Frawatson

Owner Author

@grapple-pr review

@grapple-pr grapple-pr Bot left a comment

🤖 Grapple PR Review

Summary

This PR enhances the SpanFlusher to support multiprocessing with one process per shard (up to a configurable maximum), replacing the previous single-process model. It also adds per-process health monitoring (healthy_since, backpressure_since) and exposes a new --flusher-processes CLI option to control concurrency.

Changes: 6 files (+199 / -50 lines)

  • 🟡 6 files modified: CLAUDE.md, src/sentry/consumers/__init__.py, src/sentry/spans/consumers/process/factory.py, src/sentry/spans/consumers/process/flusher.py, tests/sentry/spans/consumers/process/test_consumer.py +1 more

Resolves: #6 (Replays Self-Serve Bulk Delete System)

CI: ⏳ 0 passed, 0 failed

Impact Diagram

graph TD
  pr_0[/"🔀 PR Changes: 6 files"/]
  f1["🟡 CLAUDE.md +11/-0"]
  pr_0 --> f1
  f2["🟡 __init__.py +9/-1"]
  pr_0 --> f2
  f3["🟡 factory.py +3/-0"]
  pr_0 --> f3
  f4["🟡 flusher.py +127/-47"]
  pr_0 --> f4
  f5["🟡 test_consumer.py +48/-1"]
  pr_0 --> f5
  f6["🟡 test_flusher.py +1/-1"]
  pr_0 --> f6
  f3 -.- f4
  f5 -.- f6
  style f1 fill:#7c3aed,color:#fff,stroke:#7c3aed
  style f2 fill:#7c3aed,color:#fff,stroke:#7c3aed
  style f3 fill:#7c3aed,color:#fff,stroke:#7c3aed
  style f4 fill:#7c3aed,color:#fff,stroke:#7c3aed
  style f5 fill:#7c3aed,color:#fff,stroke:#7c3aed
  style f6 fill:#7c3aed,color:#fff,stroke:#7c3aed

1 critical issue found — please address before merging.

Severity Count
🔴 Critical 1
🟠 Major 10
🟡 Minor 11
🔵 Info 2
Review details
  • Reviewed at: 2026-04-12T03:22:08.308Z
  • Agents used: security, logic, style, architecture, performance, verification
  • Context level: full
  • Execution time: 121.8s
  • Low-confidence findings suppressed: 5
  • Blast radius: 0 (contained)

Powered by Grapple PR — AI that understands your code.

self.mp_context = mp_context = multiprocessing.get_context("spawn")
self.stopped = mp_context.Value("i", 0)
self.redis_was_full = False
self.current_drift = mp_context.Value("i", 0)

🔴 CRITICAL — Edge case - Zero shards (confidence: 100%)

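When buffer.assigned_shards is empty, len(buffer.assigned_shards) is 0. The expression max_processes or len(buffer.assigned_shards) then evaluates to 0 when max_processes is None/0, and min(self.max_processes, 0) = 0, so num_processes is 0. Evaluating i % self.num_processes on line 62 with that value would raise a ZeroDivisionError. As quoted, that modulo sits inside the loop over buffer.assigned_shards, so an empty shard list skips it; there is still no explicit guard against zero assigned shards, so the safety of this path depends on that loop staying as it is.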

Evidence:

  • Line 53: self.max_processes = max_processes or len(buffer.assigned_shards); if both are 0/None, max_processes becomes 0
  • Line 55: self.num_processes = min(self.max_processes, len(buffer.assigned_shards)); min(0, 0) = 0
  • Line 62: process_index = i % self.num_processes; ZeroDivisionError if evaluated while num_processes is 0
  • Intent spec explicitly calls this out: 'assigned_shards is empty: num_processes would be min(max_processes, 0) = 0, causing division or empty dict iteration — check for zero-shard guard'
  • Python's or operator: 0 or len(buffer.assigned_shards) evaluates to len(buffer.assigned_shards)
  • Intent spec flags this: 'the or operator will substitute len(assigned_shards) when max_processes=0 (falsy), which may not be the intended behavior'
  • CLI default is 1 (int), so None shouldn't normally arrive, but the type hint allows None

Agent: logic
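
A standalone sketch of the assignment logic quoted in the evidence, written as a hypothetical free function `assign_shards` rather than the PR's constructor code. One thing it makes visible: the modulo sits inside the loop over the assigned shards, so with an empty list the loop body never executes. An explicit early return still documents the intent, and validating `max_processes` covers the negative case that `or` would let through. Treating 0 as "one process per shard" mirrors the quoted `or` expression and is an assumption about the intended behavior.

```python
def assign_shards(assigned_shards, max_processes=None):
    """Distribute shards round-robin over at most max_processes workers."""
    if max_processes is not None and max_processes < 0:
        raise ValueError("max_processes must be non-negative")
    # None and 0 both fall back to one process per shard (assumed intent).
    limit = max_processes if max_processes else len(assigned_shards)
    num_processes = min(limit, len(assigned_shards))
    mapping = {i: [] for i in range(num_processes)}
    if num_processes == 0:
        # No shards assigned: nothing to distribute, and no modulo by zero.
        return mapping
    for i, shard in enumerate(assigned_shards):
        mapping[i % num_processes].append(shard)
    return mapping
```

With five shards and a cap of two, shards alternate between the two workers; with no shards the result is an empty mapping.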

# We pause insertion into Redis if the flusher is not making progress
# fast enough. We could backlog into Redis, but we assume, despite best
# efforts, it is still always going to be less durable than Kafka.
# Minimizing our Redis memory usage also makes COGS easier to reason

🟠 MAJOR — Process kill logic - threading.Thread (confidence: 100%)

The isinstance check uses multiprocessing.Process instead of multiprocessing.context.SpawnProcess or the mp_context's Process class. With multiprocessing.get_context('spawn'), the spawned processes are SpawnProcess instances. In CPython, SpawnProcess derives from multiprocessing.process.BaseProcess rather than from multiprocessing.Process, so isinstance(process, multiprocessing.Process) is False for them and the kill is skipped for real processes as well. For threads (test mode), a hung thread is never killed either: the code silently skips the kill, so in the hang case the thread is leaked and a new thread is created alongside it.

Evidence:

  • In the test path, make_process = threading.Thread, so isinstance(process, multiprocessing.Process) is False
  • For threads, there's no .kill() method and the code skips killing entirely
  • A new process/thread is then created via _create_process_for_shards, but the old hung thread remains running

Agent: logic
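
The class relationship behind this finding can be checked directly. The snippet below only inspects classes and starts no processes. It reflects CPython's multiprocessing module, where the context-specific process classes and multiprocessing.Process are siblings under multiprocessing.process.BaseProcess; the variable names are illustrative.

```python
import multiprocessing
import multiprocessing.process
import threading

spawn_process_cls = multiprocessing.get_context("spawn").Process

# Is the spawn context's Process class a subclass of multiprocessing.Process?
is_mp_process = issubclass(spawn_process_cls, multiprocessing.Process)
# Both classes derive from BaseProcess, which makes it the safer isinstance target.
is_base_process = issubclass(spawn_process_cls, multiprocessing.process.BaseProcess)
# Threads expose neither kill() nor terminate(), so they need their own code path.
thread_can_be_killed = hasattr(threading.Thread, "kill")

print(is_mp_process, is_base_process, thread_can_be_killed)
```

Checking against `multiprocessing.process.BaseProcess` (or against `self.mp_context.Process`) would therefore match spawned workers, whereas a check against `multiprocessing.Process` would not.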

self.produce_to_pipe,
),
daemon=True,
)

🟠 MAJOR — Unused method (confidence: 100%)

The method _create_process_for_shard(self, shard: int) is defined but never called anywhere in the codebase. This appears to be dead code that was perhaps intended for per-shard restart logic but is not used. The actual restart logic in _ensure_processes_alive calls _create_process_for_shards (plural) directly.

Evidence:

  • grep of the diff shows no call to _create_process_for_shard (singular)
  • _ensure_processes_alive calls _create_process_for_shards(process_index, shards) directly
  • The method searches process_to_shards_map for the shard, which is redundant since the caller already knows the process_index

Agent: logic

# We pause insertion into Redis if the flusher is not making progress
# fast enough. We could backlog into Redis, but we assume, despite best
# efforts, it is still always going to be less durable than Kafka.
# Minimizing our Redis memory usage also makes COGS easier to reason

🟠 MAJOR — Process restart - increment before kill (confidence: 100%)

The restart limit check (process_restarts[process_index] > MAX_PROCESS_RESTARTS) and its RuntimeError run before the restart counter is incremented and before the process is killed and recreated: the increment happens on line 264, the kill on line 267, and the recreate on line 275. If _create_process_for_shards itself fails (e.g., due to resource exhaustion), the counter has already been incremented but the process dict may be left in an inconsistent state. More importantly, the check uses >, which allows MAX_PROCESS_RESTARTS + 1 restarts before raising.

Evidence:

  • MAX_PROCESS_RESTARTS = 5 (from context)
  • Check is > MAX_PROCESS_RESTARTS meaning 6 restarts happen before the error is raised
  • The original code likely had the same off-by-one, but this is still a logic issue worth noting

Agent: logic
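
The off-by-one can be reproduced with a tiny simulation. This is a sketch under the assumption that the counter starts at 0 and that the check runs before the increment, as the finding describes; `restarts_allowed` is a hypothetical helper, not code from the PR.

```python
MAX_PROCESS_RESTARTS = 5  # value quoted in the evidence above


def restarts_allowed(strict: bool) -> int:
    """Count restarts performed before the limit check trips.

    strict=False models the quoted `>` comparison, strict=True models `>=`.
    """
    restarts = 0
    while True:
        limit_hit = (
            restarts >= MAX_PROCESS_RESTARTS if strict else restarts > MAX_PROCESS_RESTARTS
        )
        if limit_hit:
            return restarts  # the real code would raise RuntimeError here
        restarts += 1  # counter is bumped, then the process is restarted


print(restarts_allowed(strict=False), restarts_allowed(strict=True))
```

With `>` the simulation performs six restarts before tripping, and with `>=` it performs five.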

make_process = self.mp_context.Process
else:
- target = partial(SpanFlusher.main, self.buffer)
+ target = partial(SpanFlusher.main, shard_buffer)

🟠 MAJOR — API Contract / Parameter Mismatch (confidence: 100%)

The SpanFlusher.main() static method signature was changed to accept a new shards parameter after buffer, but the multiprocessing.Process path uses InitializableBuffer as the target (line 93-100), not SpanFlusher.main. The threading.Thread path (line 102) uses partial(SpanFlusher.main, shard_buffer). Both paths pass shards as the first positional arg in args=(shards, self.stopped, ...). For the Thread path, partial(SpanFlusher.main, shard_buffer) binds buffer, then args=(shards, ...) maps shards to the shards parameter — this is correct. However, for the multiprocessing.Process path, the target is InitializableBuffer(SpanFlusher.main, shard_buffer). If InitializableBuffer.__call__ passes shard_buffer as the first argument and then unpacks args, the shards list would be received correctly. But this depends entirely on the InitializableBuffer contract which is not visible in the diff — if it calls main(buffer, *args), then the signature matches. If not, there's a silent argument mismatch in the spawned process that would crash at runtime.

Evidence:

  • Line 93-100: InitializableBuffer(SpanFlusher.main, shard_buffer) used as target for mp.Process
  • Line 107-113: the args tuple starts with shards: args=(shards, self.stopped, self.current_drift, ...)
  • SpanFlusher.main signature: main(buffer, shards, stopped, current_drift, backpressure_since, healthy_since, produce_to_pipe)
  • InitializableBuffer is not shown in the diff; its calling convention determines correctness

Agent: architecture
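
The calling convention in question can be sketched in isolation. `BoundTarget` below is a hypothetical wrapper that forwards a bound buffer ahead of the runtime arguments; it is an assumption about how a wrapper like the one named in the finding might behave, not its actual implementation, and `main` is reduced to three parameters for brevity.

```python
from functools import partial


def main(buffer, shards, stopped):
    # Stand-in for SpanFlusher.main with a shortened parameter list.
    return {"buffer": buffer, "shards": shards, "stopped": stopped}


class BoundTarget:
    """Hypothetical callable that prepends a bound buffer to the call arguments."""

    def __init__(self, func, buffer):
        self.func = func
        self.buffer = buffer

    def __call__(self, *args):
        return self.func(self.buffer, *args)


args = ([0, 1], False)  # shards first, then the remaining arguments
thread_style = partial(main, "shard_buffer")(*args)
process_style = BoundTarget(main, "shard_buffer")(*args)
```

If the wrapper forwards arguments this way, both targets hand `shards` to the same parameter; any other forwarding order would need a matching change to the `args` tuple.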

metrics.timing(
"spans.buffer.segment_size_bytes",
len(kafka_payload.value),
tags={"shard": shard_tag},

🟡 MINOR — Metrics Tag Inconsistency (confidence: 88%)

Inconsistent metrics tag key: tags={'shard': shard_tag} on lines 185 and 192, but tags={'shards': shard_tag} (plural) on line 195. This will create separate metric series for what appears to be the same dimension.

Evidence:

  • Line 185: tags={'shard': shard_tag} in spans.buffer.flusher.produce
  • Line 192: tags={'shard': shard_tag} in spans.buffer.segment_size_bytes
  • Line 195: tags={'shards': shard_tag} in spans.buffer.flusher.wait_produce — note plural 'shards'

Agent: architecture
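
A toy model of why the differing tag key matters when metrics are grouped by tag. The in-memory list and `group_by_tag` are illustrative assumptions and do not reflect how sentry.utils.metrics stores data; the metric names and tag keys are the ones quoted in the evidence.

```python
from collections import defaultdict


def group_by_tag(points, key):
    """Group metric names by the value of one tag key, skipping points without it."""
    groups = defaultdict(list)
    for name, tags in points:
        if key in tags:
            groups[tags[key]].append(name)
    return dict(groups)


shard_tag = "0,1"
points = [
    ("spans.buffer.flusher.produce", {"shard": shard_tag}),
    ("spans.buffer.segment_size_bytes", {"shard": shard_tag}),
    ("spans.buffer.flusher.wait_produce", {"shards": shard_tag}),  # plural key
]
by_shard = group_by_tag(points, "shard")
```

Grouping by `shard` picks up the first two metrics only; the `wait_produce` timer is left out because it carries the plural key.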

sentry_sdk.set_tag("sentry_spans_buffer_shards", shard_tag)

try:
producer_futures = []

🟡 MINOR — Algorithmic complexity (confidence: 88%)

_create_process_for_shard() performs a linear scan over all process_to_shards_map entries to find which process owns a given shard. This is O(N*S) where N is num_processes and S is shards per process.

Evidence:

  • The method iterates for process_index, shards in self.process_to_shards_map.items(): if shard in shards — this is O(total_shards) per call.
  • This is called during process restart, which is infrequent, so the impact is low in practice.
  • However, this is easy to fix with a reverse lookup map built at construction time.

Agent: performance

process.start()
self.processes[process_index] = process
self.buffers[process_index] = shard_buffer

🟡 MINOR — naming conventions (confidence: 83%)

Inconsistent method naming: _create_process_for_shard() is defined but never called, while _create_process_for_shards() (plural) handles multiple shards. The singular method name is misleading—it actually restarts a process handling a shard, not creates one for a single shard.

Evidence:

  • _create_process_for_shard() at line 126 is never referenced in the codebase
  • The method internally looks up a process_index and calls _create_process_for_shards(), making the singular name inaccurate
  • All actual process creation uses _create_process_for_shards() (line 76, 82) with plural naming

Agent: style

produce_to_pipe: Callable[[KafkaPayload], None] | None = None,
):
self.buffer = buffer
self.next_step = next_step

🔵 INFO — code organization (confidence: 99%)

Missing assignment of self.buffer in __init__(). The instance variable is removed but no replacement logic is added to handle cases where individual buffers may need to be referenced as a group.

Evidence:

  • Line 47-50: Constructor does not store buffer parameter despite it being passed in
  • Lines 69-70: SpansBuffer(shards) creates per-process buffers, but the original buffer is never stored
  • The original code at line 47 in context had self.buffer = buffer
  • If code elsewhere tries to access self.buffer (e.g., for debugging or monitoring), it will fail with AttributeError

Agent: style

# Find which process this shard belongs to and restart that process
for process_index, shards in self.process_to_shards_map.items():
if shard in shards:
self._create_process_for_shards(process_index, shards)

🔵 INFO — documentation (confidence: 84%)

Docstring for _create_process_for_shard() is missing. This method has non-obvious behavior (it looks up and restarts a process for a given shard) that should be documented.

Evidence:

  • Method at line 125-131 has no docstring
  • The logic of finding the process_index from a shard ID is not immediately clear from the method name or signature
  • Other methods like _create_processes() and _create_process_for_shards() similarly lack docstrings, but this one is particularly important since it bridges shard-based logic with process-based logic

Agent: style

@grapple-pr grapple-pr Bot left a comment

🔧 [Grapple PR] 11 auto-fix suggestions available

Apply individually or use GitHub's "Add suggestion to batch", then "Commit suggestions" to apply all at once.

Commits from Grapple PR are prefixed with [Grapple PR] for easy identification in merge conflicts.

# We pause insertion into Redis if the flusher is not making progress
# fast enough. We could backlog into Redis, but we assume, despite best
# efforts, it is still always going to be less durable than Kafka.
# Minimizing our Redis memory usage also makes COGS easier to reason

[Grapple PR] Auto-fix — logic agent (Small fix (5 lines, 1 file))

The restart limit check (process_restarts[process_index] > MAX_PROCESS_RESTARTS) and its RuntimeError run before the restart counter is incremented and before the process is killed and recreated: the increment happens on line 264, the kill on line 267, and the recreate on line 275. If _create_process_for_shards itself fails (e.g., due to resource exhaustion), the counter has already been incremented but the process dict may be left in an inconsistent state. More importantly, the check uses >, which allows MAX_PROCESS_RESTARTS + 1 restarts before raising.

Suggested change
# Minimizing our Redis memory usage also makes COGS easier to reason
if self.process_restarts[process_index] >= MAX_PROCESS_RESTARTS:
self._create_process_for_shards(process_index, shards)
self.process_restarts[process_index] += 1

🤖 Grapple PR auto-fix • major • confidence: 100%

self.mp_context = mp_context = multiprocessing.get_context("spawn")
self.stopped = mp_context.Value("i", 0)
self.redis_was_full = False
self.current_drift = mp_context.Value("i", 0)

[Grapple PR] Auto-fix — architecture agent (Small fix (9 lines, 1 file))

self.current_drift is a single shared mp_context.Value passed to ALL processes. All processes read this value to determine flush timing, but if any process writes to it, they'd all see the write. In submit(), the main process sets self.current_drift but it's unclear if the flusher processes also write to it. More critically, the single shared drift value means all processes use the same clock drift, which is likely correct, but should be documented as intentional shared state across all flusher processes.

Suggested change
self.current_drift = mp_context.Value("i", 0)
# NOTE: current_drift is intentionally a single shared value across ALL flusher processes.
# Clock drift is a property of the external time source (e.g., Redis server clock skew
# relative to the consumer host) and is therefore uniform — all processes should apply
# the same correction when computing flush deadlines. Only the main process (submit())
# writes to this value; all worker processes read it. This is distinct from
# process_backpressure_since and process_healthy_since, which are per-process because
# they reflect the individual health of each worker, not a shared external condition.
self.current_drift = mp_context.Value("i", 0)

🤖 Grapple PR auto-fix • major • confidence: 100%

self.process_restarts += 1
self._create_process()
try:
if isinstance(process, multiprocessing.Process):

[Grapple PR] Auto-fix — architecture agent (Small fix (6 lines, 1 file))

The process kill logic uses isinstance(process, multiprocessing.Process), but the processes are created via self.mp_context.Process, which returns a multiprocessing.context.SpawnProcess. In CPython, SpawnProcess derives from multiprocessing.process.BaseProcess and is not a subclass of multiprocessing.Process, so this isinstance check is False for spawned processes and kill() is never reached for them. When produce_to_pipe is set, processes are threading.Thread instances, which have no kill() method; the original code had a try/except ValueError for this. In the new code, threading.Thread instances are silently skipped (correct), but the except (ValueError, AttributeError) on line 253 would also catch errors from the kill() call, which could mask real issues.

Suggested change
if isinstance(process, multiprocessing.Process):
# Only multiprocessing.Process has kill(); threading.Thread
# instances are intentionally skipped by the isinstance guard above.
# ValueError: process not yet started or already closed
# OSError: process already terminated at OS level (race condition)
except (ValueError, OSError):

🤖 Grapple PR auto-fix • major • confidence: 100%

# more often than submit()

- self._ensure_process_alive()
+ self._ensure_processes_alive()

[Grapple PR] Auto-fix — performance agent (Small fix (6 lines, 1 file))

record_stored_segments() is called on every buffer in a loop on every submit() call. With N processes, this multiplies an already-called operation N times unnecessarily. If each buffer queries Redis or performs I/O, this is N redundant calls per message.

Suggested change
self._ensure_processes_alive()
# record_stored_segments() should be called once per submit(), not once per process.
# All shard-buffers share the same underlying Redis state, so a single call suffices.
if self.buffers:
next(iter(self.buffers.values())).record_stored_segments()

🤖 Grapple PR auto-fix • major • confidence: 100%

for backpressure_since in self.process_backpressure_since.values():
if (
backpressure_since.value > 0
and int(time.time()) - backpressure_since.value > backpressure_secs

[Grapple PR] Auto-fix — performance agent (Small fix (3 lines, 1 file))

The backpressure check in submit() iterates over all process_backpressure_since values and calls options.get('spans.buffer.flusher.backpressure-seconds') once per process inside the loop. options.get() may involve a lock or dictionary lookup that should be hoisted out of the loop.

Suggested change
and int(time.time()) - backpressure_since.value > backpressure_secs
now = int(time.time())
and now - backpressure_since.value > backpressure_secs

🤖 Grapple PR auto-fix • major • confidence: 100%
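
A minimal sketch of the hoisting idea, detached from the flusher class. `backpressure_exceeded` and its `now` parameter are hypothetical names introduced for illustration; the inputs are plain integers standing in for the `.value` of each per-process shared counter.

```python
import time


def backpressure_exceeded(since_values, backpressure_secs, now=None):
    """Return True if any worker has reported backpressure for too long."""
    # Read the clock once per call instead of once per worker.
    if now is None:
        now = int(time.time())
    return any(since > 0 and now - since > backpressure_secs for since in since_values)
```

Passing `now` explicitly also makes the check easy to exercise in tests without patching the clock.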

for backpressure_since in self.process_backpressure_since.values():
if (
backpressure_since.value > 0
and int(time.time()) - backpressure_since.value > backpressure_secs

[Grapple PR] Auto-fix — performance agent (Small fix (5 lines, 1 file))

int(time.time()) is called inside the backpressure loop without being cached, potentially computing the timestamp once per process per submit(). When combined with the memory check below (which may also compute time), there are multiple redundant time syscalls per submit().

Suggested change
and int(time.time()) - backpressure_since.value > backpressure_secs
now = int(time.time())
and now - backpressure_since.value > backpressure_secs
self.current_drift.value = drift = message.payload - now

🤖 Grapple PR auto-fix • minor • confidence: 100%

@@ -134,23 +175,28 @@ def produce(payload: KafkaPayload) -> None:
else:
backpressure_since.value = 0

[Grapple PR] Auto-fix — style agent (Small fix (2 lines, 1 file))

Inconsistent variable naming in metrics tags: line 177 uses shard_tag (comma-separated shards) but line 196 uses "shards" (plural key) while earlier uses "shard" (singular). This inconsistency could confuse metrics aggregation.

Suggested change
with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):

🤖 Grapple PR auto-fix • minor • confidence: 88%

metrics.timing(
"spans.buffer.segment_size_bytes",
len(kafka_payload.value),
tags={"shard": shard_tag},

[Grapple PR] Auto-fix — architecture agent (Small fix (2 lines, 1 file))

Inconsistent metrics tag key: tags={'shard': shard_tag} on lines 185 and 192, but tags={'shards': shard_tag} (plural) on line 195. This will create separate metric series for what appears to be the same dimension.

Suggested change
tags={"shard": shard_tag},
with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):

🤖 Grapple PR auto-fix • minor • confidence: 88%

sentry_sdk.set_tag("sentry_spans_buffer_shards", shard_tag)

try:
producer_futures = []

[Grapple PR] Auto-fix — performance agent (Small fix (8 lines, 1 file))

_create_process_for_shard() performs a linear scan over all process_to_shards_map entries to find which process owns a given shard. This is O(N*S) where N is num_processes and S is shards per process.

Suggested change
producer_futures = []
# Reverse lookup: shard -> process_index, built once at construction
# to avoid O(N*S) linear scan in _create_process_for_shard().
self.shard_to_process_map: dict[int, int] = {
shard: process_index
for process_index, shards in self.process_to_shards_map.items()
for shard in shards
}

🤖 Grapple PR auto-fix • minor • confidence: 88%
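
For reference, a self-contained version of the reverse lookup next to the linear scan it would replace. Both function names are hypothetical, and the mapping shape (process index to list of shards) follows the process_to_shards_map described in the finding.

```python
def build_shard_to_process(process_to_shards):
    """Invert {process_index: [shards]} into {shard: process_index}."""
    return {
        shard: process_index
        for process_index, shards in process_to_shards.items()
        for shard in shards
    }


def find_process_linear(process_to_shards, shard):
    """Linear scan equivalent: visits every shard list until a match is found."""
    for process_index, shards in process_to_shards.items():
        if shard in shards:
            return process_index
    return None


process_to_shards = {0: [0, 2, 4], 1: [1, 3]}
shard_to_process = build_shard_to_process(process_to_shards)
```

The dictionary is built once and answers each lookup in constant time; since restarts are infrequent, the practical gain is mostly in readability.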

process.start()
self.processes[process_index] = process
self.buffers[process_index] = shard_buffer

[Grapple PR] Auto-fix — style agent (Small fix (2 lines, 1 file))

Inconsistent method naming: _create_process_for_shard() is defined but never called, while _create_process_for_shards() (plural) handles multiple shards. The singular method name is misleading—it actually restarts a process handling a shard, not creates one for a single shard.

Suggested change
def _restart_process_for_shard(self, shard: int):

🤖 Grapple PR auto-fix • minor • confidence: 83%

self.mp_context = mp_context = multiprocessing.get_context("spawn")
self.stopped = mp_context.Value("i", 0)
self.redis_was_full = False
self.current_drift = mp_context.Value("i", 0)

🟡 [Grapple PR] Suggested fix — logic agent (Larger fix (25 lines, 1 file) — review recommended)

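When buffer.assigned_shards is empty, len(buffer.assigned_shards) is 0. The expression max_processes or len(buffer.assigned_shards) then evaluates to 0 when max_processes is None/0, and min(self.max_processes, 0) = 0, so num_processes is 0. Evaluating i % self.num_processes on line 62 with that value would raise a ZeroDivisionError. As quoted, that modulo sits inside the loop over buffer.assigned_shards, so an empty shard list skips it; there is still no explicit guard against zero assigned shards, so the safety of this path depends on that loop staying as it is.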

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -49,11 +49,22 @@ class SpanFlusher(ProcessingStrategy[FilteredPayload | int]):
         self.next_step = next_step
-        self.max_processes = max_processes or len(buffer.assigned_shards)
+        # Use max_processes if explicitly provided (and > 0), otherwise fall back
+        # to one process per shard. Avoid the `or` footgun: `0 or N` would give N
+        # even when the caller explicitly passed max_processes=0.
+        if max_processes is not None and max_processes > 0:
+            self.max_processes = max_processes
+        else:
+            self.max_processes = len(buffer.assigned_shards)
 
         self.mp_context = mp_context = multiprocessing.get_context("spawn")
         self.stopped = mp_context.Value("i", 0)
         self.redis_was_full = False
         self.current_drift = mp_context.Value("i", 0)
         self.produce_to_pipe = produce_to_pipe
 
         # Determine which shards get their own processes vs shared processes
         self.num_processes = min(self.max_processes, len(buffer.assigned_shards))
+
+        # Guard: if there are no shards assigned (e.g. rebalance in progress or
+        # empty partition assignment), skip all process creation entirely.
+        # This also keeps `i % self.num_processes` below from ever running with
+        # num_processes == 0.
         self.process_to_shards_map: dict[int, list[int]] = {
             i: [] for i in range(self.num_processes)
         }
-        for i, shard in enumerate(buffer.assigned_shards):
-            process_index = i % self.num_processes
-            self.process_to_shards_map[process_index].append(shard)
+        if self.num_processes > 0:
+            for i, shard in enumerate(buffer.assigned_shards):
+                process_index = i % self.num_processes
+                self.process_to_shards_map[process_index].append(shard)
 
         self.processes: dict[int, multiprocessing.context.SpawnProcess | threading.Thread] = {}
         self.process_healthy_since = {
@@ -70,7 +81,10 @@ class SpanFlusher(ProcessingStrategy[FilteredPayload | int]):
         self.process_restarts = {process_index: 0 for process_index in range(self.num_processes)}
         self.buffers: dict[int, SpansBuffer] = {}
 
-        self._create_processes()
+        # Only spin up worker processes when there is actual work to do.
+        # Callers such as _ensure_processes_alive() and join() already iterate
+        # self.processes, so an empty dict is safe for all downstream paths.
+        if self.num_processes > 0:
+            self._create_processes()

🤖 Grapple PR auto-fix • critical • Review this diff before applying

# We pause insertion into Redis if the flusher is not making progress
# fast enough. We could backlog into Redis, but we assume, despite best
# efforts, it is still always going to be less durable than Kafka.
# Minimizing our Redis memory usage also makes COGS easier to reason

🟡 [Grapple PR] Suggested fix — logic agent (Larger fix (14 lines, 1 file) — review recommended)

The isinstance check uses multiprocessing.Process instead of multiprocessing.context.SpawnProcess or the mp_context's Process class. With multiprocessing.get_context('spawn'), the spawned processes are SpawnProcess instances. In CPython, SpawnProcess derives from multiprocessing.process.BaseProcess rather than from multiprocessing.Process, so isinstance(process, multiprocessing.Process) is False for them and the kill is skipped for real processes as well. For threads (test mode), a hung thread is never killed either: the code silently skips the kill, so in the hang case the thread is leaked and a new thread is created alongside it.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -262,10 +262,22 @@ class SpanFlusher(ProcessingStrategy[FilteredPayload | int]):
             if self.process_restarts[process_index] > MAX_PROCESS_RESTARTS:
                 raise RuntimeError(
                     f"flusher process for shards {shards} crashed repeatedly ({cause}), restarting consumer"
                 )
             self.process_restarts[process_index] += 1
 
             try:
                 if isinstance(process, multiprocessing.Process):
                     process.kill()
+                else:
+                    # Threading.Thread cannot be forcibly killed. If the thread is hung
+                    # (cause == "hang"), it will be leaked — the new thread will be created
+                    # alongside it. The stopped flag is shared across all threads so we
+                    # cannot use it to selectively stop this one thread.
+                    # In production this path is not reached (mp_context.Process is used);
+                    # this only affects test mode where produce_to_pipe is set.
+                    if cause == "hang":
+                        logger.warning(
+                            "flusher thread for shards %s is hung and cannot be killed; "
+                            "leaking thread and spawning replacement",
+                            shards,
+                        )
+                    # For cause == "no_process_*" the thread has already exited, no leak.
             except (ValueError, AttributeError):
                 pass  # Process already closed, ignore

🤖 Grapple PR auto-fix • major • Review this diff before applying
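
The class relationship behind this comment can be checked without starting a process. The snippet below is a small sketch that only inspects the class hierarchy in CPython's standard library; the variable names are illustrative:

```python
import multiprocessing
import multiprocessing.process

# The class handed out by a "spawn" context (what mp_context.Process refers to).
spawn_process_cls = multiprocessing.get_context("spawn").Process

# Both classes derive from BaseProcess, but neither derives from the other.
is_subclass_of_process = issubclass(spawn_process_cls, multiprocessing.Process)
is_subclass_of_base = issubclass(
    spawn_process_cls, multiprocessing.process.BaseProcess
)
```

If the first check is False and the second is True, an `isinstance(process, multiprocessing.Process)` test skips spawn-context processes, while a test against `BaseProcess` (or a `hasattr(process, "kill")` check) covers them.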

self.produce_to_pipe,
),
daemon=True,
)

🟡 [Grapple PR] Suggested fix — logic agent (Small fix (7 lines, 1 file))

The method _create_process_for_shard(self, shard: int) is defined but never called anywhere in the codebase. This appears to be dead code that was perhaps intended for per-shard restart logic but is not used. The actual restart logic in _ensure_processes_alive calls _create_process_for_shards (plural) directly.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -127,12 +127,6 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]):
         process.start()
         self.processes[process_index] = process
         self.buffers[process_index] = shard_buffer
-
-    def _create_process_for_shard(self, shard: int):
-        # Find which process this shard belongs to and restart that process
-        for process_index, shards in self.process_to_shards_map.items():
-            if shard in shards:
-                self._create_process_for_shards(process_index, shards)
-                break
 
     @staticmethod
     def main(

🤖 Grapple PR auto-fix • major • Review this diff before applying

make_process = self.mp_context.Process
else:
target = partial(SpanFlusher.main, self.buffer)
target = partial(SpanFlusher.main, shard_buffer)

🟡 [Grapple PR] Suggested fix — architecture agent (Larger fix (36 lines, 1 file) — review recommended)

SpanFlusher.main() now takes a new shards parameter after buffer, and both construction paths pass shards as the first positional argument in args=(shards, self.stopped, ...). On the threading.Thread path (line 102), partial(SpanFlusher.main, shard_buffer) binds buffer, so args=(shards, ...) maps shards to the shards parameter, which is correct. The multiprocessing.Process path (lines 93-100) does not target SpanFlusher.main directly; it targets InitializableBuffer(SpanFlusher.main, shard_buffer). That path is correct only if InitializableBuffer.__call__ passes shard_buffer as the first argument and then unpacks args, that is, calls main(buffer, *args). The InitializableBuffer contract is not visible in the diff; if it invokes main any other way, the argument mismatch would go unnoticed until the spawned process crashes at runtime.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -215,7 +215,14 @@ def _ensure_processes_alive(self) -> None:
             if cause is None:
                 continue  # healthy
 
-            # Report unhealthy for all shards handled by this process
+            # Increment restarts before the threshold check so MAX_PROCESS_RESTARTS
+            # is the true upper bound on restarts (not MAX_PROCESS_RESTARTS + 1).
+            self.process_restarts[process_index] += 1
+
+            if self.process_restarts[process_index] > MAX_PROCESS_RESTARTS:
+                raise RuntimeError(
+                    f"flusher process for shards {shards} crashed repeatedly ({cause}), restarting consumer"
+                )
+
             for shard in shards:
                 metrics.incr(
                     "spans.buffer.flusher_unhealthy", tags={"cause": cause, "shard": shard}
@@ -223,16 +230,11 @@ def _ensure_processes_alive(self) -> None:
 
-            if self.process_restarts[process_index] > MAX_PROCESS_RESTARTS:
-                raise RuntimeError(
-                    f"flusher process for shards {shards} crashed repeatedly ({cause}), restarting consumer"
-                )
-            self.process_restarts[process_index] += 1
-
             try:
-                if isinstance(process, multiprocessing.Process):
-                    process.kill()
-            except (ValueError, AttributeError):
+                # mp_context.Process yields SpawnProcess, which derives from
+                # multiprocessing.process.BaseProcess rather than multiprocessing.Process,
+                # so an isinstance check against multiprocessing.Process would miss it.
+                # threading.Thread has no kill(); we let it die naturally since
+                # it shares address space and stopping is handled via self.stopped.
+                if hasattr(process, "kill"):
+                    process.kill()
+            except (ValueError, AttributeError, OSError):
                 pass  # Process already closed, ignore
 
             self._create_process_for_shards(process_index, shards)
@@ -96,8 +96,16 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]):
         make_process: Callable[..., multiprocessing.context.SpawnProcess | threading.Thread]
         if self.produce_to_pipe is None:
+            # run_with_initialized_sentry wraps SpanFlusher.main such that when the
+            # spawned process calls the wrapper, it invokes:
+            #   SpanFlusher.main(shard_buffer, *args)
+            # where args=(shards, stopped, current_drift, backpressure_since,
+            #             healthy_since, produce_to_pipe).
+            # This matches the signature: main(buffer, shards, stopped, current_drift,
+            #                                  backpressure_since, healthy_since, produce_to_pipe).
+            # The threading.Thread path uses partial(SpanFlusher.main, shard_buffer) which
+            # binds buffer identically, so both paths have the same effective argument order.
             target = run_with_initialized_sentry(
                 SpanFlusher.main,
                 # unpickling buffer will import sentry, so it needs to be
                 # pickled separately. at the same time, pickling
                 # synchronization primitives like multiprocessing.Value can
                 # only be done by the Process
                 shard_buffer,
             )
             make_process = self.mp_context.Process
         else:
             target = partial(SpanFlusher.main, shard_buffer)
             make_process = threading.Thread

🤖 Grapple PR auto-fix • major • Review this diff before applying
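
The argument-order concern can be illustrated in isolation. In the sketch below, `main` is a stand-in with a shortened signature and `BufferBindingWrapper` is a hypothetical wrapper; neither is Sentry code. It assumes the wrapper forwards as `main(buffer, *args)`, which is exactly the contract the comment says needs confirming:

```python
from functools import partial


def main(buffer, shards, stopped, current_drift):
    """Stand-in for SpanFlusher.main with a shortened, hypothetical signature."""
    return {
        "buffer": buffer,
        "shards": shards,
        "stopped": stopped,
        "current_drift": current_drift,
    }


class BufferBindingWrapper:
    """Hypothetical wrapper that is assumed to forward as main(buffer, *args)."""

    def __init__(self, func, buffer):
        self.func = func
        self.buffer = buffer

    def __call__(self, *args):
        # The buffer is injected first, then the caller's positional args follow.
        return self.func(self.buffer, *args)


# Same shape as args=(shards, stopped, ...) in the diff, with placeholder values.
args = (["shard-0", "shard-1"], "stopped-flag", "drift")
via_partial = partial(main, "buffer")(*args)
via_wrapper = BufferBindingWrapper(main, "buffer")(*args)
```

Both call styles bind `buffer` first, so `shards` lands in the second parameter either way. A wrapper that forwarded its arguments in a different order would break this equivalence.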

self.process.start()
process.start()
self.processes[process_index] = process
self.buffers[process_index] = shard_buffer

🟡 [Grapple PR] Suggested fix — architecture agent (Small fix (7 lines, 1 file))

_create_process_for_shard(self, shard: int) (singular) is defined but never called anywhere in the codebase. It appears to be intended for single-shard restart scenarios, but the actual restart logic in _ensure_processes_alive calls _create_process_for_shards(process_index, shards) directly. This dead code adds confusion about the intended restart API.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -127,12 +127,6 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]):
         process.start()
         self.processes[process_index] = process
         self.buffers[process_index] = shard_buffer
-
-    def _create_process_for_shard(self, shard: int):
-        # Find which process this shard belongs to and restart that process
-        for process_index, shards in self.process_to_shards_map.items():
-            if shard in shards:
-                self._create_process_for_shards(process_index, shards)
-                break
 
     @staticmethod
     def main(

🤖 Grapple PR auto-fix • major • Review this diff before applying

step.poll()
# Give flusher threads time to process after drift change
time.sleep(0.1)

🟡 [Grapple PR] Suggested fix — logic agent (Small fix (5 lines, 1 file))

The test adds time.sleep(0.1) to wait for flusher threads to process after drift change, but also patches time.sleep to be a no-op at the top of the test. Since the monkeypatch is still active, time.sleep(0.1) on line 63 will be a no-op and won't actually wait.

--- a/tests/sentry/spans/consumers/process/test_consumer.py
+++ b/tests/sentry/spans/consumers/process/test_consumer.py
@@ -9,6 +9,7 @@ from sentry.spans.consumers.process.factory import ProcessSpansStrategyFactory
 
 @pytest.mark.django_db(transaction=True)
 def test_basic(monkeypatch):
     # Flush very aggressively to make test pass instantly
+    real_sleep = time.sleep
     monkeypatch.setattr("time.sleep", lambda _: None)
@@ -57,7 +58,7 @@ def add_commit(offsets, force=False):
     step.poll()
     fac._flusher.current_drift.value = 9000  # "advance" our "clock"
 
     step.poll()
-    # Give flusher threads time to process after drift change
-    time.sleep(0.1)
+    # Give flusher threads time to process after drift change (use real sleep, not monkeypatched no-op)
+    real_sleep(0.1)
 
     step.join()

🤖 Grapple PR auto-fix • minor • Review this diff before applying
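
The effect described in this comment can be reproduced without pytest. The following sketch patches `time.sleep` by hand (standing in for `monkeypatch.setattr`) and compares the patched call with a reference captured beforehand; all variable names are illustrative:

```python
import time

# Capture the real function before patching, as the suggested fix does.
real_sleep = time.sleep
recorded_calls = []

# Stand-in for monkeypatch.setattr("time.sleep", ...): replace the module attribute.
time.sleep = lambda seconds: recorded_calls.append(seconds)
try:
    start = time.monotonic()
    time.sleep(0.05)  # resolves to the patched no-op at call time
    patched_elapsed = time.monotonic() - start

    start = time.monotonic()
    real_sleep(0.05)  # the captured reference still blocks
    real_elapsed = time.monotonic() - start
finally:
    # Restore the original so later code is unaffected.
    time.sleep = real_sleep
```

Because `time.sleep` is looked up on the module at call time, every caller that goes through the module attribute sees the no-op, while the reference captured earlier keeps the original behavior.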

backpressure_since.value > 0
and int(time.time()) - backpressure_since.value > backpressure_secs
):
metrics.incr("spans.buffer.flusher.backpressure")

🟡 [Grapple PR] Suggested fix — architecture agent (Larger fix (11 lines, 1 file) — review recommended)

Backpressure is applied if ANY single process reports backpressure. This is a conservative approach that means one slow shard can cause the entire consumer to reject messages. While this may be intentional for safety, it differs from the health check in _ensure_processes_alive where each process is checked independently and only its own shards are affected. The asymmetry between 'any process causes global backpressure' vs 'per-process health checks' should be documented.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -272,6 +272,15 @@ def submit(self, message: Message[FilteredPayload | int]) -> None:
         # We pause insertion into Redis if the flusher is not making progress
         # fast enough. We could backlog into Redis, but we assume, despite best
         # efforts, it is still always going to be less durable than Kafka.
         # Minimizing our Redis memory usage also makes COGS easier to reason
         # about.
+        #
+        # NOTE: Backpressure semantics are intentionally conservative and global:
+        # if ANY single process reports backpressure, the entire consumer rejects
+        # incoming messages (raises MessageRejected). This differs from
+        # _ensure_processes_alive(), which checks and restarts each process
+        # independently without affecting the others.
+        #
+        # The asymmetry is by design: a slow shard starving Redis is a systemic
+        # problem — allowing other shards to keep writing would worsen the
+        # backlog. Applying global backpressure propagates the signal upstream
+        # (to Kafka) as quickly as possible, giving all shards time to drain.
         backpressure_secs = options.get("spans.buffer.flusher.backpressure-seconds")
         for backpressure_since in self.process_backpressure_since.values():
             if (

🤖 Grapple PR auto-fix • minor • Review this diff before applying
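
The "any process triggers global backpressure" rule can be written as a small predicate. This is a sketch only: the function name is hypothetical, and it takes plain integer timestamps where the flusher reads the `.value` of shared counters:

```python
from __future__ import annotations


def any_process_backpressured(
    backpressure_since: dict[int, int], now: int, backpressure_secs: int
) -> bool:
    """Return True if at least one process has been backpressured for too long."""
    # A value of 0 means the process is not currently reporting backpressure.
    return any(
        since > 0 and now - since > backpressure_secs
        for since in backpressure_since.values()
    )
```

A single slow process is enough to make the predicate true, which is the conservative, consumer-wide behavior the comment asks to have documented.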

step.poll()
# Give flusher threads time to process after drift change
time.sleep(0.1)

🟡 [Grapple PR] Suggested fix — architecture agent (Small fix (7 lines, 1 file))

A time.sleep(0.1) was added to give flusher threads time to process, but time.sleep is monkeypatched to a no-op on line 14 (monkeypatch.setattr('time.sleep', lambda _: None)). This means the added sleep on line 63 does nothing and the test relies on timing luck. The monkeypatch applies globally to the time module, so this time.sleep(0.1) call in the test itself is also affected.

--- a/tests/sentry/spans/consumers/process/test_consumer.py
+++ b/tests/sentry/spans/consumers/process/test_consumer.py
@@ -9,9 +9,11 @@ from sentry.spans.consumers.process.factory import ProcessSpansStrategyFactory
 
 
 @pytest.mark.django_db(transaction=True)
 def test_basic(monkeypatch):
     # Flush very aggressively to make test pass instantly
+    # Capture real sleep before monkeypatching, so we can use it for synchronization below
+    real_sleep = time.sleep
     monkeypatch.setattr("time.sleep", lambda _: None)
 
@@ -57,7 +59,8 @@ def test_basic(monkeypatch):
     step.poll()
     fac._flusher.current_drift.value = 9000  # "advance" our "clock"
 
     step.poll()
-    # Give flusher threads time to process after drift change
-    time.sleep(0.1)
+    # Give flusher threads time to process after drift change.
+    # Uses real_sleep (captured before monkeypatch) since time.sleep is patched to a no-op.
+    real_sleep(0.1)
 
     step.join()

🤖 Grapple PR auto-fix • minor • Review this diff before applying
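
A fixed sleep, even a real one, still depends on the flusher threads finishing within the chosen interval. One possible alternative is to poll for the expected state with a deadline. The `wait_until` helper below is hypothetical and not part of the PR or of pytest; it is only a sketch of that idea:

```python
import time


def wait_until(predicate, timeout=1.0, interval=0.01, sleep=time.sleep):
    """Poll predicate until it returns True or the timeout expires (hypothetical helper)."""
    # The default for `sleep` is bound when the function is defined, so a helper
    # defined at import time keeps the real sleep even if time.sleep is patched later.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        sleep(interval)
    # One final check so a condition met right at the deadline is not missed.
    return predicate()
```

In a test, the predicate would inspect whatever the flusher is expected to have produced, so the test waits only as long as needed and fails with a clear timeout otherwise.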

except ValueError:
pass # Process already closed, ignore
if self.process_restarts[process_index] > MAX_PROCESS_RESTARTS:
raise RuntimeError(

🟡 [Grapple PR] Suggested fix — performance agent (Small fix (3 lines, 1 file))

_ensure_processes_alive() already reads options.get('spans.buffer.flusher.max-unhealthy-seconds') once before the loop, but it calls int(time.time()) again for every process inside the loop. The timestamp can likewise be computed once before the loop.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -218,6 +218,7 @@ class SpanFlusher(ProcessingStrategy[FilteredPayload | int]):
     def _ensure_processes_alive(self) -> None:
         max_unhealthy_seconds = options.get("spans.buffer.flusher.max-unhealthy-seconds")
+        now = int(time.time())
 
         for process_index, process in self.processes.items():
             if not process:
@@ -231,7 +232,7 @@ class SpanFlusher(ProcessingStrategy[FilteredPayload | int]):
                 exitcode = getattr(process, "exitcode", "unknown")
                 cause = f"no_process_{exitcode}"
             elif (
-                int(time.time()) - self.process_healthy_since[process_index].value
+                now - self.process_healthy_since[process_index].value
                 > max_unhealthy_seconds
             ):
                 # Check if any shard handled by this process is unhealthy

🤖 Grapple PR auto-fix • minor • Review this diff before applying
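
The hoisting suggested here amounts to taking one timestamp per health-check pass. A minimal sketch, using a hypothetical `find_hung_processes` function and plain integers in place of the shared `healthy_since` values:

```python
from __future__ import annotations

import time


def find_hung_processes(
    healthy_since: dict[int, int],
    max_unhealthy_seconds: int,
    now: int | None = None,
) -> list[int]:
    """Return the indices of processes whose last healthy timestamp is too old."""
    # Read the clock once per pass so every process is judged against the same instant.
    now = int(time.time()) if now is None else now
    return [
        process_index
        for process_index, since in healthy_since.items()
        if now - since > max_unhealthy_seconds
    ]
```

Besides saving a call per process, a single timestamp keeps the comparison consistent across all processes in one pass.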

self.healthy_since.value = int(time.time())
self.process_healthy_since[process_index].value = int(time.time())

# Create a buffer for these specific shards

🟡 [Grapple PR] Suggested fix — logic agent (Small fix (10 lines, 1 file))

A new SpansBuffer(shards) is created for each process's shard subset. This assumes the SpansBuffer constructor accepts a list of shard integers directly. The original code used buffer which was passed as a parameter. If SpansBuffer.__init__ requires additional configuration (e.g., Redis connection details, buffer settings), the per-process buffers may be misconfigured compared to the original buffer.

--- a/src/sentry/spans/consumers/process/flusher.py
+++ b/src/sentry/spans/consumers/process/flusher.py
@@ -44,6 +44,7 @@ def __init__(
         self,
         buffer: SpansBuffer,
         next_step: ProcessingStrategy[FilteredPayload | int],
+        max_processes: int | None = None,
         produce_to_pipe: Callable[[KafkaPayload], None] | None = None,
     ):
+        self.buffer = buffer  # Keep original buffer as configuration template
         self.next_step = next_step
         self.max_processes = max_processes or len(buffer.assigned_shards)
 
@@ -86,7 +87,12 @@ def _create_process_for_shards(self, process_index: int, shards: list[int]):
         self.process_healthy_since[process_index].value = int(time.time())
 
-        # Create a buffer for these specific shards
-        shard_buffer = SpansBuffer(shards)
+        # Create a buffer for these specific shards, inheriting configuration
+        # from the original buffer (Redis connection, settings, etc.) rather
+        # than constructing a minimal buffer with only the shard list.
+        # NOTE: assumes a SpansBuffer.for_shards() helper (not shown in this diff) that
+        # returns a same-configuration instance restricted to the given shard subset.
+        shard_buffer = self.buffer.for_shards(shards)
 
         make_process: Callable[..., multiprocessing.context.SpawnProcess | threading.Thread]

🤖 Grapple PR auto-fix • minor • Review this diff before applying
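
Whether SpansBuffer offers a `for_shards()` method is not established by the diff. To show what such a helper would have to do, here is a sketch on a toy class: `ToyBuffer` and its fields `redis_cluster` and `max_segment_bytes` are invented for illustration and do not describe the real SpansBuffer:

```python
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ToyBuffer:
    """Toy stand-in for a buffer; all field names here are hypothetical."""

    assigned_shards: tuple[int, ...]
    redis_cluster: str = "default"
    max_segment_bytes: int = 1024

    def for_shards(self, shards: list[int]) -> ToyBuffer:
        # Copy every configuration field and swap out only the shard subset.
        return replace(self, assigned_shards=tuple(shards))


parent = ToyBuffer((0, 1, 2, 3), redis_cluster="spans", max_segment_bytes=2048)
child = parent.for_shards([1, 3])
```

The point of the pattern is that the per-process buffer inherits every setting from the original, so adding a constructor parameter later does not silently leave the per-process buffers on defaults.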
