diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 05d6a43739ec..e06a3fb8324c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -667,11 +667,7 @@ public static StreamingDataflowWorker fromDataflowWorkerHarnessOptions( this.isDoneFuture = new CompletableFuture<>(); this.threadFactory = - r -> { - Thread t = new Thread(r); - t.setDaemon(true); - return t; - }; + new ThreadFactoryBuilder().setNameFormat("DataflowWorkUnits-%d").setDaemon(true).build(); this.workUnitExecutor = new BoundedQueueExecutor( chooseMaximumNumberOfThreads(), @@ -691,7 +687,7 @@ public static StreamingDataflowWorker fromDataflowWorkerHarnessOptions( memoryMonitorThread.setName("MemoryMonitor"); dispatchThread = - threadFactory.newThread( + new Thread( new Runnable() { @Override public void run() { @@ -704,11 +700,12 @@ public void run() { LOG.info("Dispatch done"); } }); + dispatchThread.setDaemon(true); dispatchThread.setPriority(Thread.MIN_PRIORITY); dispatchThread.setName("DispatchThread"); commitThread = - threadFactory.newThread( + new Thread( new Runnable() { @Override public void run() { @@ -719,6 +716,7 @@ public void run() { } } }); + commitThread.setDaemon(true); commitThread.setPriority(Thread.MAX_PRIORITY); commitThread.setName("CommitThread");