From 46362c3507294c73ccb60bf06509a6710fbc8ad4 Mon Sep 17 00:00:00 2001 From: Artem Yushkovskiy Date: Thu, 12 Mar 2020 16:35:07 +0300 Subject: [PATCH 1/5] first --- README.md | 8 +++ neuromation/api/jobs.py | 6 ++ neuromation/cli/job.py | 35 +++++++++- tests/api/test_jobs.py | 139 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 187 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 3dc918900..785bac273 100644 --- a/README.md +++ b/README.md @@ -423,6 +423,7 @@ Name | Description| |_--http PORT_|Enable HTTP port forwarding to container \[default: 80]| |_\--http-auth / --no-http-auth_|Enable HTTP authentication for forwarded HTTP port \[default: True]| |_\-n, --name NAME_|Optional job name| +|_--tag TAG_|Optional job tag, multiple values allowed| |_\-d, --description DESC_|Optional job description in free format| |_\-q, --quiet_|Run command in quiet mode \(DEPRECATED)| |_\-v, --volume MOUNT_|Mounts directory from vault into container. Use multiple options to mount more than one volume. --volume=HOME is an alias for storage::/var/storage/home:rw and storage://neuromation/public:/var/storage/neuromation:ro| @@ -480,6 +481,7 @@ Name | Description| |_\--http-auth / --no-http-auth_|Enable HTTP authentication for forwarded HTTP port \[default: True]| |_\-p, --preemptible / -P, --non-preemptible_|Run job on a lower-cost preemptible instance \[default: False]| |_\-n, --name NAME_|Optional job name| +|_--tag TAG_|Optional job tag, multiple values allowed| |_\-d, --description DESC_|Optional job description in free format| |_\-q, --quiet_|Run command in quiet mode \(DEPRECATED)| |_\-v, --volume MOUNT_|Mounts directory from vault into container. Use multiple options to mount more than one volume. --volume=HOME is an alias for storage::/var/storage/home:rw and storage://neuromation/public:/var/storage/neuromation:ro| @@ -516,6 +518,7 @@ neuro ps -a --owner=user-1 --owner=user-2 neuro ps --name my-experiments-v1 -s failed -s succeeded neuro ps --description=my favourite job neuro ps -s failed -s succeeded -q +neuro ps --tag tag1 -t tag2 ``` @@ -527,6 +530,7 @@ Name | Description| |_\-o, --owner TEXT_|Filter out jobs by owner \(multiple option).| |_\-a, --all_|Show all jobs regardless the status \(equivalent to `\-s pending -s running -s succeeded -s failed`).| |_\-n, --name NAME_|Filter out jobs by name.| +|_\-t, --tag TAG_|Filter out jobs by tag \(multiple option)| |_\-d, --description DESCRIPTION_|Filter out jobs by description \(exact match).| |_\-q, --quiet_|Run command in quiet mode \(DEPRECATED)| |_\-w, --wide_|Do not cut long lines for terminal width.| @@ -1669,6 +1673,7 @@ Name | Description| |_--http PORT_|Enable HTTP port forwarding to container \[default: 80]| |_\--http-auth / --no-http-auth_|Enable HTTP authentication for forwarded HTTP port \[default: True]| |_\-n, --name NAME_|Optional job name| +|_--tag TAG_|Optional job tag, multiple values allowed| |_\-d, --description DESC_|Optional job description in free format| |_\-q, --quiet_|Run command in quiet mode \(DEPRECATED)| |_\-v, --volume MOUNT_|Mounts directory from vault into container. Use multiple options to mount more than one volume. --volume=HOME is an alias for storage::/var/storage/home:rw and storage://neuromation/public:/var/storage/neuromation:ro| @@ -1726,6 +1731,7 @@ Name | Description| |_\--http-auth / --no-http-auth_|Enable HTTP authentication for forwarded HTTP port \[default: True]| |_\-p, --preemptible / -P, --non-preemptible_|Run job on a lower-cost preemptible instance \[default: False]| |_\-n, --name NAME_|Optional job name| +|_--tag TAG_|Optional job tag, multiple values allowed| |_\-d, --description DESC_|Optional job description in free format| |_\-q, --quiet_|Run command in quiet mode \(DEPRECATED)| |_\-v, --volume MOUNT_|Mounts directory from vault into container. Use multiple options to mount more than one volume. --volume=HOME is an alias for storage::/var/storage/home:rw and storage://neuromation/public:/var/storage/neuromation:ro| @@ -1762,6 +1768,7 @@ neuro ps -a --owner=user-1 --owner=user-2 neuro ps --name my-experiments-v1 -s failed -s succeeded neuro ps --description=my favourite job neuro ps -s failed -s succeeded -q +neuro ps --tag tag1 -t tag2 ``` @@ -1773,6 +1780,7 @@ Name | Description| |_\-o, --owner TEXT_|Filter out jobs by owner \(multiple option).| |_\-a, --all_|Show all jobs regardless the status \(equivalent to `\-s pending -s running -s succeeded -s failed`).| |_\-n, --name NAME_|Filter out jobs by name.| +|_\-t, --tag TAG_|Filter out jobs by tag \(multiple option)| |_\-d, --description DESCRIPTION_|Filter out jobs by description \(exact match).| |_\-q, --quiet_|Run command in quiet mode \(DEPRECATED)| |_\-w, --wide_|Do not cut long lines for terminal width.| diff --git a/neuromation/api/jobs.py b/neuromation/api/jobs.py index b26e0f9dc..06f9685df 100644 --- a/neuromation/api/jobs.py +++ b/neuromation/api/jobs.py @@ -135,6 +135,7 @@ async def run( container: Container, *, name: Optional[str] = None, + tags: Sequence[str] = (), description: Optional[str] = None, is_preemptible: bool = False, schedule_timeout: Optional[float] = None, @@ -147,6 +148,8 @@ async def run( } if name: payload["name"] = name + if tags: + payload["tags"] = tags if description: payload["description"] = description if schedule_timeout: @@ -164,6 +167,7 @@ async def list( *, statuses: Iterable[JobStatus] = (), name: str = "", + tags: Iterable[str] = (), owners: Iterable[str] = (), ) -> List[JobDescription]: url = self._config.api_url / "jobs" @@ -174,6 +178,8 @@ async def list( params.add("name", name) for owner in owners: params.add("owner", owner) + for tag in tags: + params.add("tag", tag) params["cluster_name"] = self._config.cluster_name auth = await self._config._api_auth() async with self._core.request("GET", url, params=params, auth=auth) as resp: diff --git a/neuromation/cli/job.py b/neuromation/cli/job.py index cdeb00f11..c020cf01f 100644 --- a/neuromation/cli/job.py +++ b/neuromation/cli/job.py @@ -201,6 +201,13 @@ def job() -> None: default=None, secure=True, ) +@option( + "--tag", + metavar="TAG", + type=str, + help="Optional job tag, multiple values allowed", + multiple=True, +) @option( "-d", "--description", @@ -298,6 +305,7 @@ async def submit( life_span: Optional[str], preemptible: bool, name: Optional[str], + tag: Sequence[str], description: Optional[str], wait_start: bool, pass_config: bool, @@ -344,6 +352,7 @@ async def submit( life_span=life_span, preemptible=preemptible, name=name, + tags=tag, description=description, wait_start=wait_start, pass_config=pass_config, @@ -516,6 +525,14 @@ async def _print_logs(root: Root, job: str) -> None: ), ) @option("-n", "--name", metavar="NAME", help="Filter out jobs by name.", secure=True) +@option( + "-t", + "--tag", + metavar="TAG", + type=str, + help="Filter out jobs by tag (multiple option)", + multiple=True, +) @option( "-d", "--description", @@ -542,6 +559,7 @@ async def ls( status: Sequence[str], all: bool, name: str, + tag: Sequence[str], owner: Sequence[str], description: str, wide: bool, @@ -557,13 +575,17 @@ async def ls( neuro ps --name my-experiments-v1 -s failed -s succeeded neuro ps --description="my favourite job" neuro ps -s failed -s succeeded -q + neuro ps --tag tag1 -t tag2 """ format = await calc_columns(root.client, format) statuses = calc_statuses(status, all) owners = set(owner) - jobs = await root.client.jobs.list(statuses=statuses, name=name, owners=owners) + tags = set(tag) + jobs = await root.client.jobs.list( + statuses=statuses, name=name, owners=owners, tags=tags + ) # client-side filtering if description: @@ -750,6 +772,13 @@ def format_fail(job: str, reason: Exception) -> str: default=None, secure=True, ) +@option( + "--tag", + metavar="TAG", + type=str, + help="Optional job tag, multiple values allowed", + multiple=True, +) @option( "-d", "--description", @@ -843,6 +872,7 @@ async def run( life_span: Optional[str], preemptible: Optional[bool], name: Optional[str], + tag: Sequence[str], description: Optional[str], wait_start: bool, pass_config: bool, @@ -897,6 +927,7 @@ async def run( life_span=life_span, preemptible=job_preset.is_preemptible, name=name, + tags=tag, description=description, wait_start=wait_start, pass_config=pass_config, @@ -944,6 +975,7 @@ async def run_job( life_span: Optional[str], preemptible: bool, name: Optional[str], + tags: Sequence[str], description: Optional[str], wait_start: bool, pass_config: bool, @@ -1021,6 +1053,7 @@ async def run_job( container, is_preemptible=preemptible, name=name, + tags=tags, description=description, life_span=job_life_span, ) diff --git a/tests/api/test_jobs.py b/tests/api/test_jobs.py index e4978e7d4..c0a98f600 100644 --- a/tests/api/test_jobs.py +++ b/tests/api/test_jobs.py @@ -815,6 +815,105 @@ async def handler(request: web.Request) -> web.Response: assert ret == _job_description_from_api(JSON, client.parse) +async def test_job_run_with_tags( + aiohttp_server: _TestServerFactory, make_client: _MakeClient +) -> None: + JSON = { + "id": "job-cf519ed3-9ea5-48f6-a8c5-492b810eb56f", + "tags": ["t1", "t2", "t3"], + "status": "failed", + "history": { + "status": "failed", + "reason": "Error", + "description": "Mounted on Avail\\n/dev/shm " "64M\\n\\nExit code: 1", + "created_at": "2018-09-25T12:28:21.298672+00:00", + "started_at": "2018-09-25T12:28:59.759433+00:00", + "finished_at": "2018-09-25T12:28:59.759433+00:00", + }, + "owner": "owner", + "cluster_name": "default", + "container": { + "image": "gcr.io/light-reality-205619/ubuntu:latest", + "command": "date", + "resources": { + "cpu": 1.0, + "memory_mb": 16384, + "gpu": 1, + "shm": False, + "gpu_model": "nvidia-tesla-p4", + }, + }, + "http_url": "http://my_host:8889", + "ssh_server": "ssh://my_host.ssh:22", + "ssh_auth_server": "ssh://my_host.ssh:22", + "is_preemptible": False, + } + + async def handler(request: web.Request) -> web.Response: + data = await request.json() + assert data == { + "container": { + "image": "submit-image-name", + "command": "submit-command", + "http": {"port": 8181, "requires_auth": True}, + "resources": { + "memory_mb": 16384, + "cpu": 7.0, + "shm": True, + "gpu": 1, + "gpu_model": "test-gpu-model", + }, + "volumes": [ + { + "src_storage_uri": "storage://test-user/path_read_only", + "dst_path": "/container/read_only", + "read_only": True, + }, + { + "src_storage_uri": "storage://test-user/path_read_write", + "dst_path": "/container/path_read_write", + "read_only": False, + }, + ], + }, + "is_preemptible": False, + "tags": ["t1", "t2", "t3"], + "cluster_name": "default", + } + + return web.json_response(JSON) + + app = web.Application() + app.router.add_post("/jobs", handler) + + srv = await aiohttp_server(app) + + async with make_client(srv.make_url("/")) as client: + resources = Resources(16384, 7, 1, "test-gpu-model", True, None, None) + volumes: List[Volume] = [ + Volume( + URL("storage://test-user/path_read_only"), "/container/read_only", True + ), + Volume( + URL("storage://test-user/path_read_write"), + "/container/path_read_write", + False, + ), + ] + container = Container( + image=RemoteImage("submit-image-name"), + command="submit-command", + resources=resources, + volumes=volumes, + http=HTTPPort(8181), + ) + ret = await client.jobs.run( + container, is_preemptible=False, tags=["t1", "t2", "t3"], + ) + + assert ret == _job_description_from_api(JSON, client.parse) + + async def test_job_run_no_volumes( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: @@ -1190,6 +1289,7 @@ def create_job_response( owner: str = "owner", name: Optional[str] = None, image: str = "submit-image-name", + tags: Optional[List[str]] = None, ) -> Dict[str, Any]: result = { "id": id, @@ -1219,6 +1319,8 @@ def create_job_response( } if name: result["name"] = name + if tags: + result["tags"] = tags return result @@ -1483,6 +1585,43 @@ async def handler(request: web.Request) -> web.Response: assert ret == job_descriptions[:2] +async def test_list_filter_by_tags( + aiohttp_server: _TestServerFactory, make_client: _MakeClient +) -> None: + jobs = [ + # under filter: + create_job_response("job-id-1", "running", tags=["t1", "t2", "t3"]), + create_job_response("job-id-2", "running", tags=["t1"]), + create_job_response("job-id-3", "running", tags=["t2"]), + # out of filter: + create_job_response("job-id-4", "running", tags=["t4"]), + create_job_response("job-id-5", "running"), + ] + + async def handler(request: web.Request) -> web.Response: + request_tags = set(request.query.getall("tag")) + filtered_jobs = [] + for job in jobs: + tags = job.get("tags") + if tags and set(tags).intersection(request_tags): + filtered_jobs.append(job) + JSON = {"jobs": filtered_jobs} + return web.json_response(JSON) + + app = web.Application() + app.router.add_get("/jobs", handler) + srv = await aiohttp_server(app) + + tags = {"t1", "t2"} + async with make_client(srv.make_url("/")) as client: + ret = await client.jobs.list(tags=tags) + + job_descriptions = [ + _job_description_from_api(job, client.parse) for job in jobs + ] + assert ret == job_descriptions[:3] + + async def test_list_filter_by_name_and_statuses_and_owners( aiohttp_server: _TestServerFactory, make_client: _MakeClient ) -> None: From 5b22a693f85a017d3db38eb611e1383056365678 Mon Sep 17 00:00:00 2001 From: Artem Yushkovskiy Date: Thu, 12 Mar 2020 16:38:36 +0300 Subject: [PATCH 2/5] add CLI changelog --- CHANGELOG.D/1393.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 CHANGELOG.D/1393.feature diff --git a/CHANGELOG.D/1393.feature b/CHANGELOG.D/1393.feature new file mode 100644 index 000000000..3702dfad4 --- /dev/null +++ b/CHANGELOG.D/1393.feature @@ -0,0 +1 @@ +Support job tags: `neuro run --tag=experiment-1`, `neuro ps --tag=experiment-1`. From 94595a5b458499606cc604614d4956499d849e38 Mon Sep 17 00:00:00 2001 From: Artem Yushkovskiy Date: Thu, 12 Mar 2020 16:48:08 +0300 Subject: [PATCH 3/5] API documentation + JobDescription --- docs/jobs_reference.rst | 14 ++++++++++++++ neuromation/api/jobs.py | 3 +++ 2 files changed, 17 insertions(+) diff --git a/docs/jobs_reference.rst b/docs/jobs_reference.rst index 150e3ea57..c22e329dc 100644 --- a/docs/jobs_reference.rst +++ b/docs/jobs_reference.rst @@ -45,6 +45,7 @@ Jobs .. comethod:: list(*, statuses: Iterable[JobStatus] = (), \ name: str = "" \ + tags: Sequence[str] = (), \ owners: Iterable[str] = (), ) -> List[JobDescription] @@ -69,6 +70,11 @@ Jobs Empty string means that no filter is applied (default). + :param str name: Filter jobs by :attr:`~JobDescription.tags`. Retrieves all + jobs submitted with at least one tag from the specified list. + + Empty list means that no filter is applied (default). + :param ~typing.Iterable[str] owners: filter jobs by their owners. The parameter can be a set or list of owner @@ -113,6 +119,7 @@ Jobs .. comethod:: run(container: Container, \ *, \ name: Optional[str] = None, \ + tags: Sequence[str] = (), \ description: Optional[str] = None, \ is_preemptible: bool = False, \ schedule_timeout: Optional[float] = None, \ @@ -125,6 +132,8 @@ Jobs :param str name: optional container name. + :param str name: optional job tags. + :param str desciption: optional container description. :param bool is_preemtible: a flag that specifies is the job is *preemptible* or @@ -269,6 +278,11 @@ JobDescription Job name provided by user at creation time, :class:`str` or ``None`` if name is omitted. + .. attribute:: tags + + List of job tags provided by user at creation time, :class:`Sequence[str]` or + ``()`` if tags omitted. + .. attribute:: description Job description text provided by user at creation time, :class:`str` or ``None`` diff --git a/neuromation/api/jobs.py b/neuromation/api/jobs.py index 06f9685df..91a7ca6ef 100644 --- a/neuromation/api/jobs.py +++ b/neuromation/api/jobs.py @@ -109,6 +109,7 @@ class JobDescription: container: Container is_preemptible: bool name: Optional[str] = None + tags: Sequence[str] = () description: Optional[str] = None http_url: URL = URL() ssh_server: URL = URL() @@ -519,6 +520,7 @@ def _job_description_from_api(res: Dict[str, Any], parse: Parser) -> JobDescript owner = res["owner"] cluster_name = res["cluster_name"] name = res.get("name") + tags = res.get("tags", ()) description = res.get("description") history = JobStatusHistory( status=JobStatus(res["history"].get("status", "unknown")), @@ -542,6 +544,7 @@ def _job_description_from_api(res: Dict[str, Any], parse: Parser) -> JobDescript container=container, is_preemptible=res["is_preemptible"], name=name, + tags=tags, description=description, http_url=http_url_named or http_url, ssh_server=ssh_server, From 2eb556e266bbd1e5aff0ca0328511f658325edbf Mon Sep 17 00:00:00 2001 From: Artem Yushkovskiy Date: Thu, 12 Mar 2020 17:26:16 +0300 Subject: [PATCH 4/5] add-e2e-test --- tests/e2e/test_e2e_jobs.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tests/e2e/test_e2e_jobs.py b/tests/e2e/test_e2e_jobs.py index a9d681731..d4fd92b64 100644 --- a/tests/e2e/test_e2e_jobs.py +++ b/tests/e2e/test_e2e_jobs.py @@ -162,6 +162,34 @@ def test_job_description(helper: Helper) -> None: helper.kill_job(job_id, wait=False) +@pytest.mark.e2e +def test_job_tags(helper: Helper) -> None: + tags = [f"test_job_tags:{uuid4()}", "tag:common"] + tag_options = [key for pair in [("--opt", t) for t in tags] for key in pair] + + command = "sleep 10m" + captured = helper.run_cli( + [ + "job", + "run", + "-s", + JOB_TINY_CONTAINER_PRESET, + *tag_options, + "--no-wait-start", + UBUNTU_IMAGE_NAME, + command, + ] + ) + match = re.match("Job ID: (.+) Status:", captured.out) + assert match is not None + job_id = match.group(1) + + captured = helper.run_cli(["ps", *tag_options]) + store_out_list = captured.out.split("\n")[1:] + jobs = [x.split(" ")[0] for x in store_out_list] + assert jobs == [job_id] + + @pytest.mark.e2e def test_job_kill_non_existing(helper: Helper) -> None: # try to kill non existing job From 499cec2b3061f3d88a914f26f40a5f64f4bf61ac Mon Sep 17 00:00:00 2001 From: Artem Yushkovskiy Date: Fri, 13 Mar 2020 17:15:08 +0300 Subject: [PATCH 5/5] fix --- tests/e2e/test_e2e_jobs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/e2e/test_e2e_jobs.py b/tests/e2e/test_e2e_jobs.py index d4fd92b64..c909461ea 100644 --- a/tests/e2e/test_e2e_jobs.py +++ b/tests/e2e/test_e2e_jobs.py @@ -164,8 +164,8 @@ def test_job_description(helper: Helper) -> None: @pytest.mark.e2e def test_job_tags(helper: Helper) -> None: - tags = [f"test_job_tags:{uuid4()}", "tag:common"] - tag_options = [key for pair in [("--opt", t) for t in tags] for key in pair] + tags = [f"test-tag:{uuid4()}", "test-tag:common"] + tag_options = [key for pair in [("--tag", t) for t in tags] for key in pair] command = "sleep 10m" captured = helper.run_cli(