Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Redis 5 #228

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ Session.vim
.DS_Store
*.rdb
*.aof
redis-3.2.0/
redis-3.2.0.tar.gz
redis-4.0.14/
redis-4.0.14.tar.gz
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ before_install:
env:
global:
- SBT_OPTS="-XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:PermSize=256M -XX:MaxPermSize=512M"
- REDIS_HOME=redis-3.2.0/src
- REDIS_HOME=redis-5.0.5/src
# whitelist
branches:
only:
Expand Down
6 changes: 3 additions & 3 deletions install-redis.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env bash
set -ex
wget http://download.redis.io/releases/redis-3.2.0.tar.gz
tar -xzvf redis-3.2.0.tar.gz
cd redis-3.2.0 && make
wget http://download.redis.io/releases/redis-5.0.5.tar.gz
tar -xzvf redis-5.0.5.tar.gz
cd redis-5.0.5 && make
6 changes: 6 additions & 0 deletions src/main/scala/redis/Converter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ object MultiBulkConverter {
}).getOrElse(None)
}

/** Decodes a three-element multi-bulk reply of the shape [key, member, score]
  * (as returned by BZPOPMAX/BZPOPMIN) into an optional (key, member, score)
  * triple; yields None when the server replies with a null multi-bulk (timeout).
  * Element access by index is deliberate: a shorter reply is a protocol
  * violation and should fail loudly. */
def toOptionStringByteStringDouble[R](reply: MultiBulk)(implicit deserializer: ByteStringDeserializer[R]): Option[(String, R, Double)] = {
  // `.map(r => Some(...)).getOrElse(None)` was a roundabout spelling of Option.map.
  reply.responses.map { r =>
    (r(0).toString, deserializer.deserialize(r(1).toByteString), r(2).toByteString.utf8String.toDouble)
  }
}

