Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update queue selection to take walltime into account #1450

Merged
merged 6 commits into from
May 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions config/acme/machines/config_batch.xml
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@
<directive>-l nodes={{ num_nodes }}:ppn={{ tasks_per_node }}</directive>
</directives>
<queues>
<queue walltimemax="01:00:00" jobmin="1" jobmax="64">shared</queue>
<queue walltimemax="01:00:00" jobmin="1" jobmax="64" strict="true">shared</queue>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"strict" is to deal with the fact that sometimes walltimemax acts as a default and sometimes it's truly the longest walltime that a queue will accept.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't adding "walltimedefault" be a better option? I always read "walltimemax" as the maximum walltime that is allowed for that queue.
(I feel the "strict" flag somehow doesn't convey the intent)

<queue walltimemax="03:00:00" jobmin="64" jobmax="4096" default="true">batch</queue>
</queues>
</batch_system>
Expand All @@ -177,8 +177,8 @@
<!-- edison is SLURM as of Jan-4-2016 -->
<batch_system MACH="edison" type="slurm" >
<queues>
<queue walltimemax="00:30:00" jobmin="1" jobmax="12288" strict="true">debug</queue>
<queue walltimemax="01:30:00" jobmin="1" jobmax="150000" default="true">regular</queue>
<queue walltimemax="00:30:00" jobmin="1" jobmax="12288">debug</queue>
</queues>
</batch_system>

Expand All @@ -198,8 +198,8 @@
<directive> --constraint=haswell</directive>
</directives>
<queues>
<queue walltimemax="00:30:00" jobmin="1" jobmax="4096" strict="true">debug</queue>
<queue walltimemax="01:00:00" jobmin="1" jobmax="10000" default="true">regular</queue>
<queue walltimemax="00:30:00" jobmin="1" jobmax="4096">debug</queue>
</queues>
</batch_system>

Expand All @@ -208,8 +208,8 @@
<directive> --constraint=knl,quad,cache</directive>
</directives>
<queues>
<queue walltimemax="00:30:00" jobmin="1" jobmax="100000" strict="true">debug</queue>
<queue walltimemax="01:00:00" jobmin="1" jobmax="3000000" default="true">regular</queue>
<queue walltimemax="00:30:00" jobmin="1" jobmax="100000">debug</queue>
</queues>
</batch_system>

Expand Down Expand Up @@ -329,8 +329,8 @@
<directive>-l nodes={{ num_nodes }}</directive>
</directives>
<queues>
<queue walltimemax="01:00:00" jobmin="0" jobmax="64" strict="true">debug</queue>
<queue walltimemax="24:00:00" default="true">batch</queue>
<queue walltimemax="01:00:00" jobmin="0" jobmax="64">debug</queue>
</queues>
</batch_system>

Expand Down
3 changes: 2 additions & 1 deletion config/xml_schemas/config_batch.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
<xs:element minOccurs="0" ref="directives"/>

<!-- queues: The list of queue options for this machine, not all system queues need be listed
attributes of this field include walltimemin, walltimemax, jobmin and jobmax
attributes of this field include walltimemin, walltimemax, jobmin and jobmax and strict
default wallclock time for a job is the walltimemax in this field -->
<xs:element minOccurs="0" ref="queues"/>
</xs:sequence>
Expand Down Expand Up @@ -130,6 +130,7 @@
<xs:simpleContent>
<xs:extension base="xs:NCName">
<xs:attribute name="default" type="xs:boolean"/>
<xs:attribute name="strict" type="xs:boolean"/>
<xs:attribute name="jobmax" type="xs:integer"/>
<xs:attribute name="jobmin" type="xs:integer"/>
<xs:attribute name="jobname" type="xs:NCName"/>
Expand Down
5 changes: 2 additions & 3 deletions scripts/create_test
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import argparse, math, glob

logger = logging.getLogger(__name__)


###############################################################################
def parse_command_line(args, description):
###############################################################################
Expand Down Expand Up @@ -420,8 +419,8 @@ def single_submit_impl(machine_name, test_id, proc_pool, project, args, job_cost
else:
wall_time_bab = wall_time

queue = env_batch.select_best_queue(proc_pool)
wall_time_max_bab = env_batch.get_max_walltime(queue)
queue = env_batch.select_best_queue(proc_pool, wall_time_bab)
wall_time_max_bab = env_batch.get_queue_specs(queue)[3]
if wall_time_max_bab is not None:
wall_time_max = convert_to_seconds(wall_time_max_bab)
if wall_time_max < wall_time:
Expand Down
100 changes: 72 additions & 28 deletions scripts/lib/CIME/XML/env_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from CIME.XML.standard_module_setup import *
from CIME.utils import format_time
from CIME.XML.env_base import EnvBase
from CIME.utils import transform_vars, get_cime_root
from CIME.utils import transform_vars, get_cime_root, convert_to_seconds
from copy import deepcopy

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -181,7 +181,7 @@ def make_batch_script(self, input_template, job, case, total_tasks, tasks_per_no
fd.write(output_text)
os.chmod(job, os.stat(job).st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)

def set_job_defaults(self, batch_jobs, pesize=None, walltime=None, force_queue=None):
def set_job_defaults(self, batch_jobs, pesize=None, walltime=None, force_queue=None, allow_walltime_override=False):
if self.batchtype is None:
self.batchtype = self.get_batch_system_type()

Expand All @@ -195,18 +195,35 @@ def set_job_defaults(self, batch_jobs, pesize=None, walltime=None, force_queue=N
else:
task_count = int(task_count)

queue = force_queue if force_queue is not None else self.select_best_queue(task_count, job)
self.set_value("JOB_QUEUE", queue, subgroup=job)

walltime = self.get_max_walltime(queue) if walltime is None else walltime
if walltime is None:
logger.warn("Could not find a queue matching task count %d, falling back to deprecated default walltime parameter"%task_count)
#if the user names a queue which is not defined in config_batch.xml and does not set a
#walltime, fall back to the max walltime in the default queue
if force_queue:
self.get_default_queue()
walltime = self._default_walltime
if force_queue:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is worth a close look. The behavior is basically the same as before, except that the algorithm will override a user's requested walltime if that walltime is causing us to fail to match any queues.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In another issue (#1439), I was saying we should never override something explicitly set by the user. The lower walltime you fill in may not be long enough for what the user intended to do. They won't notice CIME submitted with a different walltime, the experiment will run and burn real time, and then they'll find it didn't run as long as they wanted/asked for.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's true. I'm wondering how to support the following case:

A test that uses few procs but takes a long time, and that lives in acme_integration:

  1. Due to being in acme_integration, 3 hours is requested
  2. Due to using a small number of procs, blues will try to put it in the debug queue

With the changes below, the walltime is automatically reduced and the test will at least have a chance to run.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could change the warning to something more conspicuous.

WARNING
WARNING Requested walltime '%s' could not be matched by any queue, using '%s' instead
WARNING

if not self.queue_meets_spec(force_queue, task_count, walltime=walltime, job=job):
logger.warning("WARNING: User-requested queue '%s' does not meet requirements for job '%s'" % (force_queue, job))
else:
queue = self.select_best_queue(task_count, walltime=walltime, job=job)
if queue is None and walltime is not None:
# Try to see if walltime was the holdup
queue = self.select_best_queue(task_count, walltime=None, job=job)
if queue is not None:
# It was, override the walltime if a test, otherwise just warn the user
new_walltime = self.get_queue_specs(queue)[3]
expect(new_walltime is not None, "Should never make it here")
logger.warning("WARNING: Requested walltime '%s' could not be matched by any queue" % walltime)
if allow_walltime_override:
logger.warning(" Using walltime '%s' instead" % new_walltime)
walltime = new_walltime
else:
logger.warning(" Continuing with suspect walltime, batch submission may fail")

if queue is None:
logger.warning("WARNING: No queue on this system met the requirements for this job. Falling back to defaults")
default_queue_node = self.get_default_queue()
queue = default_queue_node.text
walltime = self.get_queue_specs(queue)[3]

walltime = self.get_queue_specs(queue)[3] if walltime is None else walltime
walltime = self._default_walltime if walltime is None else walltime # last-chance fallback

self.set_value("JOB_QUEUE", queue, subgroup=job)
self.set_value("JOB_WALLCLOCK_TIME", walltime, subgroup=job)
logger.debug("Job %s queue %s walltime %s" % (job, queue, walltime))

Expand Down Expand Up @@ -362,7 +379,6 @@ def _submit_single_job(self, case, job, depid=None, no_batch=False, batch_args=N

function_name = job.replace(".", "_")
if not dry_run:
function_name = job.replace(".", "_")
locals()[function_name](case)

return
Expand Down Expand Up @@ -418,36 +434,64 @@ def get_job_id(self, output):
jobid = search_match.group(1)
return jobid

def select_best_queue(self, num_pes, job=None):
def queue_meets_spec(self, queue, num_pes, walltime=None, job=None):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically just an encapsulation of what select_best_queue was doing.

jobmin, jobmax, jobname, walltimemax, strict = self.get_queue_specs(queue)

# A job name match automatically meets spec
if job is not None and jobname is not None:
return jobname == job

if jobmin is not None and num_pes < int(jobmin):
return False

if jobmax is not None and num_pes > int(jobmax):
return False

if walltime is not None and walltimemax is not None and strict:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "strict" concept is implemented here.

walltime_s = convert_to_seconds(walltime)
walltimemax_s = convert_to_seconds(walltimemax)
if walltime_s > walltimemax_s:
return False

return True

def select_best_queue(self, num_pes, walltime=None, job=None):
# Make sure to check default queue first.
all_queues = []
all_queues.append( self.get_default_queue())
all_queues = all_queues + self.get_all_queues()
for queue in all_queues:
if queue is not None:
jobmin = queue.get("jobmin")
jobmax = queue.get("jobmax")
jobname = queue.get("jobname")
if jobname is not None:
if job == jobname:
return queue.text
# if the fullsum is between the min and max # jobs, then use this queue.
elif jobmin is not None and jobmax is not None and num_pes >= int(jobmin) and num_pes <= int(jobmax):
return queue.text
qname = queue.text
if self.queue_meets_spec(qname, num_pes, walltime=walltime, job=job):
return qname

return None

def get_max_walltime(self, queue):
def get_queue_specs(self, queue):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a generalization of getting properties of a queue.

"""
Get queue specifications by name.

Returns (jobmin, jobmax, jobname, walltimemax, is_strict)
"""
for queue_node in self.get_all_queues():
if queue_node.text == queue:
return queue_node.get("walltimemax")
jobmin = queue_node.get("jobmin")
jobmax = queue_node.get("jobmax")
jobname = queue_node.get("jobname")
walltimemax = queue_node.get("walltimemax")
strict = queue_node.get("strict") == "true"

return jobmin, jobmax, jobname, walltimemax, strict

expect(False, "Queue '%s' is unknown to this system" % queue)

def get_default_queue(self):
node = self.get_optional_node("queue", attributes={"default" : "true"})
if node is None:
node = self.get_optional_node("queue")
expect(node is not None, "No queues found")
self._default_walltime = node.get("walltimemax")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't comfortable with a method called "get_default_queue" having an important side effect.

return(node)
return node

def get_all_queues(self):
return self.get_nodes("queue")
Expand Down
3 changes: 2 additions & 1 deletion scripts/lib/CIME/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ def configure(self, compset_name, grid_name, machine_name=None,

env_batch.set_batch_system(batch, batch_system_type=batch_system_type)
env_batch.create_job_groups(bjobs)
env_batch.set_job_defaults(bjobs, pesize=maxval, walltime=walltime, force_queue=queue)
env_batch.set_job_defaults(bjobs, pesize=maxval, walltime=walltime, force_queue=queue, allow_walltime_override=test)
self.schedule_rewrite(env_batch)

#--------------------------------------------
Expand Down Expand Up @@ -810,6 +810,7 @@ def configure(self, compset_name, grid_name, machine_name=None,
if model == "cesm" and not test:
self.set_value("DOUT_S",True)
self.set_value("TIMER_LEVEL", 4)

if test:
self.set_value("TEST",True)

Expand Down
1 change: 0 additions & 1 deletion scripts/lib/CIME/case_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ def _case_setup_impl(case, caseroot, clean=False, test_mode=False, reset=False):
logger.debug("at update TOTALPES = %s"%pestot)
case.set_value("TOTALPES", pestot)
thread_count = env_mach_pes.get_max_thread_count(models)
build_threaded = case.get_build_threaded()
cost_pes = env_mach_pes.get_cost_pes(pestot, thread_count, machine=case.get_value("MACH"))
case.set_value("COST_PES", cost_pes)

Expand Down
1 change: 0 additions & 1 deletion scripts/lib/CIME/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,6 @@ def _create_newcase_phase(self, test):
create_newcase_cmd += " --mpilib %s" % self._mpilib
logger.debug (" MPILIB set to %s" % self._mpilib)


if self._queue is not None:
create_newcase_cmd += " --queue=%s" % self._queue

Expand Down
84 changes: 84 additions & 0 deletions scripts/tests/scripts_regression_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1402,6 +1402,90 @@ def test_cime_case_xmlchange_append(self):
result = run_cmd_assert_result(self, "./xmlquery --value PIO_CONFIG_OPTS", from_dir=casedir)
self.assertEqual(result, "-opt1 -opt2")

###########################################################################
def test_cime_case_test_walltime_mgmt_1(self):
###########################################################################
if CIME.utils.get_model() != "acme":
self.skipTest("Skipping walltime test. Depends on ACME batch settings")

test_name = "ERS.f19_g16_rx1.A"
machine, compiler = "blues", "gnu"
run_cmd_assert_result(self, "unset CIME_GLOBAL_WALLTIME && %s/create_test --no-setup --machine %s %s -t %s --test-root %s --output-root %s" %
(SCRIPT_DIR, machine, test_name, self._baseline_name, self._testroot, self._testroot))

casedir = os.path.join(self._testroot,
"%s.%s" % (CIME.utils.get_full_test_name(test_name, machine=machine, compiler=compiler), self._baseline_name))
self.assertTrue(os.path.isdir(casedir), msg="Missing casedir '%s'" % casedir)

result = run_cmd_assert_result(self, "./xmlquery JOB_WALLCLOCK_TIME --subgroup=case.test --value", from_dir=casedir)
self.assertEqual(result, "0:10:00")

result = run_cmd_assert_result(self, "./xmlquery JOB_QUEUE --subgroup=case.test --value", from_dir=casedir)
self.assertEqual(result, "shared")

###########################################################################
def test_cime_case_test_walltime_mgmt_2(self):
###########################################################################
if CIME.utils.get_model() != "acme":
self.skipTest("Skipping walltime test. Depends on ACME batch settings")

test_name = "ERS_P64.f19_g16_rx1.A"
machine, compiler = "blues", "gnu"
run_cmd_assert_result(self, "unset CIME_GLOBAL_WALLTIME && %s/create_test --no-setup --machine %s %s -t %s --test-root %s --output-root %s" %
(SCRIPT_DIR, machine, test_name, self._baseline_name, self._testroot, self._testroot))

casedir = os.path.join(self._testroot,
"%s.%s" % (CIME.utils.get_full_test_name(test_name, machine=machine, compiler=compiler), self._baseline_name))
self.assertTrue(os.path.isdir(casedir), msg="Missing casedir '%s'" % casedir)

result = run_cmd_assert_result(self, "./xmlquery JOB_WALLCLOCK_TIME --subgroup=case.test --value", from_dir=casedir)
self.assertEqual(result, "03:00:00")

result = run_cmd_assert_result(self, "./xmlquery JOB_QUEUE --subgroup=case.test --value", from_dir=casedir)
self.assertEqual(result, "batch")

###########################################################################
def test_cime_case_test_walltime_mgmt_3(self):
###########################################################################
if CIME.utils.get_model() != "acme":
self.skipTest("Skipping walltime test. Depends on ACME batch settings")

test_name = "ERS_P64.f19_g16_rx1.A"
machine, compiler = "blues", "gnu"
run_cmd_assert_result(self, "unset CIME_GLOBAL_WALLTIME && %s/create_test --no-setup --machine %s %s -t %s --test-root %s --output-root %s --walltime='0:10:00'" %
(SCRIPT_DIR, machine, test_name, self._baseline_name, self._testroot, self._testroot))

casedir = os.path.join(self._testroot,
"%s.%s" % (CIME.utils.get_full_test_name(test_name, machine=machine, compiler=compiler), self._baseline_name))
self.assertTrue(os.path.isdir(casedir), msg="Missing casedir '%s'" % casedir)

result = run_cmd_assert_result(self, "./xmlquery JOB_WALLCLOCK_TIME --subgroup=case.test --value", from_dir=casedir)
self.assertEqual(result, "0:10:00")

result = run_cmd_assert_result(self, "./xmlquery JOB_QUEUE --subgroup=case.test --value", from_dir=casedir)
self.assertEqual(result, "batch") # Not smart enough to select faster queue

###########################################################################
def test_cime_case_test_walltime_mgmt_4(self):
###########################################################################
if CIME.utils.get_model() != "acme":
self.skipTest("Skipping walltime test. Depends on ACME batch settings")

test_name = "ERS_P1.f19_g16_rx1.A"
machine, compiler = "blues", "gnu"
run_cmd_assert_result(self, "unset CIME_GLOBAL_WALLTIME && %s/create_test --no-setup --machine %s %s -t %s --test-root %s --output-root %s --walltime='2:00:00'" %
(SCRIPT_DIR, machine, test_name, self._baseline_name, self._testroot, self._testroot))

casedir = os.path.join(self._testroot,
"%s.%s" % (CIME.utils.get_full_test_name(test_name, machine=machine, compiler=compiler), self._baseline_name))
self.assertTrue(os.path.isdir(casedir), msg="Missing casedir '%s'" % casedir)

result = run_cmd_assert_result(self, "./xmlquery JOB_WALLCLOCK_TIME --subgroup=case.test --value", from_dir=casedir)
self.assertEqual(result, "01:00:00")

result = run_cmd_assert_result(self, "./xmlquery JOB_QUEUE --subgroup=case.test --value", from_dir=casedir)
self.assertEqual(result, "shared")

###############################################################################
class X_TestSingleSubmit(TestCreateTestCommon):
###############################################################################
Expand Down