Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/write through cache perf #234

Merged
merged 12 commits into from
Jun 17, 2014
17 changes: 15 additions & 2 deletions project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import Keys._
import spray.boilerplate.BoilerplatePlugin.Boilerplate
import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings
import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact
import sbtassembly.Plugin._
import AssemblyKeys._


object StorehausBuild extends Build {
def withCross(dep: ModuleID) =
Expand All @@ -35,7 +38,7 @@ object StorehausBuild extends Build {
case version if version startsWith "2.10" => "org.specs2" %% "specs2" % "1.13" % "test"
}
val extraSettings =
Project.defaultSettings ++ Boilerplate.settings ++ mimaDefaultSettings
Project.defaultSettings ++ Boilerplate.settings ++ assemblySettings ++ mimaDefaultSettings

def ciSettings: Seq[Project.Setting[_]] =
if (sys.env.getOrElse("TRAVIS", "false").toBoolean) Seq(
Expand Down Expand Up @@ -116,7 +119,7 @@ object StorehausBuild extends Build {
.filterNot(unreleasedModules.contains(_))
.map { s => "com.twitter" % ("storehaus-" + s + "_2.9.3") % "0.9.0" }

val algebirdVersion = "0.5.0"
val algebirdVersion = "0.6.0"
val bijectionVersion = "0.6.2"
val utilVersion = "6.11.0"
val scaldingVersion = "0.9.0rc15"
Expand Down Expand Up @@ -265,4 +268,14 @@ object StorehausBuild extends Build {
withCross("com.twitter" %% "util-core" % utilVersion))
)
)

// Caliper micro-benchmark module. It depends on the core, algebra and cache
// modules and passes the full runtime classpath to `run` as a `-cp` JVM option.
lazy val storehausCaliper = module("caliper").settings(
  libraryDependencies ++= Seq(
    "com.google.caliper" % "caliper" % "0.5-rc1",
    "com.google.code.java-allocation-instrumenter" % "java-allocation-instrumenter" % "2.0",
    "com.google.code.gson" % "gson" % "1.7.1",
    "com.twitter" %% "bijection-core" % bijectionVersion,
    "com.twitter" %% "algebird-core" % algebirdVersion),
  // Join classpath entries with the platform separator (":" on Unix, ";" on
  // Windows) rather than a hard-coded ":" so the option is valid everywhere.
  javaOptions in run <++= (fullClasspath in Runtime) map { cp =>
    Seq("-cp", sbt.Build.data(cp).mkString(java.io.File.pathSeparator))
  }
).dependsOn(storehausCore, storehausAlgebra, storehausCache)

}
2 changes: 2 additions & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.5.1")
addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.6")

addSbtPlugin("io.spray" % "sbt-boilerplate" % "0.5.1")

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2013 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus.algebra

import com.twitter.storehaus.{Store, EagerWriteThroughCacheStore }
import com.twitter.storehaus.cache.MutableCache
import com.twitter.util.Future
import com.twitter.storehaus.cache.{MutableCache, HeavyHittersPercent, WriteOperationUpdateFrequency, RollOverFrequencyMS, HHFilteredCache}

