Skip to content

Commit

Permalink
Dedupe load_bytes_with calls to a remote Store (Cherry-pick of #15901
Browse files Browse the repository at this point in the history
…) (#15915)

As described in #15524: `remote::ByteStore::load_bytes_with` calls are not deduped currently, meaning that if multiple consumers identify a `Digest` which is missing from the local store, they might concurrently fetch it from the remote store.

This is primarily an issue with `--remote-cache-eager-fetch=false`, as the laziness means that all consumers of a process output might consider whether to download it simultaneously (rather than the output always being downloaded before the process is called complete).

Fixes #15524.

[ci skip-build-wheels]
  • Loading branch information
stuhood authored Jun 24, 2022
1 parent f7b5ec4 commit 69a10c7
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 102 deletions.
243 changes: 142 additions & 101 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ mod snapshot_ops_tests;
mod snapshot_tests;
pub use crate::snapshot_ops::{SnapshotOps, SubsetParams};

use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, HashMap};
use std::fmt::{self, Debug, Display};
use std::fs::OpenOptions;
use std::future::Future;
use std::io::{self, Read, Write};
use std::os::unix::fs::{OpenOptionsExt, PermissionsExt};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};

use async_oncecell::OnceCell;
Expand Down Expand Up @@ -148,35 +149,78 @@ pub struct UploadSummary {
pub upload_wall_time: Duration,
}

///
/// Wraps a `remote::ByteStore` with state to help avoid uploading common blobs multiple times.
///
/// If a blob is generated that many downstream actions depend on, we would otherwise get an
/// expanded set of digests from each of those actions that includes the new blob. If those actions
/// all execute in a time window smaller than the time taken to upload the blob, the effort would be
/// duplicated leading to both wasted resources locally buffering up the blob as well as wasted
/// effort on the remote server depending on its handling of this.
///
#[derive(Clone, Debug)]
struct RemoteStore {
store: remote::ByteStore,
in_flight_uploads: Arc<parking_lot::Mutex<HashSet<Digest>>>,
in_flight_uploads: Arc<Mutex<HashMap<Digest, Weak<OnceCell<()>>>>>,
in_flight_downloads: Arc<Mutex<HashMap<Digest, Weak<OnceCell<()>>>>>,
}

