make request queue size configurable (#922)
mberndt123 authored Nov 23, 2023
1 parent bac8cc1 commit 5f0dbb2
Showing 4 changed files with 46 additions and 30 deletions.
11 changes: 8 additions & 3 deletions modules/redis/src/main/scala/zio/redis/config.scala
@@ -18,13 +18,18 @@ package zio.redis

import zio.{Chunk, Duration, durationInt}

-final case class RedisConfig(host: String, port: Int)
+final case class RedisConfig(host: String, port: Int, requestQueueSize: Int = RedisConfig.DefaultRequestQueueSize)

object RedisConfig {
  lazy val Local: RedisConfig = RedisConfig("localhost", 6379)
+ val DefaultRequestQueueSize: Int = 16
}

-final case class RedisClusterConfig(addresses: Chunk[RedisUri], retry: RetryClusterConfig = RetryClusterConfig.Default)
+final case class RedisClusterConfig(
+  addresses: Chunk[RedisUri],
+  retry: RetryClusterConfig = RetryClusterConfig.Default,
+  requestQueueSize: Int = RedisConfig.DefaultRequestQueueSize
+)

final case class RetryClusterConfig(base: Duration, factor: Double, maxRecurs: Int)
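
For orientation, a minimal sketch of how the new knob might be set from application code. The RedisUri("localhost", 7000) constructor form and the value 256 are illustrative assumptions, not recommendations:

import zio._
import zio.redis._

// Single-node config: requestQueueSize now bounds the internal request queue,
// which previously used a hard-coded RequestQueueSize constant.
val config: RedisConfig =
  RedisConfig("localhost", 6379, requestQueueSize = 256)

// Cluster config exposes the same knob and falls back to
// RedisConfig.DefaultRequestQueueSize (16) when omitted.
val clusterConfig: RedisClusterConfig =
  RedisClusterConfig(Chunk(RedisUri("localhost", 7000)), requestQueueSize = 256)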

ClusterExecutor.scala

@@ -40,7 +40,7 @@ private[redis] final class ClusterExecutor private (

def executeAsk(address: RedisUri): GenRedis.Sync[RespValue] =
for {
-executor <- executor(address)
+executor <- executor(address, config.requestQueueSize)
_ <- GenRedis.sync(executor.execute(askingCommand.resp(())))
res <- GenRedis.sync(executor.execute(command))
} yield res
@@ -51,7 +51,7 @@
case success => ZIO.succeed(success)
}.catchSome[Any, RedisError, RespValue] {
case e: RedisError.Ask => executeAsk(e.address)
-case _: RedisError.Moved => refreshConnect *> execute(keySlot)
+case _: RedisError.Moved => refreshConnect(config.requestQueueSize) *> execute(keySlot)
}
recover.retry(retryPolicy)
}
@@ -69,19 +69,21 @@
clusterConnection.get.map(_.executor(slot)).flatMap(ZIO.fromOption(_).orElseFail(CusterKeyExecutorError))

// TODO introduce max connection amount
-private def executor(address: RedisUri): IO[RedisError.IOError, RedisExecutor] =
+private def executor(address: RedisUri, requestQueueSize: Int): IO[RedisError.IOError, RedisExecutor] =
clusterConnection.modifyZIO { cc =>
val executorOpt = cc.executors.get(address).map(es => (es.executor, cc))
val enrichedClusterIO =
-scope.extend[Any](connectToNode(address)).map(es => (es.executor, cc.addExecutor(address, es)))
+scope
+  .extend[Any](connectToNode(address, requestQueueSize))
+  .map(es => (es.executor, cc.addExecutor(address, es)))
ZIO.fromOption(executorOpt).catchAll(_ => enrichedClusterIO)
}

-private def refreshConnect: IO[RedisError, Unit] =
+private def refreshConnect(requestQueueSize: Int): IO[RedisError, Unit] =
clusterConnection.updateZIO { connection =>
val addresses = connection.partitions.flatMap(_.addresses)
for {
-cluster <- scope.extend[Any](initConnectToCluster(addresses))
+cluster <- scope.extend[Any](initConnectToCluster(addresses, requestQueueSize))
_ <- ZIO.foreachParDiscard(connection.executors) { case (_, es) => es.scope.close(Exit.unit) }
} yield cluster
}
@@ -113,39 +115,45 @@ private[redis] object ClusterExecutor {
scope: Scope.Closeable
): ZIO[Scope, RedisError, ClusterExecutor] =
for {
-connection <- initConnectToCluster(config.addresses)
+connection <- initConnectToCluster(config.addresses, config.requestQueueSize)
ref <- Ref.Synchronized.make(connection)
executor = new ClusterExecutor(ref, config, scope)
_ <- logScopeFinalizer("Cluster executor is closed")
} yield executor

-private def initConnectToCluster(addresses: Chunk[RedisUri]): ZIO[Scope, RedisError, ClusterConnection] =
+private def initConnectToCluster(
+  addresses: Chunk[RedisUri],
+  requestQueueSize: Int
+): ZIO[Scope, RedisError, ClusterConnection] =
ZIO
.collectFirst(addresses) { address =>
-connectToCluster(address).foldZIO(
+connectToCluster(address, requestQueueSize).foldZIO(
error => ZIO.logError(s"The connection to cluster failed. Cause: $error").as(None),
cc => ZIO.logInfo("The connection to cluster has been established").as(Some(cc))
)
}
.flatMap(cc => ZIO.getOrFailWith(CusterConnectionError)(cc))

-private def connectToCluster(address: RedisUri) =
+private def connectToCluster(address: RedisUri, requestQueueSize: Int) =
for {
temporaryRedis <- redis(address)
(trLayer, trScope) = temporaryRedis
partitions <- ZIO.serviceWithZIO[Redis](_.slots).provideLayer(trLayer)
_ <- ZIO.logTrace(s"Cluster configs:\n${partitions.mkString("\n")}")
uniqueAddresses = partitions.map(_.master.address).distinct
-uriExecScope <- ZIO.foreachPar(uniqueAddresses)(address => connectToNode(address).map(es => address -> es))
+uriExecScope <-
+  ZIO.foreachPar(uniqueAddresses)(address => connectToNode(address, requestQueueSize).map(es => address -> es))
slots = slotAddress(partitions)
_ <- trScope.close(Exit.unit)
} yield ClusterConnection(partitions, uriExecScope.toMap, slots)

-private def connectToNode(address: RedisUri) =
+private def connectToNode(address: RedisUri, requestQueueSize: Int) =
for {
closableScope <- Scope.make
-connection <- closableScope.extend[Any](RedisConnection.create(RedisConfig(address.host, address.port)))
-executor <- closableScope.extend[Any](SingleNodeExecutor.create(connection))
+cfg = RedisConfig(address.host, address.port, requestQueueSize)
+connection <- closableScope.extend[Any](RedisConnection.create(cfg))
+executor <-
+  closableScope.extend[Any](SingleNodeExecutor.create(connection).provideSome[Scope](ZLayer.succeed(cfg)))
layerScope <- ZIO.scope
_ <- layerScope.addFinalizerExit(closableScope.close(_))
} yield ExecutorScope(executor, closableScope)
RedisConnection.scala

@@ -17,6 +17,7 @@
package zio.redis.internal

import zio._
+import zio.redis.RedisError.IOError
import zio.redis._
import zio.stream.{Stream, ZStream}

@@ -73,8 +74,8 @@ private[redis] object RedisConnection {
lazy val layer: ZLayer[RedisConfig, RedisError.IOError, RedisConnection] =
ZLayer.scoped(ZIO.serviceWithZIO[RedisConfig](create))

-lazy val local: ZLayer[Any, RedisError.IOError, RedisConnection] =
-  ZLayer.succeed(RedisConfig.Local) >>> layer
+lazy val local: ZLayer[Any, IOError, RedisConfig & RedisConnection] =
+  ZLayer.succeed(RedisConfig.Local) >+> layer

def create(uri: RedisConfig): ZIO[Scope, RedisError.IOError, RedisConnection] =
connect(new InetSocketAddress(uri.host, uri.port))
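
A note on the local layer change above: with >>> the config layer is consumed and only RedisConnection comes out, whereas >+> also passes the RedisConfig through, so downstream layers (the executor now reads requestQueueSize from the environment) can still see it. A rough sketch of that difference, mirroring the library's internal wiring rather than public API (RedisConnection is private[redis]):

import zio._
import zio.redis._
import zio.redis.internal.RedisConnection

val cfg: ULayer[RedisConfig] = ZLayer.succeed(RedisConfig.Local)

// >>> : the config is used to build the connection and then dropped.
val connOnly: ZLayer[Any, RedisError.IOError, RedisConnection] =
  cfg >>> RedisConnection.layer

// >+> : the config is threaded through alongside the connection,
// so a later layer can still read requestQueueSize.
val connAndCfg: ZLayer[Any, RedisError.IOError, RedisConfig & RedisConnection] =
  cfg >+> RedisConnection.layer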
SingleNodeExecutor.scala

@@ -23,7 +23,8 @@ import zio.redis.{RedisConfig, RedisError}
private[redis] final class SingleNodeExecutor private (
connection: RedisConnection,
requests: Queue[Request],
-responses: Queue[Promise[RedisError, RespValue]]
+responses: Queue[Promise[RedisError, RespValue]],
+requestQueueSize: Int
) extends SingleNodeRunner
with RedisExecutor {

@@ -36,7 +37,7 @@ private[redis] final class SingleNodeExecutor private (
def onError(e: RedisError): UIO[Unit] = responses.takeAll.flatMap(ZIO.foreachDiscard(_)(_.fail(e)))

def send: IO[RedisError.IOError, Unit] =
-requests.takeBetween(1, RequestQueueSize).flatMap { requests =>
+requests.takeBetween(1, requestQueueSize).flatMap { requests =>
val bytes =
requests
.foldLeft(new ChunkBuilder.Byte())((builder, req) => builder ++= RespValue.Array(req.command).asBytes)
@@ -65,20 +66,21 @@ private[redis] object SingleNodeExecutor {
lazy val layer: ZLayer[RedisConfig, RedisError.IOError, RedisExecutor] =
RedisConnection.layer >>> makeLayer

-lazy val local: ZLayer[Any, RedisError.IOError, RedisExecutor] =
-  RedisConnection.local >>> makeLayer
+def local: ZLayer[Any, RedisError.IOError, RedisExecutor] =
+  ZLayer.succeed(RedisConfig.Local) >>> layer

-def create(connection: RedisConnection): URIO[Scope, SingleNodeExecutor] =
+def create(connection: RedisConnection): URIO[Scope & RedisConfig, SingleNodeExecutor] =
for {
+requestQueueSize <- ZIO.serviceWith[RedisConfig](_.requestQueueSize)
-requests <- Queue.bounded[Request](RequestQueueSize)
+requests <- Queue.bounded[Request](requestQueueSize)
responses <- Queue.unbounded[Promise[RedisError, RespValue]]
-executor = new SingleNodeExecutor(connection, requests, responses)
+executor = new SingleNodeExecutor(connection, requests, responses, requestQueueSize)
_ <- executor.run.forkScoped
_ <- logScopeFinalizer(s"$executor Node Executor is closed")
} yield executor

private final case class Request(command: Chunk[RespValue.BulkString], promise: Promise[RedisError, RespValue])

-private def makeLayer: ZLayer[RedisConnection, RedisError.IOError, RedisExecutor] =
+private def makeLayer: ZLayer[RedisConnection & RedisConfig, RedisError.IOError, RedisExecutor] =
ZLayer.scoped(ZIO.serviceWithZIO[RedisConnection](create))
}
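
Putting it together: SingleNodeExecutor.create now reads the queue bound from a RedisConfig in the environment, so the executor layer is built by feeding it a config. A rough wiring sketch, mirroring internal composition rather than public API (SingleNodeExecutor and RedisExecutor are assumed to live in zio.redis.internal, and 64 is an arbitrary example value):

import zio._
import zio.redis._
import zio.redis.internal.{RedisExecutor, SingleNodeExecutor}

// One config layer satisfies both RedisConnection.layer (host/port) and,
// after this change, the executor's requestQueueSize lookup.
val executorLayer: ZLayer[Any, RedisError.IOError, RedisExecutor] =
  ZLayer.succeed(RedisConfig("localhost", 6379, requestQueueSize = 64)) >>>
    SingleNodeExecutor.layer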
