Skip to content

feat(uptime): Add ability to use queues to manage parallelism#2

Open
ShashankFC wants to merge 1 commit into
kafka-consumer-parallel-beforefrom
kafka-consumer-parallel-after
Open

feat(uptime): Add ability to use queues to manage parallelism#2
ShashankFC wants to merge 1 commit into
kafka-consumer-parallel-beforefrom
kafka-consumer-parallel-after

Conversation

@ShashankFC

@ShashankFC ShashankFC commented Feb 13, 2026

Copy link
Copy Markdown

User description

Test 9

Summary by CodeRabbit

  • New Features

    • Added "thread-queue-parallel" execution mode for uptime processing, expanding available parallelism options.
    • Introduced ordered queue-based processing system for remote subscriptions with improved offset tracking and safe commit management.
  • Improvements

    • Updated help text for maximum worker parameter to reflect broader parallelism concepts.

✏️ Tip: You can customize this high-level summary in your review settings.


Replicated from ai-code-review-evaluation/sentry-coderabbit#9


CodeAnt-AI Description

Add a thread-queue-parallel mode that processes results using ordered per-group queues

What Changed

  • Introduces a new "thread-queue-parallel" processing mode which routes results into a fixed number of queues so items for the same group (subscription) are processed in FIFO order while different groups are handled concurrently.
  • Ensures offsets are tracked per partition and only committed once contiguous offsets have been processed, preventing premature commits across gaps.
  • Adds a new CLI choice for the uptime consumer to select "thread-queue-parallel" and ties the mode to a configurable number of queues/workers; includes tests and integration checks (including Kafka commit verification).

Impact

✅ Fewer batch stalls when one item is slow
✅ Maintains per-subscription ordering
✅ Clearer, safer Kafka offset commits

💡 Usage Guide

Checking Your Pull Request

Every time you make a pull request, our system automatically looks through it. We check for security issues, mistakes in how you're setting up your infrastructure, and common code problems. We do this to make sure your changes are solid and won't cause any trouble later.

Talking to CodeAnt AI

Got a question or need a hand with something in your pull request? You can easily get in touch with CodeAnt AI right here. Just type the following in a comment on your pull request, and replace "Your question here" with whatever you want to ask:

@codeant-ai ask: Your question here

This lets you have a chat with CodeAnt AI about your pull request, making it easier to understand and improve your code.

Example

@codeant-ai ask: Can you suggest a safer alternative to storing this secret?

Preserve Org Learnings with CodeAnt

You can record team preferences so CodeAnt AI applies them in future reviews. Reply directly to the specific CodeAnt AI suggestion (in the same thread) and replace "Your feedback here" with your input:

@codeant-ai: Your feedback here

This helps CodeAnt AI learn and adapt to your team's coding style and standards.

Example

@codeant-ai: Do not flag unused imports.

Retrigger review

Ask CodeAnt AI to review the PR again, by typing:

@codeant-ai: review

Check Your Repository Health

To analyze the health of your code repository, visit our dashboard at https://app.codeant.ai. This tool helps you identify potential issues and areas for improvement in your codebase, ensuring your repository maintains high standards of code health.

One potential problem we have with batch processing is that any one slow
item will clog up the whole batch. This pr implements a queueing method
instead, where we keep N queues that each have their own workers.
There's still a chance of individual items backlogging a queue, but we
can try increased concurrency here to reduce the chances of that
happening

<!-- Describe your PR here. -->
@ShashankFC

Copy link
Copy Markdown
Author

@CodeAnt-AI: review

@codeant-ai

codeant-ai Bot commented Feb 13, 2026

Copy link
Copy Markdown

CodeAnt AI is running the review.

@codeant-ai codeant-ai Bot added the size:XXL This PR changes 1000+ lines, ignoring generated files label Feb 13, 2026
Comment on lines +131 to +240
work_item = self.work_queue.get()
except queue.ShutDown:
break

try:
with sentry_sdk.start_transaction(
op="queue_worker.process",
name=f"monitors.{self.identifier}.worker_{self.worker_id}",
):
self.result_processor(self.identifier, work_item.result)

