Feature/write through cache perf #234

Merged (12 commits) on Jun 17, 2014
project/Build.scala (17 changes: 15 additions & 2 deletions)

@@ -21,6 +21,9 @@ import Keys._
import spray.boilerplate.BoilerplatePlugin.Boilerplate
import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings
import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact
import sbtassembly.Plugin._
import AssemblyKeys._


object StorehausBuild extends Build {
def withCross(dep: ModuleID) =
@@ -35,7 +38,7 @@ object StorehausBuild extends Build {
case version if version startsWith "2.10" => "org.specs2" %% "specs2" % "1.13" % "test"
}
val extraSettings =
Project.defaultSettings ++ Boilerplate.settings ++ mimaDefaultSettings
Project.defaultSettings ++ Boilerplate.settings ++ assemblySettings ++ mimaDefaultSettings

def ciSettings: Seq[Project.Setting[_]] =
if (sys.env.getOrElse("TRAVIS", "false").toBoolean) Seq(
@@ -116,7 +119,7 @@ object StorehausBuild extends Build {
.filterNot(unreleasedModules.contains(_))
.map { s => "com.twitter" % ("storehaus-" + s + "_2.9.3") % "0.9.0" }

val algebirdVersion = "0.5.0"
val algebirdVersion = "0.6.0"
val bijectionVersion = "0.6.2"
val utilVersion = "6.11.0"
val scaldingVersion = "0.9.0rc15"
@@ -265,4 +268,14 @@ object StorehausBuild extends Build {
withCross("com.twitter" %% "util-core" % utilVersion))
)
)

lazy val storehausCaliper = module("caliper").settings(
libraryDependencies ++= Seq("com.google.caliper" % "caliper" % "0.5-rc1",
"com.google.code.java-allocation-instrumenter" % "java-allocation-instrumenter" % "2.0",
"com.google.code.gson" % "gson" % "1.7.1",
"com.twitter" %% "bijection-core" % bijectionVersion,
"com.twitter" %% "algebird-core" % algebirdVersion),
javaOptions in run <++= (fullClasspath in Runtime) map { cp => Seq("-cp", sbt.Build.data(cp).mkString(":")) }
).dependsOn(storehausCore, storehausAlgebra, storehausCache)

}
project/plugins.sbt (2 changes: 2 additions & 0 deletions)

@@ -8,3 +8,5 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.5.1")
addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.6")

addSbtPlugin("io.spray" % "sbt-boilerplate" % "0.5.1")

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
New file (186 additions)

@@ -0,0 +1,186 @@
/*
* Copyright 2013 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus.algebra

import com.twitter.algebird.{ Semigroup, Monoid, Group, CMSHash }
import com.twitter.storehaus.{StoreProxy, Store}
import com.twitter.util.Future


// The update frequency is how often we should update the mutable CMS
case class WriteOperationUpdateFrequency(toInt: Int)
object WriteOperationUpdateFrequency {
def default = WriteOperationUpdateFrequency(100) // update 1% of the time
}

// This is how often in MS to roll over the CMS
case class RollOverFrequencyMS(toLong: Long)

object RollOverFrequencyMS {
def default = RollOverFrequencyMS(3600 * 1000L) // 1 Hour
}

// The heavy hitters percent is used to control above what % of items we should send to the backing
// aggregator
case class HeavyHittersPercent(toFloat: Float)
object HeavyHittersPercent {
def default = HeavyHittersPercent(0.001f) // 0.1% of the time
}


class ApproxHHTracker[K](hhPct: HeavyHittersPercent, updateFreq: WriteOperationUpdateFrequency, roFreq: RollOverFrequencyMS) {
Collaborator: maybe in the object, make a convenience method to set the whole thing? Or at least a comment demoing.
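One way to read that suggestion, sketched here as a hypothetical companion object (it is not part of this PR): it only bundles the three tuning knobs, falling back to the defaults defined above.

object ApproxHHTracker {
  // Hypothetical convenience constructor; all three parameters default to the
  // values on their companion objects, so ApproxHHTracker[String]() works, as does
  // overriding a single knob, e.g. ApproxHHTracker[String](hhPct = HeavyHittersPercent(0.01f)).
  def apply[K](hhPct: HeavyHittersPercent = HeavyHittersPercent.default,
               updateFreq: WriteOperationUpdateFrequency = WriteOperationUpdateFrequency.default,
               roFreq: RollOverFrequencyMS = RollOverFrequencyMS.default): ApproxHHTracker[K] =
    new ApproxHHTracker[K](hhPct, updateFreq, roFreq)
}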

private[this] final val WIDTH = 1000
private[this] final val DEPTH = 4
private[this] final val hh = new java.util.HashMap[K, Long]()
private[this] final var totalCount = 0L
private[this] final var hhMinReq = 0L
private[this] final val hhPercent = hhPct.toFloat
private[this] final val updateOpsFrequency = updateFreq.toInt
private[this] final val rollOverFrequency = roFreq.toLong
private[this] final var countsTable = Array.fill(WIDTH * DEPTH)(0L)
Collaborator: why final var?

private[this] var nextRollOver: Long = System.currentTimeMillis + roFreq.toLong
private[this] final val updateOps = new java.util.concurrent.atomic.AtomicInteger(0)

private[this] final val hashes: IndexedSeq[CMSHash] = {
val r = new scala.util.Random(5)
(0 until DEPTH).map { _ => CMSHash(r.nextInt, 0, WIDTH) }
}.toIndexedSeq

@inline
private[this] final def frequencyEst(item : Long): Long = {
var min = Long.MaxValue
var indx = 0
while (indx < DEPTH) {
val newVal = countsTable(indx*WIDTH + hashes(indx)(item))
if(newVal < min) min = newVal
indx += 1
}
min
}


// Update functions in the write path
// a synchronized guard should be used around these
// to ensure consistent updates to backing data structures
@inline
private[this] final def updateItem(item: K) {
val itemHashCode = item.hashCode
totalCount += 1L
hhMinReq = (hhPercent * totalCount).toLong
var indx = 0
while (indx < DEPTH) {
val offset = indx*WIDTH + hashes(indx)(itemHashCode)
countsTable.update(offset, countsTable(offset) + 1L)
indx += 1
}

updateHH(item, itemHashCode)
}

@inline
private[this] final def updateHH(item : K, itemHashCode: Int) {
@inline
def pruneHH {
val iter = hh.values.iterator
while(iter.hasNext) {
val n = iter.next
if(n < hhMinReq) {
iter.remove
}
}
}

if(hh.containsKey(item)) {
val v = hh.get(item)
val newItemCount = v + 1L
if (newItemCount < hhMinReq) {
pruneHH
} else {
hh.put(item, newItemCount)
}
} else {
val newItemCount = frequencyEst(itemHashCode) + 1L
if (newItemCount >= hhMinReq) {
hh.put(item, totalCount)
}
}
}

// We include the ability to reset the CMS so we can age our counters
// over time
private[this] def resetCMS {
hh.clear
totalCount = 0L
hhMinReq = 0L
countsTable = Array.fill(WIDTH * DEPTH)(0L)
updateOps.set(1)
nextRollOver = System.currentTimeMillis + roFreq.toLong
}
// End of thread-unsafe update steps


final def getFilterFunc: K => Boolean = {
Collaborator: any reason this is not a val?

Collaborator (author): Sure, it does a bunch of operations before returning a lambda function.

val opsCntr = updateOps.incrementAndGet

if(opsCntr < 100 || opsCntr % updateOpsFrequency == 0) {
hh.synchronized {
if(System.currentTimeMillis > nextRollOver) {
resetCMS
}
{k: K =>
updateItem(k)
hh.containsKey(k)
}
}
} else {
{k: K =>
hh.containsKey(k)
}
}
}

final def query(t: K): Boolean = hh.containsKey(t)
}


class HHFilteredStore[K, V](val self: Store[K, V],
Collaborator: can we add a comment to that effect? Also, I think this could potentially be a standard combinator: KeyFilteredStore, and in the enrichments, .filterKeys( ), right?

Collaborator (author): Not obviously, since there is an operation generation step. I can add the comment though.

(A sketch of the KeyFilteredStore idea follows after this class.)

hhPercent: HeavyHittersPercent = HeavyHittersPercent.default,
writeUpdateFreq: WriteOperationUpdateFrequency = WriteOperationUpdateFrequency.default,
rolloverFreq: RollOverFrequencyMS = RollOverFrequencyMS.default) extends StoreProxy[K, V] {
private[this] val approxTracker = new ApproxHHTracker[K](hhPercent, writeUpdateFreq, rolloverFreq)
Collaborator: this should live outside and you should pass in the filter function.

Collaborator (author): The filter is a def, so we would need to supply the constructor for a per-usage filter function. Not sure I see the gain in the complexity?


override def put(kv: (K, Option[V])): Future[Unit] = if(approxTracker.getFilterFunc(kv._1) || !kv._2.isDefined) self.put(kv) else Future.Unit

override def get(k: K): Future[Option[V]] = if(approxTracker.query(k)) self.get(k) else Future.None
Contributor: I'm guessing these are needed in spite of the multiGet/multiPut implementations below because of StoreProxy?


/*
 * In multiGet we only check whether the key is in the heavy hitters to decide whether to query the backing cache
 */
override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = {
val backed = self.multiGet(ks.filter(k => approxTracker.query(k)))
ks.map { k: K1 => (k, backed.getOrElse(k, Future.None)) }(collection.breakOut)
}

/*
 * In multiPut we test which keys we should store
 */
override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = {
val filterFunc = approxTracker.getFilterFunc
val backed = self.multiPut(kvs.filterKeys(t => filterFunc(t)))
kvs.map { kv => (kv._1, backed.getOrElse(kv._1, Future.Unit)) }(collection.breakOut)
}
}
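For reference, a minimal sketch of the fixed-predicate combinator the review mentions; the name KeyFilteredStore and this exact shape are hypothetical and not part of the PR (it reuses Store, StoreProxy and Future imported at the top of this file). It also illustrates the author's point: HHFilteredStore cannot be written this way, because its predicate is regenerated (and mutates the tracker) on every write.

class KeyFilteredStore[K, V](val self: Store[K, V], pred: K => Boolean) extends StoreProxy[K, V] {
  // Reads for keys outside the predicate short-circuit to a miss.
  override def get(k: K): Future[Option[V]] =
    if (pred(k)) self.get(k) else Future.None

  // Writes for keys outside the predicate are dropped.
  override def put(kv: (K, Option[V])): Future[Unit] =
    if (pred(kv._1)) self.put(kv) else Future.Unit
}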
@@ -31,10 +31,18 @@ object MutableTTLCache {
}

class MutableTTLCache[K, V](val ttl: Duration, protected val backingCache: MutableCache[K, (Long, V)])(val clock: () => Long) extends MutableCache[K, V] {
def get(k: K) = backingCache.get(k).filter(_._1 > clock()).map(_._2)
private[this] val putsSincePrune = new java.util.concurrent.atomic.AtomicInteger(1)

def get(k: K) = {
val clockVal = clock()
Collaborator: there is a difference in this change. Here you are hitting the clock even when the item is absent; originally, we only checked the clock if the item was present. This will make high-miss-rate cases more expensive. Can you keep it in the closure, but just make it more verbose there, like adding a comment?

(A sketch of the suggested shape follows after this method.)

backingCache.get(k).filter(_._1 > clockVal).map(_._2)
}
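As a sketch of the shape the reviewer asks for (not the code in this PR), the clock call can stay inside the filter closure so it is only consulted on a hit:

def get(k: K) =
  backingCache.get(k).filter { case (expiryMs, _) =>
    // Only hit the clock when the key is actually present, so the miss path stays cheap.
    expiryMs > clock()
  }.map(_._2)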

def +=(kv: (K, V)): this.type = {
removeExpired
if(putsSincePrune.getAndIncrement % 1000 == 0) {
Contributor: Would making this tunable be of any use?

Collaborator (author): That's a good question. Maybe, but we may also already have configuration-option overload. Not sure what we would want there.

removeExpired
putsSincePrune.set(1)
Collaborator: is this needed? Might save an atomic op to skip it.

Collaborator (author): It was there just to avoid eventual overflow, since it's an atomic int.

Collaborator: overflow is not a problem, is it? You are checking % 1000 == 0, which works fine for negative numbers:

scala> -1000 % 1000
res0: Int = 0

}
backingCache += (kv._1, (clock() + ttl.inMilliseconds, kv._2))
this
}
New file (27 additions)

@@ -0,0 +1,27 @@
package com.twitter.storehaus.caliper

import com.google.caliper.{Runner => CaliperMain}
import com.google.common.collect.ObjectArrays.concat

import java.io.PrintWriter

object Runner {

def main(args: Array[String]) {
CaliperMain.main(args)
}

// def main(args: Array[String]) {
Collaborator: remove commented code, please.

// val th = if(args.size > 0 && args(0) == "warmed") com.ichi.bench.Thyme.warmed(verbose = print) else new com.ichi.bench.Thyme
// val fn = new WriteThroughCacheBenchmark
// fn.numInputKeys = 500
// fn.numElements = 100000
// println("Starting set up")
// fn.setUp
// println("Starting benchmark")
// println(th.pbench(fn.timeDoUpdates(10)))
// println("Finished benchmark")
// System.exit(1)
// }

}
New file (84 additions)

@@ -0,0 +1,84 @@
package com.twitter.storehaus.caliper

import com.google.caliper.{Param, SimpleBenchmark}
import com.twitter.algebird.HyperLogLogMonoid
import com.twitter.bijection._
import com.twitter.storehaus._
import com.twitter.conversions.time._
import com.twitter.algebird._
import java.nio.ByteBuffer
import scala.math.pow
import com.twitter.storehaus.algebra._
import com.twitter.util.{Await, Future}

class DelayedStore[K, V](val self: Store[K, V])(implicit timer: com.twitter.util.Timer) extends StoreProxy[K, V] {
override def put(kv: (K, Option[V])): Future[Unit] = {
Thread.sleep(10)
Collaborator: why not delayed here?

Collaborator: I would still do Future.Unit.delayed(10.milliseconds).flatMap { _ => self.put(kv) }

Contributor: Is the idea here to delay the execution of the future or to delay its completion, or does it not matter which one it is? Seems like get/put are doing the former while multiGet/multiPut do the latter?

(A sketch of the suggested non-blocking variant follows after this class.)

self.put(kv)
}

override def get(kv: K): Future[Option[V]] = {
Thread.sleep(10)
self.get(kv)
}

override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] =
self.multiGet(ks).map { case (k,v) =>
(k, v.delayed(10.milliseconds))
}

override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] =
self.multiPut(kvs).map { case (k,v) =>
(k, v.delayed(10.milliseconds))
}
}
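Following the review thread above, a hedged sketch of a non-blocking variant of DelayedStore; the name NonBlockingDelayedStore is made up for illustration. The delay is attached to the returned Future rather than blocking the calling thread with Thread.sleep, using the implicit Timer the class already requires and the com.twitter.conversions.time._ import at the top of this file.

class NonBlockingDelayedStore[K, V](val self: Store[K, V])(implicit timer: com.twitter.util.Timer)
  extends StoreProxy[K, V] {

  override def put(kv: (K, Option[V])): Future[Unit] =
    // Wait ~10ms without blocking the caller, then issue the real put.
    Future.Unit.delayed(10.milliseconds).flatMap { _ => self.put(kv) }

  override def get(k: K): Future[Option[V]] =
    Future.Unit.delayed(10.milliseconds).flatMap { _ => self.get(k) }
}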

class WriteThroughCacheBenchmark extends SimpleBenchmark {
implicit val custTimer = new com.twitter.util.ScheduledThreadPoolTimer(20)
import StoreAlgebra._

implicit val hllMonoid = new HyperLogLogMonoid(14)

@Param(Array("100", "1000", "10000"))
var numInputKeys: Int = 0

@Param(Array("10000", "100000"))
var numElements: Int = 0

var inputData: Seq[Map[Long, HLL]] = _

var store: MergeableStore[Long, HLL] = _

var noCacheStore: MergeableStore[Long, HLL] = _

override def setUp {
val rng = new scala.util.Random(3)
val byteEncoder = implicitly[Injection[Long, Array[Byte]]]
def setSize = rng.nextInt(10) + 1 // 1 -> 10
def hll(elements: Set[Long]): HLL = hllMonoid.batchCreate(elements)(byteEncoder)
val inputIntermediate = (0L until numElements).map {_ =>
val setElements = (0 until setSize).map{_ => rng.nextInt(1000).toLong}.toSet
(pow(numInputKeys, rng.nextFloat).toLong, hll(setElements))
}.grouped(20)

inputData = inputIntermediate.map(s => MapAlgebra.sumByKey(s)).toSeq

val delayedStore = new DelayedStore(new ConcurrentHashMapStore[Long, HLL])
store = (new WriteThroughStore(delayedStore, new HHFilteredStore(new ConcurrentHashMapStore[Long, HLL]), true)).toMergeable
noCacheStore = delayedStore.toMergeable
}

def timeDoUpdates(reps: Int): Int = {
Await.result(Future.collect((0 until reps).flatMap { indx =>
inputData.map(d => FutureOps.mapCollect(store.multiMerge(d)).unit)
}.toSeq))
12
}

def timeDoUpdatesWithoutCache(reps: Int): Int = {
Await.result(Future.collect((0 until reps).flatMap { indx =>
inputData.map(d => FutureOps.mapCollect(noCacheStore.multiMerge(d)).unit)
}.toSeq))
12
}
}
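For completeness, a hedged sketch of how this benchmark might be kicked off through the Runner added earlier in this PR. Passing the fully qualified benchmark class name is the usual caliper 0.5 convention, but this wrapper object and the exact flags are assumptions, not part of the diff.

object RunWriteThroughBenchmark {
  def main(args: Array[String]) {
    // Delegates to com.google.caliper.Runner via the wrapper defined in this PR.
    com.twitter.storehaus.caliper.Runner.main(
      Array("com.twitter.storehaus.caliper.WriteThroughCacheBenchmark"))
  }
}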