Skip to content

Commit

Permalink
[Core] Add support for detach setup (skypilot-org#1379)
Browse files Browse the repository at this point in the history
* Add support for async setup

* Fix logging

* Add test for async setup

* add parens

* fix

* refactor a bit

* Fix status

* fix smoke test

* rename

* fix is_cluster_idle function

* format

* address comments

* fix

* Add setup failed

* Fix failed setup

* Add comment

* Add comments

* format

* fix logs

* format

* address comments
  • Loading branch information
Michaelvll authored and Sumanth committed Jan 15, 2023
1 parent 9e4df86 commit a530df1
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 81 deletions.
3 changes: 2 additions & 1 deletion examples/job_queue/job_multinode.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ num_nodes: 2

setup: |
echo "running setup"
sleep 80
run: |
timestamp=$(date +%s)
conda env list
for i in {1..120}; do
for i in {1..240}; do
echo "$timestamp $i"
sleep 1
done
8 changes: 5 additions & 3 deletions sky/backends/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ def sync_file_mounts(

@timeline.event
@usage_lib.messages.usage.update_runtime('setup')
def setup(self, handle: ResourceHandle, task: 'task_lib.Task') -> None:
return self._setup(handle, task)
def setup(self, handle: ResourceHandle, task: 'task_lib.Task',
detach_setup: bool) -> None:
return self._setup(handle, task, detach_setup)

def add_storage_objects(self, task: 'task_lib.Task') -> None:
raise NotImplementedError
Expand Down Expand Up @@ -123,7 +124,8 @@ def _sync_file_mounts(
) -> None:
raise NotImplementedError

def _setup(self, handle: ResourceHandle, task: 'task_lib.Task') -> None:
def _setup(self, handle: ResourceHandle, task: 'task_lib.Task',
detach_setup: bool) -> None:
raise NotImplementedError

def _execute(self, handle: ResourceHandle, task: 'task_lib.Task',
Expand Down
100 changes: 84 additions & 16 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@

_TPU_NOT_FOUND_ERROR = 'ERROR: (gcloud.compute.tpus.delete) NOT_FOUND'

_CTRL_C_TIP_MESSAGE = ('INFO: Tip: use Ctrl-C to exit log streaming '
'(task will not be killed).')

_MAX_RAY_UP_RETRY = 5

_JOB_ID_PATTERN = re.compile(r'Job ID: ([0-9]+)')
Expand Down Expand Up @@ -148,6 +151,9 @@ def __init__(self):
def add_prologue(self,
job_id: int,
spot_task: Optional['task_lib.Task'] = None,
setup_cmd: Optional[str] = None,
envs: Optional[Dict[str, str]] = None,
setup_log_path: Optional[str] = None,
is_local: bool = False) -> None:
assert not self._has_prologue, 'add_prologue() called twice?'
self._has_prologue = True
Expand Down Expand Up @@ -182,12 +188,11 @@ def add_prologue(self,
from sky.utils import log_utils
SKY_REMOTE_WORKDIR = {constants.SKY_REMOTE_WORKDIR!r}
job_lib.set_status({job_id!r}, job_lib.JobStatus.PENDING)
ray.init(address={ray_address!r}, namespace='__sky__{job_id}__', log_to_driver=True)
run_fn = None
futures = []"""),
futures = []
"""),
# FIXME: This is a hack to make sure that the functions can be found
# by ray.remote. This should be removed once we have a better way to
# specify dependencies for ray.
Expand All @@ -197,6 +202,46 @@ def add_prologue(self,
inspect.getsource(log_lib.add_ray_env_vars),
inspect.getsource(log_lib.run_bash_command_with_log),
'run_bash_command_with_log = ray.remote(run_bash_command_with_log)',
f'setup_cmd = {setup_cmd!r}'
]
if setup_cmd is not None:
self._code += [
textwrap.dedent(f"""\
_SETUP_CPUS = 0.0001
job_lib.set_status({job_id!r}, job_lib.JobStatus.SETTING_UP)
print({_CTRL_C_TIP_MESSAGE!r}, file=sys.stderr, flush=True)
total_num_nodes = len(ray.nodes())
setup_bundles = [{{"CPU": _SETUP_CPUS}} for _ in range(total_num_nodes)]
setup_pg = ray.util.placement_group(setup_bundles, strategy='STRICT_SPREAD')
ray.get(setup_pg.ready())
setup_workers = [run_bash_command_with_log \\
.options(name='setup', num_cpus=_SETUP_CPUS, placement_group=setup_pg, placement_group_bundle_index=i) \\
.remote(
setup_cmd,
os.path.expanduser({setup_log_path!r}),
getpass.getuser(),
job_id={self.job_id},
env_vars={envs!r},
stream_logs=True,
with_ray=True,
use_sudo={is_local},
) for i in range(total_num_nodes)]
setup_returncodes = ray.get(setup_workers)
if sum(setup_returncodes) != 0:
job_lib.set_status({self.job_id!r}, job_lib.JobStatus.FAILED_SETUP)
# This waits for all streaming logs to finish.
time.sleep(1)
print('ERROR: {colorama.Fore.RED}Job {self.job_id}\\'s setup failed with '
'return code list:{colorama.Style.RESET_ALL}',
setup_returncodes,
file=sys.stderr,
flush=True)
# Need this to set the job status in ray job to be FAILED.
sys.exit(1)
""")
]
self._code += [
f'job_lib.set_status({job_id!r}, job_lib.JobStatus.PENDING)',
]
if spot_task is not None:
# Add the spot job to spot queue table.
Expand Down Expand Up @@ -247,15 +292,15 @@ def add_gang_scheduling_placement_group(
**gpu_dict,
})

pack_mode = 'STRICT_SPREAD'
self._code += [
textwrap.dedent(f"""\
pg = ray_util.placement_group({json.dumps(bundles)}, {pack_mode!r})
pg = ray_util.placement_group({json.dumps(bundles)}, 'STRICT_SPREAD')
plural = 's' if {num_nodes} > 1 else ''
node_str = f'{num_nodes} node' + plural + '.'
print('INFO: Tip: use Ctrl-C to exit log streaming (task will not be killed).\\n'
'INFO: Waiting for task resources on ' + node_str +
' This will block if the cluster is full.',
node_str = f'{num_nodes} node{{plural}}'
message = '' if setup_cmd is not None else {_CTRL_C_TIP_MESSAGE!r} + '\\n'
message += f'INFO: Waiting for task resources on {{node_str}}. This will block if the cluster is full.'
print(message,
file=sys.stderr,
flush=True)
# FIXME: This will print the error message from autoscaler if
Expand Down Expand Up @@ -1636,6 +1681,11 @@ def __init__(self):
self._dag = None
self._optimize_target = None

# Command for running the setup script. It is only set when the
# setup needs to be run outside the self._setup() and as part of
# a job (--detach-setup).
self._setup_cmd = None

# --- Implementation of Backend APIs ---

def register_info(self, **kwargs) -> None:
Expand Down Expand Up @@ -1987,7 +2037,8 @@ def _sync_file_mounts(
self._execute_file_mounts(handle, all_file_mounts)
self._execute_storage_mounts(handle, storage_mounts)

def _setup(self, handle: ResourceHandle, task: task_lib.Task) -> None:
def _setup(self, handle: ResourceHandle, task: task_lib.Task,
detach_setup: bool) -> Optional[str]:
start = time.time()
style = colorama.Style
fore = colorama.Fore
Expand All @@ -2014,17 +2065,20 @@ def _setup(self, handle: ResourceHandle, task: task_lib.Task) -> None:
runners = command_runner.SSHCommandRunner.make_runner_list(
ip_list, *ssh_credentials)

# Need this `-i` option to make sure `source ~/.bashrc` work
setup_cmd = f'/bin/bash -i /tmp/{setup_file} 2>&1'

def _setup_node(runner: command_runner.SSHCommandRunner) -> int:
runner.rsync(source=setup_sh_path,
target=f'/tmp/{setup_file}',
up=True,
stream_logs=False)
# Need this `-i` option to make sure `source ~/.bashrc` work
cmd = f'/bin/bash -i /tmp/{setup_file} 2>&1'
if detach_setup:
return
setup_log_path = os.path.join(self.log_dir,
f'setup-{runner.ip}.log')
returncode = runner.run(
cmd,
setup_cmd,
log_path=setup_log_path,
process_stream=False,
)
Expand Down Expand Up @@ -2057,14 +2111,22 @@ def error_message() -> str:
return err_msg

subprocess_utils.handle_returncode(returncode=returncode,
command=cmd,
command=setup_cmd,
error_msg=error_message)

num_nodes = len(ip_list)
plural = 's' if num_nodes > 1 else ''
logger.info(f'{fore.CYAN}Running setup on {num_nodes} node{plural}.'
f'{style.RESET_ALL}')
if not detach_setup:
logger.info(
f'{fore.CYAN}Running setup on {num_nodes} node{plural}.'
f'{style.RESET_ALL}')
subprocess_utils.run_in_parallel(_setup_node, runners)

if detach_setup:
# Only set this when setup needs to be run outside the self._setup()
# as part of a job (--detach-setup).
self._setup_cmd = setup_cmd
return
logger.info(f'{fore.GREEN}Setup completed.{style.RESET_ALL}')
end = time.time()
logger.debug(f'Setup took {end - start} seconds.')
Expand Down Expand Up @@ -3021,6 +3083,9 @@ def _execute_task_one_node(self, handle: ResourceHandle,
is_local = isinstance(handle.launched_resources.cloud, clouds.Local)
codegen.add_prologue(job_id,
spot_task=task.spot_task,
setup_cmd=self._setup_cmd,
envs=task.envs,
setup_log_path=os.path.join(log_dir, 'setup.log'),
is_local=is_local)
codegen.add_gang_scheduling_placement_group(1, accelerator_dict)

Expand Down Expand Up @@ -3082,6 +3147,9 @@ def _execute_task_n_nodes(self, handle: ResourceHandle, task: task_lib.Task,
is_local = isinstance(handle.launched_resources.cloud, clouds.Local)
codegen.add_prologue(job_id,
spot_task=task.spot_task,
setup_cmd=self._setup_cmd,
envs=task.envs,
setup_log_path=os.path.join(log_dir, 'setup.log'),
is_local=is_local)
codegen.add_gang_scheduling_placement_group(
num_actual_nodes,
Expand Down
2 changes: 1 addition & 1 deletion sky/benchmark/benchmark_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ def _update_benchmark_result(benchmark_result: Dict[str, Any]) -> Optional[str]:

# Update the benchmark status.
if (cluster_status == global_user_state.ClusterStatus.INIT or
job_status in [job_lib.JobStatus.INIT, job_lib.JobStatus.PENDING]):
job_status < job_lib.JobStatus.RUNNING):
benchmark_status = benchmark_state.BenchmarkStatus.INIT
elif job_status == job_lib.JobStatus.RUNNING:
benchmark_status = benchmark_state.BenchmarkStatus.RUNNING
Expand Down
55 changes: 36 additions & 19 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ def _launch_with_confirm(
*,
dryrun: bool,
detach_run: bool,
detach_setup: bool = False,
no_confirm: bool = False,
idle_minutes_to_autostop: Optional[int] = None,
down: bool = False, # pylint: disable=redefined-outer-name
Expand Down Expand Up @@ -726,6 +727,7 @@ def _launch_with_confirm(
dryrun=dryrun,
stream_logs=True,
cluster_name=cluster,
detach_setup=detach_setup,
detach_run=detach_run,
backend=backend,
idle_minutes_to_autostop=idle_minutes_to_autostop,
Expand Down Expand Up @@ -1057,12 +1059,24 @@ def cli():
default=False,
is_flag=True,
help='If True, do not actually run the job.')
@click.option('--detach-run',
'-d',
default=False,
is_flag=True,
help='If True, run setup first (blocking), '
'then detach from the job\'s execution.')
@click.option(
'--detach-setup',
'-s',
default=False,
is_flag=True,
help=
('If True, run setup in non-interactive mode as part of the job itself. '
'You can safely ctrl-c to detach from logging, and it will not interrupt '
'the setup process. To see the logs again after detaching, use `sky logs`.'
' To cancel setup, cancel the job via `sky cancel`. Useful for long-'
'running setup commands.'))
@click.option(
'--detach-run',
'-d',
default=False,
is_flag=True,
help=('If True, as soon as a job is submitted, return from this call '
'and do not stream execution logs.'))
@click.option('--docker',
'backend_name',
flag_value=backends.LocalDockerBackend.NAME,
Expand Down Expand Up @@ -1116,7 +1130,6 @@ def cli():
required=False,
help='Skip confirmation prompt.')
@click.option('--no-setup',
'-n',
is_flag=True,
default=False,
required=False,
Expand All @@ -1126,6 +1139,7 @@ def launch(
entrypoint: str,
cluster: Optional[str],
dryrun: bool,
detach_setup: bool,
detach_run: bool,
backend_name: Optional[str],
name: Optional[str],
Expand Down Expand Up @@ -1190,6 +1204,7 @@ def launch(
backend,
cluster,
dryrun=dryrun,
detach_setup=detach_setup,
detach_run=detach_run,
no_confirm=yes,
idle_minutes_to_autostop=idle_minutes_to_autostop,
Expand All @@ -1209,12 +1224,13 @@ def launch(
type=str,
nargs=-1,
**_get_shell_complete_args(_complete_file_name))
@click.option('--detach-run',
'-d',
default=False,
is_flag=True,
help='If True, run workdir syncing first (blocking), '
'then detach from the job\'s execution.')
@click.option(
'--detach-run',
'-d',
default=False,
is_flag=True,
help=('If True, as soon as a job is submitted, return from this call '
'and do not stream execution logs.'))
@_add_click_options(_TASK_OPTIONS + _EXTRA_RESOURCES_OPTIONS)
@usage_lib.entrypoint
# pylint: disable=redefined-builtin
Expand Down Expand Up @@ -2801,12 +2817,13 @@ def spot():
type=int,
required=False,
help=('OS disk size in GBs.'))
@click.option('--detach-run',
'-d',
default=False,
is_flag=True,
help='If True, run setup first (blocking), '
'then detach from the job\'s execution.')
@click.option(
'--detach-run',
'-d',
default=False,
is_flag=True,
help=('If True, as soon as a job is submitted, return from this call '
'and do not stream execution logs.'))
@click.option(
'--retry-until-up',
'-r',
Expand Down
Loading

0 comments on commit a530df1

Please sign in to comment.