From 0e3edeec9bb5ed4f0cda6da4333772eff75cecc6 Mon Sep 17 00:00:00 2001
From: John Vivian
Date: Wed, 10 May 2017 14:56:00 -0700
Subject: [PATCH] Use samtools to sort (resolves #84)

Update version dependencies and use Toil's native dockerCall
---
 src/toil_rnaseq/qc.py                  |  6 +--
 src/toil_rnaseq/rnaseq_cgl_pipeline.py | 58 +++++++++++++++++++-------
 version.py                             |  4 +-
 3 files changed, 47 insertions(+), 21 deletions(-)

diff --git a/src/toil_rnaseq/qc.py b/src/toil_rnaseq/qc.py
index de57858..6827c0e 100644
--- a/src/toil_rnaseq/qc.py
+++ b/src/toil_rnaseq/qc.py
@@ -1,8 +1,8 @@
 import os
 from urlparse import urlparse
 
+from toil.lib.docker import dockerCall
 from toil_lib.files import tarball_files, copy_files
-from toil_lib.programs import docker_call
 from toil_lib.urls import s3am_upload
 
 
@@ -11,7 +11,7 @@ def run_bam_qc(job, aligned_bam_id, config):
     Run BAM QC as specified by California Kids Cancer Comparison (CKCC)
 
     :param JobFunctionWrappingJob job:
-    :param str aligned_bam_id: FileStoreID of sorted bam from STAR
+    :param str aligned_bam_id: FileStoreID of aligned bam from STAR
     :param Namespace config: Argparse Namespace object containing argument inputs
         Must contain:
            config.uuid str: UUID of input sample
@@ -23,7 +23,7 @@ def run_bam_qc(job, aligned_bam_id, config):
     """
     work_dir = job.fileStore.getLocalTempDir()
     job.fileStore.readGlobalFile(aligned_bam_id, os.path.join(work_dir, 'rnaAligned.sortedByCoord.out.bam'))
-    docker_call(tool='hbeale/treehouse_bam_qc:1.0', work_dir=work_dir, parameters=['runQC.sh', str(job.cores)])
+    dockerCall(job, tool='hbeale/treehouse_bam_qc:1.0', workDir=work_dir, parameters=['runQC.sh', str(job.cores)])
     # Tar Output files
     output_names = ['readDist.txt', 'rnaAligned.out.md.sorted.geneBodyCoverage.curves.pdf',
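Side note on the docker_call -> dockerCall migration above: Toil's native helper takes the job as its first positional argument and uses workDir (camelCase) where toil_lib's docker_call used work_dir; tool and parameters carry over unchanged. A minimal sketch of the new call pattern, using only names visible in the hunk above (assumes toil>=3.7.0, per the version.py pin at the end of this patch):

    import os

    from toil.lib.docker import dockerCall

    def run_bam_qc(job, aligned_bam_id, config):
        # Scratch directory; dockerCall mounts it at /data inside the container
        # (the /data paths in sort_and_save_bam below rely on the same mount)
        work_dir = job.fileStore.getLocalTempDir()
        job.fileStore.readGlobalFile(aligned_bam_id,
                                     os.path.join(work_dir, 'rnaAligned.sortedByCoord.out.bam'))
        # The Toil job handle comes first; workDir replaces work_dir
        dockerCall(job, tool='hbeale/treehouse_bam_qc:1.0',
                   workDir=work_dir, parameters=['runQC.sh', str(job.cores)])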
diff --git a/src/toil_rnaseq/rnaseq_cgl_pipeline.py b/src/toil_rnaseq/rnaseq_cgl_pipeline.py
index fc540c7..16abcd5 100644
--- a/src/toil_rnaseq/rnaseq_cgl_pipeline.py
+++ b/src/toil_rnaseq/rnaseq_cgl_pipeline.py
@@ -18,6 +18,7 @@
 from bd2k.util.processes import which
 from toil.common import Toil
 from toil.job import Job
+from toil.lib.docker import dockerCall
 from toil_lib import flatten
 from toil_lib import require, UserError
 from toil_lib.files import copy_files, tarball_files
@@ -132,7 +133,7 @@ def star_alignment(job, config, r1_id, r2_id=None):
     disk = r1_id.size + r2_id.size + 80530636800 if r2_id else r1_id.size + 80530636800  # 75G for STAR index / tmp
     # Define job functions for STAR and RSEM
     star = job.addChildJobFn(run_star, r1_id, r2_id, star_index_url=config.star_index,
-                             wiggle=config.wiggle, cores=config.cores, memory=mem, disk=disk).rv()
+                             wiggle=config.wiggle, sort=False, cores=config.cores, memory=mem, disk=disk).rv()
     rsem = job.addFollowOnJobFn(rsem_quantification, config, star, disk=disk).rv()
     if config.bamqc:
         return rsem, job.addFollowOnJobFn(bam_qc, config, star, disk=disk).rv()
@@ -152,11 +153,11 @@ def bam_qc(job, config, star_output):
     """
     cores = min(4, config.cores)
     if config.wiggle:
-        transcriptome_id, sorted_id, wiggle_id, log_id, sj_id = star_output
+        transcriptome_id, aligned_id, wiggle_id, log_id, sj_id = star_output
     else:
-        transcriptome_id, sorted_id, log_id, sj_id = star_output
-    disk = 5 * sorted_id.size
-    return job.addChildJobFn(run_bam_qc, sorted_id, config, cores=cores, disk=disk).rv()
+        transcriptome_id, aligned_id, log_id, sj_id = star_output
+    disk = 5 * aligned_id.size
+    return job.addChildJobFn(run_bam_qc, aligned_id, config, cores=cores, disk=disk).rv()
 
 
 def rsem_quantification(job, config, star_output):
@@ -172,7 +173,7 @@ def rsem_quantification(job, config, star_output):
     work_dir = job.fileStore.getLocalTempDir()
     cores = min(16, config.cores)
     if config.wiggle:
-        transcriptome_id, sorted_id, wiggle_id, log_id, sj_id = flatten(star_output)
+        transcriptome_id, aligned_id, wiggle_id, log_id, sj_id = flatten(star_output)
         wiggle_path = os.path.join(work_dir, config.uuid + '.wiggle.bg')
         job.fileStore.readGlobalFile(wiggle_id, wiggle_path)
         if urlparse(config.output_dir).scheme == 's3':
@@ -181,15 +182,11 @@ def rsem_quantification(job, config, star_output):
             copy_files(file_paths=[wiggle_path], output_dir=config.output_dir)
         job.fileStore.deleteGlobalFile(wiggle_id)
     else:
-        transcriptome_id, sorted_id, log_id, sj_id = flatten(star_output)
+        transcriptome_id, aligned_id, log_id, sj_id = flatten(star_output)
     # Save sorted bam if flag is selected
     if config.save_bam and not config.bamqc:  # if config.bamqc is selected, bam is being saved in run_bam_qc
-        bam_path = os.path.join(work_dir, config.uuid + '.sorted.bam')
-        job.fileStore.readGlobalFile(sorted_id, bam_path)
-        if urlparse(config.output_dir).scheme == 's3' and config.ssec:
-            s3am_upload(fpath=bam_path, s3_dir=config.output_dir, s3_key_path=config.ssec)
-        elif urlparse(config.output_dir).scheme != 's3':
-            copy_files(file_paths=[bam_path], output_dir=config.output_dir)
+        disk = 3 * aligned_id.size
+        job.addChildJobFn(sort_and_save_bam, config, aligned_id, cores=config.cores, disk=disk)
     # Declare RSEM and RSEM post-process jobs
     disk = 5 * transcriptome_id.size
     rsem_output = job.wrapJobFn(run_rsem, transcriptome_id, config.rsem_ref, paired=config.paired,
@@ -208,7 +205,7 @@ def rsem_quantification(job, config, star_output):
     # Delete intermediates
     ids_to_delete = [transcriptome_id, log_id, sj_id]
     if not config.bamqc:  # If BamQC isn't being run, the sorted bam is no longer needed
-        ids_to_delete.append(sorted_id)
+        ids_to_delete.append(aligned_id)
     job.addFollowOnJobFn(cleanup_ids, ids_to_delete)
     return rsem_postprocess.rv(), star_id
 
@@ -380,6 +377,35 @@ def cleanup_ids(job, ids_to_delete):
     [job.fileStore.deleteGlobalFile(x) for x in ids_to_delete if x is not None]
 
 
+def sort_and_save_bam(job, config, bam_id):
+    """
+    Sorts STAR's output bam using samtools; STAR's own coordinate sort has a sporadic memory leak (#84).
+
+    :param JobFunctionWrappingJob job: passed automatically by Toil
+    :param Namespace config: Argparse Namespace object containing argument inputs
+    :param FileID bam_id: FileID for STAR's genome-aligned bam
+    """
+    work_dir = job.fileStore.getLocalTempDir()
+    job.fileStore.readGlobalFile(bam_id, os.path.join(work_dir, 'aligned.bam'))
+
+    parameters = ['sort',
+                  '-o', '/data/{}.sorted.bam'.format(config.uuid),
+                  '-O', 'bam',
+                  '-T', 'temp',
+                  '-@', str(job.cores),
+                  '/data/aligned.bam']
+
+    dockerCall(job, tool='quay.io/ucsc_cgl/samtools:1.3--256539928ea162949d8a65ca5c79a72ef557ce7c',
+               parameters=parameters, workDir=work_dir)
+
+    bam_path = os.path.join(work_dir, '{}.sorted.bam'.format(config.uuid))
+
+    if urlparse(config.output_dir).scheme == 's3' and config.ssec:
+        s3am_upload(fpath=bam_path, s3_dir=config.output_dir, s3_key_path=config.ssec)
+    elif urlparse(config.output_dir).scheme != 's3':
+        copy_files(file_paths=[bam_path], output_dir=config.output_dir)
+
+
 # Pipeline specific functions
 def parse_samples(path_to_manifest=None, sample_urls=None):
     """
@@ -622,8 +648,8 @@ def main():
 
     # Output dir checks and handling
     require(config.output_dir, 'No output location specified: {}'.format(config.output_dir))
-    for input in [x for x in [config.kallisto_index, config.star_index, config.rsem_ref] if x]:
-        require(urlparse(input).scheme in schemes,
+    for file_input in [x for x in [config.kallisto_index, config.star_index, config.rsem_ref] if x]:
+        require(urlparse(file_input).scheme in schemes,
                 'Input in config must have the appropriate URL prefix: {}'.format(schemes))
 
     if not config.output_dir.startswith('/'):
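For reviewers unpacking the samtools flags in sort_and_save_bam: with workDir mounted at /data, the dockerCall boils down to a single samtools sort inside the pinned 1.3 image. A rough standalone sketch of the same command (assumes a local samtools 1.3 on PATH instead of the Docker image; uuid and cores are hypothetical stand-ins):

    import subprocess

    uuid, cores = 'sample-uuid', 4  # hypothetical values for illustration
    subprocess.check_call([
        'samtools', 'sort',
        '-o', '{}.sorted.bam'.format(uuid),  # coordinate-sorted output bam
        '-O', 'bam',                         # explicit output format
        '-T', 'temp',                        # prefix for temporary sort chunks
        '-@', str(cores),                    # additional sorting threads
        'aligned.bam',                       # STAR's unsorted output, as staged above
    ])

The disk = 3 * aligned_id.size request in rsem_quantification presumably budgets for the input bam, samtools' temporary sort chunks, and the final sorted output together.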
diff --git a/version.py b/version.py
index 30baba7..bba9682 100644
--- a/version.py
+++ b/version.py
@@ -14,6 +14,6 @@
 version = '3.3.0a1'
 
-required_versions = {'toil': '>=3.3.5',
-                     'toil-lib': '==1.1.6',
+required_versions = {'toil': '>=3.7.0',
+                     'toil-lib': '==1.1.8',
                      'pyyaml': '>=3.11'}
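The toil floor moving from 3.3.5 to 3.7.0 lines up with the switch to toil.lib.docker.dockerCall above. A quick way to sanity-check an existing environment against the new pins, sketched with pkg_resources (ships with setuptools; required_versions copied from the hunk above):

    import pkg_resources

    required_versions = {'toil': '>=3.7.0',
                         'toil-lib': '==1.1.8',
                         'pyyaml': '>=3.11'}

    for name, spec in required_versions.items():
        # Raises DistributionNotFound or VersionConflict if the pin isn't met
        pkg_resources.require(name + spec)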