diff --git a/.travis.yml b/.travis.yml
index 3c1941a4..0a04e577 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,7 +1,8 @@
 language: scala
+sudo: false
 scala:
-  - 2.10.4
-  - 2.9.3
+  - 2.10.5
+  - 2.11.7
 before_script:
   - mysql -u root -e "create database storehaus_test;"
   - mysql -u root -e "create user 'storehaususer'@'localhost' identified by 'test1234';"
diff --git a/CHANGES.md b/CHANGES.md
index 874bc2a6..b59c2a85 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,5 +1,43 @@
 # Storehaus #
 
+### Version 0.12.0 ###
+* Update for new finagle: https://github.com/twitter/storehaus/pull/277
+* Move version to separate file, recommended way to do it for sbt: https://github.com/twitter/storehaus/pull/276
+* Upgrades scalacheck to the same as other projects, add same scalac op…: https://github.com/twitter/storehaus/pull/275
+* Updates for 2.11: https://github.com/twitter/storehaus/pull/274
+* AsyncHBaseStringStore is returning wrong instance: https://github.com/twitter/storehaus/pull/271
+
+### Version 0.11.2 ###
+* Algebird to 0.10.2 #269
+
+### Version 0.11.1 ###
+* storehaus-memcache: pass ttl for MergeableMemcacheStore CAS calls #262
+* Upgrade Finagle and Util #265
+* Upgrade finagle-memcached to finagle-memcachedx #266
+* Elasticsearch test increase timeout #267
+
+### Version 0.11.0 ###
+* Add correct String/ChannelBuffer injections #257
+* initial scalatest migration #260
+* Remove usage of twitter's maven repo, travis seems to dislike it -- mayb... #261
+* Bijection 0.8.0, algebird 0.10.0, scalding 0.14.0, and scala 2.10.5
+
+### Version 0.10.0 ###
+* Use latest scalding, algebird, and bijection versions: https://github.com/twitter/storehaus/pull/255
+* Use new Travis CI container infrastructure: https://github.com/twitter/storehaus/pull/254
+* Add hook for CAS based memcache mergeable: https://github.com/twitter/storehaus/pull/252
+* Bump bijection/algebird versions: https://github.com/twitter/storehaus/pull/253
+* Remove + operator: https://github.com/twitter/storehaus/pull/21
+* Memcache mergeable - use semigroup: https://github.com/twitter/storehaus/pull/251
+* add logic for replicating writes and reads to stores: https://github.com/twitter/storehaus/pull/20
+* bump finagle and util to 6.22.0: https://github.com/twitter/storehaus/pull/247
+* Minified kill 2.9.3: https://github.com/twitter/storehaus/pull/249
+* Read through store - do not query backing store when no cache miss: https://github.com/twitter/storehaus/pull/246
+* implementation of store that uses http protocol: https://github.com/twitter/storehaus/pull/241
+* Retry unittest: https://github.com/twitter/storehaus/pull/240
+* Added endpoint support to storehaus-dynamodb: https://github.com/twitter/storehaus/pull/236
+* Https sonatype: https://github.com/twitter/storehaus/pull/237
+
 ### Version 0.9.1 ###
 * Feature/write through cache perf: https://github.com/twitter/storehaus/pull/234
 * Share the Retrying Read Write store in storehaus repo: https://github.com/twitter/storehaus/pull/230
diff --git a/README.md b/README.md
index 38480c16..7f93ea03 100644
--- a/README.md
+++ b/README.md
@@ -83,21 +83,23 @@ The [`MergeableStore`](http://twitter.github.com/storehaus/#com.twitter.storehau
 Storehaus provides a number of modules wrapping existing key-value stores. Enriching these key-value stores with Storehaus's combinators has been hugely helpful to us here at Twitter.
 Writing your jobs in terms of Storehaus stores makes it easy to test your jobs; use an in-memory `JMapStore` in testing and a `MemcacheStore` in production.
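To make that test/production swap concrete, here is a minimal, illustrative sketch (not part of this diff; `countClicks` is a hypothetical job) of code written against the `Store` interface, so that an in-memory `JMapStore` can stand in for a `MemcacheStore`:

```scala
import com.twitter.storehaus.{ JMapStore, Store }
import com.twitter.util.Await

// the job depends only on the abstract Store interface
def countClicks(store: Store[String, Int]): Unit =
  Await.result(store.put(("clicks", Some(1))))

// in a unit test, back it with an in-memory JMapStore...
countClicks(new JMapStore[String, Int])
// ...in production, pass a MemcacheStore built from a finagle client instead
```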
- * [Storehaus-memcache](http://twitter.github.com/storehaus/#com.twitter.storehaus.memcache.MemcacheStore) (wraps Twitter's [finagle-memcached](https://github.com/twitter/finagle/tree/master/finagle-memcached) library)
+ * [Storehaus-memcache](http://twitter.github.com/storehaus/#com.twitter.storehaus.memcache.MemcacheStore) (wraps Twitter's [finagle-memcachedx](https://github.com/twitter/finagle/tree/master/finagle-memcachedx) library)
  * [Storehaus-mysql](http://twitter.github.com/storehaus/#com.twitter.storehaus.mysql.MySqlStore) (wraps Twitter's [finagle-mysql](https://github.com/twitter/finagle/tree/master/finagle-mysql) library)
  * [Storehaus-redis](http://twitter.github.com/storehaus/#com.twitter.storehaus.redis.RedisStore) (wraps Twitter's [finagle-redis](https://github.com/twitter/finagle/tree/master/finagle-redis) library)
  * [Storehaus-hbase](http://twitter.github.com/storehaus/#com.twitter.storehaus.hbase.HBaseStore)
- * [storehaus-dynamodb](https://github.com/twitter/storehaus/tree/develop/storehaus-dynamodb)
+ * [Storehaus-dynamodb](https://github.com/twitter/storehaus/tree/develop/storehaus-dynamodb)
+ * [Storehaus-leveldb](https://github.com/twitter/storehaus/tree/develop/storehaus-leveldb)
 
 #### Planned Modules
 
 Here's a list of modules we plan on implementing, with links to the github issues tracking progress on these modules:
 
-* [storehaus-leveldb](https://github.com/twitter/storehaus/issues/51)
 * [storehaus-berkeleydb](https://github.com/twitter/storehaus/issues/52)
 
 ## Community and Documentation
 
+This, and all [github.com/twitter](https://github.com/twitter) projects, are under the [Twitter Open Source Code of Conduct](https://engineering.twitter.com/opensource/code-of-conduct). Additionally, see the [Typelevel Code of Conduct](http://typelevel.org/conduct) for specific examples of harassing behavior that are not tolerated.
+
 To learn more and find links to tutorials and information around the web, check out the [Storehaus Wiki](https://github.com/twitter/storehaus/wiki).
 
 The latest ScalaDocs are hosted on Storehaus's [Github Project Page](http://twitter.github.io/storehaus).
 
@@ -106,27 +108,37 @@ Discussion occurs primarily on the [Storehaus mailing list](https://groups.googl
 
 ## Maven
 
-Storehaus modules are available on maven central. The current groupid and version for all modules is, respectively, `"com.twitter"` and `0.9.0`.
+Storehaus modules are available on Maven Central. The current group id and version for all modules are, respectively, `"com.twitter"` and `0.12.0`.
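For example, to depend on one of the published artifacts listed below from sbt (the `%%` operator appends the `_2.10`/`_2.11` Scala-version suffix automatically):

```scala
// in build.sbt: resolves to storehaus-core_2.10 or storehaus-core_2.11
libraryDependencies += "com.twitter" %% "storehaus-core" % "0.12.0"
```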
Current published artifacts are -* `storehaus-core_2.9.3` +* `storehaus-core_2.11` * `storehaus-core_2.10` -* `storehaus-algebra_2.9.3` +* `storehaus-algebra_2.11` * `storehaus-algebra_2.10` -* `storehaus-memcache_2.9.3` +* `storehaus-memcache_2.11` * `storehaus-memcache_2.10` -* `storehaus-mysql_2.9.3` +* `storehaus-mysql_2.11` * `storehaus-mysql_2.10` -* `storehaus-hbase_2.9.3` +* `storehaus-hbase_2.11` * `storehaus-hbase_2.10` -* `storehaus-redis_2.9.3` +* `storehaus-redis_2.11` * `storehaus-redis_2.10` -* `storehaus-dynamodb_2.9.3` +* `storehaus-dynamodb_2.11` * `storehaus-dynamodb_2.10` -* `storehaus-cache_2.9.3` +* `storehaus-kafka-08_2.11` +* `storehaus-kafka-08_2.10` +* `storehaus-mongodb_2.11` +* `storehaus-mongodb_2.10` +* `storehaus-elasticsearch_2.11` +* `storehaus-elasticsearch_2.10` +* `storehaus-leveldb_2.11` +* `storehaus-leveldb_2.10` +* `storehaus-http_2.11` +* `storehaus-http_2.10` +* `storehaus-cache_2.11` * `storehaus-cache_2.10` -* `storehaus-testing_2.9.3` +* `storehaus-testing_2.11` * `storehaus-testing_2.10` The suffix denotes the scala version. diff --git a/project/Build.scala b/project/Build.scala index 3f6d2712..a33547c8 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -28,15 +28,9 @@ import AssemblyKeys._ object StorehausBuild extends Build { def withCross(dep: ModuleID) = dep cross CrossVersion.binaryMapped { - case "2.9.3" => "2.9.2" // TODO: hack because twitter hasn't built things against 2.9.3 case version if version startsWith "2.10" => "2.10" // TODO: hack because sbt is broken case x => x } - - def specs2Import(scalaVersion: String) = scalaVersion match { - case version if version startsWith "2.9" => "org.specs2" %% "specs2" % "1.12.4.1" % "test" - case version if version startsWith "2.10" => "org.specs2" %% "specs2" % "1.13" % "test" - } val extraSettings = Project.defaultSettings ++ Boilerplate.settings ++ assemblySettings ++ mimaDefaultSettings @@ -57,20 +51,23 @@ object StorehausBuild extends Build { val sharedSettings = extraSettings ++ ciSettings ++ Seq( organization := "com.twitter", - scalaVersion := "2.9.3", - version := "0.9.1", - crossScalaVersions := Seq("2.9.3", "2.10.4"), + scalaVersion := "2.10.5", + crossScalaVersions := Seq("2.10.5", "2.11.7"), javacOptions ++= Seq("-source", "1.6", "-target", "1.6"), javacOptions in doc := Seq("-source", "1.6"), - libraryDependencies <+= scalaVersion(specs2Import(_)), + libraryDependencies += "org.scalatest" %% "scalatest" % scalatestVersion % "test", resolvers ++= Seq( Opts.resolver.sonatypeSnapshots, Opts.resolver.sonatypeReleases, - "Twitter Maven" at "http://maven.twttr.com", "Conjars Repository" at "http://conjars.org/repo" ), parallelExecution in Test := true, - scalacOptions ++= Seq(Opts.compile.unchecked, Opts.compile.deprecation), + scalacOptions ++= Seq( + "-unchecked", + "-deprecation", + "-Xlint", + "-Yresolve-term-conflict:package" + ), // Publishing options: publishMavenStyle := true, @@ -117,13 +114,14 @@ object StorehausBuild extends Build { def youngestForwardCompatible(subProj: String) = Some(subProj) .filterNot(unreleasedModules.contains(_)) - .map { s => "com.twitter" % ("storehaus-" + s + "_2.9.3") % "0.9.0" } - - val algebirdVersion = "0.7.0" - val bijectionVersion = "0.6.3" - val utilVersion = "6.11.0" - val scaldingVersion = "0.11.1" + .map { s => "com.twitter" % ("storehaus-" + s + "_2.10") % "0.12.0" } + val algebirdVersion = "0.11.0" + val bijectionVersion = "0.8.0" + val utilVersion = "6.26.0" + val scaldingVersion = "0.15.0" + val finagleVersion = "6.27.0" + 
val scalatestVersion = "2.2.4" lazy val storehaus = Project( id = "storehaus", base = file("."), @@ -141,10 +139,11 @@ object StorehausBuild extends Build { storehausRedis, storehausHBase, storehausDynamoDB, - storehausKafka, + storehausLevelDB, storehausKafka08, storehausMongoDB, storehausElastic, + storehausHttp, storehausTesting ) @@ -181,25 +180,29 @@ object StorehausBuild extends Build { "com.twitter" %% "algebird-core" % algebirdVersion, "com.twitter" %% "bijection-core" % bijectionVersion, "com.twitter" %% "bijection-netty" % bijectionVersion, - Finagle.module("memcached") + "com.twitter" %% "finagle-memcachedx" % finagleVersion excludeAll( + // we don't use this and its not on maven central. + ExclusionRule("com.twitter.common.zookeeper"), + ExclusionRule("com.twitter.common") + ) ) ).dependsOn(storehausAlgebra % "test->test;compile->compile") lazy val storehausMySQL = module("mysql").settings( - libraryDependencies += Finagle.module("mysql") + libraryDependencies += "com.twitter" %% "finagle-mysql" % finagleVersion ).dependsOn(storehausAlgebra % "test->test;compile->compile") lazy val storehausRedis = module("redis").settings( libraryDependencies ++= Seq ( "com.twitter" %% "bijection-core" % bijectionVersion, "com.twitter" %% "bijection-netty" % bijectionVersion, - Finagle.module("redis") + "com.twitter" %% "finagle-redis" % finagleVersion ), // we don't want various tests clobbering each others keys parallelExecution in Test := false ).dependsOn(storehausAlgebra % "test->test;compile->compile") - lazy val storehausHBase= module("hbase").settings( + lazy val storehausHBase = module("hbase").settings( libraryDependencies ++= Seq( "com.twitter" %% "algebird-core" % algebirdVersion, "com.twitter" %% "bijection-core" % bijectionVersion, @@ -213,7 +216,7 @@ object StorehausBuild extends Build { parallelExecution in Test := false ).dependsOn(storehausAlgebra % "test->test;compile->compile") - lazy val storehausDynamoDB= module("dynamodb").settings( + lazy val storehausDynamoDB = module("dynamodb").settings( libraryDependencies ++= Seq( "com.twitter" %% "algebird-core" % algebirdVersion, "com.twitter" %% "bijection-core" % bijectionVersion, @@ -225,37 +228,33 @@ object StorehausBuild extends Build { parallelExecution in Test := false ).dependsOn(storehausAlgebra % "test->test;compile->compile") - lazy val storehausKafka = module("kafka").settings( - libraryDependencies ++= Seq ( - "com.twitter" %% "bijection-core" % bijectionVersion, - "com.twitter" %% "bijection-avro" % bijectionVersion, - "com.twitter"%"kafka_2.9.2"%"0.7.0" % "provided" excludeAll( - ExclusionRule("com.sun.jdmk","jmxtools"), - ExclusionRule( "com.sun.jmx","jmxri"), - ExclusionRule( "javax.jms","jms") - ) - ), - // we don't want various tests clobbering each others keys - parallelExecution in Test := false - ).dependsOn(storehausAlgebra % "test->test;compile->compile") + lazy val storehausLevelDB = module("leveldb").settings( + libraryDependencies += + "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8", + parallelExecution in Test := false, + // workaround because of how sbt handles native libraries + // http://stackoverflow.com/questions/19425613/unsatisfiedlinkerror-with-native-library-under-sbt + testOptions in Test := Seq(), + fork in Test := true + ).dependsOn(storehausCore % "test->test;compile->compile") lazy val storehausKafka08 = module("kafka-08").settings( libraryDependencies ++= Seq ( "com.twitter" %% "bijection-core" % bijectionVersion, "com.twitter" %% "bijection-avro" % bijectionVersion, - 
"org.apache.kafka" % "kafka_2.9.2" % "0.8.0" % "provided" excludeAll( + "org.apache.kafka" %% "kafka" % "0.8.2.1" % "provided" excludeAll( ExclusionRule(organization = "com.sun.jdmk"), ExclusionRule(organization = "com.sun.jmx"), ExclusionRule(organization = "javax.jms")) ), // we don't want various tests clobbering each others keys parallelExecution in Test := false - ).dependsOn(storehausCore,storehausAlgebra % "test->test;compile->compile") + ).dependsOn(storehausCore, storehausAlgebra % "test->test;compile->compile") - lazy val storehausMongoDB= module("mongodb").settings( + lazy val storehausMongoDB = module("mongodb").settings( libraryDependencies ++= Seq( "com.twitter" %% "bijection-core" % bijectionVersion, - "org.mongodb" %% "casbah" % "2.6.4" + "org.mongodb" %% "casbah" % "2.8.2" ), parallelExecution in Test := false ).dependsOn(storehausAlgebra % "test->test;compile->compile") @@ -263,7 +262,7 @@ object StorehausBuild extends Build { lazy val storehausElastic = module("elasticsearch").settings( libraryDependencies ++= Seq ( "org.elasticsearch" % "elasticsearch" % "0.90.9", - "org.json4s" %% "json4s-native" % "3.2.6", + "org.json4s" %% "json4s-native" % "3.2.10", "com.google.code.findbugs" % "jsr305" % "1.3.+", "com.twitter" %% "bijection-json4s" % bijectionVersion ), @@ -271,15 +270,16 @@ object StorehausBuild extends Build { parallelExecution in Test := false ).dependsOn(storehausAlgebra % "test->test;compile->compile") - val storehausTesting = Project( id = "storehaus-testing", base = file("storehaus-testing"), settings = sharedSettings ++ Seq( name := "storehaus-testing", previousArtifact := youngestForwardCompatible("testing"), - libraryDependencies ++= Seq("org.scalacheck" %% "scalacheck" % "1.10.0" withSources(), - withCross("com.twitter" %% "util-core" % utilVersion)) + libraryDependencies ++= Seq( + "org.scalacheck" %% "scalacheck" % "1.12.2" withSources(), + withCross("com.twitter" %% "util-core" % utilVersion) + ) ) ) @@ -288,8 +288,15 @@ object StorehausBuild extends Build { "com.google.code.java-allocation-instrumenter" % "java-allocation-instrumenter" % "2.0", "com.google.code.gson" % "gson" % "1.7.1", "com.twitter" %% "bijection-core" % bijectionVersion, - "com.twitter" %% "algebird-core" % algebirdVersion), - javaOptions in run <++= (fullClasspath in Runtime) map { cp => Seq("-cp", sbt.Build.data(cp).mkString(":")) } + "com.twitter" %% "algebird-core" % algebirdVersion + ), + javaOptions in run <++= (fullClasspath in Runtime) map { cp => Seq("-cp", sbt.Build.data(cp).mkString(":")) } ).dependsOn(storehausCore, storehausAlgebra, storehausCache) + lazy val storehausHttp = module("http").settings( + libraryDependencies ++= Seq( + "com.twitter" %% "finagle-http" % finagleVersion, + "com.twitter" %% "bijection-netty" % bijectionVersion + ) + ).dependsOn(storehausCore) } diff --git a/project/Finagle.scala b/project/Finagle.scala deleted file mode 100644 index bd9454cb..00000000 --- a/project/Finagle.scala +++ /dev/null @@ -1,11 +0,0 @@ -package storehaus - -/** Module defining latest finagle version - * and means of constructing finagle module - * dependency */ -object Finagle { - import sbt._ - val LatestVersion = "6.12.2" - def module(name: String, version: String = LatestVersion) = - StorehausBuild.withCross("com.twitter" %% "finagle-%s".format(name) % version) -} diff --git a/project/build.properties b/project/build.properties index 0974fce4..a6e117b6 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=0.13.0 
+sbt.version=0.13.8 diff --git a/project/plugins.sbt b/project/plugins.sbt index 46d15ea6..209283bf 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,6 +1,6 @@ resolvers ++= Seq( "jgit-repo" at "http://download.eclipse.org/jgit/maven", - "sonatype-releases" at "http://oss.sonatype.org/content/repositories/releases" + "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases" ) addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.5.1") @@ -9,4 +9,4 @@ addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.6") addSbtPlugin("io.spray" % "sbt-boilerplate" % "0.5.1") -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") \ No newline at end of file +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") diff --git a/sbt b/sbt index 040ce801..689cb94c 100755 --- a/sbt +++ b/sbt @@ -4,7 +4,7 @@ # Author: Paul Phillips # todo - make this dynamic -declare -r sbt_release_version=0.13.0 +declare -r sbt_release_version=0.13.5 declare sbt_jar sbt_dir sbt_create sbt_launch_dir declare scala_version java_home sbt_explicit_version @@ -126,8 +126,8 @@ declare -r default_jvm_opts="-Dfile.encoding=UTF8 -XX:MaxPermSize=384m -Xms512m declare -r noshare_opts="-Dsbt.global.base=project/.sbtboot -Dsbt.boot.directory=project/.boot -Dsbt.ivy.home=project/.ivy" declare -r latest_28="2.8.2" declare -r latest_29="2.9.3" -declare -r latest_210="2.10.3" -declare -r latest_211="2.11.0-M5" +declare -r latest_210="2.10.5" +declare -r latest_211="2.11.5" declare -r script_path=$(get_script_path "$BASH_SOURCE") declare -r script_dir="$(dirname $script_path)" diff --git a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/CyclicIncrement.scala b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/CyclicIncrement.scala index ce599f06..abe1c687 100644 --- a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/CyclicIncrement.scala +++ b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/CyclicIncrement.scala @@ -59,7 +59,7 @@ class CyclicIncrementProvider[@specialized(Int, Long) K: Successible] nextSideCount: Int, maxNextSideVal: K) extends IdProvider[CyclicIncrement[K]] { - implicit val ord = implicitly[Successible[K]].ordering + implicit val ord = implicitly[Successible[K]].partialOrdering private def next(v: K) = Successible.next(v).getOrElse(throw new IllegalStateException("Hit maximum value for increment")) diff --git a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/HHFilteredCache.scala b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/HHFilteredCache.scala index 00101fc7..f9cf4d55 100644 --- a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/HHFilteredCache.scala +++ b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/HHFilteredCache.scala @@ -16,7 +16,7 @@ package com.twitter.storehaus.cache -import com.twitter.algebird.{ Semigroup, Monoid, Group, CMSHash } +import com.twitter.algebird.{ Semigroup, Monoid, Group, CMSHash, CMSHasherImplicits} import com.twitter.util.Future @@ -41,6 +41,8 @@ object HeavyHittersPercent { } sealed class ApproxHHTracker[K](hhPct: HeavyHittersPercent, updateFreq: WriteOperationUpdateFrequency, roFreq: RollOverFrequencyMS) { + import CMSHasherImplicits._ + private[this] final val WIDTH = 1000 private[this] final val DEPTH = 4 private[this] final val hh = new java.util.HashMap[K, Long]() @@ -53,9 +55,9 @@ sealed class ApproxHHTracker[K](hhPct: HeavyHittersPercent, updateFreq: WriteOpe private[this] var nextRollOver: Long = System.currentTimeMillis + roFreq.toLong private[this] final 
val updateOps = new java.util.concurrent.atomic.AtomicInteger(0) - private[this] final val hashes: IndexedSeq[CMSHash] = { + private[this] final val hashes: IndexedSeq[CMSHash[Long]] = { val r = new scala.util.Random(5) - (0 until DEPTH).map { _ => CMSHash(r.nextInt, 0, WIDTH) } + (0 until DEPTH).map { _ => CMSHash[Long](r.nextInt, 0, WIDTH) } }.toIndexedSeq @inline diff --git a/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/HHFilteredCacheTest.scala b/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/HHFilteredCacheTest.scala index 08b6c3e4..a0042b03 100644 --- a/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/HHFilteredCacheTest.scala +++ b/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/HHFilteredCacheTest.scala @@ -16,14 +16,14 @@ package com.twitter.storehaus.cache -import org.specs2.mutable._ +import org.scalatest.{Matchers, WordSpec} -class HHFilteredCacheTest extends Specification { +class HHFilteredCacheTest extends WordSpec with Matchers { def checkCache[K, V](pairs: Seq[(K, V)], m: Map[K, V])(implicit cache: MutableCache[K, V]) = { pairs.foldLeft(cache)(_ += _) val res = cache.iterator.toMap cache.clear - res must be_==(m) + res should equal(m) } "HHFilteredCache works properly" in { diff --git a/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/LRUCacheTest.scala b/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/LRUCacheTest.scala index 24cd3cc3..0941c83e 100644 --- a/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/LRUCacheTest.scala +++ b/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/LRUCacheTest.scala @@ -16,11 +16,11 @@ package com.twitter.storehaus.cache -import org.specs2.mutable._ +import org.scalatest.{Matchers, WordSpec} -class LRUCacheTest extends Specification { +class LRUCacheTest extends WordSpec with Matchers { def checkCache[K, V](pairs: Seq[(K, V)], m: Map[K, V])(implicit cache: Cache[K, V]) = - pairs.foldLeft(cache)(_ + _).toMap must be_==(m) + pairs.foldLeft(cache)(_ + _).toMap should equal(m) "LRUCache works properly with threshold 2" in { implicit val cache = Cache.lru[String, Int](2) @@ -36,7 +36,7 @@ class LRUCacheTest extends Specification { Seq("a" -> 1, "b" -> 2, "b" -> 3), Map("a" -> 1, "b" -> 3) ) - ((cache + ("a" -> 1) + ("b" -> 2)).hit("a") + ("c" -> 3)).toMap - .must(be_==(Map("a" -> 1, "c" -> 3))) + val result = ((cache + ("a" -> 1) + ("b" -> 2)).hit("a") + ("c" -> 3)).toMap + result should equal(Map("a" -> 1, "c" -> 3)) } } diff --git a/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/MutableLRUCacheTest.scala b/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/MutableLRUCacheTest.scala index 9abf39fc..200b5c7b 100644 --- a/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/MutableLRUCacheTest.scala +++ b/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/MutableLRUCacheTest.scala @@ -16,15 +16,15 @@ package com.twitter.storehaus.cache -import org.specs2.mutable._ +import org.scalatest.{Matchers, WordSpec} -class MutableLRUCacheTest extends Specification { +class MutableLRUCacheTest extends WordSpec with Matchers { def freshCache = MutableLRUCache[String, Int](2) def checkCache(pairs: Seq[(String, Int)], results: Seq[Boolean]) = { val cache = freshCache pairs.foreach(cache += _) - pairs.map { case (k, _) => cache.contains(k) } must be_==(results) + pairs.map { case (k, _) => cache.contains(k) } should equal(results) } "MutableLRUCache works properly with threshold 2" in { diff --git 
a/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/MutableTTLCacheTest.scala b/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/MutableTTLCacheTest.scala index 385e18c5..3c190965 100644 --- a/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/MutableTTLCacheTest.scala +++ b/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/MutableTTLCacheTest.scala @@ -16,28 +16,28 @@ package com.twitter.storehaus.cache -import org.specs2.mutable._ +import org.scalatest.{Matchers, WordSpec} import com.twitter.util.Duration -class MutableTTLCacheTest extends Specification { +class MutableTTLCacheTest extends WordSpec with Matchers { "TTLCache exhibits proper TTL-ness" in { val ttl: Duration = Duration.fromMilliseconds(500) val cache = MutableCache.ttl[String, Int](ttl, 100) cache += ("a" -> 1) cache += ("b" -> 2) - cache.toNonExpiredMap must be_==(Map("a" -> 1, "b" -> 2)) + cache.toNonExpiredMap should equal(Map("a" -> 1, "b" -> 2)) Thread.sleep(ttl.inMilliseconds) cache += ("c" -> 3) - cache.toNonExpiredMap must be_==(Map("c" -> 3)) + cache.toNonExpiredMap should equal(Map("c" -> 3)) } "TTLCache does not return an expired value" in { val ttl: Duration = Duration.fromMilliseconds(500) val cache = MutableCache.ttl[String, Int](ttl, 100) cache += ("a" -> 10) - cache.get("a") must be_==(Some(10)) + cache.get("a") should equal(Some(10)) Thread.sleep(ttl.inMilliseconds) - cache.get("a") must be_==(None) + cache.get("a") should equal(None) } } diff --git a/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/TTLCacheTest.scala b/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/TTLCacheTest.scala index 1c915dec..9bc66585 100644 --- a/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/TTLCacheTest.scala +++ b/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/TTLCacheTest.scala @@ -16,25 +16,24 @@ package com.twitter.storehaus.cache -import org.specs2.mutable._ +import org.scalatest.{Matchers, WordSpec} import com.twitter.util.Duration - -class TTLCacheTest extends Specification { +class TTLCacheTest extends WordSpec with Matchers { val ttlMS = 600 val cache = Cache.ttl[String, Int](Duration.fromMilliseconds(ttlMS)) "TTLCache exhibits proper TTL-ness" in { val abCache = cache.putClocked("a" -> 1)._2.putClocked("b" -> 2)._2 - abCache.toNonExpiredMap must be_==(Map("a" -> 1, "b" -> 2)) + abCache.toNonExpiredMap should equal(Map("a" -> 1, "b" -> 2)) Thread.sleep(ttlMS) - (abCache.putClocked("c" -> 3)._2).toNonExpiredMap must be_==(Map("c" -> 3)) + (abCache.putClocked("c" -> 3)._2).toNonExpiredMap should equal(Map("c" -> 3)) } "TTLCache does not return an expired value" in { val withV = cache.putClocked("a" -> 10)._2 - withV.getNonExpired("a") must be_==(Some(10)) + withV.getNonExpired("a") should equal(Some(10)) Thread.sleep(ttlMS) - withV.getNonExpired("a") must be_==(None) + withV.getNonExpired("a") should equal(None) } } diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/ReadThroughStore.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/ReadThroughStore.scala index 874d2b6d..cbd9646d 100644 --- a/storehaus-core/src/main/scala/com/twitter/storehaus/ReadThroughStore.scala +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/ReadThroughStore.scala @@ -72,12 +72,17 @@ class ReadThroughStore[K, V](backingStore: ReadableStore[K, V], cache: Store[K, val hits = responses.filter { !_._2.isEmpty } val missedKeys = responses.filter { _._2.isEmpty }.keySet - FutureOps.mapCollect(backingStore.multiGet(missedKeys ++ 
failedKeys)).flatMap { storeResult => - // write fetched keys to cache, best effort - mutex.acquire.flatMap { p => - FutureOps.mapCollect(cache.multiPut(storeResult))(FutureCollector.bestEffort[(K1, Unit)]) - .map { u => hits ++ storeResult } - .ensure { p.release } + val remaining = missedKeys ++ failedKeys + if (remaining.isEmpty) { + Future.value(hits) // no cache misses + } else { + FutureOps.mapCollect(backingStore.multiGet(remaining)).flatMap { storeResult => + // write fetched keys to cache, best effort + mutex.acquire.flatMap { p => + FutureOps.mapCollect(cache.multiPut(storeResult))(FutureCollector.bestEffort[(K1, Unit)]) + .map { u => hits ++ storeResult } + .ensure { p.release } + } } } } diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/TunableReplicatedStore.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/TunableReplicatedStore.scala index e1ecd2b7..4012d2b4 100644 --- a/storehaus-core/src/main/scala/com/twitter/storehaus/TunableReplicatedStore.scala +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/TunableReplicatedStore.scala @@ -19,7 +19,6 @@ package com.twitter.storehaus import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.ConcurrentHashMap -import scala.collection.mutable.ConcurrentMap import scala.collection.JavaConverters._ import com.twitter.util.{ Future, Promise, Return, Throw, Time } diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/WritableStore.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/WritableStore.scala index 236b8e5c..fd3d4931 100644 --- a/storehaus-core/src/main/scala/com/twitter/storehaus/WritableStore.scala +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/WritableStore.scala @@ -24,7 +24,7 @@ import com.twitter.util.{ Closable, Duration, Future, Time } */ trait WritableStore[-K, -V] extends Closable { /** - * replace a value + * Replace a value * Delete is the same as put((k,None)) */ def put(kv: (K, V)): Future[Unit] = multiPut(Map(kv)).apply(kv._1) diff --git a/storehaus-core/src/test/scala/com/twitter/storehaus/RetryingStoreProperties.scala b/storehaus-core/src/test/scala/com/twitter/storehaus/RetryingStoreProperties.scala new file mode 100644 index 00000000..aae2bd71 --- /dev/null +++ b/storehaus-core/src/test/scala/com/twitter/storehaus/RetryingStoreProperties.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2014 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.twitter.storehaus + +import com.twitter.conversions.time._ +import com.twitter.util.{JavaTimer, Timer} + +import org.scalacheck.Properties + +object RetryingStoreProperties extends Properties("RetryingStore") { + import StoreProperties.storeTest + + implicit val timer: Timer = new JavaTimer(true) + + property("RetryingStore obeys the Store laws, assuming the underlying Store always returns results before timeout") = + storeTest[String, Int] { + Store.withRetry[String, Int]( + store = new ConcurrentHashMapStore[String,Int](), + backoffs = for (i <- 0 until 3) yield 1.milliseconds + )(_ => true) + } +} diff --git a/storehaus-dynamodb/src/main/scala/com/twitter/storehaus/dynamodb/DynamoLongStore.scala b/storehaus-dynamodb/src/main/scala/com/twitter/storehaus/dynamodb/DynamoLongStore.scala index 256423df..90175967 100644 --- a/storehaus-dynamodb/src/main/scala/com/twitter/storehaus/dynamodb/DynamoLongStore.scala +++ b/storehaus-dynamodb/src/main/scala/com/twitter/storehaus/dynamodb/DynamoLongStore.scala @@ -24,14 +24,18 @@ import com.twitter.storehaus.ConvertedStore import com.twitter.storehaus.algebra.MergeableStore import scala.util.Try - +import com.amazonaws.regions.{ Region, Regions } import com.amazonaws.services.dynamodbv2.model._ import AwsBijections._ object DynamoLongStore { - def apply(awsAccessKey: String, awsSecretKey: String, tableName: String, primaryKeyColumn: String, valueColumn: String) = - new DynamoLongStore(DynamoStore(awsAccessKey, awsSecretKey, tableName, primaryKeyColumn, valueColumn)) + def apply(awsAccessKey: String, awsSecretKey: String, tableName: String, + primaryKeyColumn: String, valueColumn: String, + endpoint: Regions = Regions.US_EAST_1) = + + new DynamoLongStore(DynamoStore(awsAccessKey, awsSecretKey, tableName, + primaryKeyColumn, valueColumn, endpoint)) } class DynamoLongStore(underlying: DynamoStore) diff --git a/storehaus-dynamodb/src/main/scala/com/twitter/storehaus/dynamodb/DynamoStore.scala b/storehaus-dynamodb/src/main/scala/com/twitter/storehaus/dynamodb/DynamoStore.scala index 4d07ba16..55059035 100644 --- a/storehaus-dynamodb/src/main/scala/com/twitter/storehaus/dynamodb/DynamoStore.scala +++ b/storehaus-dynamodb/src/main/scala/com/twitter/storehaus/dynamodb/DynamoStore.scala @@ -23,6 +23,7 @@ import com.twitter.util.{ Future, FuturePool } import com.twitter.storehaus.Store import com.amazonaws.auth.BasicAWSCredentials +import com.amazonaws.regions.{ Region, Regions } import com.amazonaws.services.dynamodbv2.{ AmazonDynamoDBClient, AmazonDynamoDB } import com.amazonaws.services.dynamodbv2.model._ @@ -34,19 +35,25 @@ import AwsBijections._ object DynamoStore { - def apply(awsAccessKey: String, awsSecretKey: String, tableName: String, primaryKeyColumn: String, valueColumn: String): DynamoStore = { + def apply(awsAccessKey: String, awsSecretKey: String, tableName: String, + primaryKeyColumn: String, valueColumn: String, + endpoint: Regions = Regions.US_EAST_1): DynamoStore = { + val processors = Runtime.getRuntime.availableProcessors - this(awsAccessKey, awsSecretKey, tableName, primaryKeyColumn, valueColumn, processors) + this(awsAccessKey, awsSecretKey, tableName, primaryKeyColumn, valueColumn, + processors, endpoint) } def apply(awsAccessKey: String, awsSecretKey: String, tableName: String, - primaryKeyColumn: String, valueColumn: String, numberWorkerThreads: Int): DynamoStore = { + primaryKeyColumn: String, valueColumn: String, numberWorkerThreads: Int, + endpoint: Regions): DynamoStore = { val auth = new 
BasicAWSCredentials(awsAccessKey, awsSecretKey) - val client = new AmazonDynamoDBClient(auth) - new DynamoStore(client, tableName, primaryKeyColumn, valueColumn, numberWorkerThreads) + var client = new AmazonDynamoDBClient(auth) + client.setRegion(Region.getRegion(endpoint)); + new DynamoStore(client, tableName, primaryKeyColumn, valueColumn, + numberWorkerThreads) } - } class DynamoStore(val client: AmazonDynamoDB, val tableName: String, @@ -75,9 +82,7 @@ class DynamoStore(val client: AmazonDynamoDB, val tableName: String, apiRequestFuturePool(client.deleteItem(deleteRequest)) } - } - } override def get(k: String): Future[Option[AttributeValue]] = { @@ -88,6 +93,4 @@ class DynamoStore(val client: AmazonDynamoDB, val tableName: String, Option(client.getItem(getRequest).getItem).map(_.get(valueColumn)) } } - } - diff --git a/storehaus-dynamodb/src/main/scala/com/twitter/storehaus/dynamodb/DynamoStringStore.scala b/storehaus-dynamodb/src/main/scala/com/twitter/storehaus/dynamodb/DynamoStringStore.scala index 1a1f9bdc..f066a1cf 100644 --- a/storehaus-dynamodb/src/main/scala/com/twitter/storehaus/dynamodb/DynamoStringStore.scala +++ b/storehaus-dynamodb/src/main/scala/com/twitter/storehaus/dynamodb/DynamoStringStore.scala @@ -19,13 +19,18 @@ import java.util.{ Map => JMap } import com.twitter.storehaus.ConvertedStore +import com.amazonaws.regions.{ Region, Regions } import com.amazonaws.services.dynamodbv2.model._ import AwsBijections._ object DynamoStringStore { - def apply(awsAccessKey: String, awsSecretKey: String, tableName: String, primaryKeyColumn: String, valueColumn: String) = - new DynamoStringStore(DynamoStore(awsAccessKey, awsSecretKey, tableName, primaryKeyColumn, valueColumn)) + def apply(awsAccessKey: String, awsSecretKey: String, tableName: String, + primaryKeyColumn: String, valueColumn: String, + endpoint: Regions = Regions.US_EAST_1) = + + new DynamoStringStore(DynamoStore(awsAccessKey, awsSecretKey, tableName, + primaryKeyColumn, valueColumn, endpoint)) } class DynamoStringStore(underlying: DynamoStore) diff --git a/storehaus-elasticsearch/src/main/scala/com/twitter/storehaus/elasticsearch/ElasticSearchStringStore.scala b/storehaus-elasticsearch/src/main/scala/com/twitter/storehaus/elasticsearch/ElasticSearchStringStore.scala index b4693111..3f8e2140 100644 --- a/storehaus-elasticsearch/src/main/scala/com/twitter/storehaus/elasticsearch/ElasticSearchStringStore.scala +++ b/storehaus-elasticsearch/src/main/scala/com/twitter/storehaus/elasticsearch/ElasticSearchStringStore.scala @@ -41,7 +41,7 @@ object ElasticSearchStringStore { class ElasticSearchStringStore(private val index: String, private val tipe: String, //tipe -> type since type is a reserved keyword - private val client: Client) extends Store[String, String] with QueryableStore[SearchRequest, String] { + private val client: Client) extends Store[String, String] with QueryableStore[SearchRequest, String] { private lazy val futurePool = FuturePool.unboundedPool private[this] lazy val mutex = new AsyncMutex diff --git a/storehaus-elasticsearch/src/test/scala/com/twitter/storehaus/elasticsearch/DefaultElasticContext.scala b/storehaus-elasticsearch/src/test/scala/com/twitter/storehaus/elasticsearch/DefaultElasticContext.scala index eabf89b3..51e52c62 100644 --- a/storehaus-elasticsearch/src/test/scala/com/twitter/storehaus/elasticsearch/DefaultElasticContext.scala +++ b/storehaus-elasticsearch/src/test/scala/com/twitter/storehaus/elasticsearch/DefaultElasticContext.scala @@ -20,21 +20,19 @@ import 
org.elasticsearch.common.settings.ImmutableSettings import java.util.UUID import java.io.File import org.elasticsearch.node.NodeBuilder._ -import org.specs2.specification.Scope import org.json4s.{native, NoTypeHints} - /** * @author Mansur Ashraf * @since 1/13/14 */ -trait DefaultElasticContext extends Scope { +trait DefaultElasticContext { val tempFile = File.createTempFile("elasticsearchtests", "tmp") val homeDir = new File(tempFile.getParent + "/" + UUID.randomUUID().toString) val test_index = "test_index" val test_type = "test_type" - val DEFAULT_TIMEOUT = 4 * 1000 + val DEFAULT_TIMEOUT = 10 * 1000 homeDir.mkdir() homeDir.deleteOnExit() @@ -54,7 +52,7 @@ trait DefaultElasticContext extends Scope { node.client() } private implicit val formats = native.Serialization.formats(NoTypeHints) - lazy val store = ElasticSearchCaseClassStore[Person](test_index, test_type, client) + val store = ElasticSearchCaseClassStore[Person](test_index, test_type, client) def refreshIndex(): Unit = { refresh(test_index) diff --git a/storehaus-elasticsearch/src/test/scala/com/twitter/storehaus/elasticsearch/ElasticSearchStoreSpecs.scala b/storehaus-elasticsearch/src/test/scala/com/twitter/storehaus/elasticsearch/ElasticSearchStoreSpecs.scala index a6289d82..dcd3f321 100644 --- a/storehaus-elasticsearch/src/test/scala/com/twitter/storehaus/elasticsearch/ElasticSearchStoreSpecs.scala +++ b/storehaus-elasticsearch/src/test/scala/com/twitter/storehaus/elasticsearch/ElasticSearchStoreSpecs.scala @@ -16,7 +16,7 @@ package com.twitter.storehaus.elasticsearch -import org.specs2.mutable.Specification +import org.scalatest.{OneInstancePerTest, BeforeAndAfter, Matchers, WordSpec} import com.twitter.util.{Future, Await} import com.twitter.storehaus.FutureOps import org.elasticsearch.action.search.SearchRequestBuilder @@ -28,56 +28,63 @@ import org.json4s.{native, NoTypeHints} * @author Mansur Ashraf * @since 1/13/14 */ -class ElasticSearchStoreSpecs extends Specification { +class ElasticSearchStoreSpecs extends WordSpec with Matchers + with BeforeAndAfter with OneInstancePerTest with DefaultElasticContext { + private implicit val formats = native.Serialization.formats(NoTypeHints) private val person = Person("Joe", "Smith", 29) + before { + // wait for the shards to load up + block(DEFAULT_TIMEOUT) + } + "ElasticSearch Store" should { - "Put a value" in new DefaultElasticContext { - private val key = "put_key" + "Put a value" in { + val key = "put_key" store.put((key, Some(person))) blockAndRefreshIndex val result = Await.result(store.get(key)) - result === Some(person) + result should equal(Some(person)) } - "Retrieve a value that doesnt exist" in new DefaultElasticContext { - private val key = "put_key" + "Retrieve a value that doesnt exist" in { + val key = "put_key" store.put((key, Some(person))) blockAndRefreshIndex val result = Await.result(store.get("missing_key")) - result === None + result should equal(None) } - "Update a value" in new DefaultElasticContext { - private val key = "update_key" + "Update a value" in { + val key = "update_key" store.put(key, Some(person)) store.put(key, Some(person.copy(age = 30))) blockAndRefreshIndex val result = Await.result(store.get(key)) - result === Some(person.copy(age = 30)) + result should equal(Some(person.copy(age = 30))) } "Delete a value" in new DefaultElasticContext { - private val key = "delete_key" + val key = "delete_key" store.put(key, Some(person)) store.put(key, None) blockAndRefreshIndex val result = Await.result(store.get(key)) - result === None + result should 
equal (None) } - "Put multiple values" in new DefaultElasticContext { + "Put multiple values" in { val key = "_put_key" val persons = (1 to 10).map(i => i + key -> Some(person.copy(age = i))).toMap @@ -87,10 +94,10 @@ class ElasticSearchStoreSpecs extends Specification { val response = store.multiGet(persons.keySet) val result = Await.result(FutureOps.mapCollect(response)) - result === persons + result should equal (persons) } - "Retrieve values that do not exist" in new DefaultElasticContext { + "Retrieve values that do not exist" in { val key = "_put_key" val persons = (1 to 10).map(i => i + key -> Some(person.copy(age = i))).toMap @@ -100,10 +107,10 @@ class ElasticSearchStoreSpecs extends Specification { val response = store.multiGet(Set[String]()) val result = Await.result(FutureOps.mapCollect(response)) - result === Map[String,Future[Option[String]]]() + result should equal(Map[String,Future[Option[String]]]()) } - "Update multiple values" in new DefaultElasticContext { + "Update multiple values" in { val key = "_update_key" val persons = (1 to 10).map(i => i + key -> Some(person.copy(age = i))).toMap @@ -115,10 +122,10 @@ class ElasticSearchStoreSpecs extends Specification { val response = store.multiGet(persons_updated.keySet) val result = Await.result(FutureOps.mapCollect(response)) - result === persons_updated + result should equal(persons_updated) } - "Delete multiple values" in new DefaultElasticContext { + "Delete multiple values" in { val key = "_delete_key" val persons = (1 to 10).map(i => i + key -> Some(person.copy(age = i))).toMap @@ -130,10 +137,10 @@ class ElasticSearchStoreSpecs extends Specification { val response = store.multiGet(deleted_persons.keySet) val result = Await.result(FutureOps.mapCollect(response)) - result === deleted_persons + result should equal(deleted_persons) } - "Search for values" in new DefaultElasticContext { + "Search for values" in { val bookStore = ElasticSearchCaseClassStore[Book]("books", "programming", client) val books = Map( @@ -150,8 +157,8 @@ class ElasticSearchStoreSpecs extends Specification { //search for a particular author val request1 = new SearchRequestBuilder(client).setQuery(termQuery("authors", "josh")).request() val response1 = Await.result(bookStore.queryable.get(request1)) - response1 !== None - response1.get.head.name === "Effective Java" + response1 should not equal(None) + response1.get.head.name should equal("Effective Java") //find all the books published after 2001 where author is not Josh Bloch @@ -165,8 +172,8 @@ class ElasticSearchStoreSpecs extends Specification { ).request() val response2 = Await.result(bookStore.queryable.get(request2)) - response2 !== None - response2.get.size === 2 + response2 should not equal(None) + response2.get.size should equal(2) } } diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/asynchbase/AsyncHBaseStringStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/asynchbase/AsyncHBaseStringStore.scala index dcc61985..17c34a3e 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/asynchbase/AsyncHBaseStringStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/asynchbase/AsyncHBaseStringStore.scala @@ -30,8 +30,8 @@ object AsyncHBaseStringStore { table: String, columnFamily: String, column: String, - threads: Int=4): AsyncHBaseLongStore = { - val store = new AsyncHBaseLongStore(quorumNames, table, columnFamily, column, new HBaseClient(quorumNames.mkString(",")), threads) + threads: Int=4): AsyncHBaseStringStore = { + val store = new 
AsyncHBaseStringStore(quorumNames, table, columnFamily, column, new HBaseClient(quorumNames.mkString(",")), threads) store.validateConfiguration() store } @@ -48,7 +48,6 @@ class AsyncHBaseStringStore(protected val quorumNames: Seq[String], */ override def get(k: String): Future[Option[String]] = { import com.twitter.bijection.hbase.HBaseBijections._ - implicit val stringInj = fromBijectionRep[String, StringBytes] getValue[String, String](k) } @@ -58,7 +57,6 @@ class AsyncHBaseStringStore(protected val quorumNames: Seq[String], */ override def put(kv: (String, Option[String])): Future[Unit] = { import com.twitter.bijection.hbase.HBaseBijections._ - implicit val stringInj = fromBijectionRep[String, StringBytes] putValue(kv) } diff --git a/storehaus-http/src/main/scala/com/twitter/storehaus/http/HttpStore.scala b/storehaus-http/src/main/scala/com/twitter/storehaus/http/HttpStore.scala new file mode 100644 index 00000000..39dee3c8 --- /dev/null +++ b/storehaus-http/src/main/scala/com/twitter/storehaus/http/HttpStore.scala @@ -0,0 +1,81 @@ +/* + * Copyright 2014 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.http + +import java.nio.charset.Charset +import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers } +import org.jboss.netty.handler.codec.http.{ HttpRequest, HttpResponse, DefaultHttpRequest, HttpVersion, HttpMethod, HttpHeaders, HttpResponseStatus } +import com.twitter.util.Future +import com.twitter.bijection.StringCodec +import com.twitter.bijection.netty.ChannelBufferBijection +import com.twitter.finagle.{ Service, Http } +import com.twitter.storehaus.{ Store, ConvertedStore } + +object HttpException { + def apply(response: HttpResponse): HttpException = + new HttpException(response.getStatus.getCode, response.getStatus.getReasonPhrase, response.getContent.toString(Charset.forName("UTF-8"))) +} + +case class HttpException(code: Int, reasonPhrase: String, content: String) extends Exception(reasonPhrase + Option(content).map("\n" + _ ).getOrElse("")) + +object HttpStore { + def apply(dest: String): HttpStore = new HttpStore(Http.newService(dest)) +} + +class HttpStore(val client: Service[HttpRequest, HttpResponse]) extends Store[String, ChannelBuffer] { + override def get(k: String): Future[Option[ChannelBuffer]] = { + val request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, k) + request.headers.set(HttpHeaders.Names.CONTENT_LENGTH, "0") + client(request).map{ response => + response.getStatus match { + case HttpResponseStatus.OK => Some(response.getContent) + case HttpResponseStatus.NOT_FOUND => None + case _ => throw HttpException(response) + } + } + } + + override def put(kv: (String, Option[ChannelBuffer])): Future[Unit] = { + val request = kv match { + case (k, Some(cb)) => + val req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT, k) + req.setContent(cb) + req.headers.set(HttpHeaders.Names.CONTENT_LENGTH, cb.readableBytes.toString) + req + case (k, None) => + val req = new 
DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.DELETE, k) + req.headers.set(HttpHeaders.Names.CONTENT_LENGTH, "0") + req + } + client(request).map{ response => + response.getStatus match { + case HttpResponseStatus.OK => () + case HttpResponseStatus.CREATED => () + case HttpResponseStatus.NO_CONTENT => () + case _ => throw HttpException(response) + } + } + } +} + +object HttpStringStore { + def apply(dest: String): HttpStringStore = new HttpStringStore(Http.newService(dest)) +} + +class HttpStringStore(val client: Service[HttpRequest, HttpResponse]) + extends ConvertedStore[String, String, ChannelBuffer, String](new HttpStore(client))(identity)( + StringCodec.utf8 andThen ChannelBufferBijection.inverse) diff --git a/storehaus-http/src/test/scala/com/twitter/storehaus/http/HttpStringStoreProperties.scala b/storehaus-http/src/test/scala/com/twitter/storehaus/http/HttpStringStoreProperties.scala new file mode 100644 index 00000000..853897a8 --- /dev/null +++ b/storehaus-http/src/test/scala/com/twitter/storehaus/http/HttpStringStoreProperties.scala @@ -0,0 +1,109 @@ +/* + * Copyright 2014 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.http + +import java.util.concurrent.ConcurrentHashMap +import java.nio.charset.Charset +import org.jboss.netty.buffer.ChannelBuffers +import org.jboss.netty.handler.codec.http.{ HttpRequest, HttpResponse, DefaultHttpResponse, HttpResponseStatus, HttpMethod, HttpHeaders } +import com.twitter.util.{ Await, Future } +import com.twitter.finagle.{ Service, Http, ListeningServer } +import com.twitter.storehaus.{ FutureOps, Store } +import com.twitter.storehaus.testing.CloseableCleanup +import com.twitter.storehaus.testing.generator.NonEmpty +import org.scalacheck.{ Arbitrary, Gen, Properties } +import org.scalacheck.Prop._ + +object HttpStringStoreProperties extends Properties("HttpStringStore") with CloseableCleanup[ListeningServer] { + def validPairs: Gen[List[(String, Option[String])]] = + NonEmpty.Pairing.alphaStrs().map(_.map{ case (k, v) => ("/" + k, v) }) + + def baseTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) + (put: (Store[K, V], List[(K, Option[V])]) => Unit) = + forAll(validPairs) { (examples: List[(K, Option[V])]) => + put(store, examples) + examples.toMap.forall { case (k, optV) => + val res = Await.result(store.get(k)) + Equiv[Option[V]].equiv(res, optV) + } + } + + def putStoreTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) = + baseTest(store, validPairs) { (s, pairs) => + pairs.foreach { + case (k, v) => + Await.result(s.put((k, v))) + } + } + + def multiPutStoreTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) = + baseTest(store, validPairs) { (s, pairs) => + Await.result(FutureOps.mapCollect(s.multiPut(pairs.toMap))) + } + + def storeTest(store: Store[String, String]) = + putStoreTest(store, validPairs) && multiPutStoreTest(store, 
validPairs) + + val service = new Service[HttpRequest, HttpResponse] { + private val map = new ConcurrentHashMap[String, String]() + private val utf8 = Charset.forName("UTF-8") + + def apply(request: HttpRequest): Future[HttpResponse] = { + val response = request.getMethod match { + case HttpMethod.GET => + Option(map.get(request.getUri)).map{ v => + val resp = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.OK) + val content = ChannelBuffers.wrappedBuffer(v.getBytes(utf8)) + resp.setContent(content) + resp.headers.set(HttpHeaders.Names.CONTENT_LENGTH, content.readableBytes.toString) + resp + }.getOrElse { + val resp = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.NOT_FOUND) + resp.headers.set(HttpHeaders.Names.CONTENT_LENGTH, "0") + resp + } + case HttpMethod.DELETE => + map.remove(request.getUri) + val resp = new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.NO_CONTENT) + resp.headers.set(HttpHeaders.Names.CONTENT_LENGTH, "0") + resp + case HttpMethod.PUT => + val maybeOldV = Option(map.put(request.getUri, request.getContent.toString(utf8))) + val resp = new DefaultHttpResponse(request.getProtocolVersion, maybeOldV.map(_ => HttpResponseStatus.OK).getOrElse(HttpResponseStatus.CREATED)) + resp.setContent(request.getContent) + resp.headers.set(HttpHeaders.Names.CONTENT_LENGTH, request.getContent.readableBytes.toString) + resp + case _ => + new DefaultHttpResponse(request.getProtocolVersion, HttpResponseStatus.METHOD_NOT_ALLOWED) + } + Future.value(response) + } + } + + val server = Http.serve("localhost:0", service) + + val store = HttpStringStore(server.boundAddress.toString.substring(1)) // i dont know how else to convert boundAddress into something usable + + property("HttpStringStore test") = storeTest(store) + + override def closeable = server + + override def cleanup() = { + println("closing server") + super.cleanup() + } +} diff --git a/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaAvroSinkSpec.scala b/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaAvroSinkSpec.scala index 7fbe1e92..1996eecf 100644 --- a/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaAvroSinkSpec.scala +++ b/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaAvroSinkSpec.scala @@ -16,7 +16,7 @@ package com.twitter.storehaus.kafka -import org.specs2.mutable.Specification +import org.scalatest.WordSpec import kafka.DataTuple import java.util.Date import com.twitter.util.{Future, Await} @@ -29,11 +29,12 @@ import kafka.serializer.Decoder * @author Mansur Ashraf * @since 12/8/13 */ -class KafkaAvroSinkSpec extends Specification { +class KafkaAvroSinkSpec extends WordSpec { "KafkaAvroSink" should { - "put avro object on a topic" in new KafkaContext { - val topic = "avro-topic-" + random - val sink = KafkaAvroSink[DataTuple](Seq(broker), topic,executor) + "put avro object on a topic" ignore { + val context = KafkaContext() + val topic = "avro-topic-" + context.random + val sink = KafkaAvroSink[DataTuple](Seq(context.broker), topic, context.executor) .filter { case (k, v) => v.getValue % 2 == 0 } @@ -43,17 +44,17 @@ class KafkaAvroSinkSpec extends Specification { .map(sink.write()("key", _)) Await.result(Future.collect(futures)) - + import context._ try { - val stream = consumer.createMessageStreamsByFilter(new Whitelist(topic), 1,implicitly[Decoder[String]], implicitly[Decoder[DataTuple]])(0) + val stream = context.consumer.createMessageStreamsByFilter(new Whitelist(topic), 
1,implicitly[Decoder[String]], implicitly[Decoder[DataTuple]])(0) val iterator = stream.iterator() iterator.next().message.getValue % 2 === 0 iterator.next().message.getValue % 2 === 0 iterator.next().message.getValue % 2 === 0 iterator.next().message.getValue % 2 === 0 } catch { - case e: ConsumerTimeoutException => failure("test failed as consumer timed out without getting any msges") + case e: ConsumerTimeoutException => fail("test failed as consumer timed out without getting any msges") } - }.pendingUntilFixed + } } } diff --git a/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaContext.scala b/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaContext.scala index c6772914..8eaa773a 100644 --- a/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaContext.scala +++ b/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaContext.scala @@ -17,7 +17,6 @@ package com.twitter.storehaus.kafka -import org.specs2.specification.Scope import java.util.concurrent.Executors import com.twitter.concurrent.NamedPoolThreadFactory import java.util.{Properties, Random} @@ -34,7 +33,7 @@ import kafka.DataTuple * @author Mansur Ashraf * @since 12/7/13 */ -trait KafkaContext extends Scope { +case class KafkaContext() { val zK = "localhost:2181" val broker = "localhost:9092" diff --git a/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaStoreSpec.scala b/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaStoreSpec.scala index 609b1909..7073b608 100644 --- a/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaStoreSpec.scala +++ b/storehaus-kafka-08/src/test/scala/com/twitter/storehaus/kafka/KafkaStoreSpec.scala @@ -16,7 +16,7 @@ package com.twitter.storehaus.kafka -import org.specs2.mutable.Specification +import org.scalatest.WordSpec import kafka.serializer.Decoder import com.twitter.util.{Future, Await} import kafka.consumer.{ConsumerTimeoutException, Whitelist} @@ -27,44 +27,46 @@ import KafkaInjections._ * @author Mansur Ashraf * @since 12/7/13 */ -class KafkaStoreSpec extends Specification { +class KafkaStoreSpec extends WordSpec { - "Kafka store" should { - "put a value on a topic" in new KafkaContext { - val topic = "test-topic-" + random + "Kafka store" ignore { + "put a value on a topic" in { + val context = KafkaContext() + val topic = "test-topic-" + context.random - Await.result(store(topic).put("testKey", "testValue")) + Await.result(context.store(topic).put("testKey", "testValue")) try { - val stream = consumer.createMessageStreamsByFilter(new Whitelist(topic), 1, implicitly[Decoder[String]], implicitly[Decoder[String]])(0) + val stream = context.consumer.createMessageStreamsByFilter(new Whitelist(topic), 1, implicitly[Decoder[String]], implicitly[Decoder[String]])(0) val message = stream.iterator().next().message message === "testValue" } catch { - case e: ConsumerTimeoutException => failure("test failed as consumer timed out without getting any msges") + case e: ConsumerTimeoutException => fail("test failed as consumer timed out without getting any msges") } - } .pendingUntilFixed + } - "put multiple values on a topic" in new KafkaContext { - val multiput_topic = "multiput-test-topic-" + random + "put multiple values on a topic" in { + val context = KafkaContext() + val multiput_topic = "multiput-test-topic-" + context.random - private val map = Map( + val map = Map( "Key_1" -> "value_2", "Key_2" -> "value_4", "Key_3" -> "value_6" ) - private val multiputResponse = 
store(multiput_topic).multiPut(map) + val multiputResponse = context.store(multiput_topic).multiPut(map) Await.result(Future.collect(multiputResponse.values.toList)) try { - val stream = consumer.createMessageStreamsByFilter(new Whitelist(multiput_topic), 1, implicitly[Decoder[String]], implicitly[Decoder[String]])(0) + val stream = context.consumer.createMessageStreamsByFilter(new Whitelist(multiput_topic), 1, implicitly[Decoder[String]], implicitly[Decoder[String]])(0) val iterator = stream.iterator() iterator.next().message.contains("value_") iterator.next().message.contains("value_") iterator.next().message.contains("value_") } catch { - case e: ConsumerTimeoutException => failure("test failed as consumer timed out without getting any msges") + case e: ConsumerTimeoutException => fail("test failed as consumer timed out without getting any messages") } - }.pendingUntilFixed + } } } diff --git a/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaAvroSink.scala b/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaAvroSink.scala deleted file mode 100644 index 3b6de332..00000000 --- a/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaAvroSink.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2014 Twitter inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
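// A sketch, not part of the patch: the context-per-test pattern the hunks above
// migrate to. With KafkaContext now a plain case class rather than a specs2 Scope,
// each ScalaTest example constructs its own context value. The spec and topic
// names are illustrative; it assumes the KafkaContext shown above is on the classpath.
import org.scalatest.WordSpec

class ContextPerTestExample extends WordSpec {
  "a Kafka-backed test" should {
    "build fresh, isolated state per example" in {
      val context = KafkaContext()                   // one context per example
      val topic = "example-topic-" + context.random  // random suffix avoids topic collisions
      assert(topic.startsWith("example-topic-"))
    }
  }
}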
- */ - -package com.twitter.storehaus.kafka - -import org.apache.avro.specific.SpecificRecordBase -import com.twitter.bijection.avro.SpecificAvroCodecs -import com.twitter.bijection._ - -/** - * @author Mansur Ashraf - * @since 12/12/13 - */ -/** - * KafkaSink capable of sending Avro messages to a Kafka Topic - */ -@deprecated("use com.twitter.storehaus.kafka.KafkaStore with com.twitter.summingbird.storm.WritableStoreSink","0.9.0") -object KafkaAvroSink { - - import com.twitter.bijection.StringCodec.utf8 - - /** - * Creates KafkaSink that can sends message of form (String,SpecificRecord) to a Kafka Topic - * @param zkQuorum zookeeper quorum - * @param topic Kafka Topic - * @tparam V Avro Record - * @return KafkaSink[String,SpecificRecordBase] - */ - def apply[V <: SpecificRecordBase : Manifest](zkQuorum: Seq[String], topic: String) = { - implicit val inj = SpecificAvroCodecs[V] - lazy val sink = KafkaSink(zkQuorum: Seq[String], topic: String) - .convert[String, V](utf8.toFunction) - sink - } - - /** - * Creates KafkaSink that can sends message of form (T,SpecificRecord) to a Kafka Topic - * @param zkQuorum zookeeper quorum - * @param topic Kafka Topic - * @tparam V Avro Record - * @tparam K key - * @return KafkaSink[T,SpecificRecordBase] - */ - def apply[K: Codec, V <: SpecificRecordBase : Manifest](zkQuorum: Seq[String], topic: String) = { - implicit val inj = SpecificAvroCodecs[V] - lazy val sink = KafkaSink(zkQuorum: Seq[String], topic: String) - .convert[K, V](implicitly[Codec[K]].toFunction) - sink - } -} diff --git a/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaInjections.scala b/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaInjections.scala deleted file mode 100644 index 8de438ec..00000000 --- a/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaInjections.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2013 Twitter inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.twitter.storehaus.kafka - -import kafka.serializer.{Decoder, Encoder} -import kafka.message.Message -import com.twitter.bijection.{Codec, Injection} -import com.twitter.bijection.Conversion._ - -/** - * @author Mansur Ashraf - * @since 11/23/13 - */ -object KafkaInjections { - class ByteArrayEncoder extends FromInjectionEncoder[Array[Byte]] { - def injection: Injection[Array[Byte], Array[Byte]] = Injection.identity[Array[Byte]] - } - - trait FromInjectionDecoder[T] extends Decoder[T] { - def injection: Injection[T, Array[Byte]] - - def toEvent(m: Message) = injection.invert(m.payload.as[Array[Byte]]).get - } - - trait FromInjectionEncoder[T] extends Encoder[T] { - def injection: Injection[T, Array[Byte]] - - def toMessage(event: T): Message = new Message(injection(event)) - } - - def fromInjection[T: Codec]: (Encoder[T], Decoder[T]) = { - val result = new FromInjectionEncoder[T] with FromInjectionDecoder[T] { - def injection: Injection[T, Array[Byte]] = implicitly[Codec[T]] - } - (result, result) - } - - implicit def injectionEncoder[T: Codec] = fromInjection[T]._1 - - implicit def injectionDecoder[T: Codec] = fromInjection[T]._2 - -} diff --git a/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaSink.scala b/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaSink.scala deleted file mode 100644 index 2966b8ef..00000000 --- a/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaSink.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright 2014 Twitter inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
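// A sketch, not part of the patch: the Injection-to-codec bridge the deleted
// KafkaInjections above provided (its storehaus-kafka-08 counterpart is still
// imported by the specs earlier in this patch). fromInjection derives a Kafka
// Encoder/Decoder pair from any bijection Codec; the wrapper object below is
// illustrative, the helper name is the one in the removed file.
import com.twitter.bijection.Codec
import kafka.serializer.{Decoder, Encoder}

object InjectionCodecSketch {
  // materialize both directions of the codec for a payload type T
  def codecsFor[T: Codec]: (Encoder[T], Decoder[T]) =
    KafkaInjections.fromInjection[T]
}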
- */ - -package com.twitter.storehaus.kafka - -import com.twitter.util.Future -import KafkaSink.Dispatcher -import com.twitter.bijection.Injection -import scala.Array -import java.util.concurrent.{Executors, ExecutorService} -import com.twitter.concurrent.NamedPoolThreadFactory -import kafka.serializer.Encoder -import com.twitter.storehaus.kafka.KafkaInjections.ByteArrayEncoder - -/** - * Kafka Sink that can be used with SummingBird to sink messages to a Kafka Queue - * @author Mansur Ashraf - * @since 11/22/13 - */ -@deprecated("use com.twitter.storehaus.kafka.KafkaStore with com.twitter.summingbird.storm.WritableStoreSink") -class KafkaSink[K, V](dispatcher: Dispatcher[K, V]) extends Serializable { - /** - * Function that satisfies Storm#Sink {@see SummingBird-Storm} - * @return () => (K,V) => Future[Unit] - */ - def write: () => Dispatcher[K, V] = () => dispatcher - - /** - * Converts KafkaSink[k,V] to KafkaSink[k1,V1] - * @param kfn function that converts K1 to K - * @param inj injection from V1 to V - * @tparam K1 new Store Key - * @tparam V1 new Store Value - * @return KafkaSink[k1,V1] - */ - def convert[K1, V1](kfn: K1 => K)(implicit inj: Injection[V1, V]) = { - val fn: Dispatcher[K1, V1] = { - kv: (K1, V1) => dispatcher(compose(kfn, inj)(kv)) - } - new KafkaSink[K1, V1](fn) - } - - /** - * Filter all the messages that do not satisfy the given predicate - * @param fn predicate - * @return KafkaSink - */ - def filter(fn: ((K, V)) => Boolean) = { - val f: Dispatcher[K, V] = { - kv: (K, V) => - if (fn(kv)) dispatcher(kv) - else Future.Unit - } - new KafkaSink[K, V](f) - } - - private def compose[K1, V1](kfn: K1 => K, inj: Injection[V1, V]): ((K1, V1)) => ((K, V)) = { - case (k: K1, v: V1) => (kfn(k), inj(v)) - } -} - -object KafkaSink { - - private lazy val defaultExecutor = Executors.newCachedThreadPool(new NamedPoolThreadFactory("KafkaSinkUnboundedFuturePool")) - - type Dispatcher[K, V] = ((K, V)) => Future[Unit] - - /** - * Creates KafkaSink by wrapping KafkaStore - * @param store KafkaStore - * @tparam K key - * @tparam V value - * @return KafkaSink - */ - def apply[K, V](store: KafkaStore[K, V]): KafkaSink[K, V] = { - lazy val sink = new KafkaSink[K, V](store.put) - sink - } - - /** - * Returns KafkaSink[K,V] - * @param zkQuorum zookeeper quorum - * @param topic kafka topic. - * @tparam K key - * @tparam V value - * @return KafkaSink[K,V] - */ - def apply[K, V, E <: Encoder[V] : Manifest](zkQuorum: Seq[String], topic: String, executor: => ExecutorService): KafkaSink[K, V] = { - lazy val store = KafkaStore[K, V, E](zkQuorum, topic)(executor) - lazy val sink = apply[K, V](store) - sink - } - - /** - * Returns KafkaSink[Array[Byte], Array[Byte]]. This should be your default implementation - * in most scenarios - * @param zkQuorum zookeeper quorum - * @param topic kafka encoder - * @return KafkaSink[Array[Byte], Array[Byte]] - */ - def apply(zkQuorum: Seq[String], - topic: String): KafkaSink[Array[Byte], Array[Byte]] = { - apply[Array[Byte], Array[Byte], ByteArrayEncoder](zkQuorum, topic, defaultExecutor) - } -} - diff --git a/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaStore.scala b/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaStore.scala deleted file mode 100644 index f2215679..00000000 --- a/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/KafkaStore.scala +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2013 Twitter inc. 
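// A sketch, not part of the patch: how the KafkaSink API being deleted above was
// composed, per the signatures in the removed file. convert re-types the sink via
// a key function plus a value Injection, and filter drops tuples before dispatch.
// The zookeeper address and topic are illustrative.
import com.twitter.bijection.StringCodec.utf8

object KafkaSinkCompositionSketch {
  val rawSink: KafkaSink[Array[Byte], Array[Byte]] =
    KafkaSink(Seq("localhost:2181"), "example-topic")

  val evenLongSink: KafkaSink[String, Long] =
    rawSink
      .convert[String, Long](utf8.toFunction) // String keys, Long values over the same sink
      .filter { case (_, v) => v % 2 == 0 }   // only even values reach the dispatcher
}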
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.twitter.storehaus.kafka - -import com.twitter.storehaus.WritableStore -import com.twitter.util.{Time, FuturePool, Future} -import java.util.Properties -import kafka.producer.{ProducerData, Producer, ProducerConfig} -import java.util.concurrent.ExecutorService -import com.twitter.concurrent.AsyncMutex -import kafka.serializer.Encoder - - -/** - * Store capable of writing to a Kafka Topic. - * @author Mansur Ashraf - * @since 11/22/13 - */ -class KafkaStore[K, V](topic: String, props: Properties)(executor: => ExecutorService) extends WritableStore[K, V] with Serializable { - private lazy val producerConfig = new ProducerConfig(props) - private lazy val producer = new Producer[K, V](producerConfig) - private lazy val futurePool = FuturePool(executor) - private[this] lazy val mutex = new AsyncMutex - - /** - * Puts a key/value pair on a Kafka Topic using kafka.producer.AyncProducer and does not block thread - * @param kv (key,value) - * @return Future.unit - */ - override def put(kv: (K, V)): Future[Unit] = mutex.acquire().flatMap { - p => - futurePool { - val (key, value) = kv - producer.send(new ProducerData[K, V](topic, key, List(value))) - } ensure { - p.release() - } - } - - override def multiPut[K1 <: K](kvs: Map[K1, V]): Map[K1, Future[Unit]] = { - val future = mutex.acquire().flatMap { - p => futurePool { - val batch = kvs.map { - case (k, v) => new ProducerData[K, V](topic, k, List(v)) - }.toList - producer.send(batch: _*) - - } ensure { - p.release() - } - } - kvs.mapValues(v => future) - } - - /** Close this store and release any resources. - * It is undefined what happens on get/multiGet after close - */ - override def close(time: Time): Future[Unit] = futurePool { - producer.close() - } -} - -object KafkaStore { - - /** - * Creates an instance of Kafka store based on given properties. - * @param topic Kafka topic. - * @param props Kafka properties { @see http://kafka.apache.org/07/configuration.html}. - * @tparam K Key - * @tparam V Value - * @return Kafka Store - */ - def apply[K, V](topic: String, props: Properties)(executor: => ExecutorService) = new KafkaStore[K, V](topic, props)(executor) - - /** - * Creates a Kafka store. - * @param zkQuorum zookeeper quorum. - * @param topic Kafka topic. 
- * @tparam K Key - * @tparam V Value - * @return Kafka Store - */ - def apply[K, V, E <: Encoder[V] : Manifest](zkQuorum: Seq[String], - topic: String)(executor: => ExecutorService) = new KafkaStore[K, V](topic, createProp[V,E](zkQuorum))(executor) - - - private def createProp[V, E <: Encoder[V] : Manifest](zkQuorum: Seq[String]): Properties = { - val prop = new Properties() - prop.put("serializer.class", implicitly[Manifest[E]].erasure.getName) - prop.put("zk.connect", zkQuorum.mkString(",")) - prop - } -} \ No newline at end of file diff --git a/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/package.scala b/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/package.scala deleted file mode 100644 index c27d33f1..00000000 --- a/storehaus-kafka/src/main/scala/com/twitter/storehaus/kafka/package.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2013 Twitter inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.twitter.storehaus - -import com.twitter.storehaus.kafka.KafkaSink.Dispatcher - -/** - * @author Mansur Ashraf - * @since 11/23/13 - */ -package object kafka { - implicit def dispatch[K, V](sink: KafkaSink[K, V]): () => Dispatcher[K, V] = sink.write -} diff --git a/storehaus-kafka/src/test/java/kafka/DataTuple.java b/storehaus-kafka/src/test/java/kafka/DataTuple.java deleted file mode 100644 index b0ac7142..00000000 --- a/storehaus-kafka/src/test/java/kafka/DataTuple.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Copyright 2013 Twitter inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Autogenerated by Avro - * - * DO NOT EDIT DIRECTLY - */ -package kafka; -@SuppressWarnings("all") -public class DataTuple extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"DataTuple\",\"namespace\":\"kafka\",\"fields\":[{\"name\":\"value\",\"type\":\"long\"},{\"name\":\"key\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"long\"}]}"); - @Deprecated public long value; - @Deprecated public java.lang.CharSequence key; - @Deprecated public long timestamp; - - /** - * Default constructor. - */ - public DataTuple() {} - - /** - * All-args constructor. 
- */ - public DataTuple(java.lang.Long value, java.lang.CharSequence key, java.lang.Long timestamp) { - this.value = value; - this.key = key; - this.timestamp = timestamp; - } - - public org.apache.avro.Schema getSchema() { return SCHEMA$; } - // Used by DatumWriter. Applications should not call. - public java.lang.Object get(int field$) { - switch (field$) { - case 0: return value; - case 1: return key; - case 2: return timestamp; - default: throw new org.apache.avro.AvroRuntimeException("Bad index"); - } - } - // Used by DatumReader. Applications should not call. - @SuppressWarnings(value="unchecked") - public void put(int field$, java.lang.Object value$) { - switch (field$) { - case 0: value = (java.lang.Long)value$; break; - case 1: key = (java.lang.CharSequence)value$; break; - case 2: timestamp = (java.lang.Long)value$; break; - default: throw new org.apache.avro.AvroRuntimeException("Bad index"); - } - } - - /** - * Gets the value of the 'value' field. - */ - public java.lang.Long getValue() { - return value; - } - - /** - * Sets the value of the 'value' field. - * @param value the value to set. - */ - public void setValue(java.lang.Long value) { - this.value = value; - } - - /** - * Gets the value of the 'key' field. - */ - public java.lang.CharSequence getKey() { - return key; - } - - /** - * Sets the value of the 'key' field. - * @param value the value to set. - */ - public void setKey(java.lang.CharSequence value) { - this.key = value; - } - - /** - * Gets the value of the 'timestamp' field. - */ - public java.lang.Long getTimestamp() { - return timestamp; - } - - /** - * Sets the value of the 'timestamp' field. - * @param value the value to set. - */ - public void setTimestamp(java.lang.Long value) { - this.timestamp = value; - } - - /** Creates a new DataTuple RecordBuilder */ - public static kafka.DataTuple.Builder newBuilder() { - return new kafka.DataTuple.Builder(); - } - - /** Creates a new DataTuple RecordBuilder by copying an existing Builder */ - public static kafka.DataTuple.Builder newBuilder(kafka.DataTuple.Builder other) { - return new kafka.DataTuple.Builder(other); - } - - /** Creates a new DataTuple RecordBuilder by copying an existing DataTuple instance */ - public static kafka.DataTuple.Builder newBuilder(kafka.DataTuple other) { - return new kafka.DataTuple.Builder(other); - } - - /** - * RecordBuilder for DataTuple instances. 
- */ - public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase - implements org.apache.avro.data.RecordBuilder { - - private long value; - private java.lang.CharSequence key; - private long timestamp; - - /** Creates a new Builder */ - private Builder() { - super(kafka.DataTuple.SCHEMA$); - } - - /** Creates a Builder by copying an existing Builder */ - private Builder(kafka.DataTuple.Builder other) { - super(other); - } - - /** Creates a Builder by copying an existing DataTuple instance */ - private Builder(kafka.DataTuple other) { - super(kafka.DataTuple.SCHEMA$); - if (isValidValue(fields()[0], other.value)) { - this.value = (java.lang.Long) data().deepCopy(fields()[0].schema(), other.value); - fieldSetFlags()[0] = true; - } - if (isValidValue(fields()[1], other.key)) { - this.key = (java.lang.CharSequence) data().deepCopy(fields()[1].schema(), other.key); - fieldSetFlags()[1] = true; - } - if (isValidValue(fields()[2], other.timestamp)) { - this.timestamp = (java.lang.Long) data().deepCopy(fields()[2].schema(), other.timestamp); - fieldSetFlags()[2] = true; - } - } - - /** Gets the value of the 'value' field */ - public java.lang.Long getValue() { - return value; - } - - /** Sets the value of the 'value' field */ - public kafka.DataTuple.Builder setValue(long value) { - validate(fields()[0], value); - this.value = value; - fieldSetFlags()[0] = true; - return this; - } - - /** Checks whether the 'value' field has been set */ - public boolean hasValue() { - return fieldSetFlags()[0]; - } - - /** Clears the value of the 'value' field */ - public kafka.DataTuple.Builder clearValue() { - fieldSetFlags()[0] = false; - return this; - } - - /** Gets the value of the 'key' field */ - public java.lang.CharSequence getKey() { - return key; - } - - /** Sets the value of the 'key' field */ - public kafka.DataTuple.Builder setKey(java.lang.CharSequence value) { - validate(fields()[1], value); - this.key = value; - fieldSetFlags()[1] = true; - return this; - } - - /** Checks whether the 'key' field has been set */ - public boolean hasKey() { - return fieldSetFlags()[1]; - } - - /** Clears the value of the 'key' field */ - public kafka.DataTuple.Builder clearKey() { - key = null; - fieldSetFlags()[1] = false; - return this; - } - - /** Gets the value of the 'timestamp' field */ - public java.lang.Long getTimestamp() { - return timestamp; - } - - /** Sets the value of the 'timestamp' field */ - public kafka.DataTuple.Builder setTimestamp(long value) { - validate(fields()[2], value); - this.timestamp = value; - fieldSetFlags()[2] = true; - return this; - } - - /** Checks whether the 'timestamp' field has been set */ - public boolean hasTimestamp() { - return fieldSetFlags()[2]; - } - - /** Clears the value of the 'timestamp' field */ - public kafka.DataTuple.Builder clearTimestamp() { - fieldSetFlags()[2] = false; - return this; - } - - @Override - public DataTuple build() { - try { - DataTuple record = new DataTuple(); - record.value = fieldSetFlags()[0] ? this.value : (java.lang.Long) defaultValue(fields()[0]); - record.key = fieldSetFlags()[1] ? this.key : (java.lang.CharSequence) defaultValue(fields()[1]); - record.timestamp = fieldSetFlags()[2] ? 
this.timestamp : (java.lang.Long) defaultValue(fields()[2]); - return record; - } catch (Exception e) { - throw new org.apache.avro.AvroRuntimeException(e); - } - } - } -} diff --git a/storehaus-kafka/src/test/resources/log4j.xml b/storehaus-kafka/src/test/resources/log4j.xml deleted file mode 100644 index b1ffb0a2..00000000 --- a/storehaus-kafka/src/test/resources/log4j.xml +++ /dev/null @@ -1,32 +0,0 @@ - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/storehaus-kafka/src/test/scala/com/twitter/storehaus/kafka/KafkaAvroSinkSpec.scala b/storehaus-kafka/src/test/scala/com/twitter/storehaus/kafka/KafkaAvroSinkSpec.scala deleted file mode 100644 index e2ed6ccc..00000000 --- a/storehaus-kafka/src/test/scala/com/twitter/storehaus/kafka/KafkaAvroSinkSpec.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2013 Twitter inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.twitter.storehaus.kafka - -import org.specs2.mutable.Specification -import kafka.DataTuple -import java.util.Date -import com.twitter.util.{Future, Await} -import kafka.consumer.{ConsumerTimeoutException, Whitelist} -import KafkaInjections._ -import kafka.serializer.Decoder - -/** - * Integration Test! Remove .pendingUntilFixed if testing against a Kafka Cluster - * @author Mansur Ashraf - * @since 12/8/13 - */ -class KafkaAvroSinkSpec extends Specification { - "KafkaAvroSink" should { - "put avro object on a topic" in new KafkaContext { - val topic = "avro-topic-" + random - val sink = KafkaAvroSink[DataTuple](Seq(zK), topic) - .filter { - case (k, v) => v.getValue % 2 == 0 - } - - val futures = (1 to 10) - .map(new DataTuple(_, "key", new Date().getTime)) - .map(sink.write()("key", _)) - - Await.result(Future.collect(futures)) - - try { - val stream = consumer.createMessageStreamsByFilter(new Whitelist(topic), 1, implicitly[Decoder[DataTuple]])(0) - val iterator = stream.iterator() - iterator.next().message.getValue % 2 === 0 - iterator.next().message.getValue % 2 === 0 - iterator.next().message.getValue % 2 === 0 - iterator.next().message.getValue % 2 === 0 - !iterator.hasNext() - } catch { - case e: ConsumerTimeoutException => failure("test failed as consumer timed out without getting any msges") - } - }.pendingUntilFixed - } -} diff --git a/storehaus-kafka/src/test/scala/com/twitter/storehaus/kafka/KafkaContext.scala b/storehaus-kafka/src/test/scala/com/twitter/storehaus/kafka/KafkaContext.scala deleted file mode 100644 index 3840eb84..00000000 --- a/storehaus-kafka/src/test/scala/com/twitter/storehaus/kafka/KafkaContext.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright 2013 Twitter inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
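// A sketch, not part of the patch: assembling the Avro-generated DataTuple removed
// earlier in this patch. Generated records are normally built through the fluent
// Builder shown in the deleted Java rather than the all-args constructor; written
// in Scala for consistency, assuming the generated class is on the test classpath.
import java.util.Date
import kafka.DataTuple

object DataTupleBuilderSketch {
  val tuple: DataTuple = DataTuple.newBuilder()
    .setValue(2L)                      // long 'value' field (checked by the sink filter)
    .setKey("key")                     // CharSequence 'key' field
    .setTimestamp(new Date().getTime)  // long 'timestamp' field
    .build()
}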
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package com.twitter.storehaus.kafka - -import org.specs2.specification.Scope -import java.util.concurrent.Executors -import com.twitter.concurrent.NamedPoolThreadFactory -import java.util.{Properties, Random} -import kafka.serializer.{Decoder, StringEncoder} -import kafka.consumer.{Consumer, ConsumerConfig} -import kafka.message.Message -import com.twitter.bijection.Injection -import java.nio.ByteBuffer -import org.apache.avro.specific.SpecificRecordBase -import com.twitter.bijection.avro.SpecificAvroCodecs -import kafka.DataTuple - -/** - * @author Mansur Ashraf - * @since 12/7/13 - */ -trait KafkaContext extends Scope { - - val zK = "localhost:2181" - lazy val executor = Executors.newCachedThreadPool(new NamedPoolThreadFactory("KafkaTestPool")) - implicit val dataTupleInj= SpecificAvroCodecs[DataTuple] - - def store(topic: String) = KafkaStore[String, String,StringEncoder](Seq(zK), topic)(executor) - - def sink(topic: String) = KafkaSink(Seq(zK), topic) - - def random = new Random().nextInt(100000) - - //Consumer props - val props = new Properties() - props.put("groupid", "consumer-" + random) - props.put("socket.buffersize", (2 * 1024 * 1024).toString) - props.put("fetch.size", (1024 * 1024).toString) - props.put("auto.commit", "true") - props.put("autocommit.interval.ms", (10 * 1000).toString) - props.put("autooffset.reset", "smallest") - props.put("zk.connect", zK) - props.put("consumer.timeout.ms", (2 * 60 * 1000).toString) - val config = new ConsumerConfig(props) - lazy val consumer = Consumer.create(config) -} - diff --git a/storehaus-kafka/src/test/scala/com/twitter/storehaus/kafka/KafkaSinkSpec.scala b/storehaus-kafka/src/test/scala/com/twitter/storehaus/kafka/KafkaSinkSpec.scala deleted file mode 100644 index 7856aabd..00000000 --- a/storehaus-kafka/src/test/scala/com/twitter/storehaus/kafka/KafkaSinkSpec.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2013 Twitter inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.twitter.storehaus.kafka - -import org.specs2.mutable.Specification -import com.twitter.bijection.StringCodec.utf8 -import com.twitter.util.{Future, Await} -import kafka.consumer.{ConsumerTimeoutException, Whitelist} -import kafka.serializer.Decoder -import KafkaInjections._ - -/** - * Integration Test! 
Remove .pendingUntilFixed if testing against a Kafka Cluster - * @author Mansur Ashraf - * @since 12/7/13 - */ -class KafkaSinkSpec extends Specification { - - "Kafka Sink" should { - - "be able to convert and sink value" in new KafkaContext { - val topic = "long_topic-" + random - val longSink = sink(topic).convert[String, Long](utf8.toFunction) - Await.result(longSink.write()("1", 1L)) - try { - val stream = consumer.createMessageStreamsByFilter(new Whitelist(topic), 1,implicitly[Decoder[Long]])(0) - stream.iterator().next().message === 1L - } catch { - case e: ConsumerTimeoutException => failure("test failed as consumer timed out without getting any msges") - } - } .pendingUntilFixed - - "be able to filter value" in new KafkaContext { - val topic = "filter_topic-" + random - val filteredSink = sink(topic) - .convert[String, Long](utf8.toFunction) - .filter(_._2 % 2 == 0) //only send even values - - val futures = Future.collect((1 to 10).map(v => filteredSink()("key", v))) - Await.result(futures) - try { - val stream = consumer.createMessageStreamsByFilter(new Whitelist(topic), 1, implicitly[Decoder[Long]])(0) - val iterator = stream.iterator() - iterator.next().message % 2 === 0 - iterator.next().message % 2 === 0 - iterator.next().message % 2 === 0 - iterator.next().message % 2 === 0 - !iterator.hasNext() - } catch { - case e: ConsumerTimeoutException => failure("test failed as consumer timed out without getting any msges") - } - } .pendingUntilFixed - } -} diff --git a/storehaus-kafka/src/test/scala/com/twitter/storehaus/kafka/KafkaStoreSpec.scala b/storehaus-kafka/src/test/scala/com/twitter/storehaus/kafka/KafkaStoreSpec.scala deleted file mode 100644 index b97f0e2f..00000000 --- a/storehaus-kafka/src/test/scala/com/twitter/storehaus/kafka/KafkaStoreSpec.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2013 Twitter inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.twitter.storehaus.kafka - -import org.specs2.mutable.Specification -import kafka.serializer.StringDecoder -import com.twitter.util.{Future, Await} -import kafka.consumer.{ConsumerTimeoutException, Whitelist} -import com.twitter.storehaus.FutureOps - -/** - * Integration Test! 
Remove .pendingUntilFixed if testing against a Kafka Cluster - * @author Mansur Ashraf - * @since 12/7/13 - */ -class KafkaStoreSpec extends Specification { - - "Kafka store" should { - "put a value on a topic" in new KafkaContext { - val topic = "test-topic-" + random - - Await.result(store(topic).put("testKey", "testValue")) - try { - val stream = consumer.createMessageStreamsByFilter(new Whitelist(topic), 1, new StringDecoder)(0) - stream.iterator().next().message === "testValue" - } - catch { - case e: ConsumerTimeoutException => failure("test failed as consumer timed out without getting any msges") - } - }.pendingUntilFixed - - "put multiple values on a topic" in new KafkaContext { - val multiput_topic = "multiput-test-topic-" + random - - private val map = Map( - "Key_1" -> "value_1", - "Key_2" -> "value_2", - "Key_3" -> "value_3" - ) - - private val multiputResponse = store(multiput_topic).multiPut(map) - Await.result(Future.collect(multiputResponse.values.toList)) - try { - val stream = consumer.createMessageStreamsByFilter(new Whitelist(multiput_topic), 1, new StringDecoder)(0) - val iterator = stream.iterator() - iterator.next().message === "value_1" - iterator.next().message === "value_2" - iterator.next().message === "value_3" - !iterator.hasNext() - } - catch { - case e: ConsumerTimeoutException => failure("test failed as consumer timed out without getting any msges") - } - }.pendingUntilFixed - } -} diff --git a/storehaus-leveldb/src/main/scala/com.twitter.storehaus.leveldb/LevelDBStore.scala b/storehaus-leveldb/src/main/scala/com.twitter.storehaus.leveldb/LevelDBStore.scala new file mode 100644 index 00000000..4f9ea5db --- /dev/null +++ b/storehaus-leveldb/src/main/scala/com.twitter.storehaus.leveldb/LevelDBStore.scala @@ -0,0 +1,107 @@ +/* + * Copyright 2015 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.leveldb + +import java.io.File +import java.util.concurrent.Executors + +import com.twitter.storehaus.Store +import com.twitter.util.{FuturePool, Time, Future} +import org.iq80.leveldb._ +import org.fusesource.leveldbjni.JniDBFactory._ + +/** + * Store for interacting with a LevelDB database. + * Example usage: + * {{{ + * import java.io.File + * import org.iq80.leveldb.Options + * import com.twitter.storehaus.leveldb.LevelDBStore + * + * val dir = new File("/some/path/myleveldb-directory") + * dir.mkdirs() + * val options = { + * val opt = new Options + * opt.createIfMissing(true) + * opt.blockSize(8192) + * opt + * } + * val store = new LevelDBStore(dir, options, 4) + * }}} + * @constructor Create a new LevelDB store. + * @param dir Directory where the database is/will be stored. + * @param options Different options for the database, see: https://github.com/google/leveldb/blob/master/util/options.cc + * @param numThreads Number of threads in the pool of threads interacting with the db. 
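// A sketch, not part of the patch: a put/get round trip against the byte-array
// API this new store declares. Directory, key, and value are illustrative; it
// assumes a writable temp dir and the leveldbjni factory the store uses.
import java.io.File
import com.twitter.util.Await
import org.iq80.leveldb.Options

object LevelDBRoundTripSketch {
  val dir = new File(System.getProperty("java.io.tmpdir"), "leveldb-sketch")
  dir.mkdirs()
  val opts = new Options
  opts.createIfMissing(true)

  val store = new LevelDBStore(dir, opts, numThreads = 2)
  Await.result(store.put(("k".getBytes("UTF-8"), Some("v".getBytes("UTF-8")))))
  val fetched: Option[Array[Byte]] = Await.result(store.get("k".getBytes("UTF-8"))) // Some(bytes) on a hit
}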
+ * @author Ben Fradet + * @since 10/03/15 + */ +class LevelDBStore(val dir: File, + val options: Options, + val numThreads: Int = Runtime.getRuntime.availableProcessors) + extends Store[Array[Byte], Array[Byte]] { + + private lazy val db = factory.open(dir, options) + private val futurePool = FuturePool(Executors.newFixedThreadPool(numThreads)) + + /** + * Get a single key from the store. + * Prefer multiGet if you are getting more than one key at a time. + */ + override def get(k: Array[Byte]): Future[Option[Array[Byte]]] = + futurePool { Option(db.get(k)) } + + /** + * Replace a value. + * Delete is the same as put((k,None)). + */ + override def put(kv: (Array[Byte], Option[Array[Byte]])): Future[Unit] = { + require(kv._1 != null) + kv match { + case (k, Some(v)) => futurePool { + db.put(k, v) + } + case (k, None) => futurePool { + db.delete(k) + } + } + } + + /** + * Replace a set of keys at one time. + */ + override def multiPut[K1 <: Array[Byte]](kvs: Map[K1, Option[Array[Byte]]]) + : Map[K1, Future[Unit]] = { + val future = futurePool { + val batch = db.createWriteBatch() + kvs.foreach { + case (k, Some(v)) => batch.put(k, v) + case (k, None) => batch.delete(k) + } + db.write(batch) + batch.close() + } + kvs.mapValues(_ => future) + } + + /** + * Close this store and release any resources. + * It is undefined what happens on get/multiGet after close + */ + override def close(time: Time): Future[Unit] = { + futurePool { db.close() }.flatMap { _ => super.close(time) } + } +} diff --git a/storehaus-leveldb/src/test/scala/com/twitter/storehaus/leveldb/LevelDBStoreProperties.scala b/storehaus-leveldb/src/test/scala/com/twitter/storehaus/leveldb/LevelDBStoreProperties.scala new file mode 100644 index 00000000..189a3779 --- /dev/null +++ b/storehaus-leveldb/src/test/scala/com/twitter/storehaus/leveldb/LevelDBStoreProperties.scala @@ -0,0 +1,73 @@ +package com.twitter.storehaus.leveldb + +import java.io.File +import java.util + +import com.twitter.storehaus.Store +import com.twitter.storehaus.testing.generator.NonEmpty +import com.twitter.util.{Future, Await} +import org.iq80.leveldb.Options +import org.scalacheck.{Gen, Properties} +import org.scalacheck.Prop.forAll + +import scala.util.Random + +/** + * @author Ben Fradet + */ +object LevelDBStoreProperties extends Properties("LevelDBStore") { + + def putAndGetTest(store: Store[Array[Byte], Array[Byte]], + pairs: Gen[List[(Array[Byte], Option[Array[Byte]])]]) = + forAll(pairs) { examples: List[(Array[Byte], Option[Array[Byte]])] => + examples.forall { + case (k, v) => { + Await.result(store.put(k, v)) + val found = Await.result(store.get(k)) + found match { + case Some(a) => util.Arrays.equals(a, v.get) + case None => found == v + } + } + } + } + + def multiPutAndGetTest(store: Store[Array[Byte], Array[Byte]], + pairs: Gen[List[(Array[Byte], Option[Array[Byte]])]]) = + forAll(pairs) { examples: List[(Array[Byte], Option[Array[Byte]])] => + val examplesMap = examples.toMap + Await.result(Future.collect(store.multiPut(examplesMap).values.toList)) + val result = store.multiGet(examplesMap.keySet) + .map { case (key, v) => (key, Await.result(v)) } + + val stringifiedResults = stringifyMap(result) + val stringifiedExamples = stringifyMap(examplesMap) + + stringifiedResults == stringifiedExamples + } + + private def stringifyMap(map: Map[Array[Byte], Option[Array[Byte]]]) + :Map[String, Option[String]] = { + map.map { + case (k, Some(v)) => (new String(k, "UTF-8"), + Some(new String(v, "UTF-8"))) + case (k, None) => (new String(k, "UTF-8"), 
None) + } + } + + property("LevelDB[Array[Byte], Array[Byte]] single") = { + val dir = new File(System.getProperty("java.io.tmpdir"), + "leveldb-test-" + new Random().nextInt(Int.MaxValue)) + dir.mkdirs() + val store = new LevelDBStore(dir, new Options(), 2) + putAndGetTest(store, NonEmpty.Pairing.byteArrays()) + } + + property("LevelDB[Array[Byte], Array[Byte]] multi") = { + val dir = new File(System.getProperty("java.io.tmpdir"), + "leveldb-test-multi-" + new Random().nextInt(Int.MaxValue)) + dir.mkdirs() + val store = new LevelDBStore(dir, new Options(), 1) + multiPutAndGetTest(store, NonEmpty.Pairing.byteArrays()) + } +} diff --git a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheLongStore.scala b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheLongStore.scala index a9284c0e..35335f80 100644 --- a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheLongStore.scala +++ b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheLongStore.scala @@ -18,8 +18,10 @@ package com.twitter.storehaus.memcache import com.twitter.algebird.Semigroup import com.twitter.bijection.NumericInjections +import com.twitter.bijection.twitter_util.UtilBijections import com.twitter.util.{Duration, Future} -import com.twitter.finagle.memcached.Client +import com.twitter.finagle.memcachedx.Client +import com.twitter.io.Buf import com.twitter.storehaus.ConvertedStore import com.twitter.storehaus.algebra.MergeableStore import org.jboss.netty.buffer.ChannelBuffer @@ -33,11 +35,17 @@ import com.twitter.bijection.netty.ChannelBufferBijection * @author Doug Tangren */ object MemcacheLongStore { + import UtilBijections.Owned.byteArrayBufBijection + private implicit val cb2ary = ChannelBufferBijection // Long => String => ChannelBuffer <= String <= Long - private [memcache] implicit val LongInjection: Injection[Long, ChannelBuffer] = + private[memcache] implicit val LongChannelBuffer: Injection[Long, ChannelBuffer] = Injection.connect[Long, String, Array[Byte], ChannelBuffer] + // Long => String => Buf <= String <= Long + private[memcache] implicit val LongBuf: Injection[Long, Buf] = + Injection.connect[Long, String, Array[Byte], Buf] + def apply(client: Client, ttl: Duration = MemcacheStore.DEFAULT_TTL, flag: Int = MemcacheStore.DEFAULT_FLAG) = new MemcacheLongStore(MemcacheStore(client, ttl, flag)) } @@ -58,7 +66,7 @@ class MemcacheLongStore(underlying: MemcacheStore) case None => // memcache does not create on increment underlying .client - .add(k, v.as[ChannelBuffer]) + .add(k, v.as[Buf]) .flatMap { b => if(b) Future.value(None) else merge(kv) } } } diff --git a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStore.scala b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStore.scala index c258fa79..9a0676e0 100644 --- a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStore.scala +++ b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStore.scala @@ -16,15 +16,16 @@ package com.twitter.storehaus.memcache -import com.twitter.algebird.Monoid +import com.twitter.algebird.Semigroup import com.twitter.bijection.{ Bijection, Codec, Injection } import com.twitter.bijection.netty.Implicits._ import com.twitter.conversions.time._ import com.twitter.finagle.builder.ClientBuilder -import com.twitter.finagle.memcached.KetamaClientBuilder -import com.twitter.finagle.memcached.protocol.text.Memcached +import 
com.twitter.finagle.memcachedx.KetamaClientBuilder +import com.twitter.finagle.memcachedx.protocol.text.Memcached +import com.twitter.finagle.netty3.{BufChannelBuffer, ChannelBufferBuf} import com.twitter.util.{ Duration, Future, Time } -import com.twitter.finagle.memcached.{ GetResult, Client } +import com.twitter.finagle.memcachedx.{ GetResult, Client } import com.twitter.storehaus.{ FutureOps, Store, WithPutTtl } import com.twitter.storehaus.algebra.MergeableStore import org.jboss.netty.buffer.ChannelBuffer @@ -92,39 +93,57 @@ object MemcacheStore { /** * Returns a Memcache-backed MergeableStore[K, V] that uses * implicitly-supplied Injection instances from K and V -> - * Array[Byte] to manage type conversion. The Monoid[V] is also + * Array[Byte] to manage type conversion. The Semigroup[V] is also * pulled in implicitly. */ - def mergeable[K: Codec, V: Codec: Monoid](client: Client, keyPrefix: String, + def mergeable[K: Codec, V: Codec: Semigroup](client: Client, keyPrefix: String, ttl: Duration = DEFAULT_TTL, flag: Int = DEFAULT_FLAG): MergeableStore[K, V] = MergeableStore.fromStore( MemcacheStore.typed(client, keyPrefix, ttl, flag) ) + + /** + * Returns a Memcache-backed MergeableStore[K, V] that uses + * compare-and-swap with retries. It supports multiple concurrent + * writes to the same key and is useful when one thread/node does not + * own a key space. + */ + def mergeableWithCAS[K, V: Semigroup](client: Client, retries: Int, + ttl: Duration = DEFAULT_TTL, flag: Int = DEFAULT_FLAG)(kfn: K => String) + (implicit inj: Injection[V, ChannelBuffer]): MergeableStore[K, V] = + MergeableMemcacheStore[K, V](client, ttl, flag, retries)(kfn)(inj, implicitly[Semigroup[V]]) } -class MemcacheStore(val client: Client, ttl: Duration, flag: Int) +class MemcacheStore(val client: Client, val ttl: Duration, val flag: Int) extends Store[String, ChannelBuffer] with WithPutTtl[String, ChannelBuffer, MemcacheStore] { override def withPutTtl(ttl: Duration) = new MemcacheStore(client, ttl, flag) - override def get(k: String): Future[Option[ChannelBuffer]] = client.get(k) + override def get(k: String): Future[Option[ChannelBuffer]] = + client.get(k).flatMap { + case None => Future.None + case Some(buf) => Future.value(Some(ChannelBufferBuf.Owned.extract(buf))) + } override def multiGet[K1 <: String](ks: Set[K1]): Map[K1, Future[Option[ChannelBuffer]]] = { val memcacheResult: Future[Map[String, Future[Option[ChannelBuffer]]]] = client.getResult(ks).map { result => - result.hits.mapValues { v => Future.value(Some(v.value)) } ++ - result.failures.mapValues { Future.exception(_) } + result.hits.mapValues { v => + Future.value(Some(BufChannelBuffer(v.value))) + } ++ result.failures.mapValues(Future.exception) } FutureOps.liftValues(ks, memcacheResult, { (k: K1) => Future.value(Future.None) }) .mapValues { _.flatten } } - protected def set(k: String, v: ChannelBuffer) = client.set(k, flag, ttl.fromNow, v) + protected def set(k: String, v: ChannelBuffer) = + client.set(k, flag, ttl.fromNow, ChannelBufferBuf.Owned(v)) override def put(kv: (String, Option[ChannelBuffer])): Future[Unit] = kv match { - case (key, Some(value)) => client.set(key, flag, ttl.fromNow, value) + case (key, Some(value)) => + client.set(key, flag, ttl.fromNow, ChannelBufferBuf.Owned(value)) case (key, None) => client.delete(key).unit } diff --git a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MergeableMemcacheStore.scala b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MergeableMemcacheStore.scala index 
5c4416a0..ff730ae2 100644 --- a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MergeableMemcacheStore.scala +++ b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MergeableMemcacheStore.scala @@ -18,7 +18,9 @@ package com.twitter.storehaus.memcache import com.twitter.algebird.Semigroup import com.twitter.bijection.Injection -import com.twitter.finagle.memcached.Client +import com.twitter.finagle.memcachedx.Client +import com.twitter.finagle.netty3.{BufChannelBuffer, ChannelBufferBuf} +import com.twitter.io.Buf import com.twitter.storehaus.ConvertedStore import com.twitter.storehaus.algebra.MergeableStore import com.twitter.util.{ Duration, Future } @@ -34,10 +36,16 @@ object MergeableMemcacheStore { // this is to support multiple concurrent writers val MAX_RETRIES = 10 - def apply[V](client: Client, ttl: Duration = MemcacheStore.DEFAULT_TTL, flag: Int = MemcacheStore.DEFAULT_FLAG, + /** + * Returns a Memcache-backed MergeableStore[K, V] that uses + * compare-and-swap with retries. It supports multiple concurrent + * writes to the same key and is useful when one thread/node does not + * own a key space. + */ + def apply[K, V](client: Client, ttl: Duration = MemcacheStore.DEFAULT_TTL, flag: Int = MemcacheStore.DEFAULT_FLAG, maxRetries: Int = MAX_RETRIES) - (inj: Injection[V, ChannelBuffer], semigroup: Semigroup[V]) = - new MergeableMemcacheStore[V](MemcacheStore(client, ttl, flag), maxRetries)(inj, semigroup) + (kfn: K => String)(implicit inj: Injection[V, ChannelBuffer], semigroup: Semigroup[V]) = + new MergeableMemcacheStore[K, V](MemcacheStore(client, ttl, flag), maxRetries)(kfn)(inj, semigroup) } /** Returned when merge fails after a certain number of retries */ @@ -51,46 +59,60 @@ class MergeFailedException(val key: String) * see a performance hit if there are too many concurrent writes to a hot key. * The solution is to group by a hot key, and use only a single (or few) writers to that key. */ -class MergeableMemcacheStore[V](underlying: MemcacheStore, maxRetries: Int)(implicit inj: Injection[V, ChannelBuffer], +class MergeableMemcacheStore[K, V](underlying: MemcacheStore, maxRetries: Int)(kfn: K => String) + (implicit inj: Injection[V, ChannelBuffer], override val semigroup: Semigroup[V]) - extends ConvertedStore[String, String, ChannelBuffer, V](underlying)(identity) - with MergeableStore[String, V] { + extends ConvertedStore[String, K, ChannelBuffer, V](underlying)(kfn)(inj) + with MergeableStore[K, V] { // NOTE: we might want exponential backoff if there are a lot of retries. // use a timer to wait a random interval between [0,t), then [0,2t), then [0,4t), then [0,16t), etc... 
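// A sketch, not part of the patch: the randomized exponential backoff the comment
// above suggests for doMerge retries. Attempt n (n >= 1) would sleep a uniform
// random interval in [0, base * 2^(n-1)); nothing here is wired into the store.
import scala.util.Random

object CasRetryBackoffSketch {
  def backoffMillis(base: Long, attempt: Int, rng: Random = new Random): Long = {
    val ceiling = base << math.min(attempt - 1, 20) // base, 2*base, 4*base, ... (shift clamped)
    (rng.nextDouble() * ceiling).toLong             // uniform draw in [0, ceiling)
  }
}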
// retryable merge - protected def doMerge(kv: (String, V), currentRetry: Int) : Future[Option[V]] = + protected def doMerge(kv: (K, V), currentRetry: Int) : Future[Option[V]] = { + val key = kfn(kv._1) (currentRetry > maxRetries) match { case false => // use 'gets' api to obtain casunique token - underlying.client.gets(kv._1).flatMap { res : Option[(ChannelBuffer, ChannelBuffer)] => - res match { - case Some((cbValue, casunique)) => - inj.invert(cbValue) match { - case Success(v) => // attempt cas - val resV = semigroup.plus(v, kv._2) - underlying.client.cas(kv._1, inj.apply(resV), casunique).flatMap { success => - success.booleanValue match { - case true => Future.value(Some(v)) - case false => doMerge(kv, currentRetry + 1) // retry - } + underlying.client.gets(key).flatMap { + case Some((cbValue, casunique)) => + inj.invert(BufChannelBuffer(cbValue)) match { + case Success(v) => // attempt cas + val resV = semigroup.plus(v, kv._2) + val buf = ChannelBufferBuf.Owned(inj.apply(resV)) + underlying.client.cas( + key, + underlying.flag, + underlying.ttl.fromNow, + buf, + casunique + ).flatMap { success => + success.booleanValue match { + case true => Future.value(Some(v)) + case false => doMerge(kv, currentRetry + 1) // retry } - case Failure(ex) => Future.exception(ex) - } - // no pre-existing value, try to 'add' it - case None => - underlying.client.add(kv._1, inj.apply(kv._2)).flatMap { success => - success.booleanValue match { - case true => Future.None - case false => doMerge(kv, currentRetry + 1) // retry, next retry should call cas } + case Failure(ex) => Future.exception(ex) + } + // no pre-existing value, try to 'add' it + case None => + val buf = ChannelBufferBuf.Owned(inj.apply(kv._2)) + underlying.client.add( + key, + underlying.flag, + underlying.ttl.fromNow, + buf + ).flatMap { success => + success.booleanValue match { + case true => Future.None + case false => doMerge(kv, currentRetry + 1) // retry, next retry should call cas } - } + } } // no more retries - case true => Future.exception(new MergeFailedException(kv._1)) + case true => Future.exception(new MergeFailedException(key)) } + } - override def merge(kv: (String, V)): Future[Option[V]] = doMerge(kv, 1) + override def merge(kv: (K, V)): Future[Option[V]] = doMerge(kv, 1) } diff --git a/storehaus-memcache/src/test/scala/com/twitter/storehaus/memecache/MemcacheMergeableStoreProperties.scala b/storehaus-memcache/src/test/scala/com/twitter/storehaus/memcache/MemcacheMergeableStoreProperties.scala similarity index 82% rename from storehaus-memcache/src/test/scala/com/twitter/storehaus/memecache/MemcacheMergeableStoreProperties.scala rename to storehaus-memcache/src/test/scala/com/twitter/storehaus/memcache/MemcacheMergeableStoreProperties.scala index 85b4c66f..980df693 100644 --- a/storehaus-memcache/src/test/scala/com/twitter/storehaus/memecache/MemcacheMergeableStoreProperties.scala +++ b/storehaus-memcache/src/test/scala/com/twitter/storehaus/memcache/MemcacheMergeableStoreProperties.scala @@ -17,32 +17,31 @@ package com.twitter.storehaus.memcache import com.twitter.algebird.Semigroup -import com.twitter.bijection.{ Injection, NumericInjections } +import com.twitter.bijection.Injection import com.twitter.bijection.netty.ChannelBufferBijection -import com.twitter.finagle.memcached.Client +import com.twitter.finagle.memcachedx.Client import com.twitter.storehaus.testing.SelfAggregatingCloseableCleanup import com.twitter.storehaus.testing.generator.NonEmpty -import com.twitter.util.{Await, Future} +import 
com.twitter.util.Await import org.jboss.netty.buffer.ChannelBuffer -import org.scalacheck.Arbitrary import org.scalacheck.Gen import org.scalacheck.Properties import org.scalacheck.Prop.forAll /** Unit test using Long values */ object MergeableMemcacheStoreProperties extends Properties("MergeableMemcacheStore") - with SelfAggregatingCloseableCleanup[MergeableMemcacheStore[Long]] { + with SelfAggregatingCloseableCleanup[MergeableMemcacheStore[String, Long]] { - def put(s: MergeableMemcacheStore[Long], pairs: List[(String, Option[Long])]) = { + def put(s: MergeableMemcacheStore[String, Long], pairs: List[(String, Option[Long])]) = { pairs.foreach { case (k, v) => Await.result(s.put((k, v))) } } - def merge(s: MergeableMemcacheStore[Long], kvs: Map[String, Long]) = s.multiMerge(kvs) + def merge(s: MergeableMemcacheStore[String, Long], kvs: Map[String, Long]) = s.multiMerge(kvs) - def putAndGetStoreTest(store: MergeableMemcacheStore[Long], + def putAndGetStoreTest(store: MergeableMemcacheStore[String, Long], pairs: Gen[List[(String, Option[Long])]] = NonEmpty.Pairing.alphaStrNumerics[Long](10)) = forAll(pairs) { (examples: List[(String, Option[Long])]) => put(store, examples) @@ -52,7 +51,7 @@ object MergeableMemcacheStoreProperties extends Properties("MergeableMemcacheSto } } - def mergeStoreTest(store: MergeableMemcacheStore[Long], + def mergeStoreTest(store: MergeableMemcacheStore[String, Long], pairs: Gen[List[(String, Option[Long])]] = NonEmpty.Pairing.alphaStrNumerics[Long](10)) = forAll(pairs) { (examples: List[(String, Option[Long])]) => put(store, examples) @@ -84,7 +83,7 @@ object MergeableMemcacheStoreProperties extends Properties("MergeableMemcacheSto val client = Client("localhost:11211") val injection = Injection.connect[Long, String, Array[Byte], ChannelBuffer] val semigroup = implicitly[Semigroup[Long]] - val store = MergeableMemcacheStore[Long](client)(injection, semigroup) + val store = MergeableMemcacheStore[String, Long](client)(identity)(injection, semigroup) putAndGetStoreTest(store) && mergeStoreTest(store) } diff --git a/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala b/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala index d32d992d..98a9a825 100644 --- a/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala +++ b/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala @@ -16,7 +16,7 @@ package com.twitter.storehaus.mysql -import com.twitter.finagle.exp.mysql.{ Client, PreparedStatement, Result } +import com.twitter.finagle.exp.mysql.{ Client, Result, Parameter } import com.twitter.storehaus.FutureOps import com.twitter.storehaus.Store import com.twitter.util.{ Await, Future, Time } @@ -88,10 +88,10 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri // prepared statements to be reused across gets and puts // TODO: should this be non-blocking? this is part of object construction, so maybe not? 
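// A sketch, not part of the patch: the prepared-statement style this hunk migrates
// to. client.prepare(sql) now hands back a statement without blocking at
// construction, and each apply executes it with Parameter-wrapped arguments, as
// the hunk below shows. Table and column names here are illustrative.
import com.twitter.finagle.exp.mysql.{Client, Parameter, Result}
import com.twitter.util.Future

object PreparedStatementSketch {
  def insertPair(client: Client, key: Array[Byte], value: Array[Byte]): Future[Result] = {
    val stmt = client.prepare("INSERT INTO `kv_example` (`key`, `value`) VALUES (?, ?)")
    stmt(Parameter.wrap(key), Parameter.wrap(value)) // executes with these parameters
  }
}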
- protected val selectStmt = Await.result(client.prepare(SELECT_SQL)) - protected val insertStmt = Await.result(client.prepare(INSERT_SQL)) - protected val updateStmt = Await.result(client.prepare(UPDATE_SQL)) - protected val deleteStmt = Await.result(client.prepare(DELETE_SQL)) + protected val selectStmt = client.prepare(SELECT_SQL) + protected val insertStmt = client.prepare(INSERT_SQL) + protected val updateStmt = client.prepare(UPDATE_SQL) + protected val deleteStmt = client.prepare(DELETE_SQL) protected [mysql] def startTransaction : Future[Unit] = client.query(START_TXN_SQL).unit protected [mysql] def commitTransaction : Future[Unit] = client.query(COMMIT_TXN_SQL).unit @@ -101,40 +101,35 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri val insertSql = MULTI_INSERT_SQL_PREFIX + Stream.continually("(?, ?)").take(kvs.size).mkString(",") val insertParams = kvs.map { kv => List(MySqlStringInjection(kv._1).getBytes, MySqlStringInjection(kv._2).getBytes) - }.toSeq.flatten - client.prepareAndExecute(insertSql, insertParams:_*).map { case (ps, r) => - // close prepared statement on server - client.closeStatement(ps) - } + }.toSeq.flatten.map(Parameter.wrap[Array[Byte]]) + client.prepare(insertSql)(insertParams:_*) } protected [mysql] def executeMultiUpdate[K1 <: MySqlValue](kvs: Map[K1, MySqlValue]) = { val updateSql = MULTI_UPDATE_SQL_PREFIX + Stream.continually("WHEN ? THEN ?").take(kvs.size).mkString(" ") + MULTI_UPDATE_SQL_INFIX + Stream.continually("?").take(kvs.size).mkString("(", ",", ")") + val updateParams = kvs.map { kv => (MySqlStringInjection(kv._1).getBytes, MySqlStringInjection(kv._2).getBytes) } // params for "WHEN ? THEN ?" - val updateCaseParams = updateParams.map { kv => List(kv._1, kv._2) }.toSeq.flatten + val updateCaseParams: Seq[Parameter] = + updateParams.map { kv => List(kv._1, kv._2) }.toSeq.flatten.map(Parameter.wrap[Array[Byte]]) // params for "IN (?, ?, ?)" - val updateInParams = updateParams.map { kv => kv._1 }.toSeq - client.prepareAndExecute(updateSql, (updateCaseParams ++ updateInParams):_*).map { case (ps, r) => - // close prepared statement on server - client.closeStatement(ps) - } + val updateInParams = updateParams.map { kv => kv._1 }.toSeq.map(Parameter.wrap[Array[Byte]]) + client.prepare(updateSql)((updateCaseParams ++ updateInParams):_*) } override def get(k: MySqlValue): Future[Option[MySqlValue]] = { // finagle-mysql select() method lets you pass in a mapping function // to convert resultset into desired output format // we assume here the mysql client already has the dbname/schema selected - selectStmt.parameters = Array(MySqlStringInjection(k).getBytes) - val mysqlResult: Future[Seq[Option[MySqlValue]]] = client.select(selectStmt) { row => - row(vCol) match { case None => None; case Some(v) => Some(MySqlValue(v)) } - } - mysqlResult.map { case result => - result.lift(0).flatten.headOption - } + val mysqlResult = + selectStmt.select(MySqlStringInjection(k).getBytes) { row => + row(vCol).map { MySqlValue(_)} + } + + mysqlResult.map { result => result.lift(0).flatten.headOption } } override def multiGet[K1 <: MySqlValue](ks: Set[K1]): Map[K1, Future[Option[MySqlValue]]] = { @@ -142,13 +137,15 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri // build preparedstatement based on keyset size val placeholders = Stream.continually("?").take(ks.size).mkString("(", ",", ")") val selectSql = MULTI_SELECT_SQL_PREFIX + placeholders - val mysqlResult: 
@@ -142,13 +137,15 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri
     // build preparedstatement based on keyset size
     val placeholders = Stream.continually("?").take(ks.size).mkString("(", ",", ")")
     val selectSql = MULTI_SELECT_SQL_PREFIX + placeholders
-    val mysqlResult: Future[(PreparedStatement,Seq[(Option[MySqlValue], Option[MySqlValue])])] =
-      client.prepareAndSelect(selectSql, ks.map(key => MySqlStringInjection(key).getBytes).toSeq:_* ) { row =>
-        (row(kCol).map(MySqlValue(_)), row(vCol).map(MySqlValue(_)))
-      }
-    FutureOps.liftValues(ks,
-      mysqlResult.map { case (ps, rows) =>
-        client.closeStatement(ps)
+
+    val params = ks.map(key => MySqlStringInjection(key).getBytes).toSeq.map(Parameter.wrap[Array[Byte]])
+    val mysqlResult =
+      client.prepare(selectSql).select(params:_*) { row =>
+        (row(kCol).map(MySqlValue(_)), row(vCol).map(MySqlValue(_)))
+      }
+    FutureOps.liftValues(
+      ks,
+      mysqlResult.map { rows =>
         rows.toMap.filterKeys { _ != None }.map { case (optK, optV) => (optK.get, optV) }
       },
       { (k: K1) => Future.None }
     )
@@ -203,11 +200,8 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri
       case true => Future.Unit
       case false =>
         val deleteSql = MULTI_DELETE_SQL_PREFIX + Stream.continually("?").take(deleteKeys.size).mkString("(", ",", ")")
-        val deleteParams = deleteKeys.map { k => MySqlStringInjection(k).getBytes }.toSeq
-        client.prepareAndExecute(deleteSql, deleteParams:_*).map { case (ps, r) =>
-          // close prepared statement on server
-          client.closeStatement(ps)
-        }
+        val deleteParams = deleteKeys.map { k => MySqlStringInjection(k).getBytes }.toSeq.map(Parameter.wrap[Array[Byte]])
+        client.prepare(deleteSql)(deleteParams:_*)
     }

     // sequence the three queries. the inner futures are lazy
@@ -224,10 +218,6 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri

   override def close(t: Time) = {
     // close prepared statements before closing the connection
-    client.closeStatement(selectStmt)
-    client.closeStatement(insertStmt)
-    client.closeStatement(updateStmt)
-    client.closeStatement(deleteStmt)
     client.close(t)
   }

@@ -239,18 +229,15 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri
     get(k).flatMap { optionV =>
       optionV match {
         case Some(value) =>
-          updateStmt.parameters = Array(MySqlStringInjection(v).getBytes, MySqlStringInjection(k).getBytes)
-          client.execute(updateStmt)
+          updateStmt(MySqlStringInjection(v).getBytes, MySqlStringInjection(k).getBytes)
         case None =>
-          insertStmt.parameters = Array(MySqlStringInjection(k).getBytes, MySqlStringInjection(v).getBytes)
-          client.execute(insertStmt)
+          insertStmt(MySqlStringInjection(k).getBytes, MySqlStringInjection(v).getBytes)
       }
     }
   }

   protected def doDelete(k: MySqlValue): Future[Result] = {
-    deleteStmt.parameters = Array(MySqlStringInjection(k).getBytes)
-    client.execute(deleteStmt)
+    deleteStmt(MySqlStringInjection(k).getBytes)
   }

   // enclose table or column names in backticks, in case they happen to be sql keywords
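Two details in the batched paths above are easy to miss: parameters splatted from a `Seq` must be wrapped explicitly with `Parameter.wrap[Array[Byte]]` (the implicit lift only applies to individually passed arguments), and `executeMultiUpdate` folds every key into a single statement through a `CASE` expression. The `MULTI_UPDATE_SQL_*` fragments are defined elsewhere in this file, so the SQL below is only an assumed illustration of the shape for a two-key batch, with `k1`, `v1`, `k2`, `v2` standing for already-encoded `Array[Byte]` values:

```scala
// Assumed shape of the generated SQL for kvs.size == 2:
//   UPDATE `kv` SET `value` = CASE `key` WHEN ? THEN ? WHEN ? THEN ? END
//   WHERE `key` IN (?,?)
// Parameter order: CASE arms first (k1, v1, k2, v2), then the IN list (k1, k2).
val params: Seq[Parameter] =
  Seq(k1, v1, k2, v2, k1, k2).map(Parameter.wrap[Array[Byte]])
client.prepare(updateSql)(params: _*)
```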
diff --git a/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/ValueMapper.scala b/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/ValueMapper.scala
index c5f6a7af..2f500989 100644
--- a/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/ValueMapper.scala
+++ b/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/ValueMapper.scala
@@ -95,7 +95,12 @@ object MySqlValue {
     case v: Int => new MySqlValue(IntValue(v))
     case v: Long => new MySqlValue(LongValue(v))
     case v: Short => new MySqlValue(ShortValue(v))
-    case v: ChannelBuffer => new MySqlValue(RawValue(Type.String, Charset.Utf8_general_ci, false, v.toString(UTF_8).getBytes))
+    case v: ChannelBuffer =>
+      val bytes = Array.ofDim[Byte](v.readableBytes)
+      v.markReaderIndex()
+      v.readBytes(bytes)
+      v.resetReaderIndex()
+      new MySqlValue(RawValue(Type.Blob, Charset.Binary, isBinary = true, bytes))
     case _ => throw new UnsupportedOperationException(v.getClass.getName + " is currently not supported.")
   }
 }
@@ -124,21 +129,33 @@ class MySqlValue(val v: Value) {
  * Returns string representation of the finagle-mysql Value wrapped by MySqlValue
  * Both null values and empty values map to empty string.
  */
+@deprecated("Use String2MySqlValueInjection", "0.10.0")
 object MySqlStringInjection extends Injection[MySqlValue, String] {
   def apply(a: MySqlValue): String = ValueMapper.toString(a.v).getOrElse("") // should this be null: String instead?
   override def invert(b: String) = Try(MySqlValue(RawValue(Type.String, Charset.Utf8_general_ci, false, b.getBytes)))
 }

+object String2MySqlValueInjection extends Injection[String, MySqlValue] {
+  def apply(s: String): MySqlValue = MySqlValue(s)
+  override def invert(m: MySqlValue): Try[String] = Try { ValueMapper.toString(m.v).get }
+}
+
 /**
  * Injection from MySqlValue to ChannelBuffer.
  * Returns a channel buffer containing the Value wrapped by MySqlValue.
  * Both null values and empty values map to empty channel buffer.
  */
+@deprecated("Use ChannelBuffer2MySqlValueInjection", "0.10.0")
 object MySqlCbInjection extends Injection[MySqlValue, ChannelBuffer] {
   def apply(a: MySqlValue): ChannelBuffer = ValueMapper.toChannelBuffer(a.v).getOrElse(ChannelBuffers.EMPTY_BUFFER)
   override def invert(b: ChannelBuffer) = Try(MySqlValue((Type.String, Charset.Utf8_general_ci, false, b.toString(UTF_8))))
 }

+object ChannelBuffer2MySqlValueInjection extends Injection[ChannelBuffer, MySqlValue] {
+  def apply(c: ChannelBuffer): MySqlValue = MySqlValue(c)
+  override def invert(m: MySqlValue): Try[ChannelBuffer] = Try { ValueMapper.toChannelBuffer(m.v).get }
+}
+
 object LongMySqlInjection extends Injection[Long, MySqlValue] {
   def apply(a: Long): MySqlValue = MySqlValue(a)
   override def invert(b: MySqlValue) = Try(ValueMapper.toLong(b.v).get)
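The new injections deliberately run in the opposite direction from the deprecated pair (domain type into `MySqlValue`), which composes more naturally with `Injection.connect` chains, and the `ChannelBuffer` case above now round-trips raw bytes as a binary blob instead of forcing a UTF-8 decode. A quick round-trip sketch:

```scala
import scala.util.Try
import com.twitter.storehaus.mysql.{MySqlValue, String2MySqlValueInjection}

val asMySql: MySqlValue = String2MySqlValueInjection("hello")
val back: Try[String] = String2MySqlValueInjection.invert(asMySql)
assert(back.toOption == Some("hello"))
```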
diff --git a/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala b/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala
index 89e1b484..73469908 100644
--- a/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala
+++ b/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala
@@ -18,6 +18,7 @@ package com.twitter.storehaus.mysql

 import java.util.logging.Level

+import com.twitter.finagle.exp.Mysql
 import com.twitter.finagle.exp.mysql.Client
 import com.twitter.storehaus.testing.SelfAggregatingCloseableCleanup
 import com.twitter.storehaus.testing.generator.NonEmpty
@@ -126,13 +127,15 @@ object MySqlStoreProperties extends Properties("MySqlStore")
     withStore(multiPutAndMultiGetStoreTest(_, NonEmpty.Pairing.numerics[Short]()), "smallint", "smallint", true)

   private def withStore[T](f: MySqlStore => T, kColType: String, vColType: String, multiGet: Boolean = false): T = {
-    val client = Client("localhost:3306", "storehaususer", "test1234", "storehaus_test", Level.WARNING)
+    val client = Mysql.client
+      .withCredentials("storehaususer", "test1234")
+      .withDatabase("storehaus_test")
+      .newRichClient("127.0.0.1:3306")
     // these should match mysql setup used in .travis.yml
     val tableName = "storehaus-mysql-"+kColType+"-"+vColType + ( if (multiGet) { "-multiget" } else { "" } )
-    val schema = "CREATE TEMPORARY TABLE IF NOT EXISTS `"+tableName+"` (`key` "+kColType+" DEFAULT NULL, `value` "+vColType+" DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;"
+    val schema = s"CREATE TEMPORARY TABLE IF NOT EXISTS `${tableName}` (`key` ${kColType} DEFAULT NULL, `value` ${vColType} DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;"
     Await.result(client.query(schema))
-
     f(newStore(client, tableName))
   }

diff --git a/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySqlLongStoreProperties.scala b/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySqlLongStoreProperties.scala
index a7e70a97..ab6c1f0f 100644
--- a/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySqlLongStoreProperties.scala
+++ b/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySqlLongStoreProperties.scala
@@ -18,10 +18,11 @@ package com.twitter.storehaus.mysql

 import java.util.logging.Level

+import com.twitter.finagle.exp.Mysql
 import com.twitter.finagle.exp.mysql.Client
 import com.twitter.storehaus.testing.SelfAggregatingCloseableCleanup
 import com.twitter.storehaus.testing.generator.NonEmpty
-import com.twitter.util.{Await, Future}
+import com.twitter.util.Await

 import org.scalacheck.Arbitrary
 import org.scalacheck.Gen
@@ -91,7 +92,10 @@ object MySqlLongStoreProperties extends Properties("MySqlLongStore")
     withStore(multiMergeStoreTest(_), "text", "bigint", true)

   private def withStore[T](f: MySqlLongStore => T, kColType: String, vColType: String, merge: Boolean = false): T = {
-    val client = Client("localhost:3306", "storehaususer", "test1234", "storehaus_test", Level.WARNING)
+    val client = Mysql.client
+      .withCredentials("storehaususer", "test1234")
+      .withDatabase("storehaus_test")
+      .newRichClient("127.0.0.1:3306")
     // these should match mysql setup used in .travis.yml
     val tableName = "storehaus-mysql-long-"+kColType+"-"+vColType + ( if (merge) { "-merge" } else { "" } )
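Both property suites now build their client through the `Mysql` object rather than the deprecated `Client(host, user, password, db, logLevel)` constructor. The same builder pattern applies outside of tests; the credentials and database here simply mirror the test setup above:

```scala
import com.twitter.finagle.exp.Mysql

// Builder-style replacement for Client("localhost:3306", user, pass, db, Level.WARNING)
val client = Mysql.client
  .withCredentials("storehaususer", "test1234")
  .withDatabase("storehaus_test")
  .newRichClient("127.0.0.1:3306")
```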
diff --git a/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySqlValueInjectionProperties.scala b/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySqlValueInjectionProperties.scala
new file mode 100644
index 00000000..f965f9c7
--- /dev/null
+++ b/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySqlValueInjectionProperties.scala
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2013 Twitter Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.twitter.storehaus.mysql
+
+import java.util.Arrays
+
+import org.jboss.netty.buffer.{ChannelBuffers, ChannelBuffer}
+import org.scalacheck.{Gen, Arbitrary, Properties}
+import org.scalacheck.Prop.forAll
+
+object MySqlValueInjectionProperties extends Properties("MySqlValue Injections") {
+
+  val channelBufferGenerator = for {
+    b <- Gen.containerOf[Array, Byte](Arbitrary.arbitrary[Byte])
+  } yield ChannelBuffers.wrappedBuffer(b)
+
+  implicit val gen = Arbitrary(channelBufferGenerator)
+
+  property("String2MySqlValue apply and invert") = {
+    val injection = String2MySqlValueInjection
+    forAll { (s: String) =>
+      injection.invert(injection(s)).toOption == Some(s)
+    }
+  }
+
+  property("ChannelBuffer2MySqlValue apply and invert") = {
+    val injection = ChannelBuffer2MySqlValueInjection
+
+    forAll { (c: ChannelBuffer) =>
+      val bytes = bytesFromChannelBuffer(c)
+      val inverted = injection.invert(injection(c)).toOption
+      inverted.exists { invertedBuf =>
+        val invertedBytes = bytesFromChannelBuffer(invertedBuf)
+        Arrays.equals(invertedBytes, bytes)
+      }
+    }
+  }
+
+  def bytesFromChannelBuffer(c: ChannelBuffer): Array[Byte] = {
+    val arr = Array.ofDim[Byte](c.readableBytes)
+    c.markReaderIndex()
+    c.readBytes(arr)
+    c.resetReaderIndex()
+    arr
+  }
+}
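The Redis specs below migrate from specs2 to ScalaTest's `WordSpec` with `Matchers`. The assertion rewrites are mechanical, and the explicit `sequential` directive is dropped, presumably because `WordSpec` already runs registered examples in declaration order. A compilable sketch of the matcher translations used throughout:

```scala
import org.scalatest.{Matchers, WordSpec}

class MatcherTranslationSpec extends WordSpec with Matchers {
  "the migrated assertions" should {
    "read like this" in {
      val result: Option[Int] = Some(3)
      result should be(Some(3))          // specs2: result must beSome(3)
      Option.empty[Int] should be(None)  // specs2: opt must beNone
      (1 + 2) should equal(3)            // specs2: x aka "label" must_== 3
    }
  }
}
```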
diff --git a/storehaus-redis/src/test/scala/com/twitter/storehaus/redis/RedisLongStoreProperties.scala b/storehaus-redis/src/test/scala/com/twitter/storehaus/redis/RedisLongStoreProperties.scala
index d72adcc2..78aa139b 100644
--- a/storehaus-redis/src/test/scala/com/twitter/storehaus/redis/RedisLongStoreProperties.scala
+++ b/storehaus-redis/src/test/scala/com/twitter/storehaus/redis/RedisLongStoreProperties.scala
@@ -25,7 +25,6 @@ import com.twitter.storehaus.testing.CloseableCleanup
 import com.twitter.storehaus.testing.generator.NonEmpty
 import org.scalacheck.Arbitrary.arbitrary
 import org.scalacheck.Gen
-import org.scalacheck.Gen.listOf1
 import org.scalacheck.Prop._
 import org.scalacheck.Properties

diff --git a/storehaus-redis/src/test/scala/com/twitter/storehaus/redis/RedisSortedSetSpec.scala b/storehaus-redis/src/test/scala/com/twitter/storehaus/redis/RedisSortedSetSpec.scala
index ceb1c47d..050ba889 100644
--- a/storehaus-redis/src/test/scala/com/twitter/storehaus/redis/RedisSortedSetSpec.scala
+++ b/storehaus-redis/src/test/scala/com/twitter/storehaus/redis/RedisSortedSetSpec.scala
@@ -6,10 +6,10 @@ import com.twitter.storehaus.algebra.MergeableStore
 import com.twitter.storehaus.testing.CloseableCleanup
 import com.twitter.util.{ Await, Future }
 import org.jboss.netty.buffer.ChannelBuffer
-import org.specs2.mutable._
+import org.scalatest.{Matchers, WordSpec}
 import scala.util.Try

-class RedisSortedSetSpec extends Specification
+class RedisSortedSetSpec extends WordSpec with Matchers
   with CloseableCleanup[RedisSortedSetStore]
   with DefaultRedisClient {
   import com.twitter.bijection.Bijection._
@@ -38,14 +38,12 @@ class RedisSortedSetSpec extends Specification
     if (xs.isEmpty) None else Some(xs.init, xs.last)
   }

-  sequential // Required as tests mutate the store in order
-
   "RedisSortedSet" should {
     "support Store operations" in {
       Await.result(for {
         put <- sets.put(("commits", Some(commits)))
         commits <- sets.get("commits")
-      } yield commits) must beSome(commits.sortWith(_._2 < _._2))
+      } yield commits) should be(Some(commits.sortWith(_._2 < _._2)))
     }

     "support merge operations" in {
@@ -53,14 +51,14 @@ class RedisSortedSetSpec extends Specification
         _ <- sets.merge(("commits", Seq(("sritchie", 1.0))))
         commits <- sets.get("commits")
       } yield commits)
-      (for (_ ::> last <- merged) yield last) must beSome(
-        ("sritchie", 138.0))
+      (for (_ ::> last <- merged) yield last) should be(Some(
+        ("sritchie", 138.0)))
     }

     "support delete operation" in {
       Await.result(for {
         _ <- sets.put(("commits", None))
         commits <- sets.get("commits")
-      } yield commits) must beNone
+      } yield commits) should be(None)
     }
   }

@@ -74,7 +72,7 @@ class RedisSortedSetSpec extends Specification
       Await.result(Future.collect(members.multiPut(putting).values.toSeq))
       putting.foreach {
         case (k, v) =>
-          Await.result(members.get(k)) aka("key %s" format k) must_==(v)
+          Await.result(members.get(k)) should equal(v)
       }
     }
     "support merge operations" in {
@@ -82,14 +80,14 @@ class RedisSortedSetSpec extends Specification
       Await.result(for {
         _ <- members.merge((who, 1.0))
         score <- members.get(who)
-      } yield score) aka("score of %s" format who) must beSome(138.0)
+      } yield score) should be(Some(138.0))
     }
     "support delete operation" in {
       val who = ("commits", "sritchie")
       Await.result(for {
         _ <- members.put((who, None))
         score <- members.get(who)
-      } yield score) aka("score of %s" format who) must beNone
+      } yield score) should be(None)
     }
   }
 }
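The generator cleanup below is forced by newer ScalaCheck releases dropping `Gen.listOf1`. One caveat: `Gen.listOfN(1, g)` yields exactly one element, so the "random length" wording in the doc comments no longer strictly holds; if variable-length non-empty output is the intent, `Gen.nonEmptyListOf` is the closer substitute. A sketch of the difference:

```scala
import org.scalacheck.Gen

// Always a single character:
val oneChar: Gen[String] = Gen.listOfN(1, Gen.alphaChar).map(_.mkString)
// Non-empty, arbitrary length (closest to the removed Gen.listOf1):
val manyChars: Gen[String] = Gen.nonEmptyListOf(Gen.alphaChar).map(_.mkString)
```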
diff --git a/storehaus-testing/src/main/scala/com/twitter/storehaus/testing/generator/NonEmpty.scala b/storehaus-testing/src/main/scala/com/twitter/storehaus/testing/generator/NonEmpty.scala
index 72e72c35..4f92cb14 100644
--- a/storehaus-testing/src/main/scala/com/twitter/storehaus/testing/generator/NonEmpty.scala
+++ b/storehaus-testing/src/main/scala/com/twitter/storehaus/testing/generator/NonEmpty.scala
@@ -16,18 +16,27 @@

 package com.twitter.storehaus.testing.generator

-import org.scalacheck.{ Choose, Gen }
+import org.scalacheck.Gen._
+import org.scalacheck.Gen

 /** Generators for non-empty data */
 object NonEmpty {
   /** Generator for non-empty alpha strings of random length */
   def alphaStr: Gen[String] =
-    for(cs <- Gen.listOf1(Gen.alphaChar)) yield cs.mkString
+    for (cs <- Gen.listOfN(1, Gen.alphaChar)) yield cs.mkString

   /** Generator for Options of non-empty alpha strings of random length */
   def alphaStrOpt: Gen[Option[String]] =
     alphaStr.flatMap(str => Gen.oneOf(Some(str), None))

+  /** Generator for non-empty byte arrays of random length */
+  def byteArray: Gen[Array[Byte]] =
+    for (cs <- Gen.listOfN(1, Gen.alphaChar)) yield cs.mkString.getBytes("UTF-8")
+
+  /** Generator for Options of non-empty byte arrays of random length */
+  def byteArrayOpt: Gen[Option[Array[Byte]]] =
+    byteArray.flatMap(array => Gen.oneOf(Some(array), None))
+
   /** Storehaus pairings of non-empty data.
    * In most cases this means 2 element tuples of (K, Option[V]) */
   object Pairing {
@@ -37,12 +46,20 @@ object NonEmpty {
       opt <- NonEmpty.alphaStrOpt
     } yield (str, opt)

+    /** Generator for pairings of non-empty byte arrays and non-empty byte
+      * array options */
+    def byteArrayPair: Gen[(Array[Byte], Option[Array[Byte]])] = for {
+      array <- NonEmpty.byteArray
+      opt <- NonEmpty.byteArrayOpt
+    } yield (array, opt)
+
     /** Generator for pairings of non-empty alpha strings to options of positive numerics */
     def alphaStrPosNumericPair[T : Numeric : Choose]: Gen[(String, Option[T])] = for {
       str <- NonEmpty.alphaStr
       opt <- Gen.posNum[T].flatMap(l => Gen.oneOf(Some(l), None))
     } yield (str, opt)

+    /** Generator for pairings of numerics */
     def numericPair[T : Numeric : Choose]: Gen[(T, Option[T])] = for {
       num <- Gen.posNum[T]
       opt <- Gen.posNum[T].flatMap(l => Gen.oneOf(Some(l), None))
@@ -52,6 +69,10 @@ object NonEmpty {
   def alphaStrs(n: Int = 10) =
     Gen.listOfN(n, alphaStrPair)

+  /** Generator for non-empty lists of (Array[Byte], Option[Array[Byte]])'s */
+  def byteArrays(n: Int = 10) =
+    Gen.listOfN(n, byteArrayPair)
+
   /** Generator for non-empty lists of (String, Option[T])'s */
   def alphaStrNumerics[T : Numeric : Choose](n: Int = 10) =
     Gen.listOfN(n, alphaStrPosNumericPair[T])
diff --git a/version.sbt b/version.sbt
new file mode 100644
index 00000000..185f13e7
--- /dev/null
+++ b/version.sbt
@@ -0,0 +1 @@
+version in ThisBuild := "0.12.0"