Skip to content

Commit

Permalink
Fix Process output capture re: working_directory. (#12197)
Browse files Browse the repository at this point in the history
Previously our local Process output capturing did not match the REAPI
spec which led to local execution working and remote execution failing
whenever a Process execution used both `working_directory` and output
capturing.

Add a test that output capturing occurs relative to the
`working_directory` when set and fix our one combined use of
`working_directory` and output capturing in `archive.py`.

Fixes #12157
  • Loading branch information
jsirois authored Jun 12, 2021
1 parent b6ae6c0 commit 982de2a
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 31 deletions.
13 changes: 6 additions & 7 deletions src/python/pants/core/util_rules/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from dataclasses import dataclass
from enum import Enum

from pants.engine.fs import CreateDigest, Digest, Directory, MergeDigests, RemovePrefix, Snapshot
from pants.engine.fs import CreateDigest, Digest, Directory, MergeDigests, Snapshot
from pants.engine.process import (
BinaryNotFoundError,
BinaryPath,
Expand Down Expand Up @@ -170,10 +170,10 @@ async def maybe_extract_archive(
digest: Digest, tar_binary: TarBinary, unzip_binary: UnzipBinary
) -> ExtractedArchive:
"""If digest contains a single archive file, extract it, otherwise return the input digest."""
output_dir = "__output"
extract_archive_dir = "__extract_archive_dir"
snapshot, output_dir_digest = await MultiGet(
Get(Snapshot, Digest, digest),
Get(Digest, CreateDigest([Directory(output_dir)])),
Get(Digest, CreateDigest([Directory(extract_archive_dir)])),
)
if len(snapshot.files) != 1:
return ExtractedArchive(digest)
Expand All @@ -199,12 +199,11 @@ async def maybe_extract_archive(
input_digest=input_digest,
description=f"Extract {fp}",
level=LogLevel.DEBUG,
output_directories=(output_dir,),
working_directory=output_dir,
output_directories=(".",),
working_directory=extract_archive_dir,
),
)
strip_output_dir = await Get(Digest, RemovePrefix(result.output_digest, output_dir))
return ExtractedArchive(strip_output_dir)
return ExtractedArchive(result.output_digest)


def rules():
Expand Down
19 changes: 11 additions & 8 deletions src/python/pants/engine/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,24 @@ def __init__(
that are not explicitly populated. For example, $PATH will not be defined by default, unless
populated through the `env` parameter.
Usually, you will want to provide input files/directories via the parameter `input_digest`. The
process will then be able to access these paths through relative paths. If you want to give
multiple input digests, first merge them with `await Get(Digest, MergeDigests)`.
Usually, you will want to provide input files/directories via the parameter `input_digest`.
The process will then be able to access these paths through relative paths. If you want to
give multiple input digests, first merge them with `await Get(Digest, MergeDigests)`.
Often, you will want to capture the files/directories created in the process. To do this, you
can either set `output_files` or `output_directories`. The specified paths will then be used to
populate `output_digest` on the `ProcessResult`. If you want to split up this output digest
into multiple digests, use `await Get(Digest, DigestSubset)` on the `output_digest`.
Often, you will want to capture the files/directories created in the process. To do this,
you can either set `output_files` or `output_directories`. The specified paths should be
specified relative to the `working_directory`, if any, and will then be used to populate
`output_digest` on the `ProcessResult`. If you want to split up this output digest into
multiple digests, use `await Get(Digest, DigestSubset)` on the `output_digest`.
To actually run the process, use `await Get(ProcessResult, Process)` or
`await Get(FallibleProcessResult, Process)`.
Example:
result = await Get(ProcessResult, Process(["/bin/echo", "hello world"], description="demo"))
result = await Get(
ProcessResult, Process(["/bin/echo", "hello world"], description="demo")
)
assert result.stdout == b"hello world"
"""
if isinstance(argv, str):
Expand Down
32 changes: 20 additions & 12 deletions src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,13 @@ impl CommandRunner {
let output_paths: Result<Vec<String>, String> = output_dir_paths
.into_iter()
.flat_map(|p| {
let mut dir_glob = PathBuf::from(p).into_os_string();
let mut dir_glob = {
let mut dir = PathBuf::from(p).into_os_string();
if dir.is_empty() {
dir.push(".")
}
dir
};
let dir = dir_glob.clone();
dir_glob.push("/**");
vec![dir, dir_glob]
Expand Down Expand Up @@ -543,19 +549,21 @@ pub trait CapturedWorkdir {
let output_snapshot = if req.output_files.is_empty() && req.output_directories.is_empty() {
store::Snapshot::empty()
} else {
let root = if let Some(ref working_directory) = req.working_directory {
workdir_path.join(working_directory)
} else {
workdir_path.clone()
};
// Use no ignore patterns, because we are looking for explicitly listed paths.
let posix_fs = Arc::new(
fs::PosixFS::new(
workdir_path.clone(),
fs::GitignoreStyleExcludes::empty(),
executor.clone(),
)
.map_err(|err| {
format!(
"Error making posix_fs to fetch local process execution output files: {}",
err
)
})?,
fs::PosixFS::new(root, fs::GitignoreStyleExcludes::empty(), executor.clone()).map_err(
|err| {
format!(
"Error making posix_fs to fetch local process execution output files: {}",
err
)
},
)?,
);
CommandRunner::construct_output_snapshot(
store.clone(),
Expand Down
12 changes: 8 additions & 4 deletions src/rust/engine/process_execution/src/local_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use hashing::EMPTY_DIGEST;
use shell_quote::bash;
use spectral::{assert_that, string::StrAssertions};
use std;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use std::path::PathBuf;
use std::str;
use std::time::Duration;
Expand Down Expand Up @@ -567,8 +567,8 @@ async fn working_directory() {
let executor = task_executor::Executor::new();
let store = Store::local_only(executor.clone(), store_dir.path()).unwrap();

// Prepare the store to contain /cats/roland.ext, because the EPR needs to materialize it and then run
// from the ./cats directory.
// Prepare the store to contain /cats/roland.ext, because the EPR needs to materialize it and
// then run from the ./cats directory.
store
.store_file_bytes(TestData::roland().bytes(), false)
.await
Expand All @@ -586,6 +586,7 @@ async fn working_directory() {

let mut process = Process::new(vec![find_bash(), "-c".to_owned(), "/bin/ls".to_string()]);
process.working_directory = Some(RelativePath::new("cats").unwrap());
process.output_directories = relative_paths(&["roland.ext"]).collect::<BTreeSet<_>>();
process.input_files = TestDirectory::nested().digest();
process.timeout = one_second();
process.description = "confused-cat".to_string();
Expand All @@ -603,7 +604,10 @@ async fn working_directory() {
assert_eq!(result.stdout_bytes, "roland.ext\n".as_bytes());
assert_eq!(result.stderr_bytes, "".as_bytes());
assert_eq!(result.original.exit_code, 0);
assert_eq!(result.original.output_directory, EMPTY_DIGEST);
assert_eq!(
result.original.output_directory,
TestDirectory::containing_roland().digest()
);
assert_eq!(result.original.platform, Platform::current().unwrap());
}

Expand Down

0 comments on commit 982de2a

Please sign in to comment.