From 927fe00d466ce8a61c37e48c236ac5fe82cb6280 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 17 May 2017 22:37:13 -0700 Subject: [PATCH] Initial stab for async/await --- Cargo.lock | 57 ++++- Cargo.toml | 6 + src/cache/s3.rs | 11 +- src/compiler/c.rs | 35 ++- src/compiler/clang.rs | 2 +- src/compiler/compiler.rs | 513 ++++++++++++++++++------------------- src/compiler/gcc.rs | 2 +- src/compiler/msvc.rs | 111 ++++---- src/compiler/rust.rs | 41 +-- src/errors.rs | 10 - src/main.rs | 4 +- src/mock_command.rs | 2 +- src/server.rs | 257 ++++++++++--------- src/simples3/credential.rs | 270 ++++++++++--------- src/simples3/s3.rs | 115 +++++---- src/util.rs | 54 ++-- 16 files changed, 759 insertions(+), 731 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d35e810f6..0c9d5fdc0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13,6 +13,7 @@ dependencies = [ "fern 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "filetime 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-await 0.1.0", "futures-cpupool 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "gcc 0.3.43 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.11.0-a.0 (git+https://github.com/hyperium/hyper)", @@ -95,7 +96,7 @@ dependencies = [ [[package]] name = "base64" -version = "0.4.0" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -260,6 +261,27 @@ name = "futures" version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "futures-async-macro" +version = "0.1.0" +dependencies = [ + "quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.11.11 (git+https://github.com/alexcrichton/syn?branch=yield)", +] + +[[package]] +name = "futures-await" +version = "0.1.0" +dependencies = [ + "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-async-macro 0.1.0", + "futures-await-macro 0.1.0", +] + +[[package]] +name = "futures-await-macro" +version = "0.1.0" + [[package]] name = "futures-cpupool" version = "0.1.5" @@ -296,9 +318,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "hyper" version = "0.11.0-a.0" -source = "git+https://github.com/hyperium/hyper#43cf9aefe8f9147a0f08aa0f9d609d275549fd7c" +source = "git+https://github.com/hyperium/hyper#12d01e402363a1d7432e878ad43e6345a29e7a21" dependencies = [ - "base64 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "base64 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -306,13 +328,12 @@ dependencies = [ "language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "mime 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.35 
(registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "unicase 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "unicase 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -923,6 +944,24 @@ dependencies = [ "unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "syn" +version = "0.11.11" +source = "git+https://github.com/alexcrichton/syn?branch=yield#128bed8feaee9ab14881e6f10f920a76fc4c58c8" +dependencies = [ + "quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", + "synom 0.11.3 (git+https://github.com/alexcrichton/syn?branch=yield)", + "unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "synom" +version = "0.11.3" +source = "git+https://github.com/alexcrichton/syn?branch=yield#128bed8feaee9ab14881e6f10f920a76fc4c58c8" +dependencies = [ + "unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "synom" version = "0.11.3" @@ -1117,7 +1156,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "unicase" -version = "1.4.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1250,7 +1289,7 @@ dependencies = [ "checksum ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "23ac7c30002a5accbf7e8987d0632fa6de155b7c3d39d0067317a391e00a2ef6" "checksum app_dirs 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b7d1c0d48a81bbb13043847f957971f4d87c81542d80ece5e84ba3cba4058fd4" "checksum atty 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d912da0db7fa85514874458ca3651fe2cddace8d0b0505571dbdcd41ab490159" -"checksum base64 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "065a0ce220ab84d0b6d5ae3e7bb77232209519c366f51f946fe28c19e84989d0" +"checksum base64 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "30e93c03064e7590d0466209155251b90c22e37fab1daf2771582598b5827557" "checksum bincode 1.0.0-alpha6 (git+https://github.com/TyOverby/bincode)" = "" "checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d" "checksum bitflags 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1370e9fc2a6ae53aea8b7a5110edbd08836ed87c88736dfabccade1c2b44bff4" @@ -1347,7 +1386,9 @@ dependencies = [ "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013" "checksum strsim 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = 
"b4d15c810519a91cf877e7e36e63fe068815c678181439f2f29e2562147c3694" +"checksum syn 0.11.11 (git+https://github.com/alexcrichton/syn?branch=yield)" = "" "checksum syn 0.11.9 (registry+https://github.com/rust-lang/crates.io-index)" = "480c834701caba3548aa991e54677281be3a5414a9d09ddbdf4ed74a569a9d19" +"checksum synom 0.11.3 (git+https://github.com/alexcrichton/syn?branch=yield)" = "" "checksum synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6" "checksum take 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b157868d8ac1f56b64604539990685fa7611d8fa9e5476cf0c02cf34d32917c5" "checksum tempdir 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "0b62933a3f96cd559700662c34f8bab881d9e3540289fb4f368419c7f13a5aa9" @@ -1366,7 +1407,7 @@ dependencies = [ "checksum tokio-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "666266622d9a4d1974a0beda33d505999515b0c60edc0c3fda09784e56609a97" "checksum tokio-uds 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "bd209039933255ea77c6d7a1d18abc20b997d161acb900acca6eb74cdd049f31" "checksum toml 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "736b60249cb25337bc196faa43ee12c705e426f3d55c214d73a4e7be06f92cb4" -"checksum unicase 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "13a5906ca2b98c799f4b1ab4557b76367ebd6ae5ef14930ec841c74aed5f3764" +"checksum unicase 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2e01da42520092d0cd2d6ac3ae69eb21a22ad43ff195676b86f8c37f487d6b80" "checksum unicode-bidi 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "d3a078ebdd62c0e71a709c3d53d2af693fe09fe93fbff8344aebe289b78f9032" "checksum unicode-normalization 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "e28fa37426fceeb5cf8f41ee273faa7c82c47dc8fba5853402841e665fcd86ff" "checksum unicode-segmentation 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "18127285758f0e2c6cf325bb3f3d138a12fee27de4f23e146cd6a179f26c2cf3" diff --git a/Cargo.toml b/Cargo.toml index f7be5976c..0ef0a2347 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ error-chain = { version = "0.7.2", default-features = false } fern = "0.3.5" filetime = "0.1" futures = "0.1.11" +futures-await = { path = "../futures-await" } futures-cpupool = "0.1" hyper = { git = "https://github.com/hyperium/hyper", optional = true } hyper-tls = { git = "https://github.com/hyperium/hyper-tls", optional = true } @@ -72,4 +73,9 @@ unstable = [] [profile.release] debug = true +[profile.dev] +debug = false +[profile.test] +debug = false + [workspace] diff --git a/src/cache/s3.rs b/src/cache/s3.rs index c08f8d313..40bb4b7f0 100644 --- a/src/cache/s3.rs +++ b/src/cache/s3.rs @@ -29,7 +29,6 @@ use simples3::{ }; use std::env; use std::io; -use std::rc::Rc; use std::time::{Instant, Duration}; use tokio_core::reactor::Handle; @@ -38,7 +37,7 @@ use errors::*; /// A cache that stores entries in Amazon S3. pub struct S3Cache { /// The S3 bucket. - bucket: Rc, + bucket: Bucket, /// Credentials provider. 
provider: AutoRefreshingProvider, } @@ -56,7 +55,7 @@ impl S3Cache { ]; let provider = AutoRefreshingProvider::new(ChainProvider::with_profile_providers(profile_providers, handle)); //TODO: configurable SSL - let bucket = Rc::new(Bucket::new(bucket, endpoint, Ssl::No, handle)); + let bucket = Bucket::new(bucket, endpoint, Ssl::No, handle); Ok(S3Cache { bucket: bucket, provider: provider, @@ -71,7 +70,7 @@ fn normalize_key(key: &str) -> String { impl Storage for S3Cache { fn get(&self, key: &str) -> SFuture { let key = normalize_key(key); - Box::new(self.bucket.get(&key).then(|result| { + Box::new(self.bucket.clone().get(key).then(|result| { match result { Ok(data) => { let hit = CacheRead::from(io::Cursor::new(data))?; @@ -92,13 +91,13 @@ impl Storage for S3Cache { Ok(data) => data, Err(e) => return future::err(e.into()).boxed(), }; - let credentials = self.provider.credentials().chain_err(|| { + let credentials = self.provider.clone().credentials().chain_err(|| { "failed to get AWS credentials" }); let bucket = self.bucket.clone(); let response = credentials.and_then(move |credentials| { - bucket.put(&key, data, &credentials).chain_err(|| { + bucket.put(key, data, credentials).chain_err(|| { "failed to put cache entry in s3" }) }); diff --git a/src/compiler/c.rs b/src/compiler/c.rs index 8e157dee8..36e2a5a09 100644 --- a/src/compiler/c.rs +++ b/src/compiler/c.rs @@ -13,7 +13,7 @@ // limitations under the License. use compiler::{Cacheable, Compiler, CompilerArguments, CompilerHasher, CompilerKind, Compilation, HashResult}; -use futures::Future; +use futures::prelude::*; use futures_cpupool::CpuPool; use mock_command::CommandCreatorSync; use std::borrow::Cow; @@ -131,15 +131,17 @@ pub trait CCompilerImpl: Clone + fmt::Debug + Send + 'static { impl CCompiler where I: CCompilerImpl, { - pub fn new(compiler: I, executable: PathBuf, pool: &CpuPool) -> SFuture> + #[async] + pub fn new(compiler: I, + executable: PathBuf, + pool: CpuPool) -> Result> { - Box::new(Digest::file(executable.clone(), &pool).map(move |digest| { - CCompiler { - executable: executable, - executable_digest: digest, - compiler: compiler, - } - })) + let digest = await!(Digest::file(executable.clone(), &pool))?; + Ok(CCompiler { + executable: executable, + executable_digest: digest, + compiler: compiler, + }) } } @@ -172,15 +174,20 @@ impl CompilerHasher for CCompilerHasher I: CCompilerImpl, { fn generate_hash_key(self: Box, - creator: &T, - cwd: &Path, - env_vars: &[(OsString, OsString)], - pool: &CpuPool) + creator: T, + cwd: PathBuf, + env_vars: Vec<(OsString, OsString)>, + pool: CpuPool) -> SFuture> { let me = *self; let CCompilerHasher { parsed_args, executable, executable_digest, compiler } = me; - let result = compiler.preprocess(creator, &executable, &parsed_args, cwd, env_vars, pool); + let result = compiler.preprocess(&creator, + &executable, + &parsed_args, + &cwd, + &env_vars, + &pool); let out_pretty = parsed_args.output_pretty().into_owned(); let env_vars = env_vars.to_vec(); let result = result.map_err(move |e| { diff --git a/src/compiler/clang.rs b/src/compiler/clang.rs index 455637854..72e763fdd 100644 --- a/src/compiler/clang.rs +++ b/src/compiler/clang.rs @@ -111,7 +111,7 @@ fn compile(creator: &T, Some(name) => name, None => return future::err("missing input filename".into()).boxed(), }; - write_temp_file(pool, filename.as_ref(), preprocessor_result.stdout) + write_temp_file(pool.clone(), filename.into(), preprocessor_result.stdout) }; let input = parsed_args.input.clone(); let out_file = match 
parsed_args.outputs.get("obj") { diff --git a/src/compiler/compiler.rs b/src/compiler/compiler.rs index 8cd6d3317..9c7ce9228 100644 --- a/src/compiler/compiler.rs +++ b/src/compiler/compiler.rs @@ -23,7 +23,8 @@ use compiler::clang::Clang; use compiler::gcc::GCC; use compiler::msvc::MSVC; use compiler::rust::Rust; -use futures::{Future, IntoFuture}; +use futures::prelude::*; +use futures::future::{self, Either}; use futures_cpupool::CpuPool; use mock_command::{ CommandChild, @@ -88,189 +89,190 @@ pub trait CompilerHasher: fmt::Debug + Send + 'static /// that can be used for cache lookups, as well as any additional /// information that can be reused for compilation if necessary. fn generate_hash_key(self: Box, - creator: &T, - cwd: &Path, - env_vars: &[(OsString, OsString)], - pool: &CpuPool) + creator: T, + cwd: PathBuf, + env_vars: Vec<(OsString, OsString)>, + pool: CpuPool) -> SFuture>; + + /// A descriptive string about the file that we're going to be producing. + /// + /// This is primarily intended for debug logging and such, not for actual + /// artifact generation. + fn output_pretty(&self) -> Cow; + + fn box_clone(&self) -> Box>; +} + +impl CompilerHasher { /// Look up a cached compile result in `storage`. If not found, run the /// compile and store the result. - fn get_cached_or_compile(self: Box, - creator: T, - storage: Arc, - arguments: Vec, - cwd: PathBuf, - env_vars: Vec<(OsString, OsString)>, - cache_control: CacheControl, - pool: CpuPool, - handle: Handle) - -> SFuture<(CompileResult, process::Output)> + #[async] + pub fn get_cached_or_compile(self: Box, + creator: T, + storage: Arc, + arguments: Vec, + cwd: PathBuf, + env_vars: Vec<(OsString, OsString)>, + cache_control: CacheControl, + pool: CpuPool, + handle: Handle) + -> Result<(CompileResult, process::Output)> { let out_pretty = self.output_pretty().into_owned(); debug!("[{}]: get_cached_or_compile: {:?}", out_pretty, arguments); let start = Instant::now(); - let result = self.generate_hash_key(&creator, &cwd, &env_vars, &pool); - Box::new(result.then(move |res| -> SFuture<_> { - debug!("[{}]: generate_hash_key took {}", out_pretty, fmt_duration_as_secs(&start.elapsed())); - let (key, compilation) = match res { - Err(Error(ErrorKind::ProcessError(output), _)) => { - return f_ok((CompileResult::Error, output)); - } - Err(e) => return f_err(e), - Ok(HashResult { key, compilation }) => (key, compilation), - }; - trace!("[{}]: Hash key: {}", out_pretty, key); - // If `ForceRecache` is enabled, we won't check the cache. - let start = Instant::now(); - let cache_status = if cache_control == CacheControl::ForceRecache { - f_ok(Cache::Recache) - } else { - storage.get(&key) - }; + let result = await!(self.generate_hash_key(creator.clone(), + cwd.clone(), + env_vars.clone(), + pool.clone())); + debug!("[{}]: generate_hash_key took {}", + out_pretty, + fmt_duration_as_secs(&start.elapsed())); - // Set a maximum time limit for the cache to respond before we forge - // ahead ourselves with a compilation. 
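// An aside on the signature churn in this hunk: #[async] from
// futures-await compiles the function body into a generator that can
// suspend at every await!, so borrowed parameters (&T, &Path, &[..])
// cannot live across suspension points and every argument becomes
// owned (T, PathBuf, Vec<..>). A minimal sketch of the rule, under
// that assumption (`double_on_pool` is a hypothetical example, not
// part of this patch):
#[async]
fn double_on_pool(pool: CpuPool, x: i32) -> Result<i32> {
    // `pool` was moved into the generator, so it is still alive after
    // the suspension point introduced by await! below.
    let doubled = await!(pool.spawn_fn(move || Ok::<_, Error>(x * 2)))?;
    Ok(doubled)
}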
- let timeout = Duration::new(60, 0); - let timeout = Timeout::new(timeout, &handle).into_future().flatten(); + let (key, compilation) = match result { + Err(Error(ErrorKind::ProcessError(output), _)) => { + return Ok((CompileResult::Error, output)); + } + Err(e) => return Err(e), + Ok(HashResult { key, compilation }) => (key, compilation), + }; - let cache_status = cache_status.map(Some); - let timeout = timeout.map(|_| None).chain_err(|| "timeout error"); - let cache_status = cache_status.select(timeout).then(|r| { - match r { - Ok((e, _other)) => Ok(e), - Err((e, _other)) => Err(e), - } - }); + trace!("[{}]: Hash key: {}", out_pretty, key); + // If `ForceRecache` is enabled, we won't check the cache. + let start = Instant::now(); + let cache_status = if cache_control == CacheControl::ForceRecache { + Either::A(future::ok(Cache::Recache)) + } else { + Either::B(storage.get(&key)) + }; - // Check the result of the cache lookup. - Box::new(cache_status.then(move |result| { - let duration = start.elapsed(); - let pwd = Path::new(&cwd); - let outputs = compilation.outputs() - .map(|(key, path)| (key.to_string(), pwd.join(path))) - .collect::>(); + // Set a maximum time limit for the cache to respond before we forge + // ahead ourselves with a compilation. + let timeout = Duration::new(60, 0); + let timeout = Timeout::new(timeout, &handle)?; - let miss_type = match result { - Ok(Some(Cache::Hit(mut entry))) => { - debug!("[{}]: Cache hit in {}", out_pretty, fmt_duration_as_secs(&duration)); - let mut stdout = Vec::new(); - let mut stderr = Vec::new(); - drop(entry.get_object("stdout", &mut stdout)); - drop(entry.get_object("stderr", &mut stderr)); - let write = pool.spawn_fn(move ||{ - for (key, path) in &outputs { - let mut f = File::create(&path)?; - let mode = entry.get_object(&key, &mut f)?; - if let Some(mode) = mode { - set_file_mode(&path, mode)?; - } - } - Ok(()) - }); - let output = process::Output { - status: exit_status(0), - stdout: stdout, - stderr: stderr, - }; - let result = CompileResult::CacheHit(duration); - return Box::new(write.map(|_| { - (result, output) - })) as SFuture<_> - } - Ok(Some(Cache::Miss)) => { - debug!("[{}]: Cache miss", out_pretty); - MissType::Normal - } - Ok(Some(Cache::Recache)) => { - debug!("[{}]: Cache recache", out_pretty); - MissType::ForcedRecache - } - Ok(None) => { - debug!("[{}]: Cache timed out", out_pretty); - MissType::TimedOut - } - Err(err) => { - error!("[{}]: Cache read error: {}", out_pretty, err); - for e in err.iter().skip(1) { - error!("[{}] \t{}", out_pretty, e); + let cache_status = cache_status.map(Some); + let timeout = timeout.map(|()| None).chain_err(|| "timeout error"); + let result = await!(cache_status.select(timeout).then(|r| { + match r { + Ok((e, _other)) => Ok(e), + Err((e, _other)) => Err(e), + } + })); + + // Check the result of the cache lookup. 
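// The cache race above is stock futures 0.1: `select` polls two
// futures with identical Item/Error types and resolves with whichever
// finishes first, so the lookup is mapped to Some(..) and the timeout
// to None to unify the types. The same shape as a standalone helper
// (a sketch; `or_timeout` is hypothetical and not part of this patch):
fn or_timeout<F>(f: F, secs: u64, handle: &Handle) -> SFuture<Option<F::Item>>
    where F: Future<Error = Error> + 'static,
          F::Item: 'static,
{
    let timeout = match Timeout::new(Duration::new(secs, 0), handle) {
        Ok(t) => t.map(|()| None).chain_err(|| "timeout error"),
        Err(e) => return Box::new(future::err(e.into())),
    };
    Box::new(f.map(Some).select(timeout).then(|r| match r {
        // select hands back (result, the_other_future); we drop the loser.
        Ok((item, _other)) => Ok(item),
        Err((e, _other)) => Err(e),
    }))
}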
+ let duration = start.elapsed(); + let pwd = Path::new(&cwd); + let outputs = compilation.outputs() + .map(|(key, path)| (key.to_string(), pwd.join(path))) + .collect::>(); + + let miss_type = match result { + Ok(Some(Cache::Hit(mut entry))) => { + debug!("[{}]: Cache hit in {}", out_pretty, fmt_duration_as_secs(&duration)); + let mut stdout = Vec::new(); + let mut stderr = Vec::new(); + drop(entry.get_object("stdout", &mut stdout)); + drop(entry.get_object("stderr", &mut stderr)); + await!(pool.spawn_fn(move || -> Result<_> { + for (key, path) in &outputs { + let mut f = File::create(&path)?; + let mode = entry.get_object(&key, &mut f)?; + if let Some(mode) = mode { + set_file_mode(&path, mode)?; } - MissType::CacheReadError } + Ok(()) + }))?; + let output = process::Output { + status: exit_status(0), + stdout: stdout, + stderr: stderr, }; + let result = CompileResult::CacheHit(duration); + return Ok((result, output)) + } + Ok(Some(Cache::Miss)) => { + debug!("[{}]: Cache miss", out_pretty); + MissType::Normal + } + Ok(Some(Cache::Recache)) => { + debug!("[{}]: Cache recache", out_pretty); + MissType::ForcedRecache + } + Ok(None) => { + debug!("[{}]: Cache timed out", out_pretty); + MissType::TimedOut + } + Err(err) => { + error!("[{}]: Cache read error: {}", out_pretty, err); + for e in err.iter().skip(1) { + error!("[{}] \t{}", out_pretty, e); + } + MissType::CacheReadError + } + }; - // Cache miss, so compile it. - let start = Instant::now(); - let out_pretty = out_pretty.clone(); - let compile = compilation.compile(&creator, &cwd, &env_vars, &pool); - Box::new(compile.and_then(move |(cacheable, compiler_result)| { - let duration = start.elapsed(); - if !compiler_result.status.success() { - debug!("[{}]: Compiled but failed, not storing in cache", - out_pretty); - return f_ok((CompileResult::CompileFailed, compiler_result)) - as SFuture<_> - } - if cacheable != Cacheable::Yes { - // Not cacheable - debug!("[{}]: Compiled but not cacheable", - out_pretty); - return f_ok((CompileResult::NotCacheable, compiler_result)) - } - debug!("[{}]: Compiled in {}, storing in cache", out_pretty, fmt_duration_as_secs(&duration)); - let write = pool.spawn_fn(move || -> Result<_> { - let mut entry = CacheWrite::new(); - for (key, path) in &outputs { - let mut f = File::open(&path)?; - let mode = get_file_mode(&path)?; - entry.put_object(key, &mut f, mode).chain_err(|| { - format!("failed to put object `{:?}` in zip", path) - })?; - } - Ok(entry) - }); - let write = write.chain_err(|| "failed to zip up compiler outputs"); - let o = out_pretty.clone(); - Box::new(write.and_then(move |mut entry| { - if !compiler_result.stdout.is_empty() { - let mut stdout = &compiler_result.stdout[..]; - entry.put_object("stdout", &mut stdout, None)?; - } - if !compiler_result.stderr.is_empty() { - let mut stderr = &compiler_result.stderr[..]; - entry.put_object("stderr", &mut stderr, None)?; - } - - // Try to finish storing the newly-written cache - // entry. We'll get the result back elsewhere. 
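// "We'll get the result back elsewhere" is the deferred-write pattern:
// get_cached_or_compile returns the storage.put future inside
// CompileResult::CacheMiss instead of awaiting it, and the server task
// later joins it with sending the response (see src/server.rs below).
// A condensed sketch of that consumer side, assuming a tokio-core
// Handle (`spawn_cache_write` is hypothetical):
fn spawn_cache_write(handle: &Handle, write: SFuture<CacheWriteInfo>) {
    handle.spawn(write.then(|res| {
        match res {
            Ok(info) => debug!("cache write finished in {}",
                               fmt_duration_as_secs(&info.duration)),
            Err(e) => debug!("cache write error: {}", e),
        }
        Ok(()) // log-and-swallow: the compile response was already sent
    }));
}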
- let out_pretty = out_pretty.clone(); - let future = storage.put(&key, entry) - .then(move |res| { - match res { - Ok(_) => debug!("[{}]: Stored in cache successfully!", out_pretty), - Err(ref e) => debug!("[{}]: Cache write error: {:?}", out_pretty, e), - } - res.map(|duration| CacheWriteInfo { - object_file_pretty: out_pretty, - duration: duration, - }) - }); - let future = Box::new(future); - Ok((CompileResult::CacheMiss(miss_type, duration, future), compiler_result)) - }).chain_err(move || { - format!("failed to store `{}` to cache", o) - })) - })) - })) - })) - } + // Cache miss, so compile it. + let start = Instant::now(); + let compile = await!(compilation.compile(&creator, + &cwd, + &env_vars, + &pool))?; + let (cacheable, compiler_result) = compile; + let duration = start.elapsed(); + if !compiler_result.status.success() { + debug!("[{}]: Compiled but failed, not storing in cache", out_pretty); + return Ok((CompileResult::CompileFailed, compiler_result)) + } + if cacheable != Cacheable::Yes { + // Not cacheable + debug!("[{}]: Compiled but not cacheable", out_pretty); + return Ok((CompileResult::NotCacheable, compiler_result)) + } + debug!("[{}]: Compiled in {}, storing in cache", + out_pretty, + fmt_duration_as_secs(&duration)); - /// A descriptive string about the file that we're going to be producing. - /// - /// This is primarily intended for debug logging and such, not for actual - /// artifact generation. - fn output_pretty(&self) -> Cow; + let mut entry = await!(pool.spawn_fn(move || -> Result<_> { + let mut entry = CacheWrite::new(); + for (key, path) in &outputs { + let mut f = File::open(&path)?; + let mode = get_file_mode(&path)?; + entry.put_object(key, &mut f, mode).chain_err(|| { + format!("failed to put object `{:?}` in zip", path) + })?; + } + Ok(entry) + })).chain_err(|| { + "failed to zip up compiler outputs" + })?; + if !compiler_result.stdout.is_empty() { + let mut stdout = &compiler_result.stdout[..]; + entry.put_object("stdout", &mut stdout, None)?; + } + if !compiler_result.stderr.is_empty() { + let mut stderr = &compiler_result.stderr[..]; + entry.put_object("stderr", &mut stderr, None)?; + } - fn box_clone(&self) -> Box>; + // Try to finish storing the newly-written cache + // entry. We'll get the result back elsewhere. + let future = storage.put(&key, entry).then(move |res| { + match res { + Ok(_) => debug!("[{}]: Stored in cache successfully!", out_pretty), + Err(ref e) => debug!("[{}]: Cache write error: {:?}", out_pretty, e), + } + res.map(|duration| CacheWriteInfo { + object_file_pretty: out_pretty, + duration: duration, + }) + }); + let future = Box::new(future); + Ok((CompileResult::CacheMiss(miss_type, duration, Box::new(future)), + compiler_result)) + } } impl Clone for Box> { @@ -434,33 +436,36 @@ pub enum CacheControl { /// /// Note that when the `TempDir` is dropped it will delete all of its contents /// including the path returned. 
-pub fn write_temp_file(pool: &CpuPool, path: &Path, contents: Vec) - -> SFuture<(TempDir, PathBuf)> { - let path = path.to_owned(); - pool.spawn_fn(move || -> Result<_> { +#[async] +pub fn write_temp_file(pool: CpuPool, path: PathBuf, contents: Vec) + -> Result<(TempDir, PathBuf)> { + let pair = await!(pool.spawn_fn(move || -> Result<_> { let dir = TempDir::new("sccache")?; let src = dir.path().join(path); let mut file = File::create(&src)?; file.write_all(&contents)?; Ok((dir, src)) - }).chain_err(|| { + })).chain_err(|| { "failed to write temporary file" - }) + })?; + Ok(pair) } /// If `executable` is a known compiler, return `Some(Box)`. -fn detect_compiler(creator: &T, executable: &Path, pool: &CpuPool) - -> SFuture>>> +#[async] +fn detect_compiler(creator: T, executable: PathBuf, pool: CpuPool) + -> Result>>> where T: CommandCreatorSync { trace!("detect_compiler"); // First, see if this looks like rustc. - let filename = match executable.file_stem() { - None => return f_err("could not determine compiler kind"), - Some(f) => f, + let looks_like_rustc = match executable.file_stem() { + None => bail!("could not determine compiler kind"), + Some(f) => f.to_string_lossy().to_lowercase() == "rustc", }; - let is_rustc = if filename.to_string_lossy().to_lowercase() == "rustc" { + let mut is_rustc = false; + if looks_like_rustc { // Sanity check that it's really rustc. let child = creator.clone().new_command_sync(&executable) .stdout(Stdio::piped()) @@ -468,40 +473,31 @@ fn detect_compiler(creator: &T, executable: &Path, pool: &CpuPool) .args(&["--version"]) .spawn().chain_err(|| { format!("failed to execute {:?}", executable) - }); - let output = child.into_future().and_then(move |child| { - child.wait_with_output() - .chain_err(|| "failed to read child output") - }); - Box::new(output.map(|output| { - if output.status.success() { - if let Ok(stdout) = String::from_utf8(output.stdout) { - if stdout.starts_with("rustc ") { - return true; - } + })?; + let output = await!(child.wait_with_output()).chain_err(|| { + "failed to read child output" + })?; + if output.status.success() { + if let Ok(stdout) = String::from_utf8(output.stdout) { + if stdout.starts_with("rustc ") { + is_rustc = true; } } - false - })) - } else { - f_ok(false) - }; - - let creator = creator.clone(); - let executable = executable.to_owned(); - let pool = pool.clone(); - Box::new(is_rustc.and_then(move |is_rustc| { - if is_rustc { - debug!("Found rustc"); - Box::new(Rust::new(creator, executable, pool).map(|c| Some(Box::new(c) as Box>))) - } else { - detect_c_compiler(creator, executable, pool) } - })) + } + + if is_rustc { + debug!("Found rustc"); + let rust = await!(Rust::new(creator, executable, pool))?; + Ok(Some(Box::new(rust) as Box>)) + } else { + await!(detect_c_compiler(creator, executable, pool)) + } } +#[async] fn detect_c_compiler(creator: T, executable: PathBuf, pool: CpuPool) - -> SFuture>>> + -> Result>>> where T: CommandCreatorSync { trace!("detect_c_compiler"); @@ -514,72 +510,65 @@ clang gcc #endif ".to_vec(); - let write = write_temp_file(&pool, "testfile.c".as_ref(), test); + let (tempdir, src) = await!(write_temp_file(pool.clone(), + "testfile.c".into(), + test))?; let mut cmd = creator.clone().new_command_sync(&executable); cmd.stdout(Stdio::piped()) - .stderr(Stdio::null()); - let output = write.and_then(move |(tempdir, src)| { - cmd.arg("-E").arg(src); - trace!("compiler {:?}", cmd); - let child = cmd.spawn().chain_err(|| { - format!("failed to execute {:?}", cmd) - }); - 
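// Before/after in miniature: the removed combinator code here has to
// lift spawn()'s Result into a future (into_future) and nest and_then
// closures, while the #[async] version reads top to bottom with ? and
// await!. A sketch built from this crate's mock_command API
// (`run_version` is hypothetical):
#[async]
fn run_version<T>(mut creator: T, exe: PathBuf) -> Result<process::Output>
    where T: CommandCreatorSync,
{
    let child = creator.new_command_sync(&exe)
        .args(&["--version"])
        .stdout(Stdio::piped())
        .stderr(Stdio::null())
        .spawn()
        .chain_err(|| format!("failed to execute {:?}", exe))?;
    await!(child.wait_with_output()).chain_err(|| "failed to read child output")
}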
child.into_future().and_then(|child| { - child.wait_with_output().chain_err(|| "failed to read child output") - }).map(|e| { - drop(tempdir); - e - }) - }); + .stderr(Stdio::null()) + .arg("-E") + .arg(src); + trace!("compiler {:?}", cmd); + let child = cmd.spawn().chain_err(|| { + format!("failed to execute {:?}", cmd) + })?; + let output = await!(child.wait_with_output().chain_err(|| { + "failed to read child output" + }))?; + drop(tempdir); - Box::new(output.and_then(move |output| -> SFuture<_> { - let stdout = match str::from_utf8(&output.stdout) { - Ok(s) => s, - Err(_) => return f_err("Failed to parse output"), - }; - for line in stdout.lines() { - //TODO: do something smarter here. - if line == "gcc" { - debug!("Found GCC"); - return Box::new(CCompiler::new(GCC, executable, &pool) - .map(|c| Some(Box::new(c) as Box>))); - } else if line == "clang" { - debug!("Found clang"); - return Box::new(CCompiler::new(Clang, executable, &pool) - .map(|c| Some(Box::new(c) as Box>))); - } else if line == "msvc" { - debug!("Found MSVC"); - let prefix = msvc::detect_showincludes_prefix(&creator, - executable.as_ref(), - &pool); - return Box::new(prefix.and_then(move |prefix| { - trace!("showIncludes prefix: '{}'", prefix); - CCompiler::new(MSVC { - includes_prefix: prefix, - }, executable, &pool) - .map(|c| Some(Box::new(c) as Box>)) - })) - } + let stdout = match String::from_utf8(output.stdout) { + Ok(s) => s, + Err(_) => bail!("failed to parse output"), + }; + for line in stdout.lines() { + //TODO: do something smarter here. + if line == "gcc" { + debug!("Found GCC"); + let c = await!(CCompiler::new(GCC, executable, pool))?; + return Ok(Some(Box::new(c) as Box>)) + } else if line == "clang" { + debug!("Found clang"); + let c = await!(CCompiler::new(Clang, executable, pool))?; + return Ok(Some(Box::new(c) as Box>)) + } else if line == "msvc" { + debug!("Found MSVC"); + let prefix = await!(msvc::detect_showincludes_prefix(creator, + executable.clone().into(), + pool.clone()))?; + trace!("showIncludes prefix: '{}'", prefix); + let c = await!(CCompiler::new(MSVC { + includes_prefix: prefix, + }, executable, pool))?; + return Ok(Some(Box::new(c) as Box>)) } - debug!("nothing useful in detection output {:?}", stdout); - f_ok(None) - })) + } + debug!("nothing useful in detection output {:?}", stdout); + Ok(None) } /// If `executable` is a known compiler, return a `Box` containing information about it. -pub fn get_compiler_info(creator: &T, executable: &Path, pool: &CpuPool) - -> SFuture>> +#[async] +pub fn get_compiler_info(creator: T, executable: PathBuf, pool: CpuPool) + -> Result>> where T: CommandCreatorSync { let pool = pool.clone(); - let detect = detect_compiler(creator, executable, &pool); - Box::new(detect.and_then(move |compiler| -> Result<_> { - match compiler { - Some(compiler) => Ok(compiler), - None => bail!("could not determine compiler kind"), - } - })) + match await!(detect_compiler(creator, executable, pool))? 
{ + Some(compiler) => Ok(compiler), + None => bail!("could not determine compiler kind"), + } } #[cfg(test)] diff --git a/src/compiler/gcc.rs b/src/compiler/gcc.rs index 82fbc0529..ff50638d8 100644 --- a/src/compiler/gcc.rs +++ b/src/compiler/gcc.rs @@ -264,7 +264,7 @@ pub fn preprocess(creator: &T, if log_enabled!(Trace) { trace!("preprocess: {:?}", cmd); } - run_input_output(cmd, None) + Box::new(run_input_output(cmd, None)) } fn compile(creator: &T, diff --git a/src/compiler/msvc.rs b/src/compiler/msvc.rs index 5f5eff3a3..81f48f005 100644 --- a/src/compiler/msvc.rs +++ b/src/compiler/msvc.rs @@ -20,14 +20,14 @@ use ::compiler::{ use compiler::c::{CCompilerImpl, CCompilerKind, ParsedArguments}; use local_encoding::{Encoding, Encoder}; use log::LogLevel::{Debug, Trace}; -use futures::future::Future; +use futures::prelude::*; use futures_cpupool::CpuPool; use mock_command::{ CommandCreatorSync, RunCommand, }; use std::collections::{HashMap,HashSet}; -use std::ffi::{OsStr, OsString}; +use std::ffi::OsString; use std::fs::File; use std::io::{ self, @@ -91,76 +91,65 @@ fn from_local_codepage(bytes: &Vec) -> io::Result { } /// Detect the prefix included in the output of MSVC's -showIncludes output. -pub fn detect_showincludes_prefix(creator: &T, exe: &OsStr, pool: &CpuPool) - -> SFuture +#[async] +pub fn detect_showincludes_prefix(mut creator: T, + exe: OsString, + pool: CpuPool) + -> Result where T: CommandCreatorSync { - let write = write_temp_file(pool, - "test.c".as_ref(), - b"#include \"test.h\"\n".to_vec()); - - let exe = exe.to_os_string(); - let mut creator = creator.clone(); - let pool = pool.clone(); - let write2 = write.and_then(move |(tempdir, input)| { - let header = tempdir.path().join("test.h"); - pool.spawn_fn(move || -> Result<_> { - let mut file = File::create(&header)?; - file.write_all(b"/* empty */\n")?; - Ok((tempdir, input)) - }).chain_err(|| { - "failed to write temporary file" - }) - }); - let output = write2.and_then(move |(tempdir, input)| { - let mut cmd = creator.new_command_sync(&exe); - cmd.args(&["-nologo", "-showIncludes", "-c", "-Fonul", "-I."]) - .arg(&input) - .current_dir(&tempdir.path()) + let pair = await!(write_temp_file(pool.clone(), + "test.c".into(), + b"#include \"test.h\"\n".to_vec()))?; + let (tempdir, input) = pair; + + let header = tempdir.path().join("test.h"); + await!(pool.spawn_fn(move || -> Result<_> { + let mut file = File::create(&header)?; + file.write_all(b"/* empty */\n")?; + Ok(()) + })).chain_err(|| { + "failed to write temporary file" + })?; + + let mut cmd = creator.new_command_sync(&exe); + cmd.args(&["-nologo", "-showIncludes", "-c", "-Fonul", "-I."]) + .arg(&input) + .current_dir(&tempdir.path()) // The MSDN docs say the -showIncludes output goes to stderr, // but that's not true unless running with -E. - .stdout(Stdio::piped()) - .stderr(Stdio::null()); - - if log_enabled!(Trace) { - trace!("detect_showincludes_prefix: {:?}", cmd); - } + .stdout(Stdio::piped()) + .stderr(Stdio::null()); - run_input_output(cmd, None).map(|e| { - // Keep the tempdir around so test.h still exists for the - // checks below. - (e, tempdir) - }) - }); + if log_enabled!(Trace) { + trace!("detect_showincludes_prefix: {:?}", cmd); + } - Box::new(output.and_then(|(output, tempdir)| { - if !output.status.success() { - bail!("Failed to detect showIncludes prefix") - } + let output = await!(run_input_output(cmd, None))?; + if !output.status.success() { + bail!("Failed to detect showIncludes prefix") + } - let process::Output { stdout: stdout_bytes, .. 
} = output; - let stdout = try!(from_local_codepage(&stdout_bytes)); - for line in stdout.lines() { - if line.ends_with("test.h") { - for (i, c) in line.char_indices().rev() { - if c == ' ' { - // See if the rest of this line is a full pathname. - if Path::new(&line[i+1..]).exists() { - // Everything from the beginning of the line - // to this index is the prefix. - return Ok(line[..i+1].to_owned()); - } + let process::Output { stdout: stdout_bytes, .. } = output; + let stdout = from_local_codepage(&stdout_bytes)?; + for line in stdout.lines() { + if line.ends_with("test.h") { + for (i, c) in line.char_indices().rev() { + if c == ' ' { + // See if the rest of this line is a full pathname. + if Path::new(&line[i+1..]).exists() { + // Everything from the beginning of the line + // to this index is the prefix. + return Ok(line[..i+1].to_owned()); } } } } - drop(tempdir); + } - debug!("failed to detect showIncludes prefix with output: {}", - stdout); + debug!("failed to detect showIncludes prefix with output: {}", stdout); - bail!("Failed to detect showIncludes prefix") - })) + bail!("Failed to detect showIncludes prefix") } #[cfg(unix)] @@ -487,7 +476,9 @@ fn compile(creator: &T, Some(name) => name, None => return f_err("missing input filename"), }; - write_temp_file(pool, filename.as_ref(), preprocessor_result.stdout) + write_temp_file(pool.clone(), + filename.into(), + preprocessor_result.stdout) }; let mut fo = OsString::from("-Fo"); diff --git a/src/compiler/rust.rs b/src/compiler/rust.rs index 67c5fc983..8e9f03e69 100644 --- a/src/compiler/rust.rs +++ b/src/compiler/rust.rs @@ -14,7 +14,8 @@ use compiler::{Cacheable, Compiler, CompilerArguments, CompilerHasher, CompilerKind, Compilation, HashResult}; -use futures::{Future, future}; +use futures::future; +use futures::prelude::*; use futures_cpupool::CpuPool; use log::LogLevel::Trace; use mock_command::{CommandCreatorSync, RunCommand}; @@ -133,16 +134,16 @@ fn arg_in(arg: &str, set: &HashSet<&str>) -> bool /// Calculate the SHA-1 digest of each file in `files` on background threads /// in `pool`. -fn hash_all(files: Vec, pool: &CpuPool) -> SFuture> +#[async] +fn hash_all(files: Vec, pool: CpuPool) -> Result> { let start = Instant::now(); let count = files.len(); - let pool = pool.clone(); - Box::new(future::join_all(files.into_iter().map(move |f| Digest::file(f, &pool))) - .map(move |hashes| { - trace!("Hashed {} files in {}", count, fmt_duration_as_secs(&start.elapsed())); - hashes - })) + let result = await!(future::join_all(files.into_iter().map(move |f| { + Digest::file(f, &pool) + })))?; + trace!("Hashed {} files in {}", count, fmt_duration_as_secs(&start.elapsed())); + Ok(result) } /// Calculate SHA-1 digests for all source files listed in rustc's dep-info output. @@ -189,7 +190,7 @@ fn hash_source_files(creator: &T, files.len(), fmt_duration_as_secs(&start.elapsed())); // Just to make sure we capture temp_dir. 
drop(temp_dir); - hash_all(files, &pool) + hash_all(files, pool) })) })) } @@ -283,7 +284,7 @@ impl Rust { Ok(libs) }); Box::new(libs.and_then(move |libs| { - hash_all(libs, &pool).map(move |digests| { + hash_all(libs, pool).map(move |digests| { Rust { executable: executable, compiler_shlibs_digests: digests, @@ -503,10 +504,10 @@ impl CompilerHasher for RustHasher where T: CommandCreatorSync, { fn generate_hash_key(self: Box, - creator: &T, - cwd: &Path, - env_vars: &[(OsString, OsString)], - pool: &CpuPool) + creator: T, + cwd: PathBuf, + env_vars: Vec<(OsString, OsString)>, + pool: CpuPool) -> SFuture> { let me = *self; @@ -524,14 +525,20 @@ impl CompilerHasher for RustHasher .flat_map(|(arg, val)| Some(arg).into_iter().chain(val)) .map(|a| a.clone()) .collect::>(); - let source_hashes = hash_source_files(creator, &crate_name, &executable, &filtered_arguments, cwd, env_vars, pool); + let source_hashes = hash_source_files(&creator, + &crate_name, + &executable, + &filtered_arguments, + &cwd, + &env_vars, + &pool); // Hash the contents of the externs listed on the commandline. - let cwp = Path::new(cwd); + let cwp = Path::new(&cwd); trace!("[{}]: hashing {} externs", crate_name, externs.len()); let extern_hashes = hash_all(externs.iter() .map(|e| cwp.join(e).to_string_lossy().into_owned()) .collect(), - &pool); + pool.clone()); let creator = creator.clone(); let cwd = cwd.to_owned(); let env_vars = env_vars.to_vec(); diff --git a/src/errors.rs b/src/errors.rs index c457f8bf9..1a000057c 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -68,16 +68,6 @@ impl FutureChainErr for F } } -/// Like `try`, but returns an SFuture instead of a Result. -macro_rules! ftry { - ($e:expr) => { - match $e { - Ok(v) => v, - Err(e) => return Box::new(future::err(e.into())), - } - } -} - pub fn f_ok(t: T) -> SFuture where T: 'static, { diff --git a/src/main.rs b/src/main.rs index 890c63b97..bf740166c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(proc_macro, conservative_impl_trait, generators)] + extern crate app_dirs; extern crate bincode; extern crate byteorder; @@ -28,7 +30,7 @@ extern crate env_logger; extern crate error_chain; extern crate filetime; #[macro_use] -extern crate futures; +extern crate futures_await as futures; extern crate futures_cpupool; #[cfg(feature = "hyper")] extern crate hyper; diff --git a/src/mock_command.rs b/src/mock_command.rs index a131594c5..1b3265714 100644 --- a/src/mock_command.rs +++ b/src/mock_command.rs @@ -92,7 +92,7 @@ pub trait CommandChild { } /// A trait that provides a subset of the methods of `std::process::Command`. -pub trait RunCommand: fmt::Debug { +pub trait RunCommand: fmt::Debug + 'static { /// The type returned by `spawn`. 
type C: CommandChild + 'static; diff --git a/src/server.rs b/src/server.rs index 7412636e5..0e876f057 100644 --- a/src/server.rs +++ b/src/server.rs @@ -29,7 +29,7 @@ use filetime::FileTime; use futures::future; use futures::sync::mpsc; use futures::task::{self, Task}; -use futures::{Stream, Sink, Async, AsyncSink, Poll, StartSend, Future}; +use futures::prelude::*; use futures_cpupool::CpuPool; use mock_command::{ CommandCreatorSync, @@ -345,7 +345,7 @@ impl Service for SccacheService Request::Compile(compile) => { debug!("handle_client: compile"); self.stats.borrow_mut().compile_requests += 1; - return self.handle_compile(compile) + return self.clone().handle_compile(compile) } Request::GetStats => { debug!("handle_client: get_stats"); @@ -411,25 +411,23 @@ impl SccacheService /// This will handle a compile request entirely, generating a response with /// the inital information and an optional body which will eventually /// contain the results of the compilation. - fn handle_compile(&self, compile: Compile) - -> SFuture - { + #[async(boxed)] + fn handle_compile(self, compile: Compile) -> Result { let exe = compile.exe; let cmd = compile.args; let cwd = compile.cwd; let env_vars = compile.env_vars; - let me = self.clone(); - Box::new(self.compiler_info(exe.into()).map(move |info| { - me.check_compiler(info, cmd, cwd.into(), env_vars) - })) + let info = await!(self.clone().compiler_info(exe.into()))?; + Ok(self.check_compiler(info, cmd, cwd.into(), env_vars)) } /// Look up compiler info from the cache for the compiler `path`. /// If not cached, determine the compiler type and cache the result. - fn compiler_info(&self, path: PathBuf) - -> SFuture>>> { + #[async] + fn compiler_info(self, path: PathBuf) -> Result>>> { trace!("compiler_info"); - let mtime = ftry!(metadata(&path).map(|attr| FileTime::from_last_modification_time(&attr))); + let attr = metadata(&path)?; + let mtime = FileTime::from_last_modification_time(&attr); //TODO: properly handle rustup overrides. Currently this will // cache based on the rustup rustc path, ignoring overrides. // https://github.com/mozilla/sccache/issues/87 @@ -443,21 +441,20 @@ impl SccacheService match result { Some(info) => { trace!("compiler_info cache hit"); - f_ok(info) + Ok(info) } None => { trace!("compiler_info cache miss"); // Check the compiler type and return the result when // finished. This generally involves invoking the compiler, // so do it asynchronously. 
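// Note the variant used here and below: #[async(boxed)] wraps the
// generated future in a Box, which is what lets an async method keep a
// trait-object-friendly SFuture signature, the same trick the
// ProvideAwsCredentials impls in src/simples3/credential.rs use. A
// minimal sketch of the pattern (Lookup/MapLookup are hypothetical):
trait Lookup {
    fn find(self, key: String) -> SFuture<Option<String>>;
}
#[derive(Clone)]
struct MapLookup(HashMap<String, String>);
impl Lookup for MapLookup {
    #[async(boxed)]
    fn find(self, key: String) -> Result<Option<String>> {
        // The attribute rewrites the declared Result<T> return into
        // Box<Future<Item = T, Error = Error>>, i.e. an SFuture<T>.
        Ok(self.0.get(&key).cloned())
    }
}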
- let me = self.clone(); - let info = get_compiler_info(&self.creator, &path, &self.pool); - Box::new(info.then(move |info| { - let info = info.ok(); - me.compilers.borrow_mut().insert(path, info.clone().map(|i| (i, mtime))); - Ok(info) - })) + let compilers = self.compilers; + let info = await!(get_compiler_info(self.creator, + path.clone(), + self.pool)).ok(); + compilers.borrow_mut().insert(path, info.clone().map(|i| (i, mtime))); + Ok(info) } } } @@ -485,7 +482,14 @@ impl SccacheService debug!("parse_arguments: Ok"); stats.requests_executed += 1; let (tx, rx) = Body::pair(); - self.start_compile_task(hasher, cmd, cwd, env_vars, tx); + let task = self.clone().compile(hasher, + cmd, + cwd, + env_vars, + tx); + self.handle.spawn(task.map_err(|e| { + error!("error in spawned task: {}", e); + })); let res = CompileResponse::CompileStarted; return Message::WithBody(Response::Compile(res), rx) } @@ -509,12 +513,13 @@ impl SccacheService /// Given compiler arguments `arguments`, look up /// a compile result in the cache or execute the compilation and store /// the result in the cache. - fn start_compile_task(&self, - hasher: Box>, - arguments: Vec, - cwd: PathBuf, - env_vars: Vec<(OsString, OsString)>, - tx: mpsc::Sender>) { + #[async] + fn compile(self, + hasher: Box>, + arguments: Vec, + cwd: PathBuf, + env_vars: Vec<(OsString, OsString)>, + tx: mpsc::Sender>) -> Result<()> { let force_recache = env_vars.iter().any(|&(ref k, ref _v)| { k.as_os_str() == OsStr::new("SCCACHE_RECACHE") }); @@ -524,117 +529,113 @@ impl SccacheService CacheControl::Default }; let out_pretty = hasher.output_pretty().into_owned(); - let result = hasher.get_cached_or_compile(self.creator.clone(), - self.storage.clone(), - arguments, - cwd, - env_vars, - cache_control, - self.pool.clone(), - self.handle.clone()); - let me = self.clone(); - let task = result.then(move |result| { - let mut cache_write = None; - let mut stats = me.stats.borrow_mut(); - let mut res = CompileFinished::default(); - match result { - Ok((compiled, out)) => { - match compiled { - CompileResult::Error => { - stats.cache_errors += 1; - } - CompileResult::CacheHit(duration) => { - stats.cache_hits += 1; - stats.cache_read_hit_duration += duration; - }, - CompileResult::CacheMiss(miss_type, duration, future) => { - match miss_type { - MissType::Normal => {} - MissType::ForcedRecache => { - stats.forced_recaches += 1; - } - MissType::TimedOut => { - stats.cache_timeouts += 1; - } - MissType::CacheReadError => { - stats.cache_errors += 1; - } + let result = await!(hasher.get_cached_or_compile(self.creator.clone(), + self.storage.clone(), + arguments, + cwd, + env_vars, + cache_control, + self.pool.clone(), + self.handle.clone())); + let mut cache_write = None; + let mut res = CompileFinished::default(); + match result { + Ok((compiled, out)) => { + let mut stats = self.stats.borrow_mut(); + match compiled { + CompileResult::Error => { + stats.cache_errors += 1; + } + CompileResult::CacheHit(duration) => { + stats.cache_hits += 1; + stats.cache_read_hit_duration += duration; + }, + CompileResult::CacheMiss(miss_type, duration, future) => { + match miss_type { + MissType::Normal => {} + MissType::ForcedRecache => { + stats.forced_recaches += 1; + } + MissType::TimedOut => { + stats.cache_timeouts += 1; + } + MissType::CacheReadError => { + stats.cache_errors += 1; } - stats.cache_misses += 1; - stats.cache_read_miss_duration += duration; - cache_write = Some(future); - } - CompileResult::NotCacheable => { - stats.cache_misses += 1; - 
stats.non_cacheable_compilations += 1; - } - CompileResult::CompileFailed => { - stats.compile_fails += 1; } - }; - let Output { status, stdout, stderr } = out; - trace!("CompileFinished retcode: {}", status); - match status.code() { - Some(code) => res.retcode = Some(code), - None => res.signal = Some(get_signal(status)), - }; - res.stdout = stdout; - res.stderr = stderr; - } - Err(Error(ErrorKind::ProcessError(output), _)) => { - debug!("Compilation failed: {:?}", output); - stats.compile_fails += 1; - match output.status.code() { - Some(code) => res.retcode = Some(code), - None => res.signal = Some(get_signal(output.status)), - }; - res.stdout = output.stdout; - res.stderr = output.stderr; - } - Err(err) => { - use std::fmt::Write; - - error!("[{:?}] fatal error: {}", out_pretty, err); - - let mut error = format!("sccache: encountered fatal error\n"); - drop(writeln!(error, "sccache: error : {}", err)); - for e in err.iter() { - error!("[{:?}] \t{}", out_pretty, e); - drop(writeln!(error, "sccache: cause: {}", e)); + stats.cache_misses += 1; + stats.cache_read_miss_duration += duration; + cache_write = Some(future); } - stats.cache_errors += 1; - //TODO: figure out a better way to communicate this? - res.retcode = Some(-2); - res.stderr = error.into_bytes(); - } - }; - let send = tx.send(Ok(Response::CompileFinished(res))); - - let me = me.clone(); - let cache_write = cache_write.then(move |result| { - match result { - Err(e) => { - debug!("Error executing cache write: {}", e); - me.stats.borrow_mut().cache_write_errors += 1; + CompileResult::NotCacheable => { + stats.cache_misses += 1; + stats.non_cacheable_compilations += 1; } - //TODO: save cache stats! - Ok(Some(info)) => { - debug!("[{}]: Cache write finished in {}", - info.object_file_pretty, - fmt_duration_as_secs(&info.duration)); - me.stats.borrow_mut().cache_writes += 1; - me.stats.borrow_mut().cache_write_duration += info.duration; + CompileResult::CompileFailed => { + stats.compile_fails += 1; } + }; + let Output { status, stdout, stderr } = out; + trace!("CompileFinished retcode: {}", status); + match status.code() { + Some(code) => res.retcode = Some(code), + None => res.signal = Some(get_signal(status)), + }; + res.stdout = stdout; + res.stderr = stderr; + } + Err(Error(ErrorKind::ProcessError(output), _)) => { + let mut stats = self.stats.borrow_mut(); + debug!("Compilation failed: {:?}", output); + stats.compile_fails += 1; + match output.status.code() { + Some(code) => res.retcode = Some(code), + None => res.signal = Some(get_signal(output.status)), + }; + res.stdout = output.stdout; + res.stderr = output.stderr; + } + Err(err) => { + use std::fmt::Write; + + error!("[{:?}] fatal error: {}", out_pretty, err); - Ok(None) => {} + let mut error = format!("sccache: encountered fatal error\n"); + drop(writeln!(error, "sccache: error : {}", err)); + for e in err.iter() { + error!("[{:?}] \t{}", out_pretty, e); + drop(writeln!(error, "sccache: cause: {}", e)); } - Ok(()) - }); + self.stats.borrow_mut().cache_errors += 1; + //TODO: figure out a better way to communicate this? + res.retcode = Some(-2); + res.stderr = error.into_bytes(); + } + }; + let send = tx.send(Ok(Response::CompileFinished(res))); - send.join(cache_write).then(|_| Ok(())) + let cache_write = cache_write.then(move |result| { + match result { + Err(e) => { + debug!("Error executing cache write: {}", e); + self.stats.borrow_mut().cache_write_errors += 1; + } + //TODO: save cache stats! 
+ Ok(Some(info)) => { + debug!("[{}]: Cache write finished in {}", + info.object_file_pretty, + fmt_duration_as_secs(&info.duration)); + self.stats.borrow_mut().cache_writes += 1; + self.stats.borrow_mut().cache_write_duration += info.duration; + } + + Ok(None) => {} + } + Ok(()) }); - self.handle.spawn(task); + drop(await!(send.join(cache_write))); + Ok(()) } } diff --git a/src/simples3/credential.rs b/src/simples3/credential.rs index 89a96ea9b..44cf12998 100644 --- a/src/simples3/credential.rs +++ b/src/simples3/credential.rs @@ -3,13 +3,13 @@ #![allow(dead_code)] use chrono::{Duration, UTC, DateTime}; -use futures::{Future, Async, IntoFuture, Stream}; -use futures::future::{self, Shared}; +use futures::prelude::*; +use futures::future::Shared; use hyper::{self, Client, Method}; use hyper::client::{HttpConnector, Request}; use hyper::header::Connection; use regex::Regex; -use serde_json::{Value, from_str}; +use serde_json::Value; use std::ascii::AsciiExt; use std::cell::RefCell; use std::collections::HashMap; @@ -20,6 +20,7 @@ use std::fs; use std::io::BufReader; use std::io::prelude::*; use std::path::{Path, PathBuf}; +use std::str; use std::time::Duration as StdDuration; use tokio_core::reactor::{Handle, Timeout}; @@ -77,17 +78,19 @@ impl AwsCredentials { } /// A trait for types that produce `AwsCredentials`. -pub trait ProvideAwsCredentials { +pub trait ProvideAwsCredentials: Clone { /// Produce a new `AwsCredentials`. - fn credentials(&self) -> SFuture; + fn credentials(self) -> SFuture; } /// Provides AWS credentials from environment variables. +#[derive(Clone)] pub struct EnvironmentProvider; impl ProvideAwsCredentials for EnvironmentProvider { - fn credentials(&self) -> SFuture { - future::result(credentials_from_environment()).boxed() + #[async(boxed)] + fn credentials(self) -> Result { + credentials_from_environment() } } @@ -184,12 +187,10 @@ impl ProfileProvider { } impl ProvideAwsCredentials for ProfileProvider { - fn credentials(&self) -> SFuture { - let result = parse_credentials_file(self.file_path()); - let result = result.and_then(|mut profiles| { - profiles.remove(self.profile()).ok_or("profile not found".into()) - }); - future::result(result).boxed() + #[async(boxed)] + fn credentials(self) -> Result { + let mut result = parse_credentials_file(self.file_path())?; + result.remove(self.profile()).ok_or("profile not found".into()) } } @@ -271,6 +272,7 @@ fn parse_credentials_file(file_path: &Path) -> Result, handle: Handle, @@ -284,131 +286,118 @@ impl IamProvider { } } - fn iam_role(&self) -> SFuture { + #[async] + fn iam_role(self) -> Result { // First get the IAM role let address = "http://169.254.169.254/latest/meta-data/iam/security-credentials/"; let mut req = Request::new(Method::Get, address.parse().unwrap()); req.headers_mut().set(Connection::close()); - let response = self.client.request(req).and_then(|response| { - response.body().fold(Vec::new(), |mut body, chunk| { - body.extend_from_slice(&chunk); - Ok::<_, hyper::Error>(body) - }) - }); - - Box::new(response.then(|res| { - let bytes = res.chain_err(|| { - "couldn't connect to metadata service" - })?; - String::from_utf8(bytes).chain_err(|| { - "Didn't get a parsable response body from metadata service" - }) - }).map(move |body| { - let mut address = address.to_string(); - address.push_str(&body); - address - })) + let response = await!(self.client.request(req)).chain_err(|| { + "couldn't connect to metadata service" + })?; + + let bytes = await!(response.body().concat())?; + let body = 
str::from_utf8(&bytes).chain_err(|| { + "Didn't get a parsable response body from metadata service" + })?; + + let mut address = address.to_string(); + address.push_str(&body); + Ok(address) } -} -impl ProvideAwsCredentials for IamProvider { - fn credentials(&self) -> SFuture { + #[async] + fn fetch_credentials(self) -> Result { let url = match var("AWS_IAM_CREDENTIALS_URL") { - Ok(url) => f_ok(url), - Err(_) => self.iam_role(), + Ok(url) => url, + Err(_) => await!(self.clone().iam_role())?, + }; + let url = url.parse::().chain_err(|| { + format!("failed to parse `{}` as url", url) + })?; + + debug!("Attempting to fetch credentials from {}", url); + let mut req = Request::new(Method::Get, url.clone()); + req.headers_mut().set(Connection::close()); + let response = await!(self.client.request(req)).chain_err(|| { + format!("failed to send http request to: {}", url) + })?; + let bytes = await!(response.body().concat())?; + let body = str::from_utf8(&bytes).chain_err(|| { + "Didn't get a parsable response body from metadata service" + })?; + + let json_object: Value; + match body.parse() { + Err(_) => bail!("Couldn't parse metadata response body."), + Ok(val) => json_object = val, + }; + + let access_key; + match json_object.get("AccessKeyId") { + None => bail!("Couldn't find AccessKeyId in response."), + Some(val) => access_key = val.as_str().expect("AccessKeyId value was not a string").to_owned().replace("\"", "") + }; + + let secret_key; + match json_object.get("SecretAccessKey") { + None => bail!("Couldn't find SecretAccessKey in response."), + Some(val) => secret_key = val.as_str().expect("SecretAccessKey value was not a string").to_owned().replace("\"", "") + }; + + let expiration; + match json_object.get("Expiration") { + None => bail!("Couldn't find Expiration in response."), + Some(val) => expiration = val.as_str().expect("Expiration value was not a string").to_owned().replace("\"", "") }; - let url = url.and_then(|url| { - url.parse().chain_err(|| format!("failed to parse `{}` as url", url)) - }); - - let client = self.client.clone(); - let response = url.and_then(move |address| { - debug!("Attempting to fetch credentials from {}", address); - let mut req = Request::new(Method::Get, address); - req.headers_mut().set(Connection::close()); - client.request(req).chain_err(|| { - "failed to send http request" - }) - }); - let body = response.and_then(|response| { - response.body().fold(Vec::new(), |mut body, chunk| { - body.extend_from_slice(&chunk); - Ok::<_, hyper::Error>(body) - }).chain_err(|| { - "failed to read http body" - }) - }); - let body = body.map_err(|_e| { - "Didn't get a parseable response body from instance role details".into() - }).and_then(|body| { - String::from_utf8(body).chain_err(|| { - "failed to read iam role response" - }) - }); - - let creds = body.and_then(|body| { - let json_object: Value; - match from_str(&body) { - Err(_) => bail!("Couldn't parse metadata response body."), - Ok(val) => json_object = val - }; - - let access_key; - match json_object.get("AccessKeyId") { - None => bail!("Couldn't find AccessKeyId in response."), - Some(val) => access_key = val.as_str().expect("AccessKeyId value was not a string").to_owned().replace("\"", "") - }; - - let secret_key; - match json_object.get("SecretAccessKey") { - None => bail!("Couldn't find SecretAccessKey in response."), - Some(val) => secret_key = val.as_str().expect("SecretAccessKey value was not a string").to_owned().replace("\"", "") - }; - - let expiration; - match json_object.get("Expiration") { - None 
+    #[async]
+    fn fetch_credentials(self) -> Result<AwsCredentials> {
         let url = match var("AWS_IAM_CREDENTIALS_URL") {
-            Ok(url) => f_ok(url),
-            Err(_) => self.iam_role(),
+            Ok(url) => url,
+            Err(_) => await!(self.clone().iam_role())?,
+        };
+        let url = url.parse::<Uri>().chain_err(|| {
+            format!("failed to parse `{}` as url", url)
+        })?;
+
+        debug!("Attempting to fetch credentials from {}", url);
+        let mut req = Request::new(Method::Get, url.clone());
+        req.headers_mut().set(Connection::close());
+        let response = await!(self.client.request(req)).chain_err(|| {
+            format!("failed to send http request to: {}", url)
+        })?;
+        let bytes = await!(response.body().concat())?;
+        let body = str::from_utf8(&bytes).chain_err(|| {
+            "Didn't get a parsable response body from metadata service"
+        })?;
+
+        let json_object: Value;
+        match body.parse() {
+            Err(_) => bail!("Couldn't parse metadata response body."),
+            Ok(val) => json_object = val,
+        };
+
+        let access_key;
+        match json_object.get("AccessKeyId") {
+            None => bail!("Couldn't find AccessKeyId in response."),
+            Some(val) => access_key = val.as_str().expect("AccessKeyId value was not a string").to_owned().replace("\"", "")
+        };
+
+        let secret_key;
+        match json_object.get("SecretAccessKey") {
+            None => bail!("Couldn't find SecretAccessKey in response."),
+            Some(val) => secret_key = val.as_str().expect("SecretAccessKey value was not a string").to_owned().replace("\"", "")
+        };
+
+        let expiration;
+        match json_object.get("Expiration") {
+            None => bail!("Couldn't find Expiration in response."),
+            Some(val) => expiration = val.as_str().expect("Expiration value was not a string").to_owned().replace("\"", "")
         };
-        let url = url.and_then(|url| {
-            url.parse().chain_err(|| format!("failed to parse `{}` as url", url))
-        });
-
-        let client = self.client.clone();
-        let response = url.and_then(move |address| {
-            debug!("Attempting to fetch credentials from {}", address);
-            let mut req = Request::new(Method::Get, address);
-            req.headers_mut().set(Connection::close());
-            client.request(req).chain_err(|| {
-                "failed to send http request"
-            })
-        });
-        let body = response.and_then(|response| {
-            response.body().fold(Vec::new(), |mut body, chunk| {
-                body.extend_from_slice(&chunk);
-                Ok::<_, hyper::Error>(body)
-            }).chain_err(|| {
-                "failed to read http body"
-            })
-        });
-        let body = body.map_err(|_e| {
-            "Didn't get a parseable response body from instance role details".into()
-        }).and_then(|body| {
-            String::from_utf8(body).chain_err(|| {
-                "failed to read iam role response"
-            })
-        });
-
-        let creds = body.and_then(|body| {
-            let json_object: Value;
-            match from_str(&body) {
-                Err(_) => bail!("Couldn't parse metadata response body."),
-                Ok(val) => json_object = val
-            };
-
-            let access_key;
-            match json_object.get("AccessKeyId") {
-                None => bail!("Couldn't find AccessKeyId in response."),
-                Some(val) => access_key = val.as_str().expect("AccessKeyId value was not a string").to_owned().replace("\"", "")
-            };
-
-            let secret_key;
-            match json_object.get("SecretAccessKey") {
-                None => bail!("Couldn't find SecretAccessKey in response."),
-                Some(val) => secret_key = val.as_str().expect("SecretAccessKey value was not a string").to_owned().replace("\"", "")
-            };
-
-            let expiration;
-            match json_object.get("Expiration") {
-                None => bail!("Couldn't find Expiration in response."),
-                Some(val) => expiration = val.as_str().expect("Expiration value was not a string").to_owned().replace("\"", "")
-            };
-
-            let expiration_time = expiration.parse().chain_err(|| {
-                "failed to parse expiration time"
-            })?;
-
-            let token_from_response;
-            match json_object.get("Token") {
-                None => bail!("Couldn't find Token in response."),
-                Some(val) => token_from_response = val.as_str().expect("Token value was not a string").to_owned().replace("\"", "")
-            };
-
-            Ok(AwsCredentials::new(access_key, secret_key, Some(token_from_response), expiration_time))
-        });
+
+        let expiration_time = expiration.parse().chain_err(|| {
+            "failed to parse expiration time"
+        })?;
+
+        let token_from_response;
+        match json_object.get("Token") {
+            None => bail!("Couldn't find Token in response."),
+            Some(val) => token_from_response = val.as_str().expect("Token value was not a string").to_owned().replace("\"", "")
+        };
+
+        Ok(AwsCredentials::new(access_key,
+                               secret_key,
+                               Some(token_from_response),
+                               expiration_time))
+    }
+}
+
+impl ProvideAwsCredentials for IamProvider {
+    #[async(boxed)]
+    fn credentials(self) -> Result<AwsCredentials> {
+        let creds = self.clone().fetch_credentials();
 
         //XXX: this is crappy, but this blocks on non-EC2 machines like
         // our mac builders.
-        let timeout = Timeout::new(StdDuration::from_secs(2), &self.handle);
-        let timeout = timeout.into_future().flatten().map_err(|_e| {
-            "timeout failed".into()
-        });
-
-        Box::new(creds.map(Ok).select(timeout.map(Err)).then(|result| {
-            match result {
-                Ok((Ok(creds), _timeout)) => Ok(creds),
-                Ok((Err(_), _creds)) => {
-                    bail!("took too long to fetch credentials")
-                }
-                Err((e, _)) => {
-                    warn!("Failed to fetch IAM credentials: {}", e);
-                    Err(e)
-                }
+        let timeout = StdDuration::from_secs(2);
+        let timeout = Timeout::new(timeout, &self.handle).chain_err(|| {
+            "failed to create timeout"
+        })?;
+        let timeout = timeout.then(|e| e.chain_err(|| "timeout error"));
+
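+        // Race the fetch against the timeout: `select` resolves with
+        // whichever future finishes first, and the `Ok`/`Err` wrappers let
+        // the match below tell a real result from a timeout.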
+        match await!(creds.map(Ok).select(timeout.map(Err))) {
+            Ok((Ok(creds), _timeout)) => Ok(creds),
+            Ok((Err(_), _creds)) => {
+                bail!("took too long to fetch credentials")
             }
-        }))
+            Err((e, _)) => {
+                warn!("Failed to fetch IAM credentials: {}", e);
+                Err(e).chain_err(|| "failed to fetch iam credentials")
+            }
+        }
     }
 }
 
 /// Wrapper for ProvideAwsCredentials that caches the credentials returned by the
 /// wrapped provider.  Each time the credentials are accessed, they are checked to see if
 /// they have expired, in which case they are retrieved from the wrapped provider again.
+#[derive(Clone)]
 pub struct AutoRefreshingProvider<P: ProvideAwsCredentials> {
     credentials_provider: P,
     cached_credentials: RefCell<Shared<SFuture<AwsCredentials>>>,
@@ -417,14 +406,14 @@ pub struct AutoRefreshingProvider<P: ProvideAwsCredentials> {
 impl<P: ProvideAwsCredentials> AutoRefreshingProvider<P> {
     pub fn new(provider: P) -> AutoRefreshingProvider<P> {
         AutoRefreshingProvider {
+            credentials_provider: provider.clone(),
             cached_credentials: RefCell::new(provider.credentials().shared()),
-            credentials_provider: provider,
         }
     }
 }
 
-impl<P: ProvideAwsCredentials> ProvideAwsCredentials for AutoRefreshingProvider<P> {
-    fn credentials(&self) -> SFuture<AwsCredentials> {
+impl<P: ProvideAwsCredentials + Clone> ProvideAwsCredentials for AutoRefreshingProvider<P> {
+    fn credentials(self) -> SFuture<AwsCredentials> {
         let mut future = self.cached_credentials.borrow_mut();
         if let Ok(Async::Ready(creds)) = future.poll() {
             if creds.credentials_are_expired() {
@@ -456,25 +445,26 @@ pub struct ChainProvider {
 }
 
 impl ProvideAwsCredentials for ChainProvider {
-    fn credentials(&self) -> SFuture<AwsCredentials> {
-        let creds = EnvironmentProvider.credentials().map(|c| {
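+    // `#[async(boxed)]` works like `#[async]` but boxes the generated
+    // future, so this trait method still returns the sized `SFuture`
+    // object the trait promises.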
+    #[async(boxed)]
+    fn credentials(self) -> Result<AwsCredentials> {
+        if let Ok(c) = await!(EnvironmentProvider.credentials()) {
             debug!("Using AWS credentials from environment");
-            c
-        });
-        let mut creds = Box::new(creds) as SFuture<_>;
-        for provider in self.profile_providers.iter() {
-            let alternate = provider.credentials();
-            creds = Box::new(creds.or_else(|_| alternate));
+            return Ok(c)
         }
-        let handle = self.handle.clone();
-        Box::new(creds.or_else(move |_| {
-            IamProvider::new(&handle).credentials().map(|c| {
-                debug!("Using AWS credentials from IAM");
-                c
-            })
-        }).map_err(|_| {
-            "Couldn't find AWS credentials in environment, credentials file, or IAM role.".into()
-        }))
+
+        for provider in self.profile_providers.clone() {
+            if let Ok(c) = await!(provider.credentials()) {
+                return Ok(c)
+            }
+        }
+
+        if let Ok(c) = await!(IamProvider::new(&self.handle).credentials()) {
+            debug!("Using AWS credentials from IAM");
+            return Ok(c)
+        }
+
+        Err("Couldn't find AWS credentials in environment, credentials \
+             file, or IAM role.".into())
     }
 }
diff --git a/src/simples3/s3.rs b/src/simples3/s3.rs
index f120b2e15..ff2f35738 100644
--- a/src/simples3/s3.rs
+++ b/src/simples3/s3.rs
@@ -3,13 +3,14 @@
 use std::ascii::AsciiExt;
 use std::fmt;
+use std::rc::Rc;
 
 use crypto::digest::Digest;
 use crypto::hmac::Hmac;
 use crypto::mac::Mac;
 use crypto::sha1::Sha1;
-use futures::{Future, Stream};
-use hyper::{self, header};
+use futures::prelude::*;
+use hyper::header;
 use hyper::Method;
 use hyper::client::{Client, Request};
 use hyper_tls::HttpsConnector;
@@ -50,7 +51,12 @@ fn signature(string_to_sign: &str, signing_key: &str) -> String {
 }
 
 /// An S3 bucket.
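+///
+/// Cloning a `Bucket` is cheap: its name, base URL, and hyper client live
+/// behind a shared `Rc`, so the `#[async]` methods below can take `self`
+/// by value.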
+#[derive(Clone)]
 pub struct Bucket {
+    inner: Rc<Inner>,
+}
+
+struct Inner {
     name: String,
     base_url: String,
     client: Client<HttpsConnector>,
 }
@@ -58,7 +64,9 @@ pub struct Bucket {
 
 impl fmt::Display for Bucket {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "Bucket(name={}, base_url={})", self.name, self.base_url)
+        write!(f, "Bucket(name={}, base_url={})",
+               self.inner.name,
+               self.inner.base_url)
     }
 }
 
@@ -66,50 +74,49 @@ impl Bucket {
     pub fn new(name: &str, endpoint: &str, ssl: Ssl, handle: &Handle) -> Bucket {
         let base_url = base_url(&endpoint, ssl);
         Bucket {
-            name: name.to_owned(),
-            base_url: base_url,
-            client: Client::configure()
-                        .connector(HttpsConnector::new(1, handle))
-                        .build(handle),
+            inner: Rc::new(Inner {
+                name: name.to_owned(),
+                base_url: base_url,
+                client: Client::configure()
+                            .connector(HttpsConnector::new(1, handle))
+                            .build(handle),
+            }),
         }
     }
 
-    pub fn get(&self, key: &str) -> SFuture<Vec<u8>> {
-        let url = format!("{}{}", self.base_url, key);
+    // TODO: this should fail to compile due to borrowed args
+    #[async]
+    pub fn get(self, key: String) -> Result<Vec<u8>> {
+        let url = format!("{}{}", self.inner.base_url, key);
         debug!("GET {}", url);
         let url2 = url.clone();
-        Box::new(self.client.get(url.parse().unwrap()).chain_err(move || {
-            format!("failed GET: {}", url)
-        }).and_then(|res| {
-            if res.status().class() == hyper::status::StatusClass::Success {
-                let content_length = res.headers().get::<header::ContentLength>()
-                    .map(|&header::ContentLength(len)| len);
-                Ok((res.body(), content_length))
+        let res = await!(self.inner.client.get(url.parse().unwrap()))
+            .chain_err(move || format!("failed GET: {}", url))?;
+        if !res.status().is_success() {
+            return Err(ErrorKind::BadHTTPStatus(res.status().clone()).into())
+        }
+        let content_length = res.headers().get::<header::ContentLength>()
+            .map(|&header::ContentLength(len)| len);
+        let mut bytes = Vec::new();
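+        // An `#[async]` for-loop awaits each chunk of the body `Stream` as
+        // it arrives, replacing the old `fold` over `res.body()`.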
+        #[async]
+        for chunk in res.body() {
+            bytes.extend_from_slice(&chunk);
+        }
+        if let Some(len) = content_length {
+            if len != bytes.len() as u64 {
+                bail!(format!("Bad HTTP body size read: {}, expected {}", bytes.len(), len));
             } else {
-                Err(ErrorKind::BadHTTPStatus(res.status().clone()).into())
+                info!("Read {} bytes from {}", bytes.len(), url2);
             }
-        }).and_then(|(body, content_length)| {
-            body.fold(Vec::new(), |mut body, chunk| {
-                body.extend_from_slice(&chunk);
-                Ok::<_, hyper::Error>(body)
-            }).chain_err(|| {
-                "failed to read HTTP body"
-            }).and_then(move |bytes| {
-                if let Some(len) = content_length {
-                    if len != bytes.len() as u64 {
-                        bail!(format!("Bad HTTP body size read: {}, expected {}", bytes.len(), len));
-                    } else {
-                        info!("Read {} bytes from {}", bytes.len(), url2);
-                    }
-                }
-                Ok(bytes)
-            })
-        }))
+        }
+        Ok(bytes)
     }
 
-    pub fn put(&self, key: &str, content: Vec<u8>, creds: &AwsCredentials)
-               -> SFuture<()> {
-        let url = format!("{}{}", self.base_url, key);
+    // TODO: this should fail to compile due to borrowed args
+    #[async]
+    pub fn put(self, key: String, content: Vec<u8>, creds: AwsCredentials)
+               -> Result<()> {
+        let url = format!("{}{}", self.inner.base_url, key);
         debug!("PUT {}", url);
 
         let mut request = Request::new(Method::Put, url.parse().unwrap());
@@ -128,7 +135,7 @@ impl Bucket {
                 canonical_headers.push_str(format!("{}:{}\n", header.to_ascii_lowercase(), value).as_ref());
             }
         }
-        let auth = self.auth("PUT", &date, key, "", &canonical_headers, content_type, creds);
+        let auth = self.auth("PUT", &date, &key, "", &canonical_headers, content_type, &creds);
         request.headers_mut().set_raw("Date", vec!(date.into_bytes()));
         request.headers_mut().set(header::ContentType(content_type.parse().unwrap()));
         request.headers_mut().set(header::ContentLength(content.len() as u64));
@@ -139,23 +146,21 @@ impl Bucket {
         request.headers_mut().set_raw("Authorization", vec!(auth.into_bytes()));
         request.set_body(content);
 
-        Box::new(self.client.request(request).then(|result| {
-            match result {
-                Ok(res) => {
-                    if res.status().class() == hyper::status::StatusClass::Success {
-                        trace!("PUT succeeded");
-                        Ok(())
-                    } else {
-                        trace!("PUT failed with HTTP status: {}", res.status());
-                        Err(ErrorKind::BadHTTPStatus(res.status().clone()).into())
-                    }
-                }
-                Err(e) => {
-                    trace!("PUT failed with error: {:?}", e);
-                    Err(e.into())
+        match await!(self.inner.client.request(request)) {
+            Ok(res) => {
+                if res.status().is_success() {
+                    trace!("PUT succeeded");
+                    Ok(())
+                } else {
+                    trace!("PUT failed with HTTP status: {}", res.status());
+                    Err(ErrorKind::BadHTTPStatus(res.status().clone()).into())
                 }
             }
-        }))
+            Err(e) => {
+                trace!("PUT failed with error: {:?}", e);
+                Err(e.into())
+            }
+        }
     }
 
     // http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html
@@ -167,7 +172,7 @@ impl Bucket {
                        ty = content_type,
                        date = date,
                        headers = headers,
-                       resource = format!("/{}/{}", self.name, path));
+                       resource = format!("/{}/{}", self.inner.name, path));
         let signature = signature(&string, creds.aws_secret_access_key());
         format!("AWS {}:{}", creds.aws_access_key_id(), signature)
     }
diff --git a/src/util.rs b/src/util.rs
index d19e830d1..284d5afb5 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -12,8 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use futures::Future;
-use futures::future;
+use futures::prelude::*;
 use futures_cpupool::CpuPool;
 use mock_command::{CommandChild, RunCommand};
 use ring::digest::{SHA512, Context};
@@ -42,8 +41,12 @@ impl Digest {
     pub fn file<T>(path: T, pool: &CpuPool) -> SFuture<String>
         where T: Into<PathBuf>
     {
-        let path = path.into();
-        Box::new(pool.spawn_fn(move || -> Result<_> {
+        Digest::_file(path.into(), pool.clone())
+    }
+
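+    // `#[async]` functions take their arguments by value (hence the owned
+    // `PathBuf` and `CpuPool`), and `#[async(boxed)]` boxes the returned
+    // future so `file` above can still advertise an `SFuture`.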
+    #[async(boxed)]
+    fn _file(path: PathBuf, pool: CpuPool) -> Result<String> {
+        await!(pool.spawn_fn(move || -> Result<_> {
             let f = File::open(&path).chain_err(|| format!("Failed to open file for hashing: {:?}", path))?;
             let mut m = Digest::new();
             let mut reader = BufReader::new(f);
@@ -94,8 +97,9 @@ pub fn fmt_duration_as_secs(duration: &Duration) -> String
 ///
 /// This was lifted from `std::process::Child::wait_with_output` and modified
 /// to also write to stdin.
+#[async]
 fn wait_with_input_output<T>(mut child: T, input: Option<Vec<u8>>)
-                             -> SFuture<process::Output>
+                             -> Result<process::Output>
     where T: CommandChild + 'static,
 {
     use tokio_io::io::{write_all, read_to_end};
@@ -115,23 +119,23 @@ fn wait_with_input_output<T>(mut child: T, input: Option<Vec<u8>>)
         child.wait().chain_err(|| "failed to wait for child")
     });
 
-    Box::new(status.join3(stdout, stderr).map(|(status, out, err)| {
-        let stdout = out.map(|p| p.1);
-        let stderr = err.map(|p| p.1);
-        process::Output {
-            status: status,
-            stdout: stdout.unwrap_or_default(),
-            stderr: stderr.unwrap_or_default(),
-        }
-    }))
+    let (status, out, err) = await!(status.join3(stdout, stderr))?;
+    let stdout = out.map(|p| p.1);
+    let stderr = err.map(|p| p.1);
+    Ok(process::Output {
+        status: status,
+        stdout: stdout.unwrap_or_default(),
+        stderr: stderr.unwrap_or_default(),
+    })
 }
 
 /// Run `command`, writing `input` to its stdin if it is `Some` and return the exit status and output.
 ///
 /// If the command returns a non-successful exit status, an error of `ErrorKind::ProcessError`
 /// will be returned containing the process output.
+#[async]
 pub fn run_input_output<C>(mut command: C, input: Option<Vec<u8>>)
-                           -> SFuture<process::Output>
+                           -> Result<process::Output>
     where C: RunCommand
 {
     let child = command
@@ -140,18 +144,14 @@ pub fn run_input_output<C>(mut command: C, input: Option<Vec<u8>>)
         .stdout(Stdio::piped())
         .stderr(Stdio::piped())
         .spawn()
-        .chain_err(|| "failed to spawn child");
-
-    Box::new(future::result(child)
-        .and_then(|child| {
-            wait_with_input_output(child, input).and_then(|output| {
-                if output.status.success() {
-                    f_ok(output)
-                } else {
-                    f_err(ErrorKind::ProcessError(output))
-                }
-            })
-        }))
+        .chain_err(|| "failed to spawn child")?;
+
+    let output = await!(wait_with_input_output(child, input))?;
+    if output.status.success() {
+        Ok(output)
+    } else {
+        Err(ErrorKind::ProcessError(output).into())
+    }
 }
 
 pub trait OsStrExt {