Skip to content

Commit

Permalink
Addressing PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis committed Sep 22, 2023
1 parent ff889ed commit a38773e
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
case OUTPUT_FIELD:
outputFieldName = (String) content.get(OUTPUT_FIELD);
break;
default:
break;
}
}

Expand Down Expand Up @@ -138,7 +140,7 @@ public Map<String, Object> getContent() {
// TODO : Use node client to index response data to global context (pending global context index implementation)

}, exception -> {
logger.error("Failed to create ignest pipeline : " + exception.getMessage());
logger.error("Failed to create ingest pipeline : " + exception.getMessage());
createIngestPipelineFuture.completeExceptionally(exception);
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,39 +47,23 @@ public void setUp() throws Exception {

@Override
public Map<String, Object> getContent() {
return Map.of(
"id",
"pipelineId",
"description",
"some description",
"type",
"text_embedding",
"model_id",
"model_id",
"input_field_name",
"inputField",
"output_field_name",
"outputField"
return Map.ofEntries(
Map.entry("id", "pipelineId"),
Map.entry("description", "some description"),
Map.entry("type", "text_embedding"),
Map.entry("model_id", "model_id"),
Map.entry("input_field_name", "inputField"),
Map.entry("output_field_name", "outputField")
);
}

@Override
public Map<String, String> getParams() {
return Map.of();
}
};

// Set output data to returned pipelineId
outpuData = new WorkflowData() {

@Override
public Map<String, Object> getContent() {
return Map.of("pipelineId", "pipelineId");
}

@Override
public Map<String, String> getParams() {
return Map.of();
return Map.ofEntries(Map.entry("pipelineId", "pipelineId"));
}
};

Expand All @@ -104,11 +88,11 @@ public void testCreateIngestPipelineStep() throws InterruptedException, Executio
verify(clusterAdminClient, times(1)).putPipeline(any(PutPipelineRequest.class), actionListenerCaptor.capture());
actionListenerCaptor.getValue().onResponse(new AcknowledgedResponse(true));

assertTrue(future.isDone());
assertTrue(future.isDone() && !future.isCompletedExceptionally());
assertEquals(outpuData.getContent(), future.get().getContent());
}

public void testCreateIngestPipelineStepFailure() {
public void testCreateIngestPipelineStepFailure() throws InterruptedException {

CreateIngestPipelineStep createIngestPipelineStep = new CreateIngestPipelineStep(client);

Expand All @@ -119,35 +103,39 @@ public void testCreateIngestPipelineStepFailure() {

// Mock put pipeline request execution and return false
verify(clusterAdminClient, times(1)).putPipeline(any(PutPipelineRequest.class), actionListenerCaptor.capture());
actionListenerCaptor.getValue().onFailure(new Exception());
actionListenerCaptor.getValue().onFailure(new Exception("Failed to create ingest pipeline"));

assertTrue(future.isDone());
assertThrows(Exception.class, () -> future.get());
assertTrue(future.isDone() && future.isCompletedExceptionally());
try {
future.get();
} catch (ExecutionException e) {
assertTrue(e.getMessage().contains("Failed to create ingest pipeline"));
}
}

public void testMissingData() {
public void testMissingData() throws InterruptedException {
CreateIngestPipelineStep createIngestPipelineStep = new CreateIngestPipelineStep(client);

// Data with missing input and output fields
WorkflowData incorrectData = new WorkflowData() {

@Override
public Map<String, Object> getContent() {
return Map.of("id", "pipelineId", "description", "some description", "type", "text_embedding", "model_id", "model_id");
}

@Override
public Map<String, String> getParams() {
return Map.of();
return Map.ofEntries(
Map.entry("id", "pipelineId"),
Map.entry("description", "some description"),
Map.entry("type", "text_embedding"),
Map.entry("model_id", "model_id")
);
}
};

CompletableFuture<WorkflowData> future = createIngestPipelineStep.execute(List.of(incorrectData));
assertTrue(future.isDone());
assertTrue(future.isDone() && future.isCompletedExceptionally());

try {
future.get();
} catch (Exception e) {
} catch (ExecutionException e) {
assertTrue(e.getMessage().contains("Failed to create ingest pipeline, required inputs not found"));
}
}
Expand Down

0 comments on commit a38773e

Please sign in to comment.