Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

demux_deplete small updates #552

Merged
merged 10 commits into from
Aug 22, 2024
8 changes: 4 additions & 4 deletions pipes/WDL/tasks/tasks_ncbi.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ task gisaid_meta_prep {

out_headers = ('submitter', 'fn', 'covv_virus_name', 'covv_type', 'covv_passage', 'covv_collection_date', 'covv_location', 'covv_add_location', 'covv_host', 'covv_add_host_info', 'covv_sampling_strategy', 'covv_gender', 'covv_patient_age', 'covv_patient_status', 'covv_specimen', 'covv_outbreak', 'covv_last_vaccinated', 'covv_treatment', 'covv_seq_technology', 'covv_assembly_method', 'covv_coverage', 'covv_orig_lab', 'covv_orig_lab_addr', 'covv_provider_sample_id', 'covv_subm_lab', 'covv_subm_lab_addr', 'covv_subm_sample_id', 'covv_authors', 'covv_comment', 'comment_type')

with open('~{out_name}', 'wt') as outf:
with open('~{out_name}', 'w', newline='') as outf:
writer = csv.DictWriter(outf, out_headers, dialect=csv.unix_dialect, quoting=csv.QUOTE_MINIMAL)
writer.writeheader()

Expand Down Expand Up @@ -615,13 +615,13 @@ task biosample_to_table {
if v and (k not in biosample_headers) and k not in ('message', 'accession'):
biosample_headers.append(k)
print("biosample headers ({}): {}".format(len(biosample_headers), biosample_headers))
print("biosample rows ({})".format(len(biosample_attributes)))
print("biosample output rows ({})".format(len(biosample_attributes)))
samples_seen_without_biosample = set(sample_names_seen) - set(row['sample_name'] for row in biosample_attributes)
print("samples seen in bams without biosample entries ({}): {}".format(len(samples_seen_without_biosample), sorted(samples_seen_without_biosample)))

# write reformatted table
with open('~{base}.entities.tsv', 'wt') as outf:
writer = csv.DictWriter(outf, delimiter='\t', fieldnames=["~{sanitized_id_col}"]+biosample_headers, quoting=csv.QUOTE_MINIMAL)
with open('~{base}.entities.tsv', 'w', newline='') as outf:
writer = csv.DictWriter(outf, delimiter='\t', fieldnames=["~{sanitized_id_col}"]+biosample_headers, dialect=csv.unix_dialect, quoting=csv.QUOTE_MINIMAL)
writer.writeheader()
for row in biosample_attributes:
outrow = {h: row[h] for h in biosample_headers}
Expand Down
2 changes: 1 addition & 1 deletion pipes/WDL/tasks/tasks_nextstrain.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ task derived_cols {
out_headers.extend(adder.extra_headers())

with open_or_gzopen(out_tsv, 'wt') as outf:
writer = csv.DictWriter(outf, out_headers, delimiter='\t')
writer = csv.DictWriter(outf, out_headers, delimiter='\t', dialect=csv.unix_dialect, quoting=csv.QUOTE_MINIMAL)
writer.writeheader()
for row in reader:
for adder in adders:
Expand Down
24 changes: 20 additions & 4 deletions pipes/WDL/tasks/tasks_terra.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ task download_entities_tsv {
rows.append(outrow)

# dump to tsv
with open(out_fname, 'wt') as outf:
with open(out_fname, 'w', newline='') as outf:
writer = csv.DictWriter(outf, headers.keys(), delimiter='\t', dialect=csv.unix_dialect, quoting=csv.QUOTE_MINIMAL)
writer.writeheader()
writer.writerows(rows)
Expand Down Expand Up @@ -385,6 +385,8 @@ task create_or_update_sample_tables {
Array[String] cleaned_reads_unaligned_bams

File meta_by_filename_json
File? read_counts_raw_json
File? read_counts_cleaned_json

String sample_table_name = "sample"
String library_table_name = "library"
Expand All @@ -397,6 +399,7 @@ task create_or_update_sample_tables {
}

command <<<
set -e
python3<<CODE
flowcell_data_id = '~{flowcell_run_id}'
workspace_project = '~{workspace_namespace}'
Expand All @@ -413,6 +416,16 @@ task create_or_update_sample_tables {

print(workspace_project + "\n" + workspace_name)

# process read counts if available
read_counts_raw = {}
read_counts_cleaned = {}
if '~{default="" read_counts_raw_json}':
with open('~{default="" read_counts_raw_json}','rt') as inf:
read_counts_raw = {pair['left']: pair['right'] for pair in json.load(inf)}
if '~{default="" read_counts_cleaned_json}':
with open('~{default="" read_counts_cleaned_json}','rt') as inf:
read_counts_cleaned = {pair['left']: pair['right'] for pair in json.load(inf)}

# create tsv to populate library table with raw_bam and cleaned_bam columns
raw_bams_list = '~{sep="*" raw_reads_unaligned_bams}'.split('*')
raw_library_id_list = [bam.split("/")[-1].replace(".bam", "") for bam in raw_bams_list]
Expand All @@ -435,9 +448,9 @@ task create_or_update_sample_tables {
# create tsv to populate library table with metadata from demux json / samplesheet
# to do: maybe just merge this into df_library_bams instead and make a single tsv output
library_meta_fname = "library_metadata.tsv"
with open(library_meta_fname, 'wt') as outf:
with open(library_meta_fname, 'w', newline='') as outf:
copy_cols = ["sample_original", "spike_in", "control", "batch_lib", "library", "lane", "library_id_per_sample", "library_strategy", "library_source", "library_selection", "design_description"]
out_header = [lib_col_name] + copy_cols
out_header = [lib_col_name, 'flowcell', 'read_count_raw', 'read_count_cleaned'] + copy_cols
print(f"library_metadata.tsv output header: {out_header}")
writer = csv.DictWriter(outf, out_header, delimiter='\t', dialect=csv.unix_dialect, quoting=csv.QUOTE_MINIMAL)
writer.writeheader()
Expand All @@ -447,6 +460,9 @@ task create_or_update_sample_tables {
if library['run'] in library_bam_names:
out_row = {col: library.get(col, '') for col in copy_cols}
out_row[lib_col_name] = library['run']
out_row['flowcell'] = flowcell_data_id
out_row['read_count_raw'] = read_counts_raw.get(library['run'], '')
out_row['read_count_cleaned'] = read_counts_cleaned.get(library['run'], '')
out_rows.append(out_row)
writer.writerows(out_rows)

Expand Down Expand Up @@ -498,7 +514,7 @@ task create_or_update_sample_tables {
print (f"\tsample {sample_id} pre-exists in Terra table, merging old members {already_associated_libraries} with new members {libraries}")
merged_sample_ids.add(sample_id)

outf.write(f'{sample_id}\t{json.dumps([{"entityType":"library","entityName":library_name} for library_name in libraries])}\n')
outf.write(f'{sample_id}\t{json.dumps([{"entityType":"~{library_table_name}","entityName":library_name} for library_name in libraries])}\n')
print(f"wrote {len(sample_to_libraries)} samples to {sample_fname} where {len(merged_sample_ids)} samples were already in the Terra table")

# write everything to the Terra table! -- TO DO: move this to separate task
Expand Down
12 changes: 11 additions & 1 deletion pipes/WDL/workflows/demux_deplete.wdl
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
version 1.0

#DX_SKIP_WORKFLOW

import "../tasks/tasks_demux.wdl" as demux
import "../tasks/tasks_ncbi.wdl" as ncbi
import "../tasks/tasks_reports.wdl" as reports
Expand Down Expand Up @@ -176,6 +178,8 @@ workflow demux_deplete {
if (read_count_post_depletion < min_reads_per_bam) {
File empty_bam = raw_reads
}
Pair[String,Int] count_raw = (basename(raw_reads, '.bam'), spikein.reads_total)
Pair[String,Int] count_cleaned = (basename(raw_reads, '.bam'), read_count_post_depletion)
}

if(defined(biosample_map)) {
Expand Down Expand Up @@ -214,7 +218,9 @@ workflow demux_deplete {

raw_reads_unaligned_bams = flatten(illumina_demux.raw_reads_unaligned_bams),
cleaned_reads_unaligned_bams = select_all(cleaned_bam_passing),
meta_by_filename_json = meta_filename.merged_json
meta_by_filename_json = meta_filename.merged_json,
read_counts_raw_json = write_json(count_raw),
read_counts_cleaned_json = write_json(count_cleaned)
}

if(defined(biosample_map)) {
Expand Down Expand Up @@ -279,6 +285,10 @@ workflow demux_deplete {
File run_info_json = illumina_demux.run_info_json[0]
String run_id = illumina_demux.run_info[0]['run_id']

File? terra_library_table = create_or_update_sample_tables.library_metadata_tsv
File? terra_sample_library_map = create_or_update_sample_tables.sample_membership_tsv
File? terra_sample_metadata = biosample_to_table.sample_meta_tsv

String demux_viral_core_version = illumina_demux.viralngs_version[0]
}
}
2 changes: 2 additions & 0 deletions pipes/WDL/workflows/sarscov2_illumina_full.wdl
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
version 1.0

#DX_SKIP_WORKFLOW

import "../tasks/tasks_read_utils.wdl" as read_utils
import "../tasks/tasks_ncbi.wdl" as ncbi
import "../tasks/tasks_nextstrain.wdl" as nextstrain
Expand Down