
[8.0] fix the interactions between the Matcher and the PoolCE #6970

Merged (5 commits) on Jun 20, 2023
@@ -0,0 +1,5 @@
----------------
ComputingElement
----------------

The full code documentation is available here :py:class:`~DIRAC.Resources.Computing.ComputingElement`
@@ -16,5 +16,6 @@ DIRAC applications
:maxdepth: 2

Catalog
Computing
MessageQueues/index
Storage
11 changes: 11 additions & 0 deletions src/DIRAC/Resources/Computing/ComputingElement.py
@@ -19,6 +19,17 @@

The ComputingElement objects are usually instantiated with the help of
ComputingElementFactory.

The ComputingElement class can be considered abstract. Three kinds of abstract ComputingElements
can be derived from it:

- Remote ComputingElement: includes methods to interact with a remote ComputingElement
(e.g. HTCondorCEComputingElement, ARCComputingElement).
- Inner ComputingElement: includes methods to locally interact with an underlying worker node.
It is worth noting that an Inner ComputingElement provides synchronous submission
(submitting a job blocks execution until it completes). It deals with one job at a time.
- Inner Pool ComputingElement: includes methods to locally interact with Inner ComputingElements asynchronously.
It can manage a pool of jobs running simultaneously.
"""

import os
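The three flavours called out in this docstring differ mainly in how submitJob returns. As a rough, hedged illustration outside DIRAC (the class names and bodies below are assumptions for exposition, not DIRAC API):

```python
import concurrent.futures
import subprocess


class RemoteCE:
    """Remote flavour (cf. HTCondorCE/ARC): submission hands back a reference, not a result."""

    def submitJob(self, executable):
        # The job runs on a remote service; we only learn its identifier here.
        return {"OK": True, "Value": "remote-job-42"}


class InnerCE:
    """Inner flavour: runs one payload on the local worker node; blocks until done."""

    def submitJob(self, executable):
        retCode = subprocess.call([executable])  # synchronous call
        return {"OK": True, "Value": retCode}    # the payload exit code


class InnerPoolCE:
    """Inner Pool flavour: drives several Inner CEs at once through a process pool."""

    def __init__(self, processors=2):
        self.pool = concurrent.futures.ProcessPoolExecutor(max_workers=processors)

    def submitJob(self, executable):
        future = self.pool.submit(InnerCE().submitJob, executable)
        return {"OK": True, "Value": future}  # asynchronous: result collected later
```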
46 changes: 22 additions & 24 deletions src/DIRAC/Resources/Computing/InProcessComputingElement.py
@@ -14,7 +14,6 @@


class InProcessComputingElement(ComputingElement):
-#############################################################################
def __init__(self, ceUniqueID):
"""Standard constructor."""
super().__init__(ceUniqueID)
@@ -25,14 +24,14 @@ def __init__(self, ceUniqueID):
self.processors = int(self.ceParameters.get("NumberOfProcessors", 1))
self.ceParameters["MaxTotalJobs"] = 1

-#############################################################################
def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
"""Method to submit job (overriding base method).

:param str executableFile: file to execute via systemCall.
Normally the JobWrapperTemplate when invoked by the JobAgent.
:param str proxy: the proxy used for running the job (the payload). It will be dumped to a file.
:param list inputs: dependencies of executableFile
:return: S_OK(payload exit code) / S_ERROR() if submission issue
"""
payloadEnv = dict(os.environ)
payloadProxy = ""
@@ -79,34 +78,33 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
for inputFile in inputs:
os.unlink(inputFile)

-        ret = S_OK()

        # Submission issue
        if not result["OK"]:
-            self.log.error("Fail to run InProcess", result["Message"])
-        elif result["Value"][0] > 128:
-            res = S_ERROR()
-            # negative exit values are returned as 256 - exit
-            res["Value"] = result["Value"][0] - 256  # yes, it's "correct"
-            self.log.warn("InProcess Job Execution Failed")
-            self.log.info("Exit status:", result["Value"])
-            if res["Value"] == -2:
-                error = "JobWrapper initialization error"
-            elif res["Value"] == -1:
-                error = "JobWrapper execution error"
+            return S_ERROR(f"Failed to run InProcess: {result['Message']}")
+
+        retCode = result["Value"][0]
Contributor

The elif was meaningful.

Contributor Author

Could you explain why? I am not sure I understand.
Now we return S_ERROR when result["OK"] is False.
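To make the two shapes concrete, here is a condensed, runnable sketch of what is being compared; S_OK/S_ERROR are minimal stand-ins for DIRAC.Core.Utilities.ReturnValues and all logging is elided:

```python
def S_OK(value=None):
    return {"OK": True, "Value": value}


def S_ERROR(message):
    return {"OK": False, "Message": message}


def finalize_old(result):
    # Old shape: one if/elif chain. A failed submission was only logged and
    # fell through to "return ret", i.e. an S_OK carrying no payload code.
    ret = S_OK()
    if not result["OK"]:
        pass  # just logged
    elif result["Value"][0] > 0:
        ret["PayloadFailed"] = result["Value"][0]
    return ret


def finalize_new(result):
    # New shape: submission failure returns S_ERROR early, so the later
    # exit-code checks can be plain ifs rather than elif branches.
    if not result["OK"]:
        return S_ERROR(f"Failed to run InProcess: {result['Message']}")
    return S_OK(result["Value"][0])
```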

+        # Submission issue
+        if retCode > 128:
+            # Negative exit values are returned as 256 - exit
+            retCodeSubmission = retCode - 256  # yes, it's "correct"
+            self.log.warn("Job Execution Failed")
+            self.log.info("Exit status:", retCode)
+            if retCodeSubmission == -2:
+                errorMessage = "JobWrapper initialization error"
+            elif retCodeSubmission == -1:
+                errorMessage = "JobWrapper execution error"
            else:
-                error = "InProcess Job Execution Failed"
-            res["Message"] = error
-            return res
-        elif result["Value"][0] > 0:
+                errorMessage = "Job Execution Failed"
+            return S_ERROR(errorMessage)
+
+        # Submission ok but payload failed
+        if retCode:
            self.log.warn("Fail in payload execution")
-            self.log.info("Exit status:", result["Value"][0])
-            ret["PayloadFailed"] = result["Value"][0]
-        else:
-            self.log.debug("InProcess CE result OK")

-        return ret
+        self.log.info("Exit status:", retCode)
+        return S_OK(retCode)

#############################################################################
def getCEStatus(self):
"""Method to return information on running and waiting jobs,
as well as number of available processors
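The net effect of the changes in this file is a simpler contract: S_ERROR now means the submission itself failed, while a failing payload comes back as S_OK carrying the exit code. A sketch of how a caller would have to read such a result; interpret_submission is a made-up helper, not DIRAC code:

```python
def interpret_submission(result):
    """Classify an InProcess-style submitJob result (illustrative only)."""
    if not result["OK"]:
        return ("submission-failed", result["Message"])
    retCode = result["Value"]
    if retCode:
        # Submission worked, but the payload exited nonzero.
        return ("payload-failed", retCode)
    return ("payload-ok", 0)


print(interpret_submission({"OK": True, "Value": 0}))       # ('payload-ok', 0)
print(interpret_submission({"OK": True, "Value": 111}))     # ('payload-failed', 111)
print(interpret_submission({"OK": False, "Message": "x"}))  # ('submission-failed', 'x')
```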
34 changes: 19 additions & 15 deletions src/DIRAC/Resources/Computing/PoolComputingElement.py
@@ -19,6 +19,7 @@

**Code Documentation**
"""
import functools
import os
import concurrent.futures

@@ -28,7 +29,6 @@
from DIRAC.Resources.Computing.ComputingElement import ComputingElement

from DIRAC.Resources.Computing.InProcessComputingElement import InProcessComputingElement
-from DIRAC.Resources.Computing.SudoComputingElement import SudoComputingElement
from DIRAC.Resources.Computing.SingularityComputingElement import SingularityComputingElement

# Number of unix users to run job payloads with sudo
@@ -47,12 +47,7 @@ def executeJob(executableFile, proxy, taskID, inputs, **kwargs):

innerCESubmissionType = kwargs.pop("InnerCESubmissionType")

if innerCESubmissionType == "Sudo":
ce = SudoComputingElement("Task-" + str(taskID))
payloadUser = kwargs.get("PayloadUser")
if payloadUser:
ce.setParameters({"PayloadUser": payloadUser})
elif innerCESubmissionType == "Singularity":
if innerCESubmissionType == "Singularity":
ce = SingularityComputingElement("Task-" + str(taskID))
else:
ce = InProcessComputingElement("Task-" + str(taskID))
@@ -85,6 +80,9 @@ def _reset(self):

self.processors = int(self.ceParameters.get("NumberOfProcessors", self.processors))
self.ceParameters["MaxTotalJobs"] = self.processors
# Indicates that the submission is done asynchronously
# The result is not immediately available
self.ceParameters["AsyncSubmission"] = True
self.innerCESubmissionType = self.ceParameters.get("InnerCESubmissionType", self.innerCESubmissionType)
return S_OK()
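Because the Pool CE now advertises AsyncSubmission, whatever drives the CE (the JobAgent/Matcher side) cannot take submitJob's return value as the payload outcome; it has to wait for taskResults to be filled by the callback. A sketch of that branching; both helpers below are hypothetical, not DIRAC API:

```python
import time


def waitForTaskResult(ce, taskID, poll=1.0):
    """Hypothetical helper: wait until finalizeJob has filed a result for taskID."""
    while taskID not in ce.taskResults:
        time.sleep(poll)
    return ce.taskResults.pop(taskID)


def driveSubmission(ce, executableFile):
    res = ce.submitJob(executableFile)
    if not res["OK"]:
        return res  # the submission itself failed
    if ce.ceParameters.get("AsyncSubmission", False):
        # Pool CE: res["Value"] is a task ID; the real outcome arrives later.
        return waitForTaskResult(ce, res["Value"])
    return res  # synchronous inner CE: res already holds the payload outcome
```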

@@ -107,15 +105,18 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
:param str proxy: payload proxy
:param list inputs: dependencies of executableFile

-        :return: S_OK/S_ERROR of the result of the job submission
+        :return: S_OK always. The result of the submission should be included in taskResults
"""

if self.pPool is None:
self.pPool = concurrent.futures.ProcessPoolExecutor(max_workers=self.processors)

processorsForJob = self._getProcessorsForJobs(kwargs)
if not processorsForJob:
return S_ERROR("Not enough processors for the job")
self.taskResults[self.taskID] = S_ERROR("Not enough processors for the job")
taskID = self.taskID
self.taskID += 1
return S_OK(taskID)

# Now persisting the job limits for later use in pilot.cfg file (pilot 3 default)
cd = ConfigurationData(loadDefaultCFG=False)
@@ -141,12 +142,15 @@
if "USER" in os.environ:
taskKwargs["PayloadUser"] = os.environ["USER"] + f"p{str(nUser).zfill(2)}"

+        # Submission
        future = self.pPool.submit(executeJob, executableFile, proxy, self.taskID, inputs, **taskKwargs)
        self.processorsPerTask[future] = processorsForJob
+        future.add_done_callback(functools.partial(self.finalizeJob, self.taskID))

        taskID = self.taskID
        self.taskID += 1
-        future.add_done_callback(self.finalizeJob)

-        return S_OK()  # returning S_OK as callback will do the rest
+        return S_OK(taskID)  # returning S_OK as callback will do the rest

def _getProcessorsForJobs(self, kwargs):
"""helper function"""
@@ -187,7 +191,7 @@

return requestedProcessors

-    def finalizeJob(self, future):
+    def finalizeJob(self, taskID, future):
"""Finalize the job by updating the process utilisation counters

:param future: evaluating the future result
@@ -196,10 +200,10 @@

result = future.result() # This would be the result of the e.g. InProcess.submitJob()
if result["OK"]:
self.log.info("Task %s finished successfully, %d processor(s) freed" % (future, nProc))
self.log.info("Task finished successfully:", f"{taskID}; {nProc} processor(s) freed")
else:
self.log.error("Task failed submission", f"{future}, message: {result['Message']}")
self.taskResults[future] = result
self.log.error("Task failed submission:", f"{taskID}; message: {result['Message']}")
self.taskResults[taskID] = result

def getCEStatus(self):
"""Method to return information on running and waiting jobs,
34 changes: 9 additions & 25 deletions src/DIRAC/Resources/Computing/SingularityComputingElement.py
@@ -218,17 +218,13 @@ def __createWorkArea(self, jobDesc=None, log=None, logLevel="INFO", proxy=None):
            os.mkdir(self.__workdir)
        except OSError:
            if not os.path.isdir(self.__workdir):
-                result = S_ERROR(f"Failed to create container base directory '{self.__workdir}'")
-                result["ReschedulePayload"] = True
-                return result
+                return S_ERROR(f"Failed to create container base directory '{self.__workdir}'")
            # Otherwise, directory probably just already exists...
        baseDir = None
        try:
            baseDir = tempfile.mkdtemp(prefix=f"job{jobDesc.get('jobID', 0)}_", dir=self.__workdir)
        except OSError:
-            result = S_ERROR(f"Failed to create container work directory in '{self.__workdir}'")
-            result["ReschedulePayload"] = True
-            return result
+            return S_ERROR(f"Failed to create container work directory in '{self.__workdir}'")

self.log.debug(f"Use singularity workarea: {baseDir}")
for subdir in ["home", "tmp", "var_tmp"]:
@@ -259,7 +255,6 @@ def __createWorkArea(self, jobDesc=None, log=None, logLevel="INFO", proxy=None):
extraOptions="" if self.__installDIRACInContainer else "/tmp/pilot.cfg",
)
if not result["OK"]:
result["ReschedulePayload"] = True
return result
wrapperPath = result["Value"]

@@ -334,30 +329,23 @@ def __checkResult(tmpDir):
                retCode = int(fp.read())
        except (OSError, ValueError):
            # Something failed while trying to get the return code
-            result = S_ERROR("Failed to get return code from inner wrapper")
-            result["ReschedulePayload"] = True
-            return result
+            return S_ERROR("Failed to get return code from inner wrapper")

-        result = S_OK()
-        if retCode:
-            # This is the one case where we don't reschedule:
-            # An actual failure of the inner payload for some reason
-            result = S_ERROR("Command failed with exit code %d" % retCode)
-        return result
+        return S_OK(retCode)

def submitJob(self, executableFile, proxy=None, **kwargs):
"""Start a container for a job.
executableFile is ignored. A new wrapper suitable for running in a
container is created from jobDesc.

:return: S_OK(payload exit code) / S_ERROR() if submission issue
"""
rootImage = self.__root

# Check that singularity is available
if not self.__hasSingularity():
self.log.error("Singularity is not installed on PATH.")
result = S_ERROR("Failed to find singularity ")
result["ReschedulePayload"] = True
return result
return S_ERROR("Failed to find singularity")

self.log.info("Creating singularity container")

@@ -448,9 +436,7 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
else:
# if we are here is because there's no image, or it is not accessible (e.g. not on CVMFS)
self.log.error("Singularity image to exec not found: ", rootImage)
result = S_ERROR("Failed to find singularity image to exec")
result["ReschedulePayload"] = True
return result
return S_ERROR("Failed to find singularity image to exec")

self.log.debug(f"Execute singularity command: {cmd}")
self.log.debug(f"Execute singularity env: {self.__getEnv()}")
@@ -463,9 +449,7 @@
if proxy and renewTask:
gThreadScheduler.removeTask(renewTask)
self.__deleteWorkArea(baseDir)
result = S_ERROR("Error running singularity command")
result["ReschedulePayload"] = True
return result
return S_ERROR("Error running singularity command")

result = self.__checkResult(tmpDir)
if proxy and renewTask:
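For reference, __checkResult above depends on the inner wrapper leaving its exit code in a small file inside the job's temporary directory. A minimal sketch of that convention; the file name "retcode" and both helper names are assumptions for illustration:

```python
import os
import subprocess
import tempfile


def run_and_record(tmpDir, command):
    """Run a command and persist its exit code, as an inner wrapper might."""
    retCode = subprocess.call(command)
    with open(os.path.join(tmpDir, "retcode"), "w") as fp:
        fp.write(str(retCode))


def check_result(tmpDir):
    """Read back the recorded exit code; mirrors the shape of __checkResult."""
    try:
        with open(os.path.join(tmpDir, "retcode")) as fp:
            return int(fp.read())
    except (OSError, ValueError):
        return None  # no usable return code: the run itself went wrong


tmp = tempfile.mkdtemp()
run_and_record(tmp, ["python3", "-c", "raise SystemExit(3)"])
print(check_result(tmp))  # 3
```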