Commit

Merge 1842f7c into d6ef8b3
ankitkala authored Jun 2, 2023
2 parents d6ef8b3 + 1842f7c commit b34ef06
Showing 8 changed files with 163 additions and 23 deletions.
@@ -114,8 +114,8 @@ class RemoteClusterRestoreLeaderService @Inject constructor(private val indicesS
var fromSeqNo = RetentionLeaseActions.RETAIN_ALL

// Adds the retention lease for fromSeqNo for the next stage of the replication.
retentionLeaseHelper.addRetentionLease(request.leaderShardId, fromSeqNo,
request.followerShardId, RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC)
retentionLeaseHelper.addRetentionLease(request.leaderShardId, fromSeqNo, request.followerShardId,
RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC)

/**
* At this point, it should be safe to release retention lock as the retention lease
@@ -22,6 +22,7 @@ import org.opensearch.index.seqno.RetentionLeaseActions
import org.opensearch.index.seqno.RetentionLeaseAlreadyExistsException
import org.opensearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException
import org.opensearch.index.seqno.RetentionLeaseNotFoundException
import org.opensearch.index.shard.IndexShard
import org.opensearch.index.shard.ShardId
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.repository.RemoteClusterRepository
@@ -175,22 +176,47 @@ class RemoteClusterRetentionLeaseHelper constructor(var followerClusterNameWithU
}
}

public fun attemptRetentionLeaseRemoval(leaderShardId: ShardId, followerShardId: ShardId, timeout: Long) {
val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId)
val request = RetentionLeaseActions.RemoveRequest(leaderShardId, retentionLeaseId)
try {
client.execute(RetentionLeaseActions.Remove.INSTANCE, request).actionGet(timeout)
log.info("Removed retention lease with id - $retentionLeaseId")
} catch(e: RetentionLeaseNotFoundException) {
// log error and bail
log.error(e.stackTraceToString())
} catch (e: Exception) {
// We are not bubbling up the exception as the stop action/ task cleanup should succeed
// even if we fail to remove the retention lease from leader cluster
log.error("Exception in removing retention lease", e)
}
}


/**
* Remove these once the callers are moved to above APIs
*/
public fun addRetentionLease(leaderShardId: ShardId, seqNo: Long,
followerShardId: ShardId, timeout: Long) {
followerShardId: ShardId, timeout: Long) {
val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId)
val request = RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, seqNo, retentionLeaseSource)
try {
client.execute(RetentionLeaseActions.Add.INSTANCE, request).actionGet(timeout)
} catch (e: RetentionLeaseAlreadyExistsException) {
log.error(e.stackTraceToString())
log.info("Renew retention lease as it already exists $retentionLeaseId with $seqNo")
// Only one retention lease should exists for the follower shard
// Ideally, this should have got cleaned-up
renewRetentionLease(leaderShardId, seqNo, followerShardId, timeout)
var canRetry = true
while (true) {
try {
log.info("Adding retention lease $retentionLeaseId")
client.execute(RetentionLeaseActions.Add.INSTANCE, request).actionGet(timeout)
break
} catch (e: RetentionLeaseAlreadyExistsException) {
log.info("Found a stale retention lease $retentionLeaseId on leader.")
if (canRetry) {
canRetry = false
attemptRetentionLeaseRemoval(leaderShardId, followerShardId, timeout)
log.info("Cleared stale retention lease $retentionLeaseId on leader. Retrying...")
} else {
log.error(e.stackTraceToString())
throw e
}
}
}
}

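In plain terms, the new addRetentionLease logic follows an add, clean-up-once, retry pattern: try to add the lease, and if a stale lease already exists, remove it and retry exactly once before surfacing the error. A minimal sketch of that control flow, where tryAddLease and removeStaleLease are hypothetical stand-ins for the client calls above, not plugin APIs:

import org.opensearch.index.seqno.RetentionLeaseAlreadyExistsException

// Sketch only: tryAddLease/removeStaleLease stand in for the RetentionLeaseActions.Add/Remove calls.
fun addLeaseWithSingleRetry(tryAddLease: () -> Unit, removeStaleLease: () -> Unit) {
    var canRetry = true
    while (true) {
        try {
            tryAddLease()            // attempt to add the retention lease on the leader shard
            return                   // success
        } catch (e: RetentionLeaseAlreadyExistsException) {
            if (canRetry) {
                canRetry = false     // allow exactly one cleanup-and-retry
                removeStaleLease()   // best-effort removal of the stale lease
            } else {
                throw e              // stale lease could not be cleared; surface the failure
            }
        }
    }
}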
@@ -290,7 +290,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
private suspend fun pollShardTaskStatus(): IndexReplicationState {
val failedShardTasks = findAllReplicationFailedShardTasks(followerIndexName, clusterService.state())
if (failedShardTasks.isNotEmpty()) {
log.info("Failed shard tasks - ", failedShardTasks)
log.info("Failed shard tasks - $failedShardTasks")
var msg = ""
for ((shard, task) in failedShardTasks) {
val taskState = task.state
@@ -217,6 +217,11 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats)

val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings)
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
// In case the shard task starts on a new node and there are no active writes on the leader shard, the leader
// checkpoint never gets initialized and defaults to 0. To get around this, we initialize leaderCheckpoint to the
// follower shard's localCheckpoint, since the leader shard is guaranteed to be at or ahead of that point.
followerClusterStats.stats[followerShardId]!!.leaderCheckpoint = indexShard.localCheckpoint
coroutineScope {
while (isActive) {
rateLimiter.acquire()
@@ -273,7 +278,6 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
//hence renew retention lease with lastSyncedGlobalCheckpoint + 1 so that any shard that picks up shard replication task has data until then.
try {
retentionLeaseHelper.renewRetentionLease(leaderShardId, indexShard.lastSyncedGlobalCheckpoint + 1, followerShardId)
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.lastSyncedGlobalCheckpoint
lastLeaseRenewalMillis = System.currentTimeMillis()
} catch (ex: Exception) {
when (ex) {
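For context on the leaderCheckpoint seeding in the first hunk above, a hypothetical illustration, assuming the follower stats derive lag roughly as leaderCheckpoint minus followerCheckpoint (the names below are illustrative, not the plugin's stats API):

// Illustrative only; assumes lag is computed as leaderCheckpoint - followerCheckpoint.
fun replicationLag(leaderCheckpoint: Long, followerCheckpoint: Long): Long =
    leaderCheckpoint - followerCheckpoint

fun main() {
    val followerLocalCheckpoint = 1_000L
    // Without seeding: an idle leader leaves leaderCheckpoint at its default of 0,
    // so the follower appears 1000 operations "ahead" of the leader.
    println(replicationLag(0L, followerLocalCheckpoint))                       // -1000 (misleading)
    // With seeding: both checkpoints start from the follower's localCheckpoint, so an idle shard reports no lag.
    println(replicationLag(followerLocalCheckpoint, followerLocalCheckpoint))  // 0
}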
@@ -28,6 +28,7 @@ import org.opensearch.client.Client
import org.opensearch.common.logging.Loggers
import org.opensearch.index.shard.ShardId
import org.opensearch.index.translog.Translog
import org.opensearch.replication.util.indicesService
import org.opensearch.tasks.TaskId
import java.util.ArrayList
import java.util.concurrent.ConcurrentHashMap
@@ -55,6 +56,9 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
private val log = Loggers.getLogger(javaClass, followerShardId)!!
private val completed = CompletableDeferred<Unit>()

val followerIndexService = indicesService.indexServiceSafe(followerShardId.index)
val indexShard = followerIndexService.getShard(followerShardId.id)

private val sequencer = scope.actor<Unit>(capacity = Channel.UNLIMITED) {
// Exceptions thrown here will mark the channel as failed and the next attempt to send to the channel will
// raise the same exception. See [SendChannel.close] method for details.
@@ -88,6 +92,7 @@
val tookInNanos = System.nanoTime() - relativeStartNanos
followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet(TimeUnit.NANOSECONDS.toMillis(tookInNanos))
followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(replayRequest.changes.size.toLong())
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
}
highWatermark = next.changes.lastOrNull()?.seqNo() ?: highWatermark
}
@@ -11,7 +11,6 @@

package org.opensearch.replication

import com.nhaarman.mockitokotlin2.stub
import org.opensearch.replication.MultiClusterAnnotations.ClusterConfiguration
import org.opensearch.replication.MultiClusterAnnotations.ClusterConfigurations
import org.opensearch.replication.MultiClusterAnnotations.getAnnotationsFromClass
@@ -21,6 +20,7 @@ import org.apache.http.HttpHost
import org.apache.http.HttpStatus
import org.apache.http.client.config.RequestConfig
import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.apache.http.message.BasicHeader
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy
@@ -512,6 +512,28 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
return OpenSearchRestTestCase.entityAsList(client.performRequest(Request("GET", endpoint)))
}

protected fun deleteConnection(fromClusterName: String, connectionName: String="source") {
val fromCluster = getNamedCluster(fromClusterName)
val persistentConnectionRequest = Request("PUT", "_cluster/settings")

val entityAsString = """
{
"persistent": {
"cluster": {
"remote": {
"$connectionName": {
"seeds": null
}
}
}
}
}""".trimMargin()

persistentConnectionRequest.entity = StringEntity(entityAsString, ContentType.APPLICATION_JSON)
val persistentConnectionResponse = fromCluster.lowLevelClient.performRequest(persistentConnectionRequest)
assertEquals(HttpStatus.SC_OK.toLong(), persistentConnectionResponse.statusLine.statusCode.toLong())
}

protected fun createConnectionBetweenClusters(fromClusterName: String, toClusterName: String, connectionName: String="source") {
val toCluster = getNamedCluster(toClusterName)
val fromCluster = getNamedCluster(fromClusterName)
@@ -642,5 +664,16 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
return integTestRemote.equals("true")
}

protected fun docCount(cluster: RestHighLevelClient, indexName: String) : Int {
val persistentConnectionRequest = Request("GET", "/$indexName/_search?pretty&q=*")

val persistentConnectionResponse = cluster.lowLevelClient.performRequest(persistentConnectionRequest)
val statusResponse: Map<String, Map<String, Map<String, Any>>> = OpenSearchRestTestCase.entityAsMap(persistentConnectionResponse) as Map<String, Map<String, Map<String, String>>>
return statusResponse["hits"]?.get("total")?.get("value") as Int
}

protected fun deleteIndex(testCluster: RestHighLevelClient, indexName: String) {
testCluster.lowLevelClient.performRequest(Request("DELETE", indexName))
}

}
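Taken together, deleteConnection, docCount, and deleteIndex let a test tear a replication pair down without a clean stop and verify document counts afterwards. A rough usage sketch inside a MultiClusterRestTestCase subclass (index names and the expected count are placeholders; the complete flow is the new test that follows):

// Sketch of the intended usage; mirrors the new ReplicationStopThenRestartIT below.
fun `sketch - tear down replication without a clean stop`() {
    val followerClient = getClientForCluster(FOLLOWER)

    deleteConnection(FOLLOWER)                              // drop the remote connection by nulling its seeds
    followerClient.stopReplication("follower_index", shouldWait = true)
    deleteIndex(followerClient, "follower_index")           // discard the follower copy

    createConnectionBetweenClusters(FOLLOWER, LEADER)       // re-seed the connection
    followerClient.startReplication(StartReplicationRequest("source", "leader_index", "follower_index"))
    assertBusy({
        Assert.assertEquals(2, docCount(followerClient, "follower_index"))   // expected count is a placeholder
    }, 1, TimeUnit.MINUTES)
}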
@@ -0,0 +1,62 @@
package org.opensearch.replication.integ.rest

import org.opensearch.replication.MultiClusterRestTestCase
import org.opensearch.replication.MultiClusterAnnotations
import org.opensearch.replication.StartReplicationRequest
import org.opensearch.replication.startReplication
import org.opensearch.replication.stopReplication
import org.assertj.core.api.Assertions
import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.CreateIndexRequest
import org.junit.Assert
import java.util.concurrent.TimeUnit


@MultiClusterAnnotations.ClusterConfigurations(
MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER),
MultiClusterAnnotations.ClusterConfiguration(clusterName = FOLLOWER)
)

class ReplicationStopThenRestartIT : MultiClusterRestTestCase() {
private val leaderIndexName = "leader_index"
private val followerIndexName = "follower_index"

fun `test replication works after unclean stop and start`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
changeTemplate(LEADER)
createConnectionBetweenClusters(FOLLOWER, LEADER)
val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue()
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))
insertDocToIndex(LEADER, "1", "dummy data 1",leaderIndexName)
insertDocToIndex(LEADER, "2", "dummy data 1",leaderIndexName)

assertBusy ({
try {
Assert.assertEquals(2, docCount(followerClient, followerIndexName))
} catch (ex: Exception) {
ex.printStackTrace()
Assert.fail("Exception while querying follower cluster. Failing this attempt so that it is retried.")
}
}, 1, TimeUnit.MINUTES)


deleteConnection(FOLLOWER)
followerClient.stopReplication(followerIndexName, shouldWait = true)
deleteIndex(followerClient, followerIndexName)

createConnectionBetweenClusters(FOLLOWER, LEADER)
insertDocToIndex(LEADER, "3", "dummy data 1",leaderIndexName)
insertDocToIndex(LEADER, "4", "dummy data 1",leaderIndexName)
followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))

assertBusy ({
try {
Assert.assertEquals(4, docCount(followerClient, followerIndexName))
} catch (ex: Exception) {
Assert.fail("Exception while querying follower cluster. Failing to retry again")
}
}, 1, TimeUnit.MINUTES)
}
}
@@ -11,32 +11,37 @@

package org.opensearch.replication.task.shard

import org.opensearch.replication.action.changes.GetChangesResponse
import org.opensearch.replication.action.replay.ReplayChangesAction
import org.opensearch.replication.action.replay.ReplayChangesRequest
import org.opensearch.replication.action.replay.ReplayChangesResponse
import org.opensearch.replication.metadata.ReplicationOverallState
import org.opensearch.replication.metadata.store.ReplicationContext
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.test.runBlockingTest
import org.assertj.core.api.Assertions.assertThat
import org.mockito.Mockito
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.action.ActionType
import org.opensearch.action.support.replication.ReplicationResponse.ShardInfo
import org.opensearch.common.settings.Settings
import org.opensearch.index.IndexService
import org.opensearch.index.shard.IndexShard
import org.opensearch.index.shard.ShardId
import org.opensearch.index.translog.Translog
import org.opensearch.indices.IndicesService
import org.opensearch.replication.action.changes.GetChangesResponse
import org.opensearch.replication.action.replay.ReplayChangesAction
import org.opensearch.replication.action.replay.ReplayChangesRequest
import org.opensearch.replication.action.replay.ReplayChangesResponse
import org.opensearch.replication.metadata.ReplicationOverallState
import org.opensearch.replication.metadata.store.ReplicationContext
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType
import org.opensearch.replication.util.indicesService
import org.opensearch.tasks.TaskId.EMPTY_TASK_ID
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.test.OpenSearchTestCase.randomList
import org.opensearch.test.client.NoOpClient
import java.util.Locale


@ObsoleteCoroutinesApi
class TranslogSequencerTests : OpenSearchTestCase() {

@@ -83,6 +88,11 @@ class TranslogSequencerTests : OpenSearchTestCase() {
val stats = FollowerClusterStats()
stats.stats[followerShardId] = FollowerShardMetric()
val startSeqNo = randomNonNegativeLong()
indicesService = Mockito.mock(IndicesService::class.java)
val followerIndexService = Mockito.mock(IndexService::class.java)
val indexShard = Mockito.mock(IndexShard::class.java)
Mockito.`when`(indicesService.indexServiceSafe(followerShardId.index)).thenReturn(followerIndexService)
Mockito.`when`(followerIndexService.getShard(followerShardId.id)).thenReturn(indexShard)
val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID,
client, startSeqNo, stats)

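The mock wiring above works because indicesService is imported from org.opensearch.replication.util, where it appears to be a plugin-wide mutable reference that production code assigns at startup and tests overwrite with a Mockito mock. A simplified sketch of that pattern, under the assumption that the util declares it roughly like this (the actual declaration may differ):

// Assumed shape of the shared reference in org.opensearch.replication.util:
lateinit var indicesService: IndicesService   // assigned once during plugin initialization

// Production code resolves shards through it, e.g. in TranslogSequencer:
//     val indexShard = indicesService.indexServiceSafe(followerShardId.index).getShard(followerShardId.id)

// Tests can swap in a mock before constructing the sequencer, as TranslogSequencerTests does above:
indicesService = Mockito.mock(IndicesService::class.java)
Mockito.`when`(indicesService.indexServiceSafe(followerShardId.index))
    .thenReturn(Mockito.mock(IndexService::class.java))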
