From f9fd4d59e9a319fab3d4c2bf27c0da70fd002cd2 Mon Sep 17 00:00:00 2001 From: aldbr Date: Fri, 23 Jun 2023 08:50:08 +0200 Subject: [PATCH] fix: HTCondor tests + a bit of refactoring --- .../Computing/BatchSystems/Condor.py | 11 +- .../Computing/HTCondorCEComputingElement.py | 159 +++++++++--------- .../test/Test_HTCondorCEComputingElement.py | 56 +++--- 3 files changed, 119 insertions(+), 107 deletions(-) diff --git a/src/DIRAC/Resources/Computing/BatchSystems/Condor.py b/src/DIRAC/Resources/Computing/BatchSystems/Condor.py index 71526d7c486..c39180a3566 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/Condor.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/Condor.py @@ -70,8 +70,8 @@ on_exit_hold = ExitCode != 0 # A random subcode to identify who put the job on hold on_exit_hold_subcode = %(holdReasonSubcode)s -# Jobs are then deleted from the system after N days -period_remove = (time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600) +# Jobs are then deleted from the system after N days if they are not running +period_remove = (JobStatus != 2) && (time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600) # Specific options # ---------------- @@ -105,9 +105,10 @@ def parseCondorStatus(lines, jobID): # A job can be held for many various reasons, we need to further investigate with the holdReasonCode & holdReasonSubCode # Details in: # https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode - - # By default, a held (5) job is defined as Aborted, but there might be some exceptions if status == 5: + + # By default, a held (5) job is defined as Aborted, but there might be some exceptions + status = 3 try: holdReasonCode = int(l[2]) holdReasonSubcode = int(l[3]) @@ -124,7 +125,7 @@ def parseCondorStatus(lines, jobID): if holdReasonCode == 3 and holdReasonSubcode == HOLD_REASON_SUBCODE: status = 5 # If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting - if holdReasonCode == 16: + elif holdReasonCode == 16: status = 1 return (STATES_MAP.get(status, "Unknown"), holdReason) diff --git a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py index d5ded02126e..ac248fb97c6 100644 --- a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py +++ b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py @@ -71,50 +71,6 @@ DEFAULT_DAYSTOKEEPLOGS = 15 -def logDir(ceName, stamp): - """Return path to log and output files for pilot. - - :param str ceName: Name of the CE - :param str stamp: pilot stamp from/for jobRef - """ - return os.path.join(ceName, stamp[0], stamp[1:3]) - - -def condorIDAndPathToResultFromJobRef(jobRef): - """Extract tuple of jobURL and jobID from the jobRef string. - The condorID as well as the path leading to the job results are also extracted from the jobID. - - :param str jobRef: PilotJobReference of the following form: ``htcondorce:///:::`` - - :return: tuple composed of the jobURL, the path to the job results and the condorID of the given jobRef - """ - splits = jobRef.split(":::") - jobURL = splits[0] - stamp = splits[1] if len(splits) > 1 else "" - _, _, ceName, condorID = jobURL.split("/") - - # Reconstruct the path leading to the result (log, output) - # Construction of the path can be found in submitJob() - pathToResult = logDir(ceName, stamp) if len(stamp) >= 3 else "" - - return jobURL, pathToResult, condorID - - -def findFile(workingDir, fileName, pathToResult): - """Find a file in a file system. - - :param str workingDir: the name of the directory containing the given file to search for - :param str fileName: the name of the file to find - :param str pathToResult: the path to follow from workingDir to find the file - - :return: path leading to the file - """ - path = os.path.join(workingDir, pathToResult, fileName) - if os.path.exists(path): - return S_OK(path) - return S_ERROR(errno.ENOENT, f"Could not find {path}") - - class HTCondorCEComputingElement(ComputingElement): """HTCondorCE computing element class implementing the functions jobSubmit, getJobOutput @@ -150,6 +106,45 @@ def __init__(self, ceUniqueID): self.tokenFile = None ############################################################################# + + def _DiracToCondorID(self, diracJobID): + """Convert a DIRAC jobID into an Condor jobID. + Example: https:///1234/0 becomes 1234.0 + + :param str: DIRAC jobID + :return: Condor jobID + """ + # Remove CE and protocol information from arc Job ID + if "://" in diracJobID: + condorJobID = diracJobID.split("/")[-1] + return condorJobID + return diracJobID + + def _condorToDiracID(self, condorJobIDs): + """Get the references from the condor_submit output. + Cluster ids look like " 107.0 - 107.0 " or " 107.0 - 107.4 " + + :param str condorJobIDs: the output of condor_submit + + :return: job references such as htcondorce:///. + """ + clusterIDs = condorJobIDs.split("-") + if len(clusterIDs) != 2: + return S_ERROR(f"Something wrong with the condor_submit output: {condorJobIDs}") + clusterIDs = [clu.strip() for clu in clusterIDs] + self.log.verbose("Cluster IDs parsed:", clusterIDs) + try: + clusterID = clusterIDs[0].split(".")[0] + numJobs = clusterIDs[1].split(".")[1] + except IndexError: + return S_ERROR(f"Something wrong with the condor_submit output: {condorJobIDs}") + + cePrefix = f"htcondorce://{self.ceName}/" + jobReferences = [f"{cePrefix}{clusterID}.{i}" for i in range(int(numJobs) + 1)] + return S_OK(jobReferences) + + ############################################################################# + def __writeSub(self, executable, location, processors, pilotStamps, tokenFile=None): """Create the Sub File for submission. @@ -288,7 +283,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1): jobStamps.append(jobStamp) # We randomize the location of the pilot output and log, because there are just too many of them - location = logDir(self.ceName, commonJobStampPart) + location = os.path.join(self.ceName, commonJobStampPart[0], commonJobStampPart[1:3]) nProcessors = self.ceParameters.get("NumberOfProcessors", 1) if self.token: self.tokenFile = tempfile.NamedTemporaryFile( @@ -311,7 +306,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1): return result stdout = result["Value"] - pilotJobReferences = self.__getPilotReferences(stdout) + pilotJobReferences = self._condorToDiracID(stdout) if not pilotJobReferences["OK"]: return pilotJobReferences pilotJobReferences = pilotJobReferences["Value"] @@ -339,15 +334,15 @@ def killJob(self, jobIDList): self.log.verbose("KillJob jobIDList", jobIDList) self.tokenFile = None - for jobRef in jobIDList: - job, _, jobID = condorIDAndPathToResultFromJobRef(jobRef) - self.log.verbose("Killing pilot", job) + for diracJobID in jobIDList: + condorJobID = self._DiracToCondorID(diracJobID.split(":::")[0]) + self.log.verbose("Killing pilot", diracJobID) cmd = ["condor_rm"] cmd.extend(self.remoteScheddOptions.strip().split(" ")) - cmd.append(jobID) + cmd.append(condorJobID) result = self._executeCondorCommand(cmd, keepTokenFile=True) if not result["OK"]: - self.log.error("Failed to kill pilot", f"{job}: {result['Message']}") + self.log.error("Failed to kill pilot", f"{diracJobID}: {result['Message']}") return result self.tokenFile = None @@ -396,9 +391,10 @@ def getJobStatus(self, jobIDList): resultDict = {} condorIDs = {} # Get all condorIDs so we can just call condor_q and condor_history once - for jobRef in jobIDList: - job, _, jobID = condorIDAndPathToResultFromJobRef(jobRef) - condorIDs[job] = jobID + for diracJobID in jobIDList: + diracJobID = diracJobID.split(":::")[0] + condorJobID = self._DiracToCondorID(diracJobID) + condorIDs[diracJobID] = condorJobID self.tokenFile = None @@ -471,13 +467,39 @@ def getJobOutput(self, jobID): return S_OK((result["Value"]["output"], result["Value"]["error"])) + def _findFile(self, workingDir, fileName, pathToResult): + """Find a file in a file system. + + :param str workingDir: the name of the directory containing the given file to search for + :param str fileName: the name of the file to find + :param str pathToResult: the path to follow from workingDir to find the file + + :return: path leading to the file + """ + path = os.path.join(workingDir, pathToResult, fileName) + if os.path.exists(path): + return S_OK(path) + return S_ERROR(errno.ENOENT, f"Could not find {path}") + def __getJobOutput(self, jobID, outTypes): """Get job outputs: output, error and logging files from HTCondor :param str jobID: job identifier :param list outTypes: output types targeted (output, error and/or logging) """ - _job, pathToResult, condorID = condorIDAndPathToResultFromJobRef(jobID) + # Extract stamp from the Job ID + if ":::" in jobID: + diracJobID, stamp = jobID.split(":::") + else: + return S_ERROR(f"DIRAC stamp not defined for {jobID}") + + # Reconstruct the path leading to the result (log, output) + # Construction of the path can be found in submitJob() + if len(stamp) < 3: + return S_ERROR(f"Stamp is not long enough: {stamp}") + pathToResult = os.path.join(self.ceName, stamp[0], stamp[1:3]) + + condorJobID = self._DiracToCondorID(diracJobID) iwd = os.path.join(self.workingDirectory, pathToResult) try: @@ -488,7 +510,7 @@ def __getJobOutput(self, jobID, outTypes): return S_ERROR(e.errno, f"{errorMessage} ({iwd})") if not self.useLocalSchedd: - cmd = ["condor_transfer_data", "-pool", f"{self.ceName}:9619", "-name", self.ceName, condorID] + cmd = ["condor_transfer_data", "-pool", f"{self.ceName}:9619", "-name", self.ceName, condorJobID] result = self._executeCondorCommand(cmd) # Getting 'logging' without 'error' and 'output' is possible but will generate command errors @@ -500,7 +522,7 @@ def __getJobOutput(self, jobID, outTypes): outputsSuffix = {"output": "out", "error": "err", "logging": "log"} outputs = {} for output, suffix in outputsSuffix.items(): - resOut = findFile(self.workingDirectory, f"{condorID}.{suffix}", pathToResult) + resOut = self._findFile(self.workingDirectory, f"{condorJobID}.{suffix}", pathToResult) if not resOut["OK"]: # Return an error if the output type was targeted, else we continue if output in outTypes: @@ -523,29 +545,6 @@ def __getJobOutput(self, jobID, outTypes): return S_OK(outputs) - def __getPilotReferences(self, jobString): - """Get the references from the condor_submit output. - Cluster ids look like " 107.0 - 107.0 " or " 107.0 - 107.4 " - - :param str jobString: the output of condor_submit - - :return: job references such as htcondorce:///-. - """ - self.log.verbose("getPilotReferences:", jobString) - clusterIDs = jobString.split("-") - if len(clusterIDs) != 2: - return S_ERROR(f"Something wrong with the condor_submit output: {jobString}") - clusterIDs = [clu.strip() for clu in clusterIDs] - self.log.verbose("Cluster IDs parsed:", clusterIDs) - try: - clusterID = clusterIDs[0].split(".")[0] - numJobs = clusterIDs[1].split(".")[1] - except IndexError: - return S_ERROR(f"Something wrong with the condor_submit output: {jobString}") - cePrefix = f"htcondorce://{self.ceName}/" - jobReferences = [f"{cePrefix}{clusterID}.{i}" for i in range(int(numJobs) + 1)] - return S_OK(jobReferences) - def __cleanup(self): """Clean the working directory of old jobs""" if not HTCondorCEComputingElement._cleanupLock.acquire(False): diff --git a/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py b/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py index 70988dd109f..51828127629 100644 --- a/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py +++ b/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py @@ -12,14 +12,14 @@ MODNAME = "DIRAC.Resources.Computing.HTCondorCEComputingElement" STATUS_LINES = """ -123.2 5 -123.1 3 +123.2 5 4 0 undefined +123.1 3 undefined undefined undefined """.strip().split( "\n" ) HISTORY_LINES = """ -123 0 4 +123.0 4 undefined undefined undefined """.strip().split( "\n" ) @@ -31,31 +31,43 @@ def setUp(): def test_parseCondorStatus(): - statusLines = """ - 104097.9 2 - 104098.0 1 - 104098.1 4 + statusLines = f""" + 104098.1 1 undefined undefined undefined + 104098.2 2 undefined undefined undefined + 104098.3 3 undefined undefined undefined + 104098.4 4 undefined undefined undefined + 104098.5 5 16 57 Input data are being spooled + 104098.6 5 3 {Condor.HOLD_REASON_SUBCODE} Policy + 104098.7 5 1 0 undefined foo bar - 104098.2 3 - 104098.3 5 - 104098.4 7 + 104096.1 3 16 test test + 104096.2 3 test + 104096.3 5 undefined undefined undefined + 104096.4 7 """.strip().split( "\n" ) # force there to be an empty line expectedResults = { - "104097.9": "Running", - "104098.0": "Waiting", - "104098.1": "Done", - "104098.2": "Aborted", - "104098.3": "HELD", - "104098.4": "Unknown", + "104098.1": "Waiting", + "104098.2": "Running", + "104098.3": "Aborted", + "104098.4": "Done", + "104098.5": "Waiting", + "104098.6": "Failed", + "104098.7": "Aborted", + "foo": "Unknown", + "104096.1": "Aborted", + "104096.2": "Aborted", + "104096.3": "Unknown", + "104096.4": "Unknown", } for jobID, expected in expectedResults.items(): - assert HTCE.parseCondorStatus(statusLines, jobID) == expected + print(jobID, expected) + assert HTCE.parseCondorStatus(statusLines, jobID)[0] == expected def test_getJobStatus(mocker): @@ -66,6 +78,7 @@ def test_getJobStatus(mocker): S_OK((0, "\n".join(STATUS_LINES), "")), S_OK((0, "\n".join(HISTORY_LINES), "")), S_OK((0, "", "")), + S_OK((0, "", "")), ], ) mocker.patch(MODNAME + ".HTCondorCEComputingElement._HTCondorCEComputingElement__cleanup") @@ -86,7 +99,6 @@ def test_getJobStatus(mocker): "htcondorce://condorce.foo.arg/123.2": "Aborted", "htcondorce://condorce.foo.arg/333.3": "Unknown", } - assert ret["OK"] is True assert expectedResults == ret["Value"] @@ -102,7 +114,7 @@ def test_getJobStatusBatchSystem(mocker): expectedResults = { "123.0": "Done", "123.1": "Aborted", - "123.2": "Unknown", # HELD is treated as Unknown + "123.2": "Aborted", "333.3": "Unknown", } @@ -113,8 +125,8 @@ def test_getJobStatusBatchSystem(mocker): @pytest.mark.parametrize( "localSchedd, optionsNotExpected, optionsExpected", [ - (False, ["ShouldTransferFiles = YES", "WhenToTransferOutput = ON_EXIT_OR_EVICT"], ["universe = vanilla"]), - (True, [], ["ShouldTransferFiles = YES", "WhenToTransferOutput = ON_EXIT_OR_EVICT", "universe = grid"]), + (False, ["grid_resources = "], ["universe = vanilla"]), + (True, [], ["universe = grid"]), ], ) def test__writeSub(mocker, localSchedd, optionsNotExpected, optionsExpected): @@ -132,7 +144,7 @@ def test__writeSub(mocker, localSchedd, optionsNotExpected, optionsExpected): jobStamp = commonJobStampPart + uuid.uuid4().hex[:29] jobStamps.append(jobStamp) - htce._HTCondorCEComputingElement__writeSub("dirac-install", 42, "", 1, jobStamps) # pylint: disable=E1101 + htce._HTCondorCEComputingElement__writeSub("dirac-install", "", 1, jobStamps) # pylint: disable=E1101 for option in optionsNotExpected: # the three [0] are: call_args_list[firstCall][ArgsArgumentsTuple][FirstArgsArgument] assert option not in subFileMock.write.call_args_list[0][0][0]