diff --git a/config/acme/machines/config_batch.xml b/config/acme/machines/config_batch.xml index bdb6ebfcbb5..d0bf5bc13f8 100644 --- a/config/acme/machines/config_batch.xml +++ b/config/acme/machines/config_batch.xml @@ -16,6 +16,7 @@ + @@ -26,6 +27,7 @@ + @@ -36,6 +38,7 @@ qstat qsub + qdel (\d+) --dependencies @@ -56,6 +59,7 @@ qstat qsub + qdel #COBALT (\d+) --dependencies @@ -74,6 +78,7 @@ bjobs bsub + bkill < #BSUB <(\d+)> @@ -101,6 +106,7 @@ qstat qsub + qdel #PBS ^(\S+)$ -W depend=afterok:jobid @@ -125,6 +131,7 @@ showq msub + canceljob #MSUB (\d+)$ -W depend=afterok:jobid @@ -147,6 +154,7 @@ msub + canceljob #MSUB (\d+)$ -l depend=jobid @@ -170,6 +178,7 @@ squeue sbatch + scancel #SBATCH (\d+)$ --dependency=afterok:jobid diff --git a/config/cesm/machines/config_batch.xml b/config/cesm/machines/config_batch.xml index e8f30ca0cad..6b570898970 100644 --- a/config/cesm/machines/config_batch.xml +++ b/config/cesm/machines/config_batch.xml @@ -51,6 +51,7 @@ qstat qsub + qdel (\d+) --dependencies @@ -72,6 +73,7 @@ qstat qsub + qdel #COBALT (\d+) --dependencies @@ -90,6 +92,7 @@ bjobs bsub + bkill < #BSUB <(\d+)> @@ -117,6 +120,7 @@ qstat qsub + qdel #PBS ^(\S+)$ -W depend=afterok:jobid @@ -140,6 +144,7 @@ squeue + scancel #SBATCH (\d+)$ --dependency=afterok:jobid diff --git a/config/xml_schemas/config_batch.xsd b/config/xml_schemas/config_batch.xsd index 7129e418a15..fcb7ce3e7fb 100644 --- a/config/xml_schemas/config_batch.xsd +++ b/config/xml_schemas/config_batch.xsd @@ -10,6 +10,7 @@ + @@ -45,6 +46,9 @@ + + + diff --git a/scripts/lib/CIME/XML/env_batch.py b/scripts/lib/CIME/XML/env_batch.py index bec2bad5515..0cacfa0032e 100644 --- a/scripts/lib/CIME/XML/env_batch.py +++ b/scripts/lib/CIME/XML/env_batch.py @@ -571,3 +571,17 @@ def get_status(self, jobid): logger.warning("Batch query command '{}' failed with error '{}'".format(cmd, err)) else: return out.strip() + + def cancel_job(self, jobid): + batch_cancel = self.get_optional_node("batch_cancel") + if batch_cancel is None: + logger.warning("Batch cancellation not supported on this platform") + return False + else: + cmd = batch_cancel.text + " " + str(jobid) + + status, out, err = run_cmd(cmd) + if status != 0: + logger.warning("Batch cancel command '{}' failed with error '{}'".format(cmd, out + "\n" + err)) + else: + return True diff --git a/scripts/lib/CIME/case.py b/scripts/lib/CIME/case.py index 0041ed79979..997c3a4c2d9 100644 --- a/scripts/lib/CIME/case.py +++ b/scripts/lib/CIME/case.py @@ -1155,20 +1155,41 @@ def submit_jobs(self, no_batch=False, job=None, skip_pnl=False, mail_type=mail_type, batch_args=batch_args, dry_run=dry_run) - def report_job_status(self): + def get_job_info(self): + """ + Get information on batch jobs associated with this case + """ xml_job_ids = self.get_value("JOB_IDS") if not xml_job_ids: - logger.info("No job ids associated with this case. Either case.submit was not run or was run with no-batch") + return {} else: + result = {} job_infos = xml_job_ids.split(", ") # pylint: disable=no-member for job_info in job_infos: jobname, jobid = job_info.split(":") + result[jobname] = jobid + + return result + + def report_job_status(self): + jobmap = self.get_job_info() + if not jobmap: + logger.info("No job ids associated with this case. Either case.submit was not run or was run with no-batch") + else: + for jobname, jobid in jobmap.iteritems(): status = self.get_env("batch").get_status(jobid) if status: logger.info("{}: {}".format(jobname, status)) else: logger.info("{}: Unable to get status. Job may be complete already.".format(jobname)) + def cancel_batch_jobs(self, jobids): + env_batch = self.get_env('batch') + for jobid in jobids: + success = env_batch.cancel_job(jobid) + if not success: + logger.warning("Failed to kill {}".format(jobid)) + def get_mpirun_cmd(self, job="case.run"): env_mach_specific = self.get_env('mach_specific') run_exe = env_mach_specific.get_value("run_exe") @@ -1396,6 +1417,7 @@ def create(self, casename, srcroot, compset_name, grid_name, logger.warn("Leaving broken case dir {}".format(self._caseroot)) raise + def create_clone(self, newcase, keepexe=False, mach_dir=None, project=None, cime_output_root=None, exeroot=None, rundir=None, user_mods_dir=None): diff --git a/scripts/lib/CIME/utils.py b/scripts/lib/CIME/utils.py index ac0e422a309..6fbef55c61b 100644 --- a/scripts/lib/CIME/utils.py +++ b/scripts/lib/CIME/utils.py @@ -1118,14 +1118,6 @@ def transform_vars(text, case=None, subgroup=None, overrides=None, default=None) return text -def get_my_queued_jobs(): - # TODO - return [] - -def delete_jobs(_): - # TODO - return True - def wait_for_unlocked(filepath): locked = True file_object = None diff --git a/scripts/lib/CIME/wait_for_tests.py b/scripts/lib/CIME/wait_for_tests.py index edfe9862552..a9b1d5ca6ef 100644 --- a/scripts/lib/CIME/wait_for_tests.py +++ b/scripts/lib/CIME/wait_for_tests.py @@ -56,7 +56,7 @@ def create_cdash_test_xml(results, cdash_build_name, cdash_build_group, utc_time site_elem = xmlet.Element("Site") if ("JENKINS_START_TIME" in os.environ): - time_info_str = "Total testing time: {:d} seconds".format(current_time - int(os.environ["JENKINS_START_TIME"])) + time_info_str = "Total testing time: {:d} seconds".format(int(current_time) - int(os.environ["JENKINS_START_TIME"])) else: time_info_str = "" diff --git a/scripts/lib/jenkins_generic_job.py b/scripts/lib/jenkins_generic_job.py index c3cc9b1947b..c575cd74669 100644 --- a/scripts/lib/jenkins_generic_job.py +++ b/scripts/lib/jenkins_generic_job.py @@ -1,22 +1,25 @@ import CIME.wait_for_tests from CIME.utils import expect +from CIME.case import Case import os, shutil, glob, signal, logging ############################################################################### -def cleanup_queue(set_of_jobs_we_created): +def cleanup_queue(test_root, test_id): ############################################################################### """ Delete all jobs left in the queue """ - current_jobs = set(CIME.utils.get_my_queued_jobs()) - jobs_to_delete = set_of_jobs_we_created & current_jobs + for teststatus_file in glob.iglob("{}/*{}*/TestStatus".format(test_root, test_id)): + case_dir = os.path.dirname(teststatus_file) + with Case(case_dir, read_only=True) as case: + jobmap = case.get_job_info() + jobkills = [] + for jobname, jobid in jobmap.iteritems(): + logging.warning("Found leftover batch job {} ({}) that need to be deleted".format(jobid, jobname)) + jobkills.append(jobid) - if (jobs_to_delete): - logging.warning("Found leftover batch jobs that need to be deleted: {}".format(", ".join(jobs_to_delete))) - success = CIME.utils.delete_jobs(jobs_to_delete) - if not success: - logging.warning("FAILED to clean up leftover jobs!") + case.cancel_batch_jobs(jobkills) ############################################################################### def jenkins_generic_job(generate_baselines, submit_to_cdash, no_batch, @@ -77,14 +80,6 @@ def jenkins_generic_job(generate_baselines, submit_to_cdash, no_batch, else: os.remove(old_file) - # - # Make note of things already in the queue so we know not to delete - # them if we timeout - # - preexisting_queued_jobs = [] - if (use_batch): - preexisting_queued_jobs = CIME.utils.get_my_queued_jobs() - # # Set up create_test command and run it # @@ -118,16 +113,6 @@ def jenkins_generic_job(generate_baselines, submit_to_cdash, no_batch, expect(create_test_stat in [0, CIME.utils.TESTS_FAILED_ERR_CODE, -signal.SIGTERM], "Create_test script FAILED with error code '{:d}'!".format(create_test_stat)) - if (use_batch): - # This is not fullproof. Any jobs that happened to be - # submitted by this user while create_test was running will be - # potentially deleted. This is still a big improvement over the - # previous implementation which just assumed all queued jobs for this - # user came from create_test. - # TODO: change this to probe test_root for jobs ids - # - our_jobs = set(CIME.utils.get_my_queued_jobs()) - set(preexisting_queued_jobs) - # # Wait for tests # @@ -146,8 +131,8 @@ def jenkins_generic_job(generate_baselines, submit_to_cdash, no_batch, cdash_build_name=cdash_build_name, cdash_project=cdash_project, cdash_build_group=cdash_build_group) - if (not tests_passed and use_batch and CIME.wait_for_tests.SIGNAL_RECEIVED): + if use_batch and CIME.wait_for_tests.SIGNAL_RECEIVED: # Cleanup - cleanup_queue(our_jobs) + cleanup_queue(test_root, test_id) return tests_passed