
fix: management of the submission status in JobAgent
aldbr committed Apr 18, 2023 · 1 parent c592bfd · commit 4044167
Showing 7 changed files with 188 additions and 164 deletions.
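
Taken together, the changes below make the computing elements report their submission status uniformly: synchronous CEs (InProcess, Sudo, Singularity) now declare AsyncSubmission = False and return the payload outcome directly from submitJob, while the asynchronous PoolComputingElement returns a task ID whose result is published later through taskResults. A minimal sketch of how a caller such as the JobAgent could branch on this parameter (the handleSubmission helper and its return strings are illustrative, not code from this commit; S_OK/S_ERROR are stand-ins mimicking DIRAC's return dicts):

def S_OK(value=None):
    # Stand-in for DIRAC's S_OK return dict
    return {"OK": True, "Value": value}

def S_ERROR(message=""):
    # Stand-in for DIRAC's S_ERROR return dict
    return {"OK": False, "Message": message}

def handleSubmission(ce, submissionResult):
    """Interpret a ComputingElement.submitJob() result (illustrative only)."""
    if not submissionResult["OK"]:
        # The submission itself failed
        return "SubmissionFailed"
    # The default for CEs that do not set AsyncSubmission is defined outside
    # this diff view; True is assumed here
    if ce.ceParameters.get("AsyncSubmission", True):
        # Asynchronous CE (e.g. PoolComputingElement): the value is a task ID;
        # the payload outcome appears later in the CE's taskResults
        return f"Submitted as task {submissionResult['Value']}"
    # Synchronous CE: the outcome is already final
    if submissionResult.get("PayloadFailed"):
        return f"Payload failed with exit code {submissionResult['PayloadFailed']}"
    return "Done"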
50 changes: 27 additions & 23 deletions src/DIRAC/Resources/Computing/InProcessComputingElement.py
@@ -14,7 +14,6 @@


 class InProcessComputingElement(ComputingElement):
-    #############################################################################
     def __init__(self, ceUniqueID):
         """Standard constructor."""
         super().__init__(ceUniqueID)

@@ -25,7 +24,10 @@ def __init__(self, ceUniqueID):
         self.processors = int(self.ceParameters.get("NumberOfProcessors", 1))
         self.ceParameters["MaxTotalJobs"] = 1

-    #############################################################################
+        # Indicates that the submission is done synchronously
+        # The result is immediately available
+        self.ceParameters["AsyncSubmission"] = False
+
     def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
         """Method to submit job (overriding base method).

@@ -79,34 +81,36 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
             for inputFile in inputs:
                 os.unlink(inputFile)

-        ret = S_OK()
-
         # Submission issue
         if not result["OK"]:
             self.log.error("Fail to run InProcess", result["Message"])
-        elif result["Value"][0] > 128:
-            res = S_ERROR()
-            # negative exit values are returned as 256 - exit
-            res["Value"] = result["Value"][0] - 256  # yes, it's "correct"
+            return S_ERROR(f"Failed to run InProcess: {result['Message']}")
+
+        retCode = result["Value"][0]
+        # Submission issue
+        if retCode > 128:
+            # Negative exit values are returned as 256 - exit
+            retCodeSubmission = retCode - 256  # yes, it's "correct"
             self.log.warn("InProcess Job Execution Failed")
-            self.log.info("Exit status:", result["Value"])
-            if res["Value"] == -2:
-                error = "JobWrapper initialization error"
-            elif res["Value"] == -1:
-                error = "JobWrapper execution error"
+            self.log.info("Exit status:", retCode)
+            if retCodeSubmission == -2:
+                errorMessage = "JobWrapper initialization error"
+            elif retCodeSubmission == -1:
+                errorMessage = "JobWrapper execution error"
             else:
-                error = "InProcess Job Execution Failed"
-            res["Message"] = error
-            return res
-        elif result["Value"][0] > 0:
+                errorMessage = "InProcess Job Execution Failed"
+            return S_ERROR(errorMessage)
+
+        # Submission ok but payload failed
+        result = S_OK()
+        if retCode > 0:
             self.log.warn("Fail in payload execution")
-            self.log.info("Exit status:", result["Value"][0])
-            ret["PayloadFailed"] = result["Value"][0]
-        else:
-            self.log.debug("InProcess CE result OK")
+            self.log.info("Exit status:", retCode)
+            result["PayloadFailed"] = retCode

-        return ret
+        self.log.debug("InProcess CE result OK")
+        return result

     #############################################################################
     def getCEStatus(self):
         """Method to return information on running and waiting jobs,
         as well as number of available processors
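
For reference, the decoding above relies on the fact that negative wrapper statuses are wrapped modulo 256, so any exit status above 128 encodes a negative value. A self-contained sketch of that logic (the decodeStatus helper is hypothetical, for illustration only):

def decodeStatus(retCode):
    """Map a raw wrapper exit status to (submissionOK, message)."""
    if retCode > 128:
        # Negative exit values are returned as 256 - |exit|, so undo the wrap
        wrapped = retCode - 256
        if wrapped == -2:
            return False, "JobWrapper initialization error"
        if wrapped == -1:
            return False, "JobWrapper execution error"
        return False, "InProcess Job Execution Failed"
    if retCode > 0:
        # Submission itself was fine; only the payload failed
        return True, f"Payload failed with exit code {retCode}"
    return True, "OK"

assert decodeStatus(254) == (False, "JobWrapper initialization error")
assert decodeStatus(1) == (True, "Payload failed with exit code 1")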
17 changes: 10 additions & 7 deletions src/DIRAC/Resources/Computing/PoolComputingElement.py
@@ -19,6 +19,7 @@
 **Code Documentation**
 """
+import functools
 import os
 import concurrent.futures

@@ -141,12 +142,14 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
         if "USER" in os.environ:
             taskKwargs["PayloadUser"] = os.environ["USER"] + f"p{str(nUser).zfill(2)}"

-        future = self.pPool.submit(executeJob, executableFile, proxy, self.taskID, inputs, **taskKwargs)
-        self.processorsPerTask[future] = processorsForJob
+        taskID = self.taskID
         self.taskID += 1
-        future.add_done_callback(self.finalizeJob)

-        return S_OK()  # returning S_OK as callback will do the rest
+        future = self.pPool.submit(executeJob, executableFile, proxy, taskID, inputs, **taskKwargs)
+        self.processorsPerTask[future] = processorsForJob
+        future.add_done_callback(functools.partial(self.finalizeJob, taskID))
+
+        return S_OK(taskID)  # returning S_OK as callback will do the rest

     def _getProcessorsForJobs(self, kwargs):
         """helper function"""

@@ -187,7 +190,7 @@ def _getProcessorsForJobs(self, kwargs):

         return requestedProcessors

-    def finalizeJob(self, future):
+    def finalizeJob(self, taskID, future):
         """Finalize the job by updating the process utilisation counters

         :param future: evaluating the future result
@@ -196,10 +199,10 @@

         result = future.result()  # This would be the result of the e.g. InProcess.submitJob()
         if result["OK"]:
-            self.log.info("Task %s finished successfully, %d processor(s) freed" % (future, nProc))
+            self.log.info("Task finished successfully", f"{nProc} processor(s) freed for {future}")
         else:
             self.log.error("Task failed submission", f"{future}, message: {result['Message']}")
-        self.taskResults[future] = result
+        self.taskResults[taskID] = result

     def getCEStatus(self):
         """Method to return information on running and waiting jobs,
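
The callback pattern above, where a stable integer taskID is bound with functools.partial so that results are keyed by ID rather than by the future object, can be reproduced standalone. A toy sketch (DIRAC's pool CE runs jobs in a separate pool; a thread pool is used here only to keep the example portable):

import concurrent.futures
import functools

taskResults = {}

def finalizeJob(taskID, future):
    # add_done_callback passes only the future; taskID is bound via partial
    taskResults[taskID] = future.result()

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    for taskID, base in enumerate([2, 3]):
        future = pool.submit(pow, base, 2)
        future.add_done_callback(functools.partial(finalizeJob, taskID))

print(taskResults)  # {0: 4, 1: 9} once both futures have completed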
31 changes: 11 additions & 20 deletions src/DIRAC/Resources/Computing/SingularityComputingElement.py
@@ -108,6 +108,10 @@ def __init__(self, ceUniqueID):

         self.processors = int(self.ceParameters.get("NumberOfProcessors", 1))

+        # Indicates that the submission is done synchronously
+        # The result is immediately available
+        self.ceParameters["AsyncSubmission"] = False
+
     def __hasUserNS(self):
         """Detect if this node has user namespaces enabled.
         Returns True if they are enabled, False otherwise.

@@ -218,17 +222,13 @@ def __createWorkArea(self, jobDesc=None, log=None, logLevel="INFO", proxy=None):
             os.mkdir(self.__workdir)
         except OSError:
             if not os.path.isdir(self.__workdir):
-                result = S_ERROR(f"Failed to create container base directory '{self.__workdir}'")
-                result["ReschedulePayload"] = True
-                return result
+                return S_ERROR(f"Failed to create container base directory '{self.__workdir}'")
             # Otherwise, directory probably just already exists...
         baseDir = None
         try:
             baseDir = tempfile.mkdtemp(prefix=f"job{jobDesc.get('jobID', 0)}_", dir=self.__workdir)
         except OSError:
-            result = S_ERROR(f"Failed to create container work directory in '{self.__workdir}'")
-            result["ReschedulePayload"] = True
-            return result
+            return S_ERROR(f"Failed to create container work directory in '{self.__workdir}'")

         self.log.debug(f"Use singularity workarea: {baseDir}")
         for subdir in ["home", "tmp", "var_tmp"]:

@@ -259,7 +259,6 @@ def __createWorkArea(self, jobDesc=None, log=None, logLevel="INFO", proxy=None):
             extraOptions="" if self.__installDIRACInContainer else "/tmp/pilot.cfg",
         )
         if not result["OK"]:
-            result["ReschedulePayload"] = True
             return result
         wrapperPath = result["Value"]

@@ -334,15 +333,13 @@ def __checkResult(tmpDir):
                 retCode = int(fp.read())
         except (OSError, ValueError):
             # Something failed while trying to get the return code
-            result = S_ERROR("Failed to get return code from inner wrapper")
-            result["ReschedulePayload"] = True
-            return result
+            return S_ERROR("Failed to get return code from inner wrapper")

         result = S_OK()
         if retCode:
             # This is the one case where we don't reschedule:
             # An actual failure of the inner payload for some reason
             result = S_ERROR("Command failed with exit code %d" % retCode)
             result["PayloadFailed"] = retCode
         return result

     def submitJob(self, executableFile, proxy=None, **kwargs):

@@ -355,9 +352,7 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
         # Check that singularity is available
         if not self.__hasSingularity():
             self.log.error("Singularity is not installed on PATH.")
-            result = S_ERROR("Failed to find singularity ")
-            result["ReschedulePayload"] = True
-            return result
+            return S_ERROR("Failed to find singularity ")

         self.log.info("Creating singularity container")

@@ -448,9 +443,7 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
         else:
             # if we are here is because there's no image, or it is not accessible (e.g. not on CVMFS)
             self.log.error("Singularity image to exec not found: ", rootImage)
-            result = S_ERROR("Failed to find singularity image to exec")
-            result["ReschedulePayload"] = True
-            return result
+            return S_ERROR("Failed to find singularity image to exec")

         self.log.debug(f"Execute singularity command: {cmd}")
         self.log.debug(f"Execute singularity env: {self.__getEnv()}")

@@ -463,9 +456,7 @@
             if proxy and renewTask:
                 gThreadScheduler.removeTask(renewTask)
             self.__deleteWorkArea(baseDir)
-            result = S_ERROR("Error running singularity command")
-            result["ReschedulePayload"] = True
-            return result
+            return S_ERROR("Error running singularity command")

         result = self.__checkResult(tmpDir)
         if proxy and renewTask:
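
A recurring simplification in this file: error paths no longer attach a ReschedulePayload flag to the returned dict but return a plain S_ERROR, leaving the rescheduling decision to the caller (per the commit title, presumably the JobAgent, based on the submission status alone). The before/after pattern, sketched with a stand-in S_ERROR (illustrative, not code from the commit):

def S_ERROR(message=""):
    # Stand-in for DIRAC's S_ERROR return dict
    return {"OK": False, "Message": message}

# Before: each error return tagged itself for rescheduling
def failBefore(message):
    result = S_ERROR(message)
    result["ReschedulePayload"] = True
    return result

# After: a plain error is returned; the caller decides what to do with it
def failAfter(message):
    return S_ERROR(message)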
4 changes: 4 additions & 0 deletions src/DIRAC/Resources/Computing/SudoComputingElement.py
@@ -25,6 +25,10 @@ def __init__(self, ceUniqueID):
         self.processors = int(self.ceParameters.get("NumberOfProcessors", 1))
         self.ceParameters["MaxTotalJobs"] = 1

+        # Indicates that the submission is done synchronously
+        # The result is immediately available
+        self.ceParameters["AsyncSubmission"] = False
+
     #############################################################################
     def submitJob(self, executableFile, proxy=None, **kwargs):
         """Method to submit job, overridden from super-class.
@@ -129,6 +129,7 @@ def test_submitBadJobs_and_getResult(mocker, createAndDelete, script, ceSubmission
     # It cannot capture failures occurring during the submission or after
     # because it is asynchronous
     assert result["OK"] is True
+    assert result["Value"] == 0

     # Waiting for the results of the submission/execution of the script
     while not ce.taskResults:
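
Since the pool CE's submitJob now returns the task ID (starting at 0, as the new assertion checks), a caller can wait for one specific task's outcome. A hypothetical polling helper, not part of the commit, assuming a PoolComputingElement instance ce:

import time

def waitForTask(ce, executableFile, timeout=60.0):
    """Submit to an asynchronous CE and block until its callback publishes the result."""
    submission = ce.submitJob(executableFile, proxy=None)
    if not submission["OK"]:
        return submission  # the submission itself failed
    taskID = submission["Value"]  # 0 for the first task on a fresh CE
    deadline = time.time() + timeout
    while taskID not in ce.taskResults:
        if time.time() > deadline:
            return {"OK": False, "Message": f"Timed out waiting for task {taskID}"}
        time.sleep(0.1)
    return ce.taskResults[taskID]  # filled in by finalizeJob via the done-callback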
(Diffs for the remaining 2 changed files were not loaded in this view.)
