Skip to content

feat(uptime): Add ability to use queues to manage parallelism#9

Draft
ShashankFC wants to merge 1 commit into
kafka-consumer-parallel-beforefrom
kafka-consumer-parallel-after
Draft

feat(uptime): Add ability to use queues to manage parallelism#9
ShashankFC wants to merge 1 commit into
kafka-consumer-parallel-beforefrom
kafka-consumer-parallel-after

Conversation

@ShashankFC

Copy link
Copy Markdown
Collaborator

Test 9nn---nReplicated from ai-code-review-evaluation/sentry-greptile#9

One potential problem we have with batch processing is that any one slow
item will clog up the whole batch. This pr implements a queueing method
instead, where we keep N queues that each have their own workers.
There's still a chance of individual items backlogging a queue, but we
can try increased concurrency here to reduce the chances of that
happening

<!-- Describe your PR here. -->
@ShashankFC

Copy link
Copy Markdown
Collaborator Author

@claude

@claude

claude Bot commented Jan 7, 2026

Copy link
Copy Markdown

Claude finished @ShashankFC's task in 1m 43s —— View job


Code Review Complete

  • Gather context and read changed files
  • Review implementation for correctness and best practices
  • Analyze test coverage and quality
  • Provide detailed feedback

Summary

This PR adds a new thread-queue-parallel processing mode for Kafka consumers, implementing a queue-based parallelism system that preserves message ordering within groups while allowing concurrent processing across groups. The implementation is generally well-structured with comprehensive test coverage.

Critical Issues

1. Race Condition in Lock Acquisition (src/sentry/remote_subscriptions/consumers/queue_consumer.py:49-54)

def _get_partition_lock(self, partition: Partition) -> threading.Lock:
    """Get or create a lock for a partition."""
    lock = self.partition_locks.get(partition)
    if lock:
        return lock
    return self.partition_locks.setdefault(partition, threading.Lock())

Issue: Classic check-then-act race condition. If two threads call this simultaneously for a new partition, both could pass the if lock: check and create separate locks via setdefault, potentially creating two different lock objects for the same partition.

Impact: This could lead to data corruption in offset tracking since different threads might use different locks for the same partition, breaking the mutual exclusion guarantee.

Recommendation: Use a global lock for partition lock creation:

def __init__(self) -> None:
    self.all_offsets: dict[Partition, set[int]] = defaultdict(set)
    self.outstanding: dict[Partition, set[int]] = defaultdict(set)
    self.last_committed: dict[Partition, int] = {}
    self.partition_locks: dict[Partition, threading.Lock] = {}
    self._partition_locks_lock = threading.Lock()  # Add this

def _get_partition_lock(self, partition: Partition) -> threading.Lock:
    with self._partition_locks_lock:
        return self.partition_locks.setdefault(partition, threading.Lock())

2. Inefficient Offset Commit Algorithm (src/sentry/remote_subscriptions/consumers/queue_consumer.py:67-98)

The get_committable_offsets method has O(n²) complexity where n is the range between min and max offset. For partitions with large offset gaps, this could become a performance bottleneck:

for offset in range(start, max_offset + 1):  # Could iterate millions of times
    if offset in all_offsets and offset not in outstanding:
        highest_committable = offset
    else:
        break

Impact: If there's a gap of 1 million offsets, this iterates 1 million times checking set membership on each iteration.

Recommendation: Use sorted offset tracking:

def get_committable_offsets(self) -> dict[Partition, int]:
    committable = {}
    for partition in list(self.all_offsets.keys()):
        with self._get_partition_lock(partition):
            all_offsets = sorted(self.all_offsets[partition])
            if not all_offsets:
                continue
            
            outstanding = self.outstanding[partition]
            last_committed = self.last_committed.get(partition, -1)
            
            highest_committable = last_committed
            for offset in all_offsets:
                if offset <= last_committed:
                    continue
                if offset in outstanding:
                    break
                if offset != highest_committable + 1:
                    break
                highest_committable = offset
            
            if highest_committable > last_committed:
                committable[partition] = highest_committable
    
    return committable

3. Missing Error Handling in Worker Thread (src/sentry/remote_subscriptions/consumers/queue_consumer.py:127-157)

The worker thread catches exceptions but continues running even after queue.ShutDown is raised. The except queue.ShutDown: break at line 142 is unreachable because the outer try already caught it at line 132.

def run(self) -> None:
    while not self.shutdown:
        try:
            work_item = self.work_queue.get()
        except queue.ShutDown:
            break

        try:
            # ... processing code ...
        except queue.ShutDown:  # This is unreachable!
            break

Recommendation: Restructure exception handling:

def run(self) -> None:
    while not self.shutdown:
        try:
            work_item = self.work_queue.get(timeout=1.0)
        except queue.Empty:
            continue
        except queue.ShutDown:
            break

        try:
            with sentry_sdk.start_transaction(...):
                self.result_processor(self.identifier, work_item.result)
        except Exception:
            logger.exception(...)
        finally:
            self.offset_tracker.complete_offset(...)

