diff --git a/backend/src/main/scala/cromwell/backend/backend.scala b/backend/src/main/scala/cromwell/backend/backend.scala index 8a4453a03f9..31a4f37c036 100644 --- a/backend/src/main/scala/cromwell/backend/backend.scala +++ b/backend/src/main/scala/cromwell/backend/backend.scala @@ -146,6 +146,7 @@ object CommonBackendConfigurationAttributes { "default-runtime-attributes.maxRetries", "default-runtime-attributes.awsBatchEvaluateOnExit", "default-runtime-attributes.sharedMemorySize", + "default-runtime-attributes.jobTimeout", "default-runtime-attributes.ulimits", "default-runtime-attributes.efsDelocalize", "default-runtime-attributes.efsMakeMD5", diff --git a/core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala b/core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala index 44451a1e791..b3761636136 100644 --- a/core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala +++ b/core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala @@ -57,6 +57,10 @@ object DefaultIoCommand { case class DefaultIoExistsOrThrowCommand(override val file: Path) extends IoExistsOrThrowCommand(file) { override def commandDescription: String = s"DefaultIoExistsOrThrowCommand file '$file'" } + + case class DefaultIoNoopCommand(override val file: Path) extends IoNoopCommand(file) { + override def commandDescription: String = s"DefaultIoNoopCommand file '$file'" + } case class DefaultIoReadLinesCommand(override val file: Path) extends IoReadLinesCommand(file) { override def commandDescription: String = s"DefaultIoReadLinesCommand file '$file'" diff --git a/core/src/main/scala/cromwell/core/io/IoCommand.scala b/core/src/main/scala/cromwell/core/io/IoCommand.scala index ec90c3b6c5c..0724cf5c365 100644 --- a/core/src/main/scala/cromwell/core/io/IoCommand.scala +++ b/core/src/main/scala/cromwell/core/io/IoCommand.scala @@ -190,6 +190,14 @@ abstract class IoExistsOrThrowCommand(val file: Path) extends SingleFileIoComman override lazy val name = "exist" } +/** + * No-op Command that does nothing + */ +abstract class IoNoopCommand(val file: Path) extends SingleFileIoCommand[Boolean] { + override def toString = s"No-op command on ${file.pathAsString} (does nothing)" + override lazy val name = "noop" +} + /** * Return the lines of a file in a collection */ diff --git a/core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala b/core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala index ae5ef05e252..a20ff019ce7 100644 --- a/core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala +++ b/core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala @@ -22,6 +22,7 @@ abstract class PartialIoCommandBuilder { def touchCommand: PartialFunction[Path, Try[IoTouchCommand]] = PartialFunction.empty def existsCommand: PartialFunction[Path, Try[IoExistsCommand]] = PartialFunction.empty def existsOrThrowCommand: PartialFunction[Path, Try[IoExistsOrThrowCommand]] = PartialFunction.empty + def noopCommand: PartialFunction[Path, Try[IoNoopCommand]] = PartialFunction.empty def isDirectoryCommand: PartialFunction[Path, Try[IoIsDirectoryCommand]] = PartialFunction.empty def readLinesCommand: PartialFunction[Path, Try[IoReadLinesCommand]] = PartialFunction.empty } @@ -98,6 +99,9 @@ class IoCommandBuilder(partialBuilders: List[PartialIoCommandBuilder] = List.emp def existsOrThrowCommand(file: Path): Try[IoExistsOrThrowCommand] = buildOrDefault(_.existsOrThrowCommand, file, DefaultIoExistsOrThrowCommand(file)) + def noopCommand(file: Path): Try[IoNoopCommand] = + buildOrDefault(_.noopCommand, file, DefaultIoNoopCommand(file)) 
+ def isDirectoryCommand(file: Path): Try[IoIsDirectoryCommand] = buildOrDefault(_.isDirectoryCommand, file, DefaultIoIsDirectoryCommand(file)) diff --git a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala index a2de8303a23..48fafbb337b 100644 --- a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala +++ b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala @@ -80,6 +80,7 @@ class NioFlow(parallelism: Int, case touchCommand: IoTouchCommand => touch(touchCommand) map touchCommand.success case existsCommand: IoExistsCommand => exists(existsCommand) map existsCommand.success case existsOrThrowCommand: IoExistsOrThrowCommand => existsOrThrow(existsOrThrowCommand) map existsOrThrowCommand.success + case noopCommand: IoNoopCommand => noop(noopCommand) map noopCommand.success case readLinesCommand: IoReadLinesCommand => readLines(readLinesCommand) map readLinesCommand.success case isDirectoryCommand: IoIsDirectoryCommand => isDirectory(isDirectoryCommand) map isDirectoryCommand.success case _ => IO.raiseError(new UnsupportedOperationException("Method not implemented")) @@ -191,6 +192,9 @@ class NioFlow(parallelism: Int, case true => true } } + // must produce a Boolean: IoNoopCommand extends SingleFileIoCommand[Boolean], so IO { () } would not typecheck against noopCommand.success + private def noop(noopCommand: IoNoopCommand) = IO { + true + } private def readLines(exists: IoReadLinesCommand) = IO { exists.file.withReader { reader => diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/job/EngineJobExecutionActor.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/job/EngineJobExecutionActor.scala index add2a09bada..4e297e70ecd 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/job/EngineJobExecutionActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/job/EngineJobExecutionActor.scala @@ -287,7 +287,6 @@ class EngineJobExecutionActor(replyTo: ActorRef, log.info(s"BT-322 {} cache hit copying nomatch: could not find a suitable cache hit.", jobTag) workflowLogger.debug(s"Could not copy a suitable cache hit for {$jobTag}. 
No copy attempts were made.") } - runJob(data) case Event(hashes: CallCacheHashes, data: ResponsePendingData) => addHashesAndStay(data, hashes) diff --git a/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchCommandBuilder.scala b/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchCommandBuilder.scala index d41bfd26d67..84784e09b30 100644 --- a/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchCommandBuilder.scala +++ b/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchCommandBuilder.scala @@ -37,7 +37,7 @@ import cromwell.filesystems.s3.S3Path import org.slf4j.{Logger, LoggerFactory} import cromwell.core.io.DefaultIoCommand.DefaultIoHashCommand import scala.util.Try /** * Generates commands for IO operations on S3 @@ -94,6 +94,11 @@ private case object PartialS3BatchCommandBuilder extends PartialIoCommandBuilder override def existsOrThrowCommand: PartialFunction[Path, Try[S3BatchExistsOrThrowCommand]] = { case path: S3Path => Try(S3BatchExistsOrThrowCommand(path)) } + + override def noopCommand: PartialFunction[Path, Try[S3BatchNoopCommand]] = { case path: S3Path => + Try(S3BatchNoopCommand(path)) + } + } /** diff --git a/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchIoCommand.scala b/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchIoCommand.scala index 5a98e46e554..c930dd54ef4 100644 --- a/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchIoCommand.scala +++ b/filesystems/s3/src/main/scala/cromwell/filesystems/s3/batch/S3BatchIoCommand.scala @@ -39,6 +39,7 @@ import cromwell.core.io.{ IoDeleteCommand, IoExistsCommand, IoExistsOrThrowCommand, + IoNoopCommand, IoHashCommand, IoSizeCommand, IoTouchCommand @@ -46,6 +47,7 @@ import cromwell.core.io.{ import cromwell.filesystems.s3.S3Path import java.nio.file.NoSuchFileException /** * Io commands with S3 paths and some logic enabling batching of request. @@ -162,3 +164,13 @@ case class S3BatchExistsOrThrowCommand(override val file: S3Path) } override def commandDescription: String = s"S3BatchExistsCommand file '$file'" } +/** + * `IoCommand` that does nothing + * @param file the path to the object + */ +case class S3BatchNoopCommand(override val file: S3Path) extends IoNoopCommand(file) { + override def commandDescription: String = s"S3BatchNoopCommand file '$file'" +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 6c66a0524a1..d94f5c59ab4 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -6,7 +6,7 @@ object Dependencies { private val akkaV = "2.5.32" // scala-steward:off (CROM-6637) private val ammoniteOpsV = "2.4.1" private val apacheHttpClientV = "4.5.13" - private val awsSdkV = "2.26.19" + private val awsSdkV = "2.29.20" // We would like to use the BOM to manage Azure SDK versions, but SBT doesn't support it. 
// https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/boms/azure-sdk-bom // https://github.com/sbt/sbt/issues/4531 diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala index c88e901396f..00dfc9b0e87 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala @@ -66,6 +66,8 @@ import cromwell.filesystems.s3.S3Path import cromwell.filesystems.s3.batch.S3BatchCommandBuilder import cromwell.services.keyvalue.KvClient +import cromwell.services.metadata.CallMetadataKeys + import org.slf4j.{Logger, LoggerFactory} import software.amazon.awssdk.services.batch.BatchClient @@ -74,7 +76,15 @@ import software.amazon.awssdk.services.batch.model._ import wom.callable.Callable.OutputDefinition import wom.core.FullyQualifiedName import wom.expression.NoIoFunctionSet -import wom.types.{WomArrayType, WomSingleFileType} +import wom.types.{WomArrayType, + WomType, + WomSingleFileType, + WomOptionalType, + WomPrimitiveFileType, + WomPrimitiveType, + WomMapType, + WomPairType, + WomCompositeType} import wom.values._ import scala.concurrent._ @@ -245,16 +255,43 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar /** * Takes two arrays of remote and local WOM File paths and generates the necessary AwsBatchInputs. */ + + // taken from the WomExpression trait: + private def areAllFilesOptional(womType: WomType): Boolean = { + def innerAreAllFileTypesInWomTypeOptional(womType: WomType): Boolean = womType match { + case WomOptionalType(_: WomPrimitiveFileType) => + true + case _: WomPrimitiveFileType => + false + case _: WomPrimitiveType => + true // WomPairTypes and WomCompositeTypes may have non-File components here which is fine. + case WomArrayType(inner) => innerAreAllFileTypesInWomTypeOptional(inner) + case WomMapType(_, inner) => innerAreAllFileTypesInWomTypeOptional(inner) + case WomPairType(leftType, rightType) => + innerAreAllFileTypesInWomTypeOptional(leftType) && innerAreAllFileTypesInWomTypeOptional(rightType) + case WomCompositeType(typeMap, _) => typeMap.values.forall(innerAreAllFileTypesInWomTypeOptional) + case _ => false + } + + // At the outermost level, primitives are never optional. 
+ womType match { + case _: WomPrimitiveType => + false + case _ => innerAreAllFileTypesInWomTypeOptional(womType) + } + } + private def inputsFromWomFiles( namePrefix: String, remotePathArray: Seq[WomFile], localPathArray: Seq[WomFile], jobDescriptor: BackendJobDescriptor, + isOptional: Seq[Boolean], // whether each input file is optional flag: Boolean ): Iterable[AwsBatchInput] = { - (remotePathArray zip localPathArray zipWithIndex) flatMap { - case ((remotePath, localPath), index) => + (remotePathArray zip localPathArray zip isOptional zipWithIndex) flatMap { + case (((remotePath, localPath), optional), index) => var localPathString = localPath.valueString if (localPathString.startsWith("s3://")) { localPathString = localPathString.replace("s3://", "") @@ -266,11 +303,11 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar s"$namePrefix-$index", remotePath.valueString, DefaultPathBuilder.get(localPathString), - workingDisk + workingDisk, + optional ) ) } - } /** @@ -314,12 +351,12 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar files.map(_.file), files.map(localizationPath), jobDescriptor, + files.map(_.file.womType).map(areAllFilesOptional), false ) } - // Collect all WomFiles from inputs to the call. - val callInputFiles: Map[FullyQualifiedName, Seq[WomFile]] = jobDescriptor.fullyQualifiedInputs safeMapValues { + val callInputFiles: Map[FullyQualifiedName, Seq[(WomFile, Boolean)]] = jobDescriptor.fullyQualifiedInputs safeMapValues { womFile => val arrays: Seq[WomArray] = womFile collectAsSeq { case womFile: WomFile => val files: List[WomSingleFile] = DirectoryFunctions .listWomSingleFiles(womFile, callPaths.workflowPaths) .toTry(s"Error getting single files for $womFile") .get WomArray(WomArrayType(WomSingleFileType), files) } - - arrays.flatMap(_.value).collect { case womFile: WomFile => - womFile + // a mix of optional and mandatory files is treated as mandatory + val isOptional = areAllFilesOptional(womFile.womType) + // Flatten and collect files along with the outer womFile's optional status + arrays.flatMap(_.value).collect { case file: WomFile => + (file, isOptional) } } - - val callInputInputs = callInputFiles flatMap { case (name, files) => + + val callInputInputs = callInputFiles flatMap { case (name, filesWithOptional) => + val files = filesWithOptional.map(_._1) // Extract the WomFiles + val isOptional = filesWithOptional.map(_._2) // Extract the corresponding optional flags inputsFromWomFiles( name, files, files.map(relativeLocalizationPath), jobDescriptor, + isOptional, // Pass the optional flags to inputsFromWomFiles true ) } - // this is a list : AwsBatchInput(name_in_wf, origin_such_as_s3, target_in_docker_relative, target_in_docker_disk[name mount] ) + // this is a list : AwsBatchInput(name_in_wf, origin_such_as_s3, target_in_docker_relative, target_in_docker_disk[name mount], is_optional_file ) val scriptInput: AwsBatchInput = AwsBatchFileInput( "script", jobPaths.script.pathAsString, DefaultPathBuilder.get(jobPaths.script.pathWithoutScheme), - workingDisk + workingDisk, + false ) Set(scriptInput) ++ writeFunctionInputs ++ callInputInputs @@ -397,7 +440,12 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar jobDescriptor: BackendJobDescriptor ): Set[AwsBatchFileOutput] = { import cats.syntax.validated._ - def evaluateFiles(output: OutputDefinition): List[WomFile] = + + + def evaluateFiles(output: OutputDefinition): List[(WomFile, Boolean)] = { + // mixed mandatory/optional types are cast to mandatory. 
+ val is_optional = areAllFilesOptional(output.womType) + Try( output.expression .evaluateFiles( @@ -405,20 +453,33 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar NoIoFunctionSet, output.womType ) - .map(_.toList map { _.file }) - ).getOrElse(List.empty[WomFile].validNel) + .map(_.toList map { womFile => + // Pair each WomFile with the optional status + (womFile.file, is_optional) + }) + ).getOrElse(List.empty[(WomFile, Boolean)].validNel) .getOrElse(List.empty) + } - val womFileOutputs = jobDescriptor.taskCall.callable.outputs - .flatMap(evaluateFiles) map relativeLocalizationPath - val outputs: Seq[AwsBatchFileOutput] = womFileOutputs.distinct flatMap { - _.flattenFiles flatMap { - case unlistedDirectory: WomUnlistedDirectory => - generateUnlistedDirectoryOutputs(unlistedDirectory) - case singleFile: WomSingleFile => - generateAwsBatchSingleFileOutputs(singleFile) - case globFile: WomGlobFile => generateAwsBatchGlobFileOutputs(globFile) - } + val womFileOutputsEvaluated = jobDescriptor.taskCall.callable.outputs + .flatMap(evaluateFiles) + + val womFileOutputs = womFileOutputsEvaluated map { case (file, isOptional) => + (relativeLocalizationPath(file), isOptional) + } + + val flatmapped = womFileOutputs.distinct flatMap { + case (file, isOptional) => file.flattenFiles.map((_, isOptional)) + } + // Generate AwsBatchFileOutput for each file based on type + val outputs: Seq[AwsBatchFileOutput] = flatmapped flatMap { + + case (unlistedDirectory: WomUnlistedDirectory, _) => + generateUnlistedDirectoryOutputs(unlistedDirectory) + case (singleFile: WomSingleFile, isOptional) => + generateAwsBatchSingleFileOutputs(singleFile, isOptional) + case (globFile: WomGlobFile, _) => + generateAwsBatchGlobFileOutputs(globFile) } val additionalGlobOutput = @@ -445,21 +506,23 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar makeSafeAwsBatchReferenceName(directoryListFile), listDestinationPath, DefaultPathBuilder.get(directoryListFile), - directoryDisk + directoryDisk, + false ), // The collection list file: AwsBatchFileOutput( makeSafeAwsBatchReferenceName(directoryPath), dirDestinationPath, DefaultPathBuilder.get(directoryPath + "*"), - directoryDisk + directoryDisk, + false ) ) } // used by generateAwsBatchOutputs, could potentially move this def within that function private def generateAwsBatchSingleFileOutputs( - womFile: WomSingleFile + womFile: WomSingleFile, isOptional: Boolean ): List[AwsBatchFileOutput] = { // rewrite this to create more flexibility val destination = configuration.fileSystem match { @@ -477,11 +540,11 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar val output = if (configuration.efsMntPoint.isDefined && configuration.efsMntPoint.getOrElse("").equals(disk.toString.split(" ")(1)) && ! runtimeAttributes.efsDelocalize) { - // name: String, s3key: String, local: Path, mount: AwsBatchVolume - AwsBatchFileOutput(makeSafeAwsBatchReferenceName(womFile.value), womFile.value, relpath, disk) + // name: String, s3key: String, local: Path, mount: AwsBatchVolume, optionalFile + AwsBatchFileOutput(makeSafeAwsBatchReferenceName(womFile.value), womFile.value, relpath, disk, isOptional) } else { // if efs is not enabled, OR efs delocalization IS enabled, keep the s3 path as destination. 
- AwsBatchFileOutput(makeSafeAwsBatchReferenceName(womFile.value), URLDecoder.decode(destination,"UTF-8"), relpath, disk) + AwsBatchFileOutput(makeSafeAwsBatchReferenceName(womFile.value), URLDecoder.decode(destination,"UTF-8"), relpath, disk, isOptional) } List(output) } @@ -549,9 +612,9 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar // We need both the glob directory and the glob list: List( // The glob directory: - AwsBatchFileOutput(DefaultPathBuilder.get(globbedDir.toString + "/." + globName + "/" + "*").toString,globDirectoryDestinationPath, relpathDir, globDirectoryDisk), + AwsBatchFileOutput(DefaultPathBuilder.get(globbedDir.toString + "/." + globName + "/" + "*").toString,globDirectoryDestinationPath, relpathDir, globDirectoryDisk, false), // The glob list file: - AwsBatchFileOutput(DefaultPathBuilder.get(globbedDir.toString + "/." + globName + ".list").toString, globListFileDestinationPath, relpathList, globDirectoryDisk) + AwsBatchFileOutput(DefaultPathBuilder.get(globbedDir.toString + "/." + globName + ".list").toString, globListFileDestinationPath, relpathList, globDirectoryDisk, false) // TODO: EVALUATE above vs below (mainly the makeSafeAwsBatchReferenceName() routine) // The glob directory: // AwsBatchFileOutput(makeSafeAwsBatchReferenceName(globDirectory), @@ -573,15 +636,6 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar case _ => jobPaths.callExecutionRoot } - // comment this, as it doesn't provide any data. - // (original) def scriptPreamble: ErrorOr[ScriptPreambleData] = ScriptPreambleData("").valid - // override def scriptPreamble: String = { - // configuration.fileSystem match { - // case AWSBatchStorageSystems.s3 => "" - // case _ => "" - // } - // } - override def scriptClosure: String = { configuration.fileSystem match { case AWSBatchStorageSystems.s3 => "" @@ -927,12 +981,22 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar override lazy val startMetadataKeyValues: Map[String, Any] = super[AwsBatchJobCachingActorHelper].startMetadataKeyValues - // opportunity to send custom metadata when the run is in a terminal state, currently we don't - override def getTerminalMetadata(runStatus: RunStatus): Map[String, Any] = + // opportunity to send custom metadata when the run is in a terminal state, currently related to cloudwatch info + def getTerminalMetadata(runStatus: RunStatus, jobHandle: StandardAsyncPendingExecutionHandle): Map[String, Any] = { + // job details + val jobDetail = batchClient.describeJobs(DescribeJobsRequest.builder.jobs(jobHandle.pendingJob.jobId).build).jobs.get(0) runStatus match { - case _: TerminalRunStatus => Map() + case _: TerminalRunStatus => + Map( + AwsBatchMetadataKeys.LogStreamName -> Option(jobDetail.container.logStreamName).getOrElse("unknown"), + AwsBatchMetadataKeys.LogGroupName -> Option(jobDetail.container.logConfiguration.options.get("awslogs-group")).getOrElse("unknown"), + // the region is parsed from the job queue ARN; CloudWatch logs are assumed to live in the same region + AwsBatchMetadataKeys.LogStreamRegion -> Option(jobDetail.jobQueue.split(":")(3)).getOrElse("unknown"), ) case unknown => throw new RuntimeException(s"Attempt to get terminal metadata from non terminal status: $unknown") } + } def hostAbsoluteFilePath(jobPaths: JobPaths, pathString: String): Path = { @@ -990,6 +1054,33 @@ class 
AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar ) } + + + override def handlePollSuccess(oldHandle: StandardAsyncPendingExecutionHandle, + state: StandardAsyncRunState + ): Future[ExecutionHandle] = { + val previousState = oldHandle.previousState + if (!(previousState exists statusEquivalentTo(state))) { + // If this is the first time checking the status, we log the transition as '-' to 'currentStatus'. Otherwise just use + // the state names. + // This logging and metadata publishing assumes that StandardAsyncRunState subtypes `toString` nicely to state names. + val prevStatusName = previousState.map(_.toString).getOrElse("-") + jobLogger.info(s"Status change from $prevStatusName to $state") + tellMetadata(Map(CallMetadataKeys.BackendStatus -> state)) + } + + state match { + case _ if isTerminal(state) => + val metadata = getTerminalMetadata(state, oldHandle) + tellMetadata(metadata) + handleExecutionResult(state, oldHandle) + case s => + Future.successful( + oldHandle.copy(previousState = Option(s)) + ) // Copy the current handle with updated previous status. + } + } + override def handleExecutionSuccess(runStatus: StandardAsyncRunState, handle: StandardAsyncPendingExecutionHandle, returnCode: Int)(implicit ec: ExecutionContext): Future[ExecutionHandle] = { diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchExpressionFunctions.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchExpressionFunctions.scala index 4ea87b6e10f..2f4b811a9dc 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchExpressionFunctions.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchExpressionFunctions.scala @@ -42,7 +42,9 @@ import cromwell.backend.impl.aws.io._ class AwsBatchExpressionFunctions(override val standardParams: StandardExpressionFunctionsParams) - extends StandardExpressionFunctions(standardParams) with AwsBatchGlobFunctions { + extends StandardExpressionFunctions(standardParams) + with AwsBatchGlobFunctions + with AwsReadLikeFunctions { override lazy val ioCommandBuilder: IoCommandBuilder = S3BatchCommandBuilder diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala index 3c5b0acbc72..b99e0fe43b5 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala @@ -145,20 +145,23 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL case input: AwsBatchFileInput if input.s3key.startsWith("s3://") => // regular s3 objects : download to working dir. - s"""_s3_localize_with_retry "${input.s3key}" "${input.mount.mountPoint.pathAsString}/${input.local}" """.stripMargin + s"""_s3_localize_with_retry "${input.s3key}" "${input.mount.mountPoint.pathAsString}/${input.local}" "${input.optional}" """.stripMargin .replace(AwsBatchWorkingDisk.MountPoint.pathAsString, workDir) case input: AwsBatchFileInput if efsMntPoint.isDefined && input.s3key.startsWith(efsMntPoint.get) => // EFS located file : test for presence on provided path. 
Log.debug("EFS input file detected: "+ input.s3key + " / "+ input.local.pathAsString) - s"""test -e "${input.s3key}" || (echo 'input file: ${input.s3key} does not exist' && LOCALIZATION_FAILED=1)""".stripMargin + //s"""test -e "${input.s3key}" || (echo 'input file: ${input.s3key} does not exist' && LOCALIZATION_FAILED=1)""".stripMargin + s"""_check_efs_infile "${input.s3key}" "${input.optional}" """.stripMargin case input: AwsBatchFileInput => // an entry in 'disks' => keep mount as it is.. //here we don't need a copy command but the centaurTests expect us to verify the existence of the file val filePath = input.local.pathAsString Log.debug("input entry in disks detected "+ input.s3key + " / "+ input.local.pathAsString) - s"""test -e "$filePath" || (echo 'input file: $filePath does not exist' && LOCALIZATION_FAILED=1)""".stripMargin + //s"""test -e "$filePath" || (echo 'input file: $filePath does not exist' && LOCALIZATION_FAILED=1)""".stripMargin + // can use same efs routine + s"""_check_efs_infile "$filePath" "${input.optional}" """.stripMargin case _ => "" }.toList).mkString("\n") @@ -190,6 +193,8 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL | local s3_path="$$1" | # destination must be the path to a file and not just the directory you want the file in | local destination="$$2" + | # if third options is specified, it is the optional tag (true / false) + | local is_optional="$${3:-false}" | | for i in {1..6}; | do @@ -197,13 +202,23 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL | if [ "$$i" -eq 6 ]; then | echo "failed to copy $$s3_path after $$(( $$i - 1 )) attempts." | LOCALIZATION_FAILED=1 - | break + | return | fi | # check validity of source path | if ! [[ "$$s3_path" =~ s3://([^/]+)/(.+) ]]; then | echo "$$s3_path is not an S3 path with a bucket and key." | LOCALIZATION_FAILED=1 - | break + | return + | fi + | ## if missing on s3 : check if optional: + | if ! $awsCmd s3 ls "$$s3_path" > /dev/null 2>&1 ; then + | if [[ "$$is_optional" == "true" ]]; then + | echo "Optional file '$$s3_path' does not exist. skipping localization" + | else + | echo "$$s3_path does not exist. skipping localization" + | LOCALIZATION_FAILED=1 + | fi + | return | fi | # copy | $awsCmd s3 cp --no-progress "$$s3_path" "$$destination" || @@ -212,7 +227,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL | _check_data_integrity "$$destination" "$$s3_path" || | { echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; } | # copy succeeded - | break + | return | done |} | @@ -221,14 +236,9 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL | local local_path="$$1" | # destination must be the path to a file and not just the directory you want the file in | local destination="$$2" - | - | # if file/folder does not exist, return immediately - | if [[ ! -e "$$local_path" ]]; then - | echo "$$local_path does not exist. 
skipping delocalization" - | DELOCALIZATION_FAILED=1 - | return - | fi - | + | # if third options is specified, it is the optional tag (true / false) + | local is_optional="$${3:-false}" + | | # get the multipart chunk size | chunk_size=$$(_get_multipart_chunk_size "$$local_path") | local MP_THRESHOLD=${mp_threshold} @@ -243,13 +253,13 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL | if [ "$$i" -eq 6 ]; then | echo "failed to delocalize $$local_path after $$(( $$i - 1 )) attempts." | DELOCALIZATION_FAILED=1 - | break + | return | fi | # if destination is not a bucket : abort | if ! [[ "$$destination" =~ s3://([^/]+)/(.+) ]]; then | echo "$$destination is not an S3 path with a bucket and key." | DELOCALIZATION_FAILED=1 - | break + | return | fi | # copy ok or try again. | if [[ -d "$$local_path" ]]; then @@ -266,20 +276,70 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL | { echo "data content length difference detected in attempt $$i to copy $$local_path/$$FILE failed" && sleep $$((7 * "$$i")) && continue 2; } | done | IFS="$$SAVEIFS" - | else + | # files : if exists or non-optional : must succeed + | elif [[ "$$is_optional" == "false" || -e "$$local_path" ]]; then | $awsCmd s3 cp --no-progress "$$local_path" "$$destination" || | { echo "attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; } | # check content length for data integrity | _check_data_integrity "$$local_path" "$$destination" || - | { echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; } + | { echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; } + | elif [[ "$$is_optional" == "true" && ! -e "$$local_path" ]]; then + | echo "Optional file '$$local_path' does not exist. skipping delocalization" + | # not optional, but missing : fail + | elif [[ "$$is_optional" == "false" && ! -e "$$local_path" ]]; then + | echo "$$local_path does not exist. skipping delocalization" + | DELOCALIZATION_FAILED=1 | fi - | # copy succeeded - | break + | # copy succeeded or not retrying + | return | done |} | + |function _check_efs_outfile() { + | local outfile="$$1" + | local need_md5="$$2" + | local is_optional="$$3" + | # if file exists and md5 needed: must succeed + | if [[ -e "$$outfile" && "$$need_md5" == "true" ]]; then + | # only create if missing or outdated + | if [[ ! -f "$$outfile.md5" || "$$outfile" -nt "$$outfile.md5" ]]; then + | md5sum "$$outfile" > "$$outfile.md5" || (echo "Could not generate $$outfile.md5" && DELOCALIZATION_FAILED=1 ); + | else + | echo "md5sum file exists. skipping md5sum generation." + | fi + | # if file does not exist : ok if optional file (mention) ; fail if mandatory + | elif [[ ! 
-e "$$outfile" ]]; then + | if [[ "$$is_optional" == "true" ]]; then + | echo "optional output file: $$outfile does not exist" + | else + | echo "mandatory output file: $$outfile does not exist" + | DELOCALIZATION_FAILED=1 + | fi + | fi + |} + | + |function _check_efs_infile() { + | local infile="$$1" + | local is_optional="$$2" + | # if file exists : ok + | if [[ -e "$$infile" ]]; then + | return + | # if file does not exist : ok if optional file (mention) ; fail if mandatory + | elif [[ "$$is_optional" == "true" ]]; then + | echo "optional input file: $$infile does not exist" + | else + | echo "mandatory input file: $$infile does not exist" + | LOCALIZATION_FAILED=1 + | fi + |} + | |function _get_multipart_chunk_size() { | local file_path="$$1" + | # missing files : skip. + | if [[ ! -e "$$file_path" ]]; then + | echo $$(( 5 * 1024 * 1024 )) + | return + | fi | # file size | file_size=$$(stat --printf="%s" "$$file_path") | # chunk_size : you can have at most 10K parts with at least one 5MB part @@ -457,39 +517,19 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL // files on /cromwell/ working dir must be delocalized case output: AwsBatchFileOutput if output.s3key.startsWith("s3://") && output.mount.mountPoint.pathAsString == AwsBatchWorkingDisk.MountPoint.pathAsString => //output is on working disk mount - Log.debug("output Data on working disk mount" + output.local.pathAsString) - s"""_s3_delocalize_with_retry "$workDir/${output.local.pathAsString}" "${output.s3key}" """.stripMargin + s"""_s3_delocalize_with_retry "$workDir/${output.local.pathAsString}" "${output.s3key}" "${output.optional}" """.stripMargin // files on EFS mounts are optionally delocalized. case output: AwsBatchFileOutput if efsMntPoint.isDefined && output.mount.mountPoint.pathAsString == efsMntPoint.get => Log.debug("EFS output file detected: "+ output.s3key + s" / ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}") // EFS located file : test existence or delocalize. - var test_cmd = "" if (efsDelocalize.isDefined && efsDelocalize.getOrElse(false)) { Log.debug("efs-delocalization enabled") - test_cmd = s"""_s3_delocalize_with_retry "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" "${output.s3key}" """.stripMargin + s"""_s3_delocalize_with_retry "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" "${output.s3key}" "${output.optional}" """.stripMargin } else { - Log.debug("efs-delocalization disabled") - // check file for existence - test_cmd = s"""test -e "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" || (echo 'output file: ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString} does not exist' && DELOCALIZATION_FAILED=1)""".stripMargin + Log.debug("efs-delocalization disabled") + s"""_check_efs_outfile "${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}" "${efsMakeMD5.getOrElse(false)}" "${output.optional}" """.stripMargin } - // need to make md5sum? - var md5_cmd = "" - if (efsMakeMD5.isDefined && efsMakeMD5.getOrElse(false)) { - Log.debug("Add cmd to create MD5 sibling if missing or outdated.") - md5_cmd = s""" - |if [[ ! 
-f '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' || '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}' -nt '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' ]]; then - | md5sum '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}' > '${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' || (echo 'Could not generate ${output.mount.mountPoint.pathAsString}/${output.local.pathAsString}.md5' && DELOCALIZATION_FAILED=1 ); - |fi - |""".stripMargin - } else { - md5_cmd = "" - } - // return combined result - s""" - |${test_cmd} - |${md5_cmd} - | """.stripMargin case output: AwsBatchFileOutput => //output on a different mount @@ -518,6 +558,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL |echo '*** DELOCALIZING OUTPUTS ***' |DELOCALIZATION_FAILED=0 |$outputCopyCommand + |echo "DELOCALIZATION RESULT: $$DELOCALIZATION_FAILED" |if [[ $$DELOCALIZATION_FAILED -eq 1 ]]; then | echo '*** DELOCALIZATION FAILED ***' | echo '*** EXITING WITH RETURN CODE 1***' @@ -618,6 +659,10 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL ) submitJobRequest = submitJobRequest.tags(tags.asJava).propagateTags(true) } + // JobTimeout provided (positive value) : add to request + if (runtimeAttributes.jobTimeout > 0) { + submitJobRequest = submitJobRequest.timeout(JobTimeout.builder().attemptDurationSeconds(runtimeAttributes.jobTimeout).build()) + } // submit val submit: F[SubmitJobResponse] = async.delay(batchClient.submitJob( diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala index 7f2cbd1b953..2c89b02eb49 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala @@ -165,8 +165,9 @@ trait AwsBatchJobDefinitionBuilder { efsMakeMD5: Boolean, tagResources: Boolean, logGroupName: String, - sharedMemorySize: MemorySize): String = { - s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}:${ulimits.map(_.toString).mkString(",")}:${efsDelocalize.toString}:${efsMakeMD5.toString}:${tagResources.toString}:$logGroupName:${sharedMemorySize.to(MemoryUnit.MB).amount.toInt}" + sharedMemorySize: MemorySize, + jobTimeout: Int): String = { + s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}:${ulimits.map(_.toString).mkString(",")}:${efsDelocalize.toString}:${efsMakeMD5.toString}:${tagResources.toString}:$logGroupName:${sharedMemorySize.to(MemoryUnit.MB).amount.toInt}:${jobTimeout}" } val environment = List.empty[KeyValuePair] @@ -202,7 +203,8 @@ trait AwsBatchJobDefinitionBuilder { efsMakeMD5, tagResources, logGroupName, - context.runtimeAttributes.sharedMemorySize + context.runtimeAttributes.sharedMemorySize, + context.runtimeAttributes.jobTimeout ) // To reuse job definition for gpu and gpu-runs, we will create a job definition that does not gpu requirements // since aws batch does not allow you to set gpu as 0 when you dont need it. 
you will always need cpu and memory diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchMetadataKeys.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchMetadataKeys.scala index cf7ec9bea94..40c97ce08da 100644 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchMetadataKeys.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchMetadataKeys.scala @@ -37,4 +37,8 @@ object AwsBatchMetadataKeys { // val EndpointUrl = "batch:endpointUrl" val MonitoringScript = "batch:monitoringScript" val MonitoringLog = "monitoringLog" + val LogStreamName = "logConfig:logStreamName" + val LogGroupName = "logConfig:logGroupName" + val LogStreamRegion = "logConfig:region" } diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchParameters.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchParameters.scala index 61a61e60a5f..907f55a76a9 100644 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchParameters.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchParameters.scala @@ -47,7 +47,7 @@ sealed trait AwsBatchParameter { sealed trait AwsBatchInput extends AwsBatchParameter -final case class AwsBatchFileInput(name: String, s3key: String, local: Path, mount: AwsBatchVolume) +final case class AwsBatchFileInput(name: String, s3key: String, local: Path, mount: AwsBatchVolume, optional: Boolean) extends AwsBatchInput { def toKeyValuePair = KeyValuePair.builder.name(name).value(s3key).build def toStringString = (name, s3key) @@ -59,7 +59,7 @@ final case class AwsBatchLiteralInput(name: String, value: String) extends AwsBa def toStringString = (name, value) } -final case class AwsBatchFileOutput(name: String, s3key: String, local: Path, mount: AwsBatchVolume) +final case class AwsBatchFileOutput(name: String, s3key: String, local: Path, mount: AwsBatchVolume, optional: Boolean) extends AwsBatchParameter { def toKeyValuePair = KeyValuePair.builder.name(name).value(s3key).build def toStringString = (name, s3key) diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchReadLikeFunctions.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchReadLikeFunctions.scala new file mode 100644 index 00000000000..e5dc1aeedf4 --- /dev/null +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchReadLikeFunctions.scala @@ -0,0 +1,23 @@ +package cromwell.backend.impl.aws + +import cromwell.backend.ReadLikeFunctions +import scala.concurrent.Future +import scala.util.Try +import cromwell.core.path.DefaultPathBuilder +import cromwell.backend.standard.StandardExpressionFunctionsParams + +trait AwsReadLikeFunctions extends ReadLikeFunctions { + // standardParams for expression functions does not expose the backend configuration 
+ def standardParams: StandardExpressionFunctionsParams + + override def readFile(path: String, maxBytes: Option[Int], failOnOverflow: Boolean): Future[String] = { + // as with the aws globbing functions, no access to the backend config is available here, + // so the efs mount point is hard-coded to /mnt/efs. + val awsPath = if (path.startsWith("/mnt/efs/")) { + DefaultPathBuilder.get(path) + } else { + buildPath(path) + } + Future.fromTry(Try(awsPath)) flatMap { p => asyncIo.contentAsStringAsync(p, maxBytes, failOnOverflow) } + } +} \ No newline at end of file diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala index e90f2feeb78..a6521b683ce 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala @@ -49,7 +49,7 @@ import com.typesafe.config.{ConfigException, ConfigValueFactory} import scala.util.matching.Regex import org.slf4j.{Logger, LoggerFactory} -import wom.RuntimeAttributesKeys.{GpuKey } // , sharedMemoryKey} +import wom.RuntimeAttributesKeys.GpuKey import scala.util.{Failure, Success, Try} import scala.jdk.CollectionConverters._ @@ -96,6 +96,7 @@ case class AwsBatchRuntimeAttributes(cpu: Int Refined Positive, efsDelocalize: Boolean, efsMakeMD5 : Boolean, sharedMemorySize: MemorySize, + jobTimeout: Int, logGroupName: String, additionalTags: Map[String, String], fileSystem: String= "s3", @@ -112,7 +113,7 @@ object AwsBatchRuntimeAttributes { val awsBatchEvaluateOnExitKey = "awsBatchEvaluateOnExit" val defaultSharedMemorySize = MemorySize(64, MemoryUnit.MB) - + private val awsBatchEvaluateOnExitDefault = WomArray(WomArrayType(WomMapType(WomStringType,WomStringType)), Vector(WomMap(Map.empty[WomValue, WomValue]))) @@ -139,13 +140,15 @@ object AwsBatchRuntimeAttributes { private val additionalTagsKey = "additionalTags" val UlimitsKey = "ulimits" + private val jobTimeoutKey = "jobTimeout" + private val UlimitsDefaultValue = WomArray(WomArrayType(WomMapType(WomStringType,WomStringType)), Vector(WomMap(Map.empty[WomValue, WomValue]))) private def cpuValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Int Refined Positive] = CpuValidation.instance .withDefault(CpuValidation.configDefaultWomValue(runtimeConfig) getOrElse CpuValidation.defaultMin) private def gpuCountValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Int] = { - GpuCountValidation(GpuKey).withDefault(GpuCountValidation(GpuKey).configDefaultWomValue(runtimeConfig).getOrElse(WomInteger(0))) + PosIntValidation(GpuKey).withDefault(PosIntValidation(GpuKey).configDefaultWomValue(runtimeConfig).getOrElse(WomInteger(0))) } private def cpuMinValidation(runtimeConfig: Option[Config]):RuntimeAttributesValidation[Int Refined Positive] = CpuValidation.instanceMin @@ -186,7 +189,10 @@ object AwsBatchRuntimeAttributes { MemoryValidation.configDefaultString(RuntimeAttributesKeys.sharedMemoryKey, runtimeConfig) getOrElse defaultSharedMemorySize.toString ) } - + + private def jobTimeoutValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Int] = { + PosIntValidation(jobTimeoutKey, minValue = 
60).withDefault(PosIntValidation(jobTimeoutKey, minValue = 60).configDefaultWomValue(runtimeConfig).getOrElse(WomInteger(0))) + } private def logGroupNameValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[String] = logGroupNameValidationInstance .withDefault(logGroupNameValidationInstance.configDefaultWomValue(runtimeConfig) getOrElse LogGroupNameDefaultValue) @@ -278,6 +284,7 @@ object AwsBatchRuntimeAttributes { awsBatchefsMakeMD5Validation(runtimeConfig), awsBatchtagResourcesValidation(runtimeConfig), sharedMemorySizeValidation(runtimeConfig), + jobTimeoutValidation(runtimeConfig) ) def validationsLocalBackend = StandardValidatedRuntimeAttributesBuilder.default(runtimeConfig).withValidation( cpuValidation(runtimeConfig), @@ -298,6 +305,7 @@ object AwsBatchRuntimeAttributes { awsBatchefsMakeMD5Validation(runtimeConfig), awsBatchtagResourcesValidation(runtimeConfig), sharedMemorySizeValidation(runtimeConfig), + jobTimeoutValidation(runtimeConfig) ) configuration.fileSystem match { @@ -340,6 +348,7 @@ object AwsBatchRuntimeAttributes { val efsMakeMD5: Boolean = RuntimeAttributesValidation.extract(awsBatchefsMakeMD5Validation(runtimeAttrsConfig),validatedRuntimeAttributes) val tagResources: Boolean = RuntimeAttributesValidation.extract(awsBatchtagResourcesValidation(runtimeAttrsConfig),validatedRuntimeAttributes) val sharedMemorySize: MemorySize = RuntimeAttributesValidation.extract(sharedMemorySizeValidation(runtimeAttrsConfig), validatedRuntimeAttributes) + val jobTimeout: Int = RuntimeAttributesValidation.extract(jobTimeoutValidation(runtimeAttrsConfig), validatedRuntimeAttributes) new AwsBatchRuntimeAttributes( cpu, @@ -359,6 +368,7 @@ object AwsBatchRuntimeAttributes { efsDelocalize, efsMakeMD5, sharedMemorySize, + jobTimeout, logGroupName, additionalTags, fileSystem, @@ -536,17 +546,18 @@ object DisksValidation extends RuntimeAttributesValidation[Seq[AwsBatchVolume]] s"Expecting $key runtime attribute to be a comma separated String or Array[String]" } -object GpuCountValidation { - def apply(key: String): GpuCountValidation = new GpuCountValidation(key) +object PosIntValidation { + def apply(key: String, minValue: Int = 0): PosIntValidation = new PosIntValidation(key, minValue) } -class GpuCountValidation(key: String) extends IntRuntimeAttributesValidation(key) { +class PosIntValidation(key: String, minValue: Int = 0) extends IntRuntimeAttributesValidation(key) { override protected def validateValue: PartialFunction[WomValue, ErrorOr[Int]] = { case womValue if WomIntegerType.coerceRawValue(womValue).isSuccess => WomIntegerType.coerceRawValue(womValue).get match { case WomInteger(value) => - if (value.toInt < 0) - s"Expecting $key runtime attribute value greater than or equal to 0".invalidNel + // allow the default value of zero. + if (value.toInt < minValue && value.toInt != 0) + s"Expecting $key runtime attribute value greater than or equal to $minValue".invalidNel else value.toInt.validNel } diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md index eb998dceb0f..115a9af91b5 100644 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md @@ -210,6 +210,26 @@ task gpu_queue_task { } ``` +### Job TimeOut Support + +Cromwell supports the [AWS Batch jobTimeout](https://docs.aws.amazon.com/batch/latest/userguide/job_timeouts.html) parameter. 
This allows jobs to be terminated automatically once they have been running longer than the specified (expected) timeout. The timeout is given in seconds and can be set as a workflow-wide default or per task. Values under 60 seconds are not allowed by AWS; Cromwell accepts 0 (the default) to disable the timeout. + +default runtime attribute: +``` +{ + "default_runtime_attributes": { + "jobTimeout" : 3600 + } +} +``` + +task runtime attribute: +``` +runtime { + jobTimeout: 3600 +} +``` + ### Call Caching with ECR private diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendCacheHitCopyingActor.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendCacheHitCopyingActor.scala index 7610954df3a..182e34ff02f 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendCacheHitCopyingActor.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendCacheHitCopyingActor.scala @@ -34,16 +34,24 @@ import common.util.TryUtil import cromwell.backend.BackendInitializationData import cromwell.backend.impl.aws.{AWSBatchStorageSystems, AwsBatchBackendInitializationData,AwsBatchJobCachingActorHelper} import cromwell.backend.io.JobPaths import cromwell.backend.standard.callcaching.{StandardCacheHitCopyingActor, StandardCacheHitCopyingActorParams} import cromwell.core.CallOutputs import cromwell.core.io.{DefaultIoCommandBuilder, IoCommand, IoCommandBuilder} -import cromwell.core.path.Path +import cromwell.core.path.{Path, PathCopier} import cromwell.core.simpleton.{WomValueBuilder, WomValueSimpleton} import cromwell.filesystems.s3.batch.S3BatchCommandBuilder -import wom.values.WomFile +import wom.values._ +import wom.types._ +import wom.callable.Callable.OutputDefinition +import wom.expression.NoIoFunctionSet +import cats.syntax.validated._ import scala.language.postfixOps import scala.util.Try class AwsBatchBackendCacheHitCopyingActor(standardParams: StandardCacheHitCopyingActorParams) extends StandardCacheHitCopyingActor(standardParams) with AwsBatchJobCachingActorHelper{ private val batchAttributes = BackendInitializationData .as[AwsBatchBackendInitializationData](standardParams.backendInitializationDataOption) @@ -57,20 +65,166 @@ class AwsBatchBackendCacheHitCopyingActor(standardParams: StandardCacheHitCopyin } private val cachingStrategy = batchAttributes.duplicationStrategy + + // taken from the WomExpression trait: + private def areAllFilesOptional(womType: WomType): Boolean = { + def innerAreAllFileTypesInWomTypeOptional(womType: WomType): Boolean = womType match { + case WomOptionalType(_: WomPrimitiveFileType) => + true + case _: WomPrimitiveFileType => + false + case _: WomPrimitiveType => + true // WomPairTypes and WomCompositeTypes may have non-File components here which is fine. + case WomArrayType(inner) => innerAreAllFileTypesInWomTypeOptional(inner) + case WomMapType(_, inner) => innerAreAllFileTypesInWomTypeOptional(inner) + case WomPairType(leftType, rightType) => + innerAreAllFileTypesInWomTypeOptional(leftType) && innerAreAllFileTypesInWomTypeOptional(rightType) + case WomCompositeType(typeMap, _) => typeMap.values.forall(innerAreAllFileTypesInWomTypeOptional) + case _ => false + } + + // At the outermost level, primitives are never optional. 
+ womType match { + case _: WomPrimitiveType => + false + case _ => innerAreAllFileTypesInWomTypeOptional(womType) + } + } + // taken from AwsBatchAsyncBackendJobExecutionActor + private def evaluateFiles(output: OutputDefinition): List[(WomFile, Boolean)] = { + // mixed mandatory/optional types are cast to mandatory. + val is_optional = areAllFilesOptional(output.womType) + Try( + output.expression + .evaluateFiles( + jobDescriptor.localInputs, + NoIoFunctionSet, + output.womType + ) + .map(_.toList map { womFile => + // Pair each WomFile with the optional status + (womFile.file, is_optional) + }) + ).getOrElse(List.empty[(WomFile, Boolean)].validNel) + .getOrElse(List.empty) + } + + val womFileOutputsEvaluated = jobDescriptor.taskCall.callable.outputs + .flatMap(evaluateFiles) + + // then get paths into a map of path => optional + val womFileMap: Map[String, Boolean] = womFileOutputsEvaluated.map { case (womFile, isOptional) => + womFile.value -> isOptional + }.toMap + + // starting from the womFileMap and simpletons, determine if the file is optional. + // in womFileMap, the key is the path as specified in the WDL (inside working dir), eg "new_dir/outfile.txt" + // in WomValueSimpletons, the key is the full source path for cache copy, eg "s3://bucket/cromwell_temp/wf-id/call-id/new_dir/outfile.txt" + // strategy : + // - check all keys in womFileMap. if simpleton ends with key, use optional_status from womFileMap (the value) + // - as soon as a mandatory file is found : break and return false + // - keep checking until the end otherwise, as multiple files can have nested suffixes (eg "new_dir/outfile.txt" and "outfile.txt") + // - if the key is found and optional : return true ; else return false + def is_optional(womFile: String, womFileMap: Map[String, Boolean]): Boolean = { + var isOptional = false + for ((key, value) <- womFileMap) { + if (womFile.endsWith(key)) { + if (!value) { + return false + } + isOptional = value + } + } + return isOptional + } + + // check if the file is on efs (local) or s3 + def is_efs(womFile: String): Boolean = { + // get efs mount point/ disk from config + val efs_mount = configuration.efsMntPoint.getOrElse("--") + return womFile.startsWith(efs_mount) + } + + override def processSimpletons(womValueSimpletons: Seq[WomValueSimpleton], sourceCallRootPath: Path ): Try[(CallOutputs, Set[IoCommand[_]])] = (batchAttributes.fileSystem, cachingStrategy) match { + /////////////////////// + // CACHE = REFERENCE // + /////////////////////// case (AWSBatchStorageSystems.s3, UseOriginalCachedOutputs) => - val touchCommands: Seq[Try[IoCommand[_]]] = womValueSimpletons collect { - // only work on WomFiles, skip others? 
- case WomValueSimpleton(_, wdlFile: WomFile) => - getPath(wdlFile.value).flatMap(S3BatchCommandBuilder.existsOrThrowCommand) + val touchCommands: Seq[Try[(WomValueSimpleton,IoCommand[_])]] = womValueSimpletons collect { + // only work on WomFiles + case WomValueSimpleton(key, wdlFile: WomFile) => + val sourcePath = getPath(wdlFile.value).get + // reference, so source == destination + val destinationPath = sourcePath + val destinationSimpleton = WomValueSimpleton(key, WomSingleFile(destinationPath.pathAsString)) + if (is_optional(wdlFile.value,womFileMap)) { + // TODO: could this use super's nonFileSimpleton handling instead of a noopCommand : case nonFileSimpleton => (List(nonFileSimpleton), Set.empty[IoCommand[_]]) + Try(destinationSimpleton -> S3BatchCommandBuilder.noopCommand(destinationPath).get) + } else { + Try(destinationSimpleton -> S3BatchCommandBuilder.existsOrThrowCommand(destinationPath).get) + } + case nonFileSimpleton => + Try(nonFileSimpleton -> S3BatchCommandBuilder.noopCommand(getPath("").get).get) } + // sequence the touch commands + TryUtil.sequence(touchCommands) map { simpletonsAndCommands => + val (destinationSimpletons, ioCommands) = simpletonsAndCommands.unzip + WomValueBuilder.toJobOutputs(jobDescriptor.taskCall.outputPorts, destinationSimpletons) -> ioCommands.toSet + } + /////////////////// + // CACHE == COPY // + /////////////////// + case (AWSBatchStorageSystems.s3, CopyCachedOutputs) => + val copyCommands: Seq[Try[(WomValueSimpleton, IoCommand[_])]] = womValueSimpletons collect { + // only work on WomFiles + case WomValueSimpleton(key, wdlFile: WomFile) => + val sourcePath = getPath(wdlFile.value).get + // on efs : existsOrThrow (mandatory) or noop (optional) + if (is_efs(wdlFile.value)) { + // on efs : source == destination + val destinationPath = sourcePath + val destinationSimpleton = WomValueSimpleton(key, WomSingleFile(destinationPath.pathAsString)) + if (is_optional(wdlFile.value,womFileMap)) { + Try(destinationSimpleton -> S3BatchCommandBuilder.noopCommand(destinationPath).get) + } else { + Try(destinationSimpleton -> S3BatchCommandBuilder.existsOrThrowCommand(destinationPath).get) + } + } + // on s3 : copy (mandatory) or copy if exists (optional) + else { // on s3 : the destination is computed under this call's root + val destinationPath = PathCopier.getDestinationFilePath(sourceCallRootPath, sourcePath, destinationCallRootPath) + val destinationSimpleton = WomValueSimpleton(key, WomSingleFile(destinationPath.pathAsString)) - TryUtil.sequence(touchCommands) map { - WomValueBuilder.toJobOutputs(jobDescriptor.taskCall.outputPorts, womValueSimpletons) -> _.toSet + // optional + if (is_optional(wdlFile.value,womFileMap)) { + Try(destinationSimpleton -> S3BatchCommandBuilder.noopCommand(destinationPath).get) + // mandatory + } else { + Try(destinationSimpleton -> S3BatchCommandBuilder.copyCommand(sourcePath, destinationPath).get) + } + } + case nonFileSimpleton => + Try(nonFileSimpleton -> S3BatchCommandBuilder.noopCommand(getPath("").get).get) + } + // sequence the copy commands + TryUtil.sequence(copyCommands) map { simpletonsAndCommands => + val (destinationSimpletons, ioCommands) = simpletonsAndCommands.unzip + WomValueBuilder.toJobOutputs(jobDescriptor.taskCall.outputPorts, destinationSimpletons) -> ioCommands.toSet } + /////////////////////// + // NON-S3 FILESYSTEM // + /////////////////////// case (_, _) => super.processSimpletons(womValueSimpletons, sourceCallRootPath) } diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendFileHashingActor.scala 
diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendFileHashingActor.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendFileHashingActor.scala
index eb6a9f772e7..f8bc66dcfc9 100755
--- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendFileHashingActor.scala
+++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/callcaching/AwsBatchBackendFileHashingActor.scala
@@ -35,15 +35,29 @@ import cromwell.filesystems.s3.batch.S3BatchCommandBuilder
 import cromwell.backend.BackendInitializationData
 import cromwell.backend.impl.aws.AwsBatchBackendInitializationData
 import cromwell.backend.impl.aws.AWSBatchStorageSystems
+import cromwell.backend.impl.aws.AwsBatchJobCachingActorHelper
+import cromwell.backend.io._
 import cromwell.core.io.DefaultIoCommandBuilder
+import common.validation.Validation._
 import scala.util.Try
 import cromwell.backend.standard.callcaching.StandardFileHashingActor.SingleFileHashRequest
-import cromwell.core.path.DefaultPathBuilder
+
 import org.slf4j.{Logger, LoggerFactory}
+import scala.util.Random
+import wom.types.{
+  WomArrayType,
+  WomType,
+  WomOptionalType,
+  WomPrimitiveFileType,
+  WomSingleFileType,
+  WomPrimitiveType,
+  WomMapType,
+  WomPairType,
+  WomCompositeType
+}
+import wom.values._

 class AwsBatchBackendFileHashingActor(standardParams: StandardFileHashingActorParams)
-  extends StandardFileHashingActor(standardParams) {
-
+  extends StandardFileHashingActor(standardParams) with AwsBatchJobCachingActorHelper {
+  val Log: Logger = LoggerFactory.getLogger(StandardFileHashingActor.getClass)
   override val ioCommandBuilder = BackendInitializationData
     .as[AwsBatchBackendInitializationData](standardParams.backendInitializationDataOption)
@@ -56,39 +70,87 @@ class AwsBatchBackendFileHashingActor(standardParams: StandardFileHashingActorPa
   // get backend config.
   val aws_config = BackendInitializationData.as[AwsBatchBackendInitializationData](standardParams.backendInitializationDataOption).configuration
-  // custom strategy to handle efs (local) files, in case sibling-md5 file is present.
-  // if valid md5 is found : return the hash
+  // adapted from the WomExpression trait:
+  private def areAllFilesOptional(womType: WomType): Boolean = {
+    def innerAreAllFileTypesInWomTypeOptional(womType: WomType): Boolean = womType match {
+      case WomOptionalType(_: WomPrimitiveFileType) =>
+        true
+      case _: WomPrimitiveFileType =>
+        false
+      case _: WomPrimitiveType =>
+        true // WomPairTypes and WomCompositeTypes may have non-File components here, which is fine.
+      case WomArrayType(inner) => innerAreAllFileTypesInWomTypeOptional(inner)
+      case WomMapType(_, inner) => innerAreAllFileTypesInWomTypeOptional(inner)
+      case WomPairType(leftType, rightType) =>
+        innerAreAllFileTypesInWomTypeOptional(leftType) && innerAreAllFileTypesInWomTypeOptional(rightType)
+      case WomCompositeType(typeMap, _) => typeMap.values.forall(innerAreAllFileTypesInWomTypeOptional)
+      case _ => false
+    }
+
+    // At the outermost level, primitives are never optional.
+    womType match {
+      case _: WomPrimitiveType =>
+        false
+      case _ => innerAreAllFileTypesInWomTypeOptional(womType)
+    }
+  }
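For reference, a sketch (not part of the patch, assuming the wom types module on the classpath) of how this classifies a few common input types:

```scala
import wom.types._

areAllFilesOptional(WomOptionalType(WomSingleFileType))               // true:  File?
areAllFilesOptional(WomArrayType(WomOptionalType(WomSingleFileType))) // true:  Array[File?]
areAllFilesOptional(WomSingleFileType)                                // false: File
areAllFilesOptional(WomMapType(WomStringType, WomSingleFileType))     // false: Map[String, File]
areAllFilesOptional(WomStringType)                                    // false: bare primitives never count as optional at the top level
```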
+
+  // custom strategy to handle optional and efs (local) files
+  // if an optional file is missing : return the hash of the empty string (valid)
+  // if a valid md5 is found for an efs file : return the hash
   // if no md5 is found : return None (pass request to parent hashing actor)
   // if outdated md5 is found : return invalid string (assume file has been altered after md5 creation)
   // if file is missing : return invalid string
   override def customHashStrategy(fileRequest: SingleFileHashRequest): Option[Try[String]] = {
-    val file = DefaultPathBuilder.get(fileRequest.file.valueString)
-    if (aws_config.efsMntPoint.isDefined && file.toString.startsWith(aws_config.efsMntPoint.getOrElse("--")) && aws_config.checkSiblingMd5.getOrElse(false)) {
-      val md5 = file.sibling(s"${file.toString}.md5")
-      // check existance of the file :
-      if (!file.exists) {
-        Log.debug(s"File Missing: ${file.toString}")
-        // if missing, cache hit is invalid; return invalid md5
-        Some("File Missing").map(str => Try(str))
-      }
-      // check existence of the sibling file and make sure it's newer than main file
-      else if (md5.exists && md5.lastModifiedTime.isAfter(file.lastModifiedTime)) {
-        // read the file.
-        Log.debug("Found valid sibling file for " + file.toString)
-        val md5_value: Option[String] = Some(md5.contentAsString.split("\\s+")(0))
-        md5_value.map(str => Try(str))
-      } else if (md5.exists && md5.lastModifiedTime.isBefore(file.lastModifiedTime)) {
-        // sibling file is outdated, return invalid md5
-        Log.debug("Found outdated sibling file for " + file.toString)
-        Some("Checksum File Outdated").map(str => Try(str))
-      } else {
-        Log.debug("Found no sibling file for " + file.toString)
-        // File present, but no sibling found, fall back to default.
-        None
-      }
-
+    val file = getPath(fileRequest.file.value).get
+
+    // the call inputs are mapped as path => optional
+    val callInputFiles: Map[String, Boolean] = standardParams.jobDescriptor.fullyQualifiedInputs.flatMap {
+      case (_, womFile) =>
+        // collect all WomArrays of WomFiles
+        val arrays: Seq[WomArray] = womFile.collectAsSeq { case womFile: WomFile =>
+          val files: List[WomSingleFile] = DirectoryFunctions
+            .listWomSingleFiles(womFile, callPaths.workflowPaths)
+            .toTry(s"Error getting single files for $womFile")
+            .get
+          WomArray(WomArrayType(WomSingleFileType), files)
+        }
+        // determine whether all files in the womFile are optional
+        val isOptional = areAllFilesOptional(womFile.womType)
+        // flatten the arrays and map each file path to its optional status
+        arrays.flatMap(_.value).collect { case file: WomFile =>
+          file.toString -> isOptional
+        }
+    }.toMap
+
+    // optional files are allowed to be missing
+    if (callInputFiles.contains(fileRequest.file.toString) && callInputFiles(fileRequest.file.toString) && !file.exists) {
+      // return the hash of the empty string
+      Some("".md5Sum).map(str => Try(str))
+    // the file is an efs file and sibling md5 checking is enabled
+    } else if (file.toString.startsWith(aws_config.efsMntPoint.getOrElse("--")) && aws_config.checkSiblingMd5.getOrElse(false)) {
+      val md5 = file.sibling(s"${file.toString}.md5")
+      // check existence of the main file:
+      if (!file.exists) {
+        // cache hit is invalid; return an invalid (random) md5
+        Some(Random.alphanumeric.take(32).mkString.md5Sum).map(str => Try(str))
+      }
+      // check existence of the sibling file and make sure it is newer than the main file
+      else if (md5.exists && md5.lastModifiedTime.isAfter(file.lastModifiedTime)) {
+        // read the md5 value from the sibling file
+        val md5_value: Option[String] = Some(md5.contentAsString.split("\\s+")(0))
+        md5_value.map(str => Try(str))
+      } else if (md5.exists && md5.lastModifiedTime.isBefore(file.lastModifiedTime)) {
+        // sibling file is outdated; return an invalid (random) string as md5
+        Some(Random.alphanumeric.take(32).mkString.md5Sum).map(str => Try(str))
+      } else {
+        // file present, but no sibling found: fall back to default
+        None
+      }
     } else {
-      // Detected non-EFS file: return None
+      // non-efs file or sibling md5 checking is disabled: fall back to default
       None
     }
   }
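Why hash missing optional files at all? Every cache-relevant input needs some digest: hashing the empty string gives all absent optional files the same stable, valid digest, so they compare equal across runs and do not break cache hits, while the random digests above deliberately never match anything. A sketch of the value involved, assuming `.md5Sum` is a plain MD5 hex digest (the extension method itself is not shown in this diff):

```scala
import java.security.MessageDigest

// Plain MD5 hex digest, standing in for the `.md5Sum` extension method:
def md5Hex(s: String): String =
  MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8")).map("%02x".format(_)).mkString

md5Hex("") // "d41d8cd98f00b204e9800998ecf8427e": shared by every missing optional file
```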
diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActorSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActorSpec.scala
index 61ce8db8add..5f76722d732 100644
--- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActorSpec.scala
+++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActorSpec.scala
@@ -626,56 +626,64 @@ class AwsBatchAsyncBackendJobExecutionActorSpec
       AwsBatchFileInput("stringToFileMap-0",
                         "s3://path/to/stringTofile1",
                         DefaultPathBuilder.get("path/to/stringTofile1"),
-                        workingDisk
+                        workingDisk,
+                        false
       )
     )
     batchInputs should contain(
       AwsBatchFileInput("stringToFileMap-1",
                         "s3://path/to/stringTofile2",
                         DefaultPathBuilder.get("path/to/stringTofile2"),
-                        workingDisk
+                        workingDisk,
+                        false
      )
    )
    batchInputs should contain(
      AwsBatchFileInput("fileToStringMap-0",
                        "s3://path/to/fileToString1",
                        DefaultPathBuilder.get("path/to/fileToString1"),
-                       workingDisk
+                       workingDisk,
+                       false
      )
    )
    batchInputs should contain(
      AwsBatchFileInput("fileToStringMap-1",
                        "s3://path/to/fileToString2",
                        DefaultPathBuilder.get("path/to/fileToString2"),
-                       workingDisk
+                       workingDisk,
+                       false
      )
    )
    batchInputs should contain(
      AwsBatchFileInput("fileToFileMap-0",
                        "s3://path/to/fileToFile1Key",
                        DefaultPathBuilder.get("path/to/fileToFile1Key"),
-                       workingDisk
+                       workingDisk,
+                       false
      )
    )
    batchInputs should contain(
      AwsBatchFileInput("fileToFileMap-1",
                        "s3://path/to/fileToFile1Value",
                        DefaultPathBuilder.get("path/to/fileToFile1Value"),
-                       workingDisk
+                       workingDisk,
+                       false
      )
    )
    batchInputs should contain(
      AwsBatchFileInput("fileToFileMap-2",
                        "s3://path/to/fileToFile2Key",
                        DefaultPathBuilder.get("path/to/fileToFile2Key"),
-                       workingDisk
+                       workingDisk,
+                       false
      )
    )
    batchInputs should contain(
      AwsBatchFileInput("fileToFileMap-3",
                        "s3://path/to/fileToFile2Value",
                        DefaultPathBuilder.get("path/to/fileToFile2Value"),
-                       workingDisk
+                       workingDisk,
+                       false
      )
    )
@@ -744,7 +752,7 @@ class AwsBatchAsyncBackendJobExecutionActorSpec
    val batchInputs = backend.generateAwsBatchInputs(jobDescriptor)
    batchInputs should have size 1
    batchInputs should contain(
DefaultPathBuilder.get("blah/b/c.txt"), workingDisk) + AwsBatchFileInput("in-0", "s3://blah/b/c.txt", DefaultPathBuilder.get("blah/b/c.txt"), workingDisk,false) ) val outputs = backend.generateAwsBatchOutputs(jobDescriptor) outputs should have size 1 @@ -752,7 +760,8 @@ class AwsBatchAsyncBackendJobExecutionActorSpec AwsBatchFileOutput("out", s"s3://my-cromwell-workflows-bucket/file_passing/$workflowId/call-a/out", DefaultPathBuilder.get("out"), - workingDisk + workingDisk, + false ) ) } @@ -781,7 +790,8 @@ class AwsBatchAsyncBackendJobExecutionActorSpec AwsBatchFileInput("c6fd5c91-0", "s3://some/path/file.txt", DefaultPathBuilder.get("some/path/file.txt"), - workingDisk + workingDisk, + false ) ) val outputs = backend.generateAwsBatchOutputs(jobDescriptor) @@ -793,7 +803,7 @@ class AwsBatchAsyncBackendJobExecutionActorSpec val batchInputsLocal = backend.generateAwsBatchInputs(jobDescriptorLocal) batchInputsLocal should have size 1 batchInputsLocal should contain( - AwsBatchFileInput("c6fd5c91-0", "/some/path/file.txt", DefaultPathBuilder.get("some/path/file.txt"), workingDisk) + AwsBatchFileInput("c6fd5c91-0", "/some/path/file.txt", DefaultPathBuilder.get("some/path/file.txt"), workingDisk,false) ) val outputsLocal = backendLocal.generateAwsBatchOutputs(jobDescriptorLocal) outputsLocal should have size 0 @@ -850,10 +860,10 @@ class AwsBatchAsyncBackendJobExecutionActorSpec val batchInputs = testActorRef.underlyingActor.generateAwsBatchInputs(jobDescriptor) batchInputs should have size 2 batchInputs should contain( - AwsBatchFileInput("fileArray-0", "s3://path/to/file1", DefaultPathBuilder.get("path/to/file1"), workingDisk) + AwsBatchFileInput("fileArray-0", "s3://path/to/file1", DefaultPathBuilder.get("path/to/file1"), workingDisk,false) ) batchInputs should contain( - AwsBatchFileInput("fileArray-1", "s3://path/to/file2", DefaultPathBuilder.get("path/to/file2"), workingDisk) + AwsBatchFileInput("fileArray-1", "s3://path/to/file2", DefaultPathBuilder.get("path/to/file2"), workingDisk,false) ) case Left(badness) => fail(badness.toList.mkString(", ")) } @@ -908,10 +918,10 @@ class AwsBatchAsyncBackendJobExecutionActorSpec val batchInputs = testActorRef.underlyingActor.generateAwsBatchInputs(jobDescriptor) batchInputs should have size 2 batchInputs should contain( - AwsBatchFileInput("file1-0", "s3://path/to/file1", DefaultPathBuilder.get("path/to/file1"), workingDisk) + AwsBatchFileInput("file1-0", "s3://path/to/file1", DefaultPathBuilder.get("path/to/file1"), workingDisk,false) ) batchInputs should contain( - AwsBatchFileInput("file2-0", "s3://path/to/file2", DefaultPathBuilder.get("path/to/file2"), workingDisk) + AwsBatchFileInput("file2-0", "s3://path/to/file2", DefaultPathBuilder.get("path/to/file2"), workingDisk,false) ) case Left(badness) => fail(badness.toList.mkString(", ")) @@ -923,27 +933,32 @@ class AwsBatchAsyncBackendJobExecutionActorSpec AwsBatchFileOutput("/cromwell_root/path/to/file1", "s3://path/to/file1", DefaultPathBuilder.get("/cromwell_root/path/to/file1"), - workingDisk + workingDisk, + false ), AwsBatchFileOutput("/cromwell_root/path/to/file2", "s3://path/to/file2", DefaultPathBuilder.get("/cromwell_root/path/to/file2"), - workingDisk + workingDisk, + false ), AwsBatchFileOutput("/cromwell_root/path/to/file3", "s3://path/to/file3", DefaultPathBuilder.get("/cromwell_root/path/to/file3"), - workingDisk + workingDisk, + false ), AwsBatchFileOutput("/cromwell_root/path/to/file4", "s3://path/to/file4", DefaultPathBuilder.get("/cromwell_root/path/to/file4"), - workingDisk + 
+                        workingDisk,
+                        false
      ),
      AwsBatchFileOutput("/cromwell_root/path/to/file5",
                         "s3://path/to/file5",
                         DefaultPathBuilder.get("/cromwell_root/path/to/file5"),
-                        workingDisk
+                        workingDisk,
+                        false
      )
    )
    val outputValues = Seq(
diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala
index bbde4840705..40510baad9e 100644
--- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala
+++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala
@@ -106,11 +106,12 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi
   val jobDescriptor: BackendJobDescriptor = BackendJobDescriptor(workFlowDescriptor, jobKey, null, Map.empty, null, null, null)
   val jobPaths: AwsBatchJobPaths = AwsBatchJobPaths(workflowPaths, jobKey)
-  val s3Inputs: Set[AwsBatchInput] = Set(AwsBatchFileInput("foo", "s3://bucket/foo", DefaultPathBuilder.get("foo"), AwsBatchWorkingDisk()))
-  val s3Outputs: Set[AwsBatchFileOutput] = Set(AwsBatchFileOutput("baa", "s3://bucket/somewhere/baa", DefaultPathBuilder.get("baa"), AwsBatchWorkingDisk()))
+  val s3Inputs: Set[AwsBatchInput] = Set(AwsBatchFileInput("foo", "s3://bucket/foo", DefaultPathBuilder.get("foo"), AwsBatchWorkingDisk(), false))
+  val s3Outputs: Set[AwsBatchFileOutput] = Set(AwsBatchFileOutput("baa", "s3://bucket/somewhere/baa", DefaultPathBuilder.get("baa"), AwsBatchWorkingDisk(), false))
   val cpu: Int Refined Positive = 2
   val sharedMemorySize: MemorySize = "64 MB"
+  val jobTimeout: Int = 3600
   val runtimeAttributes: AwsBatchRuntimeAttributes = new AwsBatchRuntimeAttributes(
     cpu = cpu,
@@ -131,6 +132,7 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi
     efsMakeMD5 = false,
     fileSystem = "s3",
     sharedMemorySize = sharedMemorySize,
+    jobTimeout = jobTimeout,
     logGroupName = "/aws/batch/job",
     additionalTags = Map("tag" -> "value")
   )
@@ -510,4 +512,6 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi
     val actual = jobDefinition.containerProperties.linuxParameters()
     val expected = LinuxParameters.builder().sharedMemorySize(100).build()
     expected should equal(actual)
-  }
\ No newline at end of file
+  }
+
+  // TODO: ADD TEST FOR jobTimeout
\ No newline at end of file
diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala
index e54f12162d9..fdee3532092 100644
--- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala
+++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala
@@ -78,6 +78,7 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout
     false,
     false,
     sharedMemorySize = refineMV[Positive](64),
+    jobTimeout = refineMV[Positive](3600),
     "/Cromwell/job/",
     Map("tag1" -> "value1")
   )
@@ -531,6 +532,9 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout
     ))
   }
+  // TODO: add tests for jobTimeout
+
+
   "missing or invalid action key result in an invalid awsBatchEvaluateOnExit" in {
     val invalidEvaluateOnExit = List(
       // missing action key
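The specs above thread a `jobTimeout` of 3600 seconds through `AwsBatchRuntimeAttributes`, validated as a positive integer (`refineMV[Positive](3600)`). This diff does not show how the attribute reaches AWS Batch, but the natural target is the SDK's job timeout; a sketch under that assumption, with hypothetical queue and job-definition names (note AWS Batch enforces a minimum of 60 seconds):

```scala
import software.amazon.awssdk.services.batch.model.{JobTimeout, SubmitJobRequest}

// Illustrative only: attach a 3600-second timeout to a Batch job submission.
// AWS Batch terminates the job attempt once attemptDurationSeconds elapses.
val timeout: JobTimeout = JobTimeout.builder()
  .attemptDurationSeconds(3600) // would come from runtimeAttributes.jobTimeout
  .build()

val request: SubmitJobRequest = SubmitJobRequest.builder()
  .jobName("cromwell-task")   // hypothetical
  .jobQueue("my-queue")       // hypothetical
  .jobDefinition("my-job-def") // hypothetical
  .timeout(timeout)
  .build()
```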
diff --git a/wdl/transforms/new-base/src/main/scala/wdl/transforms/base/wdlom2wom/CommandPartElementToWomCommandPart.scala b/wdl/transforms/new-base/src/main/scala/wdl/transforms/base/wdlom2wom/CommandPartElementToWomCommandPart.scala
index 2ff236f5437..d9892342d2e 100644
--- a/wdl/transforms/new-base/src/main/scala/wdl/transforms/base/wdlom2wom/CommandPartElementToWomCommandPart.scala
+++ b/wdl/transforms/new-base/src/main/scala/wdl/transforms/base/wdlom2wom/CommandPartElementToWomCommandPart.scala
@@ -11,7 +11,7 @@ import wdl.transforms.base.linking.graph.LinkedGraphMaker
 import wdl.transforms.base.wdlom2wom.expression.WdlomWomExpression
 import wom.expression.{IoFunctionSet, WomExpression}
 import wom.graph.LocalName
-import wom.types.{WomArrayType, WomPrimitiveType, WomType}
+import wom.types.{WomArrayType, WomOptionalType, WomPrimitiveType, WomType}
 import wom.values.{WomArray, WomBoolean, WomOptionalValue, WomPrimitive, WomValue}
 import wom.{CommandPart, InstantiatedCommand}
 import wdl.model.draft3.graph.ExpressionValueConsumer.ops._
@@ -101,6 +101,15 @@ case class WdlomWomPlaceholderCommandPart(attributes: PlaceholderAttributeSet, e
           ).validNel
         case None => "Array value was given but no 'sep' attribute was provided".invalidNel
       }
+    case WomArray(WomArrayType(WomOptionalType(_)), arrayValue) =>
+      attributes.sepAttribute match {
+        case Some(separator) =>
+          // note: the files' existence is not checked here; that is left to the job itself.
+          InstantiatedCommand(commandString = arrayValue.map(valueMapper(_).valueString).mkString(separator),
+                              createdFiles = value.sideEffectFiles.toList
+          ).validNel
+        case None => "Array[File?] value was given but no 'sep' attribute was provided".invalidNel
+      }
     case other =>
       s"Cannot interpolate ${other.womType.stableName} into a command string with attribute set [$attributes]".invalidNel
   }
diff --git a/wom/src/main/scala/wom/RuntimeAttributes.scala b/wom/src/main/scala/wom/RuntimeAttributes.scala
index 178785cea4d..032d4320e57 100644
--- a/wom/src/main/scala/wom/RuntimeAttributes.scala
+++ b/wom/src/main/scala/wom/RuntimeAttributes.scala
@@ -22,6 +22,8 @@ object RuntimeAttributesKeys {
   val MemoryMinKey = "memoryMin"
   val MemoryMaxKey = "memoryMax"
   val sharedMemoryKey = "sharedMemorySize"
+  val jobTimeoutKey = "jobTimeout"
+
   val TmpDirMinKey = "tmpDirMin"
   val TmpDirMaxKey = "tmpDirMax"
   val OutDirMinKey = "outDirMin"
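For reference, a sketch (not part of the patch, mirroring the pattern added above and assuming the wom modules on the classpath) of which arrays reach the new `Array[File?]` branch; the case matches on the array's static element type, so it must stay ahead of the catch-all `case other`:

```scala
import wom.types._
import wom.values._

// An Array[File?] value, as the placeholder would receive it:
val optFiles = WomArray(
  WomArrayType(WomOptionalType(WomSingleFileType)),
  List(WomOptionalValue(WomSingleFileType, Some(WomSingleFile("a.txt"))))
)

optFiles match {
  case WomArray(WomArrayType(WomOptionalType(_)), values) =>
    // joined with the 'sep' attribute; no existence check happens at interpolation time
    values.map(_.valueString).mkString(" ")
  case _ => sys.error("would fall through to the generic interpolation cases")
}
```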