Add support for parallel reader and writer for ShardReplicationTask
ankitkala committed Jul 13, 2021
1 parent a08ab83 commit a9c8200
Showing 10 changed files with 222 additions and 84 deletions.
@@ -141,7 +141,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
const val REPLICATION_EXECUTOR_NAME_FOLLOWER = "replication_follower"
val REPLICATED_INDEX_SETTING: Setting<String> = Setting.simpleString("index.plugins.replication.replicated",
Setting.Property.InternalIndex, Setting.Property.IndexScope)
val REPLICATION_CHANGE_BATCH_SIZE: Setting<Int> = Setting.intSetting("plugins.replication.ops_batch_size", 512, 16,
val REPLICATION_CHANGE_BATCH_SIZE: Setting<Int> = Setting.intSetting("plugins.replication.ops_batch_size", 50000, 16,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_LEADER_THREADPOOL_SIZE: Setting<Int> = Setting.intSetting("plugins.replication.leader.size", 0, 0,
Setting.Property.Dynamic, Setting.Property.NodeScope)
@@ -152,6 +152,10 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS: Setting<Int> = Setting.intSetting("plugins.replication.index.recovery.max_concurrent_file_chunks", 5, 1,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_PARALLEL_READ_PER_SHARD = Setting.intSetting("plugins.replication.parallel_reads_per_shard", 2, 1,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_PARALLEL_READ_POLL_DURATION = Setting.timeSetting ("plugins.replication.parallel_reads_poll_duration", TimeValue.timeValueMillis(50), TimeValue.timeValueMillis(1),
TimeValue.timeValueSeconds(1), Setting.Property.Dynamic, Setting.Property.NodeScope)
}

override fun createComponents(client: Client, clusterService: ClusterService, threadPool: ThreadPool,
@@ -243,7 +247,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
expressionResolver: IndexNameExpressionResolver)
: List<PersistentTasksExecutor<*>> {
return listOf(
ShardReplicationExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager),
ShardReplicationExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager, replicationSettings),
IndexReplicationExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager),
AutoFollowExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client, replicationMetadataManager))
}
@@ -296,9 +300,10 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
}

override fun getSettings(): List<Setting<*>> {
return listOf(REPLICATED_INDEX_SETTING, REPLICATION_CHANGE_BATCH_SIZE,
REPLICATION_LEADER_THREADPOOL_SIZE, REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE,
REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE, REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS)
return listOf(REPLICATED_INDEX_SETTING, REPLICATION_CHANGE_BATCH_SIZE, REPLICATION_LEADER_THREADPOOL_SIZE,
REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE, REPLICATION_PARALLEL_READ_PER_SHARD,
REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE, REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS,
REPLICATION_PARALLEL_READ_POLL_DURATION)
}

override fun getInternalRepositories(env: Environment, namedXContentRegistry: NamedXContentRegistry,
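
The two settings introduced above (plugins.replication.parallel_reads_per_shard and plugins.replication.parallel_reads_poll_duration) are registered as Dynamic, so they can be tuned on a running follower cluster without a restart. A minimal sketch of adjusting them from client code, assuming a node Client handle like the one the plugin already receives; the values shown are illustrative, not recommendations:

import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
import org.elasticsearch.client.Client
import org.elasticsearch.common.settings.Settings

fun tuneParallelReads(client: Client) {
    // Illustrative values: 4 parallel readers per shard, polling every 20 ms.
    val request = ClusterUpdateSettingsRequest().persistentSettings(
        Settings.builder()
            .put("plugins.replication.parallel_reads_per_shard", 4)
            .put("plugins.replication.parallel_reads_poll_duration", "20ms")
            .build())
    client.admin().cluster().updateSettings(request).actionGet()
}

Both settings are also NodeScope, so they can equally be set in elasticsearch.yml; per the setting definition above, the poll duration is bounded between 1ms and 1s.
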
@@ -3,14 +3,15 @@ package com.amazon.elasticsearch.replication
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.settings.ClusterSettings
import org.elasticsearch.common.unit.ByteSizeValue
import org.elasticsearch.common.unit.TimeValue

class ReplicationSettings(clusterService: ClusterService) {

@Volatile
var chunkSize = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE.get(clusterService.settings)

@Volatile
var concurrentFileChunks = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS.get(clusterService.settings)
@Volatile var chunkSize = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE.get(clusterService.settings)
@Volatile var concurrentFileChunks = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS.get(clusterService.settings)
@Volatile var readersPerShard = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_PARALLEL_READ_PER_SHARD)
@Volatile var batchSize = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_CHANGE_BATCH_SIZE)
@Volatile var pollDuration: TimeValue = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_PARALLEL_READ_POLL_DURATION)

init {
listenForUpdates(clusterService.clusterSettings)
@@ -19,5 +20,9 @@ class ReplicationSettings(clusterService: ClusterService) {
private fun listenForUpdates(clusterSettings: ClusterSettings) {
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE) { value: ByteSizeValue -> this.chunkSize = value}
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS) { value: Int -> this.concurrentFileChunks = value}
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_PARALLEL_READ_PER_SHARD) { value: Int -> this.readersPerShard = value}
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_CHANGE_BATCH_SIZE) { batchSize = it }
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_PARALLEL_READ_POLL_DURATION) { pollDuration = it }

}
}
@@ -22,13 +22,16 @@ import org.elasticsearch.index.translog.Translog

class GetChangesResponse(val changes: List<Translog.Operation>,
val fromSeqNo: Long,
val maxSeqNoOfUpdatesOrDeletes: Long) : ActionResponse() {
val maxSeqNoOfUpdatesOrDeletes: Long,
val lastSyncedGlobalCheckpoint: Long) : ActionResponse() {

constructor(inp: StreamInput) : this(inp.readList(Translog.Operation::readOperation), inp.readVLong(), inp.readLong())
constructor(inp: StreamInput) : this(inp.readList(Translog.Operation::readOperation), inp.readVLong(),
inp.readLong(), inp.readLong())

override fun writeTo(out: StreamOutput) {
out.writeCollection(changes, Translog.Operation::writeOperation)
out.writeVLong(fromSeqNo)
out.writeLong(maxSeqNoOfUpdatesOrDeletes)
out.writeLong(lastSyncedGlobalCheckpoint)
}
}
@@ -113,7 +113,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
}
}
}
GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes)
GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, indexShard.lastSyncedGlobalCheckpoint)
}
}
}
@@ -0,0 +1,109 @@
package com.amazon.elasticsearch.replication.task.shard

import com.amazon.elasticsearch.replication.ReplicationSettings
import kotlinx.coroutines.delay
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.elasticsearch.common.logging.Loggers
import org.elasticsearch.index.shard.IndexShard
import java.util.Collections
import java.util.concurrent.atomic.AtomicLong
import kotlin.collections.ArrayList

/**
* Since we have added support for fetching batch of operations in parallel, we need to keep track of
* how many operations have been fetched and what batch needs to be fetched next. This creates the
* problem of concurrency with shared mutable state (https://kotlinlang.org/docs/shared-mutable-state-and-concurrency.html).
* ShardReplicationChangesTracker abstracts away all that complexity from ShardReplicationTask.
* Every reader coroutine in a shard has to interact with the tracker for:
* 1. Requesting the range of operations to be fetched in the batch.
* 2. Updating the final status of the batch fetch.
*/
class ShardReplicationChangesTracker(indexShard: IndexShard, private val replicationSettings: ReplicationSettings) {
private val log = Loggers.getLogger(javaClass, indexShard.shardId())!!

private val mutex = Mutex()
private val missingBatches = Collections.synchronizedList(ArrayList<Pair<Long, Long>>())
private val observedSeqNoAtLeader = AtomicLong(indexShard.localCheckpoint)
private val seqNoAlreadyRequested = AtomicLong(indexShard.localCheckpoint)
private val batchSize = replicationSettings.batchSize

/**
* Provides a range of operations to be fetched next.
*
* Here are the guarantees that this method provides:
* 1. All reader coroutines get unique range of operations to fetch.
* 2. It'll ensure that the complete range of operations would be fetched.
* 3. Mutex in this method ensures that only one coroutine is requesting the batch at a time.
* If there are multiple coroutines, they'll be waiting in order to get the range of operations to fetch.
* 4. If we've already fetched all the operations from leader, there would be one and only one
* reader polling on leader per shard.
*/
suspend fun requestBatchToFetch():Pair<Long, Long> {
mutex.withLock {
logDebug("Waiting to get batch. requested: ${seqNoAlreadyRequested.get()}, leader: ${observedSeqNoAtLeader.get()}")

// Wait till we have batch to fetch. Note that if seqNoAlreadyRequested is equal to observedSeqNoAtLeader,
// we still should be sending one more request to fetch which will just do a poll and eventually timeout
// if no new operations are there on the leader (configured via TransportGetChangesAction.WAIT_FOR_NEW_OPS_TIMEOUT)
while (seqNoAlreadyRequested.get() > observedSeqNoAtLeader.get() && missingBatches.isEmpty()) {
delay(replicationSettings.pollDuration.millis)
}

// missing batch takes higher priority.
return if (missingBatches.isNotEmpty()) {
logDebug("Fetching missing batch ${missingBatches[0].first}-${missingBatches[0].second}")
missingBatches.removeAt(0)
} else {
// return the next batch to fetch and update seqNoAlreadyRequested.
val fromSeq = seqNoAlreadyRequested.getAndAdd(batchSize.toLong()) + 1
val toSeq = fromSeq + batchSize - 1
logDebug("Fetching the batch $fromSeq-$toSeq")
Pair(fromSeq, toSeq)
}
}
}

/**
* Ensures that we've successfully fetched a particular range of operations.
* In case of any failure(or we didn't get complete batch), we make sure that we're fetching the
* missing operations in the next batch.
*/
fun updateBatchFetched(success: Boolean, fromSeqNoRequested: Long, toSeqNoRequested: Long, toSeqNoReceived: Long, seqNoAtLeader: Long) {
if (success) {
// we shouldn't ever be getting more operations than requested.
assert(toSeqNoRequested >= toSeqNoReceived) { "${Thread.currentThread().getName()} Got more operations in the batch than requested" }
logDebug("Updating the batch fetched. ${fromSeqNoRequested}-${toSeqNoReceived}/${toSeqNoRequested}, seqNoAtLeader:$seqNoAtLeader")

// If we didn't get the complete batch that we had requested.
if (toSeqNoRequested > toSeqNoReceived) {
// If this is the last batch being fetched, update the seqNoAlreadyRequested.
if (seqNoAlreadyRequested.get() == toSeqNoRequested) {
seqNoAlreadyRequested.updateAndGet { toSeqNoReceived }
} else {
// Else, add to the missing operations to missing batch
logDebug("Didn't get the complete batch. Adding the missing operations ${toSeqNoReceived + 1}-${toSeqNoRequested}")
missingBatches.add(Pair(toSeqNoReceived + 1, toSeqNoRequested))
}
}

// Update the sequence number observed at leader.
observedSeqNoAtLeader.getAndUpdate { value -> if (seqNoAtLeader > value) seqNoAtLeader else value }
logDebug("observedSeqNoAtLeader: ${observedSeqNoAtLeader.get()}")
} else {
// If this is the last batch being fetched, update the seqNoAlreadyRequested.
if (seqNoAlreadyRequested.get() == toSeqNoRequested) {
seqNoAlreadyRequested.updateAndGet { fromSeqNoRequested - 1 }
} else {
// If this was not the last batch, we might have already fetched other batch of
// operations after this. Adding this to missing.
logDebug("Adding batch to missing $fromSeqNoRequested-$toSeqNoRequested")
missingBatches.add(Pair(fromSeqNoRequested, toSeqNoRequested))
}
}
}

private fun logDebug(msg: String) {
log.debug("${Thread.currentThread().name}: $msg")
}
}
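
The class above is meant to be shared by the parallel reader coroutines of a single shard replication task. A rough usage sketch under that assumption, with hypothetical fetchChanges and applyChanges helpers standing in for the GetChanges transport call and the follower-side write path (neither is part of this commit; plugin-internal imports such as GetChangesResponse and ShardReplicationChangesTracker are omitted):

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import org.elasticsearch.index.translog.Translog

// Hypothetical stand-ins for the leader fetch and follower apply steps.
suspend fun fetchChanges(fromSeqNo: Long, toSeqNo: Long): GetChangesResponse = TODO()
suspend fun applyChanges(changes: List<Translog.Operation>): Unit = TODO()

suspend fun runReaders(tracker: ShardReplicationChangesTracker, readersPerShard: Int) = coroutineScope {
    repeat(readersPerShard) {
        launch {
            while (true) {  // in the real task this loop ends when replication is stopped
                // 1. Get a unique, non-overlapping range of sequence numbers to fetch.
                val (fromSeqNo, toSeqNo) = tracker.requestBatchToFetch()
                try {
                    val response = fetchChanges(fromSeqNo, toSeqNo)
                    applyChanges(response.changes)
                    // 2. Report what was actually received; the tracker re-queues any missing
                    //    tail of the range and records the leader-side checkpoint.
                    tracker.updateBatchFetched(true, fromSeqNo, toSeqNo,
                        response.changes.lastOrNull()?.seqNo() ?: fromSeqNo - 1,
                        response.lastSyncedGlobalCheckpoint)
                } catch (e: Exception) {
                    // 3. On failure the whole range is handed back for a retry;
                    //    the last two arguments are ignored in this branch.
                    tracker.updateBatchFetched(false, fromSeqNo, toSeqNo, -1, -1)
                }
            }
        }
    }
}

The lastSyncedGlobalCheckpoint field added to GetChangesResponse in this commit provides the leader-side progress that updateBatchFetched's seqNoAtLeader parameter expects, so once the follower has caught up only a single reader keeps polling the leader, as described in the class documentation.
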
@@ -15,6 +15,7 @@

package com.amazon.elasticsearch.replication.task.shard

import com.amazon.elasticsearch.replication.ReplicationSettings
import com.amazon.elasticsearch.replication.metadata.ReplicationMetadataManager
import com.amazon.elasticsearch.replication.metadata.ReplicationOverallState
import com.amazon.elasticsearch.replication.metadata.state.REPLICATION_LAST_KNOWN_OVERALL_STATE
@@ -34,7 +35,8 @@ import org.elasticsearch.threadpool.ThreadPool

class ShardReplicationExecutor(executor: String, private val clusterService : ClusterService,
private val threadPool: ThreadPool, private val client: Client,
private val replicationMetadataManager: ReplicationMetadataManager) :
private val replicationMetadataManager: ReplicationMetadataManager,
private val replicationSettings: ReplicationSettings) :
PersistentTasksExecutor<ShardReplicationParams>(TASK_NAME, executor) {

companion object {
@@ -75,7 +77,8 @@ class ShardReplicationExecutor(executor: String, private val clusterService : Cl
taskInProgress: PersistentTask<ShardReplicationParams>,
headers: Map<String, String>): AllocatedPersistentTask {
return ShardReplicationTask(id, type, action, getDescription(taskInProgress), parentTaskId,
taskInProgress.params!!, executor, clusterService, threadPool, client, replicationMetadataManager)
taskInProgress.params!!, executor, clusterService, threadPool,
client, replicationMetadataManager, replicationSettings)
}

override fun getDescription(taskInProgress: PersistentTask<ShardReplicationParams>): String {
