Skip to content

Commit

Permalink
adding missing checkpoint and correcting follower stats test case
Browse files Browse the repository at this point in the history
  • Loading branch information
sricharanvuppu committed Sep 26, 2023
1 parent b086a7e commit b7cb129
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.opensearch.common.logging.Loggers
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.shard.ShardId
import org.opensearch.index.translog.Translog
import org.opensearch.replication.util.indicesService
import org.opensearch.tasks.TaskId
import java.util.ArrayList
import java.util.concurrent.ConcurrentHashMap
Expand Down Expand Up @@ -60,19 +61,22 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
private val log = Loggers.getLogger(javaClass, followerShardId)!!
private val completed = CompletableDeferred<Unit>()

val followerIndexService = indicesService.indexServiceSafe(followerShardId.index)
val indexShard = followerIndexService.getShard(followerShardId.id)

private val sequencer = scope.actor<Unit>(capacity = 0) {

// Exceptions thrown here will mark the channel as failed and the next attempt to send to the channel will
// raise the same exception. See [SendChannel.close] method for details.
val rateLimiter = Semaphore(writersPerShard)
var highWatermark = initialSeqNo
for (m in channel) {
rateLimiter.acquire()
while (unAppliedChanges.containsKey(highWatermark + 1)) {
val next = unAppliedChanges.remove(highWatermark + 1)!!
val replayRequest = ReplayChangesRequest(followerShardId, next.changes, next.maxSeqNoOfUpdatesOrDeletes,
leaderAlias, leaderIndexName)
leaderAlias, leaderIndexName)
replayRequest.parentTask = parentTaskId
rateLimiter.acquire()
launch {
var relativeStartNanos = System.nanoTime()
val retryOnExceptions = ArrayList<Class<*>>()
Expand Down Expand Up @@ -109,15 +113,16 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(
replayRequest.changes.size.toLong()
)
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
} catch (e: OpenSearchException) {
if (e !is IndexNotFoundException && (retryOnExceptions.contains(e.javaClass)
|| TransportActions.isShardNotAvailableException(e)
// This waits for the dependencies to load and retry. Helps during boot-up
|| e.status().status >= 500
|| e.status() == RestStatus.TOO_MANY_REQUESTS)) {
tryReplay = true
}
else {
tryReplay = true
}
else {
log.error("Got non-retriable Exception:${e.message} with status:${e.status()}")
throw e
}
Expand All @@ -143,4 +148,4 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
unAppliedChanges[changes.fromSeqNo] = changes
sequencer.send(Unit)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -886,49 +886,49 @@ class StartReplicationIT: MultiClusterRestTestCase() {
}, 60L, TimeUnit.SECONDS)
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/cross-cluster-replication/issues/176")

fun `test follower stats`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
val leaderIndexName2 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"leader"
val followerIndexName2 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"follower"
val leaderIndexName3 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"leader"
val followerIndexName3 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"follower"
// val followerIndex2 = "follower_index_2"
// val followerIndex3 = "follower_index_3"
createConnectionBetweenClusters(FOLLOWER, LEADER)
val createIndexResponse = leaderClient.indices().create(
CreateIndexRequest(leaderIndexName),
RequestOptions.DEFAULT
CreateIndexRequest(leaderIndexName),
RequestOptions.DEFAULT
)
assertThat(createIndexResponse.isAcknowledged).isTrue()
followerClient.startReplication(
StartReplicationRequest("source", leaderIndexName, followerIndexName),
TimeValue.timeValueSeconds(10),
true
StartReplicationRequest("source", leaderIndexName, followerIndexName),
TimeValue.timeValueSeconds(10),
true
)
followerClient.startReplication(
StartReplicationRequest("source", leaderIndexName2, followerIndexName2),
TimeValue.timeValueSeconds(10),
true
StartReplicationRequest("source", leaderIndexName, followerIndexName2),
TimeValue.timeValueSeconds(10),
true
)
followerClient.startReplication(
StartReplicationRequest("source", leaderIndexName3, followerIndexName3),
TimeValue.timeValueSeconds(10),
true
StartReplicationRequest("source", leaderIndexName, followerIndexName3),
TimeValue.timeValueSeconds(10),
true
)
val docCount = 50
for (i in 1..docCount) {
val sourceMap = mapOf("name" to randomAlphaOfLength(5))
leaderClient.index(IndexRequest(leaderIndexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT)
}
followerClient.pauseReplication(followerIndexName2)
val stats = followerClient.followerStats()
followerClient.stopReplication(followerIndexName3)
var stats = followerClient.followerStats()
assertThat(stats.getValue("num_syncing_indices").toString()).isEqualTo("1")
assertThat(stats.getValue("num_paused_indices").toString()).isEqualTo("1")
assertThat(stats.getValue("num_failed_indices").toString()).isEqualTo("0")
assertThat(stats.getValue("num_shard_tasks").toString()).isEqualTo("1")
assertThat(stats.getValue("operations_written").toString()).isEqualTo("50")
assertBusy({
stats = followerClient.followerStats()
assertThat(stats.getValue("operations_written").toString()).isEqualTo("50")
}, 60, TimeUnit.SECONDS)
assertThat(stats.getValue("operations_read").toString()).isEqualTo("50")
assertThat(stats.getValue("failed_read_requests").toString()).isEqualTo("0")
assertThat(stats.getValue("failed_write_requests").toString()).isEqualTo("0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,19 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.test.runBlockingTest
import org.assertj.core.api.Assertions.assertThat
import org.mockito.Mockito
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.action.ActionType
import org.opensearch.action.support.replication.ReplicationResponse.ShardInfo
import org.opensearch.common.settings.Settings
import org.opensearch.index.IndexService
import org.opensearch.index.shard.IndexShard
import org.opensearch.index.shard.ShardId
import org.opensearch.index.translog.Translog
import org.opensearch.indices.IndicesService
import org.opensearch.replication.util.indicesService
import org.opensearch.tasks.TaskId.EMPTY_TASK_ID
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.test.OpenSearchTestCase.randomList
Expand Down Expand Up @@ -83,8 +88,13 @@ class TranslogSequencerTests : OpenSearchTestCase() {
val stats = FollowerClusterStats()
stats.stats[followerShardId] = FollowerShardMetric()
val startSeqNo = randomNonNegativeLong()
indicesService = Mockito.mock(IndicesService::class.java)
val followerIndexService = Mockito.mock(IndexService::class.java)
val indexShard = Mockito.mock(IndexShard::class.java)
Mockito.`when`(indicesService.indexServiceSafe(followerShardId.index)).thenReturn(followerIndexService)
Mockito.`when`(followerIndexService.getShard(followerShardId.id)).thenReturn(indexShard)
val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID,
client, startSeqNo, stats, 2)
client, startSeqNo, stats, 2)

// Send requests out of order (shuffled seqNo) and await for them to be processed.
var batchSeqNo = startSeqNo
Expand Down

0 comments on commit b7cb129

Please sign in to comment.