diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/resume/TransportResumeIndexReplicationAction.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/resume/TransportResumeIndexReplicationAction.kt index 5d15b635..11896eee 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/action/resume/TransportResumeIndexReplicationAction.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/resume/TransportResumeIndexReplicationAction.kt @@ -60,7 +60,6 @@ import org.elasticsearch.index.IndexNotFoundException import org.elasticsearch.index.shard.ShardId import org.elasticsearch.threadpool.ThreadPool import org.elasticsearch.transport.TransportService -import com.amazon.elasticsearch.replication.util.indicesService import java.io.IOException class TransportResumeIndexReplicationAction @Inject constructor(transportService: TransportService, @@ -142,10 +141,7 @@ class TransportResumeIndexReplicationAction @Inject constructor(transportService shards.forEach { val followerShardId = it.value.shardId - val followerIndexService = indicesService.indexServiceSafe(followerShardId.index) - val indexShard = followerIndexService.getShard(followerShardId.id) - - if (!retentionLeaseHelper.verifyRetentionLeaseExist(ShardId(params.leaderIndex, followerShardId.id), followerShardId, indexShard.lastSyncedGlobalCheckpoint+1)) { + if (!retentionLeaseHelper.verifyRetentionLeaseExist(ShardId(params.leaderIndex, followerShardId.id), followerShardId)) { isResumable = false } } diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt index ed3bc651..19f1f1b6 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt @@ -62,7 +62,7 @@ class RemoteClusterRetentionLeaseHelper constructor(var followerClusterNameWithU } } - public suspend fun verifyRetentionLeaseExist(leaderShardId: ShardId, followerShardId: ShardId, seqNo: Long): Boolean { + public suspend fun verifyRetentionLeaseExist(leaderShardId: ShardId, followerShardId: ShardId): Boolean { val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId) // Currently there is no API to describe/list the retention leases . // So we are verifying the existence of lease by trying to renew a lease by same name . @@ -78,7 +78,7 @@ class RemoteClusterRetentionLeaseHelper constructor(var followerClusterNameWithU return true } catch (e: RetentionLeaseNotFoundException) { - return addNewRetentionLeaseIfOldExists(leaderShardId, followerShardId, seqNo) + return addNewRetentionLeaseIfOldExists(leaderShardId, followerShardId, RetentionLeaseActions.RETAIN_ALL) }catch (e : Exception) { return false }