impl RemoteStore {
fn new(store: remote::ByteStore) -> Self {
Self {
store,
in_flight_uploads: Arc::new(parking_lot::Mutex::new(HashSet::new())),
in_flight_uploads: Arc::default(),
in_flight_downloads: Arc::default(),
}
}

fn reserve_uploads(&self, candidates: HashSet<Digest>) -> HashSet<Digest> {
let mut active_uploads = self.in_flight_uploads.lock();
let to_upload = candidates
.difference(&active_uploads)
.cloned()
.collect::<HashSet<_>>();
active_uploads.extend(&to_upload);
to_upload
///
/// Fetches the live cell for `digest` from a map of weakly held cells, installing a fresh one
/// if no live cell exists.
///
/// Holding the returned `Arc` keeps the cell alive; once every holder has dropped it, the
/// `Weak` entry left behind in the map dangles and a later call will replace it.
///
fn cell_from(
  cells: &Mutex<HashMap<Digest, Weak<OnceCell<()>>>>,
  digest: Digest,
) -> Arc<OnceCell<()>> {
  let mut guard = cells.lock();
  match guard.get(&digest).and_then(Weak::upgrade) {
    Some(live_cell) => live_cell,
    None => {
      let fresh = Arc::new(OnceCell::new());
      guard.insert(digest, Arc::downgrade(&fresh));
      fresh
    }
  }
}

fn release_uploads(&self, uploads: HashSet<Digest>) {
self
.in_flight_uploads
.lock()
.retain(|d| !uploads.contains(d));
///
/// Guards an attempt to upload the given `Digest`, skipping the upload if another attempt has
/// been successful. Will not return until either an attempt has succeeded, or this attempt has
/// failed.
///
/// Concurrent callers for the same `Digest` share a single `OnceCell`, so at most one `upload`
/// future runs at a time; if it succeeds, later callers return `Ok(())` without re-uploading.
///
async fn maybe_upload<E>(
&self,
digest: Digest,
upload: impl Future<Output = Result<(), E>>,
) -> Result<(), E> {
Self::cell_from(&self.in_flight_uploads, digest)
.get_or_try_init(upload)
.await
// The cell stores `()`: only success/failure matters, so discard the borrowed unit.
.map(|&()| ())
}

///
/// Guards an attempt to download the given `Digest`, skipping the download if another attempt
/// has been successful. Will not return until either an attempt has succeeded, or this attempt
/// has failed.
///
/// Concurrent callers for the same `Digest` share a single `OnceCell`, so at most one
/// `download` future runs at a time; if it succeeds, later callers return `Ok(())` without
/// re-downloading.
///
async fn maybe_download<E>(
  &self,
  digest: Digest,
  // Renamed from `upload`: this future performs the download attempt.
  download: impl Future<Output = Result<(), E>>,
) -> Result<(), E> {
  Self::cell_from(&self.in_flight_downloads, digest)
    .get_or_try_init(download)
    .await
    // The cell stores `()`: only success/failure matters, so discard the borrowed unit.
    .map(|&()| ())
}
}

Expand Down Expand Up @@ -386,23 +430,19 @@ impl Store {
///
pub async fn load_file_bytes_with<
T: Send + 'static,
F: Fn(&[u8]) -> T + Send + Sync + 'static,
F: Fn(&[u8]) -> T + Clone + Send + Sync + 'static,
>(
&self,
digest: Digest,
f: F,
) -> Result<T, StoreError> {
// No transformation or verification is needed for files, so we pass in a pair of functions
// which always succeed, whether the underlying bytes are coming from a local or remote store.
// Unfortunately, we need to be a little verbose to do this.
let f_local = Arc::new(f);
let f_remote = f_local.clone();
// No transformation or verification is needed for files.
self
.load_bytes_with(
EntryType::File,
digest,
move |v: &[u8]| Ok(f_local(v)),
move |v: Bytes| Ok(f_remote(&v)),
move |v: &[u8]| Ok(f(v)),
|_: Bytes| Ok(()),
)
.await
}
Expand Down Expand Up @@ -571,7 +611,7 @@ impl Store {
)
})?;
protos::verify_directory_canonical(digest, &directory)?;
Ok(directory)
Ok(())
},
)
.await
Expand Down Expand Up @@ -599,8 +639,8 @@ impl Store {
///
async fn load_bytes_with<
T: Send + 'static,
FLocal: Fn(&[u8]) -> Result<T, String> + Send + Sync + 'static,
FRemote: Fn(Bytes) -> Result<T, String> + Send + Sync + 'static,
FLocal: Fn(&[u8]) -> Result<T, String> + Clone + Send + Sync + 'static,
FRemote: Fn(Bytes) -> Result<(), String> + Send + Sync + 'static,
>(
&self,
entry_type: EntryType,
Expand All @@ -613,51 +653,64 @@ impl Store {

if let Some(bytes_res) = self
.local
.load_bytes_with(entry_type, digest, f_local)
.load_bytes_with(entry_type, digest, f_local.clone())
.await?
{
return Ok(bytes_res?);
}

let remote = maybe_remote
.ok_or_else(|| {
StoreError::MissingDigest("Was not present in the local store".to_owned(), digest)
})?
.store;

let bytes = retry_call(
remote,
|remote| async move { remote.load_bytes_with(digest, Ok).await },
|err| match err {
ByteStoreError::Grpc(status) => status_is_retryable(status),
_ => false,
},
)
.await
.map_err(|err| match err {
ByteStoreError::Grpc(status) => status_to_str(status),
ByteStoreError::Other(msg) => msg,
})?
.ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in either the local or remote store".to_owned(),
digest,
)
let remote = maybe_remote.ok_or_else(|| {
StoreError::MissingDigest("Was not present in the local store".to_owned(), digest)
})?;
let remote_store = remote.store.clone();

let value = f_remote(bytes.clone())?;
let stored_digest = local.store_bytes(entry_type, None, bytes, true).await?;
if digest == stored_digest {
Ok(value)
} else {
Err(
format!(
"CAS gave wrong digest: expected {:?}, got {:?}",
digest, stored_digest
remote
.maybe_download(digest, async move {
// TODO: Now that we always copy from the remote store to the local store before executing
// the caller's logic against the local store, `remote::ByteStore::load_bytes_with` no
// longer needs to accept a function.
let bytes = retry_call(
remote_store,
|remote_store| async move { remote_store.load_bytes_with(digest, Ok).await },
|err| match err {
ByteStoreError::Grpc(status) => status_is_retryable(status),
_ => false,
},
)
.into(),
)
}
.await
.map_err(|err| match err {
ByteStoreError::Grpc(status) => status_to_str(status),
ByteStoreError::Other(msg) => msg,
})?
.ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in either the local or remote store".to_owned(),
digest,
)
})?;

f_remote(bytes.clone())?;
let stored_digest = local.store_bytes(entry_type, None, bytes, true).await?;
if digest == stored_digest {
Ok(())
} else {
Err(StoreError::Unclassified(format!(
"CAS gave wrong digest: expected {:?}, got {:?}",
digest, stored_digest
)))
}
})
.await?;

