diff --git a/.codecov.yml b/.codecov.yml index 827160da7..3e977ec34 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -1,10 +1,6 @@ codecov: require_ci_to_pass: true -# ignore files in demo package -ignore: - - "src/main/java/demo" - coverage: precision: 2 round: down diff --git a/src/main/java/demo/Demo.java b/src/main/java/demo/Demo.java deleted file mode 100644 index e4d2aa8f8..000000000 --- a/src/main/java/demo/Demo.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ -package demo; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.client.Client; -import org.opensearch.client.node.NodeClient; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.SuppressForbidden; -import org.opensearch.common.io.PathUtils; -import org.opensearch.common.settings.Settings; -import org.opensearch.flowframework.model.Template; -import org.opensearch.flowframework.workflow.ProcessNode; -import org.opensearch.flowframework.workflow.WorkflowProcessSorter; -import org.opensearch.flowframework.workflow.WorkflowStepFactory; -import org.opensearch.ml.client.MachineLearningNodeClient; -import org.opensearch.threadpool.ThreadPool; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -/** - * Demo class exercising {@link WorkflowProcessSorter}. This will be moved to a unit test. - */ -public class Demo { - - private static final Logger logger = LogManager.getLogger(Demo.class); - - private Demo() {} - - /** - * Demonstrate parsing a JSON graph. - * - * @param args unused - * @throws IOException on a failure - */ - @SuppressForbidden(reason = "just a demo class that will be deleted") - public static void main(String[] args) throws IOException { - String path = "src/test/resources/template/demo.json"; - String json; - try { - json = new String(Files.readAllBytes(PathUtils.get(path)), StandardCharsets.UTF_8); - } catch (IOException e) { - logger.error("Failed to read JSON at path {}", path); - return; - } - ClusterService clusterService = new ClusterService(null, null, null); - Client client = new NodeClient(null, null); - MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client); - WorkflowStepFactory factory = new WorkflowStepFactory(clusterService, client, mlClient); - - ThreadPool threadPool = new ThreadPool(Settings.EMPTY); - WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(factory, threadPool); - - logger.info("Parsing graph to sequence..."); - Template t = Template.parse(json); - List processSequence = workflowProcessSorter.sortProcessNodes(t.workflows().get("demo")); - List> futureList = new ArrayList<>(); - - for (ProcessNode n : processSequence) { - List predecessors = n.predecessors(); - logger.info( - "Queueing process [{}].{}", - n.id(), - predecessors.isEmpty() - ? " Can start immediately!" - : String.format( - Locale.getDefault(), - " Must wait for [%s] to complete first.", - predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", ")) - ) - ); - futureList.add(n.execute()); - } - futureList.forEach(CompletableFuture::join); - logger.info("All done!"); - ThreadPool.terminate(threadPool, 500, TimeUnit.MILLISECONDS); - } -} diff --git a/src/main/java/demo/DemoWorkflowStep.java b/src/main/java/demo/DemoWorkflowStep.java deleted file mode 100644 index 267a8c8ab..000000000 --- a/src/main/java/demo/DemoWorkflowStep.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ -package demo; - -import org.opensearch.flowframework.workflow.WorkflowData; -import org.opensearch.flowframework.workflow.WorkflowStep; - -import java.util.List; -import java.util.concurrent.CompletableFuture; - -/** - * Demo workflowstep to show sequenced execution - */ -public class DemoWorkflowStep implements WorkflowStep { - - private final long delay; - private final String name; - - /** - * Instantiate a step with a delay. - * @param delay milliseconds to take pretending to do work while really sleeping - */ - public DemoWorkflowStep(long delay) { - this.delay = delay; - this.name = "DEMO_DELAY_" + delay; - } - - @Override - public CompletableFuture execute(List data) { - CompletableFuture future = new CompletableFuture<>(); - CompletableFuture.runAsync(() -> { - try { - Thread.sleep(this.delay); - future.complete(WorkflowData.EMPTY); - } catch (InterruptedException e) { - future.completeExceptionally(e); - } - }); - return future; - } - - @Override - public String getName() { - return name; - } -} diff --git a/src/main/java/demo/README.txt b/src/main/java/demo/README.txt deleted file mode 100644 index 4fef77960..000000000 --- a/src/main/java/demo/README.txt +++ /dev/null @@ -1,13 +0,0 @@ - -DO NOT DEPEND ON CLASSES IN THIS PACKAGE. - -The contents of this folder are for demo/proof-of-concept use. - -Feel free to look at the classes in this folder for potential "how could I" scenarios. - -Tests will not be written against them. -Documentation may be incomplete, wrong, or outdated. -These are not for production use. -They will be deleted without notice at some point, and altered without notice at other points. - -DO NOT DEPEND ON CLASSES IN THIS PACKAGE. diff --git a/src/main/java/demo/TemplateParseDemo.java b/src/main/java/demo/TemplateParseDemo.java deleted file mode 100644 index e284764da..000000000 --- a/src/main/java/demo/TemplateParseDemo.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ -package demo; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.client.Client; -import org.opensearch.client.node.NodeClient; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.SuppressForbidden; -import org.opensearch.common.io.PathUtils; -import org.opensearch.common.settings.Settings; -import org.opensearch.flowframework.model.Template; -import org.opensearch.flowframework.model.Workflow; -import org.opensearch.flowframework.workflow.WorkflowProcessSorter; -import org.opensearch.flowframework.workflow.WorkflowStepFactory; -import org.opensearch.ml.client.MachineLearningNodeClient; -import org.opensearch.threadpool.ThreadPool; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; - -/** - * Demo class exercising {@link WorkflowProcessSorter}. This will be moved to a unit test. - */ -public class TemplateParseDemo { - - private static final Logger logger = LogManager.getLogger(TemplateParseDemo.class); - - private TemplateParseDemo() {} - - /** - * Demonstrate parsing a JSON graph. - * - * @param args unused - * @throws IOException on error. - */ - @SuppressForbidden(reason = "just a demo class that will be deleted") - public static void main(String[] args) throws IOException { - String path = "src/test/resources/template/finaltemplate.json"; - String json; - try { - json = new String(Files.readAllBytes(PathUtils.get(path)), StandardCharsets.UTF_8); - } catch (IOException e) { - logger.error("Failed to read JSON at path {}", path); - return; - } - ClusterService clusterService = new ClusterService(null, null, null); - Client client = new NodeClient(null, null); - MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client); - WorkflowStepFactory factory = new WorkflowStepFactory(clusterService, client, mlClient); - ThreadPool threadPool = new ThreadPool(Settings.EMPTY); - WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(factory, threadPool); - - Template t = Template.parse(json); - - System.out.println(t.toJson()); - System.out.println(t.toYaml()); - - for (Entry e : t.workflows().entrySet()) { - logger.info("Parsing {} workflow.", e.getKey()); - workflowProcessSorter.sortProcessNodes(e.getValue()); - } - ThreadPool.terminate(threadPool, 500, TimeUnit.MILLISECONDS); - } -} diff --git a/src/main/java/org/opensearch/flowframework/workflow/NoOpStep.java b/src/main/java/org/opensearch/flowframework/workflow/NoOpStep.java new file mode 100644 index 000000000..098c5626c --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/NoOpStep.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * A workflow step that does nothing. May be used for synchronizing other actions. + */ +public class NoOpStep implements WorkflowStep { + + /** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */ + public static final String NAME = "noop"; + + @Override + public CompletableFuture execute(List data) throws IOException { + return CompletableFuture.completedFuture(WorkflowData.EMPTY); + } + + @Override + public String getName() { + return NAME; + } +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java index dace6c417..719e4ba10 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java @@ -10,14 +10,12 @@ import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.ml.client.MachineLearningNodeClient; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; - -import demo.DemoWorkflowStep; /** * Generates instances implementing {@link WorkflowStep}. @@ -39,31 +37,13 @@ public WorkflowStepFactory(ClusterService clusterService, Client client, Machine } private void populateMap(ClusterService clusterService, Client client, MachineLearningNodeClient mlClient) { + stepMap.put(NoOpStep.NAME, new NoOpStep()); stepMap.put(CreateIndexStep.NAME, new CreateIndexStep(clusterService, client)); stepMap.put(CreateIngestPipelineStep.NAME, new CreateIngestPipelineStep(client)); stepMap.put(RegisterModelStep.NAME, new RegisterModelStep(mlClient)); stepMap.put(DeployModelStep.NAME, new DeployModelStep(mlClient)); stepMap.put(CreateConnectorStep.NAME, new CreateConnectorStep(mlClient)); stepMap.put(ModelGroupStep.NAME, new ModelGroupStep(mlClient)); - - // TODO: These are from the demo class as placeholders, remove when demos are deleted - stepMap.put("demo_delay_3", new DemoWorkflowStep(3000)); - stepMap.put("demo_delay_5", new DemoWorkflowStep(5000)); - - // Use as a default until all the actual implementations are ready - stepMap.put("placeholder", new WorkflowStep() { - @Override - public CompletableFuture execute(List data) { - CompletableFuture future = new CompletableFuture<>(); - future.complete(WorkflowData.EMPTY); - return future; - } - - @Override - public String getName() { - return "placeholder"; - } - }); } /** @@ -75,8 +55,6 @@ public WorkflowStep createStep(String type) { if (stepMap.containsKey(type)) { return stepMap.get(type); } - // TODO: replace this with a FlowFrameworkException - // https://github.com/opensearch-project/opensearch-ai-flow-framework/pull/43 - return stepMap.get("placeholder"); + throw new FlowFrameworkException("Workflow step type [" + type + "] is not implemented.", RestStatus.NOT_IMPLEMENTED); } } diff --git a/src/test/java/org/opensearch/flowframework/model/TemplateTestJsonUtil.java b/src/test/java/org/opensearch/flowframework/model/TemplateTestJsonUtil.java index b38346b29..26e22af9a 100644 --- a/src/test/java/org/opensearch/flowframework/model/TemplateTestJsonUtil.java +++ b/src/test/java/org/opensearch/flowframework/model/TemplateTestJsonUtil.java @@ -14,6 +14,7 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.flowframework.workflow.NoOpStep; import java.io.IOException; import java.util.List; @@ -27,7 +28,7 @@ public class TemplateTestJsonUtil { public static String node(String id) { - return nodeWithType(id, "placeholder"); + return nodeWithType(id, NoOpStep.NAME); } public static String nodeWithType(String id, String type) { diff --git a/src/test/java/org/opensearch/flowframework/workflow/NoOpStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/NoOpStepTests.java new file mode 100644 index 000000000..6c03cd87d --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/workflow/NoOpStepTests.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +public class NoOpStepTests extends OpenSearchTestCase { + + public void testNoOpStep() throws IOException { + NoOpStep noopStep = new NoOpStep(); + assertEquals(NoOpStep.NAME, noopStep.getName()); + CompletableFuture future = noopStep.execute(Collections.emptyList()); + assertTrue(future.isDone()); + assertFalse(future.isCompletedExceptionally()); + } +} diff --git a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java index f728dd7b1..423db8cf0 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java @@ -11,7 +11,9 @@ import org.opensearch.client.AdminClient; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.model.TemplateTestJsonUtil; import org.opensearch.flowframework.model.Workflow; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -198,5 +200,12 @@ public void testExceptions() throws IOException { ex = assertThrows(IllegalArgumentException.class, () -> parse(workflow(List.of(node("A"), node("B")), List.of(edge("A", "C"))))); assertEquals("Edge destination C does not correspond to a node.", ex.getMessage()); + + ex = assertThrows( + FlowFrameworkException.class, + () -> parse(workflow(List.of(nodeWithType("A", "unimplemented_step")), Collections.emptyList())) + ); + assertEquals("Workflow step type [unimplemented_step] is not implemented.", ex.getMessage()); + assertEquals(RestStatus.NOT_IMPLEMENTED, ((FlowFrameworkException) ex).getRestStatus()); } } diff --git a/src/test/resources/template/demo.json b/src/test/resources/template/demo.json deleted file mode 100644 index 8719bf2fe..000000000 --- a/src/test/resources/template/demo.json +++ /dev/null @@ -1,80 +0,0 @@ -{ - "name": "demo-template", - "description": "Demonstrates workflow steps and passing around of input/output", - "user_inputs": { - "knn_index_name": "my-knn-index" - }, - "workflows": { - "demo": { - "nodes": [ - { - "id": "fetch_model", - "type": "demo_delay_3" - }, - { - "id": "create_index", - "type": "demo_delay_3" - }, - { - "id": "create_ingest_pipeline", - "type": "demo_delay_3" - }, - { - "id": "create_search_pipeline", - "type": "demo_delay_5" - }, - { - "id": "create_neural_search_index", - "type": "demo_delay_3" - }, - { - "id": "register_model", - "type": "demo_delay_3", - "inputs": { - "name": "openAI-gpt-3.5-turbo", - "function_name": "remote", - "description": "test model", - "connector_id": "uDna54oB76l1MtYJF84U" - } - }, - { - "id": "deploy_model", - "type": "demo_delay_3", - "inputs": { - "model_id": "abc" - } - } - ], - "edges": [ - { - "source": "fetch_model", - "dest": "create_index" - }, - { - "source": "create_index", - "dest": "create_ingest_pipeline" - }, - { - "source": "fetch_model", - "dest": "create_search_pipeline" - }, - { - "source": "create_ingest_pipeline", - "dest": "create_neural_search_index" - }, - { - "source": "create_search_pipeline", - "dest": "create_neural_search_index" - }, - { - "source": "create_neural_search_index", - "dest": "register_model" - }, - { - "source": "register_model", - "dest": "deploy_model" - } - ] - } - } -}