From 14b9a9539a0814d2319e5e086de978f80276bfab Mon Sep 17 00:00:00 2001 From: Jackie Han Date: Thu, 21 Sep 2023 09:59:46 -0700 Subject: [PATCH] Update global context index mapping Signed-off-by: Jackie Han --- build.gradle | 1 - .../flowframework/constant/CommonValue.java | 141 +++++++++--------- .../exception/FlowFrameworkException.java | 17 +++ .../indices/FlowFrameworkIndex.java | 6 +- .../indices/FlowFrameworkIndicesHandler.java | 94 ++++++------ .../FlowFrameworkIndicesHandlerTests.java | 36 +++++ 6 files changed, 172 insertions(+), 123 deletions(-) create mode 100644 src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java diff --git a/build.gradle b/build.gradle index 375853f4b..748757484 100644 --- a/build.gradle +++ b/build.gradle @@ -105,7 +105,6 @@ repositories { dependencies { implementation "org.opensearch:opensearch:${opensearch_version}" implementation 'org.junit.jupiter:junit-jupiter:5.10.0' - implementation 'org.projectlombok:lombok:1.18.22' compileOnly "com.google.guava:guava:32.1.2-jre" api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}" diff --git a/src/main/java/org/opensearch/flowframework/constant/CommonValue.java b/src/main/java/org/opensearch/flowframework/constant/CommonValue.java index 087de302e..a2d427836 100644 --- a/src/main/java/org/opensearch/flowframework/constant/CommonValue.java +++ b/src/main/java/org/opensearch/flowframework/constant/CommonValue.java @@ -14,74 +14,75 @@ public class CommonValue { public static final String META = "_meta"; public static final String SCHEMA_VERSION_FIELD = "schema_version"; public static final Integer GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION = 1; - public static final String GLOBAL_CONTEXT_INDEX_MAPPING = - "{\n " + - " \"dynamic\": false,\n" + - " \"_meta\": {\n" + - " \"schema_version\": 1\n" + - " },\n" + - " \"properties\": {\n" + - " \"pipeline_id\": {\n" + - " \"type\": \"keyword\"\n"+ - " },\n" + - " \"name\": {\n" + - " \"type\": \"text\",\n" + - " \"fields\": {\n" + - " \"keyword\": {\n" + - " \"type\": \"keyword\",\n" + - " \"ignore_above\": 256\n" + - " }\n" + - " }\n" + - " },\n" + - " \"description\": {\n" + - " \"type\": \"text\"\n" + - " },\n" + - " \"use_case\": {\n" + - " \"type\": \"keyword\"\n" + - " },\n" + - " \"operations\": {\n" + - " \"type\": \"keyword\"\n" + - " },\n" + - " \"version\": {\n" + - " \"type\": \"nested\",\n" + - " \"properties\": {\n" + - " \"template\": {\n" + - " \"type\": \"integer\"\n" + - " },\n" + - " \"compatibility\": {\n" + - " \"type\": \"integer\"\n" + - " }\n" + - " }\n" + - " },\n" + - " \"user_inputs\": {\n" + - " \"type\": \"nested\",\n" + - " \"properties\": {\n" + - " \"model_id\": {\n" + - " \"type\": \"keyword\"\n" + - " },\n" + - " \"input_field\": {\n" + - " \"type\": \"keyword\"\n" + - " },\n" + - " \"output_field\": {\n" + - " \"type\": \"keyword\"\n" + - " },\n" + - " \"ingest_pipeline_name\": {\n" + - " \"type\": \"keyword\"\n" + - " },\n" + - " \"index_name\": {\n" + - " \"type\": \"keyword\"\n" + - " }\n" + - " }\n" + - " },\n" + - " \"workflows\": {\n" + - " \"type\": \"text\"\n" + - " },\n" + - " \"responses\": {\n" + - " \"type\": \"text\"\n" + - " }\n" + - " \"resources_created\": {\n" + - " \"type\": \"text\"\n" + - " }\n" + - " }\n" + - "}"; + public static final String GLOBAL_CONTEXT_INDEX_MAPPING = "{\n " + + " \"dynamic\": false,\n" + + " \"_meta\": {\n" + + " \"schema_version\": " + + GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION + + "\n" + + " },\n" + + " \"properties\": {\n" + + " \"pipeline_id\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"name\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"description\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"use_case\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"operations\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"version\": {\n" + + " \"type\": \"nested\",\n" + + " \"properties\": {\n" + + " \"template\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"compatibility\": {\n" + + " \"type\": \"integer\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"user_inputs\": {\n" + + " \"type\": \"nested\",\n" + + " \"properties\": {\n" + + " \"model_id\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"input_field\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"output_field\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"ingest_pipeline_name\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"index_name\": {\n" + + " \"type\": \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"workflows\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"responses\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"resources_created\": {\n" + + " \"type\": \"text\"\n" + + " }\n" + + " }\n" + + "}"; } diff --git a/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java b/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java index 8073d5867..d976b107f 100644 --- a/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java +++ b/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java @@ -17,4 +17,21 @@ public class FlowFrameworkException extends RuntimeException { public FlowFrameworkException(String message) { super(message); } + + /** + * Constructor with specified cause. + * @param cause exception cause + */ + public FlowFrameworkException(Throwable cause) { + super(cause); + } + + /** + * Constructor with specified error message adn cause. + * @param message error message + * @param cause exception cause + */ + public FlowFrameworkException(String message, Throwable cause) { + super(message, cause); + } } diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndex.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndex.java index aeb81a48e..b18ab7b57 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndex.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndex.java @@ -13,11 +13,7 @@ import static org.opensearch.flowframework.constant.CommonValue.GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION; public enum FlowFrameworkIndex { - GLOBAL_CONTEXT( - GLOBAL_CONTEXT_INDEX_NAME, - GLOBAL_CONTEXT_INDEX_MAPPING, - GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION - ); + GLOBAL_CONTEXT(GLOBAL_CONTEXT_INDEX_NAME, GLOBAL_CONTEXT_INDEX_MAPPING, GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION); private final String indexName; private final String mapping; diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index 715baf312..55911c40c 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -8,11 +8,8 @@ */ package org.opensearch.flowframework.indices; -import lombok.AccessLevel; -import lombok.RequiredArgsConstructor; -import lombok.experimental.FieldDefaults; -import lombok.extern.log4j.Log4j2; - +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest; @@ -31,16 +28,20 @@ import static org.opensearch.flowframework.constant.CommonValue.*; -@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE) -@RequiredArgsConstructor -@Log4j2 public class FlowFrameworkIndicesHandler { - ClusterService clusterService; - Client client; + private static final Logger logger = LogManager.getLogger(FlowFrameworkIndicesHandler.class); private static final Map indexSettings = Map.of("index.auto_expand_replicas", "0-1"); private static final Map indexMappingUpdated = new HashMap<>(); + private ClusterService clusterService; + private Client client; + + public FlowFrameworkIndicesHandler(ClusterService clusterService, Client client) { + this.clusterService = clusterService; + this.client = client; + } + static { for (FlowFrameworkIndex flowFrameworkIndex : FlowFrameworkIndex.values()) { indexMappingUpdated.put(flowFrameworkIndex.getIndexName(), new AtomicBoolean(false)); @@ -60,62 +61,61 @@ public void initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex index, ActionListe if (!clusterService.state().metadata().hasIndex(indexName)) { ActionListener actionListener = ActionListener.wrap(r -> { if (r.isAcknowledged()) { - log.info("create index:{}", indexName); + logger.info("create index:{}", indexName); internalListener.onResponse(true); } else { internalListener.onResponse(false); } }, e -> { - log.error("Failed to create index " + indexName, e); + logger.error("Failed to create index " + indexName, e); internalListener.onFailure(e); }); CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping).settings(indexSettings); client.admin().indices().create(request, actionListener); } else { - log.debug("index:{} is already created", indexName); + logger.debug("index:{} is already created", indexName); if (indexMappingUpdated.containsKey(indexName) && !indexMappingUpdated.get(indexName).get()) { shouldUpdateIndex(indexName, index.getVersion(), ActionListener.wrap(r -> { if (r) { // return true if update index is needed - client - .admin() - .indices() - .putMapping( - new PutMappingRequest().indices(indexName).source(mapping, XContentType.JSON), - ActionListener.wrap(response -> { - if (response.isAcknowledged()) { - UpdateSettingsRequest updateSettingRequest = new UpdateSettingsRequest(); - updateSettingRequest.indices(indexName).settings(indexSettings); - client - .admin() - .indices() - .updateSettings(updateSettingRequest, ActionListener.wrap(updateResponse -> { - if (response.isAcknowledged()) { - indexMappingUpdated.get(indexName).set(true); - internalListener.onResponse(true); - } else { - internalListener - .onFailure(new FlowFrameworkException("Failed to update index setting for: " + indexName)); - } - }, exception -> { - log.error("Failed to update index setting for: " + indexName, exception); - internalListener.onFailure(exception); - })); - } else { - internalListener.onFailure(new FlowFrameworkException("Failed to update index: " + indexName)); - } - }, exception -> { - log.error("Failed to update index " + indexName, exception); - internalListener.onFailure(exception); - }) - ); + client.admin() + .indices() + .putMapping( + new PutMappingRequest().indices(indexName).source(mapping, XContentType.JSON), + ActionListener.wrap(response -> { + if (response.isAcknowledged()) { + UpdateSettingsRequest updateSettingRequest = new UpdateSettingsRequest(); + updateSettingRequest.indices(indexName).settings(indexSettings); + client.admin() + .indices() + .updateSettings(updateSettingRequest, ActionListener.wrap(updateResponse -> { + if (response.isAcknowledged()) { + indexMappingUpdated.get(indexName).set(true); + internalListener.onResponse(true); + } else { + internalListener.onFailure( + new FlowFrameworkException("Failed to update index setting for: " + indexName) + ); + } + }, exception -> { + logger.error("Failed to update index setting for: " + indexName, exception); + internalListener.onFailure(exception); + })); + } else { + internalListener.onFailure(new FlowFrameworkException("Failed to update index: " + indexName)); + } + }, exception -> { + logger.error("Failed to update index " + indexName, exception); + internalListener.onFailure(exception); + }) + ); } else { // no need to update index if it does not exist or the version is already up-to-date. indexMappingUpdated.get(indexName).set(true); internalListener.onResponse(true); } }, e -> { - log.error("Failed to update index mapping", e); + logger.error("Failed to update index mapping", e); internalListener.onFailure(e); })); } else { @@ -124,7 +124,7 @@ public void initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex index, ActionListe } } } catch (Exception e) { - log.error("Failed to init index " + indexName, e); + logger.error("Failed to init index " + indexName, e); listener.onFailure(e); } } diff --git a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java new file mode 100644 index 000000000..6f0a8b340 --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.indices; + +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.action.ActionListener; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Before; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2) +public class FlowFrameworkIndicesHandlerTests extends OpenSearchIntegTestCase { + + ClusterService clusterService; + Client client; + FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; + + @Before + public void setUp() { + clusterService = clusterService(); + client = client(); + flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(clusterService, client); + } + + public void testInitGlobalContextIndex() { + ActionListener listener = ActionListener.wrap(r -> { assertTrue(r); }, e -> { throw new RuntimeException(e); }); + flowFrameworkIndicesHandler.initGlobalContextIndexIfAbsent(listener); + } + +}