From d768ac9bb749d908914bc19ec501868a68413cdf Mon Sep 17 00:00:00 2001
From: Dakota Blair
Date: Tue, 27 Oct 2020 16:08:59 -0400
Subject: [PATCH] Fixed #56: add --dry-run command line parameter.

This commit adds the --dry-run parameter, which validates the input data
and prints an output summary even in the presence of errors. The --output
parameter is also introduced to allow the summary to be displayed either
as JSON or in a more user-friendly text format. The --debug parameter
repeats the full list of errors, when there are any, at the end of the
run. Errors are now printed to stderr and may be suppressed using shell
redirection.
---
 importers/djornl/parser.py                    | 171 ++++++++++++++----
 importers/test/test_djornl_parser.py          |  12 +-
 .../test/test_djornl_parser_integration.py    |   2 +-
 3 files changed, 142 insertions(+), 43 deletions(-)

diff --git a/importers/djornl/parser.py b/importers/djornl/parser.py
index 6607e0d2..10db0d6c 100644
--- a/importers/djornl/parser.py
+++ b/importers/djornl/parser.py
@@ -15,15 +15,19 @@
 RES_ROOT_DATA_PATH=/path/to/data/dir python -m importers.djornl.parser
 
 """
+import argparse
+import csv
 import json
-import requests
+import logging
 import os
-import csv
+import requests
 import yaml
 
 import importers.utils.config as config
 from relation_engine_server.utils.json_validation import run_validator, get_schema_validator
 
+LOGGER = logging.getLogger(__name__)
+
 
 class DJORNL_Parser(object):
 
@@ -48,7 +52,9 @@ def config(self, value):
 
     def _configure(self):
 
-        configuration = config.load_from_env(extra_required=['ROOT_DATA_PATH'])
+        configuration = config.load_from_env(
+            extra_required=['ROOT_DATA_PATH']
+        )
 
         # Collection name config
         configuration['node_name'] = 'djornl_node'
@@ -89,7 +95,9 @@ def _get_dataset_schema_dir(self):
 
         if not hasattr(self, '_dataset_schema_dir'):
             dir_path = os.path.dirname(os.path.realpath(__file__))
-            self._dataset_schema_dir = os.path.join(dir_path, '../', '../', 'spec', 'datasets', 'djornl')
+            self._dataset_schema_dir = os.path.join(
+                dir_path, '../', '../', 'spec', 'datasets', 'djornl'
+            )
 
         return self._dataset_schema_dir
 
@@ -131,7 +139,7 @@ def _get_file_reader(self, fd, file):
         '''Given a dict containing file information, instantiate the correct type of parser'''
 
         delimiter = '\t'
-        if 'file_format' in file and file['file_format'].lower() == 'csv' or file['path'].lower().endswith('.csv'):
+        if file.get('file_format', '').lower() == 'csv' or file['path'].lower().endswith('.csv'):
            delimiter = ','
         return csv.reader(fd, delimiter=delimiter)
 
@@ -176,8 +184,8 @@ def check_headers(self, headers, validator=None):
 
         :return header_errs: (dict) dict of header errors:
             'missing': required headers that are missing from the input
-            'invalid': additional headers that should not be in the input
-            'duplicate': duplicated headers (content would be overwritten)
+            'invalid': headers that should not be in the input
+            'duplicate': duplicated headers (data would be overwritten)
 
         If the list of headers supplied is valid--i.e.
it contains all the fields marked as required in the validator schema--or no validator has been supplied, the method @@ -207,7 +215,7 @@ def check_headers(self, headers, validator=None): if missing_headers: header_errs['missing'] = missing_headers - if 'additionalProperties' in validator.schema and validator.schema['additionalProperties'] is False: + if not validator.schema.get('additionalProperties', True): all_props = validator.schema['properties'].keys() extra_headers = [i for i in headers if i not in all_props] if extra_headers: @@ -266,13 +274,18 @@ def process_file(self, file, remap_fn, store_fn, err_list, validator=None): :param validator: (Validator) jsonschema validator object """ - print("Parsing " + file['data_type'] + " file " + file['file_path']) + LOGGER.info("Parsing " + file['data_type'] + " file " + file['file_path']) file_parser = self.parser_gen(file) + + def add_error(error): + LOGGER.error(error) + err_list.append(error) + try: (line_no, cols, err_str) = next(file_parser) except StopIteration: # no valid lines found in the file - err_list.append(f"{file['path']}: no header line found") + add_error(f"{file['path']}: no header line found") return header_errors = self.check_headers(cols, validator) @@ -284,7 +297,7 @@ def process_file(self, file, remap_fn, store_fn, err_list, validator=None): } for err_type in ['missing', 'invalid', 'duplicate']: if err_type in header_errors: - err_list.append( + add_error( f"{file['path']}: {err_str[err_type]} headers: " + ", ".join(sorted(header_errors[err_type])) ) @@ -295,7 +308,7 @@ def process_file(self, file, remap_fn, store_fn, err_list, validator=None): for (line_no, cols, err_str) in file_parser: # mismatch in number of cols if cols is None: - err_list.append(err_str) + add_error(err_str) continue # merge headers with cols to create an object @@ -308,7 +321,7 @@ def process_file(self, file, remap_fn, store_fn, err_list, validator=None): f"{file['path']} line {line_no}: " + e.message for e in sorted(validator.iter_errors(row_object), key=str) ) - err_list.append(err_msg) + add_error(err_msg) continue try: @@ -316,7 +329,7 @@ def process_file(self, file, remap_fn, store_fn, err_list, validator=None): datum = self.remap_object(row_object, remap_fn) except Exception as err: err_type = type(err) - err_list.append( + add_error( f"{file['path']} line {line_no}: error remapping data: {err_type} {err}" ) continue @@ -326,16 +339,16 @@ def process_file(self, file, remap_fn, store_fn, err_list, validator=None): if storage_error is None: n_stored += 1 else: - err_list.append(f"{file['path']} line {line_no}: " + storage_error) + add_error(f"{file['path']} line {line_no}: " + storage_error) if not n_stored: - err_list.append(f"{file['path']}: no valid data found") + add_error(f"{file['path']}: no valid data found") def store_parsed_edge_data(self, datum): """ store node and edge data in the node (node_ix) and edge (edge_ix) indexes respectively - - Nodes are indexed by the '_key' attribute. Parsed edge data only contains node '_key' values. + Nodes are indexed by the '_key' attribute. + Parsed edge data only contains node '_key' values. Edges are indexed by the unique combination of the two node IDs and the edge type. 
It is assumed that if there is more than one score for a given combination of node IDs and edge
@@ -380,7 +393,9 @@ def load_edges(self):
         # can do so because that key is in a 'required' property in the CSV spec file
         remap_functions = {
             # create a unique key for each record
-            '_key': lambda row: '__'.join([row[_] for _ in ['node1', 'node2', 'edge_type', 'score']]),
+            '_key': lambda row: '__'.join(
+                [row[_] for _ in ['node1', 'node2', 'edge_type', 'score']]
+            ),
             'node1': None,  # this will be deleted in the 'store' step
             'node2': None,  # as will this
             '_from': lambda row: node_name + '/' + row['node1'],
@@ -399,8 +414,8 @@
         )
 
         return {
-            'nodes': self.node_ix.values(),
-            'edges': self.edge_ix.values(),
+            'nodes': list(self.node_ix.values()),
+            'edges': list(self.edge_ix.values()),
             'err_list': err_list,
         }
 
@@ -431,7 +446,10 @@ def _try_node_merge(self, existing_node, new_node, path=[]):
         merge = {**existing_node, **new_node}
 
         # find the shared keys -- keys in both existing and new nodes where the values differ
-        shared_keys = [i for i in new_node if i in existing_node and new_node[i] != existing_node[i]]
+        shared_keys = [
+            i for i in new_node
+            if i in existing_node and new_node[i] != existing_node[i]
+        ]
 
         # if there were no shared keys, return the merged list
         if not shared_keys:
@@ -589,7 +607,9 @@ def load_clusters(self):
 
         for file in self.config('cluster_files'):
             prefix = file['cluster_prefix']
-            remap_functions['cluster_id'] = lambda row: prefix + ':' + row['cluster_id'].replace('Cluster', '')
+            remap_functions['cluster_id'] = (
+                lambda row: prefix + ':' + row['cluster_id'].replace('Cluster', '')
+            )
 
             self.process_file(
                 file=file,
@@ -646,22 +666,19 @@ def load_data(self, dry_run=False):
             if output['err_list']:
                 all_errs = all_errs + output['err_list']
 
-        if all_errs:
-            raise RuntimeError("\n".join(all_errs))
-
-        if dry_run:
-            # report stats on the data that has been gathered
-            return self.summarise_dataset()
+        # save the dataset, unless this is a dry run
+        if not dry_run:
+            self.save_dataset()
 
-        # otherwise, save the dataset
-        self.save_dataset()
+        # report stats on the data that has been gathered
+        return self.summarise_dataset(all_errs)
         return True
 
-    def summarise_dataset(self):
+    def summarise_dataset(self, errs):
         """summarise the data that has been loaded"""
         # go through the node index, checking for nodes that only have one attribute ('_key') or
-        # were loaded from the clusters files, with their only attributes being '_key' and 'clusters'
+        # were loaded from the clusters files, with only '_key' and 'clusters' attributes
         node_type_ix = {
             '__NO_TYPE__': 0
@@ -709,13 +726,95 @@
                 'cluster': len(node_data['cluster']),
                 'full': len(node_data['full'])
             },
+            'errors_total': len(errs),
+            'errors': errs
         }
 
 
-if __name__ == '__main__':
+def format_summary(summary, output):
+    if output == 'json':
+        return json.dumps(summary)
+    node_type_counts = [count for count in summary['node_type_count'].values()]
+    edge_type_counts = [count for count in summary['edge_type_count'].values()]
+    values = [
+        summary['nodes_total'],
+        summary['edges_total'],
+        summary['nodes_in_edge'],
+        summary['node_data_available']['key_only'],
+        summary['node_data_available']['cluster'],
+        summary['node_data_available']['full'],
+        summary.get('errors_total'),
+    ] + node_type_counts + edge_type_counts
+    value_width = max([len(str(value)) for value in values])
+    node_type_names = dict(__NO_TYPE__="No type")
+    node_types = "\n".join([(
+        f"{count:{value_width}} {node_type_names.get(ntype, ntype)}"
+        .format(value_width)
+    )
+        for ntype, count in summary['node_type_count'].items()
+    ])
+    edge_type_names = dict()
+    edge_types = "\n".join([(
+        f"{count:{value_width}} {edge_type_names.get(etype, etype)}"
+        .format(value_width)
+    )
+        for etype, count in summary['edge_type_count'].items()
+    ])
+    text_summary = f"""
+{summary['nodes_total']:{value_width}} Total nodes
+{summary['edges_total']:{value_width}} Total edges
+{summary['nodes_in_edge']:{value_width}} Nodes in edge
+---
+Node Types
+{node_types:{value_width}}
+---
+Edge Types
+{edge_types:{value_width}}
+---
+Node data available
+{summary['node_data_available']['key_only']:{value_width}} Key only
+{summary['node_data_available']['cluster']:{value_width}} Cluster
+{summary['node_data_available']['full']:{value_width}} Full
+---
+{summary.get('errors_total'):{value_width}} Errors
+""".format(value_width)
+    return text_summary
+
+
+def main():
+    argparser = argparse.ArgumentParser(description='Load DJORNL data')
+    argparser.add_argument(
+        '--debug', action='store_true',
+        help='Print errors in the summary; by default only their count is printed.'
+    )
+    argparser.add_argument(
+        '--dry-run', dest='dry', action='store_true',
+        help='Perform all actions of the parser, except loading the data.'
+    )
+    argparser.add_argument(
+        '--output', default='text',
+        help='Specify the format of any output generated (text or json).'
+    )
+    args = argparser.parse_args()
     parser = DJORNL_Parser()
+    summary = dict()
+    debug = args.debug
     try:
-        parser.load_data()
+        summary = parser.load_data(dry_run=args.dry)
     except Exception as err:
-        print(err)
+        print('Unhandled exception', err)
+        exit(1)
+    errors = summary.get('errors')
+    if summary:
+        print(format_summary(summary, args.output))
+    if errors:
+        punctuation = ':' if debug else '.'
+ error_output = f'Aborted with {len(errors)} errors{punctuation}\n' + if debug: + error_output += '\n'.join(errors) + raise RuntimeError(error_output) exit(1) + + +if __name__ == '__main__': + main() diff --git a/importers/test/test_djornl_parser.py b/importers/test/test_djornl_parser.py index 90a84967..34721096 100644 --- a/importers/test/test_djornl_parser.py +++ b/importers/test/test_djornl_parser.py @@ -56,11 +56,9 @@ def test_errors(self, parser=None, errs={}): with self.subTest(data_type="all types"): # test all errors - with self.assertRaisesRegex(RuntimeError, all_errs[0]) as cm: - parser.load_data() - exception = cm.exception - err_list = exception.split("\n") - self.assertEqual(err_list, all_errs) + summary = parser.load_data() + err_list = summary['errors'] + self.assertEqual(err_list, all_errs) def test_missing_required_env_var(self): '''test that the parser exits with code 1 if the RES_ROOT_DATA_PATH env var is not set''' @@ -303,7 +301,9 @@ def test_dry_run(self): 'node_data_available': {'cluster': 0, 'full': 14, 'key_only': 0}, 'node_type_count': {'__NO_TYPE__': 0, 'gene': 10, 'pheno': 4}, 'nodes_in_edge': 10, - 'nodes_total': 14 + 'nodes_total': 14, + 'errors_total': 0, + 'errors': [] }, output ) diff --git a/importers/test/test_djornl_parser_integration.py b/importers/test/test_djornl_parser_integration.py index e7e758e8..ed854ac3 100644 --- a/importers/test/test_djornl_parser_integration.py +++ b/importers/test/test_djornl_parser_integration.py @@ -24,4 +24,4 @@ def test_the_full_shebang(self): with modified_environ(RES_ROOT_DATA_PATH=os.path.join(_TEST_DIR, 'djornl', 'test_data')): parser = DJORNL_Parser() parser.load_data() - self.assertEqual(True, parser.load_data()) + self.assertTrue(bool(parser.load_data()))
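Example usage (a sketch based on the flags introduced above; the data directory is the
placeholder from the module docstring, and the redirection illustrates the stderr
behaviour described in the commit message):

    # validate the data and print a text summary without saving; hide per-line errors
    RES_ROOT_DATA_PATH=/path/to/data/dir python -m importers.djornl.parser --dry-run 2>/dev/null

    # emit the summary as JSON and repeat the full error list at the end of the run
    RES_ROOT_DATA_PATH=/path/to/data/dir python -m importers.djornl.parser --dry-run --output json --debug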