Skip to content

Commit

Permalink
[split] Bridged thread pool scheduler
Browse files Browse the repository at this point in the history
RB_ID=252615
  • Loading branch information
Lei Wang authored and CI committed Jan 3, 2014
1 parent e04ccc3 commit eb65af3
Showing 1 changed file with 74 additions and 32 deletions.
106 changes: 74 additions & 32 deletions util-core/src/main/scala/com/twitter/concurrent/Scheduler.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.twitter.concurrent

import java.util.ArrayDeque
import java.util.concurrent.Executors
import java.util.concurrent.{Executors, ExecutorService, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger
import management.ManagementFactory
import scala.util.Random

Expand All @@ -16,7 +17,7 @@ trait Scheduler {
* work to do.
*/
def flush()

// A note on Hotspot's ThreadMXBean's CPU time. On Linux, this
// uses clock_gettime[1] which should both be fast and accurate.
//
Expand All @@ -26,10 +27,10 @@ trait Scheduler {

/** The amount of User time that's been scheduled as per ThreadMXBean. */
def usrTime: Long

/** The amount of CPU time that's been scheduled as per ThreadMXBean */
def cpuTime: Long

/** Number of dispatches performed by this scheduler. */
def numDispatches: Long
}
Expand All @@ -39,7 +40,7 @@ trait Scheduler {
*/
object Scheduler extends Scheduler {
@volatile private var self: Scheduler = new LocalScheduler

def apply(): Scheduler = self

// Note: This can be unsafe since some schedulers may be active,
Expand Down Expand Up @@ -70,17 +71,17 @@ private class LocalScheduler extends Scheduler {
override def initialValue = null
}

private class Activation extends Scheduler {
private class Activation extends Scheduler {
private[this] var r0, r1, r2: Runnable = null
private[this] val rs = new ArrayDeque[Runnable]
private[this] var running = false
private[this] val rng = new Random

// This is safe: there's only one updater.
@volatile var usrTime = 0L
@volatile var cpuTime = 0L
@volatile var numDispatches = 0L

def submit(r: Runnable) {
assert(r != null)
if (r0 == null) r0 = r
Expand Down Expand Up @@ -139,44 +140,47 @@ private class LocalScheduler extends Scheduler {
// Scheduler implementation:
def submit(r: Runnable) = get().submit(r)
def flush() = get().flush()

def usrTime = (activations map (_.usrTime)).sum
def cpuTime = (activations map (_.cpuTime)).sum
def numDispatches = (activations map (_.numDispatches)).sum
}

/**
* A scheduler that dispatches directly to an underlying Java
* cached threadpool executor.
*/
class ThreadPoolScheduler(name: String) extends Scheduler {
private[this] val bean = ManagementFactory.getThreadMXBean()
trait ExecutorScheduler { self: Scheduler =>
val name: String
val executorFactory: ThreadFactory => ExecutorService

protected[this] val bean = ManagementFactory.getThreadMXBean()
protected val threadGroup: ThreadGroup = new ThreadGroup(name)
@volatile private[this] var threads = Set[Thread]()

private[this] val threadFactory =
new NamedPoolThreadFactory(name, true/*daemons*/) {
override def newThread(r: Runnable) = {
super.newThread(new Runnable {
def run() {
val t = Thread.currentThread()
synchronized { threads += t }

This comment has been minimized.

Copy link
@adriaanm

adriaanm Jul 21, 2016

Btw, with this removal, I think the @volatile private[this] var threads = Set[Thread]() above is no longer used. Interestingly, this ended up exploiting a scalac bug where you're allowed to have a private[this] member that name-clashes with another non-private one 😁 (that's how I ended up here, because the community build failed when I fixed that in scala/scala#5141).

This comment has been minimized.

Copy link
@adriaanm

adriaanm Jul 21, 2016

(Here's the fork I'm using in the mean time: develop...adriaanm:develop)

This comment has been minimized.

Copy link
@kevinoliver

kevinoliver Jul 21, 2016

Contributor

nice find!

i've got a patch up now to remove the unused variable.

try r.run() finally {
synchronized { threads -= t }
}
}
})
}
protected val threadFactory = new ThreadFactory {
private val n = new AtomicInteger(1)

def newThread(r: Runnable) = {
val thread = new Thread(threadGroup, r, name + "-" + n.getAndIncrement())
thread.setDaemon(true)
thread
}
}

private[this] val executor =
Executors.newCachedThreadPool(threadFactory)
protected def threads() = {
// We add 2x slop here because it's inherently racy to enumerate
// threads. Since this is used only for monitoring purposes, we
// don't try too hard.
val threads = new Array[Thread](threadGroup.activeCount*2)
val n = threadGroup.enumerate(threads)
threads take n
}

protected[this] val executor = executorFactory(threadFactory)

def shutdown() { executor.shutdown() }
def submit(r: Runnable) { executor.submit(r) }
def flush() = ()
def usrTime = {
var sum = 0L
for (t <- threads) {
for (t <- threads()) {
val time = bean.getThreadUserTime(t.getId())
if (time > 0) sum += time
}
Expand All @@ -185,12 +189,50 @@ class ThreadPoolScheduler(name: String) extends Scheduler {

def cpuTime = {
var sum = 0L
for (t <- threads) {
for (t <- threads()) {
val time = bean.getThreadCpuTime(t.getId())
if (time > 0) sum += time
}
sum
}

def numDispatches = -1L // Unsupported

def getExecutor = executor
}

/**
* A scheduler that dispatches directly to an underlying Java
* cached threadpool executor.
*/
class ThreadPoolScheduler(
val name: String,
val executorFactory: ThreadFactory => ExecutorService
) extends Scheduler with ExecutorScheduler {
def this(name: String) = this(name, Executors.newCachedThreadPool(_))
}

/**
* A scheduler that will bridge tasks from outside into the executor threads,
* while keeping all local tasks on their local threads.
*/
class BridgedThreadPoolScheduler(
val name: String,
val executorFactory: ThreadFactory => ExecutorService
) extends Scheduler with ExecutorScheduler {
private[this] val local = new LocalScheduler

def this(name: String) = this(name, Executors.newCachedThreadPool(_))

override def submit(r: Runnable) {
if (Thread.currentThread.getThreadGroup == threadGroup)
local.submit(r)
else
executor.submit(new Runnable {
def run() {
BridgedThreadPoolScheduler.this.submit(r)
}
})
}
}

0 comments on commit eb65af3

Please sign in to comment.