Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Print the status of finished jobs #2801

Merged
merged 6 commits into from
Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.D/2800.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Commands `neuro run`, `neuro logs`, `neuro attach` and `neuro exec` in non-quiet mode now prints details for cancelled and failed jobs. Also improved other indications of the job status.
133 changes: 81 additions & 52 deletions neuro-cli/src/neuro_cli/ael.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from prompt_toolkit.keys import Keys
from prompt_toolkit.output import Output, create_output
from prompt_toolkit.shortcuts import PromptSession
from rich.markup import escape as rich_escape

from neuro_sdk import (
JobDescription,
Expand All @@ -37,7 +38,10 @@
log = logging.getLogger(__name__)


JOB_STARTED = "[dim]===== Job is running, press Ctrl-C to detach/kill =====[/dim]"
JOB_STARTED_NEURO_HAS_TTY = (
"[green]√[/green] "
"[dim]===== Job is running, press Ctrl-C to detach/kill =====[/dim]"
)

JOB_STARTED_NEURO_HAS_NO_TTY = (
"[dim]===== Job is running, press Ctrl-C to detach =====[/dim]"
Expand Down Expand Up @@ -66,13 +70,15 @@ class InterruptAction(enum.Enum):
class AttachHelper:
attach_ready: bool
log_printed: bool
job_started_msg: str
write_sem: asyncio.Semaphore
quiet: bool
action: InterruptAction

def __init__(self, *, quiet: bool) -> None:
self.attach_ready = False
self.log_printed = False
self.job_started_msg = ""
self.write_sem = asyncio.Semaphore()
self.quiet = quiet
self.action = InterruptAction.NOTHING
Expand Down Expand Up @@ -109,7 +115,10 @@ async def process_logs(
if helper.attach_ready:
return
async with helper.write_sem:
helper.log_printed = True
if not helper.log_printed:
if not root.quiet:
root.print(helper.job_started_msg, markup=True)
helper.log_printed = True
sys.stdout.write(txt)
sys.stdout.flush()
else:
Expand All @@ -128,6 +137,10 @@ async def process_exec(
finally:
root.soft_reset_tty()

if not root.quiet:
status = await root.client.jobs.status(job)
print_job_result(root, status)

sys.exit(exit_code)


Expand Down Expand Up @@ -278,16 +291,14 @@ async def _process_attach_single_try(
root, job.id, logs, cluster_name=job.cluster_name
)

with JobStopProgress.create(
console=root.console,
quiet=root.quiet,
) as progress:
if action == InterruptAction.KILL:
if action == InterruptAction.KILL:
with JobStopProgress.create(root.console, quiet=root.quiet) as progress:
progress.kill(job)
sys.exit(128 + signal.SIGINT)
elif action == InterruptAction.DETACH:
sys.exit(128 + signal.SIGINT)
elif action == InterruptAction.DETACH:
with JobStopProgress.create(root.console, quiet=root.quiet) as progress:
progress.detach(job)
sys.exit(0)
sys.exit(0)
except ResourceNotFound:
# Container already stopped, so we can ignore such error.
pass
Expand Down Expand Up @@ -315,28 +326,25 @@ async def _process_attach_single_try(
# The class pins the current time in counstructor,
# that's why we need to initialize
# it AFTER the disconnection from attached session.
with JobStopProgress.create(
console=root.console,
quiet=root.quiet,
) as progress:
while job.status == JobStatus.RUNNING:
await asyncio.sleep(0.2)
job = await root.client.jobs.status(job.id)
with JobStopProgress.create(root.console, quiet=root.quiet) as progress:
while not job.status.is_finished:
if not progress.step(job):
sys.exit(EX_IOERR)
if job.status == JobStatus.FAILED:
sys.exit(job.history.exit_code or EX_PLATFORMERROR)
await asyncio.sleep(0.2)
job = await root.client.jobs.status(job.id)
progress.end(job)
if job.status == JobStatus.FAILED:
sys.exit(job.history.exit_code or EX_PLATFORMERROR)
else:
sys.exit(job.history.exit_code)


async def _attach_tty(
root: Root, job: str, logs: bool, *, cluster_name: Optional[str]
) -> InterruptAction:
if not root.quiet:
root.print(JOB_STARTED_TTY, markup=True)

loop = asyncio.get_event_loop()
helper = AttachHelper(quiet=root.quiet)
helper.job_started_msg = JOB_STARTED_TTY

stdout = create_output()
h, w = stdout.get_size()
Expand All @@ -357,6 +365,7 @@ async def _attach_tty(
if status.status is not JobStatus.RUNNING:
# Job is finished
await logs_printer
print_job_result(root, status)
if status.status == JobStatus.FAILED:
sys.exit(status.history.exit_code or EX_PLATFORMERROR)
else:
Expand Down Expand Up @@ -484,33 +493,22 @@ async def _process_stdout_tty(
else:
txt = decoder.decode(chunk.data)
async with helper.write_sem:
if not helper.quiet and not helper.attach_ready:
# Print header to stdout only,
# logs are printed to stdout and never to
# stderr (but logs printing is stopped by
# helper.attach_ready = True regardless
# what stream had receive text in attached mode.
if helper.log_printed:
s = ATTACH_STARTED_AFTER_LOGS
if root.tty:
s = "[green]√[/green] " + s
root.print(s, markup=True)
helper.attach_ready = True
if not helper.attach_ready:
await _print_header(root, helper)
helper.attach_ready = True
stdout.write_raw(txt)
stdout.flush()


async def _attach_non_tty(
root: Root, job: str, logs: bool, *, cluster_name: Optional[str]
) -> InterruptAction:
if not root.quiet:
s = JOB_STARTED_NEURO_HAS_NO_TTY
if root.tty:
s = "[green]√[/green] " + JOB_STARTED
root.print(s, markup=True)

loop = asyncio.get_event_loop()
helper = AttachHelper(quiet=root.quiet)
if root.tty:
helper.job_started_msg = JOB_STARTED_NEURO_HAS_TTY
else:
helper.job_started_msg = JOB_STARTED_NEURO_HAS_NO_TTY

if logs:
logs_printer = loop.create_task(
Expand All @@ -527,6 +525,7 @@ async def _attach_non_tty(
if status.history.exit_code is not None:
# Wait for logs printing finish before exit
await logs_printer
print_job_result(root, status)
sys.exit(status.history.exit_code)

input_task = None
Expand Down Expand Up @@ -580,18 +579,9 @@ async def _process_stdout_non_tty(
async def _write(fileno: int, txt: str) -> None:
f = streams[fileno]
async with helper.write_sem:
if not helper.quiet and not helper.attach_ready:
# Print header to stdout only,
# logs are printed to stdout and never to
# stderr (but logs printing is stopped by
# helper.attach_ready = True regardless
# what stream had receive text in attached mode.
if helper.log_printed:
s = ATTACH_STARTED_AFTER_LOGS
if root.tty:
s = "[green]√[/green] " + s
root.print(s, markup=True)
helper.attach_ready = True
if not helper.attach_ready:
await _print_header(root, helper)
helper.attach_ready = True
f.write(txt)
f.flush()

Expand All @@ -608,6 +598,23 @@ async def _write(fileno: int, txt: str) -> None:
await _write(chunk.fileno, txt)


async def _print_header(root: Root, helper: AttachHelper) -> None:
if not helper.quiet and not helper.attach_ready:
# Print header to stdout only,
# logs are printed to stdout and never to
# stderr (but logs printing is stopped by
# helper.attach_ready = True regardless
# what stream had receive text in attached mode.
if helper.log_printed:
s = ATTACH_STARTED_AFTER_LOGS
if root.tty:
s = "[green]√[/green] " + s
root.print(s, markup=True)
else:
if not root.quiet:
root.print(helper.job_started_msg, markup=True)


def _create_interruption_dialog() -> PromptSession[InterruptAction]:
bindings = KeyBindings()

Expand Down Expand Up @@ -701,3 +708,25 @@ async def _cancel_attach_output(root: Root, output_task: "asyncio.Task[Any]") ->
if ex and isinstance(ex, StdStreamError):
return
await root.cancel_with_logging(output_task)


def print_job_result(root: Root, job: JobDescription) -> None:
if job.status == JobStatus.SUCCEEDED and root.verbosity > 0:
msg = f"Job [b]{job.id}[/b] finished successfully"
if root.tty:
msg = "[green]√[/green] " + msg
root.print(msg, markup=True)
if job.status == JobStatus.CANCELLED and root.verbosity >= 0:
msg = f"Job [b]{job.id}[/b] was cancelled"
if root.tty:
msg = "[green]√[/green] " + msg
if job.history.reason:
msg += f" ({rich_escape(job.history.reason)})"
root.print(msg, markup=True)
if job.status == JobStatus.FAILED and root.verbosity >= 0:
msg = f"Job [b]{job.id}[/b] failed"
if root.tty:
msg = "[red]×[/red] " + msg
if job.history.reason:
msg += f" ({rich_escape(job.history.reason)})"
root.print(msg, markup=True)
64 changes: 46 additions & 18 deletions neuro-cli/src/neuro_cli/formatters/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ def _get_status_reason_message(self, job: JobDescription) -> str:
return ""

def _get_status_description_message(self, job: JobDescription) -> str:
description = job.history.description or ""
description = job.history.description
if description:
return f"({description})"
return ""
Expand Down Expand Up @@ -599,13 +599,13 @@ def begin(self, job: JobDescription) -> None:
def step(self, job: JobDescription) -> None:
new_time = self.time_factory()
dt = new_time - self._time
if job.status == JobStatus.PENDING:
if job.status.is_pending:
msg = Text("-", "yellow")
elif job.status == JobStatus.FAILED:
msg = Text("×", "red")
else:
# RUNNING or SUCCEDED
elif job.status in (JobStatus.RUNNING, JobStatus.SUCCEEDED):
msg = Text("√", "green")
else:
# FAILED or CANCELLED or UNKNOWN
msg = Text("×", "red")

msg = Text.assemble(msg, " Status: ", fmt_status(job.status))
reason = self._get_status_reason_message(job)
Expand Down Expand Up @@ -635,7 +635,7 @@ def end(self, job: JobDescription) -> None:
self._prev = empty
self._live_render.set_renderable(empty)

if job.status != JobStatus.FAILED:
if not job.status.is_finished:
http_url = job.http_url
if http_url:
out.append(f"{yes()} [b]Http URL[/b]: {rich_escape(str(http_url))}")
Expand Down Expand Up @@ -739,6 +739,9 @@ def step(self, job: JobDescription) -> bool:
def tick(self, job: JobDescription) -> None:
pass

def end(self, job: JobDescription) -> None:
pass

def timeout(self, job: JobDescription) -> None:
pass

Expand Down Expand Up @@ -790,18 +793,29 @@ def kill(self, job: JobDescription) -> None:
]
)

def end(self, job: JobDescription) -> None:
if job.status == JobStatus.SUCCEEDED:
msg = yes() + f" Job [b]{job.id}[/b] finished successfully"
elif job.status == JobStatus.CANCELLED:
msg = yes() + f" Job [b]{job.id}[/b] was cancelled"
if job.history.reason:
msg += f" ({rich_escape(job.history.reason)})"
else:
msg = no() + f" Job [b]{job.id}[/b] failed"
if job.history.reason:
msg += f" ({rich_escape(job.history.reason)})"

self._live_render.set_renderable(Text.from_markup(msg))
with self._console:
self._console.print(Control())

def tick(self, job: JobDescription) -> None:
new_time = self.time_factory()
dt = new_time - self._time

if job.status == JobStatus.RUNNING:
msg = (
"[yellow]-[/yellow]"
+ f" Wait for stop {next(self._spinner)} [{dt:.1f} sec]"
)
else:
msg = yes() + f" Job [b]{job.id}[/b] stopped"

msg = (
"[yellow]-[/yellow]"
+ f" Wait for stop {next(self._spinner)} [{dt:.1f} sec]"
)
self._live_render.set_renderable(Text.from_markup(msg))
with self._console:
self._console.print(Control())
Expand Down Expand Up @@ -853,16 +867,30 @@ class StreamJobStopProgress(JobStopProgress):
def __init__(self, console: Console) -> None:
super().__init__()
self._console = console
self._console.print("Wait for stopping")
self._first = True

def detach(self, job: JobDescription) -> None:
pass

def kill(self, job: JobDescription) -> None:
self._console.print("Job was killed")

def end(self, job: JobDescription) -> None:
if job.status == JobStatus.CANCELLED:
msg = "Job was cancelled"
if job.history.reason:
msg += f" ({job.history.reason})"
self._console.print(msg)
if job.status == JobStatus.FAILED:
msg = "Job failed"
if job.history.reason:
msg += f" ({job.history.reason})"
self._console.print(msg)

def tick(self, job: JobDescription) -> None:
pass
if self._first:
self._console.print("Wait for stopping")
self._first = False

def timeout(self, job: JobDescription) -> None:
self._console.print("")
Expand Down
Loading