Skip to content

Commit

Permalink
Separate WorkflowNode and ProcessNode functionality
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Sep 22, 2023
1 parent 73f11ac commit a9f8559
Show file tree
Hide file tree
Showing 15 changed files with 352 additions and 274 deletions.
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ repositories {
dependencies {
implementation "org.opensearch:opensearch:${opensearch_version}"
implementation 'org.junit.jupiter:junit-jupiter:5.10.0'
implementation "com.google.code.gson:gson:2.10.1"
compileOnly "com.google.guava:guava:32.1.2-jre"
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"

Expand Down
12 changes: 7 additions & 5 deletions src/main/java/demo/DataDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.flowframework.template.ProcessNode;
import org.opensearch.flowframework.template.Template;
import org.opensearch.flowframework.template.TemplateParser;

import java.io.IOException;
Expand All @@ -21,7 +22,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

Expand All @@ -36,10 +36,11 @@ public class DataDemo {
* Demonstrate parsing a JSON graph.
*
* @param args unused
* @throws IOException on a failure
*/
@SuppressForbidden(reason = "just a demo class that will be deleted")
public static void main(String[] args) {
String path = "src/test/resources/template/datademo.json";
public static void main(String[] args) throws IOException {
String path = "src/test/resources/template/demo.json";
String json;
try {
json = new String(Files.readAllBytes(PathUtils.get(path)), StandardCharsets.UTF_8);
Expand All @@ -49,11 +50,12 @@ public static void main(String[] args) {
}

logger.info("Parsing graph to sequence...");
List<ProcessNode> processSequence = TemplateParser.parseJsonGraphToSequence(json);
Template t = TemplateParser.parseJsonToTemplate(json);
List<ProcessNode> processSequence = TemplateParser.parseWorkflowToSequence(t.workflows().get("datademo"));
List<CompletableFuture<?>> futureList = new ArrayList<>();

for (ProcessNode n : processSequence) {
Set<ProcessNode> predecessors = n.getPredecessors();
List<ProcessNode> predecessors = n.predecessors();
logger.info(
"Queueing process [{}].{}",
n.id(),
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/demo/Demo.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.flowframework.template.ProcessNode;
import org.opensearch.flowframework.template.Template;
import org.opensearch.flowframework.template.TemplateParser;

import java.io.IOException;
Expand All @@ -21,7 +22,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

Expand All @@ -36,9 +36,10 @@ public class Demo {
* Demonstrate parsing a JSON graph.
*
* @param args unused
* @throws IOException on a failure
*/
@SuppressForbidden(reason = "just a demo class that will be deleted")
public static void main(String[] args) {
public static void main(String[] args) throws IOException {
String path = "src/test/resources/template/demo.json";
String json;
try {
Expand All @@ -49,11 +50,12 @@ public static void main(String[] args) {
}

logger.info("Parsing graph to sequence...");
List<ProcessNode> processSequence = TemplateParser.parseJsonGraphToSequence(json);
Template t = TemplateParser.parseJsonToTemplate(json);
List<ProcessNode> processSequence = TemplateParser.parseWorkflowToSequence(t.workflows().get("demo"));
List<CompletableFuture<?>> futureList = new ArrayList<>();

for (ProcessNode n : processSequence) {
Set<ProcessNode> predecessors = n.getPredecessors();
List<ProcessNode> predecessors = n.predecessors();
logger.info(
"Queueing process [{}].{}",
n.id(),
Expand Down
23 changes: 9 additions & 14 deletions src/main/java/demo/TemplateParseDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,14 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.template.Template;
import org.opensearch.flowframework.template.TemplateParser;
import org.opensearch.flowframework.workflow.Workflow;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import java.util.Map.Entry;

/**
* Demo class exercising {@link TemplateParser}. This will be moved to a unit test.
Expand All @@ -49,15 +45,14 @@ public static void main(String[] args) throws IOException {
return;
}

logger.info("Parsing template...");
XContentParser parser = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
json
);
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
Template t = Template.parse(parser);
Template t = TemplateParser.parseJsonToTemplate(json);

System.out.println(t.toJson());
System.out.println(t.toYaml());

for (Entry<String, Workflow> e : t.workflows().entrySet()) {
logger.info("Parsing {} workflow.", e.getKey());
TemplateParser.parseWorkflowToSequence(e.getValue());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
import org.opensearch.flowframework.workflow.WorkflowStep;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -33,32 +30,22 @@ public class ProcessNode {
private final String id;
private final WorkflowStep workflowStep;
private final WorkflowData input;
private CompletableFuture<WorkflowData> future = null;

// will be populated during graph parsing
private Set<ProcessNode> predecessors = Collections.emptySet();

/**
* Create this node linked to its executing process.
*
* @param id A string identifying the workflow step
* @param workflowStep A java class implementing {@link WorkflowStep} to be executed when it's this node's turn.
*/
ProcessNode(String id, WorkflowStep workflowStep) {
this(id, workflowStep, WorkflowData.EMPTY);
}
private final List<ProcessNode> predecessors;
private final CompletableFuture<WorkflowData> future = new CompletableFuture<>();

/**
* Create this node linked to its executing process.
* Create this node linked to its executing process, including input data and any predecessor nodes.
*
* @param id A string identifying the workflow step
* @param workflowStep A java class implementing {@link WorkflowStep} to be executed when it's this node's turn.
* @param input Input required by the node
* @param input Input required by the node encoded in a {@link WorkflowData} instance.
* @param predecessors Nodes preceding this one in the workflow
*/
public ProcessNode(String id, WorkflowStep workflowStep, WorkflowData input) {
public ProcessNode(String id, WorkflowStep workflowStep, WorkflowData input, List<ProcessNode> predecessors) {
this.id = id;
this.workflowStep = workflowStep;
this.input = input;
this.predecessors = predecessors;
}

/**
Expand Down Expand Up @@ -92,41 +79,31 @@ public WorkflowData input() {
* @return A future indicating the processing state of this node.
* Returns {@code null} if it has not begun executing, should not happen if a workflow is sorted and executed topologically.
*/
public CompletableFuture<WorkflowData> getFuture() {
public CompletableFuture<WorkflowData> future() {
return future;
}

/**
* Returns the predecessors of this node in the workflow.
* The predecessor's {@link #getFuture()} must complete before execution begins on this node.
* The predecessor's {@link #future()} must complete before execution begins on this node.
*
* @return a set of predecessor nodes, if any. At least one node in the graph must have no predecessors and serve as a start node.
*/
public Set<ProcessNode> getPredecessors() {
public List<ProcessNode> predecessors() {
return predecessors;
}

/**
* Sets the predecessor node. Called by {@link TemplateParser}.
*
* @param predecessors The predecessors of this node.
*/
void setPredecessors(Set<ProcessNode> predecessors) {
this.predecessors = Set.copyOf(predecessors);
}

/**
* Execute this node in the sequence. Initializes the node's {@link CompletableFuture} and completes it when the process completes.
*
* @return this node's future. This is returned immediately, while process execution continues asynchronously.
*/
public CompletableFuture<WorkflowData> execute() {
this.future = new CompletableFuture<>();
// TODO this class will be instantiated with the OpenSearch thread pool (or one for tests!)
// the generic executor from that pool should be passed to this runAsync call
// https://github.com/opensearch-project/opensearch-ai-flow-framework/issues/42
CompletableFuture.runAsync(() -> {
List<CompletableFuture<WorkflowData>> predFutures = predecessors.stream().map(p -> p.getFuture()).collect(Collectors.toList());
List<CompletableFuture<WorkflowData>> predFutures = predecessors.stream().map(p -> p.future()).collect(Collectors.toList());
if (!predecessors.isEmpty()) {
CompletableFuture<Void> waitForPredecessors = CompletableFuture.allOf(predFutures.toArray(new CompletableFuture<?>[0]));
try {
Expand Down Expand Up @@ -168,20 +145,6 @@ private void handleException(Exception e) {
logger.debug("<<< Completed Exceptionally {}", this.id);
}

@Override
public int hashCode() {
return Objects.hash(id);
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
ProcessNode other = (ProcessNode) obj;
return Objects.equals(id, other.id);
}

@Override
public String toString() {
return this.id;
Expand Down
Loading

0 comments on commit a9f8559

Please sign in to comment.