/** Factory for stores whose cache only admits heavy-hitter keys. */
object HHFilteredStore {
  /** Wraps `cache` in an [[HHFilteredCache]] configured with the given
    * heavy-hitters settings, then combines `store` and that filtered cache
    * into an `EagerWriteThroughCacheStore`.
    *
    * @param store           the backing store
    * @param cache           the cache to be guarded by the heavy-hitters filter
    * @param hhPct           heavy-hitters threshold passed to the filter
    * @param writeUpdateFreq update frequency passed to the filter
    * @param rolloverFreq    roll-over frequency passed to the filter
    * @return the resulting write-through store
    */
  def buildStore[K, V](
      store: Store[K, V],
      cache: MutableCache[K, Future[Option[V]]],
      hhPct: HeavyHittersPercent,
      writeUpdateFreq: WriteOperationUpdateFrequency,
      rolloverFreq: RollOverFrequencyMS): Store[K, V] = {
    val heavyHittersOnly = new HHFilteredCache(cache, hhPct, writeUpdateFreq, rolloverFreq)
    new EagerWriteThroughCacheStore[K, V](store, heavyHittersOnly)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2013 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.storehaus.cache

/** Defines the base of a proxy for a given type.
* An instance of Proxied for type T is intended to use the `self`
* member as the target to which all methods of a new instance of type T are forwarded.
* This allows for extensions of type T which can inherit a proxied
* instance's behavior without needing to override every method of type T.
*
* {{{
*
* class Dazzle {
* def a: String = "default a"
* def b: String = "default b"
* // ...
* }
*
* // define a reusable concrete proxy satisfying Dazzle forwarding
* // all calls to Proxied method self
* class DazzleProxy(val self: Dazzle) extends Dazzle with Proxied[Dazzle] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to be copied over from Proxied in storehaus-core. Do we need this scaladoc here as well?

* def a: String = self.a
* def b: String = self.b
* }
*
* val bedazzlable = new Dazzle {
* // return a new Dazzle with some sizzle
* def be(sizzle: String): Dazzle = new DazzleProxy(this) {
* override def b = "%s %s!!!" format(self.b, sizzle)
* }
* }
*
* val dazzled = bedazzlable.be("dazzled")
* dazzled.b // default b dazzled!!!
* dazzled.a // default a
*
* }}}
*
* @author Doug Tangren
*/
trait CacheProxied[T] {
// The proxied instance of T that implementations forward calls to.
protected def self: T
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
* Copyright 2013 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus.cache

import com.twitter.algebird.{ Semigroup, Monoid, Group, CMSHash }
import com.twitter.util.Future


/** How often the mutable CMS should be updated: once every `toInt` operations. */
case class WriteOperationUpdateFrequency(toInt: Int)
object WriteOperationUpdateFrequency {
  /** Default frequency: every 100th operation, i.e. 1% of the time. */
  def default: WriteOperationUpdateFrequency = WriteOperationUpdateFrequency(100)
}

/** How often, in milliseconds, the CMS is rolled over. */
case class RollOverFrequencyMS(toLong: Long)

object RollOverFrequencyMS {
  /** Default roll-over interval: one hour. */
  def default: RollOverFrequencyMS = RollOverFrequencyMS(60L * 60L * 1000L)
}

/** The heavy hitters percent controls above what share of items a key is
  * sent to the backing aggregator. It is expressed as a fraction
  * (0.001f means 0.1%).
  */
case class HeavyHittersPercent(toFloat: Float)
object HeavyHittersPercent {
  /** Default threshold: 0.1%. */
  def default: HeavyHittersPercent = HeavyHittersPercent(0.001f)
}

/** Approximate heavy-hitters tracker for keys of type K.
  *
  * Keeps a fixed-size table of counters indexed by DEPTH hash functions
  * (a count-min sketch) plus a map of the keys currently considered heavy
  * hitters. State is reset once the roll-over interval has elapsed.
  *
  * @param hhPct      fraction of the total count a key must reach to be tracked
  * @param updateFreq how often (in operations) the counters are updated
  * @param roFreq     roll-over interval in milliseconds
  */
sealed class ApproxHHTracker[K](hhPct: HeavyHittersPercent, updateFreq: WriteOperationUpdateFrequency, roFreq: RollOverFrequencyMS) {
// Number of counters in each hash row of the counts table.
private[this] final val WIDTH = 1000
// Number of hash rows (one hash function per row).
private[this] final val DEPTH = 4
// Keys currently treated as heavy hitters, mapped to their (estimated) counts.
// Also used as the monitor object for synchronized sections.
private[this] final val hh = new java.util.HashMap[K, Long]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was wondering if this can be made a ConcurrentHashMap. But looks like it can't because an update here optionally performs pruning which needs a global view of the hash map?

// Number of items recorded by updateItem since the last reset.
private[this] final var totalCount = 0L
// Minimum count a key needs to be kept as a heavy hitter; recomputed as
// hhPercent * totalCount on every update.
private[this] final var hhMinReq = 0L
private[this] final val hhPercent = hhPct.toFloat
// getFilterFunc returns an updating filter on every updateOpsFrequency-th call.
private[this] final val updateOpsFrequency = updateFreq.toInt
private[this] final val rollOverFrequency = roFreq.toLong
// Flat DEPTH x WIDTH table of counters; row r occupies [r * WIDTH, (r + 1) * WIDTH).
private[this] var countsTable = Array.fill(WIDTH * DEPTH)(0L)
// Wall-clock time (ms) after which the next updating getFilterFunc call resets all state.
private[this] var nextRollOver: Long = System.currentTimeMillis + roFreq.toLong
// Counts getFilterFunc invocations since the last reset.
private[this] final val updateOps = new java.util.concurrent.atomic.AtomicInteger(0)

// One hash function per row. The Random is seeded with a constant, so the
// same sequence of values is passed to CMSHash for every tracker instance.
private[this] final val hashes: IndexedSeq[CMSHash] = {
val r = new scala.util.Random(5)
(0 until DEPTH).map { _ => CMSHash(r.nextInt, 0, WIDTH) }
}.toIndexedSeq

// Estimated count for `item`: the smallest counter found across all DEPTH
// rows of the counts table at the slots `item` hashes to.
@inline
private[this] final def frequencyEst(item: Long): Long = {
  var estimate = Long.MaxValue
  var row = 0
  while (row < DEPTH) {
    val cell = countsTable(row * WIDTH + hashes(row)(item))
    estimate = math.min(estimate, cell)
    row += 1
  }
  estimate
}


// Write-path update functions start here.
// They mutate the backing data structures, so callers should hold a
// synchronized guard around them to keep those structures consistent.

// Records one occurrence of `item`: bumps the total count, recomputes the
// heavy-hitter threshold, increments the item's counter in every row and
// finally refreshes the heavy-hitters map.
@inline
private[this] final def updateItem(item: K): Unit = {
  val itemHashCode = item.hashCode
  totalCount += 1L
  hhMinReq = (hhPercent * totalCount).toLong

  var row = 0
  while (row < DEPTH) {
    val slot = row * WIDTH + hashes(row)(itemHashCode)
    countsTable(slot) += 1L
    row += 1
  }

  updateHH(item, itemHashCode)
}

// Refreshes the heavy-hitters map after `item` has been counted.
// A key that is already tracked gets its stored count incremented, or triggers
// a prune of all below-threshold entries if it no longer qualifies. An
// untracked key is added when its estimated count reaches the threshold.
@inline
private[this] final def updateHH(item: K, itemHashCode: Int): Unit = {
  // Removes every tracked entry whose count is below the current threshold.
  @inline
  def pruneHH(): Unit = {
    val counts = hh.values.iterator
    while (counts.hasNext) {
      if (counts.next() < hhMinReq) {
        counts.remove()
      }
    }
  }

  if (!hh.containsKey(item)) {
    // No + 1 here: updateItem has already counted this occurrence in the table.
    val estimated = frequencyEst(itemHashCode)
    if (estimated >= hhMinReq) {
      hh.put(item, estimated)
    }
  } else {
    val bumped = hh.get(item) + 1L
    if (bumped >= hhMinReq) {
      hh.put(item, bumped)
    } else {
      pruneHH()
    }
  }
}

// Resets the CMS so that counters age out over time: empties the
// heavy-hitters map, zeroes the totals, swaps in a fresh counts table,
// restarts the operation counter at 1 and schedules the next roll-over.
private[this] def resetCMS: Unit = {
  hh.clear()
  totalCount = 0L
  hhMinReq = 0L
  countsTable = new Array[Long](WIDTH * DEPTH)
  updateOps.set(1)
  nextRollOver = System.currentTimeMillis + roFreq.toLong
}
// End of thread-unsafe update steps


/** Returns a predicate telling whether a key is currently tracked as a heavy hitter.
  *
  * Each call bumps the operation counter. For the first 99 calls after a
  * reset, and afterwards on every `updateOpsFrequency`-th call, the returned
  * predicate also records the key in the tracker before answering; a pending
  * roll-over is performed at that point too. All other calls return a
  * read-only predicate.
  *
  * The updating predicate takes the `hh` lock each time it is applied.
  * Previously the lock was held only while the closure was being created, so
  * the update itself ran unguarded when the caller later invoked the function.
  *
  * @return a function from key to "is a heavy hitter"
  */
final def getFilterFunc: K => Boolean = {
  val opsCntr = updateOps.incrementAndGet

  if (opsCntr < 100 || opsCntr % updateOpsFrequency == 0) {
    hh.synchronized {
      if (System.currentTimeMillis > nextRollOver) {
        resetCMS
      }
    }
    // Guard the mutation where it actually executes: inside the closure.
    (k: K) => hh.synchronized {
      updateItem(k)
      hh.containsKey(k)
    }
  } else {
    // Read-only path: consults the heavy-hitters map without recording the key.
    (k: K) => hh.containsKey(k)
  }
}

/** Resets all tracker state while holding the `hh` lock. */
final def clear: Unit = hh.synchronized {
  resetCMS
}

// Read-only check: true when `t` is present in the heavy-hitters map.
// Does not record the access and takes no lock.
final def query(t: K): Boolean = hh.containsKey(t)
}

/**
 * A cache proxy that uses the CMS-based [[ApproxHHTracker]] to store and read only
 * those values whose keys are currently heavy hitters.
 *
 * Writes (`+=`, `multiInsert`) and `hit` go through `getFilterFunc`, which may
 * record the key in the tracker. `get`, `contains` and `iterator` use the
 * tracker's read-only `query`.
 *
 * @param self            the wrapped cache
 * @param hhPercent       heavy-hitters threshold
 * @param writeUpdateFreq how often the tracker is updated
 * @param rolloverFreq    how often the tracker is reset
 */
class HHFilteredCache[K, V](val self: MutableCache[K, V],
    hhPercent: HeavyHittersPercent = HeavyHittersPercent.default,
    writeUpdateFreq: WriteOperationUpdateFrequency = WriteOperationUpdateFrequency.default,
    rolloverFreq: RollOverFrequencyMS = RollOverFrequencyMS.default) extends MutableCacheProxy[K, V] {
  private[this] val approxTracker = new ApproxHHTracker[K](hhPercent, writeUpdateFreq, rolloverFreq)

  /** Inserts `kv` into the wrapped cache only when its key passes the filter. */
  override def +=(kv: (K, V)): this.type = {
    if (approxTracker.getFilterFunc(kv._1)) {
      self += kv
    }
    this
  }

  /** Inserts the entries of `kvs` whose keys pass the filter. */
  override def multiInsert(kvs: Map[K, V]): this.type = {
    val filterFunc = approxTracker.getFilterFunc
    // Use a strict `filter`, not `filterKeys`: `filterKeys` returns a lazy view
    // that re-runs the predicate on every traversal or lookup. The predicate may
    // update the tracker, so each key must be evaluated exactly once.
    self.multiInsert(kvs.filter { kv => filterFunc(kv._1) })
    this
  }

  /** Delegates to the wrapped cache's `hit` only when the key passes the filter. */
  override def hit(k: K): Option[V] =
    if (approxTracker.getFilterFunc(k)) {
      self.hit(k)
    } else {
      None
    }

  /** Clears the wrapped cache and resets the tracker. */
  override def clear: this.type = {
    self.clear
    approxTracker.clear
    this
  }

  /** Iterates over the wrapped cache's entries whose keys are tracked as heavy hitters. */
  override def iterator: Iterator[(K, V)] =
    self.iterator.filter { kv => approxTracker.query(kv._1) }

  override def contains(k: K): Boolean = if (approxTracker.query(k)) self.contains(k) else false
  override def get(k: K): Option[V] = if (approxTracker.query(k)) self.get(k) else None

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ trait MutableCache[K, V] {
*/
def get(k: K): Option[V]
def +=(kv: (K, V)): this.type

/* Adds every key-value pair of kvs through += and returns this cache. */
def multiInsert(kvs: Map[K, V]): this.type = {
  for (kv <- kvs) this += kv
  this
}

def hit(k: K): Option[V]

/* Returns an option of the (potentially) evicted value. */
Expand All @@ -65,6 +71,11 @@ trait MutableCache[K, V] {
/* Returns the cache with the supplied key evicted. */
def -=(k: K): this.type = { evict(k); this }

/* Removes every key of ks through -= and returns this cache. */
def multiRemove(ks: Set[K]): this.type = {
  for (k <- ks) this -= k
  this
}

def getOrElseUpdate(k: K, v: => V): V =
hit(k).getOrElse {
val realizedV = v
Expand Down
Loading