Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] pyDKB: storages #277

Draft
wants to merge 20 commits into
base: pyDKB-loggable-object
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
b014051
pyDKB: introduce pyDKB.storages submodule.
mgolosova Apr 11, 2019
fcce139
pyDKB/storages: add mechanics to get clients for known storages.
mgolosova Jun 26, 2019
d36a706
pyDKB/storages: add possibility to set default scope.
mgolosova Jun 26, 2019
c48b02b
pyDKB/storages: remove reference to client object from the interface …
mgolosova Jul 10, 2019
1e073dd
pyDKB/storages: add "bare" wrapper for `atlas.rucio` client.
mgolosova Jul 10, 2019
9dd2f5e
pyDKB/storages: codestyle fix (reserved word).
mgolosova Jul 10, 2019
5602857
pyDKB/storages: implement `RucioClient.get()` method.
mgolosova Jul 10, 2019
b8ddd92
pyDKB/storages: improve docstrings.
mgolosova Jul 11, 2019
ea6f358
pyDKB/storages: add "bare" interface for ES client.
mgolosova Jul 11, 2019
04a2721
pyDKB/storages: add implementation of `ESClient.configure()` method.
mgolosova Jul 11, 2019
b7df5cd
pyDKB/storages: bug fix (undefined variable).
mgolosova Aug 6, 2019
2f6bb07
pyDKB/storages: fix docstrings.
mgolosova Aug 6, 2019
5c32433
pyDKB/storages: typo fix.
mgolosova Aug 13, 2019
1d3c5cd
pyDKB/storages: update exception description.
mgolosova Sep 2, 2019
9111b75
pyDKB/storages: update docs to use Sphinx :raise XXX: markdown.
mgolosova Sep 2, 2019
37de231
pyDKB/storages: typo fix.
mgolosova Sep 16, 2019
6360f5d
pyDKB/misc: fix missed attribute handling in `try_to_import()`.
mgolosova Jan 9, 2020
5891f41
pyDKB/misc: improve error handling in `try_to_import()`.
mgolosova Jan 10, 2020
2f06b83
pyDKB/misc: add params and return value description to `try_to_import…
mgolosova Jan 10, 2020
10ac41a
pyDKB/misc: introduce special return vale for `try_to_import()`.
mgolosova Jan 10, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions Utils/Dataflow/pyDKB/common/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@
import sys
import inspect
from datetime import datetime
import importlib

from types import logLevel

# Datetime format for log messages
DTFORMAT = '%Y-%m-%d %H:%M:%S'

# Special value for `try_to_import()` to indicate failure
NOT_IMPORTED = 'NOT IMPORTED VALUE'


def log(message, level=logLevel.INFO, *args):
""" Output log message with given log level.
Expand Down Expand Up @@ -56,3 +60,35 @@ def log(message, level=logLevel.INFO, *args):
out_message += "\n(==) %s" % l
out_message += "\n"
sys.stderr.write(out_message)


def try_to_import(modname, attrname=None):
""" Try to import specified module or attribute from a module.

If module/attribute can not be imported, catch the exception and output log
message.

:param modname: module name
:type modname: str
:param attrname: attribute name (optional)
:type attrname: str

:return: imported module, attribute (or submodule);
``NOT_IMPORTED`` in case of failure.
:rtype: object
"""
if attrname:
err_msg = "Failed to import '%s' from '%s'.\nDetails: " \
% (attrname, modname)
else:
err_msg = "Failed to import module '%s'.\nDetails: " % (modname)

try:
result = importlib.import_module(modname)
if attrname:
result = getattr(result, attrname)
except Exception, err:
log(err_msg + str(err), logLevel.ERROR)
result = NOT_IMPORTED

return result
73 changes: 73 additions & 0 deletions Utils/Dataflow/pyDKB/storages/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""
pyDKB.storages
"""

import importlib


import exceptions
import client


_scope = None


def setScope(scope):
""" Set default scope to look for storages.

:param scope: scope name
:type scope: str
"""
global _scope
_scope = getScope(scope)


def getScope(scope):
""" Initialize storages scope for further usage.

:param scope: scope name
:type scope: str
"""
try:
full_name = __name__ + "." + scope
scope = importlib.import_module(full_name)
except ImportError:
raise exceptions.StorageException("Scope not defined: '%s'" % scope)
return scope


def getClient(name, scope=None):
""" Get client for a given storage.

:raise StorageException: failed to get client by given name and scope

:param name: storage name
:type name: str
:param scope: scope name. If not specified, default value set with
`setScope()` is used
:type scope: str, NoneType

:return: storage client
:rtype: client.Client
"""
if scope:
scope = getScope(scope)
else:
scope = _scope
if scope is None:
raise exceptions.StorageException("Storages scope not specified")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise exceptions.StorageException("Storages scope not specified")
raise exceptions.StorageException("Storage's scope not specified")

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not a mistake (or, better to say, it is a different mistake): the message says that scope of storages, from which should be picked one (or maybe "storage lookup scope"?), is not specified. "Storage scope" is more like a scope of a single storage -- which wouldn`t have much sense in this situation.

