From 10961700dc8af081850547e3871c4b257d9baeb9 Mon Sep 17 00:00:00 2001 From: Jackie Han Date: Mon, 18 Sep 2023 10:02:06 -0700 Subject: [PATCH] Add global context index and indices handler Signed-off-by: Jackie Han --- build.gradle | 1 + .../flowframework/constant/CommonName.java | 14 ++ .../flowframework/constant/CommonValue.java | 87 ++++++++++ .../exception/FlowFrameworkException.java | 20 +++ .../indices/FlowFrameworkIndex.java | 43 +++++ .../indices/FlowFrameworkIndicesHandler.java | 157 ++++++++++++++++++ 6 files changed, 322 insertions(+) create mode 100644 src/main/java/org/opensearch/flowframework/constant/CommonName.java create mode 100644 src/main/java/org/opensearch/flowframework/constant/CommonValue.java create mode 100644 src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java create mode 100644 src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndex.java create mode 100644 src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java diff --git a/build.gradle b/build.gradle index 748757484..375853f4b 100644 --- a/build.gradle +++ b/build.gradle @@ -105,6 +105,7 @@ repositories { dependencies { implementation "org.opensearch:opensearch:${opensearch_version}" implementation 'org.junit.jupiter:junit-jupiter:5.10.0' + implementation 'org.projectlombok:lombok:1.18.22' compileOnly "com.google.guava:guava:32.1.2-jre" api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}" diff --git a/src/main/java/org/opensearch/flowframework/constant/CommonName.java b/src/main/java/org/opensearch/flowframework/constant/CommonName.java new file mode 100644 index 000000000..ee4ed9642 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/constant/CommonName.java @@ -0,0 +1,14 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.constant; + +public class CommonName { + public static final String GLOBAL_CONTEXT_INDEX_NAME = ".opensearch-flow-framework-global-context"; + +} diff --git a/src/main/java/org/opensearch/flowframework/constant/CommonValue.java b/src/main/java/org/opensearch/flowframework/constant/CommonValue.java new file mode 100644 index 000000000..087de302e --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/constant/CommonValue.java @@ -0,0 +1,87 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.constant; + +public class CommonValue { + + public static Integer NO_SCHEMA_VERSION = 0; + public static final String META = "_meta"; + public static final String SCHEMA_VERSION_FIELD = "schema_version"; + public static final Integer GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION = 1; + public static final String GLOBAL_CONTEXT_INDEX_MAPPING = + "{\n " + + " \"dynamic\": false,\n" + + " \"_meta\": {\n" + + " \"schema_version\": 1\n" + + " },\n" + + " \"properties\": {\n" + + " \"pipeline_id\": {\n" + + " \"type\": \"keyword\"\n"+ + " },\n" + + " \"name\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"description\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"use_case\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"operations\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"version\": {\n" + + " \"type\": \"nested\",\n" + + " \"properties\": {\n" + + " \"template\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"compatibility\": {\n" + + " \"type\": \"integer\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"user_inputs\": {\n" + + " \"type\": \"nested\",\n" + + " \"properties\": {\n" + + " \"model_id\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"input_field\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"output_field\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"ingest_pipeline_name\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"index_name\": {\n" + + " \"type\": \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"workflows\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"responses\": {\n" + + " \"type\": \"text\"\n" + + " }\n" + + " \"resources_created\": {\n" + + " \"type\": \"text\"\n" + + " }\n" + + " }\n" + + "}"; +} diff --git a/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java b/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java new file mode 100644 index 000000000..8073d5867 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java @@ -0,0 +1,20 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.exception; + +public class FlowFrameworkException extends RuntimeException { + /** + * Constructor with error message. + * + * @param message message of the exception + */ + public FlowFrameworkException(String message) { + super(message); + } +} diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndex.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndex.java new file mode 100644 index 000000000..aeb81a48e --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndex.java @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.indices; + +import static org.opensearch.flowframework.constant.CommonName.GLOBAL_CONTEXT_INDEX_NAME; +import static org.opensearch.flowframework.constant.CommonValue.GLOBAL_CONTEXT_INDEX_MAPPING; +import static org.opensearch.flowframework.constant.CommonValue.GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION; + +public enum FlowFrameworkIndex { + GLOBAL_CONTEXT( + GLOBAL_CONTEXT_INDEX_NAME, + GLOBAL_CONTEXT_INDEX_MAPPING, + GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION + ); + + private final String indexName; + private final String mapping; + private final Integer version; + + FlowFrameworkIndex(String name, String mapping, Integer version) { + this.indexName = name; + this.mapping = mapping; + this.version = version; + } + + public String getIndexName() { + return indexName; + } + + public String getMapping() { + return mapping; + } + + public Integer getVersion() { + return version; + } +} diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java new file mode 100644 index 000000000..715baf312 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -0,0 +1,157 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.indices; + +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import lombok.experimental.FieldDefaults; +import lombok.extern.log4j.Log4j2; + +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.action.ActionListener; +import org.opensearch.flowframework.exception.FlowFrameworkException; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.opensearch.flowframework.constant.CommonValue.*; + +@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE) +@RequiredArgsConstructor +@Log4j2 +public class FlowFrameworkIndicesHandler { + + ClusterService clusterService; + Client client; + private static final Map indexSettings = Map.of("index.auto_expand_replicas", "0-1"); + private static final Map indexMappingUpdated = new HashMap<>(); + + static { + for (FlowFrameworkIndex flowFrameworkIndex : FlowFrameworkIndex.values()) { + indexMappingUpdated.put(flowFrameworkIndex.getIndexName(), new AtomicBoolean(false)); + } + } + + public void initGlobalContextIndexIfAbsent(ActionListener listener) { + initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex.GLOBAL_CONTEXT, listener); + } + + public void initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex index, ActionListener listener) { + String indexName = index.getIndexName(); + String mapping = index.getMapping(); + + try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) { + ActionListener internalListener = ActionListener.runBefore(listener, () -> threadContext.restore()); + if (!clusterService.state().metadata().hasIndex(indexName)) { + ActionListener actionListener = ActionListener.wrap(r -> { + if (r.isAcknowledged()) { + log.info("create index:{}", indexName); + internalListener.onResponse(true); + } else { + internalListener.onResponse(false); + } + }, e -> { + log.error("Failed to create index " + indexName, e); + internalListener.onFailure(e); + }); + CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping).settings(indexSettings); + client.admin().indices().create(request, actionListener); + } else { + log.debug("index:{} is already created", indexName); + if (indexMappingUpdated.containsKey(indexName) && !indexMappingUpdated.get(indexName).get()) { + shouldUpdateIndex(indexName, index.getVersion(), ActionListener.wrap(r -> { + if (r) { + // return true if update index is needed + client + .admin() + .indices() + .putMapping( + new PutMappingRequest().indices(indexName).source(mapping, XContentType.JSON), + ActionListener.wrap(response -> { + if (response.isAcknowledged()) { + UpdateSettingsRequest updateSettingRequest = new UpdateSettingsRequest(); + updateSettingRequest.indices(indexName).settings(indexSettings); + client + .admin() + .indices() + .updateSettings(updateSettingRequest, ActionListener.wrap(updateResponse -> { + if (response.isAcknowledged()) { + indexMappingUpdated.get(indexName).set(true); + internalListener.onResponse(true); + } else { + internalListener + .onFailure(new FlowFrameworkException("Failed to update index setting for: " + indexName)); + } + }, exception -> { + log.error("Failed to update index setting for: " + indexName, exception); + internalListener.onFailure(exception); + })); + } else { + internalListener.onFailure(new FlowFrameworkException("Failed to update index: " + indexName)); + } + }, exception -> { + log.error("Failed to update index " + indexName, exception); + internalListener.onFailure(exception); + }) + ); + } else { + // no need to update index if it does not exist or the version is already up-to-date. + indexMappingUpdated.get(indexName).set(true); + internalListener.onResponse(true); + } + }, e -> { + log.error("Failed to update index mapping", e); + internalListener.onFailure(e); + })); + } else { + // No need to update index if it's already updated. + internalListener.onResponse(true); + } + } + } catch (Exception e) { + log.error("Failed to init index " + indexName, e); + listener.onFailure(e); + } + } + + /** + * Check if we should update index based on schema version. + * @param indexName index name + * @param newVersion new index mapping version + * @param listener action listener, if update index is needed, will pass true to its onResponse method + */ + public void shouldUpdateIndex(String indexName, Integer newVersion, ActionListener listener) { + IndexMetadata indexMetaData = clusterService.state().getMetadata().indices().get(indexName); + if (indexMetaData == null) { + listener.onResponse(Boolean.FALSE); + return; + } + Integer oldVersion = NO_SCHEMA_VERSION; + Map indexMapping = indexMetaData.mapping().getSourceAsMap(); + Object meta = indexMapping.get(META); + if (meta != null && meta instanceof Map) { + @SuppressWarnings("unchecked") + Map metaMapping = (Map) meta; + Object schemaVersion = metaMapping.get(SCHEMA_VERSION_FIELD); + if (schemaVersion instanceof Integer) { + oldVersion = (Integer) schemaVersion; + } + } + listener.onResponse(newVersion > oldVersion); + } +}