From b285b074ee93a2eb5816979979f83693522c59f8 Mon Sep 17 00:00:00 2001
From: Zhang Tianyang
Date: Fri, 5 Apr 2024 16:07:30 +0800
Subject: [PATCH] sandbox: Optimize sandbox lifecycle management

1. Add rollback handlers to the process of starting a sandbox.
2. Move network setup into sandbox start, since it should be called close to VM startup.
3. Only monitor running sandboxes and dump their status in time.
4. Sandbox stop should make sure the sandbox has actually stopped, so wait up to 10s for it to exit.
5. If the VM process terminates unexpectedly and containerd never asks to stop the sandbox, the network should be destroyed in the monitor task.
6. Forcefully kill the vmm process when stopping the sandbox.

Signed-off-by: Zhang Tianyang
---
 vmm/sandbox/src/cloud_hypervisor/mod.rs |  86 +++++++--
 vmm/sandbox/src/network/link.rs         |  23 +--
 vmm/sandbox/src/sandbox.rs              | 236 +++++++++++++++---------
 3 files changed, 224 insertions(+), 121 deletions(-)

diff --git a/vmm/sandbox/src/cloud_hypervisor/mod.rs b/vmm/sandbox/src/cloud_hypervisor/mod.rs
index 5e4b7706..8eeaef27 100644
--- a/vmm/sandbox/src/cloud_hypervisor/mod.rs
+++ b/vmm/sandbox/src/cloud_hypervisor/mod.rs
@@ -14,12 +14,13 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-use std::{os::unix::io::RawFd, process::Stdio};
+use std::{os::unix::io::RawFd, process::Stdio, time::Duration};
 
 use anyhow::anyhow;
 use async_trait::async_trait;
 use containerd_sandbox::error::{Error, Result};
-use log::{debug, error, info};
+use log::{debug, error, info, warn};
+use nix::{errno::Errno::ESRCH, sys::signal, unistd::Pid};
 use serde::{Deserialize, Serialize};
 use time::OffsetDateTime;
 use tokio::{
@@ -41,10 +42,10 @@ use crate::{
         },
     },
     device::{BusType, DeviceInfo},
-    impl_recoverable, load_config,
+    load_config,
     param::ToCmdLineParams,
     sandbox::KuasarSandboxer,
-    utils::{read_std, set_cmd_fd, set_cmd_netns, wait_pid, write_file_atomic},
+    utils::{read_std, set_cmd_fd, set_cmd_netns, wait_channel, wait_pid, write_file_atomic},
     vm::{Pids, VcpuThreads, VM},
 };
 
@@ -145,6 +146,16 @@ impl CloudHypervisorVM {
         self.fds.push(fd);
         self.fds.len() - 1 + 3
     }
+
+    async fn wait_stop(&mut self, t: Duration) -> Result<()> {
+        if let Some(rx) = self.wait_channel().await {
+            let (_, ts) = *rx.borrow();
+            if ts == 0 {
+                wait_channel(t, rx).await?;
+            }
+        }
+        Ok(())
+    }
 }
 
 #[async_trait]
@@ -152,6 +163,8 @@ impl VM for CloudHypervisorVM {
     async fn start(&mut self) -> Result<u32> {
         create_dir_all(&self.base_dir).await?;
         let virtiofsd_pid = self.start_virtiofsd().await?;
+        // TODO: add child virtiofsd process
+        self.pids.affiliated_pids.push(virtiofsd_pid);
         let mut params = self.config.to_cmdline_params("--");
         for d in self.devices.iter() {
             params.extend(d.to_cmdline_params("--"));
@@ -174,31 +187,57 @@
             .spawn()
             .map_err(|e| anyhow!("failed to spawn cloud hypervisor command: {}", e))?;
         let pid = child.id();
+        self.pids.vmm_pid = pid;
        let pid_file = format!("{}/pid", self.base_dir);
-        let (tx, rx) = tokio::sync::watch::channel((0u32, 0i128));
+        let (tx, rx) = channel((0u32, 0i128));
+        self.wait_chan = Some(rx);
         spawn_wait(
             child,
             format!("cloud-hypervisor {}", self.id),
             Some(pid_file),
             Some(tx),
         );
 
-        self.client = Some(self.create_client().await?);
-        self.wait_chan = Some(rx);
-        // update vmm related pids
-        self.pids.vmm_pid = pid;
-        self.pids.affiliated_pids.push(virtiofsd_pid);
-        // TODO: add child virtiofsd process
+        match self.create_client().await {
+            Ok(client) => self.client = Some(client),
+            Err(e) => {
+                if let Err(re) = self.stop(true).await {
+                    warn!("roll back in create clh api client: {}", re);
+                    return Err(e);
+                }
+                return Err(e);
+            }
+        };
 
         Ok(pid.unwrap_or_default())
     }
 
     async fn stop(&mut self, force: bool) -> Result<()> {
-        let pid = self.pid()?;
-        if pid == 0 {
-            return Ok(());
+        let signal = if force {
+            signal::SIGKILL
+        } else {
+            signal::SIGTERM
+        };
+
+        let pids = self.pids();
+        if let Some(vmm_pid) = pids.vmm_pid {
+            if vmm_pid > 0 {
+                // TODO: consider pid reuse
+                match signal::kill(Pid::from_raw(vmm_pid as i32), signal) {
+                    Err(e) => {
+                        if e != ESRCH {
+                            return Err(anyhow!("kill vmm process {}: {}", vmm_pid, e).into());
+                        }
+                    }
+                    Ok(_) => self.wait_stop(Duration::from_secs(10)).await?,
+                }
+            }
+        }
+        for affiliated_pid in pids.affiliated_pids {
+            if affiliated_pid > 0 {
+                // affiliated processes may exit automatically, so it's ok not to handle the error
+                signal::kill(Pid::from_raw(affiliated_pid as i32), signal).unwrap_or_default();
+            }
         }
-        let signal = if force { 9 } else { 15 };
-        unsafe { nix::libc::kill(pid as i32, signal) };
         Ok(())
     }
 
@@ -288,7 +327,20 @@
     }
 }
 
-impl_recoverable!(CloudHypervisorVM);
+#[async_trait]
+impl crate::vm::Recoverable for CloudHypervisorVM {
+    async fn recover(&mut self) -> Result<()> {
+        self.client = Some(self.create_client().await?);
+        let pid = self.pid()?;
+        let (tx, rx) = channel((0u32, 0i128));
+        tokio::spawn(async move {
+            let wait_result = wait_pid(pid as i32).await;
+            tx.send(wait_result).unwrap_or_default();
+        });
+        self.wait_chan = Some(rx);
+        Ok(())
+    }
+}
 
 macro_rules! read_stdio {
     ($stdio:expr, $cmd_name:ident) => {
diff --git a/vmm/sandbox/src/network/link.rs b/vmm/sandbox/src/network/link.rs
index 2addab4d..5ddc2785 100644
--- a/vmm/sandbox/src/network/link.rs
+++ b/vmm/sandbox/src/network/link.rs
@@ -574,6 +574,11 @@ async fn get_pci_driver(bdf: &str) -> Result<String> {
 }
 
 async fn bind_device_to_driver(driver: &str, bdf: &str) -> Result<()> {
+    // 0. Check the current driver
+    if get_pci_driver(bdf).await? == driver {
+        return Ok(());
+    }
+
     // 1. Switch the device driver
     let driver_override_path = format!("/sys/bus/pci/devices/{}/driver_override", bdf);
     write_file_async(&driver_override_path, driver).await?;
@@ -589,23 +594,13 @@ async fn bind_device_to_driver(driver: &str, bdf: &str) -> Result<()> {
     write_file_async(probe_path, bdf).await?;
 
     // 4. Check the result
-    let driver_link = format!("/sys/bus/pci/devices/{}/driver", bdf);
-    let driver_path = tokio::fs::read_link(&*driver_link).await?;
-
-    let result_driver = driver_path.file_name().ok_or(anyhow!(
-        "failed to get driver name from {}",
-        driver_path.display()
-    ))?;
-    let result_driver = result_driver.to_str().ok_or(anyhow!(
-        "failed to convert the driver {} to str",
-        result_driver.to_string_lossy()
-    ))?;
+    let result_driver = get_pci_driver(bdf).await?;
     if result_driver != driver {
         return Err(anyhow!(
-            "device {} driver is {} after executing bind to {}",
+            "device {} driver is expected to be {} but got {}",
             bdf,
-            result_driver,
-            driver
+            driver,
+            result_driver
         )
         .into());
     }
diff --git a/vmm/sandbox/src/sandbox.rs b/vmm/sandbox/src/sandbox.rs
index dc8ac46c..0105b098 100644
--- a/vmm/sandbox/src/sandbox.rs
+++ b/vmm/sandbox/src/sandbox.rs
@@ -52,35 +52,6 @@ use crate::{
 
 pub const KUASAR_GUEST_SHARE_DIR: &str = "/run/kuasar/storage/containers/";
 
-macro_rules! monitor {
-    ($sb:ident) => {
-        tokio::spawn(async move {
-            let mut rx = {
-                let sandbox = $sb.lock().await;
-                if let SandboxStatus::Running(_) = sandbox.status.clone() {
-                    sandbox.vm.wait_channel().await.unwrap()
-                } else {
-                    error!("can not get wait channel when sandbox is running");
-                    return;
-                }
-            };
-
-            let (code, ts) = *rx.borrow();
-            if ts == 0 {
-                rx.changed().await.unwrap_or_default();
-                let (code, ts) = *rx.borrow();
-                let mut sandbox = $sb.lock().await;
-                sandbox.status = SandboxStatus::Stopped(code, ts);
-                sandbox.exit_signal.signal();
-            } else {
-                let mut sandbox = $sb.lock().await;
-                sandbox.status = SandboxStatus::Stopped(code, ts);
-                sandbox.exit_signal.signal();
-            }
-        });
-    };
-}
-
 pub struct KuasarSandboxer<F: VMFactory, H: Hooks<F::VM>> {
     factory: F,
     hooks: H,
@@ -124,9 +95,13 @@ where
                 let path = Path::new(dir).join(entry.file_name());
                 match KuasarSandbox::recover(&path).await {
                     Ok(sb) => {
+                        let status = sb.status.clone();
                        let sb_mutex = Arc::new(Mutex::new(sb));
-                        let sb_clone = sb_mutex.clone();
-                        monitor(sb_clone);
+                        // Only running sandboxes should be monitored.
+                        if let SandboxStatus::Running(_) = status {
+                            let sb_clone = sb_mutex.clone();
+                            monitor(sb_clone);
+                        }
                        self.sandboxes
                            .write()
                            .await
@@ -202,8 +177,6 @@ where
                 return Err(e);
             }
         }
-
-        // TODO support network
         let vm = self.factory.create_vm(id, &s).await?;
         let mut sandbox = KuasarSandbox {
             vm,
@@ -220,27 +193,6 @@ where
             sandbox_cgroups,
         };
 
-        // Handle pod network if it has a private network namespace
-        if !s.sandbox.netns.is_empty() {
-            // get vcpu for interface queue
-            let mut vcpu = 1;
-            if let Some(resources) = get_resources(&s.sandbox) {
-                if resources.cpu_period > 0 && resources.cpu_quota > 0 {
-                    // get ceil of cpus if it is not integer
-                    let base = (resources.cpu_quota as f64 / resources.cpu_period as f64).ceil();
-                    vcpu = base as u32;
-                }
-            }
-
-            let network_config = NetworkConfig {
-                netns: s.sandbox.netns.to_string(),
-                sandbox_id: id.to_string(),
-                queue: vcpu,
-            };
-            let network = Network::new(network_config).await?;
-            network.attach_to(&mut sandbox).await?;
-        }
-
         // setup sandbox files: hosts, hostname and resolv.conf for guest
         sandbox.setup_sandbox_files().await?;
         self.hooks.post_create(&mut sandbox).await?;
@@ -256,37 +208,47 @@ where
         let sandbox_mutex = self.sandbox(id).await?;
         let mut sandbox = sandbox_mutex.lock().await;
         self.hooks.pre_start(&mut sandbox).await?;
 
-        sandbox.start().await?;
-        // Currently only support cgroup V1, cgroup V2 is not supported now
-        if !cgroups_rs::hierarchies::is_cgroup2_unified_mode() {
-            // add vmm process into sandbox cgroup
-            if let SandboxStatus::Running(vmm_pid) = sandbox.status {
-                let vcpu_threads = sandbox.vm.vcpus().await?;
-                debug!(
-                    "vmm process pid: {}, vcpu threads pid: {:?}",
-                    vmm_pid, vcpu_threads
-                );
-                sandbox
-                    .sandbox_cgroups
-                    .add_process_into_sandbox_cgroups(vmm_pid, Some(vcpu_threads))?;
-                // move all vmm-related process into sandbox cgroup
-                for pid in sandbox.vm.pids().affiliated_pids {
-                    sandbox
-                        .sandbox_cgroups
-                        .add_process_into_sandbox_cgroups(pid, None)?;
-                }
-            } else {
-                return Err(Error::Other(anyhow!(
-                    "sandbox status is not Running after started!"
-                )));
-            }
+        // Prepare pod network if it has a private network namespace
+        if !sandbox.data.netns.is_empty() {
+            sandbox.prepare_network().await?;
+        }
+
+        if let Err(e) = sandbox.start().await {
+            sandbox.destroy_network().await;
+            return Err(e);
         }
 
         let sandbox_clone = sandbox_mutex.clone();
         monitor(sandbox_clone);
-        self.hooks.post_start(&mut sandbox).await?;
-        sandbox.dump().await?;
+
+        if let Err(e) = sandbox.add_to_cgroup().await {
+            if let Err(re) = sandbox.stop(true).await {
+                warn!("roll back in add to cgroup: {}", re);
+                return Err(e);
+            }
+            sandbox.destroy_network().await;
+            return Err(e);
+        }
+
+        if let Err(e) = self.hooks.post_start(&mut sandbox).await {
+            if let Err(re) = sandbox.stop(true).await {
+                warn!("roll back in sandbox post start: {}", re);
+                return Err(e);
+            }
+            sandbox.destroy_network().await;
+            return Err(e);
+        }
+
+        if let Err(e) = sandbox.dump().await {
+            if let Err(re) = sandbox.stop(true).await {
+                warn!("roll back in sandbox start dump: {}", re);
+                return Err(e);
+            }
+            sandbox.destroy_network().await;
+            return Err(e);
+        }
+
         Ok(())
     }
 
@@ -446,7 +408,22 @@ where
         let mut sb = serde_json::from_slice::<KuasarSandbox<V>>(content.as_slice())
             .map_err(|e| anyhow!("failed to deserialize sandbox, {}", e))?;
         if let SandboxStatus::Running(_) = sb.status {
-            sb.vm.recover().await?;
+            if let Err(e) = sb.vm.recover().await {
+                warn!("failed to recover vm {}: {}, then force kill it!", sb.id, e);
+                if let Err(re) = sb.stop(true).await {
+                    warn!("roll back in recover and stop: {}", re);
+                    return Err(e);
+                }
+                return Err(e);
+            };
+            if let Err(e) = sb.init_client().await {
+                if let Err(re) = sb.stop(true).await {
+                    warn!("roll back in recover, init task client and stop: {}", re);
+                    return Err(e);
+                }
+                return Err(e);
+            }
+            sb.sync_clock().await;
         }
         // recover the sandbox_cgroups in the sandbox object
         sb.sandbox_cgroups =
@@ -464,12 +441,18 @@ where
         let pid = self.vm.start().await?;
 
         if let Err(e) = self.init_client().await {
-            self.vm.stop(true).await.unwrap_or_default();
+            if let Err(re) = self.vm.stop(true).await {
+                warn!("roll back in init task client: {}", re);
+                return Err(e);
+            }
             return Err(e);
         }
 
        if let Err(e) = self.setup_network().await {
-            self.vm.stop(true).await.unwrap_or_default();
+            if let Err(re) = self.vm.stop(true).await {
+                warn!("roll back in setup network: {}", re);
+                return Err(e);
+            }
            return Err(e);
        }
 
@@ -479,12 +462,15 @@ where
 
     async fn stop(&mut self, force: bool) -> Result<()> {
         match self.status {
-            // If sandbox is created but not running, no need to stop.
-            SandboxStatus::Created => {
-                return Ok(());
-            }
+            // If a sandbox is created:
+            // 1. Just created, vmm is not running: roll back and clean up
+            // 2. Created and vmm is running: roll back and clean up
+            // 3. Created and vmm exited abnormally after running: status is Stopped
+            SandboxStatus::Created => {}
             SandboxStatus::Running(_) => {}
             SandboxStatus::Stopped(_, _) => {
+                // Network should already be destroyed when the sandbox is stopped.
+                self.destroy_network().await;
                 return Ok(());
             }
             _ => {
@@ -505,9 +491,7 @@ where
         }
 
         self.vm.stop(force).await?;
-        if let Some(network) = self.network.as_mut() {
-            network.destroy().await;
-        }
+        self.destroy_network().await;
         Ok(())
     }
 
@@ -530,6 +514,7 @@ where
             return Err(anyhow!("VM address is empty").into());
         }
         let client = new_sandbox_client(&addr).await?;
+        debug!("connected to task server {}", self.id);
         client_check(&client).await?;
         *client_guard = Some(client)
     }
@@ -606,6 +591,61 @@ where
     pub fn get_sandbox_shared_path(&self) -> String {
         format!("{}/{}", self.base_dir, SHARED_DIR_SUFFIX)
     }
+
+    pub async fn prepare_network(&mut self) -> Result<()> {
+        // get vcpu for interface queue, at least one vcpu
+        let mut vcpu = 1;
+        if let Some(resources) = get_resources(&self.data) {
+            if resources.cpu_period > 0 && resources.cpu_quota > 0 {
+                // get ceil of cpus if it is not an integer
+                let base = (resources.cpu_quota as f64 / resources.cpu_period as f64).ceil();
+                vcpu = base as u32;
+            }
+        }
+
+        let network_config = NetworkConfig {
+            netns: self.data.netns.to_string(),
+            sandbox_id: self.id.to_string(),
+            queue: vcpu,
+        };
+        let network = Network::new(network_config).await?;
+        network.attach_to(self).await?;
+        Ok(())
+    }
+
+    // If a sandbox is still running, destroying its network may hang while it runs
+    pub async fn destroy_network(&mut self) {
+        // Network should be destroyed only once, so take it out here.
+        if let Some(mut network) = self.network.take() {
+            network.destroy().await;
+        }
+    }
+
+    pub async fn add_to_cgroup(&self) -> Result<()> {
+        // Currently only cgroup V1 is supported; cgroup V2 is not supported yet
+        if !cgroups_rs::hierarchies::is_cgroup2_unified_mode() {
+            // add vmm process into sandbox cgroup
+            if let SandboxStatus::Running(vmm_pid) = self.status {
+                let vcpu_threads = self.vm.vcpus().await?;
+                debug!(
+                    "vmm process pid: {}, vcpu threads pid: {:?}",
+                    vmm_pid, vcpu_threads
+                );
+                self.sandbox_cgroups
+                    .add_process_into_sandbox_cgroups(vmm_pid, Some(vcpu_threads))?;
+                // move all vmm-related processes into the sandbox cgroup
+                for pid in self.vm.pids().affiliated_pids {
+                    self.sandbox_cgroups
+                        .add_process_into_sandbox_cgroups(pid, None)?;
+                }
+            } else {
+                return Err(Error::Other(anyhow!(
+                    "sandbox status is not Running after started!"
+                )));
+            }
+        }
+        Ok(())
+    }
 }
 
 // parse_dnsoptions parse DNS options into resolv.conf format content,
@@ -672,12 +712,28 @@ fn monitor<V: VM + Sync + Send + 'static>(sandbox_mutex: Arc<Mutex<KuasarSandbox<V>>>) {
             rx.changed().await.unwrap_or_default();
             let (code, ts) = *rx.borrow();
             let mut sandbox = sandbox_mutex.lock().await;
+            info!("monitored sandbox {} terminated", sandbox.id);
             sandbox.status = SandboxStatus::Stopped(code, ts);
             sandbox.exit_signal.signal();
+            // Network destruction should be done after the sandbox status has left Running.
+            sandbox.destroy_network().await;
+            sandbox
+                .dump()
+                .await
+                .map_err(|e| error!("dump sandbox {} in monitor: {}", sandbox.id, e))
+                .unwrap_or_default();
         } else {
             let mut sandbox = sandbox_mutex.lock().await;
+            info!("sandbox {} already terminated before monitoring it", sandbox.id);
             sandbox.status = SandboxStatus::Stopped(code, ts);
             sandbox.exit_signal.signal();
+            // Network destruction should be done after the sandbox status has left Running.
+            sandbox.destroy_network().await;
+            sandbox
+                .dump()
+                .await
+                .map_err(|e| error!("dump sandbox {} in monitor: {}", sandbox.id, e))
+                .unwrap_or_default();
         }
     });
 }
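
Note (not part of the patch): the 10s bounded wait in CloudHypervisorVM::stop relies on the
utils::wait_channel helper added to the imports above; its implementation lives in
vmm/sandbox/src/utils.rs and is not shown in this diff. A minimal sketch of what such a helper
could look like, assuming the (exit_code, exit_timestamp) watch channel fed by spawn_wait, is:

    use std::time::Duration;

    use anyhow::{anyhow, Result};
    use tokio::sync::watch::Receiver;

    // Sketch only: wait until the watch channel reports a change (the child
    // process exited) or give up after `t`. The real helper in utils.rs may
    // differ in signature and error type.
    pub async fn wait_channel(t: Duration, mut rx: Receiver<(u32, i128)>) -> Result<()> {
        tokio::select! {
            changed = rx.changed() => {
                changed.map_err(|e| anyhow!("wait channel closed: {}", e))?;
                Ok(())
            }
            _ = tokio::time::sleep(t) => Err(anyhow!("timeout waiting for vmm to exit")),
        }
    }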