Skip to content

Commit

Permalink
Automount: neuro run --volume=ALL (#980)
Browse files Browse the repository at this point in the history
  • Loading branch information
Artem Yushkovskiy authored Sep 2, 2019
1 parent f73aafe commit 0518c51
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.D/974.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement mounting all available volumes: `neuro run --volume=ALL`.
85 changes: 69 additions & 16 deletions neuromation/cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@

log = logging.getLogger(__name__)

STORAGE_MOUNTPOINT = "/var/storage"
ROOT_MOUNTPOINT = "/var/neuro"

NEUROMATION_ROOT_ENV_VAR = "NEUROMATION_ROOT"
NEUROMATION_HOME_ENV_VAR = "NEUROMATION_HOME"
RESERVED_ENV_VARS = {NEUROMATION_ROOT_ENV_VAR, NEUROMATION_HOME_ENV_VAR}


def _get_neuro_mountpoint(username: str) -> str:
return f"{ROOT_MOUNTPOINT}/{username}"


def build_env(env: Sequence[str], env_file: Optional[str]) -> Dict[str, str]:
if env_file:
Expand All @@ -68,12 +79,16 @@ def build_env(env: Sequence[str], env_file: Optional[str]) -> Dict[str, str]:
env_dict = {}
for line in env:
splitted = line.split("=", 1)
name = splitted[0]
if len(splitted) == 1:
val = os.environ.get(splitted[0], "")
env_dict[splitted[0]] = val
else:
env_dict[splitted[0]] = splitted[1]

val = splitted[1]
if name in RESERVED_ENV_VARS:
raise click.UsageError(
f"Unable to re-define system-reserved environment variable: {name}"
)
env_dict[name] = val
return env_dict


Expand Down Expand Up @@ -847,18 +862,7 @@ async def run_job(
tpu_type=tpu_type,
tpu_software_version=tpu_software_version,
)

volumes: Set[Volume] = set()
for v in volume:
if v == "HOME":
volumes.add(root.client.parse.volume("storage://~:/var/storage/home:rw"))
volumes.add(
root.client.parse.volume(
"storage://neuromation/public:/var/storage/neuromation:ro"
)
)
else:
volumes.add(root.client.parse.volume(v))
volumes = await _build_volumes(root, volume, env_dict)

if pass_config:
if CONFIG_ENV_NAME in env_dict:
Expand Down Expand Up @@ -914,14 +918,63 @@ async def run_job(
return job


async def _build_volumes(
root: Root, input_volumes: Sequence[str], env_dict: Dict[str, str]
) -> Set[Volume]:
input_volumes_set = set(input_volumes)
volumes: Set[Volume] = set()

if "ALL" in input_volumes_set:
if len(input_volumes_set) > 1:
raise click.UsageError(
f"Cannot use `--volume=ALL` together with other `--volume` options"
)
available = await root.client.users.get_acl(root.username, scheme="storage")
volumes.update(
Volume(
storage_uri=perm.uri,
container_path=f"{ROOT_MOUNTPOINT}/{perm.uri.host}{perm.uri.path}",
read_only=perm.action not in ("write", "manage"),
)
for perm in available
)
neuro_mountpoint = _get_neuro_mountpoint(root.username)
env_dict[NEUROMATION_HOME_ENV_VAR] = neuro_mountpoint
env_dict[NEUROMATION_ROOT_ENV_VAR] = ROOT_MOUNTPOINT
if not root.quiet:
click.echo(
"Storage mountpoints will be available as the environment variables:\n"
f" {NEUROMATION_ROOT_ENV_VAR}={ROOT_MOUNTPOINT}\n"
f" {NEUROMATION_HOME_ENV_VAR}={neuro_mountpoint}"
)
else:
for vol in input_volumes_set:
if vol == "HOME":
volumes.add(
root.client.parse.volume(
f"storage://~:{STORAGE_MOUNTPOINT}/home:rw"
)
)
volumes.add(
root.client.parse.volume(
f"storage://neuromation/public:"
f"{STORAGE_MOUNTPOINT}/neuromation:ro"
)
)
# TODO (artem) print deprecation warning (issue #1009)
else:
volumes.add(root.client.parse.volume(vol))
return volumes


async def upload_and_map_config(root: Root) -> Tuple[str, Volume]:

# store the Neuro CLI config on the storage under some random path
nmrc_path = URL(root.config_path.expanduser().resolve().as_uri())
random_nmrc_filename = f"{uuid.uuid4()}-nmrc"
storage_nmrc_folder = URL(f"storage://{root.username}/nmrc/")
storage_nmrc_path = storage_nmrc_folder / random_nmrc_filename
local_nmrc_folder = "/var/storage/nmrc/"
local_nmrc_folder = f"{STORAGE_MOUNTPOINT}/nmrc/"
local_nmrc_path = f"{local_nmrc_folder}{random_nmrc_filename}"
if not root.quiet:
click.echo(f"Temporary config file created on storage: {storage_nmrc_path}.")
Expand Down
33 changes: 32 additions & 1 deletion tests/cli/test_job.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import logging
from pathlib import Path
from typing import Any, Tuple

import click
import pytest

from neuromation.api import JobStatus
from neuromation.cli.job import calc_statuses
from neuromation.cli.job import NEUROMATION_ROOT_ENV_VAR, build_env, calc_statuses


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -96,3 +97,33 @@ def test_calc_statuses__check_defaults__all_statuses_true(
assert not std.out
assert not std.err
assert not caplog.text


@pytest.mark.parametrize(
"env_var", [NEUROMATION_ROOT_ENV_VAR, f"{NEUROMATION_ROOT_ENV_VAR}=value"]
)
def test_build_env_reserved_env_var_conflict_passed_as_parameter(env_var: str) -> None:
env = ("ENV_VAR_1=value", "ENV_VAR_2=value", env_var)
with pytest.raises(
click.UsageError,
match="Unable to re-define system-reserved environment variable",
):
build_env(env, env_file=None)


@pytest.mark.parametrize(
"env_var", [NEUROMATION_ROOT_ENV_VAR, f"{NEUROMATION_ROOT_ENV_VAR}=value"]
)
def test_build_env_reserved_env_var_conflict_passed_in_file(
env_var: str, tmp_path: Path
) -> None:
env_1 = ("ENV_VAR_1=value",)
env_2 = ("ENV_VAR_2=value", env_var)
env_file = tmp_path / "env_var.txt"
env_file.write_text("\n".join(env_2))

with pytest.raises(
click.UsageError,
match="Unable to re-define system-reserved environment variable",
):
build_env(env_1, env_file=str(env_file))
2 changes: 1 addition & 1 deletion tests/e2e/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ async def wait_job_change_state_to(
async with api_get(timeout=CLIENT_TIMEOUT, path=self._nmrc_path) as client:
job = await client.jobs.status(job_id)
while target_state != job.status:
if stop_state == job.status:
if stop_state and stop_state == job.status:
raise JobWaitStateStopReached(
f"failed running job {job_id}: '{stop_state}'"
)
Expand Down
59 changes: 59 additions & 0 deletions tests/e2e/test_e2e_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1040,3 +1040,62 @@ def test_job_run_home_volumes_automount(helper: Helper, fakebrowser: Any) -> Non
params=("--volume", "HOME"),
wait_state=JobStatus.SUCCEEDED,
)


@pytest.mark.e2e
def test_job_run_volume_all(helper: Helper) -> None:
root_mountpoint = "/var/neuro"
cmd = " && ".join(
[
f"[ -d {root_mountpoint}/{helper.username} ]",
f"[ -d {root_mountpoint}/neuromation ]", # must be public
f"[ -d {root_mountpoint}/test2/public ]", # must be public
f"[ $NEUROMATION_ROOT == {root_mountpoint} ]",
f"[ $NEUROMATION_HOME == {root_mountpoint}/{helper.username} ]",
]
)
command = f"bash -c '{cmd}'"
img = UBUNTU_IMAGE_NAME

# first, run without --volume=ALL
captured = helper.run_cli(
["--quiet", "run", "--detach", "-s", "cpu-small", img, command]
)
job_id = captured.out
helper.wait_job_change_state_to(job_id, JobStatus.FAILED)

# then, run with --volume=ALL
captured = helper.run_cli(
["run", "--detach", "-s", "cpu-small", "--volume=ALL", img, command]
)
assert not captured.err
msg = (
"Storage mountpoints will be available as the environment variables:\n"
f" NEUROMATION_ROOT={root_mountpoint}\n"
f" NEUROMATION_HOME={root_mountpoint}/{helper.username}"
)
assert msg in captured.out
found_job_ids = re.findall("Job ID: (job-.+) Status:", captured.out)
assert len(found_job_ids) == 1
job_id = found_job_ids[0]
helper.wait_job_change_state_to(
job_id, JobStatus.SUCCEEDED, stop_state=JobStatus.FAILED
)


@pytest.mark.e2e
def test_job_run_volume_all_and_home(helper: Helper) -> None:
with pytest.raises(subprocess.CalledProcessError):
args = ["--volume", "ALL", "--volume", "HOME"]
captured = helper.run_cli(["job", "run", *args, UBUNTU_IMAGE_NAME, "sleep 30"])
msg = "Cannot use `--volume=ALL` together with other `--volume` options"
assert msg in captured.err


@pytest.mark.e2e
def test_job_run_volume_all_and_another(helper: Helper) -> None:
with pytest.raises(subprocess.CalledProcessError):
args = ["--volume", "ALL", "--volume", "storage::/home:ro"]
captured = helper.run_cli(["job", "run", *args, UBUNTU_IMAGE_NAME, "sleep 30"])
msg = "Cannot use `--volume=ALL` together with other `--volume` options"
assert msg in captured.err

0 comments on commit 0518c51

Please sign in to comment.