Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/sentry/consumers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def uptime_options() -> list[click.Option]:
options = [
click.Option(
["--mode", "mode"],
type=click.Choice(["serial", "parallel", "batched-parallel"]),
type=click.Choice(["serial", "parallel", "batched-parallel", "thread-queue-parallel"]),
default="serial",
help="The mode to process results in. Parallel uses multithreading.",
),
Expand All @@ -138,7 +138,7 @@ def uptime_options() -> list[click.Option]:
["--max-workers", "max_workers"],
type=int,
default=None,
help="The maximum number of threads to spawn in parallel mode.",
help="The maximum amount of parallelism to use when in a parallel mode.",
),
click.Option(["--processes", "num_processes"], default=1, type=int),
click.Option(["--input-block-size"], type=int, default=None),
Expand Down
345 changes: 345 additions & 0 deletions src/sentry/remote_subscriptions/consumers/queue_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,345 @@
from __future__ import annotations

import logging
import queue
import threading
import time
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, Generic, TypeVar

import sentry_sdk
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies import ProcessingStrategy
from arroyo.types import BrokerValue, FilteredPayload, Message, Partition

from sentry.utils import metrics

logger = logging.getLogger(__name__)

T = TypeVar("T")


@dataclass
class WorkItem(Generic[T]):
"""Work item that includes the original message for offset tracking."""

partition: Partition
offset: int
result: T
message: Message[KafkaPayload | FilteredPayload]


class OffsetTracker:
"""
Tracks outstanding offsets and determines which offsets are safe to commit.

- Tracks offsets per partition
- Only commits offsets when all prior offsets are processed
- Thread-safe for concurrent access with per-partition locks
"""

def __init__(self) -> None:
self.all_offsets: dict[Partition, set[int]] = defaultdict(set)
self.outstanding: dict[Partition, set[int]] = defaultdict(set)
self.last_committed: dict[Partition, int] = {}
self.partition_locks: dict[Partition, threading.Lock] = {}

def _get_partition_lock(self, partition: Partition) -> threading.Lock:
"""Get or create a lock for a partition."""
lock = self.partition_locks.get(partition)
if lock:
return lock
return self.partition_locks.setdefault(partition, threading.Lock())

def add_offset(self, partition: Partition, offset: int) -> None:
"""Record that we've started processing an offset."""
with self._get_partition_lock(partition):
self.all_offsets[partition].add(offset)
self.outstanding[partition].add(offset)

def complete_offset(self, partition: Partition, offset: int) -> None:
"""Mark an offset as completed."""
with self._get_partition_lock(partition):
self.outstanding[partition].discard(offset)

def get_committable_offsets(self) -> dict[Partition, int]:
"""
Get the highest offset per partition that can be safely committed.

For each partition, finds the highest contiguous offset that has been processed.
"""
committable = {}
for partition in list(self.all_offsets.keys()):
with self._get_partition_lock(partition):
all_offsets = self.all_offsets[partition]
if not all_offsets:
continue

outstanding = self.outstanding[partition]
last_committed = self.last_committed.get(partition, -1)

min_offset = min(all_offsets)
max_offset = max(all_offsets)

start = max(last_committed + 1, min_offset)

highest_committable = last_committed
for offset in range(start, max_offset + 1):
if offset in all_offsets and offset not in outstanding:
highest_committable = offset
else:
break

if highest_committable > last_committed:
committable[partition] = highest_committable

return committable

def mark_committed(self, partition: Partition, offset: int) -> None:
"""Update the last committed offset for a partition."""
with self._get_partition_lock(partition):
self.last_committed[partition] = offset
# Remove all offsets <= committed offset
self.all_offsets[partition] = {o for o in self.all_offsets[partition] if o > offset}


class OrderedQueueWorker(threading.Thread, Generic[T]):
"""Worker thread that processes items from a queue in order."""

def __init__(
self,
worker_id: int,
work_queue: queue.Queue[WorkItem[T]],
result_processor: Callable[[str, T], None],
identifier: str,
offset_tracker: OffsetTracker,
) -> None:
super().__init__(daemon=True)
self.worker_id = worker_id
self.work_queue = work_queue
self.result_processor = result_processor
self.identifier = identifier
self.offset_tracker = offset_tracker
self.shutdown = False

def run(self) -> None:
"""Process items from the queue in order."""
while not self.shutdown:
try:
work_item = self.work_queue.get()
except queue.ShutDown:
break

try:
with sentry_sdk.start_transaction(
op="queue_worker.process",
name=f"monitors.{self.identifier}.worker_{self.worker_id}",
):
self.result_processor(self.identifier, work_item.result)

