AWS Batch - add support for custom log group name and additional tags #48

Merged

@@ -47,7 +47,7 @@ import common.util.StringUtil._
import common.validation.Validation._

import cromwell.backend._
-import cromwell.backend.async._
+import cromwell.backend.async._
import cromwell.backend.impl.aws.IntervalLimitedAwsJobSubmitActor.SubmitAwsJobRequest
import cromwell.backend.impl.aws.OccasionalStatusPollingActor.{NotifyOfStatus, WhatsMyStatus}
import cromwell.backend.impl.aws.RunStatus.{Initializing, TerminalRunStatus}
@@ -178,9 +178,9 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
* commandScriptContents here
*/

-/* part of the full commandScriptContents is overriden here, in the context of mixed S3/EFS support with globbing.
+/* part of the full commandScriptContents is overriden here, in the context of mixed S3/EFS support with globbing.
we'll see how much we need...
-*/
+*/

lazy val cmdScript = configuration.fileSystem match {
case AWSBatchStorageSystems.s3 => commandScriptContents.toEither.toOption.get
@@ -203,7 +203,9 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
configuration.efsMntPoint,
Option(runtimeAttributes.efsMakeMD5),
Option(runtimeAttributes.efsDelocalize),
-Option(runtimeAttributes.tagResources))
+Option(runtimeAttributes.tagResources),
+runtimeAttributes.logGroupName,
+runtimeAttributes.additionalTags)
}

// setup batch client to query job container info
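The two new arguments above (`runtimeAttributes.logGroupName` and `runtimeAttributes.additionalTags`) carry the user-configurable CloudWatch log group name and the extra tags into the AWS Batch job setup. As a rough sketch of what those two settings correspond to on the AWS side, a Batch job definition can point the `awslogs` log driver at a custom log group and attach extra tags roughly as below. This assumes the AWS SDK for Java v2 `BatchClient` that this file already uses elsewhere; the names `customLogGroup`, `extraTags` and `registerSketch` are invented for the illustration and this is not the code path the PR itself takes (tags can equally be supplied at job submission time).

```scala
import scala.jdk.CollectionConverters._
import software.amazon.awssdk.services.batch.BatchClient
import software.amazon.awssdk.services.batch.model.{ContainerProperties, JobDefinitionType, LogConfiguration, LogDriver, RegisterJobDefinitionRequest, ResourceRequirement, ResourceType}

object LogGroupAndTagsSketch {
  // Hypothetical stand-ins for runtimeAttributes.logGroupName and runtimeAttributes.additionalTags.
  val customLogGroup: String = "/my-project/cromwell-jobs"
  val extraTags: Map[String, String] = Map("project" -> "my-project", "owner" -> "data-team")

  def registerSketch(batch: BatchClient): Unit = {
    // Send the container's stdout/stderr to the custom CloudWatch log group via the awslogs driver.
    val logConfig = LogConfiguration.builder()
      .logDriver(LogDriver.AWSLOGS)
      .options(Map("awslogs-group" -> customLogGroup).asJava)
      .build()

    val containerProps = ContainerProperties.builder()
      .image("ubuntu:22.04")
      .command("echo", "hello")
      .resourceRequirements(
        ResourceRequirement.builder().`type`(ResourceType.VCPU).value("1").build(),
        ResourceRequirement.builder().`type`(ResourceType.MEMORY).value("2048").build())
      .logConfiguration(logConfig)
      .build()

    // The additional tags are attached to the job definition itself.
    val request = RegisterJobDefinitionRequest.builder()
      .jobDefinitionName("cromwell-example-job-def")
      .`type`(JobDefinitionType.CONTAINER)
      .containerProperties(containerProps)
      .tags(extraTags.asJava)
      .build()

    batch.registerJobDefinition(request)
  }
}
```

Called as `registerSketch(BatchClient.create())`, this registers a throwaway job definition whose containers log to the custom group and whose definition carries the extra tags.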
@@ -247,7 +249,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
}
Seq(AwsBatchFileInput(s"$namePrefix-$index", remotePath.valueString, DefaultPathBuilder.get(localPathString), workingDisk))
}

}

/**
@@ -262,13 +264,13 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
// for s3 paths :
case Success(path: S3Path) =>
configuration.fileSystem match {
-case AWSBatchStorageSystems.s3 =>
+case AWSBatchStorageSystems.s3 =>
URLDecoder.decode(path.pathWithoutScheme,"UTF-8")
-case _ =>
+case _ =>
URLDecoder.decode(path.toString,"UTF-8")
}
// non-s3 paths
-case _ =>
+case _ =>
URLDecoder.decode(value,"UTF-8")
}
)
@@ -301,7 +303,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
case womFile: WomFile => womFile
}
}

val callInputInputs = callInputFiles flatMap {
case (name, files) => inputsFromWomFiles(name, files, files.map(relativeLocalizationPath), jobDescriptor, true)
}
@@ -330,7 +332,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
case _ => AwsBatchWorkingDisk.MountPoint.resolve(path)
}
}

val absolutePath = DefaultPathBuilder.get(path) match {
case p if !p.isAbsolute => getAbsolutePath(p)
case p => p
@@ -369,7 +371,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
}

val additionalGlobOutput = jobDescriptor.taskCall.callable.additionalGlob.toList.flatMap(generateAwsBatchGlobFileOutputs).toSet

outputs.toSet ++ additionalGlobOutput
}

@@ -404,7 +406,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
// used by generateAwsBatchOutputs, could potentially move this def within that function
private def generateAwsBatchSingleFileOutputs(womFile: WomSingleFile): List[AwsBatchFileOutput] = {
// rewrite this to create more flexibility
-//
+//
val destination = configuration.fileSystem match {
case AWSBatchStorageSystems.s3 => callRootPath.resolve(womFile.value.stripPrefix("/")).pathAsString
case _ => DefaultPathBuilder.get(womFile.valueString) match {
@@ -414,8 +416,8 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar

}
val (relpath, disk) = relativePathAndVolume(womFile.value, runtimeAttributes.disks)
val output = if (configuration.efsMntPoint.isDefined &&

val output = if (configuration.efsMntPoint.isDefined &&
configuration.efsMntPoint.getOrElse("").equals(disk.toString.split(" ")(1)) &&
! runtimeAttributes.efsDelocalize) {
// name: String, s3key: String, local: Path, mount: AwsBatchVolume
@@ -426,7 +428,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
}
List(output)
}

// get a unique glob name locations & paths.
// 1. globName :md5 hash of local PATH and WF_ID
// 2. globbedDir : local path of the directory being globbed.
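The numbered comment above describes the glob bookkeeping: each glob gets a name derived from an MD5 hash of its local path and the workflow id, and its matches are collected under a hidden directory next to the globbed directory, with a `.list` file alongside (see the `globbedDir + "/." + globName` construction further down). A minimal, self-contained sketch of that naming scheme follows; the helper name `globNameFor`, the `glob-` prefix and the exact hash input are assumptions for the example, not necessarily what Cromwell produces.

```scala
import java.security.MessageDigest

object GlobNamingSketch {
  // Hypothetical helper: hash the local glob pattern together with the workflow id so that
  // repeated globs of the same path within one workflow map to one hidden directory.
  def globNameFor(localGlobPattern: String, workflowId: String): String = {
    val digest = MessageDigest.getInstance("MD5")
      .digest(s"$localGlobPattern$workflowId".getBytes("UTF-8"))
    "glob-" + digest.map("%02x".format(_)).mkString
  }

  def main(args: Array[String]): Unit = {
    val globbedDir = "/mnt/efs/analysis/out" // local path of the directory being globbed
    val globName   = globNameFor(s"$globbedDir/*.vcf", "11111111-2222-3333-4444-555555555555")
    println(s"$globbedDir/.$globName/")      // hidden directory holding links to the matched files
    println(s"$globbedDir/.$globName.list")  // list file enumerating the matched file names
  }
}
```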
@@ -445,8 +447,8 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
// locate the disk where the globbed data resides
val (_, globDirectoryDisk) = relativePathAndVolume(womFile.value, runtimeAttributes.disks)

val (globDirectoryDestinationPath, globListFileDestinationPath) = if (configuration.efsMntPoint.isDefined &&

val (globDirectoryDestinationPath, globListFileDestinationPath) = if (configuration.efsMntPoint.isDefined &&
configuration.efsMntPoint.getOrElse("").equals(globDirectoryDisk.toString.split(" ")(1)) &&
! runtimeAttributes.efsDelocalize) {
(globDirectory, globListFile)
@@ -462,11 +464,11 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
globDirectoryDestinationPath.toString,
globListFileDestinationPath.toString
)

}
// used by generateAwsBatchOutputs, could potentially move this def within that function
private def generateAwsBatchGlobFileOutputs(womFile: WomGlobFile): List[AwsBatchFileOutput] = {

val (globName, globbedDir, globDirectoryDisk, globDirectoryDestinationPath, globListFileDestinationPath) = generateGlobPaths(womFile)
val (relpathDir,_) = relativePathAndVolume(DefaultPathBuilder.get(globbedDir + "/." + globName + "/" + "*").toString,runtimeAttributes.disks)
val (relpathList,_) = relativePathAndVolume(DefaultPathBuilder.get(globbedDir + "/." + globName + ".list").toString,runtimeAttributes.disks)
@@ -590,7 +592,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar

override def handleExecutionResult(status: StandardAsyncRunState,
oldHandle: StandardAsyncPendingExecutionHandle): Future[ExecutionHandle] = {


// get path to sderr
val stderr = jobPaths.standardPaths.error
@@ -614,8 +616,8 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
case Success(returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(StderrNonEmpty(jobDescriptor.key.tag, stderrSize, stderrAsOption), Option(returnCodeAsInt), None))
retryElseFail(executionHandle)
-// job was aborted (cancelled by user?)
-// on AWS OOM kill are code 137 : check retryWithMoreMemory here
+// job was aborted (cancelled by user?)
+// on AWS OOM kill are code 137 : check retryWithMoreMemory here
case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) && !retryWithMoreMemory =>
jobLogger.debug(s"Job was aborted, code was : '${returnCodeAsString.stripLineEnd}'")
Future.successful(AbortedExecutionHandle)
@@ -660,14 +662,14 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
retryElseFail(failureStatus)
}
}


}


// get the exit code of the job.
def JobExitCode: Future[String] = {

// read if the file exists
def readRCFile(fileExists: Boolean): Future[String] = {
if (fileExists)
@@ -684,20 +686,20 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
jobRC <- readRCFile(fileExists)
} yield jobRC
}
// new OOM detection

// new OOM detection
def memoryRetryRC(job: StandardAsyncJob): Future[Boolean] = Future {
// STATUS LOGIC:
// - success : container exit code is zero
// - command failure: container exit code > 0, no statusReason in container
// - OOM kill : container exit code > 0, statusReason contains "OutOfMemory" OR exit code == 137
-// - spot kill : no container exit code set. statusReason of ATTEMPT (not container) says "host EC2 (...) terminated"
+// - spot kill : no container exit code set. statusReason of ATTEMPT (not container) says "host EC2 (...) terminated"
Log.debug(s"Looking for memoryRetry in job '${job.jobId}'")
val describeJobsResponse = batchClient.describeJobs(DescribeJobsRequest.builder.jobs(job.jobId).build)
val jobDetail = describeJobsResponse.jobs.get(0) //OrElse(throw new RuntimeException(s"Could not get job details for job '${job.jobId}'"))
val nrAttempts = jobDetail.attempts.size
-// if job is terminated/cancelled before starting, there are no attempts.
-val lastattempt =
+// if job is terminated/cancelled before starting, there are no attempts.
+val lastattempt =
try {
jobDetail.attempts.get(nrAttempts-1)
} catch {
@@ -706,8 +708,8 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
if (lastattempt == null ) {
Log.info(s"No attempts were made for job '${job.jobId}'. no memory-related retry needed.")
false
-}
-var containerRC =
+}
+var containerRC =
try {
lastattempt.container.exitCode
} catch {
@@ -720,13 +722,13 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
}
// if not zero => get reason, else set retry to false.
containerRC.toString() match {
case "0" =>
case "0" =>
Log.debug("container exit code was zero. job succeeded")
false
case "137" =>
case "137" =>
Log.info("Job failed with Container status reason : 'OutOfMemory' (code:137)")
true
-case _ =>
+case _ =>
// failed job due to command errors (~ user errors) don't have a container exit reason.
val containerStatusReason:String = {
var lastReason = lastattempt.container.reason
@@ -745,11 +747,11 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
Log.debug(s"Retry job based on provided keys : '${retry}'")
retry
}


}




// Despite being a "runtime" exception, BatchExceptions for 429 (too many requests) are *not* fatal:
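Condensed, the retry decision implemented in memoryRetryRC above comes down to: a container exit code of 0 means success (no retry); exit code 137, or a container status reason matching one of the configured memory-retry keys (for example "OutOfMemory"), means retry with more memory; anything else is treated as an ordinary failure. A standalone restatement of that decision, using illustrative names (`shouldRetryWithMoreMemory`, `retryKeys`) rather than the backend's actual identifiers:

```scala
object OomRetrySketch {
  // Simplified restatement of the decision above; not the backend's actual method.
  def shouldRetryWithMoreMemory(containerExitCode: Option[Int],
                                containerStatusReason: Option[String],
                                retryKeys: List[String]): Boolean =
    containerExitCode match {
      case Some(0)   => false // job succeeded
      case Some(137) => true  // OOM kill signalled through the exit code
      case _ =>
        // No exit code or a non-zero one: fall back to the container status reason.
        val reason = containerStatusReason.getOrElse("")
        retryKeys.exists(key => reason.contains(key))
    }
}

// For example, with retryKeys = List("OutOfMemory"):
//   shouldRetryWithMoreMemory(Some(137), None, List("OutOfMemory"))                   // true
//   shouldRetryWithMoreMemory(Some(1), Some("OutOfMemory: ..."), List("OutOfMemory")) // true
//   shouldRetryWithMoreMemory(Some(0), None, List("OutOfMemory"))                     // false
```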
@@ -824,7 +826,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
}
)
}

override def handleExecutionSuccess(runStatus: StandardAsyncRunState,
handle: StandardAsyncPendingExecutionHandle,
returnCode: Int)(implicit ec: ExecutionContext): Future[ExecutionHandle] = {
@@ -833,7 +835,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
// Need to make sure the paths are up to date before sending the detritus back in the response
updateJobPaths()
// If instance is terminated while copying stdout/stderr : status is failed while jobs outputs are ok
-// => Retryable
+// => Retryable
if (runStatus.toString().equals("Failed")) {
jobLogger.warn("Got Failed RunStatus for success Execution")

@@ -868,8 +870,8 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
* @return The shell scripting.
*/
override def globScript(globFile: WomGlobFile): String = {
val (globName, globbedDir, _, _, _) = generateGlobPaths(globFile)

val (globName, globbedDir, _, _, _) = generateGlobPaths(globFile)
val controlFileName = "cromwell_glob_control_file"
val absoluteGlobValue = commandDirectory.resolve(globFile.value).pathAsString
val globDirectory = globbedDir + "/." + globName + "/"
@@ -882,7 +884,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
}).toString
.replaceAll("GLOB_PATTERN", absoluteGlobValue)
.replaceAll("GLOB_DIRECTORY", globDirectory)
-// if on EFS : remove the globbing dir first, to remove leftover links from previous globs.
+// if on EFS : remove the globbing dir first, to remove leftover links from previous globs.
val mkDirCmd : String = if (configuration.efsMntPoint.isDefined && globDirectory.startsWith(configuration.efsMntPoint.getOrElse(""))) {
jobLogger.warn("Globbing on EFS has risks.")
jobLogger.warn(s"The globbing target (${globbedDir}/.${globName}/) will be overwritten when existing!")
@@ -891,13 +893,13 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
} else {
"mkdir"
}

val controlFileContent =
"""This file is used by Cromwell to allow for globs that would not match any file.
|By its presence it works around the limitation of some backends that do not allow empty globs.
|Regardless of the outcome of the glob, this file will not be part of the final list of globbed files.
""".stripMargin

s"""|# make the directory which will keep the matching files
|$mkDirCmd $globDirectory
|