Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Add support for detach setup #1379

Merged
merged 22 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,14 @@ def add_prologue(self,
self._code += [
textwrap.dedent(f"""\
_SETUP_CPUS = 0.0001
job_lib.set_status({job_id!r}, job_lib.JobStatus.SETUP)
job_lib.set_status({job_id!r}, job_lib.JobStatus.SETTING_UP)
print({_CTRL_C_TIP_MESSAGE!r}, file=sys.stderr, flush=True)
total_num_nodes = len(ray.nodes())
bundles = [{{"CPU": _SETUP_CPUS}} for _ in range(total_num_nodes)]
pg = ray.util.placement_group(bundles, strategy='STRICT_SPREAD')
ray.get(pg.ready())
setup_bundles = [{{"CPU": _SETUP_CPUS}} for _ in range(total_num_nodes)]
setup_pg = ray.util.placement_group(setup_bundles, strategy='STRICT_SPREAD')
ray.get(setup_pg.ready())
setup_workers = [run_bash_command_with_log \\
.options(name='setup', num_cpus=_SETUP_CPUS, placement_group=pg, placement_group_bundle_index=i) \\
.options(name='setup', num_cpus=_SETUP_CPUS, placement_group=setup_pg, placement_group_bundle_index=i) \\
.remote(
setup_cmd,
os.path.expanduser({setup_log_path!r}),
Expand All @@ -226,9 +226,19 @@ def add_prologue(self,
with_ray=True,
use_sudo={is_local},
) for i in range(total_num_nodes)]
ray.get(setup_workers)
time.sleep(1)
print('INFO: Setup finished.', file=sys.stderr, flush=True)""")
setup_returncodes = ray.get(setup_workers)
if sum(setup_returncodes) != 0:
job_lib.set_status({self.job_id!r}, job_lib.JobStatus.FAILED_SETUP)
# This waits for all streaming logs to finish.
time.sleep(1)
print('ERROR: {colorama.Fore.RED}Job {self.job_id} setup failed with '
'return code list:{colorama.Style.RESET_ALL}',
setup_returncodes,
file=sys.stderr,
flush=True)
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
# Need this to set the job status in ray job to be FAILED.
sys.exit(1)
""")
]
self._code += [
f'job_lib.set_status({job_id!r}, job_lib.JobStatus.PENDING)',
Expand Down Expand Up @@ -1649,7 +1659,8 @@ def __init__(self):
self._optimize_target = None

# Command for running the setup script. It is only set when the
# setup needs to be run outside the self._setup().
# setup needs to be run outside the self._setup() and as part of
# a job (--detach-setup).
self._setup_cmd = None

# --- Implementation of Backend APIs ---
Expand Down Expand Up @@ -2090,6 +2101,8 @@ def error_message() -> str:
subprocess_utils.run_in_parallel(_setup_node, runners)

if detach_setup:
# Set setup_cmd and it will be run outside the self._setup() as part
# of a job (--detach-setup).
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
self._setup_cmd = setup_cmd
concretevitamin marked this conversation as resolved.
Show resolved Hide resolved
return
logger.info(f'{fore.GREEN}Setup completed.{style.RESET_ALL}')
Expand Down
52 changes: 29 additions & 23 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1059,17 +1059,21 @@ def cli():
default=False,
is_flag=True,
help='If True, do not actually run the job.')
@click.option('--detach-setup',
'-s',
default=False,
is_flag=True,
help='if True, run setup asynchronously.')
@click.option('--detach-run',
'-d',
default=False,
is_flag=True,
help='If True, run setup first (blocking), '
'then detach from the job\'s execution.')
@click.option(
'--detach-setup',
'-s',
default=False,
is_flag=True,
help=(
'If True, run setup in non-interactive mode where ctrl-c will not '
'interrupt the setup process. Useful for long-running setup commands.'))
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
@click.option(
'--detach-run',
'-d',
default=False,
is_flag=True,
help=('If True, as soon as a job is submitted, return to the system '
'and do not stream execution logs.'))
@click.option('--docker',
'backend_name',
flag_value=backends.LocalDockerBackend.NAME,
Expand Down Expand Up @@ -1217,12 +1221,13 @@ def launch(
type=str,
nargs=-1,
**_get_shell_complete_args(_complete_file_name))
@click.option('--detach-run',
'-d',
default=False,
is_flag=True,
help='If True, run workdir syncing first (blocking), '
'then detach from the job\'s execution.')
@click.option(
'--detach-run',
'-d',
default=False,
is_flag=True,
help=('If True, as soon as a job is submitted, return to the system '
'and do not stream execution logs.'))
@_add_click_options(_TASK_OPTIONS + _EXTRA_RESOURCES_OPTIONS)
@usage_lib.entrypoint
# pylint: disable=redefined-builtin
Expand Down Expand Up @@ -2762,12 +2767,13 @@ def spot():
type=int,
required=False,
help=('OS disk size in GBs.'))
@click.option('--detach-run',
'-d',
default=False,
is_flag=True,
help='If True, run setup first (blocking), '
'then detach from the job\'s execution.')
@click.option(
'--detach-run',
'-d',
default=False,
is_flag=True,
help=('If True, as soon as a job is submitted, return to the system '
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
'and do not stream execution logs.'))
@click.option(
'--retry-until-up',
'-r',
Expand Down
15 changes: 10 additions & 5 deletions sky/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,11 @@ def _execute(
skipping all setup steps.
cluster_name: Name of the cluster to create/reuse. If None,
auto-generate a name.
detach_setup: bool; whether to run the setup asynchronously.
detach_run: bool; whether to detach the process after the job submitted.
detach_setup: If True, run setup in non-interactive mode where ctrl-c
will not interrupt the setup process. Useful for long-running setup
commands.
detach_run: If True, as soon as a job is submitted, return from this
function and do not stream execution logs.
idle_minutes_to_autostop: int; if provided, the cluster will be set to
autostop after this many minutes of idleness.
no_setup: bool; whether to skip setup commands or not when (re-)launching.
Expand Down Expand Up @@ -331,9 +334,11 @@ def launch(
(CloudVMRayBackend).
optimize_target: target to optimize for. Choices: OptimizeTarget.COST,
OptimizeTarget.TIME.
detach_setup: If True, run setup asynchronously.
detach_run: If True, run setup first, then detach from the
job's execution.
detach_setup: If True, run setup in non-interactive mode where ctrl-c
will not interrupt the setup process. Useful for long-running setup
commands.
detach_run: If True, as soon as a job is submitted, return from this
function and do not stream execution logs.
no_setup: if True, do not re-run setup commands.

Example:
Expand Down
32 changes: 19 additions & 13 deletions sky/skylet/job_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class JobStatus(enum.Enum):
# directly, if the ray program fails to start.
INIT = 'INIT'
# Running the user's setup script.
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
SETUP = 'SETTING_UP'
SETTING_UP = 'SETTING_UP'
# The job is waiting for the required resources. (`ray job status`
# shows RUNNING as the generated ray program has started, but blocked
# by the placement constraints.)
Expand All @@ -91,12 +91,16 @@ class JobStatus(enum.Enum):
SUCCEEDED = 'SUCCEEDED'
# The job fails due to the user code or a system restart.
FAILED = 'FAILED'
# The job setup failed (--detach-setup). It needs to be placed after
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
# the `FAILED` state, so that the status set by our generated ray
# program will not be overwritten by ray's job status (FAILED).
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
FAILED_SETUP = 'FAILED_SETUP'
# The job is cancelled by the user.
CANCELLED = 'CANCELLED'

@classmethod
def nonterminal_statuses(cls) -> List['JobStatus']:
return [cls.INIT, cls.SETUP, cls.PENDING, cls.RUNNING]
return [cls.INIT, cls.SETTING_UP, cls.PENDING, cls.RUNNING]

def is_terminal(self):
return self not in self.nonterminal_statuses()
Expand All @@ -109,15 +113,16 @@ def __lt__(self, other):
# These are intentionally set to one status before, because:
# 1. when the ray status indicates the job is PENDING the generated
# python program should not be started yet, i.e. the job should be INIT.
# 2. when the ray status indicates the job is RUNNING the resources
# may not be allocated yet, i.e. the job should be PENDING.
# For case 2, update_job_status() would compare this mapped PENDING to
# 2. when the ray status indicates the job is RUNNING the job can be in
# setup or resources may not be allocated yet, i.e. the job should be
# SETTING_UP.
# For case 2, update_job_status() would compare this mapped SETTING_UP to
# the status in our jobs DB and take the max. This is because the job's
# generated ray program is the only place that can determine a job has
# reserved resources and actually started running: it will set the
# status in the DB to RUNNING.
'PENDING': JobStatus.INIT,
'RUNNING': JobStatus.SETUP,
'RUNNING': JobStatus.SETTING_UP,
'SUCCEEDED': JobStatus.SUCCEEDED,
'FAILED': JobStatus.FAILED,
'STOPPED': JobStatus.CANCELLED,
Expand Down Expand Up @@ -392,10 +397,11 @@ def update_job_status(job_owner: str,
# already been set to later state by the job. We skip the
# update.
# 2. _RAY_TO_JOB_STATUS_MAP would map `ray job status`'s
# `RUNNING` to our JobStatus.SETYO; if a job has already been
# set to JobStatus.RUNNING by the generated ray program,
# `original_status` (job status from our DB) would already have
# that value. So we take the max here to keep it at RUNNING.
# `RUNNING` to our JobStatus.SETTING_UP; if a job has already
# been set to JobStatus.PENDING or JobStatus.RUNNING by the
# generated ray program, `original_status` (job status from our
# DB) would already have that value. So we take the max here to
# keep it at later status.
status = max(status, original_status)
if status != original_status: # Prevents redundant update.
_set_status_no_lock(job_id, status)
Expand Down Expand Up @@ -480,7 +486,7 @@ def dump_job_queue(username: Optional[str], all_jobs: bool) -> str:
username: The username to show jobs for. Show all the users if None.
all_jobs: Whether to show all jobs, not just the pending/running ones.
"""
status_list = [JobStatus.SETUP, JobStatus.PENDING, JobStatus.RUNNING]
status_list = [JobStatus.SETTING_UP, JobStatus.PENDING, JobStatus.RUNNING]
if all_jobs:
status_list = None

Expand Down Expand Up @@ -514,7 +520,7 @@ def cancel_jobs(job_owner: str, jobs: Optional[List[int]]) -> None:
# jobs to CANCELLED.
if jobs is None:
job_records = _get_jobs(
None, [JobStatus.SETUP, JobStatus.PENDING, JobStatus.RUNNING])
None, [JobStatus.SETTING_UP, JobStatus.PENDING, JobStatus.RUNNING])
else:
job_records = _get_jobs_by_ids(jobs)

Expand All @@ -535,7 +541,7 @@ def cancel_jobs(job_owner: str, jobs: Optional[List[int]]) -> None:
continue

if job['status'] in [
JobStatus.SETUP, JobStatus.PENDING, JobStatus.RUNNING
JobStatus.SETTING_UP, JobStatus.PENDING, JobStatus.RUNNING
]:
set_status(job['job_id'], JobStatus.CANCELLED)

Expand Down
4 changes: 2 additions & 2 deletions sky/skylet/log_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ def _follow_job_logs(file,
# Auto-exit the log tailing, if the job has finished. Check
# the job status before query again to avoid unfinished logs.
if status not in [
job_lib.JobStatus.SETUP, job_lib.JobStatus.PENDING,
job_lib.JobStatus.SETTING_UP, job_lib.JobStatus.PENDING,
job_lib.JobStatus.RUNNING
]:
if wait_last_logs:
Expand Down Expand Up @@ -416,7 +416,7 @@ def tail_logs(job_owner: str,

start_stream_at = 'INFO: Tip: use Ctrl-C to exit log'
if follow and status in [
job_lib.JobStatus.SETUP,
job_lib.JobStatus.SETTING_UP,
job_lib.JobStatus.PENDING,
job_lib.JobStatus.RUNNING,
]:
Expand Down