Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure the threadpool for leader cluster #14

Merged
merged 1 commit into from
Jun 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,23 @@ import java.util.Optional
import java.util.function.Supplier
import com.amazon.elasticsearch.replication.action.index.block.UpdateIndexBlockAction
import com.amazon.elasticsearch.replication.action.index.block.TransportUpddateIndexBlockAction
import org.elasticsearch.common.util.concurrent.EsExecutors
import org.elasticsearch.threadpool.FixedExecutorBuilder

internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, RepositoryPlugin, EnginePlugin {

private lateinit var client: Client
private lateinit var threadPool: ThreadPool

companion object {
const val REPLICATION_EXECUTOR_NAME = "replication"
val REPLICATED_INDEX_SETTING = Setting.simpleString("index.opendistro.replicated",
const val REPLICATION_EXECUTOR_NAME_LEADER = "replication_leader"
const val REPLICATION_EXECUTOR_NAME_FOLLOWER = "replication_follower"
val REPLICATED_INDEX_SETTING: Setting<String> = Setting.simpleString("index.opendistro.replicated",
Setting.Property.InternalIndex, Setting.Property.IndexScope)
val REPLICATION_CHANGE_BATCH_SIZE = Setting.intSetting("opendistro.replication.ops_batch_size", 512, 16,
val REPLICATION_CHANGE_BATCH_SIZE: Setting<Int> = Setting.intSetting("opendistro.replication.ops_batch_size", 512, 16,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_LEADER_THREADPOOL_SIZE: Setting<Int> = Setting.intSetting("opendistro.replication.leader.size", 0, 0,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE: Setting<Int> = Setting.intSetting("opendistro.replication.leader.queue_size", 1000, 0,
Setting.Property.Dynamic, Setting.Property.NodeScope)
}

Expand Down Expand Up @@ -157,18 +163,38 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
}

override fun getExecutorBuilders(settings: Settings): List<ExecutorBuilder<*>> {
//TODO: get the executor size from settings
return listOf(ScalingExecutorBuilder(REPLICATION_EXECUTOR_NAME, 1, 10, TimeValue.timeValueMinutes(1)))
return listOf(followerExecutorBuilder(settings), leaderExecutorBuilder(settings))
}

/**
 * Builds the scaling executor registered under [REPLICATION_EXECUTOR_NAME_FOLLOWER].
 *
 * The pool bounds (1 and 10) and the one-minute keep-alive are hard-coded; the executor name is
 * also passed as the settings prefix.
 *
 * @param settings node settings; currently not consulted by this builder.
 * @return the executor builder for the follower thread pool.
 */
private fun followerExecutorBuilder(settings: Settings): ExecutorBuilder<*> {
    val poolName = REPLICATION_EXECUTOR_NAME_FOLLOWER
    val idleKeepAlive = TimeValue.timeValueMinutes(1)
    return ScalingExecutorBuilder(poolName, 1, 10, idleKeepAlive, poolName)
}

/**
 * Builds the fixed-size executor registered under [REPLICATION_EXECUTOR_NAME_LEADER].
 *
 * The queue size is read from [REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE]. The thread count is read
 * from [REPLICATION_LEADER_THREADPOOL_SIZE] when that value is greater than zero; otherwise it is
 * derived from the allocated processor count via [leaderThreadPoolSize].
 *
 * The defaults are kept in parity with the search thread pool, which is what was used earlier:
 * https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/threadpool/ThreadPool.java#L195
 *
 * @param settings node settings the sizes are resolved from.
 * @return the executor builder for the leader thread pool.
 */
private fun leaderExecutorBuilder(settings: Settings): ExecutorBuilder<*> {
    // Reads happen in the same order as before so any settings failure surfaces identically.
    val processorCount = EsExecutors.allocatedProcessors(settings)
    val queueCapacity = REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE.get(settings)
    val configuredSize = REPLICATION_LEADER_THREADPOOL_SIZE.get(settings)

    // A configured size that is not positive means "not set": fall back to the processor-based default.
    val poolSize = configuredSize.takeIf { it > 0 } ?: leaderThreadPoolSize(processorCount)

    return FixedExecutorBuilder(
        settings,
        REPLICATION_EXECUTOR_NAME_LEADER,
        poolSize,
        queueCapacity,
        REPLICATION_EXECUTOR_NAME_LEADER
    )
}
/**
 * Default thread count for the leader pool on a node with [allocatedProcessors] processors.
 *
 * Computed as three halves of the processor count (integer division, so the fraction is dropped)
 * plus one.
 *
 * @param allocatedProcessors number of processors allocated to the node.
 * @return the default leader thread pool size.
 */
fun leaderThreadPoolSize(allocatedProcessors: Int): Int = (allocatedProcessors * 3) / 2 + 1

override fun getPersistentTasksExecutor(clusterService: ClusterService, threadPool: ThreadPool, client: Client,
settingsModule: SettingsModule,
expressionResolver: IndexNameExpressionResolver)
: List<PersistentTasksExecutor<*>> {
return listOf(
ShardReplicationExecutor(REPLICATION_EXECUTOR_NAME, clusterService, threadPool, client),
IndexReplicationExecutor(REPLICATION_EXECUTOR_NAME, clusterService, threadPool, client),
AutoFollowExecutor(REPLICATION_EXECUTOR_NAME, clusterService, threadPool, client))
ShardReplicationExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client),
IndexReplicationExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client),
AutoFollowExecutor(REPLICATION_EXECUTOR_NAME_FOLLOWER, clusterService, threadPool, client))
}

override fun getNamedWriteables(): List<NamedWriteableRegistry.Entry> {
Expand Down Expand Up @@ -219,7 +245,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
}

override fun getSettings(): List<Setting<*>> {
return listOf(REPLICATED_INDEX_SETTING, REPLICATION_CHANGE_BATCH_SIZE)
return listOf(REPLICATED_INDEX_SETTING, REPLICATION_CHANGE_BATCH_SIZE, REPLICATION_LEADER_THREADPOOL_SIZE, REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE)
}

override fun getInternalRepositories(env: Environment, namedXContentRegistry: NamedXContentRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.amazon.elasticsearch.replication.action.repository.GetFileChunkAction
import com.amazon.elasticsearch.replication.util.completeWith
import com.amazon.elasticsearch.replication.util.coroutineContext
import com.amazon.elasticsearch.replication.util.waitForGlobalCheckpoint
import com.amazon.elasticsearch.replication.ReplicationPlugin.Companion.REPLICATION_EXECUTOR_NAME_LEADER
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.elasticsearch.ElasticsearchTimeoutException
Expand Down Expand Up @@ -47,7 +48,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
private val indicesService: IndicesService) :
TransportSingleShardAction<GetChangesRequest, GetChangesResponse>(
GetChangesAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, ::GetChangesRequest, ThreadPool.Names.SEARCH) {
indexNameExpressionResolver, ::GetChangesRequest, REPLICATION_EXECUTOR_NAME_LEADER) {

init {
TransportActionProxy.registerProxyAction(transportService, GetChangesAction.NAME, ::GetChangesResponse)
Expand All @@ -63,7 +64,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus

@Suppress("BlockingMethodInNonBlockingContext")
override fun asyncShardOperation(request: GetChangesRequest, shardId: ShardId, listener: ActionListener<GetChangesResponse>) {
GlobalScope.launch(threadPool.coroutineContext(ThreadPool.Names.SEARCH)) {
GlobalScope.launch(threadPool.coroutineContext(REPLICATION_EXECUTOR_NAME_LEADER)) {
// TODO: Figure out if we need to acquire a primary permit here
listener.completeWith {
val indexShard = indicesService.indexServiceSafe(shardId.index).getShard(shardId.id)
Expand Down