-
Notifications
You must be signed in to change notification settings - Fork 0
Span Buffer Multiprocess Enhancement with Health Monitoring #11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: span-flusher-stable
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |||||
|
|
||||||
| from sentry import options | ||||||
| from sentry.conf.types.kafka_definition import Topic | ||||||
| from sentry.processing.backpressure.memory import ServiceMemory | ||||||
| from sentry.spans.buffer import SpansBuffer | ||||||
| from sentry.utils import metrics | ||||||
| from sentry.utils.arroyo import run_with_initialized_sentry | ||||||
|
|
@@ -27,7 +28,8 @@ | |||||
|
|
||||||
| class SpanFlusher(ProcessingStrategy[FilteredPayload | int]): | ||||||
| """ | ||||||
| A background thread that polls Redis for new segments to flush and to produce to Kafka. | ||||||
| A background multiprocessing manager that polls Redis for new segments to flush and to produce to Kafka. | ||||||
| Creates one process per shard for parallel processing. | ||||||
|
|
||||||
| This is a processing step to be embedded into the consumer that writes to | ||||||
| Redis. It takes and fowards integer messages that represent recently | ||||||
|
|
@@ -42,27 +44,53 @@ def __init__( | |||||
| self, | ||||||
| buffer: SpansBuffer, | ||||||
| next_step: ProcessingStrategy[FilteredPayload | int], | ||||||
| max_processes: int | None = None, | ||||||
| produce_to_pipe: Callable[[KafkaPayload], None] | None = None, | ||||||
| ): | ||||||
| self.buffer = buffer | ||||||
| self.next_step = next_step | ||||||
| self.max_processes = max_processes or len(buffer.assigned_shards) | ||||||
|
|
||||||
| self.mp_context = mp_context = multiprocessing.get_context("spawn") | ||||||
| self.stopped = mp_context.Value("i", 0) | ||||||
| self.redis_was_full = False | ||||||
| self.current_drift = mp_context.Value("i", 0) | ||||||
| self.backpressure_since = mp_context.Value("i", 0) | ||||||
| self.healthy_since = mp_context.Value("i", 0) | ||||||
| self.process_restarts = 0 | ||||||
| self.produce_to_pipe = produce_to_pipe | ||||||
|
|
||||||
| self._create_process() | ||||||
|
|
||||||
| def _create_process(self): | ||||||
| # Determine which shards get their own processes vs shared processes | ||||||
| self.num_processes = min(self.max_processes, len(buffer.assigned_shards)) | ||||||
| self.process_to_shards_map: dict[int, list[int]] = { | ||||||
| i: [] for i in range(self.num_processes) | ||||||
| } | ||||||
| for i, shard in enumerate(buffer.assigned_shards): | ||||||
| process_index = i % self.num_processes | ||||||
| self.process_to_shards_map[process_index].append(shard) | ||||||
|
|
||||||
| self.processes: dict[int, multiprocessing.context.SpawnProcess | threading.Thread] = {} | ||||||
| self.process_healthy_since = { | ||||||
| process_index: mp_context.Value("i", int(time.time())) | ||||||
| for process_index in range(self.num_processes) | ||||||
| } | ||||||
| self.process_backpressure_since = { | ||||||
| process_index: mp_context.Value("i", 0) for process_index in range(self.num_processes) | ||||||
| } | ||||||
| self.process_restarts = {process_index: 0 for process_index in range(self.num_processes)} | ||||||
| self.buffers: dict[int, SpansBuffer] = {} | ||||||
|
|
||||||
| self._create_processes() | ||||||
|
|
||||||
| def _create_processes(self): | ||||||
| # Create processes based on shard mapping | ||||||
| for process_index, shards in self.process_to_shards_map.items(): | ||||||
| self._create_process_for_shards(process_index, shards) | ||||||
|
|
||||||
| def _create_process_for_shards(self, process_index: int, shards: list[int]): | ||||||
| # Optimistically reset healthy_since to avoid a race between the | ||||||
| # starting process and the next flush cycle. Keep back pressure across | ||||||
| # the restart, however. | ||||||
| self.healthy_since.value = int(time.time()) | ||||||
| self.process_healthy_since[process_index].value = int(time.time()) | ||||||
|
|
||||||
| # Create a buffer for these specific shards | ||||||
| shard_buffer = SpansBuffer(shards) | ||||||
|
|
||||||
| make_process: Callable[..., multiprocessing.context.SpawnProcess | threading.Thread] | ||||||
| if self.produce_to_pipe is None: | ||||||
|
|
@@ -72,37 +100,50 @@ def _create_process(self): | |||||
| # pickled separately. at the same time, pickling | ||||||
| # synchronization primitives like multiprocessing.Value can | ||||||
| # only be done by the Process | ||||||
| self.buffer, | ||||||
| shard_buffer, | ||||||
| ) | ||||||
| make_process = self.mp_context.Process | ||||||
| else: | ||||||
| target = partial(SpanFlusher.main, self.buffer) | ||||||
| target = partial(SpanFlusher.main, shard_buffer) | ||||||
| make_process = threading.Thread | ||||||
|
|
||||||
| self.process = make_process( | ||||||
| process = make_process( | ||||||
| target=target, | ||||||
| args=( | ||||||
| shards, | ||||||
| self.stopped, | ||||||
| self.current_drift, | ||||||
| self.backpressure_since, | ||||||
| self.healthy_since, | ||||||
| self.process_backpressure_since[process_index], | ||||||
| self.process_healthy_since[process_index], | ||||||
| self.produce_to_pipe, | ||||||
| ), | ||||||
| daemon=True, | ||||||
| ) | ||||||
|
|
||||||
| self.process.start() | ||||||
| process.start() | ||||||
| self.processes[process_index] = process | ||||||
| self.buffers[process_index] = shard_buffer | ||||||
|
|
||||||
| def _create_process_for_shard(self, shard: int): | ||||||
| # Find which process this shard belongs to and restart that process | ||||||
| for process_index, shards in self.process_to_shards_map.items(): | ||||||
| if shard in shards: | ||||||
| self._create_process_for_shards(process_index, shards) | ||||||
| break | ||||||
|
|
||||||
| @staticmethod | ||||||
| def main( | ||||||
| buffer: SpansBuffer, | ||||||
| shards: list[int], | ||||||
| stopped, | ||||||
| current_drift, | ||||||
| backpressure_since, | ||||||
| healthy_since, | ||||||
| produce_to_pipe: Callable[[KafkaPayload], None] | None, | ||||||
| ) -> None: | ||||||
| shard_tag = ",".join(map(str, shards)) | ||||||
| sentry_sdk.set_tag("sentry_spans_buffer_component", "flusher") | ||||||
| sentry_sdk.set_tag("sentry_spans_buffer_shards", shard_tag) | ||||||
|
|
||||||
| try: | ||||||
| producer_futures = [] | ||||||
|
|
@@ -134,23 +175,28 @@ def produce(payload: KafkaPayload) -> None: | |||||
| else: | ||||||
| backpressure_since.value = 0 | ||||||
|
|
||||||
| # Update healthy_since for all shards handled by this process | ||||||
| healthy_since.value = system_now | ||||||
|
|
||||||
| if not flushed_segments: | ||||||
| time.sleep(1) | ||||||
| continue | ||||||
|
|
||||||
| with metrics.timer("spans.buffer.flusher.produce"): | ||||||
| for _, flushed_segment in flushed_segments.items(): | ||||||
| with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard_tag}): | ||||||
| for flushed_segment in flushed_segments.values(): | ||||||
|
||||||
| for flushed_segment in flushed_segments.values(): | |
| for shard, flushed_segment in flushed_segments.items(): |
Copilot
AI
Nov 14, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The isinstance check is required here: `multiprocessing.Process` has a `kill()` method, but `threading.Thread` does not, so calling `kill()` on a thread would raise `AttributeError`. However, the more important issue is that this code only kills `multiprocessing.Process` instances and never stops `Thread` instances, which could leave threads running. Consider handling both types (for example, by setting the shared `stopped` flag and joining the threads) or document why threads don't need to be terminated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The `shards` parameter is now passed to the main() method, and the corresponding function signature at line 137 shows it as the second parameter. This creates a coupling where the order of arguments in the tuple must exactly match the function signature. Consider using keyword arguments or documenting this dependency more clearly.