From 8430e6ba20ea3a2d71d9b22f7ceff6206e5f137f Mon Sep 17 00:00:00 2001 From: Jorge Prendes Date: Wed, 18 Dec 2024 14:35:40 +0000 Subject: [PATCH] Use a zygote process Signed-off-by: Jorge Prendes --- Cargo.lock | 46 +++++++++++++++++++ crates/containerd-shim-wasm/Cargo.toml | 2 + .../containerd-shim-wasm/src/sandbox/cli.rs | 3 ++ .../src/sandbox/instance.rs | 4 +- .../containerd-shim-wasm/src/sandbox/oci.rs | 4 +- .../src/sys/unix/container/instance.rs | 41 ++++++++++++----- .../containerd-shim-wasm/src/test/signals.rs | 34 ++++---------- 7 files changed, 95 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eeefe3d8d..31b63ef75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -886,6 +886,7 @@ dependencies = [ "protobuf 3.2.0", "rand", "serde", + "serde_bytes", "serde_json", "sha256", "temp-env", @@ -899,6 +900,7 @@ dependencies = [ "wasmparser 0.220.0", "wat", "windows-sys 0.59.0", + "zygote", ] [[package]] @@ -4409,6 +4411,28 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "rmp" +version = "0.8.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "228ed7c16fa39782c3b3468e974aec2795e9089153cd08ee2e9aefb3613334c4" +dependencies = [ + "byteorder", + "num-traits", + "paste", +] + +[[package]] +name = "rmp-serde" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52e599a477cf9840e92f2cde9a7189e67b42c57532749bf90aea6ec10facd4db" +dependencies = [ + "byteorder", + "rmp", + "serde", +] + [[package]] name = "rust-criu" version = "0.4.0" @@ -4732,6 +4756,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "serde_bytes" +version = "0.11.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "387cc504cb06bb40a96c8e04e951fe01854cf6bc921053c954e4a606d9675c6a" +dependencies = [ + "serde", +] + [[package]] name = "serde_derive" version = "1.0.216" @@ -7562,3 +7595,16 @@ dependencies = [ "cc", "pkg-config", ] + +[[package]] +name = "zygote" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b96e95c06f0156974c3cbdac79e5454938c1d301fca4224f5e17f8020cf011a0" +dependencies = [ + "libc", + "nix 0.29.0", + "rmp-serde", + "serde", + "thiserror 2.0.7", +] diff --git a/crates/containerd-shim-wasm/Cargo.toml b/crates/containerd-shim-wasm/Cargo.toml index 6f453cf27..431b16f20 100644 --- a/crates/containerd-shim-wasm/Cargo.toml +++ b/crates/containerd-shim-wasm/Cargo.toml @@ -34,6 +34,7 @@ futures = { version = "0.3.30" } wasmparser = { version = "0.220.0" } tokio-stream = { version = "0.1" } sha256 = { workspace = true } +serde_bytes = "0.11" # tracing # note: it's important to keep the version of tracing in sync with tracing-subscriber @@ -59,6 +60,7 @@ tracing-opentelemetry = { version = "0.27", default-features = false, optional = [target.'cfg(unix)'.dependencies] +zygote = { version = "0.1.1" } caps = "0.5" # this must match the version pulled by libcontainer dbus = { version = "0", features = ["vendored"] } diff --git a/crates/containerd-shim-wasm/src/sandbox/cli.rs b/crates/containerd-shim-wasm/src/sandbox/cli.rs index b12a91cd9..63a41a989 100644 --- a/crates/containerd-shim-wasm/src/sandbox/cli.rs +++ b/crates/containerd-shim-wasm/src/sandbox/cli.rs @@ -47,6 +47,9 @@ pub fn shim_main<'a, I>( I: 'static + Instance + Sync + Send, I::Engine: Default, { + #[cfg(unix)] + zygote::Zygote::init(); + #[cfg(feature = "opentelemetry")] if otel_traces_enabled() { // opentelemetry uses tokio, so we need to initialize a runtime diff --git a/crates/containerd-shim-wasm/src/sandbox/instance.rs b/crates/containerd-shim-wasm/src/sandbox/instance.rs index efb596041..3fc8fca1a 100644 --- a/crates/containerd-shim-wasm/src/sandbox/instance.rs +++ b/crates/containerd-shim-wasm/src/sandbox/instance.rs @@ -4,12 +4,14 @@ use std::path::{Path, PathBuf}; use std::time::Duration; use chrono::{DateTime, Utc}; +use serde::ser::SerializeTuple; +use serde::{Deserialize, Serialize}; use super::error::Error; /// Generic options builder for creating a wasm instance. /// This is passed to the `Instance::new` method. -#[derive(Clone)] +#[derive(Clone, Serialize, Deserialize)] pub struct InstanceConfig { /// Optional stdin named pipe path. stdin: PathBuf, diff --git a/crates/containerd-shim-wasm/src/sandbox/oci.rs b/crates/containerd-shim-wasm/src/sandbox/oci.rs index 0d3478007..8817b16cc 100644 --- a/crates/containerd-shim-wasm/src/sandbox/oci.rs +++ b/crates/containerd-shim-wasm/src/sandbox/oci.rs @@ -8,12 +8,14 @@ use std::process; use anyhow::Context; use oci_spec::image::Descriptor; +use serde::{Deserialize, Serialize}; use super::error::Result; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct WasmLayer { pub config: Descriptor, + #[serde(with = "serde_bytes")] pub layer: Vec, } diff --git a/crates/containerd-shim-wasm/src/sys/unix/container/instance.rs b/crates/containerd-shim-wasm/src/sys/unix/container/instance.rs index 8ef15d43d..f131b3367 100644 --- a/crates/containerd-shim-wasm/src/sys/unix/container/instance.rs +++ b/crates/containerd-shim-wasm/src/sys/unix/container/instance.rs @@ -14,6 +14,7 @@ use nix::errno::Errno; use nix::sys::wait::{waitid, Id as WaitID, WaitPidFlag, WaitStatus}; use nix::unistd::Pid; use oci_spec::image::Platform; +use zygote::{WireError, Zygote}; use crate::container::Engine; use crate::sandbox::async_utils::AmbientRuntime as _; @@ -24,7 +25,7 @@ use crate::sandbox::{ }; use crate::sys::container::executor::Executor; -static DEFAULT_CONTAINER_ROOT_DIR: &str = "/run/containerd"; +const DEFAULT_CONTAINER_ROOT_DIR: &str = "/run/containerd"; pub struct Instance { exit_code: WaitableCell<(u32, DateTime)>, @@ -39,14 +40,10 @@ impl SandboxInstance for Instance { #[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))] fn new(id: String, cfg: &InstanceConfig) -> Result { let engine = Self::Engine::default(); - let bundle = cfg.get_bundle().to_path_buf(); let namespace = cfg.get_namespace(); - let rootdir = Path::new(DEFAULT_CONTAINER_ROOT_DIR).join(E::name()); - let rootdir = determine_rootdir(&bundle, &namespace, rootdir)?; - let stdio = Stdio::init_from_cfg(cfg)?; // check if container is OCI image with wasm layers and attempt to read the module - let (modules, platform) = containerd::Client::connect(cfg.get_containerd_address().as_str(), &namespace).block_on()? + let (modules, platform) = containerd::Client::connect(cfg.get_containerd_address(), &namespace).block_on()? .load_modules(&id, &engine) .block_on() .unwrap_or_else(|e| { @@ -54,12 +51,32 @@ impl SandboxInstance for Instance { (vec![], Platform::default()) }); - let container = ContainerBuilder::new(id.clone(), SyscallType::Linux) - .with_executor(Executor::new(engine, stdio, modules, platform)) - .with_root_path(rootdir.clone())? - .as_init(&bundle) - .with_systemd(false) - .build()?; + let (root, state) = Zygote::global() + .run( + |(id, cfg, modules, platform)| -> Result<_, WireError> { + let engine = Self::Engine::default(); + let namespace = cfg.get_namespace(); + + let bundle = cfg.get_bundle().to_path_buf(); + let rootdir = Path::new(DEFAULT_CONTAINER_ROOT_DIR).join(E::name()); + let rootdir = determine_rootdir(&bundle, &namespace, rootdir)?; + let stdio = Stdio::init_from_cfg(&cfg)?; + + let Container { root, state } = ContainerBuilder::new(id, SyscallType::Linux) + .with_executor(Executor::new(engine, stdio, modules, platform)) + .with_root_path(rootdir.clone())? + .as_init(&bundle) + .as_sibling(true) + .with_systemd(false) + .build()?; + + // Container is not serializable, but its parts are + Ok((root, state)) + }, + (id.clone(), cfg.clone(), modules, platform), + ) + .map_err(|e| SandboxError::Others(e.to_string()))?; + let container = Container { root, state }; Ok(Self { id, diff --git a/crates/containerd-shim-wasm/src/test/signals.rs b/crates/containerd-shim-wasm/src/test/signals.rs index 6a223a5fa..48197a62e 100644 --- a/crates/containerd-shim-wasm/src/test/signals.rs +++ b/crates/containerd-shim-wasm/src/test/signals.rs @@ -19,14 +19,14 @@ //! remove the ignore attribute from the test. use std::future::pending; +use std::io::{stderr, Write as _}; use std::sync::mpsc::channel; -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use std::thread::sleep; use std::time::Duration; use anyhow::{bail, Result}; use containerd_shim_wasm_test_modules::HELLO_WORLD; -use tokio::sync::Notify; use crate::container::{Engine, Instance, RuntimeContext}; use crate::sandbox::Stdio; @@ -35,21 +35,6 @@ use crate::testing::WasiTest; #[derive(Clone, Default)] pub struct SomeEngine; -async fn ctrl_c(use_libc: bool) { - static CANCELLATION: LazyLock = LazyLock::new(|| Notify::new()); - - fn on_ctr_c(_: libc::c_int) { - CANCELLATION.notify_waiters(); - } - - if use_libc { - unsafe { libc::signal(libc::SIGINT, on_ctr_c as _) }; - CANCELLATION.notified().await; - } else { - let _ = tokio::signal::ctrl_c().await; - } -} - impl Engine for SomeEngine { fn name() -> &'static str { "some-engine" @@ -63,16 +48,16 @@ impl Engine for SomeEngine { .build()? .block_on(async move { use tokio::time::sleep; - let use_libc = std::env::var("USE_LIBC").unwrap_or_default(); - let use_libc = !use_libc.is_empty() && use_libc != "0"; let signal = async { println!("{name}> waiting for signal!"); - ctrl_c(use_libc).await; + let _ = tokio::signal::ctrl_c().await; println!("{name}> received signal, bye!"); }; let task = async { sleep(Duration::from_millis(10)).await; - eprintln!("{name}> ready"); + // use writeln to avoid output capturing from the + // testing framework + let _ = writeln!(stderr(), "{name}> ready"); pending().await }; tokio::select! { @@ -94,8 +79,9 @@ impl Drop for KillGuard { } #[test] -#[ignore = "this currently fails due to tokio's global state"] fn test_handling_signals() -> Result<()> { + zygote::Zygote::global(); + // use a thread scope to ensure we join all threads at the end std::thread::scope(|s| -> Result<()> { let mut containers = vec![]; @@ -110,7 +96,7 @@ fn test_handling_signals() -> Result<()> { containers.push(Arc::new(container)); } - let guard: Vec<_> = containers.iter().cloned().map(KillGuard).collect(); + let _guard: Vec<_> = containers.iter().cloned().map(KillGuard).collect(); for container in containers.iter() { container.start()?; @@ -150,8 +136,6 @@ fn test_handling_signals() -> Result<()> { assert_eq!(id, i); } - drop(guard); - Ok(()) }) }