except queue.ShutDown:
break
except Exception:
logger.exception(
"Unexpected error in queue worker", extra={"worker_id": self.worker_id}
)
finally:
self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
metrics.gauge(
"remote_subscriptions.queue_worker.queue_depth",
self.work_queue.qsize(),
tags={
"identifier": self.identifier,
},
)


class FixedQueuePool(Generic[T]):
"""
Fixed pool of queues that guarantees order within groups.

Key properties:
- Each group is consistently assigned to the same queue
- Each queue has exactly one worker thread
- Items within a queue are processed in FIFO order
- No dynamic reassignment that could break ordering
- Tracks offset completion for safe commits
"""

def __init__(
self,
result_processor: Callable[[str, T], None],
identifier: str,
num_queues: int = 20,
) -> None:
self.result_processor = result_processor
self.identifier = identifier
self.num_queues = num_queues
self.offset_tracker = OffsetTracker()
self.queues: list[queue.Queue[WorkItem[T]]] = []
self.workers: list[OrderedQueueWorker[T]] = []

for i in range(num_queues):
work_queue: queue.Queue[WorkItem[T]] = queue.Queue()
self.queues.append(work_queue)

worker = OrderedQueueWorker[T](
worker_id=i,
work_queue=work_queue,
result_processor=result_processor,
identifier=identifier,
offset_tracker=self.offset_tracker,
)
worker.start()
self.workers.append(worker)

def get_queue_for_group(self, group_key: str) -> int:
"""
Get queue index for a group using consistent hashing.
"""
return hash(group_key) % self.num_queues

def submit(self, group_key: str, work_item: WorkItem[T]) -> None:
"""
Submit a work item to the appropriate queue.
"""
queue_index = self.get_queue_for_group(group_key)
work_queue = self.queues[queue_index]

self.offset_tracker.add_offset(work_item.partition, work_item.offset)
work_queue.put(work_item)

def get_stats(self) -> dict[str, Any]:
"""Get statistics about queue depths."""
queue_depths = [q.qsize() for q in self.queues]
return {
"queue_depths": queue_depths,
"total_items": sum(queue_depths),
}

def wait_until_empty(self, timeout: float = 5.0) -> bool:
"""Wait until all queues are empty. Returns True if successful, False if timeout."""
start_time = time.time()
while time.time() - start_time < timeout:
if self.get_stats()["total_items"] == 0:
return True
time.sleep(0.01)
return False

def shutdown(self) -> None:
"""Gracefully shutdown all workers."""
for worker in self.workers:
worker.shutdown = True

for q in self.queues:
try:
q.shutdown(immediate=False)
except Exception:
logger.exception("Error shutting down queue")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: The worker pool tries to shut down using non-existent queue.Queue APIs (queue.ShutDown and q.shutdown), so worker threads remain blocked on Queue.get() and FixedQueuePool.shutdown() cannot reliably stop them; switching to a timeout-based get() and a sentinel value to unblock workers ensures clean, deterministic shutdown without relying on unsupported methods. [logic error]

