Skip to content

Commit

Permalink
Add optional connect timeout for RedisClient & RedisBlockingClient to…
Browse files Browse the repository at this point in the history
… avoid blocking all cache calls until default akka TCP timeout kicks in.

Also, changing RedisClientActor instantiation to recommended type safe method.
  • Loading branch information
stsmedia committed Sep 16, 2016
1 parent f1c1101 commit 6c9734c
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 20 deletions.
21 changes: 12 additions & 9 deletions src/main/scala/redis/Redis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import redis.api.pubsub._
import java.util.concurrent.atomic.AtomicLong
import akka.event.Logging

import scala.concurrent.duration.FiniteDuration

trait RedisCommands
extends Keys
with Strings
Expand All @@ -23,17 +25,16 @@ trait RedisCommands
with Server
with HyperLogLog

abstract class RedisClientActorLike(system: ActorSystem, redisDispatcher: RedisDispatcher) extends ActorRequest {
abstract class RedisClientActorLike(system: ActorSystem, redisDispatcher: RedisDispatcher, connectTimeout: Option[FiniteDuration] = None) extends ActorRequest {
var host: String
var port: Int
val name: String
val password: Option[String] = None
val db: Option[Int] = None
implicit val executionContext = system.dispatchers.lookup(redisDispatcher.name)

val redisConnection: ActorRef = system.actorOf(
Props(classOf[RedisClientActor], new InetSocketAddress(host, port), getConnectOperations,
onConnectStatus, redisDispatcher.name)
val redisConnection: ActorRef = system.actorOf(RedisClientActor.props(new InetSocketAddress(host, port),
getConnectOperations, onConnectStatus, redisDispatcher.name, connectTimeout)
.withDispatcher(redisDispatcher.name),
name + '-' + Redis.tempName()
)
Expand All @@ -51,7 +52,7 @@ abstract class RedisClientActorLike(system: ActorSystem, redisDispatcher: RedisD
db.foreach(redis.select(_))
}

def onConnectStatus(): (Boolean) => Unit = (status: Boolean) => {
def onConnectStatus: (Boolean) => Unit = (status: Boolean) => {

}

Expand All @@ -76,21 +77,23 @@ case class RedisClient(var host: String = "localhost",
var port: Int = 6379,
override val password: Option[String] = None,
override val db: Option[Int] = None,
name: String = "RedisClient")
name: String = "RedisClient",
connectTimeout: Option[FiniteDuration] = None)
(implicit _system: ActorSystem,
redisDispatcher: RedisDispatcher = Redis.dispatcher
) extends RedisClientActorLike(_system, redisDispatcher) with RedisCommands with Transactions {
) extends RedisClientActorLike(_system, redisDispatcher, connectTimeout) with RedisCommands with Transactions {

}

case class RedisBlockingClient(var host: String = "localhost",
var port: Int = 6379,
override val password: Option[String] = None,
override val db: Option[Int] = None,
name: String = "RedisBlockingClient")
name: String = "RedisBlockingClient",
connectTimeout: Option[FiniteDuration] = None)
(implicit _system: ActorSystem,
redisDispatcher: RedisDispatcher = Redis.dispatcher
) extends RedisClientActorLike(_system, redisDispatcher) with BLists {
) extends RedisClientActorLike(_system, redisDispatcher, connectTimeout) with BLists {
}

case class RedisPubSub(
Expand Down
7 changes: 3 additions & 4 deletions src/main/scala/redis/RedisPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,9 @@ abstract class RedisClientPoolLike(system: ActorSystem, redisDispatcher: RedisDi
}

def makeRedisClientActor(server: RedisServer, active: Ref[Boolean]): ActorRef = {
system.actorOf(
Props(classOf[RedisClientActor], new InetSocketAddress(server.host, server.port),
getConnectOperations(server), onConnectStatus(server, active), redisDispatcher.name)
.withDispatcher(redisDispatcher.name),
system.actorOf(RedisClientActor.props(new InetSocketAddress(server.host, server.port),
getConnectOperations(server), onConnectStatus(server, active), redisDispatcher.name)
.withDispatcher(redisDispatcher.name),
name + '-' + Redis.tempName()
)
}
Expand Down
22 changes: 17 additions & 5 deletions src/main/scala/redis/actors/RedisClientActor.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
package redis.actors

import akka.util.{ByteString, ByteStringBuilder}
import java.net.InetSocketAddress
import redis.{RedisDispatcher, Redis, Operation, Transaction}

import akka.actor.SupervisorStrategy.Stop
import akka.actor._
import akka.util.{ByteString, ByteStringBuilder}
import redis.{Operation, Transaction}

import scala.collection.mutable
import akka.actor.SupervisorStrategy.Stop
import scala.concurrent.duration.FiniteDuration

object RedisClientActor {

def props( address: InetSocketAddress, getConnectOperations: () => Seq[Operation[_, _]],
onConnectStatus: Boolean => Unit,
dispatcherName: String,
connectTimeout: Option[FiniteDuration] = None) =
Props(new RedisClientActor(address, getConnectOperations, onConnectStatus, dispatcherName, connectTimeout))
}

class RedisClientActor(override val address: InetSocketAddress, getConnectOperations: () =>
Seq[Operation[_, _]], onConnectStatus: Boolean => Unit, dispatcherName: String) extends RedisWorkerIO(address,onConnectStatus) {
Seq[Operation[_, _]], onConnectStatus: Boolean => Unit, dispatcherName: String, connectTimeout: Option[FiniteDuration] = None) extends RedisWorkerIO(address, onConnectStatus, connectTimeout) {


import context._
Expand All @@ -24,7 +36,7 @@ class RedisClientActor(override val address: InetSocketAddress, getConnectOperat
var queuePromises = mutable.Queue[Operation[_, _]]()

def writing: Receive = {
case op : Operation[_,_] =>
case op: Operation[_, _] =>
queuePromises enqueue op
write(op.redisCommand.encodedRequest)
case Transaction(commands) => {
Expand Down
5 changes: 3 additions & 2 deletions src/main/scala/redis/actors/RedisWorkerIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import akka.io.Tcp.Register
import akka.io.Tcp.Connect
import akka.io.Tcp.CommandFailed
import akka.io.Tcp.Received
import scala.concurrent.duration.FiniteDuration

abstract class RedisWorkerIO(val address: InetSocketAddress, onConnectStatus: Boolean => Unit ) extends Actor with ActorLogging {
abstract class RedisWorkerIO(val address: InetSocketAddress, onConnectStatus: Boolean => Unit, connectTimeout: Option[FiniteDuration] = None) extends Actor with ActorLogging {

private var currAddress = address

Expand All @@ -33,7 +34,7 @@ abstract class RedisWorkerIO(val address: InetSocketAddress, onConnectStatus: Bo
log.info(s"Connect to $currAddress")
// Create a new InetSocketAddress to clear the cached IP address.
currAddress = new InetSocketAddress(currAddress.getHostName, currAddress.getPort)
tcp ! Connect(currAddress, options = SO.KeepAlive(on = true) :: Nil)
tcp ! Connect(remoteAddress = currAddress, options = SO.KeepAlive(on = true) :: Nil, timeout = connectTimeout)
}

def reconnect() = {
Expand Down

0 comments on commit 6c9734c

Please sign in to comment.