Fix recursion limit check and parallel execution bugs #13
Conversation
- In `src/runner.rs`, check for the END node before the recursion limit to allow finishing on the last iteration.
- In `src/orchestration/parallel.rs`, implement the `FailFast` and `WaitN` strategies correctly using `futures::future::select_all` to wait for tasks as they complete instead of sequentially.

Co-authored-by: stevei101 <287896+stevei101@users.noreply.github.com>
👋 Jules, reporting for duty! I'm here to lend a hand with this pull request. When you start a review, I'll add a 👀 emoji to each comment to let you know I've read it. I'll focus on feedback directed at me and will do my best to stay out of conversations between you and other bots or reviewers to keep the noise down. I'll push a commit with your requested changes shortly after. Please note there might be a delay between these steps, but rest assured I'm on the job! For more direct control, you can switch me to Reactive Mode; when this mode is on, I will only act on comments where you specifically mention me. New to Jules? Learn more at jules.google/docs. For security, I will only act on instructions from the user who triggered this task.
Pull request overview
This PR addresses two distinct bugs in the oxidizedgraph execution engine: a recursion limit check ordering issue and inefficient parallel execution strategies.
Changes:
- Reordered the END node check to occur before the iteration limit check in `GraphRunner`, preventing false `RecursionLimit` errors when graphs complete exactly at the max iteration boundary
- Refactored the `FailFast` and `WaitN` join strategies in `ParallelSubgraphs` to use `futures::future::select_all` instead of sequential awaits, enabling proper concurrent task processing based on completion order
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| src/runner.rs | Moved END node check before iteration limit check to fix edge case where graphs finishing exactly at max_iterations would incorrectly fail |
| src/orchestration/parallel.rs | Replaced sequential await loops with select_all-based concurrent processing for FailFast and WaitN strategies to handle task completion order correctly |
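For background, a minimal standalone sketch (illustrative only, not code from this PR; it assumes the `tokio` and `futures` crates) of the property the refactor relies on: `futures::future::select_all` resolves with whichever future finishes first, independent of the order in which the futures were created.

```rust
use std::time::Duration;

use futures::future::select_all;

#[tokio::main]
async fn main() {
    // Futures created in the order slow, fast, medium. select_all polls them
    // all concurrently and resolves with whichever finishes first.
    let delays = vec![("slow", 300u64), ("fast", 10), ("medium", 100)];

    let mut futures: Vec<_> = delays
        .into_iter()
        .map(|(name, ms)| {
            Box::pin(async move {
                tokio::time::sleep(Duration::from_millis(ms)).await;
                name
            })
        })
        .collect();

    while !futures.is_empty() {
        // Resolves in completion order: fast, medium, slow.
        let (name, _index, remaining) = select_all(futures).await;
        println!("completed: {name}");
        futures = remaining;
    }
}
```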
```rust
// Check for END node
if current_node == transitions::END {
    info!(iterations = iterations, "Graph execution completed");
    let guard = state
        .read()
        .map_err(|e| RuntimeError::InvalidState(e.to_string()))?;
    return Ok(guard.clone());
}
```
This fix correctly addresses the recursion limit bug for the basic GraphRunner, but the same bug exists in two other runner implementations that were not fixed:
- `src/checkpoint/runner.rs` (lines 193-227)
- `src/events/runner.rs` (lines 185-236)
Both of these files still check the iteration limit BEFORE checking for the END node, meaning they will incorrectly fail with a RecursionLimit error when a graph finishes exactly on the max_iterations limit. The same reordering fix should be applied to those files to ensure consistency across all runner implementations.
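A self-contained sketch of the ordering being asked for; the names here (`END`, `RunError::RecursionLimit`, the linear step list) are simplified stand-ins, not the actual checkpoint/events runner code:

```rust
// Simplified stand-in for a runner loop; not the real checkpoint/events runner.
const END: &str = "__end__";

#[derive(Debug)]
enum RunError {
    RecursionLimit(usize),
}

fn run(steps: &[&str], max_iterations: usize) -> Result<usize, RunError> {
    let mut iterations = 0;
    let mut idx = 0;
    loop {
        let current = steps.get(idx).copied().unwrap_or(END);

        // Check for END first, so a graph that reaches END exactly on the
        // final allowed iteration still completes successfully.
        if current == END {
            return Ok(iterations);
        }
        // Only then enforce the recursion limit.
        if iterations >= max_iterations {
            return Err(RunError::RecursionLimit(iterations));
        }

        // "Execute" the current node and advance.
        idx += 1;
        iterations += 1;
    }
}

fn main() {
    // Five nodes, then END, with max_iterations = 5: with END checked first
    // this succeeds; with the limit checked first it would report
    // RecursionLimit even though the graph was about to finish.
    let steps = ["a", "b", "c", "d", "e"];
    assert_eq!(run(&steps, 5).unwrap(), 5);
    assert!(matches!(run(&steps, 4), Err(RunError::RecursionLimit(4))));
}
```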
```rust
// Check for END node
if current_node == transitions::END {
    info!(iterations = iterations, "Graph execution completed");
    let guard = state
        .read()
        .map_err(|e| RuntimeError::InvalidState(e.to_string()))?;
    return Ok(guard.clone());
}
```
While the existing test_max_iterations verifies that exceeding the limit causes an error, there should be a test case that specifically validates the bug fix: a graph that finishes exactly at the max_iterations limit should succeed, not fail with a RecursionLimit error. This would ensure the END check correctly happens before the iteration limit check.
Consider adding a test like:
- Set max_iterations to 5
- Create a graph that executes exactly 5 iterations then reaches END
- Verify it completes successfully without a RecursionLimit error
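A hedged sketch of such a test. The builder and runner APIs used here (`GraphBuilder`, `add_node`, `add_edge`, `GraphRunner::new`, `with_max_iterations`, `initial_state`) are assumptions for illustration and will likely need adjusting to the real oxidizedgraph API; only `transitions::END` and the `RecursionLimit` error are taken from the diff above.

```rust
// Hypothetical test outline: every type and method name below except
// transitions::END and RuntimeError::RecursionLimit is assumed, not taken
// from the actual oxidizedgraph API.
#[tokio::test]
async fn completes_exactly_at_max_iterations() {
    // Linear graph of 5 nodes: n1 -> n2 -> n3 -> n4 -> n5 -> END.
    let mut builder = GraphBuilder::new();
    for i in 1..=5 {
        // Each node is a pass-through that returns the state unchanged.
        builder.add_node(format!("n{i}"), |state| async move { Ok(state) });
    }
    for i in 1..5 {
        builder.add_edge(format!("n{i}"), format!("n{}", i + 1));
    }
    builder.add_edge("n5", transitions::END);

    let graph = builder.build().expect("valid graph");

    // The limit equals the number of node executions needed to reach END.
    let runner = GraphRunner::new(graph).with_max_iterations(5);
    let result = runner.run(initial_state()).await;

    // With the END check ordered before the limit check, this succeeds
    // instead of returning RuntimeError::RecursionLimit.
    assert!(result.is_ok());
}
```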
```diff
 JoinStrategy::FailFast => {
     if handles.is_empty() {
         return Vec::new();
     }

+    let mut futures: Vec<_> = handles
+        .into_iter()
+        .map(|(id, h)| {
+            let id_clone = id.clone();
+            Box::pin(async move {
+                let result = h.await.unwrap_or_else(|e| SubgraphResult::Failed {
+                    subgraph_id: id_clone.clone(),
+                    error: crate::error::RuntimeError::InvalidState(format!(
+                        "Task panicked: {}",
+                        e
+                    )),
+                });
+                (id_clone, result)
+            })
+        })
+        .collect();
+
     let mut results = Vec::new();
-    for (id, handle) in handles {
-        let result = handle.await.unwrap_or_else(|e| SubgraphResult::Failed {
-            subgraph_id: id.clone(),
-            error: crate::error::RuntimeError::InvalidState(format!(
-                "Task panicked: {}",
-                e
-            )),
-        });
+    while !futures.is_empty() {
+        let ((id, result), _, remaining) = futures::future::select_all(futures).await;
+        futures = remaining;

         let is_failed = result.is_failed();
         results.push((id, result));

         if is_failed {
-            break;
+            return results;
         }
     }
     results
```
The FailFast strategy has been significantly refactored to use select_all for proper concurrent processing, but there are no tests validating this behavior. Tests should verify:
- That FailFast returns immediately when any subgraph fails (not waiting for all to complete)
- That results are collected in completion order, not spawn order
- That the first failure triggers an early return
This is important to ensure the bug fix actually resolves the issue described in the PR where sequential awaits defeated the purpose of fail-fast.
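A self-contained sketch of the first assertion, written against plain `tokio` tasks rather than the real `ParallelSubgraphs` API (which is not shown in this diff), just to illustrate the timing property a real test would check:

```rust
use std::time::{Duration, Instant};

use futures::future::select_all;

#[tokio::test]
async fn fail_fast_returns_on_first_failure() {
    // One task fails quickly; the other would take much longer to succeed.
    let handles = vec![
        ("slow_ok", tokio::spawn(async {
            tokio::time::sleep(Duration::from_secs(5)).await;
            Ok::<(), String>(())
        })),
        ("fast_err", tokio::spawn(async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            Err::<(), String>("boom".to_string())
        })),
    ];

    let mut futures: Vec<_> = handles
        .into_iter()
        .map(|(id, h)| Box::pin(async move { (id, h.await.unwrap()) }))
        .collect();

    let start = Instant::now();
    let mut results = Vec::new();
    while !futures.is_empty() {
        let ((id, result), _, remaining) = select_all(futures).await;
        futures = remaining;

        let failed = result.is_err();
        results.push((id, result));
        if failed {
            break; // fail fast: stop waiting for the slow task
        }
    }

    // The failure arrives first (completion order, not spawn order),
    // and the loop exits without waiting the full five seconds.
    assert_eq!(results.len(), 1);
    assert_eq!(results[0].0, "fast_err");
    assert!(start.elapsed() < Duration::from_secs(1));
}
```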
```diff
 JoinStrategy::WaitN(n) => {
     if handles.is_empty() {
         return Vec::new();
     }

+    let mut futures: Vec<_> = handles
+        .into_iter()
+        .map(|(id, h)| {
+            let id_clone = id.clone();
+            Box::pin(async move {
+                let result = h.await.unwrap_or_else(|e| SubgraphResult::Failed {
+                    subgraph_id: id_clone.clone(),
+                    error: crate::error::RuntimeError::InvalidState(format!(
+                        "Task panicked: {}",
+                        e
+                    )),
+                });
+                (id_clone, result)
+            })
+        })
+        .collect();
+
     let mut results = Vec::new();
-    let mut completed = 0;
-    for (id, handle) in handles {
-        if completed >= *n {
-            break;
-        }
-        let result = handle.await.unwrap_or_else(|e| SubgraphResult::Failed {
-            subgraph_id: id.clone(),
-            error: crate::error::RuntimeError::InvalidState(format!(
-                "Task panicked: {}",
-                e
-            )),
-        });
+    let mut completed_count = 0;
+
+    while !futures.is_empty() {
+        let ((id, result), _, remaining) = futures::future::select_all(futures).await;
+        futures = remaining;

         if result.is_completed() {
-            completed += 1;
+            completed_count += 1;
         }
         results.push((id, result));

+        if completed_count >= *n {
+            return results;
+        }
     }
     results
 }
```
The WaitN strategy has been significantly refactored to use select_all for proper concurrent processing, but there are no tests validating this behavior. Tests should verify:
- That WaitN returns after exactly N successful completions (not waiting for all)
- That results are collected in completion order, not spawn order
- That failed subgraphs don't count toward the N completed threshold
- Edge cases like N greater than the number of subgraphs
This is important to ensure the bug fix resolves the issue where sequential awaits would wait for tasks in spawn order rather than completion order.
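Similarly, a self-contained sketch of the WaitN property against plain futures rather than the real `ParallelSubgraphs` API; a real test would exercise the library's execute path instead, and the names and timings here are illustrative only.

```rust
use std::time::Duration;

use futures::future::select_all;

#[tokio::test]
async fn wait_n_stops_after_n_successes() {
    let n = 2;

    // Completion order by delay: fast_err (a failure), then ok_1, then ok_2;
    // the 5-second task should never be waited on once N successes arrive.
    let specs = vec![
        ("fast_err", 10u64, false),
        ("ok_1", 30, true),
        ("ok_2", 60, true),
        ("slow_ok", 5_000, true),
    ];

    let mut futures: Vec<_> = specs
        .into_iter()
        .map(|(id, ms, succeeds)| {
            Box::pin(async move {
                tokio::time::sleep(Duration::from_millis(ms)).await;
                (id, succeeds)
            })
        })
        .collect();

    let mut results = Vec::new();
    let mut completed = 0;
    while !futures.is_empty() {
        let ((id, succeeded), _, remaining) = select_all(futures).await;
        futures = remaining;

        if succeeded {
            completed += 1; // only successful completions count toward N
        }
        results.push((id, succeeded));

        if completed >= n {
            break;
        }
    }

    // Stopped after the 2nd success: the early failure was recorded but did
    // not count toward N, and the 5-second task was never awaited.
    assert_eq!(completed, 2);
    assert_eq!(results.len(), 3); // fast_err, ok_1, ok_2
    assert_eq!(results.last().unwrap().0, "ok_2");
}
```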
This PR fixes two bugs:
- In `GraphRunner`, the recursion limit was checked before the check for `END`. This caused graphs that finished exactly on the limit to fail with a `RecursionLimit` error. The fix moves the `END` check to the beginning of the loop.
- The `FailFast` and `WaitN` strategies in `ParallelSubgraphs` were implemented using sequential awaits, meaning they would wait for tasks in the order they were added, defeating the purpose of "fail fast" or "wait for first N". The fix uses `select_all` to process task completions as they happen.

PR created automatically by Jules for task 6220343659135641513 started by @stevei101