diff --git a/.travis.yml b/.travis.yml index befbde46..9c859b5f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,4 +8,5 @@ before_script: - mysql -u root -e "grant all on storehaus_test.* to 'storehaususer'@'localhost';" services: - redis-server - - memcache \ No newline at end of file + - memcache +script: umask 0022 && sbt ++$TRAVIS_SCALA_VERSION test \ No newline at end of file diff --git a/project/Build.scala b/project/Build.scala index bb81bb01..3daa1bda 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package storehaus import sbt._ @@ -96,7 +112,7 @@ object StorehausBuild extends Build { .map { s => "com.twitter" % ("storehaus-" + s + "_2.9.3") % "0.5.0" } val algebirdVersion = "0.2.0" - val bijectionVersion = "0.5.2" + val bijectionVersion = "0.5.3" lazy val storehaus = Project( id = "storehaus", @@ -113,6 +129,7 @@ object StorehausBuild extends Build { storehausMemcache, storehausMySQL, storehausRedis, + storehausHBase, storehausTesting ) @@ -137,7 +154,7 @@ object StorehausBuild extends Build { lazy val storehausAlgebra = module("algebra").settings( libraryDependencies += "com.twitter" %% "algebird-core" % algebirdVersion, libraryDependencies += "com.twitter" %% "algebird-util" % algebirdVersion, - libraryDependencies += "com.twitter" %% "bijection-algebird" % bijectionVersion + libraryDependencies += "com.twitter" %% "algebird-bijection" % algebirdVersion ).dependsOn(storehausCore % "test->test;compile->compile") lazy val storehausMemcache = module("memcache").settings( @@ -159,6 +176,18 @@ object StorehausBuild extends Build { parallelExecution in Test := false ).dependsOn(storehausAlgebra % "test->test;compile->compile") + lazy val storehausHBase= module("hbase").settings( + libraryDependencies ++= Seq( + "com.twitter" %% "algebird-core" % algebirdVersion, + "com.twitter" %% "bijection-core" % bijectionVersion, + "com.twitter" %% "bijection-hbase" % bijectionVersion , + "org.apache.hbase" % "hbase" % "0.94.6" % "provided->default" classifier "tests" classifier "", + "org.apache.hadoop" % "hadoop-core" % "1.2.0" % "provided->default", + "org.apache.hadoop" % "hadoop-test" % "1.2.0" % "test" + ), + parallelExecution in Test := false + ).dependsOn(storehausAlgebra % "test->test;compile->compile") + val storehausTesting = Project( id = "storehaus-testing", base = file("storehaus-testing"), diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/ConvertedMergeableStore.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/ConvertedMergeableStore.scala index 47f05255..463807cc 100644 --- a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/ConvertedMergeableStore.scala +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/ConvertedMergeableStore.scala @@ -32,7 +32,7 @@ class ConvertedMergeableStore[K1, -K2, V1, V2](store: MergeableStore[K1, V1])(kf (implicit bij: ImplicitBijection[V2, V1]) extends com.twitter.storehaus.ConvertedStore[K1, K2, V1, V2](store)(kfn)(Injection.fromBijection(bij.bijection)) with MergeableStore[K2, V2] { - import com.twitter.bijection.algebird.AlgebirdBijections._ + import com.twitter.algebird.bijection.AlgebirdBijections._ override def monoid: Monoid[V2] = store.monoid.as[Monoid[V2]] diff --git a/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/MergeableStoreProperties.scala b/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/MergeableStoreProperties.scala index 4d4a74f4..eeec71b4 100644 --- a/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/MergeableStoreProperties.scala +++ b/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/MergeableStoreProperties.scala @@ -17,7 +17,7 @@ package com.twitter.storehaus.algebra import com.twitter.algebird.{ MapAlgebra, Monoid, SummingQueue } -import com.twitter.bijection.algebird.AlgebirdBijections._ +import com.twitter.algebird.bijection.AlgebirdBijections._ import com.twitter.bijection.Injection import com.twitter.storehaus._ import com.twitter.util.Await @@ -93,7 +93,7 @@ object MergeableStoreProperties extends Properties("MergeableStore") { property("Converted MergeableStore obeys the mergeable store laws") = { // We are using a weird monoid on Int here: - import com.twitter.bijection.algebird.AlgebirdBijections._ + import com.twitter.algebird.bijection.AlgebirdBijections._ import com.twitter.bijection.Conversion.asMethod implicit val monoid : Monoid[Int] = implicitly[Monoid[(Short,Short)]].as[Monoid[Int]] diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala new file mode 100644 index 00000000..cca402f0 --- /dev/null +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala @@ -0,0 +1,80 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.hadoop.hbase.client.HTablePool +import com.twitter.storehaus.Store +import com.twitter.util.Future +import org.apache.hadoop.conf.Configuration + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +object HBaseByteArrayStore { + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean, + pool: HTablePool, + conf: Configuration, + threads: Int): HBaseByteArrayStore = { + val store = new HBaseByteArrayStore(quorumNames, table, columnFamily, column, createTable, pool, conf, threads) + store.validateConfiguration() + store.createTableIfRequired() + store + } + + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean): HBaseByteArrayStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration(), 4) +} + +class HBaseByteArrayStore(protected val quorumNames: Seq[String], + protected val table: String, + protected val columnFamily: String, + protected val column: String, + protected val createTable: Boolean, + protected val pool: HTablePool, + protected val conf: Configuration, + protected val threads: Int) extends Store[Array[Byte], Array[Byte]] with HBaseStore { + + /** get a single key from the store. + * Prefer multiGet if you are getting more than one key at a time + */ + override def get(k: Array[Byte]): Future[Option[Array[Byte]]] = { + getValue(k) + } + + /** + * replace a value + * Delete is the same as put((k,None)) + */ + override def put(kv: (Array[Byte], Option[Array[Byte]])): Future[Unit] = { + putValue(kv) + } + + /** Close this store and release any resources. + * It is undefined what happens on get/multiGet after close + */ + override def close { + pool.close() + } +} diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala new file mode 100644 index 00000000..230050ee --- /dev/null +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala @@ -0,0 +1,88 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.hadoop.hbase.client.HTablePool +import com.twitter.storehaus.Store +import com.twitter.util.Future +import com.twitter.bijection.Injection._ +import org.apache.hadoop.conf.Configuration + +/** + * @author Mansur Ashraf + * @since 9/8/13 + */ +object HBaseLongStore { + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean, + pool: HTablePool, + conf: Configuration, + threads:Int): HBaseLongStore = { + val store = new HBaseLongStore(quorumNames, table, columnFamily, column, createTable, pool, conf,threads) + store.validateConfiguration() + store.createTableIfRequired() + store + } + + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean): HBaseLongStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration(),4) +} + +class HBaseLongStore(protected val quorumNames: Seq[String], + protected val table: String, + protected val columnFamily: String, + protected val column: String, + protected val createTable: Boolean, + protected val pool: HTablePool, + protected val conf: Configuration, + protected val threads:Int) extends Store[String, Long] with HBaseStore { + + /** get a single key from the store. + * Prefer multiGet if you are getting more than one key at a time + */ + override def get(k: String): Future[Option[Long]] = { + import com.twitter.bijection.hbase.HBaseBijections._ + implicit val stringInj = fromBijectionRep[String, StringBytes] + implicit val LongInj = fromBijectionRep[Long, LongBytes] + getValue[String, Long](k) + } + + /** + * replace a value + * Delete is the same as put((k,None)) + */ + override def put(kv: (String, Option[Long])): Future[Unit] = { + import com.twitter.bijection.hbase.HBaseBijections._ + implicit val stringInj = fromBijectionRep[String, StringBytes] + implicit val LongInj = fromBijectionRep[Long, LongBytes] + putValue(kv) + } + + /** Close this store and release any resources. + * It is undefined what happens on get/multiGet after close + */ + override def close { + pool.close() + } +} + diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala new file mode 100644 index 00000000..b583c755 --- /dev/null +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala @@ -0,0 +1,95 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, HBaseConfiguration} +import com.twitter.bijection.hbase.HBaseBijections._ +import com.twitter.bijection.Conversion._ +import com.twitter.bijection.Injection +import com.twitter.util.{FuturePool, Future} +import scala.Some +import java.util.concurrent.Executors + +/** + * @author Mansur Ashraf + * @since 9/8/13 + */ +trait HBaseStore { + + protected val quorumNames: Seq[String] + protected val createTable: Boolean + protected val table: String + protected val columnFamily: String + protected val column: String + protected val pool: HTablePool + protected val conf: Configuration + protected val threads: Int + protected val futurePool = FuturePool(Executors.newFixedThreadPool(threads)) + + def getHBaseAdmin: HBaseAdmin = { + if (conf.get("hbase.zookeeper.quorum") == null) { + conf.set("hbase.zookeeper.quorum", quorumNames.mkString(",")) + } + val hbaseConf = HBaseConfiguration.create(conf) + new HBaseAdmin(hbaseConf) + } + + def createTableIfRequired() { + val hbaseAdmin = getHBaseAdmin + if (createTable && !hbaseAdmin.tableExists(table)) { + val tableDescriptor = new HTableDescriptor(table) + tableDescriptor.addFamily(new HColumnDescriptor(columnFamily)) + hbaseAdmin.createTable(tableDescriptor) + } + } + + def validateConfiguration() { + import org.apache.commons.lang.StringUtils.isNotEmpty + + require(!quorumNames.isEmpty, "Zookeeper quorums are required") + require(isNotEmpty(columnFamily), "column family is required") + require(isNotEmpty(column), "column is required") + } + + def getValue[K, V](key: K)(implicit keyInj: Injection[K, Array[Byte]], valueInj: Injection[V, Array[Byte]]): Future[Option[V]] = futurePool { + val tbl = pool.getTable(table) + val g = new Get(keyInj(key)) + g.addColumn(columnFamily.as[StringBytes], column.as[StringBytes]) + + val result = tbl.get(g) + val value = result.getValue(columnFamily.as[StringBytes], column.as[StringBytes]) + Option(value).map(v => valueInj.invert(v).get) + } + + def putValue[K, V](kv: (K, Option[V]))(implicit keyInj: Injection[K, Array[Byte]], valueInj: Injection[V, Array[Byte]]): Future[Unit] = { + kv match { + case (k, Some(v)) => futurePool { + val p = new Put(keyInj(k)) + p.add(columnFamily.as[StringBytes], column.as[StringBytes], valueInj(v)) + val tbl = pool.getTable(table) + tbl.put(p) + } + case (k, None) => futurePool { + val delete = new Delete(keyInj(k)) + val tbl = pool.getTable(table) + tbl.delete(delete) + } + } + } +} diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala new file mode 100644 index 00000000..e153093c --- /dev/null +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala @@ -0,0 +1,84 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.hadoop.hbase.client._ +import com.twitter.storehaus.Store +import com.twitter.util.Future +import com.twitter.bijection.Injection._ +import org.apache.hadoop.conf.Configuration + +/** + * @author MansurAshraf + * @since 9/7/13 + */ +object HBaseStringStore { + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean, + pool: HTablePool, + conf: Configuration, + threads: Int): HBaseStringStore = { + val store = new HBaseStringStore(quorumNames, table, columnFamily, column, createTable, pool, conf, threads) + store.validateConfiguration() + store.createTableIfRequired() + store + } + + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean): HBaseStringStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration(), 4) +} + +class HBaseStringStore(protected val quorumNames: Seq[String], + protected val table: String, + protected val columnFamily: String, + protected val column: String, + protected val createTable: Boolean, + protected val pool: HTablePool, + protected val conf: Configuration, + protected val threads: Int) extends Store[String, String] with HBaseStore { + /** get a single key from the store. + * Prefer multiGet if you are getting more than one key at a time + */ + override def get(k: String): Future[Option[String]] = { + import com.twitter.bijection.hbase.HBaseBijections._ + implicit val stringInj = fromBijectionRep[String, StringBytes] + getValue[String, String](k) + } + + /** + * replace a value + * Delete is the same as put((k,None)) + */ + override def put(kv: (String, Option[String])): Future[Unit] = { + import com.twitter.bijection.hbase.HBaseBijections._ + implicit val stringInj = fromBijectionRep[String, StringBytes] + putValue(kv) + } + + /** Close this store and release any resources. + * It is undefined what happens on get/multiGet after close + */ + override def close { + pool.close() + } +} diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala new file mode 100644 index 00000000..d15b1de5 --- /dev/null +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala @@ -0,0 +1,43 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.hadoop.hbase.HBaseTestingUtility +import org.apache.hadoop.hbase.client.HTablePool +import com.twitter.storehaus.testing.CloseableCleanup +import java.io.Closeable + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +trait DefaultHBaseCluster[C <: Closeable] extends CloseableCleanup[C] { + val quorumNames = Seq("localhost:2181") + val table = "summing_bird" + val columnFamily = "sb" + val column = "aggregate" + val createTable = true + val testingUtil = new HBaseTestingUtility() + val conf = testingUtil.getConfiguration + val pool = new HTablePool(conf, 1) + + override def cleanup() { + super.cleanup() + /* testingUtil.shutdownMiniZKCluster() + testingUtil.shutdownMiniCluster()*/ + } +} diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala new file mode 100644 index 00000000..6506bd95 --- /dev/null +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala @@ -0,0 +1,67 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.scalacheck.{Arbitrary, Gen, Properties} +import com.twitter.storehaus.{FutureOps, Store} +import com.twitter.storehaus.testing.generator.NonEmpty +import org.scalacheck.Prop._ +import com.twitter.util.Await + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +object HBaseStringStoreProperties extends Properties("HBaseStore") +with DefaultHBaseCluster[Store[String, String]] { + def validPairs: Gen[List[(String, Option[String])]] = + NonEmpty.Pairing.alphaStrs() + + def baseTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) + (put: (Store[K, V], List[(K, Option[V])]) => Unit) = + forAll(validPairs) { + (examples: List[(K, Option[V])]) => + put(store, examples) + examples.toMap.forall { + case (k, optV) => + val res = Await.result(store.get(k)) + Equiv[Option[V]].equiv(res, optV) + } + } + + def putStoreTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) = + baseTest(store, validPairs) { + (s, pairs) => + pairs.foreach { + case (k, v) => Await.result(s.put((k, v))) + } + } + + def multiPutStoreTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) = + baseTest(store, validPairs) { + (s, pairs) => + Await.result(FutureOps.mapCollect(s.multiPut(pairs.toMap))) + } + + def storeTest(store: Store[String, String]) = + putStoreTest(store, validPairs) && multiPutStoreTest(store, validPairs) + + testingUtil.startMiniCluster() + val closeable =HBaseStringStore(quorumNames, table, columnFamily, column, createTable,pool,conf,4) + property("HBaseStore test") =storeTest(closeable) + +}