From 899ac6dd3306827e14296daa7e5964bdaf57dd4b Mon Sep 17 00:00:00 2001 From: Ruban Monu Date: Fri, 17 May 2013 16:05:41 -0700 Subject: [PATCH 01/29] move to finagle-mysql 6.3.0, disable store.close in tests --- project/Build.scala | 2 +- .../twitter/storehaus/mysql/MySQLStoreProperties.scala | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/project/Build.scala b/project/Build.scala index 20f285f4..8f06f82b 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -153,7 +153,7 @@ object StorehausBuild extends Build { ).settings( name := "storehaus-mysql", previousArtifact := youngestForwardCompatible("mysql"), - libraryDependencies += Finagle.module("mysql", "6.2.1") // tests fail with the latest + libraryDependencies += Finagle.module("mysql") ).dependsOn(storehausCore % "test->test;compile->compile") lazy val storehausRedis = Project( diff --git a/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala b/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala index af589587..67ec888a 100644 --- a/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala +++ b/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala @@ -117,7 +117,7 @@ object MySqlStoreProperties extends Properties("MySqlStore") { property("MySqlStore smallint->smallint multiget") = withStore(putAndMultiGetStoreTest(_, validPairs[Short]), "smallint", "smallint", true) - + private def withStore[T](f: MySqlStore => T, kColType: String, vColType: String, multiGet: Boolean = false): T = { val client = Client("localhost:3306", "storehaususer", "test1234", "storehaus_test", Level.WARNING) // these should match mysql setup used in .travis.yml @@ -128,7 +128,12 @@ object MySqlStoreProperties extends Properties("MySqlStore") { val store = MySqlStore(client, tableName, "key", "value") val result = f(store) - store.close + + // TODO: fix below once storehaus-testing module is ready + // as of now, store.close gets called before all test cases finish running + // so we comment this out + // store.close + result } } From 5b4200cacfc110d6d25ad92ec6890322e66ae5a8 Mon Sep 17 00:00:00 2001 From: Ruban Monu Date: Fri, 17 May 2013 16:05:57 -0700 Subject: [PATCH 02/29] reuse prepared statements --- .../twitter/storehaus/mysql/MySQLStore.scala | 41 +++++++++++++------ 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala b/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala index 0fb5fc1f..61ff8029 100644 --- a/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala +++ b/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala @@ -19,7 +19,7 @@ package com.twitter.storehaus.mysql import com.twitter.finagle.exp.mysql.{ Client, PreparedStatement, Result } import com.twitter.storehaus.FutureOps import com.twitter.storehaus.Store -import com.twitter.util.Future +import com.twitter.util.{ Await, Future } /** * @author Ruban Monu @@ -68,15 +68,22 @@ class MySqlStore(client: Client, table: String, kCol: String, vCol: String) val UPDATE_SQL = "UPDATE " + g(table) + " SET " + g(vCol) + "=? WHERE " + g(kCol) + "=?" val DELETE_SQL = "DELETE FROM " + g(table) + " WHERE " + g(kCol) + "=?" + // prepared statements to be reused across gets and puts + // TODO: should this be non-blocking? this is part of object construction, so maybe not? + val selectStmt = Await.result(client.prepare(SELECT_SQL)) + val insertStmt = Await.result(client.prepare(INSERT_SQL)) + val updateStmt = Await.result(client.prepare(UPDATE_SQL)) + val deleteStmt = Await.result(client.prepare(DELETE_SQL)) + override def get(k: MySqlValue): Future[Option[MySqlValue]] = { // finagle-mysql select() method lets you pass in a mapping function // to convert resultset into desired output format // we assume here the mysql client already has the dbname/schema selected - val mysqlResult: Future[(PreparedStatement,Seq[Option[MySqlValue]])] = client.prepareAndSelect(SELECT_SQL, MySqlStringInjection(k).getBytes) { row => + selectStmt.parameters = Array(MySqlStringInjection(k).getBytes) + val mysqlResult: Future[Seq[Option[MySqlValue]]] = client.select(selectStmt) { row => row(vCol) match { case None => None; case Some(v) => Some(MySqlValue(v)) } } - mysqlResult.map { case(ps, result) => - client.closeStatement(ps) + mysqlResult.map { case result => result.lift(0).flatten.headOption } } @@ -108,7 +115,14 @@ class MySqlStore(client: Client, table: String, kCol: String, vCol: String) } } - override def close { client.close } + override def close { + // close prepared statements before closing the connection + client.closeStatement(selectStmt) + client.closeStatement(insertStmt) + client.closeStatement(updateStmt) + client.closeStatement(deleteStmt) + client.close + } protected def doSet(k: MySqlValue, v: MySqlValue): Future[Result] = { // mysql's insert-or-update syntax works only when a primary key is defined: @@ -117,18 +131,19 @@ class MySqlStore(client: Client, table: String, kCol: String, vCol: String) // and insert or update accordingly get(k).flatMap { optionV => optionV match { - case Some(value) => client.prepareAndExecute(UPDATE_SQL, MySqlStringInjection(v).getBytes, MySqlStringInjection(k).getBytes) - case None => client.prepareAndExecute(INSERT_SQL, MySqlStringInjection(k).getBytes, MySqlStringInjection(v).getBytes) + case Some(value) => + updateStmt.parameters = Array(MySqlStringInjection(v).getBytes, MySqlStringInjection(k).getBytes) + client.execute(updateStmt) + case None => + insertStmt.parameters = Array(MySqlStringInjection(k).getBytes, MySqlStringInjection(v).getBytes) + client.execute(insertStmt) } - // prepareAndExecute returns Future[(PreparedStatement,Result)] - }.map { case (ps, result) => client.closeStatement(ps); result } + } } protected def doDelete(k: MySqlValue): Future[Result] = { - // prepareAndExecute returns Future[(PreparedStatement,Result)] - client.prepareAndExecute(DELETE_SQL, MySqlStringInjection(k).getBytes).map { - case (ps, result) => client.closeStatement(ps); result - } + deleteStmt.parameters = Array(MySqlStringInjection(k).getBytes) + client.execute(deleteStmt) } // enclose table or column names in backticks, in case they happen to be sql keywords From 17eee815e38d833fab1a2322d7ee90f90ba6c3ae Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Tue, 25 Jun 2013 14:28:29 -0700 Subject: [PATCH 03/29] beef up memcache --- project/Build.scala | 7 ++- .../storehaus/memcache/HashEncoder.scala | 26 ++++++-- .../storehaus/memcache/MemcacheStore.scala | 61 +++++++++++++++++++ 3 files changed, 87 insertions(+), 7 deletions(-) diff --git a/project/Build.scala b/project/Build.scala index 20f285f4..28f913f5 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -143,7 +143,12 @@ object StorehausBuild extends Build { ).settings( name := "storehaus-memcache", previousArtifact := youngestForwardCompatible("memcache"), - libraryDependencies += Finagle.module("memcached") + libraryDependencies ++= Seq( + "com.twitter" %% "algebird-core" % algebirdVersion, + "com.twitter" %% "bijection-core" % bijectionVersion, + "com.twitter" %% "bijection-netty" % bijectionVersion, + Finagle.module("memcached") + ) ).dependsOn(storehausAlgebra % "test->test;compile->compile") lazy val storehausMySQL = Project( diff --git a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/HashEncoder.scala b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/HashEncoder.scala index fae8d6a9..b49445ac 100644 --- a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/HashEncoder.scala +++ b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/HashEncoder.scala @@ -16,6 +16,7 @@ package com.twitter.storehaus.memcache +import com.twitter.bijection.{ Base64String, Bijection, Injection } import com.twitter.util.Encoder /** @@ -28,10 +29,7 @@ import com.twitter.util.Encoder * @author Sam Ritchie */ -// See this reference for other algorithm names: -// http://docs.oracle.com/javase/1.4.2/docs/guide/security/CryptoSpec.html#AppA - -class HashEncoder(hashFunc: String = "SHA-256") extends Encoder[Array[Byte],Array[Byte]] { +class HashEncoder(hashFunc: String) extends Encoder[Array[Byte], Array[Byte]] { def encode(bytes: Array[Byte]): Array[Byte] = { val md = java.security.MessageDigest.getInstance(hashFunc) md.digest(bytes) @@ -39,6 +37,22 @@ class HashEncoder(hashFunc: String = "SHA-256") extends Encoder[Array[Byte],Arra } object HashEncoder { - def apply() = new HashEncoder - def apply(hashFunc: String) = new HashEncoder(hashFunc) + // See this reference for other algorithm names: + // http://docs.oracle.com/javase/1.4.2/docs/guide/security/CryptoSpec.html#AppA + val DEFAULT_HASH_FUNC = "SHA-256" + + // TODO: Remove when Bijection gains a Codec type alias. + type Codec[T] = Injection[T, Array[Byte]] + + def apply(hashFunc: String = DEFAULT_HASH_FUNC) = new HashEncoder(hashFunc) + + /** + * Returns a function that encodes a key to a hashed, base64-encoded + * Memcache key string given a unique namespace string. + */ + def keyEncoder[T](namespace: String, hashFunc: String = DEFAULT_HASH_FUNC) + (implicit inj: Codec[T]): T => String = { key: T => + def concat(bytes: Array[Byte]): Array[Byte] = namespace.getBytes ++ bytes + (inj andThen (concat _) andThen HashEncoder() andThen Bijection.connect[Array[Byte], Base64String])(key).str + } } diff --git a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStore.scala b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStore.scala index b3afce02..87245764 100644 --- a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStore.scala +++ b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStore.scala @@ -16,18 +16,30 @@ package com.twitter.storehaus.memcache +import com.twitter.algebird.Monoid +import com.twitter.bijection.{ Bijection, Injection } +import com.twitter.bijection.netty.Implicits._ import com.twitter.conversions.time._ +import com.twitter.finagle.builder.ClientBuilder +import com.twitter.finagle.memcached.KetamaClientBuilder +import com.twitter.finagle.memcached.protocol.text.Memcached +import com.twitter.util.Duration import com.twitter.util.{ Future, Time } import com.twitter.finagle.memcached.{ GetResult, Client } import com.twitter.storehaus.{ FutureOps, Store } +import com.twitter.storehaus.algebra.MergeableStore import org.jboss.netty.buffer.ChannelBuffer +import Store.enrich + /** * @author Oscar Boykin * @author Sam Ritchie */ object MemcacheStore { + import HashEncoder.{ keyEncoder, Codec } + // Default Memcached TTL is one day. val DEFAULT_TTL = Time.fromSeconds(24 * 60 * 60) @@ -36,8 +48,57 @@ object MemcacheStore { // http://docs.libmemcached.org/memcached_set.html val DEFAULT_FLAG = 0 + val DEFAULT_CONNECTION_LIMIT = 1 + val DEFAULT_TIMEOUT = 1.seconds + val DEFAULT_RETRIES = 2 + def apply(client: Client, ttl: Time = DEFAULT_TTL, flag: Int = DEFAULT_FLAG) = new MemcacheStore(client, ttl, flag) + + def defaultClient( + name: String, + nodeString: String, + retries: Int = DEFAULT_RETRIES, + timeout: Duration = DEFAULT_TIMEOUT, + hostConnectionLimit: Int = DEFAULT_CONNECTION_LIMIT): Client = { + val builder = ClientBuilder() + .name(name) + .retries(retries) + .tcpConnectTimeout(timeout) + .requestTimeout(timeout) + .connectTimeout(timeout) + .readerIdleTimeout(timeout) + .hostConnectionLimit(hostConnectionLimit) + .codec(Memcached()) + + KetamaClientBuilder() + .clientBuilder(builder) + .nodes(nodeString) + .build() + } + + /** + * Returns a Memcache-backed Store[K, V] that uses + * implicitly-supplied Injection instances from K and V -> + * Array[Byte] to manage type conversion. + */ + def typed[K: Codec, V: Codec](client: Client, keyPrefix: String, + ttl: Time = DEFAULT_TTL, flag: Int = DEFAULT_FLAG): Store[K, V] = { + implicit val valueToBuf = Injection.connect[V, Array[Byte], ChannelBuffer] + MemcacheStore(client, ttl, flag).convert(keyEncoder[K](keyPrefix)) + } + + /** + * Returns a Memcache-backed MergeableStore[K, V] that uses + * implicitly-supplied Injection instances from K and V -> + * Array[Byte] to manage type conversion. The Monoid[V] is also + * pulled in implicitly. + */ + def mergeable[K: Codec, V: Codec: Monoid](client: Client, keyPrefix: String, + ttl: Time = DEFAULT_TTL, flag: Int = DEFAULT_FLAG): MergeableStore[K, V] = + MergeableStore.fromStore( + MemcacheStore.typed(client, keyPrefix, ttl, flag) + ) } class MemcacheStore(val client: Client, ttl: Time, flag: Int) extends Store[String, ChannelBuffer] { From 9900f363b6602096e2337d8107246de786bbef11 Mon Sep 17 00:00:00 2001 From: Ruban Monu Date: Thu, 27 Jun 2013 21:37:09 -0700 Subject: [PATCH 04/29] merge with develop. remove .close from mysql tests --- .../twitter/storehaus/mysql/MySQLStoreProperties.scala | 9 --------- 1 file changed, 9 deletions(-) diff --git a/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala b/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala index 7fa37567..5d134e41 100644 --- a/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala +++ b/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala @@ -129,15 +129,6 @@ object MySqlStoreProperties extends Properties("MySqlStore") val schema = "CREATE TEMPORARY TABLE IF NOT EXISTS `"+tableName+"` (`key` "+kColType+" DEFAULT NULL, `value` "+vColType+" DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;" Await.result(client.query(schema)) - // val store = MySqlStore(client, tableName, "key", "value") - // val result = f(store) - - // TODO: fix below once storehaus-testing module is ready - // as of now, store.close gets called before all test cases finish running - // so we comment this out - // store.close - - //result f(newStore(client, tableName)) } From 4129eb5bd6d95f8d49169d30b7a744b9344a2f25 Mon Sep 17 00:00:00 2001 From: Ximing Yu Date: Tue, 9 Jul 2013 16:04:37 -0700 Subject: [PATCH 05/29] Bump to util 6.3.7 and finagle 6.5.1 --- project/Build.scala | 4 ++-- project/Finagle.scala | 2 +- .../main/scala/com/twitter/storehaus/mysql/MySQLStore.scala | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/project/Build.scala b/project/Build.scala index a5b4543a..b9c0652c 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -131,7 +131,7 @@ object StorehausBuild extends Build { lazy val storehausCache = module("cache") lazy val storehausCore = module("core").settings( - libraryDependencies += "com.twitter" %% "util-core" % "6.3.0", + libraryDependencies += "com.twitter" %% "util-core" % "6.3.7", libraryDependencies += "com.twitter" %% "bijection-core" % bijectionVersion ).dependsOn(storehausCache % "test->test;compile->compile") @@ -146,7 +146,7 @@ object StorehausBuild extends Build { ).dependsOn(storehausAlgebra % "test->test;compile->compile") lazy val storehausMySQL = module("mysql").settings( - libraryDependencies += Finagle.module("mysql", "6.2.1") // tests fail with the latest + libraryDependencies += Finagle.module("mysql") ).dependsOn(storehausCore % "test->test;compile->compile") lazy val storehausRedis = module("redis").settings( diff --git a/project/Finagle.scala b/project/Finagle.scala index 91143fa3..c9a5df7f 100644 --- a/project/Finagle.scala +++ b/project/Finagle.scala @@ -5,7 +5,7 @@ package storehaus * dependency */ object Finagle { import sbt._ - val LatestVersion = "6.3.0" + val LatestVersion = "6.5.1" def module(name: String, version: String = LatestVersion) = "com.twitter" %% "finagle-%s".format(name) % version } diff --git a/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala b/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala index 0fb5fc1f..a8fe6e04 100644 --- a/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala +++ b/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala @@ -19,7 +19,7 @@ package com.twitter.storehaus.mysql import com.twitter.finagle.exp.mysql.{ Client, PreparedStatement, Result } import com.twitter.storehaus.FutureOps import com.twitter.storehaus.Store -import com.twitter.util.Future +import com.twitter.util.{Future, Time} /** * @author Ruban Monu @@ -108,7 +108,7 @@ class MySqlStore(client: Client, table: String, kCol: String, vCol: String) } } - override def close { client.close } + override def close { client.close(Time.Bottom) } protected def doSet(k: MySqlValue, v: MySqlValue): Future[Result] = { // mysql's insert-or-update syntax works only when a primary key is defined: From ec210140a620c00dc5e9c9c7ecf4f0708ee371a1 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Thu, 18 Jul 2013 11:44:32 -0400 Subject: [PATCH 06/29] upgrade algebird --- .travis.yml | 2 +- project/Build.scala | 20 +++++++++++++++----- project/Finagle.scala | 2 +- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/.travis.yml b/.travis.yml index 5263602d..befbde46 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ language: scala scala: - 2.10.0 - - 2.9.2 + - 2.9.3 before_script: - mysql -u root -e "create database storehaus_test;" - mysql -u root -e "create user 'storehaususer'@'localhost' identified by 'test1234';" diff --git a/project/Build.scala b/project/Build.scala index 7750c847..6bead7a2 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -8,6 +8,13 @@ import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact object StorehausBuild extends Build { + def withCross(dep: ModuleID) = + dep cross CrossVersion.binaryMapped { + case "2.9.3" => "2.9.2" // TODO: hack because twitter hasn't built things against 2.9.3 + case version if version startsWith "2.10" => "2.10" // TODO: hack because sbt is broken + case x => x + } + val extraSettings = Project.defaultSettings ++ releaseSettings ++ Boilerplate.settings ++ mimaDefaultSettings @@ -28,7 +35,10 @@ object StorehausBuild extends Build { val sharedSettings = extraSettings ++ ciSettings ++ Seq( organization := "com.twitter", - crossScalaVersions := Seq("2.9.2", "2.10.0"), + + scalaVersion := "2.9.3", + + crossScalaVersions := Seq("2.9.3", "2.10.0"), javacOptions ++= Seq("-source", "1.6", "-target", "1.6"), @@ -99,7 +109,7 @@ object StorehausBuild extends Build { .filterNot(unreleasedModules.contains(_)) .map { s => "com.twitter" % ("storehaus-" + s + "_2.9.2") % "0.3.0" } - val algebirdVersion = "0.1.13" + val algebirdVersion = "0.2.0" val bijectionVersion = "0.4.0" lazy val storehaus = Project( @@ -131,14 +141,14 @@ object StorehausBuild extends Build { lazy val storehausCache = module("cache") lazy val storehausCore = module("core").settings( - libraryDependencies += "com.twitter" %% "util-core" % "6.3.7", - libraryDependencies += "com.twitter" %% "bijection-core" % bijectionVersion + libraryDependencies += withCross("com.twitter" %% "util-core" % "6.3.7"), + libraryDependencies += withCross("com.twitter" %% "bijection-core" % bijectionVersion) ).dependsOn(storehausCache % "test->test;compile->compile") lazy val storehausAlgebra = module("algebra").settings( libraryDependencies += "com.twitter" %% "algebird-core" % algebirdVersion, libraryDependencies += "com.twitter" %% "algebird-util" % algebirdVersion, - libraryDependencies += "com.twitter" %% "bijection-algebird" % bijectionVersion + libraryDependencies += withCross("com.twitter" %% "bijection-algebird" % bijectionVersion) ).dependsOn(storehausCore % "test->test;compile->compile") lazy val storehausMemcache = module("memcache").settings( diff --git a/project/Finagle.scala b/project/Finagle.scala index c9a5df7f..b339bd98 100644 --- a/project/Finagle.scala +++ b/project/Finagle.scala @@ -7,5 +7,5 @@ object Finagle { import sbt._ val LatestVersion = "6.5.1" def module(name: String, version: String = LatestVersion) = - "com.twitter" %% "finagle-%s".format(name) % version + StorehausBuild.withCross("com.twitter" %% "finagle-%s".format(name) % version) } From ccb05cdf49cdc8a5cc2196bf1c8af24da948807c Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Thu, 18 Jul 2013 13:08:05 -0400 Subject: [PATCH 07/29] upgrade bijection --- project/Build.scala | 11 +++++++---- .../storehaus/algebra/MutableCacheAlgebra.scala | 11 +++++++---- .../twitter/storehaus/algebra/TTLInjection.scala | 9 ++++++--- .../com/twitter/storehaus/ConvertedStore.scala | 16 +++++++++------- .../storehaus/memcache/MemcacheStringStore.scala | 7 ++++--- .../twitter/storehaus/mysql/ValueMapper.scala | 7 +++---- .../storehaus/redis/RedisStringStore.scala | 14 +++++++------- 7 files changed, 43 insertions(+), 32 deletions(-) diff --git a/project/Build.scala b/project/Build.scala index 6bead7a2..c5821205 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -110,7 +110,7 @@ object StorehausBuild extends Build { .map { s => "com.twitter" % ("storehaus-" + s + "_2.9.2") % "0.3.0" } val algebirdVersion = "0.2.0" - val bijectionVersion = "0.4.0" + val bijectionVersion = "0.5.2" lazy val storehaus = Project( id = "storehaus", @@ -141,14 +141,17 @@ object StorehausBuild extends Build { lazy val storehausCache = module("cache") lazy val storehausCore = module("core").settings( - libraryDependencies += withCross("com.twitter" %% "util-core" % "6.3.7"), - libraryDependencies += withCross("com.twitter" %% "bijection-core" % bijectionVersion) + libraryDependencies ++= Seq( + withCross("com.twitter" %% "util-core" % "6.3.7"), + "com.twitter" %% "bijection-core" % bijectionVersion, + "com.twitter" %% "bijection-util" % bijectionVersion + ) ).dependsOn(storehausCache % "test->test;compile->compile") lazy val storehausAlgebra = module("algebra").settings( libraryDependencies += "com.twitter" %% "algebird-core" % algebirdVersion, libraryDependencies += "com.twitter" %% "algebird-util" % algebirdVersion, - libraryDependencies += withCross("com.twitter" %% "bijection-algebird" % bijectionVersion) + libraryDependencies += "com.twitter" %% "bijection-algebird" % bijectionVersion ).dependsOn(storehausCore % "test->test;compile->compile") lazy val storehausMemcache = module("memcache").settings( diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MutableCacheAlgebra.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MutableCacheAlgebra.scala index 26819b35..f930f163 100644 --- a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MutableCacheAlgebra.scala +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MutableCacheAlgebra.scala @@ -42,21 +42,24 @@ class AlgebraicMutableCache[K, V](cache: MutableCache[K, V]) { new MutableCache[K, U] { override def get(k: K) = for { v <- cache.get(k) - (_, u) <- injection.invert(k, v) + (_, u) <- injection.invert(k, v).toOption } yield u override def +=(ku: (K, U)) = { cache += injection(ku); this } - override def hit(k: K) = cache.hit(k).flatMap { injection.invert(k, _) }.map { _._2 } + override def hit(k: K) = + cache.hit(k) + .flatMap(injection.invert(k, _).toOption).map(_._2) override def evict(k: K) = for { evictedV <- cache.evict(k) - (_, evictedU) <- injection.invert(k, evictedV) + (_, evictedU) <- injection.invert(k, evictedV).toOption } yield evictedU override def empty = new AlgebraicMutableCache(cache.empty).inject(injection) override def clear = { cache.clear; this } - override def iterator = cache.iterator.flatMap(injection.invert(_)) + override def iterator = + cache.iterator.flatMap(injection.invert(_).toOption) } } diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/TTLInjection.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/TTLInjection.scala index f4d2ab36..5cfde0eb 100644 --- a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/TTLInjection.scala +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/TTLInjection.scala @@ -18,6 +18,7 @@ package com.twitter.storehaus.algebra import com.twitter.algebird.Semigroup import com.twitter.bijection.Injection +import scala.util.{ Success, Failure } /** * Injection that maps values paired with stale values of T => None @@ -27,17 +28,19 @@ import com.twitter.bijection.Injection * @author Sam Ritchie */ +case class ExpiredException[K, V](pair: (K, V)) extends RuntimeException(pair.toString) + class TTLInjection[K, T: Ordering: Semigroup, V](delta: T)(clock: () => T) extends Injection[(K, V), (K, (T, V))] { def apply(pair: (K, V)): (K, (T, V)) = { val (k, v) = pair (k, (Semigroup.plus(clock(), delta), v)) } - override def invert(pair: (K, (T, V))): Option[(K, V)] = { + override def invert(pair: (K, (T, V))) = { val (k, (expiration, v)) = pair if (Ordering[T].gteq(expiration, clock())) - Some((k, v)) + Success(k -> v) else - None + Failure(ExpiredException(k -> v)) } } diff --git a/storehaus-core/src/main/scala/com/twitter/storehaus/ConvertedStore.scala b/storehaus-core/src/main/scala/com/twitter/storehaus/ConvertedStore.scala index a4e97f34..536b7f3f 100644 --- a/storehaus-core/src/main/scala/com/twitter/storehaus/ConvertedStore.scala +++ b/storehaus-core/src/main/scala/com/twitter/storehaus/ConvertedStore.scala @@ -17,7 +17,10 @@ package com.twitter.storehaus import com.twitter.bijection.Injection -import com.twitter.util.Future +import com.twitter.bijection.Conversion.asMethod +import com.twitter.bijection.twitter_util.UtilBijections._ +import com.twitter.util.{ Future, Try } +import scala.util.{ Success, Failure } /** Use an injection on V2,V1 to convert a store of values V2. * If the value stored in the underlying store cannot be converted back to V2, then you will get a Future.exception @@ -25,11 +28,10 @@ import com.twitter.util.Future * TODO: we should add a specific exception type here so we can safely filter these cases to Future.None if we so choose. */ class ConvertedStore[K1, -K2, V1, V2](store: Store[K1, V1])(kfn: K2 => K1)(implicit inj: Injection[V2, V1]) - extends ConvertedReadableStore[K1, K2, V1, V2](store)(kfn)({ v1: V1 => - inj.invert(v1).map { Future.value(_) } - .getOrElse(Future.exception(new Exception(v1.toString + ": V1 cannot be converted to V2"))) - }) - with Store[K2, V2] { + extends ConvertedReadableStore[K1, K2, V1, V2](store)(kfn)({ v1: V1 => + Future.const(inj.invert(v1).as[Try[V2]]) + }) + with Store[K2, V2] { override def put(kv: (K2, Option[V2])) = { val k1 = kfn(kv._1) @@ -37,7 +39,7 @@ class ConvertedStore[K1, -K2, V1, V2](store: Store[K1, V1])(kfn: K2 => K1)(impli store.put((k1, v1)) } override def multiPut[K3 <: K2](kvs: Map[K3, Option[V2]]) = { - val mapK1V1 = kvs.map { case (k3, v2) => (kfn(k3), v2.map { inj(_) }) } + val mapK1V1 = kvs.map { case (k3, v2) => (kfn(k3), v2.map(inj(_))) } val res: Map[K1, Future[Unit]] = store.multiPut(mapK1V1) kvs.keySet.map { k3 => (k3, res(kfn(k3))) }.toMap } diff --git a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStringStore.scala b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStringStore.scala index af9a63eb..8ba0eb4f 100644 --- a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStringStore.scala +++ b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStringStore.scala @@ -24,7 +24,8 @@ import com.twitter.storehaus.ConvertedStore import com.twitter.storehaus.algebra.MergeableStore import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers } import org.jboss.netty.util.CharsetUtil -import scala.util.control.Exception.allCatch + +import scala.util.Try /** * @author Doug Tangren @@ -34,7 +35,7 @@ object MemcacheStringStore { private [memcache] implicit object ByteArrayInjection extends Injection[Array[Byte],ChannelBuffer] { def apply(ary: Array[Byte]) = ChannelBuffers.wrappedBuffer(ary) - def invert(buf: ChannelBuffer) = allCatch.opt(buf.array) + def invert(buf: ChannelBuffer) = Try(buf.array) } private [memcache] implicit val StringInjection = Injection.connect[String, Array[Byte], ChannelBuffer] @@ -45,7 +46,7 @@ object MemcacheStringStore { import MemcacheStringStore._ /** A MergeableStore for String values backed by memcache */ -class MemcacheStringStore(underlying: MemcacheStore) +class MemcacheStringStore(underlying: MemcacheStore) extends ConvertedStore[String, String, ChannelBuffer, String](underlying)(identity) with MergeableStore[String, String] { diff --git a/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/ValueMapper.scala b/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/ValueMapper.scala index cd8fcf45..a7e521dd 100644 --- a/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/ValueMapper.scala +++ b/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/ValueMapper.scala @@ -18,8 +18,6 @@ package com.twitter.storehaus.mysql import java.lang.UnsupportedOperationException -import scala.util.control.Exception.allCatch - import com.twitter.bijection.Injection import com.twitter.finagle.exp.mysql.{ EmptyValue, @@ -36,6 +34,7 @@ import com.twitter.finagle.exp.mysql.{ import org.jboss.netty.buffer.ChannelBuffer import org.jboss.netty.buffer.ChannelBuffers import org.jboss.netty.util.CharsetUtil.UTF_8 +import scala.util.Try /** Helper class for mapping finagle-mysql Values to types we care about. */ object ValueMapper { @@ -123,7 +122,7 @@ class MySqlValue(val v: Value) { */ object MySqlStringInjection extends Injection[MySqlValue, String] { def apply(a: MySqlValue): String = ValueMapper.toString(a.v).getOrElse("") // should this be null: String instead? - def invert(b: String): Option[MySqlValue] = allCatch.opt(MySqlValue(RawStringValue(b))) + override def invert(b: String) = Try(MySqlValue(RawStringValue(b))) } /** @@ -133,5 +132,5 @@ object MySqlStringInjection extends Injection[MySqlValue, String] { */ object MySqlCbInjection extends Injection[MySqlValue, ChannelBuffer] { def apply(a: MySqlValue): ChannelBuffer = ValueMapper.toChannelBuffer(a.v).getOrElse(ChannelBuffers.EMPTY_BUFFER) - def invert(b: ChannelBuffer): Option[MySqlValue] = allCatch.opt(MySqlValue(RawStringValue(b.toString(UTF_8)))) + override def invert(b: ChannelBuffer) = Try(MySqlValue(RawStringValue(b.toString(UTF_8)))) } diff --git a/storehaus-redis/src/main/scala/com/twitter/storehaus/redis/RedisStringStore.scala b/storehaus-redis/src/main/scala/com/twitter/storehaus/redis/RedisStringStore.scala index 4ec421a7..f94069df 100644 --- a/storehaus-redis/src/main/scala/com/twitter/storehaus/redis/RedisStringStore.scala +++ b/storehaus-redis/src/main/scala/com/twitter/storehaus/redis/RedisStringStore.scala @@ -18,16 +18,17 @@ package com.twitter.storehaus.redis import com.twitter.algebird.Monoid import com.twitter.bijection.Injection +import com.twitter.bijection.Conversion.asMethod import com.twitter.finagle.redis.Client import com.twitter.finagle.redis.util.{ CBToString, StringToChannelBuffer } import com.twitter.storehaus.ConvertedStore import com.twitter.storehaus.algebra.MergeableStore import com.twitter.util.{ Duration, Future } import org.jboss.netty.buffer.ChannelBuffer -import scala.util.control.Exception.allCatch +import scala.util.Try /** - * + * * @author Doug Tangren */ @@ -35,12 +36,13 @@ object RedisStringStore { private [redis] implicit object StringInjection extends Injection[String, ChannelBuffer] { def apply(a: String): ChannelBuffer = StringToChannelBuffer(a) - def invert(b: ChannelBuffer): Option[String] = allCatch.opt(CBToString(b)) + override def invert(b: ChannelBuffer) = Try(CBToString(b)) } def apply(client: Client, ttl: Option[Duration] = RedisStore.Default.TTL) = new RedisStringStore(RedisStore(client, ttl)) } + import RedisStringStore._ /** @@ -50,9 +52,7 @@ import RedisStringStore._ class RedisStringStore(underlying: RedisStore) extends ConvertedStore[ChannelBuffer, ChannelBuffer, ChannelBuffer, String](underlying)(identity) with MergeableStore[ChannelBuffer, String] { - val monoid = implicitly[Monoid[String]] + override val monoid = implicitly[Monoid[String]] override def merge(kv: (ChannelBuffer, String)): Future[Unit] = - underlying.client.append(kv._1, kv._2).unit + underlying.client.append(kv._1, kv._2.as[ChannelBuffer]).unit } - - From 11992aa0227f52f80a7ac2820102ae30062bb23e Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Thu, 18 Jul 2013 13:30:08 -0400 Subject: [PATCH 08/29] fix tests --- project/build.properties | 2 +- .../com/twitter/storehaus/StoreProperties.scala | 14 ++++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/project/build.properties b/project/build.properties index 175f2744..a8c2f849 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=0.12.0 \ No newline at end of file +sbt.version=0.12.0 diff --git a/storehaus-core/src/test/scala/com/twitter/storehaus/StoreProperties.scala b/storehaus-core/src/test/scala/com/twitter/storehaus/StoreProperties.scala index f8869154..f626a8d0 100644 --- a/storehaus-core/src/test/scala/com/twitter/storehaus/StoreProperties.scala +++ b/storehaus-core/src/test/scala/com/twitter/storehaus/StoreProperties.scala @@ -23,26 +23,28 @@ import org.scalacheck.Gen.choose import org.scalacheck.Prop._ object StoreProperties extends Properties("Store") { - def baseTest[K: Arbitrary, V: Arbitrary: Equiv](store: Store[K, V]) - (put: (Store[K, V], List[(K, Option[V])]) => Unit) = + def baseTest[K: Arbitrary, V: Arbitrary: Equiv](storeIn: => Store[K, V]) + (put: (Store[K, V], List[(K, Option[V])]) => Unit) = { forAll { (examples: List[(K, Option[V])]) => + lazy val store = storeIn put(store, examples) examples.toMap.forall { case (k, optV) => - Equiv[Option[V]].equiv(Await.result(store.get(k)), optV) + Equiv[Option[V]].equiv(Await.result(store.get(k)), optV) } } + } - def putStoreTest[K: Arbitrary, V: Arbitrary: Equiv](store: Store[K, V]) = + def putStoreTest[K: Arbitrary, V: Arbitrary: Equiv](store: => Store[K, V]) = baseTest(store) { (s, pairs) => pairs.foreach { p => Await.result(s.put(p)) } } - def multiPutStoreTest[K: Arbitrary, V: Arbitrary: Equiv](store: Store[K, V]) = + def multiPutStoreTest[K: Arbitrary, V: Arbitrary: Equiv](store: => Store[K, V]) = baseTest(store) { (s, pairs) => Await.result(FutureOps.mapCollect(s.multiPut(pairs.toMap))) } - def storeTest[K: Arbitrary, V: Arbitrary: Equiv](store: Store[K, V]) = + def storeTest[K: Arbitrary, V: Arbitrary: Equiv](store: => Store[K, V]) = putStoreTest(store) && multiPutStoreTest(store) property("ConcurrentHashMapStore test") = From 5baf9432506f277cc8ec820cc5c8e4b62b4ff5f9 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Thu, 18 Jul 2013 13:43:37 -0400 Subject: [PATCH 09/29] fix redis props --- .../com/twitter/storehaus/mysql/MySQLStoreProperties.scala | 4 ++-- .../com/twitter/storehaus/redis/RedisStoreProperties.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala b/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala index 5d134e41..6c2f024c 100644 --- a/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala +++ b/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala @@ -41,7 +41,7 @@ object MySqlStoreProperties extends Properties("MySqlStore") * we lowercase key's here for normalization */ def stringify(examples: List[(Any, Option[Any])]) = examples.map { case (k, v) => - (MySqlStringInjection.invert(k.toString.toLowerCase).get, v.flatMap { d => MySqlStringInjection.invert(d.toString) }) + (MySqlStringInjection.invert(k.toString.toLowerCase).get, v.flatMap { d => MySqlStringInjection.invert(d.toString).toOption }) } def putAndGetStoreTest(store: MySqlStore, pairs: Gen[List[(Any, Option[Any])]] = NonEmpty.Pairing.alphaStrs()) = @@ -72,7 +72,7 @@ object MySqlStoreProperties extends Properties("MySqlStore") def compareValues(k: MySqlValue, expectedOptV: Option[MySqlValue], foundOptV: Option[MySqlValue]) = { val isMatch = expectedOptV match { - case Some(value) => !foundOptV.isEmpty && foundOptV.get == value + case Some(value) => !foundOptV.isEmpty && foundOptV.get == value case None => foundOptV.isEmpty } if (!isMatch) printErr(k, expectedOptV, foundOptV) diff --git a/storehaus-redis/src/test/scala/com/twitter/storehaus/redis/RedisStoreProperties.scala b/storehaus-redis/src/test/scala/com/twitter/storehaus/redis/RedisStoreProperties.scala index a23574af..cedcd787 100644 --- a/storehaus-redis/src/test/scala/com/twitter/storehaus/redis/RedisStoreProperties.scala +++ b/storehaus-redis/src/test/scala/com/twitter/storehaus/redis/RedisStoreProperties.scala @@ -27,7 +27,7 @@ import com.twitter.util.Await import org.jboss.netty.buffer.ChannelBuffer import org.scalacheck.{ Arbitrary, Gen, Properties } import org.scalacheck.Prop._ -import scala.util.control.Exception.allCatch +import scala.util.Try object RedisStoreProperties extends Properties("RedisStore") with CloseableCleanup[Store[String, String]] @@ -61,7 +61,7 @@ object RedisStoreProperties extends Properties("RedisStore") implicit def strToCb = new Injection[String, ChannelBuffer] { def apply(a: String): ChannelBuffer = StringToChannelBuffer(a) - def invert(b: ChannelBuffer): Option[String] = allCatch.opt(CBToString(b)) + override def invert(b: ChannelBuffer) = Try(CBToString(b)) } val closeable = RedisStore(client).convert(StringToChannelBuffer(_: String)) From e34879d796f86b9897da996fee9d515279073fac Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Fri, 19 Jul 2013 12:08:10 -0400 Subject: [PATCH 10/29] remove codec --- .../scala/com/twitter/storehaus/memcache/HashEncoder.scala | 5 +---- .../scala/com/twitter/storehaus/memcache/MemcacheStore.scala | 4 ++-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/HashEncoder.scala b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/HashEncoder.scala index b49445ac..278e498e 100644 --- a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/HashEncoder.scala +++ b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/HashEncoder.scala @@ -16,7 +16,7 @@ package com.twitter.storehaus.memcache -import com.twitter.bijection.{ Base64String, Bijection, Injection } +import com.twitter.bijection.{ Base64String, Bijection, Codec, Injection } import com.twitter.util.Encoder /** @@ -41,9 +41,6 @@ object HashEncoder { // http://docs.oracle.com/javase/1.4.2/docs/guide/security/CryptoSpec.html#AppA val DEFAULT_HASH_FUNC = "SHA-256" - // TODO: Remove when Bijection gains a Codec type alias. - type Codec[T] = Injection[T, Array[Byte]] - def apply(hashFunc: String = DEFAULT_HASH_FUNC) = new HashEncoder(hashFunc) /** diff --git a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStore.scala b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStore.scala index 91608694..c3eb8d70 100644 --- a/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStore.scala +++ b/storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MemcacheStore.scala @@ -17,7 +17,7 @@ package com.twitter.storehaus.memcache import com.twitter.algebird.Monoid -import com.twitter.bijection.{ Bijection, Injection } +import com.twitter.bijection.{ Bijection, Codec, Injection } import com.twitter.bijection.netty.Implicits._ import com.twitter.conversions.time._ import com.twitter.finagle.builder.ClientBuilder @@ -37,7 +37,7 @@ import Store.enrich */ object MemcacheStore { - import HashEncoder.{ keyEncoder, Codec } + import HashEncoder.keyEncoder // Default Memcached TTL is one day. // For more details of setting expiration time for items in Memcached, please refer to From bd5151b8dea664f90bcf448828452d51ebb61547 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Fri, 19 Jul 2013 15:14:09 -0400 Subject: [PATCH 11/29] version bumps --- CHANGES.md | 12 ++++++++++++ README.md | 16 +++++++++------- project/Build.scala | 30 +++++++----------------------- version.sbt | 2 +- 4 files changed, 29 insertions(+), 31 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index f2fc207a..6da2bf4f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,17 @@ # storehaus # +### Version.0.5.0 ### + +* Reuse prepared statements in mysql: https://github.com/twitter/storehaus/issues/93 +* storehaus-testing module: https://github.com/twitter/storehaus/pull/115 +* cache ttl is now a duration, vs a time: https://github.com/twitter/storehaus/pull/100 +* improve performance of CollectionOps: https://github.com/twitter/storehaus/pull/117 +* Augment memcachestore with common functions: https://github.com/twitter/storehaus/pull/121 +* bump twitter-util and finagle versions: https://github.com/twitter/storehaus/pull/125 +* Upgrade to scala 2.9.3, algebird 0.2.0 and Bijection 0.5.2: https://github.com/twitter/storehaus/pull/126 + +Thanks to Doug Tangren, Ruban Monu, Ximing Yu, Ryan LeCompte, Sam Ritchie and Oscar Boykin for contributions! + ### Version.0.4.0 ### * Storehaus-Mysql support for numeric types diff --git a/README.md b/README.md index 4d0905f4..f2dd55c9 100644 --- a/README.md +++ b/README.md @@ -94,22 +94,24 @@ See the [current API documentation](http://twitter.github.com/storehaus) for mor ## Maven -Storehaus modules are available on maven central. The current groupid and version for all modules is, respectively, `"com.twitter"` and `0.4.0`. +Storehaus modules are available on maven central. The current groupid and version for all modules is, respectively, `"com.twitter"` and `0.5.0`. Current published artifacts are -* `storehaus-core_2.9.2` +* `storehaus-core_2.9.3` * `storehaus-core_2.10` -* `storehaus-algebra_2.9.2` +* `storehaus-algebra_2.9.3` * `storehaus-algebra_2.10` -* `storehaus-memcache_2.9.2` +* `storehaus-memcache_2.9.3` * `storehaus-memcache_2.10` -* `storehaus-mysql_2.9.2` +* `storehaus-mysql_2.9.3` * `storehaus-mysql_2.10` -* `storehaus-redis_2.9.2` +* `storehaus-redis_2.9.3` * `storehaus-redis_2.10` -* `storehaus-cache_2.9.2` +* `storehaus-cache_2.9.3` * `storehaus-cache_2.10` +* `storehaus-testing_2.9.3` +* `storehaus-testing_2.10` The suffix denotes the scala version. diff --git a/project/Build.scala b/project/Build.scala index e4564632..4e3691a1 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -26,7 +26,7 @@ object StorehausBuild extends Build { logLevel in Test := Level.Info ) else Seq.empty[Project.Setting[_]] - val testCleanup = extraSettings ++ Seq( + val testCleanup = Seq( testOptions in Test += Tests.Cleanup { loader => val c = loader.loadClass("com.twitter.storehaus.testing.Cleanup$") c.getMethod("cleanup").invoke(c.getField("MODULE$").get(c)) @@ -35,41 +35,27 @@ object StorehausBuild extends Build { val sharedSettings = extraSettings ++ ciSettings ++ Seq( organization := "com.twitter", - scalaVersion := "2.9.3", - crossScalaVersions := Seq("2.9.3", "2.10.0"), - javacOptions ++= Seq("-source", "1.6", "-target", "1.6"), - javacOptions in doc := Seq("-source", "1.6"), - - libraryDependencies ++= Seq( - "org.scala-tools.testing" %% "specs" % "1.6.9" % "test" withSources() - ), - + libraryDependencies += "org.scala-tools.testing" %% "specs" % "1.6.9" % "test" withSources(), resolvers ++= Seq( Opts.resolver.sonatypeSnapshots, Opts.resolver.sonatypeReleases, "Twitter Maven" at "http://maven.twttr.com" ), - parallelExecution in Test := true, - scalacOptions ++= Seq(Opts.compile.unchecked, Opts.compile.deprecation), // Publishing options: publishMavenStyle := true, - publishArtifact in Test := false, - pomIncludeRepository := { x => false }, - publishTo <<= version { v => Some(if (v.trim.toUpperCase.endsWith("SNAPSHOT")) Opts.resolver.sonatypeSnapshots else Opts.resolver.sonatypeStaging) }, - pomExtra := ( https://github.com/twitter/storehaus @@ -102,12 +88,12 @@ object StorehausBuild extends Build { * This returns the youngest jar we released that is compatible with * the current. */ - val unreleasedModules = Set[String]() + val unreleasedModules = Set[String]("testing") def youngestForwardCompatible(subProj: String) = Some(subProj) .filterNot(unreleasedModules.contains(_)) - .map { s => "com.twitter" % ("storehaus-" + s + "_2.9.2") % "0.3.0" } + .map { s => "com.twitter" % ("storehaus-" + s + "_2.9.2") % "0.4.0" } val algebirdVersion = "0.2.0" val bijectionVersion = "0.5.2" @@ -132,9 +118,9 @@ object StorehausBuild extends Build { def module(name: String) = { val id = "storehaus-%s".format(name) - Project(id = id, base = file(id), settings = sharedSettings ++ Seq( + Project(id = id, base = file(id), settings = sharedSettings ++ testCleanup ++ Seq( Keys.name := id, - previousArtifact := youngestForwardCompatible(name)) ++ testCleanup + previousArtifact := youngestForwardCompatible(name)) ).dependsOn(storehausTesting % "test->test") } @@ -179,9 +165,7 @@ object StorehausBuild extends Build { settings = sharedSettings ++ Seq( name := "storehaus-testing", previousArtifact := youngestForwardCompatible("testing"), - libraryDependencies ++= Seq( - "org.scalacheck" %% "scalacheck" % "1.10.0" withSources() - ) + libraryDependencies += "org.scalacheck" %% "scalacheck" % "1.10.0" withSources() ) ) } diff --git a/version.sbt b/version.sbt index ab9f75d1..297961da 100644 --- a/version.sbt +++ b/version.sbt @@ -1,2 +1,2 @@ -version in ThisBuild := "0.4.0" +version in ThisBuild := "0.5.0" From d9a917910dd279925e5af00501259fc75609b08d Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Fri, 19 Jul 2013 15:17:56 -0400 Subject: [PATCH 12/29] update mima. --- project/Build.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/project/Build.scala b/project/Build.scala index 4e3691a1..bb81bb01 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -88,12 +88,12 @@ object StorehausBuild extends Build { * This returns the youngest jar we released that is compatible with * the current. */ - val unreleasedModules = Set[String]("testing") + val unreleasedModules = Set[String]() def youngestForwardCompatible(subProj: String) = Some(subProj) .filterNot(unreleasedModules.contains(_)) - .map { s => "com.twitter" % ("storehaus-" + s + "_2.9.2") % "0.4.0" } + .map { s => "com.twitter" % ("storehaus-" + s + "_2.9.3") % "0.5.0" } val algebirdVersion = "0.2.0" val bijectionVersion = "0.5.2" From fe46e9c4a89062334bb9bf7fa61d58dfe6d5fe7e Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Fri, 19 Jul 2013 15:22:35 -0400 Subject: [PATCH 13/29] bump to snapshot --- version.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.sbt b/version.sbt index 297961da..5280cb4f 100644 --- a/version.sbt +++ b/version.sbt @@ -1,2 +1,2 @@ -version in ThisBuild := "0.5.0" +version in ThisBuild := "0.5.1-SNAPSHOT" From 8753307cd72773936daa33e65c93fdf418341f70 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Wed, 31 Jul 2013 16:25:42 -0700 Subject: [PATCH 14/29] bump finagle --- project/Finagle.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Finagle.scala b/project/Finagle.scala index 91143fa3..c9a5df7f 100644 --- a/project/Finagle.scala +++ b/project/Finagle.scala @@ -5,7 +5,7 @@ package storehaus * dependency */ object Finagle { import sbt._ - val LatestVersion = "6.3.0" + val LatestVersion = "6.5.1" def module(name: String, version: String = LatestVersion) = "com.twitter" %% "finagle-%s".format(name) % version } From 5b33cc361f3263e722ba7e47e1d479a443485a9d Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Wed, 31 Jul 2013 16:37:05 -0700 Subject: [PATCH 15/29] bump version --- version.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.sbt b/version.sbt index ab9f75d1..75b63f38 100644 --- a/version.sbt +++ b/version.sbt @@ -1,2 +1,2 @@ -version in ThisBuild := "0.4.0" +version in ThisBuild := "0.4.1" From c46f1f56cb671755d95c3459a20e6fb28221ccfd Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Fri, 9 Aug 2013 15:03:17 -0700 Subject: [PATCH 16/29] test and fix --- .../storehaus/cache/MutableLRUCache.scala | 2 +- .../storehaus/cache/MutableLRUCacheTest.scala | 42 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) create mode 100644 storehaus-cache/src/test/scala/com/twitter/storehaus/cache/MutableLRUCacheTest.scala diff --git a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableLRUCache.scala b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableLRUCache.scala index 616cbb38..5ef6a325 100644 --- a/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableLRUCache.scala +++ b/storehaus-cache/src/main/scala/com/twitter/storehaus/cache/MutableLRUCache.scala @@ -31,7 +31,7 @@ object MutableLRUCache { } class MutableLRUCache[K, V](capacity: Int) extends JMapCache[K, V](() => - new JLinkedHashMap[K, V](capacity + 1, 0.75f) { + new JLinkedHashMap[K, V](capacity + 1, 0.75f, true) { override protected def removeEldestEntry(eldest: JMap.Entry[K, V]) = super.size > capacity }) { diff --git a/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/MutableLRUCacheTest.scala b/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/MutableLRUCacheTest.scala new file mode 100644 index 00000000..6676c4d2 --- /dev/null +++ b/storehaus-cache/src/test/scala/com/twitter/storehaus/cache/MutableLRUCacheTest.scala @@ -0,0 +1,42 @@ +/* + * Copyright 2013 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.cache + +import org.specs._ + +class MutableLRUCacheTest extends Specification { + def freshCache = MutableLRUCache[String, Int](2) + + def checkCache(pairs: Seq[(String, Int)], results: Seq[Boolean]) = { + val cache = freshCache + pairs.foreach(cache += _) + pairs.map { case (k, _) => cache.contains(k) } must be_==(results) + } + + "MutableLRUCache works properly with threshold 2" in { + // At capacity + checkCache( + Seq("a" -> 1, "b" -> 2), + Seq(true, true) + ) + // a is touched, so b and c are evicted + checkCache( + Seq("a" -> 1, "b" -> 2, "c" -> 3, "a" -> 1, "d" -> 4), + Seq(true, false, false, true, true) + ) + } +} From d027d2e703674f748d73b9d378f235754e23e8ac Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Sat, 7 Sep 2013 22:49:40 -0500 Subject: [PATCH 17/29] Added storehaus-hbase, upgraded bijection to 0.5.3 --- project/Build.scala | 14 ++++++++++++-- .../algebra/ConvertedMergeableStore.scala | 2 +- .../algebra/MergeableStoreProperties.scala | 4 ++-- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/project/Build.scala b/project/Build.scala index bb81bb01..5ab3cbe6 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -96,7 +96,7 @@ object StorehausBuild extends Build { .map { s => "com.twitter" % ("storehaus-" + s + "_2.9.3") % "0.5.0" } val algebirdVersion = "0.2.0" - val bijectionVersion = "0.5.2" + val bijectionVersion = "0.5.3" lazy val storehaus = Project( id = "storehaus", @@ -113,6 +113,7 @@ object StorehausBuild extends Build { storehausMemcache, storehausMySQL, storehausRedis, + storehausHBase, storehausTesting ) @@ -137,7 +138,7 @@ object StorehausBuild extends Build { lazy val storehausAlgebra = module("algebra").settings( libraryDependencies += "com.twitter" %% "algebird-core" % algebirdVersion, libraryDependencies += "com.twitter" %% "algebird-util" % algebirdVersion, - libraryDependencies += "com.twitter" %% "bijection-algebird" % bijectionVersion + libraryDependencies += "com.twitter" %% "algebird-bijection" % algebirdVersion ).dependsOn(storehausCore % "test->test;compile->compile") lazy val storehausMemcache = module("memcache").settings( @@ -159,6 +160,15 @@ object StorehausBuild extends Build { parallelExecution in Test := false ).dependsOn(storehausAlgebra % "test->test;compile->compile") + lazy val storehausHBase= module("hbase").settings( + libraryDependencies ++= Seq( + "com.twitter" %% "algebird-core" % algebirdVersion, + "com.twitter" %% "bijection-core" % bijectionVersion, + "com.twitter" %% "bijection-hbase" % bijectionVersion + ), + parallelExecution in Test := false + ).dependsOn(storehausAlgebra % "test->test;compile->compile") + val storehausTesting = Project( id = "storehaus-testing", base = file("storehaus-testing"), diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/ConvertedMergeableStore.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/ConvertedMergeableStore.scala index 47f05255..463807cc 100644 --- a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/ConvertedMergeableStore.scala +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/ConvertedMergeableStore.scala @@ -32,7 +32,7 @@ class ConvertedMergeableStore[K1, -K2, V1, V2](store: MergeableStore[K1, V1])(kf (implicit bij: ImplicitBijection[V2, V1]) extends com.twitter.storehaus.ConvertedStore[K1, K2, V1, V2](store)(kfn)(Injection.fromBijection(bij.bijection)) with MergeableStore[K2, V2] { - import com.twitter.bijection.algebird.AlgebirdBijections._ + import com.twitter.algebird.bijection.AlgebirdBijections._ override def monoid: Monoid[V2] = store.monoid.as[Monoid[V2]] diff --git a/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/MergeableStoreProperties.scala b/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/MergeableStoreProperties.scala index 4d4a74f4..eeec71b4 100644 --- a/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/MergeableStoreProperties.scala +++ b/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/MergeableStoreProperties.scala @@ -17,7 +17,7 @@ package com.twitter.storehaus.algebra import com.twitter.algebird.{ MapAlgebra, Monoid, SummingQueue } -import com.twitter.bijection.algebird.AlgebirdBijections._ +import com.twitter.algebird.bijection.AlgebirdBijections._ import com.twitter.bijection.Injection import com.twitter.storehaus._ import com.twitter.util.Await @@ -93,7 +93,7 @@ object MergeableStoreProperties extends Properties("MergeableStore") { property("Converted MergeableStore obeys the mergeable store laws") = { // We are using a weird monoid on Int here: - import com.twitter.bijection.algebird.AlgebirdBijections._ + import com.twitter.algebird.bijection.AlgebirdBijections._ import com.twitter.bijection.Conversion.asMethod implicit val monoid : Monoid[Int] = implicitly[Monoid[(Short,Short)]].as[Monoid[Int]] From fcdbdd89d3c5f81351dbfca839f6139c46c14e4e Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Sun, 8 Sep 2013 22:13:37 -0500 Subject: [PATCH 18/29] added hbase dependencies --- project/Build.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/project/Build.scala b/project/Build.scala index 5ab3cbe6..dce88423 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -164,7 +164,9 @@ object StorehausBuild extends Build { libraryDependencies ++= Seq( "com.twitter" %% "algebird-core" % algebirdVersion, "com.twitter" %% "bijection-core" % bijectionVersion, - "com.twitter" %% "bijection-hbase" % bijectionVersion + "com.twitter" %% "bijection-hbase" % bijectionVersion , + "org.apache.hbase" % "hbase" % "0.94.4" % "provided->default", + "org.apache.hadoop" % "hadoop-core" % "1.0.4" % "provided->default" ), parallelExecution in Test := false ).dependsOn(storehausAlgebra % "test->test;compile->compile") From c602eeced8004b0c0e7b615fb89fd16e5d483237 Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Sun, 8 Sep 2013 22:34:39 -0500 Subject: [PATCH 19/29] added hbase module --- .../storehaus/hbase/HBaseLongStore.scala | 93 ++++++++++++++++++ .../twitter/storehaus/hbase/HBaseStore.scala | 90 +++++++++++++++++ .../storehaus/hbase/HBaseStoreConfig.scala | 60 ++++++++++++ .../storehaus/hbase/HBaseStringStore.scala | 98 +++++++++++++++++++ .../storehaus/hbase/DefaultHBaseConfig.scala | 29 ++++++ .../hbase/HBaseLongStoreProperties.scala | 43 ++++++++ .../hbase/HBaseStringStoreProperties.scala | 71 ++++++++++++++ 7 files changed, 484 insertions(+) create mode 100644 storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala create mode 100644 storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala create mode 100644 storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStoreConfig.scala create mode 100644 storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala create mode 100644 storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala create mode 100644 storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala create mode 100644 storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala new file mode 100644 index 00000000..c8ac62dd --- /dev/null +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala @@ -0,0 +1,93 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.hadoop.hbase.client.{Delete, Put, Result, HTablePool} +import com.twitter.storehaus.Store +import org.apache.commons.lang.StringUtils._ +import com.twitter.util.Future +import com.twitter.bijection.hbase.HBaseBijections._ +import com.twitter.bijection.Bijection +import com.twitter.bijection.Conversion._ + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +object HBaseLongStore { + def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseLongStore = { + val store = new HBaseLongStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) + store.createTableIfRequired() + store + } +} + +class HBaseLongStore(val quorumNames: String, val table: String, val columnFamily: String, val column: String, val createTable: Boolean, val pool: HTablePool) extends Store[String, Long] with HBaseStoreConfig { + + require(isNotEmpty(quorumNames), "Zookeeper quorums are required") + require(isNotEmpty(columnFamily), "column family is required") + require(isNotEmpty(column), "column is required") + + + /** get a single key from the store. + * Prefer multiGet if you are getting more than one key at a time + */ + override def get(k: String): Future[Option[Long]] = Future[Option[Long]] { + val tbl = pool.getTable(table) + val g = createGetRequest(k.as[StringBytes]) + val result = tbl.get(g) + extractValue(result) + } + + + /** + * replace a value + * Delete is the same as put((k,None)) + */ + override def put(kv: (String, Option[Long])): Future[Unit] = { + kv match { + case (k, Some(v)) => { + Future { + val p = new Put(k.as[StringBytes]) + p.add(columnFamily.as[StringBytes], column.as[StringBytes], v.as[LongBytes]) + val tbl = pool.getTable(table) + tbl.put(p) + } + } + case (k, None) => Future { + val delete = new Delete(k.as[StringBytes]) + val tbl = pool.getTable(table) + tbl.delete(delete) + } + } + + } + + /** Close this store and release any resources. + * It is undefined what happens on get/multiGet after close + */ + override def close { + pool.close() + } + + def extractValue(result: Result): Option[Long] = { + val value = result.getValue(columnFamily.as[StringBytes], column.as[StringBytes]) + Option(value) map (v => Bijection.invert[Long, LongBytes](v.asInstanceOf[LongBytes])) + } + +} + diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala new file mode 100644 index 00000000..15ba9302 --- /dev/null +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala @@ -0,0 +1,90 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.hadoop.hbase.client.{Delete, Put, Result, HTablePool} +import com.twitter.storehaus.Store +import org.apache.commons.lang.StringUtils._ +import com.twitter.util.Future +import com.twitter.bijection.hbase.HBaseBijections._ +import com.twitter.bijection.Conversion._ + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +object HBaseStore { + def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseStore = { + val store = new HBaseStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) + store.createTableIfRequired() + store + } +} + +class HBaseStore(val quorumNames: String, val table: String, val columnFamily: String, val column: String, val createTable: Boolean, val pool: HTablePool) extends Store[Array[Byte], Array[Byte]] with HBaseStoreConfig { + + require(isNotEmpty(quorumNames), "Zookeeper quorums are required") + require(isNotEmpty(columnFamily), "column family is required") + require(isNotEmpty(column), "column is required") + + + /** get a single key from the store. + * Prefer multiGet if you are getting more than one key at a time + */ + override def get(k: Array[Byte]): Future[Option[Array[Byte]]] = Future[Option[Array[Byte]]] { + val tbl = pool.getTable(table) + val g = createGetRequest(k) + val result = tbl.get(g) + extractValue(result) + } + + + /** + * replace a value + * Delete is the same as put((k,None)) + */ + override def put(kv: (Array[Byte], Option[Array[Byte]])): Future[Unit] = { + kv match { + case (k, Some(v)) => Future { + val p = new Put(k) + p.add(columnFamily.as[StringBytes], column.as[StringBytes], v) + val tbl = pool.getTable(table) + tbl.put(p) + } + case (k, None) => Future { + val delete = new Delete(k) + val tbl = pool.getTable(table) + tbl.delete(delete) + } + } + + } + + /** Close this store and release any resources. + * It is undefined what happens on get/multiGet after close + */ + override def close { + pool.close() + } + + def extractValue(result: Result): Option[Array[Byte]] = { + val value = result.getValue(columnFamily.as[StringBytes], column.as[StringBytes]) + Option(value) + } + +} + diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStoreConfig.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStoreConfig.scala new file mode 100644 index 00000000..d422eea7 --- /dev/null +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStoreConfig.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.hadoop.hbase.client.{Get, HBaseAdmin} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, HBaseConfiguration} +import com.twitter.bijection.hbase.HBaseBijections._ +import com.twitter.bijection.Conversion._ + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +trait HBaseStoreConfig { + + val quorumNames: String + val createTable: Boolean + val table: String + val columnFamily: String + val column:String + + def getHBaseAdmin: HBaseAdmin = { + val conf = new Configuration() + conf.set("hbase.zookeeper.quorum", quorumNames) + val hbaseConf = HBaseConfiguration.create(conf) + new HBaseAdmin(hbaseConf) + } + + def createTableIfRequired() { + val hbaseAdmin = getHBaseAdmin + if (createTable && !hbaseAdmin.tableExists(table)) { + val tableDescriptor = new HTableDescriptor(table) + tableDescriptor.addFamily(new HColumnDescriptor(columnFamily)) + hbaseAdmin.createTable(tableDescriptor) + } + } + + + def createGetRequest(k: Array[Byte]): Get = { + val g = new Get(k) + g.addColumn(columnFamily.as[StringBytes], column.as[StringBytes]) + g + } + +} diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala new file mode 100644 index 00000000..c67f56b2 --- /dev/null +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala @@ -0,0 +1,98 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.commons.lang.StringUtils._ +import org.apache.hadoop.hbase.client._ +import com.twitter.storehaus.Store +import com.twitter.util.Future +import com.twitter.bijection.Conversion._ +import com.twitter.bijection.hbase.HBaseBijections._ +import com.twitter.bijection.Bijection +import scala.Some + +/** + * @author MansurAshraf + * @since 9/7/13 + */ +object HBaseStringStore { + def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseStringStore = { + val stringStore = new HBaseStringStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) + stringStore.createTableIfRequired() + stringStore + } +} + +class HBaseStringStore(val quorumNames: String, val table: String, val columnFamily: String, val column: String, val createTable: Boolean, val pool: HTablePool) extends Store[String, String] with HBaseStoreConfig { + + require(isNotEmpty(quorumNames), "Zookeeper quorums are required") + require(isNotEmpty(columnFamily), "column family is required") + + + /** get a single key from the store. + * Prefer multiGet if you are getting more than one key at a time + */ + override def get(k: String): Future[Option[String]] = Future[Option[String]] { + val tbl = pool.getTable(table) + val g = createGetRequest(k) + val result = tbl.get(g) + extractValue(result) + } + + + /** + * replace a value + * Delete is the same as put((k,None)) + */ + override def put(kv: (String, Option[String])): Future[Unit] = { + kv match { + case (k, Some(v)) => { + Future { + val p = new Put(k.as[StringBytes]) + p.add(columnFamily.as[StringBytes], column.as[StringBytes], v.as[StringBytes]) + val tbl = pool.getTable(table) + tbl.put(p) + } + } + case (k, None) => Future { + val delete = new Delete(k.as[StringBytes]) + val tbl = pool.getTable(table) + tbl.delete(delete) + } + } + + } + + /** Close this store and release any resources. + * It is undefined what happens on get/multiGet after close + */ + override def close { + pool.close() + } + + def createGetRequest(k: String): Get = { + val g = new Get(k.as[StringBytes]) + g.addColumn(columnFamily.as[StringBytes], column.as[StringBytes]) + g + } + + def extractValue(result: Result): Option[String] = { + val value = result.getValue(columnFamily.as[StringBytes], column.as[StringBytes]) + Option(value) map (v => Bijection.invert[String, StringBytes](v.asInstanceOf[StringBytes])) + } + +} diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala new file mode 100644 index 00000000..775a3aed --- /dev/null +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +trait DefaultHBaseConfig { + val quorumNames = "localhost:2181" + val table = "summing_bird" + val columnFamily = "sb" + val column = "aggregate" + val createTable = true +} diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala new file mode 100644 index 00000000..eb054366 --- /dev/null +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala @@ -0,0 +1,43 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.scalacheck.{Gen, Properties} +import com.twitter.storehaus.testing.CloseableCleanup +import com.twitter.storehaus.Store +import com.twitter.storehaus.testing.generator.NonEmpty +import HBaseStringStoreProperties._ + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +object HBaseLongStoreProperties extends Properties("HBaseLongStore") +with CloseableCleanup[Store[String, Long]] +with DefaultHBaseConfig { + + def validPairs: Gen[List[(String, Option[Long])]] = + NonEmpty.Pairing.alphaStrNumerics[Long](10) + + def storeTest(store: Store[String, Long]) = + putStoreTest(store, validPairs) && multiPutStoreTest(store, validPairs) + + val closeable = HBaseLongStore(quorumNames, table, columnFamily, column, createTable) + + property("HBaseLongStore test") = storeTest(closeable) + +} diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala new file mode 100644 index 00000000..d9b7164e --- /dev/null +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala @@ -0,0 +1,71 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.scalacheck.{Arbitrary, Gen, Properties} +import com.twitter.storehaus.testing.CloseableCleanup +import com.twitter.storehaus.{FutureOps, Store} +import com.twitter.storehaus.testing.generator.NonEmpty +import org.scalacheck.Prop._ +import com.twitter.util.Await + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +object HBaseStringStoreProperties extends Properties("HBaseStore") +with CloseableCleanup[Store[String, String]] +with DefaultHBaseConfig { + + def validPairs: Gen[List[(String, Option[String])]] = + NonEmpty.Pairing.alphaStrs() + + def baseTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) + (put: (Store[K, V], List[(K, Option[V])]) => Unit) = + forAll(validPairs) { + (examples: List[(K, Option[V])]) => + put(store, examples) + examples.toMap.forall { + case (k, optV) => + val res = Await.result(store.get(k)) + Equiv[Option[V]].equiv(res, optV) + } + } + + def putStoreTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) = + baseTest(store, validPairs) { + (s, pairs) => + pairs.foreach { + case (k, v) => Await.result(s.put((k, v))) + } + } + + def multiPutStoreTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) = + baseTest(store, validPairs) { + (s, pairs) => + Await.result(FutureOps.mapCollect(s.multiPut(pairs.toMap))) + } + + def storeTest(store: Store[String, String]) = + putStoreTest(store, validPairs) && multiPutStoreTest(store, validPairs) + + val closeable =HBaseStringStore(quorumNames, table, columnFamily, column, createTable) + + + property("HBaseStore test") = + storeTest(closeable) +} From 6c5d9dc4ca30ec91d3b81f07ee84f58e6554ff31 Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Mon, 9 Sep 2013 15:31:38 -0500 Subject: [PATCH 20/29] DRYed out the imp by moving common methods to HBaseStore trait --- .../storehaus/hbase/HBaseByteArrayStore.scala | 64 +++++++++++++ .../storehaus/hbase/HBaseLongStore.scala | 59 ++++-------- .../twitter/storehaus/hbase/HBaseStore.scala | 96 +++++++++---------- .../storehaus/hbase/HBaseStoreConfig.scala | 60 ------------ .../storehaus/hbase/HBaseStringStore.scala | 58 +++-------- 5 files changed, 145 insertions(+), 192 deletions(-) create mode 100644 storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala delete mode 100644 storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStoreConfig.scala diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala new file mode 100644 index 00000000..9ee3acfc --- /dev/null +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala @@ -0,0 +1,64 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.hadoop.hbase.client.HTablePool +import com.twitter.storehaus.Store +import com.twitter.util.Future + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +object HBaseByteArrayStore { + def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseByteArrayStore = { + val store = new HBaseByteArrayStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) + store.validateConfiguration() + store.createTableIfRequired() + store + } +} + +class HBaseByteArrayStore(val quorumNames: String, + val table: String, + val columnFamily: String, + val column: String, + val createTable: Boolean, + val pool: HTablePool) extends Store[Array[Byte], Array[Byte]] with HBaseStore { + + /** get a single key from the store. + * Prefer multiGet if you are getting more than one key at a time + */ + override def get(k: Array[Byte]): Future[Option[Array[Byte]]] = { + getValue(k) + } + + /** + * replace a value + * Delete is the same as put((k,None)) + */ + override def put(kv: (Array[Byte], Option[Array[Byte]])): Future[Unit] = { + putValue(kv) + } + + /** Close this store and release any resources. + * It is undefined what happens on get/multiGet after close + */ + override def close { + pool.close() + } +} diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala index c8ac62dd..fa60fc3a 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala @@ -16,65 +16,50 @@ package com.twitter.storehaus.hbase -import org.apache.hadoop.hbase.client.{Delete, Put, Result, HTablePool} +import org.apache.hadoop.hbase.client.HTablePool import com.twitter.storehaus.Store -import org.apache.commons.lang.StringUtils._ import com.twitter.util.Future -import com.twitter.bijection.hbase.HBaseBijections._ -import com.twitter.bijection.Bijection -import com.twitter.bijection.Conversion._ +import com.twitter.bijection.Injection._ /** - * @author MansurAshraf + * @author Mansur Ashraf * @since 9/8/13 */ object HBaseLongStore { def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseLongStore = { val store = new HBaseLongStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) + store.validateConfiguration() store.createTableIfRequired() store } } -class HBaseLongStore(val quorumNames: String, val table: String, val columnFamily: String, val column: String, val createTable: Boolean, val pool: HTablePool) extends Store[String, Long] with HBaseStoreConfig { - - require(isNotEmpty(quorumNames), "Zookeeper quorums are required") - require(isNotEmpty(columnFamily), "column family is required") - require(isNotEmpty(column), "column is required") - +class HBaseLongStore(protected val quorumNames: String, + protected val table: String, + protected val columnFamily: String, + protected val column: String, + protected val createTable: Boolean, + protected val pool: HTablePool) extends Store[String, Long] with HBaseStore { /** get a single key from the store. * Prefer multiGet if you are getting more than one key at a time */ - override def get(k: String): Future[Option[Long]] = Future[Option[Long]] { - val tbl = pool.getTable(table) - val g = createGetRequest(k.as[StringBytes]) - val result = tbl.get(g) - extractValue(result) + override def get(k: String): Future[Option[Long]] = { + import com.twitter.bijection.hbase.HBaseBijections._ + implicit val stringInj = fromBijectionRep[String, StringBytes] + implicit val LongInj = fromBijectionRep[Long, LongBytes] + getValue[String,Long](k) } - /** * replace a value * Delete is the same as put((k,None)) */ override def put(kv: (String, Option[Long])): Future[Unit] = { - kv match { - case (k, Some(v)) => { - Future { - val p = new Put(k.as[StringBytes]) - p.add(columnFamily.as[StringBytes], column.as[StringBytes], v.as[LongBytes]) - val tbl = pool.getTable(table) - tbl.put(p) - } - } - case (k, None) => Future { - val delete = new Delete(k.as[StringBytes]) - val tbl = pool.getTable(table) - tbl.delete(delete) - } - } - + import com.twitter.bijection.hbase.HBaseBijections._ + implicit val stringInj = fromBijectionRep[String, StringBytes] + implicit val LongInj = fromBijectionRep[Long, LongBytes] + putValue(kv) } /** Close this store and release any resources. @@ -83,11 +68,5 @@ class HBaseLongStore(val quorumNames: String, val table: String, val columnFamil override def close { pool.close() } - - def extractValue(result: Result): Option[Long] = { - val value = result.getValue(columnFamily.as[StringBytes], column.as[StringBytes]) - Option(value) map (v => Bijection.invert[Long, LongBytes](v.asInstanceOf[LongBytes])) - } - } diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala index 15ba9302..587601ad 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala @@ -16,75 +16,75 @@ package com.twitter.storehaus.hbase -import org.apache.hadoop.hbase.client.{Delete, Put, Result, HTablePool} -import com.twitter.storehaus.Store -import org.apache.commons.lang.StringUtils._ -import com.twitter.util.Future +import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, HBaseConfiguration} import com.twitter.bijection.hbase.HBaseBijections._ import com.twitter.bijection.Conversion._ +import com.twitter.bijection.Injection +import com.twitter.util.Future +import scala.Some /** - * @author MansurAshraf + * @author Mansur Ashraf * @since 9/8/13 */ -object HBaseStore { - def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseStore = { - val store = new HBaseStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) - store.createTableIfRequired() - store +trait HBaseStore { + + protected val quorumNames: String + protected val createTable: Boolean + protected val table: String + protected val columnFamily: String + protected val column: String + protected val pool: HTablePool + + def getHBaseAdmin: HBaseAdmin = { + val conf = new Configuration() + conf.set("hbase.zookeeper.quorum", quorumNames) + val hbaseConf = HBaseConfiguration.create(conf) + new HBaseAdmin(hbaseConf) } -} -class HBaseStore(val quorumNames: String, val table: String, val columnFamily: String, val column: String, val createTable: Boolean, val pool: HTablePool) extends Store[Array[Byte], Array[Byte]] with HBaseStoreConfig { + def createTableIfRequired() { + val hbaseAdmin = getHBaseAdmin + if (createTable && !hbaseAdmin.tableExists(table)) { + val tableDescriptor = new HTableDescriptor(table) + tableDescriptor.addFamily(new HColumnDescriptor(columnFamily)) + hbaseAdmin.createTable(tableDescriptor) + } + } - require(isNotEmpty(quorumNames), "Zookeeper quorums are required") - require(isNotEmpty(columnFamily), "column family is required") - require(isNotEmpty(column), "column is required") + def validateConfiguration() { + import org.apache.commons.lang.StringUtils.isNotEmpty + require(isNotEmpty(quorumNames), "Zookeeper quorums are required") + require(isNotEmpty(columnFamily), "column family is required") + require(isNotEmpty(column), "column is required") + } - /** get a single key from the store. - * Prefer multiGet if you are getting more than one key at a time - */ - override def get(k: Array[Byte]): Future[Option[Array[Byte]]] = Future[Option[Array[Byte]]] { + def getValue[K, V](key: K)(implicit keyInj: Injection[K, Array[Byte]], valueInj: Injection[V, Array[Byte]]): Future[Option[V]] = Future { val tbl = pool.getTable(table) - val g = createGetRequest(k) + val g = new Get(keyInj(key)) + g.addColumn(columnFamily.as[StringBytes], column.as[StringBytes]) + val result = tbl.get(g) - extractValue(result) + val value = result.getValue(columnFamily.as[StringBytes], column.as[StringBytes]) + Option(value).map(v => valueInj.invert(v).get) } - - /** - * replace a value - * Delete is the same as put((k,None)) - */ - override def put(kv: (Array[Byte], Option[Array[Byte]])): Future[Unit] = { + def putValue[K, V](kv: (K, Option[V]))(implicit keyInj: Injection[K, Array[Byte]], valueInj: Injection[V, Array[Byte]]): Future[Unit] = { kv match { case (k, Some(v)) => Future { - val p = new Put(k) - p.add(columnFamily.as[StringBytes], column.as[StringBytes], v) - val tbl = pool.getTable(table) - tbl.put(p) - } + val p = new Put(keyInj(k)) + p.add(columnFamily.as[StringBytes], column.as[StringBytes], valueInj(v)) + val tbl = pool.getTable(table) + tbl.put(p) + } case (k, None) => Future { - val delete = new Delete(k) + val delete = new Delete(keyInj(k)) val tbl = pool.getTable(table) tbl.delete(delete) } } - - } - - /** Close this store and release any resources. - * It is undefined what happens on get/multiGet after close - */ - override def close { - pool.close() } - - def extractValue(result: Result): Option[Array[Byte]] = { - val value = result.getValue(columnFamily.as[StringBytes], column.as[StringBytes]) - Option(value) - } - } - diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStoreConfig.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStoreConfig.scala deleted file mode 100644 index d422eea7..00000000 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStoreConfig.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2013 Twitter inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.twitter.storehaus.hbase - -import org.apache.hadoop.hbase.client.{Get, HBaseAdmin} -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, HBaseConfiguration} -import com.twitter.bijection.hbase.HBaseBijections._ -import com.twitter.bijection.Conversion._ - -/** - * @author MansurAshraf - * @since 9/8/13 - */ -trait HBaseStoreConfig { - - val quorumNames: String - val createTable: Boolean - val table: String - val columnFamily: String - val column:String - - def getHBaseAdmin: HBaseAdmin = { - val conf = new Configuration() - conf.set("hbase.zookeeper.quorum", quorumNames) - val hbaseConf = HBaseConfiguration.create(conf) - new HBaseAdmin(hbaseConf) - } - - def createTableIfRequired() { - val hbaseAdmin = getHBaseAdmin - if (createTable && !hbaseAdmin.tableExists(table)) { - val tableDescriptor = new HTableDescriptor(table) - tableDescriptor.addFamily(new HColumnDescriptor(columnFamily)) - hbaseAdmin.createTable(tableDescriptor) - } - } - - - def createGetRequest(k: Array[Byte]): Get = { - val g = new Get(k) - g.addColumn(columnFamily.as[StringBytes], column.as[StringBytes]) - g - } - -} diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala index c67f56b2..3e9d2c79 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala @@ -16,14 +16,10 @@ package com.twitter.storehaus.hbase -import org.apache.commons.lang.StringUtils._ import org.apache.hadoop.hbase.client._ import com.twitter.storehaus.Store import com.twitter.util.Future -import com.twitter.bijection.Conversion._ -import com.twitter.bijection.hbase.HBaseBijections._ -import com.twitter.bijection.Bijection -import scala.Some +import com.twitter.bijection.Injection._ /** * @author MansurAshraf @@ -32,49 +28,35 @@ import scala.Some object HBaseStringStore { def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseStringStore = { val stringStore = new HBaseStringStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) + stringStore.validateConfiguration() stringStore.createTableIfRequired() stringStore } } -class HBaseStringStore(val quorumNames: String, val table: String, val columnFamily: String, val column: String, val createTable: Boolean, val pool: HTablePool) extends Store[String, String] with HBaseStoreConfig { - - require(isNotEmpty(quorumNames), "Zookeeper quorums are required") - require(isNotEmpty(columnFamily), "column family is required") - +class HBaseStringStore(protected val quorumNames: String, + protected val table: String, + protected val columnFamily: String, + protected val column: String, val createTable: Boolean, + protected val pool: HTablePool) extends Store[String, String] with HBaseStore { /** get a single key from the store. * Prefer multiGet if you are getting more than one key at a time */ - override def get(k: String): Future[Option[String]] = Future[Option[String]] { - val tbl = pool.getTable(table) - val g = createGetRequest(k) - val result = tbl.get(g) - extractValue(result) + override def get(k: String): Future[Option[String]] = { + import com.twitter.bijection.hbase.HBaseBijections._ + implicit val stringInj = fromBijectionRep[String, StringBytes] + get(k) } - /** * replace a value * Delete is the same as put((k,None)) */ override def put(kv: (String, Option[String])): Future[Unit] = { - kv match { - case (k, Some(v)) => { - Future { - val p = new Put(k.as[StringBytes]) - p.add(columnFamily.as[StringBytes], column.as[StringBytes], v.as[StringBytes]) - val tbl = pool.getTable(table) - tbl.put(p) - } - } - case (k, None) => Future { - val delete = new Delete(k.as[StringBytes]) - val tbl = pool.getTable(table) - tbl.delete(delete) - } - } - + import com.twitter.bijection.hbase.HBaseBijections._ + implicit val stringInj = fromBijectionRep[String, StringBytes] + putValue(kv) } /** Close this store and release any resources. @@ -83,16 +65,4 @@ class HBaseStringStore(val quorumNames: String, val table: String, val columnFam override def close { pool.close() } - - def createGetRequest(k: String): Get = { - val g = new Get(k.as[StringBytes]) - g.addColumn(columnFamily.as[StringBytes], column.as[StringBytes]) - g - } - - def extractValue(result: Result): Option[String] = { - val value = result.getValue(columnFamily.as[StringBytes], column.as[StringBytes]) - Option(value) map (v => Bijection.invert[String, StringBytes](v.asInstanceOf[StringBytes])) - } - } From fee87345da7a9aa632975dc23d09914221bdcc38 Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Mon, 9 Sep 2013 16:24:35 -0500 Subject: [PATCH 21/29] changed quorum to Seq --- .../com/twitter/storehaus/hbase/HBaseByteArrayStore.scala | 4 ++-- .../scala/com/twitter/storehaus/hbase/HBaseLongStore.scala | 4 ++-- .../main/scala/com/twitter/storehaus/hbase/HBaseStore.scala | 6 +++--- .../com/twitter/storehaus/hbase/HBaseStringStore.scala | 6 +++--- .../com/twitter/storehaus/hbase/DefaultHBaseConfig.scala | 2 +- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala index 9ee3acfc..86c45488 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala @@ -25,7 +25,7 @@ import com.twitter.util.Future * @since 9/8/13 */ object HBaseByteArrayStore { - def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseByteArrayStore = { + def apply(quorumNames: Seq[String], table: String, columnFamily: String, column: String, createTable: Boolean): HBaseByteArrayStore = { val store = new HBaseByteArrayStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) store.validateConfiguration() store.createTableIfRequired() @@ -33,7 +33,7 @@ object HBaseByteArrayStore { } } -class HBaseByteArrayStore(val quorumNames: String, +class HBaseByteArrayStore(val quorumNames: Seq[String], val table: String, val columnFamily: String, val column: String, diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala index fa60fc3a..5968c4ba 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala @@ -26,7 +26,7 @@ import com.twitter.bijection.Injection._ * @since 9/8/13 */ object HBaseLongStore { - def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseLongStore = { + def apply(quorumNames: Seq[String], table: String, columnFamily: String, column: String, createTable: Boolean): HBaseLongStore = { val store = new HBaseLongStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) store.validateConfiguration() store.createTableIfRequired() @@ -34,7 +34,7 @@ object HBaseLongStore { } } -class HBaseLongStore(protected val quorumNames: String, +class HBaseLongStore(protected val quorumNames: Seq[String], protected val table: String, protected val columnFamily: String, protected val column: String, diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala index 587601ad..d87d41bf 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala @@ -31,7 +31,7 @@ import scala.Some */ trait HBaseStore { - protected val quorumNames: String + protected val quorumNames: Seq[String] protected val createTable: Boolean protected val table: String protected val columnFamily: String @@ -40,7 +40,7 @@ trait HBaseStore { def getHBaseAdmin: HBaseAdmin = { val conf = new Configuration() - conf.set("hbase.zookeeper.quorum", quorumNames) + conf.set("hbase.zookeeper.quorum", quorumNames.mkString(",")) val hbaseConf = HBaseConfiguration.create(conf) new HBaseAdmin(hbaseConf) } @@ -57,7 +57,7 @@ trait HBaseStore { def validateConfiguration() { import org.apache.commons.lang.StringUtils.isNotEmpty - require(isNotEmpty(quorumNames), "Zookeeper quorums are required") + require(!quorumNames.isEmpty, "Zookeeper quorums are required") require(isNotEmpty(columnFamily), "column family is required") require(isNotEmpty(column), "column is required") } diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala index 3e9d2c79..1bc9f9b9 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala @@ -26,7 +26,7 @@ import com.twitter.bijection.Injection._ * @since 9/7/13 */ object HBaseStringStore { - def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseStringStore = { + def apply(quorumNames: Seq[String], table: String, columnFamily: String, column: String, createTable: Boolean): HBaseStringStore = { val stringStore = new HBaseStringStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) stringStore.validateConfiguration() stringStore.createTableIfRequired() @@ -34,7 +34,7 @@ object HBaseStringStore { } } -class HBaseStringStore(protected val quorumNames: String, +class HBaseStringStore(protected val quorumNames: Seq[String], protected val table: String, protected val columnFamily: String, protected val column: String, val createTable: Boolean, @@ -46,7 +46,7 @@ class HBaseStringStore(protected val quorumNames: String, override def get(k: String): Future[Option[String]] = { import com.twitter.bijection.hbase.HBaseBijections._ implicit val stringInj = fromBijectionRep[String, StringBytes] - get(k) + getValue[String,String](k) } /** diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala index 775a3aed..9b98906c 100644 --- a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala @@ -21,7 +21,7 @@ package com.twitter.storehaus.hbase * @since 9/8/13 */ trait DefaultHBaseConfig { - val quorumNames = "localhost:2181" + val quorumNames = Seq("localhost:2181") val table = "summing_bird" val columnFamily = "sb" val column = "aggregate" From 55791ed3fe5237f092f237bd75799d4484023408 Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Mon, 9 Sep 2013 23:28:08 -0500 Subject: [PATCH 22/29] added support for HBaseMinicluster for testing --- .travis.yml | 3 +- project/Build.scala | 21 ++++++++++-- .../storehaus/hbase/HBaseByteArrayStore.scala | 30 ++++++++++++----- .../storehaus/hbase/HBaseLongStore.scala | 22 ++++++++++--- .../twitter/storehaus/hbase/HBaseStore.scala | 16 ++++++---- .../storehaus/hbase/HBaseStringStore.scala | 32 +++++++++++++------ ...Config.scala => DefaultHBaseCluster.scala} | 15 ++++++++- .../hbase/HBaseLongStoreProperties.scala | 7 ++-- .../hbase/HBaseStringStoreProperties.scala | 13 +++----- 9 files changed, 115 insertions(+), 44 deletions(-) rename storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/{DefaultHBaseConfig.scala => DefaultHBaseCluster.scala} (64%) diff --git a/.travis.yml b/.travis.yml index befbde46..9c859b5f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,4 +8,5 @@ before_script: - mysql -u root -e "grant all on storehaus_test.* to 'storehaususer'@'localhost';" services: - redis-server - - memcache \ No newline at end of file + - memcache +script: umask 0022 && sbt ++$TRAVIS_SCALA_VERSION test \ No newline at end of file diff --git a/project/Build.scala b/project/Build.scala index dce88423..3a609766 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package storehaus import sbt._ @@ -165,8 +181,9 @@ object StorehausBuild extends Build { "com.twitter" %% "algebird-core" % algebirdVersion, "com.twitter" %% "bijection-core" % bijectionVersion, "com.twitter" %% "bijection-hbase" % bijectionVersion , - "org.apache.hbase" % "hbase" % "0.94.4" % "provided->default", - "org.apache.hadoop" % "hadoop-core" % "1.0.4" % "provided->default" + "org.apache.hbase" % "hbase" % "0.94.4" % "provided->default" classifier "tests" classifier "", + "org.apache.hadoop" % "hadoop-core" % "1.0.4" % "provided->default", + "org.apache.hadoop" % "hadoop-test" % "1.0.4" % "test" ), parallelExecution in Test := false ).dependsOn(storehausAlgebra % "test->test;compile->compile") diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala index 86c45488..ae53b29c 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala @@ -19,26 +19,40 @@ package com.twitter.storehaus.hbase import org.apache.hadoop.hbase.client.HTablePool import com.twitter.storehaus.Store import com.twitter.util.Future +import org.apache.hadoop.conf.Configuration /** * @author MansurAshraf * @since 9/8/13 */ object HBaseByteArrayStore { - def apply(quorumNames: Seq[String], table: String, columnFamily: String, column: String, createTable: Boolean): HBaseByteArrayStore = { - val store = new HBaseByteArrayStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean, + pool: HTablePool, + conf: Configuration): HBaseByteArrayStore = { + val store = new HBaseByteArrayStore(quorumNames, table, columnFamily, column, createTable,pool,conf) store.validateConfiguration() store.createTableIfRequired() store } + + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean): HBaseByteArrayStore = apply(quorumNames,table,columnFamily,column,createTable,new HTablePool(), new Configuration()) } -class HBaseByteArrayStore(val quorumNames: Seq[String], - val table: String, - val columnFamily: String, - val column: String, - val createTable: Boolean, - val pool: HTablePool) extends Store[Array[Byte], Array[Byte]] with HBaseStore { +class HBaseByteArrayStore(protected val quorumNames: Seq[String], + protected val table: String, + protected val columnFamily: String, + protected val column: String, + protected val createTable: Boolean, + protected val pool: HTablePool, + protected val conf: Configuration) extends Store[Array[Byte], Array[Byte]] with HBaseStore { /** get a single key from the store. * Prefer multiGet if you are getting more than one key at a time diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala index 5968c4ba..8d3cfc15 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala @@ -20,18 +20,31 @@ import org.apache.hadoop.hbase.client.HTablePool import com.twitter.storehaus.Store import com.twitter.util.Future import com.twitter.bijection.Injection._ +import org.apache.hadoop.conf.Configuration /** * @author Mansur Ashraf * @since 9/8/13 */ object HBaseLongStore { - def apply(quorumNames: Seq[String], table: String, columnFamily: String, column: String, createTable: Boolean): HBaseLongStore = { - val store = new HBaseLongStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean, + pool: HTablePool, + conf: Configuration): HBaseLongStore = { + val store = new HBaseLongStore(quorumNames, table, columnFamily, column, createTable, pool, conf) store.validateConfiguration() store.createTableIfRequired() store } + + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean): HBaseLongStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration()) } class HBaseLongStore(protected val quorumNames: Seq[String], @@ -39,7 +52,8 @@ class HBaseLongStore(protected val quorumNames: Seq[String], protected val columnFamily: String, protected val column: String, protected val createTable: Boolean, - protected val pool: HTablePool) extends Store[String, Long] with HBaseStore { + protected val pool: HTablePool, + protected val conf: Configuration) extends Store[String, Long] with HBaseStore { /** get a single key from the store. * Prefer multiGet if you are getting more than one key at a time @@ -48,7 +62,7 @@ class HBaseLongStore(protected val quorumNames: Seq[String], import com.twitter.bijection.hbase.HBaseBijections._ implicit val stringInj = fromBijectionRep[String, StringBytes] implicit val LongInj = fromBijectionRep[Long, LongBytes] - getValue[String,Long](k) + getValue[String, Long](k) } /** diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala index d87d41bf..f9be24a2 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala @@ -37,10 +37,12 @@ trait HBaseStore { protected val columnFamily: String protected val column: String protected val pool: HTablePool + protected val conf: Configuration def getHBaseAdmin: HBaseAdmin = { - val conf = new Configuration() - conf.set("hbase.zookeeper.quorum", quorumNames.mkString(",")) + if (conf.get("hbase.zookeeper.quorum") == null) { + conf.set("hbase.zookeeper.quorum", quorumNames.mkString(",")) + } val hbaseConf = HBaseConfiguration.create(conf) new HBaseAdmin(hbaseConf) } @@ -75,11 +77,11 @@ trait HBaseStore { def putValue[K, V](kv: (K, Option[V]))(implicit keyInj: Injection[K, Array[Byte]], valueInj: Injection[V, Array[Byte]]): Future[Unit] = { kv match { case (k, Some(v)) => Future { - val p = new Put(keyInj(k)) - p.add(columnFamily.as[StringBytes], column.as[StringBytes], valueInj(v)) - val tbl = pool.getTable(table) - tbl.put(p) - } + val p = new Put(keyInj(k)) + p.add(columnFamily.as[StringBytes], column.as[StringBytes], valueInj(v)) + val tbl = pool.getTable(table) + tbl.put(p) + } case (k, None) => Future { val delete = new Delete(keyInj(k)) val tbl = pool.getTable(table) diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala index 1bc9f9b9..aa813d9d 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala @@ -20,33 +20,47 @@ import org.apache.hadoop.hbase.client._ import com.twitter.storehaus.Store import com.twitter.util.Future import com.twitter.bijection.Injection._ +import org.apache.hadoop.conf.Configuration /** * @author MansurAshraf * @since 9/7/13 */ object HBaseStringStore { - def apply(quorumNames: Seq[String], table: String, columnFamily: String, column: String, createTable: Boolean): HBaseStringStore = { - val stringStore = new HBaseStringStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) - stringStore.validateConfiguration() - stringStore.createTableIfRequired() - stringStore + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean, + pool: HTablePool, + conf: Configuration): HBaseStringStore = { + val store = new HBaseStringStore(quorumNames, table, columnFamily, column, createTable, pool, conf) + store.validateConfiguration() + store.createTableIfRequired() + store } + + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean): HBaseStringStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration()) } class HBaseStringStore(protected val quorumNames: Seq[String], protected val table: String, protected val columnFamily: String, - protected val column: String, val createTable: Boolean, - protected val pool: HTablePool) extends Store[String, String] with HBaseStore { - + protected val column: String, + protected val createTable: Boolean, + protected val pool: HTablePool, + protected val conf: Configuration) extends Store[String, String] with HBaseStore { /** get a single key from the store. * Prefer multiGet if you are getting more than one key at a time */ override def get(k: String): Future[Option[String]] = { import com.twitter.bijection.hbase.HBaseBijections._ implicit val stringInj = fromBijectionRep[String, StringBytes] - getValue[String,String](k) + getValue[String, String](k) } /** diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala similarity index 64% rename from storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala rename to storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala index 9b98906c..fc47aa73 100644 --- a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala @@ -16,14 +16,27 @@ package com.twitter.storehaus.hbase +import org.apache.hadoop.hbase.HBaseTestingUtility +import org.apache.hadoop.hbase.client.HTablePool +import com.twitter.storehaus.testing.CloseableCleanup +import java.io.Closeable + /** * @author MansurAshraf * @since 9/8/13 */ -trait DefaultHBaseConfig { +trait DefaultHBaseCluster[C <: Closeable] extends CloseableCleanup[C] { val quorumNames = Seq("localhost:2181") val table = "summing_bird" val columnFamily = "sb" val column = "aggregate" val createTable = true + val testingUtil = new HBaseTestingUtility() + val conf = testingUtil.getConfiguration + val pool = new HTablePool(conf, 1) + + override def cleanup() { + super.cleanup() + testingUtil.shutdownMiniCluster() + } } diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala index eb054366..df597c97 100644 --- a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala @@ -17,7 +17,6 @@ package com.twitter.storehaus.hbase import org.scalacheck.{Gen, Properties} -import com.twitter.storehaus.testing.CloseableCleanup import com.twitter.storehaus.Store import com.twitter.storehaus.testing.generator.NonEmpty import HBaseStringStoreProperties._ @@ -27,8 +26,7 @@ import HBaseStringStoreProperties._ * @since 9/8/13 */ object HBaseLongStoreProperties extends Properties("HBaseLongStore") -with CloseableCleanup[Store[String, Long]] -with DefaultHBaseConfig { +with DefaultHBaseCluster[Store[String, Long]] { def validPairs: Gen[List[(String, Option[Long])]] = NonEmpty.Pairing.alphaStrNumerics[Long](10) @@ -36,7 +34,8 @@ with DefaultHBaseConfig { def storeTest(store: Store[String, Long]) = putStoreTest(store, validPairs) && multiPutStoreTest(store, validPairs) - val closeable = HBaseLongStore(quorumNames, table, columnFamily, column, createTable) + testingUtil.startMiniCluster() + val closeable = HBaseLongStore(quorumNames, table, columnFamily, column, createTable,pool,conf) property("HBaseLongStore test") = storeTest(closeable) diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala index d9b7164e..83abcdb0 100644 --- a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala @@ -22,15 +22,14 @@ import com.twitter.storehaus.{FutureOps, Store} import com.twitter.storehaus.testing.generator.NonEmpty import org.scalacheck.Prop._ import com.twitter.util.Await +import org.apache.hadoop.hbase.HBaseTestingUtility /** * @author MansurAshraf * @since 9/8/13 */ object HBaseStringStoreProperties extends Properties("HBaseStore") -with CloseableCleanup[Store[String, String]] -with DefaultHBaseConfig { - +with DefaultHBaseCluster[Store[String, String]] { def validPairs: Gen[List[(String, Option[String])]] = NonEmpty.Pairing.alphaStrs() @@ -63,9 +62,7 @@ with DefaultHBaseConfig { def storeTest(store: Store[String, String]) = putStoreTest(store, validPairs) && multiPutStoreTest(store, validPairs) - val closeable =HBaseStringStore(quorumNames, table, columnFamily, column, createTable) - - - property("HBaseStore test") = - storeTest(closeable) + testingUtil.startMiniCluster() + val closeable =HBaseStringStore(quorumNames, table, columnFamily, column, createTable,pool,conf) + property("HBaseStore test") =storeTest(closeable) } From 40a99ba9a936de806bd5257f8a0c120958fb5353 Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Tue, 10 Sep 2013 11:31:18 -0500 Subject: [PATCH 23/29] mocking hbase - second try --- project/Build.scala | 6 +-- .../hbase/HBaseLongStoreProperties.scala | 42 ------------------- 2 files changed, 3 insertions(+), 45 deletions(-) delete mode 100644 storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala diff --git a/project/Build.scala b/project/Build.scala index 3a609766..3daa1bda 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -181,9 +181,9 @@ object StorehausBuild extends Build { "com.twitter" %% "algebird-core" % algebirdVersion, "com.twitter" %% "bijection-core" % bijectionVersion, "com.twitter" %% "bijection-hbase" % bijectionVersion , - "org.apache.hbase" % "hbase" % "0.94.4" % "provided->default" classifier "tests" classifier "", - "org.apache.hadoop" % "hadoop-core" % "1.0.4" % "provided->default", - "org.apache.hadoop" % "hadoop-test" % "1.0.4" % "test" + "org.apache.hbase" % "hbase" % "0.94.6" % "provided->default" classifier "tests" classifier "", + "org.apache.hadoop" % "hadoop-core" % "1.2.0" % "provided->default", + "org.apache.hadoop" % "hadoop-test" % "1.2.0" % "test" ), parallelExecution in Test := false ).dependsOn(storehausAlgebra % "test->test;compile->compile") diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala deleted file mode 100644 index df597c97..00000000 --- a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2013 Twitter inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.twitter.storehaus.hbase - -import org.scalacheck.{Gen, Properties} -import com.twitter.storehaus.Store -import com.twitter.storehaus.testing.generator.NonEmpty -import HBaseStringStoreProperties._ - -/** - * @author MansurAshraf - * @since 9/8/13 - */ -object HBaseLongStoreProperties extends Properties("HBaseLongStore") -with DefaultHBaseCluster[Store[String, Long]] { - - def validPairs: Gen[List[(String, Option[Long])]] = - NonEmpty.Pairing.alphaStrNumerics[Long](10) - - def storeTest(store: Store[String, Long]) = - putStoreTest(store, validPairs) && multiPutStoreTest(store, validPairs) - - testingUtil.startMiniCluster() - val closeable = HBaseLongStore(quorumNames, table, columnFamily, column, createTable,pool,conf) - - property("HBaseLongStore test") = storeTest(closeable) - -} From 1677a9a810a9a436290c8f88a777d464aadb1bfe Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Tue, 10 Sep 2013 11:43:19 -0500 Subject: [PATCH 24/29] removed shutdown hook at it is causing zookeeper to hang --- .../com/twitter/storehaus/hbase/DefaultHBaseCluster.scala | 3 ++- .../twitter/storehaus/hbase/HBaseStringStoreProperties.scala | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala index fc47aa73..d15b1de5 100644 --- a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala @@ -37,6 +37,7 @@ trait DefaultHBaseCluster[C <: Closeable] extends CloseableCleanup[C] { override def cleanup() { super.cleanup() - testingUtil.shutdownMiniCluster() + /* testingUtil.shutdownMiniZKCluster() + testingUtil.shutdownMiniCluster()*/ } } diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala index 83abcdb0..b0382ec5 100644 --- a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala @@ -65,4 +65,5 @@ with DefaultHBaseCluster[Store[String, String]] { testingUtil.startMiniCluster() val closeable =HBaseStringStore(quorumNames, table, columnFamily, column, createTable,pool,conf) property("HBaseStore test") =storeTest(closeable) + } From ecc5f3b8bb3da5d876951189b3d5be87be31011e Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Tue, 10 Sep 2013 20:09:42 -0500 Subject: [PATCH 25/29] switched to FuturePool for blocking operations --- .../twitter/storehaus/hbase/HBaseByteArrayStore.scala | 10 ++++++---- .../com/twitter/storehaus/hbase/HBaseLongStore.scala | 10 ++++++---- .../com/twitter/storehaus/hbase/HBaseStore.scala | 11 +++++++---- .../twitter/storehaus/hbase/HBaseStringStore.scala | 10 ++++++---- .../storehaus/hbase/HBaseStringStoreProperties.scala | 2 +- 5 files changed, 26 insertions(+), 17 deletions(-) diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala index ae53b29c..cca402f0 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala @@ -32,8 +32,9 @@ object HBaseByteArrayStore { column: String, createTable: Boolean, pool: HTablePool, - conf: Configuration): HBaseByteArrayStore = { - val store = new HBaseByteArrayStore(quorumNames, table, columnFamily, column, createTable,pool,conf) + conf: Configuration, + threads: Int): HBaseByteArrayStore = { + val store = new HBaseByteArrayStore(quorumNames, table, columnFamily, column, createTable, pool, conf, threads) store.validateConfiguration() store.createTableIfRequired() store @@ -43,7 +44,7 @@ object HBaseByteArrayStore { table: String, columnFamily: String, column: String, - createTable: Boolean): HBaseByteArrayStore = apply(quorumNames,table,columnFamily,column,createTable,new HTablePool(), new Configuration()) + createTable: Boolean): HBaseByteArrayStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration(), 4) } class HBaseByteArrayStore(protected val quorumNames: Seq[String], @@ -52,7 +53,8 @@ class HBaseByteArrayStore(protected val quorumNames: Seq[String], protected val column: String, protected val createTable: Boolean, protected val pool: HTablePool, - protected val conf: Configuration) extends Store[Array[Byte], Array[Byte]] with HBaseStore { + protected val conf: Configuration, + protected val threads: Int) extends Store[Array[Byte], Array[Byte]] with HBaseStore { /** get a single key from the store. * Prefer multiGet if you are getting more than one key at a time diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala index 8d3cfc15..230050ee 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala @@ -33,8 +33,9 @@ object HBaseLongStore { column: String, createTable: Boolean, pool: HTablePool, - conf: Configuration): HBaseLongStore = { - val store = new HBaseLongStore(quorumNames, table, columnFamily, column, createTable, pool, conf) + conf: Configuration, + threads:Int): HBaseLongStore = { + val store = new HBaseLongStore(quorumNames, table, columnFamily, column, createTable, pool, conf,threads) store.validateConfiguration() store.createTableIfRequired() store @@ -44,7 +45,7 @@ object HBaseLongStore { table: String, columnFamily: String, column: String, - createTable: Boolean): HBaseLongStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration()) + createTable: Boolean): HBaseLongStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration(),4) } class HBaseLongStore(protected val quorumNames: Seq[String], @@ -53,7 +54,8 @@ class HBaseLongStore(protected val quorumNames: Seq[String], protected val column: String, protected val createTable: Boolean, protected val pool: HTablePool, - protected val conf: Configuration) extends Store[String, Long] with HBaseStore { + protected val conf: Configuration, + protected val threads:Int) extends Store[String, Long] with HBaseStore { /** get a single key from the store. * Prefer multiGet if you are getting more than one key at a time diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala index f9be24a2..b583c755 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala @@ -22,8 +22,9 @@ import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, HBaseConfig import com.twitter.bijection.hbase.HBaseBijections._ import com.twitter.bijection.Conversion._ import com.twitter.bijection.Injection -import com.twitter.util.Future +import com.twitter.util.{FuturePool, Future} import scala.Some +import java.util.concurrent.Executors /** * @author Mansur Ashraf @@ -38,6 +39,8 @@ trait HBaseStore { protected val column: String protected val pool: HTablePool protected val conf: Configuration + protected val threads: Int + protected val futurePool = FuturePool(Executors.newFixedThreadPool(threads)) def getHBaseAdmin: HBaseAdmin = { if (conf.get("hbase.zookeeper.quorum") == null) { @@ -64,7 +67,7 @@ trait HBaseStore { require(isNotEmpty(column), "column is required") } - def getValue[K, V](key: K)(implicit keyInj: Injection[K, Array[Byte]], valueInj: Injection[V, Array[Byte]]): Future[Option[V]] = Future { + def getValue[K, V](key: K)(implicit keyInj: Injection[K, Array[Byte]], valueInj: Injection[V, Array[Byte]]): Future[Option[V]] = futurePool { val tbl = pool.getTable(table) val g = new Get(keyInj(key)) g.addColumn(columnFamily.as[StringBytes], column.as[StringBytes]) @@ -76,13 +79,13 @@ trait HBaseStore { def putValue[K, V](kv: (K, Option[V]))(implicit keyInj: Injection[K, Array[Byte]], valueInj: Injection[V, Array[Byte]]): Future[Unit] = { kv match { - case (k, Some(v)) => Future { + case (k, Some(v)) => futurePool { val p = new Put(keyInj(k)) p.add(columnFamily.as[StringBytes], column.as[StringBytes], valueInj(v)) val tbl = pool.getTable(table) tbl.put(p) } - case (k, None) => Future { + case (k, None) => futurePool { val delete = new Delete(keyInj(k)) val tbl = pool.getTable(table) tbl.delete(delete) diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala index aa813d9d..e153093c 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala @@ -33,8 +33,9 @@ object HBaseStringStore { column: String, createTable: Boolean, pool: HTablePool, - conf: Configuration): HBaseStringStore = { - val store = new HBaseStringStore(quorumNames, table, columnFamily, column, createTable, pool, conf) + conf: Configuration, + threads: Int): HBaseStringStore = { + val store = new HBaseStringStore(quorumNames, table, columnFamily, column, createTable, pool, conf, threads) store.validateConfiguration() store.createTableIfRequired() store @@ -44,7 +45,7 @@ object HBaseStringStore { table: String, columnFamily: String, column: String, - createTable: Boolean): HBaseStringStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration()) + createTable: Boolean): HBaseStringStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration(), 4) } class HBaseStringStore(protected val quorumNames: Seq[String], @@ -53,7 +54,8 @@ class HBaseStringStore(protected val quorumNames: Seq[String], protected val column: String, protected val createTable: Boolean, protected val pool: HTablePool, - protected val conf: Configuration) extends Store[String, String] with HBaseStore { + protected val conf: Configuration, + protected val threads: Int) extends Store[String, String] with HBaseStore { /** get a single key from the store. * Prefer multiGet if you are getting more than one key at a time */ diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala index b0382ec5..ec4ce3f2 100644 --- a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala @@ -63,7 +63,7 @@ with DefaultHBaseCluster[Store[String, String]] { putStoreTest(store, validPairs) && multiPutStoreTest(store, validPairs) testingUtil.startMiniCluster() - val closeable =HBaseStringStore(quorumNames, table, columnFamily, column, createTable,pool,conf) + val closeable =HBaseStringStore(quorumNames, table, columnFamily, column, createTable,pool,conf,4) property("HBaseStore test") =storeTest(closeable) } From 2cbef56e33425856903bf30902c21df8c6362901 Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Tue, 10 Sep 2013 20:11:54 -0500 Subject: [PATCH 26/29] removed unused imports --- .../twitter/storehaus/hbase/HBaseStringStoreProperties.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala index ec4ce3f2..6506bd95 100644 --- a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala @@ -17,12 +17,10 @@ package com.twitter.storehaus.hbase import org.scalacheck.{Arbitrary, Gen, Properties} -import com.twitter.storehaus.testing.CloseableCleanup import com.twitter.storehaus.{FutureOps, Store} import com.twitter.storehaus.testing.generator.NonEmpty import org.scalacheck.Prop._ import com.twitter.util.Await -import org.apache.hadoop.hbase.HBaseTestingUtility /** * @author MansurAshraf From 3f6422580ba0b4e72d728ce5e87cdbc54e778d83 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Wed, 11 Sep 2013 14:34:27 -0700 Subject: [PATCH 27/29] add changes, bump version --- CHANGES.md | 5 +++++ README.md | 2 +- version.sbt | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 6da2bf4f..58f3500c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,10 @@ # storehaus # +### Version.0.5.1 ### + +Add storehaus-hbase and upgrade to bijection 0.5.3: https://github.com/twitter/storehaus/pull/139 +Fix mutable TTL cache bug: https://github.com/twitter/storehaus/pull/136 + ### Version.0.5.0 ### * Reuse prepared statements in mysql: https://github.com/twitter/storehaus/issues/93 diff --git a/README.md b/README.md index f2dd55c9..8c310495 100644 --- a/README.md +++ b/README.md @@ -94,7 +94,7 @@ See the [current API documentation](http://twitter.github.com/storehaus) for mor ## Maven -Storehaus modules are available on maven central. The current groupid and version for all modules is, respectively, `"com.twitter"` and `0.5.0`. +Storehaus modules are available on maven central. The current groupid and version for all modules is, respectively, `"com.twitter"` and `0.5.1`. Current published artifacts are diff --git a/version.sbt b/version.sbt index 5280cb4f..d8e39804 100644 --- a/version.sbt +++ b/version.sbt @@ -1,2 +1,2 @@ -version in ThisBuild := "0.5.1-SNAPSHOT" +version in ThisBuild := "0.5.1" From b77554a2f030eb2ed92734016ec385fb646cfe78 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Wed, 11 Sep 2013 15:01:51 -0700 Subject: [PATCH 28/29] Update CHANGES.md --- CHANGES.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 58f3500c..c3ed3aa8 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,8 +2,8 @@ ### Version.0.5.1 ### -Add storehaus-hbase and upgrade to bijection 0.5.3: https://github.com/twitter/storehaus/pull/139 -Fix mutable TTL cache bug: https://github.com/twitter/storehaus/pull/136 +* Add storehaus-hbase and upgrade to bijection 0.5.3: https://github.com/twitter/storehaus/pull/139 +* Fix mutable TTL cache bug: https://github.com/twitter/storehaus/pull/136 ### Version.0.5.0 ### From a89455d703aca1dc56f5d399afd485a68d35e618 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Wed, 11 Sep 2013 15:43:42 -0700 Subject: [PATCH 29/29] Add hbase to README --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index 8c310495..8991eee5 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ Storehaus provides a number of modules wrapping existing key-value stores. Enric * [Storehaus-memcache](http://twitter.github.com/storehaus/#com.twitter.storehaus.memcache.MemcacheStore) (wraps Twitter's [finagle-memcached](https://github.com/twitter/finagle/tree/master/finagle-memcached) library) * [Storehaus-mysql](http://twitter.github.com/storehaus/#com.twitter.storehaus.mysql.MySQLStore) (wraps Twitter's [finagle-mysql](https://github.com/twitter/finagle/tree/master/finagle-mysql) library) * [Storehaus-redis](http://twitter.github.com/storehaus/#com.twitter.storehaus.redis.RedisStore) (wraps Twitter's [finagle-redis](https://github.com/twitter/finagle/tree/master/finagle-redis) library) + * [Storehaus-hbase](http://twitter.github.com/storehaus/#com.twitter.storehaus.hbase.HBaseStore) #### Planned Modules @@ -106,6 +107,8 @@ Current published artifacts are * `storehaus-memcache_2.10` * `storehaus-mysql_2.9.3` * `storehaus-mysql_2.10` +* `storehaus-hbase_2.9.3` +* `storehaus-hbase_2.10` * `storehaus-redis_2.9.3` * `storehaus-redis_2.10` * `storehaus-cache_2.9.3`