Skip to content

Commit

Permalink
Update global context index mapping
Browse files Browse the repository at this point in the history
Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang committed Sep 21, 2023
1 parent 1096170 commit 14b9a95
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 123 deletions.
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ repositories {
dependencies {
implementation "org.opensearch:opensearch:${opensearch_version}"
implementation 'org.junit.jupiter:junit-jupiter:5.10.0'
implementation 'org.projectlombok:lombok:1.18.22'
compileOnly "com.google.guava:guava:32.1.2-jre"
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"

Expand Down
141 changes: 71 additions & 70 deletions src/main/java/org/opensearch/flowframework/constant/CommonValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,74 +14,75 @@ public class CommonValue {
public static final String META = "_meta";
public static final String SCHEMA_VERSION_FIELD = "schema_version";
public static final Integer GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION = 1;
public static final String GLOBAL_CONTEXT_INDEX_MAPPING =
"{\n " +
" \"dynamic\": false,\n" +
" \"_meta\": {\n" +
" \"schema_version\": 1\n" +
" },\n" +
" \"properties\": {\n" +
" \"pipeline_id\": {\n" +
" \"type\": \"keyword\"\n"+
" },\n" +
" \"name\": {\n" +
" \"type\": \"text\",\n" +
" \"fields\": {\n" +
" \"keyword\": {\n" +
" \"type\": \"keyword\",\n" +
" \"ignore_above\": 256\n" +
" }\n" +
" }\n" +
" },\n" +
" \"description\": {\n" +
" \"type\": \"text\"\n" +
" },\n" +
" \"use_case\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"operations\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"version\": {\n" +
" \"type\": \"nested\",\n" +
" \"properties\": {\n" +
" \"template\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"compatibility\": {\n" +
" \"type\": \"integer\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"user_inputs\": {\n" +
" \"type\": \"nested\",\n" +
" \"properties\": {\n" +
" \"model_id\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"input_field\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"output_field\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"ingest_pipeline_name\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"index_name\": {\n" +
" \"type\": \"keyword\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"workflows\": {\n" +
" \"type\": \"text\"\n" +
" },\n" +
" \"responses\": {\n" +
" \"type\": \"text\"\n" +
" }\n" +
" \"resources_created\": {\n" +
" \"type\": \"text\"\n" +
" }\n" +
" }\n" +
"}";
public static final String GLOBAL_CONTEXT_INDEX_MAPPING = "{\n "
+ " \"dynamic\": false,\n"
+ " \"_meta\": {\n"
+ " \"schema_version\": "
+ GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION
+ "\n"
+ " },\n"
+ " \"properties\": {\n"
+ " \"pipeline_id\": {\n"
+ " \"type\": \"keyword\"\n"
+ " },\n"
+ " \"name\": {\n"
+ " \"type\": \"text\",\n"
+ " \"fields\": {\n"
+ " \"keyword\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"ignore_above\": 256\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"description\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"use_case\": {\n"
+ " \"type\": \"keyword\"\n"
+ " },\n"
+ " \"operations\": {\n"
+ " \"type\": \"keyword\"\n"
+ " },\n"
+ " \"version\": {\n"
+ " \"type\": \"nested\",\n"
+ " \"properties\": {\n"
+ " \"template\": {\n"
+ " \"type\": \"integer\"\n"
+ " },\n"
+ " \"compatibility\": {\n"
+ " \"type\": \"integer\"\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"user_inputs\": {\n"
+ " \"type\": \"nested\",\n"
+ " \"properties\": {\n"
+ " \"model_id\": {\n"
+ " \"type\": \"keyword\"\n"
+ " },\n"
+ " \"input_field\": {\n"
+ " \"type\": \"keyword\"\n"
+ " },\n"
+ " \"output_field\": {\n"
+ " \"type\": \"keyword\"\n"
+ " },\n"
+ " \"ingest_pipeline_name\": {\n"
+ " \"type\": \"keyword\"\n"
+ " },\n"
+ " \"index_name\": {\n"
+ " \"type\": \"keyword\"\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"workflows\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"responses\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"resources_created\": {\n"
+ " \"type\": \"text\"\n"
+ " }\n"
+ " }\n"
+ "}";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,21 @@ public class FlowFrameworkException extends RuntimeException {
public FlowFrameworkException(String message) {
super(message);
}

/**
* Constructor with specified cause.
* @param cause exception cause
*/
public FlowFrameworkException(Throwable cause) {
super(cause);
}

/**
* Constructor with specified error message adn cause.
* @param message error message
* @param cause exception cause
*/
public FlowFrameworkException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@
import static org.opensearch.flowframework.constant.CommonValue.GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION;

public enum FlowFrameworkIndex {
GLOBAL_CONTEXT(
GLOBAL_CONTEXT_INDEX_NAME,
GLOBAL_CONTEXT_INDEX_MAPPING,
GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION
);
GLOBAL_CONTEXT(GLOBAL_CONTEXT_INDEX_NAME, GLOBAL_CONTEXT_INDEX_MAPPING, GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION);

private final String indexName;
private final String mapping;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@
*/
package org.opensearch.flowframework.indices;

import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.experimental.FieldDefaults;
import lombok.extern.log4j.Log4j2;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
Expand All @@ -31,16 +28,20 @@

import static org.opensearch.flowframework.constant.CommonValue.*;

@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
@RequiredArgsConstructor
@Log4j2
public class FlowFrameworkIndicesHandler {

ClusterService clusterService;
Client client;
private static final Logger logger = LogManager.getLogger(FlowFrameworkIndicesHandler.class);
private static final Map<String, Object> indexSettings = Map.of("index.auto_expand_replicas", "0-1");
private static final Map<String, AtomicBoolean> indexMappingUpdated = new HashMap<>();

private ClusterService clusterService;
private Client client;

public FlowFrameworkIndicesHandler(ClusterService clusterService, Client client) {
this.clusterService = clusterService;
this.client = client;
}

static {
for (FlowFrameworkIndex flowFrameworkIndex : FlowFrameworkIndex.values()) {
indexMappingUpdated.put(flowFrameworkIndex.getIndexName(), new AtomicBoolean(false));
Expand All @@ -60,62 +61,61 @@ public void initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex index, ActionListe
if (!clusterService.state().metadata().hasIndex(indexName)) {
ActionListener<CreateIndexResponse> actionListener = ActionListener.wrap(r -> {
if (r.isAcknowledged()) {
log.info("create index:{}", indexName);
logger.info("create index:{}", indexName);
internalListener.onResponse(true);
} else {
internalListener.onResponse(false);
}
}, e -> {
log.error("Failed to create index " + indexName, e);
logger.error("Failed to create index " + indexName, e);
internalListener.onFailure(e);
});
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping).settings(indexSettings);
client.admin().indices().create(request, actionListener);
} else {
log.debug("index:{} is already created", indexName);
logger.debug("index:{} is already created", indexName);
if (indexMappingUpdated.containsKey(indexName) && !indexMappingUpdated.get(indexName).get()) {
shouldUpdateIndex(indexName, index.getVersion(), ActionListener.wrap(r -> {
if (r) {
// return true if update index is needed
client
.admin()
.indices()
.putMapping(
new PutMappingRequest().indices(indexName).source(mapping, XContentType.JSON),
ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
UpdateSettingsRequest updateSettingRequest = new UpdateSettingsRequest();
updateSettingRequest.indices(indexName).settings(indexSettings);
client
.admin()
.indices()
.updateSettings(updateSettingRequest, ActionListener.wrap(updateResponse -> {
if (response.isAcknowledged()) {
indexMappingUpdated.get(indexName).set(true);
internalListener.onResponse(true);
} else {
internalListener
.onFailure(new FlowFrameworkException("Failed to update index setting for: " + indexName));
}
}, exception -> {
log.error("Failed to update index setting for: " + indexName, exception);
internalListener.onFailure(exception);
}));
} else {
internalListener.onFailure(new FlowFrameworkException("Failed to update index: " + indexName));
}
}, exception -> {
log.error("Failed to update index " + indexName, exception);
internalListener.onFailure(exception);
})
);
client.admin()
.indices()
.putMapping(
new PutMappingRequest().indices(indexName).source(mapping, XContentType.JSON),
ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
UpdateSettingsRequest updateSettingRequest = new UpdateSettingsRequest();
updateSettingRequest.indices(indexName).settings(indexSettings);
client.admin()
.indices()
.updateSettings(updateSettingRequest, ActionListener.wrap(updateResponse -> {
if (response.isAcknowledged()) {
indexMappingUpdated.get(indexName).set(true);
internalListener.onResponse(true);
} else {
internalListener.onFailure(
new FlowFrameworkException("Failed to update index setting for: " + indexName)
);
}
}, exception -> {
logger.error("Failed to update index setting for: " + indexName, exception);
internalListener.onFailure(exception);
}));
} else {
internalListener.onFailure(new FlowFrameworkException("Failed to update index: " + indexName));
}
}, exception -> {
logger.error("Failed to update index " + indexName, exception);
internalListener.onFailure(exception);
})
);
} else {
// no need to update index if it does not exist or the version is already up-to-date.
indexMappingUpdated.get(indexName).set(true);
internalListener.onResponse(true);
}
}, e -> {
log.error("Failed to update index mapping", e);
logger.error("Failed to update index mapping", e);
internalListener.onFailure(e);
}));
} else {
Expand All @@ -124,7 +124,7 @@ public void initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex index, ActionListe
}
}
} catch (Exception e) {
log.error("Failed to init index " + indexName, e);
logger.error("Failed to init index " + indexName, e);
listener.onFailure(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.indices;

import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.action.ActionListener;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2)
public class FlowFrameworkIndicesHandlerTests extends OpenSearchIntegTestCase {

ClusterService clusterService;
Client client;
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;

@Before
public void setUp() {
clusterService = clusterService();
client = client();
flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(clusterService, client);
}

public void testInitGlobalContextIndex() {
ActionListener<Boolean> listener = ActionListener.wrap(r -> { assertTrue(r); }, e -> { throw new RuntimeException(e); });
flowFrameworkIndicesHandler.initGlobalContextIndexIfAbsent(listener);
}

}

0 comments on commit 14b9a95

Please sign in to comment.