Ok(
self
.local
.load_bytes_with(entry_type, digest, f_local)
.await?
.ok_or_else(|| {
format!("After downloading {digest:?}, the local store claimed that it was not present.")
})??,
)
}

///
Expand Down Expand Up @@ -700,45 +753,30 @@ impl Store {
remote.list_missing_digests(request).await?
};

let uploaded_digests = {
// Here we best-effort avoid uploading common blobs multiple times. If a blob is generated
// that many downstream actions depend on, we would otherwise get an expanded set of digests
// from each of those actions that includes the new blob. If those actions all execute in a
// time window smaller than the time taken to upload the blob, the effort would be
// duplicated leading to both wasted resources locally buffering up the blob as well as
// wasted effort on the remote server depending on its handling of this.
let to_upload = remote_store.reserve_uploads(digests_to_upload);
let uploaded_digests_result = future::try_join_all(
to_upload
.clone()
.into_iter()
.map(|digest| {
let entry_type = ingested_digests[&digest];
let local = store.local.clone();
let remote = remote.clone();
async move {
// TODO(John Sirois): Consider allowing configuration of when to buffer large blobs
// to disk to be independent of the remote store wire chunk size.
if digest.size_bytes > remote.chunk_size_bytes() {
Self::store_large_blob_remote(local, remote, entry_type, digest).await
} else {
Self::store_small_blob_remote(local, remote, entry_type, digest).await
}
future::try_join_all(
digests_to_upload
.iter()
.cloned()
.map(|digest| {
let entry_type = ingested_digests[&digest];
let local = store.local.clone();
let remote = remote.clone();
remote_store.maybe_upload(digest, async move {
// TODO(John Sirois): Consider allowing configuration of when to buffer large blobs
// to disk to be independent of the remote store wire chunk size.
if digest.size_bytes > remote.chunk_size_bytes() {
Self::store_large_blob_remote(local, remote, entry_type, digest).await
} else {
Self::store_small_blob_remote(local, remote, entry_type, digest).await
}
.map_ok(move |()| digest)
})
.collect::<Vec<_>>(),
)
.await;
// We release the uploads whether or not they actually succeeded. Future checks for large
// uploads will issue `find_missing_blobs_request`s that will eventually reconcile our
// accounting. In the mean-time we error on the side of at least once semantics.
remote_store.release_uploads(to_upload);
uploaded_digests_result?
};
})
.collect::<Vec<_>>(),
)
.await?;

let ingested_file_sizes = ingested_digests.iter().map(|(digest, _)| digest.size_bytes);
let uploaded_file_sizes = uploaded_digests.iter().map(|digest| digest.size_bytes);
let uploaded_file_sizes = digests_to_upload.iter().map(|digest| digest.size_bytes);

Ok(UploadSummary {
ingested_file_count: ingested_file_sizes.len(),
Expand Down Expand Up @@ -1361,7 +1399,10 @@ pub enum LocalMissingBehavior {
impl SnapshotOps for Store {
type Error = StoreError;

async fn load_file_bytes_with<T: Send + 'static, F: Fn(&[u8]) -> T + Send + Sync + 'static>(
async fn load_file_bytes_with<
T: Send + 'static,
F: Fn(&[u8]) -> T + Clone + Send + Sync + 'static,
>(
&self,
digest: Digest,
f: F,
Expand Down
5 changes: 4 additions & 1 deletion src/rust/engine/fs/store/src/snapshot_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,10 @@ async fn render_merge_error<T: SnapshotOps + 'static>(
pub trait SnapshotOps: Clone + Send + Sync + 'static {
type Error: Debug + Display + From<String>;

async fn load_file_bytes_with<T: Send + 'static, F: Fn(&[u8]) -> T + Send + Sync + 'static>(
async fn load_file_bytes_with<
T: Send + 'static,
F: Fn(&[u8]) -> T + Clone + Send + Sync + 'static,
>(
&self,
digest: Digest,
f: F,
Expand Down

0 comments on commit 69a10c7

Please sign in to comment.