Skip to content

Commit

Permalink
sweep: DIRACGrid#7069 Getting more details about failed/aborted pilots from HTCondor
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr authored and web-flow committed Nov 7, 2023
1 parent 38cde53 commit 1cffae5
Show file tree
Hide file tree
Showing 4 changed files with 375 additions and 351 deletions.
91 changes: 45 additions & 46 deletions src/DIRAC/Resources/Computing/AREXComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
Port added to the CE host name to interact with AREX services.
ProxyTimeLeftBeforeRenewal:
Time in seconds before the AREXCE renews proxy of submitted pilots.
Time in seconds before the AREXCE renews proxy of submitted payloads.
RESTVersion:
Version of the REST interface to use.
Expand Down Expand Up @@ -97,34 +97,33 @@ def setToken(self, token, valid):
super().setToken(token, valid)
self.headers["Authorization"] = "Bearer " + self.token["access_token"]

def _arcToDiracID(self, arcJobID):
"""Convert an ARC jobID into a DIRAC jobID.
def _arcIDToJobReference(self, arcJobID):
"""Convert an ARC jobID into a job reference.
Example: 1234 becomes https://<ce>:<port>/arex/1234
:param str arcJobID: ARC jobID
:return: DIRAC jobID
:return: job reference, defined as an ARC jobID with additional details
"""
# Add CE and protocol information to arc Job ID
if "://" in arcJobID:
self.log.warn("Identifier already in ARC format", arcJobID)
return arcJobID

diracJobID = "https://" + self.ceHost + ":" + self.port + "/arex/" + arcJobID
return diracJobID
return f"https://{self.ceHost}:{self.port}/arex/{arcJobID}"

def _DiracToArcID(self, diracJobID):
"""Convert a DIRAC jobID into an ARC jobID.
def _jobReferenceToArcID(self, jobReference):
"""Convert a job reference into an ARC jobID.
Example: https://<ce>:<port>/arex/1234 becomes 1234
:param str: DIRAC jobID
:param str: job reference, defined as an ARC jobID with additional details
:return: ARC jobID
"""
# Remove CE and protocol information from arc Job ID
if "://" in diracJobID:
arcJobID = diracJobID.split("arex/")[-1]
if "://" in jobReference:
arcJobID = jobReference.split("arex/")[-1]
return arcJobID
self.log.warn("Identifier already in REST format?", diracJobID)
return diracJobID
self.log.warn("Identifier already in REST format?", jobReference)
return jobReference

#############################################################################

Expand Down Expand Up @@ -486,12 +485,12 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
if not result["OK"]:
break

jobID = self._arcToDiracID(arcJobID)
batchIDList.append(jobID)
stampDict[jobID] = diracStamp
jobReference = self._arcIDToJobReference(arcJobID)
batchIDList.append(jobReference)
stampDict[jobReference] = diracStamp
self.log.debug(
"Successfully submitted job",
f"{jobID} to CE {self.ceHost}",
f"{jobReference} to CE {self.ceHost}",
)

if batchIDList:
Expand All @@ -506,16 +505,16 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
def killJob(self, jobIDList):
"""Kill the specified jobs
:param list jobIDList: list of DIRAC Job IDs
:param list jobIDList: list of Job references
"""
if not isinstance(jobIDList, list):
jobIDList = [jobIDList]
self.log.debug("Killing jobs", ",".join(jobIDList))

# Convert DIRAC jobs to ARC jobs
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
jList = [self._DiracToArcID(job.split(":::")[0]) for job in jobIDList]
return self._killJob(jList)
# Convert job references to ARC jobs
# Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed
arcJobList = [self._jobReferenceToArcID(job.split(":::")[0]) for job in jobIDList]
return self._killJob(arcJobList)

def _killJob(self, arcJobList):
"""Kill the specified jobs
Expand Down Expand Up @@ -548,16 +547,16 @@ def _killJob(self, arcJobList):
def cleanJob(self, jobIDList):
"""Clean files related to the specified jobs
:param list jobIDList: list of DIRAC Job IDs
:param list jobIDList: list of job references
"""
if not isinstance(jobIDList, list):
jobIDList = [jobIDList]
self.log.debug("Cleaning jobs", ",".join(jobIDList))

# Convert DIRAC jobs to ARC jobs
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
jList = [self._DiracToArcID(job.split(":::")[0]) for job in jobIDList]
return self._cleanJob(jList)
# Convert job references to ARC jobs
# Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed
arcJobList = [self._jobReferenceToArcID(job.split(":::")[0]) for job in jobIDList]
return self._cleanJob(arcJobList)

def _cleanJob(self, arcJobList):
"""Clean files related to the specified jobs
Expand Down Expand Up @@ -713,7 +712,7 @@ def _renewDelegation(self, delegationID):
def getJobStatus(self, jobIDList):
"""Get the status information for the given list of jobs.
:param list jobIDList: list of DIRAC Job ID, followed by the DIRAC stamp.
:param list jobIDList: list of job references, followed by the DIRAC stamp.
"""
result = self._checkSession()
if not result["OK"]:
Expand All @@ -724,9 +723,9 @@ def getJobStatus(self, jobIDList):
jobIDList = [jobIDList]

