Skip to content

feat(uptime): Add ability to use queues to manage parallelism#12

Open
ShashankFC wants to merge 1 commit into
kafka-consumer-parallel-beforefrom
kafka-consumer-parallel-after
Open

feat(uptime): Add ability to use queues to manage parallelism#12
ShashankFC wants to merge 1 commit into
kafka-consumer-parallel-beforefrom
kafka-consumer-parallel-after

Conversation

@ShashankFC

@ShashankFC ShashankFC commented Jan 22, 2026

Copy link
Copy Markdown
Contributor

Test 9

Summary by CodeRabbit

  • New Features

    • Added "thread-queue-parallel" execution mode for uptime processing, expanding available parallelism options.
    • Introduced ordered queue-based processing system for remote subscriptions with improved offset tracking and safe commit management.
  • Improvements

    • Updated help text for maximum worker parameter to reflect broader parallelism concepts.

✏️ Tip: You can customize this high-level summary in your review settings.


Replicated from ai-code-review-evaluation/sentry-coderabbit#9


Note

Medium Risk
Changes message processing and offset-commit behavior for a Kafka consumer via new multi-threaded queues and a background commit loop, which could cause ordering/commit regressions or shutdown edge cases if incorrect.

Overview
Adds a new thread-queue-parallel execution mode for uptime/remote subscription result consumers, using a fixed set of per-group FIFO queues to increase parallelism while preserving ordering within a subscription/group.

Introduces queue_consumer.py with queue workers plus an OffsetTracker and background commit loop to only commit Kafka offsets once all prior offsets in a partition are completed, and wires this strategy into ResultsStrategyFactory (including lifecycle shutdown handling). CLI uptime consumer options now expose the new mode and clarify --max-workers semantics; tests are expanded with unit coverage for ordering/offset behavior and an integration test that verifies committed offsets against a real Kafka consumer group.

Written by Cursor Bugbot for commit 33f4fdd. Configure here.

One potential problem we have with batch processing is that any one slow
item will clog up the whole batch. This pr implements a queueing method
instead, where we keep N queues that each have their own workers.
There's still a chance of individual items backlogging a queue, but we
can try increased concurrency here to reduce the chances of that
happening

<!-- Describe your PR here. -->
@ShashankFC

Copy link
Copy Markdown
Contributor Author

bugbot run

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.

def close(self) -> None:
self.shutdown_event.set()
self.commit_thread.join(timeout=5.0)
self.queue_pool.shutdown()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shutdown ordering causes uncommitted offsets for drained items

Medium Severity

In close(), the commit thread is joined before the queue pool is shut down. queue_pool.shutdown() drains remaining items from worker queues — workers process those items and call complete_offset — but the commit thread is already dead at that point, so those newly completed offsets are never committed. On restart, the consumer re-reads and reprocesses those messages. Swapping the order — calling self.queue_pool.shutdown() first, then signaling and joining the commit thread — would allow the final _commit_loop iteration to commit all offsets including those from drained items.

Fix in Cursor Fix in Web

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants