From 2ed072d1f165a88c3b1f28cad188613cf4357543 Mon Sep 17 00:00:00 2001 From: John Vivian Date: Thu, 12 Jan 2017 11:52:37 -0800 Subject: [PATCH] Remove promised reqs, fix disk values (resolves #41, resolves #42) (#44) * Adjust disk and remove unnecessary Preqs (resolves #41, resolves #42) * Update docstrings FileStoreID is not just a str anymore (FileID) Udpate main docstring --- src/toil_rnaseq/rnaseq_cgl_pipeline.py | 72 ++++++++++++++------------ 1 file changed, 38 insertions(+), 34 deletions(-) diff --git a/src/toil_rnaseq/rnaseq_cgl_pipeline.py b/src/toil_rnaseq/rnaseq_cgl_pipeline.py index 4b440df..884e873 100644 --- a/src/toil_rnaseq/rnaseq_cgl_pipeline.py +++ b/src/toil_rnaseq/rnaseq_cgl_pipeline.py @@ -16,7 +16,7 @@ import yaml from bd2k.util.files import mkdir_p from bd2k.util.processes import which -from toil.job import Job, PromisedRequirement +from toil.job import Job from toil_lib import flatten from toil_lib import require, UserError from toil_lib.files import copy_files, tarball_files @@ -38,7 +38,7 @@ def download_sample(job, sample, config): Download sample and store unique attributes :param JobFunctionWrappingJob job: passed automatically by Toil - :param list sample: Information pertaining to a sample: filetype, paired/unpaired, UUID, and URL + :param list(str, str, str, str) sample: Sample information: filetype, paired/unpaired, UUID, and URL :param Namespace config: Argparse Namespace object containing argument inputs """ # Create copy of config that is sample specific @@ -76,19 +76,19 @@ def preprocessing_declaration(job, config, tar_id=None, r1_id=None, r2_id=None): :param JobFunctionWrappingJob job: passed automatically by Toil :param Namespace config: Argparse Namespace object containing argument inputs - :param str tar_id: FileStoreID of sample tar (or None) - :param str r1_id: FileStoreID of sample read 1 (or None) - :param str r2_id: FileStoreID of sample read 2 (or None) + :param FileID tar_id: FileStoreID of sample tar (or None) + :param FileID r1_id: FileStoreID of sample read 1 (or None) + :param FileID r2_id: FileStoreID of sample read 2 (or None) """ if tar_id: job.fileStore.logToMaster('Processing sample tar and queueing CutAdapt for: ' + config.uuid) - disk = PromisedRequirement(lambda x: 3 * x.size, tar_id) + disk = 5 * tar_id.size preprocessing_output = job.addChildJobFn(process_sample, config, input_tar=tar_id, disk=disk).rv() else: if r2_id: - disk = PromisedRequirement(lambda x, y: 2 * (x.size + y.size), r1_id, r2_id) + disk = 3 * (r1_id.size + r2_id.size) else: - disk = PromisedRequirement(lambda x: 2 * x.size, r1_id) + disk = 3 * r1_id.size preprocessing_output = job.addChildJobFn(process_sample, config, input_r1=r1_id, input_r2=r2_id, gz=config.gz, disk=disk).rv() job.addFollowOnJobFn(pipeline_declaration, config, preprocessing_output) @@ -100,14 +100,14 @@ def pipeline_declaration(job, config, preprocessing_output): :param JobFunctionWrappingJob job: passed automatically by Toil :param Namespace config: Argparse Namespace object containing argument inputs - :param tuple(str, str, bool) preprocessing_output: R1 FileStoreID, R2 FileStoreID, Improper Pairing (True/False) + :param tuple(FileID, FileID, bool) preprocessing_output: R1 FileStoreID, R2 FileStoreID, Improper Pairing Flag """ r1_id, r2_id = preprocessing_output kallisto_output, rsem_star_output, fastqc_output = None, None, None if r2_id: - disk = PromisedRequirement(lambda x, y: 2 * (x.size + y.size), r1_id, r2_id) + disk = 2 * (r1_id.size + r2_id.size) else: - disk = PromisedRequirement(lambda x: 2 * x.size, r1_id) + disk = 2 * r1_id.size if config.fastqc: job.fileStore.logToMaster('Queueing FastQC job for: ') fastqc_output = job.addChildJobFn(run_fastqc, r1_id, r2_id, cores=2, disk=disk).rv() @@ -127,14 +127,14 @@ def star_alignment(job, config, r1_id, r2_id=None): :param JobFunctionWrappingJob job: passed automatically by Toil :param Namespace config: Argparse Namespace object containing argument inputs - :param str r1_id: FileStoreID of sample read 1 - :param str r2_id: FileStoreID of sample read 2 (or None) + :param FileID r1_id: FileStoreID of sample read 1 + :param FileId r2_id: FileStoreID of sample read 2 (or None) :return: FileStoreID results from RSEM - :rtype: str + :rtype: FileID|tuple(FileID, FileID) """ job.fileStore.logToMaster('Queueing RSEM job for: ' + config.uuid) mem = '2G' if config.ci_test else '40G' - disk = '2G' if config.ci_test else '100G' + disk = '2G' if config.ci_test else r1_id.size + r2_id.size + 80530636800 # 75 G for STAR index and tmp files star = job.addChildJobFn(run_star, r1_id, r2_id, star_index_url=config.star_index, wiggle=config.wiggle, cores=config.cores, memory=mem, disk=disk).rv() rsem = job.addFollowOnJobFn(rsem_quantification, config, star, disk=disk).rv() @@ -150,9 +150,9 @@ def bam_qc(job, config, star_output): :param JobFunctionWrappingJob job: passed automatically by Toil :param Namespace config: Argparse Namespace object containing argument inputs - :param tuple(str, str, str, str)|tuple(str, str, str) star_output: + :param tuple(FileID, FileID, FileID, FileID)|tuple(FileID, FileID, FileID) star_output: FileStoreIDs from STAR :return: FileStoreID results from bam_qc - :rtype: str + :rtype: FileID """ cores = min(4, config.cores) if config.wiggle: @@ -169,9 +169,9 @@ def rsem_quantification(job, config, star_output): :param JobFunctionWrappingJob job: passed automatically by Toil :param Namespace config: Argparse Namespace object containing argument inputs - :param tuple(str, str) star_output: FileStoreIDs from STARs output - :return: FileStoreID results from RSEM postprocess - :rtype: str + :param tuple(FileID, FileID, FileID, FileID)|tuple(FileID, FileID, FileID) star_output: FileStoreIDs from STAR + :return: FileStoreID results from RSEM postprocess and STAR log + :rtype: tuple(FileID, FileID, FileID) """ work_dir = job.fileStore.getLocalTempDir() cores = min(16, config.cores) @@ -194,7 +194,7 @@ def rsem_quantification(job, config, star_output): elif urlparse(config.output_dir).scheme != 's3': copy_files(file_paths=[bam_path], output_dir=config.output_dir) # Declare RSEM and RSEM post-process jobs - disk = PromisedRequirement(lambda x: 2 * x.size, transcriptome_id) + disk = 5 * transcriptome_id.size rsem_output = job.wrapJobFn(run_rsem, transcriptome_id, config.rsem_ref, paired=config.paired, cores=cores, disk=disk) rsem_postprocess = job.wrapJobFn(run_rsem_postprocess, config.uuid, rsem_output.rv(0), rsem_output.rv(1)) @@ -214,12 +214,12 @@ def process_sample(job, config, input_tar=None, input_r1=None, input_r2=None, gz :param JobFunctionWrappingJob job: passed automatically by Toil :param Namespace config: Argparse Namespace object containing argument inputs - :param str input_tar: fileStoreID of the tarball (if applicable) - :param str input_r1: fileStoreID of r1 fastq (if applicable) - :param str input_r2: fileStoreID of r2 fastq (if applicable) + :param FileID input_tar: fileStoreID of the tarball (if applicable) + :param FileID input_r1: fileStoreID of r1 fastq (if applicable) + :param FileID input_r2: fileStoreID of r2 fastq (if applicable) :param bool gz: If True, unzips the r1/r2 files :return: FileStoreID from Cutadapt or from fastqs directly if pipeline was run without Cutadapt option - :rtype: str + :rtype: tuple(FileID, FileID) """ job.fileStore.logToMaster('Processing sample: {}'.format(config.uuid)) work_dir = job.fileStore.getLocalTempDir() @@ -271,7 +271,7 @@ def process_sample(job, config, input_tar=None, input_r1=None, input_r2=None, gz p2.wait() processed_r1 = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'R1.fastq')) processed_r2 = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'R2.fastq')) - disk = PromisedRequirement(lambda y, z: 2 * (y.size + z.size), processed_r1, processed_r2) + disk = 2 * (processed_r1.size + processed_r2.size) else: command = 'zcat' if fastqs[0].endswith('.gz') else 'cat' if command == 'cat' and input_r1: @@ -280,7 +280,7 @@ def process_sample(job, config, input_tar=None, input_r1=None, input_r2=None, gz with open(os.path.join(work_dir, 'R1.fastq'), 'w') as f: subprocess.check_call([command] + fastqs, stdout=f) processed_r1 = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'R1.fastq')) - disk = PromisedRequirement(lambda y: 2 * y.size, processed_r1) + disk = 2 * processed_r1.size # Start cutadapt step if config.cutadapt: return job.addChildJobFn(run_cutadapt, processed_r1, processed_r2, config.fwd_3pr_adapter, @@ -295,9 +295,10 @@ def consolidate_output(job, config, kallisto_output, rsem_star_output, fastqc_ou :param JobFunctionWrappingJob job: passed automatically by Toil :param Namespace config: Argparse Namespace object containing argument inputs - :param str kallisto_output: FileStoreID for Kallisto output - :param tuple(str, str) rsem_star_output: FileStoreIDs for RSEM and STAR output - :param str fastqc_output: FileStoreID for FastQC output + :param FileID kallisto_output: FileStoreID for Kallisto output + :param tuple(FileID, FileID, FileID)|tuple(FileID, FileID, FileID, bool, FileID) rsem_star_output: + FileStoreIDs for RSEM and STAR output, and a flag/FileID if run with bamQC + :param FileID fastqc_output: FileStoreID for FastQC output """ job.fileStore.logToMaster('Consolidating output: {}'.format(config.uuid)) work_dir = job.fileStore.getLocalTempDir() @@ -356,7 +357,7 @@ def parse_samples(path_to_manifest=None, sample_urls=None): Parses samples, specified in either a manifest or listed with --samples :param str path_to_manifest: Path to configuration file - :param list sample_urls: Sample URLs + :param list[str] sample_urls: Sample URLs :return: Samples and their attributes as defined in the manifest :rtype: list[list] """ @@ -501,9 +502,11 @@ def main(): Structure of RNA-Seq Pipeline (per sample) + 8 + | 3 -- 4 -- 5 / | - 0 -- 1 -- 2 ---- 7 -- 8 + 0 -- 1 -- 2 ---- 7 -- 9 | | 6 ----------- @@ -515,13 +518,14 @@ def main(): 5 = RSEM Post-processing 6 = FastQC 7 = Kallisto - 8 = Consoliate output and upload to S3 + 8 = BamQC (as specified by CKCC at UC Santa Cruz) + 9 = Consoliate output and upload to S3 ======================================= Dependencies Curl: apt-get install curl Docker: wget -qO- https://get.docker.com/ | sh Toil: pip install toil - Boto: pip install boto (OPTIONAL) + Boto: pip install boto (OPTIONAL, needed for upload to S3) """ parser = argparse.ArgumentParser(description=main.__doc__, formatter_class=argparse.RawTextHelpFormatter) subparsers = parser.add_subparsers(dest='command')