Skip to content

Commit

Permalink
Remove promised reqs, fix disk values (resolves #41, resolves #42) (#44)
Browse files Browse the repository at this point in the history
* Adjust disk and remove unnecessary Preqs (resolves #41, resolves #42)

* Update docstrings

FileStoreID is not just a str anymore (FileID)
Udpate main docstring
  • Loading branch information
jvivian authored Jan 12, 2017
1 parent 8fa52c3 commit 2ed072d
Showing 1 changed file with 38 additions and 34 deletions.
72 changes: 38 additions & 34 deletions src/toil_rnaseq/rnaseq_cgl_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import yaml
from bd2k.util.files import mkdir_p
from bd2k.util.processes import which
from toil.job import Job, PromisedRequirement
from toil.job import Job
from toil_lib import flatten
from toil_lib import require, UserError
from toil_lib.files import copy_files, tarball_files
Expand All @@ -38,7 +38,7 @@ def download_sample(job, sample, config):
Download sample and store unique attributes
:param JobFunctionWrappingJob job: passed automatically by Toil
:param list sample: Information pertaining to a sample: filetype, paired/unpaired, UUID, and URL
:param list(str, str, str, str) sample: Sample information: filetype, paired/unpaired, UUID, and URL
:param Namespace config: Argparse Namespace object containing argument inputs
"""
# Create copy of config that is sample specific
Expand Down Expand Up @@ -76,19 +76,19 @@ def preprocessing_declaration(job, config, tar_id=None, r1_id=None, r2_id=None):
:param JobFunctionWrappingJob job: passed automatically by Toil
:param Namespace config: Argparse Namespace object containing argument inputs
:param str tar_id: FileStoreID of sample tar (or None)
:param str r1_id: FileStoreID of sample read 1 (or None)
:param str r2_id: FileStoreID of sample read 2 (or None)
:param FileID tar_id: FileStoreID of sample tar (or None)
:param FileID r1_id: FileStoreID of sample read 1 (or None)
:param FileID r2_id: FileStoreID of sample read 2 (or None)
"""
if tar_id:
job.fileStore.logToMaster('Processing sample tar and queueing CutAdapt for: ' + config.uuid)
disk = PromisedRequirement(lambda x: 3 * x.size, tar_id)
disk = 5 * tar_id.size
preprocessing_output = job.addChildJobFn(process_sample, config, input_tar=tar_id, disk=disk).rv()
else:
if r2_id:
disk = PromisedRequirement(lambda x, y: 2 * (x.size + y.size), r1_id, r2_id)
disk = 3 * (r1_id.size + r2_id.size)
else:
disk = PromisedRequirement(lambda x: 2 * x.size, r1_id)
disk = 3 * r1_id.size
preprocessing_output = job.addChildJobFn(process_sample, config, input_r1=r1_id, input_r2=r2_id,
gz=config.gz, disk=disk).rv()
job.addFollowOnJobFn(pipeline_declaration, config, preprocessing_output)
Expand All @@ -100,14 +100,14 @@ def pipeline_declaration(job, config, preprocessing_output):
:param JobFunctionWrappingJob job: passed automatically by Toil
:param Namespace config: Argparse Namespace object containing argument inputs
:param tuple(str, str, bool) preprocessing_output: R1 FileStoreID, R2 FileStoreID, Improper Pairing (True/False)
:param tuple(FileID, FileID, bool) preprocessing_output: R1 FileStoreID, R2 FileStoreID, Improper Pairing Flag
"""
r1_id, r2_id = preprocessing_output
kallisto_output, rsem_star_output, fastqc_output = None, None, None
if r2_id:
disk = PromisedRequirement(lambda x, y: 2 * (x.size + y.size), r1_id, r2_id)
disk = 2 * (r1_id.size + r2_id.size)
else:
disk = PromisedRequirement(lambda x: 2 * x.size, r1_id)
disk = 2 * r1_id.size
if config.fastqc:
job.fileStore.logToMaster('Queueing FastQC job for: ')
fastqc_output = job.addChildJobFn(run_fastqc, r1_id, r2_id, cores=2, disk=disk).rv()
Expand All @@ -127,14 +127,14 @@ def star_alignment(job, config, r1_id, r2_id=None):
:param JobFunctionWrappingJob job: passed automatically by Toil
:param Namespace config: Argparse Namespace object containing argument inputs
:param str r1_id: FileStoreID of sample read 1
:param str r2_id: FileStoreID of sample read 2 (or None)
:param FileID r1_id: FileStoreID of sample read 1
:param FileId r2_id: FileStoreID of sample read 2 (or None)
:return: FileStoreID results from RSEM
:rtype: str
:rtype: FileID|tuple(FileID, FileID)
"""
job.fileStore.logToMaster('Queueing RSEM job for: ' + config.uuid)
mem = '2G' if config.ci_test else '40G'
disk = '2G' if config.ci_test else '100G'
disk = '2G' if config.ci_test else r1_id.size + r2_id.size + 80530636800 # 75 G for STAR index and tmp files
star = job.addChildJobFn(run_star, r1_id, r2_id, star_index_url=config.star_index,
wiggle=config.wiggle, cores=config.cores, memory=mem, disk=disk).rv()
rsem = job.addFollowOnJobFn(rsem_quantification, config, star, disk=disk).rv()
Expand All @@ -150,9 +150,9 @@ def bam_qc(job, config, star_output):
:param JobFunctionWrappingJob job: passed automatically by Toil
:param Namespace config: Argparse Namespace object containing argument inputs
:param tuple(str, str, str, str)|tuple(str, str, str) star_output:
:param tuple(FileID, FileID, FileID, FileID)|tuple(FileID, FileID, FileID) star_output: FileStoreIDs from STAR
:return: FileStoreID results from bam_qc
:rtype: str
:rtype: FileID
"""
cores = min(4, config.cores)
if config.wiggle:
Expand All @@ -169,9 +169,9 @@ def rsem_quantification(job, config, star_output):
:param JobFunctionWrappingJob job: passed automatically by Toil
:param Namespace config: Argparse Namespace object containing argument inputs
:param tuple(str, str) star_output: FileStoreIDs from STARs output
:return: FileStoreID results from RSEM postprocess
:rtype: str
:param tuple(FileID, FileID, FileID, FileID)|tuple(FileID, FileID, FileID) star_output: FileStoreIDs from STAR
:return: FileStoreID results from RSEM postprocess and STAR log
:rtype: tuple(FileID, FileID, FileID)
"""
work_dir = job.fileStore.getLocalTempDir()
cores = min(16, config.cores)
Expand All @@ -194,7 +194,7 @@ def rsem_quantification(job, config, star_output):
elif urlparse(config.output_dir).scheme != 's3':
copy_files(file_paths=[bam_path], output_dir=config.output_dir)
# Declare RSEM and RSEM post-process jobs
disk = PromisedRequirement(lambda x: 2 * x.size, transcriptome_id)
disk = 5 * transcriptome_id.size
rsem_output = job.wrapJobFn(run_rsem, transcriptome_id, config.rsem_ref, paired=config.paired,
cores=cores, disk=disk)
rsem_postprocess = job.wrapJobFn(run_rsem_postprocess, config.uuid, rsem_output.rv(0), rsem_output.rv(1))
Expand All @@ -214,12 +214,12 @@ def process_sample(job, config, input_tar=None, input_r1=None, input_r2=None, gz
:param JobFunctionWrappingJob job: passed automatically by Toil
:param Namespace config: Argparse Namespace object containing argument inputs
:param str input_tar: fileStoreID of the tarball (if applicable)
:param str input_r1: fileStoreID of r1 fastq (if applicable)
:param str input_r2: fileStoreID of r2 fastq (if applicable)
:param FileID input_tar: fileStoreID of the tarball (if applicable)
:param FileID input_r1: fileStoreID of r1 fastq (if applicable)
:param FileID input_r2: fileStoreID of r2 fastq (if applicable)
:param bool gz: If True, unzips the r1/r2 files
:return: FileStoreID from Cutadapt or from fastqs directly if pipeline was run without Cutadapt option
:rtype: str
:rtype: tuple(FileID, FileID)
"""
job.fileStore.logToMaster('Processing sample: {}'.format(config.uuid))
work_dir = job.fileStore.getLocalTempDir()
Expand Down Expand Up @@ -271,7 +271,7 @@ def process_sample(job, config, input_tar=None, input_r1=None, input_r2=None, gz
p2.wait()
processed_r1 = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'R1.fastq'))
processed_r2 = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'R2.fastq'))
disk = PromisedRequirement(lambda y, z: 2 * (y.size + z.size), processed_r1, processed_r2)
disk = 2 * (processed_r1.size + processed_r2.size)
else:
command = 'zcat' if fastqs[0].endswith('.gz') else 'cat'
if command == 'cat' and input_r1:
Expand All @@ -280,7 +280,7 @@ def process_sample(job, config, input_tar=None, input_r1=None, input_r2=None, gz
with open(os.path.join(work_dir, 'R1.fastq'), 'w') as f:
subprocess.check_call([command] + fastqs, stdout=f)
processed_r1 = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'R1.fastq'))
disk = PromisedRequirement(lambda y: 2 * y.size, processed_r1)
disk = 2 * processed_r1.size
# Start cutadapt step
if config.cutadapt:
return job.addChildJobFn(run_cutadapt, processed_r1, processed_r2, config.fwd_3pr_adapter,
Expand All @@ -295,9 +295,10 @@ def consolidate_output(job, config, kallisto_output, rsem_star_output, fastqc_ou
:param JobFunctionWrappingJob job: passed automatically by Toil
:param Namespace config: Argparse Namespace object containing argument inputs
:param str kallisto_output: FileStoreID for Kallisto output
:param tuple(str, str) rsem_star_output: FileStoreIDs for RSEM and STAR output
:param str fastqc_output: FileStoreID for FastQC output
:param FileID kallisto_output: FileStoreID for Kallisto output
:param tuple(FileID, FileID, FileID)|tuple(FileID, FileID, FileID, bool, FileID) rsem_star_output:
FileStoreIDs for RSEM and STAR output, and a flag/FileID if run with bamQC
:param FileID fastqc_output: FileStoreID for FastQC output
"""
job.fileStore.logToMaster('Consolidating output: {}'.format(config.uuid))
work_dir = job.fileStore.getLocalTempDir()
Expand Down Expand Up @@ -356,7 +357,7 @@ def parse_samples(path_to_manifest=None, sample_urls=None):
Parses samples, specified in either a manifest or listed with --samples
:param str path_to_manifest: Path to configuration file
:param list sample_urls: Sample URLs
:param list[str] sample_urls: Sample URLs
:return: Samples and their attributes as defined in the manifest
:rtype: list[list]
"""
Expand Down Expand Up @@ -501,9 +502,11 @@ def main():
Structure of RNA-Seq Pipeline (per sample)
8
|
3 -- 4 -- 5
/ |
0 -- 1 -- 2 ---- 7 -- 8
0 -- 1 -- 2 ---- 7 -- 9
| |
6 -----------
Expand All @@ -515,13 +518,14 @@ def main():
5 = RSEM Post-processing
6 = FastQC
7 = Kallisto
8 = Consoliate output and upload to S3
8 = BamQC (as specified by CKCC at UC Santa Cruz)
9 = Consoliate output and upload to S3
=======================================
Dependencies
Curl: apt-get install curl
Docker: wget -qO- https://get.docker.com/ | sh
Toil: pip install toil
Boto: pip install boto (OPTIONAL)
Boto: pip install boto (OPTIONAL, needed for upload to S3)
"""
parser = argparse.ArgumentParser(description=main.__doc__, formatter_class=argparse.RawTextHelpFormatter)
subparsers = parser.add_subparsers(dest='command')
Expand Down

0 comments on commit 2ed072d

Please sign in to comment.