Skip to content

feat(uptime): Add ability to use queues to manage parallelism#7

Open
akshayutture-augment wants to merge 1 commit into
kafka-consumer-parallel-beforefrom
kafka-consumer-parallel-after
Open

feat(uptime): Add ability to use queues to manage parallelism#7
akshayutture-augment wants to merge 1 commit into
kafka-consumer-parallel-beforefrom
kafka-consumer-parallel-after

Conversation

@akshayutture-augment

@akshayutture-augment akshayutture-augment commented Nov 17, 2025

Copy link
Copy Markdown

No description provided.

One potential problem we have with batch processing is that any one slow
item will clog up the whole batch. This pr implements a queueing method
instead, where we keep N queues that each have their own workers.
There's still a chance of individual items backlogging a queue, but we
can try increased concurrency here to reduce the chances of that
happening

<!-- Describe your PR here. -->

@augmentcode augmentcode Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review completed. 3 suggestions posted.

Comment augment review to trigger a new review at any time.

try:
work_item = self.work_queue.get()
except queue.ShutDown:
break

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

queue.Queue does not define a ShutDown exception; catching queue.ShutDown will never trigger and can mask real shutdown conditions (also applies to the later except queue.ShutDown).

🤖 Was this useful? React with 👍 or 👎


for q in self.queues:
try:
q.shutdown(immediate=False)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

queue.Queue has no shutdown method; calling q.shutdown(...) will raise and threads blocked on get() won’t be woken, risking a hang during shutdown.

🤖 Was this useful? React with 👍 or 👎

"Unexpected error in queue worker", extra={"worker_id": self.worker_id}
)
finally:
self.offset_tracker.complete_offset(work_item.partition, work_item.offset)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Offsets are marked complete in the finally block even when processing raises, which violates the stated guarantee of committing only after successful processing and can advance commits past failed items.

🤖 Was this useful? React with 👍 or 👎

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants