Skip to content

Commit

Permalink
Add global context index and indices handler
Browse files Browse the repository at this point in the history
Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang committed Sep 18, 2023
1 parent 0f0b65d commit 1096170
Show file tree
Hide file tree
Showing 6 changed files with 322 additions and 0 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ repositories {
dependencies {
implementation "org.opensearch:opensearch:${opensearch_version}"
implementation 'org.junit.jupiter:junit-jupiter:5.10.0'
implementation 'org.projectlombok:lombok:1.18.22'
compileOnly "com.google.guava:guava:32.1.2-jre"
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.constant;

public class CommonName {
public static final String GLOBAL_CONTEXT_INDEX_NAME = ".opensearch-flow-framework-global-context";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.constant;

public class CommonValue {

public static Integer NO_SCHEMA_VERSION = 0;
public static final String META = "_meta";
public static final String SCHEMA_VERSION_FIELD = "schema_version";
public static final Integer GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION = 1;
public static final String GLOBAL_CONTEXT_INDEX_MAPPING =
"{\n " +
" \"dynamic\": false,\n" +
" \"_meta\": {\n" +
" \"schema_version\": 1\n" +
" },\n" +
" \"properties\": {\n" +
" \"pipeline_id\": {\n" +
" \"type\": \"keyword\"\n"+
" },\n" +
" \"name\": {\n" +
" \"type\": \"text\",\n" +
" \"fields\": {\n" +
" \"keyword\": {\n" +
" \"type\": \"keyword\",\n" +
" \"ignore_above\": 256\n" +
" }\n" +
" }\n" +
" },\n" +
" \"description\": {\n" +
" \"type\": \"text\"\n" +
" },\n" +
" \"use_case\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"operations\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"version\": {\n" +
" \"type\": \"nested\",\n" +
" \"properties\": {\n" +
" \"template\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"compatibility\": {\n" +
" \"type\": \"integer\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"user_inputs\": {\n" +
" \"type\": \"nested\",\n" +
" \"properties\": {\n" +
" \"model_id\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"input_field\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"output_field\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"ingest_pipeline_name\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"index_name\": {\n" +
" \"type\": \"keyword\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"workflows\": {\n" +
" \"type\": \"text\"\n" +
" },\n" +
" \"responses\": {\n" +
" \"type\": \"text\"\n" +
" }\n" +
" \"resources_created\": {\n" +
" \"type\": \"text\"\n" +
" }\n" +
" }\n" +
"}";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.exception;

public class FlowFrameworkException extends RuntimeException {
/**
* Constructor with error message.
*
* @param message message of the exception
*/
public FlowFrameworkException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.indices;

import static org.opensearch.flowframework.constant.CommonName.GLOBAL_CONTEXT_INDEX_NAME;
import static org.opensearch.flowframework.constant.CommonValue.GLOBAL_CONTEXT_INDEX_MAPPING;
import static org.opensearch.flowframework.constant.CommonValue.GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION;

public enum FlowFrameworkIndex {
GLOBAL_CONTEXT(
GLOBAL_CONTEXT_INDEX_NAME,
GLOBAL_CONTEXT_INDEX_MAPPING,
GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION
);

private final String indexName;
private final String mapping;
private final Integer version;

FlowFrameworkIndex(String name, String mapping, Integer version) {
this.indexName = name;
this.mapping = mapping;
this.version = version;
}

public String getIndexName() {
return indexName;
}

public String getMapping() {
return mapping;
}

public Integer getVersion() {
return version;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.indices;

import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.experimental.FieldDefaults;
import lombok.extern.log4j.Log4j2;

import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.FlowFrameworkException;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.flowframework.constant.CommonValue.*;

@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
@RequiredArgsConstructor
@Log4j2
public class FlowFrameworkIndicesHandler {

ClusterService clusterService;
Client client;
private static final Map<String, Object> indexSettings = Map.of("index.auto_expand_replicas", "0-1");
private static final Map<String, AtomicBoolean> indexMappingUpdated = new HashMap<>();

static {
for (FlowFrameworkIndex flowFrameworkIndex : FlowFrameworkIndex.values()) {
indexMappingUpdated.put(flowFrameworkIndex.getIndexName(), new AtomicBoolean(false));
}
}

public void initGlobalContextIndexIfAbsent(ActionListener<Boolean> listener) {
initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex.GLOBAL_CONTEXT, listener);
}

public void initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex index, ActionListener<Boolean> listener) {
String indexName = index.getIndexName();
String mapping = index.getMapping();

try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
ActionListener<Boolean> internalListener = ActionListener.runBefore(listener, () -> threadContext.restore());
if (!clusterService.state().metadata().hasIndex(indexName)) {
ActionListener<CreateIndexResponse> actionListener = ActionListener.wrap(r -> {
if (r.isAcknowledged()) {
log.info("create index:{}", indexName);
internalListener.onResponse(true);
} else {
internalListener.onResponse(false);
}
}, e -> {
log.error("Failed to create index " + indexName, e);
internalListener.onFailure(e);
});
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping).settings(indexSettings);
client.admin().indices().create(request, actionListener);
} else {
log.debug("index:{} is already created", indexName);
if (indexMappingUpdated.containsKey(indexName) && !indexMappingUpdated.get(indexName).get()) {
shouldUpdateIndex(indexName, index.getVersion(), ActionListener.wrap(r -> {
if (r) {
// return true if update index is needed
client
.admin()
.indices()
.putMapping(
new PutMappingRequest().indices(indexName).source(mapping, XContentType.JSON),
ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
UpdateSettingsRequest updateSettingRequest = new UpdateSettingsRequest();
updateSettingRequest.indices(indexName).settings(indexSettings);
client
.admin()
.indices()
.updateSettings(updateSettingRequest, ActionListener.wrap(updateResponse -> {
if (response.isAcknowledged()) {
indexMappingUpdated.get(indexName).set(true);
internalListener.onResponse(true);
} else {
internalListener
.onFailure(new FlowFrameworkException("Failed to update index setting for: " + indexName));
}
}, exception -> {
log.error("Failed to update index setting for: " + indexName, exception);
internalListener.onFailure(exception);
}));
} else {
internalListener.onFailure(new FlowFrameworkException("Failed to update index: " + indexName));
}
}, exception -> {
log.error("Failed to update index " + indexName, exception);
internalListener.onFailure(exception);
})
);
} else {
// no need to update index if it does not exist or the version is already up-to-date.
indexMappingUpdated.get(indexName).set(true);
internalListener.onResponse(true);
}
}, e -> {
log.error("Failed to update index mapping", e);
internalListener.onFailure(e);
}));
} else {
// No need to update index if it's already updated.
internalListener.onResponse(true);
}
}
} catch (Exception e) {
log.error("Failed to init index " + indexName, e);
listener.onFailure(e);
}
}

/**
* Check if we should update index based on schema version.
* @param indexName index name
* @param newVersion new index mapping version
* @param listener action listener, if update index is needed, will pass true to its onResponse method
*/
public void shouldUpdateIndex(String indexName, Integer newVersion, ActionListener<Boolean> listener) {
IndexMetadata indexMetaData = clusterService.state().getMetadata().indices().get(indexName);
if (indexMetaData == null) {
listener.onResponse(Boolean.FALSE);
return;
}
Integer oldVersion = NO_SCHEMA_VERSION;
Map<String, Object> indexMapping = indexMetaData.mapping().getSourceAsMap();
Object meta = indexMapping.get(META);
if (meta != null && meta instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> metaMapping = (Map<String, Object>) meta;
Object schemaVersion = metaMapping.get(SCHEMA_VERSION_FIELD);
if (schemaVersion instanceof Integer) {
oldVersion = (Integer) schemaVersion;
}
}
listener.onResponse(newVersion > oldVersion);
}
}

0 comments on commit 1096170

Please sign in to comment.