Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang committed Sep 30, 2023
1 parent bae2dd7 commit 0be7ba0
Show file tree
Hide file tree
Showing 15 changed files with 429 additions and 114 deletions.
21 changes: 0 additions & 21 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -141,27 +141,6 @@ tasks.named("check").configure { dependsOn(integTest) }

integTest {
dependsOn "bundlePlugin"
systemProperty 'tests.security.manager', 'false'
systemProperty 'java.io.tmpdir', opensearch_tmp_dir.absolutePath
systemProperty "https", System.getProperty("https")
systemProperty "user", System.getProperty("user")
systemProperty "password", System.getProperty("password")

// // Only rest case can run with remote cluster
// if (System.getProperty("tests.rest.cluster") != null) {
// filter {
// includeTestsMatching "org.opensearch.flowframework.rest.*IT"
// }
// }
//
// if (System.getProperty("https") == null || System.getProperty("https") == "false") {
// filter {
// }
// }

filter {
excludeTestsMatching "org.opensearch.flowframework.indices.*Tests"
}

// The --debug-jvm command-line option makes the cluster debuggable; this makes the tests debuggable
if (System.getProperty("test.debug") != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,11 @@
*/
@FunctionalInterface
public interface ThrowingSupplier<T, E extends Exception> {
/**
* Gets a result or throws an exception if unable to produce a result.
*
* @return the result
* @throws E if unable to produce a result
*/
T get() throws E;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@

import java.util.function.Supplier;

/**
* Wrapper for throwing checked exception inside places that does not allow to do so
*/
public class ThrowingSupplierWrapper {
/*
* Private constructor to avoid Jacoco complaining about public constructor
Expand All @@ -21,7 +24,7 @@ private ThrowingSupplierWrapper() {}
* Utility method to use a method throwing checked exception inside a place
* that does not allow throwing the corresponding checked exception (e.g.,
* enum initialization).
* Convert the checked exception thrown by by throwingConsumer to a RuntimeException
* Convert the checked exception thrown by throwingConsumer to a RuntimeException
* so that the compiler won't complain.
* @param <T> the method's return type
* @param throwingSupplier the method reference that can throw checked exception
Expand All @@ -37,4 +40,4 @@ public static <T> Supplier<T> throwingSupplierWrapper(ThrowingSupplier<T, Except
}
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@
import java.util.function.Supplier;

import static org.opensearch.flowframework.constant.CommonName.GLOBAL_CONTEXT_INDEX_NAME;
import static org.opensearch.flowframework.constant.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.constant.CommonValue.GLOBAL_CONTEXT_INDEX_VERSION;

/**
* An enumeration of Flow Framework indices
*/
public enum FlowFrameworkIndex {
GLOBAL_CONTEXT(
GLOBAL_CONTEXT_INDEX_NAME,
ThrowingSupplierWrapper.throwingSupplierWrapper(GlobalContextHandler::getGlobalContextMappings),
GLOBAL_CONTEXT_INDEX_VERSION);
GLOBAL_CONTEXT_INDEX,
ThrowingSupplierWrapper.throwingSupplierWrapper(GlobalContextHandler::getGlobalContextMappings),
GLOBAL_CONTEXT_INDEX_VERSION
);

private final String indexName;
private final String mapping;
Expand All @@ -41,6 +43,7 @@ public String getIndexName() {
public String getMapping() {
return mapping;
}

public Integer getVersion() {
return version;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.indices;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Client;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.workflow.CreateIndexStep;
import org.opensearch.flowframework.workflow.WorkflowData;

import java.io.IOException;
import java.util.List;
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.flowframework.constant.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.constant.CommonValue.GLOBAL_CONTEXT_INDEX_MAPPING;
import static org.opensearch.flowframework.workflow.CreateIndexStep.getIndexMappings;

/**
* A handler for global context related operations
*/
public class GlobalContextHandler {
private static final Logger logger = LogManager.getLogger(GlobalContextHandler.class);
private CreateIndexStep createIndexStep;
private final Client client;
private final CreateIndexStep createIndexStep;

public GlobalContextHandler(CreateIndexStep createIndexStep) {
/**
* constructor
* @param client the open search client
* @param createIndexStep create index step
*/
public GlobalContextHandler(Client client, CreateIndexStep createIndexStep) {
this.client = client;
this.createIndexStep = createIndexStep;
}

Expand All @@ -36,23 +64,52 @@ private void initGlobalContextIndexIfAbsent(ActionListener<Boolean> listener) {
createIndexStep.initIndexIfAbsent(FlowFrameworkIndex.GLOBAL_CONTEXT, listener);
}

public void putGlobalContextDocument(ActionListener<IndexResponse> listener) {
initGlobalContextIndexIfAbsent(listener.wrap(indexCreated -> {
/**
* add document insert into global context index
* @param template the use-case template
* @param listener action listener
*/
public void putTemplateToGlobalContext(Template template, ActionListener<IndexResponse> listener){
initGlobalContextIndexIfAbsent(ActionListener.wrap(indexCreated -> {
if (!indexCreated) {

listener.onFailure(new FlowFrameworkException("No response to create global_context index"));
return;
}
IndexRequest request = new IndexRequest(GLOBAL_CONTEXT_INDEX);
try () {

try (
XContentBuilder builder = XContentFactory.jsonBuilder();
ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()
) {
request.source(template.toXContent(builder, ToXContent.EMPTY_PARAMS))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {

logger.error("Failed to index global_context index");
listener.onFailure(e);
}
}, e -> {

logger.error("Failed to create global_context index", e);
listener.onFailure(e);
}));
}

public void storeResponseToGlobalContext(String documentId, List<WorkflowData> workflowDataList) {

/**
* Update global context index for specific fields
* @param documentId global context index document id
* @param updatedFields updated fields; key: field name, value: new value
* @param listener UpdateResponse action listener
*/
public void storeResponseToGlobalContext(
String documentId,
Map<String, Object> updatedFields,
ActionListener<UpdateResponse> listener
) {
UpdateRequest updateRequest = new UpdateRequest(GLOBAL_CONTEXT_INDEX, documentId);
Map<String, Object> updatedResponsesContext = new HashMap<>();
updatedResponsesContext.putAll(updatedFields);
updateRequest.doc(updatedResponsesContext);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
// TODO: decide what condition can be considered as an update conflict and add retry strategy
client.update(updateRequest, listener);
}
}
94 changes: 92 additions & 2 deletions src/main/java/org/opensearch/flowframework/model/Template.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ public class Template implements ToXContentObject {
public static final String USER_INPUTS_FIELD = "user_inputs";
/** The template field name for template workflows */
public static final String WORKFLOWS_FIELD = "workflows";
/** The template field name for template responses */
public static final String RESPONSES_FIELD = "responses";
/** The template field name for template resources created */
public static final String RESOURCES_CREATED_FIELD = "resources_created";

private final String name;
private final String description;
Expand All @@ -58,6 +62,8 @@ public class Template implements ToXContentObject {
private final List<Version> compatibilityVersion;
private final Map<String, Object> userInputs;
private final Map<String, Workflow> workflows;
private Map<String, Object> responses;
private Map<String, Object> resourcesCreated;

/**
* Instantiate the object representing a use case template
Expand All @@ -70,6 +76,8 @@ public class Template implements ToXContentObject {
* @param compatibilityVersion OpenSearch version compatibility of this template
* @param userInputs Optional user inputs to apply globally
* @param workflows Workflow graph definitions corresponding to the defined operations.
* @param responses A map of essential API responses for backend to use and lookup.
* @param resourcesCreated A map of all the resources created.
*/
public Template(
String name,
Expand All @@ -79,7 +87,9 @@ public Template(
Version templateVersion,
List<Version> compatibilityVersion,
Map<String, Object> userInputs,
Map<String, Workflow> workflows
Map<String, Workflow> workflows,
Map<String, Object> responses,
Map<String, Object> resourcesCreated
) {
this.name = name;
this.description = description;
Expand All @@ -89,6 +99,8 @@ public Template(
this.compatibilityVersion = List.copyOf(compatibilityVersion);
this.userInputs = Map.copyOf(userInputs);
this.workflows = Map.copyOf(workflows);
this.responses = Map.copyOf(responses);
this.resourcesCreated = Map.copyOf(resourcesCreated);
}

@Override
Expand Down Expand Up @@ -132,6 +144,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
xContentBuilder.endObject();

xContentBuilder.startObject(RESPONSES_FIELD);
for (Entry<String, Object> e : responses.entrySet()) {
xContentBuilder.field(e.getKey(), e.getValue());
}
xContentBuilder.endObject();

xContentBuilder.startObject(RESOURCES_CREATED_FIELD);
for (Entry<String, Object> e : resourcesCreated.entrySet()) {
xContentBuilder.field(e.getKey(), e.getValue());
}
xContentBuilder.endObject();

return xContentBuilder.endObject();
}

Expand All @@ -151,6 +175,8 @@ public static Template parse(XContentParser parser) throws IOException {
List<Version> compatibilityVersion = new ArrayList<>();
Map<String, Object> userInputs = new HashMap<>();
Map<String, Workflow> workflows = new HashMap<>();
Map<String, Object> responses = new HashMap<>();
Map<String, Object> resourcesCreated = new HashMap<>();

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -216,6 +242,39 @@ public static Template parse(XContentParser parser) throws IOException {
workflows.put(workflowFieldName, Workflow.parse(parser));
}
break;
case RESPONSES_FIELD:
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String responsesFieldName = parser.currentName();
switch (parser.nextToken()) {
case VALUE_STRING:
responses.put(responsesFieldName, parser.text());
break;
case START_OBJECT:
responses.put(responsesFieldName, parseStringToStringMap(parser));
break;
default:
throw new IOException("Unable to parse field [" + responsesFieldName + "] in a responses object.");
}
}
break;

case RESOURCES_CREATED_FIELD:
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String resourcesCreatedField = parser.currentName();
switch (parser.nextToken()) {
case VALUE_STRING:
resourcesCreated.put(resourcesCreatedField, parser.text());
break;
case START_OBJECT:
resourcesCreated.put(resourcesCreatedField, parseStringToStringMap(parser));
break;
default:
throw new IOException("Unable to parse field [" + resourcesCreatedField + "] in a responses object.");
}
}
break;

default:
throw new IOException("Unable to parse field [" + fieldName + "] in a template object.");
Expand All @@ -225,7 +284,18 @@ public static Template parse(XContentParser parser) throws IOException {
throw new IOException("An template object requires a name.");
}

return new Template(name, description, useCase, operations, templateVersion, compatibilityVersion, userInputs, workflows);
return new Template(
name,
description,
useCase,
operations,
templateVersion,
compatibilityVersion,
userInputs,
workflows,
responses,
resourcesCreated
);
}

/**
Expand Down Expand Up @@ -370,6 +440,22 @@ public Map<String, Workflow> workflows() {
return workflows;
}

/**
* A map of essential API responses
* @return the responses
*/
public Map<String, Object> responses() {
return responses;
}

/**
* A map of all the resources created
* @return the resources created
*/
public Map<String, Object> resourcesCreated() {
return responses;
}

@Override
public String toString() {
return "Template [name="
Expand All @@ -388,6 +474,10 @@ public String toString() {
+ userInputs
+ ", workflows="
+ workflows
+ ", responses="
+ responses
+ ", resourcesCreated="
+ resourcesCreated
+ "]";
}
}
Loading

0 comments on commit 0be7ba0

Please sign in to comment.