Skip to content

feat(uptime): Add ability to use queues to manage parallelism#14

Open
ShashankFC wants to merge 1 commit into
kafka-consumer-parallel-beforefrom
kafka-consumer-parallel-after
Open

feat(uptime): Add ability to use queues to manage parallelism#14
ShashankFC wants to merge 1 commit into
kafka-consumer-parallel-beforefrom
kafka-consumer-parallel-after

Conversation

@ShashankFC

@ShashankFC ShashankFC commented Jan 5, 2026

Copy link
Copy Markdown
Collaborator

Test 9

Summary by CodeRabbit

  • New Features

    • Added "thread-queue-parallel" execution mode for uptime processing, expanding available parallelism options.
    • Introduced ordered queue-based processing system for remote subscriptions with improved offset tracking and safe commit management.
  • Improvements

    • Updated help text for maximum worker parameter to reflect broader parallelism concepts.

✏️ Tip: You can customize this high-level summary in your review settings.


Replicated from ai-code-review-evaluation/sentry-coderabbit#9

One potential problem we have with batch processing is that any one slow
item will clog up the whole batch. This pr implements a queueing method
instead, where we keep N queues that each have their own workers.
There's still a chance of individual items backlogging a queue, but we
can try increased concurrency here to reduce the chances of that
happening

<!-- Describe your PR here. -->
@ShashankFC

Copy link
Copy Markdown
Collaborator Author

@coderabbitai review

@coderabbitai

coderabbitai Bot commented Jan 6, 2026

Copy link
Copy Markdown
✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai

coderabbitai Bot commented Jan 6, 2026

Copy link
Copy Markdown
📝 Walkthrough

Walkthrough

This change introduces a new "thread-queue-parallel" execution mode for processing remote subscriptions. It implements an ordered-queue system with per-partition offset tracking, a fixed pool of worker queues using consistent hashing for group-based ordering, and integrates this strategy into the results consumer factory, along with comprehensive tests.

Changes

Cohort / File(s) Summary
Configuration & Mode Options
src/sentry/consumers/__init__.py
Added "thread-queue-parallel" to uptime mode choices; updated help text for max-workers to reflect broader parallelism semantics.
Core Queue Processing System
src/sentry/remote_subscriptions/consumers/queue_consumer.py
New module implementing ordered-queue processing: WorkItem dataclass, OffsetTracker (per-partition offset lifecycle and committable offset computation), OrderedQueueWorker (worker thread with metrics integration), FixedQueuePool (fixed queue set with consistent hashing for group ordering), and SimpleQueueProcessingStrategy (ProcessingStrategy wrapper with built-in offset commit loop).
Results Consumer Integration
src/sentry/remote_subscriptions/consumers/result_consumer.py
Extended ResultsStrategyFactory with thread-queue-parallel mode: added thread_queue_parallel flag, queue_pool attribute, FixedQueuePool initialization, queue pool shutdown handling, and create_thread_queue_parallel_worker() method for strategy creation.
Queue Consumer Tests
tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py
New comprehensive test suite covering OffsetTracker offset lifecycle, FixedQueuePool group-to-queue mapping and ordering, SimpleQueueProcessingStrategy message processing and offset commits, and integration with ResultsStrategyFactory.
Uptime Results Consumer Tests
tests/sentry/uptime/consumers/test_results_consumer.py
Extended test coverage with integration scenarios for thread-queue-parallel mode, real Kafka offset tracking and commits, ordering guarantees, concurrent subscriptions, and error handling via KafkaConsumer/Producer/AdminClient.

Sequence Diagrams

sequenceDiagram
    participant Consumer as Consumer
    participant Strategy as SimpleQueueProcessingStrategy
    participant Pool as FixedQueuePool
    participant Queue as OrderedQueueWorker
    participant Tracker as OffsetTracker
    participant Processor as Result Processor

    rect rgb(200, 220, 240)
    Note over Consumer,Processor: Message Submission & Queueing
    Consumer->>Strategy: submit(message)
    Strategy->>Strategy: decoder(payload)
    Strategy->>Strategy: grouping_fn(result)
    Strategy->>Tracker: add_offset(partition, offset)
    Strategy->>Pool: submit(group_key, work_item)
    Pool->>Queue: enqueue(work_item)
    end

    rect rgb(230, 240, 200)
    Note over Queue,Processor: Ordered Processing
    Queue->>Queue: dequeue work_item
    Queue->>Processor: invoke result_processor
    Processor-->>Queue: completion
    Queue->>Tracker: complete_offset(partition, offset)
    end

    rect rgb(240, 220, 200)
    Note over Strategy,Tracker: Periodic Offset Commits
    Strategy->>Tracker: get_committable_offsets()
    Tracker-->>Strategy: {partition → max_offset}
    Strategy->>Strategy: commit_function(offsets)
    Strategy->>Tracker: mark_committed(partition, offset)
    end
