From 6130d6feda5075a695bc384b2702a76d4db383a6 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Mon, 4 Jul 2022 12:22:33 +0530 Subject: [PATCH] Correctly updating the followerCheckpoint in stats api (#438) Summary : We need to update followerCheckpoint after writing to the follower index. Currently, we are not waiting for the writes and updating it with soon-to-be stale values Signed-off-by: Gaurav Bafna --- .../task/shard/ShardReplicationTask.kt | 2 +- .../task/shard/TranslogSequencer.kt | 5 ++++ .../integ/rest/StartReplicationIT.kt | 1 + .../task/shard/TranslogSequencerTests.kt | 28 +++++++++++++------ 4 files changed, 26 insertions(+), 10 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt index a7418917..44493bc7 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt @@ -217,6 +217,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats) val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings) + followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint coroutineScope { while (isActive) { rateLimiter.acquire() @@ -273,7 +274,6 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: //hence renew retention lease with lastSyncedGlobalCheckpoint + 1 so that any shard that picks up shard replication task has data until then. try { retentionLeaseHelper.renewRetentionLease(leaderShardId, indexShard.lastSyncedGlobalCheckpoint + 1, followerShardId) - followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.lastSyncedGlobalCheckpoint lastLeaseRenewalMillis = System.currentTimeMillis() } catch (ex: Exception) { when (ex) { diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt index be5fe89c..38b625bf 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt @@ -28,6 +28,7 @@ import org.opensearch.client.Client import org.opensearch.common.logging.Loggers import org.opensearch.index.shard.ShardId import org.opensearch.index.translog.Translog +import org.opensearch.replication.util.indicesService import org.opensearch.tasks.TaskId import java.util.ArrayList import java.util.concurrent.ConcurrentHashMap @@ -55,6 +56,9 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: private val log = Loggers.getLogger(javaClass, followerShardId)!! private val completed = CompletableDeferred() + val followerIndexService = indicesService.indexServiceSafe(followerShardId.index) + val indexShard = followerIndexService.getShard(followerShardId.id) + private val sequencer = scope.actor(capacity = Channel.UNLIMITED) { // Exceptions thrown here will mark the channel as failed and the next attempt to send to the channel will // raise the same exception. See [SendChannel.close] method for details. @@ -88,6 +92,7 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: val tookInNanos = System.nanoTime() - relativeStartNanos followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet(TimeUnit.NANOSECONDS.toMillis(tookInNanos)) followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(replayRequest.changes.size.toLong()) + followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint } highWatermark = next.changes.lastOrNull()?.seqNo() ?: highWatermark } diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt index e9bc717e..4c50bf3a 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt @@ -1091,6 +1091,7 @@ class StartReplicationIT: MultiClusterRestTestCase() { assertThat(stats.getValue("operations_read").toString()).isEqualTo("50") assertThat(stats.getValue("failed_read_requests").toString()).isEqualTo("0") assertThat(stats.getValue("failed_write_requests").toString()).isEqualTo("0") + assertThat(stats.getValue("follower_checkpoint").toString()).isEqualTo((docCount-1).toString()) assertThat(stats.containsKey("index_stats")) assertThat(stats.size).isEqualTo(16) diff --git a/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt b/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt index ed5afb06..ac377687 100644 --- a/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt +++ b/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt @@ -11,32 +11,37 @@ package org.opensearch.replication.task.shard -import org.opensearch.replication.action.changes.GetChangesResponse -import org.opensearch.replication.action.replay.ReplayChangesAction -import org.opensearch.replication.action.replay.ReplayChangesRequest -import org.opensearch.replication.action.replay.ReplayChangesResponse -import org.opensearch.replication.metadata.ReplicationOverallState -import org.opensearch.replication.metadata.store.ReplicationContext -import org.opensearch.replication.metadata.store.ReplicationMetadata -import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.ObsoleteCoroutinesApi import kotlinx.coroutines.test.runBlockingTest import org.assertj.core.api.Assertions.assertThat +import org.mockito.Mockito import org.opensearch.action.ActionListener import org.opensearch.action.ActionRequest import org.opensearch.action.ActionResponse import org.opensearch.action.ActionType import org.opensearch.action.support.replication.ReplicationResponse.ShardInfo import org.opensearch.common.settings.Settings +import org.opensearch.index.IndexService +import org.opensearch.index.shard.IndexShard import org.opensearch.index.shard.ShardId import org.opensearch.index.translog.Translog +import org.opensearch.indices.IndicesService +import org.opensearch.replication.action.changes.GetChangesResponse +import org.opensearch.replication.action.replay.ReplayChangesAction +import org.opensearch.replication.action.replay.ReplayChangesRequest +import org.opensearch.replication.action.replay.ReplayChangesResponse +import org.opensearch.replication.metadata.ReplicationOverallState +import org.opensearch.replication.metadata.store.ReplicationContext +import org.opensearch.replication.metadata.store.ReplicationMetadata +import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType +import org.opensearch.replication.util.indicesService import org.opensearch.tasks.TaskId.EMPTY_TASK_ID import org.opensearch.test.OpenSearchTestCase -import org.opensearch.test.OpenSearchTestCase.randomList import org.opensearch.test.client.NoOpClient import java.util.Locale + @ObsoleteCoroutinesApi class TranslogSequencerTests : OpenSearchTestCase() { @@ -83,6 +88,11 @@ class TranslogSequencerTests : OpenSearchTestCase() { val stats = FollowerClusterStats() stats.stats[followerShardId] = FollowerShardMetric() val startSeqNo = randomNonNegativeLong() + indicesService = Mockito.mock(IndicesService::class.java) + val followerIndexService = Mockito.mock(IndexService::class.java) + val indexShard = Mockito.mock(IndexShard::class.java) + Mockito.`when`(indicesService.indexServiceSafe(followerShardId.index)).thenReturn(followerIndexService) + Mockito.`when`(followerIndexService.getShard(followerShardId.id)).thenReturn(indexShard) val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID, client, startSeqNo, stats)