From c0087c29d6259cd8f48436fcdc9a0b65586b8c08 Mon Sep 17 00:00:00 2001
From: TSL-RamKrishna
Date: Thu, 17 Dec 2020 22:40:39 +0000
Subject: [PATCH 01/13] slurm settings 1

---
 pyani/pyani_config.py                        |   2 +-
 pyani/pyani_jobs.py                          |   5 +-
 pyani/run_sge.py                             |  26 +-
 pyani/run_slurm.py                           | 357 +++++++++++++++++++
 pyani/scripts/average_nucleotide_identity.py |  23 +-
 5 files changed, 387 insertions(+), 26 deletions(-)
 create mode 100644 pyani/run_slurm.py

diff --git a/pyani/pyani_config.py b/pyani/pyani_config.py
index b79de9c1..3a8a5304 100644
--- a/pyani/pyani_config.py
+++ b/pyani/pyani_config.py
@@ -54,7 +54,7 @@ BLASTALL_DEFAULT = Path("blastall")
 FORMATDB_DEFAULT = Path("formatdb")
 QSUB_DEFAULT = Path("qsub")
-
+SLURM_DEFAULT = Path("sbatch")
 # Stems for output files
 ANIM_FILESTEMS = (
     "ANIm_alignment_lengths",
diff --git a/pyani/pyani_jobs.py b/pyani/pyani_jobs.py
index 1728c518..5dfa8d1b 100644
--- a/pyani/pyani_jobs.py
+++ b/pyani/pyani_jobs.py
@@ -117,7 +117,8 @@ def wait(self, interval: float = SGE_WAIT) -> None:
         while not self.finished:
             time.sleep(interval)
             interval = min(2.0 * interval, 60)
-            self.finished = os.system(f"qstat -j {self.name} > /dev/null")
+            #self.finished = os.system(f"qstat -j {self.name} > /dev/null")
+            self.finished = os.system(f"squeue -j {self.name} > /dev/null")
 
 
 class JobGroup(object):
@@ -222,4 +223,4 @@ def wait(self, interval: float = SGE_WAIT) -> None:
         while not self.finished:
             time.sleep(interval)
             interval = min(2 * interval, 60)
-            self.finished = os.system("qstat -j %s > /dev/null" % (self.name))
+            self.finished = os.system("squeue -j %s > /dev/null" % (self.name))
diff --git a/pyani/run_sge.py b/pyani/run_sge.py
index 4473756a..3cb1ee2b 100644
--- a/pyani/run_sge.py
+++ b/pyani/run_sge.py
@@ -117,7 +117,7 @@ def run_dependency_graph(
     jobgraph,
     jgprefix: str = "ANIm_SGE_JG",
     sgegroupsize: int = 10000,
-    sgeargs: Optional[str] = None,
+    schedulerargs: Optional[str] = None,
 ) -> None:
     """Create and runs SGE scripts for jobs based on passed jobgraph.
 
     :param jobgraph: list of jobs, which may have dependencies.
     :param verbose: flag for multiprocessing verbosity
     :param jgprefix: a prefix for the submitted jobs, in the scheduler
     :param sgegroupsize: the maximum size for an array job submission
-    :param sgeargs: additional arguments to qsub
+    :param schedulerargs: additional arguments to qsub
 
     The strategy here is to loop over each job in the dependency graph
     and, because we expect a single main delta-filter (wrapped) job,
@@ -180,7 +180,7 @@ def run_dependency_graph(
     logger.info("Jobs passed to scheduler in order:")
     for job in jobgroups:
         logger.info("\t%s" % job.name)
-    build_and_submit_jobs(Path.cwd(), jobgroups, sgeargs)
+    build_and_submit_jobs(Path.cwd(), jobgroups, schedulerargs)
     logger.info("Waiting for SGE-submitted jobs to finish (polling)")
     for job in jobgroups:
         job.wait()
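The two `wait()` hunks above keep pyani's exit-status polling idiom: `qstat -j <name>` returns zero while the job is still known to the scheduler, so a truthy return value marks the job finished. Note that `squeue -j` expects a numeric job ID rather than a name; a later patch in this series (PATCH 02) switches to `squeue -n`. A minimal, self-contained sketch of the same backoff loop using `subprocess` — the helper names here are illustrative, not part of pyani:

```python
import subprocess
import time


def slurm_job_listed(jobname: str) -> bool:
    """Return True while squeue still lists a job with the given name."""
    result = subprocess.run(
        ["squeue", "--noheader", "--name", jobname],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=True,
    )
    return bool(result.stdout.strip())


def wait_for_job(jobname: str, interval: float = 0.01) -> None:
    """Poll with exponential backoff, capped at 60 s, as Job.wait() does."""
    while slurm_job_listed(jobname):
        time.sleep(interval)
        interval = min(2.0 * interval, 60)
```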
@@ -263,13 +263,13 @@ def extract_submittable_jobs(waiting: List) -> List:
 
 
 def submit_safe_jobs(
-    root_dir: Path, jobs: Iterable, sgeargs: Optional[str] = None
+    root_dir: Path, jobs: Iterable, schedulerargs: Optional[str] = None
 ) -> None:
     """Submit passed list of jobs to SGE server with dir as root for output.
 
     :param root_dir: path to output directory
     :param jobs: iterable of Job objects
-    :param sgeargs: str, additional arguments for qsub
+    :param schedulerargs: str, additional arguments for qsub
     """
     logger = logging.getLogger(__name__)
     logger.debug("Received %s jobs", len(jobs))
@@ -304,20 +304,20 @@ def submit_safe_jobs(
         # Build the qsub SGE commandline (passing local environment)
         qsubcmd = f"{pyani_config.QSUB_DEFAULT} -V {args} {job.scriptpath}"
-        if sgeargs is not None:
-            qsubcmd = f"{qsubcmd} {sgeargs}"
+        if schedulerargs is not None:
+            qsubcmd = f"{qsubcmd} {schedulerargs}"
         # We've considered Bandit warnings B404,B603 and silence
         # subprocess.call(qsubcmd, shell=False)  # nosec
         os.system(qsubcmd)
         job.submitted = True  # Set the job's submitted flag to True
 
 
-def submit_jobs(root_dir: Path, jobs: Iterable, sgeargs: Optional[str] = None) -> None:
+def submit_jobs(root_dir: Path, jobs: Iterable, schedulerargs: Optional[str] = None) -> None:
     """Submit passed jobs to SGE server with passed directory as root.
 
     :param root_dir: path to output directory
     :param jobs: list of Job objects
-    :param sgeargs: str, additional arguments for qsub
+    :param schedulerargs: str, additional arguments for qsub
     """
     waiting = list(jobs)  # List of jobs still to be done
     # Loop over the list of pending jobs, while there still are any
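For orientation, the qsub-to-sbatch translation that this series applies in the new `run_slurm.py` (below) can be summarised as follows. The two "default" entries describe standard sbatch behaviour and are assumptions, not something the patch itself states:

```python
# SGE (qsub) flag -> SLURM (sbatch) equivalent, as used in this patch series
SGE_TO_SLURM = {
    "-N <name>": "-J <name>",               # job name
    "-t 1:<n>": "--array=1-<n>",            # array job task range
    "-hold_jid <names>": "--dependency=afterok:<jobids>",  # deps take IDs, not names
    "-V": "",    # sbatch exports the submission environment by default
    "-cwd": "",  # sbatch runs from the submission directory by default
}
```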
@@ -325,20 +325,20 @@ def submit_jobs(
         # extract submittable jobs
         submittable = extract_submittable_jobs(waiting)
         # run those jobs
-        submit_safe_jobs(root_dir, submittable, sgeargs)
+        submit_safe_jobs(root_dir, submittable, schedulerargs)
         # remove those from the waiting list
         for job in submittable:
             waiting.remove(job)
 
 
 def build_and_submit_jobs(
-    root_dir: Path, jobs: Iterable, sgeargs: Optional[str] = None
+    root_dir: Path, jobs: Iterable, schedulerargs: Optional[str] = None
 ) -> None:
     """Submit passed iterable of Job objects to SGE.
 
     :param root_dir: root directory for SGE and job output
     :param jobs: list of Job objects, describing each job to be submitted
-    :param sgeargs: str, additional arguments to qsub
+    :param schedulerargs: str, additional arguments to qsub
 
     This places SGE's output in the passed root directory
     """
@@ -350,4 +350,4 @@ def build_and_submit_jobs(
     # Build and submit the passed jobs
     build_directories(root_dir)  # build all necessary directories
     build_job_scripts(root_dir, jobs)  # build job scripts
-    submit_jobs(root_dir, jobs, sgeargs)  # submit the jobs to SGE
+    submit_jobs(root_dir, jobs, schedulerargs)  # submit the jobs to SGE
diff --git a/pyani/run_slurm.py b/pyani/run_slurm.py
new file mode 100644
index 00000000..88d1aa00
--- /dev/null
+++ b/pyani/run_slurm.py
@@ -0,0 +1,357 @@
+# -*- coding: utf-8 -*-
+# (c) The James Hutton Institute 2013-2019
+# (c) University of Strathclyde 2019
+# Author: Leighton Pritchard
+#
+# Contact:
+# leighton.pritchard@strath.ac.uk
+#
+# Leighton Pritchard,
+# Strathclyde Institute for Pharmacy and Biomedical Sciences,
+# Cathedral Street,
+# Glasgow,
+# G1 1XQ
+# Scotland,
+# UK
+#
+# The MIT License
+#
+# Copyright (c) 2013-2019 The James Hutton Institute
+# Copyright (c) 2019 University of Strathclyde
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+"""Code to run a set of command-line jobs using SLURM.
+
+For parallelisation on a multi-node system, we use some custom code to submit
+jobs.
+"""
+
+import itertools
+import logging
+import os
+
+from collections import defaultdict
+from pathlib import Path
+from typing import Dict, Generator, Iterable, List, Optional, Set
+
+from . import pyani_config
+from .pyani_jobs import Job, JobGroup
+
+
+def split_seq(iterable: Iterable, size: int) -> Generator:
+    """Split a passed iterable into chunks of a given size.
+
+    :param iterable: iterable
+    :param size: int, number of items to return in each chunk
+    """
+    elm = iter(iterable)
+    item = list(itertools.islice(elm, size))
+    while item:
+        yield item
+        item = list(itertools.islice(elm, size))
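A quick illustration of how `split_seq` batches a command list into array-sized chunks (a standalone copy of the helper above):

```python
import itertools


def split_seq(iterable, size):
    """Yield successive lists of at most `size` items."""
    elm = iter(iterable)
    item = list(itertools.islice(elm, size))
    while item:
        yield item
        item = list(itertools.islice(elm, size))


print(list(split_seq(range(7), 3)))
# [[0, 1, 2], [3, 4, 5], [6]]
```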
+
+
+# Build a list of SLURM jobs from a graph
+def build_joblist(jobgraph) -> List:
+    """Return a list of jobs, from a passed jobgraph.
+
+    :param jobgraph:
+    """
+    logger = logging.getLogger(__name__)
+
+    jobset = set()  # type: Set
+    for job in jobgraph:
+        jobset = populate_jobset(job, jobset, depth=1)
+
+    logger.debug("built jobset: %s", jobset)
+    return list(jobset)
+
+
+# Convert joblist into jobgroups
+def compile_jobgroups_from_joblist(
+    joblist: List, jgprefix: str, sgegroupsize: int
+) -> List:
+    """Return list of jobgroups, rather than list of jobs.
+
+    :param joblist:
+    :param jgprefix: str, prefix for SLURM jobgroup
+    :param sgegroupsize: int, number of jobs in each SLURM jobgroup
+    """
+    jobcmds = defaultdict(list)  # type: Dict[str, List[str]]
+    for job in joblist:
+        jobcmds[job.command.split(" ", 1)[0]].append(job.command)
+    jobgroups = []  # type: List
+    for cmds in list(jobcmds.items()):
+        # Break arglist up into batches of sgegroupsize (default: 10,000)
+        sublists = split_seq(cmds[1], sgegroupsize)
+        count = 0
+        for sublist in sublists:
+            count += 1
+            sge_jobcmdlist = [f'"{jc}"' for jc in sublist]
+            jobgroups.append(
+                JobGroup(
+                    f"{jgprefix}_{count}", "$cmds", arguments={"cmds": sge_jobcmdlist}
+                )
+            )
+    return jobgroups
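`compile_jobgroups_from_joblist()` buckets commands by executable name (the first whitespace-separated token) before batching, so nucmer jobs and their delta-filter wrapper jobs land in separate groups. A miniature sketch with invented commands:

```python
from collections import defaultdict

commands = [
    "nucmer --mum -p out/g1_vs_g2 g1.fna g2.fna",
    "nucmer --mum -p out/g2_vs_g1 g2.fna g1.fna",
    "delta_filter_wrapper.py delta-filter out/g1_vs_g2.delta out/g1_vs_g2.filter",
]
jobcmds = defaultdict(list)
for cmd in commands:
    jobcmds[cmd.split(" ", 1)[0]].append(cmd)
print({exe: len(cmds) for exe, cmds in jobcmds.items()})
# {'nucmer': 2, 'delta_filter_wrapper.py': 1}
```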
+
+
+# Run a job dependency graph, with SLURM
+def run_dependency_graph(
+    jobgraph,
+    jgprefix: str = "ANIm_SLURM_JG",
+    sgegroupsize: int = 1000,  # changed to 1000 from 10000
+    schedulerargs: Optional[str] = None,
+) -> None:
+    """Create and run SLURM scripts for jobs based on passed jobgraph.
+
+    :param jobgraph: list of jobs, which may have dependencies.
+    :param verbose: flag for multiprocessing verbosity
+    :param jgprefix: a prefix for the submitted jobs, in the scheduler
+    :param sgegroupsize: the maximum size for an array job submission
+    :param schedulerargs: additional arguments to sbatch
+
+    The strategy here is to loop over each job in the dependency graph
+    and, because we expect a single main delta-filter (wrapped) job,
+    with a single nucmer dependency for each analysis, we can split
+    the dependency graph into two lists of corresponding jobs, and
+    run the corresponding nucmer jobs before the delta-filter jobs.
+    """
+    logger = logging.getLogger(__name__)
+
+    logger.debug("Received jobgraph with %d jobs", len(jobgraph))
+
+
+    jobs_main = []  # Can be run first, before deps
+    jobs_deps = []  # Depend on the main jobs
+
+    # Try to be informative by telling the user what jobs will run
+    dep_count = 0  # how many dependencies are there
+    logger.info("Jobs to run with scheduler")
+    for job in jobgraph:
+        logger.info("{0}: {1}".format(job.name, job.command))
+        jobs_main.append(job)
+        if job.dependencies:
+            dep_count += len(job.dependencies)
+            for dep in job.dependencies:
+                logger.info("\t[^ depends on: %s (%s)]", dep.name, dep.command)
+                jobs_deps.append(dep)
+    logger.info("There are %d job dependencies" % dep_count)
+    # Clear dependencies in main group
+    for job in jobs_main:
+        job.dependencies = []
+
+    # We can use an array (or series of arrays) to schedule our jobs.
+    # This cuts down on social problems with long job lists choking up
+    # the queue.
+    # We split the main and dependent jobs into separate JobGroups.
+    # These JobGroups are paired, in order
+    logger.info("Compiling main and dependent jobs into separate JobGroups")
+    maingroups = compile_jobgroups_from_joblist(
+        jobs_main, jgprefix + "_main", sgegroupsize
+    )
+    depgroups = compile_jobgroups_from_joblist(
+        jobs_deps, jgprefix + "_deps", sgegroupsize
+    )
+
+    # Assign dependencies to jobgroups
+    for mgp, dgp in zip(maingroups, depgroups):
+        mgp.add_dependency(dgp)
+    jobgroups = maingroups + depgroups
+
+    # Send jobs to scheduler
+    logger.info("Running jobs with scheduler...")
+    logger.info("Jobs passed to scheduler in order:")
+    for job in jobgroups:
+        logger.info("\t%s" % job.name)
+    build_and_submit_jobs(Path.cwd(), jobgroups, schedulerargs)
+    logger.info("Waiting for SLURM-submitted jobs to finish (polling)")
+    for job in jobgroups:
+        job.wait()
+
+
+def populate_jobset(job: Job, jobset: Set, depth: int) -> Set:
+    """Create set of jobs reflecting dependency tree.
+
+    :param job:
+    :param jobset:
+    :param depth:
+
+    The set contains jobs at different depths of the dependency tree,
+    retaining dependencies as strings, not Jobs.
+    """
+    jobset.add(job)
+    if not job.dependencies:
+        return jobset
+    for j in job.dependencies:
+        jobset = populate_jobset(j, jobset, depth + 1)
+    return jobset
+
+
+def build_directories(root_dir: Path) -> None:
+    """Construct the subdirectories output, stderr, stdout, and jobs.
+
+    :param root_dir: path of root directory in which to place output
+
+    Subdirectories are created in the passed root directory. These
+    subdirectories have the following roles:
+
+        jobs      Stores the scripts for each job
+        stderr    Stores the stderr output from SLURM
+        stdout    Stores the stdout output from SLURM
+        output    Stores output (if the scripts place the output here)
+
+    - root_dir   Path to the top-level directory for creation of subdirectories
+    """
+    # If the root directory doesn't exist, create it
+    if not root_dir.exists():
+        root_dir.mkdir(exist_ok=True)
+
+    # Create subdirectories
+    directories = [
+        root_dir / subdir for subdir in ("output", "stderr", "stdout", "jobs")
+    ]
+    for dirname in directories:
+        dirname.mkdir(exist_ok=True)
+
+
+def build_job_scripts(root_dir: Path, jobs: List) -> None:
+    """Construct script for each passed Job in the jobs iterable.
+
+    :param root_dir: Path to output directory
+    :param jobs:
+    """
+    # Loop over the job list, creating each job script in turn, and then adding
+    # scriptPath to the Job object
+    for job in jobs:
+        scriptpath = root_dir / "jobs" / job.name
+        with open(scriptpath, "w") as scriptfile:
+            scriptfile.write(f"#!/bin/sh\n#$ -S /bin/bash\n{job.script}\n")
+        job.scriptpath = scriptpath
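Taken together, `build_directories()` and `build_job_scripts()` produce a working tree like the sketch below (the root name is arbitrary). The `#$ -S /bin/bash` header is an SGE directive that sbatch treats as an ordinary shell comment, so it is inert but harmless under SLURM:

```python
from pathlib import Path

root = Path("ANIm_run")  # arbitrary example root
for subdir in ("output", "stderr", "stdout", "jobs"):
    (root / subdir).mkdir(parents=True, exist_ok=True)

# Each job script lands in jobs/<job.name>, e.g.:
script = root / "jobs" / "ANIm_SLURM_JG_main_1"
script.write_text("#!/bin/sh\n#$ -S /bin/bash\necho placeholder job body\n")
```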
+
+
+def extract_submittable_jobs(waiting: List) -> List:
+    """Obtain list of jobs that are able to be submitted from pending list.
+
+    :param waiting: list of Job objects
+    """
+    submittable = set()  # Holds jobs that are able to be submitted
+    # Loop over each job, and check all the subjobs in that job's dependency
+    # list.  If there are any, and all of these have been submitted, then
+    # append the job to the list of submittable jobs.
+    for job in waiting:
+        unsatisfied = sum([(subjob.submitted is False) for subjob in job.dependencies])
+        if unsatisfied == 0:
+            submittable.add(job)
+    return list(submittable)
+
+
+def submit_safe_jobs(
+    root_dir: Path, jobs: Iterable, schedulerargs: Optional[str] = None
+) -> None:
+    """Submit passed list of jobs to SLURM server with dir as root for output.
+
+    :param root_dir: path to output directory
+    :param jobs: iterable of Job objects
+    :param schedulerargs: str, additional arguments for sbatch
+    """
+    logger = logging.getLogger(__name__)
+    logger.debug("Received %s jobs", len(jobs))
+
+    # Loop over each job, constructing SLURM command-line based on job settings
+    for job in jobs:
+        job.out = root_dir / "stdout"
+        job.err = root_dir / "stderr"
+
+        # Add the job name, current working directory, and SLURM stdout/stderr
+        # directories to the SLURM command line
+        #args = f" -N {job.name} "
+        args += f" -J {job.name}"
+        #args += " -cwd "  # not required in slurm
+        args += f" -o {job.out} -e {job.err} "
+
+        # If a queue is specified, add this to the SLURM command line
+        # LP: This has an undeclared variable, not sure why - delete?
+        # if job.queue is not None and job.queue in local_queues:
+        #     args += local_queues[job.queue]
+
+        # If the job is actually a JobGroup, add the task numbering argument
+        if isinstance(job, JobGroup):
+            args += f"-t 1:{job.tasks} "
+
+        # If there are dependencies for this job, hold the job until they are
+        # complete
+        if job.dependencies:
+            #args += "-hold_jid "
+            args += " --dependency=afterok:"
+            for dep in job.dependencies:
+                args += dep.name + ","
+            args = args[:-1]
+
+        # Build the qsub SLURM commandline (passing local environment)
+        #qsubcmd = f"{pyani_config.QSUB_DEFAULT} -V {args} {job.scriptpath}"
+        slurmcmd = f"{pyani_config.SLURM_DEFAULT} {args} {job.scriptpath}"
+        if schedulerargs is not None:
+            #qsubcmd = f"{qsubcmd} {schedulerargs}"
+            slurmcmd = f"{slurmcmd} {schedulerargs}"
+        # We've considered Bandit warnings B404,B603 and silence
+        # subprocess.call(qsubcmd, shell=False)  # nosec
+        os.system(qsubcmd)
+        job.submitted = True  # Set the job's submitted flag to True
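After the fixes later in this series (PATCH 02 replaces `-t` with `--array` and resolves dependency names to job IDs), the command line assembled above comes out roughly as follows; the group name, task count and job ID are invented for illustration:

```python
name, tasks, dep_ids = "ANIm_SLURM_JG_main_1", 1000, ["123456"]
args = f" -J {name}"
args += f" --array=1-{tasks} "
args += " --dependency=afterok:" + ",".join(dep_ids)
slurmcmd = f"sbatch {args} jobs/{name}".strip()
print(slurmcmd)
# roughly: sbatch -J ANIm_SLURM_JG_main_1 --array=1-1000 --dependency=afterok:123456 jobs/ANIm_SLURM_JG_main_1
```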
+
+
+def submit_jobs(root_dir: Path, jobs: Iterable, schedulerargs: Optional[str] = None) -> None:
+    """Submit passed jobs to SLURM server with passed directory as root.
+
+    :param root_dir: path to output directory
+    :param jobs: list of Job objects
+    :param schedulerargs: str, additional arguments for sbatch
+    """
+    waiting = list(jobs)  # List of jobs still to be done
+    # Loop over the list of pending jobs, while there still are any
+    while waiting:
+        # extract submittable jobs
+        submittable = extract_submittable_jobs(waiting)
+        # run those jobs
+        submit_safe_jobs(root_dir, submittable, schedulerargs)
+        # remove those from the waiting list
+        for job in submittable:
+            waiting.remove(job)
+
+
+def build_and_submit_jobs(
+    root_dir: Path, jobs: Iterable, schedulerargs: Optional[str] = None
+) -> None:
+    """Submit passed iterable of Job objects to SLURM.
+
+    :param root_dir: root directory for SLURM and job output
+    :param jobs: list of Job objects, describing each job to be submitted
+    :param schedulerargs: str, additional arguments to sbatch
+
+    This places SLURM's output in the passed root directory
+    """
+    # If the passed set of jobs is not a list, turn it into one.  This makes the
+    # use of a single JobGroup a little more intuitive
+    if not isinstance(jobs, list):
+        jobs = [jobs]
+
+    # Build and submit the passed jobs
+    build_directories(root_dir)  # build all necessary directories
+    build_job_scripts(root_dir, jobs)  # build job scripts
+    submit_jobs(root_dir, jobs, schedulerargs)  # submit the jobs to SLURM
diff --git a/pyani/scripts/average_nucleotide_identity.py b/pyani/scripts/average_nucleotide_identity.py
index 10fc7767..3bbb6af7 100755
--- a/pyani/scripts/average_nucleotide_identity.py
+++ b/pyani/scripts/average_nucleotide_identity.py
@@ -151,7 +151,7 @@
     __version__,
 )
 from pyani import run_multiprocessing as run_mp
-from pyani import run_sge
+from pyani import run_sge, run_slurm
 from pyani.pyani_config import params_mpl, ALIGNDIR, FRAGSIZE, TETRA_FILESTEMS
 from pyani.logger import config_logger
 
@@ -309,7 +309,7 @@ def parse_cmdline(argv: Optional[List] = None) -> Namespace:
         dest="scheduler",
         action="store",
         default="multiprocessing",
-        choices=["multiprocessing", "SGE"],
+        choices=["multiprocessing", "SGE", "SLURM"],
         help="Job scheduler (default multiprocessing, i.e. locally)",
     )
     parser.add_argument(
@@ -322,16 +322,16 @@ def parse_cmdline(argv: Optional[List] = None) -> Namespace:
         "(default zero, meaning use all available cores)",
     )
     parser.add_argument(
-        "--SGEgroupsize",
+        "--groupsize",
         dest="sgegroupsize",
         action="store",
         default=10000,
         type=int,
-        help="Number of jobs to place in an SGE array group " "(default 10000)",
+        help="Number of jobs to place in an HPC array group " "(default 10000)",
    )
     parser.add_argument(
-        "--SGEargs",
-        dest="sgeargs",
+        "--hpcargs",
+        dest="schedulerargs",
         action="store",
         default=None,
         type=str,
@@ -425,7 +425,7 @@ def parse_cmdline(argv: Optional[List] = None) -> Namespace:
         dest="jobprefix",
         action="store",
         default="ANI",
-        help="Prefix for SGE jobs (default ANI).",
+        help="Prefix for SGE/SLURM jobs (default ANI).",
     )
     # Parse arguments
     if argv is None:
@@ -543,13 +543,13 @@ def calculate_anim(
             else:
                 logger.info("All multiprocessing jobs complete.")
         else:
-            logger.info("Running jobs with SGE")
+            logger.info("Running jobs with %s", args.scheduler)
             logger.info("Jobarray group size set to %d", args.sgegroupsize)
             run_sge.run_dependency_graph(
                 joblist,
                 jgprefix=args.jobprefix,
                 sgegroupsize=args.sgegroupsize,
-                sgeargs=args.sgeargs,
+                schedulerargs=args.schedulerargs,
             )
     else:
         logger.warning("Skipping NUCmer run (as instructed)!")
@@ -673,6 +673,9 @@ def run_blast(
     elif args.scheduler == "SGE":
         logger.info("Running dependency graph with SGE")
         run_sge.run_dependency_graph(jobgraph)
+    elif args.scheduler.upper() == "SLURM":
+        logger.info("Running dependency graph with SLURM")
+        run_sge.run_dependency_graph(jobgraph)
     else:
         logger.error(f"Scheduler {args.scheduler} not recognised (exiting)")
         raise SystemError(1)
@@ -934,7 +937,7 @@ def test_scheduler(args: Namespace, logger: Logger) -> None:
 
     Exits if the scheduler is invalid
     """
-    schedulers = ["multiprocessing", "SGE"]
+    schedulers = ["multiprocessing", "SGE", "SLURM"]
     if args.scheduler not in schedulers:
         logger.error(
             f"Valid schedulers are: {'; '.join(schedulers)}\n"

From 4d362df3e6fee41a968a6069bcf43214269f3a4f Mon Sep 17 00:00:00 2001
From: TSL-RamKrishna
Date: Wed, 20 Jan 2021 00:02:27 +0000
Subject: [PATCH 02/13] ANIm analysis working

---
 pyani/anim.py       |  5 +++--
 pyani/pyani_jobs.py | 30 ++++++++++++++++++++++--------
 pyani/run_slurm.py  | 34 ++++++++++++++++++++++--------
 setup.py            |  5 +++--
 4 files changed, 54 insertions(+), 20 deletions(-)

diff --git a/pyani/anim.py b/pyani/anim.py
index 71f0a376..4282fb3d 100644
--- a/pyani/anim.py
+++ b/pyani/anim.py
@@ -98,8 +98,9 @@ def get_version(nucmer_exe: Path = pyani_config.NUCMER_DEFAULT) -> str:
     result = subprocess.run(
         cmdline, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True
     )
-    match = re.search(r"(?<=version\s)[0-9\.]*", str(result.stderr, "utf-8"))
-    version = match.group()  # type: ignore
+    #match = re.search(r"(?<=version\s)[0-9\.]*", str(result.stderr, "utf-8"))
+    #version = match.group()  # type: ignore
+    version = str(result.stdout, "utf-8")
     return f"{platform.system()}_{version}"
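The `get_version()` change above stops parsing a version number out of stderr and instead returns nucmer's stdout verbatim. If structured parsing were still wanted, a defensive variant could search both streams; this is a sketch, not pyani's API:

```python
import re


def parse_nucmer_version(stdout: str, stderr: str) -> str:
    """Extract a dotted version from either output stream, else return raw text."""
    for stream in (stdout, stderr):
        match = re.search(r"(?<=version\s)[0-9.]*", stream)
        if match and match.group():
            return match.group()
    return (stdout or stderr).strip()
```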
diff --git a/pyani/pyani_jobs.py b/pyani/pyani_jobs.py
index 5dfa8d1b..0401fd1c 100644
--- a/pyani/pyani_jobs.py
+++ b/pyani/pyani_jobs.py
@@ -62,7 +62,7 @@
 
 import os
 import time
-
+import subprocess
 from typing import Any, Dict, List, Optional
 
 from .pyani_config import SGE_WAIT
@@ -117,7 +118,13 @@ def wait(self, interval: float = SGE_WAIT) -> None:
         while not self.finished:
             time.sleep(interval)
             interval = min(2.0 * interval, 60)
             #self.finished = os.system(f"qstat -j {self.name} > /dev/null")
-            self.finished = os.system(f"squeue -j {self.name} > /dev/null")
+            print("class job; squeue -n {self.name}")
+            cmd = "squeue -n %s | tail -n+2 | wc -l" % (self.name)
+            jobcount=subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
+            count, err = jobcount.communicate()
+            if int(str(count, 'utf-8')) == 0:
+                self.finished = True
+            #self.finished = os.system(f"squeue -n {self.name} > /dev/null")
 
 
 class JobGroup(object):
@@ -145,7 +151,7 @@ def __init__(
         For example, to use a command 'my_cmd' with the arguments
         '-foo' and '-bar' having values 1, 2, 3, 4 and 'a', 'b', 'c', 'd' in
         all combinations, respectively, you would pass
-        command='my_cmd $SGE_TASK_ID -foo $fooargs -bar $barargs'
+        command='my_cmd $SLURM_TASK_ID -foo $fooargs -bar $barargs'
         arguments='{'fooargs': ['1','2','3','4'],
                     'barargs': ['a','b','c','d']}
         """
@@ -162,13 +168,13 @@ def __init__(
         self.generate_script()  # Make SGE script for sweep/array
 
     def generate_script(self) -> None:
-        """Create the SGE script that will run the jobs in the JobGroup."""
+        """Create the SLURM script that will run the jobs in the JobGroup."""
         self.script = ""  # type: str
         total = 1  # total number of jobs in this group
 
-        # for now, SGE_TASK_ID becomes TASK_ID, but we base it at zero
-        self.script += """let "TASK_ID=$SGE_TASK_ID - 1"\n"""
-
+        # for now, SLURM_TASK_ID becomes TASK_ID, but we base it at zero
+        #self.script += """let "TASK_ID=$SLURM_TASK_ID - 1"\n"""
+        self.script += """let "TASK_ID=$SLURM_ARRAY_TASK_ID - 1"\n"""
         # build the array definitions
         for key in sorted(self.arguments.keys()):
             # The keys are sorted for py3.5 compatibility with tests
@@ -223,4 +229,12 @@ def wait(self, interval: float = SGE_WAIT) -> None:
         while not self.finished:
             time.sleep(interval)
             interval = min(2 * interval, 60)
-            self.finished = os.system("squeue -j %s > /dev/null" % (self.name))
+            print("class jobgroup; squeue -n %s" % (self.name), "finished? ", self.finished)
+            cmd = "squeue -n %s | tail -n+2 | wc -l" % (self.name)
+            jobcount=subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
+            count, err = jobcount.communicate()
+
+            if int(str(count, 'utf-8')) == 0:
+                self.finished = True
+                print("Finished ", self.finished)
+            #self.finished = os.system("squeue -n %s > /dev/null" % (self.name))
diff --git a/pyani/run_slurm.py b/pyani/run_slurm.py
index 88d1aa00..265bfe9e 100644
--- a/pyani/run_slurm.py
+++ b/pyani/run_slurm.py
@@ -45,6 +45,7 @@
 import itertools
 import logging
 import os
+import subprocess
 
 from collections import defaultdict
 from pathlib import Path
@@ -282,9 +283,9 @@ def submit_safe_jobs(
         # Add the job name, current working directory, and SLURM stdout/stderr
         # directories to the SLURM command line
         #args = f" -N {job.name} "
-        args += f" -J {job.name}"
+        args = f" -J {job.name}"
         #args += " -cwd "  # not required in slurm
-        args += f" -o {job.out} -e {job.err} "
+        #args += f" -o {job.out} -e {job.err} "  # removing this code as the same output/err filename generates error in slurm
 
         # If a queue is specified, add this to the SLURM command line
         # LP: This has an undeclared variable, not sure why - delete?
@@ -292,25 +293,42 @@ def submit_safe_jobs(
         # If the job is actually a JobGroup, add the task numbering argument
         if isinstance(job, JobGroup):
-            args += f"-t 1:{job.tasks} "
-
+            #args += f"-t 1:{job.tasks} "
+            args += f" --array=1-{job.tasks} "
         # If there are dependencies for this job, hold the job until they are
         # complete
         if job.dependencies:
             #args += "-hold_jid "
             args += " --dependency=afterok:"
             for dep in job.dependencies:
-                args += dep.name + ","
+                # get the jobid using the jobname
+                squeue_cmd_for_jobid = "squeue -n " + dep.name + " | grep -v JOBID | awk '{print $1}' | sed 's/_/ /' | awk '{print $1}' | sort | uniq",
+                squeue_jobid = subprocess.Popen(squeue_cmd_for_jobid, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+                jobid, err = squeue_jobid.communicate()
+                jobid = str(jobid, 'utf-8').strip()
+                #args += dep.name + ","
+                if jobid != "":
+                    args += jobid + ","
             args = args[:-1]
 
+
+        # Build the sbatch SLURM commandline (passing local environment)
         #qsubcmd = f"{pyani_config.QSUB_DEFAULT} -V {args} {job.scriptpath}"
-        slurmcmd = f"{pyani_config.SLURM_DEFAULT} {args} {job.scriptpath}"
+        slurmcmd = f"{pyani_config.SLURM_DEFAULT} {args} {job.scriptpath}".strip()
         if schedulerargs is not None:
             #qsubcmd = f"{qsubcmd} {schedulerargs}"
             slurmcmd = f"{slurmcmd} {schedulerargs}"
         # We've considered Bandit warnings B404,B603 and silence
         # subprocess.call(qsubcmd, shell=False)  # nosec
-        os.system(qsubcmd)
+        #os.system(qsubcmd)
+        print("running command", slurmcmd)
+        os.system(slurmcmd)
+
+        # out = subprocess.Popen(slurmcmd, stdout=PIPE, stderr=PIPE, shell=True)
+        # jobSubmitOutput = out.stdout.read()
+        # if "Submitted batch job" in jobSubmitOutput:
+        #     jobid = jobSubmitOutput.split()[3]
 
         job.submitted = True  # Set the job's submitted flag to True
diff --git a/setup.py b/setup.py
index 069be87e..dad6fda3 100644
--- a/setup.py
+++ b/setup.py
@@ -88,8 +88,9 @@
     license="MIT",
     keywords="genome bioinformatics sequence taxonomy",
     platforms="Posix; MacOS X",
-    url="http://widdowquinn.github.io/pyani/",  # project home page
-    download_url="https://github.com/widdowquinn/pyani/releases",
+    #url="http://widdowquinn.github.io/pyani/",  # project home page
+    #download_url="https://github.com/widdowquinn/pyani/releases",
+    #download_url="https://github.com/TeamMacLean/pyani"
     scripts=[],
     entry_points={
         "console_scripts": [

From 28a31f85e32efcc64179e4149db07ce44937ae5e Mon Sep 17 00:00:00 2001
From: TSL-RamKrishna
Date: Wed, 20 Jan 2021 12:59:11 +0000
Subject: [PATCH 03/13] check scheduler sge or slurm to wait jobs

---
 pyani/pyani_jobs.py | 55 ++++++++++++++++++++++++++++++++-------------
 pyani/run_sge.py    |  2 +-
 pyani/run_slurm.py  |  2 +-
 3 files changed, 41 insertions(+), 18 deletions(-)

diff --git a/pyani/pyani_jobs.py b/pyani/pyani_jobs.py
index 0401fd1c..ea3b90b5 100644
--- a/pyani/pyani_jobs.py
+++ b/pyani/pyani_jobs.py
@@ -62,7 +62,7 @@
 
 import os
 import time
-import subprocess
+from subprocess import Popen, PIPE
 from typing import Any, Dict, List, Optional
 
 from .pyani_config import SGE_WAIT
@@ -118,13 +118,22 @@ def wait(self, interval: float = SGE_WAIT) -> None:
         while not self.finished:
             time.sleep(interval)
             interval = min(2.0 * interval, 60)
             #self.finished = os.system(f"qstat -j {self.name} > /dev/null")
-            print("class job; squeue -n {self.name}")
-            cmd = "squeue -n %s | tail -n+2 | wc -l" % (self.name)
-            jobcount=subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
-            count, err = jobcount.communicate()
-            if int(str(count, 'utf-8')) == 0:
-                self.finished = True
-            #self.finished = os.system(f"squeue -n {self.name} > /dev/null")
+
+            cmd = "which qsub"
+            cmd_out = get_cmd_output(cmd)
+            if cmd_out != "":  # hpc is SGE
+                self.finished = os.system(f"qstat -j {self.name} > /dev/null")
+
+            cmd = "which sbatch"
+            cmd_out = get_cmd_output(cmd)
+            if cmd_out != "":  # hpc is SLURM
+                print("class job; squeue -n %s" % (self.name), "finished? ", self.finished)
+                cmd = "squeue -n %s | tail -n+2 | wc -l" % (self.name)
+                count = get_cmd_output(cmd)
+
+                if int(count) == 0:
+                    self.finished = True
+                    print("Finished ", self.finished)
 
 
 class JobGroup(object):
@@ -135,8 +144,10 @@ def __init__(
         self,
         name: str,
         command: str,
+        scheduler: str,
         queue: Optional[str] = None,
         arguments: Optional[Dict[str, List[Any]]] = None,
+
     ) -> None:
         """Instantiate a JobGroup object.
@@ -162,6 +172,7 @@ def __init__(
         self.dependencies = []  # type: List[Any]
         self.submitted = False  # type: bool
         self.finished = False  # type: int
+        self.scheduler = scheduler
         if arguments is not None:
             self.arguments = arguments  # Dictionary of arguments for command
         else:
@@ -229,12 +240,23 @@ def wait(self, interval: float = SGE_WAIT) -> None:
         while not self.finished:
             time.sleep(interval)
             interval = min(2 * interval, 60)
-            print("class jobgroup; squeue -n %s" % (self.name), "finished? ", self.finished)
-            cmd = "squeue -n %s | tail -n+2 | wc -l" % (self.name)
-            jobcount=subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
-            count, err = jobcount.communicate()
-
-            if int(str(count, 'utf-8')) == 0:
-                self.finished = True
-                print("Finished ", self.finished)
-            #self.finished = os.system("squeue -n %s > /dev/null" % (self.name))
+
+            if self.scheduler.lower() == "sge" :  # hpc is SGE
+                self.finished = os.system("qstat -j %s > /dev/null" % (self.name))
+
+            elif self.scheduler.lower() == "slurm" :  # hpc is SLURM
+                print("Scheduler slurm: squeue -n %s" % (self.name), "finished? ", self.finished)
+                cmd = "squeue -n %s | tail -n+2 | wc -l" % (self.name)
+                count = get_cmd_output(cmd)
+
+                if int(count) == 0:
+                    self.finished = True
+                    print("Finished ", self.finished)
+
+def get_cmd_output(cmd):
+
+    """ call subprocess popen to get command stdout """
+
+    p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
+    out, error = p.communicate()
+    return str(out, 'utf-8')
diff --git a/pyani/run_sge.py b/pyani/run_sge.py
index 3cb1ee2b..e786d0e9 100644
--- a/pyani/run_sge.py
+++ b/pyani/run_sge.py
@@ -106,7 +106,7 @@ def compile_jobgroups_from_joblist(
             sge_jobcmdlist = [f'"{jc}"' for jc in sublist]
             jobgroups.append(
                 JobGroup(
-                    f"{jgprefix}_{count}", "$cmds", arguments={"cmds": sge_jobcmdlist}
+                    f"{jgprefix}_{count}", "$cmds", arguments={"cmds": sge_jobcmdlist}, scheduler="sge"
                 )
             )
     return jobgroups
diff --git a/pyani/run_slurm.py b/pyani/run_slurm.py
index 265bfe9e..36fc1994 100644
--- a/pyani/run_slurm.py
+++ b/pyani/run_slurm.py
@@ -107,7 +107,7 @@ def compile_jobgroups_from_joblist(
             sge_jobcmdlist = [f'"{jc}"' for jc in sublist]
             jobgroups.append(
                 JobGroup(
-                    f"{jgprefix}_{count}", "$cmds", arguments={"cmds": sge_jobcmdlist}
+                    f"{jgprefix}_{count}", "$cmds", arguments={"cmds": sge_jobcmdlist}, scheduler="slurm"
                 )
             )
     return jobgroups
From 39dbc5bb445374748d2173b0bd21bfc56f2b1361 Mon Sep 17 00:00:00 2001
From: TSL-RamKrishna
Date: Wed, 20 Jan 2021 13:01:56 +0000
Subject: [PATCH 04/13] options renamed

---
 pyani/scripts/average_nucleotide_identity.py | 15 ++++++++----
 pyani/scripts/parsers/scheduling_parser.py   | 10 ++++----
 pyani/scripts/subcommands/subcmd_anim.py     | 25 +++++++++++++-------
 3 files changed, 33 insertions(+), 17 deletions(-)

diff --git a/pyani/scripts/average_nucleotide_identity.py b/pyani/scripts/average_nucleotide_identity.py
index 3bbb6af7..b17d7e6d 100755
--- a/pyani/scripts/average_nucleotide_identity.py
+++ b/pyani/scripts/average_nucleotide_identity.py
@@ -151,7 +151,8 @@
     __version__,
 )
 from pyani import run_multiprocessing as run_mp
-from pyani import run_sge, run_slurm
+#from pyani import run_sge
+from pyani import run_slurm
 from pyani.pyani_config import params_mpl, ALIGNDIR, FRAGSIZE, TETRA_FILESTEMS
 from pyani.logger import config_logger
 
@@ -545,7 +546,7 @@ def calculate_anim(
         else:
             logger.info("Running jobs with %s", args.scheduler)
             logger.info("Jobarray group size set to %d", args.sgegroupsize)
-            run_sge.run_dependency_graph(
+            run_slurm.run_dependency_graph(
                 joblist,
                 jgprefix=args.jobprefix,
                 sgegroupsize=args.sgegroupsize,
@@ -672,10 +673,10 @@ def run_blast(
         logger.info("All multiprocessing jobs complete.")
     elif args.scheduler == "SGE":
         logger.info("Running dependency graph with SGE")
-        run_sge.run_dependency_graph(jobgraph)
+        run_slurm.run_dependency_graph(jobgraph)
     elif args.scheduler.upper() == "SLURM":
         logger.info("Running dependency graph with SLURM")
-        run_sge.run_dependency_graph(jobgraph)
+        run_slurm.run_dependency_graph(jobgraph)
     else:
         logger.error(f"Scheduler {args.scheduler} not recognised (exiting)")
         raise SystemError(1)
@@ -815,7 +816,7 @@ def draw(args: Namespace, filestems: List[str], gformat: str) -> None:
         infilename = fullstem.with_suffix(".tab")
         dfm = pd.read_csv(infilename, index_col=0, sep="\t")
         logger.info("Writing heatmap to %s", outfilename)
-        print(args.labels, args.classes)
+        #print(args.labels, args.classes)
         params = pyani_graphics.Params(
             params_mpl(dfm)[filestem],
             pyani_tools.get_labels(args.labels, logger=logger),
@@ -958,6 +959,7 @@ def run_main(argsin: Optional[Namespace] = None) -> int:
     # Process command-line and build logger
     args = process_arguments(argsin)
+
     logger = logging.getLogger(__name__)
     config_logger(args)
@@ -1020,3 +1022,6 @@ def run_main(argsin: Optional[Namespace] = None) -> int:
 
     # Exit
     return 0
+
+if __name__ == "__main__":
+    run_main()
diff --git a/pyani/scripts/parsers/scheduling_parser.py b/pyani/scripts/parsers/scheduling_parser.py
index fc728387..51ff0951 100644
--- a/pyani/scripts/parsers/scheduling_parser.py
+++ b/pyani/scripts/parsers/scheduling_parser.py
@@ -56,7 +56,7 @@ def build() -> ArgumentParser:
         dest="scheduler",
         action="store",
         default="multiprocessing",
-        choices=["multiprocessing", "SGE"],
+        choices=["multiprocessing", "SGE", "SLURM"],
         help="Job scheduler (default multiprocessing, " + "i.e. locally)",
     )
     parser.add_argument(
@@ -69,7 +69,8 @@ def build() -> ArgumentParser:
         "(default zero, meaning use all available cores)",
     )
     parser.add_argument(
-        "--SGEgroupsize",
+        #"--SGEgroupsize",
+        "--groupsize",
         dest="sgegroupsize",
         action="store",
         default=10000,
@@ -77,12 +78,13 @@ def build() -> ArgumentParser:
         help="Number of jobs to place in an SGE array group " "(default 10000)",
     )
     parser.add_argument(
-        "--SGEargs",
+        #"--SGEargs",
+        "--hpcargs",
         dest="sgeargs",
         action="store",
         default=None,
         type=str,
-        help="Additional arguments for qsub",
+        help="Additional arguments for qsub/sbatch",
     )
     parser.add_argument(
         "--jobprefix",
diff --git a/pyani/scripts/subcommands/subcmd_anim.py b/pyani/scripts/subcommands/subcmd_anim.py
index 9473a164..bd559ce9 100644
--- a/pyani/scripts/subcommands/subcmd_anim.py
+++ b/pyani/scripts/subcommands/subcmd_anim.py
@@ -55,6 +55,7 @@
     pyani_config,
     pyani_jobs,
     run_sge,
+    run_slurm,
     run_multiprocessing as run_mp,
 )
 from pyani.pyani_files import collect_existing_output
@@ -375,16 +376,24 @@ def run_anim_jobs(joblist: List[ComparisonJob], args: Namespace) -> None:
             )
             raise PyaniException("Multiprocessing run failed in ANIm")
         logger.info("Multiprocessing run completed without error")
-    elif args.scheduler.lower() == "sge":
-        logger.info("Running jobs with SGE")
+    elif args.scheduler.lower() == "sge" or args.scheduler.lower() == "slurm":
+        logger.info("Running jobs with %s", args.scheduler)
         logger.debug("Setting jobarray group size to %d", args.sgegroupsize)
         logger.debug("Joblist contains %d jobs", len(joblist))
-        run_sge.run_dependency_graph(
-            [_.job for _ in joblist],
-            jgprefix=args.jobprefix,
-            sgegroupsize=args.sgegroupsize,
-            sgeargs=args.sgeargs,
-        )
+        if args.scheduler.lower() == "sge":
+            run_sge.run_dependency_graph(
+                [_.job for _ in joblist],
+                jgprefix=args.jobprefix,
+                sgegroupsize=args.sgegroupsize,
+                sgeargs=args.sgeargs,
+            )
+        elif args.scheduler.lower() == "slurm":
+            run_slurm.run_dependency_graph(
+                [_.job for _ in joblist],
+                jgprefix=args.jobprefix,
+                sgegroupsize=args.sgegroupsize,
+                #sgeargs=args.sgeargs,
+            )
     else:
         logger.error(termcolor("Scheduler %s not recognised", "red"), args.scheduler)
         raise SystemError(1)
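Both `wait()` implementations now shell out through the `get_cmd_output()` helper added in PATCH 03. On Python 3.7+ the same helper can be written with `subprocess.run`; this sketch is shown only for comparison and is not part of the series:

```python
import subprocess


def get_cmd_output_alt(cmd: str) -> str:
    """Return the stdout of a shell command, decoded to str."""
    return subprocess.run(cmd, shell=True, capture_output=True, text=True).stdout


# e.g. the JobGroup.wait() poll: count squeue rows below the header line
count = get_cmd_output_alt("squeue -n ANIm_SLURM_JG_main_1 | tail -n+2 | wc -l")
finished = int(count.strip() or 0) == 0
```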
From 700cdadf0579d5581938b09f5e11a8bbcc3f21d4 Mon Sep 17 00:00:00 2001
From: TSL-RamKrishna
Date: Wed, 20 Jan 2021 13:12:11 +0000
Subject: [PATCH 05/13] changes to Job class removed

---
 pyani/pyani_jobs.py | 19 +------------------
 1 file changed, 1 insertion(+), 18 deletions(-)

diff --git a/pyani/pyani_jobs.py b/pyani/pyani_jobs.py
index ea3b90b5..60c7bc2e 100644
--- a/pyani/pyani_jobs.py
+++ b/pyani/pyani_jobs.py
@@ -117,24 +117,7 @@ def wait(self, interval: float = SGE_WAIT) -> None:
         while not self.finished:
             time.sleep(interval)
             interval = min(2.0 * interval, 60)
-            #self.finished = os.system(f"qstat -j {self.name} > /dev/null")
-
-            cmd = "which qsub"
-            cmd_out = get_cmd_output(cmd)
-            if cmd_out != "":  # hpc is SGE
-                self.finished = os.system(f"qstat -j {self.name} > /dev/null")
-
-            cmd = "which sbatch"
-            cmd_out = get_cmd_output(cmd)
-            if cmd_out != "":  # hpc is SLURM
-                print("class job; squeue -n %s" % (self.name), "finished? ", self.finished)
-                cmd = "squeue -n %s | tail -n+2 | wc -l" % (self.name)
-                count = get_cmd_output(cmd)
-
-                if int(count) == 0:
-                    self.finished = True
-                    print("Finished ", self.finished)
-
+               self.finished = os.system(f"qstat -j {self.name} > /dev/null")
 
 
 class JobGroup(object):

From e5de1bf75508722304abfb102a4727d348527f18 Mon Sep 17 00:00:00 2001
From: TSL-RamKrishna
Date: Thu, 21 Jan 2021 10:51:42 +0000
Subject: [PATCH 06/13] indentation error fixed

---
 pyani/pyani_jobs.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyani/pyani_jobs.py b/pyani/pyani_jobs.py
index 60c7bc2e..bf7fe65c 100644
--- a/pyani/pyani_jobs.py
+++ b/pyani/pyani_jobs.py
@@ -117,7 +117,7 @@ def wait(self, interval: float = SGE_WAIT) -> None:
         while not self.finished:
             time.sleep(interval)
             interval = min(2.0 * interval, 60)
-               self.finished = os.system(f"qstat -j {self.name} > /dev/null")
+            self.finished = os.system(f"qstat -j {self.name} > /dev/null")

From c9dd926815836d831dcb45974dbf72ee22b0d9eb Mon Sep 17 00:00:00 2001
From: TSL-RamKrishna
Date: Thu, 21 Jan 2021 11:19:47 +0000
Subject: [PATCH 07/13] share code for SGE and SLURM

---
 pyani/scripts/average_nucleotide_identity.py | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/pyani/scripts/average_nucleotide_identity.py b/pyani/scripts/average_nucleotide_identity.py
index b17d7e6d..99558b5c 100755
--- a/pyani/scripts/average_nucleotide_identity.py
+++ b/pyani/scripts/average_nucleotide_identity.py
@@ -671,12 +671,9 @@ def run_blast(
         )
     else:
         logger.info("All multiprocessing jobs complete.")
-    elif args.scheduler == "SGE":
+    elif args.scheduler == "SGE" or args.scheduler.upper() == "SLURM":
         logger.info("Running dependency graph with SGE")
-        run_slurm.run_dependency_graph(jobgraph)
-    elif args.scheduler.upper() == "SLURM":
-        logger.info("Running dependency graph with SLURM")
-        run_slurm.run_dependency_graph(jobgraph)
+        run_sge.run_dependency_graph(jobgraph)
     else:
         logger.error(f"Scheduler {args.scheduler} not recognised (exiting)")
         raise SystemError(1)
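PATCHES 03-05 probed for the active scheduler by shelling out to `which qsub` / `which sbatch`. For reference, the standard library can make the same test without spawning a process; a sketch of that idea, not something the series uses:

```python
import shutil

# Pick a scheduler by which submission command is on PATH (sketch).
if shutil.which("qsub"):
    scheduler = "SGE"
elif shutil.which("sbatch"):
    scheduler = "SLURM"
else:
    scheduler = "multiprocessing"
```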
From 2f77e3e06ff25c71f0b4eb79d51962c003c2bf00 Mon Sep 17 00:00:00 2001
From: TSL-RamKrishna
Date: Thu, 21 Jan 2021 11:39:38 +0000
Subject: [PATCH 08/13] sge and slurm code separated again

---
 pyani/scripts/average_nucleotide_identity.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/pyani/scripts/average_nucleotide_identity.py b/pyani/scripts/average_nucleotide_identity.py
index 99558b5c..7e1ffb27 100755
--- a/pyani/scripts/average_nucleotide_identity.py
+++ b/pyani/scripts/average_nucleotide_identity.py
@@ -152,7 +152,7 @@
 )
 from pyani import run_multiprocessing as run_mp
 #from pyani import run_sge
-from pyani import run_slurm
+from pyani import run_sge, run_slurm
 from pyani.pyani_config import params_mpl, ALIGNDIR, FRAGSIZE, TETRA_FILESTEMS
 from pyani.logger import config_logger
 
@@ -671,9 +671,12 @@ def run_blast(
         )
     else:
         logger.info("All multiprocessing jobs complete.")
-    elif args.scheduler == "SGE" or args.scheduler.upper() == "SLURM":
+    elif args.scheduler == "SGE":
         logger.info("Running dependency graph with SGE")
         run_sge.run_dependency_graph(jobgraph)
+    elif args.scheduler.upper() == "SLURM":
+        logger.info("Running dependency graph with SLURM")
+        run_slurm.run_dependency_graph(jobgraph)
     else:
         logger.error(f"Scheduler {args.scheduler} not recognised (exiting)")
         raise SystemError(1)

From 1a2b0ce7d62c6a6c6f1df30e5217901a7bbdb808 Mon Sep 17 00:00:00 2001
From: TSL-RamKrishna
Date: Tue, 2 Feb 2021 16:55:15 +0000
Subject: [PATCH 09/13] blastn commands saved to a random filename and run
 from there

---
 pyani/anib.py | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/pyani/anib.py b/pyani/anib.py
index 80b2947c..a5a9d9d0 100644
--- a/pyani/anib.py
+++ b/pyani/anib.py
@@ -85,6 +85,7 @@
 import re
 import shutil
 import subprocess
+import secrets
 
 from logging import Logger
 from pathlib import Path
@@ -431,14 +432,21 @@ def construct_blastn_cmdline(
     :param blastn_exe: str, path to blastn executable
     """
     prefix = outdir / f"{fname1.stem.replace('-fragments', '')}_vs_{fname2.stem}"
+    command = "#!/bin/bash\n" + f"{blastn_exe} -out {prefix}.blast_tab -query {fname1} -db {fname2} -xdrop_gap_final 150 -dust no -evalue 1e-15 -max_target_seqs 1 -outfmt '6 qseqid sseqid length mismatch pident nident qlen slen qstart qend sstart send positive ppos gaps' -task blastn"
+    fname = secrets.token_hex(nbytes=16)  # generates random string to use as filename
+    print(command)
+    with open("jobs/" + fname, 'w') as fh:
+        fh.write(command + "\n")
+    return "bash ./jobs/" + fname
+    """
     return (
         f"{blastn_exe} -out {prefix}.blast_tab -query {fname1} -db {fname2} "
         "-xdrop_gap_final 150 -dust no -evalue 1e-15 -max_target_seqs 1 -outfmt "
-        "'6 qseqid sseqid length mismatch pident nident qlen slen "
+        " '6 qseqid sseqid length mismatch pident nident qlen slen "
         "qstart qend sstart send positive ppos gaps' "
         "-task blastn"
     )
-
+    """
 
 # Generate single BLASTALL command line
 def construct_blastall_cmdline(
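The change above writes each blastn invocation into a throwaway script under `jobs/` and returns `bash <script>` as the job command — presumably so the quoted `-outfmt` string survives being round-tripped through the generated array scripts. A self-contained sketch of the same trick (the blastn arguments are placeholders):

```python
import secrets
from pathlib import Path

jobs_dir = Path("jobs")
jobs_dir.mkdir(exist_ok=True)

# Write the full command into a randomly named wrapper script...
script = jobs_dir / secrets.token_hex(nbytes=16)
script.write_text("#!/bin/bash\nblastn -query frags.fna -db target -outfmt '6 qseqid sseqid'\n")

# ...and hand the scheduler only a trivial, quote-free command line.
cmd = f"bash ./{script}"
```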
From 572b37caa7da1d1b4cc4ae3574186d09668ab2b4 Mon Sep 17 00:00:00 2001
From: TSL-RamKrishna
Date: Tue, 2 Feb 2021 16:56:21 +0000
Subject: [PATCH 10/13] cumval set to 0 for scheduler slurm and sge

---
 pyani/pyani_jobs.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pyani/pyani_jobs.py b/pyani/pyani_jobs.py
index bf7fe65c..8448ed4f 100644
--- a/pyani/pyani_jobs.py
+++ b/pyani/pyani_jobs.py
@@ -193,6 +193,7 @@ def generate_script(self) -> None:
 
         # now, add the command to run the job
         self.script += "\n"
+        self.script += "echo " + self.command + "\n"
         self.script += self.command
         self.script += "\n"

From b3babc6a6b2f334b0f98ffca61da2a02083222a5 Mon Sep 17 00:00:00 2001
From: TSL-RamKrishna
Date: Tue, 2 Feb 2021 16:59:38 +0000
Subject: [PATCH 11/13] cumval set to 0 for scheduler slurm and sge

---
 pyani/scripts/average_nucleotide_identity.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/pyani/scripts/average_nucleotide_identity.py b/pyani/scripts/average_nucleotide_identity.py
index 7e1ffb27..096bea9b 100755
--- a/pyani/scripts/average_nucleotide_identity.py
+++ b/pyani/scripts/average_nucleotide_identity.py
@@ -671,12 +671,17 @@ def run_blast(
         )
     else:
         logger.info("All multiprocessing jobs complete.")
+        print ("multiprocessing scheduler : cumval ", cumval)
     elif args.scheduler == "SGE":
         logger.info("Running dependency graph with SGE")
         run_sge.run_dependency_graph(jobgraph)
+        cumval = 0
+        logger.info("All SGE jobs complete.")
     elif args.scheduler.upper() == "SLURM":
         logger.info("Running dependency graph with SLURM")
         run_slurm.run_dependency_graph(jobgraph)
+        cumval = 0
+        logger.info("All SLURM jobs complete.")
     else:
         logger.error(f"Scheduler {args.scheduler} not recognised (exiting)")
         raise SystemError(1)

From d0113d2d74267ac800ac57f790996b9213bbc946 Mon Sep 17 00:00:00 2001
From: TSL-RamKrishna
Date: Wed, 3 Feb 2021 17:07:02 +0000
Subject: [PATCH 12/13] cleaning dead code

---
 pyani/anib.py | 9 ---------
 1 file changed, 9 deletions(-)

diff --git a/pyani/anib.py b/pyani/anib.py
index a5a9d9d0..1d1b89dd 100644
--- a/pyani/anib.py
+++ b/pyani/anib.py
@@ -438,15 +438,6 @@ def construct_blastn_cmdline(
     with open("jobs/" + fname, 'w') as fh:
         fh.write(command + "\n")
     return "bash ./jobs/" + fname
-    """
-    return (
-        f"{blastn_exe} -out {prefix}.blast_tab -query {fname1} -db {fname2} "
-        "-xdrop_gap_final 150 -dust no -evalue 1e-15 -max_target_seqs 1 -outfmt "
-        " '6 qseqid sseqid length mismatch pident nident qlen slen "
-        "qstart qend sstart send positive ppos gaps' "
-        "-task blastn"
-    )
-    """
 
 # Generate single BLASTALL command line
 def construct_blastall_cmdline(

From c62156c1ef00aaa567d06992de0c4138d1e71520 Mon Sep 17 00:00:00 2001
From: TSL-RamKrishna
Date: Wed, 3 Feb 2021 17:07:28 +0000
Subject: [PATCH 13/13] remove trailing comma, not required

---
 pyani/run_slurm.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyani/run_slurm.py b/pyani/run_slurm.py
index 36fc1994..a548d93b 100644
--- a/pyani/run_slurm.py
+++ b/pyani/run_slurm.py
@@ -303,7 +303,7 @@ def submit_safe_jobs(
             args += " --dependency=afterok:"
             for dep in job.dependencies:
                 # get the jobid using the jobname
-                squeue_cmd_for_jobid = "squeue -n " + dep.name + " | grep -v JOBID | awk '{print $1}' | sed 's/_/ /' | awk '{print $1}' | sort | uniq",
+                squeue_cmd_for_jobid = "squeue -n " + dep.name + " | grep -v JOBID | awk '{print $1}' | sed 's/_/ /' | awk '{print $1}' | sort | uniq"
                 squeue_jobid = subprocess.Popen(squeue_cmd_for_jobid, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
                 jobid, err = squeue_jobid.communicate()
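This last one-character patch matters more than it looks: the trailing comma made `squeue_cmd_for_jobid` a one-element tuple rather than a string. `Popen(seq, shell=True)` happens to run `seq[0]` on POSIX, so the code worked by accident; dropping the comma restores the intended type. A minimal demonstration, plus an alternative that asks squeue for bare job IDs instead of piping through grep/awk/sed/sort/uniq (the job name is invented):

```python
import subprocess

cmd = "squeue -n dep_name | awk '{print $1}'",   # trailing comma: a 1-tuple!
print(type(cmd))                                 # <class 'tuple'>

cmd = "squeue -n dep_name | awk '{print $1}'"    # comma removed, as in the patch
out, _ = subprocess.Popen(
    cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
).communicate()

# Alternative: have squeue print job IDs itself (one <jobid>_<task> per row),
# then keep the unique base IDs, as the shell pipeline does.
rows = subprocess.run(
    ["squeue", "--noheader", "--format=%i", "--name", "dep_name"],
    stdout=subprocess.PIPE, text=True,
).stdout.split()
base_ids = sorted({row.split("_")[0] for row in rows})
```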