Loading
sequenceDiagram
    participant OT as OffsetTracker
    participant Pool as FixedQueuePool
    participant Worker as OrderedQueueWorker

    rect rgb(220, 235, 240)
    Note over OT,Worker: Offset Tracking Lifecycle
    Worker->>OT: add_offset(partition=0, offset=100)
    Worker->>OT: add_offset(partition=0, offset=101)
    Worker->>OT: add_offset(partition=0, offset=102)
    end

    rect rgb(240, 235, 220)
    Note over OT,Worker: Partial Completion (offset 101 pending)
    Worker->>OT: complete_offset(partition=0, offset=100)
    Worker->>OT: complete_offset(partition=0, offset=102)
    OT->>OT: offset 100 done, 101 in-flight, 102 done
    end

    rect rgb(220, 240, 220)
    Note over OT,Worker: Committable Offset Query
    Pool->>OT: get_committable_offsets()
    Note over OT: return {partition=0 → 100}<br/>(gap at 101 blocks further commits)
    OT-->>Pool: {partition=0 → 100}
    end

    rect rgb(240, 220, 220)
    Note over OT,Worker: Complete & Commit
    Worker->>OT: complete_offset(partition=0, offset=101)
    Pool->>OT: get_committable_offsets()
    Note over OT: now 100, 101, 102 all done
    OT-->>Pool: {partition=0 → 102}
    Pool->>OT: mark_committed(partition=0, offset=102)
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Poem

🐰 A queue of workers, in thread-y delight,
Ordered by group, processing right,
Offsets tracked safe, no commit left behind,
Parallelism achieved with synchronization in mind! 🎯

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 47.44% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The pull request title accurately summarizes the main change: introducing a queue-based parallelism mechanism to replace batch processing, which is the primary focus across all modified files.
✨ Finishing touches
  • 📝 Generate docstrings

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (4)
tests/sentry/uptime/consumers/test_results_consumer.py (2)

1874-1924: Clarify the test docstring to match the assertion.

The docstring says "errors in processing don't block offset commits for other messages," but the assertion (line 1922) verifies that no commits happen when the first message fails. This is correct behavior—offsets must be committed in order—but the docstring is misleading.

🔎 Suggested docstring fix
     def test_thread_queue_parallel_error_handling(self) -> None:
         """
-        Test that errors in processing don't block offset commits for other messages.
+        Test that errors in processing block offset commits to maintain ordering guarantees.
         """

2142-2146: Consider logging the exception during cleanup.

The bare except Exception: pass swallows errors silently during topic cleanup. While acceptable for cleanup code, logging would help with debugging failed tests.

🔎 Suggested improvement
         finally:
             try:
                 admin_client.delete_topics([test_topic])
-            except Exception:
-                pass
+            except Exception as e:
+                # Log but don't fail - this is cleanup code
+                import logging
+                logging.getLogger(__name__).debug(f"Failed to delete test topic: {e}")
src/sentry/remote_subscriptions/consumers/queue_consumer.py (2)

344-345: Unused timeout parameter in join() method.

The timeout parameter is not used in the join() method implementation. This could be intentional (the method just calls close() which has its own timeouts), but it may confuse callers who expect the timeout to be honored.

🔎 Suggested fix
-    def join(self, timeout: float | None = None) -> None:
+    def join(self, timeout: float | None = None) -> None:  # noqa: ARG002
         self.close()

Or pass the timeout to internal operations:

     def join(self, timeout: float | None = None) -> None:
-        self.close()
+        self.shutdown_event.set()
+        self.commit_thread.join(timeout=timeout)
+        self.queue_pool.shutdown()

