Initial create ingest pipeline implementation
Signed-off-by: Joshua Palis <[email protected]>
joshpalis committed Sep 14, 2023
1 parent 0f0b65d commit 764c5af
Showing 8 changed files with 244 additions and 8 deletions.
@@ -8,11 +8,49 @@
*/
package org.opensearch.flowframework;

import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.flowframework.workflow.CreateIngestPipelineStep;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collection;
import java.util.Collections;
import java.util.function.Supplier;

/**
* An OpenSearch plugin that enables builders to innovate AI apps on OpenSearch.
*/
public class FlowFrameworkPlugin extends Plugin {
// Implement the relevant Plugin Interfaces here

private Client client;

@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
// TODO create ingest pipeline workflow item here
this.client = client;
CreateIngestPipelineStep createIngestPipelineStep = new CreateIngestPipelineStep(client);

return Collections.emptyList();
}
}
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.opensearch.action.ingest.PutPipelineRequest;

import java.util.HashMap;
import java.util.Map;

/**
 * Workflow input data for creating an ingest pipeline, wrapping the params and content of a PutPipelineRequest.
 */
public class CreateIngestPipelineRequestData implements WorkflowInputData {

private Map<String, String> params = new HashMap<>();
private Map<String, Object> content = new HashMap<>();

public CreateIngestPipelineRequestData(PutPipelineRequest request) {
super();

// RestPutPipelineAction params
params.put("id", request.getId());

// PutPipelineRequest content
content.put("source", request.getSource());
content.put("mediaType", request.getMediaType());
}

@Override
public Map<String, Object> getContent() {
return Map.copyOf(content);
}

@Override
public Map<String, String> getParams() {
return Map.copyOf(params);
}

}
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import java.util.HashMap;
import java.util.Map;

/**
 * Workflow output data for the create ingest pipeline step, holding the id of the created pipeline.
 */
public class CreateIngestPipelineResponseData implements WorkflowData {

private Map<String, Object> content = new HashMap<>();

public CreateIngestPipelineResponseData(String ingestPipelineId) {
super();
// PutPipelineAction returns only an acknowledged response, so the ingest pipeline id is returned instead
content.put("pipelineId", ingestPipelineId);
}

@Override
public Map<String, Object> getContent() {
return Map.copyOf(content);
}
}
@@ -0,0 +1,79 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ingest.PutPipelineRequest;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.MediaType;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* Workflow step to create an ingest pipeline
*/
public class CreateIngestPipelineStep implements WorkflowStep {

private static final Logger logger = LogManager.getLogger(CreateIngestPipelineStep.class);
private static final String NAME = "create_ingest_pipeline_step";

private final ClusterAdminClient clusterAdminClient;

/**
 * Instantiates a new CreateIngestPipelineStep
 * @param client the OpenSearch client, used to obtain the cluster admin client
 */
public CreateIngestPipelineStep(Client client) {
this.clusterAdminClient = client.admin().cluster();
}

@Override
public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {

CompletableFuture<WorkflowData> createIngestPipelineFuture = new CompletableFuture<>();
PutPipelineRequest putPipelineRequest = null;

// TODO : Still not clear if this is the correct way to retrieve data from the parsed use case template
for (WorkflowData workflowData : data) {
if (workflowData instanceof WorkflowInputData) {

WorkflowInputData inputData = (WorkflowInputData) workflowData;
logger.debug("Previous step sent params: {}, content: {}", inputData.getParams(), workflowData.getContent());

// Extract params and content to create request
String pipelineId = inputData.getParams().get("id");
BytesReference source = (BytesReference) inputData.getContent().get("source");
MediaType mediaType = (MediaType) inputData.getContent().get("mediaType");
putPipelineRequest = new PutPipelineRequest(pipelineId, source, mediaType);
}
}

if (putPipelineRequest != null) {
String pipelineId = putPipelineRequest.getId();
clusterAdminClient.putPipeline(putPipelineRequest, ActionListener.wrap(response -> {
logger.info("Created pipeline : " + pipelineId);
CreateIngestPipelineResponseData responseData = new CreateIngestPipelineResponseData(pipelineId);
createIngestPipelineFuture.complete(responseData);
}, exception -> {
logger.error("Failed to create pipeline : " + exception.getMessage());
createIngestPipelineFuture.completeExceptionally(exception);
}));
} else {
createIngestPipelineFuture.completeExceptionally(new Exception("Failed to create pipeline"));
}
return createIngestPipelineFuture;
}

@Override
public String getName() {
return NAME;
}

}
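For illustration, a minimal usage sketch of the step above; this is not part of this commit. It assumes a Client and a prebuilt PutPipelineRequest are available, and the class and method names are hypothetical.

import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.opensearch.action.ingest.PutPipelineRequest;
import org.opensearch.client.Client;
import org.opensearch.flowframework.workflow.CreateIngestPipelineRequestData;
import org.opensearch.flowframework.workflow.CreateIngestPipelineStep;

// Hypothetical usage sketch (not part of this commit): wraps a PutPipelineRequest in the
// request data holder, executes the step, and reads the pipeline id from the step's output.
public class CreateIngestPipelineStepExample {

    public static CompletableFuture<String> createPipeline(Client client, PutPipelineRequest request) {
        CreateIngestPipelineStep step = new CreateIngestPipelineStep(client);
        CreateIngestPipelineRequestData input = new CreateIngestPipelineRequestData(request);

        // execute() returns immediately; the future completes once the pipeline has been created
        return step.execute(List.of(input)).thenApply(output -> (String) output.getContent().get("pipelineId"));
    }
}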
@@ -8,7 +8,16 @@
*/
package org.opensearch.flowframework.workflow;

import java.util.Map;

/**
* Interface for handling the input/output of the building blocks.
*/
public interface WorkflowData {}
public interface WorkflowData {

/**
* Accesses a map containing the content of the workflow step. This represents the data associated with a REST API request.
* @return the content of this step.
*/
Map<String, Object> getContent();
}
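As an illustration, a minimal sketch of a class implementing this interface; the class name and constructor are hypothetical and not part of this commit.

import java.util.Map;

import org.opensearch.flowframework.workflow.WorkflowData;

// Hypothetical example implementation (not part of this commit): wraps an immutable
// content map and exposes it through getContent().
public class ExampleWorkflowData implements WorkflowData {

    private final Map<String, Object> content;

    public ExampleWorkflowData(Map<String, Object> content) {
        this.content = Map.copyOf(content);
    }

    @Override
    public Map<String, Object> getContent() {
        return content;
    }
}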
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import java.util.Map;

/**
* Interface for handling the input of the building blocks.
*/
public interface WorkflowInputData extends WorkflowData {

/**
* Accesses a map containing the params of this workflow step. This represents the params associated with a REST API request, parsed from the URI.
* @return the params of this step.
*/
Map<String, String> getParams();

}
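A short sketch of how a consumer might look for these params in a step's input list, mirroring the instanceof pattern used by CreateIngestPipelineStep above; the helper class is hypothetical and not part of this commit.

import java.util.List;
import java.util.Map;

import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.flowframework.workflow.WorkflowInputData;

// Hypothetical helper (not part of this commit): returns the REST params contributed by the
// first WorkflowInputData entry in a step's input list, or an empty map if none is present.
public final class WorkflowInputDataExample {

    private WorkflowInputDataExample() {}

    public static Map<String, String> firstParams(List<WorkflowData> data) {
        for (WorkflowData workflowData : data) {
            if (workflowData instanceof WorkflowInputData) {
                return ((WorkflowInputData) workflowData).getParams();
            }
        }
        return Map.of();
    }
}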
@@ -8,8 +8,7 @@
*/
package org.opensearch.flowframework.workflow;

import org.opensearch.common.Nullable;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
@@ -18,11 +17,11 @@
public interface WorkflowStep {

/**
* Triggers the processing of the building block.
* @param data for input/output params of the building blocks.
* @return CompletableFuture of the building block.
* Triggers the actual processing of the building block.
* @param data representing input params and content, or output content of previous steps.
* @return A CompletableFuture for the building block. The future is returned immediately but should not complete until the step finishes executing; it then holds the step's output data, which may be passed on to follow-on steps.
*/
CompletableFuture<WorkflowData> execute(@Nullable WorkflowData data);
CompletableFuture<WorkflowData> execute(List<WorkflowData> data);

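To illustrate the contract above, a sketch (not part of this commit) of chaining two steps so that the first step's output data is passed as the second step's input without blocking the caller; the class and method names are hypothetical.

import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.flowframework.workflow.WorkflowStep;

// Hypothetical sketch (not part of this commit): runs two workflow steps in sequence.
public final class WorkflowStepChainExample {

    private WorkflowStepChainExample() {}

    public static CompletableFuture<WorkflowData> runInSequence(WorkflowStep first, WorkflowStep second, List<WorkflowData> initialData) {
        // execute() returns a future immediately; thenCompose starts the second step only
        // after the first step's future completes with its output data.
        return first.execute(initialData).thenCompose(output -> second.execute(List.of(output)));
    }
}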
17 changes: 17 additions & 0 deletions src/main/resources/log4j2.xml
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Appenders>
<Console name="ConsoleAppender" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level o.o.f.%logger{-3} - %msg%n" />
</Console>
<File name="FileAppender" fileName="application-${date:yyyyMMdd}.log" immediateFlush="false" append="true">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</File>
</Appenders>
<Loggers>
<Root level="debug">
<AppenderRef ref="ConsoleAppender" />
<AppenderRef ref="FileAppender"/>
</Root>
</Loggers>
</Configuration>
