Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DSL sample using Serverless Workflo spec #151

Merged
merged 9 commits into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ All tests are available under [src/test/java](https://github.com/temporalio/samp

- [**Money Batch**](https://github.com/temporalio/samples-java/tree/master/src/main/java/io/temporal/samples/moneybatch): Demonstrates a situation where a single deposit should be initiated for multiple withdrawals. For example, a seller might want to be paid once per fixed number of transactions. This sample can be easily extended to perform a payment based on more complex criteria, such as at a specific time or an accumulated amount. The sample also demonstrates how to Signal the Workflow when it executes (*Signal with start*). If the Workflow is already executing, it just receives the Signal. If it is not executing, then the Workflow executes first, and then the Signal is delivered to it. *Signal with start* is a "lazy" way to execute Workflows when Signaling them.

- [**Customer Application Approval DSL**](https://github.com/temporalio/samples-java/tree/master/src/main/java/io/temporal/samples/dsl): Demonstrates execution of a customer application approval workflow defined in a DSL (JSON or YAML)

tsurdilo marked this conversation as resolved.
Show resolved Hide resolved
### API demonstrations

- [**Updatable Timer**](https://github.com/temporalio/samples-java/tree/master/src/main/java/io/temporal/samples/updatabletimer): Demonstrates the use of a helper class which relies on `Workflow.await` to implement a blocking sleep that can be updated at any moment.
Expand Down
8 changes: 6 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ dependencies {
implementation group: 'io.cloudevents', name: 'cloudevents-core', version: '2.2.0'
implementation group: 'io.cloudevents', name: 'cloudevents-api', version: '2.2.0'
implementation group: 'io.cloudevents', name: 'cloudevents-json-jackson', version: '2.2.0'
implementation group: 'io.serverlessworkflow', name: 'serverlessworkflow-api', version: '3.0.0.Final'
implementation group: 'io.serverlessworkflow', name: 'serverlessworkflow-validation', version: '3.0.0.Final'
implementation group: 'io.serverlessworkflow', name: 'serverlessworkflow-spi', version: '3.0.0.Final'
implementation group: 'com.jayway.jsonpath', name: 'json-path', version: '2.6.0'

testImplementation group: 'io.temporal', name: 'temporal-testing', version: '1.2.0'
testImplementation group: 'io.temporal', name: 'temporal-testing-junit4', version: '1.2.0'
Expand All @@ -44,8 +48,6 @@ dependencies {
testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
testImplementation group: 'org.powermock', name: 'powermock-api-mockito2', version: '2.0.9'



testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.0-M1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.0-M1'
testCompileOnly 'junit:junit:4.13.2'
Expand All @@ -70,6 +72,8 @@ task execute(type: JavaExec) {

license {
header rootProject.file('license-header.txt')
exclude '**/*.json'
exclude '**/*.yml'
}

// Executes most of the hello workflows as a sanity check
Expand Down
163 changes: 163 additions & 0 deletions src/main/java/io/temporal/samples/dsl/DslWorkflowUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.dsl;

import com.fasterxml.jackson.databind.JsonNode;
import com.jayway.jsonpath.JsonPath;
import io.serverlessworkflow.api.Workflow;
import io.serverlessworkflow.api.interfaces.State;
import io.serverlessworkflow.api.retry.RetryDefinition;
import io.serverlessworkflow.api.states.EventState;
import io.temporal.activity.ActivityOptions;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.common.RetryOptions;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.Duration;
import java.util.List;

/** Provides utility methods for dealing with DSL */
public class DslWorkflowUtils {

// Default task queue name if not specified in dsl
public static final String DEFAULT_TASK_QUEUE_NAME = "defaultdsltaskqueue";

/** Set workflow options from DSL */
public static WorkflowOptions getWorkflowOptions(Workflow workflow) {
WorkflowOptions.Builder dslWorkflowOptionsBuilder = WorkflowOptions.newBuilder();

if (workflow.getId() != null) {
dslWorkflowOptionsBuilder.setWorkflowId(workflow.getId());
}

dslWorkflowOptionsBuilder.setTaskQueue(getTaskQueueFromDsl(workflow));

if (workflow.getTimeouts() != null
&& workflow.getTimeouts().getWorkflowExecTimeout() != null
&& workflow.getTimeouts().getWorkflowExecTimeout().getDuration() != null) {
dslWorkflowOptionsBuilder.setWorkflowExecutionTimeout(
Duration.parse(workflow.getTimeouts().getWorkflowExecTimeout().getDuration()));
}

if (workflow.getStart().getSchedule() != null
&& workflow.getStart().getSchedule().getCron() != null) {
dslWorkflowOptionsBuilder.setCronSchedule(
workflow.getStart().getSchedule().getCron().getExpression());
}

return dslWorkflowOptionsBuilder.build();
}

/** Set Activity options from DSL */
public static ActivityOptions getActivityOptionsFromDsl(Workflow dslWorkflow) {
ActivityOptions.Builder dslActivityOptionsBuilder = ActivityOptions.newBuilder();
if (dslWorkflow.getTimeouts() != null
&& dslWorkflow.getTimeouts().getActionExecTimeout() != null) {
dslActivityOptionsBuilder.setStartToCloseTimeout(
Duration.parse(dslWorkflow.getTimeouts().getActionExecTimeout()));
}

// In SW spec each action (activity) can define a specific retry
// For this demo we just use the globally defined one for all actions
if (dslWorkflow.getRetries() != null
&& dslWorkflow.getRetries().getRetryDefs() != null
&& dslWorkflow.getRetries().getRetryDefs().size() > 0) {
RetryDefinition retryDefinition = dslWorkflow.getRetries().getRetryDefs().get(0);
RetryOptions.Builder dslRetryOptionsBuilder = RetryOptions.newBuilder();
if (retryDefinition.getMaxAttempts() != null) {
dslRetryOptionsBuilder.setMaximumAttempts(
Integer.parseInt(retryDefinition.getMaxAttempts()));
}
dslRetryOptionsBuilder.setBackoffCoefficient(1.0);
if (retryDefinition.getDelay() != null) {
dslRetryOptionsBuilder.setInitialInterval(Duration.parse(retryDefinition.getDelay()));
}
if (retryDefinition.getMaxDelay() != null) {
dslRetryOptionsBuilder.setMaximumInterval(Duration.parse(retryDefinition.getMaxDelay()));
}
}

return dslActivityOptionsBuilder.build();
}

/** Read file and return contents as string */
public static String getFileAsString(String fileName) throws IOException {
File file = new File(Starter.class.getClassLoader().getResource(fileName).getFile());
return new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
}

/** Get the task queue from DSL */
public static String getTaskQueueFromDsl(Workflow workflow) {
if (workflow.getMetadata() != null
&& workflow.getMetadata().size() > 0
&& workflow.getMetadata().get("taskqueue") != null) {
return workflow.getMetadata().get("taskqueue");
} else {
return DEFAULT_TASK_QUEUE_NAME;
}
}

/** Start workflow execution depending on the DSL */
public static WorkflowExecution startWorkflow(
WorkflowStub workflowStub, Workflow dslWorkflow, JsonNode workflowInput) {
State startingDslWorkflowState = getStartingWorkflowState(dslWorkflow);
if (startingDslWorkflowState instanceof EventState) {
// This demo can parse only the first event
EventState eventState = (EventState) startingDslWorkflowState;
String eventName = eventState.getOnEvents().get(0).getEventRefs().get(0);
// send input data as signal data
return workflowStub.signalWithStart(
eventName, new Object[] {workflowInput}, new Object[] {Workflow.toJson(dslWorkflow)});
} else {
// directly send input data to workflow
return workflowStub.start(Workflow.toJson(dslWorkflow), workflowInput);
}
}

/** Returns the starting workflow state from DSL */
public static State getStartingWorkflowState(Workflow dslWorkflow) {
String start = dslWorkflow.getStart().getStateName();
for (State state : dslWorkflow.getStates()) {
if (state.getName().equals(start)) {
return state;
}
}
return null;
}

/** Returns the workflow state with the provided name or null */
public static State getWorkflowStateWithName(String name, Workflow dslWorkflow) {
for (State state : dslWorkflow.getStates()) {
if (state.getName().equals(name)) {
return state;
}
}
return null;
}

/** Evaluates a JsonPath expression to true/false, used for switch states data conditions */
public static boolean isTrueDataCondition(String condition, String jsonData) {
return JsonPath.parse(jsonData).read(condition, List.class).size() > 0;
}
}
60 changes: 60 additions & 0 deletions src/main/java/io/temporal/samples/dsl/DynamicDslActivities.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.dsl;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.temporal.activity.Activity;
import io.temporal.activity.DynamicActivity;
import io.temporal.common.converter.EncodedValues;

public class DynamicDslActivities implements DynamicActivity {
@Override
public Object execute(EncodedValues args) {
// Get the activity type
String activityType = Activity.getExecutionContext().getInfo().getActivityType();
String actionName = args.get(0, String.class);
JsonNode workflowData = args.get(1, JsonNode.class);

try {
// Add a "actions" array to results to show what was executed
if (workflowData.get("actions") != null) {
((ArrayNode) workflowData.get("actions")).add(actionName);
} else {
((ObjectNode) workflowData).putArray("actions").add(actionName);
}

// Update the application status if we are approving / rejecting the customer
// This is for demo only, in real application this would come as result of the function
// invocation
if (activityType.equals("Invoke Approve Application Function")) {
((ObjectNode) workflowData.get("customer")).put("applicationStatus", "APPROVED");
}
if (activityType.equals("Invoke Reject Application Function")) {
((ObjectNode) workflowData.get("customer")).put("applicationStatus", "REJECTED");
}

return workflowData;
} catch (Exception e) {
throw Activity.wrap(e);
}
}
}
Loading