diff --git a/CHANGELOG.D/879.feature b/CHANGELOG.D/879.feature new file mode 100644 index 000000000..2875e5fff --- /dev/null +++ b/CHANGELOG.D/879.feature @@ -0,0 +1 @@ +Stabilize jobs API \ No newline at end of file diff --git a/neuromation/api/__init__.py b/neuromation/api/__init__.py index afc17827e..e3d0259f0 100644 --- a/neuromation/api/__init__.py +++ b/neuromation/api/__init__.py @@ -26,7 +26,6 @@ from .jobs import ( Container, HTTPPort, - Image, JobDescription, JobStatus, JobStatusHistory, @@ -45,7 +44,6 @@ "DEFAULT_CONFIG_PATH", "CONFIG_ENV_NAME", "DockerImageOperation", - "Image", "ImageNameParser", "JobDescription", "JobStatus", diff --git a/neuromation/api/client.py b/neuromation/api/client.py index 90f7a09e1..859bfadc5 100644 --- a/neuromation/api/client.py +++ b/neuromation/api/client.py @@ -42,7 +42,7 @@ def __init__( self._core = _Core( connector, self._config.url, self._config.auth_token.token, cookie, timeout ) - self._jobs = Jobs._create(self._core, self._config) + self._jobs = Jobs._create(self._core, self._config, self.username) self._storage = Storage._create(self._core, self._config) self._users = Users._create(self._core) self._images: Optional[Images] = None diff --git a/neuromation/api/jobs.py b/neuromation/api/jobs.py index 3139793c8..848b42769 100644 --- a/neuromation/api/jobs.py +++ b/neuromation/api/jobs.py @@ -3,7 +3,7 @@ import json import shlex from dataclasses import dataclass, field -from typing import Any, AsyncIterator, Dict, List, Optional, Sequence, Set +from typing import Any, AsyncIterator, Dict, List, Mapping, Optional, Sequence, Set import async_timeout import attr @@ -24,46 +24,13 @@ class Resources: memory_mb: int cpu: float gpu: Optional[int] - shm: Optional[bool] gpu_model: Optional[str] - - @classmethod - def create( - cls, - cpu: float, - gpu: Optional[int], - gpu_model: Optional[str], - memory: int, - extshm: bool, - ) -> "Resources": - return cls(memory, cpu, gpu, extshm, gpu_model) - - def to_api(self) -> Dict[str, Any]: - value = {"memory_mb": self.memory_mb, "cpu": self.cpu, "shm": self.shm} - if self.gpu: - value["gpu"] = self.gpu - value["gpu_model"] = self.gpu_model # type: ignore - return value - - @classmethod - def from_api(cls, data: Dict[str, Any]) -> "Resources": - return Resources( - memory_mb=data["memory_mb"], - cpu=data["cpu"], - shm=data.get("shm", None), - gpu=data.get("gpu", None), - gpu_model=data.get("gpu_model", None), - ) - - -@dataclass(frozen=True) -class Image: - image: str - command: Optional[str] + shm: Optional[bool] class JobStatus(str, enum.Enum): """An Enum subclass that represents job statuses. + PENDING: a job is being created and scheduled. This includes finding (and possibly waiting for) sufficient amount of resources, pulling an image from a registry etc. @@ -86,61 +53,12 @@ class Volume: container_path: str read_only: bool - def to_api(self) -> Dict[str, Any]: - resp: Dict[str, Any] = { - "src_storage_uri": self.storage_path, - "dst_path": self.container_path, - "read_only": bool(self.read_only), - } - return resp - - @classmethod - def from_api(cls, data: Dict[str, Any]) -> "Volume": - storage_path = data["src_storage_uri"] - container_path = data["dst_path"] - read_only = data.get("read_only", True) - return Volume( - storage_path=storage_path, - container_path=container_path, - read_only=read_only, - ) - - @classmethod - def from_cli(cls, username: str, volume: str) -> "Volume": - parts = volume.split(":") - - read_only = False - if len(parts) == 4: - if parts[-1] not in ["ro", "rw"]: - raise ValueError(f"Wrong ReadWrite/ReadOnly mode spec for '{volume}'") - read_only = parts.pop() == "ro" - elif len(parts) != 3: - raise ValueError(f"Invalid volume specification '{volume}'") - - container_path = parts.pop() - storage_path = normalize_storage_path_uri(URL(":".join(parts)), username) - - return Volume( - storage_path=str(storage_path), - container_path=container_path, - read_only=read_only, - ) - @dataclass(frozen=True) class HTTPPort: port: int requires_auth: bool = True - def to_api(self) -> Dict[str, Any]: - return {"port": self.port, "requires_auth": self.requires_auth} - - @classmethod - def from_api(cls, data: Dict[str, Any]) -> "HTTPPort": - return HTTPPort( - port=data.get("port", -1), requires_auth=data.get("requires_auth", False) - ) - @dataclass(frozen=True) class Container: @@ -148,36 +66,9 @@ class Container: resources: Resources command: Optional[str] = None http: Optional[HTTPPort] = None - # TODO (ASvetlov): replace mutable Dict and List with immutable Mapping and Sequence - env: Dict[str, str] = field(default_factory=dict) + env: Mapping[str, str] = field(default_factory=dict) volumes: Sequence[Volume] = field(default_factory=list) - @classmethod - def from_api(cls, data: Dict[str, Any]) -> "Container": - return Container( - image=data["image"], - resources=Resources.from_api(data["resources"]), - command=data.get("command", None), - http=HTTPPort.from_api(data["http"]) if "http" in data else None, - env=data.get("env", dict()), - volumes=[Volume.from_api(v) for v in data.get("volumes", [])], - ) - - def to_api(self) -> Dict[str, Any]: - primitive: Dict[str, Any] = { - "image": self.image, - "resources": self.resources.to_api(), - } - if self.command: - primitive["command"] = self.command - if self.http: - primitive["http"] = self.http.to_api() - if self.env: - primitive["env"] = self.env - if self.volumes: - primitive["volumes"] = [v.to_api() for v in self.volumes] - return primitive - @dataclass(frozen=True) class JobStatusHistory: @@ -206,48 +97,6 @@ class JobDescription: ssh_server: URL = URL() internal_hostname: Optional[str] = None - def jump_host(self) -> Optional[str]: - ssh_hostname = self.ssh_server.host - if ssh_hostname is None: - return None - ssh_hostname = ".".join(ssh_hostname.split(".")[1:]) - return ssh_hostname - - @classmethod - def from_api(cls, res: Dict[str, Any]) -> "JobDescription": - container = Container.from_api(res["container"]) - owner = res["owner"] - name = res.get("name") - description = res.get("description") - history = JobStatusHistory( - status=JobStatus(res["history"].get("status", "unknown")), - reason=res["history"].get("reason", ""), - description=res["history"].get("description", ""), - created_at=res["history"].get("created_at", ""), - started_at=res["history"].get("started_at", ""), - finished_at=res["history"].get("finished_at", ""), - exit_code=res["history"].get("exit_code"), - ) - http_url = URL(res.get("http_url", "")) - http_url_named = URL(res.get("http_url_named", "")) - ssh_server = URL(res.get("ssh_server", "")) - internal_hostname = res.get("internal_hostname", None) - return JobDescription( - status=JobStatus(res["status"]), - id=res["id"], - owner=owner, - history=history, - container=container, - is_preemptible=res["is_preemptible"], - name=name, - description=description, - http_url=http_url, - http_url_named=http_url_named, - ssh_server=ssh_server, - ssh_auth_server=URL(res["ssh_auth_server"]), - internal_hostname=internal_hostname, - ) - @dataclass(frozen=True) class JobTelemetry: @@ -257,54 +106,24 @@ class JobTelemetry: gpu_duty_cycle: Optional[int] = None gpu_memory: Optional[float] = None - @classmethod - def from_api(cls, value: Dict[str, Any]) -> "JobTelemetry": - return cls( - cpu=value["cpu"], - memory=value["memory"], - timestamp=value["timestamp"], - gpu_duty_cycle=value.get("gpu_duty_cycle"), - gpu_memory=value.get("gpu_memory"), - ) - class Jobs(metaclass=NoPublicConstructor): - def __init__(self, core: _Core, config: _Config) -> None: + def __init__(self, core: _Core, config: _Config, username: str) -> None: self._core = core self._config = config + self._username = username - async def submit( + async def run( self, + container: Container, *, - image: Image, - resources: Resources, - http: Optional[HTTPPort] = None, - volumes: Optional[List[Volume]] = None, name: Optional[str] = None, description: Optional[str] = None, is_preemptible: bool = False, - env: Optional[Dict[str, str]] = None, ) -> JobDescription: - if env is None: - real_env: Dict[str, str] = {} - else: - real_env = env - if volumes is not None: - volumes = volumes - else: - volumes = [] - container = Container( - image=image.image, - command=image.command, - http=http, - resources=resources, - env=real_env, - volumes=volumes, - ) - url = URL("jobs") payload: Dict[str, Any] = { - "container": container.to_api(), + "container": _container_to_api(container), "is_preemptible": is_preemptible, } if name: @@ -313,7 +132,7 @@ async def submit( payload["description"] = description async with self._core.request("POST", url, json=payload) as resp: res = await resp.json() - return JobDescription.from_api(res) + return _job_description_from_api(res) async def list( self, *, statuses: Optional[Set[JobStatus]] = None, name: Optional[str] = None @@ -327,7 +146,7 @@ async def list( params.add("name", name) async with self._core.request("GET", url, params=params) as resp: ret = await resp.json() - return [JobDescription.from_api(j) for j in ret["jobs"]] + return [_job_description_from_api(j) for j in ret["jobs"]] async def kill(self, id: str) -> None: url = URL(f"jobs/{id}") @@ -335,9 +154,7 @@ async def kill(self, id: str) -> None: # an error is raised for status >= 400 return None # 201 status code - async def monitor( - self, id: str - ) -> Any: # real type is async generator with data chunks + async def monitor(self, id: str) -> AsyncIterator[bytes]: url = self._config.cluster_config.monitoring_url / f"{id}/log" timeout = attr.evolve(self._core.timeout, sock_read=None) async with self._core.request( @@ -350,14 +167,14 @@ async def status(self, id: str) -> JobDescription: url = URL(f"jobs/{id}") async with self._core.request("GET", url) as resp: ret = await resp.json() - return JobDescription.from_api(ret) + return _job_description_from_api(ret) async def top(self, id: str) -> AsyncIterator[JobTelemetry]: url = self._config.cluster_config.monitoring_url / f"{id}/top" try: received_any = False async for resp in self._core.ws_connect(url): - yield JobTelemetry.from_api(resp.json()) # type: ignore + yield _job_telemetry_from_api(resp.json()) # type: ignore received_any = True if not received_any: raise ValueError(f"Job is not running. Job Id = {id}") @@ -472,3 +289,148 @@ async def port_forward( await kill_proc_tree(proc.pid, timeout=10) # add a sleep to get process watcher a chance to execute all callbacks await asyncio.sleep(0.1) + + def parse_volume(self, volume: str) -> Volume: + parts = volume.split(":") + + read_only = False + if len(parts) == 4: + if parts[-1] not in ["ro", "rw"]: + raise ValueError(f"Wrong ReadWrite/ReadOnly mode spec for '{volume}'") + read_only = parts.pop() == "ro" + elif len(parts) != 3: + raise ValueError(f"Invalid volume specification '{volume}'") + + container_path = parts.pop() + storage_path = normalize_storage_path_uri(URL(":".join(parts)), self._username) + + return Volume( + storage_path=str(storage_path), + container_path=container_path, + read_only=read_only, + ) + + +# ############## Internal helpers ################### + + +def _resources_to_api(resources: Resources) -> Dict[str, Any]: + value = { + "memory_mb": resources.memory_mb, + "cpu": resources.cpu, + "shm": resources.shm, + } + if resources.gpu: + value["gpu"] = resources.gpu + value["gpu_model"] = resources.gpu_model # type: ignore + return value + + +def _resources_from_api(data: Dict[str, Any]) -> Resources: + return Resources( + memory_mb=data["memory_mb"], + cpu=data["cpu"], + shm=data.get("shm", None), + gpu=data.get("gpu", None), + gpu_model=data.get("gpu_model", None), + ) + + +def _http_port_to_api(port: HTTPPort) -> Dict[str, Any]: + return {"port": port.port, "requires_auth": port.requires_auth} + + +def _http_port_from_api(data: Dict[str, Any]) -> HTTPPort: + return HTTPPort( + port=data.get("port", -1), requires_auth=data.get("requires_auth", False) + ) + + +def _container_from_api(data: Dict[str, Any]) -> Container: + return Container( + image=data["image"], + resources=_resources_from_api(data["resources"]), + command=data.get("command", None), + http=_http_port_from_api(data["http"]) if "http" in data else None, + env=data.get("env", dict()), + volumes=[_volume_from_api(v) for v in data.get("volumes", [])], + ) + + +def _container_to_api(container: Container) -> Dict[str, Any]: + primitive: Dict[str, Any] = { + "image": container.image, + "resources": _resources_to_api(container.resources), + } + if container.command: + primitive["command"] = container.command + if container.http: + primitive["http"] = _http_port_to_api(container.http) + if container.env: + primitive["env"] = container.env + if container.volumes: + primitive["volumes"] = [_volume_to_api(v) for v in container.volumes] + return primitive + + +def _job_description_from_api(res: Dict[str, Any]) -> JobDescription: + container = _container_from_api(res["container"]) + owner = res["owner"] + name = res.get("name") + description = res.get("description") + history = JobStatusHistory( + status=JobStatus(res["history"].get("status", "unknown")), + reason=res["history"].get("reason", ""), + description=res["history"].get("description", ""), + created_at=res["history"].get("created_at", ""), + started_at=res["history"].get("started_at", ""), + finished_at=res["history"].get("finished_at", ""), + exit_code=res["history"].get("exit_code"), + ) + http_url = URL(res.get("http_url", "")) + http_url_named = URL(res.get("http_url_named", "")) + ssh_server = URL(res.get("ssh_server", "")) + internal_hostname = res.get("internal_hostname", None) + return JobDescription( + status=JobStatus(res["status"]), + id=res["id"], + owner=owner, + history=history, + container=container, + is_preemptible=res["is_preemptible"], + name=name, + description=description, + http_url=http_url, + http_url_named=http_url_named, + ssh_server=ssh_server, + ssh_auth_server=URL(res["ssh_auth_server"]), + internal_hostname=internal_hostname, + ) + + +def _job_telemetry_from_api(value: Dict[str, Any]) -> JobTelemetry: + return JobTelemetry( + cpu=value["cpu"], + memory=value["memory"], + timestamp=value["timestamp"], + gpu_duty_cycle=value.get("gpu_duty_cycle"), + gpu_memory=value.get("gpu_memory"), + ) + + +def _volume_to_api(volume: Volume) -> Dict[str, Any]: + resp: Dict[str, Any] = { + "src_storage_uri": volume.storage_path, + "dst_path": volume.container_path, + "read_only": bool(volume.read_only), + } + return resp + + +def _volume_from_api(data: Dict[str, Any]) -> Volume: + storage_path = data["src_storage_uri"] + container_path = data["dst_path"] + read_only = data.get("read_only", True) + return Volume( + storage_path=storage_path, container_path=container_path, read_only=read_only + ) diff --git a/neuromation/api/storage.py b/neuromation/api/storage.py index 1813f2e84..c4f8c6770 100644 --- a/neuromation/api/storage.py +++ b/neuromation/api/storage.py @@ -49,16 +49,6 @@ def is_dir(self) -> bool: def name(self) -> str: return Path(self.path).name - @classmethod - def from_api(cls, values: Dict[str, Any]) -> "FileStatus": - return cls( - path=values["path"], - type=FileStatusType(values["type"]), - size=int(values["length"]), - modification_time=int(values["modificationTime"]), - permission=values["permission"], - ) - class Storage(metaclass=NoPublicConstructor): def __init__(self, core: _Core, config: _Config) -> None: @@ -77,7 +67,7 @@ async def ls(self, uri: URL) -> List[FileStatus]: async with self._core.request("GET", url) as resp: res = await resp.json() return [ - FileStatus.from_api(status) + _file_status_from_api(status) for status in res["FileStatuses"]["FileStatus"] ] @@ -127,7 +117,7 @@ async def stats(self, uri: URL) -> FileStatus: async with self._core.request("GET", url) as resp: res = await resp.json() - return FileStatus.from_api(res["FileStatus"]) + return _file_status_from_api(res["FileStatus"]) async def _is_dir(self, uri: URL) -> bool: if uri.scheme == "storage": @@ -316,3 +306,13 @@ async def download_dir( ) else: log.warning("Cannot download %s", child) # pragma: no cover + + +def _file_status_from_api(values: Dict[str, Any]) -> FileStatus: + return FileStatus( + path=values["path"], + type=FileStatusType(values["type"]), + size=int(values["length"]), + modification_time=int(values["modificationTime"]), + permission=values["permission"], + ) diff --git a/neuromation/cli/job.py b/neuromation/cli/job.py index 85a0d3a80..72bbc2aa4 100644 --- a/neuromation/cli/job.py +++ b/neuromation/cli/job.py @@ -9,9 +9,9 @@ import click from neuromation.api import ( + Container, DockerImage, HTTPPort, - Image, ImageNameParser, JobDescription, JobStatus, @@ -674,8 +674,6 @@ async def run_job( if browse and not wait_start: raise click.UsageError("Cannot use --browse and --no-wait-start together") - username = root.username - env_dict = build_env(env, env_file) cmd = " ".join(cmd) if cmd is not None else None @@ -683,21 +681,22 @@ async def run_job( log.info(f"Using image '{image.as_url_str()}'") log.debug(f"IMAGE: {image}") - image_obj = Image(image=image.as_repo_str(), command=cmd) - resources = Resources.create(cpu, gpu, gpu_model, memory, extshm) + resources = Resources(memory, cpu, gpu, gpu_model, extshm) volumes: Set[Volume] = set() for v in volume: if v == "HOME": - volumes.add(Volume.from_cli(username, "storage://~:/var/storage/home:rw")) volumes.add( - Volume.from_cli( - username, "storage://neuromation:/var/storage/neuromation:ro" + root.client.jobs.parse_volume("storage://~:/var/storage/home:rw") + ) + volumes.add( + root.client.jobs.parse_volume( + "storage://neuromation:/var/storage/neuromation:ro" ) ) else: - volumes.add(Volume.from_cli(username, v)) + volumes.add(root.client.jobs.parse_volume(v)) if volumes: log.info( @@ -705,15 +704,17 @@ async def run_job( + "\n".join(f" {volume_to_verbose_str(v)}" for v in volumes) ) - job = await root.client.jobs.submit( - image=image_obj, - resources=resources, + container = Container( + image=image.as_repo_str(), + command=cmd, http=HTTPPort(http, http_auth) if http else None, - volumes=list(volumes) if volumes else None, - is_preemptible=preemptible, - name=name, - description=description, + resources=resources, env=env_dict, + volumes=list(volumes), + ) + + job = await root.client.jobs.run( + container, is_preemptible=preemptible, name=name, description=description ) click.echo(JobFormatter(root.quiet)(job)) progress = JobStartProgress.create(tty=root.tty, color=root.color, quiet=root.quiet) diff --git a/neuromation/cli/ssh_utils.py b/neuromation/cli/ssh_utils.py deleted file mode 100644 index 82ee0bc01..000000000 --- a/neuromation/cli/ssh_utils.py +++ /dev/null @@ -1,69 +0,0 @@ -import asyncio - -import aiohttp - -from neuromation.api import Client, JobDescription - - -def _validate_args_for_ssh_session( - container_user: str, container_key: str, jump_host_key: str -) -> None: - # Temporal solution - pending custom Jump Server with JWT support - if not container_user: - raise ValueError("Specify container user name") - if not container_key: - raise ValueError("Specify container RSA key path.") - if not jump_host_key: - raise ValueError( - "Configure Github RSA key path." "See for more info `neuro config`." - ) - - -def _validate_job_status_for_ssh_session(job_status: JobDescription) -> None: - if job_status.status == "running": - if job_status.ssh_server: - pass - else: - raise ValueError("Job should be started with SSH support.") - else: - raise ValueError(f"Job is not running. Job status is {job_status.status}") - - -async def _start_ssh_tunnel( - job_status: JobDescription, - jump_host: str, - jump_user: str, - jump_key: str, - local_port: int, -) -> None: - _validate_job_status_for_ssh_session(job_status) - proc = await asyncio.create_subprocess_exec( - "ssh", - "-i", - jump_key, - f"{jump_user}@{jump_host}", - "-f", - "-N", - "-L", - f"{local_port}:{job_status.id}:22", - ) - await proc.wait() - - -async def remote_debug( - client: Client, job_id: str, jump_host_key: str, local_port: int -) -> None: - if not jump_host_key: - raise ValueError( - "Configure Github RSA key path." "See for more info `neuro config`." - ) - try: - job_status = await client.jobs.status(job_id) - except aiohttp.ClientError as e: - raise ValueError(f"Job not found. Job Id = {job_id}") from e - ssh_hostname = job_status.jump_host() - if not ssh_hostname: - raise RuntimeError("Job has no SSH server enabled") - await _start_ssh_tunnel( - job_status, ssh_hostname, client.username, jump_host_key, local_port - ) diff --git a/tests/api/test_config_factory.py b/tests/api/test_config_factory.py index e2b749d89..5c3cf5558 100644 --- a/tests/api/test_config_factory.py +++ b/tests/api/test_config_factory.py @@ -2,13 +2,13 @@ from pathlib import Path from typing import Any, Dict from unittest import mock -from uuid import uuid4 as uuid import aiohttp import pytest import yaml from aiohttp import web from aiohttp.test_utils import TestServer as _TestServer +from jose import jwt from yarl import URL import neuromation.api.config_factory @@ -24,11 +24,6 @@ from tests import _TestServerFactory -@pytest.fixture -def token() -> str: - return str(uuid()) - - @pytest.fixture def tmp_home(tmp_path: Path, monkeypatch: Any) -> Path: monkeypatch.setattr(Path, "home", lambda: tmp_path) # Like as it's not enough @@ -39,10 +34,10 @@ def tmp_home(tmp_path: Path, monkeypatch: Any) -> Path: @pytest.fixture def config_file( - tmp_home: Path, auth_config: _AuthConfig, cluster_config: _ClusterConfig + tmp_home: Path, token: str, auth_config: _AuthConfig, cluster_config: _ClusterConfig ) -> Path: config_path = tmp_home / ".nmrc" - _create_config(config_path, auth_config, cluster_config) + _create_config(config_path, token, auth_config, cluster_config) return config_path @@ -121,9 +116,11 @@ async def token(request: web.Request) -> web.Response: def _create_config( - nmrc_path: Path, auth_config: _AuthConfig, cluster_config: _ClusterConfig + nmrc_path: Path, + token: str, + auth_config: _AuthConfig, + cluster_config: _ClusterConfig, ) -> str: - token = str(uuid()) config = _Config( auth_config=auth_config, auth_token=_AuthToken.create_non_expiring(token), @@ -148,26 +145,40 @@ async def test_config_file_is_dir(self, tmp_home: Path) -> None: await Factory().get() async def test_default_path( - self, tmp_home: Path, auth_config: _AuthConfig, cluster_config: _ClusterConfig + self, + tmp_home: Path, + token: str, + auth_config: _AuthConfig, + cluster_config: _ClusterConfig, ) -> None: - token = _create_config(tmp_home / ".nmrc", auth_config, cluster_config) + token = _create_config(tmp_home / ".nmrc", token, auth_config, cluster_config) client = await Factory().get() await client.close() assert client._config.auth_token.token == token async def test_shorten_path( - self, tmp_home: Path, auth_config: _AuthConfig, cluster_config: _ClusterConfig + self, + tmp_home: Path, + token: str, + auth_config: _AuthConfig, + cluster_config: _ClusterConfig, ) -> None: - token = _create_config(tmp_home / "test.nmrc", auth_config, cluster_config) + token = _create_config( + tmp_home / "test.nmrc", token, auth_config, cluster_config + ) client = await Factory(Path("~/test.nmrc")).get() await client.close() assert client._config.auth_token.token == token async def test_full_path( - self, tmp_home: Path, auth_config: _AuthConfig, cluster_config: _ClusterConfig + self, + tmp_home: Path, + token: str, + auth_config: _AuthConfig, + cluster_config: _ClusterConfig, ) -> None: config_path = tmp_home / "test.nmrc" - token = _create_config(config_path, auth_config, cluster_config) + token = _create_config(config_path, token, auth_config, cluster_config) client = await Factory(config_path).get() await client.close() assert client._config.auth_token.token == token @@ -175,7 +186,7 @@ async def test_full_path( async def test_token_autorefreshing( self, config_file: Path, monkeypatch: Any ) -> None: - new_token = str(uuid()) + "changed" * 10 # token must has other size + new_token = jwt.encode({"identity": "new_user"}, "secret", algorithm="HS256") async def _refresh_token_mock( connector: aiohttp.BaseConnector, diff --git a/tests/api/test_jobs.py b/tests/api/test_jobs.py index 6fe5ed87f..ae9bb0f45 100644 --- a/tests/api/test_jobs.py +++ b/tests/api/test_jobs.py @@ -5,15 +5,15 @@ from neuromation.api import ( Client, + Container, HTTPPort, - Image, - JobDescription, JobStatus, JobTelemetry, ResourceNotFound, Resources, Volume, ) +from neuromation.api.jobs import _job_description_from_api from tests import _TestServerFactory @@ -249,7 +249,7 @@ async def handler(request: web.Request) -> web.Response: async with make_client(srv.make_url("/")) as client: ret = await client.jobs.status("job-id") - assert ret == JobDescription.from_api(JSON) + assert ret == _job_description_from_api(JSON) async def test_status_with_ssh_and_http( @@ -308,7 +308,7 @@ async def handler(request: web.Request) -> web.Response: async with make_client(srv.make_url("/")) as client: ret = await client.jobs.status("job-id") - assert ret == JobDescription.from_api(JSON) + assert ret == _job_description_from_api(JSON) async def test_job_submit( @@ -381,8 +381,7 @@ async def handler(request: web.Request) -> web.Response: srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: - image = Image(image="submit-image-name", command="submit-command") - resources = Resources.create(7, 1, "test-gpu-model", 16384, True) + resources = Resources(16384, 7, 1, "test-gpu-model", True) volumes: List[Volume] = [ Volume("storage://test-user/path_read_only", "/container/read_only", True), Volume( @@ -391,15 +390,16 @@ async def handler(request: web.Request) -> web.Response: False, ), ] - ret = await client.jobs.submit( - image=image, + container = Container( + image="submit-image-name", + command="submit-command", resources=resources, - http=HTTPPort(8181), volumes=volumes, - is_preemptible=False, + http=HTTPPort(8181), ) + ret = await client.jobs.run(container=container, is_preemptible=False) - assert ret == JobDescription.from_api(JSON) + assert ret == _job_description_from_api(JSON) async def test_job_submit_with_name_and_description( @@ -476,8 +476,7 @@ async def handler(request: web.Request) -> web.Response: srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: - image = Image(image="submit-image-name", command="submit-command") - resources = Resources.create(7, 1, "test-gpu-model", 16384, True) + resources = Resources(16384, 7, 1, "test-gpu-model", True) volumes: List[Volume] = [ Volume("storage://test-user/path_read_only", "/container/read_only", True), Volume( @@ -486,16 +485,20 @@ async def handler(request: web.Request) -> web.Response: False, ), ] - ret = await client.jobs.submit( - image=image, + container = Container( + image="submit-image-name", + command="submit-command", resources=resources, - http=HTTPPort(8181), volumes=volumes, + http=HTTPPort(8181), + ) + ret = await client.jobs.run( + container, is_preemptible=False, name="test-job-name", description="job description", ) - assert ret == JobDescription.from_api(JSON) + assert ret == _job_description_from_api(JSON) async def test_job_submit_no_volumes( @@ -518,7 +521,7 @@ async def test_job_submit_no_volumes( "image": "gcr.io/light-reality-205619/ubuntu:latest", "command": "date", "resources": { - "cpu": 1.0, + "cpu": 7, "memory_mb": 16384, "gpu": 1, "shm": False, @@ -540,7 +543,7 @@ async def handler(request: web.Request) -> web.Response: "http": {"port": 8181, "requires_auth": True}, "resources": { "memory_mb": 16384, - "cpu": 7.0, + "cpu": 7, "shm": True, "gpu": 1, "gpu_model": "test-gpu-model", @@ -559,19 +562,21 @@ async def handler(request: web.Request) -> web.Response: srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: - image = Image(image="submit-image-name", command="submit-command") - resources = Resources.create(7, 1, "test-gpu-model", 16384, True) - ret = await client.jobs.submit( - image=image, + resources = Resources(16384, 7, 1, "test-gpu-model", True) + container = Container( + image="submit-image-name", + command="submit-command", resources=resources, http=HTTPPort(8181), - volumes=None, + ) + ret = await client.jobs.run( + container, is_preemptible=False, name="test-job-name", description="job description", ) - assert ret == JobDescription.from_api(JSON) + assert ret == _job_description_from_api(JSON) async def test_job_submit_preemptible( @@ -647,8 +652,7 @@ async def handler(request: web.Request) -> web.Response: srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: - image = Image(image="submit-image-name", command="submit-command") - resources = Resources.create(7, 1, "test-gpu-model", 16384, True) + resources = Resources(16384, 7, 1, "test-gpu-model", True) volumes: List[Volume] = [ Volume("storage://test-user/path_read_only", "/container/read_only", True), Volume( @@ -657,25 +661,30 @@ async def handler(request: web.Request) -> web.Response: False, ), ] - ret = await client.jobs.submit( - image=image, + container = Container( + image="submit-image-name", + command="submit-command", resources=resources, - http=HTTPPort(8181), volumes=volumes, + http=HTTPPort(8181), + ) + ret = await client.jobs.run( + container, is_preemptible=True, name="test-job-name", description="job description", ) - assert ret == JobDescription.from_api(JSON) + assert ret == _job_description_from_api(JSON) @pytest.mark.parametrize( "volume", ["storage:///", ":", "::::", "", "storage:///data/:/data/rest:wrong"] ) -def test_volume_from_str_fail(volume: str) -> None: - with pytest.raises(ValueError): - Volume.from_cli("testuser", volume) +async def test_volume_from_str_fail(volume: str, make_client: _MakeClient) -> None: + async with make_client("https://example.com") as client: + with pytest.raises(ValueError): + client.jobs.parse_volume(volume) def create_job_response( @@ -732,7 +741,7 @@ async def handler(request: web.Request) -> web.Response: async with make_client(srv.make_url("/")) as client: ret = await client.jobs.list() - job_descriptions = [JobDescription.from_api(job) for job in jobs] + job_descriptions = [_job_description_from_api(job) for job in jobs] assert ret == job_descriptions @@ -768,7 +777,7 @@ async def handler(request: web.Request) -> web.Response: async with make_client(srv.make_url("/")) as client: ret = await client.jobs.list(name=name_1) - job_descriptions = [JobDescription.from_api(job) for job in jobs] + job_descriptions = [_job_description_from_api(job) for job in jobs] assert ret == job_descriptions[:3] @@ -805,7 +814,7 @@ async def handler(request: web.Request) -> web.Response: async with make_client(srv.make_url("/")) as client: ret = await client.jobs.list(statuses=statuses) - job_descriptions = [JobDescription.from_api(job) for job in jobs] + job_descriptions = [_job_description_from_api(job) for job in jobs] assert ret == [job for job in job_descriptions if job.status in statuses] @@ -813,40 +822,46 @@ class TestVolumeParsing: @pytest.mark.parametrize( "volume_param", ["dir", "storage://dir", "storage://dir:/var/www:rw:ro"] ) - def test_incorrect_params_count(self, volume_param: str) -> None: - with pytest.raises(ValueError, match=r"Invalid volume specification"): - Volume.from_cli("bob", volume_param) + async def test_incorrect_params_count( + self, volume_param: str, make_client: _MakeClient + ) -> None: + async with make_client("https://example.com") as client: + with pytest.raises(ValueError, match=r"Invalid volume specification"): + client.jobs.parse_volume(volume_param) @pytest.mark.parametrize( "volume_param", ["storage://dir:/var/www:write", "storage://dir:/var/www:"] ) - def test_incorrect_mode(self, volume_param: str) -> None: - with pytest.raises(ValueError, match=r"Wrong ReadWrite/ReadOnly mode spec"): - Volume.from_cli("bob", volume_param) + async def test_incorrect_mode( + self, volume_param: str, make_client: _MakeClient + ) -> None: + async with make_client("https://example.com") as client: + with pytest.raises(ValueError, match=r"Wrong ReadWrite/ReadOnly mode spec"): + client.jobs.parse_volume(volume_param) @pytest.mark.parametrize( "volume_param,volume", [ ( - "storage://bob/dir:/var/www", + "storage://user/dir:/var/www", Volume( - storage_path="storage://bob/dir", + storage_path="storage://user/dir", container_path="/var/www", read_only=False, ), ), ( - "storage://bob/dir:/var/www:rw", + "storage://user/dir:/var/www:rw", Volume( - storage_path="storage://bob/dir", + storage_path="storage://user/dir", container_path="/var/www", read_only=False, ), ), ( - "storage://bob:/var/www:ro", + "storage://user:/var/www:ro", Volume( - storage_path="storage://bob", + storage_path="storage://user", container_path="/var/www", read_only=True, ), @@ -854,7 +869,7 @@ def test_incorrect_mode(self, volume_param: str) -> None: ( "storage://~/:/var/www:ro", Volume( - storage_path="storage://bob", + storage_path="storage://user", container_path="/var/www", read_only=True, ), @@ -862,7 +877,7 @@ def test_incorrect_mode(self, volume_param: str) -> None: ( "storage:dir:/var/www:ro", Volume( - storage_path="storage://bob/dir", + storage_path="storage://user/dir", container_path="/var/www", read_only=True, ), @@ -870,15 +885,18 @@ def test_incorrect_mode(self, volume_param: str) -> None: ( "storage::/var/www:ro", Volume( - storage_path="storage://bob", + storage_path="storage://user", container_path="/var/www", read_only=True, ), ), ], ) - def test_positive(self, volume_param: str, volume: Volume) -> None: - assert Volume.from_cli("bob", volume_param) == volume + async def test_positive( + self, volume_param: str, volume: Volume, make_client: _MakeClient + ) -> None: + async with make_client("https://example.com") as client: + assert client.jobs.parse_volume(volume_param) == volume async def test_list_filter_by_name_and_statuses( @@ -919,5 +937,5 @@ async def handler(request: web.Request) -> web.Response: async with make_client(srv.make_url("/")) as client: ret = await client.jobs.list(statuses=statuses, name=name) - job_descriptions = [JobDescription.from_api(job) for job in jobs] + job_descriptions = [_job_description_from_api(job) for job in jobs] assert ret == job_descriptions[:2] diff --git a/tests/api/test_storage_filestatus.py b/tests/api/test_storage_filestatus.py index 7c70c7ded..9be710346 100644 --- a/tests/api/test_storage_filestatus.py +++ b/tests/api/test_storage_filestatus.py @@ -1,8 +1,9 @@ -from neuromation.api import FileStatus, FileStatusType +from neuromation.api import FileStatusType +from neuromation.api.storage import _file_status_from_api def test_from_api() -> None: - stat = FileStatus.from_api( + stat = _file_status_from_api( { "path": "name", "type": "FILE", @@ -19,7 +20,7 @@ def test_from_api() -> None: def test_file() -> None: - stat = FileStatus.from_api( + stat = _file_status_from_api( { "path": "name", "type": "FILE", @@ -34,7 +35,7 @@ def test_file() -> None: def test_is_dir() -> None: - stat = FileStatus.from_api( + stat = _file_status_from_api( { "path": "name", "type": "DIRECTORY", @@ -49,7 +50,7 @@ def test_is_dir() -> None: def test_name() -> None: - stat = FileStatus.from_api( + stat = _file_status_from_api( { "path": "name", "type": "FILE", diff --git a/tests/cli/test_formatters.py b/tests/cli/test_formatters.py index ab7b77be1..79cb6c51d 100644 --- a/tests/cli/test_formatters.py +++ b/tests/cli/test_formatters.py @@ -68,7 +68,7 @@ def job_descr_no_name() -> JobDescription: finished_at="2018-09-25T12:28:59.759433+00:00", ), container=Container( - image="ubuntu:latest", resources=Resources.create(0.1, 0, None, 16, False) + image="ubuntu:latest", resources=Resources(16, 0.1, 0, None, False) ), ssh_auth_server=URL("ssh-auth"), is_preemptible=True, @@ -91,7 +91,7 @@ def job_descr() -> JobDescription: finished_at="2018-09-25T12:28:59.759433+00:00", ), container=Container( - image="ubuntu:latest", resources=Resources.create(0.1, 0, None, 16, False) + image="ubuntu:latest", resources=Resources(16, 0.1, 0, None, False) ), ssh_auth_server=URL("ssh-auth"), is_preemptible=True, @@ -192,7 +192,7 @@ def make_job(self, status: JobStatus, reason: str) -> JobDescription: container=Container( command="test-command", image="test-image", - resources=Resources.create(0.1, 0, None, 16, False), + resources=Resources(16, 0.1, 0, None, False), ), ssh_auth_server=URL("ssh-auth"), is_preemptible=False, @@ -260,7 +260,7 @@ def test_job_with_name(self) -> None: container=Container( command="test-command", image="test-image", - resources=Resources.create(0.1, 0, None, 16, False), + resources=Resources(16, 0.1, 0, None, False), http=HTTPPort(port=80, requires_auth=True), ), ssh_auth_server=URL("ssh-auth"), @@ -309,7 +309,7 @@ def test_pending_job(self) -> None: container=Container( command="test-command", image="test-image", - resources=Resources.create(0.1, 0, None, 16, False), + resources=Resources(16, 0.1, 0, None, False), http=HTTPPort(port=80, requires_auth=True), ), ssh_auth_server=URL("ssh-auth"), @@ -353,7 +353,7 @@ def test_pending_job_no_reason(self) -> None: container=Container( command="test-command", image="test-image", - resources=Resources.create(0.1, 0, None, 16, False), + resources=Resources(16, 0.1, 0, None, False), ), ssh_auth_server=URL("ssh-auth"), is_preemptible=True, @@ -390,7 +390,7 @@ def test_pending_job_with_reason(self) -> None: container=Container( image="test-image", command="test-command", - resources=Resources.create(0.1, 0, None, 16, False), + resources=Resources(16, 0.1, 0, None, False), ), ssh_auth_server=URL("ssh-auth"), is_preemptible=True, @@ -427,7 +427,7 @@ def test_pending_job_no_description(self) -> None: container=Container( image="test-image", command="test-command", - resources=Resources.create(0.1, 0, None, 16, False), + resources=Resources(16, 0.1, 0, None, False), ), ssh_auth_server=URL("ssh-auth"), is_preemptible=True, @@ -466,7 +466,7 @@ def test_running_job(self) -> None: container=Container( command="test-command", image="test-image", - resources=Resources.create(0.1, 0, None, 16, False), + resources=Resources(16, 0.1, 0, None, False), ), ssh_auth_server=URL("ssh-auth"), is_preemptible=False, @@ -569,8 +569,7 @@ def test_list(self) -> None: finished_at="2018-09-25T12:28:59.759433+00:00", ), container=Container( - image="ubuntu:latest", - resources=Resources.create(0.1, 0, None, 16, False), + image="ubuntu:latest", resources=Resources(16, 0.1, 0, None, False) ), ssh_auth_server=URL("ssh-auth"), is_preemptible=True, @@ -589,8 +588,7 @@ def test_list(self) -> None: finished_at="2018-09-25T12:28:59.759433+00:00", ), container=Container( - image="ubuntu:latest", - resources=Resources.create(0.1, 0, None, 16, False), + image="ubuntu:latest", resources=Resources(16, 0.1, 0, None, False) ), ssh_auth_server=URL("ssh-auth"), is_preemptible=True, @@ -625,9 +623,7 @@ def _job_descr_with_status( finished_at="2017-03-04T12:28:59.759433+00:00", ), container=Container( - image=image, - resources=Resources.create(0.1, 0, None, 16, False), - command="ls", + image=image, resources=Resources(16, 0.1, 0, None, False), command="ls" ), ssh_auth_server=URL("ssh-auth"), is_preemptible=True, @@ -703,9 +699,7 @@ def test_short_cells(self) -> None: finished_at=datetime.fromtimestamp(time.time() - 1).isoformat(), ), container=Container( - image="i:l", - resources=Resources.create(0.1, 0, None, 16, False), - command="c", + image="i:l", resources=Resources(16, 0.1, 0, None, False), command="c" ), ssh_auth_server=URL("ssh-auth"), is_preemptible=True, @@ -745,7 +739,7 @@ def test_wide_cells(self) -> None: ), container=Container( image="some-image-name:with-long-tag", - resources=Resources.create(0.1, 0, None, 16, False), + resources=Resources(16, 0.1, 0, None, False), command="ls -la /some/path", ), ssh_auth_server=URL("ssh-auth"), @@ -767,7 +761,7 @@ def test_wide_cells(self) -> None: ), container=Container( image="some-image-name:with-long-tag", - resources=Resources.create(0.1, 0, None, 16, False), + resources=Resources(16, 0.1, 0, None, False), command="ls -la /some/path", ), ssh_auth_server=URL("ssh-auth"), @@ -1141,9 +1135,7 @@ def test_sorter(self) -> None: class TestResourcesFormatter: def test_tiny_container(self) -> None: - resources = Resources.create( - cpu=0.1, gpu=0, gpu_model=None, memory=16, extshm=False - ) + resources = Resources(cpu=0.1, gpu=0, gpu_model=None, memory_mb=16, shm=False) resource_formatter = ResourcesFormatter() assert ( resource_formatter(resources) == "Resources:\n" @@ -1152,8 +1144,8 @@ def test_tiny_container(self) -> None: ) def test_gpu_container(self) -> None: - resources = Resources.create( - cpu=2, gpu=1, gpu_model="nvidia-tesla-p4", memory=1024, extshm=False + resources = Resources( + cpu=2, gpu=1, gpu_model="nvidia-tesla-p4", memory_mb=1024, shm=False ) resource_formatter = ResourcesFormatter() assert ( @@ -1164,9 +1156,7 @@ def test_gpu_container(self) -> None: ) def test_shm_container(self) -> None: - resources = Resources.create( - cpu=0.1, gpu=0, gpu_model=None, memory=16, extshm=True - ) + resources = Resources(cpu=0.1, gpu=0, gpu_model=None, memory_mb=16, shm=True) resource_formatter = ResourcesFormatter() assert ( resource_formatter(resources) == "Resources:\n" diff --git a/tests/e2e/test_e2e_jobs.py b/tests/e2e/test_e2e_jobs.py index 2843baddb..a72aec944 100644 --- a/tests/e2e/test_e2e_jobs.py +++ b/tests/e2e/test_e2e_jobs.py @@ -12,7 +12,7 @@ from aiohttp.test_utils import unused_port from yarl import URL -from neuromation.api import Image, JobStatus, Resources, get as api_get +from neuromation.api import Container, JobStatus, Resources, get as api_get from neuromation.utils import run as run_async from tests.e2e import Helper @@ -664,13 +664,14 @@ async def nginx_job_async( f"bash -c \"echo -n '{secret}' > /usr/share/nginx/html/secret.txt; " f"timeout 15m /usr/sbin/nginx -g 'daemon off;'\"" ) - job = await client.jobs.submit( - image=Image(NGINX_IMAGE_NAME, command=command), - resources=Resources.create(0.1, None, None, 20, True), - is_preemptible=False, - volumes=None, - description="test NGINX job", - env={}, + container = Container( + image=NGINX_IMAGE_NAME, + command=command, + resources=Resources(20, 0.1, None, None, True), + ) + + job = await client.jobs.run( + container, is_preemptible=False, description="test NGINX job" ) try: for i in range(60):