Skip to content

Commit

Permalink
support redis zset
Browse files Browse the repository at this point in the history
  • Loading branch information
harryzhuang authored and KarelCemus committed Feb 10, 2022
1 parent 762026b commit 8aa4eb7
Show file tree
Hide file tree
Showing 12 changed files with 485 additions and 9 deletions.
9 changes: 9 additions & 0 deletions src/main/scala/play/api/cache/redis/CacheApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,15 @@ private[redis] trait AbstractCacheApi[Result[_]] {
* @return Scala wrapper
*/
def map[T: ClassTag](key: String): RedisMap[T, Result]

/**
* Scala wrapper around Redis sorted-set-related commands.
*
* @param key the key storing the map
* @tparam T type of elements within the sorted-set
* @return Scala wrapper
*/
def zset[T: ClassTag](key: String): RedisSortedSet[T, Result]
}

/** Synchronous and blocking implementation of the connection to the redis database */
Expand Down
52 changes: 52 additions & 0 deletions src/main/scala/play/api/cache/redis/RedisSortedSet.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package play.api.cache.redis

import scala.collection.immutable.TreeSet
import scala.language.higherKinds

trait RedisSortedSet[Elem, Result[_]] extends RedisCollection[TreeSet[Elem], Result] {
override type This = RedisSortedSet[Elem, Result]

/**
* Adds all the specified members with the specified scores to the sorted set stored at key.
* It is possible to specify multiple score / member pairs. If a specified member is already
* a member of the sorted set, the score is updated and the element reinserted at the right
* position to ensure the correct ordering.
*
* If key does not exist, a new sorted set with the specified members as sole members is created,
* like if the sorted set was empty.
*
* @note If the key exists but does not hold a sorted set, an error is returned.
* @note <strong>Time complexity:</strong> O(log(N)) for each item added, where N is the number of elements in the sorted set.
* @param scoreValues values and corresponding scores to be added
* @return the sorted set for chaining calls
*/
def add(scoreValues: (Double, Elem)*): Result[This]

/**
* <p>Tests if the element is contained in the sorted set. Returns true if exists, otherwise returns false</p>
*
* @note <strong>Time complexity:</strong> O(1)
* @param element tested element
* @return true if exists in the set, otherwise false
*/
def contains(element: Elem): Result[Boolean]

/**
* <p>Removes the specified members from the sorted set stored at key. Non existing members are ignored.
* An error is returned when key exists and does not hold a sorted set.</p>
*
* @note <strong>Time complexity:</strong> O(M*log(N)) with N being the number of elements in the sorted set and M the number of elements to be removed.
* @param element elements to be removed
* @return the sorted set for chaining calls
*/
def remove(element: Elem*): Result[This]

/**
* Returns the specified range of elements in the sorted set stored at key which sorted in order specified by param `isReverse`.
* @param start the start index of the range
* @param stop the stop index of the range
* @param isReverse whether sorted in descending order or not
* @return
*/
def range(start: Long, stop: Long, isReverse: Boolean = false): Result[Seq[Elem]]
}
90 changes: 90 additions & 0 deletions src/main/scala/play/api/cache/redis/connector/RedisConnector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,95 @@ private[redis] trait SetCommands {
def setRemove(key: String, value: Any*): Future[Long]
}

