From 47544399e9e9c4ce6ce73b0cabeb7717695c696f Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Wed, 22 Nov 2023 17:09:11 +0100 Subject: [PATCH] fix: sets jobStatus=Failed/Payload failed iff the job was running --- src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py | 10 ++++++++-- .../Agent/test/Test_Agent_JobAgent.py | 6 ++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py index a568bc957d3..be86ab2001d 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py @@ -32,6 +32,7 @@ from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient +from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport from DIRAC.WorkloadManagementSystem.Client import JobStatus @@ -691,7 +692,7 @@ def _checkSubmittedJobs(self): payloadErrors = [] originalJobID = self.jobReport.jobID for jobID, taskID in self.submissionDict.items(): - if not taskID in self.computingElement.taskResults: + if taskID not in self.computingElement.taskResults: continue result = self.computingElement.taskResults[taskID] @@ -714,7 +715,12 @@ def _checkSubmittedJobs(self): # The payload failed (if result["Value"] is not 0) elif result["Value"]: - self.jobReport.setJobStatus(status=JobStatus.FAILED, minorStatus="Payload failed") + # In order to avoid overriding perfectly valid states, the status is updated iff the job was running + res = JobMonitoringClient().getJobsStatus(jobID) + if not res["OK"]: + return res + if res["Value"][jobID]["Status"] == JobStatus.RUNNING: + self.jobReport.setJobStatus(status=JobStatus.FAILED, minorStatus="Payload failed") # Do not keep running and do not overwrite the Payload error message = f"Payload execution failed with error code {result['Value']}" diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py index 274390d0745..b47e6e8ec96 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py @@ -3,6 +3,7 @@ import os import pytest import time +from unittest.mock import MagicMock from DIRAC import gLogger, S_OK, S_ERROR from DIRAC.Core.Security.X509Chain import X509Chain # pylint: disable=import-error @@ -484,9 +485,9 @@ def test_submitJob(mocker, mockJWInput, expected): # Async submission of a failed job, first time the job has not failed yet, second time it is failed ("Pool/InProcess", badJobScript, ([], []), ([], ["Payload execution failed with error code 5"])), # Sync submission, should fail because of a problem in the Singularity CE - ("Singularity", jobScript % "1", (["Failed to find singularity"], []), ([], [])), + ("Singularity", jobScript % "1", (["Failed to find singularity image to exec"], []), ([], [])), # Async submission, should fail because of a problem in the Singularity CE - ("Pool/Singularity", jobScript % "1", (["Failed to find singularity"], []), ([], [])), + ("Pool/Singularity", jobScript % "1", (["Failed to find singularity image to exec"], []), ([], [])), ], ) def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult2): @@ -498,6 +499,7 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__") mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent.am_stopExecution") + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobMonitoringClient", return_value=MagicMock()) mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.createJobWrapper", return_value=S_OK([jobName])) mocker.patch("DIRAC.Core.Security.X509Chain.X509Chain.dumpAllToString", return_value=S_OK())