Skip to content

Commit

Permalink
work-on
Browse files Browse the repository at this point in the history
  • Loading branch information
asvetlov committed May 19, 2020
1 parent 4b43517 commit bfc4332
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 48 deletions.
2 changes: 1 addition & 1 deletion neuromation/api/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ async def close(self) -> None:

async def read_out(self) -> Optional[Message]:
msg = await self._ws.receive()
if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED,):
if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
self._closing = True
return None
return Message(msg.data[0], msg.data[1:])
Expand Down
115 changes: 69 additions & 46 deletions neuromation/cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import uuid
import webbrowser
from datetime import datetime, timedelta
from typing import Dict, Iterator, List, Optional, Sequence, Set, Tuple
from typing import AsyncIterator, Dict, Iterator, List, Optional, Sequence, Set, Tuple

import async_timeout
import click
Expand All @@ -34,6 +34,7 @@
StdStream,
Volume,
)
from neuromation.api.utils import asynccontextmanager
from neuromation.cli.formatters import DockerImageProgress
from neuromation.cli.formatters.utils import (
URIFormatter,
Expand Down Expand Up @@ -549,49 +550,45 @@ async def attach(root: Root, job: str) -> None:
JobStatus.FAILED,
},
)
await _attach(root, id, tty=None)
await _attach(root, id, tty=None, logs=False)


async def _attach(root: Root, job: str, tty: Optional[bool]) -> None:
async def _attach(root: Root, job: str, tty: Optional[bool], logs: bool) -> None:
if tty is None:
status = await root.client.jobs.status(job)
tty = status.container.tty
if tty:
await _attach_tty(root, job)
await _attach_tty(root, job, logs)
else:
await _attach_non_tty(root, job)
await _attach_non_tty(root, job, logs)
# this print doesn't help
# we need to find a way to get cmd prompt without pressing any button
# sys.stdout.write("\x1b[?1h")
# ESC c resets the terminal and clears screen
# sys.stdout.write("\x1bc")
# sys.stdout.flush()
status = await root.client.jobs.status(job)
sys.exit(status.history.exit_code)


async def _attach_tty(root: Root, job: str) -> None:
async def _attach_tty(root: Root, job: str, logs: bool) -> None:
loop = asyncio.get_event_loop()
stdout = create_output()
h, w = stdout.get_size()
async with root.client.jobs.attach(
job, stdin=True, stdout=True, stderr=True, logs=True
) as stream:
await root.client.jobs.resize(job, w=w, h=h)
tasks = []
resize_event = asyncio.Event()
tasks.append(loop.create_task(_process_stdin(stream, resize_event)))
tasks.append(loop.create_task(_process_stdout(stream, stdout)))
tasks.append(loop.create_task(_resize(root, job, resize_event, stdout)))
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for task in tasks:
if not task.done():
task.cancel()
try:
with contextlib.suppress(asyncio.CancelledError):
await task
except Exception as exc:
if root.show_traceback:
logging.exception(str(exc))
else:
logging.error(str(exc))
async with _print_logs_until_attached(root, job, logs) as attach_ready:
async with root.client.jobs.attach(
job, stdin=True, stdout=True, stderr=True, logs=True
) as stream:
attach_ready.set()
await root.client.jobs.resize(job, w=w, h=h)
tasks = []
resize_event = asyncio.Event()
tasks.append(loop.create_task(_process_stdin(stream, resize_event)))
tasks.append(loop.create_task(_process_stdout(stream, stdout)))
tasks.append(loop.create_task(_resize(root, job, resize_event, stdout)))
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for task in tasks:
await root.cancel_with_logging(task)


async def _resize(
Expand Down Expand Up @@ -659,26 +656,52 @@ async def _process_stdout(stream: StdStream, stdout: Output) -> None:
stdout.flush()


async def _attach_non_tty(root: Root, job: str) -> None:
async def _attach_non_tty(root: Root, job: str, logs: bool) -> None:
codec_info = codecs.lookup("utf8")
decoder = codec_info.incrementaldecoder("replace")
breakpoint()
async with root.client.jobs.attach(
job, stdout=True, stderr=True, logs=True
) as stream:
while True:
chunk = await stream.read_out()
if chunk is None:
break
if chunk.stream == 2:
f = sys.stderr
else:
f = sys.stdout
print(chunk)
txt = decoder.decode(chunk.data)
if txt is not None:
f.write(txt)
f.flush()
async with _print_logs_until_attached(root, job, logs) as attach_ready:
async with root.client.jobs.attach(
job, stdout=True, stderr=True, logs=True
) as stream:
attach_ready.set()
while True:
chunk = await stream.read_out()
if chunk is None:
break
if chunk.stream == 2:
f = sys.stderr
else:
f = sys.stdout
txt = decoder.decode(chunk.data)
if txt is not None:
f.write(txt)
f.flush()


@asynccontextmanager
async def _print_logs_until_attached(
root: Root, job: str, logs: bool
) -> AsyncIterator[asyncio.Event]:
attach_ready = asyncio.Event()
if not logs:
yield attach_ready
return

loop = asyncio.get_event_loop()
reader = loop.create_task(_print_logs(root, job))

async def wait_attached() -> None:
await attach_ready.wait()
# Job is attached, stop logs reading
await root.cancel_with_logging(reader)

waiter = loop.create_task(wait_attached())

try:
yield attach_ready
finally:
await root.cancel_with_logging(reader)
await root.cancel_with_logging(waiter)


@command()
Expand Down Expand Up @@ -1328,7 +1351,7 @@ async def run_job(
"""
)
click.echo(click.style(msg, dim=True))
await _attach(root, job.id, tty)
await _attach(root, job.id, tty=tty, logs=True)
job = await root.client.jobs.status(job.id)
while job.status in (JobStatus.PENDING, JobStatus.RUNNING):
await asyncio.sleep(0.1)
Expand Down
16 changes: 15 additions & 1 deletion neuromation/cli/root.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import asyncio
import contextlib
import logging
import re
import sys
from dataclasses import dataclass, field
from http.cookies import Morsel # noqa
from pathlib import Path
from types import SimpleNamespace
from typing import Awaitable, Dict, Iterator, List, Optional, Tuple, TypeVar
from typing import Any, Awaitable, Dict, Iterator, List, Optional, Tuple, TypeVar

import aiohttp
import click
Expand Down Expand Up @@ -212,3 +214,15 @@ def _sanitize_token(self, token: str) -> str:
def _find_all_tokens(self, text: str) -> Iterator[str]:
for match in HEADER_TOKEN_PATTERN.finditer(text):
yield match.group("token")

async def cancel_with_logging(self, task: "asyncio.Task[Any]") -> None:
if not task.done():
task.cancel()
try:
with contextlib.suppress(asyncio.CancelledError):
await task
except Exception as exc:
if self.show_traceback:
log.exception(str(exc), stack_info=True)
else:
log.error(str(exc))

0 comments on commit bfc4332

Please sign in to comment.