Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WX-1156] internal_path_prefix for TES 4.4 #7190

Merged
merged 15 commits into from
Aug 15, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ case class TesJobPaths private[tes] (override val workflowPaths: TesWorkflowPath
val callInputsDockerRoot = callDockerRoot.resolve("inputs")
val callInputsRoot = callRoot.resolve("inputs")

/*
* tesTaskRoot: This is the root directory that TES will use for files related to this task.
* We provide it to TES as a k/v pair where the key is "internal_path_prefix" (specified in TesWorkflowOptionKeys.scala)
* and the value is a blob path.
* This is not a standard TES feature, but rather related to the Azure TES implementation that Terra uses.
* While passing it outside of terra won't do any harm, we could consider making this optional and/or configurable.
*/
val tesTaskRoot : Path = callExecutionRoot.resolve("tes_task")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does resolve throw an exception if it's not running on Azure TES? Or is it creating-if-not-exists?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. It requires callExecutionRoot to at least exist, but it will create .../tes_task if that doesn't yet exist.

callExecutionRoot needs to exist in order to not throw, but that should, at the minimum, be an empty path at this point.


// Given an output path, return a path localized to the storage file system
def storageOutput(path: String): String = {
callExecutionRoot.resolve(path).toString
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
package cromwell.backend.impl.tes

import common.collections.EnhancedCollections._
import common.util.StringUtil._
import cromwell.backend.impl.tes.OutputMode.OutputMode
Expand Down Expand Up @@ -71,7 +70,6 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor,
path = tesPaths.callExecutionDockerRoot.resolve("script").toString,
`type` = Option("FILE")
)

private def writeFunctionFiles: Map[FullyQualifiedName, Seq[WomFile]] =
instantiatedCommand.createdFiles map { f => f.file.value.md5SumShort -> List(f.file) } toMap

Expand Down Expand Up @@ -231,11 +229,6 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor,
workflowExecutionIdentityOption
)

val resources: Resources = TesTask.makeResources(
runtimeAttributes,
preferedWorkflowExecutionIdentity
)

val executors = Seq(Executor(
image = dockerImageUsed,
command = Seq(jobShell, commandScript.path),
Expand All @@ -245,6 +238,12 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor,
stdin = None,
env = None
))

val resources: Resources = TesTask.makeResources(
runtimeAttributes,
preferedWorkflowExecutionIdentity,
Option(tesPaths.tesTaskRoot.pathAsString)
)
}

object TesTask {
Expand All @@ -254,15 +253,22 @@ object TesTask {
configIdentity.map(_.value).orElse(workflowOptionsIdentity.map(_.value))
}
def makeResources(runtimeAttributes: TesRuntimeAttributes,
workflowExecutionId: Option[String]): Resources = {

// This was added in BT-409 to let us pass information to an Azure
// TES server about which user identity to run tasks as.
// Note that we validate the type of WorkflowExecutionIdentity
// in TesInitializationActor.
val backendParameters = runtimeAttributes.backendParameters ++
workflowExecutionId: Option[String], internalPathPrefix: Option[String]): Resources = {
/*
* workflowExecutionId: This was added in BT-409 to let us pass information to an Azure
* TES server about which user identity to run tasks as.
* Note that we validate the type of WorkflowExecutionIdentity in TesInitializationActor.
*
* internalPathPrefix: Added in WX-1156 to support the azure TES implementation. Specifies
* a working directory that the TES task can use.
*/
val internalPathPrefixKey = "internal_path_prefix"
val backendParameters : Map[String, Option[String]] = runtimeAttributes.backendParameters ++
workflowExecutionId
.map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option(_))
.toMap ++
internalPathPrefix
.map(internalPathPrefixKey -> Option(_))
.toMap
val disk :: ram :: _ = Seq(runtimeAttributes.disk, runtimeAttributes.memory) map {
case Some(x) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,46 +31,87 @@ class TesTaskSpec
false,
Map.empty
)
THWiseman marked this conversation as resolved.
Show resolved Hide resolved
val internalPathPrefix = Option("mock/path/to/tes/task")
val expectedTuple = "internal_path_prefix" -> internalPathPrefix

it should "create the correct resources when an identity is passed in WorkflowOptions" in {
val wei = Option("abc123")
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123")))
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
Resources(None, None, None, Option(false), None,
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
expectedTuple))
)
}

it should "create the correct resources when an empty identity is passed in WorkflowOptions" in {
val wei = Option("")
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("")))
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
Resources(None, None, None, Option(false), None,
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option(""),
expectedTuple))
)
}

it should "create the correct resources when no identity is passed in WorkflowOptions" in {
val wei = None
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map.empty[String, Option[String]])
)
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(expectedTuple)))
}

it should "create the correct resources when an identity is passed in via backend config" in {
val weic = Option(WorkflowExecutionIdentityConfig("abc123"))
val weio = Option(WorkflowExecutionIdentityOption("def456"))
val wei = TesTask.getPreferredWorkflowExecutionIdentity(weic, weio)
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123")))
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
expectedTuple))
)
}

it should "create the correct resources when no identity is passed in via backend config" in {
val weic = None
val weio = Option(WorkflowExecutionIdentityOption("def456"))
val wei = TesTask.getPreferredWorkflowExecutionIdentity(weic, weio)
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("def456")))
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("def456"),
expectedTuple))
)
}

it should "correctly set the internal path prefix when provided as a backend parameter" in {
val wei = Option("abc123")
val internalPathPrefix = Option("mock/path/to/tes/task")
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
Resources(None, None, None, Option(false), None,
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
"internal_path_prefix" -> internalPathPrefix)
))
}

it should "correctly resolve the path to .../tes_task and add the k/v pair to backend parameters" in {
val emptyWorkflowOptions = WorkflowOptions(JsObject(Map.empty[String, JsValue]))
val workflowDescriptor = buildWdlWorkflowDescriptor(TestWorkflows.HelloWorld,
labels = Labels("foo" -> "bar"))
val jobDescriptor = jobDescriptorFromSingleCallWorkflow(workflowDescriptor,
Map.empty,
emptyWorkflowOptions,
Set.empty)
val tesPaths = TesJobPaths(jobDescriptor.key,
jobDescriptor.workflowDescriptor,
TestConfig.emptyConfig)

val expectedKey = "internal_path_prefix"
val expectedValue = Option(tesPaths.tesTaskRoot.pathAsString)

//Assert path correctly ends up in the resources
val wei = Option("abc123")
TesTask.makeResources(runtimeAttributes, wei, expectedValue) shouldEqual
Resources(None, None, None, Option(false), None,
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
expectedKey -> expectedValue))
)
}

it should "copy labels to tags" in {
val jobLogger = mock[JobLogger]
val emptyWorkflowOptions = WorkflowOptions(JsObject(Map.empty[String, JsValue]))
Expand Down