From 061604275dd83c19e53b11547d5bee9f0ac2cf4b Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Fri, 17 Jun 2022 10:10:24 -0700 Subject: [PATCH] Remove synchronous remote cache lookup from remote execution (#15854) To prepare for #11331, it's necessary to clarify the building of the "stacks" of command runners, in order to progressively remove caches for each attempt to backtrack. Both for performance reasons (to allow an attempt to execute a process to proceed concurrently with checking the cache), and to allow us to more easily conditionally remove remote caching from remote execution, this change removes the synchronous cache lookup from the `remote::CommandRunner` by wrapping in `remote_cache::CommandRunner`. Additionally, refactors the `CommandRunner` stacking, and produces stacks per backtrack attempt for #11331. --- .../engine/process_execution/src/bounded.rs | 9 + .../engine/process_execution/src/cache.rs | 17 +- src/rust/engine/process_execution/src/lib.rs | 4 +- .../engine/process_execution/src/local.rs | 8 + .../process_execution/src/nailgun/mod.rs | 9 + .../engine/process_execution/src/remote.rs | 180 +--------- .../process_execution/src/remote_cache.rs | 143 +++++++- .../src/remote_cache_tests.rs | 3 +- .../process_execution/src/remote_tests.rs | 335 +++++------------- src/rust/engine/process_executor/src/main.rs | 34 +- src/rust/engine/src/context.rs | 276 ++++++++++----- src/rust/engine/src/nodes.rs | 2 +- 12 files changed, 483 insertions(+), 537 deletions(-) diff --git a/src/rust/engine/process_execution/src/bounded.rs b/src/rust/engine/process_execution/src/bounded.rs index ef408595542..77bd8f4ae10 100644 --- a/src/rust/engine/process_execution/src/bounded.rs +++ b/src/rust/engine/process_execution/src/bounded.rs @@ -1,6 +1,7 @@ use std::borrow::Cow; use std::cmp::{max, min, Ordering, Reverse}; use std::collections::VecDeque; +use std::fmt::{self, Debug}; use std::future::Future; use std::sync::{atomic, Arc}; use std::time::{Duration, Instant}; @@ -53,6 +54,14 @@ impl CommandRunner { } } +impl Debug for CommandRunner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("bounded::CommandRunner") + .field("inner", &self.inner) + .finish_non_exhaustive() + } +} + #[async_trait] impl crate::CommandRunner for CommandRunner { async fn run( diff --git a/src/rust/engine/process_execution/src/cache.rs b/src/rust/engine/process_execution/src/cache.rs index 8468e3fa07a..8afebdbadda 100644 --- a/src/rust/engine/process_execution/src/cache.rs +++ b/src/rust/engine/process_execution/src/cache.rs @@ -1,3 +1,4 @@ +use std::fmt::{self, Debug}; use std::sync::Arc; use std::time::Instant; @@ -29,7 +30,7 @@ struct PlatformAndResponseBytes { #[derive(Clone)] pub struct CommandRunner { - underlying: Arc, + inner: Arc, cache: PersistentCache, file_store: Store, metadata: ProcessMetadata, @@ -37,13 +38,13 @@ pub struct CommandRunner { impl CommandRunner { pub fn new( - underlying: Arc, + inner: Arc, cache: PersistentCache, file_store: Store, metadata: ProcessMetadata, ) -> CommandRunner { CommandRunner { - underlying, + inner, cache, file_store, metadata, @@ -51,6 +52,14 @@ impl CommandRunner { } } +impl Debug for CommandRunner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("cache::CommandRunner") + .field("inner", &self.inner) + .finish_non_exhaustive() + } +} + #[async_trait] impl crate::CommandRunner for CommandRunner { async fn run( @@ -125,7 +134,7 @@ impl crate::CommandRunner for CommandRunner { return Ok(result); } - let result = self.underlying.run(context.clone(), workunit, req).await?; + let result = self.inner.run(context.clone(), workunit, req).await?; if result.exit_code == 0 || write_failures_to_cache { let result = result.clone(); in_workunit!("local_cache_write", Level::Trace, |workunit| async move { diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index d3ebbb13a20..df9c74fd182 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -29,7 +29,7 @@ extern crate derivative; use std::collections::{BTreeMap, BTreeSet}; use std::convert::{TryFrom, TryInto}; -use std::fmt::{self, Display}; +use std::fmt::{self, Debug, Display}; use std::path::PathBuf; use async_trait::async_trait; @@ -783,7 +783,7 @@ impl Context { } #[async_trait] -pub trait CommandRunner: Send + Sync { +pub trait CommandRunner: Send + Sync + Debug { /// /// Submit a request for execution on the underlying runtime, and return /// a future for it. diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs index 5fcf45a2c0c..9f9fea2e219 100644 --- a/src/rust/engine/process_execution/src/local.rs +++ b/src/rust/engine/process_execution/src/local.rs @@ -1,5 +1,6 @@ use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::ffi::OsStr; +use std::fmt::{self, Debug}; use std::fs::create_dir_all; use std::io::Write; use std::ops::Neg; @@ -137,6 +138,13 @@ impl CommandRunner { } } +impl Debug for CommandRunner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("local::CommandRunner") + .finish_non_exhaustive() + } +} + pub struct HermeticCommand { inner: Command, } diff --git a/src/rust/engine/process_execution/src/nailgun/mod.rs b/src/rust/engine/process_execution/src/nailgun/mod.rs index c938a429659..a69cdf0c67a 100644 --- a/src/rust/engine/process_execution/src/nailgun/mod.rs +++ b/src/rust/engine/process_execution/src/nailgun/mod.rs @@ -1,4 +1,5 @@ use std::collections::{BTreeMap, BTreeSet}; +use std::fmt::{self, Debug}; use std::net::SocketAddr; use std::path::{Path, PathBuf}; @@ -109,6 +110,14 @@ impl CommandRunner { } } +impl Debug for CommandRunner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("nailgun::CommandRunner") + .field("inner", &self.inner) + .finish_non_exhaustive() + } +} + #[async_trait] impl super::CommandRunner for CommandRunner { async fn run( diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index 0b25b4e3a53..b537bc67386 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -1,11 +1,11 @@ use std::cmp::Ordering; use std::collections::{BTreeMap, HashMap}; use std::convert::TryInto; +use std::fmt::{self, Debug}; use std::io::Cursor; use std::path::PathBuf; use std::sync::Arc; -use std::time::SystemTime; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime}; use async_oncecell::OnceCell; use async_trait::async_trait; @@ -17,7 +17,6 @@ use futures::FutureExt; use futures::{Stream, StreamExt}; use grpc_util::headers_to_http_header_map; use grpc_util::prost::MessageExt; -use grpc_util::retry::{retry_call, status_is_retryable}; use grpc_util::{layered_service, status_to_str, LayeredService}; use hashing::{Digest, Fingerprint}; use log::{debug, trace, warn, Level}; @@ -28,9 +27,9 @@ use protos::gen::google::rpc::{PreconditionFailure, Status as StatusProto}; use protos::require_digest; use rand::{thread_rng, Rng}; use remexec::{ - action_cache_client::ActionCacheClient, capabilities_client::CapabilitiesClient, - execution_client::ExecutionClient, Action, Command, ExecuteRequest, ExecuteResponse, - ExecutedActionMetadata, ServerCapabilities, WaitExecutionRequest, + capabilities_client::CapabilitiesClient, execution_client::ExecutionClient, Action, Command, + ExecuteRequest, ExecuteResponse, ExecutedActionMetadata, ServerCapabilities, + WaitExecutionRequest, }; use store::{Snapshot, SnapshotOps, Store, StoreError, StoreFileByDigest}; use tonic::metadata::BinaryMetadataValue; @@ -102,12 +101,10 @@ pub struct CommandRunner { platform: Platform, store: Store, execution_client: Arc>, - action_cache_client: Arc>, overall_deadline: Duration, retry_interval_duration: Duration, capabilities_cell: Arc>, capabilities_client: Arc>, - cache_read_timeout: Duration, } enum StreamOutcome { @@ -119,7 +116,6 @@ impl CommandRunner { /// Construct a new CommandRunner pub fn new( execution_address: &str, - store_address: &str, metadata: ProcessMetadata, root_ca_certs: Option>, headers: BTreeMap, @@ -128,20 +124,17 @@ impl CommandRunner { overall_deadline: Duration, retry_interval_duration: Duration, execution_concurrency_limit: usize, - cache_concurrency_limit: usize, - cache_read_timeout: Duration, capabilities_cell_opt: Option>>, ) -> Result { let execution_use_tls = execution_address.starts_with("https://"); - let store_use_tls = store_address.starts_with("https://"); - let tls_client_config = if execution_use_tls || store_use_tls { + let tls_client_config = if execution_use_tls { Some(grpc_util::tls::Config::new_without_mtls(root_ca_certs).try_into()?) } else { None }; - let mut execution_headers = headers.clone(); + let mut execution_headers = headers; let execution_endpoint = grpc_util::create_endpoint( execution_address, tls_client_config.as_ref().filter(|_| execution_use_tls), @@ -155,34 +148,17 @@ impl CommandRunner { ); let execution_client = Arc::new(ExecutionClient::new(execution_channel.clone())); - let mut store_headers = headers; - let store_endpoint = grpc_util::create_endpoint( - store_address, - tls_client_config.as_ref().filter(|_| execution_use_tls), - &mut store_headers, - )?; - let store_http_headers = headers_to_http_header_map(&store_headers)?; - let store_channel = layered_service( - tonic::transport::Channel::balance_list(vec![store_endpoint].into_iter()), - cache_concurrency_limit, - store_http_headers, - ); - - let action_cache_client = Arc::new(ActionCacheClient::new(store_channel)); - let capabilities_client = Arc::new(CapabilitiesClient::new(execution_channel)); let command_runner = CommandRunner { metadata, execution_client, - action_cache_client, store, platform, overall_deadline, retry_interval_duration, capabilities_cell: capabilities_cell_opt.unwrap_or_else(|| Arc::new(OnceCell::new())), capabilities_client, - cache_read_timeout, }; Ok(command_runner) @@ -745,6 +721,13 @@ impl CommandRunner { } } +impl Debug for CommandRunner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("remote::CommandRunner") + .finish_non_exhaustive() + } +} + #[async_trait] impl crate::CommandRunner for CommandRunner { /// Run the given Process via the Remote Execution API. @@ -776,29 +759,6 @@ impl crate::CommandRunner for CommandRunner { let (command_digest, action_digest) = ensure_action_stored_locally(&self.store, &command, &action).await?; - // Check the remote Action Cache to see if this request was already computed. - // If so, return immediately with the result. - let context2 = context.clone(); - let cached_response_opt = check_action_cache( - action_digest, - &request.description, - &self.metadata, - self.platform, - &context2, - self.action_cache_client.clone(), - self.store.clone(), - false, - self.cache_read_timeout, - ) - .await?; - debug!( - "action cache response: build_id={}; digest={:?}: {:?}", - &build_id, action_digest, cached_response_opt - ); - if let Some(cached_response) = cached_response_opt { - return Ok(cached_response); - } - // Upload the action (and related data, i.e. the embedded command and input files). ensure_action_uploaded( &self.store, @@ -1053,7 +1013,7 @@ pub fn make_execute_request( Ok((action, command, execute_request)) } -pub async fn populate_fallible_execution_result_for_timeout( +async fn populate_fallible_execution_result_for_timeout( store: &Store, context: &Context, description: &str, @@ -1090,7 +1050,7 @@ pub async fn populate_fallible_execution_result_for_timeout( /// of the ActionResult/ExecuteResponse stored in the local cache. When /// `treat_tree_digest_as_final_directory_hack` is true, then that final merged directory /// will be extracted from the tree_digest of the single output directory. -pub async fn populate_fallible_execution_result( +pub(crate) async fn populate_fallible_execution_result( store: Store, run_id: RunId, action_result: &remexec::ActionResult, @@ -1311,7 +1271,7 @@ pub fn extract_output_files( } /// Apply REAPI request metadata header to a `tonic::Request`. -fn apply_headers(mut request: Request, build_id: &str) -> Request { +pub(crate) fn apply_headers(mut request: Request, build_id: &str) -> Request { let reapi_request_metadata = remexec::RequestMetadata { tool_details: Some(remexec::ToolDetails { tool_name: "pants".into(), @@ -1330,112 +1290,6 @@ fn apply_headers(mut request: Request, build_id: &str) -> Request { request } -/// Check the remote Action Cache for a cached result of running the given `command` and the Action -/// with the given `action_digest`. -/// -/// This check is necessary because some REAPI servers do not short-circuit the Execute method -/// by checking the Action Cache (e.g., BuildBarn). Thus, this client must check the cache -/// explicitly in order to avoid duplicating already-cached work. This behavior matches -/// the Bazel RE client. -pub async fn check_action_cache( - action_digest: Digest, - command_description: &str, - metadata: &ProcessMetadata, - platform: Platform, - context: &Context, - action_cache_client: Arc>, - store: Store, - eager_fetch: bool, - timeout_duration: Duration, -) -> Result, ProcessError> { - in_workunit!( - "check_action_cache", - Level::Debug, - desc = Some(format!("Remote cache lookup for: {}", command_description)), - |workunit| async move { - workunit.increment_counter(Metric::RemoteCacheRequests, 1); - - let client = action_cache_client.as_ref().clone(); - let response = retry_call( - client, - move |mut client| { - let request = remexec::GetActionResultRequest { - action_digest: Some(action_digest.into()), - instance_name: metadata.instance_name.as_ref().cloned().unwrap_or_default(), - ..remexec::GetActionResultRequest::default() - }; - let request = apply_headers(Request::new(request), &context.build_id); - async move { - let lookup_fut = client.get_action_result(request); - let timeout_fut = tokio::time::timeout(timeout_duration, lookup_fut); - timeout_fut - .await - .unwrap_or_else(|_| Err(Status::unavailable("Pants client timeout"))) - } - }, - status_is_retryable, - ) - .and_then(|action_result| async move { - let action_result = action_result.into_inner(); - let response = populate_fallible_execution_result( - store.clone(), - context.run_id, - &action_result, - platform, - false, - ProcessResultSource::HitRemotely, - ) - .await - .map_err(|e| Status::unavailable(format!("Output roots could not be loaded: {e}")))?; - - if eager_fetch { - // NB: `ensure_local_has_file` and `ensure_local_has_recursive_directory` are internally - // retried. - let response = response.clone(); - in_workunit!( - "eager_fetch_action_cache", - Level::Trace, - desc = Some("eagerly fetching after action cache hit".to_owned()), - |_workunit| async move { - future::try_join_all(vec![ - store.ensure_local_has_file(response.stdout_digest).boxed(), - store.ensure_local_has_file(response.stderr_digest).boxed(), - store - .ensure_local_has_recursive_directory(response.output_directory) - .boxed(), - ]) - .await - } - ) - .await - .map_err(|e| Status::unavailable(format!("Output content could not be loaded: {e}")))?; - } - Ok(response) - }) - .await; - - match response { - Ok(response) => { - workunit.increment_counter(Metric::RemoteCacheRequestsCached, 1); - Ok(Some(response)) - } - Err(status) => match status.code() { - Code::NotFound => { - workunit.increment_counter(Metric::RemoteCacheRequestsUncached, 1); - Ok(None) - } - _ => { - workunit.increment_counter(Metric::RemoteCacheReadErrors, 1); - // TODO: Ensure that we're catching missing digests. - Err(status_to_str(status).into()) - } - }, - } - } - ) - .await -} - pub async fn store_proto_locally( store: &Store, proto: &P, diff --git a/src/rust/engine/process_execution/src/remote_cache.rs b/src/rust/engine/process_execution/src/remote_cache.rs index 3e86a28b19a..8782f1520aa 100644 --- a/src/rust/engine/process_execution/src/remote_cache.rs +++ b/src/rust/engine/process_execution/src/remote_cache.rs @@ -1,16 +1,15 @@ use std::collections::{BTreeMap, HashSet}; use std::convert::TryInto; +use std::fmt::{self, Debug}; use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; use fs::{directory, DigestTrie, RelativePath}; -use futures::future::BoxFuture; +use futures::future::{self, BoxFuture, TryFutureExt}; use futures::FutureExt; -use grpc_util::retry::status_is_retryable; -use grpc_util::{ - headers_to_http_header_map, layered_service, retry::retry_call, status_to_str, LayeredService, -}; +use grpc_util::retry::{retry_call, status_is_retryable}; +use grpc_util::{headers_to_http_header_map, layered_service, status_to_str, LayeredService}; use hashing::Digest; use parking_lot::Mutex; use protos::gen::build::bazel::remote::execution::v2 as remexec; @@ -22,11 +21,12 @@ use workunit_store::{ in_workunit, Level, Metric, ObservationMetric, RunningWorkunit, WorkunitMetadata, }; -use crate::remote::make_execute_request; +use crate::remote::{apply_headers, make_execute_request, populate_fallible_execution_result}; use crate::{ Context, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope, ProcessError, - ProcessMetadata, + ProcessMetadata, ProcessResultSource, }; +use tonic::{Code, Request, Status}; #[derive(Clone, Copy, Debug, PartialEq, strum_macros::EnumString)] #[strum(serialize_all = "snake_case")] @@ -46,7 +46,7 @@ pub enum RemoteCacheWarningsBehavior { /// then locally. #[derive(Clone)] pub struct CommandRunner { - underlying: Arc, + inner: Arc, metadata: ProcessMetadata, executor: task_executor::Executor, store: Store, @@ -63,7 +63,7 @@ pub struct CommandRunner { impl CommandRunner { pub fn new( - underlying: Arc, + inner: Arc, metadata: ProcessMetadata, executor: task_executor::Executor, store: Store, @@ -98,7 +98,7 @@ impl CommandRunner { let action_cache_client = Arc::new(ActionCacheClient::new(channel)); Ok(CommandRunner { - underlying, + inner, metadata, executor, store, @@ -256,7 +256,7 @@ impl CommandRunner { ) -> Result<(FallibleProcessResultWithPlatform, bool), ProcessError> { // A future to read from the cache and log the results accordingly. let cache_read_future = async { - let response = crate::remote::check_action_cache( + let response = check_action_cache( action_digest, &request.description, &self.metadata, @@ -414,6 +414,14 @@ impl CommandRunner { } } +impl Debug for CommandRunner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("remote_cache::CommandRunner") + .field("inner", &self.inner) + .finish_non_exhaustive() + } +} + enum CacheErrorType { ReadError, WriteError, @@ -444,15 +452,12 @@ impl crate::CommandRunner for CommandRunner { cache_lookup_start, action_digest, &request.clone(), - self.underlying.run(context.clone(), workunit, request), + self.inner.run(context.clone(), workunit, request), ) .await? } else { ( - self - .underlying - .run(context.clone(), workunit, request) - .await?, + self.inner.run(context.clone(), workunit, request).await?, false, ) }; @@ -490,3 +495,109 @@ impl crate::CommandRunner for CommandRunner { Ok(result) } } + +/// Check the remote Action Cache for a cached result of running the given `command` and the Action +/// with the given `action_digest`. +/// +/// This check is necessary because some REAPI servers do not short-circuit the Execute method +/// by checking the Action Cache (e.g., BuildBarn). Thus, this client must check the cache +/// explicitly in order to avoid duplicating already-cached work. This behavior matches +/// the Bazel RE client. +async fn check_action_cache( + action_digest: Digest, + command_description: &str, + metadata: &ProcessMetadata, + platform: Platform, + context: &Context, + action_cache_client: Arc>, + store: Store, + eager_fetch: bool, + timeout_duration: Duration, +) -> Result, ProcessError> { + in_workunit!( + "check_action_cache", + Level::Debug, + desc = Some(format!("Remote cache lookup for: {}", command_description)), + |workunit| async move { + workunit.increment_counter(Metric::RemoteCacheRequests, 1); + + let client = action_cache_client.as_ref().clone(); + let response = retry_call( + client, + move |mut client| { + let request = remexec::GetActionResultRequest { + action_digest: Some(action_digest.into()), + instance_name: metadata.instance_name.as_ref().cloned().unwrap_or_default(), + ..remexec::GetActionResultRequest::default() + }; + let request = apply_headers(Request::new(request), &context.build_id); + async move { + let lookup_fut = client.get_action_result(request); + let timeout_fut = tokio::time::timeout(timeout_duration, lookup_fut); + timeout_fut + .await + .unwrap_or_else(|_| Err(Status::unavailable("Pants client timeout"))) + } + }, + status_is_retryable, + ) + .and_then(|action_result| async move { + let action_result = action_result.into_inner(); + let response = populate_fallible_execution_result( + store.clone(), + context.run_id, + &action_result, + platform, + false, + ProcessResultSource::HitRemotely, + ) + .await + .map_err(|e| Status::unavailable(format!("Output roots could not be loaded: {e}")))?; + + if eager_fetch { + // NB: `ensure_local_has_file` and `ensure_local_has_recursive_directory` are internally + // retried. + let response = response.clone(); + in_workunit!( + "eager_fetch_action_cache", + Level::Trace, + desc = Some("eagerly fetching after action cache hit".to_owned()), + |_workunit| async move { + future::try_join_all(vec![ + store.ensure_local_has_file(response.stdout_digest).boxed(), + store.ensure_local_has_file(response.stderr_digest).boxed(), + store + .ensure_local_has_recursive_directory(response.output_directory) + .boxed(), + ]) + .await + } + ) + .await + .map_err(|e| Status::unavailable(format!("Output content could not be loaded: {e}")))?; + } + Ok(response) + }) + .await; + + match response { + Ok(response) => { + workunit.increment_counter(Metric::RemoteCacheRequestsCached, 1); + Ok(Some(response)) + } + Err(status) => match status.code() { + Code::NotFound => { + workunit.increment_counter(Metric::RemoteCacheRequestsUncached, 1); + Ok(None) + } + _ => { + workunit.increment_counter(Metric::RemoteCacheReadErrors, 1); + // TODO: Ensure that we're catching missing digests. + Err(status_to_str(status).into()) + } + }, + } + } + ) + .await +} diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index 3e1022399bb..392cbdeab2e 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -28,7 +28,7 @@ use crate::{ const CACHE_READ_TIMEOUT: Duration = Duration::from_secs(5); /// A mock of the local runner used for better hermeticity of the tests. -#[derive(Clone)] +#[derive(Debug, Clone)] struct MockLocalCommandRunner { result: Result, call_counter: Arc, @@ -553,6 +553,7 @@ async fn extract_output_file() { #[tokio::test] async fn make_action_result_basic() { + #[derive(Debug)] struct MockCommandRunner; #[async_trait] diff --git a/src/rust/engine/process_execution/src/remote_tests.rs b/src/rust/engine/process_execution/src/remote_tests.rs index b7ce1d958ee..637c104c509 100644 --- a/src/rust/engine/process_execution/src/remote_tests.rs +++ b/src/rust/engine/process_execution/src/remote_tests.rs @@ -21,7 +21,7 @@ use testutil::data::{TestData, TestDirectory, TestTree}; use testutil::{owned_string_vec, relative_paths}; use workunit_store::{RunId, WorkunitStore}; -use crate::remote::{digest, CommandRunner, ExecutionError, OperationOrStatus}; +use crate::remote::{CommandRunner, ExecutionError, OperationOrStatus}; use crate::{ CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, InputDigests, Platform, Process, ProcessCacheScope, ProcessError, ProcessMetadata, @@ -36,8 +36,6 @@ const RETRY_INTERVAL: Duration = Duration::from_micros(0); const STORE_CONCURRENCY_LIMIT: usize = 256; const STORE_BATCH_API_SIZE_LIMIT: usize = 4 * 1024 * 1024; const EXEC_CONCURRENCY_LIMIT: usize = 256; -const CACHE_CONCURRENCY_LIMIT: usize = 256; -const CACHE_READ_TIMEOUT: Duration = Duration::from_secs(5); #[derive(Debug, PartialEq)] pub(crate) struct RemoteTestResult { @@ -662,32 +660,25 @@ async fn successful_with_only_call_to_execute() { let op_name = "gimme-foo".to_string(); let mock_server = { - let (action, _, execute_request) = crate::remote::make_execute_request( + let (_, _, execute_request) = crate::remote::make_execute_request( &execute_request.clone().try_into().unwrap(), ProcessMetadata::default(), ) .unwrap(); - let action_digest = digest(&action).unwrap(); mock::execution_server::TestServer::new( - mock::execution_server::MockExecution::new(vec![ - ExpectedAPICall::GetActionResult { - action_digest, - response: Err(Status::not_found("")), - }, - ExpectedAPICall::Execute { - execute_request, - stream_responses: Ok(vec![ - make_incomplete_operation(&op_name), - make_successful_operation( - &op_name, - StdoutType::Raw("foo".to_owned()), - StderrType::Raw("".to_owned()), - 0, - ), - ]), - }, - ]), + mock::execution_server::MockExecution::new(vec![ExpectedAPICall::Execute { + execute_request, + stream_responses: Ok(vec![ + make_incomplete_operation(&op_name), + make_successful_operation( + &op_name, + StdoutType::Raw("foo".to_owned()), + StderrType::Raw("".to_owned()), + 0, + ), + ]), + }]), None, ) }; @@ -710,19 +701,14 @@ async fn successful_after_reconnect_with_wait_execution() { let op_name = "gimme-foo".to_string(); let mock_server = { - let (action, _, execute_request) = crate::remote::make_execute_request( + let (_, _, execute_request) = crate::remote::make_execute_request( &execute_request.clone().try_into().unwrap(), ProcessMetadata::default(), ) .unwrap(); - let action_digest = digest(&action).unwrap(); mock::execution_server::TestServer::new( mock::execution_server::MockExecution::new(vec![ - ExpectedAPICall::GetActionResult { - action_digest, - response: Err(Status::not_found("".to_owned())), - }, ExpectedAPICall::Execute { execute_request, stream_responses: Ok(vec![make_incomplete_operation(&op_name)]), @@ -760,7 +746,7 @@ async fn successful_after_reconnect_from_retryable_error() { let op_name_2 = "gimme-bar".to_string(); let mock_server = { - let (action, _, execute_request) = crate::remote::make_execute_request( + let (_, _, execute_request) = crate::remote::make_execute_request( &execute_request.clone().try_into().unwrap(), ProcessMetadata::default(), ) @@ -768,14 +754,8 @@ async fn successful_after_reconnect_from_retryable_error() { let execute_request_2 = execute_request.clone(); - let action_digest = digest(&action).unwrap(); - mock::execution_server::TestServer::new( mock::execution_server::MockExecution::new(vec![ - ExpectedAPICall::GetActionResult { - action_digest, - response: Err(Status::not_found("".to_owned())), - }, ExpectedAPICall::Execute { execute_request, stream_responses: Ok(vec![ @@ -811,47 +791,6 @@ async fn successful_after_reconnect_from_retryable_error() { assert_cancellation_requests(&mock_server, vec![]); } -#[tokio::test] -async fn successful_served_from_action_cache() { - WorkunitStore::setup_for_tests(); - let execute_request = echo_foo_request(); - - let mock_server = { - let (action, _, _) = crate::remote::make_execute_request( - &execute_request.clone().try_into().unwrap(), - ProcessMetadata::default(), - ) - .unwrap(); - - let action_digest = digest(&action).unwrap(); - - let action_result = make_action_result( - StdoutType::Raw("foo".to_owned()), - StderrType::Raw("".to_owned()), - 0, - None, - ); - - mock::execution_server::TestServer::new( - mock::execution_server::MockExecution::new(vec![ExpectedAPICall::GetActionResult { - action_digest, - response: Ok(action_result), - }]), - None, - ) - }; - - let result = run_command_remote(mock_server.address(), execute_request) - .await - .unwrap(); - - assert_eq!(result.stdout_bytes, "foo".as_bytes()); - assert_eq!(result.stderr_bytes, "".as_bytes()); - assert_eq!(result.original.exit_code, 0); - assert_eq!(result.original.output_directory, *EMPTY_DIRECTORY_DIGEST); - assert_cancellation_requests(&mock_server, vec![]); -} - #[tokio::test] async fn server_rejecting_execute_request_gives_error() { WorkunitStore::setup_for_tests(); @@ -859,27 +798,15 @@ async fn server_rejecting_execute_request_gives_error() { let execute_request = echo_foo_request(); let mock_server = mock::execution_server::TestServer::new( - mock::execution_server::MockExecution::new(vec![ - ExpectedAPICall::GetActionResult { - action_digest: hashing::Digest::new( - hashing::Fingerprint::from_hex_string( - "bf10cd168ad711602f7a241cbcbc9a3d32497fdd1465d8a01d4549ee7d8ebc08", - ) - .unwrap(), - 142, - ), - response: Err(Status::not_found("")), - }, - ExpectedAPICall::Execute { - execute_request: crate::remote::make_execute_request( - &Process::new(owned_string_vec(&["/bin/echo", "-n", "bar"])), - ProcessMetadata::default(), - ) - .unwrap() - .2, - stream_responses: Err(Status::invalid_argument("".to_owned())), - }, - ]), + mock::execution_server::MockExecution::new(vec![ExpectedAPICall::Execute { + execute_request: crate::remote::make_execute_request( + &Process::new(owned_string_vec(&["/bin/echo", "-n", "bar"])), + ProcessMetadata::default(), + ) + .unwrap() + .2, + stream_responses: Err(Status::invalid_argument("".to_owned())), + }]), None, ); @@ -897,25 +824,17 @@ async fn server_sending_triggering_timeout_with_deadline_exceeded() { let execute_request = echo_foo_request(); let mock_server = { - let (action, _, execute_request) = crate::remote::make_execute_request( + let (_, _, execute_request) = crate::remote::make_execute_request( &execute_request.clone().try_into().unwrap(), ProcessMetadata::default(), ) .unwrap(); - let action_digest = digest(&action).unwrap(); - mock::execution_server::TestServer::new( - mock::execution_server::MockExecution::new(vec![ - ExpectedAPICall::GetActionResult { - action_digest, - response: Err(Status::not_found("".to_owned())), - }, - ExpectedAPICall::Execute { - execute_request, - stream_responses: Err(Status::deadline_exceeded("")), - }, - ]), + mock::execution_server::MockExecution::new(vec![ExpectedAPICall::Execute { + execute_request, + stream_responses: Err(Status::deadline_exceeded("")), + }]), None, ) }; @@ -934,33 +853,25 @@ async fn sends_headers() { let op_name = "gimme-foo".to_string(); let mock_server = { - let (action, _, execute_request) = crate::remote::make_execute_request( + let (_, _, execute_request) = crate::remote::make_execute_request( &execute_request.clone().try_into().unwrap(), ProcessMetadata::default(), ) .unwrap(); - let action_digest = digest(&action).unwrap(); - mock::execution_server::TestServer::new( - mock::execution_server::MockExecution::new(vec![ - ExpectedAPICall::GetActionResult { - action_digest, - response: Err(Status::not_found("".to_owned())), - }, - ExpectedAPICall::Execute { - execute_request, - stream_responses: Ok(vec![ - make_incomplete_operation(&op_name), - make_successful_operation( - &op_name, - StdoutType::Raw("foo".to_owned()), - StderrType::Raw("".to_owned()), - 0, - ), - ]), - }, - ]), + mock::execution_server::MockExecution::new(vec![ExpectedAPICall::Execute { + execute_request, + stream_responses: Ok(vec![ + make_incomplete_operation(&op_name), + make_successful_operation( + &op_name, + StdoutType::Raw("foo".to_owned()), + StderrType::Raw("".to_owned()), + 0, + ), + ]), + }]), None, ) }; @@ -984,7 +895,6 @@ async fn sends_headers() { .unwrap(); let command_runner = CommandRunner::new( - &mock_server.address(), &mock_server.address(), ProcessMetadata::default(), None, @@ -997,8 +907,6 @@ async fn sends_headers() { OVERALL_DEADLINE_SECS, RETRY_INTERVAL, EXEC_CONCURRENCY_LIMIT, - CACHE_CONCURRENCY_LIMIT, - CACHE_READ_TIMEOUT, None, ) .unwrap(); @@ -1017,7 +925,7 @@ async fn sends_headers() { .iter() .map(|received_message| received_message.headers.clone()) .collect(); - assert_that!(message_headers).has_length(2); + assert_that!(message_headers).has_length(1); for headers in message_headers { { let want_key = "google.devtools.remoteexecution.v1test.requestmetadata-bin"; @@ -1137,33 +1045,25 @@ async fn ensure_inline_stdio_is_stored() { let mock_server = { let op_name = "cat".to_owned(); - let (action, _, execute_request) = crate::remote::make_execute_request( + let (_, _, execute_request) = crate::remote::make_execute_request( &echo_roland_request().try_into().unwrap(), ProcessMetadata::default(), ) .unwrap(); - let action_digest = digest(&action).unwrap(); - mock::execution_server::TestServer::new( - mock::execution_server::MockExecution::new(vec![ - ExpectedAPICall::GetActionResult { - action_digest, - response: Err(Status::not_found("".to_owned())), - }, - ExpectedAPICall::Execute { - execute_request, - stream_responses: Ok(vec![ - make_incomplete_operation(&op_name), - make_successful_operation( - &op_name.clone(), - StdoutType::Raw(test_stdout.string()), - StderrType::Raw(test_stderr.string()), - 0, - ), - ]), - }, - ]), + mock::execution_server::MockExecution::new(vec![ExpectedAPICall::Execute { + execute_request, + stream_responses: Ok(vec![ + make_incomplete_operation(&op_name), + make_successful_operation( + &op_name.clone(), + StdoutType::Raw(test_stdout.string()), + StderrType::Raw(test_stderr.string()), + 0, + ), + ]), + }]), None, ) }; @@ -1189,7 +1089,6 @@ async fn ensure_inline_stdio_is_stored() { .unwrap(); let cmd_runner = CommandRunner::new( - &mock_server.address(), &mock_server.address(), ProcessMetadata::default(), None, @@ -1199,8 +1098,6 @@ async fn ensure_inline_stdio_is_stored() { OVERALL_DEADLINE_SECS, RETRY_INTERVAL, EXEC_CONCURRENCY_LIMIT, - CACHE_CONCURRENCY_LIMIT, - CACHE_READ_TIMEOUT, None, ) .unwrap(); @@ -1283,38 +1180,30 @@ async fn initial_response_error() { let mock_server = { let op_name = "gimme-foo".to_string(); - let (action, _, execute_request) = crate::remote::make_execute_request( + let (_, _, execute_request) = crate::remote::make_execute_request( &execute_request.clone().try_into().unwrap(), ProcessMetadata::default(), ) .unwrap(); - let action_digest = digest(&action).unwrap(); - mock::execution_server::TestServer::new( - mock::execution_server::MockExecution::new(vec![ - ExpectedAPICall::GetActionResult { - action_digest, - response: Err(Status::not_found("".to_owned())), - }, - ExpectedAPICall::Execute { - execute_request, - stream_responses: Ok(vec![MockOperation::new({ - Operation { - name: op_name.to_string(), - done: true, - result: Some(protos::gen::google::longrunning::operation::Result::Error( - protos::gen::google::rpc::Status { - code: Code::Internal as i32, - message: "Something went wrong".to_string(), - ..Default::default() - }, - )), - ..Default::default() - } - })]), - }, - ]), + mock::execution_server::MockExecution::new(vec![ExpectedAPICall::Execute { + execute_request, + stream_responses: Ok(vec![MockOperation::new({ + Operation { + name: op_name.to_string(), + done: true, + result: Some(protos::gen::google::longrunning::operation::Result::Error( + protos::gen::google::rpc::Status { + code: Code::Internal as i32, + message: "Something went wrong".to_string(), + ..Default::default() + }, + )), + ..Default::default() + } + })]), + }]), None, ) }; @@ -1333,31 +1222,23 @@ async fn initial_response_missing_response_and_error() { let mock_server = { let op_name = "gimme-foo".to_string(); - let (action, _, execute_request) = crate::remote::make_execute_request( + let (_, _, execute_request) = crate::remote::make_execute_request( &execute_request.clone().try_into().unwrap(), ProcessMetadata::default(), ) .unwrap(); - let action_digest = digest(&action).unwrap(); - mock::execution_server::TestServer::new( - mock::execution_server::MockExecution::new(vec![ - ExpectedAPICall::GetActionResult { - action_digest, - response: Err(Status::not_found("".to_owned())), - }, - ExpectedAPICall::Execute { - execute_request, - stream_responses: Ok(vec![MockOperation::new({ - Operation { - name: op_name.to_string(), - done: true, - ..Default::default() - } - })]), - }, - ]), + mock::execution_server::MockExecution::new(vec![ExpectedAPICall::Execute { + execute_request, + stream_responses: Ok(vec![MockOperation::new({ + Operation { + name: op_name.to_string(), + done: true, + ..Default::default() + } + })]), + }]), None, ) }; @@ -1378,20 +1259,14 @@ async fn fails_after_retry_limit_exceeded() { let execute_request = echo_foo_request(); let mock_server = { - let (action, _, execute_request) = crate::remote::make_execute_request( + let (_, _, execute_request) = crate::remote::make_execute_request( &execute_request.clone().try_into().unwrap(), ProcessMetadata::default(), ) .unwrap(); - let action_digest = digest(&action).unwrap(); - mock::execution_server::TestServer::new( mock::execution_server::MockExecution::new(vec![ - ExpectedAPICall::GetActionResult { - action_digest, - response: Err(Status::not_found("".to_owned())), - }, ExpectedAPICall::Execute { execute_request: execute_request.clone(), stream_responses: Ok(vec![make_retryable_operation_failure()]), @@ -1438,20 +1313,14 @@ async fn fails_after_retry_limit_exceeded_with_stream_close() { let mock_server = { let op_name = "foo-bar".to_owned(); - let (action, _, execute_request) = crate::remote::make_execute_request( + let (_, _, execute_request) = crate::remote::make_execute_request( &execute_request.clone().try_into().unwrap(), ProcessMetadata::default(), ) .unwrap(); - let action_digest = digest(&action).unwrap(); - mock::execution_server::TestServer::new( mock::execution_server::MockExecution::new(vec![ - ExpectedAPICall::GetActionResult { - action_digest, - response: Err(Status::not_found("".to_owned())), - }, ExpectedAPICall::Execute { execute_request: execute_request.clone(), stream_responses: Ok(vec![make_incomplete_operation(&op_name)]), @@ -1501,20 +1370,14 @@ async fn execute_missing_file_uploads_if_known() { let mock_server = { let op_name = "cat".to_owned(); - let (action, _, execute_request) = crate::remote::make_execute_request( + let (_, _, execute_request) = crate::remote::make_execute_request( &cat_roland_request().try_into().unwrap(), ProcessMetadata::default(), ) .unwrap(); - let action_digest = digest(&action).unwrap(); - mock::execution_server::TestServer::new( mock::execution_server::MockExecution::new(vec![ - ExpectedAPICall::GetActionResult { - action_digest, - response: Err(Status::not_found("".to_owned())), - }, ExpectedAPICall::Execute { execute_request, stream_responses: Ok(vec![ @@ -1574,7 +1437,6 @@ async fn execute_missing_file_uploads_if_known() { .await .expect("Saving directory bytes to store"); let command_runner = CommandRunner::new( - &mock_server.address(), &mock_server.address(), ProcessMetadata::default(), None, @@ -1584,8 +1446,6 @@ async fn execute_missing_file_uploads_if_known() { OVERALL_DEADLINE_SECS, RETRY_INTERVAL, EXEC_CONCURRENCY_LIMIT, - CACHE_CONCURRENCY_LIMIT, - CACHE_READ_TIMEOUT, None, ) .unwrap(); @@ -1612,16 +1472,7 @@ async fn execute_missing_file_errors_if_unknown() { let mock_server = { mock::execution_server::TestServer::new( - mock::execution_server::MockExecution::new(vec![ExpectedAPICall::GetActionResult { - action_digest: hashing::Digest::new( - hashing::Fingerprint::from_hex_string( - "3174d44cc8d68ec5047f2da24a4db150ab0264a9d6558872081585696a98b410", - ) - .unwrap(), - 144, - ), - response: Err(Status::not_found("".to_owned())), - }]), + mock::execution_server::MockExecution::new(vec![]), None, ) }; @@ -1646,7 +1497,6 @@ async fn execute_missing_file_errors_if_unknown() { .unwrap(); let runner = CommandRunner::new( - &mock_server.address(), &mock_server.address(), ProcessMetadata::default(), None, @@ -1656,8 +1506,6 @@ async fn execute_missing_file_errors_if_unknown() { OVERALL_DEADLINE_SECS, RETRY_INTERVAL, EXEC_CONCURRENCY_LIMIT, - CACHE_CONCURRENCY_LIMIT, - CACHE_READ_TIMEOUT, None, ) .unwrap(); @@ -2321,7 +2169,6 @@ fn create_command_runner( let store_dir = TempDir::new().unwrap(); let store = make_store(store_dir.path(), cas, runtime.clone()); let command_runner = CommandRunner::new( - &address, &address, ProcessMetadata::default(), None, @@ -2331,8 +2178,6 @@ fn create_command_runner( OVERALL_DEADLINE_SECS, RETRY_INTERVAL, EXEC_CONCURRENCY_LIMIT, - CACHE_CONCURRENCY_LIMIT, - CACHE_READ_TIMEOUT, None, ) .expect("Failed to make command runner"); diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index 04f2850a339..dc0cdbc08ab 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -30,6 +30,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::iter::{FromIterator, Iterator}; use std::path::PathBuf; use std::process::exit; +use std::sync::Arc; use std::time::Duration; use fs::{DirectoryDigest, Permissions, RelativePath}; @@ -289,24 +290,39 @@ async fn main() { ); } + let remote_runner = process_execution::remote::CommandRunner::new( + &address, + process_metadata.clone(), + root_ca_certs.clone(), + headers.clone(), + store.clone(), + Platform::Linux_x86_64, + Duration::from_secs(args.overall_deadline_secs), + Duration::from_millis(100), + args.execution_rpc_concurrency, + None, + ) + .expect("Failed to make remote command runner"); + let command_runner_box: Box = { Box::new( - process_execution::remote::CommandRunner::new( - &address, - &address, + process_execution::remote_cache::CommandRunner::new( + Arc::new(remote_runner), process_metadata, + executor, + store.clone(), + &address, root_ca_certs, headers, - store.clone(), Platform::Linux_x86_64, - Duration::from_secs(args.overall_deadline_secs), - Duration::from_millis(100), - args.execution_rpc_concurrency, + true, + true, + process_execution::remote_cache::RemoteCacheWarningsBehavior::Backoff, + false, args.cache_rpc_concurrency, Duration::from_secs(2), - None, ) - .expect("Failed to make command runner"), + .expect("Failed to make remote cache command runner"), ) }; diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 5ff96f1912f..18c0a73bd67 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -60,7 +60,10 @@ pub struct Core { pub intrinsics: Intrinsics, pub executor: Executor, store: Store, - pub command_runner: Box, + /// The CommandRunners to use for execution, in ascending order of reliability (for the purposes + /// of backtracking). For performance reasons, caching `CommandRunners` might skip validation of + /// their outputs, and so should be listed before uncached `CommandRunners`. + pub command_runners: Vec>, pub http_client: reqwest::Client, pub local_cache: PersistentCache, pub vfs: PosixFS, @@ -167,25 +170,61 @@ impl Core { } } - fn make_local_execution_runner( - store: &Store, + /// + /// Make the innermost / leaf runner. Will have concurrency control and process pooling, but + /// will not have caching. + /// + fn make_leaf_runner( + full_store: &Store, executor: &Executor, local_execution_root_dir: &Path, named_caches_dir: &Path, + process_execution_metadata: &ProcessMetadata, + root_ca_certs: &Option>, exec_strategy_opts: &ExecutionStrategyOptions, - ) -> Result, String> { - let immutable_inputs = ImmutableInputs::new(store.clone(), local_execution_root_dir)?; - let local_command_runner = local::CommandRunner::new( - store.clone(), - executor.clone(), - local_execution_root_dir.to_path_buf(), - NamedCaches::new(named_caches_dir.to_path_buf()), - immutable_inputs, - exec_strategy_opts.local_cleanup, - ); - - let maybe_nailgunnable_local_command_runner: Box = - if exec_strategy_opts.local_enable_nailgun { + remoting_opts: &RemotingOptions, + capabilities_cell_opt: Option>>, + ) -> Result, String> { + let (runner, parallelism): (Box, usize) = if remoting_opts.execution_enable { + ( + Box::new(remote::CommandRunner::new( + // We unwrap because global_options.py will have already validated these are defined. + remoting_opts.execution_address.as_ref().unwrap(), + process_execution_metadata.clone(), + root_ca_certs.clone(), + remoting_opts.execution_headers.clone(), + full_store.clone(), + // TODO if we ever want to configure the remote platform to be something else we + // need to take an option all the way down here and into the remote::CommandRunner struct. + Platform::Linux_x86_64, + remoting_opts.execution_overall_deadline, + Duration::from_millis(100), + remoting_opts.execution_rpc_concurrency, + capabilities_cell_opt, + )?), + exec_strategy_opts.remote_parallelism, + ) + } else { + // If eager_fetch is enabled, we do not want to use any remote store with the local command + // runner. This reduces the surface area of where the remote store is + // used to only be the remote cache command runner. + let store_for_local_runner = if remoting_opts.cache_eager_fetch { + full_store.clone().into_local_only() + } else { + full_store.clone() + }; + let immutable_inputs = + ImmutableInputs::new(store_for_local_runner.clone(), local_execution_root_dir)?; + let local_command_runner = local::CommandRunner::new( + store_for_local_runner.clone(), + executor.clone(), + local_execution_root_dir.to_path_buf(), + NamedCaches::new(named_caches_dir.to_path_buf()), + immutable_inputs, + exec_strategy_opts.local_cleanup, + ); + + let runner: Box = if exec_strategy_opts.local_enable_nailgun { // We set the nailgun pool size to the number of instances that fit within the memory // parameters configured when a max child process memory has been given. // Otherwise, pool size will be double of the local parallelism so we can always keep @@ -202,7 +241,7 @@ impl Core { Box::new(nailgun::CommandRunner::new( local_command_runner, local_execution_root_dir.to_path_buf(), - store.clone(), + store_for_local_runner, executor.clone(), pool_size, )) @@ -210,17 +249,73 @@ impl Core { Box::new(local_command_runner) }; - Ok(Box::new(bounded::CommandRunner::new( + (runner, exec_strategy_opts.local_parallelism) + }; + + Ok(Arc::new(bounded::CommandRunner::new( executor, - maybe_nailgunnable_local_command_runner, - exec_strategy_opts.local_parallelism, + runner, + parallelism, ))) } - fn make_command_runner( + /// + /// Wraps the given runner in the local cache runner. + /// + fn make_local_cached_runner( + inner_runner: Arc, + full_store: &Store, + local_cache: &PersistentCache, + process_execution_metadata: &ProcessMetadata, + ) -> Arc { + Arc::new(process_execution::cache::CommandRunner::new( + inner_runner, + local_cache.clone(), + full_store.clone(), + process_execution_metadata.clone(), + )) + } + + /// + /// Wraps the given runner in any configured caches. + /// + fn make_remote_cached_runner( + inner_runner: Arc, full_store: &Store, remote_store_address: &Option, executor: &Executor, + process_execution_metadata: &ProcessMetadata, + root_ca_certs: &Option>, + _exec_strategy_opts: &ExecutionStrategyOptions, + remoting_opts: &RemotingOptions, + remote_cache_read: bool, + remote_cache_write: bool, + eager_fetch: bool, + ) -> Result, String> { + Ok(Arc::new(remote_cache::CommandRunner::new( + inner_runner, + process_execution_metadata.clone(), + executor.clone(), + full_store.clone(), + remote_store_address.as_ref().unwrap(), + root_ca_certs.clone(), + remoting_opts.store_headers.clone(), + Platform::current()?, + remote_cache_read, + remote_cache_write, + remoting_opts.cache_warnings_behavior, + eager_fetch, + remoting_opts.cache_rpc_concurrency, + remoting_opts.cache_read_timeout, + )?)) + } + + /// + /// Creates the stack of CommandRunners for the purposes of backtracking. + /// + fn make_command_runners( + full_store: &Store, + executor: &Executor, local_cache: &PersistentCache, local_execution_root_dir: &Path, named_caches_dir: &Path, @@ -229,87 +324,76 @@ impl Core { exec_strategy_opts: &ExecutionStrategyOptions, remoting_opts: &RemotingOptions, capabilities_cell_opt: Option>>, - ) -> Result, String> { - let remote_caching_used = - exec_strategy_opts.remote_cache_read || exec_strategy_opts.remote_cache_write; - - // If remote caching is used with eager_fetch, we do not want to use the remote store - // with the local command runner. This reduces the surface area of where the remote store is - // used to only be the remote cache command runner. - let store_for_local_runner = if remote_caching_used && remoting_opts.cache_eager_fetch { - full_store.clone().into_local_only() - } else { - full_store.clone() - }; - - let local_command_runner = Core::make_local_execution_runner( - &store_for_local_runner, + ) -> Result>, String> { + let leaf_runner = Self::make_leaf_runner( + full_store, executor, local_execution_root_dir, named_caches_dir, + process_execution_metadata, + root_ca_certs, exec_strategy_opts, + remoting_opts, + capabilities_cell_opt, )?; - // Possibly either add the remote execution runner or the remote cache runner. - // `global_options.py` already validates that both are not set at the same time. - let maybe_remote_enabled_command_runner: Box = - if remoting_opts.execution_enable { - Box::new(bounded::CommandRunner::new( - executor, - Box::new(remote::CommandRunner::new( - // We unwrap because global_options.py will have already validated these are defined. - remoting_opts.execution_address.as_ref().unwrap(), - remoting_opts.store_address.as_ref().unwrap(), - process_execution_metadata.clone(), - root_ca_certs.clone(), - remoting_opts.execution_headers.clone(), - full_store.clone(), - // TODO if we ever want to configure the remote platform to be something else we - // need to take an option all the way down here and into the remote::CommandRunner struct. - Platform::Linux_x86_64, - remoting_opts.execution_overall_deadline, - Duration::from_millis(100), - remoting_opts.execution_rpc_concurrency, - remoting_opts.cache_rpc_concurrency, - remoting_opts.cache_read_timeout, - capabilities_cell_opt, - )?), - exec_strategy_opts.remote_parallelism, + // TODO: Until we can deprecate letting the flag default, we implicitly disable + // eager_fetch when remote execution is in use. + let eager_fetch = remoting_opts.cache_eager_fetch && !remoting_opts.execution_enable; + // TODO: Until we can deprecate letting remote-cache-{read,write} default, we implicitly + // enable them when remote execution is in use. + let remote_cache_read = exec_strategy_opts.remote_cache_read || remoting_opts.execution_enable; + let remote_cache_write = + exec_strategy_opts.remote_cache_write || remoting_opts.execution_enable; + let maybe_remote_cached_runner = if remote_cache_read || remote_cache_write { + Some(Self::make_remote_cached_runner( + leaf_runner.clone(), + full_store, + &remoting_opts.store_address, + executor, + process_execution_metadata, + root_ca_certs, + exec_strategy_opts, + remoting_opts, + remote_cache_read, + remote_cache_write, + eager_fetch, + )?) + } else { + None + }; + + // TODO: The local cache eagerly fetches outputs independent of the `eager_fetch` flag. Once + // `eager_fetch` backtracks via https://github.com/pantsbuild/pants/issues/11331, the local + // cache will be able to obey `eager_fetch` as well, and can efficiently be used with remote + // execution. + let maybe_local_cached_runner = + if exec_strategy_opts.local_cache && !remoting_opts.execution_enable { + Some(Self::make_local_cached_runner( + maybe_remote_cached_runner + .clone() + .unwrap_or_else(|| leaf_runner.clone()), + full_store, + local_cache, + process_execution_metadata, )) - } else if remote_caching_used { - Box::new(remote_cache::CommandRunner::new( - local_command_runner.into(), - process_execution_metadata.clone(), - executor.clone(), - full_store.clone(), - remote_store_address.as_ref().unwrap(), - root_ca_certs.clone(), - remoting_opts.store_headers.clone(), - Platform::current()?, - exec_strategy_opts.remote_cache_read, - exec_strategy_opts.remote_cache_write, - remoting_opts.cache_warnings_behavior, - remoting_opts.cache_eager_fetch, - remoting_opts.cache_rpc_concurrency, - remoting_opts.cache_read_timeout, - )?) } else { - local_command_runner + None }; - // Possibly use the local cache runner, regardless of remote execution/caching. - let maybe_local_cached_command_runner = if exec_strategy_opts.local_cache { - Box::new(process_execution::cache::CommandRunner::new( - maybe_remote_enabled_command_runner.into(), - local_cache.clone(), - full_store.clone(), - process_execution_metadata.clone(), - )) - } else { - maybe_remote_enabled_command_runner - }; - - Ok(maybe_local_cached_command_runner) + Ok( + vec![ + // Use all enabled caches on the first attempt. + maybe_local_cached_runner, + // Remove local caching on the second attempt. + maybe_remote_cached_runner, + // Remove all caching on the third attempt. + Some(leaf_runner), + ] + .into_iter() + .flatten() + .collect(), + ) } fn load_certificates( @@ -430,9 +514,8 @@ impl Core { platform_properties: remoting_opts.execution_extra_platform_properties.clone(), }; - let command_runner = Self::make_command_runner( + let command_runners = Self::make_command_runners( &full_store, - &remoting_opts.store_address, &executor, &local_cache, &local_execution_root_dir, @@ -443,6 +526,7 @@ impl Core { &remoting_opts, capabilities_cell_opt, )?; + log::debug!("Using {command_runners:?} for process execution."); let graph = Arc::new(InvalidatableGraph(Graph::new(executor.clone()))); @@ -491,7 +575,7 @@ impl Core { intrinsics, executor: executor.clone(), store, - command_runner, + command_runners, http_client, local_cache, vfs: PosixFS::new(&build_root, ignorer, executor) @@ -518,7 +602,7 @@ impl Core { if let Err(msg) = self.sessions.shutdown(timeout).await { log::warn!("During shutdown: {}", msg); } - // Then clear the Graph to ensure that drop handlers run (particular for running processes). + // Then clear the Graph to ensure that drop handlers run (particularly for running processes). self.graph.clear(); } } diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index 70ab67a119d..cd4c1345b83 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -387,7 +387,7 @@ impl ExecuteProcess { ) -> NodeResult { let request = self.process; - let command_runner = &context.core.command_runner; + let command_runner = &context.core.command_runners[0]; let execution_context = process_execution::Context::new( context.session.workunit_store(),