def toSeqBoolean(reply: MultiBulk): Seq[Boolean] = {
reply.responses.map(r => {
r.map(_.toString == "1")
Expand Down
7 changes: 6 additions & 1 deletion src/main/scala/redis/Redis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ trait RedisCommands
with HyperLogLog
with Clusters
with Geo
with Streams

// Blocking command mix-in (BLPOP/BRPOP/BZPOPMIN/BZPOPMAX etc.), kept separate
// from RedisCommands so that blocking calls are only available on
// RedisBlockingClient — presumably to keep them off shared connections;
// TODO confirm connection handling in RedisClientActorLike.
trait RedisBlockingCommands
extends BLists
with BSortedSets

abstract class RedisClientActorLike(system: ActorSystem, redisDispatcher: RedisDispatcher, connectTimeout: Option[FiniteDuration] = None) extends ActorRequest {
var host: String
Expand Down Expand Up @@ -94,7 +99,7 @@ case class RedisBlockingClient(var host: String = "localhost",
connectTimeout: Option[FiniteDuration] = None)
(implicit _system: ActorSystem,
redisDispatcher: RedisDispatcher = Redis.dispatcher
) extends RedisClientActorLike(_system, redisDispatcher, connectTimeout) with BLists {
) extends RedisClientActorLike(_system, redisDispatcher, connectTimeout) with RedisBlockingCommands {
}

case class RedisPubSub(
Expand Down
23 changes: 23 additions & 0 deletions src/main/scala/redis/api/BSortedSets.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package redis.api.bsortedsets

import scala.concurrent.duration.{Duration, FiniteDuration}
import redis._
import akka.util.ByteString
import redis.protocol.MultiBulk

// BZPOPMAX (Redis 5+): blocking pop of the highest-scored member from the
// first non-empty sorted set among `keys`; encoding/decoding shared in Bzpopx.
case class Bzpopmax[KK: ByteStringSerializer, R: ByteStringDeserializer](keys: Seq[KK], timeout: FiniteDuration = Duration.Zero)
extends Bzpopx[KK, R]("BZPOPMAX")

// BZPOPMIN (Redis 5+): blocking pop of the lowest-scored member from the
// first non-empty sorted set among `keys`; encoding/decoding shared in Bzpopx.
case class Bzpopmin[KK: ByteStringSerializer, R: ByteStringDeserializer](keys: Seq[KK], timeout: FiniteDuration = Duration.Zero)
extends Bzpopx[KK, R]("BZPOPMIN")

// Shared implementation of BZPOPMAX/BZPOPMIN. Sends the keys followed by the
// timeout in whole seconds; decodes the reply into an optional
// (key name, member, score) triple — None when the call times out.
private[redis] abstract class Bzpopx[KK, R](command: String)(implicit redisKeys: ByteStringSerializer[KK], deserializerR: ByteStringDeserializer[R])
extends RedisCommandMultiBulk[Option[(String, R, Double)]] {
val isMasterOnly = true
val keys: Seq[KK]
val timeout: FiniteDuration

// NOTE(review): toSeconds truncates — a sub-second FiniteDuration becomes "0",
// which Redis interprets as "block forever"; confirm this is intended.
val encodedRequest: ByteString = encode(command, keys.map(redisKeys.serialize) ++ Seq(ByteString(timeout.toSeconds.toString)))

def decodeReply(mb: MultiBulk) = MultiBulkConverter.toOptionStringByteStringDouble(mb)
}
5 changes: 5 additions & 0 deletions src/main/scala/redis/api/Connection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ case object Quit extends RedisCommandStatusBoolean {
case class Select(index: Int) extends RedisCommandStatusBoolean {
val isMasterOnly = true
val encodedRequest: ByteString = encode("SELECT", Seq(ByteString(index.toString)))
}

// SWAPDB (Redis 4+): atomically swaps the contents of two database indices;
// replies with a status string mapped to Boolean.
case class Swapdb(index1: Int, index2: Int) extends RedisCommandStatusBoolean {
val isMasterOnly = true
val encodedRequest: ByteString = encode("SWAPDB", Seq(ByteString(index1.toString), ByteString(index2.toString)))
}
40 changes: 31 additions & 9 deletions src/main/scala/redis/api/Keys.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,32 @@ case class Keys(pattern: String) extends RedisCommandMultiBulk[Seq[String]] {
def decodeReply(mb: MultiBulk) = MultiBulkConverter.toSeqString(mb)
}

case class Migrate[K](host: String, port: Int, key: K, destinationDB: Int, timeout: FiniteDuration)(implicit redisKey: ByteStringSerializer[K])
case class Migrate[K](host: String, port: Int, keys: Seq[K], destinationDB: Int, timeout: FiniteDuration, copy: Boolean = false, replace: Boolean = false, password: Option[String])(implicit redisKey: ByteStringSerializer[K])
extends RedisCommandStatusBoolean {
val isMasterOnly = true
val encodedRequest: ByteString =
encode("MIGRATE",
Seq(ByteString(host),
ByteString(port.toString),
redisKey.serialize(key),
ByteString(destinationDB.toString),
ByteString(timeout.toMillis.toString)
))
val encodedRequest: ByteString = {
val builder = Seq.newBuilder[ByteString]

builder += ByteString(host)
builder += ByteString(port.toString)
builder += ByteString("")
builder += ByteString(destinationDB.toString)
builder += ByteString(timeout.toMillis.toString)

if (copy)
builder += ByteString("COPY")
if (replace)
builder += ByteString("REPLACE")
if (password.isDefined) {
builder += ByteString("AUTH")
builder += ByteString(password.get)
}

builder += ByteString("KEYS")
builder ++= keys.map(redisKey.serialize)

RedisProtocolRequest.multiBulk("MIGRATE", builder.result())
}
}

case class Move[K](key: K, db: Int)(implicit redisKey: ByteStringSerializer[K]) extends SimpleClusterKey[K] with RedisCommandIntegerBoolean {
Expand Down Expand Up @@ -189,3 +204,10 @@ case class Scan[C](cursor: C, count: Option[Int], matchGlob: Option[String])(imp

val empty: Seq[String] = Seq.empty
}

// UNLINK (Redis 4+): like DEL but reclaims memory asynchronously on the
// server; returns the number of keys that were unlinked.
case class Unlink[K](keys: Seq[K])(implicit redisKey: ByteStringSerializer[K]) extends MultiClusterKey[K] with RedisCommandIntegerLong {
val isMasterOnly = true
val encodedRequest: ByteString = encode("UNLINK", keys.map(redisKey.serialize))
}


8 changes: 4 additions & 4 deletions src/main/scala/redis/api/Lists.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ case class Lpush[K, V](key: K, values: Seq[V])(implicit redisKey: ByteStringSeri
}


case class Lpushx[K, V](key: K, value: V)(implicit redisKey: ByteStringSerializer[K], convert: ByteStringSerializer[V]) extends SimpleClusterKey[K] with RedisCommandIntegerLong {
case class Lpushx[K, V](key: K, values: Seq[V])(implicit redisKey: ByteStringSerializer[K], convert: ByteStringSerializer[V]) extends SimpleClusterKey[K] with RedisCommandIntegerLong {
val isMasterOnly = true
val encodedRequest: ByteString = encode("LPUSHX", Seq(keyAsString, convert.serialize(value)))
val encodedRequest: ByteString = encode("LPUSHX", keyAsString +: values.map(v => convert.serialize(v)))
}

case class Lrange[K, R](key: K, start: Long, stop: Long)(implicit redisKey: ByteStringSerializer[K], deserializerR: ByteStringDeserializer[R]) extends SimpleClusterKey[K] with RedisCommandMultiBulk[Seq[R]] {
Expand Down Expand Up @@ -83,7 +83,7 @@ case class Rpush[K, V](key: K, values: Seq[V])(implicit redisKey: ByteStringSeri
val encodedRequest: ByteString = encode("RPUSH", keyAsString +: values.map(v => convert.serialize(v)))
}

case class Rpushx[K, V](key: K, value: V)(implicit redisKey: ByteStringSerializer[K], convert: ByteStringSerializer[V]) extends SimpleClusterKey[K] with RedisCommandIntegerLong {
case class Rpushx[K, V](key: K, values: Seq[V])(implicit redisKey: ByteStringSerializer[K], convert: ByteStringSerializer[V]) extends SimpleClusterKey[K] with RedisCommandIntegerLong {
val isMasterOnly = true
val encodedRequest: ByteString = encode("RPUSHX", Seq(keyAsString, convert.serialize(value)))
val encodedRequest: ByteString = encode("RPUSHX", keyAsString +: values.map(v => convert.serialize(v)))
}
8 changes: 4 additions & 4 deletions src/main/scala/redis/api/Servers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ case object DebugSegfault extends RedisCommandStatusString {
val encodedRequest: ByteString = encode("DEBUG SEGFAULT")
}

case object Flushall extends RedisCommandStatusBoolean {
case class Flushall(async: Boolean = false) extends RedisCommandStatusBoolean {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("FLUSHALL")
val encodedRequest: ByteString = encode("FLUSHALL", if (async) Seq(ByteString("ASYNC")) else Seq.empty)
}

case object Flushdb extends RedisCommandStatusBoolean {
case class Flushdb(async: Boolean = false) extends RedisCommandStatusBoolean {
val isMasterOnly: Boolean = true
val encodedRequest: ByteString = encode("FLUSHDB")
val encodedRequest: ByteString = encode("FLUSHDB", if (async) Seq(ByteString("ASYNC")) else Seq.empty)
}

case class Info(section: Option[String] = None) extends RedisCommandBulk[String] {
Expand Down
14 changes: 14 additions & 0 deletions src/main/scala/redis/api/SortedSets.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,20 @@ case class ZinterstoreWeighted[KD: ByteStringSerializer, K: ByteStringSerializer
val encodedRequest: ByteString = encode("ZINTERSTORE", ZstoreWeighted.buildArgs(destination, keys, aggregate))
}

// ZPOPMAX (Redis 5+): removes and returns up to `count` of the highest-scored
// members of the sorted set at `key`; when count is None the COUNT argument is
// omitted and the server default applies.
case class Zpopmax[K, R](key: K, count: Option[Long])(implicit keySeria: ByteStringSerializer[K], deserializerR: ByteStringDeserializer[R])
extends SimpleClusterKey[K] with RedisCommandMultiBulkSeqByteStringDouble[R] {
val isMasterOnly = true
val encodedRequest: ByteString = encode("ZPOPMAX", Seq(keyAsString) ++ count.map(c => ByteString(c.toString)))
val deserializer: ByteStringDeserializer[R] = deserializerR
}

// ZPOPMIN (Redis 5+): removes and returns up to `count` of the lowest-scored
// members of the sorted set at `key`; when count is None the COUNT argument is
// omitted and the server default applies.
case class Zpopmin[K, R](key: K, count: Option[Long])(implicit keySeria: ByteStringSerializer[K], deserializerR: ByteStringDeserializer[R])
extends SimpleClusterKey[K] with RedisCommandMultiBulkSeqByteStringDouble[R] {
val isMasterOnly = true
val encodedRequest: ByteString = encode("ZPOPMIN", Seq(keyAsString) ++ count.map(c => ByteString(c.toString)))
val deserializer: ByteStringDeserializer[R] = deserializerR
}

case class Zrange[K, R](key: K, start: Long, stop: Long)(implicit keySeria: ByteStringSerializer[K], deserializerR: ByteStringDeserializer[R])
extends SimpleClusterKey[K] with RedisCommandMultiBulkSeqByteString[R] {
val encodedRequest: ByteString = encode("ZRANGE", Seq(keyAsString, ByteString(start.toString), ByteString(stop.toString)))
Expand Down
126 changes: 126 additions & 0 deletions src/main/scala/redis/api/Streams.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package redis.api.streams

import akka.util.ByteString
import redis._
import redis.api.{RequestStreamId, StreamEntry, StreamId, TrimStrategy}
import redis.protocol.{Bulk, MultiBulk, RedisReply}

/** XADD: appends an entry with the given field/value pairs to the stream at
  * `key`. `id` is usually StreamId.AUTOGENERATE ("*"); an optional
  * TrimStrategy emits MAXLEN arguments. Returns the id assigned to the entry. */
case class Xadd[K, F, V](key: K, id: RequestStreamId, fields: Seq[(F, V)], trimStrategy: Option[TrimStrategy])(implicit serializerK: ByteStringSerializer[K], serializerF: ByteStringSerializer[F], serializerV: ByteStringSerializer[V])
  extends SimpleClusterKey[K] with RedisCommandBulk[StreamId] {
  val isMasterOnly = true

  val encodedRequest: ByteString = {
    val args = Seq.newBuilder[ByteString]

    args += keyAsString

    // The trim arguments (e.g. "MAXLEN ~ n") must precede the entry id.
    trimStrategy.foreach(args ++= _.toByteString)

    args += id.serialize

    // Field/value pairs are flattened: f1 v1 f2 v2 ...
    fields.foreach { case (f, v) =>
      args += serializerF.serialize(f)
      args += serializerV.serialize(v)
    }

    encode("XADD", args.result())
  }

  // XADD always replies with the id of the added entry; a null bulk reply
  // would be a protocol violation, hence the deliberate .get (see review thread).
  def decodeReply(bulk: Bulk): StreamId = bulk.response.map(StreamId.deserialize).get
}

// XDEL: removes the entries with the given ids from the stream at `key`;
// returns the number of entries actually deleted.
case class Xdel[K](key: K, ids: Seq[StreamId])(implicit serializerK: ByteStringSerializer[K])
extends SimpleClusterKey[K] with RedisCommandIntegerLong {
val isMasterOnly = true
val encodedRequest: ByteString = encode("XDEL", Seq(keyAsString) ++ ids.map(_.serialize))
}

// XLEN: returns the number of entries in the stream at `key`;
// read-only, so it may run on a replica.
case class Xlen[K](key: K)(implicit redisKey: ByteStringSerializer[K])
extends SimpleClusterKey[K] with RedisCommandIntegerLong {
val isMasterOnly = false
val encodedRequest: ByteString = encode("XLEN", Seq(keyAsString))
}

// XRANGE: returns entries with ids between `start` and `end` (inclusive),
// oldest first, optionally capped by COUNT. Argument layout built by
// Xrange.buildArgs.
case class Xrange[K, F, V](key: K, start: RequestStreamId, end: RequestStreamId, count: Option[Long] = None)(implicit serializerK: ByteStringSerializer[K], deserializerF: ByteStringDeserializer[F], deserializerV: ByteStringDeserializer[V])
extends SimpleClusterKey[K] with RedisCommandMultiBulk[Seq[StreamEntry[F, V]]] {
val isMasterOnly = false
val encodedRequest: ByteString = encode("XRANGE", Xrange.buildArgs(key, start, end, count))
def decodeReply(reply: MultiBulk): Seq[StreamEntry[F, V]] = StreamEntryDecoder.toSeqEntry(reply)
}

/** Shared argument builder for XRANGE/XREVRANGE: key, the two range ids in
  * the order the caller supplies them, and an optional trailing COUNT. */
private[redis] object Xrange {
  def buildArgs[K](key: K, id1: RequestStreamId, id2: RequestStreamId, count: Option[Long])(implicit serializerK: ByteStringSerializer[K]): Seq[ByteString] = {
    val builder = Seq.newBuilder[ByteString]
    builder += serializerK.serialize(key)
    builder += id1.serialize
    builder += id2.serialize
    // foreach instead of isDefined/.get: avoids the unsafe Option access idiom.
    count.foreach { c =>
      builder += ByteString("COUNT")
      builder += ByteString(c.toString)
    }
    builder.result()
  }
}

// XREVRANGE: like XRANGE but newest first; note the command takes `end`
// before `start`, hence the argument order passed to Xrange.buildArgs.
case class Xrevrange[K, F, V](key: K, end: RequestStreamId, start: RequestStreamId, count: Option[Long] = None)(implicit serializerK: ByteStringSerializer[K], deserializerF: ByteStringDeserializer[F], deserializerV: ByteStringDeserializer[V])
extends SimpleClusterKey[K] with RedisCommandMultiBulk[Seq[StreamEntry[F, V]]] {
val isMasterOnly = false
val encodedRequest: ByteString = encode("XREVRANGE", Xrange.buildArgs(key, end, start, count))
def decodeReply(reply: MultiBulk): Seq[StreamEntry[F, V]] = StreamEntryDecoder.toSeqEntry(reply)
}

// XTRIM: trims the stream at `key` according to the given strategy
// (e.g. MAXLEN); returns the number of entries removed.
case class Xtrim[K](key: K, trimStrategy: TrimStrategy)(implicit serializerK: ByteStringSerializer[K])
extends SimpleClusterKey[K] with RedisCommandIntegerLong {
val isMasterOnly = true
val encodedRequest: ByteString = encode("XTRIM", Seq(keyAsString) ++ trimStrategy.toByteString)
}

/** XREAD: reads entries newer than the given id from each listed stream.
  * Returns None when no stream has new entries, otherwise a sequence of
  * (stream key, entries) pairs. Non-blocking variant (no BLOCK argument). */
case class Xread[K, F, V](streams: Seq[(K, RequestStreamId)], count: Option[Long])(implicit serializerK: ByteStringSerializer[K], deserializerK: ByteStringDeserializer[K], deserializerF: ByteStringDeserializer[F], deserializerV: ByteStringDeserializer[V])
  extends RedisCommandMultiBulk[Option[Seq[(K, Seq[StreamEntry[F, V]])]]] {
  val keys = streams.map(_._1)
  val isMasterOnly = false

  val encodedRequest: ByteString = {
    val args = Seq.newBuilder[ByteString]

    // foreach instead of isDefined/.get: avoids the unsafe Option access idiom.
    count.foreach { c =>
      args += ByteString("COUNT")
      args += ByteString(c.toString)
    }

    // XREAD syntax requires all keys first, then all ids, in matching order.
    args += ByteString("STREAMS")
    args ++= streams.map { case (k, _) => serializerK.serialize(k) }
    args ++= streams.map { case (_, id) => id.serialize }

    encode("XREAD", args.result())
  }

  def decodeReply(r: MultiBulk): Option[Seq[(K, Seq[StreamEntry[F, V]])]] =
    StreamEntryDecoder.toOptionSeqStringSeqEntry(r)(deserializerK, deserializerF, deserializerV)
}

private [redis] object StreamEntryDecoder {
def toEntry[F, V](mb: MultiBulk)(implicit deserializerF: ByteStringDeserializer[F], deserializerV: ByteStringDeserializer[V]): StreamEntry[F, V] = {
val r = mb.responses.get
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are using "unsafe" methods here and there (.get, and r(1), i.e. seq.apply(index)).
I don't know how safe it is to do it here.
Maybe you could try to compare with what we did in this file

object MultiBulkConverter {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please refer to the comment above for the reasoning behind usage of "unsafe" methods.

val id = StreamId.deserialize(r(0).toByteString)
val fields = r(1).asInstanceOf[MultiBulk].responses.get.map(_.toByteString).grouped(2).map(p => (deserializerF.deserialize(p(0)), deserializerV.deserialize(p(1)))).toSeq
StreamEntry(id, fields)
}

def toSeqEntry[F, V](mb: MultiBulk)(implicit deserializerF: ByteStringDeserializer[F], deserializerV: ByteStringDeserializer[V]): Seq[StreamEntry[F, V]] = {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relatively similar to MultiBulkConverter.toSeqByteString

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While the method signature is similar, the underlying reply from Redis is significantly different.

The Stream commands use nested arrays in their replies to a much greater extent than commands for the other data structures. For example, XRANGE returns an array of two-element arrays, where the first element is the stream id and the second element is an array of field-value pairs. Borrowing JSON notation, this would be something like:

[ 
   [ ID1, [ FIELD1, VALUE1, FIELD2, VALUE2, ... ] ], 
   [ ID2, [ FIELD1, VALUE1, FIELD2, VALUE2, ... ] ],
   ...
]

where ID, FIELD and VALUE are all Bulk strings and [] indicates a MultiBulk array.

The XREAD reply is even more nested, with an array containing two element arrays where the first element is the stream key and the second element is a sequence of entries, similar to the XRANGE reply.

Because the replies are structured in such specific ways for Stream commands, I opted to put the decoding logic in a separate object (StreamEntryDecoder) rather than adding it to MultiBulkConverter.

I opted to use unsafe operations (casting to MultiBulk, accessing elements by index) for two reasons. First, I think it's best for decoding to break immediately and loudly if we get a reply that doesn't match our understanding/implementation of the Redis spec. Second, I don't see good fallback options, aside from silently dropping parts of the response entirely. Throwing an exception seems like the best response. It seems like there is precedent here in the use of head/tail in MultiBulkConverter.seqtoMapString and MultiBulkConverter.toOptionStringByteString, which will also throw exceptions if the array is too small.

How would you feel about introducing a new exception specifically for decode errors? This feels cleaner than emitting low-level exceptions like ClassCastException, NoSuchElementException, etc., and would allow callers to implement custom decode error handling if desired. I thought about reusing ReplyErrorException for this, but that seems best reserved for error messages from the Redis server itself. Perhaps ReplyDecodeException?

mb.responses.map(r => r.map(_.asInstanceOf[MultiBulk]).map(toEntry[F,V])).getOrElse(Seq())
}

def toOptionSeqStringSeqEntry[K, F, V](mb: MultiBulk)(implicit deserializerK: ByteStringDeserializer[K], deserializerF: ByteStringDeserializer[F], deserializerV: ByteStringDeserializer[V]): Option[Seq[(K, Seq[StreamEntry[F, V]])]] =
mb.responses.map { r =>
r.map(_.asInstanceOf[MultiBulk]).map(toStringSeqEntry[K,F,V])
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could squeeze some perf by doing map only once
r.map{ case mb: MultiBulk => toStringSeqEntry[K,F,V](mb)}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I'll include this in my next pull request.

}

// Decodes one element of an XREAD reply: a two-element array of
// [stream key, entries array] into a (key, entries) pair.
// Deliberately "unsafe" (.get / apply-by-index / asInstanceOf): a reply that
// does not match the protocol should fail loudly rather than be silently
// dropped (see the review discussion above).
def toStringSeqEntry[K, F, V](mb: MultiBulk)(implicit deserializerK: ByteStringDeserializer[K], deserializerF: ByteStringDeserializer[F], deserializerV: ByteStringDeserializer[V]): (K, Seq[StreamEntry[F, V]]) = {
val r = mb.responses.get
val key = deserializerK.deserialize(r(0).toByteString)
val entries = toSeqEntry[F,V](r(1).asInstanceOf[MultiBulk])
(key, entries)
}
}
45 changes: 45 additions & 0 deletions src/main/scala/redis/api/api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,48 @@ object ZaddOption {
}

}

// An id usable in a stream request: either a concrete StreamId or one of the
// special placeholder tokens (MagicStreamId: "-", "+", "*", "$", ">").
sealed trait RequestStreamId {
def serialize: ByteString
}

// Concrete stream entry id: a millisecond timestamp plus a per-millisecond
// sequence number, rendered on the wire as "<time>-<sequence>".
case class StreamId(time: Long, sequence: Long = 0) extends RequestStreamId {
  override def serialize: ByteString = ByteString(toString)

  override def toString: String = s"$time-$sequence"

  // The smallest id strictly greater than this one.
  def next: StreamId = copy(sequence = sequence + 1)
}

// Wraps one of Redis' special stream-id tokens (e.g. "-", "+", "*");
// instances are exposed only through the constants on object StreamId.
protected[redis] class MagicStreamId(s: String) extends RequestStreamId {
val serialize: ByteString = ByteString(s)
}

object StreamId {
  // Special request-only ids (semantics per the Redis Streams documentation —
  // verify against the commands that accept each token):
  final val MIN = new MagicStreamId("-")           // smallest possible id (range start)
  final val MAX = new MagicStreamId("+")           // largest possible id (range end)
  final val AUTOGENERATE = new MagicStreamId("*")  // XADD: let the server assign the id
  final val ADDED = new MagicStreamId("$")         // XREAD: only entries added after the call
  final val UNDELIVERED = new MagicStreamId(">")   // consumer groups: never-delivered entries
  // Smallest id of a real entry (MIN_VALID, not 0-0).
  final val MIN_VALID = StreamId(0, 1)

  /** Parses an id of the form "<time>-<sequence>".
    * A missing sequence part (e.g. "5") defaults to 0, matching how Redis
    * completes partial ids, instead of throwing ArrayIndexOutOfBoundsException
    * as the previous indexed access did. */
  def deserialize(s: ByteString): StreamId = {
    val parts = s.utf8String.split('-')
    val sequence = if (parts.length > 1) parts(1).toLong else 0L
    StreamId(parts(0).toLong, sequence)
  }
}

// A single decoded stream entry: its id plus the field/value pairs it carries.
case class StreamEntry[F, V](id: StreamId, fields: Seq[(F, V)])

// A stream length-limiting strategy, rendered as the extra arguments passed
// to XADD/XTRIM (see MaxLength).
sealed trait TrimStrategy {
def toByteString: Seq[ByteString]
}

// MAXLEN trim strategy: cap the stream at `count` entries. When `approximate`
// is set, the "~" flag is inserted so the server may trim approximately.
case class MaxLength(count: Long, approximate: Boolean = false) extends TrimStrategy {
  // Emits "MAXLEN [~] <count>" as separate bulk arguments.
  def toByteString: Seq[ByteString] = {
    val approxFlag = if (approximate) Seq(ByteString("~")) else Seq.empty
    (ByteString("MAXLEN") +: approxFlag) :+ ByteString(count.toString)
  }
}
Loading