Skip to content

Commit

Permalink
Merge pull request #247 from dschobel/bump_finagle_6_22_0
Browse files Browse the repository at this point in the history
bump finagle and util to 6.22.0
  • Loading branch information
rubanm committed Dec 11, 2014
2 parents dae8611 + 463cde4 commit 841b2e7
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 48 deletions.
2 changes: 1 addition & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ object StorehausBuild extends Build {

val algebirdVersion = "0.7.0"
val bijectionVersion = "0.6.3"
val utilVersion = "6.11.0"
val utilVersion = "6.22.0"
val scaldingVersion = "0.11.1"
lazy val storehaus = Project(

Expand Down
2 changes: 1 addition & 1 deletion project/Finagle.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package storehaus
* dependency */
object Finagle {
import sbt._
val LatestVersion = "6.12.2"
val LatestVersion = "6.22.0"
def module(name: String, version: String = LatestVersion) =
StorehausBuild.withCross("com.twitter" %% "finagle-%s".format(name) % version)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.twitter.storehaus.mysql

import com.twitter.finagle.exp.mysql.{ Client, PreparedStatement, Result }
import com.twitter.finagle.exp.mysql.{ Client, Result }
import com.twitter.storehaus.FutureOps
import com.twitter.storehaus.Store
import com.twitter.util.{ Await, Future, Time }
Expand Down Expand Up @@ -88,10 +88,10 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri

// prepared statements to be reused across gets and puts
// TODO: should this be non-blocking? this is part of object construction, so maybe not?
protected val selectStmt = Await.result(client.prepare(SELECT_SQL))
protected val insertStmt = Await.result(client.prepare(INSERT_SQL))
protected val updateStmt = Await.result(client.prepare(UPDATE_SQL))
protected val deleteStmt = Await.result(client.prepare(DELETE_SQL))
protected val selectStmt = client.prepare(SELECT_SQL)
protected val insertStmt = client.prepare(INSERT_SQL)
protected val updateStmt = client.prepare(UPDATE_SQL)
protected val deleteStmt = client.prepare(DELETE_SQL)

protected [mysql] def startTransaction : Future[Unit] = client.query(START_TXN_SQL).unit
protected [mysql] def commitTransaction : Future[Unit] = client.query(COMMIT_TXN_SQL).unit
Expand All @@ -102,10 +102,7 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri
val insertParams = kvs.map { kv =>
List(MySqlStringInjection(kv._1).getBytes, MySqlStringInjection(kv._2).getBytes)
}.toSeq.flatten
client.prepareAndExecute(insertSql, insertParams:_*).map { case (ps, r) =>
// close prepared statement on server
client.closeStatement(ps)
}
client.prepare(insertSql)(insertParams:_*)
}

protected [mysql] def executeMultiUpdate[K1 <: MySqlValue](kvs: Map[K1, MySqlValue]) = {
Expand All @@ -118,37 +115,35 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri
val updateCaseParams = updateParams.map { kv => List(kv._1, kv._2) }.toSeq.flatten
// params for "IN (?, ?, ?)"
val updateInParams = updateParams.map { kv => kv._1 }.toSeq
client.prepareAndExecute(updateSql, (updateCaseParams ++ updateInParams):_*).map { case (ps, r) =>
// close prepared statement on server
client.closeStatement(ps)
}
client.prepare(updateSql)((updateCaseParams ++ updateInParams):_*)
}

override def get(k: MySqlValue): Future[Option[MySqlValue]] = {
// finagle-mysql select() method lets you pass in a mapping function
// to convert resultset into desired output format
// we assume here the mysql client already has the dbname/schema selected
selectStmt.parameters = Array(MySqlStringInjection(k).getBytes)
val mysqlResult: Future[Seq[Option[MySqlValue]]] = client.select(selectStmt) { row =>
row(vCol) match { case None => None; case Some(v) => Some(MySqlValue(v)) }
}
mysqlResult.map { case result =>
result.lift(0).flatten.headOption
}
val mysqlResult =
selectStmt.select(MySqlStringInjection(k).getBytes) { row =>
row(vCol).map { MySqlValue(_)}
}

mysqlResult.map { result => result.lift(0).flatten.headOption }
}

override def multiGet[K1 <: MySqlValue](ks: Set[K1]): Map[K1, Future[Option[MySqlValue]]] = {
if (ks.isEmpty) return Map()
// build preparedstatement based on keyset size
val placeholders = Stream.continually("?").take(ks.size).mkString("(", ",", ")")
val selectSql = MULTI_SELECT_SQL_PREFIX + placeholders
val mysqlResult: Future[(PreparedStatement,Seq[(Option[MySqlValue], Option[MySqlValue])])] =
client.prepareAndSelect(selectSql, ks.map(key => MySqlStringInjection(key).getBytes).toSeq:_* ) { row =>
(row(kCol).map(MySqlValue(_)), row(vCol).map(MySqlValue(_)))
}
FutureOps.liftValues(ks,
mysqlResult.map { case (ps, rows) =>
client.closeStatement(ps)

val params = ks.map(key => MySqlStringInjection(key).getBytes).toSeq
val mysqlResult =
client.prepare(selectSql).select(params:_*) { row =>
(row(kCol).map(MySqlValue(_)), row(vCol).map(MySqlValue(_)))
}
FutureOps.liftValues(
ks,
mysqlResult.map { rows =>
rows.toMap.filterKeys { _ != None }.map { case (optK, optV) => (optK.get, optV) }
},
{ (k: K1) => Future.None }
Expand Down Expand Up @@ -204,10 +199,7 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri
case false =>
val deleteSql = MULTI_DELETE_SQL_PREFIX + Stream.continually("?").take(deleteKeys.size).mkString("(", ",", ")")
val deleteParams = deleteKeys.map { k => MySqlStringInjection(k).getBytes }.toSeq
client.prepareAndExecute(deleteSql, deleteParams:_*).map { case (ps, r) =>
// close prepared statement on server
client.closeStatement(ps)
}
client.prepare(deleteSql)(deleteParams:_*)
}

// sequence the three queries. the inner futures are lazy
Expand All @@ -224,10 +216,6 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri

override def close(t: Time) = {
// close prepared statements before closing the connection
client.closeStatement(selectStmt)
client.closeStatement(insertStmt)
client.closeStatement(updateStmt)
client.closeStatement(deleteStmt)
client.close(t)
}

Expand All @@ -239,18 +227,15 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri
get(k).flatMap { optionV =>
optionV match {
case Some(value) =>
updateStmt.parameters = Array(MySqlStringInjection(v).getBytes, MySqlStringInjection(k).getBytes)
client.execute(updateStmt)
updateStmt(MySqlStringInjection(v).getBytes, MySqlStringInjection(k).getBytes)
case None =>
insertStmt.parameters = Array(MySqlStringInjection(k).getBytes, MySqlStringInjection(v).getBytes)
client.execute(insertStmt)
insertStmt(MySqlStringInjection(k).getBytes, MySqlStringInjection(v).getBytes)
}
}
}

protected def doDelete(k: MySqlValue): Future[Result] = {
deleteStmt.parameters = Array(MySqlStringInjection(k).getBytes)
client.execute(deleteStmt)
deleteStmt(MySqlStringInjection(k).getBytes)
}

// enclose table or column names in backticks, in case they happen to be sql keywords
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.twitter.storehaus.mysql

import java.util.logging.Level

import com.twitter.finagle.exp.Mysql
import com.twitter.finagle.exp.mysql.Client
import com.twitter.storehaus.testing.SelfAggregatingCloseableCleanup
import com.twitter.storehaus.testing.generator.NonEmpty
Expand Down Expand Up @@ -126,13 +127,15 @@ object MySqlStoreProperties extends Properties("MySqlStore")
withStore(multiPutAndMultiGetStoreTest(_, NonEmpty.Pairing.numerics[Short]()), "smallint", "smallint", true)

private def withStore[T](f: MySqlStore => T, kColType: String, vColType: String, multiGet: Boolean = false): T = {
val client = Client("localhost:3306", "storehaususer", "test1234", "storehaus_test", Level.WARNING)
val client = Mysql.client
.withCredentials("storehaususer", "test1234")
.withDatabase("storehaus_test")
.newRichClient("127.0.0.1:3306")
// these should match mysql setup used in .travis.yml

val tableName = "storehaus-mysql-"+kColType+"-"+vColType + ( if (multiGet) { "-multiget" } else { "" } )
val schema = "CREATE TEMPORARY TABLE IF NOT EXISTS `"+tableName+"` (`key` "+kColType+" DEFAULT NULL, `value` "+vColType+" DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;"
val schema = s"CREATE TEMPORARY TABLE IF NOT EXISTS `${tableName}` (`key` ${kColType} DEFAULT NULL, `value` ${vColType} DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;"
Await.result(client.query(schema))

f(newStore(client, tableName))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ package com.twitter.storehaus.mysql

import java.util.logging.Level

import com.twitter.finagle.exp.Mysql
import com.twitter.finagle.exp.mysql.Client
import com.twitter.storehaus.testing.SelfAggregatingCloseableCleanup
import com.twitter.storehaus.testing.generator.NonEmpty
import com.twitter.util.{Await, Future}
import com.twitter.util.Await

import org.scalacheck.Arbitrary
import org.scalacheck.Gen
Expand Down Expand Up @@ -91,7 +92,10 @@ object MySqlLongStoreProperties extends Properties("MySqlLongStore")
withStore(multiMergeStoreTest(_), "text", "bigint", true)

private def withStore[T](f: MySqlLongStore => T, kColType: String, vColType: String, merge: Boolean = false): T = {
val client = Client("localhost:3306", "storehaususer", "test1234", "storehaus_test", Level.WARNING)
val client = Mysql.client
.withCredentials("storehaususer", "test1234")
.withDatabase("storehaus_test")
.newRichClient("127.0.0.1:3306")
// these should match mysql setup used in .travis.yml

val tableName = "storehaus-mysql-long-"+kColType+"-"+vColType + ( if (merge) { "-merge" } else { "" } )
Expand Down

0 comments on commit 841b2e7

Please sign in to comment.