Skip to content

Commit

Permalink
Merge pull request DIRACGrid#7291 from fstagni/80_fixes71
Browse files Browse the repository at this point in the history
[8.0] ElasticJobParametersDB: do not configure the IndexPrefix name
  • Loading branch information
chrisburr authored Nov 28, 2023
2 parents 51d91a2 + a45d852 commit 71c456a
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 84 deletions.
18 changes: 11 additions & 7 deletions src/DIRAC/Core/Utilities/ElasticSearchDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,25 @@
try:
from opensearchpy import OpenSearch as Elasticsearch
from opensearchpy.exceptions import (
ConnectionError as ElasticConnectionError,
TransportError,
ConflictError,
NotFoundError,
RequestError,
ConflictError,
TransportError,
)
from opensearchpy.exceptions import (
ConnectionError as ElasticConnectionError,
)
from opensearchpy.helpers import BulkIndexError, bulk
except ImportError:
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import (
ConnectionError as ElasticConnectionError,
TransportError,
ConflictError,
NotFoundError,
RequestError,
ConflictError,
TransportError,
)
from elasticsearch.exceptions import (
ConnectionError as ElasticConnectionError,
)
from elasticsearch.helpers import BulkIndexError, bulk

Expand All @@ -39,7 +43,7 @@
except ImportError:
from opensearch_dsl import A, Q, Search
except ImportError:
from elasticsearch_dsl import Search, Q, A
from elasticsearch_dsl import A, Q, Search


