Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Add support for detach setup #1379

Merged
merged 22 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/job_queue/job_multinode.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ num_nodes: 2

setup: |
echo "running setup"
sleep 80

run: |
timestamp=$(date +%s)
conda env list
for i in {1..120}; do
for i in {1..240}; do
echo "$timestamp $i"
sleep 1
done
8 changes: 5 additions & 3 deletions sky/backends/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ def sync_file_mounts(

@timeline.event
@usage_lib.messages.usage.update_runtime('setup')
def setup(self, handle: ResourceHandle, task: 'task_lib.Task') -> None:
return self._setup(handle, task)
def setup(self, handle: ResourceHandle, task: 'task_lib.Task',
detach_setup: bool) -> None:
return self._setup(handle, task, detach_setup)

def add_storage_objects(self, task: 'task_lib.Task') -> None:
raise NotImplementedError
Expand Down Expand Up @@ -123,7 +124,8 @@ def _sync_file_mounts(
) -> None:
raise NotImplementedError

def _setup(self, handle: ResourceHandle, task: 'task_lib.Task') -> None:
def _setup(self, handle: ResourceHandle, task: 'task_lib.Task',
detach_setup: bool) -> None:
raise NotImplementedError

def _execute(self, handle: ResourceHandle, task: 'task_lib.Task',
Expand Down
100 changes: 84 additions & 16 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@

_TPU_NOT_FOUND_ERROR = 'ERROR: (gcloud.compute.tpus.delete) NOT_FOUND'

_CTRL_C_TIP_MESSAGE = ('INFO: Tip: use Ctrl-C to exit log streaming '
'(task will not be killed).')

_MAX_RAY_UP_RETRY = 5

_JOB_ID_PATTERN = re.compile(r'Job ID: ([0-9]+)')
Expand Down Expand Up @@ -148,6 +151,9 @@ def __init__(self):
def add_prologue(self,
job_id: int,
spot_task: Optional['task_lib.Task'] = None,
setup_cmd: Optional[str] = None,
envs: Optional[Dict[str, str]] = None,
setup_log_path: Optional[str] = None,
is_local: bool = False) -> None:
assert not self._has_prologue, 'add_prologue() called twice?'
self._has_prologue = True
Expand Down Expand Up @@ -182,12 +188,11 @@ def add_prologue(self,
from sky.utils import log_utils

SKY_REMOTE_WORKDIR = {constants.SKY_REMOTE_WORKDIR!r}
job_lib.set_status({job_id!r}, job_lib.JobStatus.PENDING)

ray.init(address={ray_address!r}, namespace='__sky__{job_id}__', log_to_driver=True)

run_fn = None
futures = []"""),
futures = []
"""),
# FIXME: This is a hack to make sure that the functions can be found
# by ray.remote. This should be removed once we have a better way to
# specify dependencies for ray.
Expand All @@ -197,6 +202,46 @@ def add_prologue(self,
inspect.getsource(log_lib.add_ray_env_vars),
inspect.getsource(log_lib.run_bash_command_with_log),
'run_bash_command_with_log = ray.remote(run_bash_command_with_log)',
f'setup_cmd = {setup_cmd!r}'
]
if setup_cmd is not None:
self._code += [
textwrap.dedent(f"""\
_SETUP_CPUS = 0.0001
job_lib.set_status({job_id!r}, job_lib.JobStatus.SETTING_UP)
print({_CTRL_C_TIP_MESSAGE!r}, file=sys.stderr, flush=True)
total_num_nodes = len(ray.nodes())
setup_bundles = [{{"CPU": _SETUP_CPUS}} for _ in range(total_num_nodes)]
setup_pg = ray.util.placement_group(setup_bundles, strategy='STRICT_SPREAD')
ray.get(setup_pg.ready())
setup_workers = [run_bash_command_with_log \\
.options(name='setup', num_cpus=_SETUP_CPUS, placement_group=setup_pg, placement_group_bundle_index=i) \\
.remote(
setup_cmd,
os.path.expanduser({setup_log_path!r}),
getpass.getuser(),
job_id={self.job_id},
env_vars={envs!r},
stream_logs=True,
with_ray=True,
use_sudo={is_local},
) for i in range(total_num_nodes)]
setup_returncodes = ray.get(setup_workers)
if sum(setup_returncodes) != 0:
job_lib.set_status({self.job_id!r}, job_lib.JobStatus.FAILED_SETUP)
# This waits for all streaming logs to finish.
time.sleep(1)
print('ERROR: {colorama.Fore.RED}Job {self.job_id}'s setup failed with '
'return code list:{colorama.Style.RESET_ALL}',
setup_returncodes,
file=sys.stderr,
flush=True)
# Need this to set the job status in ray job to be FAILED.
sys.exit(1)
""")
]
self._code += [
f'job_lib.set_status({job_id!r}, job_lib.JobStatus.PENDING)',
]
if spot_task is not None:
# Add the spot job to spot queue table.
Expand Down Expand Up @@ -247,15 +292,15 @@ def add_gang_scheduling_placement_group(
**gpu_dict,
})

pack_mode = 'STRICT_SPREAD'
self._code += [
textwrap.dedent(f"""\
pg = ray_util.placement_group({json.dumps(bundles)}, {pack_mode!r})
pg = ray_util.placement_group({json.dumps(bundles)}, 'STRICT_SPREAD')
plural = 's' if {num_nodes} > 1 else ''
node_str = f'{num_nodes} node' + plural + '.'
print('INFO: Tip: use Ctrl-C to exit log streaming (task will not be killed).\\n'
'INFO: Waiting for task resources on ' + node_str +
' This will block if the cluster is full.',
node_str = f'{num_nodes} node{{plural}}'

message = '' if setup_cmd is not None else {_CTRL_C_TIP_MESSAGE!r} + '\\n'
message += f'INFO: Waiting for task resources on {{node_str}}. This will block if the cluster is full.'
print(message,
file=sys.stderr,
flush=True)
# FIXME: This will print the error message from autoscaler if
Expand Down Expand Up @@ -1613,6 +1658,11 @@ def __init__(self):
self._dag = None
self._optimize_target = None

# Command for running the setup script. It is only set when setup
# must run outside of self._setup(), as part of the job itself
# (--detach-setup).
self._setup_cmd = None

# --- Implementation of Backend APIs ---

def register_info(self, **kwargs) -> None:
Expand Down Expand Up @@ -1961,7 +2011,8 @@ def _sync_file_mounts(
self._execute_file_mounts(handle, all_file_mounts)
self._execute_storage_mounts(handle, storage_mounts)

def _setup(self, handle: ResourceHandle, task: task_lib.Task) -> None:
def _setup(self, handle: ResourceHandle, task: task_lib.Task,
detach_setup: bool) -> Optional[str]:
start = time.time()
style = colorama.Style
fore = colorama.Fore
Expand All @@ -1988,17 +2039,20 @@ def _setup(self, handle: ResourceHandle, task: task_lib.Task) -> None:
runners = command_runner.SSHCommandRunner.make_runner_list(
ip_list, *ssh_credentials)

# The `-i` option is needed so that `source ~/.bashrc` works.
setup_cmd = f'/bin/bash -i /tmp/{setup_file} 2>&1'

def _setup_node(runner: command_runner.SSHCommandRunner) -> int:
runner.rsync(source=setup_sh_path,
target=f'/tmp/{setup_file}',
up=True,
stream_logs=False)
# Need this `-i` option to make sure `source ~/.bashrc` work
cmd = f'/bin/bash -i /tmp/{setup_file} 2>&1'
if detach_setup:
return
setup_log_path = os.path.join(self.log_dir,
f'setup-{runner.ip}.log')
returncode = runner.run(
cmd,
setup_cmd,
log_path=setup_log_path,
process_stream=False,
)
Expand Down Expand Up @@ -2031,14 +2085,22 @@ def error_message() -> str:
return err_msg

subprocess_utils.handle_returncode(returncode=returncode,
command=cmd,
command=setup_cmd,
error_msg=error_message)

num_nodes = len(ip_list)
plural = 's' if num_nodes > 1 else ''
logger.info(f'{fore.CYAN}Running setup on {num_nodes} node{plural}.'
f'{style.RESET_ALL}')
if not detach_setup:
logger.info(
f'{fore.CYAN}Running setup on {num_nodes} node{plural}.'
f'{style.RESET_ALL}')
subprocess_utils.run_in_parallel(_setup_node, runners)

if detach_setup:
# Only set this when setup must run outside of self._setup(), as part
# of the job itself (--detach-setup).
self._setup_cmd = setup_cmd
concretevitamin marked this conversation as resolved.
Show resolved Hide resolved
return
logger.info(f'{fore.GREEN}Setup completed.{style.RESET_ALL}')
end = time.time()
logger.debug(f'Setup took {end - start} seconds.')
Expand Down Expand Up @@ -2994,6 +3056,9 @@ def _execute_task_one_node(self, handle: ResourceHandle,
is_local = isinstance(handle.launched_resources.cloud, clouds.Local)
codegen.add_prologue(job_id,
spot_task=task.spot_task,
setup_cmd=self._setup_cmd,
envs=task.envs,
setup_log_path=os.path.join(log_dir, 'setup.log'),
is_local=is_local)
codegen.add_gang_scheduling_placement_group(1, accelerator_dict)

Expand Down Expand Up @@ -3051,6 +3116,9 @@ def _execute_task_n_nodes(self, handle: ResourceHandle, task: task_lib.Task,
is_local = isinstance(handle.launched_resources.cloud, clouds.Local)
codegen.add_prologue(job_id,
spot_task=task.spot_task,
setup_cmd=self._setup_cmd,
envs=task.envs,
setup_log_path=os.path.join(log_dir, 'setup.log'),
is_local=is_local)
codegen.add_gang_scheduling_placement_group(
num_actual_nodes,
Expand Down
2 changes: 1 addition & 1 deletion sky/benchmark/benchmark_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ def _update_benchmark_result(benchmark_result: Dict[str, Any]) -> Optional[str]:

# Update the benchmark status.
if (cluster_status == global_user_state.ClusterStatus.INIT or
job_status in [job_lib.JobStatus.INIT, job_lib.JobStatus.PENDING]):
job_status < job_lib.JobStatus.RUNNING):
benchmark_status = benchmark_state.BenchmarkStatus.INIT
elif job_status == job_lib.JobStatus.RUNNING:
benchmark_status = benchmark_state.BenchmarkStatus.RUNNING
Expand Down
54 changes: 35 additions & 19 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ def _launch_with_confirm(
*,
dryrun: bool,
detach_run: bool,
detach_setup: bool = False,
no_confirm: bool = False,
idle_minutes_to_autostop: Optional[int] = None,
down: bool = False, # pylint: disable=redefined-outer-name
Expand Down Expand Up @@ -726,6 +727,7 @@ def _launch_with_confirm(
dryrun=dryrun,
stream_logs=True,
cluster_name=cluster,
detach_setup=detach_setup,
detach_run=detach_run,
backend=backend,
idle_minutes_to_autostop=idle_minutes_to_autostop,
Expand Down Expand Up @@ -1057,12 +1059,23 @@ def cli():
default=False,
is_flag=True,
help='If True, do not actually run the job.')
@click.option('--detach-run',
'-d',
default=False,
is_flag=True,
help='If True, run setup first (blocking), '
'then detach from the job\'s execution.')
@click.option(
'--detach-setup',
'-s',
default=False,
is_flag=True,
help=
('If True, run setup in non-interactive mode as part of the job itself. '
'You can safely ctrl-c to detach from logging, and it will not interrupt '
'the setup process. Setup can be cancelled by canceling the job via '
'`sky cancel`. Useful for long-running setup commands.'))
@click.option(
'--detach-run',
'-d',
default=False,
is_flag=True,
help=('If True, as soon as a job is submitted, return from this call '
'and do not stream execution logs.'))
@click.option('--docker',
'backend_name',
flag_value=backends.LocalDockerBackend.NAME,
Expand Down Expand Up @@ -1116,7 +1129,6 @@ def cli():
required=False,
help='Skip confirmation prompt.')
@click.option('--no-setup',
'-n',
is_flag=True,
default=False,
required=False,
Expand All @@ -1126,6 +1138,7 @@ def launch(
entrypoint: str,
cluster: Optional[str],
dryrun: bool,
detach_setup: bool,
detach_run: bool,
backend_name: Optional[str],
name: Optional[str],
Expand Down Expand Up @@ -1190,6 +1203,7 @@ def launch(
backend,
cluster,
dryrun=dryrun,
detach_setup=detach_setup,
detach_run=detach_run,
no_confirm=yes,
idle_minutes_to_autostop=idle_minutes_to_autostop,
Expand All @@ -1209,12 +1223,13 @@ def launch(
type=str,
nargs=-1,
**_get_shell_complete_args(_complete_file_name))
@click.option('--detach-run',
'-d',
default=False,
is_flag=True,
help='If True, run workdir syncing first (blocking), '
'then detach from the job\'s execution.')
@click.option(
'--detach-run',
'-d',
default=False,
is_flag=True,
help=('If True, as soon as a job is submitted, return from this call '
'and do not stream execution logs.'))
@_add_click_options(_TASK_OPTIONS + _EXTRA_RESOURCES_OPTIONS)
@usage_lib.entrypoint
# pylint: disable=redefined-builtin
Expand Down Expand Up @@ -2754,12 +2769,13 @@ def spot():
type=int,
required=False,
help=('OS disk size in GBs.'))
@click.option('--detach-run',
'-d',
default=False,
is_flag=True,
help='If True, run setup first (blocking), '
'then detach from the job\'s execution.')
@click.option(
'--detach-run',
'-d',
default=False,
is_flag=True,
help=('If True, as soon as a job is submitted, return from this call '
'and do not stream execution logs.'))
@click.option(
'--retry-until-up',
'-r',
Expand Down
Loading