Severity Level: Major ⚠️
- ⚠️ Remote_subscriptions result consumer shutdown logs queue shutdown exceptions.
- ⚠️ Thread-queue-parallel workers remain blocked on Queue.get during shutdown.
- ⚠️ Test teardown using FixedQueuePool.shutdown leaves daemon threads alive.
Suggested change
work_item = self.work_queue.get()
except queue.ShutDown:
break
try:
with sentry_sdk.start_transaction(
op="queue_worker.process",
name=f"monitors.{self.identifier}.worker_{self.worker_id}",
):
self.result_processor(self.identifier, work_item.result)
except queue.ShutDown:
break
except Exception:
logger.exception(
"Unexpected error in queue worker", extra={"worker_id": self.worker_id}
)
finally:
self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
metrics.gauge(
"remote_subscriptions.queue_worker.queue_depth",
self.work_queue.qsize(),
tags={
"identifier": self.identifier,
},
)
class FixedQueuePool(Generic[T]):
"""
Fixed pool of queues that guarantees order within groups.
Key properties:
- Each group is consistently assigned to the same queue
- Each queue has exactly one worker thread
- Items within a queue are processed in FIFO order
- No dynamic reassignment that could break ordering
- Tracks offset completion for safe commits
"""
def __init__(
self,
result_processor: Callable[[str, T], None],
identifier: str,
num_queues: int = 20,
) -> None:
self.result_processor = result_processor
self.identifier = identifier
self.num_queues = num_queues
self.offset_tracker = OffsetTracker()
self.queues: list[queue.Queue[WorkItem[T]]] = []
self.workers: list[OrderedQueueWorker[T]] = []
for i in range(num_queues):
work_queue: queue.Queue[WorkItem[T]] = queue.Queue()
self.queues.append(work_queue)
worker = OrderedQueueWorker[T](
worker_id=i,
work_queue=work_queue,
result_processor=result_processor,
identifier=identifier,
offset_tracker=self.offset_tracker,
)
worker.start()
self.workers.append(worker)
def get_queue_for_group(self, group_key: str) -> int:
"""
Get queue index for a group using consistent hashing.
"""
return hash(group_key) % self.num_queues
def submit(self, group_key: str, work_item: WorkItem[T]) -> None:
"""
Submit a work item to the appropriate queue.
"""
queue_index = self.get_queue_for_group(group_key)
work_queue = self.queues[queue_index]
self.offset_tracker.add_offset(work_item.partition, work_item.offset)
work_queue.put(work_item)
def get_stats(self) -> dict[str, Any]:
"""Get statistics about queue depths."""
queue_depths = [q.qsize() for q in self.queues]
return {
"queue_depths": queue_depths,
"total_items": sum(queue_depths),
}
def wait_until_empty(self, timeout: float = 5.0) -> bool:
"""Wait until all queues are empty. Returns True if successful, False if timeout."""
start_time = time.time()
while time.time() - start_time < timeout:
if self.get_stats()["total_items"] == 0:
return True
time.sleep(0.01)
return False
def shutdown(self) -> None:
"""Gracefully shutdown all workers."""
for worker in self.workers:
worker.shutdown = True
for q in self.queues:
try:
q.shutdown(immediate=False)
except Exception:
logger.exception("Error shutting down queue")
# Use a timeout so the worker can notice shutdown requests.
work_item = self.work_queue.get(timeout=0.1)
except queue.Empty:
continue
if work_item is None:
# Sentinel used to unblock the worker during shutdown.
break
try:
with sentry_sdk.start_transaction(
op="queue_worker.process",
name=f"monitors.{self.identifier}.worker_{self.worker_id}",
):
self.result_processor(self.identifier, work_item.result)
except Exception:
logger.exception(
"Unexpected error in queue worker", extra={"worker_id": self.worker_id}
)
finally:
self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
metrics.gauge(
"remote_subscriptions.queue_worker.queue_depth",
self.work_queue.qsize(),
tags={
"identifier": self.identifier,
},
)
class FixedQueuePool(Generic[T]):
"""
Fixed pool of queues that guarantees order within groups.
Key properties:
- Each group is consistently assigned to the same queue
- Each queue has exactly one worker thread
- Items within a queue are processed in FIFO order
- No dynamic reassignment that could break ordering
- Tracks offset completion for safe commits
"""
def __init__(
self,
result_processor: Callable[[str, T], None],
identifier: str,
num_queues: int = 20,
) -> None:
self.result_processor = result_processor
self.identifier = identifier
self.num_queues = num_queues
self.offset_tracker = OffsetTracker()
self.queues: list[queue.Queue[WorkItem[T]]] = []
self.workers: list[OrderedQueueWorker[T]] = []
for i in range(num_queues):
work_queue: queue.Queue[WorkItem[T]] = queue.Queue()
self.queues.append(work_queue)
worker = OrderedQueueWorker[T](
worker_id=i,
work_queue=work_queue,
result_processor=result_processor,
identifier=identifier,
offset_tracker=self.offset_tracker,
)
worker.start()
self.workers.append(worker)
def get_queue_for_group(self, group_key: str) -> int:
"""
Get queue index for a group using consistent hashing.
"""
return hash(group_key) % self.num_queues
def submit(self, group_key: str, work_item: WorkItem[T]) -> None:
"""
Submit a work item to the appropriate queue.
"""
queue_index = self.get_queue_for_group(group_key)
work_queue = self.queues[queue_index]
self.offset_tracker.add_offset(work_item.partition, work_item.offset)
work_queue.put(work_item)
def get_stats(self) -> dict[str, Any]:
"""Get statistics about queue depths."""
queue_depths = [q.qsize() for q in self.queues]
return {
"queue_depths": queue_depths,
"total_items": sum(queue_depths),
}
def wait_until_empty(self, timeout: float = 5.0) -> bool:
"""Wait until all queues are empty. Returns True if successful, False if timeout."""
start_time = time.time()
while time.time() - start_time < timeout:
if self.get_stats()["total_items"] == 0:
return True
time.sleep(0.01)
return False
def shutdown(self) -> None:
"""Gracefully shutdown all workers."""
# Signal workers to stop processing new items.
for worker in self.workers:
worker.shutdown = True
# Enqueue sentinel values to unblock any workers waiting on queue.get().
for _ in self.workers:
for q in self.queues:
q.put(None)
Steps of Reproduction ✅
1. Run the test suite that exercises the thread-queue-parallel pool, e.g.
`TestFixedQueuePool` in
`tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py:65-88`, which creates
a `FixedQueuePool` in `setUp()` (lines 80-84) and calls `self.pool.shutdown()` in
`tearDown()` (lines 86-87).

