diff --git a/project/Build.scala b/project/Build.scala index 6bcbbebc..df9642c2 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -121,7 +121,7 @@ object StorehausBuild extends Build { val algebirdVersion = "0.7.0" val bijectionVersion = "0.6.3" - val utilVersion = "6.11.0" + val utilVersion = "6.22.0" val scaldingVersion = "0.11.1" lazy val storehaus = Project( diff --git a/project/Finagle.scala b/project/Finagle.scala index bd9454cb..753e4305 100644 --- a/project/Finagle.scala +++ b/project/Finagle.scala @@ -5,7 +5,7 @@ package storehaus * dependency */ object Finagle { import sbt._ - val LatestVersion = "6.12.2" + val LatestVersion = "6.22.0" def module(name: String, version: String = LatestVersion) = StorehausBuild.withCross("com.twitter" %% "finagle-%s".format(name) % version) } diff --git a/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala b/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala index d32d992d..93c46af9 100644 --- a/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala +++ b/storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySQLStore.scala @@ -16,7 +16,7 @@ package com.twitter.storehaus.mysql -import com.twitter.finagle.exp.mysql.{ Client, PreparedStatement, Result } +import com.twitter.finagle.exp.mysql.{ Client, Result } import com.twitter.storehaus.FutureOps import com.twitter.storehaus.Store import com.twitter.util.{ Await, Future, Time } @@ -88,10 +88,10 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri // prepared statements to be reused across gets and puts // TODO: should this be non-blocking? this is part of object construction, so maybe not? - protected val selectStmt = Await.result(client.prepare(SELECT_SQL)) - protected val insertStmt = Await.result(client.prepare(INSERT_SQL)) - protected val updateStmt = Await.result(client.prepare(UPDATE_SQL)) - protected val deleteStmt = Await.result(client.prepare(DELETE_SQL)) + protected val selectStmt = client.prepare(SELECT_SQL) + protected val insertStmt = client.prepare(INSERT_SQL) + protected val updateStmt = client.prepare(UPDATE_SQL) + protected val deleteStmt = client.prepare(DELETE_SQL) protected [mysql] def startTransaction : Future[Unit] = client.query(START_TXN_SQL).unit protected [mysql] def commitTransaction : Future[Unit] = client.query(COMMIT_TXN_SQL).unit @@ -102,10 +102,7 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri val insertParams = kvs.map { kv => List(MySqlStringInjection(kv._1).getBytes, MySqlStringInjection(kv._2).getBytes) }.toSeq.flatten - client.prepareAndExecute(insertSql, insertParams:_*).map { case (ps, r) => - // close prepared statement on server - client.closeStatement(ps) - } + client.prepare(insertSql)(insertParams:_*) } protected [mysql] def executeMultiUpdate[K1 <: MySqlValue](kvs: Map[K1, MySqlValue]) = { @@ -118,23 +115,19 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri val updateCaseParams = updateParams.map { kv => List(kv._1, kv._2) }.toSeq.flatten // params for "IN (?, ?, ?)" val updateInParams = updateParams.map { kv => kv._1 }.toSeq - client.prepareAndExecute(updateSql, (updateCaseParams ++ updateInParams):_*).map { case (ps, r) => - // close prepared statement on server - client.closeStatement(ps) - } + client.prepare(updateSql)((updateCaseParams ++ updateInParams):_*) } override def get(k: MySqlValue): Future[Option[MySqlValue]] = { // finagle-mysql select() method lets you pass in a mapping function // to convert resultset into desired output format // we assume here the mysql client already has the dbname/schema selected - selectStmt.parameters = Array(MySqlStringInjection(k).getBytes) - val mysqlResult: Future[Seq[Option[MySqlValue]]] = client.select(selectStmt) { row => - row(vCol) match { case None => None; case Some(v) => Some(MySqlValue(v)) } - } - mysqlResult.map { case result => - result.lift(0).flatten.headOption - } + val mysqlResult = + selectStmt.select(MySqlStringInjection(k).getBytes) { row => + row(vCol).map { MySqlValue(_)} + } + + mysqlResult.map { result => result.lift(0).flatten.headOption } } override def multiGet[K1 <: MySqlValue](ks: Set[K1]): Map[K1, Future[Option[MySqlValue]]] = { @@ -142,13 +135,15 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri // build preparedstatement based on keyset size val placeholders = Stream.continually("?").take(ks.size).mkString("(", ",", ")") val selectSql = MULTI_SELECT_SQL_PREFIX + placeholders - val mysqlResult: Future[(PreparedStatement,Seq[(Option[MySqlValue], Option[MySqlValue])])] = - client.prepareAndSelect(selectSql, ks.map(key => MySqlStringInjection(key).getBytes).toSeq:_* ) { row => - (row(kCol).map(MySqlValue(_)), row(vCol).map(MySqlValue(_))) - } - FutureOps.liftValues(ks, - mysqlResult.map { case (ps, rows) => - client.closeStatement(ps) + + val params = ks.map(key => MySqlStringInjection(key).getBytes).toSeq + val mysqlResult = + client.prepare(selectSql).select(params:_*) { row => + (row(kCol).map(MySqlValue(_)), row(vCol).map(MySqlValue(_))) + } + FutureOps.liftValues( + ks, + mysqlResult.map { rows => rows.toMap.filterKeys { _ != None }.map { case (optK, optV) => (optK.get, optV) } }, { (k: K1) => Future.None } @@ -204,10 +199,7 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri case false => val deleteSql = MULTI_DELETE_SQL_PREFIX + Stream.continually("?").take(deleteKeys.size).mkString("(", ",", ")") val deleteParams = deleteKeys.map { k => MySqlStringInjection(k).getBytes }.toSeq - client.prepareAndExecute(deleteSql, deleteParams:_*).map { case (ps, r) => - // close prepared statement on server - client.closeStatement(ps) - } + client.prepare(deleteSql)(deleteParams:_*) } // sequence the three queries. the inner futures are lazy @@ -224,10 +216,6 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri override def close(t: Time) = { // close prepared statements before closing the connection - client.closeStatement(selectStmt) - client.closeStatement(insertStmt) - client.closeStatement(updateStmt) - client.closeStatement(deleteStmt) client.close(t) } @@ -239,18 +227,15 @@ class MySqlStore(protected [mysql] val client: Client, table: String, kCol: Stri get(k).flatMap { optionV => optionV match { case Some(value) => - updateStmt.parameters = Array(MySqlStringInjection(v).getBytes, MySqlStringInjection(k).getBytes) - client.execute(updateStmt) + updateStmt(MySqlStringInjection(v).getBytes, MySqlStringInjection(k).getBytes) case None => - insertStmt.parameters = Array(MySqlStringInjection(k).getBytes, MySqlStringInjection(v).getBytes) - client.execute(insertStmt) + insertStmt(MySqlStringInjection(k).getBytes, MySqlStringInjection(v).getBytes) } } } protected def doDelete(k: MySqlValue): Future[Result] = { - deleteStmt.parameters = Array(MySqlStringInjection(k).getBytes) - client.execute(deleteStmt) + deleteStmt(MySqlStringInjection(k).getBytes) } // enclose table or column names in backticks, in case they happen to be sql keywords diff --git a/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala b/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala index 89e1b484..73469908 100644 --- a/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala +++ b/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySQLStoreProperties.scala @@ -18,6 +18,7 @@ package com.twitter.storehaus.mysql import java.util.logging.Level +import com.twitter.finagle.exp.Mysql import com.twitter.finagle.exp.mysql.Client import com.twitter.storehaus.testing.SelfAggregatingCloseableCleanup import com.twitter.storehaus.testing.generator.NonEmpty @@ -126,13 +127,15 @@ object MySqlStoreProperties extends Properties("MySqlStore") withStore(multiPutAndMultiGetStoreTest(_, NonEmpty.Pairing.numerics[Short]()), "smallint", "smallint", true) private def withStore[T](f: MySqlStore => T, kColType: String, vColType: String, multiGet: Boolean = false): T = { - val client = Client("localhost:3306", "storehaususer", "test1234", "storehaus_test", Level.WARNING) + val client = Mysql.client + .withCredentials("storehaususer", "test1234") + .withDatabase("storehaus_test") + .newRichClient("127.0.0.1:3306") // these should match mysql setup used in .travis.yml val tableName = "storehaus-mysql-"+kColType+"-"+vColType + ( if (multiGet) { "-multiget" } else { "" } ) - val schema = "CREATE TEMPORARY TABLE IF NOT EXISTS `"+tableName+"` (`key` "+kColType+" DEFAULT NULL, `value` "+vColType+" DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;" + val schema = s"CREATE TEMPORARY TABLE IF NOT EXISTS `${tableName}` (`key` ${kColType} DEFAULT NULL, `value` ${vColType} DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;" Await.result(client.query(schema)) - f(newStore(client, tableName)) } diff --git a/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySqlLongStoreProperties.scala b/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySqlLongStoreProperties.scala index a7e70a97..ab6c1f0f 100644 --- a/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySqlLongStoreProperties.scala +++ b/storehaus-mysql/src/test/scala/com/twitter/storehaus/mysql/MySqlLongStoreProperties.scala @@ -18,10 +18,11 @@ package com.twitter.storehaus.mysql import java.util.logging.Level +import com.twitter.finagle.exp.Mysql import com.twitter.finagle.exp.mysql.Client import com.twitter.storehaus.testing.SelfAggregatingCloseableCleanup import com.twitter.storehaus.testing.generator.NonEmpty -import com.twitter.util.{Await, Future} +import com.twitter.util.Await import org.scalacheck.Arbitrary import org.scalacheck.Gen @@ -91,7 +92,10 @@ object MySqlLongStoreProperties extends Properties("MySqlLongStore") withStore(multiMergeStoreTest(_), "text", "bigint", true) private def withStore[T](f: MySqlLongStore => T, kColType: String, vColType: String, merge: Boolean = false): T = { - val client = Client("localhost:3306", "storehaususer", "test1234", "storehaus_test", Level.WARNING) + val client = Mysql.client + .withCredentials("storehaususer", "test1234") + .withDatabase("storehaus_test") + .newRichClient("127.0.0.1:3306") // these should match mysql setup used in .travis.yml val tableName = "storehaus-mysql-long-"+kColType+"-"+vColType + ( if (merge) { "-merge" } else { "" } )