Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions crates/agent/src/containerd/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,28 @@ impl Containers {
Ok(Containers { containers })
}

/// Collapse the list so that every container name appears exactly once,
/// represented by its highest-numbered attempt.
///
/// `crictl ps -a` also reports earlier attempts of a container; those are
/// dropped here. When two entries share both name and attempt number, the one
/// listed first is kept. The returned list is ordered by container name.
pub fn filter_by_latest_attempt(self) -> Self {
    let mut newest: HashMap<String, ContainerSummary> = HashMap::new();

    for candidate in self.containers {
        match newest.get_mut(&candidate.metadata.name) {
            // An equal or newer attempt is already recorded; discard this one.
            Some(current) if current.metadata.attempt >= candidate.metadata.attempt => {}
            // Strictly newer attempt: swap it in under the existing key.
            Some(current) => *current = candidate,
            // First time this name is seen.
            None => {
                newest.insert(candidate.metadata.name.clone(), candidate);
            }
        }
    }

    let mut containers = newest.into_values().collect::<Vec<_>>();
    containers.sort_by(|lhs, rhs| lhs.metadata.name.cmp(&rhs.metadata.name));

    Containers { containers }
}

pub fn find_by_name<T>(self, name: T) -> eyre::Result<ContainerSummary>
where
T: AsRef<str>,
Expand Down Expand Up @@ -345,4 +367,78 @@ mod tests {
let filtered = container_images.find_by_name("doca_hbn").unwrap();
assert_eq!(filtered.names[0].version(), "2.3.0-doca2.8.0".to_string());
}

