Skip to content

feat(uptime): Add ability to use queues to manage parallelism#2

Open
CodingKylo wants to merge 1 commit into
kafka-consumer-parallel-beforefrom
kafka-consumer-parallel-after
Open

feat(uptime): Add ability to use queues to manage parallelism#2
CodingKylo wants to merge 1 commit into
kafka-consumer-parallel-beforefrom
kafka-consumer-parallel-after

Conversation

@CodingKylo

Copy link
Copy Markdown

Martian Code Review Benchmark PR (mirrored from source #9)

One potential problem we have with batch processing is that any one slow
item will clog up the whole batch. This pr implements a queueing method
instead, where we keep N queues that each have their own workers.
There's still a chance of individual items backlogging a queue, but we
can try increased concurrency here to reduce the chances of that
happening

<!-- Describe your PR here. -->

@re-entry-ai re-entry-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛡️ re-entry.ai Code Review

@CodingKylo — Please review the following assessment:

🔴 Risk Score: 68/100 · HIGH

Dimension Level
Likelihood 🔴 Critical
Impact 🟠 High
Detectability 🟡 Medium

Intent

Add a new uptime results processing mode that uses a queue to coordinate parallelism while preserving ordered Kafka offset commits.

Summary

The change introduces a new CLI mode thread-queue-parallel and wires new queue-based processing components into the remote subscriptions results strategy. The core risk is correctness around Kafka offset tracking/commit semantics under concurrency (including the offset+1 commit behavior) and ensuring the new mode is fully dispatched everywhere it’s selectable. The diff also shows new unit tests for the offset tracker/queue pool, but no end-to-end integration test proving the production processing loop uses the tracker correctly. Please verify that every mode dispatch site (uptime + remote subscriptions) explicitly supports thread-queue-parallel, and that offset commit behavior is correct and cannot deadlock or skip commits on failures/retries.

🎯 Review Focus

The queue-based offset tracking + commit coordination under concurrency: confirm the full OffsetTracker implementation and that every processed offset (success or failure) reaches the correct mark_committed/retry path without deadlocking or skipping commits, and verify the new thread-queue-parallel mode is fully wired in all mode dispatch sites.

Key Findings

  • 🚨 [src/sentry/remote_subscriptions/consumers/queue_consumer.py:L1] CRITICAL: OffsetTracker uses per-partition locks but the implementation is truncated in the diff; given the new concurrency/commit coordination, any missing/incorrect lock release or commit condition can deadlock or cause incorrect commits. Fix: ensure OffsetTracker methods (add_offset/get_committable_offsets/mark_committed) are fully implemented with correct lock scoping and that the worker always calls mark_committed in a finally block for every processed offset (including error paths).
  • 🚨 [src/sentry/remote_subscriptions/consumers/result_consumer.py:L105] CRITICAL: ResultsStrategyFactory adds thread_queue_parallel/queue_pool and sets self.result_processor = self.result_processor_cls() before mode branching; if result_processor_cls() depends on mode-specific fields (e.g., queue_pool) or if the mode branch doesn’t fully initialize required attributes, this can crash at runtime or silently run the wrong processor. Fix: move self.result_processor = ... to after all mode-specific initialization, and add an explicit else: raise ValueError(f"Unsupported mode: {mode}") plus assertions that required fields (parallel_executor/queue_pool/etc.) are non-None for the selected mode.

✅ Action Checklist

  • [ ] CRITICAL [src/sentry/remote_subscriptions/consumers/queue_consumer.py:L1] — OffsetTracker uses per-partition locks but the implementation is truncated in the diff; given the new concurrency/commit coordination, any missing/incorrect lock release or commit condition can deadlock or cause incorrect commits. Fix: ensure OffsetTracker methods (add_offset/get_committable_offsets/mark_committed) are fully implemented with correct lock scoping and that the worker always calls mark_committed in a finally block for every processed offset (including error paths).
  • [ ] CRITICAL [src/sentry/remote_subscriptions/consumers/result_consumer.py:L105] — ResultsStrategyFactory adds thread_queue_parallel/queue_pool and sets self.result_processor = self.result_processor_cls() before mode branching; if result_processor_cls() depends on mode-specific fields (e.g., queue_pool) or if the mode branch doesn’t fully initialize required attributes, this can crash at runtime or silently run the wrong processor. Fix: move self.result_processor = ... to after all mode-specific initialization, and add an explicit else: raise ValueError(f"Unsupported mode: {mode}") plus assertions that required fields (parallel_executor/queue_pool/etc.) are non-None for the selected mode.
  • [ ] SUGGESTION — Verify mode dispatch completeness: src/sentry/consumers/__init__.py adds --mode thread-queue-parallel, but the diff only shows remote-subscriptions ResultsStrategyFactory changes. Add/confirm an explicit branch in the uptime results strategy factory/selection logic so --mode thread-queue-parallel cannot fall back to serial/parallel. Example: in the uptime factory where it maps mode -> strategy, add case "thread-queue-parallel": return ResultsStrategyFactory(mode=mode, ...) and add a test that selects the CLI mode and asserts the queue-based strategy is used.
  • [ ] SUGGESTION — Add an end-to-end integration test for commit semantics: unit tests for OffsetTracker/FixedQueuePool are not sufficient. Create a test that runs the full queue-based processing strategy with a fake/embedded Kafka (or a deterministic stub of the arroyo consumer loop) and asserts committed offsets equal the tracker’s committable offsets under concurrency and out-of-order completion.
  • [ ] SUGGESTION — Harden failure/retry behavior: in src/sentry/remote_subscriptions/consumers/queue_consumer.py, ensure that when processing fails for an offset, the system either (a) does not mark it committed and eventually retries, or (b) marks it in a way that preserves ordering guarantees without blocking later offsets forever. Concretely: add a test where one offset raises and verify subsequent offsets still progress and commits are correct.

Suggestions

  • Verify mode dispatch completeness: src/sentry/consumers/__init__.py adds --mode thread-queue-parallel, but the diff only shows remote-subscriptions ResultsStrategyFactory changes. Add/confirm an explicit branch in the uptime results strategy factory/selection logic so --mode thread-queue-parallel cannot fall back to serial/parallel. Example: in the uptime factory where it maps mode -> strategy, add case "thread-queue-parallel": return ResultsStrategyFactory(mode=mode, ...) and add a test that selects the CLI mode and asserts the queue-based strategy is used.
  • Add an end-to-end integration test for commit semantics: unit tests for OffsetTracker/FixedQueuePool are not sufficient. Create a test that runs the full queue-based processing strategy with a fake/embedded Kafka (or a deterministic stub of the arroyo consumer loop) and asserts committed offsets equal the tracker’s committable offsets under concurrency and out-of-order completion.
  • Harden failure/retry behavior: in src/sentry/remote_subscriptions/consumers/queue_consumer.py, ensure that when processing fails for an offset, the system either (a) does not mark it committed and eventually retries, or (b) marks it in a way that preserves ordering guarantees without blocking later offsets forever. Concretely: add a test where one offset raises and verify subsequent offsets still progress and commits are correct.

Posted by re-entry.ai · Risk governance for autonomous engineering teams

@re-entry-ai re-entry-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛡️ re-entry.ai Code Review

@CodingKylo — Please review the following assessment:

🔴 Risk Score: 68/100 · HIGH

Dimension Level
Likelihood 🔴 Critical
Impact 🟠 High
Detectability 🟡 Medium

Intent

Add a new uptime results processing mode that uses a queue to coordinate parallelism while preserving ordered Kafka offset commits.

Summary

The change introduces a new CLI mode thread-queue-parallel and wires new queue-based processing components into the remote subscriptions results strategy. The core risk is correctness around Kafka offset tracking/commit semantics under concurrency (including the offset+1 commit behavior) and ensuring the new mode is fully dispatched everywhere it’s selectable. The diff also shows new unit tests for the offset tracker/queue pool, but no end-to-end integration test proving the production processing loop uses the tracker correctly. Please verify that every mode dispatch site (uptime + remote subscriptions) explicitly supports thread-queue-parallel, and that offset commit behavior is correct and cannot deadlock or skip commits on failures/retries.

🎯 Review Focus

The queue-based offset tracking + commit coordination under concurrency: confirm the full OffsetTracker implementation and that every processed offset (success or failure) reaches the correct mark_committed/retry path without deadlocking or skipping commits, and verify the new thread-queue-parallel mode is fully wired in all mode dispatch sites.

Key Findings

  • 🚨 [src/sentry/remote_subscriptions/consumers/queue_consumer.py:L1] CRITICAL: OffsetTracker uses per-partition locks but the implementation is truncated in the diff; given the new concurrency/commit coordination, any missing/incorrect lock release or commit condition can deadlock or cause incorrect commits. Fix: ensure OffsetTracker methods (add_offset/get_committable_offsets/mark_committed) are fully implemented with correct lock scoping and that the worker always calls mark_committed in a finally block for every processed offset (including error paths).
  • 🚨 [src/sentry/remote_subscriptions/consumers/result_consumer.py:L105] CRITICAL: ResultsStrategyFactory adds thread_queue_parallel/queue_pool and sets self.result_processor = self.result_processor_cls() before mode branching; if result_processor_cls() depends on mode-specific fields (e.g., queue_pool) or if the mode branch doesn’t fully initialize required attributes, this can crash at runtime or silently run the wrong processor. Fix: move self.result_processor = ... to after all mode-specific initialization, and add an explicit else: raise ValueError(f"Unsupported mode: {mode}") plus assertions that required fields (parallel_executor/queue_pool/etc.) are non-None for the selected mode.

✅ Action Checklist

  • [ ] CRITICAL [src/sentry/remote_subscriptions/consumers/queue_consumer.py:L1] — OffsetTracker uses per-partition locks but the implementation is truncated in the diff; given the new concurrency/commit coordination, any missing/incorrect lock release or commit condition can deadlock or cause incorrect commits. Fix: ensure OffsetTracker methods (add_offset/get_committable_offsets/mark_committed) are fully implemented with correct lock scoping and that the worker always calls mark_committed in a finally block for every processed offset (including error paths).
  • [ ] CRITICAL [src/sentry/remote_subscriptions/consumers/result_consumer.py:L105] — ResultsStrategyFactory adds thread_queue_parallel/queue_pool and sets self.result_processor = self.result_processor_cls() before mode branching; if result_processor_cls() depends on mode-specific fields (e.g., queue_pool) or if the mode branch doesn’t fully initialize required attributes, this can crash at runtime or silently run the wrong processor. Fix: move self.result_processor = ... to after all mode-specific initialization, and add an explicit else: raise ValueError(f"Unsupported mode: {mode}") plus assertions that required fields (parallel_executor/queue_pool/etc.) are non-None for the selected mode.
  • [ ] SUGGESTION — Verify mode dispatch completeness: src/sentry/consumers/__init__.py adds --mode thread-queue-parallel, but the diff only shows remote-subscriptions ResultsStrategyFactory changes. Add/confirm an explicit branch in the uptime results strategy factory/selection logic so --mode thread-queue-parallel cannot fall back to serial/parallel. Example: in the uptime factory where it maps mode -> strategy, add case "thread-queue-parallel": return ResultsStrategyFactory(mode=mode, ...) and add a test that selects the CLI mode and asserts the queue-based strategy is used.
  • [ ] SUGGESTION — Add an end-to-end integration test for commit semantics: unit tests for OffsetTracker/FixedQueuePool are not sufficient. Create a test that runs the full queue-based processing strategy with a fake/embedded Kafka (or a deterministic stub of the arroyo consumer loop) and asserts committed offsets equal the tracker’s committable offsets under concurrency and out-of-order completion.
  • [ ] SUGGESTION — Harden failure/retry behavior: in src/sentry/remote_subscriptions/consumers/queue_consumer.py, ensure that when processing fails for an offset, the system either (a) does not mark it committed and eventually retries, or (b) marks it in a way that preserves ordering guarantees without blocking later offsets forever. Concretely: add a test where one offset raises and verify subsequent offsets still progress and commits are correct.

Suggestions

  • Verify mode dispatch completeness: src/sentry/consumers/__init__.py adds --mode thread-queue-parallel, but the diff only shows remote-subscriptions ResultsStrategyFactory changes. Add/confirm an explicit branch in the uptime results strategy factory/selection logic so --mode thread-queue-parallel cannot fall back to serial/parallel. Example: in the uptime factory where it maps mode -> strategy, add case "thread-queue-parallel": return ResultsStrategyFactory(mode=mode, ...) and add a test that selects the CLI mode and asserts the queue-based strategy is used.
  • Add an end-to-end integration test for commit semantics: unit tests for OffsetTracker/FixedQueuePool are not sufficient. Create a test that runs the full queue-based processing strategy with a fake/embedded Kafka (or a deterministic stub of the arroyo consumer loop) and asserts committed offsets equal the tracker’s committable offsets under concurrency and out-of-order completion.
  • Harden failure/retry behavior: in src/sentry/remote_subscriptions/consumers/queue_consumer.py, ensure that when processing fails for an offset, the system either (a) does not mark it committed and eventually retries, or (b) marks it in a way that preserves ordering guarantees without blocking later offsets forever. Concretely: add a test where one offset raises and verify subsequent offsets still progress and commits are correct.

Posted by re-entry.ai · Risk governance for autonomous engineering teams

@re-entry-ai re-entry-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛡️ re-entry.ai Code Review

@CodingKylo — Please review the following assessment:

🔴 Risk Score: 73/100 · HIGH

Dimension Level
Likelihood 🔴 Critical
Impact 🔴 Critical
Detectability 🟠 High

Intent

Add a new uptime results processing mode that uses a thread+queue pool to coordinate parallelism while preserving ordered Kafka offset commits.

Summary

Behaviorally, this PR introduces a new CLI mode thread-queue-parallel and wires in a new queue-based processing implementation with custom offset tracking/commit semantics (including committing offset + 1). The highest risk is correctness and shutdown safety: the new queue consumer appears to call a non-existent shutdown() method on queue.Queue, which can crash shutdown paths, and the offset-commit tracker’s committable-offset computation may be CPU-expensive under large offset gaps. The reviewer must verify end-to-end mode dispatch (CLI -> uptime consumer -> strategy factory), confirm the offset+1 contract matches Kafka/arroyo expectations under concurrency, and ensure the new tests reliably validate commit ordering without timing flakiness.

🎯 Review Focus

Confirm the new thread-queue-parallel mode’s offset-commit contract and dispatch correctness under concurrency (including the offset + 1 semantics) and ensure the new worker shutdown path cannot crash (the queue.Queue.shutdown() issue).

✅ Action Checklist

  • SUGGESTION — Verify mode dispatch end-to-end: src/sentry/consumers/__init__.py adds thread-queue-parallel to the CLI choice, but you must confirm the uptime consumer path actually instantiates the queue-based strategy (not a different factory). Concretely, trace from CLI option parsing -> uptime consumer -> UptimeResultsStrategyFactory/ResultsStrategyFactory selection, and add a unit test that asserts the selected strategy class when --mode thread-queue-parallel is provided.
  • SUGGESTION — Strengthen offset-commit correctness tests to avoid timing flakiness: the new uptime tests appear to use polling/sleeps and assert ordering/call counts. Replace sleep-based waiting with deterministic synchronization (barriers/events) around offset completion and commit calls, and add assertions that the committed offsets match the intended offset + 1 contract for each partition under concurrent completion.
  • SUGGESTION — Add an explicit contract test for the offset + 1 behavior: since queue_consumer commits offset + 1 (per the risk evidence), add a test that simulates out-of-order completion across offsets and asserts the exact committed offsets sequence per partition, including the boundary case where the last processed offset completes.

Suggestions

  • Verify mode dispatch end-to-end: src/sentry/consumers/__init__.py adds thread-queue-parallel to the CLI choice, but you must confirm the uptime consumer path actually instantiates the queue-based strategy (not a different factory). Concretely, trace from CLI option parsing -> uptime consumer -> UptimeResultsStrategyFactory/ResultsStrategyFactory selection, and add a unit test that asserts the selected strategy class when --mode thread-queue-parallel is provided.
  • Strengthen offset-commit correctness tests to avoid timing flakiness: the new uptime tests appear to use polling/sleeps and assert ordering/call counts. Replace sleep-based waiting with deterministic synchronization (barriers/events) around offset completion and commit calls, and add assertions that the committed offsets match the intended offset + 1 contract for each partition under concurrent completion.
  • Add an explicit contract test for the offset + 1 behavior: since queue_consumer commits offset + 1 (per the risk evidence), add a test that simulates out-of-order completion across offsets and asserts the exact committed offsets sequence per partition, including the boundary case where the last processed offset completes.

📝 This review includes 2 inline comments (1 critical, 1 warning)


Posted by re-entry.ai · Risk governance for autonomous engineering teams

- Each queue has exactly one worker thread
- Items within a queue are processed in FIFO order
- No dynamic reassignment that could break ordering
- Tracks offset completion for safe commits

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 CRITICAL

Issue: FixedQueuePool.shutdown() calls q.shutdown(immediate=False) on queue.Queue. The standard library queue.Queue does not have a shutdown() method; this will raise AttributeError and prevent graceful shutdown.

Fix: Either use a queue implementation that supports shutdown (and type it accordingly), or implement shutdown by pushing sentinel items and/or using get(timeout=...) as suggested above. For example:

for _ in self.workers:
    work_queue.put(None)  # and have worker exit on sentinel

(or switch to queue.SimpleQueue/custom queue with shutdown support).

"""

def __init__(self) -> None:
self.all_offsets: dict[Partition, set[int]] = defaultdict(set)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue: OffsetTracker.get_committable_offsets() uses a per-partition loop over range(start, max_offset + 1) and checks membership in sets for every offset in the contiguous window. Under load (large gaps or high offsets), this can become O(N) per commit tick and cause CPU spikes.

Fix: Track contiguous completion more efficiently (e.g., maintain a min-heap or a sorted structure of completed offsets and advance a pointer), or cap the scan length per tick. For example, add a max scan limit:

MAX_OFFSETS_TO_SCAN = 1000
...
for i, offset in enumerate(range(start, max_offset + 1)):
    if i >= MAX_OFFSETS_TO_SCAN:
        break
    ...

and/or redesign to avoid scanning across large offset ranges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants