Add is_shutting_down flag for arroyo rebalance case #512
NicoHinderling wants to merge 1 commit into main
Conversation
```diff
  # Check if killed during rebalance
- pid = process.pid
- if pid is not None:
-     with registry_lock:
-         was_killed_by_rebalance = pid in factory._killed_during_rebalance
-         if was_killed_by_rebalance:
-             factory._killed_during_rebalance.discard(pid)
-
- if was_killed_by_rebalance:
-     # Wait for kill to complete, then don't commit offset
-     process.join(timeout=10)  # Give kill_active_processes time to finish
-     logger.warning(
-         "Process killed during rebalance, message will be reprocessed",
-         extra={"artifact_id": artifact_id},
-     )
-     raise TimeoutError("Subprocess killed during rebalance")
+ if factory._is_shutting_down:
+     logger.warning(
+         "Process killed during rebalance, message will be reprocessed",
+         extra={"artifact_id": artifact_id},
+     )
+     raise TimeoutError("Subprocess killed during rebalance")

  # Handle timeout (process still alive after full timeout)
  if process.is_alive():
```
Bug: Messages completing during rebalance are incorrectly marked as killed due to the `_is_shutting_down` check, causing reprocessing.
Severity: CRITICAL | Confidence: High
🔍 Detailed Analysis
The code incorrectly raises a `TimeoutError` when a subprocess completes successfully (or naturally times out) during a Kafka rebalance, due to checking the global `_is_shutting_down` flag. This conflates processes actively killed by `kill_active_processes()` with those that finish while shutdown is merely in progress. As a result, messages that have already been processed successfully are marked as 'killed during rebalance' and reprocessed, violating exactly-once processing semantics and potentially leading to duplicate message processing.
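To make the failure mode concrete, here is a condensed sketch of the sequence described above (the flag and exception names follow the diff; the surrounding result-handling function is hypothetical):

```python
def handle_completed_result(factory) -> None:
    # 1. The subprocess has already finished its artifact successfully.
    # 2. A rebalance begins, and the shutdown path sets factory._is_shutting_down = True.
    # 3. The result handler then reaches this check:
    if factory._is_shutting_down:
        # Even though the work completed, this raises, the offset is never
        # committed, and the already-processed message gets reprocessed later.
        raise TimeoutError("Subprocess killed during rebalance")
```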
💡 Suggested Fix
Reintroduce a mechanism, such as PID-based tracking, to explicitly differentiate between subprocesses that are actively killed during rebalance and those that complete naturally while _is_shutting_down is true. Only raise the 'killed during rebalance' TimeoutError if the process was indeed terminated by the shutdown sequence.
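A minimal sketch of that kind of differentiation, assuming the factory keeps the `_killed_during_rebalance` PID set and lock that the removed code relied on (the class name, `_active_processes` registry, and method names here are illustrative, not the actual launchpad implementation):

```python
import multiprocessing
import threading


class ProcessingFactory:
    """Illustrative factory that records which subprocess PIDs were killed on rebalance."""

    def __init__(self) -> None:
        self._registry_lock = threading.Lock()
        self._killed_during_rebalance: set[int] = set()
        self._active_processes: dict[int, multiprocessing.Process] = {}
        self._is_shutting_down = False

    def kill_active_processes(self) -> None:
        # Shutdown/rebalance path: record each PID *before* terminating it.
        with self._registry_lock:
            self._is_shutting_down = True
            for pid, process in self._active_processes.items():
                self._killed_during_rebalance.add(pid)
                process.terminate()

    def was_killed_by_rebalance(self, pid: int) -> bool:
        # Only PIDs recorded above count as "killed"; a subprocess that merely
        # finished while _is_shutting_down was True is still a normal success.
        with self._registry_lock:
            if pid in self._killed_during_rebalance:
                self._killed_during_rebalance.discard(pid)
                return True
            return False
```

The decision keys off the recorded PID set rather than the global flag, so a process that simply finishes while a rebalance is in flight is still treated as a success and its offset can be committed.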
🤖 Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not valid.
Location: src/launchpad/kafka.py#L104-L115
| "Process killed during rebalance, message will be reprocessed", | ||
| extra={"artifact_id": artifact_id}, | ||
| ) | ||
| raise TimeoutError("Subprocess killed during rebalance") |
we can just use the flag instead of checking the pid set now
Codecov Report
❌ Patch coverage is
Additional details and impacted files

```
@@            Coverage Diff             @@
##             main     #512      +/-   ##
==========================================
+ Coverage   80.91%   80.93%   +0.02%
==========================================
  Files         164      164
  Lines       14213    14209       -4
  Branches     1501     1500       -1
==========================================
  Hits        11500    11500
+ Misses       2144     2140       -4
  Partials      569      569
```

☔ View full report in Codecov by Sentry.
Going with #513 instead for now, but keeping this PR in our back pocket for the reasons cited in #513's summary.
After merging #504 we noticed that there were still cases of the poll interval timeout happening. Upon further investigation, we realized this is happening:
To validate this, I set `KAFKA_MAX_PENDING_FUTURES` to 0 and that killed the topic lag quickly on the US region yesterday. This works because it means that arroyo's `run_task_in_threads` won't have any future tasks pre-fetched. Instead of doing that though, we can have a flag set when shutdown is happening and then just skip any future tasks that attempt to get processed during the shutdown rebalance.
This way we can keep `KAFKA_MAX_PENDING_FUTURES` non-zero and still have the benefit of prefetching tasks (and potentially still run our consumer with a concurrency > 1 if we needed to in the future).
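A minimal sketch of that flag-based skip, assuming the flag lives on the strategy factory and is checked before a pre-fetched message actually starts its subprocess (the function and parameter names here are illustrative and mirror the diff above, not the actual consumer wiring):

```python
import logging

logger = logging.getLogger(__name__)


def process_prefetched_task(factory, artifact_id: str, run_subprocess):
    """Skip work that arroyo pre-fetched if a shutdown rebalance has started."""
    # The flag is set by the rebalance/shutdown callback before partitions are revoked.
    if factory._is_shutting_down:
        logger.warning(
            "Skipping pre-fetched task during shutdown rebalance, message will be reprocessed",
            extra={"artifact_id": artifact_id},
        )
        # Raising keeps the offset uncommitted, so the new partition owner
        # picks this message up again once the rebalance completes.
        raise TimeoutError("Subprocess killed during rebalance")

    return run_subprocess(artifact_id)
```

Checking the flag before the subprocess is launched, rather than after it has finished, is what keeps already-completed work from being discarded, which is the concern raised in the bot review above.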