/// Two attempts of `svc-a` plus a single attempt of `svc-b` must collapse to
/// the newest `svc-a` attempt and the untouched `svc-b`.
#[test]
fn test_filter_container_by_latest_attempt() {
    // All fixtures live in sandbox "pod1" and carry no image refs, labels or
    // annotations; only the fields that differ between them are parameters.
    let summary = |id: &str, name: &str, attempt, image_id: &str, state, created_at: &str| {
        ContainerSummary {
            id: id.to_string(),
            sandbox_id: "pod1".to_string(),
            metadata: ContainerMetadata {
                name: name.to_string(),
                attempt,
            },
            image: ContainerImage {
                id: image_id.to_string(),
                annotations: HashMap::new(),
            },
            image_ref: Vec::new(),
            state,
            created_at: created_at.to_string(),
            labels: HashMap::new(),
            annotations: HashMap::new(),
        }
    };

    let input = Containers {
        containers: vec![
            // Superseded attempt of svc-a.
            summary("old", "svc-a", 0, "img", ContainerState::Exited, "1"),
            // Latest attempt of svc-a.
            summary("new", "svc-a", 1, "img", ContainerState::Running, "2"),
            // Different container name; must pass through unchanged.
            summary("other", "svc-b", 0, "img2", ContainerState::Running, "3"),
        ],
    };

    let got = input.filter_by_latest_attempt();

    assert_eq!(got.containers.len(), 2);

    let has_entry = |name: &str, attempt| {
        got.containers
            .iter()
            .any(|c| c.metadata.name == name && c.metadata.attempt == attempt)
    };
    assert!(has_entry("svc-a", 1));
    assert!(has_entry("svc-b", 0));
}
}
158 changes: 136 additions & 22 deletions crates/agent/src/extension_services/k8s_pod_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,15 +376,16 @@ impl KubernetesPodServicesHandler {
///
/// Inputs:
/// - `statuses`: normalized container states (e.g. "RUNNING", "EXITED", "CREATED", "UNKNOWN").
/// - `has_exited_with_error`: true when any EXITED container has non-zero exit code.
/// - `expected_deploy`: whether we expect the service to be up (deployed) now.
///
/// Rules:
/// - If no containers:
/// - expected -> PENDING
/// - not expected -> TERMINATED
/// - When expected to be deployed:
/// - all RUNNING -> RUNNING
/// - any EXITED -> ERROR
/// - all containers RUNNING or clean EXITED -> RUNNING
/// - any EXITED with non-zero exit code -> ERROR
/// - any CREATED or UNKNOWN -> PENDING
/// - otherwise -> PENDING
/// - When NOT expected to be deployed (we expect it to be gone):
Expand All @@ -395,6 +396,7 @@ impl KubernetesPodServicesHandler {
&self,
pod_status: &str,
statuses: &[String],
has_exited_with_error: bool,
expected_deploy: bool,
) -> rpc::DpuExtensionServiceDeploymentStatus {
if statuses.is_empty() {
Expand All @@ -407,21 +409,20 @@ impl KubernetesPodServicesHandler {
};
}

let all_running = statuses.iter().all(|s| s == "RUNNING");
let all_exited = statuses.iter().all(|s| s == "EXITED");
let all_running_or_exited = statuses.iter().all(|s| s == "RUNNING" || s == "EXITED");

let any_running = statuses.iter().any(|s| s == "RUNNING");
let any_exited = statuses.iter().any(|s| s == "EXITED");
let any_created = statuses.iter().any(|s| s == "CREATED");
let any_unknown = statuses.iter().any(|s| s == "UNKNOWN");

if expected_deploy {
if all_running {
return rpc::DpuExtensionServiceDeploymentStatus::DpuExtensionServiceRunning;
}
if any_exited {
if has_exited_with_error {
return rpc::DpuExtensionServiceDeploymentStatus::DpuExtensionServiceError;
}
if all_running_or_exited {
return rpc::DpuExtensionServiceDeploymentStatus::DpuExtensionServiceRunning;
}
if any_created {
return rpc::DpuExtensionServiceDeploymentStatus::DpuExtensionServicePending;
}
Expand All @@ -444,6 +445,54 @@ impl KubernetesPodServicesHandler {
}
}

/// Inspect a container and return its exit code.
async fn get_container_exit_code(&self, container_id: &str) -> eyre::Result<i64> {
// Fast path: ask crictl for only the exit code to avoid huge inspect JSON.
let output = TokioCommand::new("crictl")
.args([
"inspect",
"--output",
"go-template",
"--template",
"{{.status.exitCode}}",
container_id,
])
.output()
.await
.wrap_err("Failed to inspect container with go-template")?;

if output.status.success() {
let stdout = String::from_utf8(output.stdout)
.wrap_err("Failed to parse go-template inspect output")?;
if let Ok(exit_code) = stdout.trim().parse::<i64>() {
return Ok(exit_code);
}
} else {
let stderr = String::from_utf8_lossy(&output.stderr);
tracing::debug!(
"go-template inspect failed for container {}: {}",
container_id,
stderr
);
}

// Fallback for environments where go-template is unavailable or output format differs.
let container = Self::crictl_output(&["inspect", container_id])
.await
.wrap_err("Failed to inspect container")?;

container
.get("status")
.and_then(|s| s.get("exitCode"))
.and_then(|v| v.as_i64())
.ok_or_else(|| {
eyre::eyre!(
"Container inspect output missing numeric status.exitCode for container {}",
container_id
)
})
}

/// Inspect the pod sandbox status from the crictl output
async fn get_pod_sandbox_status(&self, pod_id: &str) -> Result<String> {
let pod = Self::crictl_output(&["inspectp", pod_id])
Expand All @@ -462,16 +511,36 @@ impl KubernetesPodServicesHandler {
Ok(self.parse_pod_state(pod_state).to_string())
}

/// Compose the status message reported for a service.
///
/// The message always begins with the pod state. A comma-separated list of
/// problematic containers is appended when `containers_with_issues` is
/// non-empty, followed by the deployment error when `service_error` is set.
/// Segments are joined with `"; "`.
fn build_service_error_message(
    &self,
    pod_state: &str,
    containers_with_issues: &[String],
    service_error: Option<&str>,
) -> String {
    let pod_segment = format!("pod state: {}", pod_state);

    let issues_segment = (!containers_with_issues.is_empty()).then(|| {
        format!(
            "containers with issues: {}",
            containers_with_issues.join(", ")
        )
    });

    let deployment_segment = service_error.map(|err| format!("deployment error: {}", err));

    // Optional segments contribute zero or one item each, preserving order.
    std::iter::once(pod_segment)
        .chain(issues_segment)
        .chain(deployment_segment)
        .collect::<Vec<_>>()
        .join("; ")
}

/// Determine the overall status of the pod from crictl
async fn get_pod_status(
&self,
service: &ServiceConfig,
) -> Result<rpc::DpuExtensionServiceStatusObservation> {
let expected_deploy = service.removed.is_none();

let _pod_spec_path =
self.get_pod_spec_path(&service.id.to_string(), service.version.version_nr());

// Find the pod ID for the service using the label we injected into the pod spec
let pod_id = match self.find_pod_id(service).await {
Ok(Some(pod_id)) => pod_id,
Expand Down Expand Up @@ -530,15 +599,43 @@ impl KubernetesPodServicesHandler {
let images = container::Images::list().await.ok();

// Get the containers for the pod
let container_list = container::Containers::list_pod(&pod_id).await?;
let containers = container_list.containers;
let containers = container::Containers::list_pod(&pod_id)
.await?
.filter_by_latest_attempt()
.containers;
let mut components = Vec::with_capacity(containers.len());
let mut container_statuses = Vec::with_capacity(containers.len());

// For displaying the error message and status aggregation
let mut has_exited_with_error = false;
let mut containers_with_issues = Vec::new();

for container in containers {
let container_name = container.metadata.name;

let container_state = self.parse_state(container.state).to_string();
container_statuses.push(container_state.to_string());
if container_state == "EXITED" {
match self.get_container_exit_code(&container.id).await {
Ok(exit_code) if exit_code != 0 => {
has_exited_with_error = true;
containers_with_issues.push(format!(
"{} (state: EXITED, exit code: {})",
container_name, exit_code
));
}
Ok(_) => {}
Err(e) => {
has_exited_with_error = true;
containers_with_issues.push(format!(
"{} (state: EXITED, exit code unknown: {})",
container_name, e
));
}
}
} else if container_state == "UNKNOWN" {
containers_with_issues.push(format!("{} (state: UNKNOWN)", container_name));
}

let image_id = container.image.id;
let (image_url, image_version) = match images.as_ref() {
Expand Down Expand Up @@ -568,15 +665,20 @@ impl KubernetesPodServicesHandler {
}

// Aggregate overall state
let state_enum = self.aggregate_status(&pod_state, &container_statuses, expected_deploy);
let state_enum = self.aggregate_status(
&pod_state,
&container_statuses,
has_exited_with_error,
expected_deploy,
);

let err_message = match self
// Build the error message
let service_error = self
.service_errors
.get(&(service.id.to_string(), service.version.version_nr()))
{
Some(e) => e.to_string(),
None => format!("pod state: {}", pod_state),
};
.map(|s| s.as_str());
let err_message =
self.build_service_error_message(&pod_state, &containers_with_issues, service_error);

Ok(rpc::DpuExtensionServiceStatusObservation {
service_id: service.id.to_string(),
Expand Down Expand Up @@ -1426,7 +1528,7 @@ spec:
let handler = KubernetesPodServicesHandler::default();
let statuses = vec!["RUNNING".to_string(), "RUNNING".to_string()];

let status = handler.aggregate_status("SANDBOX_READY", &statuses, true);
let status = handler.aggregate_status("SANDBOX_READY", &statuses, false, true);
assert_eq!(
status,
rpc::DpuExtensionServiceDeploymentStatus::DpuExtensionServiceRunning
Expand All @@ -1439,20 +1541,32 @@ spec:
let statuses: Vec<String> = vec![];

// Expected to be deployed: should be PENDING
let status = handler.aggregate_status("SANDBOX_READY", &statuses, true);
let status = handler.aggregate_status("SANDBOX_READY", &statuses, false, true);
assert_eq!(
status,
rpc::DpuExtensionServiceDeploymentStatus::DpuExtensionServicePending
);

// Not expected to be deployed: should be TERMINATED
let status = handler.aggregate_status("SANDBOX_READY", &statuses, false);
let status = handler.aggregate_status("SANDBOX_READY", &statuses, false, false);
assert_eq!(
status,
rpc::DpuExtensionServiceDeploymentStatus::DpuExtensionServiceTerminated
);
}

/// A pod expected to be deployed whose containers are RUNNING or EXITED, with
/// no failing exit reported, must aggregate to the Running status.
#[test]
fn test_k8s_pod_handler_aggregate_status_exited_zero_is_running() {
    let handler = KubernetesPodServicesHandler::default();
    let statuses: Vec<String> = ["RUNNING", "EXITED"]
        .iter()
        .map(|state| state.to_string())
        .collect();

    // has_exited_with_error = false, expected_deploy = true
    let status = handler.aggregate_status("SANDBOX_READY", &statuses, false, true);

    assert_eq!(
        status,
        rpc::DpuExtensionServiceDeploymentStatus::DpuExtensionServiceRunning
    );
}

#[test]
fn test_observability_config() {
let configs = DpuExtensionServiceObservability {
Expand Down
Loading