Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking β€œSign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

queue: docs parity cleanup #7975

Merged
merged 4 commits into from
Jul 5, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dvc/commands/queue/kill.py
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@ def run(self):


def add_parser(queue_subparsers, parent_parser):
QUEUE_KILL_HELP = "Kill tasks in experiments queue."
QUEUE_KILL_HELP = "Kill actively running experiment queue tasks."
queue_kill_parser = queue_subparsers.add_parser(
"kill",
parents=[parent_parser],
5 changes: 4 additions & 1 deletion dvc/commands/queue/logs.py
Original file line number Diff line number Diff line change
@@ -21,7 +21,9 @@ def run(self):


def add_parser(queue_subparsers, parent_parser):
QUEUE_LOGS_HELP = "Show output logs for a task in the experiments queue."
QUEUE_LOGS_HELP = (
"Show output logs for running and completed experiment queue tasks."
)
queue_logs_parser = queue_subparsers.add_parser(
"logs",
parents=[parent_parser],
@@ -35,6 +37,7 @@ def add_parser(queue_subparsers, parent_parser):
help=(
"Text encoding for log output. Defaults to system locale encoding."
),
metavar="<encoding>",
)
queue_logs_parser.add_argument(
"-f",
20 changes: 14 additions & 6 deletions dvc/commands/queue/remove.py
Original file line number Diff line number Diff line change
@@ -31,7 +31,7 @@ def run(self):

def add_parser(queue_subparsers, parent_parser):

QUEUE_REMOVE_HELP = "Remove tasks in experiments queue."
QUEUE_REMOVE_HELP = "Remove queued and completed tasks from the queue."
queue_remove_parser = queue_subparsers.add_parser(
"remove",
parents=[parent_parser],
@@ -40,21 +40,29 @@ def add_parser(queue_subparsers, parent_parser):
formatter_class=argparse.RawDescriptionHelpFormatter,
)
queue_remove_parser.add_argument(
"--all", action="store_true", help="Remove all tasks."
"--all",
action="store_true",
help="Remove all queued and completed tasks from the queue.",
)
queue_remove_parser.add_argument(
"--queued", action="store_true", help="Remove all tasks in queue."
"--queued",
action="store_true",
help="Remove all queued tasks from the queue.",
)
queue_remove_parser.add_argument(
"--success", action="store_true", help="Remove all successful tasks."
"--success",
action="store_true",
help="Remove all successful tasks from the queue.",
)
queue_remove_parser.add_argument(
"--failed", action="store_true", help="Remove all failed tasks."
"--failed",
action="store_true",
help="Remove all failed tasks from the queue.",
)
queue_remove_parser.add_argument(
"task",
nargs="*",
help="Tasks in queue to remove.",
help="Tasks to remove.",
metavar="<task>",
)
queue_remove_parser.set_defaults(func=CmdQueueRemove)
24 changes: 10 additions & 14 deletions dvc/commands/queue/start.py
Original file line number Diff line number Diff line change
@@ -17,24 +17,16 @@ def run(self):
)

suffix = "s" if started > 1 else ""
ui.write(f"Start {started} new worker{suffix} to process the queue.")
ui.write(
f"Started '{started}' new experiments task queue worker{suffix}."
)

return 0


def job_type(job):
try:
job = int(job)
if job > 0:
return job
except ValueError:
pass
raise argparse.ArgumentTypeError("Worker number must be a natural number.")


def add_parser(queue_subparsers, parent_parser):

QUEUE_START_HELP = "Start experiments queue workers."
QUEUE_START_HELP = "Start the experiments task queue worker."
queue_start_parser = queue_subparsers.add_parser(
"start",
parents=[parent_parser],
@@ -45,8 +37,12 @@ def add_parser(queue_subparsers, parent_parser):
queue_start_parser.add_argument(
"-j",
"--jobs",
type=job_type,
type=int,
default=1,
help="Number of queue workers to start.",
help=(
"Maximum number of concurrent queue workers to start. "
"Defaults to 1."
),
metavar="<number>",
)
queue_start_parser.set_defaults(func=CmdQueueStart)
48 changes: 22 additions & 26 deletions dvc/commands/queue/status.py
Original file line number Diff line number Diff line change
@@ -13,49 +13,45 @@


class CmdQueueStatus(CmdBase):
"""Kill exp task in queue."""
"""Show queue task and worker status."""

def run(self):
result: List[
Mapping[str, Optional[str]]
] = self.repo.experiments.celery_queue.status()
all_headers = ["Task", "Name", "Created", "Status"]
td = TabularData(all_headers)
for exp in result:
created = format_time(exp.get("timestamp"))
td.append(
[exp["rev"][:7], exp.get("name", ""), created, exp["status"]]
)
td.render()

if not result:
ui.write("No experiments in task queue for now.")
if result:
all_headers = ["Task", "Name", "Created", "Status"]
td = TabularData(all_headers)
for exp in result:
created = format_time(exp.get("timestamp"))
td.append(
[
exp["rev"][:7],
exp.get("name", ""),
created,
exp["status"],
]
)
td.render()
else:
ui.write("No experiment tasks in the queue.")
ui.write()

worker_status = self.repo.experiments.celery_queue.worker_status()
active_count = len(
[name for name, task in worker_status.items() if task]
)
idle_count = len(worker_status) - active_count

if active_count == 1:
ui.write("There is 1 worker active", end=", ")
elif active_count == 0:
ui.write("No worker active", end=", ")
else:
ui.write(f"There are {active_count} workers active", end=", ")

if idle_count == 1:
ui.write("1 worker idle at present.")
elif idle_count == 0:
ui.write("no worker idle at present.")
else:
ui.write(f"{idle_count} workers idle at present.")
ui.write(f"Worker status: {active_count} active, {idle_count} idle")

return 0


def add_parser(queue_subparsers, parent_parser):
QUEUE_STATUS_HELP = "List the status of the queue tasks and workers."
QUEUE_STATUS_HELP = (
"Show the status of experiments queue tasks and workers."
)
queue_status_parser = queue_subparsers.add_parser(
"status",
parents=[parent_parser],
2 changes: 1 addition & 1 deletion dvc/commands/queue/stop.py
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ def run(self):

def add_parser(queue_subparsers, parent_parser):

QUEUE_STOP_HELP = "Stop experiments queue workers."
QUEUE_STOP_HELP = "Stop all experiments task queue workers."
queue_stop_parser = queue_subparsers.add_parser(
"stop",
parents=[parent_parser],
7 changes: 4 additions & 3 deletions dvc/repo/experiments/__init__.py
Original file line number Diff line number Diff line change
@@ -117,9 +117,6 @@ def reproduce_one(
self.tempdir_queue if tmp_dir else self.workspace_queue
)
self.queue_one(exp_queue, **kwargs)
if machine:
# TODO: decide how to handle queued remote execution
raise NotImplementedError
results = self._reproduce_queue(exp_queue)
exp_rev = first(results)
if exp_rev is not None:
@@ -137,6 +134,10 @@ def queue_one(
if reset:
self.reset_checkpoints()

if kwargs.pop("machine", None) is not None:
# TODO: decide how to handle queued remote execution
raise NotImplementedError

if checkpoint_resume:
from dvc.scm import resolve_rev

8 changes: 4 additions & 4 deletions tests/unit/command/test_queue.py
Original file line number Diff line number Diff line change
@@ -98,19 +98,19 @@ def test_experiments_stop(dvc, scm, mocker):
[
(
{"worker1": [], "worker2": []},
"No worker active, 2 workers idle at present.",
"Worker status: 0 active, 2 idle",
),
(
{
"worker1": [{"id": "1"}],
"worker2": [{"id": "2"}],
"worker3": [],
},
"There are 2 workers active, 1 worker idle at present.",
"Worker status: 2 active, 1 idle",
),
(
{"worker1": [{"id": "1"}]},
"There is 1 worker active, no worker idle at present.",
"Worker status: 1 active, 0 idle",
),
],
)
@@ -137,7 +137,7 @@ def test_worker_status(dvc, scm, worker_status, output, mocker, capsys):
assert cmd.run() == 0
m.assert_called_once_with()
log, _ = capsys.readouterr()
assert "No experiments in task queue for now." in log
assert "No experiment tasks in the queue." in log
assert output in log