From aa812d3d3c49cbb5ab9b303bdce6cca9ebb881dd Mon Sep 17 00:00:00 2001 From: aldbr Date: Mon, 21 Aug 2023 17:15:38 +0200 Subject: [PATCH] feat: replace DIRACJobID with jobReference in AREX/HTCondorCEs --- .../Computing/AREXComputingElement.py | 91 +++++++++---------- .../Computing/HTCondorCEComputingElement.py | 48 +++++----- 2 files changed, 69 insertions(+), 70 deletions(-) diff --git a/src/DIRAC/Resources/Computing/AREXComputingElement.py b/src/DIRAC/Resources/Computing/AREXComputingElement.py index df673d39deb..77010f9b1a5 100755 --- a/src/DIRAC/Resources/Computing/AREXComputingElement.py +++ b/src/DIRAC/Resources/Computing/AREXComputingElement.py @@ -15,7 +15,7 @@ Port added to the CE host name to interact with AREX services. ProxyTimeLeftBeforeRenewal: - Time in seconds before the AREXCE renews proxy of submitted pilots. + Time in seconds before the AREXCE renews proxy of submitted payloads. RESTVersion: Version of the REST interface to use. @@ -105,34 +105,33 @@ def setToken(self, token, valid): super().setToken(token, valid) self.headers["Authorization"] = "Bearer " + self.token["access_token"] - def _arcToDiracID(self, arcJobID): - """Convert an ARC jobID into a DIRAC jobID. + def _arcIDToJobReference(self, arcJobID): + """Convert an ARC jobID into a job reference. Example: 1234 becomes https://:/arex/1234 :param str: ARC jobID - :return: DIRAC jobID + :return: job reference, defined as an ARC jobID with additional details """ # Add CE and protocol information to arc Job ID if "://" in arcJobID: self.log.warn("Identifier already in ARC format", arcJobID) return arcJobID - diracJobID = "https://" + self.ceHost + ":" + self.port + "/arex/" + arcJobID - return diracJobID + return f"https://{self.ceHost}:{self.port}/arex/{arcJobID}" - def _DiracToArcID(self, diracJobID): - """Convert a DIRAC jobID into an ARC jobID. + def _jobReferenceToArcID(self, jobReference): + """Convert a job reference into an ARC jobID. Example: https://:/arex/1234 becomes 1234 - :param str: DIRAC jobID + :param str: job reference, defined as an ARC jobID with additional details :return: ARC jobID """ # Remove CE and protocol information from arc Job ID - if "://" in diracJobID: - arcJobID = diracJobID.split("arex/")[-1] + if "://" in jobReference: + arcJobID = jobReference.split("arex/")[-1] return arcJobID - self.log.warn("Identifier already in REST format?", diracJobID) - return diracJobID + self.log.warn("Identifier already in REST format?", jobReference) + return jobReference ############################################################################# @@ -483,12 +482,12 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs= if not result["OK"]: break - jobID = self._arcToDiracID(arcJobID) - batchIDList.append(jobID) - stampDict[jobID] = diracStamp + jobReference = self._arcIDToJobReference(arcJobID) + batchIDList.append(jobReference) + stampDict[jobReference] = diracStamp self.log.debug( "Successfully submitted job", - f"{jobID} to CE {self.ceHost}", + f"{jobReference} to CE {self.ceHost}", ) if batchIDList: @@ -503,16 +502,16 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs= def killJob(self, jobIDList): """Kill the specified jobs - :param list jobIDList: list of DIRAC Job IDs + :param list jobIDList: list of Job references """ if not isinstance(jobIDList, list): jobIDList = [jobIDList] self.log.debug("Killing jobs", ",".join(jobIDList)) - # Convert DIRAC jobs to ARC jobs - # DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed - jList = [self._DiracToArcID(job.split(":::")[0]) for job in jobIDList] - return self._killJob(jList) + # Convert job references to ARC jobs + # Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed + arcJobList = [self._jobReferenceToArcID(job.split(":::")[0]) for job in jobIDList] + return self._killJob(arcJobList) def _killJob(self, arcJobList): """Kill the specified jobs @@ -545,16 +544,16 @@ def _killJob(self, arcJobList): def cleanJob(self, jobIDList): """Clean files related to the specified jobs - :param list jobIDList: list of DIRAC Job IDs + :param list jobIDList: list of job references """ if not isinstance(jobIDList, list): jobIDList = [jobIDList] self.log.debug("Cleaning jobs", ",".join(jobIDList)) - # Convert DIRAC jobs to ARC jobs - # DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed - jList = [self._DiracToArcID(job.split(":::")[0]) for job in jobIDList] - return self._cleanJob(jList) + # Convert job references to ARC jobs + # Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed + arcJobList = [self._jobReferenceToArcID(job.split(":::")[0]) for job in jobIDList] + return self._cleanJob(arcJobList) def _cleanJob(self, arcJobList): """Clean files related to the specified jobs @@ -710,7 +709,7 @@ def _renewDelegation(self, delegationID): def getJobStatus(self, jobIDList): """Get the status information for the given list of jobs. - :param list jobIDList: list of DIRAC Job ID, followed by the DIRAC stamp. + :param list jobIDList: list of job references, followed by the DIRAC stamp. """ result = self._checkSession() if not result["OK"]: @@ -721,9 +720,9 @@ def getJobStatus(self, jobIDList): jobIDList = [jobIDList] self.log.debug("Getting status of jobs:", jobIDList) - # Convert DIRAC jobs to ARC jobs and encapsulate them in a dictionary for the REST query - # DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed - arcJobsJson = {"job": [{"id": self._DiracToArcID(job.split(":::")[0])} for job in jobIDList]} + # Convert job references to ARC jobs and encapsulate them in a dictionary for the REST query + # Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed + arcJobsJson = {"job": [{"id": self._jobReferenceToArcID(job.split(":::")[0])} for job in jobIDList]} # Prepare the command params = {"action": "status"} @@ -746,16 +745,16 @@ def getJobStatus(self, jobIDList): arcJobsInfo = [arcJobsInfo] for arcJob in arcJobsInfo: - jobID = self._arcToDiracID(arcJob["id"]) + jobReference = self._arcIDToJobReference(arcJob["id"]) # ARC REST interface returns hyperbole arcState = arcJob["state"].capitalize() - self.log.debug("REST ARC status", f"for job {jobID} is {arcState}") - resultDict[jobID] = self.mapStates[arcState] + self.log.debug("REST ARC status", f"for job {jobReference} is {arcState}") + resultDict[jobReference] = self.mapStates[arcState] # Cancel held jobs so they don't sit in the queue forever if arcState == "Hold": jobsToCancel.append(arcJob["id"]) - self.log.debug(f"Killing held job {jobID}") + self.log.debug(f"Killing held job {jobReference}") # Renew delegations to renew the proxies of the jobs result = self._getDelegationIDs() @@ -782,7 +781,7 @@ def getJobStatus(self, jobIDList): def getJobLog(self, jobID): """Get job logging info - :param str jobID: DIRAC JobID followed by the DIRAC stamp. + :param str jobID: Job reference followed by the DIRAC stamp. :return: string representing the logging info of a given jobID """ result = self._checkSession() @@ -791,7 +790,7 @@ def getJobLog(self, jobID): return result # Prepare the command: Get output files - arcJob = self._DiracToArcID(jobID.split(":::")[0]) + arcJob = self._jobReferenceToArcID(jobID.split(":::")[0]) query = self._urlJoin(os.path.join("jobs", arcJob, "diagnose", "errors")) # Submit the GET request to retrieve outputs @@ -810,7 +809,7 @@ def getJobLog(self, jobID): def _getListOfAvailableOutputs(self, jobID, arcJobID): """Request a list of outputs available for a given jobID. - :param str jobID: DIRAC job ID without the DIRAC stamp + :param str jobID: job reference without the DIRAC stamp :param str arcJobID: ARC job ID :return list: names of the available outputs """ @@ -830,11 +829,11 @@ def _getListOfAvailableOutputs(self, jobID, arcJobID): return S_OK(response.json()["file"]) def getJobOutput(self, jobID, workingDirectory=None): - """Get the outputs of the given DIRAC job ID. + """Get the outputs of the given job reference. Outputs and stored in workingDirectory if present, else in a new directory named . - :param str jobID: DIRAC JobID followed by the DIRAC stamp. + :param str jobID: job reference followed by the DIRAC stamp. :param str workingDirectory: name of the directory containing the retrieved outputs. :return: content of stdout and stderr """ @@ -848,10 +847,10 @@ def getJobOutput(self, jobID, workingDirectory=None): jobRef, stamp = jobID.split(":::") else: return S_ERROR(f"DIRAC stamp not defined for {jobID}") - job = self._DiracToArcID(jobRef) + arcJob = self._jobReferenceToArcID(jobRef) # Get the list of available outputs - result = self._getListOfAvailableOutputs(jobRef, job) + result = self._getListOfAvailableOutputs(jobRef, arcJob) if not result["OK"]: return result remoteOutputs = result["Value"] @@ -860,21 +859,21 @@ def getJobOutput(self, jobID, workingDirectory=None): if not workingDirectory: if "WorkingDirectory" in self.ceParameters: # We assume that workingDirectory exists - workingDirectory = os.path.join(self.ceParameters["WorkingDirectory"], job) + workingDirectory = os.path.join(self.ceParameters["WorkingDirectory"], arcJob) else: - workingDirectory = job + workingDirectory = arcJob os.mkdir(workingDirectory) stdout = None stderr = None for remoteOutput in remoteOutputs: # Prepare the command - query = self._urlJoin(os.path.join("jobs", job, "session", remoteOutput)) + query = self._urlJoin(os.path.join("jobs", arcJob, "session", remoteOutput)) # Submit the GET request to retrieve outputs result = self._request("get", query, stream=True) if not result["OK"]: - self.log.error("Error downloading", f"{remoteOutput} for {job}: {result['Message']}") + self.log.error("Error downloading", f"{remoteOutput} for {arcJob}: {result['Message']}") return S_ERROR(f"Error downloading {remoteOutput} for {jobID}") response = result["Value"] diff --git a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py index 21b7a95aa76..dc0c2f1f7d7 100644 --- a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py +++ b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py @@ -109,21 +109,21 @@ def __init__(self, ceUniqueID): ############################################################################# - def _DiracToCondorID(self, diracJobID): - """Convert a DIRAC jobID into a Condor jobID. + def _jobReferenceToCondorID(self, jobReference): + """Convert a job reference into a Condor jobID. Example: htcondorce:///1234.0 becomes 1234.0 - :param str: DIRAC jobID + :param str: job reference, a condor jobID with additional details :return: Condor jobID """ # Remove CE and protocol information from arc Job ID - if "://" in diracJobID: - condorJobID = diracJobID.split("/")[-1] + if "://" in jobReference: + condorJobID = jobReference.split("/")[-1] return condorJobID - return diracJobID + return jobReference - def _condorToDiracID(self, condorJobIDs): - """Get the references from the condor_submit output. + def _condorIDToJobReference(self, condorJobIDs): + """Get the job references from the condor job IDs. Cluster ids look like " 107.0 - 107.0 " or " 107.0 - 107.4 " :param str condorJobIDs: the output of condor_submit @@ -318,16 +318,16 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1): return result stdout = result["Value"] - pilotJobReferences = self._condorToDiracID(stdout) - if not pilotJobReferences["OK"]: - return pilotJobReferences - pilotJobReferences = pilotJobReferences["Value"] + jobReferences = self._condorIDToJobReference(stdout) + if not jobReferences["OK"]: + return jobReferences + jobReferences = jobReferences["Value"] self.log.verbose("JobStamps:", jobStamps) - self.log.verbose("pilotRefs:", pilotJobReferences) + self.log.verbose("pilotRefs:", jobReferences) - result = S_OK(pilotJobReferences) - result["PilotStampDict"] = dict(zip(pilotJobReferences, jobStamps)) + result = S_OK(jobReferences) + result["PilotStampDict"] = dict(zip(jobReferences, jobStamps)) if self.useLocalSchedd: # Executable is transferred afterward # Inform the caller that Condor cannot delete it before the end of the execution @@ -346,15 +346,15 @@ def killJob(self, jobIDList): self.log.verbose("KillJob jobIDList", jobIDList) self.tokenFile = None - for diracJobID in jobIDList: - condorJobID = self._DiracToCondorID(diracJobID.split(":::")[0]) - self.log.verbose("Killing pilot", diracJobID) + for jobReference in jobIDList: + condorJobID = self._jobReferenceToCondorID(jobReference.split(":::")[0]) + self.log.verbose("Killing pilot", jobReference) cmd = ["condor_rm"] cmd.extend(self.remoteScheddOptions.strip().split(" ")) cmd.append(condorJobID) result = self._executeCondorCommand(cmd, keepTokenFile=True) if not result["OK"]: - self.log.error("Failed to kill pilot", f"{diracJobID}: {result['Message']}") + self.log.error("Failed to kill pilot", f"{jobReference}: {result['Message']}") return result self.tokenFile = None @@ -403,9 +403,9 @@ def getJobStatus(self, jobIDList): resultDict = {} condorIDs = {} # Get all condorIDs so we can just call condor_q and condor_history once - for diracJobID in jobIDList: - diracJobID = diracJobID.split(":::")[0] - condorIDs[diracJobID] = self._DiracToCondorID(diracJobID) + for jobReference in jobIDList: + jobReference = jobReference.split(":::")[0] + condorIDs[jobReference] = self._jobReferenceToCondorID(jobReference) self.tokenFile = None @@ -493,7 +493,7 @@ def __getJobOutput(self, jobID, outTypes): """ # Extract stamp from the Job ID if ":::" in jobID: - diracJobID, stamp = jobID.split(":::") + jobReference, stamp = jobID.split(":::") else: return S_ERROR(f"DIRAC stamp not defined for {jobID}") @@ -503,7 +503,7 @@ def __getJobOutput(self, jobID, outTypes): return S_ERROR(f"Stamp is not long enough: {stamp}") pathToResult = os.path.join(self.ceName, stamp[0], stamp[1:3]) - condorJobID = self._DiracToCondorID(diracJobID) + condorJobID = self._jobReferenceToCondorID(jobReference) iwd = os.path.join(self.workingDirectory, pathToResult) try: