Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Identify system threads using a Thread subclass #113562

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -326,39 +326,62 @@ public static String executorName(Thread thread) {
}

public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) {
return daemonThreadFactory(threadName(settings, namePrefix));
return createDaemonThreadFactory(threadName(settings, namePrefix), false);
}

public static ThreadFactory daemonThreadFactory(String nodeName, String namePrefix) {
return daemonThreadFactory(nodeName, namePrefix, false);
}

public static ThreadFactory daemonThreadFactory(String nodeName, String namePrefix, boolean isSystemThread) {
assert nodeName != null && false == nodeName.isEmpty();
return daemonThreadFactory(threadName(nodeName, namePrefix));
return createDaemonThreadFactory(threadName(nodeName, namePrefix), isSystemThread);
}

public static ThreadFactory daemonThreadFactory(String namePrefix) {
return new EsThreadFactory(namePrefix);
public static ThreadFactory daemonThreadFactory(String name) {
assert name != null && name.isEmpty() == false;
return createDaemonThreadFactory(name, false);
}

private static ThreadFactory createDaemonThreadFactory(String namePrefix, boolean isSystemThread) {
return new EsThreadFactory(namePrefix, isSystemThread);
}

static class EsThreadFactory implements ThreadFactory {

final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
final boolean isSystem;

EsThreadFactory(String namePrefix) {
EsThreadFactory(String namePrefix, boolean isSystem) {
this.namePrefix = namePrefix;
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.isSystem = isSystem;
}

@Override
public Thread newThread(Runnable r) {
return AccessController.doPrivileged((PrivilegedAction<Thread>) () -> {
Thread t = new Thread(group, r, namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", 0);
Thread t = new EsThread(group, r, namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", 0, isSystem);
t.setDaemon(true);
return t;
});
}
}

public static class EsThread extends Thread {
private final boolean isSystem;

EsThread(ThreadGroup group, Runnable target, String name, long stackSize, boolean isSystem) {
super(group, target, name, stackSize);
this.isSystem = isSystem;
}

public boolean isSystem() {
return isSystem;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
ThreadPool.Names.SYSTEM_READ,
halfProcMaxAt5,
2000,
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK,
true
)
);
result.put(
Expand All @@ -180,7 +181,8 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
ThreadPool.Names.SYSTEM_WRITE,
halfProcMaxAt5,
1000,
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA)
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA),
true
)
);
result.put(
Expand All @@ -190,7 +192,8 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
ThreadPool.Names.SYSTEM_CRITICAL_READ,
halfProcMaxAt5,
2000,
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK,
true
)
);
result.put(
Expand All @@ -200,7 +203,8 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
ThreadPool.Names.SYSTEM_CRITICAL_WRITE,
halfProcMaxAt5,
1500,
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA)
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA),
true
)
);
return unmodifiableMap(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
public abstract class ExecutorBuilder<U extends ExecutorBuilder.ExecutorSettings> {

private final String name;
private final boolean isSystemThread;

public ExecutorBuilder(String name) {
public ExecutorBuilder(String name, boolean isSystemThread) {
this.name = name;
this.isSystemThread = isSystemThread;
}

protected String name() {
Expand Down Expand Up @@ -90,4 +92,7 @@ abstract static class ExecutorSettings {

}

public boolean isSystemThread() {
return isSystemThread;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,28 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
final int queueSize,
final TaskTrackingConfig taskTrackingConfig
) {
this(settings, name, size, queueSize, "thread_pool." + name, taskTrackingConfig);
this(settings, name, size, queueSize, "thread_pool." + name, taskTrackingConfig, false);
}

/**
* Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
* @param taskTrackingConfig whether to track statics about task execution time
* @param isSystemThread whether the threads are system threads
*/
FixedExecutorBuilder(
final Settings settings,
final String name,
final int size,
final int queueSize,
final TaskTrackingConfig taskTrackingConfig,
boolean isSystemThread
) {
this(settings, name, size, queueSize, "thread_pool." + name, taskTrackingConfig, isSystemThread);
}

/**
Expand All @@ -72,7 +93,29 @@ public FixedExecutorBuilder(
final String prefix,
final TaskTrackingConfig taskTrackingConfig
) {
super(name);
this(settings, name, size, queueSize, prefix, taskTrackingConfig, false);
}

/**
* Construct a fixed executor builder.
*
* @param settings the node-level settings
* @param name the name of the executor
* @param size the fixed number of threads
* @param queueSize the size of the backing queue, -1 for unbounded
* @param prefix the prefix for the settings keys
* @param taskTrackingConfig whether to track statics about task execution time
*/
public FixedExecutorBuilder(
final Settings settings,
final String name,
final int size,
final int queueSize,
final String prefix,
final TaskTrackingConfig taskTrackingConfig,
final boolean isSystemThread
) {
super(name, isSystemThread);
final String sizeKey = settingsKey(prefix, "size");
this.sizeSetting = new Setting<>(
sizeKey,
Expand Down Expand Up @@ -102,7 +145,7 @@ FixedExecutorSettings getSettings(Settings settings) {
ThreadPool.ExecutorHolder build(final FixedExecutorSettings settings, final ThreadContext threadContext) {
int size = settings.size;
int queueSize = settings.queueSize;
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings.nodeName, name(), isSystemThread());
final ExecutorService executor = EsExecutors.newFixed(
settings.nodeName + "/" + name(),
size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public ScalingExecutorBuilder(
final String prefix,
final EsExecutors.TaskTrackingConfig trackingConfig
) {
super(name);
super(name, false);
this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, Setting.Property.NodeScope);
this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, Setting.Property.NodeScope);
this.keepAliveSetting = Setting.timeSetting(settingsKey(prefix, "keep_alive"), keepAlive, Setting.Property.NodeScope);
Expand All @@ -131,7 +131,7 @@ ThreadPool.ExecutorHolder build(final ScalingExecutorSettings settings, final Th
int core = settings.core;
int max = settings.max;
final ThreadPool.Info info = new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.SCALING, core, max, keepAlive, null);
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings.nodeName, name());
ExecutorService executor;
executor = EsExecutors.newScaling(
settings.nodeName + "/" + name(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,15 +635,19 @@ public void testParseExecutorName() throws InterruptedException {
final var executorName = randomAlphaOfLength(10);
final String nodeName = rarely() ? null : randomIdentifier();
final ThreadFactory threadFactory;
final boolean isSystem;
if (nodeName == null) {
isSystem = false;
threadFactory = EsExecutors.daemonThreadFactory(Settings.EMPTY, executorName);
} else if (randomBoolean()) {
isSystem = false;
threadFactory = EsExecutors.daemonThreadFactory(
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build(),
executorName
);
} else {
threadFactory = EsExecutors.daemonThreadFactory(nodeName, executorName);
isSystem = randomBoolean();
threadFactory = EsExecutors.daemonThreadFactory(nodeName, executorName, isSystem);
}

final var thread = threadFactory.newThread(() -> {});
Expand All @@ -652,6 +656,8 @@ public void testParseExecutorName() throws InterruptedException {
assertThat(EsExecutors.executorName(thread), equalTo(executorName));
assertThat(EsExecutors.executorName("TEST-" + thread.getName()), is(nullValue()));
assertThat(EsExecutors.executorName("LuceneTestCase" + thread.getName()), is(nullValue()));
assertThat(EsExecutors.executorName("LuceneTestCase" + thread.getName()), is(nullValue()));
assertThat(((EsExecutors.EsThread) thread).isSystem(), equalTo(isSystem));
} finally {
thread.join();
}
Expand Down