diff --git a/src/DIRAC/Resources/Computing/AREXComputingElement.py b/src/DIRAC/Resources/Computing/AREXComputingElement.py
index 0ff6f151581..efb756641e1 100755
--- a/src/DIRAC/Resources/Computing/AREXComputingElement.py
+++ b/src/DIRAC/Resources/Computing/AREXComputingElement.py
@@ -15,7 +15,7 @@
    Port added to the CE host name to interact with AREX services.
 ProxyTimeLeftBeforeRenewal:
-   Time in seconds before the AREXCE renews proxy of submitted pilots.
+   Time in seconds before the AREXCE renews proxy of submitted payloads.
 RESTVersion:
    Version of the REST interface to use.
@@ -105,34 +105,33 @@ def setToken(self, token, valid):
         super().setToken(token, valid)
         self.headers["Authorization"] = "Bearer " + self.token["access_token"]

-    def _arcToDiracID(self, arcJobID):
-        """Convert an ARC jobID into a DIRAC jobID.
+    def _arcIDToJobReference(self, arcJobID):
+        """Convert an ARC jobID into a job reference.

         Example: 1234 becomes https://<ceHost>:<port>/arex/1234

         :param str: ARC jobID

-        :return: DIRAC jobID
+        :return: job reference, defined as an ARC jobID with additional details
         """
         # Add CE and protocol information to arc Job ID
         if "://" in arcJobID:
             self.log.warn("Identifier already in ARC format", arcJobID)
             return arcJobID

-        diracJobID = "https://" + self.ceHost + ":" + self.port + "/arex/" + arcJobID
-        return diracJobID
+        return f"https://{self.ceHost}:{self.port}/arex/{arcJobID}"

-    def _DiracToArcID(self, diracJobID):
-        """Convert a DIRAC jobID into an ARC jobID.
+    def _jobReferenceToArcID(self, jobReference):
+        """Convert a job reference into an ARC jobID.

         Example: https://<ceHost>:<port>/arex/1234 becomes 1234

-        :param str: DIRAC jobID
+        :param str: job reference, defined as an ARC jobID with additional details

         :return: ARC jobID
         """
         # Remove CE and protocol information from arc Job ID
-        if "://" in diracJobID:
-            arcJobID = diracJobID.split("arex/")[-1]
+        if "://" in jobReference:
+            arcJobID = jobReference.split("arex/")[-1]
             return arcJobID

-        self.log.warn("Identifier already in REST format?", diracJobID)
-        return diracJobID
+        self.log.warn("Identifier already in REST format?", jobReference)
+        return jobReference

 #############################################################################
@@ -494,12 +493,12 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
             if not result["OK"]:
                 break

-            jobID = self._arcToDiracID(arcJobID)
-            batchIDList.append(jobID)
-            stampDict[jobID] = diracStamp
+            jobReference = self._arcIDToJobReference(arcJobID)
+            batchIDList.append(jobReference)
+            stampDict[jobReference] = diracStamp
             self.log.debug(
                 "Successfully submitted job",
-                f"{jobID} to CE {self.ceHost}",
+                f"{jobReference} to CE {self.ceHost}",
             )

         if batchIDList:
@@ -514,16 +513,16 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
     def killJob(self, jobIDList):
         """Kill the specified jobs

-        :param list jobIDList: list of DIRAC Job IDs
+        :param list jobIDList: list of job references
         """
         if not isinstance(jobIDList, list):
             jobIDList = [jobIDList]
         self.log.debug("Killing jobs", ",".join(jobIDList))

-        # Convert DIRAC jobs to ARC jobs
-        # DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
-        jList = [self._DiracToArcID(job.split(":::")[0]) for job in jobIDList]
-        return self._killJob(jList)
+        # Convert job references to ARC jobs
+        # Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed
+        arcJobList = [self._jobReferenceToArcID(job.split(":::")[0]) for job in jobIDList]
+        return self._killJob(arcJobList)

     def
_killJob(self, arcJobList): """Kill the specified jobs @@ -556,16 +555,16 @@ def _killJob(self, arcJobList): def cleanJob(self, jobIDList): """Clean files related to the specified jobs - :param list jobIDList: list of DIRAC Job IDs + :param list jobIDList: list of job references """ if not isinstance(jobIDList, list): jobIDList = [jobIDList] self.log.debug("Cleaning jobs", ",".join(jobIDList)) - # Convert DIRAC jobs to ARC jobs - # DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed - jList = [self._DiracToArcID(job.split(":::")[0]) for job in jobIDList] - return self._cleanJob(jList) + # Convert job references to ARC jobs + # Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed + arcJobList = [self._jobReferenceToArcID(job.split(":::")[0]) for job in jobIDList] + return self._cleanJob(arcJobList) def _cleanJob(self, arcJobList): """Clean files related to the specified jobs @@ -721,7 +720,7 @@ def _renewDelegation(self, delegationID): def getJobStatus(self, jobIDList): """Get the status information for the given list of jobs. - :param list jobIDList: list of DIRAC Job ID, followed by the DIRAC stamp. + :param list jobIDList: list of job references, followed by the DIRAC stamp. """ result = self._checkSession() if not result["OK"]: @@ -732,9 +731,9 @@ def getJobStatus(self, jobIDList): jobIDList = [jobIDList] self.log.debug("Getting status of jobs:", jobIDList) - # Convert DIRAC jobs to ARC jobs and encapsulate them in a dictionary for the REST query - # DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed - arcJobsJson = {"job": [{"id": self._DiracToArcID(job.split(":::")[0])} for job in jobIDList]} + # Convert job references to ARC jobs and encapsulate them in a dictionary for the REST query + # Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed + arcJobsJson = {"job": [{"id": self._jobReferenceToArcID(job.split(":::")[0])} for job in jobIDList]} # Prepare the command params = {"action": "status"} @@ -757,16 +756,16 @@ def getJobStatus(self, jobIDList): arcJobsInfo = [arcJobsInfo] for arcJob in arcJobsInfo: - jobID = self._arcToDiracID(arcJob["id"]) + jobReference = self._arcIDToJobReference(arcJob["id"]) # ARC REST interface returns hyperbole arcState = arcJob["state"].capitalize() - self.log.debug("REST ARC status", f"for job {jobID} is {arcState}") - resultDict[jobID] = self.mapStates[arcState] + self.log.debug("REST ARC status", f"for job {jobReference} is {arcState}") + resultDict[jobReference] = self.mapStates[arcState] # Cancel held jobs so they don't sit in the queue forever if arcState == "Hold": jobsToCancel.append(arcJob["id"]) - self.log.debug(f"Killing held job {jobID}") + self.log.debug(f"Killing held job {jobReference}") # Renew delegations to renew the proxies of the jobs result = self._getDelegationIDs() @@ -793,7 +792,7 @@ def getJobStatus(self, jobIDList): def getJobLog(self, jobID): """Get job logging info - :param str jobID: DIRAC JobID followed by the DIRAC stamp. + :param str jobID: Job reference followed by the DIRAC stamp. 
:return: string representing the logging info of a given jobID
        """
        result = self._checkSession()
        if not result["OK"]:
            return result

        # Prepare the command: Get output files
-        arcJob = self._DiracToArcID(jobID.split(":::")[0])
+        arcJob = self._jobReferenceToArcID(jobID.split(":::")[0])
         query = self._urlJoin(os.path.join("jobs", arcJob, "diagnose", "errors"))

         # Submit the GET request to retrieve outputs
@@ -821,7 +820,7 @@ def _getListOfAvailableOutputs(self, jobID, arcJobID):
         """Request a list of outputs available for a given jobID.

-        :param str jobID: DIRAC job ID without the DIRAC stamp
+        :param str jobID: job reference without the DIRAC stamp
         :param str arcJobID: ARC job ID
         :return list: names of the available outputs
         """
@@ -841,11 +840,11 @@
         return S_OK(response.json()["file"])

     def getJobOutput(self, jobID, workingDirectory=None):
-        """Get the outputs of the given DIRAC job ID.
+        """Get the outputs of the given job reference.

         Outputs are stored in workingDirectory if present, else in a new directory named <ARC jobID>.

-        :param str jobID: DIRAC JobID followed by the DIRAC stamp.
+        :param str jobID: job reference followed by the DIRAC stamp.
         :param str workingDirectory: name of the directory containing the retrieved outputs.
         :return: content of stdout and stderr
         """
@@ -859,10 +858,10 @@
             jobRef, stamp = jobID.split(":::")
         else:
             return S_ERROR(f"DIRAC stamp not defined for {jobID}")
-        job = self._DiracToArcID(jobRef)
+        arcJob = self._jobReferenceToArcID(jobRef)

         # Get the list of available outputs
-        result = self._getListOfAvailableOutputs(jobRef, job)
+        result = self._getListOfAvailableOutputs(jobRef, arcJob)
         if not result["OK"]:
             return result
         remoteOutputs = result["Value"]
@@ -871,21 +870,21 @@
         if not workingDirectory:
             if "WorkingDirectory" in self.ceParameters:
                 # We assume that workingDirectory exists
-                workingDirectory = os.path.join(self.ceParameters["WorkingDirectory"], job)
+                workingDirectory = os.path.join(self.ceParameters["WorkingDirectory"], arcJob)
             else:
-                workingDirectory = job
+                workingDirectory = arcJob
             os.mkdir(workingDirectory)

         stdout = None
         stderr = None
         for remoteOutput in remoteOutputs:
             # Prepare the command
-            query = self._urlJoin(os.path.join("jobs", job, "session", remoteOutput))
+            query = self._urlJoin(os.path.join("jobs", arcJob, "session", remoteOutput))

             # Submit the GET request to retrieve outputs
             result = self._request("get", query, stream=True)
             if not result["OK"]:
-                self.log.error("Error downloading", f"{remoteOutput} for {job}: {result['Message']}")
+                self.log.error("Error downloading", f"{remoteOutput} for {arcJob}: {result['Message']}")
                 return S_ERROR(f"Error downloading {remoteOutput} for {jobID}")

             response = result["Value"]
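Review note: the two renamed AREX helpers are meant to be exact inverses of each other, modulo the DIRAC stamp that callers strip first. A minimal sketch of the round trip, with illustrative host, port and jobID values (none of them come from this patch):

```python
# Round trip between an ARC jobID and a job reference, mirroring
# _arcIDToJobReference / _jobReferenceToArcID above. Values are illustrative.
ceHost, port, arcJobID = "arex.example.org", "443", "1234"

# _arcIDToJobReference: prepend protocol, CE host, port and the /arex/ path
jobReference = f"https://{ceHost}:{port}/arex/{arcJobID}"
assert jobReference == "https://arex.example.org:443/arex/1234"

# _jobReferenceToArcID: keep only what follows "arex/"
assert jobReference.split("arex/")[-1] == arcJobID

# Callers such as killJob() first drop the DIRAC stamp (":::XXXXX")
stamped = jobReference + ":::abc12"
assert stamped.split(":::")[0].split("arex/")[-1] == arcJobID
```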
diff --git a/src/DIRAC/Resources/Computing/BatchSystems/Condor.py b/src/DIRAC/Resources/Computing/BatchSystems/Condor.py
index d9ef473c5eb..1a4c2dc5471 100644
--- a/src/DIRAC/Resources/Computing/BatchSystems/Condor.py
+++ b/src/DIRAC/Resources/Computing/BatchSystems/Condor.py
@@ -1,9 +1,3 @@
-#########################################################################################
-# Condor.py
-# 10.11.2014
-# Author: A.T.
-#########################################################################################
-
 """ Condor.py is a DIRAC independent class representing Condor batch system.
     Condor objects are used as backend batch system representation for
     LocalComputingElement and SSHComputingElement classes
@@ -19,50 +13,137 @@
 import os

+# Cannot use the PilotStatus module here as Condor is meant to be executed on a remote machine
+# DIRAC might not be available
+STATES_MAP = {
+    1: "Waiting",
+    2: "Running",
+    3: "Aborted",
+    4: "Done",
+    5: "Failed",
+}
+
+HOLD_REASON_SUBCODE = "55"
+
+subTemplate = """
+# Environment
+# -----------
+# There exist many universes:
+# https://htcondor.readthedocs.io/en/latest/users-manual/choosing-an-htcondor-universe.html
+universe = %(targetUniverse)s
+
+# Inputs/Outputs
+# --------------
+# Inputs: executable to submit
+executable = %(executable)s
+
+# Directory that will contain the outputs
+initialdir = %(initialDir)s
+
+# Outputs: stdout, stderr, log
+output = $(Cluster).$(Process).out
+error = $(Cluster).$(Process).err
+log = $(Cluster).$(Process).log
+
+# No other files are to be transferred
+transfer_output_files = ""
+
+# Transfer outputs, even if the job failed
+should_transfer_files = YES
+when_to_transfer_output = ON_EXIT_OR_EVICT
+
+# Environment variables to pass to the job
+environment = "DIRAC_PILOT_STAMP=$(stamp) %(environment)s"
+
+# Credentials
+# -----------
+%(useCredentials)s
+
+# Requirements
+# ------------
+request_cpus = %(processors)s
+
+# Exit options
+# ------------
+# Specify the signal sent to the job when HTCondor needs to vacate the worker node
+kill_sig=SIGTERM
+# By default, HTCondor marks jobs as completed regardless of their status
+# This option allows marking jobs as Held if they don't finish successfully
+on_exit_hold = ExitCode != 0
+# A subcode of our choice to identify who put the job on hold
+on_exit_hold_subcode = %(holdReasonSubcode)s
+# Jobs are then deleted from the system after N days if they are not idle or running
+periodic_remove = (JobStatus != 1) && (JobStatus != 2) && ((time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600))
+
+# Specific options
+# ----------------
+# Local vs Remote schedd
+%(scheddOptions)s
+# CE-specific options
+%(extraString)s
+
+
+Queue stamp in %(pilotStampList)s
+"""
+
+
 def parseCondorStatus(lines, jobID):
     """parse the condor_q or condor_history output for the job status

-    :param lines: list of lines from the output of the condor commands, each line is a pair of jobID and statusID
+    :param lines: list of lines from the output of the condor commands, each line contains the jobID, statusID, holdReasonCode, holdReasonSubCode and holdReason
     :type lines: python:list
     :param str jobID: jobID of condor job, e.g.: 123.53
-    :returns: Status as known by DIRAC
+    :returns: Status as known by DIRAC, and a reason if the job is being held
     """
     jobID = str(jobID)
+
+    holdReason = ""
+    status = None
     for line in lines:
         l = line.strip().split()
+
+        # Make sure the job ID exists
+        if len(l) < 1 or l[0] != jobID:
+            continue
+
+        # Make sure the status is present and is an integer
         try:
             status = int(l[1])
         except (ValueError, IndexError):
-            continue
-        if l[0] == jobID:
-            return {1: "Waiting", 2: "Running", 3: "Aborted", 4: "Done", 5: "HELD"}.get(status, "Unknown")
-    return "Unknown"
+            break

+        # Stop here if the status is not held (5): result should be found in STATES_MAP
+        if status != 5:
+            break

-def treatCondorHistory(condorHistCall, qList):
-    """concatenate clusterID and processID to get the same output as condor_q
-    until we can expect condor version 8.5.3 everywhere
+        # A job can be held for various reasons,
+        # we need to further investigate with the holdReasonCode & holdReasonSubCode
+        # Details in:
+        # https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode

-    :param str condorHistCall: condor_history command to run
-    :param qList: list of jobID and status from condor_q output, will be modified in this function
-    :type qList: python:list
-    :returns: None
-    """
-    sp = subprocess.Popen(
-        shlex.split(condorHistCall),
-        stdout=subprocess.PIPE,
-        stderr=subprocess.PIPE,
-        universal_newlines=True,
-    )
-    output, _ = sp.communicate()
-    status = sp.returncode
-
-    # Join the ClusterId and the ProcId and add to existing list of statuses
-    if status == 0:
-        for line in output.split("\n"):
-            values = line.strip().split()
-            if len(values) == 3:
-                qList.append("%s.%s %s" % tuple(values))
+        # By default, a held (5) job is mapped to Aborted (3), but there might be some exceptions
+        status = 3
+        try:
+            holdReasonCode = l[2]
+            holdReasonSubcode = l[3]
+            holdReason = " ".join(l[4:])
+        except IndexError:
+            # This should not happen in theory
+            # Just set the status to Unknown
+            status = None
+            holdReasonCode = "undefined"
+            holdReasonSubcode = "undefined"
+            break
+
+        # If holdReasonCode is 3 (The PERIODIC_HOLD expression evaluated to True. Or, ON_EXIT_HOLD was true)
+        # And subcode is HOLD_REASON_SUBCODE, then it means the job failed by itself, it needs to be marked as Failed
+        if holdReasonCode == "3" and holdReasonSubcode == HOLD_REASON_SUBCODE:
+            status = 5
+        # If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting
+        elif holdReasonCode == "16":
+            status = 1
+
+    return (STATES_MAP.get(status, "Unknown"), holdReason)


 class Condor(object):
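Review note: this is how the rewritten parseCondorStatus() should behave on `-af:j JobStatus HoldReasonCode HoldReasonSubCode HoldReason` output. The status lines below are made up; only HOLD_REASON_SUBCODE ("55") comes from the patch:

```python
# Assuming parseCondorStatus and HOLD_REASON_SUBCODE are imported from
# DIRAC.Resources.Computing.BatchSystems.Condor; the lines are illustrative.
lines = [
    "123.0 2 undefined undefined undefined",    # running
    "123.1 5 3 55 Policy",                      # held by our on_exit_hold -> Failed
    "123.2 5 16 0 Spooling input data files",   # held while spooling -> Waiting
    "123.3 5 1 0 Held by user",                 # held for any other reason -> Aborted
]
assert parseCondorStatus(lines, "123.0") == ("Running", "")
assert parseCondorStatus(lines, "123.1") == ("Failed", "Policy")
assert parseCondorStatus(lines, "123.2") == ("Waiting", "Spooling input data files")
assert parseCondorStatus(lines, "123.3") == ("Aborted", "Held by user")
assert parseCondorStatus(lines, "999.9") == ("Unknown", "")  # job not in the output
```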
@@ -96,24 +177,23 @@ def submitJob(self, **kwargs):
             return resultDict

         jdlFile = tempfile.NamedTemporaryFile(dir=outputDir, suffix=".jdl")
+        scheddOptions = 'requirements = OpSys == "LINUX"\n'
+        scheddOptions += "getenv = False"

         jdlFile.write(
-            """
-    Executable = %s
-    Universe = vanilla
-    Requirements   = OpSys == "LINUX"
-    Initialdir = %s
-    Output = $(Cluster).$(Process).out
-    Error = $(Cluster).$(Process).err
-    Log = test.log
-    Environment = "CONDOR_JOBID=$(Cluster).$(Process) DIRAC_PILOT_STAMP=$(stamp)"
-    Getenv = False
-
-    request_cpus = %s
-
-    Queue stamp in %s
-
-    """
-            % (executable, outputDir, numberOfProcessors, ",".join(stamps))
+            subTemplate
+            % dict(
+                targetUniverse="vanilla",
+                executable=executable,
+                initialDir=outputDir,
+                environment="CONDOR_JOBID=$(Cluster).$(Process)",
+                useCredentials="",
+                processors=numberOfProcessors,
+                holdReasonSubcode=HOLD_REASON_SUBCODE,
+                daysToKeepRemoteLogs=1,
+                scheddOptions=scheddOptions,
+                extraString="",
+                pilotStampList=",".join(stamps),
+            )
         )

         jdlFile.flush()
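Review note: a sketch of how the shared subTemplate is expected to be filled in. Condor.py passes a vanilla universe plus its requirements/getenv block; the HTCondorCE class further down passes "grid" and a grid_resource line when a local schedd is used. All values here are illustrative:

```python
# Assuming subTemplate and HOLD_REASON_SUBCODE are imported from
# DIRAC.Resources.Computing.BatchSystems.Condor; paths are hypothetical.
jdl = subTemplate % dict(
    targetUniverse="vanilla",
    executable="/path/to/pilot_wrapper.sh",
    initialDir="/path/to/output",
    environment="CONDOR_JOBID=$(Cluster).$(Process)",
    useCredentials="use_x509userproxy = true",
    processors=1,
    holdReasonSubcode=HOLD_REASON_SUBCODE,
    daysToKeepRemoteLogs=1,
    scheddOptions='requirements = OpSys == "LINUX"\ngetenv = False',
    extraString="",
    pilotStampList="stamp1,stamp2",
)
assert "universe = vanilla" in jdl
assert "on_exit_hold_subcode = 55" in jdl
assert "Queue stamp in stamp1,stamp2" in jdl
```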
@@ -233,7 +313,7 @@ def getJobStatus(self, **kwargs):
             resultDict["Message"] = "No user name"
             return resultDict

-        cmd = "condor_q -submitter %s -af:j JobStatus" % user
+        cmd = "condor_q -submitter %s -af:j JobStatus HoldReasonCode HoldReasonSubCode HoldReason" % user
         sp = subprocess.Popen(
             shlex.split(cmd),
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
             universal_newlines=True,
         )
@@ -250,19 +330,26 @@
         qList = output.strip().split("\n")

-        # FIXME: condor_history does only support j for autoformat from 8.5.3,
-        # format adds whitespace for each field This will return a list of 1245 75 3
-        # needs to cocatenate the first two with a dot
-        condorHistCall = "condor_history -af ClusterId ProcId JobStatus -submitter %s" % user
-        treatCondorHistory(condorHistCall, qList)
+        condorHistCall = (
+            "condor_history -af:j JobStatus HoldReasonCode HoldReasonSubCode HoldReason -submitter %s" % user
+        )
+        sp = subprocess.Popen(
+            shlex.split(condorHistCall),
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            universal_newlines=True,
+        )
+        output, _ = sp.communicate()
+        status = sp.returncode
+        if status == 0:
+            for line in output.split("\n"):
+                qList.append(line)

         statusDict = {}
         if len(qList):
             for job in jobIDList:
                 job = str(job)
-                statusDict[job] = parseCondorStatus(qList, job)
-                if statusDict[job] == "HELD":
-                    statusDict[job] = "Unknown"
+                statusDict[job], _ = parseCondorStatus(qList, job)

         # Final output
         status = 0
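Review note: before the HTCondorCE diff, a sketch of the job-reference scheme it introduces. condor_submit -terse prints a range such as "107.0 - 107.4", which _condorIDToJobReference() expands into one reference per process; _jobReferenceToCondorID() inverts it. The CE name here is made up:

```python
# Expanding condor_submit -terse output into job references, mirroring
# _condorIDToJobReference / _jobReferenceToCondorID below. ceName is illustrative.
ceName = "condorce.example.org"
terse = "107.0 - 107.4"

first, last = [part.strip() for part in terse.split("-")]
clusterID = first.split(".")[0]      # "107"
numJobs = int(last.split(".")[1])    # 4
references = [f"htcondorce://{ceName}/{clusterID}.{i}" for i in range(numJobs + 1)]
assert references[0] == "htcondorce://condorce.example.org/107.0"
assert references[-1] == "htcondorce://condorce.example.org/107.4"

# The inverse keeps only what follows the last "/"
assert references[-1].split("/")[-1] == "107.4"
```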
diff --git a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py
index 4010adbd1c0..dc0c2f1f7d7 100644
--- a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py
+++ b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py
@@ -63,7 +63,7 @@
 from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient
 from DIRAC.FrameworkSystem.private.authorization.utils.Tokens import writeToTokenFile
 from DIRAC.Core.Security.Locations import getCAsLocation
-from DIRAC.Resources.Computing.BatchSystems.Condor import parseCondorStatus
+from DIRAC.Resources.Computing.BatchSystems.Condor import HOLD_REASON_SUBCODE, subTemplate, parseCondorStatus

 MANDATORY_PARAMETERS = ["Queue"]
 DEFAULT_WORKINGDIRECTORY = "/opt/dirac/pro/runit/WorkloadManagement/SiteDirectorHT"
@@ -71,50 +71,6 @@
 DEFAULT_DAYSTOKEEPLOGS = 15


-def logDir(ceName, stamp):
-    """Return path to log and output files for pilot.
-
-    :param str ceName: Name of the CE
-    :param str stamp: pilot stamp from/for jobRef
-    """
-    return os.path.join(ceName, stamp[0], stamp[1:3])
-
-
-def condorIDAndPathToResultFromJobRef(jobRef):
-    """Extract tuple of jobURL and jobID from the jobRef string.
-    The condorID as well as the path leading to the job results are also extracted from the jobID.
-
-    :param str jobRef: PilotJobReference of the following form: ``htcondorce://<ceName>/<condorID>:::<pilotStamp>``
-
-    :return: tuple composed of the jobURL, the path to the job results and the condorID of the given jobRef
-    """
-    splits = jobRef.split(":::")
-    jobURL = splits[0]
-    stamp = splits[1] if len(splits) > 1 else ""
-    _, _, ceName, condorID = jobURL.split("/")
-
-    # Reconstruct the path leading to the result (log, output)
-    # Construction of the path can be found in submitJob()
-    pathToResult = logDir(ceName, stamp) if len(stamp) >= 3 else ""
-
-    return jobURL, pathToResult, condorID
-
-
-def findFile(workingDir, fileName, pathToResult):
-    """Find a file in a file system.
-
-    :param str workingDir: the name of the directory containing the given file to search for
-    :param str fileName: the name of the file to find
-    :param str pathToResult: the path to follow from workingDir to find the file
-
-    :return: path leading to the file
-    """
-    path = os.path.join(workingDir, pathToResult, fileName)
-    if os.path.exists(path):
-        return S_OK(path)
-    return S_ERROR(errno.ENOENT, f"Could not find {path}")
-
-
 class HTCondorCEComputingElement(ComputingElement):
     """HTCondorCE computing element class
     implementing the functions jobSubmit, getJobOutput
@@ -152,11 +108,49 @@ def __init__(self, ceUniqueID):
         self.tokenFile = None

     #############################################################################
-    def __writeSub(self, executable, nJobs, location, processors, pilotStamps, tokenFile=None):
+
+    def _jobReferenceToCondorID(self, jobReference):
+        """Convert a job reference into a Condor jobID.
+
+        Example: htcondorce://<ceName>/1234.0 becomes 1234.0
+
+        :param str: job reference, a Condor jobID with additional details
+        :return: Condor jobID
+        """
+        # Remove CE and protocol information from the Condor jobID
+        if "://" in jobReference:
+            condorJobID = jobReference.split("/")[-1]
+            return condorJobID
+        return jobReference
+
+    def _condorIDToJobReference(self, condorJobIDs):
+        """Get the job references from the condor job IDs.
+        Cluster ids look like " 107.0 - 107.0 " or " 107.0 - 107.4 "
+
+        :param str condorJobIDs: the output of condor_submit
+
+        :return: job references such as htcondorce://<ceName>/<clusterID>.<processID>
+        """
+        clusterIDs = condorJobIDs.split("-")
+        if len(clusterIDs) != 2:
+            return S_ERROR(f"Something wrong with the condor_submit output: {condorJobIDs}")
+        clusterIDs = [clu.strip() for clu in clusterIDs]
+        self.log.verbose("Cluster IDs parsed:", clusterIDs)
+        try:
+            clusterID = clusterIDs[0].split(".")[0]
+            numJobs = clusterIDs[1].split(".")[1]
+        except IndexError:
+            return S_ERROR(f"Something wrong with the condor_submit output: {condorJobIDs}")
+
+        cePrefix = f"htcondorce://{self.ceName}/"
+        jobReferences = [f"{cePrefix}{clusterID}.{i}" for i in range(int(numJobs) + 1)]
+        return S_OK(jobReferences)
+
+    #############################################################################
+
+    def __writeSub(self, executable, location, processors, pilotStamps, tokenFile=None):
         """Create the Sub File for submission.

         :param str executable: name of the script to execute
-        :param int nJobs: number of desired jobs
         :param str location: directory that should contain the result of the jobs
         :param int processors: number of CPU cores to allocate
         :param list pilotStamps: list of pilot stamps (strings)
         """
         mkDir(os.path.join(self.workingDirectory, location))

         self.log.debug("InitialDir:", os.path.join(self.workingDirectory, location))
-        self.log.debug(f"ExtraSubmitString:\n### \n {self.extraSubmitString} \n###")

         fd, name = tempfile.mkstemp(suffix=".sub", prefix="HTCondorCE_", dir=self.workingDirectory)
@@ -175,6 +168,7 @@ def __writeSub(self, executable, nJobs, location, processors, pilotStamps, token
         executable = os.path.join(self.workingDirectory, executable)

         useCredentials = "use_x509userproxy = true"
+        # If tokenFile is present, then we transfer it to the worker node
         if tokenFile:
             useCredentials += textwrap.dedent(
                 f"""
                 """
             )
@@ -183,55 +177,25 @@
-        # This is used to remove outputs from the remote schedd
-        # Used in case a local schedd is not used
-        periodicRemove = "periodic_remove = "
-        periodicRemove += "(JobStatus == 4) && "
-        periodicRemove += f"(time() - EnteredCurrentStatus) > ({self.daysToKeepRemoteLogs} * 24 * 3600)"
-
-        localScheddOptions = (
-            """
-ShouldTransferFiles = YES
-WhenToTransferOutput = ON_EXIT_OR_EVICT
-"""
-            if self.useLocalSchedd
-            else periodicRemove
-        )
-
-        targetUniverse = "grid" if self.useLocalSchedd else "vanilla"
-
-        sub = """
-executable = %(executable)s
-universe = %(targetUniverse)s
-%(useCredentials)s
-output = $(Cluster).$(Process).out
-error = $(Cluster).$(Process).err
-log = $(Cluster).$(Process).log
-environment = "HTCONDOR_JOBID=$(Cluster).$(Process) DIRAC_PILOT_STAMP=$(stamp)"
-initialdir = %(initialDir)s
-grid_resource = condor %(ceName)s %(ceName)s:%(port)s
-transfer_output_files = ""
-request_cpus = %(processors)s
-%(localScheddOptions)s
-
-kill_sig=SIGTERM
-
-%(extraString)s
-
-Queue stamp in
%(pilotStampList)s + # Remote schedd options by default + targetUniverse = "vanilla" + scheddOptions = "" + if self.useLocalSchedd: + targetUniverse = "grid" + scheddOptions = f"grid_resource = condor {self.ceName} {self.ceName}:{self.port}" -""" % dict( + sub = subTemplate % dict( + targetUniverse=targetUniverse, executable=executable, - nJobs=nJobs, + initialDir=os.path.join(self.workingDirectory, location), + environment="HTCONDOR_JOBID=$(Cluster).$(Process)", + useCredentials=useCredentials, + holdReasonSubcode=HOLD_REASON_SUBCODE, processors=processors, - ceName=self.ceName, - port=self.port, + daysToKeepRemoteLogs=self.daysToKeepRemoteLogs, + scheddOptions=scheddOptions, extraString=self.extraSubmitString, - initialDir=os.path.join(self.workingDirectory, location), - localScheddOptions=localScheddOptions, - targetUniverse=targetUniverse, pilotStampList=",".join(pilotStamps), - useCredentials=useCredentials, ) subFile.write(sub) subFile.close() @@ -262,7 +226,7 @@ def _executeCondorCommand(self, cmd, keepTokenFile=False): :param list cmd: list of the condor command elements :param bool keepTokenFile: flag to reuse or not the previously created token file - :return: S_OK/S_ERROR - the result of the executeGridCommand() call + :return: S_OK/S_ERROR - the stdout parameter of the executeGridCommand() call """ if not self.token and not self.proxy: return S_ERROR(f"Cannot execute the command, token and proxy not found: {cmd}") @@ -291,15 +255,28 @@ def _executeCondorCommand(self, cmd, keepTokenFile=False): if cas := getCAsLocation(): htcEnv["_CONDOR_AUTH_SSL_CLIENT_CADIR"] = cas + # Execute the command result = executeGridCommand( cmd, gridEnvScript=self.gridEnv, gridEnvDict=htcEnv, ) + if not result["OK"]: + self.tokenFile = None + self.log.error("Command", f"{cmd} failed with: {result['Message']}") + return result + + status, stdout, stderr = result["Value"] + if status: + self.tokenFile = None + # We have got a non-zero status code + errorString = stderr if stderr else stdout + return S_ERROR(f"Command {cmd} failed with: {status} - {errorString.strip()}") + # Remove token file if we do not want to keep it self.tokenFile = self.tokenFile if keepTokenFile else None - return result + return S_OK(stdout.strip()) ############################################################################# def submitJob(self, executableFile, proxy, numberOfJobs=1): @@ -318,16 +295,14 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1): jobStamps.append(jobStamp) # We randomize the location of the pilot output and log, because there are just too many of them - location = logDir(self.ceName, commonJobStampPart) + location = os.path.join(self.ceName, commonJobStampPart[0], commonJobStampPart[1:3]) nProcessors = self.ceParameters.get("NumberOfProcessors", 1) if self.token: self.tokenFile = tempfile.NamedTemporaryFile( suffix=".token", prefix="HTCondorCE_", dir=self.workingDirectory ) writeToTokenFile(self.token["access_token"], self.tokenFile.name) - subName = self.__writeSub( - executableFile, numberOfJobs, location, nProcessors, jobStamps, tokenFile=self.tokenFile - ) + subName = self.__writeSub(executableFile, location, nProcessors, jobStamps, tokenFile=self.tokenFile) cmd = ["condor_submit", "-terse", subName] # the options for submit to remote are different than the other remoteScheddOptions @@ -337,30 +312,22 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1): cmd.insert(-1, op) result = self._executeCondorCommand(cmd, keepTokenFile=True) - self.log.verbose(result) os.remove(subName) - 
self.tokenFile = None if not result["OK"]: self.log.error("Failed to submit jobs to htcondor", result["Message"]) return result - status, stdout, stderr = result["Value"] - - if status: - # We have got a non-zero status code - errorString = stderr if stderr else stdout - return S_ERROR(f"Pilot submission failed with error: {errorString.strip()}") - - pilotJobReferences = self.__getPilotReferences(stdout.strip()) - if not pilotJobReferences["OK"]: - return pilotJobReferences - pilotJobReferences = pilotJobReferences["Value"] + stdout = result["Value"] + jobReferences = self._condorIDToJobReference(stdout) + if not jobReferences["OK"]: + return jobReferences + jobReferences = jobReferences["Value"] self.log.verbose("JobStamps:", jobStamps) - self.log.verbose("pilotRefs:", pilotJobReferences) + self.log.verbose("pilotRefs:", jobReferences) - result = S_OK(pilotJobReferences) - result["PilotStampDict"] = dict(zip(pilotJobReferences, jobStamps)) + result = S_OK(jobReferences) + result["PilotStampDict"] = dict(zip(jobReferences, jobStamps)) if self.useLocalSchedd: # Executable is transferred afterward # Inform the caller that Condor cannot delete it before the end of the execution @@ -377,27 +344,20 @@ def killJob(self, jobIDList): jobIDList = [jobIDList] self.log.verbose("KillJob jobIDList", jobIDList) - self.tokenFile = None - for jobRef in jobIDList: - job, _, jobID = condorIDAndPathToResultFromJobRef(jobRef) - self.log.verbose("Killing pilot", job) + for jobReference in jobIDList: + condorJobID = self._jobReferenceToCondorID(jobReference.split(":::")[0]) + self.log.verbose("Killing pilot", jobReference) cmd = ["condor_rm"] cmd.extend(self.remoteScheddOptions.strip().split(" ")) - cmd.append(jobID) + cmd.append(condorJobID) result = self._executeCondorCommand(cmd, keepTokenFile=True) if not result["OK"]: - self.tokenFile = None - return S_ERROR(f"condor_rm failed completely: {result['Message']}") - status, stdout, stderr = result["Value"] - if status != 0: - self.log.warn("Failed to kill pilot", f"{job}: {stdout}, {stderr}") - self.tokenFile = None - return S_ERROR(f"Failed to kill pilot {job}: {stderr}") + self.log.error("Failed to kill pilot", f"{jobReference}: {result['Message']}") + return result self.tokenFile = None - return S_OK() ############################################################################# @@ -443,77 +403,48 @@ def getJobStatus(self, jobIDList): resultDict = {} condorIDs = {} # Get all condorIDs so we can just call condor_q and condor_history once - for jobRef in jobIDList: - job, _, jobID = condorIDAndPathToResultFromJobRef(jobRef) - condorIDs[job] = jobID + for jobReference in jobIDList: + jobReference = jobReference.split(":::")[0] + condorIDs[jobReference] = self._jobReferenceToCondorID(jobReference) self.tokenFile = None qList = [] for _condorIDs in breakListIntoChunks(condorIDs.values(), 100): - # This will return a list of 1245.75 3 + # This will return a list of 1245.75 3 undefined undefined undefined cmd = ["condor_q"] cmd.extend(self.remoteScheddOptions.strip().split(" ")) cmd.extend(_condorIDs) - cmd.extend(["-af:j", "JobStatus"]) + cmd.extend(["-af:j", "JobStatus", "HoldReasonCode", "HoldReasonSubCode", "HoldReason"]) result = self._executeCondorCommand(cmd, keepTokenFile=True) if not result["OK"]: - self.tokenFile = None - return S_ERROR(f"condor_q failed completely: {result['Message']}") - status, stdout, stderr = result["Value"] - if status != 0: - self.tokenFile = None - return S_ERROR(stdout + stderr) - _qList = stdout.strip().split("\n") - 
qList.extend(_qList)
-
-            # FIXME: condor_history does only support j for autoformat from 8.5.3,
-            # format adds whitespace for each field This will return a list of 1245 75 3
-            # needs to concatenate the first two with a dot
+                return result
+
+            qList.extend(result["Value"].split("\n"))
+
             condorHistCall = ["condor_history"]
             condorHistCall.extend(self.remoteScheddOptions.strip().split(" "))
             condorHistCall.extend(_condorIDs)
-            condorHistCall.extend(["-af", "ClusterId", "ProcId", "JobStatus"])
+            condorHistCall.extend(["-af:j", "JobStatus", "HoldReasonCode", "HoldReasonSubCode", "HoldReason"])
+            result = self._executeCondorCommand(condorHistCall, keepTokenFile=True)
+            if not result["OK"]:
+                return result

-            self._treatCondorHistory(condorHistCall, qList)
+            qList.extend(result["Value"].split("\n"))

         for job, jobID in condorIDs.items():
-            pilotStatus = parseCondorStatus(qList, jobID)
-            if pilotStatus == "HELD":
-                # make sure the pilot stays dead and gets taken out of the condor_q
-                cmd = f"condor_rm {self.remoteScheddOptions} {jobID}".split()
-                _result = self._executeCondorCommand(cmd, keepTokenFile=True)
-                pilotStatus = PilotStatus.ABORTED
+            jobStatus, reason = parseCondorStatus(qList, jobID)
+
+            if jobStatus == PilotStatus.ABORTED:
+                self.log.verbose("Job", f"{jobID} held: {reason}")

-            resultDict[job] = pilotStatus
+            resultDict[job] = jobStatus

         self.tokenFile = None

         self.log.verbose(f"Pilot Statuses: {resultDict} ")
         return S_OK(resultDict)

-    def _treatCondorHistory(self, condorHistCall, qList):
-        """concatenate clusterID and processID to get the same output as condor_q
-        until we can expect condor version 8.5.3 everywhere
-
-        :param str condorHistCall: condor_history command to run
-        :param list qList: list of jobID and status from condor_q output, will be modified in this function
-        :returns: None
-        """
-
-        result = self._executeCondorCommand(condorHistCall)
-        if not result["OK"]:
-            return S_ERROR(f"condorHistCall failed completely: {result['Message']}")
-
-        status_history, stdout_history, stderr_history = result["Value"]
-
-        # Join the ClusterId and the ProcId and add to existing list of statuses
-        if status_history == 0:
-            for line in stdout_history.split("\n"):
-                values = line.strip().split()
-                if len(values) == 3:
-                    qList.append("%s.%s %s" % tuple(values))
-
     def getJobLog(self, jobID):
         """Get pilot job logging info from HTCondor
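Review note: the __getJobOutput() rewrite below inlines the old logDir() helper. Given a job reference plus DIRAC stamp, the output directory is rebuilt from the first three characters of the stamp, the same layout submitJob() uses when scattering pilot outputs. A sketch with a made-up stamp:

```python
import os

# Illustrative values: a job reference with its DIRAC stamp appended
ceName = "condorce.example.org"
jobID = f"htcondorce://{ceName}/107.0:::ab12cd34ef"

jobReference, stamp = jobID.split(":::")
assert len(stamp) >= 3  # enforced by the new "Stamp is not long enough" check
pathToResult = os.path.join(ceName, stamp[0], stamp[1:3])
assert pathToResult == "condorce.example.org/a/b1"

# Outputs are then searched for under
# <workingDirectory>/<pathToResult>/<condorJobID>.<out|err|log>
```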
@@ -540,13 +471,39 @@
     def getJobOutput(self, jobID):

         return S_OK((result["Value"]["output"], result["Value"]["error"]))

+    def _findFile(self, fileName, pathToResult):
+        """Find a file in a file system.
+
+        :param str fileName: the name of the file to find
+        :param str pathToResult: the path to follow from self.workingDirectory to find the file
+
+        :return: path leading to the file
+        """
+        path = os.path.join(self.workingDirectory, pathToResult, fileName)
+        if os.path.exists(path):
+            return S_OK(path)
+        return S_ERROR(errno.ENOENT, f"Could not find {path}")
+
     def __getJobOutput(self, jobID, outTypes):
         """Get job outputs: output, error and logging files from HTCondor

         :param str jobID: job identifier
         :param list outTypes: output types targeted (output, error and/or logging)
         """
-        _job, pathToResult, condorID = condorIDAndPathToResultFromJobRef(jobID)
+        # Extract stamp from the Job ID
+        if ":::" in jobID:
+            jobReference, stamp = jobID.split(":::")
+        else:
+            return S_ERROR(f"DIRAC stamp not defined for {jobID}")
+
+        # Reconstruct the path leading to the result (log, output)
+        # Construction of the path can be found in submitJob()
+        if len(stamp) < 3:
+            return S_ERROR(f"Stamp is not long enough: {stamp}")
+        pathToResult = os.path.join(self.ceName, stamp[0], stamp[1:3])
+
+        condorJobID = self._jobReferenceToCondorID(jobReference)

         iwd = os.path.join(self.workingDirectory, pathToResult)
         try:
@@ -557,30 +514,19 @@
             return S_ERROR(e.errno, f"{errorMessage} ({iwd})")

         if not self.useLocalSchedd:
-            cmd = ["condor_transfer_data", "-pool", f"{self.ceName}:{self.port}", "-name", self.ceName, condorID]
+            cmd = ["condor_transfer_data", "-pool", f"{self.ceName}:{self.port}", "-name", self.ceName, condorJobID]
             result = self._executeCondorCommand(cmd)
-            self.log.verbose(result)

             # Getting 'logging' without 'error' and 'output' is possible but will generate command errors
             # We do not check the command errors if we only want 'logging'
             if "error" in outTypes or "output" in outTypes:
-                errorMessage = "Failed to get job output from htcondor"
                 if not result["OK"]:
-                    self.log.error(errorMessage, result["Message"])
                     return result
-                # Even if result is OK, the actual exit code of cmd can still be an error
-                status, stdout, stderr = result["Value"]
-                if status != 0:
-                    outMessage = stdout.strip()
-                    errMessage = stderr.strip()
-                    varMessage = outMessage + " " + errMessage
-                    self.log.error(errorMessage, varMessage)
-                    return S_ERROR(f"{errorMessage}: {varMessage}")

         outputsSuffix = {"output": "out", "error": "err", "logging": "log"}
         outputs = {}
         for output, suffix in outputsSuffix.items():
-            resOut = findFile(self.workingDirectory, f"{condorID}.{suffix}", pathToResult)
+            resOut = self._findFile(f"{condorJobID}.{suffix}", pathToResult)
             if not resOut["OK"]:
                 # Return an error if the output type was targeted, else we continue
                 if output in outTypes:
@@ -603,29 +549,6 @@
         return S_OK(outputs)

-    def __getPilotReferences(self, jobString):
-        """Get the references from the condor_submit output.
-        Cluster ids look like " 107.0 - 107.0 " or " 107.0 - 107.4 "
-
-        :param str jobString: the output of condor_submit
-
-        :return: job references such as ``htcondorce://<ceName>/<clusterID>.<processID>``.
- """ - self.log.verbose("getPilotReferences:", jobString) - clusterIDs = jobString.split("-") - if len(clusterIDs) != 2: - return S_ERROR(f"Something wrong with the condor_submit output: {jobString}") - clusterIDs = [clu.strip() for clu in clusterIDs] - self.log.verbose("Cluster IDs parsed:", clusterIDs) - try: - clusterID = clusterIDs[0].split(".")[0] - numJobs = clusterIDs[1].split(".")[1] - except IndexError: - return S_ERROR(f"Something wrong with the condor_submit output: {jobString}") - cePrefix = f"htcondorce://{self.ceName}/" - jobReferences = [f"{cePrefix}{clusterID}.{i}" for i in range(int(numJobs) + 1)] - return S_OK(jobReferences) - def __cleanup(self): """Clean the working directory of old jobs""" if not HTCondorCEComputingElement._cleanupLock.acquire(False): diff --git a/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py b/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py index b5acb6211b6..a84347b7d4f 100644 --- a/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py +++ b/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py @@ -12,14 +12,14 @@ MODNAME = "DIRAC.Resources.Computing.HTCondorCEComputingElement" STATUS_LINES = """ -123.2 5 -123.1 3 +123.2 5 4 0 undefined +123.1 3 undefined undefined undefined """.strip().split( "\n" ) HISTORY_LINES = """ -123 0 4 +123.0 4 undefined undefined undefined """.strip().split( "\n" ) @@ -31,46 +31,62 @@ def setUp(): def test_parseCondorStatus(): - statusLines = """ - 104097.9 2 - 104098.0 1 - 104098.1 4 + statusLines = f""" + 104098.1 1 undefined undefined undefined + 104098.2 2 undefined undefined undefined + 104098.3 3 undefined undefined undefined + 104098.4 4 undefined undefined undefined + 104098.5 5 16 57 Input data are being spooled + 104098.6 5 3 {Condor.HOLD_REASON_SUBCODE} Policy + 104098.7 5 1 0 undefined foo bar - 104098.2 3 - 104098.3 5 - 104098.4 7 + 104096.1 3 16 test test + 104096.2 3 test + 104096.3 5 undefined undefined undefined + 104096.4 7 """.strip().split( "\n" ) # force there to be an empty line expectedResults = { - "104097.9": "Running", - "104098.0": "Waiting", - "104098.1": "Done", - "104098.2": "Aborted", - "104098.3": "HELD", - "104098.4": "Unknown", + "104098.1": "Waiting", + "104098.2": "Running", + "104098.3": "Aborted", + "104098.4": "Done", + "104098.5": "Waiting", + "104098.6": "Failed", + "104098.7": "Aborted", + "foo": "Unknown", + "104096.1": "Aborted", + "104096.2": "Aborted", + "104096.3": "Aborted", + "104096.4": "Unknown", } for jobID, expected in expectedResults.items(): - assert HTCE.parseCondorStatus(statusLines, jobID) == expected + assert HTCE.parseCondorStatus(statusLines, jobID)[0] == expected def test_getJobStatus(mocker): """Test HTCondorCE getJobStatus""" mocker.patch( - MODNAME + ".HTCondorCEComputingElement._executeCondorCommand", + MODNAME + ".executeGridCommand", side_effect=[ S_OK((0, "\n".join(STATUS_LINES), "")), S_OK((0, "\n".join(HISTORY_LINES), "")), S_OK((0, "", "")), + S_OK((0, "", "")), ], ) mocker.patch(MODNAME + ".HTCondorCEComputingElement._HTCondorCEComputingElement__cleanup") + mocker.patch(MODNAME + ".HTCondorCEComputingElement._prepareProxy", return_value=S_OK()) htce = HTCE.HTCondorCEComputingElement(12345) + # Need to initialize proxy because it is required by executeCondorCommand() + htce.proxy = "dumb_proxy" + ret = htce.getJobStatus( [ "htcondorce://condorce.foo.arg/123.0:::abc321", @@ -86,7 +102,6 @@ def test_getJobStatus(mocker): "htcondorce://condorce.foo.arg/123.2": "Aborted", 
"htcondorce://condorce.foo.arg/333.3": "Unknown", } - assert ret["OK"] is True assert expectedResults == ret["Value"] @@ -102,7 +117,7 @@ def test_getJobStatusBatchSystem(mocker): expectedResults = { "123.0": "Done", "123.1": "Aborted", - "123.2": "Unknown", # HELD is treated as Unknown + "123.2": "Aborted", "333.3": "Unknown", } @@ -113,8 +128,8 @@ def test_getJobStatusBatchSystem(mocker): @pytest.mark.parametrize( "localSchedd, optionsNotExpected, optionsExpected", [ - (False, ["ShouldTransferFiles = YES", "WhenToTransferOutput = ON_EXIT_OR_EVICT"], ["universe = vanilla"]), - (True, [], ["ShouldTransferFiles = YES", "WhenToTransferOutput = ON_EXIT_OR_EVICT", "universe = grid"]), + (False, ["grid_resources = "], ["universe = vanilla"]), + (True, [], ["universe = grid"]), ], ) def test__writeSub(mocker, localSchedd, optionsNotExpected, optionsExpected): @@ -132,7 +147,7 @@ def test__writeSub(mocker, localSchedd, optionsNotExpected, optionsExpected): jobStamp = commonJobStampPart + uuid.uuid4().hex[:29] jobStamps.append(jobStamp) - htce._HTCondorCEComputingElement__writeSub("dirac-install", 42, "", 1, jobStamps) # pylint: disable=E1101 + htce._HTCondorCEComputingElement__writeSub("dirac-install", "", 1, jobStamps) # pylint: disable=E1101 for option in optionsNotExpected: # the three [0] are: call_args_list[firstCall][ArgsArgumentsTuple][FirstArgsArgument] assert option not in subFileMock.write.call_args_list[0][0][0] @@ -141,18 +156,18 @@ def test__writeSub(mocker, localSchedd, optionsNotExpected, optionsExpected): @pytest.mark.parametrize( - "localSchedd, expected", [(False, "-pool condorce.cern.ch:9619 -name condorce.cern.ch"), (True, "")] + "localSchedd, expected", [(False, "-pool condorce.cern.ch:9619 -name condorce.cern.ch "), (True, "")] ) def test_reset(setUp, localSchedd, expected): ceParameters = setUp htce = HTCE.HTCondorCEComputingElement(12345) htce.ceParameters = ceParameters - htce.useLocalSchedd = True + htce.useLocalSchedd = localSchedd ceName = "condorce.cern.ch" htce.ceName = ceName htce._reset() - assert htce.remoteScheddOptions == "" + assert htce.remoteScheddOptions == expected @pytest.mark.parametrize( @@ -167,12 +182,12 @@ def test_submitJob(setUp, mocker, localSchedd, expected): htce = HTCE.HTCondorCEComputingElement(12345) htce.ceParameters = ceParameters htce.useLocalSchedd = localSchedd + htce.proxy = "dumb_proxy" ceName = "condorce.cern.ch" htce.ceName = ceName - execMock = mocker.patch( - MODNAME + ".HTCondorCEComputingElement._executeCondorCommand", return_value=S_OK((0, "123.0 - 123.0", "")) - ) + execMock = mocker.patch(MODNAME + ".executeGridCommand", return_value=S_OK((0, "123.0 - 123.0", ""))) + mocker.patch(MODNAME + ".HTCondorCEComputingElement._prepareProxy", return_value=S_OK()) mocker.patch( MODNAME + ".HTCondorCEComputingElement._HTCondorCEComputingElement__writeSub", return_value="dirac_pilot" ) @@ -200,13 +215,13 @@ def test_killJob(setUp, mocker, jobIDList, jobID, ret, success, local): ceParameters = setUp htce = HTCE.HTCondorCEComputingElement(12345) htce.ceName = "condorce.foo.arg" + htce.proxy = "dumb_proxy" htce.useLocalSchedd = local htce.ceParameters = ceParameters htce._reset() - execMock = mocker.patch( - MODNAME + ".HTCondorCEComputingElement._executeCondorCommand", return_value=S_OK((ret, "", "")) - ) + execMock = mocker.patch(MODNAME + ".executeGridCommand", return_value=S_OK((ret, "", ""))) + mocker.patch(MODNAME + ".HTCondorCEComputingElement._prepareProxy", return_value=S_OK()) ret = htce.killJob(jobIDList=jobIDList) assert ret["OK"] 
== success
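Review note: across this diff, _executeCondorCommand() changes contract: it now folds the grid command's exit status into the returned structure, so callers receive S_OK(stdout) or S_ERROR instead of unpacking S_OK((status, stdout, stderr)) themselves. A self-contained sketch of that folding, with made-up values:

```python
from DIRAC import S_ERROR, S_OK

def foldGridResult(result, cmd):
    """Reduce executeGridCommand()-style results the way the refactored
    _executeCondorCommand() does: any failure becomes S_ERROR."""
    if not result["OK"]:
        return result
    status, stdout, stderr = result["Value"]
    if status:
        # Non-zero exit code: surface stderr, falling back to stdout
        return S_ERROR(f"Command {cmd} failed with: {status} - {(stderr or stdout).strip()}")
    return S_OK(stdout.strip())

assert foldGridResult(S_OK((0, "107.0 - 107.4\n", "")), "condor_submit")["Value"] == "107.0 - 107.4"
assert not foldGridResult(S_OK((1, "", "boom")), "condor_q")["OK"]
```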