From 764c5afda093f4a1afe566261cdc98fb1cb7f594 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Thu, 14 Sep 2023 22:46:32 +0000 Subject: [PATCH] Initial create ingest pipeline implementation Signed-off-by: Joshua Palis --- .../flowframework/FlowFrameworkPlugin.java | 40 +++++++++- .../CreateIngestPipelineRequestData.java | 42 ++++++++++ .../CreateIngestPipelineResponseData.java | 28 +++++++ .../workflow/CreateIngestPipelineStep.java | 79 +++++++++++++++++++ .../flowframework/workflow/WorkflowData.java | 11 ++- .../workflow/WorkflowInputData.java | 24 ++++++ .../flowframework/workflow/WorkflowStep.java | 11 ++- src/main/resources/log4j2.xml | 17 ++++ 8 files changed, 244 insertions(+), 8 deletions(-) create mode 100644 src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineRequestData.java create mode 100644 src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineResponseData.java create mode 100644 src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java create mode 100644 src/main/java/org/opensearch/flowframework/workflow/WorkflowInputData.java create mode 100644 src/main/resources/log4j2.xml diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index f810767eb..fdaeaa2be 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -8,11 +8,49 @@ */ package org.opensearch.flowframework; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.flowframework.workflow.CreateIngestPipelineStep; import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.Collections; +import java.util.function.Supplier; /** * An OpenSearch plugin that enables builders to innovate AI apps on OpenSearch. */ public class FlowFrameworkPlugin extends Plugin { - // Implement the relevant Plugin Interfaces here + + private Client client; + + @Override + public Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier repositoriesServiceSupplier + ) { + // TODO create ingest pipeline workflow item here + this.client = client; + CreateIngestPipelineStep createIngestPipelineStep = new CreateIngestPipelineStep(client); + + return Collections.emptyList(); + } } diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineRequestData.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineRequestData.java new file mode 100644 index 000000000..f481c1d38 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineRequestData.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.opensearch.action.ingest.PutPipelineRequest; + +import java.util.HashMap; +import java.util.Map; + +public class CreateIngestPipelineRequestData implements WorkflowInputData { + + private Map params = new HashMap<>(); + private Map content = new HashMap<>(); + + public CreateIngestPipelineRequestData(PutPipelineRequest request) { + super(); + + // RestPutPipelineAction params + params.put("id", request.getId()); + + // PutPipelineRequest content + content.put("source", request.getSource()); + content.put("mediaType", request.getMediaType()); + } + + @Override + public Map getContent() { + return Map.copyOf(content); + } + + @Override + public Map getParams() { + return Map.copyOf(params); + } + +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineResponseData.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineResponseData.java new file mode 100644 index 000000000..09d9db4e7 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineResponseData.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import java.util.HashMap; +import java.util.Map; + +public class CreateIngestPipelineResponseData implements WorkflowData { + + private Map content = new HashMap<>(); + + public CreateIngestPipelineResponseData(String ingestPipelineId) { + super(); + // PutPipelineAction returns only an acknodledged response, returning ingest pipeline id instead + content.put("pipelineId", ingestPipelineId); + } + + @Override + public Map getContent() { + return Map.copyOf(content); + } +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java new file mode 100644 index 000000000..e0ce35da3 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ingest.PutPipelineRequest; +import org.opensearch.client.Client; +import org.opensearch.client.ClusterAdminClient; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.xcontent.MediaType; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Workflow step to create an ingest pipeline + */ +public class CreateIngestPipelineStep implements WorkflowStep { + + private static final Logger logger = LogManager.getLogger(CreateIngestPipelineStep.class); + private static final String NAME = "create_ingest_pipeline_step"; + + private final ClusterAdminClient clusterAdminClient; + + public CreateIngestPipelineStep(Client client) { + this.clusterAdminClient = client.admin().cluster(); + } + + @Override + public CompletableFuture execute(List data) { + + CompletableFuture createIngestPipelineFuture = new CompletableFuture<>(); + PutPipelineRequest putPipelineRequest = null; + + // TODO : Still not clear if this is the correct way to retrieve data from the parsed use case tempalte + for (WorkflowData workflowData : data) { + if (workflowData instanceof WorkflowInputData) { + + WorkflowInputData inputData = (WorkflowInputData) workflowData; + logger.debug("Previous step sent params: {}, content: {}", inputData.getParams(), workflowData.getContent()); + + // Extract params and content to create request + String pipelineId = inputData.getParams().get("id"); + BytesReference source = (BytesReference) inputData.getContent().get("source"); + MediaType mediaType = (MediaType) inputData.getContent().get("mediaType"); + putPipelineRequest = new PutPipelineRequest(pipelineId, source, mediaType); + } + } + + if (putPipelineRequest != null) { + String pipelineId = putPipelineRequest.getId(); + clusterAdminClient.putPipeline(putPipelineRequest, ActionListener.wrap(response -> { + logger.info("Created pipeline : " + pipelineId); + CreateIngestPipelineResponseData responseData = new CreateIngestPipelineResponseData(pipelineId); + createIngestPipelineFuture.complete(responseData); + }, exception -> { + logger.error("Failed to create pipeline : " + exception.getMessage()); + createIngestPipelineFuture.completeExceptionally(exception); + })); + } else { + createIngestPipelineFuture.completeExceptionally(new Exception("Failed to create pipeline")); + } + return createIngestPipelineFuture; + } + + @Override + public String getName() { + return NAME; + } + +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java index 3e8dc81b2..4e17ed0d3 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java @@ -8,7 +8,16 @@ */ package org.opensearch.flowframework.workflow; +import java.util.Map; + /** * Interface for handling the input/output of the building blocks. */ -public interface WorkflowData {} +public interface WorkflowData { + + /** + * Accesses a map containing the content of the workflow step. This represents the data associated with a Rest API request. + * @return the content of this step. + */ + Map getContent(); +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowInputData.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowInputData.java new file mode 100644 index 000000000..86e1179cf --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowInputData.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import java.util.Map; + +/** + * Interface for handling the input of the building blocks. + */ +public interface WorkflowInputData extends WorkflowData { + + /** + * Accesses a map containing the params of this workflow step. This represents the params associated with a Rest API request, parsed from the URI. + * @return the params of this step. + */ + Map getParams(); + +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java index 6a65ce6e3..ea3814a91 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java @@ -8,8 +8,7 @@ */ package org.opensearch.flowframework.workflow; -import org.opensearch.common.Nullable; - +import java.util.List; import java.util.concurrent.CompletableFuture; /** @@ -18,11 +17,11 @@ public interface WorkflowStep { /** - * Triggers the processing of the building block. - * @param data for input/output params of the building blocks. - * @return CompletableFuture of the building block. + * Triggers the actual processing of the building block. + * @param data representing input params and content, or output content of previous steps. + * @return A CompletableFuture of the building block. This block should return immediately, but not be completed until the step executes, containing the step's output data which may be passed to follow-on steps. */ - CompletableFuture execute(@Nullable WorkflowData data); + CompletableFuture execute(List data); /** * diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml new file mode 100644 index 000000000..21a4c6fa5 --- /dev/null +++ b/src/main/resources/log4j2.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file