Skip to content

Commit

Permalink
Merge pull request #7 from davenverse/keyspaceBasedCache
Browse files Browse the repository at this point in the history
KeySpace Based Cache Updates
  • Loading branch information
ChristopherDavenport authored Nov 18, 2021
2 parents 697c33b + 0971bd4 commit 66a73d2
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 32 deletions.
9 changes: 5 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ lazy val core = project.in(file("core"))
"io.circe" %% "circe-core" % circeV,
"io.circe" %% "circe-parser" % circeV,

"io.chrisdavenport" %% "rediculous" % "0.1.0",
"io.chrisdavenport" %% "mapref" % "0.2.0-M2",
"io.chrisdavenport" %% "rediculous" % "0.2.0-M2",
"io.chrisdavenport" %% "mapref" % "0.2.1",
"io.chrisdavenport" %% "circuit" % "0.5.0-M1",
"io.chrisdavenport" %% "mules" % "0.5.0-M1",
"io.chrisdavenport" %% "mules" % "0.5.0-M2",
"io.chrisdavenport" %% "single-fibered" % "0.1.0",

// Deps we may use in the future, but don't need presently.
// "io.circe" %% "circe-generic" % circeV,
Expand Down Expand Up @@ -61,4 +62,4 @@ lazy val site = project.in(file("site"))
Seq(
micrositeDescription := "Redis Concurrency Structures",
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import cats.conversions._
import cats.effect._
import io.chrisdavenport.mules._
import io.chrisdavenport.rediculous._
import cats.effect.syntax.all._
import cats.data.Func
import io.chrisdavenport.singlefibered.SingleFibered

object RedisCache {

Expand All @@ -18,22 +21,30 @@ object RedisCache {
*
* inserts and deletes are proliferated to top and then bottom
*/
def layer[F[_]: Monad, K, V](top: Cache[F, K, V], bottom: Cache[F, K, V]): Cache[F, K, V] =
new LayeredCache[F, K, V](top, bottom)
def layer[F[_]: Concurrent, K, V](top: Cache[F, K, V], bottom: Cache[F, K, V]): F[Cache[F, K, V]] = {
val f: K => F[Option[V]] = {(k: K) => bottom.lookup(k).flatMap{
case None => Option.empty[V].pure[F]
case s@Some(v) =>
top.insert(k, v).as(s).widen
}
}
SingleFibered.prepareFunction(f).map(preppedF =>
new LayeredCache[F, K, V](top, bottom, preppedF)
)
}


private class LayeredCache[F[_]: Monad, K, V](
topLayer: Cache[F, K, V],
bottomLayer: Cache[F, K, V]
bottomLayer: Cache[F, K, V],
lookupCached: K => F[Option[V]]
) extends Cache[F, K, V]{
def lookup(k: K): F[Option[V]] =
topLayer.lookup(k).flatMap{
case s@Some(_) => s.pure[F].widen
case None => bottomLayer.lookup(k).flatMap{
case None => Option.empty[V].pure[F]
case s@Some(v) =>
topLayer.insert(k, v).as(s).widen
}
case None => lookupCached(k)
}


def insert(k: K, v: V): F[Unit] =
topLayer.insert(k, v) >>
Expand All @@ -45,6 +56,69 @@ object RedisCache {

}

def keySpacePubSubLayered[F[_]: Async](
topCache: Cache[F, String, String],
connection: RedisConnection[F],
namespace: String,
setOpts: RedisCommands.SetOpts,
additionalActionOnDelete: Option[String => F[Unit]] = None
): Resource[F, Cache[F, String, String]] = {
val nameSpaceStarter = namespace ++ ":"
RedisPubSub.fromConnection(
connection,
clusterBroadcast = true
).evalMap{ pubsub =>
def invalidateTopCache(message: RedisPubSub.PubSubMessage.PMessage): F[Unit] = {
val channel = message.channel
val msg = message.message
val keyR = ("__keyspace.*__:" + nameSpaceStarter + "(.*)").r
val parsed: String = channel match {
case keyR(key) => key
}
msg match {
case "set" | "expired" | "del" => topCache.delete(parsed) >> additionalActionOnDelete.traverse_(_.apply(message.message))
case _ => Concurrent[F].unit
}
}
pubsub.psubscribe(s"__keyspace*__:$nameSpaceStarter*", invalidateTopCache)
.as(pubsub)
}.flatMap(pubsub => pubsub.runMessages.background.void).evalMap {_ =>
val redis = instance(connection, namespace, setOpts)
layer(topCache, redis)
}
}

// Does not require any redis changes
// Does not see redis expirations so you will want expirations on the top
// cache to mirror the cache as best as possible as there will be some
// delays here.
def channelBasedLayered[F[_]: Async](
topCache: Cache[F, String, String],
connection: RedisConnection[F],
pubsub: RedisPubSub[F],
namespace: String,
setOpts: RedisCommands.SetOpts,
additionalActionOnDelete: Option[String => F[Unit]] = None
): Resource[F, Cache[F, String, String]] = {
val channel = namespace
val redis = instance(connection, namespace, setOpts)
def publishChange(key: String) = RedisCommands.publish(channel, key).run(connection)

Resource.eval(layer[F, String, String](topCache, redis)).flatMap{
case layered =>
Resource.eval(pubsub.subscribe(channel, {message: RedisPubSub.PubSubMessage.Message => topCache.delete(message.message) >> additionalActionOnDelete.traverse_(_.apply(message.message))})) >>
pubsub.runMessages.background.as{
new Cache[F, String, String]{
def lookup(k: String): F[Option[String]] = layered.lookup(k)

def insert(k: String, v: String): F[Unit] = layered.insert(k, v) >> publishChange(k).void

def delete(k: String): F[Unit] = layered.delete(k) >> publishChange(k).void
}
}
}
}

def instance[F[_]: Async](
connection: RedisConnection[F],
namespace: String,
Expand All @@ -66,7 +140,6 @@ object RedisCache {

def delete(k: String): F[Unit] =
RedisCommands.del(nameSpaceStarter ++ k).void.run(connection)

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ object resources {

def redisConnection: Resource[IO, RedisConnection[IO]] = for {
container <- redisContainer[IO].map(_.container)
connection <- RedisConnection.queued[IO](Network[IO], Host.fromString(container.getContainerIpAddress()).get, Port.fromInt(container.getMappedPort(6379)).get, maxQueued = 10000, workers = 4)
connection <- RedisConnection.queued[IO].withHost(Host.fromString(container.getContainerIpAddress()).get).withPort(Port.fromInt(container.getMappedPort(6379)).get).build
} yield connection


Expand Down
48 changes: 30 additions & 18 deletions examples/src/main/scala/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import fs2.io.net._
import scala.concurrent.duration._
import cats.syntax.SetOps
import com.comcast.ip4s._
import _root_.io.chrisdavenport.mapref.MapRef

object Main extends IOApp {

Expand All @@ -21,10 +22,13 @@ object Main extends IOApp {
// maxQueued: How many elements before new submissions semantically block. Tradeoff of memory to queue jobs.
// Default 1000 is good for small servers. But can easily take 100,000.
// workers: How many threads will process pipelined messages.
connection <- RedisConnection.queued[IO](Network[IO], host"localhost", port"6379", maxQueued = 10000, workers = 4)
} yield connection
connection <- RedisConnection.queued[IO].withHost(host"localhost").withPort(port"6379").withMaxQueued(10000).withWorkers(workers = 1).build
topCache <- Resource.eval(_root_.io.chrisdavenport.mules.MemoryCache.ofSingleImmutableMap[IO, String, String](None))
pubsub <- RedisPubSub.fromConnection(connection)
cache <- RedisCache.channelBasedLayered(topCache, connection, pubsub, "namespace2", RedisCommands.SetOpts(Some(60), None, None, false), {(s: String) => IO.println(s"Deleted: $s")}.some)
} yield (connection, pubsub, topCache, cache)

r.use{ connection =>
r.use{ case (connection, pubsub, top, cache) =>
// val ref = RedisRef.liftedDefaultStorage(
// RedisRef.lockedOptionRef(connection, "ref-test", 1.seconds, 10.seconds, RedisCommands.SetOpts(None, None, None, false)),
// "0"
Expand All @@ -33,20 +37,20 @@ object Main extends IOApp {
// val y = x + 1
// (y,y)
// }
val now = IO.delay(System.currentTimeMillis().millis)
def time[A](io: IO[A]): IO[A] =
(now, io, now).tupled.flatMap{
case (begin, out, end) =>
(end - begin).putStrLn.map(_ => out)
}

val barrier = RedisCyclicBarrier.create[IO](connection, "test-cyclic-barrier", 5, 1.seconds, 1.seconds, 10.millis, 20.minutes, RedisCommands.SetOpts(None, Some(24.hours.toMillis), None, false))

Stream.repeatEval(
time(
(barrier.await, IO.race(barrier.await, Temporal[IO].sleep(500.millis) >> barrier.await), barrier.await, Temporal[IO].sleep(1.second) >> barrier.await).parTupled
)
).take(10).compile.drain
// val now = IO.delay(System.currentTimeMillis().millis)
// def time[A](io: IO[A]): IO[A] =
// (now, io, now).tupled.flatMap{
// case (begin, out, end) =>
// (end - begin).putStrLn.map(_ => out)
// }

// val barrier = RedisCyclicBarrier.create[IO](connection, "test-cyclic-barrier", 5, 1.seconds, 1.seconds, 10.millis, 20.minutes, RedisCommands.SetOpts(None, Some(24.hours.toMillis), None, false))

// Stream.repeatEval(
// time(
// (barrier.await, IO.race(barrier.await, Temporal[IO].sleep(500.millis) >> barrier.await), barrier.await, Temporal[IO].sleep(1.second) >> barrier.await).parTupled
// )
// ).take(10).compile.drain


// // Layered Cache
Expand All @@ -55,7 +59,15 @@ object Main extends IOApp {
// val layered = RedisCache.layer(cache, cache2)

// RedisCommands.get[Redis[IO, *]]("namespace1:test3").run(connection).flatTap(_.putStrLn) >>
// cache2.insert("test3", "value1") >>
cache.insert("test3", "value1") >>
cache.lookup("test3").flatTap(IO.println(_)) >>
pubsub.publish("namespace2", "test3") >>
IO.sleep(1.second) >>
// RedisCommands.del[Redis[IO, *]]("namespace2:test3").run(connection).flatTap(IO.println(_)) >>
top.lookup("test3").flatTap(IO.println(_)) >>
cache.lookup("test3").flatTap(IO.println(_))
// Temporal[IO].sleep(30.seconds) >>

// layered.lookup("test3").flatTap(_.putStrLn) >>
// RedisCommands.get[Redis[IO, *]]("namespace1:test3").run(connection).flatTap(_.putStrLn) >>
// RedisCommands.get[Redis[IO, *]]("namespace2:test3").run(connection).flatTap(_.putStrLn)
Expand Down

0 comments on commit 66a73d2

Please sign in to comment.