From 343313a0d8a668c2f478b1ceb4e6f9802d0768e3 Mon Sep 17 00:00:00 2001
From: Christopher Tomkins-Tinch
Date: Tue, 23 Apr 2024 09:29:35 -0400
Subject: [PATCH 1/2] parameterize table names in
 create_or_update_sample_tables task

---
 pipes/WDL/tasks/tasks_terra.wdl | 72 +++++++++++++++++++++++----------
 1 file changed, 50 insertions(+), 22 deletions(-)

diff --git a/pipes/WDL/tasks/tasks_terra.wdl b/pipes/WDL/tasks/tasks_terra.wdl
index 9002a6a43..7d760124a 100644
--- a/pipes/WDL/tasks/tasks_terra.wdl
+++ b/pipes/WDL/tasks/tasks_terra.wdl
@@ -382,8 +382,12 @@ task create_or_update_sample_tables {
         Array[String]? raw_reads_unaligned_bams
         Array[String]? cleaned_reads_unaligned_bams
 
-        File?  meta_by_filename_json
-        File?  meta_by_sample_json
+        File?    meta_by_filename_json
+        File?    meta_by_sample_json
+
+        String   flowcell_table_name           = "flowcell"
+        String   seq_library_table_name        = "library"
+        String   sample_aggregation_table_name = "sample"
 
         String docker = "quay.io/broadinstitute/viral-core:2.2.4" #skip-global-version-pin
     }
@@ -437,10 +441,10 @@ task create_or_update_sample_tables {
 
     def get_bam_lists_for_flowcell_from_live_table(project, workspace, runID):
         # API call to get flowcell_data table
-        response = fapi.get_entities_tsv(project, workspace, "flowcell", model="flexible")
+        response = fapi.get_entities_tsv(project, workspace, "~{flowcell_table_name}", model="flexible")
 
         # read API response into data frame
-        df = pd.read_csv(StringIO(response.text), sep="\t", index_col="entity:flowcell_id")
+        df = pd.read_csv(StringIO(response.text), sep="\t", index_col="entity:~{flowcell_table_name}_id")
 
         # create library.tsv data frame (entity:library_id)
         cleaned_bams_list = literal_eval(df.cleaned_reads_unaligned_bams[runID])
@@ -450,14 +454,14 @@ task create_or_update_sample_tables {
 
     def create_library_to_bam_tsvs(cleaned_bams_list, raw_bams_list, runID):
         cleaned_library_id_list = [bam.split("/")[-1].replace(".bam", "").replace(".cleaned", "") for bam in cleaned_bams_list]
-        df_library_table = pd.DataFrame({"entity:library_id" : cleaned_library_id_list,
+        df_library_table = pd.DataFrame({"entity:~{seq_library_table_name}_id" : cleaned_library_id_list,
                                          "cleaned_bam" : cleaned_bams_list})
         cleaned_library_fname = runID + "_cleaned_bams.tsv"
         df_library_table.to_csv(cleaned_library_fname, sep="\t", index=False)
 
         # create library.tsv data frame (entity:library_id)
         raw_library_id_list = [bam.split("/")[-1].replace(".bam", "") for bam in raw_bams_list]
-        df_library_table = pd.DataFrame({"entity:library_id" : raw_library_id_list,
+        df_library_table = pd.DataFrame({"entity:~{seq_library_table_name}_id" : raw_library_id_list,
                                          "raw_bam" : raw_bams_list})
         raw_library_fname = runID + "_raw_bams.tsv"
         df_library_table.to_csv(raw_library_fname, sep="\t", index=False)
@@ -499,12 +503,12 @@ task create_or_update_sample_tables {
     copy_cols = ["sample_original", "spike_in"]
 
     # API call to get existing sample_set mappings
-    header, rows = get_entities_to_table(workspace_project, workspace_name, "sample")
-    df_sample = pd.DataFrame.from_records(rows, columns=header, index="sample_id")
+    header, rows = get_entities_to_table(workspace_project, workspace_name, "~{sample_aggregation_table_name}")
+    df_sample = pd.DataFrame.from_records(rows, columns=header, index="~{sample_aggregation_table_name}_id")
 
     # API call to get all existing library ids
-    header, rows = get_entities_to_table(workspace_project, workspace_name, "library")
-    df_library = pd.DataFrame.from_records(rows, columns=header, index="library_id")
+    header, rows = get_entities_to_table(workspace_project, workspace_name, "~{seq_library_table_name}")
+    df_library = pd.DataFrame.from_records(rows, columns=header, index="~{seq_library_table_name}_id")
 
     # if meta_by_filename_json specified and size>0 bytes
     # parse as json, pass to for loop below
@@ -518,20 +522,44 @@ task create_or_update_sample_tables {
             library_meta_dict = json.load(meta_fp)
     else:
         # API call to get flowcell_data table
-        header, rows = get_entities_to_table(workspace_project, workspace_name, "flowcell")
-        df_flowcell = pd.DataFrame.from_records(rows, columns=header, index="flowcell_id")
+        header, rows = get_entities_to_table(workspace_project, workspace_name, "~{flowcell_table_name}")
+        df_flowcell = pd.DataFrame.from_records(rows, columns=header, index="~{flowcell_table_name}_id")
         library_meta_dict = df_flowcell.meta_by_filename[flowcell_data_id]
 
     # grab the meta_by_filename values to create new sample->library (sample_set->sample) mappings
    sample_to_libraries = {}
    for library_id, data in library_meta_dict.items():
-        sample_id = data['sample']
+        # get the sample ID from either the json files provided, or the live flowcell table
+        # (defaulting to the "sample" key used in the json, but falling back to the WDL-specified table name)
+        sample_id = data['sample'] if 'sample' in data else data['~{sample_aggregation_table_name}']
        sample_to_libraries.setdefault(sample_id, [])
        if library_id in df_library.index:
            sample_to_libraries[sample_id].append(library_id)
        else:
            print (f"missing {library_id} from library table")
 
+    # (very) naive pluralization
+    def pluralize(input_noun_str):
+        if input_noun_str.endswith(('s', 'x', 'z', 'ch', 'sh')):
+            return input_noun_str + 'es'
+        elif input_noun_str.endswith('y'):
+            # if a consonant precedes the y, 'y' -> 'ies'
+            if input_noun_str[-2] not in 'aeiou':
+                return input_noun_str[:-1] + 'ies'
+            else:
+                return input_noun_str + 's'
+        elif input_noun_str.endswith('f'):
+            return input_noun_str[:-1] + 'ves'
+        elif input_noun_str.endswith('fe'):
+            return input_noun_str[:-2] + 'ves'
+        else:
+            return input_noun_str + 's'
+
+    # naively pluralize "library" table name for column in the "sample" table
+    # listing one or more such entities
+    # (or what the inputs specify these names should be called)
+    pluralized_library_term = pluralize("~{seq_library_table_name}")
+
     # merge in new sample->library mappings with any pre-existing sample->library mappings
     if len(df_sample)>0:
         print(df_sample.index)
@@ -539,7 +567,7 @@ task create_or_update_sample_tables {
             if sample_id in df_sample.index:
                 print (f"sample_set {sample_id} pre-exists in Terra table, merging with new members")
                 #sample_set_to_samples[set_id].extend(df_sample_set.samples[set_id])
-                already_associated_libraries = [entity["entityName"] for entity in df_sample.libraries[sample_id]]
+                already_associated_libraries = [entity["entityName"] for entity in df_sample[pluralized_library_term][sample_id]]
 
                 print(f"already_associated_libraries {already_associated_libraries}")
                 print(f"sample_to_libraries[sample_id] {sample_to_libraries[sample_id]}")
@@ -550,10 +578,10 @@ task create_or_update_sample_tables {
 
     sample_fname = 'sample_membership.tsv'
     with open(sample_fname, 'wt') as outf:
-        outf.write('entity:sample_id\tlibraries\n')
+        outf.write(f'entity:~{sample_aggregation_table_name}_id\t{pluralized_library_term}\n')
         for sample_id, libraries in sample_to_libraries.items():
            #for library_id in sorted(libraries):
-            outf.write(f'{sample_id}\t{json.dumps([{"entityType":"library","entityName":library_name} for library_name in libraries])}\n')
+            outf.write(f'{sample_id}\t{json.dumps([{"entityType":"~{seq_library_table_name}","entityName":library_name} for library_name in libraries])}\n')
 
     # if meta_by_filename_json specified and size>0 bytes
     # parse as json, pass to for loop below
@@ -567,21 +595,21 @@ task create_or_update_sample_tables {
             meta_by_library_all = json.load(meta_fp)
     else:
         # API call to get flowcell_data table
-        header, rows = get_entities_to_table(workspace_project, workspace_name, "flowcell")
-        df_flowcell = pd.DataFrame.from_records(rows, columns=header, index="flowcell_id")
+        header, rows = get_entities_to_table(workspace_project, workspace_name, "~{flowcell_table_name}")
+        df_flowcell = pd.DataFrame.from_records(rows, columns=header, index="~{flowcell_table_name}_id")
 
         # grab the meta_by_sample values from one row in the flowcell_data table
         meta_by_library_all = df_flowcell.meta_by_sample[flowcell_data_id]
 
     # grab all the library IDs
-    header, rows = get_entities_to_table(workspace_project, workspace_name, "library")
+    header, rows = get_entities_to_table(workspace_project, workspace_name, "~{seq_library_table_name}")
     out_rows = []
-    out_header = ['library_id'] + copy_cols
+    out_header = ['~{seq_library_table_name}_id'] + copy_cols
     print(f"out_header {out_header}")
     for row in rows:
-        out_row = {'library_id': row['library_id']}
+        out_row = {'~{seq_library_table_name}_id': row['~{seq_library_table_name}_id']}
         for sample_id,sample_library_metadata in meta_by_library_all.items():
-            if sample_library_metadata["library"] in row['library_id']:
+            if sample_library_metadata["~{seq_library_table_name}"] in row['~{seq_library_table_name}_id']:
                 for col in copy_cols:
                     out_row[col] = sample_library_metadata.get(col, '')
         out_rows.append(out_row)

From eb59e7161ebbda1cde623c169805eae2d8339ba2 Mon Sep 17 00:00:00 2001
From: Christopher Tomkins-Tinch
Date: Tue, 23 Apr 2024 09:56:20 -0400
Subject: [PATCH 2/2] output table tsvs from create_or_update_sample_tables;
 add parameter_meta

---
 pipes/WDL/tasks/tasks_terra.wdl | 20 ++++++++++++++++++--
 1 file changed, 18 insertions(+), 2 deletions(-)

diff --git a/pipes/WDL/tasks/tasks_terra.wdl b/pipes/WDL/tasks/tasks_terra.wdl
index 7d760124a..42af6f048 100644
--- a/pipes/WDL/tasks/tasks_terra.wdl
+++ b/pipes/WDL/tasks/tasks_terra.wdl
@@ -396,6 +396,18 @@ task create_or_update_sample_tables {
         volatile: true
     }
 
+    parameter_meta {
+        flowcell_table_name: {
+            description: "Name of table containing per-flowcell rows. If overridden, flowcell_table_name must not contain spaces."
+        }
+        seq_library_table_name: {
+            description: "Name of table containing per-library rows. If overridden, seq_library_table_name must not contain spaces."
+        }
+        sample_aggregation_table_name: {
+            description: "Name of table representing samples, where each row has a foreign key link to one or more per-library rows. If overridden, sample_aggregation_table_name must not contain spaces."
+        }
+    }
+
     command <<<
         python3<
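
Note on the sample-mapping hunks above (an illustrative sketch, not part of the patch): WDL substitutes ~{...} placeholders before the Python heredoc runs, so the snippet below shows the resulting Python with a hypothetical non-default table name ("specimen") already substituted, plus invented metadata. Membership in the dict is tested explicitly because dict.get(key, default) evaluates its default eagerly, so it would raise KeyError whenever the fallback key is absent, even if the legacy "sample" key is present.

    import json

    # hypothetical metadata blob, keyed with the legacy "sample" key
    data = {"sample": "S1"}

    # legacy-key fallback, shown after WDL has substituted
    # sample_aggregation_table_name="specimen"
    sample_id = data["sample"] if "sample" in data else data["specimen"]
    assert sample_id == "S1"

    # one membership row, in the shape written to sample_membership.tsv above
    libraries = ["S1.l1", "S1.l2"]
    row = json.dumps([{"entityType": "library", "entityName": n} for n in libraries])
    print(f"{sample_id}\t{row}")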
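
The pluralize() helper only needs to turn the singular library table name into the column name used in the sample table for the list of member entities (e.g. "library" -> "libraries"). A quick standalone check of its behavior (example nouns invented):

    # (very) naive pluralization, as defined in the patch
    def pluralize(input_noun_str):
        if input_noun_str.endswith(("s", "x", "z", "ch", "sh")):
            return input_noun_str + "es"
        elif input_noun_str.endswith("y"):
            # consonant before the final 'y': 'y' -> 'ies'
            if input_noun_str[-2] not in "aeiou":
                return input_noun_str[:-1] + "ies"
            return input_noun_str + "s"
        elif input_noun_str.endswith("f"):
            return input_noun_str[:-1] + "ves"
        elif input_noun_str.endswith("fe"):
            return input_noun_str[:-2] + "ves"
        return input_noun_str + "s"

    assert pluralize("library") == "libraries"  # default column name
    assert pluralize("assay") == "assays"       # vowel before 'y' keeps a plain 's'
    assert pluralize("box") == "boxes"

Because the new inputs default to the previously hardcoded names ("flowcell", "library", "sample"), the substituted code is identical to the pre-patch behavior unless a caller overrides them.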