Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport] Initialize the leaderCheckpoint with follower shard's localCheckpoint… #964

Merged
merged 1 commit into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ class RemoteClusterRestoreLeaderService @Inject constructor(private val indicesS
var fromSeqNo = RetentionLeaseActions.RETAIN_ALL

// Adds the retention lease for fromSeqNo for the next stage of the replication.
retentionLeaseHelper.addRetentionLease(request.leaderShardId, fromSeqNo,
request.followerShardId, RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC)
retentionLeaseHelper.addRetentionLease(request.leaderShardId, fromSeqNo, request.followerShardId,
RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC)

/**
* At this point, it should be safe to release retention lock as the retention lease
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.opensearch.index.seqno.RetentionLeaseActions
import org.opensearch.index.seqno.RetentionLeaseAlreadyExistsException
import org.opensearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException
import org.opensearch.index.seqno.RetentionLeaseNotFoundException
import org.opensearch.index.shard.IndexShard
import org.opensearch.index.shard.ShardId
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.repository.RemoteClusterRepository
Expand Down Expand Up @@ -175,22 +176,47 @@ class RemoteClusterRetentionLeaseHelper constructor(var followerClusterNameWithU
}
}

public fun attemptRetentionLeaseRemoval(leaderShardId: ShardId, followerShardId: ShardId, timeout: Long) {
val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId)
val request = RetentionLeaseActions.RemoveRequest(leaderShardId, retentionLeaseId)
try {
client.execute(RetentionLeaseActions.Remove.INSTANCE, request).actionGet(timeout)
log.info("Removed retention lease with id - $retentionLeaseId")
} catch(e: RetentionLeaseNotFoundException) {
// log error and bail
log.error(e.stackTraceToString())
} catch (e: Exception) {
// We are not bubbling up the exception as the stop action/ task cleanup should succeed
// even if we fail to remove the retention lease from leader cluster
log.error("Exception in removing retention lease", e)
}
}


/**
* Remove these once the callers are moved to above APIs
*/
public fun addRetentionLease(leaderShardId: ShardId, seqNo: Long,
followerShardId: ShardId, timeout: Long) {
followerShardId: ShardId, timeout: Long) {
val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId)
val request = RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, seqNo, retentionLeaseSource)
try {
client.execute(RetentionLeaseActions.Add.INSTANCE, request).actionGet(timeout)
} catch (e: RetentionLeaseAlreadyExistsException) {
log.error("${e.message}")
log.info("Renew retention lease as it already exists $retentionLeaseId with $seqNo")
// Only one retention lease should exists for the follower shard
// Ideally, this should have got cleaned-up
renewRetentionLease(leaderShardId, seqNo, followerShardId, timeout)
var canRetry = true
while (true) {
try {
log.info("Adding retention lease $retentionLeaseId")
client.execute(RetentionLeaseActions.Add.INSTANCE, request).actionGet(timeout)
break
} catch (e: RetentionLeaseAlreadyExistsException) {
log.info("Found a stale retention lease $retentionLeaseId on leader.")
if (canRetry) {
canRetry = false
attemptRetentionLeaseRemoval(leaderShardId, followerShardId, timeout)
log.info("Cleared stale retention lease $retentionLeaseId on leader. Retrying...")
} else {
log.error(e.stackTraceToString())
throw e
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
private suspend fun pollShardTaskStatus(): IndexReplicationState {
val failedShardTasks = findAllReplicationFailedShardTasks(followerIndexName, clusterService.state())
if (failedShardTasks.isNotEmpty()) {
log.info("Failed shard tasks - ", failedShardTasks)
log.info("Failed shard tasks - $failedShardTasks")
var msg = ""
for ((shard, task) in failedShardTasks) {
val taskState = task.state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:

val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings)
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
// In case the shard task starts on a new node and there are no active writes on the leader shard, leader checkpoint
// never gets initialized and defaults to 0. To get around this, we set the leaderCheckpoint to follower shard's
// localCheckpoint as the leader shard is guaranteed to equal or more.
followerClusterStats.stats[followerShardId]!!.leaderCheckpoint = indexShard.localCheckpoint
coroutineScope {
while (isActive) {
rateLimiter.acquire()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package org.opensearch.replication

import com.nhaarman.mockitokotlin2.stub
import org.opensearch.replication.MultiClusterAnnotations.ClusterConfiguration
import org.opensearch.replication.MultiClusterAnnotations.ClusterConfigurations
import org.opensearch.replication.MultiClusterAnnotations.getAnnotationsFromClass
Expand All @@ -21,6 +20,7 @@ import org.apache.http.HttpHost
import org.apache.http.HttpStatus
import org.apache.http.client.config.RequestConfig
import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.apache.http.message.BasicHeader
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy
Expand Down Expand Up @@ -498,6 +498,32 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
return OpenSearchRestTestCase.entityAsMap(client.performRequest(Request("GET", endpoint)))
}

fun getAsList(client: RestClient, endpoint: String): List<Any> {
return OpenSearchRestTestCase.entityAsList(client.performRequest(Request("GET", endpoint)))
}

protected fun deleteConnection(fromClusterName: String, connectionName: String="source") {
val fromCluster = getNamedCluster(fromClusterName)
val persistentConnectionRequest = Request("PUT", "_cluster/settings")

val entityAsString = """
{
"persistent": {
"cluster": {
"remote": {
"$connectionName": {
"seeds": null
}
}
}
}
}""".trimMargin()

persistentConnectionRequest.entity = StringEntity(entityAsString, ContentType.APPLICATION_JSON)
val persistentConnectionResponse = fromCluster.lowLevelClient.performRequest(persistentConnectionRequest)
assertEquals(HttpStatus.SC_OK.toLong(), persistentConnectionResponse.statusLine.statusCode.toLong())
}

protected fun createConnectionBetweenClusters(fromClusterName: String, toClusterName: String, connectionName: String="source") {
val toCluster = getNamedCluster(toClusterName)
val fromCluster = getNamedCluster(fromClusterName)
Expand Down Expand Up @@ -629,4 +655,17 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
updateSettingsRequest.transientSettings(Collections.singletonMap<String, String?>(ReplicationPlugin.REPLICATION_METADATA_SYNC_INTERVAL.key, "5s"))
followerClient.cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT)
}

protected fun docCount(cluster: RestHighLevelClient, indexName: String) : Int {
val persistentConnectionRequest = Request("GET", "/$indexName/_search?pretty&q=*")

val persistentConnectionResponse = cluster.lowLevelClient.performRequest(persistentConnectionRequest)
val statusResponse: Map<String, Map<String, Map<String, Any>>> = OpenSearchRestTestCase.entityAsMap(persistentConnectionResponse) as Map<String, Map<String, Map<String, String>>>
return statusResponse["hits"]?.get("total")?.get("value") as Int
}

protected fun deleteIndex(testCluster: RestHighLevelClient, indexName: String) {
testCluster.lowLevelClient.performRequest(Request("DELETE", indexName))
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.opensearch.replication.integ.rest

import org.opensearch.replication.MultiClusterRestTestCase
import org.opensearch.replication.MultiClusterAnnotations
import org.opensearch.replication.StartReplicationRequest
import org.opensearch.replication.startReplication
import org.opensearch.replication.stopReplication
import org.assertj.core.api.Assertions
import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.CreateIndexRequest
import org.junit.Assert
import java.util.concurrent.TimeUnit


@MultiClusterAnnotations.ClusterConfigurations(
MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER),
MultiClusterAnnotations.ClusterConfiguration(clusterName = FOLLOWER)
)

class ReplicationStopThenRestartIT : MultiClusterRestTestCase() {
private val leaderIndexName = "leader_index"
private val followerIndexName = "follower_index"

fun `test replication works after unclean stop and start`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
changeTemplate(LEADER)
createConnectionBetweenClusters(FOLLOWER, LEADER)
val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue()
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))
insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName)
insertDocToIndex(LEADER, "2", "dummy data 1",leaderIndexName)

assertBusy ({
try {
Assert.assertEquals(2, docCount(followerClient, followerIndexName))
} catch (ex: Exception) {
ex.printStackTrace();
Assert.fail("Exception while querying follower cluster. Failing to retry again {}")
}
}, 1, TimeUnit.MINUTES)


deleteConnection(FOLLOWER)
followerClient.stopReplication(followerIndexName, shouldWait = true)
deleteIndex(followerClient, followerIndexName)

createConnectionBetweenClusters(FOLLOWER, LEADER)
insertDocToIndex(LEADER, "3", "dummy data 1",leaderIndexName)
insertDocToIndex(LEADER, "4", "dummy data 1",leaderIndexName)
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))

assertBusy ({
try {
Assert.assertEquals(4, docCount(followerClient, followerIndexName))
} catch (ex: Exception) {
Assert.fail("Exception while querying follower cluster. Failing to retry again")
}
}, 1, TimeUnit.MINUTES)
}
}