Skip to content

Commit

Permalink
Adds unit tests for create ingest pipeline step, fixes pipeline reque…
Browse files Browse the repository at this point in the history
…st body generator

Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis committed Sep 22, 2023
1 parent 7fcb718 commit be67209
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {

Map<String, String> parameters = workflowData.getParams();
Map<String, Object> content = workflowData.getContent();
logger.debug("Previous step sent params: {}, content: {}", parameters, content);
logger.info("Previous step sent params: {}, content: {}", parameters, content);

for (Entry<String, Object> entry : content.entrySet()) {
switch (entry.getKey()) {
Expand All @@ -105,13 +105,13 @@ public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
}

// Determmine if fields have been populated, else iterate over remaining workflow data
if (Stream.of(pipelineId, description, modelId, inputFieldName, outputFieldName).allMatch(x -> x != null)) {
if (Stream.of(pipelineId, description, modelId, type, inputFieldName, outputFieldName).allMatch(x -> x != null)) {
try {
configuration = BytesReference.bytes(
buildIngestPipelineRequestContent(description, modelId, inputFieldName, outputFieldName)
buildIngestPipelineRequestContent(description, modelId, type, inputFieldName, outputFieldName)
);
} catch (IOException e) {
logger.error("Failed to create ingest pipeline : " + e.getMessage());
logger.error("Failed to create ingest pipeline configuration: " + e.getMessage());
createIngestPipelineFuture.completeExceptionally(e);

Check warning on line 115 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L113-L115

Added lines #L113 - L115 were not covered by tests
}
break;

Check warning on line 117 in src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java#L117

Added line #L117 was not covered by tests
Expand Down Expand Up @@ -157,7 +157,7 @@ public String getName() {
* "description" : "<description>",
* "processors" : [
* {
* "text_embedding" : {
* "<type>" : {
* "model_id" : "<model_id>",
* "field_map" : {
* "<input_field_name>" : "<output_field_name>"
Expand All @@ -168,6 +168,7 @@ public String getName() {
*
* @param description The description of the ingest pipeline configuration
* @param modelId The ID of the model that will be used in the embedding interface
* @param type The processor type
* @param inputFieldName The field name used to cache text for text embeddings
* @param outputFieldName The field name in which output text is stored
* @throws IOException if the request content fails to be generated
Expand All @@ -176,19 +177,22 @@ public String getName() {
private XContentBuilder buildIngestPipelineRequestContent(
String description,
String modelId,
String type,
String inputFieldName,
String outputFieldName
) throws IOException {
return XContentFactory.jsonBuilder()
.startObject()
.field(DESCRIPTION_FIELD, description)
.startArray(PROCESSORS_FIELD)
.startObject(TYPE_FIELD)
.startObject()
.startObject(type)
.field(MODEL_ID_FIELD, modelId)
.startObject(FIELD_MAP)
.field(inputFieldName, outputFieldName)
.endObject()
.endObject()
.endObject()
.endArray()
.endObject();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow.CreateIngestPipeline;

import org.opensearch.action.ingest.PutPipelineRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.workflow.CreateIngestPipelineStep;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.test.OpenSearchTestCase;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.mockito.ArgumentCaptor;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class CreateIngestPipelineStepTests extends OpenSearchTestCase {

private WorkflowData inputData;
private WorkflowData outpuData;
private Client client;
private AdminClient adminClient;
private ClusterAdminClient clusterAdminClient;

@Override
public void setUp() throws Exception {
super.setUp();

inputData = new WorkflowData() {

@Override
public Map<String, Object> getContent() {
return Map.of(
"id",
"pipelineId",
"description",
"some description",
"type",
"text_embedding",
"model_id",
"model_id",
"input_field_name",
"inputField",
"output_field_name",
"outputField"
);
}

@Override
public Map<String, String> getParams() {
return Map.of();
}
};

// Set output data to returned pipelineId
outpuData = new WorkflowData() {

@Override
public Map<String, Object> getContent() {
return Map.of("pipelineId", "pipelineId");
}

@Override
public Map<String, String> getParams() {
return Map.of();
}
};

client = mock(Client.class);
adminClient = mock(AdminClient.class);
clusterAdminClient = mock(ClusterAdminClient.class);

when(client.admin()).thenReturn(adminClient);
when(adminClient.cluster()).thenReturn(clusterAdminClient);
}

public void testCreateIngestPipelineStep() throws InterruptedException, ExecutionException {

CreateIngestPipelineStep createIngestPipelineStep = new CreateIngestPipelineStep(client);

ArgumentCaptor<ActionListener> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class);
CompletableFuture<WorkflowData> future = createIngestPipelineStep.execute(List.of(inputData));

assertFalse(future.isDone());

// Mock put pipeline request execution and return true
verify(clusterAdminClient, times(1)).putPipeline(any(PutPipelineRequest.class), actionListenerCaptor.capture());
actionListenerCaptor.getValue().onResponse(new AcknowledgedResponse(true));

assertTrue(future.isDone());
assertEquals(outpuData.getContent(), future.get().getContent());
}

}

0 comments on commit be67209

Please sign in to comment.