diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt index 5036df59..97127291 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt @@ -31,6 +31,7 @@ import org.opensearch.common.logging.Loggers import org.opensearch.index.IndexNotFoundException import org.opensearch.index.shard.ShardId import org.opensearch.index.translog.Translog +import org.opensearch.replication.util.indicesService import org.opensearch.tasks.TaskId import java.util.ArrayList import java.util.concurrent.ConcurrentHashMap @@ -60,6 +61,9 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: private val log = Loggers.getLogger(javaClass, followerShardId)!! private val completed = CompletableDeferred() + val followerIndexService = indicesService.indexServiceSafe(followerShardId.index) + val indexShard = followerIndexService.getShard(followerShardId.id) + private val sequencer = scope.actor(capacity = 0) { // Exceptions thrown here will mark the channel as failed and the next attempt to send to the channel will @@ -67,12 +71,12 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: val rateLimiter = Semaphore(writersPerShard) var highWatermark = initialSeqNo for (m in channel) { - rateLimiter.acquire() while (unAppliedChanges.containsKey(highWatermark + 1)) { val next = unAppliedChanges.remove(highWatermark + 1)!! val replayRequest = ReplayChangesRequest(followerShardId, next.changes, next.maxSeqNoOfUpdatesOrDeletes, - leaderAlias, leaderIndexName) + leaderAlias, leaderIndexName) replayRequest.parentTask = parentTaskId + rateLimiter.acquire() launch { var relativeStartNanos = System.nanoTime() val retryOnExceptions = ArrayList>() @@ -109,15 +113,16 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet( replayRequest.changes.size.toLong() ) + followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint } catch (e: OpenSearchException) { if (e !is IndexNotFoundException && (retryOnExceptions.contains(e.javaClass) || TransportActions.isShardNotAvailableException(e) // This waits for the dependencies to load and retry. Helps during boot-up || e.status().status >= 500 || e.status() == RestStatus.TOO_MANY_REQUESTS)) { - tryReplay = true - } - else { + tryReplay = true + } + else { log.error("Got non-retriable Exception:${e.message} with status:${e.status()}") throw e } @@ -143,4 +148,4 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: unAppliedChanges[changes.fromSeqNo] = changes sequencer.send(Unit) } -} +} \ No newline at end of file diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt index cd3d849b..79f46faf 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt @@ -886,36 +886,32 @@ class StartReplicationIT: MultiClusterRestTestCase() { }, 60L, TimeUnit.SECONDS) } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/cross-cluster-replication/issues/176") + fun `test follower stats`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - val leaderIndexName2 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"leader" val followerIndexName2 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"follower" - val leaderIndexName3 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"leader" val followerIndexName3 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"follower" -// val followerIndex2 = "follower_index_2" -// val followerIndex3 = "follower_index_3" createConnectionBetweenClusters(FOLLOWER, LEADER) val createIndexResponse = leaderClient.indices().create( - CreateIndexRequest(leaderIndexName), - RequestOptions.DEFAULT + CreateIndexRequest(leaderIndexName), + RequestOptions.DEFAULT ) assertThat(createIndexResponse.isAcknowledged).isTrue() followerClient.startReplication( - StartReplicationRequest("source", leaderIndexName, followerIndexName), - TimeValue.timeValueSeconds(10), - true + StartReplicationRequest("source", leaderIndexName, followerIndexName), + TimeValue.timeValueSeconds(10), + true ) followerClient.startReplication( - StartReplicationRequest("source", leaderIndexName2, followerIndexName2), - TimeValue.timeValueSeconds(10), - true + StartReplicationRequest("source", leaderIndexName, followerIndexName2), + TimeValue.timeValueSeconds(10), + true ) followerClient.startReplication( - StartReplicationRequest("source", leaderIndexName3, followerIndexName3), - TimeValue.timeValueSeconds(10), - true + StartReplicationRequest("source", leaderIndexName, followerIndexName3), + TimeValue.timeValueSeconds(10), + true ) val docCount = 50 for (i in 1..docCount) { @@ -923,12 +919,16 @@ class StartReplicationIT: MultiClusterRestTestCase() { leaderClient.index(IndexRequest(leaderIndexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT) } followerClient.pauseReplication(followerIndexName2) - val stats = followerClient.followerStats() + followerClient.stopReplication(followerIndexName3) + var stats = followerClient.followerStats() assertThat(stats.getValue("num_syncing_indices").toString()).isEqualTo("1") assertThat(stats.getValue("num_paused_indices").toString()).isEqualTo("1") assertThat(stats.getValue("num_failed_indices").toString()).isEqualTo("0") assertThat(stats.getValue("num_shard_tasks").toString()).isEqualTo("1") - assertThat(stats.getValue("operations_written").toString()).isEqualTo("50") + assertBusy({ + stats = followerClient.followerStats() + assertThat(stats.getValue("operations_written").toString()).isEqualTo("50") + }, 60, TimeUnit.SECONDS) assertThat(stats.getValue("operations_read").toString()).isEqualTo("50") assertThat(stats.getValue("failed_read_requests").toString()).isEqualTo("0") assertThat(stats.getValue("failed_write_requests").toString()).isEqualTo("0") diff --git a/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt b/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt index b1ffc495..82fe13ef 100644 --- a/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt +++ b/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt @@ -23,14 +23,19 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.ObsoleteCoroutinesApi import kotlinx.coroutines.test.runBlockingTest import org.assertj.core.api.Assertions.assertThat +import org.mockito.Mockito import org.opensearch.action.ActionListener import org.opensearch.action.ActionRequest import org.opensearch.action.ActionResponse import org.opensearch.action.ActionType import org.opensearch.action.support.replication.ReplicationResponse.ShardInfo import org.opensearch.common.settings.Settings +import org.opensearch.index.IndexService +import org.opensearch.index.shard.IndexShard import org.opensearch.index.shard.ShardId import org.opensearch.index.translog.Translog +import org.opensearch.indices.IndicesService +import org.opensearch.replication.util.indicesService import org.opensearch.tasks.TaskId.EMPTY_TASK_ID import org.opensearch.test.OpenSearchTestCase import org.opensearch.test.OpenSearchTestCase.randomList @@ -83,8 +88,13 @@ class TranslogSequencerTests : OpenSearchTestCase() { val stats = FollowerClusterStats() stats.stats[followerShardId] = FollowerShardMetric() val startSeqNo = randomNonNegativeLong() + indicesService = Mockito.mock(IndicesService::class.java) + val followerIndexService = Mockito.mock(IndexService::class.java) + val indexShard = Mockito.mock(IndexShard::class.java) + Mockito.`when`(indicesService.indexServiceSafe(followerShardId.index)).thenReturn(followerIndexService) + Mockito.`when`(followerIndexService.getShard(followerShardId.id)).thenReturn(indexShard) val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID, - client, startSeqNo, stats, 2) + client, startSeqNo, stats, 2) // Send requests out of order (shuffled seqNo) and await for them to be processed. var batchSeqNo = startSeqNo