From f431fc6fa318cf4780ef009298765141350d0244 Mon Sep 17 00:00:00 2001 From: Mehul Raheja Date: Tue, 6 Jun 2023 03:07:53 -0700 Subject: [PATCH] Expanded Job Queue (#1636) * working expanded queue * added event * updated testing and bug fixes * fixed missed space * formatting fix * fixed setting up status + enabled spot * format * fixed setting up test * addressed comments * change stop lock range * error message for needing update * removed wandb.py * added skylet restart * updates * addressed comments * formatting + minor comment addressing * more comments addressed * Fix rich status * fixed stalling issue * format * small test bug * fixed skylet launch issue * formatting + forgetten file * more formatting + remove check * addressed comments and removed pkill usage * schedule after setup too * Update sky/backends/cloud_vm_ray_backend.py Co-authored-by: Zhanghao Wu * addressed more comments * formatting * Address comments * faster job scheduling * format * Fix cancellation logic * Don't schedule a job after reboot * add comment * revert changes in test * Add test for cancelling pending jobs * Make update job status more readable * schedule more frequently for job cancelling --------- Co-authored-by: Zhanghao Wu --- sky/backends/cloud_vm_ray_backend.py | 173 ++++++++++++++--------- sky/execution.py | 1 + sky/skylet/attempt_skylet.py | 41 ++++++ sky/skylet/constants.py | 3 + sky/skylet/events.py | 15 +- sky/skylet/job_lib.py | 197 +++++++++++++++++++++------ sky/skylet/skylet.py | 7 +- tests/test_smoke.py | 55 +++++++- 8 files changed, 369 insertions(+), 123 deletions(-) create mode 100644 sky/skylet/attempt_skylet.py diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index a274c9d9b5a..2073a2f453f 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -119,6 +119,9 @@ pathlib.Path(sky.__file__).resolve().parent / 'backends' / 'monkey_patches' / 'monkey_patch_ray_up.py') +# Restart skylet when the version does not match to keep the skylet up-to-date. +_MAYBE_SKYLET_RESTART_CMD = 'python3 -m sky.skylet.attempt_skylet' + def _get_cluster_config_template(cloud): cloud_to_template = { @@ -198,13 +201,11 @@ def __init__(self): def add_prologue(self, job_id: int, spot_task: Optional['task_lib.Task'] = None, - setup_cmd: Optional[str] = None, - envs: Optional[Dict[str, str]] = None, - setup_log_path: Optional[str] = None, is_local: bool = False) -> None: assert not self._has_prologue, 'add_prologue() called twice?' self._has_prologue = True self.job_id = job_id + self.is_local = is_local # Should use 'auto' or 'ray://:10001' rather than # 'ray://localhost:10001', or 'ray://127.0.0.1:10001', for public cloud. # Otherwise, ray will fail to get the placement group because of a bug @@ -260,7 +261,6 @@ def add_prologue(self, inspect.getsource(log_lib.add_ray_env_vars), inspect.getsource(log_lib.run_bash_command_with_log), 'run_bash_command_with_log = ray.remote(run_bash_command_with_log)', - f'setup_cmd = {setup_cmd!r}', ] # Currently, the codegen program is/can only be submitted to the head # node, due to using job_lib for updating job statuses, and using @@ -272,46 +272,6 @@ def add_prologue(self, if hasattr(autostop_lib, 'set_last_active_time_to_now'): autostop_lib.set_last_active_time_to_now() """)) - if setup_cmd is not None: - self._code += [ - textwrap.dedent(f"""\ - _SETUP_CPUS = 0.0001 - # The setup command will be run as a ray task with num_cpus=_SETUP_CPUS as the - # requirement; this means Ray will set CUDA_VISIBLE_DEVICES to an empty string. - # We unset it so that user setup command may properly use this env var. - setup_cmd = 'unset CUDA_VISIBLE_DEVICES; ' + setup_cmd - job_lib.set_status({job_id!r}, job_lib.JobStatus.SETTING_UP) - print({_CTRL_C_TIP_MESSAGE!r}, file=sys.stderr, flush=True) - total_num_nodes = len(ray.nodes()) - setup_bundles = [{{"CPU": _SETUP_CPUS}} for _ in range(total_num_nodes)] - setup_pg = ray.util.placement_group(setup_bundles, strategy='STRICT_SPREAD') - ray.get(setup_pg.ready()) - setup_workers = [run_bash_command_with_log \\ - .options(name='setup', num_cpus=_SETUP_CPUS, placement_group=setup_pg, placement_group_bundle_index=i) \\ - .remote( - setup_cmd, - os.path.expanduser({setup_log_path!r}), - getpass.getuser(), - job_id={self.job_id}, - env_vars={envs!r}, - stream_logs=True, - with_ray=True, - use_sudo={is_local}, - ) for i in range(total_num_nodes)] - setup_returncodes = ray.get(setup_workers) - if sum(setup_returncodes) != 0: - job_lib.set_status({self.job_id!r}, job_lib.JobStatus.FAILED_SETUP) - # This waits for all streaming logs to finish. - time.sleep(1) - print('ERROR: {colorama.Fore.RED}Job {self.job_id}\\'s setup failed with ' - 'return code list:{colorama.Style.RESET_ALL}', - setup_returncodes, - file=sys.stderr, - flush=True) - # Need this to set the job status in ray job to be FAILED. - sys.exit(1) - """) - ] self._code += [ f'job_lib.set_status({job_id!r}, job_lib.JobStatus.PENDING)', ] @@ -324,11 +284,14 @@ def add_prologue(self, f'{job_id}, {spot_task.name!r}, {resources_str!r})', ] - def add_gang_scheduling_placement_group( + def add_gang_scheduling_placement_group_and_setup( self, num_nodes: int, accelerator_dict: Optional[Dict[str, float]], stable_cluster_internal_ips: List[str], + setup_cmd: Optional[str] = None, + setup_log_path: Optional[str] = None, + envs: Optional[Dict[str, str]] = None, ) -> None: """Create the gang scheduling placement group for a Task. @@ -336,8 +299,9 @@ def add_gang_scheduling_placement_group( variable is assigned in a deterministic order whenever a new task is added. """ - assert self._has_prologue, ('Call add_prologue() before ' - 'add_gang_scheduling_placement_group().') + assert self._has_prologue, ( + 'Call add_prologue() before ' + 'add_gang_scheduling_placement_group_and_setup().') self._has_gang_scheduling = True self._num_nodes = num_nodes @@ -370,7 +334,7 @@ def add_gang_scheduling_placement_group( plural = 's' if {num_nodes} > 1 else '' node_str = f'{num_nodes} node{{plural}}' - message = '' if setup_cmd is not None else {_CTRL_C_TIP_MESSAGE!r} + '\\n' + message = {_CTRL_C_TIP_MESSAGE!r} + '\\n' message += f'INFO: Waiting for task resources on {{node_str}}. This will block if the cluster is full.' print(message, file=sys.stderr, @@ -382,10 +346,62 @@ def add_gang_scheduling_placement_group( print('INFO: All task resources reserved.', file=sys.stderr, flush=True) - job_lib.set_job_started({self.job_id!r}) """) ] + job_id = self.job_id + if setup_cmd is not None: + self._code += [ + textwrap.dedent(f"""\ + setup_cmd = {setup_cmd!r} + _SETUP_CPUS = 0.0001 + # The setup command will be run as a ray task with num_cpus=_SETUP_CPUS as the + # requirement; this means Ray will set CUDA_VISIBLE_DEVICES to an empty string. + # We unset it so that user setup command may properly use this env var. + setup_cmd = 'unset CUDA_VISIBLE_DEVICES; ' + setup_cmd + job_lib.set_status({job_id!r}, job_lib.JobStatus.SETTING_UP) + + # The schedule_step should be called after the job status is set to non-PENDING, + # otherwise, the scheduler will think the current job is not submitted yet, and + # skip the scheduling step. + job_lib.scheduler.schedule_step() + + total_num_nodes = len(ray.nodes()) + setup_bundles = [{{"CPU": _SETUP_CPUS}} for _ in range(total_num_nodes)] + setup_pg = ray.util.placement_group(setup_bundles, strategy='STRICT_SPREAD') + setup_workers = [run_bash_command_with_log \\ + .options(name='setup', num_cpus=_SETUP_CPUS, placement_group=setup_pg, placement_group_bundle_index=i) \\ + .remote( + setup_cmd, + os.path.expanduser({setup_log_path!r}), + getpass.getuser(), + job_id={self.job_id}, + env_vars={envs!r}, + stream_logs=True, + with_ray=True, + use_sudo={self.is_local}, + ) for i in range(total_num_nodes)] + setup_returncodes = ray.get(setup_workers) + if sum(setup_returncodes) != 0: + job_lib.set_status({self.job_id!r}, job_lib.JobStatus.FAILED_SETUP) + # This waits for all streaming logs to finish. + time.sleep(1) + print('ERROR: {colorama.Fore.RED}Job {self.job_id}\\'s setup failed with ' + 'return code list:{colorama.Style.RESET_ALL}', + setup_returncodes, + file=sys.stderr, + flush=True) + # Need this to set the job status in ray job to be FAILED. + sys.exit(1) + """) + ] + + self._code.append(f'job_lib.set_job_started({self.job_id!r})') + if setup_cmd is None: + # Need to call schedule_step() to make sure the scheduler + # schedule the next pending job. + self._code.append('job_lib.scheduler.schedule_step()') + # Export IP and node rank to the environment variables. self._code += [ textwrap.dedent(f"""\ @@ -414,7 +430,7 @@ def register_run_fn(self, run_fn: str, run_fn_name: str) -> None: run_fn: The run function to be run on the remote cluster. """ assert self._has_gang_scheduling, ( - 'Call add_gang_scheduling_placement_group() ' + 'Call add_gang_scheduling_placement_group_and_setup() ' 'before register_run_fn().') assert not self._has_register_run_fn, ( 'register_run_fn() called twice?') @@ -436,7 +452,8 @@ def add_ray_task(self, use_sudo: bool = False) -> None: """Generates code for a ray remote task that runs a bash command.""" assert self._has_gang_scheduling, ( - 'Call add_gang_scheduling_placement_group() before add_ray_task().') + 'Call add_gang_scheduling_placement_group_and_setup() before ' + 'add_ray_task().') assert (not self._has_register_run_fn or bash_script is None), ('bash_script should ' 'be None when run_fn is registered.') @@ -549,7 +566,8 @@ def add_epilogue(self) -> None: if sum(returncodes) != 0: job_lib.set_status({self.job_id!r}, job_lib.JobStatus.FAILED) # This waits for all streaming logs to finish. - time.sleep(1) + job_lib.scheduler.schedule_step() + time.sleep(0.5) print('ERROR: {colorama.Fore.RED}Job {self.job_id} failed with ' 'return code list:{colorama.Style.RESET_ALL}', returncodes, @@ -562,7 +580,8 @@ def add_epilogue(self) -> None: sys.stderr.flush() job_lib.set_status({self.job_id!r}, job_lib.JobStatus.SUCCEEDED) # This waits for all streaming logs to finish. - time.sleep(1) + job_lib.scheduler.schedule_step() + time.sleep(0.5) """) ] @@ -2495,6 +2514,14 @@ def _update_after_cluster_provisioned( usage_lib.messages.usage.update_final_cluster_status( global_user_state.ClusterStatus.UP) + # For backward compatability and robustness of skylet, it is restarted + with log_utils.safe_rich_status('Updating remote skylet'): + self.run_on_head( + handle, + _MAYBE_SKYLET_RESTART_CMD, + use_cached_head_ip=False, + ) + # Update job queue to avoid stale jobs (when restarted), before # setting the cluster to be ready. if prev_cluster_status == global_user_state.ClusterStatus.INIT: @@ -2755,15 +2782,30 @@ def _exec_code_on_head( else: job_submit_cmd = ( 'RAY_DASHBOARD_PORT=$(python -c "from sky.skylet import job_lib; print(job_lib.get_job_submission_port())" 2> /dev/null || echo 8265);' # pylint: disable=line-too-long - f'{cd} && mkdir -p {remote_log_dir} && ray job submit ' + f'{cd} && ray job submit ' '--address=http://127.0.0.1:$RAY_DASHBOARD_PORT ' f'--submission-id {ray_job_id} --no-wait ' f'"{executable} -u {script_path} > {remote_log_path} 2>&1"') + mkdir_code = (f'{cd} && mkdir -p {remote_log_dir} && ' + f'touch {remote_log_path}') + code = job_lib.JobLibCodeGen.queue_job(job_id, job_submit_cmd) + job_submit_cmd = mkdir_code + ' && ' + code + returncode, stdout, stderr = self.run_on_head(handle, job_submit_cmd, stream_logs=False, require_outputs=True) + + if 'has no attribute' in stdout: + # Happens when someone calls `sky exec` but remote is outdated + # necessicating calling `sky launch` + with ux_utils.print_exception_no_traceback(): + raise RuntimeError( + f'{colorama.Fore.RED}SkyPilot runtime is stale on the ' + 'remote cluster. To update, run: sky launch -c ' + f'{handle.cluster_name}{colorama.Style.RESET_ALL}') + subprocess_utils.handle_returncode(returncode, job_submit_cmd, f'Failed to submit job {job_id}.', @@ -3864,12 +3906,15 @@ def _execute_task_one_node(self, handle: CloudVmRayResourceHandle, is_local = isinstance(handle.launched_resources.cloud, clouds.Local) codegen.add_prologue(job_id, spot_task=task.spot_task, - setup_cmd=self._setup_cmd, - envs=task.envs, - setup_log_path=os.path.join(log_dir, 'setup.log'), is_local=is_local) - codegen.add_gang_scheduling_placement_group( - 1, accelerator_dict, stable_cluster_internal_ips=internal_ips) + codegen.add_gang_scheduling_placement_group_and_setup( + 1, + accelerator_dict, + stable_cluster_internal_ips=internal_ips, + setup_cmd=self._setup_cmd, + setup_log_path=os.path.join(log_dir, 'setup.log'), + envs=task.envs, + ) if callable(task.run): run_fn_code = textwrap.dedent(inspect.getsource(task.run)) @@ -3929,14 +3974,14 @@ def _execute_task_n_nodes(self, handle: CloudVmRayResourceHandle, is_local = isinstance(handle.launched_resources.cloud, clouds.Local) codegen.add_prologue(job_id, spot_task=task.spot_task, - setup_cmd=self._setup_cmd, - envs=task.envs, - setup_log_path=os.path.join(log_dir, 'setup.log'), is_local=is_local) - codegen.add_gang_scheduling_placement_group( + codegen.add_gang_scheduling_placement_group_and_setup( num_actual_nodes, accelerator_dict, - stable_cluster_internal_ips=internal_ips) + stable_cluster_internal_ips=internal_ips, + setup_cmd=self._setup_cmd, + setup_log_path=os.path.join(log_dir, 'setup.log'), + envs=task.envs) if callable(task.run): run_fn_code = textwrap.dedent(inspect.getsource(task.run)) diff --git a/sky/execution.py b/sky/execution.py index be4bcbe5d22..362a66d4156 100644 --- a/sky/execution.py +++ b/sky/execution.py @@ -261,6 +261,7 @@ def _execute( task.sync_storage_mounts() try: + if Stage.PROVISION in stages: if handle is None: handle = backend.provision(task, diff --git a/sky/skylet/attempt_skylet.py b/sky/skylet/attempt_skylet.py new file mode 100644 index 00000000000..b526d0642f6 --- /dev/null +++ b/sky/skylet/attempt_skylet.py @@ -0,0 +1,41 @@ +"""Restarts skylet if version does not match""" + +import os +import subprocess + +from sky.skylet import constants + +VERSION_FILE = os.path.expanduser(constants.SKYLET_VERSION_FILE) + + +def restart_skylet(): + # Kills old skylet if it is running + subprocess.run( + 'ps aux | grep "sky.skylet.skylet" | grep "python3 -m"' + '| awk \'{print $2}\' | xargs kill >> ~/.sky/skylet.log 2>&1', + shell=True, + check=False) + subprocess.run( + 'nohup python3 -m sky.skylet.skylet' + ' >> ~/.sky/skylet.log 2>&1 &', + shell=True, + check=True) + with open(VERSION_FILE, 'w') as v_f: + v_f.write(constants.SKYLET_VERSION) + + +proc = subprocess.run( + 'ps aux | grep -v "grep" | grep "sky.skylet.skylet" | grep "python3 -m"', + shell=True, + check=False) + +running = (proc.returncode == 0) + +version_match = False +if os.path.exists(VERSION_FILE): + with open(VERSION_FILE) as f: + if f.read().strip() == constants.SKYLET_VERSION: + version_match = True + +if not running or not version_match: + restart_skylet() diff --git a/sky/skylet/constants.py b/sky/skylet/constants.py index 1d4e72dbec4..e260da766b9 100644 --- a/sky/skylet/constants.py +++ b/sky/skylet/constants.py @@ -26,3 +26,6 @@ 'command to initialize it locally: sky launch -c {cluster} \'\'') JOB_ID_ENV_VAR = 'SKYPILOT_JOB_ID' + +SKYLET_VERSION = '1' +SKYLET_VERSION_FILE = '~/.sky/skylet_version' diff --git a/sky/skylet/events.py b/sky/skylet/events.py index 36649c7a9fe..bf8bf7f9ebc 100644 --- a/sky/skylet/events.py +++ b/sky/skylet/events.py @@ -1,5 +1,4 @@ """skylet events""" -import getpass import math import os import re @@ -53,20 +52,12 @@ def _run(self): raise NotImplementedError -class JobUpdateEvent(SkyletEvent): - """Skylet event for updating job status.""" +class JobSchedulerEvent(SkyletEvent): + """Skylet event for scheduling jobs""" EVENT_INTERVAL_SECONDS = 300 - # Only update status of the jobs after this many seconds of job submission, - # to avoid race condition with `ray job` to make sure it job has been - # correctly updated. - # TODO(zhwu): This number should be tuned based on heuristics. - _SUBMITTED_GAP_SECONDS = 60 - def _run(self): - job_owner = getpass.getuser() - job_lib.update_status(job_owner, - submitted_gap_sec=self._SUBMITTED_GAP_SECONDS) + job_lib.scheduler.schedule_step() class SpotJobUpdateEvent(SkyletEvent): diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index 498862e5be2..dc12601c0fe 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -6,13 +6,16 @@ import json import os import pathlib +import psutil import shlex +import subprocess import time import typing -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple import colorama import filelock +import getpass from sky import sky_logging from sky.skylet import constants @@ -62,6 +65,13 @@ def create_table(cursor, conn): run_timestamp TEXT CANDIDATE KEY, start_at FLOAT DEFAULT -1)""") + cursor.execute("""CREATE TABLE IF NOT EXISTS pending_jobs( + job_id INTEGER, + run_cmd TEXT, + submit INTEGER, + created_time INTEGER + )""") + db_utils.add_column_to_table(cursor, conn, 'jobs', 'end_at', 'FLOAT') db_utils.add_column_to_table(cursor, conn, 'jobs', 'resources', 'TEXT') @@ -83,15 +93,15 @@ class JobStatus(enum.Enum): # In the 'jobs' table, the `submitted_at` column will be set to the current # time, when the job is firstly created (in the INIT state). INIT = 'INIT' + # The job is waiting for the required resources. (`ray job status` + # shows RUNNING as the generated ray program has started, but blocked + # by the placement constraints.) + PENDING = 'PENDING' # Running the user's setup script (only in effect if --detach-setup is # set). Our update_job_status() can temporarily (for a short period) set # the status to SETTING_UP, if the generated ray program has not set # the status to PENDING or RUNNING yet. SETTING_UP = 'SETTING_UP' - # The job is waiting for the required resources. (`ray job status` - # shows RUNNING as the generated ray program has started, but blocked - # by the placement constraints.) - PENDING = 'PENDING' # The job is running. # In the 'jobs' table, the `start_at` column will be set to the current # time, when the job is firstly transitioned to RUNNING. @@ -126,6 +136,77 @@ def colored_str(self): return f'{color}{self.value}{colorama.Style.RESET_ALL}' +# Only update status of the jobs after this many seconds of job submission, +# to avoid race condition with `ray job` to make sure it job has been +# correctly updated. +# TODO(zhwu): This number should be tuned based on heuristics. +_PENDING_SUBMIT_GRACE_PERIOD = 60 + +_PRE_RESOURCE_STATUSES = [JobStatus.PENDING] + + +class JobScheduler: + """Base class for job scheduler""" + + def queue(self, job_id: int, cmd: str) -> None: + _CURSOR.execute('INSERT INTO pending_jobs VALUES (?,?,?,?)', + (job_id, cmd, 0, int(time.time()))) + _CONN.commit() + set_status(job_id, JobStatus.PENDING) + self.schedule_step() + + def remove_job_no_lock(self, job_id: int) -> None: + _CURSOR.execute(f'DELETE FROM pending_jobs WHERE job_id={job_id!r}') + _CONN.commit() + + def _run_job(self, job_id: int, run_cmd: str): + _CURSOR.execute((f'UPDATE pending_jobs SET submit={int(time.time())} ' + f'WHERE job_id={job_id!r}')) + _CONN.commit() + subprocess.Popen(run_cmd, shell=True, stdout=subprocess.DEVNULL) + + def schedule_step(self) -> None: + job_owner = getpass.getuser() + jobs = self._get_jobs() + if len(jobs) > 0: + update_status(job_owner) + # TODO(zhwu, mraheja): One optimization can be allowing more than one + # job staying in the pending state after ray job submit, so that to be + # faster to schedule a large amount of jobs. + for job_id, run_cmd, submit, created_time in jobs: + with filelock.FileLock(_get_lock_path(job_id)): + status = get_status_no_lock(job_id) + if (status not in _PRE_RESOURCE_STATUSES or + created_time < psutil.boot_time()): + # Job doesn't exist, is running/cancelled, or created + # before the last reboot. + self.remove_job_no_lock(job_id) + continue + if submit: + # Next job waiting for resources + return + self._run_job(job_id, run_cmd) + return + + def _get_jobs(self) -> List[Tuple[int, str, int, int]]: + """Returns the metadata for jobs in the pending jobs table + + The information contains job_id, run command, submit time, + creation time. + """ + raise NotImplementedError + + +class FIFOScheduler(JobScheduler): + """First in first out job scheduler""" + + def _get_jobs(self) -> List[Tuple[int, str, int, int]]: + return list( + _CURSOR.execute('SELECT * FROM pending_jobs ORDER BY job_id')) + + +scheduler = FIFOScheduler() + _JOB_STATUS_TO_COLOR = { JobStatus.INIT: colorama.Fore.BLUE, JobStatus.SETTING_UP: colorama.Fore.BLUE, @@ -138,23 +219,24 @@ def colored_str(self): } _RAY_TO_JOB_STATUS_MAP = { - # These are intentionally set to one status before, because: + # These are intentionally set this way, because: # 1. when the ray status indicates the job is PENDING the generated - # python program should not be started yet, i.e. the job should be INIT. + # python program has been `ray job submit` from the job queue + # and is now PENDING # 2. when the ray status indicates the job is RUNNING the job can be in # setup or resources may not be allocated yet, i.e. the job should be - # SETTING_UP. - # For case 2, update_job_status() would compare this mapped SETTING_UP to + # PENDING. + # For case 2, update_job_status() would compare this mapped PENDING to # the status in our jobs DB and take the max. This is because the job's # generated ray program is the only place that can determine a job has # reserved resources and actually started running: it will set the - # status in the DB to RUNNING. + # status in the DB to SETTING_UP or RUNNING. # If there is no setup specified in the task, as soon as it is started # (ray's status becomes RUNNING), i.e. it will be very rare that the job # will be set to SETTING_UP by the update_job_status, as our generated # ray program will set the status to PENDING immediately. - 'PENDING': JobStatus.INIT, - 'RUNNING': JobStatus.SETTING_UP, + 'PENDING': JobStatus.PENDING, + 'RUNNING': JobStatus.PENDING, 'SUCCEEDED': JobStatus.SUCCEEDED, 'FAILED': JobStatus.FAILED, 'STOPPED': JobStatus.CANCELLED, @@ -368,9 +450,9 @@ def _get_records_from_rows(rows) -> List[Dict[str, Any]]: return records -def _get_jobs(username: Optional[str], - status_list: Optional[List[JobStatus]] = None, - submitted_gap_sec: int = 0) -> List[Dict[str, Any]]: +def _get_jobs( + username: Optional[str], + status_list: Optional[List[JobStatus]] = None) -> List[Dict[str, Any]]: if status_list is None: status_list = list(JobStatus) status_str_list = [status.value for status in status_list] @@ -379,18 +461,17 @@ def _get_jobs(username: Optional[str], f"""\ SELECT * FROM jobs WHERE status IN ({','.join(['?'] * len(status_list))}) - AND submitted_at <= (?) ORDER BY job_id DESC""", - (*status_str_list, time.time() - submitted_gap_sec), + (*status_str_list,), ) else: rows = _CURSOR.execute( f"""\ SELECT * FROM jobs WHERE status IN ({','.join(['?'] * len(status_list))}) - AND username=(?) AND submitted_at <= (?) + AND username=(?) ORDER BY job_id DESC""", - (*status_str_list, username, time.time() - submitted_gap_sec), + (*status_str_list, username), ) records = _get_records_from_rows(rows) @@ -409,6 +490,18 @@ def _get_jobs_by_ids(job_ids: List[int]) -> List[Dict[str, Any]]: return records +def _get_pending_jobs(): + rows = _CURSOR.execute( + 'SELECT job_id, created_time, submit FROM pending_jobs') + rows = list(rows) + return { + job_id: { + 'created_time': created_time, + 'submit': submit + } for job_id, created_time, submit in rows + } + + def update_job_status(job_owner: str, job_ids: List[int], silent: bool = False) -> List[JobStatus]: @@ -435,6 +528,7 @@ def update_job_status(job_owner: str, # which contains the job status (str) and submission_id (str). job_detail_lists: List['ray_pydantic.JobDetails'] = job_client.list_jobs() + pending_jobs = _get_pending_jobs() job_details = {} ray_job_ids_set = set(ray_job_ids) for job_detail in job_detail_lists: @@ -442,9 +536,27 @@ def update_job_status(job_owner: str, job_details[job_detail.submission_id] = job_detail job_statuses: List[Optional[JobStatus]] = [None] * len(ray_job_ids) for i, ray_job_id in enumerate(ray_job_ids): + job_id = job_ids[i] if ray_job_id in job_details: ray_status = job_details[ray_job_id].status job_statuses[i] = _RAY_TO_JOB_STATUS_MAP[ray_status] + if job_id in pending_jobs: + if pending_jobs[job_id]['created_time'] < psutil.boot_time(): + # The job is stale as it is created before the instance + # is booted, e.g. the instance is rebooted. + job_statuses[i] = JobStatus.FAILED + # Gives a 60 second grace period between job being submit from + # the pending table until appearing in ray jobs. + if (pending_jobs[job_id]['submit'] > 0 and + pending_jobs[job_id]['submit'] < + time.time() - _PENDING_SUBMIT_GRACE_PERIOD): + # For jobs submitted outside of the grace period, we will + # consider the ray job status. + continue + else: + # Reset the job status to PENDING even though it may not appear + # in the ray jobs, so that it will not be considered as stale. + job_statuses[i] = JobStatus.PENDING assert len(job_statuses) == len(job_ids), (job_statuses, job_ids) @@ -453,9 +565,6 @@ def update_job_status(job_owner: str, # Per-job status lock is required because between the job status # query and the job status update, the job status in the databse # can be modified by the generated ray program. - # TODO(mraheja): remove pylint disabling when filelock version - # updated - # pylint: disable=abstract-class-instantiated with filelock.FileLock(_get_lock_path(job_id)): original_status = get_status_no_lock(job_id) assert original_status is not None, (job_id, status) @@ -503,7 +612,7 @@ def fail_all_jobs_in_progress() -> None: _CONN.commit() -def update_status(job_owner: str, submitted_gap_sec: int = 0) -> None: +def update_status(job_owner: str) -> None: # This will be called periodically by the skylet to update the status # of the jobs in the database, to avoid stale job status. # NOTE: there might be a INIT job in the database set to FAILED by this @@ -511,8 +620,7 @@ def update_status(job_owner: str, submitted_gap_sec: int = 0) -> None: # not submitted yet. It will be then reset to PENDING / RUNNING when the # app starts. nonterminal_jobs = _get_jobs(username=None, - status_list=JobStatus.nonterminal_statuses(), - submitted_gap_sec=submitted_gap_sec) + status_list=JobStatus.nonterminal_statuses()) nonterminal_job_ids = [job['job_id'] for job in nonterminal_jobs] update_job_status(job_owner, nonterminal_job_ids) @@ -603,7 +711,7 @@ def cancel_jobs(job_owner: str, jobs: Optional[List[int]]) -> None: # jobs to CANCELLED. if jobs is None: job_records = _get_jobs( - None, [JobStatus.SETTING_UP, JobStatus.PENDING, JobStatus.RUNNING]) + None, [JobStatus.PENDING, JobStatus.SETTING_UP, JobStatus.RUNNING]) else: job_records = _get_jobs_by_ids(jobs) @@ -615,18 +723,24 @@ def cancel_jobs(job_owner: str, jobs: Optional[List[int]]) -> None: # ray cluster (tracked in #1262). for job in job_records: job_id = make_ray_job_id(job['job_id'], job_owner) - try: - job_client.stop_job(job_id) - except RuntimeError as e: - # If the job does not exist or if the request to the - # job server fails. - logger.warning(str(e)) - continue - - if job['status'] in [ - JobStatus.SETTING_UP, JobStatus.PENDING, JobStatus.RUNNING - ]: - set_status(job['job_id'], JobStatus.CANCELLED) + # Job is locked to ensure that pending queue does not start it while + # it is being cancelled + with filelock.FileLock(_get_lock_path(job['job_id'])): + try: + job_client.stop_job(job_id) + except RuntimeError as e: + # If the request to the job server fails, we should not + # set the job to CANCELLED. + if 'does not exist' not in str(e): + logger.warning(str(e)) + continue + + if job['status'] in [ + JobStatus.PENDING, JobStatus.SETTING_UP, JobStatus.RUNNING + ]: + _set_status_no_lock(job['job_id'], JobStatus.CANCELLED) + + scheduler.schedule_step() def get_run_timestamp(job_id: Optional[int]) -> Optional[str]: @@ -683,6 +797,13 @@ def add_job(cls, job_name: Optional[str], username: str, run_timestamp: str, ] return cls._build(code) + @classmethod + def queue_job(cls, job_id: int, cmd: str) -> str: + code = ['job_lib.scheduler.queue(' + f'{job_id!r},' + f'{cmd!r})'] + return cls._build(code) + @classmethod def update_status(cls, job_owner: str) -> str: code = [ diff --git a/sky/skylet/skylet.py b/sky/skylet/skylet.py index e6befdf72de..6bbb51e7a37 100644 --- a/sky/skylet/skylet.py +++ b/sky/skylet/skylet.py @@ -5,12 +5,15 @@ from sky import sky_logging from sky.skylet import events -logger = sky_logging.init_logger(__name__) +# Use the explicit logger name so that the logger is under the +# `sky.skylet.skylet` namespace when executed directly, so as +# to inherit the setup from the `sky` logger. +logger = sky_logging.init_logger('sky.skylet.skylet') logger.info('skylet started') EVENTS = [ events.AutostopEvent(), - events.JobUpdateEvent(), + events.JobSchedulerEvent(), # The spot job update event should be after the job update event. # Otherwise, the abnormal spot job status update will be delayed # until the next job update event. diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 1d42b065853..117fe2325f2 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -595,7 +595,7 @@ def test_aws_stale_job_manual_restart(): f'sky logs {name} 1 --status', f'sky logs {name} 3 --status', # Ensure the skylet updated the stale job status. - f'sleep {events.JobUpdateEvent.EVENT_INTERVAL_SECONDS}', + f'sleep {events.JobSchedulerEvent.EVENT_INTERVAL_SECONDS}', f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep FAILED', ], f'sky down -y {name}', @@ -624,7 +624,7 @@ def test_gcp_stale_job_manual_restart(): f'sky logs {name} 1 --status', f'sky logs {name} 3 --status', # Ensure the skylet updated the stale job status. - f'sleep {events.JobUpdateEvent.EVENT_INTERVAL_SECONDS}', + f'sleep {events.JobSchedulerEvent.EVENT_INTERVAL_SECONDS}', f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep FAILED', ], f'sky down -y {name}', @@ -928,16 +928,16 @@ def test_job_queue_multinode(generic_cloud: str): f'sky launch -c {name} -n {name}-3 --detach-setup -d examples/job_queue/job_multinode.yaml', f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-1 | grep RUNNING)', f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-2 | grep RUNNING)', - f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-3 | grep SETTING_UP)', - 'sleep 90', f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-3 | grep PENDING)', + 'sleep 90', f'sky cancel -y {name} 1', 'sleep 5', - f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-3 | grep RUNNING', + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-3 | grep SETTING_UP', f'sky cancel -y {name} 1 2 3', f'sky launch -c {name} -n {name}-4 --detach-setup -d examples/job_queue/job_multinode.yaml', # Test the job status is correctly set to SETTING_UP, during the setup is running, # and the job can be cancelled during the setup. + 'sleep 5', f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-4 | grep SETTING_UP)', f'sky cancel -y {name} 4', f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-4 | grep CANCELLED)', @@ -960,12 +960,53 @@ def test_large_job_queue(generic_cloud: str): 'large_job_queue', [ f'sky launch -y -c {name} --cloud {generic_cloud}', - f'for i in `seq 1 75`; do sky exec {name} -d "echo $i; sleep 100000000"; done', + f'for i in `seq 1 75`; do sky exec {name} -n {name}-$i -d "echo $i; sleep 100000000"; done', f'sky cancel -y {name} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16', - 'sleep 20', + 'sleep 70', # Each job takes 0.5 CPU and the default VM has 8 CPUs, so there should be 8 / 0.5 = 16 jobs running. # The first 16 jobs are canceled, so there should be 75 - 32 = 43 jobs PENDING. f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep -v grep | grep PENDING | wc -l | grep 43', + # Make sure the jobs are scheduled in FIFO order + *[ + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-{i} | grep CANCELLED' + for i in range(1, 17) + ], + *[ + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-{i} | grep RUNNING' + for i in range(17, 33) + ], + *[ + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-{i} | grep PENDING' + for i in range(33, 75) + ], + f'sky cancel -y {name} 33 35 37 39 17 18 19', + *[ + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-{i} | grep CANCELLED' + for i in range(33, 40, 2) + ], + 'sleep 10', + *[ + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-{i} | grep RUNNING' + for i in [34, 36, 38] + ], + ], + f'sky down -y {name}', + timeout=20 * 60, + ) + run_one_test(test) + + +@pytest.mark.no_lambda_cloud # No Lambda Cloud VM has 8 CPUs +def test_fast_large_job_queue(generic_cloud: str): + # This is to test the jobs can be scheduled quickly when there are many jobs in the queue. + name = _get_cluster_name() + test = Test( + 'fast_large_job_queue', + [ + f'sky launch -y -c {name} --cloud {generic_cloud}', + f'for i in `seq 1 32`; do sky exec {name} -n {name}-$i -d "echo $i"; done', + 'sleep 60', + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep -v grep | grep SUCCEEDED | wc -l | grep 32', ], f'sky down -y {name}', timeout=20 * 60,