Use samtools to sort (resolves #84)
Updated version dependencies and switched to Toil's native dockerCall
jvivian committed May 10, 2017
1 parent c1682cd commit 0e3edee
Showing 3 changed files with 47 additions and 21 deletions.
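
For orientation before the diff: STAR is now asked for an unsorted BAM (sort=False) and coordinate sorting moves into a new samtools-based child job, sidestepping the sporadic memory leak in STAR's own sort noted in the new sort_and_save_bam docstring. The sketch below only illustrates what such a sort flag typically toggles on the STAR command line; the flag name and the real invocation live in toil_lib's run_star and are not part of this diff.

def star_output_mode(sort):
    # Hypothetical illustration only: maps a sort flag onto STAR's output modes.
    if sort:
        # STAR sorts in-process -- the step this commit stops relying on.
        return ['--outSAMtype', 'BAM', 'SortedByCoordinate']
    # Emit an unsorted BAM and leave coordinate sorting to a samtools job.
    return ['--outSAMtype', 'BAM', 'Unsorted']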
6 changes: 3 additions & 3 deletions src/toil_rnaseq/qc.py
@@ -1,8 +1,8 @@
import os
from urlparse import urlparse

from toil.lib.docker import dockerCall
from toil_lib.files import tarball_files, copy_files
from toil_lib.programs import docker_call
from toil_lib.urls import s3am_upload


@@ -11,7 +11,7 @@ def run_bam_qc(job, aligned_bam_id, config):
Run BAM QC as specified by California Kids Cancer Comparison (CKCC)
:param JobFunctionWrappingJob job:
:param str aligned_bam_id: FileStoreID of sorted bam from STAR
:param str aligned_bam_id: FileStoreID of aligned bam from STAR
:param Namespace config: Argparse Namespace object containing argument inputs
Must contain:
config.uuid str: UUID of input sample
@@ -23,7 +23,7 @@ def run_bam_qc(job, aligned_bam_id, config):
"""
work_dir = job.fileStore.getLocalTempDir()
job.fileStore.readGlobalFile(aligned_bam_id, os.path.join(work_dir, 'rnaAligned.sortedByCoord.out.bam'))
docker_call(tool='hbeale/treehouse_bam_qc:1.0', work_dir=work_dir, parameters=['runQC.sh', str(job.cores)])
dockerCall(job, tool='hbeale/treehouse_bam_qc:1.0', workDir=work_dir, parameters=['runQC.sh', str(job.cores)])

# Tar Output files
output_names = ['readDist.txt', 'rnaAligned.out.md.sorted.geneBodyCoverage.curves.pdf',
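
The qc.py hunks above show the dockerCall migration mentioned in the commit message: toil_lib's docker_call helper is dropped in favor of Toil's native toil.lib.docker.dockerCall, which takes the job as its first argument and spells the working-directory keyword workDir. A minimal before/after sketch, assuming it runs inside a Toil job function where job and work_dir are already in scope (tool and parameters copied from the hunk above):

# Before: helper from toil_lib
from toil_lib.programs import docker_call
docker_call(tool='hbeale/treehouse_bam_qc:1.0', work_dir=work_dir,
            parameters=['runQC.sh', str(job.cores)])

# After: Toil's native helper (the version.py change below raises the toil floor accordingly)
from toil.lib.docker import dockerCall
dockerCall(job, tool='hbeale/treehouse_bam_qc:1.0', workDir=work_dir,
           parameters=['runQC.sh', str(job.cores)])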
58 changes: 42 additions & 16 deletions src/toil_rnaseq/rnaseq_cgl_pipeline.py
@@ -18,6 +18,7 @@
from bd2k.util.processes import which
from toil.common import Toil
from toil.job import Job
from toil.lib.docker import dockerCall
from toil_lib import flatten
from toil_lib import require, UserError
from toil_lib.files import copy_files, tarball_files
@@ -132,7 +133,7 @@ def star_alignment(job, config, r1_id, r2_id=None):
disk = r1_id.size + r2_id.size + 80530636800 if r2_id else r1_id.size + 80530636800 # 75G for STAR index / tmp
# Define job functions for STAR and RSEM
star = job.addChildJobFn(run_star, r1_id, r2_id, star_index_url=config.star_index,
wiggle=config.wiggle, cores=config.cores, memory=mem, disk=disk).rv()
wiggle=config.wiggle, sort=False, cores=config.cores, memory=mem, disk=disk).rv()
rsem = job.addFollowOnJobFn(rsem_quantification, config, star, disk=disk).rv()
if config.bamqc:
return rsem, job.addFollowOnJobFn(bam_qc, config, star, disk=disk).rv()
@@ -152,11 +153,11 @@ def bam_qc(job, config, star_output):
"""
cores = min(4, config.cores)
if config.wiggle:
transcriptome_id, sorted_id, wiggle_id, log_id, sj_id = star_output
transcriptome_id, aligned_id, wiggle_id, log_id, sj_id = star_output
else:
transcriptome_id, sorted_id, log_id, sj_id = star_output
disk = 5 * sorted_id.size
return job.addChildJobFn(run_bam_qc, sorted_id, config, cores=cores, disk=disk).rv()
transcriptome_id, aligned_id, log_id, sj_id = star_output
disk = 5 * aligned_id.size
return job.addChildJobFn(run_bam_qc, aligned_id, config, cores=cores, disk=disk).rv()


def rsem_quantification(job, config, star_output):
@@ -172,7 +173,7 @@ def rsem_quantification(job, config, star_output):
work_dir = job.fileStore.getLocalTempDir()
cores = min(16, config.cores)
if config.wiggle:
transcriptome_id, sorted_id, wiggle_id, log_id, sj_id = flatten(star_output)
transcriptome_id, aligned_id, wiggle_id, log_id, sj_id = flatten(star_output)
wiggle_path = os.path.join(work_dir, config.uuid + '.wiggle.bg')
job.fileStore.readGlobalFile(wiggle_id, wiggle_path)
if urlparse(config.output_dir).scheme == 's3':
@@ -181,15 +182,11 @@
copy_files(file_paths=[wiggle_path], output_dir=config.output_dir)
job.fileStore.deleteGlobalFile(wiggle_id)
else:
transcriptome_id, sorted_id, log_id, sj_id = flatten(star_output)
transcriptome_id, aligned_id, log_id, sj_id = flatten(star_output)
# Save sorted bam if flag is selected
if config.save_bam and not config.bamqc: # if config.bamqc is selected, bam is being saved in run_bam_qc
bam_path = os.path.join(work_dir, config.uuid + '.sorted.bam')
job.fileStore.readGlobalFile(sorted_id, bam_path)
if urlparse(config.output_dir).scheme == 's3' and config.ssec:
s3am_upload(fpath=bam_path, s3_dir=config.output_dir, s3_key_path=config.ssec)
elif urlparse(config.output_dir).scheme != 's3':
copy_files(file_paths=[bam_path], output_dir=config.output_dir)
disk = 3 * aligned_id.size
job.addChildJobFn(sort_and_save_bam, config, aligned_id, cores=config.cores, disk=disk)
# Declare RSEM and RSEM post-process jobs
disk = 5 * transcriptome_id.size
rsem_output = job.wrapJobFn(run_rsem, transcriptome_id, config.rsem_ref, paired=config.paired,
@@ -208,7 +205,7 @@
# Delete intermediates
ids_to_delete = [transcriptome_id, log_id, sj_id]
if not config.bamqc: # If BamQC isn't being run, the sorted bam is no longer needed
ids_to_delete.append(sorted_id)
ids_to_delete.append(aligned_id)
job.addFollowOnJobFn(cleanup_ids, ids_to_delete)

return rsem_postprocess.rv(), star_id
@@ -380,6 +377,35 @@ def cleanup_ids(job, ids_to_delete):
[job.fileStore.deleteGlobalFile(x) for x in ids_to_delete if x is not None]


def sort_and_save_bam(job, config, bam_id):
"""
Sorts STAR's output bam using samtools. STAR has a sporadic memory leak when sorting.
:param JobFunctionWrappingJob job: passed automatically by Toil
:param Namespace config: Argparse Namespace object containing argument inputs
:param FileID bam_id: FileID for STAR's genome-aligned bam
"""
work_dir = job.fileStore.getLocalTempDir()
job.fileStore.readGlobalFile(bam_id, os.path.join(work_dir, 'aligned.bam'))

parameters = ['sort',
'-o', '/data/{}.sorted.bam'.format(config.uuid),
'-O', 'bam',
'-T', 'temp',
'-@', str(job.cores),
'/data/aligned.bam']

dockerCall(job, tool='quay.io/ucsc_cgl/samtools:1.3--256539928ea162949d8a65ca5c79a72ef557ce7c',
parameters=parameters, workDir=work_dir)

bam_path = os.path.join(work_dir, '{}.sorted.bam'.format(config.uuid))

if urlparse(config.output_dir).scheme == 's3' and config.ssec:
s3am_upload(fpath=bam_path, s3_dir=config.output_dir, s3_key_path=config.ssec)
elif urlparse(config.output_dir).scheme != 's3':
copy_files(file_paths=[bam_path], output_dir=config.output_dir)


# Pipeline specific functions
def parse_samples(path_to_manifest=None, sample_urls=None):
"""
@@ -622,8 +648,8 @@ def main():

# Output dir checks and handling
require(config.output_dir, 'No output location specified: {}'.format(config.output_dir))
for input in [x for x in [config.kallisto_index, config.star_index, config.rsem_ref] if x]:
require(urlparse(input).scheme in schemes,
for file_input in [x for x in [config.kallisto_index, config.star_index, config.rsem_ref] if x]:
require(urlparse(file_input).scheme in schemes,
'Input in config must have the appropriate URL prefix: {}'.format(schemes))

if not config.output_dir.startswith('/'):
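
For reference, the containerized step in sort_and_save_bam amounts to a single samtools sort over STAR's unsorted BAM. A rough local equivalent is sketched below; the paths, UUID, and thread count are placeholders, and samtools 1.3+ is assumed to be on PATH instead of the quay.io/ucsc_cgl/samtools container:

import os
import subprocess

work_dir = '/tmp/work'   # stands in for job.fileStore.getLocalTempDir()
uuid = 'SAMPLE-UUID'     # stands in for config.uuid
cores = 4                # stands in for job.cores

# Mirrors the parameters passed to dockerCall in sort_and_save_bam.
subprocess.check_call([
    'samtools', 'sort',
    '-o', os.path.join(work_dir, '{}.sorted.bam'.format(uuid)),  # sorted output
    '-O', 'bam',                                                  # output format
    '-T', 'temp',                                                 # temp-file prefix
    '-@', str(cores),                                             # extra sort threads
    os.path.join(work_dir, 'aligned.bam'),                        # STAR's unsorted bam
])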
4 changes: 2 additions & 2 deletions version.py
@@ -14,6 +14,6 @@

version = '3.3.0a1'

required_versions = {'toil': '>=3.3.5',
'toil-lib': '==1.1.6',
required_versions = {'toil': '>=3.7.0',
'toil-lib': '==1.1.8',
'pyyaml': '>=3.11'}
