Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SpawnedJob.and_then API. #1435

Merged
merged 2 commits into from
Sep 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions pex/interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -1182,38 +1182,36 @@ def __repr__(self):


def spawn_python_job(
args, env=None, interpreter=None, expose=None, pythonpath=None, **subprocess_kwargs
args, # type: Iterable[str]
env=None, # type: Optional[Mapping[str, str]]
interpreter=None, # type: Optional[PythonInterpreter]
expose=None, # type: Optional[Iterable[str]]
pythonpath=None, # type: Optional[Iterable[str]]
**subprocess_kwargs # type: Any
):
# type: (...) -> Job
"""Spawns a python job.

:param args: The arguments to pass to the python interpreter.
:type args: list of str
:param env: The environment to spawn the python interpreter process in. Defaults to the ambient
environment.
:type env: dict of (str, str)
:param interpreter: The interpreter to use to spawn the python job. Defaults to the current
interpreter.
:type interpreter: :class:`PythonInterpreter`
:param expose: The names of any vendored distributions to expose to the spawned python process.
These will be appended to `pythonpath` if passed.
:type expose: list of str
:param pythonpath: The PYTHONPATH to expose to the spawned python process. These will be
pre-pended to the `expose` path if passed.
:type pythonpath: list of str
:param subprocess_kwargs: Any additional :class:`subprocess.Popen` kwargs to pass through.
:returns: A job handle to the spawned python process.
:rtype: :class:`Job`
"""
pythonpath = list(pythonpath or ())
subprocess_env = dict(env or os.environ)
if expose:
subprocess_env = (env or os.environ).copy()
# In order to expose vendored distributions with their un-vendored import paths in-tact, we
# need to set `__PEX_UNVENDORED__`. See: vendor.__main__.ImportRewriter._modify_import.
subprocess_env["__PEX_UNVENDORED__"] = "1"

pythonpath.extend(third_party.expose(expose))
else:
subprocess_env = env

