WX-1217 Workflow completion callback #7213

Merged
merged 33 commits into from
Sep 22, 2023
Changes from 22 commits
05a30bb
Draft WorkflowCallbackActor
jgainerdewar Aug 22, 2023
106b669
Configure and run WorkflowCallbackActor in finalization
jgainerdewar Aug 28, 2023
5fc1e66
Actually retry when there is a failure
jgainerdewar Aug 28, 2023
3e8500f
Add workflow callback result to metadata
jgainerdewar Aug 31, 2023
a114372
Add Azure auth
jgainerdewar Sep 1, 2023
4ff381d
Move workflow callback out of finalization step
jgainerdewar Sep 4, 2023
14584e9
Only send outputs if the workflow succeeded
jgainerdewar Sep 4, 2023
a87cf07
Let users configure maxRetries
jgainerdewar Sep 5, 2023
891121e
Fix auth header
jgainerdewar Sep 5, 2023
a5454aa
WorkflowActor test
jgainerdewar Sep 8, 2023
f37d614
Break http interactions out for mocking
jgainerdewar Sep 8, 2023
04b8ce7
WorkflowCallbackActor shutdown
jgainerdewar Sep 8, 2023
a9c2ce2
Fix duplicated variable name
jgainerdewar Sep 8, 2023
27dac91
WorkflowCallbackActor tests
jgainerdewar Sep 8, 2023
b190a44
Fix test
jgainerdewar Sep 10, 2023
84b6461
Docs
jgainerdewar Sep 10, 2023
5f1e3cb
Merge branch 'develop' into jgd_WX-1217_callback
jgainerdewar Sep 11, 2023
2870c92
Include workflow failure message in callback
jgainerdewar Sep 11, 2023
ade1ca2
Merge branch 'jgd_WX-1217_callback' of github.com:broadinstitute/crom…
jgainerdewar Sep 11, 2023
7cddd10
Refer to docs in changelog
jgainerdewar Sep 12, 2023
0270448
Clean up TODOs, not going TO DO them right now
jgainerdewar Sep 12, 2023
a91ddd9
Comments
jgainerdewar Sep 12, 2023
c8ea146
Update engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.s…
jgainerdewar Sep 15, 2023
9867876
More WorkflowActor tests
jgainerdewar Sep 18, 2023
6c94189
Request callback directly from onTransition rather than sending event
jgainerdewar Sep 18, 2023
fd1b171
Create a consistent JSON message schema regardless of workflow status
jgainerdewar Sep 18, 2023
25e2f3a
PR feedback
jgainerdewar Sep 19, 2023
5392525
Dedicated thread pool for WorkflowCallbackActor with configurable size
jgainerdewar Sep 19, 2023
9dcfe21
More sane default handling for config
jgainerdewar Sep 19, 2023
f8983cf
Update docs for new config
jgainerdewar Sep 20, 2023
d51bece
Merge branch 'develop' into jgd_WX-1217_callback
jgainerdewar Sep 21, 2023
029b24c
Comment update
jgainerdewar Sep 22, 2023
b291a1d
Merge branch 'jgd_WX-1217_callback' of github.com:broadinstitute/crom…
jgainerdewar Sep 22, 2023
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,11 @@ WDL `size` engine function now works for HTTP files.
Cromwell can now send logs to Azure Application Insights. To enable, set environment
variable `APPLICATIONINSIGHTS_INSTRUMENTATIONKEY` to your account's key. [See here for information.](https://learn.microsoft.com/en-us/azure/azure-monitor/app/sdk-connection-string)

### Workflow Completion Callback

Cromwell can be configured to send a POST request to a specified URL when a workflow completes. The request body
includes the workflow id, terminal state, and (if workflow succeeded) final outputs or (if workflow failed) error message.
See `WorkflowCallback` in [ReadTheDocs](https://cromwell.readthedocs.io/en/stable/) for more information.

## 85 Release Notes

@@ -33,7 +33,7 @@ case object AzureCredentials {
new DefaultAzureCredentialBuilder()
.authorityHost(azureProfile.getEnvironment.getActiveDirectoryEndpoint)

def getAccessToken(identityClientId: Option[String]): ErrorOr[String] = {
def getAccessToken(identityClientId: Option[String] = None): ErrorOr[String] = {
val credentials = identityClientId.foldLeft(defaultCredentialBuilder) {
(builder, clientId) => builder.managedIdentityClientId(clientId)
}.build()
14 changes: 14 additions & 0 deletions core/src/main/resources/reference.conf
@@ -626,3 +626,17 @@ ga4gh {
contact-info-url = "https://cromwell.readthedocs.io/en/stable/"
}
}

workflow-state-callback {
enabled: false
# endpoint: "http://example.com/foo" # Can be overridden in workflow options
# auth.azure: true
# Users can override default retry behavior if desired
# request-backoff {
# min: "3 seconds"
# max: "5 minutes"
# multiplier: 1.1
# }
# max-retries = 10

}
1 change: 1 addition & 0 deletions core/src/main/scala/cromwell/core/WorkflowOptions.scala
@@ -62,6 +62,7 @@ object WorkflowOptions {
case object WorkflowFailureMode extends WorkflowOption("workflow_failure_mode")
case object UseReferenceDisks extends WorkflowOption("use_reference_disks")
case object MemoryRetryMultiplier extends WorkflowOption("memory_retry_multiplier")
case object WorkflowCallbackUri extends WorkflowOption("workflow_callback_uri")

private lazy val WorkflowOptionsConf = ConfigFactory.load.getConfig("workflow-options")
private lazy val EncryptedFields: Seq[String] = WorkflowOptionsConf.getStringList("encrypted-fields").asScala.toList
55 changes: 55 additions & 0 deletions docs/cromwell_features/WorkflowCallback.md
@@ -0,0 +1,55 @@
The workflow callback is a simple way to integrate Cromwell with an external system. When each workflow reaches a terminal
state, Cromwell will POST a message to a provided URL (see below for schema of this message). Messages are sent for root
workflows only, not subworkflows.
Contributor

One last thought - maybe include the phrase "best effort" here to make it clear that Cromwell will try, but makes no guarantees about delivery


### Configuration

This feature will only be used if enabled via config. All config items except `enabled` are optional.

```
workflow-state-callback {
Contributor

Can we include where in the config file this should go? Top level? In engine or service? Somewhere else? (meanwhile, I'm off to look through the code to try to find out 😄 )

Contributor

Seems like the answer is: yes, top level

enabled: true
endpoint: "http://example.com"
auth.azure: true
request-backoff {
min: "3 seconds",
max: "5 minutes",
multiplier: 1.1
}
max-retries = 10
}
```

* `enabled`: This boolean controls whether a callback will be attempted or not.
* `endpoint`: This is the default URL to send the message to. If this is unset, and no URL is set in workflow options, no callback will be sent.
* `auth.azure`: If true, and if Cromwell is running in an Azure environment, Cromwell will include an auth header with a bearer token generated from local Azure credentials.
* `request-backoff` and `max-retries`: Include these to override the default retry behavior (default behavior shown here).
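
As a rough illustration of what the `request-backoff` and `max-retries` settings describe, the sketch below computes a capped exponential schedule of delays. It is only a sketch under assumptions: `CallbackBackoffSketch` and `delays` are hypothetical names, no jitter is applied, and Cromwell's own backoff implementation (and whether `max-retries` counts retries or total attempts) may differ.

```scala
import scala.concurrent.duration._

// Hypothetical helper, not part of Cromwell: illustrates a capped exponential backoff.
object CallbackBackoffSketch {

  /** Delay before each retry: starts at `min`, grows by `multiplier`, never exceeds `max`. */
  def delays(min: FiniteDuration,
             max: FiniteDuration,
             multiplier: Double,
             maxRetries: Int): List[FiniteDuration] =
    List
      .iterate(min.toMillis.toDouble, maxRetries)(_ * multiplier) // uncapped delays in ms
      .map(ms => math.min(ms, max.toMillis.toDouble).round.millis) // apply the cap
}
```

With the default values shown above (`3 seconds`, `5 minutes`, `1.1`, `10`), this schedule starts at 3 seconds and grows to roughly 7 seconds by the tenth delay, so the 5 minute cap only matters for larger multipliers or retry counts.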

### Workflow Options

You may choose to override the `endpoint` set in config by including this workflow option:
```
{
"workflow_callback_uri": "http://mywebsite.com"
}
```
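
The precedence between the config `endpoint` and the `workflow_callback_uri` workflow option can be summarized in a few lines. This is a minimal sketch of the documented behavior, not the actual implementation; `CallbackUriSketch` and `resolve` are hypothetical names.

```scala
// Hypothetical helper, not part of Cromwell: documents the URI precedence rules.
object CallbackUriSketch {

  /** Returns the URI to POST to, or None when no callback should be sent. */
  def resolve(enabled: Boolean,
              workflowOptionUri: Option[String],
              configEndpoint: Option[String]): Option[String] =
    if (enabled) workflowOptionUri.orElse(configEndpoint) // workflow option wins over config
    else None                                             // feature disabled: never call back
}
```

For example, `CallbackUriSketch.resolve(enabled = true, Some("http://mywebsite.com"), Some("http://example.com"))` picks the workflow option, while leaving both unset yields `None` and no callback is sent.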

### Callback schema

Below is an example of a callback request body.

```
{
"workflowId": "00001111-2222-3333-4444-555566667777",
"state": "Succeeded",
"outputs": {
"task1.out": 5,
"task2.out": "/some/file.txt"
}
}
```

* `workflowId`: The UUID of the workflow
* `state`: The terminal state of the workflow. The list of possible values is: `Succeeded`, `Failed`, `Aborted`
* `outputs`: The final outputs of the workflow, as would be returned from the `api/workflows/{version}/{id}/outputs` endpoint. This field will ONLY be included when the workflow was successful.
* `failures`: A list of strings describing the workflow's failures. This field will ONLY be included if the workflow failed.
Contributor

FWIW it might be easier for auto-generated parsers if this is always included but empty unless needed. Same with outputs

Collaborator Author

Sure, we can do that!
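
For readers consuming the callback, the field rules in the schema documentation above could be modeled roughly as follows. This is a sketch under assumptions: `CallbackMessageSketch`, `CallbackMessage`, and `build` are hypothetical names, output values are simplified to strings, and it follows the docs as written in this diff (fields omitted when not applicable). Given the review thread above, the final schema may instead always include both fields.

```scala
// Hypothetical model, not Cromwell's actual classes.
object CallbackMessageSketch {

  // Terminal states listed in the documentation.
  sealed trait TerminalState
  case object Succeeded extends TerminalState
  case object Failed extends TerminalState
  case object Aborted extends TerminalState

  // Output values are simplified to strings for illustration.
  final case class CallbackMessage(workflowId: String,
                                   state: TerminalState,
                                   outputs: Option[Map[String, String]],
                                   failures: Option[List[String]])

  /** Includes `outputs` only on success and `failures` only on failure. */
  def build(workflowId: String,
            state: TerminalState,
            finalOutputs: Map[String, String],
            failureMessages: List[String]): CallbackMessage =
    CallbackMessage(
      workflowId,
      state,
      outputs = if (state == Succeeded) Some(finalOutputs) else None,
      failures = if (state == Failed) Some(failureMessages) else None
    )
}
```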

@@ -1,7 +1,6 @@
package cromwell.engine.workflow

import java.util.concurrent.atomic.AtomicInteger

import akka.actor.SupervisorStrategy.Stop
import akka.actor._
import com.typesafe.config.Config
@@ -23,6 +22,7 @@ import cromwell.engine.workflow.lifecycle.deletion.DeleteWorkflowFilesActor
import cromwell.engine.workflow.lifecycle.deletion.DeleteWorkflowFilesActor.{DeleteWorkflowFilesFailedResponse, DeleteWorkflowFilesSucceededResponse, StartWorkflowFilesDeletion}
import cromwell.engine.workflow.lifecycle.execution.WorkflowExecutionActor
import cromwell.engine.workflow.lifecycle.execution.WorkflowExecutionActor._
import cromwell.engine.workflow.lifecycle.finalization.WorkflowCallbackActor.PerformCallbackCommand
import cromwell.engine.workflow.lifecycle.finalization.WorkflowFinalizationActor.{StartFinalizationCommand, WorkflowFinalizationFailedResponse, WorkflowFinalizationSucceededResponse}
import cromwell.engine.workflow.lifecycle.finalization.{CopyWorkflowLogsActor, CopyWorkflowOutputsActor, WorkflowFinalizationActor}
import cromwell.engine.workflow.lifecycle.initialization.WorkflowInitializationActor
@@ -52,6 +52,7 @@ object WorkflowActor {
final case class AbortWorkflowWithExceptionCommand(exception: Throwable) extends WorkflowActorCommand
case object SendWorkflowHeartbeatCommand extends WorkflowActorCommand
case object AwaitMetadataIntegrity
case class PerformWorkflowCallback(uri: Option[String], workflowState: WorkflowState)
Collaborator

One could imagine a Map[WorkflowState, String] but definitely premature generalization for now


case class WorkflowFailedResponse(workflowId: WorkflowId, inState: WorkflowActorState, reasons: Seq[Throwable])

@@ -149,7 +150,7 @@ object WorkflowActor {
initializationData: AllBackendInitializationData,
lastStateReached: StateCheckpoint,
effectiveStartableState: StartableState,
workflowFinalOutputs: Set[WomValue] = Set.empty,
workflowFinalOutputs: Option[CallOutputs] = None,
Collaborator Author

This change was made so that the PerformWorkflowCallback event handling has access to the final CallOutputs, which it needs to create its POST request. Previously it had access to their values but not their labels/names.

workflowAllOutputs: Set[WomValue] = Set.empty,
rootAndSubworkflowIds: Set[WorkflowId] = Set.empty,
failedInitializationAttempts: Int = 0)
@@ -176,6 +177,7 @@ object WorkflowActor {
ioActor: ActorRef,
serviceRegistryActor: ActorRef,
workflowLogCopyRouter: ActorRef,
workflowCallbackActor: Option[ActorRef],
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
@@ -199,6 +201,7 @@ object WorkflowActor {
ioActor = ioActor,
serviceRegistryActor = serviceRegistryActor,
workflowLogCopyRouter = workflowLogCopyRouter,
workflowCallbackActor = workflowCallbackActor,
jobStoreActor = jobStoreActor,
subWorkflowStoreActor = subWorkflowStoreActor,
callCacheReadActor = callCacheReadActor,
@@ -226,6 +229,7 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
ioActor: ActorRef,
override val serviceRegistryActor: ActorRef,
workflowLogCopyRouter: ActorRef,
workflowCallbackActor: Option[ActorRef],
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
@@ -516,6 +520,17 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
stay()
case Event(AwaitMetadataIntegrity, data) =>
goto(MetadataIntegrityValidationState) using data.copy(lastStateReached = data.lastStateReached.copy(state = stateName))
case Event(PerformWorkflowCallback(uri, workflowState), data) =>
workflowCallbackActor.foreach { wca =>
wca ! PerformCallbackCommand(
workflowId,
uri,
workflowState,
data.workflowFinalOutputs.getOrElse(CallOutputs.empty),
data.lastStateReached.failures.toList.flatMap(_.map(_.getMessage))
)
}
stay()
Contributor

TOL: Is it worth putting this handler into the state we expect this to happen from, and having a separate PerformingWorkflowCallback state to record that this is happening?

}

onTransition {
@@ -534,26 +549,28 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
case _ => // The WMA is waiting for the WorkflowActorWorkComplete message. No extra information needed here.
}

// Copy/Delete workflow logs
Collaborator Author

This section is just a copy/paste of this code to move the brute force workflow options up in scope (outside `if (WorkflowLogger.isEnabled)`).

if (WorkflowLogger.isEnabled) {
/*
* The submitted workflow options have been previously validated by the CromwellApiHandler. These are
* being recreated so that in case MaterializeWorkflowDescriptor fails, the workflow logs can still
* be copied by accessing the workflow options outside of the EngineWorkflowDescriptor.
*/
def bruteForceWorkflowOptions: WorkflowOptions = sources.workflowOptions
val system = context.system
val ec = context.system.dispatcher
def bruteForcePathBuilders: Future[List[PathBuilder]] = {
// Protect against path builders that may throw an exception instead of returning a failed future
Future(EngineFilesystems.pathBuildersForWorkflow(bruteForceWorkflowOptions, pathBuilderFactories)(system))(ec).flatten
}
/*
* The submitted workflow options have been previously validated by the CromwellApiHandler. These are
* being recreated so that in case MaterializeWorkflowDescriptor fails, the workflow logs can still
* be copied by accessing the workflow options outside of the EngineWorkflowDescriptor. Used for both
* copying workflow log and sending workflow callback.
*/
def bruteForceWorkflowOptions: WorkflowOptions = sources.workflowOptions

val system = context.system
val ec = context.system.dispatcher
def bruteForcePathBuilders: Future[List[PathBuilder]] = {
// Protect against path builders that may throw an exception instead of returning a failed future
Future(EngineFilesystems.pathBuildersForWorkflow(bruteForceWorkflowOptions, pathBuilderFactories)(system))(ec).flatten
}

val (workflowOptions, pathBuilders) = stateData.workflowDescriptor match {
case Some(wd) => (wd.backendDescriptor.workflowOptions, Future.successful(wd.pathBuilders))
case None => (bruteForceWorkflowOptions, bruteForcePathBuilders)
}
val (workflowOptions, pathBuilders) = stateData.workflowDescriptor match {
case Some(wd) => (wd.backendDescriptor.workflowOptions, Future.successful(wd.pathBuilders))
case None => (bruteForceWorkflowOptions, bruteForcePathBuilders)
}

// Copy/Delete workflow logs
if (WorkflowLogger.isEnabled) {
workflowOptions.get(FinalWorkflowLogDir).toOption match {
case Some(destinationDir) =>
pathBuilders
@@ -566,6 +583,10 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
}
}

// Attempt to perform workflow completion callback
val callbackUri = workflowOptions.get(WorkflowOptions.WorkflowCallbackUri).toOption
self ! PerformWorkflowCallback(callbackUri, terminalState.workflowState)
Collaborator Author

I initially implemented this as part of finalization (you can see that working in earlier commits) but that turned out not to work well. Workflows are still Running, and may still fail, during finalization. Also, if a workflow fails very early in its lifecycle we may not ever perform finalization. Putting it here, at the moment of transition to terminal state, seemed right.

I wanted to send a message directly to the WorkflowCallbackActor from here, but couldn't find a way to get access to the final workflow outputs. That's why I send an event to this actor, and get them from the data where the event is handled. If there's a way to get the actor data here I could simplify this a little.

Contributor

There might be a stateData field that you can use to access this directly. The data in the Event is as much for filtering whether to do an action or not as it is to provide access to the data.


// We can't transition from within another transition function, but we can instruct ourselves to with a message:
self ! AwaitMetadataIntegrity

@@ -627,7 +648,7 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
rootWorkflowId = rootWorkflowId,
rootWorkflowRootPaths = data.initializationData.getWorkflowRoots(),
rootAndSubworkflowIds = data.rootAndSubworkflowIds,
workflowFinalOutputs = data.workflowFinalOutputs,
workflowFinalOutputs = data.workflowFinalOutputs.map(out => out.outputs.values.toSet).getOrElse(Set.empty),
workflowAllOutputs = data.workflowAllOutputs,
pathBuilders = data.workflowDescriptor.get.pathBuilders,
serviceRegistryActor = serviceRegistryActor,
@@ -689,7 +710,7 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
finalizationActor ! StartFinalizationCommand
goto(FinalizingWorkflowState) using data.copy(
lastStateReached = StateCheckpoint (lastStateOverride.getOrElse(stateName), failures),
workflowFinalOutputs = workflowFinalOutputs.outputs.values.toSet,
workflowFinalOutputs = Option(workflowFinalOutputs),
workflowAllOutputs = workflowAllOutputs,
rootAndSubworkflowIds = rootAndSubworkflowIds
)
@@ -57,6 +57,7 @@ object WorkflowManagerActor {
ioActor: ActorRef,
serviceRegistryActor: ActorRef,
workflowLogCopyRouter: ActorRef,
workflowCallbackActor: Option[ActorRef],
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
@@ -75,6 +76,7 @@ object WorkflowManagerActor {
ioActor = ioActor,
serviceRegistryActor = serviceRegistryActor,
workflowLogCopyRouter = workflowLogCopyRouter,
workflowCallbackActor = workflowCallbackActor,
jobStoreActor = jobStoreActor,
subWorkflowStoreActor = subWorkflowStoreActor,
callCacheReadActor = callCacheReadActor,
@@ -124,6 +126,7 @@ case class WorkflowManagerActorParams(config: Config,
ioActor: ActorRef,
serviceRegistryActor: ActorRef,
workflowLogCopyRouter: ActorRef,
workflowCallbackActor: Option[ActorRef],
jobStoreActor: ActorRef,
subWorkflowStoreActor: ActorRef,
callCacheReadActor: ActorRef,
@@ -313,6 +316,7 @@ class WorkflowManagerActor(params: WorkflowManagerActorParams)
ioActor = params.ioActor,
serviceRegistryActor = params.serviceRegistryActor,
workflowLogCopyRouter = params.workflowLogCopyRouter,
workflowCallbackActor = params.workflowCallbackActor,
jobStoreActor = params.jobStoreActor,
subWorkflowStoreActor = params.subWorkflowStoreActor,
callCacheReadActor = params.callCacheReadActor,