Skip to content

Commit

Permalink
Switches UpdateManagedIndexMetaData to batch tasks using custom execu…
Browse files Browse the repository at this point in the history
…tor (opendistro-for-elasticsearch#209)

* Switches UpdateManagedIndexMetaData to batch tasks using custom executor

* Removes unneeded type
  • Loading branch information
dbbaughe committed Apr 30, 2020
1 parent 8440a65 commit 936a259
Showing 1 changed file with 67 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,25 @@ import org.elasticsearch.action.support.ActionFilters
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.action.support.master.TransportMasterNodeAction
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.AckedClusterStateUpdateTask
import org.elasticsearch.cluster.ClusterState
import org.elasticsearch.cluster.ClusterStateTaskConfig
import org.elasticsearch.cluster.ClusterStateTaskExecutor
import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult
import org.elasticsearch.cluster.ClusterStateTaskListener
import org.elasticsearch.cluster.block.ClusterBlockException
import org.elasticsearch.cluster.block.ClusterBlockLevel
import org.elasticsearch.cluster.metadata.IndexMetaData
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver
import org.elasticsearch.cluster.metadata.MetaData
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.Priority
import org.elasticsearch.common.inject.Inject
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.Writeable
import org.elasticsearch.index.Index
import org.elasticsearch.threadpool.ThreadPool
import org.elasticsearch.transport.TransportService
import java.lang.Exception

class TransportUpdateManagedIndexMetaDataAction : TransportMasterNodeAction<UpdateManagedIndexMetaDataRequest, AcknowledgedResponse> {

Expand Down Expand Up @@ -69,6 +75,7 @@ class TransportUpdateManagedIndexMetaDataAction : TransportMasterNodeAction<Upda
private val log = LogManager.getLogger(javaClass)
private val client: Client
private val indexStateManagementHistory: IndexStateManagementHistory
private val executor = ManagedIndexMetaDataExecutor()

override fun checkBlock(request: UpdateManagedIndexMetaDataRequest, state: ClusterState): ClusterBlockException? {
// https://github.com/elastic/elasticsearch/commit/ae14b4e6f96b554ca8f4aaf4039b468f52df0123
Expand All @@ -87,45 +94,14 @@ class TransportUpdateManagedIndexMetaDataAction : TransportMasterNodeAction<Upda
) {
clusterService.submitStateUpdateTask(
IndexStateManagementPlugin.PLUGIN_NAME,
object : AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
override fun execute(currentState: ClusterState): ClusterState {
// If there are no indices to make changes to, return early.
// Also doing this because when creating a metaDataBuilder and making no changes to it, for some
// reason the task does not complete, leading to indefinite suspension.
if (request.indicesToAddManagedIndexMetaDataTo.isEmpty() &&
request.indicesToRemoveManagedIndexMetaDataFrom.isEmpty()
) {
return currentState
}

val metaDataBuilder = MetaData.builder(currentState.metaData)

for (pair in request.indicesToAddManagedIndexMetaDataTo) {
if (currentState.metaData.hasIndex(pair.first.name)) {
metaDataBuilder.put(IndexMetaData.builder(currentState.metaData.index(pair.first))
.putCustom(ManagedIndexMetaData.MANAGED_INDEX_METADATA, pair.second.toMap()))
} else {
log.debug("No IndexMetaData found for [${pair.first.name}] when updating ManagedIndexMetaData")
}
}

for (index in request.indicesToRemoveManagedIndexMetaDataFrom) {
if (currentState.metaData.hasIndex(index.name)) {
val indexMetaDataBuilder = IndexMetaData.builder(currentState.metaData.index(index))
indexMetaDataBuilder.removeCustom(ManagedIndexMetaData.MANAGED_INDEX_METADATA)

metaDataBuilder.put(indexMetaDataBuilder)
} else {
log.debug("No IndexMetaData found for [${index.name}] when removing ManagedIndexMetaData")
}
}

return ClusterState.builder(currentState).metaData(metaDataBuilder).build()
}

override fun newResponse(acknowledged: Boolean): AcknowledgedResponse {
return AcknowledgedResponse(acknowledged)
}
ManagedIndexMetaDataTask(request.indicesToAddManagedIndexMetaDataTo, request.indicesToRemoveManagedIndexMetaDataFrom),
ClusterStateTaskConfig.build(Priority.NORMAL),
executor,
object : ClusterStateTaskListener {
override fun onFailure(source: String, e: Exception) = listener.onFailure(e)

override fun clusterStateProcessed(source: String, oldState: ClusterState, newState: ClusterState) =
listener.onResponse(AcknowledgedResponse(true))
}
)

Expand All @@ -143,4 +119,55 @@ class TransportUpdateManagedIndexMetaDataAction : TransportMasterNodeAction<Upda
override fun executor(): String {
return ThreadPool.Names.SAME
}

inner class ManagedIndexMetaDataExecutor : ClusterStateTaskExecutor<ManagedIndexMetaDataTask> {

override fun execute(currentState: ClusterState, tasks: List<ManagedIndexMetaDataTask>): ClusterTasksResult<ManagedIndexMetaDataTask> {
val newClusterState = getUpdatedClusterState(currentState, tasks)
return ClusterTasksResult.builder<ManagedIndexMetaDataTask>().successes(tasks).build(newClusterState)
}
}

fun getUpdatedClusterState(currentState: ClusterState, tasks: List<ManagedIndexMetaDataTask>): ClusterState {
// If there are no indices to make changes to, return early.
// Also doing this because when creating a metaDataBuilder and making no changes to it, for some
// reason the task does not complete, leading to indefinite suspension.
if (tasks.all { it.indicesToAddManagedIndexMetaDataTo.isEmpty() && it.indicesToRemoveManagedIndexMetaDataFrom.isEmpty() }
) {
return currentState
}
log.trace("Start of building new cluster state")
val metaDataBuilder = MetaData.builder(currentState.metaData)
for (task in tasks) {
for (pair in task.indicesToAddManagedIndexMetaDataTo) {
if (currentState.metaData.hasIndex(pair.first.name)) {
metaDataBuilder.put(IndexMetaData.builder(currentState.metaData.index(pair.first))
.putCustom(ManagedIndexMetaData.MANAGED_INDEX_METADATA, pair.second.toMap()))
} else {
log.debug("No IndexMetaData found for [${pair.first.name}] when updating ManagedIndexMetaData")
}
}

for (index in task.indicesToRemoveManagedIndexMetaDataFrom) {
if (currentState.metaData.hasIndex(index.name)) {
val indexMetaDataBuilder = IndexMetaData.builder(currentState.metaData.index(index))
indexMetaDataBuilder.removeCustom(ManagedIndexMetaData.MANAGED_INDEX_METADATA)

metaDataBuilder.put(indexMetaDataBuilder)
} else {
log.debug("No IndexMetaData found for [${index.name}] when removing ManagedIndexMetaData")
}
}
}
log.trace("End of building new cluster state")

return ClusterState.builder(currentState).metaData(metaDataBuilder).build()
}

companion object {
data class ManagedIndexMetaDataTask(
val indicesToAddManagedIndexMetaDataTo: List<Pair<Index, ManagedIndexMetaData>>,
val indicesToRemoveManagedIndexMetaDataFrom: List<Index>
)
}
}

0 comments on commit 936a259

Please sign in to comment.