[DC-3780] Accept EHR files as Parquet #1860

Merged: 11 commits, Jun 28, 2024
11 changes: 8 additions & 3 deletions data_steward/common.py
@@ -9,6 +9,7 @@
from constants.validation.participants.identity_match import REPORT_DIRECTORY_REGEX

# AOU required PII tables
PII = 'pii'
PII_WILDCARD = 'pii*'
PII_NAME = 'pii_name'
PII_EMAIL = 'pii_email'
@@ -102,10 +103,14 @@
AOU_DEATH = 'aou_death'
AOU_CUSTOM_TABLES = [AOU_DEATH]

AOU_REQUIRED_FILES = [f'{table}.csv' for table in AOU_REQUIRED]
PII_FILES = [f'{table}.csv' for table in PII_TABLES]
AOU_REQUIRED_CSV_FILES = [f'{table}.csv' for table in AOU_REQUIRED]
PII_CSV_FILES = [f'{table}.csv' for table in PII_TABLES]
AOU_REQUIRED_JSONL_FILES = [f'{table}.jsonl' for table in AOU_REQUIRED]
PII_JSONL_FILES = [f'{table}.jsonl' for table in PII_TABLES]
NOTE_JSONL = 'note.jsonl'
SUBMISSION_FILES = AOU_REQUIRED_FILES + PII_FILES
AOU_REQUIRED_PARQUET_FILES = [f'{table}.parquet' for table in AOU_REQUIRED]
PII_PARQUET_FILES = [f'{table}.parquet' for table in PII_TABLES]
SUBMISSION_CSV_FILES = AOU_REQUIRED_CSV_FILES + PII_CSV_FILES
RESULTS_HTML = 'results.html'
PROCESSED_TXT = 'processed.txt'
LOG_JSON = 'log.json'
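As context for the renamed constants above: each accepted format now gets its own expected file list, built from the same table names. A minimal sketch with stand-in table names (the real AOU_REQUIRED and PII_TABLES are defined elsewhere in common.py):

```python
# Stand-in table lists; the real ones live in common.py.
AOU_REQUIRED = ['person', 'measurement']
PII_TABLES = ['pii_name', 'pii_email']

EXPECTED_FILES = {
    ext: [f'{table}.{ext}' for table in AOU_REQUIRED + PII_TABLES]
    for ext in ('csv', 'jsonl', 'parquet')
}

print(EXPECTED_FILES['parquet'])
# ['person.parquet', 'measurement.parquet', 'pii_name.parquet', 'pii_email.parquet']
```

Note that only SUBMISSION_CSV_FILES is defined for result reporting; this change adds no combined submission list for the JSONL or Parquet variants.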
1 change: 1 addition & 0 deletions data_steward/gcloud/__init__.py
@@ -0,0 +1 @@
# coding=utf-8
1 change: 1 addition & 0 deletions data_steward/resources.py
@@ -468,6 +468,7 @@ def hash_dir(in_dir):
ACHILLES_INDEX_FILES = achilles_index_files()
CDM_CSV_FILES = [f'{table}.csv' for table in CDM_TABLES]
CDM_JSONL_FILES = [f'{table}.jsonl' for table in CDM_TABLES]
CDM_PARQUET_FILES = [f'{table}.parquet' for table in CDM_TABLES]
ALL_ACHILLES_INDEX_FILES = [
name.split(resource_files_path + os.sep)[1].strip()
for name in ACHILLES_INDEX_FILES
6 changes: 3 additions & 3 deletions data_steward/retraction/retract_data_gcs.py
@@ -155,8 +155,8 @@ def retract(pids, bucket, found_files, folder_prefix, force_flag):
"""
for file_name in found_files:
table_name, extension = file_name.split(".")
# TODO retract from JSONL files
if extension.upper() in ['JSON', 'JSONL']:
# TODO retract from JSONL and PARQUET files
if extension.upper() in ['JSON', 'JSONL', 'PARQUET']:
continue
lines_removed = 0
file_gcs_path = f'{bucket.name}/{folder_prefix}{file_name}'
@@ -226,7 +226,7 @@ def retract(pids, bucket, found_files, folder_prefix, force_flag):
f"Not updating file {file_gcs_path} since pids {pids} not found"
)
elif response.lower() == "n":
logging.info(f"Skipping file {file_gcs_path}")
logging.info(f"Skipping file {file_gcs_path} due to user input")
return
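The TODO above still defers row-level retraction for JSONL and Parquet submissions; only CSV files are rewritten. Purely as an illustration of what a Parquet path could later look like (not something this PR implements), here is a sketch using pyarrow on a locally downloaded copy, assuming the file has a person_id column:

```python
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq


def retract_pids_from_parquet(local_path, pids):
    """Drop rows whose person_id is in pids; return the number of rows removed."""
    table = pq.read_table(local_path)
    keep_mask = pc.invert(pc.is_in(table['person_id'], value_set=pa.array(pids)))
    kept = table.filter(keep_mask)
    pq.write_table(kept, local_path)  # overwrite the local copy in place
    return table.num_rows - kept.num_rows
```

The rewritten file would still need to be uploaded back to the bucket, mirroring what the CSV branch of retract() does.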


126 changes: 93 additions & 33 deletions data_steward/validation/main.py
@@ -35,7 +35,9 @@
from gcloud.bq import BigQueryClient
from gcloud.gcs import StorageClient
import resources
from common import ACHILLES_EXPORT_PREFIX_STRING, ACHILLES_EXPORT_DATASOURCES_JSON, BIGQUERY_DATASET_ID, UNIONED_DATASET_ID
from common import (ACHILLES_EXPORT_PREFIX_STRING,
ACHILLES_EXPORT_DATASOURCES_JSON, BIGQUERY_DATASET_ID,
UNIONED_DATASET_ID)
from constants.validation import hpo_report as report_consts
from constants.validation import main as consts
from retraction import retract_data_bq, retract_data_gcs
@@ -274,36 +276,77 @@ def validate_submission(hpo_id: str, bucket, folder_items: list,

# Create all tables first to simplify downstream processes
# (e.g. ehr_union doesn't have to check if tables exist)
for file_name in resources.CDM_CSV_FILES + common.PII_FILES:
expected_tables = []
for file_name in resources.CDM_CSV_FILES + common.PII_CSV_FILES:
table_name = file_name.split('.')[0]
table_id = resources.get_table_id(table_name, hpo_id=hpo_id)
expected_tables.append(table_id)
bq_utils.create_standard_table(table_name, table_id, drop_existing=True)

for cdm_file_name in sorted(resources.CDM_CSV_FILES):
file_results, file_errors = perform_validation_on_file(
cdm_file_name, found_cdm_files, hpo_id, folder_prefix, bucket)
results.extend(file_results)
errors.extend(file_errors)
found_csv_files, found_parquet_files, found_jsonl_files = [], [], []
found_csv_tables, found_parquet_tables, found_jsonl_tables = [], [], []
for found_file in found_cdm_files + found_pii_files:
if found_file.endswith('.csv'):
found_csv_files.append(found_file)
found_csv_tables.append(found_file.split('.')[0])
elif found_file.endswith('.parquet'):
# Read split files named cdm_table-part-001.parquet as cdm_table.parquet
found_parquet_files.append(
f"{found_file.split('.')[0].split('-')[0]}.parquet")
found_parquet_tables.append(found_file.split('.')[0].split('-')[0])
elif found_file.endswith('.jsonl'):
found_jsonl_files.append(found_file)
found_jsonl_tables.append(found_file.split('.')[0])
else:
logging.info(f"Ignoring unexpected file type: {found_file}")

# Remove any csv files that have corresponding parquet or jsonl files
found_csv_files = [
csv_file for csv_file in found_csv_files
if csv_file.split('.')[0] in list(
set(found_csv_tables) - set(found_jsonl_tables) -
set(found_parquet_tables))
]

# TODO use sorted(resources.CDM_JSONL_FILES) in the future
for cdm_file_name in [f'{common.NOTE}.jsonl']:
file_results, file_errors = perform_validation_on_file(
cdm_file_name, found_cdm_files, hpo_id, folder_prefix, bucket)
# If JSONL file found, remove note.csv and add note.jsonl instead
if file_results[0][1]:
results = [
result for result in results
if not result[0].startswith(f'{common.NOTE}')
]
# Remove any jsonl files that have corresponding parquet files
found_jsonl_files = [
jsonl_file for jsonl_file in found_jsonl_files
if jsonl_file.split('.')[0] in list(
set(found_jsonl_tables) - set(found_parquet_tables))
]

for cdm_file_name in sorted(resources.CDM_PARQUET_FILES) + sorted(
common.PII_PARQUET_FILES):
if cdm_file_name in found_parquet_files:
file_results, file_errors = perform_validation_on_file(
cdm_file_name, found_parquet_files, hpo_id, folder_prefix,
bucket)
results.extend(file_results)
errors.extend(file_errors)

for pii_file_name in sorted(common.PII_FILES):
for cdm_file_name in sorted(resources.CDM_JSONL_FILES) + sorted(
common.PII_JSONL_FILES):
if cdm_file_name in found_jsonl_files:
file_results, file_errors = perform_validation_on_file(
cdm_file_name, found_jsonl_files, hpo_id, folder_prefix, bucket)
results.extend(file_results)
errors.extend(file_errors)

for cdm_file_name in sorted(resources.CDM_CSV_FILES) + sorted(
common.PII_CSV_FILES):
# Skip if table in list of found parquet/jsonl tables
if cdm_file_name.split(
'.')[0] in found_parquet_tables + found_jsonl_tables:
continue
file_results, file_errors = perform_validation_on_file(
pii_file_name, found_pii_files, hpo_id, folder_prefix, bucket)
cdm_file_name, found_csv_files, hpo_id, folder_prefix, bucket)
results.extend(file_results)
errors.extend(file_errors)

# Order results by file name and keep pii/participant separate
results.sort(key=lambda x: (x[0].startswith(
(common.PII, common.PARTICIPANT_MATCH)), x[0]))

# (filename, message) for each unknown file
warnings = [
(unknown_file, common.UNKNOWN_FILE) for unknown_file in unknown_files
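Taken together, the bookkeeping above gives each table exactly one source format: Parquet wins over JSONL, JSONL wins over CSV, and split files such as specimen-part-001.parquet collapse to their base table name. A compact restatement of that rule, as a sketch rather than the code the PR adds:

```python
PRECEDENCE = {'csv': 1, 'jsonl': 2, 'parquet': 3}


def choose_format_per_table(found_files):
    """Map each table name to the highest-precedence format found for it."""
    chosen = {}
    for name in found_files:
        base, _, ext = name.rpartition('.')
        if ext not in PRECEDENCE:
            continue  # unexpected file types are ignored, as in the diff
        table = base.split('-')[0]  # 'specimen-part-001' -> 'specimen'
        if PRECEDENCE[ext] > PRECEDENCE.get(chosen.get(table), 0):
            chosen[table] = ext
    return chosen


# choose_format_per_table(['note.csv', 'note.jsonl', 'person.csv',
#                          'specimen-part-001.parquet', 'specimen-part-002.parquet'])
# -> {'note': 'jsonl', 'person': 'csv', 'specimen': 'parquet'}
```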
@@ -699,7 +742,7 @@ def get_hpo_missing_pii_query(hpo_id):
def perform_validation_on_file(file_name: str, found_file_names: list,
hpo_id: str, folder_prefix, bucket):
"""
Attempts to load a csv file into BigQuery
Attempts to load a Parquet/JSONL/csv file into BigQuery

:param file_name: name of the file to validate
:param found_file_names: files found in the submission folder
@@ -714,8 +757,17 @@ def perform_validation_on_file(file_name: str, found_file_names: list,
results = []
found = parsed = loaded = 0
table_name, extension = file_name.split('.')
if extension.upper() == 'JSONL':
logging.info(f"Validating JSONL file '{file_name}' if found")
# remove any parts from the table name, assuming cdm_table-part-001.parquet
table_name = table_name.split('-')[0]

source_format_dict = {
'PARQUET': bigquery.SourceFormat.PARQUET,
'JSONL': bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
}

if extension.upper() in ['JSONL', 'PARQUET']:
logging.info(
f"Validating {extension.upper()} file '{file_name}' if found")
if file_name in found_file_names:
logging.info(f"Found file '{file_name}'")
found = 1
@@ -725,21 +777,22 @@ def perform_validation_on_file(file_name: str, found_file_names: list,

bq_client = BigQueryClient(app_id)

if table_name not in resources.CDM_TABLES:
if table_name not in resources.CDM_TABLES + common.PII_TABLES:
raise ValueError(f'{table_name} is not a valid table to load')

dataset_id: str = BIGQUERY_DATASET_ID

# Caution: the wildcard object path below also loads any note_nlp files into the note table if note_nlp is uploaded
# Works for now since note_nlp is not submitted by sites and we are not expecting it
gcs_object_path: str = (f'gs://{hpo_bucket.name}/'
f'{folder_prefix}'
f'{table_name}.{extension}')
f'{table_name}*.{extension}')
table_id = resources.get_table_id(table_name, hpo_id)
fq_table_id = f'{bq_client.project}.{dataset_id}.{table_id}'

job_config = bigquery.LoadJobConfig(
schema=bq_client.get_table_schema(table_name),
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
)
source_format=source_format_dict[extension.upper()])

load_job = bq_client.load_table_from_uri(
gcs_object_path,
@@ -799,7 +852,7 @@ def perform_validation_on_file(file_name: str, found_file_names: list,
logging.error(message)
raise InternalValidationError(message)

if file_name in common.SUBMISSION_FILES:
if file_name in common.SUBMISSION_CSV_FILES:
results.append((file_name, found, parsed, loaded))

return results, errors
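The load path above picks the BigQuery source format from the file extension and uses a wildcard object path so that split Parquet files land in a single load job. A condensed sketch of that flow with placeholder names; the real function also tracks the found/parsed/loaded flags returned in the results report:

```python
from google.cloud import bigquery

SOURCE_FORMATS = {
    'PARQUET': bigquery.SourceFormat.PARQUET,
    'JSONL': bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
}


def load_submission(bq_client, schema, bucket_name, folder_prefix, table_name,
                    extension, fq_table_id):
    # 'specimen*.parquet' matches specimen.parquet as well as
    # specimen-part-001.parquet, specimen-part-002.parquet, ...
    uri = f'gs://{bucket_name}/{folder_prefix}{table_name}*.{extension}'
    job_config = bigquery.LoadJobConfig(
        schema=schema, source_format=SOURCE_FORMATS[extension.upper()])
    load_job = bq_client.load_table_from_uri(uri, fq_table_id,
                                             job_config=job_config)
    return load_job.result()  # blocks until the load finishes or raises
```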
@@ -837,9 +890,11 @@ def list_submitted_bucket_items(folder_bucketitems):
utc_today = datetime.datetime.now(tz=None)

# If any required file missing, stop submission
folder_bucketitems_table_names = [
basename(file_name).split('.')[0] for file_name in folder_bucketitems
]
folder_bucketitems_table_names = list(
set([
basename(file_name).split('.')[0].split('-')[0]
for file_name in folder_bucketitems
]))

to_process_items = [
item for item in folder_bucketitems
@@ -963,12 +1018,17 @@ def _get_submission_folder(bucket, bucket_items, force_process=False):

def _is_cdm_file(gcs_file_name):
return gcs_file_name.lower(
) in resources.CDM_CSV_FILES or gcs_file_name.lower(
) in resources.CDM_JSONL_FILES
) in resources.CDM_CSV_FILES + resources.CDM_JSONL_FILES + resources.CDM_PARQUET_FILES or (
gcs_file_name.lower().startswith(tuple(resources.CDM_TABLES)) and
gcs_file_name.lower().endswith('parquet'))


def _is_pii_file(gcs_file_name):
return gcs_file_name.lower() in common.PII_FILES
return gcs_file_name.lower(
) in common.PII_CSV_FILES + common.PII_JSONL_FILES + common.PII_PARQUET_FILES or (
gcs_file_name.lower().startswith(
(common.PII, common.PARTICIPANT_MATCH)) and
gcs_file_name.lower().endswith('parquet'))


def _is_known_file(gcs_file_name):
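With the prefix and suffix checks above, any object whose name starts with a known table name and ends with 'parquet' is treated as part of the submission, which is what lets split part files through even though they are not listed in the *_PARQUET_FILES constants. A rough standalone restatement, with stand-in table lists:

```python
CDM_TABLES = ['person', 'specimen', 'note']   # stand-in for resources.CDM_TABLES
PII_PREFIXES = ('pii', 'participant_match')   # mirrors common.PII / PARTICIPANT_MATCH


def is_cdm_parquet(name):
    name = name.lower()
    return name.startswith(tuple(CDM_TABLES)) and name.endswith('parquet')


def is_pii_parquet(name):
    name = name.lower()
    return name.startswith(PII_PREFIXES) and name.endswith('parquet')


# is_cdm_parquet('specimen-part-002.parquet')  -> True
# is_pii_parquet('pii_phone_number.parquet')   -> True
```

Prefix matching is permissive: a Parquet object that merely begins with a table name is classified as a submission file here, and anything that does not normalize to an expected table.parquet name is skipped by the later validation loops rather than reported as an unknown file.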
(file header not shown)
@@ -59,8 +59,8 @@ def test_integration_five_person_data_retraction_skip(
mock_extract_pids.return_value = self.skip_pids
lines_to_remove = {}
expected_lines_post = {}
# Exclude note.jsonl until accounted for
for file_path in test_util.FIVE_PERSONS_FILES[:-1]:
# Exclude jsonl and parquet file until accounted for
for file_path in test_util.FIVE_PERSONS_FILES[:-4]:
# generate results files
file_name = file_path.split('/')[-1]
lines_to_remove[file_name] = 0
@@ -95,8 +95,8 @@ def test_integration_five_person_data_retraction_skip(
site_bucket=self.site_bucket)

total_lines_post = {}
# Exclude note.jsonl until accounted for
for file_path in test_util.FIVE_PERSONS_FILES[:-1]:
# Exclude jsonl and parquet file until accounted for
for file_path in test_util.FIVE_PERSONS_FILES[:-4]:
file_name = file_path.split('/')[-1]
blob = self.gcs_bucket.blob(f'{self.folder_prefix_1}{file_name}')

@@ -123,8 +123,8 @@ def test_integration_five_person_data_retraction(self, mock_extract_pids):
"""
mock_extract_pids.return_value = self.pids
expected_lines_post = {}
# Exclude note.jsonl until accounted for
for file_path in test_util.FIVE_PERSONS_FILES[:-1]:
# Exclude jsonl and parquet files until accounted for
for file_path in test_util.FIVE_PERSONS_FILES[:-4]:
# generate results files
file_name = file_path.split('/')[-1]
table_name = file_name.split('.')[0]
@@ -164,8 +164,8 @@ def test_integration_five_person_data_retraction(self, mock_extract_pids):
site_bucket=self.site_bucket)

total_lines_post = {}
# Exclude note.jsonl until accounted for
for file_path in test_util.FIVE_PERSONS_FILES[:-1]:
# Exclude jsonl and parquet file until accounted for
for file_path in test_util.FIVE_PERSONS_FILES[:-4]:
file_name = file_path.split('/')[-1]
blob = self.gcs_bucket.blob(f'{self.folder_prefix_1}{file_name}')
actual_result_contents = blob.download_as_string().split(b'\n')
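The [:-4] slices above assume the four non-CSV fixtures (note.jsonl plus the three Parquet files) sit at the end of FIVE_PERSONS_FILES. A hypothetical, position-independent alternative would filter on extension instead; shown only as a sketch, not a change made in this PR:

```python
# Only CSV fixtures currently take part in the row-level retraction checks.
RETRACTABLE_SUFFIXES = ('.csv',)


def retractable_fixtures(five_persons_files):
    return [f for f in five_persons_files if f.endswith(RETRACTABLE_SUFFIXES)]
```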
12 changes: 7 additions & 5 deletions tests/integration_tests/data_steward/validation/main_test.py
@@ -107,15 +107,17 @@ def _table_has_clustering(self, table_obj):
@mock.patch("gcloud.gcs.LOOKUP_TABLES_DATASET_ID", dataset_id)
def test_all_files_unparseable_output(self):
# TODO possible bug: if no pre-existing table, results in bq table not found error
for cdm_table in common.SUBMISSION_FILES:
for cdm_table in common.SUBMISSION_CSV_FILES:
cdm_blob = self.hpo_bucket.blob(f'{self.folder_prefix}{cdm_table}')
cdm_blob.upload_from_string('.\n .')

item_metadata: list = self.storage_client.get_bucket_items_metadata(
self.hpo_bucket)
folder_items: list = main.get_folder_items(item_metadata,
self.folder_prefix)
expected_results: list = [(f, 1, 0, 0) for f in common.SUBMISSION_FILES]
expected_results: list = [
(f, 1, 0, 0) for f in common.SUBMISSION_CSV_FILES
]
actual: list = main.validate_submission(self.hpo_id, self.hpo_bucket,
folder_items,
self.folder_prefix)
@@ -152,7 +154,7 @@ def test_validate_five_persons_success(self, mock_check_cron):
os.path.basename(f) for f in test_util.FIVE_PERSONS_FILES
]

for cdm_filename in common.SUBMISSION_FILES:
for cdm_filename in common.SUBMISSION_CSV_FILES:
if cdm_filename in test_file_names:
expected_result: tuple = (cdm_filename, 1, 1, 1)
test_filepath: str = os.path.join(test_util.FIVE_PERSONS_PATH,
@@ -185,7 +187,7 @@ def test_validate_five_persons_success(self, mock_check_cron):

def test_check_processed(self):

for fname in common.AOU_REQUIRED_FILES:
for fname in common.AOU_REQUIRED_CSV_FILES:
blob_name: str = f'{self.folder_prefix}{fname}'
test_blob = self.hpo_bucket.blob(blob_name)
test_blob.upload_from_string('\n')
@@ -285,7 +287,7 @@ def test_pii_files_loaded(self, mock_check_cron):
expected_results: list = [(r['file_name'], int(r['found']),
int(r['parsed']), int(r['loaded']))
for r in rs]
for f in common.SUBMISSION_FILES:
for f in common.SUBMISSION_CSV_FILES:
if f not in test_file_names:
expected_result: tuple = (f, 0, 0, 0)
expected_results.append(expected_result)
3 binary files not shown.
14 changes: 11 additions & 3 deletions tests/test_util.py
@@ -52,12 +52,20 @@
FIVE_PERSONS_PARTICIPANT_MATCH_CSV = os.path.join(FIVE_PERSONS_PATH,
'participant_match.csv')
FIVE_PERSONS_NOTE_JSONL = os.path.join(FIVE_PERSONS_PATH, 'note.jsonl')
FIVE_PERSONS_SPECIMEN_PARQUET_1 = os.path.join(FIVE_PERSONS_PATH,
'specimen-part-001.parquet')
FIVE_PERSONS_SPECIMEN_PARQUET_2 = os.path.join(FIVE_PERSONS_PATH,
'specimen-part-002.parquet')
FIVE_PERSONS_PII_PHONE_NUMBER_PARQUET = os.path.join(
FIVE_PERSONS_PATH, 'pii_phone_number.parquet')
FIVE_PERSONS_FILES = [
FIVE_PERSONS_PERSON_CSV, FIVE_PERSONS_VISIT_OCCURRENCE_CSV,
FIVE_PERSONS_CONDITION_OCCURRENCE_CSV,
FIVE_PERSONS_PROCEDURE_OCCURRENCE_CSV, FIVE_PERSONS_DRUG_EXPOSURE_CSV,
FIVE_PERSONS_MEASUREMENT_CSV, FIVE_PERSONS_PII_NAME_CSV,
FIVE_PERSONS_PARTICIPANT_MATCH_CSV, FIVE_PERSONS_NOTE_JSONL
FIVE_PERSONS_PARTICIPANT_MATCH_CSV, FIVE_PERSONS_NOTE_JSONL,
FIVE_PERSONS_SPECIMEN_PARQUET_1, FIVE_PERSONS_SPECIMEN_PARQUET_2,
FIVE_PERSONS_PII_PHONE_NUMBER_PARQUET
]
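The three Parquet entries added here point at binary fixtures, so their contents do not appear in the diff. For reference only, split fixtures of this shape could be generated with pyarrow from a CSV with the same schema; the helper below is hypothetical and not part of the repository:

```python
import pyarrow.csv as pv
import pyarrow.parquet as pq


def csv_to_split_parquet(csv_path, out_prefix, rows_per_part=3):
    """Write <out_prefix>-part-001.parquet, -part-002.parquet, ... from a CSV."""
    # Column types are inferred from the CSV and may need casting to match
    # the CDM schema before the files are used as fixtures.
    table = pv.read_csv(csv_path)
    for i in range(0, table.num_rows, rows_per_part):
        part_number = i // rows_per_part + 1
        pq.write_table(table.slice(i, rows_per_part),
                       f'{out_prefix}-part-{part_number:03d}.parquet')
```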

FIVE_PERSONS_SUCCESS_RESULT_CSV = os.path.join(
@@ -220,7 +228,7 @@ def get_table_counts(dataset_id, table_ids=None, where=''):
def normalize_field_payload(field):
"""
Standardize schema field payload so it is easy to compare in tests

:param field: a field from a table/query's schema
:return: the normalized field
"""
@@ -369,7 +377,7 @@ def setup_hpo_id_bucket_name_table(client, dataset_id):
def drop_hpo_id_bucket_name_table(client, dataset_id):
"""
Drops `hpo_id_bucket_name` table that `get_hpo_bucket()` looks up.

:param client: a BigQueryClient
:param dataset_id: dataset id where the lookup table is located
"""