diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index 36dafed6030..e5d038e7b68 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -35,13 +35,14 @@ mod snapshot_ops_tests; mod snapshot_tests; pub use crate::snapshot_ops::{SnapshotOps, SubsetParams}; -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap}; use std::fmt::{self, Debug, Display}; use std::fs::OpenOptions; +use std::future::Future; use std::io::{self, Read, Write}; use std::os::unix::fs::{OpenOptionsExt, PermissionsExt}; use std::path::{Path, PathBuf}; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; use async_oncecell::OnceCell; @@ -148,35 +149,78 @@ pub struct UploadSummary { pub upload_wall_time: Duration, } +/// +/// Wraps a `remote::ByteStore` with state to help avoid uploading common blobs multiple times. +/// +/// If a blob is generated that many downstream actions depend on, we would otherwise get an +/// expanded set of digests from each of those actions that includes the new blob. If those actions +/// all execute in a time window smaller than the time taken to upload the blob, the effort would be +/// duplicated leading to both wasted resources locally buffering up the blob as well as wasted +/// effort on the remote server depending on its handling of this. 
+/// #[derive(Clone, Debug)] struct RemoteStore { store: remote::ByteStore, - in_flight_uploads: Arc>>, + in_flight_uploads: Arc>>>>, + in_flight_downloads: Arc>>>>, } impl RemoteStore { fn new(store: remote::ByteStore) -> Self { Self { store, - in_flight_uploads: Arc::new(parking_lot::Mutex::new(HashSet::new())), + in_flight_uploads: Arc::default(), + in_flight_downloads: Arc::default(), } } - fn reserve_uploads(&self, candidates: HashSet) -> HashSet { - let mut active_uploads = self.in_flight_uploads.lock(); - let to_upload = candidates - .difference(&active_uploads) - .cloned() - .collect::>(); - active_uploads.extend(&to_upload); - to_upload + /// + /// Returns a strongly held cell from a map of weakly held cells, creating it if necessary. + /// + fn cell_from( + cells: &Mutex>>>, + digest: Digest, + ) -> Arc> { + let mut cells = cells.lock(); + if let Some(cell) = cells.get(&digest).and_then(|weak_cell| weak_cell.upgrade()) { + cell + } else { + let cell = Arc::new(OnceCell::new()); + cells.insert(digest, Arc::downgrade(&cell)); + cell + } } - fn release_uploads(&self, uploads: HashSet) { - self - .in_flight_uploads - .lock() - .retain(|d| !uploads.contains(d)); + /// + /// Guards an attempt to upload the given `Digest`, skipping the upload if another attempt has + /// been successful. Will not return until either an attempt has succeeded, or this attempt has + /// failed. + /// + async fn maybe_upload( + &self, + digest: Digest, + upload: impl Future>, + ) -> Result<(), E> { + Self::cell_from(&self.in_flight_uploads, digest) + .get_or_try_init(upload) + .await + .map(|&()| ()) + } + + /// + /// Guards an attempt to download the given `Digest`, skipping the download if another attempt + /// has been successful. Will not return until either an attempt has succeeded, or this attempt has + /// failed. 
+ /// + async fn maybe_download( + &self, + digest: Digest, + upload: impl Future>, + ) -> Result<(), E> { + Self::cell_from(&self.in_flight_downloads, digest) + .get_or_try_init(upload) + .await + .map(|&()| ()) } } @@ -386,23 +430,19 @@ impl Store { /// pub async fn load_file_bytes_with< T: Send + 'static, - F: Fn(&[u8]) -> T + Send + Sync + 'static, + F: Fn(&[u8]) -> T + Clone + Send + Sync + 'static, >( &self, digest: Digest, f: F, ) -> Result { - // No transformation or verification is needed for files, so we pass in a pair of functions - // which always succeed, whether the underlying bytes are coming from a local or remote store. - // Unfortunately, we need to be a little verbose to do this. - let f_local = Arc::new(f); - let f_remote = f_local.clone(); + // No transformation or verification is needed for files. self .load_bytes_with( EntryType::File, digest, - move |v: &[u8]| Ok(f_local(v)), - move |v: Bytes| Ok(f_remote(&v)), + move |v: &[u8]| Ok(f(v)), + |_: Bytes| Ok(()), ) .await } @@ -571,7 +611,7 @@ impl Store { ) })?; protos::verify_directory_canonical(digest, &directory)?; - Ok(directory) + Ok(()) }, ) .await @@ -599,8 +639,8 @@ impl Store { /// async fn load_bytes_with< T: Send + 'static, - FLocal: Fn(&[u8]) -> Result + Send + Sync + 'static, - FRemote: Fn(Bytes) -> Result + Send + Sync + 'static, + FLocal: Fn(&[u8]) -> Result + Clone + Send + Sync + 'static, + FRemote: Fn(Bytes) -> Result<(), String> + Send + Sync + 'static, >( &self, entry_type: EntryType, @@ -613,51 +653,64 @@ impl Store { if let Some(bytes_res) = self .local - .load_bytes_with(entry_type, digest, f_local) + .load_bytes_with(entry_type, digest, f_local.clone()) .await? { return Ok(bytes_res?); } - let remote = maybe_remote - .ok_or_else(|| { - StoreError::MissingDigest("Was not present in the local store".to_owned(), digest) - })? 
- .store; - - let bytes = retry_call( - remote, - |remote| async move { remote.load_bytes_with(digest, Ok).await }, - |err| match err { - ByteStoreError::Grpc(status) => status_is_retryable(status), - _ => false, - }, - ) - .await - .map_err(|err| match err { - ByteStoreError::Grpc(status) => status_to_str(status), - ByteStoreError::Other(msg) => msg, - })? - .ok_or_else(|| { - StoreError::MissingDigest( - "Was not present in either the local or remote store".to_owned(), - digest, - ) + let remote = maybe_remote.ok_or_else(|| { + StoreError::MissingDigest("Was not present in the local store".to_owned(), digest) })?; + let remote_store = remote.store.clone(); - let value = f_remote(bytes.clone())?; - let stored_digest = local.store_bytes(entry_type, None, bytes, true).await?; - if digest == stored_digest { - Ok(value) - } else { - Err( - format!( - "CAS gave wrong digest: expected {:?}, got {:?}", - digest, stored_digest + remote + .maybe_download(digest, async move { + // TODO: Now that we always copy from the remote store to the local store before executing + // the caller's logic against the local store, `remote::ByteStore::load_bytes_with` no + // longer needs to accept a function. + let bytes = retry_call( + remote_store, + |remote_store| async move { remote_store.load_bytes_with(digest, Ok).await }, + |err| match err { + ByteStoreError::Grpc(status) => status_is_retryable(status), + _ => false, + }, ) - .into(), - ) - } + .await + .map_err(|err| match err { + ByteStoreError::Grpc(status) => status_to_str(status), + ByteStoreError::Other(msg) => msg, + })? 
+ .ok_or_else(|| { + StoreError::MissingDigest( + "Was not present in either the local or remote store".to_owned(), + digest, + ) + })?; + + f_remote(bytes.clone())?; + let stored_digest = local.store_bytes(entry_type, None, bytes, true).await?; + if digest == stored_digest { + Ok(()) + } else { + Err(StoreError::Unclassified(format!( + "CAS gave wrong digest: expected {:?}, got {:?}", + digest, stored_digest + ))) + } + }) + .await?; + + Ok( + self + .local + .load_bytes_with(entry_type, digest, f_local) + .await? + .ok_or_else(|| { + format!("After downloading {digest:?}, the local store claimed that it was not present.") + })??, + ) } /// @@ -700,45 +753,30 @@ impl Store { remote.list_missing_digests(request).await? }; - let uploaded_digests = { - // Here we best-effort avoid uploading common blobs multiple times. If a blob is generated - // that many downstream actions depend on, we would otherwise get an expanded set of digests - // from each of those actions that includes the new blob. If those actions all execute in a - // time window smaller than the time taken to upload the blob, the effort would be - // duplicated leading to both wasted resources locally buffering up the blob as well as - // wasted effort on the remote server depending on its handling of this. - let to_upload = remote_store.reserve_uploads(digests_to_upload); - let uploaded_digests_result = future::try_join_all( - to_upload - .clone() - .into_iter() - .map(|digest| { - let entry_type = ingested_digests[&digest]; - let local = store.local.clone(); - let remote = remote.clone(); - async move { - // TODO(John Sirois): Consider allowing configuration of when to buffer large blobs - // to disk to be independent of the remote store wire chunk size. 
- if digest.size_bytes > remote.chunk_size_bytes() { - Self::store_large_blob_remote(local, remote, entry_type, digest).await - } else { - Self::store_small_blob_remote(local, remote, entry_type, digest).await - } + future::try_join_all( + digests_to_upload + .iter() + .cloned() + .map(|digest| { + let entry_type = ingested_digests[&digest]; + let local = store.local.clone(); + let remote = remote.clone(); + remote_store.maybe_upload(digest, async move { + // TODO(John Sirois): Consider allowing configuration of when to buffer large blobs + // to disk to be independent of the remote store wire chunk size. + if digest.size_bytes > remote.chunk_size_bytes() { + Self::store_large_blob_remote(local, remote, entry_type, digest).await + } else { + Self::store_small_blob_remote(local, remote, entry_type, digest).await } - .map_ok(move |()| digest) }) - .collect::>(), - ) - .await; - // We release the uploads whether or not they actually succeeded. Future checks for large - // uploads will issue `find_missing_blobs_request`s that will eventually reconcile our - // accounting. In the mean-time we error on the side of at least once semantics. - remote_store.release_uploads(to_upload); - uploaded_digests_result? 
- }; + }) + .collect::>(), + ) + .await?; let ingested_file_sizes = ingested_digests.iter().map(|(digest, _)| digest.size_bytes); - let uploaded_file_sizes = uploaded_digests.iter().map(|digest| digest.size_bytes); + let uploaded_file_sizes = digests_to_upload.iter().map(|digest| digest.size_bytes); Ok(UploadSummary { ingested_file_count: ingested_file_sizes.len(), @@ -1361,7 +1399,10 @@ pub enum LocalMissingBehavior { impl SnapshotOps for Store { type Error = StoreError; - async fn load_file_bytes_with T + Send + Sync + 'static>( + async fn load_file_bytes_with< + T: Send + 'static, + F: Fn(&[u8]) -> T + Clone + Send + Sync + 'static, + >( &self, digest: Digest, f: F, diff --git a/src/rust/engine/fs/store/src/snapshot_ops.rs b/src/rust/engine/fs/store/src/snapshot_ops.rs index a29ada825b5..92b7fb7fdce 100644 --- a/src/rust/engine/fs/store/src/snapshot_ops.rs +++ b/src/rust/engine/fs/store/src/snapshot_ops.rs @@ -165,7 +165,10 @@ async fn render_merge_error( pub trait SnapshotOps: Clone + Send + Sync + 'static { type Error: Debug + Display + From; - async fn load_file_bytes_with T + Send + Sync + 'static>( + async fn load_file_bytes_with< + T: Send + 'static, + F: Fn(&[u8]) -> T + Clone + Send + Sync + 'static, + >( &self, digest: Digest, f: F,