From e0dcad0895bedce48b12e5a700e0b31a94743edd Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Thu, 7 Sep 2023 18:14:53 -0700 Subject: [PATCH 01/12] Topological Sorting and Sequenced Execution Signed-off-by: Daniel Widdis --- build.gradle | 1 + .../flowframework/template/ProcessNode.java | 94 ++++++++++ .../template/ProcessSequenceEdge.java | 48 +++++ .../template/TemplateParser.java | 168 ++++++++++++++++++ 4 files changed, 311 insertions(+) create mode 100644 src/main/java/org/opensearch/flowframework/template/ProcessNode.java create mode 100644 src/main/java/org/opensearch/flowframework/template/ProcessSequenceEdge.java create mode 100644 src/main/java/org/opensearch/flowframework/template/TemplateParser.java diff --git a/build.gradle b/build.gradle index 748757484..aa20423ee 100644 --- a/build.gradle +++ b/build.gradle @@ -105,6 +105,7 @@ repositories { dependencies { implementation "org.opensearch:opensearch:${opensearch_version}" implementation 'org.junit.jupiter:junit-jupiter:5.10.0' + implementation "com.google.code.gson:gson:2.10.1" compileOnly "com.google.guava:guava:32.1.2-jre" api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}" diff --git a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java new file mode 100644 index 000000000..e1b57fc51 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java @@ -0,0 +1,94 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.template; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class ProcessNode { + private final String id; + private CompletableFuture future; + + // will be populated during graph parsing + private Set predecessors = Collections.emptySet(); + + ProcessNode(String id) { + this.id = id; + } + + public String getId() { + return id; + } + + public CompletableFuture getFuture() { + return future; + } + + public Set getPredecessors() { + return predecessors; + } + + public void setPredecessors(Set predecessors) { + this.predecessors = Set.copyOf(predecessors); + } + + public CompletableFuture execute() { + this.future = new CompletableFuture<>(); + CompletableFuture.runAsync(() -> { + if (!predecessors.isEmpty()) { + List> predFutures = predecessors.stream().map(p -> p.getFuture()).collect(Collectors.toList()); + CompletableFuture waitForPredecessors = CompletableFuture.allOf(predFutures.toArray(new CompletableFuture[0])); + try { + waitForPredecessors.orTimeout(30, TimeUnit.SECONDS).get(); + } catch (InterruptedException | ExecutionException e) { + future.completeExceptionally(e); + } + } + if (future.isCompletedExceptionally()) { + return; + } + System.out.println(">>> Starting " + this.id); + sleep(id.contains("ingest") ? 8000 : 4000); + System.out.println("<<< Finished " + this.id); + future.complete(this.id); + }); + return this.future; + } + + private void sleep(long i) { + try { + Thread.sleep(i); + } catch (InterruptedException e) {} + } + + @Override + public int hashCode() { + return Objects.hash(id); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (getClass() != obj.getClass()) return false; + ProcessNode other = (ProcessNode) obj; + return Objects.equals(id, other.id); + } + + @Override + public String toString() { + return this.id; + } +} diff --git a/src/main/java/org/opensearch/flowframework/template/ProcessSequenceEdge.java b/src/main/java/org/opensearch/flowframework/template/ProcessSequenceEdge.java new file mode 100644 index 000000000..4d768958f --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/template/ProcessSequenceEdge.java @@ -0,0 +1,48 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.template; + +import java.util.Objects; + +public class ProcessSequenceEdge { + private final String source; + private final String destination; + + ProcessSequenceEdge(String source, String destination) { + this.source = source; + this.destination = destination; + } + + public String getSource() { + return source; + } + + public String getDestination() { + return destination; + } + + @Override + public int hashCode() { + return Objects.hash(destination, source); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (getClass() != obj.getClass()) return false; + ProcessSequenceEdge other = (ProcessSequenceEdge) obj; + return Objects.equals(destination, other.destination) && Objects.equals(source, other.source); + } + + @Override + public String toString() { + return this.source + "->" + this.destination; + } +} diff --git a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java new file mode 100644 index 000000000..9d125d118 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java @@ -0,0 +1,168 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.template; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class TemplateParser { + + public static void main(String[] args) { + String json = "{\n" + + " \"sequence\": {\n" + + " \"nodes\": [\n" + + " {\n" + + " \"id\": \"fetch_model\"\n" + + " },\n" + + " {\n" + + " \"id\": \"create_ingest_pipeline\"\n" + + " },\n" + + " {\n" + + " \"id\": \"create_search_pipeline\"\n" + + " },\n" + + " {\n" + + " \"id\": \"create_neural_search_index\"\n" + + " }\n" + + " ],\n" + + " \"edges\": [\n" + + " {\n" + + " \"source\": \"fetch_model\",\n" + + " \"dest\": \"create_ingest_pipeline\"\n" + + " },\n" + + " {\n" + + " \"source\": \"fetch_model\",\n" + + " \"dest\": \"create_search_pipeline\"\n" + + " },\n" + + " {\n" + + " \"source\": \"create_ingest_pipeline\",\n" + + " \"dest\": \"create_neural_search_index\"\n" + + " },\n" + + " {\n" + + " \"source\": \"create_search_pipeline\",\n" + + " \"dest\": \"create_neural_search_index\"\n" + // + " }\n," + // + " {\n" + // + " \"source\": \"create_neural_search_index\",\n" + // + " \"dest\": \"fetch_model\"\n" + + " }\n" + + " ]\n" + + " }\n" + + "}"; + + System.out.println(json); + + System.out.println("Parsing graph to sequence..."); + List processSequence = parseJsonGraphToSequence(json); + List> futureList = new ArrayList<>(); + + for (ProcessNode n : processSequence) { + Set predecessors = n.getPredecessors(); + System.out.format( + "Queueing process [%s]. %s.%n", + n.getId(), + predecessors.isEmpty() + ? "Can start immediately!" + : String.format( + "Must wait for [%s] to complete first.", + predecessors.stream().map(p -> p.getId()).collect(Collectors.joining(", ")) + ) + ); + futureList.add(n.execute()); + } + futureList.forEach(CompletableFuture::join); + System.out.println("All done!"); + } + + private static List parseJsonGraphToSequence(String json) { + Gson gson = new Gson(); + JsonObject jsonObject = gson.fromJson(json, JsonObject.class); + + JsonObject graph = jsonObject.getAsJsonObject("sequence"); + + List nodes = new ArrayList<>(); + List edges = new ArrayList<>(); + + for (JsonElement nodeJson : graph.getAsJsonArray("nodes")) { + JsonObject nodeObject = nodeJson.getAsJsonObject(); + String nodeId = nodeObject.get("id").getAsString(); + nodes.add(new ProcessNode(nodeId)); + } + + for (JsonElement edgeJson : graph.getAsJsonArray("edges")) { + JsonObject edgeObject = edgeJson.getAsJsonObject(); + String sourceNodeId = edgeObject.get("source").getAsString(); + String destNodeId = edgeObject.get("dest").getAsString(); + edges.add(new ProcessSequenceEdge(sourceNodeId, destNodeId)); + } + + return topologicalSort(nodes, edges); + } + + private static List topologicalSort(List nodes, List edges) { + // Define the graph + Set graph = new HashSet<>(edges); + // Map node id string to object + Map nodeMap = nodes.stream().collect(Collectors.toMap(ProcessNode::getId, Function.identity())); + // Build predecessor and successor maps + Map> predecessorEdges = new HashMap<>(); + Map> successorEdges = new HashMap<>(); + for (ProcessSequenceEdge edge : edges) { + ProcessNode source = nodeMap.get(edge.getSource()); + ProcessNode dest = nodeMap.get(edge.getDestination()); + predecessorEdges.computeIfAbsent(dest, k -> new HashSet<>()).add(edge); + successorEdges.computeIfAbsent(source, k -> new HashSet<>()).add(edge); + } + // See https://en.wikipedia.org/wiki/Topological_sorting#Kahn's_algorithm + // Find start node(s) which have no predecessors + Queue sourceNodes = new ArrayDeque<>(); + nodes.stream().filter(n -> !predecessorEdges.containsKey(n)).forEach(n -> sourceNodes.add(n)); + if (sourceNodes.isEmpty()) { + throw new IllegalArgumentException("No start node detected: all nodes have a predecessor."); + } + System.out.println("Start node(s): " + sourceNodes); + + // List to contain sorted elements + List sortedNodes = new ArrayList<>(); + // Keep adding successors + while (!sourceNodes.isEmpty()) { + ProcessNode n = sourceNodes.poll(); + sortedNodes.add(n); + if (predecessorEdges.containsKey(n)) { + n.setPredecessors(predecessorEdges.get(n).stream().map(e -> nodeMap.get(e.getSource())).collect(Collectors.toSet())); + } + // Add successors to the queue + for (ProcessSequenceEdge e : successorEdges.getOrDefault(n, Collections.emptySet())) { + graph.remove(e); + ProcessNode dest = nodeMap.get(e.getDestination()); + if (!sourceNodes.contains(dest) && !sortedNodes.contains(dest)) { + sourceNodes.add(dest); + } + } + } + if (!graph.isEmpty()) { + throw new IllegalArgumentException("Cycle detected: " + graph); + } + System.out.println("Execution sequence: " + sortedNodes); + return sortedNodes; + } +} From b8370132cd99f47a658cacf2e945735db528f8de Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Fri, 8 Sep 2023 12:37:09 -0700 Subject: [PATCH 02/12] Add javadocs Signed-off-by: Daniel Widdis --- .../flowframework/template/Demo.java | 89 +++++++++++++++++++ .../flowframework/template/ProcessNode.java | 44 +++++++-- .../template/ProcessSequenceEdge.java | 19 ++++ .../template/TemplateParser.java | 80 +++-------------- 4 files changed, 160 insertions(+), 72 deletions(-) create mode 100644 src/main/java/org/opensearch/flowframework/template/Demo.java diff --git a/src/main/java/org/opensearch/flowframework/template/Demo.java b/src/main/java/org/opensearch/flowframework/template/Demo.java new file mode 100644 index 000000000..8c9afadd9 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/template/Demo.java @@ -0,0 +1,89 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.template; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * Demo class exercising {@link TemplateParser}. This will be moved to a unit test. + */ +public class Demo { + + /** + * Demonstrate parsing a JSON graph. + * + * @param args unused + */ + public static void main(String[] args) { + String json = "{\n" + + " \"sequence\": {\n" + + " \"nodes\": [\n" + + " {\n" + + " \"id\": \"fetch_model\"\n" + + " },\n" + + " {\n" + + " \"id\": \"create_ingest_pipeline\"\n" + + " },\n" + + " {\n" + + " \"id\": \"create_search_pipeline\"\n" + + " },\n" + + " {\n" + + " \"id\": \"create_neural_search_index\"\n" + + " }\n" + + " ],\n" + + " \"edges\": [\n" + + " {\n" + + " \"source\": \"fetch_model\",\n" + + " \"dest\": \"create_ingest_pipeline\"\n" + + " },\n" + + " {\n" + + " \"source\": \"fetch_model\",\n" + + " \"dest\": \"create_search_pipeline\"\n" + + " },\n" + + " {\n" + + " \"source\": \"create_ingest_pipeline\",\n" + + " \"dest\": \"create_neural_search_index\"\n" + + " },\n" + + " {\n" + + " \"source\": \"create_search_pipeline\",\n" + + " \"dest\": \"create_neural_search_index\"\n" + + " }\n" + + " ]\n" + + " }\n" + + "}"; + + System.out.println(json); + + System.out.println("Parsing graph to sequence..."); + List processSequence = TemplateParser.parseJsonGraphToSequence(json); + List> futureList = new ArrayList<>(); + + for (ProcessNode n : processSequence) { + Set predecessors = n.getPredecessors(); + System.out.format( + "Queueing process [%s]. %s.%n", + n.getId(), + predecessors.isEmpty() + ? "Can start immediately!" + : String.format( + "Must wait for [%s] to complete first.", + predecessors.stream().map(p -> p.getId()).collect(Collectors.joining(", ")) + ) + ); + futureList.add(n.execute()); + } + futureList.forEach(CompletableFuture::join); + System.out.println("All done!"); + } + +} diff --git a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java index e1b57fc51..88c0bfb74 100644 --- a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java +++ b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java @@ -17,33 +17,66 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +/** + * Representation of a process node in a workflow graph. Tracks predecessor nodes which must be completed before it can start execution. + */ public class ProcessNode { private final String id; - private CompletableFuture future; + private CompletableFuture future = null; // will be populated during graph parsing private Set predecessors = Collections.emptySet(); + /** + * Create this node with a unique id. + */ ProcessNode(String id) { this.id = id; } + /** + * Returns the node's id. + * @return the node id. + */ public String getId() { return id; } + /** + * Returns a {@link CompletableFuture} if this process is executing. + * Relies on the node having been sorted and executed in an order such that all predecessor nodes have begun execution first (and thus populated this value). + * + * @return A future indicating the processing state of this node. + * Returns {@code null} if it has not begun executing, should not happen if a workflow is sorted and executed topologically. + */ public CompletableFuture getFuture() { return future; } + /** + * Returns the predecessors of this node in the workflow. + * The predecessor's {@link #getFuture()} must complete before execution begins on this node. + * + * @return a set of predecessor nodes, if any. At least one node in the graph must have no predecessors and serve as a start node. + */ public Set getPredecessors() { return predecessors; } - public void setPredecessors(Set predecessors) { + /** + * Sets the predecessor node. Called by {@link TemplateParser}. + * + * @param predecessors The predecessors of this node. + */ + void setPredecessors(Set predecessors) { this.predecessors = Set.copyOf(predecessors); } + /** + * Execute this node in the sequence. Initializes the node's {@link CompletableFuture} and completes it when the process completes. + * + * @return this node's future. This is returned immediately, while process execution continues asynchronously. + */ public CompletableFuture execute() { this.future = new CompletableFuture<>(); CompletableFuture.runAsync(() -> { @@ -60,16 +93,17 @@ public CompletableFuture execute() { return; } System.out.println(">>> Starting " + this.id); - sleep(id.contains("ingest") ? 8000 : 4000); + // TODO: Here is where we would call out to workflow step API + workflowExecute(this.id); System.out.println("<<< Finished " + this.id); future.complete(this.id); }); return this.future; } - private void sleep(long i) { + private void workflowExecute(String s) { try { - Thread.sleep(i); + Thread.sleep(s.contains("ingest") ? 8000 : 4000); } catch (InterruptedException e) {} } diff --git a/src/main/java/org/opensearch/flowframework/template/ProcessSequenceEdge.java b/src/main/java/org/opensearch/flowframework/template/ProcessSequenceEdge.java index 4d768958f..9544620fb 100644 --- a/src/main/java/org/opensearch/flowframework/template/ProcessSequenceEdge.java +++ b/src/main/java/org/opensearch/flowframework/template/ProcessSequenceEdge.java @@ -10,19 +10,38 @@ import java.util.Objects; +/** + * Representation of an edge between process nodes in a workflow graph. + */ public class ProcessSequenceEdge { private final String source; private final String destination; + /** + * Create this edge with the id's of the source and destination nodes. + * + * @param source The source node id. + * @param destination The destination node id. + */ ProcessSequenceEdge(String source, String destination) { this.source = source; this.destination = destination; } + /** + * Gets the source node id. + * + * @return the source node id. + */ public String getSource() { return source; } + /** + * Gets the destination node id. + * + * @return the destination node id. + */ public String getDestination() { return destination; } diff --git a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java index 9d125d118..0448fafd3 100644 --- a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java +++ b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java @@ -21,79 +21,25 @@ import java.util.Map; import java.util.Queue; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.Collectors; +/** + * Utility class for parsing templates. + */ public class TemplateParser { - public static void main(String[] args) { - String json = "{\n" - + " \"sequence\": {\n" - + " \"nodes\": [\n" - + " {\n" - + " \"id\": \"fetch_model\"\n" - + " },\n" - + " {\n" - + " \"id\": \"create_ingest_pipeline\"\n" - + " },\n" - + " {\n" - + " \"id\": \"create_search_pipeline\"\n" - + " },\n" - + " {\n" - + " \"id\": \"create_neural_search_index\"\n" - + " }\n" - + " ],\n" - + " \"edges\": [\n" - + " {\n" - + " \"source\": \"fetch_model\",\n" - + " \"dest\": \"create_ingest_pipeline\"\n" - + " },\n" - + " {\n" - + " \"source\": \"fetch_model\",\n" - + " \"dest\": \"create_search_pipeline\"\n" - + " },\n" - + " {\n" - + " \"source\": \"create_ingest_pipeline\",\n" - + " \"dest\": \"create_neural_search_index\"\n" - + " },\n" - + " {\n" - + " \"source\": \"create_search_pipeline\",\n" - + " \"dest\": \"create_neural_search_index\"\n" - // + " }\n," - // + " {\n" - // + " \"source\": \"create_neural_search_index\",\n" - // + " \"dest\": \"fetch_model\"\n" - + " }\n" - + " ]\n" - + " }\n" - + "}"; - - System.out.println(json); - - System.out.println("Parsing graph to sequence..."); - List processSequence = parseJsonGraphToSequence(json); - List> futureList = new ArrayList<>(); - - for (ProcessNode n : processSequence) { - Set predecessors = n.getPredecessors(); - System.out.format( - "Queueing process [%s]. %s.%n", - n.getId(), - predecessors.isEmpty() - ? "Can start immediately!" - : String.format( - "Must wait for [%s] to complete first.", - predecessors.stream().map(p -> p.getId()).collect(Collectors.joining(", ")) - ) - ); - futureList.add(n.execute()); - } - futureList.forEach(CompletableFuture::join); - System.out.println("All done!"); - } + /** + * Prevent instantiating this class. + */ + private TemplateParser() {} - private static List parseJsonGraphToSequence(String json) { + /** + * Parse a JSON representation of nodes and edges into a topologically sorted list of process nodes. + * @param json A string containing a JSON representation of nodes and edges + * @return A list of Process Nodes sorted topologically. All predecessors of any node will occur prior to it in the list. + */ + public static List parseJsonGraphToSequence(String json) { Gson gson = new Gson(); JsonObject jsonObject = gson.fromJson(json, JsonObject.class); From cc4cd643023c3da0d68b9ccd12139cf12a5f057a Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Mon, 11 Sep 2023 13:28:08 -0700 Subject: [PATCH 03/12] Update demo to link to Workflow interface Signed-off-by: Daniel Widdis --- .../flowframework/template/Demo.java | 20 +++++++-- .../template/DemoWorkflowStep.java | 29 ++++++++++++ .../flowframework/template/ProcessNode.java | 45 ++++++++++++------- .../template/TemplateParser.java | 8 ++-- .../flowframework/FlowFrameworkPluginIT.java | 4 +- 5 files changed, 80 insertions(+), 26 deletions(-) create mode 100644 src/main/java/org/opensearch/flowframework/template/DemoWorkflowStep.java diff --git a/src/main/java/org/opensearch/flowframework/template/Demo.java b/src/main/java/org/opensearch/flowframework/template/Demo.java index 8c9afadd9..97f2e4227 100644 --- a/src/main/java/org/opensearch/flowframework/template/Demo.java +++ b/src/main/java/org/opensearch/flowframework/template/Demo.java @@ -8,8 +8,12 @@ */ package org.opensearch.flowframework.template; +import org.opensearch.flowframework.workflow.Workflow; + import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -19,6 +23,14 @@ */ public class Demo { + private static Map workflowMap = new HashMap<>(); + static { + workflowMap.put("fetch_model", new DemoWorkflowStep(3000)); + workflowMap.put("create_ingest_pipeline", new DemoWorkflowStep(4000)); + workflowMap.put("create_search_pipeline", new DemoWorkflowStep(8000)); + workflowMap.put("create_neural_search_index", new DemoWorkflowStep(2000)); + } + /** * Demonstrate parsing a JSON graph. * @@ -65,19 +77,19 @@ public static void main(String[] args) { System.out.println(json); System.out.println("Parsing graph to sequence..."); - List processSequence = TemplateParser.parseJsonGraphToSequence(json); - List> futureList = new ArrayList<>(); + List processSequence = TemplateParser.parseJsonGraphToSequence(json, workflowMap); + List> futureList = new ArrayList<>(); for (ProcessNode n : processSequence) { Set predecessors = n.getPredecessors(); System.out.format( "Queueing process [%s]. %s.%n", - n.getId(), + n.id(), predecessors.isEmpty() ? "Can start immediately!" : String.format( "Must wait for [%s] to complete first.", - predecessors.stream().map(p -> p.getId()).collect(Collectors.joining(", ")) + predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", ")) ) ); futureList.add(n.execute()); diff --git a/src/main/java/org/opensearch/flowframework/template/DemoWorkflowStep.java b/src/main/java/org/opensearch/flowframework/template/DemoWorkflowStep.java new file mode 100644 index 000000000..877b05261 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/template/DemoWorkflowStep.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.template; + +import org.opensearch.flowframework.workflow.Workflow; + +import java.util.concurrent.CompletableFuture; + +public class DemoWorkflowStep implements Workflow { + + private final long delay; + + public DemoWorkflowStep(long delay) { + this.delay = delay; + } + + @Override + public CompletableFuture execute() throws Exception { + Thread.sleep(this.delay); + return null; + } + +} diff --git a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java index 88c0bfb74..561fcef1b 100644 --- a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java +++ b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java @@ -8,6 +8,8 @@ */ package org.opensearch.flowframework.template; +import org.opensearch.flowframework.workflow.Workflow; + import java.util.Collections; import java.util.List; import java.util.Objects; @@ -22,26 +24,39 @@ */ public class ProcessNode { private final String id; - private CompletableFuture future = null; + private final Workflow workflow; + private CompletableFuture future = null; // will be populated during graph parsing private Set predecessors = Collections.emptySet(); /** - * Create this node with a unique id. + * Create this node linked to its executing process. + * + * @param id A string identifying the workflow step + * @param workflow A java class implementing {@link Workflow} to be executed when it's this node's turn. */ - ProcessNode(String id) { + ProcessNode(String id, Workflow workflow) { this.id = id; + this.workflow = workflow; } /** * Returns the node's id. * @return the node id. */ - public String getId() { + public String id() { return id; } + /** + * Returns the node's workflow implementation. + * @return the workflow step + */ + public Workflow workflow() { + return workflow; + } + /** * Returns a {@link CompletableFuture} if this process is executing. * Relies on the node having been sorted and executed in an order such that all predecessor nodes have begun execution first (and thus populated this value). @@ -49,7 +64,7 @@ public String getId() { * @return A future indicating the processing state of this node. * Returns {@code null} if it has not begun executing, should not happen if a workflow is sorted and executed topologically. */ - public CompletableFuture getFuture() { + public CompletableFuture getFuture() { return future; } @@ -77,11 +92,11 @@ void setPredecessors(Set predecessors) { * * @return this node's future. This is returned immediately, while process execution continues asynchronously. */ - public CompletableFuture execute() { + public CompletableFuture execute() { this.future = new CompletableFuture<>(); CompletableFuture.runAsync(() -> { if (!predecessors.isEmpty()) { - List> predFutures = predecessors.stream().map(p -> p.getFuture()).collect(Collectors.toList()); + List> predFutures = predecessors.stream().map(p -> p.getFuture()).collect(Collectors.toList()); CompletableFuture waitForPredecessors = CompletableFuture.allOf(predFutures.toArray(new CompletableFuture[0])); try { waitForPredecessors.orTimeout(30, TimeUnit.SECONDS).get(); @@ -93,20 +108,18 @@ public CompletableFuture execute() { return; } System.out.println(">>> Starting " + this.id); - // TODO: Here is where we would call out to workflow step API - workflowExecute(this.id); + try { + // TODO collect the future from this step and use it in our own completion + this.workflow.execute(); + } catch (Exception e) { + // TODO remove the exception on workflow, instead handle exceptional completion + } System.out.println("<<< Finished " + this.id); - future.complete(this.id); + future.complete(null); }); return this.future; } - private void workflowExecute(String s) { - try { - Thread.sleep(s.contains("ingest") ? 8000 : 4000); - } catch (InterruptedException e) {} - } - @Override public int hashCode() { return Objects.hash(id); diff --git a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java index 0448fafd3..89c9eddb4 100644 --- a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java +++ b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java @@ -11,6 +11,7 @@ import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import org.opensearch.flowframework.workflow.Workflow; import java.util.ArrayDeque; import java.util.ArrayList; @@ -37,9 +38,10 @@ private TemplateParser() {} /** * Parse a JSON representation of nodes and edges into a topologically sorted list of process nodes. * @param json A string containing a JSON representation of nodes and edges + * @param workflowSteps A map linking JSON node names to Java objects implementing {@link Workflow} * @return A list of Process Nodes sorted topologically. All predecessors of any node will occur prior to it in the list. */ - public static List parseJsonGraphToSequence(String json) { + public static List parseJsonGraphToSequence(String json, Map workflowSteps) { Gson gson = new Gson(); JsonObject jsonObject = gson.fromJson(json, JsonObject.class); @@ -51,7 +53,7 @@ public static List parseJsonGraphToSequence(String json) { for (JsonElement nodeJson : graph.getAsJsonArray("nodes")) { JsonObject nodeObject = nodeJson.getAsJsonObject(); String nodeId = nodeObject.get("id").getAsString(); - nodes.add(new ProcessNode(nodeId)); + nodes.add(new ProcessNode(nodeId, workflowSteps.get(nodeId))); } for (JsonElement edgeJson : graph.getAsJsonArray("edges")) { @@ -68,7 +70,7 @@ private static List topologicalSort(List nodes, List

graph = new HashSet<>(edges); // Map node id string to object - Map nodeMap = nodes.stream().collect(Collectors.toMap(ProcessNode::getId, Function.identity())); + Map nodeMap = nodes.stream().collect(Collectors.toMap(ProcessNode::id, Function.identity())); // Build predecessor and successor maps Map> predecessorEdges = new HashMap<>(); Map> successorEdges = new HashMap<>(); diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginIT.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginIT.java index d54dc2c63..0dccc27ce 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginIT.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginIT.java @@ -22,8 +22,6 @@ import java.util.Collection; import java.util.Collections; -import static org.hamcrest.Matchers.containsString; - @ThreadLeakScope(ThreadLeakScope.Scope.NONE) @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE) public class FlowFrameworkPluginIT extends OpenSearchIntegTestCase { @@ -38,6 +36,6 @@ public void testPluginInstalled() throws IOException, ParseException { String body = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); logger.info("response body: {}", body); - assertThat(body, containsString("flowframework")); + assertTrue(body.contains("flowframework")); } } From 6a8739198de9962d40a0ce6f510d723a6e51e1d4 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Tue, 12 Sep 2023 10:08:56 -0700 Subject: [PATCH 04/12] Replace System.out with logging Signed-off-by: Daniel Widdis --- .../flowframework/template/Demo.java | 69 +++++++------------ .../flowframework/template/ProcessNode.java | 9 ++- .../template/TemplateParser.java | 8 ++- src/main/resources/log4j2.xml | 17 +++++ src/test/resources/template/demo.json | 36 ++++++++++ 5 files changed, 89 insertions(+), 50 deletions(-) create mode 100644 src/main/resources/log4j2.xml create mode 100644 src/test/resources/template/demo.json diff --git a/src/main/java/org/opensearch/flowframework/template/Demo.java b/src/main/java/org/opensearch/flowframework/template/Demo.java index 97f2e4227..6dbdce9df 100644 --- a/src/main/java/org/opensearch/flowframework/template/Demo.java +++ b/src/main/java/org/opensearch/flowframework/template/Demo.java @@ -8,8 +8,13 @@ */ package org.opensearch.flowframework.template; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.flowframework.workflow.Workflow; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -23,11 +28,13 @@ */ public class Demo { + private static final Logger logger = LogManager.getLogger(Demo.class); + private static Map workflowMap = new HashMap<>(); static { workflowMap.put("fetch_model", new DemoWorkflowStep(3000)); - workflowMap.put("create_ingest_pipeline", new DemoWorkflowStep(4000)); - workflowMap.put("create_search_pipeline", new DemoWorkflowStep(8000)); + workflowMap.put("create_ingest_pipeline", new DemoWorkflowStep(3000)); + workflowMap.put("create_search_pipeline", new DemoWorkflowStep(5000)); workflowMap.put("create_neural_search_index", new DemoWorkflowStep(2000)); } @@ -37,65 +44,35 @@ public class Demo { * @param args unused */ public static void main(String[] args) { - String json = "{\n" - + " \"sequence\": {\n" - + " \"nodes\": [\n" - + " {\n" - + " \"id\": \"fetch_model\"\n" - + " },\n" - + " {\n" - + " \"id\": \"create_ingest_pipeline\"\n" - + " },\n" - + " {\n" - + " \"id\": \"create_search_pipeline\"\n" - + " },\n" - + " {\n" - + " \"id\": \"create_neural_search_index\"\n" - + " }\n" - + " ],\n" - + " \"edges\": [\n" - + " {\n" - + " \"source\": \"fetch_model\",\n" - + " \"dest\": \"create_ingest_pipeline\"\n" - + " },\n" - + " {\n" - + " \"source\": \"fetch_model\",\n" - + " \"dest\": \"create_search_pipeline\"\n" - + " },\n" - + " {\n" - + " \"source\": \"create_ingest_pipeline\",\n" - + " \"dest\": \"create_neural_search_index\"\n" - + " },\n" - + " {\n" - + " \"source\": \"create_search_pipeline\",\n" - + " \"dest\": \"create_neural_search_index\"\n" - + " }\n" - + " ]\n" - + " }\n" - + "}"; - - System.out.println(json); + String path = "src/test/resources/template/demo.json"; + String json; + try { + json = new String(Files.readAllBytes(Paths.get(path))); + } catch (IOException e) { + logger.error("Failed to read JSON at path {}", path); + return; + } - System.out.println("Parsing graph to sequence..."); + logger.info("Parsing graph to sequence..."); List processSequence = TemplateParser.parseJsonGraphToSequence(json, workflowMap); List> futureList = new ArrayList<>(); for (ProcessNode n : processSequence) { Set predecessors = n.getPredecessors(); - System.out.format( - "Queueing process [%s]. %s.%n", + logger.info( + "Queueing process [{}].{}", n.id(), predecessors.isEmpty() - ? "Can start immediately!" + ? " Can start immediately!" : String.format( - "Must wait for [%s] to complete first.", + " Must wait for [%s] to complete first.", predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", ")) ) ); futureList.add(n.execute()); } futureList.forEach(CompletableFuture::join); - System.out.println("All done!"); + logger.info("All done!"); } } diff --git a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java index 561fcef1b..928cd2cd1 100644 --- a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java +++ b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java @@ -8,6 +8,8 @@ */ package org.opensearch.flowframework.template; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.flowframework.workflow.Workflow; import java.util.Collections; @@ -23,6 +25,9 @@ * Representation of a process node in a workflow graph. Tracks predecessor nodes which must be completed before it can start execution. */ public class ProcessNode { + + private static final Logger logger = LogManager.getLogger(ProcessNode.class); + private final String id; private final Workflow workflow; private CompletableFuture future = null; @@ -107,14 +112,14 @@ public CompletableFuture execute() { if (future.isCompletedExceptionally()) { return; } - System.out.println(">>> Starting " + this.id); + logger.debug(">>> Starting {}", this.id); try { // TODO collect the future from this step and use it in our own completion this.workflow.execute(); } catch (Exception e) { // TODO remove the exception on workflow, instead handle exceptional completion } - System.out.println("<<< Finished " + this.id); + logger.debug("<<< Finished {}", this.id); future.complete(null); }); return this.future; diff --git a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java index 89c9eddb4..e565593a7 100644 --- a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java +++ b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java @@ -11,6 +11,8 @@ import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.flowframework.workflow.Workflow; import java.util.ArrayDeque; @@ -30,6 +32,8 @@ */ public class TemplateParser { + private static final Logger logger = LogManager.getLogger(TemplateParser.class); + /** * Prevent instantiating this class. */ @@ -87,7 +91,7 @@ private static List topologicalSort(List nodes, List

sortedNodes = new ArrayList<>(); @@ -110,7 +114,7 @@ private static List topologicalSort(List nodes, List

+ + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/test/resources/template/demo.json b/src/test/resources/template/demo.json new file mode 100644 index 000000000..38f1d0644 --- /dev/null +++ b/src/test/resources/template/demo.json @@ -0,0 +1,36 @@ +{ + "sequence": { + "nodes": [ + { + "id": "fetch_model" + }, + { + "id": "create_ingest_pipeline" + }, + { + "id": "create_search_pipeline" + }, + { + "id": "create_neural_search_index" + } + ], + "edges": [ + { + "source": "fetch_model", + "dest": "create_ingest_pipeline" + }, + { + "source": "fetch_model", + "dest": "create_search_pipeline" + }, + { + "source": "create_ingest_pipeline", + "dest": "create_neural_search_index" + }, + { + "source": "create_search_pipeline", + "dest": "create_neural_search_index" + } + ] + } +} From 4b6a9fe45486e97358995042a0ba61c8b5b4a7c2 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Wed, 13 Sep 2023 16:34:58 -0700 Subject: [PATCH 05/12] Update with new interface signatures Signed-off-by: Daniel Widdis --- .../flowframework/template/Demo.java | 4 +- .../template/DemoWorkflowStep.java | 25 ++++++++++--- .../flowframework/template/ProcessNode.java | 37 ++++++++++--------- .../template/TemplateParser.java | 6 +-- src/main/resources/log4j2.xml | 2 +- 5 files changed, 46 insertions(+), 28 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/template/Demo.java b/src/main/java/org/opensearch/flowframework/template/Demo.java index 6dbdce9df..e42496f14 100644 --- a/src/main/java/org/opensearch/flowframework/template/Demo.java +++ b/src/main/java/org/opensearch/flowframework/template/Demo.java @@ -10,7 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.flowframework.workflow.Workflow; +import org.opensearch.flowframework.workflow.WorkflowStep; import java.io.IOException; import java.nio.file.Files; @@ -30,7 +30,7 @@ public class Demo { private static final Logger logger = LogManager.getLogger(Demo.class); - private static Map workflowMap = new HashMap<>(); + private static Map workflowMap = new HashMap<>(); static { workflowMap.put("fetch_model", new DemoWorkflowStep(3000)); workflowMap.put("create_ingest_pipeline", new DemoWorkflowStep(3000)); diff --git a/src/main/java/org/opensearch/flowframework/template/DemoWorkflowStep.java b/src/main/java/org/opensearch/flowframework/template/DemoWorkflowStep.java index 877b05261..6cc05b92e 100644 --- a/src/main/java/org/opensearch/flowframework/template/DemoWorkflowStep.java +++ b/src/main/java/org/opensearch/flowframework/template/DemoWorkflowStep.java @@ -8,22 +8,37 @@ */ package org.opensearch.flowframework.template; -import org.opensearch.flowframework.workflow.Workflow; +import org.opensearch.flowframework.workflow.WorkflowData; +import org.opensearch.flowframework.workflow.WorkflowStep; import java.util.concurrent.CompletableFuture; -public class DemoWorkflowStep implements Workflow { +public class DemoWorkflowStep implements WorkflowStep { private final long delay; + private final String name; public DemoWorkflowStep(long delay) { this.delay = delay; + this.name = "DEMO_DELAY_" + delay; } @Override - public CompletableFuture execute() throws Exception { - Thread.sleep(this.delay); - return null; + public CompletableFuture execute(WorkflowData data) { + CompletableFuture future = new CompletableFuture<>(); + CompletableFuture.runAsync(() -> { + try { + Thread.sleep(this.delay); + future.complete(null); + } catch (InterruptedException e) { + future.completeExceptionally(e); + } + }); + return future; } + @Override + public String getName() { + return name; + } } diff --git a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java index 928cd2cd1..89e081afd 100644 --- a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java +++ b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java @@ -10,7 +10,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.flowframework.workflow.Workflow; +import org.opensearch.flowframework.workflow.WorkflowData; +import org.opensearch.flowframework.workflow.WorkflowStep; import java.util.Collections; import java.util.List; @@ -29,8 +30,8 @@ public class ProcessNode { private static final Logger logger = LogManager.getLogger(ProcessNode.class); private final String id; - private final Workflow workflow; - private CompletableFuture future = null; + private final WorkflowStep workflowStep; + private CompletableFuture future = null; // will be populated during graph parsing private Set predecessors = Collections.emptySet(); @@ -39,11 +40,11 @@ public class ProcessNode { * Create this node linked to its executing process. * * @param id A string identifying the workflow step - * @param workflow A java class implementing {@link Workflow} to be executed when it's this node's turn. + * @param workflowStep A java class implementing {@link WorkflowStep} to be executed when it's this node's turn. */ - ProcessNode(String id, Workflow workflow) { + ProcessNode(String id, WorkflowStep workflowStep) { this.id = id; - this.workflow = workflow; + this.workflowStep = workflowStep; } /** @@ -58,8 +59,8 @@ public String id() { * Returns the node's workflow implementation. * @return the workflow step */ - public Workflow workflow() { - return workflow; + public WorkflowStep workflowStep() { + return workflowStep; } /** @@ -69,7 +70,7 @@ public Workflow workflow() { * @return A future indicating the processing state of this node. * Returns {@code null} if it has not begun executing, should not happen if a workflow is sorted and executed topologically. */ - public CompletableFuture getFuture() { + public CompletableFuture getFuture() { return future; } @@ -97,11 +98,11 @@ void setPredecessors(Set predecessors) { * * @return this node's future. This is returned immediately, while process execution continues asynchronously. */ - public CompletableFuture execute() { + public CompletableFuture execute() { this.future = new CompletableFuture<>(); CompletableFuture.runAsync(() -> { if (!predecessors.isEmpty()) { - List> predFutures = predecessors.stream().map(p -> p.getFuture()).collect(Collectors.toList()); + List> predFutures = predecessors.stream().map(p -> p.getFuture()).collect(Collectors.toList()); CompletableFuture waitForPredecessors = CompletableFuture.allOf(predFutures.toArray(new CompletableFuture[0])); try { waitForPredecessors.orTimeout(30, TimeUnit.SECONDS).get(); @@ -113,14 +114,16 @@ public CompletableFuture execute() { return; } logger.debug(">>> Starting {}", this.id); + CompletableFuture stepFuture = this.workflowStep.execute(null); + stepFuture.join(); try { - // TODO collect the future from this step and use it in our own completion - this.workflow.execute(); - } catch (Exception e) { - // TODO remove the exception on workflow, instead handle exceptional completion + future.complete(stepFuture.get()); + logger.debug("<<< Completed {}", this.id); + } catch (InterruptedException | ExecutionException e) { + // TODO: better handling of getCause + future.completeExceptionally(e); + logger.debug("<<< Completed Exceptionally {}", this.id); } - logger.debug("<<< Finished {}", this.id); - future.complete(null); }); return this.future; } diff --git a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java index e565593a7..1f8e1dd68 100644 --- a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java +++ b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java @@ -13,7 +13,7 @@ import com.google.gson.JsonObject; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.flowframework.workflow.Workflow; +import org.opensearch.flowframework.workflow.WorkflowStep; import java.util.ArrayDeque; import java.util.ArrayList; @@ -42,10 +42,10 @@ private TemplateParser() {} /** * Parse a JSON representation of nodes and edges into a topologically sorted list of process nodes. * @param json A string containing a JSON representation of nodes and edges - * @param workflowSteps A map linking JSON node names to Java objects implementing {@link Workflow} + * @param workflowSteps A map linking JSON node names to Java objects implementing {@link WorkflowStep} * @return A list of Process Nodes sorted topologically. All predecessors of any node will occur prior to it in the list. */ - public static List parseJsonGraphToSequence(String json, Map workflowSteps) { + public static List parseJsonGraphToSequence(String json, Map workflowSteps) { Gson gson = new Gson(); JsonObject jsonObject = gson.fromJson(json, JsonObject.class); diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml index cbd33bbbb..21a4c6fa5 100644 --- a/src/main/resources/log4j2.xml +++ b/src/main/resources/log4j2.xml @@ -2,7 +2,7 @@ - + From d7211dc2eabc2ca10197819aaab3c4eae82e462f Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Wed, 13 Sep 2023 17:34:58 -0700 Subject: [PATCH 06/12] Demo passing input data at parse-time Signed-off-by: Daniel Widdis --- .../template/CreateIndexRequestData.java | 42 +++++++++++ .../template/CreateIndexWorkflowStep.java | 63 ++++++++++++++++ .../flowframework/template/DataDemo.java | 75 +++++++++++++++++++ .../flowframework/template/Demo.java | 3 +- .../flowframework/template/ProcessNode.java | 8 +- .../template/TemplateParser.java | 8 +- .../flowframework/workflow/WorkflowData.java | 11 ++- .../workflow/WorkflowInputData.java | 24 ++++++ src/test/resources/template/datademo.json | 11 +++ 9 files changed, 239 insertions(+), 6 deletions(-) create mode 100644 src/main/java/org/opensearch/flowframework/template/CreateIndexRequestData.java create mode 100644 src/main/java/org/opensearch/flowframework/template/CreateIndexWorkflowStep.java create mode 100644 src/main/java/org/opensearch/flowframework/template/DataDemo.java create mode 100644 src/main/java/org/opensearch/flowframework/workflow/WorkflowInputData.java create mode 100644 src/test/resources/template/datademo.json diff --git a/src/main/java/org/opensearch/flowframework/template/CreateIndexRequestData.java b/src/main/java/org/opensearch/flowframework/template/CreateIndexRequestData.java new file mode 100644 index 000000000..e4594d0cf --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/template/CreateIndexRequestData.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.template; + +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.flowframework.workflow.WorkflowInputData; + +import java.util.HashMap; +import java.util.Map; + +public class CreateIndexRequestData implements WorkflowInputData { + + private Map params = new HashMap<>(); + private Map content = new HashMap<>(); + + public CreateIndexRequestData(CreateIndexRequest request) { + super(); + // See RestCreateIndexAction for source of param keys needed + params.put("index", request.index()); + // See CreateIndexRequest ParseFields for source of content keys needed + content.put("mappings", request.mappings()); + content.put("settings", request.settings()); + content.put("aliases", request.aliases()); + } + + @Override + public Map getContent() { + return Map.copyOf(content); + } + + @Override + public Map getParams() { + return Map.copyOf(params); + } + +} diff --git a/src/main/java/org/opensearch/flowframework/template/CreateIndexWorkflowStep.java b/src/main/java/org/opensearch/flowframework/template/CreateIndexWorkflowStep.java new file mode 100644 index 000000000..713ee343b --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/template/CreateIndexWorkflowStep.java @@ -0,0 +1,63 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.template; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.flowframework.workflow.WorkflowData; +import org.opensearch.flowframework.workflow.WorkflowStep; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public class CreateIndexWorkflowStep implements WorkflowStep { + + private static final Logger logger = LogManager.getLogger(CreateIndexWorkflowStep.class); + + private final String name; + private WorkflowData data = null; + + public CreateIndexWorkflowStep() { + this.name = "CREATE_INDEX"; + } + + @Override + public CompletableFuture execute(WorkflowData data) { + // TODO, need to handle this better, we are pre-running execute all the time so passing a param is silly + if (data != null) { + this.data = data; + } + CompletableFuture future = new CompletableFuture<>(); + CompletableFuture.runAsync(() -> { + if (this.data instanceof CreateIndexRequestData) { + Map params = ((CreateIndexRequestData) this.data).getParams(); + Map content = ((CreateIndexRequestData) this.data).getContent(); + logger.debug("Received params: {}, content: {}", params, content); + future.complete(null); + } else { + logger.debug("Wrong data type!"); + future.completeExceptionally(new IllegalArgumentException("Expected CreateIndexRequestData here.")); + } + }); + return future; + } + + @Override + public String getName() { + return name; + } + + public WorkflowData getData() { + return data; + } + + public void setData(WorkflowData data) { + this.data = data; + } +} diff --git a/src/main/java/org/opensearch/flowframework/template/DataDemo.java b/src/main/java/org/opensearch/flowframework/template/DataDemo.java new file mode 100644 index 000000000..0c817c269 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/template/DataDemo.java @@ -0,0 +1,75 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.template; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.flowframework.workflow.WorkflowStep; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * Demo class exercising {@link TemplateParser}. This will be moved to a unit test. + */ +public class DataDemo { + + private static final Logger logger = LogManager.getLogger(DataDemo.class); + + private static Map workflowMap = new HashMap<>(); + static { + workflowMap.put("create_index", new CreateIndexWorkflowStep()); + } + + /** + * Demonstrate parsing a JSON graph. + * + * @param args unused + */ + public static void main(String[] args) { + String path = "src/test/resources/template/datademo.json"; + String json; + try { + json = new String(Files.readAllBytes(Paths.get(path))); + } catch (IOException e) { + logger.error("Failed to read JSON at path {}", path); + return; + } + + logger.info("Parsing graph to sequence..."); + List processSequence = TemplateParser.parseJsonGraphToSequence(json, workflowMap); + List> futureList = new ArrayList<>(); + + for (ProcessNode n : processSequence) { + Set predecessors = n.getPredecessors(); + logger.info( + "Queueing process [{}].{}", + n.id(), + predecessors.isEmpty() + ? " Can start immediately!" + : String.format( + " Must wait for [%s] to complete first.", + predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", ")) + ) + ); + futureList.add(n.execute(null)); + } + futureList.forEach(CompletableFuture::join); + logger.info("All done!"); + } + +} diff --git a/src/main/java/org/opensearch/flowframework/template/Demo.java b/src/main/java/org/opensearch/flowframework/template/Demo.java index e42496f14..00dcf5d72 100644 --- a/src/main/java/org/opensearch/flowframework/template/Demo.java +++ b/src/main/java/org/opensearch/flowframework/template/Demo.java @@ -69,7 +69,8 @@ public static void main(String[] args) { predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", ")) ) ); - futureList.add(n.execute()); + // TODO need to handle this better, passing an argument when we start them all at the beginning is silly + futureList.add(n.execute(null)); } futureList.forEach(CompletableFuture::join); logger.info("All done!"); diff --git a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java index 89e081afd..7bc6c9266 100644 --- a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java +++ b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java @@ -98,11 +98,13 @@ void setPredecessors(Set predecessors) { * * @return this node's future. This is returned immediately, while process execution continues asynchronously. */ - public CompletableFuture execute() { + public CompletableFuture execute(WorkflowData data) { this.future = new CompletableFuture<>(); CompletableFuture.runAsync(() -> { if (!predecessors.isEmpty()) { - List> predFutures = predecessors.stream().map(p -> p.getFuture()).collect(Collectors.toList()); + List> predFutures = predecessors.stream() + .map(p -> p.getFuture()) + .collect(Collectors.toList()); CompletableFuture waitForPredecessors = CompletableFuture.allOf(predFutures.toArray(new CompletableFuture[0])); try { waitForPredecessors.orTimeout(30, TimeUnit.SECONDS).get(); @@ -114,7 +116,7 @@ public CompletableFuture execute() { return; } logger.debug(">>> Starting {}", this.id); - CompletableFuture stepFuture = this.workflowStep.execute(null); + CompletableFuture stepFuture = this.workflowStep.execute(data); stepFuture.join(); try { future.complete(stepFuture.get()); diff --git a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java index 1f8e1dd68..ed3f8e601 100644 --- a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java +++ b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java @@ -13,6 +13,7 @@ import com.google.gson.JsonObject; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.flowframework.workflow.WorkflowStep; import java.util.ArrayDeque; @@ -57,7 +58,12 @@ public static List parseJsonGraphToSequence(String json, Map getContent(); +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowInputData.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowInputData.java new file mode 100644 index 000000000..86e1179cf --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowInputData.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import java.util.Map; + +/** + * Interface for handling the input of the building blocks. + */ +public interface WorkflowInputData extends WorkflowData { + + /** + * Accesses a map containing the params of this workflow step. This represents the params associated with a Rest API request, parsed from the URI. + * @return the params of this step. + */ + Map getParams(); + +} diff --git a/src/test/resources/template/datademo.json b/src/test/resources/template/datademo.json new file mode 100644 index 000000000..bbe4fb441 --- /dev/null +++ b/src/test/resources/template/datademo.json @@ -0,0 +1,11 @@ +{ + "sequence": { + "nodes": [ + { + "id": "create_index", + "index_name": "demo" + } + ], + "edges": [] + } +} From 170e0adaa876ef381e4bc3b467ac261f8878675d Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Wed, 13 Sep 2023 19:10:01 -0700 Subject: [PATCH 07/12] Demo passing data in between steps Signed-off-by: Daniel Widdis --- .../template/CreateIndexResponseData.java | 31 +++++++++++++ .../template/CreateIndexWorkflowStep.java | 43 ++++++++++--------- .../flowframework/template/DataDemo.java | 5 ++- .../flowframework/template/Demo.java | 4 +- .../template/DemoWorkflowStep.java | 2 +- .../flowframework/template/ProcessNode.java | 24 +++++++---- .../template/TemplateParser.java | 2 +- .../flowframework/workflow/WorkflowStep.java | 6 +-- src/test/resources/template/datademo.json | 11 ++++- 9 files changed, 91 insertions(+), 37 deletions(-) create mode 100644 src/main/java/org/opensearch/flowframework/template/CreateIndexResponseData.java diff --git a/src/main/java/org/opensearch/flowframework/template/CreateIndexResponseData.java b/src/main/java/org/opensearch/flowframework/template/CreateIndexResponseData.java new file mode 100644 index 000000000..2cc71edf8 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/template/CreateIndexResponseData.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.template; + +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.flowframework.workflow.WorkflowData; + +import java.util.HashMap; +import java.util.Map; + +public class CreateIndexResponseData implements WorkflowData { + + private Map content = new HashMap<>(); + + public CreateIndexResponseData(CreateIndexResponse response) { + super(); + // See CreateIndexResponse ParseFields for source of content keys needed + content.put("index", response.index()); + } + + @Override + public Map getContent() { + return Map.copyOf(content); + } +} diff --git a/src/main/java/org/opensearch/flowframework/template/CreateIndexWorkflowStep.java b/src/main/java/org/opensearch/flowframework/template/CreateIndexWorkflowStep.java index 713ee343b..ae01c42f0 100644 --- a/src/main/java/org/opensearch/flowframework/template/CreateIndexWorkflowStep.java +++ b/src/main/java/org/opensearch/flowframework/template/CreateIndexWorkflowStep.java @@ -10,7 +10,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.flowframework.workflow.WorkflowData; +import org.opensearch.flowframework.workflow.WorkflowInputData; import org.opensearch.flowframework.workflow.WorkflowStep; import java.util.Map; @@ -21,30 +23,35 @@ public class CreateIndexWorkflowStep implements WorkflowStep { private static final Logger logger = LogManager.getLogger(CreateIndexWorkflowStep.class); private final String name; - private WorkflowData data = null; + private WorkflowInputData inputData = null; public CreateIndexWorkflowStep() { this.name = "CREATE_INDEX"; } @Override - public CompletableFuture execute(WorkflowData data) { - // TODO, need to handle this better, we are pre-running execute all the time so passing a param is silly - if (data != null) { - this.data = data; - } + public CompletableFuture execute(WorkflowData... data) { + logger.debug("Executing with " + data.length + " args"); CompletableFuture future = new CompletableFuture<>(); CompletableFuture.runAsync(() -> { - if (this.data instanceof CreateIndexRequestData) { - Map params = ((CreateIndexRequestData) this.data).getParams(); - Map content = ((CreateIndexRequestData) this.data).getContent(); - logger.debug("Received params: {}, content: {}", params, content); - future.complete(null); - } else { - logger.debug("Wrong data type!"); - future.completeExceptionally(new IllegalArgumentException("Expected CreateIndexRequestData here.")); + if (inputData != null) { + Map params = inputData.getParams(); + Map content = inputData.getContent(); + logger.debug("Initialized params: {}, content: {}", params, content); } + for (WorkflowData wfData : data) { + Map content = wfData.getContent(); + logger.debug("Received content from previous step: {}", content); + } + // do some work + try { + Thread.sleep(2000); + } catch (InterruptedException e) {} + // Simulate response of created index + CreateIndexResponse response = new CreateIndexResponse(true, true, inputData.getParams().get("index")); + future.complete(new CreateIndexResponseData(response)); }); + return future; } @@ -53,11 +60,7 @@ public String getName() { return name; } - public WorkflowData getData() { - return data; - } - - public void setData(WorkflowData data) { - this.data = data; + public void setInput(WorkflowInputData data) { + this.inputData = data; } } diff --git a/src/main/java/org/opensearch/flowframework/template/DataDemo.java b/src/main/java/org/opensearch/flowframework/template/DataDemo.java index 0c817c269..2a425e12c 100644 --- a/src/main/java/org/opensearch/flowframework/template/DataDemo.java +++ b/src/main/java/org/opensearch/flowframework/template/DataDemo.java @@ -30,9 +30,12 @@ public class DataDemo { private static final Logger logger = LogManager.getLogger(DataDemo.class); + // This is temporary. We need a factory class to generate these workflow steps + // based on a field in the JSON. private static Map workflowMap = new HashMap<>(); static { workflowMap.put("create_index", new CreateIndexWorkflowStep()); + workflowMap.put("create_another_index", new CreateIndexWorkflowStep()); } /** @@ -66,7 +69,7 @@ public static void main(String[] args) { predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", ")) ) ); - futureList.add(n.execute(null)); + futureList.add(n.execute()); } futureList.forEach(CompletableFuture::join); logger.info("All done!"); diff --git a/src/main/java/org/opensearch/flowframework/template/Demo.java b/src/main/java/org/opensearch/flowframework/template/Demo.java index 00dcf5d72..ef334794b 100644 --- a/src/main/java/org/opensearch/flowframework/template/Demo.java +++ b/src/main/java/org/opensearch/flowframework/template/Demo.java @@ -30,6 +30,8 @@ public class Demo { private static final Logger logger = LogManager.getLogger(Demo.class); + // This is temporary. We need a factory class to generate these workflow steps + // based on a field in the JSON. private static Map workflowMap = new HashMap<>(); static { workflowMap.put("fetch_model", new DemoWorkflowStep(3000)); @@ -70,7 +72,7 @@ public static void main(String[] args) { ) ); // TODO need to handle this better, passing an argument when we start them all at the beginning is silly - futureList.add(n.execute(null)); + futureList.add(n.execute()); } futureList.forEach(CompletableFuture::join); logger.info("All done!"); diff --git a/src/main/java/org/opensearch/flowframework/template/DemoWorkflowStep.java b/src/main/java/org/opensearch/flowframework/template/DemoWorkflowStep.java index 6cc05b92e..07ee9b193 100644 --- a/src/main/java/org/opensearch/flowframework/template/DemoWorkflowStep.java +++ b/src/main/java/org/opensearch/flowframework/template/DemoWorkflowStep.java @@ -24,7 +24,7 @@ public DemoWorkflowStep(long delay) { } @Override - public CompletableFuture execute(WorkflowData data) { + public CompletableFuture execute(WorkflowData... data) { CompletableFuture future = new CompletableFuture<>(); CompletableFuture.runAsync(() -> { try { diff --git a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java index 7bc6c9266..80c1e151e 100644 --- a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java +++ b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java @@ -98,27 +98,33 @@ void setPredecessors(Set predecessors) { * * @return this node's future. This is returned immediately, while process execution continues asynchronously. */ - public CompletableFuture execute(WorkflowData data) { + public CompletableFuture execute() { this.future = new CompletableFuture<>(); CompletableFuture.runAsync(() -> { + List> predFutures = predecessors.stream().map(p -> p.getFuture()).collect(Collectors.toList()); if (!predecessors.isEmpty()) { - List> predFutures = predecessors.stream() - .map(p -> p.getFuture()) - .collect(Collectors.toList()); CompletableFuture waitForPredecessors = CompletableFuture.allOf(predFutures.toArray(new CompletableFuture[0])); try { waitForPredecessors.orTimeout(30, TimeUnit.SECONDS).get(); } catch (InterruptedException | ExecutionException e) { future.completeExceptionally(e); + return; } } - if (future.isCompletedExceptionally()) { - return; + logger.debug(">>> Starting {}.", this.id); + // get the input data from predecessor(s) + WorkflowData[] input = new WorkflowData[predFutures.size()]; + for (int i = 0; i < predFutures.size(); i++) { + try { + input[i] = predFutures.get(i).get(); + } catch (InterruptedException | ExecutionException e) { + future.completeExceptionally(e); + return; + } } - logger.debug(">>> Starting {}", this.id); - CompletableFuture stepFuture = this.workflowStep.execute(data); - stepFuture.join(); + CompletableFuture stepFuture = this.workflowStep.execute(input); try { + stepFuture.join(); future.complete(stepFuture.get()); logger.debug("<<< Completed {}", this.id); } catch (InterruptedException | ExecutionException e) { diff --git a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java index ed3f8e601..9db98e415 100644 --- a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java +++ b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java @@ -61,7 +61,7 @@ public static List parseJsonGraphToSequence(String json, Map execute(@Nullable WorkflowData data); + CompletableFuture execute(@Nullable WorkflowData... data); /** * diff --git a/src/test/resources/template/datademo.json b/src/test/resources/template/datademo.json index bbe4fb441..966f37f13 100644 --- a/src/test/resources/template/datademo.json +++ b/src/test/resources/template/datademo.json @@ -4,8 +4,17 @@ { "id": "create_index", "index_name": "demo" + }, + { + "id": "create_another_index", + "index_name": "second_demo" } ], - "edges": [] + "edges": [ + { + "source": "create_index", + "dest": "create_another_index" + } + ] } } From 533431e5173a4503b5f9c074659192d4517e2a36 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Thu, 14 Sep 2023 08:43:59 -0700 Subject: [PATCH 08/12] Change execute arg to list and refactor demo classes to own package Signed-off-by: Daniel Widdis --- .../CreateIndexRequestData.java | 2 +- .../CreateIndexResponseData.java | 2 +- .../CreateIndexWorkflowStep.java | 21 ++++++++++++------- .../template => demo}/DataDemo.java | 4 +++- .../flowframework/template => demo}/Demo.java | 4 +++- .../template => demo}/DemoWorkflowStep.java | 5 +++-- .../flowframework/template/ProcessNode.java | 21 ++++++++++++------- .../template/TemplateParser.java | 3 +++ .../flowframework/workflow/WorkflowStep.java | 11 +++++----- 9 files changed, 46 insertions(+), 27 deletions(-) rename src/main/java/{org/opensearch/flowframework/template => demo}/CreateIndexRequestData.java (96%) rename src/main/java/{org/opensearch/flowframework/template => demo}/CreateIndexResponseData.java (94%) rename src/main/java/{org/opensearch/flowframework/template => demo}/CreateIndexWorkflowStep.java (74%) rename src/main/java/{org/opensearch/flowframework/template => demo}/DataDemo.java (95%) rename src/main/java/{org/opensearch/flowframework/template => demo}/Demo.java (95%) rename src/main/java/{org/opensearch/flowframework/template => demo}/DemoWorkflowStep.java (90%) diff --git a/src/main/java/org/opensearch/flowframework/template/CreateIndexRequestData.java b/src/main/java/demo/CreateIndexRequestData.java similarity index 96% rename from src/main/java/org/opensearch/flowframework/template/CreateIndexRequestData.java rename to src/main/java/demo/CreateIndexRequestData.java index e4594d0cf..7d1dae2c3 100644 --- a/src/main/java/org/opensearch/flowframework/template/CreateIndexRequestData.java +++ b/src/main/java/demo/CreateIndexRequestData.java @@ -6,7 +6,7 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.flowframework.template; +package demo; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.flowframework.workflow.WorkflowInputData; diff --git a/src/main/java/org/opensearch/flowframework/template/CreateIndexResponseData.java b/src/main/java/demo/CreateIndexResponseData.java similarity index 94% rename from src/main/java/org/opensearch/flowframework/template/CreateIndexResponseData.java rename to src/main/java/demo/CreateIndexResponseData.java index 2cc71edf8..7c2928724 100644 --- a/src/main/java/org/opensearch/flowframework/template/CreateIndexResponseData.java +++ b/src/main/java/demo/CreateIndexResponseData.java @@ -6,7 +6,7 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.flowframework.template; +package demo; import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.flowframework.workflow.WorkflowData; diff --git a/src/main/java/org/opensearch/flowframework/template/CreateIndexWorkflowStep.java b/src/main/java/demo/CreateIndexWorkflowStep.java similarity index 74% rename from src/main/java/org/opensearch/flowframework/template/CreateIndexWorkflowStep.java rename to src/main/java/demo/CreateIndexWorkflowStep.java index ae01c42f0..6b0243e37 100644 --- a/src/main/java/org/opensearch/flowframework/template/CreateIndexWorkflowStep.java +++ b/src/main/java/demo/CreateIndexWorkflowStep.java @@ -6,7 +6,7 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.flowframework.template; +package demo; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -15,6 +15,7 @@ import org.opensearch.flowframework.workflow.WorkflowInputData; import org.opensearch.flowframework.workflow.WorkflowStep; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -30,18 +31,24 @@ public CreateIndexWorkflowStep() { } @Override - public CompletableFuture execute(WorkflowData... data) { - logger.debug("Executing with " + data.length + " args"); + public CompletableFuture execute(List data) { + logger.debug("Executing with {} input args.", data.size()); CompletableFuture future = new CompletableFuture<>(); CompletableFuture.runAsync(() -> { if (inputData != null) { - Map params = inputData.getParams(); - Map content = inputData.getContent(); - logger.debug("Initialized params: {}, content: {}", params, content); + logger.debug("Initialized params: {}, content: {}", inputData.getParams(), inputData.getContent()); } for (WorkflowData wfData : data) { Map content = wfData.getContent(); - logger.debug("Received content from previous step: {}", content); + if (wfData instanceof WorkflowInputData) { + logger.debug( + "Previous step sent params: {}, content: {}", + ((WorkflowInputData) wfData).getParams(), + wfData.getContent() + ); + } else { + logger.debug("Received from previous step: content: {}", content); + } } // do some work try { diff --git a/src/main/java/org/opensearch/flowframework/template/DataDemo.java b/src/main/java/demo/DataDemo.java similarity index 95% rename from src/main/java/org/opensearch/flowframework/template/DataDemo.java rename to src/main/java/demo/DataDemo.java index 2a425e12c..cedf0f5e9 100644 --- a/src/main/java/org/opensearch/flowframework/template/DataDemo.java +++ b/src/main/java/demo/DataDemo.java @@ -6,10 +6,12 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.flowframework.template; +package demo; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.flowframework.template.ProcessNode; +import org.opensearch.flowframework.template.TemplateParser; import org.opensearch.flowframework.workflow.WorkflowStep; import java.io.IOException; diff --git a/src/main/java/org/opensearch/flowframework/template/Demo.java b/src/main/java/demo/Demo.java similarity index 95% rename from src/main/java/org/opensearch/flowframework/template/Demo.java rename to src/main/java/demo/Demo.java index ef334794b..0dba03169 100644 --- a/src/main/java/org/opensearch/flowframework/template/Demo.java +++ b/src/main/java/demo/Demo.java @@ -6,10 +6,12 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.flowframework.template; +package demo; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.flowframework.template.ProcessNode; +import org.opensearch.flowframework.template.TemplateParser; import org.opensearch.flowframework.workflow.WorkflowStep; import java.io.IOException; diff --git a/src/main/java/org/opensearch/flowframework/template/DemoWorkflowStep.java b/src/main/java/demo/DemoWorkflowStep.java similarity index 90% rename from src/main/java/org/opensearch/flowframework/template/DemoWorkflowStep.java rename to src/main/java/demo/DemoWorkflowStep.java index 07ee9b193..866928f7c 100644 --- a/src/main/java/org/opensearch/flowframework/template/DemoWorkflowStep.java +++ b/src/main/java/demo/DemoWorkflowStep.java @@ -6,11 +6,12 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.flowframework.template; +package demo; import org.opensearch.flowframework.workflow.WorkflowData; import org.opensearch.flowframework.workflow.WorkflowStep; +import java.util.List; import java.util.concurrent.CompletableFuture; public class DemoWorkflowStep implements WorkflowStep { @@ -24,7 +25,7 @@ public DemoWorkflowStep(long delay) { } @Override - public CompletableFuture execute(WorkflowData... data) { + public CompletableFuture execute(List data) { CompletableFuture future = new CompletableFuture<>(); CompletableFuture.runAsync(() -> { try { diff --git a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java index 80c1e151e..bc71da0aa 100644 --- a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java +++ b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java @@ -13,6 +13,7 @@ import org.opensearch.flowframework.workflow.WorkflowData; import org.opensearch.flowframework.workflow.WorkflowStep; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -107,18 +108,18 @@ public CompletableFuture execute() { try { waitForPredecessors.orTimeout(30, TimeUnit.SECONDS).get(); } catch (InterruptedException | ExecutionException e) { - future.completeExceptionally(e); + handleException(e); return; } } logger.debug(">>> Starting {}.", this.id); // get the input data from predecessor(s) - WorkflowData[] input = new WorkflowData[predFutures.size()]; - for (int i = 0; i < predFutures.size(); i++) { + List input = new ArrayList(); + for (CompletableFuture cf : predFutures) { try { - input[i] = predFutures.get(i).get(); + input.add(cf.get()); } catch (InterruptedException | ExecutionException e) { - future.completeExceptionally(e); + handleException(e); return; } } @@ -128,14 +129,18 @@ public CompletableFuture execute() { future.complete(stepFuture.get()); logger.debug("<<< Completed {}", this.id); } catch (InterruptedException | ExecutionException e) { - // TODO: better handling of getCause - future.completeExceptionally(e); - logger.debug("<<< Completed Exceptionally {}", this.id); + handleException(e); } }); return this.future; } + private void handleException(Exception e) { + // TODO: better handling of getCause + this.future.completeExceptionally(e); + logger.debug("<<< Completed Exceptionally {}", this.id); + } + @Override public int hashCode() { return Objects.hash(id); diff --git a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java index 9db98e415..cec956af3 100644 --- a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java +++ b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java @@ -28,6 +28,9 @@ import java.util.function.Function; import java.util.stream.Collectors; +import demo.CreateIndexRequestData; +import demo.CreateIndexWorkflowStep; + /** * Utility class for parsing templates. */ diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java index 3917f7c36..ea3814a91 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java @@ -8,8 +8,7 @@ */ package org.opensearch.flowframework.workflow; -import org.opensearch.common.Nullable; - +import java.util.List; import java.util.concurrent.CompletableFuture; /** @@ -18,11 +17,11 @@ public interface WorkflowStep { /** - * Triggers the processing of the building block. - * @param data for input params of the building blocks. - * @return CompletableFuture of the building block containing its output data. + * Triggers the actual processing of the building block. + * @param data representing input params and content, or output content of previous steps. + * @return A CompletableFuture of the building block. This block should return immediately, but not be completed until the step executes, containing the step's output data which may be passed to follow-on steps. */ - CompletableFuture execute(@Nullable WorkflowData... data); + CompletableFuture execute(List data); /** * From 10c09d6ed85c7e92ab5884068de62c60f8a8a6d2 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Thu, 14 Sep 2023 17:06:46 -0700 Subject: [PATCH 09/12] Significantly simplify input/output data passing Signed-off-by: Daniel Widdis --- .../java/demo/CreateIndexRequestData.java | 42 ----------------- .../java/demo/CreateIndexResponseData.java | 31 ------------- .../java/demo/CreateIndexWorkflowStep.java | 46 ++++++++++--------- src/main/java/demo/README.txt | 13 ++++++ .../flowframework/template/ProcessNode.java | 22 +++++++++ .../template/TemplateParser.java | 32 ++++++++++--- .../flowframework/workflow/WorkflowData.java | 21 ++++++++- .../workflow/WorkflowInputData.java | 24 ---------- .../flowframework/workflow/WorkflowStep.java | 4 +- src/test/resources/template/datademo.json | 8 ++-- 10 files changed, 110 insertions(+), 133 deletions(-) delete mode 100644 src/main/java/demo/CreateIndexRequestData.java delete mode 100644 src/main/java/demo/CreateIndexResponseData.java create mode 100644 src/main/java/demo/README.txt delete mode 100644 src/main/java/org/opensearch/flowframework/workflow/WorkflowInputData.java diff --git a/src/main/java/demo/CreateIndexRequestData.java b/src/main/java/demo/CreateIndexRequestData.java deleted file mode 100644 index 7d1dae2c3..000000000 --- a/src/main/java/demo/CreateIndexRequestData.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ -package demo; - -import org.opensearch.action.admin.indices.create.CreateIndexRequest; -import org.opensearch.flowframework.workflow.WorkflowInputData; - -import java.util.HashMap; -import java.util.Map; - -public class CreateIndexRequestData implements WorkflowInputData { - - private Map params = new HashMap<>(); - private Map content = new HashMap<>(); - - public CreateIndexRequestData(CreateIndexRequest request) { - super(); - // See RestCreateIndexAction for source of param keys needed - params.put("index", request.index()); - // See CreateIndexRequest ParseFields for source of content keys needed - content.put("mappings", request.mappings()); - content.put("settings", request.settings()); - content.put("aliases", request.aliases()); - } - - @Override - public Map getContent() { - return Map.copyOf(content); - } - - @Override - public Map getParams() { - return Map.copyOf(params); - } - -} diff --git a/src/main/java/demo/CreateIndexResponseData.java b/src/main/java/demo/CreateIndexResponseData.java deleted file mode 100644 index 7c2928724..000000000 --- a/src/main/java/demo/CreateIndexResponseData.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ -package demo; - -import org.opensearch.action.admin.indices.create.CreateIndexResponse; -import org.opensearch.flowframework.workflow.WorkflowData; - -import java.util.HashMap; -import java.util.Map; - -public class CreateIndexResponseData implements WorkflowData { - - private Map content = new HashMap<>(); - - public CreateIndexResponseData(CreateIndexResponse response) { - super(); - // See CreateIndexResponse ParseFields for source of content keys needed - content.put("index", response.index()); - } - - @Override - public Map getContent() { - return Map.copyOf(content); - } -} diff --git a/src/main/java/demo/CreateIndexWorkflowStep.java b/src/main/java/demo/CreateIndexWorkflowStep.java index 6b0243e37..d6c40c6ff 100644 --- a/src/main/java/demo/CreateIndexWorkflowStep.java +++ b/src/main/java/demo/CreateIndexWorkflowStep.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.flowframework.workflow.WorkflowData; -import org.opensearch.flowframework.workflow.WorkflowInputData; import org.opensearch.flowframework.workflow.WorkflowStep; import java.util.List; @@ -24,7 +23,6 @@ public class CreateIndexWorkflowStep implements WorkflowStep { private static final Logger logger = LogManager.getLogger(CreateIndexWorkflowStep.class); private final String name; - private WorkflowInputData inputData = null; public CreateIndexWorkflowStep() { this.name = "CREATE_INDEX"; @@ -32,31 +30,39 @@ public CreateIndexWorkflowStep() { @Override public CompletableFuture execute(List data) { - logger.debug("Executing with {} input args.", data.size()); CompletableFuture future = new CompletableFuture<>(); CompletableFuture.runAsync(() -> { - if (inputData != null) { - logger.debug("Initialized params: {}, content: {}", inputData.getParams(), inputData.getContent()); - } + String inputIndex = null; + boolean first = true; for (WorkflowData wfData : data) { - Map content = wfData.getContent(); - if (wfData instanceof WorkflowInputData) { - logger.debug( - "Previous step sent params: {}, content: {}", - ((WorkflowInputData) wfData).getParams(), - wfData.getContent() - ); - } else { - logger.debug("Received from previous step: content: {}", content); + logger.debug( + "{} sent params: {}, content: {}", + first ? "Initialization" : "Previous step", + wfData.getParams(), + wfData.getContent() + ); + if (first) { + Map params = data.get(0).getParams(); + if (params.containsKey("index")) { + inputIndex = params.get("index"); + } + first = false; } } - // do some work + // do some work, simulating a REST API call try { Thread.sleep(2000); } catch (InterruptedException e) {} // Simulate response of created index - CreateIndexResponse response = new CreateIndexResponse(true, true, inputData.getParams().get("index")); - future.complete(new CreateIndexResponseData(response)); + CreateIndexResponse response = new CreateIndexResponse(true, true, inputIndex); + // OLD UNSCALABLE WAY: future.complete(new CreateIndexResponseData(response)); + // Better way with an anonymous class: + future.complete(new WorkflowData() { + @Override + public Map getContent() { + return Map.of("index", response.index()); + } + }); }); return future; @@ -66,8 +72,4 @@ public CompletableFuture execute(List data) { public String getName() { return name; } - - public void setInput(WorkflowInputData data) { - this.inputData = data; - } } diff --git a/src/main/java/demo/README.txt b/src/main/java/demo/README.txt new file mode 100644 index 000000000..4fef77960 --- /dev/null +++ b/src/main/java/demo/README.txt @@ -0,0 +1,13 @@ + +DO NOT DEPEND ON CLASSES IN THIS PACKAGE. + +The contents of this folder are for demo/proof-of-concept use. + +Feel free to look at the classes in this folder for potential "how could I" scenarios. + +Tests will not be written against them. +Documentation may be incomplete, wrong, or outdated. +These are not for production use. +They will be deleted without notice at some point, and altered without notice at other points. + +DO NOT DEPEND ON CLASSES IN THIS PACKAGE. diff --git a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java index bc71da0aa..d3c207d28 100644 --- a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java +++ b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java @@ -32,6 +32,7 @@ public class ProcessNode { private final String id; private final WorkflowStep workflowStep; + private final WorkflowData input; private CompletableFuture future = null; // will be populated during graph parsing @@ -44,8 +45,20 @@ public class ProcessNode { * @param workflowStep A java class implementing {@link WorkflowStep} to be executed when it's this node's turn. */ ProcessNode(String id, WorkflowStep workflowStep) { + this(id, workflowStep, WorkflowData.EMPTY); + } + + /** + * Create this node linked to its executing process. + * + * @param id A string identifying the workflow step + * @param workflowStep A java class implementing {@link WorkflowStep} to be executed when it's this node's turn. + * @param input Input required by the node + */ + public ProcessNode(String id, WorkflowStep workflowStep, WorkflowData input) { this.id = id; this.workflowStep = workflowStep; + this.input = input; } /** @@ -64,6 +77,14 @@ public WorkflowStep workflowStep() { return workflowStep; } + /** + * Returns the input data for this node. + * @return the input data + */ + public WorkflowData getInput() { + return input; + } + /** * Returns a {@link CompletableFuture} if this process is executing. * Relies on the node having been sorted and executed in an order such that all predecessor nodes have begun execution first (and thus populated this value). @@ -115,6 +136,7 @@ public CompletableFuture execute() { logger.debug(">>> Starting {}.", this.id); // get the input data from predecessor(s) List input = new ArrayList(); + input.add(this.input); for (CompletableFuture cf : predFutures) { try { input.add(cf.get()); diff --git a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java index cec956af3..bafd108e6 100644 --- a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java +++ b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java @@ -14,6 +14,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.flowframework.workflow.WorkflowData; import org.opensearch.flowframework.workflow.WorkflowStep; import java.util.ArrayDeque; @@ -28,9 +29,6 @@ import java.util.function.Function; import java.util.stream.Collectors; -import demo.CreateIndexRequestData; -import demo.CreateIndexWorkflowStep; - /** * Utility class for parsing templates. */ @@ -53,22 +51,44 @@ public static List parseJsonGraphToSequence(String json, Map nodes = new ArrayList<>(); List edges = new ArrayList<>(); + // TODO: make this name a constant and make sure it is consistent with template for (JsonElement nodeJson : graph.getAsJsonArray("nodes")) { JsonObject nodeObject = nodeJson.getAsJsonObject(); String nodeId = nodeObject.get("id").getAsString(); + // The below steps will be replaced by a generator class that instantiates a WorkflowStep + // based on user_input data from the template. WorkflowStep workflowStep = workflowSteps.get(nodeId); - if (workflowStep instanceof CreateIndexWorkflowStep) { + // temporary demo POC of getting from a request to input data + // this will be refactored into something pulling from user template + WorkflowData input = WorkflowData.EMPTY; + if (List.of("create_index", "create_another_index").contains(nodeId)) { CreateIndexRequest request = new CreateIndexRequest(nodeObject.get("index_name").getAsString()); - ((CreateIndexWorkflowStep) workflowStep).setInput(new CreateIndexRequestData(request)); + input = new WorkflowData() { + + @Override + public Map getContent() { + // See CreateIndexRequest ParseFields for source of content keys needed + return Map.of("mappings", request.mappings(), "settings", request.settings(), "aliases", request.aliases()); + } + + @Override + public Map getParams() { + // See RestCreateIndexAction for source of param keys needed + return Map.of("index", request.index()); + } + + }; } - nodes.add(new ProcessNode(nodeId, workflowStep)); + nodes.add(new ProcessNode(nodeId, workflowStep, input)); } + // TODO: make this name a constant and make sure it is consistent with template for (JsonElement edgeJson : graph.getAsJsonArray("edges")) { JsonObject edgeObject = edgeJson.getAsJsonObject(); String sourceNodeId = edgeObject.get("source").getAsString(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java index 4e17ed0d3..09eb041fc 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java @@ -8,16 +8,33 @@ */ package org.opensearch.flowframework.workflow; +import java.util.Collections; import java.util.Map; /** - * Interface for handling the input/output of the building blocks. + * Interface representing data provided as input to, and produced as output from, {@link WorkflowStep}s. */ public interface WorkflowData { + /** + * An object representing no data, useful when a workflow step has no required input or output. + */ + WorkflowData EMPTY = new WorkflowData() { + }; + /** * Accesses a map containing the content of the workflow step. This represents the data associated with a Rest API request. * @return the content of this step. */ - Map getContent(); + default Map getContent() { + return Collections.emptyMap(); + }; + + /** + * Accesses a map containing the params of this workflow step. This represents the params associated with a Rest API request, parsed from the URI. + * @return the params of this step. + */ + default Map getParams() { + return Collections.emptyMap(); + }; } diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowInputData.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowInputData.java deleted file mode 100644 index 86e1179cf..000000000 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowInputData.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ -package org.opensearch.flowframework.workflow; - -import java.util.Map; - -/** - * Interface for handling the input of the building blocks. - */ -public interface WorkflowInputData extends WorkflowData { - - /** - * Accesses a map containing the params of this workflow step. This represents the params associated with a Rest API request, parsed from the URI. - * @return the params of this step. - */ - Map getParams(); - -} diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java index ea3814a91..6cd5f5a28 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java @@ -18,8 +18,8 @@ public interface WorkflowStep { /** * Triggers the actual processing of the building block. - * @param data representing input params and content, or output content of previous steps. - * @return A CompletableFuture of the building block. This block should return immediately, but not be completed until the step executes, containing the step's output data which may be passed to follow-on steps. + * @param data representing input params and content, or output content of previous steps. The first element of the list is data (if any) provided from parsing the template, and may be {@link WorkflowData#EMPTY}. + * @return A CompletableFuture of the building block. This block should return immediately, but not be completed until the step executes, containing either the step's output data or {@link WorkflowData#EMPTY} which may be passed to follow-on steps. */ CompletableFuture execute(List data); diff --git a/src/test/resources/template/datademo.json b/src/test/resources/template/datademo.json index 966f37f13..e2b9eb386 100644 --- a/src/test/resources/template/datademo.json +++ b/src/test/resources/template/datademo.json @@ -11,10 +11,10 @@ } ], "edges": [ - { - "source": "create_index", - "dest": "create_another_index" - } + { + "source": "create_index", + "dest": "create_another_index" + } ] } } From dfcd891df7ff3f2870a2ed917a9846308bf6c299 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Fri, 15 Sep 2023 13:48:34 -0700 Subject: [PATCH 10/12] Add tests Signed-off-by: Daniel Widdis --- .../flowframework/template/ProcessNode.java | 2 +- .../template/TemplateParser.java | 61 ++++--- .../template/ProcessNodeTests.java | 57 +++++++ .../template/ProcessSequenceEdgeTests.java | 32 ++++ .../template/TemplateParserTests.java | 153 ++++++++++++++++++ 5 files changed, 281 insertions(+), 24 deletions(-) create mode 100644 src/test/java/org/opensearch/flowframework/template/ProcessNodeTests.java create mode 100644 src/test/java/org/opensearch/flowframework/template/ProcessSequenceEdgeTests.java create mode 100644 src/test/java/org/opensearch/flowframework/template/TemplateParserTests.java diff --git a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java index d3c207d28..fc1afa127 100644 --- a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java +++ b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java @@ -81,7 +81,7 @@ public WorkflowStep workflowStep() { * Returns the input data for this node. * @return the input data */ - public WorkflowData getInput() { + public WorkflowData input() { return input; } diff --git a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java index bafd108e6..bba8bfb1e 100644 --- a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java +++ b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java @@ -36,6 +36,14 @@ public class TemplateParser { private static final Logger logger = LogManager.getLogger(TemplateParser.class); + // Field names in the JSON. Package private for tests. + static final String WORKFLOW = "sequence"; + static final String NODES = "nodes"; + static final String NODE_ID = "id"; + static final String EDGES = "edges"; + static final String SOURCE = "source"; + static final String DESTINATION = "dest"; + /** * Prevent instantiating this class. */ @@ -51,25 +59,23 @@ public static List parseJsonGraphToSequence(String json, Map nodes = new ArrayList<>(); List edges = new ArrayList<>(); - // TODO: make this name a constant and make sure it is consistent with template - for (JsonElement nodeJson : graph.getAsJsonArray("nodes")) { + for (JsonElement nodeJson : graph.getAsJsonArray(NODES)) { JsonObject nodeObject = nodeJson.getAsJsonObject(); - String nodeId = nodeObject.get("id").getAsString(); + String nodeId = nodeObject.get(NODE_ID).getAsString(); // The below steps will be replaced by a generator class that instantiates a WorkflowStep // based on user_input data from the template. WorkflowStep workflowStep = workflowSteps.get(nodeId); // temporary demo POC of getting from a request to input data // this will be refactored into something pulling from user template - WorkflowData input = WorkflowData.EMPTY; + WorkflowData inputData = WorkflowData.EMPTY; if (List.of("create_index", "create_another_index").contains(nodeId)) { CreateIndexRequest request = new CreateIndexRequest(nodeObject.get("index_name").getAsString()); - input = new WorkflowData() { + inputData = new WorkflowData() { @Override public Map getContent() { @@ -85,14 +91,16 @@ public Map getParams() { }; } - nodes.add(new ProcessNode(nodeId, workflowStep, input)); + nodes.add(new ProcessNode(nodeId, workflowStep, inputData)); } - // TODO: make this name a constant and make sure it is consistent with template - for (JsonElement edgeJson : graph.getAsJsonArray("edges")) { + for (JsonElement edgeJson : graph.getAsJsonArray(EDGES)) { JsonObject edgeObject = edgeJson.getAsJsonObject(); - String sourceNodeId = edgeObject.get("source").getAsString(); - String destNodeId = edgeObject.get("dest").getAsString(); + String sourceNodeId = edgeObject.get(SOURCE).getAsString(); + String destNodeId = edgeObject.get(DESTINATION).getAsString(); + if (sourceNodeId.equals(destNodeId)) { + throw new IllegalArgumentException("Edge connects node " + sourceNodeId + " to itself."); + } edges.add(new ProcessSequenceEdge(sourceNodeId, destNodeId)); } @@ -113,8 +121,15 @@ private static List topologicalSort(List nodes, List

new HashSet<>()).add(edge); successorEdges.computeIfAbsent(source, k -> new HashSet<>()).add(edge); } + // update predecessors on the node object + nodes.stream().filter(n -> predecessorEdges.containsKey(n)).forEach(n -> { + n.setPredecessors(predecessorEdges.get(n).stream().map(e -> nodeMap.get(e.getSource())).collect(Collectors.toSet())); + }); + // See https://en.wikipedia.org/wiki/Topological_sorting#Kahn's_algorithm - // Find start node(s) which have no predecessors + // L ← Empty list that will contain the sorted elements + List sortedNodes = new ArrayList<>(); + // S ← Set of all nodes with no incoming edge Queue sourceNodes = new ArrayDeque<>(); nodes.stream().filter(n -> !predecessorEdges.containsKey(n)).forEach(n -> sourceNodes.add(n)); if (sourceNodes.isEmpty()) { @@ -122,21 +137,21 @@ private static List topologicalSort(List nodes, List

sortedNodes = new ArrayList<>(); - // Keep adding successors + // while S is not empty do while (!sourceNodes.isEmpty()) { + // remove a node n from S ProcessNode n = sourceNodes.poll(); + // add n to L sortedNodes.add(n); - if (predecessorEdges.containsKey(n)) { - n.setPredecessors(predecessorEdges.get(n).stream().map(e -> nodeMap.get(e.getSource())).collect(Collectors.toSet())); - } - // Add successors to the queue + // for each node m with an edge e from n to m do for (ProcessSequenceEdge e : successorEdges.getOrDefault(n, Collections.emptySet())) { + ProcessNode m = nodeMap.get(e.getDestination()); + // remove edge e from the graph graph.remove(e); - ProcessNode dest = nodeMap.get(e.getDestination()); - if (!sourceNodes.contains(dest) && !sortedNodes.contains(dest)) { - sourceNodes.add(dest); + // if m has no other incoming edges then + if (!predecessorEdges.get(m).stream().anyMatch(i -> graph.contains(i))) { + // insert m into S + sourceNodes.add(m); } } } diff --git a/src/test/java/org/opensearch/flowframework/template/ProcessNodeTests.java b/src/test/java/org/opensearch/flowframework/template/ProcessNodeTests.java new file mode 100644 index 000000000..f9fa328c6 --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/template/ProcessNodeTests.java @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.template; + +import org.opensearch.flowframework.workflow.WorkflowData; +import org.opensearch.flowframework.workflow.WorkflowStep; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +public class ProcessNodeTests extends OpenSearchTestCase { + + @Override + public void setUp() throws Exception { + super.setUp(); + } + + public void testNode() throws InterruptedException, ExecutionException { + ProcessNode nodeA = new ProcessNode("A", new WorkflowStep() { + @Override + public CompletableFuture execute(List data) { + CompletableFuture f = new CompletableFuture<>(); + f.complete(WorkflowData.EMPTY); + return f; + } + + @Override + public String getName() { + return "test"; + } + }, WorkflowData.EMPTY); + assertEquals("A", nodeA.id()); + assertEquals("test", nodeA.workflowStep().getName()); + assertEquals(WorkflowData.EMPTY, nodeA.input()); + // FIXME: This is causing thread leaks + CompletableFuture f = nodeA.execute(); + assertEquals(f, nodeA.getFuture()); + f.orTimeout(5, TimeUnit.SECONDS); + assertTrue(f.isDone()); + assertEquals(WorkflowData.EMPTY, f.get()); + + ProcessNode nodeB = new ProcessNode("B", null, null); + assertNotEquals(nodeA, nodeB); + + ProcessNode nodeA2 = new ProcessNode("A", null, null); + assertEquals(nodeA, nodeA2); + } +} diff --git a/src/test/java/org/opensearch/flowframework/template/ProcessSequenceEdgeTests.java b/src/test/java/org/opensearch/flowframework/template/ProcessSequenceEdgeTests.java new file mode 100644 index 000000000..80cecd96e --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/template/ProcessSequenceEdgeTests.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.template; + +import org.opensearch.test.OpenSearchTestCase; + +public class ProcessSequenceEdgeTests extends OpenSearchTestCase { + + @Override + public void setUp() throws Exception { + super.setUp(); + } + + public void testEdge() { + ProcessSequenceEdge edgeAB = new ProcessSequenceEdge("A", "B"); + assertEquals("A", edgeAB.getSource()); + assertEquals("B", edgeAB.getDestination()); + assertEquals("A->B", edgeAB.toString()); + + ProcessSequenceEdge edgeAB2 = new ProcessSequenceEdge("A", "B"); + assertEquals(edgeAB, edgeAB2); + + ProcessSequenceEdge edgeAC = new ProcessSequenceEdge("A", "C"); + assertNotEquals(edgeAB, edgeAC); + } +} diff --git a/src/test/java/org/opensearch/flowframework/template/TemplateParserTests.java b/src/test/java/org/opensearch/flowframework/template/TemplateParserTests.java new file mode 100644 index 000000000..24dcf0640 --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/template/TemplateParserTests.java @@ -0,0 +1,153 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.template; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.opensearch.flowframework.template.TemplateParser.DESTINATION; +import static org.opensearch.flowframework.template.TemplateParser.EDGES; +import static org.opensearch.flowframework.template.TemplateParser.NODES; +import static org.opensearch.flowframework.template.TemplateParser.NODE_ID; +import static org.opensearch.flowframework.template.TemplateParser.SOURCE; +import static org.opensearch.flowframework.template.TemplateParser.WORKFLOW; + +public class TemplateParserTests extends OpenSearchTestCase { + + private static final String NO_START_NODE_DETECTED = "No start node detected: all nodes have a predecessor."; + private static final String CYCLE_DETECTED = "Cycle detected:"; + + // Input JSON generators + private static String node(String id) { + return "{\"" + NODE_ID + "\": \"" + id + "\"}"; + } + + private static String edge(String sourceId, String destId) { + return "{\"" + SOURCE + "\": \"" + sourceId + "\", \"" + DESTINATION + "\": \"" + destId + "\"}"; + } + + private static String workflow(List nodes, List edges) { + return "{\"" + WORKFLOW + "\": {" + arrayField(NODES, nodes) + ", " + arrayField(EDGES, edges) + "}}"; + } + + private static String arrayField(String fieldName, List objects) { + return "\"" + fieldName + "\": [" + objects.stream().collect(Collectors.joining(", ")) + "]"; + } + + // Output list elements + private static ProcessNode expectedNode(String id) { + return new ProcessNode(id, null, null); + } + + // Less verbose parser + private static List parse(String json) { + return TemplateParser.parseJsonGraphToSequence(json, Collections.emptyMap()); + } + + @Override + public void setUp() throws Exception { + super.setUp(); + } + + public void testOrdering() { + List workflow; + + workflow = parse(workflow(List.of(node("A"), node("B"), node("C")), List.of(edge("C", "B"), edge("B", "A")))); + assertEquals(0, workflow.indexOf(expectedNode("C"))); + assertEquals(1, workflow.indexOf(expectedNode("B"))); + assertEquals(2, workflow.indexOf(expectedNode("A"))); + + workflow = parse( + workflow( + List.of(node("A"), node("B"), node("C"), node("D")), + List.of(edge("A", "B"), edge("A", "C"), edge("B", "D"), edge("C", "D")) + ) + ); + assertEquals(0, workflow.indexOf(expectedNode("A"))); + int b = workflow.indexOf(expectedNode("B")); + int c = workflow.indexOf(expectedNode("C")); + assertTrue(b == 1 || b == 2); + assertTrue(c == 1 || c == 2); + assertEquals(3, workflow.indexOf(expectedNode("D"))); + + workflow = parse( + workflow( + List.of(node("A"), node("B"), node("C"), node("D"), node("E")), + List.of(edge("A", "B"), edge("A", "C"), edge("B", "D"), edge("D", "E"), edge("C", "E")) + ) + ); + assertEquals(0, workflow.indexOf(expectedNode("A"))); + b = workflow.indexOf(expectedNode("B")); + c = workflow.indexOf(expectedNode("C")); + int d = workflow.indexOf(expectedNode("D")); + assertTrue(b == 1 || b == 2); + assertTrue(c == 1 || c == 2); + assertTrue(d == 2 || d == 3); + assertEquals(4, workflow.indexOf(expectedNode("E"))); + } + + public void testCycles() { + Exception ex; + + ex = assertThrows(IllegalArgumentException.class, () -> parse(workflow(List.of(node("A")), List.of(edge("A", "A"))))); + assertEquals("Edge connects node A to itself.", ex.getMessage()); + + ex = assertThrows( + IllegalArgumentException.class, + () -> parse(workflow(List.of(node("A"), node("B")), List.of(edge("A", "B"), edge("B", "B")))) + ); + assertEquals("Edge connects node B to itself.", ex.getMessage()); + + ex = assertThrows( + IllegalArgumentException.class, + () -> parse(workflow(List.of(node("A"), node("B")), List.of(edge("A", "B"), edge("B", "A")))) + ); + assertEquals(NO_START_NODE_DETECTED, ex.getMessage()); + + ex = assertThrows( + IllegalArgumentException.class, + () -> parse(workflow(List.of(node("A"), node("B"), node("C")), List.of(edge("A", "B"), edge("B", "C"), edge("C", "B")))) + ); + assertTrue(ex.getMessage().startsWith(CYCLE_DETECTED)); + assertTrue(ex.getMessage().contains("B->C")); + assertTrue(ex.getMessage().contains("C->B")); + + ex = assertThrows( + IllegalArgumentException.class, + () -> parse( + workflow( + List.of(node("A"), node("B"), node("C"), node("D")), + List.of(edge("A", "B"), edge("B", "C"), edge("C", "D"), edge("D", "B")) + ) + ) + ); + assertTrue(ex.getMessage().startsWith(CYCLE_DETECTED)); + assertTrue(ex.getMessage().contains("B->C")); + assertTrue(ex.getMessage().contains("C->D")); + assertTrue(ex.getMessage().contains("D->B")); + } + + public void testNoEdges() { + Exception ex = assertThrows( + IllegalArgumentException.class, + () -> parse(workflow(Collections.emptyList(), Collections.emptyList())) + ); + assertEquals(NO_START_NODE_DETECTED, ex.getMessage()); + + assertEquals(List.of(expectedNode("A")), parse(workflow(List.of(node("A")), Collections.emptyList()))); + + List workflow = parse(workflow(List.of(node("A"), node("B")), Collections.emptyList())); + assertEquals(2, workflow.size()); + assertTrue(workflow.contains(expectedNode("A"))); + assertTrue(workflow.contains(expectedNode("B"))); + } +} From 136ceaaa5a171143c457ea52f037423aeefa695c Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Fri, 15 Sep 2023 17:21:11 -0700 Subject: [PATCH 11/12] Fix javadocs and forbidden API issues Signed-off-by: Daniel Widdis --- .codecov.yml | 4 +++ formatter/formatting.gradle | 1 + .../java/demo/CreateIndexWorkflowStep.java | 6 ++++ src/main/java/demo/DataDemo.java | 9 +++-- src/main/java/demo/Demo.java | 9 +++-- src/main/java/demo/DemoWorkflowStep.java | 7 ++++ .../template/TemplateParser.java | 4 +-- .../template/ProcessNodeTests.java | 16 ++++++--- .../workflow/WorkflowDataTests.java | 28 +++++++++++++++ src/test/resources/template/datademo.json | 36 +++++++++---------- 10 files changed, 92 insertions(+), 28 deletions(-) create mode 100644 src/test/java/org/opensearch/flowframework/workflow/WorkflowDataTests.java diff --git a/.codecov.yml b/.codecov.yml index 7c38e4e63..e5bbd7262 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -1,6 +1,10 @@ codecov: require_ci_to_pass: yes +# ignore files in demo package +ignore: + - "src/main/java/demo" + coverage: precision: 2 round: down diff --git a/formatter/formatting.gradle b/formatter/formatting.gradle index e3bc090e0..88d097623 100644 --- a/formatter/formatting.gradle +++ b/formatter/formatting.gradle @@ -35,6 +35,7 @@ allprojects { trimTrailingWhitespace() endWithNewline() + indentWithSpaces(4) } format("license", { licenseHeaderFile("${rootProject.file("formatter/license-header.txt")}", "package "); diff --git a/src/main/java/demo/CreateIndexWorkflowStep.java b/src/main/java/demo/CreateIndexWorkflowStep.java index d6c40c6ff..17b42567d 100644 --- a/src/main/java/demo/CreateIndexWorkflowStep.java +++ b/src/main/java/demo/CreateIndexWorkflowStep.java @@ -18,12 +18,18 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; +/** + * Sample to show other devs how to pass data around. Will be deleted once other PRs are merged. + */ public class CreateIndexWorkflowStep implements WorkflowStep { private static final Logger logger = LogManager.getLogger(CreateIndexWorkflowStep.class); private final String name; + /** + * Instantiate this class. + */ public CreateIndexWorkflowStep() { this.name = "CREATE_INDEX"; } diff --git a/src/main/java/demo/DataDemo.java b/src/main/java/demo/DataDemo.java index cedf0f5e9..f2d606f07 100644 --- a/src/main/java/demo/DataDemo.java +++ b/src/main/java/demo/DataDemo.java @@ -10,16 +10,19 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.io.PathUtils; import org.opensearch.flowframework.template.ProcessNode; import org.opensearch.flowframework.template.TemplateParser; import org.opensearch.flowframework.workflow.WorkflowStep; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -45,11 +48,12 @@ public class DataDemo { * * @param args unused */ + @SuppressForbidden(reason = "just a demo class that will be deleted") public static void main(String[] args) { String path = "src/test/resources/template/datademo.json"; String json; try { - json = new String(Files.readAllBytes(Paths.get(path))); + json = new String(Files.readAllBytes(PathUtils.get(path)), StandardCharsets.UTF_8); } catch (IOException e) { logger.error("Failed to read JSON at path {}", path); return; @@ -67,6 +71,7 @@ public static void main(String[] args) { predecessors.isEmpty() ? " Can start immediately!" : String.format( + Locale.getDefault(), " Must wait for [%s] to complete first.", predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", ")) ) diff --git a/src/main/java/demo/Demo.java b/src/main/java/demo/Demo.java index 0dba03169..58d977827 100644 --- a/src/main/java/demo/Demo.java +++ b/src/main/java/demo/Demo.java @@ -10,16 +10,19 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.io.PathUtils; import org.opensearch.flowframework.template.ProcessNode; import org.opensearch.flowframework.template.TemplateParser; import org.opensearch.flowframework.workflow.WorkflowStep; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -47,11 +50,12 @@ public class Demo { * * @param args unused */ + @SuppressForbidden(reason = "just a demo class that will be deleted") public static void main(String[] args) { String path = "src/test/resources/template/demo.json"; String json; try { - json = new String(Files.readAllBytes(Paths.get(path))); + json = new String(Files.readAllBytes(PathUtils.get(path)), StandardCharsets.UTF_8); } catch (IOException e) { logger.error("Failed to read JSON at path {}", path); return; @@ -69,6 +73,7 @@ public static void main(String[] args) { predecessors.isEmpty() ? " Can start immediately!" : String.format( + Locale.getDefault(), " Must wait for [%s] to complete first.", predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", ")) ) diff --git a/src/main/java/demo/DemoWorkflowStep.java b/src/main/java/demo/DemoWorkflowStep.java index 866928f7c..037d9b6f6 100644 --- a/src/main/java/demo/DemoWorkflowStep.java +++ b/src/main/java/demo/DemoWorkflowStep.java @@ -14,11 +14,18 @@ import java.util.List; import java.util.concurrent.CompletableFuture; +/** + * Demo workflowstep to show sequenced execution + */ public class DemoWorkflowStep implements WorkflowStep { private final long delay; private final String name; + /** + * Instantiate a step with a delay. + * @param delay milliseconds to take pretending to do work while really sleeping + */ public DemoWorkflowStep(long delay) { this.delay = delay; this.name = "DEMO_DELAY_" + delay; diff --git a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java index bba8bfb1e..efca5a954 100644 --- a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java +++ b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java @@ -127,9 +127,9 @@ private static List topologicalSort(List nodes, List

sortedNodes = new ArrayList<>(); - // S ← Set of all nodes with no incoming edge + // S <- Set of all nodes with no incoming edge Queue sourceNodes = new ArrayDeque<>(); nodes.stream().filter(n -> !predecessorEdges.containsKey(n)).forEach(n -> sourceNodes.add(n)); if (sourceNodes.isEmpty()) { diff --git a/src/test/java/org/opensearch/flowframework/template/ProcessNodeTests.java b/src/test/java/org/opensearch/flowframework/template/ProcessNodeTests.java index f9fa328c6..3feab9f3b 100644 --- a/src/test/java/org/opensearch/flowframework/template/ProcessNodeTests.java +++ b/src/test/java/org/opensearch/flowframework/template/ProcessNodeTests.java @@ -8,15 +8,20 @@ */ package org.opensearch.flowframework.template; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope.Scope; + import org.opensearch.flowframework.workflow.WorkflowData; import org.opensearch.flowframework.workflow.WorkflowStep; import org.opensearch.test.OpenSearchTestCase; +import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +@ThreadLeakScope(Scope.NONE) public class ProcessNodeTests extends OpenSearchTestCase { @Override @@ -37,21 +42,24 @@ public CompletableFuture execute(List data) { public String getName() { return "test"; } - }, WorkflowData.EMPTY); + }); assertEquals("A", nodeA.id()); assertEquals("test", nodeA.workflowStep().getName()); assertEquals(WorkflowData.EMPTY, nodeA.input()); - // FIXME: This is causing thread leaks + assertEquals(Collections.emptySet(), nodeA.getPredecessors()); + assertEquals("A", nodeA.toString()); + + // TODO: Once we can get OpenSearch Thread Pool for this execute method, create an IT and don't test execute here CompletableFuture f = nodeA.execute(); assertEquals(f, nodeA.getFuture()); f.orTimeout(5, TimeUnit.SECONDS); assertTrue(f.isDone()); assertEquals(WorkflowData.EMPTY, f.get()); - ProcessNode nodeB = new ProcessNode("B", null, null); + ProcessNode nodeB = new ProcessNode("B", null); assertNotEquals(nodeA, nodeB); - ProcessNode nodeA2 = new ProcessNode("A", null, null); + ProcessNode nodeA2 = new ProcessNode("A", null); assertEquals(nodeA, nodeA2); } } diff --git a/src/test/java/org/opensearch/flowframework/workflow/WorkflowDataTests.java b/src/test/java/org/opensearch/flowframework/workflow/WorkflowDataTests.java new file mode 100644 index 000000000..42a1a1a03 --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/workflow/WorkflowDataTests.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Collections; + +public class WorkflowDataTests extends OpenSearchTestCase { + + @Override + public void setUp() throws Exception { + super.setUp(); + } + + public void testWorkflowData() { + WorkflowData data = new WorkflowData() { + }; + assertEquals(Collections.emptyMap(), data.getParams()); + assertEquals(Collections.emptyMap(), data.getContent()); + } +} diff --git a/src/test/resources/template/datademo.json b/src/test/resources/template/datademo.json index e2b9eb386..a1323ed2c 100644 --- a/src/test/resources/template/datademo.json +++ b/src/test/resources/template/datademo.json @@ -1,20 +1,20 @@ { - "sequence": { - "nodes": [ - { - "id": "create_index", - "index_name": "demo" - }, - { - "id": "create_another_index", - "index_name": "second_demo" - } - ], - "edges": [ - { - "source": "create_index", - "dest": "create_another_index" - } - ] - } + "sequence": { + "nodes": [ + { + "id": "create_index", + "index_name": "demo" + }, + { + "id": "create_another_index", + "index_name": "second_demo" + } + ], + "edges": [ + { + "source": "create_index", + "dest": "create_another_index" + } + ] + } } From d1cbcbd02d0f25cbe4a5b0d705f882b723a4c249 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Mon, 18 Sep 2023 17:02:29 -0700 Subject: [PATCH 12/12] Address code review comments Signed-off-by: Daniel Widdis --- formatter/formatting.gradle | 2 +- src/main/java/demo/CreateIndexWorkflowStep.java | 6 ++++-- .../org/opensearch/flowframework/template/ProcessNode.java | 7 ++++++- .../opensearch/flowframework/template/TemplateParser.java | 3 ++- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/formatter/formatting.gradle b/formatter/formatting.gradle index 88d097623..8f842128f 100644 --- a/formatter/formatting.gradle +++ b/formatter/formatting.gradle @@ -35,7 +35,7 @@ allprojects { trimTrailingWhitespace() endWithNewline() - indentWithSpaces(4) + indentWithSpaces() } format("license", { licenseHeaderFile("${rootProject.file("formatter/license-header.txt")}", "package "); diff --git a/src/main/java/demo/CreateIndexWorkflowStep.java b/src/main/java/demo/CreateIndexWorkflowStep.java index 17b42567d..c1a79188b 100644 --- a/src/main/java/demo/CreateIndexWorkflowStep.java +++ b/src/main/java/demo/CreateIndexWorkflowStep.java @@ -37,6 +37,10 @@ public CreateIndexWorkflowStep() { @Override public CompletableFuture execute(List data) { CompletableFuture future = new CompletableFuture<>(); + // TODO we will be passing a thread pool to this object when it's instantiated + // we should either add the generic executor from that pool to this call + // or use executorservice.submit or any of various threading options + // https://github.com/opensearch-project/opensearch-ai-flow-framework/issues/42 CompletableFuture.runAsync(() -> { String inputIndex = null; boolean first = true; @@ -61,8 +65,6 @@ public CompletableFuture execute(List data) { } catch (InterruptedException e) {} // Simulate response of created index CreateIndexResponse response = new CreateIndexResponse(true, true, inputIndex); - // OLD UNSCALABLE WAY: future.complete(new CreateIndexResponseData(response)); - // Better way with an anonymous class: future.complete(new WorkflowData() { @Override public Map getContent() { diff --git a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java index fc1afa127..08a7ec841 100644 --- a/src/main/java/org/opensearch/flowframework/template/ProcessNode.java +++ b/src/main/java/org/opensearch/flowframework/template/ProcessNode.java @@ -122,18 +122,23 @@ void setPredecessors(Set predecessors) { */ public CompletableFuture execute() { this.future = new CompletableFuture<>(); + // TODO this class will be instantiated with the OpenSearch thread pool (or one for tests!) + // the generic executor from that pool should be passed to this runAsync call + // https://github.com/opensearch-project/opensearch-ai-flow-framework/issues/42 CompletableFuture.runAsync(() -> { List> predFutures = predecessors.stream().map(p -> p.getFuture()).collect(Collectors.toList()); if (!predecessors.isEmpty()) { CompletableFuture waitForPredecessors = CompletableFuture.allOf(predFutures.toArray(new CompletableFuture[0])); try { + // We need timeouts to be part of the user template or in settings + // https://github.com/opensearch-project/opensearch-ai-flow-framework/issues/45 waitForPredecessors.orTimeout(30, TimeUnit.SECONDS).get(); } catch (InterruptedException | ExecutionException e) { handleException(e); return; } } - logger.debug(">>> Starting {}.", this.id); + logger.info(">>> Starting {}.", this.id); // get the input data from predecessor(s) List input = new ArrayList(); input.add(this.input); diff --git a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java index efca5a954..bce07c616 100644 --- a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java +++ b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java @@ -69,9 +69,10 @@ public static List parseJsonGraphToSequence(String json, Map