Commit fcff113: parallel_processing_enhance

deepakarora3 committed Jul 24, 2023
1 parent 0b9b42a commit fcff113
Showing 5 changed files with 226 additions and 65 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -19,7 +19,7 @@

<groupId>com.americanexpress.unify.flowret</groupId>
<artifactId>unify-flowret</artifactId>
<version>1.4.1</version>
<version>1.4.2</version>
<packaging>jar</packaging>

<name>unify-flowret</name>
74 changes: 51 additions & 23 deletions readme.md
@@ -20,7 +20,7 @@ Flowret is available as a jar file in Maven central with the following Maven coo
````pom
<groupId>com.americanexpress.unify.flowret</groupId>
<artifactId>unify-flowret</artifactId>
<version>1.4.1</version>
<version>1.4.2</version>
````

---
@@ -86,11 +86,11 @@ case
BPM products do not offer true technical parallel processing.
They offer business parallel processing: one can have multiple branches
emanating from a route, but the branches are still executed one at a time.
1. Except for synchronizing application-specific data structures, no additional work is required on the
   application / consumer side to enable parallel processing. Parallel processing is completely managed by
   Flowret as specified in the process definition file
1. A configurable number of threads from a pool, or unbounded child threads, can be used for parallel
   processing - the choice is yours

##### State Management
1. Implements "resume from where it left off" functionality
@@ -445,30 +445,58 @@ step should be outside of the outermost `p_route` / `p_join` construct
#### Initialize Flowret - needs to be done only once at startup

```java
Flowret.init(idleTimeout, typeIdSep);
Flowret.init(idleTimeout, typeIdSep, errorWorkbasket);
Flowret.init(maxThreads, idleTimeout, typeIdSep);
Flowret.init(maxThreads, idleTimeout, typeIdSep, errorWorkbasket);
```

`int maxThreads`

Specifies the maximum number of threads in an executor service pool used for parallel processing.

This parameter only comes into play when Flowret has to do parallel processing. For single-threaded process execution,
the caller thread is used to run the process.

Parallel processing can be set up in two ways. If the value specified is greater than 0, it sets the maximum number of
threads that can be used for parallel processing across cases. This is important to understand: Flowret internally
creates a thread pool with that many threads, and each time a parallel path needs to be executed, a thread from this
pool is used. This fixed thread pool allows clients to specify an upper bound on the number of threads used for
parallel processing.

If the value passed is less than or equal to 0, Flowret creates threads on the fly with no upper bound. Note that this
option may carry a very small performance cost, since a new thread is created each time a parallel path is to be
executed (as compared to the fixed thread pool, where threads are already created and ready to run). It is very
important to note that this option is unbounded: clients could run enough parallel processing cases that the pod gets
overwhelmed by the sheer number of threads. It is left to clients to guard against such scenarios with appropriate
safeguards.
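As a rough JDK-only sketch of the two modes described above (this illustrates the mechanism, not Flowret's actual code; the class and method names here are made up for the example):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelModesSketch {

  // Bounded mode (maxThreads > 0): a fixed pool caps thread usage across cases
  static void runBounded(Runnable[] tasks, int maxThreads, int idleTimeoutMs) throws Exception {
    ExecutorService es = new ThreadPoolExecutor(maxThreads, maxThreads, idleTimeoutMs,
        TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(maxThreads * 2));
    Future<?>[] futures = new Future<?>[tasks.length];
    for (int i = 0; i < tasks.length; i++) {
      futures[i] = es.submit(tasks[i]); // a pooled thread runs each parallel path
    }
    for (Future<?> f : futures) {
      f.get(); // wait for all branches to finish
    }
    es.shutdown();
  }

  // Unbounded mode (maxThreads <= 0): a fresh child thread per parallel path
  static void runUnbounded(Runnable[] tasks) throws Exception {
    Thread[] threads = new Thread[tasks.length];
    for (int i = 0; i < tasks.length; i++) {
      threads[i] = new Thread(tasks[i]);
      threads[i].start();
    }
    for (Thread t : threads) {
      t.join();
    }
  }

  public static void main(String[] args) throws Exception {
    AtomicInteger completed = new AtomicInteger();
    Runnable[] branches = new Runnable[3];
    for (int i = 0; i < branches.length; i++) {
      branches[i] = completed::incrementAndGet;
    }
    runBounded(branches, 2, 30000);
    runUnbounded(branches);
    System.out.println(completed.get()); // 6: each of the 3 branches ran twice
  }
}
```

The trade-off mirrors the one described above: the pool bounds concurrency but makes late submissions wait; per-branch threads never wait but are unbounded.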

`int idleTimeout`

Specifies the idle time of a thread in the parallel processing thread pool after which it will be terminated to conserve
system resources.

`String typeIdSep`

Specifies the character to be used as the separator between the type and id fields in the name of the document to be
written to the data store (via dao object). Flowret uses the following document naming
convention: `<type><separator><id>`
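As a trivial illustration of this convention (using the audit-log type and separator documented in this readme):

```java
public class DocNameSketch {

  // compose a document name as <type><separator><id>
  static String docName(String type, String sep, String id) {
    return type + sep + id;
  }

  public static void main(String[] args) {
    // the audit log document for case 1, sequence 00001, step_13
    System.out.println(docName("flowret_audit_log", "-", "1_00001_step_13"));
    // prints flowret_audit_log-1_00001_step_13
  }
}
```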

`String errorWorkbasket`

Specifies the name of the work basket to be used when, after a step / route has been executed, Flowret encounters an
error while processing the application event or hits an internal error. This value is written out to the process info
file.

At this point of time, we can describe the various documents that Flowret writes to the data store as it executes a
case.

1. Audit Log - a document that stores the state of the execution paths and process variables after execution of each
step / route
1. Type - `flowret_audit_log`
1. Separator - `-`
1. id - `<case_id>_<sequence_number>_<step_name>`
1. Example - `flowret_audit_log-1_00001_step_13`
1. Journey - the document that stores the process definition which a case needs to execute
1. Type - `flowret_journey`
69 changes: 53 additions & 16 deletions src/main/java/com/americanexpress/unify/flowret/ExecThreadTask.java
@@ -749,8 +749,6 @@ private String processJoin(Join join) {
private String executeThreads(ExecPath parentExecPath, Route route, List<String> branches) {
int count = branches.size();
ExecThreadTask[] tasks = new ExecThreadTask[count];

for (int i = 0; i < count; i++) {
String branchName = branches.get(i);
@@ -772,12 +770,41 @@ private String executeThreads(ExecPath parentExecPath, Route route, List<String>
pi.setExecPath(ep);
}

// run threads and wait for them to finish
runThreads(tasks);

// check if all have completed
boolean isPend = false;
String joinPoint = null;
for (int i = 0; i < tasks.length; i++) {
ExecThreadTask in = tasks[i];
ExecPath ep = in.execPath;
joinPoint = ep.getStep();

if (ep.getPendWorkBasket().isEmpty() == false) {
isPend = true;
break;
}
}

if (isPend == false) {
return joinPoint;
}
else {
return null;
}
}

private void runThreadsWithExecutorService(ExecutorService executorService, ExecThreadTask[] tasks) {
int count = tasks.length;
Future<?>[] futures = new Future[count];

// start threads
for (int i = 0; i < count; i++) {
futures[i] = executorService.submit(tasks[i]);
}

// wait for them to finish
for (int i = 0; i < tasks.length; i++) {
try {
futures[i].get();
@@ -787,26 +814,36 @@ private String executeThreads(ExecPath parentExecPath, Route route, List<String>
throw new UnifyException("flowret_err_5", e, pi.getCaseId());
}
}
}

private void runThreadsAsChildren(ExecThreadTask[] tasks) {
Thread[] threads = new Thread[tasks.length];

// start threads
for (int i = 0; i < tasks.length; i++) {
threads[i] = new Thread(tasks[i]);
threads[i].start();
}

// wait for them to finish
for (int i = 0; i < tasks.length; i++) {
try {
threads[i].join();
}
catch (InterruptedException e) {
// should never happen
throw new UnifyException("flowret_err_5", e, pi.getCaseId());
}
}
}

private void runThreads(ExecThreadTask[] tasks) {
ExecutorService executorService = Flowret.instance().getExecutorService();
if (executorService == null) {
runThreadsAsChildren(tasks);
}
else {
runThreadsWithExecutorService(executorService, tasks);
}
}

49 changes: 24 additions & 25 deletions src/main/java/com/americanexpress/unify/flowret/Flowret.java
@@ -21,7 +21,10 @@
import com.americanexpress.unify.flowret.CONSTS_FLOWRET.DAO;
import com.americanexpress.unify.jdocs.JDocument;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/*
* @author Deepak Arora
@@ -71,33 +74,27 @@ public Wms getWorkManagementService(FlowretDao dao, WorkManager wm, ISlaQueueMan
private Flowret() {
}

public static void init(int idleTimeout, String typeIdSep) {
init(0, idleTimeout, typeIdSep, "flowret_error");
}

public static void init(int idleTimeout, String typeIdSep, String errorWorkbasket) {
init(0, idleTimeout, typeIdSep, errorWorkbasket);
}

public static void init(int maxThreads, int idleTimeout, String typeIdSep) {
init(maxThreads, idleTimeout, typeIdSep, "flowret_error");
}

public static void init(int maxThreads, int idleTimeout, String typeIdSep, String errorWorkbasket) {
Flowret am = instance();
am.maxThreads = maxThreads;
am.idleTimeout = idleTimeout;

if (maxThreads > 0) {
BlockOnOfferQueue<Runnable> q = new BlockOnOfferQueue(new ArrayBlockingQueue<>(am.maxThreads * 2));
am.es = new ThreadPoolExecutor(am.maxThreads, am.maxThreads, am.idleTimeout, TimeUnit.MILLISECONDS, q, new RejectedItemHandler());
}

DAO.SEP = typeIdSep;
am.errorWorkbasket = errorWorkbasket;
@@ -112,15 +109,17 @@ public static void init(int maxThreads, int idleTimeout, String typeIdSep, Strin
* Method that is used to close Flowret
*/
public static void close() {
if (singleton.es != null) {
singleton.es.shutdown();
try {
singleton.es.awaitTermination(5, TimeUnit.MINUTES);
}
catch (InterruptedException e) {
// should never happen
throw new UnifyException("flowret_err_7", e);
}
singleton.es = null;
}
}

public int getMaxThreads() {
@@ -0,0 +1,97 @@
/*
* Copyright 2020 American Express Travel Related Services Company, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package com.americanexpress.unify.flowret.test_parallel;

import com.americanexpress.unify.base.BaseUtils;
import com.americanexpress.unify.base.UnifyException;
import com.americanexpress.unify.flowret.*;
import com.americanexpress.unify.flowret.test_singular.TestFlowret;
import org.junit.jupiter.api.*;

import java.io.File;

/*
* @author Deepak Arora
*/
public class TestFlowretParallel2 {

private static String dirPath = "./target/test-data-results/";
private static Rts rts = null;
private static FileDao dao = null;
private static ProcessComponentFactory factory = null;
private static EventHandler handler = null;

@BeforeAll
protected static void setEnv() throws Exception {
File directory = new File(dirPath);
if (!directory.exists()) {
directory.mkdir();
}

ERRORS_FLOWRET.load();
Flowret.init(30000, "-");
}

@BeforeEach
protected void beforeEach() {
TestUtils.deleteFiles(dirPath);
StepResponseFactory.clear();
}

@AfterEach
protected void afterEach() {
// nothing to do
}

@AfterAll
protected static void afterAll() {
Flowret.instance().close();
TestUtils.deleteFiles(dirPath);
}

// 3 branches, happy path i.e. all branches proceed
public static void setScenario1() {
// nothing to do
}

private static void init(FlowretDao dao, ProcessComponentFactory factory, EventHandler handler, ISlaQueueManager sqm) {
rts = Flowret.instance().getRunTimeService(dao, factory, handler, sqm);
}

private static void runJourney(String journey) {
String json = BaseUtils.getResourceAsString(TestFlowret.class, "/flowret/" + journey + ".json");
if (new File(dirPath + "flowret_process_info-1.json").exists() == false) {
rts.startCase("1", json, null, null);
}

try {
while (true) {
System.out.println();
rts.resumeCase("1");
}
}
catch (UnifyException e) {
System.out.println("Exception -> " + e.getMessage());
}
}

@Test
void testScenario1() {
setScenario1();
init(new FileDao(dirPath), new TestComponentFactoryParallel1(), new TestHandler(), null);
runJourney("parallel_test_1");
}

}
