Commit
Add XContent classes representing Template JSON
Signed-off-by: Daniel Widdis <[email protected]>
dbwiddis committed Sep 20, 2023
1 parent 683648e commit 8332103
Showing 7 changed files with 298 additions and 16 deletions.
69 changes: 69 additions & 0 deletions src/main/java/org/opensearch/flowframework/template/Template.java
@@ -0,0 +1,69 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;

import org.opensearch.Version;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.workflow.Workflow;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

/**
* The Template is the central data structure which configures workflows. This object is used to parse JSON communicated via REST API.
*/
public class Template implements ToXContentObject {

// TODO: Some of these are placeholders based on the template design
// Current code is only using user inputs and workflows
private String name = null;
private String description = null;
private String useCase = null;
private String[] operations = null; // probably an ENUM actually
private Version templateVersion = null;
private Version[] compatibilityVersion = null;
private Map<String, String> userInputs = new HashMap<>();
private Map<String, Workflow> workflows = new HashMap<>();

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
xContentBuilder.field("name", this.name);
xContentBuilder.field("description", this.description);
xContentBuilder.field("use_case", this.useCase);
xContentBuilder.field("operations", this.operations);

xContentBuilder.startObject("version");
xContentBuilder.field("template", this.templateVersion);
xContentBuilder.startArray("compatibility");
for (Version v : this.compatibilityVersion) {
xContentBuilder.value(v);
}
xContentBuilder.endArray();
xContentBuilder.endObject();

xContentBuilder.startObject("user_inputs");
for (Entry<String, String> e : userInputs.entrySet()) {
xContentBuilder.field(e.getKey(), e.getValue());
}
xContentBuilder.endObject();

xContentBuilder.startObject("workflows");
for (Entry<String, Workflow> e : workflows.entrySet()) {
xContentBuilder.field(e.getKey(), e.getValue(), params);
}
xContentBuilder.endObject();

return xContentBuilder.endObject();
}

}
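
As a rough usage sketch (not part of this commit), the serialization above can be driven through the standard OpenSearch XContent helpers. The populated template instance is assumed here; the class has no constructor or setters yet, and the loops over operations and compatibilityVersion expect non-null values.

// Hypothetical sketch: serialize a populated Template to JSON.
// XContentFactory.jsonBuilder() and ToXContent.EMPTY_PARAMS are existing OpenSearch APIs;
// how `template` gets populated is assumed and not shown in this commit.
XContentBuilder builder = XContentFactory.jsonBuilder();
template.toXContent(builder, ToXContent.EMPTY_PARAMS);
// The builder now mirrors the shape of src/test/resources/template/finaltemplate.json below:
// name, description, use_case, operations, a version object, user_inputs, and workflows.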
@@ -64,7 +64,7 @@ public static List<ProcessNode> parseJsonGraphToSequence(String json) {
JsonObject graph = jsonObject.getAsJsonObject(WORKFLOW);

List<ProcessNode> nodes = new ArrayList<>();
- List<ProcessSequenceEdge> edges = new ArrayList<>();
+ List<WorkflowEdge> edges = new ArrayList<>();

for (JsonElement nodeJson : graph.getAsJsonArray(NODES)) {
JsonObject nodeObject = nodeJson.getAsJsonObject();
@@ -86,21 +86,21 @@ public static List<ProcessNode> parseJsonGraphToSequence(String json) {
if (sourceNodeId.equals(destNodeId)) {
throw new IllegalArgumentException("Edge connects node " + sourceNodeId + " to itself.");
}
- edges.add(new ProcessSequenceEdge(sourceNodeId, destNodeId));
+ edges.add(new WorkflowEdge(sourceNodeId, destNodeId));
}

return topologicalSort(nodes, edges);
}

- private static List<ProcessNode> topologicalSort(List<ProcessNode> nodes, List<ProcessSequenceEdge> edges) {
+ private static List<ProcessNode> topologicalSort(List<ProcessNode> nodes, List<WorkflowEdge> edges) {
// Define the graph
- Set<ProcessSequenceEdge> graph = new HashSet<>(edges);
+ Set<WorkflowEdge> graph = new HashSet<>(edges);
// Map node id string to object
Map<String, ProcessNode> nodeMap = nodes.stream().collect(Collectors.toMap(ProcessNode::id, Function.identity()));
// Build predecessor and successor maps
- Map<ProcessNode, Set<ProcessSequenceEdge>> predecessorEdges = new HashMap<>();
- Map<ProcessNode, Set<ProcessSequenceEdge>> successorEdges = new HashMap<>();
- for (ProcessSequenceEdge edge : edges) {
+ Map<ProcessNode, Set<WorkflowEdge>> predecessorEdges = new HashMap<>();
+ Map<ProcessNode, Set<WorkflowEdge>> successorEdges = new HashMap<>();
+ for (WorkflowEdge edge : edges) {
ProcessNode source = nodeMap.get(edge.getSource());
ProcessNode dest = nodeMap.get(edge.getDestination());
predecessorEdges.computeIfAbsent(dest, k -> new HashSet<>()).add(edge);
@@ -129,7 +129,7 @@ private static List<ProcessNode> topologicalSort(List<ProcessNode> nodes, List<P
// add n to L
sortedNodes.add(n);
// for each node m with an edge e from n to m do
- for (ProcessSequenceEdge e : successorEdges.getOrDefault(n, Collections.emptySet())) {
+ for (WorkflowEdge e : successorEdges.getOrDefault(n, Collections.emptySet())) {
ProcessNode m = nodeMap.get(e.getDestination());
// remove edge e from the graph
graph.remove(e);
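
The comments above follow Kahn's algorithm. Stripped of the plugin's ProcessNode and WorkflowEdge types, the same procedure over plain string ids looks roughly like this self-contained sketch (illustrative only, not code from this commit):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class TopoSortSketch {
    /** Kahn's algorithm: returns an order in which every edge's source precedes its destination. */
    static List<String> sort(Set<String> nodes, Map<String, List<String>> successors) {
        // Count incoming edges per node.
        Map<String, Integer> inDegree = new HashMap<>();
        for (String n : nodes) {
            inDegree.put(n, 0);
        }
        for (List<String> dests : successors.values()) {
            for (String d : dests) {
                inDegree.merge(d, 1, Integer::sum);
            }
        }
        // Start with nodes that have no predecessors.
        Deque<String> queue = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) {
                queue.add(e.getKey());
            }
        }
        List<String> sorted = new ArrayList<>();
        while (!queue.isEmpty()) {
            String n = queue.remove();
            sorted.add(n);                              // "add n to L"
            for (String m : successors.getOrDefault(n, List.of())) {
                // "remove edge e from the graph": decrement m's in-degree
                if (inDegree.merge(m, -1, Integer::sum) == 0) {
                    queue.add(m);                       // all of m's predecessors are now placed
                }
            }
        }
        if (sorted.size() != nodes.size()) {
            throw new IllegalArgumentException("Cycle detected; no valid topological order exists.");
        }
        return sorted;
    }
}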
@@ -8,12 +8,19 @@
*/
package org.opensearch.flowframework.template;

import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

/**
- * Representation of an edge between process nodes in a workflow graph.
+ * This represents an edge between process nodes (steps) in a workflow graph in the {@link Template}.
*/
- public class ProcessSequenceEdge {
+ public class WorkflowEdge implements ToXContentFragment {
public static final String DEST_FIELD = "dest";
public static final String SOURCE_FIELD = "source";

private final String source;
private final String destination;

@@ -23,11 +30,19 @@ public class ProcessSequenceEdge {
* @param source The source node id.
* @param destination The destination node id.
*/
- ProcessSequenceEdge(String source, String destination) {
+ WorkflowEdge(String source, String destination) {
this.source = source;
this.destination = destination;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
xContentBuilder.field(SOURCE_FIELD, this.source);
xContentBuilder.field(DEST_FIELD, this.destination);
return xContentBuilder.endObject();
}

/**
* Gets the source node id.
*
@@ -56,7 +71,7 @@ public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
- ProcessSequenceEdge other = (ProcessSequenceEdge) obj;
+ WorkflowEdge other = (WorkflowEdge) obj;
return Objects.equals(destination, other.destination) && Objects.equals(source, other.source);
}

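
With the added toXContent method, each edge serializes to a small object such as {"source": "create_index", "dest": "create_ingest_pipeline"}, matching the edges entries in the test template at the end of this commit.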
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;

import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

/**
* This represents a process node (step) in a workflow graph in the {@link Template}.
* It will have a one-to-one correspondence with a {@link ProcessNode},
* where its type is used to determine the correct {@link WorkflowStep} object,
* and its inputs are used to populate the {@link WorkflowData} input.
*/
public class WorkflowNode implements ToXContentFragment {

private static final String INPUTS_FIELD = "inputs";
private static final String TYPE_FIELD = "type";
private static final String ID_FIELD = "id";

private String id = null; // unique id
private String type = null; // maps to a WorkflowStep
private Map<String, String> inputs = new HashMap<>(); // maps to WorkflowData

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
xContentBuilder.field(ID_FIELD, this.id);
xContentBuilder.field(TYPE_FIELD, this.type);

xContentBuilder.startObject(INPUTS_FIELD);
for (Entry<String, String> e : inputs.entrySet()) {
xContentBuilder.field(e.getKey(), e.getValue());
}
xContentBuilder.endObject();

return xContentBuilder.endObject();
}
}
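
Serialized, a node is a flat object of its id, type, and string-to-string inputs, for example {"id": "create_index", "type": "create_index", "inputs": {"name": "user_inputs.index_name"}}. The type value here is an illustrative assumption; the steps in the test template below are identified by id only.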
59 changes: 59 additions & 0 deletions src/main/java/org/opensearch/flowframework/workflow/Workflow.java
@@ -0,0 +1,59 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.template.WorkflowEdge;
import org.opensearch.flowframework.template.WorkflowNode;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

/**
* This represents an object in the workflows section of a {@link Template}.
*/
public class Workflow implements ToXContentFragment {

private static final String USER_PARAMS_FIELD = "user_params";
private static final String NODES_FIELD = "nodes";
private static final String EDGES_FIELD = "edges";

private Map<String, String> userParams = new HashMap<>();
private WorkflowNode[] nodes = null;
private WorkflowEdge[] edges = null;

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();

xContentBuilder.startObject(USER_PARAMS_FIELD);
for (Entry<String, String> e : userParams.entrySet()) {
xContentBuilder.field(e.getKey(), e.getValue());
}
xContentBuilder.endObject();

xContentBuilder.startArray(NODES_FIELD);
for (WorkflowNode n : nodes) {
xContentBuilder.value(n);
}
xContentBuilder.endArray();

xContentBuilder.startArray(EDGES_FIELD);
for (WorkflowEdge e : edges) {
xContentBuilder.value(e);
}
xContentBuilder.endArray();

return xContentBuilder.endObject();
}

}
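
A serialized workflow thus has three parts, as in the query workflow of the test template below: a user_params object, a nodes array of WorkflowNode objects, and an edges array of WorkflowEdge objects. (The provision and ingest workflows in that file still use a steps key rather than nodes.)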
@@ -10,23 +10,23 @@

import org.opensearch.test.OpenSearchTestCase;

- public class ProcessSequenceEdgeTests extends OpenSearchTestCase {
+ public class WorkflowEdgeTests extends OpenSearchTestCase {

@Override
public void setUp() throws Exception {
super.setUp();
}

public void testEdge() {
- ProcessSequenceEdge edgeAB = new ProcessSequenceEdge("A", "B");
+ WorkflowEdge edgeAB = new WorkflowEdge("A", "B");
assertEquals("A", edgeAB.getSource());
assertEquals("B", edgeAB.getDestination());
assertEquals("A->B", edgeAB.toString());

- ProcessSequenceEdge edgeAB2 = new ProcessSequenceEdge("A", "B");
+ WorkflowEdge edgeAB2 = new WorkflowEdge("A", "B");
assertEquals(edgeAB, edgeAB2);

- ProcessSequenceEdge edgeAC = new ProcessSequenceEdge("A", "C");
+ WorkflowEdge edgeAC = new WorkflowEdge("A", "C");
assertNotEquals(edgeAB, edgeAC);
}
}
90 changes: 90 additions & 0 deletions src/test/resources/template/finaltemplate.json
@@ -0,0 +1,90 @@
{
"name": "semantic-search",
"description": "My semantic search use case",
"use_case": "SEMANTIC_SEARCH",
"operations": [
"PROVISION",
"INGEST",
"QUERY"
],
"version": {
"template": "1.0",
"compatibility": [
"2.9",
"3.0"
]
},
"user_inputs": {
"index_name": "my-knn-index",
"index_settings": {
}
},
"workflows": {
"provision": {
"steps": [{
"id": "create_index",
"inputs": {
"name": "user_inputs.index_name",
"settings": "user_inputs.index_settings"
}
},
{
"id": "create_ingest_pipeline",
"inputs": {
"name": "my-ingest-pipeline",
"description": "some description",
"processors": [{
"type": "text_embedding",
"model_id": "my-existing-model-id",
"input_field": "text_passage",
"output_field": "text_embedding"
}]
}
}
],
"edges": [{
"source": "create_index",
"dest": "create_ingest_pipeline"
}]
},
"ingest": {
"user_params": {
"document": {}
},
"steps": [{
"id": "ingest_index",
"inputs": {
"index": "user_inputs.index_name",
"ingest_pipeline": "my-ingest-pipeline",
"document": "user_params.document"
}
}]
},
"query": {
"user_params": {
"plaintext": "string"
},
"nodes": [{
"id": "transform_query",
"inputs": {
"template": "neural-search-template-1",
"plaintext": "user_params.plaintext"
}
},
{
"id": "query_index",
"inputs": {
"index": "user_inputs.index_name",
"query": "{{output-from-prev-step}}.query",
"search_request_processors": [],
"search_response_processors": []
}
}
],
"edges": [{
"source": "transform_query",
"dest": "query_index"
}]
}
}
}
