Skip to content

Commit

Permalink
Identify system threads using a Thread subclass (elastic#113562)
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosdelest authored Oct 25, 2024
1 parent 5714b98 commit bbd887a
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -326,39 +326,62 @@ public static String executorName(Thread thread) {
}

public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) {
return daemonThreadFactory(threadName(settings, namePrefix));
return createDaemonThreadFactory(threadName(settings, namePrefix), false);
}

public static ThreadFactory daemonThreadFactory(String nodeName, String namePrefix) {
return daemonThreadFactory(nodeName, namePrefix, false);
}

public static ThreadFactory daemonThreadFactory(String nodeName, String namePrefix, boolean isSystemThread) {
assert nodeName != null && false == nodeName.isEmpty();
return daemonThreadFactory(threadName(nodeName, namePrefix));
return createDaemonThreadFactory(threadName(nodeName, namePrefix), isSystemThread);
}

public static ThreadFactory daemonThreadFactory(String namePrefix) {
return new EsThreadFactory(namePrefix);
public static ThreadFactory daemonThreadFactory(String name) {
assert name != null && name.isEmpty() == false;
return createDaemonThreadFactory(name, false);
}

private static ThreadFactory createDaemonThreadFactory(String namePrefix, boolean isSystemThread) {
return new EsThreadFactory(namePrefix, isSystemThread);
}

static class EsThreadFactory implements ThreadFactory {

final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
final boolean isSystem;

EsThreadFactory(String namePrefix) {
EsThreadFactory(String namePrefix, boolean isSystem) {
this.namePrefix = namePrefix;
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.isSystem = isSystem;
}

@Override
public Thread newThread(Runnable r) {
return AccessController.doPrivileged((PrivilegedAction<Thread>) () -> {
Thread t = new Thread(group, r, namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", 0);
Thread t = new EsThread(group, r, namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", 0, isSystem);
t.setDaemon(true);
return t;
});
}
}

public static class EsThread extends Thread {
private final boolean isSystem;

EsThread(ThreadGroup group, Runnable target, String name, long stackSize, boolean isSystem) {
super(group, target, name, stackSize);
this.isSystem = isSystem;
}

public boolean isSystem() {
return isSystem;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
ThreadPool.Names.SYSTEM_READ,
halfProcMaxAt5,
2000,
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK,
true
)
);
result.put(
Expand All @@ -180,7 +181,8 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
ThreadPool.Names.SYSTEM_WRITE,
halfProcMaxAt5,
1000,
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA)
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA),
true
)
);
result.put(
Expand All @@ -190,7 +192,8 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
ThreadPool.Names.SYSTEM_CRITICAL_READ,
halfProcMaxAt5,
2000,
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK,
true
)
);
result.put(
Expand All @@ -200,7 +203,8 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
ThreadPool.Names.SYSTEM_CRITICAL_WRITE,
halfProcMaxAt5,
1500,
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA)
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA),
true
)
);
return unmodifiableMap(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
public abstract class ExecutorBuilder<U extends ExecutorBuilder.ExecutorSettings> {

private final String name;
private final boolean isSystemThread;

public ExecutorBuilder(String name) {
public ExecutorBuilder(String name, boolean isSystemThread) {
this.name = name;
this.isSystemThread = isSystemThread;
}

protected String name() {
Expand Down Expand Up @@ -90,4 +92,7 @@ abstract static class ExecutorSettings {

}

public boolean isSystemThread() {
return isSystemThread;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,28 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
final int queueSize,
final TaskTrackingConfig taskTrackingConfig
) {
this(settings, name, size, queueSize, "thread_pool." + name, taskTrackingConfig);
this(settings, name, size, queueSize, "thread_pool." + name, taskTrackingConfig, false);
}

/**
* Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
* @param taskTrackingConfig whether to track statics about task execution time
* @param isSystemThread whether the threads are system threads
*/
FixedExecutorBuilder(
final Settings settings,
final String name,
final int size,
final int queueSize,
final TaskTrackingConfig taskTrackingConfig,
boolean isSystemThread
) {
this(settings, name, size, queueSize, "thread_pool." + name, taskTrackingConfig, isSystemThread);
}

/**
Expand All @@ -72,7 +93,29 @@ public FixedExecutorBuilder(
final String prefix,
final TaskTrackingConfig taskTrackingConfig
) {
super(name);
this(settings, name, size, queueSize, prefix, taskTrackingConfig, false);
}

/**
* Construct a fixed executor builder.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
* @param prefix the prefix for the settings keys
* @param taskTrackingConfig whether to track statics about task execution time
*/
public FixedExecutorBuilder(
final Settings settings,
final String name,
final int size,
final int queueSize,
final String prefix,
final TaskTrackingConfig taskTrackingConfig,
final boolean isSystemThread
) {
super(name, isSystemThread);
final String sizeKey = settingsKey(prefix, "size");
this.sizeSetting = new Setting<>(
sizeKey,
Expand Down Expand Up @@ -102,7 +145,7 @@ FixedExecutorSettings getSettings(Settings settings) {
ThreadPool.ExecutorHolder build(final FixedExecutorSettings settings, final ThreadContext threadContext) {
int size = settings.size;
int queueSize = settings.queueSize;
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings.nodeName, name(), isSystemThread());
final ExecutorService executor = EsExecutors.newFixed(
settings.nodeName + "/" + name(),
size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public ScalingExecutorBuilder(
final String prefix,
final EsExecutors.TaskTrackingConfig trackingConfig
) {
super(name);
super(name, false);
this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, Setting.Property.NodeScope);
this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, Setting.Property.NodeScope);
this.keepAliveSetting = Setting.timeSetting(settingsKey(prefix, "keep_alive"), keepAlive, Setting.Property.NodeScope);
Expand All @@ -131,7 +131,7 @@ ThreadPool.ExecutorHolder build(final ScalingExecutorSettings settings, final Th
int core = settings.core;
int max = settings.max;
final ThreadPool.Info info = new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.SCALING, core, max, keepAlive, null);
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings.nodeName, name());
ExecutorService executor;
executor = EsExecutors.newScaling(
settings.nodeName + "/" + name(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,15 +635,19 @@ public void testParseExecutorName() throws InterruptedException {
final var executorName = randomAlphaOfLength(10);
final String nodeName = rarely() ? null : randomIdentifier();
final ThreadFactory threadFactory;
final boolean isSystem;
if (nodeName == null) {
isSystem = false;
threadFactory = EsExecutors.daemonThreadFactory(Settings.EMPTY, executorName);
} else if (randomBoolean()) {
isSystem = false;
threadFactory = EsExecutors.daemonThreadFactory(
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build(),
executorName
);
} else {
threadFactory = EsExecutors.daemonThreadFactory(nodeName, executorName);
isSystem = randomBoolean();
threadFactory = EsExecutors.daemonThreadFactory(nodeName, executorName, isSystem);
}

final var thread = threadFactory.newThread(() -> {});
Expand All @@ -652,6 +656,8 @@ public void testParseExecutorName() throws InterruptedException {
assertThat(EsExecutors.executorName(thread), equalTo(executorName));
assertThat(EsExecutors.executorName("TEST-" + thread.getName()), is(nullValue()));
assertThat(EsExecutors.executorName("LuceneTestCase" + thread.getName()), is(nullValue()));
assertThat(EsExecutors.executorName("LuceneTestCase" + thread.getName()), is(nullValue()));
assertThat(((EsExecutors.EsThread) thread).isSystem(), equalTo(isSystem));
} finally {
thread.join();
}
Expand Down

0 comments on commit bbd887a

Please sign in to comment.