From 22811b56f0badf44384bc7631cb0295b5f0bf892 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Reto=20Sch=C3=BCttel?= Date: Thu, 12 Oct 2017 15:20:51 +0200 Subject: [PATCH] Use a dedicated ThreadGroup in rest sniffer (#26897) This change adds a dedicated thread group, configures threads with a corresponding thread name and starts all threads as daemon threads. --- .../elasticsearch/client/sniff/Sniffer.java | 38 ++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/client/sniffer/src/main/java/org/elasticsearch/client/sniff/Sniffer.java b/client/sniffer/src/main/java/org/elasticsearch/client/sniff/Sniffer.java index 89a7d9df8e60d..c655babd9ed3d 100644 --- a/client/sniffer/src/main/java/org/elasticsearch/client/sniff/Sniffer.java +++ b/client/sniffer/src/main/java/org/elasticsearch/client/sniff/Sniffer.java @@ -27,12 +27,16 @@ import java.io.Closeable; import java.io.IOException; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** * Class responsible for sniffing nodes from some source (default is elasticsearch itself) and setting them to a provided instance of @@ -45,6 +49,7 @@ public class Sniffer implements Closeable { private static final Log logger = LogFactory.getLog(Sniffer.class); + private static final String SNIFFER_THREAD_NAME = "es_rest_client_sniffer"; private final Task task; @@ -79,7 +84,8 @@ private Task(HostsSniffer hostsSniffer, RestClient restClient, long sniffInterva this.restClient = restClient; this.sniffIntervalMillis = sniffIntervalMillis; this.sniffAfterFailureDelayMillis = sniffAfterFailureDelayMillis; - this.scheduledExecutorService = Executors.newScheduledThreadPool(1); + SnifferThreadFactory threadFactory = new SnifferThreadFactory(SNIFFER_THREAD_NAME); + this.scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactory); scheduleNextRun(0); } @@ -151,4 +157,34 @@ synchronized void shutdown() { public static SnifferBuilder builder(RestClient restClient) { return new SnifferBuilder(restClient); } + + private static class SnifferThreadFactory implements ThreadFactory { + + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + private final ThreadFactory originalThreadFactory; + + private SnifferThreadFactory(String namePrefix) { + this.namePrefix = namePrefix; + this.originalThreadFactory = AccessController.doPrivileged(new PrivilegedAction() { + @Override + public ThreadFactory run() { + return Executors.defaultThreadFactory(); + } + }); + } + + @Override + public Thread newThread(final Runnable r) { + return AccessController.doPrivileged(new PrivilegedAction() { + @Override + public Thread run() { + Thread t = originalThreadFactory.newThread(r); + t.setName(namePrefix + "[T#" + threadNumber.getAndIncrement() + "]"); + t.setDaemon(true); + return t; + } + }); + } + } }