Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync settings #994

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,18 @@ import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remove unused imports in next commit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.resync.TransportResyncReplicationAction
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.replication.TransportWriteAction
import org.opensearch.client.Client
import org.opensearch.client.Requests
import org.opensearch.cluster.ClusterStateObserver
import org.opensearch.cluster.action.index.MappingUpdatedAction
import org.opensearch.cluster.action.shard.ShardStateAction
Expand All @@ -43,6 +47,7 @@ import org.opensearch.common.bytes.BytesReference
import org.opensearch.common.inject.Inject
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.settings.IndexScopedSettings
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.IndexingPressureService
Expand All @@ -52,6 +57,8 @@ import org.opensearch.index.shard.IndexShard
import org.opensearch.index.translog.Translog
import org.opensearch.indices.IndicesService
import org.opensearch.indices.SystemIndices
import org.opensearch.replication.ReplicationPlugin
import org.opensearch.replication.task.index.IndexReplicationTask
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService
import java.util.function.Function
Expand Down Expand Up @@ -124,6 +131,11 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans
var result = primaryShard.applyTranslogOperation(op, Engine.Operation.Origin.PRIMARY)
if (shouldSyncMappingAndRetry(result)) {
waitForMappingUpdate {
// fetch index settings from the leader cluster when applying on PRIMARY...
try{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not catch the exception

syncRemoteSettings(request.leaderAlias, request.leaderIndex, request.shardId()!!.indexName)
}catch (e:Exception){
}
// fetch mappings from the leader cluster when applying on PRIMARY...
syncRemoteMapping(request.leaderAlias, request.leaderIndex, request.shardId()!!.indexName)
}
Expand Down Expand Up @@ -166,6 +178,120 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans
return WriteReplicaResult(request, location, null, replicaShard, log)
}

// fetches the index settings from the leader cluster and applies it to the local cluster's index settings.
private suspend fun syncRemoteSettings(leaderAlias: String, leaderIndex: String, followerIndex: String) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like discussed offline, let's use common logic between replay and index replication task for mapping and settings sync.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is being added mainly to be able to update mappings which use analyzer defined in the settings.
We can reduce the scope of this function to just check if there is any change in the index.analysis.* settings and then update that. This way we can remove most of the logic of creating the settings and closing the index.


log.debug("Syncing index settings from ${leaderAlias}:${leaderIndex} -> ${followerIndex}...")
val remoteClient = client.getRemoteClusterClient(leaderAlias)

var staticUpdated = false

var gsr = GetSettingsRequest().includeDefaults(false).indices(leaderIndex)
var settingsResponse = remoteClient.suspending(remoteClient.admin().indices()::getSettings, injectSecurityContext = true)(gsr)
var leaderSettings = settingsResponse.indexToSettings.getOrDefault(leaderIndex, Settings.EMPTY)
leaderSettings = leaderSettings.filter { k: String ->
!IndexReplicationTask.blockListedSettings.contains(k)
}

gsr = GetSettingsRequest().includeDefaults(false).indices(followerIndex)
settingsResponse = client.suspending(client.admin().indices()::getSettings, injectSecurityContext = true)(gsr)
var followerSettings = settingsResponse.indexToSettings.getOrDefault(followerIndex, Settings.EMPTY)

followerSettings = followerSettings.filter { k: String ->
k != ReplicationPlugin.REPLICATED_INDEX_SETTING.key
}

val desiredSettingsBuilder = Settings.builder()
val indexScopedSettings = IndexScopedSettings(followerSettings, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS)
val settingsList = arrayOf(leaderSettings)
for (settings in settingsList) {
for (key in settings.keySet()) {
if (indexScopedSettings.isPrivateSetting(key)) {
continue
}
val setting = indexScopedSettings[key]
if (!setting.isPrivateIndex) {
desiredSettingsBuilder.copy(key, settings);
}
}
}
val desiredSettings = desiredSettingsBuilder.build()

val changedSettingsBuilder = Settings.builder()
for(key in desiredSettings.keySet()) {
if (desiredSettings.get(key) != followerSettings.get(key)) {
//Not intended setting on follower side.
val setting = indexScopedSettings[key]
if (indexScopedSettings.isPrivateSetting(key)) {
continue
}
if (!setting.isDynamic()) {
staticUpdated = true
}
log.info("Adding setting $key for $followerIndex")
changedSettingsBuilder.copy(key, desiredSettings);
}
}

for (key in followerSettings.keySet()) {
val setting = indexScopedSettings[key]
if (setting == null || setting.isPrivateIndex) {
continue
}

if (desiredSettings.get(key) == null) {
if (!setting.isDynamic()) {
staticUpdated = true
}
log.info("Removing setting $key from $followerIndex")
changedSettingsBuilder.putNull(key)
}
}
val changedSettings = changedSettingsBuilder.build()

var updateSettingsRequest :UpdateSettingsRequest?

if (changedSettings.keySet().size == 0) {
log.debug("No settings to apply")
updateSettingsRequest = null
} else {
updateSettingsRequest = Requests.updateSettingsRequest(followerIndex)
updateSettingsRequest.settings(changedSettings)
log.info("Got index settings to apply ${changedSettings} for $followerIndex")
}


var request : IndicesAliasesRequest? = null
var metadataUpdate : IndexReplicationTask.MetadataUpdate? = null
if (updateSettingsRequest != null) {
metadataUpdate = IndexReplicationTask.MetadataUpdate(updateSettingsRequest, request, staticUpdated)
} else {
metadataUpdate = null
}

var updateSettingsRequestFinal = metadataUpdate!!.updateSettingsRequest
if (updateSettingsRequestFinal == null) {
return
}
try{

log.info("Closing the index $followerIndex to apply static settings now")
var updateRequest = UpdateMetadataRequest(followerIndex, UpdateMetadataRequest.Type.CLOSE, Requests.closeIndexRequest(followerIndex))
client.suspendExecute(UpdateMetadataAction.INSTANCE, updateRequest, injectSecurityContext = true)
log.info("Closed the index $followerIndex to apply static settings now")

updateRequest = UpdateMetadataRequest(followerIndex, UpdateMetadataRequest.Type.SETTING, updateSettingsRequestFinal)
client.suspendExecute(UpdateMetadataAction.INSTANCE, updateRequest, injectSecurityContext = true)

}finally {
val updateRequest = UpdateMetadataRequest(followerIndex, UpdateMetadataRequest.Type.OPEN, Requests.openIndexRequest(followerIndex))
client.suspendExecute(UpdateMetadataAction.INSTANCE, updateRequest, injectSecurityContext = true)
log.info("Opened the index $followerIndex now post applying static settings")
}

}


/**
* Fetches the index mapping from the leader cluster, applies it to the local cluster's clusterManager and then waits
* for the mapping to become available on the current shard. Should only be called on the primary shard .
Expand Down