Skip to content

feat(uptime): Add ability to use queues to manage parallelism#12

Open
ShashankFC wants to merge 1 commit into
kafka-consumer-parallel-beforefrom
kafka-consumer-parallel-after
Open

feat(uptime): Add ability to use queues to manage parallelism#12
ShashankFC wants to merge 1 commit into
kafka-consumer-parallel-beforefrom
kafka-consumer-parallel-after

Conversation

@ShashankFC

Copy link
Copy Markdown
Contributor

Test 9

Summary by CodeRabbit

  • New Features

    • Added "thread-queue-parallel" execution mode for uptime processing, expanding available parallelism options.
    • Introduced ordered queue-based processing system for remote subscriptions with improved offset tracking and safe commit management.
  • Improvements

    • Updated help text for maximum worker parameter to reflect broader parallelism concepts.

✏️ Tip: You can customize this high-level summary in your review settings.


Replicated from ai-code-review-evaluation/sentry-coderabbit#9

One potential problem we have with batch processing is that any one slow
item will clog up the whole batch. This pr implements a queueing method
instead, where we keep N queues that each have their own workers.
There's still a chance of individual items backlogging a queue, but we
can try increased concurrency here to reduce the chances of that
happening

<!-- Describe your PR here. -->
@ShashankFC ShashankFC requested a review from Copilot January 30, 2026 10:33

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a new "thread-queue-parallel" execution mode for uptime monitoring that enables ordered queue-based processing of remote subscriptions with improved offset tracking and safe commit management.

Changes:

  • Added thread-queue-parallel processing mode with ordered queue-based message handling
  • Implemented offset tracking system with gap detection to ensure safe Kafka offset commits
  • Updated CLI options to support the new execution mode

Reviewed changes

Copilot reviewed 5 out of 7 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
tests/sentry/uptime/consumers/test_results_consumer.py Added comprehensive tests for thread-queue-parallel mode including ordering, concurrency, offset commit behavior, error handling, and Kafka integration tests
tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py New test file covering OffsetTracker, FixedQueuePool, and SimpleQueueProcessingStrategy components
src/sentry/remote_subscriptions/consumers/result_consumer.py Extended ResultsStrategyFactory to support thread-queue-parallel mode with queue pool initialization and worker creation
src/sentry/remote_subscriptions/consumers/queue_consumer.py New implementation of ordered queue processing with offset tracking, worker threads, and safe commit logic
src/sentry/consumers/init.py Updated CLI options to include thread-queue-parallel mode and revised help text for max-workers parameter

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

while not self.shutdown:
try:
work_item = self.work_queue.get()
except queue.ShutDown:

Copilot AI Jan 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exception name queue.ShutDown is incorrect. The Python queue module raises queue.Empty when get() times out, but there is no built-in ShutDown exception. This will cause an unhandled exception if the queue is shut down.

Copilot uses AI. Check for mistakes.
Comment on lines +236 to +241
for q in self.queues:
try:
q.shutdown(immediate=False)
except Exception:
logger.exception("Error shutting down queue")

Copilot AI Jan 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Python's queue.Queue does not have a shutdown() method. This will raise an AttributeError at runtime during graceful shutdown.

Suggested change
for q in self.queues:
try:
q.shutdown(immediate=False)
except Exception:
logger.exception("Error shutting down queue")

Copilot uses AI. Check for mistakes.

with mock.patch.object(type(factory.result_processor), "__call__") as mock_processor_call:
subscription_2 = self.create_uptime_subscription(
subscription_id=uuid.uuid4().hex, interval_seconds=300, url="http://santry.io"

Copilot AI Jan 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected spelling of 'santry' to 'sentry'.

Copilot uses AI. Check for mistakes.
"""Process items from the queue in order."""
while not self.shutdown:
try:
work_item = self.work_queue.get()

Copilot AI Jan 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The get() call blocks indefinitely without a timeout. This makes it difficult to perform graceful shutdown since the thread will block waiting for items even after self.shutdown is set to True. Consider using get(timeout=1.0) and checking self.shutdown between attempts.

Suggested change
work_item = self.work_queue.get()
work_item = self.work_queue.get(timeout=1.0)
except queue.Empty:
# Periodically wake up to check for shutdown.
continue

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants