diff --git a/CHANGELOG.md b/CHANGELOG.md index 526c244da03..76746d972c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,11 @@ WDL `size` engine function now works for HTTP files. Cromwell can now send logs to Azure Application Insights. To enable, set environment variable `APPLICATIONINSIGHTS_INSTRUMENTATIONKEY` to your account's key. [See here for information.](https://learn.microsoft.com/en-us/azure/azure-monitor/app/sdk-connection-string) +### Workflow Completion Callback + +Cromwell can be configured to send a POST request to a specified URL when a workflow completes. The request body +includes the workflow id, terminal state, and (if applicable) final outputs or error message. +See `WorkflowCallback` in [ReadTheDocs](https://cromwell.readthedocs.io/en/stable/) for more information. ## 85 Release Notes diff --git a/cloudSupport/src/main/scala/cromwell/cloudsupport/azure/AzureCredentials.scala b/cloudSupport/src/main/scala/cromwell/cloudsupport/azure/AzureCredentials.scala index c29155056a9..200b162c614 100644 --- a/cloudSupport/src/main/scala/cromwell/cloudsupport/azure/AzureCredentials.scala +++ b/cloudSupport/src/main/scala/cromwell/cloudsupport/azure/AzureCredentials.scala @@ -33,7 +33,7 @@ case object AzureCredentials { new DefaultAzureCredentialBuilder() .authorityHost(azureProfile.getEnvironment.getActiveDirectoryEndpoint) - def getAccessToken(identityClientId: Option[String]): ErrorOr[String] = { + def getAccessToken(identityClientId: Option[String] = None): ErrorOr[String] = { val credentials = identityClientId.foldLeft(defaultCredentialBuilder) { (builder, clientId) => builder.managedIdentityClientId(clientId) }.build() diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index d2cc9c5171f..6ec05cf6025 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -626,3 +626,19 @@ ga4gh { contact-info-url = "https://cromwell.readthedocs.io/en/stable/" } } + +workflow-state-callback { + enabled: false + ## The number of threads to allocate for performing callbacks + # num-threads: 5 + # endpoint: "http://example.com/foo" # Can be overridden in workflow options + # auth.azure: true + ## Users can override default retry behavior if desired + # request-backoff { + # min: "3 seconds" + # max: "5 minutes" + # multiplier: 1.1 + # } + # max-retries = 10 + +} diff --git a/core/src/main/scala/cromwell/core/WorkflowOptions.scala b/core/src/main/scala/cromwell/core/WorkflowOptions.scala index 010300b2d8b..91a7c30bbfe 100644 --- a/core/src/main/scala/cromwell/core/WorkflowOptions.scala +++ b/core/src/main/scala/cromwell/core/WorkflowOptions.scala @@ -62,6 +62,7 @@ object WorkflowOptions { case object WorkflowFailureMode extends WorkflowOption("workflow_failure_mode") case object UseReferenceDisks extends WorkflowOption("use_reference_disks") case object MemoryRetryMultiplier extends WorkflowOption("memory_retry_multiplier") + case object WorkflowCallbackUri extends WorkflowOption("workflow_callback_uri") private lazy val WorkflowOptionsConf = ConfigFactory.load.getConfig("workflow-options") private lazy val EncryptedFields: Seq[String] = WorkflowOptionsConf.getStringList("encrypted-fields").asScala.toList diff --git a/docs/cromwell_features/WorkflowCallback.md b/docs/cromwell_features/WorkflowCallback.md new file mode 100644 index 00000000000..275a6ede0a7 --- /dev/null +++ b/docs/cromwell_features/WorkflowCallback.md @@ -0,0 +1,58 @@ +The workflow callback is a simple way to integrate Cromwell with an external system. When each workflow reaches a terminal +state, Cromwell will attempt to POST a message to a provided URL (see below for schema of this message). +Messages are sent for root workflows only, not subworkflows. Callback status information, including success or failure, +will be recorded in workflow metadata with keys containing `workflowCallback`. + +### Configuration + +This feature will only be used if enabled via config. All config items except `enabled` are optional. + +``` +workflow-state-callback { + enabled: true + num-threads: 5 + endpoint: "http://example.com" + auth.azure: true + request-backoff { + min: "3 seconds", + max: "5 minutes", + multiplier: 1.1 + } + max-retries = 10 +} +``` + + * `enabled`: This boolean controls whether a callback will be attempted or not. + * `num-threads`: The number of threads Cromwell will allocate for performing callbacks. + * `endpoint`: This is the default URL to send the message to. If this is unset, and no URL is set in workflow options, no callback will be sent. + * `auth.azure`: If true, and if Cromwell is running in an Azure environment, Cromwell will include an auth header with bearer token generated from local Azure credentials. + * `request-backoff` and `max-retries`: Include these to override the default retry behavior (default behavior shown here). + +### Workflow Options + +You may choose to override the `endpoint` set in config by including this workflow option: +``` +{ + "workflow_callback_uri": "http://mywebsite.com" +} +``` + +### Callback schema + +Below is an example of a callback request body. + +``` +{ + "workflowId": "00001111-2222-3333-4444-555566667777", + "state": "Succeeded", + "outputs": { + "task1.out": 5, + "task2.out": "/some/file.txt" + } +} +``` + + * `workflowId`: The UUID of the workflow + * `state`: The terminal state of the workflow. The list of possible values is: `Succeeded`, `Failed`, `Aborted` + * `outputs`: The final outputs of the workflow, as would be returned from the `api/workflows/{version}/{id}/outputs` endpoint. Expected to be empty when the workflow is not successful.. + * `failures`: A list of strings describing the workflow's failures. Expected to be empty if the workflow did not fail. diff --git a/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala b/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala index b4ccf3de53b..3061e2e8a74 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala @@ -1,7 +1,6 @@ package cromwell.engine.workflow import java.util.concurrent.atomic.AtomicInteger - import akka.actor.SupervisorStrategy.Stop import akka.actor._ import com.typesafe.config.Config @@ -23,6 +22,7 @@ import cromwell.engine.workflow.lifecycle.deletion.DeleteWorkflowFilesActor import cromwell.engine.workflow.lifecycle.deletion.DeleteWorkflowFilesActor.{DeleteWorkflowFilesFailedResponse, DeleteWorkflowFilesSucceededResponse, StartWorkflowFilesDeletion} import cromwell.engine.workflow.lifecycle.execution.WorkflowExecutionActor import cromwell.engine.workflow.lifecycle.execution.WorkflowExecutionActor._ +import cromwell.engine.workflow.lifecycle.finalization.WorkflowCallbackActor.PerformCallbackCommand import cromwell.engine.workflow.lifecycle.finalization.WorkflowFinalizationActor.{StartFinalizationCommand, WorkflowFinalizationFailedResponse, WorkflowFinalizationSucceededResponse} import cromwell.engine.workflow.lifecycle.finalization.{CopyWorkflowLogsActor, CopyWorkflowOutputsActor, WorkflowFinalizationActor} import cromwell.engine.workflow.lifecycle.initialization.WorkflowInitializationActor @@ -149,7 +149,7 @@ object WorkflowActor { initializationData: AllBackendInitializationData, lastStateReached: StateCheckpoint, effectiveStartableState: StartableState, - workflowFinalOutputs: Set[WomValue] = Set.empty, + workflowFinalOutputs: Option[CallOutputs] = None, workflowAllOutputs: Set[WomValue] = Set.empty, rootAndSubworkflowIds: Set[WorkflowId] = Set.empty, failedInitializationAttempts: Int = 0) @@ -176,6 +176,7 @@ object WorkflowActor { ioActor: ActorRef, serviceRegistryActor: ActorRef, workflowLogCopyRouter: ActorRef, + workflowCallbackActor: Option[ActorRef], jobStoreActor: ActorRef, subWorkflowStoreActor: ActorRef, callCacheReadActor: ActorRef, @@ -199,6 +200,7 @@ object WorkflowActor { ioActor = ioActor, serviceRegistryActor = serviceRegistryActor, workflowLogCopyRouter = workflowLogCopyRouter, + workflowCallbackActor = workflowCallbackActor, jobStoreActor = jobStoreActor, subWorkflowStoreActor = subWorkflowStoreActor, callCacheReadActor = callCacheReadActor, @@ -226,6 +228,7 @@ class WorkflowActor(workflowToStart: WorkflowToStart, ioActor: ActorRef, override val serviceRegistryActor: ActorRef, workflowLogCopyRouter: ActorRef, + workflowCallbackActor: Option[ActorRef], jobStoreActor: ActorRef, subWorkflowStoreActor: ActorRef, callCacheReadActor: ActorRef, @@ -534,26 +537,27 @@ class WorkflowActor(workflowToStart: WorkflowToStart, case _ => // The WMA is waiting for the WorkflowActorWorkComplete message. No extra information needed here. } - // Copy/Delete workflow logs - if (WorkflowLogger.isEnabled) { - /* - * The submitted workflow options have been previously validated by the CromwellApiHandler. These are - * being recreated so that in case MaterializeWorkflowDescriptor fails, the workflow logs can still - * be copied by accessing the workflow options outside of the EngineWorkflowDescriptor. - */ - def bruteForceWorkflowOptions: WorkflowOptions = sources.workflowOptions - val system = context.system - val ec = context.system.dispatcher - def bruteForcePathBuilders: Future[List[PathBuilder]] = { - // Protect against path builders that may throw an exception instead of returning a failed future - Future(EngineFilesystems.pathBuildersForWorkflow(bruteForceWorkflowOptions, pathBuilderFactories)(system))(ec).flatten - } + /* + * The submitted workflow options have been previously validated by the CromwellApiHandler. These are + * being recreated so that even if the MaterializeWorkflowDescriptor fails, the workflow options can still be + * accessed outside of the EngineWorkflowDescriptor. Used for both copying workflow log and sending workflow callbacks. + */ + def bruteForceWorkflowOptions: WorkflowOptions = sources.workflowOptions + + val system = context.system + val ec = context.system.dispatcher + def bruteForcePathBuilders: Future[List[PathBuilder]] = { + // Protect against path builders that may throw an exception instead of returning a failed future + Future(EngineFilesystems.pathBuildersForWorkflow(bruteForceWorkflowOptions, pathBuilderFactories)(system))(ec).flatten + } - val (workflowOptions, pathBuilders) = stateData.workflowDescriptor match { - case Some(wd) => (wd.backendDescriptor.workflowOptions, Future.successful(wd.pathBuilders)) - case None => (bruteForceWorkflowOptions, bruteForcePathBuilders) - } + val (workflowOptions, pathBuilders) = stateData.workflowDescriptor match { + case Some(wd) => (wd.backendDescriptor.workflowOptions, Future.successful(wd.pathBuilders)) + case None => (bruteForceWorkflowOptions, bruteForcePathBuilders) + } + // Copy/Delete workflow logs + if (WorkflowLogger.isEnabled) { workflowOptions.get(FinalWorkflowLogDir).toOption match { case Some(destinationDir) => pathBuilders @@ -566,6 +570,18 @@ class WorkflowActor(workflowToStart: WorkflowToStart, } } + // Attempt to perform workflow completion callback + workflowCallbackActor.foreach { wca => + val callbackUri = workflowOptions.get(WorkflowOptions.WorkflowCallbackUri).toOption + wca ! PerformCallbackCommand( + workflowId, + callbackUri, + terminalState.workflowState, + stateData.workflowFinalOutputs.getOrElse(CallOutputs.empty), + nextStateData.lastStateReached.failures.toList.flatMap(_.map(_.getMessage)) + ) + } + // We can't transition from within another transition function, but we can instruct ourselves to with a message: self ! AwaitMetadataIntegrity @@ -627,7 +643,7 @@ class WorkflowActor(workflowToStart: WorkflowToStart, rootWorkflowId = rootWorkflowId, rootWorkflowRootPaths = data.initializationData.getWorkflowRoots(), rootAndSubworkflowIds = data.rootAndSubworkflowIds, - workflowFinalOutputs = data.workflowFinalOutputs, + workflowFinalOutputs = data.workflowFinalOutputs.map(out => out.outputs.values.toSet).getOrElse(Set.empty), workflowAllOutputs = data.workflowAllOutputs, pathBuilders = data.workflowDescriptor.get.pathBuilders, serviceRegistryActor = serviceRegistryActor, @@ -689,7 +705,7 @@ class WorkflowActor(workflowToStart: WorkflowToStart, finalizationActor ! StartFinalizationCommand goto(FinalizingWorkflowState) using data.copy( lastStateReached = StateCheckpoint (lastStateOverride.getOrElse(stateName), failures), - workflowFinalOutputs = workflowFinalOutputs.outputs.values.toSet, + workflowFinalOutputs = Option(workflowFinalOutputs), workflowAllOutputs = workflowAllOutputs, rootAndSubworkflowIds = rootAndSubworkflowIds ) diff --git a/engine/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala b/engine/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala index 15d0c4321a7..e84d9091ad0 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala @@ -57,6 +57,7 @@ object WorkflowManagerActor { ioActor: ActorRef, serviceRegistryActor: ActorRef, workflowLogCopyRouter: ActorRef, + workflowCallbackActor: Option[ActorRef], jobStoreActor: ActorRef, subWorkflowStoreActor: ActorRef, callCacheReadActor: ActorRef, @@ -75,6 +76,7 @@ object WorkflowManagerActor { ioActor = ioActor, serviceRegistryActor = serviceRegistryActor, workflowLogCopyRouter = workflowLogCopyRouter, + workflowCallbackActor = workflowCallbackActor, jobStoreActor = jobStoreActor, subWorkflowStoreActor = subWorkflowStoreActor, callCacheReadActor = callCacheReadActor, @@ -124,6 +126,7 @@ case class WorkflowManagerActorParams(config: Config, ioActor: ActorRef, serviceRegistryActor: ActorRef, workflowLogCopyRouter: ActorRef, + workflowCallbackActor: Option[ActorRef], jobStoreActor: ActorRef, subWorkflowStoreActor: ActorRef, callCacheReadActor: ActorRef, @@ -313,6 +316,7 @@ class WorkflowManagerActor(params: WorkflowManagerActorParams) ioActor = params.ioActor, serviceRegistryActor = params.serviceRegistryActor, workflowLogCopyRouter = params.workflowLogCopyRouter, + workflowCallbackActor = params.workflowCallbackActor, jobStoreActor = params.jobStoreActor, subWorkflowStoreActor = params.subWorkflowStoreActor, callCacheReadActor = params.callCacheReadActor, diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/finalization/WorkflowCallbackActor.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/finalization/WorkflowCallbackActor.scala new file mode 100644 index 00000000000..516184d023a --- /dev/null +++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/finalization/WorkflowCallbackActor.scala @@ -0,0 +1,216 @@ +package cromwell.engine.workflow.lifecycle.finalization + +import akka.Done +import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props} +import akka.event.LoggingReceive +import akka.http.scaladsl.Http +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.marshalling.Marshal +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers.RawHeader +import akka.routing.Broadcast +import akka.util.ByteString +import cats.data.Validated.{Invalid, Valid} +import cats.implicits.toTraverseOps +import com.typesafe.config.Config +import com.typesafe.scalalogging.LazyLogging +import common.validation.ErrorOr +import cromwell.cloudsupport.azure.AzureCredentials +import cromwell.core.Dispatcher.IoDispatcher +import cromwell.core.retry.Retry.withRetry +import cromwell.core.retry.SimpleExponentialBackoff +import cromwell.core.{CallOutputs, WorkflowId, WorkflowState} +import cromwell.engine.workflow.lifecycle.finalization.WorkflowCallbackActor.PerformCallbackCommand +import cromwell.engine.workflow.lifecycle.finalization.WorkflowCallbackJsonSupport._ +import net.ceedubs.ficus.Ficus._ + +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.DurationInt +import scala.util.Try +import cromwell.services.metadata.MetadataService.PutMetadataAction +import cromwell.services.metadata.{MetadataEvent, MetadataKey, MetadataValue} +import cromwell.util.GracefulShutdownHelper.ShutdownCommand + +import java.net.URI +import java.time.Instant +import java.util.concurrent.Executors +import scala.util.{Failure, Success} + + +case class WorkflowCallbackConfig(enabled: Boolean, + numThreads: Int, + retryBackoff: SimpleExponentialBackoff, + maxRetries: Int, + defaultUri: Option[URI], // May be overridden by workflow options + authMethod: Option[WorkflowCallbackConfig.AuthMethod]) + +object WorkflowCallbackConfig extends LazyLogging { + sealed trait AuthMethod { def getAccessToken: ErrorOr.ErrorOr[String] } + case object AzureAuth extends AuthMethod { + override def getAccessToken: ErrorOr.ErrorOr[String] = AzureCredentials.getAccessToken() + } + + private lazy val defaultNumThreads = 5 + private lazy val defaultRetryBackoff = SimpleExponentialBackoff(3.seconds, 5.minutes, 1.1) + private lazy val defaultMaxRetries = 10 + + def empty: WorkflowCallbackConfig = WorkflowCallbackConfig( + false, defaultNumThreads, defaultRetryBackoff, defaultMaxRetries, None, None + ) + + def apply(config: Config): WorkflowCallbackConfig = { + val enabled = config.as[Boolean]("enabled") + val numThreads = config.as[Option[Int]]("num-threads").getOrElse(defaultNumThreads) + val backoff = config.as[Option[Config]]("request-backoff").map(SimpleExponentialBackoff(_)).getOrElse(defaultRetryBackoff) + val maxRetries = config.as[Option[Int]]("max-retries").getOrElse(defaultMaxRetries) + val uri = config.as[Option[String]]("endpoint").flatMap(createAndValidateUri) + + val authMethod = if (config.hasPath("auth.azure")) { + Option(AzureAuth) + } else None + + WorkflowCallbackConfig( + enabled = enabled, + numThreads = numThreads, + retryBackoff = backoff, + maxRetries = maxRetries, + defaultUri = uri, + authMethod = authMethod + ) + } + + def createAndValidateUri(uriString: String): Option[URI] = { + Try(new URI(uriString)) match { + case Success(uri) => Option(uri) + case Failure(err) => + logger.warn(s"Failed to parse provided workflow callback URI (${uriString}): $err") + None + } + } +} + +/** + * The WorkflowCallbackActor is responsible for sending a message on workflow completion to a configured endpoint. + * This allows for users to build automation around workflow completion without needing to poll Cromwell for + * workflow status. + */ +object WorkflowCallbackActor { + + final case class PerformCallbackCommand(workflowId: WorkflowId, + uri: Option[String], + terminalState: WorkflowState, + workflowOutputs: CallOutputs, + failureMessage: List[String]) + + def props(serviceRegistryActor: ActorRef, + callbackConfig: WorkflowCallbackConfig, + httpClient: CallbackHttpHandler = CallbackHttpHandlerImpl + ) = Props( + new WorkflowCallbackActor( + serviceRegistryActor, + callbackConfig, + httpClient) + ).withDispatcher(IoDispatcher) +} + +class WorkflowCallbackActor(serviceRegistryActor: ActorRef, + config: WorkflowCallbackConfig, + httpClient: CallbackHttpHandler) + extends Actor with ActorLogging { + + // Create a dedicated thread pool for this actor so its damage is limited if we end up with + // too many threads all taking a long time to do callbacks. If we're frequently saturating + // this thread pool, consider refactoring this actor to maintain a queue of callbacks and + // handle them one at a time, as opposed to starting a thread to perform each as soon as its + // received. + implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(config.numThreads)) + implicit val system: ActorSystem = context.system + + override def receive: Actor.Receive = LoggingReceive { + case PerformCallbackCommand(workflowId, requestedCallbackUri, terminalState, outputs, failures) => + // If no uri was provided to us here, fall back to the one in config. If there isn't + // one there, do not perform a callback. + val callbackUri: Option[URI] = requestedCallbackUri.map(WorkflowCallbackConfig.createAndValidateUri).getOrElse(config.defaultUri) + callbackUri.map { uri => + performCallback(workflowId, uri, terminalState, outputs, failures) onComplete { + case Success(_) => + log.info(s"Successfully sent callback for workflow for workflow $workflowId in state $terminalState to $uri") + sendMetadata(workflowId, successful = true, uri) + case Failure(t) => + log.warning(s"Permanently failed to send callback for workflow $workflowId in state $terminalState to $uri: ${t.getMessage}") + sendMetadata(workflowId, successful = false, uri) + } + }.getOrElse(()) + case Broadcast(ShutdownCommand) | ShutdownCommand => context stop self + case other => log.warning(s"WorkflowCallbackActor received an unexpected message: $other") + } + + private def makeHeaders: Future[List[HttpHeader]] = { + config.authMethod.toList.map(_.getAccessToken).map { + case Valid(header) => Future.successful(header) + case Invalid(err) => Future.failed(new RuntimeException(err.toString)) + } + .map(t => t.map(t => RawHeader("Authorization", s"Bearer $t"))) + .traverse(identity) + } + + private def performCallback(workflowId: WorkflowId, callbackUri: URI, terminalState: WorkflowState, outputs: CallOutputs, failures: List[String]): Future[Done] = { + val callbackPostBody = CallbackMessage(workflowId.toString, terminalState.toString, outputs.outputs.map(entry => (entry._1.name, entry._2)), failures) + for { + entity <- Marshal(callbackPostBody).to[RequestEntity] + headers <- makeHeaders + request = HttpRequest(method = HttpMethods.POST, uri = callbackUri.toString, entity = entity).withHeaders(headers) + response <- withRetry( + () => sendRequestOrFail(request), + backoff = config.retryBackoff, + maxRetries = Option(config.maxRetries), + onRetry = err => log.warning(s"Will retry after failure to send workflow callback for workflow $workflowId in state $terminalState to $callbackUri : $err") + ) + result <- + // Akka will get upset if we have a response body and leave it totally unread. + // Since there's nothing here we want to read, we need to deliberately discard it. + response.entity.discardBytes().future + } yield result + } + + private def sendRequestOrFail(request: HttpRequest): Future[HttpResponse] = + httpClient.sendRequest(request).flatMap(response => + if (response.status.isFailure()) { + response.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map(_.utf8String) flatMap { errorBody => + Future.failed( + new RuntimeException(s"HTTP ${response.status.value}: $errorBody") + ) + } + } else Future.successful(response) + ) + + private def sendMetadata(workflowId: WorkflowId, successful: Boolean, uri: URI): Unit = { + val events = List( + MetadataEvent( + MetadataKey(workflowId, None, "workflowCallback", "successful"), + MetadataValue(successful) + ), + MetadataEvent( + MetadataKey(workflowId, None, "workflowCallback", "url"), + MetadataValue(uri.toString) + ), + MetadataEvent( + MetadataKey(workflowId, None, "workflowCallback", "timestamp"), + MetadataValue(Instant.now()) + ) + ) + serviceRegistryActor ! PutMetadataAction(events) + } + +} + +// Wrap the http call in a trait so it can be easily mocked for testing +trait CallbackHttpHandler { + def sendRequest(httpRequest: HttpRequest)(implicit actorSystem: ActorSystem): Future[HttpResponse] +} + +object CallbackHttpHandlerImpl extends CallbackHttpHandler { + override def sendRequest(httpRequest: HttpRequest)(implicit actorSystem: ActorSystem): Future[HttpResponse] = { + Http().singleRequest(httpRequest) + } +} diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/finalization/WorkflowCallbackJsonSupport.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/finalization/WorkflowCallbackJsonSupport.scala new file mode 100644 index 00000000000..eb9b1c7ea9e --- /dev/null +++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/finalization/WorkflowCallbackJsonSupport.scala @@ -0,0 +1,14 @@ +package cromwell.engine.workflow.lifecycle.finalization + +import cromwell.util.JsonFormatting.WomValueJsonFormatter.WomValueJsonFormat +import spray.json.DefaultJsonProtocol +import wom.values.WomValue + +final case class CallbackMessage(workflowId: String, + state: String, + outputs: Map[String, WomValue], + failures: List[String]) + +object WorkflowCallbackJsonSupport extends DefaultJsonProtocol { + implicit val callbackMessageFormat = jsonFormat4(CallbackMessage) +} diff --git a/engine/src/main/scala/cromwell/server/CromwellRootActor.scala b/engine/src/main/scala/cromwell/server/CromwellRootActor.scala index df4fc5b1207..c37d6466fcc 100644 --- a/engine/src/main/scala/cromwell/server/CromwellRootActor.scala +++ b/engine/src/main/scala/cromwell/server/CromwellRootActor.scala @@ -20,7 +20,7 @@ import cromwell.engine.io.{IoActor, IoActorProxy} import cromwell.engine.workflow.WorkflowManagerActor import cromwell.engine.workflow.WorkflowManagerActor.AbortAllWorkflowsCommand import cromwell.engine.workflow.lifecycle.execution.callcaching.{CallCache, CallCacheReadActor, CallCacheWriteActor} -import cromwell.engine.workflow.lifecycle.finalization.CopyWorkflowLogsActor +import cromwell.engine.workflow.lifecycle.finalization.{CopyWorkflowLogsActor, WorkflowCallbackActor, WorkflowCallbackConfig} import cromwell.engine.workflow.tokens.{DynamicRateLimiter, JobTokenDispenserActor} import cromwell.engine.workflow.workflowstore.AbortRequestScanningActor.AbortConfig import cromwell.engine.workflow.workflowstore._ @@ -122,6 +122,18 @@ abstract class CromwellRootActor(terminator: CromwellTerminator, .props(CopyWorkflowLogsActor.props(serviceRegistryActor, ioActor)), "WorkflowLogCopyRouter") + private val workflowCallbackConfig = WorkflowCallbackConfig(config.getConfig("workflow-state-callback")) + + lazy val workflowCallbackActor: Option[ActorRef] = { + if (workflowCallbackConfig.enabled) { + val props = WorkflowCallbackActor.props( + serviceRegistryActor, + workflowCallbackConfig + ) + Option(context.actorOf(props, "WorkflowCallbackActor")) + } else None + } + //Call-caching config validation lazy val callCachingConfig = config.getConfig("call-caching") lazy val callCachingEnabled = callCachingConfig.getBoolean("enabled") @@ -174,6 +186,7 @@ abstract class CromwellRootActor(terminator: CromwellTerminator, ioActor = ioActorProxy, serviceRegistryActor = serviceRegistryActor, workflowLogCopyRouter = workflowLogCopyRouter, + workflowCallbackActor = workflowCallbackActor, jobStoreActor = jobStoreActor, subWorkflowStoreActor = subWorkflowStoreActor, callCacheReadActor = callCacheReadActor, @@ -218,6 +231,7 @@ abstract class CromwellRootActor(terminator: CromwellTerminator, actorSystem = context.system, workflowManagerActor = workflowManagerActor, logCopyRouter = workflowLogCopyRouter, + workflowCallbackActor = workflowCallbackActor, jobStoreActor = jobStoreActor, jobTokenDispenser = jobExecutionTokenDispenserActor, workflowStoreActor = workflowStoreActor, diff --git a/engine/src/main/scala/cromwell/server/CromwellShutdown.scala b/engine/src/main/scala/cromwell/server/CromwellShutdown.scala index 99e4041db56..ce204249b1e 100644 --- a/engine/src/main/scala/cromwell/server/CromwellShutdown.scala +++ b/engine/src/main/scala/cromwell/server/CromwellShutdown.scala @@ -1,7 +1,6 @@ package cromwell.server import java.util.concurrent.atomic.AtomicBoolean - import akka.Done import akka.actor._ import akka.http.scaladsl.Http @@ -83,6 +82,7 @@ object CromwellShutdown extends GracefulStopSupport { actorSystem: ActorSystem, workflowManagerActor: ActorRef, logCopyRouter: ActorRef, + workflowCallbackActor: Option[ActorRef], jobTokenDispenser: ActorRef, jobStoreActor: ActorRef, workflowStoreActor: ActorRef, @@ -180,6 +180,7 @@ object CromwellShutdown extends GracefulStopSupport { shutdownActor(workflowStoreActor, CoordinatedShutdown.PhaseServiceRequestsDone, ShutdownCommand) shutdownActor(logCopyRouter, CoordinatedShutdown.PhaseServiceRequestsDone, Broadcast(ShutdownCommand)) + workflowCallbackActor.foreach(wca => shutdownActor(wca, CoordinatedShutdown.PhaseServiceRequestsDone, Broadcast(ShutdownCommand))) shutdownActor(jobTokenDispenser, CoordinatedShutdown.PhaseServiceRequestsDone, ShutdownCommand) /* diff --git a/engine/src/test/scala/cromwell/engine/workflow/lifecycle/finalization/WorkflowCallbackActorSpec.scala b/engine/src/test/scala/cromwell/engine/workflow/lifecycle/finalization/WorkflowCallbackActorSpec.scala new file mode 100644 index 00000000000..c469d1dcaa2 --- /dev/null +++ b/engine/src/test/scala/cromwell/engine/workflow/lifecycle/finalization/WorkflowCallbackActorSpec.scala @@ -0,0 +1,278 @@ +package cromwell.engine.workflow.lifecycle.finalization + +import akka.testkit._ +import akka.http.scaladsl.client.RequestBuilding.Post +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.model.{HttpResponse, StatusCodes} +import akka.testkit.TestProbe +import common.mock.MockSugar +import cromwell.core.retry.SimpleExponentialBackoff +import org.mockito.Mockito._ +import cromwell.core.{CallOutputs, TestKitSuite, WorkflowFailed, WorkflowId, WorkflowSucceeded} +import cromwell.engine.workflow.lifecycle.finalization.WorkflowCallbackActor.PerformCallbackCommand +import cromwell.engine.workflow.lifecycle.finalization.WorkflowCallbackJsonSupport._ +import cromwell.services.metadata.MetadataService.PutMetadataAction +import cromwell.services.metadata.{MetadataEvent, MetadataKey, MetadataValue} +import cromwell.util.{GracefulShutdownHelper, WomMocks} +import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.matchers.should.Matchers +import wom.values.WomString + +import java.net.URI +import java.time.Instant +import scala.concurrent.duration._ +import scala.concurrent.Future + +class WorkflowCallbackActorSpec + extends TestKitSuite with AnyFlatSpecLike with Matchers with MockSugar { + + behavior of "WorkflowCallbackActor" + + private implicit val ec = system.dispatcher + + private val msgWait = 10.second.dilated + private val awaitAlmostNothing = 1.second + private val serviceRegistryActor = TestProbe("testServiceRegistryActor") + private val deathWatch = TestProbe("deathWatch") + private val mockUri = new URI("http://example.com") + private val basicConfig = WorkflowCallbackConfig.empty.copy(enabled = true) + private val basicOutputs = WomMocks.mockOutputExpectations(List("foo" -> WomString("bar")).toMap) + + private val httpSuccess = Future.successful(HttpResponse.apply(StatusCodes.OK)) + private val httpFailure = Future.successful(HttpResponse.apply(StatusCodes.GatewayTimeout)) + + private def metadataEvents(workflowId: WorkflowId, successful: Boolean) = ( + MetadataEvent( + MetadataKey(workflowId, None, "workflowCallback", "successful"), + MetadataValue(successful) + ), + MetadataEvent( + MetadataKey(workflowId, None, "workflowCallback", "url"), + MetadataValue(mockUri.toString) + ), + MetadataEvent( + MetadataKey(workflowId, None, "workflowCallback", "timestamp"), + MetadataValue(Instant.now()) + ) + ) + + it should "send a command to the default URI and record the correct metadata" in { + // Setup + val workflowId = WorkflowId.randomId() + val mockHttpClient = mock[CallbackHttpHandler] + + val expectedPostBody = CallbackMessage( + workflowId.toString, + WorkflowSucceeded.toString, + basicOutputs.outputs.map(entry => (entry._1.name, entry._2)), + List.empty + ) + val expectedRequest = Post(mockUri.toString, expectedPostBody) + val (expectedResultMetadata, expectedUriMetadata, expectedTimestampMetadata) = metadataEvents(workflowId, true) + + when(mockHttpClient.sendRequest(expectedRequest)).thenReturn(httpSuccess) + + val props = WorkflowCallbackActor.props( + serviceRegistryActor.ref, + basicConfig.copy(defaultUri = Option(mockUri)), + httpClient = mockHttpClient + ) + val workflowCallbackActor = system.actorOf(props, "testWorkflowCallbackActorSuccess") + + // Do the thing + val cmd = PerformCallbackCommand( + workflowId = workflowId, uri = None, terminalState = WorkflowSucceeded, workflowOutputs = basicOutputs, List.empty + ) + workflowCallbackActor ! cmd + + // Check the result + serviceRegistryActor.expectMsgPF(msgWait) { + case PutMetadataAction(List(resultEvent, uriEvent, timestampEvent), _) => + resultEvent.key shouldBe expectedResultMetadata.key + resultEvent.value shouldBe expectedResultMetadata.value + uriEvent.key shouldBe expectedUriMetadata.key + uriEvent.value shouldBe expectedUriMetadata.value + timestampEvent.key shouldBe expectedTimestampMetadata.key + // Not checking timestamp value because it won't match + case _ => + } + + verify(mockHttpClient, times(1)).sendRequest(expectedRequest) + + // Shut it down + deathWatch.watch(workflowCallbackActor) + workflowCallbackActor ! GracefulShutdownHelper.ShutdownCommand + deathWatch.expectTerminated(workflowCallbackActor, msgWait) + + } + + it should "correctly handle a callback that fails, then succeeds" in { + // Setup + val workflowId = WorkflowId.randomId() + val mockHttpClient = mock[CallbackHttpHandler] + + val expectedPostBody = CallbackMessage( + workflowId.toString, + WorkflowSucceeded.toString, + basicOutputs.outputs.map(entry => (entry._1.name, entry._2)), + List.empty + ) + val expectedRequest = Post(mockUri.toString, expectedPostBody) + + val (expectedResultMetadata, expectedUriMetadata, expectedTimestampMetadata) = metadataEvents(workflowId, true) + + when(mockHttpClient.sendRequest(expectedRequest)).thenReturn(httpFailure, httpFailure, httpSuccess) + + val props = WorkflowCallbackActor.props( + serviceRegistryActor.ref, + basicConfig.copy(defaultUri = Option(mockUri)), + httpClient = mockHttpClient + ) + val workflowCallbackActor = system.actorOf(props, "testWorkflowCallbackActorFailThenSuccees") + + // Do the thing + val cmd = PerformCallbackCommand( + workflowId = workflowId, uri = None, terminalState = WorkflowSucceeded, workflowOutputs = basicOutputs, List.empty + ) + workflowCallbackActor ! cmd + + // Check the result + serviceRegistryActor.expectMsgPF(msgWait) { + case PutMetadataAction(List(resultEvent, uriEvent, timestampEvent), _) => + resultEvent.key shouldBe expectedResultMetadata.key + resultEvent.value shouldBe expectedResultMetadata.value + uriEvent.key shouldBe expectedUriMetadata.key + uriEvent.value shouldBe expectedUriMetadata.value + timestampEvent.key shouldBe expectedTimestampMetadata.key + // Not checking timestamp value because it won't match + case _ => + } + + verify(mockHttpClient, times(3)).sendRequest(expectedRequest) + + // Shut it down + deathWatch.watch(workflowCallbackActor) + workflowCallbackActor ! GracefulShutdownHelper.ShutdownCommand + deathWatch.expectTerminated(workflowCallbackActor, msgWait) + + } + + it should "retry sending the callback for a failing workflow before failing" in { + // Setup + val workflowId = WorkflowId.randomId() + val mockHttpClient = mock[CallbackHttpHandler] + + val errorMsg = "Something bad happened :(" + val expectedPostBody = CallbackMessage( + workflowId.toString, + WorkflowFailed.toString, + Map.empty, + List(errorMsg) + ) + val expectedRequest = Post(mockUri.toString, expectedPostBody) + + val (expectedResultMetadata, expectedUriMetadata, expectedTimestampMetadata) = metadataEvents(workflowId, false) + + when(mockHttpClient.sendRequest(expectedRequest)).thenReturn(httpFailure) + + val props = WorkflowCallbackActor.props( + serviceRegistryActor.ref, + basicConfig.copy( + retryBackoff = SimpleExponentialBackoff(500.millis, 1.minute, 1.1), + maxRetries = 5, + ), + httpClient = mockHttpClient + ) + val workflowCallbackActor = system.actorOf(props, "testWorkflowCallbackActorFail") + + // Do the thing + val cmd = PerformCallbackCommand( + workflowId = workflowId, + uri = Option(mockUri.toString), + terminalState = WorkflowFailed, + workflowOutputs = CallOutputs.empty, + List(errorMsg) + ) + workflowCallbackActor ! cmd + + // Check the result + serviceRegistryActor.expectMsgPF(msgWait) { + case PutMetadataAction(List(resultEvent, uriEvent, timestampEvent), _) => + resultEvent.key shouldBe expectedResultMetadata.key + resultEvent.value shouldBe expectedResultMetadata.value + uriEvent.key shouldBe expectedUriMetadata.key + uriEvent.value shouldBe expectedUriMetadata.value + timestampEvent.key shouldBe expectedTimestampMetadata.key + // Not checking timestamp value because it won't match + case _ => + } + + verify(mockHttpClient, times(5)).sendRequest(expectedRequest) + + // Shut it down + deathWatch.watch(workflowCallbackActor) + workflowCallbackActor ! GracefulShutdownHelper.ShutdownCommand + deathWatch.expectTerminated(workflowCallbackActor, msgWait) + + } + + it should "not send a callback if no URI is provided" in { + // Setup + val workflowId = WorkflowId.randomId() + val mockHttpClient = mock[CallbackHttpHandler] + + val props = WorkflowCallbackActor.props( + serviceRegistryActor.ref, + basicConfig, + httpClient = mockHttpClient + ) + val workflowCallbackActor = system.actorOf(props, "testWorkflowCallbackActorNoUri") + + // Do the thing + val cmd = PerformCallbackCommand( + workflowId = workflowId, uri = None, terminalState = WorkflowSucceeded, workflowOutputs = basicOutputs, List.empty + ) + workflowCallbackActor ! cmd + + // Check the result + serviceRegistryActor.expectNoMessage(awaitAlmostNothing) + verifyNoInteractions(mockHttpClient) + + // Shut it down + deathWatch.watch(workflowCallbackActor) + workflowCallbackActor ! GracefulShutdownHelper.ShutdownCommand + deathWatch.expectTerminated(workflowCallbackActor, msgWait) + } + + it should "do nothing when given a bogus URI" in { + // Setup + val workflowId = WorkflowId.randomId() + val mockHttpClient = mock[CallbackHttpHandler] + + val props = WorkflowCallbackActor.props( + serviceRegistryActor.ref, + basicConfig, + httpClient = mockHttpClient + ) + val workflowCallbackActor = system.actorOf(props, "testWorkflowCallbackActorBogusUri") + + // Do the thing + val cmd = PerformCallbackCommand( + workflowId = workflowId, + uri = Option("this is not a very good URI, is it"), + terminalState = WorkflowSucceeded, + workflowOutputs = basicOutputs, + List.empty + ) + workflowCallbackActor ! cmd + + // Check the result + serviceRegistryActor.expectNoMessage(awaitAlmostNothing) + verifyNoInteractions(mockHttpClient) + + // Shut it down + deathWatch.watch(workflowCallbackActor) + workflowCallbackActor ! GracefulShutdownHelper.ShutdownCommand + deathWatch.expectTerminated(workflowCallbackActor, msgWait) + } +} diff --git a/server/src/test/scala/cromwell/SimpleWorkflowActorSpec.scala b/server/src/test/scala/cromwell/SimpleWorkflowActorSpec.scala index 375c51e886b..b3f4df64d07 100644 --- a/server/src/test/scala/cromwell/SimpleWorkflowActorSpec.scala +++ b/server/src/test/scala/cromwell/SimpleWorkflowActorSpec.scala @@ -68,6 +68,7 @@ class SimpleWorkflowActorSpec extends CromwellTestKitWordSpec with BeforeAndAfte invalidateBadCacheResults = invalidateBadCacheResults, serviceRegistryActor = watchActor, workflowLogCopyRouter = system.actorOf(Props.empty, s"workflow-copy-log-router-$workflowId-${UUID.randomUUID()}"), + workflowCallbackActor = None, jobStoreActor = system.actorOf(AlwaysHappyJobStoreActor.props), subWorkflowStoreActor = system.actorOf(AlwaysHappySubWorkflowStoreActor.props), callCacheReadActor = system.actorOf(EmptyCallCacheReadActor.props), diff --git a/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala b/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala index d9de71b9125..6f7c57ae985 100644 --- a/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala +++ b/server/src/test/scala/cromwell/engine/workflow/WorkflowActorSpec.scala @@ -15,16 +15,18 @@ import cromwell.engine.workflow.WorkflowActor._ import cromwell.engine.workflow.WorkflowManagerActor.WorkflowActorWorkComplete import cromwell.engine.workflow.lifecycle.EngineLifecycleActorAbortCommand import cromwell.engine.workflow.lifecycle.execution.WorkflowExecutionActor.{ExecuteWorkflowCommand, WorkflowExecutionAbortedResponse, WorkflowExecutionFailedResponse, WorkflowExecutionSucceededResponse} -import cromwell.engine.workflow.lifecycle.finalization.CopyWorkflowLogsActor -import cromwell.engine.workflow.lifecycle.finalization.WorkflowFinalizationActor.{StartFinalizationCommand, WorkflowFinalizationSucceededResponse} +import cromwell.engine.workflow.lifecycle.finalization.{CopyWorkflowLogsActor, WorkflowCallbackActor} +import cromwell.engine.workflow.lifecycle.finalization.WorkflowFinalizationActor.{StartFinalizationCommand, WorkflowFinalizationFailedResponse, WorkflowFinalizationSucceededResponse} import cromwell.engine.workflow.lifecycle.initialization.WorkflowInitializationActor.{StartInitializationCommand, WorkflowInitializationAbortedResponse, WorkflowInitializationFailedResponse, WorkflowInitializationSucceededResponse} import cromwell.engine.workflow.lifecycle.materialization.MaterializeWorkflowDescriptorActor.MaterializeWorkflowDescriptorFailureResponse import cromwell.engine.workflow.workflowstore.{StartableState, Submitted, WorkflowHeartbeatConfig, WorkflowToStart} import cromwell.engine.{EngineFilesystems, EngineWorkflowDescriptor} import cromwell.services.metadata.MetadataService.{MetadataWriteSuccess, PutMetadataActionAndRespond} import cromwell.util.SampleWdl.ThreeStep +import cromwell.util.WomMocks import org.scalatest.BeforeAndAfter import org.scalatest.concurrent.Eventually +import wom.values.WomString import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} @@ -50,7 +52,8 @@ class WorkflowActorSpec extends CromwellTestKitWordSpec with WorkflowDescriptorB ) val mockDir: Path = DefaultPathBuilder.get("/where/to/copy/wf/logs") - val mockWorkflowOptions = s"""{ "final_workflow_log_dir" : "$mockDir" }""" + val mockUri: String = "http://example.com" + val mockWorkflowOptions = s"""{ "final_workflow_log_dir" : "$mockDir", "workflow_callback_uri": "$mockUri" }""" var currentWorkflowId: WorkflowId = _ val currentLifecycleActor: TestProbe = TestProbe("currentLifecycleActor") @@ -63,6 +66,7 @@ class WorkflowActorSpec extends CromwellTestKitWordSpec with WorkflowDescriptorB val executionProbe: TestProbe = TestProbe("executionProbe") val finalizationProbe: TestProbe = TestProbe("finalizationProbe") var copyWorkflowLogsProbe: TestProbe = _ + val workflowCallbackProbe: TestProbe = TestProbe("workflowCallbackProbe") val AwaitAlmostNothing: FiniteDuration = 100.milliseconds val initialJobCtByRootWf = new AtomicInteger() val callCachingEnabled = true @@ -79,7 +83,11 @@ class WorkflowActorSpec extends CromwellTestKitWordSpec with WorkflowDescriptorB private val workflowHeartbeatConfig = WorkflowHeartbeatConfig(ConfigFactory.load()) - private def createWorkflowActor(state: WorkflowActorState, extraPathBuilderFactory: Option[PathBuilderFactory] = None, initializationMaxRetries: Int = 3, initializationInterval: FiniteDuration = 10.millis) = { + private def createWorkflowActor(state: WorkflowActorState, + extraPathBuilderFactory: Option[PathBuilderFactory] = None, + workflowCallbackActor: Option[ActorRef] = None, + initializationMaxRetries: Int = 3, + initializationInterval: FiniteDuration = 10.millis) = { val actor = TestFSMRef( factory = new WorkflowActorWithTestAddons( finalizationProbe = finalizationProbe, @@ -92,6 +100,7 @@ class WorkflowActorSpec extends CromwellTestKitWordSpec with WorkflowDescriptorB ioActor = system.actorOf(SimpleIoActor.props, s"ioActor-$currentWorkflowId"), serviceRegistryActor = mockServiceRegistryActor, workflowLogCopyRouter = copyWorkflowLogsProbe.ref, + workflowCallbackActor = workflowCallbackActor, jobStoreActor = system.actorOf(AlwaysHappyJobStoreActor.props, s"jobStoreActor-$currentWorkflowId"), subWorkflowStoreActor = system.actorOf(AlwaysHappySubWorkflowStoreActor.props, s"subWorkflowStoreActor-$currentWorkflowId"), @@ -328,6 +337,31 @@ class WorkflowActorSpec extends CromwellTestKitWordSpec with WorkflowDescriptorB val _ = createWorkflowActor(WorkflowSucceededState, Option(new ThrowingPathBuilderFactory())) } } + + "send a workflow callback message" in { + val actor = createWorkflowActor(ExecutingWorkflowState, workflowCallbackActor = Option(workflowCallbackProbe.ref)) + deathwatch watch actor + val mockOutputs = WomMocks.mockOutputExpectations(Map("foo" -> WomString("bar"))) + val msg = WorkflowCallbackActor.PerformCallbackCommand(currentWorkflowId, Some(mockUri), WorkflowSucceeded, mockOutputs, List.empty) + + workflowCallbackProbe.expectNoMessage(AwaitAlmostNothing) + actor ! WorkflowExecutionSucceededResponse(Map.empty, Set(currentWorkflowId), mockOutputs, Set.empty) + actor ! WorkflowFinalizationSucceededResponse + workflowCallbackProbe.expectMsg(msg) + deathwatch.expectTerminated(actor) + } + + "send a workflow callback message for a failing workflow" in { + val actor = createWorkflowActor(FinalizingWorkflowState, workflowCallbackActor = Option(workflowCallbackProbe.ref)) + deathwatch watch actor + val errorText = "oh nooo :(" + val msg = WorkflowCallbackActor.PerformCallbackCommand(currentWorkflowId, Some(mockUri), WorkflowFailed, CallOutputs.empty, List(errorText)) + + workflowCallbackProbe.expectNoMessage(AwaitAlmostNothing) + actor ! WorkflowFinalizationFailedResponse(Seq(new RuntimeException(errorText))) + workflowCallbackProbe.expectMsg(msg) + deathwatch.expectTerminated(actor) + } } } @@ -353,6 +387,7 @@ class WorkflowActorWithTestAddons(val finalizationProbe: TestProbe, ioActor: ActorRef, serviceRegistryActor: ActorRef, workflowLogCopyRouter: ActorRef, + workflowCallbackActor: Option[ActorRef], jobStoreActor: ActorRef, subWorkflowStoreActor: ActorRef, callCacheReadActor: ActorRef, @@ -379,6 +414,7 @@ class WorkflowActorWithTestAddons(val finalizationProbe: TestProbe, ioActor = ioActor, serviceRegistryActor = serviceRegistryActor, workflowLogCopyRouter = workflowLogCopyRouter, + workflowCallbackActor, jobStoreActor = jobStoreActor, subWorkflowStoreActor = subWorkflowStoreActor, callCacheReadActor = callCacheReadActor,