diff --git a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt index b4662aa3..b0af5a54 100644 --- a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt @@ -401,12 +401,9 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript } } - private suspend fun UpdateFollowereMapping(followerIndex: String,mappingSource: String) { + private suspend fun updateFollowerMapping(followerIndex: String,mappingSource: String?) { val options = IndicesOptions.strictSingleIndexNoExpandForbidClosed() - if (null == mappingSource) { - throw MappingNotAvailableException("MappingSource is not available") - } val putMappingRequest = PutMappingRequest().indices(followerIndex).indicesOptions(options) .source(mappingSource, XContentType.JSON) val updateMappingRequest = UpdateMetadataRequest(followerIndex, UpdateMetadataRequest.Type.MAPPING, putMappingRequest) @@ -557,19 +554,20 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript val options = IndicesOptions.strictSingleIndexNoExpandForbidClosed() var gmr = GetMappingsRequest().indices(this.leaderIndex.name).indicesOptions(options) var mappingResponse = remoteClient.suspending(remoteClient.admin().indices()::getMappings, injectSecurityContext = true)(gmr) - var leaderMappingSource = mappingResponse.mappings.get(this.leaderIndex.name).source().toString() - val leaderProperties = mappingResponse.mappings().get(this.leaderIndex.name).sourceAsMap().toMap().get("properties") as Map + var leaderMappingSource = mappingResponse?.mappings?.get(this.leaderIndex.name)?.source()?.toString() + @Suppress("UNCHECKED_CAST") + val leaderProperties = mappingResponse?.mappings()?.get(this.leaderIndex.name)?.sourceAsMap()?.toMap()?.get("properties") as? Map? gmr = GetMappingsRequest().indices(this.followerIndexName).indicesOptions(options) mappingResponse = client.suspending(client.admin().indices()::getMappings, injectSecurityContext = true)(gmr) - val followerProperties = mappingResponse.mappings().get(this.followerIndexName).sourceAsMap().toMap().get("properties") as Map - for(iter in followerProperties) { - if(leaderProperties.containsKey(iter.key) && leaderProperties.getValue(iter.key).toString()!=(iter.value).toString()){ - log.info("Updating Multi-field Mapping at Follower") - UpdateFollowereMapping(this.followerIndexName,leaderMappingSource) - break; + @Suppress("UNCHECKED_CAST") + val followerProperties = mappingResponse?.mappings()?.get(this.followerIndexName)?.sourceAsMap()?.toMap()?.get("properties") as? Map? + for((key,value) in followerProperties?: emptyMap()) { + if (leaderProperties?.getValue(key).toString() != (value).toString()) { + log.debug("Updating Multi-field Mapping at Follower") + updateFollowerMapping(this.followerIndexName, leaderMappingSource) + break } } - } catch (e: Exception) { log.error("Error in getting the required metadata ${e.stackTraceToString()}") } finally {