Will be changed, thank you!

cur_scope = scope
for n in name.split('.'):
try:
new_scope = getattr(cur_scope, n, None)
if new_scope is None:
new_scope_name = cur_scope.__name__ + "." + n
new_scope = importlib.import_module(new_scope_name)
cur_scope = new_scope
except ImportError:
raise exceptions.StorageException("Storage not defined in scope "
"'%s': '%s'"
% (scope.__name__.split('.')[-1],
name))
client = cur_scope.getClient()
return client
3 changes: 3 additions & 0 deletions Utils/Dataflow/pyDKB/storages/atlas/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""
pyDKB.storages.atlas
"""
111 changes: 111 additions & 0 deletions Utils/Dataflow/pyDKB/storages/atlas/rucio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""
pyDKB.storages.atlas.rucio
"""

import os

from ..client import Client
from ..exceptions import (StorageException, NotFound)
from pyDKB.common.misc import (log, logLevel)
from pyDKB.common.misc import (try_to_import, NOT_IMPORTED)


if not os.environ.get("VIRTUAL_ENV", None):
base_dir = os.path.abspath(os.path.dirname(__file__))
user_rucio_dir = os.path.expanduser("~/.rucio")
if os.path.exists(user_rucio_dir):
os.environ["VIRTUAL_ENV"] = os.path.join(user_rucio_dir)
else:
os.environ["VIRTUAL_ENV"] = os.path.join(base_dir, ".rucio")
Evildoor marked this conversation as resolved.
Show resolved Hide resolved
log("Set VIRTUAL_ENV: %s" % os.environ["VIRTUAL_ENV"], logLevel.INFO)

_RucioClient = try_to_import('rucio.client', 'Client')
RucioException = try_to_import('rucio.common.exception', 'RucioException')


_client = None


def _initClient():
""" Initialize client. """
global _client
if _RucioClient is NOT_IMPORTED:
raise StorageException("Failed to initialize Rucio client: required "
"module(s) not loaded.")
_client = RucioClient()


def getClient():
""" Get Rucio client. """
if not _client:
_initClient()
return _client


ParentClientClass = _RucioClient if _RucioClient else object


class RucioClient(Client, ParentClientClass):
""" Implement common interface for Rucio client. """

def __init__(self, *args, **kwargs):
""" Initialize instance as parent client class object. """
ParentClientClass.__init__(self, *args, **kwargs)

def get(self, oid, **kwargs):
""" Get dataset metadata.

Implementation of interface method `Client.get()`.

:param oid: dataset name
:type oid: str
:param fields: list of requested metadata fields
(None = all metadata)
:type fields: list

:return: dataset metadata
:rtype: dict
"""
scope, name = self._scope_and_name(oid)
try:
result = self.get_metadata(scope=scope, name=name)
except ValueError, err:
raise StorageException("Failed to get metadata from Rucio: %s"
% err)
except RucioException, err:
if 'Data identifier not found' in str(err):
raise NotFound(scope=scope, name=name)
raise StorageException("Failed to get metadata from Rucio: %s"
% err)
if kwargs.get('fields') is not None:
result = {f: result.get(f, None) for f in kwargs['fields']}
return result

