Skip to content

Commit

Permalink
Merge pull request #18 from twitter/develop
Browse files Browse the repository at this point in the history
Merge from twitter/storehaus
  • Loading branch information
rubanm committed Aug 2, 2014
2 parents b50b274 + 9c7db24 commit 51623db
Show file tree
Hide file tree
Showing 38 changed files with 2,184 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: scala
scala:
- 2.10.3
- 2.10.4
- 2.9.3
before_script:
- mysql -u root -e "create database storehaus_test;"
Expand Down
29 changes: 29 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,34 @@
# Storehaus #

### Version 0.9.1 ###
* Feature/write through cache perf: https://github.com/twitter/storehaus/pull/234
* Share the Retrying Read Write store in storehaus repo: https://github.com/twitter/storehaus/pull/230
* initial Kafka 0.8 support: https://github.com/twitter/storehaus/pull/232
* Exceptions on the cache-store should be ignored for Read/WriteThroughStore: https://github.com/twitter/storehaus/pull/225

### Version 0.9.0 ###
* Reporting store algebra: https://github.com/twitter/storehaus/pull/176
* Bumping finagle to a more recent version, changes that were required: https://github.com/twitter/storehaus/pull/223
* Bump Algebird to version 0.5.0: https://github.com/twitter/storehaus/pull/221
* Add stores for read-through and write-through caching: https://github.com/twitter/storehaus/pull/220
* fix bug in onFailure enriched mergeable store: https://github.com/twitter/storehaus/pull/218
* Fixes an issue that Future.collect is N^2 on scala Lists: https://github.com/twitter/storehaus/pull/219
* Adds GetBatchingReadableStore: https://github.com/twitter/storehaus/pull/215
* Elastic Search Store: https://github.com/twitter/storehaus/pull/205
* Issue #72: Added mongodb store.: https://github.com/twitter/storehaus/pull/199
* Add out of retries exception to retrying store: https://github.com/twitter/storehaus/pull/210
* IterableStore: https://github.com/twitter/storehaus/pull/191
* add onFailure to EnrichedMergeableStore: https://github.com/twitter/storehaus/pull/200
* clean up htable after finishing get and put operations.: https://github.com/twitter/storehaus/pull/207
* Adds a mutable TTL cache: https://github.com/twitter/storehaus/pull/196
* add MergeableStore.fromStoreNoMulti that does single get then put: https://github.com/twitter/storehaus/pull/201
* my little proxy: https://github.com/twitter/storehaus/pull/202
* Add immutable LIRS Cache implementation: https://github.com/twitter/storehaus/pull/155
* Adds the CalendarTimeStrategy: https://github.com/twitter/storehaus/pull/195
* Adds the ability to add an Optional component onto any strategy: https://github.com/twitter/storehaus/pull/198
* Just adds some whitespace: https://github.com/twitter/storehaus/pull/197
* Kafka Sink for SummingBird: https://github.com/twitter/storehaus/pull/192

