Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expanded Job Queue #1636

Merged
merged 48 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
9b9b670
working expanded queue
mraheja Jan 26, 2023
c099057
added event
mraheja Jan 26, 2023
281d798
updated testing and bug fixes
mraheja Jan 26, 2023
5ad959d
fixed missed space
mraheja Jan 26, 2023
440fc68
formatting fix
mraheja Jan 26, 2023
d92da6a
fixed setting up status + enabled spot
mraheja Feb 15, 2023
0fed010
format
mraheja Feb 15, 2023
2cb6d19
fixed setting up test
mraheja Feb 17, 2023
42ce456
addressed comments
mraheja Feb 23, 2023
496284e
change stop lock range
mraheja Feb 28, 2023
21c3836
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja Feb 28, 2023
cbf8ab3
error message for needing update
mraheja Feb 28, 2023
afedc83
removed wandb.py
mraheja Mar 3, 2023
68dea21
added skylet restart
mraheja Mar 3, 2023
fdc68e2
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja Mar 3, 2023
5af3146
updates
mraheja Mar 10, 2023
052a579
addressed comments
mraheja Mar 17, 2023
6c29f09
formatting + minor comment addressing
mraheja Mar 24, 2023
be36c35
more comments addressed
mraheja Mar 24, 2023
a12577e
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
Michaelvll Mar 26, 2023
21919c5
Fix rich status
Michaelvll Mar 26, 2023
0b7141a
fixed stalling issue
mraheja Apr 11, 2023
2e46f14
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja Apr 11, 2023
356c04e
format
mraheja Apr 11, 2023
3223b74
small test bug
mraheja Apr 11, 2023
b6a8325
fixed skylet launch issue
mraheja Apr 28, 2023
47ccba0
formatting + forgetten file
mraheja Apr 28, 2023
1bc0478
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja Apr 28, 2023
3dc81c4
more formatting + remove check
mraheja Apr 28, 2023
35a6469
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja May 20, 2023
3876881
addressed comments and removed pkill usage
mraheja May 20, 2023
4a7ef25
schedule after setup too
mraheja May 20, 2023
4ac4335
Update sky/backends/cloud_vm_ray_backend.py
mraheja Jun 1, 2023
f1556e6
addressed more comments
mraheja Jun 1, 2023
c4e21d1
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja Jun 1, 2023
b7d13b1
Merge branch 'expanded-job-queue' of https://github.com/skypilot-org/…
mraheja Jun 1, 2023
d305d26
formatting
mraheja Jun 1, 2023
88dd953
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
mraheja Jun 5, 2023
cb0d66f
Address comments
Michaelvll Jun 6, 2023
8b28637
faster job scheduling
Michaelvll Jun 6, 2023
c5881b7
format
Michaelvll Jun 6, 2023
cc7b7fd
Fix cancellation logic
Michaelvll Jun 6, 2023
4e45030
Don't schedule a job after reboot
Michaelvll Jun 6, 2023
6903de7
add comment
Michaelvll Jun 6, 2023
a6e82eb
revert changes in test
Michaelvll Jun 6, 2023
46b7c11
Add test for cancelling pending jobs
Michaelvll Jun 6, 2023
505f9e3
Make update job status more readable
Michaelvll Jun 6, 2023
f4a2b80
schedule more frequently for job cancelling
Michaelvll Jun 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@
pathlib.Path(sky.__file__).resolve().parent / 'backends' /
'monkey_patches' / 'monkey_patch_ray_up.py')

# Restart skylet when the version does not match to keep the skylet up-to-date.
_MAYBE_SKYLET_RESTART_CMD = (
f'[[ $(cat {constants.SKYLET_VERSION_FILE}) = "{constants.SKYLET_VERSION}"'
' ]] || (pkill -f "python3 -m sky.skylet.skylet";'
f' echo {constants.SKYLET_VERSION} > {constants.SKYLET_VERSION_FILE};'
'nohup python3 -m sky.skylet.skylet >> ~/.sky/skylet.log 2>&1 &);')

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we test this manually, just to make sure it does not update the skylet every time?


