diff --git a/CHANGELOG.D/988.feature b/CHANGELOG.D/988.feature new file mode 100644 index 000000000..4beea4f9d --- /dev/null +++ b/CHANGELOG.D/988.feature @@ -0,0 +1 @@ +Support job operations via job-URI (e.g., `neuro status job://owner-name/job-name`). diff --git a/neuromation/api/url_utils.py b/neuromation/api/url_utils.py index 4ef807275..e8276292d 100644 --- a/neuromation/api/url_utils.py +++ b/neuromation/api/url_utils.py @@ -1,7 +1,7 @@ import re import sys from pathlib import Path -from typing import Sequence +from typing import Sequence, Union from yarl import URL @@ -51,7 +51,8 @@ def normalize_storage_path_uri(uri: URL, username: str) -> URL: return _normalize_uri(uri, username) -def _normalize_uri(uri: URL, username: str) -> URL: +def _normalize_uri(resource: Union[URL, str], username: str) -> URL: + uri = resource if isinstance(resource, URL) else URL(resource) path = uri.path if not uri.host: if path.startswith("~"): diff --git a/neuromation/cli/job.py b/neuromation/cli/job.py index 082055d7e..722f8260f 100644 --- a/neuromation/cli/job.py +++ b/neuromation/cli/job.py @@ -334,7 +334,7 @@ async def exec( neuro exec --no-tty my-job ls -l """ cmd = shlex.split(" ".join(cmd)) - id = await resolve_job(root.client, job) + id = await resolve_job(job, client=root.client, default_user=root.username) retcode = await root.client.jobs.exec( id, cmd, @@ -376,7 +376,7 @@ async def port_forward( neuro job port-forward my-job- 2080:80 2222:22 2000:100 """ - job_id = await resolve_job(root.client, job) + job_id = await resolve_job(job, client=root.client, default_user=root.username) async with AsyncExitStack() as stack: for local_port, job_port in local_remote_port: click.echo( @@ -404,7 +404,7 @@ async def logs(root: Root, job: str) -> None: """ Print the logs for a container. """ - id = await resolve_job(root.client, job) + id = await resolve_job(job, client=root.client, default_user=root.username) await _print_logs(root, id) @@ -503,7 +503,7 @@ async def status(root: Root, job: str) -> None: """ Display status of a job. """ - id = await resolve_job(root.client, job) + id = await resolve_job(job, client=root.client, default_user=root.username) res = await root.client.jobs.status(id) click.echo(JobStatusFormatter()(res)) @@ -515,7 +515,7 @@ async def browse(root: Root, job: str) -> None: """ Opens a job's URL in a web browser. """ - id = await resolve_job(root.client, job) + id = await resolve_job(job, client=root.client, default_user=root.username) res = await root.client.jobs.status(id) await browse_job(root, res) @@ -528,7 +528,7 @@ async def top(root: Root, job: str) -> None: Display GPU/CPU/Memory usage. """ formatter = JobTelemetryFormatter() - id = await resolve_job(root.client, job) + id = await resolve_job(job, client=root.client, default_user=root.username) print_header = True async for res in root.client.jobs.top(id): if print_header: @@ -552,7 +552,7 @@ async def save(root: Root, job: str, image: RemoteImage) -> None: neuro job save my-favourite-job image://~/ubuntu-patched:v1 neuro job save my-favourite-job image://bob/ubuntu-patched """ - id = await resolve_job(root.client, job) + id = await resolve_job(job, client=root.client, default_user=root.username) progress = DockerImageProgress.create(tty=root.tty, quiet=root.quiet) with contextlib.closing(progress): await root.client.jobs.save(id, image, progress=progress) @@ -568,7 +568,9 @@ async def kill(root: Root, jobs: Sequence[str]) -> None: """ errors = [] for job in jobs: - job_resolved = await resolve_job(root.client, job) + job_resolved = await resolve_job( + job, client=root.client, default_user=root.username + ) try: await root.client.jobs.kill(job_resolved) # TODO (ajuszkowski) printing should be on the cli level diff --git a/neuromation/cli/utils.py b/neuromation/cli/utils.py index 5aa57de2c..30aa1d574 100644 --- a/neuromation/cli/utils.py +++ b/neuromation/cli/utils.py @@ -41,7 +41,7 @@ ) from neuromation.api.config import _CookieSession, _PyPIVersion from neuromation.api.parsing_utils import _ImageNameParser -from neuromation.api.url_utils import uri_from_cli +from neuromation.api.url_utils import _normalize_uri, uri_from_cli from .asyncio_utils import run from .parse_utils import to_megabytes @@ -436,13 +436,36 @@ def volume_to_verbose_str(volume: Volume) -> str: ) -async def resolve_job(client: Client, id_or_name: str) -> str: +async def resolve_job( + id_or_name_or_uri: str, *, client: Client, default_user: str +) -> str: + if id_or_name_or_uri.startswith("job:"): + uri = _normalize_uri(id_or_name_or_uri, username=default_user) + id_or_name = uri.path.lstrip("/") + owner = uri.host or default_user + if not id_or_name: + raise ValueError( + f"Invalid job URI: owner='{owner}', missing job-id or job-name" + ) + else: + id_or_name = id_or_name_or_uri + owner = default_user + jobs: List[JobDescription] = [] + details = f"name={id_or_name}, owner={owner}" try: - jobs = await client.jobs.list(name=id_or_name) + jobs = await client.jobs.list(name=id_or_name, owners={owner}) except Exception as e: - log.error(f"Failed to resolve job-name '{id_or_name}' to a job-ID: {e}") + log.error( + f"Failed to resolve job-name {id_or_name_or_uri} resolved as " + f"{details} to a job-ID: {e}" + ) if jobs: + if len(jobs) > 1: + log.warning( + f"Found {len(jobs)} jobs matching {details}: " + ", ".join(job.id for job in jobs) + ) job_id = jobs[-1].id log.debug(f"Job name '{id_or_name}' resolved to job ID '{job_id}'") else: diff --git a/tests/cli/test_utils.py b/tests/cli/test_utils.py index 9cce92628..3d9ccb2ae 100644 --- a/tests/cli/test_utils.py +++ b/tests/cli/test_utils.py @@ -1,5 +1,5 @@ from pathlib import Path -from typing import Any, Callable, Dict, Tuple +from typing import Any, Callable, Dict, NoReturn, Tuple import click import pytest @@ -21,15 +21,46 @@ _MakeClient = Callable[..., Client] -async def test_resolve_job_id__no_jobs_found( +def _job_entry(job_id: str) -> Dict[str, Any]: + return { + "id": job_id, + "owner": "job-owner", + "status": "running", + "history": { + "status": "running", + "reason": None, + "description": None, + "created_at": "2019-03-18T12:41:10.573468+00:00", + "started_at": "2019-03-18T12:41:16.804040+00:00", + }, + "container": { + "image": "ubuntu:latest", + "env": {}, + "volumes": [], + "command": "sleep 1h", + "resources": {"cpu": 0.1, "memory_mb": 1024, "shm": True}, + }, + "ssh_auth_server": "ssh://nobody@ssh-auth-dev.neu.ro:22", + "is_preemptible": True, + "name": "job-name", + "internal_hostname": "job-id.default", + } + + +async def test_resolve_job_id__from_string__no_jobs_found( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: JSON: Dict[str, Any] = {"jobs": []} job_id = "job-81839be3-3ecf-4ec5-80d9-19b1588869db" - job_name_to_resolve = job_id async def handler(request: web.Request) -> web.Response: - assert request.query.get("name") == job_name_to_resolve + # Since `resolve_job` excepts any Exception, `assert` will be caught there + name = request.query.get("name") + if name != job_id: + pytest.fail(f"received: {name}") + owner = request.query.get("owner") + if owner != "default-owner": + pytest.fail(f"received: {owner}") return web.json_response(JSON) app = web.Application() @@ -38,45 +69,82 @@ async def handler(request: web.Request) -> web.Response: srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: - resolved = await resolve_job(client, job_name_to_resolve) + resolved = await resolve_job( + job_id, client=client, default_user="default-owner" + ) assert resolved == job_id -async def test_resolve_job_id__single_job_found( +async def test_resolve_job_id__from_uri_with_owner__no_jobs_found( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: - job_name_to_resolve = "test-job-name-555" - JSON = { - "jobs": [ - { - "id": "job-efb7d723-722c-4d5c-a5db-de258db4b09e", - "owner": "test1", - "status": "running", - "history": { - "status": "running", - "reason": None, - "description": None, - "created_at": "2019-03-18T12:41:10.573468+00:00", - "started_at": "2019-03-18T12:41:16.804040+00:00", - }, - "container": { - "image": "ubuntu:latest", - "env": {}, - "volumes": [], - "command": "sleep 1h", - "resources": {"cpu": 0.1, "memory_mb": 1024, "shm": True}, - }, - "ssh_auth_server": "ssh://nobody@ssh-auth-dev.neu.ro:22", - "is_preemptible": True, - "name": job_name_to_resolve, - "internal_hostname": "job-efb7d723-722c-4d5c-a5db-de258db4b09e.default", - } - ] - } - job_id = JSON["jobs"][0]["id"] + job_owner = "job-owner" + job_name = "job-name" + uri = f"job://{job_owner}/{job_name}" + JSON: Dict[str, Any] = {"jobs": []} + + async def handler(request: web.Request) -> web.Response: + # Since `resolve_job` excepts any Exception, `assert` will be caught there + name = request.query.get("name") + if name != job_name: + pytest.fail(f"received: {name}") + owner = request.query.get("owner") + if owner != job_owner: + pytest.fail(f"received: {owner}") + return web.json_response(JSON) + + app = web.Application() + app.router.add_get("/jobs", handler) + + srv = await aiohttp_server(app) + + async with make_client(srv.make_url("/")) as client: + resolved = await resolve_job(uri, client=client, default_user="default-owner") + assert resolved == job_name + + +async def test_resolve_job_id__from_uri_without_owner__no_jobs_found( + aiohttp_server: _TestServerFactory, make_client: _MakeClient +) -> None: + job_name = "job-name" + uri = f"job:{job_name}" + JSON: Dict[str, Any] = {"jobs": []} + + async def handler(request: web.Request) -> web.Response: + # Since `resolve_job` excepts any Exception, `assert` will be caught there + name = request.query.get("name") + if name != job_name: + pytest.fail(f"received: {name}") + owner = request.query.get("owner") + if owner != "default-owner": + pytest.fail(f"received: {owner}") + return web.json_response(JSON) + + app = web.Application() + app.router.add_get("/jobs", handler) + + srv = await aiohttp_server(app) + + async with make_client(srv.make_url("/")) as client: + resolved = await resolve_job(uri, client=client, default_user="default-owner") + assert resolved == job_name + + +async def test_resolve_job_id__from_string__single_job_found( + aiohttp_server: _TestServerFactory, make_client: _MakeClient +) -> None: + job_name = "test-job-name-555" + job_id = "job-id-1" + JSON = {"jobs": [_job_entry(job_id)]} async def handler(request: web.Request) -> web.Response: - assert request.query.get("name") == job_name_to_resolve + # Since `resolve_job` excepts any Exception, `assert` will be caught there + name = request.query.get("name") + if name != job_name: + pytest.fail(f"received: {name}") + owner = request.query.get("owner") + if owner != "default-owner": + pytest.fail(f"received: {owner}") return web.json_response(JSON) app = web.Application() @@ -85,70 +153,29 @@ async def handler(request: web.Request) -> web.Response: srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: - resolved = await resolve_job(client, job_name_to_resolve) + resolved = await resolve_job( + job_name, client=client, default_user="default-owner" + ) assert resolved == job_id -async def test_resolve_job_id__multiple_jobs_found( +async def test_resolve_job_id__from_uri_with_owner__single_job_found( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: - job_name_to_resolve = "job-name-123-000" - JSON = { - "jobs": [ - { - "id": "job-d912aa8c-d01b-44bd-b77c-5a19fc151f89", - "owner": "test1", - "status": "succeeded", - "history": { - "status": "succeeded", - "reason": None, - "description": None, - "created_at": "2019-03-17T16:24:54.746175+00:00", - "started_at": "2019-03-17T16:25:00.868880+00:00", - "finished_at": "2019-03-17T16:28:01.298487+00:00", - }, - "container": { - "image": "ubuntu:latest", - "env": {}, - "volumes": [], - "command": "sleep 3m", - "resources": {"cpu": 0.1, "memory_mb": 1024, "shm": True}, - }, - "ssh_auth_server": "ssh://nobody@ssh-auth-dev.neu.ro:22", - "is_preemptible": True, - "name": job_name_to_resolve, - "internal_hostname": "job-d912aa8c-d01b-44bd-b77c-5a19fc151f89.default", - }, - { - "id": "job-e5071b6b-2e97-4cce-b12d-86e31751dc8a", - "owner": "test1", - "status": "succeeded", - "history": { - "status": "succeeded", - "reason": None, - "description": None, - "created_at": "2019-03-18T11:31:03.669549+00:00", - "started_at": "2019-03-18T11:31:10.428975+00:00", - "finished_at": "2019-03-18T11:31:54.896666+00:00", - }, - "container": { - "image": "ubuntu:latest", - "env": {}, - "volumes": [], - "command": "sleep 5m", - "resources": {"cpu": 0.1, "memory_mb": 1024, "shm": True}, - }, - "ssh_auth_server": "ssh://nobody@ssh-auth-dev.neu.ro:22", - "is_preemptible": True, - "name": job_name_to_resolve, - "internal_hostname": "job-e5071b6b-2e97-4cce-b12d-86e31751dc8a.default", - }, - ] - } - job_id = JSON["jobs"][-1]["id"] + job_owner = "job-owner" + job_name = "job-name" + uri = f"job://{job_owner}/{job_name}" + job_id = "job-id-1" + JSON = {"jobs": [_job_entry(job_id)]} async def handler(request: web.Request) -> web.Response: - assert request.query.get("name") == job_name_to_resolve + # Since `resolve_job` excepts any Exception, `assert` will be caught there + name = request.query.get("name") + if name != job_name: + pytest.fail(f"received: {name}") + owner = request.query.get("owner") + if owner != job_owner: + pytest.fail(f"received: {owner}") return web.json_response(JSON) app = web.Application() @@ -157,18 +184,141 @@ async def handler(request: web.Request) -> web.Response: srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: - resolved = await resolve_job(client, job_name_to_resolve) + resolved = await resolve_job(uri, client=client, default_user="default-owner") assert resolved == job_id -async def test_resolve_job_id__server_error( +async def test_resolve_job_id__from_uri_without_owner__single_job_found( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: - job_id = "job-81839be3-3ecf-4ec5-80d9-19b1588869db" - job_name_to_resolve = job_id + job_name = "job-name" + uri = f"job:{job_name}" + job_id = "job-id-1" + JSON = {"jobs": [_job_entry(job_id)]} + + async def handler(request: web.Request) -> web.Response: + # Since `resolve_job` excepts any Exception, `assert` will be caught there + name = request.query.get("name") + if name != job_name: + pytest.fail(f"received: {name}") + owner = request.query.get("owner") + if owner != "default-owner": + pytest.fail(f"received: {owner}") + return web.json_response(JSON) + + app = web.Application() + app.router.add_get("/jobs", handler) + + srv = await aiohttp_server(app) + + async with make_client(srv.make_url("/")) as client: + resolved = await resolve_job(uri, client=client, default_user="default-owner") + assert resolved == job_id + + +async def test_resolve_job_id__from_string__multiple_jobs_found( + aiohttp_server: _TestServerFactory, make_client: _MakeClient +) -> None: + job_name = "job-name-123-000" + job_id_1 = "job-id-1" + job_id_2 = "job-id-2" + JSON = {"jobs": [_job_entry(job_id_1), _job_entry(job_id_2)]} + + async def handler(request: web.Request) -> web.Response: + # Since `resolve_job` excepts any Exception, `assert` will be caught there + name = request.query.get("name") + if name != job_name: + pytest.fail(f"received: {name}") + owner = request.query.get("owner") + if owner != "default-owner": + pytest.fail(f"received: {owner}") + return web.json_response(JSON) + + app = web.Application() + app.router.add_get("/jobs", handler) + + srv = await aiohttp_server(app) + + async with make_client(srv.make_url("/")) as client: + resolved = await resolve_job( + job_name, client=client, default_user="default-owner" + ) + assert resolved == job_id_2 + + +async def test_resolve_job_id__from_uri_with_owner__multiple_jobs_found( + aiohttp_server: _TestServerFactory, make_client: _MakeClient +) -> None: + job_owner = "job-owner" + job_name = "job-name" + uri = f"job://{job_owner}/{job_name}" + job_id_1 = "job-id-1" + job_id_2 = "job-id-2" + JSON = {"jobs": [_job_entry(job_id_1), _job_entry(job_id_2)]} + + async def handler(request: web.Request) -> web.Response: + # Since `resolve_job` excepts any Exception, `assert` will be caught there + name = request.query.get("name") + if name != job_name: + pytest.fail(f"received: {name}") + owner = request.query.get("owner") + if owner != job_owner: + pytest.fail(f"received: {owner}") + return web.json_response(JSON) + + app = web.Application() + app.router.add_get("/jobs", handler) + + srv = await aiohttp_server(app) + + async with make_client(srv.make_url("/")) as client: + resolved = await resolve_job(uri, client=client, default_user="default-owner") + assert resolved == job_id_2 + + +async def test_resolve_job_id__from_uri_without_owner__multiple_jobs_found( + aiohttp_server: _TestServerFactory, make_client: _MakeClient +) -> None: + job_name = "job-name" + uri = f"job:{job_name}" + job_id_1 = "job-id-1" + job_id_2 = "job-id-2" + JSON = {"jobs": [_job_entry(job_id_1), _job_entry(job_id_2)]} async def handler(request: web.Request) -> web.Response: - assert request.query.get("name") == job_name_to_resolve + # Since `resolve_job` excepts any Exception, `assert` will be caught there + name = request.query.get("name") + if name != job_name: + pytest.fail(f"received: {name}") + owner = request.query.get("owner") + if owner != "default-owner": + pytest.fail(f"received: {owner}") + return web.json_response(JSON) + + app = web.Application() + app.router.add_get("/jobs", handler) + + srv = await aiohttp_server(app) + + async with make_client(srv.make_url("/")) as client: + resolved = await resolve_job(uri, client=client, default_user="default-owner") + assert resolved == job_id_2 + + +async def test_resolve_job_id__server_error( + aiohttp_server: _TestServerFactory, make_client: _MakeClient +) -> None: + job_id = "job-81839be3-3ecf-4ec5-80d9-19b1588869db" + job_name = job_id + + async def handler(request: web.Request) -> NoReturn: + # Since `resolve_job` excepts any Exception, `assert` will be caught there + name = request.query.get("name") + if name != job_name: + pytest.fail(f"received: {name}") + owner = request.query.get("owner") + if owner != "default-owner": + pytest.fail(f"received: {owner}") raise web.HTTPError() app = web.Application() @@ -177,10 +327,82 @@ async def handler(request: web.Request) -> web.Response: srv = await aiohttp_server(app) async with make_client(srv.make_url("/")) as client: - resolved = await resolve_job(client, job_name_to_resolve) + resolved = await resolve_job( + job_name, client=client, default_user="default-owner" + ) assert resolved == job_id +async def test_resolve_job_id__from_uri_with_owner__with_owner__server_error( + aiohttp_server: _TestServerFactory, make_client: _MakeClient +) -> None: + job_owner = "job-owner" + job_name = "job-name" + uri = f"job://{job_owner}/{job_name}" + + async def handler(request: web.Request) -> NoReturn: + # Since `resolve_job` excepts any Exception, `assert` will be caught there + name = request.query.get("name") + if name != job_name: + pytest.fail(f"received: {name}") + owner = request.query.get("owner") + if owner != job_owner: + pytest.fail(f"received: {owner}") + raise web.HTTPError() + + app = web.Application() + app.router.add_get("/jobs", handler) + + srv = await aiohttp_server(app) + + async with make_client(srv.make_url("/")) as client: + resolved = await resolve_job(uri, client=client, default_user="default-owner") + assert resolved == job_name + + +async def test_resolve_job_id__from_uri_without_owner__server_error( + aiohttp_server: _TestServerFactory, make_client: _MakeClient +) -> None: + job_name = "job-name" + uri = f"job:{job_name}" + + async def handler(request: web.Request) -> NoReturn: + # Since `resolve_job` excepts any Exception, `assert` will be caught there + name = request.query.get("name") + if name != job_name: + pytest.fail(f"received: {name}") + owner = request.query.get("owner") + if owner != "default-owner": + pytest.fail(f"received: {owner}") + raise web.HTTPError() + + app = web.Application() + app.router.add_get("/jobs", handler) + + srv = await aiohttp_server(app) + + async with make_client(srv.make_url("/")) as client: + resolved = await resolve_job(uri, client=client, default_user="default-owner") + assert resolved == job_name + + +async def test_resolve_job_id__from_uri__missing_job_id( + aiohttp_server: _TestServerFactory, make_client: _MakeClient +) -> None: + + uri = "job://job-name" + + app = web.Application() + srv = await aiohttp_server(app) + + async with make_client(srv.make_url("/")) as client: + with pytest.raises( + ValueError, + match="Invalid job URI: owner='job-name', missing job-id or job-name", + ): + await resolve_job(uri, client=client, default_user="default-owner") + + def test_parse_file_resource_no_scheme(root: Root) -> None: parsed = parse_file_resource("scheme-less/resource", root) assert parsed == URL((Path.cwd() / "scheme-less/resource").as_uri())