Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds REST APIs for creating and provisioning a workflow #63

Merged
merged 30 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
d3be2e4
Inital implementation, set up rest/transport actions, registration, p…
joshpalis Sep 29, 2023
544ce89
Merge branch 'main' into create
joshpalis Oct 2, 2023
c32bf31
Addressing PR comments, seting params to snake case, removing redunda…
joshpalis Oct 2, 2023
0bceff6
Merge branch 'main' into create
joshpalis Oct 3, 2023
cc32079
Introducing getExecutorBuilders extension point to FlowFramworkPlugin…
joshpalis Oct 4, 2023
a51c681
updating unit tests for FlowFrameworkPluginTests, adding WorkflowRequ…
joshpalis Oct 5, 2023
524407e
Adding validate/toXContent tests for workflow request/responses
joshpalis Oct 5, 2023
6aaa8c7
Adding unit tests for create and provision rest actions
joshpalis Oct 5, 2023
527ddd6
Merge branch 'main' into create
joshpalis Oct 9, 2023
791f943
Addressing PR comments (Part 1). Moving common vlaues to CommonValue …
joshpalis Oct 9, 2023
c7c819b
Addressing PR comments (Part 2), adding globalcontexthandler to creat…
joshpalis Oct 9, 2023
8084005
Merge branch 'main' into create
joshpalis Oct 9, 2023
6c3d3db
Addressing PR comments (Part 3)
joshpalis Oct 9, 2023
288a8ae
Removing TODOs for RestAction constructors, adding basic unit tests f…
joshpalis Oct 9, 2023
e200d9a
Adding CreateWorkflowTransportAction unit tests
joshpalis Oct 9, 2023
956e823
Adding intial failure test case for the ProvisionWorkflowTransportAct…
joshpalis Oct 10, 2023
9236966
Updating base URI namespace to workflow instead of workflows
joshpalis Oct 10, 2023
0ac7873
Addressing PR comment, updating invalid template config test, removin…
joshpalis Oct 10, 2023
b945bdd
Add success test case for ProvisionWorkflowTransportAction
joshpalis Oct 10, 2023
c29a639
Merge branch 'main' into create
joshpalis Oct 10, 2023
8f1fef7
Merge branch 'main' into create
joshpalis Oct 11, 2023
1dac1ee
Updating global context index mapping for template version and compat…
joshpalis Oct 11, 2023
afeb2b6
Fixing bugs, changed GC index mapping so that template/compatibility …
joshpalis Oct 12, 2023
bbe8eff
Updating GlobalContextHandler.updateTemplate() to use toDocumentSourc…
joshpalis Oct 12, 2023
6725c72
Merge branch 'main' into create
joshpalis Oct 12, 2023
b910ebc
Replacing exceptions with FlowFrameworException
joshpalis Oct 12, 2023
decb31f
Resolving javadoc warnings
joshpalis Oct 12, 2023
09dd471
Cleaning up TODOs
joshpalis Oct 12, 2023
4d96e50
Addressing PR comments
joshpalis Oct 12, 2023
8200441
Addressing PR comments, moving some common template parsing methods t…
joshpalis Oct 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,53 @@
package org.opensearch.flowframework;

import com.google.common.collect.ImmutableList;
import org.opensearch.action.ActionRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.flowframework.indices.GlobalContextHandler;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
import org.opensearch.flowframework.workflow.CreateIndexStep;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_THREAD_POOL;