High-Severity Issues

4. Potential Memory Leak in Offset Tracking (src/sentry/remote_subscriptions/consumers/queue_consumer.py:100-106)

The mark_committed method only removes offsets <= committed offset, but if messages arrive out of order (e.g., offset 1000, then 100), the all_offsets set could grow unbounded until 1000 is committed.

def mark_committed(self, partition: Partition, offset: int) -> None:
    with self._get_partition_lock(partition):
        self.last_committed[partition] = offset
        # Remove all offsets <= committed offset
        self.all_offsets[partition] = {o for o in self.all_offsets[partition] if o > offset}

Recommendation: Add periodic cleanup or bounded set size with logging when threshold is exceeded.

5. Busy Loop in Commit Thread (src/sentry/remote_subscriptions/consumers/queue_consumer.py:273-291)

The commit loop uses wait(1.0) which is reasonable, but it always checks committable offsets even when nothing has been processed. Consider adding a notification mechanism when work is completed to commit more promptly.

Medium-Severity Issues

6. Hardcoded Number of Queues (src/sentry/remote_subscriptions/consumers/result_consumer.py:136)

num_queues=max_workers or 20,  # Number of parallel queues

The default of 20 queues seems arbitrary and isn't documented. This should either:

  • Be a configurable constant with documentation explaining the choice
  • Scale with some metric (e.g., partition count, expected load)

7. Silent Failure on Decoder Error (src/sentry/remote_subscriptions/consumers/queue_consumer.py:317-325)

When message decoding fails, the exception is logged but the offset is still marked as complete. While this prevents poison messages from blocking the consumer, it could hide systematic issues.

Recommendation: Add metrics to track decoder failures:

except Exception:
    logger.exception("Error submitting message to queue")
    metrics.incr("queue_consumer.decoder_error", tags={"identifier": self.queue_pool.identifier})
    if isinstance(message.value, BrokerValue):
        # ... existing code

8. Missing Timeout in wait_until_empty (src/sentry/remote_subscriptions/consumers/queue_consumer.py:222-229)

The method polls every 10ms for up to 5 seconds. In tests with many messages, this could be inefficient. Consider using threading events for notification-based waiting.

9. Type Safety Issue (src/sentry/remote_subscriptions/consumers/queue_consumer.py:297)

assert isinstance(message.value, BrokerValue)

Using assertions for type checking in production code is problematic. If Python runs with -O, assertions are stripped. Use explicit type checks with proper error handling instead.

Low-Severity Issues / Code Quality

10. Inconsistent Naming

  • SimpleQueueProcessingStrategy - Not actually "simple" given the complexity
  • OffsetTracker - Could be more specific like PartitionOffsetTracker

11. Magic Numbers

  • Line 228: time.sleep(0.01) - Why 10ms?
  • Line 243: timeout=5.0 - Why 5 seconds?
  • Line 276: self.shutdown_event.wait(1.0) - Why 1 second?

These should be constants with documentation.

12. Missing Docstrings

Several important methods lack docstrings:

  • submit method in SimpleQueueProcessingStrategy
  • poll method
  • join method

Positive Observations

Excellent Test Coverage: The test suite is comprehensive, covering:

  • Basic functionality
  • Edge cases (offset gaps, errors, out-of-order processing)
  • Integration tests with actual Kafka
  • Concurrency and ordering guarantees

Proper Threading Patterns: Generally good use of locks, daemon threads, and shutdown mechanisms

Metrics Integration: Good instrumentation for monitoring

Backward Compatibility: The new mode is opt-in, preserving existing serial/parallel modes

Recommendations

  1. Fix the critical race condition in _get_partition_lock immediately
  2. Optimize get_committable_offsets to handle large offset gaps efficiently
  3. Add bounded growth protection to offset tracking to prevent memory leaks
  4. Improve error metrics for better operational visibility
  5. Document the threading model and ordering guarantees clearly
  6. Add configuration validation (e.g., max_workers > 0)
  7. Consider adding a health check that verifies worker threads are alive

Test Coverage Assessment

The test coverage is exceptional:

  • ✅ Unit tests for all major components
  • ✅ Integration tests with real Kafka
  • ✅ Concurrency and race condition tests
  • ✅ Error handling tests
  • ✅ Offset commit verification

The integration test at line 2012-2147 is particularly thorough, verifying actual Kafka offset commits.

Overall Assessment

This is a well-thought-out implementation with strong test coverage. The critical issues around race conditions and performance need to be addressed before merging to production, but the overall architecture is sound. The queue-based approach is appropriate for the problem being solved (maintaining ordering within groups while allowing parallelism across groups).

Recommendation: Request changes to address critical issues #1, #2, and #3 before merging.


Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants