Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WX-1217 Workflow completion callback #7213

Merged
merged 33 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
05a30bb
Draft WorkflowCallbackActor
jgainerdewar Aug 22, 2023
106b669
Configure and run WorkflowCallbackActor in finalization
jgainerdewar Aug 28, 2023
5fc1e66
Actually retry when there is a failure
jgainerdewar Aug 28, 2023
3e8500f
Add workflow callback result to metadata
jgainerdewar Aug 31, 2023
a114372
Add Azure auth
jgainerdewar Sep 1, 2023
4ff381d
Move workflow callback out of finalization step
jgainerdewar Sep 4, 2023
14584e9
Only send outputs if the workflow succeeded
jgainerdewar Sep 4, 2023
a87cf07
Let users configure maxRetries
jgainerdewar Sep 5, 2023
891121e
Fix auth header
jgainerdewar Sep 5, 2023
a5454aa
WorkflowActor test
jgainerdewar Sep 8, 2023
f37d614
Break http interactions out for mocking
jgainerdewar Sep 8, 2023
04b8ce7
WorkflowCallbackActor shutdown
jgainerdewar Sep 8, 2023
a9c2ce2
Fix duplicated variable name
jgainerdewar Sep 8, 2023
27dac91
WorkflowCallbackActor tests
jgainerdewar Sep 8, 2023
b190a44
Fix test
jgainerdewar Sep 10, 2023
84b6461
Docs
jgainerdewar Sep 10, 2023
5f1e3cb
Merge branch 'develop' into jgd_WX-1217_callback
jgainerdewar Sep 11, 2023
2870c92
Include workflow failure message in callback
jgainerdewar Sep 11, 2023
ade1ca2
Merge branch 'jgd_WX-1217_callback' of github.com:broadinstitute/crom…
jgainerdewar Sep 11, 2023
7cddd10
Refer to docs in changelog
jgainerdewar Sep 12, 2023
0270448
Clean up TODOs, not going TO DO them right now
jgainerdewar Sep 12, 2023
a91ddd9
Comments
jgainerdewar Sep 12, 2023
c8ea146
Update engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.s…
jgainerdewar Sep 15, 2023
9867876
More WorkflowActor tests
jgainerdewar Sep 18, 2023
6c94189
Request callback directly from onTransition rather than sending event
jgainerdewar Sep 18, 2023
fd1b171
Create a consistent JSON message schema regardless of workflow status
jgainerdewar Sep 18, 2023
25e2f3a
PR feedback
jgainerdewar Sep 19, 2023
5392525
Dedicated thread pool for WorkflowCallbackActor with configurable size
jgainerdewar Sep 19, 2023
9dcfe21
More sane default handling for config
jgainerdewar Sep 19, 2023
f8983cf
Update docs for new config
jgainerdewar Sep 20, 2023
d51bece
Merge branch 'develop' into jgd_WX-1217_callback
jgainerdewar Sep 21, 2023
029b24c
Comment update
jgainerdewar Sep 22, 2023
b291a1d
Merge branch 'jgd_WX-1217_callback' of github.com:broadinstitute/crom…
jgainerdewar Sep 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ WDL `size` engine function now works for HTTP files.
Cromwell can now send logs to Azure Application Insights. To enable, set environment
variable `APPLICATIONINSIGHTS_INSTRUMENTATIONKEY` to your account's key. [See here for information.](https://learn.microsoft.com/en-us/azure/azure-monitor/app/sdk-connection-string)

### Workflow Completion Callback

Cromwell can be configured to send a POST request to a specified URL when a workflow completes. The request body
includes the workflow id, terminal state, and (if applicable) final outputs or error message.
See `WorkflowCallback` in [ReadTheDocs](https://cromwell.readthedocs.io/en/stable/) for more information.

## 85 Release Notes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case object AzureCredentials {
new DefaultAzureCredentialBuilder()
.authorityHost(azureProfile.getEnvironment.getActiveDirectoryEndpoint)

def getAccessToken(identityClientId: Option[String]): ErrorOr[String] = {
def getAccessToken(identityClientId: Option[String] = None): ErrorOr[String] = {
val credentials = identityClientId.foldLeft(defaultCredentialBuilder) {
(builder, clientId) => builder.managedIdentityClientId(clientId)
}.build()
Expand Down
16 changes: 16 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -626,3 +626,19 @@ ga4gh {
contact-info-url = "https://cromwell.readthedocs.io/en/stable/"
}
}

workflow-state-callback {
enabled: false
## The number of threads to allocate for performing callbacks
# num-threads: 5
# endpoint: "http://example.com/foo" # Can be overridden in workflow options
# auth.azure: true
## Users can override default retry behavior if desired
# request-backoff {
# min: "3 seconds"
# max: "5 minutes"
# multiplier: 1.1
# }
# max-retries = 10

}
1 change: 1 addition & 0 deletions core/src/main/scala/cromwell/core/WorkflowOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ object WorkflowOptions {
case object WorkflowFailureMode extends WorkflowOption("workflow_failure_mode")
case object UseReferenceDisks extends WorkflowOption("use_reference_disks")
case object MemoryRetryMultiplier extends WorkflowOption("memory_retry_multiplier")
case object WorkflowCallbackUri extends WorkflowOption("workflow_callback_uri")

private lazy val WorkflowOptionsConf = ConfigFactory.load.getConfig("workflow-options")
private lazy val EncryptedFields: Seq[String] = WorkflowOptionsConf.getStringList("encrypted-fields").asScala.toList
Expand Down
57 changes: 57 additions & 0 deletions docs/cromwell_features/WorkflowCallback.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
The workflow callback is a simple way to integrate Cromwell with an external system. When each workflow reaches a terminal
state, Cromwell will POST a message to a provided URL (see below for schema of this message). Messages are sent for root
workflows only, not subworkflows.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One last thought - maybe include the phase "best effort" here to make sure it clear that Cromwell will try, but makes no guarantees about delivery


### Configuration

This feature will only be used if enabled via config. All config items except `enabled` are optional.

```
workflow-state-callback {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we include where in the config file should this go? Top level? In engine or service? Somewhere else? (meanwhile, I'm off to look through the code to try to find out 😄 )

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the answer is: yes, top level

enabled: true
num-threads: 5
endpoint: "http://example.com"
auth.azure: true
request-backoff {
min: "3 seconds",
max: "5 minutes",
multiplier: 1.1
}
max-retries = 10
}
```

* `enabled`: This boolean controls whether a callback will be attempted or not.
* `num-threads`: The number of threads Cromwell will allocate for performing callbacks.
* `endpoint`: This is the default URL to send the message to. If this is unset, and no URL is set in workflow options, no callback will be sent.
* `auth.azure`: If true, and if Cromwell is running in an Azure environment, Cromwell will include an auth header with bearer token generated from local Azure credentials.
* `request-backoff` and `max-retries`: Include these to override the default retry behavior (default behavior shown here).

### Workflow Options

You may choose to override the `endpoint` set in config by including this workflow option:
```
{
"workflow_callback_uri": "http://mywebsite.com"
}
```

### Callback schema

Below is an example of a callback request body.

```
{
"workflowId": "00001111-2222-3333-4444-555566667777",
"state": "Succeeded",
"outputs": {
"task1.out": 5,
"task2.out": "/some/file.txt"
}
}
```

* `workflowId`: The UUID of the workflow
* `state`: The terminal state of the workflow. The list of possible values is: `Succeeded`, `Failed`, `Aborted`
* `outputs`: The final outputs of the workflow, as would be returned from the `api/workflows/{version}/{id}/outputs` endpoint. Expected to be empty when the workflow is not successful..
* `failures`: A list of strings describing the workflow's failures. Expected to be empty if the workflow did not fail.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cromwell.engine.workflow

import java.util.concurrent.atomic.AtomicInteger

import akka.actor.SupervisorStrategy.Stop
import akka.actor._
import com.typesafe.config.Config
Expand All @@ -23,6 +22,7 @@ import cromwell.engine.workflow.lifecycle.deletion.DeleteWorkflowFilesActor
import cromwell.engine.workflow.lifecycle.deletion.DeleteWorkflowFilesActor.{DeleteWorkflowFilesFailedResponse, DeleteWorkflowFilesSucceededResponse, StartWorkflowFilesDeletion}
import cromwell.engine.workflow.lifecycle.execution.WorkflowExecutionActor
import cromwell.engine.workflow.lifecycle.execution.WorkflowExecutionActor._
import cromwell.engine.workflow.lifecycle.finalization.WorkflowCallbackActor.PerformCallbackCommand
import cromwell.engine.workflow.lifecycle.finalization.WorkflowFinalizationActor.{StartFinalizationCommand, WorkflowFinalizationFailedResponse, WorkflowFinalizationSucceededResponse}
import cromwell.engine.workflow.lifecycle.finalization.{CopyWorkflowLogsActor, CopyWorkflowOutputsActor, WorkflowFinalizationActor}
import cromwell.engine.workflow.lifecycle.initialization.WorkflowInitializationActor
Expand Down Expand Up @@ -149,7 +149,7 @@ object WorkflowActor {
initializationData: AllBackendInitializationData,
lastStateReached: StateCheckpoint,
effectiveStartableState: StartableState,
workflowFinalOutputs: Set[WomValue] = Set.empty,
workflowFinalOutputs: Option[CallOutputs] = None,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change was made so that the PerformWorkflowCallback event handling has access to the final CallOutputs, which it needs to create its POST request. Previously it had access to their values but not their labels/names.

workflowAllOutputs: Set[WomValue] = Set.empty,
rootAndSubworkflowIds: Set[WorkflowId] = Set.empty,
failedInitializationAttempts: Int = 0)
Expand All @@ -176,6 +176,7 @@ object WorkflowActor {
ioActor: ActorRef,
serviceRegistryActor: ActorRef,
workflowLogCopyRouter: ActorRef,
workflowCallbackActor: Option[ActorRef],
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
Expand All @@ -199,6 +200,7 @@ object WorkflowActor {
ioActor = ioActor,
serviceRegistryActor = serviceRegistryActor,
workflowLogCopyRouter = workflowLogCopyRouter,
workflowCallbackActor = workflowCallbackActor,
jobStoreActor = jobStoreActor,
subWorkflowStoreActor = subWorkflowStoreActor,
callCacheReadActor = callCacheReadActor,
Expand Down Expand Up @@ -226,6 +228,7 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
ioActor: ActorRef,
override val serviceRegistryActor: ActorRef,
workflowLogCopyRouter: ActorRef,
workflowCallbackActor: Option[ActorRef],
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
Expand Down Expand Up @@ -534,26 +537,27 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
case _ => // The WMA is waiting for the WorkflowActorWorkComplete message. No extra information needed here.
}

// Copy/Delete workflow logs
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section is just a copy/paste of this code to move the brute force workflow options up in scope (outside if (WorkflowLogger.isEnabled).

if (WorkflowLogger.isEnabled) {
/*
* The submitted workflow options have been previously validated by the CromwellApiHandler. These are
* being recreated so that in case MaterializeWorkflowDescriptor fails, the workflow logs can still
* be copied by accessing the workflow options outside of the EngineWorkflowDescriptor.
*/
def bruteForceWorkflowOptions: WorkflowOptions = sources.workflowOptions
val system = context.system
val ec = context.system.dispatcher
def bruteForcePathBuilders: Future[List[PathBuilder]] = {
// Protect against path builders that may throw an exception instead of returning a failed future
Future(EngineFilesystems.pathBuildersForWorkflow(bruteForceWorkflowOptions, pathBuilderFactories)(system))(ec).flatten
}
/*
* The submitted workflow options have been previously validated by the CromwellApiHandler. These are
* being recreated so that even if the MaterializeWorkflowDescriptor fails, the workflow options can still be
* accessed outside of the EngineWorkflowDescriptor. Used for both copying workflow log and sending workflow callbacks.
*/
def bruteForceWorkflowOptions: WorkflowOptions = sources.workflowOptions

val system = context.system
val ec = context.system.dispatcher
def bruteForcePathBuilders: Future[List[PathBuilder]] = {
// Protect against path builders that may throw an exception instead of returning a failed future
Future(EngineFilesystems.pathBuildersForWorkflow(bruteForceWorkflowOptions, pathBuilderFactories)(system))(ec).flatten
}

val (workflowOptions, pathBuilders) = stateData.workflowDescriptor match {
case Some(wd) => (wd.backendDescriptor.workflowOptions, Future.successful(wd.pathBuilders))
case None => (bruteForceWorkflowOptions, bruteForcePathBuilders)
}
val (workflowOptions, pathBuilders) = stateData.workflowDescriptor match {
case Some(wd) => (wd.backendDescriptor.workflowOptions, Future.successful(wd.pathBuilders))
case None => (bruteForceWorkflowOptions, bruteForcePathBuilders)
}

// Copy/Delete workflow logs
if (WorkflowLogger.isEnabled) {
workflowOptions.get(FinalWorkflowLogDir).toOption match {
case Some(destinationDir) =>
pathBuilders
Expand All @@ -566,6 +570,18 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
}
}

// Attempt to perform workflow completion callback
workflowCallbackActor.foreach { wca =>
val callbackUri = workflowOptions.get(WorkflowOptions.WorkflowCallbackUri).toOption
wca ! PerformCallbackCommand(
workflowId,
callbackUri,
terminalState.workflowState,
stateData.workflowFinalOutputs.getOrElse(CallOutputs.empty),
nextStateData.lastStateReached.failures.toList.flatMap(_.map(_.getMessage))
)
}

// We can't transition from within another transition function, but we can instruct ourselves to with a message:
self ! AwaitMetadataIntegrity
jgainerdewar marked this conversation as resolved.
Show resolved Hide resolved

Expand Down Expand Up @@ -627,7 +643,7 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
rootWorkflowId = rootWorkflowId,
rootWorkflowRootPaths = data.initializationData.getWorkflowRoots(),
rootAndSubworkflowIds = data.rootAndSubworkflowIds,
workflowFinalOutputs = data.workflowFinalOutputs,
workflowFinalOutputs = data.workflowFinalOutputs.map(out => out.outputs.values.toSet).getOrElse(Set.empty),
workflowAllOutputs = data.workflowAllOutputs,
pathBuilders = data.workflowDescriptor.get.pathBuilders,
serviceRegistryActor = serviceRegistryActor,
Expand Down Expand Up @@ -689,7 +705,7 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
finalizationActor ! StartFinalizationCommand
goto(FinalizingWorkflowState) using data.copy(
lastStateReached = StateCheckpoint (lastStateOverride.getOrElse(stateName), failures),
workflowFinalOutputs = workflowFinalOutputs.outputs.values.toSet,
workflowFinalOutputs = Option(workflowFinalOutputs),
workflowAllOutputs = workflowAllOutputs,
rootAndSubworkflowIds = rootAndSubworkflowIds
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ object WorkflowManagerActor {
ioActor: ActorRef,
serviceRegistryActor: ActorRef,
workflowLogCopyRouter: ActorRef,
workflowCallbackActor: Option[ActorRef],
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
Expand All @@ -75,6 +76,7 @@ object WorkflowManagerActor {
ioActor = ioActor,
serviceRegistryActor = serviceRegistryActor,
workflowLogCopyRouter = workflowLogCopyRouter,
workflowCallbackActor = workflowCallbackActor,
jobStoreActor = jobStoreActor,
subWorkflowStoreActor = subWorkflowStoreActor,
callCacheReadActor = callCacheReadActor,
Expand Down Expand Up @@ -124,6 +126,7 @@ case class WorkflowManagerActorParams(config: Config,
ioActor: ActorRef,
serviceRegistryActor: ActorRef,
workflowLogCopyRouter: ActorRef,
workflowCallbackActor: Option[ActorRef],
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
Expand Down Expand Up @@ -313,6 +316,7 @@ class WorkflowManagerActor(params: WorkflowManagerActorParams)
ioActor = params.ioActor,
serviceRegistryActor = params.serviceRegistryActor,
workflowLogCopyRouter = params.workflowLogCopyRouter,
workflowCallbackActor = params.workflowCallbackActor,
jobStoreActor = params.jobStoreActor,
subWorkflowStoreActor = params.subWorkflowStoreActor,
callCacheReadActor = params.callCacheReadActor,
Expand Down
Loading