Skip to content

Commit

Permalink
WX-1716 Set outputs move location in metadata (#7483)
Browse files Browse the repository at this point in the history
  • Loading branch information
aednichols authored Aug 6, 2024
1 parent ad57859 commit 8b4a498
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,20 @@ files {

metadata {
status: Succeeded
"outputs.simpleWorkflow.outFile": "gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/output.txt"
"outputs.simpleWorkflow.outGlob.0": "gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/glob-"~~
"outputs.simpleWorkflow.outGlob.1": "gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/glob-"~~
"outputs.simpleWorkflow.outGlob.2": "gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/glob-"~~
}

# The `centaur-ci-us-east1` bucket is in a different region than the workflow runs
fileSystemCheck: "gcs"
outputExpectations: {
"gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/output.txt": 1
# We don't currently have a way to check for glob contents, so do the next best thing by asserting the expected count
"gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/": 4
# "gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/glob-3860fac5129fd76b802f49dcd7f9a9f6/foo.zardoz": 1
# "gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/glob-3860fac5129fd76b802f49dcd7f9a9f6/bar.zardoz": 1
# "gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/glob-3860fac5129fd76b802f49dcd7f9a9f6/baz.zardoz": 1
"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/output.txt": 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ workflow simpleWorkflow {
call simpleStdoutTask {}
output {
File outFile = simpleStdoutTask.outFile
Array[File] outGlob = simpleStdoutTask.outGlob
}
}

Expand All @@ -10,6 +11,9 @@ task simpleStdoutTask {

command {
echo 'Hello world' > ${outputFileName}
echo 'foo' > "foo.zardoz"
echo 'bar' > "bar.zardoz"
echo 'baz' > "baz.zardoz"
}

runtime {
Expand All @@ -20,6 +24,6 @@ task simpleStdoutTask {

output {
File outFile = outputFileName
Array[File] outGlob = glob("*.zardoz")
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ case class WorkflowFlatMetadata(value: Map[String, JsValue]) extends AnyVal {
case o: JsString if stripQuotes(cacheSubstitutions).startsWith("~~") =>
val stripped = stripQuotes(cacheSubstitutions).stripPrefix("~~")
(!stripQuotes(o.toString).contains(stripped)).option(s"Actual value ${o.toString()} does not contain $stripped")
case o: JsString if stripQuotes(cacheSubstitutions).endsWith("~~") =>
val stripped = stripQuotes(cacheSubstitutions).stripSuffix("~~")
(!stripQuotes(o.toString).contains(stripped))
.option(s"Actual value ${o.toString()} does not start with $stripped")
case o: JsString =>
(cacheSubstitutions != o.toString).option(s"expected: $cacheSubstitutions but got: $actual")
case o: JsNumber =>
Expand Down
2 changes: 1 addition & 1 deletion docs/wf_options/Overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Example `options.json`:
|Option| Value | Description |
|---|-----------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|`final_workflow_outputs_dir`| A directory available to Cromwell | Specifies a path where final workflow outputs will be written. If this is not specified, workflow outputs will not be copied out of the Cromwell workflow execution directory/path. |
|`final_workflow_outputs_mode`| `"copy"` or `"move"` | `"copy"` is the default and preserves the source files. `"move"` performs a copy-delete sequence to clean up the source.<br/><br/>Note: as of this writing, the `/outputs` endpoint points to the source location. It is planned that for the `"move"` option only, `/outputs` will point to the destination.
|`final_workflow_outputs_mode`| `"copy"` or `"move"` | `"copy"` is the default and preserves the source files. `"move"` performs a copy-delete sequence to clean up the source.<br/><br/>For the `"move"` option only, the `/outputs` endpoint points to the destination.
|`use_relative_output_paths`| A boolean | When set to `true` this will copy all the outputs relative to their execution directory. my_final_workflow_outputs_dir/~~MyWorkflow/af76876d8-6e8768fa/call-MyTask/execution/~~output_of_interest . Cromwell will throw an exception when this leads to collisions. When the option is not set it will default to `false`. |
|`final_workflow_log_dir`| A directory available to Cromwell | Specifies a path where per-workflow logs will be written. If this is not specified, per-workflow logs will not be copied out of the Cromwell workflow log temporary directory/path before they are deleted. |
|`final_call_logs_dir`| A directory available to Cromwell | Specifies a path where final call logs will be written. If this is not specified, call logs will not be copied out of the Cromwell workflow execution directory/path. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package cromwell.engine

import cromwell.backend.BackendWorkflowDescriptor
import cromwell.core.WorkflowOptions.WorkflowOption
import cromwell.core.WorkflowOptions.{FinalWorkflowOutputsDir, FinalWorkflowOutputsMode, WorkflowOption}
import cromwell.core.callcaching.CallCachingMode
import cromwell.core.path.PathBuilder
import wom.callable.Callable
Expand Down Expand Up @@ -31,4 +31,10 @@ case class EngineWorkflowDescriptor(topLevelCallable: Callable,
lazy val knownValues = backendDescriptor.knownValues

def getWorkflowOption(key: WorkflowOption): Option[String] = backendDescriptor.getWorkflowOption(key)

def finalWorkflowOutputsDir: Option[String] =
getWorkflowOption(FinalWorkflowOutputsDir)

def finalWorkflowOutputsMode: FinalWorkflowOutputsMode =
FinalWorkflowOutputsMode.fromString(getWorkflowOption(FinalWorkflowOutputsMode))
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ import cromwell.backend.{
BackendInitializationData,
BackendLifecycleActorFactory
}
import cromwell.core.CallOutputs
import cromwell.core.WorkflowOptions.UseRelativeOutputPaths
import cromwell.core.path.{Path, PathCopier, PathFactory}
import cromwell.engine.EngineWorkflowDescriptor
import cromwell.engine.backend.{BackendConfiguration, CromwellBackends}
import wom.values.{WomSingleFile, WomValue}

trait OutputsLocationHelper extends PathFactory {
trait OutputsLocationHelper {

private def findFiles(values: Seq[WomValue]): Seq[WomSingleFile] =
values flatMap {
Expand All @@ -22,20 +21,19 @@ trait OutputsLocationHelper extends PathFactory {
}
}

protected def getOutputFilePaths(outputsDir: String,
descriptor: EngineWorkflowDescriptor,
backendInitData: AllBackendInitializationData,
workflowOutputs: CallOutputs
): List[(Path, Path)] = {
PathFactory.buildPath(outputsDir, descriptor.pathBuilders)
val workflowOutputsPath = buildPath(outputsDir)
protected def outputFilePathMapping(outputsDir: String,
descriptor: EngineWorkflowDescriptor,
backendInitData: AllBackendInitializationData,
workflowOutputs: Seq[WomValue]
): Map[Path, Path] = {
val workflowOutputsPath = PathFactory.buildPath(outputsDir, descriptor.pathBuilders)
val useRelativeOutputPaths: Boolean = descriptor.getWorkflowOption(UseRelativeOutputPaths).contains("true")
val rootAndFiles = for {
// NOTE: Without .toSeq, outputs in arrays only yield the last output
backend <- descriptor.backendAssignments.values.toSeq
config <- BackendConfiguration.backendConfigurationDescriptor(backend).toOption.toSeq
rootPath <- getBackendRootPath(backend, config, descriptor, backendInitData).toSeq
outputFiles = findFiles(workflowOutputs.outputs.values.toSeq).map(_.value)
outputFiles = findFiles(workflowOutputs).map(_.value)
} yield (rootPath, outputFiles)

// This regex will make sure the path is relative to the execution folder.
Expand All @@ -56,7 +54,7 @@ trait OutputsLocationHelper extends PathFactory {
}
}
}
outputFileDestinations.distinct.toList
outputFileDestinations.distinct.toMap
}

private def getBackendRootPath(backend: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import cromwell.backend._
import cromwell.backend.standard.callcaching.BlacklistCache
import cromwell.core.Dispatcher._
import cromwell.core.ExecutionStatus._
import cromwell.core.WorkflowOptions.Move
import cromwell.core._
import cromwell.core.io.AsyncIo
import cromwell.core.logging.WorkflowLogging
Expand All @@ -34,7 +35,11 @@ import cromwell.engine.workflow.lifecycle.execution.keys.ExpressionKey.{
}
import cromwell.engine.workflow.lifecycle.execution.keys._
import cromwell.engine.workflow.lifecycle.execution.stores.{ActiveExecutionStore, ExecutionStore}
import cromwell.engine.workflow.lifecycle.{EngineLifecycleActorAbortCommand, EngineLifecycleActorAbortedResponse}
import cromwell.engine.workflow.lifecycle.{
EngineLifecycleActorAbortCommand,
EngineLifecycleActorAbortedResponse,
OutputsLocationHelper
}
import cromwell.engine.workflow.workflowstore.{RestartableAborting, StartableState}
import cromwell.filesystems.gcs.batch.GcsBatchCommandBuilder
import cromwell.services.instrumentation.CromwellInstrumentation
Expand All @@ -60,7 +65,8 @@ case class WorkflowExecutionActor(params: WorkflowExecutionActorParams)
with CallMetadataHelper
with StopAndLogSupervisor
with Timers
with CromwellInstrumentation {
with CromwellInstrumentation
with OutputsLocationHelper {

implicit val ec: ExecutionContextExecutor = context.dispatcher
override val serviceRegistryActor: ActorRef = params.serviceRegistryActor
Expand Down Expand Up @@ -408,14 +414,42 @@ case class WorkflowExecutionActor(params: WorkflowExecutionActorParams)
import spray.json._

def handleSuccessfulWorkflowOutputs(outputs: Map[GraphOutputNode, WomValue]) = {
val mapping: Map[String, String] =
(workflowDescriptor.finalWorkflowOutputsDir, workflowDescriptor.finalWorkflowOutputsMode) match {
case (Some(outputDir), Move) =>
outputFilePathMapping(outputDir, workflowDescriptor, params.initializationData, outputs.values.toSeq) map {
case (src, dst) =>
(src.pathAsString, dst.pathAsString)
}
case _ =>
Map.empty[String, String]
}

def moveOrIdentity(value: WomValue): WomValue =
value match {
case single: WomSingleFile =>
mapping.get(single.valueString) match {
case Some(dst) =>
WomSingleFile(dst)
case None =>
single
}
case array: WomArray =>
WomArray(array.value.map(moveOrIdentity))
case nonFileValue =>
nonFileValue
}

val fullyQualifiedOutputs = outputs map { case (outputNode, value) =>
outputNode.identifier.fullyQualifiedName.value -> value
outputNode.identifier.fullyQualifiedName.value -> moveOrIdentity(value)
}
// Publish fully qualified workflow outputs to log and metadata
workflowLogger.info(
s"""Workflow ${workflowDescriptor.callable.name} complete. Final Outputs:
|${fullyQualifiedOutputs.stripLarge.toJson.prettyPrint}""".stripMargin
)

// Fully qualified to match `CALL_FQN` column in metadata table
pushWorkflowOutputMetadata(fullyQualifiedOutputs)

val localOutputs = CallOutputs(outputs map { case (outputNode, value) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import cromwell.backend.BackendWorkflowFinalizationActor.{
}
import cromwell.backend.AllBackendInitializationData
import cromwell.core.Dispatcher.IoDispatcher
import cromwell.core.WorkflowOptions._
import cromwell.core.WorkflowOptions.{Copy, Move}
import cromwell.core._
import cromwell.core.io.AsyncIoActorClient
import cromwell.core.path.Path
Expand Down Expand Up @@ -44,7 +44,6 @@ class CopyWorkflowOutputsActor(workflowId: WorkflowId,
with OutputsLocationHelper {
override lazy val ioCommandBuilder = GcsBatchCommandBuilder
implicit val ec = context.dispatcher
override val pathBuilders = workflowDescriptor.pathBuilders

override def receive = LoggingReceive { case Finalize =>
performActionThenRespond(afterAll()(context.dispatcher), FinalizationFailed)(context.dispatcher)
Expand All @@ -60,13 +59,13 @@ class CopyWorkflowOutputsActor(workflowId: WorkflowId,
}
}

private def markDuplicates(outputFilePaths: List[(Path, Path)]) = {
private def markDuplicates(outputFilePaths: Map[Path, Path]) = {
// Check if there are duplicated destination paths and throw an exception if that is the case.
// This creates a map of destinations and source paths which point to them in cases where there are multiple
// source paths that point to the same destination.
val duplicatedDestPaths: Map[Path, List[Path]] =
outputFilePaths.groupBy { case (_, destPath) => destPath }.collect {
case (destPath, list) if list.size > 1 => destPath -> list.map { case (source, _) => source }
case (destPath, list) if list.size > 1 => destPath -> list.toList.map { case (source, _) => source }
}
if (duplicatedDestPaths.nonEmpty) {
val formattedCollidingCopyOptions = duplicatedDestPaths.toList
Expand All @@ -84,11 +83,11 @@ class CopyWorkflowOutputsActor(workflowId: WorkflowId,

private def copyWorkflowOutputs(outputsDir: String): Future[Seq[Unit]] = {
val outputFilePaths =
getOutputFilePaths(outputsDir, workflowDescriptor, initializationData, workflowOutputs)
outputFilePathMapping(outputsDir, workflowDescriptor, initializationData, workflowOutputs.outputs.values.toSeq)

markDuplicates(outputFilePaths)

val copies = outputFilePaths map { case (srcPath, dstPath) =>
val copies = outputFilePaths.toList map { case (srcPath, dstPath) =>
asyncIo.copyAsync(srcPath, dstPath)
}

Expand All @@ -97,11 +96,11 @@ class CopyWorkflowOutputsActor(workflowId: WorkflowId,

private def moveWorkflowOutputs(outputsDir: String): Future[Seq[Unit]] = {
val outputFilePaths =
getOutputFilePaths(outputsDir, workflowDescriptor, initializationData, workflowOutputs)
outputFilePathMapping(outputsDir, workflowDescriptor, initializationData, workflowOutputs.outputs.values.toSeq)

markDuplicates(outputFilePaths)

val moves = outputFilePaths map { case (srcPath, dstPath) =>
val moves = outputFilePaths.toList map { case (srcPath, dstPath) =>
asyncIo.copyAsync(srcPath, dstPath) flatMap { _ =>
asyncIo.deleteAsync(srcPath)
}
Expand All @@ -113,14 +112,10 @@ class CopyWorkflowOutputsActor(workflowId: WorkflowId,
/**
* Happens after everything else runs
*/
final def afterAll()(implicit ec: ExecutionContext): Future[FinalizationResponse] = {
val maybeOutputsDir = workflowDescriptor.getWorkflowOption(FinalWorkflowOutputsDir)
val mode = FinalWorkflowOutputsMode.fromString(workflowDescriptor.getWorkflowOption(FinalWorkflowOutputsMode))

(maybeOutputsDir, mode) match {
final def afterAll()(implicit ec: ExecutionContext): Future[FinalizationResponse] =
(workflowDescriptor.finalWorkflowOutputsDir, workflowDescriptor.finalWorkflowOutputsMode) match {
case (Some(outputsDir), Copy) => copyWorkflowOutputs(outputsDir) map { _ => FinalizationSuccess }
case (Some(outputsDir), Move) => moveWorkflowOutputs(outputsDir) map { _ => FinalizationSuccess }
case _ => Future.successful(FinalizationSuccess)
}
}
}

0 comments on commit 8b4a498

Please sign in to comment.