feat(uptime): Add ability to use queues to manage parallelism#8
feat(uptime): Add ability to use queues to manage parallelism#8everettbu wants to merge 1 commit into
Conversation
One potential problem we have with batch processing is that any one slow item will clog up the whole batch. This pr implements a queueing method instead, where we keep N queues that each have their own workers. There's still a chance of individual items backlogging a queue, but we can try increased concurrency here to reduce the chances of that happening <!-- Describe your PR here. -->
There was a problem hiding this comment.
Pull Request Overview
This PR adds a new "thread-queue-parallel" mode to the uptime monitoring system to manage parallelism using queues. This allows for ordered processing within subscription groups while maintaining concurrency across different subscriptions.
- Introduces thread-queue-parallel processing mode for uptime results consumer
- Implements queue-based parallelism with ordering guarantees within groups
- Adds comprehensive test coverage for the new processing mode
Reviewed Changes
Copilot reviewed 5 out of 7 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/sentry/uptime/consumers/test_results_consumer.py | Adds extensive test cases for thread-queue-parallel mode including ordering, error handling, and Kafka integration tests |
| tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py | New test file for the queue consumer implementation with unit tests for offset tracking, queue pool, and processing strategy |
| src/sentry/remote_subscriptions/consumers/result_consumer.py | Updates ResultsStrategyFactory to support thread-queue-parallel mode and integrates with the new queue processing system |
| src/sentry/remote_subscriptions/consumers/queue_consumer.py | New implementation of thread-queue-parallel processing with offset tracking, ordered queues, and worker threads |
| src/sentry/consumers/init.py | Updates CLI options to include the new thread-queue-parallel mode |
Comments suppressed due to low confidence (2)
src/sentry/remote_subscriptions/consumers/queue_consumer.py:132
- The exception name 'queue.ShutDown' appears to be incorrect. The standard Python queue module uses 'queue.ShutDown' but this may not exist. Consider using the correct exception type or defining a custom exception.
except queue.ShutDown:
src/sentry/remote_subscriptions/consumers/queue_consumer.py:142
- The exception name 'queue.ShutDown' appears to be incorrect. The standard Python queue module uses 'queue.ShutDown' but this may not exist. Consider using the correct exception type or defining a custom exception.
except queue.ShutDown:
|
|
||
| for q in self.queues: | ||
| try: | ||
| q.shutdown(immediate=False) |
There was a problem hiding this comment.
The queue.shutdown() method with 'immediate=False' parameter may not exist in the standard Python queue module. This could cause AttributeError at runtime. Verify the correct API or implement a custom shutdown mechanism.
| original_create_with_partitions = factory.create_with_partitions | ||
|
|
||
| def create_with_partitions_tracking(commit, partitions): | ||
| def tracked_commit( | ||
| offsets: Mapping[Partition, int], force: bool = False | ||
| ) -> None: | ||
| nonlocal commit_count | ||
| commit_count += 1 | ||
| commits_made.append(dict(offsets)) | ||
| return commit(offsets, force) | ||
|
|
||
| return original_create_with_partitions(tracked_commit, partitions) | ||
|
|
||
| factory.create_with_partitions = create_with_partitions_tracking # type: ignore[method-assign] |
There was a problem hiding this comment.
[nitpick] Monkey-patching the factory method in tests makes the test fragile and harder to understand. Consider using dependency injection or creating a test-specific factory subclass instead.
| original_create_with_partitions = factory.create_with_partitions | |
| def create_with_partitions_tracking(commit, partitions): | |
| def tracked_commit( | |
| offsets: Mapping[Partition, int], force: bool = False | |
| ) -> None: | |
| nonlocal commit_count | |
| commit_count += 1 | |
| commits_made.append(dict(offsets)) | |
| return commit(offsets, force) | |
| return original_create_with_partitions(tracked_commit, partitions) | |
| factory.create_with_partitions = create_with_partitions_tracking # type: ignore[method-assign] | |
| class TestUptimeResultsStrategyFactory(UptimeResultsStrategyFactory): | |
| def create_with_partitions(self, commit, partitions): | |
| def tracked_commit( | |
| offsets: Mapping[Partition, int], force: bool = False | |
| ) -> None: | |
| nonlocal commit_count | |
| commit_count += 1 | |
| commits_made.append(dict(offsets)) | |
| return commit(offsets, force) | |
| return super().create_with_partitions(tracked_commit, partitions) | |
| factory = TestUptimeResultsStrategyFactory( | |
| mode="thread-queue-parallel", | |
| max_workers=2, | |
| ) |
| self.send_result(result_2, consumer=consumer) | ||
|
|
||
| queue_pool = factory.queue_pool | ||
| max_wait = 50 |
There was a problem hiding this comment.
[nitpick] The magic number 50 for max_wait is used repeatedly throughout the tests. Consider extracting this as a named constant to improve maintainability.
| max_wait = 50 | |
| max_wait = self.MAX_WAIT_ITERATIONS |
| def _commit_loop(self) -> None: | ||
| while not self.shutdown_event.is_set(): | ||
| try: | ||
| self.shutdown_event.wait(1.0) |
There was a problem hiding this comment.
[nitpick] The commit loop runs with a 1-second wait in each iteration, which may cause delays in offset commits. Consider implementing a more responsive mechanism using conditions or events to trigger commits immediately when offsets become available.
| self.shutdown_event.wait(1.0) | |
| with self.commit_condition: | |
| self.commit_condition.wait(timeout=1.0) |
Test 9