sweep: DIRACGrid#6970 fix the interactions between the Matcher and the PoolCE
aldbr authored and web-flow committed Jun 20, 2023
1 parent 146a9ce commit eb89407
Showing 13 changed files with 610 additions and 535 deletions.
@@ -0,0 +1,5 @@
+----------------
+ComputingElement
+----------------
+
+The full code documentation is available here :py:class:`~DIRAC.Resources.Computing.ComputingElement`
@@ -16,5 +16,6 @@ DIRAC applications
   :maxdepth: 2

   Catalog
+  Computing
   MessageQueues/index
   Storage
11 changes: 11 additions & 0 deletions src/DIRAC/Resources/Computing/ComputingElement.py
@@ -19,6 +19,17 @@
The ComputingElement objects are usually instantiated with the help of
ComputingElementFactory.
+The ComputingElement class can be considered abstract. Three kinds of abstract ComputingElement
+can be derived from it:
+- Remote ComputingElement: includes methods to interact with a remote ComputingElement
+  (e.g. HTCondorCEComputingElement, ARCComputingElement).
+- Inner ComputingElement: includes methods to interact locally with an underlying worker node.
+  An Inner ComputingElement provides synchronous submission: submitting a job blocks
+  execution until the job completes. It deals with one job at a time.
+- Inner Pool ComputingElement: includes methods to interact locally with Inner
+  ComputingElements asynchronously. It can manage a pool of jobs running simultaneously.
"""

import os
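To make the taxonomy above concrete, here is a minimal sketch of the two submission contracts, side by side. This is illustrative and not part of the commit: it assumes a DIRAC installation, and "wrapper.sh" stands in for a real executable.

# Minimal sketch (illustrative, not part of this commit).
from DIRAC.Resources.Computing.InProcessComputingElement import InProcessComputingElement
from DIRAC.Resources.Computing.PoolComputingElement import PoolComputingElement

# Inner CE: synchronous, one job at a time; S_OK carries the payload exit code.
innerCE = InProcessComputingElement("InProcess")
result = innerCE.submitJob("wrapper.sh", proxy=None)
if result["OK"]:
    print("payload exit code:", result["Value"])

# Inner Pool CE: asynchronous; S_OK carries a task ID, and a done-callback
# publishes the outcome to poolCE.taskResults (see the PoolCE diff below).
poolCE = PoolComputingElement("Pool")
result = poolCE.submitJob("wrapper.sh", proxy=None)
taskID = result["Value"]
outcome = poolCE.taskResults.get(taskID)  # None until the task has completed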
46 changes: 22 additions & 24 deletions src/DIRAC/Resources/Computing/InProcessComputingElement.py
@@ -14,7 +14,6 @@


class InProcessComputingElement(ComputingElement):
-    #############################################################################
    def __init__(self, ceUniqueID):
        """Standard constructor."""
        super().__init__(ceUniqueID)
@@ -25,14 +24,14 @@ def __init__(self, ceUniqueID):
        self.processors = int(self.ceParameters.get("NumberOfProcessors", 1))
        self.ceParameters["MaxTotalJobs"] = 1

-    #############################################################################
    def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
        """Method to submit job (overriding base method).

        :param str executableFile: file to execute via systemCall.
                                   Normally the JobWrapperTemplate when invoked by the JobAgent.
        :param str proxy: the proxy used for running the job (the payload). It will be dumped to a file.
        :param list inputs: dependencies of executableFile
+        :return: S_OK(payload exit code) / S_ERROR() if submission issue
        """
        payloadEnv = dict(os.environ)
        payloadProxy = ""
@@ -79,34 +78,33 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
            for inputFile in inputs:
                os.unlink(inputFile)

-        ret = S_OK()
-
+        # Submission issue
        if not result["OK"]:
            self.log.error("Fail to run InProcess", result["Message"])
-        elif result["Value"][0] > 128:
-            res = S_ERROR()
-            # negative exit values are returned as 256 - exit
-            res["Value"] = result["Value"][0] - 256  # yes, it's "correct"
-            self.log.warn("InProcess Job Execution Failed")
-            self.log.info("Exit status:", result["Value"])
-            if res["Value"] == -2:
-                error = "JobWrapper initialization error"
-            elif res["Value"] == -1:
-                error = "JobWrapper execution error"
+            return S_ERROR(f"Failed to run InProcess: {result['Message']}")
+
+        retCode = result["Value"][0]
+        # Submission issue
+        if retCode > 128:
+            # Negative exit values are returned as 256 - exit
+            retCodeSubmission = retCode - 256  # yes, it's "correct"
+            self.log.warn("Job Execution Failed")
+            self.log.info("Exit status:", retCode)
+            if retCodeSubmission == -2:
+                errorMessage = "JobWrapper initialization error"
+            elif retCodeSubmission == -1:
+                errorMessage = "JobWrapper execution error"
            else:
-                error = "InProcess Job Execution Failed"
-            res["Message"] = error
-            return res
-        elif result["Value"][0] > 0:
+                errorMessage = "Job Execution Failed"
+            return S_ERROR(errorMessage)
+
+        # Submission ok but payload failed
+        if retCode:
            self.log.warn("Fail in payload execution")
-            self.log.info("Exit status:", result["Value"][0])
-            ret["PayloadFailed"] = result["Value"][0]
-        else:
-            self.log.debug("InProcess CE result OK")
-
-        return ret
+        self.log.info("Exit status:", retCode)
+        return S_OK(retCode)

    #############################################################################
    def getCEStatus(self):
        """Method to return information on running and waiting jobs,
        as well as number of available processors
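The "retCode - 256" decoding above relies on POSIX reporting exit statuses modulo 256: a JobWrapper that exits with -2 (initialization error) is observed as 254. A self-contained demonstration of the wraparound (POSIX only; not part of the commit):

import subprocess
import sys

# A child exiting with -2 is reported with status byte 254 (== -2 mod 256).
proc = subprocess.run([sys.executable, "-c", "import sys; sys.exit(-2)"])
observed = proc.returncode                                # 254 on POSIX
decoded = observed - 256 if observed > 128 else observed  # recovers -2
print(observed, decoded)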
34 changes: 19 additions & 15 deletions src/DIRAC/Resources/Computing/PoolComputingElement.py
@@ -19,6 +19,7 @@
**Code Documentation**
"""
+import functools
import os
import concurrent.futures

@@ -28,7 +29,6 @@
from DIRAC.Resources.Computing.ComputingElement import ComputingElement

from DIRAC.Resources.Computing.InProcessComputingElement import InProcessComputingElement
-from DIRAC.Resources.Computing.SudoComputingElement import SudoComputingElement
from DIRAC.Resources.Computing.SingularityComputingElement import SingularityComputingElement

# Number of unix users to run job payloads with sudo
@@ -47,12 +47,7 @@ def executeJob(executableFile, proxy, taskID, inputs, **kwargs):

    innerCESubmissionType = kwargs.pop("InnerCESubmissionType")

-    if innerCESubmissionType == "Sudo":
-        ce = SudoComputingElement("Task-" + str(taskID))
-        payloadUser = kwargs.get("PayloadUser")
-        if payloadUser:
-            ce.setParameters({"PayloadUser": payloadUser})
-    elif innerCESubmissionType == "Singularity":
+    if innerCESubmissionType == "Singularity":
        ce = SingularityComputingElement("Task-" + str(taskID))
    else:
        ce = InProcessComputingElement("Task-" + str(taskID))
@@ -85,6 +80,9 @@ def _reset(self):

        self.processors = int(self.ceParameters.get("NumberOfProcessors", self.processors))
        self.ceParameters["MaxTotalJobs"] = self.processors
+        # Indicates that the submission is done asynchronously
+        # The result is not immediately available
+        self.ceParameters["AsyncSubmission"] = True
        self.innerCESubmissionType = self.ceParameters.get("InnerCESubmissionType", self.innerCESubmissionType)
        return S_OK()
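The new AsyncSubmission parameter tells callers how to interpret submitJob()'s return value. A hypothetical consumer-side helper (a sketch under that assumption, not an API introduced by this commit):

def interpretSubmission(ce, result):
    # Hypothetical helper: with AsyncSubmission, S_OK carries a task ID whose
    # outcome arrives later in ce.taskResults; otherwise the value is the
    # final payload exit code from a blocking submission.
    if ce.ceParameters.get("AsyncSubmission", False):
        return {"taskID": result["Value"]}
    return {"payloadExitCode": result["Value"]}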

@@ -107,15 +105,18 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
        :param str proxy: payload proxy
        :param list inputs: dependencies of executableFile
-        :return: S_OK/S_ERROR of the result of the job submission
+        :return: S_OK always. The result of the submission should be included in taskResults
        """

        if self.pPool is None:
            self.pPool = concurrent.futures.ProcessPoolExecutor(max_workers=self.processors)

        processorsForJob = self._getProcessorsForJobs(kwargs)
        if not processorsForJob:
-            return S_ERROR("Not enough processors for the job")
+            self.taskResults[self.taskID] = S_ERROR("Not enough processors for the job")
+            taskID = self.taskID
+            self.taskID += 1
+            return S_OK(taskID)

        # Now persisting the job limits for later use in pilot.cfg file (pilot 3 default)
        cd = ConfigurationData(loadDefaultCFG=False)
@@ -141,12 +142,15 @@
        if "USER" in os.environ:
            taskKwargs["PayloadUser"] = os.environ["USER"] + f"p{str(nUser).zfill(2)}"

+        # Submission
        future = self.pPool.submit(executeJob, executableFile, proxy, self.taskID, inputs, **taskKwargs)
        self.processorsPerTask[future] = processorsForJob
+        future.add_done_callback(functools.partial(self.finalizeJob, self.taskID))
+
+        taskID = self.taskID
        self.taskID += 1
-        future.add_done_callback(self.finalizeJob)

-        return S_OK()  # returning S_OK as callback will do the rest
+        return S_OK(taskID)  # returning S_OK as callback will do the rest

    def _getProcessorsForJobs(self, kwargs):
        """helper function"""
@@ -187,7 +191,7 @@ def _getProcessorsForJobs(self, kwargs):

        return requestedProcessors

-    def finalizeJob(self, future):
+    def finalizeJob(self, taskID, future):
        """Finalize the job by updating the process utilisation counters

        :param future: evaluating the future result
@@ -196,10 +200,10 @@

        result = future.result()  # This would be the result of the e.g. InProcess.submitJob()
        if result["OK"]:
-            self.log.info("Task %s finished successfully, %d processor(s) freed" % (future, nProc))
+            self.log.info("Task finished successfully:", f"{taskID}; {nProc} processor(s) freed")
        else:
-            self.log.error("Task failed submission", f"{future}, message: {result['Message']}")
-        self.taskResults[future] = result
+            self.log.error("Task failed submission:", f"{taskID}; message: {result['Message']}")
+        self.taskResults[taskID] = result

    def getCEStatus(self):
        """Method to return information on running and waiting jobs,
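The heart of the fix is the submitJob/finalizeJob pair above: functools.partial binds the task ID into the done-callback, so results are keyed by a stable ID instead of the future object. A standalone, runnable sketch of that pattern (the names mirror the diff, but nothing here is DIRAC-specific):

import concurrent.futures
import functools

taskResults = {}

def executeJob(taskID):
    # Stand-in for the real payload execution.
    return f"task {taskID} done"

def finalizeJob(taskID, future):
    # Same shape as PoolComputingElement.finalizeJob(taskID, future):
    # the bound taskID keys the result, not the future.
    taskResults[taskID] = future.result()

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        for taskID in range(3):
            future = pool.submit(executeJob, taskID)
            future.add_done_callback(functools.partial(finalizeJob, taskID))
    print(taskResults)  # three entries keyed 0..2; completion order may vary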
34 changes: 9 additions & 25 deletions src/DIRAC/Resources/Computing/SingularityComputingElement.py
@@ -218,17 +218,13 @@ def __createWorkArea(self, jobDesc=None, log=None, logLevel="INFO", proxy=None):
                os.mkdir(self.__workdir)
            except OSError:
                if not os.path.isdir(self.__workdir):
-                    result = S_ERROR(f"Failed to create container base directory '{self.__workdir}'")
-                    result["ReschedulePayload"] = True
-                    return result
+                    return S_ERROR(f"Failed to create container base directory '{self.__workdir}'")
                # Otherwise, directory probably just already exists...
        baseDir = None
        try:
            baseDir = tempfile.mkdtemp(prefix=f"job{jobDesc.get('jobID', 0)}_", dir=self.__workdir)
        except OSError:
-            result = S_ERROR(f"Failed to create container work directory in '{self.__workdir}'")
-            result["ReschedulePayload"] = True
-            return result
+            return S_ERROR(f"Failed to create container work directory in '{self.__workdir}'")

        self.log.debug(f"Use singularity workarea: {baseDir}")
        for subdir in ["home", "tmp", "var_tmp"]:
@@ -259,7 +255,6 @@ def __createWorkArea(self, jobDesc=None, log=None, logLevel="INFO", proxy=None):
            extraOptions="" if self.__installDIRACInContainer else "/tmp/pilot.cfg",
        )
        if not result["OK"]:
-            result["ReschedulePayload"] = True
            return result
        wrapperPath = result["Value"]

@@ -334,30 +329,23 @@ def __checkResult(tmpDir):
                retCode = int(fp.read())
        except (OSError, ValueError):
            # Something failed while trying to get the return code
-            result = S_ERROR("Failed to get return code from inner wrapper")
-            result["ReschedulePayload"] = True
-            return result
+            return S_ERROR("Failed to get return code from inner wrapper")

-        result = S_OK()
-        if retCode:
-            # This is the one case where we don't reschedule:
-            # An actual failure of the inner payload for some reason
-            result = S_ERROR("Command failed with exit code %d" % retCode)
-        return result
+        return S_OK(retCode)

    def submitJob(self, executableFile, proxy=None, **kwargs):
        """Start a container for a job.

        executableFile is ignored. A new wrapper suitable for running in a
        container is created from jobDesc.

+        :return: S_OK(payload exit code) / S_ERROR() if submission issue
        """
        rootImage = self.__root

        # Check that singularity is available
        if not self.__hasSingularity():
            self.log.error("Singularity is not installed on PATH.")
-            result = S_ERROR("Failed to find singularity ")
-            result["ReschedulePayload"] = True
-            return result
+            return S_ERROR("Failed to find singularity")

        self.log.info("Creating singularity container")
@@ -448,9 +436,7 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
        else:
            # if we are here it's because there's no image, or it is not accessible (e.g. not on CVMFS)
            self.log.error("Singularity image to exec not found: ", rootImage)
-            result = S_ERROR("Failed to find singularity image to exec")
-            result["ReschedulePayload"] = True
-            return result
+            return S_ERROR("Failed to find singularity image to exec")

        self.log.debug(f"Execute singularity command: {cmd}")
        self.log.debug(f"Execute singularity env: {self.__getEnv()}")
@@ -463,9 +449,7 @@
            if proxy and renewTask:
                gThreadScheduler.removeTask(renewTask)
            self.__deleteWorkArea(baseDir)
-            result = S_ERROR("Error running singularity command")
-            result["ReschedulePayload"] = True
-            return result
+            return S_ERROR("Error running singularity command")

        result = self.__checkResult(tmpDir)
        if proxy and renewTask:
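A common thread in these SingularityComputingElement changes: the ReschedulePayload flag is dropped, and __checkResult now returns the payload's exit code inside S_OK instead of converting it to S_ERROR, keeping submission failures distinct from payload failures. A minimal sketch of that check (the file name "retcode" and the plain-dict stand-ins for S_OK/S_ERROR are illustrative assumptions):

import os

def checkResult(tmpDir):
    # Read the exit code the inner wrapper wrote to a file: failing to read it
    # is a submission error; a non-zero code is a payload failure reported
    # inside a successful result.
    try:
        with open(os.path.join(tmpDir, "retcode")) as fp:
            retCode = int(fp.read())
    except (OSError, ValueError):
        return {"OK": False, "Message": "Failed to get return code from inner wrapper"}
    return {"OK": True, "Value": retCode}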