Span Buffer Multiprocess Enhancement with Health Monitoring #12
Codoki PR Review
Summary: Fix futures leak, standardize metric tags
Issues (Critical & High only)
Showing top 1 issue. Critical: 0, High: 1. See inline suggestions for more.
Confidence: 3/5. Needs work before merge (1 high · status: Requires changes)
        "spans.buffer.segment_size_bytes",
        len(kafka_payload.value),
        tags={"shard": shard_tag},
    )
    )
    with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):
        for future in producer_futures:
            future.result()
    producer_futures.clear()
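The wait-then-clear pattern in the suggestion above can be sketched in isolation. This is a minimal illustration, not the PR's code: `wait_and_clear` is a hypothetical helper, and a `ThreadPoolExecutor` stands in for the Kafka producer, on the assumption that the producer's futures expose a blocking `result()` method the way `concurrent.futures.Future` does.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List


def wait_and_clear(producer_futures: List[Future]) -> int:
    """Block on every pending future, then empty the list in place."""
    completed = 0
    for future in producer_futures:
        future.result()  # re-raises any exception from the produce call
        completed += 1
    # Clearing drops references to resolved futures so the list
    # does not keep growing across flush cycles.
    producer_futures.clear()
    return completed


# Hypothetical stand-in for a producer: a thread pool whose submit() returns futures.
with ThreadPoolExecutor(max_workers=2) as pool:
    pending = [pool.submit(len, b"payload") for _ in range(3)]
    waited = wait_and_clear(pending)
```

Note that in this sketch, as in the suggestion, the list is cleared only after every `result()` call has returned; if one of them raises, the list is left untouched.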
        raise RuntimeError(
            f"flusher process for shards {shards} crashed repeatedly ({cause}), restarting consumer"
        )
    self.process_restarts[process_index] += 1
🔷 Medium: Using isinstance(process, multiprocessing.Process) may miss processes created through a context with a specific start method (such as spawn), depending on the context's process class. Dead processes would then not be killed, which can lead to duplicate restarts. Checking for the capability (a kill method) is more robust and also naturally excludes threads.
    self.process_restarts[process_index] += 1
    if hasattr(process, "kill"):
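To make the difference between the two checks concrete, here is a small sketch using only the standard library. The helper name `is_killable` is hypothetical and not from the PR. It relies on the fact that in CPython the process class returned by a spawn context derives from `multiprocessing.process.BaseProcess` rather than from `multiprocessing.Process`, and that `threading.Thread` has no `kill` method.

```python
import multiprocessing
import threading


def _noop() -> None:
    pass


def is_killable(worker: object) -> bool:
    """Capability check: true for anything exposing a callable kill()."""
    return callable(getattr(worker, "kill", None))


# Neither worker is started; only the types and attributes matter here.
ctx = multiprocessing.get_context("spawn")
spawned = ctx.Process(target=_noop)
thread = threading.Thread(target=_noop)

# The type check misses the spawn-context process entirely.
isinstance_matches = isinstance(spawned, multiprocessing.Process)
```

With these definitions, `isinstance_matches` is `False` even though `spawned` is a real process object, while `is_killable` accepts the process and rejects the thread.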
Mirrors ai-code-review-evaluation#6 for like-for-like benchmarking.
span-flusher-stable, span-flusher-multiprocess
Original PR excerpt: