From 634679a3c9a4c7073230979875dc776ee5926e34 Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Fri, 3 Jun 2022 18:56:10 -0700 Subject: [PATCH] Record metrics for remote cache store errors (Cherry-pick of #15744) (#15747) The TODO referenced from #12544 was even more wrong than initially suspected. All of the relevant `remote::Store` accessing methods have internal retry, so the only issue was that we were not recording errors for requests which failed to fetch process outputs. Fixes #12544. [ci skip-build-wheels] --- .../engine/process_execution/src/remote.rs | 74 ++++++++++--------- .../src/remote_cache_tests.rs | 56 ++++++++++++-- 2 files changed, 90 insertions(+), 40 deletions(-) diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index 05c28d5e835..9f828f7b501 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -1337,7 +1337,7 @@ pub async fn check_action_cache( workunit.increment_counter(Metric::RemoteCacheRequests, 1); let client = action_cache_client.as_ref().clone(); - let action_result_response = retry_call( + let response = retry_call( client, move |mut client| { let request = remexec::GetActionResultRequest { @@ -1356,41 +1356,47 @@ pub async fn check_action_cache( }, status_is_retryable, ) + .and_then(|action_result| async move { + let action_result = action_result.into_inner(); + let response = populate_fallible_execution_result( + store.clone(), + context.run_id, + &action_result, + platform, + false, + ProcessResultSource::HitRemotely, + ) + .await + .map_err(|e| Status::unavailable(format!("Output roots could not be loaded: {e}")))?; + + if eager_fetch { + // NB: `ensure_local_has_file` and `ensure_local_has_recursive_directory` are internally + // retried. 
+ let response = response.clone(); + in_workunit!( + "eager_fetch_action_cache", + Level::Trace, + desc = Some("eagerly fetching after action cache hit".to_owned()), + |_workunit| async move { + future::try_join_all(vec![ + store.ensure_local_has_file(response.stdout_digest).boxed(), + store.ensure_local_has_file(response.stderr_digest).boxed(), + store + .ensure_local_has_recursive_directory(response.output_directory) + .boxed(), + ]) + .await + } + ) + .await + .map_err(|e| Status::unavailable(format!("Output content could not be loaded: {e}")))?; + } + Ok(response) + }) .await; - match action_result_response { - Ok(action_result) => { - let action_result = action_result.into_inner(); - let response = populate_fallible_execution_result( - store.clone(), - context.run_id, - &action_result, - platform, - false, - ProcessResultSource::HitRemotely, - ) - .await?; - // TODO: This should move inside the retry_call above, both in order to be retried, and - // to ensure that we increment a miss if we fail to eagerly fetch. 
- if eager_fetch { - let response = response.clone(); - in_workunit!( - "eager_fetch_action_cache", - Level::Trace, - desc = Some("eagerly fetching after action cache hit".to_owned()), - |_workunit| async move { - future::try_join_all(vec![ - store.ensure_local_has_file(response.stdout_digest).boxed(), - store.ensure_local_has_file(response.stderr_digest).boxed(), - store - .ensure_local_has_recursive_directory(response.output_directory) - .boxed(), - ]) - .await - } - ) - .await?; - } + match response { + Ok(response) => { workunit.increment_counter(Metric::RemoteCacheRequestsCached, 1); Ok(Some(response)) } diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index aee2ab09de4..70ad0d567a3 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -153,8 +153,7 @@ fn create_cached_runner( async fn create_process(store: &Store) -> (Process, Digest) { let process = Process::new(vec![ - testutil::path::find_bash(), - "echo -n hello world".to_string(), + "this process will not execute: see MockLocalCommandRunner".to_string(), ]); let (action, command, _exec_request) = make_execute_request(&process, ProcessMetadata::default()).unwrap(); @@ -202,24 +201,69 @@ async fn cache_read_success() { assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); } -/// If the cache has any issues during reads, we should gracefully fallback to the local runner. +/// If the cache has any issues during reads from the action cache, we should gracefully fall back +/// to the local runner. 
#[tokio::test] -async fn cache_read_skipped_on_errors() { - let (_, mut workunit) = WorkunitStore::setup_for_tests(); +async fn cache_read_skipped_on_action_cache_errors() { + let (workunit_store, mut workunit) = WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); - let (local_runner, local_runner_call_counter) = create_local_runner(1, 100); + let (local_runner, local_runner_call_counter) = create_local_runner(1, 500); let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, false); let (process, action_digest) = create_process(&store_setup.store).await; insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST); action_cache.always_errors.store(true, Ordering::SeqCst); + assert_eq!( + workunit_store.get_metrics().get("remote_cache_read_errors"), + None + ); assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); let remote_result = cache_runner .run(Context::default(), &mut workunit, process.clone().into()) .await .unwrap(); assert_eq!(remote_result.exit_code, 1); + assert_eq!( + workunit_store.get_metrics().get("remote_cache_read_errors"), + Some(&1) + ); + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 1); +} + +/// If the cache has any issues during reads from the store during eager_fetch, we should gracefully +/// fall back to the local runner. +#[tokio::test] +async fn cache_read_skipped_on_store_errors() { + let (workunit_store, mut workunit) = WorkunitStore::setup_for_tests(); + let store_setup = StoreSetup::new(); + let (local_runner, local_runner_call_counter) = create_local_runner(1, 500); + let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, true); + + // Claim that the process has a non-empty and not-persisted stdout digest. 
+ let (process, action_digest) = create_process(&store_setup.store).await; + insert_into_action_cache( + &action_cache, + &action_digest, + 0, + Digest::of_bytes("pigs flying".as_bytes()), + EMPTY_DIGEST, + ); + + assert_eq!( + workunit_store.get_metrics().get("remote_cache_read_errors"), + None + ); + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); + let remote_result = cache_runner + .run(Context::default(), &mut workunit, process.clone().into()) + .await + .unwrap(); + assert_eq!(remote_result.exit_code, 1); + assert_eq!( + workunit_store.get_metrics().get("remote_cache_read_errors"), + Some(&1) + ); assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 1); }