Skip to content

Commit

Permalink
Merge fastqs: support arbitrary number of inputs
Browse files Browse the repository at this point in the history
Also improve the compression handling.
  • Loading branch information
Donaim committed Feb 16, 2024
1 parent 6840f80 commit 0020301
Show file tree
Hide file tree
Showing 3 changed files with 449 additions and 169 deletions.
227 changes: 138 additions & 89 deletions micall/core/merge_fastqs.py
Original file line number Diff line number Diff line change
@@ -1,127 +1,176 @@
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
import argparse
import logging
import shutil
import sys
import os
import tempfile
import gzip
import csv
from typing import TextIO, List, Optional, Dict, IO, Set, Tuple

from micall.core.trim_fastqs import TrimSteps, trim


logging.basicConfig(level=logging.INFO,
format='%(asctime)s[%(levelname)s]%(name)s.%(funcName)s(): %(message)s')
logger = logging.getLogger('micall')
logger = logging.getLogger(__name__)


def concatenate_files(input_file1, input_file2, output_file):
with open(input_file1, 'rb') as src1, \
open(input_file2, 'rb') as src2, \
open(output_file, 'wb') as dst:
class AmbiguousBadCycles(ValueError): pass
class BadTrimplanFieldNames(ValueError): pass

shutil.copyfileobj(src1, dst)
shutil.copyfileobj(src2, dst)

def concatenate_files(input_files: List[str], output_file: str) -> None:
with open(output_file, 'wb') as dst:
for input_file in input_files:
with open(input_file, 'rb') as src:
shutil.copyfileobj(src, dst)

def merge_fastqs(args):
with tempfile.NamedTemporaryFile() as trimmed_fastq1_a, \
tempfile.NamedTemporaryFile() as trimmed_fastq2_a, \
tempfile.NamedTemporaryFile() as trimmed_fastq1_b, \
tempfile.NamedTemporaryFile() as trimmed_fastq2_b:

logger.info('Processing reads of Sample A.')
def parse_inputs_and_merge_fastqs(trim_file: TextIO, mergeplan_file: TextIO, zip_file: Optional[TextIO]) -> None:
mergeplan: Dict[str, List[str]] = {}
trimplan: Set[Tuple[str, ...]] = set()
zipped: Set[str] = set()
bad_cycles: Dict[str, str] = {}

trim((args.fastq1_a, args.fastq2_a),
args.bad_cycles_a_csv,
(trimmed_fastq1_a.name, trimmed_fastq2_a.name),
use_gzip=not args.unzipped)
mergeplan_reader = csv.DictReader(mergeplan_file)
trim_reader = csv.DictReader(trim_file)
zip_reader = csv.DictReader(zip_file) if zip_file is not None else None

logger.info('Processing reads of Sample B.')
for row in mergeplan_reader:
input_path = row["input"]
output_path = row["output"]

trim((args.fastq1_b, args.fastq2_b),
args.bad_cycles_b_csv,
(trimmed_fastq1_b.name, trimmed_fastq2_b.name),
use_gzip=not args.unzipped)
if output_path not in mergeplan:
mergeplan[output_path] = []
mergeplan[output_path].append(input_path)

logger.info('Merging resulting reads files.')
no_badcycles_fields = list(sorted(
(field for field in trim_reader.fieldnames or [] if field != "bad_cycles"),
key=lambda field: field.lower()))
expected_no_badcycles_fields = [f"r{i + 1}" for i in range(len(no_badcycles_fields))]

os.makedirs(os.path.dirname(args.fastq1_result), exist_ok=True)
os.makedirs(os.path.dirname(args.fastq2_result), exist_ok=True)
if [field.lower() for field in no_badcycles_fields] \
!= expected_no_badcycles_fields:
raise BadTrimplanFieldNames(f"Bad field names: {no_badcycles_fields}, expected {expected_no_badcycles_fields}")

if args.unzipped:
concatenate_files(trimmed_fastq1_a.name, trimmed_fastq1_b.name,
args.fastq1_result)
concatenate_files(trimmed_fastq2_a.name, trimmed_fastq2_b.name,
args.fastq2_result)
for row in trim_reader:
input_paths = tuple(row[field] for field in no_badcycles_fields)
trimplan.add(input_paths)
bad_cycles_path = row.get("bad_cycles", "")
if bad_cycles_path:
for input_path in input_paths:
bad_cycles[input_path] = bad_cycles_path

else:
with tempfile.NamedTemporaryFile() as temp_fastq1, \
tempfile.NamedTemporaryFile() as temp_fastq2:
if zip_reader is not None:
for row in zip_reader:
zipped.add(row["file"])

temp_fastq1.close()
temp_fastq2.close()
return merge_fastqs(trimplan, mergeplan, zipped, bad_cycles)

concatenate_files(trimmed_fastq1_a.name, trimmed_fastq1_b.name,
temp_fastq1.name)
concatenate_files(trimmed_fastq2_a.name, trimmed_fastq2_b.name,
temp_fastq2.name)

logger.info('Compressing final outputs.')
def compress_file(input_path: str, output_path: str) -> None:
with open(input_path, 'rb') as input_file, \
open(output_path, 'wb') as output_file:
with gzip.GzipFile(fileobj=output_file, mode='wb') as gzip_file:
shutil.copyfileobj(input_file, gzip_file)

with open(temp_fastq1.name, 'rb') as f_in, gzip.open(args.fastq1_result, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)

