Skip to content

Commit

Permalink
Initialize the leaderCheckpoint with follower shard's localCheckpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Ankit Kala <[email protected]>
  • Loading branch information
ankitkala committed May 29, 2023
1 parent f5c94f7 commit df94cdd
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class RemoteClusterRestoreLeaderService @Inject constructor(private val indicesS
var fromSeqNo = RetentionLeaseActions.RETAIN_ALL

// Adds the retention lease for fromSeqNo for the next stage of the replication.
retentionLeaseHelper.addRetentionLease(request.leaderShardId, fromSeqNo,
retentionLeaseHelper.addRetentionLease(leaderIndexShard, request.leaderShardId, fromSeqNo,
request.followerShardId, RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.opensearch.index.seqno.RetentionLeaseActions
import org.opensearch.index.seqno.RetentionLeaseAlreadyExistsException
import org.opensearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException
import org.opensearch.index.seqno.RetentionLeaseNotFoundException
import org.opensearch.index.shard.IndexShard
import org.opensearch.index.shard.ShardId
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.repository.RemoteClusterRepository
Expand Down Expand Up @@ -179,18 +180,24 @@ class RemoteClusterRetentionLeaseHelper constructor(var followerClusterNameWithU
/**
 * Adds a retention lease on the leader shard for the given follower shard. If a lease with the
 * same id already exists, the existing lease is renewed instead.
 *
 * Remove these once the callers are moved to above APIs
 *
 * @param leaderIndexShard leader shard whose current retention leases are consulted when the lease already exists
 * @param leaderShardId id of the leader shard the lease is added to
 * @param seqNo sequence number the new lease should retain from
 * @param followerShardId follower shard from which the retention lease id is derived
 * @param timeout how long to wait for the add request to complete (callers pass a millisecond value)
 */
public fun addRetentionLease(leaderIndexShard: IndexShard, leaderShardId: ShardId, seqNo: Long,
                             followerShardId: ShardId, timeout: Long) {
    val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId)
    val request = RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, seqNo, retentionLeaseSource)
    try {
        client.execute(RetentionLeaseActions.Add.INSTANCE, request).actionGet(timeout)
    } catch (e: RetentionLeaseAlreadyExistsException) {
        log.error(e.stackTraceToString())
        // Only one retention lease should exist for the follower shard, and it should have been cleaned up.
        // If the lease still exists, renewing with RETAIN_ALL can fail when the existing lease is at a
        // higher sequence number than the minimum retained sequence number (i.e. RETAIN_ALL).
        // To get around this, always renew the existing lease with its own sequence number, which only
        // extends the expiry time.
        // If the lease is not present in this shard's view, fall back to the requested seqNo rather than
        // failing with NoSuchElementException.
        val retainedSequenceNumber = leaderIndexShard.retentionLeases.leases()
            .firstOrNull { lease -> lease.id() == retentionLeaseId }
            ?.retainingSequenceNumber()
            ?: seqNo
        log.info("Renew retention lease as it already exists $retentionLeaseId with $retainedSequenceNumber")
        renewRetentionLease(leaderShardId, retainedSequenceNumber, followerShardId, timeout)
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
private suspend fun pollShardTaskStatus(): IndexReplicationState {
val failedShardTasks = findAllReplicationFailedShardTasks(followerIndexName, clusterService.state())
if (failedShardTasks.isNotEmpty()) {
log.info("Failed shard tasks - ", failedShardTasks)
log.info("Failed shard tasks - {}", failedShardTasks)
var msg = ""
for ((shard, task) in failedShardTasks) {
val taskState = task.state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:

val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings)
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
// In case the shard task starts on a new node and there are no active writes on the leader shard, leader checkpoint
// never gets initialized and defaults to 0. To get around this, we set the leaderCheckpoint to follower shard's
// localCheckpoint, as the leader shard's checkpoint is guaranteed to be equal to or greater than it.
followerClusterStats.stats[followerShardId]!!.leaderCheckpoint = indexShard.localCheckpoint
coroutineScope {
while (isActive) {
rateLimiter.acquire()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package org.opensearch.replication

import com.nhaarman.mockitokotlin2.stub
import org.opensearch.replication.MultiClusterAnnotations.ClusterConfiguration
import org.opensearch.replication.MultiClusterAnnotations.ClusterConfigurations
import org.opensearch.replication.MultiClusterAnnotations.getAnnotationsFromClass
Expand Down Expand Up @@ -516,6 +515,28 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
return OpenSearchRestTestCase.entityAsList(client.performRequest(Request("GET", endpoint)))
}

/**
 * Clears the seeds of a remote-cluster connection on [fromClusterName] by setting them to null in
 * the persistent cluster settings, and asserts that the settings update returned HTTP 200.
 *
 * @param fromClusterName name of the test cluster on which the connection is configured
 * @param connectionName alias of the remote connection to clear; defaults to "source"
 */
protected fun deleteConnection(fromClusterName: String, connectionName: String="source") {
    val fromCluster = getNamedCluster(fromClusterName)

    // trimIndent (not trimMargin): the raw string carries no '|' margin prefix to strip.
    val clearSeedsBody = """
        {
          "persistent": {
            "cluster": {
              "remote": {
                "$connectionName": {
                  "seeds": null
                }
              }
            }
          }
        }
    """.trimIndent()

    val clearSeedsRequest = Request("PUT", "_cluster/settings")
    clearSeedsRequest.entity = StringEntity(clearSeedsBody, ContentType.APPLICATION_JSON)

    val clearSeedsResponse = fromCluster.lowLevelClient.performRequest(clearSeedsRequest)
    assertEquals(HttpStatus.SC_OK.toLong(), clearSeedsResponse.statusLine.statusCode.toLong())
}

protected fun createConnectionBetweenClusters(fromClusterName: String, toClusterName: String, connectionName: String="source") {
val toCluster = getNamedCluster(toClusterName)
val fromCluster = getNamedCluster(fromClusterName)
Expand Down Expand Up @@ -646,5 +667,16 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
return integTestRemote.equals("true")
}

/**
 * Returns the number of documents in [indexName] as reported by `hits.total.value` of a
 * match-all search against [cluster].
 *
 * NOTE(review): the search API may cap the reported total unless total-hit tracking is enabled —
 * confirm this is acceptable for large indices.
 *
 * @param cluster client for the cluster to query
 * @param indexName index whose documents are counted
 * @return the value of `hits.total.value` from the search response
 * @throws IllegalStateException if the response does not contain a numeric `hits.total.value`
 */
protected fun docCount(cluster: RestHighLevelClient, indexName: String) : Int {
    val searchRequest = Request("GET", "/$indexName/_search?pretty&q=*")
    val searchResponse = cluster.lowLevelClient.performRequest(searchRequest)
    val responseBody = OpenSearchRestTestCase.entityAsMap(searchResponse)

    // Walk hits -> total -> value with safe casts so a malformed response yields a clear error
    // instead of an opaque NullPointerException / ClassCastException.
    val total = (responseBody["hits"] as? Map<*, *>)?.get("total") as? Map<*, *>
    val value = total?.get("value") as? Number
        ?: throw IllegalStateException("Search response for index [$indexName] has no numeric hits.total.value: $responseBody")
    return value.toInt()
}

/**
 * Deletes [indexName] on [testCluster] and asserts that the delete request returned HTTP 200.
 *
 * @param testCluster client for the cluster holding the index
 * @param indexName name of the index to delete
 */
protected fun deleteIndex(testCluster: RestHighLevelClient, indexName: String) {
    val deleteResponse = testCluster.lowLevelClient.performRequest(Request("DELETE", indexName))
    // Check the status rather than discarding the response, as the other REST helpers here do.
    assertEquals(HttpStatus.SC_OK.toLong(), deleteResponse.statusLine.statusCode.toLong())
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package org.opensearch.replication.integ.rest

import org.opensearch.replication.MultiClusterRestTestCase
import org.opensearch.replication.MultiClusterAnnotations
import org.opensearch.replication.StartReplicationRequest
import org.opensearch.replication.startReplication
import org.opensearch.replication.stopReplication
import org.assertj.core.api.Assertions
import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.CreateIndexRequest
import org.junit.Assert
import java.util.concurrent.TimeUnit


/**
 * Integration test verifying that replication can be restarted for an index after the follower
 * lost its connection to the leader, replication was stopped and the follower index was deleted.
 */
@MultiClusterAnnotations.ClusterConfigurations(
    MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER),
    MultiClusterAnnotations.ClusterConfiguration(clusterName = FOLLOWER)
)
class ReplicationStopThenRestartIT : MultiClusterRestTestCase() {
    private val leaderIndexName = "leader_index"
    private val followerIndexName = "follower_index"

    fun `test replication works after unclean stop and start`() {
        val followerClient = getClientForCluster(FOLLOWER)
        val leaderClient = getClientForCluster(LEADER)

        // Polls the follower index for up to a minute until it holds the expected number of documents.
        fun assertFollowerDocCount(expected: Int) {
            assertBusy({
                try {
                    Assert.assertEquals(expected, docCount(followerClient, followerIndexName))
                } catch (ex: Exception) {
                    // Querying the cluster can throw transient exceptions (e.g. ClusterManagerNotDiscovered
                    // or ShardsFailed). Convert them into an assertion failure so assertBusy retries.
                    logger.error("Exception while querying follower cluster", ex)
                    Assert.fail("Exception while querying follower cluster. Failing to retry again")
                }
            }, 1, TimeUnit.MINUTES)
        }

        changeTemplate(LEADER)
        createConnectionBetweenClusters(FOLLOWER, LEADER)
        val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
        Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue()
        followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))
        insertDocToIndex(LEADER, "1", "dummy data 1", leaderIndexName)
        insertDocToIndex(LEADER, "2", "dummy data 1", leaderIndexName)

        assertFollowerDocCount(2)

        // Unclean stop: drop the connection first, then stop replication and delete the follower index.
        deleteConnection(FOLLOWER)
        followerClient.stopReplication(followerIndexName, shouldWait = true)
        deleteIndex(followerClient, followerIndexName)

        // Re-establish the connection, write more documents on the leader and start replication again.
        createConnectionBetweenClusters(FOLLOWER, LEADER)
        insertDocToIndex(LEADER, "3", "dummy data 1", leaderIndexName)
        insertDocToIndex(LEADER, "4", "dummy data 1", leaderIndexName)
        followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))

        assertFollowerDocCount(4)
    }
}

0 comments on commit df94cdd

Please sign in to comment.