diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java index 7d7f02bbd..09cf3ed8e 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java @@ -101,6 +101,8 @@ public CompletableFuture execute(List data) { case OUTPUT_FIELD: outputFieldName = (String) content.get(OUTPUT_FIELD); break; + default: + break; } } @@ -138,7 +140,7 @@ public Map getContent() { // TODO : Use node client to index response data to global context (pending global context index implementation) }, exception -> { - logger.error("Failed to create ignest pipeline : " + exception.getMessage()); + logger.error("Failed to create ingest pipeline : " + exception.getMessage()); createIngestPipelineFuture.completeExceptionally(exception); })); } diff --git a/src/test/java/org/opensearch/flowframework/workflow/CreateIngestPipeline/CreateIngestPipelineStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/CreateIngestPipeline/CreateIngestPipelineStepTests.java index eab54bcdb..6513f3291 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/CreateIngestPipeline/CreateIngestPipelineStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/CreateIngestPipeline/CreateIngestPipelineStepTests.java @@ -47,26 +47,15 @@ public void setUp() throws Exception { @Override public Map getContent() { - return Map.of( - "id", - "pipelineId", - "description", - "some description", - "type", - "text_embedding", - "model_id", - "model_id", - "input_field_name", - "inputField", - "output_field_name", - "outputField" + return Map.ofEntries( + Map.entry("id", "pipelineId"), + Map.entry("description", "some description"), + Map.entry("type", "text_embedding"), + Map.entry("model_id", "model_id"), + Map.entry("input_field_name", "inputField"), + Map.entry("output_field_name", "outputField") ); } - - @Override - public Map getParams() { - return Map.of(); - } }; // Set output data to returned pipelineId @@ -74,12 +63,7 @@ public Map getParams() { @Override public Map getContent() { - return Map.of("pipelineId", "pipelineId"); - } - - @Override - public Map getParams() { - return Map.of(); + return Map.ofEntries(Map.entry("pipelineId", "pipelineId")); } }; @@ -104,11 +88,11 @@ public void testCreateIngestPipelineStep() throws InterruptedException, Executio verify(clusterAdminClient, times(1)).putPipeline(any(PutPipelineRequest.class), actionListenerCaptor.capture()); actionListenerCaptor.getValue().onResponse(new AcknowledgedResponse(true)); - assertTrue(future.isDone()); + assertTrue(future.isDone() && !future.isCompletedExceptionally()); assertEquals(outpuData.getContent(), future.get().getContent()); } - public void testCreateIngestPipelineStepFailure() { + public void testCreateIngestPipelineStepFailure() throws InterruptedException { CreateIngestPipelineStep createIngestPipelineStep = new CreateIngestPipelineStep(client); @@ -119,13 +103,17 @@ public void testCreateIngestPipelineStepFailure() { // Mock put pipeline request execution and return false verify(clusterAdminClient, times(1)).putPipeline(any(PutPipelineRequest.class), actionListenerCaptor.capture()); - actionListenerCaptor.getValue().onFailure(new Exception()); + actionListenerCaptor.getValue().onFailure(new Exception("Failed to create ingest pipeline")); - assertTrue(future.isDone()); - assertThrows(Exception.class, () -> future.get()); + assertTrue(future.isDone() && future.isCompletedExceptionally()); + try { + future.get(); + } catch (ExecutionException e) { + assertTrue(e.getMessage().contains("Failed to create ingest pipeline")); + } } - public void testMissingData() { + public void testMissingData() throws InterruptedException { CreateIngestPipelineStep createIngestPipelineStep = new CreateIngestPipelineStep(client); // Data with missing input and output fields @@ -133,21 +121,21 @@ public void testMissingData() { @Override public Map getContent() { - return Map.of("id", "pipelineId", "description", "some description", "type", "text_embedding", "model_id", "model_id"); - } - - @Override - public Map getParams() { - return Map.of(); + return Map.ofEntries( + Map.entry("id", "pipelineId"), + Map.entry("description", "some description"), + Map.entry("type", "text_embedding"), + Map.entry("model_id", "model_id") + ); } }; CompletableFuture future = createIngestPipelineStep.execute(List.of(incorrectData)); - assertTrue(future.isDone()); + assertTrue(future.isDone() && future.isCompletedExceptionally()); try { future.get(); - } catch (Exception e) { + } catch (ExecutionException e) { assertTrue(e.getMessage().contains("Failed to create ingest pipeline, required inputs not found")); } }