diff --git a/pom.xml b/pom.xml
index 5ac41cb..6c14077 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,7 +19,7 @@
com.americanexpress.unify.flowret
unify-flowret
- 1.4.1
+ 1.4.2
jar
unify-flowret
diff --git a/readme.md b/readme.md
index 8543fc3..13e98b1 100644
--- a/readme.md
+++ b/readme.md
@@ -20,7 +20,7 @@ Flowret is available as a jar file in Maven central with the following Maven coo
````pom
com.americanexpress.unify.flowret
unify-flowret
-1.4.1
+1.4.2
````
---
@@ -86,11 +86,11 @@ case
BPM products do not offer true technical parallel processing.
They offer business parallel processing in which while one can have multiple branches
emanating from a route, they will still be executed one at a time.
-1. Except for synchronization of application specific data structures, no additional work around enabling
-parallel processing is required to be done on the application / consumer side. Parallel
-processing is completely managed by Flowret as specified in the process
-definition file
-1. Configurable number of threads are used for parallel processing
+1. Except for synchronization of application specific data structures, no additional work around enabling parallel
+ processing is required to be done on the application / consumer side. Parallel processing is completely managed by
+ Flowret as specified in the process definition file
+1. Configurable number of threads from a pool used for parallel processing or unbounded child threads - the choice is
+ yours
##### State Management
1. Implements "resume from where it left off" functionality
@@ -445,30 +445,58 @@ step should be outside of the outest `p_route` / `p_join` construct
#### Initialize Flowret - needs to be done only once at startup
```java
-Flowret.init(maxThreads, idleTimeout, typeIdSep);
+ Flowret.init(idleTimeout,typeIdSep);
+ Flowret.init(idleTimeout,typeIdSep,errorWorkbasket);
+ Flowret.init(maxThreads,idleTimeout,typeIdSep);
+ Flowret.init(maxThreads,idleTimeout,typeIdSep,errorWorkbasket);
```
-`int maxThreads` specifies the maxiumum number of threads
-in the pool used for parallel processing. By default, the caller thread
-is used to run the process in case of single threaded process.
+`int maxThreads`
-`int idleTimeout` specifies the idle time of a thread in the parallel
-processing thread pool after which it will be terminated to conserve system resources.
+Specifies the maxiumum number of threads in an executor eervice pool used for parallel processing.
-`String typeIdSep` specifies the character to be used as the separator between the type and id
-fields in the name of the document to be written to the data store (via dao object).
-Flowret uses the following document naming convention:
+This variable only comes into picture when Flowret has to do parallel processing. For single threaded process execution,
+the caller thread is used to run the process.
-``
+The parallel processing can be setup in two ways. If the value of this variable is specified and is more than 0, then
+this specifies the maximum number of threads which can be used in parallel processing across cases. This is important to
+understand - Flowret will internally create a thread pool with so many threads and each time it is required for a
+parallel path to be executed, a thread from this pool will be used. This is a fixed thread pool which allows clients to
+specify an upper bound on the number of threads to be used for parallel processing.
-At this point of time, we can describe the various documents that Flowret writes to the data store
-as it executes a case.
+In case the value passed for this variable is less than or equal to 0, Flowret will create threads on the fly with no
+upper bound. Note that, in this option, there may be a very small impact on performance as each time a parallel path is
+to be executed, a new thread will be created (
+as compared to the fixed thread pool where the threads are already created and ready to run). Very important to note is
+that this option is not bounded. In other words, clients could run multiple parallel processing cases such that the pod
+gets overwhelmed with the high number of threads / processing. It is left up to the clients to take care of such
+scenarios and put some kind of safe guards.
-1. Audit Log - a document that stores the state of the execution paths and process variables after
-execution of each step / route
- 1. Type - `flowret_audit_log`
- 1. Separator - `-`
- 1. id - `__`
+`int idleTimeout`
+
+Specifies the idle time of a thread in the parallel processing thread pool after which it will be terminated to conserve
+system resources.
+
+`String typeIdSep`
+
+Specifies the character to be used as the separator between the type and id fields in the name of the document to be
+written to the data store (via dao object). Flowret uses the following document naming
+convention ``
+
+`String errorWorkBasket`
+
+Specifies the name of the work basket to be used in case Flowret encounters an error after the step / route has been
+executed but Flowret encounters an error while processing the application event or encounters an internal error. This
+value will be written out to the process info file.
+
+At this point of time, we can describe the various documents that Flowret writes to the data store as it executes a
+case.
+
+1. Audit Log - a document that stores the state of the execution paths and process variables after execution of each
+ step / route
+ 1. Type - `flowret_audit_log`
+ 1. Separator - `-`
+ 1. id - `__`
1. Example - `flowret_audit_log-1_00001_step_13`
1. Journey - the document that stores the process definition which a case needs to execute
1. Type - `flowret_journey`
diff --git a/src/main/java/com/americanexpress/unify/flowret/ExecThreadTask.java b/src/main/java/com/americanexpress/unify/flowret/ExecThreadTask.java
index df6bdfb..6f77424 100644
--- a/src/main/java/com/americanexpress/unify/flowret/ExecThreadTask.java
+++ b/src/main/java/com/americanexpress/unify/flowret/ExecThreadTask.java
@@ -749,8 +749,6 @@ private String processJoin(Join join) {
private String executeThreads(ExecPath parentExecPath, Route route, List branches) {
int count = branches.size();
ExecThreadTask[] tasks = new ExecThreadTask[count];
- Future>[] futures = new Future[count];
- ExecutorService es = Flowret.instance().getExecutorService();
for (int i = 0; i < count; i++) {
String branchName = branches.get(i);
@@ -772,12 +770,41 @@ private String executeThreads(ExecPath parentExecPath, Route route, List
pi.setExecPath(ep);
}
+ // run threads and wait for them to finish
+ runThreads(tasks);
+
+ // check if all have completed
+ boolean isPend = false;
+ String joinPoint = null;
+ for (int i = 0; i < tasks.length; i++) {
+ ExecThreadTask in = tasks[i];
+ ExecPath ep = in.execPath;
+ joinPoint = ep.getStep();
+
+ if (ep.getPendWorkBasket().isEmpty() == false) {
+ isPend = true;
+ break;
+ }
+ }
+
+ if (isPend == false) {
+ return joinPoint;
+ }
+ else {
+ return null;
+ }
+ }
+
+ private void runThreadsWithExecutorService(ExecutorService executorService, ExecThreadTask[] tasks) {
+ int count = tasks.length;
+ Future>[] futures = new Future[count];
+
// start threads
for (int i = 0; i < count; i++) {
- futures[i] = es.submit(tasks[i]);
+ futures[i] = executorService.submit(tasks[i]);
}
- // wait for threads to finish
+ // wait for them to finish
for (int i = 0; i < tasks.length; i++) {
try {
futures[i].get();
@@ -787,26 +814,36 @@ private String executeThreads(ExecPath parentExecPath, Route route, List
throw new UnifyException("flowret_err_5", e, pi.getCaseId());
}
}
+ }
- // check if all have completed
- boolean isPend = false;
- String joinPoint = null;
+ private void runThreadsAsChildren(ExecThreadTask[] tasks) {
+ Thread[] threads = new Thread[tasks.length];
+
+ // start threads
for (int i = 0; i < tasks.length; i++) {
- ExecThreadTask in = tasks[i];
- ExecPath ep = in.execPath;
- joinPoint = ep.getStep();
+ threads[i] = new Thread(tasks[i]);
+ threads[i].start();
+ }
- if (ep.getPendWorkBasket().isEmpty() == false) {
- isPend = true;
- break;
+ // wait for them to finish
+ for (int i = 0; i < tasks.length; i++) {
+ try {
+ threads[i].join();
+ }
+ catch (InterruptedException e) {
+ // should never happen
+ throw new UnifyException("flowret_err_5", e, pi.getCaseId());
}
}
+ }
- if (isPend == false) {
- return joinPoint;
+ private void runThreads(ExecThreadTask[] tasks) {
+ ExecutorService executorService = Flowret.instance().getExecutorService();
+ if (executorService == null) {
+ runThreadsAsChildren(tasks);
}
else {
- return null;
+ runThreadsWithExecutorService(executorService, tasks);
}
}
diff --git a/src/main/java/com/americanexpress/unify/flowret/Flowret.java b/src/main/java/com/americanexpress/unify/flowret/Flowret.java
index b2d1cb2..9060224 100644
--- a/src/main/java/com/americanexpress/unify/flowret/Flowret.java
+++ b/src/main/java/com/americanexpress/unify/flowret/Flowret.java
@@ -21,7 +21,10 @@
import com.americanexpress.unify.flowret.CONSTS_FLOWRET.DAO;
import com.americanexpress.unify.jdocs.JDocument;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/*
* @author Deepak Arora
@@ -71,33 +74,27 @@ public Wms getWorkManagementService(FlowretDao dao, WorkManager wm, ISlaQueueMan
private Flowret() {
}
- /**
- * Method that is called for initializing Flowret
- *
- * @param maxThreads specifies the number of threads used for parallel processing
- * @param idleTimeout specifies the time out in milliseconds after which parallel processing threads will die out if idle
- * @param typeIdSep specifies the separator character to use to separate the type and the id in the document name used to persist in the data store
- */
+ public static void init(int idleTimeout, String typeIdSep) {
+ init(0, idleTimeout, typeIdSep, "flowret_error");
+ }
+
+ public static void init(int idleTimeout, String typeIdSep, String errorWorkbasket) {
+ init(0, idleTimeout, typeIdSep, errorWorkbasket);
+ }
+
public static void init(int maxThreads, int idleTimeout, String typeIdSep) {
init(maxThreads, idleTimeout, typeIdSep, "flowret_error");
}
public static void init(int maxThreads, int idleTimeout, String typeIdSep, String errorWorkbasket) {
- init(maxThreads, idleTimeout, typeIdSep, errorWorkbasket, null);
- }
-
- public static void init(int maxThreads, int idleTimeout, String typeIdSep, String errorWorkbasket, ThreadFactory threadFactory) {
Flowret am = instance();
am.maxThreads = maxThreads;
am.idleTimeout = idleTimeout;
- BlockOnOfferQueue q = new BlockOnOfferQueue(new ArrayBlockingQueue<>(am.maxThreads * 2));
- if (threadFactory == null) {
+ if (maxThreads > 0) {
+ BlockOnOfferQueue q = new BlockOnOfferQueue(new ArrayBlockingQueue<>(am.maxThreads * 2));
am.es = new ThreadPoolExecutor(am.maxThreads, am.maxThreads, am.idleTimeout, TimeUnit.MILLISECONDS, q, new RejectedItemHandler());
}
- else {
- am.es = new ThreadPoolExecutor(am.maxThreads, am.maxThreads, am.idleTimeout, TimeUnit.MILLISECONDS, q, threadFactory, new RejectedItemHandler());
- }
DAO.SEP = typeIdSep;
am.errorWorkbasket = errorWorkbasket;
@@ -112,15 +109,17 @@ public static void init(int maxThreads, int idleTimeout, String typeIdSep, Strin
* Method that is used to close Flowret
*/
public static void close() {
- singleton.es.shutdown();
- try {
- singleton.es.awaitTermination(5, TimeUnit.MINUTES);
- }
- catch (InterruptedException e) {
- // should never happen
- throw new UnifyException("flowret_err_7", e);
+ if (singleton.es != null) {
+ singleton.es.shutdown();
+ try {
+ singleton.es.awaitTermination(5, TimeUnit.MINUTES);
+ }
+ catch (InterruptedException e) {
+ // should never happen
+ throw new UnifyException("flowret_err_7", e);
+ }
+ singleton.es = null;
}
- singleton.es = null;
}
public int getMaxThreads() {
diff --git a/src/test/java/com/americanexpress/unify/flowret/test_parallel/TestFlowretParallel2.java b/src/test/java/com/americanexpress/unify/flowret/test_parallel/TestFlowretParallel2.java
new file mode 100644
index 0000000..6431c8a
--- /dev/null
+++ b/src/test/java/com/americanexpress/unify/flowret/test_parallel/TestFlowretParallel2.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2020 American Express Travel Related Services Company, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.americanexpress.unify.flowret.test_parallel;
+
+import com.americanexpress.unify.base.BaseUtils;
+import com.americanexpress.unify.base.UnifyException;
+import com.americanexpress.unify.flowret.*;
+import com.americanexpress.unify.flowret.test_singular.TestFlowret;
+import org.junit.jupiter.api.*;
+
+import java.io.File;
+
+/*
+ * @author Deepak Arora
+ */
+public class TestFlowretParallel2 {
+
+ private static String dirPath = "./target/test-data-results/";
+ private static Rts rts = null;
+ private static FileDao dao = null;
+ private static ProcessComponentFactory factory = null;
+ private static EventHandler handler = null;
+
+ @BeforeAll
+ protected static void setEnv() throws Exception {
+ File directory = new File(dirPath);
+ if (!directory.exists()) {
+ directory.mkdir();
+ }
+
+ ERRORS_FLOWRET.load();
+ Flowret.init(30000, "-");
+ }
+
+ @BeforeEach
+ protected void beforeEach() {
+ TestUtils.deleteFiles(dirPath);
+ StepResponseFactory.clear();
+ }
+
+ @AfterEach
+ protected void afterEach() {
+ // nothing to do
+ }
+
+ @AfterAll
+ protected static void afterAll() {
+ Flowret.instance().close();
+ TestUtils.deleteFiles(dirPath);
+ }
+
+ // 3 branches, happy path i.e. all branches proceed
+ public static void setScenario1() {
+ // nothing to do
+ }
+
+ private static void init(FlowretDao dao, ProcessComponentFactory factory, EventHandler handler, ISlaQueueManager sqm) {
+ rts = Flowret.instance().getRunTimeService(dao, factory, handler, sqm);
+ }
+
+ private static void runJourney(String journey) {
+ String json = BaseUtils.getResourceAsString(TestFlowret.class, "/flowret/" + journey + ".json");
+ if (new File(dirPath + "flowret_process_info-1.json").exists() == false) {
+ rts.startCase("1", json, null, null);
+ }
+
+ try {
+ while (true) {
+ System.out.println();
+ rts.resumeCase("1");
+ }
+ }
+ catch (UnifyException e) {
+ System.out.println("Exception -> " + e.getMessage());
+ }
+ }
+
+ @Test
+ void testScenario1() {
+ setScenario1();
+ init(new FileDao(dirPath), new TestComponentFactoryParallel1(), new TestHandler(), null);
+ runJourney("parallel_test_1");
+ }
+
+}