execute_with_retry() error handling improvements [VS-159] #7480

Merged: 11 commits, Sep 29, 2021
2 changes: 2 additions & 0 deletions .dockstore.yml
@@ -81,6 +81,7 @@ workflows:
- /scripts/variantstore/wdl/GvsCreateAltAllele.example.inputs.json
filters:
branches:
- master
- ah_var_store
- name: GvsExtractCallset
subclass: WDL
@@ -109,6 +110,7 @@ workflows:
branches:
- master
- ah_var_store
+        - rsa_handle_prepare_error
- name: GvsSitesOnlyVCF
subclass: WDL
primaryDescriptorPath: /scripts/variantstore/wdl/GvsSitesOnlyVCF.wdl
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/GvsCreateAltAllele.wdl
@@ -193,7 +193,7 @@ task PopulateAltAlleleTable {
}

runtime {
docker: "us.gcr.io/broad-dsde-methods/variantstore:ah_var_store_20210917"
docker: "us.gcr.io/broad-dsde-methods/variantstore:ah_var_store_20210923"
memory: "3 GB"
disks: "local-disk 10 HDD"
cpu: 1
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/GvsPrepareCallset.wdl
@@ -23,7 +23,7 @@ workflow GvsPrepareCallset {
String? docker
}

String docker_final = select_first([docker, "us.gcr.io/broad-dsde-methods/variantstore:ah_var_store_20210806"])
String docker_final = select_first([docker, "us.gcr.io/broad-dsde-methods/variantstore:ah_var_store_20210923"])

call PrepareCallsetTask {
input:
1 change: 1 addition & 0 deletions scripts/variantstore/wdl/extract/Dockerfile
@@ -12,6 +12,7 @@ COPY extract_subpop.py /app
COPY populate_alt_allele_table.py /app
COPY alt_allele_positions.sql /app
COPY alt_allele_temp_function.sql /app
+COPY utils.py /app

WORKDIR /app
ENTRYPOINT ["/bin/bash"]
140 changes: 55 additions & 85 deletions scripts/variantstore/wdl/extract/create_cohort_extract_data_table.py
@@ -1,14 +1,14 @@
# -*- coding: utf-8 -*-
import uuid
import time
import datetime
+import argparse
+import re

from google.cloud import bigquery
from google.cloud.bigquery.job import QueryJobConfig
from google.oauth2 import service_account

-import argparse
-import re
+import utils

JOB_IDS = set()

@@ -34,9 +34,6 @@
VET_NEW_TABLE = f"{output_table_prefix}_vet_new"
EXTRACT_SAMPLE_TABLE = f"{output_table_prefix}_sample_names"

-def utf8len(s):
-    return len(s.encode('utf-8'))

def dump_job_stats():
total = 0

Expand All @@ -50,33 +47,6 @@ def dump_job_stats():

print(" Total GBs billed ", total/(1024 * 1024 * 1024), " GBs")

-def execute_with_retry(label, sql):
-    retry_delay = [30, 60, 90] # 3 retries with incremental backoff
-    start = time.time()
-    while len(retry_delay) > 0:
-        try:
-            query_label = label.replace(" ","-").strip().lower()
-
-            existing_labels = client._default_query_job_config.labels
-            job_labels = existing_labels
-            job_labels["gvs_query_name"] = query_label
-            job_config = bigquery.QueryJobConfig(labels=job_labels)
-            query = client.query(sql, job_config=job_config)
-
-            print(f"STARTING - {label} (jobid: {query.job_id})")
-            JOB_IDS.add((label, query.job_id))
-            results = query.result()
-            print(f"COMPLETED ({time.time() - start} s, {3-len(retry_delay)} retries) - {label} (jobid: {query.job_id})")
-            return results
-        except Exception as err:
-            # if there are no retries left... raise
-            if (len(retry_delay) == 0):
-                raise err
-            else:
-                t = retry_delay.pop(0)
-                print(f"Error {err} running query {label} (jobid: {query.job_id}), sleeping for {t}")
-                time.sleep(t)
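Note: this PR deletes both per-script copies of execute_with_retry() in favor of a shared helper in the new utils.py module, which is now COPY'd into the Docker image above. utils.py itself is not shown in this diff view; the sketch below is a plausible reconstruction from the two removed copies and the new utils.execute_with_retry(client, label, sql) call sites, not the actual file. The bytes-billed reporting, the label-dict copy, and the final re-raise are assumptions here, and the JOB_IDS bookkeeping used by dump_job_stats() is omitted. Notably, the removed copies could fall through the while-loop after the last backoff sleep and silently return None; a consolidated helper presumably fixes that, e.g.:

```python
# utils.py (hypothetical reconstruction; the real file is not shown in this diff)
import time

from google.cloud import bigquery


def utf8len(s):
    # BigQuery's query-size limits are expressed in bytes, not characters
    return len(s.encode('utf-8'))


def execute_with_retry(client, label, sql):
    retry_delay = [30, 60, 90]  # 3 retries with incremental backoff
    start = time.time()
    while True:
        try:
            query_label = label.replace(" ", "-").strip().lower()

            # copy the client's default labels instead of mutating them in place
            job_labels = dict(client._default_query_job_config.labels)
            job_labels["gvs_query_name"] = query_label
            job_config = bigquery.QueryJobConfig(labels=job_labels)
            query = client.query(sql, job_config=job_config)

            print(f"STARTING - {label} (jobid: {query.job_id})")
            results = query.result()

            job = client.get_job(query.job_id)
            mb_billed = (job.total_bytes_billed or 0) / (1024 * 1024)
            print(f"COMPLETED ({time.time() - start} s, {3 - len(retry_delay)} retries, "
                  f"{mb_billed} MBs) - {label} (jobid: {query.job_id})")
            return results
        except Exception as err:
            if len(retry_delay) == 0:
                # out of retries: surface the error instead of returning None
                raise
            t = retry_delay.pop(0)
            print(f"Error {err} running query {label}, sleeping for {t}")
            time.sleep(t)
```

Passing the client in explicitly is what lets one helper serve both scripts, since each script constructs its own client, optionally from a service-account key.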

def get_partition_range(i):
if i < 1 or i > PET_VET_TABLE_COUNT:
raise ValueError(f"out of partition range")
@@ -114,7 +84,7 @@ def load_sample_names(sample_names_to_extract, fq_temp_table_dataset):
def get_all_sample_ids(fq_destination_table_samples):
sql = f"select sample_id from `{fq_destination_table_samples}`"

results = execute_with_retry("read cohort sample table", sql)
results = utils.execute_with_retry(client, "read cohort sample table", sql)
sample_ids = [row.sample_id for row in list(results)]
sample_ids.sort()
return sample_ids
@@ -124,14 +94,14 @@ def create_extract_samples_table(fq_destination_table_samples, fq_sample_name_ta
f"SELECT m.sample_id, m.sample_name, m.is_loaded FROM `{fq_sample_name_table}` s JOIN `{fq_sample_mapping_table}` m ON (s.sample_name = m.sample_name) " \
f"WHERE m.is_loaded is TRUE)"

results = execute_with_retry("create extract sample table", sql)
results = utils.execute_with_retry(client, "create extract sample table", sql)
return results

def get_table_count(fq_pet_vet_dataset):
sql = f"SELECT MAX(CAST(SPLIT(table_name, '_')[OFFSET(1)] AS INT64)) max_table_number " \
f"FROM `{fq_pet_vet_dataset}.INFORMATION_SCHEMA.TABLES` " \
f"WHERE REGEXP_CONTAINS(lower(table_name), r'^(pet_[0-9]+)$') "
results = execute_with_retry("get max table", sql)
results = utils.execute_with_retry(client, "get max table", sql)
return int([row.max_table_number for row in list(results)][0])

def make_new_vet_union_all(fq_pet_vet_dataset, fq_temp_table_dataset, sample_ids):
@@ -144,12 +114,12 @@ def get_subselect(fq_vet_table, samples, id):
for i in range(1, PET_VET_TABLE_COUNT+1):
partition_samples = get_samples_for_partition(sample_ids, i)

-          # KCIBUL -- grr, should be fixed width
+        # KCIBUL -- grr, should be fixed width
fq_vet_table = f"{fq_pet_vet_dataset}.{VET_TABLE_PREFIX}{i:03}"
if len(partition_samples) > 0:
subs = {}
create_or_insert = f"\nCREATE OR REPLACE TABLE `{fq_temp_table_dataset}.{VET_NEW_TABLE}` {TEMP_TABLE_TTL} AS \n WITH \n" if i == 1 \
else f"\nINSERT INTO `{fq_temp_table_dataset}.{VET_NEW_TABLE}` \n WITH \n"
else f"\nINSERT INTO `{fq_temp_table_dataset}.{VET_NEW_TABLE}` \n WITH \n"
fq_vet_table = f"{fq_pet_vet_dataset}.{VET_TABLE_PREFIX}{i:03}"
j = 1

@@ -159,15 +129,15 @@ def get_subselect(fq_vet_table, samples, id):
j = j + 1

sql = create_or_insert + ("\n".join(subs.values())) + "\n" + \
"q_all AS (" + (" union all ".join([ f"(SELECT * FROM q_{id})" for id in subs.keys()])) + ")\n" + \
f" (SELECT * FROM q_all)"
"q_all AS (" + (" union all ".join([ f"(SELECT * FROM q_{id})" for id in subs.keys()])) + ")\n" + \
f" (SELECT * FROM q_all)"

print(sql)
print(f"VET Query is {utf8len(sql)/(1024*1024)} MB in length")
print(f"VET Query is {utils.utf8len(sql)/(1024*1024)} MB in length")
if i == 1:
execute_with_retry("create and populate vet new table", sql)
utils.execute_with_retry(client, "create and populate vet new table", sql)
else:
execute_with_retry("populate vet new table", sql)
utils.execute_with_retry(client, "populate vet new table", sql)
return


@@ -179,7 +149,7 @@ def create_position_table(fq_temp_table_dataset, min_variant_samples):
# it is == 0 then we don't need to touch the sample_id column (which doubles the cost of this query)
min_sample_clause = ""
if min_variant_samples > 0:
min_sample_clause = f"HAVING COUNT(distinct sample_id) >= {min_variant_samples}"
min_sample_clause = f"HAVING COUNT(distinct sample_id) >= {min_variant_samples}"

sql = f"""
CREATE OR REPLACE TABLE `{dest}` {TEMP_TABLE_TTL}
@@ -219,12 +189,12 @@ def get_pet_subselect(fq_pet_table, samples, id):
j = j + 1

sql = insert + ("\n".join(subs.values())) + "\n" + \
"q_all AS (" + (" union all ".join([ f"(SELECT * FROM q_{id})" for id in subs.keys()])) + ")\n" + \
f" (SELECT * FROM q_all)"
"q_all AS (" + (" union all ".join([ f"(SELECT * FROM q_{id})" for id in subs.keys()])) + ")\n" + \
f" (SELECT * FROM q_all)"

print(sql)
print(f"{fq_pet_table} query is {utf8len(sql)/(1024*1024)} MB in length")
execute_with_retry("populate destination table with pet data", sql)
print(f"{fq_pet_table} query is {utils.utf8len(sql)/(1024*1024)} MB in length")
utils.execute_with_retry(client, "populate destination table with pet data", sql)

return

@@ -250,7 +220,7 @@ def create_final_extract_table(fq_destination_table_data):
{FINAL_TABLE_TTL}
"""
print(sql)
results = execute_with_retry("create-final-export-table", sql)
results = utils.execute_with_retry(client, "create-final-export-table", sql)

def populate_final_extract_table_with_vet_new(fq_temp_table_dataset, fq_destination_table_data):
sql = f"""
@@ -271,23 +241,23 @@ def populate_final_extract_table_with_vet_new(fq_temp_table_dataset, fq_destination_table_data):
`{fq_temp_table_dataset}.{VET_NEW_TABLE}`
"""
print(sql)
results = execute_with_retry("populate-final-export-vet", sql)
results = utils.execute_with_retry(client, "populate-final-export-vet", sql)
return

def make_extract_table(fq_pet_vet_dataset,
-                       max_tables,
-                       sample_names_to_extract,
-                       fq_cohort_sample_names,
-                       query_project,
-                       query_labels,
-                       fq_temp_table_dataset,
-                       fq_destination_dataset,
-                       destination_table_prefix,
-                       min_variant_samples,
-                       fq_sample_mapping_table,
-                       sa_key_path,
-                       temp_table_ttl_hours
-                       ):
+        max_tables,
+        sample_names_to_extract,
+        fq_cohort_sample_names,
+        query_project,
+        query_labels,
+        fq_temp_table_dataset,
+        fq_destination_dataset,
+        destination_table_prefix,
+        min_variant_samples,
+        fq_sample_mapping_table,
+        sa_key_path,
+        temp_table_ttl_hours
+    ):
try:
fq_destination_table_data = f"{fq_destination_dataset}.{destination_table_prefix}__DATA"
fq_destination_table_samples = f"{fq_destination_dataset}.{destination_table_prefix}__SAMPLES"
@@ -300,14 +270,14 @@ def make_extract_table(fq_pet_vet_dataset,

# query_labels is string that looks like 'key1=val1, key2=val2'
if query_labels is not None and len(query_labels) != 0:
-      for query_label in query_labels:
-        kv = query_label.split("=", 2)
-        key = kv[0].strip().lower()
-        value = kv[1].strip().lower()
-        query_labels_map[key] = value
+        for query_label in query_labels:
+            kv = query_label.split("=", 2)
+            key = kv[0].strip().lower()
+            value = kv[1].strip().lower()
+            query_labels_map[key] = value

-        if not (bool(re.match(r"[a-z0-9_-]+$", key)) & bool(re.match(r"[a-z0-9_-]+$", value))):
-          raise ValueError(f"label key or value did not pass validation--format should be 'key1=val1, key2=val2'")
+            if not (bool(re.match(r"[a-z0-9_-]+$", key)) & bool(re.match(r"[a-z0-9_-]+$", value))):
+                raise ValueError(f"label key or value did not pass validation--format should be 'key1=val1, key2=val2'")

#Default QueryJobConfig will be merged into job configs passed in
#but if a specific default config is being updated (eg labels), new config must be added
@@ -324,7 +294,7 @@ def make_extract_table(fq_pet_vet_dataset,
default_query_job_config=default_config)
else:
client = bigquery.Client(project=query_project,
-                             default_query_job_config=default_config)
+                                 default_query_job_config=default_config)

## TODO -- provide a cmdline arg to override this (so we can simulate smaller datasets)

@@ -341,9 +311,9 @@

# if we have a file of sample names, load it into a temporary table
if (sample_names_to_extract):
-      fq_sample_name_table = load_sample_names(sample_names_to_extract, fq_temp_table_dataset)
+        fq_sample_name_table = load_sample_names(sample_names_to_extract, fq_temp_table_dataset)
else:
-      fq_sample_name_table = fq_cohort_sample_names
+        fq_sample_name_table = fq_cohort_sample_names

# At this point one way or the other we have a table of sample names in BQ,
# join it to the sample_info table to drive the extract
@@ -387,15 +357,15 @@ def make_extract_table(fq_pet_vet_dataset,
args = parser.parse_args()

make_extract_table(args.fq_petvet_dataset,
-                   args.max_tables,
-                   args.sample_names_to_extract,
-                   args.fq_cohort_sample_names,
-                   args.query_project,
-                   args.query_labels,
-                   args.fq_temp_table_dataset,
-                   args.fq_destination_dataset,
-                   args.destination_cohort_table_prefix,
-                   args.min_variant_samples,
-                   args.fq_sample_mapping_table,
-                   args.sa_key_path,
-                   args.ttl)
+        args.max_tables,
+        args.sample_names_to_extract,
+        args.fq_cohort_sample_names,
+        args.query_project,
+        args.query_labels,
+        args.fq_temp_table_dataset,
+        args.fq_destination_dataset,
+        args.destination_cohort_table_prefix,
+        args.min_variant_samples,
+        args.fq_sample_mapping_table,
+        args.sa_key_path,
+        args.ttl)
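For reference, the label-parsing loop in make_extract_table() above expects repeated key=value arguments; a hypothetical run (values invented for illustration):

```python
# illustrative only: mirrors the --query_labels parsing/validation above
query_labels = ["Team=GVS", "Cost-Center=ops"]

query_labels_map = {}
for query_label in query_labels:
    kv = query_label.split("=", 2)
    key = kv[0].strip().lower()
    value = kv[1].strip().lower()
    query_labels_map[key] = value

print(query_labels_map)  # {'team': 'gvs', 'cost-center': 'ops'}
# a key or value with characters outside [a-z0-9_-], e.g. "bad key=x!",
# fails the re.match() checks and raises ValueError
```

These labels become BigQuery job labels, merged with the gvs_query_name label that the retry helper attaches per query.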
32 changes: 3 additions & 29 deletions scripts/variantstore/wdl/extract/populate_alt_allele_table.py
@@ -1,41 +1,15 @@
-import time
import os
+import argparse

from google.cloud import bigquery
from google.cloud.bigquery.job import QueryJobConfig
from google.oauth2 import service_account
from pathlib import Path

-import argparse
+import utils

client = None

-def execute_with_retry(label, sql):
-    retry_delay = [30, 60, 90] # 3 retries with incremental backoff
-    start = time.time()
-    while len(retry_delay) > 0:
-        try:
-            query_label = label.replace(" ","-").strip().lower()
-            existing_labels = client._default_query_job_config.labels
-            job_labels = existing_labels
-            job_labels["gvs_query_name"] = query_label
-            job_config = bigquery.QueryJobConfig(labels=job_labels)
-            query = client.query(sql, job_config=job_config)
-            print(f"STARTING - {label} (jobid: {query.job_id})")
-            results = query.result()
-            job = client.get_job(query.job_id)
-            mb_billed = int(0 if job.total_bytes_billed is None else job.total_bytes_billed)/(1024 * 1024)
-            print(f"COMPLETED ({time.time() - start} seconds, {3-len(retry_delay)} retries, {mb_billed} MBs) - {label}")
-
-            return results
-        except Exception as err:
-            # if there are no retries left... raise
-            if (len(retry_delay) == 0):
-                raise err
-            else:
-                t = retry_delay.pop(0)
-                print(f"Error {err} running query {label}, sleeping for {t}")
-                time.sleep(t)
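The copy removed here differed from the one removed in create_cohort_extract_data_table.py only in also reporting megabytes billed, which the utils.py sketch earlier folds in. With both copies gone, each script builds one BigQuery client and hands it to the shared helper; a hypothetical end-to-end use (project ID and query are placeholders):

```python
from google.cloud import bigquery

import utils  # the shared module this PR adds to the image

# placeholder project; the real scripts build the client in
# populate_alt_allele_table()/make_extract_table(), using
# service-account credentials when sa_key_path is given
client = bigquery.Client(project="my-project",
                         default_query_job_config=bigquery.QueryJobConfig(labels={}))

rows = utils.execute_with_retry(client, "example query", "SELECT 1 AS x")
print([row.x for row in rows])
```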

def populate_alt_allele_table(query_project, vet_table_name, fq_dataset, sa_key_path):
global client
@@ -63,7 +37,7 @@ def populate_alt_allele_table(query_project, vet_table_name, fq_dataset, sa_key_path):
position2 as (select * from {fq_vet_table} WHERE call_GT IN ('1/2', '1|2', '2/1', '2|1'))"""

sql = alt_allele_temp_function + query_with + alt_allele_positions
result = execute_with_retry(f"into alt allele from {vet_table_name}", sql)
result = utils.execute_with_retry(client, f"into alt allele from {vet_table_name}", sql)
return result

if __name__ == '__main__':