diff --git a/src/main/java/org/opensearch/flowframework/template/Template.java b/src/main/java/org/opensearch/flowframework/template/Template.java new file mode 100644 index 000000000..25e25b691 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/template/Template.java @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.template; + +import org.opensearch.Version; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.flowframework.workflow.Workflow; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +/** + * The Template is the central data structure which configures workflows. This object is used to parse JSON communicated via REST API. + */ +public class Template implements ToXContentObject { + + // TODO: Some of thse are placeholders based on the template design + // Current code is only using user inputs and workflows + private String name = null; + private String description = null; + private String useCase = null; + private String[] operations = null; // probably an ENUM actually + private Version templateVersion = null; + private Version[] compatibilityVersion = null; + private Map userInputs = new HashMap<>(); + private Map workflows = new HashMap<>(); + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + XContentBuilder xContentBuilder = builder.startObject(); + xContentBuilder.field("name", this.name); + xContentBuilder.field("description", this.description); + xContentBuilder.field("use_case", this.useCase); + xContentBuilder.field("operations", this.operations); + + xContentBuilder.startObject("version"); + xContentBuilder.field("template", this.templateVersion); + xContentBuilder.startArray("compatibility"); + for (Version v : this.compatibilityVersion) { + xContentBuilder.value(v); + } + xContentBuilder.endArray(); + xContentBuilder.endObject(); + + xContentBuilder.startObject("user_inputs"); + for (Entry e : userInputs.entrySet()) { + xContentBuilder.field(e.getKey(), e.getValue()); + } + xContentBuilder.endObject(); + + xContentBuilder.startObject("workflows"); + for (Entry e : workflows.entrySet()) { + xContentBuilder.field(e.getKey(), e.getValue(), params); + } + xContentBuilder.endObject(); + + return xContentBuilder.endObject(); + } + +} diff --git a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java index 6a224179e..949cd245b 100644 --- a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java +++ b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java @@ -64,7 +64,7 @@ public static List parseJsonGraphToSequence(String json) { JsonObject graph = jsonObject.getAsJsonObject(WORKFLOW); List nodes = new ArrayList<>(); - List edges = new ArrayList<>(); + List edges = new ArrayList<>(); for (JsonElement nodeJson : graph.getAsJsonArray(NODES)) { JsonObject nodeObject = nodeJson.getAsJsonObject(); @@ -86,21 +86,21 @@ public static List parseJsonGraphToSequence(String json) { if (sourceNodeId.equals(destNodeId)) { throw new IllegalArgumentException("Edge connects node " + sourceNodeId + " to itself."); } - edges.add(new ProcessSequenceEdge(sourceNodeId, destNodeId)); + edges.add(new WorkflowEdge(sourceNodeId, destNodeId)); } return topologicalSort(nodes, edges); } - private static List topologicalSort(List nodes, List edges) { + private static List topologicalSort(List nodes, List edges) { // Define the graph - Set graph = new HashSet<>(edges); + Set graph = new HashSet<>(edges); // Map node id string to object Map nodeMap = nodes.stream().collect(Collectors.toMap(ProcessNode::id, Function.identity())); // Build predecessor and successor maps - Map> predecessorEdges = new HashMap<>(); - Map> successorEdges = new HashMap<>(); - for (ProcessSequenceEdge edge : edges) { + Map> predecessorEdges = new HashMap<>(); + Map> successorEdges = new HashMap<>(); + for (WorkflowEdge edge : edges) { ProcessNode source = nodeMap.get(edge.getSource()); ProcessNode dest = nodeMap.get(edge.getDestination()); predecessorEdges.computeIfAbsent(dest, k -> new HashSet<>()).add(edge); @@ -129,7 +129,7 @@ private static List topologicalSort(List nodes, List

inputs = new HashMap<>(); // maps to WorkflowData + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + XContentBuilder xContentBuilder = builder.startObject(); + xContentBuilder.field(ID_FIELD, this.id); + xContentBuilder.field(TYPE_FIELD, this.type); + + xContentBuilder.startObject(INPUTS_FIELD); + for (Entry e : inputs.entrySet()) { + xContentBuilder.field(e.getKey(), e.getValue()); + } + xContentBuilder.endObject(); + + return xContentBuilder.endObject(); + } +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/Workflow.java b/src/main/java/org/opensearch/flowframework/workflow/Workflow.java new file mode 100644 index 000000000..6666ff10a --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/Workflow.java @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.flowframework.template.WorkflowEdge; +import org.opensearch.flowframework.template.WorkflowNode; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +/** + * This represents an object in the workflows section of a {@link Template}. + */ +public class Workflow implements ToXContentFragment { + + private static final String USER_PARAMS_FIELD = "user_params"; + private static final String NODES_FIELD = "nodes"; + private static final String EDGES_FIELD = "edges"; + + private Map userParams = new HashMap<>(); + private WorkflowNode[] nodes = null; + private WorkflowEdge[] edges = null; + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + XContentBuilder xContentBuilder = builder.startObject(); + + xContentBuilder.startObject(USER_PARAMS_FIELD); + for (Entry e : userParams.entrySet()) { + xContentBuilder.field(e.getKey(), e.getValue()); + } + xContentBuilder.endObject(); + + xContentBuilder.startArray(NODES_FIELD); + for (WorkflowNode n : nodes) { + xContentBuilder.value(n); + } + xContentBuilder.endArray(); + + xContentBuilder.startArray(EDGES_FIELD); + for (WorkflowEdge e : edges) { + xContentBuilder.value(e); + } + xContentBuilder.endArray(); + + return xContentBuilder.endObject(); + } + +} diff --git a/src/test/java/org/opensearch/flowframework/template/ProcessSequenceEdgeTests.java b/src/test/java/org/opensearch/flowframework/template/WorkflowEdgeTests.java similarity index 70% rename from src/test/java/org/opensearch/flowframework/template/ProcessSequenceEdgeTests.java rename to src/test/java/org/opensearch/flowframework/template/WorkflowEdgeTests.java index 80cecd96e..f44b54527 100644 --- a/src/test/java/org/opensearch/flowframework/template/ProcessSequenceEdgeTests.java +++ b/src/test/java/org/opensearch/flowframework/template/WorkflowEdgeTests.java @@ -10,7 +10,7 @@ import org.opensearch.test.OpenSearchTestCase; -public class ProcessSequenceEdgeTests extends OpenSearchTestCase { +public class WorkflowEdgeTests extends OpenSearchTestCase { @Override public void setUp() throws Exception { @@ -18,15 +18,15 @@ public void setUp() throws Exception { } public void testEdge() { - ProcessSequenceEdge edgeAB = new ProcessSequenceEdge("A", "B"); + WorkflowEdge edgeAB = new WorkflowEdge("A", "B"); assertEquals("A", edgeAB.getSource()); assertEquals("B", edgeAB.getDestination()); assertEquals("A->B", edgeAB.toString()); - ProcessSequenceEdge edgeAB2 = new ProcessSequenceEdge("A", "B"); + WorkflowEdge edgeAB2 = new WorkflowEdge("A", "B"); assertEquals(edgeAB, edgeAB2); - ProcessSequenceEdge edgeAC = new ProcessSequenceEdge("A", "C"); + WorkflowEdge edgeAC = new WorkflowEdge("A", "C"); assertNotEquals(edgeAB, edgeAC); } } diff --git a/src/test/resources/template/finaltemplate.json b/src/test/resources/template/finaltemplate.json new file mode 100644 index 000000000..be0e3da2c --- /dev/null +++ b/src/test/resources/template/finaltemplate.json @@ -0,0 +1,90 @@ +{ + "name": "semantic-search", + "description": "My semantic search use case", + "use_case": "SEMANTIC_SEARCH", + "operations": [ + "PROVISION", + "INGEST", + "QUERY" + ], + "version": { + "template": "1.0", + "compatibility": [ + "2.9", + "3.0" + ] + }, + "user_inputs": { + "index_name": "my-knn-index", + "index_settings": { + } + }, + "workflows": { + "provision": { + "steps": [{ + "id": "create_index", + "inputs": { + "name": "user_inputs.index_name", + "settings": "user_inputs.index_settings" + } + }, + { + "id": "create_ingest_pipeline", + "inputs": { + "name": "my-ingest-pipeline", + "description": "some description", + "processors": [{ + "type": "text_embedding", + "model_id": "my-existing-model-id", + "input_field": "text_passage", + "output_field": "text_embedding" + }] + } + } + ], + "edges": [{ + "source": "create_index", + "dest": "create_ingest_pipeline" + }] + }, + "ingest": { + "user_params": { + "document": {} + }, + "steps": [{ + "id": "ingest_index", + "inputs": { + "index": "user_inputs.index_name", + "ingest_pipeline": "my-ingest-pipeline", + "document": "user_params.document" + } + }] + }, + "query": { + "user_params": { + "plaintext": "string" + }, + "nodes": [{ + "id": "transform_query", + "inputs": { + "template": "neural-search-template-1", + "plaintext": "user_params.plaintext" + } + }, + { + "id": "query_index", + "inputs": { + "index": "user_inputs.index_name", + "query": "{{output-from-prev-step}}.query", + "search_request_processors": [], + "search_response_processors": [] + } + } + ], + "edges": [{ + "source": "transform_query", + "dest": "query_index" + }] + } + } +}