From eb65af31ef03a1fe69af4b8a64347bf20f04c025 Mon Sep 17 00:00:00 2001
From: Lei Wang
Date: Fri, 3 Jan 2014 04:16:25 +0000
Subject: [PATCH] [split] Bridged thread pool scheduler

RB_ID=252615
---
 .../com/twitter/concurrent/Scheduler.scala | 106 ++++++++++++------
 1 file changed, 74 insertions(+), 32 deletions(-)

diff --git a/util-core/src/main/scala/com/twitter/concurrent/Scheduler.scala b/util-core/src/main/scala/com/twitter/concurrent/Scheduler.scala
index 6a6a66ad37..8616f53c7d 100644
--- a/util-core/src/main/scala/com/twitter/concurrent/Scheduler.scala
+++ b/util-core/src/main/scala/com/twitter/concurrent/Scheduler.scala
@@ -1,7 +1,8 @@
 package com.twitter.concurrent
 
 import java.util.ArrayDeque
-import java.util.concurrent.Executors
+import java.util.concurrent.{Executors, ExecutorService, ThreadFactory}
+import java.util.concurrent.atomic.AtomicInteger
 import management.ManagementFactory
 
 import scala.util.Random
@@ -16,7 +17,7 @@ trait Scheduler {
    * work to do.
    */
   def flush()
-  
+
   // A note on Hotspot's ThreadMXBean's CPU time. On Linux, this
   // uses clock_gettime[1] which should both be fast and accurate.
   //
@@ -26,10 +27,10 @@ trait Scheduler {
 
   /** The amount of User time that's been scheduled as per ThreadMXBean. */
   def usrTime: Long
-  
+
   /** The amount of CPU time that's been scheduled as per ThreadMXBean */
   def cpuTime: Long
-  
+
   /** Number of dispatches performed by this scheduler. */
   def numDispatches: Long
 }
@@ -39,7 +40,7 @@
  */
 object Scheduler extends Scheduler {
   @volatile private var self: Scheduler = new LocalScheduler
-  
+
   def apply(): Scheduler = self
 
   // Note: This can be unsafe since some schedulers may be active,
@@ -70,17 +71,17 @@ private class LocalScheduler extends Scheduler {
     override def initialValue = null
   }
 
-  private class Activation extends Scheduler { 
+  private class Activation extends Scheduler {
    private[this] var r0, r1, r2: Runnable = null
     private[this] val rs = new ArrayDeque[Runnable]
     private[this] var running = false
     private[this] val rng = new Random
-    
+
     // This is safe: there's only one updater.
     @volatile var usrTime = 0L
     @volatile var cpuTime = 0L
     @volatile var numDispatches = 0L
-    
+
     def submit(r: Runnable) {
       assert(r != null)
       if (r0 == null) r0 = r
@@ -139,44 +140,47 @@ private class LocalScheduler extends Scheduler {
   // Scheduler implementation:
   def submit(r: Runnable) = get().submit(r)
   def flush() = get().flush()
-  
+
   def usrTime = (activations map (_.usrTime)).sum
   def cpuTime = (activations map (_.cpuTime)).sum
   def numDispatches = (activations map (_.numDispatches)).sum
 }
 
-/**
- * A scheduler that dispatches directly to an underlying Java
- * cached threadpool executor.
- */
-class ThreadPoolScheduler(name: String) extends Scheduler {
-  private[this] val bean = ManagementFactory.getThreadMXBean()
+trait ExecutorScheduler { self: Scheduler =>
+  val name: String
+  val executorFactory: ThreadFactory => ExecutorService
+
+  protected[this] val bean = ManagementFactory.getThreadMXBean()
+  protected val threadGroup: ThreadGroup = new ThreadGroup(name)
   @volatile private[this] var threads = Set[Thread]()
-  private[this] val threadFactory =
-    new NamedPoolThreadFactory(name, true/*daemons*/) {
-      override def newThread(r: Runnable) = {
-        super.newThread(new Runnable {
-          def run() {
-            val t = Thread.currentThread()
-            synchronized { threads += t }
-            try r.run() finally {
-              synchronized { threads -= t }
-            }
-          }
-        })
-      }
+  protected val threadFactory = new ThreadFactory {
+    private val n = new AtomicInteger(1)
+
+    def newThread(r: Runnable) = {
+      val thread = new Thread(threadGroup, r, name + "-" + n.getAndIncrement())
+      thread.setDaemon(true)
+      thread
     }
+  }
 
-  private[this] val executor =
-    Executors.newCachedThreadPool(threadFactory)
+  protected def threads() = {
+    // We add 2x slop here because it's inherently racy to enumerate
+    // threads. Since this is used only for monitoring purposes, we
+    // don't try too hard.
+    val threads = new Array[Thread](threadGroup.activeCount*2)
+    val n = threadGroup.enumerate(threads)
+    threads take n
+  }
+
+  protected[this] val executor = executorFactory(threadFactory)
 
   def shutdown() { executor.shutdown() }
   def submit(r: Runnable) { executor.submit(r) }
   def flush() = ()
 
   def usrTime = {
     var sum = 0L
-    for (t <- threads) {
+    for (t <- threads()) {
       val time = bean.getThreadUserTime(t.getId())
       if (time > 0) sum += time
     }
@@ -185,7 +189,7 @@ class ThreadPoolScheduler(name: String) extends Scheduler {
 
   def cpuTime = {
     var sum = 0L
-    for (t <- threads) {
+    for (t <- threads()) {
       val time = bean.getThreadCpuTime(t.getId())
       if (time > 0) sum += time
     }
@@ -193,4 +197,42 @@ class ThreadPoolScheduler(name: String) extends Scheduler {
   }
 
   def numDispatches = -1L // Unsupported
+
+  def getExecutor = executor
+}
+
+/**
+ * A scheduler that dispatches directly to an underlying Java
+ * cached threadpool executor.
+ */
+class ThreadPoolScheduler(
+  val name: String,
+  val executorFactory: ThreadFactory => ExecutorService
+) extends Scheduler with ExecutorScheduler {
+  def this(name: String) = this(name, Executors.newCachedThreadPool(_))
+}
+
+/**
+ * A scheduler that will bridge tasks from outside into the executor threads,
+ * while keeping all local tasks on their local threads.
+ */
+class BridgedThreadPoolScheduler(
+  val name: String,
+  val executorFactory: ThreadFactory => ExecutorService
+) extends Scheduler with ExecutorScheduler {
+  private[this] val local = new LocalScheduler
+
+  def this(name: String) = this(name, Executors.newCachedThreadPool(_))
+
+  override def submit(r: Runnable) {
+    if (Thread.currentThread.getThreadGroup == threadGroup)
+      local.submit(r)
+    else
+      executor.submit(new Runnable {
+        def run() {
+          BridgedThreadPoolScheduler.this.submit(r)
+        }
+      })
+  }
 }
+
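Usage note (not part of the patch): a minimal sketch of how the new BridgedThreadPoolScheduler is expected to route work, assuming the default cached-thread-pool auxiliary constructor introduced above. The demo object, task bodies, and the sleep are illustrative only.

  import com.twitter.concurrent.BridgedThreadPoolScheduler

  // Illustrative demo only; requires the patched util-core on the classpath.
  object BridgedSchedulerDemo {
    def main(args: Array[String]) {
      val scheduler = new BridgedThreadPoolScheduler("demo")

      // Submitted from the main thread, which is outside the scheduler's
      // ThreadGroup, so the task is wrapped, handed to the executor, and
      // ends up running on a "demo-N" pool thread.
      scheduler.submit(new Runnable {
        def run() {
          println("outer task on " + Thread.currentThread.getName)

          // Submitted from a thread inside the scheduler's ThreadGroup, so
          // this nested task goes to the private LocalScheduler and runs on
          // this same pool thread once the outer task returns.
          scheduler.submit(new Runnable {
            def run() {
              println("inner task on " + Thread.currentThread.getName)
            }
          })
        }
      })

      // The pool threads are daemons, so give them a moment to run before
      // the JVM exits; a real caller would coordinate shutdown explicitly.
      Thread.sleep(1000)
    }
  }

The point the sketch exercises: submit compares the calling thread's ThreadGroup with the scheduler's own group, so external callers are bridged onto pool threads exactly once, while code already running on those threads keeps its work local instead of bouncing back through the executor.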