diff --git a/vmm/sandbox/src/cloud_hypervisor/mod.rs b/vmm/sandbox/src/cloud_hypervisor/mod.rs
index 5e4b7706..8eeaef27 100644
--- a/vmm/sandbox/src/cloud_hypervisor/mod.rs
+++ b/vmm/sandbox/src/cloud_hypervisor/mod.rs
@@ -14,12 +14,13 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-use std::{os::unix::io::RawFd, process::Stdio};
+use std::{os::unix::io::RawFd, process::Stdio, time::Duration};
 
 use anyhow::anyhow;
 use async_trait::async_trait;
 use containerd_sandbox::error::{Error, Result};
-use log::{debug, error, info};
+use log::{debug, error, info, warn};
+use nix::{errno::Errno::ESRCH, sys::signal, unistd::Pid};
 use serde::{Deserialize, Serialize};
 use time::OffsetDateTime;
 use tokio::{
@@ -41,10 +42,10 @@ use crate::{
         },
     },
     device::{BusType, DeviceInfo},
-    impl_recoverable, load_config,
+    load_config,
     param::ToCmdLineParams,
     sandbox::KuasarSandboxer,
-    utils::{read_std, set_cmd_fd, set_cmd_netns, wait_pid, write_file_atomic},
+    utils::{read_std, set_cmd_fd, set_cmd_netns, wait_channel, wait_pid, write_file_atomic},
     vm::{Pids, VcpuThreads, VM},
 };
 
@@ -145,6 +146,16 @@ impl CloudHypervisorVM {
         self.fds.push(fd);
         self.fds.len() - 1 + 3
     }
+
+    async fn wait_stop(&mut self, t: Duration) -> Result<()> {
+        if let Some(rx) = self.wait_channel().await {
+            let (_, ts) = *rx.borrow();
+            if ts == 0 {
+                wait_channel(t, rx).await?;
+            }
+        }
+        Ok(())
+    }
 }
 
 #[async_trait]
@@ -152,6 +163,8 @@ impl VM for CloudHypervisorVM {
     async fn start(&mut self) -> Result<u32> {
         create_dir_all(&self.base_dir).await?;
         let virtiofsd_pid = self.start_virtiofsd().await?;
+        // TODO: add child virtiofsd process
+        self.pids.affiliated_pids.push(virtiofsd_pid);
         let mut params = self.config.to_cmdline_params("--");
         for d in self.devices.iter() {
             params.extend(d.to_cmdline_params("--"));
@@ -174,31 +187,57 @@ impl VM for CloudHypervisorVM {
             .spawn()
             .map_err(|e| anyhow!("failed to spawn cloud hypervisor command: {}", e))?;
         let pid = child.id();
+        self.pids.vmm_pid = pid;
         let pid_file = format!("{}/pid", self.base_dir);
-        let (tx, rx) = tokio::sync::watch::channel((0u32, 0i128));
+        let (tx, rx) = channel((0u32, 0i128));
+        self.wait_chan = Some(rx);
         spawn_wait(
             child,
             format!("cloud-hypervisor {}", self.id),
             Some(pid_file),
             Some(tx),
         );
-        self.client = Some(self.create_client().await?);
-        self.wait_chan = Some(rx);
-        // update vmm related pids
-        self.pids.vmm_pid = pid;
-        self.pids.affiliated_pids.push(virtiofsd_pid);
-        // TODO: add child virtiofsd process
+        match self.create_client().await {
+            Ok(client) => self.client = Some(client),
+            Err(e) => {
+                if let Err(re) = self.stop(true).await {
+                    warn!("roll back in create clh api client: {}", re);
+                    return Err(e);
+                }
+                return Err(e);
+            }
+        };
         Ok(pid.unwrap_or_default())
     }
 
     async fn stop(&mut self, force: bool) -> Result<()> {
-        let pid = self.pid()?;
-        if pid == 0 {
-            return Ok(());
+        let signal = if force {
+            signal::SIGKILL
+        } else {
+            signal::SIGTERM
+        };
+
+        let pids = self.pids();
+        if let Some(vmm_pid) = pids.vmm_pid {
+            if vmm_pid > 0 {
+                // TODO: consider pid reuse
+                match signal::kill(Pid::from_raw(vmm_pid as i32), signal) {
+                    Err(e) => {
+                        if e != ESRCH {
+                            return Err(anyhow!("kill vmm process {}: {}", vmm_pid, e).into());
+                        }
+                    }
+                    Ok(_) => self.wait_stop(Duration::from_secs(10)).await?,
+                }
+            }
+        }
+        for affiliated_pid in pids.affiliated_pids {
+            if affiliated_pid > 0 {
+                // an affiliated process may exit on its own, so it's ok to ignore errors here
+                signal::kill(Pid::from_raw(affiliated_pid as i32), signal).unwrap_or_default();
+            }
         }
-        let signal = if force { 9 } else { 15 };
-        unsafe { nix::libc::kill(pid as i32, signal) };
         Ok(())
     }
 
@@ -288,7 +327,20 @@ impl VM for CloudHypervisorVM {
     }
 }
 
-impl_recoverable!(CloudHypervisorVM);
+#[async_trait]
+impl crate::vm::Recoverable for CloudHypervisorVM {
+    async fn recover(&mut self) -> Result<()> {
+        self.client = Some(self.create_client().await?);
+        let pid = self.pid()?;
+        let (tx, rx) = channel((0u32, 0i128));
+        tokio::spawn(async move {
+            let wait_result = wait_pid(pid as i32).await;
+            tx.send(wait_result).unwrap_or_default();
+        });
+        self.wait_chan = Some(rx);
+        Ok(())
+    }
+}
 
 macro_rules! read_stdio {
     ($stdio:expr, $cmd_name:ident) => {
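For context on the convention used above: `spawn_wait` publishes the child's `(exit_code, exit_timestamp)` on a tokio watch channel, and a timestamp of `0` means "not exited yet"; `wait_stop`, `recover`, and the monitor in sandbox.rs all rely on this. The real helpers live in `utils` and are not part of this diff; below is a minimal self-contained sketch of the pattern, with simplified stand-ins (`spawn_wait_sketch` and `wait_exit_sketch` are hypothetical names, not the kuasar API):

```rust
use std::time::Duration;

use anyhow::{anyhow, Result};
use time::OffsetDateTime;
use tokio::{process::Child, sync::watch};

// Reap the child and publish (exit_code, exit_timestamp_ns) to all watchers.
// By convention a timestamp of 0 means "still running".
fn spawn_wait_sketch(mut child: Child) -> watch::Receiver<(u32, i128)> {
    let (tx, rx) = watch::channel((0u32, 0i128));
    tokio::spawn(async move {
        let code = child
            .wait()
            .await
            .map(|status| status.code().unwrap_or(-1) as u32)
            .unwrap_or_default();
        let ts = OffsetDateTime::now_utc().unix_timestamp_nanos();
        // Receivers may already be dropped; ignore the send error.
        tx.send((code, ts)).unwrap_or_default();
    });
    rx
}

// Wait until the child exits or the timeout elapses, like wait_stop() above.
async fn wait_exit_sketch(
    mut rx: watch::Receiver<(u32, i128)>,
    timeout: Duration,
) -> Result<(u32, i128)> {
    if rx.borrow().1 == 0 {
        tokio::time::timeout(timeout, rx.changed())
            .await
            .map_err(|_| anyhow!("timed out waiting for process exit"))??;
    }
    Ok(*rx.borrow())
}
```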
diff --git a/vmm/sandbox/src/network/link.rs b/vmm/sandbox/src/network/link.rs
index 2addab4d..5ddc2785 100644
--- a/vmm/sandbox/src/network/link.rs
+++ b/vmm/sandbox/src/network/link.rs
@@ -574,6 +574,11 @@ async fn get_pci_driver(bdf: &str) -> Result<String> {
 }
 
 async fn bind_device_to_driver(driver: &str, bdf: &str) -> Result<()> {
+    // 0. Check the current driver
+    if get_pci_driver(bdf).await? == driver {
+        return Ok(());
+    }
+
     // 1. Switch the device driver
     let driver_override_path = format!("/sys/bus/pci/devices/{}/driver_override", bdf);
     write_file_async(&driver_override_path, driver).await?;
@@ -589,23 +594,13 @@ async fn bind_device_to_driver(driver: &str, bdf: &str) -> Result<()> {
     write_file_async(probe_path, bdf).await?;
 
     // 4. Check the result
-    let driver_link = format!("/sys/bus/pci/devices/{}/driver", bdf);
-    let driver_path = tokio::fs::read_link(&*driver_link).await?;
-
-    let result_driver = driver_path.file_name().ok_or(anyhow!(
-        "failed to get driver name from {}",
-        driver_path.display()
-    ))?;
-    let result_driver = result_driver.to_str().ok_or(anyhow!(
-        "failed to convert the driver {} to str",
-        result_driver.to_string_lossy()
-    ))?;
+    let result_driver = get_pci_driver(bdf).await?;
 
     if result_driver != driver {
         return Err(anyhow!(
-            "device {} driver is {} after executing bind to {}",
+            "device {} driver is expected to be {} but got {}",
             bdf,
-            result_driver,
-            driver
+            driver,
+            result_driver
        )
        .into());
     }
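For reference, `get_pci_driver` itself is not shown in this diff. Judging from the inline code this change deletes, it presumably resolves the device's sysfs `driver` symlink and returns its basename; a sketch under that assumption follows (how an unbound device, where `read_link` fails, is handled depends on the real implementation):

```rust
use anyhow::{anyhow, Result};

// Sketch of get_pci_driver: resolve /sys/bus/pci/devices/<bdf>/driver,
// which is a symlink to the bound driver, and return the driver's name.
async fn get_pci_driver(bdf: &str) -> Result<String> {
    let driver_link = format!("/sys/bus/pci/devices/{}/driver", bdf);
    let driver_path = tokio::fs::read_link(&driver_link).await?;
    let name = driver_path
        .file_name()
        .ok_or_else(|| anyhow!("failed to get driver name from {}", driver_path.display()))?;
    Ok(name.to_string_lossy().into_owned())
}
```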
diff --git a/vmm/sandbox/src/sandbox.rs b/vmm/sandbox/src/sandbox.rs
index dc8ac46c..0105b098 100644
--- a/vmm/sandbox/src/sandbox.rs
+++ b/vmm/sandbox/src/sandbox.rs
@@ -52,35 +52,6 @@ use crate::{
 
 pub const KUASAR_GUEST_SHARE_DIR: &str = "/run/kuasar/storage/containers/";
 
-macro_rules! _monitor {
-    ($sb:ident) => {
-        tokio::spawn(async move {
-            let mut rx = {
-                let sandbox = $sb.lock().await;
-                if let SandboxStatus::Running(_) = sandbox.status.clone() {
-                    sandbox.vm.wait_channel().await.unwrap()
-                } else {
-                    error!("can not get wait channel when sandbox is running");
-                    return;
-                }
-            };
-
-            let (code, ts) = *rx.borrow();
-            if ts == 0 {
-                rx.changed().await.unwrap_or_default();
-                let (code, ts) = *rx.borrow();
-                let mut sandbox = $sb.lock().await;
-                sandbox.status = SandboxStatus::Stopped(code, ts);
-                sandbox.exit_signal.signal();
-            } else {
-                let mut sandbox = $sb.lock().await;
-                sandbox.status = SandboxStatus::Stopped(code, ts);
-                sandbox.exit_signal.signal();
-            }
-        });
-    };
-}
-
 pub struct KuasarSandboxer<F: VMFactory, H: Hooks<F::VM>> {
     factory: F,
     hooks: H,
@@ -124,9 +95,13 @@ where
             let path = Path::new(dir).join(entry.file_name());
             match KuasarSandbox::recover(&path).await {
                 Ok(sb) => {
+                    let status = sb.status.clone();
                     let sb_mutex = Arc::new(Mutex::new(sb));
-                    let sb_clone = sb_mutex.clone();
-                    monitor(sb_clone);
+                    // Only a running sandbox should be monitored.
+                    if let SandboxStatus::Running(_) = status {
+                        let sb_clone = sb_mutex.clone();
+                        monitor(sb_clone);
+                    }
                     self.sandboxes
                         .write()
                         .await
@@ -202,8 +177,6 @@ where
                 return Err(e);
             }
         }
-
-        // TODO support network
         let vm = self.factory.create_vm(id, &s).await?;
         let mut sandbox = KuasarSandbox {
             vm,
@@ -220,27 +193,6 @@ where
             sandbox_cgroups,
         };
 
-        // Handle pod network if it has a private network namespace
-        if !s.sandbox.netns.is_empty() {
-            // get vcpu for interface queue
-            let mut vcpu = 1;
-            if let Some(resources) = get_resources(&s.sandbox) {
-                if resources.cpu_period > 0 && resources.cpu_quota > 0 {
-                    // get ceil of cpus if it is not integer
-                    let base = (resources.cpu_quota as f64 / resources.cpu_period as f64).ceil();
-                    vcpu = base as u32;
-                }
-            }
-
-            let network_config = NetworkConfig {
-                netns: s.sandbox.netns.to_string(),
-                sandbox_id: id.to_string(),
-                queue: vcpu,
-            };
-            let network = Network::new(network_config).await?;
-            network.attach_to(&mut sandbox).await?;
-        }
-
         // setup sandbox files: hosts, hostname and resolv.conf for guest
         sandbox.setup_sandbox_files().await?;
         self.hooks.post_create(&mut sandbox).await?;
@@ -256,37 +208,47 @@ where
         let sandbox_mutex = self.sandbox(id).await?;
         let mut sandbox = sandbox_mutex.lock().await;
         self.hooks.pre_start(&mut sandbox).await?;
-        sandbox.start().await?;
-        // Currently only support cgroup V1, cgroup V2 is not supported now
-        if !cgroups_rs::hierarchies::is_cgroup2_unified_mode() {
-            // add vmm process into sandbox cgroup
-            if let SandboxStatus::Running(vmm_pid) = sandbox.status {
-                let vcpu_threads = sandbox.vm.vcpus().await?;
-                debug!(
-                    "vmm process pid: {}, vcpu threads pid: {:?}",
-                    vmm_pid, vcpu_threads
-                );
-                sandbox
-                    .sandbox_cgroups
-                    .add_process_into_sandbox_cgroups(vmm_pid, Some(vcpu_threads))?;
-                // move all vmm-related process into sandbox cgroup
-                for pid in sandbox.vm.pids().affiliated_pids {
-                    sandbox
-                        .sandbox_cgroups
-                        .add_process_into_sandbox_cgroups(pid, None)?;
-                }
-            } else {
-                return Err(Error::Other(anyhow!(
-                    "sandbox status is not Running after started!"
-                )));
-            }
+        // Prepare pod network if it has a private network namespace
+        if !sandbox.data.netns.is_empty() {
+            sandbox.prepare_network().await?;
+        }
+
+        if let Err(e) = sandbox.start().await {
+            sandbox.destroy_network().await;
+            return Err(e);
         }
 
         let sandbox_clone = sandbox_mutex.clone();
         monitor(sandbox_clone);
-        self.hooks.post_start(&mut sandbox).await?;
-        sandbox.dump().await?;
+
+        if let Err(e) = sandbox.add_to_cgroup().await {
+            if let Err(re) = sandbox.stop(true).await {
+                warn!("roll back in add to cgroup: {}", re);
+                return Err(e);
+            }
+            sandbox.destroy_network().await;
+            return Err(e);
+        }
+
+        if let Err(e) = self.hooks.post_start(&mut sandbox).await {
+            if let Err(re) = sandbox.stop(true).await {
+                warn!("roll back in sandbox post start: {}", re);
+                return Err(e);
+            }
+            sandbox.destroy_network().await;
+            return Err(e);
+        }
+
+        if let Err(e) = sandbox.dump().await {
+            if let Err(re) = sandbox.stop(true).await {
+                warn!("roll back in sandbox start dump: {}", re);
+                return Err(e);
+            }
+            sandbox.destroy_network().await;
+            return Err(e);
+        }
+
         Ok(())
     }
@@ -446,7 +408,22 @@ where
         let mut sb = serde_json::from_slice::<KuasarSandbox<V>>(content.as_slice())
             .map_err(|e| anyhow!("failed to deserialize sandbox, {}", e))?;
         if let SandboxStatus::Running(_) = sb.status {
-            sb.vm.recover().await?;
+            if let Err(e) = sb.vm.recover().await {
+                warn!("failed to recover vm {}: {}, force killing it!", sb.id, e);
+                if let Err(re) = sb.stop(true).await {
+                    warn!("roll back in recover and stop: {}", re);
+                    return Err(e);
+                }
+                return Err(e);
+            };
+            if let Err(e) = sb.init_client().await {
+                if let Err(re) = sb.stop(true).await {
+                    warn!("roll back in recover, init task client and stop: {}", re);
+                    return Err(e);
+                }
+                return Err(e);
+            }
+            sb.sync_clock().await;
         }
         // recover the sandbox_cgroups in the sandbox object
         sb.sandbox_cgroups =
@@ -464,12 +441,18 @@ where
         let pid = self.vm.start().await?;
 
         if let Err(e) = self.init_client().await {
-            self.vm.stop(true).await.unwrap_or_default();
+            if let Err(re) = self.vm.stop(true).await {
+                warn!("roll back in init task client: {}", re);
+                return Err(e);
+            }
             return Err(e);
         }
 
         if let Err(e) = self.setup_network().await {
-            self.vm.stop(true).await.unwrap_or_default();
+            if let Err(re) = self.vm.stop(true).await {
+                warn!("roll back in setup network: {}", re);
+                return Err(e);
+            }
             return Err(e);
         }
 
@@ -479,12 +462,15 @@ where
 
     async fn stop(&mut self, force: bool) -> Result<()> {
         match self.status {
-            // If sandbox is created but not running, no need to stop.
-            SandboxStatus::Created => {
-                return Ok(());
-            }
+            // If a sandbox is created:
+            // 1. Just created, vmm is not running: roll back and clean up
+            // 2. Created and vmm is running: roll back and clean up
+            // 3. Created and vmm exited abnormally after running: status is Stopped
+            SandboxStatus::Created => {}
            SandboxStatus::Running(_) => {}
            SandboxStatus::Stopped(_, _) => {
+                // Network should already be destroyed once a sandbox is stopped; destroying again is a no-op.
+                self.destroy_network().await;
                 return Ok(());
             }
             _ => {
@@ -505,9 +491,7 @@ where
         }
 
         self.vm.stop(force).await?;
-        if let Some(network) = self.network.as_mut() {
-            network.destroy().await;
-        }
+        self.destroy_network().await;
         Ok(())
     }
 
@@ -530,6 +514,7 @@ where
             return Err(anyhow!("VM address is empty").into());
         }
         let client = new_sandbox_client(&addr).await?;
+        debug!("connected to task server {}", self.id);
         client_check(&client).await?;
         *client_guard = Some(client)
     }
@@ -606,6 +591,61 @@ where
     pub fn get_sandbox_shared_path(&self) -> String {
         format!("{}/{}", self.base_dir, SHARED_DIR_SUFFIX)
     }
+
+    pub async fn prepare_network(&mut self) -> Result<()> {
+        // get vcpu for interface queue, at least one vcpu
+        let mut vcpu = 1;
+        if let Some(resources) = get_resources(&self.data) {
+            if resources.cpu_period > 0 && resources.cpu_quota > 0 {
+                // get ceil of cpus if it is not integer
+                let base = (resources.cpu_quota as f64 / resources.cpu_period as f64).ceil();
+                vcpu = base as u32;
+            }
+        }
+
+        let network_config = NetworkConfig {
+            netns: self.data.netns.to_string(),
+            sandbox_id: self.id.to_string(),
+            queue: vcpu,
+        };
+        let network = Network::new(network_config).await?;
+        network.attach_to(self).await?;
+        Ok(())
+    }
+
+    // If the sandbox is still running, destroying its network may hang.
+    pub async fn destroy_network(&mut self) {
+        // Network should be destroyed only once, take it out here.
+        if let Some(mut network) = self.network.take() {
+            network.destroy().await;
+        }
+    }
+
+    pub async fn add_to_cgroup(&self) -> Result<()> {
+        // Currently only cgroup V1 is supported; cgroup V2 is not supported yet
+        if !cgroups_rs::hierarchies::is_cgroup2_unified_mode() {
+            // add vmm process into sandbox cgroup
+            if let SandboxStatus::Running(vmm_pid) = self.status {
+                let vcpu_threads = self.vm.vcpus().await?;
+                debug!(
+                    "vmm process pid: {}, vcpu threads pid: {:?}",
+                    vmm_pid, vcpu_threads
+                );
+                self.sandbox_cgroups
+                    .add_process_into_sandbox_cgroups(vmm_pid, Some(vcpu_threads))?;
+                // move all vmm-related processes into the sandbox cgroup
+                for pid in self.vm.pids().affiliated_pids {
+                    self.sandbox_cgroups
+                        .add_process_into_sandbox_cgroups(pid, None)?;
+                }
+            } else {
+                return Err(Error::Other(anyhow!(
+                    "sandbox status is not Running after started!"
+                )));
+            }
+        }
+        Ok(())
+    }
 }
 
 // parse_dnsoptions parse DNS options into resolv.conf format content,
@@ -672,12 +712,28 @@ fn monitor<V: VM + Sync + Send + 'static>(sandbox_mutex: Arc<Mutex<KuasarSandbox<V>>>) {
             rx.changed().await.unwrap_or_default();
             let (code, ts) = *rx.borrow();
             let mut sandbox = sandbox_mutex.lock().await;
+            info!("monitored sandbox {} terminated", sandbox.id);
             sandbox.status = SandboxStatus::Stopped(code, ts);
             sandbox.exit_signal.signal();
+            // Destroy the network only after the sandbox status has left Running.
+            sandbox.destroy_network().await;
+            sandbox
+                .dump()
+                .await
+                .map_err(|e| error!("failed to dump sandbox {} in monitor: {}", sandbox.id, e))
+                .unwrap_or_default();
         } else {
             let mut sandbox = sandbox_mutex.lock().await;
+            info!("sandbox {} already terminated before monitoring it", sandbox.id);
             sandbox.status = SandboxStatus::Stopped(code, ts);
             sandbox.exit_signal.signal();
+            // Destroy the network only after the sandbox status has left Running.
+            sandbox.destroy_network().await;
+            sandbox
+                .dump()
+                .await
+                .map_err(|e| error!("failed to dump sandbox {} in monitor: {}", sandbox.id, e))
+                .unwrap_or_default();
         }
     });
 }
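As a worked example of the interface-queue formula in `prepare_network` above (queues = ceil(cpu_quota / cpu_period), with a floor of one queue when no quota is set): a quota of 250000us over a period of 100000us is 2.5 vCPUs, which rounds up to 3 queues. A standalone sketch of the same arithmetic follows; the field types are assumed to follow the CRI `LinuxContainerResources` convention:

```rust
// queues = max(1, ceil(cpu_quota / cpu_period)); defaults to 1 without a quota.
fn interface_queues(cpu_quota: i64, cpu_period: u64) -> u32 {
    if cpu_quota > 0 && cpu_period > 0 {
        (cpu_quota as f64 / cpu_period as f64).ceil() as u32
    } else {
        1
    }
}

fn main() {
    assert_eq!(interface_queues(250_000, 100_000), 3); // 2.5 vCPUs -> 3 queues
    assert_eq!(interface_queues(100_000, 100_000), 1); // exactly 1 vCPU
    assert_eq!(interface_queues(0, 100_000), 1); // no quota: single queue
}
```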