Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WX-1156] internal_path_prefix for TES 4.4 #7190

Merged
merged 15 commits into from
Aug 15, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ case class TesJobPaths private[tes] (override val workflowPaths: TesWorkflowPath
val callInputsDockerRoot = callDockerRoot.resolve("inputs")
val callInputsRoot = callRoot.resolve("inputs")

// This is the root directory that TES will use for files related to this task.
// We must specify it in the backend parameters sent to TES using "internal_path_prefix" as the key.
THWiseman marked this conversation as resolved.
Show resolved Hide resolved
val tesTaskRoot = callExecutionRoot.resolve("tes_task")

// Given an output path, return a path localized to the storage file system
def storageOutput(path: String): String = {
callExecutionRoot.resolve(path).toString
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
package cromwell.backend.impl.tes

import common.collections.EnhancedCollections._
import common.util.StringUtil._
import cromwell.backend.impl.tes.OutputMode.OutputMode
Expand Down Expand Up @@ -71,7 +70,6 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor,
path = tesPaths.callExecutionDockerRoot.resolve("script").toString,
`type` = Option("FILE")
)

private def writeFunctionFiles: Map[FullyQualifiedName, Seq[WomFile]] =
instantiatedCommand.createdFiles map { f => f.file.value.md5SumShort -> List(f.file) } toMap

Expand Down Expand Up @@ -231,9 +229,12 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor,
workflowExecutionIdentityOption
)

val internalPathPrefix = ("internal_path_prefix", Option(tesPaths.tesTaskRoot.pathAsString))

val resources: Resources = TesTask.makeResources(
runtimeAttributes,
preferedWorkflowExecutionIdentity
preferedWorkflowExecutionIdentity,
Map(internalPathPrefix)
)

val executors = Seq(Executor(
Expand All @@ -254,7 +255,7 @@ object TesTask {
configIdentity.map(_.value).orElse(workflowOptionsIdentity.map(_.value))
}
def makeResources(runtimeAttributes: TesRuntimeAttributes,
workflowExecutionId: Option[String]): Resources = {
workflowExecutionId: Option[String], additionalBackendParams: Map[String, Option[String]]): Resources = {
THWiseman marked this conversation as resolved.
Show resolved Hide resolved

// This was added in BT-409 to let us pass information to an Azure
// TES server about which user identity to run tasks as.
Expand All @@ -263,7 +264,8 @@ object TesTask {
val backendParameters = runtimeAttributes.backendParameters ++
workflowExecutionId
.map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option(_))
.toMap
.toMap ++
additionalBackendParams
val disk :: ram :: _ = Seq(runtimeAttributes.disk, runtimeAttributes.memory) map {
case Some(x) =>
Option(x.to(MemoryUnit.GB).amount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,46 +31,89 @@ class TesTaskSpec
false,
Map.empty
)
THWiseman marked this conversation as resolved.
Show resolved Hide resolved
val internalPathPrefix = ("internal_path_prefix", Option("mock/path/to/tes/task"))
val additionalBackendParams = Map(internalPathPrefix)

it should "create the correct resources when an identity is passed in WorkflowOptions" in {
val wei = Option("abc123")
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123")))
TesTask.makeResources(runtimeAttributes, wei, additionalBackendParams) shouldEqual
Resources(None, None, None, Option(false), None,
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
internalPathPrefix))
)
}

it should "create the correct resources when an empty identity is passed in WorkflowOptions" in {
val wei = Option("")
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("")))
TesTask.makeResources(runtimeAttributes, wei, additionalBackendParams) shouldEqual
Resources(None, None, None, Option(false), None,
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option(""),
internalPathPrefix))
)
}

it should "create the correct resources when no identity is passed in WorkflowOptions" in {
val wei = None
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map.empty[String, Option[String]])
)
TesTask.makeResources(runtimeAttributes, wei, additionalBackendParams) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(internalPathPrefix)))
}

it should "create the correct resources when an identity is passed in via backend config" in {
val weic = Option(WorkflowExecutionIdentityConfig("abc123"))
val weio = Option(WorkflowExecutionIdentityOption("def456"))
val wei = TesTask.getPreferredWorkflowExecutionIdentity(weic, weio)
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123")))
TesTask.makeResources(runtimeAttributes, wei, additionalBackendParams) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
internalPathPrefix))
)
}

it should "create the correct resources when no identity is passed in via backend config" in {
val weic = None
val weio = Option(WorkflowExecutionIdentityOption("def456"))
val wei = TesTask.getPreferredWorkflowExecutionIdentity(weic, weio)
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("def456")))
TesTask.makeResources(runtimeAttributes, wei, additionalBackendParams) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("def456"),
internalPathPrefix))
)
}

it should "correctly set the internal path prefix when provided as a backend parameter" in {
val wei = Option("abc123")
val internalPathPrefix = ("internal_path_prefix", Option("mock/path/to/tes/task"))
val additionalBackendParams = Map(internalPathPrefix)
TesTask.makeResources(runtimeAttributes, wei, additionalBackendParams) shouldEqual
Resources(None, None, None, Option(false), None,
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
internalPathPrefix))
)
}

it should "correctly resolve the path to .../tes_task and add the k/v pair to backend parameters" in {
val emptyWorkflowOptions = WorkflowOptions(JsObject(Map.empty[String, JsValue]))
val workflowDescriptor = buildWdlWorkflowDescriptor(TestWorkflows.HelloWorld,
labels = Labels("foo" -> "bar"))
val jobDescriptor = jobDescriptorFromSingleCallWorkflow(workflowDescriptor,
Map.empty,
emptyWorkflowOptions,
Set.empty)
val tesPaths = TesJobPaths(jobDescriptor.key,
jobDescriptor.workflowDescriptor,
TestConfig.emptyConfig)

val expectedKey = "internal_path_prefix"
val expectedValue = Option(tesPaths.tesTaskRoot.pathAsString)

//Assert path correctly ends up in the resources
val additionalBackendParams = Map(expectedKey -> expectedValue)
val wei = Option("abc123")
TesTask.makeResources(runtimeAttributes, wei, additionalBackendParams) shouldEqual
Resources(None, None, None, Option(false), None,
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
expectedKey -> expectedValue))
)
}

it should "copy labels to tags" in {
val jobLogger = mock[JobLogger]
val emptyWorkflowOptions = WorkflowOptions(JsObject(Map.empty[String, JsValue]))
Expand Down