### Version 0.8.0 ###
* add BatchedStore for writes: https://github.com/twitter/storehaus/pull/175
* MySQL batched multiPut: https://github.com/twitter/storehaus/pull/173
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ Discussion occurs primarily on the [Storehaus mailing list](https://groups.googl

## Maven

Storehaus modules are available on maven central. The current groupid and version for all modules is, respectively, `"com.twitter"` and `0.6.0`.
Storehaus modules are available on maven central. The current groupid and version for all modules is, respectively, `"com.twitter"` and `0.9.0`.

Current published artifacts are

Expand Down
43 changes: 35 additions & 8 deletions project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import Keys._
import spray.boilerplate.BoilerplatePlugin.Boilerplate
import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings
import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact
import sbtassembly.Plugin._
import AssemblyKeys._


object StorehausBuild extends Build {
def withCross(dep: ModuleID) =
Expand All @@ -35,7 +38,7 @@ object StorehausBuild extends Build {
case version if version startsWith "2.10" => "org.specs2" %% "specs2" % "1.13" % "test"
}
val extraSettings =
Project.defaultSettings ++ Boilerplate.settings ++ mimaDefaultSettings
Project.defaultSettings ++ Boilerplate.settings ++ assemblySettings ++ mimaDefaultSettings

def ciSettings: Seq[Project.Setting[_]] =
if (sys.env.getOrElse("TRAVIS", "false").toBoolean) Seq(
Expand All @@ -55,8 +58,8 @@ object StorehausBuild extends Build {
val sharedSettings = extraSettings ++ ciSettings ++ Seq(
organization := "com.twitter",
scalaVersion := "2.9.3",
version := "0.8.0",
crossScalaVersions := Seq("2.9.3", "2.10.0"),
version := "0.9.1",
crossScalaVersions := Seq("2.9.3", "2.10.4"),
javacOptions ++= Seq("-source", "1.6", "-target", "1.6"),
javacOptions in doc := Seq("-source", "1.6"),
libraryDependencies <+= scalaVersion(specs2Import(_)),
Expand Down Expand Up @@ -114,12 +117,12 @@ object StorehausBuild extends Build {
def youngestForwardCompatible(subProj: String) =
Some(subProj)
.filterNot(unreleasedModules.contains(_))
.map { s => "com.twitter" % ("storehaus-" + s + "_2.9.3") % "0.8.0" }
.map { s => "com.twitter" % ("storehaus-" + s + "_2.9.3") % "0.9.0" }

val algebirdVersion = "0.3.1"
val bijectionVersion = "0.6.2"
val algebirdVersion = "0.7.0"
val bijectionVersion = "0.6.3"
val utilVersion = "6.11.0"
val scaldingVersion = "0.9.0rc4"
val scaldingVersion = "0.11.1"

lazy val storehaus = Project(
id = "storehaus",
Expand All @@ -139,6 +142,7 @@ object StorehausBuild extends Build {
storehausHBase,
storehausDynamoDB,
storehausKafka,
storehausKafka08,
storehausMongoDB,
storehausElastic,
storehausTesting
Expand Down Expand Up @@ -225,7 +229,7 @@ object StorehausBuild extends Build {
libraryDependencies ++= Seq (
"com.twitter" %% "bijection-core" % bijectionVersion,
"com.twitter" %% "bijection-avro" % bijectionVersion,
"com.twitter"%"kafka_2.9.2"%"0.7.0" excludeAll(
"com.twitter"%"kafka_2.9.2"%"0.7.0" % "provided" excludeAll(
ExclusionRule("com.sun.jdmk","jmxtools"),
ExclusionRule( "com.sun.jmx","jmxri"),
ExclusionRule( "javax.jms","jms")
Expand All @@ -235,6 +239,19 @@ object StorehausBuild extends Build {
parallelExecution in Test := false
).dependsOn(storehausAlgebra % "test->test;compile->compile")

lazy val storehausKafka08 = module("kafka-08").settings(
libraryDependencies ++= Seq (
"com.twitter" %% "bijection-core" % bijectionVersion,
"com.twitter" %% "bijection-avro" % bijectionVersion,
"org.apache.kafka" % "kafka_2.9.2" % "0.8.0" % "provided" excludeAll(
ExclusionRule(organization = "com.sun.jdmk"),
ExclusionRule(organization = "com.sun.jmx"),
ExclusionRule(organization = "javax.jms"))
),
// we don't want various tests clobbering each others keys
parallelExecution in Test := false
).dependsOn(storehausCore,storehausAlgebra % "test->test;compile->compile")

lazy val storehausMongoDB= module("mongodb").settings(
libraryDependencies ++= Seq(
"com.twitter" %% "bijection-core" % bijectionVersion,
Expand Down Expand Up @@ -265,4 +282,14 @@ object StorehausBuild extends Build {
withCross("com.twitter" %% "util-core" % utilVersion))
)
)

lazy val storehausCaliper = module("caliper").settings(
libraryDependencies ++= Seq("com.google.caliper" % "caliper" % "0.5-rc1",
"com.google.code.java-allocation-instrumenter" % "java-allocation-instrumenter" % "2.0",
"com.google.code.gson" % "gson" % "1.7.1",
"com.twitter" %% "bijection-core" % bijectionVersion,
"com.twitter" %% "algebird-core" % algebirdVersion),
javaOptions in run <++= (fullClasspath in Runtime) map { cp => Seq("-cp", sbt.Build.data(cp).mkString(":")) }
).dependsOn(storehausCore, storehausAlgebra, storehausCache)

}
2 changes: 1 addition & 1 deletion project/Finagle.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package storehaus
* dependency */
object Finagle {
import sbt._
val LatestVersion = "6.5.1"
val LatestVersion = "6.12.2"
def module(name: String, version: String = LatestVersion) =
StorehausBuild.withCross("com.twitter" %% "finagle-%s".format(name) % version)
}
2 changes: 2 additions & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.5.1")
addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.6")

addSbtPlugin("io.spray" % "sbt-boilerplate" % "0.5.1")

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2013 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus.algebra

import com.twitter.storehaus.{Store, EagerWriteThroughCacheStore }
import com.twitter.storehaus.cache.MutableCache
import com.twitter.util.Future
import com.twitter.storehaus.cache.{MutableCache, HeavyHittersPercent, WriteOperationUpdateFrequency, RollOverFrequencyMS, HHFilteredCache}

object HHFilteredStore {
def buildStore[K, V](store: Store[K, V], cache: MutableCache[K, Future[Option[V]]], hhPct: HeavyHittersPercent,
writeUpdateFreq: WriteOperationUpdateFrequency, rolloverFreq: RollOverFrequencyMS): Store[K, V] = {
val filteredCacheStore = new HHFilteredCache(cache, hhPct, writeUpdateFreq, rolloverFreq)
new EagerWriteThroughCacheStore[K, V](store, filteredCacheStore)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2013 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus.algebra.reporting

import com.twitter.storehaus.{Store, ReadableStore, WritableStore}
import com.twitter.storehaus.algebra.Mergeable
import com.twitter.util.{Future, Time}


object Reporter {
def sideEffect[T, U](params: U, f: Future[T], sideEffect: (U, Future[T]) => Future[Unit]): Future[T] =
Future.join(f, sideEffect(params, f)).map(_._1)

def sideEffect[T, K, U](params: U, f: Map[K, Future[T]], sideEffect: (U, Map[K, Future[T]]) => Map[K, Future[Unit]]): Map[K, Future[T]] = {
val effected = sideEffect(params, f)
f.map{case (k, v) =>
val unitF = effected.getOrElse(k, sys.error("Reporter for multi side effect didn't return a future for key" + k.toString))
(k, Future.join(v, unitF).map(_._1))
}
}
}

trait ReadableStoreReporter[S <: ReadableStore[K, V], K, V] extends ReadableStore[K, V] {
def self: S
override def get(k: K): Future[Option[V]] = Reporter.sideEffect(k, self.get(k), traceGet)
override def multiGet[K1 <: K](keys: Set[K1]) = Reporter.sideEffect(keys, self.multiGet(keys), traceMultiGet)

protected def traceMultiGet[K1 <: K](ks: Set[K1], request: Map[K1, Future[Option[V]]]): Map[K1, Future[Unit]]
protected def traceGet(k: K, request: Future[Option[V]]): Future[Unit]
override def close(time: Time) = self.close(time)
}


trait WritableStoreReporter[S <: WritableStore[K, V],K, V] extends WritableStore[K, V] {
def self: S
override def put(kv: (K, V)): Future[Unit] = Reporter.sideEffect(kv, self.put(kv), tracePut)
override def multiPut[K1 <: K](kvs: Map[K1, V]) = Reporter.sideEffect(kvs, self.multiPut(kvs), traceMultiPut)

protected def tracePut(kv: (K, V), request: Future[Unit]): Future[Unit]
protected def traceMultiPut[K1 <: K](kvs: Map[K1, V], request: Map[K1, Future[Unit]]): Map[K1, Future[Unit]]
override def close(time: Time) = self.close(time)
}


trait MergeableReporter[S <: Mergeable[K, V],K, V] extends Mergeable[K, V] {
def self: S
override def merge(kv: (K, V)) = Reporter.sideEffect(kv, self.merge(kv), traceMerge)
override def multiMerge[K1 <: K](kvs: Map[K1, V]) = Reporter.sideEffect(kvs, self.multiMerge(kvs), traceMultiMerge)

protected def traceMerge(kv: (K, V), request: Future[Option[V]]): Future[Unit]
protected def traceMultiMerge[K1 <: K](kvs: Map[K1, V], request: Map[K1, Future[Option[V]]]): Map[K1, Future[Unit]]
override def close(time: Time) = self.close(time)
}


trait StoreReporter[S <: Store[K, V], K, V] extends ReadableStoreReporter[S, K, V] with WritableStoreReporter[S, K, Option[V]] {
def self: S
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@

/*
* Copyright 2013 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus.algebra.reporting

import com.twitter.util.{ Await, Future }
import org.scalacheck.{ Arbitrary, Properties }
import org.scalacheck.Gen.choose
import org.scalacheck.Prop._

import com.twitter.storehaus.algebra._

object ReportingMergeableStoreProperties extends Properties("ReportingMergeableStore") {
import MergeableStoreProperties.{mergeableStoreTest, newStore}



class DummyReporter[K, V](val self: Mergeable[K, V]) extends MergeableProxy[K, V] with MergeableReporter[Mergeable[K, V], K, V] {
def traceMerge(kv: (K, V), request: Future[Option[V]]) = request.unit
def traceMultiMerge[K1 <: K](kvs: Map[K1, V], request: Map[K1, Future[Option[V]]]) = request.mapValues(_.unit)
}

property("Mergable stat store obeys the mergeable store proporites") =
mergeableStoreTest {
val store = newStore[Int, Int]
new MergeableStoreProxy[Int, Int] with MergeableReporter[Mergeable[Int, Int], Int, Int] {
val self = store
def traceMerge(kv: (Int, Int), request: Future[Option[Int]]) = request.unit
def traceMultiMerge[K1 <: Int](kvs: Map[K1, Int], request: Map[K1, Future[Option[Int]]]) = request.mapValues(_.unit)
}
}

property("merge Some/None count matches") = forAll { (base: Map[Int, Int], merge: Map[Int, Int]) =>
var mergeWithSomeCount = 0
var mergeWithNoneCount = 0
val baseStore = MergeableStore.fromStore(newStore[Int, Int])
baseStore.multiMerge(base)

val wrappedStore = new DummyReporter[Int, Int](baseStore) {
override def traceMerge(kv: (Int, Int), request: Future[Option[Int]]) = {
request.map { optV =>
optV match {
case Some(_) => mergeWithSomeCount += 1
case None => mergeWithNoneCount += 1
}
}.unit
}
}

merge.map(kv => wrappedStore.merge((kv._1, kv._2)))

val existsBeforeList = merge.keySet.toList.map(k => base.get(k))

existsBeforeList.collect{case Some(_) => 1}.size == mergeWithSomeCount &&
existsBeforeList.collect{case None => 1}.size == mergeWithNoneCount
}

property("multiMerge Some/None count matches") = forAll { (base: Map[Int, Int], merge: Map[Int, Int]) =>
var mergeWithSomeCount = 0
var mergeWithNoneCount = 0
val baseStore = MergeableStore.fromStore(newStore[Int, Int])
baseStore.multiMerge(base)

val wrappedStore = new DummyReporter[Int, Int](baseStore) {
override def traceMultiMerge[K1 <: Int](kvs: Map[K1, Int], request: Map[K1, Future[Option[Int]]]) = {
request.mapValues{optV =>
optV.map{ v =>
v match {
case Some(_) => mergeWithSomeCount += 1
case None => mergeWithNoneCount += 1
}
}.unit
}
}
}

wrappedStore.multiMerge(merge)

val existsBeforeList = merge.keySet.toList.map(k => base.get(k))

existsBeforeList.collect{case Some(_) => 1}.size == mergeWithSomeCount &&
existsBeforeList.collect{case None => 1}.size == mergeWithNoneCount
}
}
Loading

0 comments on commit 51623db

Please sign in to comment.