Skip to content

Commit

Permalink
Scratch exec api
Browse files Browse the repository at this point in the history
  • Loading branch information
asvetlov committed May 21, 2020
1 parent b2cfa14 commit ee73987
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 25 deletions.
111 changes: 88 additions & 23 deletions neuromation/api/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,45 @@ class JobTelemetry:
gpu_memory: Optional[float] = None


@dataclass(frozen=True)
class ExecInspect:
id: str
running: bool
exit_code: int
job_id: str
tty: bool
entrypoint: str
command: str


@dataclass(frozen=True)
class Message:
stream: int
data: bytes


class StdStream:
def __init__(self, ws: aiohttp.ClientWebSocketResponse) -> None:
self._ws = ws
self._closing = False

async def close(self) -> None:
self._closing = True
await self._ws.close()

async def read_out(self) -> Optional[Message]:
msg = await self._ws.receive()
if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
self._closing = True
return None
return Message(msg.data[0], msg.data[1:])

async def write_in(self, data: bytes) -> None:
if self._closing:
return
await self._ws.send_bytes(data)


class Jobs(metaclass=NoPublicConstructor):
def __init__(self, core: _Core, config: Config, parse: Parser) -> None:
self._core = core
Expand Down Expand Up @@ -450,7 +489,7 @@ async def attach(
stdout: bool = False,
stderr: bool = False,
logs: bool = False,
) -> AsyncIterator["StdStream"]:
) -> AsyncIterator[StdStream]:
url = self._config.monitoring_url / id / "attach"
url = url.with_query(
stdin=str(int(stdin)),
Expand Down Expand Up @@ -479,33 +518,59 @@ async def resize(self, id: str, *, w: int, h: int) -> None:
async with self._core.request("POST", url, auth=auth):
pass

async def exec_create(self, id: str, cmd: str, *, tty: bool = False) -> str:
payload = {
"command": cmd,
"stdin": True,
"stdout": True,
"stderr": True,
"tty": tty,
}
url = self._config.monitoring_url / id / "exec_create"
auth = await self._config._api_auth()
async with self._core.request("POST", url, json=payload, auth=auth) as resp:
ret = await resp.json()
return ret["exec_id"]

@dataclass(frozen=True)
class Message:
stream: int
data: bytes

async def exec_resize(self, id: str, exec_id: str, *, w: int, h: int) -> None:
url = self._config.monitoring_url / id / exec_id / "exec_resize"
url = url.with_query(w=w, h=h)
auth = await self._config._api_auth()
async with self._core.request("POST", url, auth=auth) as resp:
resp

class StdStream:
def __init__(self, ws: aiohttp.ClientWebSocketResponse) -> None:
self._ws = ws
self._closing = False
async def exec_inspect(self, id: str, exec_id: str) -> ExecInspect:
url = self._config.monitoring_url / id / exec_id / "exec_inspect"
auth = await self._config._api_auth()
async with self._core.request("GET", url, auth=auth) as resp:
data = await resp.json()
return ExecInspect(
id=data["id"],
running=data["running"],
exit_code=data["exit_code"],
job_id=data["job_id"],
tty=data["tty"],
entrypoint=data["entrypoint"],
command=data["command"],
)

async def close(self) -> None:
self._closing = True
await self._ws.close()
@asynccontextmanager
async def exec_start(self, id: str, exec_id: str) -> AsyncIterator[StdStream]:
url = self._config.monitoring_url / id / exec_id / "exec_start"
auth = await self._config._api_auth()

async def read_out(self) -> Optional[Message]:
msg = await self._ws.receive()
if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
self._closing = True
return None
return Message(msg.data[0], msg.data[1:])
ws = await self._core._session.ws_connect(
url,
headers={"Authorization": auth},
timeout=None, # type: ignore
receive_timeout=None,
heartbeat=30,
)

async def write_in(self, data: bytes) -> None:
if self._closing:
return
await self._ws.send_bytes(data)
try:
yield StdStream(ws)
finally:
await ws.close()


# ############## Internal helpers ###################
Expand Down
22 changes: 20 additions & 2 deletions tests/e2e/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,10 +462,28 @@ def run_cli(
out = out.strip()
err = err.strip()
if verbosity > 0:
print(f"nero stdout: {out}")
print(f"nero stderr: {err}")
print(f"neuro stdout: {out}")
print(f"neuro stderr: {err}")
return SysCap(out, err)

async def run_cli_async(
self,
arguments: List[str],
*,
verbosity: int = 0,
network_timeout: float = NETWORK_TIMEOUT,
) -> "asyncio.Process":
__tracebackhide__ = True

log.info("Run 'neuro %s'", " ".join(arguments))

return await asyncio.create_subprocess_exec(
*self._default_args(verbosity, network_timeout) + arguments,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)

def autocomplete(
self,
arguments: List[str],
Expand Down

0 comments on commit ee73987

Please sign in to comment.