Skip to content

Commit

Permalink
cleaup storehaus-redis (#328)
Browse files Browse the repository at this point in the history
  • Loading branch information
BenFradet authored and johnynek committed Oct 7, 2016
1 parent d27bbbb commit 854c1b8
Show file tree
Hide file tree
Showing 14 changed files with 98 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,8 @@ class RetriesExhaustedException[K](val key: K)
* These should arguably exist in util-core.
*/
object FutureOps {
// scalastyle:off
def missingValueFor[K](k: K) = Future.exception(new MissingValueException(k))
def retriesExhaustedFor[K](k: K) = Future.exception(new RetriesExhaustedException(k))
// scalastyle:on

/** Kleisli operator for Future[Option[_]] Monad. I knew it would come to this. */
def combineFOFn[A, B, C](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ class JMapStore[K, V] extends Store[K, V] {
protected val jstore: JMap[K, Option[V]] = new JHashMap[K, Option[V]]()
protected def storeGet(k: K): Option[V] = {
val stored = jstore.get(k)
// scalastyle:off
if (stored != null) stored
else None
// scalastyle:on
}
override def get(k: K): Future[Option[V]] = Future.value(storeGet(k))
override def put(kv: (K, Option[V])): Future[Unit] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ object SearchingReadableStoreProperties extends Properties("SearchingReadableSto
* Creates a ReadableStore that delegates to underlying store and
* records accesses.
*/
// scalastyle:off
def accessRecordingStore[K, V](underlying: ReadableStore[K, V]) =
// scalastyle:on
new ReadableStore[K, V] {
var accesses = 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,8 @@ object MergeableMemcacheStoreProperties extends Properties("MergeableMemcacheSto
case None => foundOptV.isEmpty
}
if (!isMatch) {
// scalastyle:off
println("FAILURE: Key \"" + k + "\" - expected value " +
expectedOptV + ", but found " + foundOptV)
// scalastyle:on
}
isMatch
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,9 @@ object MySqlLongStoreProperties extends Properties("MySqlLongStore")
case None => foundOptV.isEmpty
}
if (!isMatch) {
// scalastyle:off
println(
s"""FAILURE: Key "${String2MySqlValueInjection.invert(k)}" - """" +
s"expected value $expectedOptV, but found $foundOptV")
// scalastyle:on
}
isMatch
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@

package com.twitter.storehaus.redis

import com.twitter.algebird.Monoid
import com.twitter.util.{ Duration, Future, Time }
import com.twitter.finagle.redis.Client
import com.twitter.storehaus.{ Store, UnpivotedStore }
import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers }
import org.jboss.netty.buffer.ChannelBuffer

/**
*
Expand All @@ -29,13 +28,13 @@ import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers }

object RedisHashStore {

def apply(client: Client, ttl: Option[Duration] = RedisStore.Default.TTL) =
def apply(client: Client, ttl: Option[Duration] = RedisStore.Default.TTL): RedisHashStore =
new RedisHashStore(client, ttl)

def unpivoted(client: Client, ttl: Option[Duration] = RedisStore.Default.TTL) =
def unpivoted(
client: Client, ttl: Option[Duration] = RedisStore.Default.TTL): UnpivotedRedisHashStore =
new UnpivotedRedisHashStore(apply(client, ttl))
}
import RedisHashStore._

/**
* A Store in which keys map to Maps of secondary keys and values backed
Expand All @@ -45,7 +44,10 @@ class RedisHashStore(val client: Client, ttl: Option[Duration])
extends Store[ChannelBuffer, Map[ChannelBuffer, ChannelBuffer]] {

override def get(k: ChannelBuffer): Future[Option[Map[ChannelBuffer, ChannelBuffer]]] =
client.hGetAll(k).map({ case e if (e.isEmpty) => None case xs => Some(Map(xs:_*)) })
client.hGetAll(k).map {
case e if e.isEmpty => None
case xs => Some(Map(xs: _*))
}

protected def set(k: ChannelBuffer, v: Map[ChannelBuffer, ChannelBuffer]) = {
ttl.map(exp => client.expire(k, exp.inSeconds))
Expand All @@ -58,12 +60,13 @@ class RedisHashStore(val client: Client, ttl: Option[Duration])
case (key, None) => client.del(Seq(key)).unit
}

override def close(t: Time) = client.quit.foreach { _ => client.close() }
override def close(t: Time): Future[Unit] = client.quit.foreach { _ => client.close() }
}

/*
* A Store in which K is a tuple of (Key, FieldKey) and V is
* the value of associated with FieldKey within the redis hash
*/
class UnpivotedRedisHashStore(hstore: RedisHashStore)
extends UnpivotedStore[(ChannelBuffer, ChannelBuffer), ChannelBuffer, ChannelBuffer, ChannelBuffer](hstore)(identity)
extends UnpivotedStore[(ChannelBuffer, ChannelBuffer),
ChannelBuffer, ChannelBuffer, ChannelBuffer](hstore)(identity)
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.twitter.storehaus.redis

import com.twitter.algebird.Semigroup
import com.twitter.bijection.{ Injection, NumericInjections }
import com.twitter.bijection.Injection
import com.twitter.bijection.netty.Implicits._
import com.twitter.finagle.redis.Client
import com.twitter.storehaus.ConvertedStore
Expand All @@ -38,7 +38,7 @@ object RedisLongStore {
private [redis] implicit val LongInjection =
Injection.connect[Long, String, Array[Byte], ChannelBuffer]

def apply(client: Client, ttl: Option[Duration] = RedisStore.Default.TTL) =
def apply(client: Client, ttl: Option[Duration] = RedisStore.Default.TTL): RedisLongStore =
new RedisLongStore(RedisStore(client, ttl))
}
import RedisLongStore._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import org.jboss.netty.buffer.ChannelBuffer
*/

object RedisSetStore {
def apply(client: Client, ttl: Option[Duration] = RedisStore.Default.TTL) =
def apply(client: Client, ttl: Option[Duration] = RedisStore.Default.TTL): RedisSetStore =
new RedisSetStore(client, ttl)
def members(client: Client, ttl: Option[Duration] = RedisStore.Default.TTL) =
def members(
client: Client, ttl: Option[Duration] = RedisStore.Default.TTL): RedisSetMembershipStore =
new RedisSetMembershipStore(RedisSetStore(client, ttl))
}

Expand All @@ -41,7 +42,7 @@ class RedisSetStore(val client: Client, ttl: Option[Duration])

override def get(k: ChannelBuffer): Future[Option[Set[ChannelBuffer]]] =
client.sMembers(k).map {
case e if(e.isEmpty) => None
case e if e.isEmpty => None
case s => Some(s)
}

Expand All @@ -67,7 +68,7 @@ class RedisSetStore(val client: Client, ttl: Option[Duration])
protected [redis] def delete(k: ChannelBuffer, v: List[ChannelBuffer]) =
client.sRem(k, v).unit

override def close(t: Time) = client.quit.foreach { _ => client.close() }
override def close(t: Time): Future[Unit] = client.quit.foreach { _ => client.close() }
}

/**
Expand All @@ -93,7 +94,8 @@ class RedisSetMembershipStore(store: RedisSetStore)
case (key, None) => store.delete(key._1, List(key._2))
}

override def multiPut[K1 <: (ChannelBuffer, ChannelBuffer)](kv: Map[K1, Option[Unit]]): Map[K1, Future[Unit]] = {
override def multiPut[K1 <: (ChannelBuffer, ChannelBuffer)](
kv: Map[K1, Option[Unit]]): Map[K1, Future[Unit]] = {
// we are exploiting redis's built-in support for bulk updates and removals
// by partioning deletions and updates into 2 maps indexed by the first
// component of the composite key, the key of the set
Expand All @@ -104,19 +106,19 @@ class RedisSetMembershipStore(store: RedisSetStore)
case ((deleting, storing), (key, None)) =>
(deleting.updated(key._1, key :: deleting(key._1)), storing)
}
del.map {
del.flatMap {
case (k, members) =>
val value = store.delete(k, members.map(_._2))
members.map(_ -> value)
}.flatten ++ persist.map {
} ++ persist.flatMap {
case (k, members) =>
val value = store.set(k, members.map(_._2))
members.map(_ -> value)
}.flatten toMap
}
}

/** Calling close on this store will also close it's underlying
* RedisSetStore
*/
override def close(t: Time) = store.close(t)
override def close(t: Time): Future[Unit] = store.close(t)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ package com.twitter.storehaus.redis
import com.twitter.algebird.Semigroup
import com.twitter.util.{Future, Time}
import com.twitter.finagle.redis.Client
import com.twitter.storehaus.Store
import com.twitter.storehaus.algebra.MergeableStore
import org.jboss.netty.buffer.ChannelBuffer

object RedisSortedSetStore {
def apply(client: Client) =
def apply(client: Client): RedisSortedSetStore =
new RedisSortedSetStore(client)
}

Expand All @@ -34,7 +33,8 @@ object RedisSortedSetStore {
*/
class RedisSortedSetStore(client: Client)
extends MergeableStore[ChannelBuffer, Seq[(ChannelBuffer, Double)]] {
def semigroup = implicitly[Semigroup[Seq[(ChannelBuffer, Double)]]]
def semigroup: Semigroup[Seq[(ChannelBuffer, Double)]] =
implicitly[Semigroup[Seq[(ChannelBuffer, Double)]]]

/** Returns the whole set as a tuple of seq of (member, score).
* An empty set is represented as None. */
Expand All @@ -58,12 +58,14 @@ class RedisSortedSetStore(client: Client)
}

/** Performs a zIncrBy operation on a set for a seq of members */
override def merge(kv: (ChannelBuffer, Seq[(ChannelBuffer, Double)])): Future[Option[Seq[(ChannelBuffer, Double)]]] =
override def merge(
kv: (ChannelBuffer, Seq[(ChannelBuffer, Double)])
): Future[Option[Seq[(ChannelBuffer, Double)]]] =
Future.collect(kv._2.map {
case (member, by) =>
client.zIncrBy(kv._1, by, member)
.map {
case Some(res) => member -> (res - by) //get the value before
case Some(res) => member -> (res - by) // get the value before
case None => member -> 0.0
}
}).map(Some(_))
Expand All @@ -76,7 +78,7 @@ class RedisSortedSetStore(client: Client)
def members(set: ChannelBuffer): MergeableStore[ChannelBuffer, Double] =
new RedisSortedSetMembershipView(client, set)

override def close(t: Time) = client.quit.foreach { _ => client.close() }
override def close(t: Time): Future[Unit] = client.quit.foreach { _ => client.close() }
}

/** An unpivoted-like member-oriented view of a redis sorted set bound to a specific
Expand All @@ -91,18 +93,18 @@ class RedisSortedSetStore(client: Client)
class RedisSortedSetMembershipView(client: Client, set: ChannelBuffer)
extends MergeableStore[ChannelBuffer, Double] {
private lazy val underlying = new RedisSortedSetMembershipStore(client)
def semigroup = implicitly[Semigroup[Double]]
def semigroup: Semigroup[Double] = implicitly[Semigroup[Double]]

override def get(k: ChannelBuffer): Future[Option[Double]] =
underlying.get((set, k))

override def put(kv: (ChannelBuffer, Option[Double])): Future[Unit] =
underlying.put(((set,kv._1), kv._2))
underlying.put(((set, kv._1), kv._2))

override def merge(kv: (ChannelBuffer, Double)): Future[Option[Double]] =
underlying.merge((set, kv._1), kv._2)
underlying.merge(((set, kv._1), kv._2))

override def close(t: Time) = client.quit.foreach { _ => client.close() }
override def close(t: Time): Future[Unit] = client.quit.foreach { _ => client.close() }
}

/** An unpivoted-like member-oriented view of redis sorted sets.
Expand All @@ -115,11 +117,11 @@ class RedisSortedSetMembershipView(client: Client, set: ChannelBuffer)
*/
class RedisSortedSetMembershipStore(client: Client)
extends MergeableStore[(ChannelBuffer, ChannelBuffer), Double] {
def semigroup = implicitly[Semigroup[Double]]
def semigroup: Semigroup[Double] = implicitly[Semigroup[Double]]

/** @return a member's score or None if the member is not in the set */
override def get(k: (ChannelBuffer, ChannelBuffer)): Future[Option[Double]] =
client.zScore(k._1, k._2).map(_.map(_.toDouble))
client.zScore(k._1, k._2).map(_.map(_.doubleValue()))

/** Partitions a map of multiPut pivoted values into
* a two item tuple of deletes and sets, multimapped
Expand All @@ -132,8 +134,9 @@ class RedisSortedSetMembershipStore(client: Client)
*
* ( general enough to go into PivotOpts )
*/
def multiPutPartitioned[OutterK, InnerK, K1 <: (OutterK, InnerK), V, IndexK](kv: Map[K1, Option[V]])(by: K1 => IndexK):
(Map[IndexK, List[(K1, Option[V])]], Map[IndexK, List[(K1, Option[V])]]) = {
def multiPutPartitioned[OutterK, InnerK, K1 <: (OutterK, InnerK), V, IndexK](
kv: Map[K1, Option[V]])(by: K1 => IndexK)
: (Map[IndexK, List[(K1, Option[V])]], Map[IndexK, List[(K1, Option[V])]]) = {
def emptyMap = Map.empty[IndexK, List[(K1, Option[V])]].withDefaultValue(Nil)
((emptyMap, emptyMap) /: kv) {
case ((deleting, storing), (key, value @ Some(_))) =>
Expand All @@ -147,23 +150,25 @@ class RedisSortedSetMembershipStore(client: Client)

/** Adds or removes members from sets with an initial scoring. A score of None indicates the
* member should be removed from the set */
override def multiPut[K1 <: (ChannelBuffer, ChannelBuffer)](kv: Map[K1, Option[Double]]): Map[K1, Future[Unit]] = {
override def multiPut[K1 <: (ChannelBuffer, ChannelBuffer)](
kv: Map[K1, Option[Double]]): Map[K1, Future[Unit]] = {
// we are exploiting redis's built-in support for removals (zRem)
// by partioning deletions and updates into 2 maps indexed by the first
// component of the composite key, the key of the set
val (del, persist) = multiPutPartitioned[ChannelBuffer, ChannelBuffer, K1, Double, ChannelBuffer](kv)(_._1)
(del.map {
val (del, persist) =
multiPutPartitioned[ChannelBuffer, ChannelBuffer, K1, Double, ChannelBuffer](kv)(_._1)
del.flatMap {
case (k, members) =>
val value = client.zRem(k, members.map(_._1._2))
members.map(_._1 -> value.unit)
}.flatten ++ persist.map {
} ++ persist.flatMap {
case (k, members) =>
members.map {
case (k1, score) =>
// a per-InnerK operation
(k1 -> client.zAdd(k, score.get, k1._2).unit)
k1 -> client.zAdd(k, score.get, k1._2).unit
}
}.flatten).toMap
}
}

/** Performs a zIncrBy operation on a set for a given member */
Expand All @@ -172,5 +177,5 @@ class RedisSortedSetMembershipStore(client: Client)
_.map { res => res - kv._2 }
}

override def close(t: Time) = client.quit.foreach { _ => client.close() }
override def close(t: Time): Future[Unit] = client.quit.foreach { _ => client.close() }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package com.twitter.storehaus.redis

import com.twitter.algebird.Monoid
import com.twitter.conversions.time._
import com.twitter.util.{ Duration, Future, Time }
import com.twitter.finagle.redis.Client
import com.twitter.storehaus.{ FutureOps, MissingValueException, Store, WithPutTtl }
Expand All @@ -34,27 +32,28 @@ object RedisStore {
val TTL: Option[Duration] = None
}

def apply(client: Client, ttl: Option[Duration] = Default.TTL) =
def apply(client: Client, ttl: Option[Duration] = Default.TTL): RedisStore =
new RedisStore(client, ttl)
}

class RedisStore(val client: Client, ttl: Option[Duration])
extends Store[ChannelBuffer, ChannelBuffer]
with WithPutTtl[ChannelBuffer, ChannelBuffer, RedisStore]
{
override def withPutTtl(ttl: Duration) = new RedisStore(client, Some(ttl))
with WithPutTtl[ChannelBuffer, ChannelBuffer, RedisStore] {

override def withPutTtl(ttl: Duration): RedisStore = new RedisStore(client, Some(ttl))

override def get(k: ChannelBuffer): Future[Option[ChannelBuffer]] =
client.get(k)

override def multiGet[K1 <: ChannelBuffer](ks: Set[K1]): Map[K1, Future[Option[ChannelBuffer]]] = {
override def multiGet[K1 <: ChannelBuffer](
ks: Set[K1]): Map[K1, Future[Option[ChannelBuffer]]] = {
val redisResult: Future[Map[ChannelBuffer, Future[Option[ChannelBuffer]]]] = {
// results are expected in the same order as keys
// keys w/o mapped results are considered exceptional
val keys = ks.toIndexedSeq.view
client.mGet2(keys).map { result =>
val zipped = keys.zip(result).map {
case (k, v) => (k -> Future.value(v))
case (k, v) => k -> Future.value(v)
}.toMap
zipped ++ keys.filterNot(zipped.isDefinedAt).map { k =>
k -> Future.exception(new MissingValueException(k))
Expand All @@ -75,5 +74,5 @@ class RedisStore(val client: Client, ttl: Option[Duration])
case (key, None) => client.del(Seq(key)).unit
}

override def close(t: Time) = client.quit.foreach { _ => client.close() }
override def close(t: Time): Future[Unit] = client.quit.foreach { _ => client.close() }
}
Loading

0 comments on commit 854c1b8

Please sign in to comment.