From 24718ef259dc895a1751979b407dddc79ab7a4b6 Mon Sep 17 00:00:00 2001 From: Daniel Moran Date: Tue, 23 Aug 2022 18:23:47 -0400 Subject: [PATCH 1/2] Optimize `CreateDigest` implementation. * Use a `DigestTrie` to create all snapshots at once, instead of creating them individually * Store all in-memory file contents in a single (batched) call, instead of storing them individually [ci skip-build-wheels] --- src/rust/engine/fs/store/src/lib.rs | 17 ++++++ src/rust/engine/src/intrinsics.rs | 81 ++++++++++++++++------------- 2 files changed, 61 insertions(+), 37 deletions(-) diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index 2410b5026ff..534b5e499cc 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -365,6 +365,23 @@ impl Store { .await } + /// + /// A convenience method for storing batches of small files. + /// + /// NB: This method should not be used for large blobs: prefer to stream them from their source + /// using `store_file`. + /// + pub async fn store_file_bytes_batch( + &self, + items: Vec<(Option<Digest>, Bytes)>, + initial_lease: bool, + ) -> Result<Vec<Digest>, String> { + self + .local + .store_bytes_batch(EntryType::File, items, initial_lease) + .await + } + /// /// Store a file locally by streaming its contents. /// diff --git a/src/rust/engine/src/intrinsics.rs b/src/rust/engine/src/intrinsics.rs index bb3c4d37c8e..32fccb21b6f 100644 --- a/src/rust/engine/src/intrinsics.rs +++ b/src/rust/engine/src/intrinsics.rs @@ -1,6 +1,7 @@ // Copyright 2021 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). 
+use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::process::Stdio; use std::str::FromStr; @@ -18,14 +19,15 @@ use crate::tasks::Intrinsic; use crate::types::Types; use crate::Failure; -use futures::future::{self, BoxFuture, FutureExt, TryFutureExt}; +use bytes::Bytes; +use futures::future::{BoxFuture, FutureExt, TryFutureExt}; use futures::try_join; use indexmap::IndexMap; use pyo3::{PyAny, PyRef, Python, ToPyObject}; use tokio::process; -use fs::{DirectoryDigest, RelativePath}; -use hashing::Digest; +use fs::{DigestTrie, DirectoryDigest, PathStat, RelativePath}; +use hashing::{Digest, EMPTY_DIGEST}; use process_execution::local::{apply_chroot, create_sandbox, prepare_workdir, KeepSandboxes}; use process_execution::ManagedChild; use stdio::TryCloneAsFile; @@ -406,6 +408,8 @@ fn create_digest_to_digest( context: Context, args: Vec<Value>, ) -> BoxFuture<'static, NodeResult<Value>> { + let mut new_file_count = 0; + let items: Vec<CreateDigestItem> = { let gil = Python::acquire_gil(); let py = gil.python(); @@ -419,6 +423,7 @@ fn create_digest_to_digest( if obj.hasattr("content").unwrap() { let bytes = bytes::Bytes::from(externs::getattr::<Vec<u8>>(obj, "content").unwrap()); let is_executable: bool = externs::getattr(obj, "is_executable").unwrap(); + new_file_count += 1; CreateDigestItem::FileContent(path, bytes, is_executable) } else if obj.hasattr("file_digest").unwrap() { let py_file_digest: PyFileDigest = externs::getattr(obj, "file_digest").unwrap(); @@ -431,45 +436,47 @@ fn create_digest_to_digest( .collect() }; - // TODO: Rather than creating independent Digests and then merging them, this should use - // `DigestTrie::from_path_stats`. 
- // see https://github.com/pantsbuild/pants/pull/14569#issuecomment-1057286943 - let digest_futures: Vec<_> = items - .into_iter() - .map(|item| { - let store = context.core.store(); - async move { - match item { - CreateDigestItem::FileContent(path, bytes, is_executable) => { - let digest = store.store_file_bytes(bytes, true).await?; - let snapshot = store - .snapshot_of_one_file(path, digest, is_executable) - .await?; - let res: Result<DirectoryDigest, String> = Ok(snapshot.into()); - res - } - CreateDigestItem::FileEntry(path, digest, is_executable) => { - let snapshot = store - .snapshot_of_one_file(path, digest, is_executable) - .await?; - let res: Result<_, String> = Ok(snapshot.into()); - res - } - CreateDigestItem::Dir(path) => store - .create_empty_dir(&path) - .await - .map_err(|e| e.to_string()), - } + let mut path_stats: Vec<PathStat> = Vec::with_capacity(items.len()); + let mut file_digests: HashMap<PathBuf, Digest> = HashMap::with_capacity(items.len()); + let mut bytes_to_store: Vec<(Option<Digest>, Bytes)> = Vec::with_capacity(new_file_count); + + for item in items { + match item { + CreateDigestItem::FileContent(path, bytes, is_executable) => { + let digest = Digest::of_bytes(&bytes); + bytes_to_store.push((Some(digest), bytes)); + let stat = fs::File { + path: path.to_path_buf(), + is_executable, + }; + path_stats.push(PathStat::file(path.to_path_buf(), stat)); + file_digests.insert(path.to_path_buf(), digest); } - }) - .collect(); + CreateDigestItem::FileEntry(path, digest, is_executable) => { + let stat = fs::File { + path: path.to_path_buf(), + is_executable, + }; + path_stats.push(PathStat::file(path.to_path_buf(), stat)); + file_digests.insert(path.to_path_buf(), digest); + } + CreateDigestItem::Dir(path) => { + let stat = fs::Dir(path.to_path_buf()); + path_stats.push(PathStat::dir(path.to_path_buf(), stat)); + file_digests.insert(path.to_path_buf(), EMPTY_DIGEST); + } + } + } let store = context.core.store(); async move { - let digests = future::try_join_all(digest_futures).await?; - let digest = 
store.merge(digests).await?; + let trie = DigestTrie::from_path_stats(path_stats, &file_digests)?; + let store_trie = store.record_digest_trie(trie, true); + let store_bytes = store.store_file_bytes_batch(bytes_to_store, true); + let (trie_digest, _) = try_join!(store_trie, store_bytes)?; + let gil = Python::acquire_gil(); - let value = Snapshot::store_directory_digest(gil.python(), digest)?; + let value = Snapshot::store_directory_digest(gil.python(), trie_digest)?; Ok(value) } .boxed() From a2959313f1cb3caf9cf06dcfe26b31d9d40457e8 Mon Sep 17 00:00:00 2001 From: Daniel Moran Date: Tue, 23 Aug 2022 18:54:20 -0400 Subject: [PATCH 2/2] Skip persisting `DirectoryDigest` to disk. It's not strictly needed, and skipping it can avoid a lot of IO. [ci skip-build-wheels] --- src/rust/engine/src/intrinsics.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/rust/engine/src/intrinsics.rs b/src/rust/engine/src/intrinsics.rs index 32fccb21b6f..ae2224c6283 100644 --- a/src/rust/engine/src/intrinsics.rs +++ b/src/rust/engine/src/intrinsics.rs @@ -470,13 +470,12 @@ fn create_digest_to_digest( let store = context.core.store(); async move { + // The digests returned here are already in the `file_digests` map. + let _ = store.store_file_bytes_batch(bytes_to_store, true).await?; let trie = DigestTrie::from_path_stats(path_stats, &file_digests)?; - let store_trie = store.record_digest_trie(trie, true); - let store_bytes = store.store_file_bytes_batch(bytes_to_store, true); - let (trie_digest, _) = try_join!(store_trie, store_bytes)?; let gil = Python::acquire_gil(); - let value = Snapshot::store_directory_digest(gil.python(), trie_digest)?; + let value = Snapshot::store_directory_digest(gil.python(), trie.into())?; Ok(value) } .boxed()