Implement Attach/Exec via WebSockets (#1497)
Co-authored-by: Serhiy Storchaka <[email protected]>
asvetlov and serhiy-storchaka authored Jun 3, 2020
1 parent e879f01 commit dfe74a9
Showing 21 changed files with 1,137 additions and 197 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.D/1497.feature
@@ -0,0 +1 @@
Implement attach/exec/interactive-run using WebSockets.
35 changes: 27 additions & 8 deletions README.md
@@ -25,6 +25,7 @@
* [neuro job top](#neuro-job-top)
* [neuro job save](#neuro-job-save)
* [neuro job browse](#neuro-job-browse)
* [neuro job attach](#neuro-job-attach)
* [neuro project](#neuro-project)
* [neuro project init](#neuro-project-init)
* [neuro storage](#neuro-storage)
@@ -392,6 +393,7 @@ Name | Description|
| _[neuro job top](#neuro-job-top)_| Display GPU/CPU/Memory usage |
| _[neuro job save](#neuro-job-save)_| Save job's state to an image |
| _[neuro job browse](#neuro-job-browse)_| Opens a job's URL in a web browser |
| _[neuro job attach](#neuro-job-attach)_| Print the logs for a container |



@@ -444,7 +446,7 @@ Name | Description|
|_\--pass-config / --no-pass-config_|Upload neuro config to the job \[default: False]|
|_--browse_|Open a job's URL in a web browser|
|_--detach_|Don't attach to job logs and don't wait for exit code|
|_\-t, --tty_|Allocate a TTY|
|_\-t, --tty / -T, --no-tty_|Allocate a TTY, can be useful for interactive jobs. By default is on if the command is executed from a terminal, non-tty mode is used if executed from a script.|
|_--help_|Show this message and exit.|


@@ -503,7 +505,7 @@ Name | Description|
|_\--pass-config / --no-pass-config_|Upload neuro config to the job \[default: False]|
|_--browse_|Open a job's URL in a web browser|
|_--detach_|Don't attach to job logs and don't wait for exit code|
|_\-t, --tty_|Allocate a TTY|
|_\-t, --tty / -T, --no-tty_|Allocate a TTY, can be useful for interactive jobs. By default is on if the command is executed from a terminal, non-tty mode is used if executed from a script.|
|_--help_|Show this message and exit.|


@@ -618,8 +620,7 @@ neuro exec --no-tty my-job ls -l

Name | Description|
|----|------------|
|_\-t, --tty / -T, --no-tty_|Allocate virtual tty. Useful for interactive jobs.|
|_\-i, --interactive / -I, --no-interactive_|Keep STDIN open even if not attached. On for tty by default, false otherwise.|
|_\-t, --tty / -T, --no-tty_|Allocate a TTY, can be useful for interactive jobs. By default is on if the command is executed from a terminal, non-tty mode is used if executed from a script.|
|_--timeout FLOAT_|Maximum allowed time for executing the command, 0 for no timeout \[default: 0]|
|_--help_|Show this message and exit.|

@@ -771,6 +772,25 @@ Name | Description|



### neuro job attach

Print the logs for a container.

**Usage:**

```bash
neuro job attach [OPTIONS] JOB
```

**Options:**

Name | Description|
|----|------------|
|_--help_|Show this message and exit.|




## neuro project

Project operations.
@@ -1841,7 +1861,7 @@ Name | Description|
|_\--pass-config / --no-pass-config_|Upload neuro config to the job \[default: False]|
|_--browse_|Open a job's URL in a web browser|
|_--detach_|Don't attach to job logs and don't wait for exit code|
|_\-t, --tty_|Allocate a TTY|
|_\-t, --tty / -T, --no-tty_|Allocate a TTY, can be useful for interactive jobs. By default is on if the command is executed from a terminal, non-tty mode is used if executed from a script.|
|_--help_|Show this message and exit.|


@@ -1900,7 +1920,7 @@ Name | Description|
|_\--pass-config / --no-pass-config_|Upload neuro config to the job \[default: False]|
|_--browse_|Open a job's URL in a web browser|
|_--detach_|Don't attach to job logs and don't wait for exit code|
|_\-t, --tty_|Allocate a TTY|
|_\-t, --tty / -T, --no-tty_|Allocate a TTY, can be useful for interactive jobs. By default is on if the command is executed from a terminal, non-tty mode is used if executed from a script.|
|_--help_|Show this message and exit.|


@@ -1996,8 +2016,7 @@ neuro exec --no-tty my-job ls -l

Name | Description|
|----|------------|
|_\-t, --tty / -T, --no-tty_|Allocate virtual tty. Useful for interactive jobs.|
|_\-i, --interactive / -I, --no-interactive_|Keep STDIN open even if not attached. On for tty by default, false otherwise.|
|_\-t, --tty / -T, --no-tty_|Allocate a TTY, can be useful for interactive jobs. By default is on if the command is executed from a terminal, non-tty mode is used if executed from a script.|
|_--timeout FLOAT_|Maximum allowed time for executing the command, 0 for no timeout \[default: 0]|
|_--help_|Show this message and exit.|

2 changes: 2 additions & 0 deletions neuromation/api/__init__.py
@@ -48,6 +48,7 @@
    JobStatusHistory,
    JobTelemetry,
    Resources,
    StdStream,
    Volume,
)
from .parser import Parser
@@ -70,6 +71,7 @@
    "JobStatusHistory",
    "JobTelemetry",
    "Resources",
    "StdStream",
    "Volume",
    "HTTPPort",
    "Users",
10 changes: 9 additions & 1 deletion neuromation/api/core.py
@@ -1,3 +1,4 @@
import asyncio
import contextlib
import errno
import json as jsonmodule
@@ -167,7 +168,14 @@ async def request(
err_cls = self._exception_map.get(resp.status, IllegalArgumentError)
raise err_cls(err_text)
else:
yield resp
try:
yield resp
except GeneratorExit:
# There is a bug in CPython and/or aiohttp,
# if GeneratorExit is reraised @asynccontextmanager
# reports this as an error
# Need to investigate and fix.
raise asyncio.CancelledError

async def ws_connect(
self, abs_url: URL, auth: str, *, headers: Optional[Dict[str, str]] = None
199 changes: 153 additions & 46 deletions neuromation/api/jobs.py
@@ -6,12 +6,23 @@
from contextlib import suppress
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, AsyncIterator, Dict, Iterable, List, Mapping, Optional, Sequence
from typing import (
    Any,
    AsyncIterator,
    Dict,
    Iterable,
    List,
    Mapping,
    Optional,
    Sequence,
    Union,
)

import aiohttp
import attr
import psutil
from aiodocker.exceptions import DockerError
from aiohttp import WSServerHandshakeError
from aiohttp import WSMsgType, WSServerHandshakeError
from dateutil.parser import isoparse
from multidict import MultiDict
from yarl import URL
@@ -139,6 +150,47 @@ class JobTelemetry:
gpu_memory: Optional[float] = None


@dataclass(frozen=True)
class ExecInspect:
    id: str
    running: bool
    exit_code: int
    job_id: str
    tty: bool
    entrypoint: str
    command: str


@dataclass(frozen=True)
class Message:
    fileno: int
    data: bytes


class StdStream:
    def __init__(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        self._ws = ws
        self._closing = False

    async def close(self) -> None:
        self._closing = True
        await self._ws.close()

    async def read_out(self) -> Optional[Message]:
        if self._closing:
            return None
        msg = await self._ws.receive()
        if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
            self._closing = True
            return None
        return Message(msg.data[0], msg.data[1:])

    async def write_in(self, data: bytes) -> None:
        if self._closing:
            return
        await self._ws.send_bytes(data)
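
The `read_out` method above splits each binary WebSocket frame into a leading stream-number byte and the remaining payload. A minimal standalone sketch of that framing follows. The helper names `split_frame` and `demux` are hypothetical and not part of this commit, and the mapping of 1 to stdout and 2 to stderr is an assumption borrowed from POSIX file descriptor numbers; the diff itself does not state which numbers the server sends.

```python
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass(frozen=True)
class Message:
    # Same shape as the Message dataclass added in this commit.
    fileno: int
    data: bytes


def split_frame(payload: bytes) -> Message:
    """Split one binary frame into (stream number, data).

    Hypothetical helper: indexing bytes yields an int, slicing yields bytes,
    which is what Message(msg.data[0], msg.data[1:]) relies on.
    """
    if not payload:
        raise ValueError("empty frame has no stream-number byte")
    return Message(payload[0], payload[1:])


def demux(frames: Iterable[bytes]) -> Dict[int, bytes]:
    """Concatenate frame payloads per stream number (hypothetical helper)."""
    streams: Dict[int, bytes] = {}
    for frame in frames:
        msg = split_frame(frame)
        streams[msg.fileno] = streams.get(msg.fileno, b"") + msg.data
    return streams
```

Note that `StdStream.read_out` does not guard against an empty frame; the `ValueError` here is an addition of the sketch, not behavior of the committed code.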


class Jobs(metaclass=NoPublicConstructor):
    def __init__(self, core: _Core, config: Config, parse: Parser) -> None:
        self._core = core
@@ -322,50 +374,6 @@ async def save(
if push_step:
progress.step(push_step)

    async def exec(
        self,
        id: str,
        cmd: Iterable[str],
        *,
        tty: bool = False,
        no_key_check: bool = False,
    ) -> int:
        try:
            job_status = await self.status(id)
        except IllegalArgumentError as e:
            raise ValueError(f"Job not found. Job Id = {id}") from e
        if job_status.status != "running":
            raise ValueError(f"Job is not running. Job Id = {job_status.id}")
        payload = json.dumps(
            {
                "method": "job_exec",
                "token": await self._config.token(),
                "params": {"job": job_status.id, "command": list(cmd)},
            }
        )
        command = ["ssh"]
        if tty:
            command += ["-tt"]
        else:
            command += ["-T"]
        if no_key_check:  # pragma: no branch
            command += [
                "-o",
                "StrictHostKeyChecking=no",
                "-o",
                "UserKnownHostsFile=/dev/null",
            ]
        server_url = job_status.ssh_server
        port = server_url.port if server_url.port else 22
        command += ["-p", str(port), f"{server_url.user}@{server_url.host}", payload]
        proc = await asyncio.create_subprocess_exec(*command)
        try:
            return await proc.wait()
        finally:
            await _kill_proc_tree(proc.pid, timeout=10)
            # add a sleep to get process watcher a chance to execute all callbacks
            await asyncio.sleep(0.1)

@asynccontextmanager
async def port_forward(
self, id: str, local_port: int, job_port: int, *, no_key_check: bool = False
@@ -440,6 +448,105 @@ async def _port_forward(
# add a sleep to get process watcher a chance to execute all callbacks
await asyncio.sleep(0.1)

    @asynccontextmanager
    async def attach(
        self,
        id: str,
        *,
        stdin: bool = False,
        stdout: bool = False,
        stderr: bool = False,
        logs: bool = False,
    ) -> AsyncIterator[StdStream]:
        url = self._config.monitoring_url / id / "attach"
        url = url.with_query(
            stdin=str(int(stdin)),
            stdout=str(int(stdout)),
            stderr=str(int(stderr)),
            logs=str(int(logs)),
        )
        auth = await self._config._api_auth()
        ws = await self._core._session.ws_connect(
            url,
            headers={"Authorization": auth},
            timeout=None,  # type: ignore
            receive_timeout=None,
            heartbeat=30,
        )

        try:
            yield StdStream(ws)
        finally:
            await ws.close()

    async def resize(self, id: str, *, w: int, h: int) -> None:
        url = self._config.monitoring_url / id / "resize"
        url = url.with_query(w=w, h=h)
        auth = await self._config._api_auth()
        async with self._core.request("POST", url, auth=auth):
            pass

    async def exec_create(self, id: str, cmd: str, *, tty: bool = False) -> str:
        payload = {
            "command": cmd,
            "stdin": True,
            "stdout": True,
            "stderr": True,
            "tty": tty,
        }
        url = self._config.monitoring_url / id / "exec_create"
        auth = await self._config._api_auth()
        async with self._core.request("POST", url, json=payload, auth=auth) as resp:
            ret = await resp.json()
            return ret["exec_id"]

    async def exec_resize(self, id: str, exec_id: str, *, w: int, h: int) -> None:
        url = self._config.monitoring_url / id / exec_id / "exec_resize"
        url = url.with_query(w=w, h=h)
        auth = await self._config._api_auth()
        async with self._core.request("POST", url, auth=auth) as resp:
            resp

    async def exec_inspect(self, id: str, exec_id: str) -> ExecInspect:
        url = self._config.monitoring_url / id / exec_id / "exec_inspect"
        auth = await self._config._api_auth()
        async with self._core.request("GET", url, auth=auth) as resp:
            data = await resp.json()
            return ExecInspect(
                id=data["id"],
                running=data["running"],
                exit_code=data["exit_code"],
                job_id=data["job_id"],
                tty=data["tty"],
                entrypoint=data["entrypoint"],
                command=data["command"],
            )

    @asynccontextmanager
    async def exec_start(self, id: str, exec_id: str) -> AsyncIterator[StdStream]:
        url = self._config.monitoring_url / id / exec_id / "exec_start"
        auth = await self._config._api_auth()

        ws = await self._core._session.ws_connect(
            url,
            headers={"Authorization": auth},
            timeout=None,  # type: ignore
            receive_timeout=None,
            heartbeat=30,
        )

        try:
            yield StdStream(ws)
        finally:
            await ws.close()

    async def send_signal(self, id: str, signal: Union[str, int]) -> None:
        url = self._config.monitoring_url / id / "kill"
        url = url.with_query(signal=signal)
        auth = await self._config._api_auth()
        async with self._core.request("POST", url, auth=auth) as resp:
            resp


# ############## Internal helpers ###################
