Skip to content

Commit

Permalink
DSL sample using Serverless Workflo spec (#151)
Browse files Browse the repository at this point in the history
* DSL sample using Serverless Workflow DSL spec

Signed-off-by: Tihomir Surdilovic <[email protected]>

* replaced use of dynamic activity with concrete activity

Signed-off-by: Tihomir Surdilovic <[email protected]>

* fix readme

Signed-off-by: Tihomir Surdilovic <[email protected]>

* further updates per comments

Signed-off-by: Tihomir Surdilovic <[email protected]>

* further updates

Signed-off-by: Tihomir Surdilovic <[email protected]>

* updated workflow

Signed-off-by: Tihomir Surdilovic <[email protected]>

* dont pass dsl workflow around

Signed-off-by: Tihomir Surdilovic <[email protected]>

* adding parallel activity exec and activities no longer update workflow data directly

Signed-off-by: Tihomir Surdilovic <[email protected]>

* updated activities results

Signed-off-by: Tihomir Surdilovic <[email protected]>
  • Loading branch information
tsurdilo authored Sep 9, 2021
1 parent a24ca89 commit a461e04
Show file tree
Hide file tree
Showing 17 changed files with 1,136 additions and 2 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ All tests are available under [src/test/java](https://github.com/temporalio/samp

- [**Money Batch**](https://github.com/temporalio/samples-java/tree/master/src/main/java/io/temporal/samples/moneybatch): Demonstrates a situation where a single deposit should be initiated for multiple withdrawals. For example, a seller might want to be paid once per fixed number of transactions. This sample can be easily extended to perform a payment based on more complex criteria, such as at a specific time or an accumulated amount. The sample also demonstrates how to Signal the Workflow when it executes (*Signal with start*). If the Workflow is already executing, it just receives the Signal. If it is not executing, then the Workflow executes first, and then the Signal is delivered to it. *Signal with start* is a "lazy" way to execute Workflows when Signaling them.

- [**Customer Application Approval DSL**](https://github.com/temporalio/samples-java/tree/master/src/main/java/io/temporal/samples/dsl): Demonstrates execution of a customer application approval workflow defined in a DSL (like JSON or YAML)

### API demonstrations

- [**Updatable Timer**](https://github.com/temporalio/samples-java/tree/master/src/main/java/io/temporal/samples/updatabletimer): Demonstrates the use of a helper class which relies on `Workflow.await` to implement a blocking sleep that can be updated at any moment.
Expand Down
8 changes: 6 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ dependencies {
implementation group: 'io.cloudevents', name: 'cloudevents-core', version: '2.2.0'
implementation group: 'io.cloudevents', name: 'cloudevents-api', version: '2.2.0'
implementation group: 'io.cloudevents', name: 'cloudevents-json-jackson', version: '2.2.0'
implementation group: 'io.serverlessworkflow', name: 'serverlessworkflow-api', version: '3.0.0.Final'
implementation group: 'io.serverlessworkflow', name: 'serverlessworkflow-validation', version: '3.0.0.Final'
implementation group: 'io.serverlessworkflow', name: 'serverlessworkflow-spi', version: '3.0.0.Final'
implementation group: 'com.jayway.jsonpath', name: 'json-path', version: '2.6.0'

testImplementation group: 'io.temporal', name: 'temporal-testing', version: '1.2.0'
testImplementation group: 'io.temporal', name: 'temporal-testing-junit4', version: '1.2.0'
Expand All @@ -44,8 +48,6 @@ dependencies {
testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
testImplementation group: 'org.powermock', name: 'powermock-api-mockito2', version: '2.0.9'



testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.0-M1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.0-M1'
testCompileOnly 'junit:junit:4.13.2'
Expand All @@ -70,6 +72,8 @@ task execute(type: JavaExec) {

license {
header rootProject.file('license-header.txt')
exclude '**/*.json'
exclude '**/*.yml'
}

// Executes most of the hello workflows as a sanity check
Expand Down
34 changes: 34 additions & 0 deletions src/main/java/io/temporal/samples/dsl/DslActivities.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.dsl;

import com.fasterxml.jackson.databind.JsonNode;
import io.temporal.activity.ActivityInterface;

@ActivityInterface
public interface DslActivities {
JsonNode checkCustomerInfo();

JsonNode approveApplication();

JsonNode rejectApplication();

JsonNode updateApplicationInfo();
}
87 changes: 87 additions & 0 deletions src/main/java/io/temporal/samples/dsl/DslActivitiesImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.dsl;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.temporal.activity.Activity;

public class DslActivitiesImpl implements DslActivities {
@Override
public JsonNode checkCustomerInfo() {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.readTree(
getReturnJson(Activity.getExecutionContext().getInfo().getActivityType(), "invoked"));
} catch (Exception e) {
return null;
}
}

@Override
public JsonNode updateApplicationInfo() {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.readTree(
getReturnJson(Activity.getExecutionContext().getInfo().getActivityType(), "invoked"));
} catch (Exception e) {
return null;
}
}

@Override
public JsonNode approveApplication() {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.readTree(
getDecisionJson(
Activity.getExecutionContext().getInfo().getActivityType(),
"invoked",
"decision",
"APPROVED"));
} catch (Exception e) {
return null;
}
}

@Override
public JsonNode rejectApplication() {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.readTree(
getDecisionJson(
Activity.getExecutionContext().getInfo().getActivityType(),
"invoked",
"decision",
"DENIED"));
} catch (Exception e) {
return null;
}
}

private String getReturnJson(String key, String value) {
return "{\n" + " \"" + key + "\": \"" + value + "\"\n" + "}";
}

private String getDecisionJson(String key1, String value1, String key2, String value2) {
return "{\n" + " \"" + key1 + "\": \"" + value1 + "\",\n" + " \"" + key2 + "\": \"" + value2
+ "\"\n" + "}";
}
}
50 changes: 50 additions & 0 deletions src/main/java/io/temporal/samples/dsl/DslWorkflowCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.dsl;

import static io.temporal.samples.dsl.DslWorkflowUtils.getFileAsString;

import io.serverlessworkflow.api.Workflow;
import java.util.HashMap;
import java.util.Map;

/** Class that loads up all the DSL workflows and allows access via id-version */
public class DslWorkflowCache {

private static class WorkflowHolder {
static final Map<String, Workflow> dslWorkflowMap = new HashMap<>();

static {
try {
Workflow customerApplicationWorkflow =
Workflow.fromSource(getFileAsString("dsl/customerapplication.yml"));
dslWorkflowMap.put(
customerApplicationWorkflow.getId() + "-" + customerApplicationWorkflow.getVersion(),
customerApplicationWorkflow);
} catch (Exception e) {
e.printStackTrace();
}
}
}

public static Workflow getWorkflow(String workflowId, String workflowVersion) {
return WorkflowHolder.dslWorkflowMap.get(workflowId + "-" + workflowVersion);
}
}
151 changes: 151 additions & 0 deletions src/main/java/io/temporal/samples/dsl/DslWorkflowUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.dsl;

import com.fasterxml.jackson.databind.JsonNode;
import com.jayway.jsonpath.JsonPath;
import io.serverlessworkflow.api.Workflow;
import io.serverlessworkflow.api.interfaces.State;
import io.serverlessworkflow.api.retry.RetryDefinition;
import io.serverlessworkflow.api.states.EventState;
import io.temporal.activity.ActivityOptions;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.common.RetryOptions;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.Duration;
import java.util.List;

/** Provides utility methods for dealing with DSL */
public class DslWorkflowUtils {

/** Set workflow options from DSL */
public static WorkflowOptions getWorkflowOptions(Workflow workflow) {
WorkflowOptions.Builder dslWorkflowOptionsBuilder = WorkflowOptions.newBuilder();

if (workflow.getId() != null) {
dslWorkflowOptionsBuilder.setWorkflowId(workflow.getId());
}

dslWorkflowOptionsBuilder.setTaskQueue(Worker.DEFAULT_TASK_QUEUE_NAME);

if (workflow.getTimeouts() != null
&& workflow.getTimeouts().getWorkflowExecTimeout() != null
&& workflow.getTimeouts().getWorkflowExecTimeout().getDuration() != null) {
dslWorkflowOptionsBuilder.setWorkflowExecutionTimeout(
Duration.parse(workflow.getTimeouts().getWorkflowExecTimeout().getDuration()));
}

if (workflow.getStart().getSchedule() != null
&& workflow.getStart().getSchedule().getCron() != null) {
dslWorkflowOptionsBuilder.setCronSchedule(
workflow.getStart().getSchedule().getCron().getExpression());
}

return dslWorkflowOptionsBuilder.build();
}

/** Set Activity options from DSL */
public static ActivityOptions getActivityOptionsFromDsl(Workflow dslWorkflow) {
ActivityOptions.Builder dslActivityOptionsBuilder = ActivityOptions.newBuilder();
if (dslWorkflow.getTimeouts() != null
&& dslWorkflow.getTimeouts().getActionExecTimeout() != null) {
dslActivityOptionsBuilder.setStartToCloseTimeout(
Duration.parse(dslWorkflow.getTimeouts().getActionExecTimeout()));
}

// In SW spec each action (activity) can define a specific retry
// For this demo we just use the globally defined one for all actions
if (dslWorkflow.getRetries() != null
&& dslWorkflow.getRetries().getRetryDefs() != null
&& dslWorkflow.getRetries().getRetryDefs().size() > 0) {
RetryDefinition retryDefinition = dslWorkflow.getRetries().getRetryDefs().get(0);
RetryOptions.Builder dslRetryOptionsBuilder = RetryOptions.newBuilder();
if (retryDefinition.getMaxAttempts() != null) {
dslRetryOptionsBuilder.setMaximumAttempts(
Integer.parseInt(retryDefinition.getMaxAttempts()));
}
dslRetryOptionsBuilder.setBackoffCoefficient(1.0);
if (retryDefinition.getDelay() != null) {
dslRetryOptionsBuilder.setInitialInterval(Duration.parse(retryDefinition.getDelay()));
}
if (retryDefinition.getMaxDelay() != null) {
dslRetryOptionsBuilder.setMaximumInterval(Duration.parse(retryDefinition.getMaxDelay()));
}
}

return dslActivityOptionsBuilder.build();
}

/** Read file and return contents as string */
public static String getFileAsString(String fileName) throws IOException {
File file = new File(Starter.class.getClassLoader().getResource(fileName).getFile());
return new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
}

/** Start workflow execution depending on the DSL */
public static WorkflowExecution startWorkflow(
WorkflowStub workflowStub, Workflow dslWorkflow, JsonNode workflowInput) {
State startingDslWorkflowState = getStartingWorkflowState(dslWorkflow);
if (startingDslWorkflowState instanceof EventState) {
// This demo can parse only the first event
EventState eventState = (EventState) startingDslWorkflowState;
String eventName = eventState.getOnEvents().get(0).getEventRefs().get(0);
// send input data as signal data
return workflowStub.signalWithStart(
eventName,
new Object[] {workflowInput},
new Object[] {dslWorkflow.getId(), dslWorkflow.getVersion()});
} else {
// directly send input data to workflow
return workflowStub.start(dslWorkflow.getId(), dslWorkflow.getVersion(), workflowInput);
}
}

/** Returns the starting workflow state from DSL */
public static State getStartingWorkflowState(Workflow dslWorkflow) {
String start = dslWorkflow.getStart().getStateName();
for (State state : dslWorkflow.getStates()) {
if (state.getName().equals(start)) {
return state;
}
}
return null;
}

/** Returns the workflow state with the provided name or null */
public static State getWorkflowStateWithName(String name, Workflow dslWorkflow) {
for (State state : dslWorkflow.getStates()) {
if (state.getName().equals(name)) {
return state;
}
}
return null;
}

/** Evaluates a JsonPath expression to true/false, used for switch states data conditions */
public static boolean isTrueDataCondition(String condition, String jsonData) {
return JsonPath.parse(jsonData).read(condition, List.class).size() > 0;
}
}
Loading

0 comments on commit a461e04

Please sign in to comment.