Skip to content

feat(uptime): Add ability to use queues to manage parallelism#9

Open
everettbu wants to merge 1 commit into
kafka-consumer-parallel-beforefrom
kafka-consumer-parallel-after
Open

feat(uptime): Add ability to use queues to manage parallelism#9
everettbu wants to merge 1 commit into
kafka-consumer-parallel-beforefrom
kafka-consumer-parallel-after

Conversation

@everettbu

@everettbu everettbu commented Jul 29, 2025

Copy link
Copy Markdown
Contributor

Test 9

One potential problem we have with batch processing is that any one slow
item will clog up the whole batch. This pr implements a queueing method
instead, where we keep N queues that each have their own workers.
There's still a chance of individual items backlogging a queue, but we
can try increased concurrency here to reduce the chances of that
happening

<!-- Describe your PR here. -->

@greptile-apps greptile-apps Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Greptile Summary

This PR introduces a new thread-queue-parallel processing mode for Kafka consumers in Sentry's uptime monitoring system. The implementation adds sophisticated queue-based parallelism that maintains ordering guarantees within subscription groups while enabling concurrent processing across different groups.

The core addition is the queue_consumer.py module, which implements several key components:

  • OffsetTracker: Manages safe offset commits with gap handling to prevent message loss
  • OrderedQueueWorker: Thread-based workers that process messages from specific queues
  • FixedQueuePool: Distributes work across fixed queues using consistent hashing
  • SimpleQueueProcessingStrategy: Main processing strategy that integrates with Arroyo framework

The system uses consistent hashing to route messages to specific worker queues, ensuring all messages for the same subscription group are processed in order while allowing parallelism across different groups. This addresses limitations of existing modes: serial processing is too slow, multiprocessing has overhead, and batched-parallel still processes in batches.

The integration involves:

  • Adding the new mode to ResultsStrategyFactory with early result_processor initialization
  • Updating CLI options in consumers/__init__.py to include the new mode
  • Comprehensive test coverage including unit tests and Kafka integration tests
  • Clean-up of test module __init__.py files

This architecture is particularly valuable for uptime monitoring where order matters within individual subscriptions but different subscriptions can be processed independently, providing natural backpressure and load distribution.

Confidence score: 3/5

  • This PR introduces significant concurrency complexity that could lead to subtle race conditions or deadlocks in production
  • The offset tracking logic in OffsetTracker.commit_offset() has complex gap handling that could potentially commit incorrect offsets under edge cases
  • The thread-safe partition lock creation using setdefault pattern and exception handling for queue.ShutDown in worker threads need careful review for correctness
  • Files needing more attention: src/sentry/remote_subscriptions/consumers/queue_consumer.py (lines 67-98 for offset commit logic, lines 49-54 for thread-safe operations), src/sentry/remote_subscriptions/consumers/result_consumer.py (early processor initialization and queue pool management)

7 files reviewed, 2 comments

Edit Code Review Bot Settings | Greptile

Comment on lines +132 to +133
except queue.ShutDown:
break

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Exception handling assumes queue.ShutDown exists, but standard library queue module doesn't have this exception


for q in self.queues:
try:
q.shutdown(immediate=False)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Standard library queue.Queue doesn't have a shutdown() method with immediate parameter

@GitHoobar

Copy link
Copy Markdown

Review Summary

🏷️ Draft Comments (8)

Skipped posting 8 draft comments that were valid but scored below your review threshold (>=13/15). Feel free to update them here.

src/sentry/consumers/__init__.py (1)

457-630: get_stream_processor (lines 457-630) is a very large, complex function with many branches and responsibilities, making it difficult to maintain and optimize for performance as the system evolves.

📊 Impact Scores:

  • Production Impact: 2/5
  • Fix Specificity: 3/5
  • Urgency Impact: 2/5
  • Total Score: 7/15

🤖 AI Agent Prompt (Copy & Paste Ready):

Refactor `get_stream_processor` in src/sentry/consumers/__init__.py (lines 457-630). The function is too large and complex, with many branches and responsibilities, making it hard to maintain and optimize. Extract major responsibilities into well-named helper functions or classes: (1) consumer definition lookup/validation, (2) click command/context/param handling, (3) Kafka consumer config building, (4) synchronization setup, (5) strategy factory wrapping, (6) DLQ/stale topic setup, and (7) StreamProcessor construction. Each helper should have a clear, single responsibility and be unit-testable. The main function should orchestrate these helpers, reducing complexity and improving maintainability.

src/sentry/remote_subscriptions/consumers/queue_consumer.py (5)

238-238: queue.Queue does not have a shutdown method; calling q.shutdown(immediate=False) in FixedQueuePool.shutdown() will raise an AttributeError and prevent proper shutdown of worker threads.

📊 Impact Scores:

  • Production Impact: 4/5
  • Fix Specificity: 2/5
  • Urgency Impact: 3/5
  • Total Score: 9/15

🤖 AI Agent Prompt (Copy & Paste Ready):