/**
* Internal non-blocking Redis API implementing REDIS protocol
*
* Subset of REDIS commands, sorted set related commands.
*
* @see https://redis.io/commands
*/
private[redis] trait SortedSetCommands {

/**
* Adds all the specified members with the specified scores to the sorted set stored at key.
* It is possible to specify multiple score / member pairs. If a specified member is already
* a member of the sorted set, the score is updated and the element reinserted at the right
* position to ensure the correct ordering.
*
* If key does not exist, a new sorted set with the specified members as sole members is created,
* like if the sorted set was empty. If the key exists but does not hold a sorted set, an error
* is returned.
*
* @note Time complexity: O(log(N)) for each item added, where N is the number of elements in the sorted set.
* @param key cache storage key
* @param scoreValues values and corresponding scores to be added
* @return number of inserted elements ignoring already existing
*/
def sortedSetAdd(key: String, scoreValues: (Double, Any)*): Future[Long]

/**
* Returns the sorted set cardinality (number of elements) of the sorted set stored at key.
*
* Time complexity: O(1)
*
* @param key cache storage key
* @return the cardinality (number of elements) of the set, or 0 if key does not exist.
*/
def sortedSetSize(key: String): Future[Long]

/**
* Returns the score of member in the sorted set at key.
*
* If member does not exist in the sorted set, or key does not exist, nil is returned.
*
* Time complexity: O(1)
*
* @param key cache storage key
* @param value tested element
* @return the score of member (a double precision floating point number).
*/
def sortedSetScore(key: String, value: Any): Future[Option[Double]]

/**
* Removes the specified members from the sorted set stored at key. Non existing members are ignored.
*
* An error is returned when key exists and does not hold a sorted set.
*
* Time complexity: O(M*log(N)) with N being the number of elements in the sorted set and M the number of elements to be removed.
*
* @param key cache storage key
* @param value values to be removed
* @return total number of removed values, non existing are ignored
*/
def sortedSetRemove(key: String, value: Any*): Future[Long]

/**
* Returns the specified range of elements in the sorted set stored at key.
*
* An error is returned when key exists and does not hold a sorted set.
* @param key cache storage key
* @param start the start index of the range
* @param stop the stop index of the range
* @note The start and stop arguments represent zero-based indexes, where 0 is the first element,
* 1 is the next element, and so on. These arguments specify an inclusive range.
* @return list of elements in the specified range
*/
def sortedSetRange[T: ClassTag](key: String, start: Long, stop: Long): Future[Seq[T]]

/**
* Returns the specified range of elements in the sorted set stored at key.
* The elements are considered to be ordered from the highest to the lowest score.
* Descending lexicographical order is used for elements with equal score.
*
* @param key cache storage key
* @param start the start index of the range
* @param stop the stop index of the range
* @note Apart from the reversed ordering, the zrevRange is similar to zrange.
* @return list of elements in the specified range
*/
def sortedSetReverseRange[T: ClassTag](key: String, start: Long, stop: Long): Future[Seq[T]]
}