except queue.ShutDown:
break
except Exception:
logger.exception(
"Unexpected error in queue worker", extra={"worker_id": self.worker_id}
)
finally:
self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
metrics.gauge(
"remote_subscriptions.queue_worker.queue_depth",
self.work_queue.qsize(),
tags={
"identifier": self.identifier,
},
)


class FixedQueuePool(Generic[T]):
"""
Fixed pool of queues that guarantees order within groups.

Key properties:
- Each group is consistently assigned to the same queue
- Each queue has exactly one worker thread
- Items within a queue are processed in FIFO order
- No dynamic reassignment that could break ordering
- Tracks offset completion for safe commits
"""

def __init__(
self,
result_processor: Callable[[str, T], None],
identifier: str,
num_queues: int = 20,
) -> None:
self.result_processor = result_processor
self.identifier = identifier
self.num_queues = num_queues
self.offset_tracker = OffsetTracker()
self.queues: list[queue.Queue[WorkItem[T]]] = []
self.workers: list[OrderedQueueWorker[T]] = []

for i in range(num_queues):
work_queue: queue.Queue[WorkItem[T]] = queue.Queue()
self.queues.append(work_queue)

worker = OrderedQueueWorker[T](
worker_id=i,
work_queue=work_queue,
result_processor=result_processor,
identifier=identifier,
offset_tracker=self.offset_tracker,
)
worker.start()
self.workers.append(worker)

Comment on lines +108 to +197

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Queue worker shutdown and error semantics are broken; fix queue.ShutDown usage and only mark successful items complete.

Two issues here:

  1. The code treats stdlib queue as if it had a ShutDown exception and Queue.shutdown() method. Those do not exist on queue.Queue, so FixedQueuePool.shutdown() will log exceptions for each queue and OrderedQueueWorker.run() will never break out on shutdown.
  2. OrderedQueueWorker.run() always calls offset_tracker.complete_offset in finally, even when result_processor raises, which makes failed items look successfully processed to the commit loop.

A concrete fix using queue.Empty and a success flag:

@@
 class OrderedQueueWorker(threading.Thread, Generic[T]):
@@
-    def run(self) -> None:
-        """Process items from the queue in order."""
-        while not self.shutdown:
-            try:
-                work_item = self.work_queue.get()
-            except queue.ShutDown:
-                break
-
-            try:
-                with sentry_sdk.start_transaction(
-                    op="queue_worker.process",
-                    name=f"monitors.{self.identifier}.worker_{self.worker_id}",
-                ):
-                    self.result_processor(self.identifier, work_item.result)
-
-            except queue.ShutDown:
-                break
-            except Exception:
-                logger.exception(
-                    "Unexpected error in queue worker", extra={"worker_id": self.worker_id}
-                )
-            finally:
-                self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
-                metrics.gauge(
-                    "remote_subscriptions.queue_worker.queue_depth",
-                    self.work_queue.qsize(),
-                    tags={
-                        "identifier": self.identifier,
-                    },
-                )
+    def run(self) -> None:
+        """Process items from the queue in order."""
+        while True:
+            try:
+                # Wake up periodically so we can honor the shutdown flag even when idle.
+                work_item = self.work_queue.get(timeout=0.1)
+            except queue.Empty:
+                if self.shutdown:
+                    break
+                continue
+
+            success = False
+            try:
+                with sentry_sdk.start_transaction(
+                    op="queue_worker.process",
+                    name=f"monitors.{self.identifier}.worker_{self.worker_id}",
+                ):
+                    self.result_processor(self.identifier, work_item.result)
+                    success = True
+            except Exception:
+                logger.exception(
+                    "Unexpected error in queue worker", extra={"worker_id": self.worker_id}
+                )
+            finally:
+                # Only mark offsets complete on successful processing; failures leave a gap.
+                if success:
+                    self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
+                metrics.gauge(
+                    "remote_subscriptions.queue_worker.queue_depth",
+                    self.work_queue.qsize(),
+                    tags={
+                        "identifier": self.identifier,
+                    },
+                )
@@
     def shutdown(self) -> None:
         """Gracefully shutdown all workers."""
         for worker in self.workers:
             worker.shutdown = True
-
-        for q in self.queues:
-            try:
-                q.shutdown(immediate=False)
-            except Exception:
-                logger.exception("Error shutting down queue")
-
         for worker in self.workers:
             worker.join(timeout=5.0)

This:

  • Uses queue.Empty with a short timeout to allow workers to exit once shutdown is set.
  • Removes references to nonexistent queue.ShutDown / Queue.shutdown.
  • Ensures failed items remain “outstanding” so they block commits for that partition, matching the new error-handling tests.

Also applies to: 231-244


🏁 Script executed:

#!/bin/bash
# After applying the change, re-run only the queue-consumer-related tests
# to confirm shutdown and error-handling semantics behave as intended.
rg -n "TestFixedQueuePool|TestSimpleQueueProcessingStrategy|thread_queue_parallel_error_handling" tests -n

