diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala index d81a5f4c..548cb834 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala @@ -5,6 +5,7 @@ package akka.persistence.r2dbc.internal import scala.annotation.varargs +import scala.collection.immutable.IntMap import akka.annotation.InternalApi import akka.annotation.InternalStableApi @@ -81,4 +82,29 @@ object Sql { } } + final class Cache { + private var entriesPerSlice: IntMap[Map[Any, String]] = IntMap.empty + + def get(slice: Int, key: Any)(orCreate: => String): String = { + + def createEntry(entries: Map[Any, String]): String = { + // it's just a cache so no need for guarding concurrent updates + val entry = orCreate + val newEntries = entries.updated(key, entry) + entriesPerSlice = entriesPerSlice.updated(slice, newEntries) + entry + } + + entriesPerSlice.get(slice) match { + case Some(entries) => + entries.get(key) match { + case Some(value) => value + case None => createEntry(entries) + } + case None => + createEntry(Map.empty) + } + } + } + } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala index ae20ca3b..8b2589af 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala @@ -25,6 +25,7 @@ import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.R2dbcExecutor import akka.persistence.r2dbc.internal.R2dbcExecutorProvider import akka.persistence.r2dbc.internal.SerializedEventMetadata +import akka.persistence.r2dbc.internal.Sql import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement @@ -70,25 +71,29 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider) protected val persistenceExt: Persistence = Persistence(system) + private val sqlCache = new Sql.Cache + protected def journalTable(slice: Int): String = settings.journalTableWithSchema(slice) - protected def insertEventWithParameterTimestampSql(slice: Int): String = { - val table = journalTable(slice) - val baseSql = insertEvenBaseSql(table) - if (settings.dbTimestampMonotonicIncreasing) - sql"$baseSql ?) RETURNING db_timestamp" - else - sql"$baseSql GREATEST(?, ${timestampSubSelect(table)})) RETURNING db_timestamp" - } + protected def insertEventWithParameterTimestampSql(slice: Int): String = + sqlCache.get(slice, "insertEventWithParameterTimestampSql") { + val table = journalTable(slice) + val baseSql = insertEvenBaseSql(table) + if (settings.dbTimestampMonotonicIncreasing) + sql"$baseSql ?) RETURNING db_timestamp" + else + sql"$baseSql GREATEST(?, ${timestampSubSelect(table)})) RETURNING db_timestamp" + } - private def insertEventWithTransactionTimestampSql(slice: Int) = { - val table = journalTable(slice) - val baseSql = insertEvenBaseSql(table) - if (settings.dbTimestampMonotonicIncreasing) - sql"$baseSql CURRENT_TIMESTAMP) RETURNING db_timestamp" - else - sql"$baseSql GREATEST(CURRENT_TIMESTAMP, ${timestampSubSelect(table)})) RETURNING db_timestamp" - } + private def insertEventWithTransactionTimestampSql(slice: Int) = + sqlCache.get(slice, "insertEventWithTransactionTimestampSql") { + val table = journalTable(slice) + val baseSql = insertEvenBaseSql(table) + if (settings.dbTimestampMonotonicIncreasing) + sql"$baseSql CURRENT_TIMESTAMP) RETURNING db_timestamp" + else + sql"$baseSql GREATEST(CURRENT_TIMESTAMP, ${timestampSubSelect(table)})) RETURNING db_timestamp" + } private def insertEvenBaseSql(table: String) = { s"INSERT INTO $table " + @@ -103,31 +108,50 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider) s"(SELECT db_timestamp + '1 microsecond'::interval FROM $table " + "WHERE persistence_id = ? AND seq_nr = ?)" - private def selectHighestSequenceNrSql(slice: Int) = sql""" - SELECT MAX(seq_nr) from ${journalTable(slice)} - WHERE persistence_id = ? AND seq_nr >= ?""" + private def selectHighestSequenceNrSql(slice: Int) = + sqlCache.get(slice, "selectHighestSequenceNrSql") { + sql""" + SELECT MAX(seq_nr) from ${journalTable(slice)} + WHERE persistence_id = ? AND seq_nr >= ?""" + } private def selectLowestSequenceNrSql(slice: Int) = - sql""" - SELECT MIN(seq_nr) from ${journalTable(slice)} - WHERE persistence_id = ?""" - - private def deleteEventsSql(slice: Int) = sql""" - DELETE FROM ${journalTable(slice)} - WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?""" - - protected def insertDeleteMarkerSql(slice: Int, timestamp: String = "CURRENT_TIMESTAMP"): String = sql""" - INSERT INTO ${journalTable(slice)} - (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, deleted) - VALUES (?, ?, ?, ?, $timestamp, ?, ?, ?, ?, ?, ?)""" - - private def deleteEventsByPersistenceIdBeforeTimestampSql(slice: Int) = sql""" - DELETE FROM ${journalTable(slice)} - WHERE persistence_id = ? AND db_timestamp < ?""" - - private def deleteEventsBySliceBeforeTimestampSql(slice: Int) = sql""" - DELETE FROM ${journalTable(slice)} - WHERE slice = ? AND entity_type = ? AND db_timestamp < ?""" + sqlCache.get(slice, "selectLowestSequenceNrSql") { + sql""" + SELECT MIN(seq_nr) from ${journalTable(slice)} + WHERE persistence_id = ?""" + } + + private def deleteEventsSql(slice: Int) = + sqlCache.get(slice, "deleteEventsSql") { + sql""" + DELETE FROM ${journalTable(slice)} + WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?""" + } + + protected def insertDeleteMarkerSql(slice: Int, timestamp: String = "CURRENT_TIMESTAMP"): String = { + // timestamp param doesn't have to be part of cache key because it's just different for different dialects + sqlCache.get(slice, "insertDeleteMarkerSql") { + sql""" + INSERT INTO ${journalTable(slice)} + (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, deleted) + VALUES (?, ?, ?, ?, $timestamp, ?, ?, ?, ?, ?, ?)""" + } + } + + private def deleteEventsByPersistenceIdBeforeTimestampSql(slice: Int) = + sqlCache.get(slice, "deleteEventsByPersistenceIdBeforeTimestampSql") { + sql""" + DELETE FROM ${journalTable(slice)} + WHERE persistence_id = ? AND db_timestamp < ?""" + } + + private def deleteEventsBySliceBeforeTimestampSql(slice: Int) = + sqlCache.get(slice, "deleteEventsBySliceBeforeTimestampSql") { + sql""" + DELETE FROM ${journalTable(slice)} + WHERE slice = ? AND entity_type = ? AND db_timestamp < ?""" + } /** * All events must be for the same persistenceId.