Skip to content

Commit

Permalink
Merge pull request #14 from twitter/develop
Browse files Browse the repository at this point in the history
Merge from twitter/storehaus
  • Loading branch information
rubanm committed Nov 22, 2013
2 parents 4d67327 + 81ecf6b commit ee62bc4
Show file tree
Hide file tree
Showing 8 changed files with 410 additions and 28 deletions.
2 changes: 1 addition & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ object StorehausBuild extends Build {

lazy val storehausMySQL = module("mysql").settings(
libraryDependencies += Finagle.module("mysql")
).dependsOn(storehausCore % "test->test;compile->compile")
).dependsOn(storehausAlgebra % "test->test;compile->compile")

lazy val storehausRedis = module("redis").settings(
libraryDependencies ++= Seq (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2013 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus

import com.twitter.bijection.Injection
import com.twitter.util.{ Future, Time }

import scala.util.{ Failure, Success }

/**
* ReadableStore enrichment for ReadableStore[OuterK, ReadableStore[InnerK, V]]
* on top of a ReadableStore[K, V]
*
* @author Ruban Monu
*/
object PivotedReadableStore {

def fromMap[K, OuterK, InnerK, V](m: Map[K, V])(implicit inj: Injection[(OuterK, InnerK), K]) =
new PivotedReadableStore[K, OuterK, InnerK, V](ReadableStore.fromMap(m))(inj)

def fromReadableStore[K, OuterK, InnerK, V](store: ReadableStore[K, V])(implicit inj: Injection[(OuterK, InnerK), K]) =
new PivotedReadableStore[K, OuterK, InnerK, V](store)(inj)
}

class PivotedReadableStore[K, -OuterK, InnerK, +V](store: ReadableStore[K, V])(implicit inj: Injection[(OuterK, InnerK), K])
extends ReadableStore[OuterK, ReadableStore[InnerK, V]] {

override def get(outerK: OuterK) : Future[Option[ReadableStore[InnerK, V]]] =
Future.value(Some(new ReadableStore[InnerK, V]() {
override def get(innerK: InnerK) = store.get(inj((outerK, innerK)))
}))

override def close(time: Time) = store.close(time)
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2013 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus

import com.twitter.bijection.Injection
import com.twitter.storehaus.testing.SelfAggregatingCloseableCleanup
import com.twitter.util.Await

import org.scalacheck.Properties

import scala.util.Try

object PivotedReadableStoreProperties extends Properties("PivotedReadableStore")
with SelfAggregatingCloseableCleanup[PivotedReadableStore[String, String, Int, String]] {

// (prefix, num) => "prefix/num"
implicit object PivotInjection extends Injection[(String, Int), String] {
def apply(pair: (String, Int)): String = pair._1 + "/" + pair._2.toString
override def invert(s: String) = {
val parts = s.split('/')
Try((parts(0), parts(1).toInt))
}
}

def getStoreTest(store: PivotedReadableStore[String, String, Int, String]) = {
val innerStore1 = Await.result(store.get("prefix1")).get
val innerStore2 = Await.result(store.get("prefix2")).get
(0 until 100).toList.forall { case n =>
Await.result(innerStore1.get(n)).get == "value1" + n.toString
Await.result(innerStore2.get(n)).get == "value2" + n.toString
}
}

def multiGetStoreTest(store: PivotedReadableStore[String, String, Int, String]) = {
val innerStores = store.multiGet(Set("prefix1", "prefix2"))
val innerStore1 = Await.result(innerStores.get("prefix1").get).get
val innerStore2 = Await.result(innerStores.get("prefix2").get).get
(0 until 100).toList.forall { case n =>
Await.result(innerStore1.get(n)).get == "value1" + n.toString
Await.result(innerStore2.get(n)).get == "value2" + n.toString
}
}

property("PivotedReadableStore gets over MapStore") = {
val map1 : Map[String, String] = (0 until 100).toList.map { case n =>
(PivotInjection(("prefix1", n)), "value1" + n.toString)
}.toMap
val map2 : Map[String, String] = (0 until 100).toList.map { case n =>
(PivotInjection(("prefix2", n)), "value2" + n.toString)
}.toMap

val store = PivotedReadableStore.fromMap[String, String, Int, String](map1 ++ map2)

getStoreTest(store) && multiGetStoreTest(store)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2013 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus.mysql

import com.twitter.algebird.Semigroup
import com.twitter.bijection.Injection
import com.twitter.finagle.exp.mysql.Client
import com.twitter.storehaus.algebra.MergeableStore
import com.twitter.storehaus.ConvertedStore
import com.twitter.storehaus.FutureOps
import com.twitter.util.{ Future, Throw }

/**
* @author Ruban Monu
*/

/**
* Mergeable MySQL store that performs merge inside a transaction.
*/
class MergeableMySqlStore[V](underlying: MySqlStore)(implicit inj: Injection[V, MySqlValue],
override val semigroup: Semigroup[V])
extends ConvertedStore[MySqlValue, MySqlValue, MySqlValue, V](underlying)(identity)
with MergeableStore[MySqlValue, V] {

// Merges multiple keys inside a transaction.
// 1. existing keys are fetched using multiGet (SELECT query)
// 2. new keys are added using INSERT query
// 3. existing keys are merged using UPDATE query
// NOTE: merge on a single key also in turn calls this
override def multiMerge[K1 <: MySqlValue](kvs: Map[K1, V]): Map[K1, Future[Option[V]]] = {
val mergeResult : Future[Map[K1, Option[V]]] = underlying.startTransaction.flatMap { u: Unit =>
FutureOps.mapCollect(multiGet(kvs.keySet)).flatMap { result: Map[K1, Option[V]] =>
val existingKeys = result.filter { !_._2.isEmpty }.keySet
val newKeys = result.filter { _._2.isEmpty }.keySet

// handle inserts for new keys
val insertF = newKeys.isEmpty match {
case true => Future.Unit
case false =>
val insertKvs = newKeys.map { k => k -> kvs.get(k).get }
insertKvs.isEmpty match {
case true => Future.Unit
case false => underlying.executeMultiInsert(insertKvs.toMap.mapValues { v => inj(v) })
}
}

// handle update/merge for existing keys
// lazy val realized inside of insertF.flatMap
lazy val updateF = existingKeys.isEmpty match {
case true => Future.Unit
case false =>
val existingKvs = existingKeys.map { k => k -> kvs.get(k).get }
underlying.executeMultiUpdate(existingKvs.map { kv =>
val resV = semigroup.plus(result.get(kv._1).get.get, kv._2)
kv._1 -> inj(resV)
}.toMap)
}

// insert, update and commit or rollback accordingly
insertF.flatMap { f =>
updateF.flatMap { f =>
underlying.commitTransaction.map { f =>
// return values before the merge
result
}
}
.onFailure { case e: Exception =>
underlying.rollbackTransaction.map { f =>
// map values to exception
result.mapValues { v => e }
}
}
}
}
}
FutureOps.liftValues(kvs.keySet, mergeResult)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ object MySqlStore {
* val store = MySqlStore(client, "storehaus-mysql-test", "key", "value")
* }}}
*/
class MySqlStore(client: Client, table: String, kCol: String, vCol: String)
class MySqlStore(protected [mysql] val client: Client, table: String, kCol: String, vCol: String)
extends Store[MySqlValue, MySqlValue] {

protected val SELECT_SQL = "SELECT " + g(vCol) + " FROM " + g(table) + " WHERE " + g(kCol) + "=?"
Expand Down Expand Up @@ -93,10 +93,36 @@ class MySqlStore(client: Client, table: String, kCol: String, vCol: String)
protected val updateStmt = Await.result(client.prepare(UPDATE_SQL))
protected val deleteStmt = Await.result(client.prepare(DELETE_SQL))

protected def startTransaction : Future[Unit] = client.query(START_TXN_SQL).unit
protected def commitTransaction : Future[Unit] = client.query(COMMIT_TXN_SQL).unit
protected def rollbackTransaction : Future[Unit] = client.query(ROLLBACK_TXN_SQL).unit
protected [mysql] def startTransaction : Future[Unit] = client.query(START_TXN_SQL).unit
protected [mysql] def commitTransaction : Future[Unit] = client.query(COMMIT_TXN_SQL).unit
protected [mysql] def rollbackTransaction : Future[Unit] = client.query(ROLLBACK_TXN_SQL).unit

protected [mysql] def executeMultiInsert[K1 <: MySqlValue](kvs: Map[K1, MySqlValue]) = {
val insertSql = MULTI_INSERT_SQL_PREFIX + Stream.continually("(?, ?)").take(kvs.size).mkString(",")
val insertParams = kvs.map { kv =>
List(MySqlStringInjection(kv._1).getBytes, MySqlStringInjection(kv._2).getBytes)
}.toSeq.flatten
client.prepareAndExecute(insertSql, insertParams:_*).map { case (ps, r) =>
// close prepared statement on server
client.closeStatement(ps)
}
}

protected [mysql] def executeMultiUpdate[K1 <: MySqlValue](kvs: Map[K1, MySqlValue]) = {
val updateSql = MULTI_UPDATE_SQL_PREFIX + Stream.continually("WHEN ? THEN ?").take(kvs.size).mkString(" ") +
MULTI_UPDATE_SQL_INFIX + Stream.continually("?").take(kvs.size).mkString("(", ",", ")")
val updateParams = kvs.map { kv =>
(MySqlStringInjection(kv._1).getBytes, MySqlStringInjection(kv._2).getBytes)
}
// params for "WHEN ? THEN ?"
val updateCaseParams = updateParams.map { kv => List(kv._1, kv._2) }.toSeq.flatten
// params for "IN (?, ?, ?)"
val updateInParams = updateParams.map { kv => kv._1 }.toSeq
client.prepareAndExecute(updateSql, (updateCaseParams ++ updateInParams):_*).map { case (ps, r) =>
// close prepared statement on server
client.closeStatement(ps)
}
}

override def get(k: MySqlValue): Future[Option[MySqlValue]] = {
// finagle-mysql select() method lets you pass in a mapping function
Expand Down Expand Up @@ -153,17 +179,10 @@ class MySqlStore(client: Client, table: String, kCol: String, vCol: String)
case false =>
// do not include None values in insert query
val insertKvs = newKeys.map { k => k -> kvs.getOrElse(k, None) }.filter { ! _._2.isEmpty }
.toMap.mapValues { v => v.get }
insertKvs.isEmpty match {
case true => Future.Unit
case false =>
val insertSql = MULTI_INSERT_SQL_PREFIX + Stream.continually("(?, ?)").take(insertKvs.size).mkString(",")
val insertParams = insertKvs.map { kv =>
List(MySqlStringInjection(kv._1).getBytes, MySqlStringInjection(kv._2.get).getBytes)
}.toSeq.flatten
client.prepareAndExecute(insertSql, insertParams:_*).map { case (ps, r) =>
// close prepared statement on server
client.closeStatement(ps)
}
case false => executeMultiInsert(insertKvs)
}
}

Expand All @@ -172,22 +191,10 @@ class MySqlStore(client: Client, table: String, kCol: String, vCol: String)

// do not include None values in update query
val updateKvs = existingKvs.filter { ! _._2.isEmpty }
.toMap.mapValues { v => v.get }
lazy val updateF = updateKvs.isEmpty match {
case true => Future.Unit
case false =>
val updateSql = MULTI_UPDATE_SQL_PREFIX + Stream.continually("WHEN ? THEN ?").take(updateKvs.size).mkString(" ") +
MULTI_UPDATE_SQL_INFIX + Stream.continually("?").take(updateKvs.size).mkString("(", ",", ")")
val updateParams = updateKvs.map { kv =>
(MySqlStringInjection(kv._1).getBytes, MySqlStringInjection(kv._2.get).getBytes)
}
// params for "WHEN ? THEN ?"
val updateCaseParams = updateParams.map { kv => List(kv._1, kv._2) }.toSeq.flatten
// params for "IN (?, ?, ?)"
val updateInParams = updateParams.map { kv => kv._1 }.toSeq
client.prepareAndExecute(updateSql, (updateCaseParams ++ updateInParams):_*).map { case (ps, r) =>
// close prepared statement on server
client.closeStatement(ps)
}
case false => executeMultiUpdate(updateKvs)
}

// deletes
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2013 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus.mysql

import com.twitter.algebird.Semigroup
import com.twitter.bijection.Injection
import com.twitter.finagle.exp.mysql.Client
import com.twitter.storehaus.ConvertedStore
import com.twitter.storehaus.algebra.MergeableStore

import org.jboss.netty.buffer.ChannelBuffer

/**
* @author Ruban Monu
*/

/** Factory for [[com.twitter.storehaus.mysql.MySqlLongStore]] instances. */
object MySqlLongStore {

def apply(underlying: MySqlStore) =
new MySqlLongStore(underlying)(LongMySqlInjection)

def apply(client: Client, table: String, kCol: String, vCol: String) =
new MySqlLongStore(MySqlStore(client, table, kCol, vCol))(LongMySqlInjection)
}
import MySqlLongStore._

/** MySQL store for Long values */
class MySqlLongStore(underlying: MySqlStore)(inj: Injection[Long, MySqlValue])
extends MergeableMySqlStore[Long](underlying)(inj, implicitly[Semigroup[Long]]) {
}

Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ object ValueMapper {
case _ => throw new UnsupportedOperationException(v.getClass.getName + " is currently not supported.")
}
}

def toLong(v: Value): Option[Long] = {
toString(v).map { _.toLong }
}
}

/** Factory for [[com.twitter.storehaus.mysql.MySqlValue]] instances. */
Expand Down Expand Up @@ -134,3 +138,8 @@ object MySqlCbInjection extends Injection[MySqlValue, ChannelBuffer] {
def apply(a: MySqlValue): ChannelBuffer = ValueMapper.toChannelBuffer(a.v).getOrElse(ChannelBuffers.EMPTY_BUFFER)
override def invert(b: ChannelBuffer) = Try(MySqlValue(RawStringValue(b.toString(UTF_8))))
}

object LongMySqlInjection extends Injection[Long, MySqlValue] {
def apply(a: Long): MySqlValue = MySqlValue(a)
override def invert(b: MySqlValue) = Try(ValueMapper.toLong(b.v).get)
}
Loading

0 comments on commit ee62bc4

Please sign in to comment.