/**
* An OpenSearch plugin that enables builders to innovate AI apps on OpenSearch.
*/
public class FlowFrameworkPlugin extends Plugin {
public class FlowFrameworkPlugin extends Plugin implements ActionPlugin {

/**
* Instantiate this plugin.
Expand All @@ -54,6 +79,45 @@ public Collection<Object> createComponents(
WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(clusterService, client);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool);

return ImmutableList.of(workflowStepFactory, workflowProcessSorter);
// TODO : Refactor, move system index creation/associated methods outside of the CreateIndexStep
GlobalContextHandler globalContextHandler = new GlobalContextHandler(client, new CreateIndexStep(clusterService, client));

return ImmutableList.of(workflowStepFactory, workflowProcessSorter, globalContextHandler);
}

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return ImmutableList.of(new RestCreateWorkflowAction(), new RestProvisionWorkflowAction());
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return ImmutableList.of(
new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class),
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class)
);
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
// TODO : Determine final size/queueSize values for the provision thread pool
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
return ImmutableList.of(
new FixedExecutorBuilder(
settings,
PROVISION_THREAD_POOL,
OpenSearchExecutors.allocatedProcessors(settings),
10,
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_THREAD_POOL
)
);
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,49 @@
*/
public class CommonValue {

/** Default value for no schema version */
public static Integer NO_SCHEMA_VERSION = 0;
/** Index mapping meta field name*/
public static final String META = "_meta";
/** Schema Version field name */
public static final String SCHEMA_VERSION_FIELD = "schema_version";
/** Global Context Index Name */
public static final String GLOBAL_CONTEXT_INDEX = ".plugins-ai-global-context";
/** Global Context index mapping file path */
public static final String GLOBAL_CONTEXT_INDEX_MAPPING = "mappings/global-context.json";
/** Global Context index mapping version */
public static final Integer GLOBAL_CONTEXT_INDEX_VERSION = 1;

/** The base URI for this plugin's rest actions */
public static final String AI_FLOW_FRAMEWORK_BASE_URI = "/_plugins/_flow_framework";
/** The URI for this plugin's workflow rest actions */
public static final String WORKFLOW_URI = AI_FLOW_FRAMEWORK_BASE_URI + "/workflow";
/** Field name for workflow Id, the document Id of the indexed use case template */
public static final String WORKFLOW_ID = "workflow_id";
/** The field name for provision workflow within a use case template*/
public static final String PROVISION_WORKFLOW = "provision";

/** Flow Framework plugin thread pool name prefix */
public static final String FLOW_FRAMEWORK_THREAD_POOL_PREFIX = "thread_pool.flow_framework.";
/** The provision workflow thread pool name */
public static final String PROVISION_THREAD_POOL = "opensearch_workflow_provision";

/** Model Id field */
public static final String MODEL_ID = "model_id";
/** Function Name field */
public static final String FUNCTION_NAME = "function_name";
/** Model Name field */
public static final String MODEL_NAME = "name";
/** Model Version field */
public static final String MODEL_VERSION = "model_version";
/** Model Group Id field */
public static final String MODEL_GROUP_ID = "model_group_id";
/** Description field */
public static final String DESCRIPTION = "description";
/** Connector Id field */
public static final String CONNECTOR_ID = "connector_id";
/** Model format field */
public static final String MODEL_FORMAT = "model_format";
/** Model config field */
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
public static final String MODEL_CONFIG = "model_config";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class FlowFrameworkException extends RuntimeException {

private static final long serialVersionUID = 1L;

/** The rest status code of this exception */
private final RestStatus restStatus;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
* An enumeration of Flow Framework indices
*/
public enum FlowFrameworkIndex {
/**
* Global Context Index
*/
GLOBAL_CONTEXT(
GLOBAL_CONTEXT_INDEX,
ThrowingSupplierWrapper.throwingSupplierWrapper(GlobalContextHandler::getGlobalContextMappings),
Expand All @@ -35,14 +38,26 @@ public enum FlowFrameworkIndex {
this.version = version;
}

/**
* Retrieves the index name
* @return the index name
*/
public String getIndexName() {
return indexName;
}

/**
* Retrieves the index mapping
* @return the index mapping
*/
public String getMapping() {
return mapping;
}

/**
* Retrieves the index version
* @return the index version
*/
public Integer getVersion() {
return version;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void putTemplateToGlobalContext(Template template, ActionListener<IndexRe
XContentBuilder builder = XContentFactory.jsonBuilder();
ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()
) {
request.source(template.toXContent(builder, ToXContent.EMPTY_PARAMS))
request.source(template.toDocumentSource(builder, ToXContent.EMPTY_PARAMS))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
Expand All @@ -94,6 +94,35 @@ public void putTemplateToGlobalContext(Template template, ActionListener<IndexRe
}));
}

/**
* Replaces a document in the global context index
* @param documentId the document Id
* @param template the use-case template
* @param listener action listener
*/
public void updateTemplate(String documentId, Template template, ActionListener<IndexResponse> listener) {
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
if (!createIndexStep.doesIndexExist(GLOBAL_CONTEXT_INDEX)) {
String exceptionMessage = "Failed to update template for workflow_id : "
+ documentId
+ ", global_context index does not exist.";
logger.error(exceptionMessage);
listener.onFailure(new Exception(exceptionMessage));
} else {
IndexRequest request = new IndexRequest(GLOBAL_CONTEXT_INDEX).id(documentId);
try (
XContentBuilder builder = XContentFactory.jsonBuilder();
ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()
) {
request.source(template.toDocumentSource(builder, ToXContent.EMPTY_PARAMS))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request, ActionListener.runBefore(listener, () -> context.restore()));
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
} catch (Exception e) {
logger.error("Failed to update global_context entry : {}. {}", documentId, e.getMessage());
listener.onFailure(e);
}
}
}

/**
* Update global context index for specific fields
* @param documentId global context index document id
Expand Down
Loading