def _get_cluster_config_template(cloud):
cloud_to_template = {
Expand Down Expand Up @@ -1649,7 +1656,7 @@ def _ensure_cluster_ray_started(self, handle: 'CloudVmRayResourceHandle',
# For backward compatability and robustness of skylet, it is restarted
mraheja marked this conversation as resolved.
Show resolved Hide resolved
returncode = backend.run_on_head(
handle,
f'ray status',
'ray status',
# At this state, an erroneous cluster may not have cached
# handle.head_ip (global_user_state.add_or_update_cluster(...,
# ready=True)).
Expand Down Expand Up @@ -2332,6 +2339,12 @@ def _update_after_cluster_provisioned(
usage_lib.messages.usage.update_final_cluster_status(
global_user_state.ClusterStatus.UP)

self.run_on_head(
mraheja marked this conversation as resolved.
Show resolved Hide resolved
handle,
_MAYBE_SKYLET_RESTART_CMD,
use_cached_head_ip=False,
)

# Update job queue to avoid stale jobs (when restarted), before
# setting the cluster to be ready.
if prev_cluster_status == global_user_state.ClusterStatus.INIT:
Expand Down Expand Up @@ -2606,7 +2619,7 @@ def _exec_code_on_head(
require_outputs=True)

if 'has no attribute' in stdout:
# Happens when someone calls `sky exec` but remote has not been updated
# Happens when someone calls `sky exec` but remote is outdated
# necessicating calling `sky launch`
with ux_utils.print_exception_no_traceback():
raise RuntimeError(
Expand Down
13 changes: 0 additions & 13 deletions sky/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,6 @@
sky.spot_launch(task, ...)
""".strip()

SKYLET_VERSION = 1 # maybe move it to sky/skylet/constants.py
SKYLET_VERSION_FILE = '~/.sky/skylet_version' # maybe move it to sky/skylet/constants.py

# Restart skylet when the version does not match to keep the skylet up-to-date.
_MAYBE_SKYLET_RESTART_CMD = (
f'[[ $(cat {SKYLET_VERSION_FILE}) = "{SKYLET_VERSION}" ]] || (pkill -f "python3 -m sky.skylet.skylet"; echo {SKYLET_VERSION} > {SKYLET_VERSION_FILE}; nohup python3 -m '
'sky.skylet.skylet >> ~/.sky/skylet.log 2>&1 &);')


def _convert_to_dag(entrypoint: Any) -> 'sky.Dag':
"""Convert the entrypoint to a sky.Dag.
Expand Down Expand Up @@ -293,11 +285,6 @@ def _execute(
logger.info('Setup commands skipped.')
elif Stage.SETUP in stages:
backend.setup(handle, task, detach_setup=detach_setup)
backend.run_on_head(
handle,
_MAYBE_SKYLET_RESTART_CMD,
use_cached_head_ip=False,
)

if Stage.PRE_EXEC in stages:
if idle_minutes_to_autostop is not None:
Expand Down
3 changes: 3 additions & 0 deletions sky/skylet/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@
'command to initialize it locally: sky launch -c {cluster} \'\'')

JOB_ID_ENV_VAR = 'SKYPILOT_JOB_ID'

SKYLET_VERSION = 1
SKYLET_VERSION_FILE = '~/.sky/skylet_version'
11 changes: 6 additions & 5 deletions sky/skylet/job_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import subprocess
import time
import typing
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple

import colorama
import filelock
Expand Down Expand Up @@ -156,7 +156,7 @@ def queue(self, job_id: int, cmd: str) -> None:
set_status(job_id, JobStatus.PENDING)
self.schedule_step()

def remove_job_no_lock(self, job_id: str) -> None:
def remove_job_no_lock(self, job_id: int) -> None:
_CURSOR.execute(f'DELETE FROM pending_jobs WHERE job_id={job_id!r}')
_CONN.commit()

Expand Down Expand Up @@ -191,14 +191,14 @@ def async_schedule_step(self) -> None:
p = multiprocessing.Process(target=self.schedule_step)
p.start()

def _get_jobs(self) -> List[str]:
def _get_jobs(self) -> List[Tuple[int, str, int, int]]:
raise NotImplementedError


class FIFOScheduler(JobScheduler):
"""First in first out job scheduler"""

def _get_jobs(self) -> List[str]:
def _get_jobs(self) -> List[Tuple[int, str, int, int]]:
return list(
_CURSOR.execute('SELECT * FROM pending_jobs ORDER BY job_id'))
mraheja marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -219,7 +219,8 @@ def _get_jobs(self) -> List[str]:
_RAY_TO_JOB_STATUS_MAP = {
# These are intentionally set this way, because:
# 1. when the ray status indicates the job is PENDING the generated
# python program has left the job queue and is now PENDING
# python program has been `ray job submit` from the job queue
# and is now PENDING
# 2. when the ray status indicates the job is RUNNING the job can be in
# setup or resources may not be allocated yet, i.e. the job should be
# PENDING.
Expand Down