From 9b9b670722f57247af7b1b0865505dbbede23342 Mon Sep 17 00:00:00 2001 From: mraheja Date: Wed, 25 Jan 2023 23:42:09 -0800 Subject: [PATCH 01/39] working expanded queue --- sky/backends/cloud_vm_ray_backend.py | 12 +++- sky/skylet/job_lib.py | 92 +++++++++++++++++++++++++++- 2 files changed, 101 insertions(+), 3 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 6ea6d4e7832..6965a64952f 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2439,12 +2439,22 @@ def _exec_code_on_head( f'> {remote_log_path} 2>&1') job_submit_cmd = self._setup_and_create_job_cmd_on_local_head( handle, ray_command, ray_job_id) - else: + elif handle.cluster_name == spot_lib.SPOT_CONTROLLER_NAME: job_submit_cmd = ( f'{cd} && mkdir -p {remote_log_dir} && ray job submit ' f'--address=http://127.0.0.1:8265 --submission-id {ray_job_id} ' '--no-wait ' f'"{executable} -u {script_path} > {remote_log_path} 2>&1"') + else: + job_submit_cmd = ( + f'{cd} && ray job submit ' + f'--address=http://127.0.0.1:8265 --submission-id {ray_job_id} ' + '--no-wait ' + f'"{executable} -u {script_path} > {remote_log_path} 2>&1"') + + mkdir_code = f'{cd} && mkdir -p {remote_log_dir} && touch {remote_log_path} && echo START > {remote_log_path} 2>&1' + code = job_lib.JobLibCodeGen.queue_job(job_id, job_submit_cmd) + job_submit_cmd = mkdir_code + ' && ' + code returncode, stdout, stderr = self.run_on_head(handle, job_submit_cmd, diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index 360abab054a..66e01405f0f 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -5,13 +5,16 @@ import enum import os import pathlib +import psutil import shlex +import subprocess import time import typing -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple import colorama import filelock +import getpass from sky import sky_logging from sky.skylet import constants @@ -61,6 +64,13 @@ def create_table(cursor, conn): run_timestamp TEXT CANDIDATE KEY, start_at FLOAT DEFAULT -1)""") + cursor.execute("""CREATE TABLE IF NOT EXISTS pending_jobs( + job_id INTEGER, + run_cmd TEXT, + submit INTEGER, + created_time INTEGER + )""") + db_utils.add_column_to_table(cursor, conn, 'jobs', 'end_at', 'FLOAT') db_utils.add_column_to_table(cursor, conn, 'jobs', 'resources', 'TEXT') @@ -124,6 +134,55 @@ def colored_str(self): color = _JOB_STATUS_TO_COLOR[self] return f'{color}{self.value}{colorama.Style.RESET_ALL}' +class JobScheduler: + """Base class for job scheduler""" + + def queue(self, job_id: int, cmd: str) -> None: + _CURSOR.execute('INSERT INTO pending_jobs VALUES (?,?,?,?)', + (job_id, cmd, 0, int(time.time()))) + _CONN.commit() + set_status(job_id, JobStatus.PENDING) + self.schedule_step() + + def set_scheduled(self, job_id: str): + with filelock.FileLock(_get_lock_path(job_id)): + self.remove_job_no_lock(job_id) + + def remove_job_no_lock(self, job_id: str) -> None: + _CURSOR.execute(f'DELETE FROM pending_jobs WHERE job_id={job_id!r}') + _CONN.commit() + + def _run_job(self, job_id: int, run_cmd: str): + _CURSOR.execute( + f'UPDATE pending_jobs SET submit={int(time.time())} WHERE job_id={job_id!r}' + ) + _CONN.commit() + subprocess.Popen(run_cmd, shell=True, stdout=subprocess.DEVNULL) + + def schedule_step(self) -> None: + job_owner = getpass.getuser() + jobs = list(self._get_jobs()) + if len(jobs) > 0: + update_status(job_owner) + for job_id, run_cmd, submit, _ in jobs: + with 
filelock.FileLock(_get_lock_path(job_id)): + status = get_status_no_lock(job_id) + if status != JobStatus.PENDING: + # Job doesn't exist or is running/cancelled + self.remove_job_no_lock(job_id) + continue + if submit: + # Next job waiting for resources + return + self._run_job(job_id, run_cmd) + return + +class FIFOScheduler(JobScheduler): + """First in first out job scheduler""" + def _get_jobs(self) -> Tuple: + return _CURSOR.execute('SELECT * FROM pending_jobs ORDER BY job_id') + +scheduler = FIFOScheduler() _JOB_STATUS_TO_COLOR = { JobStatus.INIT: colorama.Fore.BLUE, @@ -376,6 +435,16 @@ def _get_jobs_by_ids(job_ids: List[int]) -> List[Dict[str, Any]]: records = _get_records_from_rows(rows) return records +def _get_pending_jobs(): + rows = _CURSOR.execute( + 'SELECT job_id, created_time, submit FROM pending_jobs') + rows = list(rows) + return { + job_id: { + 'created_time': created_time, + 'submit': submit + } for job_id, created_time, submit in rows + } def update_job_status(job_owner: str, job_ids: List[int], @@ -403,6 +472,8 @@ def update_job_status(job_owner: str, # which contains the job status (str) and submission_id (str). job_detail_lists: List['ray_pydantic.JobDetails'] = job_client.list_jobs() + pending_jobs = _get_pending_jobs() + job_details = dict() ray_job_ids_set = set(ray_job_ids) for job_detail in job_detail_lists: @@ -410,9 +481,18 @@ def update_job_status(job_owner: str, job_details[job_detail.submission_id] = job_detail job_statuses: List[JobStatus] = [None] * len(ray_job_ids) for i, ray_job_id in enumerate(ray_job_ids): + job_id = job_ids[i] if ray_job_id in job_details: ray_status = job_details[ray_job_id].status job_statuses[i] = _RAY_TO_JOB_STATUS_MAP[ray_status] + if job_id in pending_jobs: + if pending_jobs[job_id]['submit'] > 0 and pending_jobs[job_id][ + 'submit'] < time.time() - 5: + continue + if pending_jobs[job_id]['created_time'] < psutil.boot_time(): + job_statuses[i] = JobStatus.FAILED + else: + job_statuses[i] = JobStatus.PENDING assert len(job_statuses) == len(job_ids), (job_statuses, job_ids) @@ -581,7 +661,8 @@ def cancel_jobs(job_owner: str, jobs: Optional[List[int]]) -> None: for job in job_records: job_id = make_ray_job_id(job['job_id'], job_owner) try: - job_client.stop_job(job_id) + with filelock.FileLock(_get_lock_path(job_id)): + job_client.stop_job(job_id) except RuntimeError as e: # If the job does not exist or if the request to the # job server fails. 
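The scheduler introduced above keeps pending jobs in a small SQLite table: queue() records the job id and its full `ray job submit` command with submit=0, and schedule_step() walks that table in FIFO order, stamping a submit time on the job at the head of the queue before launching its command, and stopping as soon as it reaches a job that has been submitted but has not yet acquired resources. A stripped-down, stdlib-only sketch of that flow (status tracking and file locking omitted; the table and helper names here are illustrative, not SkyPilot's actual schema or API):

import sqlite3
import subprocess
import time

conn = sqlite3.connect('pending_jobs_demo.db')
conn.execute('CREATE TABLE IF NOT EXISTS pending_jobs('
             'job_id INTEGER, run_cmd TEXT, submit INTEGER, created_time INTEGER)')

def queue(job_id: int, cmd: str) -> None:
    # Record the job and its launch command; submit=0 means "not started yet".
    conn.execute('INSERT INTO pending_jobs VALUES (?, ?, ?, ?)',
                 (job_id, cmd, 0, int(time.time())))
    conn.commit()
    schedule_step()

def schedule_step() -> None:
    # FIFO: launch the oldest job that has not been submitted yet, but stop
    # if the head of the queue was already submitted and is still waiting
    # for resources.
    rows = conn.execute('SELECT job_id, run_cmd, submit FROM pending_jobs '
                        'ORDER BY job_id').fetchall()
    for job_id, run_cmd, submit in rows:
        if submit:
            return
        conn.execute('UPDATE pending_jobs SET submit=? WHERE job_id=?',
                     (int(time.time()), job_id))
        conn.commit()
        subprocess.Popen(run_cmd, shell=True, stdout=subprocess.DEVNULL)
        return

queue(1, 'echo job-1')
queue(2, 'echo job-2')  # Stays queued until schedule_step() runs again, e.g.
                        # once the first job has reserved its resources.

In the actual patch the same loop also removes rows whose job is no longer PENDING and runs under a per-job file lock so that scheduling and cancellation cannot race.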
@@ -649,6 +730,13 @@ def add_job(cls, job_name: str, username: str, run_timestamp: str, ] return cls._build(code) + @classmethod + def queue_job(cls, job_name: str, cmd: str) -> None: + code = ['job_lib.scheduler.queue(' + f'{job_name!r},' + f'{cmd!r})'] + return cls._build(code) + @classmethod def update_status(cls, job_owner: str) -> str: code = [ From c099057953a6bcaa4f0353597cb6370bb8730418 Mon Sep 17 00:00:00 2001 From: mraheja Date: Thu, 26 Jan 2023 00:08:03 -0800 Subject: [PATCH 02/39] added event --- sky/backends/cloud_vm_ray_backend.py | 1 + sky/skylet/events.py | 14 +++----------- sky/skylet/job_lib.py | 12 ++++++++++-- sky/skylet/skylet.py | 2 +- tests/test_smoke.py | 4 ++-- 5 files changed, 17 insertions(+), 16 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 6965a64952f..aed41c566b3 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -358,6 +358,7 @@ def add_gang_scheduling_placement_group( file=sys.stderr, flush=True) job_lib.set_job_started({self.job_id!r}) + job_lib.scheduler.schedule_step() """) ] diff --git a/sky/skylet/events.py b/sky/skylet/events.py index d3b66dabfc0..8c1245aa73f 100644 --- a/sky/skylet/events.py +++ b/sky/skylet/events.py @@ -52,20 +52,12 @@ def _run(self): raise NotImplementedError -class JobUpdateEvent(SkyletEvent): - """Skylet event for updating job status.""" +class JobSchedulerEvent(SkyletEvent): + """Skylet event for scheduling jobs""" EVENT_INTERVAL_SECONDS = 300 - # Only update status of the jobs after this many seconds of job submission, - # to avoid race condition with `ray job` to make sure it job has been - # correctly updated. - # TODO(zhwu): This number should be tuned based on heuristics. - _SUBMITTED_GAP_SECONDS = 60 - def _run(self): - job_owner = getpass.getuser() - job_lib.update_status(job_owner, - submitted_gap_sec=self._SUBMITTED_GAP_SECONDS) + job_lib.scheduler.schedule_step() class SpotJobUpdateEvent(SkyletEvent): diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index 66e01405f0f..c73242e818c 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -134,6 +134,14 @@ def colored_str(self): color = _JOB_STATUS_TO_COLOR[self] return f'{color}{self.value}{colorama.Style.RESET_ALL}' +# Only update status of the jobs after this many seconds of job submission, +# to avoid race condition with `ray job` to make sure it job has been +# correctly updated. +# TODO(zhwu): This number should be tuned based on heuristics. +_SUBMITTED_GAP_SECONDS = 60 + +_PRE_RESOURCE_STATUSES = [JobStatus.PENDING, JobStatus.SETTING_UP] + class JobScheduler: """Base class for job scheduler""" @@ -163,11 +171,11 @@ def schedule_step(self) -> None: job_owner = getpass.getuser() jobs = list(self._get_jobs()) if len(jobs) > 0: - update_status(job_owner) + update_status(job_owner, _SUBMITTED_GAP_SECONDS) for job_id, run_cmd, submit, _ in jobs: with filelock.FileLock(_get_lock_path(job_id)): status = get_status_no_lock(job_id) - if status != JobStatus.PENDING: + if not status in _PRE_RESOURCE_STATUSES : # Job doesn't exist or is running/cancelled self.remove_job_no_lock(job_id) continue diff --git a/sky/skylet/skylet.py b/sky/skylet/skylet.py index e6befdf72de..d396bb84628 100644 --- a/sky/skylet/skylet.py +++ b/sky/skylet/skylet.py @@ -10,7 +10,7 @@ EVENTS = [ events.AutostopEvent(), - events.JobUpdateEvent(), + events.JobSchedulerEvent(), # The spot job update event should be after the job update event. 
# Otherwise, the abnormal spot job status update will be delayed # until the next job update event. diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 2b07d4ec509..eb3a6b9e3dc 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -523,7 +523,7 @@ def test_aws_stale_job_manual_restart(): f'sky logs {name} 1 --status', f'sky logs {name} 3 --status', # Ensure the skylet updated the stale job status. - f'sleep {events.JobUpdateEvent.EVENT_INTERVAL_SECONDS}', + f'sleep {events.JobSchedulereEvent.EVENT_INTERVAL_SECONDS}', f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep FAILED', ], f'sky down -y {name}', @@ -552,7 +552,7 @@ def test_gcp_stale_job_manual_restart(): f'sky logs {name} 1 --status', f'sky logs {name} 3 --status', # Ensure the skylet updated the stale job status. - f'sleep {events.JobUpdateEvent.EVENT_INTERVAL_SECONDS}', + f'sleep {events.JobSchedulerEvent.EVENT_INTERVAL_SECONDS}', f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep FAILED', ], f'sky down -y {name}', From 281d798e23bbc03cc66e739a17d348d635bca5ff Mon Sep 17 00:00:00 2001 From: mraheja Date: Thu, 26 Jan 2023 10:07:45 -0800 Subject: [PATCH 03/39] updated testing and bug fixes --- examples/job_queue/job_multinode.yaml | 2 +- sky/backends/cloud_vm_ray_backend.py | 13 ++++++++----- sky/skylet/events.py | 1 - sky/skylet/job_lib.py | 18 ++++++++++++------ tests/test_smoke.py | 10 +++++++--- 5 files changed, 28 insertions(+), 16 deletions(-) diff --git a/examples/job_queue/job_multinode.yaml b/examples/job_queue/job_multinode.yaml index 62546306f5a..dd0b030e1ca 100644 --- a/examples/job_queue/job_multinode.yaml +++ b/examples/job_queue/job_multinode.yaml @@ -23,5 +23,5 @@ run: | conda env list for i in {1..360}; do echo "$timestamp $i" - sleep 1 + sleep 2 done diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index aed41c566b3..cb5cb0e6695 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -287,9 +287,10 @@ def add_prologue(self, sys.exit(1) """) ] - self._code += [ - f'job_lib.set_status({job_id!r}, job_lib.JobStatus.PENDING)', - ] + else: + self._code += [ + f'job_lib.set_status({job_id!r}, job_lib.JobStatus.PENDING)', + ] if spot_task is not None: # Add the spot job to spot queue table. 
resources_str = backend_utils.get_task_resources_str(spot_task) @@ -2452,8 +2453,10 @@ def _exec_code_on_head( f'--address=http://127.0.0.1:8265 --submission-id {ray_job_id} ' '--no-wait ' f'"{executable} -u {script_path} > {remote_log_path} 2>&1"') - - mkdir_code = f'{cd} && mkdir -p {remote_log_dir} && touch {remote_log_path} && echo START > {remote_log_path} 2>&1' + + mkdir_code = ( + f'{cd} && mkdir -p {remote_log_dir} && touch ' + f'{remote_log_path} && echo START > {remote_log_path} 2>&1') code = job_lib.JobLibCodeGen.queue_job(job_id, job_submit_cmd) job_submit_cmd = mkdir_code + ' && ' + code diff --git a/sky/skylet/events.py b/sky/skylet/events.py index 8c1245aa73f..98bf4a7160d 100644 --- a/sky/skylet/events.py +++ b/sky/skylet/events.py @@ -1,5 +1,4 @@ """skylet events""" -import getpass import math import os import re diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index c73242e818c..3fa8d072f83 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -134,6 +134,7 @@ def colored_str(self): color = _JOB_STATUS_TO_COLOR[self] return f'{color}{self.value}{colorama.Style.RESET_ALL}' + # Only update status of the jobs after this many seconds of job submission, # to avoid race condition with `ray job` to make sure it job has been # correctly updated. @@ -142,6 +143,7 @@ def colored_str(self): _PRE_RESOURCE_STATUSES = [JobStatus.PENDING, JobStatus.SETTING_UP] + class JobScheduler: """Base class for job scheduler""" @@ -151,7 +153,7 @@ def queue(self, job_id: int, cmd: str) -> None: _CONN.commit() set_status(job_id, JobStatus.PENDING) self.schedule_step() - + def set_scheduled(self, job_id: str): with filelock.FileLock(_get_lock_path(job_id)): self.remove_job_no_lock(job_id) @@ -159,11 +161,10 @@ def set_scheduled(self, job_id: str): def remove_job_no_lock(self, job_id: str) -> None: _CURSOR.execute(f'DELETE FROM pending_jobs WHERE job_id={job_id!r}') _CONN.commit() - + def _run_job(self, job_id: int, run_cmd: str): - _CURSOR.execute( - f'UPDATE pending_jobs SET submit={int(time.time())} WHERE job_id={job_id!r}' - ) + _CURSOR.execute((f'UPDATE pending_jobs SET submit={int(time.time())}' + f'WHERE job_id={job_id!r}')) _CONN.commit() subprocess.Popen(run_cmd, shell=True, stdout=subprocess.DEVNULL) @@ -175,7 +176,7 @@ def schedule_step(self) -> None: for job_id, run_cmd, submit, _ in jobs: with filelock.FileLock(_get_lock_path(job_id)): status = get_status_no_lock(job_id) - if not status in _PRE_RESOURCE_STATUSES : + if status not in _PRE_RESOURCE_STATUSES: # Job doesn't exist or is running/cancelled self.remove_job_no_lock(job_id) continue @@ -185,11 +186,14 @@ def schedule_step(self) -> None: self._run_job(job_id, run_cmd) return + class FIFOScheduler(JobScheduler): """First in first out job scheduler""" + def _get_jobs(self) -> Tuple: return _CURSOR.execute('SELECT * FROM pending_jobs ORDER BY job_id') + scheduler = FIFOScheduler() _JOB_STATUS_TO_COLOR = { @@ -443,6 +447,7 @@ def _get_jobs_by_ids(job_ids: List[int]) -> List[Dict[str, Any]]: records = _get_records_from_rows(rows) return records + def _get_pending_jobs(): rows = _CURSOR.execute( 'SELECT job_id, created_time, submit FROM pending_jobs') @@ -454,6 +459,7 @@ def _get_pending_jobs(): } for job_id, created_time, submit in rows } + def update_job_status(job_owner: str, job_ids: List[int], silent: bool = False) -> List[JobStatus]: diff --git a/tests/test_smoke.py b/tests/test_smoke.py index eb3a6b9e3dc..bb375d027d8 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -706,9 +706,9 @@ def 
test_job_queue_multinode(generic_cloud: str): f'sky launch -c {name} -n {name}-3 --detach-setup -d examples/job_queue/job_multinode.yaml', f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-1 | grep RUNNING)', f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-2 | grep RUNNING)', + 'sleep 10', f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-3 | grep SETTING_UP)', 'sleep 90', - f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-3 | grep PENDING)', f'sky cancel -y {name} 1', 'sleep 5', f'sky queue {name} | grep {name}-3 | grep RUNNING', @@ -737,12 +737,16 @@ def test_large_job_queue(generic_cloud: str): 'large_job_queue', [ f'sky launch -y -c {name} --cloud {generic_cloud}', - f'for i in `seq 1 75`; do sky exec {name} -d "echo $i; sleep 100000000"; done', + f'for i in `seq 1 75`; do sky exec {name} -n {name}-$i -d "echo $i; sleep 100000000"; done', f'sky cancel -y {name} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16', - 'sleep 20', + 'sleep 120', # Each job takes 0.5 CPU and the default VM has 8 CPUs, so there should be 8 / 0.5 = 16 jobs running. # The first 16 jobs are canceled, so there should be 75 - 32 = 43 jobs PENDING. f'sky queue {name} | grep -v grep | grep PENDING | wc -l | grep 43', + f'sky queue {name} | grep {name}-15 | grep CANCELLED', + f'sky queue {name} | grep {name}-32 | grep RUNNING', + f'sky queue {name} | grep {name}-33 | grep PENDING', + f'sky queue {name} | grep {name}-50 | grep PENDING', ], f'sky down -y {name}', timeout=20 * 60, From 5ad959d2d3deafa5e01d3df44a6550819b561fd9 Mon Sep 17 00:00:00 2001 From: mraheja Date: Thu, 26 Jan 2023 10:17:47 -0800 Subject: [PATCH 04/39] fixed missed space --- sky/skylet/job_lib.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index 3fa8d072f83..ef5b0d2ee6a 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -163,7 +163,7 @@ def remove_job_no_lock(self, job_id: str) -> None: _CONN.commit() def _run_job(self, job_id: int, run_cmd: str): - _CURSOR.execute((f'UPDATE pending_jobs SET submit={int(time.time())}' + _CURSOR.execute((f'UPDATE pending_jobs SET submit={int(time.time())} ' f'WHERE job_id={job_id!r}')) _CONN.commit() subprocess.Popen(run_cmd, shell=True, stdout=subprocess.DEVNULL) From 440fc68e2d0cdba8d8e8d867c23f6bd1261eb94f Mon Sep 17 00:00:00 2001 From: mraheja Date: Thu, 26 Jan 2023 10:37:56 -0800 Subject: [PATCH 05/39] formatting fix --- sky/skylet/job_lib.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index ef5b0d2ee6a..578bf80647b 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -155,6 +155,9 @@ def queue(self, job_id: int, cmd: str) -> None: self.schedule_step() def set_scheduled(self, job_id: str): + # TODO(mraheja): remove pylint disabling when filelock + # version updated + # pylint: disable=abstract-class-instantiated with filelock.FileLock(_get_lock_path(job_id)): self.remove_job_no_lock(job_id) @@ -174,6 +177,9 @@ def schedule_step(self) -> None: if len(jobs) > 0: update_status(job_owner, _SUBMITTED_GAP_SECONDS) for job_id, run_cmd, submit, _ in jobs: + # TODO(mraheja): remove pylint disabling when filelock + # version updated + # pylint: disable=abstract-class-instantiated with filelock.FileLock(_get_lock_path(job_id)): status = get_status_no_lock(job_id) if status not in _PRE_RESOURCE_STATUSES: @@ -675,6 +681,9 @@ def cancel_jobs(job_owner: str, jobs: Optional[List[int]]) -> None: for job in job_records: 
job_id = make_ray_job_id(job['job_id'], job_owner) try: + # TODO(mraheja): remove pylint disabling when filelock + # version updated + # pylint: disable=abstract-class-instantiated with filelock.FileLock(_get_lock_path(job_id)): job_client.stop_job(job_id) except RuntimeError as e: From d92da6ab0292bbed172fe673d7606acab7efcb54 Mon Sep 17 00:00:00 2001 From: mraheja Date: Tue, 14 Feb 2023 20:07:33 -0800 Subject: [PATCH 06/39] fixed setting up status + enabled spot --- sky/backends/cloud_vm_ray_backend.py | 125 +++++++++++++-------------- 1 file changed, 60 insertions(+), 65 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index cb5cb0e6695..a2da2f3fed0 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -183,13 +183,11 @@ def __init__(self): def add_prologue(self, job_id: int, spot_task: Optional['task_lib.Task'] = None, - setup_cmd: Optional[str] = None, - envs: Optional[Dict[str, str]] = None, - setup_log_path: Optional[str] = None, is_local: bool = False) -> None: assert not self._has_prologue, 'add_prologue() called twice?' self._has_prologue = True self.job_id = job_id + self.is_local = is_local # Should use 'auto' or 'ray://:10001' rather than # 'ray://localhost:10001', or 'ray://127.0.0.1:10001', for public cloud. # Otherwise, it will a bug of ray job failed to get the placement group @@ -235,7 +233,6 @@ def add_prologue(self, inspect.getsource(log_lib.add_ray_env_vars), inspect.getsource(log_lib.run_bash_command_with_log), 'run_bash_command_with_log = ray.remote(run_bash_command_with_log)', - f'setup_cmd = {setup_cmd!r}', ] # Currently, the codegen program is/can only be submitted to the head # node, due to using job_lib for updating job statuses, and using @@ -247,50 +244,9 @@ def add_prologue(self, if hasattr(autostop_lib, 'set_last_active_time_to_now'): autostop_lib.set_last_active_time_to_now() """)) - if setup_cmd is not None: - self._code += [ - textwrap.dedent(f"""\ - _SETUP_CPUS = 0.0001 - # The setup command will be run as a ray task with num_cpus=_SETUP_CPUS as the - # requirement; this means Ray will set CUDA_VISIBLE_DEVICES to an empty string. - # We unset it so that user setup command may properly use this env var. - setup_cmd = 'unset CUDA_VISIBLE_DEVICES; ' + setup_cmd - job_lib.set_status({job_id!r}, job_lib.JobStatus.SETTING_UP) - print({_CTRL_C_TIP_MESSAGE!r}, file=sys.stderr, flush=True) - total_num_nodes = len(ray.nodes()) - setup_bundles = [{{"CPU": _SETUP_CPUS}} for _ in range(total_num_nodes)] - setup_pg = ray.util.placement_group(setup_bundles, strategy='STRICT_SPREAD') - ray.get(setup_pg.ready()) - setup_workers = [run_bash_command_with_log \\ - .options(name='setup', num_cpus=_SETUP_CPUS, placement_group=setup_pg, placement_group_bundle_index=i) \\ - .remote( - setup_cmd, - os.path.expanduser({setup_log_path!r}), - getpass.getuser(), - job_id={self.job_id}, - env_vars={envs!r}, - stream_logs=True, - with_ray=True, - use_sudo={is_local}, - ) for i in range(total_num_nodes)] - setup_returncodes = ray.get(setup_workers) - if sum(setup_returncodes) != 0: - job_lib.set_status({self.job_id!r}, job_lib.JobStatus.FAILED_SETUP) - # This waits for all streaming logs to finish. - time.sleep(1) - print('ERROR: {colorama.Fore.RED}Job {self.job_id}\\'s setup failed with ' - 'return code list:{colorama.Style.RESET_ALL}', - setup_returncodes, - file=sys.stderr, - flush=True) - # Need this to set the job status in ray job to be FAILED. 
- sys.exit(1) - """) - ] - else: - self._code += [ - f'job_lib.set_status({job_id!r}, job_lib.JobStatus.PENDING)', - ] + self._code += [ + f'job_lib.set_status({job_id!r}, job_lib.JobStatus.PENDING)', + ] if spot_task is not None: # Add the spot job to spot queue table. resources_str = backend_utils.get_task_resources_str(spot_task) @@ -305,6 +261,9 @@ def add_gang_scheduling_placement_group( num_nodes: int, accelerator_dict: Dict[str, int], stable_cluster_internal_ips: List[str], + setup_cmd: Optional[str] = None, + setup_log_path: Optional[str] = None, + envs: Optional[Dict[str, str]] = None, ) -> None: """Create the gang scheduling placement group for a Task. @@ -346,7 +305,7 @@ def add_gang_scheduling_placement_group( plural = 's' if {num_nodes} > 1 else '' node_str = f'{num_nodes} node{{plural}}' - message = '' if setup_cmd is not None else {_CTRL_C_TIP_MESSAGE!r} + '\\n' + message = {_CTRL_C_TIP_MESSAGE!r} + '\\n' message += f'INFO: Waiting for task resources on {{node_str}}. This will block if the cluster is full.' print(message, file=sys.stderr, @@ -358,9 +317,56 @@ def add_gang_scheduling_placement_group( print('INFO: All task resources reserved.', file=sys.stderr, flush=True) + """) + ] + + job_id = self.job_id + if setup_cmd is not None: + self._code += [ + textwrap.dedent(f"""\ + setup_cmd = {setup_cmd!r} + _SETUP_CPUS = 0.0001 + # The setup command will be run as a ray task with num_cpus=_SETUP_CPUS as the + # requirement; this means Ray will set CUDA_VISIBLE_DEVICES to an empty string. + # We unset it so that user setup command may properly use this env var. + setup_cmd = 'unset CUDA_VISIBLE_DEVICES; ' + setup_cmd + job_lib.set_status({job_id!r}, job_lib.JobStatus.SETTING_UP) + print({_CTRL_C_TIP_MESSAGE!r}, file=sys.stderr, flush=True) + total_num_nodes = len(ray.nodes()) + setup_bundles = [{{"CPU": _SETUP_CPUS}} for _ in range(total_num_nodes)] + setup_pg = ray.util.placement_group(setup_bundles, strategy='STRICT_SPREAD') + setup_workers = [run_bash_command_with_log \\ + .options(name='setup', num_cpus=_SETUP_CPUS, placement_group=setup_pg, placement_group_bundle_index=i) \\ + .remote( + setup_cmd, + os.path.expanduser({setup_log_path!r}), + getpass.getuser(), + job_id={self.job_id}, + env_vars={envs!r}, + stream_logs=True, + with_ray=True, + use_sudo={self.is_local}, + ) for i in range(total_num_nodes)] + setup_returncodes = ray.get(setup_workers) + if sum(setup_returncodes) != 0: + job_lib.set_status({self.job_id!r}, job_lib.JobStatus.FAILED_SETUP) + # This waits for all streaming logs to finish. + time.sleep(1) + print('ERROR: {colorama.Fore.RED}Job {self.job_id}\\'s setup failed with ' + 'return code list:{colorama.Style.RESET_ALL}', + setup_returncodes, + file=sys.stderr, + flush=True) + # Need this to set the job status in ray job to be FAILED. + sys.exit(1) + """) + ] + + self._code += [ + textwrap.dedent(f"""\ job_lib.set_job_started({self.job_id!r}) job_lib.scheduler.schedule_step() - """) + """), ] # Export IP and node rank to the environment variables. 
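The hunk above moves detached setup into the gang-scheduling codegen: instead of running setup before resources are requested, the generated program now runs it as one near-zero-CPU Ray task per node, pinned to a STRICT_SPREAD placement group so that every node executes the setup command exactly once. A minimal standalone sketch of that placement-group pattern (generic command and function names, not the actual generated SkyPilot program):

import subprocess
import ray

ray.init()  # on a real cluster this would be ray.init(address='auto')

_SETUP_CPUS = 0.0001  # tiny request so setup never competes with real tasks

@ray.remote
def run_setup(cmd: str) -> int:
    # Run the setup command on whichever node this task lands on.
    return subprocess.run(cmd, shell=True, check=False).returncode

num_nodes = len(ray.nodes())
bundles = [{'CPU': _SETUP_CPUS} for _ in range(num_nodes)]
pg = ray.util.placement_group(bundles, strategy='STRICT_SPREAD')
ray.get(pg.ready())

returncodes = ray.get([
    run_setup.options(num_cpus=_SETUP_CPUS,
                      placement_group=pg,
                      placement_group_bundle_index=i).remote('echo setup')
    for i in range(num_nodes)
])
print('setup return codes:', returncodes)

If any return code is non-zero, the generated program marks the job FAILED_SETUP and exits, which is what the setup_returncodes check above does.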
@@ -2441,13 +2447,7 @@ def _exec_code_on_head( f'> {remote_log_path} 2>&1') job_submit_cmd = self._setup_and_create_job_cmd_on_local_head( handle, ray_command, ray_job_id) - elif handle.cluster_name == spot_lib.SPOT_CONTROLLER_NAME: - job_submit_cmd = ( - f'{cd} && mkdir -p {remote_log_dir} && ray job submit ' - f'--address=http://127.0.0.1:8265 --submission-id {ray_job_id} ' - '--no-wait ' - f'"{executable} -u {script_path} > {remote_log_path} 2>&1"') - else: + else: job_submit_cmd = ( f'{cd} && ray job submit ' f'--address=http://127.0.0.1:8265 --submission-id {ray_job_id} ' @@ -3387,12 +3387,10 @@ def _execute_task_one_node(self, handle: ResourceHandle, is_local = isinstance(handle.launched_resources.cloud, clouds.Local) codegen.add_prologue(job_id, spot_task=task.spot_task, - setup_cmd=self._setup_cmd, - envs=task.envs, - setup_log_path=os.path.join(log_dir, 'setup.log'), + is_local=is_local) codegen.add_gang_scheduling_placement_group( - 1, accelerator_dict, stable_cluster_internal_ips=internal_ips) + 1, accelerator_dict, stable_cluster_internal_ips=internal_ips, setup_cmd=self._setup_cmd, setup_log_path=os.path.join(log_dir, 'setup.log'), envs=task.envs,) if callable(task.run): run_fn_code = textwrap.dedent(inspect.getsource(task.run)) @@ -3450,14 +3448,11 @@ def _execute_task_n_nodes(self, handle: ResourceHandle, task: task_lib.Task, is_local = isinstance(handle.launched_resources.cloud, clouds.Local) codegen.add_prologue(job_id, spot_task=task.spot_task, - setup_cmd=self._setup_cmd, - envs=task.envs, - setup_log_path=os.path.join(log_dir, 'setup.log'), is_local=is_local) codegen.add_gang_scheduling_placement_group( num_actual_nodes, accelerator_dict, - stable_cluster_internal_ips=internal_ips) + stable_cluster_internal_ips=internal_ips, setup_cmd=self._setup_cmd, setup_log_path=os.path.join(log_dir, 'setup.log'), envs=task.envs) if callable(task.run): run_fn_code = textwrap.dedent(inspect.getsource(task.run)) From 0fed0105a8aa5eb873918bca9b62f070098a8c32 Mon Sep 17 00:00:00 2001 From: mraheja Date: Tue, 14 Feb 2023 20:12:48 -0800 Subject: [PATCH 07/39] format --- sky/backends/cloud_vm_ray_backend.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index a2da2f3fed0..3bfa3688397 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2447,7 +2447,7 @@ def _exec_code_on_head( f'> {remote_log_path} 2>&1') job_submit_cmd = self._setup_and_create_job_cmd_on_local_head( handle, ray_command, ray_job_id) - else: + else: job_submit_cmd = ( f'{cd} && ray job submit ' f'--address=http://127.0.0.1:8265 --submission-id {ray_job_id} ' @@ -3387,10 +3387,15 @@ def _execute_task_one_node(self, handle: ResourceHandle, is_local = isinstance(handle.launched_resources.cloud, clouds.Local) codegen.add_prologue(job_id, spot_task=task.spot_task, - is_local=is_local) codegen.add_gang_scheduling_placement_group( - 1, accelerator_dict, stable_cluster_internal_ips=internal_ips, setup_cmd=self._setup_cmd, setup_log_path=os.path.join(log_dir, 'setup.log'), envs=task.envs,) + 1, + accelerator_dict, + stable_cluster_internal_ips=internal_ips, + setup_cmd=self._setup_cmd, + setup_log_path=os.path.join(log_dir, 'setup.log'), + envs=task.envs, + ) if callable(task.run): run_fn_code = textwrap.dedent(inspect.getsource(task.run)) @@ -3452,7 +3457,10 @@ def _execute_task_n_nodes(self, handle: ResourceHandle, task: task_lib.Task, 
codegen.add_gang_scheduling_placement_group( num_actual_nodes, accelerator_dict, - stable_cluster_internal_ips=internal_ips, setup_cmd=self._setup_cmd, setup_log_path=os.path.join(log_dir, 'setup.log'), envs=task.envs) + stable_cluster_internal_ips=internal_ips, + setup_cmd=self._setup_cmd, + setup_log_path=os.path.join(log_dir, 'setup.log'), + envs=task.envs) if callable(task.run): run_fn_code = textwrap.dedent(inspect.getsource(task.run)) From 2cb6d1991c80a1cf2ec491f75fbb60fd1ef6d6a0 Mon Sep 17 00:00:00 2001 From: mraheja Date: Fri, 17 Feb 2023 09:06:35 -0800 Subject: [PATCH 08/39] fixed setting up test --- tests/test_smoke.py | 5 +++-- tests/wandb.py | 0 2 files changed, 3 insertions(+), 2 deletions(-) create mode 100644 tests/wandb.py diff --git a/tests/test_smoke.py b/tests/test_smoke.py index bb375d027d8..a81f764d5fb 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -707,15 +707,16 @@ def test_job_queue_multinode(generic_cloud: str): f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-1 | grep RUNNING)', f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-2 | grep RUNNING)', 'sleep 10', - f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-3 | grep SETTING_UP)', + f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-3 | grep PENDING)', 'sleep 90', f'sky cancel -y {name} 1', 'sleep 5', - f'sky queue {name} | grep {name}-3 | grep RUNNING', + f'sky queue {name} | grep {name}-3 | grep SETTING_UP', f'sky cancel -y {name} 1 2 3', f'sky launch -c {name} -n {name}-4 --detach-setup -d examples/job_queue/job_multinode.yaml', # Test the job status is correctly set to SETTING_UP, during the setup is running, # and the job can be cancelled during the setup. + 'sleep 5', f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-4 | grep SETTING_UP)', f'sky cancel -y {name} 4', f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-4 | grep CANCELLED)', diff --git a/tests/wandb.py b/tests/wandb.py new file mode 100644 index 00000000000..e69de29bb2d From 42ce45674dfb4c892037b7bfc028857698319a44 Mon Sep 17 00:00:00 2001 From: mraheja Date: Thu, 23 Feb 2023 11:00:00 -0800 Subject: [PATCH 09/39] addressed comments --- sky/backends/cloud_vm_ray_backend.py | 8 +++----- sky/skylet/job_lib.py | 24 ++++++++++++++++-------- tests/test_smoke.py | 2 +- 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 3bfa3688397..69d7d2b35a7 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -317,6 +317,7 @@ def add_gang_scheduling_placement_group( print('INFO: All task resources reserved.', file=sys.stderr, flush=True) + job_lib.scheduler.schedule_step() """) ] @@ -331,7 +332,6 @@ def add_gang_scheduling_placement_group( # We unset it so that user setup command may properly use this env var. 
setup_cmd = 'unset CUDA_VISIBLE_DEVICES; ' + setup_cmd job_lib.set_status({job_id!r}, job_lib.JobStatus.SETTING_UP) - print({_CTRL_C_TIP_MESSAGE!r}, file=sys.stderr, flush=True) total_num_nodes = len(ray.nodes()) setup_bundles = [{{"CPU": _SETUP_CPUS}} for _ in range(total_num_nodes)] setup_pg = ray.util.placement_group(setup_bundles, strategy='STRICT_SPREAD') @@ -365,7 +365,6 @@ def add_gang_scheduling_placement_group( self._code += [ textwrap.dedent(f"""\ job_lib.set_job_started({self.job_id!r}) - job_lib.scheduler.schedule_step() """), ] @@ -2454,9 +2453,8 @@ def _exec_code_on_head( '--no-wait ' f'"{executable} -u {script_path} > {remote_log_path} 2>&1"') - mkdir_code = ( - f'{cd} && mkdir -p {remote_log_dir} && touch ' - f'{remote_log_path} && echo START > {remote_log_path} 2>&1') + mkdir_code = (f'{cd} && mkdir -p {remote_log_dir} &&' + f'echo START > {remote_log_path} 2>&1') code = job_lib.JobLibCodeGen.queue_job(job_id, job_submit_cmd) job_submit_cmd = mkdir_code + ' && ' + code diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index 578bf80647b..4a3cfe9e67b 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -92,15 +92,15 @@ class JobStatus(enum.Enum): # In the 'jobs' table, the `submitted_at` column will be set to the current # time, when the job is firstly created (in the INIT state). INIT = 'INIT' + # The job is waiting for the required resources. (`ray job status` + # shows RUNNING as the generated ray program has started, but blocked + # by the placement constraints.) + PENDING = 'PENDING' # Running the user's setup script (only in effect if --detach-setup is # set). Our update_job_status() can temporarily (for a short period) set # the status to SETTING_UP, if the generated ray program has not set # the status to PENDING or RUNNING yet. SETTING_UP = 'SETTING_UP' - # The job is waiting for the required resources. (`ray job status` - # shows RUNNING as the generated ray program has started, but blocked - # by the placement constraints.) - PENDING = 'PENDING' # The job is running. # In the 'jobs' table, the `start_at` column will be set to the current # time, when the job is firstly transitioned to RUNNING. @@ -140,8 +140,9 @@ def colored_str(self): # correctly updated. # TODO(zhwu): This number should be tuned based on heuristics. _SUBMITTED_GAP_SECONDS = 60 +_PENDING_SUBMIT_TIMEOUT = 5 -_PRE_RESOURCE_STATUSES = [JobStatus.PENDING, JobStatus.SETTING_UP] +_PRE_RESOURCE_STATUSES = [JobStatus.PENDING] class JobScheduler: @@ -192,6 +193,9 @@ def schedule_step(self) -> None: self._run_job(job_id, run_cmd) return + def _get_jobs(self) -> Tuple: + raise NotImplementedError + class FIFOScheduler(JobScheduler): """First in first out job scheduler""" @@ -229,8 +233,8 @@ def _get_jobs(self) -> Tuple: # (ray's status becomes RUNNING), i.e. it will be very rare that the job # will be set to SETTING_UP by the update_job_status, as our generated # ray program will set the status to PENDING immediately. 
- 'PENDING': JobStatus.INIT, - 'RUNNING': JobStatus.SETTING_UP, + 'PENDING': JobStatus.PENDING, + 'RUNNING': JobStatus.PENDING, 'SUCCEEDED': JobStatus.SUCCEEDED, 'FAILED': JobStatus.FAILED, 'STOPPED': JobStatus.CANCELLED, @@ -506,10 +510,14 @@ def update_job_status(job_owner: str, ray_status = job_details[ray_job_id].status job_statuses[i] = _RAY_TO_JOB_STATUS_MAP[ray_status] if job_id in pending_jobs: + # Gives a 5 second timeout between job being submit from the + # pending queue until appearing in ray jobs if pending_jobs[job_id]['submit'] > 0 and pending_jobs[job_id][ - 'submit'] < time.time() - 5: + 'submit'] < time.time() - _PENDING_SUBMIT_TIMEOUT: continue if pending_jobs[job_id]['created_time'] < psutil.boot_time(): + # The job is stale as it is created before the instance + # is created. job_statuses[i] = JobStatus.FAILED else: job_statuses[i] = JobStatus.PENDING diff --git a/tests/test_smoke.py b/tests/test_smoke.py index a81f764d5fb..0368d2efe7e 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -740,7 +740,7 @@ def test_large_job_queue(generic_cloud: str): f'sky launch -y -c {name} --cloud {generic_cloud}', f'for i in `seq 1 75`; do sky exec {name} -n {name}-$i -d "echo $i; sleep 100000000"; done', f'sky cancel -y {name} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16', - 'sleep 120', + 'sleep 90', # Each job takes 0.5 CPU and the default VM has 8 CPUs, so there should be 8 / 0.5 = 16 jobs running. # The first 16 jobs are canceled, so there should be 75 - 32 = 43 jobs PENDING. f'sky queue {name} | grep -v grep | grep PENDING | wc -l | grep 43', From 496284e73218e9b549d56866a8a233e9e9d071d6 Mon Sep 17 00:00:00 2001 From: mraheja Date: Tue, 28 Feb 2023 13:51:55 -0800 Subject: [PATCH 10/39] change stop lock range --- sky/skylet/job_lib.py | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index 4a3cfe9e67b..0fe031ad49a 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -688,22 +688,24 @@ def cancel_jobs(job_owner: str, jobs: Optional[List[int]]) -> None: # ray cluster (tracked in #1262). for job in job_records: job_id = make_ray_job_id(job['job_id'], job_owner) - try: - # TODO(mraheja): remove pylint disabling when filelock - # version updated - # pylint: disable=abstract-class-instantiated - with filelock.FileLock(_get_lock_path(job_id)): + # Job is locked to ensure that pending queue does not start it while + # it is being cancelled + with filelock.FileLock(_get_lock_path(job_id)): + try: + # TODO(mraheja): remove pylint disabling when filelock + # version updated + # pylint: disable=abstract-class-instantiated job_client.stop_job(job_id) - except RuntimeError as e: - # If the job does not exist or if the request to the - # job server fails. - logger.warning(str(e)) - continue - - if job['status'] in [ - JobStatus.SETTING_UP, JobStatus.PENDING, JobStatus.RUNNING - ]: - set_status(job['job_id'], JobStatus.CANCELLED) + except RuntimeError as e: + # If the job does not exist or if the request to the + # job server fails. 
+ logger.warning(str(e)) + continue + + if job['status'] in [ + JobStatus.SETTING_UP, JobStatus.PENDING, JobStatus.RUNNING + ]: + set_status(job['job_id'], JobStatus.CANCELLED) def get_run_timestamp(job_id: Optional[int]) -> Optional[str]: From cbf8ab3c8f3696aa3ad7d3a85204bff39f94db9c Mon Sep 17 00:00:00 2001 From: mraheja Date: Tue, 28 Feb 2023 14:49:12 -0800 Subject: [PATCH 11/39] error message for needing update --- sky/backends/cloud_vm_ray_backend.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 56be6054076..a2371262c73 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2606,6 +2606,14 @@ def _exec_code_on_head( job_submit_cmd, stream_logs=False, require_outputs=True) + + if 'has no attribute' in stdout: + logger.warn( + f'{colorama.Fore.RED}SkyPilot must be updated on remote,' + f'use `sky launch` instead{colorama.Style.RESET_ALL}' + ) + return + subprocess_utils.handle_returncode(returncode, job_submit_cmd, f'Failed to submit job {job_id}.', From afedc830d61c93bfb2bebf926ebc3afb56ff1542 Mon Sep 17 00:00:00 2001 From: mraheja Date: Fri, 3 Mar 2023 10:31:08 -0800 Subject: [PATCH 12/39] removed wandb.py --- tests/wandb.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 tests/wandb.py diff --git a/tests/wandb.py b/tests/wandb.py deleted file mode 100644 index e69de29bb2d..00000000000 From 68dea21ece18e55668ca3858f719efc370c5d6ce Mon Sep 17 00:00:00 2001 From: mraheja Date: Fri, 3 Mar 2023 11:58:18 -0800 Subject: [PATCH 13/39] added skylet restart --- sky/backends/cloud_vm_ray_backend.py | 14 +++++++++----- sky/skylet/job_lib.py | 24 +++++++++--------------- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index a2371262c73..e94e062de79 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -107,6 +107,10 @@ pathlib.Path(sky.__file__).resolve().parent / 'backends' / 'monkey_patches' / 'monkey_patch_ray_up.py') +_SKYLET_RESTART_CMD = ( + '(pkill -f "python3 -m sky.skylet.skylet"; nohup python3 -m ' + 'sky.skylet.skylet >> ~/.sky/skylet.log 2>&1 &);') + def _get_cluster_config_template(cloud): cloud_to_template = { @@ -1648,9 +1652,10 @@ def _ensure_cluster_ray_started(self, handle: 'CloudVmRayResourceHandle', return backend = CloudVmRayBackend() + # For backward compatability and robustness of skylet, it is restarted returncode = backend.run_on_head( handle, - 'ray status', + f'{_SKYLET_RESTART_CMD}ray status', # At this state, an erroneous cluster may not have cached # handle.head_ip (global_user_state.add_or_update_cluster(..., # ready=True)). 
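The restart spliced into the `ray status` call above is unconditional; later commits in this series gate it on a version file (the _MAYBE_SKYLET_RESTART_CMD constant and sky/skylet/attempt_skylet.py below), so the skylet daemon is only killed and relaunched when the version recorded on the head node differs from the one the client ships. A rough stdlib-only sketch of that version-gated restart (placeholder path and version string, not the exact constants used by the patch):

import pathlib
import subprocess

SKYLET_VERSION = '1'
VERSION_FILE = pathlib.Path.home() / '.sky' / 'skylet_version_demo'

def maybe_restart_skylet() -> None:
    if VERSION_FILE.exists() and VERSION_FILE.read_text() == SKYLET_VERSION:
        return  # Running daemon already matches this version.
    VERSION_FILE.parent.mkdir(parents=True, exist_ok=True)
    VERSION_FILE.write_text(SKYLET_VERSION)
    # Kill any stale daemon (ignore "no process found"), then relaunch it
    # detached, appending output to a log file.
    subprocess.run('pkill -f "python3 -m sky.skylet.skylet"',
                   shell=True, check=False)
    subprocess.run(
        'nohup python3 -m sky.skylet.skylet >> ~/.sky/skylet.log 2>&1 &',
        shell=True, check=True)

maybe_restart_skylet()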
@@ -2608,10 +2613,9 @@ def _exec_code_on_head( require_outputs=True) if 'has no attribute' in stdout: - logger.warn( - f'{colorama.Fore.RED}SkyPilot must be updated on remote,' - f'use `sky launch` instead{colorama.Style.RESET_ALL}' - ) + logger.info( + f'{colorama.Fore.RED}SkyPilot must be updated on remote,' + f'use `sky launch` instead{colorama.Style.RESET_ALL}') return subprocess_utils.handle_returncode(returncode, diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index 778b9e353a2..122b6112d5a 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -10,7 +10,7 @@ import subprocess import time import typing -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional import colorama import filelock @@ -155,13 +155,6 @@ def queue(self, job_id: int, cmd: str) -> None: set_status(job_id, JobStatus.PENDING) self.schedule_step() - def set_scheduled(self, job_id: str): - # TODO(mraheja): remove pylint disabling when filelock - # version updated - # pylint: disable=abstract-class-instantiated - with filelock.FileLock(_get_lock_path(job_id)): - self.remove_job_no_lock(job_id) - def remove_job_no_lock(self, job_id: str) -> None: _CURSOR.execute(f'DELETE FROM pending_jobs WHERE job_id={job_id!r}') _CONN.commit() @@ -174,7 +167,7 @@ def _run_job(self, job_id: int, run_cmd: str): def schedule_step(self) -> None: job_owner = getpass.getuser() - jobs = list(self._get_jobs()) + jobs = self._get_jobs() if len(jobs) > 0: update_status(job_owner, _SUBMITTED_GAP_SECONDS) for job_id, run_cmd, submit, _ in jobs: @@ -193,15 +186,16 @@ def schedule_step(self) -> None: self._run_job(job_id, run_cmd) return - def _get_jobs(self) -> Tuple: + def _get_jobs(self) -> List: raise NotImplementedError class FIFOScheduler(JobScheduler): """First in first out job scheduler""" - def _get_jobs(self) -> Tuple: - return _CURSOR.execute('SELECT * FROM pending_jobs ORDER BY job_id') + def _get_jobs(self) -> List: + return list( + _CURSOR.execute('SELECT * FROM pending_jobs ORDER BY job_id')) scheduler = FIFOScheduler() @@ -695,7 +689,7 @@ def cancel_jobs(job_owner: str, jobs: Optional[List[int]]) -> None: job_id = make_ray_job_id(job['job_id'], job_owner) # Job is locked to ensure that pending queue does not start it while # it is being cancelled - with filelock.FileLock(_get_lock_path(job_id)): + with filelock.FileLock(_get_lock_path(job['job_id'],)): try: # TODO(mraheja): remove pylint disabling when filelock # version updated @@ -768,9 +762,9 @@ def add_job(cls, job_name: str, username: str, run_timestamp: str, return cls._build(code) @classmethod - def queue_job(cls, job_name: str, cmd: str) -> None: + def queue_job(cls, job_id: int, cmd: str) -> str: code = ['job_lib.scheduler.queue(' - f'{job_name!r},' + f'{job_id!r},' f'{cmd!r})'] return cls._build(code) From 5af314645d86ba0cb9022f25287a486cd127e146 Mon Sep 17 00:00:00 2001 From: mraheja Date: Fri, 10 Mar 2023 10:01:57 -0800 Subject: [PATCH 14/39] updates --- sky/backends/cloud_vm_ray_backend.py | 17 ++++++++++++----- sky/skylet/job_lib.py | 11 ++++++++--- tests/test_smoke.py | 19 ++++++++++++++----- 3 files changed, 34 insertions(+), 13 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 3989646f7e2..def82a2ab77 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -107,8 +107,11 @@ pathlib.Path(sky.__file__).resolve().parent / 'backends' / 'monkey_patches' / 'monkey_patch_ray_up.py') -_SKYLET_RESTART_CMD = ( - 
'(pkill -f "python3 -m sky.skylet.skylet"; nohup python3 -m ' +SKYLET_VERSION = 1 # maybe move it to sky/skylet/constants.py +SKYLET_VERSION_FILE = '~/.sky/skylet_version' # maybe move it to sky/skylet/constants.py +# Restart skylet when the version does not match to keep the skylet up-to-date. +_MAYBE_SKYLET_RESTART_CMD = ( + f'[[ $(cat {SKYLET_VERSION_FILE}) = "{SKYLET_VERSION}" ]] || (pkill -f "python3 -m sky.skylet.skylet"; echo {SKYLET_VERSION} > {SKYLET_VERSION_FILE}; nohup python3 -m ' 'sky.skylet.skylet >> ~/.sky/skylet.log 2>&1 &);') @@ -321,7 +324,7 @@ def add_gang_scheduling_placement_group( print('INFO: All task resources reserved.', file=sys.stderr, flush=True) - job_lib.scheduler.schedule_step() + job_lib.scheduler.async_schedule_step() """) ] @@ -1650,10 +1653,12 @@ def _ensure_cluster_ray_started(self, handle: 'CloudVmRayResourceHandle', return backend = CloudVmRayBackend() + print(_MAYBE_SKYLET_RESTART_CMD) + # For backward compatability and robustness of skylet, it is restarted returncode = backend.run_on_head( handle, - f'{_SKYLET_RESTART_CMD}ray status', + f'{_MAYBE_SKYLET_RESTART_CMD}ray status', # At this state, an erroneous cluster may not have cached # handle.head_ip (global_user_state.add_or_update_cluster(..., # ready=True)). @@ -2609,7 +2614,9 @@ def _exec_code_on_head( stream_logs=False, require_outputs=True) - if 'has no attribute' in stdout: + if True or 'has no attribute' in stdout: + # Happens when someone calls `sky exec` but remote has not been updated + # necessicating calling `sky launch` logger.info( f'{colorama.Fore.RED}SkyPilot must be updated on remote,' f'use `sky launch` instead{colorama.Style.RESET_ALL}') diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index 122b6112d5a..858b0a27966 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -3,6 +3,7 @@ This is a remote utility module that provides job queue functionality. """ import enum +import multiprocessing import os import pathlib import psutil @@ -186,14 +187,18 @@ def schedule_step(self) -> None: self._run_job(job_id, run_cmd) return - def _get_jobs(self) -> List: + def async_schedule_step(self) -> None: + p = multiprocessing.Process(target=self.schedule_step) + p.start() + + def _get_jobs(self) -> List[str]: raise NotImplementedError class FIFOScheduler(JobScheduler): """First in first out job scheduler""" - def _get_jobs(self) -> List: + def _get_jobs(self) -> List[str]: return list( _CURSOR.execute('SELECT * FROM pending_jobs ORDER BY job_id')) @@ -689,7 +694,7 @@ def cancel_jobs(job_owner: str, jobs: Optional[List[int]]) -> None: job_id = make_ray_job_id(job['job_id'], job_owner) # Job is locked to ensure that pending queue does not start it while # it is being cancelled - with filelock.FileLock(_get_lock_path(job['job_id'],)): + with filelock.FileLock(_get_lock_path(job['job_id'])): try: # TODO(mraheja): remove pylint disabling when filelock # version updated diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 507811e9599..3cb4aec0914 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -836,14 +836,23 @@ def test_large_job_queue(generic_cloud: str): f'sky launch -y -c {name} --cloud {generic_cloud}', f'for i in `seq 1 75`; do sky exec {name} -n {name}-$i -d "echo $i; sleep 100000000"; done', f'sky cancel -y {name} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16', - 'sleep 90', + 'sleep 60', # Each job takes 0.5 CPU and the default VM has 8 CPUs, so there should be 8 / 0.5 = 16 jobs running. 
# The first 16 jobs are canceled, so there should be 75 - 32 = 43 jobs PENDING. f'sky queue {name} | grep -v grep | grep PENDING | wc -l | grep 43', - f'sky queue {name} | grep {name}-15 | grep CANCELLED', - f'sky queue {name} | grep {name}-32 | grep RUNNING', - f'sky queue {name} | grep {name}-33 | grep PENDING', - f'sky queue {name} | grep {name}-50 | grep PENDING', + # Make sure the jobs are scheduled in FIFO order + *[ + f'sky queue {name} | grep {name}-{i} | grep CANCELLED' + for i in range(17) + ], + *[ + f'sky queue {name} | grep {name}-{i} | grep RUNNING' + for i in range(17, 33) + ], + *[ + f'sky queue {name} | grep {name}-{i} | grep PENDING' + for i in range(33, 75) + ], ], f'sky down -y {name}', timeout=20 * 60, From 052a57979e00efc63255b80d9899cac21cbcdd52 Mon Sep 17 00:00:00 2001 From: mraheja Date: Fri, 17 Mar 2023 10:00:39 -0700 Subject: [PATCH 15/39] addressed comments --- sky/backends/cloud_vm_ray_backend.py | 21 ++++++--------------- sky/execution.py | 14 ++++++++++++++ sky/skylet/job_lib.py | 10 +++++----- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index def82a2ab77..5b6352d844d 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -107,13 +107,6 @@ pathlib.Path(sky.__file__).resolve().parent / 'backends' / 'monkey_patches' / 'monkey_patch_ray_up.py') -SKYLET_VERSION = 1 # maybe move it to sky/skylet/constants.py -SKYLET_VERSION_FILE = '~/.sky/skylet_version' # maybe move it to sky/skylet/constants.py -# Restart skylet when the version does not match to keep the skylet up-to-date. -_MAYBE_SKYLET_RESTART_CMD = ( - f'[[ $(cat {SKYLET_VERSION_FILE}) = "{SKYLET_VERSION}" ]] || (pkill -f "python3 -m sky.skylet.skylet"; echo {SKYLET_VERSION} > {SKYLET_VERSION_FILE}; nohup python3 -m ' - 'sky.skylet.skylet >> ~/.sky/skylet.log 2>&1 &);') - def _get_cluster_config_template(cloud): cloud_to_template = { @@ -1653,12 +1646,10 @@ def _ensure_cluster_ray_started(self, handle: 'CloudVmRayResourceHandle', return backend = CloudVmRayBackend() - print(_MAYBE_SKYLET_RESTART_CMD) - # For backward compatability and robustness of skylet, it is restarted returncode = backend.run_on_head( handle, - f'{_MAYBE_SKYLET_RESTART_CMD}ray status', + f'ray status', # At this state, an erroneous cluster may not have cached # handle.head_ip (global_user_state.add_or_update_cluster(..., # ready=True)). @@ -2614,13 +2605,13 @@ def _exec_code_on_head( stream_logs=False, require_outputs=True) - if True or 'has no attribute' in stdout: + if 'has no attribute' in stdout: # Happens when someone calls `sky exec` but remote has not been updated # necessicating calling `sky launch` - logger.info( - f'{colorama.Fore.RED}SkyPilot must be updated on remote,' - f'use `sky launch` instead{colorama.Style.RESET_ALL}') - return + with ux_utils.print_exception_no_traceback(): + raise RuntimeError( + f'{colorama.Fore.RED}SkyPilot must be updated on remote,' + f' use `sky launch` instead{colorama.Style.RESET_ALL}') subprocess_utils.handle_returncode(returncode, job_submit_cmd, diff --git a/sky/execution.py b/sky/execution.py index 9bc64d89ae3..b37867f2f85 100644 --- a/sky/execution.py +++ b/sky/execution.py @@ -66,6 +66,14 @@ sky.spot_launch(task, ...) 
""".strip() +SKYLET_VERSION = 1 # maybe move it to sky/skylet/constants.py +SKYLET_VERSION_FILE = '~/.sky/skylet_version' # maybe move it to sky/skylet/constants.py + +# Restart skylet when the version does not match to keep the skylet up-to-date. +_MAYBE_SKYLET_RESTART_CMD = ( + f'[[ $(cat {SKYLET_VERSION_FILE}) = "{SKYLET_VERSION}" ]] || (pkill -f "python3 -m sky.skylet.skylet"; echo {SKYLET_VERSION} > {SKYLET_VERSION_FILE}; nohup python3 -m ' + 'sky.skylet.skylet >> ~/.sky/skylet.log 2>&1 &);') + def _convert_to_dag(entrypoint: Any) -> 'sky.Dag': """Convert the entrypoint to a sky.Dag. @@ -259,6 +267,7 @@ def _execute( task.sync_storage_mounts() try: + if Stage.PROVISION in stages: if handle is None: handle = backend.provision(task, @@ -284,6 +293,11 @@ def _execute( logger.info('Setup commands skipped.') elif Stage.SETUP in stages: backend.setup(handle, task, detach_setup=detach_setup) + backend.run_on_head( + handle, + _MAYBE_SKYLET_RESTART_CMD, + use_cached_head_ip=False, + ) if Stage.PRE_EXEC in stages: if idle_minutes_to_autostop is not None: diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index 858b0a27966..4e88ea0c9f6 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -217,17 +217,17 @@ def _get_jobs(self) -> List[str]: } _RAY_TO_JOB_STATUS_MAP = { - # These are intentionally set to one status before, because: + # These are intentionally set this way, because: # 1. when the ray status indicates the job is PENDING the generated - # python program should not be started yet, i.e. the job should be INIT. + # python program has left the job queue and is now PENDING # 2. when the ray status indicates the job is RUNNING the job can be in # setup or resources may not be allocated yet, i.e. the job should be - # SETTING_UP. - # For case 2, update_job_status() would compare this mapped SETTING_UP to + # PENDING. + # For case 2, update_job_status() would compare this mapped PENDING to # the status in our jobs DB and take the max. This is because the job's # generated ray program is the only place that can determine a job has # reserved resources and actually started running: it will set the - # status in the DB to RUNNING. + # status in the DB to SETTING_UP or RUNNING. # If there is no setup specified in the task, as soon as it is started # (ray's status becomes RUNNING), i.e. it will be very rare that the job # will be set to SETTING_UP by the update_job_status, as our generated From 6c29f095d3753320cb3ae160863b81bb3d44f575 Mon Sep 17 00:00:00 2001 From: mraheja Date: Fri, 24 Mar 2023 09:42:51 -0700 Subject: [PATCH 16/39] formatting + minor comment addressing --- sky/backends/cloud_vm_ray_backend.py | 17 +++++++++++++++-- sky/execution.py | 13 ------------- sky/skylet/constants.py | 3 +++ sky/skylet/job_lib.py | 11 ++++++----- 4 files changed, 24 insertions(+), 20 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 5b6352d844d..4c382bd30dc 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -107,6 +107,13 @@ pathlib.Path(sky.__file__).resolve().parent / 'backends' / 'monkey_patches' / 'monkey_patch_ray_up.py') +# Restart skylet when the version does not match to keep the skylet up-to-date. 
+_MAYBE_SKYLET_RESTART_CMD = ( + f'[[ $(cat {constants.SKYLET_VERSION_FILE}) = "{constants.SKYLET_VERSION}"' + ' ]] || (pkill -f "python3 -m sky.skylet.skylet";' + f' echo {constants.SKYLET_VERSION} > {constants.SKYLET_VERSION_FILE};' + 'nohup python3 -m sky.skylet.skylet >> ~/.sky/skylet.log 2>&1 &);') + def _get_cluster_config_template(cloud): cloud_to_template = { @@ -1649,7 +1656,7 @@ def _ensure_cluster_ray_started(self, handle: 'CloudVmRayResourceHandle', # For backward compatability and robustness of skylet, it is restarted returncode = backend.run_on_head( handle, - f'ray status', + 'ray status', # At this state, an erroneous cluster may not have cached # handle.head_ip (global_user_state.add_or_update_cluster(..., # ready=True)). @@ -2332,6 +2339,12 @@ def _update_after_cluster_provisioned( usage_lib.messages.usage.update_final_cluster_status( global_user_state.ClusterStatus.UP) + self.run_on_head( + handle, + _MAYBE_SKYLET_RESTART_CMD, + use_cached_head_ip=False, + ) + # Update job queue to avoid stale jobs (when restarted), before # setting the cluster to be ready. if prev_cluster_status == global_user_state.ClusterStatus.INIT: @@ -2606,7 +2619,7 @@ def _exec_code_on_head( require_outputs=True) if 'has no attribute' in stdout: - # Happens when someone calls `sky exec` but remote has not been updated + # Happens when someone calls `sky exec` but remote is outdated # necessicating calling `sky launch` with ux_utils.print_exception_no_traceback(): raise RuntimeError( diff --git a/sky/execution.py b/sky/execution.py index b37867f2f85..7956234f2f3 100644 --- a/sky/execution.py +++ b/sky/execution.py @@ -66,14 +66,6 @@ sky.spot_launch(task, ...) """.strip() -SKYLET_VERSION = 1 # maybe move it to sky/skylet/constants.py -SKYLET_VERSION_FILE = '~/.sky/skylet_version' # maybe move it to sky/skylet/constants.py - -# Restart skylet when the version does not match to keep the skylet up-to-date. -_MAYBE_SKYLET_RESTART_CMD = ( - f'[[ $(cat {SKYLET_VERSION_FILE}) = "{SKYLET_VERSION}" ]] || (pkill -f "python3 -m sky.skylet.skylet"; echo {SKYLET_VERSION} > {SKYLET_VERSION_FILE}; nohup python3 -m ' - 'sky.skylet.skylet >> ~/.sky/skylet.log 2>&1 &);') - def _convert_to_dag(entrypoint: Any) -> 'sky.Dag': """Convert the entrypoint to a sky.Dag. 
@@ -293,11 +285,6 @@ def _execute( logger.info('Setup commands skipped.') elif Stage.SETUP in stages: backend.setup(handle, task, detach_setup=detach_setup) - backend.run_on_head( - handle, - _MAYBE_SKYLET_RESTART_CMD, - use_cached_head_ip=False, - ) if Stage.PRE_EXEC in stages: if idle_minutes_to_autostop is not None: diff --git a/sky/skylet/constants.py b/sky/skylet/constants.py index dad9465dc7b..58b3b51011c 100644 --- a/sky/skylet/constants.py +++ b/sky/skylet/constants.py @@ -10,3 +10,6 @@ 'command to initialize it locally: sky launch -c {cluster} \'\'') JOB_ID_ENV_VAR = 'SKYPILOT_JOB_ID' + +SKYLET_VERSION = 1 +SKYLET_VERSION_FILE = '~/.sky/skylet_version' diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index 4e88ea0c9f6..4135c51a47b 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -11,7 +11,7 @@ import subprocess import time import typing -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple import colorama import filelock @@ -156,7 +156,7 @@ def queue(self, job_id: int, cmd: str) -> None: set_status(job_id, JobStatus.PENDING) self.schedule_step() - def remove_job_no_lock(self, job_id: str) -> None: + def remove_job_no_lock(self, job_id: int) -> None: _CURSOR.execute(f'DELETE FROM pending_jobs WHERE job_id={job_id!r}') _CONN.commit() @@ -191,14 +191,14 @@ def async_schedule_step(self) -> None: p = multiprocessing.Process(target=self.schedule_step) p.start() - def _get_jobs(self) -> List[str]: + def _get_jobs(self) -> List[Tuple[int, str, int, int]]: raise NotImplementedError class FIFOScheduler(JobScheduler): """First in first out job scheduler""" - def _get_jobs(self) -> List[str]: + def _get_jobs(self) -> List[Tuple[int, str, int, int]]: return list( _CURSOR.execute('SELECT * FROM pending_jobs ORDER BY job_id')) @@ -219,7 +219,8 @@ def _get_jobs(self) -> List[str]: _RAY_TO_JOB_STATUS_MAP = { # These are intentionally set this way, because: # 1. when the ray status indicates the job is PENDING the generated - # python program has left the job queue and is now PENDING + # python program has been `ray job submit` from the job queue + # and is now PENDING # 2. when the ray status indicates the job is RUNNING the job can be in # setup or resources may not be allocated yet, i.e. the job should be # PENDING. From be36c3521fa154fe785cacd10458670031dbbbe0 Mon Sep 17 00:00:00 2001 From: mraheja Date: Fri, 24 Mar 2023 10:28:42 -0700 Subject: [PATCH 17/39] more comments addressed --- sky/backends/cloud_vm_ray_backend.py | 11 ++++++----- sky/skylet/job_lib.py | 5 +++++ 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 4c382bd30dc..88bc236b7c3 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2339,11 +2339,12 @@ def _update_after_cluster_provisioned( usage_lib.messages.usage.update_final_cluster_status( global_user_state.ClusterStatus.UP) - self.run_on_head( - handle, - _MAYBE_SKYLET_RESTART_CMD, - use_cached_head_ip=False, - ) + with backend_utils.safe_console_status('Updating remote skylet'): + self.run_on_head( + handle, + _MAYBE_SKYLET_RESTART_CMD, + use_cached_head_ip=False, + ) # Update job queue to avoid stale jobs (when restarted), before # setting the cluster to be ready. 
diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index 4135c51a47b..0154bad15d1 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -192,6 +192,8 @@ def async_schedule_step(self) -> None: p.start() def _get_jobs(self) -> List[Tuple[int, str, int, int]]: + """Returns the metadata for jobs the pending jobs table + with job_id, run command, submit time, creation time""" raise NotImplementedError @@ -695,6 +697,9 @@ def cancel_jobs(job_owner: str, jobs: Optional[List[int]]) -> None: job_id = make_ray_job_id(job['job_id'], job_owner) # Job is locked to ensure that pending queue does not start it while # it is being cancelled + # TODO(mraheja): remove pylint disabling when filelock + # version updated + # pylint: disable=abstract-class-instantiated with filelock.FileLock(_get_lock_path(job['job_id'])): try: # TODO(mraheja): remove pylint disabling when filelock From 21919c532321a790f6983a626b9f734b79ed6631 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Sun, 26 Mar 2023 22:28:09 +0000 Subject: [PATCH 18/39] Fix rich status --- sky/backends/cloud_vm_ray_backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 3aa4cd22afe..9e90b329eff 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2345,7 +2345,7 @@ def _update_after_cluster_provisioned( usage_lib.messages.usage.update_final_cluster_status( global_user_state.ClusterStatus.UP) - with backend_utils.safe_console_status('Updating remote skylet'): + with log_utils.safe_rich_status('Updating remote skylet'): self.run_on_head( handle, _MAYBE_SKYLET_RESTART_CMD, From 0b7141adf1395b9e0c3fe3af09eacc6e13a2079b Mon Sep 17 00:00:00 2001 From: mraheja Date: Tue, 11 Apr 2023 09:42:25 -0700 Subject: [PATCH 19/39] fixed stalling issue --- sky/backends/cloud_vm_ray_backend.py | 2 +- sky/skylet/job_lib.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 9e90b329eff..7444e2d13f5 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -330,7 +330,6 @@ def add_gang_scheduling_placement_group( print('INFO: All task resources reserved.', file=sys.stderr, flush=True) - job_lib.scheduler.async_schedule_step() """) ] @@ -378,6 +377,7 @@ def add_gang_scheduling_placement_group( self._code += [ textwrap.dedent(f"""\ job_lib.set_job_started({self.job_id!r}) + job_lib.scheduler.schedule_step() """), ] diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index 0154bad15d1..778f8c3b4e5 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -188,6 +188,7 @@ def schedule_step(self) -> None: return def async_schedule_step(self) -> None: + #TODO(mraheja): Integrate into mid-job scheduling p = multiprocessing.Process(target=self.schedule_step) p.start() @@ -715,7 +716,9 @@ def cancel_jobs(job_owner: str, jobs: Optional[List[int]]) -> None: if job['status'] in [ JobStatus.SETTING_UP, JobStatus.PENDING, JobStatus.RUNNING ]: - set_status(job['job_id'], JobStatus.CANCELLED) + _set_status_no_lock(job['job_id'], JobStatus.CANCELLED) + + scheduler.schedule_step() def get_run_timestamp(job_id: Optional[int]) -> Optional[str]: From 356c04ea0cddf4be1614d553240e68855a1c51b7 Mon Sep 17 00:00:00 2001 From: mraheja Date: Tue, 11 Apr 2023 09:45:13 -0700 Subject: [PATCH 20/39] format --- sky/skylet/job_lib.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff 
--git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index e88909e3672..da833e1ae23 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -717,7 +717,7 @@ def cancel_jobs(job_owner: str, jobs: Optional[List[int]]) -> None: JobStatus.SETTING_UP, JobStatus.PENDING, JobStatus.RUNNING ]: _set_status_no_lock(job['job_id'], JobStatus.CANCELLED) - + scheduler.schedule_step() From 3223b74c7b3c50b82c9b9dd158659081b1a84721 Mon Sep 17 00:00:00 2001 From: mraheja Date: Tue, 11 Apr 2023 10:00:02 -0700 Subject: [PATCH 21/39] small test bug --- tests/test_smoke.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 17fa10aef84..723ce4aebcb 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -836,7 +836,7 @@ def test_large_job_queue(generic_cloud: str): # Make sure the jobs are scheduled in FIFO order *[ f'sky queue {name} | grep {name}-{i} | grep CANCELLED' - for i in range(17) + for i in range(1, 17) ], *[ f'sky queue {name} | grep {name}-{i} | grep RUNNING' From b6a832543fcc11c34a7259dd2a41b3a91323d962 Mon Sep 17 00:00:00 2001 From: mraheja Date: Fri, 28 Apr 2023 09:05:29 -0700 Subject: [PATCH 22/39] fixed skylet launch issue --- sky/backends/cloud_vm_ray_backend.py | 6 +----- sky/skylet/constants.py | 4 ++-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 8385f94c28b..60f1a00cb81 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -114,11 +114,7 @@ 'monkey_patches' / 'monkey_patch_ray_up.py') # Restart skylet when the version does not match to keep the skylet up-to-date. -_MAYBE_SKYLET_RESTART_CMD = ( - f'[[ $(cat {constants.SKYLET_VERSION_FILE}) = "{constants.SKYLET_VERSION}"' - ' ]] || (pkill -f "python3 -m sky.skylet.skylet";' - f' echo {constants.SKYLET_VERSION} > {constants.SKYLET_VERSION_FILE};' - 'nohup python3 -m sky.skylet.skylet >> ~/.sky/skylet.log 2>&1 &);') +_MAYBE_SKYLET_RESTART_CMD = 'python3 -m sky.skylet.attempt_skylet' def _get_cluster_config_template(cloud): diff --git a/sky/skylet/constants.py b/sky/skylet/constants.py index 58b3b51011c..f2fda4f8076 100644 --- a/sky/skylet/constants.py +++ b/sky/skylet/constants.py @@ -11,5 +11,5 @@ JOB_ID_ENV_VAR = 'SKYPILOT_JOB_ID' -SKYLET_VERSION = 1 -SKYLET_VERSION_FILE = '~/.sky/skylet_version' +SKYLET_VERSION = '1' +SKYLET_VERSION_FILE = '.sky/skylet_version' From 47ccba0451c58cf6d4b446999cb1ceee52006599 Mon Sep 17 00:00:00 2001 From: mraheja Date: Fri, 28 Apr 2023 09:08:34 -0700 Subject: [PATCH 23/39] formatting + forgetten file --- sky/skylet/attempt_skylet.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 sky/skylet/attempt_skylet.py diff --git a/sky/skylet/attempt_skylet.py b/sky/skylet/attempt_skylet.py new file mode 100644 index 00000000000..204f1bbfa99 --- /dev/null +++ b/sky/skylet/attempt_skylet.py @@ -0,0 +1,22 @@ +"""Restarts skylet if version does not match""" + +import os +import subprocess +import sys + +from sky.skylet import constants + +if os.path.exists(constants.SKYLET_VERSION_FILE): + with open(constants.SKYLET_VERSION_FILE) as f: + if f.read() == constants.SKYLET_VERSION: + sys.exit(0) + +with open(constants.SKYLET_VERSION_FILE, 'w+') as f: + f.write(constants.SKYLET_VERSION) + +subprocess.run( + 'pkill -f "python3 -m sky.skylet.skylet"', + shell = True, check=True) +subprocess.run( + 'nohup python3 -m sky.skylet.skylet >> ~/.sky/skylet.log 2>&1 &', + shell=True, 
check=True) From 3dc81c4ea5ee4b1432bc8ae399dfd697b4ed521c Mon Sep 17 00:00:00 2001 From: mraheja Date: Fri, 28 Apr 2023 09:45:57 -0700 Subject: [PATCH 24/39] more formatting + remove check --- sky/skylet/attempt_skylet.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/sky/skylet/attempt_skylet.py b/sky/skylet/attempt_skylet.py index 204f1bbfa99..76fd1498bfc 100644 --- a/sky/skylet/attempt_skylet.py +++ b/sky/skylet/attempt_skylet.py @@ -14,9 +14,10 @@ with open(constants.SKYLET_VERSION_FILE, 'w+') as f: f.write(constants.SKYLET_VERSION) -subprocess.run( - 'pkill -f "python3 -m sky.skylet.skylet"', - shell = True, check=True) -subprocess.run( - 'nohup python3 -m sky.skylet.skylet >> ~/.sky/skylet.log 2>&1 &', - shell=True, check=True) +# Does not check if failed because skylet may not exist + +# pylint: disable=subprocess-run-check +subprocess.run('pkill -f "python3 -m sky.skylet.skylet"', shell=True) +# pylint: disable=subprocess-run-check +subprocess.run('nohup python3 -m sky.skylet.skylet >> ~/.sky/skylet.log 2>&1 &', + shell=True) From 3876881b46f73df6caa991471585b46ca4a3081f Mon Sep 17 00:00:00 2001 From: mraheja Date: Sat, 20 May 2023 00:33:16 -0700 Subject: [PATCH 25/39] addressed comments and removed pkill usage --- sky/backends/cloud_vm_ray_backend.py | 2 +- sky/skylet/attempt_skylet.py | 47 +++++++++++++++++++++------- sky/skylet/job_lib.py | 18 +++++------ tests/test_smoke.py | 2 +- 4 files changed, 45 insertions(+), 24 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index f3286b4a1fb..fbb09dc1f2c 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -325,6 +325,7 @@ def add_gang_scheduling_placement_group( # it is waiting for other task to finish. We should hide the # error message. 
ray.get(pg.ready()) + job_lib.scheduler.schedule_step() print('INFO: All task resources reserved.', file=sys.stderr, flush=True) @@ -375,7 +376,6 @@ def add_gang_scheduling_placement_group( self._code += [ textwrap.dedent(f"""\ job_lib.set_job_started({self.job_id!r}) - job_lib.scheduler.schedule_step() """), ] diff --git a/sky/skylet/attempt_skylet.py b/sky/skylet/attempt_skylet.py index 76fd1498bfc..5f6b57e274b 100644 --- a/sky/skylet/attempt_skylet.py +++ b/sky/skylet/attempt_skylet.py @@ -2,22 +2,45 @@ import os import subprocess -import sys from sky.skylet import constants + +def restart_skylet(kill_old=False): + with open(constants.SKYLET_VERSION_FILE, 'w+') as v_f: + v_f.write(constants.SKYLET_VERSION) + + if kill_old: + #pylint: disable=subprocess-run-check + subprocess.run( + 'ps aux | grep "sky.skylet.skylet" | grep "python3 -m"' + '| awk \'{print $2}\' | xargs kill', + shell=True, + ) + subprocess.run( + 'nohup python3 -m sky.skylet.skylet' + ' >> ~/.sky/skylet.log 2>&1 &', + shell=True, + check=True) + + +proc = subprocess.run( + 'ps aux | grep "sky.skylet.skylet" | grep "python3 -m"' + '| grep -v "grep"', + shell=True, + capture_output=True, + check=True) + +running = proc.stdout.decode().strip().replace('\n', '') + +if not running: + restart_skylet(kill_old=False) + +version_match = False if os.path.exists(constants.SKYLET_VERSION_FILE): with open(constants.SKYLET_VERSION_FILE) as f: if f.read() == constants.SKYLET_VERSION: - sys.exit(0) - -with open(constants.SKYLET_VERSION_FILE, 'w+') as f: - f.write(constants.SKYLET_VERSION) - -# Does not check if failed because skylet may not exist + version_match = True -# pylint: disable=subprocess-run-check -subprocess.run('pkill -f "python3 -m sky.skylet.skylet"', shell=True) -# pylint: disable=subprocess-run-check -subprocess.run('nohup python3 -m sky.skylet.skylet >> ~/.sky/skylet.log 2>&1 &', - shell=True) +if not version_match: + restart_skylet(kill_old=True) diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index da833e1ae23..699cf88fdb0 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -140,7 +140,6 @@ def colored_str(self): # to avoid race condition with `ray job` to make sure it job has been # correctly updated. # TODO(zhwu): This number should be tuned based on heuristics. -_SUBMITTED_GAP_SECONDS = 60 _PENDING_SUBMIT_TIMEOUT = 5 _PRE_RESOURCE_STATUSES = [JobStatus.PENDING] @@ -170,7 +169,7 @@ def schedule_step(self) -> None: job_owner = getpass.getuser() jobs = self._get_jobs() if len(jobs) > 0: - update_status(job_owner, _SUBMITTED_GAP_SECONDS) + update_status(job_owner) for job_id, run_cmd, submit, _ in jobs: # TODO(mraheja): remove pylint disabling when filelock # version updated @@ -423,9 +422,9 @@ def _get_records_from_rows(rows) -> List[Dict[str, Any]]: return records -def _get_jobs(username: Optional[str], - status_list: Optional[List[JobStatus]] = None, - submitted_gap_sec: int = 0) -> List[Dict[str, Any]]: +def _get_jobs( + username: Optional[str], + status_list: Optional[List[JobStatus]] = None) -> List[Dict[str, Any]]: if status_list is None: status_list = list(JobStatus) status_str_list = [status.value for status in status_list] @@ -436,7 +435,7 @@ def _get_jobs(username: Optional[str], WHERE status IN ({','.join(['?'] * len(status_list))}) AND submitted_at <= (?) 
ORDER BY job_id DESC""", - (*status_str_list, time.time() - submitted_gap_sec), + (*status_str_list, time.time()), ) else: rows = _CURSOR.execute( @@ -445,7 +444,7 @@ def _get_jobs(username: Optional[str], WHERE status IN ({','.join(['?'] * len(status_list))}) AND username=(?) AND submitted_at <= (?) ORDER BY job_id DESC""", - (*status_str_list, username, time.time() - submitted_gap_sec), + (*status_str_list, username, time.time()), ) records = _get_records_from_rows(rows) @@ -584,7 +583,7 @@ def fail_all_jobs_in_progress() -> None: _CONN.commit() -def update_status(job_owner: str, submitted_gap_sec: int = 0) -> None: +def update_status(job_owner: str) -> None: # This will be called periodically by the skylet to update the status # of the jobs in the database, to avoid stale job status. # NOTE: there might be a INIT job in the database set to FAILED by this @@ -592,8 +591,7 @@ def update_status(job_owner: str, submitted_gap_sec: int = 0) -> None: # not submitted yet. It will be then reset to PENDING / RUNNING when the # app starts. nonterminal_jobs = _get_jobs(username=None, - status_list=JobStatus.nonterminal_statuses(), - submitted_gap_sec=submitted_gap_sec) + status_list=JobStatus.nonterminal_statuses()) nonterminal_job_ids = [job['job_id'] for job in nonterminal_jobs] update_job_status(job_owner, nonterminal_job_ids) diff --git a/tests/test_smoke.py b/tests/test_smoke.py index bffdc314d17..d5d73b0f2cc 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -592,7 +592,7 @@ def test_aws_stale_job_manual_restart(): f'sky logs {name} 1 --status', f'sky logs {name} 3 --status', # Ensure the skylet updated the stale job status. - f'sleep {events.JobSchedulereEvent.EVENT_INTERVAL_SECONDS}', + f'sleep {events.JobSchedulerEvent.EVENT_INTERVAL_SECONDS}', f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep FAILED', ], f'sky down -y {name}', From 4a7ef2541e67c58d8947a358a258f76f3174424c Mon Sep 17 00:00:00 2001 From: mraheja Date: Sat, 20 May 2023 00:59:21 -0700 Subject: [PATCH 26/39] schedule after setup too --- sky/backends/cloud_vm_ray_backend.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index fbb09dc1f2c..7b2d4d0c80d 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -375,6 +375,7 @@ def add_gang_scheduling_placement_group( self._code += [ textwrap.dedent(f"""\ + job_lib.scheduler.schedule_step() job_lib.set_job_started({self.job_id!r}) """), ] From 4ac4335622b41b81ea4b29f0137c743b7f0aed7e Mon Sep 17 00:00:00 2001 From: Mehul Raheja Date: Wed, 31 May 2023 18:45:41 -0700 Subject: [PATCH 27/39] Update sky/backends/cloud_vm_ray_backend.py Co-authored-by: Zhanghao Wu --- sky/backends/cloud_vm_ray_backend.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 7b2d4d0c80d..fb51ee1a0d8 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2686,8 +2686,8 @@ def _exec_code_on_head( # necessicating calling `sky launch` with ux_utils.print_exception_no_traceback(): raise RuntimeError( - f'{colorama.Fore.RED}SkyPilot must be updated on remote,' - f' use `sky launch` instead{colorama.Style.RESET_ALL}') + f'{colorama.Fore.RED}SkyPilot runtime is stale on the remote cluster. 
To update ' + f'run: sky launch -c {handle.cluster_name}{colorama.Style.RESET_ALL}') subprocess_utils.handle_returncode(returncode, job_submit_cmd, From f1556e6d3d744efa83f22ffb58f3ac469cec48fa Mon Sep 17 00:00:00 2001 From: mraheja Date: Wed, 31 May 2023 19:21:01 -0700 Subject: [PATCH 28/39] addressed more comments --- sky/backends/cloud_vm_ray_backend.py | 5 ++-- sky/skylet/attempt_skylet.py | 35 +++++++++++++--------------- sky/skylet/constants.py | 2 +- sky/skylet/job_lib.py | 7 +++--- 4 files changed, 22 insertions(+), 27 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 7b2d4d0c80d..5213fcf4037 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -325,7 +325,6 @@ def add_gang_scheduling_placement_group( # it is waiting for other task to finish. We should hide the # error message. ray.get(pg.ready()) - job_lib.scheduler.schedule_step() print('INFO: All task resources reserved.', file=sys.stderr, flush=True) @@ -343,6 +342,7 @@ def add_gang_scheduling_placement_group( # We unset it so that user setup command may properly use this env var. setup_cmd = 'unset CUDA_VISIBLE_DEVICES; ' + setup_cmd job_lib.set_status({job_id!r}, job_lib.JobStatus.SETTING_UP) + job_lib.scheduler.schedule_step() total_num_nodes = len(ray.nodes()) setup_bundles = [{{"CPU": _SETUP_CPUS}} for _ in range(total_num_nodes)] setup_pg = ray.util.placement_group(setup_bundles, strategy='STRICT_SPREAD') @@ -375,7 +375,6 @@ def add_gang_scheduling_placement_group( self._code += [ textwrap.dedent(f"""\ - job_lib.scheduler.schedule_step() job_lib.set_job_started({self.job_id!r}) """), ] @@ -1714,7 +1713,6 @@ def _ensure_cluster_ray_started(self, handle: 'CloudVmRayResourceHandle', return backend = CloudVmRayBackend() - # For backward compatability and robustness of skylet, it is restarted returncode = backend.run_on_head( handle, 'ray status', @@ -2400,6 +2398,7 @@ def _update_after_cluster_provisioned( usage_lib.messages.usage.update_final_cluster_status( global_user_state.ClusterStatus.UP) + # For backward compatability and robustness of skylet, it is restarted with log_utils.safe_rich_status('Updating remote skylet'): self.run_on_head( handle, diff --git a/sky/skylet/attempt_skylet.py b/sky/skylet/attempt_skylet.py index 5f6b57e274b..16cad1eef93 100644 --- a/sky/skylet/attempt_skylet.py +++ b/sky/skylet/attempt_skylet.py @@ -5,42 +5,39 @@ from sky.skylet import constants +VERSION_FILE = os.path.expanduser(constants.SKYLET_VERSION_FILE) -def restart_skylet(kill_old=False): - with open(constants.SKYLET_VERSION_FILE, 'w+') as v_f: +def restart_skylet(): + with open(VERSION_FILE, 'w+') as v_f: v_f.write(constants.SKYLET_VERSION) - if kill_old: - #pylint: disable=subprocess-run-check - subprocess.run( - 'ps aux | grep "sky.skylet.skylet" | grep "python3 -m"' - '| awk \'{print $2}\' | xargs kill', - shell=True, - ) + # Kills old skylet if it is running + subprocess.run( + 'ps aux | grep "sky.skylet.skylet" | grep "python3 -m"' + '| awk \'{print $2}\' | xargs kill', + shell=True, + check=False + ) subprocess.run( 'nohup python3 -m sky.skylet.skylet' ' >> ~/.sky/skylet.log 2>&1 &', shell=True, check=True) - - + proc = subprocess.run( 'ps aux | grep "sky.skylet.skylet" | grep "python3 -m"' - '| grep -v "grep"', + '| grep -v "grep" || true', shell=True, capture_output=True, check=True) running = proc.stdout.decode().strip().replace('\n', '') -if not running: - restart_skylet(kill_old=False) - version_match = False -if 
os.path.exists(constants.SKYLET_VERSION_FILE): - with open(constants.SKYLET_VERSION_FILE) as f: +if os.path.exists(VERSION_FILE): + with open(VERSION_FILE) as f: if f.read() == constants.SKYLET_VERSION: version_match = True -if not version_match: - restart_skylet(kill_old=True) +if not running or not version_match: + restart_skylet() diff --git a/sky/skylet/constants.py b/sky/skylet/constants.py index f2fda4f8076..b92707bc92a 100644 --- a/sky/skylet/constants.py +++ b/sky/skylet/constants.py @@ -12,4 +12,4 @@ JOB_ID_ENV_VAR = 'SKYPILOT_JOB_ID' SKYLET_VERSION = '1' -SKYLET_VERSION_FILE = '.sky/skylet_version' +SKYLET_VERSION_FILE = '~/.sky/skylet_version' diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index 699cf88fdb0..9d207af53b1 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -433,18 +433,17 @@ def _get_jobs( f"""\ SELECT * FROM jobs WHERE status IN ({','.join(['?'] * len(status_list))}) - AND submitted_at <= (?) ORDER BY job_id DESC""", - (*status_str_list, time.time()), + (*status_str_list,), ) else: rows = _CURSOR.execute( f"""\ SELECT * FROM jobs WHERE status IN ({','.join(['?'] * len(status_list))}) - AND username=(?) AND submitted_at <= (?) + AND username=(?) ORDER BY job_id DESC""", - (*status_str_list, username, time.time()), + (*status_str_list, username), ) records = _get_records_from_rows(rows) From d305d26f5ed6f7f3a566f76907f15732ede488c8 Mon Sep 17 00:00:00 2001 From: mraheja Date: Wed, 31 May 2023 22:23:56 -0700 Subject: [PATCH 29/39] formatting --- sky/backends/cloud_vm_ray_backend.py | 5 +++-- sky/skylet/attempt_skylet.py | 7 ++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 9129380224f..9e34535f28d 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2750,8 +2750,9 @@ def _exec_code_on_head( # necessicating calling `sky launch` with ux_utils.print_exception_no_traceback(): raise RuntimeError( - f'{colorama.Fore.RED}SkyPilot runtime is stale on the remote cluster. To update ' - f'run: sky launch -c {handle.cluster_name}{colorama.Style.RESET_ALL}') + f'{colorama.Fore.RED}SkyPilot runtime is stale on the ' + 'remote cluster. 
To update run: sky launch -c ' + f'{handle.cluster_name}{colorama.Style.RESET_ALL}') subprocess_utils.handle_returncode(returncode, job_submit_cmd, diff --git a/sky/skylet/attempt_skylet.py b/sky/skylet/attempt_skylet.py index 16cad1eef93..a8a40b35603 100644 --- a/sky/skylet/attempt_skylet.py +++ b/sky/skylet/attempt_skylet.py @@ -7,6 +7,7 @@ VERSION_FILE = os.path.expanduser(constants.SKYLET_VERSION_FILE) + def restart_skylet(): with open(VERSION_FILE, 'w+') as v_f: v_f.write(constants.SKYLET_VERSION) @@ -16,14 +17,14 @@ def restart_skylet(): 'ps aux | grep "sky.skylet.skylet" | grep "python3 -m"' '| awk \'{print $2}\' | xargs kill', shell=True, - check=False - ) + check=False) subprocess.run( 'nohup python3 -m sky.skylet.skylet' ' >> ~/.sky/skylet.log 2>&1 &', shell=True, check=True) - + + proc = subprocess.run( 'ps aux | grep "sky.skylet.skylet" | grep "python3 -m"' '| grep -v "grep" || true', From cb0d66fe0804895f2ea1686d91c24c60a098be48 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 5 Jun 2023 18:09:01 -0700 Subject: [PATCH 30/39] Address comments --- sky/backends/cloud_vm_ray_backend.py | 31 +++++++++++++++++----------- sky/skylet/attempt_skylet.py | 17 +++++++-------- sky/skylet/job_lib.py | 13 +++++------- 3 files changed, 31 insertions(+), 30 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 5ca2d84d69a..d51d0fc238e 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -282,7 +282,7 @@ def add_prologue(self, f'{job_id}, {spot_task.name!r}, {resources_str!r})', ] - def add_gang_scheduling_placement_group( + def add_gang_scheduling_placement_group_and_setup( self, num_nodes: int, accelerator_dict: Optional[Dict[str, float]], @@ -297,8 +297,9 @@ def add_gang_scheduling_placement_group( variable is assigned in a deterministic order whenever a new task is added. """ - assert self._has_prologue, ('Call add_prologue() before ' - 'add_gang_scheduling_placement_group().') + assert self._has_prologue, ( + 'Call add_prologue() before ' + 'add_gang_scheduling_placement_group_and_setup().') self._has_gang_scheduling = True self._num_nodes = num_nodes @@ -357,7 +358,12 @@ def add_gang_scheduling_placement_group( # We unset it so that user setup command may properly use this env var. setup_cmd = 'unset CUDA_VISIBLE_DEVICES; ' + setup_cmd job_lib.set_status({job_id!r}, job_lib.JobStatus.SETTING_UP) + + # The schedule_step should be called after the job status is set to non-PENDING, + # otherwise, the scheduler will think the current job is not submitted yet, and + # skip the scheduling step. job_lib.scheduler.schedule_step() + total_num_nodes = len(ray.nodes()) setup_bundles = [{{"CPU": _SETUP_CPUS}} for _ in range(total_num_nodes)] setup_pg = ray.util.placement_group(setup_bundles, strategy='STRICT_SPREAD') @@ -388,11 +394,11 @@ def add_gang_scheduling_placement_group( """) ] - self._code += [ - textwrap.dedent(f"""\ - job_lib.set_job_started({self.job_id!r}) - """), - ] + self._code.append(f'job_lib.set_job_started({self.job_id!r})') + if setup_cmd is None: + # Need to call schedule_step() to make sure the scheduler + # schedule the next pending job. + self._code.append('job_lib.scheduler.schedule_step()') # Export IP and node rank to the environment variables. self._code += [ @@ -422,7 +428,7 @@ def register_run_fn(self, run_fn: str, run_fn_name: str) -> None: run_fn: The run function to be run on the remote cluster. 
""" assert self._has_gang_scheduling, ( - 'Call add_gang_scheduling_placement_group() ' + 'Call add_gang_scheduling_placement_group_and_setup() ' 'before register_run_fn().') assert not self._has_register_run_fn, ( 'register_run_fn() called twice?') @@ -444,7 +450,8 @@ def add_ray_task(self, use_sudo: bool = False) -> None: """Generates code for a ray remote task that runs a bash command.""" assert self._has_gang_scheduling, ( - 'Call add_gang_scheduling_placement_group() before add_ray_task().') + 'Call add_gang_scheduling_placement_group_and_setup() before ' + 'add_ray_task().') assert (not self._has_register_run_fn or bash_script is None), ('bash_script should ' 'be None when run_fn is registered.') @@ -3839,7 +3846,7 @@ def _execute_task_one_node(self, handle: CloudVmRayResourceHandle, codegen.add_prologue(job_id, spot_task=task.spot_task, is_local=is_local) - codegen.add_gang_scheduling_placement_group( + codegen.add_gang_scheduling_placement_group_and_setup( 1, accelerator_dict, stable_cluster_internal_ips=internal_ips, @@ -3907,7 +3914,7 @@ def _execute_task_n_nodes(self, handle: CloudVmRayResourceHandle, codegen.add_prologue(job_id, spot_task=task.spot_task, is_local=is_local) - codegen.add_gang_scheduling_placement_group( + codegen.add_gang_scheduling_placement_group_and_setup( num_actual_nodes, accelerator_dict, stable_cluster_internal_ips=internal_ips, diff --git a/sky/skylet/attempt_skylet.py b/sky/skylet/attempt_skylet.py index a8a40b35603..b526d0642f6 100644 --- a/sky/skylet/attempt_skylet.py +++ b/sky/skylet/attempt_skylet.py @@ -9,13 +9,10 @@ def restart_skylet(): - with open(VERSION_FILE, 'w+') as v_f: - v_f.write(constants.SKYLET_VERSION) - # Kills old skylet if it is running subprocess.run( 'ps aux | grep "sky.skylet.skylet" | grep "python3 -m"' - '| awk \'{print $2}\' | xargs kill', + '| awk \'{print $2}\' | xargs kill >> ~/.sky/skylet.log 2>&1', shell=True, check=False) subprocess.run( @@ -23,21 +20,21 @@ def restart_skylet(): ' >> ~/.sky/skylet.log 2>&1 &', shell=True, check=True) + with open(VERSION_FILE, 'w') as v_f: + v_f.write(constants.SKYLET_VERSION) proc = subprocess.run( - 'ps aux | grep "sky.skylet.skylet" | grep "python3 -m"' - '| grep -v "grep" || true', + 'ps aux | grep -v "grep" | grep "sky.skylet.skylet" | grep "python3 -m"', shell=True, - capture_output=True, - check=True) + check=False) -running = proc.stdout.decode().strip().replace('\n', '') +running = (proc.returncode == 0) version_match = False if os.path.exists(VERSION_FILE): with open(VERSION_FILE) as f: - if f.read() == constants.SKYLET_VERSION: + if f.read().strip() == constants.SKYLET_VERSION: version_match = True if not running or not version_match: diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index e246dd4f0d9..b043453c925 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -4,7 +4,6 @@ """ import enum import json -import multiprocessing import os import pathlib import psutil @@ -187,14 +186,12 @@ def schedule_step(self) -> None: self._run_job(job_id, run_cmd) return - def async_schedule_step(self) -> None: - #TODO(mraheja): Integrate into mid-job scheduling - p = multiprocessing.Process(target=self.schedule_step) - p.start() - def _get_jobs(self) -> List[Tuple[int, str, int, int]]: - """Returns the metadata for jobs the pending jobs table - with job_id, run command, submit time, creation time""" + """Returns the metadata for jobs in the pending jobs table + + The information contains job_id, run command, submit time, + creation time. 
+ """ raise NotImplementedError From 8b286371978e709b28e9d16f6836cc771568c813 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 5 Jun 2023 19:10:38 -0700 Subject: [PATCH 31/39] faster job scheduling --- sky/backends/cloud_vm_ray_backend.py | 6 ++++-- sky/skylet/job_lib.py | 6 +++--- tests/test_smoke.py | 20 +++++++++++++++++++- 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index d51d0fc238e..69b499ea37a 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -564,7 +564,8 @@ def add_epilogue(self) -> None: if sum(returncodes) != 0: job_lib.set_status({self.job_id!r}, job_lib.JobStatus.FAILED) # This waits for all streaming logs to finish. - time.sleep(1) + job_lib.scheduler.schedule_step() + time.sleep(0.5) print('ERROR: {colorama.Fore.RED}Job {self.job_id} failed with ' 'return code list:{colorama.Style.RESET_ALL}', returncodes, @@ -577,7 +578,8 @@ def add_epilogue(self) -> None: sys.stderr.flush() job_lib.set_status({self.job_id!r}, job_lib.JobStatus.SUCCEEDED) # This waits for all streaming logs to finish. - time.sleep(1) + job_lib.scheduler.schedule_step() + time.sleep(0.5) """) ] diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index b043453c925..42bafa7dc8a 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -170,10 +170,10 @@ def schedule_step(self) -> None: jobs = self._get_jobs() if len(jobs) > 0: update_status(job_owner) + # TODO(zhwu, mraheja): One optimization can be allowing more than one + # job staying in the pending state after ray job submit, so that to be + # faster to schedule a large amount of jobs. for job_id, run_cmd, submit, _ in jobs: - # TODO(mraheja): remove pylint disabling when filelock - # version updated - # pylint: disable=abstract-class-instantiated with filelock.FileLock(_get_lock_path(job_id)): status = get_status_no_lock(job_id) if status not in _PRE_RESOURCE_STATUSES: diff --git a/tests/test_smoke.py b/tests/test_smoke.py index c87240d1c10..81a5d20c4b4 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -961,7 +961,7 @@ def test_large_job_queue(generic_cloud: str): f'sky launch -y -c {name} --cloud {generic_cloud}', f'for i in `seq 1 75`; do sky exec {name} -n {name}-$i -d "echo $i; sleep 100000000"; done', f'sky cancel -y {name} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16', - 'sleep 60', + 'sleep 70', # Each job takes 0.5 CPU and the default VM has 8 CPUs, so there should be 8 / 0.5 = 16 jobs running. # The first 16 jobs are canceled, so there should be 75 - 32 = 43 jobs PENDING. f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep -v grep | grep PENDING | wc -l | grep 43', @@ -984,6 +984,24 @@ def test_large_job_queue(generic_cloud: str): ) run_one_test(test) +@pytest.mark.no_lambda_cloud # No Lambda Cloud VM has 8 CPUs +def test_fast_large_job_queue(generic_cloud: str): + # This is to test the jobs can be scheduled quickly when there are many jobs in the queue. 
+ name = _get_cluster_name() + test = Test( + 'fast_large_job_queue', + [ + f'sky launch -y -c {name} --cloud {generic_cloud}', + f'for i in `seq 1 32`; do sky exec {name} -n {name}-$i -d "echo $i"; done', + 'sleep 60', + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep -v grep | grep SUCCEEDED | wc -l | grep 32', + ], + f'sky down -y {name}', + timeout=20 * 60, + ) + run_one_test(test) + + @pytest.mark.ibm def test_ibm_job_queue_multinode(): From c5881b7930d1b0e8422d6a7671f7c9c4207771bf Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 5 Jun 2023 19:27:54 -0700 Subject: [PATCH 32/39] format --- tests/test_smoke.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 81a5d20c4b4..ce57cb0f4ba 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -984,6 +984,7 @@ def test_large_job_queue(generic_cloud: str): ) run_one_test(test) + @pytest.mark.no_lambda_cloud # No Lambda Cloud VM has 8 CPUs def test_fast_large_job_queue(generic_cloud: str): # This is to test the jobs can be scheduled quickly when there are many jobs in the queue. @@ -1002,7 +1003,6 @@ def test_fast_large_job_queue(generic_cloud: str): run_one_test(test) - @pytest.mark.ibm def test_ibm_job_queue_multinode(): name = _get_cluster_name() From cc7b7fd37dcf33ac7633b01217bbfe57544044a6 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 5 Jun 2023 21:47:29 -0700 Subject: [PATCH 33/39] Fix cancellation logic --- sky/skylet/job_lib.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index 42bafa7dc8a..fab3b41e2f6 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -721,20 +721,15 @@ def cancel_jobs(job_owner: str, jobs: Optional[List[int]]) -> None: job_id = make_ray_job_id(job['job_id'], job_owner) # Job is locked to ensure that pending queue does not start it while # it is being cancelled - # TODO(mraheja): remove pylint disabling when filelock - # version updated - # pylint: disable=abstract-class-instantiated with filelock.FileLock(_get_lock_path(job['job_id'])): try: - # TODO(mraheja): remove pylint disabling when filelock - # version updated - # pylint: disable=abstract-class-instantiated job_client.stop_job(job_id) except RuntimeError as e: - # If the job does not exist or if the request to the - # job server fails. - logger.warning(str(e)) - continue + # If the request to the job server fails, we should not + # set the job to CANCELLED. + if 'does not exist' not in str(e): + logger.warning(str(e)) + continue if job['status'] in [ JobStatus.SETTING_UP, JobStatus.PENDING, JobStatus.RUNNING From 4e4503046ef8ea1ceef194f22e956100d2e5b651 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 5 Jun 2023 22:01:05 -0700 Subject: [PATCH 34/39] Don't schedule a job after reboot --- sky/skylet/job_lib.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index fab3b41e2f6..a05c0e873c8 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -173,11 +173,13 @@ def schedule_step(self) -> None: # TODO(zhwu, mraheja): One optimization can be allowing more than one # job staying in the pending state after ray job submit, so that to be # faster to schedule a large amount of jobs. 
- for job_id, run_cmd, submit, _ in jobs: + for job_id, run_cmd, submit, created_time in jobs: with filelock.FileLock(_get_lock_path(job_id)): status = get_status_no_lock(job_id) - if status not in _PRE_RESOURCE_STATUSES: - # Job doesn't exist or is running/cancelled + if (status not in _PRE_RESOURCE_STATUSES or + created_time < psutil.boot_time()): + # Job doesn't exist, is running/cancelled, or created + # before the last reboot. self.remove_job_no_lock(job_id) continue if submit: @@ -546,7 +548,7 @@ def update_job_status(job_owner: str, continue if pending_jobs[job_id]['created_time'] < psutil.boot_time(): # The job is stale as it is created before the instance - # is created. + # is booted, e.g. the instance is rebooted. job_statuses[i] = JobStatus.FAILED else: job_statuses[i] = JobStatus.PENDING @@ -558,9 +560,6 @@ def update_job_status(job_owner: str, # Per-job status lock is required because between the job status # query and the job status update, the job status in the databse # can be modified by the generated ray program. - # TODO(mraheja): remove pylint disabling when filelock version - # updated - # pylint: disable=abstract-class-instantiated with filelock.FileLock(_get_lock_path(job_id)): original_status = get_status_no_lock(job_id) assert original_status is not None, (job_id, status) From 6903de7eb5835f450247c083bd663b781154bf5e Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 5 Jun 2023 22:23:06 -0700 Subject: [PATCH 35/39] add comment --- sky/skylet/job_lib.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index a05c0e873c8..06b8636f610 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -551,6 +551,9 @@ def update_job_status(job_owner: str, # is booted, e.g. the instance is rebooted. job_statuses[i] = JobStatus.FAILED else: + # Set the job status to PENDING even though the job can be + # in any later status, because the code will take the max + # of this status and the status in the jobs table. 
job_statuses[i] = JobStatus.PENDING assert len(job_statuses) == len(job_ids), (job_statuses, job_ids) From a6e82eb1503191c19b33265cff8caa327e4db8ea Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 5 Jun 2023 23:32:35 -0700 Subject: [PATCH 36/39] revert changes in test --- examples/job_queue/job_multinode.yaml | 2 +- tests/test_smoke.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/job_queue/job_multinode.yaml b/examples/job_queue/job_multinode.yaml index dd0b030e1ca..62546306f5a 100644 --- a/examples/job_queue/job_multinode.yaml +++ b/examples/job_queue/job_multinode.yaml @@ -23,5 +23,5 @@ run: | conda env list for i in {1..360}; do echo "$timestamp $i" - sleep 2 + sleep 1 done diff --git a/tests/test_smoke.py b/tests/test_smoke.py index ce57cb0f4ba..4f52988e308 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -926,7 +926,6 @@ def test_job_queue_multinode(generic_cloud: str): f'sky launch -c {name} -n {name}-3 --detach-setup -d examples/job_queue/job_multinode.yaml', f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-1 | grep RUNNING)', f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-2 | grep RUNNING)', - 'sleep 10', f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-3 | grep PENDING)', 'sleep 90', f'sky cancel -y {name} 1', From 46b7c11d96b3ccc06b7854e009563e98df82db8b Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Tue, 6 Jun 2023 00:43:19 -0700 Subject: [PATCH 37/39] Add test for cancelling pending jobs --- sky/backends/cloud_vm_ray_backend.py | 6 +++--- tests/test_smoke.py | 10 ++++++++++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 69b499ea37a..606051bcb77 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2746,8 +2746,8 @@ def _exec_code_on_head( f'--submission-id {ray_job_id} --no-wait ' f'"{executable} -u {script_path} > {remote_log_path} 2>&1"') - mkdir_code = (f'{cd} && mkdir -p {remote_log_dir} &&' - f'echo START > {remote_log_path} 2>&1') + mkdir_code = (f'{cd} && mkdir -p {remote_log_dir} && ' + f'touch {remote_log_path}') code = job_lib.JobLibCodeGen.queue_job(job_id, job_submit_cmd) job_submit_cmd = mkdir_code + ' && ' + code @@ -2762,7 +2762,7 @@ def _exec_code_on_head( with ux_utils.print_exception_no_traceback(): raise RuntimeError( f'{colorama.Fore.RED}SkyPilot runtime is stale on the ' - 'remote cluster. To update run: sky launch -c ' + 'remote cluster. 
To update, run: sky launch -c ' f'{handle.cluster_name}{colorama.Style.RESET_ALL}') subprocess_utils.handle_returncode(returncode, diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 4f52988e308..c1ca67b421b 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -977,6 +977,16 @@ def test_large_job_queue(generic_cloud: str): f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-{i} | grep PENDING' for i in range(33, 75) ], + f'sky cancel -y {name} 33 35 37 39 17 18 19', + *[ + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-{i} | grep CANCELLED' + for i in range(33, 40, 2) + ], + 'sleep 10', + *[ + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-{i} | grep RUNNING' + for i in [34, 36, 38] + ], ], f'sky down -y {name}', timeout=20 * 60, From 505f9e303d34adc88e97d5fe504ac6eb948cb1d2 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Tue, 6 Jun 2023 01:33:15 -0700 Subject: [PATCH 38/39] Make update job status more readable --- sky/skylet/job_lib.py | 20 +++++++++++--------- sky/skylet/skylet.py | 5 ++++- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index 06b8636f610..22270f34dac 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -140,7 +140,7 @@ def colored_str(self): # to avoid race condition with `ray job` to make sure it job has been # correctly updated. # TODO(zhwu): This number should be tuned based on heuristics. -_PENDING_SUBMIT_TIMEOUT = 5 +_PENDING_SUBMIT_GRACE_PERIOD = 60 _PRE_RESOURCE_STATUSES = [JobStatus.PENDING] @@ -541,19 +541,21 @@ def update_job_status(job_owner: str, ray_status = job_details[ray_job_id].status job_statuses[i] = _RAY_TO_JOB_STATUS_MAP[ray_status] if job_id in pending_jobs: - # Gives a 5 second timeout between job being submit from the - # pending queue until appearing in ray jobs - if pending_jobs[job_id]['submit'] > 0 and pending_jobs[job_id][ - 'submit'] < time.time() - _PENDING_SUBMIT_TIMEOUT: - continue if pending_jobs[job_id]['created_time'] < psutil.boot_time(): # The job is stale as it is created before the instance # is booted, e.g. the instance is rebooted. job_statuses[i] = JobStatus.FAILED + # Gives a 60 second grace period between job being submit from + # the pending table until appearing in ray jobs. + if (pending_jobs[job_id]['submit'] > 0 and + pending_jobs[job_id]['submit'] < + time.time() - _PENDING_SUBMIT_GRACE_PERIOD): + # For jobs submitted outside of the grace period, we will + # consider the ray job status. + continue else: - # Set the job status to PENDING even though the job can be - # in any later status, because the code will take the max - # of this status and the status in the jobs table. + # Reset the job status to PENDING even though it may not appear + # in the ray jobs, so that it will not be considered as stale. job_statuses[i] = JobStatus.PENDING assert len(job_statuses) == len(job_ids), (job_statuses, job_ids) diff --git a/sky/skylet/skylet.py b/sky/skylet/skylet.py index d396bb84628..6bbb51e7a37 100644 --- a/sky/skylet/skylet.py +++ b/sky/skylet/skylet.py @@ -5,7 +5,10 @@ from sky import sky_logging from sky.skylet import events -logger = sky_logging.init_logger(__name__) +# Use the explicit logger name so that the logger is under the +# `sky.skylet.skylet` namespace when executed directly, so as +# to inherit the setup from the `sky` logger. 
+logger = sky_logging.init_logger('sky.skylet.skylet') logger.info('skylet started') EVENTS = [ From f4a2b8005557489367bfe0f588f2ab418650c290 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Tue, 6 Jun 2023 02:01:37 -0700 Subject: [PATCH 39/39] schedule more frequently for job cancelling --- sky/skylet/job_lib.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index 22270f34dac..dc12601c0fe 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -711,7 +711,7 @@ def cancel_jobs(job_owner: str, jobs: Optional[List[int]]) -> None: # jobs to CANCELLED. if jobs is None: job_records = _get_jobs( - None, [JobStatus.SETTING_UP, JobStatus.PENDING, JobStatus.RUNNING]) + None, [JobStatus.PENDING, JobStatus.SETTING_UP, JobStatus.RUNNING]) else: job_records = _get_jobs_by_ids(jobs) @@ -736,11 +736,11 @@ def cancel_jobs(job_owner: str, jobs: Optional[List[int]]) -> None: continue if job['status'] in [ - JobStatus.SETTING_UP, JobStatus.PENDING, JobStatus.RUNNING + JobStatus.PENDING, JobStatus.SETTING_UP, JobStatus.RUNNING ]: _set_status_no_lock(job['job_id'], JobStatus.CANCELLED) - scheduler.schedule_step() + scheduler.schedule_step() def get_run_timestamp(job_id: Optional[int]) -> Optional[str]:
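Taken together, the pending-table handling in update_job_status now reduces to three outcomes for a job that is still present in pending_jobs. A condensed, standalone approximation of that decision (the 60-second grace period and the JobStatus values follow the patches above; the helper function itself is illustrative):

import time
from typing import Optional

import psutil

from sky.skylet.job_lib import JobStatus  # import path as used in the patches

_PENDING_SUBMIT_GRACE_PERIOD = 60  # seconds

def reconcile_pending(created_time: float, submit: float) -> Optional[JobStatus]:
    """Returns a status to force for a pending-table entry, or None to defer
    to whatever the ray job server reports."""
    if created_time < psutil.boot_time():
        # Created before the last reboot: the submitted process is gone and
        # the job can never run, so treat it as FAILED.
        return JobStatus.FAILED
    if submit > 0 and submit < time.time() - _PENDING_SUBMIT_GRACE_PERIOD:
        # Submitted long enough ago that it must be visible to `ray job list`:
        # trust the ray-reported status.
        return None
    # Recently submitted, or not yet submitted: keep it PENDING so it is not
    # misclassified as stale.
    return JobStatus.PENDING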