-
Notifications
You must be signed in to change notification settings - Fork 212
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update queue selection to take walltime into account #1450
Changes from 3 commits
b091fb2
0a70675
0e3cc76
4379cad
dc84643
26f0ffc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,7 @@ | |
from CIME.XML.standard_module_setup import * | ||
from CIME.utils import format_time | ||
from CIME.XML.env_base import EnvBase | ||
from CIME.utils import transform_vars, get_cime_root | ||
from CIME.utils import transform_vars, get_cime_root, convert_to_seconds | ||
from copy import deepcopy | ||
|
||
logger = logging.getLogger(__name__) | ||
|
@@ -195,18 +195,31 @@ def set_job_defaults(self, batch_jobs, pesize=None, walltime=None, force_queue=N | |
else: | ||
task_count = int(task_count) | ||
|
||
queue = force_queue if force_queue is not None else self.select_best_queue(task_count, job) | ||
self.set_value("JOB_QUEUE", queue, subgroup=job) | ||
|
||
walltime = self.get_max_walltime(queue) if walltime is None else walltime | ||
if walltime is None: | ||
logger.warn("Could not find a queue matching task count %d, falling back to deprecated default walltime parameter"%task_count) | ||
#if the user names a queue which is not defined in config_batch.xml and does not set a | ||
#walltime, fall back to the max walltime in the default queue | ||
if force_queue: | ||
self.get_default_queue() | ||
walltime = self._default_walltime | ||
if force_queue: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is worth a close look. The behavior is basically the same as before except that the algorithm will override a user's requested walltime if the user's walltime is causing us to fail to match any queues. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In another issue (#1439), I was saying we should never override something explicitly set by the user. The lower walltime you fill in may not be long enough for what the user intended to do. They won't notice CIME submitted with a different walltime, the experiment will run and burn real time, and they'll find it didn't run as long as they wanted/asked for. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, that's true. I'm wondering how to support the following case: A test that uses few procs but takes a long time, the test lives in acme_integration.
With the changes below, the walltime is automatically reduced and the test will at least have a chance to run. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I could change the warning to something more conspicuous.
|
||
if not self.queue_meets_spec(force_queue, task_count, walltime=walltime, job=job): | ||
logger.warning("User-request queue '%s' does not meet requirements for job '%s'" % (force_queue, job)) | ||
else: | ||
queue = self.select_best_queue(task_count, walltime=walltime, job=job) | ||
if queue is None and walltime is not None: | ||
# Try to see if walltime was the holdup | ||
queue = self.select_best_queue(task_count, walltime=None, job=job) | ||
if queue is not None: | ||
# It was, override the walltime to avoid failure | ||
new_walltime = self.get_queue_specs(queue)[3] | ||
expect(new_walltime is not None, "Should never make it here") | ||
logger.warning("Requested walltime '%s' could not be matched by any queue, using '%s' instead" % (walltime, new_walltime)) | ||
walltime = new_walltime | ||
|
||
if queue is None: | ||
logger.warning("No queue on this system met the requirements for this job. Falling back to defaults") | ||
default_queue_node = self.get_default_queue() | ||
queue = default_queue_node.text | ||
walltime = self.get_queue_specs(queue)[3] | ||
|
||
walltime = self.get_queue_specs(queue)[3] if walltime is None else walltime | ||
walltime = self._default_walltime if walltime is None else walltime # last-chance fallback | ||
|
||
self.set_value("JOB_QUEUE", queue, subgroup=job) | ||
self.set_value("JOB_WALLCLOCK_TIME", walltime, subgroup=job) | ||
logger.debug("Job %s queue %s walltime %s" % (job, queue, walltime)) | ||
|
||
|
@@ -362,7 +375,6 @@ def _submit_single_job(self, case, job, depid=None, no_batch=False, batch_args=N | |
|
||
function_name = job.replace(".", "_") | ||
if not dry_run: | ||
function_name = job.replace(".", "_") | ||
locals()[function_name](case) | ||
|
||
return | ||
|
@@ -418,36 +430,64 @@ def get_job_id(self, output): | |
jobid = search_match.group(1) | ||
return jobid | ||
|
||
def select_best_queue(self, num_pes, job=None): | ||
def queue_meets_spec(self, queue, num_pes, walltime=None, job=None): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Basically just an encapsulation of what select_best_queue was doing. |
||
jobmin, jobmax, jobname, walltimemax, strict = self.get_queue_specs(queue) | ||
|
||
# A job name match automatically meets spec | ||
if job is not None and jobname is not None: | ||
return jobname == job | ||
|
||
if jobmin is not None and num_pes < int(jobmin): | ||
return False | ||
|
||
if jobmax is not None and num_pes > int(jobmax): | ||
return False | ||
|
||
if walltime is not None and walltimemax is not None and strict: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The "strict" concept is implemented here. |
||
walltime_s = convert_to_seconds(walltime) | ||
walltimemax_s = convert_to_seconds(walltimemax) | ||
if walltime_s > walltimemax_s: | ||
return False | ||
|
||
return True | ||
|
||
def select_best_queue(self, num_pes, walltime=None, job=None): | ||
# Make sure to check default queue first. | ||
all_queues = [] | ||
all_queues.append( self.get_default_queue()) | ||
all_queues = all_queues + self.get_all_queues() | ||
for queue in all_queues: | ||
if queue is not None: | ||
jobmin = queue.get("jobmin") | ||
jobmax = queue.get("jobmax") | ||
jobname = queue.get("jobname") | ||
if jobname is not None: | ||
if job == jobname: | ||
return queue.text | ||
# if the fullsum is between the min and max # jobs, then use this queue. | ||
elif jobmin is not None and jobmax is not None and num_pes >= int(jobmin) and num_pes <= int(jobmax): | ||
return queue.text | ||
qname = queue.text | ||
if self.queue_meets_spec(qname, num_pes, walltime=walltime, job=job): | ||
return qname | ||
|
||
return None | ||
|
||
def get_max_walltime(self, queue): | ||
def get_queue_specs(self, queue): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just a generalization of getting properties of a queue. |
||
""" | ||
Get queue specifications by name. | ||
|
||
Returns (jobmin, jobmax, jobname, walltimemax, is_strict) | ||
""" | ||
for queue_node in self.get_all_queues(): | ||
if queue_node.text == queue: | ||
return queue_node.get("walltimemax") | ||
jobmin = queue_node.get("jobmin") | ||
jobmax = queue_node.get("jobmax") | ||
jobname = queue_node.get("jobname") | ||
walltimemax = queue_node.get("walltimemax") | ||
strict = queue_node.get("strict") == "true" | ||
|
||
return jobmin, jobmax, jobname, walltimemax, strict | ||
|
||
expect(False, "Queue '%s' is unknown to this system" % queue) | ||
|
||
def get_default_queue(self): | ||
node = self.get_optional_node("queue", attributes={"default" : "true"}) | ||
if node is None: | ||
node = self.get_optional_node("queue") | ||
expect(node is not None, "No queues found") | ||
self._default_walltime = node.get("walltimemax") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wasn't comfortable with a method called "get_default_queue" having an important side effect. |
||
return(node) | ||
return node | ||
|
||
def get_all_queues(self): | ||
return self.get_nodes("queue") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"strict" is to deal with the fact that sometimes walltimemax acts as a default and sometimes it's truly the longest walltime that a queue will accept.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't adding "walltimedefault" be a better option? I always read "walltimemax" as the maximum walltime that is allowed for that queue.
(I feel the "strict" flag somehow doesn't convey the intent)