Skip to content

Commit

Permalink
Correctly updating the followerCheckpoint in stats api (opensearch-pr…
Browse files Browse the repository at this point in the history
…oject#438)

Summary : We need to update followerCheckpoint after writing to the follower index.
Currently, we are not waiting for the writes and updating it with soon-to-be stale values

Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna authored and ankitkala committed Jun 2, 2023
1 parent 041c14f commit 78e9e75
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats)

val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings)
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
coroutineScope {
while (isActive) {
rateLimiter.acquire()
Expand Down Expand Up @@ -273,7 +274,6 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
//hence renew retention lease with lastSyncedGlobalCheckpoint + 1 so that any shard that picks up shard replication task has data until then.
try {
retentionLeaseHelper.renewRetentionLease(leaderShardId, indexShard.lastSyncedGlobalCheckpoint + 1, followerShardId)
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.lastSyncedGlobalCheckpoint
lastLeaseRenewalMillis = System.currentTimeMillis()
} catch (ex: Exception) {
when (ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.client.Client
import org.opensearch.common.logging.Loggers
import org.opensearch.index.shard.ShardId
import org.opensearch.index.translog.Translog
import org.opensearch.replication.util.indicesService
import org.opensearch.tasks.TaskId
import java.util.ArrayList
import java.util.concurrent.ConcurrentHashMap
Expand Down Expand Up @@ -55,6 +56,9 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
private val log = Loggers.getLogger(javaClass, followerShardId)!!
private val completed = CompletableDeferred<Unit>()

val followerIndexService = indicesService.indexServiceSafe(followerShardId.index)
val indexShard = followerIndexService.getShard(followerShardId.id)

private val sequencer = scope.actor<Unit>(capacity = Channel.UNLIMITED) {
// Exceptions thrown here will mark the channel as failed and the next attempt to send to the channel will
// raise the same exception. See [SendChannel.close] method for details.
Expand Down Expand Up @@ -88,6 +92,7 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
val tookInNanos = System.nanoTime() - relativeStartNanos
followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet(TimeUnit.NANOSECONDS.toMillis(tookInNanos))
followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(replayRequest.changes.size.toLong())
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
}
highWatermark = next.changes.lastOrNull()?.seqNo() ?: highWatermark
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,37 @@

package org.opensearch.replication.task.shard

import org.opensearch.replication.action.changes.GetChangesResponse
import org.opensearch.replication.action.replay.ReplayChangesAction
import org.opensearch.replication.action.replay.ReplayChangesRequest
import org.opensearch.replication.action.replay.ReplayChangesResponse
import org.opensearch.replication.metadata.ReplicationOverallState
import org.opensearch.replication.metadata.store.ReplicationContext
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.test.runBlockingTest
import org.assertj.core.api.Assertions.assertThat
import org.mockito.Mockito
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.action.ActionType
import org.opensearch.action.support.replication.ReplicationResponse.ShardInfo
import org.opensearch.common.settings.Settings
import org.opensearch.index.IndexService
import org.opensearch.index.shard.IndexShard
import org.opensearch.index.shard.ShardId
import org.opensearch.index.translog.Translog
import org.opensearch.indices.IndicesService
import org.opensearch.replication.action.changes.GetChangesResponse
import org.opensearch.replication.action.replay.ReplayChangesAction
import org.opensearch.replication.action.replay.ReplayChangesRequest
import org.opensearch.replication.action.replay.ReplayChangesResponse
import org.opensearch.replication.metadata.ReplicationOverallState
import org.opensearch.replication.metadata.store.ReplicationContext
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType
import org.opensearch.replication.util.indicesService
import org.opensearch.tasks.TaskId.EMPTY_TASK_ID
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.test.OpenSearchTestCase.randomList
import org.opensearch.test.client.NoOpClient
import java.util.Locale


@ObsoleteCoroutinesApi
class TranslogSequencerTests : OpenSearchTestCase() {

Expand Down Expand Up @@ -83,6 +88,11 @@ class TranslogSequencerTests : OpenSearchTestCase() {
val stats = FollowerClusterStats()
stats.stats[followerShardId] = FollowerShardMetric()
val startSeqNo = randomNonNegativeLong()
indicesService = Mockito.mock(IndicesService::class.java)
val followerIndexService = Mockito.mock(IndexService::class.java)
val indexShard = Mockito.mock(IndexShard::class.java)
Mockito.`when`(indicesService.indexServiceSafe(followerShardId.index)).thenReturn(followerIndexService)
Mockito.`when`(followerIndexService.getShard(followerShardId.id)).thenReturn(indexShard)
val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID,
client, startSeqNo, stats)

Expand Down

0 comments on commit 78e9e75

Please sign in to comment.