diff --git a/.gitignore b/.gitignore index c834a3a4c..8491a1235 100644 --- a/.gitignore +++ b/.gitignore @@ -68,10 +68,12 @@ open_pipelines/ # Reserved files for comparison *RESERVE* +doc/ +build/ dist/ looper.egg-info/ loopercli.egg-info/ *ipynb_checkpoints* -hello_looper-master* \ No newline at end of file +hello_looper-master* diff --git a/.travis.yml b/.travis.yml index bcdd386a0..566f8749c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,7 +10,7 @@ install: - pip install . - pip install -r requirements/requirements-dev.txt - pip install -r requirements/requirements-test.txt -script: pytest +script: pytest --cov=looper branches: only: - dev diff --git a/README.md b/README.md index b90ffeb6b..cb7bb71dd 100644 --- a/README.md +++ b/README.md @@ -3,4 +3,4 @@ [![Build Status](https://travis-ci.org/pepkit/looper.svg?branch=master)](https://travis-ci.org/pepkit/looper) [![PEP compatible](http://pepkit.github.io/img/PEP-compatible-green.svg)](http://pepkit.github.io) -`Looper` is a pipeline submission engine. The typical use case is to run a bioinformatics pipeline across many different input samples. Instructions are in the [documentation](http://code.databio.org/looper/). +`Looper` is a pipeline submission engine. The typical use case is to run a bioinformatics pipeline across many different input samples. Instructions are in the [documentation](http://looper.databio.org/). diff --git a/docs/README.md b/docs/README.md index 60678ad7c..cb2d868d5 100644 --- a/docs/README.md +++ b/docs/README.md @@ -24,13 +24,13 @@ Releases are posted as [GitHub releases](https://github.com/pepkit/looper/releas ``` -pip install --user https://github.com/pepkit/looper/zipball/master +pip install --user loopercli ``` Update with: ``` -pip install --user --upgrade https://github.com/pepkit/looper/zipball/master +pip install --user --upgrade loopercli ``` If the `looper` executable in not automatically in your `$PATH`, add the following line to your `.bashrc` or `.profile`: diff --git a/docs/autodoc_build/.gitignore b/docs/autodoc_build/.gitignore new file mode 100644 index 000000000..d6b7ef32c --- /dev/null +++ b/docs/autodoc_build/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore diff --git a/docs/autodoc_build/api.md b/docs/autodoc_build/api.md deleted file mode 100644 index 6a1fb503b..000000000 --- a/docs/autodoc_build/api.md +++ /dev/null @@ -1,284 +0,0 @@ -# Package looper Documentation - -## Class Project -Looper-specific NGS Project. - -**Parameters:** - -- `config_file` -- `str`: path to configuration file with data fromwhich Project is to be built -- `subproject` -- `str`: name indicating subproject to use, optional - - -### constants -Return key-value pairs of pan-Sample constants for this Project. -```python -def constants(self) -``` - -**Returns:** - -`Mapping`: collection of KV pairs, each representing a pairingof attribute name and attribute value - - - - -### derived\_columns -Collection of sample attributes for which value of each is derived from elsewhere -```python -def derived_columns(self) -``` - -**Returns:** - -`list[str]`: sample attribute names for which value is derived - - - - -### implied\_columns -Collection of sample attributes for which value of each is implied by other(s) -```python -def implied_columns(self) -``` - -**Returns:** - -`list[str]`: sample attribute names for which value is implied by other(s) - - - - -### num\_samples -Count the number of samples available in this Project. 
-```python -def num_samples(self) -``` - -**Returns:** - -`int`: number of samples available in this Project. - - - - -### output\_dir -Directory in which to place results and submissions folders. - -By default, assume that the project's configuration file specifies -an output directory, and that this is therefore available within -the project metadata. If that assumption does not hold, though, -consider the folder in which the project configuration file lives -to be the project's output directory. -```python -def output_dir(self) -``` - -**Returns:** - -`str`: path to the project's output directory, either asspecified in the configuration file or the folder that contains the project's configuration file. - - - - -### project\_folders -Keys for paths to folders to ensure exist. -```python -def project_folders(self) -``` - - - - -### protocols -Determine this Project's unique protocol names. -```python -def protocols(self) -``` - -**Returns:** - -`Set[str]`: collection of this Project's unique protocol names - - - - -### required\_metadata -Which metadata attributes are required. -```python -def required_metadata(self) -``` - - - - -### sample\_names -Names of samples of which this Project is aware. -```python -def sample_names(self) -``` - - - - -### samples -Generic/base Sample instance for each of this Project's samples. -```python -def samples(self) -``` - -**Returns:** - -`Iterable[Sample]`: Sample instance for eachof this Project's samples - - - - -### sheet -Annotations/metadata sheet describing this Project's samples. -```python -def sheet(self) -``` - -**Returns:** - -`pandas.core.frame.DataFrame`: table of samples in this Project - - - - -### subproject -Return currently active subproject or None if none was activated -```python -def subproject(self) -``` - -**Returns:** - -`str`: currently active subproject - - - - -### templates\_folder -Path to folder with default submission templates. -```python -def templates_folder(self) -``` - -**Returns:** - -`str`: path to folder with default submission templates - - - - -## Class MissingMetadataException -Project needs certain metadata. - - -## Class MissingSampleSheetError -Represent case in which sample sheet is specified but nonexistent. - - -## Class PipelineInterface -This class parses, holds, and returns information for a yaml file that specifies how to interact with each individual pipeline. This includes both resources to request for cluster job submission, as well as arguments to be passed from the sample annotation metadata to the pipeline - -**Parameters:** - -- `config` -- `str | Mapping`: path to file from which to parseconfiguration data, or pre-parsed configuration data. - - -### pipe\_iface -Old-way access to pipeline key-to-interface mapping -```python -def pipe_iface(self) -``` - -**Returns:** - -`Mapping`: Binding between pipeline key and interface data - - - - -### pipeline\_names -Names of pipelines about which this interface is aware. -```python -def pipeline_names(self) -``` - -**Returns:** - -`Iterable[str]`: names of pipelines about which thisinterface is aware - - - - -### pipelines\_path -Path to pipelines folder. -```python -def pipelines_path(self) -``` - -**Returns:** - -`str | None`: Path to pipelines folder, if configured withfile rather than with raw mapping. - - - - -### protomap -Access protocol mapping portion of this composite interface. -```python -def protomap(self) -``` - -**Returns:** - -`Mapping`: binding between protocol name and pipeline key. 
- - - - -## Class SubmissionConductor -Collects and then submits pipeline jobs. - -This class holds a 'pool' of commands to submit as a single cluster job. -Eager to submit a job, each instance's collection of commands expands until -it reaches the 'pool' has been filled, and it's therefore time to submit the -job. The pool fills as soon as a fill criteria has been reached, which can -be either total input file size or the number of individual commands. - - -### failed\_samples -```python -def failed_samples(self) -``` - - - -### num\_cmd\_submissions -Return the number of commands that this conductor has submitted. -```python -def num_cmd_submissions(self) -``` - -**Returns:** - -`int`: Number of commands submitted so far. - - - - -### num\_job\_submissions -Return the number of jobs that this conductor has submitted. -```python -def num_job_submissions(self) -``` - -**Returns:** - -`int`: Number of jobs submitted so far. - - - diff --git a/docs/autodoc_build/looper.md b/docs/autodoc_build/looper.md index c63c92a11..f7fa62d28 100644 --- a/docs/autodoc_build/looper.md +++ b/docs/autodoc_build/looper.md @@ -262,6 +262,125 @@ Project needs certain metadata. Represent case in which sample sheet is specified but nonexistent. +## Class SubmissionConductor +Collects and then submits pipeline jobs. + +This class holds a 'pool' of commands to submit as a single cluster job. +Eager to submit a job, each instance's collection of commands expands until +it reaches the 'pool' has been filled, and it's therefore time to submit the +job. The pool fills as soon as a fill criteria has been reached, which can +be either total input file size or the number of individual commands. + + +### add\_sample +Add a sample for submission to this conductor. +```python +def add_sample(self, sample, sample_subtype=, rerun=False) +``` + +**Parameters:** + +- `sample` -- `Sample`: sample to be included with this conductor'scurrently growing collection of command submissions +- `sample_subtype` -- `type`: specific subtype associatedwith this new sample; this is used to tailor-make the sample instance as required by its protocol/pipeline and supported by the pipeline interface. +- `rerun` -- `bool`: whether the given sample is being rerun rather thanrun for the first time + + +**Returns:** + +`bool`: Indication of whether the given sample was added tothe current 'pool.' + + +**Raises:** + +- `TypeError`: If sample subtype is provided but does not extendthe base Sample class, raise a TypeError. + + + + +### failed\_samples +```python +def failed_samples(self) +``` + + + +### num\_cmd\_submissions +Return the number of commands that this conductor has submitted. +```python +def num_cmd_submissions(self) +``` + +**Returns:** + +`int`: Number of commands submitted so far. + + + + +### num\_job\_submissions +Return the number of jobs that this conductor has submitted. +```python +def num_job_submissions(self) +``` + +**Returns:** + +`int`: Number of jobs submitted so far. + + + + +### submit +Submit command(s) as a job. + +This call will submit the commands corresponding to the current pool +of samples if and only if the argument to 'force' evaluates to a +true value, or the pool of samples is full. +```python +def submit(self, force=False) +``` + +**Parameters:** + +- `force` -- `bool`: Whether submission should be done/simulated evenif this conductor's pool isn't full. + + +**Returns:** + +`bool`: Whether a job was submitted (or would've been ifnot for dry run) + + + + +### write\_script +Create the script for job submission. 
+```python +def write_script(self, pool, template_values, prj_argtext, looper_argtext) +``` + +**Parameters:** + +- `template_values` -- `Mapping`: Collection of template placeholderkeys and the values with which to replace them. +- `prj_argtext` -- `str`: Command text related to Project data. +- `looper_argtext` -- `str`: Command text related to looper arguments. + + +**Returns:** + +`str`: Path to the job submission script created. + + + + +### write\_skipped\_sample\_scripts +For any sample skipped during initial processing, write submission script. +```python +def write_skipped_sample_scripts(self) +``` + + + + ## Class PipelineInterface This class parses, holds, and returns information for a yaml file that specifies how to interact with each individual pipeline. This includes both resources to request for cluster job submission, as well as arguments to be passed from the sample annotation metadata to the pipeline @@ -531,124 +650,5 @@ def uses_looper_args(self, pipeline_name) -## Class SubmissionConductor -Collects and then submits pipeline jobs. - -This class holds a 'pool' of commands to submit as a single cluster job. -Eager to submit a job, each instance's collection of commands expands until -it reaches the 'pool' has been filled, and it's therefore time to submit the -job. The pool fills as soon as a fill criteria has been reached, which can -be either total input file size or the number of individual commands. - - -### add\_sample -Add a sample for submission to this conductor. -```python -def add_sample(self, sample, sample_subtype=, rerun=False) -``` - -**Parameters:** - -- `sample` -- `Sample`: sample to be included with this conductor'scurrently growing collection of command submissions -- `sample_subtype` -- `type`: specific subtype associatedwith this new sample; this is used to tailor-make the sample instance as required by its protocol/pipeline and supported by the pipeline interface. -- `rerun` -- `bool`: whether the given sample is being rerun rather thanrun for the first time - - -**Returns:** - -`bool`: Indication of whether the given sample was added tothe current 'pool.' - - -**Raises:** - -- `TypeError`: If sample subtype is provided but does not extendthe base Sample class, raise a TypeError. - - - - -### failed\_samples -```python -def failed_samples(self) -``` - - - -### num\_cmd\_submissions -Return the number of commands that this conductor has submitted. -```python -def num_cmd_submissions(self) -``` - -**Returns:** - -`int`: Number of commands submitted so far. - - - - -### num\_job\_submissions -Return the number of jobs that this conductor has submitted. -```python -def num_job_submissions(self) -``` - -**Returns:** - -`int`: Number of jobs submitted so far. - - - - -### submit -Submit command(s) as a job. - -This call will submit the commands corresponding to the current pool -of samples if and only if the argument to 'force' evaluates to a -true value, or the pool of samples is full. -```python -def submit(self, force=False) -``` - -**Parameters:** - -- `force` -- `bool`: Whether submission should be done/simulated evenif this conductor's pool isn't full. - - -**Returns:** - -`bool`: Whether a job was submitted (or would've been ifnot for dry run) - - - - -### write\_script -Create the script for job submission. -```python -def write_script(self, pool, template_values, prj_argtext, looper_argtext) -``` - -**Parameters:** - -- `template_values` -- `Mapping`: Collection of template placeholderkeys and the values with which to replace them. 
-- `prj_argtext` -- `str`: Command text related to Project data. -- `looper_argtext` -- `str`: Command text related to looper arguments. - - -**Returns:** - -`str`: Path to the job submission script created. - - - - -### write\_skipped\_sample\_scripts -For any sample skipped during initial processing, write submission script. -```python -def write_skipped_sample_scripts(self) -``` - - - - -**Version Information**: `looper` v0.11.0dev, generated by `lucidoc` v0.3 \ No newline at end of file +**Version Information**: `looper` v0.11.0, generated by `lucidoc` v0.3 \ No newline at end of file diff --git a/docs/changelog.md b/docs/changelog.md index a7d8c0e1b..b37e0993a 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -2,7 +2,18 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) and [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) format. -## [Unreleased] +## [0.12.0] -- 2019-05-02 + +### Added +- First implementation of pipeline interface 'outputs', so pipeline authors can specify items of interest produced by the pipeline. +- Functions and attributes on `Project` to support "outputs" (`interfaces`, `get_interfaces`, `get_outputs`) + +### Changed +- Start "compute" --> "compute_packges" transition +- `get_logger` moved to `peppy` + +### Fixed +- Prevent CLI option duplication in pipeline commands generated ## [0.11.1] - 2019-04-17 diff --git a/docs/features.md b/docs/features.md index be7184df3..1d576cdce 100644 --- a/docs/features.md +++ b/docs/features.md @@ -1,15 +1,15 @@ # Features and benefits -[cli]: ../img/cli.svg -[computing]: ../img/computing.svg -[flexible_pipelines]: ../img/flexible_pipelines.svg -[job_monitoring]: ../img/job_monitoring.svg -[resources]: ../img/resources.svg -[subprojects]: ../img/subprojects.svg -[collate]: ../img/collate.svg -[file_yaml]: ../img/file_yaml.svg -[html]: ../img/HTML.svg -[modular]: ../img/modular.svg +[cli]: img/cli.svg +[computing]: img/computing.svg +[flexible_pipelines]: img/flexible_pipelines.svg +[job_monitoring]: img/job_monitoring.svg +[resources]: img/resources.svg +[subprojects]: img/subprojects.svg +[collate]: img/collate.svg +[file_yaml]: img/file_yaml.svg +[html]: img/HTML.svg +[modular]: img/modular.svg ![modular][modular] **Modular approach to job handling** diff --git a/docs/pipeline-interface.md b/docs/pipeline-interface.md index 0457b3cfd..d445fc150 100644 --- a/docs/pipeline-interface.md +++ b/docs/pipeline-interface.md @@ -144,10 +144,11 @@ These are considered optional, and so the pipeline will still be submitted if th - `-C`: config_file (the pipeline config file specified in the project config file; or the default config file, if it exists) - `-P`: cores (the number of processing cores specified by the chosen resource package) - `-M`: mem (memory limit) -- `resources` (recommended) A section outlining how much memory, CPU, and clock time to request, modulated by input file size +- `resources` (recommended): A section outlining how much memory, CPU, and clock time to request, modulated by input file size If the `resources` section is missing, looper will only be able to run the pipeline locally (not submit it to a cluster resource manager). If you provide a `resources` section, you must define at least 1 option named 'default' with `file_size: "0"`. Then, you define as many more resource "packages" or "bundles" as you want. 
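Each additional resource "package" raises the `file_size` threshold at which it applies; in looper the actual selection is done by `PipelineInterface.choose_resource_package`, but a minimal stand-alone sketch of the idea (package names and values here are invented for illustration) looks like this:

```python
# Minimal sketch of resource-package selection by input size (GB).
# Not looper's real choose_resource_package; all names/values invented.
RESOURCES = {
    "default": {"file_size": "0",  "cores": "1", "mem": "8000",  "time": "0-04:00:00"},
    "midsize": {"file_size": "10", "cores": "4", "mem": "16000", "time": "0-12:00:00"},
    "huge":    {"file_size": "50", "cores": "8", "mem": "32000", "time": "2-00:00:00"},
}

def pick_package(resources, input_size_gb):
    """Return (name, settings) for the largest package whose threshold fits."""
    eligible = [(float(spec["file_size"]), name)
                for name, spec in resources.items()
                if float(spec["file_size"]) <= input_size_gb]
    if not eligible:
        raise ValueError("No package fits; a 'default' with file_size '0' "
                         "guarantees at least one match.")
    _, best = max(eligible)
    return best, resources[best]

print(pick_package(RESOURCES, 12.3))   # -> ('midsize', {...})
print(pick_package(RESOURCES, 0.2))    # -> ('default', {...})
```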
+- `outputs`: key-value pairs in which each key is a name for a kind of output file (or group of them) that a pipeline may produce, and the value is a template template for a path that will be populated by sample variables **More on `resources`** @@ -214,4 +215,7 @@ pipelines: cores: "4" mem: "8000" time: "08:00:00" + outputs: + smoothed_bw: "aligned_{sample.genome}/{sample.name}_smoothed.bw" + pre_smoothed_bw: "aligned_{project.prealignments}/{sample.name}_smoothed.bw" ``` diff --git a/docs_jupyter/build/hello-world.md b/docs_jupyter/build/hello-world.md index 4aadbc4b0..078ac64fe 100644 --- a/docs_jupyter/build/hello-world.md +++ b/docs_jupyter/build/hello-world.md @@ -1,4 +1,3 @@ -jupyter:True # Hello World! example for looper @@ -88,7 +87,7 @@ Traceback (most recent call last): File "/home/nsheff/.local/lib/python3.5/site-packages/looper/utils.py", line 104, in determine_config_path raise ValueError("Path doesn't exist: {}".format(root)) ValueError: Path doesn't exist: project/project_config.yaml - + ``` Voila! You've run your very first pipeline across multiple samples using `looper`! diff --git a/examples/microtest_sample_annotation.csv b/examples/microtest_annotation.csv similarity index 92% rename from examples/microtest_sample_annotation.csv rename to examples/microtest_annotation.csv index bc9b1b49e..379c51d53 100644 --- a/examples/microtest_sample_annotation.csv +++ b/examples/microtest_annotation.csv @@ -1,4 +1,4 @@ -sample_name,library,organism,ip,data_source +sample_name,protocol,organism,ip,data_source atac-seq_PE,ATAC-seq,human,,microtest atac-seq_SE,ATAC-seq,human,,microtest chip-seq_PE,CHIP-seq,human,H3K27ac,microtest diff --git a/examples/microtest_project_config.yaml b/examples/microtest_project_config.yaml index fbf66b586..bb4db47d4 100644 --- a/examples/microtest_project_config.yaml +++ b/examples/microtest_project_config.yaml @@ -3,8 +3,8 @@ metadata: results_subdir: results_pipeline submission_subdir: submission pipelines_dir: ${CODE}/pipelines - sample_annotation: microtest_sample_annotation.csv - merge_table: microtest_merge_table.csv + sample_annotation: microtest_annotation.csv + merge_table: microtest_subannotation.csv derived_columns: [data_source] @@ -18,7 +18,7 @@ subprojects: pipeline_config: wgbs.py: wgbs_ds.yaml -implied_columns: +implied_attributes: organism: human: genomes: hg19 diff --git a/examples/microtest_merge_table.csv b/examples/microtest_subannotation.csv similarity index 100% rename from examples/microtest_merge_table.csv rename to examples/microtest_subannotation.csv diff --git a/looper/__init__.py b/looper/__init__.py index 1b703da16..91046907b 100644 --- a/looper/__init__.py +++ b/looper/__init__.py @@ -15,6 +15,7 @@ from ._version import __version__ from .parser_types import * +from divvy import DEFAULT_COMPUTE_RESOURCES_NAME, NEW_COMPUTE_KEY as COMPUTE_KEY # Not used here, but make this the main import interface between peppy and # looper, so that other modules within this package need not worry about # the locations of some of the peppy declarations. Effectively, concentrate @@ -144,9 +145,17 @@ def add_subparser(cmd): "By default, pipelines will not be submitted if a sample name" " is duplicated, since samples names should be unique. " " Set this option to override this setting. 
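The `outputs` values are path templates populated from sample (and project) attributes, as in the `smoothed_bw` example above. Assuming ordinary `str.format`-style substitution, a stand-in illustration with an invented sample:

```python
# Illustration only: filling an `outputs` path template from sample attributes.
from types import SimpleNamespace

template = "aligned_{sample.genome}/{sample.name}_smoothed.bw"
sample = SimpleNamespace(name="K562_rep1", genome="hg38")  # invented stand-in sample

print(template.format(sample=sample))
# aligned_hg38/K562_rep1_smoothed.bw
```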
Default=False") - subparser.add_argument( - "--compute", dest="compute", default="default", + + comp_spec = subparser.add_mutually_exclusive_group() + comp_spec.add_argument( + "--compute", dest=COMPUTE_KEY, + default=DEFAULT_COMPUTE_RESOURCES_NAME, + help="YAML file with looper environment compute settings.") + comp_spec.add_argument( + "--compute-packages", dest=COMPUTE_KEY, + default=DEFAULT_COMPUTE_RESOURCES_NAME, help="YAML file with looper environment compute settings.") + subparser.add_argument( "--resources", help="Specification of individual computing resource settings; " @@ -162,12 +171,12 @@ def add_subparser(cmd): # distinguish between explicit 0 and lack of specification. subparser.add_argument( "--lump", default=None, - type=html_range(min_val=0, max_val=100, step=0.1, value=100), + type=html_range(min_val=0, max_val=100, step=0.1, value=0), help="Maximum total input file size for a lump/batch of commands " "in a single job (in GB)") subparser.add_argument( "--lumpn", default=None, - type=html_range(min_val=1, max_val="num_samples", value="num_samples"), + type=html_range(min_val=1, max_val="num_samples", value=1), help="Number of individual scripts grouped into single submission") # Other commands @@ -213,7 +222,7 @@ def add_subparser(cmd): subparser.add_argument_group("select samples", "This group of arguments lets you specify samples to use by " "exclusion OR inclusion of the samples attribute values.") - fetch_samples_group.add_argument("--selector-attribute", nargs=1, dest="selector_attribute", + fetch_samples_group.add_argument("--selector-attribute", dest="selector_attribute", help="Specify the attribute for samples exclusion OR inclusion", default="protocol") protocols = fetch_samples_group.add_mutually_exclusive_group() diff --git a/looper/_devtools.py b/looper/_devtools.py new file mode 100644 index 000000000..16cc2d81a --- /dev/null +++ b/looper/_devtools.py @@ -0,0 +1,25 @@ +""" Utility functions for internal, developmental use """ + +import copy +from logmuse import setup_logger + +__author__ = "Vince Reuter" +__email__ = "vreuter@virginia.edu" + +__all__ = ["est_log"] + + +def est_log(**kwargs): + """ + Establish logging, e.g. for an interactive session. + + :param dict kwargs: keyword arguments for logger setup. + :return logging.Logger: looper logger + """ + kwds = copy.copy(kwargs) + if "name" in kwds: + print("Ignoring {} and setting fixed values for logging names". 
+ format(kwds["name"])) + del kwds["name"] + setup_logger(name="peppy", **kwds) + return setup_logger(name="looper", **kwds) diff --git a/looper/_version.py b/looper/_version.py index 008a1d204..9d1ce461c 100644 --- a/looper/_version.py +++ b/looper/_version.py @@ -1,2 +1,2 @@ -__version__ = "0.11.1" +__version__ = "0.12.0" diff --git a/looper/conductor.py b/looper/conductor.py index 3fe37391b..517a596f4 100644 --- a/looper/conductor.py +++ b/looper/conductor.py @@ -88,7 +88,8 @@ def __init__(self, pipeline_key, pipeline_interface, cmd_base, prj, self.sample_subtype = sample_subtype or Sample self.compute_variables = compute_variables - self.extra_args_text = (extra_args and " ".join(extra_args)) or "" + self.extra_pipe_args = extra_args or [] + #self.extra_args_text = (extra_args and " ".join(extra_args)) or "" self.uses_looper_args = \ pipeline_interface.uses_looper_args(pipeline_key) self.ignore_flags = ignore_flags @@ -205,10 +206,9 @@ def add_sample(self, sample, sample_subtype=Sample, rerun=False): missing_reqs_msg = "{}: {}".format( missing_reqs_general, missing_reqs_specific) if self.prj.permissive: - _LOGGER.warning(missing_reqs_msg) + _LOGGER.warning("> Not submitted: %s", missing_reqs_msg) else: raise error_type(missing_reqs_msg) - _LOGGER.warning("> Not submitted: %s", missing_reqs_msg) use_this_sample and skip_reasons.append(missing_reqs_general) # Check if single_or_paired value is recognized. @@ -229,7 +229,7 @@ def add_sample(self, sample, sample_subtype=Sample, rerun=False): argstring = self.pl_iface.get_arg_string( pipeline_name=self.pl_key, sample=sample, submission_folder_path=self.prj.metadata[SUBMISSION_SUBDIR_KEY]) - except AttributeError as e: + except AttributeError: argstring = None # TODO: inform about which missing attribute. fail_message = "Required attribute missing " \ @@ -258,14 +258,24 @@ def add_sample(self, sample, sample_subtype=Sample, rerun=False): return skip_reasons def _get_settings_looptext_prjtext(self, size): + """ + Determine settings, looper argstring, and project argstring. + + :param int | float size: size of submission, used to select the proper + resource package from the pipeline interface + :return dict, str, str: collection of settings, looper argstring, and + project argstring + """ settings = self.pl_iface.choose_resource_package(self.pl_key, size) settings.update(self.compute_variables or {}) if self.uses_looper_args: settings.setdefault("cores", 1) - looper_argtext = create_looper_args_text(self.pl_key, settings, self.prj) + looper_argtext = \ + create_looper_args_text(self.pl_key, settings, self.prj) else: looper_argtext = "" - prj_argtext = self.prj.get_arg_string(self.pl_key) + prj_argtext = self.prj.get_arg_string( + self.pl_key, {x for x in self.extra_pipe_args if x.startswith("-")}) return settings, looper_argtext, prj_argtext def submit(self, force=False): @@ -287,14 +297,6 @@ def submit(self, force=False): submitted = False elif force or self._is_full(self._pool, self._curr_size): - _LOGGER.debug("Determining submission settings for %d sample " - "(%.2f Gb)", len(self._pool), self._curr_size) - settings, looper_argtext, prj_argtext = \ - self._get_settings_looptext_prjtext(self._curr_size) - assert all(map(lambda cmd_part: isinstance(cmd_part, str), - [self.cmd_base, prj_argtext, looper_argtext])), \ - "Each command component must be a string." 
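Keeping `extra_pipe_args` as a list, rather than pre-joining it into text, is what lets the project argstring builder skip options the user already supplied on the command line, i.e. the changelog's "prevent CLI option duplication" fix. A simplified, self-contained sketch of that idea, with invented option names (not the actual `Project.get_arg_string` code):

```python
# Simplified sketch of command assembly that avoids duplicating CLI options.
def build_command(base_cmd, project_opts, extra_cli_args):
    """Append project-derived options, skipping any flag the user already gave."""
    given_flags = {a for a in extra_cli_args if a.startswith("-")}
    parts = [base_cmd]
    for flag, value in project_opts.items():
        if flag in given_flags:
            continue  # the user-supplied value wins; don't emit the option twice
        parts.append("{} {}".format(flag, value))
    parts.extend(extra_cli_args)
    return " ".join(parts)

print(build_command(
    "pipeline.py --sample-name K562_rep1",
    {"--genome": "hg38", "--single-or-paired": "single"},
    ["--genome", "GRCh38"]))
# pipeline.py --sample-name K562_rep1 --single-or-paired single --genome GRCh38
```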
- # Ensure that each sample is individually represented on disk, # specific to subtype as applicable (should just be a single # subtype for each submission conductor, but some may just be @@ -313,8 +315,7 @@ def submit(self, force=False): subtype_name, s.name) s.to_yaml(subs_folder_path=self.prj.metadata[SUBMISSION_SUBDIR_KEY]) - script = self.write_script(self._pool, settings, - prj_argtext=prj_argtext, looper_argtext=looper_argtext) + script = self.write_script(self._pool, self._curr_size) self._num_total_job_submissions += 1 @@ -392,20 +393,23 @@ def _jobname(self, pool): name = "lump{}".format(self._num_total_job_submissions + 1) return "{}_{}".format(self.pl_key, name) - def write_script(self, pool, template_values, prj_argtext, looper_argtext): + def _cmd_text_extra(self, size): + _LOGGER.debug("Determining submission settings for pool of size %.2f Gb", size) + settings, ltext, ptext = self._get_settings_looptext_prjtext(size) + from_cli = " ".join(self.extra_pipe_args) if self.extra_pipe_args else "" + return settings, " ".join([t for t in [ptext, ltext, from_cli] if t]) + + def write_script(self, pool, size): """ Create the script for job submission. - :param Mapping template_values: Collection of template placeholder - keys and the values with which to replace them. - :param str prj_argtext: Command text related to Project data. - :param str looper_argtext: Command text related to looper arguments. + :param Iterable[(peppy.Sample, str)] pool: collection of pairs in which + first component is a sample instance and second is command/argstring + :param float size: cumulative size of the given pool :return str: Path to the job submission script created. """ - # Determine the command text for the project, looper, and extra args. - texts = [prj_argtext, looper_argtext, self.extra_args_text] - extra_parts_text = " ".join([t for t in texts if t]) + template_values, extra_parts_text = self._cmd_text_extra(size) def get_final_cmd(c): return "{} {}".format(c, extra_parts_text) if extra_parts_text else c @@ -431,11 +435,7 @@ def get_base_cmd(argstr): def write_skipped_sample_scripts(self): """ For any sample skipped during initial processing, write submission script. 
""" - scripts = [] - for pool, size in self._skipped_sample_pools: - settings, looptext, prjtext = self._get_settings_looptext_prjtext(size) - scripts.append(self.write_script(pool, settings, prjtext, looptext)) - return scripts + return [self.write_script(pool, size) for pool, size in self._skipped_sample_pools] def _reset_pool(self): """ Reset the state of the pool of samples """ diff --git a/looper/const.py b/looper/const.py index acf5ced17..69470e16e 100644 --- a/looper/const.py +++ b/looper/const.py @@ -4,14 +4,10 @@ __email__ = "vreuter@virginia.edu" -__all__ = ["RESULTS_SUBDIR_KEY", "SUBMISSION_SUBDIR_KEY", "TEMPLATES_DIRNAME", "APPEARANCE_BY_FLAG", - "NO_DATA_PLACEHOLDER"] +__all__ = ["APPEARANCE_BY_FLAG", "NO_DATA_PLACEHOLDER", "OUTKEY", + "PIPELINE_INTERFACES_KEY", "RESULTS_SUBDIR_KEY", + "SUBMISSION_SUBDIR_KEY", "TEMPLATES_DIRNAME"] - -RESULTS_SUBDIR_KEY = "results_subdir" -SUBMISSION_SUBDIR_KEY = "submission_subdir" -TEMPLATES_DIRNAME = "jinja_templates" -NO_DATA_PLACEHOLDER = "NA" APPEARANCE_BY_FLAG = { "completed": { "button_class": "table-success", @@ -33,4 +29,10 @@ "button_class": "table-info", "flag": "Waiting" } -} \ No newline at end of file +} +NO_DATA_PLACEHOLDER = "NA" +PIPELINE_INTERFACES_KEY = "pipeline_interfaces" +OUTKEY = "outputs" +RESULTS_SUBDIR_KEY = "results_subdir" +SUBMISSION_SUBDIR_KEY = "submission_subdir" +TEMPLATES_DIRNAME = "jinja_templates" diff --git a/looper/exceptions.py b/looper/exceptions.py index b480ada95..bb3fd0c4e 100644 --- a/looper/exceptions.py +++ b/looper/exceptions.py @@ -1,10 +1,5 @@ """ Exceptions for specific looper issues. """ - -__author__ = "Vince Reuter" -__email__ = "vreuter@virginia.edu" - - from abc import ABCMeta import sys if sys.version_info < (3, 3): @@ -12,9 +7,13 @@ else: from collections.abc import Iterable +__author__ = "Vince Reuter" +__email__ = "vreuter@virginia.edu" -_all__ = ["InvalidResourceSpecificationException", "JobSubmissionException", - "LooperError", "MissingPipelineConfigurationException", +_all__ = ["DuplicatePipelineKeyException", + "InvalidResourceSpecificationException", + "JobSubmissionException", "LooperError", + "MissingPipelineConfigurationException", "PipelineInterfaceConfigError"] @@ -23,6 +22,12 @@ class LooperError(Exception): __metaclass__ = ABCMeta +class DuplicatePipelineKeyException(LooperError): + """ Duplication of pipeline identifier precludes unique pipeline ref. """ + def __init__(self, key): + super(DuplicatePipelineKeyException, self).__init__(key) + + class InvalidResourceSpecificationException(LooperError): """ Pipeline interface resources--if present--needs default. """ def __init__(self, reason): diff --git a/looper/html_reports.py b/looper/html_reports.py index 6f9441d13..2a9326eae 100644 --- a/looper/html_reports.py +++ b/looper/html_reports.py @@ -492,7 +492,7 @@ def create_project_objects(self): figures = [] links = [] warnings = [] - ifaces = self.prj.interfaces_by_protocol[protocol] + ifaces = self.prj.get_interfaces(protocol) # Check the interface files for summarizers for iface in ifaces: diff --git a/looper/jinja_templates/project_object.html b/looper/jinja_templates/project_object.html index 793641c1c..c618d29f7 100644 --- a/looper/jinja_templates/project_object.html +++ b/looper/jinja_templates/project_object.html @@ -1,7 +1,7 @@
 {% if links[0] is defined or figures[0] is defined %}
     Looper project objects
 {% endif %}
 {% if figures[0] is defined %}
     Figures
@@ -9,11 +9,11 @@
     Figures
 {% for figure in figures %}
-    '{{ figure[0] }}'
+    '{{ figure[1] }}'
{% endfor %} diff --git a/looper/looper.py b/looper/looper.py index 7d599e0d8..a72925e13 100755 --- a/looper/looper.py +++ b/looper/looper.py @@ -29,11 +29,13 @@ from .const import * from .exceptions import JobSubmissionException from .html_reports import HTMLReportBuilder +from .pipeline_interface import RESOURCES_KEY from .project import Project from .utils import determine_config_path, fetch_flag_files, sample_folder +from divvy import DEFAULT_COMPUTE_RESOURCES_NAME, NEW_COMPUTE_KEY as COMPUTE_KEY from logmuse import setup_logger -from peppy import ProjectContext, SAMPLE_EXECUTION_TOGGLE +from peppy import ProjectContext, METADATA_KEY, SAMPLE_EXECUTION_TOGGLE SUBMISSION_FAILURE_MESSAGE = "Cluster resource failure" @@ -240,17 +242,17 @@ def process_protocols(prj, protocols, resource_setting_kwargs=None, **kwargs): resource_setting_kwargs = {} try: - comp_vars = prj.dcc.compute.to_map() + comp_vars = prj.dcc[COMPUTE_KEY].to_map() except AttributeError: - if not isinstance(prj.dcc.compute, Mapping): + if not isinstance(prj.dcc[COMPUTE_KEY], Mapping): raise TypeError("Project's computing config isn't a mapping: {} ({})". - format(prj.dcc.compute, type(prj.dcc.compute))) + format(prj.dcc[COMPUTE_KEY], type(prj.dcc[COMPUTE_KEY]))) from copy import deepcopy - comp_vars = deepcopy(prj.dcc.compute) + comp_vars = deepcopy(prj.dcc[COMPUTE_KEY]) comp_vars.update(resource_setting_kwargs or {}) _LOGGER.info("Known protocols: {}".format( - ", ".join(prj.interfaces_by_protocol.keys()))) + ", ".join(prj.interfaces.protocols))) for proto in set(protocols) | {GENERIC_PROTOCOL_KEY}: _LOGGER.debug("Determining sample type, script, and flags for " @@ -287,8 +289,8 @@ def __call__(self, args, remaining_args, rerun=False, **compute_kwargs): run for the first time """ - if not self.prj.interfaces_by_protocol: - pipe_locs = getattr(self.prj.metadata, "pipeline_interfaces", []) + if not self.prj.interfaces: + pipe_locs = getattr(self.prj[METADATA_KEY], PIPELINE_INTERFACES_KEY, []) # TODO: should these cases be handled as equally exceptional? # That is, should they either both raise errors, or both log errors? if len(pipe_locs) == 0: @@ -475,7 +477,7 @@ def __init__(self, prj): def __call__(self): """ Do the summarization. """ - _run_custom_summarizers(self.prj) + run_custom_summarizers(self.prj) # initialize the report builder report_builder = HTMLReportBuilder(self.prj) # run the report builder. 
a set of HTML pages is produced @@ -483,7 +485,7 @@ def __call__(self): _LOGGER.info("HTML Report (n=" + str(len(self.stats)) + "): " + report_path) -def _run_custom_summarizers(project): +def run_custom_summarizers(project): """ Run custom summarizers if any are defined @@ -494,7 +496,7 @@ def _run_custom_summarizers(project): for protocol in set(all_protocols): try: - ifaces = project.interfaces_by_protocol[protocol] + ifaces = project.get_interfaces(protocol) except KeyError: _LOGGER.warning("No interface for protocol '{}', skipping summary".format(protocol)) continue @@ -804,23 +806,25 @@ def main(): determine_config_path(args.config_file), subproject=args.subproject, file_checks=args.file_checks, compute_env_file=getattr(args, 'env', None)) except yaml.parser.ParserError as e: - print("Project config parse failed -- {}".format(e)) + _LOGGER.error("Project config parse failed -- {}".format(e)) sys.exit(1) - if hasattr(args, "compute"): - # Default is already loaded - if args.compute != "default": - prj.dcc.activate_package(args.compute) + compute_cli_spec = getattr(args, COMPUTE_KEY, None) + if compute_cli_spec and compute_cli_spec != DEFAULT_COMPUTE_RESOURCES_NAME: + prj.dcc.activate_package(compute_cli_spec) _LOGGER.debug("Results subdir: " + prj.metadata[RESULTS_SUBDIR_KEY]) - with ProjectContext(prj, selector_attribute=args.selector_attribute, selector_include=args.selector_include, + with ProjectContext(prj, + selector_attribute=args.selector_attribute, + selector_include=args.selector_include, selector_exclude=args.selector_exclude) as prj: if args.command in ["run", "rerun"]: run = Runner(prj) try: - compute_kwargs = _proc_resources_spec(getattr(args, "resources", "")) + compute_kwargs = _proc_resources_spec( + getattr(args, RESOURCES_KEY, "")) run(args, remaining_args, rerun=(args.command == "rerun"), **compute_kwargs) except IOError: diff --git a/looper/pipeline_interface.py b/looper/pipeline_interface.py index 4018f6c58..dabf5f667 100644 --- a/looper/pipeline_interface.py +++ b/looper/pipeline_interface.py @@ -17,8 +17,8 @@ MissingPipelineConfigurationException, PipelineInterfaceConfigError from .utils import get_logger from attmap import PathExAttMap +from divvy import DEFAULT_COMPUTE_RESOURCES_NAME, NEW_COMPUTE_KEY as COMPUTE_KEY from peppy import utils, Sample -from divvy import DEFAULT_COMPUTE_RESOURCES_NAME from peppy.utils import is_command_callable @@ -27,6 +27,7 @@ PL_KEY = "pipelines" PROTOMAP_KEY = "protocol_mapping" +RESOURCES_KEY = "resources" SUBTYPE_MAPPING_SECTION = "sample_subtypes" @@ -54,8 +55,15 @@ def __init__(self, config): _LOGGER.debug("Parsing '%s' for %s config data", config, self.__class__.__name__) self.pipe_iface_file = config - with open(config, 'r') as f: - config = yaml.load(f, SafeLoader) + try: + with open(config, 'r') as f: + config = yaml.load(f, SafeLoader) + except yaml.parser.ParserError: + with open(config, 'r') as f: + _LOGGER.error( + "Failed to parse YAML from {}:\n{}". + format(config, "".join(f.readlines()))) + raise self.source = config # Check presence of 2 main sections (protocol mapping and pipelines). 
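The new guard around reading the interface file logs the file's text before re-raising when YAML parsing fails. A self-contained approximation of that pattern (catching the broader `yaml.YAMLError` and using a hypothetical path; logging setup simplified):

```python
# Approximation of the error reporting added around the interface file read.
import logging
import yaml

_LOGGER = logging.getLogger(__name__)

def load_iface(path):
    """Parse a pipeline interface YAML file, showing its text if parsing fails."""
    try:
        with open(path) as f:
            return yaml.safe_load(f)
    except yaml.YAMLError:
        with open(path) as f:
            _LOGGER.error("Failed to parse YAML from %s:\n%s", path, f.read())
        raise

# config = load_iface("pipeline_interface.yaml")   # hypothetical path
```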
@@ -106,28 +114,26 @@ def notify(msg): pl = self.select_pipeline(pipeline_name) - compute_key = "compute" universal_compute = {} try: - universal_compute = pl[compute_key] + universal_compute = pl[COMPUTE_KEY] except KeyError: notify("No compute settings") - res_key = "resources" try: - resources = universal_compute[res_key] + resources = universal_compute[RESOURCES_KEY] except KeyError: try: - resources = pl[res_key] + resources = pl[RESOURCES_KEY] except KeyError: notify("No resources") return {} else: - if res_key in pl: + if RESOURCES_KEY in pl: _LOGGER.warning( "{rk} section found in both {c} section and top-level " "pipelines section of pipeline interface; {c} section " - "version will be used".format(rk=res_key, c=compute_key)) + "version will be used".format(rk=RESOURCES_KEY, c=COMPUTE_KEY)) # Require default resource package specification. try: @@ -553,7 +559,6 @@ def expand_pl_paths(piface): return piface - def standardize_protocols(piface): """ Handle casing and punctuation of protocol keys in pipeline interface. @@ -670,7 +675,6 @@ def class_names(cs): class_names(proper_subtypes))) - def _fetch_classes(mod): """ Return the classes defined in a module. """ try: @@ -681,7 +685,6 @@ def _fetch_classes(mod): return list(classes) - def _proper_subtypes(types, supertype): """ Determine the proper subtypes of a supertype. """ return list(filter( diff --git a/looper/project.py b/looper/project.py index f6953ecac..0e10a72b2 100644 --- a/looper/project.py +++ b/looper/project.py @@ -1,20 +1,26 @@ """ Looper version of NGS project model. """ -from collections import defaultdict, namedtuple +from collections import namedtuple +import copy from functools import partial import itertools import os import peppy +from peppy import METADATA_KEY, OUTDIR_KEY from peppy.utils import is_command_callable from .const import * -from .pipeline_interface import PipelineInterface +from .exceptions import DuplicatePipelineKeyException +from .pipeline_interface import PROTOMAP_KEY +from .project_piface_group import ProjectPifaceGroup from .utils import get_logger, partition __author__ = "Vince Reuter" __email__ = "vreuter@virginia.edu" +__all__ = ["Project", "process_pipeline_interfaces"] + _LOGGER = get_logger(__name__) @@ -32,18 +38,28 @@ def __init__(self, config_file, subproject=None, **kwargs): config_file, subproject=subproject, no_environment_exception=RuntimeError, no_compute_exception=RuntimeError, **kwargs) - self.interfaces_by_protocol = \ - process_pipeline_interfaces(self.metadata.pipeline_interfaces) + self._interfaces = process_pipeline_interfaces( + self[METADATA_KEY][PIPELINE_INTERFACES_KEY]) + + @property + def interfaces(self): + """ + Get this Project's collection of pipeline interfaces + + :return Iterable[looper.PipelineInterface]: collection of pipeline + interfaces known by this Project + """ + return copy.deepcopy(self._interfaces) @property def project_folders(self): """ Keys for paths to folders to ensure exist. """ - return ["output_dir", RESULTS_SUBDIR_KEY, SUBMISSION_SUBDIR_KEY] + return [OUTDIR_KEY, RESULTS_SUBDIR_KEY, SUBMISSION_SUBDIR_KEY] @property def required_metadata(self): """ Which metadata attributes are required. """ - return ["output_dir"] + return [OUTDIR_KEY] def build_submission_bundles(self, protocol, priority=True): """ @@ -77,8 +93,7 @@ def build_submission_bundles(self, protocol, priority=True): # sort of pool of information about possible ways in which to submit # pipeline(s) for sample(s) of the indicated protocol. 
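With these key constants in place, `choose_resource_package` resolves `resources` with a preference for the block nested under the pipeline's `compute` section, warning when a top-level `resources` block is also present. A small sketch of just that precedence, with invented settings:

```python
# Sketch of the lookup precedence only; warning text and data are illustrative.
import warnings

def find_resources(pipeline_section, compute_key="compute", resources_key="resources"):
    """Prefer resources nested under 'compute'; fall back to the top level."""
    nested = pipeline_section.get(compute_key, {}).get(resources_key)
    toplevel = pipeline_section.get(resources_key)
    if nested is not None:
        if toplevel is not None:
            warnings.warn("'{r}' found under '{c}' and at top level; using the "
                          "'{c}' version".format(r=resources_key, c=compute_key))
        return nested
    return toplevel or {}

pipe = {"compute": {"resources": {"default": {"file_size": "0", "cores": "4"}}},
        "resources": {"default": {"file_size": "0", "cores": "1"}}}
print(find_resources(pipe))   # the nested ('compute') version wins, with a warning
```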
try: - pipeline_interfaces = \ - self.interfaces_by_protocol[protocol] + pipeline_interfaces = self.get_interfaces(protocol) except KeyError: # Messaging can be done by the caller. _LOGGER.debug("No interface for protocol: %s", protocol) @@ -201,6 +216,112 @@ def build_submission_bundles(self, protocol, priority=True): else: return list(itertools.chain(*job_submission_bundles)) + def get_interfaces(self, protocol): + """ + Get the pipeline interfaces associated with the given protocol. + + :param str protocol: name of the protocol for which to get interfaces + :return Iterable[looper.PipelineInterface]: collection of pipeline + interfaces associated with the given protocol + :raise KeyError: if the given protocol is not (perhaps yet) mapped + to any pipeline interface + """ + return self.interfaces[protocol] + + def get_outputs(self, skip_sample_less=True): + """ + Map pipeline identifier to collection of output specifications. + + This method leverages knowledge of two collections of different kinds + of entities that meet in the manifestation of a Project. The first + is a collection of samples, which is known even in peppy.Project. The + second is a mapping from protocol/assay/library strategy to a collection + of pipeline interfaces, in which kinds of output may be declared. + + Knowledge of these two items is here harnessed to map the identifier + for each pipeline about which this Project is aware to a collection of + pairs of identifier for a kind of output and the collection of + this Project's samples for which it's applicable (i.e., those samples + with protocol that maps to the corresponding pipeline). + + :param bool skip_sample_less: whether to omit pipelines that are for + protocols of which the Project has no Sample instances + :return Mapping[str, Mapping[str, namedtuple]]: collection of bindings + between identifier for pipeline and collection of bindings between + name for a kind of output and pair in which first component is a + path template and the second component is a collection of + sample names + :raise TypeError: if argument to sample-less pipeline skipping parameter + is not a Boolean + """ + if not isinstance(skip_sample_less, bool): + raise TypeError( + "Non-Boolean argument to sample-less skip flag: {} ({})". + format(skip_sample_less, type(skip_sample_less))) + prots_data_pairs = _gather_ifaces(self.interfaces) + m = {} + for name, (prots, data) in prots_data_pairs.items(): + try: + outs = data[OUTKEY] + except KeyError: + _LOGGER.debug("No {} declared for pipeline: {}". + format(OUTKEY, name)) + continue + snames = [s.name for s in self.samples if s.protocol in prots] + if not snames and skip_sample_less: + _LOGGER.debug("No samples matching protocol(s): {}". + format(", ".join(prots))) + continue + m[name] = {path_key: (path_val, snames) + for path_key, path_val in outs.items()} + return m + + def _omit_from_repr(self, k, cls): + """ + Exclude the interfaces from representation. + + :param str k: key of item to consider for omission + :param type cls: placeholder to comply with superclass signature + """ + return super(Project, self)._omit_from_repr(k, cls) or k == "interfaces" + + +def _gather_ifaces(ifaces): + """ + For each pipeline map identifier to protocols and interface data. 
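Concretely, `get_outputs()` yields a mapping of pipeline name to `{output key: (path template, sample names)}`. An invented result and a traversal of it, for orientation only:

```python
# Invented example of the structure documented for Project.get_outputs().
outputs = {
    "wgbs": {
        "smooth_bw": ("aligned_{sample.genome}/{sample.name}_smoothed.bw",
                      ["wgbs_1", "wgbs_2"]),
        "unalign":   ("u_{sample.name}.txt", ["wgbs_1", "wgbs_2"]),
    }
}

for pipeline, outs in outputs.items():
    for kind, (template, sample_names) in outs.items():
        print("{}: '{}' -> {} ({} samples)".format(
            pipeline, kind, template, len(sample_names)))
```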
+ + :param Iterable[looper.PipelineInterface] ifaces: collection of pipeline + interface objects + :return Mapping[str, (set[str], attmap.AttMap)]: collection of bindings + between pipeline identifier and pair in which first component is + collection of associated protocol names, and second component is a + collection of interface data for pipeline identified by the key + :raise looper.DuplicatePipelineKeyException: if the same identifier (key or + name) points to collections of pipeline interface data (for a + particular pipeline) that are not equivalent + """ + specs = {} + for pi in ifaces: + protos_by_name = {} + for p, names in pi[PROTOMAP_KEY].items(): + if isinstance(names, str): + names = [names] + for n in names: + protos_by_name.setdefault(n, set()).add(p) + for k, dat in pi.iterpipes(): + name = dat.get("name") or k + try: + old_prots, old_dat = specs[name] + except KeyError: + old_prots = set() + else: + if dat != old_dat: + raise DuplicatePipelineKeyException(name) + new_prots = protos_by_name.get(name, set()) | \ + protos_by_name.get(k, set()) + specs[name] = (old_prots | new_prots, dat) + return specs + def process_pipeline_interfaces(pipeline_interface_locations): """ @@ -213,17 +334,22 @@ def process_pipeline_interfaces(pipeline_interface_locations): :return Mapping[str, Iterable[PipelineInterface]]: mapping from protocol name to interface(s) for which that protocol is mapped """ - interface_by_protocol = defaultdict(list) - for pipe_iface_location in pipeline_interface_locations: - if not os.path.exists(pipe_iface_location): - _LOGGER.warning("Ignoring nonexistent pipeline interface " - "location: '%s'", pipe_iface_location) + iface_group = ProjectPifaceGroup() + for loc in pipeline_interface_locations: + if not os.path.exists(loc): + _LOGGER.warning("Ignoring nonexistent pipeline interface location: " + "{}".format(loc)) continue - pipe_iface = PipelineInterface(pipe_iface_location) - for proto_name in pipe_iface.protocol_mapping: - _LOGGER.whisper("Adding protocol name: '%s'", proto_name) - interface_by_protocol[proto_name].append(pipe_iface) - return interface_by_protocol + fs = [loc] if os.path.isfile(loc) else \ + [os.path.join(loc, f) for f in os.listdir(loc) + if os.path.splitext(f)[1] in [".yaml", ".yml"]] + for f in fs: + _LOGGER.debug("Processing interface definition: {}".format(f)) + iface_group.update(f) + return iface_group + + +OutputGroup = namedtuple("OutputGroup", field_names=["path", "samples"]) # Collect PipelineInterface, Sample type, pipeline path, and script with flags. @@ -234,5 +360,5 @@ def process_pipeline_interfaces(pipeline_interface_locations): def _is_member(item, items): - """ Determine whether an iterm is a member of a collection. """ + """ Determine whether an item is a member of a collection. """ return item in items diff --git a/looper/project_piface_group.py b/looper/project_piface_group.py new file mode 100644 index 000000000..e3bc90212 --- /dev/null +++ b/looper/project_piface_group.py @@ -0,0 +1,116 @@ +""" Group of Project's PipelineInterface instances """ + +import sys +if sys.version_info < (3, 3): + from collections import Mapping +else: + from collections.abc import Mapping +from .pipeline_interface import PipelineInterface, PROTOMAP_KEY +from .utils import get_logger + +__author__ = "Vince Reuter" +__email__ = "vreuter@virginia.edu" + + +_LOGGER = get_logger(__name__) + + +class ProjectPifaceGroup(object): + """ Collection of PipelineInterface instances and lookup-by-protocol. 
""" + + def __init__(self, piface=None): + """ + Create the group, either empty or with initial data. + + :param str | Mapping | looper.PipelineInterface piface: either pipeline + interface file, pipeline interface, or interface-defining mapping + """ + self._interfaces = [] + self._indices_by_protocol = {} + piface and self.update(piface) + + def __eq__(self, other): + """ + Instances are equivalent iff interfaces and protocol mappings are. + + :param looper.project_piface_group.ProjectPifaceGroup other: the group + to compare to this one + :return bool: whether this group is equivalent to the compared one + """ + return isinstance(other, ProjectPifaceGroup) and \ + self._interfaces == other._interfaces and \ + self._indices_by_protocol == other._indices_by_protocol + + def __ne__(self, other): + """ Leverage the overridden equivalence operator. """ + return not self == other + + def __getitem__(self, item): + """ + Retrieve interfaces for given protocol name. + + :param str item: name of protocol for which to fetch interfaces. + :return Iterable[looper.PipelineInterface]: + """ + return [self._interfaces[i] for i in self._indices_by_protocol[item]] + + def __iter__(self): + """ + Iteration is over the interfaces. + + :return Iterable[looper.PipelineInterface]: iterator over this group's + PipelineInterface instances + """ + return iter(self._interfaces) + + def __len__(self): + """ + Group size is the number of interfaces. + + :return int: number of interfaces in this group + """ + return sum(1 for _ in iter(self)) + + @property + def protocols(self): + """ + Get the collection of names of protocols mapping into this group. + + :return list[str]: collection of protocol names that map to at least + one pipeline represented by an interface in this group + """ + return [p for p in self._indices_by_protocol] + + def update(self, piface): + """ + Add a pipeline interface to this group. + + :param str | Mapping | looper.PipelineInterface piface: either pipeline + interface file, pipeline interface, or interface-defining mapping + :return looper.project_piface_group.ProjectPifaceGroup: updated instance + :raise TypeError: if the argument to the piface parameter is neither + text (filepath) nor a PipelineInterface or Mapping; additional + exception cases may arise from ensuing attempt to create a + PipelineInterface from the argument if the argument itself is not + already a PipelineInterface. 
+ """ + if isinstance(piface, (str, Mapping)): + piface = PipelineInterface(piface) + elif not isinstance(piface, PipelineInterface): + raise TypeError( + "Update value must be {obj}-defining filepath or {obj} itself; " + "got {argtype}".format( + obj=PipelineInterface.__name__, argtype=type(piface))) + assert isinstance(piface, PipelineInterface) + for curr in self._interfaces: + if curr == piface: + _LOGGER.whisper( + "Found match existing {} match: {}".format( + PipelineInterface.__class__.__name__, piface)) + break + else: + self._interfaces.append(piface) + i = len(self._interfaces) - 1 + for p in piface[PROTOMAP_KEY]: + self._indices_by_protocol.setdefault(p, []).append(i) + return self diff --git a/looper/utils.py b/looper/utils.py index cfaf8501e..437f85e39 100644 --- a/looper/utils.py +++ b/looper/utils.py @@ -3,11 +3,11 @@ from collections import defaultdict, Iterable import copy import glob -import logging import os from peppy import \ FLAGS, SAMPLE_INDEPENDENT_PROJECT_SECTIONS, SAMPLE_NAME_COLNAME +from peppy.utils import get_logger from .const import * @@ -15,18 +15,6 @@ DEFAULT_CONFIG_SUFFIX = "_config.yaml" -def get_logger(name): - """ - Returm a logger with given name, equipped with custom method. - - :param str name: name for the logger to get/create. - :return logging.Logger: named, custom logger instance. - """ - l = logging.getLogger(name) - l.whisper = lambda msg, *args, **kwargs: l.log(5, msg, *args, **kwargs) - return l - - _LOGGER = get_logger(__name__) diff --git a/mkdocs.yml b/mkdocs.yml index 777230aa1..f3e89817c 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -1,7 +1,8 @@ site_name: Looper site_logo: img/looper_logo_dark.svg -site_url: http://code.databio.org/looper/ +site_url: http://looper.databio.org repo_url: http://github.com/pepkit/looper +pypi_name: loopercli nav: - Getting Started: @@ -21,6 +22,7 @@ nav: - Reference: - Usage: usage.md - Configuration files: config-files.md + - API: autodoc_build/looper.md - FAQ: faq.md - Support: support.md - Contributing: contributing.md @@ -29,7 +31,10 @@ nav: theme: databio plugins: - - databio + - databio: + autodoc_build: "docs/autodoc_build" + jupyter_source: "docs_jupyter" + jupyter_build: "docs_jupyter/build" + autodoc_package: "looper" + no_top_level: true - search - - diff --git a/requirements/requirements-all.txt b/requirements/requirements-all.txt index b144de7cd..a893ddfdc 100644 --- a/requirements/requirements-all.txt +++ b/requirements/requirements-all.txt @@ -1,8 +1,8 @@ -attmap>=0.6 +attmap>=0.7 colorama>=0.3.9 logmuse>=0.0.2 pandas>=0.20.2 pyyaml>=3.12 divvy>=0.3.1 -peppy>=0.20 - +peppy>=0.21.0 +ubiquerg>=0.0.3 diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 2e300d8dd..72dcff1ac 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -1,2 +1,4 @@ -mock==2.0.0 -pytest==3.0.7 +jinja2 +mock>=2.0.0 +pytest>=3.0.7 +ubiquerg>=0.0.3 diff --git a/requirements/requirements-doc.txt b/requirements/requirements-doc.txt index 9c5fe172d..492fbb486 100644 --- a/requirements/requirements-doc.txt +++ b/requirements/requirements-doc.txt @@ -2,4 +2,4 @@ mkdocs>=1.0 markdown-include pydoc-markdown https://github.com/databio/mkdocs-databio/archive/master.zip -https://github.com/pepkit/looper/archive/master.zip +loopercli diff --git a/requirements/requirements-test.txt b/requirements/requirements-test.txt index 606b3da48..7f4458baa 100644 --- a/requirements/requirements-test.txt +++ b/requirements/requirements-test.txt @@ -1,2 +1,3 
@@ coveralls==1.1 pytest-cov==2.4.0 +pytest-remotedata diff --git a/tests/conftest.py b/tests/conftest.py index 07c4caf9a..b61aeb8b3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -136,7 +136,7 @@ "testngs.sh": FILE_BY_SAMPLE } -SAMPLE_ANNOTATION_LINES = """sample_name,library,file,file2,organism,nonmerged_col,data_source,dcol2 +SAMPLE_ANNOTATION_LINES = """sample_name,protocol,file,file2,organism,nonmerged_col,data_source,dcol2 a,testlib,src3,src3,,src3,src3, b,testlib,,,,src3,src3,src1 c,testlib,src3,src3,,src3,src3, @@ -182,11 +182,10 @@ } COMPARISON_FUNCTIONS = ["__eq__", "__ne__", "__len__", "keys", "values", "items"] -COLUMNS = [SAMPLE_NAME_COLNAME, "val1", "val2", "library"] +COLUMNS = [SAMPLE_NAME_COLNAME, "val1", "val2", "protocol"] PROJECT_CONFIG_DATA = {"metadata": {SAMPLE_ANNOTATIONS_KEY: "annotations.csv"}} - def update_project_conf_data(extension): """ Updated Project configuration data mapping based on file extension """ updated = copy.deepcopy(PROJECT_CONFIG_DATA) @@ -196,7 +195,6 @@ def update_project_conf_data(extension): return updated - def pytest_addoption(parser): """ Facilitate command-line test behavior adjustment. """ parser.addoption("--logging-level", @@ -204,7 +202,6 @@ def pytest_addoption(parser): help="Project root logger level to use for tests") - def pytest_generate_tests(metafunc): """ Centralize dynamic test case parameterization. """ if "empty_collection" in metafunc.fixturenames: diff --git a/tests/helpers.py b/tests/helpers.py index 51a2bb87f..2bb49a110 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -1,7 +1,8 @@ """ Test utilities. """ from functools import partial -import itertools +import random +import string import numpy as np import pytest @@ -10,6 +11,9 @@ __email__ = "vreuter@virginia.edu" +LETTERS_AND_DIGITS = string.ascii_letters + string.digits + + def assert_entirely_equal(observed, expected): """ Accommodate equality assertion for varied data, including NaN. """ try: @@ -35,25 +39,27 @@ def named_param(argnames, argvalues): ids=lambda arg: "{}={}".format(argnames, arg))) -def powerset(items, min_items=0, include_full_pop=True): +def randstr(pool, size): """ - Build the powerset of a collection of items. - - :param Iterable[object] items: "Pool" of all items, the population for - which to build the power set. - :param int min_items: Minimum number of individuals from the population - to allow in any given subset. - :param bool include_full_pop: Whether to include the full population in - the powerset (default True to accord with genuine definition) - :return list[object]: Sequence of subsets of the population, in - nondecreasing size order + Generate random string of given size/length. + + :param Iterable[str] pool: collection of characters from which to sample + (with replacement) + :param int size: nunber of characters + :return str: string built by concatenating randomly sampled characters + :raise ValueError: if size is not a positive integer """ - items = list(items) # Account for iterable burn possibility. - max_items = len(items) + 1 if include_full_pop else len(items) - min_items = min_items or 0 - return list(itertools.chain.from_iterable( - itertools.combinations(items, k) - for k in range(min_items, max_items))) + if size < 1: + raise ValueError("Must build string of positive integral length; got " + "{}".format(size)) + return "".join(random.choice(pool) for _ in range(size)) -nonempty_powerset = partial(powerset, min_items=1) +def randconf(ext=".yaml"): + """ + Randomly generate config filename. 
+ + :param str ext: filename extension + :return str: randomly generated string to function as filename + """ + return randstr(LETTERS_AND_DIGITS, 15) + ext diff --git a/tests/integration/test_project_iface_sample_interaction.py b/tests/integration/test_project_iface_sample_interaction.py index 022e6d9c4..c5ea0ba0b 100644 --- a/tests/integration/test_project_iface_sample_interaction.py +++ b/tests/integration/test_project_iface_sample_interaction.py @@ -14,12 +14,10 @@ __email__ = "vreuter@virginia.edu" - @pytest.mark.usefixtures("write_project_files", "pipe_iface_config_file") class SampleWrtProjectCtorTests: """ Tests for `Sample` related to `Project` construction """ - @named_param(argnames="sample_index", argvalues=(set(range(NUM_SAMPLES)) - NGS_SAMPLE_INDICES)) def test_required_inputs(self, proj, pipe_iface, sample_index): @@ -38,7 +36,6 @@ def test_required_inputs(self, proj, pipe_iface, sample_index): assert not error_general assert not error_specific - @named_param(argnames="sample_index", argvalues=NGS_SAMPLE_INDICES) def test_ngs_pipe_ngs_sample(self, proj, pipe_iface, sample_index): """ NGS pipeline with NGS input works just fine. """ @@ -59,7 +56,6 @@ def test_ngs_pipe_ngs_sample(self, proj, pipe_iface, sample_index): assert expected_required_input_basename == \ observed_required_input_basename - @named_param(argnames="sample_index", argvalues=set(range(NUM_SAMPLES)) - NGS_SAMPLE_INDICES) @pytest.mark.parametrize( @@ -110,4 +106,3 @@ def test_ngs_pipe_non_ngs_sample( # Remove the temporary handler and assert that we've reset state. del looper._LOGGER.handlers[-1] assert pre_test_handlers == looper._LOGGER.handlers - diff --git a/tests/integration/test_project_get_interfaces.py b/tests/integration/test_project_get_interfaces.py new file mode 100644 index 000000000..a13ee7b3f --- /dev/null +++ b/tests/integration/test_project_get_interfaces.py @@ -0,0 +1,6 @@ +""" Tests for request to Project for interface(s) for particular protocol """ + +import pytest + +__author__ = "Vince Reuter" +__email__ = "vreuter@virginia.edu" diff --git a/tests/integration/test_project_get_outputs.py b/tests/integration/test_project_get_outputs.py new file mode 100644 index 000000000..439c70c00 --- /dev/null +++ b/tests/integration/test_project_get_outputs.py @@ -0,0 +1,736 @@ +""" Tests for interaction between Project and PipelineInterface """ + +from copy import deepcopy +import itertools +import os +import random +import string +import pytest +import yaml +from looper import Project as LP +from looper.const import * +from looper.exceptions import DuplicatePipelineKeyException +from looper.pipeline_interface import PL_KEY, PROTOMAP_KEY, RESOURCES_KEY +from attmap import AttMap +from divvy import DEFAULT_COMPUTE_RESOURCES_NAME as DEF_RES +from peppy.const import * +from peppy.utils import count_repeats +from tests.helpers import LETTERS_AND_DIGITS, randstr, randconf + +__author__ = "Vince Reuter" +__email__ = "vreuter@virginia.edu" + + +MAIN_META_KEY = "main_meta" +SUBS_META_KEY = "subs_meta" +SECTION_BY_FIXTURE = { + MAIN_META_KEY: METADATA_KEY, SUBS_META_KEY: SUBPROJECTS_SECTION} +BASE_META = {OUTDIR_KEY: "arbitrary"} +DECLARED_OUTPUTS = {"smooth_bw": "a_{sample.name}/b_{sample.protocol}.txt", + "unalign": "u_{sample.name}_{sample.protocol}.txt"} +WGBS_NAME = "WGBS" +RRBS_NAME = "RRBS" +WGBS_KEY = "wgbs" +RRBS_KEY = "rrbs" + +PROTO_NAMES = {WGBS_KEY: WGBS_NAME, RRBS_KEY: RRBS_NAME} + +WGBS_IFACE_LINES = """name: {n} +path: src/wgbs.py +required_input_files: [data_source]
+ngs_input_files: [data_source] +arguments: + "--sample-name": sample_name + "--genome": genome + "--input": data_source + "--single-or-paired": read_type +{r}: + {d}: + file_size: "0" + cores: "4" + mem: "4000" + time: "0-02:00:00" +""".format(n=WGBS_NAME, r=RESOURCES_KEY, d=DEF_RES).splitlines(True) + +RRBS_IFACE_LINES = """name: {n} +path: src/rrbs.py +required_input_files: [data_source] +all_input_files: [data_source, read1, read2] +ngs_input_files: [data_source, read1, read2] +arguments: + "--sample-name": sample_name + "--genome": genome + "--input": data_source + "--single-or-paired": read_type +{r}: + {d}: + file_size: "0" + cores: "4" + mem: "4000" + time: "0-02:00:00" +""".format(n=RRBS_NAME, r=RESOURCES_KEY, d=DEF_RES).splitlines(True) + + +PROTOMAP = {RRBS_NAME: RRBS_KEY, WGBS_NAME: WGBS_KEY, "EG": WGBS_KEY} +IFACE_LINES = {WGBS_KEY: WGBS_IFACE_LINES, RRBS_KEY: RRBS_IFACE_LINES} + +RNASEQ = "RNA-seq" +KALLISTO_ABUNDANCES_KEY = "abundances" +KALLISTO_ABUNDANCES_TEMPLATE = "{sample.name}_isoforms.txt" + + +def pytest_generate_tests(metafunc): + """ Test case generation and parameterization for this module. """ + skip_empty_flag = "skip_sample_less" + if skip_empty_flag in metafunc.fixturenames: + metafunc.parametrize(skip_empty_flag, [False, True]) + + +def augmented_metadata(metadata, extra=None): + """ Augment base metadata with additional data. """ + assert METADATA_KEY not in metadata, \ + "Found {k} in metadata argument itself; pass just the data/values to " \ + "use as {k}, not the whole mapping".format(k=METADATA_KEY) + m = AttMap({METADATA_KEY: BASE_META}) + m[METADATA_KEY] = m[METADATA_KEY].add_entries(metadata) + return m.add_entries(extra or {}).to_map() + + +def get_conf_data(req): + """ + Get Project config data for a test case. + + :param pytest.FixtureRequest req: test case requesting Project config data + :return dict: Project config data + """ + m = {key: req.getfixturevalue(fix) for fix, key + in SECTION_BY_FIXTURE.items() if fix in req.fixturenames} + return m + + +@pytest.fixture(scope="function") +def prj(request, tmpdir): + """ Provide a test case with a Project instance. 
""" + conf_file = tmpdir.join(randconf()).strpath + return _write_and_build_prj(conf_file, conf_data=get_conf_data(request)) + + +@pytest.mark.parametrize(MAIN_META_KEY, [BASE_META]) +def test_no_pifaces(prj, main_meta): + """ No pipeline interfaces --> the outputs data mapping is empty.""" + assert {} == prj.get_outputs() + + +@pytest.mark.parametrize("name_cfg_file", [randconf()]) +@pytest.mark.parametrize("ifaces", [ + [{WGBS_KEY: WGBS_IFACE_LINES}], [{RRBS_KEY: RRBS_IFACE_LINES}], + [{WGBS_KEY: WGBS_IFACE_LINES}, {RRBS_KEY: RRBS_IFACE_LINES}]]) +def test_no_outputs(tmpdir, name_cfg_file, ifaces, skip_sample_less): + """ Pipeline interfaces without outputs --> no Project outputs """ + cfg = tmpdir.join(name_cfg_file).strpath + iface_paths = [tmpdir.join(randconf()).strpath for _ in ifaces] + rep_paths = count_repeats(iface_paths) + assert [] == rep_paths, "Repeated temp filepath(s): {}".format(rep_paths) + for data, path in zip(ifaces, iface_paths): + with open(path, 'w') as f: + yaml.dump(data, f) + md = deepcopy(BASE_META) + md[PIPELINE_INTERFACES_KEY] = iface_paths + + # DEBUG + print("Metadata: {}".format(md)) + + for path, data in zip(iface_paths, ifaces): + _write_iface_file(path, data) + prj = _write_and_build_prj(cfg, {METADATA_KEY: md}) + assert {} == prj.get_outputs(skip_sample_less) + + +@pytest.mark.parametrize("name_cfg_file", [randconf()]) +@pytest.mark.parametrize(["ifaces", "prot_pool"], [ + ([{WGBS_KEY: WGBS_IFACE_LINES}], [WGBS_NAME]), + ([{RRBS_KEY: RRBS_IFACE_LINES}], [RRBS_NAME]), + ([{WGBS_KEY: WGBS_IFACE_LINES}, {RRBS_KEY: RRBS_IFACE_LINES}], + [WGBS_NAME, RRBS_NAME])]) +@pytest.mark.parametrize("declared_outputs", [None, ["out1", "out2"]]) +def test_malformed_outputs( + tmpdir, name_cfg_file, ifaces, prot_pool, + declared_outputs, skip_sample_less): + """ Invalid outputs declaration format is exceptional. """ + + cfg = tmpdir.join(name_cfg_file).strpath + + iface_paths = [tmpdir.join(randconf()).strpath for _ in ifaces] + rep_paths = count_repeats(iface_paths) + assert [] == rep_paths, "Repeated temp filepath(s): {}".format(rep_paths) + + for data, path in zip(ifaces, iface_paths): + with open(path, 'w') as f: + yaml.dump(data, f) + md = deepcopy(BASE_META) + md[PIPELINE_INTERFACES_KEY] = iface_paths + + anns_file = tmpdir.join("anns.csv").strpath + assert not os.path.exists(anns_file) + sample_protos = [random.choice(prot_pool) for _ in range(10)] + sample_names = [randstr(string.ascii_letters, 20) for _ in sample_protos] + repeated_sample_names = count_repeats(sample_names) + assert [] == repeated_sample_names, \ + "Repeated sample names: {}".format(repeated_sample_names) + anns_data = [(SAMPLE_NAME_COLNAME, ASSAY_KEY)] + \ + list(zip(sample_names, sample_protos)) + with open(anns_file, 'w') as f: + f.write("\n".join("{0},{1}".format(*pair) for pair in anns_data)) + md[SAMPLE_ANNOTATIONS_KEY] = anns_file + + # DEBUG + print("Metadata: {}".format(md)) + + keyed_outputs = {pk: declared_outputs for pk in + [k for pi in ifaces for k in pi.keys()]} + for path, data in zip(iface_paths, ifaces): + _write_iface_file(path, data, outputs_by_pipe_key=keyed_outputs) + prj = _write_and_build_prj(cfg, {METADATA_KEY: md}) + print("TABLE below:\n{}".format(prj.sample_table)) + with pytest.raises(AttributeError): + # Should fail on .items() call during outputs determination. 
+ print("Outputs: {}".format(prj.get_outputs(skip_sample_less))) + + +@pytest.mark.parametrize("ifaces", [ + [{WGBS_KEY: WGBS_IFACE_LINES}], [{RRBS_KEY: RRBS_IFACE_LINES}], + [{WGBS_KEY: WGBS_IFACE_LINES}, {RRBS_KEY: RRBS_IFACE_LINES}]]) +@pytest.mark.parametrize("declared_outputs", + [{n: DECLARED_OUTPUTS for n in [RRBS_NAME, WGBS_NAME]}]) +def test_only_subproject_has_outputs(tmpdir, ifaces, declared_outputs): + """ Activation state affects status of Project's outputs. """ + + cfg = tmpdir.join(randconf()).strpath + + iface_paths = [tmpdir.join(randconf()).strpath for _ in ifaces] + assert [] == count_repeats(iface_paths), \ + "Repeated temp filepath(s): {}".format(count_repeats(iface_paths)) + + for data, path in zip(ifaces, iface_paths): + with open(path, 'w') as f: + yaml.dump(data, f) + md = deepcopy(BASE_META) + md[PIPELINE_INTERFACES_KEY] = iface_paths + + sp_ifaces_paths = [tmpdir.join(randconf()).strpath for _ in ifaces] + assert [] == count_repeats(sp_ifaces_paths), \ + "Repeated temp filepath(s): {}".format(count_repeats(sp_ifaces_paths)) + iface_path_intersect = set(sp_ifaces_paths) & set(iface_paths) + assert set() == iface_path_intersect, \ + "Nonempty main/subs iface path intersection: {}".\ + format(", ".join(iface_path_intersect)) + + # DEBUG + print("Metadata: {}".format(md)) + + used_iface_keys = set(itertools.chain(*[pi.keys() for pi in ifaces])) + keyed_outputs = {pk: declared_outputs[PROTO_NAMES[pk]] + for pk in used_iface_keys} + for path, data in zip(iface_paths, ifaces): + _write_iface_file(path, data) + for path, data in zip(sp_ifaces_paths, ifaces): + _write_iface_file(path, data, outputs_by_pipe_key=keyed_outputs) + + sp_name = "testing_subproj" + prj = _write_and_build_prj(cfg, { + METADATA_KEY: md, + SUBPROJECTS_SECTION: { + sp_name: { + METADATA_KEY: { + PIPELINE_INTERFACES_KEY: sp_ifaces_paths + } + } + } + }) + + # DEBUG + print("TABLE below:\n{}".format(prj.sample_table)) + + assert len(prj.get_outputs(False)) == 0 + assert {} == prj.get_outputs(False) + prj.activate_subproject(sp_name) + assert len(prj.get_outputs(False)) > 0 + exp = {pipe_name: {k: (v, []) for k, v in outs.items()} + for pipe_name, outs in declared_outputs.items() + if pipe_name in {PROTO_NAMES[k] for k in used_iface_keys}} + assert exp == prj.get_outputs(False) + + +@pytest.mark.parametrize("ifaces", [ + [{WGBS_KEY: WGBS_IFACE_LINES}], [{RRBS_KEY: RRBS_IFACE_LINES}], + [{WGBS_KEY: WGBS_IFACE_LINES}, {RRBS_KEY: RRBS_IFACE_LINES}]]) +@pytest.mark.parametrize("declared_outputs", + [{n: DECLARED_OUTPUTS for n in [RRBS_NAME, WGBS_NAME]}]) +def test_only_main_project_has_outputs(tmpdir, ifaces, declared_outputs): + """ Activation state affects status of Project's outputs. """ + + cfg = tmpdir.join(randconf()).strpath + + iface_paths = [tmpdir.join(randconf()).strpath for _ in ifaces] + assert [] == count_repeats(iface_paths), \ + "Repeated temp filepath(s): {}".format(count_repeats(iface_paths)) + + for data, path in zip(ifaces, iface_paths): + with open(path, 'w') as f: + yaml.dump(data, f) + md = deepcopy(BASE_META) + md[PIPELINE_INTERFACES_KEY] = iface_paths + + sp_ifaces_paths = [tmpdir.join(randconf()).strpath for _ in ifaces] + assert [] == count_repeats(sp_ifaces_paths), \ + "Repeated temp filepath(s): {}".format(count_repeats(sp_ifaces_paths)) + iface_path_intersect = set(sp_ifaces_paths) & set(iface_paths) + assert set() == iface_path_intersect, \ + "Nonempty main/subs iface path intersection: {}". 
\ + format(", ".join(iface_path_intersect)) + + # DEBUG + print("Metadata: {}".format(md)) + + used_iface_keys = set(itertools.chain(*[pi.keys() for pi in ifaces])) + keyed_outputs = {pk: declared_outputs[PROTO_NAMES[pk]] + for pk in used_iface_keys} + for path, data in zip(iface_paths, ifaces): + _write_iface_file(path, data, outputs_by_pipe_key=keyed_outputs) + for path, data in zip(sp_ifaces_paths, ifaces): + _write_iface_file(path, data) + + sp_name = "testing_subproj" + prj = _write_and_build_prj(cfg, { + METADATA_KEY: md, + SUBPROJECTS_SECTION: { + sp_name: { + METADATA_KEY: { + PIPELINE_INTERFACES_KEY: sp_ifaces_paths + } + } + } + }) + + # DEBUG + print("TABLE below:\n{}".format(prj.sample_table)) + + assert len(prj.get_outputs(False)) > 0 + exp = {pipe_name: {k: (v, []) for k, v in outs.items()} + for pipe_name, outs in declared_outputs.items() + if pipe_name in {PROTO_NAMES[k] for k in used_iface_keys}} + assert exp == prj.get_outputs(False) + prj.activate_subproject(sp_name) + assert len(prj.get_outputs(False)) == 0 + assert {} == prj.get_outputs(False) + + +def test_multiple_project_units_have_declare_interfaces_with_outputs(tmpdir): + """ Activation state affects status of Project's outputs. """ + + # Generate config filepaths. + iface_paths = set() + while len(iface_paths) < 3: + iface_paths.add(tmpdir.join(randconf()).strpath) + iface_paths = list(iface_paths) + + # Collect the Project config data. + main_iface_file, sp_iface_files = iface_paths[0], iface_paths[1:] + sp_files = dict(zip(["sp1", "sp2"], sp_iface_files)) + prj_dat = { + METADATA_KEY: { + OUTDIR_KEY: tmpdir.strpath, + PIPELINE_INTERFACES_KEY: main_iface_file + }, + SUBPROJECTS_SECTION: {n: {METADATA_KEY: {PIPELINE_INTERFACES_KEY: f}} + for n, f in sp_files.items()} + } + + # Generate Project config filepath and create Project. + conf_file = make_temp_file_path(folder=tmpdir.strpath, known=iface_paths) + for f, (lines_spec, outs_spec) in zip( + iface_paths, + [({WGBS_KEY: WGBS_IFACE_LINES}, {WGBS_KEY: DECLARED_OUTPUTS}), + ({RRBS_KEY: RRBS_IFACE_LINES}, {RRBS_KEY: DECLARED_OUTPUTS}), + ({WGBS_KEY: WGBS_IFACE_LINES, RRBS_KEY: RRBS_IFACE_LINES}, + {WGBS_KEY: DECLARED_OUTPUTS, RRBS_KEY: DECLARED_OUTPUTS})]): + _write_iface_file(f, lines_group_by_pipe_key=lines_spec, + outputs_by_pipe_key=outs_spec) + + prj = _write_and_build_prj(conf_file, prj_dat) + + # DEBUG + print("TMPDIR contents:\n{}".format("\n".join( + os.path.join(tmpdir.strpath, f) for f in os.listdir(tmpdir.strpath)))) + + def observe(p): + return p.get_outputs(False) + + def extract_just_path_template(out_res): + return {pipe_name: {k: v for k, (v, _) in outs.items()} + for pipe_name, outs in out_res.items()} + + assert {WGBS_NAME: DECLARED_OUTPUTS} == extract_just_path_template(observe(prj)) + prj.activate_subproject("sp1") + assert {RRBS_NAME: DECLARED_OUTPUTS} == extract_just_path_template(observe(prj)) + prj.activate_subproject("sp2") + assert {pn: DECLARED_OUTPUTS for pn in [WGBS_NAME, RRBS_NAME]} == \ + extract_just_path_template(observe(prj)) + + +@pytest.mark.parametrize("noskip", [False, True]) +@pytest.mark.parametrize("protocols", + [[], [random.choice(["INVALID", "NULL"]) for _ in range(10)]]) +@pytest.mark.parametrize("declared_outputs", + [{n: DECLARED_OUTPUTS for n in [RRBS_NAME, WGBS_NAME]}]) +def test_no_samples_match_protocols_with_outputs( + tmpdir, noskip, protocols, declared_outputs): + """ get_outputs behavior is sensitive to protocol match and skip flag. 
""" + temproot = tmpdir.strpath + path_iface_file = tmpdir.join(randconf()).strpath + prj_cfg = make_temp_file_path(folder=temproot, known=[path_iface_file]) + prj_dat = { + METADATA_KEY: { + OUTDIR_KEY: temproot, + PIPELINE_INTERFACES_KEY: path_iface_file + } + } + if protocols: + anns_file = make_temp_file_path( + folder=temproot, known=[path_iface_file, prj_cfg]) + anns_data = [("sample{}".format(i), p) for i, p in enumerate(protocols)] + with open(anns_file, 'w') as f: + for n, p in [(SAMPLE_NAME_COLNAME, ASSAY_KEY)] + anns_data: + f.write("{},{}\n".format(n, p)) + prj_dat[METADATA_KEY][SAMPLE_ANNOTATIONS_KEY] = anns_file + _write_iface_file( + path_iface_file, {WGBS_KEY: WGBS_IFACE_LINES, RRBS_KEY: RRBS_IFACE_LINES}, + outputs_by_pipe_key={PROTOMAP[n]: DECLARED_OUTPUTS for n in declared_outputs.keys()}) + prj = _write_and_build_prj(prj_cfg, prj_dat) + exp = { + pipe_name: { + path_key: (path_temp, []) + for path_key, path_temp in decl_outs.items()} + for pipe_name, decl_outs in declared_outputs.items() + } if noskip else {} + assert exp == prj.get_outputs(not noskip) + + +@pytest.mark.parametrize("protomap", [None, PROTOMAP]) +@pytest.mark.parametrize("include_outputs", [False, True]) +def test_pipeline_identifier_collision_same_data(tmpdir, protomap, include_outputs): + """ Interface data that differs from another with same identifier is unexceptional. """ + + temproot = tmpdir.strpath + lines_groups = {WGBS_KEY: WGBS_IFACE_LINES, RRBS_KEY: RRBS_IFACE_LINES} + outputs = {k: DECLARED_OUTPUTS for k in lines_groups.keys()} \ + if include_outputs else None + + def write_iface(f, pm): + _write_iface_file(f, lines_groups, outputs, pm) + + iface_file_1 = os.path.join(temproot, "piface1.yaml") + write_iface(iface_file_1, protomap) + iface_file_2 = os.path.join(temproot, "piface2.yaml") + write_iface(iface_file_2, protomap) + + prj_dat = { + METADATA_KEY: { + OUTDIR_KEY: tmpdir.strpath, + PIPELINE_INTERFACES_KEY: [iface_file_1, iface_file_2] + } + } + prj = _write_and_build_prj(os.path.join(temproot, "pc.yaml"), prj_dat) + exp = {n: {k: (v, []) for k, v in DECLARED_OUTPUTS.items()} + for n in [WGBS_NAME, RRBS_NAME]} if include_outputs else {} + assert exp == prj.get_outputs(skip_sample_less=False) + + +@pytest.mark.parametrize("protomap", [None, PROTOMAP]) +@pytest.mark.parametrize("include_outputs", [False, True]) +@pytest.mark.parametrize("rep_key", [WGBS_KEY, RRBS_KEY]) +def test_pipeline_identifier_collision_different_data( + tmpdir, include_outputs, protomap, skip_sample_less, rep_key): + """ Interface data that differs from another with same identifier is exceptional. 
""" + temproot = tmpdir.strpath + + def write_iface(f, lines_group): + out_by_key = {k: DECLARED_OUTPUTS for k in lines_group} \ + if include_outputs else None + _write_iface_file(f, lines_group, out_by_key, pm=protomap) + + iface_file_1 = os.path.join(temproot, "piface1.yaml") + write_iface(iface_file_1, {rep_key: WGBS_IFACE_LINES}) + iface_file_2 = os.path.join(temproot, "piface2.yaml") + write_iface(iface_file_2, {rep_key: RRBS_IFACE_LINES}) + + def observe(): + prj_cfg = os.path.join(temproot, "pc.yaml") + prj_dat = { + METADATA_KEY: { + OUTDIR_KEY: tmpdir.strpath, + PIPELINE_INTERFACES_KEY: [iface_file_1, iface_file_2] + } + } + return _write_and_build_prj(prj_cfg, prj_dat).get_outputs(skip_sample_less) + + try: + observe() + except Exception as e: + pytest.fail("Unexpected exception: {}".format(e)) + + write_iface(iface_file_1, {rep_key: WGBS_IFACE_LINES[1:]}) + write_iface(iface_file_2, {rep_key: RRBS_IFACE_LINES[1:]}) + + # DEBUG + def print_iface(fp): + with open(fp, 'r') as f: + return yaml.load(f, yaml.SafeLoader) + + # DEBUG + print("First interface contents (below):\n{}\n".format(print_iface(iface_file_1))) + print("Second interface contents (below):\n{}".format(print_iface(iface_file_2))) + + with pytest.raises(DuplicatePipelineKeyException): + observe() + + +def test_sample_collection_accuracy(tmpdir, skip_sample_less, rna_pi_lines): + """ Names of samples collected for each pipeline are as expected. """ + temproot = tmpdir.strpath + samples = [("sampleA", WGBS_NAME), ("sample2", "HiChIP"), + ("sampleC", RNASEQ), ("sample4", "ATAC"), + ("sampleE", WGBS_NAME), ("sample6", "HiChIP"), + ("sampleG", RNASEQ), ("sample8", "ATAC")] + iface_files = list(get_temp_paths(2, temproot)) + anns_file = make_temp_file_path( + temproot, iface_files, + generate=lambda: "".join(randstr(LETTERS_AND_DIGITS, 20)) + ".csv") + with open(anns_file, 'w') as f: + f.write("\n".join("{},{}".format(*pair) for pair in + [(SAMPLE_NAME_COLNAME, ASSAY_KEY)] + samples)) + _write_iface_file( + iface_files[0], + lines_group_by_pipe_key={WGBS_KEY: WGBS_IFACE_LINES}, + outputs_by_pipe_key={WGBS_KEY: DECLARED_OUTPUTS}, pm=PROTOMAP) + with open(iface_files[1], 'w') as f: + for l in rna_pi_lines: + f.write(l) + prj_dat = { + METADATA_KEY: { + SAMPLE_ANNOTATIONS_KEY: anns_file, + OUTDIR_KEY: tmpdir.strpath, + PIPELINE_INTERFACES_KEY: iface_files + } + } + prj_cfg = make_temp_file_path(temproot, iface_files + [anns_file]) + prj = _write_and_build_prj(prj_cfg, prj_dat) + exp = { + WGBS_NAME: { + k: (v, [sn for sn, pn in samples if pn == WGBS_NAME]) + for k, v in DECLARED_OUTPUTS.items() + }, + RNA_PIPES["kallisto"].name: { + KALLISTO_ABUNDANCES_KEY: ( + KALLISTO_ABUNDANCES_TEMPLATE, + [sn for sn, prot in samples if prot == RNASEQ] + ) + } + } + assert exp == prj.get_outputs(skip_sample_less) + + +def get_temp_paths(n, folder, known=None, generate=randconf): + """ + Generate unique tempfile paths pointing to within a particular folder. + + :param int n: number of paths to generate + :param str folder: path to folder into which randomly generated filepaths + should point + :param Iterable[str] known: collection of filepaths to prohibit a + match to for a newly generated path + :param function() -> str generate: how to randomly generate a filename + :return Iterable[str]: collection of unique tempfile paths pointing to + within a particular folder. 
+ """ + paths = set() + known = set(known or []) + gen = lambda pool: make_temp_file_path(folder, pool, generate) + while len(paths) < n: + p = gen(known) + known.add(p) + paths.add(p) + return paths + + +def make_temp_file_path(folder, known, generate=randconf): + """ + Generate a new tempfile path. + + :param str folder: path to folder that represents parent of path to + generate, i.e. the path to the folder to which a randomized filename + is to be joined + :param Iterable[str] known: collection of current filePATHs + :param function() -> str generate: how to generate fileNAME + :return str: randomly generated filepath that doesn't match a known value + """ + while True: + fp = os.path.join(folder, generate()) + if fp not in known: + return fp + + +def _write_and_build_prj(conf_file, conf_data): + """ + Write Project config data and create the instance. + + :param str conf_file: path to file to write + :param Mapping conf_data: Project config data + :return looper.Project: new Project instance + """ + with open(conf_file, 'w') as f: + yaml.dump(conf_data, f) + return LP(conf_file) + + +def _write_iface_file( + path_iface_file, lines_group_by_pipe_key, + outputs_by_pipe_key=None, pm=None): + """ + Write a pipeline interface file. + + :param str path_iface_file: path to the file to write + :param Mapping[str, Iterable[str]] lines_group_by_pipe_key: binding between + pipeline key and collection of lines that encode its specific + configuration data + :param Mapping[str, Mapping[str, str]] outputs_by_pipe_key: binding between + pipeline key and mapping from output type/kind name to path template + :param Mapping[str, str] pm: protocol mapping + :return str: path to the file written + """ + + folder = os.path.dirname(path_iface_file) + temps = [os.path.join(folder, randconf()) for _ in lines_group_by_pipe_key] + + def read_iface_data(fp, lines): + with open(fp, 'w') as f: + for l in lines: + f.write(l) + try: + with open(fp, 'r') as f: + return yaml.load(f, yaml.SafeLoader) + except yaml.scanner.ScannerError: + with open(fp, 'r') as f: + for l in f.readlines(): + print(l) + raise + + outputs_by_pipe_key = outputs_by_pipe_key or dict() + + dat_by_key = { + k: read_iface_data(tf, lines_group) for tf, (k, lines_group) + in zip(temps, lines_group_by_pipe_key.items())} + for k, outs in outputs_by_pipe_key.items(): + if k not in dat_by_key: + continue + dat_by_key[k][OUTKEY] = outs + + data = {PROTOMAP_KEY: pm or PROTOMAP, PL_KEY: dat_by_key} + with open(path_iface_file, 'w') as f: + yaml.dump(data, f) + + return path_iface_file + + +class PipeSpec(object): + """ Pipeline key and name """ + def __init__(self, key, name=None): + assert "" != os.path.splitext(key)[1] + self.key = key + self.name = name or key.rstrip(".py") + + +RNA_PIPES = {"kallisto": PipeSpec("rnaKallisto.py"), + "tophat": PipeSpec("rnaTopHat.py"), + "bitseq": PipeSpec("rnaBitSeq.py")} + + +@pytest.fixture(scope="function") +def rna_pi_lines(): + return """protocol_mapping: + {rnaseq_proto_name}: [{bs_name}, {kall_name}, {th_name}] + SMART: [{bs_name}, {th_name}] + +pipelines: + {bs_key}: + name: {bs_name} + path: src/rnaBitSeq.py + arguments: + "--sample-name": sample_name + "--genome": transcriptome + "--input": data_source + "--single-or-paired": read_type + required_input_files: [data_source] + ngs_input_files: [data_source] + {res}: + {dr}: + file_size: "0" + cores: "6" + mem: "36000" + time: "2-00:00:00" + large: + file_size: "4" + cores: "6" + mem: "44000" + time: "2-00:00:00" + + {th_key}: + name: {th_name} + path: 
src/rnaTopHat.py + required_input_files: [data_source] + ngs_input_files: [data_source] + arguments: + "--sample-name": sample_name + "--genome": genome + "--input": data_source + "--single-or-paired": read_type + {res}: + {dr}: + file_size: "0" + cores: "2" + mem: "60000" + time: "7-00:00:00" + + {kall_key}: + name: {kall_name} + path: src/rnaKallisto.py + required_input_files: [data_source] + ngs_input_files: [data_source] + arguments: + "--sample-yaml": yaml_file + "--sample-name": sample_name + "--input": data_source + "--single-or-paired": read_type + optional_arguments: + "--input2": read2 + "--fragment-length": fragment_length + "--fragment-length-sdev": fragment_length_sdev + outputs: + {abundances_key}: \"{abundances_val}\" + {res}: + {dr}: + cores: "2" + mem: "4000" + time: "0-6:00:00" + normal: + min_file_size: "3" + cores: "2" + mem: "8000" + time: "0-12:00:00" +""".format( + res=RESOURCES_KEY, dr=DEF_RES, rnaseq_proto_name=RNASEQ, + bs_key=RNA_PIPES["bitseq"].key, bs_name=RNA_PIPES["bitseq"].name, + th_key=RNA_PIPES["tophat"].key, th_name=RNA_PIPES["tophat"].name, + kall_key=RNA_PIPES["kallisto"].key, kall_name=RNA_PIPES["kallisto"].name, + abundances_key=KALLISTO_ABUNDANCES_KEY, + abundances_val=KALLISTO_ABUNDANCES_TEMPLATE).splitlines(True) diff --git a/tests/models/conftest.py b/tests/models/conftest.py index f672dce10..e5ff9f229 100644 --- a/tests/models/conftest.py +++ b/tests/models/conftest.py @@ -8,20 +8,18 @@ from collections import Iterable, Mapping else: from collections.abc import Iterable, Mapping - import pandas as pd import pytest import yaml - -from peppy import DEFAULT_COMPUTE_RESOURCES_NAME, METADATA_KEY, \ - NAME_TABLE_ATTR, SAMPLE_NAME_COLNAME +from divvy import DEFAULT_COMPUTE_RESOURCES_NAME +from looper.pipeline_interface import PROTOMAP_KEY, RESOURCES_KEY +from peppy import METADATA_KEY, NAME_TABLE_ATTR, SAMPLE_NAME_COLNAME __author__ = "Vince Reuter" __email__ = "vreuter@virginia.edu" - ATAC_PROTOCOL_NAME = "ATAC" CONFIG_FILENAME = "test-proj-conf.yaml" @@ -56,7 +54,6 @@ "time": "30-00:00:00", "partition": "longq"} - def pytest_generate_tests(metafunc): """ Conditional customization of test cases in this directory. """ try: @@ -73,7 +70,6 @@ def pytest_generate_tests(metafunc): {"name": "sans-path"})]) - ATACSEQ_IFACE_WITHOUT_RESOURCES = { "name": "ATACseq", "looper_args": True, @@ -95,14 +91,12 @@ def pytest_generate_tests(metafunc): } - @pytest.fixture(scope="function") def atac_pipe_name(): """ Oft-used as filename for pipeline module and PipelineInterface key. 
""" return "ATACSeq.py" - @pytest.fixture(scope="function") def atacseq_iface_with_resources(resources): """ @@ -114,11 +108,10 @@ def atacseq_iface_with_resources(resources): of the base sections plus resources section """ iface_data = copy.deepcopy(ATACSEQ_IFACE_WITHOUT_RESOURCES) - iface_data["resources"] = copy.deepcopy(resources) + iface_data[RESOURCES_KEY] = copy.deepcopy(resources) return iface_data - @pytest.fixture(scope="function") def atacseq_piface_data(atac_pipe_name): """ @@ -131,7 +124,6 @@ def atacseq_piface_data(atac_pipe_name): return {atac_pipe_name: copy.deepcopy(ATACSEQ_IFACE_WITHOUT_RESOURCES)} - @pytest.fixture(scope="function") def basic_data_raw(): return copy.deepcopy( @@ -139,7 +131,6 @@ def basic_data_raw(): "Sample": {SAMPLE_NAME_COLNAME: "arbitrary-sample"}}) - @pytest.fixture(scope="function") def basic_instance_data(request, instance_raw_data): """ @@ -154,21 +145,19 @@ def basic_instance_data(request, instance_raw_data): # Cleanup is free with _write_config, using request's temp folder. transformation_by_class = { "PathExAttMap": lambda data: data, - "PipelineInterface": lambda data: - _write_config(data, request, "pipeline_interface.yaml"), + "PipelineInterface": lambda data: _write_config( + data, request, "pipeline_interface.yaml"), "Sample": lambda data: pd.Series(data)} which_class = request.getfixturevalue("class_name") return transformation_by_class[which_class](instance_raw_data) - @pytest.fixture(scope="function") def default_resources(): """ Provide test case with default PipelineInterface resources section. """ return copy.deepcopy(DEFAULT_RESOURCES) - @pytest.fixture(scope="function") def env_config_filepath(tmpdir): """ Write default project/compute environment file for Project ctor. """ @@ -177,14 +166,12 @@ def env_config_filepath(tmpdir): return conf_file.strpath - @pytest.fixture(scope="function") def huge_resources(): """ Provide non-default resources spec. section for PipelineInterface. """ return copy.deepcopy(HUGE_RESOURCES) - @pytest.fixture(scope="function") def instance_raw_data(request, basic_data_raw, atacseq_piface_data): """ Supply the raw data for a basic model instance as a fixture. """ @@ -249,7 +236,6 @@ def path_config_file(request, tmpdir, atac_pipe_name): conf_data=conf_data, dirpath=tmpdir.strpath) - @pytest.fixture(scope="function") def path_proj_conf_file(tmpdir, proj_conf): """ Write basic project configuration data and provide filepath. """ @@ -259,7 +245,6 @@ def path_proj_conf_file(tmpdir, proj_conf): return conf_path - @pytest.fixture(scope="function") def path_anns_file(request, tmpdir, sample_sheet): """ Write basic annotations, optionally using a different delimiter. 
""" @@ -273,7 +258,6 @@ def path_anns_file(request, tmpdir, sample_sheet): return filepath - @pytest.fixture(scope="function") def piface_config_bundles(request, resources): """ @@ -297,17 +281,16 @@ def piface_config_bundles(request, resources): elif isinstance(iface_config_datas, Iterable): data_bundles = iface_config_datas else: - raise TypeError("Expected mapping or list collection of " - "PipelineInterface data: {} ({})".format( - iface_config_datas, type(iface_config_datas))) - resource_specification = request.getfixturevalue("resources") \ - if "resources" in request.fixturenames else resources + raise TypeError( + "Expected mapping or list collection of PipelineInterface data: {} " + "({})".format(iface_config_datas, type(iface_config_datas))) + resource_specification = request.getfixturevalue(RESOURCES_KEY) \ + if RESOURCES_KEY in request.fixturenames else resources for config_bundle in data_bundles: config_bundle.update(resource_specification) return iface_config_datas - @pytest.fixture(scope="function") def resources(): """ Basic PipelineInterface compute resources data. """ @@ -315,7 +298,6 @@ def resources(): "huge": copy.copy(HUGE_RESOURCES)} - def write_config_data(protomap, conf_data, dirpath): """ Write PipelineInterface data to (temp)file. @@ -327,14 +309,13 @@ def write_config_data(protomap, conf_data, dirpath): file to write :return str: path to the (temp)file written """ - full_conf_data = {"protocol_mapping": protomap, "pipelines": conf_data} + full_conf_data = {PROTOMAP_KEY: protomap, "pipelines": conf_data} filepath = os.path.join(dirpath, "pipeline_interface.yaml") with open(filepath, 'w') as conf_file: yaml.safe_dump(full_conf_data, conf_file) return filepath - def _write_config(data, request, filename): """ Write configuration data to file. diff --git a/tests/models/test_PipelineInterface.py b/tests/models/test_PipelineInterface.py index fdcb0ab25..71a0eba71 100644 --- a/tests/models/test_PipelineInterface.py +++ b/tests/models/test_PipelineInterface.py @@ -13,14 +13,17 @@ import yaml from attmap import PathExAttMap -from looper.pipeline_interface import PipelineInterface, PL_KEY, PROTOMAP_KEY +from divvy import DEFAULT_COMPUTE_RESOURCES_NAME +from looper.const import * +from looper.pipeline_interface import PipelineInterface, PL_KEY, PROTOMAP_KEY, \ + RESOURCES_KEY from looper.project import Project from looper.exceptions import InvalidResourceSpecificationException, \ MissingPipelineConfigurationException, PipelineInterfaceConfigError -from peppy import Project, Sample, \ - DEFAULT_COMPUTE_RESOURCES_NAME, SAMPLE_ANNOTATIONS_KEY, SAMPLE_NAME_COLNAME +from peppy import Project, Sample +from peppy.const import * from .conftest import ATAC_PROTOCOL_NAME, write_config_data -from tests.helpers import powerset +from ubiquerg import powerset __author__ = "Vince Reuter" @@ -77,7 +80,7 @@ def pi_with_resources(request, bundled_piface, resources): rp_data[file_size_name] = size pipe_iface_config = PipelineInterface(bundled_piface) for pipe_data in pipe_iface_config.pipelines.values(): - pipe_data["resources"] = resources + pipe_data[RESOURCES_KEY] = resources return pipe_iface_config @@ -182,7 +185,7 @@ def test_unconfigured_pipeline_exception( if not use_resources: for pipeline in pi.pipelines.values(): try: - del pipeline["resources"][DEFAULT_COMPUTE_RESOURCES_NAME] + del pipeline[RESOURCES_KEY][DEFAULT_COMPUTE_RESOURCES_NAME] except KeyError: # Already no default resource package. 
pass @@ -256,11 +259,11 @@ def test_requires_default( pi = pi_with_resources for name, pipeline in pi.iterpipes(): try: - del pipeline["resources"][DEFAULT_COMPUTE_RESOURCES_NAME] + del pipeline[RESOURCES_KEY][DEFAULT_COMPUTE_RESOURCES_NAME] except KeyError: # Already no default resource package. pass - assert "default" not in pipeline["resources"] + assert "default" not in pipeline[RESOURCES_KEY] with pytest.raises(InvalidResourceSpecificationException): pi.choose_resource_package( name, file_size=huge_resources["file_size"] + 1) @@ -281,7 +284,7 @@ def test_resources_not_required( """ Compute resource specification is optional. """ pi = pi_with_resources for pipe_data in pi.pipelines.values(): - del pipe_data["resources"] + del pipe_data[RESOURCES_KEY] for pipe_name in pi.pipeline_names: assert {} == pi.choose_resource_package(pipe_name, int(file_size)) assert {} == pi.choose_resource_package(pipe_name, float(file_size)) @@ -295,13 +298,13 @@ def test_selects_proper_resource_package( file_size, expected_package_name, midsize_resources): """ Minimal resource package sufficient for pipeline and file size. """ for pipe_data in pi_with_resources.pipelines.values(): - pipe_data["resources"].update( + pipe_data[RESOURCES_KEY].update( {"midsize": copy.deepcopy(midsize_resources)}) for pipe_name, pipe_data in pi_with_resources.iterpipes(): observed_package = pi_with_resources.choose_resource_package( pipe_name, file_size) expected_package = copy.deepcopy( - pipe_data["resources"][expected_package_name]) + pipe_data[RESOURCES_KEY][expected_package_name]) assert expected_package == observed_package def test_negative_file_size_prohibited( @@ -309,7 +312,7 @@ def test_negative_file_size_prohibited( """ Negative min file size in resource package spec is prohibited. """ file_size_attr = "min_file_size" if use_new_file_size else "file_size" for pipe_data in pi_with_resources.pipelines.values(): - for package_data in pipe_data["resources"].values(): + for package_data in pipe_data[RESOURCES_KEY].values(): package_data[file_size_attr] = -5 * random.random() for pipe_name in pi_with_resources.pipeline_names: file_size_request = random.randrange(1, 11) @@ -342,13 +345,13 @@ def clear_file_size(resource_package): # Add resource package spec data and create PipelineInterface. pipe_iface_data = copy.deepcopy(bundled_piface) for pipe_data in pipe_iface_data[PL_KEY].values(): - pipe_data["resources"] = resources_data + pipe_data[RESOURCES_KEY] = resources_data pi = PipelineInterface(pipe_iface_data) # We should always get default resource package for mini file. for pipe_name, pipe_data in pi.iterpipes(): default_resource_package = \ - pipe_data["resources"][DEFAULT_COMPUTE_RESOURCES_NAME] + pipe_data[RESOURCES_KEY][DEFAULT_COMPUTE_RESOURCES_NAME] clear_file_size(default_resource_package) assert default_resource_package == \ pi.choose_resource_package(pipe_name, 0.001) @@ -361,7 +364,7 @@ def test_default_package_new_name_zero_size( for pipe_name, pipe_data in pi_with_resources.iterpipes(): # Establish faulty default package setting for file size. - default_resource_package = pipe_data["resources"]["default"] + default_resource_package = pipe_data[RESOURCES_KEY]["default"] if use_new_file_size: if "file_size" in default_resource_package: del default_resource_package["file_size"] @@ -403,7 +406,7 @@ def test_file_size_spec_required_for_non_default_packages( # Create the PipelineInterface. 
for pipe_data in bundled_piface[PL_KEY].values(): - pipe_data["resources"] = resource_package_data + pipe_data[RESOURCES_KEY] = resource_package_data pi = PipelineInterface(bundled_piface) # Attempt to select resource package should fail for each pipeline, @@ -625,9 +628,13 @@ class GenericProtocolMatchTests: @pytest.fixture def prj_data(self): """ Provide basic Project data. """ - return {"metadata": {"output_dir": "output", - "results_subdir": "results_pipeline", - "submission_subdir": "submission"}} + return { + METADATA_KEY: { + OUTDIR_KEY: "output", + RESULTS_SUBDIR_KEY: "results_pipeline", + SUBMISSION_SUBDIR_KEY: "submission" + } + } @pytest.fixture def sheet_lines(self): @@ -649,8 +656,8 @@ def iface_paths(self, tmpdir): @pytest.fixture def prj(self, tmpdir, prj_data, anns_file, iface_paths): """ Provide basic Project. """ - prj_data["pipeline_interfaces"] = iface_paths - prj_data["metadata"][SAMPLE_ANNOTATIONS_KEY] = anns_file + prj_data[PIPELINE_INTERFACES_KEY] = iface_paths + prj_data[METADATA_KEY][SAMPLE_ANNOTATIONS_KEY] = anns_file prj_file = tmpdir.join("pconf.yaml").strpath with open(prj_file, 'w') as f: yaml.dump(prj_data, f) diff --git a/tests/specific_use_cases/test_cli_prj_pipe_args_collision.py b/tests/specific_use_cases/test_cli_prj_pipe_args_collision.py new file mode 100644 index 000000000..b522bf93c --- /dev/null +++ b/tests/specific_use_cases/test_cli_prj_pipe_args_collision.py @@ -0,0 +1,142 @@ +""" Tests for collision between CLI- and Project-specified pipe args """ + +import copy +import itertools +import os +import pytest +import yaml +from divvy import DEFAULT_COMPUTE_RESOURCES_NAME as DEF_RES +from looper import PipelineInterface, Project, SubmissionConductor +from looper.pipeline_interface import PL_KEY, PROTOMAP_KEY +from peppy.const import * +from peppy.utils import count_repeats +from tests.helpers import randconf +from ubiquerg import powerset + +__author__ = "Vince Reuter" +__email__ = "vreuter@virginia.edu" + + +ALL_PIPE_FLAGS = {"--random", "--arbitrary", "--does-not-matter"} + + +def generate_flags_partitions(flags): + """ + Generate all partitions of a collection of CLI flag options. + + Each partition will be such that each flag is either designated for CLI + specification or for project config specification, but not both. + + :param Iterable[str] flags: collection of flag-like options to partition + :return Iterable[(str, dict[str, NoneType])]: collection of pairs in which + first component of each pair is collection of flags for CLI-like + specification simulation, and second component is specification of + remaining flags as pipeline args for project config + """ + return [(ps, {f: None for f in flags if f not in ps}) for ps in powerset(flags)] + + +def generate_overlaps(singles, mapped): + """ + Generate improper partitions, i.e. those with some overlap between subsets.
+ + :param Iterable[str] singles: collection of flag-like option names + :param dict[str, NoneType] mapped: flag-like option name mapped to null + :return Iterable[(str, dict[str, NoneType])]: collection of pairs in which + first component of each pair is collection of flags for CLI-like + specification simulation, and second component is specification of + remaining flags as pipeline args for project config + """ + common = set(singles) & set(mapped.keys()) + assert set() == common, "Nonempty intersection: {}".format(", ".join(common)) + singles_update = [list(singles) + list(m) for m in + powerset(mapped.keys(), min_items=1)] + mapped_update = [{f: None for f in fs} for fs in powerset(singles, min_items=1)] + aug_maps = [] + for mx in mapped_update: + m = copy.copy(mapped) + m.update(mx) + aug_maps.append(m) + return [(s, mapped) for s in singles_update] + [(singles, m) for m in aug_maps] + + +def generate_full_flags_cover(flags): + """ + Generate all partitions of flags, both with and without overlaps. + + Each partition is binary, designating each flag-like option for either + CLI-like specification simulation or for pipeline args project config + specification (or both in the case of a partition with a nonempty + intersection of the parts). + + :param Iterable[str] flags: collection of flag-like options to partition + :return Iterable[(str, dict[str, NoneType])]: collection of pairs in which + first component of each pair is collection of flags for CLI-like + specification simulation, and second component is specification of + remaining flags as pipeline args for project config + """ + partition = generate_flags_partitions(flags) + overlappings = [generate_overlaps(s, m) for s, m in partition] + return partition + list(itertools.chain(*overlappings)) + + +@pytest.fixture +def prj_dat(request, tmpdir): + """ Project config data for a test case """ + prj_dat = {METADATA_KEY: {OUTDIR_KEY: tmpdir.strpath}} + if PIPE_ARGS_SECTION in request.fixturenames: + pipe_args = request.getfixturevalue(PIPE_ARGS_SECTION) + if type(pipe_args) is not dict: + raise TypeError("Pipeline arguments must be a dictionary; got {}". + format(type(pipe_args))) + prj_dat[PIPE_ARGS_SECTION] = pipe_args + return prj_dat + + +@pytest.mark.parametrize( + ["cli_flags", "pipe_args_data"], generate_full_flags_cover(ALL_PIPE_FLAGS)) +def test_flag_like_option(tmpdir, cli_flags, pipe_args_data, prj_dat): + """ Collision of flag-like options adds each only once. """ + + # Pretest + assert len(cli_flags) > 0 or len(pipe_args_data) > 0, \ + "Invalid test case parameterization -- empty flags and pipeline args" + reps = count_repeats(cli_flags) + assert [] == reps, "Unexpected duplicate flags: {}".format(reps) + + # Build and validate Project. + pipe_key = "arbitrary-testpipe" + prj_dat[PIPE_ARGS_SECTION] = {pipe_key: pipe_args_data} + temproot = tmpdir.strpath + prj_cfg = os.path.join(temproot, randconf()) + prj = _write_and_build_prj(prj_cfg, prj_dat) + assert prj_dat[PIPE_ARGS_SECTION] == prj[PIPE_ARGS_SECTION].to_map() + + # Build the submission conductor.
+ pi_data = { + PROTOMAP_KEY: {GENERIC_PROTOCOL_KEY: pipe_key}, + PL_KEY: {pipe_key: {DEF_RES: { + "file_size": "0", + "cores": "1", + "mem": "1000", + "time": "0-01:00:00" + }}} + } + pi = PipelineInterface(pi_data) + conductor = SubmissionConductor( + pipe_key, pi, cmd_base="", prj=prj, extra_args=cli_flags) + _, addl_args_text = conductor._cmd_text_extra(0) + assert set(addl_args_text.split(" ")) == ALL_PIPE_FLAGS + + +def _write_and_build_prj(fp, d): + """ + Write project config file and build Project. + + :param str fp: path to config file + :param dict d: project config data + :return looper.Project: newly built Project instance + """ + with open(fp, 'w') as f: + yaml.dump(d, f) + return Project(fp) diff --git a/tests/test_basic_interface_group.py b/tests/test_basic_interface_group.py new file mode 100644 index 000000000..c36b6e42e --- /dev/null +++ b/tests/test_basic_interface_group.py @@ -0,0 +1,32 @@ +""" Tests for Project's pipeline interface group """ + +import pytest +from looper.project_piface_group import ProjectPifaceGroup + +__author__ = "Vince Reuter" +__email__ = "vreuter@virginia.edu" + + +@pytest.mark.skip("not implemented") +def test_iface_grp_cmp(): + pass + + +@pytest.mark.skip("not implemented") +def test_iface_grp_getitem(): + pass + + +@pytest.mark.skip("not implemented") +def test_iface_grp_iter(): + pass + + +@pytest.mark.skip("not implemented") +def test_iface_grp_len(): + pass + + +@pytest.mark.skip("not implemented") +def test_iface_grp_update(): + pass diff --git a/tests/test_submission_scripts.py b/tests/test_submission_scripts.py index e5547d98c..3ff9d5810 100644 --- a/tests/test_submission_scripts.py +++ b/tests/test_submission_scripts.py @@ -2,7 +2,6 @@ from collections import OrderedDict import copy -from functools import partial import glob import itertools import os @@ -10,11 +9,13 @@ import pytest import yaml -from peppy import FLAGS +from divvy import DEFAULT_COMPUTE_RESOURCES_NAME as DEFAULT_RESOURCES_KEY +from peppy import FLAGS, METADATA_KEY, OUTDIR_KEY import looper from looper.const import * from looper.looper import Project -from looper.utils import fetch_sample_flags, sample_folder +from looper.pipeline_interface import PROTOMAP_KEY, RESOURCES_KEY +from looper.utils import fetch_sample_flags, sample_folder from peppy import ASSAY_KEY, SAMPLE_ANNOTATIONS_KEY, SAMPLE_NAME_COLNAME, \ SAMPLE_SUBANNOTATIONS_KEY @@ -26,24 +27,22 @@ ATAC_PIPE = "pepatac.py" PIPE_NAME_KEY = "name" PIPE_PATH_KEY = "path" -PIPE_RESOURCES_KEY = "resources" SAMPLE_METADATA_HEADER = [SAMPLE_NAME_COLNAME, ASSAY_KEY] ASSAYS = ["WGBS", "WGBS", "ATAC", "ATAC"] SAMPLE_METADATA_RECORDS = [("sample" + str(i), p) for i, p in enumerate(ASSAYS)] DEFAULT_RESOURCES = { "file_size": "0", "cores": "1", "mem": "4000", "time": "00-01:00:00"} -DEFAULT_RESOURCES_KEY = "default" ATAC_SPEC = { PIPE_NAME_KEY: "PEPATAC", PIPE_PATH_KEY: ATAC_PIPE, - PIPE_RESOURCES_KEY: {DEFAULT_RESOURCES_KEY: DEFAULT_RESOURCES} + RESOURCES_KEY: {DEFAULT_RESOURCES_KEY: DEFAULT_RESOURCES} } WGBS_SPEC = { PIPE_NAME_KEY: "WGBS", PIPE_PATH_KEY: WGBS_PIPE, - PIPE_RESOURCES_KEY: {DEFAULT_RESOURCES_KEY: DEFAULT_RESOURCES} + RESOURCES_KEY: {DEFAULT_RESOURCES_KEY: DEFAULT_RESOURCES} } PIPE_SPECS = {"pepatac.py": ATAC_SPEC, "wgbs.py": WGBS_SPEC} PLIFACE_DATA = { - "protocol_mapping": {"ATAC": ATAC_PIPE, "WGBS": WGBS_PIPE}, + PROTOMAP_KEY: {"ATAC": ATAC_PIPE, "WGBS": WGBS_PIPE}, "pipelines": PIPE_SPECS } @@ -100,10 +99,10 @@ def prj(request, tmpdir): yaml.dump(PLIFACE_DATA, f) _touch_pipe_files(outdir, PLIFACE_DATA) 
metadata = {SAMPLE_ANNOTATIONS_KEY: anns, - "output_dir": outdir, "pipeline_interfaces": pipe_iface_path} + OUTDIR_KEY: outdir, PIPELINE_INTERFACES_KEY: pipe_iface_path} if subanns: metadata[SAMPLE_SUBANNOTATIONS_KEY] = subanns - prjdat = {"metadata": metadata} + prjdat = {METADATA_KEY: metadata} with open(conf_path, 'w') as f: yaml.dump(prjdat, f) @@ -134,7 +133,6 @@ def validate_submission_count(project, conductors): "Expected {} submissions but tallied {}".format(num_exp, num_obs) - def validate_submission_scripts(project, _): """ Check bijection between a project's samples and its submission scripts. @@ -274,7 +272,7 @@ def test_ignoring_flags(prj, flag_name, flagged_sample_names, validate): assert len(flagged_sample_names) == len(preexisting) assert set(flag_files_made) == set(itertools.chain(*preexisting.values())) conductors, pipe_keys = process_protocols( - prj, set(PLIFACE_DATA["protocol_mapping"].keys()), ignore_flags=True) + prj, set(PLIFACE_DATA[PROTOMAP_KEY].keys()), ignore_flags=True) assert all(map(lambda c: c.ignore_flags, conductors.values())), \ "Failed to establish precondition, that flags are to be ignored" for s in prj.samples: @@ -302,18 +300,18 @@ def test_convergent_protocol_mapping_keys(tmpdir): with open(anns_path, 'w') as f: f.write(os.linesep.join(sep.join(r) for r in records)) - pliface_data = {"protocol_mapping": dict(protomap), "pipelines": PIPE_SPECS} + pliface_data = {PROTOMAP_KEY: dict(protomap), "pipelines": PIPE_SPECS} pliface_filepath = os.path.join(outdir, "pipes.yaml") with open(pliface_filepath, 'w') as f: yaml.dump(pliface_data, f) - metadata = {"output_dir": outdir, SAMPLE_ANNOTATIONS_KEY: anns_path, + metadata = {OUTDIR_KEY: outdir, SAMPLE_ANNOTATIONS_KEY: anns_path, "pipeline_interfaces": pliface_filepath} _touch_pipe_files(tmpdir.strpath, pliface_data) - prjdat = {"metadata": metadata} + prjdat = {METADATA_KEY: metadata} pcfg = tmpdir.join("prj.yaml").strpath with open(pcfg, 'w') as f: yaml.dump(prjdat, f) @@ -382,7 +380,7 @@ def _process_base_pliface(prj, **kwargs): protocol name to collection of keys for pipelines for that protocol """ return process_protocols( - prj, set(PLIFACE_DATA["protocol_mapping"].keys()), **kwargs) + prj, set(PLIFACE_DATA[PROTOMAP_KEY].keys()), **kwargs) def _mkflag(sample, prj, flag): diff --git a/tests/test_utils.py b/tests/test_utils.py index 97f826129..ffc26cb1f 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -6,19 +6,13 @@ import pytest from looper.utils import determine_config_path, DEFAULT_CONFIG_SUFFIX, \ DEFAULT_METADATA_FOLDER +from tests.helpers import randstr, LETTERS_AND_DIGITS + __author__ = "Vince Reuter" __email__ = "vreuter@virginia.edu" -LETTERS_AND_DIGITS = string.ascii_letters + string.digits - - -def randstr(pool, size): - """ Generate random string of given size/length. """ - return "".join(random.choice(pool) for _ in range(size)) - - class ConfigPathDeterminationTests: """ Tests for config path determination function """ diff --git a/tests/test_with_microtest_as_smoketest.py b/tests/test_with_microtest_as_smoketest.py new file mode 100644 index 000000000..742e13126 --- /dev/null +++ b/tests/test_with_microtest_as_smoketest.py @@ -0,0 +1,70 @@ +""" Use microtest for smoketesting the looper CLI. 
""" + +import os +import subprocess +import pytest +from ubiquerg import build_cli_extra + +__author__ = "Vince Reuter" +__email__ = "vreuter@virginia.edu" + + +REPO_NAME = "microtest" +REPO_URL = "https://github.com/databio/{}".format(REPO_NAME) +SAMPLE_SELECTOR_OPTION = "--selector-attribute" +INCLUSION_OPTION = "--selector-include" + + +@pytest.mark.remote_data +@pytest.fixture +def data_root(tmpdir): + """ Clone data repo and return path to it. """ + tmp = tmpdir.strpath + cmd = "git clone {}".format(REPO_URL) + try: + subprocess.check_call(cmd, cwd=tmp, shell=True) + except subprocess.CalledProcessError: + raise Exception("Failed to pull data ()".format(cmd)) + root = os.path.join(tmp, REPO_NAME) + assert os.path.isdir(root) + return root + + +@pytest.fixture +def data_conf_file(data_root): + """ Clone data repo and return path to project config file. """ + f = os.path.join(data_root, "config", "microtest_config.yaml") + assert os.path.isfile(f), "Contents: {}".format(os.listdir(data_root)) + return f + + +@pytest.fixture(scope="function") +def temp_chdir_home(tmpdir): + """ Temporarily (for a test case) change home and working directories. """ + key = "HOME" + prev_home = os.environ[key] + prev_work = os.environ["PWD"] + curr_home = tmpdir.strpath + os.environ[key] = curr_home + os.chdir(curr_home) + yield + os.environ[key] = prev_home + os.chdir(prev_work) + assert os.getcwd() == prev_work + assert os.getenv(key) == prev_home + assert os.environ[key] == prev_home + + +@pytest.mark.remote_data +@pytest.mark.usefixtures("temp_chdir_home") +@pytest.mark.parametrize("cli_extra", + [build_cli_extra(kvs) for kvs in + [{SAMPLE_SELECTOR_OPTION: "protocol", INCLUSION_OPTION: "ATAC-seq"}]]) +def test_cli_microtest_smoke(cli_extra, data_conf_file): + """ Using microtest as project, test CLI for failure on specific cases. """ + cmd = "looper run -d {} {}".format(data_conf_file, cli_extra) + try: + subprocess.check_call(cmd, shell=True) + except Exception as e: + print("Exception: {}".format(e)) + pytest.fail("Failed command: {}".format(cmd))