Skip to content

Commit

Permalink
rebase - add global-context index handler
Browse files Browse the repository at this point in the history
Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang committed Sep 28, 2023
1 parent 2fb6509 commit bae2dd7
Show file tree
Hide file tree
Showing 13 changed files with 355 additions and 275 deletions.
4 changes: 3 additions & 1 deletion src/main/java/demo/Demo.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.flowframework.model.Template;
Expand Down Expand Up @@ -53,8 +54,9 @@ public static void main(String[] args) throws IOException {
logger.error("Failed to read JSON at path {}", path);
return;
}
ClusterService clusterService = new ClusterService(null, null, null);
Client client = new NodeClient(null, null);
WorkflowStepFactory factory = WorkflowStepFactory.create(client);
WorkflowStepFactory factory = WorkflowStepFactory.create(clusterService, client);
ExecutorService executor = Executors.newFixedThreadPool(10);
WorkflowProcessSorter.create(factory, executor);

Expand Down
4 changes: 3 additions & 1 deletion src/main/java/demo/TemplateParseDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.flowframework.model.Template;
Expand Down Expand Up @@ -48,8 +49,9 @@ public static void main(String[] args) throws IOException {
logger.error("Failed to read JSON at path {}", path);
return;
}
ClusterService clusterService = new ClusterService(null, null, null);
Client client = new NodeClient(null, null);
WorkflowStepFactory factory = WorkflowStepFactory.create(client);
WorkflowStepFactory factory = WorkflowStepFactory.create(clusterService, client);
WorkflowProcessSorter.create(factory, Executors.newFixedThreadPool(10));

Template t = Template.parse(json);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
WorkflowStepFactory workflowStepFactory = WorkflowStepFactory.create(client);
WorkflowStepFactory workflowStepFactory = WorkflowStepFactory.create(clusterService, client);
WorkflowProcessSorter workflowProcessSorter = WorkflowProcessSorter.create(workflowStepFactory, threadPool.generic());

return ImmutableList.of(workflowStepFactory, workflowProcessSorter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,76 +16,8 @@ public class CommonValue {
public static Integer NO_SCHEMA_VERSION = 0;
public static final String META = "_meta";
public static final String SCHEMA_VERSION_FIELD = "schema_version";
public static final Integer GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION = 1;
public static final String GLOBAL_CONTEXT_INDEX_MAPPING = "{\n "
+ " \"dynamic\": false,\n"
+ " \"_meta\": {\n"
+ " \"schema_version\": "
+ GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION
+ "\n"
+ " },\n"
+ " \"properties\": {\n"
+ " \"pipeline_id\": {\n"
+ " \"type\": \"keyword\"\n"
+ " },\n"
+ " \"name\": {\n"
+ " \"type\": \"text\",\n"
+ " \"fields\": {\n"
+ " \"keyword\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"ignore_above\": 256\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"description\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"use_case\": {\n"
+ " \"type\": \"keyword\"\n"
+ " },\n"
+ " \"operations\": {\n"
+ " \"type\": \"keyword\"\n"
+ " },\n"
+ " \"version\": {\n"
+ " \"type\": \"nested\",\n"
+ " \"properties\": {\n"
+ " \"template\": {\n"
+ " \"type\": \"integer\"\n"
+ " },\n"
+ " \"compatibility\": {\n"
+ " \"type\": \"integer\"\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"user_inputs\": {\n"
+ " \"type\": \"nested\",\n"
+ " \"properties\": {\n"
+ " \"model_id\": {\n"
+ " \"type\": \"keyword\"\n"
+ " },\n"
+ " \"input_field\": {\n"
+ " \"type\": \"keyword\"\n"
+ " },\n"
+ " \"output_field\": {\n"
+ " \"type\": \"keyword\"\n"
+ " },\n"
+ " \"ingest_pipeline_name\": {\n"
+ " \"type\": \"keyword\"\n"
+ " },\n"
+ " \"index_name\": {\n"
+ " \"type\": \"keyword\"\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"workflows\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"responses\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"resources_created\": {\n"
+ " \"type\": \"text\"\n"
+ " }\n"
+ " }\n"
+ "}";

public static final String GLOBAL_CONTEXT_INDEX = ".plugins-ai-global-context";
public static final String GLOBAL_CONTEXT_INDEX_MAPPING = "mappings/global-context.json";
public static final Integer GLOBAL_CONTEXT_INDEX_VERSION = 1;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.function;

/**
* A supplier that can throw checked exception
*
* @param <T> method parameter type
* @param <E> Exception type
*/
@FunctionalInterface
public interface ThrowingSupplier<T, E extends Exception> {
T get() throws E;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.function;

import java.util.function.Supplier;

public class ThrowingSupplierWrapper {
/*
* Private constructor to avoid Jacoco complaining about public constructor
* not covered: https://tinyurl.com/yetc7tra
*/
private ThrowingSupplierWrapper() {}

/**
* Utility method to use a method throwing checked exception inside a place
* that does not allow throwing the corresponding checked exception (e.g.,
* enum initialization).
* Convert the checked exception thrown by by throwingConsumer to a RuntimeException
* so that the compiler won't complain.
* @param <T> the method's return type
* @param throwingSupplier the method reference that can throw checked exception
* @return converted method reference
*/
public static <T> Supplier<T> throwingSupplierWrapper(ThrowingSupplier<T, Exception> throwingSupplier) {

return () -> {
try {
return throwingSupplier.get();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,29 @@
*/
package org.opensearch.flowframework.indices;

import org.opensearch.flowframework.function.ThrowingSupplierWrapper;

import java.util.function.Supplier;

import static org.opensearch.flowframework.constant.CommonName.GLOBAL_CONTEXT_INDEX_NAME;
import static org.opensearch.flowframework.constant.CommonValue.GLOBAL_CONTEXT_INDEX_MAPPING;
import static org.opensearch.flowframework.constant.CommonValue.GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION;
import static org.opensearch.flowframework.constant.CommonValue.GLOBAL_CONTEXT_INDEX_VERSION;

/**
* An enumeration of Flow Framework indices
*/
public enum FlowFrameworkIndex {
GLOBAL_CONTEXT(GLOBAL_CONTEXT_INDEX_NAME, GLOBAL_CONTEXT_INDEX_MAPPING, GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION);
GLOBAL_CONTEXT(
GLOBAL_CONTEXT_INDEX_NAME,
ThrowingSupplierWrapper.throwingSupplierWrapper(GlobalContextHandler::getGlobalContextMappings),
GLOBAL_CONTEXT_INDEX_VERSION);

private final String indexName;
private final String mapping;
private final Integer version;

FlowFrameworkIndex(String name, String mapping, Integer version) {
FlowFrameworkIndex(String name, Supplier<String> mappingSupplier, Integer version) {
this.indexName = name;
this.mapping = mapping;
this.mapping = mappingSupplier.get();
this.version = version;
}

Expand All @@ -35,7 +41,6 @@ public String getIndexName() {
public String getMapping() {
return mapping;
}

public Integer getVersion() {
return version;
}
Expand Down
Loading

0 comments on commit bae2dd7

Please sign in to comment.