Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Modified _stop replication API to remove any stale replication settings on existing index (#410)
Browse files Browse the repository at this point in the history

Signed-off-by: Sai Kumar <[email protected]>

(cherry picked from commit 0bad307)
saikaranam-amazon authored and github-actions[bot] committed Aug 24, 2022
1 parent 3c81c0e commit 78fead7
Showing 3 changed files with 128 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -22,10 +22,7 @@ import org.opensearch.replication.metadata.UpdateMetadataAction
import org.opensearch.replication.metadata.UpdateMetadataRequest
import org.opensearch.replication.metadata.state.REPLICATION_LAST_KNOWN_OVERALL_STATE
import org.opensearch.replication.metadata.state.getReplicationStateParamsForIndex
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.seqno.RemoteClusterRetentionLeaseHelper
import org.opensearch.replication.task.index.IndexReplicationParams
import org.opensearch.replication.util.completeWith
import org.opensearch.replication.util.coroutineContext
import org.opensearch.replication.util.suspendExecute
import org.opensearch.replication.util.suspending
@@ -39,7 +36,6 @@ import org.opensearch.OpenSearchException
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.indices.open.OpenIndexRequest
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.action.support.master.TransportMasterNodeAction
import org.opensearch.client.Client
@@ -57,8 +53,6 @@ import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.settings.Settings
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.shard.ShardId
import org.opensearch.replication.util.stackTraceToString
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService
@@ -99,7 +93,7 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:
throw OpenSearchException("Failed to remove index block on ${request.indexName}")
}

validateStopReplicationRequest(request)
validateReplicationStateOfIndex(request)

// Index will be deleted if replication is stopped while it is restoring. So no need to close/reopen
val restoring = clusterService.state().custom<RestoreInProgress>(RestoreInProgress.TYPE, RestoreInProgress.EMPTY).any { entry ->
@@ -117,8 +111,9 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:
throw OpenSearchException("Unable to close index: ${request.indexName}")
}
}
val replMetadata = replicationMetadataManager.getIndexReplicationMetadata(request.indexName)

try {
val replMetadata = replicationMetadataManager.getIndexReplicationMetadata(request.indexName)
val remoteClient = client.getRemoteClusterClient(replMetadata.connectionName)
val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper(clusterService.clusterName.value(), remoteClient)
retentionLeaseHelper.attemptRemoveRetentionLease(clusterService, replMetadata, request.indexName)
@@ -127,12 +122,12 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:
}

val clusterStateUpdateResponse : AcknowledgedResponse =
clusterService.waitForClusterStateUpdate("stop_replication") { l -> StopReplicationTask(request, l)}
clusterService.waitForClusterStateUpdate("stop_replication") { l -> StopReplicationTask(request, l)}
if (!clusterStateUpdateResponse.isAcknowledged) {
throw OpenSearchException("Failed to update cluster state")
}

// Index will be deleted if stop is called while it is restoring. So no need to reopen
// Index will be deleted if stop is called while it is restoring. So no need to reopen
if (!restoring &&
state.routingTable.hasIndex(request.indexName)) {
val reopenResponse = client.suspending(client.admin().indices()::open, injectSecurityContext = true)(OpenIndexRequest(request.indexName))
@@ -149,7 +144,15 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:
}
}

