Skip to content

Commit

Permalink
Added random active shard selector for the getchanges operations
Browse files Browse the repository at this point in the history
  • Loading branch information
saikaranam-amazon committed Jul 1, 2021
1 parent bcd2ae1 commit 605d32b
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,18 @@ import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.index.shard.ShardId
import org.elasticsearch.transport.RemoteClusterAwareRequest

class GetChangesRequest : SingleShardRequest<GetChangesRequest>, RemoteClusterAwareRequest {

val remoteNode: DiscoveryNode
class GetChangesRequest : SingleShardRequest<GetChangesRequest> {
val shardId : ShardId
val fromSeqNo: Long
val toSeqNo: Long

constructor(remoteNode: DiscoveryNode, shardId: ShardId, fromSeqNo: Long, toSeqNo: Long) : super(shardId.indexName) {
this.remoteNode = remoteNode
constructor(shardId: ShardId, fromSeqNo: Long, toSeqNo: Long) : super(shardId.indexName) {
this.shardId = shardId
this.fromSeqNo = fromSeqNo
this.toSeqNo = toSeqNo
}

constructor(input : StreamInput) : super(input) {
this.remoteNode = DiscoveryNode(input)
this.shardId = ShardId(input)
this.fromSeqNo = input.readLong()
this.toSeqNo = input.readVLong()
Expand All @@ -50,13 +46,8 @@ class GetChangesRequest : SingleShardRequest<GetChangesRequest>, RemoteClusterAw

override fun writeTo(out: StreamOutput) {
super.writeTo(out)
remoteNode.writeTo(out)
shardId.writeTo(out)
out.writeLong(fromSeqNo)
out.writeVLong(toSeqNo)
}

override fun getPreferredTargetNode(): DiscoveryNode {
return remoteNode
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
}

override fun shards(state: ClusterState, request: InternalRequest): ShardsIterator {
// TODO: Investigate using any active shards instead of just primary
return state.routingTable().shardRoutingTable(request.request().shardId).primaryShardIt()
// Random active shards
return state.routingTable().shardRoutingTable(request.request().shardId).activeInitializingShardsRandomIt()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
log.info("Adding retentionlease at follower sequence number: ${indexShard.lastSyncedGlobalCheckpoint}")
retentionLeaseHelper.addRetentionLease(remoteShardId, indexShard.lastSyncedGlobalCheckpoint , followerShardId)

var node = primaryShardNode()
addListenerToInterruptTask()

// Not really used yet as we only have one get changes action at a time.
Expand All @@ -154,7 +153,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
while (scope.isActive) {
rateLimiter.acquire()
try {
val changesResponse = getChanges(node, seqNo)
val changesResponse = getChanges(seqNo)
log.info("Got ${changesResponse.changes.size} changes starting from seqNo: $seqNo")
sequencer.send(changesResponse)
seqNo = changesResponse.changes.lastOrNull()?.seqNo()?.inc() ?: seqNo
Expand All @@ -165,7 +164,6 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
} catch (e: NodeNotConnectedException) {
log.info("Node not connected. Retrying request using a different node. $e")
delay(backOffForNodeDiscovery)
node = primaryShardNode()
rateLimiter.release()
continue
}
Expand Down Expand Up @@ -199,9 +197,9 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
?: throw NoSuchNodeException("remote: $remoteCluster:${shardRouting.currentNodeId()}")
}

private suspend fun getChanges(remoteNode: DiscoveryNode, fromSeqNo: Long): GetChangesResponse {
private suspend fun getChanges(fromSeqNo: Long): GetChangesResponse {
val remoteClient = client.getRemoteClusterClient(remoteCluster)
val request = GetChangesRequest(remoteNode, remoteShardId, fromSeqNo, fromSeqNo + batchSize)
val request = GetChangesRequest(remoteShardId, fromSeqNo, fromSeqNo + batchSize)
return remoteClient.suspendExecuteWithRetries(replicationMetadata = replicationMetadata,
action = GetChangesAction.INSTANCE, req = request, log = log)
}
Expand Down

0 comments on commit 605d32b

Please sign in to comment.