Skip to content

Commit

Permalink
addressing PR comments, adding switch statement to handle parsing wor…
Browse files Browse the repository at this point in the history
…kflow data for required fields

Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis committed Sep 21, 2023
1 parent fdd2dbb commit d9566d4
Showing 1 changed file with 40 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,14 @@ public class CreateIngestPipelineStep implements WorkflowStep {
private static final Logger logger = LogManager.getLogger(CreateIngestPipelineStep.class);
private static final String NAME = "create_ingest_pipeline_step";

// Common pipeline configuration fields
private static final String PIPELINE_ID_FIELD = "id";
private static final String DESCRIPTION_FIELD = "description";
private static final String PROCESSORS_FIELD = "processors";
private static final String TYPE_FIELD = "type";

// Temporary text embedding processor fields
private static final String FIELD_MAP = "field_map";
private static final String MODEL_ID_FIELD = "model_id";
private static final String INPUT_FIELD = "input_field_name";
private static final String OUTPUT_FIELD = "output_field_name";
Expand All @@ -61,6 +67,7 @@ public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {

String pipelineId = null;
String description = null;
String type = null;
String modelId = null;
String inputFieldName = null;
String outputFieldName = null;
Expand All @@ -73,22 +80,30 @@ public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
Map<String, Object> content = workflowData.getContent();
logger.debug("Previous step sent params: {}, content: {}", parameters, content);

if (content.containsKey(PIPELINE_ID_FIELD)) {
pipelineId = (String) content.get(PIPELINE_ID_FIELD);
}
if (content.containsKey(DESCRIPTION_FIELD)) {
description = (String) content.get(DESCRIPTION_FIELD);
}
if (content.containsKey(MODEL_ID_FIELD)) {
modelId = (String) content.get(MODEL_ID_FIELD);
}
if (content.containsKey(INPUT_FIELD)) {
inputFieldName = (String) content.get(INPUT_FIELD);
}
if (content.containsKey(OUTPUT_FIELD)) {
outputFieldName = (String) content.get(OUTPUT_FIELD);
for (Entry<String, Object> entry : content.entrySet()) {
switch (entry.getKey()) {
case PIPELINE_ID_FIELD:
pipelineId = (String) content.get(PIPELINE_ID_FIELD);
break;
case DESCRIPTION_FIELD:
description = (String) content.get(DESCRIPTION_FIELD);
break;
case TYPE_FIELD:
type = (String) content.get(TYPE_FIELD);
break;
case MODEL_ID_FIELD:
modelId = (String) content.get(MODEL_ID_FIELD);
break;
case INPUT_FIELD:
inputFieldName = (String) content.get(INPUT_FIELD);
break;
case OUTPUT_FIELD:
outputFieldName = (String) content.get(OUTPUT_FIELD);
break;
}
}

// Determmine if fields have been populated, else iterate over remaining workflow data
if (Stream.of(pipelineId, description, modelId, inputFieldName, outputFieldName).allMatch(x -> x != null)) {
try {
configuration = BytesReference.bytes(
Expand All @@ -102,8 +117,11 @@ public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
}
}

// Create PutPipelineRequest and execute
if (configuration != null) {
if (configuration == null) {
// Required workflow data not found
createIngestPipelineFuture.completeExceptionally(new Exception("Failed to create ingest pipeline, required inputs not found"));
} else {
// Create PutPipelineRequest and execute
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, configuration, XContentType.JSON);
clusterAdminClient.putPipeline(putPipelineRequest, ActionListener.wrap(response -> {
logger.info("Created ingest pipeline : " + putPipelineRequest.getId());
Expand All @@ -122,9 +140,6 @@ public Map<String, Object> getContent() {
logger.error("Failed to create ignest pipeline : " + exception.getMessage());
createIngestPipelineFuture.completeExceptionally(exception);
}));
} else {
// Required workflow data not found
createIngestPipelineFuture.completeExceptionally(new Exception("Failed to create ingest pipeline, required inputs not found"));
}

return createIngestPipelineFuture;
Expand All @@ -136,7 +151,7 @@ public String getName() {
}

/**
* Temporary, generates the ingest pipeline request content from workflow data
* Temporary, generates the ingest pipeline request content for text_embedding processor from workflow data
* {
* "description" : "<description>",
* "processors" : [
Expand Down Expand Up @@ -164,11 +179,11 @@ private XContentBuilder buildIngestPipelineRequestContent(
) throws IOException {
return XContentFactory.jsonBuilder()
.startObject()
.field("description", description)
.startArray("processors")
.startObject("text_embedding")
.field("model_id", modelId)
.startObject("field_map")
.field(DESCRIPTION_FIELD, description)
.startArray(PROCESSORS_FIELD)
.startObject(TYPE_FIELD)
.field(MODEL_ID_FIELD, modelId)
.startObject(FIELD_MAP)
.field(inputFieldName, outputFieldName)
.endObject()
.endObject()
Expand Down

0 comments on commit d9566d4

Please sign in to comment.