Skip to content

Commit

Permalink
Add support for creating TaskExecutionTimeTrackingEsThreadPoolExecuto…
Browse files Browse the repository at this point in the history
…r scaling executors (elastic#109674)

This adds support to EsExecutors for creating TaskExecutionTimeTrackingEsThreadPoolExecutor
scaling executors by providing a non-nullable TaskTrackingConfig.
  • Loading branch information
andreidan authored Jun 13, 2024
1 parent 79a2cd5 commit 0dc56ab
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,22 +103,63 @@ public static EsThreadPoolExecutor newScaling(
TimeUnit unit,
boolean rejectAfterShutdown,
ThreadFactory threadFactory,
ThreadContext contextHolder
ThreadContext contextHolder,
TaskTrackingConfig config
) {
ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
EsThreadPoolExecutor executor = new EsThreadPoolExecutor(
EsThreadPoolExecutor executor;
if (config.trackExecutionTime()) {
executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
name,
min,
max,
keepAliveTime,
unit,
queue,
TimedRunnable::new,
threadFactory,
new ForceQueuePolicy(rejectAfterShutdown),
contextHolder,
config
);
} else {
executor = new EsThreadPoolExecutor(
name,
min,
max,
keepAliveTime,
unit,
queue,
threadFactory,
new ForceQueuePolicy(rejectAfterShutdown),
contextHolder
);
}
queue.executor = executor;
return executor;
}

public static EsThreadPoolExecutor newScaling(
String name,
int min,
int max,
long keepAliveTime,
TimeUnit unit,
boolean rejectAfterShutdown,
ThreadFactory threadFactory,
ThreadContext contextHolder
) {
return newScaling(
name,
min,
max,
keepAliveTime,
unit,
queue,
rejectAfterShutdown,
threadFactory,
new ForceQueuePolicy(rejectAfterShutdown),
contextHolder
contextHolder,
TaskTrackingConfig.DO_NOT_TRACK
);
queue.executor = executor;
return executor;
}

public static EsThreadPoolExecutor newFixed(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public final class ScalingExecutorBuilder extends ExecutorBuilder<ScalingExecuto
private final Setting<Integer> maxSetting;
private final Setting<TimeValue> keepAliveSetting;
private final boolean rejectAfterShutdown;
private final EsExecutors.TaskTrackingConfig trackingConfig;

/**
* Construct a scaling executor builder; the settings will have the
Expand Down Expand Up @@ -76,12 +77,38 @@ public ScalingExecutorBuilder(
final TimeValue keepAlive,
final boolean rejectAfterShutdown,
final String prefix
) {
this(name, core, max, keepAlive, rejectAfterShutdown, prefix, EsExecutors.TaskTrackingConfig.DO_NOT_TRACK);
}

/**
* Construct a scaling executor builder; the settings will have the
* specified key prefix.
*
* @param name the name of the executor
* @param core the minimum number of threads in the pool
* @param max the maximum number of threads in the pool
* @param keepAlive the time that spare threads above {@code core}
* threads will be kept alive
* @param prefix the prefix for the settings keys
* @param rejectAfterShutdown set to {@code true} if the executor should reject tasks after shutdown
* @param trackingConfig configuration that'll indicate if we should track statistics about task execution time
*/
public ScalingExecutorBuilder(
final String name,
final int core,
final int max,
final TimeValue keepAlive,
final boolean rejectAfterShutdown,
final String prefix,
final EsExecutors.TaskTrackingConfig trackingConfig
) {
super(name);
this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, Setting.Property.NodeScope);
this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, Setting.Property.NodeScope);
this.keepAliveSetting = Setting.timeSetting(settingsKey(prefix, "keep_alive"), keepAlive, Setting.Property.NodeScope);
this.rejectAfterShutdown = rejectAfterShutdown;
this.trackingConfig = trackingConfig;
}

@Override
Expand All @@ -104,15 +131,17 @@ ThreadPool.ExecutorHolder build(final ScalingExecutorSettings settings, final Th
int max = settings.max;
final ThreadPool.Info info = new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.SCALING, core, max, keepAlive, null);
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
final ExecutorService executor = EsExecutors.newScaling(
ExecutorService executor;
executor = EsExecutors.newScaling(
settings.nodeName + "/" + name(),
core,
max,
keepAlive.millis(),
TimeUnit.MILLISECONDS,
rejectAfterShutdown,
threadFactory,
threadContext
threadContext,
trackingConfig
);
return new ThreadPool.ExecutorHolder(executor, info);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -655,6 +656,55 @@ public void testParseExecutorName() throws InterruptedException {
}
}

public void testScalingWithTaskTimeTracking() {
final int min = between(1, 3);
final int max = between(min + 1, 6);

{
ThreadPoolExecutor pool = EsExecutors.newScaling(
getClass().getName() + "/" + getTestName(),
min,
max,
between(1, 100),
randomTimeUnit(),
randomBoolean(),
EsExecutors.daemonThreadFactory("test"),
threadContext,
new EsExecutors.TaskTrackingConfig(randomBoolean(), randomDoubleBetween(0.01, 0.1, true))
);
assertThat(pool, instanceOf(TaskExecutionTimeTrackingEsThreadPoolExecutor.class));
}

{
ThreadPoolExecutor pool = EsExecutors.newScaling(
getClass().getName() + "/" + getTestName(),
min,
max,
between(1, 100),
randomTimeUnit(),
randomBoolean(),
EsExecutors.daemonThreadFactory("test"),
threadContext
);
assertThat(pool, instanceOf(EsThreadPoolExecutor.class));
}

{
ThreadPoolExecutor pool = EsExecutors.newScaling(
getClass().getName() + "/" + getTestName(),
min,
max,
between(1, 100),
randomTimeUnit(),
randomBoolean(),
EsExecutors.daemonThreadFactory("test"),
threadContext,
DO_NOT_TRACK
);
assertThat(pool, instanceOf(EsThreadPoolExecutor.class));
}
}

private static void runRejectOnShutdownTest(ExecutorService executor) {
for (int i = between(0, 10); i > 0; i--) {
final var delayMillis = between(0, 100);
Expand Down

0 comments on commit 0dc56ab

Please sign in to comment.