diff --git a/src/bin/cachepot-dist/main.rs b/src/bin/cachepot-dist/main.rs index d045f982..647f2409 100644 --- a/src/bin/cachepot-dist/main.rs +++ b/src/bin/cachepot-dist/main.rs @@ -16,11 +16,11 @@ use cachepot::dist::{ SchedulerIncoming, SchedulerOutgoing, SchedulerStatusResult, SubmitToolchainResult, TcCache, Toolchain, ToolchainReader, UpdateJobStateResult, WorkerIncoming, WorkerNonce, WorkerOutgoing, }; +use cachepot::init_logging; use cachepot::util::daemonize; use jsonwebtoken as jwt; use rand::{rngs::OsRng, RngCore}; use std::collections::{btree_map, BTreeMap, HashMap, HashSet}; -use std::env; use std::io; use std::path::Path; use std::path::PathBuf; @@ -388,15 +388,6 @@ async fn run(command: Command) -> Result { } } -fn init_logging() { - if env::var("RUST_LOG").is_ok() { - match env_logger::try_init() { - Ok(_) => (), - Err(e) => panic!("Failed to initalize logging: {:?}", e), - } - } -} - const MAX_PER_CORE_LOAD: f64 = 10f64; const WORKER_REMEMBER_ERROR_TIMEOUT: Duration = Duration::from_secs(300); const UNCLAIMED_PENDING_TIMEOUT: Duration = Duration::from_secs(300); diff --git a/src/commands.rs b/src/commands.rs index d90b5ea0..753a852c 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -76,7 +76,9 @@ fn run_coordinator_process() -> Result { trace!("run_coordinator_process"); let tempdir = tempfile::Builder::new().prefix("cachepot").tempdir()?; let socket_path = tempdir.path().join("sock"); - let runtime = Runtime::new()?; + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; let exe_path = env::current_exe()?; let workdir = exe_path.parent().expect("executable path has no parent?!"); let _child = process::Command::new(&exe_path) diff --git a/src/lib.rs b/src/lib.rs index fd691079..567f98c0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -62,8 +62,6 @@ pub mod util; use std::env; -const LOGGING_ENV: &str = "CACHEPOT_LOG"; - pub fn main() { init_logging(); std::process::exit(match cmdline::parse() { @@ -88,9 +86,96 @@ pub fn main() { }); } -fn init_logging() { +pub fn init_logging() { + const LOGGING_ENV: &str = "CACHEPOT_LOG"; + + use env_logger::fmt::{Color, Style}; + use log::Level; + use std::io::Write; + + /// The available service type that cachepot can run as. + #[derive(Copy, Clone)] + enum Kind { + /// A service that connects a coordinator and a remote worker to execute + /// a remote build. 
+ DistScheduler, + /// A service that's used to directly perform remote sandbox compilation + DistWorker, + /// A background service used by the cachepot compilation wrapper (client) + /// to either re-use local compilation cache or schedule a remote + /// compilation via a remote scheduler + Coordinator, + /// A wrapper that masquerades as a compiler but spawns (or talks to) a + /// coordinator to perform the actual compilation locally or offload it + /// to a distributed cluster (in both cases we can re-use cached artifacts) + Client, + } + + impl std::fmt::Display for Kind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + Kind::DistScheduler => "dist scheduler", + Kind::DistWorker => "dist worker", + Kind::Coordinator => "coordinator", + Kind::Client => "client", + } + ) + } + } + + // TODO: That's a rough heuristic - share detection logic from cmdline.rs + let kind = if env::var("CACHEPOT_START_COORDINATOR").is_ok() + || env::args_os().any(|a| a == "--start-coordinator") + { + Kind::Coordinator + } else { + match std::env::args().nth(1).as_deref() { + Some("scheduler") => Kind::DistScheduler, + Some("worker") => Kind::DistWorker, + _ => Kind::Client, + } + }; + + let color_for_kind = |kind| match kind { + Kind::DistScheduler => Color::Yellow, + Kind::DistWorker => Color::Cyan, + Kind::Coordinator => Color::Blue, + Kind::Client => Color::Green, + }; + + let default_level_style = |mut level_style: Style, level: Level| { + match level { + Level::Trace => level_style.set_color(Color::Cyan), + Level::Debug => level_style.set_color(Color::Blue), + Level::Info => level_style.set_color(Color::Green), + Level::Warn => level_style.set_color(Color::Yellow), + Level::Error => level_style.set_color(Color::Red).set_bold(true), + }; + level_style + }; + if env::var(LOGGING_ENV).is_ok() { - match env_logger::Builder::from_env(LOGGING_ENV).try_init() { + let mut builder = env_logger::Builder::from_env(LOGGING_ENV); + // That's mostly what env_logger does by default but we also attach the + // PID and kind of the cachepot executable due to its multi-process nature + builder.format(move |f, record| { + write!( + f, + "{}", + chrono::Local::now().format("%Y-%m-%dT%H:%M:%S%.3f"), + )?; + let style = default_level_style(f.style(), record.level()); + write!(f, " {:<5}", style.value(record.level()))?; + write!(f, " [PID {}]", std::process::id())?; + let mut style = f.style(); + style.set_color(color_for_kind(kind)); + write!(f, " {:>14}", style.value(kind))?; + writeln!(f, " {}", record.args()) + }); + match builder.try_init() { Ok(_) => (), Err(e) => panic!("Failed to initalize logging: {:?}", e), } diff --git a/systemd/config/scheduler.conf b/systemd/config/scheduler.conf index b78f78e6..1e362f76 100644 --- a/systemd/config/scheduler.conf +++ b/systemd/config/scheduler.conf @@ -1,6 +1,6 @@ public_addr = "http://127.0.0.1:10600" -[server_auth] +[worker_auth] type = "token" token = "server_xxxxxxxxxxxxxx" diff --git a/tests/harness/mod.rs b/tests/harness/mod.rs index c65934eb..a2305e16 100644 --- a/tests/harness/mod.rs +++ b/tests/harness/mod.rs @@ -3,6 +3,9 @@ use cachepot::config::{HTTPUrl, WorkerUrl}; use cachepot::coordinator::CoordinatorInfo; use cachepot::dist::{self, SchedulerStatusResult}; use cachepot::util::fs; +#[cfg(feature = "dist-worker")] +use nix::unistd::Pid; +use std::collections::HashMap; use std::env; use std::io::Write; use std::net::{IpAddr, SocketAddr}; @@ -35,6 +38,24 @@ const SERVER_PORT: u16 = 12345; // arbitrary const 
TC_CACHE_SIZE: u64 = 1024 * 1024 * 1024; // 1 gig +/// Whether the cachepot services created as a part of the test should be +/// spawned as a child process directly or ran inside of a Docker container. +/// Containerization allows for more flexibility (e.g. the running user can be +/// root) may require some additional setup beforehand. +enum ExecStrategy { + /// Cachepot services will be ran inside of a Docker container. + Docker, + /// Cachepot services will be spawned as child processes. + Spawn, +} + +fn exec_strategy() -> ExecStrategy { + match env::var("DIST_EXEC_STRATEGY").as_deref() { + Ok("spawn") => ExecStrategy::Spawn, + _ => ExecStrategy::Docker, + } +} + pub fn start_local_daemon(cfg_path: &Path, cached_cfg_path: &Path) { // Don't run this with run() because on Windows `wait_with_output` // will hang because the internal server process is not detached. @@ -183,6 +204,7 @@ fn create_server_token(worker_url: WorkerUrl, auth_token: &str) -> String { } #[cfg(feature = "dist-worker")] +#[derive(Debug)] pub enum ServerHandle { Container { cid: String, @@ -192,38 +214,58 @@ pub enum ServerHandle { handle: JoinHandle<()>, url: HTTPUrl, }, + Process { + pid: Pid, + url: HTTPUrl, + }, +} + +#[cfg(feature = "dist-worker")] +impl ServerHandle { + fn url(&self) -> &HTTPUrl { + match self { + ServerHandle::Container { url, .. } + | ServerHandle::Process { url, .. } + | ServerHandle::AsyncTask { url, .. } => url, + } + } } +#[cfg(feature = "dist-worker")] +pub type ServerId = usize; #[cfg(feature = "dist-worker")] pub struct DistSystem { cachepot_dist: PathBuf, tmpdir: PathBuf, - scheduler_name: Option, server_names: Vec, - server_handles: Vec>, + scheduler: Option, + server_handles: HashMap, client: reqwest::Client, + servers_counter: usize, } #[cfg(feature = "dist-worker")] impl DistSystem { pub fn new(cachepot_dist: &Path, tmpdir: &Path) -> Self { // Make sure the docker image is available, building it if necessary - let mut child = Command::new("docker") - .args(&["build", "-q", "-t", DIST_IMAGE, "-"]) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - .unwrap(); - child - .stdin - .as_mut() - .unwrap() - .write_all(DIST_DOCKERFILE.as_bytes()) - .unwrap(); - let output = child.wait_with_output().unwrap(); - check_output(&output); + if let ExecStrategy::Docker = exec_strategy() { + let mut child = Command::new("docker") + .args(&["build", "-q", "-t", DIST_IMAGE, "-"]) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .unwrap(); + child + .stdin + .as_mut() + .unwrap() + .write_all(DIST_DOCKERFILE.as_bytes()) + .unwrap(); + let output = child.wait_with_output().unwrap(); + check_output(&output); + } let tmpdir = tmpdir.join("distsystem"); fs::create_dir(&tmpdir).unwrap(); @@ -234,18 +276,17 @@ impl DistSystem { cachepot_dist: cachepot_dist.to_owned(), tmpdir, - scheduler_name: None, + scheduler: None, server_names: vec![], - server_handles: vec![], + server_handles: HashMap::default(), client, + servers_counter: 0, } } pub async fn add_scheduler(&mut self) { let scheduler_cfg_relpath = "scheduler-cfg.json"; let scheduler_cfg_path = self.tmpdir.join(scheduler_cfg_relpath); - let scheduler_cfg_container_path = - Path::new(CONFIGS_CONTAINER_PATH).join(scheduler_cfg_relpath); let scheduler_cfg = cachepot_scheduler_cfg(); fs::File::create(&scheduler_cfg_path) .unwrap() @@ -253,45 +294,76 @@ impl DistSystem { .unwrap(); // Create the scheduler - let scheduler_name = make_container_name("scheduler"); - let output = 
Command::new("docker") - .args(&[ - "run", - "--name", - &scheduler_name, - "-e", - "CACHEPOT_NO_DAEMON=1", - "-e", - "RUST_LOG=cachepot=trace", - "-e", - "RUST_BACKTRACE=1", - "-v", - &format!("{}:/cachepot-dist", self.cachepot_dist.to_str().unwrap()), - "-v", - &format!( - "{}:{}", - self.tmpdir.to_str().unwrap(), - CONFIGS_CONTAINER_PATH - ), - "-d", - DIST_IMAGE, - "bash", - "-c", - &format!( - r#" - set -o errexit && - exec /cachepot-dist scheduler --config {cfg} - "#, - cfg = scheduler_cfg_container_path.to_str().unwrap() - ), - ]) - .output() - .unwrap(); - self.scheduler_name = Some(scheduler_name); + let scheduler = if let ExecStrategy::Docker = exec_strategy() { + let scheduler_cfg_container_path = + Path::new(CONFIGS_CONTAINER_PATH).join(scheduler_cfg_relpath); + let scheduler_name = make_container_name("scheduler"); + let output = Command::new("docker") + .args(&[ + "run", + "--name", + &scheduler_name, + "-e", + "CACHEPOT_NO_DAEMON=1", + "-e", + "RUST_LOG=cachepot=trace", + "-e", + "CACHEPOT_LOG=trace", + "-e", + "RUST_BACKTRACE=1", + "-v", + &format!("{}:/cachepot-dist", self.cachepot_dist.to_str().unwrap()), + "-v", + &format!( + "{}:{}", + self.tmpdir.to_str().unwrap(), + CONFIGS_CONTAINER_PATH + ), + "-d", + DIST_IMAGE, + "bash", + "-c", + &format!( + r#" + set -o errexit && + exec /cachepot-dist scheduler --config {cfg} + "#, + cfg = scheduler_cfg_container_path.to_str().unwrap() + ), + ]) + .output() + .unwrap(); - check_output(&output); + check_output(&output); + let ip = self.container_ip(&scheduler_name); + let url = format!("http://{}:{}", ip, SCHEDULER_PORT); + let scheduler_url = reqwest::Url::parse(&url).unwrap(); + ServerHandle::Container { + cid: scheduler_name, + url: HTTPUrl::from_url(scheduler_url.clone()), + } + } else { + let mut cmd = Command::new(cachepot_dist_path()); + cmd.env("CACHEPOT_NO_DAEMON", "1") + .env("RUST_BACKTRACE", "1") + .arg("scheduler") + .arg("--config") + .arg(scheduler_cfg_path); + let mut child = cmd.spawn().unwrap(); + eprintln!("\nSpawned child scheduler with PID: {}", child.id()); + match child.try_wait() { + Ok(None) => {} + _ => panic!("Couldn't spawn scheduler"), + } - let scheduler_url = self.scheduler_url(); + ServerHandle::Process { + pid: Pid::from_raw(child.id().try_into().unwrap()), + url: HTTPUrl::from_str(&format!("http://127.0.0.1:{}", SCHEDULER_PORT)).unwrap(), + } + }; + + let scheduler_url = scheduler.url().clone(); + self.scheduler = Some(scheduler); wait_for_http( &self.client, @@ -317,6 +389,7 @@ impl DistSystem { ) { break Ok(()); } else { + tokio::time::sleep(Duration::from_millis(100)).await; } } _ = tokio::time::sleep(Duration::from_millis(100)) => {} @@ -327,67 +400,106 @@ impl DistSystem { wait_for(status_fut, MAX_STARTUP_WAIT).await; } - pub async fn add_server(&mut self) -> ServerHandle { + pub async fn add_server(&mut self) -> ServerId { let server_cfg_relpath = format!("server-cfg-{}.json", self.server_names.len()); let server_cfg_path = self.tmpdir.join(&server_cfg_relpath); - let server_cfg_container_path = Path::new(CONFIGS_CONTAINER_PATH).join(server_cfg_relpath); - let server_name = make_container_name("server"); - let output = Command::new("docker") - .args(&[ - "run", - // Important for the bubblewrap builder - "--privileged", - "--name", - &server_name, - "-e", - "RUST_LOG=cachepot=trace", - "-e", - "RUST_BACKTRACE=1", - "-v", - &format!("{}:/cachepot-dist", self.cachepot_dist.to_str().unwrap()), - "-v", - &format!( - "{}:{}", - self.tmpdir.to_str().unwrap(), - CONFIGS_CONTAINER_PATH - ), - 
"-d", - DIST_IMAGE, - "bash", - "-c", - &format!( - r#" - set -o errexit && - while [ ! -f {cfg}.ready ]; do sleep 0.1; done && - exec /cachepot-dist worker --config {cfg} - "#, - cfg = server_cfg_container_path.to_str().unwrap() - ), - ]) - .output() - .unwrap(); - self.server_names.push(server_name.clone()); + let handle = if let ExecStrategy::Docker = exec_strategy() { + let server_cfg_container_path = + Path::new(CONFIGS_CONTAINER_PATH).join(server_cfg_relpath); + + let server_name = make_container_name("server"); + let output = Command::new("docker") + .args(&[ + "run", + // Important for the bubblewrap builder + "--privileged", + "--name", + &server_name, + "-e", + "RUST_LOG=cachepot=trace", + "-e", + "CACHEPOT_LOG=trace", + "-e", + "RUST_BACKTRACE=1", + "-v", + &format!("{}:/cachepot-dist", self.cachepot_dist.to_str().unwrap()), + "-v", + &format!( + "{}:{}", + self.tmpdir.to_str().unwrap(), + CONFIGS_CONTAINER_PATH + ), + "-d", + DIST_IMAGE, + "bash", + "-c", + &format!( + r#" + set -o errexit && + while [ ! -f {cfg}.ready ]; do sleep 0.1; done && + exec /cachepot-dist worker --config {cfg} + "#, + cfg = server_cfg_container_path.to_str().unwrap() + ), + ]) + .output() + .unwrap(); + check_output(&output); - check_output(&output); + let server_ip = self.container_ip(&server_name); + let server_cfg = cachepot_server_cfg(&self.tmpdir, self.scheduler_url(), server_ip); + fs::File::create(&server_cfg_path) + .unwrap() + .write_all(&serde_json::to_vec(&server_cfg).unwrap()) + .unwrap(); + fs::File::create(format!("{}.ready", server_cfg_path.to_str().unwrap())).unwrap(); + let url = HTTPUrl::from_url( + reqwest::Url::parse(&format!("https://{}:{}", server_ip, SERVER_PORT)).unwrap(), + ); + ServerHandle::Container { + cid: server_name, + url, + } + } else { + let server_ip = std::net::Ipv4Addr::LOCALHOST; + let server_cfg = cachepot::config::worker::Config { + builder: cachepot::config::worker::BuilderType::Overlay { + build_dir: self.tmpdir.join("server-builddir"), + bwrap_path: PathBuf::from("/usr/bin/bwrap"), + }, + cache_dir: self.tmpdir.join("server-cache"), + ..cachepot_server_cfg(&self.tmpdir, self.scheduler_url(), server_ip.into()) + }; + fs::File::create(&server_cfg_path) + .unwrap() + .write_all(&serde_json::to_vec(&server_cfg).unwrap()) + .unwrap(); - let server_ip = self.container_ip(&server_name); - let server_cfg = cachepot_server_cfg(&self.tmpdir, self.scheduler_url(), server_ip); - fs::File::create(&server_cfg_path) - .unwrap() - .write_all(&serde_json::to_vec(&server_cfg).unwrap()) - .unwrap(); - fs::File::create(format!("{}.ready", server_cfg_path.to_str().unwrap())).unwrap(); + let mut cmd = Command::new(cachepot_dist_path()); + cmd.env("CACHEPOT_NO_DAEMON", "1") + .env("RUST_BACKTRACE", "1") + .arg("worker") + .arg("--config") + .arg(server_cfg_path); + let mut child = cmd.spawn().unwrap(); + eprintln!("\nSpawned child build server with PID: {}", child.id()); + match child.try_wait() { + Ok(None) => {} + _ => panic!("Couldn't spawn scheduler"), + } - let url = HTTPUrl::from_url( - reqwest::Url::parse(&format!("https://{}:{}", server_ip, SERVER_PORT)).unwrap(), - ); - let handle = ServerHandle::Container { - cid: server_name, - url, + ServerHandle::Process { + pid: Pid::from_raw(child.id().try_into().unwrap()), + url: HTTPUrl::from_str(&format!("https://{}:{}", server_ip, SERVER_PORT)).unwrap(), + } }; - self.wait_server_ready(&handle).await; - handle + + self.wait_server_ready(handle.url().clone()).await; + self.server_handles.insert(self.servers_counter, handle); + 
let id = self.servers_counter; + self.servers_counter += 1; + id } pub async fn add_custom_server( @@ -414,12 +526,19 @@ impl DistSystem { let url = HTTPUrl::from_url(reqwest::Url::parse(&format!("https://{}", server_addr)).unwrap()); + self.wait_server_ready(url.clone()).await; let handle = ServerHandle::AsyncTask { handle, url }; - self.wait_server_ready(&handle).await; handle } - pub async fn restart_server(&mut self, handle: &ServerHandle) { + pub async fn restart_server(&mut self, id: &ServerId) { + let handle = match self.server_handles.get(id) { + Some(handle) => handle, + None => { + error!("ServerId {} is unknown", id); + return; + } + }; match handle { ServerHandle::Container { cid, url: _ } => { let output = Command::new("docker") @@ -430,17 +549,19 @@ impl DistSystem { } ServerHandle::AsyncTask { handle: _, url: _ } => { // TODO: pretty easy, just no need yet - panic!("restart not yet implemented for pids") + } + ServerHandle::Process { pid: _, url: _ } => { + // NOTE: Ideally we could restructure servers to listen to SIGHUP + // and reload configuration/restart the servers + // For now, let's just ignore it and keep chugging along + // TODO: restart the process? } } - self.wait_server_ready(handle).await + let url = handle.url().clone(); + self.wait_server_ready(url).await } - pub async fn wait_server_ready(&mut self, handle: &ServerHandle) { - let url = match handle { - ServerHandle::Container { cid: _, url } - | ServerHandle::AsyncTask { handle: _, url } => url.clone(), - }; + pub async fn wait_server_ready(&mut self, url: HTTPUrl) { wait_for_http( &self.client, url, @@ -464,6 +585,7 @@ impl DistSystem { ) { break Ok(()); } else { + tokio::time::sleep(Duration::from_millis(100)).await; } } _ = tokio::time::sleep(Duration::from_millis(100)) => {} @@ -475,9 +597,7 @@ impl DistSystem { } pub fn scheduler_url(&self) -> HTTPUrl { - let ip = self.container_ip(self.scheduler_name.as_ref().unwrap()); - let url = format!("http://{}:{}", ip, SCHEDULER_PORT); - HTTPUrl::from_url(reqwest::Url::parse(&url).unwrap()) + self.scheduler.as_ref().unwrap().url().clone() } async fn scheduler_status(&self) -> SchedulerStatusResult { @@ -506,12 +626,20 @@ impl DistSystem { // The interface that the host sees on the docker network (typically 'docker0') fn host_interface_ip(&self) -> IpAddr { + let scheduler_name = match self.scheduler.as_ref().unwrap() { + ServerHandle::Container { cid, .. } => cid, + ServerHandle::Process { .. } => match exec_strategy() { + ExecStrategy::Spawn => return std::net::Ipv4Addr::LOCALHOST.into(), + ExecStrategy::Docker => unreachable!("We only spawn schedulers via docker"), + }, + ServerHandle::AsyncTask { handle: _, url: _ } => todo!(), + }; let output = Command::new("docker") .args(&[ "inspect", "--format", "{{ .NetworkSettings.Gateway }}", - self.scheduler_name.as_ref().unwrap(), + scheduler_name, ]) .output() .unwrap(); @@ -545,37 +673,33 @@ impl Drop for DistSystem { let mut logs = vec![]; let mut outputs = vec![]; - if let Some(scheduler_name) = self.scheduler_name.as_ref() { + let handles = self.scheduler.iter().chain(self.server_handles.values()); + let container_names = handles.filter_map(|s| match s { + ServerHandle::Container { cid, .. 
} => Some(cid), + _ => None, + }); + for container_name in container_names { droperr!(Command::new("docker") - .args(&["logs", scheduler_name]) + .args(&["logs", container_name]) .output() - .map(|o| logs.push((scheduler_name, o)))); + .map(|o| logs.push((container_name, o)))); droperr!(Command::new("docker") - .args(&["kill", scheduler_name]) + .args(&["kill", container_name]) .output() - .map(|o| outputs.push((scheduler_name, o)))); + .map(|o| outputs.push((container_name, o)))); droperr!(Command::new("docker") - .args(&["rm", "-f", scheduler_name]) + .args(&["rm", "-f", container_name]) .output() - .map(|o| outputs.push((scheduler_name, o)))); - } - for server_name in self.server_names.iter() { - droperr!(Command::new("docker") - .args(&["logs", server_name]) - .output() - .map(|o| logs.push((server_name, o)))); - droperr!(Command::new("docker") - .args(&["kill", server_name]) - .output() - .map(|o| outputs.push((server_name, o)))); - droperr!(Command::new("docker") - .args(&["rm", "-f", server_name]) - .output() - .map(|o| outputs.push((server_name, o)))); + .map(|o| outputs.push((container_name, o)))); } // TODO: they will die with the runtime, but correctly waiting for them // may be only possible when we have async Drop. - for _handle in self.server_handles.iter() {} + for handle in self.scheduler.iter().chain(self.server_handles.values()) { + if let ServerHandle::Process { pid, .. } = handle { + nix::sys::signal::kill(*pid, nix::sys::signal::Signal::SIGTERM).unwrap(); + let _status = nix::sys::wait::waitpid(*pid, None).unwrap(); + } + } for ( container,
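
The `Drop` impl above shuts down `ExecStrategy::Spawn` services by sending SIGTERM and then reaping the child with `waitpid`, so no zombie processes outlive the test harness. A minimal standalone sketch of that pattern follows; it is illustrative only (the `sleep 60` child is a hypothetical stand-in for a spawned `cachepot-dist` scheduler/worker), and it assumes the `nix` crate is available, as it already is for the `dist-worker` tests.

```rust
// Sketch of the SIGTERM-then-reap pattern used in DistSystem::drop for
// ServerHandle::Process entries. Not part of the harness itself.
use nix::sys::{signal, wait};
use nix::unistd::Pid;
use std::process::Command;

fn main() -> std::io::Result<()> {
    // Stand-in for a spawned cachepot-dist scheduler or worker process.
    let child = Command::new("sleep").arg("60").spawn()?;
    let pid = Pid::from_raw(child.id() as i32);

    // Ask the service to shut down, then reap it so no zombie is left behind.
    signal::kill(pid, signal::Signal::SIGTERM).expect("failed to send SIGTERM");
    let status = wait::waitpid(pid, None).expect("failed to wait for child");
    eprintln!("child exited with {:?}", status);
    Ok(())
}
```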