from DIRAC import S_ERROR, S_OK, gLogger
Expand Down
64 changes: 32 additions & 32 deletions src/DIRAC/Interfaces/API/Dirac.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def getRepositoryJobs(self, printOutput=False):
Example Usage:
>>> print dirac.getRepositoryJobs()
>>> print(dirac.getRepositoryJobs())
{'OK': True, 'Value': [1,2,3,4]}
:return: S_OK,S_ERROR
Expand All @@ -152,7 +152,7 @@ def monitorRepository(self, printOutput=False):
Example Usage:
>>> print dirac.monitorRepository()
>>> print(dirac.monitorRepository())
{'OK': True, 'Value': ''}
:returns: S_OK,S_ERROR
Expand All @@ -179,7 +179,7 @@ def retrieveRepositorySandboxes(self, requestedStates=None, destinationDirectory
Example Usage:
>>> print dirac.retrieveRepositorySandboxes(requestedStates=['Done','Failed'],destinationDirectory='sandboxes')
>>> print(dirac.retrieveRepositorySandboxes(requestedStates=['Done','Failed'],destinationDirectory='sandboxes'))
{'OK': True, 'Value': ''}
:param requestedStates: List of jobs states to be considered
Expand Down Expand Up @@ -214,7 +214,7 @@ def retrieveRepositoryData(self, requestedStates=None, destinationDirectory=""):
Example Usage:
>>> print dirac.retrieveRepositoryData(requestedStates=['Done'],destinationDirectory='outputData')
>>> print(dirac.retrieveRepositoryData(requestedStates=['Done'],destinationDirectory='outputData'))
{'OK': True, 'Value': ''}
:param requestedStates: List of jobs states to be considered
Expand Down Expand Up @@ -245,7 +245,7 @@ def removeRepository(self):
Example Usage:
>>> print dirac.removeRepository()
>>> print(dirac.removeRepository())
{'OK': True, 'Value': ''}
:returns: S_OK,S_ERROR
Expand All @@ -272,7 +272,7 @@ def resetRepository(self, jobIDs=None):
Example Usage:
>>> print dirac.resetRepository(jobIDs = [1111,2222,'3333'])
>>> print(dirac.resetRepository(jobIDs = [1111,2222,'3333']))
{'OK': True, 'Value': ''}
:returns: S_OK,S_ERROR
Expand Down Expand Up @@ -304,7 +304,7 @@ def submitJob(self, job, mode="wms"):
Example usage:
>>> print dirac.submitJob(job)
>>> print(dirac.submitJob(job))
{'OK': True, 'Value': '12345'}
:param job: Instance of Job class or JDL string
Expand Down Expand Up @@ -401,7 +401,7 @@ def getInputDataCatalog(self, lfns, siteName="", fileName="pool_xml_catalog.xml"
Example usage:
>>> print print d.getInputDataCatalog('/lhcb/a/b/c/00001680_00000490_5.dst',None,'myCat.xml')
>>> print(getInputDataCatalog('/lhcb/a/b/c/00001680_00000490_5.dst',None,'myCat.xml'))
{'Successful': {'<LFN>': {'pfntype': 'ROOT_All', 'protocol': 'SRM2',
'pfn': '<PFN>', 'turl': '<TURL>', 'guid': '3E3E097D-0AC0-DB11-9C0A-00188B770645',
'se': 'CERN-disk'}}, 'Failed': [], 'OK': True, 'Value': ''}
Expand Down Expand Up @@ -873,7 +873,7 @@ def getReplicas(self, lfns, active=True, preferDisk=False, diskOnly=False, print
Example usage:
>>> print dirac.getReplicas('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst')
>>> print(dirac.getReplicas('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'))
{'OK': True, 'Value': {'Successful': {'/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst':
{'CERN-RDST':
'srm://srm-lhcb.cern.ch/castor/cern.ch/grid/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'}},
Expand Down Expand Up @@ -929,7 +929,7 @@ def getReplicasForJobs(self, lfns, diskOnly=False, printOutput=False):
Example usage:
>>> print dirac.getReplicasForJobs('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst')
>>> print(dirac.getReplicasForJobs('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'))
{'OK': True, 'Value': {'Successful': {'/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst':
{'CERN-RDST':
'srm://srm-lhcb.cern.ch/castor/cern.ch/grid/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'}},
Expand Down Expand Up @@ -982,7 +982,7 @@ def getAllReplicas(self, lfns, printOutput=False):
Example usage:
>>> print dirac.getAllReplicas('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst')
>>> print(dirac.getAllReplicas('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'))
{'OK': True, 'Value': {'Successful': {'/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst':
{'CERN-RDST':
'srm://srm-lhcb.cern.ch/castor/cern.ch/grid/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'}},
Expand Down Expand Up @@ -1104,7 +1104,7 @@ def getLfnMetadata(self, lfns, printOutput=False):
Example usage:
>>> print dirac.getLfnMetadata('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst')
>>> print(dirac.getLfnMetadata('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'))
{'OK': True, 'Value': {'Successful': {'/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst':
{'Status': '-', 'Size': 619475828L, 'GUID': 'E871FBA6-71EA-DC11-8F0C-000E0C4DEB4B', 'ChecksumType': 'AD',
'CheckSumValue': ''}}, 'Failed': {}}}
Expand Down Expand Up @@ -1158,7 +1158,7 @@ def addFile(self, lfn, fullPath, diracSE, fileGuid=None, printOutput=False):
Example Usage:
>>> print dirac.addFile('/lhcb/user/p/paterson/myFile.tar.gz','myFile.tar.gz','CERN-USER')
>>> print(dirac.addFile('/lhcb/user/p/paterson/myFile.tar.gz','myFile.tar.gz','CERN-USER'))
{'OK': True, 'Value':{'Failed': {},
'Successful': {'/lhcb/user/p/paterson/test/myFile.tar.gz': {'put': 64.246301889419556,
'register': 1.1102778911590576}}}}
Expand Down Expand Up @@ -1199,7 +1199,7 @@ def getFile(self, lfn, destDir="", printOutput=False):
Example Usage:
>>> print dirac.getFile('/lhcb/user/p/paterson/myFile.tar.gz')
>>> print(dirac.getFile('/lhcb/user/p/paterson/myFile.tar.gz'))
{'OK': True, 'Value':{'Failed': {},
'Successful': {'/lhcb/user/p/paterson/test/myFile.tar.gz': '/afs/cern.ch/user/p/paterson/myFile.tar.gz'}}}
Expand Down Expand Up @@ -1238,7 +1238,7 @@ def replicateFile(self, lfn, destinationSE, sourceSE="", localCache="", printOut
Example Usage:
>>> print dirac.replicateFile('/lhcb/user/p/paterson/myFile.tar.gz','CNAF-USER')
>>> print(dirac.replicateFile('/lhcb/user/p/paterson/myFile.tar.gz','CNAF-USER'))
{'OK': True, 'Value':{'Failed': {},
'Successful': {'/lhcb/user/p/paterson/test/myFile.tar.gz': {'register': 0.44766902923583984,
'replicate': 56.42345404624939}}}}
Expand Down Expand Up @@ -1301,7 +1301,7 @@ def replicate(self, lfn, destinationSE, sourceSE="", printOutput=False):
Example Usage:
>>> print dirac.replicate('/lhcb/user/p/paterson/myFile.tar.gz','CNAF-USER')
>>> print(dirac.replicate('/lhcb/user/p/paterson/myFile.tar.gz','CNAF-USER'))
{'OK': True, 'Value':{'Failed': {},
'Successful': {'/lhcb/user/p/paterson/test/myFile.tar.gz': {'register': 0.44766902923583984}}}}
Expand Down Expand Up @@ -1341,7 +1341,7 @@ def getAccessURL(self, lfn, storageElement, printOutput=False, protocol=False):
Example Usage:
>>> print dirac.getAccessURL('/lhcb/data/CCRC08/DST/00000151/0000/00000151_00004848_2.dst','CERN-RAW')
>>> print(dirac.getAccessURL('/lhcb/data/CCRC08/DST/00000151/0000/00000151_00004848_2.dst','CERN-RAW'))
{'OK': True, 'Value': {'Successful': {'srm://...': {'SRM2': 'rfio://...'}}, 'Failed': {}}}
:param lfn: Logical File Name (LFN)
Expand Down Expand Up @@ -1374,7 +1374,7 @@ def getPhysicalFileAccessURL(self, pfn, storageElement, printOutput=False):
Example Usage:
>>> print dirac.getPhysicalFileAccessURL('srm://srm-lhcb.cern.ch/castor/cern.ch/grid/lhcb/data/CCRC08/DST/00000151/0000/00000151_00004848_2.dst','CERN_M-DST')
>>> print(dirac.getPhysicalFileAccessURL('srm://srm-lhcb.cern.ch/castor/cern.ch/grid/lhcb/data/CCRC08/DST/00000151/0000/00000151_00004848_2.dst','CERN_M-DST'))
{'OK': True, 'Value':{'Failed': {},
'Successful': {'srm://srm-lhcb.cern.ch/castor/cern.ch/grid/lhcb/data/CCRC08/DST/00000151/0000/00000151_00004848_2.dst': {'RFIO': 'castor://...'}}}}
Expand Down Expand Up @@ -1406,7 +1406,7 @@ def getPhysicalFileMetadata(self, pfn, storageElement, printOutput=False):
Example Usage:
>>> print dirac.getPhysicalFileMetadata('srm://srm.grid.sara.nl/pnfs/grid.sara.nl/data
>>> print(dirac.getPhysicalFileMetadata('srm://srm.grid.sara.nl/pnfs/grid.sara.nl/data)
/lhcb/data/CCRC08/RAW/LHCb/CCRC/23341/023341_0000039571.raw','NIKHEF-RAW')
{'OK': True, 'Value': {'Successful': {'srm://...': {'SRM2': 'rfio://...'}}, 'Failed': {}}}
Expand Down Expand Up @@ -1437,7 +1437,7 @@ def removeFile(self, lfn, printOutput=False):
Example Usage:
>>> print dirac.removeFile('LFN:/lhcb/data/CCRC08/RAW/LHCb/CCRC/22808/022808_0000018443.raw')
>>> print(dirac.removeFile('LFN:/lhcb/data/CCRC08/RAW/LHCb/CCRC/22808/022808_0000018443.raw'))
{'OK': True, 'Value':...}
:param lfn: Logical File Name (LFN)
Expand Down Expand Up @@ -1465,7 +1465,7 @@ def removeReplica(self, lfn, storageElement, printOutput=False):
Example Usage:
>>> print dirac.removeReplica('LFN:/lhcb/user/p/paterson/myDST.dst','CERN-USER')
>>> print(dirac.removeReplica('LFN:/lhcb/user/p/paterson/myDST.dst','CERN-USER'))
{'OK': True, 'Value':...}
:param lfn: Logical File Name (LFN)
Expand Down Expand Up @@ -1496,7 +1496,7 @@ def getInputSandbox(self, jobID, outputDir=None):
Example Usage:
>>> print dirac.getInputSandbox(12345)
>>> print(dirac.getInputSandbox(12345))
{'OK': True, 'Value': ['Job__Sandbox__.tar.bz2']}
:param jobID: JobID
Expand Down Expand Up @@ -1544,7 +1544,7 @@ def getOutputSandbox(self, jobID, outputDir=None, oversized=True, noJobDir=False
Example Usage:
>>> print dirac.getOutputSandbox(12345)
>>> print(dirac.getOutputSandbox(12345))
{'OK': True, 'Value': ['Job__Sandbox__.tar.bz2']}
:param jobID: JobID
Expand Down Expand Up @@ -1639,7 +1639,7 @@ def deleteJob(self, jobID):
Example Usage:
>>> print dirac.deleteJob(12345)
>>> print(dirac.deleteJob(12345))
{'OK': True, 'Value': [12345]}
:param jobID: JobID
Expand Down Expand Up @@ -1676,7 +1676,7 @@ def rescheduleJob(self, jobID):
Example Usage:
>>> print dirac.rescheduleJob(12345)
>>> print(dirac.rescheduleJob(12345))
{'OK': True, 'Value': [12345]}
:param jobID: JobID
Expand Down Expand Up @@ -1745,7 +1745,7 @@ def getJobStatus(self, jobID):
Example Usage:
>>> print dirac.getJobStatus(79241)
>>> print(dirac.getJobStatus(79241))
{79241: {'Status': 'Done',
'MinorStatus': 'Execution Complete',
'ApplicationStatus': 'some app status'
Expand Down Expand Up @@ -2258,7 +2258,7 @@ def getJobAttributes(self, jobID, printOutput=False):
Example Usage:
>>> print dirac.getJobAttributes(79241)
>>> print(dirac.getJobAttributes(79241))
{'AccountedFlag': 'False','ApplicationNumStatus': '0',
'ApplicationStatus': 'Job Finished Successfully',
'CPUTime': '0.0','DIRACSetup': 'LHCb-Production'}
Expand Down Expand Up @@ -2295,7 +2295,7 @@ def getJobParameters(self, jobID, printOutput=False):
Example Usage:
>>> print dirac.getJobParameters(79241)
>>> print(dirac.getJobParameters(79241))
{'OK': True, 'Value': {'JobPath': 'JobPath,JobSanity,JobPolicy,InputData,JobScheduling,TaskQueue',
'JobSanityCheck': 'Job: 768 JDL: OK, InputData: 2 LFNs OK, '}
Expand Down Expand Up @@ -2334,7 +2334,7 @@ def getJobLoggingInfo(self, jobID, printOutput=False):
Example Usage:
>>> print dirac.getJobLoggingInfo(79241)
>>> print(dirac.getJobLoggingInfo(79241))
{'OK': True, 'Value': [('Received', 'JobPath', 'Unknown', '2008-01-29 15:37:09', 'JobPathAgent'),
('Checking', 'JobSanity', 'Unknown', '2008-01-29 15:37:14', 'JobSanityAgent')]}
Expand Down Expand Up @@ -2377,7 +2377,7 @@ def peekJob(self, jobID, printOutput=False):
Example Usage:
>>> print dirac.peekJob(1484)
>>> print(dirac.peekJob(1484))
{'OK': True, 'Value': 'Job peek result'}
:param jobID: JobID
Expand Down Expand Up @@ -2415,7 +2415,7 @@ def pingService(self, system, service, printOutput=False, url=None):
Example Usage:
>>> print dirac.pingService('WorkloadManagement','JobManager')
>>> print(dirac.pingService('WorkloadManagement','JobManager'))
{'OK': True, 'Value': 'Job ping result'}
:param system: system
Expand Down Expand Up @@ -2463,7 +2463,7 @@ def getJobJDL(self, jobID, original=False, printOutput=False):
Example Usage:
>>> print dirac.getJobJDL(12345)
>>> print(dirac.getJobJDL(12345))
{'Arguments': 'jobDescription.xml',...}
:param jobID: JobID
Expand Down
31 changes: 6 additions & 25 deletions src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,19 @@
""" Module containing a front-end to the ElasticSearch-based ElasticJobParametersDB.
This module interacts with one ES index: "ElasticJobParametersDB",
which is a drop-in replacement for MySQL-based table JobDB.JobParameters.
While JobDB.JobParameters in MySQL is defined as::
CREATE TABLE `JobParameters` (
`JobID` INT(11) UNSIGNED NOT NULL,
`Name` VARCHAR(100) NOT NULL,
`Value` TEXT NOT NULL,
PRIMARY KEY (`JobID`,`Name`),
FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
Here we define a dynamic mapping with the constant fields::
"JobID": {"type": "long"},
"timestamp": {"type": "date"},
and all other custom fields added dynamically.
This is a drop-in replacement for MySQL-based table JobDB.JobParameters.
The reason for switching to a ES-based JobParameters lies in the extended searching
capabilities of ES..
capabilities of ES.
This results in higher traceability for DIRAC jobs.
The following class methods are provided for public usage
- getJobParameters()
- setJobParameter()
- deleteJobParameters()
"""
from DIRAC import S_OK, S_ERROR, gConfig
from DIRAC.Core.Utilities import TimeUtilities
from DIRAC.ConfigurationSystem.Client.PathFinder import getDatabaseSection
from DIRAC import S_ERROR, S_OK
from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals
from DIRAC.Core.Base.ElasticDB import ElasticDB
from DIRAC.Core.Utilities import TimeUtilities

try:
from opensearchpy.exceptions import NotFoundError, RequestError
Expand Down Expand Up @@ -64,14 +46,13 @@ def __init__(self, parentLogger=None):
"""Standard Constructor"""

try:
section = getDatabaseSection("WorkloadManagement/ElasticJobParametersDB")
indexPrefix = gConfig.getValue(f"{section}/IndexPrefix", CSGlobals.getSetup()).lower()
indexPrefix = CSGlobals.getSetup().lower()

# Connecting to the ES cluster
super().__init__(name, "WorkloadManagement/ElasticJobParametersDB", indexPrefix, parentLogger=parentLogger)
except Exception as ex:
self.log.error("Can't connect to ElasticJobParametersDB", repr(ex))
raise RuntimeError("Can't connect to ElasticJobParametersDB")
raise RuntimeError("Can't connect to ElasticJobParametersDB") from ex

self.oldIndexName = f"{self.getIndexPrefix()}_{name.lower()}"
self.indexName_base = f"{self.getIndexPrefix()}_elasticjobparameters_index"
Expand Down
Loading

0 comments on commit 71c456a

Please sign in to comment.