From 1548a84c428177ee508587f90a00e01dafd3ab1a Mon Sep 17 00:00:00 2001 From: sricharanvuppu <113983630+sricharanvuppu@users.noreply.github.com> Date: Tue, 10 Jan 2023 14:50:59 +0530 Subject: [PATCH] Updating multi-field mapping at follower (#671) (#680) * Updating multi-field mapping at follower Signed-off-by: sricharanvuppu Signed-off-by: sricharanvuppu (cherry picked from commit 4d12d6cfb17e2dbac02fdee815aff9bc95062b0a) (cherry picked from commit 475d8da7456fca22c0f022027b8802a14ddb0150) Signed-off-by: sricharanvuppu --- .../task/index/IndexReplicationTask.kt | 34 +++++++++ .../integ/rest/StartReplicationIT.kt | 76 +++++++++++++++++++ 2 files changed, 110 insertions(+) diff --git a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt index 906312ac..b4662aa3 100644 --- a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt @@ -55,6 +55,9 @@ import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest import org.opensearch.action.admin.indices.delete.DeleteIndexRequest import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest +import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest +import org.opensearch.action.support.IndicesOptions import org.opensearch.client.Client import org.opensearch.client.Requests import org.opensearch.cluster.ClusterChangedEvent @@ -75,6 +78,7 @@ import org.opensearch.common.unit.ByteSizeValue import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.ToXContentObject import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentType import org.opensearch.index.Index import org.opensearch.index.IndexService import org.opensearch.index.IndexSettings @@ -88,6 +92,7 @@ import org.opensearch.persistent.PersistentTasksCustomMetadata.PersistentTask import org.opensearch.persistent.PersistentTasksNodeService import org.opensearch.persistent.PersistentTasksService import org.opensearch.replication.ReplicationException +import org.opensearch.replication.MappingNotAvailableException import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING import org.opensearch.rest.RestStatus import org.opensearch.tasks.TaskId @@ -100,6 +105,7 @@ import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine import kotlin.streams.toList +import org.opensearch.cluster.DiffableUtils open class IndexReplicationTask(id: Long, type: String, action: String, description: String, parentTask: TaskId, @@ -395,6 +401,19 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript } } + private suspend fun UpdateFollowereMapping(followerIndex: String,mappingSource: String) { + + val options = IndicesOptions.strictSingleIndexNoExpandForbidClosed() + if (null == mappingSource) { + throw MappingNotAvailableException("MappingSource is not available") + } + val putMappingRequest = PutMappingRequest().indices(followerIndex).indicesOptions(options) + .source(mappingSource, XContentType.JSON) + val updateMappingRequest = UpdateMetadataRequest(followerIndex, UpdateMetadataRequest.Type.MAPPING, putMappingRequest) + client.suspendExecute(UpdateMetadataAction.INSTANCE, updateMappingRequest, injectSecurityContext = true) + log.debug("Mappings synced for $followerIndex") + } + private suspend fun pollForMetadata(scope: CoroutineScope) { while (scope.isActive) { try { @@ -535,6 +554,21 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript } else { metadataUpdate = null } + val options = IndicesOptions.strictSingleIndexNoExpandForbidClosed() + var gmr = GetMappingsRequest().indices(this.leaderIndex.name).indicesOptions(options) + var mappingResponse = remoteClient.suspending(remoteClient.admin().indices()::getMappings, injectSecurityContext = true)(gmr) + var leaderMappingSource = mappingResponse.mappings.get(this.leaderIndex.name).source().toString() + val leaderProperties = mappingResponse.mappings().get(this.leaderIndex.name).sourceAsMap().toMap().get("properties") as Map + gmr = GetMappingsRequest().indices(this.followerIndexName).indicesOptions(options) + mappingResponse = client.suspending(client.admin().indices()::getMappings, injectSecurityContext = true)(gmr) + val followerProperties = mappingResponse.mappings().get(this.followerIndexName).sourceAsMap().toMap().get("properties") as Map + for(iter in followerProperties) { + if(leaderProperties.containsKey(iter.key) && leaderProperties.getValue(iter.key).toString()!=(iter.value).toString()){ + log.info("Updating Multi-field Mapping at Follower") + UpdateFollowereMapping(this.followerIndexName,leaderMappingSource) + break; + } + } } catch (e: Exception) { log.error("Error in getting the required metadata ${e.stackTraceToString()}") diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt index 6946303c..01f280e0 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt @@ -12,6 +12,7 @@ package org.opensearch.replication.integ.rest +import kotlinx.coroutines.delay import org.opensearch.replication.IndexUtil import org.opensearch.replication.MultiClusterAnnotations import org.opensearch.replication.MultiClusterRestTestCase @@ -77,6 +78,7 @@ import java.util.* import java.util.concurrent.TimeUnit + @MultiClusterAnnotations.ClusterConfigurations( MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER), MultiClusterAnnotations.ClusterConfiguration(clusterName = FOLLOWER) @@ -1009,6 +1011,80 @@ class StartReplicationIT: MultiClusterRestTestCase() { .hasMessageContaining("Primary shards in the Index[source:${leaderIndexName}] are not active") } + fun `test that follower index mapping updates when leader index gets multi-field mapping`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + createConnectionBetweenClusters(FOLLOWER, LEADER) + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + var putMappingRequest = PutMappingRequest(leaderIndexName) + putMappingRequest.source("{\"properties\":{\"field1\":{\"type\":\"text\"}}}", XContentType.JSON) + leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) + val sourceMap = mapOf("field1" to randomAlphaOfLength(5)) + leaderClient.index(IndexRequest(leaderIndexName).id("1").source(sourceMap), RequestOptions.DEFAULT) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + Assert.assertEquals( + leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) + .mappings()[leaderIndexName], + followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) + .mappings()[followerIndexName] + ) + putMappingRequest = PutMappingRequest(leaderIndexName) + putMappingRequest.source("{\"properties\":{\"field1\":{\"type\":\"text\",\"fields\":{\"field2\":{\"type\":\"text\",\"analyzer\":\"standard\"},\"field3\":{\"type\":\"text\",\"analyzer\":\"standard\"}}}}}",XContentType.JSON) + leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) + val leaderMappings = leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) + .mappings()[leaderIndexName] + TimeUnit.MINUTES.sleep(2) + Assert.assertEquals( + leaderMappings, + followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) + .mappings()[followerIndexName] + ) + } + + fun `test that follower index mapping does not update when only new fields are added but not respective docs in leader index`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + createConnectionBetweenClusters(FOLLOWER, LEADER) + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + var putMappingRequest = PutMappingRequest(leaderIndexName) + putMappingRequest.source("{\"properties\":{\"name\":{\"type\":\"text\"}}}", XContentType.JSON) + leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) + val sourceMap = mapOf("name" to randomAlphaOfLength(5)) + leaderClient.index(IndexRequest(leaderIndexName).id("1").source(sourceMap), RequestOptions.DEFAULT) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + Assert.assertEquals( + leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) + .mappings()[leaderIndexName], + followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) + .mappings()[followerIndexName] + ) + putMappingRequest = PutMappingRequest(leaderIndexName) + putMappingRequest.source("{\"properties\":{\"name\":{\"type\":\"text\"},\"age\":{\"type\":\"integer\"}}}",XContentType.JSON) + leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) + val leaderMappings = leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) + .mappings()[leaderIndexName] + TimeUnit.MINUTES.sleep(2) + Assert.assertNotEquals( + leaderMappings, + followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) + .mappings()[followerIndexName] + ) + } + private fun excludeAllClusterNodes(clusterName: String) { val transientSettingsRequest = Request("PUT", "_cluster/settings") // Get IPs directly from the cluster to handle all cases - single node cluster, multi node cluster and remote test cluster.