From 4dd8a170c242f349ef817f835308d1010a872b54 Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Fri, 26 Apr 2019 22:58:36 +0200 Subject: [PATCH 1/4] pyDKB/common: rename `custom_readline` to `utils`. There can possibly be different functions, useful for some specific operations. It is possible to keep them all in the individual files, but it is not very comfortable to use (one have to type something like `from pyDKB.common.my_function_name import my_function_name`, or in `__init__` file have a list of imports like this: `from my_function_name import my_function_name`). So I think it will be fine to put them all to a common file ('utils' or 'misc' or whatever) and use like `from pyDKB.common.utils import my_function_name`). Looks a little better (for me, at least). --- Utils/Dataflow/pyDKB/common/__init__.py | 1 - Utils/Dataflow/pyDKB/common/{custom_readline.py => utils.py} | 4 +++- Utils/Dataflow/pyDKB/dataflow/stage/AbstractProcessorStage.py | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) rename Utils/Dataflow/pyDKB/common/{custom_readline.py => utils.py} (95%) diff --git a/Utils/Dataflow/pyDKB/common/__init__.py b/Utils/Dataflow/pyDKB/common/__init__.py index 024e2871f..84fad2747 100644 --- a/Utils/Dataflow/pyDKB/common/__init__.py +++ b/Utils/Dataflow/pyDKB/common/__init__.py @@ -5,5 +5,4 @@ from exceptions import * import hdfs import json_utils as json -from custom_readline import custom_readline from Type import Type diff --git a/Utils/Dataflow/pyDKB/common/custom_readline.py b/Utils/Dataflow/pyDKB/common/utils.py similarity index 95% rename from Utils/Dataflow/pyDKB/common/custom_readline.py rename to Utils/Dataflow/pyDKB/common/utils.py index 3a84d28af..4d942ffe3 100644 --- a/Utils/Dataflow/pyDKB/common/custom_readline.py +++ b/Utils/Dataflow/pyDKB/common/utils.py @@ -1,5 +1,7 @@ """ -Implementation of "readline"-like functionality for custom separator. +pyDKB.common.utils + +Miscellaneous useful functions. .. 
todo:: make import of ``fcntl`` (or of this module) optional to avoid errors when library is used under Windows. diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/AbstractProcessorStage.py b/Utils/Dataflow/pyDKB/dataflow/stage/AbstractProcessorStage.py index c9b254063..77b123513 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/AbstractProcessorStage.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/AbstractProcessorStage.py @@ -49,7 +49,7 @@ from . import Message from pyDKB.dataflow import DataflowException from pyDKB.common import hdfs -from pyDKB.common import custom_readline +from pyDKB.common.utils import custom_readline class AbstractProcessorStage(AbstractStage): From 2b4a50e35bdf5710962c8f9544830931ceb43b1c Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Fri, 26 Apr 2019 23:11:36 +0200 Subject: [PATCH 2/4] pyDKB/common: move `read_es_config()` function from stage to library. Function like this seem to be needed in multiple places, so why not to have it in the common library. --- .../091_datasetsRucio/datasets_processing.py | 29 +---------------- Utils/Dataflow/pyDKB/common/utils.py | 31 +++++++++++++++++++ 2 files changed, 32 insertions(+), 28 deletions(-) diff --git a/Utils/Dataflow/091_datasetsRucio/datasets_processing.py b/Utils/Dataflow/091_datasetsRucio/datasets_processing.py index 6990851b3..1f750dc1a 100755 --- a/Utils/Dataflow/091_datasetsRucio/datasets_processing.py +++ b/Utils/Dataflow/091_datasetsRucio/datasets_processing.py @@ -37,6 +37,7 @@ sys.path.append(dkb_dir) import pyDKB from pyDKB import storages + from pyDKB.common.utils import read_es_config except Exception, err: sys.stderr.write("(ERROR) Failed to import pyDKB library: %s\n" % err) sys.exit(1) @@ -103,34 +104,6 @@ def main(argv): sys.exit(exit_code) -def read_es_config(cfg_file): - """ Read ES configuration file. 
- - :param cfg_file: open file descriptor with ES access configuration - :type cfg_file: file descriptor - """ - keys = {'ES_HOST': 'host', - 'ES_PORT': 'port', - 'ES_USER': 'user', - 'ES_PASSWORD': '__passwd', - 'ES_INDEX': 'index' - } - cfg = {} - for line in cfg_file.readlines(): - if line.strip().startswith('#'): - continue - line = line.split('#')[0].strip() - if '=' not in line: - continue - key, val = line.split('=')[:2] - try: - cfg[keys[key]] = val - except KeyError: - sys.stderr.write("(WARN) Unknown configuration parameter: " - "'%s'.\n" % key) - return cfg - - def init_rucio_client(): """ Initialize global variable `rucio_client`. """ global rucio_client diff --git a/Utils/Dataflow/pyDKB/common/utils.py b/Utils/Dataflow/pyDKB/common/utils.py index 4d942ffe3..dc5aa0d37 100644 --- a/Utils/Dataflow/pyDKB/common/utils.py +++ b/Utils/Dataflow/pyDKB/common/utils.py @@ -54,3 +54,34 @@ def custom_readline(f, newline): pos = buf.index(newline) yield buf[:pos] buf = buf[pos + len(newline):] + + +def read_es_config(cfg_file): + """ Read ES configuration file. + + We have ES config in form of file with shell variables declaration, + but sometimes need to parse it in Python as well. + + :param cfg_file: open file descriptor with ES access configuration + :type cfg_file: file descriptor + """ + keys = {'ES_HOST': 'host', + 'ES_PORT': 'port', + 'ES_USER': 'user', + 'ES_PASSWORD': '__passwd', + 'ES_INDEX': 'index' + } + cfg = {} + for line in cfg_file.readlines(): + if line.strip().startswith('#'): + continue + line = line.split('#')[0].strip() + if '=' not in line: + continue + key, val = line.split('=')[:2] + try: + cfg[keys[key]] = val + except KeyError: + sys.stderr.write("(WARN) Unknown configuration parameter: " + "'%s'.\n" % key) + return cfg From a9d10acd59ab8418893b0af5f3b6161a13dc96c7 Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Sat, 27 Apr 2019 19:23:22 +0200 Subject: [PATCH 3/4] DF/095: allow stage to use DKB ES storage as second (backup) source. 
If `--es-config FILE` parameter is specified, use DKB ES storage as a backup metadata source in case that in primary source (AMI) information was removed. I am not sure under what circumstances information can be removed from AMI, so for now we just check if there are empty/missed fields in the data taken from AMI and then, if such fields found, check ES for (possibly) already known values. The problem is that there are quite a lot of "missed" values, so almost for every record we have to check both AMI and ES. Maybe there is a more delicate trigger, like "No data at all", or "if dataset property 'deleted' is set to True", or... Or maybe AMI just doesn't remove data at all?.. --- .../095_datasetInfoAMI/amiDatasets.py | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/Utils/Dataflow/095_datasetInfoAMI/amiDatasets.py b/Utils/Dataflow/095_datasetInfoAMI/amiDatasets.py index 9f90b5244..46a43935f 100755 --- a/Utils/Dataflow/095_datasetInfoAMI/amiDatasets.py +++ b/Utils/Dataflow/095_datasetInfoAMI/amiDatasets.py @@ -3,6 +3,8 @@ import re import sys import os +import argparse + try: import pyAMI.client import pyAMI.atlas.api as AtlasAPI @@ -17,6 +19,8 @@ dkb_dir = os.path.join(base_dir, os.pardir) sys.path.append(dkb_dir) import pyDKB + from pyDKB import storages + from pyDKB.common.utils import read_es_config except Exception, err: sys.stderr.write("(ERROR) Failed to import pyDKB library: %s\n" % err) sys.exit(1) @@ -39,10 +43,22 @@ def main(argv): stage.add_argument('--userkey', help='PEM key file', required=True) stage.add_argument('--usercert', help='PEM certificate file', required=True) + stage.add_argument('--es-config', action='store', + type=argparse.FileType('r'), + help=u'Use ES as a backup source for dataset info' + ' in order to save information even if it was' + ' removed from the original source', + nargs='?', + dest='es' + ) exit_code = 0 try: stage.parse_args(argv) + if stage.ARGS.es: + cfg = read_es_config(stage.ARGS.es) + 
stage.ARGS.es.close() + storages.create("ES", storages.storageType.ES, cfg) stage.process = process init_ami_client(stage.ARGS.userkey, stage.ARGS.usercert) stage.run() @@ -91,6 +107,7 @@ def process(stage, message): # or not set at all. if update or not formats: amiPhysValues(data) + fix_ds_info(data) stage.output(pyDKB.dataflow.messages.JSONMessage(data)) return True @@ -153,5 +170,42 @@ def remove_tid(dataset): return re.sub('_tid(.)+', '', dataset) +def fix_ds_info(data): + """ Fix dataset metadata with data from ES, if needed and possible. + + :param data: data + :type data: dict + + :return: None if update is not requested (ES client not configured) or + not possible (ES does not contain information of given dataset); + else -- True + :rtype: bool, NoneType + """ + try: + es = storages.get("ES") + except storages.StorageNotConfigured: + return None + mfields = [item['es'] for item in PHYS_VALUES] + update_required = False + for f in mfields: + if not data.get(f): + update_required = True + break + if update_required: + try: + r = es.get(data.get('datasetname'), mfields, + parent=data.get('_parent')) + except storages.exceptions.NotFound, err: + sys.stderr.write("(DEBUG) %s.\n" % err) + return None + for f in mfields: + if not data.get(f) and r.get(f) != data.get(f): + sys.stderr.write("(DEBUG) Update AMI info with data from ES:" + " %s = '%s' (was: '%s')\n" % (f, r.get(f), + data.get(f))) + data[f] = r[f] + return True + + if __name__ == '__main__': main(sys.argv[1:]) From 7d31da3709946e2b30868aff59fb0564d76d1127 Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Sun, 28 Apr 2019 15:06:10 +0200 Subject: [PATCH 4/4] DF/095: explicitly specify document type for ES `get()` request. In theory it should make the request slightly faster. 
--- Utils/Dataflow/095_datasetInfoAMI/amiDatasets.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Utils/Dataflow/095_datasetInfoAMI/amiDatasets.py b/Utils/Dataflow/095_datasetInfoAMI/amiDatasets.py index 46a43935f..02536f511 100755 --- a/Utils/Dataflow/095_datasetInfoAMI/amiDatasets.py +++ b/Utils/Dataflow/095_datasetInfoAMI/amiDatasets.py @@ -194,6 +194,7 @@ def fix_ds_info(data): if update_required: try: r = es.get(data.get('datasetname'), mfields, + doc_type='output_dataset', parent=data.get('_parent')) except storages.exceptions.NotFound, err: sys.stderr.write("(DEBUG) %s.\n" % err)