Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automount: neuro run --volume=ALL #980

Merged
merged 9 commits into from
Sep 2, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.D/974.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement mounting all available volumes: `neuro run --volume=ALL`.
93 changes: 73 additions & 20 deletions neuromation/cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@

log = logging.getLogger(__name__)

STORAGE_MOUNTPOINT = "/var/storage"
ROOT_MOUNTPOINT = "/var/neuro"

NEUROMATION_ROOT_ENV_VAR = "NEUROMATION_ROOT"
NEUROMATION_HOME_ENV_VAR = "NEUROMATION_HOME"
RESERVED_ENV_VARS = {NEUROMATION_ROOT_ENV_VAR, NEUROMATION_HOME_ENV_VAR}


def _get_neuro_mountpoint(username: str) -> str:
return f"{ROOT_MOUNTPOINT}/{username}"


def build_env(env: Sequence[str], env_file: Optional[str]) -> Dict[str, str]:
if env_file:
Expand All @@ -68,12 +79,16 @@ def build_env(env: Sequence[str], env_file: Optional[str]) -> Dict[str, str]:
env_dict = {}
for line in env:
splitted = line.split("=", 1)
name = splitted[0]
if len(splitted) == 1:
val = os.environ.get(splitted[0], "")
env_dict[splitted[0]] = val
else:
env_dict[splitted[0]] = splitted[1]

val = splitted[1]
if name in RESERVED_ENV_VARS:
raise click.UsageError(
f"Unable to re-define system-reserved environment variable: {name}"
)
env_dict[name] = val
return env_dict


