This repository has been archived by the owner on Feb 28, 2023. It is now read-only.

rouille -> warp (#131)
* Initial commit

* A bit more errors

* clippy

* more clippy

* No default dist-server

* more async code

* remove all blocking reqwests

* fmt

* Fix content types and test harness

* maybe green ci

* bump timeout a bit and create a client only once

* remove unused code

* make thiserror optional

* bring back void

* do not change CI docker script

* fork is not async friendly

* async bind

* fmt

* more fixes

* async handle_assign_job

* async basic_compile

* minor fixes

* fmt

* Adds a comment on native-tls
montekki authored Jan 10, 2022
1 parent 1e7729c commit 943fc53
Showing 14 changed files with 2,012 additions and 1,668 deletions.
1,599 changes: 669 additions & 930 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions Cargo.toml
@@ -67,7 +67,7 @@ percent-encoding = { version = "2", optional = true }
rand = "0.8"
redis = { version = "0.21", optional = true, default-features = false, features = ["aio", "tokio-comp"] }
regex = "1"
reqwest = { version = "0.11", features = ["json", "blocking"], optional = true }
reqwest = { version = "0.11.7", features = ["json", "native-tls"], optional = true }
retry = "1"
ring = { version = "0.16", optional = true, features = ["std"] }
rusoto_core = { version = "0.47", optional = true }
@@ -95,15 +95,17 @@ zip = { version = "0.5", default-features = false }
zstd = "0.6"
structopt = "0.3.25"
strum = { version = "0.23.0", features = ["derive"] }
native-tls = "0.2.8"

# dist-server only
crossbeam-utils = { version = "0.8", optional = true }
libmount = { version = "0.1.10", optional = true }
nix = { version = "0.19", optional = true }
rouille = { version = "3", optional = true, default-features = false, features = ["ssl"] }
syslog = { version = "5", optional = true }
void = { version = "1", optional = true }
version-compare = { version = "0.0.11", optional = true }
warp = { version = "0.3.2", optional = true, features = ["tls"] }
thiserror = { version = "1.0.30", optional = true }

# test only
openssl = { version = "0.10", optional = true }
@@ -145,7 +147,7 @@ unstable = []
# Enables distributed support in the cachepot client
dist-client = ["ar", "flate2", "hyper", "hyperx", "reqwest/stream", "url", "sha2", "tokio/fs"]
# Enables the cachepot-dist binary
dist-server = ["chrono", "crossbeam-utils", "jsonwebtoken", "flate2", "hyperx", "libmount", "nix", "reqwest", "rouille", "sha2", "syslog", "void", "version-compare"]
dist-server = ["chrono", "crossbeam-utils", "jsonwebtoken", "flate2", "hyperx", "libmount", "nix", "reqwest", "sha2", "syslog", "void", "version-compare", "warp", "thiserror"]
# Enables dist tests with external requirements
dist-tests = ["dist-client", "dist-server"]
# Run JWK token crypto against openssl ref impl
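
This Cargo.toml hunk swaps the synchronous rouille server for warp 0.3 with its `tls` feature, moves reqwest off the `blocking` feature onto the async client over `native-tls`, and adds `warp`/`thiserror` to the `dist-server` feature set. As a rough sketch only (the route, port, and certificate paths are invented, not cachepot's), a warp server with TLS is started like this:

```rust
use warp::Filter;

// A minimal sketch of a warp 0.3 server with the "tls" feature enabled,
// standing in for the removed rouille server. Route, bind address, and
// certificate paths are placeholders.
#[tokio::main]
async fn main() {
    // GET /api/v1/status -> "ok"
    let status = warp::path!("api" / "v1" / "status").map(|| "ok");

    warp::serve(status)
        .tls()
        .cert_path("cert.pem") // hypothetical certificate path
        .key_path("key.pem")   // hypothetical private key path
        .run(([0, 0, 0, 0], 10600))
        .await;
}
```
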
12 changes: 6 additions & 6 deletions src/bin/cachepot-dist/build.rs
@@ -484,7 +484,7 @@ fn docker_diff(cid: &str) -> Result<String> {
// Force remove the container
fn docker_rm(cid: &str) -> Result<()> {
Command::new("docker")
.args(&["rm", "-f", &cid])
.args(&["rm", "-f", cid])
.check_run()
.context("Failed to force delete container")
}
@@ -611,11 +611,11 @@ impl DockerBuilder {
fn clean_container(&self, cid: &str) -> Result<()> {
// Clean up any running processes
Command::new("docker")
.args(&["exec", &cid, "/busybox", "kill", "-9", "-1"])
.args(&["exec", cid, "/busybox", "kill", "-9", "-1"])
.check_run()
.context("Failed to run kill on all processes in container")?;

let diff = docker_diff(&cid)?;
let diff = docker_diff(cid)?;
if !diff.is_empty() {
let mut lastpath = None;
for line in diff.split(|c| c == '\n') {
@@ -650,15 +650,15 @@ impl DockerBuilder {
}
lastpath = Some(changepath);
if let Err(e) = Command::new("docker")
.args(&["exec", &cid, "/busybox", "rm", "-rf", changepath])
.args(&["exec", cid, "/busybox", "rm", "-rf", changepath])
.check_run()
{
// We do a final check anyway, so just continue
warn!("Failed to remove added path in a container: {}", e)
}
}

let newdiff = docker_diff(&cid)?;
let newdiff = docker_diff(cid)?;
// See note about changepath == "/tmp" above
if !newdiff.is_empty() && newdiff != "C /tmp" {
bail!(
@@ -821,7 +821,7 @@ impl DockerBuilder {
cmd.arg("-e").arg(env);
}
let shell_cmd = "cd \"$1\" && shift && exec \"$@\"";
cmd.args(&[cid, "/busybox", "sh", "-c", &shell_cmd]);
cmd.args(&[cid, "/busybox", "sh", "-c", shell_cmd]);
cmd.arg(&executable);
cmd.arg(cwd);
cmd.arg(executable);
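
The build.rs hunks are clippy cleanups: `cid` and `shell_cmd` are already references, so the extra `&` in calls like `.args(&["rm", "-f", &cid])` trips `clippy::needless_borrow`. A tiny sketch of the fixed call shape (illustrative only, not the real helper with its `check_run`/`context` wrapper):

```rust
use std::process::Command;

// Illustrative only: `cid` is already a `&str`, so writing `&cid` would add a
// needless extra borrow (`&&str`) that clippy::needless_borrow flags.
fn docker_rm(cid: &str) -> std::io::Result<std::process::ExitStatus> {
    Command::new("docker")
        .args(["rm", "-f", cid]) // pass `cid` directly, no extra `&`
        .status()
}

fn main() {
    // Hypothetical container id; this only demonstrates the call shape.
    let _ = docker_rm("0123456789ab");
}
```
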
62 changes: 35 additions & 27 deletions src/bin/cachepot-dist/main.rs
@@ -6,6 +6,7 @@ extern crate log;
extern crate serde_derive;

use anyhow::{bail, Context, Error, Result};
use async_trait::async_trait;
use cachepot::config::{
scheduler as scheduler_config, server as server_config, INSECURE_DIST_CLIENT_TOKEN,
};
@@ -27,7 +28,7 @@ use std::path::Path;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Mutex, MutexGuard};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, Instant};
use structopt::StructOpt;
use syslog::Facility;
@@ -93,6 +94,7 @@ struct GenerateJwtHS256ServerToken {
}

#[derive(StructOpt)]
#[allow(clippy::enum_variant_names)]
enum AuthSubcommand {
GenerateSharedToken(GenerateSharedToken),
GenerateJwtHS256Key,
@@ -101,11 +103,12 @@ enum AuthSubcommand {

// Only supported on x86_64 Linux machines
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
fn main() {
#[tokio::main]
async fn main() {
init_logging();
std::process::exit({
let cmd = Command::from_args();
match run(cmd) {
match run(cmd).await {
Ok(s) => s,
Err(e) => {
eprintln!("cachepot-dist: error: {}", e);
@@ -152,10 +155,10 @@ fn create_jwt_server_token(
key: &[u8],
) -> Result<String> {
let key = jwt::EncodingKey::from_secret(key);
jwt::encode(&header, &ServerJwt { server_id }, &key).map_err(Into::into)
jwt::encode(header, &ServerJwt { server_id }, &key).map_err(Into::into)
}
fn dangerous_insecure_extract_jwt_server_token(server_token: &str) -> Option<ServerId> {
jwt::dangerous_insecure_decode::<ServerJwt>(&server_token)
jwt::dangerous_insecure_decode::<ServerJwt>(server_token)
.map(|res| res.claims.server_id)
.ok()
}
@@ -170,7 +173,7 @@ fn check_jwt_server_token(
.ok()
}

fn run(command: Command) -> Result<i32> {
async fn run(command: Command) -> Result<i32> {
match command {
Command::Auth(AuthSubcommand::GenerateJwtHS256Key) => {
let num_bytes = 256 / 8;
@@ -203,9 +206,7 @@ fn run(command: Command) -> Result<i32> {
bail!("Could not read config");
}
} else {
secret_key
.expect("missing secret-key in parsed subcommand")
.to_owned()
secret_key.expect("missing secret-key in parsed subcommand")
};

let secret_key = base64::decode_config(&secret_key, base64::URL_SAFE_NO_PAD)?;
@@ -251,6 +252,7 @@ fn run(command: Command) -> Result<i32> {
jwks_url,
} => Box::new(
token_check::ValidJWTCheck::new(audience, issuer, &jwks_url)
.await
.context("Failed to create a checker for valid JWTs")?,
),
scheduler_config::ClientAuth::Mozilla { required_groups } => {
@@ -265,10 +267,10 @@ fn run(command: Command) -> Result<i32> {
scheduler_config::ServerAuth::Insecure => {
warn!("Scheduler starting with DANGEROUSLY_INSECURE server authentication");
let token = INSECURE_DIST_SERVER_TOKEN;
Box::new(move |server_token| check_server_token(server_token, &token))
Arc::new(move |server_token| check_server_token(server_token, token))
}
scheduler_config::ServerAuth::Token { token } => {
Box::new(move |server_token| check_server_token(server_token, &token))
Arc::new(move |server_token| check_server_token(server_token, &token))
}
scheduler_config::ServerAuth::JwtHS256 { secret_key } => {
let secret_key = base64::decode_config(&secret_key, base64::URL_SAFE_NO_PAD)
@@ -285,7 +287,7 @@ fn run(command: Command) -> Result<i32> {
sub: None,
algorithms: vec![jwt::Algorithm::HS256],
};
Box::new(move |server_token| {
Arc::new(move |server_token| {
check_jwt_server_token(server_token, &secret_key, &validation)
})
}
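
The hunk above swaps `Box::new` for `Arc::new` when building the server-auth checkers. A plausible reason, stated as an assumption rather than taken from the diff: warp filters are cloned for every request and across tasks, so shared state has to be cheaply cloneable and `Send + Sync`, which an `Arc<dyn Fn…>` provides and a `Box<dyn Fn…>` does not. A minimal sketch with a stand-in `ServerId` type:

```rust
use std::sync::Arc;

// Stand-in for the crate's ServerId, only so the sketch compiles.
#[derive(Clone, Copy, Debug, PartialEq)]
struct ServerId(u32);

// An Arc'd checker can be cloned into every warp filter/handler clone.
type ServerAuthCheck = Arc<dyn Fn(&str) -> Option<ServerId> + Send + Sync>;

fn main() {
    let token = String::from("secret");
    let check: ServerAuthCheck =
        Arc::new(move |server_token: &str| (server_token == token).then(|| ServerId(1)));

    // Each route/request handler would get its own cheap clone.
    let per_request = Arc::clone(&check);
    assert_eq!(per_request("secret"), Some(ServerId(1)));
    assert_eq!(per_request("wrong"), None);
}
```
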
@@ -299,7 +301,7 @@ fn run(command: Command) -> Result<i32> {
check_client_auth,
check_server_auth,
);
void::unreachable(http_scheduler.start()?);
void::unreachable(http_scheduler.start().await?);
}

Command::Server(ServerSubcommand { config, syslog }) => {
Expand Down Expand Up @@ -337,7 +339,7 @@ fn run(command: Command) -> Result<i32> {
let scheduler_auth = match scheduler_auth {
server_config::SchedulerAuth::Insecure => {
warn!("Server starting with DANGEROUSLY_INSECURE scheduler authentication");
create_server_token(server_id, &INSECURE_DIST_SERVER_TOKEN)
create_server_token(server_id, INSECURE_DIST_SERVER_TOKEN)
}
server_config::SchedulerAuth::Token { token } => {
create_server_token(server_id, &token)
@@ -366,7 +368,7 @@ fn run(command: Command) -> Result<i32> {
server,
)
.context("Failed to create cachepot HTTP server instance")?;
void::unreachable(http_server.start()?)
void::unreachable(http_server.start().await?)
}
}
}
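
With rouille gone, both `http_scheduler.start()` and `http_server.start()` return futures and are awaited, and `main` (shown in an earlier hunk) runs under `#[tokio::main]`. A condensed sketch of that entry-point shape, with a simplified body and a hypothetical exit code:

```rust
use anyhow::Result;

// Condensed sketch of the async entry-point pattern above; the real run()
// builds the scheduler or server and then awaits its start() future.
#[tokio::main]
async fn main() {
    std::process::exit(match run().await {
        Ok(code) => code,
        Err(e) => {
            eprintln!("cachepot-dist: error: {}", e);
            2 // hypothetical exit code for this sketch
        }
    });
}

async fn run() -> Result<i32> {
    // ... construct the HTTP scheduler/server, then e.g. `server.start().await?` ...
    Ok(0)
}
```
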
@@ -464,8 +466,9 @@ impl Scheduler {
}
}

#[async_trait]
impl SchedulerIncoming for Scheduler {
fn handle_alloc_job(
async fn handle_alloc_job(
&self,
requester: &dyn SchedulerOutgoing,
tc: Toolchain,
@@ -564,6 +567,7 @@ impl SchedulerIncoming for Scheduler {
need_toolchain,
} = requester
.do_assign_job(server_id, job_id, tc, auth.clone())
.await
.with_context(|| {
// LOCKS
let mut servers = self.servers.lock().unwrap();
@@ -684,7 +688,7 @@ impl SchedulerIncoming for Scheduler {
}
Some(ref mut details) if details.server_nonce != server_nonce => {
for job_id in details.jobs_assigned.iter() {
if jobs.remove(&job_id).is_none() {
if jobs.remove(job_id).is_none() {
warn!(
"Unknown job found when replacing server {}: {}",
server_id.addr(),
@@ -782,7 +786,7 @@ impl SchedulerIncoming for Scheduler {
pub struct Server {
builder: Box<dyn BuilderIncoming>,
cache: Mutex<TcCache>,
job_toolchains: Mutex<HashMap<JobId, Toolchain>>,
job_toolchains: tokio::sync::Mutex<HashMap<JobId, Toolchain>>,
}

impl Server {
@@ -796,18 +800,19 @@ impl Server {
Ok(Server {
builder,
cache: Mutex::new(cache),
job_toolchains: Mutex::new(HashMap::new()),
job_toolchains: tokio::sync::Mutex::new(HashMap::new()),
})
}
}

#[async_trait]
impl ServerIncoming for Server {
fn handle_assign_job(&self, job_id: JobId, tc: Toolchain) -> Result<AssignJobResult> {
async fn handle_assign_job(&self, job_id: JobId, tc: Toolchain) -> Result<AssignJobResult> {
let need_toolchain = !self.cache.lock().unwrap().contains_toolchain(&tc);
assert!(self
.job_toolchains
.lock()
.unwrap()
.await
.insert(job_id, tc)
.is_none());
let state = if need_toolchain {
@@ -821,18 +826,19 @@ impl ServerIncoming for Server {
need_toolchain,
})
}
fn handle_submit_toolchain(
async fn handle_submit_toolchain(
&self,
requester: &dyn ServerOutgoing,
job_id: JobId,
tc_rdr: ToolchainReader,
tc_rdr: ToolchainReader<'_>,
) -> Result<SubmitToolchainResult> {
requester
.do_update_job_state(job_id, JobState::Ready)
.await
.context("Updating job state failed")?;
// TODO: need to lock the toolchain until the container has started
// TODO: can start prepping container
let tc = match self.job_toolchains.lock().unwrap().get(&job_id).cloned() {
let tc = match self.job_toolchains.lock().await.get(&job_id).cloned() {
Some(tc) => tc,
None => return Ok(SubmitToolchainResult::JobNotFound),
};
@@ -848,18 +854,19 @@ impl ServerIncoming for Server {
.map(|_| SubmitToolchainResult::Success)
.unwrap_or(SubmitToolchainResult::CannotCache))
}
fn handle_run_job(
async fn handle_run_job(
&self,
requester: &dyn ServerOutgoing,
job_id: JobId,
command: CompileCommand,
outputs: Vec<String>,
inputs_rdr: InputsReader,
inputs_rdr: InputsReader<'_>,
) -> Result<RunJobResult> {
requester
.do_update_job_state(job_id, JobState::Started)
.await
.context("Updating job state failed")?;
let tc = self.job_toolchains.lock().unwrap().remove(&job_id);
let tc = self.job_toolchains.lock().await.remove(&job_id);
let res = match tc {
None => Ok(RunJobResult::JobNotFound),
Some(tc) => {
@@ -877,6 +884,7 @@ impl ServerIncoming for Server {
};
requester
.do_update_job_state(job_id, JobState::Complete)
.await
.context("Updating job state failed")?;
res
}
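
The remaining main.rs changes follow the same pattern: the `SchedulerIncoming`/`ServerIncoming` traits gain `#[async_trait]` so their handlers can await outgoing requests, and `job_toolchains` moves from `std::sync::Mutex` to `tokio::sync::Mutex`, whose guard is taken with `.lock().await` inside async handlers. A self-contained sketch of those two mechanics (the types and methods are simplified stand-ins, not cachepot's):

```rust
use async_trait::async_trait;
use std::collections::HashMap;

// Simplified stand-ins for the real JobId/Toolchain handling, showing the
// async-trait + tokio::sync::Mutex combination used above.
#[async_trait]
trait JobStore {
    async fn assign(&self, job_id: u64, toolchain: String) -> bool;
    async fn take(&self, job_id: u64) -> Option<String>;
}

struct Store {
    job_toolchains: tokio::sync::Mutex<HashMap<u64, String>>,
}

#[async_trait]
impl JobStore for Store {
    async fn assign(&self, job_id: u64, toolchain: String) -> bool {
        // The lock itself is awaited; its guard may be held across .await points.
        self.job_toolchains
            .lock()
            .await
            .insert(job_id, toolchain)
            .is_none()
    }

    async fn take(&self, job_id: u64) -> Option<String> {
        self.job_toolchains.lock().await.remove(&job_id)
    }
}

#[tokio::main]
async fn main() {
    let store = Store {
        job_toolchains: tokio::sync::Mutex::new(HashMap::new()),
    };
    assert!(store.assign(1, "toolchain.tar.gz".into()).await);
    assert_eq!(store.take(1).await.as_deref(), Some("toolchain.tar.gz"));
}
```
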
(The diffs for the remaining changed files are not rendered here.)
