Skip to content

Commit

Permalink
EOS-18850 : Update py-utils code wrt Elasticsearch and Kibana OpenDis…
Browse files Browse the repository at this point in the history
…tro v7.10 and ElasticSearch host to incorporate the multi-node cluster (Seagate#260)

* EOS-18850:ElasticSearch host to incorporate the multi-node cluster

* EOS-18850:Update py-utils code wrt Elasticsearch and Kibana OpenDistro v7.10 and ElasticSearch host to incorporate the multi-node cluster

* Update elasticsearch and elasticsearch-dsl version

Signed-off-by: Pranali04796 <[email protected]>

* Update py-utils: ElasticSearch host to incorporate the multi-node cluster

Signed-off-by: Pranali04796 <[email protected]>

* codacy changes

Signed-off-by: Pranali04796 <[email protected]>

* update database.py

Signed-off-by: Pranali04796 <[email protected]>

* codacy changes

Signed-off-by: Pranali04796 <[email protected]>

Co-authored-by: Udayan Yaragattikar <[email protected]>
Co-authored-by: Sachin Punadikar <[email protected]>
  • Loading branch information
3 people authored Jun 15, 2021
1 parent cfea374 commit 14c93c6
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 47 deletions.
4 changes: 2 additions & 2 deletions py-utils/python_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ aiohttp==3.6.1
configparser==4.0.2
confluent-kafka==1.5.0
cryptography==2.8
elasticsearch-dsl==6.4.0
elasticsearch==6.8.1
elasticsearch-dsl==7.3.0
elasticsearch==7.12.0
matplotlib==3.1.3
networkx==2.4
paramiko==2.7.1
Expand Down
2 changes: 1 addition & 1 deletion py-utils/src/utils/data/db/consul_db/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ async def create_database(cls, config, collection: str,
if not all((cls.consul_client, cls.thread_pool, cls.loop)):
cls.loop = asyncio.get_event_loop()
try:
cls.consul_client = Consul(host=config.host, port=config.port,
cls.consul_client = Consul(host=config.hosts[0], port=config.port,
loop=cls.loop)
except ConnectionRefusedError as e:
raise DataAccessExternalError(f"{e}")
Expand Down
5 changes: 2 additions & 3 deletions py-utils/src/utils/data/db/db_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from cortx.utils.synchronization import ThreadSafeEvent


DEFAULT_HOST = "127.0.0.1"
DEFAULT_HOST = ["127.0.0.1"]


class ServiceStatus(Enum):
Expand All @@ -44,8 +44,7 @@ class DBSettings(Model):
"""
Settings for database server
"""

host = StringType(required=True, default=DEFAULT_HOST)
hosts = ListType(StringType, required=True, default=DEFAULT_HOST)
port = IntType(required=True, default=None)
login = StringType()
password = StringType()
Expand Down
59 changes: 23 additions & 36 deletions py-utils/src/utils/data/db/elasticsearch_db/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,22 +195,15 @@ def handle_compare(self, entry: FilterOperationCompare):
class ElasticSearchDataMapper:
"""ElasticSearch data mappings helper"""

def __init__(self, model: Type[BaseModel], mapping_type: str):
def __init__(self, model: Type[BaseModel]):
"""
:param Type[BaseModel] model: model for constructing data mapping for index in ElasticSearch
"""
self._model = model
self._mapping_type = mapping_type

if mapping_type is None:
raise DataAccessInternalError("Mapping type is not specified")

self._mapping = {
ESWords.MAPPINGS: {
mapping_type: {
ESWords.PROPERTIES: {
}
}
}
}
Expand All @@ -223,8 +216,7 @@ def _add_property(self, name: str, property_type: Type[BaseType]):
:param Type[BaseType] property_type: type of property for given property `name`
:return:
"""
properties = self._mapping[ESWords.MAPPINGS][self._mapping_type][ESWords.PROPERTIES]

properties = self._mapping[ESWords.MAPPINGS][ESWords.PROPERTIES]
if name in properties:
raise InternalError(f"Repeated property name in model: {name}")

Expand Down Expand Up @@ -254,12 +246,10 @@ class ElasticSearchQueryService:
"""Query service-helper for Elasticsearch"""

def __init__(self, index: str, es_client: Elasticsearch,
query_converter: ElasticSearchQueryConverter, mapping_type: str):
query_converter: ElasticSearchQueryConverter):
self._index = index
self._es_client = es_client
self._query_converter = query_converter
self._mapping_type = mapping_type

def search_by_query(self, query: Query) -> Search:
"""
Get Elasticsearch Search instance by given query object
Expand All @@ -272,8 +262,7 @@ def convert(name):

extra_params = dict()
sort_by = dict()
search = Search(index=self._index, doc_type=self._mapping_type, using=self._es_client)

search = Search(index=self._index, using=self._es_client)
q = query.data

if q.filter_by is not None:
Expand Down Expand Up @@ -323,12 +312,12 @@ def __init__(self, es_client: Elasticsearch, model: Type[BaseModel], collection:
self._es_client = es_client
self._tread_pool_exec = thread_pool_exec
self._loop = loop or asyncio.get_event_loop()
self._mapping_type = collection # Used as mapping type for particular index
self._collection = collection

self._query_converter = ElasticSearchQueryConverter(model)

# We are associating index name in ElasticSearch with given collection
self._index = self._mapping_type
self._index = self._collection

if not isinstance(model, type) or not issubclass(model, BaseModel):
raise DataAccessInternalError("Model parameter is not a Class object or not inherited "
Expand All @@ -339,7 +328,7 @@ def __init__(self, es_client: Elasticsearch, model: Type[BaseModel], collection:
self._model_scheme = None

self._query_service = ElasticSearchQueryService(self._index, self._es_client,
self._query_converter, self._mapping_type)
self._query_converter)

@classmethod
async def create_database(cls, config, collection, model: Type[BaseModel]) -> IDataBase:
Expand All @@ -358,8 +347,14 @@ async def create_database(cls, config, collection, model: Type[BaseModel]) -> ID
if config.login:
auth = (config.login, config.password)

node = {"host": config.host, "port": config.port}
cls.elastic_instance = Elasticsearch(hosts=[node], http_auth=auth)
nodes = [{"host": x, "port": config.port} for x in config.hosts]
cls.elastic_instance = Elasticsearch(
hosts=nodes,
http_auth=auth,
sniff_on_start=True, # sniff before doing anything
sniff_on_connection_fail=True, # refresh nodes after a node fails to respond
sniff_timeout=10,
sniffer_timeout=60) # and also every 60 seconds
cls.pool = ThreadPoolExecutor(max_workers=multiprocessing.cpu_count())
cls.loop = asyncio.get_event_loop()

Expand Down Expand Up @@ -396,23 +391,17 @@ def _get(_index):

# self._obj_index = self._es_client.indices.get_alias("*")
if indices.get(self._index, None) is None:
data_mappings = ElasticSearchDataMapper(self._model, self._mapping_type)
data_mappings = ElasticSearchDataMapper(self._model)
mappings_dict = data_mappings.build_index_mappings(replication)
# self._es_client.indices.create(index=model.__name__, ignore=400, body=mappings_dict)

# NOTE: for newly created indexes ElasticSearch mapping type and index name coincide
await self._loop.run_in_executor(self._tread_pool_exec,
_create, self._index, mappings_dict)

self._index_info = await self._loop.run_in_executor(self._tread_pool_exec,
_get, self._index)

# NOTE: if ElasticSearch index was created outside from CSM Agent there
# is no guarantee that index name and mapping type coincide
self._mapping_type = next(iter(self._index_info[self._index][ESWords.MAPPINGS].keys()), None)
if self._mapping_type is None:
raise DataAccessExternalError(f"There are no mapping type for ElasticSearch index {self._mapping_type}")
self._model_scheme = self._index_info[self._index][ESWords.MAPPINGS][self._mapping_type][ESWords.PROPERTIES]
self._model_scheme = self._index_info[self._index][ESWords.MAPPINGS][ESWords.PROPERTIES]
self._model_scheme = {k.lower(): v for k, v in self._model_scheme.items()}

async def store(self, obj: BaseModel):
Expand All @@ -430,8 +419,7 @@ def _store(_id, _doc: dict):
:return: elastic search server response
"""
# TODO: is it needed to use id?
_result = self._es_client.index(index=self._index, doc_type=self._mapping_type,
id=_id, body=_doc)
_result = self._es_client.index(index=self._index, id=_id, body=_doc)
return _result

await super().store(obj) # Call generic code
Expand Down Expand Up @@ -521,13 +509,12 @@ def _update(_ubq) -> UpdateByQueryResponse:
# NOTE: Important: call of the parent update method changes _to_update dict!
await super().update(filter_obj, _to_update) # Call the generic code

ubq = UpdateByQuery(index=self._index, doc_type=self._mapping_type, using=self._es_client)

ubq = UpdateByQuery(index=self._index, using=self._es_client)
filter_by = self._query_converter.build(filter_obj)
ubq = ubq.query(filter_by)

source = dict_to_source(_to_update)
ubq = ubq.script(inline=source, lang=ESWords.PAINLESS)
ubq = ubq.script(source=source, lang=ESWords.PAINLESS)

result = await self._loop.run_in_executor(self._tread_pool_exec, _update, ubq)

Expand All @@ -553,7 +540,7 @@ async def delete(self, filter_obj: IFilter) -> int:
:return: number of deleted entries
"""
def _delete(_by_filter):
search = Search(index=self._index, doc_type=self._mapping_type, using=self._es_client)
search = Search(index=self._index, using=self._es_client)
search = search.query(_by_filter)
return search.delete()

Expand All @@ -577,9 +564,9 @@ async def count(self, filter_obj: IFilter = None) -> int:
"""

def _count(_body):
return self._es_client.count(index=self._index, doc_type=self._mapping_type, body=_body)
return self._es_client.count(index=self._index, body=_body)

search = Search(index=self._index, doc_type=self._mapping_type, using=self._es_client)
search = Search(index=self._index, using=self._es_client)
if filter_obj is not None:
filter_by = self._query_converter.build(filter_obj)
search = search.query(filter_by)
Expand Down
4 changes: 2 additions & 2 deletions py-utils/src/utils/data/db/examples/consul_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ async def example():
"es_db": {
"import_path": "ElasticSearchDB",
"config": {
"host": "localhost",
"hosts": ["localhost"],
"port": 9200,
"login": "",
"password": ""
Expand All @@ -153,7 +153,7 @@ async def example():
"import_path": "ConsulDB",
"config":
{
"host": "127.0.0.1",
"hosts": ["127.0.0.1"],
"port": 8500, # HTTP API Port
"login": "",
"password": ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ async def example():
"es_db": {
"import_path": "ElasticSearchDB",
"config": {
"host": "localhost",
"hosts": ["localhost"],
"port": 9200,
"login": "",
"password": ""
Expand Down
4 changes: 2 additions & 2 deletions py-utils/src/utils/schema/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"consul_db": {
"import_path": "ConsulDB",
"config": {
"host": "localhost",
"hosts": ["127.0.0.1"],
"port": 8500,
"login": "",
"password": ""
Expand All @@ -29,7 +29,7 @@
"es_db": {
"import_path": "ElasticSearchDB",
"config": {
"host": "localhost",
"hosts": ["127.0.0.1"],
"port": 9200,
"login": "",
"password": "",
Expand Down

0 comments on commit 14c93c6

Please sign in to comment.