Length of output: 352


I'll verify the issues by examining the actual code and test coverage.



🏁 Script executed:

#!/bin/bash
# Check the current queue_consumer.py code for the issues
cat src/sentry/remote_subscriptions/consumers/queue_consumer.py | sed -n '108,244p'

Length of output: 4691


🏁 Script executed:

#!/bin/bash
# Look for FixedQueuePool.shutdown() method
rg -n "def shutdown" src/sentry/remote_subscriptions/consumers/queue_consumer.py -A 10

Length of output: 378


🏁 Script executed:

#!/bin/bash
# Examine the relevant test cases for expected error-handling
cat tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py | sed -n '65,150p'

Length of output: 3128


🏁 Script executed:

#!/bin/bash
# Check for any tests covering error scenarios
rg -n "exception|error|fail" tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py -i -B 2 -A 5

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Verify stdlib queue module lacks ShutDown and shutdown
python3 -c "import queue; print('ShutDown attr:', hasattr(queue, 'ShutDown')); q = queue.Queue(); print('shutdown method:', hasattr(q, 'shutdown'))"

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Check test_thread_queue_parallel_error_handling test
cat tests/sentry/uptime/consumers/test_results_consumer.py | sed -n '1874,1920p'

Length of output: 1893


Let me check for error handling tests and the OffsetTracker implementation:



🏁 Script executed:

#!/bin/bash
# Look at full test file to find any error/failure test cases
cat tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py

Length of output: 15122


🏁 Script executed:

#!/bin/bash
# Find OffsetTracker implementation and tests
find . -name "*.py" -type f | xargs grep -l "class OffsetTracker" | head -5

Length of output: 61


🏁 Script executed:

#!/bin/bash
# Search for error handling or exception tests in test_queue_consumer.py
rg -n "def test_" tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py

Length of output: 711


🏁 Script executed:

#!/bin/bash
# Check if there are any comments or docstrings about offset completion behavior
rg -n "complete_offset|offset.*fail|failed.*offset" src/sentry/remote_subscriptions/consumers/queue_consumer.py -B 2 -A 2

Length of output: 979


🏁 Script executed:

#!/bin/bash
# Look for shutdown behavior expectations in tests
rg -n "shutdown" tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py -B 3 -A 5

Length of output: 451


Fix non-existent queue.ShutDown exception, Queue.shutdown() method, and offset completion semantics.

All three issues are confirmed:

  1. queue.ShutDown exception doesn't exist: Lines 132 and 143 attempt to catch a non-existent exception. Python's stdlib queue module has no ShutDown exception, so these handlers will never trigger and workers won't exit cleanly on shutdown.

  2. Queue.shutdown() method doesn't exist: Line 238 calls q.shutdown(immediate=False), which doesn't exist on queue.Queue. The call is caught by the generic Exception handler and logged, but the shutdown is ineffective.

  3. Failed items marked complete unconditionally: Line 149 calls offset_tracker.complete_offset in the finally block even when result_processor (line 146) raises an exception. This masks processing failures and creates offset gaps that appear to block commits, contrary to the intended semantics of only marking successful items complete.

The review's suggested fix using queue.Empty with a timeout and a success flag is correct and aligns with the offset gap-based commit strategy shown in test_offset_gaps_block_commits.

def get_queue_for_group(self, group_key: str) -> int:
"""
Get queue index for a group using consistent hashing.
"""
return hash(group_key) % self.num_queues

def submit(self, group_key: str, work_item: WorkItem[T]) -> None:
"""
Submit a work item to the appropriate queue.
"""
queue_index = self.get_queue_for_group(group_key)
work_queue = self.queues[queue_index]

self.offset_tracker.add_offset(work_item.partition, work_item.offset)
work_queue.put(work_item)

def get_stats(self) -> dict[str, Any]:
"""Get statistics about queue depths."""
queue_depths = [q.qsize() for q in self.queues]
return {
"queue_depths": queue_depths,
"total_items": sum(queue_depths),
}

def wait_until_empty(self, timeout: float = 5.0) -> bool:
"""Wait until all queues are empty. Returns True if successful, False if timeout."""
start_time = time.time()
while time.time() - start_time < timeout:
if self.get_stats()["total_items"] == 0:
return True
time.sleep(0.01)
return False

def shutdown(self) -> None:
"""Gracefully shutdown all workers."""
for worker in self.workers:
worker.shutdown = True

for q in self.queues:
try:
q.shutdown(immediate=False)
except Exception:
logger.exception("Error shutting down queue")

for worker in self.workers:
worker.join(timeout=5.0)


