diff --git a/crates/agent/src/containerd/container.rs b/crates/agent/src/containerd/container.rs index 667718f2d3..1067cbb555 100644 --- a/crates/agent/src/containerd/container.rs +++ b/crates/agent/src/containerd/container.rs @@ -186,6 +186,28 @@ impl Containers { Ok(Containers { containers }) } + /// Keep only the latest attempt for each container name. + /// + /// `crictl ps -a` returns historical attempts; this function only keeps latest attempt. + pub fn filter_by_latest_attempt(self) -> Self { + let mut latest_by_name: HashMap = HashMap::new(); + + for container in self.containers { + let name = container.metadata.name.clone(); + if let Some(existing) = latest_by_name.get(&name) + && container.metadata.attempt <= existing.metadata.attempt + { + continue; + } + latest_by_name.insert(name, container); + } + + let mut containers: Vec<_> = latest_by_name.into_values().collect(); + containers.sort_by(|a, b| a.metadata.name.cmp(&b.metadata.name)); + + Containers { containers } + } + pub fn find_by_name(self, name: T) -> eyre::Result where T: AsRef, @@ -345,4 +367,78 @@ mod tests { let filtered = container_images.find_by_name("doca_hbn").unwrap(); assert_eq!(filtered.names[0].version(), "2.3.0-doca2.8.0".to_string()); } + + #[test] + fn test_filter_container_by_latest_attempt() { + let old_exited = ContainerSummary { + id: "old".to_string(), + sandbox_id: "pod1".to_string(), + metadata: ContainerMetadata { + name: "svc-a".to_string(), + attempt: 0, + }, + image: ContainerImage { + id: "img".to_string(), + annotations: HashMap::new(), + }, + image_ref: Vec::new(), + state: ContainerState::Exited, + created_at: "1".to_string(), + labels: HashMap::new(), + annotations: HashMap::new(), + }; + + let new_running = ContainerSummary { + id: "new".to_string(), + sandbox_id: "pod1".to_string(), + metadata: ContainerMetadata { + name: "svc-a".to_string(), + attempt: 1, + }, + image: ContainerImage { + id: "img".to_string(), + annotations: HashMap::new(), + }, + image_ref: Vec::new(), + state: ContainerState::Running, + created_at: "2".to_string(), + labels: HashMap::new(), + annotations: HashMap::new(), + }; + + let unaffected = ContainerSummary { + id: "other".to_string(), + sandbox_id: "pod1".to_string(), + metadata: ContainerMetadata { + name: "svc-b".to_string(), + attempt: 0, + }, + image: ContainerImage { + id: "img2".to_string(), + annotations: HashMap::new(), + }, + image_ref: Vec::new(), + state: ContainerState::Running, + created_at: "3".to_string(), + labels: HashMap::new(), + annotations: HashMap::new(), + }; + + let got = Containers { + containers: vec![old_exited, new_running, unaffected], + } + .filter_by_latest_attempt(); + + assert_eq!(got.containers.len(), 2); + assert!( + got.containers + .iter() + .any(|c| c.metadata.name == "svc-a" && c.metadata.attempt == 1) + ); + assert!( + got.containers + .iter() + .any(|c| c.metadata.name == "svc-b" && c.metadata.attempt == 0) + ); + } } diff --git a/crates/agent/src/extension_services/k8s_pod_handler.rs b/crates/agent/src/extension_services/k8s_pod_handler.rs index e7353eb601..973bb46937 100644 --- a/crates/agent/src/extension_services/k8s_pod_handler.rs +++ b/crates/agent/src/extension_services/k8s_pod_handler.rs @@ -376,6 +376,7 @@ impl KubernetesPodServicesHandler { /// /// Inputs: /// - `statuses`: normalized container states (e.g. "RUNNING", "EXITED", "CREATED", "UNKNOWN"). + /// - `has_exited_with_error`: true when any EXITED container has non-zero exit code. /// - `expected_deploy`: whether we expect the service to be up (deployed) now. /// /// Rules: @@ -383,8 +384,8 @@ impl KubernetesPodServicesHandler { /// - expected -> PENDING /// - not expected -> TERMINATED /// - When expected to be deployed: - /// - all RUNNING -> RUNNING - /// - any EXITED -> ERROR + /// - all containers RUNNING or clean EXITED -> RUNNING + /// - any EXITED with non-zero exit code -> ERROR /// - any CREATED or UNKNOWN -> PENDING /// - otherwise -> PENDING /// - When NOT expected to be deployed (we expect it to be gone): @@ -395,6 +396,7 @@ impl KubernetesPodServicesHandler { &self, pod_status: &str, statuses: &[String], + has_exited_with_error: bool, expected_deploy: bool, ) -> rpc::DpuExtensionServiceDeploymentStatus { if statuses.is_empty() { @@ -407,21 +409,20 @@ impl KubernetesPodServicesHandler { }; } - let all_running = statuses.iter().all(|s| s == "RUNNING"); let all_exited = statuses.iter().all(|s| s == "EXITED"); + let all_running_or_exited = statuses.iter().all(|s| s == "RUNNING" || s == "EXITED"); let any_running = statuses.iter().any(|s| s == "RUNNING"); - let any_exited = statuses.iter().any(|s| s == "EXITED"); let any_created = statuses.iter().any(|s| s == "CREATED"); let any_unknown = statuses.iter().any(|s| s == "UNKNOWN"); if expected_deploy { - if all_running { - return rpc::DpuExtensionServiceDeploymentStatus::DpuExtensionServiceRunning; - } - if any_exited { + if has_exited_with_error { return rpc::DpuExtensionServiceDeploymentStatus::DpuExtensionServiceError; } + if all_running_or_exited { + return rpc::DpuExtensionServiceDeploymentStatus::DpuExtensionServiceRunning; + } if any_created { return rpc::DpuExtensionServiceDeploymentStatus::DpuExtensionServicePending; } @@ -444,6 +445,54 @@ impl KubernetesPodServicesHandler { } } + /// Inspect a container and return its exit code. + async fn get_container_exit_code(&self, container_id: &str) -> eyre::Result { + // Fast path: ask crictl for only the exit code to avoid huge inspect JSON. + let output = TokioCommand::new("crictl") + .args([ + "inspect", + "--output", + "go-template", + "--template", + "{{.status.exitCode}}", + container_id, + ]) + .output() + .await + .wrap_err("Failed to inspect container with go-template")?; + + if output.status.success() { + let stdout = String::from_utf8(output.stdout) + .wrap_err("Failed to parse go-template inspect output")?; + if let Ok(exit_code) = stdout.trim().parse::() { + return Ok(exit_code); + } + } else { + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::debug!( + "go-template inspect failed for container {}: {}", + container_id, + stderr + ); + } + + // Fallback for environments where go-template is unavailable or output format differs. + let container = Self::crictl_output(&["inspect", container_id]) + .await + .wrap_err("Failed to inspect container")?; + + container + .get("status") + .and_then(|s| s.get("exitCode")) + .and_then(|v| v.as_i64()) + .ok_or_else(|| { + eyre::eyre!( + "Container inspect output missing numeric status.exitCode for container {}", + container_id + ) + }) + } + /// Inspect the pod sandbox status from the crictl output async fn get_pod_sandbox_status(&self, pod_id: &str) -> Result { let pod = Self::crictl_output(&["inspectp", pod_id]) @@ -462,6 +511,29 @@ impl KubernetesPodServicesHandler { Ok(self.parse_pod_state(pod_state).to_string()) } + /// Build the error message for a service status based on pod/container states + fn build_service_error_message( + &self, + pod_state: &str, + containers_with_issues: &[String], + service_error: Option<&str>, + ) -> String { + let mut parts = vec![format!("pod state: {}", pod_state)]; + + if !containers_with_issues.is_empty() { + parts.push(format!( + "containers with issues: {}", + containers_with_issues.join(", ") + )); + } + + if let Some(service_error) = service_error { + parts.push(format!("deployment error: {}", service_error)); + } + + parts.join("; ") + } + /// Determine the overall status of the pod from crictl async fn get_pod_status( &self, @@ -469,9 +541,6 @@ impl KubernetesPodServicesHandler { ) -> Result { let expected_deploy = service.removed.is_none(); - let _pod_spec_path = - self.get_pod_spec_path(&service.id.to_string(), service.version.version_nr()); - // Find the pod ID for the service using the label we injected into the pod spec let pod_id = match self.find_pod_id(service).await { Ok(Some(pod_id)) => pod_id, @@ -530,15 +599,43 @@ impl KubernetesPodServicesHandler { let images = container::Images::list().await.ok(); // Get the containers for the pod - let container_list = container::Containers::list_pod(&pod_id).await?; - let containers = container_list.containers; + let containers = container::Containers::list_pod(&pod_id) + .await? + .filter_by_latest_attempt() + .containers; let mut components = Vec::with_capacity(containers.len()); let mut container_statuses = Vec::with_capacity(containers.len()); + + // For displaying the error message and status aggregation + let mut has_exited_with_error = false; + let mut containers_with_issues = Vec::new(); + for container in containers { let container_name = container.metadata.name; let container_state = self.parse_state(container.state).to_string(); container_statuses.push(container_state.to_string()); + if container_state == "EXITED" { + match self.get_container_exit_code(&container.id).await { + Ok(exit_code) if exit_code != 0 => { + has_exited_with_error = true; + containers_with_issues.push(format!( + "{} (state: EXITED, exit code: {})", + container_name, exit_code + )); + } + Ok(_) => {} + Err(e) => { + has_exited_with_error = true; + containers_with_issues.push(format!( + "{} (state: EXITED, exit code unknown: {})", + container_name, e + )); + } + } + } else if container_state == "UNKNOWN" { + containers_with_issues.push(format!("{} (state: UNKNOWN)", container_name)); + } let image_id = container.image.id; let (image_url, image_version) = match images.as_ref() { @@ -568,15 +665,20 @@ impl KubernetesPodServicesHandler { } // Aggregate overall state - let state_enum = self.aggregate_status(&pod_state, &container_statuses, expected_deploy); + let state_enum = self.aggregate_status( + &pod_state, + &container_statuses, + has_exited_with_error, + expected_deploy, + ); - let err_message = match self + // Build the error message + let service_error = self .service_errors .get(&(service.id.to_string(), service.version.version_nr())) - { - Some(e) => e.to_string(), - None => format!("pod state: {}", pod_state), - }; + .map(|s| s.as_str()); + let err_message = + self.build_service_error_message(&pod_state, &containers_with_issues, service_error); Ok(rpc::DpuExtensionServiceStatusObservation { service_id: service.id.to_string(), @@ -1426,7 +1528,7 @@ spec: let handler = KubernetesPodServicesHandler::default(); let statuses = vec!["RUNNING".to_string(), "RUNNING".to_string()]; - let status = handler.aggregate_status("SANDBOX_READY", &statuses, true); + let status = handler.aggregate_status("SANDBOX_READY", &statuses, false, true); assert_eq!( status, rpc::DpuExtensionServiceDeploymentStatus::DpuExtensionServiceRunning @@ -1439,20 +1541,32 @@ spec: let statuses: Vec = vec![]; // Expected to be deployed: should be PENDING - let status = handler.aggregate_status("SANDBOX_READY", &statuses, true); + let status = handler.aggregate_status("SANDBOX_READY", &statuses, false, true); assert_eq!( status, rpc::DpuExtensionServiceDeploymentStatus::DpuExtensionServicePending ); // Not expected to be deployed: should be TERMINATED - let status = handler.aggregate_status("SANDBOX_READY", &statuses, false); + let status = handler.aggregate_status("SANDBOX_READY", &statuses, false, false); assert_eq!( status, rpc::DpuExtensionServiceDeploymentStatus::DpuExtensionServiceTerminated ); } + #[test] + fn test_k8s_pod_handler_aggregate_status_exited_zero_is_running() { + let handler = KubernetesPodServicesHandler::default(); + let statuses = vec!["RUNNING".to_string(), "EXITED".to_string()]; + + let status = handler.aggregate_status("SANDBOX_READY", &statuses, false, true); + assert_eq!( + status, + rpc::DpuExtensionServiceDeploymentStatus::DpuExtensionServiceRunning + ); + } + #[test] fn test_observability_config() { let configs = DpuExtensionServiceObservability {