self.log.debug("Getting status of jobs:", jobIDList)
# Convert DIRAC jobs to ARC jobs and encapsulate them in a dictionary for the REST query
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
arcJobsJson = {"job": [{"id": self._DiracToArcID(job.split(":::")[0])} for job in jobIDList]}
# Convert job references to ARC jobs and encapsulate them in a dictionary for the REST query
# Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed
arcJobsJson = {"job": [{"id": self._jobReferenceToArcID(job.split(":::")[0])} for job in jobIDList]}

# Prepare the command
params = {"action": "status"}
Expand All @@ -749,16 +748,16 @@ def getJobStatus(self, jobIDList):
arcJobsInfo = [arcJobsInfo]

for arcJob in arcJobsInfo:
jobID = self._arcToDiracID(arcJob["id"])
jobReference = self._arcIDToJobReference(arcJob["id"])
# ARC REST interface returns hyperbole
arcState = arcJob["state"].capitalize()
self.log.debug("REST ARC status", f"for job {jobID} is {arcState}")
resultDict[jobID] = self.mapStates[arcState]
self.log.debug("REST ARC status", f"for job {jobReference} is {arcState}")
resultDict[jobReference] = self.mapStates[arcState]

# Cancel held jobs so they don't sit in the queue forever
if arcState == "Hold":
jobsToCancel.append(arcJob["id"])
self.log.debug(f"Killing held job {jobID}")
self.log.debug(f"Killing held job {jobReference}")

# Renew delegations to renew the proxies of the jobs
result = self._getDelegationIDs()
Expand All @@ -785,7 +784,7 @@ def getJobStatus(self, jobIDList):
def getJobLog(self, jobID):
"""Get job logging info
:param str jobID: DIRAC JobID followed by the DIRAC stamp.
:param str jobID: Job reference followed by the DIRAC stamp.
:return: string representing the logging info of a given jobID
"""
result = self._checkSession()
Expand All @@ -794,7 +793,7 @@ def getJobLog(self, jobID):
return result

# Prepare the command: Get output files
arcJob = self._DiracToArcID(jobID.split(":::")[0])
arcJob = self._jobReferenceToArcID(jobID.split(":::")[0])
query = self._urlJoin(os.path.join("jobs", arcJob, "diagnose", "errors"))

# Submit the GET request to retrieve outputs
Expand All @@ -813,7 +812,7 @@ def getJobLog(self, jobID):
def _getListOfAvailableOutputs(self, jobID, arcJobID):
"""Request a list of outputs available for a given jobID.
:param str jobID: DIRAC job ID without the DIRAC stamp
:param str jobID: job reference without the DIRAC stamp
:param str arcJobID: ARC job ID
:return list: names of the available outputs
"""
Expand All @@ -833,11 +832,11 @@ def _getListOfAvailableOutputs(self, jobID, arcJobID):
return S_OK(response.json()["file"])

def getJobOutput(self, jobID, workingDirectory=None):
"""Get the outputs of the given DIRAC job ID.
"""Get the outputs of the given job reference.
Outputs are stored in workingDirectory if present, else in a new directory named <ARC JobID>.
:param str jobID: DIRAC JobID followed by the DIRAC stamp.
:param str jobID: job reference followed by the DIRAC stamp.
:param str workingDirectory: name of the directory containing the retrieved outputs.
:return: content of stdout and stderr
"""
Expand All @@ -851,10 +850,10 @@ def getJobOutput(self, jobID, workingDirectory=None):
jobRef, stamp = jobID.split(":::")
else:
return S_ERROR(f"DIRAC stamp not defined for {jobID}")
job = self._DiracToArcID(jobRef)
arcJob = self._jobReferenceToArcID(jobRef)

# Get the list of available outputs
result = self._getListOfAvailableOutputs(jobRef, job)
result = self._getListOfAvailableOutputs(jobRef, arcJob)
if not result["OK"]:
return result
remoteOutputs = result["Value"]
Expand All @@ -863,21 +862,21 @@ def getJobOutput(self, jobID, workingDirectory=None):
if not workingDirectory:
if "WorkingDirectory" in self.ceParameters:
# We assume that workingDirectory exists
workingDirectory = os.path.join(self.ceParameters["WorkingDirectory"], job)
workingDirectory = os.path.join(self.ceParameters["WorkingDirectory"], arcJob)
else:
workingDirectory = job
workingDirectory = arcJob
os.mkdir(workingDirectory)

stdout = None
stderr = None
for remoteOutput in remoteOutputs:
# Prepare the command
query = self._urlJoin(os.path.join("jobs", job, "session", remoteOutput))
query = self._urlJoin(os.path.join("jobs", arcJob, "session", remoteOutput))

# Submit the GET request to retrieve outputs
result = self._request("get", query, stream=True)
if not result["OK"]:
self.log.error("Error downloading", f"{remoteOutput} for {job}: {result['Message']}")
self.log.error("Error downloading", f"{remoteOutput} for {arcJob}: {result['Message']}")
return S_ERROR(f"Error downloading {remoteOutput} for {jobID}")
response = result["Value"]

Expand Down
Loading

0 comments on commit 1cffae5

Please sign in to comment.