2. During `setUp()`, `FixedQueuePool.__init__` in
`src/sentry/remote_subscriptions/consumers/queue_consumer.py:171-196` appends a
`queue.Queue` instance to `self.queues` (line 185) and starts an `OrderedQueueWorker` for
each queue (lines 188-196); each worker enters `OrderedQueueWorker.run` and blocks on
`self.work_queue.get()` (lines 129-132).

3. When `tearDown()` invokes `self.pool.shutdown()`, `FixedQueuePool.shutdown` at
`queue_consumer.py:231-243` sets `worker.shutdown = True` for each worker (lines 233-234),
then iterates queues and calls `q.shutdown(immediate=False)` on each `queue.Queue`
instance (lines 236-238); since `queue.Queue` from the stdlib (imported at
`queue_consumer.py:4`) has no `shutdown` method, an `AttributeError` is raised for each
queue, caught by the broad `except Exception` (lines 237-240), and logged as `"Error
shutting down queue"`.

4. After logging the errors, `FixedQueuePool.shutdown` calls `worker.join(timeout=5.0)`
for each worker thread (lines 242-243), but all workers are still blocked inside
`Queue.get()` because no sentinel or timeout is used and `queue.ShutDown` (referenced in
`OrderedQueueWorker.run` at lines 132 and 142) is not a real exception type in the `queue`
module and is never raised; as a result the join returns only after the timeout while
worker threads continue running as daemons, so both the tests and any
`ResultsStrategyFactory.shutdown()` caller (see
`src/sentry/remote_subscriptions/consumers/result_consumer.py:180-185`) cannot reliably
stop the worker pool.
Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** src/sentry/remote_subscriptions/consumers/queue_consumer.py
**Line:** 131:240
**Comment:**
	*Logic Error: The worker pool tries to shut down using non-existent `queue.Queue` APIs (`queue.ShutDown` and `q.shutdown`), so worker threads remain blocked on `Queue.get()` and `FixedQueuePool.shutdown()` cannot reliably stop them; switching to a timeout-based `get()` and a sentinel value to unblock workers ensures clean, deterministic shutdown without relying on unsupported methods.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
👍 | 👎

self.pool.submit(group_key, work_item)

stats = self.pool.get_stats()
assert stats["total_items"] > 0

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: The assertion that the queue stats must report a strictly positive total_items immediately after submitting work is racy, since worker threads may drain the queues before the stats snapshot is taken, causing intermittent test failures even though the implementation is correct; it's safer to only assert the structure of the stats and that the queues are empty after processing completes. [race condition]

Severity Level: Major ⚠️
- ⚠️ Flaky stats test for FixedQueuePool on CI runs.
- ⚠️ Spurious failures in `test_queue_consumer.py` test suite.
Suggested change
assert stats["total_items"] > 0
Steps of Reproduction ✅
1. Run the test suite including `TestFixedQueuePool.test_stats_reporting` in
`tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py:175-206` (e.g. `pytest
tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py -k
test_stats_reporting`).

2. During `setUp` at `tests/.../test_queue_consumer.py:73-83`, a `FixedQueuePool` is
created with 3 worker threads
(`src/sentry/remote_subscriptions/consumers/queue_consumer.py:159-195`), each already
started and ready to consume items.

