diff --git a/src/main/kotlin/org/opensearch/replication/action/resume/TransportResumeIndexReplicationAction.kt b/src/main/kotlin/org/opensearch/replication/action/resume/TransportResumeIndexReplicationAction.kt index c3c36419..97fb017e 100644 --- a/src/main/kotlin/org/opensearch/replication/action/resume/TransportResumeIndexReplicationAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/resume/TransportResumeIndexReplicationAction.kt @@ -54,7 +54,6 @@ import org.opensearch.common.io.stream.StreamInput import org.opensearch.env.Environment import org.opensearch.index.IndexNotFoundException import org.opensearch.index.shard.ShardId -import org.opensearch.replication.util.indicesService import org.opensearch.threadpool.ThreadPool import org.opensearch.transport.TransportService import java.io.IOException @@ -136,10 +135,7 @@ class TransportResumeIndexReplicationAction @Inject constructor(transportService shards?.forEach { val followerShardId = it.value.shardId - val followerIndexService = indicesService.indexServiceSafe(followerShardId.index) - val indexShard = followerIndexService.getShard(followerShardId.id) - - if (!retentionLeaseHelper.verifyRetentionLeaseExist(ShardId(params.leaderIndex, followerShardId.id), followerShardId, indexShard.lastSyncedGlobalCheckpoint+1)) { + if (!retentionLeaseHelper.verifyRetentionLeaseExist(ShardId(params.leaderIndex, followerShardId.id), followerShardId)) { isResumable = false } } diff --git a/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt b/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt index 80037455..d50540a3 100644 --- a/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt +++ b/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt @@ -58,7 +58,7 @@ class RemoteClusterRetentionLeaseHelper constructor(var followerClusterNameWithU } } - public suspend fun verifyRetentionLeaseExist(leaderShardId: ShardId, followerShardId: ShardId, seqNo: Long): Boolean { + public suspend fun verifyRetentionLeaseExist(leaderShardId: ShardId, followerShardId: ShardId): Boolean { val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId) // Currently there is no API to describe/list the retention leases . // So we are verifying the existence of lease by trying to renew a lease by same name . @@ -74,7 +74,7 @@ class RemoteClusterRetentionLeaseHelper constructor(var followerClusterNameWithU return true } catch (e: RetentionLeaseNotFoundException) { - return addNewRetentionLeaseIfOldExists(leaderShardId, followerShardId, seqNo) + return addNewRetentionLeaseIfOldExists(leaderShardId, followerShardId, RetentionLeaseActions.RETAIN_ALL) }catch (e : Exception) { return false }