From 66c926aae504efe13d4f149fbc931ab71a634296 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 5 May 2014 14:57:15 -0700 Subject: [PATCH 01/12] Add initial write-through cache benchmark --- project/Build.scala | 13 ++- .../twitter/storehaus/caliper/Runner.scala | 15 ++++ .../caliper/WriteThroughCacheBenchmark.scala | 88 +++++++++++++++++++ 3 files changed, 115 insertions(+), 1 deletion(-) create mode 100644 storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/Runner.scala create mode 100644 storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/WriteThroughCacheBenchmark.scala diff --git a/project/Build.scala b/project/Build.scala index 9896ce42..cceaa4fb 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -116,7 +116,7 @@ object StorehausBuild extends Build { .filterNot(unreleasedModules.contains(_)) .map { s => "com.twitter" % ("storehaus-" + s + "_2.9.3") % "0.9.0" } - val algebirdVersion = "0.5.0" + val algebirdVersion = "0.6.0" val bijectionVersion = "0.6.2" val utilVersion = "6.11.0" val scaldingVersion = "0.9.0rc15" @@ -265,4 +265,15 @@ object StorehausBuild extends Build { withCross("com.twitter" %% "util-core" % utilVersion)) ) ) + + lazy val storehausCaliper = module("caliper").settings( + libraryDependencies ++= Seq("com.google.caliper" % "caliper" % "0.5-rc1", + "com.google.code.java-allocation-instrumenter" % "java-allocation-instrumenter" % "2.0", + "com.google.code.gson" % "gson" % "1.7.1", + "com.twitter" %% "bijection-core" % bijectionVersion, + "com.twitter" %% "algebird-core" % algebirdVersion), + javaOptions in run <++= (fullClasspath in Runtime) map { cp => Seq("-cp", sbt.Build.data(cp).mkString(":")) }, + fork in run := true + ).dependsOn(storehausCore, storehausAlgebra, storehausCache) + } diff --git a/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/Runner.scala b/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/Runner.scala new file mode 100644 index 00000000..3eac1e87 --- /dev/null +++ b/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/Runner.scala @@ -0,0 +1,15 @@ +package com.twiter.storehaus.caliper + +import com.google.caliper.{Runner => CaliperMain} +import com.google.common.collect.ObjectArrays.concat + +import java.io.PrintWriter + +object Runner { + + def main(args: Array[String]) { + CaliperMain.main(args) + } + +} + diff --git a/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/WriteThroughCacheBenchmark.scala b/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/WriteThroughCacheBenchmark.scala new file mode 100644 index 00000000..aff26efe --- /dev/null +++ b/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/WriteThroughCacheBenchmark.scala @@ -0,0 +1,88 @@ +package com.twitter.storehaus.caliper + +import com.google.caliper.{Param, SimpleBenchmark} +import com.twitter.algebird.HyperLogLogMonoid +import com.twitter.bijection._ +import com.twitter.storehaus._ +import com.twitter.conversions.time._ +import com.twitter.algebird._ +import java.nio.ByteBuffer +import scala.math.pow +import com.twitter.storehaus.algebra._ +import com.twitter.util.{Await, Future} + +class DelayedStore[K, V](val self: Store[K, V])(implicit timer: com.twitter.util.Timer) extends StoreProxy[K, V] { + override def put(kv: (K, Option[V])): Future[Unit] = { + Thread.sleep(5) + self.put(kv) + } + + override def get(kv: K): Future[Option[V]] = { + Thread.sleep(5) + self.get(kv) + } + + override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = + self.multiGet(ks).map { case (k,v) => + (k, v.delayed(10.milliseconds)) + } + + override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = + self.multiPut(kvs).map { case (k,v) => + (k, v.delayed(10.milliseconds)) + } +} + +class WriteThroughCacheBenchmark extends SimpleBenchmark { + implicit val custTimer = new com.twitter.util.ScheduledThreadPoolTimer(20) + import StoreAlgebra._ + + implicit val hllMonoid = new HyperLogLogMonoid(14) + + @Param(Array("100", "1000", "10000")) + val numInputKeys: Int = 0 + + @Param(Array("10000", "100000")) + val numElements: Int = 0 + + var inputData: Seq[Map[Long, HLL]] = _ + + var store: MergeableStore[Long, HLL] = _ + + var noCacheStore: MergeableStore[Long, HLL] = _ + + override def setUp { + val rng = new scala.util.Random(3) + val byteEncoder = implicitly[Injection[Long, Array[Byte]]] + def setSize = rng.nextInt(10) + 1 // 1 -> 10 + def hll(elements: Set[Long]): HLL = hllMonoid.batchCreate(elements)(byteEncoder) + val inputIntermediate = (0L until numElements).map {_ => + val setElements = (0 until setSize).map{_ => rng.nextInt(1000).toLong}.toSet + (pow(numInputKeys, rng.nextFloat).toLong, hll(setElements)) + }.grouped(500) + + inputData = inputIntermediate.map(s => MapAlgebra.sumByKey(s)).toSeq + + val delayedStore = new DelayedStore(new ConcurrentHashMapStore[Long, HLL]) + store = (new WriteThroughStore(delayedStore, new ConcurrentHashMapStore[Long, HLL], true)).toMergeable + noCacheStore = delayedStore.toMergeable + } + + def timeDoUpdates(reps: Int): Int = { + var dummy = 0 + while (dummy < reps) { + inputData.foreach(d => Await.result(FutureOps.mapCollect(store.multiMerge(d)))) + dummy += 1 + } + dummy + } + + def timeDoUpdatesWithoutCache(reps: Int): Int = { + var dummy = 0 + while (dummy < reps) { + inputData.foreach(d => Await.result(FutureOps.mapCollect(noCacheStore.multiMerge(d)))) + dummy += 1 + } + dummy + } +} From 3c29b9dedec6cf4c233245c01a6d4b0af443c2da Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 13 May 2014 13:45:21 -0700 Subject: [PATCH 02/12] WIP --- project/Build.scala | 10 +- project/plugins.sbt | 2 + .../storehaus/algebra/HHFilteredStore.scala | 173 ++++++++++++++++++ .../twitter/storehaus/caliper/Runner.scala | 16 +- .../caliper/WriteThroughCacheBenchmark.scala | 32 ++-- 5 files changed, 209 insertions(+), 24 deletions(-) create mode 100644 storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala diff --git a/project/Build.scala b/project/Build.scala index cceaa4fb..7369a925 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -21,6 +21,9 @@ import Keys._ import spray.boilerplate.BoilerplatePlugin.Boilerplate import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact +import sbtassembly.Plugin._ +import AssemblyKeys._ + object StorehausBuild extends Build { def withCross(dep: ModuleID) = @@ -35,7 +38,7 @@ object StorehausBuild extends Build { case version if version startsWith "2.10" => "org.specs2" %% "specs2" % "1.13" % "test" } val extraSettings = - Project.defaultSettings ++ Boilerplate.settings ++ mimaDefaultSettings + Project.defaultSettings ++ Boilerplate.settings ++ assemblySettings ++ mimaDefaultSettings def ciSettings: Seq[Project.Setting[_]] = if (sys.env.getOrElse("TRAVIS", "false").toBoolean) Seq( @@ -271,9 +274,8 @@ object StorehausBuild extends Build { "com.google.code.java-allocation-instrumenter" % "java-allocation-instrumenter" % "2.0", "com.google.code.gson" % "gson" % "1.7.1", "com.twitter" %% "bijection-core" % bijectionVersion, - "com.twitter" %% "algebird-core" % algebirdVersion), - javaOptions in run <++= (fullClasspath in Runtime) map { cp => Seq("-cp", sbt.Build.data(cp).mkString(":")) }, - fork in run := true + "com.twitter" %% "algebird-core" % algebirdVersion, + javaOptions in run <++= (fullClasspath in Runtime) map { cp => Seq("-cp", sbt.Build.data(cp).mkString(":")) } ).dependsOn(storehausCore, storehausAlgebra, storehausCache) } diff --git a/project/plugins.sbt b/project/plugins.sbt index 4885d18b..46d15ea6 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -8,3 +8,5 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.5.1") addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.6") addSbtPlugin("io.spray" % "sbt-boilerplate" % "0.5.1") + +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") \ No newline at end of file diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala new file mode 100644 index 00000000..acc42c59 --- /dev/null +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala @@ -0,0 +1,173 @@ +/* + * Copyright 2013 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.algebra + +import com.twitter.algebird.{ Semigroup, Monoid, Group, CMSHash } +import com.twitter.storehaus.{StoreProxy, Store} +import com.twitter.util.Future + +import scala.collection.mutable.ListBuffer + + + +// The update frequency is how often we should update the mutable CMS +// other steps will just query the pre-established HH's +// This will only kick in after the first 1000 tuples since a Roll Over +case class UpdateFrequency(toInt: Int) + +// This is after how many entries we will reset the CMS +// This is to account for temporal changes in the HH's +case class RollOverFrequency(toLong: Long) + +// The heavy hitters percent is used to control above what % of items we should send to the backing +// aggregator +case class HeavyHittersPercent(toFloat: Float) + + +class ApproxHHTracker[K](hhPct: HeavyHittersPercent, updateFreq: UpdateFrequency, roFreq: RollOverFrequency) { + private[this] final val WIDTH = 1000 + private[this] final val DEPTH = 4 + private[this] final val hh = new java.util.HashMap[K, Long]() + private[this] final var totalCount = 0L + private[this] final var hhMinReq = 0L + private[this] final val hhPercent = hhPct.toFloat + private[this] final val updateFrequency = updateFreq.toInt + private[this] final val rollOverFrequency = roFreq.toLong + private[this] final var countsTable = Array.fill(WIDTH * DEPTH)(0L) + + private[this] final val hashes: IndexedSeq[CMSHash] = { + val r = new scala.util.Random(5) + (0 until DEPTH).map { _ => CMSHash(r.nextInt, 0, WIDTH) } + }.toIndexedSeq + + @inline + private[this] final def frequencyEst(item : Long): Long = { + var min = Long.MaxValue + var indx = 0 + while (indx < DEPTH) { + val newVal = countsTable(indx*WIDTH + hashes(indx)(item)) + if(newVal < min) min = newVal + indx += 1 + } + min + } + + + // Update functions in the write path + // a synchronized guard should be used around these + // to ensure consistent updates to backing data structures + @inline + private[this] final def updateItem(item: K) { + val itemHashCode = item.hashCode + totalCount += 1L + hhMinReq = (hhPercent * totalCount).toLong + var indx = 0 + while (indx < DEPTH) { + val offset = indx*WIDTH + hashes(indx)(itemHashCode) + countsTable.update(offset, countsTable(offset) + 1L) + indx += 1 + } + + updateHH(item, itemHashCode) + } + + @inline + private[this] final def updateHH(item : K, itemHashCode: Int) { + @inline + def pruneHH { + val iter = hh.values.iterator + while(iter.hasNext) { + val n = iter.next + if(n < hhMinReq) { + iter.remove + } + } + } + + if(hh.containsKey(item)) { + val v = hh.get(item) + val newItemCount = v + 1L + if (newItemCount < hhMinReq) { + pruneHH + } else { + hh.put(item, newItemCount) + } + } else { + val newItemCount = frequencyEst(itemHashCode) + 1L + if (newItemCount >= hhMinReq) { + hh.put(item, totalCount) + pruneHH + } + } + } + + // We include the ability to reset the CMS so we can age our counters + // over time + private[this] def resetCMS { + hh.clear + totalCount = 0L + hhMinReq = 0L + countsTable = Array.fill(WIDTH * DEPTH)(0L) + } + // End of thread-unsafe update steps + + private[this] final val updateStep = new java.util.concurrent.atomic.AtomicLong(0L) + + final def hhFilter(t: K): Boolean = { + // This is the entry point from the iterator into our CMS implementation + // We only update on certain steps < a threshold and on every nTh step. + + // We only acquire locks/synchronize when hitting the update/write path. + // most passes into this function will just hit the final line(containsKey). + // which is our thread safe read path. + val newCounter = updateStep.incrementAndGet + if (newCounter > rollOverFrequency) { + hh.synchronized { + updateStep.set(1L) + resetCMS + } + } + if(newCounter < 1000L || newCounter % updateFrequency == 0L) { + hh.synchronized { + updateItem(t) + } + } + hh.containsKey(t) + } + + final def query(t: K): Boolean = hh.containsKey(t) + +} + + +class HHFilteredStore[K, V](val self: Store[K, V]) extends StoreProxy[K, V] { + private[this] val approxTracker = new ApproxHHTracker[K](HeavyHittersPercent(0.01f), UpdateFrequency(2), RollOverFrequency(10000000L)) + + override def put(kv: (K, Option[V])): Future[Unit] = if(approxTracker.hhFilter(kv._1) || !kv._2.isDefined) self.put(kv) else Future.Unit + + override def get(k: K): Future[Option[V]] = if(approxTracker.query(k)) self.get(k) else Future.None + + override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = { + val backed = self.multiGet(ks.filter(k => approxTracker.query(k))) + ks.map { k: K1 => (k, backed.getOrElse(k, Future.None)) }(collection.breakOut) + } + + override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = { + val backed = self.multiPut(kvs.filter(kv => (!kv._2.isDefined || approxTracker.hhFilter(kv._1)))) + kvs.map { kv => (kv._1, backed.getOrElse(kv._1, Future.Unit)) }(collection.breakOut) + } +} \ No newline at end of file diff --git a/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/Runner.scala b/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/Runner.scala index 3eac1e87..e2febfb5 100644 --- a/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/Runner.scala +++ b/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/Runner.scala @@ -1,4 +1,4 @@ -package com.twiter.storehaus.caliper +package com.twitter.storehaus.caliper import com.google.caliper.{Runner => CaliperMain} import com.google.common.collect.ObjectArrays.concat @@ -11,5 +11,17 @@ object Runner { CaliperMain.main(args) } -} + // def main(args: Array[String]) { + // val th = if(args.size > 0 && args(0) == "warmed") com.ichi.bench.Thyme.warmed(verbose = print) else new com.ichi.bench.Thyme + // val fn = new WriteThroughCacheBenchmark + // fn.numInputKeys = 500 + // fn.numElements = 100000 + // println("Starting set up") + // fn.setUp + // println("Starting benchmark") + // println(th.pbench(fn.timeDoUpdates(10))) + // println("Finished benchmark") + // System.exit(1) + // } +} diff --git a/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/WriteThroughCacheBenchmark.scala b/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/WriteThroughCacheBenchmark.scala index aff26efe..604d03a9 100644 --- a/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/WriteThroughCacheBenchmark.scala +++ b/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/WriteThroughCacheBenchmark.scala @@ -13,12 +13,12 @@ import com.twitter.util.{Await, Future} class DelayedStore[K, V](val self: Store[K, V])(implicit timer: com.twitter.util.Timer) extends StoreProxy[K, V] { override def put(kv: (K, Option[V])): Future[Unit] = { - Thread.sleep(5) + Thread.sleep(10) self.put(kv) } override def get(kv: K): Future[Option[V]] = { - Thread.sleep(5) + Thread.sleep(10) self.get(kv) } @@ -40,10 +40,10 @@ class WriteThroughCacheBenchmark extends SimpleBenchmark { implicit val hllMonoid = new HyperLogLogMonoid(14) @Param(Array("100", "1000", "10000")) - val numInputKeys: Int = 0 + var numInputKeys: Int = 0 @Param(Array("10000", "100000")) - val numElements: Int = 0 + var numElements: Int = 0 var inputData: Seq[Map[Long, HLL]] = _ @@ -59,30 +59,26 @@ class WriteThroughCacheBenchmark extends SimpleBenchmark { val inputIntermediate = (0L until numElements).map {_ => val setElements = (0 until setSize).map{_ => rng.nextInt(1000).toLong}.toSet (pow(numInputKeys, rng.nextFloat).toLong, hll(setElements)) - }.grouped(500) + }.grouped(20) inputData = inputIntermediate.map(s => MapAlgebra.sumByKey(s)).toSeq val delayedStore = new DelayedStore(new ConcurrentHashMapStore[Long, HLL]) - store = (new WriteThroughStore(delayedStore, new ConcurrentHashMapStore[Long, HLL], true)).toMergeable + store = (new WriteThroughStore(delayedStore, new HHFilteredStore(new ConcurrentHashMapStore[Long, HLL]), true)).toMergeable noCacheStore = delayedStore.toMergeable } def timeDoUpdates(reps: Int): Int = { - var dummy = 0 - while (dummy < reps) { - inputData.foreach(d => Await.result(FutureOps.mapCollect(store.multiMerge(d)))) - dummy += 1 - } - dummy + Await.result(Future.collect((0 until reps).flatMap { indx => + inputData.map(d => FutureOps.mapCollect(store.multiMerge(d)).unit) + }.toSeq)) + 12 } def timeDoUpdatesWithoutCache(reps: Int): Int = { - var dummy = 0 - while (dummy < reps) { - inputData.foreach(d => Await.result(FutureOps.mapCollect(noCacheStore.multiMerge(d)))) - dummy += 1 - } - dummy + Await.result(Future.collect((0 until reps).flatMap { indx => + inputData.map(d => FutureOps.mapCollect(noCacheStore.multiMerge(d)).unit) + }.toSeq)) + 12 } } From 1fbfac7a61ddd53320b7571774f89cee0c5252f8 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 13 May 2014 13:46:43 -0700 Subject: [PATCH 03/12] tmp --- project/Build.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Build.scala b/project/Build.scala index 7369a925..a1fa192b 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -274,7 +274,7 @@ object StorehausBuild extends Build { "com.google.code.java-allocation-instrumenter" % "java-allocation-instrumenter" % "2.0", "com.google.code.gson" % "gson" % "1.7.1", "com.twitter" %% "bijection-core" % bijectionVersion, - "com.twitter" %% "algebird-core" % algebirdVersion, + "com.twitter" %% "algebird-core" % algebirdVersion), javaOptions in run <++= (fullClasspath in Runtime) map { cp => Seq("-cp", sbt.Build.data(cp).mkString(":")) } ).dependsOn(storehausCore, storehausAlgebra, storehausCache) From 6d4c0a5f6d2f8fb19e2d8d038913ce3e1274c9f1 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 3 Jun 2014 11:54:49 -0700 Subject: [PATCH 04/12] Do not prune on each write operation of the mutable TTL cache --- .../twitter/storehaus/cache/MutableTTLCache.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableTTLCache.scala b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableTTLCache.scala index 9088f4a9..8f666572 100644 --- a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableTTLCache.scala +++ b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableTTLCache.scala @@ -31,10 +31,18 @@ object MutableTTLCache { } class MutableTTLCache[K, V](val ttl: Duration, protected val backingCache: MutableCache[K, (Long, V)])(val clock: () => Long) extends MutableCache[K, V] { - def get(k: K) = backingCache.get(k).filter(_._1 > clock()).map(_._2) + private[this] val putsSincePrune = new java.util.concurrent.atomic.AtomicInteger(1) + + def get(k: K) = { + val clockVal = clock() + backingCache.get(k).filter(_._1 > clockVal).map(_._2) + } def +=(kv: (K, V)): this.type = { - removeExpired + if(putsSincePrune.getAndIncrement % 1000 == 0) { + removeExpired + putsSincePrune.set(1) + } backingCache += (kv._1, (clock() + ttl.inMilliseconds, kv._2)) this } From 4a1b9af6b183fa86c35fb5c355f2a5776c385a4a Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 3 Jun 2014 13:26:26 -0700 Subject: [PATCH 05/12] Optimize + refactor from real usage --- .../storehaus/algebra/HHFilteredStore.scala | 76 +++++++++++-------- 1 file changed, 44 insertions(+), 32 deletions(-) diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala index acc42c59..95bb1b1f 100644 --- a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala @@ -20,34 +20,40 @@ import com.twitter.algebird.{ Semigroup, Monoid, Group, CMSHash } import com.twitter.storehaus.{StoreProxy, Store} import com.twitter.util.Future -import scala.collection.mutable.ListBuffer - - // The update frequency is how often we should update the mutable CMS -// other steps will just query the pre-established HH's -// This will only kick in after the first 1000 tuples since a Roll Over -case class UpdateFrequency(toInt: Int) +case class WriteOperationUpdateFrequency(toInt: Int) +object WriteOperationUpdateFrequency { + def default = WriteOperationUpdateFrequency(100) // update 1% of the time +} -// This is after how many entries we will reset the CMS -// This is to account for temporal changes in the HH's -case class RollOverFrequency(toLong: Long) +// This is how often in MS to roll over the CMS +case class RollOverFrequencyMS(toLong: Long) + +object RollOverFrequencyMS { + def default = RollOverFrequencyMS(3600 * 1000L) // 1 Hour +} // The heavy hitters percent is used to control above what % of items we should send to the backing // aggregator case class HeavyHittersPercent(toFloat: Float) +object HeavyHittersPercent { + def default = HeavyHittersPercent(0.001f) // 0.1% of the time +} -class ApproxHHTracker[K](hhPct: HeavyHittersPercent, updateFreq: UpdateFrequency, roFreq: RollOverFrequency) { +class ApproxHHTracker[K](hhPct: HeavyHittersPercent, updateFreq: WriteOperationUpdateFrequency, roFreq: RollOverFrequencyMS) { private[this] final val WIDTH = 1000 private[this] final val DEPTH = 4 private[this] final val hh = new java.util.HashMap[K, Long]() private[this] final var totalCount = 0L private[this] final var hhMinReq = 0L private[this] final val hhPercent = hhPct.toFloat - private[this] final val updateFrequency = updateFreq.toInt + private[this] final val updateOpsFrequency = updateFreq.toInt private[this] final val rollOverFrequency = roFreq.toLong private[this] final var countsTable = Array.fill(WIDTH * DEPTH)(0L) + private[this] var nextRollOver: Long = System.currentTimeMillis + roFreq.toLong + private[this] final val updateOps = new java.util.concurrent.atomic.AtomicInteger(0) private[this] final val hashes: IndexedSeq[CMSHash] = { val r = new scala.util.Random(5) @@ -110,7 +116,6 @@ class ApproxHHTracker[K](hhPct: HeavyHittersPercent, updateFreq: UpdateFrequency val newItemCount = frequencyEst(itemHashCode) + 1L if (newItemCount >= hhMinReq) { hh.put(item, totalCount) - pruneHH } } } @@ -122,52 +127,59 @@ class ApproxHHTracker[K](hhPct: HeavyHittersPercent, updateFreq: UpdateFrequency totalCount = 0L hhMinReq = 0L countsTable = Array.fill(WIDTH * DEPTH)(0L) + updateOps.set(1) + nextRollOver = System.currentTimeMillis + roFreq.toLong } // End of thread-unsafe update steps - private[this] final val updateStep = new java.util.concurrent.atomic.AtomicLong(0L) - final def hhFilter(t: K): Boolean = { - // This is the entry point from the iterator into our CMS implementation - // We only update on certain steps < a threshold and on every nTh step. + final def getFilterFunc: K => Boolean = { + val opsCntr = updateOps.incrementAndGet - // We only acquire locks/synchronize when hitting the update/write path. - // most passes into this function will just hit the final line(containsKey). - // which is our thread safe read path. - val newCounter = updateStep.incrementAndGet - if (newCounter > rollOverFrequency) { + if(opsCntr < 100 || opsCntr % updateOpsFrequency == 0) { hh.synchronized { - updateStep.set(1L) - resetCMS + if(System.currentTimeMillis > nextRollOver) { + resetCMS + } + {k: K => + updateItem(k) + hh.containsKey(k) + } } - } - if(newCounter < 1000L || newCounter % updateFrequency == 0L) { - hh.synchronized { - updateItem(t) + } else { + {k: K => + hh.containsKey(k) } } - hh.containsKey(t) } final def query(t: K): Boolean = hh.containsKey(t) - } -class HHFilteredStore[K, V](val self: Store[K, V]) extends StoreProxy[K, V] { - private[this] val approxTracker = new ApproxHHTracker[K](HeavyHittersPercent(0.01f), UpdateFrequency(2), RollOverFrequency(10000000L)) +class HHFilteredStore[K, V](val self: Store[K, V], + hhPercent: HeavyHittersPercent = HeavyHittersPercent.default, + writeUpdateFreq: WriteOperationUpdateFrequency = WriteOperationUpdateFrequency.default, + rolloverFreq: RollOverFrequencyMS = RollOverFrequencyMS.default) extends StoreProxy[K, V] { + private[this] val approxTracker = new ApproxHHTracker[K](hhPercent, writeUpdateFreq, rolloverFreq) override def put(kv: (K, Option[V])): Future[Unit] = if(approxTracker.hhFilter(kv._1) || !kv._2.isDefined) self.put(kv) else Future.Unit override def get(k: K): Future[Option[V]] = if(approxTracker.query(k)) self.get(k) else Future.None + /* + * In the multi get we purely do a lookup to see if its in the heavy hitters to see if we should query the backing cache + */ override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = { val backed = self.multiGet(ks.filter(k => approxTracker.query(k))) ks.map { k: K1 => (k, backed.getOrElse(k, Future.None)) }(collection.breakOut) } + /* + * In the Multi-put we test to see which keys we should store + */ override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = { - val backed = self.multiPut(kvs.filter(kv => (!kv._2.isDefined || approxTracker.hhFilter(kv._1)))) + val backed = self.multiPut(approxTracker.bulkFilter(kvs)) kvs.map { kv => (kv._1, backed.getOrElse(kv._1, Future.Unit)) }(collection.breakOut) } } \ No newline at end of file From c846ab5192f4c0a4bc03c2c6708cbba5bf0c46a5 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 3 Jun 2014 13:48:38 -0700 Subject: [PATCH 06/12] Fix some missed updates --- .../com/twitter/storehaus/algebra/HHFilteredStore.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala index 95bb1b1f..cb7f5cec 100644 --- a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala @@ -163,7 +163,7 @@ class HHFilteredStore[K, V](val self: Store[K, V], rolloverFreq: RollOverFrequencyMS = RollOverFrequencyMS.default) extends StoreProxy[K, V] { private[this] val approxTracker = new ApproxHHTracker[K](hhPercent, writeUpdateFreq, rolloverFreq) - override def put(kv: (K, Option[V])): Future[Unit] = if(approxTracker.hhFilter(kv._1) || !kv._2.isDefined) self.put(kv) else Future.Unit + override def put(kv: (K, Option[V])): Future[Unit] = if(approxTracker.getFilterFunc(kv._1) || !kv._2.isDefined) self.put(kv) else Future.Unit override def get(k: K): Future[Option[V]] = if(approxTracker.query(k)) self.get(k) else Future.None @@ -179,7 +179,8 @@ class HHFilteredStore[K, V](val self: Store[K, V], * In the Multi-put we test to see which keys we should store */ override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = { - val backed = self.multiPut(approxTracker.bulkFilter(kvs)) + val filterFunc = approxTracker.getFilterFunc + val backed = self.multiPut(kvs.filterKeys(t => filterFunc(t))) kvs.map { kv => (kv._1, backed.getOrElse(kv._1, Future.Unit)) }(collection.breakOut) } } \ No newline at end of file From a2cace7dd90b21ec477c2abd6af1b3374fbc5e9a Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Wed, 4 Jun 2014 13:40:25 -0700 Subject: [PATCH 07/12] Remove final, for a var And add a comment on the store --- .../com/twitter/storehaus/algebra/HHFilteredStore.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala index cb7f5cec..28dcdc37 100644 --- a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala @@ -51,7 +51,7 @@ class ApproxHHTracker[K](hhPct: HeavyHittersPercent, updateFreq: WriteOperationU private[this] final val hhPercent = hhPct.toFloat private[this] final val updateOpsFrequency = updateFreq.toInt private[this] final val rollOverFrequency = roFreq.toLong - private[this] final var countsTable = Array.fill(WIDTH * DEPTH)(0L) + private[this] var countsTable = Array.fill(WIDTH * DEPTH)(0L) private[this] var nextRollOver: Long = System.currentTimeMillis + roFreq.toLong private[this] final val updateOps = new java.util.concurrent.atomic.AtomicInteger(0) @@ -157,6 +157,9 @@ class ApproxHHTracker[K](hhPct: HeavyHittersPercent, updateFreq: WriteOperationU } +/* + This is a store for using the CMS code above to only store/read values which are heavy hitters in the CMS +*/ class HHFilteredStore[K, V](val self: Store[K, V], hhPercent: HeavyHittersPercent = HeavyHittersPercent.default, writeUpdateFreq: WriteOperationUpdateFrequency = WriteOperationUpdateFrequency.default, From 027b5272666306abb8b5bd12217cfa9c5e909e04 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Wed, 4 Jun 2014 13:40:59 -0700 Subject: [PATCH 08/12] Remove commented code --- .../com/twitter/storehaus/caliper/Runner.scala | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/Runner.scala b/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/Runner.scala index e2febfb5..ea25eab5 100644 --- a/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/Runner.scala +++ b/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/Runner.scala @@ -11,17 +11,4 @@ object Runner { CaliperMain.main(args) } - // def main(args: Array[String]) { - // val th = if(args.size > 0 && args(0) == "warmed") com.ichi.bench.Thyme.warmed(verbose = print) else new com.ichi.bench.Thyme - // val fn = new WriteThroughCacheBenchmark - // fn.numInputKeys = 500 - // fn.numElements = 100000 - // println("Starting set up") - // fn.setUp - // println("Starting benchmark") - // println(th.pbench(fn.timeDoUpdates(10))) - // println("Finished benchmark") - // System.exit(1) - // } - } From 77155fc2141319e5f007b44a969bdd9f49fdbe2d Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 16 Jun 2014 09:52:41 -0700 Subject: [PATCH 09/12] Update the pr. Make the CMS a proxy on a cache rather than a store. Add a eager write through store thats simple + fast and will update the cache before we have a confirmed store write. add a test for the Filtered Cache --- .../storehaus/algebra/HHFilteredStore.scala | 175 +-------------- .../twitter/storehaus/cache/CacheProxy.scala | 56 +++++ .../storehaus/cache/HHFilteredCache.scala | 207 ++++++++++++++++++ .../storehaus/cache/MutableCache.scala | 11 + .../storehaus/cache/MutableCacheProxy.scala | 60 +++++ .../storehaus/cache/HHFilteredCacheTest.scala | 59 +++++ .../EagerWriteThroughCacheStore.scala | 60 +++++ 7 files changed, 461 insertions(+), 167 deletions(-) create mode 100644 storehaus-cache/src/main/scala/com/twitter/storehaus/cache/CacheProxy.scala create mode 100644 storehaus-cache/src/main/scala/com/twitter/storehaus/cache/HHFilteredCache.scala create mode 100644 storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableCacheProxy.scala create mode 100644 storehaus-cache/src/test/scala/com/twitter/storehaus/cache/HHFilteredCacheTest.scala create mode 100644 storehaus-core/src/main/scala/com/twitter/storehaus/EagerWriteThroughCacheStore.scala diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala index 28dcdc37..0df42719 100644 --- a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala @@ -16,174 +16,15 @@ package com.twitter.storehaus.algebra -import com.twitter.algebird.{ Semigroup, Monoid, Group, CMSHash } -import com.twitter.storehaus.{StoreProxy, Store} +import com.twitter.storehaus.{Store, EagerWriteThroughCacheStore } +import com.twitter.storehaus.cache.MutableCache import com.twitter.util.Future +import com.twitter.storehaus.cache.{MutableCache, HeavyHittersPercent, WriteOperationUpdateFrequency, RollOverFrequencyMS, HHFilteredMutableCache} - -// The update frequency is how often we should update the mutable CMS -case class WriteOperationUpdateFrequency(toInt: Int) -object WriteOperationUpdateFrequency { - def default = WriteOperationUpdateFrequency(100) // update 1% of the time -} - -// This is how often in MS to roll over the CMS -case class RollOverFrequencyMS(toLong: Long) - -object RollOverFrequencyMS { - def default = RollOverFrequencyMS(3600 * 1000L) // 1 Hour -} - -// The heavy hitters percent is used to control above what % of items we should send to the backing -// aggregator -case class HeavyHittersPercent(toFloat: Float) -object HeavyHittersPercent { - def default = HeavyHittersPercent(0.001f) // 0.1% of the time -} - - -class ApproxHHTracker[K](hhPct: HeavyHittersPercent, updateFreq: WriteOperationUpdateFrequency, roFreq: RollOverFrequencyMS) { - private[this] final val WIDTH = 1000 - private[this] final val DEPTH = 4 - private[this] final val hh = new java.util.HashMap[K, Long]() - private[this] final var totalCount = 0L - private[this] final var hhMinReq = 0L - private[this] final val hhPercent = hhPct.toFloat - private[this] final val updateOpsFrequency = updateFreq.toInt - private[this] final val rollOverFrequency = roFreq.toLong - private[this] var countsTable = Array.fill(WIDTH * DEPTH)(0L) - private[this] var nextRollOver: Long = System.currentTimeMillis + roFreq.toLong - private[this] final val updateOps = new java.util.concurrent.atomic.AtomicInteger(0) - - private[this] final val hashes: IndexedSeq[CMSHash] = { - val r = new scala.util.Random(5) - (0 until DEPTH).map { _ => CMSHash(r.nextInt, 0, WIDTH) } - }.toIndexedSeq - - @inline - private[this] final def frequencyEst(item : Long): Long = { - var min = Long.MaxValue - var indx = 0 - while (indx < DEPTH) { - val newVal = countsTable(indx*WIDTH + hashes(indx)(item)) - if(newVal < min) min = newVal - indx += 1 - } - min - } - - - // Update functions in the write path - // a synchronized guard should be used around these - // to ensure consistent updates to backing data structures - @inline - private[this] final def updateItem(item: K) { - val itemHashCode = item.hashCode - totalCount += 1L - hhMinReq = (hhPercent * totalCount).toLong - var indx = 0 - while (indx < DEPTH) { - val offset = indx*WIDTH + hashes(indx)(itemHashCode) - countsTable.update(offset, countsTable(offset) + 1L) - indx += 1 - } - - updateHH(item, itemHashCode) - } - - @inline - private[this] final def updateHH(item : K, itemHashCode: Int) { - @inline - def pruneHH { - val iter = hh.values.iterator - while(iter.hasNext) { - val n = iter.next - if(n < hhMinReq) { - iter.remove - } - } - } - - if(hh.containsKey(item)) { - val v = hh.get(item) - val newItemCount = v + 1L - if (newItemCount < hhMinReq) { - pruneHH - } else { - hh.put(item, newItemCount) - } - } else { - val newItemCount = frequencyEst(itemHashCode) + 1L - if (newItemCount >= hhMinReq) { - hh.put(item, totalCount) - } - } - } - - // We include the ability to reset the CMS so we can age our counters - // over time - private[this] def resetCMS { - hh.clear - totalCount = 0L - hhMinReq = 0L - countsTable = Array.fill(WIDTH * DEPTH)(0L) - updateOps.set(1) - nextRollOver = System.currentTimeMillis + roFreq.toLong +object HHFilteredStore { + def buildStore[K, V](store: Store[K, V], cache: MutableCache[K, Future[Option[V]]], hhPct: HeavyHittersPercent, + writeUpdateFreq: WriteOperationUpdateFrequency, rolloverFreq: RollOverFrequencyMS): Store[K, V] = { + val filteredCacheStore = new HHFilteredMutableCache(cache, hhPct, writeUpdateFreq, rolloverFreq) + new EagerWriteThroughCacheStore[K, V](store, filteredCacheStore) } - // End of thread-unsafe update steps - - - final def getFilterFunc: K => Boolean = { - val opsCntr = updateOps.incrementAndGet - - if(opsCntr < 100 || opsCntr % updateOpsFrequency == 0) { - hh.synchronized { - if(System.currentTimeMillis > nextRollOver) { - resetCMS - } - {k: K => - updateItem(k) - hh.containsKey(k) - } - } - } else { - {k: K => - hh.containsKey(k) - } - } - } - - final def query(t: K): Boolean = hh.containsKey(t) } - - -/* - This is a store for using the CMS code above to only store/read values which are heavy hitters in the CMS -*/ -class HHFilteredStore[K, V](val self: Store[K, V], - hhPercent: HeavyHittersPercent = HeavyHittersPercent.default, - writeUpdateFreq: WriteOperationUpdateFrequency = WriteOperationUpdateFrequency.default, - rolloverFreq: RollOverFrequencyMS = RollOverFrequencyMS.default) extends StoreProxy[K, V] { - private[this] val approxTracker = new ApproxHHTracker[K](hhPercent, writeUpdateFreq, rolloverFreq) - - override def put(kv: (K, Option[V])): Future[Unit] = if(approxTracker.getFilterFunc(kv._1) || !kv._2.isDefined) self.put(kv) else Future.Unit - - override def get(k: K): Future[Option[V]] = if(approxTracker.query(k)) self.get(k) else Future.None - - /* - * In the multi get we purely do a lookup to see if its in the heavy hitters to see if we should query the backing cache - */ - override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = { - val backed = self.multiGet(ks.filter(k => approxTracker.query(k))) - ks.map { k: K1 => (k, backed.getOrElse(k, Future.None)) }(collection.breakOut) - } - - /* - * In the Multi-put we test to see which keys we should store - */ - override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = { - val filterFunc = approxTracker.getFilterFunc - val backed = self.multiPut(kvs.filterKeys(t => filterFunc(t))) - kvs.map { kv => (kv._1, backed.getOrElse(kv._1, Future.Unit)) }(collection.breakOut) - } -} \ No newline at end of file diff --git a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/CacheProxy.scala b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/CacheProxy.scala new file mode 100644 index 00000000..d083047b --- /dev/null +++ b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/CacheProxy.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2013 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.storehaus.cache + +/** Defines the base of a proxy for a given type. + * A instance of Proxied for type T is intended to use the `self` + * member to forward all methods of an new instance of type T to. + * This allows for extensions of type T which can inherit a proxied + * instance's behavior without needing to override every method of type T. + * + * {{{ + * + * class Dazzle { + * def a: String = "default a" + * def b: String = "default b" + * // ... + * } + * + * // define a reusable concrete proxy statisfying Dazzle forwarding + * // all calls to Proxied method self + * class DazzleProxy(val self: Dazzle) extends Dazzle with Proxied[Dazzle] { + * def a: String = self.a + * def b: String = self.b + * } + * + * val bedazzlable = new Dazzle { + * // return a new Dazzle with some sizzle + * def be(sizzle: String): Dazzle = new DazzleProxy(this) { + * override def b = "%s %s!!!" format(self.b, sizzle) + * } + * } + * + * val dazzled = bedazzlable.be("dazzled") + * dazzled.b // default b dazzled!!! + * dazzled.a // default a + * + * }}} + * + * @author Doug Tangren + */ +trait CacheProxied[T] { + protected def self: T +} \ No newline at end of file diff --git a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/HHFilteredCache.scala b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/HHFilteredCache.scala new file mode 100644 index 00000000..00101fc7 --- /dev/null +++ b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/HHFilteredCache.scala @@ -0,0 +1,207 @@ +/* + * Copyright 2013 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.cache + +import com.twitter.algebird.{ Semigroup, Monoid, Group, CMSHash } +import com.twitter.util.Future + + +// The update frequency is how often we should update the mutable CMS +case class WriteOperationUpdateFrequency(toInt: Int) +object WriteOperationUpdateFrequency { + def default = WriteOperationUpdateFrequency(100) // update 1% of the time +} + +// This is how often in MS to roll over the CMS +case class RollOverFrequencyMS(toLong: Long) + +object RollOverFrequencyMS { + def default = RollOverFrequencyMS(3600 * 1000L) // 1 Hour +} + +// The heavy hitters percent is used to control above what % of items we should send to the backing +// aggregator +case class HeavyHittersPercent(toFloat: Float) +object HeavyHittersPercent { + def default = HeavyHittersPercent(0.001f) // 0.1% of the time +} + +sealed class ApproxHHTracker[K](hhPct: HeavyHittersPercent, updateFreq: WriteOperationUpdateFrequency, roFreq: RollOverFrequencyMS) { + private[this] final val WIDTH = 1000 + private[this] final val DEPTH = 4 + private[this] final val hh = new java.util.HashMap[K, Long]() + private[this] final var totalCount = 0L + private[this] final var hhMinReq = 0L + private[this] final val hhPercent = hhPct.toFloat + private[this] final val updateOpsFrequency = updateFreq.toInt + private[this] final val rollOverFrequency = roFreq.toLong + private[this] var countsTable = Array.fill(WIDTH * DEPTH)(0L) + private[this] var nextRollOver: Long = System.currentTimeMillis + roFreq.toLong + private[this] final val updateOps = new java.util.concurrent.atomic.AtomicInteger(0) + + private[this] final val hashes: IndexedSeq[CMSHash] = { + val r = new scala.util.Random(5) + (0 until DEPTH).map { _ => CMSHash(r.nextInt, 0, WIDTH) } + }.toIndexedSeq + + @inline + private[this] final def frequencyEst(item : Long): Long = { + var min = Long.MaxValue + var indx = 0 + while (indx < DEPTH) { + val tableIdx = indx*WIDTH + hashes(indx)(item) + val newVal = countsTable(tableIdx) + if(newVal < min) min = newVal + indx += 1 + } + min + } + + + // Update functions in the write path + // a synchronized guard should be used around these + // to ensure consistent updates to backing data structures + @inline + private[this] final def updateItem(item: K) { + val itemHashCode = item.hashCode + totalCount += 1L + hhMinReq = (hhPercent * totalCount).toLong + var indx = 0 + while (indx < DEPTH) { + val offset = indx*WIDTH + hashes(indx)(itemHashCode) + countsTable.update(offset, countsTable(offset) + 1L) + indx += 1 + } + + updateHH(item, itemHashCode) + } + + @inline + private[this] final def updateHH(item : K, itemHashCode: Int) { + @inline + def pruneHH { + val iter = hh.values.iterator + while(iter.hasNext) { + val n = iter.next + if(n < hhMinReq) { + iter.remove + } + } + } + + if(hh.containsKey(item)) { + val v = hh.get(item) + val newItemCount = v + 1L + if (newItemCount < hhMinReq) { + pruneHH + } else { + hh.put(item, newItemCount) + } + } else { + val newItemCount = frequencyEst(itemHashCode) // Do not + 1 since we have done that before. + if (newItemCount >= hhMinReq) { + hh.put(item, newItemCount) + } + } + } + + // We include the ability to reset the CMS so we can age our counters + // over time + private[this] def resetCMS { + hh.clear + totalCount = 0L + hhMinReq = 0L + countsTable = Array.fill(WIDTH * DEPTH)(0L) + updateOps.set(1) + nextRollOver = System.currentTimeMillis + roFreq.toLong + } + // End of thread-unsafe update steps + + + final def getFilterFunc: K => Boolean = { + val opsCntr = updateOps.incrementAndGet + + if(opsCntr < 100 || opsCntr % updateOpsFrequency == 0) { + hh.synchronized { + if(System.currentTimeMillis > nextRollOver) { + resetCMS + } + {k: K => + updateItem(k) + hh.containsKey(k) + } + } + } else { + {k: K => + hh.containsKey(k) + } + } + } + + final def clear { + hh.synchronized { + resetCMS + } + } + + final def query(t: K): Boolean = hh.containsKey(t) +} + +/* + This is a store for using the CMS code above to only store/read values which are heavy hitters in the CMS +*/ +class HHFilteredCache[K, V](val self: MutableCache[K, V], + hhPercent: HeavyHittersPercent = HeavyHittersPercent.default, + writeUpdateFreq: WriteOperationUpdateFrequency = WriteOperationUpdateFrequency.default, + rolloverFreq: RollOverFrequencyMS = RollOverFrequencyMS.default) extends MutableCacheProxy[K, V] { + private[this] val approxTracker = new ApproxHHTracker[K](hhPercent, writeUpdateFreq, rolloverFreq) + + override def +=(kv: (K, V)): this.type = + if(approxTracker.getFilterFunc(kv._1)) { + self += kv + this + } else { + this + } + + override def multiInsert(kvs: Map[K, V]): this.type = { + val filterFunc = approxTracker.getFilterFunc + val backed = self.multiInsert(kvs.filterKeys(t => filterFunc(t))) + this + } + + override def hit(k: K): Option[V] = + if(approxTracker.getFilterFunc(k)) { + self.hit(k) + } else { + None + } + + override def clear: this.type = { + self.clear + approxTracker.clear + this + } + + override def iterator: Iterator[(K, V)] = { + self.iterator.filter{kv => approxTracker.query(kv._1)} + } + + override def contains(k: K): Boolean = if(approxTracker.query(k)) self.contains(k) else false + override def get(k: K): Option[V] = if(approxTracker.query(k)) self.get(k) else None + +} \ No newline at end of file diff --git a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableCache.scala b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableCache.scala index 357e6f02..dc9f3acf 100644 --- a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableCache.scala +++ b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableCache.scala @@ -40,6 +40,12 @@ trait MutableCache[K, V] { */ def get(k: K): Option[V] def +=(kv: (K, V)): this.type + + def multiInsert(kvs: Map[K, V]): this.type = { + kvs.foreach { kv => this.+=(kv) } + this + } + def hit(k: K): Option[V] /* Returns an option of the (potentially) evicted value. */ @@ -65,6 +71,11 @@ trait MutableCache[K, V] { /* Returns the cache with the supplied key evicted. */ def -=(k: K): this.type = { evict(k); this } + def multiRemove(ks: Set[K]): this.type = { + ks.foreach { k => this.-=(k) } + this + } + def getOrElseUpdate(k: K, v: => V): V = hit(k).getOrElse { val realizedV = v diff --git a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableCacheProxy.scala b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableCacheProxy.scala new file mode 100644 index 00000000..07a22f70 --- /dev/null +++ b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableCacheProxy.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2013 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.cache + +import com.twitter.util.{ Future, Time } + +/** Proxy for Mergeables. Methods not overrided in extensions will be forwared to Proxied + * self member */ +trait MutableCacheProxy[K, V] extends MutableCache[K, V] with CacheProxied[MutableCache[K,V]] { + override def get(k: K): Option[V] = self.get(k) + + override def +=(kv: (K, V)): this.type = { + self.+=(kv) + this + } + + override def multiInsert(kvs: Map[K, V]): this.type = { + self.multiInsert(kvs) + this + } + + override def hit(k: K): Option[V] = self.hit(k) + override def evict(k: K): Option[V] = self.evict(k) + override def iterator: Iterator[(K, V)] = self.iterator + override def empty: MutableCache[K, V] = self.empty + + override def clear: this.type = { + self.clear + this + } + + override def contains(k: K): Boolean = self.contains(k) + + override def -=(k: K): this.type = { + self.-=(k) + this + } + + override def multiRemove(ks: Set[K]): this.type = { + self.multiRemove(ks) + this + } + + override def getOrElseUpdate(k: K, v: => V): V = self.getOrElseUpdate(k , v) + override def filter(pred: ((K, V)) => Boolean): MutableCache[K, V] = self.filter(pred) +} \ No newline at end of file diff --git a/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/HHFilteredCacheTest.scala b/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/HHFilteredCacheTest.scala new file mode 100644 index 00000000..08b6c3e4 --- /dev/null +++ b/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/HHFilteredCacheTest.scala @@ -0,0 +1,59 @@ +/* + * Copyright 2013 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.cache + +import org.specs2.mutable._ + +class HHFilteredCacheTest extends Specification { + def checkCache[K, V](pairs: Seq[(K, V)], m: Map[K, V])(implicit cache: MutableCache[K, V]) = { + pairs.foldLeft(cache)(_ += _) + val res = cache.iterator.toMap + cache.clear + res must be_==(m) + } + + "HHFilteredCache works properly" in { + val backingCache = MutableCache.fromJMap[String, Int](new java.util.LinkedHashMap[String, Int]) + + implicit val cache = new HHFilteredCache[String, Int](backingCache, HeavyHittersPercent(0.5f), WriteOperationUpdateFrequency(1), RollOverFrequencyMS(10000000L)) + + checkCache( + Seq("a" -> 1, "b" -> 2), + Map("a" -> 1, "b" -> 2) + ) + + // Ensures the previous clear operations are running + // The output == input + checkCache( + Seq("c" -> 1, "d" -> 2), + Map("c" -> 1, "d" -> 2) + ) + + // Nothing above the 0.5 HH theshold + checkCache( + Seq("a" -> 1, "b" -> 2, "c" -> 3, "d" -> 3, "e" -> 3, "a" -> 1), + Map() + ) + + // Only b should be above the HH threshold + checkCache( + Seq("a" -> 1, "b" -> 2, "b" -> 3, "c" -> 1, "b" -> 3, "a" -> 1), + Map("b" -> 3) + ) + + } +} diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/EagerWriteThroughCacheStore.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/EagerWriteThroughCacheStore.scala new file mode 100644 index 00000000..dc84d0eb --- /dev/null +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/EagerWriteThroughCacheStore.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2013 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus + +import com.twitter.storehaus.cache.MutableCache +import com.twitter.util.{ Future, Return } +import scala.collection.breakOut + +/* + This will eagerly read/write to the backing gage when doing operations against its underlying store. + It will not wait for the return of the network call to update the cache entry. + The cache itself is assumed to be backed by a thread safe implementation. +*/ +class EagerWriteThroughCacheStore[K, V](store: Store[K, V], threadSafeCache: MutableCache[K, Future[Option[V]]]) extends Store[K, V] { + + override def put(kv: (K, Option[V])): Future[Unit] = { + threadSafeCache += (kv._1, Future.value(kv._2)) + store.put(kv) + } + + override def get(k: K): Future[Option[V]] = { + threadSafeCache.get(k).getOrElse { + store.get(k) + } + } + + override def multiGet[K1 <: K](keys: Set[K1]): Map[K1, Future[Option[V]]] = { + val present: Map[K1, Future[Option[V]]] = + keys.map { k => k -> threadSafeCache.get(k) }.collect { case (k, Some(v)) => k -> v }(breakOut) + + val replaced = store.multiGet(keys -- present.keySet) + + replaced.foreach {kv => + threadSafeCache += ((kv._1, kv._2)) + } + + present ++ replaced + } + + override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = { + kvs.foreach { case (k, optiV) => + threadSafeCache += ((k, Future.value(optiV))) + } + store.multiPut(kvs) + } +} \ No newline at end of file From 4ffaf909f554a81412f3e0f795a05b3a77b75e04 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 16 Jun 2014 09:55:53 -0700 Subject: [PATCH 10/12] Converge the get/put timers in the benchmark --- .../storehaus/caliper/WriteThroughCacheBenchmark.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/WriteThroughCacheBenchmark.scala b/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/WriteThroughCacheBenchmark.scala index 604d03a9..8e26af70 100644 --- a/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/WriteThroughCacheBenchmark.scala +++ b/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/WriteThroughCacheBenchmark.scala @@ -13,13 +13,11 @@ import com.twitter.util.{Await, Future} class DelayedStore[K, V](val self: Store[K, V])(implicit timer: com.twitter.util.Timer) extends StoreProxy[K, V] { override def put(kv: (K, Option[V])): Future[Unit] = { - Thread.sleep(10) - self.put(kv) + self.put(kv).delayed(10.milliseconds) } override def get(kv: K): Future[Option[V]] = { - Thread.sleep(10) - self.get(kv) + self.get(kv).delayed(10.milliseconds) } override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = From 8bdb9413a3b24d4794aa9686af8c57a09704485f Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 16 Jun 2014 10:11:20 -0700 Subject: [PATCH 11/12] Fixup not-renamed --- .../scala/com/twitter/storehaus/algebra/HHFilteredStore.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala index 0df42719..b3bd18c7 100644 --- a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala @@ -19,12 +19,12 @@ package com.twitter.storehaus.algebra import com.twitter.storehaus.{Store, EagerWriteThroughCacheStore } import com.twitter.storehaus.cache.MutableCache import com.twitter.util.Future -import com.twitter.storehaus.cache.{MutableCache, HeavyHittersPercent, WriteOperationUpdateFrequency, RollOverFrequencyMS, HHFilteredMutableCache} +import com.twitter.storehaus.cache.{MutableCache, HeavyHittersPercent, WriteOperationUpdateFrequency, RollOverFrequencyMS, HHFilteredCache} object HHFilteredStore { def buildStore[K, V](store: Store[K, V], cache: MutableCache[K, Future[Option[V]]], hhPct: HeavyHittersPercent, writeUpdateFreq: WriteOperationUpdateFrequency, rolloverFreq: RollOverFrequencyMS): Store[K, V] = { - val filteredCacheStore = new HHFilteredMutableCache(cache, hhPct, writeUpdateFreq, rolloverFreq) + val filteredCacheStore = new HHFilteredCache(cache, hhPct, writeUpdateFreq, rolloverFreq) new EagerWriteThroughCacheStore[K, V](store, filteredCacheStore) } } From e343b7a22e0d9ef401d90ae765c2ee811c233a8c Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 17 Jun 2014 12:48:03 -0700 Subject: [PATCH 12/12] Review comments. Do not eagerly evaluate clock(), don't need to reset atomic int --- .../com/twitter/storehaus/cache/MutableTTLCache.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableTTLCache.scala b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableTTLCache.scala index 8f666572..44a4aa41 100644 --- a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableTTLCache.scala +++ b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableTTLCache.scala @@ -34,14 +34,17 @@ class MutableTTLCache[K, V](val ttl: Duration, protected val backingCache: Mutab private[this] val putsSincePrune = new java.util.concurrent.atomic.AtomicInteger(1) def get(k: K) = { - val clockVal = clock() - backingCache.get(k).filter(_._1 > clockVal).map(_._2) + // Only query the clock if the backing cache returns + // something + backingCache.get(k).filter {kv => + val clockVal = clock() + kv._1 > clockVal + }.map(_._2) } def +=(kv: (K, V)): this.type = { if(putsSincePrune.getAndIncrement % 1000 == 0) { removeExpired - putsSincePrune.set(1) } backingCache += (kv._1, (clock() + ttl.inMilliseconds, kv._2)) this