Implement a batched local lookup for missing fingerprints. (Cherry-pick of #16627) #16701

Merged 1 commit on Aug 30, 2022.
17 changes: 4 additions & 13 deletions src/rust/engine/fs/store/src/lib.rs
@@ -905,19 +905,10 @@ impl Store {
     }
 
     // Filter out file digests that exist locally.
-    // TODO: Implement a local batch API: see https://github.com/pantsbuild/pants/issues/16400.
-    let local_file_exists = future::try_join_all(
-      file_digests
-        .iter()
-        .map(|file_digest| self.local.exists(EntryType::File, *file_digest))
-        .collect::<Vec<_>>(),
-    )
-    .await?;
-    let missing_locally = local_file_exists
-      .into_iter()
-      .zip(file_digests.into_iter())
-      .filter_map(|(exists, digest)| if exists { None } else { Some(digest) })
-      .collect::<Vec<_>>();
+    let missing_locally = self
+      .local
+      .get_missing_digests(EntryType::File, file_digests)
+      .await?;
 
     // If there are any digests which don't exist locally, check remotely.
     if missing_locally.is_empty() {
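For context on the caller-side change above: the old code built one `exists` future per digest and joined them all, while the new code makes a single `get_missing_digests` call. A minimal, self-contained sketch of the two shapes — `ToyStore`, `Digest`, and their methods are illustrative stand-ins, not the Pants API:

```rust
use std::collections::HashSet;

use futures::executor::block_on;
use futures::future::try_join_all;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct Digest(u64);

struct ToyStore {
  present: HashSet<Digest>,
}

impl ToyStore {
  // One store round-trip per digest: the shape the PR removes.
  async fn exists(&self, digest: Digest) -> Result<bool, String> {
    Ok(self.present.contains(&digest))
  }

  // One call for the whole collection: the shape the PR introduces.
  async fn get_missing_digests(
    &self,
    digests: HashSet<Digest>,
  ) -> Result<HashSet<Digest>, String> {
    Ok(
      digests
        .into_iter()
        .filter(|digest| !self.present.contains(digest))
        .collect(),
    )
  }
}

fn main() -> Result<(), String> {
  let store = ToyStore {
    present: HashSet::from([Digest(1), Digest(2)]),
  };
  let file_digests = vec![Digest(1), Digest(2), Digest(3)];

  // Before: join one `exists` future per digest, then zip/filter.
  let local_file_exists = block_on(try_join_all(
    file_digests.iter().map(|digest| store.exists(*digest)),
  ))?;
  let missing_old: Vec<Digest> = local_file_exists
    .into_iter()
    .zip(file_digests.iter().copied())
    .filter_map(|(exists, digest)| if exists { None } else { Some(digest) })
    .collect();

  // After: a single batched lookup.
  let missing_new = block_on(store.get_missing_digests(file_digests.into_iter().collect()))?;

  assert_eq!(missing_old, vec![Digest(3)]);
  assert_eq!(missing_new, HashSet::from([Digest(3)]));
  Ok(())
}
```

Beyond saving the per-future overhead, the batched form lets the store group work internally, which is what the `sharded_lmdb` changes below exploit per shard.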
39 changes: 31 additions & 8 deletions src/rust/engine/fs/store/src/local.rs
@@ -1,6 +1,6 @@
 use super::{EntryType, ShrinkBehavior};
 
-use std::collections::BinaryHeap;
+use std::collections::{BinaryHeap, HashSet};
 use std::fmt::Debug;
 use std::io::{self, Read};
 use std::path::Path;
@@ -338,18 +338,41 @@ impl ByteStore {
       .await
   }
 
-  pub async fn exists(&self, entry_type: EntryType, digest: Digest) -> Result<bool, String> {
-    if digest == EMPTY_DIGEST {
-      // Avoid I/O for this case. This allows some client-provided operations (like merging
-      // snapshots) to work without needing to first store the empty snapshot.
-      return Ok(true);
-    }
+  ///
+  /// Given a collection of Digests (digests),
+  /// returns the set of digests from that collection not present in the
+  /// underlying LMDB store.
+  ///
+  pub async fn get_missing_digests(
+    &self,
+    entry_type: EntryType,
+    digests: HashSet<Digest>,
+  ) -> Result<HashSet<Digest>, String> {
+    let fingerprints_to_check = digests
+      .iter()
+      .filter_map(|digest| {
+        // Avoid I/O for this case. This allows some client-provided operations (like
+        // merging snapshots) to work without needing to first store the empty snapshot.
+        if *digest == EMPTY_DIGEST {
+          None
+        } else {
+          Some(digest.hash)
+        }
+      })
+      .collect::<Vec<_>>();
 
     let dbs = match entry_type {
       EntryType::Directory => self.inner.directory_dbs.clone(),
       EntryType::File => self.inner.file_dbs.clone(),
     }?;
-    dbs.exists(digest.hash).await
+
+    let existing = dbs.exists_batch(fingerprints_to_check).await?;
+
+    let missing = digests
+      .into_iter()
+      .filter(|digest| *digest != EMPTY_DIGEST && !existing.contains(&digest.hash))
+      .collect::<HashSet<_>>();
+    Ok(missing)
   }
 
   ///
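One subtlety in `get_missing_digests` worth noting: `EMPTY_DIGEST` is exempted twice. It is dropped from `fingerprints_to_check` so it never costs any I/O, and the final filter also treats it as always present, so it can never be reported missing. A standalone sketch of just that filtering logic — the `Digest` and `Fingerprint` types here are simplified stand-ins for the real ones:

```rust
use std::collections::HashSet;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct Fingerprint(u64);

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct Digest {
  hash: Fingerprint,
}

// Stand-in for the real EMPTY_DIGEST constant.
const EMPTY_DIGEST: Digest = Digest { hash: Fingerprint(0) };

fn missing_digests(
  digests: HashSet<Digest>,
  existing_in_store: &HashSet<Fingerprint>,
) -> HashSet<Digest> {
  // Mirrors the new method: EMPTY_DIGEST is skipped before any store lookup,
  // and the final filter re-applies the same exemption.
  digests
    .into_iter()
    .filter(|digest| *digest != EMPTY_DIGEST && !existing_in_store.contains(&digest.hash))
    .collect()
}

fn main() {
  let in_store = HashSet::from([Fingerprint(1)]);
  let asked = HashSet::from([
    EMPTY_DIGEST,
    Digest { hash: Fingerprint(1) },
    Digest { hash: Fingerprint(2) },
  ]);
  // Only Fingerprint(2) is missing; EMPTY_DIGEST counts as always present.
  assert_eq!(
    missing_digests(asked, &in_store),
    HashSet::from([Digest { hash: Fingerprint(2) }])
  );
}
```

This preserves the behavior of the old `exists` fast path, which returned `Ok(true)` for the empty digest so that client-provided operations like snapshot merging work before the empty snapshot has been stored.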
74 changes: 60 additions & 14 deletions src/rust/engine/sharded_lmdb/src/lib.rs
@@ -25,7 +25,7 @@
 // Arc<Mutex> can be more clear than needing to grok Orderings:
 #![allow(clippy::mutex_atomic)]
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::fmt;
 use std::fmt::Debug;
 use std::io::{self, Read};
@@ -327,25 +327,71 @@ impl ShardedLmdb {
       .await
   }
 
+  ///
+  /// Singular form of `Self::exists_batch`. When checking the existence of more than one item,
+  /// prefer `Self::exists_batch`.
+  ///
   pub async fn exists(&self, fingerprint: Fingerprint) -> Result<bool, String> {
+    let existing = self.exists_batch(vec![fingerprint]).await?;
+    Ok(existing.contains(&fingerprint))
+  }
+
+  ///
+  /// Determine which of the given Fingerprints are already present in the store,
+  /// returning them as a set.
+  ///
+  pub async fn exists_batch(
+    &self,
+    fingerprints: Vec<Fingerprint>,
+  ) -> Result<HashSet<Fingerprint>, String> {
     let store = self.clone();
-    let effective_key = VersionedFingerprint::new(fingerprint, ShardedLmdb::SCHEMA_VERSION);
     self
       .executor
       .spawn_blocking(move || {
-        let fingerprint = effective_key.get_fingerprint();
-        let (env, db, _) = store.get(&fingerprint);
-        let txn = env
-          .begin_ro_txn()
-          .map_err(|err| format!("Failed to begin read transaction: {:?}", err))?;
-        match txn.get(db, &effective_key) {
-          Ok(_) => Ok(true),
-          Err(lmdb::Error::NotFound) => Ok(false),
-          Err(err) => Err(format!(
-            "Error reading from store when checking existence of {}: {}",
-            fingerprint, err
-          )),
+        // Group the items by the Environment that they will be applied to.
+        let mut items_by_env = HashMap::new();
+        let mut exists = HashSet::new();
+
+        for fingerprint in &fingerprints {
+          let effective_key = VersionedFingerprint::new(*fingerprint, ShardedLmdb::SCHEMA_VERSION);
+          let (env_id, _, env, db, _) = store.get_raw(&fingerprint.0);
+
+          let (_, _, batch) = items_by_env
+            .entry(*env_id)
+            .or_insert_with(|| (env.clone(), *db, vec![]));
+          batch.push(effective_key);
         }
+
+        // Open and commit a Transaction per Environment. Since we never have more than one
+        // Transaction open at a time, we don't have to worry about ordering.
+        for (_, (env, db, batch)) in items_by_env {
+          env
+            .begin_ro_txn()
+            .and_then(|txn| {
+              for effective_key in &batch {
+                let get_res = txn.get(db, &effective_key);
+                match get_res {
+                  Ok(_) => {
+                    exists.insert(effective_key.get_fingerprint());
+                  }
+                  Err(lmdb::Error::NotFound) => (),
+                  Err(err) => return Err(err),
+                };
+              }
+              txn.commit()
+            })
+            .map_err(|e| {
+              format!(
+                "Error checking existence of fingerprints {:?}: {}",
+                batch
+                  .iter()
+                  .map(|key| key.get_fingerprint())
+                  .collect::<Vec<_>>(),
+                e
+              )
+            })?;
+        }
+        Ok(exists)
       })
       .await
   }
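The heart of `exists_batch` is the grouping step: fingerprints are bucketed by the LMDB Environment (shard) that owns them, so the store opens one read transaction per shard rather than one lookup per key, and never holds more than one transaction open at a time. A rough sketch of that pattern, with toy types standing in for the real LMDB environments and with a hypothetical `shard_id` routing function in place of `store.get_raw`:

```rust
use std::collections::{HashMap, HashSet};

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct Fingerprint(u8);

// Stand-in for one LMDB Environment: here, just a set of stored keys.
struct Shard {
  stored: HashSet<Fingerprint>,
}

const NUM_SHARDS: usize = 4;

// Stand-in for the store's key-to-shard routing: pick a shard by key prefix.
fn shard_id(fingerprint: &Fingerprint) -> usize {
  (fingerprint.0 as usize) % NUM_SHARDS
}

fn exists_batch(shards: &[Shard], fingerprints: Vec<Fingerprint>) -> HashSet<Fingerprint> {
  // Group the keys by the shard that will be queried for them.
  let mut by_shard: HashMap<usize, Vec<Fingerprint>> = HashMap::new();
  for fingerprint in fingerprints {
    by_shard.entry(shard_id(&fingerprint)).or_default().push(fingerprint);
  }

  // One pass per shard (standing in for one read transaction per
  // Environment), with at most one "transaction" active at a time.
  let mut exists = HashSet::new();
  for (id, batch) in by_shard {
    for fingerprint in batch {
      if shards[id].stored.contains(&fingerprint) {
        exists.insert(fingerprint);
      }
    }
  }
  exists
}

fn main() {
  let shards: Vec<Shard> = (0..NUM_SHARDS)
    .map(|i| Shard {
      stored: HashSet::from([Fingerprint(i as u8)]),
    })
    .collect();
  let present = exists_batch(&shards, vec![Fingerprint(0), Fingerprint(3), Fingerprint(9)]);
  assert_eq!(present, HashSet::from([Fingerprint(0), Fingerprint(3)]));
}
```

Keeping at most one transaction open at a time is what lets the real loop ignore lock-ordering concerns, as the comment in the diff notes.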