diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java index 50b258b9c..3d62e08f7 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java @@ -55,7 +55,7 @@ public void onResponse(CreateIndexResponse createIndexResponse) { future.complete(new WorkflowData() { @Override public Map getContent() { - return Map.of("index", createIndexResponse.index()); + return Map.of("index-name", createIndexResponse.index()); } }); } @@ -72,12 +72,10 @@ public void onFailure(Exception e) { Settings settings = null; for (WorkflowData workflowData : data) { - // Fetch index from content i.e. request body of execute API Map content = workflowData.getContent(); - index = (String) content.get("index"); + index = (String) content.get("index-name"); type = (String) content.get("type"); - settings = (Settings) content.get("settings"); - if (index != null && type != null) { + if (index != null && type != null && settings != null) { break; } } @@ -86,8 +84,10 @@ public void onFailure(Exception e) { // 1. Create settings based on the index settings received from content try { - CreateIndexRequest request = new CreateIndexRequest(index).mapping(getIndexMappings(type), XContentType.JSON) - .settings(settings); + CreateIndexRequest request = new CreateIndexRequest(index).mapping( + getIndexMappings("mappings/" + type + ".json"), + XContentType.JSON + ); client.admin().indices().create(request, actionListener); } catch (Exception e) { logger.error("Failed to find the right mapping for the index", e); diff --git a/src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java new file mode 100644 index 000000000..e316ea7cb --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java @@ -0,0 +1,105 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow.CreateIndex; + +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.client.AdminClient; +import org.opensearch.client.Client; +import org.opensearch.client.IndicesAdminClient; +import org.opensearch.core.action.ActionListener; +import org.opensearch.flowframework.workflow.WorkflowData; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.mockito.ArgumentCaptor; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CreateIndexStepTests extends OpenSearchTestCase { + + private WorkflowData inputData = WorkflowData.EMPTY; + + private Client client; + + private AdminClient adminClient; + + private IndicesAdminClient indicesAdminClient; + + @Override + public void setUp() throws Exception { + super.setUp(); + + inputData = new WorkflowData() { + + @Override + public Map getContent() { + // See CreateIndexRequest ParseFields for source of content keys needed + return Map.of("index-name", "demo", "type", "knn"); + } + + @Override + public Map getParams() { + // See RestCreateIndexAction for source of param keys needed + return Map.of(); + } + + }; + + client = mock(Client.class); + adminClient = mock(AdminClient.class); + indicesAdminClient = mock(IndicesAdminClient.class); + + when(adminClient.indices()).thenReturn(indicesAdminClient); + when(client.admin()).thenReturn(adminClient); + + } + + public void testCreateIndexStep() throws ExecutionException, InterruptedException, IOException { + + CreateIndexStep createIndexStep = new CreateIndexStep(client); + + ArgumentCaptor actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + CompletableFuture future = createIndexStep.execute(List.of(inputData)); + assertFalse(future.isDone()); + verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), actionListenerCaptor.capture()); + actionListenerCaptor.getValue().onResponse(new CreateIndexResponse(true, true, "demo")); + + assertTrue(future.isDone()); + + Map outputData = Map.of("index-name", "demo"); + assertEquals(outputData, future.get().getContent()); + + } + + public void testCreateIndexStepFailure() throws ExecutionException, InterruptedException { + + CreateIndexStep createIndexStep = new CreateIndexStep(client); + + ArgumentCaptor actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + CompletableFuture future = createIndexStep.execute(List.of(inputData)); + assertFalse(future.isDone()); + verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), actionListenerCaptor.capture()); + + actionListenerCaptor.getValue().onFailure(new Exception()); + + assertTrue(future.isDone()); + assertThrows(Exception.class, () -> future.get().getContent()); + + } +} diff --git a/src/test/resources/template/datademo.json b/src/test/resources/template/datademo.json index a1323ed2c..79f79594a 100644 --- a/src/test/resources/template/datademo.json +++ b/src/test/resources/template/datademo.json @@ -3,11 +3,13 @@ "nodes": [ { "id": "create_index", - "index_name": "demo" + "index_name": "demo", + "type": "knn" }, { "id": "create_another_index", - "index_name": "second_demo" + "index_name": "second_demo", + "type": "knn" } ], "edges": [