feat(uptime): Add ability to use queues to manage parallelism #2
base: kafka-consumer-parallel-before
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -118,7 +118,7 @@ def uptime_options() -> list[click.Option]: | |||||||||||||||||||||||||
| options = [ | ||||||||||||||||||||||||||
| click.Option( | ||||||||||||||||||||||||||
| ["--mode", "mode"], | ||||||||||||||||||||||||||
| - type=click.Choice(["serial", "parallel", "batched-parallel"]), | ||||||||||||||||||||||||||
| + type=click.Choice(["serial", "parallel", "batched-parallel", "thread-queue-parallel"]), | ||||||||||||||||||||||||||
|
✅ Suggested change
🤖 Grapple PR auto-fix • minor • confidence: 100%

🟡 MINOR — Intent Mismatch / PR-Issue Linkage (confidence: 82%)
The linked issue #9 is titled 'Optimize spans buffer insertion with eviction during insert', which is about spans buffer optimization, not queue-based parallelism for uptime consumers. This PR does not appear to address issue #9's actual content.
Evidence:
Agent: logic

✅ Suggested change
🤖 Grapple PR auto-fix • minor • confidence: 82%

🟠 MAJOR — API Contract / Consumer Registration (confidence: 82%)
The new mode 'thread-queue-parallel' is added to the CLI choices, but the diff only shows 2 lines changed in this file. If the consumer factory wiring (the function that maps mode to strategy factory) does not also handle 'thread-queue-parallel', selecting this mode at runtime would pass validation but could fail silently or raise an error downstream.
Evidence:
Agent: architecture

✅ Suggested change
🤖 Grapple PR auto-fix • major • confidence: 82% |
||||||||||||||||||||||||||
| default="serial", | ||||||||||||||||||||||||||
| help="The mode to process results in. Parallel uses multithreading.", | ||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||
|
|
@@ -138,7 +138,7 @@ def uptime_options() -> list[click.Option]: | |||||||||||||||||||||||||
| ["--max-workers", "max_workers"], | ||||||||||||||||||||||||||
| type=int, | ||||||||||||||||||||||||||
| default=None, | ||||||||||||||||||||||||||
| - help="The maximum number of threads to spawn in parallel mode.", | ||||||||||||||||||||||||||
| + help="The maximum amount of parallelism to use when in a parallel mode.", | ||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||
| click.Option(["--processes", "num_processes"], default=1, type=int), | ||||||||||||||||||||||||||
| click.Option(["--input-block-size"], type=int, default=None), | ||||||||||||||||||||||||||
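
The review concern above, that `thread-queue-parallel` could pass CLI validation without a matching branch in the factory, can be reduced by deriving the CLI choices and the dispatch from a single mapping. The following is a minimal sketch of that idea only. `MODE_FACTORIES`, `MODE_CHOICES`, `build_strategy`, and the `make_*` placeholders are hypothetical names, not Sentry's actual consumer wiring, and the placeholders return labels instead of real strategies.

```python
from collections.abc import Callable
from typing import Optional


# Hypothetical placeholder factories: each returns a label, not a real strategy.
def make_serial(max_workers: Optional[int]) -> str:
    return "serial"


def make_parallel(max_workers: Optional[int]) -> str:
    return f"parallel(max_workers={max_workers})"


def make_batched_parallel(max_workers: Optional[int]) -> str:
    return f"batched-parallel(max_workers={max_workers})"


def make_thread_queue_parallel(max_workers: Optional[int]) -> str:
    return f"thread-queue-parallel(max_workers={max_workers})"


# Single source of truth: every accepted mode has a factory by construction.
MODE_FACTORIES: dict[str, Callable[[Optional[int]], str]] = {
    "serial": make_serial,
    "parallel": make_parallel,
    "batched-parallel": make_batched_parallel,
    "thread-queue-parallel": make_thread_queue_parallel,
}

# The CLI choice list is derived from the mapping, so the two cannot drift apart.
MODE_CHOICES = list(MODE_FACTORIES)


def build_strategy(mode: str, max_workers: Optional[int] = None) -> str:
    """Dispatch on mode and fail loudly for anything unmapped."""
    factory = MODE_FACTORIES.get(mode)
    if factory is None:
        raise ValueError(f"unsupported mode: {mode!r}")
    return factory(max_workers)
```

With this shape, `MODE_CHOICES` could be passed to `click.Choice(...)`, and an unmapped mode raises immediately instead of failing later in the consumer.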
|
|
||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,345 @@ | ||||||||||||||||||||||
| from __future__ import annotations | ||||||||||||||||||||||
|
🟠 MAJOR — Intent/Ticket Mismatch (confidence: 95%)
The linked issue #9 is titled 'Optimize spans buffer insertion with eviction during insert', which describes spans buffer optimization, not queue-based parallelism for uptime consumers. This PR's changes are entirely about adding a QueueConsumer for uptime/remote subscription consumers. The PR does not address the linked ticket.
Evidence:
Agent: logic |
||||||||||||||||||||||
|
|
||||||||||||||||||||||
| import logging | ||||||||||||||||||||||
| import queue | ||||||||||||||||||||||
| import threading | ||||||||||||||||||||||
| import time | ||||||||||||||||||||||
| from collections import defaultdict | ||||||||||||||||||||||
| from collections.abc import Callable | ||||||||||||||||||||||
| from dataclasses import dataclass | ||||||||||||||||||||||
| from typing import Any, Generic, TypeVar | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| import sentry_sdk | ||||||||||||||||||||||
| from arroyo.backends.kafka.consumer import KafkaPayload | ||||||||||||||||||||||
| from arroyo.processing.strategies import ProcessingStrategy | ||||||||||||||||||||||
| from arroyo.types import BrokerValue, FilteredPayload, Message, Partition | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| from sentry.utils import metrics | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| logger = logging.getLogger(__name__) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
🔵 INFO — Code Organization (confidence: 85%)
The module lacks a docstring explaining the purpose of the QueueConsumer abstraction and its role in the system. This is important for maintainability given the complexity of offset tracking and thread synchronization.
Evidence:
Agent: style |
||||||||||||||||||||||
| T = TypeVar("T") | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
|
||||||||||||||||||||||
| @dataclass | ||||||||||||||||||||||
| class WorkItem(Generic[T]): | ||||||||||||||||||||||
| """Work item that includes the original message for offset tracking.""" | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| partition: Partition | ||||||||||||||||||||||
| offset: int | ||||||||||||||||||||||
| result: T | ||||||||||||||||||||||
| message: Message[KafkaPayload | FilteredPayload] | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
|
||||||||||||||||||||||
| class OffsetTracker: | ||||||||||||||||||||||
| """ | ||||||||||||||||||||||
| Tracks outstanding offsets and determines which offsets are safe to commit. | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| - Tracks offsets per partition | ||||||||||||||||||||||
| - Only commits offsets when all prior offsets are processed | ||||||||||||||||||||||
| - Thread-safe for concurrent access with per-partition locks | ||||||||||||||||||||||
| """ | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| def __init__(self) -> None: | ||||||||||||||||||||||
| self.all_offsets: dict[Partition, set[int]] = defaultdict(set) | ||||||||||||||||||||||
| self.outstanding: dict[Partition, set[int]] = defaultdict(set) | ||||||||||||||||||||||
| self.last_committed: dict[Partition, int] = {} | ||||||||||||||||||||||
| self.partition_locks: dict[Partition, threading.Lock] = {} | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| def _get_partition_lock(self, partition: Partition) -> threading.Lock: | ||||||||||||||||||||||
| """Get or create a lock for a partition.""" | ||||||||||||||||||||||
|
🟡 MINOR — Documentation (confidence: 87%)
Missing module-level docstring explaining the purpose, architecture, and key design decisions of the QueueConsumer system. This is important for understanding offset commit semantics, concurrency model, and when to use this mode.
Evidence:
Agent: style
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -1,4 +1,45 @@
+"""
+Queue-based parallel processing strategy for remote subscription / uptime consumers.
+
+Architecture Overview
+---------------------
+This module implements a fan-out parallelism model on top of Arroyo's
+``ProcessingStrategy`` interface. Instead of processing each Kafka message
+sequentially inside the Arroyo event loop, messages are decoded and dispatched
+into a fixed pool of in-process queues, each drained by a dedicated worker
+thread. This lets I/O-bound result processing overlap across workers without
+spinning up a new thread per message (CPU-bound work is still serialized by
+the GIL on standard CPython builds).
+
+Key components
+--------------
+``WorkItem``
+ Thin wrapper that carries a decoded result together with the Kafka
+ partition/offset needed for offset tracking.
+
+``OffsetTracker``
+ Maintains per-partition sets of *all* seen offsets and *outstanding*
+ (not-yet-completed) offsets. An offset is only eligible for commit once
+ every lower offset in the same partition has been marked complete,
+ preserving the at-least-once delivery guarantee expected by Kafka consumers.
+ All mutations are protected by per-partition ``threading.Lock`` instances to
+ allow safe concurrent access from worker threads.
+
+``FixedQueuePool``
+ Owns a fixed number of ``queue.Queue`` objects (default 20) and a 1-to-1
+ mapping of worker threads. Groups (e.g. subscription IDs) are assigned to
+ a queue via ``hash(group_key) % num_queues``, so all messages belonging to
+ the same group are always processed by the same worker in FIFO order.
+
+``SimpleQueueProcessingStrategy``
+ The Arroyo ``ProcessingStrategy`` entry point. Its ``submit()`` method
+ decodes each message and hands it to ``FixedQueuePool``. A background
+ *commit thread* wakes up every second, asks ``OffsetTracker`` for the
+ highest safely committable offset per partition, invokes the upstream
+ commit callback, and advances the committed-offset watermark.
+
+Critical design considerations
+-------------------------------
+- **Offset commit safety**: offsets are committed only after *all* preceding
+ offsets in the partition are complete. Never call ``mark_committed`` before
+ ``get_committable_offsets`` confirms contiguous completion, or messages may
+ be silently skipped on consumer restart.
+- **Deadlock avoidance**: per-partition locks are held only for the duration of
+ a single set/dict operation; the locks are never nested. Do not acquire
+ multiple partition locks simultaneously.
+- **Backpressure**: ``queue.Queue.put()`` blocks when the queue is full (if a
+ ``maxsize`` is set). The current default is unbounded; add a ``maxsize`` if
+ memory growth under lag needs to be bounded.
+- **When to use this mode**: prefer ``SimpleQueueProcessingStrategy`` over a
+ purely sequential strategy when result processing is independently safe to
+ parallelise across groups (i.e. no cross-group shared mutable state) and
+ per-message latency variance is high enough to create head-of-line blocking.
+"""
+
from __future__ import annotations
import logging
import queue
import threading
🤖 Grapple PR auto-fix • minor • Review this diff before applying |
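
The routing rule described in the suggested docstring, `hash(group_key) % num_queues` with one worker thread per queue, can be tried out in isolation. The sketch below is a simplified illustration and not the PR's `FixedQueuePool`: `TinyQueuePool`, `record`, and the `sub-*` keys are hypothetical, offset tracking is omitted, and the daemon workers never shut down. Note that `hash()` of a `str` is salted per process, so the chosen queue index is stable within a process but not across restarts.

```python
import logging
import queue
import threading
from collections import defaultdict
from typing import Callable


class TinyQueuePool:
    """Hypothetical, simplified stand-in for a fixed queue pool."""

    def __init__(self, handler: Callable[[str, int], None], num_queues: int = 4) -> None:
        self.handler = handler
        self.queues: list[queue.Queue] = [queue.Queue() for _ in range(num_queues)]
        # One dedicated worker thread per queue.
        for q in self.queues:
            threading.Thread(target=self._drain, args=(q,), daemon=True).start()

    def queue_index(self, group_key: str) -> int:
        # Same key -> same queue for the lifetime of this process.
        return hash(group_key) % len(self.queues)

    def submit(self, group_key: str, item: int) -> None:
        self.queues[self.queue_index(group_key)].put((group_key, item))

    def _drain(self, q: queue.Queue) -> None:
        while True:
            group_key, item = q.get()
            try:
                self.handler(group_key, item)
            except Exception:
                # Keep the worker alive if a handler call fails.
                logging.exception("handler failed")
            finally:
                q.task_done()

    def wait_idle(self) -> None:
        # Block until every queued item has been handled.
        for q in self.queues:
            q.join()


seen: dict[str, list[int]] = defaultdict(list)
seen_lock = threading.Lock()


def record(group_key: str, item: int) -> None:
    with seen_lock:
        seen[group_key].append(item)


pool = TinyQueuePool(record, num_queues=4)
for i in range(50):
    for group in ("sub-a", "sub-b", "sub-c"):
        pool.submit(group, i)
pool.wait_idle()
```

Because each group maps to exactly one queue and each queue has exactly one consumer, every group's items are handled in submission order even though different groups may be handled on different threads.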
||||||||||||||||||||||
| lock = self.partition_locks.get(partition) | ||||||||||||||||||||||
| if lock: | ||||||||||||||||||||||
| return lock | ||||||||||||||||||||||
| return self.partition_locks.setdefault(partition, threading.Lock()) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| def add_offset(self, partition: Partition, offset: int) -> None: | ||||||||||||||||||||||
|
🔴 CRITICAL — Thread Safety - Race Condition (confidence: 100%)
Evidence:
Agent: logic
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -43,10 +43,17 @@ class OffsetTracker:
def __init__(self) -> None:
self.all_offsets: dict[Partition, set[int]] = defaultdict(set)
self.outstanding: dict[Partition, set[int]] = defaultdict(set)
self.last_committed: dict[Partition, int] = {}
self.partition_locks: dict[Partition, threading.Lock] = {}
+ # Protects partition_locks dict itself during check-and-create.
+ # Per-partition locks are still used for per-partition data access.
+ # This is necessary for correctness under no-GIL Python (PEP 703)
+ # and for clarity: dict.setdefault atomicity is a CPython detail,
+ # not a language guarantee.
+ self._registry_lock: threading.Lock = threading.Lock()
def _get_partition_lock(self, partition: Partition) -> threading.Lock:
"""Get or create a lock for a partition."""
- lock = self.partition_locks.get(partition)
- if lock:
- return lock
- return self.partition_locks.setdefault(partition, threading.Lock())
+ # Fast path: lock already exists (no registry lock needed for read
+ # once the partition is registered, since we never delete entries).
+ lock = self.partition_locks.get(partition)
+ if lock is not None:
+ return lock
+ # Slow path: partition not yet registered — acquire registry lock
+ # to eliminate the TOCTOU race between get() and insertion.
+ with self._registry_lock:
+ # Re-check inside the lock: another thread may have inserted
+ # the lock between our get() above and acquiring _registry_lock.
+ if partition not in self.partition_locks:
+ self.partition_locks[partition] = threading.Lock()
+ return self.partition_locks[partition]
🤖 Grapple PR auto-fix • critical • Review this diff before applying |
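
The double-checked get-or-create pattern in the suggested diff above can be exercised on its own. This is a minimal sketch under the assumption that entries are never removed from the registry. `LockRegistry`, `worker`, and the string partition keys are hypothetical stand-ins for `OffsetTracker._get_partition_lock` and Arroyo `Partition` objects.

```python
import threading


class LockRegistry:
    """Hypothetical get-or-create registry of per-key locks."""

    def __init__(self) -> None:
        self._locks: dict[str, threading.Lock] = {}
        self._registry_lock = threading.Lock()

    def get(self, key: str) -> threading.Lock:
        # Fast path: unlocked read for a key that is already registered.
        lock = self._locks.get(key)
        if lock is not None:
            return lock
        # Slow path: serialize creation and re-check under the registry lock,
        # so two racing threads cannot each install a different Lock object.
        with self._registry_lock:
            lock = self._locks.get(key)
            if lock is None:
                lock = threading.Lock()
                self._locks[key] = lock
            return lock


registry = LockRegistry()
barrier = threading.Barrier(16)
got: list[threading.Lock] = []
got_lock = threading.Lock()


def worker() -> None:
    barrier.wait()  # release all threads at once to provoke the race
    lock = registry.get("partition-0")
    with got_lock:
        got.append(lock)


threads = [threading.Thread(target=worker) for _ in range(16)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

All sixteen threads should end up holding the same lock object for `partition-0`, which is the property the per-partition data depends on.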
||||||||||||||||||||||
| """Record that we've started processing an offset.""" | ||||||||||||||||||||||
|
🔴 CRITICAL — Concurrency issues (confidence: 100%)
Race condition in
Evidence:
Agent: performance

🟡 MINOR — Caching opportunities (confidence: 100%)
Evidence:
Agent: performance
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -43,16 +43,33 @@ class OffsetTracker:
def __init__(self) -> None:
- self.all_offsets: dict[Partition, set[int]] = defaultdict(set)
- self.outstanding: dict[Partition, set[int]] = defaultdict(set)
+ # Use plain dicts (not defaultdict) so that accesses outside a lock
+ # never silently create entries via __missing__, which is not thread-safe.
+ self.all_offsets: dict[Partition, set[int]] = {}
+ self.outstanding: dict[Partition, set[int]] = {}
self.last_committed: dict[Partition, int] = {}
self.partition_locks: dict[Partition, threading.Lock] = {}
+ # Guards creation of new partition entries in all four dicts above.
+ # Per-partition locks guard mutations to already-registered partitions.
+ self._meta_lock = threading.Lock()
def _get_partition_lock(self, partition: Partition) -> threading.Lock:
- """Get or create a lock for a partition."""
- lock = self.partition_locks.get(partition)
- if lock:
- return lock
- return self.partition_locks.setdefault(partition, threading.Lock())
+ """
+ Get or create a lock for a partition.
+
+ Uses _meta_lock to make partition registration fully atomic: the lock
+ object and the corresponding set entries in all_offsets/outstanding are
+ created together so no caller can ever see a partition lock without its
+ backing data structures already in place.
+ """
+ # Fast path: partition already registered (no write, safe to read under GIL
+ # after the initial write was done while holding _meta_lock).
+ lock = self.partition_locks.get(partition)
+ if lock is not None:
+ return lock
+ # Slow path: register the partition atomically.
+ with self._meta_lock:
+ # Re-check inside the lock to handle the race between two threads
+ # both reaching the slow path simultaneously.
+ if partition not in self.partition_locks:
+ self.partition_locks[partition] = threading.Lock()
+ self.all_offsets[partition] = set()
+ self.outstanding[partition] = set()
+ return self.partition_locks[partition]
def add_offset(self, partition: Partition, offset: int) -> None:
"""Record that we've started processing an offset."""
@@ -67,7 +84,12 @@ class OffsetTracker:
def get_committable_offsets(self) -> dict[Partition, int]:
"""
Get the highest offset per partition that can be safely committed.
For each partition, finds the highest contiguous offset that has been processed.
"""
committable = {}
- for partition in list(self.all_offsets.keys()):
+ # Snapshot the key list under _meta_lock so we don't iterate a dict
+ # that another thread is concurrently expanding with new partitions.
+ with self._meta_lock:
+ partitions = list(self.partition_locks.keys())
+ for partition in partitions:
with self._get_partition_lock(partition):
🤖 Grapple PR auto-fix • critical • Review this diff before applying
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -43,14 +43,26 @@ class OffsetTracker:
def __init__(self) -> None:
self.all_offsets: dict[Partition, set[int]] = defaultdict(set)
self.outstanding: dict[Partition, set[int]] = defaultdict(set)
self.last_committed: dict[Partition, int] = {}
self.partition_locks: dict[Partition, threading.Lock] = {}
+ def initialize_partition(self, partition: Partition) -> None:
+ """
+ Pre-create lock and data structures for a partition.
+
+ Should be called at partition assignment time so that the hot path
+ (add_offset / complete_offset) only performs a plain dict lookup
+ with no branching or lock-object creation.
+ """
+ if partition not in self.partition_locks:
+ self.partition_locks[partition] = threading.Lock()
+ # Pre-create the defaultdict entries so hot-path accesses never insert new keys
+ _ = self.all_offsets[partition]
+ _ = self.outstanding[partition]
+
def _get_partition_lock(self, partition: Partition) -> threading.Lock:
- """Get or create a lock for a partition."""
- lock = self.partition_locks.get(partition)
- if lock:
- return lock
- return self.partition_locks.setdefault(partition, threading.Lock())
+ """
+ Return the lock for a partition.
+
+ Fast path: plain dict lookup after initialize_partition() has been called.
+ Slow path (unexpected partition): creates the lock lazily so existing
+ behaviour is preserved if initialize_partition() was not called.
+ """
+ lock = self.partition_locks.get(partition)
+ if lock is not None:
+ return lock
+ # Unexpected partition — initialize defensively rather than crashing.
+ self.initialize_partition(partition)
+ return self.partition_locks[partition]
def add_offset(self, partition: Partition, offset: int) -> None:
🤖 Grapple PR auto-fix • minor • Review this diff before applying |
||||||||||||||||||||||
| with self._get_partition_lock(partition): | ||||||||||||||||||||||
|
🔴 CRITICAL — Offset Commit Correctness (confidence: 100%)
Evidence:
Agent: logic
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -38,9 +38,15 @@ class OffsetTracker:
def __init__(self) -> None:
self.all_offsets: dict[Partition, set[int]] = defaultdict(set)
self.outstanding: dict[Partition, set[int]] = defaultdict(set)
self.last_committed: dict[Partition, int] = {}
self.partition_locks: dict[Partition, threading.Lock] = {}
+ # Global lock guards creation of per-partition locks only.
+ # Once a partition lock exists, this lock is not needed for
+ # add_offset / complete_offset / mark_committed (fast path).
+ self._global_lock = threading.Lock()
def _get_partition_lock(self, partition: Partition) -> threading.Lock:
"""Get or create a lock for a partition."""
- lock = self.partition_locks.get(partition)
- if lock:
- return lock
- return self.partition_locks.setdefault(partition, threading.Lock())
+ # Fast path: partition lock already exists, no global lock needed.
+ # dict reads are safe under CPython GIL for existing keys, and we
+ # only mutate partition_locks under _global_lock below.
+ lock = self.partition_locks.get(partition)
+ if lock is not None:
+ return lock
+ # Slow path: create the partition lock under the global lock to
+ # eliminate the TOCTOU between get() and setdefault().
+ with self._global_lock:
+ # Re-check inside the lock: another thread may have created
+ # the lock between our get() above and acquiring _global_lock.
+ lock = self.partition_locks.get(partition)
+ if lock is not None:
+ return lock
+ lock = threading.Lock()
+ self.partition_locks[partition] = lock
+ return lock
🤖 Grapple PR auto-fix • critical • Review this diff before applying |
||||||||||||||||||||||
| self.all_offsets[partition].add(offset) | ||||||||||||||||||||||
|
🔴 CRITICAL — Thread Safety / Race Condition (confidence: 100%)
Evidence:
Agent: architecture
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -43,12 +43,22 @@ class OffsetTracker:
def __init__(self) -> None:
self.all_offsets: dict[Partition, set[int]] = defaultdict(set)
self.outstanding: dict[Partition, set[int]] = defaultdict(set)
self.last_committed: dict[Partition, int] = {}
self.partition_locks: dict[Partition, threading.Lock] = {}
+ # Protects partition_locks dict itself against concurrent reads/writes
+ # from multiple threads (main consumer thread via add_offset, worker
+ # threads via complete_offset). Per-partition locks are acquired
+ # separately *after* releasing this meta-lock so that unrelated
+ # partitions never contend on a single global lock.
+ self._meta_lock: threading.Lock = threading.Lock()
def _get_partition_lock(self, partition: Partition) -> threading.Lock:
"""Get or create a lock for a partition."""
- lock = self.partition_locks.get(partition)
- if lock:
- return lock
- return self.partition_locks.setdefault(partition, threading.Lock())
+ # Every lookup takes the meta-lock (there is no unlocked fast path), so
+ # get-or-create is atomic and no thread reads the dict while another
+ # thread is inserting a new partition.
+ with self._meta_lock:
+ lock = self.partition_locks.get(partition)
+ if lock is None:
+ lock = threading.Lock()
+ self.partition_locks[partition] = lock
+ return lock
🤖 Grapple PR auto-fix • critical • Review this diff before applying

🔴 CRITICAL — Thread Safety - Race Condition (confidence: 100%)
Evidence:
Agent: architecture
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -40,12 +40,21 @@ class OffsetTracker:
def __init__(self) -> None:
self.all_offsets: dict[Partition, set[int]] = defaultdict(set)
self.outstanding: dict[Partition, set[int]] = defaultdict(set)
self.last_committed: dict[Partition, int] = {}
self.partition_locks: dict[Partition, threading.Lock] = {}
+ # Guards mutations to partition_locks dict itself, ensuring safety
+ # under free-threaded Python 3.13+ (no-GIL) where dict operations
+ # are no longer implicitly serialized.
+ self._partition_locks_lock: threading.Lock = threading.Lock()
def _get_partition_lock(self, partition: Partition) -> threading.Lock:
"""Get or create a lock for a partition."""
- lock = self.partition_locks.get(partition)
- if lock:
- return lock
- return self.partition_locks.setdefault(partition, threading.Lock())
+ # Fast path: unlocked read for a partition whose lock already exists.
+ # The slow path below takes the meta-lock to avoid the TOCTOU between
+ # get() and setdefault(). Under CPython with the GIL this is
+ # equivalent to the old pattern; the lock also covers free-threaded
+ # Python 3.13+ (PEP 703), where the GIL no longer serializes dict operations.
+ lock = self.partition_locks.get(partition)
+ if lock is not None:
+ return lock
+ with self._partition_locks_lock:
+ # setdefault() under the lock re-checks for an existing entry
+ # (double-checked locking), so racing threads all receive the
+ # same Lock object; a losing thread's new Lock() is simply discarded.
+ return self.partition_locks.setdefault(partition, threading.Lock())
🤖 Grapple PR auto-fix • critical • Review this diff before applying |
||||||||||||||||||||||
| self.outstanding[partition].add(offset) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| def complete_offset(self, partition: Partition, offset: int) -> None: | ||||||||||||||||||||||
| """Mark an offset as completed.""" | ||||||||||||||||||||||
| with self._get_partition_lock(partition): | ||||||||||||||||||||||
| self.outstanding[partition].discard(offset) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
🟡 MINOR — Edge Case - Empty State (confidence: 97%)
Evidence:
Agent: logic

✅ Suggested change
🤖 Grapple PR auto-fix • minor • confidence: 97% |
||||||||||||||||||||||
| def get_committable_offsets(self) -> dict[Partition, int]: | ||||||||||||||||||||||
| """ | ||||||||||||||||||||||
| Get the highest offset per partition that can be safely committed. | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| For each partition, finds the highest contiguous offset that has been processed. | ||||||||||||||||||||||
| """ | ||||||||||||||||||||||
| committable = {} | ||||||||||||||||||||||
| for partition in list(self.all_offsets.keys()): | ||||||||||||||||||||||
| with self._get_partition_lock(partition): | ||||||||||||||||||||||
|
🔴 CRITICAL — Concurrency issues (confidence: 100%)
_get_partition_lock() has a race condition: checking existence and then calling setdefault are not atomic relative to concurrent reads of partition_locks itself, and partition_locks is a plain dict with no lock protecting it. Two threads could simultaneously pass the
Evidence:
Agent: performance
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -43,10 +43,16 @@ class OffsetTracker:
def __init__(self) -> None:
self.all_offsets: dict[Partition, set[int]] = defaultdict(set)
self.outstanding: dict[Partition, set[int]] = defaultdict(set)
self.last_committed: dict[Partition, int] = {}
self.partition_locks: dict[Partition, threading.Lock] = {}
+ # Protects structural modifications to partition_locks (i.e. adding new
+ # partition entries). Per-partition locks are still used for the actual
+ # offset data so contention on this registry lock is minimal — it is
+ # only held long enough to do a dict lookup + possible setdefault.
+ self._registry_lock: threading.Lock = threading.Lock()
def _get_partition_lock(self, partition: Partition) -> threading.Lock:
"""Get or create a lock for a partition."""
- lock = self.partition_locks.get(partition)
- if lock:
- return lock
- return self.partition_locks.setdefault(partition, threading.Lock())
+ # Fast path: partition already registered. Read under registry lock so
+ # we never observe a partially-constructed partition_locks dict.
+ with self._registry_lock:
+ lock = self.partition_locks.get(partition)
+ if lock is not None:
+ return lock
+ # Slow path: first time we see this partition — create and register
+ # atomically under the same registry lock so no other thread can
+ # race us to setdefault and end up with a different lock object.
+ new_lock = threading.Lock()
+ self.partition_locks[partition] = new_lock
+ return new_lock
🤖 Grapple PR auto-fix • critical • Review this diff before applying

🟠 MAJOR — Memory leak (confidence: 100%)
all_offsets[partition] is never cleaned up after mark_committed is called. Offsets are added to all_offsets in add_offset(), but the mark_committed() method only updates last_committed and removes from outstanding; it never removes the offset from all_offsets. Over time this set grows without bound, consuming increasing memory.
Evidence:
Agent: performance

✅ Suggested change
🤖 Grapple PR auto-fix • major • confidence: 100% |
||||||||||||||||||||||
| all_offsets = self.all_offsets[partition] | ||||||||||||||||||||||
| if not all_offsets: | ||||||||||||||||||||||
| continue | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| outstanding = self.outstanding[partition] | ||||||||||||||||||||||
| last_committed = self.last_committed.get(partition, -1) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| min_offset = min(all_offsets) | ||||||||||||||||||||||
| max_offset = max(all_offsets) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
🔴 CRITICAL — Memory leaks (confidence: 100%)
OffsetTracker.all_offsets grows unboundedly: completed offsets are never removed from all_offsets, only from outstanding. Over time this set accumulates every offset ever seen for a partition, causing unbounded memory growth in long-running consumers.
Evidence:
Agent: performance
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -93,6 +93,18 @@ class OffsetTracker:
def mark_committed(self, partition: Partition, offset: int) -> None:
"""Update the last committed offset for a partition."""
with self._get_partition_lock(partition):
self.last_committed[partition] = offset
# Remove all offsets <= committed offset to prevent unbounded growth
+ # all_offsets now only holds the in-flight window, not the full history
self.all_offsets[partition] = {o for o in self.all_offsets[partition] if o > offset}
+
+ def revoke_partition(self, partition: Partition) -> None:
+ """
+ Clear all state for a partition on rebalance.
+
+ Must be called when a partition is revoked to prevent stale last_committed
+ from suppressing valid offsets after reassignment at a lower offset.
+ """
+ with self._get_partition_lock(partition):
+ self.all_offsets.pop(partition, None)
+ self.outstanding.pop(partition, None)
+ self.last_committed.pop(partition, None)
+ # Note: keep the lock entry — it's cheap and avoids a TOCTOU race
+ # if a worker thread completes an offset for this partition concurrently.
@@ -240,6 +252,20 @@ class SimpleQueueProcessingStrategy(ProcessingStrategy[KafkaPayload], Generic[T
self.commit_thread = threading.Thread(target=self._commit_loop, daemon=True)
self.commit_thread.start()
+ def revoke_partitions(self, partitions: list[Partition]) -> None:
+ """
+ Called by Arroyo on partition rebalance.
+
+ Clears OffsetTracker state for revoked partitions so that stale
+ last_committed values don't suppress offsets after reassignment.
+ Without this, a partition reassigned at a lower offset (e.g. after
+ consumer group rebalance) would never produce a committable offset
+ because all new offsets would be < last_committed.
+ """
+ for partition in partitions:
+ logger.info(
+ "Revoking partition state from offset tracker",
+ extra={"partition": partition},
+ )
+ self.queue_pool.offset_tracker.revoke_partition(partition)
+
def _commit_loop(self) -> None:
while not self.shutdown_event.is_set():
 try:
🤖 Grapple PR auto-fix • critical • Review this diff before applying
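
The failure mode described in the revoke_partitions docstring above can be reproduced in isolation. The sketch below is illustrative only: `MiniTracker` and `replay_after_rebalance` are hypothetical names, partitions are plain strings, and locking is omitted. It assumes the contiguous scan used by get_committable_offsets in this PR and shows that a stale last_committed of 100 hides offsets 50 and 51 after reassignment unless the partition state is cleared first.

```python
from __future__ import annotations

from collections import defaultdict


class MiniTracker:
    """Hypothetical, lock-free stand-in for OffsetTracker (illustration only)."""

    def __init__(self) -> None:
        self.all_offsets: dict[str, set[int]] = defaultdict(set)
        self.outstanding: dict[str, set[int]] = defaultdict(set)
        self.last_committed: dict[str, int] = {}

    def add_completed(self, partition: str, offset: int) -> None:
        # Register an offset that has already finished processing.
        self.all_offsets[partition].add(offset)

    def committable(self, partition: str) -> int | None:
        # Same contiguous scan as get_committable_offsets, for one partition.
        tracked = self.all_offsets[partition]
        if not tracked:
            return None
        last = self.last_committed.get(partition, -1)
        highest = last
        for offset in range(max(last + 1, min(tracked)), max(tracked) + 1):
            if offset in tracked and offset not in self.outstanding[partition]:
                highest = offset
            else:
                break
        return highest if highest > last else None

    def revoke_partition(self, partition: str) -> None:
        # Drop every piece of per-partition state, as the diff above does.
        self.all_offsets.pop(partition, None)
        self.outstanding.pop(partition, None)
        self.last_committed.pop(partition, None)


def replay_after_rebalance(revoke: bool) -> int | None:
    tracker = MiniTracker()
    tracker.last_committed["p0"] = 100  # state left over from the old assignment
    if revoke:
        tracker.revoke_partition("p0")
    # The partition is reassigned and consumption resumes at offset 50.
    tracker.add_completed("p0", 50)
    tracker.add_completed("p0", 51)
    return tracker.committable("p0")


without_revoke = replay_after_rebalance(revoke=False)
with_revoke = replay_after_rebalance(revoke=True)
print(without_revoke, with_revoke)  # prints: None 51
```

Without the revoke step the scan starts at offset 101, finds nothing to walk, and reports no committable offset; with it, the scan starts at 50 and advances to 51.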
                start = max(last_committed + 1, min_offset)

                highest_committable = last_committed
                for offset in range(start, max_offset + 1):
                    if offset in all_offsets and offset not in outstanding:
                        highest_committable = offset
                    else:
                        break

                if highest_committable > last_committed:
                    committable[partition] = highest_committable

        return committable

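
The contiguous scan above is easier to reason about with concrete inputs. The following is a minimal sketch for a single partition; `highest_committable` is a hypothetical free function, not code from the PR, and it returns last_committed unchanged when nothing new can be committed. It shows the two conditions that stop the scan: an offset that is still outstanding, and an offset that was never registered.

```python
from __future__ import annotations


def highest_committable(
    all_offsets: set[int], outstanding: set[int], last_committed: int = -1
) -> int:
    """Return the highest contiguous completed offset, or last_committed."""
    if not all_offsets:
        return last_committed
    start = max(last_committed + 1, min(all_offsets))
    highest = last_committed
    for offset in range(start, max(all_offsets) + 1):
        if offset in all_offsets and offset not in outstanding:
            highest = offset
        else:
            # Either still in flight or never registered: stop here.
            break
    return highest


# Offsets 0-4 were submitted and offset 2 is still being processed.
blocked_by_outstanding = highest_committable({0, 1, 2, 3, 4}, {2})

# Offset 2 was never registered with the tracker.
blocked_by_gap = highest_committable({0, 1, 3, 4}, set())

# Everything submitted has completed.
all_done = highest_committable({0, 1, 2, 3, 4}, set())

print(blocked_by_outstanding, blocked_by_gap, all_done)  # prints: 1 1 4
```

In both blocked cases the commit position stops at 1, so offsets 3 and 4 stay uncommitted even though they have finished processing.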
🔴 CRITICAL — Logic Error - Offset Commit Correctness (confidence: 100%)
The
Evidence:
Agent: logic

✅ Suggested change
🤖 Grapple PR auto-fix • critical • confidence: 100%
🔵 INFO — Naming conventions (confidence: 83%)
The method
Evidence:
Agent: style

    def mark_committed(self, partition: Partition, offset: int) -> None:

🔴 CRITICAL — Concurrency issues (confidence: 100%)
Evidence:
Agent: performance

🟠 MAJOR — Algorithmic complexity (confidence: 100%)
Evidence:
Agent: performance

🟡 MINOR — Algorithmic complexity (confidence: 100%)
The contiguous offset scan
Evidence:
Agent: performance

🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -38,6 +38,12 @@ class OffsetTracker:
def __init__(self) -> None:
self.all_offsets: dict[Partition, set[int]] = defaultdict(set)
self.outstanding: dict[Partition, set[int]] = defaultdict(set)
self.last_committed: dict[Partition, int] = {}
self.partition_locks: dict[Partition, threading.Lock] = {}
+ # Global lock guards cross-partition operations (key enumeration, new partition
+ # registration). Per-partition locks guard per-partition data mutations.
+ # Use RLock in case future refactors call these methods re-entrantly.
+ self._global_lock = threading.RLock()
def _get_partition_lock(self, partition: Partition) -> threading.Lock:
"""Get or create a lock for a partition."""
- lock = self.partition_locks.get(partition)
- if lock:
- return lock
- return self.partition_locks.setdefault(partition, threading.Lock())
+ # Fast path: partition lock already exists, no global lock needed.
+ lock = self.partition_locks.get(partition)
+ if lock:
+ return lock
+ # Slow path: new partition — hold global lock to prevent two threads
+ # from racing on setdefault for the same partition.
+ with self._global_lock:
+ return self.partition_locks.setdefault(partition, threading.Lock())
def add_offset(self, partition: Partition, offset: int) -> None:
"""Record that we've started processing an offset."""
with self._get_partition_lock(partition):
self.all_offsets[partition].add(offset)
self.outstanding[partition].add(offset)
def complete_offset(self, partition: Partition, offset: int) -> None:
"""Mark an offset as completed."""
with self._get_partition_lock(partition):
self.outstanding[partition].discard(offset)
def get_committable_offsets(self) -> dict[Partition, int]:
"""
Get the highest offset per partition that can be safely committed.
For each partition, finds the highest contiguous offset that has been processed.
"""
committable = {}
- for partition in list(self.all_offsets.keys()):
+ # Hold the global lock only long enough to take a consistent snapshot of
+ # the partition keys. This prevents add_offset from inserting a brand-new
+ # partition (and its incomplete defaultdict entry) between our snapshot and
+ # the per-partition lock acquisition below.
+ with self._global_lock:
+ partitions = list(self.all_offsets.keys())
+
+ for partition in partitions:
with self._get_partition_lock(partition):
all_offsets = self.all_offsets[partition]
if not all_offsets:
continue
outstanding = self.outstanding[partition]
last_committed = self.last_committed.get(partition, -1)
min_offset = min(all_offsets)
max_offset = max(all_offsets)
start = max(last_committed + 1, min_offset)
highest_committable = last_committed
for offset in range(start, max_offset + 1):
if offset in all_offsets and offset not in outstanding:
highest_committable = offset
else:
break
if highest_committable > last_committed:
committable[partition] = highest_committable
return committable
def mark_committed(self, partition: Partition, offset: int) -> None:
"""Update the last committed offset for a partition."""
with self._get_partition_lock(partition):
self.last_committed[partition] = offset
# Remove all offsets <= committed offset
 self.all_offsets[partition] = {o for o in self.all_offsets[partition] if o > offset}
🤖 Grapple PR auto-fix • critical • Review this diff before applying

🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -1,6 +1,7 @@
from __future__ import annotations
import logging
+import queue
import threading
import time
from collections import defaultdict
@@ -8,6 +9,8 @@ from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, Generic, TypeVar
+from sortedcontainers import SortedList
+
import sentry_sdk
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies import ProcessingStrategy
@@ -33,7 +36,9 @@ class OffsetTracker:
- Thread-safe for concurrent access with per-partition locks
"""
+ # Uses SortedList so the smallest offset is available by index (sl[0]) and
+ # iteration is already sorted, replacing the O(k) min()/max() scans and the
+ # O(gap) range walk.
def __init__(self) -> None:
- self.all_offsets: dict[Partition, set[int]] = defaultdict(set)
+ self.all_offsets: dict[Partition, SortedList[int]] = defaultdict(SortedList)
self.outstanding: dict[Partition, set[int]] = defaultdict(set)
self.last_committed: dict[Partition, int] = {}
self.partition_locks: dict[Partition, threading.Lock] = {}
@@ -52,7 +57,7 @@ class OffsetTracker:
def add_offset(self, partition: Partition, offset: int) -> None:
"""Record that we've started processing an offset."""
with self._get_partition_lock(partition):
- self.all_offsets[partition].add(offset)
+ self.all_offsets[partition].add(offset) # O(log k) insert into SortedList
self.outstanding[partition].add(offset)
def complete_offset(self, partition: Partition, offset: int) -> None:
@@ -67,32 +72,46 @@ class OffsetTracker:
def get_committable_offsets(self) -> dict[Partition, int]:
"""
Get the highest offset per partition that can be safely committed.
For each partition, finds the highest contiguous offset that has been processed.
+
+ Complexity: O(c * log k) per partition where c is the number of newly
+ committable offsets this cycle and k is the total tracked offsets.
+ Previously O(k) for min/max + O(gap) for the range scan.
"""
committable = {}
for partition in list(self.all_offsets.keys()):
with self._get_partition_lock(partition):
all_offsets = self.all_offsets[partition]
if not all_offsets:
continue
outstanding = self.outstanding[partition]
last_committed = self.last_committed.get(partition, -1)
- min_offset = min(all_offsets)
- max_offset = max(all_offsets)
-
- start = max(last_committed + 1, min_offset)
-
+ # No min()/max() scan is needed: bisect_right below locates the first
+ # tracked offset beyond last_committed, so we never revisit
+ # already-committed offsets.
highest_committable = last_committed
- for offset in range(start, max_offset + 1):
- if offset in all_offsets and offset not in outstanding:
- highest_committable = offset
- else:
+
+ # bisect_right returns the insertion point after last_committed,
+ # giving us the index of the first candidate offset — O(log k).
+ start_idx = all_offsets.bisect_right(last_committed)
+
+ # Walk the sorted offsets sequentially. We stop as soon as we
+ # find a gap (next offset != previous + 1) or an outstanding
+ # (not-yet-completed) offset. This is O(c) where c is the
+ # number of contiguous completed offsets, NOT O(max - min).
+ expected = last_committed + 1
+ for offset in all_offsets.islice(start_idx):
+ if offset != expected:
+ # Gap in offsets — cannot commit past here.
+ break
+ if offset in outstanding:
+ # This offset is still being processed — stop.
break
+ highest_committable = offset
+ expected += 1
if highest_committable > last_committed:
committable[partition] = highest_committable
return committable
@@ -101,9 +120,11 @@ class OffsetTracker:
def mark_committed(self, partition: Partition, offset: int) -> None:
"""Update the last committed offset for a partition."""
with self._get_partition_lock(partition):
self.last_committed[partition] = offset
- # Remove all offsets <= committed offset
- self.all_offsets[partition] = {o for o in self.all_offsets[partition] if o > offset}
+ # Remove all offsets <= committed offset from the SortedList.
+ # irange gives us only the values to remove — O(c * log k).
+ to_remove = list(self.all_offsets[partition].irange(None, offset))
+ for o in to_remove:
+ self.all_offsets[partition].remove(o)
🤖 Grapple PR auto-fix • major • Review this diff before applying

🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -67,22 +67,23 @@ class OffsetTracker:
def get_committable_offsets(self) -> dict[Partition, int]:
"""
Get the highest offset per partition that can be safely committed.
For each partition, finds the highest contiguous offset that has been processed.
+ Uses a sorted walk over tracked offsets (O(k log k)) rather than a range scan
+ over the full integer gap (O(gap_size)), avoiding lock contention on large gaps.
"""
committable = {}
for partition in list(self.all_offsets.keys()):
with self._get_partition_lock(partition):
all_offsets = self.all_offsets[partition]
if not all_offsets:
continue
outstanding = self.outstanding[partition]
last_committed = self.last_committed.get(partition, -1)
- min_offset = min(all_offsets)
- max_offset = max(all_offsets)
-
- start = max(last_committed + 1, min_offset)
-
+ # Sort once; walk until we hit a gap or an outstanding offset.
+ # This is O(k log k) where k = len(all_offsets), which is bounded
+ # by the in-flight window — not the numeric gap between offsets.
+ sorted_offsets = sorted(all_offsets)
highest_committable = last_committed
- for offset in range(start, max_offset + 1):
- if offset in all_offsets and offset not in outstanding:
- highest_committable = offset
+
+ prev = None
+ for offset in sorted_offsets:
+ # Skip offsets already committed
+ if offset <= last_committed:
+ continue
+ # Stop if there is a numeric gap (offset was never added but
+ # lies between last_committed and this offset), or if the
+ # offset is still outstanding.
+ expected = (prev + 1) if prev is not None else (last_committed + 1)
+ if offset != expected or offset in outstanding:
- else:
break
+ highest_committable = offset
+ prev = offset
if highest_committable > last_committed:
committable[partition] = highest_committable
 return committable
🤖 Grapple PR auto-fix • minor • Review this diff before applying

🔴 CRITICAL — Offset Commit Correctness (confidence: 100%)
The
Evidence:
Agent: logic

🟠 MAJOR — Logic Error / get_committable_offsets Iterates Stale all_offsets Set (confidence: 100%)
The
Evidence:
Agent: security

🟠 MAJOR — Logic Error / Offset Tracking Cleanup on Partition Revocation (confidence: 100%)
OffsetTracker has no method to remove or reset state for a partition when Kafka partitions are revoked during a rebalance. If a partition is revoked and later reassigned, stale offset data in
Evidence:
Agent: logic

🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -67,6 +67,15 @@ class OffsetTracker:
def get_committable_offsets(self) -> dict[Partition, int]:
"""
Get the highest offset per partition that can be safely committed.
For each partition, finds the highest contiguous offset that has been processed.
+
+ Note: Every Kafka offset that this consumer receives must be registered via
+ add_offset() (and subsequently complete_offset()) — including filtered/skipped
+ messages — so that the contiguity scan below has a complete picture. If an
+ offset is never added to all_offsets, a gap between last_committed and
+ min(all_offsets) will block commits for that partition, which is the safe
+ behavior. See submit() in SimpleQueueProcessingStrategy for how FilteredPayload
+ and decoder-None results are handled.
"""
committable = {}
for partition in list(self.all_offsets.keys()):
@@ -83,8 +92,21 @@ class OffsetTracker:
min_offset = min(all_offsets)
max_offset = max(all_offsets)
+ # If there is a gap between the last committed offset and the lowest
+ # tracked offset, it means one or more Kafka offsets were never
+ # registered with add_offset(). We cannot safely commit past
+ # last_committed because we have no record of those intermediate
+ # offsets being processed.
+ #
+ # Example: last_committed=100, all_offsets={102} means offset 101
+ # was never tracked. We must not advance the commit position until
+ # offset 101 is accounted for (or we know it cannot appear).
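+ # The check only applies once a commit has been recorded: before that,
+ # last_committed defaults to -1 and the first tracked offset is the
+ # starting point rather than evidence of a gap.
+ if partition in self.last_committed and min_offset > last_committed + 1:
+ # There is an untracked gap, so nothing new can be committed.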
+ continue
+
start = max(last_committed + 1, min_offset)
highest_committable = last_committed
for offset in range(start, max_offset + 1):
- if offset in all_offsets and offset not in outstanding:
+ if offset in all_offsets:
+ if offset in outstanding:
+ # This offset is still being processed — stop here.
+ break
highest_committable = offset
else:
+ # This offset is not in all_offsets — it is a gap between
+ # two tracked offsets (e.g., offset was never registered).
+ # This is unsafe to skip; stop scanning.
break
 if highest_committable > last_committed:
🤖 Grapple PR auto-fix • critical • Review this diff before applying

🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -62,7 +62,7 @@ class OffsetTracker:
def complete_offset(self, partition: Partition, offset: int) -> None:
"""Mark an offset as completed."""
with self._get_partition_lock(partition):
self.outstanding[partition].discard(offset)
+ self.all_offsets[partition].discard(offset)
def get_committable_offsets(self) -> dict[Partition, int]:
"""
Get the highest offset per partition that can be safely committed.
- For each partition, finds the highest contiguous offset that has been processed.
+ For each partition, finds the highest contiguous completed offset that has been processed.
+ An offset is "completed" when it has been removed from both outstanding and all_offsets
+ (via complete_offset). We track the committed frontier via last_committed and only
+ advance it over a contiguous run of completed offsets above that frontier.
"""
committable = {}
for partition in list(self.all_offsets.keys()):
with self._get_partition_lock(partition):
- all_offsets = self.all_offsets[partition]
- if not all_offsets:
+ outstanding = self.outstanding[partition]
+ if not outstanding:
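+ # No in-flight offsets: min(outstanding) is undefined, so this pass
+ # commits nothing. Offsets completed since the last commit stay
+ # uncommitted until a later offset is in flight.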
continue
- outstanding = self.outstanding[partition]
last_committed = self.last_committed.get(partition, -1)
- min_offset = min(all_offsets)
- max_offset = max(all_offsets)
-
- start = max(last_committed + 1, min_offset)
+ # The next offset to commit is just past what we last committed.
+ # The frontier of outstanding tells us how far we can scan.
+ start = last_committed + 1
+ # We can commit up to (but not through) the smallest in-flight offset.
+ # Everything below min(outstanding) has been completed.
+ committable_up_to = min(outstanding) - 1
highest_committable = last_committed
- for offset in range(start, max_offset + 1):
- if offset in all_offsets and offset not in outstanding:
- highest_committable = offset
- else:
- break
+ # Every offset in [start, committable_up_to] has been completed
+ # (no longer outstanding, no longer in all_offsets) so we can
+ # advance the committed frontier to committable_up_to.
+ if committable_up_to >= start:
+ highest_committable = committable_up_to
if highest_committable > last_committed:
committable[partition] = highest_committable
return committable
def mark_committed(self, partition: Partition, offset: int) -> None:
"""Update the last committed offset for a partition."""
with self._get_partition_lock(partition):
self.last_committed[partition] = offset
- # Remove all offsets <= committed offset
- self.all_offsets[partition] = {o for o in self.all_offsets[partition] if o > offset}
+ # Prune any stale entries <= committed offset from both tracking sets.
+ # Under normal operation these should already be empty (complete_offset
+ # removes from all_offsets; outstanding entries are in-flight and should
+ # not be below the committed frontier), but we prune defensively.
+ self.all_offsets[partition] = {
+ o for o in self.all_offsets[partition] if o > offset
+ }
+ self.outstanding[partition] = {
+ o for o in self.outstanding[partition] if o > offset
+ }
🤖 Grapple PR auto-fix • major • Review this diff before applying

🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -100,6 +100,30 @@ class OffsetTracker:
def mark_committed(self, partition: Partition, offset: int) -> None:
"""Update the last committed offset for a partition."""
with self._get_partition_lock(partition):
self.last_committed[partition] = offset
# Remove all offsets <= committed offset
self.all_offsets[partition] = {o for o in self.all_offsets[partition] if o > offset}
+
+ def remove_partition(self, partition: Partition) -> None:
+ """
+ Remove all tracking state for a partition.
+
+ Must be called when a partition is revoked during a Kafka rebalance.
+ Since OffsetTracker persists on FixedQueuePool (which lives on the factory
+ across rebalances), failing to clean up here will cause stale offsets from
+ a previous assignment to corrupt get_committable_offsets() and mark_committed()
+ on reassignment.
+
+ Acquires the partition lock before removal to guard against concurrent
+ worker threads that may still be calling complete_offset() for in-flight
+ work items on this partition.
+ """
+ lock = self.partition_locks.get(partition)
+ if lock is None:
+ # Partition was never tracked; nothing to clean up.
+ return
+ with lock:
+ self.all_offsets.pop(partition, None)
+ self.outstanding.pop(partition, None)
+ self.last_committed.pop(partition, None)
+ # Remove the lock entry last, outside the lock itself, so any thread
+ # that acquired it just before us will finish its critical section
+ # before the entry disappears.
+ self.partition_locks.pop(partition, None)
class OrderedQueueWorker(threading.Thread, Generic[T]):
@@ -196,6 +220,19 @@ class FixedQueuePool(Generic[T]):
def wait_until_empty(self, timeout: float = 5.0) -> bool:
"""Wait until all queues are empty. Returns True if successful, False if timeout."""
start_time = time.time()
while time.time() - start_time < timeout:
if self.get_stats()["total_items"] == 0:
return True
time.sleep(0.01)
return False
+ def on_partitions_revoked(self, revoked: list[Partition]) -> None:
+ """
+ Clean up OffsetTracker state for revoked partitions.
+
+ Called by the consumer factory's create_with_partitions on rebalance.
+ Because FixedQueuePool (and its OffsetTracker) is shared across strategy
+ instances, we must explicitly clear per-partition state here; otherwise
+ stale all_offsets / outstanding / last_committed entries from the previous
+ assignment will corrupt commit logic when the partition is reassigned.
+ """
+ for partition in revoked:
+ logger.info(
+ "Clearing OffsetTracker state for revoked partition",
+ extra={"partition": partition},
+ )
+ self.offset_tracker.remove_partition(partition)
+
def shutdown(self) -> None:
"""Gracefully shutdown all workers."""
 for worker in self.workers:
🤖 Grapple PR auto-fix • major • Review this diff before applying

🔴 CRITICAL — Offset Commit Correctness (confidence: 100%)
The
Evidence:
Agent: architecture

🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -38,6 +38,7 @@ class OffsetTracker:
def __init__(self) -> None:
self.all_offsets: dict[Partition, set[int]] = defaultdict(set)
self.outstanding: dict[Partition, set[int]] = defaultdict(set)
self.last_committed: dict[Partition, int] = {}
self.partition_locks: dict[Partition, threading.Lock] = {}
+ self._global_lock = threading.RLock()
@@ -45,7 +46,10 @@ class OffsetTracker:
def _get_partition_lock(self, partition: Partition) -> threading.Lock:
"""Get or create a lock for a partition."""
lock = self.partition_locks.get(partition)
if lock:
return lock
return self.partition_locks.setdefault(partition, threading.Lock())
+ # NOTE: callers must hold self._global_lock before calling this method
+ # to avoid a race between the .get() check and .setdefault() on
+ # self.partition_locks from concurrent threads introducing a new partition.
@@ -53,10 +57,14 @@ class OffsetTracker:
def add_offset(self, partition: Partition, offset: int) -> None:
"""Record that we've started processing an offset."""
- with self._get_partition_lock(partition):
+ with self._global_lock:
+ lock = self._get_partition_lock(partition)
+ with lock:
self.all_offsets[partition].add(offset)
self.outstanding[partition].add(offset)
@@ -63,8 +71,11 @@ class OffsetTracker:
def complete_offset(self, partition: Partition, offset: int) -> None:
"""Mark an offset as completed."""
- with self._get_partition_lock(partition):
+ with self._global_lock:
+ lock = self._get_partition_lock(partition)
+ with lock:
self.outstanding[partition].discard(offset)
@@ -73,26 +84,44 @@ class OffsetTracker:
def get_committable_offsets(self) -> dict[Partition, int]:
"""
Get the highest offset per partition that can be safely committed.
For each partition, finds the highest contiguous offset that has been processed.
+ Contiguity is evaluated over the set of *tracked* offsets. The gap between
+ last_committed and the first tracked offset is ignored, but a gap between two
+ tracked offsets (for example from Kafka compaction, transactions, or filtered
+ messages never passed to add_offset) stops the scan.
"""
committable = {}
- for partition in list(self.all_offsets.keys()):
- with self._get_partition_lock(partition):
- all_offsets = self.all_offsets[partition]
- if not all_offsets:
- continue
-
- outstanding = self.outstanding[partition]
- last_committed = self.last_committed.get(partition, -1)
-
- min_offset = min(all_offsets)
- max_offset = max(all_offsets)
-
- start = max(last_committed + 1, min_offset)
-
- highest_committable = last_committed
- for offset in range(start, max_offset + 1):
- if offset in all_offsets and offset not in outstanding:
- highest_committable = offset
- else:
- break
-
- if highest_committable > last_committed:
- committable[partition] = highest_committable
+ # Take a snapshot of partitions under the global lock so that concurrent
+ # add_offset / complete_offset calls on new partitions cannot mutate
+ # self.all_offsets or self.partition_locks while we iterate.
+ with self._global_lock:
+ partitions = list(self.all_offsets.keys())
+ for partition in partitions:
+ with self._get_partition_lock(partition):
+ all_offsets = self.all_offsets[partition]
+ if not all_offsets:
+ continue
+
+ outstanding = self.outstanding[partition]
+ last_committed = self.last_committed.get(partition, -1)
+
+ # Iterate over sorted tracked offsets rather than a dense integer
+ # range. A dense range would break on any gap (compacted /
+ # transactional / filtered offset) that was never registered with
+ # add_offset, producing an artificially low committable offset.
+ sorted_offsets = sorted(all_offsets)
+
+ highest_committable = last_committed
+ prev = None
+ for offset in sorted_offsets:
+ if offset <= last_committed:
+ # Already committed; skip but update prev so the next
+ # offset's contiguity check is against the right base.
+ prev = offset
+ continue
+ # Stop at a numeric gap between two consecutive tracked offsets.
+ # Only the gap before the first tracked offset (prev is None) is
+ # skipped; any later untracked offset halts the scan.
+ if prev is not None and offset != prev + 1:
+ break
+ if offset in outstanding:
+ # This offset is still being processed — cannot advance.
+ break
+ highest_committable = offset
+ prev = offset
+
+ if highest_committable > last_committed:
+ committable[partition] = highest_committable
+
return committable
@@ -100,7 +129,9 @@ class OffsetTracker:
def mark_committed(self, partition: Partition, offset: int) -> None:
"""Update the last committed offset for a partition."""
- with self._get_partition_lock(partition):
+ with self._global_lock:
+ lock = self._get_partition_lock(partition)
+ with lock:
self.last_committed[partition] = offset
# Remove all offsets <= committed offset
 self.all_offsets[partition] = {o for o in self.all_offsets[partition] if o > offset}
🤖 Grapple PR auto-fix • critical • Review this diff before applying
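
The two-level locking in the diff above (a global lock held only while looking up or creating the per-partition lock, then the partition lock held for the mutation) can be tried out on its own. This is a minimal sketch under stated assumptions: `LockedTracker` is a hypothetical class that keeps only all_offsets, partitions are plain strings, and the thread and offset counts are arbitrary.

```python
import threading
from collections import defaultdict


class LockedTracker:
    """Hypothetical reduction of OffsetTracker to the locking pattern only."""

    def __init__(self) -> None:
        self.all_offsets = defaultdict(set)
        self.partition_locks = {}
        self._global_lock = threading.RLock()

    def _get_partition_lock(self, partition):
        # Callers must hold self._global_lock, so get-or-create is serialized.
        return self.partition_locks.setdefault(partition, threading.Lock())

    def add_offset(self, partition, offset):
        # The global lock covers only the lock lookup, not the mutation.
        with self._global_lock:
            lock = self._get_partition_lock(partition)
        with lock:
            self.all_offsets[partition].add(offset)


tracker = LockedTracker()


def worker(start: int) -> None:
    # Each thread registers its own block of 100 offsets on the same partition.
    for offset in range(start, start + 100):
        tracker.add_offset("partition-0", offset)


threads = [threading.Thread(target=worker, args=(i * 100,)) for i in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(len(tracker.all_offsets["partition-0"]), len(tracker.partition_locks))
```

Releasing the global lock before taking the partition lock keeps the globally serialized section short, at the cost of one extra lock acquisition on every call.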
        """Update the last committed offset for a partition."""
        with self._get_partition_lock(partition):
            self.last_committed[partition] = offset
            # Remove all offsets <= committed offset
            self.all_offsets[partition] = {o for o in self.all_offsets[partition] if o > offset}

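
The pruning step in mark_committed above is what keeps the tracked set proportional to the in-flight window rather than to the partition's history. The sketch below is a hypothetical simulation, not code from the PR: `prune_committed` mirrors the set comprehension, and `WINDOW` is an assumed number of offsets that stay uncommitted at any time.

```python
from __future__ import annotations


def prune_committed(tracked: set[int], committed: int) -> set[int]:
    # Mirror of the set comprehension in mark_committed.
    return {o for o in tracked if o > committed}


WINDOW = 100  # assumed in-flight window, chosen for illustration
tracked: set[int] = set()
peak = 0

for offset in range(10_000):
    tracked.add(offset)
    peak = max(peak, len(tracked))
    if offset >= WINDOW:
        # Commit everything except the most recent WINDOW offsets.
        tracked = prune_committed(tracked, offset - WINDOW)

print(peak, len(tracked))  # prints: 101 100
```

After 10,000 offsets the set never held more than 101 entries, which is the behavior the memory-leak comments in this thread ask for.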
🟠 MAJOR — Thread Safety - Missing Partition Lock in mark_committed (confidence: 100%)
The
Evidence:
Agent: logic

✅ Suggested change
🤖 Grapple PR auto-fix • major • confidence: 100%
class OrderedQueueWorker(threading.Thread, Generic[T]):
    """Worker thread that processes items from a queue in order."""

    def __init__(
        self,
        worker_id: int,
        work_queue: queue.Queue[WorkItem[T]],
        result_processor: Callable[[str, T], None],

🟠 MAJOR — Algorithmic complexity (confidence: 100%)
get_committable_offsets() acquires each partition lock individually in sequence, so the set of partitions can change between the outer list() call and the individual lock acquisitions. More critically, the contiguous-offset scan is O(n) per partition per call, and the method is called from the main thread's submit() or flush() path while worker threads are concurrently modifying outstanding. Holding the lock for a linear scan blocks workers from completing offsets.
Evidence:
Agent: performance

🟡 MINOR — Unnecessary work (confidence: 97%)
get_committable_offsets() acquires locks and performs set operations on every call from the main thread. If the main consumer's flush() calls this frequently (e.g., on every message), lock contention between the main thread and N worker threads will become a throughput bottleneck at high parallelism.
Evidence:
Agent: performance

🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -68,25 +68,28 @@ class OffsetTracker:
     def get_committable_offsets(self) -> dict[Partition, int]:
         """
         Get the highest offset per partition that can be safely committed.
         For each partition, finds the highest contiguous offset that has been processed.
+        Uses a sorted scan over only the tracked offsets (O(k log k) where k is the
+        number of tracked offsets) rather than iterating over the full offset range
+        (O(range_size)), so sparse partitions or large gaps don't stall worker threads.
         """
         committable = {}
         for partition in list(self.all_offsets.keys()):
             with self._get_partition_lock(partition):
-                all_offsets = self.all_offsets[partition]
-                if not all_offsets:
+                # Take a consistent snapshot under the lock so the scan
+                # sees a stable view even as workers call complete_offset().
+                all_offsets_snapshot = set(self.all_offsets[partition])
+                if not all_offsets_snapshot:
                     continue
-                outstanding = self.outstanding[partition]
+                outstanding_snapshot = set(self.outstanding[partition])
                 last_committed = self.last_committed.get(partition, -1)
-                min_offset = min(all_offsets)
-                max_offset = max(all_offsets)
-
-                start = max(last_committed + 1, min_offset)
-
+                # Sort only the offsets we actually track: O(k log k) where k is
+                # the number of outstanding/tracked offsets, not the range size.
+                # This prevents long lock holds on partitions with sparse offsets.
                 highest_committable = last_committed
-                for offset in range(start, max_offset + 1):
-                    if offset in all_offsets and offset not in outstanding:
-                        highest_committable = offset
-                    else:
+                prev = None
+                for offset in sorted(all_offsets_snapshot):
+                    if offset <= last_committed:
+                        # Already committed; skip but don't break the contiguity check.
+                        prev = offset
+                        continue
+                    # Check for a gap: if this offset is not contiguous with the
+                    # previous committable offset (or last_committed), stop.
+                    expected = (prev if prev is not None else last_committed) + 1
+                    if offset != expected:
+                        break
+                    if offset in outstanding_snapshot:
+                        # This offset is not yet done; contiguous run ends here.
                         break
+                    highest_committable = offset
+                    prev = offset
                 if highest_committable > last_committed:
                     committable[partition] = highest_committable
         return committable
🤖 Grapple PR auto-fix • major • Review this diff before applying
🟠 MAJOR: Algorithmic complexity (confidence: 100%) get_committable_offsets() iterates over all tracked offsets for each partition to find the highest contiguous committed offset. Because all_offsets and outstanding are plain sets (not sorted structures), min() and max() are O(n) each call, and the inner contiguous-scan loop is also O(n). This method is called on every commit cycle, and since offsets are not evicted from all_offsets until mark_committed is called, the sets can grow without bound whenever commits stall. Evidence:
Agent: performance
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -1,7 +1,8 @@
 from __future__ import annotations
 import logging
 import queue
 import threading
 import time
 from collections import defaultdict
+from sortedcontainers import SortedList
 from collections.abc import Callable
 from dataclasses import dataclass
 from typing import Any, Generic, TypeVar
@@ -38,8 +39,13 @@ class OffsetTracker:
     def __init__(self) -> None:
-        self.all_offsets: dict[Partition, set[int]] = defaultdict(set)
+        # SortedList gives O(log n) add/remove and O(1) min/max via index,
+        # replacing the plain set whose min()/max() were O(n) each call.
+        self.all_offsets: dict[Partition, SortedList] = defaultdict(SortedList)
         self.outstanding: dict[Partition, set[int]] = defaultdict(set)
         self.last_committed: dict[Partition, int] = {}
         self.partition_locks: dict[Partition, threading.Lock] = {}
+        # next_expected[partition] is the lowest offset we still need to scan
+        # from when computing the contiguous frontier. Avoids re-scanning from
+        # min(all_offsets) on every commit cycle.
+        self.next_expected: dict[Partition, int] = {}
     def _get_partition_lock(self, partition: Partition) -> threading.Lock:
         """Get or create a lock for a partition."""
@@ -52,6 +58,10 @@ class OffsetTracker:
     def add_offset(self, partition: Partition, offset: int) -> None:
         """Record that we've started processing an offset."""
         with self._get_partition_lock(partition):
             self.all_offsets[partition].add(offset)
             self.outstanding[partition].add(offset)
+            # Initialise the frontier pointer the first time we see an offset
+            # for this partition.
+            if partition not in self.next_expected:
+                self.next_expected[partition] = offset
     def complete_offset(self, partition: Partition, offset: int) -> None:
         """Mark an offset as completed."""
@@ -67,28 +77,37 @@ class OffsetTracker:
         committable = {}
         for partition in list(self.all_offsets.keys()):
             with self._get_partition_lock(partition):
                 all_offsets = self.all_offsets[partition]
                 if not all_offsets:
                     continue
                 outstanding = self.outstanding[partition]
                 last_committed = self.last_committed.get(partition, -1)
-                min_offset = min(all_offsets)
-                max_offset = max(all_offsets)
-
-                start = max(last_committed + 1, min_offset)
+                # O(1): SortedList index access instead of O(n) min()/max().
+                max_offset = all_offsets[-1]
+                # Resume from the known frontier rather than re-scanning from
+                # the beginning of all_offsets every cycle.
+                start = max(
+                    last_committed + 1,
+                    self.next_expected.get(partition, all_offsets[0]),
+                )
+
                 highest_committable = last_committed
                 for offset in range(start, max_offset + 1):
-                    if offset in all_offsets and offset not in outstanding:
+                    # Membership check is O(log n) for SortedList and O(1) for set.
+                    if offset in all_offsets and offset not in outstanding:
                         highest_committable = offset
                     else:
                         break
+                # Advance the frontier pointer so the next commit cycle starts
+                # here instead of at last_committed + 1.
+                if highest_committable > last_committed:
+                    self.next_expected[partition] = highest_committable + 1
+
                 if highest_committable > last_committed:
                     committable[partition] = highest_committable
         return committable
     def mark_committed(self, partition: Partition, offset: int) -> None:
         """Update the last committed offset for a partition."""
         with self._get_partition_lock(partition):
             self.last_committed[partition] = offset
-            # Remove all offsets <= committed offset
-            self.all_offsets[partition] = {o for o in self.all_offsets[partition] if o > offset}
+            # irange gives us an O(log n) view of values <= offset; removing
+            # them one-by-one is O(k log n) where k is the number removed,
+            # far better than rebuilding the whole set via comprehension O(n).
+            sl = self.all_offsets[partition]
+            to_remove = list(sl.irange(None, offset))
+            for o in to_remove:
+                sl.remove(o)
+            # Ensure the frontier pointer is always at least last_committed + 1
+            # so we never re-scan already-committed history.
+            self.next_expected[partition] = max(
+                self.next_expected.get(partition, offset + 1),
+                offset + 1,
+            )
🤖 Grapple PR auto-fix • major • Review this diff before applying |
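The diff above relies on the third-party `sortedcontainers` package. If adding a dependency is undesirable, the same "drop everything at or below the committed offset" step can be sketched with the standard library's `bisect` on a plain ascending list. This is an alternative under stated assumptions, not what the auto-fix proposes; `prune_committed` is a hypothetical helper name.

```python
from bisect import bisect_right, insort


def prune_committed(sorted_offsets: list[int], committed: int) -> None:
    """Drop every offset <= committed from an ascending list, in place."""
    # bisect_right returns the index of the first value > committed (O(log n)).
    # Deleting the prefix slice still shifts the remaining items (O(n)),
    # but it is one operation rather than one removal per offset.
    cut = bisect_right(sorted_offsets, committed)
    del sorted_offsets[:cut]


offsets: list[int] = []
for o in (7, 3, 5, 4, 6):
    insort(offsets, o)  # insert while keeping the list sorted

prune_committed(offsets, 5)
print(offsets)  # [6, 7]
```

The trade-off is that `insort` is O(n) per insert because of list shifting, which may or may not matter at the offset counts this consumer tracks.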
||||||||||||||||||||||
| identifier: str, | ||||||||||||||||||||||
| offset_tracker: OffsetTracker, | ||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||
| super().__init__(daemon=True) | ||||||||||||||||||||||
| self.worker_id = worker_id | ||||||||||||||||||||||
|
🟠 MAJOR: Logic Error - Incorrect Committable Offset Calculation (confidence: 100%) The Evidence:
Agent: security
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -67,28 +67,38 @@ class OffsetTracker:
     def get_committable_offsets(self) -> dict[Partition, int]:
         """
         Get the highest offset per partition that can be safely committed.
-        For each partition, finds the highest contiguous offset that has been processed.
+        For each partition, finds the highest contiguous offset that has been
+        processed, starting from the lowest tracked offset.
+
+        Correctness invariant: mark_committed() prunes all_offsets to only
+        contain offsets > last_committed, so min(all_offsets) is always the
+        first un-committed offset we know about. We can only advance if that
+        offset is contiguous with last_committed (i.e., min == last_committed + 1)
+        AND it is no longer outstanding. Any gap between last_committed and
+        min(all_offsets) means we cannot safely commit further.
         """
         committable = {}
         for partition in list(self.all_offsets.keys()):
             with self._get_partition_lock(partition):
                 all_offsets = self.all_offsets[partition]
                 if not all_offsets:
                     continue
                 outstanding = self.outstanding[partition]
                 last_committed = self.last_committed.get(partition, -1)
-                min_offset = min(all_offsets)
-                max_offset = max(all_offsets)
-
-                start = max(last_committed + 1, min_offset)
-
+                # Sort so we scan in ascending order and detect gaps immediately.
+                sorted_offsets = sorted(all_offsets)
+                min_offset = sorted_offsets[0]
+
+                # If there is a gap between the last committed offset and the
+                # lowest tracked offset, we cannot advance: some offsets in
+                # that gap were never added to all_offsets (or were from a
+                # prior assignment). Stop here.
+                if min_offset != last_committed + 1:
+                    continue
+
                 highest_committable = last_committed
-                for offset in range(start, max_offset + 1):
-                    if offset in all_offsets and offset not in outstanding:
-                        highest_committable = offset
-                    else:
+                prev = last_committed
+                for offset in sorted_offsets:
+                    if offset != prev + 1:
+                        # Gap in tracked offsets; cannot commit past prev.
                         break
+                    if offset in outstanding:
+                        # This offset is still being processed; stop here.
+                        break
+                    highest_committable = offset
+                    prev = offset
                 if highest_committable > last_committed:
                     committable[partition] = highest_committable
         return committable
🤖 Grapple PR auto-fix • major • Review this diff before applying |
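The strict gap rule in the diff above can be exercised in isolation. The sketch below is a standalone approximation with hypothetical names (`highest_committable`, plain integer offsets, no locking), not the PR's method.

```python
def highest_committable(
    tracked: set[int], outstanding: set[int], last_committed: int
) -> int:
    """Return the highest offset that is safe to commit, or last_committed."""
    highest = last_committed
    for offset in sorted(o for o in tracked if o > last_committed):
        if offset != highest + 1:
            break  # gap: an offset in between was never tracked
        if offset in outstanding:
            break  # still in flight, so nothing past it may be committed
        highest = offset
    return highest


# Offsets 3..7 submitted, 5 and 6 still in flight, 2 already committed.
print(highest_committable({3, 4, 5, 6, 7}, {5, 6}, 2))  # 4
# Lowest tracked offset is not last_committed + 1, so nothing advances.
print(highest_committable({5, 6}, set(), 2))  # 2
```

One consequence worth checking before applying the strict rule: with the default `last_committed` of -1 for a partition that has never committed, `highest_committable({1000, 1001}, set(), -1)` returns -1, so a consumer that starts mid-partition would never advance unless `last_committed` is seeded from the first offset seen.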
||||||||||||||||||||||
| self.work_queue = work_queue | ||||||||||||||||||||||
| self.result_processor = result_processor | ||||||||||||||||||||||
| self.identifier = identifier | ||||||||||||||||||||||
| self.offset_tracker = offset_tracker | ||||||||||||||||||||||
| self.shutdown = False | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| def run(self) -> None: | ||||||||||||||||||||||
| """Process items from the queue in order.""" | ||||||||||||||||||||||
| while not self.shutdown: | ||||||||||||||||||||||
| try: | ||||||||||||||||||||||
| work_item = self.work_queue.get() | ||||||||||||||||||||||
| except queue.ShutDown: | ||||||||||||||||||||||
| break | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| try: | ||||||||||||||||||||||
| with sentry_sdk.start_transaction( | ||||||||||||||||||||||
| op="queue_worker.process", | ||||||||||||||||||||||
| name=f"monitors.{self.identifier}.worker_{self.worker_id}", | ||||||||||||||||||||||
| ): | ||||||||||||||||||||||
| self.result_processor(self.identifier, work_item.result) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| except queue.ShutDown: | ||||||||||||||||||||||
| break | ||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||
| logger.exception( | ||||||||||||||||||||||
|
🔵 INFO: Documentation (confidence: 75%) The get_committable_offsets method logic lacks inline documentation explaining the contiguous offset algorithm. The method is critical for preventing message loss but the logic (lines 131-144 are truncated) is not clearly explained. Evidence:
Agent: style |
||||||||||||||||||||||
| "Unexpected error in queue worker", extra={"worker_id": self.worker_id} | ||||||||||||||||||||||
| ) | ||||||||||||||||||||||
| finally: | ||||||||||||||||||||||
| self.offset_tracker.complete_offset(work_item.partition, work_item.offset) | ||||||||||||||||||||||
| metrics.gauge( | ||||||||||||||||||||||
|
🟡 MINOR: Input Validation (confidence: 100%) The FixedQueuePool constructor accepts Evidence:
Agent: logic
✅
Suggested change
🤖 Grapple PR auto-fix • minor • confidence: 100% |
||||||||||||||||||||||
| "remote_subscriptions.queue_worker.queue_depth", | ||||||||||||||||||||||
| self.work_queue.qsize(), | ||||||||||||||||||||||
| tags={ | ||||||||||||||||||||||
| "identifier": self.identifier, | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
|
🟠 MAJOR: Missing Partition Revocation Handling (confidence: 100%) The Evidence:
Agent: logic |
||||||||||||||||||||||
| ) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
|
||||||||||||||||||||||
| class FixedQueuePool(Generic[T]): | ||||||||||||||||||||||
| """ | ||||||||||||||||||||||
| Fixed pool of queues that guarantees order within groups. | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| Key properties: | ||||||||||||||||||||||
| - Each group is consistently assigned to the same queue | ||||||||||||||||||||||
| - Each queue has exactly one worker thread | ||||||||||||||||||||||
| - Items within a queue are processed in FIFO order | ||||||||||||||||||||||
| - No dynamic reassignment that could break ordering | ||||||||||||||||||||||
| - Tracks offset completion for safe commits | ||||||||||||||||||||||
| """ | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
🟠 MAJOR: Worker Exception Handling / Silent Message Loss (confidence: 100%) In Evidence:
Agent: architecture
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -128,18 +128,35 @@ class OrderedQueueWorker(threading.Thread, Generic[T]):
     def run(self) -> None:
         """Process items from the queue in order."""
         while not self.shutdown:
             try:
                 work_item = self.work_queue.get()
             except queue.ShutDown:
                 break
+            processing_failed = False
             try:
                 with sentry_sdk.start_transaction(
                     op="queue_worker.process",
                     name=f"monitors.{self.identifier}.worker_{self.worker_id}",
                 ):
                     self.result_processor(self.identifier, work_item.result)
             except queue.ShutDown:
                 break
             except Exception:
+                processing_failed = True
                 logger.exception(
-                    "Unexpected error in queue worker", extra={"worker_id": self.worker_id}
+                    "Unexpected error in queue worker - offset NOT committed, partition will stall",
+                    extra={
+                        "worker_id": self.worker_id,
+                        "partition": work_item.partition,
+                        "offset": work_item.offset,
+                    },
                 )
+                metrics.incr(
+                    "remote_subscriptions.queue_worker.processing_error",
+                    tags={
+                        "identifier": self.identifier,
+                    },
+                )
             finally:
-                self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
+                # Only mark the offset complete on success. On failure we intentionally
+                # leave the offset in `outstanding` so that `get_committable_offsets()`
+                # cannot advance past this point, preventing silent message loss for
+                # transient errors (DB timeouts, network blips, etc.).
+                #
+                # NOTE: This will stall the partition at this offset until the consumer
+                # is restarted (triggering a re-delivery from the last committed offset).
+                # If poison-pill / permanently-failing messages are a concern, the caller's
+                # `result_processor` should handle its own dead-letter logic and only raise
+                # for genuinely retryable errors.
+                if not processing_failed:
+                    self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
                 metrics.gauge(
                     "remote_subscriptions.queue_worker.queue_depth",
                     self.work_queue.qsize(),
                     tags={
                         "identifier": self.identifier,
                     },
                 )
🤖 Grapple PR auto-fix • major • Review this diff before applying |
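The diff above stalls the partition on any failure, while the PR as written completes the offset and moves on. One possible middle ground, sketched here purely as an assumption, is to retry a bounded number of times and only then let the caller decide whether to skip or stall. `process_with_retries`, `max_attempts`, and `backoff_seconds` are hypothetical names that do not appear in the PR.

```python
import time
from collections.abc import Callable
from typing import TypeVar

T = TypeVar("T")


def process_with_retries(
    processor: Callable[[T], None],
    item: T,
    max_attempts: int = 3,
    backoff_seconds: float = 0.0,
) -> bool:
    """Run processor(item), retrying on any exception. Returns True on success."""
    for attempt in range(1, max_attempts + 1):
        try:
            processor(item)
            return True
        except Exception:
            # Sleep only between attempts, never after the last one.
            if attempt < max_attempts and backoff_seconds > 0:
                time.sleep(backoff_seconds * attempt)  # simple linear backoff
    return False


calls = {"n": 0}


def flaky(item: str) -> None:
    """Fails on the first two calls, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")


ok = process_with_retries(flaky, "payload")
print(ok, calls["n"])  # True 3
```

Retrying inside the worker keeps per-group ordering intact, at the cost of delaying every later item on the same queue while the retries run.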
||||||||||||||||||||||
| def __init__( | ||||||||||||||||||||||
| self, | ||||||||||||||||||||||
| result_processor: Callable[[str, T], None], | ||||||||||||||||||||||
| identifier: str, | ||||||||||||||||||||||
| num_queues: int = 20, | ||||||||||||||||||||||
|
🟡 MINOR: Naming conventions (confidence: 80%) Inconsistent variable naming in Evidence:
Agent: style
✅
Suggested change
🤖 Grapple PR auto-fix • minor • confidence: 80% |
||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||
| self.result_processor = result_processor | ||||||||||||||||||||||
| self.identifier = identifier | ||||||||||||||||||||||
| self.num_queues = num_queues | ||||||||||||||||||||||
| self.offset_tracker = OffsetTracker() | ||||||||||||||||||||||
|
🟠 MAJOR: Memory leaks (confidence: 100%) Worker queues have no maximum depth configured, allowing unbounded memory growth if workers fall behind processing speed. Evidence:
Agent: performance
✅
Suggested change
🤖 Grapple PR auto-fix • major • confidence: 100%
🟠 MAJOR: Logic Error / FixedQueuePool Worker Receives result_processor Incorrectly (confidence: 100%) The Evidence:
Agent: security
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -100,7 +100,7 @@ class OrderedQueueWorker(threading.Thread, Generic[T]):
     def __init__(
         self,
         worker_id: int,
         work_queue: queue.Queue[WorkItem[T]],
-        result_processor: Callable[[str, T], None],
+        result_processor: Callable[[T], None],
         identifier: str,
         offset_tracker: OffsetTracker,
     ) -> None:
@@ -136,7 +136,7 @@ class OrderedQueueWorker(threading.Thread, Generic[T]):
                 with sentry_sdk.start_transaction(
                     op="queue_worker.process",
                     name=f"monitors.{self.identifier}.worker_{self.worker_id}",
                 ):
-                    self.result_processor(self.identifier, work_item.result)
+                    self.result_processor(work_item.result)
             except queue.ShutDown:
                 break
@@ -171,7 +171,7 @@ class FixedQueuePool(Generic[T]):
     def __init__(
         self,
-        result_processor: Callable[[str, T], None],
+        result_processor: Callable[[T], None],
         identifier: str,
         num_queues: int = 20,
     ) -> None:
🤖 Grapple PR auto-fix • major • Review this diff before applying |
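On the unbounded-queue finding above: `queue.Queue` accepts a `maxsize`, and `put` with a `timeout` raises `queue.Full` if no slot frees up in time. A minimal sketch of the resulting backpressure follows; `try_submit`, the queue size, and the timeout are hypothetical values chosen for illustration.

```python
import queue


def try_submit(q: "queue.Queue[int]", item: int, timeout: float = 0.05) -> bool:
    """Enqueue item, returning False instead of blocking forever when q is full."""
    try:
        q.put(item, timeout=timeout)
        return True
    except queue.Full:
        return False


# No worker drains this queue, so the third put times out.
bounded: "queue.Queue[int]" = queue.Queue(maxsize=2)
results = [try_submit(bounded, i) for i in range(3)]
print(results)  # [True, True, False]
```

What the caller does on `False` (pause the Kafka poll loop, retry, or drop) is a policy decision the sketch deliberately leaves open.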
||||||||||||||||||||||
| self.queues: list[queue.Queue[WorkItem[T]]] = [] | ||||||||||||||||||||||
| self.workers: list[OrderedQueueWorker[T]] = [] | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| for i in range(num_queues): | ||||||||||||||||||||||
| work_queue: queue.Queue[WorkItem[T]] = queue.Queue() | ||||||||||||||||||||||
| self.queues.append(work_queue) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| worker = OrderedQueueWorker[T]( | ||||||||||||||||||||||
| worker_id=i, | ||||||||||||||||||||||
| work_queue=work_queue, | ||||||||||||||||||||||
|
🟠 MAJOR: Error Handling / Silent Message Loss (confidence: 100%) In the Evidence:
Agent: logic
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -147,13 +147,30 @@ class OrderedQueueWorker(threading.Thread, Generic[T]):
             try:
                 with sentry_sdk.start_transaction(
                     op="queue_worker.process",
                     name=f"monitors.{self.identifier}.worker_{self.worker_id}",
                 ):
                     self.result_processor(self.identifier, work_item.result)
             except queue.ShutDown:
                 break
             except Exception:
+                # NOTE: We intentionally still complete the offset below in the
+                # finally block even on failure. The OffsetTracker requires
+                # contiguous offset completion; leaving this offset incomplete
+                # would permanently stall commits for all later offsets on this
+                # partition. There is no retry infrastructure in this consumer.
+                #
+                # Failures here mean the message is skipped. This is an explicit
+                # architectural decision: observability (Sentry + metric) is used
+                # to detect and alert on drops rather than blocking the pipeline.
                 logger.exception(
-                    "Unexpected error in queue worker", extra={"worker_id": self.worker_id}
+                    "Unexpected error in queue worker - message will be skipped",
+                    extra={
+                        "worker_id": self.worker_id,
+                        "partition": work_item.partition,
+                        "offset": work_item.offset,
+                        "identifier": self.identifier,
+                    },
+                )
+                sentry_sdk.capture_exception()
+                metrics.incr(
+                    "remote_subscriptions.queue_worker.message_dropped",
+                    tags={
+                        "identifier": self.identifier,
+                        "reason": "processing_exception",
+                    },
                 )
             finally:
+                # Always complete the offset; see note in except block above.
                 self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
                 metrics.gauge(
                     "remote_subscriptions.queue_worker.queue_depth",
🤖 Grapple PR auto-fix • major • Review this diff before applying |
||||||||||||||||||||||
| result_processor=result_processor, | ||||||||||||||||||||||
| identifier=identifier, | ||||||||||||||||||||||
| offset_tracker=self.offset_tracker, | ||||||||||||||||||||||
| ) | ||||||||||||||||||||||
| worker.start() | ||||||||||||||||||||||
| self.workers.append(worker) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| def get_queue_for_group(self, group_key: str) -> int: | ||||||||||||||||||||||
| """ | ||||||||||||||||||||||
| Get queue index for a group using consistent hashing. | ||||||||||||||||||||||
|
🟡 MINOR: Documentation (confidence: 95%) The Evidence:
Agent: style
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -93,10 +93,43 @@ class OffsetTracker:
     def get_committable_offsets(self) -> dict[Partition, int]:
         """
-        Get the highest offset per partition that can be safely committed.
-
-        For each partition, finds the highest contiguous offset that has been processed.
+        Get the highest offset per partition that can be safely committed to Kafka.
+
+        Kafka offset commit semantics require committing contiguous offsets only.
+        When a consumer commits offset N, it signals to Kafka: "all messages up to
+        and including N have been processed; resume from N+1 on restart." This means
+        committing a non-contiguous offset (e.g., committing offset 5 when offset 3
+        is still outstanding) would cause messages at offset 3 to be silently skipped
+        after a consumer restart, i.e., permanent message loss.
+
+        Algorithm:
+            For each partition, scan forward from the last committed offset + 1 through
+            the highest known offset. Advance the "highest committable" cursor as long as
+            each successive offset is both known (was submitted for processing) and
+            completed (processing has finished). Stop immediately at the first gap:
+            either an offset that hasn't been submitted yet, or one that is still
+            outstanding (in-flight in a worker queue).
+
+        Example:
+            Offsets submitted: {3, 4, 5, 6, 7}
+            Offsets outstanding (in-flight): {5, 6}
+            Last committed: 2
+
+            Scan from 3:
+                offset 3 -> completed -> highest_committable = 3
+                offset 4 -> completed -> highest_committable = 4
+                offset 5 -> outstanding -> STOP
+
+            Returns: {partition: 4}
+
+            Offsets 5, 6, and 7 are intentionally withheld even though 7 may already
+            be complete, because committing 7 would skip 5 and 6 on restart.
+
+        Returns:
+            A dict mapping each partition to the highest offset that is safe to commit.
+            Partitions with no newly committable progress (i.e., the lowest unprocessed
+            offset is still outstanding) are omitted from the result.
         """
         committable = {}
         for partition in list(self.all_offsets.keys()):
🤖 Grapple PR auto-fix • minor • Review this diff before applying
🟠 MAJOR: Concurrency issues (confidence: 95%) FixedQueuePool worker threads catch exceptions and log them but continue running without any mechanism to propagate errors to the main thread or trigger backpressure. If a worker thread's result_processor consistently raises exceptions, messages are silently dropped: the offset is marked complete and the work is lost. Evidence:
Agent: performance |
||||||||||||||||||||||
| """ | ||||||||||||||||||||||
| return hash(group_key) % self.num_queues | ||||||||||||||||||||||
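A note on `hash(group_key) % self.num_queues`: this is modulo hashing, and Python salts the built-in `str` hash per interpreter process unless `PYTHONHASHSEED` is fixed. Within one process the mapping is stable, which is what per-group ordering needs, but the queue index for a given key can differ between restarts. If a restart-stable mapping is wanted (for example, to compare per-queue metrics across deploys), one option is a byte-level checksum. The sketch below is an assumption, with `stable_queue_index` as a hypothetical name.

```python
import zlib


def stable_queue_index(group_key: str, num_queues: int) -> int:
    """Map a group key to a queue index deterministically across processes."""
    if num_queues <= 0:
        raise ValueError("num_queues must be positive")
    # crc32 depends only on the encoded bytes, unlike the built-in str hash,
    # which is salted per interpreter process by default.
    return zlib.crc32(group_key.encode("utf-8")) % num_queues


index = stable_queue_index("subscription-123", 20)
print(0 <= index < 20)  # True
```

The `num_queues <= 0` check also covers the input-validation concern raised elsewhere in this review, since a zero modulus would otherwise raise `ZeroDivisionError` at submit time.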
|
|
||||||||||||||||||||||
| def submit(self, group_key: str, work_item: WorkItem[T]) -> None: | ||||||||||||||||||||||
| """ | ||||||||||||||||||||||
| Submit a work item to the appropriate queue. | ||||||||||||||||||||||
| """ | ||||||||||||||||||||||
| queue_index = self.get_queue_for_group(group_key) | ||||||||||||||||||||||
| work_queue = self.queues[queue_index] | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
🟠 MAJOR: Concurrency issues (confidence: 100%) Worker threads in Evidence:
Agent: performance
🟡 MINOR: Thread Safety (confidence: 72%) The Evidence:
Agent: logic
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -108,6 +108,7 @@ class OrderedQueueWorker(threading.Thread, Generic[T]):
         self.identifier = identifier
         self.offset_tracker = offset_tracker
         self.shutdown = False
+        self._alive = True
     def run(self) -> None:
         """Process items from the queue in order."""
-        while not self.shutdown:
-            try:
-                work_item = self.work_queue.get()
-            except queue.ShutDown:
-                break
-
+        work_item = None
+        while not self.shutdown:
             try:
+                try:
+                    work_item = self.work_queue.get()
+                except queue.ShutDown:
+                    break
+
                 with sentry_sdk.start_transaction(
                     op="queue_worker.process",
                     name=f"monitors.{self.identifier}.worker_{self.worker_id}",
                 ):
                     self.result_processor(self.identifier, work_item.result)
-            except queue.ShutDown:
-                break
             except Exception:
                 logger.exception(
                     "Unexpected error in queue worker", extra={"worker_id": self.worker_id}
                 )
             finally:
-                self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
+                if work_item is not None:
+                    self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
+                    work_item.done_event.set() if hasattr(work_item, "done_event") else None
+                    work_item = None
                 metrics.gauge(
                     "remote_subscriptions.queue_worker.queue_depth",
                     self.work_queue.qsize(),
                     tags={
                         "identifier": self.identifier,
                     },
                 )
+        self._alive = False
+        # Drain remaining items so callers blocked on queue.put() are never stuck.
+        # complete_offset is called for each drained item so shutdown() doesn't hang.
+        self._drain_on_exit()
+
+    def _drain_on_exit(self) -> None:
+        """
+        Drain all remaining items from the queue when the worker exits unexpectedly.
+        This prevents the main Kafka poll thread from blocking on queue.put() and
+        ensures offset_tracker.complete_offset() is called for every queued item
+        so that shutdown() / queue.join() do not hang.
+        """
+        while True:
+            try:
+                work_item = self.work_queue.get_nowait()
+            except (queue.Empty, queue.ShutDown):
+                break
+            except Exception:
+                logger.exception(
+                    "Unexpected error draining queue on worker exit",
+                    extra={"worker_id": self.worker_id},
+                )
+                break
+            try:
+                self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
+            except Exception:
+                logger.exception(
+                    "Error completing offset during drain",
+                    extra={"worker_id": self.worker_id},
+                )
 class FixedQueuePool(Generic[T]):
@@ -180,10 +221,38 @@ class FixedQueuePool(Generic[T]):
def submit(self, group_key: str, work_item: WorkItem[T]) -> None:
"""
Submit a work item to the appropriate queue.
+
+ If the target worker has died, attempt to restart it so the queue does
+ not fill up and block the Kafka poll thread indefinitely.
"""
queue_index = self.get_queue_for_group(group_key)
work_queue = self.queues[queue_index]
+ worker = self.workers[queue_index]
+
+ # Detect dead workers before submitting to avoid blocking indefinitely.
+ if not worker._alive and not worker.is_alive():
+ logger.error(
+ "Queue worker %d has died; restarting to prevent message loss",
+ queue_index,
+ extra={"identifier": self.identifier},
+ )
+ metrics.incr(
+ "remote_subscriptions.queue_pool.worker_restart",
+ tags={"identifier": self.identifier, "worker_id": str(queue_index)},
+ )
+ new_worker = OrderedQueueWorker[T](
+ worker_id=queue_index,
+ work_queue=work_queue,
+ result_processor=self.result_processor,
+ identifier=self.identifier,
+ offset_tracker=self.offset_tracker,
+ )
+ new_worker.start()
+ self.workers[queue_index] = new_worker
+ worker = new_worker
self.offset_tracker.add_offset(work_item.partition, work_item.offset)
- work_queue.put(work_item)
+ # Use a timeout so the main thread never blocks permanently if the
+ # newly restarted worker also fails before accepting the item.
+ try:
+ work_queue.put(work_item, timeout=30.0)
+ except queue.Full:
+ # Mark the offset complete so offset tracking doesn't stall, then
+ # log loudly — this is a last-resort safety valve.
+ self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
+ logger.error(
+ "Queue %d is full after 30s timeout; dropping work item to prevent deadlock. "
+ "partition=%s offset=%d",
+ queue_index,
+ work_item.partition,
+ work_item.offset,
+ extra={"identifier": self.identifier},
+ )
+ metrics.incr(
+ "remote_subscriptions.queue_pool.item_dropped",
+ tags={"identifier": self.identifier, "worker_id": str(queue_index)},
+ )
def get_stats(self) -> dict[str, Any]:
"""Get statistics about queue depths."""
@@ -195,6 +264,17 @@ class FixedQueuePool(Generic[T]):
+ def all_workers_alive(self) -> bool:
+ """Returns True if all worker threads are currently alive."""
+ return all(w.is_alive() for w in self.workers)
+
def wait_until_empty(self, timeout: float = 5.0) -> bool:
"""Wait until all queues are empty. Returns True if successful, False if timeout."""
start_time = time.time()
while time.time() - start_time < timeout:
if self.get_stats()["total_items"] == 0:
return True
time.sleep(0.01)
return False
def shutdown(self) -> None:
"""Gracefully shutdown all workers."""
for worker in self.workers:
worker.shutdown = True
for q in self.queues:
try:
q.shutdown(immediate=False)
except Exception:
logger.exception("Error shutting down queue")
for worker in self.workers:
- worker.join(timeout=5.0)
+ worker.join(timeout=10.0)
+ if worker.is_alive():
+ logger.warning(
+ "Worker %d did not exit within timeout during shutdown",
+ worker.worker_id,
+ extra={"identifier": self.identifier},
+ )
🤖 Grapple PR auto-fix • major • Review this diff before applying
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -108,7 +108,7 @@ class OrderedQueueWorker(threading.Thread, Generic[T]):
self.offset_tracker = offset_tracker
- self.shutdown = False
+ self.shutdown_event = threading.Event()
def run(self) -> None:
"""Process items from the queue in order."""
- while not self.shutdown:
+ while not self.shutdown_event.is_set():
try:
work_item = self.work_queue.get()
except queue.ShutDown:
@@ -222,7 +222,7 @@ class FixedQueuePool(Generic[T]):
def shutdown(self) -> None:
"""Gracefully shutdown all workers."""
for worker in self.workers:
- worker.shutdown = True
+ worker.shutdown_event.set()
for q in self.queues:
try:
🤖 Grapple PR auto-fix • minor • Review this diff before applying

        self.offset_tracker.add_offset(work_item.partition, work_item.offset)
        work_queue.put(work_item)

🟠 MAJOR — Kafka Consumer Liveness / Heartbeat Risk (confidence: 100%)
In
Evidence:
Agent: architecture
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -204,10 +204,26 @@ class FixedQueuePool(Generic[T]):
def submit(self, group_key: str, work_item: WorkItem[T]) -> None:
"""
Submit a work item to the appropriate queue.
+
+ Uses put_nowait() to avoid blocking the Arroyo main thread. If the queue
+ is full (only possible if queues are created with a maxsize), the item is
+ dropped and its offset is immediately marked complete so commits can
+ proceed. This prevents Kafka heartbeat starvation and consumer group
+ rebalancing caused by blocking in submit().
"""
queue_index = self.get_queue_for_group(group_key)
work_queue = self.queues[queue_index]
- self.offset_tracker.add_offset(work_item.partition, work_item.offset)
- work_queue.put(work_item)
+ try:
+ # Register the offset before enqueuing so the tracker is always consistent.
+ self.offset_tracker.add_offset(work_item.partition, work_item.offset)
+ work_queue.put_nowait(work_item)
+ except queue.Full:
+ # Queue is full — drop the item rather than blocking the Kafka consumer
+ # thread. Mark the offset complete immediately so commits can advance.
+ # This is a last-resort safety valve; callers should monitor the
+ # "remote_subscriptions.queue_pool.queue_full_drops" metric and tune
+ # queue capacity or worker count before drops become frequent.
+ self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
+ logger.warning(
+ "Queue %d full, dropping work item for group %r (partition=%s offset=%d)",
+ queue_index,
+ group_key,
+ work_item.partition,
+ work_item.offset,
+ )
+ metrics.incr(
+ "remote_subscriptions.queue_pool.queue_full_drops",
+ tags={"identifier": self.identifier, "queue_index": str(queue_index)},
+ )
🤖 Grapple PR auto-fix • major • Review this diff before applying

    def get_stats(self) -> dict[str, Any]:
        """Get statistics about queue depths."""
        queue_depths = [q.qsize() for q in self.queues]
        return {
            "queue_depths": queue_depths,
            "total_items": sum(queue_depths),
        }
🟠 MAJOR — Concurrency issues (confidence: 100%)
Worker threads call queue.get() without a timeout inside the main processing loop. If the shutdown flag is set while a worker is blocked in queue.get(), the worker stays blocked until a new item arrives or the queue is shut down. This can hang the consumer on shutdown when the queues are empty.
Evidence:
Agent: performance
✅
Suggested change
🤖 Grapple PR auto-fix • major • confidence: 100%
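One way to address the blocking get() described in this comment is to poll with a short timeout and re-check a shutdown event between attempts. The sketch below is a minimal standalone illustration, not the PR's code: PollingWorker, handler, and poll_interval are hypothetical names, and it deliberately avoids queue.ShutDown so that it also runs on Python versions before 3.13.

```python
import queue
import threading
from typing import Callable


class PollingWorker(threading.Thread):
    """Hypothetical worker that never blocks forever in get()."""

    def __init__(
        self,
        work_queue: "queue.Queue[int]",
        handler: Callable[[int], None],
        poll_interval: float = 0.05,
    ) -> None:
        super().__init__(daemon=True)
        self.work_queue = work_queue
        self.handler = handler
        self.poll_interval = poll_interval
        self.shutdown_event = threading.Event()

    def run(self) -> None:
        while not self.shutdown_event.is_set():
            try:
                # Wake up periodically so the shutdown flag is observed
                # even when no items arrive.
                item = self.work_queue.get(timeout=self.poll_interval)
            except queue.Empty:
                continue
            try:
                self.handler(item)
            finally:
                # Always mark the item done, even if the handler raised.
                self.work_queue.task_done()


processed = []
q = queue.Queue()
worker = PollingWorker(q, processed.append)
worker.start()
for n in range(3):
    q.put(n)
q.join()                     # wait until the three items have been handled
worker.shutdown_event.set()  # the queue is empty; the worker is idle in get()
worker.join(timeout=2.0)
```

The trade-off is a small amount of idle wake-ups in exchange for a bounded shutdown delay of roughly one poll_interval per worker.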

    def wait_until_empty(self, timeout: float = 5.0) -> bool:
        """Wait until all queues are empty. Returns True if successful, False if timeout."""
        start_time = time.time()
        while time.time() - start_time < timeout:
            if self.get_stats()["total_items"] == 0:
                return True
            time.sleep(0.01)
        return False

    def shutdown(self) -> None:
        """Gracefully shutdown all workers."""
        for worker in self.workers:
            worker.shutdown = True

        for q in self.queues:
            try:
                q.shutdown(immediate=False)
            except Exception:
                logger.exception("Error shutting down queue")

        for worker in self.workers:
            worker.join(timeout=5.0)


class SimpleQueueProcessingStrategy(ProcessingStrategy[KafkaPayload], Generic[T]):
    """
    Processing strategy that uses a fixed pool of queues.

    Guarantees:
    - Items for the same group are processed in order
    - No item is lost or processed out of order
    - Natural backpressure when queues fill up
    - Only commits offsets after successful processing
    """

    def __init__(
        self,
        queue_pool: FixedQueuePool[T],
        decoder: Callable[[KafkaPayload | FilteredPayload], T | None],
🟠 MAJOR — Concurrency issues (confidence: 100%)
Blocking
Evidence:
Agent: performance
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -1,6 +1,7 @@
from __future__ import annotations
import logging
import queue
import threading
import time
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, Generic, TypeVar
import sentry_sdk
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies import MessageRejected, ProcessingStrategy
from arroyo.types import BrokerValue, FilteredPayload, Message, Partition
from sentry.utils import metrics
@@ -198,14 +199,35 @@ class FixedQueuePool(Generic[T]):
def get_queue_for_group(self, group_key: str) -> int:
"""
Get queue index for a group using consistent hashing.
"""
return hash(group_key) % self.num_queues
- def submit(self, group_key: str, work_item: WorkItem[T]) -> None:
+ def submit(self, group_key: str, work_item: WorkItem[T]) -> bool:
"""
Submit a work item to the appropriate queue.
+
+ Returns True if the item was successfully enqueued, False if the queue
+ is full. Does NOT block the calling thread — callers must handle the
+ False return value by raising MessageRejected or equivalent backpressure.
+
+ NOTE: offset tracking is only registered AFTER a successful put so that
+ the commit loop never waits on an offset that was never actually queued.
"""
queue_index = self.get_queue_for_group(group_key)
work_queue = self.queues[queue_index]
- self.offset_tracker.add_offset(work_item.partition, work_item.offset)
- work_queue.put(work_item)
+ try:
+ work_queue.put_nowait(work_item)
+ except queue.Full:
+ # Queue is full — do NOT block the Kafka poll thread.
+ # The caller should raise MessageRejected so Arroyo re-delivers
+ # this message once capacity frees up.
+ metrics.incr(
+ "remote_subscriptions.queue_pool.queue_full",
+ tags={
+ "identifier": self.identifier,
+ "queue_index": str(queue_index),
+ },
+ )
+ return False
+
+ # Only register the offset after successfully enqueuing — this prevents
+ # the commit loop from stalling on an offset that was never tracked.
+ self.offset_tracker.add_offset(work_item.partition, work_item.offset)
+ return True
def get_stats(self) -> dict[str, Any]:
"""Get statistics about queue depths."""
@@ -290,7 +312,25 @@ class SimpleQueueProcessingStrategy(ProcessingStrategy[KafkaPayload], Generic[T
group_key = self.grouping_fn(result)
work_item = WorkItem(
partition=partition,
offset=offset,
result=result,
message=message,
)
- self.queue_pool.submit(group_key, work_item)
+ # submit() is non-blocking. If the target queue is full, it returns
+ # False and we raise MessageRejected so Arroyo pauses consumption
+ # and re-delivers this message — keeping the Kafka poll loop alive
+ # and avoiding consumer group rebalancing due to missed heartbeats.
+ accepted = self.queue_pool.submit(group_key, work_item)
+ if not accepted:
+ metrics.incr(
+ "remote_subscriptions.queue_pool.message_rejected",
+ tags={"identifier": self.queue_pool.identifier},
+ )
+ raise MessageRejected()
except MessageRejected:
+ # Re-raise MessageRejected so the Arroyo framework handles backpressure
+ # correctly — do not swallow it in the broad except below.
raise
except Exception:
logger.exception("Error submitting message to queue")
if isinstance(message.value, BrokerValue):
self.queue_pool.offset_tracker.add_offset(
message.value.partition, message.value.offset
)
self.queue_pool.offset_tracker.complete_offset(
message.value.partition, message.value.offset
)
🤖 Grapple PR auto-fix • major • Review this diff before applying

🟠 MAJOR — Concurrency issues (confidence: 100%)
shutdown() joins worker threads with a fixed timeout but does not verify that the queues have actually drained. If the queues hold more items than the workers can process within the timeout, the remaining messages are abandoned without any warning, violating the 'drain in-flight messages before stopping' requirement.
Evidence:
Agent: performance

🟠 MAJOR — Logic Error / SimpleQueueProcessingStrategy.submit Does Not Track Offsets for FilteredPayload (confidence: 100%)
The
Evidence:
Agent: security

🟠 MAJOR — Logic Error / Queue Selection for FilteredPayload (confidence: 100%)
In
Evidence:
Agent: logic

✅
Suggested change
🤖 Grapple PR auto-fix • major • confidence: 100%
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -231,12 +231,34 @@ class FixedQueuePool(Generic[T]):
def shutdown(self) -> None:
"""Gracefully shutdown all workers."""
+ # Step 1: Drain all queues before stopping workers.
+ # wait_until_empty() polls qsize(); give it a generous window so that
+ # a large-but-finite backlog doesn't cause silent message loss.
+ DRAIN_TIMEOUT = 30.0
+ drained = self.wait_until_empty(timeout=DRAIN_TIMEOUT)
+ if not drained:
+ remaining = self.get_stats()["total_items"]
+ logger.warning(
+ "Queue pool shutdown timed out before draining all items; "
+ "%d item(s) will be abandoned",
+ remaining,
+ extra={"identifier": self.identifier},
+ )
+ metrics.gauge(
+ "remote_subscriptions.queue_pool.abandoned_on_shutdown",
+ remaining,
+ tags={"identifier": self.identifier},
+ )
+
+ # Step 2: Signal workers to stop accepting new work.
for worker in self.workers:
worker.shutdown = True
+ # Step 3: Shut down the underlying queue objects so blocked get()
+ # calls in workers unblock immediately (queue.ShutDown is raised).
for q in self.queues:
try:
q.shutdown(immediate=False)
except Exception:
logger.exception("Error shutting down queue")
+ # Step 4: Reap worker threads. Use a per-worker timeout that is long
+ # enough for a worker currently mid-item to finish, but don't block
+ # forever. By this point the queues are drained (or we've logged the
+ # abandonment), so workers should exit promptly.
for worker in self.workers:
- worker.join(timeout=5.0)
+ worker.join(timeout=10.0)
+ if worker.is_alive():
+ logger.warning(
+ "Worker %d did not exit within join timeout",
+ worker.worker_id,
+ extra={"identifier": self.identifier},
+ )
🤖 Grapple PR auto-fix • major • Review this diff before applying

        grouping_fn: Callable[[T], str],
        commit_function: Callable[[dict[Partition, int]], None],
    ) -> None:
        self.queue_pool = queue_pool
        self.decoder = decoder
        self.grouping_fn = grouping_fn
        self.commit_function = commit_function
        self.shutdown_event = threading.Event()

        self.commit_thread = threading.Thread(target=self._commit_loop, daemon=True)
        self.commit_thread.start()

    def _commit_loop(self) -> None:
        while not self.shutdown_event.is_set():
            try:
                self.shutdown_event.wait(1.0)

                committable = self.queue_pool.offset_tracker.get_committable_offsets()

                if committable:
🟠 MAJOR — Logic Error - Thread Exception Silencing in Worker Queues (confidence: 100%)
If a worker thread in
Evidence:
Agent: security

🟡 MINOR — Unnecessary work (confidence: 97%)
The commit loop in SimpleQueueProcessingStrategy calls get_committable_offsets() on every iteration (roughly once per second, given the 1.0 s wait) regardless of whether any offsets have completed since the last call. This recomputes the committable frontier even when the consumer is idle or all workers are still in flight.
Evidence:
Agent: performance
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -149,20 +149,33 @@ class OrderedQueueWorker(threading.Thread, Generic[T]):
def run(self) -> None:
"""Process items from the queue in order."""
while not self.shutdown:
+ work_item = None
try:
work_item = self.work_queue.get()
except queue.ShutDown:
break
+ # NOTE: complete_offset MUST be called in the finally block below,
+ # even when result_processor raises. Failing to do so will permanently
+ # stall get_committable_offsets() for this partition, since that method
+ # requires contiguous completion. The trade-off is that a failed work
+ # item's offset is committed anyway — callers should treat result_processor
+ # failures as non-fatal and handle errors internally (e.g., via dead-letter
+ # queue or metrics) rather than relying on Kafka redelivery.
try:
with sentry_sdk.start_transaction(
op="queue_worker.process",
name=f"monitors.{self.identifier}.worker_{self.worker_id}",
):
self.result_processor(self.identifier, work_item.result)
except queue.ShutDown:
- break
+ # ShutDown raised during processing — still complete the offset
+ # before exiting so we don't stall the commit pipeline.
+ if work_item is not None:
+ self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
+ break
except Exception:
logger.exception(
"Unexpected error in queue worker", extra={"worker_id": self.worker_id}
)
+ metrics.incr(
+ "remote_subscriptions.queue_worker.processing_error",
+ tags={"identifier": self.identifier},
+ )
finally:
- self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
+ # Guard against the (theoretically impossible) case where work_item
+ # was never assigned — e.g., if queue.get() raised something other
+ # than ShutDown before we broke out of the loop.
+ if work_item is not None:
+ self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
metrics.gauge(
"remote_subscriptions.queue_worker.queue_depth",
self.work_queue.qsize(),
tags={
"identifier": self.identifier,
},
)
🤖 Grapple PR auto-fix • major • Review this diff before applying
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -38,6 +38,7 @@ class OffsetTracker:
def __init__(self) -> None:
self.all_offsets: dict[Partition, set[int]] = defaultdict(set)
self.outstanding: dict[Partition, set[int]] = defaultdict(set)
self.last_committed: dict[Partition, int] = {}
self.partition_locks: dict[Partition, threading.Lock] = {}
+ self._dirty = threading.Event()
def _get_partition_lock(self, partition: Partition) -> threading.Lock:
"""Get or create a lock for a partition."""
@@ -55,6 +56,7 @@ class OffsetTracker:
def complete_offset(self, partition: Partition, offset: int) -> None:
"""Mark an offset as completed."""
with self._get_partition_lock(partition):
self.outstanding[partition].discard(offset)
+ self._dirty.set()
def get_committable_offsets(self) -> dict[Partition, int]:
@@ -91,6 +93,7 @@ class OffsetTracker:
def mark_committed(self, partition: Partition, offset: int) -> None:
"""Update the last committed offset for a partition."""
with self._get_partition_lock(partition):
self.last_committed[partition] = offset
# Remove all offsets <= committed offset
self.all_offsets[partition] = {o for o in self.all_offsets[partition] if o > offset}
+ self._dirty.clear()
class OrderedQueueWorker(threading.Thread, Generic[T]):
@@ -273,6 +275,11 @@ class SimpleQueueProcessingStrategy(ProcessingStrategy[KafkaPayload], Generic[T
def _commit_loop(self) -> None:
while not self.shutdown_event.is_set():
try:
self.shutdown_event.wait(1.0)
+ # Skip the lock-heavy offset scan if no offsets have completed
+ # since the last commit cycle. This avoids unnecessary work when
+ # the consumer is idle or when all workers are still in-flight.
+ if not self.queue_pool.offset_tracker._dirty.is_set():
+ continue
+
committable = self.queue_pool.offset_tracker.get_committable_offsets()
if committable:
🤖 Grapple PR auto-fix • minor • Review this diff before applying
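The commit logic discussed in this thread depends on offsets completing contiguously: a partition can only be committed up to the last offset below the lowest one still outstanding. The sketch below illustrates that rule under stated assumptions. committable_frontier is a hypothetical helper, not the PR's get_committable_offsets(), whose body is not shown here; it only assumes the tracker keeps a set of all seen offsets and a set of outstanding ones, as the all_offsets and outstanding fields suggest.

```python
from typing import Optional


def committable_frontier(
    all_offsets: set[int],
    outstanding: set[int],
    last_committed: Optional[int] = None,
) -> Optional[int]:
    """Return the highest offset that is safe to commit, or None.

    An offset is safe only if every tracked offset at or below it has
    completed, so one stuck offset holds back everything after it.
    """
    frontier = last_committed
    for offset in sorted(all_offsets):
        if last_committed is not None and offset <= last_committed:
            continue  # already committed
        if offset in outstanding:
            break  # first incomplete offset: stop here
        frontier = offset
    return frontier if frontier != last_committed else None


seen = {10, 11, 12, 13}
# Offset 11 is still in flight, so only 10 can be committed.
stuck = committable_frontier(seen, outstanding={11})
# Once 11 completes, the frontier moves to 13.
done = committable_frontier(seen, outstanding=set())
```

This is why the worker diff earlier in the thread insists on calling complete_offset() in a finally block: a single offset that never completes would pin the frontier for its partition.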

                    metrics.incr(
                        "remote_subscriptions.queue_pool.offsets_committed",
                        len(committable),
                        tags={"identifier": self.queue_pool.identifier},
                    )

                    self.commit_function(committable)
                    for partition, offset in committable.items():
                        self.queue_pool.offset_tracker.mark_committed(partition, offset)
            except Exception:
                logger.exception("Error in commit loop")

    def submit(self, message: Message[KafkaPayload | FilteredPayload]) -> None:
        try:
            result = self.decoder(message.payload)

            assert isinstance(message.value, BrokerValue)
            partition = message.value.partition
            offset = message.value.offset

            if result is None:
                self.queue_pool.offset_tracker.add_offset(partition, offset)
                self.queue_pool.offset_tracker.complete_offset(partition, offset)
                return

            group_key = self.grouping_fn(result)

            work_item = WorkItem(
                partition=partition,
                offset=offset,
                result=result,
                message=message,
            )

            self.queue_pool.submit(group_key, work_item)

        except Exception:
            logger.exception("Error submitting message to queue")
            if isinstance(message.value, BrokerValue):
                self.queue_pool.offset_tracker.add_offset(
                    message.value.partition, message.value.offset
                )
                self.queue_pool.offset_tracker.complete_offset(
                    message.value.partition, message.value.offset
                )

    def poll(self) -> None:
        stats = self.queue_pool.get_stats()
        metrics.gauge(
            "remote_subscriptions.queue_pool.total_queued",
            stats["total_items"],
            tags={"identifier": self.queue_pool.identifier},
        )

    def close(self) -> None:
        self.shutdown_event.set()
        self.commit_thread.join(timeout=5.0)
        self.queue_pool.shutdown()

    def terminate(self) -> None:
        self.shutdown_event.set()
        self.queue_pool.shutdown()

    def join(self, timeout: float | None = None) -> None:
        self.close()
🟡 MINOR — Missing Partitioning Key Null Check (confidence: 93%)
The intent's edge cases specifically call out: 'What happens if the partitioning key is missing or None from a message — ensure a safe fallback routing behavior.' The queue routing likely uses a hash of the subscription_id to determine which queue a message goes to. If the subscription_id or partitioning key is None,
Evidence:
Agent: logic

🟠 MAJOR — Scalability / Backpressure (confidence: 100%)
The queue consumer uses Python's
Evidence:
Agent: architecture
🟡
--- a/src/sentry/remote_subscriptions/consumers/queue_consumer.py
+++ b/src/sentry/remote_subscriptions/consumers/queue_consumer.py
@@ -14,7 +14,8 @@
from arroyo.backends.kafka.consumer import KafkaPayload
-from arroyo.processing.strategies import ProcessingStrategy
+from arroyo.processing.strategies import MessageRejected, ProcessingStrategy
from arroyo.types import BrokerValue, FilteredPayload, Message, Partition
from sentry.utils import metrics
@@ -159,6 +160,16 @@ class FixedQueuePool(Generic[T]):
"""
Fixed pool of queues that guarantees order within groups.
Key properties:
- Each group is consistently assigned to the same queue
- Each queue has exactly one worker thread
- Items within a queue are processed in FIFO order
- No dynamic reassignment that could break ordering
- Tracks offset completion for safe commits
+
+ Backpressure:
+ - Each queue has a bounded maxsize to prevent unbounded memory growth
+ - submit() raises MessageRejected when a queue is full, signaling Arroyo
+ to pause fetching while still calling poll() for Kafka heartbeats
+ - The poll loop is never blocked indefinitely, preventing consumer group
+ eviction due to missed heartbeats
"""
+ # Maximum items per worker queue. At 20 queues this caps total in-flight
+ # work items at 20 * MAX_QUEUE_SIZE. Tune based on message size and memory.
+ MAX_QUEUE_SIZE = 1000
+
+ # How long submit() waits for queue space before raising MessageRejected.
+ # Must be short enough that the poll loop isn't stalled past
+ # session.timeout.ms / max.poll.interval.ms. 100 ms is a safe default.
+ SUBMIT_TIMEOUT_SECS = 0.1
+
def __init__(
self,
result_processor: Callable[[str, T], None],
identifier: str,
num_queues: int = 20,
+ max_queue_size: int | None = None,
) -> None:
self.result_processor = result_processor
self.identifier = identifier
self.num_queues = num_queues
+ self.max_queue_size = max_queue_size if max_queue_size is not None else self.MAX_QUEUE_SIZE
self.offset_tracker = OffsetTracker()
self.queues: list[queue.Queue[WorkItem[T]]] = []
self.workers: list[OrderedQueueWorker[T]] = []
for i in range(num_queues):
- work_queue: queue.Queue[WorkItem[T]] = queue.Queue()
+ work_queue: queue.Queue[WorkItem[T]] = queue.Queue(maxsize=self.max_queue_size)
self.queues.append(work_queue)
worker = OrderedQueueWorker[T](
@@ -204,12 +219,30 @@ class FixedQueuePool(Generic[T]):
def submit(self, group_key: str, work_item: WorkItem[T]) -> None:
"""
Submit a work item to the appropriate queue.
+
+ Raises MessageRejected if the target queue is full, which signals Arroyo
+ to apply backpressure (pause fetching, keep calling poll() for heartbeats).
+ The offset is NOT registered with the tracker until we know the item was
+ accepted, so no phantom offsets are left outstanding on rejection.
"""
queue_index = self.get_queue_for_group(group_key)
work_queue = self.queues[queue_index]
- self.offset_tracker.add_offset(work_item.partition, work_item.offset)
- work_queue.put(work_item)
+ try:
+ # Use a short timeout so the poll loop is never blocked indefinitely.
+ # add_offset is called only after successful enqueue so that a
+ # rejected item doesn't leave a dangling outstanding offset entry
+ # that would stall future commits.
+ work_queue.put(work_item, block=True, timeout=self.SUBMIT_TIMEOUT_SECS)
+ self.offset_tracker.add_offset(work_item.partition, work_item.offset)
+ except queue.Full:
+ metrics.incr(
+ "remote_subscriptions.queue_pool.backpressure",
+ tags={
+ "identifier": self.identifier,
+ "queue_index": str(queue_index),
+ },
+ )
+ # Raise MessageRejected to signal Arroyo's consumer runner to pause
+ # polling new messages while continuing to call poll() so Kafka
+ # heartbeats are maintained.
+ raise MessageRejected()
def get_stats(self) -> dict[str, Any]:
🤖 Grapple PR auto-fix • major • Review this diff before applying
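The bounded-queue backpressure in the diff above can be illustrated without Arroyo. In this sketch, Rejected is a stand-in for Arroyo's MessageRejected, and BoundedPool, max_queue_size, and submit_timeout are hypothetical names chosen for the example. It shows only the put-with-timeout pattern, not offset tracking or worker threads.

```python
import queue


class Rejected(Exception):
    """Stand-in for Arroyo's MessageRejected in this standalone sketch."""


class BoundedPool:
    """Hypothetical pool that routes group keys to bounded queues."""

    def __init__(self, num_queues: int, max_queue_size: int, submit_timeout: float) -> None:
        self.queues = [queue.Queue(maxsize=max_queue_size) for _ in range(num_queues)]
        self.submit_timeout = submit_timeout

    def submit(self, group_key: str, item: object) -> None:
        target = self.queues[hash(group_key) % len(self.queues)]
        try:
            # Bounded wait: the caller's poll loop is delayed by at most
            # submit_timeout seconds before it learns the queue is full.
            target.put(item, timeout=self.submit_timeout)
        except queue.Full:
            raise Rejected(group_key) from None


pool = BoundedPool(num_queues=1, max_queue_size=2, submit_timeout=0.01)
pool.submit("sub-a", 1)
pool.submit("sub-a", 2)
try:
    pool.submit("sub-a", 3)  # no worker is draining, so the queue stays full
    rejected = False
except Rejected:
    rejected = True
```

One ordering detail may be worth checking in the diff above: if add_offset() marks an offset as outstanding, registering it only after put() succeeds leaves a window in which the worker can call complete_offset() first, which would leave that offset outstanding afterwards.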

🟡 MINOR — API Contract / Consumer Registration (confidence: 100%)
The new mode thread-queue-parallel is added to the CLI choices, but nothing downstream validates that max_workers is provided or has a sensible value when this mode is selected. The default falls back to max_workers or 20 in result_consumer.py, so --max-workers 0 silently becomes 20, while a negative number is passed through unchanged. FixedQueuePool.__init__ doesn't validate num_queues.
Evidence:
num_queues=max_workers or 20: if max_workers=0, this uses 20 (0 is falsy), but max_workers=-1 would use -1
Agent: architecture
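A guard along the following lines would reject the values this comment worries about. It is a sketch under assumptions: resolve_num_queues and DEFAULT_NUM_QUEUES are hypothetical names, and the right place for the check (the CLI layer or FixedQueuePool.__init__) is left open. For context, in Python range(-1) is empty and hash(key) % -1 is always 0, so a pool built with num_queues=-1 would have no queues and every self.queues[index] lookup would raise IndexError.

```python
from typing import Optional

DEFAULT_NUM_QUEUES = 20  # mirrors the "max_workers or 20" fallback


def resolve_num_queues(max_workers: Optional[int]) -> int:
    """Return a usable queue count or raise ValueError.

    None means "not provided" and falls back to the default; zero and
    negative values are rejected instead of being silently replaced.
    """
    if max_workers is None:
        return DEFAULT_NUM_QUEUES
    if max_workers < 1:
        raise ValueError(f"max_workers must be >= 1, got {max_workers}")
    return max_workers


def rejects(value: int) -> bool:
    """Helper for the usage below: True if the value is refused."""
    try:
        resolve_num_queues(value)
    except ValueError:
        return True
    return False
```

Distinguishing None from 0 makes an explicit --max-workers 0 an error rather than an implicit request for the default.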