diff --git a/src/DIRAC/Core/Utilities/ElasticSearchDB.py b/src/DIRAC/Core/Utilities/ElasticSearchDB.py index 273661a5a7f..ff197dec64f 100644 --- a/src/DIRAC/Core/Utilities/ElasticSearchDB.py +++ b/src/DIRAC/Core/Utilities/ElasticSearchDB.py @@ -273,14 +273,14 @@ def getDoc(self, index: str, docID: str) -> dict: return S_ERROR(re) @ifConnected - def getDocs(self, indexFunc, docIDs: list[str]) -> list[dict]: + def getDocs(self, indexFunc, docIDs: list[str], vo: str) -> list[dict]: """Efficiently retrieve many documents from an index. :param index: name of the index :param docIDs: document IDs """ sLog.debug(f"Retrieving documents {docIDs}") - docs = [{"_index": indexFunc(docID), "_id": docID} for docID in docIDs] + docs = [{"_index": indexFunc(docID, vo), "_id": docID} for docID in docIDs] try: response = self.client.mget({"docs": docs}) except RequestError as re: diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/OptimizerModule.py b/src/DIRAC/WorkloadManagementSystem/Agent/OptimizerModule.py index 61d421f9094..30a4a7dfab8 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/OptimizerModule.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/OptimizerModule.py @@ -46,9 +46,7 @@ def initialize(self, jobDB=None, logDB=None): if not self.jobDB.isValid(): dExit(1) - result = ObjectLoader().loadObject( - "WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB" - ) + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB") if not result["OK"]: return result self.elasticJobParametersDB = result["Value"]() diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py index b4e40e1a76c..6059674db00 100644 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py @@ -43,13 +43,13 @@ def __init__(self, parentLogger=None): except Exception as ex: raise RuntimeError("Can't connect to JobParameters index") from ex - def _indexName(self, jobID: int) -> str: + def _indexName(self, jobID: int, vo: str) -> str: """construct the index name :param jobID: Job ID """ indexSplit = int(int(jobID) // 1e6) - return f"{self.index_name}_{indexSplit}m" + return f"{self.index_name}_{vo}_{indexSplit}m" def _createIndex(self, indexName: str) -> None: """Create a new index if needed @@ -65,7 +65,7 @@ def _createIndex(self, indexName: str) -> None: raise RuntimeError(result["Message"]) self.log.always("Index created:", indexName) - def getJobParameters(self, jobIDs: Union[int, list[int]], paramList=None) -> dict: + def getJobParameters(self, jobIDs: Union[int, list[int]], vo: str, paramList=None) -> dict: """Get Job Parameters defined for jobID. Returns a dictionary with the Job Parameters. If paramList is empty - all the parameters are returned. @@ -81,7 +81,7 @@ def getJobParameters(self, jobIDs: Union[int, list[int]], paramList=None) -> dic paramList = paramList.replace(" ", "").split(",") self.log.debug(f"JobDB.getParameters: Getting Parameters for jobs {jobIDs}") - res = self.getDocs(self._indexName, jobIDs) + res = self.getDocs(self._indexName, jobIDs, vo) if not res["OK"]: return res result = {} @@ -93,7 +93,7 @@ def getJobParameters(self, jobIDs: Union[int, list[int]], paramList=None) -> dic return S_OK(result) - def setJobParameter(self, jobID: int, key: str, value: str) -> dict: + def setJobParameter(self, jobID: int, key: str, value: str, vo: str) -> dict: """ Inserts data into JobParametersDB index @@ -110,18 +110,18 @@ def setJobParameter(self, jobID: int, key: str, value: str) -> dict: # The _id in ES can't exceed 512 bytes, this is a ES hard-coded limitation. # If a record with this jobID update and add parameter, otherwise create a new record - if self.existsDoc(self._indexName(jobID), docID=str(jobID)): + if self.existsDoc(self._indexName(jobID, vo), docID=str(jobID)): self.log.debug("A document for this job already exists, it will now be updated") - result = self.updateDoc(index=self._indexName(jobID), docID=str(jobID), body={"doc": data}) + result = self.updateDoc(index=self._indexName(jobID, vo), docID=str(jobID), body={"doc": data}) else: self.log.debug("No document has this job id, creating a new document for this job") - self._createIndex(self._indexName(jobID)) - result = self.index(indexName=self._indexName(jobID), body=data, docID=str(jobID)) + self._createIndex(self._indexName(jobID, vo)) + result = self.index(indexName=self._indexName(jobID, vo), body=data, docID=str(jobID)) if not result["OK"]: self.log.error("Couldn't insert or update data", result["Message"]) return result - def setJobParameters(self, jobID: int, parameters: list) -> dict: + def setJobParameters(self, jobID: int, parameters: list, vo: str) -> dict: """ Inserts data into JobParametersDB index using bulk indexing @@ -130,24 +130,24 @@ def setJobParameters(self, jobID: int, parameters: list) -> dict: :param parameters: list of tuples (name, value) pairs :returns: S_OK/S_ERROR as result of indexing """ - self.log.debug("Inserting parameters", f"in {self._indexName(jobID)}: for job {jobID}: {parameters}") + self.log.debug("Inserting parameters", f"in {self._indexName(jobID, vo)}: for job {jobID}: {parameters}") parametersDict = dict(parameters) parametersDict["JobID"] = jobID parametersDict["timestamp"] = int(TimeUtilities.toEpochMilliSeconds()) - if self.existsDoc(self._indexName(jobID), docID=str(jobID)): + if self.existsDoc(self._indexName(jobID, vo), docID=str(jobID)): self.log.debug("A document for this job already exists, it will now be updated") - result = self.updateDoc(index=self._indexName(jobID), docID=str(jobID), body={"doc": parametersDict}) + result = self.updateDoc(index=self._indexName(jobID, vo), docID=str(jobID), body={"doc": parametersDict}) else: self.log.debug("Creating a new document for this job") - self._createIndex(self._indexName(jobID)) - result = self.index(self._indexName(jobID), body=parametersDict, docID=str(jobID)) + self._createIndex(self._indexName(jobID, vo)) + result = self.index(self._indexName(jobID, vo), body=parametersDict, docID=str(jobID)) if not result["OK"]: self.log.error("Couldn't insert or update data", result["Message"]) return result - def deleteJobParameters(self, jobID: int, paramList=None) -> dict: + def deleteJobParameters(self, jobID: int, paramList=None, vo: str = "") -> dict: """Deletes Job Parameters defined for jobID. Returns a dictionary with the Job Parameters. If paramList is empty - all the parameters for the job are removed @@ -164,13 +164,13 @@ def deleteJobParameters(self, jobID: int, paramList=None) -> dict: if not paramList: # Deleting the whole record self.log.debug("Deleting record of job {jobID}") - result = self.deleteDoc(self._indexName(jobID), docID=str(jobID)) + result = self.deleteDoc(self._indexName(jobID, vo), docID=str(jobID)) else: # Deleting the specific parameters self.log.debug(f"JobDB.getParameters: Deleting Parameters {paramList} for job {jobID}") for paramName in paramList: result = self.updateDoc( - index=self._indexName(jobID), + index=self._indexName(jobID, vo), docID=str(jobID), body={"script": "ctx._source.remove('" + paramName + "')"}, ) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py index 27880e81118..4af546b8868 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py @@ -35,9 +35,7 @@ def initializeHandler(cls, svcInfoDict): except RuntimeError as excp: return S_ERROR(f"Can't connect to DB: {excp}") - result = ObjectLoader().loadObject( - "WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB" - ) + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB") if not result["OK"]: return result cls.elasticJobParametersDB = result["Value"](parentLogger=cls.log) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py index 6b8c979a71c..d63ba1d9c01 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py @@ -9,7 +9,8 @@ import time -from DIRAC import S_OK, S_ERROR +from DIRAC import S_ERROR, S_OK +from DIRAC.ConfigurationSystem.Client.Helpers import Registry from DIRAC.Core.DISET.RequestHandler import RequestHandler from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader @@ -37,9 +38,7 @@ def initializeHandler(cls, svcInfoDict): except RuntimeError as excp: return S_ERROR(f"Can't connect to DB: {excp}") - result = ObjectLoader().loadObject( - "WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB" - ) + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB") if not result["OK"]: return result cls.elasticJobParametersDB = result["Value"]() @@ -159,8 +158,9 @@ def export_setJobParameter(cls, jobID, name, value): """Set arbitrary parameter specified by name/value pair for job specified by its JobId """ - - return cls.elasticJobParametersDB.setJobParameter(int(jobID), name, value) # pylint: disable=no-member + credDict = cls.getRemoteCredentials() + vo = credDict.get("VO", Registry.getVOForGroup(credDict["group"])) + return cls.elasticJobParametersDB.setJobParameter(int(jobID), name, value, vo=vo) # pylint: disable=no-member ########################################################################### types_setJobsParameter = [dict] @@ -196,7 +196,9 @@ def export_setJobParameters(cls, jobID, parameters): """Set arbitrary parameters specified by a list of name/value pairs for job specified by its JobId """ - result = cls.elasticJobParametersDB.setJobParameters(int(jobID), parameters) + credDict = cls.getRemoteCredentials() + vo = credDict.get("VO", Registry.getVOForGroup(credDict["group"])) + result = cls.elasticJobParametersDB.setJobParameters(int(jobID), parameters, vo=vo) if not result["OK"]: cls.log.error("Failed to add Job Parameters to JobParametersDB", result["Message"]) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py index 63037702c7f..d62d649288e 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py @@ -21,9 +21,7 @@ def initializeHandler(cls, svcInfoDict): except RuntimeError as excp: return S_ERROR(f"Can't connect to DB: {excp!r}") - result = ObjectLoader().loadObject( - "WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB" - ) + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB") if not result["OK"]: return result cls.elasticJobParametersDB = result["Value"]() diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py b/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py index 906b2728929..1440d86bd1a 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py @@ -54,9 +54,7 @@ def __init__( raise if not self.elasticJobParametersDB: - result = ObjectLoader().loadObject( - "WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB" - ) + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB") if not result["OK"]: raise AttributeError(result["Message"]) self.elasticJobParametersDB = result["Value"](parentLogger=self.log) diff --git a/tests/Integration/WorkloadManagementSystem/Test_JobParametersDB.py b/tests/Integration/WorkloadManagementSystem/Test_JobParametersDB.py index 389f2754a91..a22251126f5 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_JobParametersDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_JobParametersDB.py @@ -18,7 +18,7 @@ def test_setAndGetJobFromDB(): - res = elasticJobParametersDB.setJobParameter(100, "DIRAC", "dirac@cern") + res = elasticJobParametersDB.setJobParameter(100, "DIRAC", "dirac@cern", "vo") assert res["OK"] time.sleep(SLEEP_DELAY) @@ -27,7 +27,7 @@ def test_setAndGetJobFromDB(): assert res["Value"][100]["DIRAC"] == "dirac@cern" # update it - res = elasticJobParametersDB.setJobParameter(100, "DIRAC", "dirac@cern.cern") + res = elasticJobParametersDB.setJobParameter(100, "DIRAC", "dirac@cern.cern", "vo") assert res["OK"] time.sleep(SLEEP_DELAY) res = elasticJobParametersDB.getJobParameters(100) @@ -41,7 +41,7 @@ def test_setAndGetJobFromDB(): assert res["Value"][100]["DIRAC"] == "dirac@cern.cern" # add one - res = elasticJobParametersDB.setJobParameter(100, "someKey", "someValue") + res = elasticJobParametersDB.setJobParameter(100, "someKey", "someValue", "vo") assert res["OK"] time.sleep(SLEEP_DELAY) @@ -60,7 +60,7 @@ def test_setAndGetJobFromDB(): assert res["Value"][100]["someKey"] == "someValue" # another one + search - res = elasticJobParametersDB.setJobParameter(100, "someOtherKey", "someOtherValue") + res = elasticJobParametersDB.setJobParameter(100, "someOtherKey", "someOtherValue", "vo") assert res["OK"] time.sleep(SLEEP_DELAY) res = elasticJobParametersDB.getJobParameters(100) @@ -75,13 +75,13 @@ def test_setAndGetJobFromDB(): assert res["Value"][100]["someOtherKey"] == "someOtherValue" # another job - res = elasticJobParametersDB.setJobParameter(101, "DIRAC", "dirac@cern") + res = elasticJobParametersDB.setJobParameter(101, "DIRAC", "dirac@cern", "vo") assert res["OK"] - res = elasticJobParametersDB.setJobParameter(101, "key101", "value101") + res = elasticJobParametersDB.setJobParameter(101, "key101", "value101", "vo") assert res["OK"] - res = elasticJobParametersDB.setJobParameter(101, "someKey", "value101") + res = elasticJobParametersDB.setJobParameter(101, "someKey", "value101", "vo") assert res["OK"] - res = elasticJobParametersDB.setJobParameter(101, "key101", "someValue") + res = elasticJobParametersDB.setJobParameter(101, "key101", "someValue", "vo") assert res["OK"] time.sleep(SLEEP_DELAY) res = elasticJobParametersDB.getJobParameters(100) @@ -98,7 +98,7 @@ def test_setAndGetJobFromDB(): assert res["Value"][101]["someKey"] == "value101" assert len(res["Value"]) == 1 assert len(res["Value"][101]) == 5 # Same thing as with doc 100 - res = elasticJobParametersDB.setJobParameters(101, [("k", "v"), ("k1", "v1"), ("k2", "v2")]) + res = elasticJobParametersDB.setJobParameters(101, [("k", "v"), ("k1", "v1"), ("k2", "v2")], vo="vo") assert res["OK"] time.sleep(SLEEP_DELAY) res = elasticJobParametersDB.getJobParameters(101) @@ -108,7 +108,7 @@ def test_setAndGetJobFromDB(): assert res["Value"][101]["k2"] == "v2" # another job with jobID > 1000000 - res = elasticJobParametersDB.setJobParameters(1010000, [("k", "v"), ("k1", "v1"), ("k2", "v2")]) + res = elasticJobParametersDB.setJobParameters(1010000, [("k", "v"), ("k1", "v1"), ("k2", "v2")], vo="vo") assert res["OK"] time.sleep(SLEEP_DELAY) res = elasticJobParametersDB.getJobParameters(1010000) @@ -116,33 +116,33 @@ def test_setAndGetJobFromDB(): assert res["Value"][1010000]["k2"] == "v2" # deleting - res = elasticJobParametersDB.deleteJobParameters(100) + res = elasticJobParametersDB.deleteJobParameters(100, vo="vo") assert res["OK"] time.sleep(SLEEP_DELAY) res = elasticJobParametersDB.getJobParameters(100) assert res["OK"] assert len(res["Value"][100]) == 0 - res = elasticJobParametersDB.deleteJobParameters(101, "someKey") + res = elasticJobParametersDB.deleteJobParameters(101, "someKey", vo="vo") assert res["OK"] time.sleep(SLEEP_DELAY) res = elasticJobParametersDB.getJobParameters(101) assert res["OK"] assert len(res["Value"][101]) == 7 - res = elasticJobParametersDB.deleteJobParameters(101, "someKey, key101") # someKey is already deleted + res = elasticJobParametersDB.deleteJobParameters(101, "someKey, key101", vo="vo") # someKey is already deleted assert res["OK"] time.sleep(SLEEP_DELAY) res = elasticJobParametersDB.getJobParameters(101) assert res["OK"] assert len(res["Value"][101]) == 6 - res = elasticJobParametersDB.deleteJobParameters(101, "nonExistingKey") + res = elasticJobParametersDB.deleteJobParameters(101, "nonExistingKey", vo="vo") assert res["OK"] time.sleep(SLEEP_DELAY) res = elasticJobParametersDB.getJobParameters(101) assert res["OK"] assert len(res["Value"][101]) == 6 - res = elasticJobParametersDB.deleteJobParameters(1010000) + res = elasticJobParametersDB.deleteJobParameters(1010000, vo="vo") assert res["OK"] time.sleep(SLEEP_DELAY) res = elasticJobParametersDB.getJobParameters(1010000) @@ -153,7 +153,7 @@ def test_setAndGetJobFromDB(): res = elasticJobParametersDB.deleteIndex("job_parameters") assert res["OK"] assert res["Value"] == "Nothing to delete" - res = elasticJobParametersDB.deleteIndex(elasticJobParametersDB._indexName(100)) + res = elasticJobParametersDB.deleteIndex(elasticJobParametersDB._indexName(100, vo="vo")) assert res["OK"] - res = elasticJobParametersDB.deleteIndex(elasticJobParametersDB._indexName(1010000)) + res = elasticJobParametersDB.deleteIndex(elasticJobParametersDB._indexName(1010000, vo="vo")) assert res["OK"]