diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index 09cae4940..2f7a34f3f 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -16,7 +16,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; -import org.opensearch.flowframework.workflow.CreateIndexStep; +import org.opensearch.flowframework.workflow.CreateIndex.CreateIndexStep; import org.opensearch.plugins.Plugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java similarity index 64% rename from src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java rename to src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java index 9ebd9aade..242d80bae 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java @@ -6,7 +6,7 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.flowframework.workflow; +package org.opensearch.flowframework.workflow.CreateIndex; import com.google.common.base.Charsets; import com.google.common.io.Resources; @@ -17,9 +17,13 @@ import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.client.Client; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.flowframework.workflow.WorkflowData; +import org.opensearch.flowframework.workflow.WorkflowStep; import java.io.IOException; import java.net.URL; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; public class CreateIndexStep implements WorkflowStep { @@ -33,26 +37,43 @@ public CreateIndexStep(Client client) { } @Override - public CompletableFuture execute(WorkflowData data) { - + public CompletableFuture execute(List data) { + CompletableFuture future = new CompletableFuture<>(); ActionListener actionListener = new ActionListener<>() { @Override public void onResponse(CreateIndexResponse createIndexResponse) { logger.info("created index:{}"); + future.complete(new WorkflowData() { + @Override + public Map getContent() { + return Map.of("index", createIndexResponse.index()); + } + }); } @Override public void onFailure(Exception e) { logger.error("Index creation failed", e); + future.completeExceptionally(e); } }; - // Fetch indexName, fileName and settings from WorkflowData - CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(getIndexMappings(fileName), XContentType.JSON) + String index = null; + + for (WorkflowData workflowData : data) { + // Fetch index from content i.e. request body of execute API + Map content = workflowData.getContent(); + index = (String) content.get("index"); + } + + // TODO: + // 1. Map index type -> fileName + // 2. Create settings based on the index settings received from content + CreateIndexRequest request = new CreateIndexRequest(index).mapping(getIndexMappings(fileName), XContentType.JSON) .settings(settings); client.admin().indices().create(request, actionListener); - return null; + return future; } @Override