3. The test enqueues 10 `WorkItem`s via `self.pool.submit(...)` in the loop at
`tests/.../test_queue_consumer.py:182-197`, which internally calls `queue.Queue.put` and
allows the worker threads to start draining the queues immediately
(`queue_consumer.py:184-213`).

4. Immediately after the loop, the test calls `self.pool.get_stats()` and asserts
`stats["total_items"] > 0` at `tests/.../test_queue_consumer.py:199-200`; if the workers
have already processed all items such that each underlying `queue.Queue.qsize()` is zero
(`queue_consumer.py:214-220`), `total_items` is 0 and the assertion fails, even though the
implementation is correct and the rest of the test (which later asserts `total_items == 0`
after waiting) would pass. This produces an intermittent, timing‑dependent test failure.
Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py
**Line:** 200:200
**Comment:**
	*Race Condition: The assertion that the queue stats must report a strictly positive `total_items` immediately after submitting work is racy, since worker threads may drain the queues before the stats snapshot is taken, causing intermittent test failures even though the implementation is correct; it's safer to only assert the structure of the stats and that the queues are empty after processing completes.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
👍 | 👎

self.strategy.submit(self.create_message("sub1", 0, 101))

assert self.process_complete_event.wait(timeout=5.0), "Processing did not complete in time"
assert self.commit_event.wait(timeout=2.0), "Commit did not happen in time"

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: In the invalid-message handling test, relying on a single Event to signal that a commit has happened introduces a race where the assertion can read the intermediate committed offset (100) before the second commit (101) occurs, making the test flaky; polling the stored committed offset until it reaches the expected value avoids this timing race. [race condition]

Severity Level: Major ⚠️
- ⚠️ Flaky offset-commit test for SimpleQueueProcessingStrategy.
- ⚠️ Nondeterministic CI failures in `test_handles_invalid_messages`.
Suggested change
assert self.commit_event.wait(timeout=2.0), "Commit did not happen in time"
# Wait until the committed offset reaches the expected value, tolerating
# that there may be an intermediate commit for the invalid message.
for _ in range(20):
with self.process_lock:
if self.committed_offsets.get(partition) == 101:
break
# Wait for a potential new commit and then loop to re-check.
self.commit_event.wait(timeout=0.1)
self.commit_event.clear()
Steps of Reproduction ✅
1. Run `TestSimpleQueueProcessingStrategy.test_handles_invalid_messages` in
`tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py:327-349` (e.g. `pytest
tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py -k
test_handles_invalid_messages`).

2. `setUp` for `TestSimpleQueueProcessingStrategy` at
`tests/.../test_queue_consumer.py:210-250` creates a `FixedQueuePool` and a
`SimpleQueueProcessingStrategy`
(`src/sentry/remote_subscriptions/consumers/queue_consumer.py:246-345`) with a background
commit thread that wakes every second and calls
`queue_pool.offset_tracker.get_committable_offsets()` (`queue_consumer.py:273-289`), then
invokes the `commit_function` defined in the test at
`tests/.../test_queue_consumer.py:232-235`.

3. In the test body, an invalid message with `FilteredPayload` at offset 100 and a valid
message at offset 101 are submitted at `tests/.../test_queue_consumer.py:335-345`. For the
invalid message, `SimpleQueueProcessingStrategy.submit` adds and immediately completes the
offset (result is `None`) at `queue_consumer.py:301-304`; the next commit-loop iteration
can commit offset 100 and call `commit_function`, which sets
`self.committed_offsets[partition] = 100` and `self.commit_event.set()`
(`tests/.../test_queue_consumer.py:232-235`).

4. If this first commit happens before the valid message at offset 101 has been processed
and before the second commit-loop iteration, the test's
`self.process_complete_event.wait(...)` at `tests/.../test_queue_consumer.py:347` will
return once the valid message is handled, but `self.commit_event` is already set from the
first commit. The subsequent `self.commit_event.wait(timeout=2.0)` returns immediately,
and the final assertion `self.committed_offsets.get(partition) == 101` at line 349 can
read the stale value 100, causing an intermittent failure. The suggested polling loop
ensures the test only asserts once the committed offset has actually advanced to 101.
Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py
**Line:** 348:348
**Comment:**
	*Race Condition: In the invalid-message handling test, relying on a single `Event` to signal that a commit has happened introduces a race where the assertion can read the intermediate committed offset (100) before the second commit (101) occurs, making the test flaky; polling the stored committed offset until it reaches the expected value avoids this timing race.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
