Merge pull request #6963 from aldbr/rel-v7r3_FIX_RemoteRunner
[v7r3] PushJobAgent: fixes and features
fstagni authored Apr 12, 2023
2 parents 784f8d8 + 06592b5 commit b65cb3d
Showing 9 changed files with 397 additions and 134 deletions.
12 changes: 11 additions & 1 deletion docs/source/AdministratorGuide/Resources/supercomputers.rst
@@ -133,14 +133,24 @@ One has also to authorize the machine hosting the :mod:`~DIRAC.WorkloadManagemen
Properties += GenericPilot
Properties += FileCatalogManagement

One has to specify the concerned VO in the targeted CEs, such as::
One has to specify the concerned VO, the platform and the CPU power in the targeted CEs, such as::

<CE>
{
# To match a <VO> job
VO = <VO>
# Required because we are on a host (not on a worker node)
VirtualOrganization = <VO>
# To match compatible jobs
Platform = <platform>
Queues
{
<Queue>
{
CPUNormalizationFactor = <CPU Power value>
}
}

}

Finally, one has to make sure that job scheduling parameters are correctly fine-tuned. Further details in the :ref:`JobScheduling section <jobscheduling>`.
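
For illustration, a filled-in CE section could look as follows; the CE name, VO, platform string and CPU power value are placeholders to be adapted to the actual site, not values mandated by DIRAC:

    my-hpc-ce.example.org
    {
      VO = myvo
      VirtualOrganization = myvo
      Platform = x86_64_CentOS7
      Queues
      {
        batch
        {
          CPUNormalizationFactor = 10
        }
      }
    }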
2 changes: 1 addition & 1 deletion src/DIRAC/Resources/Computing/ARC6ComputingElement.py
@@ -144,7 +144,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
jobdescs = arc.JobDescriptionList()

# Get the job into the ARC way
xrslString, diracStamp = self._writeXRSL(executableFile)
xrslString, diracStamp = self._writeXRSL(executableFile, [], [])
self.log.debug("XRSL string submitted : %s" % xrslString)
self.log.debug("DIRAC stamp for job : %s" % diracStamp)

64 changes: 31 additions & 33 deletions src/DIRAC/Resources/Computing/ARCComputingElement.py
@@ -167,13 +167,12 @@ def _addCEConfigDefaults(self):
ComputingElement._addCEConfigDefaults(self)

#############################################################################
def _writeXRSL(self, executableFile, inputs=None, outputs=None, executables=None):
def _writeXRSL(self, executableFile, inputs, outputs):
"""Create the JDL for submission
:param str executableFile: executable to wrap in a XRSL file
:param str/list inputs: path of the dependencies to include along with the executable
:param str/list outputs: path of the outputs that we want to get at the end of the execution
:param str/list executables: path to inputs that should have execution mode on the remote worker node
:param list inputs: path of the dependencies to include along with the executable
:param list outputs: path of the outputs that we want to get at the end of the execution
"""
diracStamp = makeGuid()[:8]
# Evaluate the number of processors to allocate
@@ -191,34 +190,25 @@ def _writeXRSL(self, executableFile, inputs=None, outputs=None, executables=None
"xrslMPExtraString": self.xrslMPExtraString,
}

# Files that would need execution rights on the remote worker node
xrslExecutables = ""
if executables:
if not isinstance(executables, list):
executables = [executables]
xrslExecutables = "(executables=%s)" % " ".join(map(os.path.basename, executables))
# Add them to the inputFiles
if not inputs:
inputs = []
if not isinstance(inputs, list):
inputs = [inputs]
inputs += executables

# Dependencies that have to be embedded along with the executable
xrslInputs = ""
if inputs:
if not isinstance(inputs, list):
inputs = [inputs]
for inputFile in inputs:
xrslInputs += '(%s "%s")' % (os.path.basename(inputFile), inputFile)
executables = []
for inputFile in inputs:
inputFileBaseName = os.path.basename(inputFile)
if os.access(inputFile, os.X_OK):
# Files that would need execution rights on the remote worker node
executables.append(inputFileBaseName)
xrslInputs += '(%s "%s")' % (inputFileBaseName, inputFile)

# Executables are added to the XRSL
xrslExecutables = ""
if executables:
xrslExecutables = "(executables=%s)" % " ".join(executables)

# Output files to retrieve once the execution is complete
xrslOutputs = '("%s.out" "") ("%s.err" "")' % (diracStamp, diracStamp)
if outputs:
if not isinstance(outputs, list):
outputs = [outputs]
for outputFile in outputs:
xrslOutputs += '(%s "")' % (outputFile)
for outputFile in outputs:
xrslOutputs += '(%s "")' % (outputFile)

xrsl = """
&(executable="%(executable)s")
@@ -247,6 +237,13 @@ def _writeXRSL(self, executableFile, inputs=None, outputs=None, executables=None
def _bundlePreamble(self, executableFile):
"""Bundle the preamble with the executable file"""
wrapperContent = "%s\n./%s" % (self.preamble, executableFile)

# We need to make sure the executable file can be executed by the wrapper
# By adding the execution mode to the file, the file will be processed as an "executable" in the XRSL
# This is done in _writeXRSL()
if not os.access(executableFile, os.X_OK):
os.chmod(executableFile, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH + stat.S_IXOTH)

return writeScript(wrapperContent, os.getcwd())
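
A minimal sketch of the input handling introduced above, using illustrative names rather than the actual DIRAC helpers: input files that carry the execute bit locally are advertised through the XRSL (executables=...) attribute, which is why _bundlePreamble now sets that bit on the file it wraps.

    import os

    def build_xrsl_fragments(inputs):
        """Illustrative helper: build the XRSL inputFiles/executables fragments
        from local input paths, flagging files that have the execute bit set."""
        xrsl_inputs = ""
        executables = []
        for input_file in inputs:
            base_name = os.path.basename(input_file)
            if os.access(input_file, os.X_OK):
                # this file keeps execution rights on the remote worker node
                executables.append(base_name)
            xrsl_inputs += '(%s "%s")' % (base_name, input_file)
        xrsl_executables = ""
        if executables:
            xrsl_executables = "(executables=%s)" % " ".join(executables)
        return xrsl_inputs, xrsl_executables

    # Example with hypothetical paths: a chmod +x'ed wrapper ends up in both fragments.
    # xrsl_inputs, xrsl_executables = build_xrsl_fragments(["/tmp/wrapper.sh", "/tmp/pilot.cfg"])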

#############################################################################
@@ -299,13 +296,14 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
return result
self.usercfg.ProxyPath(os.environ["X509_USER_PROXY"])

self.log.verbose("Executable file path: %s" % executableFile)
if not os.access(executableFile, 5):
os.chmod(executableFile, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH + stat.S_IXOTH)
if not inputs:
inputs = []
if not outputs:
outputs = []

executables = None
self.log.verbose("Executable file path: %s" % executableFile)
if self.preamble:
executables = [executableFile]
inputs.append(executableFile)
executableFile = self._bundlePreamble(executableFile)

batchIDList = []
@@ -325,7 +323,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
# The basic job description
jobdescs = arc.JobDescriptionList()
# Get the job into the ARC way
xrslString, diracStamp = self._writeXRSL(executableFile, inputs, outputs, executables)
xrslString, diracStamp = self._writeXRSL(executableFile, inputs, outputs)
self.log.debug("XRSL string submitted : %s" % xrslString)
self.log.debug("DIRAC stamp for job : %s" % diracStamp)
# The arc bindings don't accept unicode objects in Python 2 so xrslString must be explicitly cast
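
To make the preamble handling above concrete, here is a rough sketch; bundle_preamble, the preamble string and the file names are illustrative stand-ins rather than the DIRAC API. When a preamble is configured, the original executable is shipped as an input file (and, carrying the execute bit, is flagged as an XRSL executable), while the job actually submitted runs a small wrapper that executes the preamble first.

    import os
    import stat

    def bundle_preamble(preamble, executable_file):
        # hypothetical stand-in for ARCComputingElement._bundlePreamble
        wrapper = "wrapper_%s" % os.path.basename(executable_file)
        with open(wrapper, "w") as handle:
            handle.write("%s\n./%s\n" % (preamble, os.path.basename(executable_file)))
        os.chmod(wrapper, stat.S_IRWXU)
        return wrapper

    preamble = "export SITE_SPECIFIC_SETUP=1"  # assumed CE preamble
    executable_file = "pilot_wrapper.sh"       # hypothetical executable prepared earlier
    inputs = []

    if preamble:
        inputs.append(executable_file)  # the real executable travels as an input file
        executable_file = bundle_preamble(preamble, executable_file)  # the wrapper is submitted instead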
147 changes: 91 additions & 56 deletions src/DIRAC/Resources/Computing/AREXComputingElement.py
@@ -304,13 +304,12 @@ def _getDelegationID(self, arcJobID):

#############################################################################

def _getArcJobID(self, executableFile, inputs, outputs, executables, delegation):
def _getArcJobID(self, executableFile, inputs, outputs, delegation):
"""Get an ARC JobID endpoint to upload executables and inputs.
:param str executableFile: executable to submit
:param list inputs: list of input files
:param list outputs: list of expected output files
:param list executables: list of secondary executables (will be uploaded with the executable mode)
:param str delegation: delegation ID
:return: tuple containing a job ID and a stamp
@@ -320,7 +319,7 @@ def _getArcJobID(self, executableFile, inputs, outputs, executables, delegation)
query = self._urlJoin("jobs")

# Get the job into the ARC way
xrslString, diracStamp = self._writeXRSL(executableFile, inputs, outputs, executables)
xrslString, diracStamp = self._writeXRSL(executableFile, inputs, outputs)
xrslString += delegation
self.log.debug("XRSL string submitted", "is %s" % xrslString)
self.log.debug("DIRAC stamp for job", "is %s" % diracStamp)
@@ -344,21 +343,16 @@ def _getArcJobID(self, executableFile, inputs, outputs, executables, delegation)
arcJobID = responseJob["id"]
return S_OK((arcJobID, diracStamp))

def _uploadJobDependencies(self, arcJobID, executableFile, inputs, executables):
def _uploadJobDependencies(self, arcJobID, executableFile, inputs):
"""Upload job dependencies so that the job can start.
This includes the executables and the inputs.
:param str arcJobID: ARC job ID
:param str executableFile: executable file
:param list inputs: inputs required by the executable file
:param list executables: executables require by the executable file
"""
filesToSubmit = [executableFile]
filesToSubmit += executables
if inputs:
if not isinstance(inputs, list):
inputs = [inputs]
filesToSubmit += inputs
filesToSubmit += inputs

for fileToSubmit in filesToSubmit:
queryExecutable = self._urlJoin(os.path.join("jobs", arcJobID, "session", os.path.basename(fileToSubmit)))
@@ -376,32 +370,6 @@ def _uploadJobDependencies(self, arcJobID, executableFile, inputs, executables):
self.log.verbose("Input correctly uploaded", fileToSubmit)
return S_OK()
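
The upload step above can be pictured with this hedged sketch, which uses plain requests instead of the internal _request helper; the endpoint layout (jobs/<id>/session/<filename>) comes from the diff, while the base URL and bearer-token authentication are assumptions.

    import os
    import requests  # assumed here; DIRAC goes through its own session/_request machinery

    def upload_job_dependencies(base_url, token, arc_job_id, files_to_submit):
        """Illustrative sketch: push the executable and its input files to the
        AREX session directory of the freshly created job."""
        headers = {"Authorization": "Bearer %s" % token}
        for path in files_to_submit:
            url = "%s/jobs/%s/session/%s" % (base_url, arc_job_id, os.path.basename(path))
            with open(path, "rb") as handle:
                response = requests.put(url, data=handle, headers=headers)
            response.raise_for_status()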

def _killJob(self, arcJobList):
"""Kill the specified jobs
:param list arcJobList: list of ARC Job IDs
"""
result = self._checkSession()
if not result["OK"]:
self.log.error("Cannot kill jobs", result["Message"])
return result

# List of jobs in json format for the REST query
jobsJson = {"job": [{"id": job} for job in arcJobList]}

# Prepare the command
params = {"action": "kill"}
query = self._urlJoin("jobs")

# Killing jobs should be fast
result = self._request("post", query, params=params, data=json.dumps(jobsJson))
if not result["OK"]:
self.log.error("Failed to kill all these jobs.", result["Message"])
return S_ERROR("Failed to kill all these jobs")

self.log.debug("Successfully deleted jobs")
return S_OK()

def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=None):
"""Method to submit job
Assume that the ARC queues are always of the format nordugrid-<batchSystem>-<queue>
@@ -423,10 +391,14 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
else:
delegation = "\n(delegationid=%s)" % result["Value"]

if not inputs:
inputs = []
if not outputs:
outputs = []

# If there is a preamble, then we bundle it in an executable file
executables = []
if self.preamble:
executables = [executableFile]
inputs.append(executableFile)
executableFile = self._bundlePreamble(executableFile)

# Submit multiple jobs sequentially.
@@ -436,14 +408,14 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
batchIDList = []
stampDict = {}
for _ in range(numberOfJobs):
result = self._getArcJobID(executableFile, inputs, outputs, executables, delegation)
result = self._getArcJobID(executableFile, inputs, outputs, delegation)
if not result["OK"]:
break
arcJobID, diracStamp = result["Value"]

# At this point, only the XRSL job has been submitted to AREX services
# Here we also upload the executable, other executable files and inputs.
result = self._uploadJobDependencies(arcJobID, executableFile, inputs, executables)
result = self._uploadJobDependencies(arcJobID, executableFile, inputs)
if not result["OK"]:
break

@@ -469,12 +441,83 @@ def killJob(self, jobIDList):
:param list jobIDList: list of DIRAC Job IDs
"""
if not isinstance(jobIDList, list):
jobIDList = [jobIDList]
self.log.debug("Killing jobs", ",".join(jobIDList))

# List of jobs in json format for the REST query
jList = [self._DiracToArcID(job) for job in jobIDList]
# Convert DIRAC jobs to ARC jobs
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
jList = [self._DiracToArcID(job.split(":::")[0]) for job in jobIDList]
return self._killJob(jList)

def _killJob(self, arcJobList):
"""Kill the specified jobs
:param list arcJobList: list of ARC Job IDs
"""
result = self._checkSession()
if not result["OK"]:
self.log.error("Cannot kill jobs", result["Message"])
return result

# List of jobs in json format for the REST query
jobsJson = {"job": [{"id": job} for job in arcJobList]}

# Prepare the command
params = {"action": "kill"}
query = self._urlJoin("jobs")

# Killing jobs should be fast
result = self._request("post", query, params=params, data=json.dumps(jobsJson))
if not result["OK"]:
self.log.error("Failed to kill all these jobs.", result["Message"])
return S_ERROR("Failed to kill all these jobs")

self.log.debug("Successfully deleted jobs")
return S_OK()

#############################################################################

def cleanJob(self, jobIDList):
"""Clean files related to the specified jobs
:param list jobIDList: list of DIRAC Job IDs
"""
if not isinstance(jobIDList, list):
jobIDList = [jobIDList]
self.log.debug("Cleaning jobs", ",".join(jobIDList))

# Convert DIRAC jobs to ARC jobs
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
jList = [self._DiracToArcID(job.split(":::")[0]) for job in jobIDList]
return self._cleanJob(jList)

def _cleanJob(self, arcJobList):
"""Clean files related to the specified jobs
:param list arcJobList: list of ARC Job IDs
"""
result = self._checkSession()
if not result["OK"]:
self.log.error("Cannot clean jobs", result["Message"])
return result

# List of jobs in json format for the REST query
jobsJson = {"job": [{"id": job} for job in arcJobList]}

# Prepare the command
params = {"action": "clean"}
query = self._urlJoin("jobs")

# Cleaning jobs
result = self._request("post", query, params=params, data=json.dumps(jobsJson))
if not result["OK"]:
self.log.error("Failed to clean all these jobs.", result["Message"])
return S_ERROR("Failed to clean all these jobs")

self.log.debug("Successfully cleaned jobs")
return S_OK()
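
The kill and clean requests (and the status query further down) follow the same pattern, sketched here with illustrative names and plain requests: strip the optional DIRAC stamp (":::XXXXX") from each job ID, convert it to an ARC ID, and POST a single JSON document with the requested action to the jobs endpoint.

    import json
    import requests  # assumed here; DIRAC routes this through its own _request helper

    def post_job_action(base_url, token, dirac_job_ids, action, dirac_to_arc_id):
        """Illustrative sketch of the kill/clean/status pattern: one POST
        carrying all the ARC job IDs plus an 'action' query parameter."""
        arc_ids = [dirac_to_arc_id(job.split(":::")[0]) for job in dirac_job_ids]
        payload = {"job": [{"id": job_id} for job_id in arc_ids]}
        response = requests.post(
            "%s/jobs" % base_url,
            params={"action": action},  # "kill", "clean" or "status"
            data=json.dumps(payload),
            headers={"Authorization": "Bearer %s" % token, "Content-Type": "application/json"},
        )
        response.raise_for_status()
        return response.json()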

#############################################################################

def getCEStatus(self):
@@ -613,14 +656,10 @@ def getJobStatus(self, jobIDList):
if not isinstance(jobIDList, list):
jobIDList = [jobIDList]

# Jobs are stored with a DIRAC stamp (":::XXXXX") appended
jobList = []
for j in jobIDList:
job = j.split(":::")[0]
jobList.append(job)

self.log.debug("Getting status of jobs : %s" % jobList)
arcJobsJson = {"job": [{"id": self._DiracToArcID(job)} for job in jobList]}
self.log.debug("Getting status of jobs:", jobIDList)
# Convert DIRAC jobs to ARC jobs and encapsulate them in a dictionary for the REST query
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
arcJobsJson = {"job": [{"id": self._DiracToArcID(job.split(":::")[0])} for job in jobIDList]}

# Prepare the command
params = {"action": "status"}
@@ -688,12 +727,8 @@ def getJobLog(self, jobID):
self.log.error("Cannot get job logging info", result["Message"])
return result

# Extract stamp from the Job ID
if ":::" in jobID:
jobID = jobID.split(":::")[0]

# Prepare the command: Get output files
arcJob = self._DiracToArcID(jobID)
arcJob = self._DiracToArcID(jobID.split(":::")[0])
query = self._urlJoin(os.path.join("jobs", arcJob, "diagnose", "errors"))

# Submit the GET request to retrieve outputs
@@ -759,9 +794,9 @@ def getJobOutput(self, jobID, workingDirectory=None):
remoteOutputs = result["Value"]
self.log.debug("Outputs to get are", remoteOutputs)

# We assume that workingDirectory exists
if not workingDirectory:
if "WorkingDirectory" in self.ceParameters:
# We assume that workingDirectory exists
workingDirectory = os.path.join(self.ceParameters["WorkingDirectory"], job)
else:
workingDirectory = job