parameterize table names in create_or_update_sample_tables task #533

Open · wants to merge 2 commits into base: master
92 changes: 68 additions & 24 deletions pipes/WDL/tasks/tasks_terra.wdl
@@ -382,8 +382,12 @@ task create_or_update_sample_tables {
Array[String]? raw_reads_unaligned_bams
Array[String]? cleaned_reads_unaligned_bams

-File? meta_by_filename_json
-File? meta_by_sample_json
+File? meta_by_filename_json
+File? meta_by_sample_json
+
+String flowcell_table_name = "flowcell"
+String seq_library_table_name = "library"
+String sample_aggregation_table_name = "sample"

String docker = "quay.io/broadinstitute/viral-core:2.2.4" #skip-global-version-pin
}
@@ -392,6 +396,18 @@ task create_or_update_sample_tables {
volatile: true
}

+parameter_meta {
+flowcell_table_name: {
+description: "Name of table containing per-flowcell rows. If overridden, flowcell_table_name must not contain spaces."
+}
+seq_library_table_name: {
+description: "Name of table containing per-library rows. If overridden, seq_library_table_name must not contain spaces."
+}
+sample_aggregation_table_name: {
+description: "Name of table representing samples, where each row has a foreign key link to one or more per-library rows. If overridden, sample_aggregation_table_name must not contain spaces."
+}
+}

command <<<
python3<<CODE
flowcell_data_id = '~{flowcell_run_id}'
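# (illustrative sketch, not part of this diff) the parameter_meta above
# requires the overridable table names to contain no spaces; a guard like
# this could fail fast instead of producing malformed TSV headers later
for _tbl in ('~{flowcell_table_name}', '~{seq_library_table_name}', '~{sample_aggregation_table_name}'):
    assert ' ' not in _tbl, f"table name must not contain spaces: {_tbl!r}"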
@@ -437,10 +453,10 @@ task create_or_update_sample_tables {

def get_bam_lists_for_flowcell_from_live_table(project, workspace, runID):
# API call to get flowcell_data table
response = fapi.get_entities_tsv(project, workspace, "flowcell", model="flexible")
response = fapi.get_entities_tsv(project, workspace, "~{flowcell_table_name}", model="flexible")

# read API response into data frame
-df = pd.read_csv(StringIO(response.text), sep="\t", index_col="entity:flowcell_id")
+df = pd.read_csv(StringIO(response.text), sep="\t", index_col="entity:~{flowcell_table_name}_id")

# create library.tsv data frame (entity:library_id)
cleaned_bams_list = literal_eval(df.cleaned_reads_unaligned_bams[runID])
@@ -450,14 +466,14 @@ task create_or_update_sample_tables {

def create_library_to_bam_tsvs(cleaned_bams_list, raw_bams_list, runID):
cleaned_library_id_list = [bam.split("/")[-1].replace(".bam", "").replace(".cleaned", "") for bam in cleaned_bams_list]
df_library_table = pd.DataFrame({"entity:library_id" : cleaned_library_id_list,
df_library_table = pd.DataFrame({"entity:~{seq_library_table_name}_id" : cleaned_library_id_list,
"cleaned_bam" : cleaned_bams_list})
cleaned_library_fname = runID + "_cleaned_bams.tsv"
df_library_table.to_csv(cleaned_library_fname, sep="\t", index=False)

# create library.tsv data frame (entity:library_id)
raw_library_id_list = [bam.split("/")[-1].replace(".bam", "") for bam in raw_bams_list]
df_library_table = pd.DataFrame({"entity:library_id" : raw_library_id_list,
df_library_table = pd.DataFrame({"entity:~{seq_library_table_name}_id" : raw_library_id_list,
"raw_bam" : raw_bams_list})
raw_library_fname = runID + "_raw_bams.tsv"
df_library_table.to_csv(raw_library_fname, sep="\t", index=False)
@@ -499,12 +515,12 @@ task create_or_update_sample_tables {
copy_cols = ["sample_original", "spike_in"]

# API call to get existing sample_set mappings
header, rows = get_entities_to_table(workspace_project, workspace_name, "sample")
df_sample = pd.DataFrame.from_records(rows, columns=header, index="sample_id")
header, rows = get_entities_to_table(workspace_project, workspace_name, "~{sample_aggregation_table_name}")
df_sample = pd.DataFrame.from_records(rows, columns=header, index="~{sample_aggregation_table_name}_id")

# API call to get all existing library ids
header, rows = get_entities_to_table(workspace_project, workspace_name, "library")
df_library = pd.DataFrame.from_records(rows, columns=header, index="library_id")
header, rows = get_entities_to_table(workspace_project, workspace_name, "~{seq_library_table_name}")
df_library = pd.DataFrame.from_records(rows, columns=header, index="~{seq_library_table_name}_id")

# if meta_by_filename_json specified and size>0 bytes
# parse as json, pass to for loop below
@@ -518,28 +534,52 @@ task create_or_update_sample_tables {
library_meta_dict = json.load(meta_fp)
else:
# API call to get flowcell_data table
header, rows = get_entities_to_table(workspace_project, workspace_name, "flowcell")
df_flowcell = pd.DataFrame.from_records(rows, columns=header, index="flowcell_id")
header, rows = get_entities_to_table(workspace_project, workspace_name, "~{flowcell_table_name}")
df_flowcell = pd.DataFrame.from_records(rows, columns=header, index="~{flowcell_table_name}_id")
library_meta_dict = df_flowcell.meta_by_filename[flowcell_data_id]

# grab the meta_by_filename values to create new sample->library (sample_set->sample) mappings
sample_to_libraries = {}
for library_id, data in library_meta_dict.items():
-sample_id = data['sample']
+# get the sample ID from either the json files provided, or the live flowcell table
+# (defaulting to the "sample" key used in the json, and falling back to the WDL-specified
+# table name; checked lazily so the fallback key is only read when actually needed)
+sample_id = data['sample'] if 'sample' in data else data['~{sample_aggregation_table_name}']
sample_to_libraries.setdefault(sample_id, [])
if library_id in df_library.index:
sample_to_libraries[sample_id].append(library_id)
else:
print (f"missing {library_id} from library table")

+# (very) naive pluralization
+def pluralize(input_noun_str):
+    if input_noun_str.endswith(('s', 'x', 'z', 'ch', 'sh')):
+        return input_noun_str + 'es'
+    elif input_noun_str.endswith('y'):
+        # if a consonant precedes the y, 'y' -> 'ies'
+        if input_noun_str[-2] not in 'aeiou':
+            return input_noun_str[:-1] + 'ies'
+        else:
+            return input_noun_str + 's'
+    elif input_noun_str.endswith('f'):
+        return input_noun_str[:-1] + 'ves'
+    elif input_noun_str.endswith('fe'):
+        return input_noun_str[:-2] + 'ves'
+    else:
+        return input_noun_str + 's'
+
+# naively pluralize the "library" table name (or whatever the inputs say it
+# should be called) for the column in the "sample" table that lists one or
+# more such entities
+pluralized_library_term = pluralize("~{seq_library_table_name}")
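# (illustrative, not part of this diff) expected behavior of the naive rule:
#   pluralize("library") -> "libraries"
#   pluralize("sample")  -> "samples"
#   pluralize("batch")   -> "batches"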

# merge in new sample->library mappings with any pre-existing sample->library mappings
if len(df_sample)>0:
print(df_sample.index)
for sample_id in sample_to_libraries.keys():
if sample_id in df_sample.index:
print (f"sample_set {sample_id} pre-exists in Terra table, merging with new members")
#sample_set_to_samples[set_id].extend(df_sample_set.samples[set_id])
already_associated_libraries = [entity["entityName"] for entity in df_sample.libraries[sample_id]]
already_associated_libraries = [entity["entityName"] for entity in df_sample[pluralized_library_term][sample_id]]

print(f"already_associated_libraries {already_associated_libraries}")
print(f"sample_to_libraries[sample_id] {sample_to_libraries[sample_id]}")
@@ -548,12 +588,12 @@ task create_or_update_sample_tables {
# collapse duplicate sample IDs
sample_to_libraries[sample_id] = list(set(sample_to_libraries[sample_id]))

-sample_fname = 'sample_membership.tsv'
+sample_fname = '~{sample_aggregation_table_name}_membership.tsv'
with open(sample_fname, 'wt') as outf:
-outf.write('entity:sample_id\tlibraries\n')
+outf.write(f'entity:~{sample_aggregation_table_name}_id\t{pluralized_library_term}\n')
for sample_id, libraries in sample_to_libraries.items():
#for library_id in sorted(libraries):
-outf.write(f'{sample_id}\t{json.dumps([{"entityType":"library","entityName":library_name} for library_name in libraries])}\n')
+outf.write(f'{sample_id}\t{json.dumps([{"entityType":"~{seq_library_table_name}","entityName":library_name} for library_name in libraries])}\n')
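# (illustrative, not part of this diff) with the default table names the
# resulting membership file looks like:
#   entity:sample_id	libraries
#   sampleA	[{"entityType": "library", "entityName": "sampleA.l1"}]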

# if meta_by_filename_json specified and size>0 bytes
# parse as json, pass to for loop below
@@ -567,26 +607,26 @@ task create_or_update_sample_tables {
meta_by_library_all = json.load(meta_fp)
else:
# API call to get flowcell_data table
header, rows = get_entities_to_table(workspace_project, workspace_name, "flowcell")
df_flowcell = pd.DataFrame.from_records(rows, columns=header, index="flowcell_id")
header, rows = get_entities_to_table(workspace_project, workspace_name, "~{flowcell_table_name}")
df_flowcell = pd.DataFrame.from_records(rows, columns=header, index="~{flowcell_table_name}_id")
# grab the meta_by_sample values from one row in the flowcell_data table
meta_by_library_all = df_flowcell.meta_by_sample[flowcell_data_id]

# grab all the library IDs
header, rows = get_entities_to_table(workspace_project, workspace_name, "library")
header, rows = get_entities_to_table(workspace_project, workspace_name, "~{seq_library_table_name}")
out_rows = []
-out_header = ['library_id'] + copy_cols
+out_header = ['~{seq_library_table_name}_id'] + copy_cols
print(f"out_header {out_header}")
for row in rows:
-out_row = {'library_id': row['library_id']}
+out_row = {'~{seq_library_table_name}_id': row['~{seq_library_table_name}_id']}

for sample_id,sample_library_metadata in meta_by_library_all.items():
if sample_library_metadata["library"] in row['library_id']:
if sample_library_metadata["~{seq_library_table_name}"] in row.get('library_id',row['~{seq_library_table_name}']):
for col in copy_cols:
out_row[col] = sample_library_metadata.get(col, '')
out_rows.append(out_row)

library_meta_fname = "sample_metadata.tsv"
library_meta_fname = "~{seq_library_table_name}_metadata.tsv"
with open(library_meta_fname, 'wt') as outf:
outf.write("entity:")
writer = csv.DictWriter(outf, out_header, delimiter='\t', dialect=csv.unix_dialect, quoting=csv.QUOTE_MINIMAL)
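# (illustrative note, not part of this diff) Terra load-file TSVs must name
# their first column "entity:<table>_id"; writing the bare "entity:" prefix
# above, followed by the DictWriter header row, yields e.g. "entity:library_id"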
@@ -614,6 +654,10 @@ task create_or_update_sample_tables {
output {
File stdout_log = stdout()
File stderr_log = stderr()

+File sample_table_tsv = "~{sample_aggregation_table_name}_membership.tsv"
+File library_table_tsv = "~{seq_library_table_name}_metadata.tsv"

Int max_ram_gb = ceil(read_float("MEM_BYTES")/1000000000)
}
}
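
For reviewers trying the change out, a minimal sketch of pushing the two emitted TSVs back to Terra (not part of this PR; the workspace coordinates are placeholders, and it assumes firecloud's upload_entities_tsv helper alongside the get_entities_tsv call the task already makes):

from firecloud import api as fapi

# placeholder workspace coordinates, not part of this PR
project, workspace = "my-billing-project", "my-workspace"

# with the default table names, the task emits these two files; upload them
# with the same "flexible" entity model the task uses when reading tables
for tsv in ("sample_membership.tsv", "library_metadata.tsv"):
    resp = fapi.upload_entities_tsv(project, workspace, tsv, model="flexible")
    resp.raise_for_status()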