Skip to content

Commit

Permalink
feat: added index template for ElasticJobParameters
Browse files Browse the repository at this point in the history
  • Loading branch information
fstagni committed Jun 6, 2024
1 parent a5a813a commit 88f27b9
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 20 deletions.
24 changes: 22 additions & 2 deletions src/DIRAC/Core/Utilities/ElasticSearchDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,22 @@ def __init__(
except ElasticConnectionError as e:
sLog.error(repr(e))

# look for an index template (json file) in the current directory
self.indexTemplate = None
self._loadIndexTemplate()
if self.indexTemplate:
self.client.indices.put_template(name=self.__class__.__name, body=self.indexTemplate)

def _loadIndexTemplate(self):
"""
Load the index template from the current directory
"""
try:
with open(f"{self.__class__.__name__}.json") as f:
self.indexTemplate = json.load(f)
except Exception as e:
sLog.error("Cannot load index template", repr(e))

def getIndexPrefix(self):
"""
It returns the DIRAC setup.
Expand Down Expand Up @@ -436,8 +452,12 @@ def createIndex(self, indexPrefix, mapping=None, period="day"):
return S_OK(fullIndex)

try:
sLog.info("Create index: ", fullIndex + str(mapping))
self.client.indices.create(index=fullIndex, body={"mappings": mapping}) # ES7
if mapping:
sLog.info("Create index: ", fullIndex + str(mapping))
self.client.indices.create(index=fullIndex, body={"mappings": mapping}) # ES7
else:
sLog.info("Create index: ", fullIndex)
self.client.indices.create(index=fullIndex)

return S_OK(fullIndex)
except Exception as e: # pylint: disable=broad-except
Expand Down
71 changes: 71 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
{
"index_patterns": ["job_parameters_*"],
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"JobID": {
"type": "long"
},
"Status": {
"type": "keyword"
},
"timestamp": {
"type": "date"
},
"PilotAgent": {
"type": "keyword"
},
"Pilot_Reference": {
"type": "keyword"
},
"JobGroup": {
"type": "keyword"
},
"JobType": {
"type": "keyword"
},
"JobWrapperPID": {
"type": "long"
},
"CPUNormalizationFactor": {
"type": "long"
},
"NormCPUTime(s)": {
"type": "long"
},
"Memory(MB)": {
"type": "long"
},
"ModelName": {
"type": "keyword"
},
"LocalAccount": {
"type": "keyword"
},
"HostName": {
"type": "text"
},
"TotalCPUTime(s)": {
"type": "long"
},
"PayloadPID": {
"type": "long"
},
"CEQueue": {
"type": "keyword"
},
"BatchSystem": {
"type": "keyword"
},
"GridCE": {
"type": "keyword"
}
}
},
"aliases": {
"my_index_alias": {}
}
}
19 changes: 1 addition & 18 deletions src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,6 @@
from DIRAC.Core.Base.ElasticDB import ElasticDB
from DIRAC.Core.Utilities import TimeUtilities

mapping = {
"properties": {
"JobID": {"type": "long"},
"timestamp": {"type": "date"},
"CPUNormalizationFactor": {"type": "long"},
"NormCPUTime(s)": {"type": "long"},
"Memory(kB)": {"type": "long"},
"TotalCPUTime(s)": {"type": "long"},
"MemoryUsed(kb)": {"type": "long"},
"HostName": {"type": "keyword"},
"GridCE": {"type": "keyword"},
"ModelName": {"type": "keyword"},
"Status": {"type": "keyword"},
"JobType": {"type": "keyword"},
}
}


class ElasticJobParametersDB(ElasticDB):
def __init__(self, parentLogger=None):
Expand Down Expand Up @@ -59,7 +42,7 @@ def _createIndex(self, indexName: str) -> None:
# Verifying if the index is there, and if not create it
res = self.existingIndex(indexName)
if not res["OK"] or not res["Value"]:
result = self.createIndex(indexName, mapping, period=None)
result = self.createIndex(indexName, period=None)
if not result["OK"]:
self.log.error(result["Message"])
raise RuntimeError(result["Message"])
Expand Down

0 comments on commit 88f27b9

Please sign in to comment.