👍 | 👎

time.sleep(0.1)

assert mock_processor_call.call_count == 2
assert len(committed_offsets) == 0 or test_partition not in committed_offsets

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: The test_thread_queue_parallel_error_handling test currently asserts that no offsets are committed when a processing error occurs, but the queue worker marks offsets as completed even on exceptions and the commit loop will commit them, causing this test to fail and misrepresenting the documented intent that errors "don't block offset commits for other messages"; update the assertion to verify that some commit happens instead of asserting that none do. [logic error]

Severity Level: Major ⚠️
- ⚠️ Thread-queue-parallel error-handling test fails despite correct behavior.
- ⚠️ Misaligned assertion hides regressions in offset commit semantics.
- ⚠️ CI reliability for uptime Kafka consumer tests is reduced.
Suggested change
assert len(committed_offsets) == 0 or test_partition not in committed_offsets
# Even if one message fails, we should still eventually commit offsets for processed messages.
assert len(committed_offsets) > 0
Steps of Reproduction ✅
1. Run the test suite including `tests/sentry/uptime/consumers/test_results_consumer.py`
(e.g. `pytest
tests/sentry/uptime/consumers/test_results_consumer.py::ProcessResultSerialTest::test_thread_queue_parallel_error_handling`),
which exercises `ProcessResultSerialTest.test_thread_queue_parallel_error_handling` at
`tests/sentry/uptime/consumers/test_results_consumer.py:1874-1924`.

2. Inside this test, an `UptimeResultsStrategyFactory` is instantiated with
`mode="thread-queue-parallel"` and `max_workers=2` (factory class defined in
`src/sentry/uptime/consumers/results_consumer.py:600+`, inheriting from
`ResultsStrategyFactory` in
`src/sentry/remote_subscriptions/consumers/result_consumer.py:73+`), and
`factory.create_with_partitions(track_commits, {test_partition: 0})` is called, wiring
`track_commits` as the commit callback.

3. `ResultsStrategyFactory.create_with_partitions` at
`src/sentry/remote_subscriptions/consumers/result_consumer.py:200-212` detects
`self.thread_queue_parallel` and returns a `SimpleQueueProcessingStrategy` via
`create_thread_queue_parallel_worker`, which wraps the commit callback and uses a
`FixedQueuePool` from `src/sentry/remote_subscriptions/consumers/queue_consumer.py:159+`
to submit work items.

4. In the test's `with mock.patch.object(type(factory.result_processor), "__call__")`
block (`test_results_consumer.py:1891-1922`), two messages are submitted; the patched
`ResultProcessor.__call__` raises `Exception("Processing failed")` for the first, then
succeeds for the second, but `OrderedQueueWorker.run` in `queue_consumer.py:127-149`
always calls `offset_tracker.complete_offset(...)` in a `finally` block, and
`SimpleQueueProcessingStrategy._commit_loop` in `queue_consumer.py:273-291` periodically
calls the provided `commit_function`, which in this test is `track_commits` capturing
committed offsets; as a result `committed_offsets` becomes non-empty while the test still
asserts `len(committed_offsets) == 0 or test_partition not in committed_offsets`
(`test_results_consumer.py:1921-1922`), causing the assertion to fail and contradicting
the test's own docstring that errors "don't block offset commits for other messages."
Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** tests/sentry/uptime/consumers/test_results_consumer.py
**Line:** 1922:1922
**Comment:**
	*Logic Error: The `test_thread_queue_parallel_error_handling` test currently asserts that no offsets are committed when a processing error occurs, but the queue worker marks offsets as completed even on exceptions and the commit loop will commit them, causing this test to fail and misrepresenting the documented intent that errors "don't block offset commits for other messages"; update the assertion to verify that some commit happens instead of asserting that none do.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
👍 | 👎

@codeant-ai

codeant-ai Bot commented Feb 13, 2026

Copy link
Copy Markdown

CodeAnt AI finished running the review.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size:XXL This PR changes 1000+ lines, ignoring generated files

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants