Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ywelsch committed Apr 20, 2017
1 parent abe574f commit a1e1ec4
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
*/
package org.elasticsearch.cluster;

import org.elasticsearch.cluster.service.TaskBatching;
import org.elasticsearch.common.Nullable;

import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

public interface ClusterStateTaskExecutor<T> extends TaskBatching.BatchingKey<T> {
public interface ClusterStateTaskExecutor<T> {
/**
* Update the cluster state based on the current state and the given tasks. Return the *same instance* if no state
* should be changed.
Expand All @@ -48,6 +47,25 @@ default boolean runOnlyOnMaster() {
default void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
}

/**
* Builds a concise description of a list of tasks (to be used in logging etc.).
*
* Note that the tasks given are not necessarily the same as those that will be passed to {@link #execute(ClusterState, List)}.
* but are guaranteed to be a subset of them. This method can be called multiple times with different lists before execution.
* This allows groupd task description but the submitting source.
*/
default String describeTasks(List<T> tasks) {
return tasks.stream().map(T::toString).reduce((s1,s2) -> {
if (s1.isEmpty()) {
return s2;
} else if (s2.isEmpty()) {
return s1;
} else {
return s1 + ", " + s2;
}
}).orElse("");
}

/**
* Represents the result of a batched execution of cluster state update tasks
* @param <T> the type of the cluster state update task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -109,7 +110,7 @@ public class ClusterService extends AbstractLifecycleComponent {
private TimeValue slowTaskLoggingThreshold;

private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor;
private volatile ClusterServiceTaskBatching taskBatching;
private volatile ClusterServiceTaskBatcher taskBatching;

/**
* Those 3 state listeners are changing infrequently - CopyOnWriteArrayList is just fine
Expand Down Expand Up @@ -215,8 +216,8 @@ protected synchronized void doStart() {
return ClusterState.builder(state).nodes(nodes).blocks(initialBlocks).build();
});
this.threadPoolExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME,
daemonThreadFactory(settings, UPDATE_THREAD_NAME), threadPool.getThreadContext());
this.taskBatching = new ClusterServiceTaskBatching(logger, threadPoolExecutor, threadPool);
daemonThreadFactory(settings, UPDATE_THREAD_NAME), threadPool.getThreadContext(), threadPool.scheduler());
this.taskBatching = new ClusterServiceTaskBatcher(logger, threadPoolExecutor);
}

@Override
Expand All @@ -240,32 +241,41 @@ protected synchronized void doStop() {
protected synchronized void doClose() {
}

class ClusterServiceTaskBatching extends TaskBatching {
class ClusterServiceTaskBatcher extends TaskBatcher {

ClusterServiceTaskBatching(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor, ThreadPool threadPool) {
super(logger, threadExecutor, threadPool);
ClusterServiceTaskBatcher(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor) {
super(logger, threadExecutor);
}

@Override
protected void onTimeout(BatchingTask task, TimeValue timeout) {
((UpdateTask) task).listener.onFailure(task.source, new ProcessClusterEventTimeoutException(timeout, task.source));
protected void onTimeout(List<? extends BatchedTask> tasks, TimeValue timeout) {
threadPool.generic().execute(
() -> tasks.forEach(
task -> ((UpdateTask) task).listener.onFailure(task.source,
new ProcessClusterEventTimeoutException(timeout, task.source))));
}

@Override
protected void run(Object batchingKey, List<? extends BatchingTask> tasks, String tasksSummary) {
protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
runTasks(new ClusterService.TaskInputs(taskExecutor, updateTasks, tasksSummary));
}

class UpdateTask extends BatchingTask {
class UpdateTask extends BatchedTask {
final ClusterStateTaskListener listener;

UpdateTask(Priority priority, String source, Object task, ClusterStateTaskListener listener,
ClusterStateTaskExecutor<?> executor) {
super(priority, source, executor, task);
this.listener = listener;
}

@Override
public String describeTasks(List<? extends BatchedTask> tasks) {
return ((ClusterStateTaskExecutor<Object>) batchingKey).describeTasks(
tasks.stream().map(BatchedTask::getTask).collect(Collectors.toList()));
}
}
}

Expand Down Expand Up @@ -459,7 +469,7 @@ public <T> void submitStateUpdateTasks(final String source,
return;
}
try {
List<ClusterServiceTaskBatching.UpdateTask> safeTasks = tasks.entrySet().stream()
List<ClusterServiceTaskBatcher.UpdateTask> safeTasks = tasks.entrySet().stream()
.map(e -> taskBatching.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), logger), executor))
.collect(Collectors.toList());
taskBatching.submitTasks(safeTasks, config.timeout());
Expand Down Expand Up @@ -600,11 +610,11 @@ void runTasks(TaskInputs taskInputs) {
public TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, long startTimeNS) {
ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, startTimeNS, previousClusterState);
// extract those that are waiting for results
List<ClusterServiceTaskBatching.UpdateTask> nonFailedTasks = new ArrayList<>();
for (ClusterServiceTaskBatching.UpdateTask updateTask : taskInputs.updateTasks) {
assert clusterTasksResult.executionResults.containsKey(updateTask.taskIdentity) : "missing " + updateTask;
List<ClusterServiceTaskBatcher.UpdateTask> nonFailedTasks = new ArrayList<>();
for (ClusterServiceTaskBatcher.UpdateTask updateTask : taskInputs.updateTasks) {
assert clusterTasksResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask;
final ClusterStateTaskExecutor.TaskResult taskResult =
clusterTasksResult.executionResults.get(updateTask.taskIdentity);
clusterTasksResult.executionResults.get(updateTask.task);
if (taskResult.isSuccess()) {
nonFailedTasks.add(updateTask);
}
Expand All @@ -619,7 +629,7 @@ private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, long star
ClusterTasksResult<Object> clusterTasksResult;
try {
List<Object> inputs = taskInputs.updateTasks.stream()
.map(ClusterServiceTaskBatching.UpdateTask::getTaskIdentity).collect(Collectors.toList());
.map(ClusterServiceTaskBatcher.UpdateTask::getTask).collect(Collectors.toList());
clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs);
} catch (Exception e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
Expand All @@ -637,7 +647,7 @@ private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, long star
}
warnAboutSlowTaskIfNeeded(executionTime, taskInputs.summary);
clusterTasksResult = ClusterTasksResult.builder()
.failures(taskInputs.updateTasks.stream().map(ClusterServiceTaskBatching.UpdateTask::getTaskIdentity)::iterator, e)
.failures(taskInputs.updateTasks.stream().map(ClusterServiceTaskBatcher.UpdateTask::getTask)::iterator, e)
.build(previousClusterState);
}

Expand All @@ -648,8 +658,8 @@ private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, long star
boolean assertsEnabled = false;
assert (assertsEnabled = true);
if (assertsEnabled) {
for (ClusterServiceTaskBatching.UpdateTask updateTask : taskInputs.updateTasks) {
assert clusterTasksResult.executionResults.containsKey(updateTask.taskIdentity) :
for (ClusterServiceTaskBatcher.UpdateTask updateTask : taskInputs.updateTasks) {
assert clusterTasksResult.executionResults.containsKey(updateTask.task) :
"missing task result for " + updateTask;
}
}
Expand Down Expand Up @@ -814,10 +824,10 @@ private void callClusterStateAppliers(ClusterState newClusterState, ClusterChang
*/
class TaskInputs {
public final String summary;
public final List<ClusterServiceTaskBatching.UpdateTask> updateTasks;
public final List<ClusterServiceTaskBatcher.UpdateTask> updateTasks;
public final ClusterStateTaskExecutor<Object> executor;

TaskInputs(ClusterStateTaskExecutor<Object> executor, List<ClusterServiceTaskBatching.UpdateTask> updateTasks, String summary) {
TaskInputs(ClusterStateTaskExecutor<Object> executor, List<ClusterServiceTaskBatcher.UpdateTask> updateTasks, String summary) {
this.summary = summary;
this.executor = executor;
this.updateTasks = updateTasks;
Expand All @@ -839,11 +849,11 @@ class TaskOutputs {
public final TaskInputs taskInputs;
public final ClusterState previousClusterState;
public final ClusterState newClusterState;
public final List<ClusterServiceTaskBatching.UpdateTask> nonFailedTasks;
public final List<ClusterServiceTaskBatcher.UpdateTask> nonFailedTasks;
public final Map<Object, ClusterStateTaskExecutor.TaskResult> executionResults;

TaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState,
ClusterState newClusterState, List<ClusterServiceTaskBatching.UpdateTask> nonFailedTasks,
ClusterState newClusterState, List<ClusterServiceTaskBatcher.UpdateTask> nonFailedTasks,
Map<Object, ClusterStateTaskExecutor.TaskResult> executionResults) {
this.taskInputs = taskInputs;
this.previousClusterState = previousClusterState;
Expand Down Expand Up @@ -895,9 +905,9 @@ public boolean clusterStateUnchanged() {

public void notifyFailedTasks() {
// fail all tasks that have failed
for (ClusterServiceTaskBatching.UpdateTask updateTask : taskInputs.updateTasks) {
assert executionResults.containsKey(updateTask.taskIdentity) : "missing " + updateTask;
final ClusterStateTaskExecutor.TaskResult taskResult = executionResults.get(updateTask.taskIdentity);
for (ClusterServiceTaskBatcher.UpdateTask updateTask : taskInputs.updateTasks) {
assert executionResults.containsKey(updateTask.task) : "missing " + updateTask;
final ClusterStateTaskExecutor.TaskResult taskResult = executionResults.get(updateTask.task);
if (taskResult.isSuccess() == false) {
updateTask.listener.onFailure(updateTask.source, taskResult.getFailure());
}
Expand Down Expand Up @@ -1065,7 +1075,7 @@ public void clusterChanged(ClusterChangedEvent event) {
if (!master && event.localNodeMaster()) {
master = true;
for (LocalNodeMasterListener listener : listeners) {
java.util.concurrent.Executor executor = threadPool.executor(listener.executorName());
Executor executor = threadPool.executor(listener.executorName());
executor.execute(new OnMasterRunnable(listener));
}
return;
Expand All @@ -1074,7 +1084,7 @@ public void clusterChanged(ClusterChangedEvent event) {
if (master && !event.localNodeMaster()) {
master = false;
for (LocalNodeMasterListener listener : listeners) {
java.util.concurrent.Executor executor = threadPool.executor(listener.executorName());
Executor executor = threadPool.executor(listener.executorName());
executor.execute(new OffMasterRunnable(listener));
}
}
Expand Down
Loading

0 comments on commit a1e1ec4

Please sign in to comment.