In src/sentry/remote_subscriptions/consumers/queue_consumer.py, line 238, the code calls `q.shutdown(immediate=False)` on a standard `queue.Queue` object, which does not have a `shutdown` method. This will raise an AttributeError and prevent proper shutdown of worker threads. Remove this line to avoid the runtime error and ensure the shutdown process completes.

132-133: In OrderedQueueWorker.run, catching queue.ShutDown will always fail because queue has no ShutDown exception; this will cause unhandled exceptions and may crash the worker.

📊 Impact Scores:

  • Production Impact: 4/5
  • Fix Specificity: 4/5
  • Urgency Impact: 4/5
  • Total Score: 12/15

🤖 AI Agent Prompt (Copy & Paste Ready):

In src/sentry/remote_subscriptions/consumers/queue_consumer.py, lines 132-133, the code attempts to catch `queue.ShutDown`, but the standard library `queue` module does not define a `ShutDown` exception. Remove this except block to prevent unhandled exceptions and ensure the worker thread handles queue shutdown correctly.

129-149: In OrderedQueueWorker.run, the finally block always calls self.offset_tracker.complete_offset even if work_item was never assigned (e.g., if self.work_queue.get() raises an exception), causing an UnboundLocalError and crashing the worker.

📊 Impact Scores:

  • Production Impact: 4/5
  • Fix Specificity: 3/5
  • Urgency Impact: 4/5
  • Total Score: 11/15

🤖 AI Agent Prompt (Copy & Paste Ready):

In src/sentry/remote_subscriptions/consumers/queue_consumer.py, lines 129-149, the `finally` block always calls `self.offset_tracker.complete_offset(work_item.partition, work_item.offset)`, but if `self.work_queue.get()` raises an exception, `work_item` will be undefined, causing an `UnboundLocalError`. Update the `finally` block to check if `work_item` is defined before using it (e.g., `if 'work_item' in locals(): ...`).

273-292: try-except inside the commit loop (_commit_loop) causes unnecessary performance overhead on every iteration, especially under high throughput; this can be refactored to minimize exception handling cost.

📊 Impact Scores:

  • Production Impact: 2/5
  • Fix Specificity: 4/5
  • Urgency Impact: 2/5
  • Total Score: 8/15

🤖 AI Agent Prompt (Copy & Paste Ready):

Refactor src/sentry/remote_subscriptions/consumers/queue_consumer.py lines 273-292: The `_commit_loop` method currently wraps the entire loop body in a `try`-`except`, causing performance overhead on every iteration. Move the `try`-`except` inside the loop to only wrap the code that can actually throw, minimizing exception handling cost. Ensure the exception handler only covers the offset commit logic, not the wait.

297-297: assert isinstance(message.value, BrokerValue) in SimpleQueueProcessingStrategy.submit can be bypassed if assertions are disabled, leading to type confusion and potential logic errors with attacker-controlled input.

📊 Impact Scores:

  • Production Impact: 3/5
  • Fix Specificity: 5/5
  • Urgency Impact: 2/5
  • Total Score: 10/15

🤖 AI Agent Prompt (Copy & Paste Ready):

In src/sentry/remote_subscriptions/consumers/queue_consumer.py, lines 297-297, replace the use of `assert isinstance(message.value, BrokerValue)` with an explicit runtime type check and error handling. This prevents attackers from bypassing the type check if Python assertions are disabled, which could lead to logic errors or security issues. Insert a conditional that logs an error and returns if the type check fails.

src/sentry/remote_subscriptions/consumers/result_consumer.py (2)

121-121: ThreadPoolExecutor in batched-parallel mode is created per-factory instance, risking thread exhaustion and resource contention if many factories are created (measurable impact at scale).

📊 Impact Scores:

  • Production Impact: 4/5
  • Fix Specificity: 2/5
  • Urgency Impact: 3/5
  • Total Score: 9/15

🤖 AI Agent Prompt (Copy & Paste Ready):

In src/sentry/remote_subscriptions/consumers/result_consumer.py, lines 121-121, the code creates a new ThreadPoolExecutor for each ResultsStrategyFactory instance in batched-parallel mode, which can exhaust threads and system resources at scale. Refactor so that ThreadPoolExecutor is a shared class-level singleton (one per process), not per-factory instance. Only create it if it doesn't already exist.

133-137: Potential unbounded growth of FixedQueuePool queues if message production outpaces consumption, leading to high memory usage and possible OOM at scale.

📊 Impact Scores:

  • Production Impact: 4/5
  • Fix Specificity: 2/5
  • Urgency Impact: 3/5
  • Total Score: 9/15

🤖 AI Agent Prompt (Copy & Paste Ready):

In src/sentry/remote_subscriptions/consumers/result_consumer.py, lines 133-137, FixedQueuePool is instantiated without a maximum queue size, which can lead to unbounded memory growth and OOM if message production outpaces consumption. Update FixedQueuePool and its usage to support a configurable max queue size per queue, and ensure that backpressure or message dropping is handled appropriately when the queue is full.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants