Skip to content

Commit

Permalink
Implemented max thread setting
Browse files Browse the repository at this point in the history
  • Loading branch information
albertzaharovits committed Jan 19, 2025
1 parent 3c203cb commit 6c21654
Showing 1 changed file with 34 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -67,6 +70,8 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
private final ExecutorService executorService;
private final int maxThreadPoolSize;
private final ThreadLocal<MergeRateLimiter> onGoingMergeRateLimiter = new ThreadLocal<>();
private final PriorityQueue<MergeTask> activeMergeTasksLocalSchedulerQueue = new PriorityQueue<>();
private final List<MergeTask> activeMergeTasksExecutingOnLocalSchedulerList = new ArrayList<>();

public ThreadPoolMergeScheduler(ShardId shardId, IndexSettings indexSettings, ThreadPool threadPool) {
this.config = indexSettings.getMergeSchedulerConfig();
Expand Down Expand Up @@ -135,6 +140,32 @@ private void submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge me
if (mergeTask.isAutoThrottle) {
trackNewActiveThrottledMergeTask(mergeTask, maxThreadPoolSize);
}
synchronized (this) {
activeMergeTasksLocalSchedulerQueue.add(mergeTask);
}
maybeExecuteNextMerge();
}

private void mergeDone(MergeTask mergeTask) {
synchronized (this) {
activeMergeTasksExecutingOnLocalSchedulerList.remove(mergeTask);
}
maybeExecuteNextMerge();
}

private void maybeExecuteNextMerge() {
MergeTask mergeTask;
synchronized (this) {
if (activeMergeTasksExecutingOnLocalSchedulerList.size() >= config.getMaxThreadCount()) {
return;
}
mergeTask = activeMergeTasksLocalSchedulerQueue.poll();
if (mergeTask == null) {
// no more merges to execute
return;
}
activeMergeTasksExecutingOnLocalSchedulerList.add(mergeTask);
}
executorService.execute(mergeTask);
}

Expand Down Expand Up @@ -293,6 +324,7 @@ public void onAfter() {
if (isAutoThrottle) {
removeFromActiveThrottledMergeTasks(this);
}
mergeDone(this);
// kick-off next merge, if any
MergePolicy.OneMerge nextMerge = null;
try {
Expand All @@ -319,6 +351,7 @@ public void onFailure(Exception e) {
// plus the engine is probably going to be failed when any merge fails,
// but keep this in case something believes calling `MergeTask#onFailure` is a sane way to abort a merge
abortOnGoingMerge();
mergeDone(this);
handleMergeException(e);
}

Expand All @@ -333,6 +366,7 @@ public void onRejection(Exception e) {
message(String.format(Locale.ROOT, "merge task [%s] rejected by thread pool, aborting", onGoingMerge.getId()));
}
abortOnGoingMerge();
mergeDone(this);
}

private void abortOnGoingMerge() {
Expand Down

0 comments on commit 6c21654

Please sign in to comment.