Skip to content

Commit

Permalink
Updating workflowData interface, modifying CreateIngestPipelineStep
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis committed Sep 18, 2023
1 parent c728f0d commit 6b2a0ac
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 115 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
import org.opensearch.action.ingest.PutPipelineRequest;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.common.bytes.BytesArray;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
Expand All @@ -42,40 +43,51 @@ public CreateIngestPipelineStep(Client client) {
public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {

CompletableFuture<WorkflowData> createIngestPipelineFuture = new CompletableFuture<>();
PutPipelineRequest putPipelineRequest = null;
String pipelineId = null;
String source = null;

// TODO : Still not clear if this is the correct way to retrieve data from the parsed use case tempalte
// Extract required content from workflow data
for (WorkflowData workflowData : data) {
if (workflowData instanceof WorkflowInputData) {
logger.debug("Previous step sent params: {}, content: {}", workflowData.getParams(), workflowData.getContent());

WorkflowInputData inputData = (WorkflowInputData) workflowData;
logger.debug("Previous step sent params: {}, content: {}", inputData.getParams(), workflowData.getContent());

// Extract params and content to create request
String pipelineId = inputData.getParams().get("id");
BytesReference source = (BytesReference) inputData.getContent().get("source");
MediaType mediaType = (MediaType) inputData.getContent().get("mediaType");
putPipelineRequest = new PutPipelineRequest(pipelineId, source, mediaType);
Map<String, Object> content = workflowData.getContent();
if (content.containsKey("id")) {
pipelineId = (String) content.get("id");
}
if (content.containsKey("source")) {
source = (String) content.get("source");
}
if (pipelineId != null && source != null) {
break;
}
}

if (putPipelineRequest != null) {
String pipelineId = putPipelineRequest.getId();
// Create PutPipelineRequest and execute
if (pipelineId != null && source != null) {
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, new BytesArray(source), XContentType.JSON);
clusterAdminClient.putPipeline(putPipelineRequest, ActionListener.wrap(response -> {

// Return create pipeline response to workflow data
logger.info("Created pipeline : " + pipelineId);
CreateIngestPipelineResponseData responseData = new CreateIngestPipelineResponseData(pipelineId);
createIngestPipelineFuture.complete(responseData);
logger.info("Created pipeline : " + putPipelineRequest.getId());

// PutPipelineRequest returns only an AcknowledgeResponse, returning pipelineId instead
createIngestPipelineFuture.complete(new WorkflowData() {
@Override
public Map<String, Object> getContent() {
return Map.of("pipelineId", putPipelineRequest.getId());
}
});

// TODO : Use node client to index response data to global context (pending global context index implementation)

}, exception -> {
logger.error("Failed to create pipeline : " + exception.getMessage());
createIngestPipelineFuture.completeExceptionally(exception);
}));
} else {
// Required workflow data not found
createIngestPipelineFuture.completeExceptionally(new Exception("Failed to create pipeline"));
}

return createIngestPipelineFuture;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,33 @@
*/
package org.opensearch.flowframework.workflow;

import java.util.Collections;
import java.util.Map;

/**
* Interface for handling the input/output of the building blocks.
* Interface representing data provided as input to, and produced as output from, {@link WorkflowStep}s.
*/
public interface WorkflowData {

/**
* An object representing no data, useful when a workflow step has no required input or output.
*/
WorkflowData EMPTY = new WorkflowData() {
};

/**
* Accesses a map containing the content of the workflow step. This represents the data associated with a Rest API request.
* @return the content of this step.
*/
Map<String, Object> getContent();
default Map<String, Object> getContent() {
return Collections.emptyMap();
};

/**
* Accesses a map containing the params of this workflow step. This represents the params associated with a Rest API request, parsed from the URI.
* @return the params of this step.
*/
default Map<String, String> getParams() {
return Collections.emptyMap();
};
}

This file was deleted.

0 comments on commit 6b2a0ac

Please sign in to comment.