diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineRequestData.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineRequestData.java deleted file mode 100644 index f481c1d38..000000000 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineRequestData.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ -package org.opensearch.flowframework.workflow; - -import org.opensearch.action.ingest.PutPipelineRequest; - -import java.util.HashMap; -import java.util.Map; - -public class CreateIngestPipelineRequestData implements WorkflowInputData { - - private Map params = new HashMap<>(); - private Map content = new HashMap<>(); - - public CreateIngestPipelineRequestData(PutPipelineRequest request) { - super(); - - // RestPutPipelineAction params - params.put("id", request.getId()); - - // PutPipelineRequest content - content.put("source", request.getSource()); - content.put("mediaType", request.getMediaType()); - } - - @Override - public Map getContent() { - return Map.copyOf(content); - } - - @Override - public Map getParams() { - return Map.copyOf(params); - } - -} diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineResponseData.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineResponseData.java deleted file mode 100644 index a175ef2f2..000000000 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineResponseData.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ -package org.opensearch.flowframework.workflow; - -import java.util.HashMap; -import java.util.Map; - -public class CreateIngestPipelineResponseData implements WorkflowData { - - private Map content = new HashMap<>(); - - public CreateIngestPipelineResponseData(String ingestPipelineId) { - super(); - // PutPipelineAction returns only an ack response, returning ingest pipeline id instead - content.put("pipelineId", ingestPipelineId); - } - - @Override - public Map getContent() { - return Map.copyOf(content); - } -} diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java index f7e4461da..7058410af 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java @@ -13,11 +13,12 @@ import org.opensearch.action.ingest.PutPipelineRequest; import org.opensearch.client.Client; import org.opensearch.client.ClusterAdminClient; +import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.xcontent.MediaType; +import org.opensearch.core.common.bytes.BytesArray; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -42,40 +43,51 @@ public CreateIngestPipelineStep(Client client) { public CompletableFuture execute(List data) { CompletableFuture createIngestPipelineFuture = new CompletableFuture<>(); - PutPipelineRequest putPipelineRequest = null; + String pipelineId = null; + String source = null; - // TODO : Still not clear if this is the correct way to retrieve data from the parsed use case tempalte + // Extract required content from workflow data for (WorkflowData workflowData : data) { - if (workflowData instanceof WorkflowInputData) { + logger.debug("Previous step sent params: {}, content: {}", workflowData.getParams(), workflowData.getContent()); - WorkflowInputData inputData = (WorkflowInputData) workflowData; - logger.debug("Previous step sent params: {}, content: {}", inputData.getParams(), workflowData.getContent()); - - // Extract params and content to create request - String pipelineId = inputData.getParams().get("id"); - BytesReference source = (BytesReference) inputData.getContent().get("source"); - MediaType mediaType = (MediaType) inputData.getContent().get("mediaType"); - putPipelineRequest = new PutPipelineRequest(pipelineId, source, mediaType); + Map content = workflowData.getContent(); + if (content.containsKey("id")) { + pipelineId = (String) content.get("id"); + } + if (content.containsKey("source")) { + source = (String) content.get("source"); + } + if (pipelineId != null && source != null) { + break; } } - if (putPipelineRequest != null) { - String pipelineId = putPipelineRequest.getId(); + // Create PutPipelineRequest and execute + if (pipelineId != null && source != null) { + PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, new BytesArray(source), XContentType.JSON); clusterAdminClient.putPipeline(putPipelineRequest, ActionListener.wrap(response -> { - // Return create pipeline response to workflow data - logger.info("Created pipeline : " + pipelineId); - CreateIngestPipelineResponseData responseData = new CreateIngestPipelineResponseData(pipelineId); - createIngestPipelineFuture.complete(responseData); + logger.info("Created pipeline : " + putPipelineRequest.getId()); + + // PutPipelineRequest returns only an AcknowledgeResponse, returning pipelineId instead + createIngestPipelineFuture.complete(new WorkflowData() { + @Override + public Map getContent() { + return Map.of("pipelineId", putPipelineRequest.getId()); + } + }); // TODO : Use node client to index response data to global context (pending global context index implementation) + }, exception -> { logger.error("Failed to create pipeline : " + exception.getMessage()); createIngestPipelineFuture.completeExceptionally(exception); })); } else { + // Required workflow data not found createIngestPipelineFuture.completeExceptionally(new Exception("Failed to create pipeline")); } + return createIngestPipelineFuture; } diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java index 4e17ed0d3..09eb041fc 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java @@ -8,16 +8,33 @@ */ package org.opensearch.flowframework.workflow; +import java.util.Collections; import java.util.Map; /** - * Interface for handling the input/output of the building blocks. + * Interface representing data provided as input to, and produced as output from, {@link WorkflowStep}s. */ public interface WorkflowData { + /** + * An object representing no data, useful when a workflow step has no required input or output. + */ + WorkflowData EMPTY = new WorkflowData() { + }; + /** * Accesses a map containing the content of the workflow step. This represents the data associated with a Rest API request. * @return the content of this step. */ - Map getContent(); + default Map getContent() { + return Collections.emptyMap(); + }; + + /** + * Accesses a map containing the params of this workflow step. This represents the params associated with a Rest API request, parsed from the URI. + * @return the params of this step. + */ + default Map getParams() { + return Collections.emptyMap(); + }; } diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowInputData.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowInputData.java deleted file mode 100644 index 86e1179cf..000000000 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowInputData.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ -package org.opensearch.flowframework.workflow; - -import java.util.Map; - -/** - * Interface for handling the input of the building blocks. - */ -public interface WorkflowInputData extends WorkflowData { - - /** - * Accesses a map containing the params of this workflow step. This represents the params associated with a Rest API request, parsed from the URI. - * @return the params of this step. - */ - Map getParams(); - -}