Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expanded Job Queue #1636

Merged
merged 48 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
9b9b670
working expanded queue
mraheja Jan 26, 2023
c099057
added event
mraheja Jan 26, 2023
281d798
updated testing and bug fixes
mraheja Jan 26, 2023
5ad959d
fixed missed space
mraheja Jan 26, 2023
440fc68
formatting fix
mraheja Jan 26, 2023
d92da6a
fixed setting up status + enabled spot
mraheja Feb 15, 2023
0fed010
format
mraheja Feb 15, 2023
2cb6d19
fixed setting up test
mraheja Feb 17, 2023
42ce456
addressed comments
mraheja Feb 23, 2023
496284e
change stop lock range
mraheja Feb 28, 2023
21c3836
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja Feb 28, 2023
cbf8ab3
error message for needing update
mraheja Feb 28, 2023
afedc83
removed wandb.py
mraheja Mar 3, 2023
68dea21
added skylet restart
mraheja Mar 3, 2023
fdc68e2
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja Mar 3, 2023
5af3146
updates
mraheja Mar 10, 2023
052a579
addressed comments
mraheja Mar 17, 2023
6c29f09
formatting + minor comment addressing
mraheja Mar 24, 2023
be36c35
more comments addressed
mraheja Mar 24, 2023
a12577e
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
Michaelvll Mar 26, 2023
21919c5
Fix rich status
Michaelvll Mar 26, 2023
0b7141a
fixed stalling issue
mraheja Apr 11, 2023
2e46f14
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja Apr 11, 2023
356c04e
format
mraheja Apr 11, 2023
3223b74
small test bug
mraheja Apr 11, 2023
b6a8325
fixed skylet launch issue
mraheja Apr 28, 2023
47ccba0
formatting + forgetten file
mraheja Apr 28, 2023
1bc0478
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja Apr 28, 2023
3dc81c4
more formatting + remove check
mraheja Apr 28, 2023
35a6469
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja May 20, 2023
3876881
addressed comments and removed pkill usage
mraheja May 20, 2023
4a7ef25
schedule after setup too
mraheja May 20, 2023
4ac4335
Update sky/backends/cloud_vm_ray_backend.py
mraheja Jun 1, 2023
f1556e6
addressed more comments
mraheja Jun 1, 2023
c4e21d1
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja Jun 1, 2023
b7d13b1
Merge branch 'expanded-job-queue' of https://github.com/skypilot-org/…
mraheja Jun 1, 2023
d305d26
formatting
mraheja Jun 1, 2023
88dd953
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja Jun 5, 2023
cb0d66f
Address comments
Michaelvll Jun 6, 2023
8b28637
faster job scheduling
Michaelvll Jun 6, 2023
c5881b7
format
Michaelvll Jun 6, 2023
cc7b7fd
Fix cancellation logic
Michaelvll Jun 6, 2023
4e45030
Don't schedule a job after reboot
Michaelvll Jun 6, 2023
6903de7
add comment
Michaelvll Jun 6, 2023
a6e82eb
revert changes in test
Michaelvll Jun 6, 2023
46b7c11
Add test for cancelling pending jobs
Michaelvll Jun 6, 2023
505f9e3
Make update job status more readable
Michaelvll Jun 6, 2023
f4a2b80
schedule more frequently for job cancelling
Michaelvll Jun 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@
pathlib.Path(sky.__file__).resolve().parent / 'backends' /
'monkey_patches' / 'monkey_patch_ray_up.py')

_SKYLET_RESTART_CMD = (
'(pkill -f "python3 -m sky.skylet.skylet"; nohup python3 -m '
'sky.skylet.skylet >> ~/.sky/skylet.log 2>&1 &);')
mraheja marked this conversation as resolved.
Show resolved Hide resolved


