diff --git a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt index e499813d..6a81bd59 100644 --- a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt @@ -55,6 +55,9 @@ import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest import org.opensearch.action.admin.indices.delete.DeleteIndexRequest import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest +import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest +import org.opensearch.action.support.IndicesOptions import org.opensearch.client.Client import org.opensearch.client.Requests import org.opensearch.cluster.ClusterChangedEvent @@ -75,6 +78,7 @@ import org.opensearch.common.unit.ByteSizeValue import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.ToXContentObject import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentType import org.opensearch.index.Index import org.opensearch.index.IndexService import org.opensearch.index.IndexSettings @@ -88,6 +92,7 @@ import org.opensearch.persistent.PersistentTasksCustomMetadata.PersistentTask import org.opensearch.persistent.PersistentTasksNodeService import org.opensearch.persistent.PersistentTasksService import org.opensearch.replication.ReplicationException +import org.opensearch.replication.MappingNotAvailableException import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING import org.opensearch.rest.RestStatus import org.opensearch.tasks.TaskId @@ -100,6 +105,7 @@ import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine import kotlin.streams.toList +import org.opensearch.cluster.DiffableUtils open class IndexReplicationTask(id: Long, type: String, action: String, description: String, parentTask: TaskId, @@ -405,6 +411,19 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript } } + private suspend fun updateFollowerMapping(followerIndex: String,mappingSource: String?) { + + val options = IndicesOptions.strictSingleIndexNoExpandForbidClosed() + if (null == mappingSource) { + throw MappingNotAvailableException("MappingSource is not available") + } + val putMappingRequest = PutMappingRequest().indices(followerIndex).indicesOptions(options).type("_doc") + .source(mappingSource, XContentType.JSON) + val updateMappingRequest = UpdateMetadataRequest(followerIndex, UpdateMetadataRequest.Type.MAPPING, putMappingRequest) + client.suspendExecute(UpdateMetadataAction.INSTANCE, updateMappingRequest, injectSecurityContext = true) + log.debug("Mappings synced for $followerIndex") + } + private suspend fun pollForMetadata(scope: CoroutineScope) { while (scope.isActive) { try { @@ -540,6 +559,25 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript } else { metadataUpdate = null } + val options = IndicesOptions.strictSingleIndexNoExpandForbidClosed() + var gmr = GetMappingsRequest().indices(this.leaderIndex.name).indicesOptions(options) + var mappingResponse = remoteClient.suspending(remoteClient.admin().indices()::getMappings, injectSecurityContext = true)(gmr) + var leaderMappingSource = mappingResponse?.mappings()?.get(this.leaderIndex.name)?.get("_doc")?.source()?.toString() + @Suppress("UNCHECKED_CAST") + val leaderProperties = mappingResponse?.mappings()?.get(this.leaderIndex.name)?.get("_doc")?.sourceAsMap()?.toMap()?.get("properties") as? Map? + gmr = GetMappingsRequest().indices(this.followerIndexName).indicesOptions(options) + mappingResponse = client.suspending(client.admin().indices()::getMappings, injectSecurityContext = true)(gmr) + @Suppress("UNCHECKED_CAST") + val followerProperties = mappingResponse?.mappings()?.get(this.followerIndexName)?.get("_doc")?.sourceAsMap()?.toMap()?.get("properties") as? Map? + run updateMappingLoop@ { + followerProperties?.forEach { iter -> + if (leaderProperties?.containsKey(iter.key) == true && leaderProperties.getValue(iter.key).toString() != (iter.value).toString()) { + log.debug("Updating Multi-field Mapping at Follower") + updateFollowerMapping(this.followerIndexName, leaderMappingSource) + return@updateMappingLoop + } + } + } } catch (e: Exception) { log.error("Error in getting the required metadata ${e.stackTraceToString()}") diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt index 8ef9a93d..702befeb 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt @@ -12,6 +12,7 @@ package org.opensearch.replication.integ.rest +import kotlinx.coroutines.delay import org.opensearch.replication.IndexUtil import org.opensearch.replication.MultiClusterAnnotations import org.opensearch.replication.MultiClusterRestTestCase @@ -74,6 +75,7 @@ import java.util.* import java.util.concurrent.TimeUnit + @MultiClusterAnnotations.ClusterConfigurations( MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER), MultiClusterAnnotations.ClusterConfiguration(clusterName = FOLLOWER) @@ -984,6 +986,79 @@ class StartReplicationIT: MultiClusterRestTestCase() { waitForRestore = true) } + fun `test that follower index mapping updates when leader index gets multi-field mapping`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + createConnectionBetweenClusters(FOLLOWER, LEADER) + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + var putMappingRequest = PutMappingRequest(leaderIndexName) + putMappingRequest.source("{\"properties\":{\"field1\":{\"type\":\"text\"}}}", XContentType.JSON) + leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) + val sourceMap = mapOf("field1" to randomAlphaOfLength(5)) + leaderClient.index(IndexRequest(leaderIndexName).id("1").source(sourceMap), RequestOptions.DEFAULT) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + Assert.assertEquals( + leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) + .mappings()[leaderIndexName], + followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) + .mappings()[followerIndexName] + ) + putMappingRequest = PutMappingRequest(leaderIndexName) + putMappingRequest.source("{\"properties\":{\"field1\":{\"type\":\"text\",\"fields\":{\"field2\":{\"type\":\"text\",\"analyzer\":\"standard\"},\"field3\":{\"type\":\"text\",\"analyzer\":\"standard\"}}}}}",XContentType.JSON) + leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) + val leaderMappings = leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) + .mappings()[leaderIndexName] + TimeUnit.MINUTES.sleep(2) + Assert.assertEquals( + leaderMappings, + followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) + .mappings()[followerIndexName] + ) + } + + fun `test that follower index mapping does not update when only new fields are added but not respective docs in leader index`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + createConnectionBetweenClusters(FOLLOWER, LEADER) + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + var putMappingRequest = PutMappingRequest(leaderIndexName) + putMappingRequest.source("{\"properties\":{\"name\":{\"type\":\"text\"}}}", XContentType.JSON) + leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) + val sourceMap = mapOf("name" to randomAlphaOfLength(5)) + leaderClient.index(IndexRequest(leaderIndexName).id("1").source(sourceMap), RequestOptions.DEFAULT) + followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), + waitForRestore = true) + assertBusy { + assertThat(followerClient.indices() + .exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT)) + .isEqualTo(true) + } + Assert.assertEquals( + leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) + .mappings()[leaderIndexName], + followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) + .mappings()[followerIndexName] + ) + putMappingRequest = PutMappingRequest(leaderIndexName) + putMappingRequest.source("{\"properties\":{\"name\":{\"type\":\"text\"},\"age\":{\"type\":\"integer\"}}}",XContentType.JSON) + leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT) + val leaderMappings = leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT) + .mappings()[leaderIndexName] + TimeUnit.MINUTES.sleep(2) + Assert.assertNotEquals( + leaderMappings, + followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT) + .mappings()[followerIndexName] + ) + } private fun assertValidationFailure(client: RestHighLevelClient, leader: String, follower: String, errrorMsg: String) { assertThatThrownBy { client.startReplication(StartReplicationRequest("source", leader, follower))