diff --git a/Cargo.lock b/Cargo.lock index 04bce118bb..f5fe91053d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -521,6 +521,7 @@ dependencies = [ "tpm", "tracer", "vm-memory", + "vm-migration", "vmm", "vmm-sys-util", "wait-timeout", diff --git a/cloud-hypervisor/Cargo.toml b/cloud-hypervisor/Cargo.toml index 426a522635..6f58723fda 100644 --- a/cloud-hypervisor/Cargo.toml +++ b/cloud-hypervisor/Cargo.toml @@ -36,6 +36,7 @@ thiserror = { workspace = true } tpm = { path = "../tpm" } tracer = { path = "../tracer" } vm-memory = { workspace = true } +vm-migration = { path = "../vm-migration" } vmm = { path = "../vmm" } vmm-sys-util = { workspace = true } zbus = { version = "5.7.1", optional = true } diff --git a/cloud-hypervisor/src/bin/ch-remote.rs b/cloud-hypervisor/src/bin/ch-remote.rs index 42b6e05476..6af932536e 100644 --- a/cloud-hypervisor/src/bin/ch-remote.rs +++ b/cloud-hypervisor/src/bin/ch-remote.rs @@ -13,15 +13,18 @@ use std::num::NonZeroU32; use std::os::unix::net::UnixStream; use std::path::PathBuf; use std::process; +use std::thread::sleep; +use std::time::Duration; use api_client::{ - Error as ApiClientError, simple_api_command, simple_api_command_with_fds, - simple_api_full_command, + Error as ApiClientError, StatusCode, simple_api_command, simple_api_command_with_fds, + simple_api_full_command, simple_api_full_command_and_response, }; use clap::{Arg, ArgAction, ArgMatches, Command}; -use log::error; +use log::{error, info}; use option_parser::{ByteSized, ByteSizedParseError}; use thiserror::Error; +use vm_migration::progress::{MigrationProgress, MigrationState}; use vmm::config::RestoreConfig; use vmm::vm_config::{ DeviceConfig, DiskConfig, FsConfig, NetConfig, PmemConfig, UserDeviceConfig, VdpaConfig, @@ -303,6 +306,8 @@ fn rest_api_do_command(matches: &ArgMatches, socket: &mut UnixStream) -> ApiResu Some("shutdown") => { simple_api_command(socket, "PUT", "shutdown", None).map_err(Error::HttpApiClient) } + Some("migration-progress") => simple_api_command(socket, "GET", "migration-progress", None) + .map_err(Error::HttpApiClient), Some("nmi") => simple_api_command(socket, "PUT", "nmi", None).map_err(Error::HttpApiClient), Some("resize") => { let resize = resize_config( @@ -522,8 +527,65 @@ fn rest_api_do_command(matches: &ArgMatches, socket: &mut UnixStream) -> ApiResu .unwrap() .get_one::("tls-dir") .cloned(), + true, ); + simple_api_command(socket, "PUT", "send-migration", Some(&send_migration_data)) + .map_err(Error::HttpApiClient)?; + + // Wait for migration to finish + loop { + let response = simple_api_full_command_and_response( + socket, + "GET", + "vm.migration-progress", + None, + ) + .map_err(Error::HttpApiClient)? + // should have response + .ok_or(Error::HttpApiClient(ApiClientError::ServerResponse( + StatusCode::Ok, + None, + )))?; + + assert_ne!( + response, "null", + "migration progress should be there immediately when the migration was dispatched" + ); + + let progress = serde_json::from_slice::(response.as_bytes()) + .map_err(|e| { + error!("failed to parse response as MigrationProgress: {e}"); + Error::HttpApiClient(ApiClientError::ServerResponse( + StatusCode::Ok, + Some(response), + )) + })?; + + match progress.state { + MigrationState::Cancelled { .. } => { + info!("Migration was cancelled"); + break; + } + MigrationState::Failed { + error_msg, + error_msg_debug, + } => { + error!("Migration failed! {error_msg}\n{error_msg_debug}"); + break; + } + MigrationState::Finished { .. 
} => { + info!("Migration finished successfully"); + break; + } + MigrationState::Ongoing { .. } => { + sleep(Duration::from_millis(50)); + continue; + } + } + } + + simple_api_full_command(socket, "PUT", "vmm.shutdown", None) .map_err(Error::HttpApiClient) } Some("receive-migration") => { @@ -963,6 +1025,7 @@ fn send_migration_data( migration_timeout: u64, connections: NonZeroU32, tls_dir: Option, + keep_alive: bool, ) -> String { let send_migration_data = vmm::api::VmSendMigrationData { destination_url: url, @@ -971,6 +1034,7 @@ fn send_migration_data( migration_timeout, connections, tls_dir, + keep_alive, }; serde_json::to_string(&send_migration_data).unwrap() @@ -1072,6 +1136,7 @@ fn get_cli_commands_sorted() -> Box<[Command]> { .arg(Arg::new("path").index(1).default_value("-")), Command::new("delete").about("Delete a VM"), Command::new("info").about("Info on the VM"), + Command::new("migration-progress"), Command::new("nmi").about("Trigger NMI"), Command::new("pause").about("Pause the VM"), Command::new("ping").about("Ping the VMM to check for API server availability"), diff --git a/vm-migration/src/lib.rs b/vm-migration/src/lib.rs index 7ae78eaf24..832531af5a 100644 --- a/vm-migration/src/lib.rs +++ b/vm-migration/src/lib.rs @@ -9,10 +9,12 @@ use thiserror::Error; use crate::protocol::MemoryRangeTable; -mod bitpos_iterator; +pub mod progress; pub mod protocol; pub mod tls; +mod bitpos_iterator; + #[derive(Error, Debug)] pub enum MigratableError { #[error("Failed to pause migratable component")] diff --git a/vm-migration/src/progress.rs b/vm-migration/src/progress.rs new file mode 100644 index 0000000000..39895b09df --- /dev/null +++ b/vm-migration/src/progress.rs @@ -0,0 +1,424 @@ +// Copyright © 2025 Cyberus Technology GmbH +// +// SPDX-License-Identifier: Apache-2.0 + +//! Module for reporting of the live-migration progress. +//! +//! The main export is [`MigrationProgress`]. +//! +//! # Motivation +//! +//! Monitoring a live-migration is important for debugging of cloud deployments, +//! for cloud monitoring in general, and for network optimization, such as +//! verifying the throughput for the migration is as high as expected. +//! +//! It also helps to analyze the downtime of VMs and see how much pressure a +//! guest is putting on its memory (by writing), which is slowing down +//! migrations. + +use std::error::Error; +use std::fmt; +use std::fmt::Display; +use std::num::NonZeroU32; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +#[derive( + Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize, +)] +pub enum TransportationMode { + Local, + Tcp { connections: NonZeroU32, tls: bool }, +} + +/// Carries information about the transmission of the VM's memory. +#[derive( + Clone, + Copy, + Debug, + Default, + PartialOrd, + Ord, + PartialEq, + Eq, + Hash, + serde::Serialize, + serde::Deserialize, +)] +pub struct MemoryTransmissionInfo { + /// The memory iteration (only in precopy mode). + pub memory_iteration: u64, + /// Memory bytes per second. + pub memory_transmission_bps: u64, + /// The total size of the VMs memory in bytes. + pub memory_bytes_total: u64, + /// The total size of transmitted bytes. + pub memory_bytes_transmitted: u64, + /// The amount of remaining bytes for this iteration. + pub memory_bytes_remaining_iteration: u64, + /// The amount of transmitted 4k pages. + pub memory_pages_4k_transmitted: u64, + /// The amount of remaining 4k pages for this iteration. 
+ pub memory_pages_4k_remaining_iteration: u64, + /// The number of constant pages for which we could take a shortcut. + /// Pages where all bits are either zero or one. + pub memory_pages_constant_count: u64, + /// Current memory dirty rate in pages per second (pps). + pub memory_dirty_rate_pps: u64, +} + +/// The different phases of an ongoing ([`MigrationState::Ongoing`]) migration +/// (good case). +/// +/// The states correspond to the [live-migration protocol]. +/// +/// [live-migration protocol]: super::protocol +#[derive( + Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize, +)] +pub enum MigrationStateOngoingPhase { + /// The migration starts. Handshake and transfer of VM config. + Starting, + /// Transfer of memory FDs. + /// + /// Only used for local migrations. + MemoryFds, + /// Transfer of VM memory in precopy mode. + /// + /// Not used for local migrations. + MemoryPrecopy, + // TODO eventually add MemoryPostcopy here + /// The VM migration is completing. This means the last chunks of memory + /// are transmitted as well as the final VM state (vCPUs, devices). + Completing, +} + +impl Display for MigrationStateOngoingPhase { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Starting => write!(f, "starting"), + Self::MemoryFds => write!(f, "memory FDs"), + Self::MemoryPrecopy => write!(f, "memory (precopy)"), + Self::Completing => write!(f, "completing"), + } + } +} + +/// The different states of a migration, covering steady progress and failure. +#[derive( + Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize, +)] +pub enum MigrationState { + /// The migration has been cancelled. + Cancelled {}, + /// The migration has failed. + Failed { + /// Stringified error. + error_msg: String, + /// Debug-stringified error. + error_msg_debug: String, + // TODO this is very tricky because I need clone() + // error: Box, + }, + /// The migration has finished successfully. + Finished {}, + /// The migration is ongoing. + Ongoing { + phase: MigrationStateOngoingPhase, + /// Percent in range `0..=100`. + vcpu_throttle_percent: u8, + }, +} + +impl Display for MigrationState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + MigrationState::Cancelled { .. } => write!(f, "{}", self.state_name()), + MigrationState::Failed { error_msg, .. } => { + write!(f, "{}: {error_msg}", self.state_name()) + } + MigrationState::Finished { .. } => write!(f, "{}", self.state_name()), + MigrationState::Ongoing { + phase, + vcpu_throttle_percent, + } => write!( + f, + "{}: phase={phase}, vcpu_throttle={vcpu_throttle_percent}", + self.state_name() + ), + } + } +} + +impl MigrationState { + fn state_name(&self) -> &'static str { + match self { + MigrationState::Cancelled { .. } => "cancelled", + MigrationState::Failed { .. } => "failed", + MigrationState::Finished { .. } => "finished", + MigrationState::Ongoing { .. } => "ongoing", + } + } +} + +/// Returns the current UNIX timestamp in ms. +fn current_unix_timestamp_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("should be valid duration") + .as_millis() as u64 +} + +/// Holds a snapshot of progress and status information for an ongoing live +/// migration, or the last snapshot of a cancelled or aborted migration. 
+/// +/// This type carries detailed information for every step of the +/// [live-migration protocol] in a way that makes it easy for API users to +/// parse while retaining all important information. +/// +/// [live-migration protocol]: super::protocol +#[derive( + Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize, +)] +pub struct MigrationProgress { + /// UNIX timestamp of the start of the live-migration process in ms. + pub timestamp_begin_ms: u64, + /// UNIX timestamp of the current snapshot in ms. + pub timestamp_snapshot_ms: u64, + /// Relative timestamp since the beginning of the migration in ms. + pub timestamp_snapshot_relative_ms: u64, + /// Configured target downtime. + pub downtime_configured_ms: u64, + /// Currently estimated (computed) downtime given the remaining + /// transmissions and the bandwidth. + /// + /// If this is `0`, the downtime could not yet be calculated. + pub downtime_estimated_ms: u64, + /// Requested transportation mode. + pub transportation_mode: TransportationMode, + /// Snapshot of the current phase. + pub state: MigrationState, + /// Latest [`MemoryTransmissionInfo`], if any. + /// + /// The most interesting phase is when the current state is + /// [`MigrationState::Ongoing`] with [`MigrationStateOngoingPhase::MemoryPrecopy`], + /// as this value is updated frequently. + pub memory_transmission_info: MemoryTransmissionInfo, +} + +impl MigrationProgress { + /// Creates new progress in a valid initial state. + /// + /// This progress must be updated using any of: + /// - [`Self::update`] + /// - [`Self::mark_as_finished`] + /// - [`Self::mark_as_failed`] + /// - [`Self::mark_as_cancelled`] + pub fn new(transportation_mode: TransportationMode, target_downtime: Duration) -> Self { + let timestamp = current_unix_timestamp_ms(); + Self { + timestamp_begin_ms: timestamp, + timestamp_snapshot_ms: timestamp, + timestamp_snapshot_relative_ms: 0, + downtime_configured_ms: target_downtime.as_millis() as u64, + downtime_estimated_ms: 0, + transportation_mode, + state: MigrationState::Ongoing { + phase: MigrationStateOngoingPhase::Starting, + vcpu_throttle_percent: 0, + }, + memory_transmission_info: MemoryTransmissionInfo::default(), + } + } + + /// Updates the state of an ongoing migration. + /// + /// Only updates new values that are provided via `Some`. + /// + /// # Arguments + /// + /// - `new_phase`: The current [`MigrationStateOngoingPhase`]. + /// - `new_memory_transmission_info`: If `Some`, the current [`MemoryTransmissionInfo`]. + /// - `new_cpu_throttle_percent`: If `Some`, the current value of the vCPU throttle percentage. + /// Must be in range `0..=100`. + /// - `new_estimated_downtime`: If `Some`, the latest expected (calculated) downtime. + pub fn update( + &mut self, + new_phase: MigrationStateOngoingPhase, + new_memory_transmission_info: Option<MemoryTransmissionInfo>, + new_cpu_throttle_percent: Option<u8>, + new_estimated_downtime: Option<Duration>, + ) { + if let Some(percent) = new_cpu_throttle_percent { + assert!(percent <= 100); + } + + if let Some(downtime) = new_estimated_downtime { + self.downtime_estimated_ms = u64::try_from(downtime.as_millis()).unwrap(); + } else { + // This is better than showing `0` and it is likely close to the final actual downtime. 
+ self.downtime_estimated_ms = self.downtime_configured_ms; + } + + match &self.state { + MigrationState::Ongoing { + phase: _old_phase, + vcpu_throttle_percent: old_vcpu_throttle_percent, + } => { + self.timestamp_snapshot_ms = current_unix_timestamp_ms(); + self.timestamp_snapshot_relative_ms = + self.timestamp_snapshot_ms - self.timestamp_begin_ms; + + self.memory_transmission_info = + new_memory_transmission_info.unwrap_or(self.memory_transmission_info); + self.state = MigrationState::Ongoing { + phase: new_phase, + vcpu_throttle_percent: new_cpu_throttle_percent + .unwrap_or(*old_vcpu_throttle_percent), + }; + } + illegal => { + // panic is fine as we have a logic error here, nothing that was caused by a user. + panic!( + "illegal state transition: {} -> ongoing", + illegal.state_name(), + ); + } + } + } + + /// Sets the underlying state to [`MigrationState::Cancelled`] and + /// updates all corresponding metadata. + /// + /// After this state change, the object is supposed to be handled as immutable. + pub fn mark_as_cancelled(&mut self) { + if !matches!(self.state, MigrationState::Ongoing { .. }) { + panic!( + "illegal state transition: {} -> cancelled", + self.state.state_name() + ); + } + self.timestamp_snapshot_ms = current_unix_timestamp_ms(); + self.timestamp_snapshot_relative_ms = self.timestamp_snapshot_ms - self.timestamp_begin_ms; + self.state = MigrationState::Cancelled {}; + } + + /// Sets the underlying state to [`MigrationState::Failed`] and + /// updates all corresponding metadata. + /// + /// After this state change, the object is supposed to be handled as immutable. + pub fn mark_as_failed(&mut self, error: &dyn Error) { + if !matches!(self.state, MigrationState::Ongoing { .. }) { + panic!( + "illegal state transition: {} -> failed", + self.state.state_name() + ); + } + self.timestamp_snapshot_ms = current_unix_timestamp_ms(); + self.timestamp_snapshot_relative_ms = self.timestamp_snapshot_ms - self.timestamp_begin_ms; + self.state = MigrationState::Failed { + error_msg: format!("{error}",), + error_msg_debug: format!("{error:?}",), + }; + } + + /// Sets the underlying state to [`MigrationState::Finished`] and + /// updates all corresponding metadata. + /// + /// After this state change, the object is supposed to be handled as immutable. + pub fn mark_as_finished(&mut self) { + if !matches!(self.state, MigrationState::Ongoing { .. }) { + panic!( + "illegal state transition: {} -> finished", + self.state.state_name() + ); + } + self.timestamp_snapshot_ms = current_unix_timestamp_ms(); + self.timestamp_snapshot_relative_ms = self.timestamp_snapshot_ms - self.timestamp_begin_ms; + self.state = MigrationState::Finished {}; + } +} + +#[cfg(test)] +mod tests { + use anyhow::anyhow; + + use super::*; + + // Helpful to see what the API will look like. 
+ #[test] + fn print_json() { + let starting = MigrationProgress::new( + TransportationMode::Tcp { + connections: NonZeroU32::new(1).unwrap(), + tls: false, + }, + Duration::from_millis(100), + ); + let memory_precopy = { + let mut state = starting.clone(); + state.update( + MigrationStateOngoingPhase::MemoryPrecopy, + Some(MemoryTransmissionInfo { + memory_iteration: 7, + memory_transmission_bps: 0, + memory_bytes_total: 0x1337, + memory_bytes_transmitted: 0x1337, + memory_pages_4k_transmitted: 42, + memory_pages_4k_remaining_iteration: 42, + memory_bytes_remaining_iteration: 124, + memory_dirty_rate_pps: 42, + memory_pages_constant_count: 0, + }), + Some(42), + Some(Duration::from_millis(200)), + ); + state + }; + let completing = { + let mut state = memory_precopy.clone(); + state.update( + MigrationStateOngoingPhase::Completing, + None, + Some(99), + Some(Duration::from_millis(25)), + ); + state + }; + let completed = { + let mut state = completing.clone(); + state.mark_as_finished(); + state + }; + let failed = { + let mut state = completing.clone(); + let error = anyhow!("Some very bad error".to_string()); + let error: &dyn Error = error.as_ref(); + state.mark_as_failed(error); + state + }; + let cancelled = { + let mut state = completing.clone(); + state.mark_as_cancelled(); + state + }; + + let vals = [ + starting, + memory_precopy, + completing, + completed, + failed, + cancelled, + ]; + for val in vals { + println!("state: {}", val.state.state_name()); + println!("Rust: {val:?}"); + println!( + "serde_json: {}", + serde_json::to_string_pretty(&val).unwrap() + ); + println!(); + println!("================="); + } + } +} diff --git a/vmm/src/api/http/http_endpoint.rs b/vmm/src/api/http/http_endpoint.rs index 371692b8a6..27bf18c5bf 100644 --- a/vmm/src/api/http/http_endpoint.rs +++ b/vmm/src/api/http/http_endpoint.rs @@ -35,32 +35,22 @@ //! [special HTTP library]: https://github.com/firecracker-microvm/micro-http use std::fs::File; -use std::sync::mpsc::{Receiver, Sender, SyncSender}; -use std::sync::{LazyLock, Mutex}; +use std::sync::mpsc::Sender; use log::info; use micro_http::{Body, Method, Request, Response, StatusCode, Version}; use vmm_sys_util::eventfd::EventFd; -/// Helper to make the VmSendMigration call blocking as long as a migration is ongoing. 
-#[allow(clippy::type_complexity)] -pub static ONGOING_LIVEMIGRATION: LazyLock<( - SyncSender>, - Mutex>>, -)> = LazyLock::new(|| { - let (sender, receiver) = std::sync::mpsc::sync_channel(0); - (sender, Mutex::new(receiver)) -}); - #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))] use crate::api::VmCoredump; use crate::api::http::http_endpoint::fds_helper::{attach_fds_to_cfg, attach_fds_to_cfgs}; use crate::api::http::{EndpointHandler, HttpError, error_response}; use crate::api::{ AddDisk, ApiAction, ApiError, ApiRequest, NetConfig, VmAddDevice, VmAddFs, VmAddNet, VmAddPmem, - VmAddUserDevice, VmAddVdpa, VmAddVsock, VmBoot, VmConfig, VmCounters, VmDelete, VmNmi, VmPause, - VmPowerButton, VmReboot, VmReceiveMigration, VmReceiveMigrationData, VmRemoveDevice, VmResize, - VmResizeDisk, VmResizeZone, VmRestore, VmResume, VmSendMigration, VmShutdown, VmSnapshot, + VmAddUserDevice, VmAddVdpa, VmAddVsock, VmBoot, VmConfig, VmCounters, VmDelete, + VmMigrationProgress, VmNmi, VmPause, VmPowerButton, VmReboot, VmReceiveMigration, + VmReceiveMigrationData, VmRemoveDevice, VmResize, VmResizeDisk, VmResizeZone, VmRestore, + VmResume, VmSendMigration, VmShutdown, VmSnapshot, }; use crate::config::RestoreConfig; use crate::cpu::Error as CpuError; @@ -507,26 +497,15 @@ impl PutHandler for VmSendMigration { _files: Vec, ) -> std::result::Result, HttpError> { if let Some(body) = body { - let res = self - .send( - api_notifier, - api_sender, - serde_json::from_slice(body.raw())?, - ) - .map_err(HttpError::ApiError)?; - - info!("live migration started"); - - let (_, receiver) = &*ONGOING_LIVEMIGRATION; - - info!("waiting for live migration result"); - let mig_res = receiver.lock().unwrap().recv().unwrap(); - info!("received live migration result"); - - // We forward the migration error here to the guest - mig_res - .map(|_| res) - .map_err(|e| HttpError::ApiError(ApiError::VmSendMigration(e))) + self.send( + api_notifier, + api_sender, + serde_json::from_slice(body.raw())?, + ) + .inspect(|_| { + info!("live migration started (in background)"); + }) + .map_err(HttpError::ApiError) } else { Err(HttpError::BadRequest) } @@ -708,6 +687,32 @@ impl EndpointHandler for VmmShutdown { } } +impl EndpointHandler for VmMigrationProgress { + fn handle_request( + &self, + req: &Request, + api_notifier: EventFd, + api_sender: Sender, + ) -> Response { + match req.method() { + Method::Get => match crate::api::VmMigrationProgress + .send(api_notifier, api_sender, ()) + .map_err(HttpError::ApiError) + { + Ok(info) => { + let mut response = Response::new(Version::Http11, StatusCode::OK); + let info_serialized = serde_json::to_string(&info).unwrap(); + + response.set_body(Body::new(info_serialized)); + response + } + Err(e) => error_response(e, StatusCode::InternalServerError), + }, + _ => error_response(HttpError::BadRequest, StatusCode::BadRequest), + } + } +} + #[cfg(test)] mod external_fds_tests { use super::*; diff --git a/vmm/src/api/http/mod.rs b/vmm/src/api/http/mod.rs index 2aa52e8e37..764bc608db 100644 --- a/vmm/src/api/http/mod.rs +++ b/vmm/src/api/http/mod.rs @@ -29,9 +29,9 @@ use self::http_endpoint::{VmActionHandler, VmCreate, VmInfo, VmmPing, VmmShutdow use crate::api::VmCoredump; use crate::api::{ AddDisk, ApiError, ApiRequest, VmAddDevice, VmAddFs, VmAddNet, VmAddPmem, VmAddUserDevice, - VmAddVdpa, VmAddVsock, VmBoot, VmCounters, VmDelete, VmNmi, VmPause, VmPowerButton, VmReboot, - VmReceiveMigration, VmRemoveDevice, VmResize, VmResizeDisk, VmResizeZone, VmRestore, VmResume, - VmSendMigration, 
VmShutdown, VmSnapshot, + VmAddVdpa, VmAddVsock, VmBoot, VmCounters, VmDelete, VmMigrationProgress, VmNmi, VmPause, + VmPowerButton, VmReboot, VmReceiveMigration, VmRemoveDevice, VmResize, VmResizeDisk, + VmResizeZone, VmRestore, VmResume, VmSendMigration, VmShutdown, VmSnapshot, }; use crate::landlock::Landlock; use crate::seccomp_filters::{Thread, get_seccomp_filter}; @@ -275,6 +275,10 @@ pub static HTTP_ROUTES: LazyLock = LazyLock::new(|| { endpoint!("/vm.shutdown"), Box::new(VmActionHandler::new(&VmShutdown)), ); + r.routes.insert( + endpoint!("/vm.migration-progress"), + Box::new(VmMigrationProgress {}), + ); r.routes.insert( endpoint!("/vm.snapshot"), Box::new(VmActionHandler::new(&VmSnapshot)), diff --git a/vmm/src/api/mod.rs b/vmm/src/api/mod.rs index a39fc8a477..4af2823616 100644 --- a/vmm/src/api/mod.rs +++ b/vmm/src/api/mod.rs @@ -43,6 +43,7 @@ use micro_http::Body; use serde::{Deserialize, Serialize}; use thiserror::Error; use vm_migration::MigratableError; +use vm_migration::progress::MigrationProgress; use vmm_sys_util::eventfd::EventFd; #[cfg(feature = "dbus_api")] @@ -203,6 +204,10 @@ pub enum ApiError { /// Error triggering NMI #[error("Error triggering NMI")] VmNmi(#[source] VmError), + + /// Error fetching the migration progress + #[error("Error fetching the migration progress")] + VmMigrationProgress(#[source] VmError), } pub type ApiResult = Result; @@ -295,6 +300,8 @@ pub struct VmSendMigrationData { /// Directory containing the TLS root CA certificate (ca-cert.pem) #[serde(default)] pub tls_dir: Option, + /// Keep the VMM alive. + pub keep_alive: bool, } // Default value for downtime the same as qemu. @@ -314,6 +321,9 @@ pub enum ApiResponsePayload { /// Virtual machine information VmInfo(VmInfoResponse), + /// The progress of a possibly ongoing live migration. + VmMigrationProgress(Box>), + /// Vmm ping response VmmPing(VmmPingResponse), @@ -399,6 +409,10 @@ pub trait RequestHandler { ) -> Result<(), MigratableError>; fn vm_nmi(&mut self) -> Result<(), VmError>; + + /// Returns the progress of the currently active migration or any previous + /// failed or canceled migration. 
+ fn vm_migration_progress(&mut self) -> Option; } /// It would be nice if we could pass around an object like this: @@ -1531,3 +1545,42 @@ impl ApiAction for VmNmi { get_response_body(self, api_evt, api_sender, data) } } + +pub struct VmMigrationProgress; + +impl ApiAction for VmMigrationProgress { + type RequestBody = (); + type ResponseBody = Box>; + + fn request(&self, _: Self::RequestBody, response_sender: Sender) -> ApiRequest { + Box::new(move |vmm| { + info!("API request event: VmMigrationProgress"); + + let snapshot = Ok(vmm.vm_migration_progress()); + let response = snapshot + .map(Box::new) + .map(ApiResponsePayload::VmMigrationProgress) + .map_err(ApiError::VmMigrationProgress); + + response_sender + .send(response) + .map_err(VmmError::ApiResponseSend)?; + + Ok(false) + }) + } + + fn send( + &self, + api_evt: EventFd, + api_sender: Sender, + data: Self::RequestBody, + ) -> ApiResult { + let info = get_response(self, api_evt, api_sender, data)?; + + match info { + ApiResponsePayload::VmMigrationProgress(info) => Ok(info), + _ => Err(ApiError::ResponsePayloadType), + } + } +} diff --git a/vmm/src/lib.rs b/vmm/src/lib.rs index d8d013822d..aa35c530a3 100644 --- a/vmm/src/lib.rs +++ b/vmm/src/lib.rs @@ -56,6 +56,10 @@ use vm_memory::{ GuestAddress, GuestAddressSpace, GuestMemory, GuestMemoryAtomic, ReadVolatile, VolatileMemoryError, VolatileSlice, WriteVolatile, }; +use vm_migration::progress::{ + MemoryTransmissionInfo, MigrationProgress, MigrationState, MigrationStateOngoingPhase, + TransportationMode, +}; use vm_migration::protocol::*; use vm_migration::tls::{TlsConnectionWrapper, TlsStream, TlsStreamWrapper}; use vm_migration::{ @@ -65,7 +69,6 @@ use vmm_sys_util::eventfd::EventFd; use vmm_sys_util::signal::unblock_signal; use vmm_sys_util::sock_ctrl_msg::ScmSocket; -use crate::api::http::http_endpoint::ONGOING_LIVEMIGRATION; use crate::api::{ ApiRequest, ApiResponse, RequestHandler, VmInfoResponse, VmReceiveMigrationData, VmSendMigrationData, VmmPingResponse, @@ -281,6 +284,9 @@ impl From for EpollDispatch { } } +// TODO make this a member of Vmm? +static MIGRATION_PROGRESS_SNAPSHOT: Mutex> = Mutex::new(None); + enum SocketStream { Unix(UnixStream), Tcp(TcpStream), @@ -696,7 +702,7 @@ impl VmmVersionInfo { /// /// Is supposed to be updated on the fly. #[derive(Debug, Clone)] -struct MigrationState { +struct MigrationStateInternal { /* ---------------------------------------------- */ /* Properties that are updated before the first iteration */ /// The instant where the actual downtime of the VM began. @@ -742,7 +748,7 @@ struct MigrationState { migration_duration: Duration, } -impl MigrationState { +impl MigrationStateInternal { pub fn new() -> Self { Self { // Field will be overwritten later. @@ -814,7 +820,7 @@ impl MigrationWorker { } /// Perform the migration and communicate with the [`Vmm`] thread. - fn run(mut self) -> (Vm, result::Result<(), MigratableError>) { + fn run(mut self) -> (Vm, result::Result<(), MigratableError>, VmSendMigrationData) { debug!("migration thread is starting"); let res = self.migrate().inspect_err(|e| error!("migrate error: {e}")); @@ -823,7 +829,7 @@ impl MigrationWorker { self.check_migration_evt.write(1).unwrap(); debug!("migration thread is finished"); - (self.vm, res) + (self.vm, res, self.config) } } @@ -893,7 +899,9 @@ pub struct Vmm { /// Handle to the [`MigrationWorker`] thread. /// /// The handle will return the [`Vm`] back in any case. Further, the underlying error (if any) is returned. 
- migration_thread_handle: Option)>>, + #[allow(clippy::type_complexity)] + migration_thread_handle: + Option, VmSendMigrationData)>>, } /// Wait for a file descriptor to become readable. In this case, we return @@ -2110,7 +2118,7 @@ impl Vmm { Ok(()) } - fn can_increase_autoconverge_step(s: &MigrationState) -> bool { + fn can_increase_autoconverge_step(s: &MigrationStateInternal) -> bool { if s.iteration < AUTO_CONVERGE_ITERATION_DELAY { false } else { @@ -2127,11 +2135,39 @@ impl Vmm { vm: &mut Vm, mem_send: &SendAdditionalConnections, socket: &mut SocketStream, - s: &mut MigrationState, + s: &mut MigrationStateInternal, migration_timeout: Duration, migrate_downtime_limit: Duration, ) -> result::Result { let mut iteration_table; + let total_memory_size_bytes = vm + .memory_range_table()? + .regions() + .iter() + .map(|range| range.length) + .sum::(); + + let update_migration_progress = |s: &mut MigrationStateInternal, vm: &Vm| { + let mut lock = MIGRATION_PROGRESS_SNAPSHOT.lock().unwrap(); + lock.as_mut() + .expect("live migration should be ongoing") + .update( + MigrationStateOngoingPhase::MemoryPrecopy, + Some(MemoryTransmissionInfo { + memory_iteration: s.iteration, + memory_transmission_bps: s.bytes_per_sec as u64, + memory_bytes_total: total_memory_size_bytes, + memory_bytes_transmitted: s.total_transferred_bytes, + memory_pages_4k_transmitted: s.total_transferred_pages, + memory_pages_4k_remaining_iteration: s.pages_to_transmit, + memory_bytes_remaining_iteration: s.bytes_to_transmit, + memory_dirty_rate_pps: s.dirty_rate_pps, + memory_pages_constant_count: 0, /* TODO */ + }), + Some(vm.throttle_percent()), + s.calculated_downtime_duration, + ); + }; // We loop until we converge (target downtime is achievable). loop { @@ -2178,6 +2214,9 @@ impl Vmm { .sum(); s.pages_to_transmit = s.bytes_to_transmit.div_ceil(PAGE_SIZE as u64); + // Update before we might exit the loop. + update_migration_progress(s, vm); + // Unlikely happy-path. if s.bytes_to_transmit == 0 { break; @@ -2225,6 +2264,9 @@ impl Vmm { } } + // Update with new metrics before transmission. 
+ update_migration_progress(s, vm); + // Send the current dirty pages s.transmit_start_time = Instant::now(); mem_send.send_memory(&iteration_table, socket)?; @@ -2240,13 +2282,16 @@ impl Vmm { s.iteration_duration = s.iteration_start_time.elapsed(); info!( - "iteration:{},cost={}ms,throttle={}%,transmitted={}MiB,dirty_rate={}pps,Mebibyte/s={:.2}", + "iter={},dur={}ms,overhead={}ms,throttle={}%,size={}MiB,dirtyrate={},bandwidth={:.2}MiBs,downtime={}ms", s.iteration, s.iteration_duration.as_millis(), + (s.iteration_duration - s.transmit_duration).as_millis(), vm.throttle_percent(), - s.bytes_to_transmit / 1024 / 1024, + s.bytes_to_transmit.div_ceil(1024).div_ceil(1024), s.dirty_rate_pps, - s.bytes_per_sec / 1024.0 / 1024.0 + s.bytes_per_sec / 1024.0 / 1024.0, + s.calculated_downtime_duration + .map_or(migrate_downtime_limit.as_millis(), |d| d.as_millis()), ); // Increment iteration counter @@ -2259,7 +2304,7 @@ impl Vmm { fn do_memory_migration( vm: &mut Vm, socket: &mut SocketStream, - s: &mut MigrationState, + s: &mut MigrationStateInternal, send_data_migration: &VmSendMigrationData, ) -> result::Result<(), MigratableError> { let mem_send = SendAdditionalConnections::new(send_data_migration, &vm.guest_memory())?; @@ -2342,7 +2387,32 @@ impl Vmm { hypervisor: &dyn hypervisor::Hypervisor, send_data_migration: &VmSendMigrationData, ) -> result::Result<(), MigratableError> { - let mut s = MigrationState::new(); + // Update migration progress snapshot + { + let mut lock = MIGRATION_PROGRESS_SNAPSHOT.lock().unwrap(); + if lock + .as_ref() + .map(|p| &p.state) + .is_some_and(|snapshot| matches!(snapshot, MigrationState::Ongoing { .. })) + { + // If this panic triggers, we made a programming error in our state handling. + panic!("migration already ongoing"); + } + let transportation_mode = if send_data_migration.local { + TransportationMode::Local + } else { + TransportationMode::Tcp { + connections: send_data_migration.connections, + tls: send_data_migration.tls_dir.is_some(), + } + }; + lock.replace(MigrationProgress::new( + transportation_mode, + Duration::from_millis(send_data_migration.downtime), + )); + } + + let mut s = MigrationStateInternal::new(); // Set up the socket connection let mut socket = send_migration_socket(send_data_migration)?; @@ -2396,6 +2466,11 @@ impl Vmm { if send_data_migration.local { match &mut socket { SocketStream::Unix(unix_socket) => { + let mut lock = MIGRATION_PROGRESS_SNAPSHOT.lock().unwrap(); + lock.as_mut() + .expect("live migration should be ongoing") + .update(MigrationStateOngoingPhase::MemoryFds, None, None, None); + // Proceed with sending memory file descriptors over UNIX socket vm.send_memory_fds(unix_socket)?; } @@ -2438,6 +2513,14 @@ impl Vmm { Self::do_memory_migration(vm, &mut socket, &mut s, send_data_migration)?; } + // Update migration progress snapshot + { + let mut lock = MIGRATION_PROGRESS_SNAPSHOT.lock().unwrap(); + lock.as_mut() + .expect("live migration should be ongoing") + .update(MigrationStateOngoingPhase::Completing, None, None, None); + } + // We release the locks early to enable locking them on the destination host. // The VM is already stopped. vm.release_disk_locks() @@ -2623,37 +2706,42 @@ impl Vmm { fn check_migration_result(&mut self) { // At this point, the thread must be finished. // If we fail here, we have lost anyway. Just panic. 
- let (vm, migration_res) = self + let (vm, migration_res, migration_cfg) = self .migration_thread_handle .take() .expect("should have thread") .join() .expect("should have joined"); - // Give VMM back control. - self.vm = MaybeVmOwnership::Vmm(vm); - match migration_res { Ok(()) => { { - info!("Sending Receiver in HTTP thread that migration succeeded"); - let (sender, _) = &*ONGOING_LIVEMIGRATION; - // unblock API call; propagate migration result - sender.send(Ok(())).unwrap(); + let mut lock = MIGRATION_PROGRESS_SNAPSHOT.lock().unwrap(); + lock.as_mut() + .expect("live migration should be ongoing") + .mark_as_finished(); } - // Shutdown the VM after the migration succeeded - if let Err(e) = self.exit_evt.write(1) { - error!("Failed shutting down the VM after migration: {e}"); + if migration_cfg.keep_alive { + // API users can still query live-migration statistics + info!("Keeping VMM alive as requested"); + } else { + // Shutdown the VM after the migration succeeded + if let Err(e) = self.exit_evt.write(1) { + error!("Failed shutting down the VM after migration: {e}"); + } } + drop(vm); } Err(e) => { - error!("Migration failed: {e}"); + // Give VMM back control. + self.vm = MaybeVmOwnership::Vmm(vm); + // Update migration progress snapshot { - info!("Sending Receiver in HTTP thread that migration failed"); - let (sender, _) = &*ONGOING_LIVEMIGRATION; - // unblock API call; propagate migration result - sender.send(Err(e)).unwrap(); + let mut lock = MIGRATION_PROGRESS_SNAPSHOT.lock().unwrap(); + lock.as_mut() + .expect("live migration should be ongoing") + .mark_as_failed(&e); } // we don't fail the VMM here, it just continues running its VM } @@ -3629,6 +3717,11 @@ impl RequestHandler for Vmm { Ok(()) } + + fn vm_migration_progress(&mut self) -> Option { + let lock = MIGRATION_PROGRESS_SNAPSHOT.lock().unwrap(); + lock.clone() + } } const CPU_MANAGER_SNAPSHOT_ID: &str = "cpu-manager";
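A minimal consumer sketch of the new progress API, assuming the vm-migration and serde_json crates as dependencies, assuming the route added in vmm/src/api/http/mod.rs is served under the usual /api/v1 prefix, and using a hypothetical fetch_progress_json() helper in place of a real HTTP-over-UNIX-socket client; as in the ch-remote polling loop, the response body is `null` until a migration has been dispatched:

use std::error::Error;

use vm_migration::progress::{MigrationProgress, MigrationState};

// Hypothetical transport helper: issue GET /api/v1/vm.migration-progress
// against the cloud-hypervisor API socket and return the raw response body.
fn fetch_progress_json() -> Result<String, Box<dyn Error>> {
    unimplemented!("use any HTTP-over-UNIX-socket client here")
}

fn report_progress() -> Result<(), Box<dyn Error>> {
    let body = fetch_progress_json()?;
    // `null` deserializes to `None`: no migration has been dispatched yet.
    let Some(progress) = serde_json::from_str::<Option<MigrationProgress>>(&body)? else {
        println!("no migration dispatched yet");
        return Ok(());
    };

    match &progress.state {
        MigrationState::Ongoing {
            phase,
            vcpu_throttle_percent,
        } => {
            let mem = &progress.memory_transmission_info;
            println!(
                "ongoing: phase={phase}, throttle={vcpu_throttle_percent}%, \
                 {}/{} bytes transmitted, estimated downtime {} ms",
                mem.memory_bytes_transmitted,
                mem.memory_bytes_total,
                progress.downtime_estimated_ms
            );
        }
        MigrationState::Finished {} => println!(
            "finished after {} ms",
            progress.timestamp_snapshot_relative_ms
        ),
        MigrationState::Failed { error_msg, .. } => println!("failed: {error_msg}"),
        MigrationState::Cancelled {} => println!("cancelled"),
    }

    Ok(())
}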