From 921e71093399ae5779331dccd52bbfa459ec7f2d Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Thu, 14 Oct 2021 20:57:52 -0700 Subject: [PATCH 1/9] KeySpace Based Cache Updates --- build.sbt | 2 +- .../rediculous/concurrent/RedisCache.scala | 35 ++++++++++++++- examples/src/main/scala/Main.scala | 44 +++++++++++-------- 3 files changed, 61 insertions(+), 20 deletions(-) diff --git a/build.sbt b/build.sbt index 9d44bf2..4c9ba30 100644 --- a/build.sbt +++ b/build.sbt @@ -26,7 +26,7 @@ lazy val core = project.in(file("core")) "io.circe" %% "circe-core" % circeV, "io.circe" %% "circe-parser" % circeV, - "io.chrisdavenport" %% "rediculous" % "0.1.0", + "io.chrisdavenport" %% "rediculous" % "0.1.1+14-89573308-SNAPSHOT", "io.chrisdavenport" %% "mapref" % "0.2.0-M2", "io.chrisdavenport" %% "circuit" % "0.5.0-M1", "io.chrisdavenport" %% "mules" % "0.5.0-M1", diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala b/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala index 69e5d9b..f7ee4c9 100644 --- a/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala +++ b/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala @@ -6,6 +6,7 @@ import cats.conversions._ import cats.effect._ import io.chrisdavenport.mules._ import io.chrisdavenport.rediculous._ +import cats.effect.syntax.all._ object RedisCache { @@ -45,6 +46,39 @@ object RedisCache { } + def keySpacePubSubLayered[F[_]: Async]( + topCache: Cache[F, String, String], + connection: RedisConnection[F], + namespace: String, + setOpts: RedisCommands.SetOpts + ): Resource[F, Cache[F, String, String]] = { + val nameSpaceStarter = namespace ++ ":" + RedisPubSub.fromConnection( + connection, + 4096, + Function.const(Applicative[F].unit), + Function.const(Applicative[F].unit) + ).evalMap{ pubsub => + def invalidateTopCache(message: RedisPubSub.PubSubMessage.PMessage): F[Unit] = { + val channel = message.channel + val msg = message.message + val keyR = ("__keyspace.*__:" + nameSpaceStarter + "(.*)").r + val parsed: String = channel match { + case keyR(key) => key + } + msg match { + case "set" | "expired" | "del" => topCache.delete(parsed) >> Concurrent[F].unit.map(_ => println(s"Deleted $parsed")) + case _ => Concurrent[F].unit.map(_ => println(s"Unhandled $message")) + } + } + pubsub.psubscribe(s"__keyspace*__:$nameSpaceStarter*", invalidateTopCache) + .as(pubsub) + }.flatMap(pubsub => pubsub.runMessages.background.void).as{ + val redis = instance(connection, namespace, setOpts) + layer(topCache, redis) + } + } + def instance[F[_]: Async]( connection: RedisConnection[F], namespace: String, @@ -66,7 +100,6 @@ object RedisCache { def delete(k: String): F[Unit] = RedisCommands.del(nameSpaceStarter ++ k).void.run(connection) - } } \ No newline at end of file diff --git a/examples/src/main/scala/Main.scala b/examples/src/main/scala/Main.scala index 8c5b8e7..9ed655d 100644 --- a/examples/src/main/scala/Main.scala +++ b/examples/src/main/scala/Main.scala @@ -9,6 +9,7 @@ import fs2.io.net._ import scala.concurrent.duration._ import cats.syntax.SetOps import com.comcast.ip4s._ +import _root_.io.chrisdavenport.mapref.MapRef object Main extends IOApp { @@ -21,10 +22,12 @@ object Main extends IOApp { // maxQueued: How many elements before new submissions semantically block. Tradeoff of memory to queue jobs. // Default 1000 is good for small servers. But can easily take 100,000. // workers: How many threads will process pipelined messages. - connection <- RedisConnection.queued[IO](Network[IO], host"localhost", port"6379", maxQueued = 10000, workers = 4) - } yield connection + connection <- RedisConnection.queued[IO].withHost(host"localhost").withPort(port"6379").withMaxQueued(10000).withWorkers(workers = 1).build + topCache <- Resource.eval(_root_.io.chrisdavenport.mules.MemoryCache.ofSingleImmutableMap[IO, String, String](None)) + cache <- RedisCache.keySpacePubSubLayered(topCache, connection, "namespace2", RedisCommands.SetOpts(Some(60), None, None, false)) + } yield (connection, topCache, cache) - r.use{ connection => + r.use{ case (connection, top, cache) => // val ref = RedisRef.liftedDefaultStorage( // RedisRef.lockedOptionRef(connection, "ref-test", 1.seconds, 10.seconds, RedisCommands.SetOpts(None, None, None, false)), // "0" @@ -33,20 +36,20 @@ object Main extends IOApp { // val y = x + 1 // (y,y) // } - val now = IO.delay(System.currentTimeMillis().millis) - def time[A](io: IO[A]): IO[A] = - (now, io, now).tupled.flatMap{ - case (begin, out, end) => - (end - begin).putStrLn.map(_ => out) - } - - val barrier = RedisCyclicBarrier.create[IO](connection, "test-cyclic-barrier", 5, 1.seconds, 1.seconds, 10.millis, 20.minutes, RedisCommands.SetOpts(None, Some(24.hours.toMillis), None, false)) - - Stream.repeatEval( - time( - (barrier.await, IO.race(barrier.await, Temporal[IO].sleep(500.millis) >> barrier.await), barrier.await, Temporal[IO].sleep(1.second) >> barrier.await).parTupled - ) - ).take(10).compile.drain + // val now = IO.delay(System.currentTimeMillis().millis) + // def time[A](io: IO[A]): IO[A] = + // (now, io, now).tupled.flatMap{ + // case (begin, out, end) => + // (end - begin).putStrLn.map(_ => out) + // } + + // val barrier = RedisCyclicBarrier.create[IO](connection, "test-cyclic-barrier", 5, 1.seconds, 1.seconds, 10.millis, 20.minutes, RedisCommands.SetOpts(None, Some(24.hours.toMillis), None, false)) + + // Stream.repeatEval( + // time( + // (barrier.await, IO.race(barrier.await, Temporal[IO].sleep(500.millis) >> barrier.await), barrier.await, Temporal[IO].sleep(1.second) >> barrier.await).parTupled + // ) + // ).take(10).compile.drain // // Layered Cache @@ -55,7 +58,12 @@ object Main extends IOApp { // val layered = RedisCache.layer(cache, cache2) // RedisCommands.get[Redis[IO, *]]("namespace1:test3").run(connection).flatTap(_.putStrLn) >> - // cache2.insert("test3", "value1") >> + cache.insert("test3", "value1") >> + cache.lookup("test3").flatTap(IO.println(_)) >> + RedisCommands.del[Redis[IO, *]]("namespace2:test3").run(connection).flatTap(IO.println(_)) >> + cache.lookup("test3").flatTap(IO.println(_)) + // Temporal[IO].sleep(30.seconds) >> + // layered.lookup("test3").flatTap(_.putStrLn) >> // RedisCommands.get[Redis[IO, *]]("namespace1:test3").run(connection).flatTap(_.putStrLn) >> // RedisCommands.get[Redis[IO, *]]("namespace2:test3").run(connection).flatTap(_.putStrLn) From 4f94aed2ba72feb91b56bde7f83ba5e116184f9d Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Fri, 15 Oct 2021 08:28:50 -0700 Subject: [PATCH 2/9] Add Channel Based Cache for no redis-changes required --- build.sbt | 1 + .../rediculous/concurrent/RedisCache.scala | 30 +++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/build.sbt b/build.sbt index 4c9ba30..5015cf3 100644 --- a/build.sbt +++ b/build.sbt @@ -30,6 +30,7 @@ lazy val core = project.in(file("core")) "io.chrisdavenport" %% "mapref" % "0.2.0-M2", "io.chrisdavenport" %% "circuit" % "0.5.0-M1", "io.chrisdavenport" %% "mules" % "0.5.0-M1", + "io.chrisdavenport" %% "single-fibered" % "0.1.0", // Deps we may use in the future, but don't need presently. // "io.circe" %% "circe-generic" % circeV, diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala b/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala index f7ee4c9..b2fae2f 100644 --- a/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala +++ b/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala @@ -7,6 +7,7 @@ import cats.effect._ import io.chrisdavenport.mules._ import io.chrisdavenport.rediculous._ import cats.effect.syntax.all._ +import cats.data.Func object RedisCache { @@ -79,6 +80,35 @@ object RedisCache { } } + // Does not require any redis changes + // Does not see redis expirations so you will want expirations on the top + // cache to mirror the cache as best as possible as there will be some + // delays here. + def channelBasedLayered[F[_]: Async]( + topCache: Cache[F, String, String], + connection: RedisConnection[F], + namespace: String, + setOpts: RedisCommands.SetOpts + ): Resource[F, Cache[F, String, String]] = { + val channel = namespace + val redis = instance(connection, namespace, setOpts) + val layered = layer(topCache, redis) + def publishChange(key: String): F[Int] = + RedisConnection.runRequestTotal[F, Int](cats.data.NonEmptyList.of("publish", channel, key), None).run(connection) + val cache = new Cache[F, String, String]{ + def lookup(k: String): F[Option[String]] = layered.lookup(k) + + def insert(k: String, v: String): F[Unit] = layered.insert(k, v) >> publishChange(k).void + + def delete(k: String): F[Unit] = layered.delete(k) >> publishChange(k).void + } + RedisPubSub.fromConnection(connection, 4096, Function.const(Applicative[F].unit), Function.const(Applicative[F].unit)).flatMap{ + pubsub => + Resource.eval(pubsub.subscribe(channel, {message: RedisPubSub.PubSubMessage.Message => topCache.delete(message.message)})) >> + pubsub.runMessages.background.as(cache) + } + } + def instance[F[_]: Async]( connection: RedisConnection[F], namespace: String, From 1adfde3dcd74bf5ca8a8bf9587f40054fcd83d48 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Wed, 17 Nov 2021 09:51:38 -0800 Subject: [PATCH 3/9] Use Milestone Cache --- build.sbt | 4 ++-- .../rediculous/concurrent/RedisCache.scala | 10 +++------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/build.sbt b/build.sbt index 5015cf3..ccfd4e8 100644 --- a/build.sbt +++ b/build.sbt @@ -26,7 +26,7 @@ lazy val core = project.in(file("core")) "io.circe" %% "circe-core" % circeV, "io.circe" %% "circe-parser" % circeV, - "io.chrisdavenport" %% "rediculous" % "0.1.1+14-89573308-SNAPSHOT", + "io.chrisdavenport" %% "rediculous" % "0.2.0-M2", "io.chrisdavenport" %% "mapref" % "0.2.0-M2", "io.chrisdavenport" %% "circuit" % "0.5.0-M1", "io.chrisdavenport" %% "mules" % "0.5.0-M1", @@ -62,4 +62,4 @@ lazy val site = project.in(file("site")) Seq( micrositeDescription := "Redis Concurrency Structures", ) - } \ No newline at end of file + } diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala b/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala index b2fae2f..c632a23 100644 --- a/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala +++ b/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala @@ -55,10 +55,7 @@ object RedisCache { ): Resource[F, Cache[F, String, String]] = { val nameSpaceStarter = namespace ++ ":" RedisPubSub.fromConnection( - connection, - 4096, - Function.const(Applicative[F].unit), - Function.const(Applicative[F].unit) + connection ).evalMap{ pubsub => def invalidateTopCache(message: RedisPubSub.PubSubMessage.PMessage): F[Unit] = { val channel = message.channel @@ -93,8 +90,7 @@ object RedisCache { val channel = namespace val redis = instance(connection, namespace, setOpts) val layered = layer(topCache, redis) - def publishChange(key: String): F[Int] = - RedisConnection.runRequestTotal[F, Int](cats.data.NonEmptyList.of("publish", channel, key), None).run(connection) + def publishChange(key: String) = RedisCommands.publish(channel, key).run(connection) val cache = new Cache[F, String, String]{ def lookup(k: String): F[Option[String]] = layered.lookup(k) @@ -102,7 +98,7 @@ object RedisCache { def delete(k: String): F[Unit] = layered.delete(k) >> publishChange(k).void } - RedisPubSub.fromConnection(connection, 4096, Function.const(Applicative[F].unit), Function.const(Applicative[F].unit)).flatMap{ + RedisPubSub.fromConnection(connection).flatMap{ pubsub => Resource.eval(pubsub.subscribe(channel, {message: RedisPubSub.PubSubMessage.Message => topCache.delete(message.message)})) >> pubsub.runMessages.background.as(cache) From e132816773a81a10d888cb51a48f29bc3526464d Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Wed, 17 Nov 2021 10:06:21 -0800 Subject: [PATCH 4/9] Single-Fiber the machine --- .../rediculous/concurrent/RedisCache.scala | 55 +++++++++++-------- 1 file changed, 32 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala b/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala index c632a23..0da4db7 100644 --- a/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala +++ b/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala @@ -8,6 +8,7 @@ import io.chrisdavenport.mules._ import io.chrisdavenport.rediculous._ import cats.effect.syntax.all._ import cats.data.Func +import io.chrisdavenport.singlefibered.SingleFibered object RedisCache { @@ -20,22 +21,29 @@ object RedisCache { * * inserts and deletes are proliferated to top and then bottom */ - def layer[F[_]: Monad, K, V](top: Cache[F, K, V], bottom: Cache[F, K, V]): Cache[F, K, V] = - new LayeredCache[F, K, V](top, bottom) + def layer[F[_]: Concurrent, K, V](top: Cache[F, K, V], bottom: Cache[F, K, V]): F[Cache[F, K, V]] = { + val f: K => F[Option[V]] = {(k: K) => top.lookup(k).flatMap{ + case s@Some(_) => s.pure[F].widen + case None => bottom.lookup(k).flatMap{ + case None => Option.empty[V].pure[F] + case s@Some(v) => + top.insert(k, v).as(s).widen + } + }} + SingleFibered.prepareFunction(f).map(preppedF => + new LayeredCache[F, K, V](top, bottom, preppedF) + ) + } + private class LayeredCache[F[_]: Monad, K, V]( topLayer: Cache[F, K, V], - bottomLayer: Cache[F, K, V] + bottomLayer: Cache[F, K, V], + lookupCached: K => F[Option[V]] ) extends Cache[F, K, V]{ def lookup(k: K): F[Option[V]] = - topLayer.lookup(k).flatMap{ - case s@Some(_) => s.pure[F].widen - case None => bottomLayer.lookup(k).flatMap{ - case None => Option.empty[V].pure[F] - case s@Some(v) => - topLayer.insert(k, v).as(s).widen - } - } + lookupCached(k) + def insert(k: K, v: V): F[Unit] = topLayer.insert(k, v) >> @@ -71,7 +79,7 @@ object RedisCache { } pubsub.psubscribe(s"__keyspace*__:$nameSpaceStarter*", invalidateTopCache) .as(pubsub) - }.flatMap(pubsub => pubsub.runMessages.background.void).as{ + }.flatMap(pubsub => pubsub.runMessages.background.void).evalMap {_ => val redis = instance(connection, namespace, setOpts) layer(topCache, redis) } @@ -89,19 +97,20 @@ object RedisCache { ): Resource[F, Cache[F, String, String]] = { val channel = namespace val redis = instance(connection, namespace, setOpts) - val layered = layer(topCache, redis) def publishChange(key: String) = RedisCommands.publish(channel, key).run(connection) - val cache = new Cache[F, String, String]{ - def lookup(k: String): F[Option[String]] = layered.lookup(k) - - def insert(k: String, v: String): F[Unit] = layered.insert(k, v) >> publishChange(k).void - - def delete(k: String): F[Unit] = layered.delete(k) >> publishChange(k).void - } - RedisPubSub.fromConnection(connection).flatMap{ - pubsub => + + (Resource.eval(layer[F, String, String](topCache, redis)), RedisPubSub.fromConnection(connection)).tupled.flatMap{ + case (layered, pubsub) => Resource.eval(pubsub.subscribe(channel, {message: RedisPubSub.PubSubMessage.Message => topCache.delete(message.message)})) >> - pubsub.runMessages.background.as(cache) + pubsub.runMessages.background.as{ + new Cache[F, String, String]{ + def lookup(k: String): F[Option[String]] = layered.lookup(k) + + def insert(k: String, v: String): F[Unit] = layered.insert(k, v) >> publishChange(k).void + + def delete(k: String): F[Unit] = layered.delete(k) >> publishChange(k).void + } + } } } From 47fdb86b4c7f16013b7fa0599baea8626f77743a Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Wed, 17 Nov 2021 15:41:42 -0800 Subject: [PATCH 5/9] Add Additional Key Actions --- .../rediculous/concurrent/RedisCache.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala b/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala index 0da4db7..aaf5418 100644 --- a/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala +++ b/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala @@ -59,7 +59,8 @@ object RedisCache { topCache: Cache[F, String, String], connection: RedisConnection[F], namespace: String, - setOpts: RedisCommands.SetOpts + setOpts: RedisCommands.SetOpts, + additionalActionOnDelete: Option[String => F[Unit]] = None ): Resource[F, Cache[F, String, String]] = { val nameSpaceStarter = namespace ++ ":" RedisPubSub.fromConnection( @@ -73,8 +74,8 @@ object RedisCache { case keyR(key) => key } msg match { - case "set" | "expired" | "del" => topCache.delete(parsed) >> Concurrent[F].unit.map(_ => println(s"Deleted $parsed")) - case _ => Concurrent[F].unit.map(_ => println(s"Unhandled $message")) + case "set" | "expired" | "del" => topCache.delete(parsed) >> additionalActionOnDelete.traverse_(_.apply(message.message)) + case _ => Concurrent[F].unit } } pubsub.psubscribe(s"__keyspace*__:$nameSpaceStarter*", invalidateTopCache) @@ -93,7 +94,8 @@ object RedisCache { topCache: Cache[F, String, String], connection: RedisConnection[F], namespace: String, - setOpts: RedisCommands.SetOpts + setOpts: RedisCommands.SetOpts, + additionalActionOnDelete: Option[String => F[Unit]] = None ): Resource[F, Cache[F, String, String]] = { val channel = namespace val redis = instance(connection, namespace, setOpts) @@ -101,7 +103,7 @@ object RedisCache { (Resource.eval(layer[F, String, String](topCache, redis)), RedisPubSub.fromConnection(connection)).tupled.flatMap{ case (layered, pubsub) => - Resource.eval(pubsub.subscribe(channel, {message: RedisPubSub.PubSubMessage.Message => topCache.delete(message.message)})) >> + Resource.eval(pubsub.subscribe(channel, {message: RedisPubSub.PubSubMessage.Message => topCache.delete(message.message) >> additionalActionOnDelete.traverse_(_.apply(message.message))})) >> pubsub.runMessages.background.as{ new Cache[F, String, String]{ def lookup(k: String): F[Option[String]] = layered.lookup(k) From 64138023cbcb7f3fa3990ec0fabf13d084947f35 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Wed, 17 Nov 2021 15:49:58 -0800 Subject: [PATCH 6/9] Fix test scope --- .../io/chrisdavenport/rediculous/concurrent/resources.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/io/chrisdavenport/rediculous/concurrent/resources.scala b/core/src/test/scala/io/chrisdavenport/rediculous/concurrent/resources.scala index 9951ced..8357ce7 100644 --- a/core/src/test/scala/io/chrisdavenport/rediculous/concurrent/resources.scala +++ b/core/src/test/scala/io/chrisdavenport/rediculous/concurrent/resources.scala @@ -18,7 +18,7 @@ object resources { def redisConnection: Resource[IO, RedisConnection[IO]] = for { container <- redisContainer[IO].map(_.container) - connection <- RedisConnection.queued[IO](Network[IO], Host.fromString(container.getContainerIpAddress()).get, Port.fromInt(container.getMappedPort(6379)).get, maxQueued = 10000, workers = 4) + connection <- RedisConnection.queued[IO].withHost(Host.fromString(container.getContainerIpAddress()).get).withPort(Port.fromInt(container.getMappedPort(6379)).get).build } yield connection From 9fdbd2a3bf8f2307e3172f076c3d0cc117b56685 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Wed, 17 Nov 2021 16:02:38 -0800 Subject: [PATCH 7/9] Only SingleFiber Bottom Cache Access --- .../rediculous/concurrent/RedisCache.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala b/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala index aaf5418..811b197 100644 --- a/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala +++ b/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala @@ -22,14 +22,12 @@ object RedisCache { * inserts and deletes are proliferated to top and then bottom */ def layer[F[_]: Concurrent, K, V](top: Cache[F, K, V], bottom: Cache[F, K, V]): F[Cache[F, K, V]] = { - val f: K => F[Option[V]] = {(k: K) => top.lookup(k).flatMap{ - case s@Some(_) => s.pure[F].widen - case None => bottom.lookup(k).flatMap{ + val f: K => F[Option[V]] = {(k: K) => bottom.lookup(k).flatMap{ case None => Option.empty[V].pure[F] case s@Some(v) => top.insert(k, v).as(s).widen } - }} + } SingleFibered.prepareFunction(f).map(preppedF => new LayeredCache[F, K, V](top, bottom, preppedF) ) @@ -42,7 +40,10 @@ object RedisCache { lookupCached: K => F[Option[V]] ) extends Cache[F, K, V]{ def lookup(k: K): F[Option[V]] = - lookupCached(k) + topLayer.lookup(k).flatMap{ + case s@Some(_) => s.pure[F].widen + case None => lookupCached(k) + } def insert(k: K, v: V): F[Unit] = From 76e45bf1cb9fae5d2e8ef0d249baad79a1e65c4f Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Wed, 17 Nov 2021 16:10:30 -0800 Subject: [PATCH 8/9] Pubsub Interactions more open --- .../chrisdavenport/rediculous/concurrent/RedisCache.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala b/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala index 811b197..f90fe79 100644 --- a/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala +++ b/core/src/main/scala/io/chrisdavenport/rediculous/concurrent/RedisCache.scala @@ -65,7 +65,8 @@ object RedisCache { ): Resource[F, Cache[F, String, String]] = { val nameSpaceStarter = namespace ++ ":" RedisPubSub.fromConnection( - connection + connection, + clusterBroadcast = true ).evalMap{ pubsub => def invalidateTopCache(message: RedisPubSub.PubSubMessage.PMessage): F[Unit] = { val channel = message.channel @@ -94,6 +95,7 @@ object RedisCache { def channelBasedLayered[F[_]: Async]( topCache: Cache[F, String, String], connection: RedisConnection[F], + pubsub: RedisPubSub[F], namespace: String, setOpts: RedisCommands.SetOpts, additionalActionOnDelete: Option[String => F[Unit]] = None @@ -102,8 +104,8 @@ object RedisCache { val redis = instance(connection, namespace, setOpts) def publishChange(key: String) = RedisCommands.publish(channel, key).run(connection) - (Resource.eval(layer[F, String, String](topCache, redis)), RedisPubSub.fromConnection(connection)).tupled.flatMap{ - case (layered, pubsub) => + Resource.eval(layer[F, String, String](topCache, redis)).flatMap{ + case layered => Resource.eval(pubsub.subscribe(channel, {message: RedisPubSub.PubSubMessage.Message => topCache.delete(message.message) >> additionalActionOnDelete.traverse_(_.apply(message.message))})) >> pubsub.runMessages.background.as{ new Cache[F, String, String]{ From 0971bd45738cdf3525691e63e3b294533d67b358 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Wed, 17 Nov 2021 17:16:09 -0800 Subject: [PATCH 9/9] Example Show working channel based --- build.sbt | 4 ++-- examples/src/main/scala/Main.scala | 14 +++++++++----- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/build.sbt b/build.sbt index ccfd4e8..2fc9218 100644 --- a/build.sbt +++ b/build.sbt @@ -27,9 +27,9 @@ lazy val core = project.in(file("core")) "io.circe" %% "circe-parser" % circeV, "io.chrisdavenport" %% "rediculous" % "0.2.0-M2", - "io.chrisdavenport" %% "mapref" % "0.2.0-M2", + "io.chrisdavenport" %% "mapref" % "0.2.1", "io.chrisdavenport" %% "circuit" % "0.5.0-M1", - "io.chrisdavenport" %% "mules" % "0.5.0-M1", + "io.chrisdavenport" %% "mules" % "0.5.0-M2", "io.chrisdavenport" %% "single-fibered" % "0.1.0", // Deps we may use in the future, but don't need presently. diff --git a/examples/src/main/scala/Main.scala b/examples/src/main/scala/Main.scala index 9ed655d..c5e0dd0 100644 --- a/examples/src/main/scala/Main.scala +++ b/examples/src/main/scala/Main.scala @@ -24,10 +24,11 @@ object Main extends IOApp { // workers: How many threads will process pipelined messages. connection <- RedisConnection.queued[IO].withHost(host"localhost").withPort(port"6379").withMaxQueued(10000).withWorkers(workers = 1).build topCache <- Resource.eval(_root_.io.chrisdavenport.mules.MemoryCache.ofSingleImmutableMap[IO, String, String](None)) - cache <- RedisCache.keySpacePubSubLayered(topCache, connection, "namespace2", RedisCommands.SetOpts(Some(60), None, None, false)) - } yield (connection, topCache, cache) + pubsub <- RedisPubSub.fromConnection(connection) + cache <- RedisCache.channelBasedLayered(topCache, connection, pubsub, "namespace2", RedisCommands.SetOpts(Some(60), None, None, false), {(s: String) => IO.println(s"Deleted: $s")}.some) + } yield (connection, pubsub, topCache, cache) - r.use{ case (connection, top, cache) => + r.use{ case (connection, pubsub, top, cache) => // val ref = RedisRef.liftedDefaultStorage( // RedisRef.lockedOptionRef(connection, "ref-test", 1.seconds, 10.seconds, RedisCommands.SetOpts(None, None, None, false)), // "0" @@ -60,8 +61,11 @@ object Main extends IOApp { // RedisCommands.get[Redis[IO, *]]("namespace1:test3").run(connection).flatTap(_.putStrLn) >> cache.insert("test3", "value1") >> cache.lookup("test3").flatTap(IO.println(_)) >> - RedisCommands.del[Redis[IO, *]]("namespace2:test3").run(connection).flatTap(IO.println(_)) >> - cache.lookup("test3").flatTap(IO.println(_)) + pubsub.publish("namespace2", "test3") >> + IO.sleep(1.second) >> + // RedisCommands.del[Redis[IO, *]]("namespace2:test3").run(connection).flatTap(IO.println(_)) >> + top.lookup("test3").flatTap(IO.println(_)) >> + cache.lookup("test3").flatTap(IO.println(_)) // Temporal[IO].sleep(30.seconds) >> // layered.lookup("test3").flatTap(_.putStrLn) >>