Skip to content

Commit

Permalink
Merge c6b3077 into 316c3a1
Browse files Browse the repository at this point in the history
  • Loading branch information
sricharanvuppu authored Mar 23, 2023
2 parents 316c3a1 + c6b3077 commit 29ac04b
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.support.IndicesOptions
import org.opensearch.client.Client
import org.opensearch.client.Requests
import org.opensearch.cluster.ClusterChangedEvent
Expand All @@ -75,6 +78,7 @@ import org.opensearch.common.unit.ByteSizeValue
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.Index
import org.opensearch.index.IndexService
import org.opensearch.index.IndexSettings
Expand All @@ -88,6 +92,7 @@ import org.opensearch.persistent.PersistentTasksCustomMetadata.PersistentTask
import org.opensearch.persistent.PersistentTasksNodeService
import org.opensearch.persistent.PersistentTasksService
import org.opensearch.replication.ReplicationException
import org.opensearch.replication.MappingNotAvailableException
import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING
import org.opensearch.rest.RestStatus
import org.opensearch.tasks.TaskId
Expand All @@ -100,6 +105,7 @@ import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlin.streams.toList
import org.opensearch.cluster.DiffableUtils

open class IndexReplicationTask(id: Long, type: String, action: String, description: String,
parentTask: TaskId,
Expand Down Expand Up @@ -395,6 +401,19 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
}
}

private suspend fun updateFollowerMapping(followerIndex: String,mappingSource: String?) {

val options = IndicesOptions.strictSingleIndexNoExpandForbidClosed()
if (null == mappingSource) {
throw MappingNotAvailableException("MappingSource is not available")
}
val putMappingRequest = PutMappingRequest().indices(followerIndex).indicesOptions(options).type("_doc")
.source(mappingSource, XContentType.JSON)
val updateMappingRequest = UpdateMetadataRequest(followerIndex, UpdateMetadataRequest.Type.MAPPING, putMappingRequest)
client.suspendExecute(UpdateMetadataAction.INSTANCE, updateMappingRequest, injectSecurityContext = true)
log.debug("Mappings synced for $followerIndex")
}

