Skip to content

Commit

Permalink
[WX-1156] internal_path_prefix for TES 4.4 (#7190)
Browse files Browse the repository at this point in the history
  • Loading branch information
THWiseman authored Aug 15, 2023
1 parent 29d3810 commit 2a69691
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ case class TesJobPaths private[tes] (override val workflowPaths: TesWorkflowPath
val callInputsDockerRoot = callDockerRoot.resolve("inputs")
val callInputsRoot = callRoot.resolve("inputs")

/*
* tesTaskRoot: This is the root directory that TES will use for files related to this task.
* We provide it to TES as a k/v pair where the key is "internal_path_prefix" (specified in TesWorkflowOptionKeys.scala)
* and the value is a blob path.
* This is not a standard TES feature, but rather related to the Azure TES implementation that Terra uses.
* While passing it outside of terra won't do any harm, we could consider making this optional and/or configurable.
*/
val tesTaskRoot : Path = callExecutionRoot.resolve("tes_task")

// Given an output path, return a path localized to the storage file system
def storageOutput(path: String): String = {
callExecutionRoot.resolve(path).toString
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
package cromwell.backend.impl.tes

import common.collections.EnhancedCollections._
import common.util.StringUtil._
import cromwell.backend.impl.tes.OutputMode.OutputMode
Expand Down Expand Up @@ -71,7 +70,6 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor,
path = tesPaths.callExecutionDockerRoot.resolve("script").toString,
`type` = Option("FILE")
)

private def writeFunctionFiles: Map[FullyQualifiedName, Seq[WomFile]] =
instantiatedCommand.createdFiles map { f => f.file.value.md5SumShort -> List(f.file) } toMap

Expand Down Expand Up @@ -231,11 +229,6 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor,
workflowExecutionIdentityOption
)

val resources: Resources = TesTask.makeResources(
runtimeAttributes,
preferedWorkflowExecutionIdentity
)

val executors = Seq(Executor(
image = dockerImageUsed,
command = Seq(jobShell, commandScript.path),
Expand All @@ -245,6 +238,12 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor,
stdin = None,
env = None
))

val resources: Resources = TesTask.makeResources(
runtimeAttributes,
preferedWorkflowExecutionIdentity,
Option(tesPaths.tesTaskRoot.pathAsString)
)
}

object TesTask {
Expand All @@ -254,15 +253,22 @@ object TesTask {
configIdentity.map(_.value).orElse(workflowOptionsIdentity.map(_.value))
}
def makeResources(runtimeAttributes: TesRuntimeAttributes,
workflowExecutionId: Option[String]): Resources = {

// This was added in BT-409 to let us pass information to an Azure
// TES server about which user identity to run tasks as.
// Note that we validate the type of WorkflowExecutionIdentity
// in TesInitializationActor.
val backendParameters = runtimeAttributes.backendParameters ++
workflowExecutionId: Option[String], internalPathPrefix: Option[String]): Resources = {
/*
* workflowExecutionId: This was added in BT-409 to let us pass information to an Azure
* TES server about which user identity to run tasks as.
* Note that we validate the type of WorkflowExecutionIdentity in TesInitializationActor.
*
* internalPathPrefix: Added in WX-1156 to support the azure TES implementation. Specifies
* a working directory that the TES task can use.
*/
val internalPathPrefixKey = "internal_path_prefix"
val backendParameters : Map[String, Option[String]] = runtimeAttributes.backendParameters ++
workflowExecutionId
.map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option(_))
.toMap ++
internalPathPrefix
.map(internalPathPrefixKey -> Option(_))
.toMap
val disk :: ram :: _ = Seq(runtimeAttributes.disk, runtimeAttributes.memory) map {
case Some(x) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,46 +31,87 @@ class TesTaskSpec
false,
Map.empty
)
val internalPathPrefix = Option("mock/path/to/tes/task")
val expectedTuple = "internal_path_prefix" -> internalPathPrefix

it should "create the correct resources when an identity is passed in WorkflowOptions" in {
val wei = Option("abc123")
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123")))
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
Resources(None, None, None, Option(false), None,
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
expectedTuple))
)
}

it should "create the correct resources when an empty identity is passed in WorkflowOptions" in {
val wei = Option("")
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("")))
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
Resources(None, None, None, Option(false), None,
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option(""),
expectedTuple))
)
}

it should "create the correct resources when no identity is passed in WorkflowOptions" in {
val wei = None
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map.empty[String, Option[String]])
)
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(expectedTuple)))
}

it should "create the correct resources when an identity is passed in via backend config" in {
val weic = Option(WorkflowExecutionIdentityConfig("abc123"))
val weio = Option(WorkflowExecutionIdentityOption("def456"))
val wei = TesTask.getPreferredWorkflowExecutionIdentity(weic, weio)
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123")))
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
expectedTuple))
)
}

it should "create the correct resources when no identity is passed in via backend config" in {
val weic = None
val weio = Option(WorkflowExecutionIdentityOption("def456"))
val wei = TesTask.getPreferredWorkflowExecutionIdentity(weic, weio)
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("def456")))
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("def456"),
expectedTuple))
)
}

it should "correctly set the internal path prefix when provided as a backend parameter" in {
val wei = Option("abc123")
val internalPathPrefix = Option("mock/path/to/tes/task")
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
Resources(None, None, None, Option(false), None,
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
"internal_path_prefix" -> internalPathPrefix)
))
}

it should "correctly resolve the path to .../tes_task and add the k/v pair to backend parameters" in {
val emptyWorkflowOptions = WorkflowOptions(JsObject(Map.empty[String, JsValue]))
val workflowDescriptor = buildWdlWorkflowDescriptor(TestWorkflows.HelloWorld,
labels = Labels("foo" -> "bar"))
val jobDescriptor = jobDescriptorFromSingleCallWorkflow(workflowDescriptor,
Map.empty,
emptyWorkflowOptions,
Set.empty)
val tesPaths = TesJobPaths(jobDescriptor.key,
jobDescriptor.workflowDescriptor,
TestConfig.emptyConfig)

val expectedKey = "internal_path_prefix"
val expectedValue = Option(tesPaths.tesTaskRoot.pathAsString)

//Assert path correctly ends up in the resources
val wei = Option("abc123")
TesTask.makeResources(runtimeAttributes, wei, expectedValue) shouldEqual
Resources(None, None, None, Option(false), None,
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
expectedKey -> expectedValue))
)
}

it should "copy labels to tags" in {
val jobLogger = mock[JobLogger]
val emptyWorkflowOptions = WorkflowOptions(JsObject(Map.empty[String, JsValue]))
Expand Down

0 comments on commit 2a69691

Please sign in to comment.