Expand Down Expand Up @@ -176,8 +191,8 @@ def job() -> None:
multiple=True,
help="Mounts directory from vault into container. "
"Use multiple options to mount more than one volume. "
"--volume=HOME is an alias for storage://~:/var/storage/home:rw and "
"storage://neuromation/public:/var/storage/neuromation:ro",
f"--volume=HOME is an alias for storage://~:/var/storage/home:rw and "
atemate marked this conversation as resolved.
Show resolved Hide resolved
f"storage://neuromation/public:/var/storage/neuromation:ro",
)
@click.option(
"--entrypoint",
Expand Down Expand Up @@ -647,8 +662,8 @@ def format_fail(job: str, reason: Exception) -> str:
multiple=True,
help="Mounts directory from vault into container. "
"Use multiple options to mount more than one volume. "
"--volume=HOME is an alias for storage://~:/var/storage/home:rw and "
"storage://neuromation/public:/var/storage/neuromation:ro",
f"--volume=HOME is an alias for storage://~:/var/storage/home:rw and "
f"storage://neuromation/public:/var/storage/neuromation:ro",
)
@click.option(
"--entrypoint",
Expand Down Expand Up @@ -845,18 +860,7 @@ async def run_job(
tpu_type=tpu_type,
tpu_software_version=tpu_software_version,
)

volumes: Set[Volume] = set()
for v in volume:
if v == "HOME":
volumes.add(root.client.parse.volume("storage://~:/var/storage/home:rw"))
volumes.add(
root.client.parse.volume(
"storage://neuromation/public:/var/storage/neuromation:ro"
)
)
else:
volumes.add(root.client.parse.volume(v))
volumes = await _build_volumes(root, volume, env_dict)

if pass_config:
if CONFIG_ENV_NAME in env_dict:
Expand Down Expand Up @@ -912,14 +916,63 @@ async def run_job(
return job


async def _build_volumes(
root: Root, input_volumes: Sequence[str], env_dict: Dict[str, str]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are input_volumes unique?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, it's direct user input.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we handle edge cases such as same volumes etc? (prob out of scope)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking at the code, I can say that this case is handled properly: the result collection volumes is of type Set (but in order to reduce number of loop iterations, I changed input array to the type of Set as well).

However, this is a good case indeed, neuro run --volume=ALL --volume=<custom>, so since --volume=ALL already mounts all possible volumes, I changed the code to forbid this option together with any other option -- thank you for pointing this out!

) -> Set[Volume]:
input_volumes_set = set(input_volumes)
volumes: Set[Volume] = set()

if "ALL" in input_volumes_set:
if len(input_volumes_set) > 1:
raise click.UsageError(
f"Cannot use `--volume=ALL` together with other `--volume` options"
)
available = await root.client.users.get_acl(root.username, scheme="storage")
volumes.update(
Volume(
storage_uri=perm.uri,
container_path=f"{ROOT_MOUNTPOINT}/{perm.uri.host}{perm.uri.path}",
read_only=perm.action not in ("write", "manage"),
)
for perm in available
)
neuro_mountpoint = _get_neuro_mountpoint(root.username)
env_dict[NEUROMATION_HOME_ENV_VAR] = neuro_mountpoint
env_dict[NEUROMATION_ROOT_ENV_VAR] = ROOT_MOUNTPOINT
if not root.quiet:
click.echo(
"Storage mountpoints will be available as the environment variables:\n"
f" {NEUROMATION_ROOT_ENV_VAR}={ROOT_MOUNTPOINT}\n"
f" {NEUROMATION_HOME_ENV_VAR}={neuro_mountpoint}"
)
else:
for vol in input_volumes_set:
if vol == "HOME":
volumes.add(
root.client.parse.volume(
f"storage://~:{STORAGE_MOUNTPOINT}/home:rw"
)
)
volumes.add(
root.client.parse.volume(
f"storage://neuromation/public:"
f"{STORAGE_MOUNTPOINT}/neuromation:ro"
)
)
# TODO (artem) print deprecation warning (issue #1009)
else:
volumes.add(root.client.parse.volume(vol))
return volumes


async def upload_and_map_config(root: Root) -> Tuple[str, Volume]:

# store the Neuro CLI config on the storage under some random path
nmrc_path = URL(root.config_path.expanduser().resolve().as_uri())
random_nmrc_filename = f"{uuid.uuid4()}-nmrc"
storage_nmrc_folder = URL(f"storage://{root.username}/nmrc/")
storage_nmrc_path = storage_nmrc_folder / random_nmrc_filename
local_nmrc_folder = "/var/storage/nmrc/"
local_nmrc_folder = f"{STORAGE_MOUNTPOINT}/nmrc/"
local_nmrc_path = f"{local_nmrc_folder}{random_nmrc_filename}"
if not root.quiet:
click.echo(f"Temporary config file created on storage: {storage_nmrc_path}.")
Expand Down
33 changes: 32 additions & 1 deletion tests/cli/test_job.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import logging
from pathlib import Path
from typing import Any, Tuple

import click
import pytest

from neuromation.api import JobStatus
from neuromation.cli.job import calc_statuses
from neuromation.cli.job import NEUROMATION_ROOT_ENV_VAR, build_env, calc_statuses


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -96,3 +97,33 @@ def test_calc_statuses__check_defaults__all_statuses_true(
assert not std.out
assert not std.err
assert not caplog.text


@pytest.mark.parametrize(
"env_var", [NEUROMATION_ROOT_ENV_VAR, f"{NEUROMATION_ROOT_ENV_VAR}=value"]
)
def test_build_env_reserved_env_var_conflict_passed_as_parameter(env_var: str) -> None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

apart from these two tests, build_env is not covered

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we properly cover it then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. but not in this PR. Issue: #995

env = ("ENV_VAR_1=value", "ENV_VAR_2=value", env_var)
with pytest.raises(
click.UsageError,
match="Unable to re-define system-reserved environment variable",
):
build_env(env, env_file=None)


@pytest.mark.parametrize(
"env_var", [NEUROMATION_ROOT_ENV_VAR, f"{NEUROMATION_ROOT_ENV_VAR}=value"]
)
def test_build_env_reserved_env_var_conflict_passed_in_file(
env_var: str, tmp_path: Path
) -> None:
env_1 = ("ENV_VAR_1=value",)
env_2 = ("ENV_VAR_2=value", env_var)
env_file = tmp_path / "env_var.txt"
env_file.write_text("\n".join(env_2))

with pytest.raises(
click.UsageError,
match="Unable to re-define system-reserved environment variable",
):
build_env(env_1, env_file=str(env_file))
2 changes: 1 addition & 1 deletion tests/e2e/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ async def wait_job_change_state_to(
async with api_get(timeout=CLIENT_TIMEOUT, path=self._nmrc_path) as client:
job = await client.jobs.status(job_id)
while target_state != job.status:
if stop_state == job.status:
if stop_state and stop_state == job.status:
raise JobWaitStateStopReached(
f"failed running job {job_id}: '{stop_state}'"
)
Expand Down
59 changes: 59 additions & 0 deletions tests/e2e/test_e2e_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1040,3 +1040,62 @@ def test_job_run_home_volumes_automount(helper: Helper, fakebrowser: Any) -> Non
params=("--volume", "HOME"),
wait_state=JobStatus.SUCCEEDED,
)


@pytest.mark.e2e
def test_job_run_volume_all(helper: Helper) -> None:
root_mountpoint = "/var/neuro"
cmd = " && ".join(
[
f"[ -d {root_mountpoint}/{helper.username} ]",
f"[ -d {root_mountpoint}/neuromation ]", # must be public
f"[ -d {root_mountpoint}/test2/public ]", # must be public
f"[ $NEUROMATION_ROOT == {root_mountpoint} ]",
f"[ $NEUROMATION_HOME == {root_mountpoint}/{helper.username} ]",
]
)
command = f"bash -c '{cmd}'"
img = UBUNTU_IMAGE_NAME

# first, run without --volume=ALL
captured = helper.run_cli(
["--quiet", "run", "--detach", "-s", "cpu-small", img, command]
)
job_id = captured.out
helper.wait_job_change_state_to(job_id, JobStatus.FAILED)

# then, run with --volume=ALL
captured = helper.run_cli(
["run", "--detach", "-s", "cpu-small", "--volume=ALL", img, command]
)
assert not captured.err
msg = (
"Storage mountpoints will be available as the environment variables:\n"
f" NEUROMATION_ROOT={root_mountpoint}\n"
f" NEUROMATION_HOME={root_mountpoint}/{helper.username}"
)
assert msg in captured.out
found_job_ids = re.findall("Job ID: (job-.+) Status:", captured.out)
assert len(found_job_ids) == 1
job_id = found_job_ids[0]
helper.wait_job_change_state_to(
job_id, JobStatus.SUCCEEDED, stop_state=JobStatus.FAILED
)


@pytest.mark.e2e
def test_job_run_volume_all_and_home(helper: Helper) -> None:
with pytest.raises(subprocess.CalledProcessError):
args = ["--volume", "ALL", "--volume", "HOME"]
captured = helper.run_cli(["job", "run", *args, UBUNTU_IMAGE_NAME, "sleep 30"])
msg = "Cannot use `--volume=ALL` together with other `--volume` options"
assert msg in captured.err


@pytest.mark.e2e
def test_job_run_volume_all_and_another(helper: Helper) -> None:
with pytest.raises(subprocess.CalledProcessError):
args = ["--volume", "ALL", "--volume", "storage::/home:ro"]
captured = helper.run_cli(["job", "run", *args, UBUNTU_IMAGE_NAME, "sleep 30"])
msg = "Cannot use `--volume=ALL` together with other `--volume` options"
assert msg in captured.err