Skip to content

Commit

Permalink
fixing comments
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis committed Sep 20, 2023
1 parent 5f1cc5b commit fdd2dbb
Showing 1 changed file with 86 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
import org.opensearch.action.ingest.PutPipelineRequest;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

/**
* Workflow step to create an ingest pipeline
Expand All @@ -29,6 +33,12 @@ public class CreateIngestPipelineStep implements WorkflowStep {
private static final Logger logger = LogManager.getLogger(CreateIngestPipelineStep.class);

Check warning on line 33 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L33

Added line #L33 was not covered by tests
private static final String NAME = "create_ingest_pipeline_step";

private static final String PIPELINE_ID_FIELD = "id";
private static final String DESCRIPTION_FIELD = "description";
private static final String MODEL_ID_FIELD = "model_id";
private static final String INPUT_FIELD = "input_field_name";
private static final String OUTPUT_FIELD = "output_field_name";

// Client to store a pipeline in the cluster state
private final ClusterAdminClient clusterAdminClient;
// Client to store response data into global context index
Expand All @@ -48,34 +58,55 @@ public CreateIngestPipelineStep(Client client) {
public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {

CompletableFuture<WorkflowData> createIngestPipelineFuture = new CompletableFuture<>();

Check warning on line 60 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L60

Added line #L60 was not covered by tests

String pipelineId = null;
String source = null;
String description = null;
String modelId = null;
String inputFieldName = null;
String outputFieldName = null;
BytesReference configuration = null;

Check warning on line 67 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L62-L67

Added lines #L62 - L67 were not covered by tests

// Extract required content from workflow data
// Extract required content from workflow data and generate the ingest pipeline configuration
for (WorkflowData workflowData : data) {

Map<String, String> parameters = workflowData.getParams();
Map<String, Object> content = workflowData.getContent();

logger.debug("Previous step sent params: {}, content: {}", parameters, content);

Check warning on line 74 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L72-L74

Added lines #L72 - L74 were not covered by tests

if (content.containsKey("id")) {
pipelineId = (String) content.get("id");
if (content.containsKey(PIPELINE_ID_FIELD)) {
pipelineId = (String) content.get(PIPELINE_ID_FIELD);

Check warning on line 77 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L77

Added line #L77 was not covered by tests
}
if (content.containsKey(DESCRIPTION_FIELD)) {
description = (String) content.get(DESCRIPTION_FIELD);

Check warning on line 80 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L80

Added line #L80 was not covered by tests
}
if (content.containsKey(MODEL_ID_FIELD)) {
modelId = (String) content.get(MODEL_ID_FIELD);

Check warning on line 83 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L83

Added line #L83 was not covered by tests
}
if (content.containsKey(INPUT_FIELD)) {
inputFieldName = (String) content.get(INPUT_FIELD);

Check warning on line 86 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L86

Added line #L86 was not covered by tests
}
if (content.containsKey("source")) {
source = (String) content.get("source");
if (content.containsKey(OUTPUT_FIELD)) {
outputFieldName = (String) content.get(OUTPUT_FIELD);

Check warning on line 89 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L89

Added line #L89 was not covered by tests
}
if (pipelineId != null && source != null) {

if (Stream.of(pipelineId, description, modelId, inputFieldName, outputFieldName).allMatch(x -> x != null)) {
try {
configuration = BytesReference.bytes(
buildIngestPipelineRequestContent(description, modelId, inputFieldName, outputFieldName)

Check warning on line 95 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L94-L95

Added lines #L94 - L95 were not covered by tests
);
} catch (IOException e) {
logger.error("Failed to create ingest pipeline : " + e.getMessage());
createIngestPipelineFuture.completeExceptionally(e);
}
break;

Check warning on line 101 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L97-L101

Added lines #L97 - L101 were not covered by tests
}
}

Check warning on line 103 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L103

Added line #L103 was not covered by tests

// Create PutPipelineRequest and execute
if (pipelineId != null && source != null) {
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, new BytesArray(source), XContentType.JSON);
if (configuration != null) {
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, configuration, XContentType.JSON);
clusterAdminClient.putPipeline(putPipelineRequest, ActionListener.wrap(response -> {

logger.info("Created pipeline : " + putPipelineRequest.getId());
logger.info("Created ingest pipeline : " + putPipelineRequest.getId());

Check warning on line 109 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L107-L109

Added lines #L107 - L109 were not covered by tests

// PutPipelineRequest returns only an AcknowledgeResponse, returning pipelineId instead
createIngestPipelineFuture.complete(new WorkflowData() {

Check warning on line 112 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L112

Added line #L112 was not covered by tests
Expand All @@ -88,12 +119,12 @@ public Map<String, Object> getContent() {
// TODO : Use node client to index response data to global context (pending global context index implementation)

}, exception -> {
logger.error("Failed to create pipeline : " + exception.getMessage());
logger.error("Failed to create ignest pipeline : " + exception.getMessage());
createIngestPipelineFuture.completeExceptionally(exception);
}));
} else {

Check warning on line 125 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L121-L125

Added lines #L121 - L125 were not covered by tests
// Required workflow data not found
createIngestPipelineFuture.completeExceptionally(new Exception("Failed to create pipeline"));
createIngestPipelineFuture.completeExceptionally(new Exception("Failed to create ingest pipeline, required inputs not found"));

Check warning on line 127 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L127

Added line #L127 was not covered by tests
}

return createIngestPipelineFuture;

Check warning on line 130 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L130

Added line #L130 was not covered by tests
Expand All @@ -104,4 +135,44 @@ public String getName() {
return NAME;

Check warning on line 135 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L135

Added line #L135 was not covered by tests
}

/**
* Temporary, generates the ingest pipeline request content from workflow data
* {
* "description" : "<description>",
* "processors" : [
* "text_embedding" : {
* "model_id" : "<model_id>",
* "field_map" : {
* "<input_field_name>" : "<output_field_name>"
* }
* }
* ]
* }
*
* @param description The description of the ingest pipeline configuration
* @param modelId The ID of the model that will be used in the embedding interface
* @param inputFieldName The field name used to cache text for text embeddings
* @param outputFieldName The field name in which output text is stored
* @throws IOException if the request content fails to be generated
* @return the xcontent builder with the formatted ingest pipeline configuration
*/
private XContentBuilder buildIngestPipelineRequestContent(
String description,
String modelId,
String inputFieldName,
String outputFieldName
) throws IOException {
return XContentFactory.jsonBuilder()
.startObject()
.field("description", description)
.startArray("processors")
.startObject("text_embedding")
.field("model_id", modelId)
.startObject("field_map")
.field(inputFieldName, outputFieldName)
.endObject()
.endObject()
.endArray()
.endObject();

Check warning on line 176 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L165-L176

Added lines #L165 - L176 were not covered by tests
}
}

0 comments on commit fdd2dbb

Please sign in to comment.