def _scope_and_name(self, dsn):
""" Construct normalized scope and dataset name.

As input accepts dataset names in two forms:
* dot-separated string: "<XXX>.<YYY>[.<...>]";
* dot-separated string with prefix: "<scope>:<XXX>.<YYY>[.<...>]".

In first case ID is taken as a canonical dataset name and scope is set
to its first field (or two first fields, if the ID starts with 'user'
or 'group').
In second case prefix is taken as scope, and removed from ID to get the
canonical dataset name.

:param dsn: dataset name
:type dsn: str

:return: scope, datasetname
:rtype: tuple
"""
result = dsn.split(':')
if len(result) < 2:
splitted = dsn.split('.')
if dsn.startswith('user') or dsn.startswith('group'):
scope = '.'.join(splitted[0:2])
else:
scope = splitted[0]
result = (scope, dsn)
return result
34 changes: 34 additions & 0 deletions Utils/Dataflow/pyDKB/storages/client/Client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""
pyDKB.storages.client.Client
"""

from pyDKB.common import LoggableObject


class Client(LoggableObject):
""" Interface class for external and internal DKB storage clients. """

def __init__(self):
""" Initialize Storage object. """
raise NotImplementedError

def configure(self, cfg):
""" Apply storage configuration (initialize client).

:param cfg: configuration parameters
:type cfg: dict
"""
raise NotImplementedError

def get(self, oid, **kwargs):
""" Get object / record from storage by ID.

:raise NotFound: object / record is not found

:param oid: object / record identifier
:type oid: str, int

:return: record with given ID
:rtype: dict
"""
raise NotImplementedError
5 changes: 5 additions & 0 deletions Utils/Dataflow/pyDKB/storages/client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""
pyDKB.storages.client
"""

from Client import Client
75 changes: 75 additions & 0 deletions Utils/Dataflow/pyDKB/storages/client/es.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""
pyDKB.storages.client.es
"""

from Client import Client
from pyDKB.common.misc import (try_to_import, NOT_IMPORTED)
from pyDKB.common.types import logLevel


_ESClient = try_to_import('elasticsearch', 'Elasticsearch')

ParentClientClass = _ESClient if _ESClient is not NOT_IMPORTED else object


class ESClient(Client, ParentClientClass):
""" Implement common interface for ES client. """

index = None

def __init__(self, *args, **kwargs):
""" Initialize instance as parent client class object. """
ParentClientClass.__init__(self, *args, **kwargs)

def configure(self, cfg):
""" Apply configuration.

Configuration parameters:
hosts (str) -- comma separated list of 'host:port' records
host (str) -- host name or IP (single) (ignored if hosts defined)
port (str) -- host port (ignored if hosts defined)
index (str) -- default index name
user (str)
passwd (str)

:param cfg: configuration parameters
:type cfg: dict
"""
kwargs = {}

hosts = None
host = {}
if cfg.get('hosts'):
hosts = [h.strip() for h in cfg['hosts'].split(',')]
if cfg.get('host'):
if cfg.get('hosts'):
self.log("Configuration parameter ignored: 'host' ('hosts' "
"specified)")
else:
host['host'] = cfg['host']
if cfg.get('port'):
if cfg.get('hosts'):
self.log("Configuration parameter ignored: 'port' ('hosts' "
"specified)")
else:
host['port'] = cfg['port']
if hosts or host:
kwargs['hosts'] = hosts if hosts else [host]

if cfg.get('user'):
auth = (cfg['user'], )
if cfg.get('passwd'):
auth += (cfg['passwd'], )
else:
self.log("Configuration parameter missed: 'passwd' ('user' "
"specified)", logLevel.WARN)
kwargs['http_auth'] = auth
elif cfg.get('passwd'):
self.log("Configuration parameter ignored: 'passwd' ('user' "
"not specified)")

if cfg.get('index'):
self.index = cfg['index']

# Re-initialize self as parent client class instance
ParentClientClass.__init__(self, **kwargs)
25 changes: 25 additions & 0 deletions Utils/Dataflow/pyDKB/storages/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""
pyDKB.storages.exceptions
"""


class StorageException(Exception):
""" Base exception for all storage-related exceptions. """
pass


class NotFound(StorageException):
""" Exception indicating that record with given ID is not found. """

def __init__(self, **kwargs):
""" Initialize exception.

:param kwargs: record primary key parameters
:type kwargs: dict
"""
message = "Record not found"
if kwargs:
params = [': '.join((key, '%r' % kwargs[key])) for key in kwargs]
params = ', '.join(params)
message = message + ' (%s)' % params
super(NotFound, self).__init__(message)