interpreter = interpreter or PythonInterpreter.get()
cmd, process = interpreter.open_process(
Expand Down
32 changes: 24 additions & 8 deletions pex/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ def wait(self):
:raises: :class:`Job.Error` if the job exited non-zero.
"""
try:
self._process.wait()
self._check_returncode()
_, stderr = self._process.communicate()
self._check_returncode(stderr)
finally:
self._finalize_job()

Expand Down Expand Up @@ -178,22 +178,38 @@ def wait(
:return: A spawned job whose result is a side effect of the job (a written file, a populated
directory, etc.).
"""
return cls.and_then(job, lambda: result)

class Wait(SpawnedJob):
@classmethod
def and_then(
cls,
job, # type: Job
result_func, # type: Callable[[], _T]
):
# type: (...) -> SpawnedJob[_T]
"""Wait for the job to complete and return a result derived from its side effects.

:param job: The spawned job.
:param result_func: A function that will be called to produce the result upon job success.
:return: A spawned job whose result is derived from a side effect of the job (a written
file, a populated directory, etc.).
"""

class AndThen(SpawnedJob):
def await_result(self):
# type: () -> _T
job.wait()
return result
return result_func()

def kill(self):
# type: () -> None
job.kill()

def __repr__(self):
# type: () -> str
return "SpawnedJob.wait({!r})".format(job)
return "SpawnedJob.and_then({!r})".format(job)

return Wait()
return AndThen()

@classmethod
def stdout(
Expand All @@ -206,8 +222,8 @@ def stdout(
"""Wait for the job to complete and return a result derived from its stdout.

:param job: The spawned job.
:param result_func: A function taking the stdout byte string collected from the spawned job and
returning the desired result.
:param result_func: A function taking the stdout byte string collected from the spawned job
and returning the desired result.
:param input: Optional input stream data to pass to the process as per the
`subprocess.Popen.communicate` API.
:return: A spawned job whose result is derived from stdout contents.
Expand Down
8 changes: 5 additions & 3 deletions tests/test_bdist_pex.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@


def pex_project_dir():
# type: () -> Text
return subprocess.check_output(["git", "rev-parse", "--show-toplevel"]).decode("utf-8").strip()
# type: () -> str
return str(
subprocess.check_output(["git", "rev-parse", "--show-toplevel"]).decode("ascii").strip()
)


BDIST_PEX_PYTHONPATH = None


def bdist_pex_pythonpath():
# type: () -> List[Text]
# type: () -> List[str]
# In order to run the bdist_pex distutils command we need:
# 1. setuptools on the PYTHONPATH since the test projects use and test setuptools.setup and its
# additional features above and beyond distutils.core.setup like entry points declaration.
Expand Down
88 changes: 87 additions & 1 deletion tests/test_jobs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
# Copyright 2019 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from pex.jobs import _ABSOLUTE_MAX_JOBS, DEFAULT_MAX_JOBS, _sanitize_max_jobs
import json
import os
import subprocess
from textwrap import dedent

import pytest

from pex.interpreter import spawn_python_job
from pex.jobs import _ABSOLUTE_MAX_JOBS, DEFAULT_MAX_JOBS, Job, SpawnedJob, _sanitize_max_jobs
from pex.typing import TYPE_CHECKING, cast

if TYPE_CHECKING:
from typing import Any, Dict


def test_sanitize_max_jobs_none():
Expand All @@ -26,3 +38,77 @@ def test_sanitize_max_jobs_too_large():
assert _ABSOLUTE_MAX_JOBS == _sanitize_max_jobs(_ABSOLUTE_MAX_JOBS)
assert _ABSOLUTE_MAX_JOBS == _sanitize_max_jobs(_ABSOLUTE_MAX_JOBS + 1)
assert _ABSOLUTE_MAX_JOBS == _sanitize_max_jobs(_ABSOLUTE_MAX_JOBS + 5)


def create_error_job(exit_code):
# type: (int) -> Job
return spawn_python_job(args=["-c", "import sys; sys.exit({})".format(exit_code)])


def test_spawn_wait():
# type: () -> None
result = object()
assert (
result is SpawnedJob.wait(job=spawn_python_job(args=["-V"]), result=result).await_result()
)

spawned_job = SpawnedJob.wait(job=create_error_job(42), result=1 / 137)
with pytest.raises(Job.Error) as exec_info:
spawned_job.await_result()
assert 42 == exec_info.value.exitcode


def test_spawn_and_then(tmpdir):
# type: (Any) -> None
side_effect_file = os.path.join(str(tmpdir), "side.effect")

def observe_side_effect():
# type: () -> Dict[str, int]
with open(side_effect_file) as fp:
return cast("Dict[str, int]", json.load(fp))

assert (
{"exit_code": 42}
== SpawnedJob.and_then(
job=spawn_python_job(
args=[
"-c",
dedent(
"""\
import json

with open({side_effect_file!r}, "w") as fp:
json.dump({{"exit_code": 42}}, fp)
"""
).format(side_effect_file=side_effect_file),
]
),
result_func=observe_side_effect,
).await_result()
)

spawned_job = SpawnedJob.and_then(job=create_error_job(3), result_func=lambda: 1 / 137)
with pytest.raises(Job.Error) as exec_info:
spawned_job.await_result()
assert 3 == exec_info.value.exitcode


def test_spawn_stdout():
# type: () -> None
assert (
"Jane\n"
== SpawnedJob.stdout(
job=spawn_python_job(
args=["-c", "import sys; print(sys.stdin.read())"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
),
result_func=lambda stdout: stdout.decode("utf-8"),
input=b"Jane",
).await_result()
)

spawned_job = SpawnedJob.stdout(create_error_job(137), lambda output: 42)
with pytest.raises(Job.Error) as exec_info:
spawned_job.await_result()
assert 137 == exec_info.value.exitcode