Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to cancel batch jobs to the system #1932

Merged
merged 4 commits into from
Oct 2, 2017
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions config/acme/machines/config_batch.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<batch_system type="template" >
<batch_query args=""></batch_query>
<batch_submit></batch_submit>
<batch_cancel></batch_cancel>
<batch_redirect></batch_redirect>
<batch_directive></batch_directive>
<directives>
Expand All @@ -26,6 +27,7 @@
<batch_system type="none" >
<batch_query args=""></batch_query>
<batch_submit></batch_submit>
<batch_cancel></batch_cancel>
<batch_redirect></batch_redirect>
<batch_directive></batch_directive>
<directives>
Expand All @@ -36,6 +38,7 @@
<batch_system type="cobalt" >
<batch_query>qstat</batch_query>
<batch_submit>qsub</batch_submit>
<batch_cancel>qdel</batch_cancel>
<batch_directive></batch_directive>
<jobid_pattern>(\d+)</jobid_pattern>
<depend_string> --dependencies</depend_string>
Expand All @@ -56,6 +59,7 @@
<batch_system type="cobalt_theta" >
<batch_query>qstat</batch_query>
<batch_submit>qsub</batch_submit>
<batch_cancel>qdel</batch_cancel>
<batch_directive>#COBALT</batch_directive>
<jobid_pattern>(\d+)</jobid_pattern>
<depend_string> --dependencies</depend_string>
Expand All @@ -74,6 +78,7 @@
<batch_system type="lsf" version="9.1">
<batch_query args=" -w" >bjobs</batch_query>
<batch_submit>bsub</batch_submit>
<batch_cancel>bkill</batch_cancel>
<batch_redirect>&lt;</batch_redirect>
<batch_directive>#BSUB</batch_directive>
<jobid_pattern>&lt;(\d+)&gt;</jobid_pattern>
Expand Down Expand Up @@ -101,6 +106,7 @@
<batch_system type="pbs" >
<batch_query args="-f" >qstat</batch_query>
<batch_submit>qsub </batch_submit>
<batch_cancel>qdel</batch_cancel>
<batch_directive>#PBS</batch_directive>
<jobid_pattern>^(\S+)$</jobid_pattern>
<depend_string> -W depend=afterok:jobid</depend_string>
Expand All @@ -125,6 +131,7 @@
<batch_system type="moab" >
<batch_query>showq</batch_query>
<batch_submit>msub </batch_submit>
<batch_cancel>canceljob</batch_cancel>
<batch_directive>#MSUB</batch_directive>
<jobid_pattern>(\d+)$</jobid_pattern>
<depend_string> -W depend=afterok:jobid</depend_string>
Expand All @@ -147,6 +154,7 @@
<!-- for cab llnl -->
<batch_system type="lc_slurm">
<batch_submit>msub</batch_submit>
<batch_cancel>canceljob</batch_cancel>
<batch_directive>#MSUB</batch_directive>
<jobid_pattern>(\d+)$</jobid_pattern>
<depend_string> -l depend=jobid</depend_string>
Expand All @@ -170,6 +178,7 @@
<batch_system type="slurm" >
<batch_query per_job_arg="-j">squeue</batch_query>
<batch_submit>sbatch</batch_submit>
<batch_cancel>scancel</batch_cancel>
<batch_directive>#SBATCH</batch_directive>
<jobid_pattern>(\d+)$</jobid_pattern>
<depend_string> --dependency=afterok:jobid</depend_string>
Expand Down
5 changes: 5 additions & 0 deletions config/cesm/machines/config_batch.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
<batch_system type="cobalt" >
<batch_query>qstat</batch_query>
<batch_submit>qsub</batch_submit>
<batch_cancel>qdel</batch_cancel>
<batch_directive></batch_directive>
<jobid_pattern>(\d+)</jobid_pattern>
<depend_string> --dependencies</depend_string>
Expand All @@ -72,6 +73,7 @@
<batch_system type="cobalt_theta" >
<batch_query>qstat</batch_query>
<batch_submit>qsub</batch_submit>
<batch_cancel>qdel</batch_cancel>
<batch_directive>#COBALT</batch_directive>
<jobid_pattern>(\d+)</jobid_pattern>
<depend_string> --dependencies</depend_string>
Expand All @@ -90,6 +92,7 @@
<batch_system type="lsf" version="9.1">
<batch_query args=" -w" >bjobs</batch_query>
<batch_submit>bsub</batch_submit>
<batch_cancel>bkill</batch_cancel>
<batch_redirect>&lt;</batch_redirect>
<batch_directive>#BSUB</batch_directive>
<jobid_pattern>&lt;(\d+)&gt;</jobid_pattern>
Expand Down Expand Up @@ -117,6 +120,7 @@
<batch_system type="pbs" >
<batch_query args="-f" >qstat</batch_query>
<batch_submit>qsub </batch_submit>
<batch_cancel>qdel</batch_cancel>
<batch_directive>#PBS</batch_directive>
<jobid_pattern>^(\S+)$</jobid_pattern>
<depend_string> -W depend=afterok:jobid</depend_string>
Expand All @@ -140,6 +144,7 @@

