From 605d32b383d391c13c4e073d985995fdda0650a9 Mon Sep 17 00:00:00 2001 From: Sai Date: Thu, 1 Jul 2021 10:35:11 +0530 Subject: [PATCH] Added random active shard selector for the getchanges operations --- .../replication/action/changes/GetChangesRequest.kt | 13 ++----------- .../action/changes/TransportGetChangesAction.kt | 4 ++-- .../replication/task/shard/ShardReplicationTask.kt | 8 +++----- 3 files changed, 7 insertions(+), 18 deletions(-) diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/changes/GetChangesRequest.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/changes/GetChangesRequest.kt index 23eb13d26..4bb3a7b27 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/action/changes/GetChangesRequest.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/changes/GetChangesRequest.kt @@ -23,22 +23,18 @@ import org.elasticsearch.common.io.stream.StreamOutput import org.elasticsearch.index.shard.ShardId import org.elasticsearch.transport.RemoteClusterAwareRequest -class GetChangesRequest : SingleShardRequest, RemoteClusterAwareRequest { - - val remoteNode: DiscoveryNode +class GetChangesRequest : SingleShardRequest { val shardId : ShardId val fromSeqNo: Long val toSeqNo: Long - constructor(remoteNode: DiscoveryNode, shardId: ShardId, fromSeqNo: Long, toSeqNo: Long) : super(shardId.indexName) { - this.remoteNode = remoteNode + constructor(shardId: ShardId, fromSeqNo: Long, toSeqNo: Long) : super(shardId.indexName) { this.shardId = shardId this.fromSeqNo = fromSeqNo this.toSeqNo = toSeqNo } constructor(input : StreamInput) : super(input) { - this.remoteNode = DiscoveryNode(input) this.shardId = ShardId(input) this.fromSeqNo = input.readLong() this.toSeqNo = input.readVLong() @@ -50,13 +46,8 @@ class GetChangesRequest : SingleShardRequest, RemoteClusterAw override fun writeTo(out: StreamOutput) { super.writeTo(out) - remoteNode.writeTo(out) shardId.writeTo(out) out.writeLong(fromSeqNo) out.writeVLong(toSeqNo) } - - override fun getPreferredTargetNode(): DiscoveryNode { - return remoteNode - } } diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/action/changes/TransportGetChangesAction.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/action/changes/TransportGetChangesAction.kt index efcbc981e..ffbc4300c 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/action/changes/TransportGetChangesAction.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/action/changes/TransportGetChangesAction.kt @@ -107,7 +107,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus } override fun shards(state: ClusterState, request: InternalRequest): ShardsIterator { - // TODO: Investigate using any active shards instead of just primary - return state.routingTable().shardRoutingTable(request.request().shardId).primaryShardIt() + // Random active shards + return state.routingTable().shardRoutingTable(request.request().shardId).activeInitializingShardsRandomIt() } } \ No newline at end of file diff --git a/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/ShardReplicationTask.kt b/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/ShardReplicationTask.kt index e8444ddff..7a6d28cd4 100644 --- a/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/ShardReplicationTask.kt +++ b/src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/ShardReplicationTask.kt @@ -140,7 +140,6 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: log.info("Adding retentionlease at follower sequence number: ${indexShard.lastSyncedGlobalCheckpoint}") retentionLeaseHelper.addRetentionLease(remoteShardId, indexShard.lastSyncedGlobalCheckpoint , followerShardId) - var node = primaryShardNode() addListenerToInterruptTask() // Not really used yet as we only have one get changes action at a time. @@ -154,7 +153,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: while (scope.isActive) { rateLimiter.acquire() try { - val changesResponse = getChanges(node, seqNo) + val changesResponse = getChanges(seqNo) log.info("Got ${changesResponse.changes.size} changes starting from seqNo: $seqNo") sequencer.send(changesResponse) seqNo = changesResponse.changes.lastOrNull()?.seqNo()?.inc() ?: seqNo @@ -165,7 +164,6 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: } catch (e: NodeNotConnectedException) { log.info("Node not connected. Retrying request using a different node. $e") delay(backOffForNodeDiscovery) - node = primaryShardNode() rateLimiter.release() continue } @@ -199,9 +197,9 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: ?: throw NoSuchNodeException("remote: $remoteCluster:${shardRouting.currentNodeId()}") } - private suspend fun getChanges(remoteNode: DiscoveryNode, fromSeqNo: Long): GetChangesResponse { + private suspend fun getChanges(fromSeqNo: Long): GetChangesResponse { val remoteClient = client.getRemoteClusterClient(remoteCluster) - val request = GetChangesRequest(remoteNode, remoteShardId, fromSeqNo, fromSeqNo + batchSize) + val request = GetChangesRequest(remoteShardId, fromSeqNo, fromSeqNo + batchSize) return remoteClient.suspendExecuteWithRetries(replicationMetadata = replicationMetadata, action = GetChangesAction.INSTANCE, req = request, log = log) }