Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.0] fix: JobAgent rescheduling wrong jobs #7397

Merged
merged 2 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,10 @@ def _checkSubmittedJobs(self):
submissionErrors = []
payloadErrors = []
originalJobID = self.jobReport.jobID
for jobID, taskID in self.submissionDict.items():
# Loop over the jobIDs submitted to the CE
# Here we iterate over a copy of the keys because we are modifying the dictionary within the loop
for jobID in list(self.submissionDict.keys()):
taskID = self.submissionDict[jobID]
if taskID not in self.computingElement.taskResults:
continue

Expand Down Expand Up @@ -731,7 +734,9 @@ def _checkSubmittedJobs(self):
self.log.info(message)

# Remove taskID from computingElement.taskResults as it has been treated
# Remove jobID from submissionDict as it has been treated
del self.computingElement.taskResults[taskID]
del self.submissionDict[jobID]

self.jobReport.setJob(originalJobID)
return S_OK((submissionErrors, payloadErrors))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult
jobAgent.log = gLogger.getSubLogger("JobAgent")
jobAgent._initializeComputingElement(localCE)
jobAgent.jobReport = JobReport(jobID)
jobAgent.jobSubmissionDelay = 3

# Submit a job
result = jobAgent._submitJob(
Expand Down Expand Up @@ -547,10 +548,6 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult
assert result["OK"]
assert result["Value"] == expectedResult1

# Check that the job is still present in jobAgent.submissionDict
assert len(jobAgent.submissionDict) == 1
assert jobID in jobAgent.submissionDict

# If the submission is synchronous jobAgent.computingElement.taskResults
# should not contain the result anymore: already processed by checkSubmittedJobs
if not jobAgent.computingElement.ceParameters.get("AsyncSubmission", False):
Expand All @@ -576,5 +573,74 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult
assert result["Value"] == expectedResult2

# From here, taskResults should be empty
assert len(jobAgent.computingElement.taskResults) == 0


def test_submitAndCheck2Jobs(mocker):
    """Submit two jobs back to back and verify that each one is handled on its own.

    The patched ``InProcessComputingElement.submitJob`` returns ``S_OK()`` on its
    first call and ``S_ERROR("ComputingElement error")`` on its second call.
    The test checks that ``_checkSubmittedJobs`` reports no error after the first
    submission and does not trigger any rescheduling, and that after the second
    submission the error is reported and ``_rescheduleFailedJob`` is called with
    the ID of the second job.
    """
    # Replace everything the JobAgent would otherwise need from a real installation
    mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__")
    mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent.am_stopExecution")
    mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.createJobWrapper", return_value=S_OK(["jobWrapper.py"]))
    mocker.patch("DIRAC.Core.Security.X509Chain.X509Chain.dumpAllToString", return_value=S_OK())
    # First call to submitJob succeeds, second call fails
    mocker.patch(
        "DIRAC.Resources.Computing.InProcessComputingElement.InProcessComputingElement.submitJob",
        side_effect=[S_OK(), S_ERROR("ComputingElement error")],
    )

    # Build the agent around an InProcess computing element, with no submission delay
    jobAgent = JobAgent("JobAgent", "Test")
    jobAgent.log = gLogger.getSubLogger("JobAgent")
    jobAgent._initializeComputingElement("InProcess")
    jobAgent.ceName = "InProcess"
    jobAgent.jobSubmissionDelay = 0

    # The job report is mocked, and the rescheduling call is intercepted
    # so that its arguments can be inspected later on
    jobAgent.jobReport = JobReport(0)
    mocker.patch.object(jobAgent, "jobReport", autospec=True)
    rescheduleMock = mocker.patch.object(jobAgent, "_rescheduleFailedJob")

    # --- First job: the submission is expected to succeed ---
    firstJobID = "123"
    firstSubmission = jobAgent._submitJob(
        jobID=firstJobID, jobParams={}, resourceParams={}, optimizerParams={}, proxyChain=X509Chain()
    )
    # The JobAgent itself must not report any problem
    assert firstSubmission["OK"]

    # The job must now be tracked in jobAgent.submissionDict
    assert len(jobAgent.submissionDict) == 1
    assert firstJobID in jobAgent.submissionDict

    # The submission is synchronous: taskResults should already hold the result
    assert len(jobAgent.computingElement.taskResults) == 1

    # No submission or payload error is expected from the inner CE
    firstCheck = jobAgent._checkSubmittedJobs()
    assert firstCheck["OK"]
    assert firstCheck["Value"] == ([], [])

    rescheduleMock.assert_not_called()

    # --- Second job: the submission is expected to fail ---
    secondJobID = "456"
    secondSubmission = jobAgent._submitJob(
        jobID=secondJobID, jobParams={}, resourceParams={}, optimizerParams={}, proxyChain=X509Chain()
    )
    # The failure happens in the CE, so the JobAgent still returns OK here
    assert secondSubmission["OK"]

    # This time the inner CE error must show up as a submission error
    secondCheck = jobAgent._checkSubmittedJobs()
    assert secondCheck["OK"]
    assert secondCheck["Value"] == (["ComputingElement error"], [])

    # The rescheduling must target the second job
    rescheduleMock.assert_called_with(secondJobID, "ComputingElement error")

    # Every task result has been processed by now
    assert len(jobAgent.computingElement.taskResults) == 0
Loading