<batch_system type="slurm" >
<batch_query per_job_arg="-j">squeue</batch_query>
<batch_cancel>scancel</batch_cancel>
<batch_directive>#SBATCH</batch_directive>
<jobid_pattern>(\d+)$</jobid_pattern>
<depend_string> --dependency=afterok:jobid</depend_string>
Expand Down
4 changes: 4 additions & 0 deletions config/xml_schemas/config_batch.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

<!-- simple elements -->
<xs:element name="batch_submit" type="xs:string"/>
<xs:element name="batch_cancel" type="xs:string"/>
<xs:element name="batch_redirect" type="xs:string"/>
<xs:element name="batch_directive" type="xs:string"/>
<xs:element name="jobid_pattern" type="xs:string"/>
Expand Down Expand Up @@ -45,6 +46,9 @@
<!-- batch_submit: The command used to submit jobs in this batch_system. -->
<xs:element minOccurs="0" ref="batch_submit"/>

<!-- batch_cancel: The command used to cancel jobs in this batch_system. -->
<xs:element minOccurs="0" ref="batch_cancel"/>

<!-- batch_redirect: some batch systems (lsf) require the script to be read from stdin,
this is the shell redirect to handle that -->
<xs:element minOccurs="0" ref="batch_redirect"/>
Expand Down
14 changes: 14 additions & 0 deletions scripts/lib/CIME/XML/env_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,3 +571,17 @@ def get_status(self, jobid):
logger.warning("Batch query command '{}' failed with error '{}'".format(cmd, err))
else:
return out.strip()

def cancel_job(self, jobid):
    """
    Cancel a single batch job with the batch system's cancel command.

    The command is read from the optional "batch_cancel" node and the job
    id is appended to it as its only argument.

    jobid: identifier of the job to cancel (converted with str()).

    Returns True if the cancel command exited with status 0. Returns False
    if no cancel command is configured or if the command failed; a warning
    is logged in both of those cases.
    """
    batch_cancel = self.get_optional_node("batch_cancel")
    # An empty <batch_cancel></batch_cancel> element may carry no text
    # (None), so treat it like a missing node instead of failing on the
    # string concatenation below.
    cancel_cmd = None if batch_cancel is None else batch_cancel.text
    if not cancel_cmd or not cancel_cmd.strip():
        logger.warning("Batch cancellation not supported on this platform")
        return False

    cmd = cancel_cmd + " " + str(jobid)

    status, out, err = run_cmd(cmd)
    if status != 0:
        logger.warning("Batch cancel command '{}' failed with error '{}'".format(cmd, out + "\n" + err))
        # Report the failure explicitly rather than falling through to None.
        return False

    return True
26 changes: 24 additions & 2 deletions scripts/lib/CIME/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -1155,20 +1155,41 @@ def submit_jobs(self, no_batch=False, job=None, skip_pnl=False,
mail_type=mail_type, batch_args=batch_args,
dry_run=dry_run)

def report_job_status(self):
def get_job_info(self):
    """
    Get information on batch jobs associated with this case

    Reads the JOB_IDS value, which is parsed as ", "-separated entries of
    the form "jobname:jobid".

    Returns a dict mapping each job name to its job id (both strings), or
    an empty dict when JOB_IDS is unset or empty.
    """
    xml_job_ids = self.get_value("JOB_IDS")
    if not xml_job_ids:
        # Nothing is logged here: report_job_status emits the
        # "No job ids..." message itself when it receives an empty result.
        return {}

    result = {}
    for job_info in xml_job_ids.split(", "): # pylint: disable=no-member
        # Split on the first colon only so that a job id which itself
        # contains ":" is kept whole instead of raising ValueError.
        jobname, jobid = job_info.split(":", 1)
        result[jobname] = jobid

    return result

def report_job_status(self):
    """
    Log the batch status of every job recorded for this case.

    Job names and ids come from get_job_info(). For each job the batch
    environment's get_status() is queried; a job with no status is reported
    as possibly complete. If no jobs are recorded, a single informational
    message is logged instead.
    """
    jobmap = self.get_job_info()
    if not jobmap:
        logger.info("No job ids associated with this case. Either case.submit was not run or was run with no-batch")
        return

    # Look the batch environment up once instead of once per job.
    env_batch = self.get_env("batch")
    # items() is available on both Python 2 and Python 3 dicts, whereas
    # iteritems() only exists on Python 2.
    for jobname, jobid in jobmap.items():
        status = env_batch.get_status(jobid)
        if status:
            logger.info("{}: {}".format(jobname, status))
        else:
            logger.info("{}: Unable to get status. Job may be complete already.".format(jobname))

def cancel_batch_jobs(self, jobids):
    """
    Ask the batch environment to cancel each of the given job ids.

    jobids: iterable of job identifiers, each passed to cancel_job().

    A warning is logged for every job whose cancellation did not report
    success; nothing is returned.
    """
    batch_env = self.get_env('batch')
    for job in jobids:
        if not batch_env.cancel_job(job):
            logger.warning("Failed to kill {}".format(job))

