diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java index 9a0195ebcb4f3..3693447cfb6d2 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java @@ -18,14 +18,13 @@ */ package org.elasticsearch.cluster; -import org.elasticsearch.cluster.service.TaskBatching; import org.elasticsearch.common.Nullable; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; -public interface ClusterStateTaskExecutor extends TaskBatching.BatchingKey { +public interface ClusterStateTaskExecutor { /** * Update the cluster state based on the current state and the given tasks. Return the *same instance* if no state * should be changed. @@ -48,6 +47,25 @@ default boolean runOnlyOnMaster() { default void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { } + /** + * Builds a concise description of a list of tasks (to be used in logging etc.). + * + * Note that the tasks given are not necessarily the same as those that will be passed to {@link #execute(ClusterState, List)}. + * but are guaranteed to be a subset of them. This method can be called multiple times with different lists before execution. + * This allows groupd task description but the submitting source. + */ + default String describeTasks(List tasks) { + return tasks.stream().map(T::toString).reduce((s1,s2) -> { + if (s1.isEmpty()) { + return s2; + } else if (s2.isEmpty()) { + return s1; + } else { + return s1 + ", " + s2; + } + }).orElse(""); + } + /** * Represents the result of a batched execution of cluster state update tasks * @param the type of the cluster state update task diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java index 23984d4ecbb8d..9db3e44cb92b2 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java @@ -78,6 +78,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -109,7 +110,7 @@ public class ClusterService extends AbstractLifecycleComponent { private TimeValue slowTaskLoggingThreshold; private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor; - private volatile ClusterServiceTaskBatching taskBatching; + private volatile ClusterServiceTaskBatcher taskBatching; /** * Those 3 state listeners are changing infrequently - CopyOnWriteArrayList is just fine @@ -215,8 +216,8 @@ protected synchronized void doStart() { return ClusterState.builder(state).nodes(nodes).blocks(initialBlocks).build(); }); this.threadPoolExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME, - daemonThreadFactory(settings, UPDATE_THREAD_NAME), threadPool.getThreadContext()); - this.taskBatching = new ClusterServiceTaskBatching(logger, threadPoolExecutor, threadPool); + daemonThreadFactory(settings, UPDATE_THREAD_NAME), threadPool.getThreadContext(), threadPool.scheduler()); + this.taskBatching = new ClusterServiceTaskBatcher(logger, threadPoolExecutor); } @Override @@ -240,25 +241,28 @@ protected synchronized void doStop() { protected synchronized void doClose() { } - class ClusterServiceTaskBatching extends TaskBatching { + class ClusterServiceTaskBatcher extends TaskBatcher { - ClusterServiceTaskBatching(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor, ThreadPool threadPool) { - super(logger, threadExecutor, threadPool); + ClusterServiceTaskBatcher(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor) { + super(logger, threadExecutor); } @Override - protected void onTimeout(BatchingTask task, TimeValue timeout) { - ((UpdateTask) task).listener.onFailure(task.source, new ProcessClusterEventTimeoutException(timeout, task.source)); + protected void onTimeout(List tasks, TimeValue timeout) { + threadPool.generic().execute( + () -> tasks.forEach( + task -> ((UpdateTask) task).listener.onFailure(task.source, + new ProcessClusterEventTimeoutException(timeout, task.source)))); } @Override - protected void run(Object batchingKey, List tasks, String tasksSummary) { + protected void run(Object batchingKey, List tasks, String tasksSummary) { ClusterStateTaskExecutor taskExecutor = (ClusterStateTaskExecutor) batchingKey; List updateTasks = (List) tasks; runTasks(new ClusterService.TaskInputs(taskExecutor, updateTasks, tasksSummary)); } - class UpdateTask extends BatchingTask { + class UpdateTask extends BatchedTask { final ClusterStateTaskListener listener; UpdateTask(Priority priority, String source, Object task, ClusterStateTaskListener listener, @@ -266,6 +270,12 @@ class UpdateTask extends BatchingTask { super(priority, source, executor, task); this.listener = listener; } + + @Override + public String describeTasks(List tasks) { + return ((ClusterStateTaskExecutor) batchingKey).describeTasks( + tasks.stream().map(BatchedTask::getTask).collect(Collectors.toList())); + } } } @@ -459,7 +469,7 @@ public void submitStateUpdateTasks(final String source, return; } try { - List safeTasks = tasks.entrySet().stream() + List safeTasks = tasks.entrySet().stream() .map(e -> taskBatching.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), logger), executor)) .collect(Collectors.toList()); taskBatching.submitTasks(safeTasks, config.timeout()); @@ -600,11 +610,11 @@ void runTasks(TaskInputs taskInputs) { public TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, long startTimeNS) { ClusterTasksResult clusterTasksResult = executeTasks(taskInputs, startTimeNS, previousClusterState); // extract those that are waiting for results - List nonFailedTasks = new ArrayList<>(); - for (ClusterServiceTaskBatching.UpdateTask updateTask : taskInputs.updateTasks) { - assert clusterTasksResult.executionResults.containsKey(updateTask.taskIdentity) : "missing " + updateTask; + List nonFailedTasks = new ArrayList<>(); + for (ClusterServiceTaskBatcher.UpdateTask updateTask : taskInputs.updateTasks) { + assert clusterTasksResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask; final ClusterStateTaskExecutor.TaskResult taskResult = - clusterTasksResult.executionResults.get(updateTask.taskIdentity); + clusterTasksResult.executionResults.get(updateTask.task); if (taskResult.isSuccess()) { nonFailedTasks.add(updateTask); } @@ -619,7 +629,7 @@ private ClusterTasksResult executeTasks(TaskInputs taskInputs, long star ClusterTasksResult clusterTasksResult; try { List inputs = taskInputs.updateTasks.stream() - .map(ClusterServiceTaskBatching.UpdateTask::getTaskIdentity).collect(Collectors.toList()); + .map(ClusterServiceTaskBatcher.UpdateTask::getTask).collect(Collectors.toList()); clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs); } catch (Exception e) { TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); @@ -637,7 +647,7 @@ private ClusterTasksResult executeTasks(TaskInputs taskInputs, long star } warnAboutSlowTaskIfNeeded(executionTime, taskInputs.summary); clusterTasksResult = ClusterTasksResult.builder() - .failures(taskInputs.updateTasks.stream().map(ClusterServiceTaskBatching.UpdateTask::getTaskIdentity)::iterator, e) + .failures(taskInputs.updateTasks.stream().map(ClusterServiceTaskBatcher.UpdateTask::getTask)::iterator, e) .build(previousClusterState); } @@ -648,8 +658,8 @@ private ClusterTasksResult executeTasks(TaskInputs taskInputs, long star boolean assertsEnabled = false; assert (assertsEnabled = true); if (assertsEnabled) { - for (ClusterServiceTaskBatching.UpdateTask updateTask : taskInputs.updateTasks) { - assert clusterTasksResult.executionResults.containsKey(updateTask.taskIdentity) : + for (ClusterServiceTaskBatcher.UpdateTask updateTask : taskInputs.updateTasks) { + assert clusterTasksResult.executionResults.containsKey(updateTask.task) : "missing task result for " + updateTask; } } @@ -814,10 +824,10 @@ private void callClusterStateAppliers(ClusterState newClusterState, ClusterChang */ class TaskInputs { public final String summary; - public final List updateTasks; + public final List updateTasks; public final ClusterStateTaskExecutor executor; - TaskInputs(ClusterStateTaskExecutor executor, List updateTasks, String summary) { + TaskInputs(ClusterStateTaskExecutor executor, List updateTasks, String summary) { this.summary = summary; this.executor = executor; this.updateTasks = updateTasks; @@ -839,11 +849,11 @@ class TaskOutputs { public final TaskInputs taskInputs; public final ClusterState previousClusterState; public final ClusterState newClusterState; - public final List nonFailedTasks; + public final List nonFailedTasks; public final Map executionResults; TaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, - ClusterState newClusterState, List nonFailedTasks, + ClusterState newClusterState, List nonFailedTasks, Map executionResults) { this.taskInputs = taskInputs; this.previousClusterState = previousClusterState; @@ -895,9 +905,9 @@ public boolean clusterStateUnchanged() { public void notifyFailedTasks() { // fail all tasks that have failed - for (ClusterServiceTaskBatching.UpdateTask updateTask : taskInputs.updateTasks) { - assert executionResults.containsKey(updateTask.taskIdentity) : "missing " + updateTask; - final ClusterStateTaskExecutor.TaskResult taskResult = executionResults.get(updateTask.taskIdentity); + for (ClusterServiceTaskBatcher.UpdateTask updateTask : taskInputs.updateTasks) { + assert executionResults.containsKey(updateTask.task) : "missing " + updateTask; + final ClusterStateTaskExecutor.TaskResult taskResult = executionResults.get(updateTask.task); if (taskResult.isSuccess() == false) { updateTask.listener.onFailure(updateTask.source, taskResult.getFailure()); } @@ -1065,7 +1075,7 @@ public void clusterChanged(ClusterChangedEvent event) { if (!master && event.localNodeMaster()) { master = true; for (LocalNodeMasterListener listener : listeners) { - java.util.concurrent.Executor executor = threadPool.executor(listener.executorName()); + Executor executor = threadPool.executor(listener.executorName()); executor.execute(new OnMasterRunnable(listener)); } return; @@ -1074,7 +1084,7 @@ public void clusterChanged(ClusterChangedEvent event) { if (master && !event.localNodeMaster()) { master = false; for (LocalNodeMasterListener listener : listeners) { - java.util.concurrent.Executor executor = threadPool.executor(listener.executorName()); + Executor executor = threadPool.executor(listener.executorName()); executor.execute(new OffMasterRunnable(listener)); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/service/TaskBatching.java b/core/src/main/java/org/elasticsearch/cluster/service/TaskBatcher.java similarity index 56% rename from core/src/main/java/org/elasticsearch/cluster/service/TaskBatching.java rename to core/src/main/java/org/elasticsearch/cluster/service/TaskBatcher.java index 35c611fc5bd81..d73b14f23d3a3 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/TaskBatching.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/TaskBatcher.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; -import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Collections; @@ -40,42 +39,40 @@ /** * Batching support for {@link PrioritizedEsThreadPoolExecutor} - * Tasks that share the same batching key are batched (see {@link BatchingTask#batchingKey}) + * Tasks that share the same batching key are batched (see {@link BatchedTask#batchingKey}) */ -public abstract class TaskBatching { +public abstract class TaskBatcher { private final Logger logger; - private final ThreadPool threadPool; private final PrioritizedEsThreadPoolExecutor threadExecutor; // package visible for tests - final Map> tasksPerExecutor = new HashMap<>(); + final Map> tasksPerExecutor = new HashMap<>(); - public TaskBatching(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor, ThreadPool threadPool) { + public TaskBatcher(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor) { this.logger = logger; this.threadExecutor = threadExecutor; - this.threadPool = threadPool; } - public void submitTasks(List tasks, @Nullable TimeValue timeout) throws EsRejectedExecutionException { + public void submitTasks(List tasks, @Nullable TimeValue timeout) throws EsRejectedExecutionException { if (tasks.isEmpty()) { return; } - final BatchingTask firstTask = tasks.get(0); + final BatchedTask firstTask = tasks.get(0); assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey) : "tasks submitted in a batch should share the same batching key: " + tasks; // convert to an identity map to check for dups based on task identity - final Map tasksIdentity = tasks.stream().collect(Collectors.toMap( - BatchingTask::getTaskIdentity, + final Map tasksIdentity = tasks.stream().collect(Collectors.toMap( + BatchedTask::getTask, Function.identity(), (a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); }, IdentityHashMap::new)); synchronized (tasksPerExecutor) { - LinkedHashSet existingTasks = tasksPerExecutor.computeIfAbsent(firstTask.batchingKey, + LinkedHashSet existingTasks = tasksPerExecutor.computeIfAbsent(firstTask.batchingKey, k -> new LinkedHashSet<>(tasks.size())); - for (BatchingTask existing : existingTasks) { + for (BatchedTask existing : existingTasks) { // check that there won't be two tasks with the same identity for the same batching key - BatchingTask duplicateTask = tasksIdentity.get(existing.getTaskIdentity()); + BatchedTask duplicateTask = tasksIdentity.get(existing.getTask()); if (duplicateTask != null) { throw new IllegalStateException("task [" + duplicateTask.describeTasks( Collections.singletonList(existing)) + "] with source [" + duplicateTask.source + "] is already queued"); @@ -85,54 +82,54 @@ public void submitTasks(List tasks, @Nullable TimeValue } if (timeout != null) { - threadExecutor.execute(firstTask, threadPool.scheduler(), timeout, () -> onTimeoutInternal(tasks, timeout)); + threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout)); } else { threadExecutor.execute(firstTask); } } - private void onTimeoutInternal(List updateTasks, TimeValue timeout) { - threadPool.generic().execute(() -> { - final ArrayList toRemove = new ArrayList<>(); - for (BatchingTask task : updateTasks) { - if (task.processed.getAndSet(true) == false) { - logger.debug("task [{}] timed out after [{}]", task.source, timeout); - toRemove.add(task); - } + private void onTimeoutInternal(List tasks, TimeValue timeout) { + final ArrayList toRemove = new ArrayList<>(); + for (BatchedTask task : tasks) { + if (task.processed.getAndSet(true) == false) { + logger.debug("task [{}] timed out after [{}]", task.source, timeout); + toRemove.add(task); } - if (toRemove.isEmpty() == false) { - Object batchingExecutor = toRemove.get(0).batchingKey; - synchronized (tasksPerExecutor) { - LinkedHashSet existingTasks = tasksPerExecutor.get(batchingExecutor); - if (existingTasks != null) { - existingTasks.removeAll(toRemove); - if (existingTasks.isEmpty()) { - tasksPerExecutor.remove(batchingExecutor); - } + } + if (toRemove.isEmpty() == false) { + BatchedTask firstTask = toRemove.get(0); + assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey) : + "tasks submitted in a batch should share the same batching key: " + tasks; + Object batchingExecutor = firstTask.batchingKey; + synchronized (tasksPerExecutor) { + LinkedHashSet existingTasks = tasksPerExecutor.get(batchingExecutor); + if (existingTasks != null) { + existingTasks.removeAll(toRemove); + if (existingTasks.isEmpty()) { + tasksPerExecutor.remove(batchingExecutor); } } - for (BatchingTask task : toRemove) { - onTimeout(task, timeout); - } } - }); + onTimeout(toRemove, timeout); + } } /** - * Action to be implemented by the specific batching implementation + * Action to be implemented by the specific batching implementation. + * All tasks have the same batching key. */ - protected abstract void onTimeout(BatchingTask task, TimeValue timeout); + protected abstract void onTimeout(List tasks, TimeValue timeout); - void runIfNotProcessed(BatchingTask updateTask) { + void runIfNotProcessed(BatchedTask updateTask) { // if this task is already processed, the executor shouldn't execute other tasks (that arrived later), // to give other executors a chance to execute their tasks. if (updateTask.processed.get() == false) { - final List toExecute = new ArrayList<>(); - final Map> processTasksBySource = new HashMap<>(); + final List toExecute = new ArrayList<>(); + final Map> processTasksBySource = new HashMap<>(); synchronized (tasksPerExecutor) { - LinkedHashSet pending = tasksPerExecutor.remove(updateTask.batchingKey); + LinkedHashSet pending = tasksPerExecutor.remove(updateTask.batchingKey); if (pending != null) { - for (BatchingTask task : pending) { + for (BatchedTask task : pending) { if (task.processed.getAndSet(true) == false) { logger.trace("will process {}", task); toExecute.add(task); @@ -157,14 +154,15 @@ void runIfNotProcessed(BatchingTask updateTask) { /** * Action to be implemented by the specific batching implementation + * All tasks have the given batching key. */ - protected abstract void run(Object batchingKey, List tasks, String tasksSummary); + protected abstract void run(Object batchingKey, List tasks, String tasksSummary); /** * Represents a runnable task that supports batching. - * Implementors of TaskBatching can subclass this to add a payload to the task. + * Implementors of TaskBatcher can subclass this to add a payload to the task. */ - protected abstract class BatchingTask extends SourcePrioritizedRunnable { + protected abstract class BatchedTask extends SourcePrioritizedRunnable { /** * whether the task has been processed already */ @@ -173,16 +171,16 @@ protected abstract class BatchingTask extends SourcePrioritizedRunnable { /** * the object that is used as batching key */ - protected final BatchingKey batchingKey; + protected final Object batchingKey; /** * the task object that is wrapped */ - protected final Object taskIdentity; + protected final Object task; - protected BatchingTask(Priority priority, String source, BatchingKey batchingKey, Object taskIdentity) { + protected BatchedTask(Priority priority, String source, Object batchingKey, Object task) { super(priority, source); this.batchingKey = batchingKey; - this.taskIdentity = taskIdentity; + this.task = task; } @Override @@ -200,34 +198,10 @@ public String toString() { } } - @SuppressWarnings("unchecked") - public String describeTasks(List tasks) { - return ((BatchingKey) batchingKey).describeTasks( - tasks.stream().map(BatchingTask::getTaskIdentity).collect(Collectors.toList())); - } - - public Object getTaskIdentity() { - return taskIdentity; - } - } + public abstract String describeTasks(List tasks); - public interface BatchingKey { - /** - * Builds a concise description of a list of tasks (to be used in logging etc.). - * - * This method can be called multiple times with different lists before execution. - * This allows groupd task description but the submitting source. - */ - default String describeTasks(List tasks) { - return tasks.stream().map(T::toString).reduce((s1, s2) -> { - if (s1.isEmpty()) { - return s2; - } else if (s2.isEmpty()) { - return s1; - } else { - return s1 + ", " + s2; - } - }).orElse(""); + public Object getTask() { + return task; } } } diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index bd99f3b1a4737..3a5e3b4dab465 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -30,6 +30,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -57,8 +58,8 @@ public static int numberOfProcessors(final Settings settings) { return PROCESSORS_SETTING.get(settings); } - public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory, ThreadContext contextHolder) { - return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder); + public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) { + return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder, timer); } public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) { diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java index 1b01455c1ca79..5b3dae7ffae71 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java @@ -44,11 +44,14 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { private static final TimeValue NO_WAIT_TIME_VALUE = TimeValue.timeValueMillis(0); - private AtomicLong insertionOrder = new AtomicLong(); - private Queue current = ConcurrentCollections.newQueue(); + private final AtomicLong insertionOrder = new AtomicLong(); + private final Queue current = ConcurrentCollections.newQueue(); + private final ScheduledExecutorService timer; - PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) { + PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) { super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(), threadFactory, contextHolder); + this.timer = timer; } public Pending[] getPending() { @@ -111,7 +114,7 @@ protected void afterExecute(Runnable r, Throwable t) { current.remove(r); } - public void execute(Runnable command, final ScheduledExecutorService timer, final TimeValue timeout, final Runnable timeoutCallback) { + public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) { command = wrapRunnable(command); doExecute(command); if (timeout.nanos() >= 0) { diff --git a/core/src/test/java/org/elasticsearch/cluster/service/TaskBatchingTests.java b/core/src/test/java/org/elasticsearch/cluster/service/TaskBatcherTests.java similarity index 93% rename from core/src/test/java/org/elasticsearch/cluster/service/TaskBatchingTests.java rename to core/src/test/java/org/elasticsearch/cluster/service/TaskBatcherTests.java index b78b953988891..02bc1dce38f23 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/TaskBatchingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/TaskBatcherTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; -import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; import java.util.ArrayList; @@ -50,40 +49,49 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; -public class TaskBatchingTests extends TaskExecutorTests { +public class TaskBatcherTests extends TaskExecutorTests { protected TestTaskBatching taskBatching; @Before public void setUpBatchingTaskExecutor() throws Exception { - taskBatching = new TestTaskBatching(logger, threadExecutor, threadPool); + taskBatching = new TestTaskBatching(logger, threadExecutor); } - class TestTaskBatching extends TaskBatching { + class TestTaskBatching extends TaskBatcher { - TestTaskBatching(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor, ThreadPool threadPool) { - super(logger, threadExecutor, threadPool); + TestTaskBatching(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor) { + super(logger, threadExecutor); } @Override - protected void run(Object batchingKey, List tasks, String tasksSummary) { + protected void run(Object batchingKey, List tasks, String tasksSummary) { List updateTasks = (List) tasks; - ((TestExecutor) batchingKey).execute(updateTasks.stream().map(t -> t.taskIdentity).collect(Collectors.toList())); + ((TestExecutor) batchingKey).execute(updateTasks.stream().map(t -> t.task).collect(Collectors.toList())); updateTasks.forEach(updateTask -> updateTask.listener.processed(updateTask.source)); } @Override - protected void onTimeout(BatchingTask task, TimeValue timeout) { - ((UpdateTask) task).listener.onFailure(task.source, new ProcessClusterEventTimeoutException(timeout, task.source)); + protected void onTimeout(List tasks, TimeValue timeout) { + threadPool.generic().execute( + () -> tasks.forEach( + task -> ((UpdateTask) task).listener.onFailure(task.source, + new ProcessClusterEventTimeoutException(timeout, task.source)))); } - class UpdateTask extends BatchingTask { + class UpdateTask extends BatchedTask { final TestListener listener; UpdateTask(Priority priority, String source, Object task, TestListener listener, TestExecutor executor) { super(priority, source, executor, task); this.listener = listener; } + + @Override + public String describeTasks(List tasks) { + return ((TestExecutor) batchingKey).describeTasks( + tasks.stream().map(BatchedTask::getTask).collect(Collectors.toList())); + } } } diff --git a/core/src/test/java/org/elasticsearch/cluster/service/TaskExecutorTests.java b/core/src/test/java/org/elasticsearch/cluster/service/TaskExecutorTests.java index 015a9091d8b67..fe426fdd42a9d 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/TaskExecutorTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/TaskExecutorTests.java @@ -67,7 +67,7 @@ public static void stopThreadPool() { @Before public void setUpExecutor() { threadExecutor = EsExecutors.newSinglePrioritizing("test_thread", - daemonThreadFactory(Settings.EMPTY, "test_thread"), threadPool.getThreadContext()); + daemonThreadFactory(Settings.EMPTY, "test_thread"), threadPool.getThreadContext(), threadPool.scheduler()); } @After @@ -83,12 +83,24 @@ default void processed(String source) { } } - protected interface TestExecutor extends TaskBatching.BatchingKey { + protected interface TestExecutor { void execute(List tasks); + + default String describeTasks(List tasks) { + return tasks.stream().map(T::toString).reduce((s1,s2) -> { + if (s1.isEmpty()) { + return s2; + } else if (s2.isEmpty()) { + return s1; + } else { + return s1 + ", " + s2; + } + }).orElse(""); + } } /** - * Task class that works for single tasks as well as batching (see {@link TaskBatchingTests}) + * Task class that works for single tasks as well as batching (see {@link TaskBatcherTests}) */ protected abstract static class TestTask implements TestExecutor, TestListener, ClusterStateTaskConfig { @@ -127,12 +139,12 @@ public void run() { } } - // can be overridden by TaskBatchingTests + // can be overridden by TaskBatcherTests protected void submitTask(String source, TestTask testTask) { SourcePrioritizedRunnable task = new UpdateTask(source, testTask); TimeValue timeout = testTask.timeout(); if (timeout != null) { - threadExecutor.execute(task, threadPool.scheduler(), timeout, () -> threadPool.generic().execute(() -> { + threadExecutor.execute(task, timeout, () -> threadPool.generic().execute(() -> { logger.debug("task [{}] timed out after [{}]", task, timeout); testTask.onFailure(source, new ProcessClusterEventTimeoutException(timeout, source)); })); diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java index 933a46de510dc..3ed105080b30b 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java @@ -65,7 +65,7 @@ public void testPriorityQueue() throws Exception { } public void testSubmitPrioritizedExecutorWithRunnables() throws Exception { - ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder); + ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null); List results = new ArrayList<>(8); CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch finishedLatch = new CountDownLatch(8); @@ -94,7 +94,7 @@ public void testSubmitPrioritizedExecutorWithRunnables() throws Exception { } public void testExecutePrioritizedExecutorWithRunnables() throws Exception { - ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder); + ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null); List results = new ArrayList<>(8); CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch finishedLatch = new CountDownLatch(8); @@ -123,7 +123,7 @@ public void testExecutePrioritizedExecutorWithRunnables() throws Exception { } public void testSubmitPrioritizedExecutorWithCallables() throws Exception { - ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder); + ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null); List results = new ArrayList<>(8); CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch finishedLatch = new CountDownLatch(8); @@ -152,7 +152,7 @@ public void testSubmitPrioritizedExecutorWithCallables() throws Exception { } public void testSubmitPrioritizedExecutorWithMixed() throws Exception { - ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder); + ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, null); List results = new ArrayList<>(8); CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch finishedLatch = new CountDownLatch(8); @@ -182,7 +182,7 @@ public void testSubmitPrioritizedExecutorWithMixed() throws Exception { public void testTimeout() throws Exception { ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(EsExecutors.daemonThreadFactory(getTestName())); - PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder); + PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, timer); final CountDownLatch invoked = new CountDownLatch(1); final CountDownLatch block = new CountDownLatch(1); executor.execute(new Runnable() { @@ -219,7 +219,7 @@ public void run() { public String toString() { return "the waiting"; } - }, timer, TimeValue.timeValueMillis(100) /* enough timeout to catch them in the pending list... */, new Runnable() { + }, TimeValue.timeValueMillis(100) /* enough timeout to catch them in the pending list... */, new Runnable() { @Override public void run() { timedOut.countDown(); @@ -245,14 +245,14 @@ public void testTimeoutCleanup() throws Exception { ThreadPool threadPool = new TestThreadPool("test"); final ScheduledThreadPoolExecutor timer = (ScheduledThreadPoolExecutor) threadPool.scheduler(); final AtomicBoolean timeoutCalled = new AtomicBoolean(); - PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder); + PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()), holder, timer); final CountDownLatch invoked = new CountDownLatch(1); executor.execute(new Runnable() { @Override public void run() { invoked.countDown(); } - }, timer, TimeValue.timeValueHours(1), new Runnable() { + }, TimeValue.timeValueHours(1), new Runnable() { @Override public void run() { // We should never get here