From 88f27b93a6ca495d3b5c5580255c6f37c6679750 Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Thu, 6 Jun 2024 16:40:41 +0200 Subject: [PATCH] feat: added index template for ElasticJobParameters --- src/DIRAC/Core/Utilities/ElasticSearchDB.py | 24 ++++++- .../DB/ElasticJobParametersDB.json | 71 +++++++++++++++++++ .../DB/ElasticJobParametersDB.py | 19 +---- 3 files changed, 94 insertions(+), 20 deletions(-) create mode 100644 src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.json diff --git a/src/DIRAC/Core/Utilities/ElasticSearchDB.py b/src/DIRAC/Core/Utilities/ElasticSearchDB.py index 273661a5a7f..3de2703c7d6 100644 --- a/src/DIRAC/Core/Utilities/ElasticSearchDB.py +++ b/src/DIRAC/Core/Utilities/ElasticSearchDB.py @@ -210,6 +210,22 @@ def __init__( except ElasticConnectionError as e: sLog.error(repr(e)) + # look for an index template (json file) in the current directory + self.indexTemplate = None + self._loadIndexTemplate() + if self.indexTemplate: + self.client.indices.put_template(name=self.__class__.__name, body=self.indexTemplate) + + def _loadIndexTemplate(self): + """ + Load the index template from the current directory + """ + try: + with open(f"{self.__class__.__name__}.json") as f: + self.indexTemplate = json.load(f) + except Exception as e: + sLog.error("Cannot load index template", repr(e)) + def getIndexPrefix(self): """ It returns the DIRAC setup. @@ -436,8 +452,12 @@ def createIndex(self, indexPrefix, mapping=None, period="day"): return S_OK(fullIndex) try: - sLog.info("Create index: ", fullIndex + str(mapping)) - self.client.indices.create(index=fullIndex, body={"mappings": mapping}) # ES7 + if mapping: + sLog.info("Create index: ", fullIndex + str(mapping)) + self.client.indices.create(index=fullIndex, body={"mappings": mapping}) # ES7 + else: + sLog.info("Create index: ", fullIndex) + self.client.indices.create(index=fullIndex) return S_OK(fullIndex) except Exception as e: # pylint: disable=broad-except diff --git a/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.json b/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.json new file mode 100644 index 00000000000..17ca178a5f5 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.json @@ -0,0 +1,71 @@ +{ + "index_patterns": ["job_parameters_*"], + "settings": { + "number_of_shards": 1, + "number_of_replicas": 1 + }, + "mappings": { + "properties": { + "JobID": { + "type": "long" + }, + "Status": { + "type": "keyword" + }, + "timestamp": { + "type": "date" + }, + "PilotAgent": { + "type": "keyword" + }, + "Pilot_Reference": { + "type": "keyword" + }, + "JobGroup": { + "type": "keyword" + }, + "JobType": { + "type": "keyword" + }, + "JobWrapperPID": { + "type": "long" + }, + "CPUNormalizationFactor": { + "type": "long" + }, + "NormCPUTime(s)": { + "type": "long" + }, + "Memory(MB)": { + "type": "long" + }, + "ModelName": { + "type": "keyword" + }, + "LocalAccount": { + "type": "keyword" + }, + "HostName": { + "type": "text" + }, + "TotalCPUTime(s)": { + "type": "long" + }, + "PayloadPID": { + "type": "long" + }, + "CEQueue": { + "type": "keyword" + }, + "BatchSystem": { + "type": "keyword" + }, + "GridCE": { + "type": "keyword" + } + } + }, + "aliases": { + "my_index_alias": {} + } +} diff --git a/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py b/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py index 14d4dec1a36..dae552ffb04 100644 --- a/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py @@ -12,23 +12,6 @@ from DIRAC.Core.Base.ElasticDB import ElasticDB from DIRAC.Core.Utilities import TimeUtilities -mapping = { - "properties": { - "JobID": {"type": "long"}, - "timestamp": {"type": "date"}, - "CPUNormalizationFactor": {"type": "long"}, - "NormCPUTime(s)": {"type": "long"}, - "Memory(kB)": {"type": "long"}, - "TotalCPUTime(s)": {"type": "long"}, - "MemoryUsed(kb)": {"type": "long"}, - "HostName": {"type": "keyword"}, - "GridCE": {"type": "keyword"}, - "ModelName": {"type": "keyword"}, - "Status": {"type": "keyword"}, - "JobType": {"type": "keyword"}, - } -} - class ElasticJobParametersDB(ElasticDB): def __init__(self, parentLogger=None): @@ -59,7 +42,7 @@ def _createIndex(self, indexName: str) -> None: # Verifying if the index is there, and if not create it res = self.existingIndex(indexName) if not res["OK"] or not res["Value"]: - result = self.createIndex(indexName, mapping, period=None) + result = self.createIndex(indexName, period=None) if not result["OK"]: self.log.error(result["Message"]) raise RuntimeError(result["Message"])