From d9566d40074c325aadda1943ddc509f004265528 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Thu, 21 Sep 2023 21:31:50 +0000 Subject: [PATCH] addressing PR comments, adding switch statement to handle parsing workflow data for required fields Signed-off-by: Joshua Palis --- .../workflow/CreateIngestPipelineStep.java | 65 ++++++++++++------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java index 57075f0ec..b14ebd263 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java @@ -33,8 +33,14 @@ public class CreateIngestPipelineStep implements WorkflowStep { private static final Logger logger = LogManager.getLogger(CreateIngestPipelineStep.class); private static final String NAME = "create_ingest_pipeline_step"; + // Common pipeline configuration fields private static final String PIPELINE_ID_FIELD = "id"; private static final String DESCRIPTION_FIELD = "description"; + private static final String PROCESSORS_FIELD = "processors"; + private static final String TYPE_FIELD = "type"; + + // Temporary text embedding processor fields + private static final String FIELD_MAP = "field_map"; private static final String MODEL_ID_FIELD = "model_id"; private static final String INPUT_FIELD = "input_field_name"; private static final String OUTPUT_FIELD = "output_field_name"; @@ -61,6 +67,7 @@ public CompletableFuture execute(List data) { String pipelineId = null; String description = null; + String type = null; String modelId = null; String inputFieldName = null; String outputFieldName = null; @@ -73,22 +80,30 @@ public CompletableFuture execute(List data) { Map content = workflowData.getContent(); logger.debug("Previous step sent params: {}, content: {}", parameters, content); - if (content.containsKey(PIPELINE_ID_FIELD)) { - pipelineId = (String) content.get(PIPELINE_ID_FIELD); - } - if (content.containsKey(DESCRIPTION_FIELD)) { - description = (String) content.get(DESCRIPTION_FIELD); - } - if (content.containsKey(MODEL_ID_FIELD)) { - modelId = (String) content.get(MODEL_ID_FIELD); - } - if (content.containsKey(INPUT_FIELD)) { - inputFieldName = (String) content.get(INPUT_FIELD); - } - if (content.containsKey(OUTPUT_FIELD)) { - outputFieldName = (String) content.get(OUTPUT_FIELD); + for (Entry entry : content.entrySet()) { + switch (entry.getKey()) { + case PIPELINE_ID_FIELD: + pipelineId = (String) content.get(PIPELINE_ID_FIELD); + break; + case DESCRIPTION_FIELD: + description = (String) content.get(DESCRIPTION_FIELD); + break; + case TYPE_FIELD: + type = (String) content.get(TYPE_FIELD); + break; + case MODEL_ID_FIELD: + modelId = (String) content.get(MODEL_ID_FIELD); + break; + case INPUT_FIELD: + inputFieldName = (String) content.get(INPUT_FIELD); + break; + case OUTPUT_FIELD: + outputFieldName = (String) content.get(OUTPUT_FIELD); + break; + } } + // Determmine if fields have been populated, else iterate over remaining workflow data if (Stream.of(pipelineId, description, modelId, inputFieldName, outputFieldName).allMatch(x -> x != null)) { try { configuration = BytesReference.bytes( @@ -102,8 +117,11 @@ public CompletableFuture execute(List data) { } } - // Create PutPipelineRequest and execute - if (configuration != null) { + if (configuration == null) { + // Required workflow data not found + createIngestPipelineFuture.completeExceptionally(new Exception("Failed to create ingest pipeline, required inputs not found")); + } else { + // Create PutPipelineRequest and execute PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, configuration, XContentType.JSON); clusterAdminClient.putPipeline(putPipelineRequest, ActionListener.wrap(response -> { logger.info("Created ingest pipeline : " + putPipelineRequest.getId()); @@ -122,9 +140,6 @@ public Map getContent() { logger.error("Failed to create ignest pipeline : " + exception.getMessage()); createIngestPipelineFuture.completeExceptionally(exception); })); - } else { - // Required workflow data not found - createIngestPipelineFuture.completeExceptionally(new Exception("Failed to create ingest pipeline, required inputs not found")); } return createIngestPipelineFuture; @@ -136,7 +151,7 @@ public String getName() { } /** - * Temporary, generates the ingest pipeline request content from workflow data + * Temporary, generates the ingest pipeline request content for text_embedding processor from workflow data * { * "description" : "", * "processors" : [ @@ -164,11 +179,11 @@ private XContentBuilder buildIngestPipelineRequestContent( ) throws IOException { return XContentFactory.jsonBuilder() .startObject() - .field("description", description) - .startArray("processors") - .startObject("text_embedding") - .field("model_id", modelId) - .startObject("field_map") + .field(DESCRIPTION_FIELD, description) + .startArray(PROCESSORS_FIELD) + .startObject(TYPE_FIELD) + .field(MODEL_ID_FIELD, modelId) + .startObject(FIELD_MAP) .field(inputFieldName, outputFieldName) .endObject() .endObject()