diff --git a/.travis.yml b/.travis.yml index 5263602d..9c859b5f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,11 +1,12 @@ language: scala scala: - 2.10.0 - - 2.9.2 + - 2.9.3 before_script: - mysql -u root -e "create database storehaus_test;" - mysql -u root -e "create user 'storehaususer'@'localhost' identified by 'test1234';" - mysql -u root -e "grant all on storehaus_test.* to 'storehaususer'@'localhost';" services: - redis-server - - memcache \ No newline at end of file + - memcache +script: umask 0022 && sbt ++$TRAVIS_SCALA_VERSION test \ No newline at end of file diff --git a/CHANGES.md b/CHANGES.md index f2fc207a..c3ed3aa8 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,22 @@ # storehaus # +### Version.0.5.1 ### + +* Add storehaus-hbase and upgrade to bijection 0.5.3: https://github.com/twitter/storehaus/pull/139 +* Fix mutable TTL cache bug: https://github.com/twitter/storehaus/pull/136 + +### Version.0.5.0 ### + +* Reuse prepared statements in mysql: https://github.com/twitter/storehaus/issues/93 +* storehaus-testing module: https://github.com/twitter/storehaus/pull/115 +* cache ttl is now a duration, vs a time: https://github.com/twitter/storehaus/pull/100 +* improve performance of CollectionOps: https://github.com/twitter/storehaus/pull/117 +* Augment memcachestore with common functions: https://github.com/twitter/storehaus/pull/121 +* bump twitter-util and finagle versions: https://github.com/twitter/storehaus/pull/125 +* Upgrade to scala 2.9.3, algebird 0.2.0 and Bijection 0.5.2: https://github.com/twitter/storehaus/pull/126 + +Thanks to Doug Tangren, Ruban Monu, Ximing Yu, Ryan LeCompte, Sam Ritchie and Oscar Boykin for contributions! + ### Version.0.4.0 ### * Storehaus-Mysql support for numeric types diff --git a/README.md b/README.md index 4d0905f4..8991eee5 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ Storehaus provides a number of modules wrapping existing key-value stores. Enric * [Storehaus-memcache](http://twitter.github.com/storehaus/#com.twitter.storehaus.memcache.MemcacheStore) (wraps Twitter's [finagle-memcached](https://github.com/twitter/finagle/tree/master/finagle-memcached) library) * [Storehaus-mysql](http://twitter.github.com/storehaus/#com.twitter.storehaus.mysql.MySQLStore) (wraps Twitter's [finagle-mysql](https://github.com/twitter/finagle/tree/master/finagle-mysql) library) * [Storehaus-redis](http://twitter.github.com/storehaus/#com.twitter.storehaus.redis.RedisStore) (wraps Twitter's [finagle-redis](https://github.com/twitter/finagle/tree/master/finagle-redis) library) + * [Storehaus-hbase](http://twitter.github.com/storehaus/#com.twitter.storehaus.hbase.HBaseStore) #### Planned Modules @@ -94,22 +95,26 @@ See the [current API documentation](http://twitter.github.com/storehaus) for mor ## Maven -Storehaus modules are available on maven central. The current groupid and version for all modules is, respectively, `"com.twitter"` and `0.4.0`. +Storehaus modules are available on maven central. The current groupid and version for all modules is, respectively, `"com.twitter"` and `0.5.1`. Current published artifacts are -* `storehaus-core_2.9.2` +* `storehaus-core_2.9.3` * `storehaus-core_2.10` -* `storehaus-algebra_2.9.2` +* `storehaus-algebra_2.9.3` * `storehaus-algebra_2.10` -* `storehaus-memcache_2.9.2` +* `storehaus-memcache_2.9.3` * `storehaus-memcache_2.10` -* `storehaus-mysql_2.9.2` +* `storehaus-mysql_2.9.3` * `storehaus-mysql_2.10` -* `storehaus-redis_2.9.2` +* `storehaus-hbase_2.9.3` +* `storehaus-hbase_2.10` +* `storehaus-redis_2.9.3` * `storehaus-redis_2.10` -* `storehaus-cache_2.9.2` +* `storehaus-cache_2.9.3` * `storehaus-cache_2.10` +* `storehaus-testing_2.9.3` +* `storehaus-testing_2.10` The suffix denotes the scala version. diff --git a/project/Build.scala b/project/Build.scala index a5b4543a..3daa1bda 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package storehaus import sbt._ @@ -8,6 +24,13 @@ import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact object StorehausBuild extends Build { + def withCross(dep: ModuleID) = + dep cross CrossVersion.binaryMapped { + case "2.9.3" => "2.9.2" // TODO: hack because twitter hasn't built things against 2.9.3 + case version if version startsWith "2.10" => "2.10" // TODO: hack because sbt is broken + case x => x + } + val extraSettings = Project.defaultSettings ++ releaseSettings ++ Boilerplate.settings ++ mimaDefaultSettings @@ -19,7 +42,7 @@ object StorehausBuild extends Build { logLevel in Test := Level.Info ) else Seq.empty[Project.Setting[_]] - val testCleanup = extraSettings ++ Seq( + val testCleanup = Seq( testOptions in Test += Tests.Cleanup { loader => val c = loader.loadClass("com.twitter.storehaus.testing.Cleanup$") c.getMethod("cleanup").invoke(c.getField("MODULE$").get(c)) @@ -28,38 +51,27 @@ object StorehausBuild extends Build { val sharedSettings = extraSettings ++ ciSettings ++ Seq( organization := "com.twitter", - crossScalaVersions := Seq("2.9.2", "2.10.0"), - + scalaVersion := "2.9.3", + crossScalaVersions := Seq("2.9.3", "2.10.0"), javacOptions ++= Seq("-source", "1.6", "-target", "1.6"), - javacOptions in doc := Seq("-source", "1.6"), - - libraryDependencies ++= Seq( - "org.scala-tools.testing" %% "specs" % "1.6.9" % "test" withSources() - ), - + libraryDependencies += "org.scala-tools.testing" %% "specs" % "1.6.9" % "test" withSources(), resolvers ++= Seq( Opts.resolver.sonatypeSnapshots, Opts.resolver.sonatypeReleases, "Twitter Maven" at "http://maven.twttr.com" ), - parallelExecution in Test := true, - scalacOptions ++= Seq(Opts.compile.unchecked, Opts.compile.deprecation), // Publishing options: publishMavenStyle := true, - publishArtifact in Test := false, - pomIncludeRepository := { x => false }, - publishTo <<= version { v => Some(if (v.trim.toUpperCase.endsWith("SNAPSHOT")) Opts.resolver.sonatypeSnapshots else Opts.resolver.sonatypeStaging) }, - pomExtra := ( https://github.com/twitter/storehaus @@ -97,10 +109,10 @@ object StorehausBuild extends Build { def youngestForwardCompatible(subProj: String) = Some(subProj) .filterNot(unreleasedModules.contains(_)) - .map { s => "com.twitter" % ("storehaus-" + s + "_2.9.2") % "0.3.0" } + .map { s => "com.twitter" % ("storehaus-" + s + "_2.9.3") % "0.5.0" } - val algebirdVersion = "0.1.13" - val bijectionVersion = "0.4.0" + val algebirdVersion = "0.2.0" + val bijectionVersion = "0.5.3" lazy val storehaus = Project( id = "storehaus", @@ -117,42 +129,63 @@ object StorehausBuild extends Build { storehausMemcache, storehausMySQL, storehausRedis, + storehausHBase, storehausTesting ) def module(name: String) = { val id = "storehaus-%s".format(name) - Project(id = id, base = file(id), settings = sharedSettings ++ Seq( + Project(id = id, base = file(id), settings = sharedSettings ++ testCleanup ++ Seq( Keys.name := id, - previousArtifact := youngestForwardCompatible(name)) ++ testCleanup + previousArtifact := youngestForwardCompatible(name)) ).dependsOn(storehausTesting % "test->test") } lazy val storehausCache = module("cache") lazy val storehausCore = module("core").settings( - libraryDependencies += "com.twitter" %% "util-core" % "6.3.0", - libraryDependencies += "com.twitter" %% "bijection-core" % bijectionVersion + libraryDependencies ++= Seq( + withCross("com.twitter" %% "util-core" % "6.3.7"), + "com.twitter" %% "bijection-core" % bijectionVersion, + "com.twitter" %% "bijection-util" % bijectionVersion + ) ).dependsOn(storehausCache % "test->test;compile->compile") lazy val storehausAlgebra = module("algebra").settings( libraryDependencies += "com.twitter" %% "algebird-core" % algebirdVersion, libraryDependencies += "com.twitter" %% "algebird-util" % algebirdVersion, - libraryDependencies += "com.twitter" %% "bijection-algebird" % bijectionVersion + libraryDependencies += "com.twitter" %% "algebird-bijection" % algebirdVersion ).dependsOn(storehausCore % "test->test;compile->compile") lazy val storehausMemcache = module("memcache").settings( - libraryDependencies += Finagle.module("memcached") + libraryDependencies ++= Seq( + "com.twitter" %% "algebird-core" % algebirdVersion, + "com.twitter" %% "bijection-core" % bijectionVersion, + "com.twitter" %% "bijection-netty" % bijectionVersion, + Finagle.module("memcached") + ) ).dependsOn(storehausAlgebra % "test->test;compile->compile") lazy val storehausMySQL = module("mysql").settings( - libraryDependencies += Finagle.module("mysql", "6.2.1") // tests fail with the latest + libraryDependencies += Finagle.module("mysql") ).dependsOn(storehausCore % "test->test;compile->compile") lazy val storehausRedis = module("redis").settings( libraryDependencies += Finagle.module("redis"), // we don't want various tests clobbering each others keys - parallelExecution in Test := false + parallelExecution in Test := false + ).dependsOn(storehausAlgebra % "test->test;compile->compile") + + lazy val storehausHBase= module("hbase").settings( + libraryDependencies ++= Seq( + "com.twitter" %% "algebird-core" % algebirdVersion, + "com.twitter" %% "bijection-core" % bijectionVersion, + "com.twitter" %% "bijection-hbase" % bijectionVersion , + "org.apache.hbase" % "hbase" % "0.94.6" % "provided->default" classifier "tests" classifier "", + "org.apache.hadoop" % "hadoop-core" % "1.2.0" % "provided->default", + "org.apache.hadoop" % "hadoop-test" % "1.2.0" % "test" + ), + parallelExecution in Test := false ).dependsOn(storehausAlgebra % "test->test;compile->compile") val storehausTesting = Project( @@ -161,9 +194,7 @@ object StorehausBuild extends Build { settings = sharedSettings ++ Seq( name := "storehaus-testing", previousArtifact := youngestForwardCompatible("testing"), - libraryDependencies ++= Seq( - "org.scalacheck" %% "scalacheck" % "1.10.0" withSources() - ) + libraryDependencies += "org.scalacheck" %% "scalacheck" % "1.10.0" withSources() ) ) } diff --git a/project/Finagle.scala b/project/Finagle.scala index 91143fa3..b339bd98 100644 --- a/project/Finagle.scala +++ b/project/Finagle.scala @@ -5,7 +5,7 @@ package storehaus * dependency */ object Finagle { import sbt._ - val LatestVersion = "6.3.0" + val LatestVersion = "6.5.1" def module(name: String, version: String = LatestVersion) = - "com.twitter" %% "finagle-%s".format(name) % version + StorehausBuild.withCross("com.twitter" %% "finagle-%s".format(name) % version) } diff --git a/project/build.properties b/project/build.properties index 175f2744..a8c2f849 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=0.12.0 \ No newline at end of file +sbt.version=0.12.0 diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/ConvertedMergeableStore.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/ConvertedMergeableStore.scala index 47f05255..463807cc 100644 --- a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/ConvertedMergeableStore.scala +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/ConvertedMergeableStore.scala @@ -32,7 +32,7 @@ class ConvertedMergeableStore[K1, -K2, V1, V2](store: MergeableStore[K1, V1])(kf (implicit bij: ImplicitBijection[V2, V1]) extends com.twitter.storehaus.ConvertedStore[K1, K2, V1, V2](store)(kfn)(Injection.fromBijection(bij.bijection)) with MergeableStore[K2, V2] { - import com.twitter.bijection.algebird.AlgebirdBijections._ + import com.twitter.algebird.bijection.AlgebirdBijections._ override def monoid: Monoid[V2] = store.monoid.as[Monoid[V2]] diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MutableCacheAlgebra.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MutableCacheAlgebra.scala index 26819b35..f930f163 100644 --- a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MutableCacheAlgebra.scala +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MutableCacheAlgebra.scala @@ -42,21 +42,24 @@ class AlgebraicMutableCache[K, V](cache: MutableCache[K, V]) { new MutableCache[K, U] { override def get(k: K) = for { v <- cache.get(k) - (_, u) <- injection.invert(k, v) + (_, u) <- injection.invert(k, v).toOption } yield u override def +=(ku: (K, U)) = { cache += injection(ku); this } - override def hit(k: K) = cache.hit(k).flatMap { injection.invert(k, _) }.map { _._2 } + override def hit(k: K) = + cache.hit(k) + .flatMap(injection.invert(k, _).toOption).map(_._2) override def evict(k: K) = for { evictedV <- cache.evict(k) - (_, evictedU) <- injection.invert(k, evictedV) + (_, evictedU) <- injection.invert(k, evictedV).toOption } yield evictedU override def empty = new AlgebraicMutableCache(cache.empty).inject(injection) override def clear = { cache.clear; this } - override def iterator = cache.iterator.flatMap(injection.invert(_)) + override def iterator = + cache.iterator.flatMap(injection.invert(_).toOption) } } diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/TTLInjection.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/TTLInjection.scala index f4d2ab36..5cfde0eb 100644 --- a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/TTLInjection.scala +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/TTLInjection.scala @@ -18,6 +18,7 @@ package com.twitter.storehaus.algebra import com.twitter.algebird.Semigroup import com.twitter.bijection.Injection +import scala.util.{ Success, Failure } /** * Injection that maps values paired with stale values of T => None @@ -27,17 +28,19 @@ import com.twitter.bijection.Injection * @author Sam Ritchie */ +case class ExpiredException[K, V](pair: (K, V)) extends RuntimeException(pair.toString) + class TTLInjection[K, T: Ordering: Semigroup, V](delta: T)(clock: () => T) extends Injection[(K, V), (K, (T, V))] { def apply(pair: (K, V)): (K, (T, V)) = { val (k, v) = pair (k, (Semigroup.plus(clock(), delta), v)) } - override def invert(pair: (K, (T, V))): Option[(K, V)] = { + override def invert(pair: (K, (T, V))) = { val (k, (expiration, v)) = pair if (Ordering[T].gteq(expiration, clock())) - Some((k, v)) + Success(k -> v) else - None + Failure(ExpiredException(k -> v)) } } diff --git a/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/MergeableStoreProperties.scala b/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/MergeableStoreProperties.scala index 4d4a74f4..eeec71b4 100644 --- a/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/MergeableStoreProperties.scala +++ b/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/MergeableStoreProperties.scala @@ -17,7 +17,7 @@ package com.twitter.storehaus.algebra import com.twitter.algebird.{ MapAlgebra, Monoid, SummingQueue } -import com.twitter.bijection.algebird.AlgebirdBijections._ +import com.twitter.algebird.bijection.AlgebirdBijections._ import com.twitter.bijection.Injection import com.twitter.storehaus._ import com.twitter.util.Await @@ -93,7 +93,7 @@ object MergeableStoreProperties extends Properties("MergeableStore") { property("Converted MergeableStore obeys the mergeable store laws") = { // We are using a weird monoid on Int here: - import com.twitter.bijection.algebird.AlgebirdBijections._ + import com.twitter.algebird.bijection.AlgebirdBijections._ import com.twitter.bijection.Conversion.asMethod implicit val monoid : Monoid[Int] = implicitly[Monoid[(Short,Short)]].as[Monoid[Int]] diff --git a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableLRUCache.scala b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableLRUCache.scala index 616cbb38..5ef6a325 100644 --- a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableLRUCache.scala +++ b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableLRUCache.scala @@ -31,7 +31,7 @@ object MutableLRUCache { } class MutableLRUCache[K, V](capacity: Int) extends JMapCache[K, V](() => - new JLinkedHashMap[K, V](capacity + 1, 0.75f) { + new JLinkedHashMap[K, V](capacity + 1, 0.75f, true) { override protected def removeEldestEntry(eldest: JMap.Entry[K, V]) = super.size > capacity }) { diff --git a/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/MutableLRUCacheTest.scala b/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/MutableLRUCacheTest.scala new file mode 100644 index 00000000..6676c4d2 --- /dev/null +++ b/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/MutableLRUCacheTest.scala @@ -0,0 +1,42 @@ +/* + * Copyright 2013 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.cache + +import org.specs._ + +class MutableLRUCacheTest extends Specification { + def freshCache = MutableLRUCache[String, Int](2) + + def checkCache(pairs: Seq[(String, Int)], results: Seq[Boolean]) = { + val cache = freshCache + pairs.foreach(cache += _) + pairs.map { case (k, _) => cache.contains(k) } must be_==(results) + } + + "MutableLRUCache works properly with threshold 2" in { + // At capacity + checkCache( + Seq("a" -> 1, "b" -> 2), + Seq(true, true) + ) + // a is touched, so b and c are evicted + checkCache( + Seq("a" -> 1, "b" -> 2, "c" -> 3, "a" -> 1, "d" -> 4), + Seq(true, false, false, true, true) + ) + } +} diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/ConvertedStore.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/ConvertedStore.scala index a4e97f34..536b7f3f 100644 --- a/storehaus-core/src/main/scala/com/twitter/storehaus/ConvertedStore.scala +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/ConvertedStore.scala @@ -17,7 +17,10 @@ package com.twitter.storehaus import com.twitter.bijection.Injection -import com.twitter.util.Future +import com.twitter.bijection.Conversion.asMethod +import com.twitter.bijection.twitter_util.UtilBijections._ +import com.twitter.util.{ Future, Try } +import scala.util.{ Success, Failure } /** Use an injection on V2,V1 to convert a store of values V2. * If the value stored in the underlying store cannot be converted back to V2, then you will get a Future.exception @@ -25,11 +28,10 @@ import com.twitter.util.Future * TODO: we should add a specific exception type here so we can safely filter these cases to Future.None if we so choose. */ class ConvertedStore[K1, -K2, V1, V2](store: Store[K1, V1])(kfn: K2 => K1)(implicit inj: Injection[V2, V1]) - extends ConvertedReadableStore[K1, K2, V1, V2](store)(kfn)({ v1: V1 => - inj.invert(v1).map { Future.value(_) } - .getOrElse(Future.exception(new Exception(v1.toString + ": V1 cannot be converted to V2"))) - }) - with Store[K2, V2] { + extends ConvertedReadableStore[K1, K2, V1, V2](store)(kfn)({ v1: V1 => + Future.const(inj.invert(v1).as[Try[V2]]) + }) + with Store[K2, V2] { override def put(kv: (K2, Option[V2])) = { val k1 = kfn(kv._1) @@ -37,7 +39,7 @@ class ConvertedStore[K1, -K2, V1, V2](store: Store[K1, V1])(kfn: K2 => K1)(impli store.put((k1, v1)) } override def multiPut[K3 <: K2](kvs: Map[K3, Option[V2]]) = { - val mapK1V1 = kvs.map { case (k3, v2) => (kfn(k3), v2.map { inj(_) }) } + val mapK1V1 = kvs.map { case (k3, v2) => (kfn(k3), v2.map(inj(_))) } val res: Map[K1, Future[Unit]] = store.multiPut(mapK1V1) kvs.keySet.map { k3 => (k3, res(kfn(k3))) }.toMap } diff --git a/storehaus-core/src/test/scala/com/twitter/storehaus/StoreProperties.scala b/storehaus-core/src/test/scala/com/twitter/storehaus/StoreProperties.scala index f8869154..f626a8d0 100644 --- a/storehaus-core/src/test/scala/com/twitter/storehaus/StoreProperties.scala +++ b/storehaus-core/src/test/scala/com/twitter/storehaus/StoreProperties.scala @@ -23,26 +23,28 @@ import org.scalacheck.Gen.choose import org.scalacheck.Prop._ object StoreProperties extends Properties("Store") { - def baseTest[K: Arbitrary, V: Arbitrary: Equiv](store: Store[K, V]) - (put: (Store[K, V], List[(K, Option[V])]) => Unit) = + def baseTest[K: Arbitrary, V: Arbitrary: Equiv](storeIn: => Store[K, V]) + (put: (Store[K, V], List[(K, Option[V])]) => Unit) = { forAll { (examples: List[(K, Option[V])]) => + lazy val store = storeIn put(store, examples) examples.toMap.forall { case (k, optV) => - Equiv[Option[V]].equiv(Await.result(store.get(k)), optV) + Equiv[Option[V]].equiv(Await.result(store.get(k)), optV) } } + } - def putStoreTest[K: Arbitrary, V: Arbitrary: Equiv](store: Store[K, V]) = + def putStoreTest[K: Arbitrary, V: Arbitrary: Equiv](store: => Store[K, V]) = baseTest(store) { (s, pairs) => pairs.foreach { p => Await.result(s.put(p)) } } - def multiPutStoreTest[K: Arbitrary, V: Arbitrary: Equiv](store: Store[K, V]) = + def multiPutStoreTest[K: Arbitrary, V: Arbitrary: Equiv](store: => Store[K, V]) = baseTest(store) { (s, pairs) => Await.result(FutureOps.mapCollect(s.multiPut(pairs.toMap))) } - def storeTest[K: Arbitrary, V: Arbitrary: Equiv](store: Store[K, V]) = + def storeTest[K: Arbitrary, V: Arbitrary: Equiv](store: => Store[K, V]) = putStoreTest(store) && multiPutStoreTest(store) property("ConcurrentHashMapStore test") = diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala new file mode 100644 index 00000000..cca402f0 --- /dev/null +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala @@ -0,0 +1,80 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.hadoop.hbase.client.HTablePool +import com.twitter.storehaus.Store +import com.twitter.util.Future +import org.apache.hadoop.conf.Configuration + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +object HBaseByteArrayStore { + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean, + pool: HTablePool, + conf: Configuration, + threads: Int): HBaseByteArrayStore = { + val store = new HBaseByteArrayStore(quorumNames, table, columnFamily, column, createTable, pool, conf, threads) + store.validateConfiguration() + store.createTableIfRequired() + store + } + + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean): HBaseByteArrayStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration(), 4) +} + +class HBaseByteArrayStore(protected val quorumNames: Seq[String], + protected val table: String, + protected val columnFamily: String, + protected val column: String, + protected val createTable: Boolean, + protected val pool: HTablePool, + protected val conf: Configuration, + protected val threads: Int) extends Store[Array[Byte], Array[Byte]] with HBaseStore { + + /** get a single key from the store. + * Prefer multiGet if you are getting more than one key at a time + */ + override def get(k: Array[Byte]): Future[Option[Array[Byte]]] = { + getValue(k) + } + + /** + * replace a value + * Delete is the same as put((k,None)) + */ + override def put(kv: (Array[Byte], Option[Array[Byte]])): Future[Unit] = { + putValue(kv) + } + + /** Close this store and release any resources. + * It is undefined what happens on get/multiGet after close + */ + override def close { + pool.close() + } +} diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala new file mode 100644 index 00000000..230050ee --- /dev/null +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala @@ -0,0 +1,88 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.hadoop.hbase.client.HTablePool +import com.twitter.storehaus.Store +import com.twitter.util.Future +import com.twitter.bijection.Injection._ +import org.apache.hadoop.conf.Configuration + +/** + * @author Mansur Ashraf + * @since 9/8/13 + */ +object HBaseLongStore { + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean, + pool: HTablePool, + conf: Configuration, + threads:Int): HBaseLongStore = { + val store = new HBaseLongStore(quorumNames, table, columnFamily, column, createTable, pool, conf,threads) + store.validateConfiguration() + store.createTableIfRequired() + store + } + + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean): HBaseLongStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration(),4) +} + +class HBaseLongStore(protected val quorumNames: Seq[String], + protected val table: String, + protected val columnFamily: String, + protected val column: String, + protected val createTable: Boolean, + protected val pool: HTablePool, + protected val conf: Configuration, + protected val threads:Int) extends Store[String, Long] with HBaseStore { + + /** get a single key from the store. + * Prefer multiGet if you are getting more than one key at a time + */ + override def get(k: String): Future[Option[Long]] = { + import com.twitter.bijection.hbase.HBaseBijections._ + implicit val stringInj = fromBijectionRep[String, StringBytes] + implicit val LongInj = fromBijectionRep[Long, LongBytes] + getValue[String, Long](k) + } + + /** + * replace a value + * Delete is the same as put((k,None)) + */ + override def put(kv: (String, Option[Long])): Future[Unit] = { + import com.twitter.bijection.hbase.HBaseBijections._ + implicit val stringInj = fromBijectionRep[String, StringBytes] + implicit val LongInj = fromBijectionRep[Long, LongBytes] + putValue(kv) + } + + /** Close this store and release any resources. + * It is undefined what happens on get/multiGet after close + */ + override def close { + pool.close() + } +} + diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala new file mode 100644 index 00000000..b583c755 --- /dev/null +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala @@ -0,0 +1,95 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, HBaseConfiguration} +import com.twitter.bijection.hbase.HBaseBijections._ +import com.twitter.bijection.Conversion._ +import com.twitter.bijection.Injection +import com.twitter.util.{FuturePool, Future} +import scala.Some +import java.util.concurrent.Executors + +/** + * @author Mansur Ashraf + * @since 9/8/13 + */ +trait HBaseStore { + + protected val quorumNames: Seq[String] + protected val createTable: Boolean + protected val table: String + protected val columnFamily: String + protected val column: String + protected val pool: HTablePool + protected val conf: Configuration + protected val threads: Int + protected val futurePool = FuturePool(Executors.newFixedThreadPool(threads)) + + def getHBaseAdmin: HBaseAdmin = { + if (conf.get("hbase.zookeeper.quorum") == null) { + conf.set("hbase.zookeeper.quorum", quorumNames.mkString(",")) + } + val hbaseConf = HBaseConfiguration.create(conf) + new HBaseAdmin(hbaseConf) + } + + def createTableIfRequired() { + val hbaseAdmin = getHBaseAdmin + if (createTable && !hbaseAdmin.tableExists(table)) { + val tableDescriptor = new HTableDescriptor(table) + tableDescriptor.addFamily(new HColumnDescriptor(columnFamily)) + hbaseAdmin.createTable(tableDescriptor) + } + } + + def validateConfiguration() { + import org.apache.commons.lang.StringUtils.isNotEmpty + + require(!quorumNames.isEmpty, "Zookeeper quorums are required") + require(isNotEmpty(columnFamily), "column family is required") + require(isNotEmpty(column), "column is required") + } + + def getValue[K, V](key: K)(implicit keyInj: Injection[K, Array[Byte]], valueInj: Injection[V, Array[Byte]]): Future[Option[V]] = futurePool { + val tbl = pool.getTable(table) + val g = new Get(keyInj(key)) + g.addColumn(columnFamily.as[StringBytes], column.as[StringBytes]) + + val result = tbl.get(g) + val value = result.getValue(columnFamily.as[StringBytes], column.as[StringBytes]) + Option(value).map(v => valueInj.invert(v).get) + } + + def putValue[K, V](kv: (K, Option[V]))(implicit keyInj: Injection[K, Array[Byte]], valueInj: Injection[V, Array[Byte]]): Future[Unit] = { + kv match { + case (k, Some(v)) => futurePool { + val p = new Put(keyInj(k)) + p.add(columnFamily.as[StringBytes], column.as[StringBytes], valueInj(v)) + val tbl = pool.getTable(table) + tbl.put(p) + } + case (k, None) => futurePool { + val delete = new Delete(keyInj(k)) + val tbl = pool.getTable(table) + tbl.delete(delete) + } + } + } +} diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala new file mode 100644 index 00000000..e153093c --- /dev/null +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala @@ -0,0 +1,84 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.hadoop.hbase.client._ +import com.twitter.storehaus.Store +import com.twitter.util.Future +import com.twitter.bijection.Injection._ +import org.apache.hadoop.conf.Configuration + +/** + * @author MansurAshraf + * @since 9/7/13 + */ +object HBaseStringStore { + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean, + pool: HTablePool, + conf: Configuration, + threads: Int): HBaseStringStore = { + val store = new HBaseStringStore(quorumNames, table, columnFamily, column, createTable, pool, conf, threads) + store.validateConfiguration() + store.createTableIfRequired() + store + } + + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean): HBaseStringStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration(), 4) +} + +class HBaseStringStore(protected val quorumNames: Seq[String], + protected val table: String, + protected val columnFamily: String, + protected val column: String, + protected val createTable: Boolean, + protected val pool: HTablePool, + protected val conf: Configuration, + protected val threads: Int) extends Store[String, String] with HBaseStore { + /** get a single key from the store. + * Prefer multiGet if you are getting more than one key at a time + */ + override def get(k: String): Future[Option[String]] = { + import com.twitter.bijection.hbase.HBaseBijections._ + implicit val stringInj = fromBijectionRep[String, StringBytes] + getValue[String, String](k) + } + + /** + * replace a value + * Delete is the same as put((k,None)) + */ + override def put(kv: (String, Option[String])): Future[Unit] = { + import com.twitter.bijection.hbase.HBaseBijections._ + implicit val stringInj = fromBijectionRep[String, StringBytes] + putValue(kv) + } + + /** Close this store and release any resources. + * It is undefined what happens on get/multiGet after close + */ + override def close { + pool.close() + } +} diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala new file mode 100644 index 00000000..d15b1de5 --- /dev/null +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala @@ -0,0 +1,43 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.hadoop.hbase.HBaseTestingUtility +import org.apache.hadoop.hbase.client.HTablePool +import com.twitter.storehaus.testing.CloseableCleanup +import java.io.Closeable + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +trait DefaultHBaseCluster[C <: Closeable] extends CloseableCleanup[C] { + val quorumNames = Seq("localhost:2181") + val table = "summing_bird" + val columnFamily = "sb" + val column = "aggregate" + val createTable = true + val testingUtil = new HBaseTestingUtility() + val conf = testingUtil.getConfiguration + val pool = new HTablePool(conf, 1) + + override def cleanup() { + super.cleanup() + /* testingUtil.shutdownMiniZKCluster() + testingUtil.shutdownMiniCluster()*/ + } +} diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala new file mode 100644 index 00000000..6506bd95 --- /dev/null +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala @@ -0,0 +1,67 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.scalacheck.{Arbitrary, Gen, Properties} +import com.twitter.storehaus.{FutureOps, Store} +import com.twitter.storehaus.testing.generator.NonEmpty +import org.scalacheck.Prop._ +import com.twitter.util.Await + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +object HBaseStringStoreProperties extends Properties("HBaseStore") +with DefaultHBaseCluster[Store[String, String]] { + def validPairs: Gen[List[(String, Option[String])]] = + NonEmpty.Pairing.alphaStrs() + + def baseTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) + (put: (Store[K, V], List[(K, Option[V])]) => Unit) = + forAll(validPairs) { + (examples: List[(K, Option[V])]) => + put(store, examples) + examples.toMap.forall { + case (k, optV) => + val res = Await.result(store.get(k)) + Equiv[Option[V]].equiv(res, optV) + } + } + + def putStoreTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) = + baseTest(store, validPairs) { + (s, pairs) => + pairs.foreach { + case (k, v) => Await.result(s.put((k, v))) + } + } + + def multiPutStoreTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) = + baseTest(store, validPairs) { + (s, pairs) => + Await.result(FutureOps.mapCollect(s.multiPut(pairs.toMap))) + } + + def storeTest(store: Store[String, String]) = + putStoreTest(store, validPairs) && multiPutStoreTest(store, validPairs) + + testingUtil.startMiniCluster() + val closeable =HBaseStringStore(quorumNames, table, columnFamily, column, createTable,pool,conf,4) + property("HBaseStore test") =storeTest(closeable) + +} diff --git a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/HashEncoder.scala b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/HashEncoder.scala index fae8d6a9..278e498e 100644 --- a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/HashEncoder.scala +++ b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/HashEncoder.scala @@ -16,6 +16,7 @@ package com.twitter.storehaus.memcache +import com.twitter.bijection.{ Base64String, Bijection, Codec, Injection } import com.twitter.util.Encoder /** @@ -28,10 +29,7 @@ import com.twitter.util.Encoder * @author Sam Ritchie */ -// See this reference for other algorithm names: -// http://docs.oracle.com/javase/1.4.2/docs/guide/security/CryptoSpec.html#AppA - -class HashEncoder(hashFunc: String = "SHA-256") extends Encoder[Array[Byte],Array[Byte]] { +class HashEncoder(hashFunc: String) extends Encoder[Array[Byte], Array[Byte]] { def encode(bytes: Array[Byte]): Array[Byte] = { val md = java.security.MessageDigest.getInstance(hashFunc) md.digest(bytes) @@ -39,6 +37,19 @@ class HashEncoder(hashFunc: String = "SHA-256") extends Encoder[Array[Byte],Arra } object HashEncoder { - def apply() = new HashEncoder - def apply(hashFunc: String) = new HashEncoder(hashFunc) + // See this reference for other algorithm names: + // http://docs.oracle.com/javase/1.4.2/docs/guide/security/CryptoSpec.html#AppA + val DEFAULT_HASH_FUNC = "SHA-256" + + def apply(hashFunc: String = DEFAULT_HASH_FUNC) = new HashEncoder(hashFunc) + + /** + * Returns a function that encodes a key to a hashed, base64-encoded + * Memcache key string given a unique namespace string. + */ + def keyEncoder[T](namespace: String, hashFunc: String = DEFAULT_HASH_FUNC) + (implicit inj: Codec[T]): T => String = { key: T => + def concat(bytes: Array[Byte]): Array[Byte] = namespace.getBytes ++ bytes + (inj andThen (concat _) andThen HashEncoder() andThen Bijection.connect[Array[Byte], Base64String])(key).str + } } diff --git a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStore.scala b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStore.scala index 906efda6..c3eb8d70 100644 --- a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStore.scala +++ b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStore.scala @@ -16,18 +16,29 @@ package com.twitter.storehaus.memcache +import com.twitter.algebird.Monoid +import com.twitter.bijection.{ Bijection, Codec, Injection } +import com.twitter.bijection.netty.Implicits._ import com.twitter.conversions.time._ +import com.twitter.finagle.builder.ClientBuilder +import com.twitter.finagle.memcached.KetamaClientBuilder +import com.twitter.finagle.memcached.protocol.text.Memcached import com.twitter.util.{ Duration, Future } import com.twitter.finagle.memcached.{ GetResult, Client } import com.twitter.storehaus.{ FutureOps, Store, WithPutTtl } +import com.twitter.storehaus.algebra.MergeableStore import org.jboss.netty.buffer.ChannelBuffer +import Store.enrich + /** * @author Oscar Boykin * @author Sam Ritchie */ object MemcacheStore { + import HashEncoder.keyEncoder + // Default Memcached TTL is one day. // For more details of setting expiration time for items in Memcached, please refer to // https://github.com/memcached/memcached/blob/master/doc/protocol.txt#L79 @@ -38,8 +49,57 @@ object MemcacheStore { // http://docs.libmemcached.org/memcached_set.html val DEFAULT_FLAG = 0 + val DEFAULT_CONNECTION_LIMIT = 1 + val DEFAULT_TIMEOUT = 1.seconds + val DEFAULT_RETRIES = 2 + def apply(client: Client, ttl: Duration = DEFAULT_TTL, flag: Int = DEFAULT_FLAG) = new MemcacheStore(client, ttl, flag) + + def defaultClient( + name: String, + nodeString: String, + retries: Int = DEFAULT_RETRIES, + timeout: Duration = DEFAULT_TIMEOUT, + hostConnectionLimit: Int = DEFAULT_CONNECTION_LIMIT): Client = { + val builder = ClientBuilder() + .name(name) + .retries(retries) + .tcpConnectTimeout(timeout) + .requestTimeout(timeout) + .connectTimeout(timeout) + .readerIdleTimeout(timeout) + .hostConnectionLimit(hostConnectionLimit) + .codec(Memcached()) + + KetamaClientBuilder() + .clientBuilder(builder) + .nodes(nodeString) + .build() + } + + /** + * Returns a Memcache-backed Store[K, V] that uses + * implicitly-supplied Injection instances from K and V -> + * Array[Byte] to manage type conversion. + */ + def typed[K: Codec, V: Codec](client: Client, keyPrefix: String, + ttl: Duration = DEFAULT_TTL, flag: Int = DEFAULT_FLAG): Store[K, V] = { + implicit val valueToBuf = Injection.connect[V, Array[Byte], ChannelBuffer] + MemcacheStore(client, ttl, flag).convert(keyEncoder[K](keyPrefix)) + } + + /** + * Returns a Memcache-backed MergeableStore[K, V] that uses + * implicitly-supplied Injection instances from K and V -> + * Array[Byte] to manage type conversion. The Monoid[V] is also + * pulled in implicitly. + */ + def mergeable[K: Codec, V: Codec: Monoid](client: Client, keyPrefix: String, + ttl: Duration = DEFAULT_TTL, flag: Int = DEFAULT_FLAG): MergeableStore[K, V] = + MergeableStore.fromStore( + MemcacheStore.typed(client, keyPrefix, ttl, flag) + ) } class MemcacheStore(val client: Client, ttl: Duration, flag: Int) diff --git a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStringStore.scala b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStringStore.scala index af9a63eb..8ba0eb4f 100644 --- a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStringStore.scala +++ b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStringStore.scala @@ -24,7 +24,8 @@ import com.twitter.storehaus.ConvertedStore import com.twitter.storehaus.algebra.MergeableStore import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers } import org.jboss.netty.util.CharsetUtil -import scala.util.control.Exception.allCatch + +import scala.util.Try /** * @author Doug Tangren @@ -34,7 +35,7 @@ object MemcacheStringStore { private [memcache] implicit object ByteArrayInjection extends Injection[Array[Byte],ChannelBuffer] { def apply(ary: Array[Byte]) = ChannelBuffers.wrappedBuffer(ary) - def invert(buf: ChannelBuffer) = allCatch.opt(buf.array) + def invert(buf: ChannelBuffer) = Try(buf.array) } private [memcache] implicit val StringInjection = Injection.connect[String, Array[Byte], ChannelBuffer] @@ -45,7 +46,7 @@ object MemcacheStringStore { import MemcacheStringStore._ /** A MergeableStore for String values backed by memcache */ -class MemcacheStringStore(underlying: MemcacheStore) +class MemcacheStringStore(underlying: MemcacheStore) extends ConvertedStore[String, String, ChannelBuffer, String](underlying)(identity) with MergeableStore[String, String] { diff --git a/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala b/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala index 0fb5fc1f..89ddec98 100644 --- a/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala +++ b/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala @@ -19,7 +19,7 @@ package com.twitter.storehaus.mysql import com.twitter.finagle.exp.mysql.{ Client, PreparedStatement, Result } import com.twitter.storehaus.FutureOps import com.twitter.storehaus.Store -import com.twitter.util.Future +import com.twitter.util.{ Await, Future, Time } /** * @author Ruban Monu @@ -68,15 +68,22 @@ class MySqlStore(client: Client, table: String, kCol: String, vCol: String) val UPDATE_SQL = "UPDATE " + g(table) + " SET " + g(vCol) + "=? WHERE " + g(kCol) + "=?" val DELETE_SQL = "DELETE FROM " + g(table) + " WHERE " + g(kCol) + "=?" + // prepared statements to be reused across gets and puts + // TODO: should this be non-blocking? this is part of object construction, so maybe not? + val selectStmt = Await.result(client.prepare(SELECT_SQL)) + val insertStmt = Await.result(client.prepare(INSERT_SQL)) + val updateStmt = Await.result(client.prepare(UPDATE_SQL)) + val deleteStmt = Await.result(client.prepare(DELETE_SQL)) + override def get(k: MySqlValue): Future[Option[MySqlValue]] = { // finagle-mysql select() method lets you pass in a mapping function // to convert resultset into desired output format // we assume here the mysql client already has the dbname/schema selected - val mysqlResult: Future[(PreparedStatement,Seq[Option[MySqlValue]])] = client.prepareAndSelect(SELECT_SQL, MySqlStringInjection(k).getBytes) { row => + selectStmt.parameters = Array(MySqlStringInjection(k).getBytes) + val mysqlResult: Future[Seq[Option[MySqlValue]]] = client.select(selectStmt) { row => row(vCol) match { case None => None; case Some(v) => Some(MySqlValue(v)) } } - mysqlResult.map { case(ps, result) => - client.closeStatement(ps) + mysqlResult.map { case result => result.lift(0).flatten.headOption } } @@ -108,7 +115,14 @@ class MySqlStore(client: Client, table: String, kCol: String, vCol: String) } } - override def close { client.close } + override def close { + // close prepared statements before closing the connection + client.closeStatement(selectStmt) + client.closeStatement(insertStmt) + client.closeStatement(updateStmt) + client.closeStatement(deleteStmt) + client.close(Time.Bottom) + } protected def doSet(k: MySqlValue, v: MySqlValue): Future[Result] = { // mysql's insert-or-update syntax works only when a primary key is defined: @@ -117,18 +131,19 @@ class MySqlStore(client: Client, table: String, kCol: String, vCol: String) // and insert or update accordingly get(k).flatMap { optionV => optionV match { - case Some(value) => client.prepareAndExecute(UPDATE_SQL, MySqlStringInjection(v).getBytes, MySqlStringInjection(k).getBytes) - case None => client.prepareAndExecute(INSERT_SQL, MySqlStringInjection(k).getBytes, MySqlStringInjection(v).getBytes) + case Some(value) => + updateStmt.parameters = Array(MySqlStringInjection(v).getBytes, MySqlStringInjection(k).getBytes) + client.execute(updateStmt) + case None => + insertStmt.parameters = Array(MySqlStringInjection(k).getBytes, MySqlStringInjection(v).getBytes) + client.execute(insertStmt) } - // prepareAndExecute returns Future[(PreparedStatement,Result)] - }.map { case (ps, result) => client.closeStatement(ps); result } + } } protected def doDelete(k: MySqlValue): Future[Result] = { - // prepareAndExecute returns Future[(PreparedStatement,Result)] - client.prepareAndExecute(DELETE_SQL, MySqlStringInjection(k).getBytes).map { - case (ps, result) => client.closeStatement(ps); result - } + deleteStmt.parameters = Array(MySqlStringInjection(k).getBytes) + client.execute(deleteStmt) } // enclose table or column names in backticks, in case they happen to be sql keywords diff --git a/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/ValueMapper.scala b/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/ValueMapper.scala index cd8fcf45..a7e521dd 100644 --- a/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/ValueMapper.scala +++ b/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/ValueMapper.scala @@ -18,8 +18,6 @@ package com.twitter.storehaus.mysql import java.lang.UnsupportedOperationException -import scala.util.control.Exception.allCatch - import com.twitter.bijection.Injection import com.twitter.finagle.exp.mysql.{ EmptyValue, @@ -36,6 +34,7 @@ import com.twitter.finagle.exp.mysql.{ import org.jboss.netty.buffer.ChannelBuffer import org.jboss.netty.buffer.ChannelBuffers import org.jboss.netty.util.CharsetUtil.UTF_8 +import scala.util.Try /** Helper class for mapping finagle-mysql Values to types we care about. */ object ValueMapper { @@ -123,7 +122,7 @@ class MySqlValue(val v: Value) { */ object MySqlStringInjection extends Injection[MySqlValue, String] { def apply(a: MySqlValue): String = ValueMapper.toString(a.v).getOrElse("") // should this be null: String instead? - def invert(b: String): Option[MySqlValue] = allCatch.opt(MySqlValue(RawStringValue(b))) + override def invert(b: String) = Try(MySqlValue(RawStringValue(b))) } /** @@ -133,5 +132,5 @@ object MySqlStringInjection extends Injection[MySqlValue, String] { */ object MySqlCbInjection extends Injection[MySqlValue, ChannelBuffer] { def apply(a: MySqlValue): ChannelBuffer = ValueMapper.toChannelBuffer(a.v).getOrElse(ChannelBuffers.EMPTY_BUFFER) - def invert(b: ChannelBuffer): Option[MySqlValue] = allCatch.opt(MySqlValue(RawStringValue(b.toString(UTF_8)))) + override def invert(b: ChannelBuffer) = Try(MySqlValue(RawStringValue(b.toString(UTF_8)))) } diff --git a/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala b/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala index 35a1f8d3..6c2f024c 100644 --- a/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala +++ b/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala @@ -41,7 +41,7 @@ object MySqlStoreProperties extends Properties("MySqlStore") * we lowercase key's here for normalization */ def stringify(examples: List[(Any, Option[Any])]) = examples.map { case (k, v) => - (MySqlStringInjection.invert(k.toString.toLowerCase).get, v.flatMap { d => MySqlStringInjection.invert(d.toString) }) + (MySqlStringInjection.invert(k.toString.toLowerCase).get, v.flatMap { d => MySqlStringInjection.invert(d.toString).toOption }) } def putAndGetStoreTest(store: MySqlStore, pairs: Gen[List[(Any, Option[Any])]] = NonEmpty.Pairing.alphaStrs()) = @@ -72,7 +72,7 @@ object MySqlStoreProperties extends Properties("MySqlStore") def compareValues(k: MySqlValue, expectedOptV: Option[MySqlValue], foundOptV: Option[MySqlValue]) = { val isMatch = expectedOptV match { - case Some(value) => !foundOptV.isEmpty && foundOptV.get == value + case Some(value) => !foundOptV.isEmpty && foundOptV.get == value case None => foundOptV.isEmpty } if (!isMatch) printErr(k, expectedOptV, foundOptV) @@ -120,7 +120,7 @@ object MySqlStoreProperties extends Properties("MySqlStore") property("MySqlStore smallint->smallint multiget") = withStore(putAndMultiGetStoreTest(_, NonEmpty.Pairing.numerics[Short]()), "smallint", "smallint", true) - + private def withStore[T](f: MySqlStore => T, kColType: String, vColType: String, multiGet: Boolean = false): T = { val client = Client("localhost:3306", "storehaususer", "test1234", "storehaus_test", Level.WARNING) // these should match mysql setup used in .travis.yml diff --git a/storehaus-redis/src/main/scala/com/twitter/storehaus/redis/RedisStringStore.scala b/storehaus-redis/src/main/scala/com/twitter/storehaus/redis/RedisStringStore.scala index 4ec421a7..f94069df 100644 --- a/storehaus-redis/src/main/scala/com/twitter/storehaus/redis/RedisStringStore.scala +++ b/storehaus-redis/src/main/scala/com/twitter/storehaus/redis/RedisStringStore.scala @@ -18,16 +18,17 @@ package com.twitter.storehaus.redis import com.twitter.algebird.Monoid import com.twitter.bijection.Injection +import com.twitter.bijection.Conversion.asMethod import com.twitter.finagle.redis.Client import com.twitter.finagle.redis.util.{ CBToString, StringToChannelBuffer } import com.twitter.storehaus.ConvertedStore import com.twitter.storehaus.algebra.MergeableStore import com.twitter.util.{ Duration, Future } import org.jboss.netty.buffer.ChannelBuffer -import scala.util.control.Exception.allCatch +import scala.util.Try /** - * + * * @author Doug Tangren */ @@ -35,12 +36,13 @@ object RedisStringStore { private [redis] implicit object StringInjection extends Injection[String, ChannelBuffer] { def apply(a: String): ChannelBuffer = StringToChannelBuffer(a) - def invert(b: ChannelBuffer): Option[String] = allCatch.opt(CBToString(b)) + override def invert(b: ChannelBuffer) = Try(CBToString(b)) } def apply(client: Client, ttl: Option[Duration] = RedisStore.Default.TTL) = new RedisStringStore(RedisStore(client, ttl)) } + import RedisStringStore._ /** @@ -50,9 +52,7 @@ import RedisStringStore._ class RedisStringStore(underlying: RedisStore) extends ConvertedStore[ChannelBuffer, ChannelBuffer, ChannelBuffer, String](underlying)(identity) with MergeableStore[ChannelBuffer, String] { - val monoid = implicitly[Monoid[String]] + override val monoid = implicitly[Monoid[String]] override def merge(kv: (ChannelBuffer, String)): Future[Unit] = - underlying.client.append(kv._1, kv._2).unit + underlying.client.append(kv._1, kv._2.as[ChannelBuffer]).unit } - - diff --git a/storehaus-redis/src/test/scala/com/twitter/storehaus/redis/RedisStoreProperties.scala b/storehaus-redis/src/test/scala/com/twitter/storehaus/redis/RedisStoreProperties.scala index a23574af..cedcd787 100644 --- a/storehaus-redis/src/test/scala/com/twitter/storehaus/redis/RedisStoreProperties.scala +++ b/storehaus-redis/src/test/scala/com/twitter/storehaus/redis/RedisStoreProperties.scala @@ -27,7 +27,7 @@ import com.twitter.util.Await import org.jboss.netty.buffer.ChannelBuffer import org.scalacheck.{ Arbitrary, Gen, Properties } import org.scalacheck.Prop._ -import scala.util.control.Exception.allCatch +import scala.util.Try object RedisStoreProperties extends Properties("RedisStore") with CloseableCleanup[Store[String, String]] @@ -61,7 +61,7 @@ object RedisStoreProperties extends Properties("RedisStore") implicit def strToCb = new Injection[String, ChannelBuffer] { def apply(a: String): ChannelBuffer = StringToChannelBuffer(a) - def invert(b: ChannelBuffer): Option[String] = allCatch.opt(CBToString(b)) + override def invert(b: ChannelBuffer) = Try(CBToString(b)) } val closeable = RedisStore(client).convert(StringToChannelBuffer(_: String)) diff --git a/version.sbt b/version.sbt index ab9f75d1..d8e39804 100644 --- a/version.sbt +++ b/version.sbt @@ -1,2 +1,2 @@ -version in ThisBuild := "0.4.0" +version in ThisBuild := "0.5.1"