diff --git a/README.md b/README.md index 00517d8f..b52e5250 100644 --- a/README.md +++ b/README.md @@ -107,6 +107,8 @@ All tests are available under [src/test/java](https://github.com/temporalio/samp - [**Money Batch**](https://github.com/temporalio/samples-java/tree/master/src/main/java/io/temporal/samples/moneybatch): Demonstrates a situation where a single deposit should be initiated for multiple withdrawals. For example, a seller might want to be paid once per fixed number of transactions. This sample can be easily extended to perform a payment based on more complex criteria, such as at a specific time or an accumulated amount. The sample also demonstrates how to Signal the Workflow when it executes (*Signal with start*). If the Workflow is already executing, it just receives the Signal. If it is not executing, then the Workflow executes first, and then the Signal is delivered to it. *Signal with start* is a "lazy" way to execute Workflows when Signaling them. +- [**Customer Application Approval DSL**](https://github.com/temporalio/samples-java/tree/master/src/main/java/io/temporal/samples/dsl): Demonstrates execution of a customer application approval workflow defined in a DSL (like JSON or YAML) + ### API demonstrations - [**Updatable Timer**](https://github.com/temporalio/samples-java/tree/master/src/main/java/io/temporal/samples/updatabletimer): Demonstrates the use of a helper class which relies on `Workflow.await` to implement a blocking sleep that can be updated at any moment. diff --git a/build.gradle b/build.gradle index 44ccc5ab..4b322931 100644 --- a/build.gradle +++ b/build.gradle @@ -36,6 +36,10 @@ dependencies { implementation group: 'io.cloudevents', name: 'cloudevents-core', version: '2.2.0' implementation group: 'io.cloudevents', name: 'cloudevents-api', version: '2.2.0' implementation group: 'io.cloudevents', name: 'cloudevents-json-jackson', version: '2.2.0' + implementation group: 'io.serverlessworkflow', name: 'serverlessworkflow-api', version: '3.0.0.Final' + implementation group: 'io.serverlessworkflow', name: 'serverlessworkflow-validation', version: '3.0.0.Final' + implementation group: 'io.serverlessworkflow', name: 'serverlessworkflow-spi', version: '3.0.0.Final' + implementation group: 'com.jayway.jsonpath', name: 'json-path', version: '2.6.0' testImplementation group: 'io.temporal', name: 'temporal-testing', version: '1.2.0' testImplementation group: 'io.temporal', name: 'temporal-testing-junit4', version: '1.2.0' @@ -44,8 +48,6 @@ dependencies { testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4' testImplementation group: 'org.powermock', name: 'powermock-api-mockito2', version: '2.0.9' - - testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.0-M1' testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.0-M1' testCompileOnly 'junit:junit:4.13.2' @@ -70,6 +72,8 @@ task execute(type: JavaExec) { license { header rootProject.file('license-header.txt') + exclude '**/*.json' + exclude '**/*.yml' } // Executes most of the hello workflows as a sanity check diff --git a/src/main/java/io/temporal/samples/dsl/DslActivities.java b/src/main/java/io/temporal/samples/dsl/DslActivities.java new file mode 100644 index 00000000..bbe6b46c --- /dev/null +++ b/src/main/java/io/temporal/samples/dsl/DslActivities.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.dsl; + +import com.fasterxml.jackson.databind.JsonNode; +import io.temporal.activity.ActivityInterface; + +@ActivityInterface +public interface DslActivities { + JsonNode checkCustomerInfo(); + + JsonNode approveApplication(); + + JsonNode rejectApplication(); + + JsonNode updateApplicationInfo(); +} diff --git a/src/main/java/io/temporal/samples/dsl/DslActivitiesImpl.java b/src/main/java/io/temporal/samples/dsl/DslActivitiesImpl.java new file mode 100644 index 00000000..649c7546 --- /dev/null +++ b/src/main/java/io/temporal/samples/dsl/DslActivitiesImpl.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.dsl; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.temporal.activity.Activity; + +public class DslActivitiesImpl implements DslActivities { + @Override + public JsonNode checkCustomerInfo() { + try { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readTree( + getReturnJson(Activity.getExecutionContext().getInfo().getActivityType(), "invoked")); + } catch (Exception e) { + return null; + } + } + + @Override + public JsonNode updateApplicationInfo() { + try { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readTree( + getReturnJson(Activity.getExecutionContext().getInfo().getActivityType(), "invoked")); + } catch (Exception e) { + return null; + } + } + + @Override + public JsonNode approveApplication() { + try { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readTree( + getDecisionJson( + Activity.getExecutionContext().getInfo().getActivityType(), + "invoked", + "decision", + "APPROVED")); + } catch (Exception e) { + return null; + } + } + + @Override + public JsonNode rejectApplication() { + try { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readTree( + getDecisionJson( + Activity.getExecutionContext().getInfo().getActivityType(), + "invoked", + "decision", + "DENIED")); + } catch (Exception e) { + return null; + } + } + + private String getReturnJson(String key, String value) { + return "{\n" + " \"" + key + "\": \"" + value + "\"\n" + "}"; + } + + private String getDecisionJson(String key1, String value1, String key2, String value2) { + return "{\n" + " \"" + key1 + "\": \"" + value1 + "\",\n" + " \"" + key2 + "\": \"" + value2 + + "\"\n" + "}"; + } +} diff --git a/src/main/java/io/temporal/samples/dsl/DslWorkflowCache.java b/src/main/java/io/temporal/samples/dsl/DslWorkflowCache.java new file mode 100644 index 00000000..faca0733 --- /dev/null +++ b/src/main/java/io/temporal/samples/dsl/DslWorkflowCache.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.dsl; + +import static io.temporal.samples.dsl.DslWorkflowUtils.getFileAsString; + +import io.serverlessworkflow.api.Workflow; +import java.util.HashMap; +import java.util.Map; + +/** Class that loads up all the DSL workflows and allows access via id-version */ +public class DslWorkflowCache { + + private static class WorkflowHolder { + static final Map dslWorkflowMap = new HashMap<>(); + + static { + try { + Workflow customerApplicationWorkflow = + Workflow.fromSource(getFileAsString("dsl/customerapplication.yml")); + dslWorkflowMap.put( + customerApplicationWorkflow.getId() + "-" + customerApplicationWorkflow.getVersion(), + customerApplicationWorkflow); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + public static Workflow getWorkflow(String workflowId, String workflowVersion) { + return WorkflowHolder.dslWorkflowMap.get(workflowId + "-" + workflowVersion); + } +} diff --git a/src/main/java/io/temporal/samples/dsl/DslWorkflowUtils.java b/src/main/java/io/temporal/samples/dsl/DslWorkflowUtils.java new file mode 100644 index 00000000..13291312 --- /dev/null +++ b/src/main/java/io/temporal/samples/dsl/DslWorkflowUtils.java @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.dsl; + +import com.fasterxml.jackson.databind.JsonNode; +import com.jayway.jsonpath.JsonPath; +import io.serverlessworkflow.api.Workflow; +import io.serverlessworkflow.api.interfaces.State; +import io.serverlessworkflow.api.retry.RetryDefinition; +import io.serverlessworkflow.api.states.EventState; +import io.temporal.activity.ActivityOptions; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.common.RetryOptions; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.time.Duration; +import java.util.List; + +/** Provides utility methods for dealing with DSL */ +public class DslWorkflowUtils { + + /** Set workflow options from DSL */ + public static WorkflowOptions getWorkflowOptions(Workflow workflow) { + WorkflowOptions.Builder dslWorkflowOptionsBuilder = WorkflowOptions.newBuilder(); + + if (workflow.getId() != null) { + dslWorkflowOptionsBuilder.setWorkflowId(workflow.getId()); + } + + dslWorkflowOptionsBuilder.setTaskQueue(Worker.DEFAULT_TASK_QUEUE_NAME); + + if (workflow.getTimeouts() != null + && workflow.getTimeouts().getWorkflowExecTimeout() != null + && workflow.getTimeouts().getWorkflowExecTimeout().getDuration() != null) { + dslWorkflowOptionsBuilder.setWorkflowExecutionTimeout( + Duration.parse(workflow.getTimeouts().getWorkflowExecTimeout().getDuration())); + } + + if (workflow.getStart().getSchedule() != null + && workflow.getStart().getSchedule().getCron() != null) { + dslWorkflowOptionsBuilder.setCronSchedule( + workflow.getStart().getSchedule().getCron().getExpression()); + } + + return dslWorkflowOptionsBuilder.build(); + } + + /** Set Activity options from DSL */ + public static ActivityOptions getActivityOptionsFromDsl(Workflow dslWorkflow) { + ActivityOptions.Builder dslActivityOptionsBuilder = ActivityOptions.newBuilder(); + if (dslWorkflow.getTimeouts() != null + && dslWorkflow.getTimeouts().getActionExecTimeout() != null) { + dslActivityOptionsBuilder.setStartToCloseTimeout( + Duration.parse(dslWorkflow.getTimeouts().getActionExecTimeout())); + } + + // In SW spec each action (activity) can define a specific retry + // For this demo we just use the globally defined one for all actions + if (dslWorkflow.getRetries() != null + && dslWorkflow.getRetries().getRetryDefs() != null + && dslWorkflow.getRetries().getRetryDefs().size() > 0) { + RetryDefinition retryDefinition = dslWorkflow.getRetries().getRetryDefs().get(0); + RetryOptions.Builder dslRetryOptionsBuilder = RetryOptions.newBuilder(); + if (retryDefinition.getMaxAttempts() != null) { + dslRetryOptionsBuilder.setMaximumAttempts( + Integer.parseInt(retryDefinition.getMaxAttempts())); + } + dslRetryOptionsBuilder.setBackoffCoefficient(1.0); + if (retryDefinition.getDelay() != null) { + dslRetryOptionsBuilder.setInitialInterval(Duration.parse(retryDefinition.getDelay())); + } + if (retryDefinition.getMaxDelay() != null) { + dslRetryOptionsBuilder.setMaximumInterval(Duration.parse(retryDefinition.getMaxDelay())); + } + } + + return dslActivityOptionsBuilder.build(); + } + + /** Read file and return contents as string */ + public static String getFileAsString(String fileName) throws IOException { + File file = new File(Starter.class.getClassLoader().getResource(fileName).getFile()); + return new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8); + } + + /** Start workflow execution depending on the DSL */ + public static WorkflowExecution startWorkflow( + WorkflowStub workflowStub, Workflow dslWorkflow, JsonNode workflowInput) { + State startingDslWorkflowState = getStartingWorkflowState(dslWorkflow); + if (startingDslWorkflowState instanceof EventState) { + // This demo can parse only the first event + EventState eventState = (EventState) startingDslWorkflowState; + String eventName = eventState.getOnEvents().get(0).getEventRefs().get(0); + // send input data as signal data + return workflowStub.signalWithStart( + eventName, + new Object[] {workflowInput}, + new Object[] {dslWorkflow.getId(), dslWorkflow.getVersion()}); + } else { + // directly send input data to workflow + return workflowStub.start(dslWorkflow.getId(), dslWorkflow.getVersion(), workflowInput); + } + } + + /** Returns the starting workflow state from DSL */ + public static State getStartingWorkflowState(Workflow dslWorkflow) { + String start = dslWorkflow.getStart().getStateName(); + for (State state : dslWorkflow.getStates()) { + if (state.getName().equals(start)) { + return state; + } + } + return null; + } + + /** Returns the workflow state with the provided name or null */ + public static State getWorkflowStateWithName(String name, Workflow dslWorkflow) { + for (State state : dslWorkflow.getStates()) { + if (state.getName().equals(name)) { + return state; + } + } + return null; + } + + /** Evaluates a JsonPath expression to true/false, used for switch states data conditions */ + public static boolean isTrueDataCondition(String condition, String jsonData) { + return JsonPath.parse(jsonData).read(condition, List.class).size() > 0; + } +} diff --git a/src/main/java/io/temporal/samples/dsl/DynamicDslWorkflow.java b/src/main/java/io/temporal/samples/dsl/DynamicDslWorkflow.java new file mode 100644 index 00000000..8bb21459 --- /dev/null +++ b/src/main/java/io/temporal/samples/dsl/DynamicDslWorkflow.java @@ -0,0 +1,203 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.dsl; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.serverlessworkflow.api.actions.Action; +import io.serverlessworkflow.api.events.OnEvents; +import io.serverlessworkflow.api.interfaces.State; +import io.serverlessworkflow.api.states.EventState; +import io.serverlessworkflow.api.states.OperationState; +import io.serverlessworkflow.api.states.SwitchState; +import io.serverlessworkflow.api.switchconditions.DataCondition; +import io.temporal.activity.ActivityOptions; +import io.temporal.common.converter.EncodedValues; +import io.temporal.workflow.*; +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; + +public class DynamicDslWorkflow implements DynamicWorkflow { + + private io.serverlessworkflow.api.Workflow dslWorkflow; + private JsonNode workflowData; + private Logger logger = Workflow.getLogger(DynamicDslWorkflow.class); + ActivityStub activities; + + @Override + public Object execute(EncodedValues args) { + // Get first input and convert to SW Workflow object + String dslWorkflowId = args.get(0, String.class); + String dslWorkflowVersion = args.get(1, String.class); + // Get second input which is set to workflowData + workflowData = args.get(2, JsonNode.class); + + dslWorkflow = DslWorkflowCache.getWorkflow(dslWorkflowId, dslWorkflowVersion); + + // Register dynamic signal handler + // For demo signals input sets the workflowData + // Improvement can be to add to it instead + Workflow.registerListener( + (DynamicSignalHandler) + (signalName, encodedArgs) -> workflowData = encodedArgs.get(0, JsonNode.class)); + + // Get the activity options that are set from properties in dsl + ActivityOptions activityOptions = DslWorkflowUtils.getActivityOptionsFromDsl(dslWorkflow); + // Create a dynamic activities stub to be used for all actions in dsl + activities = Workflow.newUntypedActivityStub(activityOptions); + + // Start going through the dsl workflow states and execute depending on their instructions + executeDslWorkflowFrom(DslWorkflowUtils.getStartingWorkflowState(dslWorkflow)); + + // Return the final workflow data as result + return workflowData; + } + + /** Executes workflow according to the dsl control flow logic */ + private void executeDslWorkflowFrom(State dslWorkflowState) { + // This demo supports 3 states: Event State, Operation State and Switch state (data-based + // switch) + if (dslWorkflowState != null) { + // execute the state and return the next workflow state depending on control flow logic in dsl + // if next state is null it means that we need to stop execution + executeDslWorkflowFrom(executeStateAndReturnNext(dslWorkflowState)); + } else { + // done + return; + } + } + + private void addToWorkflowData(JsonNode toAdd) { + ((ObjectNode) workflowData).putAll(((ObjectNode) toAdd)); + } + + /** + * Executes the control flow logic for a dsl workflow state. Demo supports EventState, + * OperationState, and SwitchState currently. More can be added. + */ + private State executeStateAndReturnNext(State dslWorkflowState) { + if (dslWorkflowState instanceof EventState) { + EventState eventState = (EventState) dslWorkflowState; + // currently this demo supports only the first onEvents + if (eventState.getOnEvents() != null && eventState.getOnEvents().size() > 0) { + List eventStateActions = eventState.getOnEvents().get(0).getActions(); + if (eventState.getOnEvents().get(0).getActionMode() != null + && eventState + .getOnEvents() + .get(0) + .getActionMode() + .equals(OnEvents.ActionMode.PARALLEL)) { + List> eventPromises = new ArrayList<>(); + for (Action action : eventStateActions) { + eventPromises.add( + activities.executeAsync( + action.getFunctionRef().getRefName(), JsonNode.class, workflowData)); + } + // Invoke all activities in parallel. Wait for all to complete + Promise.allOf(eventPromises).get(); + + for (Promise promise : eventPromises) { + addToWorkflowData(promise.get()); + } + } else { + for (Action action : eventStateActions) { + // execute the action as an activity and assign its results to workflowData + addToWorkflowData( + activities.execute( + action.getFunctionRef().getRefName(), JsonNode.class, workflowData)); + } + } + } + if (eventState.getTransition() == null || eventState.getTransition().getNextState() == null) { + return null; + } + return DslWorkflowUtils.getWorkflowStateWithName( + eventState.getTransition().getNextState(), dslWorkflow); + + } else if (dslWorkflowState instanceof OperationState) { + OperationState operationState = (OperationState) dslWorkflowState; + if (operationState.getActions() != null && operationState.getActions().size() > 0) { + // Check if actions should be executed sequentially or parallel + if (operationState.getActionMode() != null + && operationState.getActionMode().equals(OperationState.ActionMode.PARALLEL)) { + List> actionsPromises = new ArrayList<>(); + for (Action action : operationState.getActions()) { + actionsPromises.add( + activities.executeAsync( + action.getFunctionRef().getRefName(), JsonNode.class, workflowData)); + } + // Invoke all activities in parallel. Wait for all to complete + Promise.allOf(actionsPromises).get(); + + for (Promise promise : actionsPromises) { + addToWorkflowData(promise.get()); + } + } else { + for (Action action : operationState.getActions()) { + // execute the action as an activity and assign its results to workflowData + addToWorkflowData( + activities.execute( + action.getFunctionRef().getRefName(), JsonNode.class, workflowData)); + } + } + } + if (operationState.getTransition() == null + || operationState.getTransition().getNextState() == null) { + return null; + } + return DslWorkflowUtils.getWorkflowStateWithName( + operationState.getTransition().getNextState(), dslWorkflow); + } else if (dslWorkflowState instanceof SwitchState) { + // Demo supports only data based switch + SwitchState switchState = (SwitchState) dslWorkflowState; + if (switchState.getDataConditions() != null && switchState.getDataConditions().size() > 0) { + // evaluate each condition to see if its true. If none are true default to defaultCondition + for (DataCondition dataCondition : switchState.getDataConditions()) { + if (DslWorkflowUtils.isTrueDataCondition( + dataCondition.getCondition(), workflowData.toPrettyString())) { + if (dataCondition.getTransition() == null + || dataCondition.getTransition().getNextState() == null) { + return null; + } + return DslWorkflowUtils.getWorkflowStateWithName( + dataCondition.getTransition().getNextState(), dslWorkflow); + } + } + // no conditions evaluated to true, use default condition + if (switchState.getDefaultCondition().getTransition() == null) { + return null; + } + return DslWorkflowUtils.getWorkflowStateWithName( + switchState.getDefaultCondition().getTransition().getNextState(), dslWorkflow); + } else { + // no conditions use the transition/end of default condition + if (switchState.getDefaultCondition().getTransition() == null) { + return null; + } + return DslWorkflowUtils.getWorkflowStateWithName( + switchState.getDefaultCondition().getTransition().getNextState(), dslWorkflow); + } + } else { + logger.error("Invalid or unsupported in demo dsl workflow state: " + dslWorkflowState); + return null; + } + } +} diff --git a/src/main/java/io/temporal/samples/dsl/README.md b/src/main/java/io/temporal/samples/dsl/README.md new file mode 100644 index 00000000..1d1d06b3 --- /dev/null +++ b/src/main/java/io/temporal/samples/dsl/README.md @@ -0,0 +1,57 @@ +# DSL Sample + +This sample demonstrates of workflow execution where execution semantics are +provided by a DSL. + +This sample uses the CNCF Serverless Workflow (serverlessworkflow.io) specification DSL and its Java SDL +which helps us parse the DSL into an object model as well as provides validation. + +Since this is just a sample, it only provides ability to use some parts of the SW DSL. + +You can see the JSON and YAML DSL workflows in /src/main/resources/dsl/, namely +`customerapplication.json` and `customerapplication.yml`. The +`datainput.json` file in this directory is the JSON used as input to our workflow. + +You can play with the value of `age` of the customer in `datainput.json` to set it below 20 years of age +to see how their application will be rejected in that case. + +Note that with most DSLs the workflow data is JSON, meaning that in order to manipulate this data +we need to use some sort of expression languages inside the DSL to define data manipulation expressions. +For this sample we use JsonPath, but other expression languages can be plugged in as well. + +## Run the sample + +1. Start the Worker: + +```bash +./gradlew -q execute -PmainClass=io.temporal.samples.dsl.Worker +``` + +2. Start the Starter + +```bash +./gradlew -q execute -PmainClass=io.temporal.samples.dsl.Starter +``` + +If the age of the customer is set to >= 20 the application will be approved, if set to < 20 it will be rejected. +The results of the workflow will include the updated `applicationStatus`, and an added array which shows +all the activities that were executed (corespond to actions in the DSL), for example: + +```json +{ + "customer" : { + "name" : "John", + "age" : 22 + }, + "CheckCustomerInfo" : "invoked", + "UpdateApplicationInfo" : "invoked", + "ApproveApplication" : "invoked", + "decision" : "APPROVED" +} +``` + +Is a result of a run where customer age was >= 20. In this case the decision was set to "APPROVED". + + + + diff --git a/src/main/java/io/temporal/samples/dsl/Starter.java b/src/main/java/io/temporal/samples/dsl/Starter.java new file mode 100644 index 00000000..6c06d4d3 --- /dev/null +++ b/src/main/java/io/temporal/samples/dsl/Starter.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.dsl; + +import static io.temporal.samples.dsl.DslWorkflowUtils.*; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.serverlessworkflow.api.Workflow; +import io.serverlessworkflow.api.interfaces.WorkflowValidator; +import io.serverlessworkflow.api.validation.ValidationError; +import io.serverlessworkflow.validation.WorkflowValidatorImpl; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.WorkerFactory; +import java.util.List; + +public class Starter { + + public static final WorkflowServiceStubs service = WorkflowServiceStubs.newInstance(); + public static final WorkflowClient client = WorkflowClient.newInstance(service); + public static final WorkerFactory factory = WorkerFactory.newInstance(client); + + public static void main(String[] args) { + try { + // Get the workflow dsl from cache + Workflow dslWorkflow = DslWorkflowCache.getWorkflow("customerapplication", "1.0"); + + // Validate dsl + WorkflowValidator dslWorkflowValidator = new WorkflowValidatorImpl(); + if (!dslWorkflowValidator.setWorkflow(dslWorkflow).isValid()) { + System.err.println( + "Workflow DSL not valid. Consult github.com/serverlessworkflow/specification/blob/main/specification.md for more info"); + List validationErrorList = + dslWorkflowValidator.setWorkflow(dslWorkflow).validate(); + for (ValidationError error : validationErrorList) { + System.out.println("Error: " + error.getMessage()); + } + System.exit(1); + } + + WorkflowOptions workflowOptions = getWorkflowOptions(dslWorkflow); + + WorkflowStub workflowStub = + client.newUntypedWorkflowStub(dslWorkflow.getName(), workflowOptions); + + // Start workflow execution + startWorkflow(workflowStub, dslWorkflow, getSampleWorkflowInput()); + + // Wait for workflow to finish + JsonNode result = workflowStub.getResult(JsonNode.class); + // Print workflow results + System.out.println("Workflow Results: \n" + result.toPrettyString()); + + } catch (Exception e) { + System.err.println("Error: " + e.getMessage()); + System.exit(1); + } + + System.exit(0); + } + + private static JsonNode getSampleWorkflowInput() throws Exception { + String workflowDataInput = getFileAsString("dsl/datainput.json"); + ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper.readTree(workflowDataInput); + } +} diff --git a/src/main/java/io/temporal/samples/dsl/Worker.java b/src/main/java/io/temporal/samples/dsl/Worker.java new file mode 100644 index 00000000..a83d71bc --- /dev/null +++ b/src/main/java/io/temporal/samples/dsl/Worker.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.dsl; + +import io.temporal.client.WorkflowClient; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.WorkerFactory; + +public class Worker { + private static final WorkflowServiceStubs service = WorkflowServiceStubs.newInstance(); + private static final WorkflowClient client = WorkflowClient.newInstance(service); + private static final WorkerFactory factory = WorkerFactory.newInstance(client); + public static final String DEFAULT_TASK_QUEUE_NAME = "dsltaskqueue"; + + public static void main(String[] args) { + io.temporal.worker.Worker worker = factory.newWorker(DEFAULT_TASK_QUEUE_NAME); + worker.registerWorkflowImplementationTypes(DynamicDslWorkflow.class); + worker.registerActivitiesImplementations(new DslActivitiesImpl()); + + factory.start(); + } +} diff --git a/src/main/java/io/temporal/samples/terminateworkflow/Starter.java b/src/main/java/io/temporal/samples/terminateworkflow/Starter.java index bfbfafea..a29a0995 100644 --- a/src/main/java/io/temporal/samples/terminateworkflow/Starter.java +++ b/src/main/java/io/temporal/samples/terminateworkflow/Starter.java @@ -67,9 +67,7 @@ public static void main(String[] args) { System.exit(0); } - /** - * This method creates a Worker from the factory. - */ + /** This method creates a Worker from the factory. */ private static void createWorker() { Worker worker = factory.newWorker(TASK_QUEUE); worker.registerWorkflowImplementationTypes(MyWorkflowImpl.class); @@ -79,6 +77,7 @@ private static void createWorker() { /** * Convenience method to sleep for a number of seconds. + * * @param seconds */ private static void sleepSeconds(int seconds) { @@ -90,8 +89,9 @@ private static void sleepSeconds(int seconds) { } /** - * This method uses DescribeWorkflowExecutionRequest to get the status of a workflow - * given a WorkflowExecution. + * This method uses DescribeWorkflowExecutionRequest to get the status of a workflow given a + * WorkflowExecution. + * * @param execution * @return Workflow status */ diff --git a/src/main/resources/dsl/customerapplication.json b/src/main/resources/dsl/customerapplication.json new file mode 100644 index 00000000..c7d6d6e1 --- /dev/null +++ b/src/main/resources/dsl/customerapplication.json @@ -0,0 +1,104 @@ +{ + "id": "customerapplication", + "name": "Customer Application Workflow", + "version": "1.0", + "timeouts": { + "workflowExecTimeout": { + "duration": "PT1M" + }, + "actionExecTimeout": "PT10S" + }, + "retries": [ + { + "name": "WorkflowRetries", + "delay": "PT3S", + "maxAttempts": 10 + } + ], + "start": "NewCustomerApplication", + "states": [ + { + "name": "NewCustomerApplication", + "type": "event", + "onEvents": [{ + "eventRefs": ["NewApplicationEvent"], + "actionMode": "parallel", + "actions":[ + { + "name": "Invoke Check Customer Info Function", + "functionRef": "CheckCustomerInfo" + }, + { + "name": "Invoke Update Application Info Function", + "functionRef": "UpdateApplicationInfo" + } + ] + }], + "transition": "MakeApplicationDecision" + }, + { + "name": "MakeApplicationDecision", + "type": "switch", + "dataConditions": [ + { + "condition": "$..[?(@.age >= 20)]", + "transition": "ApproveApplication" + }, + { + "condition": "$..[?(@.age < 20)]", + "transition": "RejectApplication" + } + ], + "defaultCondition": { + "transition": "RejectApplication" + } + }, + { + "name": "ApproveApplication", + "type": "operation", + "actions": [ + { + "name": "Invoke Approve Application Function", + "functionRef": "ApproveApplication" + } + ], + "end": true + }, + { + "name": "RejectApplication", + "type": "operation", + "actions": [ + { + "name": "Invoke Reject Application Function", + "functionRef": "RejectApplication" + } + ], + "end": true + } + ], + "functions": [ + { + "name": "CheckCustomerInfo", + "type": "rest" + }, + { + "name": "UpdateApplicationInfo", + "type": "rest" + }, + { + "name": "ApproveApplication", + "type": "rest" + }, + { + "name": "RejectApplication", + "type": "rest" + } + ], + "events": [ + { + "name": "NewApplicationEvent", + "type": "com.fasterxml.jackson.databind.JsonNode", + "source": "applicationsSource" + } + ] +} \ No newline at end of file diff --git a/src/main/resources/dsl/customerapplication.yml b/src/main/resources/dsl/customerapplication.yml new file mode 100644 index 00000000..898acd66 --- /dev/null +++ b/src/main/resources/dsl/customerapplication.yml @@ -0,0 +1,59 @@ +id: customerapplication +name: Customer Application Workflow +version: '1.0' +timeouts: + workflowExecTimeout: + duration: PT1M + actionExecTimeout: PT10S +retries: + - name: WorkflowRetries + delay: PT3S + maxAttempts: 10 +start: NewCustomerApplication +states: + - name: NewCustomerApplication + type: event + onEvents: + - eventRefs: + - NewApplicationEvent + actionMode: parallel + actions: + - name: Invoke Check Customer Info Function + functionRef: CheckCustomerInfo + - name: Invoke Update Application Info Function + functionRef: UpdateApplicationInfo + transition: MakeApplicationDecision + - name: MakeApplicationDecision + type: switch + dataConditions: + - condition: "$..[?(@.age >= 20)]" + transition: ApproveApplication + - condition: "$..[?(@.age < 20)]" + transition: RejectApplication + defaultCondition: + transition: RejectApplication + - name: ApproveApplication + type: operation + actions: + - name: Invoke Approve Application Function + functionRef: ApproveApplication + end: true + - name: RejectApplication + type: operation + actions: + - name: Invoke Reject Application Function + functionRef: RejectApplication + end: true +functions: + - name: CheckCustomerInfo + type: rest + - name: UpdateApplicationInfo + type: rest + - name: ApproveApplication + type: rest + - name: RejectApplication + type: rest +events: + - name: NewApplicationEvent + type: com.fasterxml.jackson.databind.JsonNode + source: applicationsSource diff --git a/src/main/resources/dsl/datainput.json b/src/main/resources/dsl/datainput.json new file mode 100644 index 00000000..9db59ad5 --- /dev/null +++ b/src/main/resources/dsl/datainput.json @@ -0,0 +1,6 @@ +{ + "customer": { + "name": "John", + "age": 22 + } +} \ No newline at end of file diff --git a/src/test/java/io/temporal/samples/dsl/DslWorkflowTest.java b/src/test/java/io/temporal/samples/dsl/DslWorkflowTest.java new file mode 100644 index 00000000..d65c38b1 --- /dev/null +++ b/src/test/java/io/temporal/samples/dsl/DslWorkflowTest.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.dsl; + +import static org.junit.Assert.*; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.serverlessworkflow.api.Workflow; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.testing.TestWorkflowRule; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import org.junit.Rule; +import org.junit.Test; + +public class DslWorkflowTest { + + @Rule + public TestWorkflowRule testWorkflowRule = + TestWorkflowRule.newBuilder() + .setWorkflowTypes(DynamicDslWorkflow.class) + .setActivityImplementations(new DslActivitiesImpl()) + .build(); + + @Test + public void testDslWorkflow() throws Exception { + Workflow dslWorkflow = DslWorkflowCache.getWorkflow("customerapplication", "1.0"); + + WorkflowOptions workflowOptions = + WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build(); + + WorkflowStub workflow = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub(dslWorkflow.getName(), workflowOptions); + + workflow.start(dslWorkflow.getId(), dslWorkflow.getVersion(), getSampleWorkflowInput()); + + JsonNode result = workflow.getResult(JsonNode.class); + + assertNotNull(result); + assertNotNull(result.get("customer")); + assertNotNull(result.get("decision")); + assertEquals("APPROVED", result.get("decision").asText()); + + assertNotNull(result.get("CheckCustomerInfo")); + assertNotNull(result.get("UpdateApplicationInfo")); + assertNotNull(result.get("ApproveApplication")); + } + + private static String getFileAsString(String fileName) throws IOException { + File file = new File(DslWorkflowTest.class.getClassLoader().getResource(fileName).getFile()); + return new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8); + } + + private static JsonNode getSampleWorkflowInput() throws Exception { + String workflowDataInput = getFileAsString("dsl/datainput.json"); + ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper.readTree(workflowDataInput); + } +} diff --git a/src/test/resources/dsl/customerapplication.json b/src/test/resources/dsl/customerapplication.json new file mode 100644 index 00000000..c7d6d6e1 --- /dev/null +++ b/src/test/resources/dsl/customerapplication.json @@ -0,0 +1,104 @@ +{ + "id": "customerapplication", + "name": "Customer Application Workflow", + "version": "1.0", + "timeouts": { + "workflowExecTimeout": { + "duration": "PT1M" + }, + "actionExecTimeout": "PT10S" + }, + "retries": [ + { + "name": "WorkflowRetries", + "delay": "PT3S", + "maxAttempts": 10 + } + ], + "start": "NewCustomerApplication", + "states": [ + { + "name": "NewCustomerApplication", + "type": "event", + "onEvents": [{ + "eventRefs": ["NewApplicationEvent"], + "actionMode": "parallel", + "actions":[ + { + "name": "Invoke Check Customer Info Function", + "functionRef": "CheckCustomerInfo" + }, + { + "name": "Invoke Update Application Info Function", + "functionRef": "UpdateApplicationInfo" + } + ] + }], + "transition": "MakeApplicationDecision" + }, + { + "name": "MakeApplicationDecision", + "type": "switch", + "dataConditions": [ + { + "condition": "$..[?(@.age >= 20)]", + "transition": "ApproveApplication" + }, + { + "condition": "$..[?(@.age < 20)]", + "transition": "RejectApplication" + } + ], + "defaultCondition": { + "transition": "RejectApplication" + } + }, + { + "name": "ApproveApplication", + "type": "operation", + "actions": [ + { + "name": "Invoke Approve Application Function", + "functionRef": "ApproveApplication" + } + ], + "end": true + }, + { + "name": "RejectApplication", + "type": "operation", + "actions": [ + { + "name": "Invoke Reject Application Function", + "functionRef": "RejectApplication" + } + ], + "end": true + } + ], + "functions": [ + { + "name": "CheckCustomerInfo", + "type": "rest" + }, + { + "name": "UpdateApplicationInfo", + "type": "rest" + }, + { + "name": "ApproveApplication", + "type": "rest" + }, + { + "name": "RejectApplication", + "type": "rest" + } + ], + "events": [ + { + "name": "NewApplicationEvent", + "type": "com.fasterxml.jackson.databind.JsonNode", + "source": "applicationsSource" + } + ] +} \ No newline at end of file diff --git a/src/test/resources/dsl/customerapplication.yml b/src/test/resources/dsl/customerapplication.yml new file mode 100644 index 00000000..898acd66 --- /dev/null +++ b/src/test/resources/dsl/customerapplication.yml @@ -0,0 +1,59 @@ +id: customerapplication +name: Customer Application Workflow +version: '1.0' +timeouts: + workflowExecTimeout: + duration: PT1M + actionExecTimeout: PT10S +retries: + - name: WorkflowRetries + delay: PT3S + maxAttempts: 10 +start: NewCustomerApplication +states: + - name: NewCustomerApplication + type: event + onEvents: + - eventRefs: + - NewApplicationEvent + actionMode: parallel + actions: + - name: Invoke Check Customer Info Function + functionRef: CheckCustomerInfo + - name: Invoke Update Application Info Function + functionRef: UpdateApplicationInfo + transition: MakeApplicationDecision + - name: MakeApplicationDecision + type: switch + dataConditions: + - condition: "$..[?(@.age >= 20)]" + transition: ApproveApplication + - condition: "$..[?(@.age < 20)]" + transition: RejectApplication + defaultCondition: + transition: RejectApplication + - name: ApproveApplication + type: operation + actions: + - name: Invoke Approve Application Function + functionRef: ApproveApplication + end: true + - name: RejectApplication + type: operation + actions: + - name: Invoke Reject Application Function + functionRef: RejectApplication + end: true +functions: + - name: CheckCustomerInfo + type: rest + - name: UpdateApplicationInfo + type: rest + - name: ApproveApplication + type: rest + - name: RejectApplication + type: rest +events: + - name: NewApplicationEvent + type: com.fasterxml.jackson.databind.JsonNode + source: applicationsSource diff --git a/src/test/resources/dsl/datainput.json b/src/test/resources/dsl/datainput.json new file mode 100644 index 00000000..9db59ad5 --- /dev/null +++ b/src/test/resources/dsl/datainput.json @@ -0,0 +1,6 @@ +{ + "customer": { + "name": "John", + "age": 22 + } +} \ No newline at end of file