private fun validateStopReplicationRequest(request: StopIndexReplicationRequest) {
private fun validateReplicationStateOfIndex(request: StopIndexReplicationRequest) {
// If replication blocks/settings are present, the Stop action should proceed with the clean-up.
// This can happen when the settings of a follower index are carried over in a snapshot and a restore is
// performed using that snapshot.
if (clusterService.state().blocks.hasIndexBlock(request.indexName, INDEX_REPLICATION_BLOCK)
|| clusterService.state().metadata.index(request.indexName)?.settings?.get(REPLICATED_INDEX_SETTING.key) != null) {
return
}

val replicationStateParams = getReplicationStateParamsForIndex(clusterService, request.indexName)
?:
throw IllegalArgumentException("No replication in progress for index:${request.indexName}")
@@ -187,13 +190,15 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:
val mdBuilder = Metadata.builder(currentState.metadata)
// remove replicated index setting
val currentIndexMetadata = currentState.metadata.index(request.indexName)
if (currentIndexMetadata != null) {
if (currentIndexMetadata != null &&
currentIndexMetadata.settings[REPLICATED_INDEX_SETTING.key] != null) {
val newIndexMetadata = IndexMetadata.builder(currentIndexMetadata)
.settings(Settings.builder().put(currentIndexMetadata.settings).putNull(REPLICATED_INDEX_SETTING.key))
.settingsVersion(1 + currentIndexMetadata.settingsVersion)
mdBuilder.put(newIndexMetadata)
}
newState.metadata(mdBuilder)

return newState.build()
}

Original file line number Diff line number Diff line change
@@ -108,6 +108,9 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
val lowLevelClient = restClient.lowLevelClient!!

var defaultSecuritySetupCompleted = false
companion object {
// Name used for the "fs"-type snapshot repository that the test setup registers on a cluster
// (PUT _snapshot/<name>) and that tests pass to snapshot/restore requests.
const val FS_SNAPSHOT_REPO = "repl_repo"
}
}

companion object {
@@ -253,7 +256,33 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
*/
@Before
fun setup() {
testClusters.values.forEach { if(it.securityEnabled && !it.defaultSecuritySetupCompleted) setupDefaultSecurityRoles(it) }
testClusters.values.forEach {
registerSnapshotRepository(it)
if(it.securityEnabled && !it.defaultSecuritySetupCompleted)
setupDefaultSecurityRoles(it)
}
}

/**
 * Registers an "fs"-type snapshot repository named [TestCluster.FS_SNAPSHOT_REPO] on the given cluster.
 *
 * The repository location is the first entry of the `path.repo` value reported under `defaults` by
 * `GET /_cluster/settings?include_defaults=true&flat_settings=true`. Nothing is registered when the
 * response has no `defaults` section, no `path.repo` list, or the list is empty.
 *
 * @param testCluster cluster whose low-level REST client is used for both the settings lookup and
 * the repository registration
 */
private fun registerSnapshotRepository(testCluster: TestCluster) {
    val settingsResponse: Map<String, Any> = OpenSearchRestTestCase.entityAsMap(
        testCluster.lowLevelClient.performRequest(
            Request("GET", "/_cluster/settings?include_defaults=true&flat_settings=true")
        )
    )

    // Safe casts: a missing "defaults" section or "path.repo" entry is treated the same as an empty
    // list (no repository path configured) instead of failing the setup of every test with a cast error.
    val defaults = settingsResponse["defaults"] as? Map<*, *> ?: return
    val configuredRepositories = defaults["path.repo"] as? List<*> ?: return
    val repo = configuredRepositories.firstOrNull() as? String ?: return

    val repoConfig = """
        {
        "type": "fs",
        "settings": {
        "location": "$repo"
        }
        }
    """.trimIndent()
    triggerRequest(testCluster.lowLevelClient, "PUT", "_snapshot/${TestCluster.FS_SNAPSHOT_REPO}", repoConfig)
}

/**
Original file line number Diff line number Diff line change
@@ -17,22 +17,30 @@ import org.opensearch.replication.MultiClusterRestTestCase
import org.opensearch.replication.StartReplicationRequest
import org.opensearch.replication.startReplication
import org.opensearch.replication.stopReplication
import org.opensearch.replication.replicationStatus
import org.opensearch.replication.getShardReplicationTasks
import org.opensearch.replication.`validate status syncing response`
import org.apache.http.util.EntityUtils
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Assert
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.client.Request
import org.opensearch.client.RequestOptions
import org.opensearch.client.ResponseException
import org.opensearch.client.indices.CreateIndexRequest
import org.opensearch.client.indices.GetIndexRequest
import org.opensearch.cluster.SnapshotsInProgress
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.index.mapper.MapperService
import org.opensearch.test.OpenSearchTestCase.assertBusy
import java.util.Random
import java.util.concurrent.TimeUnit


@@ -234,4 +242,76 @@ class StopReplicationIT: MultiClusterRestTestCase() {
val sourceMap = mapOf("name" to randomAlphaOfLength(5))
followerClient.index(IndexRequest(followerIndexName).id("2").source(sourceMap), RequestOptions.DEFAULT)
}

/**
 * Snapshots a follower index while it is replicating, restores that snapshot on the leader cluster
 * under a "restored-" prefix, and then checks that:
 *  - invoking stop replication on the restored leader-side index fails with a [ResponseException]
 *    whose message reports that the metadata for the index doesn't exist, and
 *  - replication can afterwards be started from the restored index and reaches the syncing state.
 *
 * Replication is stopped on both follower indices at the end, whether or not the test passed.
 */
fun `test stop replication with stale replication settings at leader cluster`() {
    val followerClient = getClientForCluster(FOLLOWER)
    val leaderClient = getClientForCluster(LEADER)
    createConnectionBetweenClusters(FOLLOWER, LEADER, "source")

    val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
    assertThat(createIndexResponse.isAcknowledged).isTrue()

    // Random suffix so repeated runs against the same repository do not reuse a snapshot name.
    val snapshotName = "test-${Random().nextInt(1000)}"
    val restoredIndexName = "restored-$followerIndexName"

    try {
        followerClient.startReplication(
            StartReplicationRequest("source", leaderIndexName, followerIndexName),
            TimeValue.timeValueSeconds(10),
            true
        )

        assertBusy({
            val statusResp = followerClient.replicationStatus(followerIndexName)
            `validate status syncing response`(statusResp)
            assertThat(followerClient.getShardReplicationTasks(followerIndexName)).isNotEmpty()
        }, 60, TimeUnit.SECONDS)

        // Trigger snapshot on the follower cluster
        val createSnapshotRequest = CreateSnapshotRequest(TestCluster.FS_SNAPSHOT_REPO, snapshotName)
        createSnapshotRequest.waitForCompletion(true)
        followerClient.snapshot().create(createSnapshotRequest, RequestOptions.DEFAULT)

        assertBusy {
            val snapshotStatusResponse = followerClient.snapshot().status(
                SnapshotsStatusRequest(TestCluster.FS_SNAPSHOT_REPO, arrayOf(snapshotName)),
                RequestOptions.DEFAULT
            )
            for (snapshotStatus in snapshotStatusResponse.snapshots) {
                Assert.assertEquals(SnapshotsInProgress.State.SUCCESS, snapshotStatus.state)
            }
        }

        // Restore follower index on leader cluster, renamed with the "restored-" prefix
        val restoreSnapshotRequest = RestoreSnapshotRequest(TestCluster.FS_SNAPSHOT_REPO, snapshotName)
        restoreSnapshotRequest.indices(followerIndexName)
        restoreSnapshotRequest.waitForCompletion(true)
        restoreSnapshotRequest.renamePattern("(.+)")
        restoreSnapshotRequest.renameReplacement("restored-\$1")
        leaderClient.snapshot().restore(restoreSnapshotRequest, RequestOptions.DEFAULT)

        assertBusy {
            assertThat(leaderClient.indices().exists(GetIndexRequest(restoredIndexName), RequestOptions.DEFAULT)).isTrue()
        }

        // Invoke stop on the new leader cluster index
        assertThatThrownBy { leaderClient.stopReplication(restoredIndexName) }
            .isInstanceOf(ResponseException::class.java)
            .hasMessageContaining("Metadata for $restoredIndexName doesn't exist")

        // Start replication on the new leader index
        followerClient.startReplication(
            StartReplicationRequest("source", restoredIndexName, restoredIndexName),
            TimeValue.timeValueSeconds(10),
            true, true
        )

        assertBusy({
            val statusResp = followerClient.replicationStatus(restoredIndexName)
            `validate status syncing response`(statusResp)
            assertThat(followerClient.getShardReplicationTasks(restoredIndexName)).isNotEmpty()
        }, 60, TimeUnit.SECONDS)

    } finally {
        // Attempt both clean-ups even when the first one throws (e.g. the test failed before
        // replication was started on the restored index); otherwise replication would be left
        // running on followerIndexName.
        try {
            followerClient.stopReplication(restoredIndexName)
        } finally {
            followerClient.stopReplication(followerIndexName)
        }
    }
}
}

0 comments on commit 78fead7

Please sign in to comment.