293-325: Consider replacing assert with explicit check for BrokerValue.

Line 297 uses assert isinstance(message.value, BrokerValue) for runtime validation. Asserts can be disabled with Python's -O flag, which would cause the subsequent code to fail with a less clear error.

🔎 Suggested improvement
     def submit(self, message: Message[KafkaPayload | FilteredPayload]) -> None:
         try:
             result = self.decoder(message.payload)
 
-            assert isinstance(message.value, BrokerValue)
+            if not isinstance(message.value, BrokerValue):
+                logger.warning("Received non-BrokerValue message, skipping")
+                return
             partition = message.value.partition
             offset = message.value.offset
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5c94383 and 33f4fdd.

📒 Files selected for processing (7)
  • src/sentry/consumers/__init__.py
  • src/sentry/remote_subscriptions/consumers/queue_consumer.py
  • src/sentry/remote_subscriptions/consumers/result_consumer.py
  • tests/sentry/remote_subscriptions/__init__.py
  • tests/sentry/remote_subscriptions/consumers/__init__.py
  • tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py
  • tests/sentry/uptime/consumers/test_results_consumer.py
🧰 Additional context used
🧬 Code graph analysis (3)
src/sentry/remote_subscriptions/consumers/result_consumer.py (1)
src/sentry/remote_subscriptions/consumers/queue_consumer.py (3)
  • FixedQueuePool (159-243)
  • SimpleQueueProcessingStrategy (246-345)
  • shutdown (231-243)
tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py (1)
src/sentry/remote_subscriptions/consumers/queue_consumer.py (12)
  • FixedQueuePool (159-243)
  • OffsetTracker (34-105)
  • WorkItem (25-31)
  • add_offset (56-60)
  • get_committable_offsets (67-98)
  • complete_offset (62-65)
  • mark_committed (100-105)
  • shutdown (231-243)
  • get_queue_for_group (198-202)
  • submit (204-212)
  • submit (293-325)
  • get_stats (214-220)
tests/sentry/uptime/consumers/test_results_consumer.py (3)
src/sentry/conf/types/kafka_definition.py (2)
  • Topic (12-109)
  • get_topic_codec (150-154)
src/sentry/utils/kafka_config.py (1)
  • get_kafka_admin_cluster_options (92-97)
src/sentry/uptime/consumers/results_consumer.py (1)
  • UptimeResultsStrategyFactory (600-606)
🪛 Ruff (0.14.10)
tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py

219-219: Unused function argument: identifier

(ARG001)

src/sentry/remote_subscriptions/consumers/queue_consumer.py

344-344: Unused method argument: timeout

(ARG002)

tests/sentry/uptime/consumers/test_results_consumer.py

1746-1746: Unused function argument: identifier

(ARG001)


1830-1830: Unused function argument: force

(ARG001)


1880-1880: Unused function argument: force

(ARG001)


1932-1932: Unused function argument: force

(ARG001)


2017-2017: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


2145-2146: try-except-pass detected, consider logging the exception

(S110)


2145-2145: Do not catch blind exception: Exception

(BLE001)

🔇 Additional comments (24)
src/sentry/consumers/__init__.py (1)

121-121: LGTM!

The addition of thread-queue-parallel to the mode choices and the updated help text for --max-workers are appropriate. The help text change correctly reflects that max_workers now controls num_queues in the thread-queue-parallel mode, not just thread count.

Also applies to: 141-141

tests/sentry/uptime/consumers/test_results_consumer.py (7)

1-66: LGTM on new imports.

The new imports are appropriate for the added Kafka integration tests and thread-queue-parallel testing functionality.


1689-1731: LGTM on thread-queue-parallel basic test.

The test correctly validates that the thread-queue-parallel mode processes messages and invokes the result processor. Good use of polling with timeout and proper cleanup via factory.shutdown().


1733-1778: LGTM on order preservation test.

This test validates a critical invariant of the thread-queue-parallel mode: that messages for the same subscription are processed in order. The implementation correctly tracks processing order via GUIDs.


1780-1822: LGTM on concurrent subscriptions test.

The test validates that different subscriptions are processed when using thread-queue-parallel mode.


1824-1872: LGTM on offset commit test.

The test correctly validates that offsets are committed after successful processing, with the expected offset of 105 (next offset to read after processing messages at offsets 100-104).


