feat(uptime): Add ability to use queues to manage parallelism #7

Open
akshayutture-augment wants to merge 1 commit into kafka-consumer-parallel-before from kafka-consumer-parallel-after

Conversation

@akshayutture-augment akshayutture-augment commented Nov 14, 2025

Test 9

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced "thread-queue-parallel" processing mode enabling ordered, group-based task handling with improved throughput.
  • Documentation

    • Updated max-workers parameter description to clarify parallelism capabilities and usage.
  • Tests

    • Added comprehensive test coverage for thread-queue-parallel processing, validating ordering guarantees, concurrent group handling, offset tracking, and commit behavior.


Replicated from ai-code-review-evaluation/sentry-coderabbit#9

One potential problem with batch processing is that a single slow
item can stall the whole batch. This PR implements a queueing method
instead, where we keep N queues, each with its own worker thread.
An individual item can still back up its own queue, but we can
increase concurrency to reduce the chance of that happening.
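
The queueing idea described above can be sketched as follows. This is a minimal illustration, not the PR's implementation: `queue_index`, `NUM_QUEUES`, and the SHA-256 based hashing are assumptions made here, since the PR's own group-to-queue mapping is not shown in this conversation. Each group key always lands on the same queue, and each queue has a single worker, so items of one group are handled in submission order.

```python
import hashlib
import queue
import threading

NUM_QUEUES = 4  # hypothetical pool size for this sketch


def queue_index(group_key: str, num_queues: int = NUM_QUEUES) -> int:
    """Map a group key to a queue index that is stable across processes."""
    digest = hashlib.sha256(group_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_queues


def run_demo() -> dict[str, list[int]]:
    """Feed three groups through the queues and record per-group processing order."""
    queues = [queue.Queue() for _ in range(NUM_QUEUES)]
    seen: dict[str, list[int]] = {}
    lock = threading.Lock()

    def worker(q: queue.Queue) -> None:
        while True:
            item = q.get()
            if item is None:  # sentinel: stop this worker
                return
            group, seq = item
            with lock:
                seen.setdefault(group, []).append(seq)

    threads = [threading.Thread(target=worker, args=(q,)) for q in queues]
    for t in threads:
        t.start()

    # Interleave submissions from three groups.
    for seq in range(5):
        for group in ("sub-a", "sub-b", "sub-c"):
            queues[queue_index(group)].put((group, seq))

    for q in queues:
        q.put(None)
    for t in threads:
        t.join()
    return seen
```

A stable digest is used instead of the built-in `hash()` because string hashes are randomized per process by default, which would change the mapping between restarts.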

@akshayutture-augment

Author

@coderabbitai review

@akshayutture-augment

Author

@coderabbitai review

@coderabbitai

coderabbitai Bot commented Nov 18, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai

coderabbitai Bot commented Nov 18, 2025

Walkthrough

This PR introduces a new "thread-queue-parallel" processing mode for remote subscriptions consumers. It adds an in-process ordered queue processing subsystem with offset tracking, implements per-group work-item queues with dedicated worker threads, and integrates these components into the results consumer while maintaining backward compatibility with existing processing modes.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Configuration updates: src/sentry/consumers/__init__.py | Added "thread-queue-parallel" mode option to uptime_options; updated max-workers help text to reflect generic parallelism semantics rather than thread-specific. |
| Core queue processing subsystem: src/sentry/remote_subscriptions/consumers/queue_consumer.py | New file introducing WorkItem dataclass for offset-tracked work items; OffsetTracker for thread-safe per-partition offset management; OrderedQueueWorker daemon thread for ordered work processing; FixedQueuePool for group-based queue assignment with per-group workers; SimpleQueueProcessingStrategy integrating queue pools into ProcessingStrategy framework with periodic offset commits. |
| Results consumer integration: src/sentry/remote_subscriptions/consumers/result_consumer.py | Extended ResultsStrategyFactory to recognize "thread-queue-parallel" mode; added queue_pool attribute; introduced create_thread_queue_parallel_worker method; updated create_with_partitions to route to queue-based strategy when mode is thread-queue-parallel; updated shutdown to tear down queue pools. |
| Queue consumer test suite: tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py | New comprehensive test module with TestOffsetTracker, TestFixedQueuePool, TestSimpleQueueProcessingStrategy, and TestThreadQueueParallelIntegration classes covering offset tracking semantics, queue distribution, ordered processing, concurrent group handling, statistics reporting, and factory integration. |
| Uptime consumer integration tests: tests/sentry/uptime/consumers/test_results_consumer.py | Extended test coverage for thread-queue-parallel mode including multi-subscription parallelism, per-subscription ordering, offset commit verification, gap handling, error isolation, and Kafka-backed integration scenarios. |

Sequence Diagram(s)

sequenceDiagram
    participant Consumer as Consumer<br/>(SimpleQueueProcessingStrategy)
    participant GroupQ as GroupQueue<br/>(per group)
    participant Worker as OrderedQueueWorker
    participant Tracker as OffsetTracker
    participant Commit as CommitLoop<br/>(background)

    Consumer->>Consumer: submit(message)
    Consumer->>Consumer: decode → WorkItem
    Consumer->>GroupQ: enqueue WorkItem[T]
    Consumer->>Tracker: add_offset(partition, offset)

    Worker->>GroupQ: dequeue WorkItem[T]
    Worker->>Worker: process(result)
    Worker->>Tracker: complete_offset(partition, offset)

    Commit->>Tracker: get_committable_offsets()
    Tracker-->>Commit: dict[Partition, offset]
    Commit->>Commit: commit_function(offsets)
    Commit->>Tracker: mark_committed(partition, offset)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Key areas requiring attention:
    • Thread-safety guarantees in OffsetTracker: verify lock usage across add_offset, complete_offset, and get_committable_offsets paths
    • OrderedQueueWorker lifecycle: ensure daemon thread behavior doesn't cause resource leaks; verify proper signaling and error handling
    • FixedQueuePool queue assignment consistency: confirm hashing/modulo logic for group-to-queue mapping is deterministic
    • Offset commit loop semantics: validate that commit ordering respects gaps and prevents out-of-sequence commits
    • Integration with result_consumer: check mode branching logic and shutdown coordination
    • Test coverage breadth: review assertion coverage for concurrent scenarios and edge cases (e.g., offset gaps, filter rejections)

Poem

🐰 A queue of threads in lockstep dance,
Each group gets its own ordered chance,
Offsets tracked with care sublime,
Commits march forward—perfectly timed!
Parallel, yet ordered—a rabbit's delight! ✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 47.44%, which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The PR title clearly and concisely describes the main change: adding a new queue-based parallelism mechanism for the uptime consumer. It accurately reflects the primary focus of the changeset across all modified files. |
@akshayutture-augment

Author

@CodeRabbit review

@coderabbitai

coderabbitai Bot commented Nov 22, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/sentry/remote_subscriptions/consumers/result_consumer.py (1)

96-153: Queue pool lifecycle is broken across rebalances; factory and strategy both shut down the shared pool.

The review comment's analysis is correct. Verification confirms a critical correctness issue:

  • SimpleQueueProcessingStrategy.close() and terminate() both call self.queue_pool.shutdown() (lines 338, 342 in queue_consumer.py)
  • ResultsStrategyFactory.shutdown() also calls self.queue_pool.shutdown() (line 184 in result_consumer.py)
  • FixedQueuePool.shutdown() is not idempotent and kills worker threads unconditionally (lines 231–243)

When Arroyo rebalances partitions, it closes the old strategy (killing the shared pool) before creating a new strategy with the same dead pool. Subsequent submit() calls enqueue work to queues with no active workers, causing silent message loss.

Required fixes:

  1. SimpleQueueProcessingStrategy.close() and terminate() should NOT shut down the pool; instead, only set self.shutdown_event and join the commit thread.
  2. Move pool shutdown responsibility exclusively to ResultsStrategyFactory.shutdown().
  3. Update test teardown (e.g., TestSimpleQueueProcessingStrategy.tearDown) to manually shut down any test-scoped pool after strategy closes.
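
As a complementary hardening step (not one of the required fixes above), the pool's shutdown could be made safe to call more than once. The sketch below is a hypothetical stand-in, not the PR's `FixedQueuePool`; `IdempotentPool` and `teardown_count` are names invented for illustration.

```python
import threading


class IdempotentPool:
    """Hypothetical stand-in for a pool whose shutdown may be called repeatedly."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._closed = False
        self.teardown_count = 0  # counts how often the real teardown ran

    def shutdown(self) -> None:
        with self._lock:
            if self._closed:
                return  # second and later calls are no-ops
            self._closed = True
        # Real code would stop and join the worker threads here.
        self.teardown_count += 1
```

Idempotence only removes the double-shutdown hazard. It does not keep the pool alive across a rebalance, so single ownership by the factory would still be needed.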
🧹 Nitpick comments (5)
src/sentry/consumers/__init__.py (1)

116-143: New thread-queue-parallel mode wiring looks consistent; consider clarifying help text.

The added "thread-queue-parallel" choice aligns with the new mode supported by UptimeResultsStrategyFactory, and using a generic max-workers help string is fine. You might want to tweak the --mode help text to explicitly mention thread-queue-parallel (or “queue-based threading”) so operators understand how it differs from parallel/batched-parallel.

tests/sentry/uptime/consumers/test_results_consumer.py (2)

1689-2006: Thread-queue-parallel tests give solid coverage; waiting loops could be centralized.

The new test_thread_queue_parallel* methods do a good job validating fan‑out, ordering, offset commits, gaps, and shutdown for the new mode. The repeated “spin until queue_pool.get_stats()["total_items"] == 0 and N calls/guids seen” pattern appears in multiple tests; consider extracting a small helper (e.g. wait_for_queue_drain(factory.queue_pool, condition, timeout_s=5)) to reduce duplication and make it easier to tune timeouts if these ever get flaky under load.
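
One possible shape for such a helper is sketched below. It is an assumption about how the suggestion might be realized, not code from the PR: to stay self-contained it takes a callable that returns the queued-item count rather than the pool object itself, and the parameter names are hypothetical.

```python
import time
from typing import Callable


def wait_for_queue_drain(
    get_total_items: Callable[[], int],
    condition: Callable[[], bool] = lambda: True,
    timeout_s: float = 5.0,
    poll_interval_s: float = 0.05,
) -> bool:
    """Poll until the queues are empty and `condition` holds, or the timeout expires.

    Returns True when both checks passed, False if the deadline was reached first.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if get_total_items() == 0 and condition():
            return True
        time.sleep(poll_interval_s)
    return False
```

In the tests discussed here, a call might look like `wait_for_queue_drain(lambda: factory.queue_pool.get_stats()["total_items"], lambda: mock_processor_call.call_count >= 2)`, with the return value asserted so that a timeout fails loudly instead of falling through.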


1746-1746: Tidy up lint issues and exception handling in new tests.

A few small tweaks will keep linters quiet and improve readability:

  • track_calls(identifier, result) (Line 1746) and the various track_commits(offsets, force=False) helpers (Lines 1830, 1880, 1932) don’t use the identifier / force arguments. Either rename them to _identifier / _force or reference them (e.g. del force) to satisfy Ruff.
  • pytestmark = [requires_kafka] inside ProcessResultThreadQueueParallelKafkaTest (Lines 2012–2017) is a mutable class attribute; either annotate as a ClassVar or switch to a tuple, or move the marker to module scope.
  • The try/except Exception: pass around admin_client.delete_topics (Lines 2143–2146) should at least log the exception or narrow the exception type so real cleanup problems aren’t silently hidden.

These are test-only and won’t affect behavior, but they’ll help keep CI green.

Also applies to: 1830-1831, 1880-1881, 1932-1933, 2012-2017, 2143-2146

tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py (1)

65-207: Queue subsystem tests are comprehensive; just address small nits.

These tests give strong coverage of:

  • Offset tracking across partitions and gaps.
  • Per-group queue assignment and ordering.
  • Concurrent processing across groups.
  • Commit behavior and invalid message handling.
  • Factory integration for thread-queue-parallel.

Two minor cleanups:

  • In TestSimpleQueueProcessingStrategy.setUp, result_processor(identifier, result) never uses identifier; rename it to _identifier or add del identifier to satisfy Ruff and clarify intent.
  • In TestFixedQueuePool.test_stats_reporting, the assert stats["total_items"] > 0 right after enqueuing could be racy in a very fast environment (workers might drain the queues before the first stats call). If you ever see flakiness, consider dropping that assertion (relaxing it to >= 0 would make it always pass) and asserting that the second stats call reaches 0.

Otherwise these tests look good.

Also applies to: 209-373, 375-421

src/sentry/remote_subscriptions/consumers/queue_consumer.py (1)

34-106: OffsetTracker logic is sound; consider pruning outstanding offsets on commit.

The contiguous-commit logic in get_committable_offsets is correct and matches the tests’ expectations for gaps and multiple partitions. As a small refinement, mark_committed currently prunes all_offsets but not outstanding; in practice there shouldn’t be any outstanding offsets ≤ the committed offset, but if an invariant is ever broken, those entries would live forever. You could defensively clear them as well:

 def mark_committed(self, partition: Partition, offset: int) -> None:
     """Update the last committed offset for a partition."""
     with self._get_partition_lock(partition):
         self.last_committed[partition] = offset
         # Remove all offsets <= committed offset
         self.all_offsets[partition] = {o for o in self.all_offsets[partition] if o > offset}
+        self.outstanding[partition] = {o for o in self.outstanding[partition] if o > offset}

Not required today, but it tightens invariants.
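
To make the contiguous-commit behaviour concrete, here is a simplified, self-contained sketch. It is not the PR's `OffsetTracker`: partitions are plain strings, a single lock replaces the per-partition locks, and "contiguous" is interpreted as "every tracked offset up to this one has completed", which is an assumption about the intended semantics.

```python
import threading
from collections import defaultdict


class SketchOffsetTracker:
    """Simplified tracker: commit only up to the first unfinished offset."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.all_offsets: dict[str, set[int]] = defaultdict(set)
        self.outstanding: dict[str, set[int]] = defaultdict(set)
        self.last_committed: dict[str, int] = {}

    def add_offset(self, partition: str, offset: int) -> None:
        with self._lock:
            self.all_offsets[partition].add(offset)
            self.outstanding[partition].add(offset)

    def complete_offset(self, partition: str, offset: int) -> None:
        with self._lock:
            self.outstanding[partition].discard(offset)

    def get_committable_offsets(self) -> dict[str, int]:
        committable: dict[str, int] = {}
        with self._lock:
            for partition, offsets in self.all_offsets.items():
                highest = None
                for offset in sorted(offsets):
                    if offset in self.outstanding[partition]:
                        break  # first unfinished offset blocks everything after it
                    highest = offset
                if highest is not None:
                    committable[partition] = highest
        return committable

    def mark_committed(self, partition: str, offset: int) -> None:
        with self._lock:
            self.last_committed[partition] = offset
            # Prune both sets so stale entries cannot accumulate.
            self.all_offsets[partition] = {o for o in self.all_offsets[partition] if o > offset}
            self.outstanding[partition] = {o for o in self.outstanding[partition] if o > offset}
```

With offsets 100, 101, and 102 tracked, completing only 101 and 102 yields nothing committable. Once 100 completes, 102 becomes committable for that partition.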

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5c94383 and 33f4fdd.

📒 Files selected for processing (5)
  • src/sentry/consumers/__init__.py (2 hunks)
  • src/sentry/remote_subscriptions/consumers/queue_consumer.py (1 hunks)
  • src/sentry/remote_subscriptions/consumers/result_consumer.py (7 hunks)
  • tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py (1 hunks)
  • tests/sentry/uptime/consumers/test_results_consumer.py (5 hunks)
🧰 Additional context used
🪛 Ruff (0.14.5)
tests/sentry/uptime/consumers/test_results_consumer.py

  • 1746-1746: Unused function argument: identifier (ARG001)
  • 1830-1830: Unused function argument: force (ARG001)
  • 1880-1880: Unused function argument: force (ARG001)
  • 1932-1932: Unused function argument: force (ARG001)
  • 2017-2017: Mutable class attributes should be annotated with typing.ClassVar (RUF012)
  • 2145-2146: try-except-pass detected, consider logging the exception (S110)
  • 2145-2145: Do not catch blind exception: Exception (BLE001)

src/sentry/remote_subscriptions/consumers/queue_consumer.py

  • 344-344: Unused method argument: timeout (ARG002)

tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py

  • 219-219: Unused function argument: identifier (ARG001)

🔇 Additional comments (1)
src/sentry/remote_subscriptions/consumers/result_consumer.py (1)

106-139: Verification confirmed: all call sites use documented modes.

The spot-check of UptimeResultsStrategyFactory call sites confirms all invocations use only the documented modes:

  • "serial" (via ProcessResultSerialTest)
  • "parallel" (via ProcessResultParallelTest)
  • "batched-parallel" (lines 1108–1653 in test file)
  • "thread-queue-parallel" (lines 1693–2057 in test file)

The abstract base class constrains dynamic mode usage to ["batched-parallel", "parallel", "serial"] via its type hint, while thread-queue-parallel is tested through separate hardcoded calls. No undocumented modes are being used.

Comment on lines +108 to +197
class OrderedQueueWorker(threading.Thread, Generic[T]):
    """Worker thread that processes items from a queue in order."""

    def __init__(
        self,
        worker_id: int,
        work_queue: queue.Queue[WorkItem[T]],
        result_processor: Callable[[str, T], None],
        identifier: str,
        offset_tracker: OffsetTracker,
    ) -> None:
        super().__init__(daemon=True)
        self.worker_id = worker_id
        self.work_queue = work_queue
        self.result_processor = result_processor
        self.identifier = identifier
        self.offset_tracker = offset_tracker
        self.shutdown = False

    def run(self) -> None:
        """Process items from the queue in order."""
        while not self.shutdown:
            try:
                work_item = self.work_queue.get()
            except queue.ShutDown:
                break

            try:
                with sentry_sdk.start_transaction(
                    op="queue_worker.process",
                    name=f"monitors.{self.identifier}.worker_{self.worker_id}",
                ):
                    self.result_processor(self.identifier, work_item.result)

            except queue.ShutDown:
                break
            except Exception:
                logger.exception(
                    "Unexpected error in queue worker", extra={"worker_id": self.worker_id}
                )
            finally:
                self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
                metrics.gauge(
                    "remote_subscriptions.queue_worker.queue_depth",
                    self.work_queue.qsize(),
                    tags={
                        "identifier": self.identifier,
                    },
                )


class FixedQueuePool(Generic[T]):
    """
    Fixed pool of queues that guarantees order within groups.

    Key properties:
    - Each group is consistently assigned to the same queue
    - Each queue has exactly one worker thread
    - Items within a queue are processed in FIFO order
    - No dynamic reassignment that could break ordering
    - Tracks offset completion for safe commits
    """

    def __init__(
        self,
        result_processor: Callable[[str, T], None],
        identifier: str,
        num_queues: int = 20,
    ) -> None:
        self.result_processor = result_processor
        self.identifier = identifier
        self.num_queues = num_queues
        self.offset_tracker = OffsetTracker()
        self.queues: list[queue.Queue[WorkItem[T]]] = []
        self.workers: list[OrderedQueueWorker[T]] = []

        for i in range(num_queues):
            work_queue: queue.Queue[WorkItem[T]] = queue.Queue()
            self.queues.append(work_queue)

            worker = OrderedQueueWorker[T](
                worker_id=i,
                work_queue=work_queue,
                result_processor=result_processor,
                identifier=identifier,
                offset_tracker=self.offset_tracker,
            )
            worker.start()
            self.workers.append(worker)

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Queue worker shutdown and error semantics are broken; fix queue.ShutDown usage and only mark successful items complete.

Two issues here:

  1. The code relies on the stdlib queue.ShutDown exception and Queue.shutdown() method, which are only available on Python 3.13 and later. On an older interpreter they do not exist on queue.Queue, so FixedQueuePool.shutdown() will log exceptions for each queue and OrderedQueueWorker.run() will never break out on shutdown.
  2. OrderedQueueWorker.run() always calls offset_tracker.complete_offset in finally, even when result_processor raises, which makes failed items look successfully processed to the commit loop.

A concrete fix using queue.Empty and a success flag:

@@
 class OrderedQueueWorker(threading.Thread, Generic[T]):
@@
-    def run(self) -> None:
-        """Process items from the queue in order."""
-        while not self.shutdown:
-            try:
-                work_item = self.work_queue.get()
-            except queue.ShutDown:
-                break
-
-            try:
-                with sentry_sdk.start_transaction(
-                    op="queue_worker.process",
-                    name=f"monitors.{self.identifier}.worker_{self.worker_id}",
-                ):
-                    self.result_processor(self.identifier, work_item.result)
-
-            except queue.ShutDown:
-                break
-            except Exception:
-                logger.exception(
-                    "Unexpected error in queue worker", extra={"worker_id": self.worker_id}
-                )
-            finally:
-                self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
-                metrics.gauge(
-                    "remote_subscriptions.queue_worker.queue_depth",
-                    self.work_queue.qsize(),
-                    tags={
-                        "identifier": self.identifier,
-                    },
-                )
+    def run(self) -> None:
+        """Process items from the queue in order."""
+        while True:
+            try:
+                # Wake up periodically so we can honor the shutdown flag even when idle.
+                work_item = self.work_queue.get(timeout=0.1)
+            except queue.Empty:
+                if self.shutdown:
+                    break
+                continue
+
+            success = False
+            try:
+                with sentry_sdk.start_transaction(
+                    op="queue_worker.process",
+                    name=f"monitors.{self.identifier}.worker_{self.worker_id}",
+                ):
+                    self.result_processor(self.identifier, work_item.result)
+                    success = True
+            except Exception:
+                logger.exception(
+                    "Unexpected error in queue worker", extra={"worker_id": self.worker_id}
+                )
+            finally:
+                # Only mark offsets complete on successful processing; failures leave a gap.
+                if success:
+                    self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
+                metrics.gauge(
+                    "remote_subscriptions.queue_worker.queue_depth",
+                    self.work_queue.qsize(),
+                    tags={
+                        "identifier": self.identifier,
+                    },
+                )
@@
     def shutdown(self) -> None:
         """Gracefully shutdown all workers."""
         for worker in self.workers:
             worker.shutdown = True
-
-        for q in self.queues:
-            try:
-                q.shutdown(immediate=False)
-            except Exception:
-                logger.exception("Error shutting down queue")
-
         for worker in self.workers:
             worker.join(timeout=5.0)

This:

  • Uses queue.Empty with a short timeout to allow workers to exit once shutdown is set.
  • Removes the dependency on queue.ShutDown / Queue.shutdown, which are unavailable before Python 3.13.
  • Ensures failed items remain “outstanding” so they block commits for that partition, matching the new error-handling tests.

Also applies to: 231-244
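
The polling-plus-success-flag pattern in the patch above can be exercised in isolation. The following is a minimal sketch with hypothetical names (`PollingWorker`, `on_success`); it uses only `queue.Empty` and a stop event, and acknowledges an item only when processing did not raise.

```python
import queue
import threading


class PollingWorker(threading.Thread):
    """Processes items in order; acknowledges only the ones that succeed."""

    def __init__(self, work_queue, process, on_success) -> None:
        super().__init__(daemon=True)
        self.work_queue = work_queue
        self.process = process
        self.on_success = on_success
        self.stop_requested = threading.Event()

    def run(self) -> None:
        while True:
            try:
                # Wake up periodically so an idle worker can notice the stop request.
                item = self.work_queue.get(timeout=0.05)
            except queue.Empty:
                if self.stop_requested.is_set():
                    return
                continue
            try:
                self.process(item)
            except Exception:
                pass  # failure: the item stays unacknowledged (real code would log)
            else:
                self.on_success(item)
            finally:
                self.work_queue.task_done()


def run_demo() -> list[int]:
    """Process two items, the first of which fails, and return the acknowledged ones."""
    completed: list[int] = []

    def process(item: int) -> None:
        if item == 100:
            raise ValueError("simulated processing failure")

    q: queue.Queue[int] = queue.Queue()
    worker = PollingWorker(q, process, completed.append)
    worker.start()
    q.put(100)
    q.put(101)
    q.join()  # wait until both items have been handled
    worker.stop_requested.set()
    worker.join(timeout=5.0)
    return completed
```

Here `run_demo()` acknowledges only item 101. In the PR's terms, offset 100 would stay outstanding and hold back commits for its partition.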


🏁 Script executed:

#!/bin/bash
# After applying the change, re-run only the queue-consumer-related tests
# to confirm shutdown and error-handling semantics behave as intended.
rg -n "TestFixedQueuePool|TestSimpleQueueProcessingStrategy|thread_queue_parallel_error_handling" tests -n

Length of output: 352


I'll verify the issues by examining the actual code and test coverage.



🏁 Script executed:

#!/bin/bash
# Check the current queue_consumer.py code for the issues
cat src/sentry/remote_subscriptions/consumers/queue_consumer.py | sed -n '108,244p'

Length of output: 4691


🏁 Script executed:

#!/bin/bash
# Look for FixedQueuePool.shutdown() method
rg -n "def shutdown" src/sentry/remote_subscriptions/consumers/queue_consumer.py -A 10

Length of output: 378


🏁 Script executed:

#!/bin/bash
# Examine the relevant test cases for expected error-handling
cat tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py | sed -n '65,150p'

Length of output: 3128


🏁 Script executed:

#!/bin/bash
# Check for any tests covering error scenarios
rg -n "exception|error|fail" tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py -i -B 2 -A 5

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Verify stdlib queue module lacks ShutDown and shutdown
python3 -c "import queue; print('ShutDown attr:', hasattr(queue, 'ShutDown')); q = queue.Queue(); print('shutdown method:', hasattr(q, 'shutdown'))"

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Check test_thread_queue_parallel_error_handling test
cat tests/sentry/uptime/consumers/test_results_consumer.py | sed -n '1874,1920p'

Length of output: 1893


Let me check for error handling tests and the OffsetTracker implementation:



🏁 Script executed:

#!/bin/bash
# Look at full test file to find any error/failure test cases
cat tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py

Length of output: 15122


🏁 Script executed:

#!/bin/bash
# Find OffsetTracker implementation and tests
find . -name "*.py" -type f | xargs grep -l "class OffsetTracker" | head -5

Length of output: 61


🏁 Script executed:

#!/bin/bash
# Search for error handling or exception tests in test_queue_consumer.py
rg -n "def test_" tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py

Length of output: 711


🏁 Script executed:

#!/bin/bash
# Check if there are any comments or docstrings about offset completion behavior
rg -n "complete_offset|offset.*fail|failed.*offset" src/sentry/remote_subscriptions/consumers/queue_consumer.py -B 2 -A 2

Length of output: 979


🏁 Script executed:

#!/bin/bash
# Look for shutdown behavior expectations in tests
rg -n "shutdown" tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py -B 3 -A 5

Length of output: 451


Fix the version-dependent queue.ShutDown exception and Queue.shutdown() method, and the offset completion semantics.

All three issues are confirmed:

  1. queue.ShutDown is unavailable before Python 3.13: Lines 132 and 143 catch queue.ShutDown. The stdlib queue module only gained this exception in Python 3.13; on an older interpreter these handlers will never trigger and workers won't exit cleanly on shutdown.

  2. Queue.shutdown() is unavailable before Python 3.13: Line 238 calls q.shutdown(immediate=False). On an older interpreter the method doesn't exist on queue.Queue, so the resulting error is caught by the generic Exception handler and logged, and the shutdown is ineffective.

  3. Failed items marked complete unconditionally: Line 149 calls offset_tracker.complete_offset in the finally block even when result_processor (line 146) raises an exception. This masks processing failures: the failed offset looks finished, so the commit loop can advance past it, contrary to the intended semantics of only marking successful items complete.

The review's suggested fix using queue.Empty with a timeout and a success flag is correct and aligns with the offset gap-based commit strategy shown in test_offset_gaps_block_commits.
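
The result of the `hasattr` probe in the script above depends on the interpreter it runs on. As far as I know, `queue.ShutDown` and `Queue.shutdown()` were added to the standard library in Python 3.13, so the finding applies to older runtimes. A small guard like the following (the function name is hypothetical) makes that dependency explicit:

```python
import queue
import sys


def queue_shutdown_supported() -> bool:
    """Return True when this interpreter's queue module offers shutdown support."""
    return hasattr(queue, "ShutDown") and hasattr(queue.Queue, "shutdown")


if __name__ == "__main__":
    version = ".".join(str(part) for part in sys.version_info[:3])
    print(f"Python {version}: queue shutdown supported = {queue_shutdown_supported()}")
```

If the deployment target is pinned to 3.13 or later, the original `queue.ShutDown` handling may be viable as written. The polling approach in the suggested patch avoids the version dependency either way.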

Comment on lines +246 to +345
class SimpleQueueProcessingStrategy(ProcessingStrategy[KafkaPayload], Generic[T]):
    """
    Processing strategy that uses a fixed pool of queues.

    Guarantees:
    - Items for the same group are processed in order
    - No item is lost or processed out of order
    - Natural backpressure when queues fill up
    - Only commits offsets after successful processing
    """

    def __init__(
        self,
        queue_pool: FixedQueuePool[T],
        decoder: Callable[[KafkaPayload | FilteredPayload], T | None],
        grouping_fn: Callable[[T], str],
        commit_function: Callable[[dict[Partition, int]], None],
    ) -> None:
        self.queue_pool = queue_pool
        self.decoder = decoder
        self.grouping_fn = grouping_fn
        self.commit_function = commit_function
        self.shutdown_event = threading.Event()

        self.commit_thread = threading.Thread(target=self._commit_loop, daemon=True)
        self.commit_thread.start()

    def _commit_loop(self) -> None:
        while not self.shutdown_event.is_set():
            try:
                self.shutdown_event.wait(1.0)

                committable = self.queue_pool.offset_tracker.get_committable_offsets()

                if committable:
                    metrics.incr(
                        "remote_subscriptions.queue_pool.offsets_committed",
                        len(committable),
                        tags={"identifier": self.queue_pool.identifier},
                    )

                    self.commit_function(committable)
                    for partition, offset in committable.items():
                        self.queue_pool.offset_tracker.mark_committed(partition, offset)
            except Exception:
                logger.exception("Error in commit loop")

    def submit(self, message: Message[KafkaPayload | FilteredPayload]) -> None:
        try:
            result = self.decoder(message.payload)

            assert isinstance(message.value, BrokerValue)
            partition = message.value.partition
            offset = message.value.offset

            if result is None:
                self.queue_pool.offset_tracker.add_offset(partition, offset)
                self.queue_pool.offset_tracker.complete_offset(partition, offset)
                return

            group_key = self.grouping_fn(result)

            work_item = WorkItem(
                partition=partition,
                offset=offset,
                result=result,
                message=message,
            )

            self.queue_pool.submit(group_key, work_item)

        except Exception:
            logger.exception("Error submitting message to queue")
            if isinstance(message.value, BrokerValue):
                self.queue_pool.offset_tracker.add_offset(
                    message.value.partition, message.value.offset
                )
                self.queue_pool.offset_tracker.complete_offset(
                    message.value.partition, message.value.offset
                )

    def poll(self) -> None:
        stats = self.queue_pool.get_stats()
        metrics.gauge(
            "remote_subscriptions.queue_pool.total_queued",
            stats["total_items"],
            tags={"identifier": self.queue_pool.identifier},
        )

    def close(self) -> None:
        self.shutdown_event.set()
        self.commit_thread.join(timeout=5.0)
        self.queue_pool.shutdown()

    def terminate(self) -> None:
        self.shutdown_event.set()
        self.queue_pool.shutdown()

    def join(self, timeout: float | None = None) -> None:
        self.close()

🛠️ Refactor suggestion | 🟠 Major

Align SimpleQueueProcessingStrategy lifecycle with factory ownership of the pool and quiet the unused timeout arg.

Given ResultsStrategyFactory now owns FixedQueuePool and calls queue_pool.shutdown() in its shutdown() method, SimpleQueueProcessingStrategy should avoid shutting the pool itself and focus only on its own commit thread:

     def close(self) -> None:
         self.shutdown_event.set()
         self.commit_thread.join(timeout=5.0)
-        self.queue_pool.shutdown()
 
     def terminate(self) -> None:
         self.shutdown_event.set()
-        self.queue_pool.shutdown()
 
     def join(self, timeout: float | None = None) -> None:
+        # `timeout` is unused; we always wait up to 5s in `close`.
         self.close()

In the queue-consumer tests, you can then explicitly call self.queue_pool.shutdown() in TestSimpleQueueProcessingStrategy.tearDown to clean up worker threads.

This prevents the pool from being torn down during a rebalance (when only the strategy is closed), and it resolves the double-shutdown behavior once the factory also calls shutdown(). The timeout parameter remains part of the interface but is intentionally ignored, which is fine for Arroyo’s ProcessingStrategy contract.
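
The ownership split can be illustrated with a toy model of a rebalance. All classes below are hypothetical stand-ins rather than the PR's types; the `alive` flag represents running worker threads. The point is only that a pool owned by the factory survives the closing of one strategy and remains usable by the next.

```python
import threading


class SharedPool:
    """Hypothetical pool; `alive` stands in for running worker threads."""

    def __init__(self) -> None:
        self.alive = True
        self.submitted: list[str] = []

    def submit(self, item: str) -> None:
        if not self.alive:
            raise RuntimeError("pool has no active workers")
        self.submitted.append(item)

    def shutdown(self) -> None:
        self.alive = False


class Strategy:
    """Borrows the pool and manages only its own stop signal."""

    def __init__(self, pool: SharedPool) -> None:
        self.pool = pool
        self.shutdown_event = threading.Event()

    def submit(self, item: str) -> None:
        self.pool.submit(item)

    def close(self) -> None:
        self.shutdown_event.set()  # deliberately does not touch the pool


class Factory:
    """Owns the pool for the lifetime of the consumer."""

    def __init__(self) -> None:
        self.pool = SharedPool()

    def create_strategy(self) -> Strategy:
        return Strategy(self.pool)

    def shutdown(self) -> None:
        self.pool.shutdown()
```

Closing the first strategy and creating a second one mimics a rebalance: submissions through the second strategy still reach a live pool, and only `Factory.shutdown()` tears it down.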

🧰 Tools
🪛 Ruff (0.14.5)

344-344: Unused method argument: timeout

(ARG002)

🤖 Prompt for AI Agents
In src/sentry/remote_subscriptions/consumers/queue_consumer.py around lines
246-345, the strategy currently shuts down the FixedQueuePool from
close/terminate/join which conflicts with the ResultsStrategyFactory ownership
and causes double-shutdown during rebalances; remove all calls to
self.queue_pool.shutdown() from close(), terminate(), and join(), keep setting
shutdown_event and joining the commit_thread (join may ignore the timeout arg),
and leave pool.shutdown() to be called by the factory/tests (e.g.
TestSimpleQueueProcessingStrategy.tearDown) so the strategy only manages its
commit thread lifecycle and not the pool.

Comment on lines +1874 to +1923
    def test_thread_queue_parallel_error_handling(self) -> None:
        """
        Test that errors in processing don't block offset commits for other messages.
        """
        committed_offsets: dict[Partition, int] = {}

        def track_commits(offsets: Mapping[Partition, int], force: bool = False) -> None:
            committed_offsets.update(offsets)

        factory = UptimeResultsStrategyFactory(
            mode="thread-queue-parallel",
            max_workers=2,
        )

        test_partition = Partition(Topic("test"), 1)
        consumer = factory.create_with_partitions(track_commits, {test_partition: 0})

        with mock.patch.object(type(factory.result_processor), "__call__") as mock_processor_call:
            mock_processor_call.side_effect = [Exception("Processing failed"), None]

            codec = kafka_definition.get_topic_codec(kafka_definition.Topic.UPTIME_RESULTS)

            for offset, minutes in [(100, 5), (101, 4)]:
                result = self.create_uptime_result(
                    self.subscription.subscription_id,
                    scheduled_check_time=datetime.now() - timedelta(minutes=minutes),
                )
                message = Message(
                    BrokerValue(
                        KafkaPayload(None, codec.encode(result), []),
                        test_partition,
                        offset,
                        datetime.now(),
                    )
                )
                consumer.submit(message)

            queue_pool = factory.queue_pool
            max_wait = 20
            for _ in range(max_wait):
                assert queue_pool is not None
                stats = queue_pool.get_stats()
                if stats["total_items"] == 0 and mock_processor_call.call_count >= 2:
                    time.sleep(0.2)
                    break
                time.sleep(0.1)

            assert mock_processor_call.call_count == 2
            assert len(committed_offsets) == 0 or test_partition not in committed_offsets

⚠️ Potential issue | 🟠 Major

Error-handling expectation requires worker not to treat failed items as completed.

test_thread_queue_parallel_error_handling asserts that no offsets are committed when the first call to result_processor.__call__ raises. With the current queue worker implementation in queue_consumer.OrderedQueueWorker, offsets are always marked complete in a finally block, even on exceptions, which would let the commit loop advance past failed items.

To make this test (and semantics) hold, OrderedQueueWorker.run should only call offset_tracker.complete_offset when processing succeeds, leaving failed offsets outstanding so they block commits for that partition. See the comment in queue_consumer.py for a concrete patch.

🧰 Tools
🪛 Ruff (0.14.5)

1880-1880: Unused function argument: force

(ARG001)

🤖 Prompt for AI Agents
In tests/sentry/uptime/consumers/test_results_consumer.py around lines 1874–1923
the test expects that when result_processor.__call__ raises, the worker does NOT
mark the offset as completed; currently OrderedQueueWorker marks offsets
complete in a finally block which allows commits to advance past failed items.
Modify OrderedQueueWorker.run to call offset_tracker.complete_offset only on
successful processing (i.e., move the completion call into the successful path
and do not call it when an exception is raised), ensure exceptions are
propagated or handled so the queue retains the failed item as uncompleted, and
update any necessary tests/mocks to reflect the new behavior.
