Skip to content

feat(uptime): Add ability to use queues to manage parallelism#9

Open
zaibkhan wants to merge 1 commit into
kafka-consumer-parallel-beforefrom
kafka-consumer-parallel-after
Open

feat(uptime): Add ability to use queues to manage parallelism#9
zaibkhan wants to merge 1 commit into
kafka-consumer-parallel-beforefrom
kafka-consumer-parallel-after

Conversation

@zaibkhan

@zaibkhan zaibkhan commented Sep 9, 2025

Copy link
Copy Markdown

Mirrors ai-code-review-evaluation#9 for like-for-like benchmarking.

  • Base: kafka-consumer-parallel-before
  • Head: kafka-consumer-parallel-after

Original PR excerpt:

Test 9

One potential problem we have with batch processing is that any one slow
item will clog up the whole batch. This pr implements a queueing method
instead, where we keep N queues that each have their own workers.
There's still a chance of individual items backlogging a queue, but we
can try increased concurrency here to reduce the chances of that
happening

<!-- Describe your PR here. -->
@codoki-pr-intelligence

codoki-pr-intelligence Bot commented Sep 9, 2025

Copy link
Copy Markdown

Codoki PR Review

Summary: Fix queue worker shutdown, prevent hangs
What’s good: Threaded queue design preserves per-group ordering and decouples commit logic; tests (including Kafka integration) provide strong validation of ordering and offset semantics.
Review Status: ❌ Requires changes
Overall Priority: High

Issues (Critical & High only)

Severity Issue Why it matters
High Correctness — Blocking get() prevents graceful shutdown; ShutDown is in... …/consumers/queue_consumer.py
The stdlib queue.Queue has no ShutDown exception and get() will block indefinitely; on shutdown, workers can hang and never reach the loop condition. Use a timeout (and optionally a sentinel) and handle queue.Empty to allow prompt exit.

Showing top 1 issues. Critical: 0, High: 1. See inline suggestions for more.

Key Feedback (click to expand)
  • Needs improvement: Worker shutdown relies on stdlib queue semantics that don't support shutdown signaling, risking blocked threads; replace non-existent ShutDown handling with timeout-based get or a sentinel.
  • Testing: Unit tests cover offset tracking, ordering, concurrent groups, and Kafka commit behavior, including gap handling; nice coverage. Consider a test that exercises shutdown timing to ensure workers exit promptly under empty queues.
  • Documentation: Consider adding a short docstring comment on SimpleQueueProcessingStrategy.commit loop semantics (offset +1 contract) to document the rationale and constraints.
  • Compatibility: None observed; changes are internal to consumer processing paths.
  • Performance: OffsetTracker.get_committable_offsets iterates from min to max processed offset and checks set membership; this is O(n) per partition. If partitions accumulate large sparse offset sets, consider tracking a moving 'next_expected' pointer to avoid range scans.
  • Open questions: Do we expect deterministic queue assignment across restarts? If so, consider a stable hash (e.g., mmh3) instead of Python’s salted hash.

Confidence: 3/5 — Needs work before merge (1 high · status: Requires changes)

Sequence Diagram

sequenceDiagram
    participant Factory
    participant Strategy
    participant QueuePool
    participant Worker
    Factory->>Factory: create_thread_queue_parallel_worker(commit)
    Factory-->>Strategy: SimpleQueueProcessingStrategy(decoder, grouping_fn, commit_fn)
    Strategy->>QueuePool: submit(group_key, WorkItem)
    QueuePool->>Worker: work_queue.put(WorkItem)
    Worker->>Worker: result_processor(identifier, work_item.result)
    Strategy->>Strategy: _commit_loop()
    Strategy-->>Factory: commit_offsets({partition: offset+1})
Loading

React with 👍 or 👎 if you found this review useful.

"""Process items from the queue in order."""
while not self.shutdown:
try:
work_item = self.work_queue.get()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ High: The stdlib queue.Queue has no ShutDown exception and get() will block indefinitely; on shutdown, workers can hang and never reach the loop condition. Use a timeout (and optionally a sentinel) and handle queue.Empty to allow prompt exit.

Suggested change
work_item = self.work_queue.get()
```suggestion
try:
work_item = self.work_queue.get(timeout=0.5)
if work_item is None:
break
except queue.Empty:
continue


for q in self.queues:
try:
q.shutdown(immediate=False)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔷 Medium: queue.Queue does not expose shutdown(); this raises AttributeError and logs an exception each shutdown. With the timeout-based get (or sentinel) in the worker, simply wake workers using a sentinel instead of calling a non-existent API.

Suggested change
q.shutdown(immediate=False)
```suggestion
q.put_nowait(None)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants