Skip to content

Commit

Permalink
[naga xtask] Run validation jobs in parallel, using jobserver.
Browse files Browse the repository at this point in the history
  • Loading branch information
jimblandy committed Dec 26, 2023
1 parent 04fe2a6 commit cd6262d
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 23 deletions.
27 changes: 27 additions & 0 deletions naga/xtask/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions naga/xtask/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ env_logger = { version = "0.10.0", default-features = false }
glob = "0.3.1"
hlsl-snapshots = { path = "../hlsl-snapshots"}
indicatif = "0.17"
jobserver = "0.1"
log = "0.4.17"
num_cpus = "1.16"
pico-args = "0.5.0"
shell-words = "1.1.0"
which = "4.4.0"
Expand Down
34 changes: 34 additions & 0 deletions naga/xtask/src/jobserver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//! Running jobs in parallel, with a controlled degree of concurrency.
use std::sync::OnceLock;

use jobserver::Client;

static JOB_SERVER: OnceLock<Client> = OnceLock::new();

pub fn init() {
JOB_SERVER.get_or_init(|| {
// Try to connect to a jobserver inherited from our parent.
if let Some(client) = unsafe { Client::from_env() } {
log::debug!("connected to inherited jobserver client");
client
} else {
// Otherwise, start our own jobserver.
log::debug!("no inherited jobserver client; creating a new jobserver");
Client::new(num_cpus::get()).expect("failed to create jobserver")
}
});
}

/// Wait until it is okay to start a new job, and then spawn a thread running `body`.
pub fn start_job_thread<F>(body: F) -> anyhow::Result<()>
where
F: FnOnce() + Send + 'static,
{
let acquired = JOB_SERVER.get().unwrap().acquire()?;
std::thread::spawn(move || {
body();
drop(acquired);
});
Ok(())
}
3 changes: 3 additions & 0 deletions naga/xtask/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
mod cli;
mod fs;
mod glob;
mod jobserver;
mod path;
mod process;
mod result;
Expand All @@ -25,6 +26,8 @@ fn main() -> ExitCode {
.format_indent(Some(0))
.init();

jobserver::init();

let args = Args::parse();

match run(args) {
Expand Down
77 changes: 54 additions & 23 deletions naga/xtask/src/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,37 @@ use crate::{
process::{which, EasyCommand},
};

type Job = Box<dyn FnOnce() -> anyhow::Result<()> + Send + std::panic::UnwindSafe + 'static>;

pub(crate) fn validate(cmd: ValidateSubcommand) -> anyhow::Result<()> {
let mut jobs = vec![];
collect_validation_jobs(&mut jobs, cmd)?;

let progress_bar = indicatif::ProgressBar::new(jobs.len() as u64);

let (tx_results, rx_results) = std::sync::mpsc::channel();
let enqueuing_thread = std::thread::spawn(move || -> anyhow::Result<()> {
for job in jobs {
let tx_results = tx_results.clone();
crate::jobserver::start_job_thread(move || {
let result = match std::panic::catch_unwind(|| job()) {
Ok(result) => result,
Err(payload) => Err(match payload.downcast_ref::<&str>() {
Some(message) => {
anyhow::anyhow!("Validation job thread panicked: {}", message)
}
None => anyhow::anyhow!("Validation job thread panicked"),
}),
};
tx_results.send(result).unwrap();
})?;
}
Ok(())
});

let mut all_good = true;
for job in jobs {
if let Err(error) = job() {
for result in rx_results {
if let Err(error) = result {
all_good = false;
progress_bar.suspend(|| {
eprintln!("{:#}", error);
Expand All @@ -35,29 +57,12 @@ pub(crate) fn validate(cmd: ValidateSubcommand) -> anyhow::Result<()> {
if !all_good {
bail!("failed to validate one or more files, see above output for more details")
}
Ok(())
}

type Job = Box<dyn FnOnce() -> anyhow::Result<()>>;

fn push_job_for_each_file(
top_dir: impl AsRef<Path>,
pattern: impl AsRef<Path>,
jobs: &mut Vec<Job>,
f: impl FnOnce(std::path::PathBuf) -> anyhow::Result<()> + Clone + 'static,
) {
crate::glob::for_each_file(top_dir, pattern, move |path_result| {
// Let each job closure stand on its own.
let f = f.clone();
jobs.push(Box::new(|| f(path_result?)));
});
}

/// Call `f` to extend `jobs`, but if `f` itself fails, push a job that reports that.
fn try_push_job(jobs: &mut Vec<Job>, f: impl FnOnce(&mut Vec<Job>) -> anyhow::Result<()>) {
if let Err(error) = f(jobs) {
jobs.push(Box::new(|| Err(error)));
if let Err(error) = enqueuing_thread.join().unwrap() {
bail!("Error enqueuing jobs:\n{:#}", error);
}

Ok(())
}

fn collect_validation_jobs(jobs: &mut Vec<Job>, cmd: ValidateSubcommand) -> anyhow::Result<()> {
Expand Down Expand Up @@ -142,6 +147,30 @@ fn collect_validation_jobs(jobs: &mut Vec<Job>, cmd: ValidateSubcommand) -> anyh
Ok(())
}

fn push_job_for_each_file(
top_dir: impl AsRef<Path>,
pattern: impl AsRef<Path>,
jobs: &mut Vec<Job>,
f: impl FnOnce(std::path::PathBuf) -> anyhow::Result<()>
+ Clone
+ Send
+ std::panic::UnwindSafe
+ 'static,
) {
crate::glob::for_each_file(top_dir, pattern, move |path_result| {
// Let each job closure stand on its own.
let f = f.clone();
jobs.push(Box::new(|| f(path_result?)));
});
}

/// Call `f` to extend `jobs`, but if `f` itself fails, push a job that reports that.
fn try_push_job(jobs: &mut Vec<Job>, f: impl FnOnce(&mut Vec<Job>) -> anyhow::Result<()>) {
if let Err(error) = f(jobs) {
jobs.push(Box::new(|| Err(error)));
}
}

fn validate_spirv(path: &Path, spirv_as: &str, spirv_val: &str) -> anyhow::Result<()> {
let second_line = {
let mut file = BufReader::new(open_file(path)?);
Expand Down Expand Up @@ -233,6 +262,8 @@ fn push_job_for_each_hlsl_config_item(
jobs: &mut Vec<Job>,
validator: impl FnMut(&Path, hlsl_snapshots::ConfigItem, &str) -> anyhow::Result<()>
+ Clone
+ Send
+ std::panic::UnwindSafe
+ 'static,
) -> anyhow::Result<()> {
let hlsl_snapshots::Config {
Expand Down

0 comments on commit cd6262d

Please sign in to comment.