Skip to content

Commit

Permalink
correctness changes
Browse files Browse the repository at this point in the history
  • Loading branch information
mraheja committed Nov 3, 2022
1 parent 1468060 commit 0ea817c
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 28 deletions.
15 changes: 3 additions & 12 deletions sky/skylet/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,12 @@ def run(self):
def _run(self):
raise NotImplementedError


class JobUpdateEvent(SkyletEvent):
"""Skylet event for updating job status."""
class JobSchedulerEvent(SkyletEvent):
"""Skylet event for scheduling jobs"""
EVENT_INTERVAL_SECONDS = 300

# Only update status of the jobs after this many seconds of job submission,
# to avoid a race condition with `ray job` and make sure the job has been
# correctly updated.
# TODO(zhwu): This number should be tuned based on heuristics.
_SUBMITTED_GAP_SECONDS = 60

def _run(self):
job_owner = getpass.getuser()
job_lib.update_status(job_owner,
submitted_gap_sec=self._SUBMITTED_GAP_SECONDS)
job_lib.scheduler.run_next_if_possible()


class SpotJobUpdateEvent(SkyletEvent):
Expand Down
36 changes: 22 additions & 14 deletions sky/skylet/job_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
This is a remote utility module that provides job queue functionality.
"""
import datetime
import enum
import os
import pathlib
Expand Down Expand Up @@ -64,7 +65,8 @@ def create_table(cursor, conn):
cursor.execute("""CREATE TABLE IF NOT EXISTS pending_jobs(
job_id INTEGER,
run_cmd TEXT,
submit INTEGER
submit INTEGER,
created_time TIMESTAMP default CURRENT_TIMESTAMP
)""")

db_utils.add_column_to_table(cursor, conn, 'jobs', 'end_at', 'FLOAT')
Expand Down Expand Up @@ -388,8 +390,8 @@ def _get_jobs_by_ids(job_ids: List[int]) -> List[Dict[str, Any]]:


def _get_pending_jobs():
    """Return the jobs currently recorded in the ``pending_jobs`` table.

    Returns:
        A ``(job_ids, created_times)`` pair of equal-length lists, where
        ``job_ids[k]`` and ``created_times[k]`` come from the same row.
        ``job_ids`` holds ints; ``created_times`` holds the stored
        ``created_time`` values as returned by the database driver.
    """
    # Materialize the result set once: the object returned by execute() is a
    # one-shot iterator, so looping over it a second time yields nothing and
    # would leave the second list empty.
    rows = _CURSOR.execute(
        'SELECT job_id, created_time FROM pending_jobs').fetchall()
    job_ids = [int(row[0]) for row in rows]
    # created_time is filled by the column's CURRENT_TIMESTAMP default, which
    # is a 'YYYY-MM-DD HH:MM:SS' value rather than a number, so it is passed
    # through unchanged instead of being forced through int().
    created_times = [row[1] for row in rows]
    return job_ids, created_times


def update_job_status(job_owner: str,
Expand Down Expand Up @@ -425,13 +427,18 @@ def update_job_status(job_owner: str,
if job_detail.submission_id in ray_job_ids_set:
job_details[job_detail.submission_id] = job_detail
job_statuses: List[JobStatus] = [None] * len(job_ids)
pending_jobs = _get_pending_jobs()
pending_jobs, start_times = _get_pending_jobs()
for i, job_id in enumerate(job_ids):
ray_job_id = make_ray_job_id(job_id, job_owner)
if ray_job_id in job_details:
ray_status = job_details[ray_job_id].status
job_statuses[i] = _RAY_TO_JOB_STATUS_MAP[ray_status]
if job_id in pending_jobs:
idx = pending_jobs.indexof(job_id)
# if start_times[i] < :
# job_statuses[i] = JobStatus.FAILED
# else:
# job_statuses[i] = JobStatus.PENDING
job_statuses[i] = JobStatus.PENDING

assert len(job_statuses) == len(job_ids), (job_statuses, job_ids)
Expand Down Expand Up @@ -594,16 +601,17 @@ def cancel_jobs(job_owner: str, jobs: Optional[List[int]]) -> None:
# ray cluster (tracked in #1262).
for job in job_records:
job_id = make_ray_job_id(job['job_id'], job_owner)
try:
job_client.stop_job(job_id)
except RuntimeError as e:
# If the job does not exist or if the request to the
# job server fails.
logger.warning(str(e))
continue

if job['status'] in [JobStatus.PENDING, JobStatus.RUNNING]:
set_status(job['job_id'], JobStatus.CANCELLED)
with filelock.FileLock(_get_lock_path(job_id)):
try:
job_client.stop_job(job_id)
except RuntimeError as e:
# If the job does not exist or if the request to the
# job server fails.
logger.warning(str(e))
continue

if job['status'] in [JobStatus.PENDING, JobStatus.RUNNING]:
set_status(job['job_id'], JobStatus.CANCELLED)
scheduler.run_next_if_possible()


Expand Down
2 changes: 1 addition & 1 deletion sky/skylet/skylet.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

EVENTS = [
events.AutostopEvent(),
events.JobUpdateEvent(),
events.JobSchedulerEvent(),
# The spot job update event should be after the job update event.
# Otherwise, the abnormal spot job status update will be delayed
# until the next job update event.
Expand Down
6 changes: 5 additions & 1 deletion tests/test_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def test_stale_job_manual_restart():
f'sky logs {name} 1 --status',
f'sky logs {name} 3 --status',
# Ensure the skylet updated the stale job status.
f'sleep {events.JobUpdateEvent.EVENT_INTERVAL_SECONDS}',
f'sleep {events.JobSchedulerEvent.EVENT_INTERVAL_SECONDS}',
f's=$(sky queue {name}); printf "$s"; echo; echo; printf "$s" | grep FAILED',
],
f'sky down -y {name}',
Expand Down Expand Up @@ -345,6 +345,10 @@ def test_large_job_queue():
# Each job takes 0.5 CPU and the default VM has 8 CPUs, so there should be 8 / 0.5 = 16 jobs running.
# The first 16 jobs are canceled, so there should be 75 - 32 = 43 jobs PENDING.
f'sky queue {name} | grep -v grep | grep PENDING | wc -l | grep 43',
f'sky queue {name} | grep {name}-15 | grep RUNNING',
f'sky queue {name} | grep {name}-32 | grep RUNNING',
f'sky queue {name} | grep {name}-33 | grep PENDING',
f'sky queue {name} | grep {name}-50 | grep PENDING',
],
f'sky down -y {name}',
)
Expand Down

0 comments on commit 0ea817c

Please sign in to comment.