diff --git a/.travis.yml b/.travis.yml index 3d8e1be1..3c1941a4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: scala scala: - - 2.10.3 + - 2.10.4 - 2.9.3 before_script: - mysql -u root -e "create database storehaus_test;" diff --git a/CHANGES.md b/CHANGES.md index 326abece..874bc2a6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,11 @@ # Storehaus # +### Version 0.9.1 ### +* Feature/write through cache perf: https://github.com/twitter/storehaus/pull/234 +* Share the Retrying Read Write store in storehaus repo: https://github.com/twitter/storehaus/pull/230 +* initial Kafka 0.8 support: https://github.com/twitter/storehaus/pull/232 +* Exceptions on the cache-store should be ignored for Read/WriteThroughStore: https://github.com/twitter/storehaus/pull/225 + ### Version 0.9.0 ### * Reporting store algebra: https://github.com/twitter/storehaus/pull/176 * Bumping finagle to a more recent version, changes that were required: https://github.com/twitter/storehaus/pull/223 diff --git a/project/Build.scala b/project/Build.scala index 9896ce42..3f6d2712 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -21,6 +21,9 @@ import Keys._ import spray.boilerplate.BoilerplatePlugin.Boilerplate import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact +import sbtassembly.Plugin._ +import AssemblyKeys._ + object StorehausBuild extends Build { def withCross(dep: ModuleID) = @@ -35,7 +38,7 @@ object StorehausBuild extends Build { case version if version startsWith "2.10" => "org.specs2" %% "specs2" % "1.13" % "test" } val extraSettings = - Project.defaultSettings ++ Boilerplate.settings ++ mimaDefaultSettings + Project.defaultSettings ++ Boilerplate.settings ++ assemblySettings ++ mimaDefaultSettings def ciSettings: Seq[Project.Setting[_]] = if (sys.env.getOrElse("TRAVIS", "false").toBoolean) Seq( @@ -55,8 +58,8 @@ object StorehausBuild extends Build { val sharedSettings = extraSettings ++ ciSettings ++ Seq( organization := "com.twitter", scalaVersion := "2.9.3", - version := "0.9.0", - crossScalaVersions := Seq("2.9.3", "2.10.0"), + version := "0.9.1", + crossScalaVersions := Seq("2.9.3", "2.10.4"), javacOptions ++= Seq("-source", "1.6", "-target", "1.6"), javacOptions in doc := Seq("-source", "1.6"), libraryDependencies <+= scalaVersion(specs2Import(_)), @@ -116,10 +119,10 @@ object StorehausBuild extends Build { .filterNot(unreleasedModules.contains(_)) .map { s => "com.twitter" % ("storehaus-" + s + "_2.9.3") % "0.9.0" } - val algebirdVersion = "0.5.0" - val bijectionVersion = "0.6.2" + val algebirdVersion = "0.7.0" + val bijectionVersion = "0.6.3" val utilVersion = "6.11.0" - val scaldingVersion = "0.9.0rc15" + val scaldingVersion = "0.11.1" lazy val storehaus = Project( id = "storehaus", @@ -139,6 +142,7 @@ object StorehausBuild extends Build { storehausHBase, storehausDynamoDB, storehausKafka, + storehausKafka08, storehausMongoDB, storehausElastic, storehausTesting @@ -225,7 +229,7 @@ object StorehausBuild extends Build { libraryDependencies ++= Seq ( "com.twitter" %% "bijection-core" % bijectionVersion, "com.twitter" %% "bijection-avro" % bijectionVersion, - "com.twitter"%"kafka_2.9.2"%"0.7.0" excludeAll( + "com.twitter"%"kafka_2.9.2"%"0.7.0" % "provided" excludeAll( ExclusionRule("com.sun.jdmk","jmxtools"), ExclusionRule( "com.sun.jmx","jmxri"), ExclusionRule( "javax.jms","jms") @@ -235,6 +239,19 @@ object StorehausBuild extends Build { 
parallelExecution in Test := false ).dependsOn(storehausAlgebra % "test->test;compile->compile") + lazy val storehausKafka08 = module("kafka-08").settings( + libraryDependencies ++= Seq ( + "com.twitter" %% "bijection-core" % bijectionVersion, + "com.twitter" %% "bijection-avro" % bijectionVersion, + "org.apache.kafka" % "kafka_2.9.2" % "0.8.0" % "provided" excludeAll( + ExclusionRule(organization = "com.sun.jdmk"), + ExclusionRule(organization = "com.sun.jmx"), + ExclusionRule(organization = "javax.jms")) + ), + // we don't want various tests clobbering each others keys + parallelExecution in Test := false + ).dependsOn(storehausCore,storehausAlgebra % "test->test;compile->compile") + lazy val storehausMongoDB= module("mongodb").settings( libraryDependencies ++= Seq( "com.twitter" %% "bijection-core" % bijectionVersion, @@ -265,4 +282,14 @@ object StorehausBuild extends Build { withCross("com.twitter" %% "util-core" % utilVersion)) ) ) + + lazy val storehausCaliper = module("caliper").settings( + libraryDependencies ++= Seq("com.google.caliper" % "caliper" % "0.5-rc1", + "com.google.code.java-allocation-instrumenter" % "java-allocation-instrumenter" % "2.0", + "com.google.code.gson" % "gson" % "1.7.1", + "com.twitter" %% "bijection-core" % bijectionVersion, + "com.twitter" %% "algebird-core" % algebirdVersion), + javaOptions in run <++= (fullClasspath in Runtime) map { cp => Seq("-cp", sbt.Build.data(cp).mkString(":")) } + ).dependsOn(storehausCore, storehausAlgebra, storehausCache) + } diff --git a/project/plugins.sbt b/project/plugins.sbt index 4885d18b..46d15ea6 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -8,3 +8,5 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.5.1") addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.6") addSbtPlugin("io.spray" % "sbt-boilerplate" % "0.5.1") + +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") \ No newline at end of file diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala new file mode 100644 index 00000000..b3bd18c7 --- /dev/null +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/HHFilteredStore.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2013 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.twitter.storehaus.algebra + +import com.twitter.storehaus.{Store, EagerWriteThroughCacheStore } +import com.twitter.storehaus.cache.MutableCache +import com.twitter.util.Future +import com.twitter.storehaus.cache.{MutableCache, HeavyHittersPercent, WriteOperationUpdateFrequency, RollOverFrequencyMS, HHFilteredCache} + +object HHFilteredStore { + def buildStore[K, V](store: Store[K, V], cache: MutableCache[K, Future[Option[V]]], hhPct: HeavyHittersPercent, + writeUpdateFreq: WriteOperationUpdateFrequency, rolloverFreq: RollOverFrequencyMS): Store[K, V] = { + val filteredCacheStore = new HHFilteredCache(cache, hhPct, writeUpdateFreq, rolloverFreq) + new EagerWriteThroughCacheStore[K, V](store, filteredCacheStore) + } +} diff --git a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/CacheProxy.scala b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/CacheProxy.scala new file mode 100644 index 00000000..d083047b --- /dev/null +++ b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/CacheProxy.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2013 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.storehaus.cache + +/** Defines the base of a proxy for a given type. + * An instance of Proxied for type T is intended to use the `self` + * member to forward all methods of a new instance of type T to. + * This allows for extensions of type T which can inherit a proxied + * instance's behavior without needing to override every method of type T. + * + * {{{ + * + * class Dazzle { + * def a: String = "default a" + * def b: String = "default b" + * // ... + * } + * + * // define a reusable concrete proxy satisfying Dazzle, forwarding + * // all calls to Proxied method self + * class DazzleProxy(val self: Dazzle) extends Dazzle with Proxied[Dazzle] { + * def a: String = self.a + * def b: String = self.b + * } + * + * val bedazzlable = new Dazzle { + * // return a new Dazzle with some sizzle + * def be(sizzle: String): Dazzle = new DazzleProxy(this) { + * override def b = "%s %s!!!" format(self.b, sizzle) + * } + * } + * + * val dazzled = bedazzlable.be("dazzled") + * dazzled.b // default b dazzled!!! + * dazzled.a // default a + * + * }}} + * + * @author Doug Tangren + */ +trait CacheProxied[T] { + protected def self: T +} \ No newline at end of file diff --git a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/HHFilteredCache.scala b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/HHFilteredCache.scala new file mode 100644 index 00000000..00101fc7 --- /dev/null +++ b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/HHFilteredCache.scala @@ -0,0 +1,207 @@ +/* + * Copyright 2013 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License.
You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.cache + +import com.twitter.algebird.{ Semigroup, Monoid, Group, CMSHash } +import com.twitter.util.Future + + +// The update frequency is how often we should update the mutable CMS +case class WriteOperationUpdateFrequency(toInt: Int) +object WriteOperationUpdateFrequency { + def default = WriteOperationUpdateFrequency(100) // update 1% of the time +} + +// This is how often in MS to roll over the CMS +case class RollOverFrequencyMS(toLong: Long) + +object RollOverFrequencyMS { + def default = RollOverFrequencyMS(3600 * 1000L) // 1 Hour +} + +// The heavy hitters percent is used to control above what % of items we should send to the backing +// aggregator +case class HeavyHittersPercent(toFloat: Float) +object HeavyHittersPercent { + def default = HeavyHittersPercent(0.001f) // 0.1% of the time +} + +sealed class ApproxHHTracker[K](hhPct: HeavyHittersPercent, updateFreq: WriteOperationUpdateFrequency, roFreq: RollOverFrequencyMS) { + private[this] final val WIDTH = 1000 + private[this] final val DEPTH = 4 + private[this] final val hh = new java.util.HashMap[K, Long]() + private[this] final var totalCount = 0L + private[this] final var hhMinReq = 0L + private[this] final val hhPercent = hhPct.toFloat + private[this] final val updateOpsFrequency = updateFreq.toInt + private[this] final val rollOverFrequency = roFreq.toLong + private[this] var countsTable = Array.fill(WIDTH * DEPTH)(0L) + private[this] var nextRollOver: Long = System.currentTimeMillis + roFreq.toLong + private[this] final val updateOps = new java.util.concurrent.atomic.AtomicInteger(0) + + private[this] final val hashes: IndexedSeq[CMSHash] = { + val r = new scala.util.Random(5) + (0 until DEPTH).map { _ => CMSHash(r.nextInt, 0, WIDTH) } + }.toIndexedSeq + + @inline + private[this] final def frequencyEst(item : Long): Long = { + var min = Long.MaxValue + var indx = 0 + while (indx < DEPTH) { + val tableIdx = indx*WIDTH + hashes(indx)(item) + val newVal = countsTable(tableIdx) + if(newVal < min) min = newVal + indx += 1 + } + min + } + + + // Update functions in the write path + // a synchronized guard should be used around these + // to ensure consistent updates to backing data structures + @inline + private[this] final def updateItem(item: K) { + val itemHashCode = item.hashCode + totalCount += 1L + hhMinReq = (hhPercent * totalCount).toLong + var indx = 0 + while (indx < DEPTH) { + val offset = indx*WIDTH + hashes(indx)(itemHashCode) + countsTable.update(offset, countsTable(offset) + 1L) + indx += 1 + } + + updateHH(item, itemHashCode) + } + + @inline + private[this] final def updateHH(item : K, itemHashCode: Int) { + @inline + def pruneHH { + val iter = hh.values.iterator + while(iter.hasNext) { + val n = iter.next + if(n < hhMinReq) { + iter.remove + } + } + } + + if(hh.containsKey(item)) { + val v = hh.get(item) + val newItemCount = v + 1L + if (newItemCount < hhMinReq) { + pruneHH + } else { + hh.put(item, newItemCount) + } + } else { + val newItemCount = frequencyEst(itemHashCode) // Do not + 1 since we have done that before. 
+ if (newItemCount >= hhMinReq) { + hh.put(item, newItemCount) + } + } + } + + // We include the ability to reset the CMS so we can age our counters + // over time + private[this] def resetCMS { + hh.clear + totalCount = 0L + hhMinReq = 0L + countsTable = Array.fill(WIDTH * DEPTH)(0L) + updateOps.set(1) + nextRollOver = System.currentTimeMillis + roFreq.toLong + } + // End of thread-unsafe update steps + + + final def getFilterFunc: K => Boolean = { + val opsCntr = updateOps.incrementAndGet + + if(opsCntr < 100 || opsCntr % updateOpsFrequency == 0) { + hh.synchronized { + if(System.currentTimeMillis > nextRollOver) { + resetCMS + } + {k: K => + updateItem(k) + hh.containsKey(k) + } + } + } else { + {k: K => + hh.containsKey(k) + } + } + } + + final def clear { + hh.synchronized { + resetCMS + } + } + + final def query(t: K): Boolean = hh.containsKey(t) +} + +/* + This is a store for using the CMS code above to only store/read values which are heavy hitters in the CMS +*/ +class HHFilteredCache[K, V](val self: MutableCache[K, V], + hhPercent: HeavyHittersPercent = HeavyHittersPercent.default, + writeUpdateFreq: WriteOperationUpdateFrequency = WriteOperationUpdateFrequency.default, + rolloverFreq: RollOverFrequencyMS = RollOverFrequencyMS.default) extends MutableCacheProxy[K, V] { + private[this] val approxTracker = new ApproxHHTracker[K](hhPercent, writeUpdateFreq, rolloverFreq) + + override def +=(kv: (K, V)): this.type = + if(approxTracker.getFilterFunc(kv._1)) { + self += kv + this + } else { + this + } + + override def multiInsert(kvs: Map[K, V]): this.type = { + val filterFunc = approxTracker.getFilterFunc + val backed = self.multiInsert(kvs.filterKeys(t => filterFunc(t))) + this + } + + override def hit(k: K): Option[V] = + if(approxTracker.getFilterFunc(k)) { + self.hit(k) + } else { + None + } + + override def clear: this.type = { + self.clear + approxTracker.clear + this + } + + override def iterator: Iterator[(K, V)] = { + self.iterator.filter{kv => approxTracker.query(kv._1)} + } + + override def contains(k: K): Boolean = if(approxTracker.query(k)) self.contains(k) else false + override def get(k: K): Option[V] = if(approxTracker.query(k)) self.get(k) else None + +} \ No newline at end of file diff --git a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableCache.scala b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableCache.scala index 357e6f02..dc9f3acf 100644 --- a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableCache.scala +++ b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableCache.scala @@ -40,6 +40,12 @@ trait MutableCache[K, V] { */ def get(k: K): Option[V] def +=(kv: (K, V)): this.type + + def multiInsert(kvs: Map[K, V]): this.type = { + kvs.foreach { kv => this.+=(kv) } + this + } + def hit(k: K): Option[V] /* Returns an option of the (potentially) evicted value. */ @@ -65,6 +71,11 @@ trait MutableCache[K, V] { /* Returns the cache with the supplied key evicted. 
*/ def -=(k: K): this.type = { evict(k); this } + def multiRemove(ks: Set[K]): this.type = { + ks.foreach { k => this.-=(k) } + this + } + def getOrElseUpdate(k: K, v: => V): V = hit(k).getOrElse { val realizedV = v diff --git a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableCacheProxy.scala b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableCacheProxy.scala new file mode 100644 index 00000000..07a22f70 --- /dev/null +++ b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableCacheProxy.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2013 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.cache + +import com.twitter.util.{ Future, Time } + +/** Proxy for a MutableCache. Methods not overridden in extensions will be forwarded to the Proxied + * self member */ +trait MutableCacheProxy[K, V] extends MutableCache[K, V] with CacheProxied[MutableCache[K,V]] { + override def get(k: K): Option[V] = self.get(k) + + override def +=(kv: (K, V)): this.type = { + self.+=(kv) + this + } + + override def multiInsert(kvs: Map[K, V]): this.type = { + self.multiInsert(kvs) + this + } + + override def hit(k: K): Option[V] = self.hit(k) + override def evict(k: K): Option[V] = self.evict(k) + override def iterator: Iterator[(K, V)] = self.iterator + override def empty: MutableCache[K, V] = self.empty + + override def clear: this.type = { + self.clear + this + } + + override def contains(k: K): Boolean = self.contains(k) + + override def -=(k: K): this.type = { + self.-=(k) + this + } + + override def multiRemove(ks: Set[K]): this.type = { + self.multiRemove(ks) + this + } + + override def getOrElseUpdate(k: K, v: => V): V = self.getOrElseUpdate(k , v) + override def filter(pred: ((K, V)) => Boolean): MutableCache[K, V] = self.filter(pred) +} \ No newline at end of file diff --git a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableTTLCache.scala b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableTTLCache.scala index 9088f4a9..44a4aa41 100644 --- a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableTTLCache.scala +++ b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableTTLCache.scala @@ -31,10 +31,21 @@ object MutableTTLCache { } class MutableTTLCache[K, V](val ttl: Duration, protected val backingCache: MutableCache[K, (Long, V)])(val clock: () => Long) extends MutableCache[K, V] { - def get(k: K) = backingCache.get(k).filter(_._1 > clock()).map(_._2) + private[this] val putsSincePrune = new java.util.concurrent.atomic.AtomicInteger(1) + + def get(k: K) = { + // Only query the clock if the backing cache returns + // something + backingCache.get(k).filter {kv => + val clockVal = clock() + kv._1 > clockVal + }.map(_._2) + } def +=(kv: (K, V)): this.type = { - removeExpired + if(putsSincePrune.getAndIncrement % 1000 == 0) { + removeExpired + } backingCache += (kv._1, (clock() + ttl.inMilliseconds, kv._2)) this } diff --git
a/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/HHFilteredCacheTest.scala b/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/HHFilteredCacheTest.scala new file mode 100644 index 00000000..08b6c3e4 --- /dev/null +++ b/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/HHFilteredCacheTest.scala @@ -0,0 +1,59 @@ +/* + * Copyright 2013 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.cache + +import org.specs2.mutable._ + +class HHFilteredCacheTest extends Specification { + def checkCache[K, V](pairs: Seq[(K, V)], m: Map[K, V])(implicit cache: MutableCache[K, V]) = { + pairs.foldLeft(cache)(_ += _) + val res = cache.iterator.toMap + cache.clear + res must be_==(m) + } + + "HHFilteredCache works properly" in { + val backingCache = MutableCache.fromJMap[String, Int](new java.util.LinkedHashMap[String, Int]) + + implicit val cache = new HHFilteredCache[String, Int](backingCache, HeavyHittersPercent(0.5f), WriteOperationUpdateFrequency(1), RollOverFrequencyMS(10000000L)) + + checkCache( + Seq("a" -> 1, "b" -> 2), + Map("a" -> 1, "b" -> 2) + ) + + // Ensures the previous clear operations are running + // The output == input + checkCache( + Seq("c" -> 1, "d" -> 2), + Map("c" -> 1, "d" -> 2) + ) + + // Nothing above the 0.5 HH theshold + checkCache( + Seq("a" -> 1, "b" -> 2, "c" -> 3, "d" -> 3, "e" -> 3, "a" -> 1), + Map() + ) + + // Only b should be above the HH threshold + checkCache( + Seq("a" -> 1, "b" -> 2, "b" -> 3, "c" -> 1, "b" -> 3, "a" -> 1), + Map("b" -> 3) + ) + + } +} diff --git a/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/Runner.scala b/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/Runner.scala new file mode 100644 index 00000000..ea25eab5 --- /dev/null +++ b/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/Runner.scala @@ -0,0 +1,14 @@ +package com.twitter.storehaus.caliper + +import com.google.caliper.{Runner => CaliperMain} +import com.google.common.collect.ObjectArrays.concat + +import java.io.PrintWriter + +object Runner { + + def main(args: Array[String]) { + CaliperMain.main(args) + } + +} diff --git a/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/WriteThroughCacheBenchmark.scala b/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/WriteThroughCacheBenchmark.scala new file mode 100644 index 00000000..8e26af70 --- /dev/null +++ b/storehaus-caliper/src/main/scala/com/twitter/storehaus/caliper/WriteThroughCacheBenchmark.scala @@ -0,0 +1,82 @@ +package com.twitter.storehaus.caliper + +import com.google.caliper.{Param, SimpleBenchmark} +import com.twitter.algebird.HyperLogLogMonoid +import com.twitter.bijection._ +import com.twitter.storehaus._ +import com.twitter.conversions.time._ +import com.twitter.algebird._ +import java.nio.ByteBuffer +import scala.math.pow +import com.twitter.storehaus.algebra._ +import com.twitter.util.{Await, Future} + +class DelayedStore[K, V](val self: Store[K, V])(implicit timer: 
com.twitter.util.Timer) extends StoreProxy[K, V] { + override def put(kv: (K, Option[V])): Future[Unit] = { + self.put(kv).delayed(10.milliseconds) + } + + override def get(kv: K): Future[Option[V]] = { + self.get(kv).delayed(10.milliseconds) + } + + override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = + self.multiGet(ks).map { case (k,v) => + (k, v.delayed(10.milliseconds)) + } + + override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = + self.multiPut(kvs).map { case (k,v) => + (k, v.delayed(10.milliseconds)) + } +} + +class WriteThroughCacheBenchmark extends SimpleBenchmark { + implicit val custTimer = new com.twitter.util.ScheduledThreadPoolTimer(20) + import StoreAlgebra._ + + implicit val hllMonoid = new HyperLogLogMonoid(14) + + @Param(Array("100", "1000", "10000")) + var numInputKeys: Int = 0 + + @Param(Array("10000", "100000")) + var numElements: Int = 0 + + var inputData: Seq[Map[Long, HLL]] = _ + + var store: MergeableStore[Long, HLL] = _ + + var noCacheStore: MergeableStore[Long, HLL] = _ + + override def setUp { + val rng = new scala.util.Random(3) + val byteEncoder = implicitly[Injection[Long, Array[Byte]]] + def setSize = rng.nextInt(10) + 1 // 1 -> 10 + def hll(elements: Set[Long]): HLL = hllMonoid.batchCreate(elements)(byteEncoder) + val inputIntermediate = (0L until numElements).map {_ => + val setElements = (0 until setSize).map{_ => rng.nextInt(1000).toLong}.toSet + (pow(numInputKeys, rng.nextFloat).toLong, hll(setElements)) + }.grouped(20) + + inputData = inputIntermediate.map(s => MapAlgebra.sumByKey(s)).toSeq + + val delayedStore = new DelayedStore(new ConcurrentHashMapStore[Long, HLL]) + store = (new WriteThroughStore(delayedStore, new HHFilteredStore(new ConcurrentHashMapStore[Long, HLL]), true)).toMergeable + noCacheStore = delayedStore.toMergeable + } + + def timeDoUpdates(reps: Int): Int = { + Await.result(Future.collect((0 until reps).flatMap { indx => + inputData.map(d => FutureOps.mapCollect(store.multiMerge(d)).unit) + }.toSeq)) + 12 + } + + def timeDoUpdatesWithoutCache(reps: Int): Int = { + Await.result(Future.collect((0 until reps).flatMap { indx => + inputData.map(d => FutureOps.mapCollect(noCacheStore.multiMerge(d)).unit) + }.toSeq)) + 12 + } +} diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/EagerWriteThroughCacheStore.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/EagerWriteThroughCacheStore.scala new file mode 100644 index 00000000..dc84d0eb --- /dev/null +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/EagerWriteThroughCacheStore.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2013 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus + +import com.twitter.storehaus.cache.MutableCache +import com.twitter.util.{ Future, Return } +import scala.collection.breakOut + +/* + This will eagerly read/write to the backing gage when doing operations against its underlying store. 
+ It will not wait for the return of the network call to update the cache entry. + The cache itself is assumed to be backed by a thread safe implementation. +*/ +class EagerWriteThroughCacheStore[K, V](store: Store[K, V], threadSafeCache: MutableCache[K, Future[Option[V]]]) extends Store[K, V] { + + override def put(kv: (K, Option[V])): Future[Unit] = { + threadSafeCache += (kv._1, Future.value(kv._2)) + store.put(kv) + } + + override def get(k: K): Future[Option[V]] = { + threadSafeCache.get(k).getOrElse { + store.get(k) + } + } + + override def multiGet[K1 <: K](keys: Set[K1]): Map[K1, Future[Option[V]]] = { + val present: Map[K1, Future[Option[V]]] = + keys.map { k => k -> threadSafeCache.get(k) }.collect { case (k, Some(v)) => k -> v }(breakOut) + + val replaced = store.multiGet(keys -- present.keySet) + + replaced.foreach {kv => + threadSafeCache += ((kv._1, kv._2)) + } + + present ++ replaced + } + + override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = { + kvs.foreach { case (k, optiV) => + threadSafeCache += ((k, Future.value(optiV))) + } + store.multiPut(kvs) + } +} \ No newline at end of file diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/ReadThroughStore.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/ReadThroughStore.scala index 6d7f7dd8..874d2b6d 100644 --- a/storehaus-core/src/main/scala/com/twitter/storehaus/ReadThroughStore.scala +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/ReadThroughStore.scala @@ -17,7 +17,7 @@ package com.twitter.storehaus import com.twitter.concurrent.AsyncMutex -import com.twitter.util.Future +import com.twitter.util.{ Future, Return } /** * Provides read-through caching on a readable store fronted by a cache. @@ -46,27 +46,22 @@ class ReadThroughStore[K, V](backingStore: ReadableStore[K, V], cache: Store[K, mutex.acquire.flatMap { p => cache.put((k, storeValue)) .map { u : Unit => storeValue } - .onFailure { case x: Exception => storeValue } + .rescue { case x: Exception => Future.value(storeValue) } .ensure { p.release } } } } - override def get(k: K): Future[Option[V]] = - cache.get(k).flatMap { cacheValue => - cacheValue match { - case None => getFromBackingStore(k) - case some => Future.value(some) - } - } onFailure { case x: Exception => - getFromBackingStore(k) - } + override def get(k: K): Future[Option[V]] = cache.get(k) transform { + case Return(v @ Some(_)) => Future.value(v) + case _ => getFromBackingStore(k) + } override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = { // attempt to read from cache first val cacheResults : Map[K1, Future[Either[Option[V], Exception]]] = cache.multiGet(ks).map { case (k, f) => - (k, f.map { optv => Left(optv) } onFailure { case x: Exception => Right(x) }) + (k, f.map { optv => Left(optv) } rescue { case x: Exception => Future.value(Right(x)) }) } // attempt to read all failed keys and cache misses from backing store @@ -89,4 +84,3 @@ class ReadThroughStore[K, V](backingStore: ReadableStore[K, V], cache: Store[K, FutureOps.liftValues(ks, f, { (k: K1) => Future.None }) } } - diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingStore.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingStore.scala index 6a941e45..920df147 100644 --- a/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingStore.scala +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/RetryingStore.scala @@ -16,6 +16,7 @@ package com.twitter.storehaus +import com.twitter.conversions.time._ import 
com.twitter.util.{ Duration, Future, Return, Throw, Timer } /** @@ -52,6 +53,35 @@ class RetryingReadableStore[-K, +V](store: ReadableStore[K, V], backoffs: Iterab class RetryingStore[-K, V](store: Store[K, V], backoffs: Iterable[Duration])(pred: Option[V] => Boolean)(implicit timer: Timer) extends RetryingReadableStore[K, V](store, backoffs)(pred) with Store[K, V] { - override def put(kv: (K, Option[V])) = store.put(kv) - override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = store.multiPut(kvs) + + private val padded_backoffs = backoffs ++ Seq(0.second) + + private def find[T](futures: Iterator[(Future[T], Duration)])(pred: T => Boolean): Future[T] = { + if (!futures.hasNext) { + Future.exception(new RuntimeException("RetryingRWStore: empty iterator in function find")) + } else { + val (next, delay) = futures.next() + if (!futures.hasNext) { + next + } else { + next.filter(pred).rescue { + case e: Exception => + timer.doLater(delay)(()) flatMap { _ => + find(futures)(pred) + } + } + } + } + } + + override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = { + store.multiPut(kvs) map { case (k, future) => + val retryStream = (Iterator(future) ++ Iterator.continually { store.put((k, kvs(k)))}) + .zip(padded_backoffs.iterator) + (k, find(retryStream) { t => true }) + } + } + + override def put(kv: (K, Option[V])) = multiPut(Map(kv)).apply(kv._1) } + diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/WriteThroughStore.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/WriteThroughStore.scala index 6c044ee7..5841adff 100644 --- a/storehaus-core/src/main/scala/com/twitter/storehaus/WriteThroughStore.scala +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/WriteThroughStore.scala @@ -40,16 +40,16 @@ class WriteThroughStore[K, V](backingStore: Store[K, V], cache: Store[K, V], inv override def put(kv: (K, Option[V])): Future[Unit] = mutex.acquire.flatMap { p => // write key to backing store first - backingStore.put(kv).flatMap { u: Unit => + backingStore.put(kv).flatMap { _ => // now write key to cache, best effort - cache.put(kv) onFailure { case x: Exception => u } - } onFailure { case x: Exception => + cache.put(kv) rescue { case _ => Future.Unit } + } rescue { case x => // write to backing store failed // now optionally invalidate the key in cache, best effort if (invalidate) { - cache.put((kv._1, None)).flatMap { u: Unit => throw x } onFailure { throw x } + cache.put((kv._1, None)) transform { _ => Future.exception(x) } } else { - throw x + Future.exception(x) } } ensure { p.release @@ -61,7 +61,7 @@ class WriteThroughStore[K, V](backingStore: Store[K, V], cache: Store[K, V], inv // write keys to backing store first val storeResults : Map[K1, Future[Either[Unit, Exception]]] = backingStore.multiPut(kvs).map { case (k, f) => - (k, f.map { u: Unit => Left(u) }.onFailure { case x: Exception => Right(x) }) + (k, f.map { u: Unit => Left(u) }.rescue { case x: Exception => Future.value(Right(x)) }) } // perform cache operations based on how writes to backing store go diff --git a/storehaus-core/src/test/scala/com/twitter/storehaus/ExceptionStore.scala b/storehaus-core/src/test/scala/com/twitter/storehaus/ExceptionStore.scala new file mode 100644 index 00000000..01874331 --- /dev/null +++ b/storehaus-core/src/test/scala/com/twitter/storehaus/ExceptionStore.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2014 Twitter Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus + +import com.twitter.util.Future + +import scala.util.Random + +class ExceptionStore[K, V](possibility: Float = 0.5f) extends ConcurrentHashMapStore[K, V] { + private[this] def wrap[A](f: => Future[A]): Future[A] = { + if (Random.nextFloat() < possibility) Future.exception(new RuntimeException()) + else f + } + + override def get(k: K): Future[Option[V]] = wrap(super.get(k)) + + override def put(kv: (K, Option[V])): Future[Unit] = wrap(super.put(kv)) +} diff --git a/storehaus-core/src/test/scala/com/twitter/storehaus/ReadThroughStoreProperties.scala b/storehaus-core/src/test/scala/com/twitter/storehaus/ReadThroughStoreProperties.scala index b6c8ba9c..9326bec4 100644 --- a/storehaus-core/src/test/scala/com/twitter/storehaus/ReadThroughStoreProperties.scala +++ b/storehaus-core/src/test/scala/com/twitter/storehaus/ReadThroughStoreProperties.scala @@ -26,5 +26,11 @@ object ReadThroughStoreProperties extends Properties("ReadThroughStoreProperties readableStoreLaws[String, Int] { m => new ReadThroughStore(ReadableStore.fromMap(m), new ConcurrentHashMapStore[String,Int]) } + + property("ReadThroughStore should ignore exceptions on the cache-store") = + readableStoreLaws[String, Int] { m => + new ReadThroughStore(ReadableStore.fromMap(m), + new ExceptionStore()) + } } diff --git a/storehaus-core/src/test/scala/com/twitter/storehaus/WriteThroughStoreProperties.scala b/storehaus-core/src/test/scala/com/twitter/storehaus/WriteThroughStoreProperties.scala index 356a07b9..c3349dde 100644 --- a/storehaus-core/src/test/scala/com/twitter/storehaus/WriteThroughStoreProperties.scala +++ b/storehaus-core/src/test/scala/com/twitter/storehaus/WriteThroughStoreProperties.scala @@ -32,5 +32,10 @@ object WriteThroughStoreProperties extends Properties("WriteThroughStoreProperti new WriteThroughStore(new ConcurrentHashMapStore[String,Int], new ConcurrentHashMapStore[String,Int], false) } -} + property("WriteThroughStore should ignore exceptions on the cache-store") = + storeTest { + new WriteThroughStore(new ConcurrentHashMapStore[String, Int], + new ExceptionStore(1.0f)) + } +} diff --git a/storehaus-kafka-08/src/main/scala/com/twitter/storehaus/kafka/KafkaAvroSink.scala b/storehaus-kafka-08/src/main/scala/com/twitter/storehaus/kafka/KafkaAvroSink.scala new file mode 100644 index 00000000..b0b4844f --- /dev/null +++ b/storehaus-kafka-08/src/main/scala/com/twitter/storehaus/kafka/KafkaAvroSink.scala @@ -0,0 +1,80 @@ +/* + * Copyright 2014 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.kafka + +import org.apache.avro.specific.SpecificRecordBase +import com.twitter.bijection.avro.SpecificAvroCodecs +import com.twitter.bijection._ +import java.util.concurrent.ExecutorService +import kafka.serializer.DefaultEncoder +import java.util.Properties + +/** + * @author Mansur Ashraf + * @since 12/12/13 + */ +/** + * KafkaSink capable of sending Avro messages to a Kafka Topic + */ +object KafkaAvroSink { + + import com.twitter.bijection.StringCodec.utf8 + + /** + * Creates KafkaSink that can sends message of form (String,SpecificRecord) to a Kafka Topic + * @param brokers kafka brokers + * @param topic Kafka Topic + * @tparam V Avro Record + * @return KafkaSink[String,SpecificRecordBase] + */ + def apply[V <: SpecificRecordBase : Manifest](brokers: Seq[String], topic: String, executor: => ExecutorService): KafkaSink[String, V] = { + implicit val inj = SpecificAvroCodecs[V] + lazy val sink = KafkaSink[Array[Byte], Array[Byte], DefaultEncoder](brokers: Seq[String], topic: String) + .convert[String, V](utf8.toFunction) + sink + } + + /** + * Creates KafkaSink that can sends message of form (T,SpecificRecord) to a Kafka Topic + * @param brokers kafka brokers + * @param topic Kafka Topic + * @tparam V Avro Record + * @tparam K key + * @return KafkaSink[T,SpecificRecordBase] + */ + def apply[K: Codec, V <: SpecificRecordBase : Manifest](brokers: Seq[String], topic: String): KafkaSink[K, V] = { + implicit val inj = SpecificAvroCodecs[V] + lazy val sink = KafkaSink[Array[Byte], Array[Byte], DefaultEncoder](brokers: Seq[String], topic: String) + .convert[K, V](implicitly[Codec[K]].toFunction) + sink + } + + /** + * Creates KafkaSink that can sends message of form (T,SpecificRecord) to a Kafka Topic + * @param props kafka props + * @param topic Kafka Topic + * @tparam V Avro Record + * @tparam K key + * @return KafkaSink[T,SpecificRecordBase] + */ + def apply[K: Codec, V <: SpecificRecordBase : Manifest](props: Properties, topic: String): KafkaSink[K, V] = { + implicit val inj = SpecificAvroCodecs[V] + lazy val sink = KafkaSink[Array[Byte], Array[Byte]](props, topic: String) + .convert[K, V](implicitly[Codec[K]].toFunction) + sink + } +} diff --git a/storehaus-kafka-08/src/main/scala/com/twitter/storehaus/kafka/KafkaInjections.scala b/storehaus-kafka-08/src/main/scala/com/twitter/storehaus/kafka/KafkaInjections.scala new file mode 100644 index 00000000..6e5012d4 --- /dev/null +++ b/storehaus-kafka-08/src/main/scala/com/twitter/storehaus/kafka/KafkaInjections.scala @@ -0,0 +1,55 @@ +/* + * Copyright 2014 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.twitter.storehaus.kafka + +import kafka.serializer.{Decoder, Encoder} +import com.twitter.bijection.{Codec, Injection} + +/** + * @author Mansur Ashraf + * @since 11/23/13 + */ +object KafkaInjections { + + class ByteArrayEncoder extends FromInjectionEncoder[Array[Byte]] { + def injection: Injection[Array[Byte], Array[Byte]] = Injection.identity[Array[Byte]] + } + + trait FromInjectionDecoder[T] extends Decoder[T] { + def injection: Injection[T, Array[Byte]] + + override def fromBytes(bytes: Array[Byte]): T = injection.invert(bytes).get + } + + trait FromInjectionEncoder[T] extends Encoder[T] { + def injection: Injection[T, Array[Byte]] + + override def toBytes(t: T): Array[Byte] = injection(t) + } + + def fromInjection[T: Codec]: (Encoder[T], Decoder[T]) = { + val result = new FromInjectionEncoder[T] with FromInjectionDecoder[T] { + def injection: Injection[T, Array[Byte]] = implicitly[Codec[T]] + } + (result, result) + } + + implicit def injectionEncoder[T: Codec] : Encoder[T] = fromInjection[T]._1 + + implicit def injectionDecoder[T: Codec] : Decoder[T] = fromInjection[T]._2 + +} diff --git a/storehaus-kafka-08/src/main/scala/com/twitter/storehaus/kafka/KafkaSink.scala b/storehaus-kafka-08/src/main/scala/com/twitter/storehaus/kafka/KafkaSink.scala new file mode 100644 index 00000000..d7988d38 --- /dev/null +++ b/storehaus-kafka-08/src/main/scala/com/twitter/storehaus/kafka/KafkaSink.scala @@ -0,0 +1,120 @@ +/* + * Copyright 2014 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.twitter.storehaus.kafka + +import com.twitter.util.Future +import KafkaSink.Dispatcher +import com.twitter.bijection.Injection +import scala.Array +import java.util.concurrent.{Executors, ExecutorService} +import com.twitter.concurrent.NamedPoolThreadFactory +import kafka.serializer.Encoder +import com.twitter.storehaus.kafka.KafkaInjections.ByteArrayEncoder +import java.util.Properties + +/** + * Kafka Sink that can be used with SummingBird to sink messages to a Kafka Queue + * @author Mansur Ashraf + * @since 11/22/13 + */ +@deprecated("use com.twitter.storehaus.kafka.KafkaStore with com.twitter.summingbird.storm.WritableStoreSink","0.9.0") +class KafkaSink[K, V](dispatcher: Dispatcher[K, V]) extends Serializable { + /** + * Function that satisfies Storm#Sink + * @return () => (K,V) => Future[Unit] + */ + def write: () => Dispatcher[K, V] = () => dispatcher + + /** + * Converts KafkaSink[k,V] to KafkaSink[k1,V1] + * @param kfn function that converts K1 to K + * @param inj injection from V1 to V + * @tparam K1 new Store Key + * @tparam V1 new Store Value + * @return KafkaSink[k1,V1] + */ + def convert[K1, V1](kfn: K1 => K)(implicit inj: Injection[V1, V]) = { + val fn: Dispatcher[K1, V1] = { + kv: (K1, V1) => dispatcher(compose(kfn, inj)(kv)) + } + new KafkaSink[K1, V1](fn) + } + + /** + * Filter all the messages that do not satisfy the given predicate + * @param fn predicate + * @return KafkaSink + */ + def filter(fn: ((K, V)) => Boolean) = { + val f: Dispatcher[K, V] = { + kv: (K, V) => + if (fn(kv)) dispatcher(kv) + else Future.Unit + } + new KafkaSink[K, V](f) + } + + private def compose[K1, V1](kfn: K1 => K, inj: Injection[V1, V]): ((K1, V1)) => ((K, V)) = { + case (k: K1, v: V1) => (kfn(k), inj(v)) + } +} + +object KafkaSink { + + type Dispatcher[K, V] = ((K, V)) => Future[Unit] + + /** + * Creates KafkaSink by wrapping KafkaStore + * @param store KafkaStore + * @tparam K key + * @tparam V value + * @return KafkaSink + */ + def apply[K, V](store: KafkaStore[K, V]): KafkaSink[K, V] = { + val sink = new KafkaSink[K, V](store.put) + sink + } + + /** + * Returns KafkaSink[K,V] + * @param brokers kafka brokers + * @param topic kafka topic. + * @tparam K key + * @tparam V value + * @return KafkaSink[K,V] + */ + def apply[K, V, E <: Encoder[V] : Manifest](brokers: Seq[String], topic: String): KafkaSink[K, V] = { + val store = KafkaStore[K, V, E](brokers, topic) + lazy val sink = apply[K, V](store) + sink + } + + /** + * Returns KafkaSink[K,V] + * @param props kafka props + * @param topic kafka topic. + * @tparam K key + * @tparam V value + * @return KafkaSink[K,V] + */ + def apply[K, V](props: Properties, topic: String): KafkaSink[K, V] = { + lazy val store = KafkaStore[K, V](topic, props) + lazy val sink = apply[K, V](store) + sink + } +} + diff --git a/storehaus-kafka-08/src/main/scala/com/twitter/storehaus/kafka/KafkaStore.scala b/storehaus-kafka-08/src/main/scala/com/twitter/storehaus/kafka/KafkaStore.scala new file mode 100644 index 00000000..cc1d442a --- /dev/null +++ b/storehaus-kafka-08/src/main/scala/com/twitter/storehaus/kafka/KafkaStore.scala @@ -0,0 +1,96 @@ +/* + * Copyright 2014 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.kafka + +import com.twitter.storehaus.WritableStore +import com.twitter.util.{Time, Future} +import java.util.Properties +import kafka.producer.{KeyedMessage, Producer, ProducerConfig} +import kafka.serializer.Encoder + + +/** + * Store capable of writing to a Kafka Topic. + * @author Mansur Ashraf + * @since 11/22/13 + */ +class KafkaStore[K, V](topic: String, props: Properties) extends WritableStore[K, V] with Serializable { + private lazy val producerConfig = new ProducerConfig(props) + private lazy val producer = new Producer[K, V](producerConfig) + + + /** + * Puts a key/value pair on a Kafka Topic using kafka.producer.AyncProducer and does not block thread + * @param kv (key,value) + * @return Future.unit + */ + override def put(kv: (K, V)): Future[Unit] = + Future { + val (key, value) = kv + producer.send(new KeyedMessage[K, V](topic, key, value)) + } + + + override def multiPut[K1 <: K](kvs: Map[K1, V]): Map[K1, Future[Unit]] = { + val future = Future { + val batch = kvs.map { + case (k, v) => new KeyedMessage[K, V](topic, k, v) + }.toList + producer.send(batch: _*) + } + kvs.mapValues(v => future) + } + + /** Close this store and release any resources. + * It is undefined what happens on get/multiGet after close + */ + override def close(time: Time): Future[Unit] = Future { + producer.close() + } +} + +object KafkaStore { + + /** + * Creates an instance of Kafka store based on given properties. + * @param topic Kafka topic. + * @param props Kafka properties { @see http://kafka.apache.org/07/configuration.html}. + * @tparam K Key + * @tparam V Value + * @return Kafka Store + */ + def apply[K, V](topic: String, props: Properties) = new KafkaStore[K, V](topic, props) + + /** + * Creates a Kafka store. + * @param brokers zookeeper quorum. + * @param topic Kafka topic. + * @tparam K Key + * @tparam V Value + * @return Kafka Store + */ + def apply[K, V, E <: Encoder[V] : Manifest](brokers: Seq[String], + topic: String)= new KafkaStore[K, V](topic, createProp[V, E](brokers)) + + + private def createProp[V, E <: Encoder[V] : Manifest](brokers: Seq[String]): Properties = { + val prop = new Properties() + prop.put("serializer.class", implicitly[Manifest[E]].erasure.getName) + prop.put("metadata.broker.list", brokers.mkString(",")) + prop + } +} \ No newline at end of file diff --git a/storehaus-kafka-08/src/test/java/kafka/DataTuple.java b/storehaus-kafka-08/src/test/java/kafka/DataTuple.java new file mode 100644 index 00000000..93ddfdac --- /dev/null +++ b/storehaus-kafka-08/src/test/java/kafka/DataTuple.java @@ -0,0 +1,248 @@ +/* + * Copyright 2014 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package kafka; +@SuppressWarnings("all") +public class DataTuple extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"DataTuple\",\"namespace\":\"kafka\",\"fields\":[{\"name\":\"value\",\"type\":\"long\"},{\"name\":\"key\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"long\"}]}"); + @Deprecated public long value; + @Deprecated public CharSequence key; + @Deprecated public long timestamp; + + /** + * Default constructor. + */ + public DataTuple() {} + + /** + * All-args constructor. + */ + public DataTuple(Long value, CharSequence key, Long timestamp) { + this.value = value; + this.key = key; + this.timestamp = timestamp; + } + + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public Object get(int field$) { + switch (field$) { + case 0: return value; + case 1: return key; + case 2: return timestamp; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, Object value$) { + switch (field$) { + case 0: value = (Long)value$; break; + case 1: key = (CharSequence)value$; break; + case 2: timestamp = (Long)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + /** + * Gets the value of the 'value' field. + */ + public Long getValue() { + return value; + } + + /** + * Sets the value of the 'value' field. + * @param value the value to set. + */ + public void setValue(Long value) { + this.value = value; + } + + /** + * Gets the value of the 'key' field. + */ + public CharSequence getKey() { + return key; + } + + /** + * Sets the value of the 'key' field. + * @param value the value to set. + */ + public void setKey(CharSequence value) { + this.key = value; + } + + /** + * Gets the value of the 'timestamp' field. + */ + public Long getTimestamp() { + return timestamp; + } + + /** + * Sets the value of the 'timestamp' field. + * @param value the value to set. + */ + public void setTimestamp(Long value) { + this.timestamp = value; + } + + /** Creates a new DataTuple RecordBuilder */ + public static Builder newBuilder() { + return new Builder(); + } + + /** Creates a new DataTuple RecordBuilder by copying an existing Builder */ + public static Builder newBuilder(Builder other) { + return new Builder(other); + } + + /** Creates a new DataTuple RecordBuilder by copying an existing DataTuple instance */ + public static Builder newBuilder(DataTuple other) { + return new Builder(other); + } + + /** + * RecordBuilder for DataTuple instances. 
+ */ + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + private long value; + private CharSequence key; + private long timestamp; + + /** Creates a new Builder */ + private Builder() { + super(DataTuple.SCHEMA$); + } + + /** Creates a Builder by copying an existing Builder */ + private Builder(Builder other) { + super(other); + } + + /** Creates a Builder by copying an existing DataTuple instance */ + private Builder(DataTuple other) { + super(DataTuple.SCHEMA$); + if (isValidValue(fields()[0], other.value)) { + this.value = (Long) data().deepCopy(fields()[0].schema(), other.value); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.key)) { + this.key = (CharSequence) data().deepCopy(fields()[1].schema(), other.key); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.timestamp)) { + this.timestamp = (Long) data().deepCopy(fields()[2].schema(), other.timestamp); + fieldSetFlags()[2] = true; + } + } + + /** Gets the value of the 'value' field */ + public Long getValue() { + return value; + } + + /** Sets the value of the 'value' field */ + public Builder setValue(long value) { + validate(fields()[0], value); + this.value = value; + fieldSetFlags()[0] = true; + return this; + } + + /** Checks whether the 'value' field has been set */ + public boolean hasValue() { + return fieldSetFlags()[0]; + } + + /** Clears the value of the 'value' field */ + public Builder clearValue() { + fieldSetFlags()[0] = false; + return this; + } + + /** Gets the value of the 'key' field */ + public CharSequence getKey() { + return key; + } + + /** Sets the value of the 'key' field */ + public Builder setKey(CharSequence value) { + validate(fields()[1], value); + this.key = value; + fieldSetFlags()[1] = true; + return this; + } + + /** Checks whether the 'key' field has been set */ + public boolean hasKey() { + return fieldSetFlags()[1]; + } + + /** Clears the value of the 'key' field */ + public Builder clearKey() { + key = null; + fieldSetFlags()[1] = false; + return this; + } + + /** Gets the value of the 'timestamp' field */ + public Long getTimestamp() { + return timestamp; + } + + /** Sets the value of the 'timestamp' field */ + public Builder setTimestamp(long value) { + validate(fields()[2], value); + this.timestamp = value; + fieldSetFlags()[2] = true; + return this; + } + + /** Checks whether the 'timestamp' field has been set */ + public boolean hasTimestamp() { + return fieldSetFlags()[2]; + } + + /** Clears the value of the 'timestamp' field */ + public Builder clearTimestamp() { + fieldSetFlags()[2] = false; + return this; + } + + @Override + public DataTuple build() { + try { + DataTuple record = new DataTuple(); + record.value = fieldSetFlags()[0] ? this.value : (Long) defaultValue(fields()[0]); + record.key = fieldSetFlags()[1] ? this.key : (CharSequence) defaultValue(fields()[1]); + record.timestamp = fieldSetFlags()[2] ? 
this.timestamp : (Long) defaultValue(fields()[2]); + return record; + } catch (Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } +} diff --git a/storehaus-kafka-08/src/test/resources/log4j.xml b/storehaus-kafka-08/src/test/resources/log4j.xml new file mode 100644 index 00000000..6e2dfffc --- /dev/null +++ b/storehaus-kafka-08/src/test/resources/log4j.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaAvroSinkSpec.scala b/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaAvroSinkSpec.scala new file mode 100644 index 00000000..7fbe1e92 --- /dev/null +++ b/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaAvroSinkSpec.scala @@ -0,0 +1,59 @@ +/* + * Copyright 2014 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.kafka + +import org.specs2.mutable.Specification +import kafka.DataTuple +import java.util.Date +import com.twitter.util.{Future, Await} +import kafka.consumer.{ConsumerTimeoutException, Whitelist} +import KafkaInjections._ +import kafka.serializer.Decoder + +/** + * Integration Test! Remove .pendingUntilFixed if testing against a Kafka Cluster + * @author Mansur Ashraf + * @since 12/8/13 + */ +class KafkaAvroSinkSpec extends Specification { + "KafkaAvroSink" should { + "put avro object on a topic" in new KafkaContext { + val topic = "avro-topic-" + random + val sink = KafkaAvroSink[DataTuple](Seq(broker), topic,executor) + .filter { + case (k, v) => v.getValue % 2 == 0 + } + + val futures = (1 to 10) + .map(new DataTuple(_, "key", new Date().getTime)) + .map(sink.write()("key", _)) + + Await.result(Future.collect(futures)) + + try { + val stream = consumer.createMessageStreamsByFilter(new Whitelist(topic), 1,implicitly[Decoder[String]], implicitly[Decoder[DataTuple]])(0) + val iterator = stream.iterator() + iterator.next().message.getValue % 2 === 0 + iterator.next().message.getValue % 2 === 0 + iterator.next().message.getValue % 2 === 0 + iterator.next().message.getValue % 2 === 0 + } catch { + case e: ConsumerTimeoutException => failure("test failed as consumer timed out without getting any msges") + } + }.pendingUntilFixed + } +} diff --git a/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaContext.scala b/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaContext.scala new file mode 100644 index 00000000..c6772914 --- /dev/null +++ b/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaContext.scala @@ -0,0 +1,58 @@ +/* + * Copyright 2014 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package com.twitter.storehaus.kafka
+
+import org.specs2.specification.Scope
+import java.util.concurrent.Executors
+import com.twitter.concurrent.NamedPoolThreadFactory
+import java.util.{Properties, Random}
+import kafka.serializer.{Decoder, StringEncoder}
+import kafka.consumer.{Consumer, ConsumerConfig}
+import kafka.message.Message
+import com.twitter.bijection.Injection
+import java.nio.ByteBuffer
+import org.apache.avro.specific.SpecificRecordBase
+import com.twitter.bijection.avro.SpecificAvroCodecs
+import kafka.DataTuple
+
+/**
+ * @author Mansur Ashraf
+ * @since 12/7/13
+ */
+trait KafkaContext extends Scope {
+
+  val zK = "localhost:2181"
+  val broker = "localhost:9092"
+  lazy val executor = Executors.newCachedThreadPool(new NamedPoolThreadFactory("KafkaTestPool"))
+  implicit val dataTupleInj = SpecificAvroCodecs[DataTuple]
+
+  def store(topic: String) = KafkaStore[String, String, StringEncoder](Seq(broker), topic)
+
+  def random = new Random().nextInt(100000)
+
+  // Consumer props
+  val props = new Properties()
+  props.put("group.id", "consumer-" + random)
+  props.put("auto.commit.interval.ms", 1000.toString)
+  props.put("zookeeper.connect", zK)
+  props.put("consumer.timeout.ms", (60 * 1000).toString)
+  props.put("auto.offset.reset", "smallest")
+  val config = new ConsumerConfig(props)
+  lazy val consumer = Consumer.create(config)
+}
+
diff --git a/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaStoreSpec.scala b/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaStoreSpec.scala
new file mode 100644
index 00000000..609b1909
--- /dev/null
+++ b/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaStoreSpec.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2014 Twitter inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.twitter.storehaus.kafka
+
+import org.specs2.mutable.Specification
+import kafka.serializer.Decoder
+import com.twitter.util.{Future, Await}
+import kafka.consumer.{ConsumerTimeoutException, Whitelist}
+import KafkaInjections._
+
+/**
+ * Integration test! Remove .pendingUntilFixed if testing against a Kafka cluster.
+ * @author Mansur Ashraf
+ * @since 12/7/13
+ */
+class KafkaStoreSpec extends Specification {
+
+  "Kafka store" should {
+    "put a value on a topic" in new KafkaContext {
+      val topic = "test-topic-" + random
+
+      Await.result(store(topic).put("testKey", "testValue"))
+      try {
+        val stream = consumer.createMessageStreamsByFilter(new Whitelist(topic), 1, implicitly[Decoder[String]], implicitly[Decoder[String]])(0)
+        val message = stream.iterator().next().message
+        message === "testValue"
+      }
+      catch {
+        case e: ConsumerTimeoutException => failure("test failed as consumer timed out without getting any messages")
+      }
+    }.pendingUntilFixed
+
+    "put multiple values on a topic" in new KafkaContext {
+      val multiput_topic = "multiput-test-topic-" + random
+
+      private val map = Map(
+        "Key_1" -> "value_2",
+        "Key_2" -> "value_4",
+        "Key_3" -> "value_6"
+      )
+
+      private val multiputResponse = store(multiput_topic).multiPut(map)
+      Await.result(Future.collect(multiputResponse.values.toList))
+      try {
+        val stream = consumer.createMessageStreamsByFilter(new Whitelist(multiput_topic), 1, implicitly[Decoder[String]], implicitly[Decoder[String]])(0)
+        val iterator = stream.iterator()
+        iterator.next().message must contain("value_")
+        iterator.next().message must contain("value_")
+        iterator.next().message must contain("value_")
+      }
+      catch {
+        case e: ConsumerTimeoutException => failure("test failed as consumer timed out without getting any messages")
+      }
+    }.pendingUntilFixed
+  }
+}
diff --git a/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaAvroSink.scala b/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaAvroSink.scala
index aa58663e..3b6de332 100644
--- a/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaAvroSink.scala
+++ b/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaAvroSink.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright 2013 Twitter inc.
+ * Copyright 2014 Twitter inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@ import com.twitter.bijection._
 /**
  * KafkaSink capable of sending Avro messages to a Kafka Topic
  */
+@deprecated("use com.twitter.storehaus.kafka.KafkaStore with com.twitter.summingbird.storm.WritableStoreSink", "0.9.0")
 object KafkaAvroSink {
   import com.twitter.bijection.StringCodec.utf8
diff --git a/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaSink.scala b/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaSink.scala
index 8d8bca66..2966b8ef 100644
--- a/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaSink.scala
+++ b/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaSink.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright 2013 Twitter inc.
+ * Copyright 2014 Twitter inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@ import com.twitter.storehaus.kafka.KafkaInjections.ByteArrayEncoder
  * @author Mansur Ashraf
  * @since 11/22/13
  */
+@deprecated("use com.twitter.storehaus.kafka.KafkaStore with com.twitter.summingbird.storm.WritableStoreSink", "0.9.0")
 class KafkaSink[K, V](dispatcher: Dispatcher[K, V]) extends Serializable {
   /**
    * Function that satisfies Storm#Sink {@see SummingBird-Storm}