Skip to content

Commit

Permalink
Initialize the leaderCheckpoint with follower shard's localCheckpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Ankit Kala <[email protected]>
  • Loading branch information
ankitkala committed Jun 2, 2023
1 parent f5c94f7 commit 236c860
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ class RemoteClusterRestoreLeaderService @Inject constructor(private val indicesS
var fromSeqNo = RetentionLeaseActions.RETAIN_ALL

// Adds the retention lease for fromSeqNo for the next stage of the replication.
retentionLeaseHelper.addRetentionLease(request.leaderShardId, fromSeqNo,
request.followerShardId, RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC)
retentionLeaseHelper.addRetentionLease(request.leaderShardId, fromSeqNo, request.followerShardId,
RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC)

/**
* At this point, it should be safe to release retention lock as the retention lease
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.opensearch.index.seqno.RetentionLeaseActions
import org.opensearch.index.seqno.RetentionLeaseAlreadyExistsException
import org.opensearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException
import org.opensearch.index.seqno.RetentionLeaseNotFoundException
import org.opensearch.index.shard.IndexShard
import org.opensearch.index.shard.ShardId
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.repository.RemoteClusterRepository
Expand Down Expand Up @@ -175,22 +176,47 @@ class RemoteClusterRetentionLeaseHelper constructor(var followerClusterNameWithU
}
}

public fun attemptRetentionLeaseRemoval(leaderShardId: ShardId, followerShardId: ShardId, timeout: Long) {
val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId)
val request = RetentionLeaseActions.RemoveRequest(leaderShardId, retentionLeaseId)
try {
client.execute(RetentionLeaseActions.Remove.INSTANCE, request).actionGet(timeout)
log.info("Removed retention lease with id - $retentionLeaseId")
} catch(e: RetentionLeaseNotFoundException) {
// log error and bail
log.error(e.stackTraceToString())
} catch (e: Exception) {
// We are not bubbling up the exception as the stop action/ task cleanup should succeed
// even if we fail to remove the retention lease from leader cluster
log.error("Exception in removing retention lease", e)
}
}


/**
* Remove these once the callers are moved to above APIs
*/
public fun addRetentionLease(leaderShardId: ShardId, seqNo: Long,
followerShardId: ShardId, timeout: Long) {
followerShardId: ShardId, timeout: Long) {
val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId)
val request = RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, seqNo, retentionLeaseSource)
try {
client.execute(RetentionLeaseActions.Add.INSTANCE, request).actionGet(timeout)
} catch (e: RetentionLeaseAlreadyExistsException) {
log.error(e.stackTraceToString())
log.info("Renew retention lease as it already exists $retentionLeaseId with $seqNo")
// Only one retention lease should exists for the follower shard
// Ideally, this should have got cleaned-up
renewRetentionLease(leaderShardId, seqNo, followerShardId, timeout)
var canRetry = true
while (true) {
try {
log.info("Adding retention lease $retentionLeaseId")
client.execute(RetentionLeaseActions.Add.INSTANCE, request).actionGet(timeout)
break
} catch (e: RetentionLeaseAlreadyExistsException) {
log.info("Found a stale retention lease $retentionLeaseId on leader.")
if (canRetry) {
canRetry = false
attemptRetentionLeaseRemoval(leaderShardId, followerShardId, timeout)
log.info("Cleared stale retention lease $retentionLeaseId on leader. Retrying...")
} else {
log.error(e.stackTraceToString())
throw e
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
private suspend fun pollShardTaskStatus(): IndexReplicationState {
val failedShardTasks = findAllReplicationFailedShardTasks(followerIndexName, clusterService.state())
if (failedShardTasks.isNotEmpty()) {
log.info("Failed shard tasks - ", failedShardTasks)
log.info("Failed shard tasks - $failedShardTasks")
var msg = ""
for ((shard, task) in failedShardTasks) {
val taskState = task.state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:

val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings)
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
// In case the shard task starts on a new node and there are no active writes on the leader shard, leader checkpoint
// never gets initialized and defaults to 0. To get around this, we set the leaderCheckpoint to follower shard's
// localCheckpoint as the leader shard is guaranteed to equal or more.
followerClusterStats.stats[followerShardId]!!.leaderCheckpoint = indexShard.localCheckpoint
coroutineScope {
while (isActive) {
rateLimiter.acquire()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package org.opensearch.replication

import com.nhaarman.mockitokotlin2.stub
import org.opensearch.replication.MultiClusterAnnotations.ClusterConfiguration
import org.opensearch.replication.MultiClusterAnnotations.ClusterConfigurations
import org.opensearch.replication.MultiClusterAnnotations.getAnnotationsFromClass
Expand Down Expand Up @@ -516,6 +515,28 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
return OpenSearchRestTestCase.entityAsList(client.performRequest(Request("GET", endpoint)))
}

protected fun deleteConnection(fromClusterName: String, connectionName: String="source") {
val fromCluster = getNamedCluster(fromClusterName)
val persistentConnectionRequest = Request("PUT", "_cluster/settings")

val entityAsString = """
{
"persistent": {
"cluster": {
"remote": {
"$connectionName": {
"seeds": null
}
}
}
}
}""".trimMargin()

persistentConnectionRequest.entity = StringEntity(entityAsString, ContentType.APPLICATION_JSON)
val persistentConnectionResponse = fromCluster.lowLevelClient.performRequest(persistentConnectionRequest)
assertEquals(HttpStatus.SC_OK.toLong(), persistentConnectionResponse.statusLine.statusCode.toLong())
}

protected fun createConnectionBetweenClusters(fromClusterName: String, toClusterName: String, connectionName: String="source") {
val toCluster = getNamedCluster(toClusterName)
val fromCluster = getNamedCluster(fromClusterName)
Expand Down Expand Up @@ -646,5 +667,16 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
return integTestRemote.equals("true")
}

protected fun docCount(cluster: RestHighLevelClient, indexName: String) : Int {
val persistentConnectionRequest = Request("GET", "/$indexName/_search?pretty&q=*")

val persistentConnectionResponse = cluster.lowLevelClient.performRequest(persistentConnectionRequest)
val statusResponse: Map<String, Map<String, Map<String, Any>>> = OpenSearchRestTestCase.entityAsMap(persistentConnectionResponse) as Map<String, Map<String, Map<String, String>>>
return statusResponse["hits"]?.get("total")?.get("value") as Int
}

protected fun deleteIndex(testCluster: RestHighLevelClient, indexName: String) {
testCluster.lowLevelClient.performRequest(Request("DELETE", indexName))
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.opensearch.replication.integ.rest

import org.opensearch.replication.MultiClusterRestTestCase
import org.opensearch.replication.MultiClusterAnnotations
import org.opensearch.replication.StartReplicationRequest
import org.opensearch.replication.startReplication
import org.opensearch.replication.stopReplication
import org.assertj.core.api.Assertions
import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.CreateIndexRequest
import org.junit.Assert
import java.util.concurrent.TimeUnit


@MultiClusterAnnotations.ClusterConfigurations(
MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER),
MultiClusterAnnotations.ClusterConfiguration(clusterName = FOLLOWER)
)

class ReplicationStopThenRestartIT : MultiClusterRestTestCase() {
private val leaderIndexName = "leader_index"
private val followerIndexName = "follower_index"

fun `test replication works after unclean stop and start`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
changeTemplate(LEADER)
createConnectionBetweenClusters(FOLLOWER, LEADER)
val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue()
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))
insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName)
insertDocToIndex(LEADER, "2", "dummy data 1",leaderIndexName)

assertBusy ({
try {
Assert.assertEquals(2, docCount(followerClient, followerIndexName))
} catch (ex: Exception) {
ex.printStackTrace();
Assert.fail("Exception while querying follower cluster. Failing to retry again {}")
}
}, 1, TimeUnit.MINUTES)


deleteConnection(FOLLOWER)
followerClient.stopReplication(followerIndexName, shouldWait = true)
deleteIndex(followerClient, followerIndexName)

createConnectionBetweenClusters(FOLLOWER, LEADER)
insertDocToIndex(LEADER, "3", "dummy data 1",leaderIndexName)
insertDocToIndex(LEADER, "4", "dummy data 1",leaderIndexName)
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))

assertBusy ({
try {
Assert.assertEquals(4, docCount(followerClient, followerIndexName))
} catch (ex: Exception) {
Assert.fail("Exception while querying follower cluster. Failing to retry again")
}
}, 1, TimeUnit.MINUTES)
}
}

0 comments on commit 236c860

Please sign in to comment.