Skip to content

Commit

Permalink
Adding TODOs for storing response to global context index, fixing com…
Browse files Browse the repository at this point in the history
…ments

Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis committed Sep 14, 2023
1 parent 764c5af commit c728f0d
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package org.opensearch.flowframework;

import com.google.common.collect.ImmutableList;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -23,7 +24,6 @@
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collection;
import java.util.Collections;
import java.util.function.Supplier;

/**
Expand All @@ -47,10 +47,9 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
// TODO create ingest pipeline workflow item here
this.client = client;
CreateIngestPipelineStep createIngestPipelineStep = new CreateIngestPipelineStep(client);

return Collections.emptyList();
return ImmutableList.of(createIngestPipelineStep);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class CreateIngestPipelineResponseData implements WorkflowData {

public CreateIngestPipelineResponseData(String ingestPipelineId) {
super();
// PutPipelineAction returns only an acknodledged response, returning ingest pipeline id instead
// PutPipelineAction returns only an ack response, returning ingest pipeline id instead
content.put("pipelineId", ingestPipelineId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@ public class CreateIngestPipelineStep implements WorkflowStep {
private static final Logger logger = LogManager.getLogger(CreateIngestPipelineStep.class);
private static final String NAME = "create_ingest_pipeline_step";

// Client to store a pipeline in the cluster state
private final ClusterAdminClient clusterAdminClient;
// Client to store response data into global context index
private final Client client;

public CreateIngestPipelineStep(Client client) {
this.clusterAdminClient = client.admin().cluster();
this.client = client;
}

@Override
Expand All @@ -58,9 +62,13 @@ public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
if (putPipelineRequest != null) {
String pipelineId = putPipelineRequest.getId();
clusterAdminClient.putPipeline(putPipelineRequest, ActionListener.wrap(response -> {

// Return create pipeline response to workflow data
logger.info("Created pipeline : " + pipelineId);
CreateIngestPipelineResponseData responseData = new CreateIngestPipelineResponseData(pipelineId);
createIngestPipelineFuture.complete(responseData);

// TODO : Use node client to index response data to global context (pending global context index implementation)
}, exception -> {
logger.error("Failed to create pipeline : " + exception.getMessage());
createIngestPipelineFuture.completeExceptionally(exception);
Expand Down

0 comments on commit c728f0d

Please sign in to comment.