class SimpleQueueProcessingStrategy(ProcessingStrategy[KafkaPayload], Generic[T]):
"""
Processing strategy that uses a fixed pool of queues.

Guarantees:
- Items for the same group are processed in order
- No item is lost or processed out of order
- Natural backpressure when queues fill up
- Only commits offsets after successful processing
"""

def __init__(
self,
queue_pool: FixedQueuePool[T],
decoder: Callable[[KafkaPayload | FilteredPayload], T | None],
grouping_fn: Callable[[T], str],
commit_function: Callable[[dict[Partition, int]], None],
) -> None:
self.queue_pool = queue_pool
self.decoder = decoder
self.grouping_fn = grouping_fn
self.commit_function = commit_function
self.shutdown_event = threading.Event()

self.commit_thread = threading.Thread(target=self._commit_loop, daemon=True)
self.commit_thread.start()

def _commit_loop(self) -> None:
while not self.shutdown_event.is_set():
try:
self.shutdown_event.wait(1.0)

committable = self.queue_pool.offset_tracker.get_committable_offsets()

if committable:
metrics.incr(
"remote_subscriptions.queue_pool.offsets_committed",
len(committable),
tags={"identifier": self.queue_pool.identifier},
)

self.commit_function(committable)
for partition, offset in committable.items():
self.queue_pool.offset_tracker.mark_committed(partition, offset)
except Exception:
logger.exception("Error in commit loop")

def submit(self, message: Message[KafkaPayload | FilteredPayload]) -> None:
try:
result = self.decoder(message.payload)

assert isinstance(message.value, BrokerValue)
partition = message.value.partition
offset = message.value.offset

if result is None:
self.queue_pool.offset_tracker.add_offset(partition, offset)
self.queue_pool.offset_tracker.complete_offset(partition, offset)
return

group_key = self.grouping_fn(result)

work_item = WorkItem(
partition=partition,
offset=offset,
result=result,
message=message,
)

self.queue_pool.submit(group_key, work_item)

except Exception:
logger.exception("Error submitting message to queue")
if isinstance(message.value, BrokerValue):
self.queue_pool.offset_tracker.add_offset(
message.value.partition, message.value.offset
)
self.queue_pool.offset_tracker.complete_offset(
message.value.partition, message.value.offset
)

def poll(self) -> None:
stats = self.queue_pool.get_stats()
metrics.gauge(
"remote_subscriptions.queue_pool.total_queued",
stats["total_items"],
tags={"identifier": self.queue_pool.identifier},
)

def close(self) -> None:
self.shutdown_event.set()
self.commit_thread.join(timeout=5.0)
self.queue_pool.shutdown()

def terminate(self) -> None:
self.shutdown_event.set()
self.queue_pool.shutdown()

def join(self, timeout: float | None = None) -> None:
self.close()
Comment on lines +246 to +345

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Align SimpleQueueProcessingStrategy lifecycle with factory ownership of the pool and quiet the unused timeout arg.

Given ResultsStrategyFactory now owns FixedQueuePool and calls queue_pool.shutdown() in its shutdown() method, SimpleQueueProcessingStrategy should avoid shutting the pool itself and focus only on its own commit thread:

     def close(self) -> None:
-        self.shutdown_event.set()
-        self.commit_thread.join(timeout=5.0)
-        self.queue_pool.shutdown()
+        self.shutdown_event.set()
+        self.commit_thread.join(timeout=5.0)
 
     def terminate(self) -> None:
-        self.shutdown_event.set()
-        self.queue_pool.shutdown()
+        self.shutdown_event.set()
 
     def join(self, timeout: float | None = None) -> None:
-        self.close()
+        # `timeout` is unused; we always wait up to 5s in `close`.
+        self.close()

In the queue-consumer tests, you can then explicitly call self.queue_pool.shutdown() in TestSimpleQueueProcessingStrategy.tearDown to clean up worker threads.

This prevents the pool from being torn down during a rebalance (when only the strategy is closed), and it resolves the double-shutdown behavior once the factory also calls shutdown(). The timeout parameter remains part of the interface but is intentionally ignored, which is fine for Arroyo’s ProcessingStrategy contract.

🧰 Tools
🪛 Ruff (0.14.5)

344-344: Unused method argument: timeout

(ARG002)

🤖 Prompt for AI Agents
In src/sentry/remote_subscriptions/consumers/queue_consumer.py around lines
246-345, the strategy currently shuts down the FixedQueuePool from
close/terminate/join which conflicts with the ResultsStrategyFactory ownership
and causes double-shutdown during rebalances; remove all calls to
self.queue_pool.shutdown() from close(), terminate(), and join(), keep setting
shutdown_event and joining the commit_thread (join may ignore the timeout arg),
and leave pool.shutdown() to be called by the factory/tests (e.g.
TestSimpleQueueProcessingStrategy.tearDown) so the strategy only manages its
commit thread lifecycle and not the pool.

Loading