diff --git a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala index 5f7fc85e793..86c51863782 100644 --- a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala @@ -29,40 +29,34 @@ import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.{Failure, Success} -private object TesTaskStatusType extends Enumeration { - type TesTaskStatusType = Value - val Running, Complete, Cancelled, FailedOrError = Value -} - sealed trait TesRunStatus { - var status: String = "" def isTerminal: Boolean - var sysLogs: Seq[String] = Seq.empty[String]; - - def setStatus(newStatus: String): Unit = { status = newStatus } - def setLogs(logs: Seq[String]): Unit = { sysLogs = logs } - - override def toString: String = status + def sysLogs: Seq[String] } case object Running extends TesRunStatus { def isTerminal = false - status = Running.toString() + def sysLogs = Seq.empty[String] } case object Complete extends TesRunStatus { def isTerminal = true - status = Complete.toString() + def sysLogs = Seq.empty[String] +} + +case class Error(sysLogs: Seq[String]) extends TesRunStatus { + def isTerminal = true + override def toString = "SYSTEM_ERROR" } -case object FailedOrError extends TesRunStatus { +case class Failed(sysLogs: Seq[String]) extends TesRunStatus { def isTerminal = true - status = FailedOrError.toString() + override def toString = "EXECUTOR_ERROR" } case object Cancelled extends TesRunStatus { def isTerminal = true - status = Cancelled.toString() + def sysLogs = Seq.empty[String] } object TesAsyncBackendJobExecutionActor { @@ -78,7 +72,7 @@ class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn override type StandardAsyncRunState = TesRunStatus - override def statusEquivalentTo(thiz: StandardAsyncRunState)(that: StandardAsyncRunState): Boolean = thiz.status == that.status + def statusEquivalentTo(thiz: StandardAsyncRunState)(that: StandardAsyncRunState): Boolean = thiz == that override lazy val pollBackOff = SimpleExponentialBackoff( initialInterval = 1 seconds, @@ -234,12 +228,19 @@ class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn override def requestsAbortAndDiesImmediately: Boolean = false override def pollStatusAsync(handle: StandardAsyncPendingExecutionHandle): Future[TesRunStatus] = { - queryStatusAsync(handle).flatMap { status => - status match { - case FailedOrError => addSystemLogsToStatusAsync(status, handle) - case _ => Future { status } + for { + status <- queryStatusAsync(handle) + errorLog <- status match { + case Error(_) => getErrorLogs(handle) + case Failed(_) => getErrorLogs(handle) + case _ => Future.successful(Seq.empty[String]) } - } + statusWithLog = status match { + case Error(_) => Error(errorLog) + case Failed(_) => Error(errorLog) + case _ => status + } + } yield statusWithLog } private def queryStatusAsync(handle: StandardAsyncPendingExecutionHandle): Future[TesRunStatus] = { @@ -255,20 +256,22 @@ class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn jobLogger.info(s"Job ${handle.pendingJob.jobId} was canceled") Cancelled - case s if s.contains("ERROR") => + case s if s.contains("EXECUTOR_ERROR") => + jobLogger.info(s"TES reported an error for Job ${handle.pendingJob.jobId}: '$s'") + Failed(Seq.empty[String]) + + case s if s.contains("SYSTEM_ERROR") => jobLogger.info(s"TES reported an error for Job ${handle.pendingJob.jobId}: '$s'") - FailedOrError + Error(Seq.empty[String]) case _ => Running } } } - private def addSystemLogsToStatusAsync(status: TesRunStatus, handle: StandardAsyncPendingExecutionHandle): Future[TesRunStatus] = { + private def getErrorLogs(handle: StandardAsyncPendingExecutionHandle): Future[Seq[String]] = { makeRequest[Task](HttpRequest(uri = s"$tesEndpoint/${handle.pendingJob.jobId}?view=FULL")) map { response => - status.setStatus(response.state.getOrElse(status.status)) - status.setLogs(response.logs.last(0).system_logs.getOrElse(Seq.empty[String])) - status + response.logs.flatMap(_.lastOption).flatMap(_.system_logs).getOrElse(Seq.empty[String]) } } @@ -279,14 +282,15 @@ class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn } private def handleExecutionError(status: TesRunStatus, returnCode: Option[Int]): Future[ExecutionHandle] = { - val exception = new AggregatedMessageException(s"Task ${jobDescriptor.key.tag} failed for unknown reason: ${status.status}", status.sysLogs) + val exception = new AggregatedMessageException(s"Task ${jobDescriptor.key.tag} failed for unknown reason: ${status.toString()}", status.sysLogs) Future.successful(FailedNonRetryableExecutionHandle(exception, returnCode, None)) } override def handleExecutionFailure(status: StandardAsyncRunState, returnCode: Option[Int]) = { status match { case Cancelled => Future.successful(AbortedExecutionHandle) - case FailedOrError => handleExecutionError(status, returnCode) + case Error(_) => handleExecutionError(status, returnCode) + case Failed(_) => handleExecutionError(status, returnCode) case _ => super.handleExecutionFailure(status, returnCode) } }