private suspend fun pollForMetadata(scope: CoroutineScope) {
while (scope.isActive) {
try {
Expand Down Expand Up @@ -535,6 +554,25 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
} else {
metadataUpdate = null
}
val options = IndicesOptions.strictSingleIndexNoExpandForbidClosed()
var gmr = GetMappingsRequest().indices(this.leaderIndex.name).indicesOptions(options)
var mappingResponse = remoteClient.suspending(remoteClient.admin().indices()::getMappings, injectSecurityContext = true)(gmr)
var leaderMappingSource = mappingResponse?.mappings()?.get(this.leaderIndex.name)?.get("_doc")?.source()?.toString()
@Suppress("UNCHECKED_CAST")
val leaderProperties = mappingResponse?.mappings()?.get(this.leaderIndex.name)?.get("_doc")?.sourceAsMap()?.toMap()?.get("properties") as? Map<String,Any>?
gmr = GetMappingsRequest().indices(this.followerIndexName).indicesOptions(options)
mappingResponse = client.suspending(client.admin().indices()::getMappings, injectSecurityContext = true)(gmr)
@Suppress("UNCHECKED_CAST")
val followerProperties = mappingResponse?.mappings()?.get(this.followerIndexName)?.get("_doc")?.sourceAsMap()?.toMap()?.get("properties") as? Map<String,Any>?
run updateMappingLoop@ {
followerProperties?.forEach { iter ->
if (leaderProperties?.containsKey(iter.key) == true && leaderProperties.getValue(iter.key).toString() != (iter.value).toString()) {
log.debug("Updating Multi-field Mapping at Follower")
updateFollowerMapping(this.followerIndexName, leaderMappingSource)
return@updateMappingLoop
}
}
}

} catch (e: Exception) {
log.error("Error in getting the required metadata ${e.stackTraceToString()}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package org.opensearch.replication.integ.rest


import kotlinx.coroutines.delay
import org.opensearch.replication.IndexUtil
import org.opensearch.replication.MultiClusterAnnotations
import org.opensearch.replication.MultiClusterRestTestCase
Expand Down Expand Up @@ -77,6 +78,7 @@ import java.util.*
import java.util.concurrent.TimeUnit



@MultiClusterAnnotations.ClusterConfigurations(
MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER),
MultiClusterAnnotations.ClusterConfiguration(clusterName = FOLLOWER)
Expand Down Expand Up @@ -1012,6 +1014,80 @@ class StartReplicationIT: MultiClusterRestTestCase() {
.hasMessageContaining("Primary shards in the Index[source:${leaderIndexName}] are not active")
}

fun `test that follower index mapping updates when leader index gets multi-field mapping`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
createConnectionBetweenClusters(FOLLOWER, LEADER)
val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()
var putMappingRequest = PutMappingRequest(leaderIndexName)
putMappingRequest.source("{\"properties\":{\"field1\":{\"type\":\"text\"}}}", XContentType.JSON)
leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT)
val sourceMap = mapOf("field1" to randomAlphaOfLength(5))
leaderClient.index(IndexRequest(leaderIndexName).id("1").source(sourceMap), RequestOptions.DEFAULT)
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName),
waitForRestore = true)
assertBusy {
assertThat(followerClient.indices()
.exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT))
.isEqualTo(true)
}
Assert.assertEquals(
leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT)
.mappings()[leaderIndexName],
followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT)
.mappings()[followerIndexName]
)
putMappingRequest = PutMappingRequest(leaderIndexName)
putMappingRequest.source("{\"properties\":{\"field1\":{\"type\":\"text\",\"fields\":{\"field2\":{\"type\":\"text\",\"analyzer\":\"standard\"},\"field3\":{\"type\":\"text\",\"analyzer\":\"standard\"}}}}}",XContentType.JSON)
leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT)
val leaderMappings = leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT)
.mappings()[leaderIndexName]
TimeUnit.MINUTES.sleep(2)
Assert.assertEquals(
leaderMappings,
followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT)
.mappings()[followerIndexName]
)
}

fun `test that follower index mapping does not update when only new fields are added but not respective docs in leader index`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
createConnectionBetweenClusters(FOLLOWER, LEADER)
val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()
var putMappingRequest = PutMappingRequest(leaderIndexName)
putMappingRequest.source("{\"properties\":{\"name\":{\"type\":\"text\"}}}", XContentType.JSON)
leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT)
val sourceMap = mapOf("name" to randomAlphaOfLength(5))
leaderClient.index(IndexRequest(leaderIndexName).id("1").source(sourceMap), RequestOptions.DEFAULT)
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName),
waitForRestore = true)
assertBusy {
assertThat(followerClient.indices()
.exists(GetIndexRequest(followerIndexName), RequestOptions.DEFAULT))
.isEqualTo(true)
}
Assert.assertEquals(
leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT)
.mappings()[leaderIndexName],
followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT)
.mappings()[followerIndexName]
)
putMappingRequest = PutMappingRequest(leaderIndexName)
putMappingRequest.source("{\"properties\":{\"name\":{\"type\":\"text\"},\"age\":{\"type\":\"integer\"}}}",XContentType.JSON)
leaderClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT)
val leaderMappings = leaderClient.indices().getMapping(GetMappingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT)
.mappings()[leaderIndexName]
TimeUnit.MINUTES.sleep(2)
Assert.assertNotEquals(
leaderMappings,
followerClient.indices().getMapping(GetMappingsRequest().indices(followerIndexName), RequestOptions.DEFAULT)
.mappings()[followerIndexName]
)
}

private fun excludeAllClusterNodes(clusterName: String) {
val transientSettingsRequest = Request("PUT", "_cluster/settings")
// Get IPs directly from the cluster to handle all cases - single node cluster, multi node cluster and remote test cluster.
Expand Down

0 comments on commit 29ac04b

Please sign in to comment.