From ee5a9faa9690322bf6f4fedd70fbf1f08d695794 Mon Sep 17 00:00:00 2001 From: Monu Singh Date: Thu, 6 Jul 2023 12:20:39 +0530 Subject: [PATCH] Avoid use of indicesService in Resume replicaiton flow (#1030) (#1043) Avoid use of indicesService in Resume replicaiton flow. Signed-off-by: monusingh-1 --- .../action/resume/TransportResumeIndexReplicationAction.kt | 6 +----- .../replication/seqno/RemoteClusterRetentionLeaseHelper.kt | 4 ++-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/action/resume/TransportResumeIndexReplicationAction.kt b/src/main/kotlin/org/opensearch/replication/action/resume/TransportResumeIndexReplicationAction.kt index 60ec6134..d38c7343 100644 --- a/src/main/kotlin/org/opensearch/replication/action/resume/TransportResumeIndexReplicationAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/resume/TransportResumeIndexReplicationAction.kt @@ -54,7 +54,6 @@ import org.opensearch.env.Environment import org.opensearch.index.IndexNotFoundException import org.opensearch.index.shard.ShardId import org.opensearch.replication.ReplicationPlugin.Companion.KNN_INDEX_SETTING -import org.opensearch.replication.util.indicesService import org.opensearch.threadpool.ThreadPool import org.opensearch.transport.TransportService import java.io.IOException @@ -139,10 +138,7 @@ class TransportResumeIndexReplicationAction @Inject constructor(transportService shards.forEach { val followerShardId = it.value.shardId - val followerIndexService = indicesService.indexServiceSafe(followerShardId.index) - val indexShard = followerIndexService.getShard(followerShardId.id) - - if (!retentionLeaseHelper.verifyRetentionLeaseExist(ShardId(params.leaderIndex, followerShardId.id), followerShardId, indexShard.lastSyncedGlobalCheckpoint+1)) { + if (!retentionLeaseHelper.verifyRetentionLeaseExist(ShardId(params.leaderIndex, followerShardId.id), followerShardId)) { isResumable = false } } diff --git a/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt b/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt index 29116c51..a1c1ee2f 100644 --- a/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt +++ b/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt @@ -58,7 +58,7 @@ class RemoteClusterRetentionLeaseHelper constructor(var followerClusterNameWithU } } - public suspend fun verifyRetentionLeaseExist(leaderShardId: ShardId, followerShardId: ShardId, seqNo: Long): Boolean { + public suspend fun verifyRetentionLeaseExist(leaderShardId: ShardId, followerShardId: ShardId): Boolean { val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId) // Currently there is no API to describe/list the retention leases . // So we are verifying the existence of lease by trying to renew a lease by same name . @@ -74,7 +74,7 @@ class RemoteClusterRetentionLeaseHelper constructor(var followerClusterNameWithU return true } catch (e: RetentionLeaseNotFoundException) { - return addNewRetentionLeaseIfOldExists(leaderShardId, followerShardId, seqNo) + return addNewRetentionLeaseIfOldExists(leaderShardId, followerShardId, RetentionLeaseActions.RETAIN_ALL) }catch (e : Exception) { return false }