Load destination support #15

Merged: 2 commits, Mar 13, 2024
85 changes: 76 additions & 9 deletions api/load_data.py
@@ -1,4 +1,6 @@
 from json import load
+import sys
+import os
 import glob
 import pymongo
 from pymongo.errors import DuplicateKeyError
@@ -28,7 +30,7 @@ def preprocess_checks(data: list) -> bool:
     '''
     for document in data:
         # check that the document has a valid formatted biomarker id
-        if not validate_id_format(document['biomarker_id']):
+        if not validate_id_format(document['biomarker_id']) or 'collision' not in document:
             return False
     return True

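For context, a minimal sketch of how the tightened check behaves. `validate_id_format` is defined elsewhere in the codebase, so it is stubbed below with an invented ID pattern; only the `'collision'` logic mirrors the diff:

```python
import re

def validate_id_format(biomarker_id: str) -> bool:
    # Stub for illustration only; the real validator lives outside this diff,
    # and the 'AA9999-9'-style pattern here is an assumption, not the project's rule.
    return bool(re.fullmatch(r'[A-Z]{2}\d{4}-\d+', biomarker_id))

def preprocess_checks(data: list) -> bool:
    # Mirrors the updated check: a document now fails preprocessing outright
    # if its 'collision' key is missing.
    for document in data:
        if not validate_id_format(document['biomarker_id']) or 'collision' not in document:
            return False
    return True

print(preprocess_checks([{'biomarker_id': 'AB1234-1', 'collision': 0}]))  # True
print(preprocess_checks([{'biomarker_id': 'AB1234-1'}]))                  # False: missing 'collision'
```

This replaces the old behavior (removed in the next hunk), where a missing `'collision'` key was only logged inside `process_data` and the subsequent `document.pop('collision')` would still raise a `KeyError`.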
@@ -62,9 +64,6 @@ def process_data(data: list, dbh, db_collection: str, collision_collection: str,
     collision_ops = []

     for idx, document in enumerate(data):
-        if 'collision' not in document:
-            print(f'No collision key found for entry {idx} in file {fp}.')
-            logging.error(f'No collision key found for entry {idx} in file {fp}.')
         collision_status = document.pop('collision')
         if collision_status == 0:
             bulk_ops.append(pymongo.InsertOne(document))
@@ -104,6 +103,53 @@ def process_data(data: list, dbh, db_collection: str, collision_collection: str,

     return True

+def unreviewed_process_data(data: list, dbh, collision_collection: str, fp: str) -> bool:
+    ''' Inserts the marked files directly into the unreviewed collection.
+
+    Parameters
+    ----------
+    data: list
+        The data to process.
+    dbh: pymongo.MongoClient
+        The database handle.
+    collision_collection: str
+        The name of the collision collection to load the data into.
+    fp: str
+        The filepath to the data file.
+
+    Returns
+    -------
+    bool
+        True if data was loaded, False otherwise.
+    '''
+    if not preprocess_checks(data):
+        logging.error(f'Preprocessing checks failed for file: \'{fp}\'.')
+        print(f'Preprocessing checks failed for file: \'{fp}\'.')
+        return False
+
+    bulk_ops = []
+    for idx, document in enumerate(data):
+        _ = document.pop('collision')
+        bulk_ops.append(pymongo.InsertOne(document))
+        if len(bulk_ops) >= BATCH_SIZE:
+            try:
+                dbh[collision_collection].bulk_write(bulk_ops, ordered = False)
+                bulk_ops = []
+            except Exception as e:
+                print(f'\nError during bulk ops write:\n\tFile: {fp}\n\tError: {e}.')
+                logging.error(f'\nError during bulk ops write:\n\tFile: {fp}\n\tError: {e}.')
+                return False
+
+    if bulk_ops:
+        try:
+            dbh[collision_collection].bulk_write(bulk_ops, ordered = False)
+        except Exception as e:
+            print(f'\nError during bulk ops write:\n\tFile: {fp}\n\tError: {e}.')
+            logging.error(f'\nError during bulk ops write:\n\tFile: {fp}\n\tError: {e}.')
+            return False
+
+    return True
+
 def load_id_collection(id_collection_data: list, dbh, id_collection: str) -> None:
     ''' Loads the id_collection.json file into the prod database.

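A hypothetical call site for the new function, assuming a local MongoDB instance. The connection string, database, and collection names are placeholders, and whether these sample documents pass `preprocess_checks` depends on the real `validate_id_format`:

```python
import pymongo

# Placeholder connection and names, for illustration only.
dbh = pymongo.MongoClient('mongodb://localhost:27017')['biomarkerdb']
docs = [
    {'biomarker_id': 'AB1234-1', 'collision': 1},
    {'biomarker_id': 'AB1234-2', 'collision': 2},
]
# 'collision' values are popped before insert; every document lands in the
# collision (unreviewed) collection regardless of its collision status.
if unreviewed_process_data(docs, dbh, 'biomarker_collision', fp='example.json'):
    print('Loaded into the unreviewed collection.')
```

Unlike `process_data`, the collision status here is discarded rather than used to route each document.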
@@ -170,6 +216,19 @@ def main():

     ### setup first run biomarker_id index
     misc_fns.setup_index(dbh, 'biomarker_id', db_collection, 'biomarker_id_1')
+
+    ### load the load map
+    load_map = misc_fns.load_json(f'{data_root_path}/generated/datamodel/new_data/current/load_map.json')
+    unreviewed_string = 'The following files are marked to be loaded into the unreviewed collection:'
+    if len(load_map['unreviewed']) > 0:
+        for file in load_map['unreviewed']:
+            unreviewed_string += f'\n\t{file}'
+    else:
+        unreviewed_string = 'All files are set to be loaded into the reviewed collection barring individual record collision.'
+    print(unreviewed_string)
+    if not misc_fns.get_user_confirmation():
+        print('Exiting...')
+        sys.exit(0)

     ### begin processing data
     data_release_glob_pattern = f'{data_root_path}/generated/datamodel/new_data/current/*.json'
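For reference, a plausible shape for `load_map.json`, inferred from the `load_map['unreviewed']` lookups in this diff; the file names are invented:

```json
{
    "unreviewed": [
        "example_source_a.json",
        "example_source_b.json"
    ]
}
```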
@@ -180,12 +239,20 @@ def main():

     for fp in glob.glob(data_release_glob_pattern):
         data = misc_fns.load_json(fp)
-        if process_data(data, dbh, db_collection, collision_collection, fp):
-            logging.info(f'Successfully loaded data for file: {fp}.')
-            print(f'Successfully loaded data for file: {fp}.')
+        if os.path.basename(fp) in load_map['unreviewed']:
+            if unreviewed_process_data(data, dbh, collision_collection, fp):
+                logging.info(f'Successfully loaded data into unreviewed collection for file: {fp}.')
+                print(f'Successfully loaded data into unreviewed collection for file: {fp}.')
+            else:
+                logging.error(f'Failed to load data for file: {fp}.')
+                print(f'Failed to load data for file: {fp}.')
         else:
-            logging.error(f'Failed to load data for file: {fp}.')
-            print(f'Failed to load data for file: {fp}.')
+            if process_data(data, dbh, db_collection, collision_collection, fp):
+                logging.info(f'Successfully loaded data for file: {fp}.')
+                print(f'Successfully loaded data for file: {fp}.')
+            else:
+                logging.error(f'Failed to load data for file: {fp}.')
+                print(f'Failed to load data for file: {fp}.')

     logging.info(f'Finished loading data for server: {server}. ---------------------')

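A standalone sketch of the routing rule introduced above; the paths and map contents are invented:

```python
import os

load_map = {'unreviewed': ['source_b.json']}  # hypothetical load map contents

for fp in ['/data/current/source_a.json', '/data/current/source_b.json']:
    # Routing matches on the file's basename, so the directory layout doesn't matter.
    destination = 'unreviewed' if os.path.basename(fp) in load_map['unreviewed'] else 'reviewed'
    print(f'{fp} -> {destination}')
# /data/current/source_a.json -> reviewed
# /data/current/source_b.json -> unreviewed
```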
19 changes: 18 additions & 1 deletion api/misc_functions.py
@@ -181,4 +181,21 @@ def clean_value(value: str) -> str:
         The cleaned value.
     '''
     value = re.sub(r'[^a-zA-Z0-9]', '', value).lower()
-    return value
+    return value
+
+def get_user_confirmation() -> bool:
+    ''' Prompts the user for a confirmation or denial.
+
+    Returns
+    ----------
+    bool
+        Whether the user confirmed or denied.
+    '''
+    while True:
+        user_input = input('Continue? (y/n)').strip().lower()
+        if user_input == 'y':
+            return True
+        elif user_input == 'n':
+            return False
+        else:
+            print("Please enter 'y' for yes or 'n' for no.")