def get_mpirun_cmd(self, job="case.run"):
env_mach_specific = self.get_env('mach_specific')
run_exe = env_mach_specific.get_value("run_exe")
Expand Down Expand Up @@ -1396,6 +1417,7 @@ def create(self, casename, srcroot, compset_name, grid_name,
logger.warn("Leaving broken case dir {}".format(self._caseroot))

raise

def create_clone(self, newcase, keepexe=False, mach_dir=None, project=None,
cime_output_root=None, exeroot=None, rundir=None,
user_mods_dir=None):
Expand Down
8 changes: 0 additions & 8 deletions scripts/lib/CIME/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1118,14 +1118,6 @@ def transform_vars(text, case=None, subgroup=None, overrides=None, default=None)

return text

def get_my_queued_jobs():
# TODO
return []

def delete_jobs(_):
# TODO
return True

def wait_for_unlocked(filepath):
locked = True
file_object = None
Expand Down
2 changes: 1 addition & 1 deletion scripts/lib/CIME/wait_for_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def create_cdash_test_xml(results, cdash_build_name, cdash_build_group, utc_time
site_elem = xmlet.Element("Site")

if ("JENKINS_START_TIME" in os.environ):
time_info_str = "Total testing time: {:d} seconds".format(current_time - int(os.environ["JENKINS_START_TIME"]))
time_info_str = "Total testing time: {:d} seconds".format(int(current_time) - int(os.environ["JENKINS_START_TIME"]))
else:
time_info_str = ""

Expand Down
41 changes: 13 additions & 28 deletions scripts/lib/jenkins_generic_job.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
import CIME.wait_for_tests
from CIME.utils import expect
from CIME.case import Case

import os, shutil, glob, signal, logging

###############################################################################
def cleanup_queue(set_of_jobs_we_created):
def cleanup_queue(test_root, test_id):
###############################################################################
"""
Delete all jobs left in the queue
"""
current_jobs = set(CIME.utils.get_my_queued_jobs())
jobs_to_delete = set_of_jobs_we_created & current_jobs
for teststatus_file in glob.glob("{}/*{}*/TestStatus".format(test_root, test_id)):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this should be glob.iglob ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why iglob?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

glob.iglob(pathname)
Return an iterator which yields the same values as glob() without actually storing them all simultaneously.

seems that is all that you need here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, yes that would be better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

case_dir = os.path.dirname(teststatus_file)
with Case(case_dir, read_only=True) as case:
jobmap = case.get_job_info()
jobkills = []
for jobname, jobid in jobmap.iteritems():
logging.warning("Found leftover batch job {} ({}) that need to be deleted".format(jobid, jobname))
jobkills.append(jobid)

if (jobs_to_delete):
logging.warning("Found leftover batch jobs that need to be deleted: {}".format(", ".join(jobs_to_delete)))
success = CIME.utils.delete_jobs(jobs_to_delete)
if not success:
logging.warning("FAILED to clean up leftover jobs!")
case.cancel_batch_jobs(jobkills)

###############################################################################
def jenkins_generic_job(generate_baselines, submit_to_cdash, no_batch,
Expand Down Expand Up @@ -77,14 +80,6 @@ def jenkins_generic_job(generate_baselines, submit_to_cdash, no_batch,
else:
os.remove(old_file)

#
# Make note of things already in the queue so we know not to delete
# them if we timeout
#
preexisting_queued_jobs = []
if (use_batch):
preexisting_queued_jobs = CIME.utils.get_my_queued_jobs()

#
# Set up create_test command and run it
#
Expand Down Expand Up @@ -118,16 +113,6 @@ def jenkins_generic_job(generate_baselines, submit_to_cdash, no_batch,
expect(create_test_stat in [0, CIME.utils.TESTS_FAILED_ERR_CODE, -signal.SIGTERM],
"Create_test script FAILED with error code '{:d}'!".format(create_test_stat))

if (use_batch):
# This is not foolproof. Any jobs that happened to be
# submitted by this user while create_test was running will be
# potentially deleted. This is still a big improvement over the
# previous implementation which just assumed all queued jobs for this
# user came from create_test.
# TODO: change this to probe test_root for jobs ids
#
our_jobs = set(CIME.utils.get_my_queued_jobs()) - set(preexisting_queued_jobs)

#
# Wait for tests
#
Expand All @@ -146,8 +131,8 @@ def jenkins_generic_job(generate_baselines, submit_to_cdash, no_batch,
cdash_build_name=cdash_build_name,
cdash_project=cdash_project,
cdash_build_group=cdash_build_group)
if (not tests_passed and use_batch and CIME.wait_for_tests.SIGNAL_RECEIVED):
if use_batch and CIME.wait_for_tests.SIGNAL_RECEIVED:
# Cleanup
cleanup_queue(our_jobs)
cleanup_queue(test_root, test_id)

return tests_passed