Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WX-1252 Runtime attributes cleanup – CWL runtime attributes #7370

Merged
merged 8 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package cromwell.backend
import java.util.UUID

import cromwell.backend.io.JobPaths
import cromwell.backend.validation.{CpuValidation, MemoryValidation}
import cromwell.backend.validation.MemoryValidation
import cromwell.core.path.Path
import eu.timepit.refined.api.Refined
import eu.timepit.refined.numeric.Positive
Expand All @@ -27,7 +27,8 @@ object RuntimeEnvironmentBuilder {
callRoot.resolve(s"tmp.$hash").pathAsString
}

val cores: Int Refined Positive = CpuValidation.instanceMin.validate(runtimeAttributes).getOrElse(minimums.cores)
// This class is going away in https://github.com/broadinstitute/cromwell/pull/7369
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...and this statement needed updating to keep these PRs compiling while in separate branches (incredibly, the only overlap)

val cores: Int Refined Positive = minimums.cores

val memoryInMB: Double =
MemoryValidation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ import wom.values.{WomInteger, WomValue}
object CpuValidation {
lazy val instance: RuntimeAttributesValidation[Int Refined Positive] = new CpuValidation(CpuKey)
lazy val optional: OptionalRuntimeAttributesValidation[Int Refined Positive] = instance.optional
lazy val instanceMin: RuntimeAttributesValidation[Int Refined Positive] = new CpuValidation(CpuMinKey)
lazy val optionalMin: OptionalRuntimeAttributesValidation[Int Refined Positive] = instanceMin.optional
lazy val instanceMax: RuntimeAttributesValidation[Int Refined Positive] = new CpuValidation(CpuMaxKey)
lazy val optionalMax: OptionalRuntimeAttributesValidation[Int Refined Positive] = instanceMax.optional

lazy val defaultMin: WomValue = WomInteger(1)
def configDefaultWomValue(config: Option[Config]): Option[WomValue] = instance.configDefaultWomValue(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,20 +410,10 @@ class RuntimeAttributesValidationSpec
ConfigFactory.parseString(backendConfigTemplate).getConfig("default-runtime-attributes")

val memoryVal = MemoryValidation.configDefaultString(RuntimeAttributesKeys.MemoryKey, Some(backendConfig))
val memoryMinVal = MemoryValidation.configDefaultString(RuntimeAttributesKeys.MemoryMinKey, Some(backendConfig))
val memoryMaxVal = MemoryValidation.configDefaultString(RuntimeAttributesKeys.MemoryMaxKey, Some(backendConfig))
MemoryValidation
.withDefaultMemory(RuntimeAttributesKeys.MemoryKey, memoryVal.get)
.runtimeAttributeDefinition
.factoryDefault shouldBe Some(WomLong(2147483648L))
MemoryValidation
.withDefaultMemory(RuntimeAttributesKeys.MemoryMinKey, memoryMinVal.get)
.runtimeAttributeDefinition
.factoryDefault shouldBe Some(WomLong(322122547L))
MemoryValidation
.withDefaultMemory(RuntimeAttributesKeys.MemoryMaxKey, memoryMaxVal.get)
.runtimeAttributeDefinition
.factoryDefault shouldBe Some(WomLong(429496729L))
}

"shouldn't throw up if the value for a default-runtime-attribute key cannot be coerced into an expected WomType" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ object AwsBatchRuntimeAttributes {
CpuValidation.instance
.withDefault(CpuValidation.configDefaultWomValue(runtimeConfig) getOrElse CpuValidation.defaultMin)

private def cpuMinValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Int Refined Positive] =
CpuValidation.instanceMin
.withDefault(CpuValidation.configDefaultWomValue(runtimeConfig) getOrElse CpuValidation.defaultMin)

private def failOnStderrValidation(runtimeConfig: Option[Config]) = FailOnStderrValidation.default(runtimeConfig)

private def continueOnReturnCodeValidation(runtimeConfig: Option[Config]) =
Expand All @@ -120,13 +116,6 @@ object AwsBatchRuntimeAttributes {
MemoryValidation.configDefaultString(RuntimeAttributesKeys.MemoryKey, runtimeConfig) getOrElse MemoryDefaultValue
)

private def memoryMinValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[MemorySize] =
MemoryValidation.withDefaultMemory(RuntimeAttributesKeys.MemoryMinKey,
MemoryValidation.configDefaultString(RuntimeAttributesKeys.MemoryMinKey,
runtimeConfig
) getOrElse MemoryDefaultValue
)

private def noAddressValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Boolean] =
noAddressValidationInstance
.withDefault(noAddressValidationInstance.configDefaultWomValue(runtimeConfig) getOrElse NoAddressDefaultValue)
Expand All @@ -152,11 +141,9 @@ object AwsBatchRuntimeAttributes {
.default(runtimeConfig)
.withValidation(
cpuValidation(runtimeConfig),
cpuMinValidation(runtimeConfig),
disksValidation(runtimeConfig),
zonesValidation(runtimeConfig),
memoryValidation(runtimeConfig),
memoryMinValidation(runtimeConfig),
noAddressValidation(runtimeConfig),
dockerValidation,
queueArnValidation(runtimeConfig),
Expand All @@ -166,11 +153,9 @@ object AwsBatchRuntimeAttributes {
.default(runtimeConfig)
.withValidation(
cpuValidation(runtimeConfig),
cpuMinValidation(runtimeConfig),
disksValidation(runtimeConfig),
zonesValidation(runtimeConfig),
memoryValidation(runtimeConfig),
memoryMinValidation(runtimeConfig),
noAddressValidation(runtimeConfig),
dockerValidation,
queueArnValidation(runtimeConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,11 @@ import shapeless.Coproduct
import org.apache.commons.codec.digest.DigestUtils
import org.apache.commons.csv.{CSVFormat, CSVPrinter}
import org.apache.commons.io.output.ByteArrayOutputStream
import wdl4s.parser.MemoryUnit
import wom.callable.Callable.OutputDefinition
import wom.callable.MetaValueElement.{MetaValueElementBoolean, MetaValueElementObject}
import wom.callable.{AdHocValue, RuntimeEnvironment}
import wom.core.FullyQualifiedName
import wom.expression.{FileEvaluation, NoIoFunctionSet}
import wom.format.MemorySize
import wom.values._

import java.io.OutputStreamWriter
Expand Down Expand Up @@ -590,30 +588,6 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
token <- data.privateDockerEncryptedToken
} yield CreateBatchDockerKeyAndToken(key, token)

/*
* Right now this doesn't cost anything, because sizeOption returns the size if it was previously already fetched
* for some reason (expression evaluation for instance), but otherwise does not retrieve it and returns None.
* In CWL-land we tend to be aggressive in pre-fetching the size in order to be able to evaluate JS expressions,
* but less in WDL as we can get it last minute and on demand because size is a WDL function, whereas in CWL
* we don't inspect the JS to know if size is called and therefore always pre-fetch it.
*
* We could decide to call withSize before in which case we would retrieve the size for all files and have
* a guaranteed more accurate total size, but there might be performance impacts ?
*/
val inputFileSize = Option(callInputFiles.values.flatMap(_.flatMap(_.sizeOption)).sum)

// Attempt to adjust the disk size by taking into account the size of input files
val adjustedSizeDisks =
inputFileSize.map(size => MemorySize.apply(size.toDouble, MemoryUnit.Bytes).to(MemoryUnit.GB)) map {
inputFileSizeInformation =>
runtimeAttributes.disks.adjustWorkingDiskWithNewMin(
inputFileSizeInformation,
jobLogger.info(
s"Adjusted working disk size to ${inputFileSizeInformation.amount} GB to account for input files"
)
)
} getOrElse runtimeAttributes.disks

val inputFilePaths = inputOutputParameters.jobInputParameters.map(_.cloudPath.pathAsString).toSet

val referenceDisksToMount =
Expand Down Expand Up @@ -667,7 +641,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
jobShell = batchConfiguration.jobShell,
privateDockerKeyAndEncryptedToken = dockerKeyAndToken,
womOutputRuntimeExtractor = jobDescriptor.workflowDescriptor.outputRuntimeExtractor,
adjustedSizeDisks = adjustedSizeDisks,
disks = runtimeAttributes.disks,
virtualPrivateCloudConfiguration = batchAttributes.virtualPrivateCloudConfiguration,
retryWithMoreMemoryKeys = retryWithMoreMemoryKeys,
fuseEnabled = fuseEnabled(jobDescriptor.workflowDescriptor),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ object GcpBatchRequestFactory {
jobShell: String,
privateDockerKeyAndEncryptedToken: Option[CreateBatchDockerKeyAndToken],
womOutputRuntimeExtractor: Option[WomOutputRuntimeExtractor],
adjustedSizeDisks: Seq[GcpBatchAttachedDisk],
disks: Seq[GcpBatchAttachedDisk],
virtualPrivateCloudConfiguration: VirtualPrivateCloudConfiguration,
retryWithMoreMemoryKeys: Option[List[String]],
fuseEnabled: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe
val createParameters = data.createParameters
val retryCount = data.gcpBatchParameters.runtimeAttributes.preemptible
val allDisksToBeMounted: Seq[GcpBatchAttachedDisk] =
createParameters.adjustedSizeDisks ++ createParameters.referenceDisksForLocalizationOpt.getOrElse(List.empty)
createParameters.disks ++ createParameters.referenceDisksForLocalizationOpt.getOrElse(List.empty)
val gcpBootDiskSizeMb = convertGbToMib(runtimeAttributes)

// set parent for metadata storage of job information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import common.exception.MessageAggregation
import common.validation.ErrorOr._
import cromwell.backend.DiskPatterns._
import cromwell.core.path.{DefaultPathBuilder, Path}
import wdl4s.parser.MemoryUnit
import wom.format.MemorySize
import wom.values._

import scala.util.Try
Expand Down Expand Up @@ -58,15 +56,6 @@ object GcpBatchAttachedDisk {
case _: IllegalArgumentException => s"$value not convertible to a Long".invalidNel
}

implicit class EnhancedDisks(val disks: Seq[GcpBatchAttachedDisk]) extends AnyVal {
def adjustWorkingDiskWithNewMin(minimum: MemorySize, onAdjustment: => Unit): Seq[GcpBatchAttachedDisk] = disks map {
case disk: GcpBatchWorkingDisk
if disk == GcpBatchWorkingDisk.Default && disk.sizeGb < minimum.to(MemoryUnit.GB).amount.toInt =>
onAdjustment
disk.copy(sizeGb = minimum.to(MemoryUnit.GB).amount.toInt)
case other => other
}
}
}

trait GcpBatchAttachedDisk {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import cromwell.backend.validation._
import eu.timepit.refined._
import eu.timepit.refined.api.Refined
import eu.timepit.refined.numeric.Positive
import wdl4s.parser.MemoryUnit
import wom.RuntimeAttributesKeys
import wom.format.MemorySize
import wom.types.{WomArrayType, WomStringType, WomType}
Expand Down Expand Up @@ -79,10 +78,6 @@ object GcpBatchRuntimeAttributes {
val CpuPlatformIntelCascadeLakeValue = "Intel Cascade Lake"
val CpuPlatformAMDRomeValue = "AMD Rome"

private def cpuMinValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Int Refined Positive] =
CpuValidation.instanceMin
.withDefault(CpuValidation.configDefaultWomValue(runtimeConfig) getOrElse CpuValidation.defaultMin)

val UseDockerImageCacheKey = "useDockerImageCache"
private val useDockerImageCacheValidationInstance = new BooleanRuntimeAttributesValidation(
UseDockerImageCacheKey
Expand Down Expand Up @@ -111,9 +106,6 @@ object GcpBatchRuntimeAttributes {
private def gpuCountValidation(
runtimeConfig: Option[Config]
): OptionalRuntimeAttributesValidation[Int Refined Positive] = GpuValidation.optional
private def gpuMinValidation(
runtimeConfig: Option[Config]
): OptionalRuntimeAttributesValidation[Int Refined Positive] = GpuValidation.optionalMin

private val dockerValidation: RuntimeAttributesValidation[String] = DockerValidation.instance

Expand Down Expand Up @@ -143,13 +135,6 @@ object GcpBatchRuntimeAttributes {
MemoryValidation.configDefaultString(RuntimeAttributesKeys.MemoryKey, runtimeConfig) getOrElse MemoryDefaultValue
)

private def memoryMinValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[MemorySize] =
MemoryValidation.withDefaultMemory(
RuntimeAttributesKeys.MemoryMinKey,
MemoryValidation
.configDefaultString(RuntimeAttributesKeys.MemoryMinKey, runtimeConfig) getOrElse MemoryDefaultValue
)

private def bootDiskSizeValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Int] =
bootDiskValidationInstance
.withDefault(bootDiskValidationInstance.configDefaultWomValue(runtimeConfig) getOrElse BootDiskDefaultValue)
Expand All @@ -166,15 +151,6 @@ object GcpBatchRuntimeAttributes {
): OptionalRuntimeAttributesValidation[Boolean] =
useDockerImageCacheValidationInstance

private val outDirMinValidation: OptionalRuntimeAttributesValidation[MemorySize] =
InformationValidation.optional(RuntimeAttributesKeys.OutDirMinKey, MemoryUnit.MB, allowZero = true)

private val tmpDirMinValidation: OptionalRuntimeAttributesValidation[MemorySize] =
InformationValidation.optional(RuntimeAttributesKeys.TmpDirMinKey, MemoryUnit.MB, allowZero = true)

private val inputDirMinValidation: OptionalRuntimeAttributesValidation[MemorySize] =
InformationValidation.optional(RuntimeAttributesKeys.DnaNexusInputDirMinKey, MemoryUnit.MB, allowZero = true)

def runtimeAttributesBuilder(batchConfiguration: GcpBatchConfiguration): StandardValidatedRuntimeAttributesBuilder = {
val runtimeConfig = batchConfiguration.runtimeConfig
StandardValidatedRuntimeAttributesBuilder
Expand All @@ -185,21 +161,15 @@ object GcpBatchRuntimeAttributes {
gpuDriverValidation(runtimeConfig),
cpuValidation(runtimeConfig),
cpuPlatformValidation(runtimeConfig),
cpuMinValidation(runtimeConfig),
gpuMinValidation(runtimeConfig),
disksValidation(runtimeConfig),
noAddressValidation(runtimeConfig),
zonesValidation(runtimeConfig),
preemptibleValidation(runtimeConfig),
memoryValidation(runtimeConfig),
memoryMinValidation(runtimeConfig),
bootDiskSizeValidation(runtimeConfig),
useDockerImageCacheValidation(runtimeConfig),
checkpointFileValidationInstance,
dockerValidation,
outDirMinValidation,
tmpDirMinValidation,
inputDirMinValidation
dockerValidation
)
}

Expand Down Expand Up @@ -257,21 +227,6 @@ object GcpBatchRuntimeAttributes {
validatedRuntimeAttributes
)

val outDirMin: Option[MemorySize] = RuntimeAttributesValidation
.extractOption(outDirMinValidation.key, validatedRuntimeAttributes)
val tmpDirMin: Option[MemorySize] = RuntimeAttributesValidation
.extractOption(tmpDirMinValidation.key, validatedRuntimeAttributes)
val inputDirMin: Option[MemorySize] = RuntimeAttributesValidation
.extractOption(inputDirMinValidation.key, validatedRuntimeAttributes)

val totalExecutionDiskSizeBytes = List(inputDirMin.map(_.bytes),
outDirMin.map(_.bytes),
tmpDirMin.map(_.bytes)
).flatten.fold(MemorySize(0, MemoryUnit.Bytes).bytes)(_ + _)
val totalExecutionDiskSize = MemorySize(totalExecutionDiskSizeBytes, MemoryUnit.Bytes)

val adjustedDisks = disks.adjustWorkingDiskWithNewMin(totalExecutionDiskSize, ())

new GcpBatchRuntimeAttributes(
cpu,
cpuPlatform,
Expand All @@ -280,7 +235,7 @@ object GcpBatchRuntimeAttributes {
preemptible,
bootDiskSize,
memory,
adjustedDisks,
disks,
docker,
failOnStderr,
continueOnReturnCode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,13 @@ import cromwell.backend.validation.{
import eu.timepit.refined.api.Refined
import eu.timepit.refined.numeric.Positive
import eu.timepit.refined.refineV
import wom.RuntimeAttributesKeys.{GpuKey, GpuMaxKey, GpuMinKey}
import wom.RuntimeAttributesKeys.GpuKey
import wom.types.WomIntegerType
import wom.values.{WomInteger, WomValue}

object GpuValidation {
lazy val instance: RuntimeAttributesValidation[Int Refined Positive] = new GpuValidation(GpuKey)
lazy val optional: OptionalRuntimeAttributesValidation[Int Refined Positive] = instance.optional
lazy val instanceMin: RuntimeAttributesValidation[Int Refined Positive] = new GpuValidation(GpuMinKey)
lazy val optionalMin: OptionalRuntimeAttributesValidation[Int Refined Positive] = instanceMin.optional
lazy val instanceMax: RuntimeAttributesValidation[Int Refined Positive] = new GpuValidation(GpuMaxKey)
lazy val optionalMax: OptionalRuntimeAttributesValidation[Int Refined Positive] = instanceMax.optional

lazy val defaultMin: WomValue = WomInteger(0)
def configDefaultWomValue(config: Option[Config]): Option[WomValue] = instance.configDefaultWomValue(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -936,12 +936,10 @@ class GcpBatchAsyncBackendJobExecutionActorSpec
"runtimeAttributes:bootDiskSizeGb" -> "10",
"runtimeAttributes:continueOnReturnCode" -> "0",
"runtimeAttributes:cpu" -> "1",
"runtimeAttributes:cpuMin" -> "1",
"runtimeAttributes:disks" -> "local-disk 200 SSD",
"runtimeAttributes:docker" -> "ubuntu:latest",
"runtimeAttributes:failOnStderr" -> "false",
"runtimeAttributes:memory" -> "2 GB",
"runtimeAttributes:memoryMin" -> "2 GB",
"runtimeAttributes:noAddress" -> "false",
"runtimeAttributes:preemptible" -> "0",
"runtimeAttributes:zones" -> "us-central1-b,us-central1-a",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,13 @@ import cromwell.backend.validation.{
import eu.timepit.refined.api.Refined
import eu.timepit.refined.numeric.Positive
import eu.timepit.refined.refineV
import wom.RuntimeAttributesKeys.{GpuKey, GpuMaxKey, GpuMinKey}
import wom.RuntimeAttributesKeys.GpuKey
import wom.types.WomIntegerType
import wom.values.{WomInteger, WomValue}

object GpuValidation {
lazy val instance: RuntimeAttributesValidation[Int Refined Positive] = new GpuValidation(GpuKey)
lazy val optional: OptionalRuntimeAttributesValidation[Int Refined Positive] = instance.optional
lazy val instanceMin: RuntimeAttributesValidation[Int Refined Positive] = new GpuValidation(GpuMinKey)
lazy val optionalMin: OptionalRuntimeAttributesValidation[Int Refined Positive] = instanceMin.optional
lazy val instanceMax: RuntimeAttributesValidation[Int Refined Positive] = new GpuValidation(GpuMaxKey)
lazy val optionalMax: OptionalRuntimeAttributesValidation[Int Refined Positive] = instanceMax.optional

lazy val defaultMin: WomValue = WomInteger(0)
def configDefaultWomValue(config: Option[Config]): Option[WomValue] = instance.configDefaultWomValue(config)
Expand Down
Loading
Loading