/**
* Internal non-blocking Redis API implementing REDIS protocol
*
Expand All @@ -473,3 +562,4 @@ trait RedisConnector extends AnyRef
with ListCommands
with SetCommands
with HashCommands
with SortedSetCommands
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,58 @@ private[connector] class RedisConnectorImpl(serializer: AkkaSerializer, redis: R
}
}

def sortedSetAdd(key: String, scoreValues: (Double, Any)*) = {
// encodes the value
def toEncoded(scoreValue: (Double, Any)) = encode(key, scoreValue._2).map((scoreValue._1, _))

Future.sequence(scoreValues.map(toEncoded)).flatMap(redis.zadd(key, _: _*)) executing "ZADD" withKey key andParameters scoreValues expects {
case inserted =>
log.debug(s"Inserted $inserted elements into the zset at '$key'.")
inserted
} recover {
case ExecutionFailedException(_, _, _, ex) if ex.getMessage startsWith "WRONGTYPE" =>
log.warn(s"Value at '$key' is not a zset.")
throw new IllegalArgumentException(s"Value at '$key' is not a zset.")
}
}

def sortedSetSize(key: String) =
redis.zcard(key) executing "ZCARD" withKey key logging {
case length => log.debug(s"The zset at '$key' has $length items.")
}

def sortedSetScore(key: String, value: Any) = {
encode(key, value) flatMap (redis.zscore(key, _)) executing "ZSCORE" withKey key andParameter value logging {
case Some(score) => log.debug(s"The score of item: $value is $score in the collection at '$key'.")
case None => log.debug(s"Item $value does not exist in the collection at '$key'")
}
}

def sortedSetRemove(key: String, values: Any*) = {
// encodes the value
def toEncoded(value: Any) = encode(key, value)

Future.sequence(values map toEncoded).flatMap(redis.zrem(key, _: _*)) executing "ZREM" withKey key andParameters values logging {
case removed => log.debug(s"Removed $removed elements from the zset at '$key'.")
}
}

def sortedSetRange[T: ClassTag](key: String, start: Long, stop: Long) = {
redis.zrange[String](key, start, stop) executing "ZRANGE" withKey key andParameter s"$start $stop" expects {
case encodedSeq =>
log.debug(s"Got range from $start to $stop in the zset at '$key'.")
encodedSeq.map(encoded => decode[T](key, encoded))
}
}

def sortedSetReverseRange[T: ClassTag](key: String, start: Long, stop: Long) = {
redis.zrevrange[String](key, start, stop) executing "ZREVRANGE" withKey key andParameter s"$start $stop" expects {
case encodedSeq =>
log.debug(s"Got reverse range from $start to $stop in the zset at '$key'.")
encodedSeq.map(encoded => decode[T](key, encoded))
}
}

def hashRemove(key: String, fields: String*) =
redis.hdel(key, fields: _*) executing "HDEL" withKey key andParameters fields logging {
case removed => log.debug(s"Removed $removed elements from the collection at '$key'.")
Expand Down
8 changes: 8 additions & 0 deletions src/main/scala/play/api/cache/redis/impl/Builders.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ object Builders {
def name: String
/** converts future result produced by Redis to the result of desired type */
def toResult[T](run: => Future[T], default: => Future[T])(implicit runtime: RedisRuntime): Result[T]
/** maps the value */
def map[T, U](result: Result[T])(f: T => U)(implicit runtime: RedisRuntime): Result[U]
// $COVERAGE-OFF$
/** show the builder name */
override def toString = s"ResultBuilder($name)"
Expand All @@ -32,6 +34,9 @@ object Builders {
// recover from known exceptions
case failure: RedisException => runtime.policy.recoverFrom(run, default, failure)
}

override def map[T, U](result: AsynchronousResult[T])(f: T => U)(implicit runtime: RedisRuntime): AsynchronousResult[U] =
result.map(f)
}

/** converts the future into the value */
Expand All @@ -54,5 +59,8 @@ object Builders {
// apply recovery policy to recover from expected exceptions
case failure: RedisException => Await.result(runtime.policy.recoverFrom(run, default, failure), runtime.timeout.duration)
}.get // finally, regardless the recovery status, get the synchronous result

override def map[T, U](result: SynchronousResult[T])(f: T => U)(implicit runtime: RedisRuntime): SynchronousResult[U] =
f(result)
}
}
4 changes: 4 additions & 0 deletions src/main/scala/play/api/cache/redis/impl/RedisCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ private[impl] class RedisCache[Result[_]](redis: RedisConnector, builder: Builde
new RedisMapImpl(key, redis)
}

def zset[T: ClassTag](key: String): RedisSortedSet[T, Result] = key.prefixed { key =>
new RedisSortedSetImpl(key, redis)
}

// $COVERAGE-OFF$
override def toString = s"RedisCache(name=${runtime.name})"
// $COVERAGE-ON$
Expand Down
49 changes: 49 additions & 0 deletions src/main/scala/play/api/cache/redis/impl/RedisSortedSetImpl.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package play.api.cache.redis.impl

import play.api.cache.redis._

import scala.language.{higherKinds, implicitConversions}
import scala.reflect.ClassTag

