Skip to content

Commit

Permalink
Fix threadpool size for SnapshotResiliencyTests
Browse files Browse the repository at this point in the history
  • Loading branch information
albertzaharovits committed Jan 19, 2025
1 parent 81cc0f1 commit f58120f
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.lucene.store.RateLimitedIndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.TimeValue;
Expand All @@ -35,6 +36,7 @@
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -64,15 +66,17 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
private final MergeSchedulerConfig config;
private final Logger logger;
private final MergeTracking mergeTracking;
private final ThreadPoolExecutor threadPoolExecutor;
private final ExecutorService executorService;
private final int maxThreadPoolSize;
private final ThreadLocal<MergeRateLimiter> onGoingMergeRateLimiter = new ThreadLocal<>();

public ThreadPoolMergeScheduler(ShardId shardId, IndexSettings indexSettings, ThreadPool threadPool) {
this.config = indexSettings.getMergeSchedulerConfig();
this.logger = Loggers.getLogger(getClass(), shardId);
this.mergeTracking = new MergeTracking(logger, () -> this.config.isAutoThrottle() ? targetMBPerSec : Double.POSITIVE_INFINITY);
// all merge schedulers must use the same thread pool
this.threadPoolExecutor = (ThreadPoolExecutor) threadPool.executor(ThreadPool.Names.MERGE);
// all merge schedulers must use the same executor of the same thread pool
this.executorService = threadPool.executor(ThreadPool.Names.MERGE);
this.maxThreadPoolSize = threadPool.info(ThreadPool.Names.MERGE).getMax();
}

@Override
Expand Down Expand Up @@ -131,9 +135,9 @@ protected void handleMergeException(Throwable t) {
private void submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) {
MergeTask mergeTask = newMergeTask(mergeSource, merge, mergeTrigger);
if (mergeTask.isAutoThrottle) {
trackNewActiveThrottledMergeTask(mergeTask, threadPoolExecutor.getMaximumPoolSize());
trackNewActiveThrottledMergeTask(mergeTask, maxThreadPoolSize);
}
threadPoolExecutor.execute(mergeTask);
executorService.execute(mergeTask);
}

private static double maybeUpdateTargetMBPerSec(int poolSize) {
Expand Down

0 comments on commit f58120f

Please sign in to comment.