
feat(uptime): Add ability to use queues to manage parallelism #2

Open
ShashankFC wants to merge 1 commit into kafka-consumer-parallel-before from kafka-consumer-parallel-after


@ShashankFC


Test 9

Summary by CodeRabbit

  • New Features

    • Added "thread-queue-parallel" execution mode for uptime processing, expanding available parallelism options.
    • Introduced ordered queue-based processing system for remote subscriptions with improved offset tracking and safe commit management.
  • Improvements

    • Updated help text for maximum worker parameter to reflect broader parallelism concepts.



Replicated from ai-code-review-evaluation/sentry-coderabbit#9

One potential problem with batch processing is that any one slow
item will clog up the whole batch. This PR implements a queueing method
instead, where we keep N queues that each have their own workers.
There's still a chance of individual items backlogging a queue, but we
can try increasing concurrency here to reduce the chances of that
happening.
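
The queueing idea described above can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: `run_sharded`, the `(key, value)` item shape, and routing by a CRC32 hash of the key are all assumptions made for the example. Each queue is drained by its own thread, so a slow item only delays the items that share its queue.

```python
import queue
import threading
import zlib


def run_sharded(items, num_queues, handler):
    """Route (key, value) items to one of num_queues FIFO queues.

    Hypothetical helper: every queue has a dedicated worker thread, and
    items with the same key always land on the same queue, so their
    relative order is preserved.
    """
    queues = [queue.Queue() for _ in range(num_queues)]
    results = [[] for _ in range(num_queues)]

    def worker(idx):
        while True:
            item = queues[idx].get()
            if item is None:  # sentinel: no more work for this queue
                break
            results[idx].append(handler(item))

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_queues)]
    for t in threads:
        t.start()

    for key, value in items:
        # Deterministic routing (assumed scheme): same key, same queue.
        idx = zlib.crc32(key.encode()) % num_queues
        queues[idx].put((key, value))

    for q in queues:
        q.put(None)
    for t in threads:
        t.join()
    return results
```

Raising `num_queues` spreads keys over more workers, which is the "increased concurrency" lever the description mentions; it does not help when one key is itself slow.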

@ShashankFC

Author

@cubic-dev-ai review this pull request

@cubic-dev-ai

cubic-dev-ai Bot commented Feb 13, 2026


@cubic-dev-ai review this pull request

@ShashankFC I have started the AI code review. It will take a few minutes to complete.

@cubic-dev-ai cubic-dev-ai Bot left a comment


4 issues found across 7 files

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="tests/sentry/uptime/consumers/test_results_consumer.py">

<violation number="1" location="tests/sentry/uptime/consumers/test_results_consumer.py:2106">
P2: `processor._shutdown()` and `factory.shutdown()` are called outside the `mock.patch.object` context manager. If any worker thread is still draining during shutdown, it will call the real (unpatched) processor `__call__`, potentially causing test failures. Move shutdown inside the `with` block, or add an assertion within the mock context that all messages have been processed before exiting.</violation>
</file>

<file name="tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py">

<violation number="1" location="tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py:365">
P2: Race condition: this test is timing-dependent and doesn't fully validate its stated purpose. After `mark_committed(partition, 100)` removes offset 100 from tracking, the next commit cycle's `start = max(101, min_offset=102) = 102` jumps over the gap at 101, committing 103 before 101 is even submitted. The first assertion (`== 100`) only holds because it runs within the 1-second commit interval. Under load, the commit loop could fire again before offset 101 is submitted, skipping the gap entirely.

Consider using a mock or direct OffsetTracker unit test to validate gap-blocking behavior without relying on timing, or add an explicit wait/assertion between the first commit and the second submit.</violation>
</file>

<file name="src/sentry/remote_subscriptions/consumers/queue_consumer.py">

<violation number="1" location="src/sentry/remote_subscriptions/consumers/queue_consumer.py:185">
P1: Queues are created without `maxsize`, meaning they are unbounded. If processing falls behind ingestion, messages pile up in memory indefinitely, which can lead to OOM. This also contradicts the class docstring that claims "Natural backpressure when queues fill up." Consider setting `maxsize` (e.g., via a constructor parameter) to enable actual backpressure via blocking `put()`.</violation>

<violation number="2" location="src/sentry/remote_subscriptions/consumers/queue_consumer.py:335">
P2: Shutdown ordering issue: the commit thread is stopped before workers are drained. Workers may complete processing of items after the last commit cycle, and those offset completions will be lost. On consumer restart, those messages will be reprocessed. Consider reversing the order: first drain the queue pool (so workers finish), then do a final commit, then shut down the commit thread.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

self.workers: list[OrderedQueueWorker[T]] = []

for i in range(num_queues):
    work_queue: queue.Queue[WorkItem[T]] = queue.Queue()

@cubic-dev-ai cubic-dev-ai Bot Feb 13, 2026


P1: Queues are created without `maxsize`, meaning they are unbounded. If processing falls behind ingestion, messages pile up in memory indefinitely, which can lead to OOM. This also contradicts the class docstring that claims "Natural backpressure when queues fill up." Consider setting `maxsize` (e.g., via a constructor parameter) to enable actual backpressure via blocking `put()`.
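
A minimal sketch of what bounded queues could look like, using only the standard library. The helper names `make_queues` and `try_submit` and the `max_queue_size` parameter are hypothetical and not taken from the PR. With `maxsize` set, `put()` blocks once the queue is full, and a `timeout` turns that block into a `queue.Full` exception the caller can react to.

```python
import queue


def make_queues(num_queues, max_queue_size):
    # maxsize > 0 makes each queue bounded; put() blocks when it is full.
    return [queue.Queue(maxsize=max_queue_size) for _ in range(num_queues)]


def try_submit(work_queue, item, timeout=0.05):
    """Try to enqueue an item, giving up after `timeout` seconds.

    Returns False when the queue stayed full, which is the signal a
    caller could use to slow down or pause consumption.
    """
    try:
        work_queue.put(item, timeout=timeout)
        return True
    except queue.Full:
        return False
```

Whether the consumer should block indefinitely or use a timeout and pause the partition is a design choice this sketch leaves open.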

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/sentry/remote_subscriptions/consumers/queue_consumer.py, line 185:

<comment>Queues are created without `maxsize`, meaning they are unbounded. If processing falls behind ingestion, messages pile up in memory indefinitely, which can lead to OOM. This also contradicts the class docstring that claims "Natural backpressure when queues fill up." Consider setting `maxsize` (e.g., via a constructor parameter) to enable actual backpressure via blocking `put()`.</comment>

<file context>
@@ -0,0 +1,345 @@
+        self.workers: list[OrderedQueueWorker[T]] = []
+
+        for i in range(num_queues):
+            work_queue: queue.Queue[WorkItem[T]] = queue.Queue()
+            self.queues.append(work_queue)
+
</file context>

processor._run_once()
time.sleep(0.1)

processor._shutdown()

@cubic-dev-ai cubic-dev-ai Bot Feb 13, 2026


P2: `processor._shutdown()` and `factory.shutdown()` are called outside the `mock.patch.object` context manager. If any worker thread is still draining during shutdown, it will call the real (unpatched) processor `__call__`, potentially causing test failures. Move shutdown inside the `with` block, or add an assertion within the mock context that all messages have been processed before exiting.
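
The suggested test shape can be illustrated with stand-in classes. `Processor`, `Worker`, and the `handle` method below are hypothetical (the real test patches the processor's `__call__`); the point is only that the worker thread is joined while the patch is still active, so it can never reach the unpatched method.

```python
import queue
import threading
from unittest import mock


class Processor:
    def handle(self, item):
        # Stand-in for the real processing; must not run in the test.
        raise RuntimeError("real processor should not run in this test")


class Worker:
    """Hypothetical single-threaded worker draining a queue."""

    def __init__(self, processor):
        self.processor = processor
        self.q = queue.Queue()
        self.thread = threading.Thread(target=self._run)
        self.thread.start()

    def _run(self):
        while True:
            item = self.q.get()
            if item is None:  # sentinel: stop after draining earlier items
                return
            self.processor.handle(item)

    def submit(self, item):
        self.q.put(item)

    def shutdown(self):
        self.q.put(None)
        self.thread.join()


def run_with_shutdown_inside_patch():
    with mock.patch.object(Processor, "handle") as mock_handle:
        worker = Worker(Processor())
        for i in range(3):
            worker.submit(i)
        # Shutdown happens inside the patch, so draining uses the mock.
        worker.shutdown()
        return [call.args[0] for call in mock_handle.call_args_list]
```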

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At tests/sentry/uptime/consumers/test_results_consumer.py, line 2106:

<comment>`processor._shutdown()` and `factory.shutdown()` are called outside the `mock.patch.object` context manager. If any worker thread is still draining during shutdown, it will call the real (unpatched) processor `__call__`, potentially causing test failures. Move shutdown inside the `with` block, or add an assertion within the mock context that all messages have been processed before exiting.</comment>

<file context>
@@ -1675,6 +1686,461 @@ def test_parallel_grouping(self, mock_process_group) -> None:
+                        processor._run_once()
+                        time.sleep(0.1)
+
+                processor._shutdown()
+                factory.shutdown()
+
</file context>


assert self.process_complete_event.wait(timeout=5.0), "Processing did not complete in time"
assert self.commit_event.wait(timeout=2.0), "Commit did not happen in time"
assert self.committed_offsets.get(partition) == 100

@cubic-dev-ai cubic-dev-ai Bot Feb 13, 2026


P2: Race condition: this test is timing-dependent and doesn't fully validate its stated purpose. After `mark_committed(partition, 100)` removes offset 100 from tracking, the next commit cycle's `start = max(101, min_offset=102) = 102` jumps over the gap at 101, committing 103 before 101 is even submitted. The first assertion (`== 100`) only holds because it runs within the 1-second commit interval. Under load, the commit loop could fire again before offset 101 is submitted, skipping the gap entirely.

Consider using a mock or direct OffsetTracker unit test to validate gap-blocking behavior without relying on timing, or add an explicit wait/assertion between the first commit and the second submit.
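
As an illustration of a timing-free check, the sketch below uses a deliberately simplified tracker. `GapBlockingTracker` is hypothetical and is not the PR's `OffsetTracker`; it assumes offsets are contiguous after the last committed one, so an offset that has not completed (or has not even been submitted) always blocks everything above it.

```python
class GapBlockingTracker:
    """Hypothetical tracker: commits only contiguous completed offsets."""

    def __init__(self, last_committed):
        self.last_committed = last_committed
        self.completed = set()

    def complete(self, offset):
        self.completed.add(offset)

    def committable(self):
        """Return the highest offset reachable without crossing a gap.

        Walks forward from last_committed + 1 and stops at the first
        offset that has not completed. Returns None if nothing new can
        be committed.
        """
        offset = self.last_committed
        while offset + 1 in self.completed:
            offset += 1
        return offset if offset > self.last_committed else None

    def mark_committed(self, offset):
        # Forget offsets at or below the commit point.
        self.completed = {o for o in self.completed if o > offset}
        self.last_committed = offset
```

Because the walk always restarts at `last_committed + 1` rather than at the smallest tracked offset, a test can call `committable()` directly after each step and needs no sleeps or commit-interval assumptions.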

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py, line 365:

<comment>Race condition: this test is timing-dependent and doesn't fully validate its stated purpose. After `mark_committed(partition, 100)` removes offset 100 from tracking, the next commit cycle's `start = max(101, min_offset=102) = 102` jumps over the gap at 101, committing 103 before 101 is even submitted. The first assertion (`== 100`) only holds because it runs within the 1-second commit interval. Under load, the commit loop could fire again before offset 101 is submitted, skipping the gap entirely.

Consider using a mock or direct OffsetTracker unit test to validate gap-blocking behavior without relying on timing, or add an explicit wait/assertion between the first commit and the second submit.</comment>

<file context>
@@ -0,0 +1,421 @@
+
+        assert self.process_complete_event.wait(timeout=5.0), "Processing did not complete in time"
+        assert self.commit_event.wait(timeout=2.0), "Commit did not happen in time"
+        assert self.committed_offsets.get(partition) == 100
+
+        self.expected_items = 4
</file context>

tags={"identifier": self.queue_pool.identifier},
)

def close(self) -> None:

@cubic-dev-ai cubic-dev-ai Bot Feb 13, 2026


P2: Shutdown ordering issue: the commit thread is stopped before workers are drained. Workers may complete processing of items after the last commit cycle, and those offset completions will be lost. On consumer restart, those messages will be reprocessed. Consider reversing the order: first drain the queue pool (so workers finish), then do a final commit, then shut down the commit thread.
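
The proposed ordering can be sketched with a toy consumer. `SketchConsumer` and all of its attributes are hypothetical, and "committing" here just moves offsets between two lists; the sketch only demonstrates the sequence drain workers, final commit, stop commit thread.

```python
import queue
import threading


class SketchConsumer:
    """Hypothetical consumer with one worker and one commit thread."""

    def __init__(self, commit_interval=0.05):
        self.work_queue = queue.Queue()
        self.shutdown_event = threading.Event()
        self.lock = threading.Lock()
        self.completed = []  # offsets finished but not yet committed
        self.committed = []  # offsets handed to the pretend commit call
        self.commit_interval = commit_interval
        self.worker = threading.Thread(target=self._work)
        self.commit_thread = threading.Thread(target=self._commit_loop)
        self.worker.start()
        self.commit_thread.start()

    def submit(self, offset):
        self.work_queue.put(offset)

    def _work(self):
        while True:
            offset = self.work_queue.get()
            if offset is None:  # sentinel: queue fully drained
                return
            with self.lock:
                self.completed.append(offset)

    def _commit_once(self):
        with self.lock:
            self.committed.extend(self.completed)
            self.completed.clear()

    def _commit_loop(self):
        # Event.wait returns True once shutdown is requested.
        while not self.shutdown_event.wait(self.commit_interval):
            self._commit_once()

    def close(self):
        # 1. Drain: the worker exits only after processing queued items.
        self.work_queue.put(None)
        self.worker.join()
        # 2. Final commit picks up everything the worker completed.
        self._commit_once()
        # 3. Only now stop the periodic commit thread.
        self.shutdown_event.set()
        self.commit_thread.join(timeout=5.0)
```

The lock makes the final `_commit_once()` safe even though the commit thread may still be running at that moment.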

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/sentry/remote_subscriptions/consumers/queue_consumer.py, line 335:

<comment>Shutdown ordering issue: the commit thread is stopped before workers are drained. Workers may complete processing of items after the last commit cycle, and those offset completions will be lost. On consumer restart, those messages will be reprocessed. Consider reversing the order: first drain the queue pool (so workers finish), then do a final commit, then shut down the commit thread.</comment>

<file context>
@@ -0,0 +1,345 @@
+            tags={"identifier": self.queue_pool.identifier},
+        )
+
+    def close(self) -> None:
+        self.shutdown_event.set()
+        self.commit_thread.join(timeout=5.0)
</file context>


2 participants