From 78b866bbe706f05749b88723b9f7bbaa37cc958a Mon Sep 17 00:00:00 2001 From: aldbr Date: Wed, 17 Jan 2024 14:23:32 +0100 Subject: [PATCH] sweep: #7397 fix: JobAgent rescheduling wrong jobs --- .../Agent/JobAgent.py | 7 +- .../Agent/test/Test_Agent_JobAgent.py | 74 ++++++++++++++++++- 2 files changed, 76 insertions(+), 5 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py index 2e7cef25aa8..b8518700a63 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py @@ -691,7 +691,10 @@ def _checkSubmittedJobs(self): submissionErrors = [] payloadErrors = [] originalJobID = self.jobReport.jobID - for jobID, taskID in self.submissionDict.items(): + # Loop over the jobIDs submitted to the CE + # Here we iterate over a copy of the keys because we are modifying the dictionary within the loop + for jobID in list(self.submissionDict.keys()): + taskID = self.submissionDict[jobID] if taskID not in self.computingElement.taskResults: continue @@ -728,7 +731,9 @@ def _checkSubmittedJobs(self): self.log.info(message) # Remove taskID from computingElement.taskResults as it has been treated + # Remove jobID from submissionDict as it has been treated del self.computingElement.taskResults[taskID] + del self.submissionDict[jobID] self.jobReport.setJob(originalJobID) return S_OK((submissionErrors, payloadErrors)) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py index 18ee0bbbd68..fdfb203c895 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py @@ -516,6 +516,7 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult jobAgent.log = gLogger.getSubLogger("JobAgent") jobAgent._initializeComputingElement(localCE) jobAgent.jobReport = JobReport(jobID) + jobAgent.jobSubmissionDelay = 3 # Submit a job result = jobAgent._submitJob( @@ -546,10 +547,6 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult assert result["OK"] assert result["Value"] == expectedResult1 - # Check that the job is still present in jobAgent.submissionDict - assert len(jobAgent.submissionDict) == 1 - assert jobID in jobAgent.submissionDict - # If the submission is synchronous jobAgent.computingElement.taskResults # should not contain the result anymore: already processed by checkSubmittedJobs if not jobAgent.computingElement.ceParameters.get("AsyncSubmission", False): @@ -575,5 +572,74 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult assert result["Value"] == expectedResult2 # From here, taskResults should be empty + assert len(jobAgent.computingElement.taskResults) == 0 + + +def test_submitAndCheck2Jobs(mocker): + """Test the submission and the management of the job status. + + This time, a first job is successfully submitted, but the second submission fails. + We want to make sure that both jobs are correctly managed. + """ + # Mock the JobAgent + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__") + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent.am_stopExecution") + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.createJobWrapper", return_value=S_OK(["jobWrapper.py"])) + mocker.patch("DIRAC.Core.Security.X509Chain.X509Chain.dumpAllToString", return_value=S_OK()) + mocker.patch( + "DIRAC.Resources.Computing.InProcessComputingElement.InProcessComputingElement.submitJob", + side_effect=[S_OK(), S_ERROR("ComputingElement error")], + ) + + jobAgent = JobAgent("JobAgent", "Test") + jobAgent.log = gLogger.getSubLogger("JobAgent") + jobAgent._initializeComputingElement("InProcess") + jobAgent.ceName = "InProcess" + jobAgent.jobSubmissionDelay = 0 + + jobAgent.jobReport = JobReport(0) + mocker.patch.object(jobAgent, "jobReport", autospec=True) + mock_rescheduleFailedJob = mocker.patch.object(jobAgent, "_rescheduleFailedJob") + + # Submit a first job: should be successful + jobID = "123" + result = jobAgent._submitJob( + jobID=jobID, jobParams={}, resourceParams={}, optimizerParams={}, proxyChain=X509Chain() + ) + # Check that no error occurred during the submission process + # at the level of the JobAgent + assert result["OK"] + + # Check that the job was added to jobAgent.submissionDict + assert len(jobAgent.submissionDict) == 1 assert jobID in jobAgent.submissionDict + + # The submission is synchronous taskResults should already contain the result + assert len(jobAgent.computingElement.taskResults) == 1 + + # Check errors that could have occurred in the innerCE + result = jobAgent._checkSubmittedJobs() + assert result["OK"] + assert result["Value"] == ([], []) + + mock_rescheduleFailedJob.assert_not_called() + + # Submit a second job: should fail + jobID = "456" + result = jobAgent._submitJob( + jobID=jobID, jobParams={}, resourceParams={}, optimizerParams={}, proxyChain=X509Chain() + ) + # Check that no error occurred during the submission process + # at the level of the JobAgent + assert result["OK"] + + # Check errors that could have occurred in the innerCE + result = jobAgent._checkSubmittedJobs() + assert result["OK"] + assert result["Value"] == (["ComputingElement error"], []) + + # Make sure that the correct job is rescheduled + mock_rescheduleFailedJob.assert_called_with(jobID, "ComputingElement error") + + # From here, taskResults should be empty assert len(jobAgent.computingElement.taskResults) == 0