diff --git a/Utils/Dataflow/pyDKB/common/misc.py b/Utils/Dataflow/pyDKB/common/misc.py
index 08a7e8963..407241712 100644
--- a/Utils/Dataflow/pyDKB/common/misc.py
+++ b/Utils/Dataflow/pyDKB/common/misc.py
@@ -7,12 +7,16 @@
 import sys
 import inspect
 from datetime import datetime
+import importlib
 
 from types import logLevel
 
 # Datetime format for log messages
 DTFORMAT = '%Y-%m-%d %H:%M:%S'
 
+# Special value for `try_to_import()` to indicate failure
+NOT_IMPORTED = 'NOT IMPORTED VALUE'
+
 
 def log(message, level=logLevel.INFO, *args):
     """ Output log message with given log level.
@@ -56,3 +60,35 @@ def log(message, level=logLevel.INFO, *args):
         out_message += "\n(==) %s" % l
     out_message += "\n"
     sys.stderr.write(out_message)
+
+
+def try_to_import(modname, attrname=None):
+    """ Try to import specified module or attribute from a module.
+
+    If the module/attribute cannot be imported, the exception is caught
+    and a log message is produced.
+
+    :param modname: module name
+    :type modname: str
+    :param attrname: attribute name (optional)
+    :type attrname: str
+
+    :return: imported module, attribute (or submodule);
+             ``NOT_IMPORTED`` in case of failure.
+    :rtype: object
+    """
+    if attrname:
+        err_msg = "Failed to import '%s' from '%s'.\nDetails: " \
+                  % (attrname, modname)
+    else:
+        err_msg = "Failed to import module '%s'.\nDetails: " % (modname)
+
+    try:
+        result = importlib.import_module(modname)
+        if attrname:
+            result = getattr(result, attrname)
+    except Exception, err:
+        log(err_msg + str(err), logLevel.ERROR)
+        result = NOT_IMPORTED
+
+    return result
diff --git a/Utils/Dataflow/pyDKB/storages/__init__.py b/Utils/Dataflow/pyDKB/storages/__init__.py
new file mode 100644
index 000000000..33385497e
--- /dev/null
+++ b/Utils/Dataflow/pyDKB/storages/__init__.py
@@ -0,0 +1,73 @@
+"""
+pyDKB.storages
+"""
+
+import importlib
+
+
+import exceptions
+import client
+
+
+_scope = None
+
+
+def setScope(scope):
+    """ Set default scope to look for storages.
+
+    :param scope: scope name
+    :type scope: str
+    """
+    global _scope
+    _scope = getScope(scope)
+
+
+def getScope(scope):
+    """ Initialize storages scope for further usage.
+
+    :param scope: scope name
+    :type scope: str
+    """
+    try:
+        full_name = __name__ + "." + scope
+        scope = importlib.import_module(full_name)
+    except ImportError:
+        raise exceptions.StorageException("Scope not defined: '%s'" % scope)
+    return scope
+
+
+def getClient(name, scope=None):
+    """ Get client for a given storage.
+
+    :raise StorageException: failed to get client by given name and scope
+
+    :param name: storage name
+    :type name: str
+    :param scope: scope name. If not specified, default value set with
+                  `setScope()` is used
+    :type scope: str, NoneType
+
+    :return: storage client
+    :rtype: client.Client
+    """
+    if scope:
+        scope = getScope(scope)
+    else:
+        scope = _scope
+    if scope is None:
+        raise exceptions.StorageException("Storages scope not specified")
+    cur_scope = scope
+    for n in name.split('.'):
+        try:
+            new_scope = getattr(cur_scope, n, None)
+            if new_scope is None:
+                new_scope_name = cur_scope.__name__ + "." + n
+                new_scope = importlib.import_module(new_scope_name)
+            cur_scope = new_scope
+        except ImportError:
+            raise exceptions.StorageException("Storage not defined in scope "
+                                              "'%s': '%s'"
+                                              % (scope.__name__.split('.')[-1],
+                                                 name))
+    client = cur_scope.getClient()
+    return client
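
For reviewers: a minimal usage sketch of the helpers added above (illustrative only, not part of the patch):

    from pyDKB.common.misc import try_to_import, NOT_IMPORTED
    from pyDKB import storages

    # Optional dependency: returns the module (or attribute) on success,
    # NOT_IMPORTED on failure; the failure itself is already logged.
    loads = try_to_import('json', 'loads')
    if loads is NOT_IMPORTED:
        pass  # fall back or fail gracefully

    # Storage lookup: the scope maps to a pyDKB.storages submodule, and the
    # storage name to a module inside it that provides getClient().
    storages.setScope('atlas')
    rucio_client = storages.getClient('rucio')
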
diff --git a/Utils/Dataflow/pyDKB/storages/atlas/__init__.py b/Utils/Dataflow/pyDKB/storages/atlas/__init__.py
new file mode 100644
index 000000000..298900e21
--- /dev/null
+++ b/Utils/Dataflow/pyDKB/storages/atlas/__init__.py
@@ -0,0 +1,3 @@
+"""
+pyDKB.storages.atlas
+"""
diff --git a/Utils/Dataflow/pyDKB/storages/atlas/rucio.py b/Utils/Dataflow/pyDKB/storages/atlas/rucio.py
new file mode 100644
index 000000000..df450820f
--- /dev/null
+++ b/Utils/Dataflow/pyDKB/storages/atlas/rucio.py
@@ -0,0 +1,111 @@
+"""
+pyDKB.storages.atlas.rucio
+"""
+
+import os
+
+from ..client import Client
+from ..exceptions import (StorageException, NotFound)
+from pyDKB.common.misc import (log, logLevel)
+from pyDKB.common.misc import (try_to_import, NOT_IMPORTED)
+
+
+if not os.environ.get("VIRTUAL_ENV", None):
+    base_dir = os.path.abspath(os.path.dirname(__file__))
+    user_rucio_dir = os.path.expanduser("~/.rucio")
+    if os.path.exists(user_rucio_dir):
+        os.environ["VIRTUAL_ENV"] = os.path.join(user_rucio_dir)
+    else:
+        os.environ["VIRTUAL_ENV"] = os.path.join(base_dir, ".rucio")
+    log("Set VIRTUAL_ENV: %s" % os.environ["VIRTUAL_ENV"], logLevel.INFO)
+
+_RucioClient = try_to_import('rucio.client', 'Client')
+RucioException = try_to_import('rucio.common.exception', 'RucioException')
+
+
+_client = None
+
+
+def _initClient():
+    """ Initialize client. """
+    global _client
+    if _RucioClient is NOT_IMPORTED:
+        raise StorageException("Failed to initialize Rucio client: required "
+                               "module(s) not loaded.")
+    _client = RucioClient()
+
+
+def getClient():
+    """ Get Rucio client. """
+    if not _client:
+        _initClient()
+    return _client
+
+
+ParentClientClass = _RucioClient if _RucioClient is not NOT_IMPORTED else object
+
+
+class RucioClient(Client, ParentClientClass):
+    """ Implement common interface for Rucio client. """
+
+    def __init__(self, *args, **kwargs):
+        """ Initialize instance as parent client class object. """
+        ParentClientClass.__init__(self, *args, **kwargs)
+
+    def get(self, oid, **kwargs):
+        """ Get dataset metadata.
+
+        Implementation of interface method `Client.get()`.
+
+        :param oid: dataset name
+        :type oid: str
+        :param fields: list of requested metadata fields
+                       (None = all metadata)
+        :type fields: list
+
+        :return: dataset metadata
+        :rtype: dict
+        """
+        scope, name = self._scope_and_name(oid)
+        try:
+            result = self.get_metadata(scope=scope, name=name)
+        except ValueError, err:
+            raise StorageException("Failed to get metadata from Rucio: %s"
+                                   % err)
+        except RucioException, err:
+            if 'Data identifier not found' in str(err):
+                raise NotFound(scope=scope, name=name)
+            raise StorageException("Failed to get metadata from Rucio: %s"
+                                   % err)
+        if kwargs.get('fields') is not None:
+            result = {f: result.get(f, None) for f in kwargs['fields']}
+        return result
+
+    def _scope_and_name(self, dsn):
+        """ Construct normalized scope and dataset name.
+
+        Accepts dataset names in one of two forms:
+        * dot-separated string: "<field1>.<field2>[.<...>]";
+        * the same with a scope prefix: "<scope>:<field1>.<field2>[.<...>]".
+
+        In the first case the ID itself is the canonical dataset name, and
+        the scope is set to its first field (or its first two fields, if
+        the ID starts with 'user' or 'group').
+        In the second case the prefix is taken as the scope and removed
+        from the ID to get the canonical dataset name.
+
+        :param dsn: dataset name
+        :type dsn: str
+
+        :return: scope, dataset name
+        :rtype: tuple
+        """
+        result = dsn.split(':')
+        if len(result) < 2:
+            splitted = dsn.split('.')
+            if dsn.startswith('user') or dsn.startswith('group'):
+                scope = '.'.join(splitted[0:2])
+            else:
+                scope = splitted[0]
+            result = (scope, dsn)
+        return result
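
A sketch of the intended call pattern for the new Rucio client (the dataset names and metadata fields below are made up for illustration; a working Rucio installation is assumed):

    from pyDKB.storages.atlas import rucio

    client = rucio.getClient()

    # Prefixed form: the scope is taken from the 'scope:' prefix.
    meta = client.get('mc16_13TeV:mc16_13TeV.361108.recon.AOD',
                      fields=['bytes', 'events'])

    # Plain form: the scope is derived from the first field(s) of the name;
    # 'user.*' and 'group.*' names keep two fields as the scope.
    meta = client.get('user.jdoe.test.dataset')
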
diff --git a/Utils/Dataflow/pyDKB/storages/client/Client.py b/Utils/Dataflow/pyDKB/storages/client/Client.py
new file mode 100644
index 000000000..fd472919c
--- /dev/null
+++ b/Utils/Dataflow/pyDKB/storages/client/Client.py
@@ -0,0 +1,34 @@
+"""
+pyDKB.storages.client.Client
+"""
+
+from pyDKB.common import LoggableObject
+
+
+class Client(LoggableObject):
+    """ Interface class for external and internal DKB storage clients. """
+
+    def __init__(self):
+        """ Initialize Storage object. """
+        raise NotImplementedError
+
+    def configure(self, cfg):
+        """ Apply storage configuration (initialize client).
+
+        :param cfg: configuration parameters
+        :type cfg: dict
+        """
+        raise NotImplementedError
+
+    def get(self, oid, **kwargs):
+        """ Get object / record from storage by ID.
+
+        :raise NotFound: object / record is not found
+
+        :param oid: object / record identifier
+        :type oid: str, int
+
+        :return: record with given ID
+        :rtype: dict
+        """
+        raise NotImplementedError
diff --git a/Utils/Dataflow/pyDKB/storages/client/__init__.py b/Utils/Dataflow/pyDKB/storages/client/__init__.py
new file mode 100644
index 000000000..6fdcd4c5b
--- /dev/null
+++ b/Utils/Dataflow/pyDKB/storages/client/__init__.py
@@ -0,0 +1,5 @@
+"""
+pyDKB.storages.client
+"""
+
+from Client import Client
diff --git a/Utils/Dataflow/pyDKB/storages/client/es.py b/Utils/Dataflow/pyDKB/storages/client/es.py
new file mode 100644
index 000000000..44f63e23a
--- /dev/null
+++ b/Utils/Dataflow/pyDKB/storages/client/es.py
@@ -0,0 +1,75 @@
+"""
+pyDKB.storages.client.es
+"""
+
+from Client import Client
+from pyDKB.common.misc import (try_to_import, NOT_IMPORTED)
+from pyDKB.common.types import logLevel
+
+
+_ESClient = try_to_import('elasticsearch', 'Elasticsearch')
+
+ParentClientClass = _ESClient if _ESClient is not NOT_IMPORTED else object
+
+
+class ESClient(Client, ParentClientClass):
+    """ Implement common interface for ES client. """
+
+    index = None
+
+    def __init__(self, *args, **kwargs):
+        """ Initialize instance as parent client class object. """
+        ParentClientClass.__init__(self, *args, **kwargs)
+
+    def configure(self, cfg):
+        """ Apply configuration.
+
+        Configuration parameters:
+          hosts  (str) -- comma-separated list of 'host:port' records
+          host   (str) -- single host name or IP (ignored if hosts defined)
+          port   (str) -- host port (ignored if hosts defined)
+          index  (str) -- default index name
+          user   (str)
+          passwd (str)
+
+        :param cfg: configuration parameters
+        :type cfg: dict
+        """
+        kwargs = {}
+
+        hosts = None
+        host = {}
+        if cfg.get('hosts'):
+            hosts = [h.strip() for h in cfg['hosts'].split(',')]
+        if cfg.get('host'):
+            if cfg.get('hosts'):
+                self.log("Configuration parameter ignored: 'host' ('hosts' "
+                         "specified)")
+            else:
+                host['host'] = cfg['host']
+        if cfg.get('port'):
+            if cfg.get('hosts'):
+                self.log("Configuration parameter ignored: 'port' ('hosts' "
+                         "specified)")
+            else:
+                host['port'] = cfg['port']
+        if hosts or host:
+            kwargs['hosts'] = hosts if hosts else [host]
+
+        if cfg.get('user'):
+            auth = (cfg['user'], )
+            if cfg.get('passwd'):
+                auth += (cfg['passwd'], )
+            else:
+                self.log("Configuration parameter missing: 'passwd' ('user' "
+                         "specified)", logLevel.WARN)
+            kwargs['http_auth'] = auth
+        elif cfg.get('passwd'):
+            self.log("Configuration parameter ignored: 'passwd' ('user' "
+                     "not specified)")
+
+        if cfg.get('index'):
+            self.index = cfg['index']
+
+        # Re-initialize self as parent client class instance
+        ParentClientClass.__init__(self, **kwargs)
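
A configuration sketch for the new ES client (all values are illustrative; the elasticsearch package is assumed to be installed):

    from pyDKB.storages.client.es import ESClient

    cfg = {
        'hosts': 'es-node1:9200, es-node2:9200',  # takes precedence over host/port
        'index': 'dkb_index',
        'user': 'dkb',
        'passwd': 'secret',
    }

    client = ESClient()
    client.configure(cfg)
    # configure() re-initializes the instance as an Elasticsearch client
    # with hosts=['es-node1:9200', 'es-node2:9200'] and http_auth set.
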
diff --git a/Utils/Dataflow/pyDKB/storages/exceptions.py b/Utils/Dataflow/pyDKB/storages/exceptions.py
new file mode 100644
index 000000000..44392d26a
--- /dev/null
+++ b/Utils/Dataflow/pyDKB/storages/exceptions.py
@@ -0,0 +1,25 @@
+"""
+pyDKB.storages.exceptions
+"""
+
+
+class StorageException(Exception):
+    """ Base exception for all storage-related exceptions. """
+    pass
+
+
+class NotFound(StorageException):
+    """ Exception indicating that record with given ID is not found. """
+
+    def __init__(self, **kwargs):
+        """ Initialize exception.
+
+        :param kwargs: record primary key parameters
+        :type kwargs: dict
+        """
+        message = "Record not found"
+        if kwargs:
+            params = [': '.join((key, '%r' % kwargs[key])) for key in kwargs]
+            params = ', '.join(params)
+            message = message + ' (%s)' % params
+        super(NotFound, self).__init__(message)
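
For reference, how the NotFound message is expected to look (values illustrative; the order of kwargs in the message is not guaranteed):

    from pyDKB.storages.exceptions import NotFound, StorageException

    try:
        raise NotFound(scope='mc16_13TeV', name='some.dataset.name')
    except StorageException, err:
        # e.g. "Record not found (scope: 'mc16_13TeV', name: 'some.dataset.name')"
        print str(err)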