diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index fdaeaa2be..b7da9318c 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -8,6 +8,7 @@ */ package org.opensearch.flowframework; +import com.google.common.collect.ImmutableList; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.service.ClusterService; @@ -23,7 +24,6 @@ import org.opensearch.watcher.ResourceWatcherService; import java.util.Collection; -import java.util.Collections; import java.util.function.Supplier; /** @@ -47,10 +47,9 @@ public Collection createComponents( IndexNameExpressionResolver indexNameExpressionResolver, Supplier repositoriesServiceSupplier ) { - // TODO create ingest pipeline workflow item here this.client = client; CreateIngestPipelineStep createIngestPipelineStep = new CreateIngestPipelineStep(client); - return Collections.emptyList(); + return ImmutableList.of(createIngestPipelineStep); } } diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineResponseData.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineResponseData.java index 09d9db4e7..a175ef2f2 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineResponseData.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineResponseData.java @@ -17,7 +17,7 @@ public class CreateIngestPipelineResponseData implements WorkflowData { public CreateIngestPipelineResponseData(String ingestPipelineId) { super(); - // PutPipelineAction returns only an acknodledged response, returning ingest pipeline id instead + // PutPipelineAction returns only an ack response, returning ingest pipeline id instead content.put("pipelineId", ingestPipelineId); } diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java index e0ce35da3..f7e4461da 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java @@ -28,10 +28,14 @@ public class CreateIngestPipelineStep implements WorkflowStep { private static final Logger logger = LogManager.getLogger(CreateIngestPipelineStep.class); private static final String NAME = "create_ingest_pipeline_step"; + // Client to store a pipeline in the cluster state private final ClusterAdminClient clusterAdminClient; + // Client to store response data into global context index + private final Client client; public CreateIngestPipelineStep(Client client) { this.clusterAdminClient = client.admin().cluster(); + this.client = client; } @Override @@ -58,9 +62,13 @@ public CompletableFuture execute(List data) { if (putPipelineRequest != null) { String pipelineId = putPipelineRequest.getId(); clusterAdminClient.putPipeline(putPipelineRequest, ActionListener.wrap(response -> { + + // Return create pipeline response to workflow data logger.info("Created pipeline : " + pipelineId); CreateIngestPipelineResponseData responseData = new CreateIngestPipelineResponseData(pipelineId); createIngestPipelineFuture.complete(responseData); + + // TODO : Use node client to index response data to global context (pending global context index implementation) }, exception -> { logger.error("Failed to create pipeline : " + exception.getMessage()); createIngestPipelineFuture.completeExceptionally(exception);