From d027d2e703674f748d73b9d378f235754e23e8ac Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Sat, 7 Sep 2013 22:49:40 -0500 Subject: [PATCH 01/10] Added storehaus-hbase, upgraded bijection to 0.5.3 --- project/Build.scala | 14 ++++++++++++-- .../algebra/ConvertedMergeableStore.scala | 2 +- .../algebra/MergeableStoreProperties.scala | 4 ++-- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/project/Build.scala b/project/Build.scala index bb81bb01..5ab3cbe6 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -96,7 +96,7 @@ object StorehausBuild extends Build { .map { s => "com.twitter" % ("storehaus-" + s + "_2.9.3") % "0.5.0" } val algebirdVersion = "0.2.0" - val bijectionVersion = "0.5.2" + val bijectionVersion = "0.5.3" lazy val storehaus = Project( id = "storehaus", @@ -113,6 +113,7 @@ object StorehausBuild extends Build { storehausMemcache, storehausMySQL, storehausRedis, + storehausHBase, storehausTesting ) @@ -137,7 +138,7 @@ object StorehausBuild extends Build { lazy val storehausAlgebra = module("algebra").settings( libraryDependencies += "com.twitter" %% "algebird-core" % algebirdVersion, libraryDependencies += "com.twitter" %% "algebird-util" % algebirdVersion, - libraryDependencies += "com.twitter" %% "bijection-algebird" % bijectionVersion + libraryDependencies += "com.twitter" %% "algebird-bijection" % algebirdVersion ).dependsOn(storehausCore % "test->test;compile->compile") lazy val storehausMemcache = module("memcache").settings( @@ -159,6 +160,15 @@ object StorehausBuild extends Build { parallelExecution in Test := false ).dependsOn(storehausAlgebra % "test->test;compile->compile") + lazy val storehausHBase= module("hbase").settings( + libraryDependencies ++= Seq( + "com.twitter" %% "algebird-core" % algebirdVersion, + "com.twitter" %% "bijection-core" % bijectionVersion, + "com.twitter" %% "bijection-hbase" % bijectionVersion + ), + parallelExecution in Test := false + ).dependsOn(storehausAlgebra % "test->test;compile->compile") + val storehausTesting = Project( id = "storehaus-testing", base = file("storehaus-testing"), diff --git a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/ConvertedMergeableStore.scala b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/ConvertedMergeableStore.scala index 47f05255..463807cc 100644 --- a/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/ConvertedMergeableStore.scala +++ b/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/ConvertedMergeableStore.scala @@ -32,7 +32,7 @@ class ConvertedMergeableStore[K1, -K2, V1, V2](store: MergeableStore[K1, V1])(kf (implicit bij: ImplicitBijection[V2, V1]) extends com.twitter.storehaus.ConvertedStore[K1, K2, V1, V2](store)(kfn)(Injection.fromBijection(bij.bijection)) with MergeableStore[K2, V2] { - import com.twitter.bijection.algebird.AlgebirdBijections._ + import com.twitter.algebird.bijection.AlgebirdBijections._ override def monoid: Monoid[V2] = store.monoid.as[Monoid[V2]] diff --git a/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/MergeableStoreProperties.scala b/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/MergeableStoreProperties.scala index 4d4a74f4..eeec71b4 100644 --- a/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/MergeableStoreProperties.scala +++ b/storehaus-algebra/src/test/scala/com/twitter/storehaus/algebra/MergeableStoreProperties.scala @@ -17,7 +17,7 @@ package com.twitter.storehaus.algebra import com.twitter.algebird.{ MapAlgebra, Monoid, SummingQueue } -import com.twitter.bijection.algebird.AlgebirdBijections._ +import com.twitter.algebird.bijection.AlgebirdBijections._ import com.twitter.bijection.Injection import com.twitter.storehaus._ import com.twitter.util.Await @@ -93,7 +93,7 @@ object MergeableStoreProperties extends Properties("MergeableStore") { property("Converted MergeableStore obeys the mergeable store laws") = { // We are using a weird monoid on Int here: - import com.twitter.bijection.algebird.AlgebirdBijections._ + import com.twitter.algebird.bijection.AlgebirdBijections._ import com.twitter.bijection.Conversion.asMethod implicit val monoid : Monoid[Int] = implicitly[Monoid[(Short,Short)]].as[Monoid[Int]] From fcdbdd89d3c5f81351dbfca839f6139c46c14e4e Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Sun, 8 Sep 2013 22:13:37 -0500 Subject: [PATCH 02/10] added hbase dependencies --- project/Build.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/project/Build.scala b/project/Build.scala index 5ab3cbe6..dce88423 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -164,7 +164,9 @@ object StorehausBuild extends Build { libraryDependencies ++= Seq( "com.twitter" %% "algebird-core" % algebirdVersion, "com.twitter" %% "bijection-core" % bijectionVersion, - "com.twitter" %% "bijection-hbase" % bijectionVersion + "com.twitter" %% "bijection-hbase" % bijectionVersion , + "org.apache.hbase" % "hbase" % "0.94.4" % "provided->default", + "org.apache.hadoop" % "hadoop-core" % "1.0.4" % "provided->default" ), parallelExecution in Test := false ).dependsOn(storehausAlgebra % "test->test;compile->compile") From c602eeced8004b0c0e7b615fb89fd16e5d483237 Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Sun, 8 Sep 2013 22:34:39 -0500 Subject: [PATCH 03/10] added hbase module --- .../storehaus/hbase/HBaseLongStore.scala | 93 ++++++++++++++++++ .../twitter/storehaus/hbase/HBaseStore.scala | 90 +++++++++++++++++ .../storehaus/hbase/HBaseStoreConfig.scala | 60 ++++++++++++ .../storehaus/hbase/HBaseStringStore.scala | 98 +++++++++++++++++++ .../storehaus/hbase/DefaultHBaseConfig.scala | 29 ++++++ .../hbase/HBaseLongStoreProperties.scala | 43 ++++++++ .../hbase/HBaseStringStoreProperties.scala | 71 ++++++++++++++ 7 files changed, 484 insertions(+) create mode 100644 storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala create mode 100644 storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala create mode 100644 storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStoreConfig.scala create mode 100644 storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala create mode 100644 storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala create mode 100644 storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala create mode 100644 storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala new file mode 100644 index 00000000..c8ac62dd --- /dev/null +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala @@ -0,0 +1,93 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.hadoop.hbase.client.{Delete, Put, Result, HTablePool} +import com.twitter.storehaus.Store +import org.apache.commons.lang.StringUtils._ +import com.twitter.util.Future +import com.twitter.bijection.hbase.HBaseBijections._ +import com.twitter.bijection.Bijection +import com.twitter.bijection.Conversion._ + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +object HBaseLongStore { + def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseLongStore = { + val store = new HBaseLongStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) + store.createTableIfRequired() + store + } +} + +class HBaseLongStore(val quorumNames: String, val table: String, val columnFamily: String, val column: String, val createTable: Boolean, val pool: HTablePool) extends Store[String, Long] with HBaseStoreConfig { + + require(isNotEmpty(quorumNames), "Zookeeper quorums are required") + require(isNotEmpty(columnFamily), "column family is required") + require(isNotEmpty(column), "column is required") + + + /** get a single key from the store. + * Prefer multiGet if you are getting more than one key at a time + */ + override def get(k: String): Future[Option[Long]] = Future[Option[Long]] { + val tbl = pool.getTable(table) + val g = createGetRequest(k.as[StringBytes]) + val result = tbl.get(g) + extractValue(result) + } + + + /** + * replace a value + * Delete is the same as put((k,None)) + */ + override def put(kv: (String, Option[Long])): Future[Unit] = { + kv match { + case (k, Some(v)) => { + Future { + val p = new Put(k.as[StringBytes]) + p.add(columnFamily.as[StringBytes], column.as[StringBytes], v.as[LongBytes]) + val tbl = pool.getTable(table) + tbl.put(p) + } + } + case (k, None) => Future { + val delete = new Delete(k.as[StringBytes]) + val tbl = pool.getTable(table) + tbl.delete(delete) + } + } + + } + + /** Close this store and release any resources. + * It is undefined what happens on get/multiGet after close + */ + override def close { + pool.close() + } + + def extractValue(result: Result): Option[Long] = { + val value = result.getValue(columnFamily.as[StringBytes], column.as[StringBytes]) + Option(value) map (v => Bijection.invert[Long, LongBytes](v.asInstanceOf[LongBytes])) + } + +} + diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala new file mode 100644 index 00000000..15ba9302 --- /dev/null +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala @@ -0,0 +1,90 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.hadoop.hbase.client.{Delete, Put, Result, HTablePool} +import com.twitter.storehaus.Store +import org.apache.commons.lang.StringUtils._ +import com.twitter.util.Future +import com.twitter.bijection.hbase.HBaseBijections._ +import com.twitter.bijection.Conversion._ + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +object HBaseStore { + def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseStore = { + val store = new HBaseStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) + store.createTableIfRequired() + store + } +} + +class HBaseStore(val quorumNames: String, val table: String, val columnFamily: String, val column: String, val createTable: Boolean, val pool: HTablePool) extends Store[Array[Byte], Array[Byte]] with HBaseStoreConfig { + + require(isNotEmpty(quorumNames), "Zookeeper quorums are required") + require(isNotEmpty(columnFamily), "column family is required") + require(isNotEmpty(column), "column is required") + + + /** get a single key from the store. + * Prefer multiGet if you are getting more than one key at a time + */ + override def get(k: Array[Byte]): Future[Option[Array[Byte]]] = Future[Option[Array[Byte]]] { + val tbl = pool.getTable(table) + val g = createGetRequest(k) + val result = tbl.get(g) + extractValue(result) + } + + + /** + * replace a value + * Delete is the same as put((k,None)) + */ + override def put(kv: (Array[Byte], Option[Array[Byte]])): Future[Unit] = { + kv match { + case (k, Some(v)) => Future { + val p = new Put(k) + p.add(columnFamily.as[StringBytes], column.as[StringBytes], v) + val tbl = pool.getTable(table) + tbl.put(p) + } + case (k, None) => Future { + val delete = new Delete(k) + val tbl = pool.getTable(table) + tbl.delete(delete) + } + } + + } + + /** Close this store and release any resources. + * It is undefined what happens on get/multiGet after close + */ + override def close { + pool.close() + } + + def extractValue(result: Result): Option[Array[Byte]] = { + val value = result.getValue(columnFamily.as[StringBytes], column.as[StringBytes]) + Option(value) + } + +} + diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStoreConfig.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStoreConfig.scala new file mode 100644 index 00000000..d422eea7 --- /dev/null +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStoreConfig.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.hadoop.hbase.client.{Get, HBaseAdmin} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, HBaseConfiguration} +import com.twitter.bijection.hbase.HBaseBijections._ +import com.twitter.bijection.Conversion._ + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +trait HBaseStoreConfig { + + val quorumNames: String + val createTable: Boolean + val table: String + val columnFamily: String + val column:String + + def getHBaseAdmin: HBaseAdmin = { + val conf = new Configuration() + conf.set("hbase.zookeeper.quorum", quorumNames) + val hbaseConf = HBaseConfiguration.create(conf) + new HBaseAdmin(hbaseConf) + } + + def createTableIfRequired() { + val hbaseAdmin = getHBaseAdmin + if (createTable && !hbaseAdmin.tableExists(table)) { + val tableDescriptor = new HTableDescriptor(table) + tableDescriptor.addFamily(new HColumnDescriptor(columnFamily)) + hbaseAdmin.createTable(tableDescriptor) + } + } + + + def createGetRequest(k: Array[Byte]): Get = { + val g = new Get(k) + g.addColumn(columnFamily.as[StringBytes], column.as[StringBytes]) + g + } + +} diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala new file mode 100644 index 00000000..c67f56b2 --- /dev/null +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala @@ -0,0 +1,98 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.commons.lang.StringUtils._ +import org.apache.hadoop.hbase.client._ +import com.twitter.storehaus.Store +import com.twitter.util.Future +import com.twitter.bijection.Conversion._ +import com.twitter.bijection.hbase.HBaseBijections._ +import com.twitter.bijection.Bijection +import scala.Some + +/** + * @author MansurAshraf + * @since 9/7/13 + */ +object HBaseStringStore { + def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseStringStore = { + val stringStore = new HBaseStringStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) + stringStore.createTableIfRequired() + stringStore + } +} + +class HBaseStringStore(val quorumNames: String, val table: String, val columnFamily: String, val column: String, val createTable: Boolean, val pool: HTablePool) extends Store[String, String] with HBaseStoreConfig { + + require(isNotEmpty(quorumNames), "Zookeeper quorums are required") + require(isNotEmpty(columnFamily), "column family is required") + + + /** get a single key from the store. + * Prefer multiGet if you are getting more than one key at a time + */ + override def get(k: String): Future[Option[String]] = Future[Option[String]] { + val tbl = pool.getTable(table) + val g = createGetRequest(k) + val result = tbl.get(g) + extractValue(result) + } + + + /** + * replace a value + * Delete is the same as put((k,None)) + */ + override def put(kv: (String, Option[String])): Future[Unit] = { + kv match { + case (k, Some(v)) => { + Future { + val p = new Put(k.as[StringBytes]) + p.add(columnFamily.as[StringBytes], column.as[StringBytes], v.as[StringBytes]) + val tbl = pool.getTable(table) + tbl.put(p) + } + } + case (k, None) => Future { + val delete = new Delete(k.as[StringBytes]) + val tbl = pool.getTable(table) + tbl.delete(delete) + } + } + + } + + /** Close this store and release any resources. + * It is undefined what happens on get/multiGet after close + */ + override def close { + pool.close() + } + + def createGetRequest(k: String): Get = { + val g = new Get(k.as[StringBytes]) + g.addColumn(columnFamily.as[StringBytes], column.as[StringBytes]) + g + } + + def extractValue(result: Result): Option[String] = { + val value = result.getValue(columnFamily.as[StringBytes], column.as[StringBytes]) + Option(value) map (v => Bijection.invert[String, StringBytes](v.asInstanceOf[StringBytes])) + } + +} diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala new file mode 100644 index 00000000..775a3aed --- /dev/null +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +trait DefaultHBaseConfig { + val quorumNames = "localhost:2181" + val table = "summing_bird" + val columnFamily = "sb" + val column = "aggregate" + val createTable = true +} diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala new file mode 100644 index 00000000..eb054366 --- /dev/null +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala @@ -0,0 +1,43 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.scalacheck.{Gen, Properties} +import com.twitter.storehaus.testing.CloseableCleanup +import com.twitter.storehaus.Store +import com.twitter.storehaus.testing.generator.NonEmpty +import HBaseStringStoreProperties._ + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +object HBaseLongStoreProperties extends Properties("HBaseLongStore") +with CloseableCleanup[Store[String, Long]] +with DefaultHBaseConfig { + + def validPairs: Gen[List[(String, Option[Long])]] = + NonEmpty.Pairing.alphaStrNumerics[Long](10) + + def storeTest(store: Store[String, Long]) = + putStoreTest(store, validPairs) && multiPutStoreTest(store, validPairs) + + val closeable = HBaseLongStore(quorumNames, table, columnFamily, column, createTable) + + property("HBaseLongStore test") = storeTest(closeable) + +} diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala new file mode 100644 index 00000000..d9b7164e --- /dev/null +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala @@ -0,0 +1,71 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.scalacheck.{Arbitrary, Gen, Properties} +import com.twitter.storehaus.testing.CloseableCleanup +import com.twitter.storehaus.{FutureOps, Store} +import com.twitter.storehaus.testing.generator.NonEmpty +import org.scalacheck.Prop._ +import com.twitter.util.Await + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +object HBaseStringStoreProperties extends Properties("HBaseStore") +with CloseableCleanup[Store[String, String]] +with DefaultHBaseConfig { + + def validPairs: Gen[List[(String, Option[String])]] = + NonEmpty.Pairing.alphaStrs() + + def baseTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) + (put: (Store[K, V], List[(K, Option[V])]) => Unit) = + forAll(validPairs) { + (examples: List[(K, Option[V])]) => + put(store, examples) + examples.toMap.forall { + case (k, optV) => + val res = Await.result(store.get(k)) + Equiv[Option[V]].equiv(res, optV) + } + } + + def putStoreTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) = + baseTest(store, validPairs) { + (s, pairs) => + pairs.foreach { + case (k, v) => Await.result(s.put((k, v))) + } + } + + def multiPutStoreTest[K: Arbitrary, V: Arbitrary : Equiv](store: Store[K, V], validPairs: Gen[List[(K, Option[V])]]) = + baseTest(store, validPairs) { + (s, pairs) => + Await.result(FutureOps.mapCollect(s.multiPut(pairs.toMap))) + } + + def storeTest(store: Store[String, String]) = + putStoreTest(store, validPairs) && multiPutStoreTest(store, validPairs) + + val closeable =HBaseStringStore(quorumNames, table, columnFamily, column, createTable) + + + property("HBaseStore test") = + storeTest(closeable) +} From 6c5d9dc4ca30ec91d3b81f07ee84f58e6554ff31 Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Mon, 9 Sep 2013 15:31:38 -0500 Subject: [PATCH 04/10] DRYed out the imp by moving common methods to HBaseStore trait --- .../storehaus/hbase/HBaseByteArrayStore.scala | 64 +++++++++++++ .../storehaus/hbase/HBaseLongStore.scala | 59 ++++-------- .../twitter/storehaus/hbase/HBaseStore.scala | 96 +++++++++---------- .../storehaus/hbase/HBaseStoreConfig.scala | 60 ------------ .../storehaus/hbase/HBaseStringStore.scala | 58 +++-------- 5 files changed, 145 insertions(+), 192 deletions(-) create mode 100644 storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala delete mode 100644 storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStoreConfig.scala diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala new file mode 100644 index 00000000..9ee3acfc --- /dev/null +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala @@ -0,0 +1,64 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.storehaus.hbase + +import org.apache.hadoop.hbase.client.HTablePool +import com.twitter.storehaus.Store +import com.twitter.util.Future + +/** + * @author MansurAshraf + * @since 9/8/13 + */ +object HBaseByteArrayStore { + def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseByteArrayStore = { + val store = new HBaseByteArrayStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) + store.validateConfiguration() + store.createTableIfRequired() + store + } +} + +class HBaseByteArrayStore(val quorumNames: String, + val table: String, + val columnFamily: String, + val column: String, + val createTable: Boolean, + val pool: HTablePool) extends Store[Array[Byte], Array[Byte]] with HBaseStore { + + /** get a single key from the store. + * Prefer multiGet if you are getting more than one key at a time + */ + override def get(k: Array[Byte]): Future[Option[Array[Byte]]] = { + getValue(k) + } + + /** + * replace a value + * Delete is the same as put((k,None)) + */ + override def put(kv: (Array[Byte], Option[Array[Byte]])): Future[Unit] = { + putValue(kv) + } + + /** Close this store and release any resources. + * It is undefined what happens on get/multiGet after close + */ + override def close { + pool.close() + } +} diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala index c8ac62dd..fa60fc3a 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala @@ -16,65 +16,50 @@ package com.twitter.storehaus.hbase -import org.apache.hadoop.hbase.client.{Delete, Put, Result, HTablePool} +import org.apache.hadoop.hbase.client.HTablePool import com.twitter.storehaus.Store -import org.apache.commons.lang.StringUtils._ import com.twitter.util.Future -import com.twitter.bijection.hbase.HBaseBijections._ -import com.twitter.bijection.Bijection -import com.twitter.bijection.Conversion._ +import com.twitter.bijection.Injection._ /** - * @author MansurAshraf + * @author Mansur Ashraf * @since 9/8/13 */ object HBaseLongStore { def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseLongStore = { val store = new HBaseLongStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) + store.validateConfiguration() store.createTableIfRequired() store } } -class HBaseLongStore(val quorumNames: String, val table: String, val columnFamily: String, val column: String, val createTable: Boolean, val pool: HTablePool) extends Store[String, Long] with HBaseStoreConfig { - - require(isNotEmpty(quorumNames), "Zookeeper quorums are required") - require(isNotEmpty(columnFamily), "column family is required") - require(isNotEmpty(column), "column is required") - +class HBaseLongStore(protected val quorumNames: String, + protected val table: String, + protected val columnFamily: String, + protected val column: String, + protected val createTable: Boolean, + protected val pool: HTablePool) extends Store[String, Long] with HBaseStore { /** get a single key from the store. * Prefer multiGet if you are getting more than one key at a time */ - override def get(k: String): Future[Option[Long]] = Future[Option[Long]] { - val tbl = pool.getTable(table) - val g = createGetRequest(k.as[StringBytes]) - val result = tbl.get(g) - extractValue(result) + override def get(k: String): Future[Option[Long]] = { + import com.twitter.bijection.hbase.HBaseBijections._ + implicit val stringInj = fromBijectionRep[String, StringBytes] + implicit val LongInj = fromBijectionRep[Long, LongBytes] + getValue[String,Long](k) } - /** * replace a value * Delete is the same as put((k,None)) */ override def put(kv: (String, Option[Long])): Future[Unit] = { - kv match { - case (k, Some(v)) => { - Future { - val p = new Put(k.as[StringBytes]) - p.add(columnFamily.as[StringBytes], column.as[StringBytes], v.as[LongBytes]) - val tbl = pool.getTable(table) - tbl.put(p) - } - } - case (k, None) => Future { - val delete = new Delete(k.as[StringBytes]) - val tbl = pool.getTable(table) - tbl.delete(delete) - } - } - + import com.twitter.bijection.hbase.HBaseBijections._ + implicit val stringInj = fromBijectionRep[String, StringBytes] + implicit val LongInj = fromBijectionRep[Long, LongBytes] + putValue(kv) } /** Close this store and release any resources. @@ -83,11 +68,5 @@ class HBaseLongStore(val quorumNames: String, val table: String, val columnFamil override def close { pool.close() } - - def extractValue(result: Result): Option[Long] = { - val value = result.getValue(columnFamily.as[StringBytes], column.as[StringBytes]) - Option(value) map (v => Bijection.invert[Long, LongBytes](v.asInstanceOf[LongBytes])) - } - } diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala index 15ba9302..587601ad 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala @@ -16,75 +16,75 @@ package com.twitter.storehaus.hbase -import org.apache.hadoop.hbase.client.{Delete, Put, Result, HTablePool} -import com.twitter.storehaus.Store -import org.apache.commons.lang.StringUtils._ -import com.twitter.util.Future +import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, HBaseConfiguration} import com.twitter.bijection.hbase.HBaseBijections._ import com.twitter.bijection.Conversion._ +import com.twitter.bijection.Injection +import com.twitter.util.Future +import scala.Some /** - * @author MansurAshraf + * @author Mansur Ashraf * @since 9/8/13 */ -object HBaseStore { - def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseStore = { - val store = new HBaseStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) - store.createTableIfRequired() - store +trait HBaseStore { + + protected val quorumNames: String + protected val createTable: Boolean + protected val table: String + protected val columnFamily: String + protected val column: String + protected val pool: HTablePool + + def getHBaseAdmin: HBaseAdmin = { + val conf = new Configuration() + conf.set("hbase.zookeeper.quorum", quorumNames) + val hbaseConf = HBaseConfiguration.create(conf) + new HBaseAdmin(hbaseConf) } -} -class HBaseStore(val quorumNames: String, val table: String, val columnFamily: String, val column: String, val createTable: Boolean, val pool: HTablePool) extends Store[Array[Byte], Array[Byte]] with HBaseStoreConfig { + def createTableIfRequired() { + val hbaseAdmin = getHBaseAdmin + if (createTable && !hbaseAdmin.tableExists(table)) { + val tableDescriptor = new HTableDescriptor(table) + tableDescriptor.addFamily(new HColumnDescriptor(columnFamily)) + hbaseAdmin.createTable(tableDescriptor) + } + } - require(isNotEmpty(quorumNames), "Zookeeper quorums are required") - require(isNotEmpty(columnFamily), "column family is required") - require(isNotEmpty(column), "column is required") + def validateConfiguration() { + import org.apache.commons.lang.StringUtils.isNotEmpty + require(isNotEmpty(quorumNames), "Zookeeper quorums are required") + require(isNotEmpty(columnFamily), "column family is required") + require(isNotEmpty(column), "column is required") + } - /** get a single key from the store. - * Prefer multiGet if you are getting more than one key at a time - */ - override def get(k: Array[Byte]): Future[Option[Array[Byte]]] = Future[Option[Array[Byte]]] { + def getValue[K, V](key: K)(implicit keyInj: Injection[K, Array[Byte]], valueInj: Injection[V, Array[Byte]]): Future[Option[V]] = Future { val tbl = pool.getTable(table) - val g = createGetRequest(k) + val g = new Get(keyInj(key)) + g.addColumn(columnFamily.as[StringBytes], column.as[StringBytes]) + val result = tbl.get(g) - extractValue(result) + val value = result.getValue(columnFamily.as[StringBytes], column.as[StringBytes]) + Option(value).map(v => valueInj.invert(v).get) } - - /** - * replace a value - * Delete is the same as put((k,None)) - */ - override def put(kv: (Array[Byte], Option[Array[Byte]])): Future[Unit] = { + def putValue[K, V](kv: (K, Option[V]))(implicit keyInj: Injection[K, Array[Byte]], valueInj: Injection[V, Array[Byte]]): Future[Unit] = { kv match { case (k, Some(v)) => Future { - val p = new Put(k) - p.add(columnFamily.as[StringBytes], column.as[StringBytes], v) - val tbl = pool.getTable(table) - tbl.put(p) - } + val p = new Put(keyInj(k)) + p.add(columnFamily.as[StringBytes], column.as[StringBytes], valueInj(v)) + val tbl = pool.getTable(table) + tbl.put(p) + } case (k, None) => Future { - val delete = new Delete(k) + val delete = new Delete(keyInj(k)) val tbl = pool.getTable(table) tbl.delete(delete) } } - - } - - /** Close this store and release any resources. - * It is undefined what happens on get/multiGet after close - */ - override def close { - pool.close() } - - def extractValue(result: Result): Option[Array[Byte]] = { - val value = result.getValue(columnFamily.as[StringBytes], column.as[StringBytes]) - Option(value) - } - } - diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStoreConfig.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStoreConfig.scala deleted file mode 100644 index d422eea7..00000000 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStoreConfig.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2013 Twitter inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.twitter.storehaus.hbase - -import org.apache.hadoop.hbase.client.{Get, HBaseAdmin} -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, HBaseConfiguration} -import com.twitter.bijection.hbase.HBaseBijections._ -import com.twitter.bijection.Conversion._ - -/** - * @author MansurAshraf - * @since 9/8/13 - */ -trait HBaseStoreConfig { - - val quorumNames: String - val createTable: Boolean - val table: String - val columnFamily: String - val column:String - - def getHBaseAdmin: HBaseAdmin = { - val conf = new Configuration() - conf.set("hbase.zookeeper.quorum", quorumNames) - val hbaseConf = HBaseConfiguration.create(conf) - new HBaseAdmin(hbaseConf) - } - - def createTableIfRequired() { - val hbaseAdmin = getHBaseAdmin - if (createTable && !hbaseAdmin.tableExists(table)) { - val tableDescriptor = new HTableDescriptor(table) - tableDescriptor.addFamily(new HColumnDescriptor(columnFamily)) - hbaseAdmin.createTable(tableDescriptor) - } - } - - - def createGetRequest(k: Array[Byte]): Get = { - val g = new Get(k) - g.addColumn(columnFamily.as[StringBytes], column.as[StringBytes]) - g - } - -} diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala index c67f56b2..3e9d2c79 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala @@ -16,14 +16,10 @@ package com.twitter.storehaus.hbase -import org.apache.commons.lang.StringUtils._ import org.apache.hadoop.hbase.client._ import com.twitter.storehaus.Store import com.twitter.util.Future -import com.twitter.bijection.Conversion._ -import com.twitter.bijection.hbase.HBaseBijections._ -import com.twitter.bijection.Bijection -import scala.Some +import com.twitter.bijection.Injection._ /** * @author MansurAshraf @@ -32,49 +28,35 @@ import scala.Some object HBaseStringStore { def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseStringStore = { val stringStore = new HBaseStringStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) + stringStore.validateConfiguration() stringStore.createTableIfRequired() stringStore } } -class HBaseStringStore(val quorumNames: String, val table: String, val columnFamily: String, val column: String, val createTable: Boolean, val pool: HTablePool) extends Store[String, String] with HBaseStoreConfig { - - require(isNotEmpty(quorumNames), "Zookeeper quorums are required") - require(isNotEmpty(columnFamily), "column family is required") - +class HBaseStringStore(protected val quorumNames: String, + protected val table: String, + protected val columnFamily: String, + protected val column: String, val createTable: Boolean, + protected val pool: HTablePool) extends Store[String, String] with HBaseStore { /** get a single key from the store. * Prefer multiGet if you are getting more than one key at a time */ - override def get(k: String): Future[Option[String]] = Future[Option[String]] { - val tbl = pool.getTable(table) - val g = createGetRequest(k) - val result = tbl.get(g) - extractValue(result) + override def get(k: String): Future[Option[String]] = { + import com.twitter.bijection.hbase.HBaseBijections._ + implicit val stringInj = fromBijectionRep[String, StringBytes] + get(k) } - /** * replace a value * Delete is the same as put((k,None)) */ override def put(kv: (String, Option[String])): Future[Unit] = { - kv match { - case (k, Some(v)) => { - Future { - val p = new Put(k.as[StringBytes]) - p.add(columnFamily.as[StringBytes], column.as[StringBytes], v.as[StringBytes]) - val tbl = pool.getTable(table) - tbl.put(p) - } - } - case (k, None) => Future { - val delete = new Delete(k.as[StringBytes]) - val tbl = pool.getTable(table) - tbl.delete(delete) - } - } - + import com.twitter.bijection.hbase.HBaseBijections._ + implicit val stringInj = fromBijectionRep[String, StringBytes] + putValue(kv) } /** Close this store and release any resources. @@ -83,16 +65,4 @@ class HBaseStringStore(val quorumNames: String, val table: String, val columnFam override def close { pool.close() } - - def createGetRequest(k: String): Get = { - val g = new Get(k.as[StringBytes]) - g.addColumn(columnFamily.as[StringBytes], column.as[StringBytes]) - g - } - - def extractValue(result: Result): Option[String] = { - val value = result.getValue(columnFamily.as[StringBytes], column.as[StringBytes]) - Option(value) map (v => Bijection.invert[String, StringBytes](v.asInstanceOf[StringBytes])) - } - } From fee87345da7a9aa632975dc23d09914221bdcc38 Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Mon, 9 Sep 2013 16:24:35 -0500 Subject: [PATCH 05/10] changed quorum to Seq --- .../com/twitter/storehaus/hbase/HBaseByteArrayStore.scala | 4 ++-- .../scala/com/twitter/storehaus/hbase/HBaseLongStore.scala | 4 ++-- .../main/scala/com/twitter/storehaus/hbase/HBaseStore.scala | 6 +++--- .../com/twitter/storehaus/hbase/HBaseStringStore.scala | 6 +++--- .../com/twitter/storehaus/hbase/DefaultHBaseConfig.scala | 2 +- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala index 9ee3acfc..86c45488 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala @@ -25,7 +25,7 @@ import com.twitter.util.Future * @since 9/8/13 */ object HBaseByteArrayStore { - def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseByteArrayStore = { + def apply(quorumNames: Seq[String], table: String, columnFamily: String, column: String, createTable: Boolean): HBaseByteArrayStore = { val store = new HBaseByteArrayStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) store.validateConfiguration() store.createTableIfRequired() @@ -33,7 +33,7 @@ object HBaseByteArrayStore { } } -class HBaseByteArrayStore(val quorumNames: String, +class HBaseByteArrayStore(val quorumNames: Seq[String], val table: String, val columnFamily: String, val column: String, diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala index fa60fc3a..5968c4ba 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala @@ -26,7 +26,7 @@ import com.twitter.bijection.Injection._ * @since 9/8/13 */ object HBaseLongStore { - def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseLongStore = { + def apply(quorumNames: Seq[String], table: String, columnFamily: String, column: String, createTable: Boolean): HBaseLongStore = { val store = new HBaseLongStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) store.validateConfiguration() store.createTableIfRequired() @@ -34,7 +34,7 @@ object HBaseLongStore { } } -class HBaseLongStore(protected val quorumNames: String, +class HBaseLongStore(protected val quorumNames: Seq[String], protected val table: String, protected val columnFamily: String, protected val column: String, diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala index 587601ad..d87d41bf 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala @@ -31,7 +31,7 @@ import scala.Some */ trait HBaseStore { - protected val quorumNames: String + protected val quorumNames: Seq[String] protected val createTable: Boolean protected val table: String protected val columnFamily: String @@ -40,7 +40,7 @@ trait HBaseStore { def getHBaseAdmin: HBaseAdmin = { val conf = new Configuration() - conf.set("hbase.zookeeper.quorum", quorumNames) + conf.set("hbase.zookeeper.quorum", quorumNames.mkString(",")) val hbaseConf = HBaseConfiguration.create(conf) new HBaseAdmin(hbaseConf) } @@ -57,7 +57,7 @@ trait HBaseStore { def validateConfiguration() { import org.apache.commons.lang.StringUtils.isNotEmpty - require(isNotEmpty(quorumNames), "Zookeeper quorums are required") + require(!quorumNames.isEmpty, "Zookeeper quorums are required") require(isNotEmpty(columnFamily), "column family is required") require(isNotEmpty(column), "column is required") } diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala index 3e9d2c79..1bc9f9b9 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala @@ -26,7 +26,7 @@ import com.twitter.bijection.Injection._ * @since 9/7/13 */ object HBaseStringStore { - def apply(quorumNames: String, table: String, columnFamily: String, column: String, createTable: Boolean): HBaseStringStore = { + def apply(quorumNames: Seq[String], table: String, columnFamily: String, column: String, createTable: Boolean): HBaseStringStore = { val stringStore = new HBaseStringStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) stringStore.validateConfiguration() stringStore.createTableIfRequired() @@ -34,7 +34,7 @@ object HBaseStringStore { } } -class HBaseStringStore(protected val quorumNames: String, +class HBaseStringStore(protected val quorumNames: Seq[String], protected val table: String, protected val columnFamily: String, protected val column: String, val createTable: Boolean, @@ -46,7 +46,7 @@ class HBaseStringStore(protected val quorumNames: String, override def get(k: String): Future[Option[String]] = { import com.twitter.bijection.hbase.HBaseBijections._ implicit val stringInj = fromBijectionRep[String, StringBytes] - get(k) + getValue[String,String](k) } /** diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala index 775a3aed..9b98906c 100644 --- a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala @@ -21,7 +21,7 @@ package com.twitter.storehaus.hbase * @since 9/8/13 */ trait DefaultHBaseConfig { - val quorumNames = "localhost:2181" + val quorumNames = Seq("localhost:2181") val table = "summing_bird" val columnFamily = "sb" val column = "aggregate" From 55791ed3fe5237f092f237bd75799d4484023408 Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Mon, 9 Sep 2013 23:28:08 -0500 Subject: [PATCH 06/10] added support for HBaseMinicluster for testing --- .travis.yml | 3 +- project/Build.scala | 21 ++++++++++-- .../storehaus/hbase/HBaseByteArrayStore.scala | 30 ++++++++++++----- .../storehaus/hbase/HBaseLongStore.scala | 22 ++++++++++--- .../twitter/storehaus/hbase/HBaseStore.scala | 16 ++++++---- .../storehaus/hbase/HBaseStringStore.scala | 32 +++++++++++++------ ...Config.scala => DefaultHBaseCluster.scala} | 15 ++++++++- .../hbase/HBaseLongStoreProperties.scala | 7 ++-- .../hbase/HBaseStringStoreProperties.scala | 13 +++----- 9 files changed, 115 insertions(+), 44 deletions(-) rename storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/{DefaultHBaseConfig.scala => DefaultHBaseCluster.scala} (64%) diff --git a/.travis.yml b/.travis.yml index befbde46..9c859b5f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,4 +8,5 @@ before_script: - mysql -u root -e "grant all on storehaus_test.* to 'storehaususer'@'localhost';" services: - redis-server - - memcache \ No newline at end of file + - memcache +script: umask 0022 && sbt ++$TRAVIS_SCALA_VERSION test \ No newline at end of file diff --git a/project/Build.scala b/project/Build.scala index dce88423..3a609766 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2013 Twitter inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package storehaus import sbt._ @@ -165,8 +181,9 @@ object StorehausBuild extends Build { "com.twitter" %% "algebird-core" % algebirdVersion, "com.twitter" %% "bijection-core" % bijectionVersion, "com.twitter" %% "bijection-hbase" % bijectionVersion , - "org.apache.hbase" % "hbase" % "0.94.4" % "provided->default", - "org.apache.hadoop" % "hadoop-core" % "1.0.4" % "provided->default" + "org.apache.hbase" % "hbase" % "0.94.4" % "provided->default" classifier "tests" classifier "", + "org.apache.hadoop" % "hadoop-core" % "1.0.4" % "provided->default", + "org.apache.hadoop" % "hadoop-test" % "1.0.4" % "test" ), parallelExecution in Test := false ).dependsOn(storehausAlgebra % "test->test;compile->compile") diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala index 86c45488..ae53b29c 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala @@ -19,26 +19,40 @@ package com.twitter.storehaus.hbase import org.apache.hadoop.hbase.client.HTablePool import com.twitter.storehaus.Store import com.twitter.util.Future +import org.apache.hadoop.conf.Configuration /** * @author MansurAshraf * @since 9/8/13 */ object HBaseByteArrayStore { - def apply(quorumNames: Seq[String], table: String, columnFamily: String, column: String, createTable: Boolean): HBaseByteArrayStore = { - val store = new HBaseByteArrayStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean, + pool: HTablePool, + conf: Configuration): HBaseByteArrayStore = { + val store = new HBaseByteArrayStore(quorumNames, table, columnFamily, column, createTable,pool,conf) store.validateConfiguration() store.createTableIfRequired() store } + + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean): HBaseByteArrayStore = apply(quorumNames,table,columnFamily,column,createTable,new HTablePool(), new Configuration()) } -class HBaseByteArrayStore(val quorumNames: Seq[String], - val table: String, - val columnFamily: String, - val column: String, - val createTable: Boolean, - val pool: HTablePool) extends Store[Array[Byte], Array[Byte]] with HBaseStore { +class HBaseByteArrayStore(protected val quorumNames: Seq[String], + protected val table: String, + protected val columnFamily: String, + protected val column: String, + protected val createTable: Boolean, + protected val pool: HTablePool, + protected val conf: Configuration) extends Store[Array[Byte], Array[Byte]] with HBaseStore { /** get a single key from the store. * Prefer multiGet if you are getting more than one key at a time diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala index 5968c4ba..8d3cfc15 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala @@ -20,18 +20,31 @@ import org.apache.hadoop.hbase.client.HTablePool import com.twitter.storehaus.Store import com.twitter.util.Future import com.twitter.bijection.Injection._ +import org.apache.hadoop.conf.Configuration /** * @author Mansur Ashraf * @since 9/8/13 */ object HBaseLongStore { - def apply(quorumNames: Seq[String], table: String, columnFamily: String, column: String, createTable: Boolean): HBaseLongStore = { - val store = new HBaseLongStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean, + pool: HTablePool, + conf: Configuration): HBaseLongStore = { + val store = new HBaseLongStore(quorumNames, table, columnFamily, column, createTable, pool, conf) store.validateConfiguration() store.createTableIfRequired() store } + + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean): HBaseLongStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration()) } class HBaseLongStore(protected val quorumNames: Seq[String], @@ -39,7 +52,8 @@ class HBaseLongStore(protected val quorumNames: Seq[String], protected val columnFamily: String, protected val column: String, protected val createTable: Boolean, - protected val pool: HTablePool) extends Store[String, Long] with HBaseStore { + protected val pool: HTablePool, + protected val conf: Configuration) extends Store[String, Long] with HBaseStore { /** get a single key from the store. * Prefer multiGet if you are getting more than one key at a time @@ -48,7 +62,7 @@ class HBaseLongStore(protected val quorumNames: Seq[String], import com.twitter.bijection.hbase.HBaseBijections._ implicit val stringInj = fromBijectionRep[String, StringBytes] implicit val LongInj = fromBijectionRep[Long, LongBytes] - getValue[String,Long](k) + getValue[String, Long](k) } /** diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala index d87d41bf..f9be24a2 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala @@ -37,10 +37,12 @@ trait HBaseStore { protected val columnFamily: String protected val column: String protected val pool: HTablePool + protected val conf: Configuration def getHBaseAdmin: HBaseAdmin = { - val conf = new Configuration() - conf.set("hbase.zookeeper.quorum", quorumNames.mkString(",")) + if (conf.get("hbase.zookeeper.quorum") == null) { + conf.set("hbase.zookeeper.quorum", quorumNames.mkString(",")) + } val hbaseConf = HBaseConfiguration.create(conf) new HBaseAdmin(hbaseConf) } @@ -75,11 +77,11 @@ trait HBaseStore { def putValue[K, V](kv: (K, Option[V]))(implicit keyInj: Injection[K, Array[Byte]], valueInj: Injection[V, Array[Byte]]): Future[Unit] = { kv match { case (k, Some(v)) => Future { - val p = new Put(keyInj(k)) - p.add(columnFamily.as[StringBytes], column.as[StringBytes], valueInj(v)) - val tbl = pool.getTable(table) - tbl.put(p) - } + val p = new Put(keyInj(k)) + p.add(columnFamily.as[StringBytes], column.as[StringBytes], valueInj(v)) + val tbl = pool.getTable(table) + tbl.put(p) + } case (k, None) => Future { val delete = new Delete(keyInj(k)) val tbl = pool.getTable(table) diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala index 1bc9f9b9..aa813d9d 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala @@ -20,33 +20,47 @@ import org.apache.hadoop.hbase.client._ import com.twitter.storehaus.Store import com.twitter.util.Future import com.twitter.bijection.Injection._ +import org.apache.hadoop.conf.Configuration /** * @author MansurAshraf * @since 9/7/13 */ object HBaseStringStore { - def apply(quorumNames: Seq[String], table: String, columnFamily: String, column: String, createTable: Boolean): HBaseStringStore = { - val stringStore = new HBaseStringStore(quorumNames, table, columnFamily, column, createTable, new HTablePool()) - stringStore.validateConfiguration() - stringStore.createTableIfRequired() - stringStore + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean, + pool: HTablePool, + conf: Configuration): HBaseStringStore = { + val store = new HBaseStringStore(quorumNames, table, columnFamily, column, createTable, pool, conf) + store.validateConfiguration() + store.createTableIfRequired() + store } + + def apply(quorumNames: Seq[String], + table: String, + columnFamily: String, + column: String, + createTable: Boolean): HBaseStringStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration()) } class HBaseStringStore(protected val quorumNames: Seq[String], protected val table: String, protected val columnFamily: String, - protected val column: String, val createTable: Boolean, - protected val pool: HTablePool) extends Store[String, String] with HBaseStore { - + protected val column: String, + protected val createTable: Boolean, + protected val pool: HTablePool, + protected val conf: Configuration) extends Store[String, String] with HBaseStore { /** get a single key from the store. * Prefer multiGet if you are getting more than one key at a time */ override def get(k: String): Future[Option[String]] = { import com.twitter.bijection.hbase.HBaseBijections._ implicit val stringInj = fromBijectionRep[String, StringBytes] - getValue[String,String](k) + getValue[String, String](k) } /** diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala similarity index 64% rename from storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala rename to storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala index 9b98906c..fc47aa73 100644 --- a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseConfig.scala +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala @@ -16,14 +16,27 @@ package com.twitter.storehaus.hbase +import org.apache.hadoop.hbase.HBaseTestingUtility +import org.apache.hadoop.hbase.client.HTablePool +import com.twitter.storehaus.testing.CloseableCleanup +import java.io.Closeable + /** * @author MansurAshraf * @since 9/8/13 */ -trait DefaultHBaseConfig { +trait DefaultHBaseCluster[C <: Closeable] extends CloseableCleanup[C] { val quorumNames = Seq("localhost:2181") val table = "summing_bird" val columnFamily = "sb" val column = "aggregate" val createTable = true + val testingUtil = new HBaseTestingUtility() + val conf = testingUtil.getConfiguration + val pool = new HTablePool(conf, 1) + + override def cleanup() { + super.cleanup() + testingUtil.shutdownMiniCluster() + } } diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala index eb054366..df597c97 100644 --- a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala @@ -17,7 +17,6 @@ package com.twitter.storehaus.hbase import org.scalacheck.{Gen, Properties} -import com.twitter.storehaus.testing.CloseableCleanup import com.twitter.storehaus.Store import com.twitter.storehaus.testing.generator.NonEmpty import HBaseStringStoreProperties._ @@ -27,8 +26,7 @@ import HBaseStringStoreProperties._ * @since 9/8/13 */ object HBaseLongStoreProperties extends Properties("HBaseLongStore") -with CloseableCleanup[Store[String, Long]] -with DefaultHBaseConfig { +with DefaultHBaseCluster[Store[String, Long]] { def validPairs: Gen[List[(String, Option[Long])]] = NonEmpty.Pairing.alphaStrNumerics[Long](10) @@ -36,7 +34,8 @@ with DefaultHBaseConfig { def storeTest(store: Store[String, Long]) = putStoreTest(store, validPairs) && multiPutStoreTest(store, validPairs) - val closeable = HBaseLongStore(quorumNames, table, columnFamily, column, createTable) + testingUtil.startMiniCluster() + val closeable = HBaseLongStore(quorumNames, table, columnFamily, column, createTable,pool,conf) property("HBaseLongStore test") = storeTest(closeable) diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala index d9b7164e..83abcdb0 100644 --- a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala @@ -22,15 +22,14 @@ import com.twitter.storehaus.{FutureOps, Store} import com.twitter.storehaus.testing.generator.NonEmpty import org.scalacheck.Prop._ import com.twitter.util.Await +import org.apache.hadoop.hbase.HBaseTestingUtility /** * @author MansurAshraf * @since 9/8/13 */ object HBaseStringStoreProperties extends Properties("HBaseStore") -with CloseableCleanup[Store[String, String]] -with DefaultHBaseConfig { - +with DefaultHBaseCluster[Store[String, String]] { def validPairs: Gen[List[(String, Option[String])]] = NonEmpty.Pairing.alphaStrs() @@ -63,9 +62,7 @@ with DefaultHBaseConfig { def storeTest(store: Store[String, String]) = putStoreTest(store, validPairs) && multiPutStoreTest(store, validPairs) - val closeable =HBaseStringStore(quorumNames, table, columnFamily, column, createTable) - - - property("HBaseStore test") = - storeTest(closeable) + testingUtil.startMiniCluster() + val closeable =HBaseStringStore(quorumNames, table, columnFamily, column, createTable,pool,conf) + property("HBaseStore test") =storeTest(closeable) } From 40a99ba9a936de806bd5257f8a0c120958fb5353 Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Tue, 10 Sep 2013 11:31:18 -0500 Subject: [PATCH 07/10] mocking hbase - second try --- project/Build.scala | 6 +-- .../hbase/HBaseLongStoreProperties.scala | 42 ------------------- 2 files changed, 3 insertions(+), 45 deletions(-) delete mode 100644 storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala diff --git a/project/Build.scala b/project/Build.scala index 3a609766..3daa1bda 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -181,9 +181,9 @@ object StorehausBuild extends Build { "com.twitter" %% "algebird-core" % algebirdVersion, "com.twitter" %% "bijection-core" % bijectionVersion, "com.twitter" %% "bijection-hbase" % bijectionVersion , - "org.apache.hbase" % "hbase" % "0.94.4" % "provided->default" classifier "tests" classifier "", - "org.apache.hadoop" % "hadoop-core" % "1.0.4" % "provided->default", - "org.apache.hadoop" % "hadoop-test" % "1.0.4" % "test" + "org.apache.hbase" % "hbase" % "0.94.6" % "provided->default" classifier "tests" classifier "", + "org.apache.hadoop" % "hadoop-core" % "1.2.0" % "provided->default", + "org.apache.hadoop" % "hadoop-test" % "1.2.0" % "test" ), parallelExecution in Test := false ).dependsOn(storehausAlgebra % "test->test;compile->compile") diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala deleted file mode 100644 index df597c97..00000000 --- a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseLongStoreProperties.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2013 Twitter inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.twitter.storehaus.hbase - -import org.scalacheck.{Gen, Properties} -import com.twitter.storehaus.Store -import com.twitter.storehaus.testing.generator.NonEmpty -import HBaseStringStoreProperties._ - -/** - * @author MansurAshraf - * @since 9/8/13 - */ -object HBaseLongStoreProperties extends Properties("HBaseLongStore") -with DefaultHBaseCluster[Store[String, Long]] { - - def validPairs: Gen[List[(String, Option[Long])]] = - NonEmpty.Pairing.alphaStrNumerics[Long](10) - - def storeTest(store: Store[String, Long]) = - putStoreTest(store, validPairs) && multiPutStoreTest(store, validPairs) - - testingUtil.startMiniCluster() - val closeable = HBaseLongStore(quorumNames, table, columnFamily, column, createTable,pool,conf) - - property("HBaseLongStore test") = storeTest(closeable) - -} From 1677a9a810a9a436290c8f88a777d464aadb1bfe Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Tue, 10 Sep 2013 11:43:19 -0500 Subject: [PATCH 08/10] removed shutdown hook at it is causing zookeeper to hang --- .../com/twitter/storehaus/hbase/DefaultHBaseCluster.scala | 3 ++- .../twitter/storehaus/hbase/HBaseStringStoreProperties.scala | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala index fc47aa73..d15b1de5 100644 --- a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/DefaultHBaseCluster.scala @@ -37,6 +37,7 @@ trait DefaultHBaseCluster[C <: Closeable] extends CloseableCleanup[C] { override def cleanup() { super.cleanup() - testingUtil.shutdownMiniCluster() + /* testingUtil.shutdownMiniZKCluster() + testingUtil.shutdownMiniCluster()*/ } } diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala index 83abcdb0..b0382ec5 100644 --- a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala @@ -65,4 +65,5 @@ with DefaultHBaseCluster[Store[String, String]] { testingUtil.startMiniCluster() val closeable =HBaseStringStore(quorumNames, table, columnFamily, column, createTable,pool,conf) property("HBaseStore test") =storeTest(closeable) + } From ecc5f3b8bb3da5d876951189b3d5be87be31011e Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Tue, 10 Sep 2013 20:09:42 -0500 Subject: [PATCH 09/10] switched to FuturePool for blocking operations --- .../twitter/storehaus/hbase/HBaseByteArrayStore.scala | 10 ++++++---- .../com/twitter/storehaus/hbase/HBaseLongStore.scala | 10 ++++++---- .../com/twitter/storehaus/hbase/HBaseStore.scala | 11 +++++++---- .../twitter/storehaus/hbase/HBaseStringStore.scala | 10 ++++++---- .../storehaus/hbase/HBaseStringStoreProperties.scala | 2 +- 5 files changed, 26 insertions(+), 17 deletions(-) diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala index ae53b29c..cca402f0 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseByteArrayStore.scala @@ -32,8 +32,9 @@ object HBaseByteArrayStore { column: String, createTable: Boolean, pool: HTablePool, - conf: Configuration): HBaseByteArrayStore = { - val store = new HBaseByteArrayStore(quorumNames, table, columnFamily, column, createTable,pool,conf) + conf: Configuration, + threads: Int): HBaseByteArrayStore = { + val store = new HBaseByteArrayStore(quorumNames, table, columnFamily, column, createTable, pool, conf, threads) store.validateConfiguration() store.createTableIfRequired() store @@ -43,7 +44,7 @@ object HBaseByteArrayStore { table: String, columnFamily: String, column: String, - createTable: Boolean): HBaseByteArrayStore = apply(quorumNames,table,columnFamily,column,createTable,new HTablePool(), new Configuration()) + createTable: Boolean): HBaseByteArrayStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration(), 4) } class HBaseByteArrayStore(protected val quorumNames: Seq[String], @@ -52,7 +53,8 @@ class HBaseByteArrayStore(protected val quorumNames: Seq[String], protected val column: String, protected val createTable: Boolean, protected val pool: HTablePool, - protected val conf: Configuration) extends Store[Array[Byte], Array[Byte]] with HBaseStore { + protected val conf: Configuration, + protected val threads: Int) extends Store[Array[Byte], Array[Byte]] with HBaseStore { /** get a single key from the store. * Prefer multiGet if you are getting more than one key at a time diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala index 8d3cfc15..230050ee 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseLongStore.scala @@ -33,8 +33,9 @@ object HBaseLongStore { column: String, createTable: Boolean, pool: HTablePool, - conf: Configuration): HBaseLongStore = { - val store = new HBaseLongStore(quorumNames, table, columnFamily, column, createTable, pool, conf) + conf: Configuration, + threads:Int): HBaseLongStore = { + val store = new HBaseLongStore(quorumNames, table, columnFamily, column, createTable, pool, conf,threads) store.validateConfiguration() store.createTableIfRequired() store @@ -44,7 +45,7 @@ object HBaseLongStore { table: String, columnFamily: String, column: String, - createTable: Boolean): HBaseLongStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration()) + createTable: Boolean): HBaseLongStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration(),4) } class HBaseLongStore(protected val quorumNames: Seq[String], @@ -53,7 +54,8 @@ class HBaseLongStore(protected val quorumNames: Seq[String], protected val column: String, protected val createTable: Boolean, protected val pool: HTablePool, - protected val conf: Configuration) extends Store[String, Long] with HBaseStore { + protected val conf: Configuration, + protected val threads:Int) extends Store[String, Long] with HBaseStore { /** get a single key from the store. * Prefer multiGet if you are getting more than one key at a time diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala index f9be24a2..b583c755 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStore.scala @@ -22,8 +22,9 @@ import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, HBaseConfig import com.twitter.bijection.hbase.HBaseBijections._ import com.twitter.bijection.Conversion._ import com.twitter.bijection.Injection -import com.twitter.util.Future +import com.twitter.util.{FuturePool, Future} import scala.Some +import java.util.concurrent.Executors /** * @author Mansur Ashraf @@ -38,6 +39,8 @@ trait HBaseStore { protected val column: String protected val pool: HTablePool protected val conf: Configuration + protected val threads: Int + protected val futurePool = FuturePool(Executors.newFixedThreadPool(threads)) def getHBaseAdmin: HBaseAdmin = { if (conf.get("hbase.zookeeper.quorum") == null) { @@ -64,7 +67,7 @@ trait HBaseStore { require(isNotEmpty(column), "column is required") } - def getValue[K, V](key: K)(implicit keyInj: Injection[K, Array[Byte]], valueInj: Injection[V, Array[Byte]]): Future[Option[V]] = Future { + def getValue[K, V](key: K)(implicit keyInj: Injection[K, Array[Byte]], valueInj: Injection[V, Array[Byte]]): Future[Option[V]] = futurePool { val tbl = pool.getTable(table) val g = new Get(keyInj(key)) g.addColumn(columnFamily.as[StringBytes], column.as[StringBytes]) @@ -76,13 +79,13 @@ trait HBaseStore { def putValue[K, V](kv: (K, Option[V]))(implicit keyInj: Injection[K, Array[Byte]], valueInj: Injection[V, Array[Byte]]): Future[Unit] = { kv match { - case (k, Some(v)) => Future { + case (k, Some(v)) => futurePool { val p = new Put(keyInj(k)) p.add(columnFamily.as[StringBytes], column.as[StringBytes], valueInj(v)) val tbl = pool.getTable(table) tbl.put(p) } - case (k, None) => Future { + case (k, None) => futurePool { val delete = new Delete(keyInj(k)) val tbl = pool.getTable(table) tbl.delete(delete) diff --git a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala index aa813d9d..e153093c 100644 --- a/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala +++ b/storehaus-hbase/src/main/scala/com/twitter/storehaus/hbase/HBaseStringStore.scala @@ -33,8 +33,9 @@ object HBaseStringStore { column: String, createTable: Boolean, pool: HTablePool, - conf: Configuration): HBaseStringStore = { - val store = new HBaseStringStore(quorumNames, table, columnFamily, column, createTable, pool, conf) + conf: Configuration, + threads: Int): HBaseStringStore = { + val store = new HBaseStringStore(quorumNames, table, columnFamily, column, createTable, pool, conf, threads) store.validateConfiguration() store.createTableIfRequired() store @@ -44,7 +45,7 @@ object HBaseStringStore { table: String, columnFamily: String, column: String, - createTable: Boolean): HBaseStringStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration()) + createTable: Boolean): HBaseStringStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration(), 4) } class HBaseStringStore(protected val quorumNames: Seq[String], @@ -53,7 +54,8 @@ class HBaseStringStore(protected val quorumNames: Seq[String], protected val column: String, protected val createTable: Boolean, protected val pool: HTablePool, - protected val conf: Configuration) extends Store[String, String] with HBaseStore { + protected val conf: Configuration, + protected val threads: Int) extends Store[String, String] with HBaseStore { /** get a single key from the store. * Prefer multiGet if you are getting more than one key at a time */ diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala index b0382ec5..ec4ce3f2 100644 --- a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala @@ -63,7 +63,7 @@ with DefaultHBaseCluster[Store[String, String]] { putStoreTest(store, validPairs) && multiPutStoreTest(store, validPairs) testingUtil.startMiniCluster() - val closeable =HBaseStringStore(quorumNames, table, columnFamily, column, createTable,pool,conf) + val closeable =HBaseStringStore(quorumNames, table, columnFamily, column, createTable,pool,conf,4) property("HBaseStore test") =storeTest(closeable) } From 2cbef56e33425856903bf30902c21df8c6362901 Mon Sep 17 00:00:00 2001 From: MansurAshraf Date: Tue, 10 Sep 2013 20:11:54 -0500 Subject: [PATCH 10/10] removed unused imports --- .../twitter/storehaus/hbase/HBaseStringStoreProperties.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala index ec4ce3f2..6506bd95 100644 --- a/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala +++ b/storehaus-hbase/src/test/scala/com/twitter/storehaus/hbase/HBaseStringStoreProperties.scala @@ -17,12 +17,10 @@ package com.twitter.storehaus.hbase import org.scalacheck.{Arbitrary, Gen, Properties} -import com.twitter.storehaus.testing.CloseableCleanup import com.twitter.storehaus.{FutureOps, Store} import com.twitter.storehaus.testing.generator.NonEmpty import org.scalacheck.Prop._ import com.twitter.util.Await -import org.apache.hadoop.hbase.HBaseTestingUtility /** * @author MansurAshraf