/** <p>Implementation of Set API using redis-server cache implementation.</p> */
private[impl] class RedisSortedSetImpl[Elem: ClassTag, Result[_]](
key: String,
redis: RedisConnector
)(implicit
builder: Builders.ResultBuilder[Result],
runtime: RedisRuntime
) extends RedisSortedSet[Elem, Result] {

// implicit ask timeout and execution context
import dsl._

@inline
private def This: This = this

override def add(scoreValues: (Double, Elem)*): Result[RedisSortedSet[Elem, Result]] =
redis.sortedSetAdd(key, scoreValues: _*).map(_ => This).recoverWithDefault(This)

override def contains(element: Elem): Result[Boolean] =
redis.sortedSetScore(key, element).map(_.isDefined).recoverWithDefault(false)

override def remove(element: Elem*): Result[RedisSortedSet[Elem, Result]] = {
redis.sortedSetRemove(key, element: _*).map(_ => This).recoverWithDefault(This)
}

override def range(start: Long, stop: Long, isReverse: Boolean = false): Result[Seq[Elem]] = {
if (isReverse) {
redis.sortedSetReverseRange[Elem](key, start, stop).recoverWithDefault(Seq.empty)
} else {
redis.sortedSetRange[Elem](key, start, stop).recoverWithDefault(Seq.empty)
}
}

override def size: Result[Long] =
redis.sortedSetSize(key).recoverWithDefault(0)

override def isEmpty: Result[Boolean] =
builder.map(size)(_ == 0)

override def nonEmpty: Result[Boolean] =
builder.map(isEmpty)(x => !x)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@ class RedisConnectorFailureSpec(implicit ee: ExecutionEnv) extends Specification

import org.mockito.ArgumentMatchers._

val key = "key"
val value = "value"
private val key = "key"
private val value = "value"
private val score = 1D

val simulatedEx = new RuntimeException("Simulated failure.")
val simulatedFailure = Failure(simulatedEx)
private val simulatedEx = new RuntimeException("Simulated failure.")
private val simulatedFailure = Failure(simulatedEx)

val someValue = Some(value)
private val someValue = Some(value)

val disconnected = Future.failed(new IllegalStateException("Simulated redis status: disconnected."))
private val disconnected = Future.failed(new IllegalStateException("Simulated redis status: disconnected."))

def anySerializer = org.mockito.ArgumentMatchers.any[ByteStringSerializer[String]]
def anyDeserializer = org.mockito.ArgumentMatchers.any[ByteStringDeserializer[String]]
private def anySerializer = org.mockito.ArgumentMatchers.any[ByteStringSerializer[String]]
private def anyDeserializer = org.mockito.ArgumentMatchers.any[ByteStringDeserializer[String]]

"Serializer failure" should {

Expand Down Expand Up @@ -138,5 +139,44 @@ class RedisConnectorFailureSpec(implicit ee: ExecutionEnv) extends Specification
// run the test
connector.hashSet(key, "field", value) must throwA[ExecutionFailedException].await
}

"failed ZADD" in new MockedConnector {
serializer.encode(anyString) returns "encoded"
commands.zadd[String](anyString, any)(anySerializer) returns disconnected
// run the test
connector.sortedSetAdd(key, (score, value)) must throwA[ExecutionFailedException].await
}

"failed ZCARD" in new MockedConnector {
commands.zcard(anyString) returns disconnected
// run the test
connector.sortedSetSize(key) must throwA[ExecutionFailedException].await
}

"failed ZSCORE" in new MockedConnector {
serializer.encode(anyString) returns "encoded"
commands.zscore[String](anyString, anyString)(anySerializer) returns disconnected
// run the test
connector.sortedSetScore(key, value) must throwA[ExecutionFailedException].await
}

"failed ZREM" in new MockedConnector {
serializer.encode(anyString) returns "encoded"
commands.zrem[String](anyString, anyString)(anySerializer) returns disconnected
// run the test
connector.sortedSetRemove(key, value) must throwA[ExecutionFailedException].await
}

"failed ZRANGE" in new MockedConnector {
commands.zrange[String](anyString, anyLong, anyLong)(anyDeserializer) returns disconnected
// run the test
connector.sortedSetRange[String](key, 1, 5) must throwA[ExecutionFailedException].await
}

"failed ZREVRANGE" in new MockedConnector {
commands.zrevrange[String](anyString, anyLong, anyLong)(anyDeserializer) returns disconnected
// run the test
connector.sortedSetReverseRange[String](key, 1, 5) must throwA[ExecutionFailedException].await
}
}
}
Loading

0 comments on commit 8aa4eb7

Please sign in to comment.