Add ability to use queues to manage parallelism#9

Open
hussam789 wants to merge 1 commit into kafka-consumer-parallel-before from kafka-consumer-parallel-after

@hussam789 hussam789 commented Oct 30, 2025

User description

PR #9


PR Type

Enhancement


Description

  • Implement thread-queue-parallel processing mode using fixed queue pool

  • Add OffsetTracker for safe offset management across partitions

  • Create OrderedQueueWorker and FixedQueuePool for concurrent group processing

  • Add SimpleQueueProcessingStrategy with automatic offset committing

  • Integrate queue-based consumer into ResultsStrategyFactory

  • Add comprehensive tests for queue consumer and offset tracking


Diagram Walkthrough

flowchart LR
  A["Kafka Messages"] -->|submit| B["SimpleQueueProcessingStrategy"]
  B -->|decode & group| C["FixedQueuePool"]
  C -->|distribute by group| D["Multiple Queues"]
  D -->|FIFO processing| E["OrderedQueueWorkers"]
  E -->|track offsets| F["OffsetTracker"]
  F -->|committable offsets| G["Commit Loop"]
  G -->|commit| H["Kafka"]

File Walkthrough

Relevant files
Configuration changes
__init__.py
Add thread-queue-parallel mode to CLI options                       

src/sentry/consumers/__init__.py

  • Add "thread-queue-parallel" mode option to uptime consumer CLI
  • Update help text for max_workers to reflect parallelism usage across
    all modes
+2/-2     
Enhancement
queue_consumer.py
Implement queue-based parallel processing infrastructure 

src/sentry/remote_subscriptions/consumers/queue_consumer.py

  • Implement OffsetTracker class for thread-safe offset tracking per
    partition
  • Create OrderedQueueWorker thread class for processing queue items in
    order
  • Implement FixedQueuePool with consistent hashing for group-to-queue
    assignment
  • Add SimpleQueueProcessingStrategy for Kafka message processing with
    automatic commits
  • Include metrics collection for queue depth and offset commits
+345/-0 
result_consumer.py
Integrate queue-based strategy into results factory           

src/sentry/remote_subscriptions/consumers/result_consumer.py

  • Import FixedQueuePool and SimpleQueueProcessingStrategy
  • Add thread_queue_parallel flag and queue_pool attribute to
    ResultsStrategyFactory
  • Initialize queue_pool in __init__ when mode is "thread-queue-parallel"
  • Add create_thread_queue_parallel_worker method to create strategy
  • Update create_with_partitions to route to queue-based strategy
  • Add queue_pool shutdown in shutdown method
  • Move result_processor initialization earlier in __init__
+41/-3   
Tests
test_queue_consumer.py
Add comprehensive tests for queue consumer                             

tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py

  • Add TestOffsetTracker with tests for offset tracking and committing
  • Add TestFixedQueuePool with tests for group assignment and ordered
    processing
  • Add TestSimpleQueueProcessingStrategy with tests for message
    processing and offset commits
  • Add TestThreadQueueParallelIntegration for factory integration testing
  • Test concurrent processing, offset gaps, and error handling
+421/-0 
test_results_consumer.py
Add thread-queue-parallel consumer tests and Kafka integration

tests/sentry/uptime/consumers/test_results_consumer.py

  • Add test_thread_queue_parallel for basic message processing
  • Add test_thread_queue_parallel_preserves_order for order preservation
    within subscriptions
  • Add test_thread_queue_parallel_concurrent_subscriptions for concurrent
    processing
  • Add test_thread_queue_parallel_offset_commit for offset committing
    behavior
  • Add test_thread_queue_parallel_error_handling for error resilience
  • Add test_thread_queue_parallel_offset_gaps for gap handling
  • Add test_thread_queue_parallel_graceful_shutdown for shutdown behavior
  • Add ProcessResultThreadQueueParallelKafkaTest with Kafka integration
    test
  • Import additional test utilities and Kafka dependencies
+467/-1 

One potential problem with batch processing is that any one slow item
clogs up the whole batch. This PR implements a queueing method instead:
we keep N queues, each with its own worker. An individual item can still
back up its queue, but we can try increasing concurrency to reduce the
chance of that happening.
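
The queueing idea described above can be sketched roughly as follows. This is a minimal illustration, not the PR's `FixedQueuePool`: the class name `TinyQueuePool`, the `_STOP` sentinel, and the use of `zlib.crc32` modulo the queue count are all assumptions made for the example (the PR's own hashing and shutdown mechanics are not reproduced here). It shows the key property: items for the same group always land on the same queue, so one worker processes them in FIFO order while other groups proceed on other queues.

```python
import queue
import threading
import zlib
from typing import Any, Callable, List

_STOP = object()  # hypothetical sentinel telling a worker to exit


class TinyQueuePool:
    """N queues, one worker thread per queue; a group always maps to one queue."""

    def __init__(self, num_queues: int, handler: Callable[[str, Any], None]) -> None:
        self.handler = handler
        self.queues: List[queue.Queue] = [queue.Queue() for _ in range(num_queues)]
        self.threads = [
            threading.Thread(target=self._run, args=(q,), daemon=True)
            for q in self.queues
        ]
        for thread in self.threads:
            thread.start()

    def _run(self, work_queue: queue.Queue) -> None:
        # Each worker drains exactly one queue, which preserves per-queue order.
        while True:
            item = work_queue.get()
            if item is _STOP:
                break
            group, value = item
            self.handler(group, value)

    def submit(self, group: str, value: Any) -> None:
        # A stable hash (not the per-process randomized built-in hash()) picks the queue.
        index = zlib.crc32(group.encode("utf-8")) % len(self.queues)
        self.queues[index].put((group, value))

    def shutdown(self) -> None:
        # Sentinels are queued behind pending work, so queued items finish first.
        for work_queue in self.queues:
            work_queue.put(_STOP)
        for thread in self.threads:
            thread.join()
```

A slow item still delays everything behind it on the same queue, which is the residual backlog risk the description mentions; raising the queue count only lowers the odds that unrelated groups share that queue.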

@qodo-code-review

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
Error handling robustness

Description: Worker threads catch a broad Exception and keep processing without surfacing
the failure or signaling backpressure. If result processing must be exactly-once, this can
lead to silent data loss or inconsistent state; ensure errors are captured and trigger
retries or a poison-message mechanism.
queue_consumer.py [127-156]

Referred Code
def run(self) -> None:
    """Process items from the queue in order."""
    while not self.shutdown:
        try:
            work_item = self.work_queue.get()
        except queue.ShutDown:
            break

        try:
            with sentry_sdk.start_transaction(
                op="queue_worker.process",
                name=f"monitors.{self.identifier}.worker_{self.worker_id}",
            ):
                self.result_processor(self.identifier, work_item.result)

        except queue.ShutDown:
            break
        except Exception:
            logger.exception(
                "Unexpected error in queue worker", extra={"worker_id": self.worker_id}
            )


 ... (clipped 9 lines)
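
One way to act on this finding is to retry a failed item a bounded number of times and record it somewhere visible if it never succeeds. The sketch below is an assumption-laden illustration, not code from the PR: `process_with_retries`, the `DeadLetter` tuple, and the in-memory `dead_letters` list are hypothetical names, and a real consumer would more likely publish to a dead-letter topic or emit a metric.

```python
import logging
from typing import Any, Callable, List, Tuple

logger = logging.getLogger("queue_worker_sketch")

# Hypothetical dead-letter record: (partition, offset, error text).
DeadLetter = Tuple[int, int, str]


def process_with_retries(
    partition: int,
    offset: int,
    payload: Any,
    processor: Callable[[Any], None],
    dead_letters: List[DeadLetter],
    max_attempts: int = 2,
) -> bool:
    """Run processor up to max_attempts times; record the item if every attempt fails."""
    last_error = ""
    for attempt in range(1, max_attempts + 1):
        try:
            processor(payload)
            return True
        except Exception as exc:  # still broad, but the failure is recorded below
            last_error = repr(exc)
            logger.warning(
                "attempt %d/%d failed for partition=%d offset=%d: %s",
                attempt, max_attempts, partition, offset, last_error,
            )
    # Every attempt failed: surface the item instead of dropping it silently.
    dead_letters.append((partition, offset, last_error))
    return False
```

The boolean return lets the caller decide whether the offset may still be marked complete; that policy is left open here because the clipped part of the excerpt is not visible.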
Offset commit timing

Description: Commit loop commits offsets asynchronously on a fixed 1s cadence without bounding commit
latency or ensuring in-flight work is drained before shutdown, risking uncommitted
processed offsets on abrupt termination; verify shutdown ordering to avoid duplicate
processing or commit gaps.
queue_consumer.py [270-293]

Referred Code
    self.commit_thread = threading.Thread(target=self._commit_loop, daemon=True)
    self.commit_thread.start()

def _commit_loop(self) -> None:
    while not self.shutdown_event.is_set():
        try:
            self.shutdown_event.wait(1.0)

            committable = self.queue_pool.offset_tracker.get_committable_offsets()

            if committable:
                metrics.incr(
                    "remote_subscriptions.queue_pool.offsets_committed",
                    len(committable),
                    tags={"identifier": self.queue_pool.identifier},
                )

                self.commit_function(committable)
                for partition, offset in committable.items():
                    self.queue_pool.offset_tracker.mark_committed(partition, offset)
        except Exception:


 ... (clipped 3 lines)
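
The shutdown-ordering concern can be illustrated with a small sketch. It assumes a commit loop shaped like the excerpt above, but `run_commit_loop`, `shutdown_in_order`, and their parameters are hypothetical names for this example, not the PR's API. The point is the order: wait for in-flight work first, then stop the commit loop, then make one last commit pass.

```python
import threading
from typing import Callable, Dict, Iterable


def run_commit_loop(
    get_committable: Callable[[], Dict[int, int]],
    commit: Callable[[Dict[int, int]], None],
    stop_event: threading.Event,
    interval: float = 1.0,
) -> None:
    """Periodically commit whatever offsets are currently safe to commit."""
    while not stop_event.is_set():
        stop_event.wait(interval)
        offsets = get_committable()
        if offsets:
            commit(offsets)


def shutdown_in_order(
    workers: Iterable[threading.Thread],
    commit_thread: threading.Thread,
    stop_event: threading.Event,
    get_committable: Callable[[], Dict[int, int]],
    commit: Callable[[Dict[int, int]], None],
) -> None:
    # 1. Let in-flight work finish (the caller has already stopped feeding workers).
    for worker in workers:
        worker.join()
    # 2. Only now stop the periodic commit loop.
    stop_event.set()
    commit_thread.join()
    # 3. Final pass: covers the case where the loop exited without another check.
    offsets = get_committable()
    if offsets:
        commit(offsets)
```

Without step 3, work that completes between the loop's last check and the stop signal would be processed but never committed, and would be redelivered after a restart.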
Ticket Compliance
🎫 No ticket provided
  • Create ticket/issue
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
🟢
Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed

Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status: Passed

Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status: Passed

Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status:
Missing audits: New queue-based processing and offset commit logic do not emit audit-grade logs for
critical actions like processing results and committing offsets, making reconstruction of
events unclear.

Referred Code
    self.commit_thread = threading.Thread(target=self._commit_loop, daemon=True)
    self.commit_thread.start()

def _commit_loop(self) -> None:
    while not self.shutdown_event.is_set():
        try:
            self.shutdown_event.wait(1.0)

            committable = self.queue_pool.offset_tracker.get_committable_offsets()

            if committable:
                metrics.incr(
                    "remote_subscriptions.queue_pool.offsets_committed",
                    len(committable),
                    tags={"identifier": self.queue_pool.identifier},
                )

                self.commit_function(committable)
                for partition, offset in committable.items():
                    self.queue_pool.offset_tracker.mark_committed(partition, offset)
        except Exception:


 ... (clipped 1 lines)
Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status:
Broad exception: Multiple broad exception handlers log generic messages without contextual details about
the failing payload or group, which may hinder debugging and edge case clarity.

Referred Code
    break
except Exception:
    logger.exception(
        "Unexpected error in queue worker", extra={"worker_id": self.worker_id}
    )
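
A hedged sketch of what richer context could look like: the helper below passes the partition, offset, and group key through the standard `logging` `extra` argument alongside the worker id. The function name `log_worker_failure` and the extra field names are assumptions for illustration; whether the PR's work items expose a group key at this point is not shown in the excerpt.

```python
import logging
from typing import Optional

logger = logging.getLogger("queue_worker_sketch")


def log_worker_failure(
    worker_id: int, partition: int, offset: int, group_key: Optional[str]
) -> None:
    """Call from inside an `except` block so the active traceback is attached."""
    logger.exception(
        "Unexpected error in queue worker",
        extra={
            "worker_id": worker_id,
            "partition": partition,
            "offset": offset,
            "group_key": group_key,
        },
    )
```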
Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status:
Minimal validation: Decoder and grouping functions are invoked without explicit validation of
decoded data integrity, which could allow malformed inputs to affect processing logic.

Referred Code
try:
    result = self.decoder(message.payload)

    assert isinstance(message.value, BrokerValue)
    partition = message.value.partition
    offset = message.value.offset

    if result is None:
        self.queue_pool.offset_tracker.add_offset(partition, offset)
        self.queue_pool.offset_tracker.complete_offset(partition, offset)
        return

    group_key = self.grouping_fn(result)

    work_item = WorkItem(
        partition=partition,
        offset=offset,
        result=result,
        message=message,
    )



 ... (clipped 2 lines)
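
As a rough illustration of the kind of check this finding asks for, the sketch below validates a decoded result before a group key is derived from it. The decoded schema is not shown in the excerpt, so the `subscription_id` field and the `safe_group_key` helper are hypothetical; the idea is only that a malformed result yields `None` rather than an exception or a bogus key.

```python
from typing import Any, Mapping, Optional

# Hypothetical field name; the real decoded schema is not shown in the excerpt.
GROUP_FIELD = "subscription_id"


def safe_group_key(result: Any) -> Optional[str]:
    """Return the group key, or None if the decoded result is malformed."""
    if not isinstance(result, Mapping):
        return None
    value = result.get(GROUP_FIELD)
    if not isinstance(value, str) or not value:
        return None
    return value
```

A caller could treat `None` the same way the excerpt treats an undecodable message: track the offset and complete it immediately so the partition is not blocked.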
Compliance status legend
🟢 - Fully Compliant
🟡 - Partially Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

@qodo-code-review

PR Code Suggestions ✨

Explore these optional code suggestions:

Category | Suggestion | Impact
High-level
Re-evaluate the custom stream processing logic

The PR adds a custom, complex stream processing engine. This increases
maintenance and bug risk. Consider if the arroyo framework's existing features
can provide ordered, parallel processing with less custom code.

Examples:

src/sentry/remote_subscriptions/consumers/queue_consumer.py [34-345]
class OffsetTracker:
    """
    Tracks outstanding offsets and determines which offsets are safe to commit.

    - Tracks offsets per partition
    - Only commits offsets when all prior offsets are processed
    - Thread-safe for concurrent access with per-partition locks
    """

    def __init__(self) -> None:

 ... (clipped 302 lines)
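
The commit rule in that docstring ("only commits offsets when all prior offsets are processed") can be sketched as below. This is a simplified stand-in, not the PR's `OffsetTracker`: it uses a single lock rather than per-partition locks, the class name `MiniOffsetTracker` is hypothetical, and it returns the highest completed offset as-is, leaving any "next offset to consume" (+1) convention to the caller. Offsets that were never added are treated as gaps and do not block a commit.

```python
import threading
from collections import defaultdict
from typing import Dict, Set


class MiniOffsetTracker:
    """Tracks offsets per partition and reports the highest safely committable one."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._seen: Dict[int, Set[int]] = defaultdict(set)
        self._outstanding: Dict[int, Set[int]] = defaultdict(set)
        self._last_committed: Dict[int, int] = {}

    def add_offset(self, partition: int, offset: int) -> None:
        with self._lock:
            self._seen[partition].add(offset)
            self._outstanding[partition].add(offset)

    def complete_offset(self, partition: int, offset: int) -> None:
        with self._lock:
            self._outstanding[partition].discard(offset)

    def get_committable_offsets(self) -> Dict[int, int]:
        committable: Dict[int, int] = {}
        with self._lock:
            for partition, seen in self._seen.items():
                outstanding = self._outstanding[partition]
                # Nothing at or above the lowest unfinished offset may be committed.
                limit = min(outstanding) if outstanding else None
                done = [o for o in seen if limit is None or o < limit]
                if done and max(done) > self._last_committed.get(partition, -1):
                    committable[partition] = max(done)
        return committable

    def mark_committed(self, partition: int, offset: int) -> None:
        with self._lock:
            self._last_committed[partition] = offset
            # Forget offsets that are now covered by the commit.
            self._seen[partition] = {o for o in self._seen[partition] if o > offset}
```

For example, with offsets 1, 2, and 3 added and only 1 and 3 completed, the tracker reports offset 1; once 2 completes, it reports 3.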

Solution Walkthrough:

Before:

class SimpleQueueProcessingStrategy(ProcessingStrategy):
    def __init__(self, queue_pool, decoder, grouping_fn, commit_function):
        self.queue_pool = queue_pool  # Manages threads and queues
        self.commit_function = commit_function
        self.commit_thread = threading.Thread(target=self._commit_loop)
        self.commit_thread.start()

    def submit(self, message):
        # ... decode message ...
        group_key = self.grouping_fn(result)
        # Manually track offset before submitting to a queue
        self.queue_pool.offset_tracker.add_offset(...)
        self.queue_pool.submit(group_key, work_item)

    def _commit_loop(self):
        while not self.shutdown_event.is_set():
            # Manually find committable offsets from tracker
            committable = self.queue_pool.offset_tracker.get_committable_offsets()
            if committable:
                self.commit_function(committable)
                # Manually mark offsets as committed in tracker
                self.queue_pool.offset_tracker.mark_committed(...)

After:

# Hypothetical simpler approach using more of arroyo
# This is an exploration, not a concrete implementation.

def create_strategy(commit_func):
    # Arroyo strategy to process a single message for a given group key.
    # Arroyo would manage a pool of these, one per active key.
    def process_message_for_group(message):
        # ... decode and process ...
        pass

    # Arroyo strategy that partitions the stream by a key function.
    # It would ensure that messages for the same key are processed serially.
    partition_by_group = PartitionByKey(
        key_function=grouping_fn,
        next_step=RunTask(process_message_for_group)
    )

    # The top-level strategy would compose these building blocks.
    # Arroyo would handle offset management and commits automatically.
    return Compose(
        partition_by_group,
        CommitOffsets(commit_func),
    )
Suggestion importance[1-10]: 9

Why: This is an excellent high-level suggestion that correctly identifies the main architectural change of the PR—a complex, custom-built processing engine—and astutely points out the significant risks of increased maintenance and potential concurrency bugs. It proposes a valid alternative by questioning if the arroyo framework could be better leveraged, which is a critical consideration for long-term stability and maintainability.

High