Skip to content

Commit

Permalink
WX-876 Surface TES System Logs to Cromwell when TES backend returns task error status (#6980)
Browse files Browse the repository at this point in the history

* WX-876 Surface TES System Logs to Cromwell when TES backend returns task error status

* Address feedback

* Address feedback (#6997)

* Address additional feedback (#7000)

* Fix copy/paste error (#7005)

* Address additional feedback

* Fix copy/paste error

* Trigger CI

---------

Co-authored-by: Blair Murri <[email protected]>
Co-authored-by: Janet Gainer-Dewar <[email protected]>
3 people authored Feb 9, 2023

Partially verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
We cannot verify signatures from co-authors, and some of the co-authors attributed to this commit require their commits to be signed.
1 parent 092059f commit 417adfa
Showing 1 changed file with 42 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package cromwell.backend.impl.tes

import common.exception.AggregatedMessageException
import java.io.FileNotFoundException
import java.nio.file.FileAlreadyExistsException
import cats.syntax.apply._
@@ -30,6 +31,7 @@ import scala.util.{Failure, Success}

/**
  * State of a TES task as tracked by this backend.
  *
  * Every status reports whether it is terminal; statuses that carry
  * system logs override `sysLogs`, all others expose an empty sequence.
  */
sealed trait TesRunStatus {

  /** Whether this status is terminal. */
  def isTerminal: Boolean

  /** System log lines attached to this status; empty unless overridden. */
  def sysLogs: Seq[String] = Seq.empty
}

case object Running extends TesRunStatus {
@@ -40,8 +42,14 @@ case object Complete extends TesRunStatus {
def isTerminal = true
}

case object FailedOrError extends TesRunStatus {
/**
  * Terminal status rendered as "SYSTEM_ERROR".
  *
  * @param sysLogs system log lines attached to the status; empty by default
  */
case class Error(override val sysLogs: Seq[String] = Seq.empty) extends TesRunStatus {

  /** Always terminal. */
  def isTerminal: Boolean = true

  /** Rendered as the literal state name rather than the case-class default. */
  override def toString: String = "SYSTEM_ERROR"
}

/**
  * Terminal status rendered as "EXECUTOR_ERROR".
  *
  * @param sysLogs system log lines attached to the status; empty by default
  */
case class Failed(override val sysLogs: Seq[String] = Seq.empty) extends TesRunStatus {

  /** Always terminal. */
  def isTerminal: Boolean = true

  /** Rendered as the literal state name rather than the case-class default. */
  override def toString: String = "EXECUTOR_ERROR"
}

case object Cancelled extends TesRunStatus {
@@ -217,6 +225,21 @@ class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn
// Always false for this backend.
// NOTE(review): presumably tells the parent actor not to stop right after
// requesting an abort — confirm against the base class contract.
override def requestsAbortAndDiesImmediately: Boolean = false

/**
  * Polls the status of the job behind `handle` and, when that status is
  * `Error` or `Failed`, attaches the system logs returned by `getErrorLogs`.
  *
  * Log retrieval is best-effort: if the follow-up request fails, the status
  * is still returned (with empty logs) instead of failing the whole poll, so
  * the error state already reported is not replaced by an unrelated
  * request failure.
  *
  * @param handle pending execution handle identifying the job to poll
  * @return the current status; `Error`/`Failed` carry whatever system logs
  *         could be retrieved, every other status is returned unchanged
  */
override def pollStatusAsync(handle: StandardAsyncPendingExecutionHandle): Future[TesRunStatus] = {
  // A failed log fetch must not mask the status we already have.
  // NOTE(review): customPollStatusFailure appears to turn any poll exception
  // into a "has not been found" failure — confirm that is the caller's path.
  def errorLogsOrEmpty: Future[Seq[String]] =
    getErrorLogs(handle) recover {
      case scala.util.control.NonFatal(e) =>
        jobLogger.error(
          s"$tag Could not retrieve TES system logs for Job ${handle.pendingJob.jobId}: ${e.getMessage}"
        )
        Seq.empty[String]
    }

  queryStatusAsync(handle).flatMap[TesRunStatus] {
    case Error(_) => errorLogsOrEmpty.map(logs => Error(logs))
    case Failed(_) => errorLogsOrEmpty.map(logs => Failed(logs))
    case other => Future.successful(other)
  }
}

private def queryStatusAsync(handle: StandardAsyncPendingExecutionHandle): Future[TesRunStatus] = {
makeRequest[MinimalTaskView](HttpRequest(uri = s"$tesEndpoint/${handle.pendingJob.jobId}?view=MINIMAL")) map {
response =>
val state = response.state
@@ -229,24 +252,40 @@ class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn
jobLogger.info(s"Job ${handle.pendingJob.jobId} was canceled")
Cancelled

case s if s.contains("ERROR") =>
case s if s.contains("EXECUTOR_ERROR") =>
jobLogger.info(s"TES reported a failure for Job ${handle.pendingJob.jobId}: '$s'")
Failed()

case s if s.contains("SYSTEM_ERROR") =>
jobLogger.info(s"TES reported an error for Job ${handle.pendingJob.jobId}: '$s'")
FailedOrError
Error()

case _ => Running
}
}
}

/**
  * Requests the FULL view of the task behind `handle` and extracts the
  * system logs of its last log entry.
  *
  * @param handle pending execution handle identifying the job
  * @return the `system_logs` of the last entry in `logs`, or an empty
  *         sequence when any of those pieces is absent
  */
private def getErrorLogs(handle: StandardAsyncPendingExecutionHandle): Future[Seq[String]] = {
  val fullViewRequest = HttpRequest(uri = s"$tesEndpoint/${handle.pendingJob.jobId}?view=FULL")

  makeRequest[Task](fullViewRequest).map { task =>
    val lastEntrySystemLogs = for {
      entries <- task.logs
      lastEntry <- entries.lastOption
      lines <- lastEntry.system_logs
    } yield lines

    lastEntrySystemLogs.getOrElse(Seq.empty[String])
  }
}

/**
  * Maps a polling failure on a pending handle to a non-retryable failed
  * handle that wraps the original exception, logging an error first.
  *
  * Only defined for `StandardAsyncPendingExecutionHandle` inputs.
  */
override def customPollStatusFailure: PartialFunction[(ExecutionHandle, Exception), ExecutionHandle] = {
  case (pendingHandle: StandardAsyncPendingExecutionHandle @unchecked, cause: Exception) =>
    val tesJobId = pendingHandle.pendingJob.jobId
    jobLogger.error(s"$tag TES Job ${tesJobId} has not been found, failing call")
    FailedNonRetryableExecutionHandle(cause, kvPairsToSave = None)
}

/**
  * Builds a non-retryable failure for a task that ended in an error status.
  *
  * The exception message names the task tag and the status; the status's
  * system logs are passed along as the aggregated error messages.
  *
  * @param status     terminal status whose `toString` and `sysLogs` are reported
  * @param returnCode return code forwarded to the failed handle, if any
  * @return an already-completed future holding the failed execution handle
  */
private def handleExecutionError(status: TesRunStatus, returnCode: Option[Int]): Future[ExecutionHandle] = {
  val failureSummary = s"Task ${jobDescriptor.key.tag} failed for unknown reason: ${status.toString}"
  val cause = new AggregatedMessageException(failureSummary, status.sysLogs)
  val failedHandle = FailedNonRetryableExecutionHandle(cause, returnCode, None)
  Future.successful(failedHandle)
}

/**
  * Chooses the execution handle for a failed run based on its status:
  *  - `Cancelled` yields an aborted handle;
  *  - `Error` / `Failed` are reported through `handleExecutionError`;
  *  - anything else is delegated to the parent implementation.
  */
override def handleExecutionFailure(status: StandardAsyncRunState, returnCode: Option[Int]) = {
  status match {
    case Cancelled =>
      Future.successful(AbortedExecutionHandle)
    case errored @ (Error(_) | Failed(_)) =>
      handleExecutionError(errored, returnCode)
    case other =>
      super.handleExecutionFailure(other, returnCode)
  }
}

0 comments on commit 417adfa

Please sign in to comment.