Skip to content

Commit

Permalink
Extract batch executor out of cluster service (#24102)
Browse files Browse the repository at this point in the history
Refactoring that extracts the task batching functionality from ClusterService and makes it a reusable component that can be tested in isolation.
  • Loading branch information
ywelsch committed Apr 20, 2017
1 parent 907ccd7 commit 86f2e1e
Show file tree
Hide file tree
Showing 9 changed files with 1,050 additions and 604 deletions.
237 changes: 66 additions & 171 deletions core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.service;

import org.elasticsearch.common.Priority;
import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;

/**
* PrioritizedRunnable that also has a source string
*/
public abstract class SourcePrioritizedRunnable extends PrioritizedRunnable {
protected final String source;

public SourcePrioritizedRunnable(Priority priority, String source) {
super(priority);
this.source = source;
}

public String source() {
return source;
}

@Override
public String toString() {
return "[" + source + "]";
}
}
207 changes: 207 additions & 0 deletions core/src/main/java/org/elasticsearch/cluster/service/TaskBatcher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.service;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Batching support for {@link PrioritizedEsThreadPoolExecutor}
* Tasks that share the same batching key are batched (see {@link BatchedTask#batchingKey})
*/
public abstract class TaskBatcher {

private final Logger logger;
private final PrioritizedEsThreadPoolExecutor threadExecutor;
// package visible for tests
final Map<Object, LinkedHashSet<BatchedTask>> tasksPerBatchingKey = new HashMap<>();

public TaskBatcher(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor) {
this.logger = logger;
this.threadExecutor = threadExecutor;
}

public void submitTasks(List<? extends BatchedTask> tasks, @Nullable TimeValue timeout) throws EsRejectedExecutionException {
if (tasks.isEmpty()) {
return;
}
final BatchedTask firstTask = tasks.get(0);
assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey) :
"tasks submitted in a batch should share the same batching key: " + tasks;
// convert to an identity map to check for dups based on task identity
final Map<Object, BatchedTask> tasksIdentity = tasks.stream().collect(Collectors.toMap(
BatchedTask::getTask,
Function.identity(),
(a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); },
IdentityHashMap::new));

synchronized (tasksPerBatchingKey) {
LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.computeIfAbsent(firstTask.batchingKey,
k -> new LinkedHashSet<>(tasks.size()));
for (BatchedTask existing : existingTasks) {
// check that there won't be two tasks with the same identity for the same batching key
BatchedTask duplicateTask = tasksIdentity.get(existing.getTask());
if (duplicateTask != null) {
throw new IllegalStateException("task [" + duplicateTask.describeTasks(
Collections.singletonList(existing)) + "] with source [" + duplicateTask.source + "] is already queued");
}
}
existingTasks.addAll(tasks);
}

if (timeout != null) {
threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout));
} else {
threadExecutor.execute(firstTask);
}
}

private void onTimeoutInternal(List<? extends BatchedTask> tasks, TimeValue timeout) {
final ArrayList<BatchedTask> toRemove = new ArrayList<>();
for (BatchedTask task : tasks) {
if (task.processed.getAndSet(true) == false) {
logger.debug("task [{}] timed out after [{}]", task.source, timeout);
toRemove.add(task);
}
}
if (toRemove.isEmpty() == false) {
BatchedTask firstTask = toRemove.get(0);
Object batchingKey = firstTask.batchingKey;
assert tasks.stream().allMatch(t -> t.batchingKey == batchingKey) :
"tasks submitted in a batch should share the same batching key: " + tasks;
synchronized (tasksPerBatchingKey) {
LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.get(batchingKey);
if (existingTasks != null) {
existingTasks.removeAll(toRemove);
if (existingTasks.isEmpty()) {
tasksPerBatchingKey.remove(batchingKey);
}
}
}
onTimeout(toRemove, timeout);
}
}

/**
* Action to be implemented by the specific batching implementation.
* All tasks have the same batching key.
*/
protected abstract void onTimeout(List<? extends BatchedTask> tasks, TimeValue timeout);

void runIfNotProcessed(BatchedTask updateTask) {
// if this task is already processed, it shouldn't execute other tasks with same batching key that arrived later,
// to give other tasks with different batching key a chance to execute.
if (updateTask.processed.get() == false) {
final List<BatchedTask> toExecute = new ArrayList<>();
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
synchronized (tasksPerBatchingKey) {
LinkedHashSet<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);
if (pending != null) {
for (BatchedTask task : pending) {
if (task.processed.getAndSet(true) == false) {
logger.trace("will process {}", task);
toExecute.add(task);
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
} else {
logger.trace("skipping {}, already processed", task);
}
}
}
}

if (toExecute.isEmpty() == false) {
final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");

run(updateTask.batchingKey, toExecute, tasksSummary);
}
}
}

/**
* Action to be implemented by the specific batching implementation
* All tasks have the given batching key.
*/
protected abstract void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary);

/**
* Represents a runnable task that supports batching.
* Implementors of TaskBatcher can subclass this to add a payload to the task.
*/
protected abstract class BatchedTask extends SourcePrioritizedRunnable {
/**
* whether the task has been processed already
*/
protected final AtomicBoolean processed = new AtomicBoolean();

/**
* the object that is used as batching key
*/
protected final Object batchingKey;
/**
* the task object that is wrapped
*/
protected final Object task;

protected BatchedTask(Priority priority, String source, Object batchingKey, Object task) {
super(priority, source);
this.batchingKey = batchingKey;
this.task = task;
}

@Override
public void run() {
runIfNotProcessed(this);
}

@Override
public String toString() {
String taskDescription = describeTasks(Collections.singletonList(this));
if (taskDescription.isEmpty()) {
return "[" + source + "]";
} else {
return "[" + source + "[" + taskDescription + "]]";
}
}

public abstract String describeTasks(List<? extends BatchedTask> tasks);

public Object getTask() {
return task;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -56,8 +57,8 @@ public static int boundedNumberOfProcessors(Settings settings) {
return PROCESSORS_SETTING.get(settings);
}

public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory, ThreadContext contextHolder) {
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder);
public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) {
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder, timer);
}

public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,14 @@
public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {

private static final TimeValue NO_WAIT_TIME_VALUE = TimeValue.timeValueMillis(0);
private AtomicLong insertionOrder = new AtomicLong();
private Queue<Runnable> current = ConcurrentCollections.newQueue();
private final AtomicLong insertionOrder = new AtomicLong();
private final Queue<Runnable> current = ConcurrentCollections.newQueue();
private final ScheduledExecutorService timer;

PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(), threadFactory, contextHolder);
this.timer = timer;
}

public Pending[] getPending() {
Expand Down Expand Up @@ -111,7 +114,7 @@ protected void afterExecute(Runnable r, Throwable t) {
current.remove(r);
}

public void execute(Runnable command, final ScheduledExecutorService timer, final TimeValue timeout, final Runnable timeoutCallback) {
public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) {
command = wrapRunnable(command);
doExecute(command);
if (timeout.nanos() >= 0) {
Expand Down
Loading

0 comments on commit 86f2e1e

Please sign in to comment.