
Expanded Job Queue #1218

Closed
wants to merge 10 commits

Conversation

Collaborator

@mraheja mraheja commented Oct 10, 2022

Job Queue Design

In the past, Ray jobs were dispatched every time a job was submitted, causing many resources to be used just to idle. In this job queue, we maintain a pending jobs table that dispatches a job only when the resources for the previous job have been fulfilled. The pending jobs table does this by running a scheduling step which:
Checks if the job has failed or been cancelled; if so, removes it from the table
Checks if the job has already been submitted, in which case it returns and waits for another call of the scheduling step
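The scheduling step described above can be sketched roughly as follows. The table schema, status strings, and the `get_status`/`run_cmd_fn` callables are illustrative assumptions, not the PR's actual code:

```python
import sqlite3

# Illustrative sketch of the scheduling step. Schema and helper names are
# assumptions for illustration only.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE pending_jobs(job_id INTEGER, run_cmd TEXT, submit INTEGER)')

def schedule_step(get_status, run_cmd_fn):
    """One pass over the pending-jobs table, FIFO by job_id."""
    rows = conn.execute(
        'SELECT job_id, run_cmd, submit FROM pending_jobs '
        'ORDER BY job_id').fetchall()
    for job_id, run_cmd, submitted in rows:
        status = get_status(job_id)
        if status in ('FAILED', 'CANCELLED'):
            # Step 1: drop failed/cancelled jobs from the table.
            conn.execute('DELETE FROM pending_jobs WHERE job_id=?', (job_id,))
            conn.commit()
            continue
        if submitted:
            # Step 2: already submitted; wait for the next scheduling step.
            return
        # Otherwise dispatch this job and mark it as submitted.
        conn.execute('UPDATE pending_jobs SET submit=1 WHERE job_id=?', (job_id,))
        conn.commit()
        run_cmd_fn(run_cmd)
        return
```

Each completed (or failed) job triggers another scheduling step, which is what lets the next queued job dispatch.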

However, with the detached setup, there is some extra complication to this.

Option 1 - Current Implementation

The current implementation will run the setup script only when the ray job for the full

Statuses in order:

INIT: Job has been submitted
PENDING: Job is waiting for resources
SETTING_UP: Job is running setup command and previous job is completed
RUNNING: Job is running and current job’s resources are acquired
SUCCEEDED: Job is completed
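The update rules below compare statuses with max(), so the statuses need a total order. A minimal sketch, where the integer values are an illustrative assumption (they only need to increase in the order a job progresses):

```python
import enum

class JobStatus(enum.IntEnum):
    # Ordering follows the status list above; the concrete values are an
    # assumption so that max() picks the later stage.
    INIT = 0
    PENDING = 1
    SETTING_UP = 2
    RUNNING = 3
    SUCCEEDED = 4
```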

Keep in mind that there needs to be an update function that can just look at the state and calculate where each job is in the process. Here is the current update system:

Check if the job is in the pending table; if so, mark the status as PENDING
Check the status of the ray job:
If it’s running, mark it as max(current_status, SETTING_UP)
If it’s a terminal status (i.e., SUCCEEDED/FAILED), mark the job as that status
If it doesn’t exist, mark it as failed
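The update system above can be sketched as a pure function over the job's observed state. The inputs `in_pending_table` and `ray_status`, and the string-based status order, are assumptions for illustration:

```python
# Illustrative sketch of the update rules listed above. Status names are
# ordered strings here; real code would use an ordered enum.
_ORDER = ['INIT', 'PENDING', 'SETTING_UP', 'RUNNING', 'SUCCEEDED', 'FAILED']

def update_status(current_status, in_pending_table, ray_status):
    """ray_status is the Ray job's status, or None if the job doesn't exist."""
    if in_pending_table:
        # Rule 1: job is still queued.
        return 'PENDING'
    if ray_status == 'RUNNING':
        # Rule 2a: at least SETTING_UP; never move a status backwards.
        return max(current_status, 'SETTING_UP', key=_ORDER.index)
    if ray_status in ('SUCCEEDED', 'FAILED'):
        # Rule 2b: terminal Ray statuses map through directly.
        return ray_status
    # Rule 2c: the Ray job doesn't exist, so treat it as failed.
    return 'FAILED'
```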

Option 2 - Alternative Implementation

A pitfall of this implementation is that it cannot run the setup job (which requires no resources) before the actual job’s resources are freed. An alternative implementation would be:

INIT: Job has been submitted
SETTING_UP: Setup command running (without any sort of other condition on resources)
PENDING: Job is waiting for resources
RUNNING: Actual Job is running
SUCCEEDED: Job is completed

Implementing this requires submitting a separate job for setup. Updating, however, becomes less clear: if a ray job doesn’t exist, you don’t know whether setup completed, setup failed, or the entire job failed.

This could also cause some problems (e.g., setup running earlier than intended), but for the most part it will be faster because setup is run beforehand.

@Michaelvll Michaelvll left a comment

Thanks for adding this job queue @mraheja! Great work! The code can be a proof-of-concept for extracting out the job queue. We should consider whether to place that pending information in the original jobs table, to avoid issues caused by concurrent modification of the two tables.

Comment on lines 2061 to 2062
code = job_lib.JobLibCodeGen.queue_job(job_id, job_submit_cmd)
mkdir_code = f'{cd} && mkdir -p {remote_log_dir} && echo START > {remote_log_path} 2>&1'

We can combine these two commands into one run_on_head to reduce the overhead for ssh.
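As a sketch of this suggestion, the two shell commands can be joined with `&&` so a single `run_on_head` (one ssh round trip) covers both. The variable contents below are placeholders, and `run_on_head` itself is not shown in this excerpt:

```python
# Placeholder values standing in for the real codegen variables.
cd = 'cd ~/sky_workdir'
remote_log_dir = '~/sky_logs/job-1'
remote_log_path = f'{remote_log_dir}/run.log'
job_submit_cmd = 'python job.py'

# Same mkdir command as in the snippet above, then the job submission,
# joined into one shell string so one ssh call suffices.
mkdir_code = f'{cd} && mkdir -p {remote_log_dir} && echo START > {remote_log_path} 2>&1'
combined_cmd = f'{mkdir_code} && {job_submit_cmd}'
```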

Comment on lines 28 to 42
_DB_PATH = os.path.expanduser('~/.sky/jobs.db')
os.makedirs(pathlib.Path(_DB_PATH).parents[0], exist_ok=True)
CONN = sqlite3.connect(_DB_PATH)
CURSOR = CONN.cursor()
# CREATE TABLE
try:
    CURSOR.execute('SELECT * FROM pending_jobs LIMIT 0')
except sqlite3.OperationalError:
    # Tables do not exist, create them.
    CURSOR.execute('''CREATE TABLE pending_jobs(
        job_name TEXT,
        run_string TEXT,
        submit INTEGER)''')
CONN.commit()

Can we place these lines in the create_table in L108, so that we get rid of some of the issues with multi-threading + sqlite?

# Go through queue and delete anything which is marked as anything but pending
# If previous job has a pg that has already been acquired, then run the command for the top job
# Give it one second then try the next one -> if not assume that resources aren't free and wait for a job to finish which will trigger this again
query = CURSOR.execute('SELECT * FROM pending_jobs ORDER BY job_name')

Use the _CURSOR and _CONN created in L126 and L127

Comment on lines 54 to 59
status = list(
    CURSOR.execute(f'SELECT status FROM jobs WHERE job_id={name}'))
if len(status) == 0:
    remove_job(name)
    continue
status = status[0][0]

nit: follow the style for retrieving values from the SQLite returns in other parts of our code.

_CURSOR.execute(f'SELECT status FROM jobs WHERE job_id={name}')
status = _CURSOR.fetchone()
if status is None:
    remove_job(name)
    continue



def run_top_if_possible() -> None:
    # Go through queue and delete anything which is marked as anything but pending

Should we update the comments? pending -> init or pending

f'UPDATE pending_jobs SET submit=1 WHERE job_name={name!r}')
CONN.commit()
os.system(run_cmd)
time.sleep(1)

Why do we continue to the next job after this one is submitted? I thought the logic here is to submit the first job in the queue and break.

Also, this makes me wonder why we don't call this function in sky/skylet/events.py, so that we call it every several seconds, submitting a job whenever there is no submitted one in the queue.
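The periodic-call idea above could be sketched as a simple background loop. The step function, interval, and stop mechanism here are illustrative assumptions rather than the actual skylet event API:

```python
import threading
import time

# Sketch: instead of triggering the scheduler only on job completion, run
# the scheduling step every few seconds from a background loop, the way a
# periodic skylet event would.
def run_periodically(step_fn, interval_seconds, stop_event):
    while not stop_event.is_set():
        step_fn()  # e.g. the job scheduler's scheduling step
        stop_event.wait(interval_seconds)
```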

except sqlite3.OperationalError:
    # Tables do not exist, create them.
    CURSOR.execute(''' CREATE TABLE pending_jobs(
        job_name TEXT,

Why do we use job_name instead of job_id as in L111? The job_name can be the same for different jobs.

# Tables do not exist, create them.
CURSOR.execute(''' CREATE TABLE pending_jobs(
    job_name TEXT,
    run_string TEXT,

nit: run_cmd?

    CURSOR.execute('SELECT * FROM pending_jobs LIMIT 0')
except sqlite3.OperationalError:
    # Tables do not exist, create them.
    CURSOR.execute(''' CREATE TABLE pending_jobs(

Should we create another pending_jobs table? I think we can add the run_string and submit columns to our original jobs table, so that we can get the job status and submit status together with:

_CURSOR.execute(
    "SELECT submit FROM jobs WHERE status='PENDING' ORDER BY job_id LIMIT 1")
submit = _CURSOR.fetchone()
if not submit:
    # submit the job
else:
    return

CONN.commit()


def run_top_if_possible() -> None:

We can refactor it a bit to support different strategy, but this might be fine for now.

@Michaelvll

A correctness note: we call set_pending for the spot tasks in the generated ray program.

if spot_task is not None:
    # Add the spot job to spot queue table.
    resources_str = backend_utils.get_task_resources_str(spot_task)
    self._code += [
        'from sky.spot import spot_state',
        f'spot_state.set_pending('
        f'{job_id}, {spot_task.name!r}, {resources_str!r})',
    ]

Since we don't immediately run the generated ray program when the job is submitted, the pending spot jobs will not appear in the spot queue. We need to move this check and the set_pending call before the following line.

codegen.add_prologue(job_id,

@Michaelvll Michaelvll self-requested a review November 10, 2022 00:58
@Michaelvll Michaelvll left a comment

Thank you for the fix @mraheja! We may need to be careful about the detached setup, which adds a SETTING_UP status for the job (#1379). For that, we may want to run the setup before the job goes to PENDING.

@@ -8,4 +8,3 @@

resources:
cloud: aws
accelerators: K80

remnant?

@@ -9,6 +9,5 @@

resources:
cloud: aws
accelerators: K80

We need the accelerators to check that job pending works correctly. Otherwise, all the jobs in the test will be submitted without staying in the pending queue.

Comment on lines 11 to 13
resources:
accelerators: K80:0.5


revert it?

@@ -262,6 +262,8 @@ def add_gang_scheduling_placement_group(
# it is waiting for other task to finish. We should hide the
# error message.
ray.get(pg.ready())
job_lib.scheduler.remove_job({self.job_id!r})

rename it? For example, how about job_lib.scheduler.set_scheduled({self.job_id!r})?

f'--address=http://127.0.0.1:8265 --submission-id {ray_job_id} '
'--no-wait '
'--no-wait -- '

Any reason we need this --?

def _get_jobs(self) -> Tuple:
    return _CURSOR.execute('SELECT * FROM pending_jobs ORDER BY job_id')

def run_next_if_possible(self) -> None:

How about we rename it to schedule_step(self)?

def _get_pending_jobs():
    rows = _CURSOR.execute('SELECT job_id, created_time FROM pending_jobs')
    rows = list(rows)
    return [int(row[0]) for row in rows], [int(row[1]) for row in rows]

How about we return a dict instead?

{
  job_id: created_time
  for job_id, created_time in rows
}

Comment on lines 443 to 444
idx = pending_jobs.index(job_id)
if start_times[idx] < psutil.boot_time():

Using the dict to get the created_time as mentioned above?
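A sketch of that dict-based lookup; the helper name `is_stale` and the explicit `boot_time` parameter (standing in for `psutil.boot_time()`) are assumptions for illustration:

```python
# With _get_pending_jobs returning {job_id: created_time}, the index-based
# lookup becomes a single dict access.
def is_stale(job_id, pending_jobs, boot_time):
    created_time = pending_jobs.get(job_id)
    # A pending job created before the last boot is stale.
    return created_time is not None and created_time < boot_time
```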

# job server fails.
logger.warning(str(e))
continue
with filelock.FileLock(_get_lock_path(job_id)):

Good catch!

Comment on lines 348 to 351
f'sky queue {name} | grep {name}-15 | grep RUNNING',
f'sky queue {name} | grep {name}-32 | grep RUNNING',
f'sky queue {name} | grep {name}-33 | grep PENDING',
f'sky queue {name} | grep {name}-50 | grep PENDING',

Nice! It is great to see the FIFO order works.

@Michaelvll Michaelvll self-requested a review January 16, 2023 03:06
@mraheja mraheja closed this Jan 26, 2023

mraheja commented Jan 26, 2023

moved to #1636
