-
Notifications
You must be signed in to change notification settings - Fork 64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement PubSub api #714
Implement PubSub api #714
Changes from 1 commit
a280504
cd4a3f3
a01213d
9541519
ba89a59
9723182
65b524b
892ba20
43edb47
a6f74fa
c7decb7
b6438f0
5fb75e4
ce97f95
a25805b
fd0b303
7cb1491
2ab2db1
87eee71
f532e4b
e9e2a3c
014d2e4
49c141b
8b6ce73
f44c7fc
758ea9b
9febcf8
92aea5e
3fc77c0
3c8a307
0ea75a1
ef3b9f1
478577b
c4105ff
f432815
149ae4b
67652e6
8606409
6a3f68f
1a0862f
92457c4
bf52e26
c027295
a154683
592d327
386fefd
415f75e
1699211
be11d67
552e002
e218b08
ac85cb9
dadb5e7
88c3f4e
6e50d4d
5ca98e7
539ef3f
63cc263
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,10 +12,11 @@ final class SingleNodeRedisPubSub( | |
pubSubHubsRef: Ref[Map[SubscriptionKey, Hub[Take[RedisError, PushProtocol]]]], | ||
reqQueue: Queue[Request], | ||
resQueue: Queue[Promise[RedisError, PushProtocol]], | ||
connection: RedisConnection | ||
connection: RedisConnection, | ||
implicit val codec: BinaryCodec | ||
) extends RedisPubSub { | ||
|
||
def execute(command: PubSubCommand): ZIO[BinaryCodec, RedisError, Chunk[Stream[RedisError, RespValue]]] = { | ||
def execute(command: PubSubCommand): IO[RedisError, Chunk[Stream[RedisError, RespValue]]] = { | ||
def applyCallback( | ||
streams: Chunk[Stream[RedisError, PushProtocol]], | ||
callback: PushProtocol => IO[RedisError, Unit] | ||
|
@@ -47,7 +48,7 @@ final class SingleNodeRedisPubSub( | |
private def subscribe( | ||
channel: String, | ||
channels: List[String] | ||
): ZIO[BinaryCodec, RedisError, Chunk[Stream[RedisError, PushProtocol]]] = | ||
): IO[RedisError, Chunk[Stream[RedisError, PushProtocol]]] = | ||
makeSubscriptionStream( | ||
PubSub.Subscribe, | ||
channelKey(channel), | ||
|
@@ -57,7 +58,7 @@ final class SingleNodeRedisPubSub( | |
private def pSubscribe( | ||
pattern: String, | ||
patterns: List[String] | ||
): ZIO[BinaryCodec, RedisError, Chunk[Stream[RedisError, PushProtocol]]] = | ||
): IO[RedisError, Chunk[Stream[RedisError, PushProtocol]]] = | ||
makeSubscriptionStream( | ||
PubSub.PSubscribe, | ||
patternKey(pattern), | ||
|
@@ -66,7 +67,7 @@ final class SingleNodeRedisPubSub( | |
|
||
private def unsubscribe( | ||
channels: List[String] | ||
): ZIO[BinaryCodec, RedisError, Chunk[Stream[RedisError, PushProtocol]]] = | ||
): IO[RedisError, Chunk[Stream[RedisError, PushProtocol]]] = | ||
makeUnsubscriptionStream( | ||
PubSub.Unsubscribe, | ||
if (channels.nonEmpty) | ||
|
@@ -77,7 +78,7 @@ final class SingleNodeRedisPubSub( | |
|
||
private def pUnsubscribe( | ||
patterns: List[String] | ||
): ZIO[BinaryCodec, RedisError, Chunk[Stream[RedisError, PushProtocol]]] = | ||
): IO[RedisError, Chunk[Stream[RedisError, PushProtocol]]] = | ||
makeUnsubscriptionStream( | ||
PubSub.PUnsubscribe, | ||
if (patterns.nonEmpty) | ||
|
@@ -90,34 +91,30 @@ final class SingleNodeRedisPubSub( | |
command: String, | ||
key: SubscriptionKey, | ||
keys: List[SubscriptionKey] | ||
) = | ||
ZIO.serviceWithZIO[BinaryCodec] { implicit codec => | ||
for { | ||
promises <- Promise.make[RedisError, PushProtocol].replicateZIO(keys.size + 1).map(Chunk.fromIterable(_)) | ||
chunk = StringInput.encode(command) ++ NonEmptyList(StringInput).encode((key.value, keys.map(_.value))) | ||
_ <- reqQueue.offer(Request(chunk, promises)) | ||
streams <- ZIO.foreach((key :: keys) zip promises) { case (key, promise) => | ||
for { | ||
hub <- getHub(key) | ||
stream = ZStream.fromHub(hub).flattenTake | ||
} yield ZStream.fromZIO(promise.await) concat stream | ||
} | ||
} yield Chunk.fromIterable(streams) | ||
} | ||
)(implicit codec: BinaryCodec) = | ||
for { | ||
promises <- Promise.make[RedisError, PushProtocol].replicateZIO(keys.size + 1).map(Chunk.fromIterable(_)) | ||
chunk = StringInput.encode(command) ++ NonEmptyList(StringInput).encode((key.value, keys.map(_.value))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this be done in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To remove |
||
_ <- reqQueue.offer(Request(chunk, promises)) | ||
streams <- ZIO.foreach((key :: keys) zip promises) { case (key, promise) => | ||
for { | ||
hub <- getHub(key) | ||
stream = ZStream.fromHub(hub).flattenTake | ||
} yield ZStream.fromZIO(promise.await) concat stream | ||
} | ||
} yield Chunk.fromIterable(streams) | ||
|
||
private def makeUnsubscriptionStream( | ||
command: String, | ||
keys: UIO[List[SubscriptionKey]] | ||
) = | ||
ZIO.serviceWithZIO[BinaryCodec] { implicit codec => | ||
for { | ||
targets <- keys | ||
chunk = StringInput.encode(command) ++ Varargs(StringInput).encode(targets.map(_.value)) | ||
promises <- Promise.make[RedisError, PushProtocol].replicateZIO(targets.size).map(Chunk.fromIterable(_)) | ||
_ <- reqQueue.offer(Request(chunk, promises)) | ||
streams = promises.map(promise => ZStream.fromZIO(promise.await)) | ||
} yield streams | ||
} | ||
)(implicit codec: BinaryCodec) = | ||
for { | ||
targets <- keys | ||
chunk = StringInput.encode(command) ++ Varargs(StringInput).encode(targets.map(_.value)) | ||
promises <- Promise.make[RedisError, PushProtocol].replicateZIO(targets.size).map(Chunk.fromIterable(_)) | ||
_ <- reqQueue.offer(Request(chunk, promises)) | ||
streams = promises.map(promise => ZStream.fromZIO(promise.await)) | ||
} yield streams | ||
|
||
private def getHub(key: SubscriptionKey) = { | ||
def makeNewHub = | ||
|
@@ -152,7 +149,7 @@ final class SingleNodeRedisPubSub( | |
) | ||
} | ||
|
||
private def receive: ZIO[BinaryCodec, RedisError, Unit] = { | ||
private def receive: IO[RedisError, Unit] = { | ||
def handlePushProtocolMessage(msg: PushProtocol): UIO[Unit] = { | ||
def releasePendingPromise(msg: PushProtocol): UIO[Unit] = | ||
resQueue.take.flatMap(_.succeed(msg)).unit | ||
|
@@ -188,31 +185,28 @@ final class SingleNodeRedisPubSub( | |
} | ||
} | ||
|
||
ZIO.serviceWithZIO[BinaryCodec] { implicit codec => | ||
connection.read | ||
.mapError(RedisError.IOError(_)) | ||
.via(RespValue.decoder) | ||
.collectSome | ||
.mapZIO(resp => ZIO.attempt(PushProtocolOutput.unsafeDecode(resp))) | ||
.refineToOrDie[RedisError] | ||
.foreach(handlePushProtocolMessage(_)) | ||
} | ||
connection.read | ||
.mapError(RedisError.IOError(_)) | ||
.via(RespValue.decoder) | ||
.collectSome | ||
.mapZIO(resp => ZIO.attempt(PushProtocolOutput.unsafeDecode(resp))) | ||
.refineToOrDie[RedisError] | ||
.foreach(handlePushProtocolMessage(_)) | ||
} | ||
|
||
private def resubscribe: ZIO[BinaryCodec, RedisError, Unit] = | ||
ZIO.serviceWithZIO[BinaryCodec] { implicit codec => | ||
def makeCommand(name: String, keys: Set[String]) = | ||
RespValue.Array(StringInput.encode(name) ++ Varargs(StringInput).encode(keys)).serialize | ||
|
||
for { | ||
keySet <- pubSubHubsRef.get.map(_.keySet) | ||
(channels, patterns) = keySet.partition(_.isChannel) | ||
_ <- (connection.write(makeCommand(PubSub.Subscribe, channels.map(_.value))).when(channels.nonEmpty) *> | ||
connection.write(makeCommand(PubSub.PSubscribe, patterns.map(_.value))).when(patterns.nonEmpty)) | ||
.mapError(RedisError.IOError(_)) | ||
.retryWhile(True) | ||
} yield () | ||
} | ||
private def resubscribe: IO[RedisError, Unit] = { | ||
def makeCommand(name: String, keys: Set[String]) = | ||
RespValue.Array(StringInput.encode(name) ++ Varargs(StringInput).encode(keys)).serialize | ||
|
||
for { | ||
keySet <- pubSubHubsRef.get.map(_.keySet) | ||
(channels, patterns) = keySet.partition(_.isChannel) | ||
_ <- (connection.write(makeCommand(PubSub.Subscribe, channels.map(_.value))).when(channels.nonEmpty) *> | ||
connection.write(makeCommand(PubSub.PSubscribe, patterns.map(_.value))).when(patterns.nonEmpty)) | ||
.mapError(RedisError.IOError(_)) | ||
.retryWhile(True) | ||
} yield () | ||
} | ||
|
||
private def patternKey(key: String) = SubscriptionKey(key, true) | ||
private def channelKey(key: String) = SubscriptionKey(key, false) | ||
|
@@ -221,9 +215,9 @@ final class SingleNodeRedisPubSub( | |
* Opens a connection to the server and launches receive operations. All failures are retried by opening a new | ||
* connection. Only exits by interruption or defect. | ||
*/ | ||
val run: ZIO[BinaryCodec, RedisError, AnyVal] = | ||
val run: IO[RedisError, AnyVal] = | ||
ZIO.logTrace(s"$this Executable sender and reader has been started") *> | ||
(send.repeat[BinaryCodec, Long](Schedule.forever) race receive) | ||
(send.repeat(Schedule.forever) race receive) | ||
.tapError(e => ZIO.logWarning(s"Reconnecting due to error: $e") *> resubscribe) | ||
.retryWhile(True) | ||
.tapError(e => ZIO.logError(s"Executor exiting: $e")) | ||
|
@@ -242,12 +236,12 @@ object SingleNodeRedisPubSub { | |
|
||
private final val RequestQueueSize = 16 | ||
|
||
def create(conn: RedisConnection) = | ||
def create(conn: RedisConnection, codec: BinaryCodec) = | ||
for { | ||
hubRef <- Ref.make(Map.empty[SubscriptionKey, Hub[Take[RedisError, PushProtocol]]]) | ||
reqQueue <- Queue.bounded[Request](RequestQueueSize) | ||
resQueue <- Queue.unbounded[Promise[RedisError, PushProtocol]] | ||
pubSub = new SingleNodeRedisPubSub(hubRef, reqQueue, resQueue, conn) | ||
pubSub = new SingleNodeRedisPubSub(hubRef, reqQueue, resQueue, conn, codec) | ||
_ <- pubSub.run.forkScoped | ||
_ <- logScopeFinalizer(s"$pubSub Node PubSub is closed") | ||
} yield pubSub | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you do the encoding here and avoid passing the codec as a layer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it can be removed from
RedisPubSub
interface butSignleNodeRedisPubSub
needs codec for managing sub/unsub state and re-subscribing for failover so I moved codec intoSignleNodeRedisPubSub
's field