diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala index 548cb834..a7fb8ea5 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala @@ -82,7 +82,32 @@ object Sql { } } - final class Cache { + object Cache { + def apply(dataPartitionsEnabled: Boolean): Cache = + if (dataPartitionsEnabled) new CacheBySlice + else new CacheIgnoringSlice + } + + sealed trait Cache { + def get(slice: Int, key: Any)(orCreate: => String): String + } + + private final class CacheIgnoringSlice extends Cache { + private var entries: Map[Any, String] = Map.empty + + def get(slice: Int, key: Any)(orCreate: => String): String = { + entries.get(key) match { + case Some(value) => value + case None => + // it's just a cache so no need for guarding concurrent updates + val entry = orCreate + entries = entries.updated(key, entry) + entry + } + } + } + + private final class CacheBySlice extends Cache { private var entriesPerSlice: IntMap[Map[Any, String]] = IntMap.empty def get(slice: Int, key: Any)(orCreate: => String): String = { diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala index 8b2589af..56ab776b 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala @@ -71,7 +71,7 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider) protected val persistenceExt: Persistence = Persistence(system) - private val sqlCache = new Sql.Cache + private val sqlCache = Sql.Cache(settings.numberOfDataPartitions > 1) protected def journalTable(slice: Int): String = settings.journalTableWithSchema(slice)