From bc6564002334a03eec4f9bd612e2df33a7bec674 Mon Sep 17 00:00:00 2001 From: utam0k Date: Wed, 31 Mar 2021 22:53:23 +0900 Subject: [PATCH] traial implementation of async/await. --- Cargo.lock | 11 ++++ Cargo.toml | 4 +- src/create.rs | 27 ++++++---- src/process/parent.rs | 3 +- src/rootfs.rs | 116 ++++++++++++++++++++++++++++-------------- src/spec.rs | 42 +++++++-------- 6 files changed, 130 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1e0cfc2f0..1f3a65393 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -159,6 +159,7 @@ dependencies = [ "futures-core", "futures-task", "futures-util", + "num_cpus", ] [[package]] @@ -357,6 +358,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "once_cell" version = "1.6.0" diff --git a/Cargo.toml b/Cargo.toml index 66f3ce834..4a31f0155 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,6 @@ authors = ["utam0k "] edition = "2018" [dependencies] -futures = "0.3" clap = "3.0.0-beta.2" nix = "0.19.1" serde = { version = "1.0", features = ["derive"] } @@ -17,4 +16,5 @@ anyhow = "1.0" mio = { version = "0.7", features = ["os-ext", "os-poll"] } chrono = "0.4" once_cell = "1.6.0" -procfs = "0.9.1" \ No newline at end of file +procfs = "0.9.1" +futures = { version = "0.3", features = ["thread-pool"] } \ No newline at end of file diff --git a/src/create.rs b/src/create.rs index c06282f20..d275d314b 100644 --- a/src/create.rs +++ b/src/create.rs @@ -73,8 +73,8 @@ impl Create { let process = run_container( self.pid_file.as_ref(), &mut notify_socket, - &rootfs, - &spec, + rootfs, + spec, csocketfd, container, )?; @@ -88,8 +88,8 @@ impl Create { fn run_container>( pid_file: Option

, notify_socket: &mut NotifyListener, - rootfs: &PathBuf, - spec: &spec::Spec, + rootfs: PathBuf, + spec: spec::Spec, csocketfd: Option, container: Container, ) -> Result { @@ -116,6 +116,8 @@ fn run_container>( )? { Process::Parent(parent) => Ok(Process::Parent(parent)), Process::Child(child) => { + sched::unshare(cf & !sched::CloneFlags::CLONE_NEWUSER)?; + if let Some(csocketfd) = csocketfd { tty::ready(csocketfd)?; } @@ -128,23 +130,28 @@ fn run_container>( } } - sched::unshare(cf & !sched::CloneFlags::CLONE_NEWUSER)?; - match fork::fork_init(child)? { Process::Child(child) => Ok(Process::Child(child)), Process::Init(mut init) => { + let spec_args: &Vec = &spec.process.args.clone(); + + let clone_spec = std::sync::Arc::new(spec); + let clone_rootfs = std::sync::Arc::new(rootfs.clone()); + futures::executor::block_on(rootfs::prepare_rootfs( - spec, - rootfs, + clone_spec, + clone_rootfs, cf.contains(sched::CloneFlags::CLONE_NEWUSER), ))?; - rootfs::pivot_rootfs(&*rootfs)?; + + rootfs::pivot_rootfs(&rootfs)?; init.ready()?; notify_socket.wait_for_container_start()?; - utils::do_exec(&spec.process.args[0], &spec.process.args)?; + // utils::do_exec(&spec.process.args[0], &spec.process.args)?; + utils::do_exec(&spec_args[0], spec_args)?; container.update_status(ContainerStatus::Stopped)?.save()?; Ok(Process::Init(init)) diff --git a/src/process/parent.rs b/src/process/parent.rs index 294bf1aeb..fa48d5bdb 100644 --- a/src/process/parent.rs +++ b/src/process/parent.rs @@ -24,8 +24,7 @@ impl ParentProcess { pub fn wait_for_child_ready(&mut self) -> Result { let mut events = Events::with_capacity(128); - self.poll - .poll(&mut events, Some(Duration::from_millis(1000)))?; + self.poll.poll(&mut events, Some(Duration::from_secs(5)))?; for event in events.iter() { if let PARENT = event.token() { let mut buf = [0; 1]; diff --git a/src/rootfs.rs b/src/rootfs.rs index a3d31b12f..287b1436a 100644 --- a/src/rootfs.rs +++ b/src/rootfs.rs @@ -2,13 +2,16 @@ use std::fs::OpenOptions; use std::fs::{canonicalize, create_dir_all, remove_file}; use std::os::unix::fs::symlink; use std::path::{Path, PathBuf}; +use std::sync::Arc; use anyhow::{bail, Result}; -use futures::future; +use futures::future::{self, try_join_all}; +use futures::stream::{self, StreamExt}; +use futures::task::SpawnExt; use nix::errno::Errno; use nix::fcntl::{open, OFlag}; -use nix::mount::MsFlags; -use nix::mount::*; +use nix::mount::mount as nix_mount; +use nix::mount::{umount2, MntFlags, MsFlags}; use nix::sys::stat::{mknod, umask}; use nix::sys::stat::{Mode, SFlag}; use nix::unistd::{chdir, chown, close, fchdir, getcwd, pivot_root}; @@ -76,7 +79,11 @@ fn default_devices() -> Vec { ] } -pub async fn prepare_rootfs(spec: &Spec, rootfs: &PathBuf, bind_devices: bool) -> Result<()> { +pub async fn prepare_rootfs( + spec: Arc, + rootfs: Arc, + bind_devices: bool, +) -> Result<()> { let mut flags = MsFlags::MS_REC; match spec.linux { Some(ref linux) => match linux.rootfs_propagation.as_ref() { @@ -87,51 +94,78 @@ pub async fn prepare_rootfs(spec: &Spec, rootfs: &PathBuf, bind_devices: bool) - }, None => flags |= MsFlags::MS_SLAVE, }; - let linux = spec.linux.as_ref().unwrap(); - mount(None::<&str>, "/", None::<&str>, flags, None::<&str>)?; + nix_mount(None::<&str>, "/", None::<&str>, flags, None::<&str>)?; log::debug!("mount root fs {:?}", rootfs); - mount( - Some(rootfs), - rootfs, + nix_mount( + Some(rootfs.as_ref()), + rootfs.as_ref(), None::<&str>, MsFlags::MS_BIND | MsFlags::MS_REC, None::<&str>, )?; - future::try_join_all(spec.mounts.iter().map(|m| { async move { - let (flags, data) = parse_mount(m); + let pool = futures::executor::ThreadPool::new()?; + let can_parall = spec.mounts.clone().into_iter().filter(|m| m.typ != "tmpfs"); + let cannot_parall = spec.mounts.iter().filter(|m| m.typ == "tmpfs"); + + for m in cannot_parall { + let (flags, data) = parse_mount(&m); + let ml = &spec.linux.as_ref().unwrap().mount_label; if m.typ == "cgroup" { - log::warn!("A feature of cgoup is unimplemented."); - Ok(()) // skip + log::warn!("A feature of cgoup is unimplemented."); } else if m.destination == PathBuf::from("/dev") { - mount_from( - m, - rootfs, - flags & !MsFlags::MS_RDONLY, - &data, - &linux.mount_label, - ) + mount_to_container(&m, rootfs.as_ref(), flags & !MsFlags::MS_RDONLY, &data, &ml)?; } else { - mount_from(m, rootfs, flags, &data, &linux.mount_label) + mount_to_container(&m, rootfs.as_ref(), flags, &data, &ml)?; } } - })).await?; + + try_join_all( + stream::iter(can_parall) + .map(|m| { + let spec = Arc::clone(&spec); + let rootfs = Arc::clone(&rootfs); + pool.spawn_with_handle(async move { + let (flags, data) = parse_mount(&m); + let ml = &spec.linux.as_ref().unwrap().mount_label; + if m.typ == "cgroup" { + // skip + log::warn!("A feature of cgoup is unimplemented."); + Ok(()) + } else if m.destination == PathBuf::from("/dev") { + mount_to_container( + &m, + rootfs.as_ref(), + flags & !MsFlags::MS_RDONLY, + &data, + &ml, + ) + } else { + mount_to_container(&m, rootfs.as_ref(), flags, &data, &ml) + } + }) + .unwrap() + }) + .collect::>() + .await, + ) + .await?; let olddir = getcwd()?; - chdir(rootfs)?; + chdir(rootfs.as_ref())?; - setup_default_symlinks(rootfs)?; - create_devices(&linux.devices, bind_devices).await?; - setup_ptmx(rootfs)?; + setup_default_symlinks(&rootfs.as_ref())?; + create_devices(&spec.linux.as_ref().unwrap().devices, bind_devices).await?; + setup_ptmx(rootfs.as_ref())?; chdir(&olddir)?; Ok(()) } -fn setup_ptmx(rootfs: &PathBuf) -> Result<()> { +fn setup_ptmx(rootfs: &Path) -> Result<()> { if let Err(e) = remove_file(rootfs.join("dev/ptmx")) { if e.kind() != ::std::io::ErrorKind::NotFound { bail!("could not delete /dev/ptmx") @@ -188,7 +222,7 @@ async fn bind_dev(dev: &LinuxDevice) -> Result<()> { Mode::from_bits_truncate(0o644), )?; close(fd)?; - mount( + nix_mount( Some(&*dev.path), &dev.path[1..], None::<&str>, @@ -227,17 +261,22 @@ fn to_sflag(t: LinuxDeviceType) -> Result { }) } -fn mount_from(m: &Mount, rootfs: &PathBuf, flags: MsFlags, data: &str, label: &str) -> Result<()> { - let d; - if !label.is_empty() && m.typ != "proc" && m.typ != "sysfs" { +fn mount_to_container( + m: &Mount, + rootfs: &PathBuf, + flags: MsFlags, + data: &str, + label: &str, +) -> Result<()> { + let d = if !label.is_empty() && m.typ != "proc" && m.typ != "sysfs" { if data.is_empty() { - d = format! {"context=\"{}\"", label}; + format!("context=\"{}\"", label) } else { - d = format! {"{},context=\"{}\"", data, label}; + format!("{},context=\"{}\"", data, label) } } else { - d = data.to_string(); - } + data.to_string() + }; let dest_for_host = format!( "{}{}", @@ -267,12 +306,13 @@ fn mount_from(m: &Mount, rootfs: &PathBuf, flags: MsFlags, data: &str, label: &s PathBuf::from(&m.source) }; - if let Err(::nix::Error::Sys(errno)) = mount(Some(&*src), dest, Some(&*m.typ), flags, Some(&*d)) + if let Err(::nix::Error::Sys(errno)) = + nix_mount(Some(&*src), dest, Some(&*m.typ), flags, Some(&*d)) { if errno != Errno::EINVAL { bail!("mount of {} failed", m.destination.display()); } - mount(Some(&*src), dest, Some(&*m.typ), flags, Some(data))?; + nix_mount(Some(&*src), dest, Some(&*m.typ), flags, Some(data))?; } if flags.contains(MsFlags::MS_BIND) && flags.intersects( @@ -284,7 +324,7 @@ fn mount_from(m: &Mount, rootfs: &PathBuf, flags: MsFlags, data: &str, label: &s | MsFlags::MS_SLAVE), ) { - mount( + nix_mount( Some(&*dest), &*dest, None::<&str>, diff --git a/src/spec.rs b/src/spec.rs index d733f4a29..593e3a19c 100644 --- a/src/spec.rs +++ b/src/spec.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; use anyhow::Result; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Platform { #[serde(default)] pub os: String, @@ -13,7 +13,7 @@ pub struct Platform { pub arch: String, } -#[derive(Default, PartialEq, Serialize, Deserialize, Debug)] +#[derive(Default, PartialEq, Serialize, Deserialize, Debug, Clone)] pub struct Box { #[serde(default)] pub height: u64, @@ -21,7 +21,7 @@ pub struct Box { pub width: u64, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct User { #[serde(default)] @@ -34,7 +34,7 @@ pub struct User { pub username: String, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Process { #[serde(default)] @@ -55,7 +55,7 @@ pub struct Process { pub selinux_label: String, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Root { #[serde(default)] pub path: PathBuf, @@ -63,7 +63,7 @@ pub struct Root { pub readonly: bool, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Mount { #[serde(default)] pub destination: PathBuf, @@ -103,7 +103,7 @@ impl Default for LinuxDeviceType { } } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct LinuxDeviceCgroup { #[serde(default)] pub allow: bool, @@ -115,7 +115,7 @@ pub struct LinuxDeviceCgroup { pub access: String, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct LinuxMemory { pub limit: Option, pub reservation: Option, @@ -126,7 +126,7 @@ pub struct LinuxMemory { pub swappiness: Option, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct LinuxCPU { pub shares: Option, @@ -140,13 +140,13 @@ pub struct LinuxCPU { pub mems: String, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct LinuxPids { #[serde(default)] pub limit: i64, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct LinuxWeightDevice { #[serde(default)] @@ -157,7 +157,7 @@ pub struct LinuxWeightDevice { pub leaf_weight: Option, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct LinuxThrottleDevice { #[serde(default)] pub major: i64, @@ -167,7 +167,7 @@ pub struct LinuxThrottleDevice { pub rate: u64, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct LinuxBlockIO { pub blkio_weight: Option, @@ -184,7 +184,7 @@ pub struct LinuxBlockIO { pub blkio_throttle_write_iops_device: Vec, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct LinuxHugepageLimit { #[serde(default)] @@ -193,7 +193,7 @@ pub struct LinuxHugepageLimit { pub limit: i64, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct LinuxInterfacePriority { #[serde(default)] pub name: String, @@ -201,7 +201,7 @@ pub struct LinuxInterfacePriority { pub priority: u32, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct LinuxNetwork { #[serde(rename = "classID")] @@ -210,7 +210,7 @@ pub struct LinuxNetwork { pub priorities: Vec, } -#[derive(Default, Serialize, Deserialize, Debug)] +#[derive(Default, Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct LinuxResources { #[serde(default)] @@ -240,7 +240,7 @@ pub enum LinuxNamespaceType { Network = 0x40000000, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct LinuxNamespace { #[serde(rename = "type")] pub typ: LinuxNamespaceType, @@ -248,7 +248,7 @@ pub struct LinuxNamespace { pub path: String, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct LinuxDevice { #[serde(default)] @@ -311,7 +311,7 @@ pub enum LinuxSeccompOperator { ScmpCmpMaskedEq = 7, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Linux { #[serde(default)] @@ -337,7 +337,7 @@ pub struct Linux { pub mount_label: String, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Spec { #[serde(default, rename = "ociVersion")] pub version: String,