with open(temp_fastq2.name, 'rb') as f_in, gzip.open(args.fastq2_result, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
def uncompress_file(input_path: str, output_path: str) -> None:
with open(input_path, 'rb') as compressed_file, \
open(output_path, 'w+b') as ret:
with gzip.GzipFile(fileobj=compressed_file, mode='rb') as gzip_file:
shutil.copyfileobj(gzip_file, ret)

logger.info('Done.')

def get_transitive(graph: Dict[str, str], key: str) -> str:
if key in graph:
return get_transitive(graph, graph[key])
else:
return key


def merge_fastqs(trimplan: Set[Tuple[str, ...]],
mergeplan: Dict[str, List[str]],
zipped: Set[str] = set(),
bad_cycles: Dict[str, str] = {},
) -> None:

inputs = [value for values in mergeplan.values() for value in values]
outputs = list(mergeplan.keys())
name_mapping: Dict[str, str] = {}
temporary: List[IO] = []

for output_path in outputs:
os.makedirs(os.path.dirname(output_path), exist_ok=True)

for input_path in inputs:
if input_path in zipped:
logger.info('Uncompressing %s.', input_path)
uncompressed = tempfile.NamedTemporaryFile(mode="w+b")
uncompressed_path = uncompressed.name
uncompress_file(input_path, uncompressed_path)
temporary.append(uncompressed)
name_mapping[input_path] = uncompressed_path

for to_trim in trimplan:
assert len(to_trim) > 0

def main(argv) -> int:
parser = ArgumentParser(
description="Combine and filter the FASTQ files from two samples into a single output file.",
formatter_class=ArgumentDefaultsHelpFormatter)

parser.add_argument(
"fastq1_a",
help="FASTQ file containing forward reads of sample A",
)
parser.add_argument(
"fastq2_a",
help="FASTQ file containing reverse reads of sample A",
)
parser.add_argument(
"fastq1_b",
help="FASTQ file containing forward reads of sample B",
)
parser.add_argument(
"fastq2_b",
help="FASTQ file containing reverse reads of sample B",
)
parser.add_argument(
"fastq1_result",
help="Resulting combined FASTQ file containing forward reads",
)
parser.add_argument(
"fastq2_result",
help="Resulting combined FASTQ file containing reverse reads",
)
parser.add_argument(
"--bad_cycles_a_csv",
help="list of tiles and cycles rejected for poor quality in sample A",
)
parser.add_argument(
"--bad_cycles_b_csv",
help="list of tiles and cycles rejected for poor quality in sample B",
)
parser.add_argument(
'--unzipped', '-u',
action='store_true',
help='Set if the original FASTQ files are not compressed',
)
trim_outputs: List[str] = []
trim_inputs: List[str] = []

all_bad_cycles_paths = set(bad_cycles[path] for path in to_trim if path in bad_cycles)
if len(all_bad_cycles_paths) == 0:
bad_cycles_path = None
elif len(all_bad_cycles_paths) == 1:
bad_cycles_path = next(iter(all_bad_cycles_paths))
else:
raise AmbiguousBadCycles(f"Ambiguous bad_cycles for {to_trim}: {all_bad_cycles_paths}.")

for path in to_trim:
path = get_transitive(name_mapping, path)
tmp = tempfile.NamedTemporaryFile()
trim_inputs.append(path)
trim_outputs.append(tmp.name)
temporary.append(tmp)
name_mapping[path] = tmp.name

logger.info('Trimming samples %s.', ','.join(map(repr, to_trim)))
trim(trim_inputs, bad_cycles_path, trim_outputs, use_gzip=False)

for output_path in mergeplan:
input_files = mergeplan[output_path]
logger.info('Merging results %s to %s.', ','.join(map(repr, input_files)), output_path)
input_files = [get_transitive(name_mapping, path) for path in input_files]
tmp = tempfile.NamedTemporaryFile()
temporary.append(tmp)
name_mapping[output_path] = tmp.name
output_path = tmp.name
concatenate_files(input_files, output_path)

for output_path in mergeplan:
concatenated = name_mapping[output_path]
if output_path in zipped:
logger.info('Compressing output %s.', output_path)
compress_file(concatenated, output_path)
else:
os.rename(concatenated, output_path)

for toclose in temporary:
try: toclose.close()
except: pass

logger.info('Done.')


def main(argv: List[str]) -> int:
parser = argparse.ArgumentParser(description="Combine and filter the FASTQ files from multiple samples into single output files.")
parser.add_argument("trimplan", type=argparse.FileType('rt'), help="A CSV file containing the lists of files to be trimmed.")
parser.add_argument("mergeplan", type=argparse.FileType('rt'), help="A CSV file containing merge plan.")
parser.add_argument("--zipfile", type=argparse.FileType('rt'), help="A CSV file containing a list of files that are compressed or need to be compressed.")
args = parser.parse_args(argv)
merge_fastqs(args)

parse_inputs_and_merge_fastqs(args.trimplan, args.mergeplan, args.zipfile)
return 0


Expand Down
9 changes: 5 additions & 4 deletions micall/core/trim_fastqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from pathlib import Path

import typing
from typing import Optional

from Bio import SeqIO

Expand Down Expand Up @@ -57,12 +58,12 @@ def parse_args():


def trim(original_fastq_filenames: typing.Sequence[str],
bad_cycles_filename: str,
bad_cycles_filename: Optional[str],
trimmed_fastq_filenames: typing.Sequence[str],
use_gzip: bool = True,
summary_file: typing.TextIO = None,
skip: typing.Tuple[str] = (),
project_code: str = None):
summary_file: Optional[typing.TextIO] = None,
skip: typing.Tuple[str, ...] = (),
project_code: Optional[str] = None):
"""
:param original_fastq_filenames: sequence of two filenames, containing
Expand Down
Loading

0 comments on commit 0020301

Please sign in to comment.