Skip to content

Commit

Permalink
Introduce global-context index and related operations (#65)
Browse files Browse the repository at this point in the history
* Add global context index and indices handler

Signed-off-by: Jackie Han <[email protected]>

* Update global context index mapping

Signed-off-by: Jackie Han <[email protected]>

* correct checkstyle errors

Signed-off-by: Jackie Han <[email protected]>

* skip index handler integ tests

Signed-off-by: Jackie Han <[email protected]>

* remove indices integration tests for now

Signed-off-by: Jackie Han <[email protected]>

* rebase - add global-context index handler

Signed-off-by: Jackie Han <[email protected]>

* Add unit tests

Signed-off-by: Jackie Han <[email protected]>

* remove duplicate index name file

Signed-off-by: Jackie Han <[email protected]>

* refactor package and file names

Signed-off-by: Jackie Han <[email protected]>

* spotless apply

Signed-off-by: Jackie Han <[email protected]>

* add javax ws dependency

Signed-off-by: Jackie Han <[email protected]>

* remove visible for testing

Signed-off-by: Jackie Han <[email protected]>

* add final keyword to map in Template ToXContect parser

Signed-off-by: Jackie Han <[email protected]>

* spotless apply

Signed-off-by: Jackie Han <[email protected]>

* disable checkStyleTest

Signed-off-by: Jackie Han <[email protected]>

* Add more unit tests

Signed-off-by: Jackie Han <[email protected]>

* use OpenSearch rest status code

Signed-off-by: Jackie Han <[email protected]>

* Addressing comments

Signed-off-by: Jackie Han <[email protected]>

* update resposnes field name to userOutputs

Signed-off-by: Jackie Han <[email protected]>

* spotlessApply

Signed-off-by: Jackie Han <[email protected]>

---------

Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang authored Oct 9, 2023
1 parent 28326bd commit c4f1fc7
Show file tree
Hide file tree
Showing 20 changed files with 877 additions and 46 deletions.
26 changes: 13 additions & 13 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ publishing {
publications {
pluginZip(MavenPublication) { publication ->
pom {
name = pluginName
description = pluginDescription
licenses {
license {
name = "The Apache License, Version 2.0"
url = "http://www.apache.org/licenses/LICENSE-2.0.txt"
name = pluginName
description = pluginDescription
licenses {
license {
name = "The Apache License, Version 2.0"
url = "http://www.apache.org/licenses/LICENSE-2.0.txt"
}
}
}
developers {
developer {
name = "OpenSearch AI Flow Framework Plugin"
url = "https://github.com/opensearch-project/opensearch-ai-flow-framework"
developers {
developer {
name = "OpenSearch AI Flow Framework Plugin"
url = "https://github.com/opensearch-project/opensearch-ai-flow-framework"
}
}
}
}
}
}
Expand Down Expand Up @@ -159,7 +159,7 @@ task updateVersion {
doLast {
ext.newVersion = System.getProperty('newVersion')
println "Setting version to ${newVersion}."
// String tokenization to support -SNAPSHOT
// String tokenization to support -SNAPSHOT
ant.replaceregexp(file:'build.gradle', match: '"opensearch.version", "\\d.*"', replace: '"opensearch.version", "' + newVersion.tokenize('-')[0] + '-SNAPSHOT"', flags:'g', byline:true)
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/demo/Demo.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -56,8 +57,9 @@ public static void main(String[] args) throws IOException {
logger.error("Failed to read JSON at path {}", path);
return;
}
ClusterService clusterService = new ClusterService(null, null, null);
Client client = new NodeClient(null, null);
WorkflowStepFactory factory = new WorkflowStepFactory(client);
WorkflowStepFactory factory = new WorkflowStepFactory(clusterService, client);

ThreadPool threadPool = new ThreadPool(Settings.EMPTY);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(factory, threadPool);
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/demo/TemplateParseDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -52,8 +53,10 @@ public static void main(String[] args) throws IOException {
logger.error("Failed to read JSON at path {}", path);
return;
}
ClusterService clusterService = new ClusterService(null, null, null);
Client client = new NodeClient(null, null);
WorkflowStepFactory factory = new WorkflowStepFactory(client);

WorkflowStepFactory factory = new WorkflowStepFactory(clusterService, client);
ThreadPool threadPool = new ThreadPool(Settings.EMPTY);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(factory, threadPool);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(client);
WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(clusterService, client);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool);

return ImmutableList.of(workflowStepFactory, workflowProcessSorter);
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/org/opensearch/flowframework/common/CommonValue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.common;

/**
* Representation of common values that are used across project
*/
public class CommonValue {

public static Integer NO_SCHEMA_VERSION = 0;
public static final String META = "_meta";
public static final String SCHEMA_VERSION_FIELD = "schema_version";
public static final String GLOBAL_CONTEXT_INDEX = ".plugins-ai-global-context";
public static final String GLOBAL_CONTEXT_INDEX_MAPPING = "mappings/global-context.json";
public static final Integer GLOBAL_CONTEXT_INDEX_VERSION = 1;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.common;

/**
* A supplier that can throw checked exception
*
* @param <T> method parameter type
* @param <E> Exception type
*/
@FunctionalInterface
public interface ThrowingSupplier<T, E extends Exception> {
/**
* Gets a result or throws an exception if unable to produce a result.
*
* @return the result
* @throws E if unable to produce a result
*/
T get() throws E;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.common;

import java.util.function.Supplier;

/**
* Wrapper for throwing checked exception inside places that does not allow to do so
*/
public class ThrowingSupplierWrapper {

private ThrowingSupplierWrapper() {}

/**
* Utility method to use a method throwing checked exception inside a place
* that does not allow throwing the corresponding checked exception (e.g.,
* enum initialization).
* Convert the checked exception thrown by throwingConsumer to a RuntimeException
* so that the compiler won't complain.
* @param <T> the method's return type
* @param throwingSupplier the method reference that can throw checked exception
* @return converted method reference
*/
public static <T> Supplier<T> throwingSupplierWrapper(ThrowingSupplier<T, Exception> throwingSupplier) {

return () -> {
try {
return throwingSupplier.get();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.exception;

import org.opensearch.core.rest.RestStatus;

/**
* Representation of Flow Framework Exceptions
*/
public class FlowFrameworkException extends RuntimeException {

private static final long serialVersionUID = 1L;

private final RestStatus restStatus;

/**
* Constructor with error message.
*
* @param message message of the exception
* @param restStatus HTTP status code of the response
*/
public FlowFrameworkException(String message, RestStatus restStatus) {
super(message);
this.restStatus = restStatus;
}

/**
* Constructor with specified cause.
* @param cause exception cause
* @param restStatus HTTP status code of the response
*/
public FlowFrameworkException(Throwable cause, RestStatus restStatus) {
super(cause);
this.restStatus = restStatus;
}

/**
* Constructor with specified error message adn cause.
* @param message error message
* @param cause exception cause
* @param restStatus HTTP status code of the response
*/
public FlowFrameworkException(String message, Throwable cause, RestStatus restStatus) {
super(message, cause);
this.restStatus = restStatus;
}

/**
* Getter for restStatus.
*
* @return the HTTP status code associated with the exception
*/
public RestStatus getRestStatus() {
return restStatus;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.indices;

import org.opensearch.flowframework.common.ThrowingSupplierWrapper;

import java.util.function.Supplier;

import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX_VERSION;

/**
* An enumeration of Flow Framework indices
*/
public enum FlowFrameworkIndex {
GLOBAL_CONTEXT(
GLOBAL_CONTEXT_INDEX,
ThrowingSupplierWrapper.throwingSupplierWrapper(GlobalContextHandler::getGlobalContextMappings),
GLOBAL_CONTEXT_INDEX_VERSION
);

private final String indexName;
private final String mapping;
private final Integer version;

FlowFrameworkIndex(String name, Supplier<String> mappingSupplier, Integer version) {
this.indexName = name;
this.mapping = mappingSupplier.get();
this.version = version;
}

public String getIndexName() {
return indexName;
}

public String getMapping() {
return mapping;
}

public Integer getVersion() {
return version;
}
}
Loading

0 comments on commit c4f1fc7

Please sign in to comment.