1926-1985: LGTM on offset gaps test.

This test correctly validates that offset gaps prevent committing past the gap—a critical behavior for ensuring no data loss. Processing offsets 100, 102, 103 (with 101 missing) should only allow committing up to 101.


1987-2006: LGTM on graceful shutdown test.

The test validates that factory.shutdown() properly cleans up the queue pool.

tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py (5)

1-20: LGTM on module structure and imports.

The test file is well-organized with clear imports for the components under test.


22-63: LGTM on OffsetTracker tests.

The tests correctly validate the core offset tracking behavior: contiguous completion requirements, multiple partition handling, and the commit lifecycle.


65-207: LGTM on FixedQueuePool tests.

The tests comprehensively cover queue pool behavior: consistent group assignment, distribution, ordered processing within groups, concurrent processing across groups, and stats reporting. Good use of threading synchronization primitives.


209-373: LGTM on SimpleQueueProcessingStrategy tests.

The tests thoroughly cover the strategy's behavior: message processing, offset committing, order preservation, concurrent processing, filtered payload handling, and offset gap blocking. The offset assertions correctly reflect the test's commit function (without the +1 adjustment that the production code adds).


375-421: LGTM on integration test.

The integration test correctly verifies that ResultsStrategyFactory with mode="thread-queue-parallel" creates a SimpleQueueProcessingStrategy with the expected queue pool configuration.

src/sentry/remote_subscriptions/consumers/result_consumer.py (5)

22-25: LGTM on new imports.

The imports for FixedQueuePool and SimpleQueueProcessingStrategy are correctly added.


96-102: LGTM on new class attributes.

The thread_queue_parallel flag and queue_pool attribute are appropriately declared for the new processing mode.


106-137: LGTM on mode initialization.

The initialization correctly:

  1. Creates a FixedQueuePool when mode is "thread-queue-parallel"
  2. Uses max_workers as the number of queues (defaulting to 20)
  3. Passes the result processor and identifier to the queue pool

180-185: LGTM on shutdown handling.

The shutdown method correctly handles the queue pool cleanup, setting it to None after shutdown.


244-259: LGTM on thread-queue-parallel worker creation.

The implementation correctly:

  1. Wraps the commit function to add +1 to offsets (Kafka requires committing the next offset to read)
  2. Creates SimpleQueueProcessingStrategy with the appropriate decoder and grouping function
src/sentry/remote_subscriptions/consumers/queue_consumer.py (6)

24-31: LGTM on WorkItem dataclass.

The dataclass correctly captures all necessary information for tracking work items: partition, offset, result, and the original message.


49-54: Minor race condition in lock creation, but functionally safe.

The _get_partition_lock method has a theoretical race where two threads could both see lock=None and proceed to setdefault. However, since dict.setdefault is thread-safe and returns the existing value if a concurrent thread already inserted the key, only one lock will be used. The extra lock created by the losing thread will be garbage collected.

This is functionally correct but could be slightly cleaner with a global lock for partition lock creation. Given the low frequency of new partition discovery, this is acceptable.


67-98: LGTM on get_committable_offsets.

The implementation correctly finds the highest contiguous completed offset for each partition. The linear scan is acceptable for typical Kafka workloads where the outstanding offset range is small.


127-156: Verify: Failed items are marked as complete and won't be reprocessed.

In the finally block (line 149), complete_offset is called regardless of whether processing succeeded or failed. This means if result_processor throws an exception, the offset is still marked complete and the message won't be reprocessed after a restart.

This is a design choice that prioritizes throughput over at-least-once delivery. The exception is logged (line 145-147), but the message is effectively lost. Verify this aligns with the expected semantics for uptime results.


273-291: LGTM on commit loop.

The commit loop correctly:

  1. Runs periodically (every ~1 second)
  2. Gets committable offsets from the tracker
  3. Commits them via the commit function
  4. Only marks as committed after successful commit
  5. Handles exceptions gracefully

236-240: No issues found. The codebase requires Python 3.13+ (as specified in setup.cfg: python_requires = >=3.13), and queue.Queue.shutdown() was added in Python 3.13, making the code compatible with all supported versions. The try/except wrapping is appropriate defensive programming.

Likely an incorrect or invalid review comment.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants