From c605a3d87ce0dcd683c183663cd98aee08b34ded Mon Sep 17 00:00:00 2001
From: "Michael R. Crusoe"
Date: Fri, 17 Aug 2018 10:25:38 -0700
Subject: [PATCH 1/6] CWL:fix support for user space overrides

---
 setup.py                |   2 +-
 src/toil/cwl/cwltoil.py | 808 +++++++++++++++++++++-------------------
 2 files changed, 431 insertions(+), 379 deletions(-)

diff --git a/setup.py b/setup.py
index c6dfabda0e..06040ebb06 100644
--- a/setup.py
+++ b/setup.py
@@ -34,7 +34,7 @@ def runSetup():
     gcs = 'google-cloud-storage==1.6.0'
     gcs_oauth2_boto_plugin = 'gcs_oauth2_boto_plugin==1.14'
     apacheLibcloud = 'apache-libcloud==2.2.1'
-    cwltool = 'cwltool==1.0.20180518123035'
+    cwltool = 'cwltool==1.0.20180809224403'
     schemaSalad = 'schema-salad>=2.6, <3'
     galaxyLib = 'galaxy-lib==17.9.3'
     htcondor = 'htcondor>=8.6.0'
diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py
index 70466408fd..598c55b262 100755
--- a/src/toil/cwl/cwltoil.py
+++ b/src/toil/cwl/cwltoil.py
@@ -1,4 +1,4 @@
-# Implement support for Common Workflow Language (CWL) for Toil.
+""" Implement support for Common Workflow Language (CWL) for Toil."""
 #
 # Copyright (C) 2015 Curoverse, Inc
 # Copyright (C) 2016 UCSC Computational Genomics Lab
@@ -20,11 +20,18 @@
 from builtins import range
 from builtins import object
 
-from toil.job import Job, Promise
-from toil.common import Config, Toil, addOptions
-from toil.version import baseVersion
-
+import abc
 import argparse
+import os
+import tempfile
+import json
+import sys
+import logging
+import copy
+import functools
+from typing import Text, Mapping, MutableSequence
+import hashlib
+
 import cwltool.errors
 import cwltool.load_tool
 import cwltool.main
@@ -38,29 +45,23 @@
 from cwltool.pathmapper import (PathMapper, adjustDirObjs, adjustFileObjs,
                                 get_listing, MapperEnt, visit_class,
                                 normalizeFilesDirs)
-from cwltool.process import (shortname, fillInDefaults, compute_checksums,
-                             collectFilesAndDirs)
+from cwltool.process import (shortname, fill_in_defaults, compute_checksums,
+                             collectFilesAndDirs, add_sizes)
 from cwltool.software_requirements import (
-        DependenciesConfiguration, get_container_from_software_requirements)
+    DependenciesConfiguration, get_container_from_software_requirements)
-from cwltool.utils import aslist, add_sizes
+from cwltool.utils import aslist
 import schema_salad.validate as validate
-from ruamel.yaml.comments import CommentedSeq
 import schema_salad.ref_resolver
-import os
-import tempfile
-import json
-import sys
-import logging
-import copy
-import functools
-from typing import Text
-import hashlib
 
 # Python 3 compatibility imports
 from six import iteritems, string_types
-import six.moves.urllib.parse as urlparse
+from six.moves.urllib import parse as urlparse
 import six
 
+from toil.job import Job, Promise
+from toil.common import Config, Toil, addOptions
+from toil.version import baseVersion
+
 cwllogger = logging.getLogger("cwltool")
 
 # Define internal jobs we should avoid submitting to batch systems and logging
@@ -79,10 +80,11 @@
 
 
 class IndirectDict(dict):
-    """Type tag to indicate a dict is an IndirectDict that needs to resolved."""
+    """Tag to indicate a dict is an IndirectDict that needs to be resolved."""
     pass
 
 
+@six.add_metaclass(abc.ABCMeta)
 class MergeInputs(object):
    """Base type for workflow step inputs that are connected to multiple
    upstream inputs that must be merged into a single array.
@@ -90,7 +92,9 @@ class MergeInputs(object):
     def __init__(self, sources):
         self.sources = sources
 
+    @abc.abstractmethod
     def resolve(self):
+        """Resolves the inputs."""
         raise NotImplementedError()
 
 
@@ -108,14 +112,14 @@ class MergeInputsFlattened(MergeInputs):
     """
 
     def resolve(self):
-        r = []
-        for v in self.sources:
-            v = v[1][v[0]]
-            if isinstance(v, list):
-                r.extend(v)
+        result = []
+        for promise in self.sources:
+            source = promise[1][promise[0]]
+            if isinstance(source, MutableSequence):
+                result.extend(source)
             else:
-                r.append(v)
-        return r
+                result.append(source)
+        return result
 
 
 class StepValueFrom(object):
@@ -129,8 +133,9 @@ def __init__(self, expr, inner, req):
         self.req = req
 
     def do_eval(self, inputs, ctx):
-        return cwltool.expression.do_eval(self.expr, inputs, self.req,
-                                          None, None, {}, context=ctx)
+        """Evaluate ourselves."""
+        return cwltool.expression.do_eval(
+            self.expr, inputs, self.req, None, None, {}, context=ctx)
 
 
 class DefaultWithSource(object):
@@ -140,65 +145,65 @@ def __init__(self, default, source):
         self.source = source
 
     def resolve(self):
+        """Determine the final input value."""
         if self.source:
             result = self.source[1][self.source[0]]
             if result:
                 return result
         return self.default
 
-def _resolve_indirect_inner(d):
+
+def _resolve_indirect_inner(maybe_idict):
     """Resolve the contents an indirect dictionary (containing promises) to
     produce a dictionary actual values, including merging multiple sources
     into a single input.
-
     """
-    if isinstance(d, IndirectDict):
-        r = {}
-        for k, v in list(d.items()):
-            if isinstance(v, (MergeInputs, DefaultWithSource)):
-                r[k] = v.resolve()
+    if isinstance(maybe_idict, IndirectDict):
+        result = {}
+        for key, value in list(maybe_idict.items()):
+            if isinstance(value, (MergeInputs, DefaultWithSource)):
+                result[key] = value.resolve()
             else:
-                r[k] = v[1].get(v[0])
-        return r
-    else:
-        return d
+                result[key] = value[1].get(value[0])
+        return result
+    return maybe_idict
 
 
-def resolve_indirect(d):
+def resolve_indirect(pdict):
     """Resolve the contents an indirect dictionary (containing promises) and
     evaluate expressions to produce the dictionary of actual values.
-
     """
-    inner = IndirectDict() if isinstance(d, IndirectDict) else {}
-    needEval = False
-    for k, v in iteritems(d):
-        if isinstance(v, StepValueFrom):
-            inner[k] = v.inner
-            needEval = True
+    inner = IndirectDict() if isinstance(pdict, IndirectDict) else {}
+    needs_eval = False
+    for k, value in iteritems(pdict):
+        if isinstance(value, StepValueFrom):
+            inner[k] = value.inner
+            needs_eval = True
         else:
-            inner[k] = v
+            inner[k] = value
     res = _resolve_indirect_inner(inner)
-    if needEval:
+    if needs_eval:
         ev = {}
-        for k, v in iteritems(d):
-            if isinstance(v, StepValueFrom):
-                ev[k] = v.do_eval(res, res[k])
+        for k, value in iteritems(pdict):
+            if isinstance(value, StepValueFrom):
+                ev[k] = value.do_eval(res, res[k])
             else:
                 ev[k] = res[k]
         return ev
-    else:
-        return res
+    return res
+
 
-def simplify_list(l):
+def simplify_list(maybe_list):
    """Turn a length one list loaded by cwltool into a scalar.
- Anything else is passed as-is, by reference.""" - if isinstance(l, CommentedSeq): - l = aslist(l) - if len(l)==1: - return l[0] - return l + Anything else is passed as-is, by reference.""" + if isinstance(maybe_list, MutableSequence): + is_list = aslist(maybe_list) + if len(is_list) == 1: + return is_list[0] + return maybe_list + class ToilPathMapper(PathMapper): """ToilPathMapper keeps track of a file's symbolic identifier (the Toil @@ -214,8 +219,8 @@ def __init__(self, referenced_files, basedir, stagedir, stage_listing=False): self.get_file = get_file self.stage_listing = stage_listing - super(ToilPathMapper, self).__init__(referenced_files, basedir, - stagedir, separateDirs=separateDirs) + super(ToilPathMapper, self).__init__( + referenced_files, basedir, stagedir, separateDirs=separateDirs) def visit(self, obj, stagedir, basedir, copy=False, staged=False): # type: (Dict[Text, Any], Text, Text, bool, bool) -> None @@ -224,46 +229,56 @@ def visit(self, obj, stagedir, basedir, copy=False, staged=False): return if obj["class"] == "Directory": if obj["location"].startswith("file://"): - resolved = schema_salad.ref_resolver.uri_file_path(obj["location"]) + resolved = schema_salad.ref_resolver.uri_file_path( + obj["location"]) else: resolved = obj["location"] - self._pathmap[obj["location"]] = MapperEnt(resolved, tgt, "WritableDirectory" if copy else "Directory", staged) - if obj["location"].startswith("file://") and not self.stage_listing: + self._pathmap[obj["location"]] = MapperEnt( + resolved, tgt, + "WritableDirectory" if copy else "Directory", staged) + if obj["location"].startswith("file://") \ + and not self.stage_listing: staged = False - self.visitlisting(obj.get("listing", []), tgt, basedir, copy=copy, staged=staged) + self.visitlisting( + obj.get("listing", []), tgt, basedir, copy=copy, staged=staged) elif obj["class"] == "File": loc = obj["location"] if "contents" in obj and obj["location"].startswith("_:"): - self._pathmap[obj["location"]] = MapperEnt(obj["contents"], tgt, "CreateFile", staged) + self._pathmap[obj["location"]] = MapperEnt( + obj["contents"], tgt, "CreateFile", staged) else: resolved = self.get_file(loc) if self.get_file else loc if resolved.startswith("file:"): - resolved = schema_salad.ref_resolver.uri_file_path(resolved) - self._pathmap[loc] = MapperEnt(resolved, tgt, "WritableFile" if copy else "File", staged) - self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir, copy=copy, staged=staged) + resolved = schema_salad.ref_resolver.uri_file_path( + resolved) + self._pathmap[loc] = MapperEnt( + resolved, tgt, "WritableFile" if copy else "File", staged) + self.visitlisting(obj.get("secondaryFiles", []), + stagedir, basedir, copy=copy, staged=staged) class ToilCommandLineTool(cwltool.command_line_tool.CommandLineTool): """Subclass the cwltool command line tool to provide the custom Toil.PathMapper. - """ - def makePathMapper(self, reffiles, stagedir, **kwargs): - return ToilPathMapper(reffiles, kwargs["basedir"], stagedir, - separateDirs=kwargs.get("separateDirs", True), - get_file=kwargs["toil_get_file"]) + def make_path_mapper(self, reffiles, stagedir, runtimeContext, + separateDirs): + return ToilPathMapper( + reffiles, runtimeContext.basedir, stagedir, separateDirs, + runtimeContext.toil_get_file) -def toilMakeTool(toolpath_object, **kwargs): +def toil_make_tool(toolpath_object, loading_context): """Factory function passed to load_tool() which creates instances of the custom ToilCommandLineTool. 
""" - if isinstance(toolpath_object, dict) and toolpath_object.get("class") == "CommandLineTool": - return ToilCommandLineTool(toolpath_object, **kwargs) - return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs) + if isinstance(toolpath_object, Mapping) \ + and toolpath_object.get("class") == "CommandLineTool": + return ToilCommandLineTool(toolpath_object, loading_context) + return cwltool.workflow.default_make_tool(toolpath_object, loading_context) class ToilFsAccess(cwltool.stdfsaccess.StdFsAccess): @@ -271,33 +286,32 @@ class ToilFsAccess(cwltool.stdfsaccess.StdFsAccess): """ - def __init__(self, basedir, fileStore=None): - self.fileStore = fileStore + def __init__(self, basedir, file_store=None): + self.file_store = file_store super(ToilFsAccess, self).__init__(basedir) - def _abs(self, p): - if p.startswith("toilfs:"): - return self.fileStore.readGlobalFile(p[7:]) - else: - return super(ToilFsAccess, self)._abs(p) + def _abs(self, path): + if path.startswith("toilfs:"): + return self.file_store.readGlobalFile(path[7:]) + return super(ToilFsAccess, self)._abs(path) -def toilGetFile(fileStore, index, existing, fileStoreID): +def toil_get_file(file_store, index, existing, file_store_id): """Get path to input file from Toil jobstore.""" - if not fileStoreID.startswith("toilfs:"): - return schema_salad.ref_resolver.file_uri(fileStoreID) - srcPath = fileStore.readGlobalFile(fileStoreID[7:]) - index[srcPath] = fileStoreID - existing[fileStoreID] = srcPath - return schema_salad.ref_resolver.file_uri(srcPath) + if not file_store_id.startswith("toilfs:"): + return schema_salad.ref_resolver.file_uri(file_store_id) + src_path = file_store.readGlobalFile(file_store_id[7:]) + index[src_path] = file_store_id + existing[file_store_id] = src_path + return schema_salad.ref_resolver.file_uri(src_path) -def writeFile(writeFunc, index, existing, x): +def write_file(writeFunc, index, existing, x): """Write a file into the Toil jobstore. - 'existing' is a set of files retrieved as inputs from toilGetFile. This ensures - they are mapped back as the same name if passed through. + 'existing' is a set of files retrieved as inputs from toil_get_file. This + ensures they are mapped back as the same name if passed through. 
""" # Toil fileStore reference @@ -340,16 +354,16 @@ def uploadFile(uploadfunc, fileindex, existing, uf, skip_broken=False): if skip_broken: return else: - raise cwltool.errors.WorkflowException("File is missing: %s" % uf["location"]) - uf["location"] = writeFile(uploadfunc, - fileindex, - existing, - uf["location"]) + raise cwltool.errors.WorkflowException( + "File is missing: %s" % uf["location"]) + uf["location"] = write_file( + uploadfunc, fileindex, existing, uf["location"]) -def writeGlobalFileWrapper(fileStore, fileuri): +def writeGlobalFileWrapper(file_store, fileuri): """Wrap writeGlobalFile to accepts file:// URIs""" - return fileStore.writeGlobalFile(schema_salad.ref_resolver.uri_file_path(fileuri)) + return file_store.writeGlobalFile( + schema_salad.ref_resolver.uri_file_path(fileuri)) class ResolveIndirect(Job): @@ -362,17 +376,19 @@ def __init__(self, cwljob): super(ResolveIndirect, self).__init__() self.cwljob = cwljob - def run(self, fileStore): + def run(self, file_store): return resolve_indirect(self.cwljob) -def toilStageFiles(fileStore, cwljob, outdir, index, existing, export, destBucket=None): +def toilStageFiles(file_store, cwljob, outdir, index, existing, export, + destBucket=None): """Copy input files out of the global file store and update location and path.""" jobfiles = [] # type: List[Dict[Text, Any]] collectFilesAndDirs(cwljob, jobfiles) - pm = ToilPathMapper(jobfiles, "", outdir, separateDirs=False, stage_listing=True) + pm = ToilPathMapper( + jobfiles, "", outdir, separateDirs=False, stage_listing=True) for f, p in pm.items(): if not p.staged: continue @@ -385,16 +401,17 @@ def toilStageFiles(fileStore, cwljob, outdir, index, existing, export, destBucke # Remove the staging directory from the filepath and # form the destination URL unstageTargetPath = p.target[len(outdir):] - destUrl = '/'.join(s.strip('/') for s in [destBucket, unstageTargetPath]) + destUrl = '/'.join(s.strip('/') + for s in [destBucket, unstageTargetPath]) - fileStore.exportFile(p.resolved[7:], destUrl) + file_store.exportFile(p.resolved[7:], destUrl) continue if not os.path.exists(os.path.dirname(p.target)): os.makedirs(os.path.dirname(p.target), 0o0755) if p.type == "File": - fileStore.exportFile(p.resolved[7:], "file://" + p.target) + file_store.exportFile(p.resolved[7:], "file://" + p.target) elif p.type == "Directory" and not os.path.exists(p.target): os.makedirs(p.target, 0o0755) elif p.type == "CreateFile": @@ -402,7 +419,8 @@ def toilStageFiles(fileStore, cwljob, outdir, index, existing, export, destBucke n.write(p.resolved.encode("utf-8")) def _check_adjust(f): - f["location"] = schema_salad.ref_resolver.file_uri(pm.mapper(f["location"])[1]) + f["location"] = schema_salad.ref_resolver.file_uri( + pm.mapper(f["location"])[1]) if "contents" in f: del f["contents"] return f @@ -416,136 +434,145 @@ class CWLJobWrapper(Job): """ - def __init__(self, tool, cwljob, **kwargs): - super(CWLJobWrapper, self).__init__(cores=1, - memory=1024*1024, - disk=8*1024) + def __init__(self, tool, cwljob, runtime_context): + super(CWLJobWrapper, self).__init__( + cores=1, memory=1024*1024, disk=8*1024) self.cwltool = remove_pickle_problems(tool) self.cwljob = cwljob - self.kwargs = kwargs + self.runtime_context = runtime_context - def run(self, fileStore): + def run(self, file_store): cwljob = resolve_indirect(self.cwljob) - fillInDefaults(self.cwltool.tool['inputs'], cwljob) - options = copy.deepcopy(self.kwargs) - options['jobobj'] = cwljob - realjob = CWLJob(self.cwltool, cwljob, **options) + 
fill_in_defaults( + self.cwltool.tool['inputs'], cwljob, + self.runtime_context.make_fs_access( + self.runtime_context.basedir or "")) + realjob = CWLJob(self.cwltool, cwljob, self.runtime_context) self.addChild(realjob) return realjob.rv() -def _makeNestedTempDir(top,seed,levels=2): +def _makeNestedTempDir(top, seed, levels=2): """ - Gets a temporary directory in the hierarchy of directories under a given top directory. + Gets a temporary directory in the hierarchy of directories under a given + top directory. - This exists to avoid placing too many temporary directories under a single top - in a flat structure, which can slow down metadata updates such as deletes on the - local file system. + This exists to avoid placing too many temporary directories under a single + top in a flat structure, which can slow down metadata updates such as + deletes on the local file system. - The seed parameter allows for deterministic placement of the created directory. - The seed is hashed into hex digest and the directory structure is created from - the initial letters of the digest. + The seed parameter allows for deterministic placement of the created + directory. The seed is hashed into hex digest and the directory structure + is created from the initial letters of the digest. :param top : string, top directory for the hierarchy :param seed : string, the hierarchy will be generated from this seed string - :rtype : string, path to temporary directory - will be created when necessary. + :rtype : string, path to temporary directory - will be created when + necessary. """ # Valid chars for the creation of temporary directories validDirs = hashlib.md5(six.b(str(seed))).hexdigest() tempDir = top - for i in range(max(min(levels,len(validDirs)),1)): + for i in range(max(min(levels, len(validDirs)), 1)): tempDir = os.path.join(tempDir, validDirs[i]) if not os.path.exists(tempDir): try: os.makedirs(tempDir) except os.error: - if not os.path.exists(tempDir): # In the case that a collision occurs and + if not os.path.exists(tempDir): + # In the case that a collision occurs and # it is created while we wait then we ignore raise return tempDir + class CWLJob(Job): """Execute a CWL tool using cwltool.main.single_job_executor""" - def __init__(self, tool, cwljob, **kwargs): + def __init__(self, tool, cwljob, runtime_context, step_inputs=None): self.cwltool = remove_pickle_problems(tool) - if 'builder' in kwargs: - builder = kwargs["builder"] + if runtime_context.builder: + builder = runtime_context.builder else: - builder = cwltool.builder.Builder() - builder.job = cwljob + builder = cwltool.builder.Builder(cwljob) builder.requirements = self.cwltool.requirements builder.outdir = None builder.tmpdir = None - builder.timeout = kwargs.get('eval_timeout') + builder.timeout = runtime_context.eval_timeout builder.resources = {} - req = tool.evalResources(builder, {}) + req = tool.evalResources(builder, runtime_context) # pass the default of None if basecommand is empty unitName = self.cwltool.tool.get("baseCommand", None) - if isinstance(unitName, (list, tuple)): + if isinstance(unitName, (MutableSequence, tuple)): unitName = ' '.join(unitName) - super(CWLJob, self).__init__(cores=req["cores"], - memory=int(req["ram"]*(2**20)), - disk=int((req["tmpdirSize"]*(2**20)) + (req["outdirSize"]*(2**20))), - unitName=unitName) + super(CWLJob, self).__init__( + cores=req["cores"], memory=int(req["ram"]*(2**20)), + disk=int((req["tmpdirSize"]*(2**20))+(req["outdirSize"]*(2**20))), + unitName=unitName) self.cwljob = cwljob try: 
self.jobName = str(self.cwltool.tool['id']) except KeyError: - # fall back to the Toil defined class name if the tool doesn't have an identifier + # fall back to the Toil defined class name if the tool doesn't have + # an identifier pass - self.step_inputs = kwargs.get("step_inputs", self.cwltool.tool["inputs"]) - self.executor_options = kwargs + self.runtime_context = runtime_context + self.step_inputs = step_inputs or self.cwltool.tool["inputs"] + self.workdir = runtime_context.workdir - def run(self, fileStore): + def run(self, file_store): cwljob = resolve_indirect(self.cwljob) - fillInDefaults(self.step_inputs, cwljob) + fill_in_defaults( + self.step_inputs, cwljob, + self.runtime_context.make_fs_access("")) - opts = copy.deepcopy(self.executor_options) # Exports temporary directory for batch systems that reset TMPDIR - os.environ["TMPDIR"] = os.path.realpath(opts.pop("tmpdir", None) or fileStore.getLocalTempDir()) - outdir = os.path.join(fileStore.getLocalTempDir(), "out") + os.environ["TMPDIR"] = os.path.realpath( + self.runtime_context.tmpdir or file_store.getLocalTempDir()) + outdir = os.path.join(file_store.getLocalTempDir(), "out") os.mkdir(outdir) - top_tmp_outdir = (opts.pop("workdir", None) or os.environ["TMPDIR"]) - tmp_outdir_prefix = os.path.join(_makeNestedTempDir(top=top_tmp_outdir,seed=outdir,levels=2),"out_tmpdir") + top_tmp_outdir = self.workdir or os.environ["TMPDIR"] + tmp_outdir_prefix = os.path.join( + _makeNestedTempDir(top=top_tmp_outdir, seed=outdir, levels=2), + "out_tmpdir") index = {} existing = {} - opts.update({'t': self.cwltool, 'job_order_object': cwljob, - 'basedir': os.getcwd(), 'outdir': outdir, - 'tmp_outdir_prefix': tmp_outdir_prefix, - 'tmpdir_prefix': fileStore.getLocalTempDir(), - 'make_fs_access': functools.partial(ToilFsAccess, fileStore=fileStore), - 'toil_get_file': functools.partial(toilGetFile, fileStore, index, existing)}) - del opts['job_order'] - + runtime_context = self.runtime_context.copy() + runtime_context.basedir = os.getcwd() + runtime_context.outdir = outdir + runtime_context.tmp_outdir_prefix = tmp_outdir_prefix + runtime_context.tmpdir_prefix = file_store.getLocalTempDir() + runtime_context.make_fs_access = functools.partial( + ToilFsAccess, file_store=file_store) + runtime_context.toil_get_file = functools.partial( + toil_get_file, file_store, index, existing) # Run the tool - (output, status) = cwltool.main.single_job_executor(**opts) + (output, status) = cwltool.executors.SingleJobExecutor().execute( + self.cwltool, cwljob, runtime_context, cwllogger) if status != "success": raise cwltool.errors.WorkflowException(status) - adjustDirObjs(output, functools.partial(get_listing, - cwltool.stdfsaccess.StdFsAccess(outdir), - recursive=True)) + adjustDirObjs(output, functools.partial( + get_listing, cwltool.stdfsaccess.StdFsAccess(outdir), + recursive=True)) - adjustFileObjs(output, functools.partial(uploadFile, - functools.partial(writeGlobalFileWrapper, fileStore), - index, existing)) + adjustFileObjs(output, functools.partial( + uploadFile, functools.partial(writeGlobalFileWrapper, file_store), + index, existing)) return output -def makeJob(tool, jobobj, **kwargs): +def makeJob(tool, jobobj, step_inputs, runtime_context): """Create the correct Toil Job object for the CWL tool (workflow, job, or job wrapper for dynamic resource requirements.) 
""" if tool.tool["class"] == "Workflow": - options = copy.deepcopy(kwargs) - options.update({'cwlwf': tool, 'cwljob': jobobj}) - wfjob = CWLWorkflow(**options) + wfjob = CWLWorkflow(tool, jobobj, runtime_context) followOn = ResolveIndirect(wfjob.rv()) wfjob.addFollowOn(followOn) return (wfjob, followOn) @@ -557,18 +584,10 @@ def makeJob(tool, jobobj, **kwargs): "tmpdirMin", "tmpdirMax", "outdirMin", "outdirMax"): r = resourceReq.get(req) if isinstance(r, string_types) and ("$(" in r or "${" in r): - # Found a dynamic resource requirement so use a job wrapper. - options = copy.deepcopy(kwargs) - options.update({ - 'tool': tool, - 'cwljob': jobobj}) - job = CWLJobWrapper(**options) + # Found a dynamic resource requirement so use a job wrapper + job = CWLJobWrapper(tool, jobobj, runtime_context) return (job, job) - options = copy.deepcopy(kwargs) - options.update({ - 'tool': tool, - 'cwljob': jobobj}) - job = CWLJob(**options) + job = CWLJob(tool, jobobj, runtime_context) return (job, job) @@ -578,26 +597,32 @@ class CWLScatter(Job): """ - def __init__(self, step, cwljob, **kwargs): + def __init__(self, step, cwljob, runtime_context): super(CWLScatter, self).__init__() self.step = step self.cwljob = cwljob - self.executor_options = kwargs + self.runtime_context = runtime_context - def flat_crossproduct_scatter(self, joborder, scatter_keys, outputs, postScatterEval): + def flat_crossproduct_scatter(self, + joborder, + scatter_keys, + outputs, postScatterEval): scatter_key = shortname(scatter_keys[0]) for n in range(0, len(joborder[scatter_key])): jo = copy.copy(joborder) jo[scatter_key] = joborder[scatter_key][n] if len(scatter_keys) == 1: jo = postScatterEval(jo) - (subjob, followOn) = makeJob(self.step.embedded_tool, jo, **self.executor_options) + (subjob, followOn) = makeJob( + self.step.embedded_tool, jo, None, self.runtime_context) self.addChild(subjob) outputs.append(followOn.rv()) else: - self.flat_crossproduct_scatter(jo, scatter_keys[1:], outputs, postScatterEval) + self.flat_crossproduct_scatter( + jo, scatter_keys[1:], outputs, postScatterEval) - def nested_crossproduct_scatter(self, joborder, scatter_keys, postScatterEval): + def nested_crossproduct_scatter(self, + joborder, scatter_keys, postScatterEval): scatter_key = shortname(scatter_keys[0]) outputs = [] for n in range(0, len(joborder[scatter_key])): @@ -605,14 +630,16 @@ def nested_crossproduct_scatter(self, joborder, scatter_keys, postScatterEval): jo[scatter_key] = joborder[scatter_key][n] if len(scatter_keys) == 1: jo = postScatterEval(jo) - (subjob, followOn) = makeJob(self.step.embedded_tool, jo, **self.executor_options) + (subjob, followOn) = makeJob( + self.step.embedded_tool, jo, None, self.runtime_context) self.addChild(subjob) outputs.append(followOn.rv()) else: - outputs.append(self.nested_crossproduct_scatter(jo, scatter_keys[1:], postScatterEval)) + outputs.append(self.nested_crossproduct_scatter( + jo, scatter_keys[1:], postScatterEval)) return outputs - def run(self, fileStore): + def run(self, file_store): cwljob = resolve_indirect(self.cwljob) if isinstance(self.step.tool["scatter"], string_types): @@ -625,7 +652,8 @@ def run(self, fileStore): scatterMethod = "dotproduct" outputs = [] - valueFrom = {shortname(i["id"]): i["valueFrom"] for i in self.step.tool["inputs"] if "valueFrom" in i} + valueFrom = {shortname(i["id"]): i["valueFrom"] + for i in self.step.tool["inputs"] if "valueFrom" in i} def postScatterEval(io): shortio = {shortname(k): v for k, v in iteritems(io)} @@ -647,20 +675,25 @@ def 
valueFromFunc(k, v):
                 for sc in [shortname(x) for x in scatter]:
                     copyjob[sc] = cwljob[sc][i]
                 copyjob = postScatterEval(copyjob)
-                (subjob, followOn) = makeJob(self.step.embedded_tool, copyjob, **self.executor_options)
+                (subjob, follow_on) = makeJob(
+                    self.step.embedded_tool, copyjob, None,
+                    self.runtime_context)
                 self.addChild(subjob)
-                outputs.append(followOn.rv())
+                outputs.append(follow_on.rv())
         elif scatterMethod == "nested_crossproduct":
-            outputs = self.nested_crossproduct_scatter(cwljob, scatter, postScatterEval)
+            outputs = self.nested_crossproduct_scatter(
+                cwljob, scatter, postScatterEval)
         elif scatterMethod == "flat_crossproduct":
-            self.flat_crossproduct_scatter(cwljob, scatter, outputs, postScatterEval)
+            self.flat_crossproduct_scatter(
+                cwljob, scatter, outputs, postScatterEval)
         else:
             if scatterMethod:
                 raise validate.ValidationException(
                     "Unsupported complex scatter type '%s'" % scatterMethod)
             else:
                 raise validate.ValidationException(
-                    "Must provide scatterMethod to scatter over multiple inputs")
+                    "Must provide scatterMethod to scatter over multiple"
+                    " inputs.")
 
         return outputs
 
@@ -677,17 +710,17 @@ def __init__(self, step, outputs):
         self.outputs = outputs
 
     def allkeys(self, obj, keys):
-        if isinstance(obj, dict):
+        if isinstance(obj, Mapping):
             for k in list(obj.keys()):
                 keys.add(k)
-        elif isinstance(obj, list):
+        elif isinstance(obj, MutableSequence):
             for l in obj:
                 self.allkeys(l, keys)
 
     def extract(self, obj, k):
-        if isinstance(obj, dict):
+        if isinstance(obj, Mapping):
             return obj.get(k)
-        elif isinstance(obj, list):
+        elif isinstance(obj, MutableSequence):
             cp = []
             for l in obj:
                 cp.append(self.extract(l, k))
@@ -695,11 +728,11 @@ def extract(self, obj, k):
         else:
             return []
 
-    def run(self, fileStore):
+    def run(self, file_store):
         outobj = {}
 
         def sn(n):
-            if isinstance(n, dict):
+            if isinstance(n, Mapping):
                 return shortname(n["id"])
             if isinstance(n, string_types):
                 return shortname(n)
 
@@ -728,7 +761,8 @@ def hasChild(self, c):
 
 
 def remove_pickle_problems(obj):
-    """doc_loader does not pickle correctly, causing Toil errors, remove from objects.
+    """doc_loader does not pickle correctly, causing Toil errors; remove it
+    from objects.
""" if hasattr(obj, "doc_loader"): obj.doc_loader = None @@ -745,16 +779,14 @@ class CWLWorkflow(Job): """ - def __init__(self, cwlwf, cwljob, **kwargs): + def __init__(self, cwlwf, cwljob, runtime_context): super(CWLWorkflow, self).__init__() self.cwlwf = cwlwf self.cwljob = cwljob - self.executor_options = kwargs - if "step_inputs" in self.executor_options: - del self.executor_options["step_inputs"] + self.runtime_context = runtime_context self.cwlwf = remove_pickle_problems(self.cwlwf) - def run(self, fileStore): + def run(self, file_store): cwljob = resolve_indirect(self.cwljob) # `promises` dict @@ -793,85 +825,105 @@ def run(self, fileStore): for inp in step.tool["inputs"]: key = shortname(inp["id"]) if "source" in inp: - if inp.get("linkMerge") or len(aslist(inp["source"])) > 1: - linkMerge = inp.get("linkMerge", "merge_nested") + if inp.get("linkMerge") \ + or len(aslist(inp["source"])) > 1: + linkMerge = inp.get( + "linkMerge", "merge_nested") if linkMerge == "merge_nested": jobobj[key] = ( - MergeInputsNested([(shortname(s), promises[s].rv()) - for s in aslist(inp["source"])])) + MergeInputsNested( + [(shortname(s), + promises[s].rv()) + for s in aslist( + inp["source"])])) elif linkMerge == "merge_flattened": jobobj[key] = ( - MergeInputsFlattened([(shortname(s), promises[s].rv()) - for s in aslist(inp["source"])])) + MergeInputsFlattened( + [(shortname(s), + promises[s].rv()) + for s in aslist( + inp["source"])])) else: raise validate.ValidationException( - "Unsupported linkMerge '%s'", linkMerge) + "Unsupported linkMerge '%s'" % + linkMerge) else: - inputSource = inp["source"] - if isinstance(inputSource, CommentedSeq): - # It seems that an input source with a '#' in the name will be - # returned as a CommentedSeq list by the yaml parser. - inputSource = str(inputSource[0]) - jobobj[key] = (shortname(inputSource), - promises[inputSource].rv()) + inpSource = inp["source"] + if isinstance(inpSource, MutableSequence): + # It seems that an input source with a + # '#' in the name will be returned as a + # CommentedSeq list by the yaml parser. 
+ inpSource = str(inpSource[0]) + jobobj[key] = (shortname(inpSource), + promises[inpSource].rv()) if "default" in inp: if key in jobobj: if isinstance(jobobj[key][1], Promise): d = copy.copy(inp["default"]) - jobobj[key] = DefaultWithSource(d, jobobj[key]) + jobobj[key] = DefaultWithSource( + d, jobobj[key]) else: - if jobobj[key][1][jobobj[key][0]] is None: + if jobobj[key][1][ + jobobj[key][0]] is None: d = copy.copy(inp["default"]) - jobobj[key] = ("default", {"default": d}) + jobobj[key] = ( + "default", {"default": d}) else: d = copy.copy(inp["default"]) jobobj[key] = ("default", {"default": d}) - if "valueFrom" in inp and "scatter" not in step.tool: + if "valueFrom" in inp \ + and "scatter" not in step.tool: if key in jobobj: - jobobj[key] = StepValueFrom(inp["valueFrom"], - jobobj[key], - self.cwlwf.requirements) + jobobj[key] = StepValueFrom( + inp["valueFrom"], jobobj[key], + self.cwlwf.requirements) else: - jobobj[key] = StepValueFrom(inp["valueFrom"], - ("None", {"None": None}), - self.cwlwf.requirements) + jobobj[key] = StepValueFrom( + inp["valueFrom"], ( + "None", {"None": None}), + self.cwlwf.requirements) if "scatter" in step.tool: - wfjob = CWLScatter(step, IndirectDict(jobobj), **self.executor_options) + wfjob = CWLScatter(step, IndirectDict(jobobj), + self.runtime_context) followOn = CWLGather(step, wfjob.rv()) wfjob.addFollowOn(followOn) else: - (wfjob, followOn) = makeJob(step.embedded_tool, IndirectDict(jobobj), - step_inputs=step.tool["inputs"], - **self.executor_options) + (wfjob, followOn) = makeJob( + step.embedded_tool, IndirectDict(jobobj), + step.tool["inputs"], + self.runtime_context) jobs[step.tool["id"]] = followOn connected = False for inp in step.tool["inputs"]: for s in aslist(inp.get("source", [])): - if (isinstance(promises[s], - (CWLJobWrapper, CWLGather)) and + if (isinstance( + promises[s], (CWLJobWrapper, CWLGather) + ) and not promises[s].hasFollowOn(wfjob)): - promises[s].addFollowOn(wfjob) - connected = True - if (not isinstance(promises[s], - (CWLJobWrapper, CWLGather)) and + promises[s].addFollowOn(wfjob) + connected = True + if (not isinstance( + promises[s], (CWLJobWrapper, CWLGather) + ) and not promises[s].hasChild(wfjob)): - promises[s].addChild(wfjob) - connected = True + promises[s].addChild(wfjob) + connected = True if not connected: - # workflow step has default inputs only, isn't connected to other jobs, - # so add it as child of workflow. + # the workflow step has default inputs only & isn't + # connected to other jobs, so add it as child of + # this workflow. 
self.addChild(wfjob) for out in step.tool["outputs"]: promises[out["id"]] = followOn for inp in step.tool["inputs"]: - for s in aslist(inp.get("source", [])): - if s not in promises: + for source in aslist(inp.get("source", [])): + if source not in promises: alloutputs_fufilled = False # may need a test @@ -884,19 +936,20 @@ def run(self, fileStore): for out in self.cwlwf.tool["outputs"]: key = shortname(out["id"]) if out.get("linkMerge") or len(aslist(out["outputSource"])) > 1: - linkMerge = out.get("linkMerge", "merge_nested") - if linkMerge == "merge_nested": + link_merge = out.get("linkMerge", "merge_nested") + if link_merge == "merge_nested": outobj[key] = ( - MergeInputsNested([(shortname(s), promises[s].rv()) - for s in aslist(out["outputSource"])])) - elif linkMerge == "merge_flattened": + MergeInputsNested( + [(shortname(s), promises[s].rv()) + for s in aslist(out["outputSource"])])) + elif link_merge == "merge_flattened": outobj[key] = ( - MergeInputsFlattened([ - (shortname(s), promises[s].rv()) - for s in aslist(out["source"])])) + MergeInputsFlattened([ + (shortname(s), promises[s].rv()) + for s in aslist(out["source"])])) else: raise validate.ValidationException( - "Unsupported linkMerge '{}'".format(linkMerge)) + "Unsupported linkMerge '{}'".format(link_merge)) else: # A CommentedSeq of length one still appears here rarely - @@ -904,31 +957,18 @@ def run(self, fileStore): # the execution by causing a non-hashable type exception. # We simplify the list into its first (and only) element. src = simplify_list(out["outputSource"]) - outobj[key] = (shortname(src), - promises[src].rv()) + outobj[key] = (shortname(src), promises[src].rv()) return IndirectDict(outobj) -cwltool.process.supportedProcessRequirements = ("DockerRequirement", - "ExpressionEngineRequirement", - "InlineJavascriptRequirement", - "InitialWorkDirRequirement", - "SchemaDefRequirement", - "EnvVarRequirement", - "CreateFileRequirement", - "SubworkflowFeatureRequirement", - "ScatterFeatureRequirement", - "ShellCommandRequirement", - "MultipleInputFeatureRequirement", - "StepInputExpressionRequirement", - "ResourceRequirement") - - -def unsupportedRequirementsCheck(requirements): - """Check for specific requirement cases we don't support. - """ - pass +cwltool.process.supportedProcessRequirements = ( + "DockerRequirement", "ExpressionEngineRequirement", + "InlineJavascriptRequirement", "InitialWorkDirRequirement", + "SchemaDefRequirement", "EnvVarRequirement", "CreateFileRequirement", + "SubworkflowFeatureRequirement", "ScatterFeatureRequirement", + "ShellCommandRequirement", "MultipleInputFeatureRequirement", + "StepInputExpressionRequirement", "ResourceRequirement") def visitSteps(t, op): @@ -939,6 +979,7 @@ def visitSteps(t, op): def main(args=None, stdout=sys.stdout): + """Main method for toil-cwl-runner.""" config = Config() config.cwl = True parser = argparse.ArgumentParser() @@ -950,50 +991,53 @@ def main(args=None, stdout=sys.stdout): # user to select jobStore or get a default from logic one below. 
parser.add_argument("--jobStore", type=str) parser.add_argument("--not-strict", action="store_true") - parser.add_argument("--quiet", dest="logLevel", action="store_const", const="ERROR") + parser.add_argument("--quiet", dest="logLevel", action="store_const", + const="ERROR") parser.add_argument("--basedir", type=str) parser.add_argument("--outdir", type=str, default=os.getcwd()) parser.add_argument("--version", action='version', version=baseVersion) dockergroup = parser.add_mutually_exclusive_group() - dockergroup.add_argument("--user-space-docker-cmd", - help="(Linux/OS X only) Specify a user space docker " - "command (like udocker or dx-docker) that will be " - "used to call 'pull' and 'run'") - dockergroup.add_argument("--singularity", action="store_true", - default=False, help="[experimental] Use " - "Singularity runtime for running containers. " - "Requires Singularity v2.3.2+ and Linux with kernel " - "version v3.18+ or with overlayfs support " - "backported.") - dockergroup.add_argument("--no-container", action="store_true", - help="Do not execute jobs in a " - "Docker container, even when `DockerRequirement` " - "is specified under `hints`.") - parser.add_argument("--preserve-environment", type=str, nargs='+', - help="Preserve specified environment variables when running CommandLineTools", - metavar=("VAR1 VAR2"), - default=("PATH",), - dest="preserve_environment") - parser.add_argument("--destBucket", type=str, - help="Specify a cloud bucket endpoint for output files.") - # help="Dependency resolver configuration file describing how to adapt 'SoftwareRequirement' packages to current system." - parser.add_argument("--beta-dependency-resolvers-configuration", default=None) - # help="Defaut root directory used by dependency resolvers configuration." + dockergroup.add_argument( + "--user-space-docker-cmd", + help="(Linux/OS X only) Specify a user space docker command (like " + "udocker or dx-docker) that will be used to call 'pull' and 'run'") + dockergroup.add_argument( + "--singularity", action="store_true", default=False, + help="[experimental] Use Singularity runtime for running containers. " + "Requires Singularity v2.3.2+ and Linux with kernel version v3.18+ or " + "with overlayfs support backported.") + dockergroup.add_argument( + "--no-container", action="store_true", help="Do not execute jobs in a " + "Docker container, even when `DockerRequirement` " + "is specified under `hints`.") + parser.add_argument( + "--preserve-environment", type=str, nargs='+', + help="Preserve specified environment variables when running" + " CommandLineTools", metavar=("VAR1 VAR2"), default=("PATH",), + dest="preserve_environment") + parser.add_argument( + "--destBucket", type=str, + help="Specify a cloud bucket endpoint for output files.") + parser.add_argument( + "--beta-dependency-resolvers-configuration", default=None) parser.add_argument("--beta-dependencies-directory", default=None) - # help="Use biocontainers for tools without an explicitly annotated Docker container." - parser.add_argument("--beta-use-biocontainers", default=None, action="store_true") - # help="Short cut to use Conda to resolve 'SoftwareRequirement' packages." 
- parser.add_argument("--beta-conda-dependencies", default=None, action="store_true") + parser.add_argument( + "--beta-use-biocontainers", default=None, action="store_true") + parser.add_argument( + "--beta-conda-dependencies", default=None, action="store_true") parser.add_argument("--tmpdir-prefix", type=Text, help="Path prefix for temporary directories", default="tmp") parser.add_argument("--tmp-outdir-prefix", type=Text, help="Path prefix for intermediate output directories", default="tmp") - parser.add_argument("--force-docker-pull", action="store_true", default=False, dest="force_docker_pull", - help="Pull latest docker image even if it is locally present") - parser.add_argument("--no-match-user", action="store_true", default=False, - help="Disable passing the current uid to `docker run --user`") + parser.add_argument( + "--force-docker-pull", action="store_true", default=False, + dest="force_docker_pull", + help="Pull latest docker image even if it is locally present") + parser.add_argument( + "--no-match-user", action="store_true", default=False, + help="Disable passing the current uid to `docker run --user`") # mkdtemp actually creates the directory, but # toil requires that the directory not exist, @@ -1005,7 +1049,7 @@ def main(args=None, stdout=sys.stdout): if args is None: args = sys.argv[1:] - #we use workdir as jobStore: + # we use workdir as jobStore: options = parser.parse_args([workdir] + args) use_container = not options.no_container @@ -1019,23 +1063,26 @@ def main(args=None, stdout=sys.stdout): fileindex = {} existing = {} - make_tool_kwargs = {} - conf_file = getattr(options, "beta_dependency_resolvers_configuration", None) # Text - use_conda_dependencies = getattr(options, "beta_conda_dependencies", None) # Text + conf_file = getattr(options, + "beta_dependency_resolvers_configuration", None) + use_conda_dependencies = getattr(options, "beta_conda_dependencies", None) job_script_provider = None if conf_file or use_conda_dependencies: - dependencies_configuration = DependenciesConfiguration(options) # type: DependenciesConfiguration + dependencies_configuration = DependenciesConfiguration(options) job_script_provider = dependencies_configuration options.default_container = None - make_tool_kwargs["find_default_container"] = functools.partial(find_default_container, options) + runtime_context = cwltool.context.RuntimeContext(vars(options)) + runtime_context.find_default_container = functools.partial( + find_default_container, options) + runtime_context.workdir = workdir + loading_context = cwltool.context.LoadingContext(vars(options)) with Toil(options) as toil: if options.restart: outobj = toil.restart() else: - useStrict = not options.not_strict - make_tool_kwargs["hints"] = [{ + loading_context.hints = [{ "class": "ResourceRequirement", "coresMin": toil.config.defaultCores, "ramMin": toil.config.defaultMemory / (2**20), @@ -1043,78 +1090,81 @@ def main(args=None, stdout=sys.stdout): "tmpdirMin": 0 }] try: - t = cwltool.load_tool.load_tool(options.cwltool, toilMakeTool, - kwargs=make_tool_kwargs, - resolver=cwltool.resolver.tool_resolver, - strict=useStrict) - unsupportedRequirementsCheck(t.requirements) - except cwltool.process.UnsupportedRequirement as e: - logging.error(e) + loading_context.construct_tool_object = toil_make_tool + loading_context.resolver = cwltool.resolver.tool_resolver + loading_context.strict = not options.not_strict + t = cwltool.load_tool.load_tool( + options.cwltool, loading_context) + except cwltool.process.UnsupportedRequirement as err: + 
logging.error(err) return 33 - if type(t) == int: + if isinstance(t, int): return t options.workflow = options.cwltool options.job_order = options.cwljob + _, tool_file_uri = cwltool.main.resolve_tool_uri( + options.cwltool, loading_context.resolver, + loading_context.fetcher_constructor) options.tool_help = None options.debug = options.logLevel == "DEBUG" job, options.basedir, loader = cwltool.main.load_job_order( - options, sys.stdin, None, [], options.job_order) - job = cwltool.main.init_job_order(job, options, t, loader=loader) - - fillInDefaults(t.tool["inputs"], job) - - def pathToLoc(p): - if "location" not in p and "path" in p: - p["location"] = p["path"] - del p["path"] - - def importFiles(tool): - visit_class(tool, ("File", "Directory"), pathToLoc) - visit_class(tool, ("File", ), add_sizes) + options, sys.stdin, None, [], tool_file_uri) + job = cwltool.main.init_job_order(job, options, t, loader, + sys.stdout) + fs_access = cwltool.stdfsaccess.StdFsAccess(options.basedir) + fill_in_defaults(t.tool["inputs"], job, fs_access) + + def path_to_loc(obj): + if "location" not in obj and "path" in obj: + obj["location"] = obj["path"] + del obj["path"] + + def import_files(tool): + visit_class(tool, ("File", "Directory"), path_to_loc) + visit_class(tool, ("File", ), functools.partial( + add_sizes, fs_access)) normalizeFilesDirs(tool) - adjustDirObjs(tool, functools.partial(get_listing, - cwltool.stdfsaccess.StdFsAccess(""), - recursive=True)) - adjustFileObjs(tool, functools.partial(uploadFile, - toil.importFile, - fileindex, existing, skip_broken=True)) + adjustDirObjs(tool, functools.partial( + get_listing, fs_access, recursive=True)) + adjustFileObjs(tool, functools.partial( + uploadFile, toil.importFile, fileindex, existing, + skip_broken=True)) - t.visit(importFiles) + t.visit(import_files) for inp in t.tool["inputs"]: - def setSecondary(fileobj): - if isinstance(fileobj, dict) and fileobj.get("class") == "File": + def set_secondary(fileobj): + if isinstance(fileobj, Mapping) \ + and fileobj.get("class") == "File": if "secondaryFiles" not in fileobj: - fileobj["secondaryFiles"] = [{ - "location": cwltool.builder.substitute(fileobj["location"], sf), "class": "File"} - for sf in inp["secondaryFiles"]] + fileobj["secondaryFiles"] = [ + {"location": cwltool.builder.substitute( + fileobj["location"], sf), "class": "File"} + for sf in inp["secondaryFiles"]] - if isinstance(fileobj, list): - for e in fileobj: - setSecondary(e) + if isinstance(fileobj, MutableSequence): + for entry in fileobj: + set_secondary(entry) if shortname(inp["id"]) in job and inp.get("secondaryFiles"): - setSecondary(job[shortname(inp["id"])]) + set_secondary(job[shortname(inp["id"])]) - importFiles(job) - visitSteps(t, importFiles) + import_files(job) + visitSteps(t, import_files) try: - make_opts = copy.deepcopy(vars(options)) - make_opts.update({'tool': t, - 'jobobj': {}, - 'use_container': use_container, - 'tmpdir': os.path.realpath(tmpdir_prefix), - 'tmp_outdir_prefix' : os.path.realpath(tmp_outdir_prefix), - 'job_script_provider': job_script_provider, - 'force_docker_pull': options.force_docker_pull, - 'no_match_user': options.no_match_user}) - - (wf1, wf2) = makeJob(**make_opts) - except cwltool.process.UnsupportedRequirement as e: - logging.error(e) + runtime_context.use_container = use_container + runtime_context.tmpdir = os.path.realpath(tmpdir_prefix) + runtime_context.tmp_outdir_prefix = os.path.realpath( + tmp_outdir_prefix) + runtime_context.job_script_provider = job_script_provider + 
runtime_context.force_docker_pull = options.force_docker_pull + runtime_context.no_match_user = options.no_match_user + (wf1, _) = makeJob(t, {}, None, runtime_context) + except cwltool.process.UnsupportedRequirement as err: + logging.error(err) return 33 wf1.cwljob = job @@ -1135,7 +1185,8 @@ def setSecondary(fileobj): destBucket=options.destBucket) if not options.destBucket: - visit_class(outobj, ("File",), functools.partial(compute_checksums, cwltool.stdfsaccess.StdFsAccess(""))) + visit_class(outobj, ("File",), functools.partial( + compute_checksums, cwltool.stdfsaccess.StdFsAccess(""))) stdout.write(json.dumps(outobj, indent=4)) @@ -1147,6 +1198,7 @@ def find_default_container(args, builder): if args.default_container: default_container = args.default_container elif args.beta_use_biocontainers: - default_container = get_container_from_software_requirements(args, builder) + default_container = get_container_from_software_requirements( + args, builder) return default_container From 13ec5ac73151b5a171ee5ae040c95521396e9906 Mon Sep 17 00:00:00 2001 From: "Michael R. Crusoe" Date: Sun, 19 Aug 2018 11:07:35 -0700 Subject: [PATCH 2/6] fix bioconda --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 06040ebb06..5325c99f04 100644 --- a/setup.py +++ b/setup.py @@ -34,7 +34,7 @@ def runSetup(): gcs = 'google-cloud-storage==1.6.0' gcs_oauth2_boto_plugin = 'gcs_oauth2_boto_plugin==1.14' apacheLibcloud = 'apache-libcloud==2.2.1' - cwltool = 'cwltool==1.0.20180809224403' + cwltool = 'cwltool==1.0.20180819175200' schemaSalad = 'schema-salad>=2.6, <3' galaxyLib = 'galaxy-lib==17.9.3' htcondor = 'htcondor>=8.6.0' From 27c17de1867a4bfab176e0b72c1e815ba0ba9a28 Mon Sep 17 00:00:00 2001 From: "Michael R. Crusoe" Date: Mon, 20 Aug 2018 00:12:38 -0700 Subject: [PATCH 3/6] latest CWL conformance tests --- src/toil/test/cwl/cwlTest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/toil/test/cwl/cwlTest.py b/src/toil/test/cwl/cwlTest.py index f33a29f915..fbc9d90ed8 100644 --- a/src/toil/test/cwl/cwlTest.py +++ b/src/toil/test/cwl/cwlTest.py @@ -145,7 +145,7 @@ def test_run_conformance(self, batchSystem=None): cwlSpec = os.path.join(rootDir, 'src/toil/test/cwl/spec') workDir = os.path.join(cwlSpec, 'v1.0') # The latest cwl git hash. Update it to get the latest tests. - testhash = "f96bca6911b6688ff614c02dbefe819bed260a13" + testhash = "22490926651174c6cbe01c76c2ded3c9e8d0ee6f" url = "https://github.com/common-workflow-language/common-workflow-language/archive/%s.zip" % testhash if not os.path.exists(cwlSpec): urlretrieve(url, "spec.zip") From 0aacb4445922125255f2ed0091cc685d227ad81c Mon Sep 17 00:00:00 2001 From: "Michael R. 
Crusoe" Date: Mon, 20 Aug 2018 00:18:09 -0700 Subject: [PATCH 4/6] strip generation from final outputs --- src/toil/cwl/cwltoil.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py index 598c55b262..8778955413 100755 --- a/src/toil/cwl/cwltoil.py +++ b/src/toil/cwl/cwltoil.py @@ -50,6 +50,7 @@ from cwltool.software_requirements import ( DependenciesConfiguration, get_container_from_software_requirements) from cwltool.utils import aslist +from cwltool.mutation import MutationManager import schema_salad.validate as validate import schema_salad.ref_resolver @@ -1188,6 +1189,7 @@ def set_secondary(fileobj): visit_class(outobj, ("File",), functools.partial( compute_checksums, cwltool.stdfsaccess.StdFsAccess(""))) + visit_class(outobj, ("File", ), MutationManager().unset_generation) stdout.write(json.dumps(outobj, indent=4)) return 0 From 91a0c28ca1a7573db71c78383b2c407b69f87f13 Mon Sep 17 00:00:00 2001 From: "Michael R. Crusoe" Date: Mon, 20 Aug 2018 00:59:13 -0700 Subject: [PATCH 5/6] remove duplicate CWL logging --- src/toil/cwl/cwltoil.py | 5 +++-- src/toil/realtimeLogger.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py index 8778955413..5fdb2eb7ea 100755 --- a/src/toil/cwl/cwltoil.py +++ b/src/toil/cwl/cwltoil.py @@ -42,6 +42,8 @@ import cwltool.stdfsaccess import cwltool.command_line_tool +from cwltool.loghandler import _logger as cwllogger +from cwltool.loghandler import defaultStreamHandler from cwltool.pathmapper import (PathMapper, adjustDirObjs, adjustFileObjs, get_listing, MapperEnt, visit_class, normalizeFilesDirs) @@ -63,8 +65,6 @@ from toil.common import Config, Toil, addOptions from toil.version import baseVersion -cwllogger = logging.getLogger("cwltool") - # Define internal jobs we should avoid submitting to batch systems and logging CWL_INTERNAL_JOBS = ("CWLJobWrapper", "CWLWorkflow", "CWLScatter", "CWLGather", "ResolveIndirect") @@ -981,6 +981,7 @@ def visitSteps(t, op): def main(args=None, stdout=sys.stdout): """Main method for toil-cwl-runner.""" + cwllogger.removeHandler(defaultStreamHandler) config = Config() config.cwl = True parser = argparse.ArgumentParser() diff --git a/src/toil/realtimeLogger.py b/src/toil/realtimeLogger.py index 37120b9851..99fe7763b0 100644 --- a/src/toil/realtimeLogger.py +++ b/src/toil/realtimeLogger.py @@ -159,7 +159,7 @@ def _setEnv(name, value): _setEnv('ADDRESS', '%s:%i' % (ip, port)) _setEnv('LEVEL', level) else: - log.info('Real-time logging disabled') + log.debug('Real-time logging disabled') else: if level: log.warn('Ignoring nested request to start real-time logging') From 769a0f48833ad0364ddcf7eef7c6b9ddc9e082cc Mon Sep 17 00:00:00 2001 From: "Michael R. 
Crusoe" Date: Mon, 20 Aug 2018 01:55:25 -0700 Subject: [PATCH 6/6] bring job initialization up to date --- setup.py | 2 +- src/toil/cwl/cwltoil.py | 100 +++++++++++++++++++++++++--------------- src/toil/leader.py | 5 -- 3 files changed, 64 insertions(+), 43 deletions(-) diff --git a/setup.py b/setup.py index c1922926da..8d01e089d1 100644 --- a/setup.py +++ b/setup.py @@ -34,7 +34,7 @@ def runSetup(): gcs = 'google-cloud-storage==1.6.0' gcs_oauth2_boto_plugin = 'gcs_oauth2_boto_plugin==1.14' apacheLibcloud = 'apache-libcloud==2.2.1' - cwltool = 'cwltool==1.0.20180819175200' + cwltool = 'cwltool==1.0.20180820141117' schemaSalad = 'schema-salad>=2.6, <3' galaxyLib = 'galaxy-lib==17.9.3' htcondor = 'htcondor>=8.6.0' diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py index 5fdb2eb7ea..69717a9a25 100755 --- a/src/toil/cwl/cwltoil.py +++ b/src/toil/cwl/cwltoil.py @@ -32,6 +32,14 @@ from typing import Text, Mapping, MutableSequence import hashlib +# Python 3 compatibility imports +from six import iteritems, string_types +from six.moves.urllib import parse as urlparse +import six + +from schema_salad import validate +import schema_salad.ref_resolver + import cwltool.errors import cwltool.load_tool import cwltool.main @@ -49,17 +57,11 @@ normalizeFilesDirs) from cwltool.process import (shortname, fill_in_defaults, compute_checksums, collectFilesAndDirs, add_sizes) +from cwltool.secrets import SecretStore from cwltool.software_requirements import ( DependenciesConfiguration, get_container_from_software_requirements) from cwltool.utils import aslist from cwltool.mutation import MutationManager -import schema_salad.validate as validate -import schema_salad.ref_resolver - -# Python 3 compatibility imports -from six import iteritems, string_types -from six.moves.urllib import parse as urlparse -import six from toil.job import Job, Promise from toil.common import Config, Toil, addOptions @@ -96,7 +98,6 @@ def __init__(self, sources): @abc.abstractmethod def resolve(self): """Resolves the inputs.""" - raise NotImplementedError() class MergeInputsNested(MergeInputs): @@ -488,7 +489,7 @@ def _makeNestedTempDir(top, seed, levels=2): class CWLJob(Job): - """Execute a CWL tool using cwltool.main.single_job_executor""" + """Execute a CWL tool using cwltool.executors.SingleJobExecutor""" def __init__(self, tool, cwljob, runtime_context, step_inputs=None): self.cwltool = remove_pickle_problems(tool) @@ -527,6 +528,13 @@ def run(self, file_store): fill_in_defaults( self.step_inputs, cwljob, self.runtime_context.make_fs_access("")) + for inp_id in cwljob.keys(): + found = False + for field in self.cwltool.inputs_record_schema['fields']: + if field['name'] == inp_id: + found = True + if not found: + cwljob.pop(inp_id) # Exports temporary directory for batch systems that reset TMPDIR os.environ["TMPDIR"] = os.path.realpath( @@ -546,7 +554,7 @@ def run(self, file_store): runtime_context.tmp_outdir_prefix = tmp_outdir_prefix runtime_context.tmpdir_prefix = file_store.getLocalTempDir() runtime_context.make_fs_access = functools.partial( - ToilFsAccess, file_store=file_store) + ToilFsAccess, file_store=file_store) runtime_context.toil_get_file = functools.partial( toil_get_file, file_store, index, existing) # Run the tool @@ -1078,6 +1086,8 @@ def main(args=None, stdout=sys.stdout): runtime_context.find_default_container = functools.partial( find_default_container, options) runtime_context.workdir = workdir + runtime_context.move_outputs = "leave" + runtime_context.rm_tmpdir = False loading_context = 
cwltool.context.LoadingContext(vars(options)) with Toil(options) as toil: @@ -1091,32 +1101,47 @@ def main(args=None, stdout=sys.stdout): "outdirMin": toil.config.defaultDisk / (2**20), "tmpdirMin": 0 }] - try: - loading_context.construct_tool_object = toil_make_tool - loading_context.resolver = cwltool.resolver.tool_resolver - loading_context.strict = not options.not_strict - t = cwltool.load_tool.load_tool( - options.cwltool, loading_context) - except cwltool.process.UnsupportedRequirement as err: - logging.error(err) - return 33 - - if isinstance(t, int): - return t - + loading_context.construct_tool_object = toil_make_tool + loading_context.resolver = cwltool.resolver.tool_resolver + loading_context.strict = not options.not_strict options.workflow = options.cwltool options.job_order = options.cwljob - _, tool_file_uri = cwltool.main.resolve_tool_uri( + uri, tool_file_uri = cwltool.load_tool.resolve_tool_uri( options.cwltool, loading_context.resolver, loading_context.fetcher_constructor) options.tool_help = None options.debug = options.logLevel == "DEBUG" - job, options.basedir, loader = cwltool.main.load_job_order( - options, sys.stdin, None, [], tool_file_uri) - job = cwltool.main.init_job_order(job, options, t, loader, - sys.stdout) + job_order_object, options.basedir, jobloader = \ + cwltool.main.load_job_order( + options, sys.stdin, loading_context.fetcher_constructor, + loading_context.overrides_list, tool_file_uri) + document_loader, workflowobj, uri = \ + cwltool.load_tool.fetch_document( + uri, loading_context.resolver, + loading_context.fetcher_constructor) + document_loader, avsc_names, processobj, metadata, uri = \ + cwltool.load_tool.validate_document( + document_loader, workflowobj, uri, + loading_context.enable_dev, loading_context.strict, False, + loading_context.fetcher_constructor, False, + loading_context.overrides_list, + do_validate=loading_context.do_validate) + loading_context.overrides_list.extend( + metadata.get("cwltool:overrides", [])) + try: + tool = cwltool.load_tool.make_tool( + document_loader, avsc_names, metadata, uri, + loading_context) + except cwltool.process.UnsupportedRequirement as err: + logging.error(err) + return 33 + runtime_context.secret_store = SecretStore() + initialized_job_order = cwltool.main.init_job_order( + job_order_object, options, tool, jobloader, sys.stdout, + secret_store=runtime_context.secret_store) fs_access = cwltool.stdfsaccess.StdFsAccess(options.basedir) - fill_in_defaults(t.tool["inputs"], job, fs_access) + fill_in_defaults( + tool.tool["inputs"], initialized_job_order, fs_access) def path_to_loc(obj): if "location" not in obj and "path" in obj: @@ -1134,9 +1159,9 @@ def import_files(tool): uploadFile, toil.importFile, fileindex, existing, skip_broken=True)) - t.visit(import_files) + tool.visit(import_files) - for inp in t.tool["inputs"]: + for inp in tool.tool["inputs"]: def set_secondary(fileobj): if isinstance(fileobj, Mapping) \ and fileobj.get("class") == "File": @@ -1150,11 +1175,12 @@ def set_secondary(fileobj): for entry in fileobj: set_secondary(entry) - if shortname(inp["id"]) in job and inp.get("secondaryFiles"): - set_secondary(job[shortname(inp["id"])]) + if shortname(inp["id"]) in initialized_job_order \ + and inp.get("secondaryFiles"): + set_secondary(initialized_job_order[shortname(inp["id"])]) - import_files(job) - visitSteps(t, import_files) + import_files(initialized_job_order) + visitSteps(tool, import_files) try: runtime_context.use_container = use_container @@ -1164,12 +1190,12 @@ def 
set_secondary(fileobj): runtime_context.job_script_provider = job_script_provider runtime_context.force_docker_pull = options.force_docker_pull runtime_context.no_match_user = options.no_match_user - (wf1, _) = makeJob(t, {}, None, runtime_context) + (wf1, _) = makeJob(tool, {}, None, runtime_context) except cwltool.process.UnsupportedRequirement as err: logging.error(err) return 33 - wf1.cwljob = job + wf1.cwljob = initialized_job_order outobj = toil.start(wf1) outobj = resolve_indirect(outobj) diff --git a/src/toil/leader.py b/src/toil/leader.py index 8fe5fe9266..ce545598d5 100644 --- a/src/toil/leader.py +++ b/src/toil/leader.py @@ -27,11 +27,6 @@ import time import os -try: - import cPickle as pickle -except ImportError: - import pickle - from toil.lib.humanize import bytes2human from toil import resolveEntryPoint try:
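
Note on the pattern this series adopts: the recurring change across these
patches is replacing cwltool's old **kwargs plumbing with explicit
LoadingContext/RuntimeContext objects, loading tools through a custom
factory hook, and executing them with SingleJobExecutor. Below is a minimal
standalone sketch of that pattern, assuming the 2018-era cwltool API that
these patches target and using only calls exercised above; the workflow
path "echo.cwl", the output directory, and the input object are
hypothetical placeholders, not part of the patches.

    import logging

    import cwltool.context
    import cwltool.executors
    import cwltool.load_tool
    import cwltool.resolver

    # Load-time configuration travels in a LoadingContext; a factory hook
    # such as toil_make_tool (above) can be installed via
    # construct_tool_object to substitute custom Process subclasses like
    # ToilCommandLineTool.
    loading_context = cwltool.context.LoadingContext()
    loading_context.resolver = cwltool.resolver.tool_resolver
    loading_context.strict = True

    # "echo.cwl" is a placeholder for a real CWL document on disk.
    tool = cwltool.load_tool.load_tool("echo.cwl", loading_context)

    # Run-time configuration travels in a RuntimeContext; per-job overrides
    # are made on a copy(), as CWLJob.run() does above.
    runtime_context = cwltool.context.RuntimeContext()
    runtime_context.outdir = "/tmp/cwl-out"  # placeholder output directory

    # Execute the loaded tool in-process, as CWLJob.run() does above.
    output, status = cwltool.executors.SingleJobExecutor().execute(
        tool, {"message": "hello"}, runtime_context,
        logging.getLogger("cwltool"))
    assert status == "success"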