feat(spider-execution-manager): Add liveness actor with session ID tracker.#328
feat(spider-execution-manager): Add liveness actor with session ID tracker.#328LinZhihao-723 wants to merge 11 commits into
Conversation
WalkthroughThis pull request introduces a complete execution manager subsystem that spawns task executor subprocesses, manages their lifecycle through a process pool, maintains liveness via periodic heartbeats, and coordinates task dispatch using a wire-protocol IPC layer. It includes the executor binary implementation, client abstractions for external service coordination, comprehensive integration testing, and foundational session tracking. ChangesExecution Manager Feature
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
components/spider-execution-manager/src/process_pool.rs (1)
163-173: ⚡ Quick winSerialize the request before taking the executor mutex.
build_request()only does local encoding, but it currently runs after Line 163 inside the same mutex scope that guards the child process. Moving it ahead of the lock keeps large input serializations and local encoding failures from extending head-of-line blocking on the single executor.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@components/spider-execution-manager/src/process_pool.rs` around lines 163 - 173, Call build_request(request) before acquiring the executor mutex so local serialization/encoding work does not hold the child-process lock; specifically, move the build_request(request)? invocation out of the critical section that surrounds self.handle.lock().await and handle.run(...).await, so you compute frame_request (via build_request) first, then acquire the mutex (self.handle.lock().await), get handle (handle_guard.as_mut().ok_or(InternalError::NotRunning)?), log and call handle.run(frame_request, hard_timeout).await.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@components/spider-execution-manager/src/process_pool.rs`:
- Around line 278-325: The current run function (async fn run(&mut self,
request: Request, hard_timeout: Duration) -> Outcome) only starts the
tokio::select! timeout after awaiting self.requests.send(...).await, so a
blocked send can prevent the hard_timeout from ever firing; change run to cover
the full send+receive window by moving the timeout to wrap both send and
response handling (e.g., use tokio::time::timeout(hard_timeout, async {
self.requests.send(Bytes::from(bytes)).await?; self.responses.next().await }) or
include the send future in the same tokio::select! alongside responses and the
sleep), ensuring the send call (self.requests.send) is protected by hard_timeout
and still returns Outcome::Timeout on expiration.
In `@components/spider-task-executor/src/bin/spider_task_executor.rs`:
- Around line 73-77: The package identifier is used directly to build a
filesystem path in the else branch (the block that calls manager.get(package)
and manager.load(&path)), allowing path traversal; before joining
pkg_dir.join(package) validate/sanitize `package` (e.g., reject empty strings,
any path separators like '/' or '\\', any ".." components, and allow only a safe
whitelist such as [A-Za-z0-9_-]); if validation fails return an error instead of
constructing the path; apply this check where you construct `path` and before
calling `manager.load` so only safe package names are used.
---
Nitpick comments:
In `@components/spider-execution-manager/src/process_pool.rs`:
- Around line 163-173: Call build_request(request) before acquiring the executor
mutex so local serialization/encoding work does not hold the child-process lock;
specifically, move the build_request(request)? invocation out of the critical
section that surrounds self.handle.lock().await and handle.run(...).await, so
you compute frame_request (via build_request) first, then acquire the mutex
(self.handle.lock().await), get handle
(handle_guard.as_mut().ok_or(InternalError::NotRunning)?), log and call
handle.run(frame_request, hard_timeout).await.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: fed459f0-c414-490d-9e84-a0fc3c793720
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (27)
Cargo.tomlcomponents/spider-core/Cargo.tomlcomponents/spider-core/src/lib.rscomponents/spider-core/src/session.rscomponents/spider-execution-manager/Cargo.tomlcomponents/spider-execution-manager/src/client.rscomponents/spider-execution-manager/src/client/liveness.rscomponents/spider-execution-manager/src/client/scheduler.rscomponents/spider-execution-manager/src/client/storage.rscomponents/spider-execution-manager/src/lib.rscomponents/spider-execution-manager/src/liveness.rscomponents/spider-execution-manager/src/process_pool.rscomponents/spider-task-executor/Cargo.tomlcomponents/spider-task-executor/src/bin/spider_task_executor.rscomponents/spider-task-executor/src/error.rscomponents/spider-task-executor/src/lib.rscomponents/spider-task-executor/src/manager.rscomponents/spider-task-executor/src/protocol.rstaskfiles/test.yamltests/huntsman/integration-test-tasks/Cargo.tomltests/huntsman/integration-test-tasks/src/lib.rstests/huntsman/task-executor/Cargo.tomltests/huntsman/task-executor/src/lib.rstests/huntsman/task-executor/tests/overhead_instrument.rstests/huntsman/task-executor/tests/test_executor.rstests/huntsman/task-executor/tests/test_process_pool.rstests/huntsman/tdl-integration/tests/complex.rs
| async fn run(&mut self, request: Request, hard_timeout: Duration) -> Outcome { | ||
| let bytes = bincode::serialize(&request).expect("bincode encode Request"); | ||
| if let Err(err) = self.requests.send(Bytes::from(bytes)).await { | ||
| tracing::warn!( | ||
| executor_id = self.executor_id, | ||
| err = ? err, | ||
| "Failed to send request to executor." | ||
| ); | ||
| return Outcome::ExecutorCrash { | ||
| exit_status: self.poll_exit_code(), | ||
| }; | ||
| } | ||
|
|
||
| tokio::select! { | ||
| biased; | ||
| frame = self.responses.next() => match frame { | ||
| Some(Ok(bytes)) => match bincode::deserialize::<Response>(&bytes) { | ||
| Ok(Response::Result { outcome, elapsed_us }) => match outcome { | ||
| ExecutorOutcome::Success { outputs } => { | ||
| Outcome::Success { outputs, elapsed_us } | ||
| } | ||
| ExecutorOutcome::Failure { error } => { | ||
| Outcome::InTaskFailure { error, elapsed_us } | ||
| } | ||
| }, | ||
| Err(err) => { | ||
| tracing::error!( | ||
| executor_id = self.executor_id, | ||
| err = ? err, | ||
| "Failed to decode executor's response. Considered as crashed." | ||
| ); | ||
| Outcome::ExecutorCrash { exit_status: self.poll_exit_code() } | ||
| } | ||
| }, | ||
| Some(Err(err)) => { | ||
| tracing::error!( | ||
| executor_id = self.executor_id, | ||
| err = ? err, | ||
| "Failed to receive executor's response." | ||
| ); | ||
| Outcome::ExecutorCrash { exit_status: self.poll_exit_code() } | ||
| } | ||
| None => Outcome::ExecutorCrash { exit_status: self.poll_exit_code() }, | ||
| }, | ||
| () = tokio::time::sleep(hard_timeout) => { | ||
| tracing::warn!(executor_id = self.executor_id, "Executor time out triggered."); | ||
| Outcome::Timeout { hard_timeout } | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Locate the run() implementation and inspect the exact stdin write / pipe handling around the referenced lines.
FILE="components/spider-execution-manager/src/process_pool.rs"
# Show a window around the snippet lines
sed -n '240,360p' "$FILE" | cat -n
# Find where stdin (or any pipe writer) is written in run()
rg -n "stdin|write_all|AsyncWriteExt|pipe|requests\.send|Bytes::from" "$FILE"
# Print the send-related code with some surrounding context (where possible)
rg -n "self\.requests\.send" "$FILE" && \
sed -n '260,330p' "$FILE" | cat -n
# Also locate poll_exit_code and respawn/timeout path logic for context
rg -n "poll_exit_code|respawn|Timeout|ExecutorCrash" "$FILE"Repository: y-scope/spider
Length of output: 11483
Cover the stdin write with hard_timeout
In ExecutorHandle::run, the hard_timeout tokio::select! doesn’t start until after self.requests.send(...).await completes. If the child stops reading stdin (or the framed writer backpressures), that send can block indefinitely, preventing the timeout (and subsequent crash/respawn handling) from triggering. Move the timeout to cover the full send+receive exchange (e.g., tokio::time::timeout(hard_timeout, async { send; recv }) or include the send future in the same select!).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@components/spider-execution-manager/src/process_pool.rs` around lines 278 -
325, The current run function (async fn run(&mut self, request: Request,
hard_timeout: Duration) -> Outcome) only starts the tokio::select! timeout after
awaiting self.requests.send(...).await, so a blocked send can prevent the
hard_timeout from ever firing; change run to cover the full send+receive window
by moving the timeout to wrap both send and response handling (e.g., use
tokio::time::timeout(hard_timeout, async {
self.requests.send(Bytes::from(bytes)).await?; self.responses.next().await }) or
include the send future in the same tokio::select! alongside responses and the
sleep), ensuring the send call (self.requests.send) is protected by hard_timeout
and still returns Outcome::Timeout on expiration.
| let pkg = if let Some(pkg) = manager.get(package) { | ||
| pkg | ||
| } else { | ||
| let path = pkg_dir.join(package).join(format!("lib{package}.so")); | ||
| manager.load(&path)? |
There was a problem hiding this comment.
Validate package identifiers before filesystem path construction
Line 76 builds a library path from package without validating components. A crafted value (for example containing separators or ..) can escape the intended package directory and load an unintended shared object.
Proposed fix
use std::{
- path::{Path, PathBuf},
+ path::{Component, Path, PathBuf},
time::Instant,
};
@@
fn run_task(
@@
) -> Result<Vec<u8>, ExecutorError> {
let pkg = if let Some(pkg) = manager.get(package) {
pkg
} else {
+ let mut components = Path::new(package).components();
+ let valid_package = matches!(components.next(), Some(Component::Normal(_)))
+ && components.next().is_none();
+ if !valid_package {
+ return Err(ExecutorError::InvalidLibrary(format!(
+ "invalid package identifier: {package}"
+ )));
+ }
let path = pkg_dir.join(package).join(format!("lib{package}.so"));
manager.load(&path)?
};
pkg.execute_task(task_func, raw_ctx, raw_inputs)
}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@components/spider-task-executor/src/bin/spider_task_executor.rs` around lines
73 - 77, The package identifier is used directly to build a filesystem path in
the else branch (the block that calls manager.get(package) and
manager.load(&path)), allowing path traversal; before joining
pkg_dir.join(package) validate/sanitize `package` (e.g., reject empty strings,
any path separators like '/' or '\\', any ".." components, and allow only a safe
whitelist such as [A-Za-z0-9_-]); if validation fails return an error instead of
constructing the path; apply this check where you construct `path` and before
calling `manager.load` so only safe package names are used.
Description
This PR depends on #327.
This PR adds the liveness actor and the shared session-tracker primitive that the rest of the execution-manager runtime will plug into.
spider_core::session::SessionTracker— a forward-only counter wrappingArc<AtomicU64>for the runtime's view of storage's current session id. Cloneable, withcurrent()/try_advance()semantics: writers always move the stored value forward via a CAS loop, and reads observe the latest committed value. Lives inspider-coreso the future scheduler service can reuse the same primitive.spider_execution_manager::liveness— a tokio-actor driving the periodic storage heartbeat:tokio::time::intervalticks everyheartbeat_interval; each tick callsLivenessClient::heartbeat(em_id)and forwards storage's reply to the sharedSessionTracker. The interval usesMissedTickBehavior::Skipas a defensive guard against starvation-induced burst-replay.LivenessCommand::Refreshlets the rest of the runtime ask for an off-schedule heartbeat. The command does not advance the tracker directly — storage's heartbeat reply is the only source of truth for the current session id, so the actor always re-checks rather than trusting the caller's observation.interval.reset()runs at the end of every heartbeat call, so aRefresh-triggered heartbeat naturally rate-limits the next scheduled tick. Two consecutive heartbeats are never closer together thanheartbeat_interval.MarkedDead/IllegalId) cancel the actor'sCancellationToken, which the rest of the runtime will observe to tear everything down.Checklist
breaking change.
Validation performed
Summary by CodeRabbit
Release Notes
New Features
Tests