Skip to content

Commit

Permalink
Merge pull request #139 from MansurAshraf/develop
Browse files Browse the repository at this point in the history
Added storehaus-hbase
  • Loading branch information
sritchie committed Sep 11, 2013
2 parents b835bdb + 2cbef56 commit 2101928
Show file tree
Hide file tree
Showing 10 changed files with 493 additions and 6 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ before_script:
- mysql -u root -e "grant all on storehaus_test.* to 'storehaususer'@'localhost';"
services:
- redis-server
- memcache
- memcache
script: umask 0022 && sbt ++$TRAVIS_SCALA_VERSION test
33 changes: 31 additions & 2 deletions project/Build.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2013 Twitter inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package storehaus

import sbt._
Expand Down Expand Up @@ -96,7 +112,7 @@ object StorehausBuild extends Build {
.map { s => "com.twitter" % ("storehaus-" + s + "_2.9.3") % "0.5.0" }

val algebirdVersion = "0.2.0"
val bijectionVersion = "0.5.2"
val bijectionVersion = "0.5.3"

lazy val storehaus = Project(
id = "storehaus",
Expand All @@ -113,6 +129,7 @@ object StorehausBuild extends Build {
storehausMemcache,
storehausMySQL,
storehausRedis,
storehausHBase,
storehausTesting
)

Expand All @@ -137,7 +154,7 @@ object StorehausBuild extends Build {
lazy val storehausAlgebra = module("algebra").settings(
libraryDependencies += "com.twitter" %% "algebird-core" % algebirdVersion,
libraryDependencies += "com.twitter" %% "algebird-util" % algebirdVersion,
libraryDependencies += "com.twitter" %% "bijection-algebird" % bijectionVersion
libraryDependencies += "com.twitter" %% "algebird-bijection" % algebirdVersion
).dependsOn(storehausCore % "test->test;compile->compile")

lazy val storehausMemcache = module("memcache").settings(
Expand All @@ -159,6 +176,18 @@ object StorehausBuild extends Build {
parallelExecution in Test := false
).dependsOn(storehausAlgebra % "test->test;compile->compile")

lazy val storehausHBase= module("hbase").settings(
libraryDependencies ++= Seq(
"com.twitter" %% "algebird-core" % algebirdVersion,
"com.twitter" %% "bijection-core" % bijectionVersion,
"com.twitter" %% "bijection-hbase" % bijectionVersion ,
"org.apache.hbase" % "hbase" % "0.94.6" % "provided->default" classifier "tests" classifier "",
"org.apache.hadoop" % "hadoop-core" % "1.2.0" % "provided->default",
"org.apache.hadoop" % "hadoop-test" % "1.2.0" % "test"
),
parallelExecution in Test := false
).dependsOn(storehausAlgebra % "test->test;compile->compile")

val storehausTesting = Project(
id = "storehaus-testing",
base = file("storehaus-testing"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ConvertedMergeableStore[K1, -K2, V1, V2](store: MergeableStore[K1, V1])(kf
(implicit bij: ImplicitBijection[V2, V1])
extends com.twitter.storehaus.ConvertedStore[K1, K2, V1, V2](store)(kfn)(Injection.fromBijection(bij.bijection))
with MergeableStore[K2, V2] {
import com.twitter.bijection.algebird.AlgebirdBijections._
import com.twitter.algebird.bijection.AlgebirdBijections._

override def monoid: Monoid[V2] = store.monoid.as[Monoid[V2]]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.twitter.storehaus.algebra

import com.twitter.algebird.{ MapAlgebra, Monoid, SummingQueue }
import com.twitter.bijection.algebird.AlgebirdBijections._
import com.twitter.algebird.bijection.AlgebirdBijections._
import com.twitter.bijection.Injection
import com.twitter.storehaus._
import com.twitter.util.Await
Expand Down Expand Up @@ -93,7 +93,7 @@ object MergeableStoreProperties extends Properties("MergeableStore") {

property("Converted MergeableStore obeys the mergeable store laws") = {
// We are using a weird monoid on Int here:
import com.twitter.bijection.algebird.AlgebirdBijections._
import com.twitter.algebird.bijection.AlgebirdBijections._
import com.twitter.bijection.Conversion.asMethod
implicit val monoid : Monoid[Int] = implicitly[Monoid[(Short,Short)]].as[Monoid[Int]]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2013 Twitter inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus.hbase

import org.apache.hadoop.hbase.client.HTablePool
import com.twitter.storehaus.Store
import com.twitter.util.Future
import org.apache.hadoop.conf.Configuration

/**
* @author MansurAshraf
* @since 9/8/13
*/
object HBaseByteArrayStore {
def apply(quorumNames: Seq[String],
table: String,
columnFamily: String,
column: String,
createTable: Boolean,
pool: HTablePool,
conf: Configuration,
threads: Int): HBaseByteArrayStore = {
val store = new HBaseByteArrayStore(quorumNames, table, columnFamily, column, createTable, pool, conf, threads)
store.validateConfiguration()
store.createTableIfRequired()
store
}

def apply(quorumNames: Seq[String],
table: String,
columnFamily: String,
column: String,
createTable: Boolean): HBaseByteArrayStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration(), 4)
}

class HBaseByteArrayStore(protected val quorumNames: Seq[String],
protected val table: String,
protected val columnFamily: String,
protected val column: String,
protected val createTable: Boolean,
protected val pool: HTablePool,
protected val conf: Configuration,
protected val threads: Int) extends Store[Array[Byte], Array[Byte]] with HBaseStore {

/** get a single key from the store.
* Prefer multiGet if you are getting more than one key at a time
*/
override def get(k: Array[Byte]): Future[Option[Array[Byte]]] = {
getValue(k)
}

/**
* replace a value
* Delete is the same as put((k,None))
*/
override def put(kv: (Array[Byte], Option[Array[Byte]])): Future[Unit] = {
putValue(kv)
}

/** Close this store and release any resources.
* It is undefined what happens on get/multiGet after close
*/
override def close {
pool.close()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2013 Twitter inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus.hbase

import org.apache.hadoop.hbase.client.HTablePool
import com.twitter.storehaus.Store
import com.twitter.util.Future
import com.twitter.bijection.Injection._
import org.apache.hadoop.conf.Configuration

/**
* @author Mansur Ashraf
* @since 9/8/13
*/
object HBaseLongStore {
def apply(quorumNames: Seq[String],
table: String,
columnFamily: String,
column: String,
createTable: Boolean,
pool: HTablePool,
conf: Configuration,
threads:Int): HBaseLongStore = {
val store = new HBaseLongStore(quorumNames, table, columnFamily, column, createTable, pool, conf,threads)
store.validateConfiguration()
store.createTableIfRequired()
store
}

def apply(quorumNames: Seq[String],
table: String,
columnFamily: String,
column: String,
createTable: Boolean): HBaseLongStore = apply(quorumNames, table, columnFamily, column, createTable, new HTablePool(), new Configuration(),4)
}

class HBaseLongStore(protected val quorumNames: Seq[String],
protected val table: String,
protected val columnFamily: String,
protected val column: String,
protected val createTable: Boolean,
protected val pool: HTablePool,
protected val conf: Configuration,
protected val threads:Int) extends Store[String, Long] with HBaseStore {

/** get a single key from the store.
* Prefer multiGet if you are getting more than one key at a time
*/
override def get(k: String): Future[Option[Long]] = {
import com.twitter.bijection.hbase.HBaseBijections._
implicit val stringInj = fromBijectionRep[String, StringBytes]
implicit val LongInj = fromBijectionRep[Long, LongBytes]
getValue[String, Long](k)
}

/**
* replace a value
* Delete is the same as put((k,None))
*/
override def put(kv: (String, Option[Long])): Future[Unit] = {
import com.twitter.bijection.hbase.HBaseBijections._
implicit val stringInj = fromBijectionRep[String, StringBytes]
implicit val LongInj = fromBijectionRep[Long, LongBytes]
putValue(kv)
}

/** Close this store and release any resources.
* It is undefined what happens on get/multiGet after close
*/
override def close {
pool.close()
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2013 Twitter inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus.hbase

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, HBaseConfiguration}
import com.twitter.bijection.hbase.HBaseBijections._
import com.twitter.bijection.Conversion._
import com.twitter.bijection.Injection
import com.twitter.util.{FuturePool, Future}
import scala.Some
import java.util.concurrent.Executors

/**
* @author Mansur Ashraf
* @since 9/8/13
*/
trait HBaseStore {

protected val quorumNames: Seq[String]
protected val createTable: Boolean
protected val table: String
protected val columnFamily: String
protected val column: String
protected val pool: HTablePool
protected val conf: Configuration
protected val threads: Int
protected val futurePool = FuturePool(Executors.newFixedThreadPool(threads))

def getHBaseAdmin: HBaseAdmin = {
if (conf.get("hbase.zookeeper.quorum") == null) {
conf.set("hbase.zookeeper.quorum", quorumNames.mkString(","))
}
val hbaseConf = HBaseConfiguration.create(conf)
new HBaseAdmin(hbaseConf)
}

def createTableIfRequired() {
val hbaseAdmin = getHBaseAdmin
if (createTable && !hbaseAdmin.tableExists(table)) {
val tableDescriptor = new HTableDescriptor(table)
tableDescriptor.addFamily(new HColumnDescriptor(columnFamily))
hbaseAdmin.createTable(tableDescriptor)
}
}

def validateConfiguration() {
import org.apache.commons.lang.StringUtils.isNotEmpty

require(!quorumNames.isEmpty, "Zookeeper quorums are required")
require(isNotEmpty(columnFamily), "column family is required")
require(isNotEmpty(column), "column is required")
}

def getValue[K, V](key: K)(implicit keyInj: Injection[K, Array[Byte]], valueInj: Injection[V, Array[Byte]]): Future[Option[V]] = futurePool {
val tbl = pool.getTable(table)
val g = new Get(keyInj(key))
g.addColumn(columnFamily.as[StringBytes], column.as[StringBytes])

val result = tbl.get(g)
val value = result.getValue(columnFamily.as[StringBytes], column.as[StringBytes])
Option(value).map(v => valueInj.invert(v).get)
}

def putValue[K, V](kv: (K, Option[V]))(implicit keyInj: Injection[K, Array[Byte]], valueInj: Injection[V, Array[Byte]]): Future[Unit] = {
kv match {
case (k, Some(v)) => futurePool {
val p = new Put(keyInj(k))
p.add(columnFamily.as[StringBytes], column.as[StringBytes], valueInj(v))
val tbl = pool.getTable(table)
tbl.put(p)
}
case (k, None) => futurePool {
val delete = new Delete(keyInj(k))
val tbl = pool.getTable(table)
tbl.delete(delete)
}
}
}
}
Loading

0 comments on commit 2101928

Please sign in to comment.