Expanded Job Queue (#1636)
* working expanded queue

* added event

* updated testing and bug fixes

* fixed missed space

* formatting fix

* fixed setting up status + enabled spot

* format

* fixed setting up test

* addressed comments

* change stop lock range

* error message for needing update

* removed wandb.py

* added skylet restart

* updates

* addressed comments

* formatting + minor comment addressing

* more comments addressed

* Fix rich status

* fixed stalling issue

* format

* small test bug

* fixed skylet launch issue

* formatting + forgotten file

* more formatting + remove check

* addressed comments and removed pkill usage

* schedule after setup too

* Update sky/backends/cloud_vm_ray_backend.py

Co-authored-by: Zhanghao Wu <[email protected]>

* addressed more comments

* formatting

* Address comments

* faster job scheduling

* format

* Fix cancellation logic

* Don't schedule a job after reboot

* add comment

* revert changes in test

* Add test for cancelling pending jobs

* Make update job status more readable

* schedule more frequently for job cancelling

---------

Co-authored-by: Zhanghao Wu <[email protected]>
mraheja and Michaelvll authored Jun 6, 2023
1 parent 03e097d commit f431fc6
Showing 8 changed files with 369 additions and 123 deletions.
173 changes: 109 additions & 64 deletions sky/backends/cloud_vm_ray_backend.py
@@ -119,6 +119,9 @@
pathlib.Path(sky.__file__).resolve().parent / 'backends' /
'monkey_patches' / 'monkey_patch_ray_up.py')

# Restart skylet when the version does not match to keep the skylet up-to-date.
_MAYBE_SKYLET_RESTART_CMD = 'python3 -m sky.skylet.attempt_skylet'


def _get_cluster_config_template(cloud):
cloud_to_template = {
@@ -198,13 +201,11 @@ def __init__(self):
def add_prologue(self,
job_id: int,
spot_task: Optional['task_lib.Task'] = None,
setup_cmd: Optional[str] = None,
envs: Optional[Dict[str, str]] = None,
setup_log_path: Optional[str] = None,
is_local: bool = False) -> None:
assert not self._has_prologue, 'add_prologue() called twice?'
self._has_prologue = True
self.job_id = job_id
self.is_local = is_local
# Should use 'auto' or 'ray://<internal_head_ip>:10001' rather than
# 'ray://localhost:10001', or 'ray://127.0.0.1:10001', for public cloud.
# Otherwise, ray will fail to get the placement group because of a bug
@@ -260,7 +261,6 @@ def add_prologue(self,
inspect.getsource(log_lib.add_ray_env_vars),
inspect.getsource(log_lib.run_bash_command_with_log),
'run_bash_command_with_log = ray.remote(run_bash_command_with_log)',
f'setup_cmd = {setup_cmd!r}',
]
# Currently, the codegen program is/can only be submitted to the head
# node, due to using job_lib for updating job statuses, and using
@@ -272,46 +272,6 @@
if hasattr(autostop_lib, 'set_last_active_time_to_now'):
autostop_lib.set_last_active_time_to_now()
"""))
if setup_cmd is not None:
self._code += [
textwrap.dedent(f"""\
_SETUP_CPUS = 0.0001
# The setup command will be run as a ray task with num_cpus=_SETUP_CPUS as the
# requirement; this means Ray will set CUDA_VISIBLE_DEVICES to an empty string.
# We unset it so that user setup command may properly use this env var.
setup_cmd = 'unset CUDA_VISIBLE_DEVICES; ' + setup_cmd
job_lib.set_status({job_id!r}, job_lib.JobStatus.SETTING_UP)
print({_CTRL_C_TIP_MESSAGE!r}, file=sys.stderr, flush=True)
total_num_nodes = len(ray.nodes())
setup_bundles = [{{"CPU": _SETUP_CPUS}} for _ in range(total_num_nodes)]
setup_pg = ray.util.placement_group(setup_bundles, strategy='STRICT_SPREAD')
ray.get(setup_pg.ready())
setup_workers = [run_bash_command_with_log \\
.options(name='setup', num_cpus=_SETUP_CPUS, placement_group=setup_pg, placement_group_bundle_index=i) \\
.remote(
setup_cmd,
os.path.expanduser({setup_log_path!r}),
getpass.getuser(),
job_id={self.job_id},
env_vars={envs!r},
stream_logs=True,
with_ray=True,
use_sudo={is_local},
) for i in range(total_num_nodes)]
setup_returncodes = ray.get(setup_workers)
if sum(setup_returncodes) != 0:
job_lib.set_status({self.job_id!r}, job_lib.JobStatus.FAILED_SETUP)
# This waits for all streaming logs to finish.
time.sleep(1)
print('ERROR: {colorama.Fore.RED}Job {self.job_id}\\'s setup failed with '
'return code list:{colorama.Style.RESET_ALL}',
setup_returncodes,
file=sys.stderr,
flush=True)
# Need this to set the job status in ray job to be FAILED.
sys.exit(1)
""")
]
self._code += [
f'job_lib.set_status({job_id!r}, job_lib.JobStatus.PENDING)',
]
Expand All @@ -324,20 +284,24 @@ def add_prologue(self,
f'{job_id}, {spot_task.name!r}, {resources_str!r})',
]

def add_gang_scheduling_placement_group(
def add_gang_scheduling_placement_group_and_setup(
self,
num_nodes: int,
accelerator_dict: Optional[Dict[str, float]],
stable_cluster_internal_ips: List[str],
setup_cmd: Optional[str] = None,
setup_log_path: Optional[str] = None,
envs: Optional[Dict[str, str]] = None,
) -> None:
"""Create the gang scheduling placement group for a Task.
stable_cluster_internal_ips is used to ensure that the SKY_NODE_RANK
environment variable is assigned in a deterministic order whenever a new
task is added.
"""
assert self._has_prologue, ('Call add_prologue() before '
'add_gang_scheduling_placement_group().')
assert self._has_prologue, (
'Call add_prologue() before '
'add_gang_scheduling_placement_group_and_setup().')
self._has_gang_scheduling = True
self._num_nodes = num_nodes

@@ -370,7 +334,7 @@
plural = 's' if {num_nodes} > 1 else ''
node_str = f'{num_nodes} node{{plural}}'
message = '' if setup_cmd is not None else {_CTRL_C_TIP_MESSAGE!r} + '\\n'
message = {_CTRL_C_TIP_MESSAGE!r} + '\\n'
message += f'INFO: Waiting for task resources on {{node_str}}. This will block if the cluster is full.'
print(message,
file=sys.stderr,
@@ -382,10 +346,62 @@
print('INFO: All task resources reserved.',
file=sys.stderr,
flush=True)
job_lib.set_job_started({self.job_id!r})
""")
]

job_id = self.job_id
if setup_cmd is not None:
self._code += [
textwrap.dedent(f"""\
setup_cmd = {setup_cmd!r}
_SETUP_CPUS = 0.0001
# The setup command will be run as a ray task with num_cpus=_SETUP_CPUS as the
# requirement; this means Ray will set CUDA_VISIBLE_DEVICES to an empty string.
# We unset it so that user setup command may properly use this env var.
setup_cmd = 'unset CUDA_VISIBLE_DEVICES; ' + setup_cmd
job_lib.set_status({job_id!r}, job_lib.JobStatus.SETTING_UP)
# The schedule_step should be called after the job status is set to non-PENDING,
# otherwise, the scheduler will think the current job is not submitted yet, and
# skip the scheduling step.
job_lib.scheduler.schedule_step()
total_num_nodes = len(ray.nodes())
setup_bundles = [{{"CPU": _SETUP_CPUS}} for _ in range(total_num_nodes)]
setup_pg = ray.util.placement_group(setup_bundles, strategy='STRICT_SPREAD')
setup_workers = [run_bash_command_with_log \\
.options(name='setup', num_cpus=_SETUP_CPUS, placement_group=setup_pg, placement_group_bundle_index=i) \\
.remote(
setup_cmd,
os.path.expanduser({setup_log_path!r}),
getpass.getuser(),
job_id={self.job_id},
env_vars={envs!r},
stream_logs=True,
with_ray=True,
use_sudo={self.is_local},
) for i in range(total_num_nodes)]
setup_returncodes = ray.get(setup_workers)
if sum(setup_returncodes) != 0:
job_lib.set_status({self.job_id!r}, job_lib.JobStatus.FAILED_SETUP)
# This waits for all streaming logs to finish.
time.sleep(1)
print('ERROR: {colorama.Fore.RED}Job {self.job_id}\\'s setup failed with '
'return code list:{colorama.Style.RESET_ALL}',
setup_returncodes,
file=sys.stderr,
flush=True)
# Need this to set the job status in ray job to be FAILED.
sys.exit(1)
""")
]

self._code.append(f'job_lib.set_job_started({self.job_id!r})')
if setup_cmd is None:
# Need to call schedule_step() to make sure the scheduler
# schedules the next pending job.
self._code.append('job_lib.scheduler.schedule_step()')

# Export IP and node rank to the environment variables.
self._code += [
textwrap.dedent(f"""\
@@ -414,7 +430,7 @@ def register_run_fn(self, run_fn: str, run_fn_name: str) -> None:
run_fn: The run function to be run on the remote cluster.
"""
assert self._has_gang_scheduling, (
'Call add_gang_scheduling_placement_group() '
'Call add_gang_scheduling_placement_group_and_setup() '
'before register_run_fn().')
assert not self._has_register_run_fn, (
'register_run_fn() called twice?')
@@ -436,7 +452,8 @@ def add_ray_task(self,
use_sudo: bool = False) -> None:
"""Generates code for a ray remote task that runs a bash command."""
assert self._has_gang_scheduling, (
'Call add_gang_scheduling_placement_group() before add_ray_task().')
'Call add_gang_scheduling_placement_group_and_setup() before '
'add_ray_task().')
assert (not self._has_register_run_fn or
bash_script is None), ('bash_script should '
'be None when run_fn is registered.')
@@ -549,7 +566,8 @@ def add_epilogue(self) -> None:
if sum(returncodes) != 0:
job_lib.set_status({self.job_id!r}, job_lib.JobStatus.FAILED)
# This waits for all streaming logs to finish.
time.sleep(1)
job_lib.scheduler.schedule_step()
time.sleep(0.5)
print('ERROR: {colorama.Fore.RED}Job {self.job_id} failed with '
'return code list:{colorama.Style.RESET_ALL}',
returncodes,
Expand All @@ -562,7 +580,8 @@ def add_epilogue(self) -> None:
sys.stderr.flush()
job_lib.set_status({self.job_id!r}, job_lib.JobStatus.SUCCEEDED)
# This waits for all streaming logs to finish.
time.sleep(1)
job_lib.scheduler.schedule_step()
time.sleep(0.5)
""")
]

@@ -2495,6 +2514,14 @@ def _update_after_cluster_provisioned(
usage_lib.messages.usage.update_final_cluster_status(
global_user_state.ClusterStatus.UP)

# For backward compatibility and robustness of skylet, it is restarted here.
with log_utils.safe_rich_status('Updating remote skylet'):
self.run_on_head(
handle,
_MAYBE_SKYLET_RESTART_CMD,
use_cached_head_ip=False,
)

# Update job queue to avoid stale jobs (when restarted), before
# setting the cluster to be ready.
if prev_cluster_status == global_user_state.ClusterStatus.INIT:
@@ -2755,15 +2782,30 @@ def _exec_code_on_head(
else:
job_submit_cmd = (
'RAY_DASHBOARD_PORT=$(python -c "from sky.skylet import job_lib; print(job_lib.get_job_submission_port())" 2> /dev/null || echo 8265);' # pylint: disable=line-too-long
f'{cd} && mkdir -p {remote_log_dir} && ray job submit '
f'{cd} && ray job submit '
'--address=http://127.0.0.1:$RAY_DASHBOARD_PORT '
f'--submission-id {ray_job_id} --no-wait '
f'"{executable} -u {script_path} > {remote_log_path} 2>&1"')

mkdir_code = (f'{cd} && mkdir -p {remote_log_dir} && '
f'touch {remote_log_path}')
code = job_lib.JobLibCodeGen.queue_job(job_id, job_submit_cmd)
job_submit_cmd = mkdir_code + ' && ' + code

returncode, stdout, stderr = self.run_on_head(handle,
job_submit_cmd,
stream_logs=False,
require_outputs=True)

if 'has no attribute' in stdout:
# Happens when someone calls `sky exec` but the remote runtime is outdated,
# necessitating a call to `sky launch`.
with ux_utils.print_exception_no_traceback():
raise RuntimeError(
f'{colorama.Fore.RED}SkyPilot runtime is stale on the '
'remote cluster. To update, run: sky launch -c '
f'{handle.cluster_name}{colorama.Style.RESET_ALL}')

subprocess_utils.handle_returncode(returncode,
job_submit_cmd,
f'Failed to submit job {job_id}.',
@@ -3864,12 +3906,15 @@ def _execute_task_one_node(self, handle: CloudVmRayResourceHandle,
is_local = isinstance(handle.launched_resources.cloud, clouds.Local)
codegen.add_prologue(job_id,
spot_task=task.spot_task,
setup_cmd=self._setup_cmd,
envs=task.envs,
setup_log_path=os.path.join(log_dir, 'setup.log'),
is_local=is_local)
codegen.add_gang_scheduling_placement_group(
1, accelerator_dict, stable_cluster_internal_ips=internal_ips)
codegen.add_gang_scheduling_placement_group_and_setup(
1,
accelerator_dict,
stable_cluster_internal_ips=internal_ips,
setup_cmd=self._setup_cmd,
setup_log_path=os.path.join(log_dir, 'setup.log'),
envs=task.envs,
)

if callable(task.run):
run_fn_code = textwrap.dedent(inspect.getsource(task.run))
@@ -3929,14 +3974,14 @@ def _execute_task_n_nodes(self, handle: CloudVmRayResourceHandle,
is_local = isinstance(handle.launched_resources.cloud, clouds.Local)
codegen.add_prologue(job_id,
spot_task=task.spot_task,
setup_cmd=self._setup_cmd,
envs=task.envs,
setup_log_path=os.path.join(log_dir, 'setup.log'),
is_local=is_local)
codegen.add_gang_scheduling_placement_group(
codegen.add_gang_scheduling_placement_group_and_setup(
num_actual_nodes,
accelerator_dict,
stable_cluster_internal_ips=internal_ips)
stable_cluster_internal_ips=internal_ips,
setup_cmd=self._setup_cmd,
setup_log_path=os.path.join(log_dir, 'setup.log'),
envs=task.envs)

if callable(task.run):
run_fn_code = textwrap.dedent(inspect.getsource(task.run))
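
The central change in this file is that the user's setup command now runs inside the gang-scheduling step (add_gang_scheduling_placement_group_and_setup) rather than in the prologue, and the generated driver program calls job_lib.scheduler.schedule_step() whenever a job's status leaves PENDING: after entering SETTING_UP, after set_job_started, and again in the epilogue. The sketch below is a self-contained toy model of why that ordering matters; it is not SkyPilot's actual job_lib, and the ToyScheduler class and its policy are illustrative assumptions only.

"""Toy model of the expanded job queue (illustrative only, not SkyPilot's
job_lib): jobs are submitted as PENDING and a FIFO scheduler launches them.
schedule_step() should be called after the current job's status has left
PENDING; otherwise this simple policy would try to launch the same job again."""
import enum


class JobStatus(enum.Enum):
    PENDING = 'PENDING'
    SETTING_UP = 'SETTING_UP'
    RUNNING = 'RUNNING'
    SUCCEEDED = 'SUCCEEDED'


class ToyScheduler:
    """FIFO scheduler: starts the oldest PENDING job when nothing is active."""

    def __init__(self):
        self.jobs = {}  # job_id -> JobStatus, insertion-ordered

    def submit(self, job_id: int) -> None:
        self.jobs[job_id] = JobStatus.PENDING

    def set_status(self, job_id: int, status: JobStatus) -> None:
        self.jobs[job_id] = status

    def schedule_step(self) -> None:
        active = (JobStatus.SETTING_UP, JobStatus.RUNNING)
        if any(s in active for s in self.jobs.values()):
            return  # something is already running; wait for the next call
        for job_id, status in self.jobs.items():
            if status is JobStatus.PENDING:
                print(f'Launching job {job_id}')
                self.jobs[job_id] = JobStatus.SETTING_UP
                return


scheduler = ToyScheduler()
scheduler.submit(1)
scheduler.submit(2)
scheduler.schedule_step()                     # launches job 1
scheduler.set_status(1, JobStatus.RUNNING)    # the driver marks itself non-PENDING
scheduler.set_status(1, JobStatus.SUCCEEDED)  # ... and later finishes
scheduler.schedule_step()                     # now launches job 2
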
1 change: 1 addition & 0 deletions sky/execution.py
@@ -261,6 +261,7 @@ def _execute(
task.sync_storage_mounts()

try:

if Stage.PROVISION in stages:
if handle is None:
handle = backend.provision(task,
41 changes: 41 additions & 0 deletions sky/skylet/attempt_skylet.py
@@ -0,0 +1,41 @@
"""Restarts skylet if version does not match"""

import os
import subprocess

from sky.skylet import constants

VERSION_FILE = os.path.expanduser(constants.SKYLET_VERSION_FILE)


def restart_skylet():
# Kills old skylet if it is running
subprocess.run(
'ps aux | grep "sky.skylet.skylet" | grep "python3 -m"'
'| awk \'{print $2}\' | xargs kill >> ~/.sky/skylet.log 2>&1',
shell=True,
check=False)
subprocess.run(
'nohup python3 -m sky.skylet.skylet'
' >> ~/.sky/skylet.log 2>&1 &',
shell=True,
check=True)
with open(VERSION_FILE, 'w') as v_f:
v_f.write(constants.SKYLET_VERSION)


proc = subprocess.run(
'ps aux | grep -v "grep" | grep "sky.skylet.skylet" | grep "python3 -m"',
shell=True,
check=False)

running = (proc.returncode == 0)

version_match = False
if os.path.exists(VERSION_FILE):
with open(VERSION_FILE) as f:
if f.read().strip() == constants.SKYLET_VERSION:
version_match = True

if not running or not version_match:
restart_skylet()
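
attempt_skylet is the module run by _MAYBE_SKYLET_RESTART_CMD on the head node after provisioning: it restarts skylet when no skylet process is found or when the version recorded in ~/.sky/skylet_version differs from constants.SKYLET_VERSION. Below is a minimal restatement of that decision as a pure function, for illustration only; needs_skylet_restart is a hypothetical helper, not part of SkyPilot.

"""Hypothetical helper restating attempt_skylet's restart condition."""
from typing import Optional


def needs_skylet_restart(skylet_running: bool,
                         recorded_version: Optional[str],
                         current_version: str) -> bool:
    # Restart when skylet is not running, when no version was ever recorded
    # (clusters launched before this change have no version file), or when
    # the recorded version is stale.
    if not skylet_running:
        return True
    return recorded_version != current_version


# Spot checks of the decision table.
assert needs_skylet_restart(False, '1', '1')     # dead skylet -> restart
assert needs_skylet_restart(True, None, '1')     # no version file -> restart
assert needs_skylet_restart(True, '1', '2')      # version mismatch -> restart
assert not needs_skylet_restart(True, '1', '1')  # up to date -> leave running
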
3 changes: 3 additions & 0 deletions sky/skylet/constants.py
@@ -26,3 +26,6 @@
'command to initialize it locally: sky launch -c {cluster} \'\'')

JOB_ID_ENV_VAR = 'SKYPILOT_JOB_ID'

SKYLET_VERSION = '1'
SKYLET_VERSION_FILE = '~/.sky/skylet_version'
15 changes: 3 additions & 12 deletions sky/skylet/events.py
@@ -1,5 +1,4 @@
"""skylet events"""
import getpass
import math
import os
import re
@@ -53,20 +52,12 @@ def _run(self):
raise NotImplementedError


class JobUpdateEvent(SkyletEvent):
"""Skylet event for updating job status."""
class JobSchedulerEvent(SkyletEvent):
"""Skylet event for scheduling jobs"""
EVENT_INTERVAL_SECONDS = 300

# Only update status of the jobs after this many seconds of job submission,
# to avoid race condition with `ray job` to make sure it job has been
# correctly updated.
# TODO(zhwu): This number should be tuned based on heuristics.
_SUBMITTED_GAP_SECONDS = 60

def _run(self):
job_owner = getpass.getuser()
job_lib.update_status(job_owner,
submitted_gap_sec=self._SUBMITTED_GAP_SECONDS)
job_lib.scheduler.schedule_step()


class SpotJobUpdateEvent(SkyletEvent):
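
The old JobUpdateEvent, which called job_lib.update_status on a timer, is replaced by JobSchedulerEvent, which kicks job_lib.scheduler.schedule_step() every 300 seconds as a safety net on top of the on-demand schedule_step() calls in the generated driver code above. The sketch below models that periodic-event pattern with a simplified runner loop; the real SkyletEvent base class and skylet daemon are not shown in this diff, so the loop here is an assumption for illustration.

"""Simplified model of a skylet-style periodic event (illustrative runner)."""
import time


class PeriodicEvent:
    """Base class: subclasses define an interval and a _run() hook."""
    EVENT_INTERVAL_SECONDS = 60

    def run_forever(self) -> None:
        while True:
            self._run()
            time.sleep(self.EVENT_INTERVAL_SECONDS)

    def _run(self) -> None:
        raise NotImplementedError


class JobSchedulerEvent(PeriodicEvent):
    """Periodically kicks the job scheduler so no PENDING job is left stuck."""
    EVENT_INTERVAL_SECONDS = 300

    def _run(self) -> None:
        # In SkyPilot this would call job_lib.scheduler.schedule_step().
        print('schedule_step(): launch any runnable PENDING jobs')


if __name__ == '__main__':
    JobSchedulerEvent().run_forever()  # blocks; Ctrl-C to stop
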