forked from acrosa/scala-redis
-
Notifications
You must be signed in to change notification settings - Fork 218
/
Copy pathRedisCluster.scala
79 lines (64 loc) · 2.39 KB
/
RedisCluster.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package com.redis.cluster
import com.redis._
import com.redis.serialization._
import scala.util.{Failure, Success, Try}
/**
 * Redis cluster client that routes each key to one of several client pools
 * through a hash ring.
 *
 * One [[IdentifiableRedisClientPool]] is created per entry of `hosts` when the
 * cluster is instantiated, and all of them are placed on the hash ring.
 *
 * @param hosts  nodes that initially take part in the cluster
 * @param keyTag optional tagger; when it yields a tag for a key, the tag
 *               (instead of the full key) is used to select the node
 */
class RedisCluster(
    protected val hosts: List[ClusterNode],
    override protected val keyTag: Option[KeyTag]
) extends RedisClusterOps
    with WithHashRing[IdentifiableRedisClientPool]
    with BaseOps
    with NodeOps
    with StringOps
    with ListOps
    with SetOps
    with SortedSetOps
    // with GeoOps todo: implement GeoApi
    with EvalOps
    // with HyperLogLogOps todo: implement HyperLogLogApi
    with HashOps {

  // instantiating a cluster creates one client pool per participating node
  protected[cluster] val clients: List[IdentifiableRedisClientPool] =
    hosts.map(host => new IdentifiableRedisClientPool(host))

  // the hash ring is instantiated with all initial pools already added
  override protected[cluster] val hr: HashRing[IdentifiableRedisClientPool] =
    HashRing[IdentifiableRedisClientPool](clients, POINTS_PER_SERVER)

  /**
   * Selects the pool responsible for `key`.
   *
   * The key is serialized with `format`; if `keyTag` is defined and produces a
   * tag for the serialized key, the tag is hashed, otherwise the whole key is.
   *
   * @param key    key to route
   * @param format serializer used to turn `key` into bytes
   * @return the pool the hash ring assigns to the key (or to its tag)
   */
  override def nodeForKey(key: Any)(implicit format: Format): IdentifiableRedisClientPool = {
    // serialize and copy once; the same immutable sequence serves tagging and the fallback
    val rawKey = format(key).toIndexedSeq
    hr.getNode(keyTag.flatMap(_.tag(rawKey)).getOrElse(rawKey))
  }

  /**
   * Adds `server` to the hash ring after verifying that it answers a ping.
   *
   * @param server node to add
   * @return `Success(())` once the node is on the ring; a `Failure` if the pool
   *         could not be created, the ping threw or returned an unexpected
   *         reply, or adding to the ring failed
   */
  override def addServer(server: ClusterNode): Try[Unit] =
    Try(new IdentifiableRedisClientPool(server)).flatMap { pool =>
      val verified: Try[IdentifiableRedisClientPool] =
        Try(pool.withClient(_.ping)).flatMap { reply =>
          if (reply == pong) Success(pool)
          else Failure(new Throwable(s"Ping method failed for $server"))
        }
      // The pool has not been handed to the ring yet, so if the health check
      // failed nobody else holds a reference to it: release it here
      // (best-effort) instead of leaking it.
      if (verified.isFailure) Try(pool.close())
      verified.map { healthy => hr.addNode(healthy) }
    }

  /**
   * Hands a fresh pool for `server` to the ring's `replaceNode` and closes the
   * pool that `replaceNode` returns, if any.
   *
   * @param server node whose pool should be replaced
   */
  override def replaceServer(server: ClusterNode): Unit = {
    // NOTE(review): when replaceNode returns None it is not visible from here
    // whether the new pool was added to the ring or dropped — confirm against
    // HashRing.replaceNode before deciding whether it must be closed.
    hr.replaceNode(new IdentifiableRedisClientPool(server)).foreach(_.close())
  }

  /**
   * Removes the node named `nodename` from the ring and closes its pool.
   * Does nothing when no such node is present; a failure while closing the
   * pool is deliberately ignored.
   *
   * @param nodename name of the node to remove
   */
  override def removeServer(nodename: String): Unit =
    hr.cluster.find(_.node.nodename == nodename) match {
      case Some(pool) =>
        hr.removeNode(pool)
        Try(pool.close())
      case None =>
    }

  /** @return the nodes currently held by the hash ring */
  override def listServers: List[ClusterNode] =
    hr.cluster.map(_.node).toList

  /**
   * Runs `body` once against a client of every pool in the ring.
   *
   * @param body operation to execute with each client
   * @return the results, one per pool
   */
  override protected[cluster] def onAllConns[T](body: RedisClient => T): Iterable[T] =
    hr.cluster.map(pool => pool.withClient(client => body(client)))

  /**
   * Closes every pool in the ring.
   *
   * All pools are attempted even if some of them fail to close; afterwards
   * the first failure encountered (if any) is rethrown.
   */
  override def close(): Unit = {
    val failures = hr.cluster.toList.flatMap(pool => Try(pool.close()).failed.toOption)
    failures.headOption.foreach(cause => throw cause)
  }

  /**
   * Picks one of the ring's pools at random.
   *
   * @return a randomly chosen pool
   * @throws IllegalArgumentException if the cluster currently has no nodes
   */
  override protected[cluster] def randomNode(): RedisClientPool = {
    val nodes = hr.cluster
    require(nodes.nonEmpty, "cannot pick a random node: the cluster has no nodes")
    nodes(r.nextInt(nodes.size))
  }
}