diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java
index d31212481..57075f0ec 100644
--- a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java
+++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java
@@ -13,13 +13,17 @@
 import org.opensearch.action.ingest.PutPipelineRequest;
 import org.opensearch.client.Client;
 import org.opensearch.client.ClusterAdminClient;
+import org.opensearch.common.xcontent.XContentFactory;
 import org.opensearch.common.xcontent.XContentType;
 import org.opensearch.core.action.ActionListener;
-import org.opensearch.core.common.bytes.BytesArray;
+import org.opensearch.core.common.bytes.BytesReference;
+import org.opensearch.core.xcontent.XContentBuilder;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
 
 /**
  * Workflow step to create an ingest pipeline
@@ -29,6 +33,12 @@ public class CreateIngestPipelineStep implements WorkflowStep {
     private static final Logger logger = LogManager.getLogger(CreateIngestPipelineStep.class);
     private static final String NAME = "create_ingest_pipeline_step";
 
+    private static final String PIPELINE_ID_FIELD = "id";
+    private static final String DESCRIPTION_FIELD = "description";
+    private static final String MODEL_ID_FIELD = "model_id";
+    private static final String INPUT_FIELD = "input_field_name";
+    private static final String OUTPUT_FIELD = "output_field_name";
+
     // Client to store a pipeline in the cluster state
     private final ClusterAdminClient clusterAdminClient;
     // Client to store response data into global context index
@@ -48,34 +58,55 @@ public CreateIngestPipelineStep(Client client) {
     public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
 
         CompletableFuture<WorkflowData> createIngestPipelineFuture = new CompletableFuture<>();
+
         String pipelineId = null;
-        String source = null;
+        String description = null;
+        String modelId = null;
+        String inputFieldName = null;
+        String outputFieldName = null;
+        BytesReference configuration = null;
 
-        // Extract required content from workflow data
+        // Extract required content from workflow data and generate the ingest pipeline configuration
        for (WorkflowData workflowData : data) {
 
             Map<String, String> parameters = workflowData.getParams();
             Map<String, Object> content = workflowData.getContent();
-
             logger.debug("Previous step sent params: {}, content: {}", parameters, content);
 
-            if (content.containsKey("id")) {
-                pipelineId = (String) content.get("id");
+            if (content.containsKey(PIPELINE_ID_FIELD)) {
+                pipelineId = (String) content.get(PIPELINE_ID_FIELD);
+            }
+            if (content.containsKey(DESCRIPTION_FIELD)) {
+                description = (String) content.get(DESCRIPTION_FIELD);
+            }
+            if (content.containsKey(MODEL_ID_FIELD)) {
+                modelId = (String) content.get(MODEL_ID_FIELD);
+            }
+            if (content.containsKey(INPUT_FIELD)) {
+                inputFieldName = (String) content.get(INPUT_FIELD);
             }
-            if (content.containsKey("source")) {
-                source = (String) content.get("source");
+            if (content.containsKey(OUTPUT_FIELD)) {
+                outputFieldName = (String) content.get(OUTPUT_FIELD);
             }
-            if (pipelineId != null && source != null) {
+
+            if (Stream.of(pipelineId, description, modelId, inputFieldName, outputFieldName).allMatch(x -> x != null)) {
+                try {
+                    configuration = BytesReference.bytes(
+                        buildIngestPipelineRequestContent(description, modelId, inputFieldName, outputFieldName)
+                    );
+                } catch (IOException e) {
logger.error("Failed to create ingest pipeline : " + e.getMessage()); + createIngestPipelineFuture.completeExceptionally(e); + } break; } } // Create PutPipelineRequest and execute - if (pipelineId != null && source != null) { - PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, new BytesArray(source), XContentType.JSON); + if (configuration != null) { + PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, configuration, XContentType.JSON); clusterAdminClient.putPipeline(putPipelineRequest, ActionListener.wrap(response -> { - - logger.info("Created pipeline : " + putPipelineRequest.getId()); + logger.info("Created ingest pipeline : " + putPipelineRequest.getId()); // PutPipelineRequest returns only an AcknowledgeResponse, returning pipelineId instead createIngestPipelineFuture.complete(new WorkflowData() { @@ -88,12 +119,12 @@ public Map getContent() { // TODO : Use node client to index response data to global context (pending global context index implementation) }, exception -> { - logger.error("Failed to create pipeline : " + exception.getMessage()); + logger.error("Failed to create ignest pipeline : " + exception.getMessage()); createIngestPipelineFuture.completeExceptionally(exception); })); } else { // Required workflow data not found - createIngestPipelineFuture.completeExceptionally(new Exception("Failed to create pipeline")); + createIngestPipelineFuture.completeExceptionally(new Exception("Failed to create ingest pipeline, required inputs not found")); } return createIngestPipelineFuture; @@ -104,4 +135,44 @@ public String getName() { return NAME; } + /** + * Temporary, generates the ingest pipeline request content from workflow data + * { + * "description" : "", + * "processors" : [ + * "text_embedding" : { + * "model_id" : "", + * "field_map" : { + * "" : "" + * } + * } + * ] + * } + * + * @param description The description of the ingest pipeline configuration + * @param modelId The ID of the model that will be used in the embedding interface + * @param inputFieldName The field name used to cache text for text embeddings + * @param outputFieldName The field name in which output text is stored + * @throws IOException if the request content fails to be generated + * @return the xcontent builder with the formatted ingest pipeline configuration + */ + private XContentBuilder buildIngestPipelineRequestContent( + String description, + String modelId, + String inputFieldName, + String outputFieldName + ) throws IOException { + return XContentFactory.jsonBuilder() + .startObject() + .field("description", description) + .startArray("processors") + .startObject("text_embedding") + .field("model_id", modelId) + .startObject("field_map") + .field(inputFieldName, outputFieldName) + .endObject() + .endObject() + .endArray() + .endObject(); + } }