def _get_cluster_config_template(cloud):
cloud_to_template = {
Expand Down Expand Up @@ -317,6 +321,7 @@ def add_gang_scheduling_placement_group(
print('INFO: All task resources reserved.',
file=sys.stderr,
flush=True)
job_lib.scheduler.schedule_step()
mraheja marked this conversation as resolved.
Show resolved Hide resolved
""")
]

Expand All @@ -331,7 +336,6 @@ def add_gang_scheduling_placement_group(
# We unset it so that user setup command may properly use this env var.
setup_cmd = 'unset CUDA_VISIBLE_DEVICES; ' + setup_cmd
job_lib.set_status({job_id!r}, job_lib.JobStatus.SETTING_UP)
print({_CTRL_C_TIP_MESSAGE!r}, file=sys.stderr, flush=True)
total_num_nodes = len(ray.nodes())
setup_bundles = [{{"CPU": _SETUP_CPUS}} for _ in range(total_num_nodes)]
setup_pg = ray.util.placement_group(setup_bundles, strategy='STRICT_SPREAD')
Expand Down Expand Up @@ -365,7 +369,6 @@ def add_gang_scheduling_placement_group(
self._code += [
textwrap.dedent(f"""\
job_lib.set_job_started({self.job_id!r})
job_lib.scheduler.schedule_step()
"""),
]

Expand Down Expand Up @@ -1647,9 +1650,10 @@ def _ensure_cluster_ray_started(self, handle: 'CloudVmRayResourceHandle',
return
backend = CloudVmRayBackend()

# For backward compatibility and robustness of skylet, it is restarted
mraheja marked this conversation as resolved.
Show resolved Hide resolved
returncode = backend.run_on_head(
handle,
'ray status',
f'{_SKYLET_RESTART_CMD}ray status',
mraheja marked this conversation as resolved.
Show resolved Hide resolved
# At this state, an erroneous cluster may not have cached
# handle.head_ip (global_user_state.add_or_update_cluster(...,
# ready=True)).
Expand Down Expand Up @@ -2595,16 +2599,22 @@ def _exec_code_on_head(
'--no-wait '
f'"{executable} -u {script_path} > {remote_log_path} 2>&1"')

mkdir_code = (
f'{cd} && mkdir -p {remote_log_dir} && touch '
f'{remote_log_path} && echo START > {remote_log_path} 2>&1')
mkdir_code = (f'{cd} && mkdir -p {remote_log_dir} &&'
f'echo START > {remote_log_path} 2>&1')
code = job_lib.JobLibCodeGen.queue_job(job_id, job_submit_cmd)
job_submit_cmd = mkdir_code + ' && ' + code

returncode, stdout, stderr = self.run_on_head(handle,
job_submit_cmd,
stream_logs=False,
require_outputs=True)

if 'has no attribute' in stdout:
logger.info(
f'{colorama.Fore.RED}SkyPilot must be updated on remote,'
f'use `sky launch` instead{colorama.Style.RESET_ALL}')
return
mraheja marked this conversation as resolved.
Show resolved Hide resolved

subprocess_utils.handle_returncode(returncode,
job_submit_cmd,
f'Failed to submit job {job_id}.',
Expand Down
77 changes: 40 additions & 37 deletions sky/skylet/job_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import subprocess
import time
import typing
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional

import colorama
import filelock
Expand Down Expand Up @@ -92,15 +92,15 @@ class JobStatus(enum.Enum):
# In the 'jobs' table, the `submitted_at` column will be set to the current
# time, when the job is first created (in the INIT state).
INIT = 'INIT'
# The job is waiting for the required resources. (`ray job status`
# shows RUNNING as the generated ray program has started, but blocked
# by the placement constraints.)
PENDING = 'PENDING'
# Running the user's setup script (only in effect if --detach-setup is
# set). Our update_job_status() can temporarily (for a short period) set
# the status to SETTING_UP, if the generated ray program has not set
# the status to PENDING or RUNNING yet.
SETTING_UP = 'SETTING_UP'
# The job is waiting for the required resources. (`ray job status`
# shows RUNNING as the generated ray program has started, but blocked
# by the placement constraints.)
PENDING = 'PENDING'
# The job is running.
# In the 'jobs' table, the `start_at` column will be set to the current
# time, when the job is first transitioned to RUNNING.
Expand Down Expand Up @@ -140,8 +140,9 @@ def colored_str(self):
# correctly updated.
# TODO(zhwu): This number should be tuned based on heuristics.
_SUBMITTED_GAP_SECONDS = 60
_PENDING_SUBMIT_TIMEOUT = 5

_PRE_RESOURCE_STATUSES = [JobStatus.PENDING, JobStatus.SETTING_UP]
_PRE_RESOURCE_STATUSES = [JobStatus.PENDING]


class JobScheduler:
Expand All @@ -154,13 +155,6 @@ def queue(self, job_id: int, cmd: str) -> None:
set_status(job_id, JobStatus.PENDING)
self.schedule_step()

def set_scheduled(self, job_id: str):
# TODO(mraheja): remove pylint disabling when filelock
# version updated
# pylint: disable=abstract-class-instantiated
with filelock.FileLock(_get_lock_path(job_id)):
self.remove_job_no_lock(job_id)

def remove_job_no_lock(self, job_id: str) -> None:
_CURSOR.execute(f'DELETE FROM pending_jobs WHERE job_id={job_id!r}')
_CONN.commit()
Expand All @@ -173,7 +167,7 @@ def _run_job(self, job_id: int, run_cmd: str):

mraheja marked this conversation as resolved.
Show resolved Hide resolved
def schedule_step(self) -> None:
job_owner = getpass.getuser()
jobs = list(self._get_jobs())
jobs = self._get_jobs()
if len(jobs) > 0:
update_status(job_owner, _SUBMITTED_GAP_SECONDS)
for job_id, run_cmd, submit, _ in jobs:
Expand All @@ -192,12 +186,16 @@ def schedule_step(self) -> None:
self._run_job(job_id, run_cmd)
return
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved

def _get_jobs(self) -> List:
mraheja marked this conversation as resolved.
Show resolved Hide resolved
raise NotImplementedError


class FIFOScheduler(JobScheduler):
"""First in first out job scheduler"""

def _get_jobs(self) -> Tuple:
return _CURSOR.execute('SELECT * FROM pending_jobs ORDER BY job_id')
def _get_jobs(self) -> List:
return list(
_CURSOR.execute('SELECT * FROM pending_jobs ORDER BY job_id'))


scheduler = FIFOScheduler()
Expand Down Expand Up @@ -229,8 +227,8 @@ def _get_jobs(self) -> Tuple:
# (ray's status becomes RUNNING), i.e. it will be very rare that the job
# will be set to SETTING_UP by the update_job_status, as our generated
# ray program will set the status to PENDING immediately.
mraheja marked this conversation as resolved.
Show resolved Hide resolved
'PENDING': JobStatus.INIT,
'RUNNING': JobStatus.SETTING_UP,
'PENDING': JobStatus.PENDING,
'RUNNING': JobStatus.PENDING,
'SUCCEEDED': JobStatus.SUCCEEDED,
'FAILED': JobStatus.FAILED,
'STOPPED': JobStatus.CANCELLED,
Expand Down Expand Up @@ -496,8 +494,7 @@ def update_job_status(job_owner: str,
job_detail_lists: List['ray_pydantic.JobDetails'] = job_client.list_jobs()

pending_jobs = _get_pending_jobs()

job_details = dict()
job_details = {}
ray_job_ids_set = set(ray_job_ids)
for job_detail in job_detail_lists:
if job_detail.submission_id in ray_job_ids_set:
Expand All @@ -509,10 +506,14 @@ def update_job_status(job_owner: str,
ray_status = job_details[ray_job_id].status
job_statuses[i] = _RAY_TO_JOB_STATUS_MAP[ray_status]
if job_id in pending_jobs:
# Gives a 5 second timeout between the job being submitted from the
# pending queue and its appearing in ray jobs
if pending_jobs[job_id]['submit'] > 0 and pending_jobs[job_id][
'submit'] < time.time() - 5:
'submit'] < time.time() - _PENDING_SUBMIT_TIMEOUT:
continue
if pending_jobs[job_id]['created_time'] < psutil.boot_time():
# The job is stale as it was created before the instance
# was last booted.
job_statuses[i] = JobStatus.FAILED
mraheja marked this conversation as resolved.
Show resolved Hide resolved
else:
job_statuses[i] = JobStatus.PENDING
Expand Down Expand Up @@ -686,22 +687,24 @@ def cancel_jobs(job_owner: str, jobs: Optional[List[int]]) -> None:
# ray cluster (tracked in #1262).
for job in job_records:
job_id = make_ray_job_id(job['job_id'], job_owner)
try:
# TODO(mraheja): remove pylint disabling when filelock
# version updated
# pylint: disable=abstract-class-instantiated
with filelock.FileLock(_get_lock_path(job_id)):
# Job is locked to ensure that the pending queue does not start it while
# it is being cancelled
with filelock.FileLock(_get_lock_path(job['job_id'],)):
mraheja marked this conversation as resolved.
Show resolved Hide resolved
try:
# TODO(mraheja): remove pylint disabling when filelock
# version updated
# pylint: disable=abstract-class-instantiated
job_client.stop_job(job_id)
except RuntimeError as e:
# If the job does not exist or if the request to the
# job server fails.
logger.warning(str(e))
continue
except RuntimeError as e:
# If the job does not exist or if the request to the
# job server fails.
logger.warning(str(e))
continue

if job['status'] in [
JobStatus.SETTING_UP, JobStatus.PENDING, JobStatus.RUNNING
]:
set_status(job['job_id'], JobStatus.CANCELLED)
if job['status'] in [
JobStatus.SETTING_UP, JobStatus.PENDING, JobStatus.RUNNING
]:
set_status(job['job_id'], JobStatus.CANCELLED)


def get_run_timestamp(job_id: Optional[int]) -> Optional[str]:
Expand Down Expand Up @@ -759,9 +762,9 @@ def add_job(cls, job_name: str, username: str, run_timestamp: str,
return cls._build(code)

@classmethod
def queue_job(cls, job_name: str, cmd: str) -> None:
def queue_job(cls, job_id: int, cmd: str) -> str:
code = ['job_lib.scheduler.queue('
f'{job_name!r},'
f'{job_id!r},'
f'{cmd!r})']
return cls._build(code)

Expand Down
2 changes: 1 addition & 1 deletion tests/test_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ def test_large_job_queue(generic_cloud: str):
f'sky launch -y -c {name} --cloud {generic_cloud}',
f'for i in `seq 1 75`; do sky exec {name} -n {name}-$i -d "echo $i; sleep 100000000"; done',
f'sky cancel -y {name} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16',
'sleep 120',
'sleep 90',
mraheja marked this conversation as resolved.
Show resolved Hide resolved
# Each job takes 0.5 CPU and the default VM has 8 CPUs, so there should be 8 / 0.5 = 16 jobs running.
# The first 16 jobs are canceled, so there should be 75 - 32 = 43 jobs PENDING.
f'sky queue {name} | grep -v grep | grep PENDING | wc -l | grep 43',
Expand Down
Empty file removed tests/wandb.py
Empty file.