diff --git a/src/main/kotlin/org/opensearch/replication/action/replay/TransportReplayChangesAction.kt b/src/main/kotlin/org/opensearch/replication/action/replay/TransportReplayChangesAction.kt index 9b3a78b63..6a42c5883 100644 --- a/src/main/kotlin/org/opensearch/replication/action/replay/TransportReplayChangesAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/replay/TransportReplayChangesAction.kt @@ -26,14 +26,18 @@ import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.action.ActionListener +import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest +import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.opensearch.action.index.IndexRequest import org.opensearch.action.resync.TransportResyncReplicationAction import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.IndicesOptions import org.opensearch.action.support.replication.TransportWriteAction import org.opensearch.client.Client +import org.opensearch.client.Requests import org.opensearch.cluster.ClusterStateObserver import org.opensearch.cluster.action.index.MappingUpdatedAction import org.opensearch.cluster.action.shard.ShardStateAction @@ -43,6 +47,7 @@ import org.opensearch.common.bytes.BytesReference import org.opensearch.common.inject.Inject import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.settings.IndexScopedSettings import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.XContentType import org.opensearch.index.IndexingPressureService @@ -52,6 +57,8 @@ import org.opensearch.index.shard.IndexShard import org.opensearch.index.translog.Translog import org.opensearch.indices.IndicesService import org.opensearch.indices.SystemIndices +import org.opensearch.replication.ReplicationPlugin +import org.opensearch.replication.task.index.IndexReplicationTask import org.opensearch.threadpool.ThreadPool import org.opensearch.transport.TransportService import java.util.function.Function @@ -124,6 +131,11 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans var result = primaryShard.applyTranslogOperation(op, Engine.Operation.Origin.PRIMARY) if (shouldSyncMappingAndRetry(result)) { waitForMappingUpdate { + // fetch index settings from the leader cluster when applying on PRIMARY... + try{ + syncRemoteSettings(request.leaderAlias, request.leaderIndex, request.shardId()!!.indexName) + }catch (e:Exception){ + } // fetch mappings from the leader cluster when applying on PRIMARY... syncRemoteMapping(request.leaderAlias, request.leaderIndex, request.shardId()!!.indexName) } @@ -166,6 +178,120 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans return WriteReplicaResult(request, location, null, replicaShard, log) } + // fetches the index settings from the leader cluster and applies it to the local cluster's index settings. + private suspend fun syncRemoteSettings(leaderAlias: String, leaderIndex: String, followerIndex: String) { + + log.debug("Syncing index settings from ${leaderAlias}:${leaderIndex} -> ${followerIndex}...") + val remoteClient = client.getRemoteClusterClient(leaderAlias) + + var staticUpdated = false + + var gsr = GetSettingsRequest().includeDefaults(false).indices(leaderIndex) + var settingsResponse = remoteClient.suspending(remoteClient.admin().indices()::getSettings, injectSecurityContext = true)(gsr) + var leaderSettings = settingsResponse.indexToSettings.getOrDefault(leaderIndex, Settings.EMPTY) + leaderSettings = leaderSettings.filter { k: String -> + !IndexReplicationTask.blockListedSettings.contains(k) + } + + gsr = GetSettingsRequest().includeDefaults(false).indices(followerIndex) + settingsResponse = client.suspending(client.admin().indices()::getSettings, injectSecurityContext = true)(gsr) + var followerSettings = settingsResponse.indexToSettings.getOrDefault(followerIndex, Settings.EMPTY) + + followerSettings = followerSettings.filter { k: String -> + k != ReplicationPlugin.REPLICATED_INDEX_SETTING.key + } + + val desiredSettingsBuilder = Settings.builder() + val indexScopedSettings = IndexScopedSettings(followerSettings, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS) + val settingsList = arrayOf(leaderSettings) + for (settings in settingsList) { + for (key in settings.keySet()) { + if (indexScopedSettings.isPrivateSetting(key)) { + continue + } + val setting = indexScopedSettings[key] + if (!setting.isPrivateIndex) { + desiredSettingsBuilder.copy(key, settings); + } + } + } + val desiredSettings = desiredSettingsBuilder.build() + + val changedSettingsBuilder = Settings.builder() + for(key in desiredSettings.keySet()) { + if (desiredSettings.get(key) != followerSettings.get(key)) { + //Not intended setting on follower side. + val setting = indexScopedSettings[key] + if (indexScopedSettings.isPrivateSetting(key)) { + continue + } + if (!setting.isDynamic()) { + staticUpdated = true + } + log.info("Adding setting $key for $followerIndex") + changedSettingsBuilder.copy(key, desiredSettings); + } + } + + for (key in followerSettings.keySet()) { + val setting = indexScopedSettings[key] + if (setting == null || setting.isPrivateIndex) { + continue + } + + if (desiredSettings.get(key) == null) { + if (!setting.isDynamic()) { + staticUpdated = true + } + log.info("Removing setting $key from $followerIndex") + changedSettingsBuilder.putNull(key) + } + } + val changedSettings = changedSettingsBuilder.build() + + var updateSettingsRequest :UpdateSettingsRequest? + + if (changedSettings.keySet().size == 0) { + log.debug("No settings to apply") + updateSettingsRequest = null + } else { + updateSettingsRequest = Requests.updateSettingsRequest(followerIndex) + updateSettingsRequest.settings(changedSettings) + log.info("Got index settings to apply ${changedSettings} for $followerIndex") + } + + + var request : IndicesAliasesRequest? = null + var metadataUpdate : IndexReplicationTask.MetadataUpdate? = null + if (updateSettingsRequest != null) { + metadataUpdate = IndexReplicationTask.MetadataUpdate(updateSettingsRequest, request, staticUpdated) + } else { + metadataUpdate = null + } + + var updateSettingsRequestFinal = metadataUpdate!!.updateSettingsRequest + if (updateSettingsRequestFinal == null) { + return + } + try{ + + log.info("Closing the index $followerIndex to apply static settings now") + var updateRequest = UpdateMetadataRequest(followerIndex, UpdateMetadataRequest.Type.CLOSE, Requests.closeIndexRequest(followerIndex)) + client.suspendExecute(UpdateMetadataAction.INSTANCE, updateRequest, injectSecurityContext = true) + log.info("Closed the index $followerIndex to apply static settings now") + + updateRequest = UpdateMetadataRequest(followerIndex, UpdateMetadataRequest.Type.SETTING, updateSettingsRequestFinal) + client.suspendExecute(UpdateMetadataAction.INSTANCE, updateRequest, injectSecurityContext = true) + + }finally { + val updateRequest = UpdateMetadataRequest(followerIndex, UpdateMetadataRequest.Type.OPEN, Requests.openIndexRequest(followerIndex)) + client.suspendExecute(UpdateMetadataAction.INSTANCE, updateRequest, injectSecurityContext = true) + log.info("Opened the index $followerIndex now post applying static settings") + } + + } + + /** * Fetches the index mapping from the leader cluster, applies it to the local cluster's clusterManager and then waits * for the mapping to become available on the current shard. Should only be called on the primary shard .