diff --git a/pex/interpreter.py b/pex/interpreter.py index faf8ab72e..49ec27f61 100644 --- a/pex/interpreter.py +++ b/pex/interpreter.py @@ -1182,38 +1182,36 @@ def __repr__(self): def spawn_python_job( - args, env=None, interpreter=None, expose=None, pythonpath=None, **subprocess_kwargs + args, # type: Iterable[str] + env=None, # type: Optional[Mapping[str, str]] + interpreter=None, # type: Optional[PythonInterpreter] + expose=None, # type: Optional[Iterable[str]] + pythonpath=None, # type: Optional[Iterable[str]] + **subprocess_kwargs # type: Any ): + # type: (...) -> Job """Spawns a python job. :param args: The arguments to pass to the python interpreter. - :type args: list of str :param env: The environment to spawn the python interpreter process in. Defaults to the ambient environment. - :type env: dict of (str, str) :param interpreter: The interpreter to use to spawn the python job. Defaults to the current interpreter. - :type interpreter: :class:`PythonInterpreter` :param expose: The names of any vendored distributions to expose to the spawned python process. These will be appended to `pythonpath` if passed. - :type expose: list of str :param pythonpath: The PYTHONPATH to expose to the spawned python process. These will be pre-pended to the `expose` path if passed. - :type pythonpath: list of str :param subprocess_kwargs: Any additional :class:`subprocess.Popen` kwargs to pass through. :returns: A job handle to the spawned python process. - :rtype: :class:`Job` """ pythonpath = list(pythonpath or ()) + subprocess_env = dict(env or os.environ) if expose: - subprocess_env = (env or os.environ).copy() # In order to expose vendored distributions with their un-vendored import paths in-tact, we # need to set `__PEX_UNVENDORED__`. See: vendor.__main__.ImportRewriter._modify_import. subprocess_env["__PEX_UNVENDORED__"] = "1" pythonpath.extend(third_party.expose(expose)) - else: - subprocess_env = env interpreter = interpreter or PythonInterpreter.get() cmd, process = interpreter.open_process( diff --git a/pex/jobs.py b/pex/jobs.py index 8d4411684..d6ee8313a 100644 --- a/pex/jobs.py +++ b/pex/jobs.py @@ -70,8 +70,8 @@ def wait(self): :raises: :class:`Job.Error` if the job exited non-zero. """ try: - self._process.wait() - self._check_returncode() + _, stderr = self._process.communicate() + self._check_returncode(stderr) finally: self._finalize_job() @@ -178,12 +178,28 @@ def wait( :return: A spawned job whose result is a side effect of the job (a written file, a populated directory, etc.). """ + return cls.and_then(job, lambda: result) - class Wait(SpawnedJob): + @classmethod + def and_then( + cls, + job, # type: Job + result_func, # type: Callable[[], _T] + ): + # type: (...) -> SpawnedJob[_T] + """Wait for the job to complete and return a result derived from its side effects. + + :param job: The spawned job. + :param result_func: A function that will be called to produce the result upon job success. + :return: A spawned job whose result is derived from a side effect of the job (a written + file, a populated directory, etc.). + """ + + class AndThen(SpawnedJob): def await_result(self): # type: () -> _T job.wait() - return result + return result_func() def kill(self): # type: () -> None @@ -191,9 +207,9 @@ def kill(self): def __repr__(self): # type: () -> str - return "SpawnedJob.wait({!r})".format(job) + return "SpawnedJob.and_then({!r})".format(job) - return Wait() + return AndThen() @classmethod def stdout( @@ -206,8 +222,8 @@ def stdout( """Wait for the job to complete and return a result derived from its stdout. :param job: The spawned job. - :param result_func: A function taking the stdout byte string collected from the spawned job and - returning the desired result. + :param result_func: A function taking the stdout byte string collected from the spawned job + and returning the desired result. :param input: Optional input stream data to pass to the process as per the `subprocess.Popen.communicate` API. :return: A spawned job whose result is derived from stdout contents. diff --git a/tests/test_bdist_pex.py b/tests/test_bdist_pex.py index 9a843f41f..e36d2a073 100644 --- a/tests/test_bdist_pex.py +++ b/tests/test_bdist_pex.py @@ -17,15 +17,17 @@ def pex_project_dir(): - # type: () -> Text - return subprocess.check_output(["git", "rev-parse", "--show-toplevel"]).decode("utf-8").strip() + # type: () -> str + return str( + subprocess.check_output(["git", "rev-parse", "--show-toplevel"]).decode("ascii").strip() + ) BDIST_PEX_PYTHONPATH = None def bdist_pex_pythonpath(): - # type: () -> List[Text] + # type: () -> List[str] # In order to run the bdist_pex distutils command we need: # 1. setuptools on the PYTHONPATH since the test projects use and test setuptools.setup and its # additional features above and beyond distutils.core.setup like entry points declaration. diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 16cbd7aa8..0ed9407bf 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -1,7 +1,19 @@ # Copyright 2019 Pants project contributors (see CONTRIBUTORS.md). # Licensed under the Apache License, Version 2.0 (see LICENSE). -from pex.jobs import _ABSOLUTE_MAX_JOBS, DEFAULT_MAX_JOBS, _sanitize_max_jobs +import json +import os +import subprocess +from textwrap import dedent + +import pytest + +from pex.interpreter import spawn_python_job +from pex.jobs import _ABSOLUTE_MAX_JOBS, DEFAULT_MAX_JOBS, Job, SpawnedJob, _sanitize_max_jobs +from pex.typing import TYPE_CHECKING, cast + +if TYPE_CHECKING: + from typing import Any, Dict def test_sanitize_max_jobs_none(): @@ -26,3 +38,77 @@ def test_sanitize_max_jobs_too_large(): assert _ABSOLUTE_MAX_JOBS == _sanitize_max_jobs(_ABSOLUTE_MAX_JOBS) assert _ABSOLUTE_MAX_JOBS == _sanitize_max_jobs(_ABSOLUTE_MAX_JOBS + 1) assert _ABSOLUTE_MAX_JOBS == _sanitize_max_jobs(_ABSOLUTE_MAX_JOBS + 5) + + +def create_error_job(exit_code): + # type: (int) -> Job + return spawn_python_job(args=["-c", "import sys; sys.exit({})".format(exit_code)]) + + +def test_spawn_wait(): + # type: () -> None + result = object() + assert ( + result is SpawnedJob.wait(job=spawn_python_job(args=["-V"]), result=result).await_result() + ) + + spawned_job = SpawnedJob.wait(job=create_error_job(42), result=1 / 137) + with pytest.raises(Job.Error) as exec_info: + spawned_job.await_result() + assert 42 == exec_info.value.exitcode + + +def test_spawn_and_then(tmpdir): + # type: (Any) -> None + side_effect_file = os.path.join(str(tmpdir), "side.effect") + + def observe_side_effect(): + # type: () -> Dict[str, int] + with open(side_effect_file) as fp: + return cast("Dict[str, int]", json.load(fp)) + + assert ( + {"exit_code": 42} + == SpawnedJob.and_then( + job=spawn_python_job( + args=[ + "-c", + dedent( + """\ + import json + + with open({side_effect_file!r}, "w") as fp: + json.dump({{"exit_code": 42}}, fp) + """ + ).format(side_effect_file=side_effect_file), + ] + ), + result_func=observe_side_effect, + ).await_result() + ) + + spawned_job = SpawnedJob.and_then(job=create_error_job(3), result_func=lambda: 1 / 137) + with pytest.raises(Job.Error) as exec_info: + spawned_job.await_result() + assert 3 == exec_info.value.exitcode + + +def test_spawn_stdout(): + # type: () -> None + assert ( + "Jane\n" + == SpawnedJob.stdout( + job=spawn_python_job( + args=["-c", "import sys; print(sys.stdin.read())"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + ), + result_func=lambda stdout: stdout.decode("utf-8"), + input=b"Jane", + ).await_result() + ) + + spawned_job = SpawnedJob.stdout(create_error_job(137), lambda output: 42) + with pytest.raises(Job.Error) as exec_info: + spawned_job.await_result() + assert 137 == exec_info.value.exitcode