Skip to content

Commit

Permalink
Parallel watchman queries for saved state and changed files
Browse files Browse the repository at this point in the history
Reviewed By: tyao1

Differential Revision: D58544752

fbshipit-source-id: d82d53543e6b77d83733b49b7680caaee32dd53d
  • Loading branch information
gordyf authored and facebook-github-bot committed Jun 14, 2024
1 parent 7a6f4f6 commit 232985f
Showing 1 changed file with 113 additions and 76 deletions.
189 changes: 113 additions & 76 deletions compiler/crates/relay-compiler/src/file_source/watchman_file_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::errors::Result;
use crate::saved_state::SavedStateLoader;

pub struct WatchmanFileSource {
client: Client,
client: Arc<Client>,
config: Arc<Config>,
resolved_root: ResolvedRoot,
}
Expand All @@ -53,7 +53,7 @@ impl WatchmanFileSource {
resolved_root
);
Ok(Self {
client,
client: Arc::new(client),
config: config.clone(),
resolved_root,
})
Expand All @@ -74,9 +74,14 @@ impl WatchmanFileSource {
CompilerState::deserialize_from_file(saved_state_path)
})?;
let query_timer = perf_logger_event.start("watchman_query_time");
let file_source_result = self
.query_file_result(compiler_state.clock.clone(), false)
.await?;
let file_source_result = query_file_result(
&self.config,
&self.client,
&self.resolved_root.clone(),
compiler_state.clock.clone(),
false,
)
.await?;
perf_logger_event.stop(query_timer);
compiler_state
.pending_file_source_changes
Expand Down Expand Up @@ -136,7 +141,14 @@ impl WatchmanFileSource {
perf_logger_event: &impl PerfLogEvent,
perf_logger: &impl PerfLogger,
) -> Result<CompilerState> {
let file_source_result = self.query_file_result(None, false).await?;
let file_source_result = query_file_result(
&self.config,
&self.client,
&self.resolved_root.clone(),
None,
false,
)
.await?;
let compiler_state = perf_logger_event.time("from_file_source_changes", || {
CompilerState::from_file_source_changes(
&self.config,
Expand All @@ -160,9 +172,14 @@ impl WatchmanFileSource {
let expression = get_watchman_expr(&self.config);

let query_timer = perf_logger_event.start("watchman_query_time_before_subscribe");
let file_source_result = self
.query_file_result(compiler_state.clock.clone(), true)
.await?;
let file_source_result = query_file_result(
&self.config,
&self.client,
&self.resolved_root.clone(),
compiler_state.clock.clone(),
true,
)
.await?;
perf_logger_event.stop(query_timer);

let query_timer = perf_logger_event.start("watchman_query_time_subscribe");
Expand All @@ -188,65 +205,6 @@ impl WatchmanFileSource {
))
}

/// Internal method to issue a watchman query, returning a raw
/// WatchmanFileSourceResult.
async fn query_file_result(
&self,
since_clock: Option<Clock>,
omit_changed_files: bool,
) -> Result<FileSourceResult> {
let expression = get_watchman_expr(&self.config);
debug!(
"WatchmanFileSource::query_file_result(...) get_watchman_expr = {:?}",
&expression
);

// If `since` is available, we should not pass the `path` parameter.
// Watchman ignores `since` parameter if both `path` and `since` are
// passed as the request params
let request = if since_clock.is_some() {
QueryRequestCommon {
omit_changed_files,
empty_on_fresh_instance: omit_changed_files,
expression: Some(expression),
since: since_clock,
..Default::default()
}
} else {
let query_roots = get_all_roots(&self.config)
.into_iter()
.map(PathGeneratorElement::RecursivePath)
.collect();
QueryRequestCommon {
omit_changed_files,
empty_on_fresh_instance: omit_changed_files,
expression: Some(expression),
path: Some(query_roots),
..Default::default()
}
};
debug!(
"WatchmanFileSource::query_file_result(...) request = {:?}",
&request
);
let query_result = self
.client
.query::<WatchmanFile>(&self.resolved_root, request)
.await?;

// print debug information for this result
// (file list will include only files with specified extension)
debug_query_results(&query_result, "graphql");

let files = query_result.files.ok_or(Error::EmptyQueryResult)?;
Ok(FileSourceResult::Watchman(WatchmanFileSourceResult {
files,
resolved_root: self.resolved_root.clone(),
clock: query_result.clock,
saved_state_info: query_result.saved_state_info,
}))
}

/// Tries to load saved state with a watchman query.
/// The return value is a nested Result:
/// The outer Result indicates the result of a possible saved state infrastructure failure.
Expand All @@ -259,6 +217,7 @@ impl WatchmanFileSource {
saved_state_loader: &'_ (dyn SavedStateLoader + Send + Sync),
saved_state_version: &str,
) -> std::result::Result<Result<CompilerState>, &'static str> {
let try_saved_state_event = perf_logger_event.start("try_saved_state_time");
let scm_since = Clock::ScmAware(FatClockData {
clock: ClockSpec::null(),
scm: Some(saved_state_config),
Expand All @@ -267,18 +226,32 @@ impl WatchmanFileSource {
"WatchmanFileSource::try_saved_state(...) scm_since = {:?}",
&scm_since
);
let query_timer = perf_logger_event.start("watchman_query_time_try_saved_state");
let file_source_result = self
.query_file_result(Some(scm_since), false)

// Issue two watchman queries: One to get the saved state info, and one to get the changed files.
// We'll download and deserialize saved state from manifold while the second watchman query executes.

let since = Some(scm_since.clone());
let root = self.resolved_root.clone();
let saved_state_result = query_file_result(&self.config, &self.client, &root, since, true)
.await
.map_err(|_| "query failed")?;
perf_logger_event.stop(query_timer);

let since = Some(scm_since.clone());
let config = Arc::clone(&self.config);
let client = Arc::clone(&self.client);
let root = self.resolved_root.clone();
let changed_files_result_future = tokio::task::spawn(async move {
query_file_result(&config, &client, &root, since, false)
.await
.map_err(|_| "query failed")
});

// First, use saved state query to download saved state from manifold.
debug!(
"WatchmanFileSource::try_saved_state(...) file_source_result = {:?}",
&file_source_result
"WatchmanFileSource::try_saved_state(...) saved_state_result = {:?}",
&saved_state_result
);
let saved_state_info = file_source_result
let saved_state_info = saved_state_result
.saved_state_info()
.as_ref()
.ok_or("no saved state in watchman response")?;
Expand Down Expand Up @@ -308,6 +281,12 @@ impl WatchmanFileSource {
{
return Err("Saved state version doesn't match.");
}

// Then await the changed files query.
let file_source_result = changed_files_result_future
.await
.map_err(|_| "query failed")??;

compiler_state
.pending_file_source_changes
.write()
Expand All @@ -321,7 +300,9 @@ impl WatchmanFileSource {
}

if let Err(parse_error) = perf_logger_event.time("merge_file_source_changes", || {
compiler_state.merge_file_source_changes(&self.config, perf_logger, true)
let result = compiler_state.merge_file_source_changes(&self.config, perf_logger, true);
perf_logger_event.stop(try_saved_state_event);
result
}) {
Ok(Err(parse_error))
} else {
Expand All @@ -330,6 +311,62 @@ impl WatchmanFileSource {
}
}

async fn query_file_result(
config: &Config,
client: &Client,
resolved_root: &ResolvedRoot,
since_clock: Option<Clock>,
omit_changed_files: bool,
) -> Result<FileSourceResult> {
let expression = get_watchman_expr(config);
debug!(
"WatchmanFileSource::query_file_result(...) get_watchman_expr = {:?}",
&expression
);

// If `since` is available, we should not pass the `path` parameter.
// Watchman ignores `since` parameter if both `path` and `since` are
// passed as the request params
let request = if since_clock.is_some() {
QueryRequestCommon {
omit_changed_files,
empty_on_fresh_instance: omit_changed_files,
expression: Some(expression),
since: since_clock,
..Default::default()
}
} else {
let query_roots = get_all_roots(config)
.into_iter()
.map(PathGeneratorElement::RecursivePath)
.collect();
QueryRequestCommon {
omit_changed_files,
empty_on_fresh_instance: omit_changed_files,
expression: Some(expression),
path: Some(query_roots),
..Default::default()
}
};
debug!(
"WatchmanFileSource::query_file_result(...) request = {:?}",
&request
);
let query_result = client.query::<WatchmanFile>(resolved_root, request).await?;

// print debug information for this result
// (file list will include only files with specified extension)
debug_query_results(&query_result, "graphql");

let files = query_result.files.ok_or(Error::EmptyQueryResult)?;
Ok(FileSourceResult::Watchman(WatchmanFileSourceResult {
files,
resolved_root: resolved_root.clone(),
clock: query_result.clock,
saved_state_info: query_result.saved_state_info,
}))
}

fn debug_query_results(query_result: &QueryResult<WatchmanFile>, extension_filter: &str) {
if let Ok(rust_log) = std::env::var("RUST_LOG") {
if rust_log == *"debug" {
Expand Down

0 comments on commit 232985f

Please sign in to comment.