Skip to content

Commit

Permalink
Merge b3a3e28 into 93205a1
Browse files Browse the repository at this point in the history
  • Loading branch information
monusingh-1 authored Jun 14, 2023
2 parents 93205a1 + b3a3e28 commit c6f4bef
Showing 1 changed file with 126 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,18 @@ import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.resync.TransportResyncReplicationAction
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.replication.TransportWriteAction
import org.opensearch.client.Client
import org.opensearch.client.Requests
import org.opensearch.cluster.ClusterStateObserver
import org.opensearch.cluster.action.index.MappingUpdatedAction
import org.opensearch.cluster.action.shard.ShardStateAction
Expand All @@ -43,6 +47,7 @@ import org.opensearch.common.bytes.BytesReference
import org.opensearch.common.inject.Inject
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.settings.IndexScopedSettings
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.IndexingPressureService
Expand All @@ -52,6 +57,8 @@ import org.opensearch.index.shard.IndexShard
import org.opensearch.index.translog.Translog
import org.opensearch.indices.IndicesService
import org.opensearch.indices.SystemIndices
import org.opensearch.replication.ReplicationPlugin
import org.opensearch.replication.task.index.IndexReplicationTask
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService
import java.util.function.Function
Expand Down Expand Up @@ -124,6 +131,11 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans
var result = primaryShard.applyTranslogOperation(op, Engine.Operation.Origin.PRIMARY)
if (shouldSyncMappingAndRetry(result)) {
waitForMappingUpdate {
// fetch index settings from the leader cluster when applying on PRIMARY...
try{
syncRemoteSettings(request.leaderAlias, request.leaderIndex, request.shardId()!!.indexName)
}catch (e:Exception){
}
// fetch mappings from the leader cluster when applying on PRIMARY...
syncRemoteMapping(request.leaderAlias, request.leaderIndex, request.shardId()!!.indexName)
}
Expand Down Expand Up @@ -166,6 +178,120 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans
return WriteReplicaResult(request, location, null, replicaShard, log)
}

// fetches the index settings from the leader cluster and applies it to the local cluster's index settings.
private suspend fun syncRemoteSettings(leaderAlias: String, leaderIndex: String, followerIndex: String) {

log.debug("Syncing index settings from ${leaderAlias}:${leaderIndex} -> ${followerIndex}...")
val remoteClient = client.getRemoteClusterClient(leaderAlias)

var staticUpdated = false

var gsr = GetSettingsRequest().includeDefaults(false).indices(leaderIndex)
var settingsResponse = remoteClient.suspending(remoteClient.admin().indices()::getSettings, injectSecurityContext = true)(gsr)
var leaderSettings = settingsResponse.indexToSettings.getOrDefault(leaderIndex, Settings.EMPTY)
leaderSettings = leaderSettings.filter { k: String ->
!IndexReplicationTask.blockListedSettings.contains(k)
}

gsr = GetSettingsRequest().includeDefaults(false).indices(followerIndex)
settingsResponse = client.suspending(client.admin().indices()::getSettings, injectSecurityContext = true)(gsr)
var followerSettings = settingsResponse.indexToSettings.getOrDefault(followerIndex, Settings.EMPTY)

followerSettings = followerSettings.filter { k: String ->
k != ReplicationPlugin.REPLICATED_INDEX_SETTING.key
}

val desiredSettingsBuilder = Settings.builder()
val indexScopedSettings = IndexScopedSettings(followerSettings, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS)
val settingsList = arrayOf(leaderSettings)
for (settings in settingsList) {
for (key in settings.keySet()) {
if (indexScopedSettings.isPrivateSetting(key)) {
continue
}
val setting = indexScopedSettings[key]
if (!setting.isPrivateIndex) {
desiredSettingsBuilder.copy(key, settings);
}
}
}
val desiredSettings = desiredSettingsBuilder.build()

val changedSettingsBuilder = Settings.builder()
for(key in desiredSettings.keySet()) {
if (desiredSettings.get(key) != followerSettings.get(key)) {
//Not intended setting on follower side.
val setting = indexScopedSettings[key]
if (indexScopedSettings.isPrivateSetting(key)) {
continue
}
if (!setting.isDynamic()) {
staticUpdated = true
}
log.info("Adding setting $key for $followerIndex")
changedSettingsBuilder.copy(key, desiredSettings);
}
}

for (key in followerSettings.keySet()) {
val setting = indexScopedSettings[key]
if (setting == null || setting.isPrivateIndex) {
continue
}

if (desiredSettings.get(key) == null) {
if (!setting.isDynamic()) {
staticUpdated = true
}
log.info("Removing setting $key from $followerIndex")
changedSettingsBuilder.putNull(key)
}
}
val changedSettings = changedSettingsBuilder.build()

var updateSettingsRequest :UpdateSettingsRequest?

if (changedSettings.keySet().size == 0) {
log.debug("No settings to apply")
updateSettingsRequest = null
} else {
updateSettingsRequest = Requests.updateSettingsRequest(followerIndex)
updateSettingsRequest.settings(changedSettings)
log.info("Got index settings to apply ${changedSettings} for $followerIndex")
}


var request : IndicesAliasesRequest? = null
var metadataUpdate : IndexReplicationTask.MetadataUpdate? = null
if (updateSettingsRequest != null) {
metadataUpdate = IndexReplicationTask.MetadataUpdate(updateSettingsRequest, request, staticUpdated)
} else {
metadataUpdate = null
}

var updateSettingsRequestFinal = metadataUpdate!!.updateSettingsRequest
if (updateSettingsRequestFinal == null) {
return
}
try{

log.info("Closing the index $followerIndex to apply static settings now")
var updateRequest = UpdateMetadataRequest(followerIndex, UpdateMetadataRequest.Type.CLOSE, Requests.closeIndexRequest(followerIndex))
client.suspendExecute(UpdateMetadataAction.INSTANCE, updateRequest, injectSecurityContext = true)
log.info("Closed the index $followerIndex to apply static settings now")

updateRequest = UpdateMetadataRequest(followerIndex, UpdateMetadataRequest.Type.SETTING, updateSettingsRequestFinal)
client.suspendExecute(UpdateMetadataAction.INSTANCE, updateRequest, injectSecurityContext = true)

}finally {
val updateRequest = UpdateMetadataRequest(followerIndex, UpdateMetadataRequest.Type.OPEN, Requests.openIndexRequest(followerIndex))
client.suspendExecute(UpdateMetadataAction.INSTANCE, updateRequest, injectSecurityContext = true)
log.info("Opened the index $followerIndex now post applying static settings")
}

}


/**
* Fetches the index mapping from the leader cluster, applies it to the local cluster's clusterManager and then waits
* for the mapping to become available on the current shard. Should only be called on the primary shard .
Expand Down

0 comments on commit c6f4bef

Please sign in to comment.