Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Add support for detach setup #1379

Merged
merged 22 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/job_queue/job_multinode.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ num_nodes: 2

setup: |
echo "running setup"
sleep 80

run: |
timestamp=$(date +%s)
conda env list
for i in {1..120}; do
for i in {1..240}; do
echo "$timestamp $i"
sleep 1
done
8 changes: 5 additions & 3 deletions sky/backends/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ def sync_file_mounts(

@timeline.event
@usage_lib.messages.usage.update_runtime('setup')
def setup(self, handle: ResourceHandle, task: 'task_lib.Task') -> None:
return self._setup(handle, task)
def setup(self, handle: ResourceHandle, task: 'task_lib.Task',
detach_setup: bool) -> None:
return self._setup(handle, task, detach_setup)

def add_storage_objects(self, task: 'task_lib.Task') -> None:
raise NotImplementedError
Expand Down Expand Up @@ -123,7 +124,8 @@ def _sync_file_mounts(
) -> None:
raise NotImplementedError

def _setup(self, handle: ResourceHandle, task: 'task_lib.Task') -> None:
def _setup(self, handle: ResourceHandle, task: 'task_lib.Task',
detach_setup: bool) -> None:
raise NotImplementedError

def _execute(self, handle: ResourceHandle, task: 'task_lib.Task',
Expand Down
91 changes: 75 additions & 16 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@

_TPU_NOT_FOUND_ERROR = 'ERROR: (gcloud.compute.tpus.delete) NOT_FOUND'

_CTRL_C_TIP_MESSAGE = ('INFO: Tip: use Ctrl-C to exit log streaming '
'(task will not be killed).')

_MAX_RAY_UP_RETRY = 5

_JOB_ID_PATTERN = re.compile(r'Job ID: ([0-9]+)')
Expand Down Expand Up @@ -148,6 +151,9 @@ def __init__(self):
def add_prologue(self,
job_id: int,
spot_task: Optional['task_lib.Task'] = None,
setup_cmd: Optional[str] = None,
envs: Optional[Dict[str, str]] = None,
setup_log_path: Optional[str] = None,
is_local: bool = False) -> None:
assert not self._has_prologue, 'add_prologue() called twice?'
self._has_prologue = True
Expand Down Expand Up @@ -182,12 +188,11 @@ def add_prologue(self,
from sky.utils import log_utils

SKY_REMOTE_WORKDIR = {constants.SKY_REMOTE_WORKDIR!r}
job_lib.set_status({job_id!r}, job_lib.JobStatus.PENDING)

ray.init(address={ray_address!r}, namespace='__sky__{job_id}__', log_to_driver=True)

run_fn = None
futures = []"""),
futures = []
"""),
# FIXME: This is a hack to make sure that the functions can be found
# by ray.remote. This should be removed once we have a better way to
# specify dependencies for ray.
Expand All @@ -197,6 +202,36 @@ def add_prologue(self,
inspect.getsource(log_lib.add_ray_env_vars),
inspect.getsource(log_lib.run_bash_command_with_log),
'run_bash_command_with_log = ray.remote(run_bash_command_with_log)',
f'setup_cmd = {setup_cmd!r}'
]
if setup_cmd is not None:
self._code += [
textwrap.dedent(f"""\
_SETUP_CPUS = 0.0001
job_lib.set_status({job_id!r}, job_lib.JobStatus.SETUP)
print({_CTRL_C_TIP_MESSAGE!r}, file=sys.stderr, flush=True)
total_num_nodes = len(ray.nodes())
bundles = [{{"CPU": _SETUP_CPUS}} for _ in range(total_num_nodes)]
pg = ray.util.placement_group(bundles, strategy='STRICT_SPREAD')
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
ray.get(pg.ready())
setup_workers = [run_bash_command_with_log \\
.options(name='setup', num_cpus=_SETUP_CPUS, placement_group=pg, placement_group_bundle_index=i) \\
.remote(
setup_cmd,
os.path.expanduser({setup_log_path!r}),
getpass.getuser(),
job_id={self.job_id},
env_vars={envs!r},
stream_logs=True,
with_ray=True,
use_sudo={is_local},
) for i in range(total_num_nodes)]
ray.get(setup_workers)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: setup_tasks?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will _tasks be a bit confusing with the sky task?

time.sleep(1)
print('INFO: Setup finished.', file=sys.stderr, flush=True)""")
]
self._code += [
f'job_lib.set_status({job_id!r}, job_lib.JobStatus.PENDING)',
]
if spot_task is not None:
# Add the spot job to spot queue table.
Expand Down Expand Up @@ -247,15 +282,15 @@ def add_gang_scheduling_placement_group(
**gpu_dict,
})

pack_mode = 'STRICT_SPREAD'
self._code += [
textwrap.dedent(f"""\
pg = ray_util.placement_group({json.dumps(bundles)}, {pack_mode!r})
pg = ray_util.placement_group({json.dumps(bundles)}, 'STRICT_SPREAD')
plural = 's' if {num_nodes} > 1 else ''
node_str = f'{num_nodes} node' + plural + '.'
print('INFO: Tip: use Ctrl-C to exit log streaming (task will not be killed).\\n'
'INFO: Waiting for task resources on ' + node_str +
' This will block if the cluster is full.',
node_str = f'{num_nodes} node{{plural}}'

message = '' if setup_cmd is not None else {_CTRL_C_TIP_MESSAGE!r} + '\\n'
message += f'INFO: Waiting for task resources on {{node_str}}. This will block if the cluster is full.'
print(message,
file=sys.stderr,
flush=True)
# FIXME: This will print the error message from autoscaler if
Expand Down Expand Up @@ -1613,6 +1648,10 @@ def __init__(self):
self._dag = None
self._optimize_target = None

# Command for running the setup script. It is only set when the
# setup needs to be run outside the self._setup().
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
self._setup_cmd = None

# --- Implementation of Backend APIs ---

def register_info(self, **kwargs) -> None:
Expand Down Expand Up @@ -1961,7 +2000,8 @@ def _sync_file_mounts(
self._execute_file_mounts(handle, all_file_mounts)
self._execute_storage_mounts(handle, storage_mounts)

def _setup(self, handle: ResourceHandle, task: task_lib.Task) -> None:
def _setup(self, handle: ResourceHandle, task: task_lib.Task,
detach_setup: bool) -> Optional[str]:
start = time.time()
style = colorama.Style
fore = colorama.Fore
Expand All @@ -1988,17 +2028,20 @@ def _setup(self, handle: ResourceHandle, task: task_lib.Task) -> None:
runners = command_runner.SSHCommandRunner.make_runner_list(
ip_list, *ssh_credentials)

# Need this `-i` option to make sure `source ~/.bashrc` work
setup_cmd = f'/bin/bash -i /tmp/{setup_file} 2>&1'

def _setup_node(runner: command_runner.SSHCommandRunner) -> int:
runner.rsync(source=setup_sh_path,
target=f'/tmp/{setup_file}',
up=True,
stream_logs=False)
# Need this `-i` option to make sure `source ~/.bashrc` work
cmd = f'/bin/bash -i /tmp/{setup_file} 2>&1'
if detach_setup:
return
setup_log_path = os.path.join(self.log_dir,
f'setup-{runner.ip}.log')
returncode = runner.run(
cmd,
setup_cmd,
log_path=setup_log_path,
process_stream=False,
)
Expand Down Expand Up @@ -2031,14 +2074,24 @@ def error_message() -> str:
return err_msg

subprocess_utils.handle_returncode(returncode=returncode,
command=cmd,
command=setup_cmd,
error_msg=error_message)

num_nodes = len(ip_list)
plural = 's' if num_nodes > 1 else ''
logger.info(f'{fore.CYAN}Running setup on {num_nodes} node{plural}.'
f'{style.RESET_ALL}')
if detach_setup:
logger.info(
f'{fore.CYAN}Preparing setup for {num_nodes} node{plural}.'
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
f'{style.RESET_ALL}')
else:
logger.info(
f'{fore.CYAN}Running setup on {num_nodes} node{plural}.'
f'{style.RESET_ALL}')
subprocess_utils.run_in_parallel(_setup_node, runners)

if detach_setup:
self._setup_cmd = setup_cmd
concretevitamin marked this conversation as resolved.
Show resolved Hide resolved
return
logger.info(f'{fore.GREEN}Setup completed.{style.RESET_ALL}')
end = time.time()
logger.debug(f'Setup took {end - start} seconds.')
Expand Down Expand Up @@ -2994,6 +3047,9 @@ def _execute_task_one_node(self, handle: ResourceHandle,
is_local = isinstance(handle.launched_resources.cloud, clouds.Local)
codegen.add_prologue(job_id,
spot_task=task.spot_task,
setup_cmd=self._setup_cmd,
envs=task.envs,
setup_log_path=os.path.join(log_dir, 'setup.log'),
is_local=is_local)
codegen.add_gang_scheduling_placement_group(1, accelerator_dict)

Expand Down Expand Up @@ -3051,6 +3107,9 @@ def _execute_task_n_nodes(self, handle: ResourceHandle, task: task_lib.Task,
is_local = isinstance(handle.launched_resources.cloud, clouds.Local)
codegen.add_prologue(job_id,
spot_task=task.spot_task,
setup_cmd=self._setup_cmd,
envs=task.envs,
setup_log_path=os.path.join(log_dir, 'setup.log'),
is_local=is_local)
codegen.add_gang_scheduling_placement_group(
num_actual_nodes,
Expand Down
2 changes: 1 addition & 1 deletion sky/benchmark/benchmark_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ def _update_benchmark_result(benchmark_result: Dict[str, Any]) -> Optional[str]:

# Update the benchmark status.
if (cluster_status == global_user_state.ClusterStatus.INIT or
job_status in [job_lib.JobStatus.INIT, job_lib.JobStatus.PENDING]):
job_status < job_lib.JobStatus.RUNNING):
benchmark_status = benchmark_state.BenchmarkStatus.INIT
elif job_status == job_lib.JobStatus.RUNNING:
benchmark_status = benchmark_state.BenchmarkStatus.RUNNING
Expand Down
10 changes: 9 additions & 1 deletion sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ def _launch_with_confirm(
*,
dryrun: bool,
detach_run: bool,
detach_setup: bool = False,
no_confirm: bool = False,
idle_minutes_to_autostop: Optional[int] = None,
down: bool = False, # pylint: disable=redefined-outer-name
Expand Down Expand Up @@ -726,6 +727,7 @@ def _launch_with_confirm(
dryrun=dryrun,
stream_logs=True,
cluster_name=cluster,
detach_setup=detach_setup,
detach_run=detach_run,
backend=backend,
idle_minutes_to_autostop=idle_minutes_to_autostop,
Expand Down Expand Up @@ -1057,6 +1059,11 @@ def cli():
default=False,
is_flag=True,
help='If True, do not actually run the job.')
@click.option('--detach-setup',
'-s',
default=False,
is_flag=True,
help='if True, run setup asynchronously.')
@click.option('--detach-run',
'-d',
default=False,
Expand Down Expand Up @@ -1116,7 +1123,6 @@ def cli():
required=False,
help='Skip confirmation prompt.')
@click.option('--no-setup',
'-n',
is_flag=True,
default=False,
required=False,
Expand All @@ -1126,6 +1132,7 @@ def launch(
entrypoint: str,
cluster: Optional[str],
dryrun: bool,
detach_setup: bool,
detach_run: bool,
backend_name: Optional[str],
name: Optional[str],
Expand Down Expand Up @@ -1190,6 +1197,7 @@ def launch(
backend,
cluster,
dryrun=dryrun,
detach_setup=detach_setup,
detach_run=detach_run,
no_confirm=yes,
idle_minutes_to_autostop=idle_minutes_to_autostop,
Expand Down
9 changes: 7 additions & 2 deletions sky/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def _execute(
optimize_target: OptimizeTarget = OptimizeTarget.COST,
stages: Optional[List[Stage]] = None,
cluster_name: Optional[str] = None,
detach_setup: bool = False,
detach_run: bool = False,
idle_minutes_to_autostop: Optional[int] = None,
no_setup: bool = False,
Expand Down Expand Up @@ -146,6 +147,7 @@ def _execute(
skipping all setup steps.
cluster_name: Name of the cluster to create/reuse. If None,
auto-generate a name.
detach_setup: bool; whether to run the setup asynchronously.
detach_run: bool; whether to detach the process after the job submitted.
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
idle_minutes_to_autostop: int; if provided, the cluster will be set to
autostop after this many minutes of idleness.
Expand Down Expand Up @@ -240,7 +242,7 @@ def _execute(
if no_setup:
logger.info('Setup commands skipped.')
elif Stage.SETUP in stages:
backend.setup(handle, task)
backend.setup(handle, task, detach_setup=detach_setup)

if Stage.PRE_EXEC in stages:
if idle_minutes_to_autostop is not None:
Expand Down Expand Up @@ -289,6 +291,7 @@ def launch(
stream_logs: bool = True,
backend: Optional[backends.Backend] = None,
optimize_target: OptimizeTarget = OptimizeTarget.COST,
detach_setup: bool = False,
detach_run: bool = False,
no_setup: bool = False,
) -> None:
Expand Down Expand Up @@ -328,7 +331,8 @@ def launch(
(CloudVMRayBackend).
optimize_target: target to optimize for. Choices: OptimizeTarget.COST,
OptimizeTarget.TIME.
detach_run: If True, run setup first (blocking), then detach from the
detach_setup: If True, run setup asynchronously.
detach_run: If True, run setup first, then detach from the
job's execution.
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
no_setup: if True, do not re-run setup commands.

Expand All @@ -355,6 +359,7 @@ def launch(
retry_until_up=retry_until_up,
optimize_target=optimize_target,
cluster_name=cluster_name,
detach_setup=detach_setup,
detach_run=detach_run,
idle_minutes_to_autostop=idle_minutes_to_autostop,
no_setup=no_setup,
Expand Down
Loading