diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala index d81a5f4c..a7fb8ea5 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala @@ -5,6 +5,7 @@ package akka.persistence.r2dbc.internal import scala.annotation.varargs +import scala.collection.immutable.IntMap import akka.annotation.InternalApi import akka.annotation.InternalStableApi @@ -81,4 +82,54 @@ object Sql { } } + object Cache { + def apply(dataPartitionsEnabled: Boolean): Cache = + if (dataPartitionsEnabled) new CacheBySlice + else new CacheIgnoringSlice + } + + sealed trait Cache { + def get(slice: Int, key: Any)(orCreate: => String): String + } + + private final class CacheIgnoringSlice extends Cache { + private var entries: Map[Any, String] = Map.empty + + def get(slice: Int, key: Any)(orCreate: => String): String = { + entries.get(key) match { + case Some(value) => value + case None => + // it's just a cache so no need for guarding concurrent updates + val entry = orCreate + entries = entries.updated(key, entry) + entry + } + } + } + + private final class CacheBySlice extends Cache { + private var entriesPerSlice: IntMap[Map[Any, String]] = IntMap.empty + + def get(slice: Int, key: Any)(orCreate: => String): String = { + + def createEntry(entries: Map[Any, String]): String = { + // it's just a cache so no need for guarding concurrent updates + val entry = orCreate + val newEntries = entries.updated(key, entry) + entriesPerSlice = entriesPerSlice.updated(slice, newEntries) + entry + } + + entriesPerSlice.get(slice) match { + case Some(entries) => + entries.get(key) match { + case Some(value) => value + case None => createEntry(entries) + } + case None => + createEntry(Map.empty) + } + } + } + } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala index 344becbd..6a548399 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala @@ -18,6 +18,7 @@ import akka.dispatch.ExecutionContexts import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.R2dbcExecutor import akka.persistence.r2dbc.internal.R2dbcExecutorProvider +import akka.persistence.r2dbc.internal.Sql import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.postgres.PostgresJournalDao @@ -36,9 +37,14 @@ private[r2dbc] class H2JournalDao(executorProvider: R2dbcExecutorProvider) require(settings.useAppTimestamp) require(settings.dbTimestampMonotonicIncreasing) - private def insertSql(slice: Int) = sql"INSERT INTO ${journalTable(slice)} " + - "(slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, tags, meta_ser_id, meta_ser_manifest, meta_payload, db_timestamp) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" + private val sqlCache = Sql.Cache(settings.numberOfDataPartitions > 1) + + private def insertSql(slice: Int) = + sqlCache.get(slice, "insertSql") { + sql"INSERT INTO ${journalTable(slice)} " + + "(slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, tags, meta_ser_id, meta_ser_manifest, 
meta_payload, db_timestamp) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" + } /** * All events must be for the same persistenceId. diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala index 5bb3f9c8..ecfad567 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala @@ -29,6 +29,7 @@ private[r2dbc] class H2QueryDao(executorProvider: R2dbcExecutorProvider) extends backtracking: Boolean, minSlice: Int, maxSlice: Int): String = { + // not caching, too many combinations def toDbTimestampParamCondition = if (toDbTimestampParam) "AND db_timestamp <= ?" else "" diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala index 224ec4fd..8aadecd2 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala @@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory import akka.annotation.InternalApi import akka.persistence.r2dbc.internal.R2dbcExecutorProvider +import akka.persistence.r2dbc.internal.Sql import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.postgres.PostgresSnapshotDao @@ -22,21 +23,24 @@ private[r2dbc] final class H2SnapshotDao(executorProvider: R2dbcExecutorProvider override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2SnapshotDao]) - override protected def upsertSql(slice: Int): String = { - // db_timestamp and tags columns were added in 1.2.0 - if (settings.querySettings.startFromSnapshotEnabled) - sql""" - MERGE INTO ${snapshotTable(slice)} - (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest, db_timestamp, tags) - KEY (persistence_id) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """ - else - sql""" - MERGE INTO ${snapshotTable(slice)} - (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest) - KEY (persistence_id) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """ - } + private val sqlCache = Sql.Cache(settings.numberOfDataPartitions > 1) + + override protected def upsertSql(slice: Int): String = + sqlCache.get(slice, "upsertSql") { + // db_timestamp and tags columns were added in 1.2.0 + if (settings.querySettings.startFromSnapshotEnabled) + sql""" + MERGE INTO ${snapshotTable(slice)} + (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest, db_timestamp, tags) + KEY (persistence_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """ + else + sql""" + MERGE INTO ${snapshotTable(slice)} + (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest) + KEY (persistence_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ """ + } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index efcaf7b0..7a79fa2a 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -47,6 +47,7 @@ import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.R2dbcExecutor import akka.persistence.r2dbc.internal.R2dbcExecutorProvider +import akka.persistence.r2dbc.internal.Sql import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow @@ -90,6 +91,8 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv private val persistenceExt = Persistence(system) + private val sqlCache = Sql.Cache(settings.numberOfDataPartitions > 1) + // used for change events private lazy val journalDao: JournalDao = dialect.createJournalDao(executorProvider) @@ -107,24 +110,26 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv } } - protected def selectStateSql(slice: Int, entityType: String): String = { - val stateTable = settings.getDurableStateTableWithSchema(entityType, slice) - sql""" - SELECT revision, state_ser_id, state_ser_manifest, state_payload, db_timestamp - FROM $stateTable WHERE persistence_id = ?""" - } + protected def selectStateSql(slice: Int, entityType: String): String = + sqlCache.get(slice, s"selectStateSql-$entityType") { + val stateTable = settings.getDurableStateTableWithSchema(entityType, slice) + sql""" + SELECT revision, state_ser_id, state_ser_manifest, state_payload, db_timestamp + FROM $stateTable WHERE persistence_id = ?""" + } - protected def selectBucketsSql(entityType: String, minSlice: Int, maxSlice: Int): String = { - val stateTable = settings.getDurableStateTableWithSchema(entityType, minSlice) - sql""" - SELECT extract(EPOCH from db_timestamp)::BIGINT / 10 AS bucket, count(*) AS count - FROM $stateTable - WHERE entity_type = ? - AND ${sliceCondition(minSlice, maxSlice)} - AND db_timestamp >= ? AND db_timestamp <= ? - GROUP BY bucket ORDER BY bucket LIMIT ? - """ - } + protected def selectBucketsSql(entityType: String, minSlice: Int, maxSlice: Int): String = + sqlCache.get(minSlice, s"selectBucketsSql-$entityType-$minSlice-$maxSlice") { + val stateTable = settings.getDurableStateTableWithSchema(entityType, minSlice) + sql""" + SELECT extract(EPOCH from db_timestamp)::BIGINT / 10 AS bucket, count(*) AS count + FROM $stateTable + WHERE entity_type = ? + AND ${sliceCondition(minSlice, maxSlice)} + AND db_timestamp >= ? AND db_timestamp <= ? + GROUP BY bucket ORDER BY bucket LIMIT ? 
+ """ + } protected def sliceCondition(minSlice: Int, maxSlice: Int): String = s"slice in (${(minSlice to maxSlice).mkString(",")})" @@ -133,13 +138,20 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv slice: Int, entityType: String, additionalBindings: immutable.IndexedSeq[EvaluatedAdditionalColumnBindings]): String = { - val stateTable = settings.getDurableStateTableWithSchema(entityType, slice) - val additionalCols = additionalInsertColumns(additionalBindings) - val additionalParams = additionalInsertParameters(additionalBindings) - sql""" - INSERT INTO $stateTable - (slice, entity_type, persistence_id, revision, state_ser_id, state_ser_manifest, state_payload, tags$additionalCols, db_timestamp) - VALUES (?, ?, ?, ?, ?, ?, ?, ?$additionalParams, CURRENT_TIMESTAMP)""" + def createSql = { + val stateTable = settings.getDurableStateTableWithSchema(entityType, slice) + val additionalCols = additionalInsertColumns(additionalBindings) + val additionalParams = additionalInsertParameters(additionalBindings) + sql""" + INSERT INTO $stateTable + (slice, entity_type, persistence_id, revision, state_ser_id, state_ser_manifest, state_payload, tags$additionalCols, db_timestamp) + VALUES (?, ?, ?, ?, ?, ?, ?, ?$additionalParams, CURRENT_TIMESTAMP)""" + } + + if (additionalBindings.isEmpty) + sqlCache.get(slice, s"insertStateSql-$entityType")(createSql) + else + createSql // no cache } protected def additionalInsertColumns( @@ -179,28 +191,35 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv updateTags: Boolean, additionalBindings: immutable.IndexedSeq[EvaluatedAdditionalColumnBindings], currentTimestamp: String = "CURRENT_TIMESTAMP"): String = { + def createSql = { + val stateTable = settings.getDurableStateTableWithSchema(entityType, slice) - val stateTable = settings.getDurableStateTableWithSchema(entityType, slice) + val timestamp = + if (settings.dbTimestampMonotonicIncreasing) + currentTimestamp + else + "GREATEST(CURRENT_TIMESTAMP, " + + s"(SELECT db_timestamp + '1 microsecond'::interval FROM $stateTable WHERE persistence_id = ? AND revision = ?))" - val timestamp = - if (settings.dbTimestampMonotonicIncreasing) - currentTimestamp - else - "GREATEST(CURRENT_TIMESTAMP, " + - s"(SELECT db_timestamp + '1 microsecond'::interval FROM $stateTable WHERE persistence_id = ? AND revision = ?))" - - val revisionCondition = - if (settings.durableStateAssertSingleWriter) " AND revision = ?" - else "" + val revisionCondition = + if (settings.durableStateAssertSingleWriter) " AND revision = ?" + else "" - val tags = if (updateTags) ", tags = ?" else "" + val tags = if (updateTags) ", tags = ?" else "" - val additionalParams = additionalUpdateParameters(additionalBindings) - sql""" + val additionalParams = additionalUpdateParameters(additionalBindings) + sql""" UPDATE $stateTable SET revision = ?, state_ser_id = ?, state_ser_manifest = ?, state_payload = ?$tags$additionalParams, db_timestamp = $timestamp WHERE persistence_id = ? 
$revisionCondition""" + } + + if (additionalBindings.isEmpty) + // timestamp param doesn't have to be part of cache key because it's just different for different dialects + sqlCache.get(slice, s"updateStateSql-$entityType-$updateTags")(createSql) + else + createSql // no cache } protected def additionalUpdateParameters( @@ -220,21 +239,29 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv } protected def hardDeleteStateSql(entityType: String, slice: Int): String = { - val stateTable = settings.getDurableStateTableWithSchema(entityType, slice) - sql"DELETE from $stateTable WHERE persistence_id = ?" + sqlCache.get(slice, s"hardDeleteStateSql-$entityType") { + val stateTable = settings.getDurableStateTableWithSchema(entityType, slice) + sql"DELETE from $stateTable WHERE persistence_id = ?" + } } private val currentDbTimestampSql = sql"SELECT CURRENT_TIMESTAMP AS db_timestamp" - protected def allPersistenceIdsSql(table: String): String = + protected def allPersistenceIdsSql(table: String): String = { + // not worth caching sql"SELECT persistence_id from $table ORDER BY persistence_id LIMIT ?" + } - protected def persistenceIdsForEntityTypeSql(table: String): String = + protected def persistenceIdsForEntityTypeSql(table: String): String = { + // not worth caching sql"SELECT persistence_id from $table WHERE persistence_id LIKE ? ORDER BY persistence_id LIMIT ?" + } - protected def allPersistenceIdsAfterSql(table: String): String = + protected def allPersistenceIdsAfterSql(table: String): String = { + // not worth caching sql"SELECT persistence_id from $table WHERE persistence_id > ? ORDER BY persistence_id LIMIT ?" + } protected def bindPersistenceIdsForEntityTypeAfterSql( stmt: Statement, @@ -248,8 +275,10 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv .bind(2, limit) } - protected def persistenceIdsForEntityTypeAfterSql(table: String): String = + protected def persistenceIdsForEntityTypeAfterSql(table: String): String = { + // not worth caching sql"SELECT persistence_id from $table WHERE persistence_id LIKE ? AND persistence_id > ? ORDER BY persistence_id LIMIT ?" 
+ } protected def behindCurrentTimeIntervalConditionFor(behindCurrentTime: FiniteDuration): String = if (behindCurrentTime > Duration.Zero) @@ -263,6 +292,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv backtracking: Boolean, minSlice: Int, maxSlice: Int): String = { + // not caching, too many combinations val stateTable = settings.getDurableStateTableWithSchema(entityType, minSlice) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala index ae20ca3b..56ab776b 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala @@ -25,6 +25,7 @@ import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.R2dbcExecutor import akka.persistence.r2dbc.internal.R2dbcExecutorProvider import akka.persistence.r2dbc.internal.SerializedEventMetadata +import akka.persistence.r2dbc.internal.Sql import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement @@ -70,25 +71,29 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider) protected val persistenceExt: Persistence = Persistence(system) + private val sqlCache = Sql.Cache(settings.numberOfDataPartitions > 1) + protected def journalTable(slice: Int): String = settings.journalTableWithSchema(slice) - protected def insertEventWithParameterTimestampSql(slice: Int): String = { - val table = journalTable(slice) - val baseSql = insertEvenBaseSql(table) - if (settings.dbTimestampMonotonicIncreasing) - sql"$baseSql ?) RETURNING db_timestamp" - else - sql"$baseSql GREATEST(?, ${timestampSubSelect(table)})) RETURNING db_timestamp" - } + protected def insertEventWithParameterTimestampSql(slice: Int): String = + sqlCache.get(slice, "insertEventWithParameterTimestampSql") { + val table = journalTable(slice) + val baseSql = insertEvenBaseSql(table) + if (settings.dbTimestampMonotonicIncreasing) + sql"$baseSql ?) RETURNING db_timestamp" + else + sql"$baseSql GREATEST(?, ${timestampSubSelect(table)})) RETURNING db_timestamp" + } - private def insertEventWithTransactionTimestampSql(slice: Int) = { - val table = journalTable(slice) - val baseSql = insertEvenBaseSql(table) - if (settings.dbTimestampMonotonicIncreasing) - sql"$baseSql CURRENT_TIMESTAMP) RETURNING db_timestamp" - else - sql"$baseSql GREATEST(CURRENT_TIMESTAMP, ${timestampSubSelect(table)})) RETURNING db_timestamp" - } + private def insertEventWithTransactionTimestampSql(slice: Int) = + sqlCache.get(slice, "insertEventWithTransactionTimestampSql") { + val table = journalTable(slice) + val baseSql = insertEvenBaseSql(table) + if (settings.dbTimestampMonotonicIncreasing) + sql"$baseSql CURRENT_TIMESTAMP) RETURNING db_timestamp" + else + sql"$baseSql GREATEST(CURRENT_TIMESTAMP, ${timestampSubSelect(table)})) RETURNING db_timestamp" + } private def insertEvenBaseSql(table: String) = { s"INSERT INTO $table " + @@ -103,31 +108,50 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider) s"(SELECT db_timestamp + '1 microsecond'::interval FROM $table " + "WHERE persistence_id = ? 
AND seq_nr = ?)" - private def selectHighestSequenceNrSql(slice: Int) = sql""" - SELECT MAX(seq_nr) from ${journalTable(slice)} - WHERE persistence_id = ? AND seq_nr >= ?""" + private def selectHighestSequenceNrSql(slice: Int) = + sqlCache.get(slice, "selectHighestSequenceNrSql") { + sql""" + SELECT MAX(seq_nr) from ${journalTable(slice)} + WHERE persistence_id = ? AND seq_nr >= ?""" + } private def selectLowestSequenceNrSql(slice: Int) = - sql""" - SELECT MIN(seq_nr) from ${journalTable(slice)} - WHERE persistence_id = ?""" - - private def deleteEventsSql(slice: Int) = sql""" - DELETE FROM ${journalTable(slice)} - WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?""" - - protected def insertDeleteMarkerSql(slice: Int, timestamp: String = "CURRENT_TIMESTAMP"): String = sql""" - INSERT INTO ${journalTable(slice)} - (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, deleted) - VALUES (?, ?, ?, ?, $timestamp, ?, ?, ?, ?, ?, ?)""" - - private def deleteEventsByPersistenceIdBeforeTimestampSql(slice: Int) = sql""" - DELETE FROM ${journalTable(slice)} - WHERE persistence_id = ? AND db_timestamp < ?""" - - private def deleteEventsBySliceBeforeTimestampSql(slice: Int) = sql""" - DELETE FROM ${journalTable(slice)} - WHERE slice = ? AND entity_type = ? AND db_timestamp < ?""" + sqlCache.get(slice, "selectLowestSequenceNrSql") { + sql""" + SELECT MIN(seq_nr) from ${journalTable(slice)} + WHERE persistence_id = ?""" + } + + private def deleteEventsSql(slice: Int) = + sqlCache.get(slice, "deleteEventsSql") { + sql""" + DELETE FROM ${journalTable(slice)} + WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?""" + } + + protected def insertDeleteMarkerSql(slice: Int, timestamp: String = "CURRENT_TIMESTAMP"): String = { + // timestamp param doesn't have to be part of cache key because it's just different for different dialects + sqlCache.get(slice, "insertDeleteMarkerSql") { + sql""" + INSERT INTO ${journalTable(slice)} + (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, deleted) + VALUES (?, ?, ?, ?, $timestamp, ?, ?, ?, ?, ?, ?)""" + } + } + + private def deleteEventsByPersistenceIdBeforeTimestampSql(slice: Int) = + sqlCache.get(slice, "deleteEventsByPersistenceIdBeforeTimestampSql") { + sql""" + DELETE FROM ${journalTable(slice)} + WHERE persistence_id = ? AND db_timestamp < ?""" + } + + private def deleteEventsBySliceBeforeTimestampSql(slice: Int) = + sqlCache.get(slice, "deleteEventsBySliceBeforeTimestampSql") { + sql""" + DELETE FROM ${journalTable(slice)} + WHERE slice = ? AND entity_type = ? AND db_timestamp < ?""" + } /** * All events must be for the same persistenceId. 
diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala index 65cb932e..382e7ca8 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala @@ -28,6 +28,7 @@ import akka.persistence.r2dbc.internal.InstantFactory import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow import akka.persistence.r2dbc.internal.QueryDao import akka.persistence.r2dbc.internal.R2dbcExecutorProvider +import akka.persistence.r2dbc.internal.Sql import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichRow @@ -58,6 +59,8 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e protected def log: Logger = PostgresQueryDao.log protected val persistenceExt: Persistence = Persistence(system) + private val sqlCache = Sql.Cache(settings.numberOfDataPartitions > 1) + protected def journalTable(slice: Int): String = settings.journalTableWithSchema(slice) protected def sqlFalse: String = "false" @@ -71,6 +74,7 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e backtracking: Boolean, minSlice: Int, maxSlice: Int): String = { + // not caching, too many combinations def toDbTimestampParamCondition = if (toDbTimestampParam) "AND db_timestamp <= ?" else "" @@ -101,8 +105,9 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e protected def sliceCondition(minSlice: Int, maxSlice: Int): String = s"slice in (${(minSlice to maxSlice).mkString(",")})" - protected def selectBucketsSql(minSlice: Int, maxSlice: Int): String = { - sql""" + protected def selectBucketsSql(minSlice: Int, maxSlice: Int): String = + sqlCache.get(minSlice, s"selectBucketsSql-$minSlice-$maxSlice") { + sql""" SELECT extract(EPOCH from db_timestamp)::BIGINT / 10 AS bucket, count(*) AS count FROM ${journalTable(minSlice)} WHERE entity_type = ? @@ -111,29 +116,41 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e AND deleted = false GROUP BY bucket ORDER BY bucket LIMIT ? """ - } + } - protected def selectTimestampOfEventSql(slice: Int) = sql""" - SELECT db_timestamp FROM ${journalTable(slice)} - WHERE persistence_id = ? AND seq_nr = ? AND deleted = $sqlFalse""" + protected def selectTimestampOfEventSql(slice: Int): String = + sqlCache.get(slice, "selectTimestampOfEventSql") { + sql""" + SELECT db_timestamp FROM ${journalTable(slice)} + WHERE persistence_id = ? AND seq_nr = ? AND deleted = $sqlFalse""" + } - protected def selectOneEventSql(slice: Int) = sql""" - SELECT slice, entity_type, db_timestamp, $sqlDbTimestamp AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, meta_ser_id, meta_ser_manifest, meta_payload, tags - FROM ${journalTable(slice)} - WHERE persistence_id = ? AND seq_nr = ? AND deleted = $sqlFalse""" + protected def selectOneEventSql(slice: Int): String = + sqlCache.get(slice, "selectOneEventSql") { + sql""" + SELECT slice, entity_type, db_timestamp, $sqlDbTimestamp AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, meta_ser_id, meta_ser_manifest, meta_payload, tags + FROM ${journalTable(slice)} + WHERE persistence_id = ? AND seq_nr = ? 
AND deleted = $sqlFalse""" + } - protected def selectOneEventWithoutPayloadSql(slice: Int) = sql""" - SELECT slice, entity_type, db_timestamp, CURRENT_TIMESTAMP AS read_db_timestamp, event_ser_id, event_ser_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags - FROM ${journalTable(slice)} - WHERE persistence_id = ? AND seq_nr = ? AND deleted = $sqlFalse""" + protected def selectOneEventWithoutPayloadSql(slice: Int): String = + sqlCache.get(slice, "selectOneEventWithoutPayloadSql") { + sql""" + SELECT slice, entity_type, db_timestamp, CURRENT_TIMESTAMP AS read_db_timestamp, event_ser_id, event_ser_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags + FROM ${journalTable(slice)} + WHERE persistence_id = ? AND seq_nr = ? AND deleted = $sqlFalse""" + } - protected def selectEventsSql(slice: Int) = sql""" - SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp, CURRENT_TIMESTAMP AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags - from ${journalTable(slice)} - WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ? - AND deleted = false - ORDER BY seq_nr - LIMIT ?""" + protected def selectEventsSql(slice: Int): String = + sqlCache.get(slice, "selectEventsSql") { + sql""" + SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp, CURRENT_TIMESTAMP AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags + from ${journalTable(slice)} + WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ? + AND deleted = false + ORDER BY seq_nr + LIMIT ?""" + } protected def bindSelectEventsSql( stmt: Statement, @@ -146,18 +163,25 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e .bind(2, toSequenceNr) .bind(3, settings.querySettings.bufferSize) - protected def allPersistenceIdsSql(minSlice: Int) = { - sql"SELECT DISTINCT(persistence_id) from ${journalTable(minSlice)} ORDER BY persistence_id LIMIT ?" - } + protected def allPersistenceIdsSql(minSlice: Int): String = + sqlCache.get(minSlice, "allPersistenceIdsSql") { + sql"SELECT DISTINCT(persistence_id) from ${journalTable(minSlice)} ORDER BY persistence_id LIMIT ?" + } - protected def persistenceIdsForEntityTypeSql(minSlice: Int) = - sql"SELECT DISTINCT(persistence_id) from ${journalTable(minSlice)} WHERE persistence_id LIKE ? ORDER BY persistence_id LIMIT ?" + protected def persistenceIdsForEntityTypeSql(minSlice: Int): String = + sqlCache.get(minSlice, "persistenceIdsForEntityTypeSql") { + sql"SELECT DISTINCT(persistence_id) from ${journalTable(minSlice)} WHERE persistence_id LIKE ? ORDER BY persistence_id LIMIT ?" + } - protected def allPersistenceIdsAfterSql(minSlice: Int) = - sql"SELECT DISTINCT(persistence_id) from ${journalTable(minSlice)} WHERE persistence_id > ? ORDER BY persistence_id LIMIT ?" + protected def allPersistenceIdsAfterSql(minSlice: Int): String = + sqlCache.get(minSlice, "allPersistenceIdsAfterSql") { + sql"SELECT DISTINCT(persistence_id) from ${journalTable(minSlice)} WHERE persistence_id > ? ORDER BY persistence_id LIMIT ?" + } - protected def persistenceIdsForEntityTypeAfterSql(minSlice: Int) = - sql"SELECT DISTINCT(persistence_id) from ${journalTable(minSlice)} WHERE persistence_id LIKE ? AND persistence_id > ? ORDER BY persistence_id LIMIT ?" 
+ protected def persistenceIdsForEntityTypeAfterSql(minSlice: Int): String = + sqlCache.get(minSlice, "persistenceIdsForEntityTypeAfterSql") { + sql"SELECT DISTINCT(persistence_id) from ${journalTable(minSlice)} WHERE persistence_id LIKE ? AND persistence_id > ? ORDER BY persistence_id LIMIT ?" + } override def currentDbTimestamp(slice: Int): Future[Instant] = { val executor = executorProvider.executorFor(slice) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala index 968dfffb..8324b943 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala @@ -30,6 +30,7 @@ import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.R2dbcExecutorProvider import akka.persistence.r2dbc.internal.SnapshotDao +import akka.persistence.r2dbc.internal.Sql import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichRow @@ -58,14 +59,17 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider protected def log: Logger = PostgresSnapshotDao.log + private val sqlCache = Sql.Cache(settings.numberOfDataPartitions > 1) + protected val persistenceExt: Persistence = Persistence(system) protected def snapshotTable(slice: Int): String = settings.snapshotTableWithSchema(slice) protected def upsertSql(slice: Int): String = { - // db_timestamp and tags columns were added in 1.2.0 - if (settings.querySettings.startFromSnapshotEnabled) - sql""" + sqlCache.get(slice, "upsertSql") { + // db_timestamp and tags columns were added in 1.2.0 + if (settings.querySettings.startFromSnapshotEnabled) + sql""" INSERT INTO ${snapshotTable(slice)} (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest, db_timestamp, tags) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) @@ -81,58 +85,68 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider meta_payload = excluded.meta_payload, meta_ser_id = excluded.meta_ser_id, meta_ser_manifest = excluded.meta_ser_manifest""" - else - sql""" - INSERT INTO ${snapshotTable(slice)} - (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT (persistence_id) - DO UPDATE SET - seq_nr = excluded.seq_nr, - write_timestamp = excluded.write_timestamp, - snapshot = excluded.snapshot, - ser_id = excluded.ser_id, - ser_manifest = excluded.ser_manifest, - meta_payload = excluded.meta_payload, - meta_ser_id = excluded.meta_ser_id, - meta_ser_manifest = excluded.meta_ser_manifest""" + else + sql""" + INSERT INTO ${snapshotTable(slice)} + (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ ON CONFLICT (persistence_id) + DO UPDATE SET + seq_nr = excluded.seq_nr, + write_timestamp = excluded.write_timestamp, + snapshot = excluded.snapshot, + ser_id = excluded.ser_id, + ser_manifest = excluded.ser_manifest, + meta_payload = excluded.meta_payload, + meta_ser_id = excluded.meta_ser_id, + meta_ser_manifest = excluded.meta_ser_manifest""" + } } protected def selectSql(slice: Int, criteria: SnapshotSelectionCriteria): String = { - val maxSeqNrCondition = - if (criteria.maxSequenceNr != Long.MaxValue) " AND seq_nr <= ?" - else "" + def createSql = { + val maxSeqNrCondition = + if (criteria.maxSequenceNr != Long.MaxValue) " AND seq_nr <= ?" + else "" - val minSeqNrCondition = - if (criteria.minSequenceNr > 0L) " AND seq_nr >= ?" - else "" + val minSeqNrCondition = + if (criteria.minSequenceNr > 0L) " AND seq_nr >= ?" + else "" - val maxTimestampCondition = - if (criteria.maxTimestamp != Long.MaxValue) " AND write_timestamp <= ?" - else "" + val maxTimestampCondition = + if (criteria.maxTimestamp != Long.MaxValue) " AND write_timestamp <= ?" + else "" - val minTimestampCondition = - if (criteria.minTimestamp != 0L) " AND write_timestamp >= ?" - else "" + val minTimestampCondition = + if (criteria.minTimestamp != 0L) " AND write_timestamp >= ?" + else "" - // db_timestamp and tags columns were added in 1.2.0 - if (settings.querySettings.startFromSnapshotEnabled) - sql""" + // db_timestamp and tags columns were added in 1.2.0 + if (settings.querySettings.startFromSnapshotEnabled) + sql""" SELECT slice, persistence_id, seq_nr, db_timestamp, write_timestamp, snapshot, ser_id, ser_manifest, tags, meta_payload, meta_ser_id, meta_ser_manifest FROM ${snapshotTable(slice)} WHERE persistence_id = ? $maxSeqNrCondition $minSeqNrCondition $maxTimestampCondition $minTimestampCondition LIMIT 1""" + else + sql""" + SELECT slice, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest + FROM ${snapshotTable(slice)} + WHERE persistence_id = ? + $maxSeqNrCondition $minSeqNrCondition $maxTimestampCondition $minTimestampCondition + LIMIT 1""" + } + + if (criteria == SnapshotSelectionCriteria.Latest) + sqlCache.get(slice, "selectSql-latest")(createSql) // normal case else - sql""" - SELECT slice, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest - FROM ${snapshotTable(slice)} - WHERE persistence_id = ? - $maxSeqNrCondition $minSeqNrCondition $maxTimestampCondition $minTimestampCondition - LIMIT 1""" + createSql // no cache } private def deleteSql(slice: Int, criteria: SnapshotSelectionCriteria): String = { + // not caching, too many combinations + val maxSeqNrCondition = if (criteria.maxSequenceNr != Long.MaxValue) " AND seq_nr <= ?" else "" @@ -158,9 +172,9 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider private val currentDbTimestampSql = sql"SELECT CURRENT_TIMESTAMP AS db_timestamp" - protected def snapshotsBySlicesRangeSql(minSlice: Int, maxSlice: Int): String = { - - sql""" + protected def snapshotsBySlicesRangeSql(minSlice: Int, maxSlice: Int): String = + sqlCache.get(minSlice, s"snapshotsBySlicesRangeSql-$minSlice-$maxSlice") { + sql""" SELECT slice, persistence_id, seq_nr, db_timestamp, write_timestamp, snapshot, ser_id, ser_manifest, tags, meta_payload, meta_ser_id, meta_ser_manifest FROM ${snapshotTable(minSlice)} WHERE entity_type = ? 
@@ -168,18 +182,19 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider AND db_timestamp >= ? ORDER BY db_timestamp, seq_nr LIMIT ?""" - } + } - protected def selectBucketsSql(entityType: String, minSlice: Int, maxSlice: Int): String = { - sql""" - SELECT extract(EPOCH from db_timestamp)::BIGINT / 10 AS bucket, count(*) AS count - FROM ${snapshotTable(minSlice)} - WHERE entity_type = ? - AND ${sliceCondition(minSlice, maxSlice)} - AND db_timestamp >= ? AND db_timestamp <= ? - GROUP BY bucket ORDER BY bucket LIMIT ? - """ - } + protected def selectBucketsSql(minSlice: Int, maxSlice: Int): String = + sqlCache.get(minSlice, s"selectBucketsSql-$minSlice-$maxSlice") { + sql""" + SELECT extract(EPOCH from db_timestamp)::BIGINT / 10 AS bucket, count(*) AS count + FROM ${snapshotTable(minSlice)} + WHERE entity_type = ? + AND ${sliceCondition(minSlice, maxSlice)} + AND db_timestamp >= ? AND db_timestamp <= ? + GROUP BY bucket ORDER BY bucket LIMIT ? + """ + } protected def sliceCondition(minSlice: Int, maxSlice: Int): String = s"slice in (${(minSlice to maxSlice).mkString(",")})" @@ -439,7 +454,7 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider val result = executor.select(s"select bucket counts [$minSlice - $maxSlice]")( connection => { - val stmt = connection.createStatement(selectBucketsSql(entityType, minSlice, maxSlice)) + val stmt = connection.createStatement(selectBucketsSql(minSlice, maxSlice)) bindSelectBucketsSql(stmt, entityType, fromTimestamp, toTimestamp, limit) }, row => { diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala index 71ac6108..8d79f6a0 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory import akka.persistence.r2dbc.internal.InstantFactory import akka.persistence.r2dbc.internal.R2dbcExecutorProvider +import akka.persistence.r2dbc.internal.Sql /** * INTERNAL API @@ -42,19 +43,28 @@ private[r2dbc] class SqlServerDurableStateDao(executorProvider: R2dbcExecutorPro require(settings.useAppTimestamp, "SqlServer requires akka.persistence.r2dbc.use-app-timestamp=on") + private val sqlCache = Sql.Cache(settings.numberOfDataPartitions > 1) + override def log: Logger = SqlServerDurableStateDao.log override protected def insertStateSql( slice: Int, entityType: String, additionalBindings: immutable.IndexedSeq[EvaluatedAdditionalColumnBindings]): String = { - val stateTable = settings.getDurableStateTableWithSchema(entityType, slice) - val additionalCols = additionalInsertColumns(additionalBindings) - val additionalParams = additionalInsertParameters(additionalBindings) - sql""" + def createSql = { + val stateTable = settings.getDurableStateTableWithSchema(entityType, slice) + val additionalCols = additionalInsertColumns(additionalBindings) + val additionalParams = additionalInsertParameters(additionalBindings) + sql""" INSERT INTO $stateTable (slice, entity_type, persistence_id, revision, state_ser_id, state_ser_manifest, state_payload, tags$additionalCols, db_timestamp) VALUES (@slice, @entityType, @persistenceId, @revision, @stateSerId, @stateSerManifest, @statePayload, @tags$additionalParams, @now)""" + } + + if (additionalBindings.isEmpty) + sqlCache.get(slice, 
s"insertStateSql-$entityType")(createSql) + else + createSql // no cache } /** @@ -69,20 +79,22 @@ private[r2dbc] class SqlServerDurableStateDao(executorProvider: R2dbcExecutorPro super.updateStateSql(slice, entityType, updateTags, additionalBindings, currentTimestamp = "?") override def selectBucketsSql(entityType: String, minSlice: Int, maxSlice: Int): String = { - val stateTable = settings.getDurableStateTableWithSchema(entityType, minSlice) + sqlCache.get(minSlice, s"selectBucketsSql-$entityType-$minSlice-$maxSlice") { + val stateTable = settings.getDurableStateTableWithSchema(entityType, minSlice) - val subQuery = - s""" + val subQuery = + s""" select TOP(@limit) CAST(DATEDIFF(s,'1970-01-01 00:00:00',db_timestamp) AS BIGINT) / 10 AS bucket FROM $stateTable WHERE entity_type = @entityType AND ${sliceCondition(minSlice, maxSlice)} AND db_timestamp >= @fromTimestamp AND db_timestamp <= @toTimestamp """ - sql""" - SELECT bucket, count(*) as count from ($subQuery) as sub - GROUP BY bucket ORDER BY bucket - """ + sql""" + SELECT bucket, count(*) as count from ($subQuery) as sub + GROUP BY bucket ORDER BY bucket + """ + } } override def bindSelectBucketSql( @@ -120,6 +132,7 @@ private[r2dbc] class SqlServerDurableStateDao(executorProvider: R2dbcExecutorPro backtracking: Boolean, minSlice: Int, maxSlice: Int): String = { + // not caching, too many combinations val behindCurrentTimeIntervalCondition: String = if (behindCurrentTime > Duration.Zero) @@ -142,19 +155,21 @@ private[r2dbc] class SqlServerDurableStateDao(executorProvider: R2dbcExecutorPro "SELECT TOP(@limit) persistence_id, revision, db_timestamp, SYSUTCDATETIME() AS read_db_timestamp, state_ser_id, state_ser_manifest, state_payload " sql""" - $selectColumns - FROM $stateTable - WHERE entity_type = @entityType - AND ${sliceCondition(minSlice, maxSlice)} - AND db_timestamp >= @fromTimestamp $maxDbTimestampParamCondition $behindCurrentTimeIntervalCondition - ORDER BY db_timestamp, revision""" + $selectColumns + FROM $stateTable + WHERE entity_type = @entityType + AND ${sliceCondition(minSlice, maxSlice)} + AND db_timestamp >= @fromTimestamp $maxDbTimestampParamCondition $behindCurrentTimeIntervalCondition + ORDER BY db_timestamp, revision""" } override protected def bindTimestampNow(stmt: Statement, getAndIncIndex: () => Int): Statement = stmt.bindTimestamp(getAndIncIndex(), InstantFactory.now()) - override protected def persistenceIdsForEntityTypeAfterSql(table: String): String = + override protected def persistenceIdsForEntityTypeAfterSql(table: String): String = { + // not worth caching sql"SELECT TOP(@limit) persistence_id from $table WHERE persistence_id LIKE @persistenceIdLike AND persistence_id > @after ORDER BY persistence_id" + } override protected def bindPersistenceIdsForEntityTypeAfterSql( stmt: Statement, @@ -178,14 +193,20 @@ private[r2dbc] class SqlServerDurableStateDao(executorProvider: R2dbcExecutorPro .bind("@persistenceIdLike", entityType + likeStmtPostfix) } - override protected def persistenceIdsForEntityTypeSql(table: String): String = + override protected def persistenceIdsForEntityTypeSql(table: String): String = { + // not worth caching sql"SELECT TOP(@limit) persistence_id from $table WHERE persistence_id LIKE @persistenceIdLike ORDER BY persistence_id" + } - override protected def allPersistenceIdsSql(table: String): String = + override protected def allPersistenceIdsSql(table: String): String = { + // not worth caching sql"SELECT TOP(@limit) persistence_id from $table ORDER BY persistence_id" + } - override 
protected def allPersistenceIdsAfterSql(table: String): String = + override protected def allPersistenceIdsAfterSql(table: String): String = { + // not worth caching sql"SELECT TOP(@limit) persistence_id from $table WHERE persistence_id > @persistenceId ORDER BY persistence_id" + } override def bindAllPersistenceIdsAfterSql(stmt: Statement, after: String, limit: Long): Statement = stmt .bind("@persistenceId", after) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala index 0ef3b304..db8de98a 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala @@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory import akka.annotation.InternalApi import akka.persistence.r2dbc.internal.InstantFactory import akka.persistence.r2dbc.internal.R2dbcExecutorProvider +import akka.persistence.r2dbc.internal.Sql import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.r2dbc.internal.postgres.PostgresJournalDao @@ -37,14 +38,18 @@ private[r2dbc] class SqlServerJournalDao(executorProvider: R2dbcExecutorProvider settings.dbTimestampMonotonicIncreasing, "SqlServer requires akka.persistence.r2dbc.db-timestamp-monotonic-increasing=on") + private val sqlCache = Sql.Cache(settings.numberOfDataPartitions > 1) + override def log = SqlServerJournalDao.log override protected def insertEventWithParameterTimestampSql(slice: Int) = - sql""" + sqlCache.get(slice, "insertEventWithParameterTimestampSql") { + sql""" INSERT INTO ${journalTable(slice)} (slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, tags, meta_ser_id, meta_ser_manifest, meta_payload, db_timestamp) OUTPUT inserted.db_timestamp VALUES (@slice, @entityType, @persistenceId, @seqNr, @writer, @adapterManifest, @eventSerId, @eventSerManifest, @eventPayload, @tags, @metaSerId, @metaSerManifest, @metaSerPayload, @dbTimestamp)""" + } override protected def bindTimestampNow(stmt: Statement, getAndIncIndex: () => Int): Statement = stmt.bindTimestamp(getAndIncIndex(), InstantFactory.now()) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala index 44f8b575..6f2f9c48 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala @@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory import akka.annotation.InternalApi import akka.persistence.r2dbc.internal.InstantFactory import akka.persistence.r2dbc.internal.R2dbcExecutorProvider +import akka.persistence.r2dbc.internal.Sql import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.r2dbc.internal.postgres.PostgresQueryDao @@ -42,15 +43,19 @@ private[r2dbc] class SqlServerQueryDao(executorProvider: R2dbcExecutorProvider) // def because of order of initialization override def log = SqlServerQueryDao.log + private val sqlCache = Sql.Cache(settings.numberOfDataPartitions > 1) + override protected def 
sqlDbTimestamp = "SYSUTCDATETIME()" - override protected def selectEventsSql(slice: Int) = - sql""" + override protected def selectEventsSql(slice: Int): String = + sqlCache.get(slice, "selectEventsSql") { + sql""" SELECT TOP(@limit) slice, entity_type, persistence_id, seq_nr, db_timestamp, SYSUTCDATETIME() AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags from ${journalTable(slice)} WHERE persistence_id = @persistenceId AND seq_nr >= @from AND seq_nr <= @to AND deleted = $sqlFalse ORDER BY seq_nr""" + } /** * custom binding because the first param in the query is @limit (or '0' when using positional binding) @@ -70,8 +75,9 @@ private[r2dbc] class SqlServerQueryDao(executorProvider: R2dbcExecutorProvider) .bind("@from", fromSequenceNr) .bind("@to", toSequenceNr) - override protected def selectBucketsSql(minSlice: Int, maxSlice: Int): String = { - sql""" + override protected def selectBucketsSql(minSlice: Int, maxSlice: Int): String = + sqlCache.get(minSlice, s"selectBucketsSql-$minSlice-$maxSlice") { + sql""" SELECT TOP(@limit) bucket, count(*) as count from (select DATEDIFF(s,'1970-01-01 00:00:00', db_timestamp)/10 as bucket FROM ${journalTable(minSlice)} @@ -81,7 +87,7 @@ private[r2dbc] class SqlServerQueryDao(executorProvider: R2dbcExecutorProvider) AND deleted = $sqlFalse) as sub GROUP BY bucket ORDER BY bucket """ - } + } override protected def bindSelectBucketsSql( stmt: Statement, @@ -102,6 +108,7 @@ private[r2dbc] class SqlServerQueryDao(executorProvider: R2dbcExecutorProvider) backtracking: Boolean, minSlice: Int, maxSlice: Int): String = { + // not caching, too many combinations def toDbTimestampParamCondition = if (toDbTimestampParam) "AND db_timestamp <= @until" else "" @@ -141,12 +148,13 @@ private[r2dbc] class SqlServerQueryDao(executorProvider: R2dbcExecutorProvider) stmt } - override protected def persistenceIdsForEntityTypeAfterSql(minSlice: Int): String = { - sql""" + override protected def persistenceIdsForEntityTypeAfterSql(minSlice: Int): String = + sqlCache.get(minSlice, "persistenceIdsForEntityTypeAfterSql") { + sql""" SELECT TOP(@limit) persistence_id FROM ( SELECT DISTINCT(persistence_id) from ${journalTable(minSlice)} WHERE persistence_id LIKE @persistenceIdLike AND persistence_id > @persistenceId ) as sub ORDER BY persistence_id""" - } + } override protected def bindPersistenceIdsForEntityTypeAfterSql( stmt: Statement, @@ -160,12 +168,13 @@ private[r2dbc] class SqlServerQueryDao(executorProvider: R2dbcExecutorProvider) .bind("@persistenceId", afterPersistenceId) } - override protected def persistenceIdsForEntityTypeSql(minSlice: Int): String = { - sql""" + override protected def persistenceIdsForEntityTypeSql(minSlice: Int): String = + sqlCache.get(minSlice, "persistenceIdsForEntityTypeSql") { + sql""" SELECT TOP(@limit) persistence_id FROM ( SELECT DISTINCT(persistence_id) from ${journalTable(minSlice)} WHERE persistence_id LIKE @persistenceIdLike ) as sub ORDER BY persistence_id""" - } + } override protected def bindPersistenceIdsForEntityTypeSql( stmt: Statement, @@ -185,16 +194,18 @@ private[r2dbc] class SqlServerQueryDao(executorProvider: R2dbcExecutorProvider) .bind("@limit", limit) .bind("@persistenceId", afterPersistenceId) } - override protected def allPersistenceIdsAfterSql(minSlice: Int): String = { - sql""" + override protected def allPersistenceIdsAfterSql(minSlice: Int): String = + sqlCache.get(minSlice, "allPersistenceIdsAfterSql") { + sql""" SELECT 
TOP(@limit) persistence_id FROM ( SELECT DISTINCT(persistence_id) from ${journalTable(minSlice)} WHERE persistence_id > @persistenceId ) as sub ORDER BY persistence_id""" - } + } - override protected def allPersistenceIdsSql(minSlice: Int): String = { - sql"SELECT TOP(@limit) persistence_id FROM (SELECT DISTINCT(persistence_id) from ${journalTable(minSlice)}) as sub ORDER BY persistence_id" - } + override protected def allPersistenceIdsSql(minSlice: Int): String = + sqlCache.get(minSlice, "allPersistenceIdsSql") { + sql"SELECT TOP(@limit) persistence_id FROM (SELECT DISTINCT(persistence_id) from ${journalTable(minSlice)}) as sub ORDER BY persistence_id" + } override def currentDbTimestamp(slice: Int): Future[Instant] = Future.successful(InstantFactory.now()) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala index f3f84499..db753f41 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala @@ -18,6 +18,7 @@ import akka.persistence.r2dbc.internal.InstantFactory import akka.persistence.r2dbc.internal.R2dbcExecutorProvider import akka.persistence.r2dbc.internal.SnapshotDao.SerializedSnapshotMetadata import akka.persistence.r2dbc.internal.SnapshotDao.SerializedSnapshotRow +import akka.persistence.r2dbc.internal.Sql import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement @@ -42,42 +43,53 @@ private[r2dbc] class SqlServerSnapshotDao(executorProvider: R2dbcExecutorProvide override def log: Logger = SqlServerSnapshotDao.log + private val sqlCache = Sql.Cache(settings.numberOfDataPartitions > 1) + override def selectSql(slice: Int, criteria: SnapshotSelectionCriteria): String = { - val maxSeqNrCondition = - if (criteria.maxSequenceNr != Long.MaxValue) " AND seq_nr <= @maxSeqNr" - else "" + def createSql = { + val maxSeqNrCondition = + if (criteria.maxSequenceNr != Long.MaxValue) " AND seq_nr <= @maxSeqNr" + else "" - val minSeqNrCondition = - if (criteria.minSequenceNr > 0L) " AND seq_nr >= @minSeqNr" - else "" + val minSeqNrCondition = + if (criteria.minSequenceNr > 0L) " AND seq_nr >= @minSeqNr" + else "" - val maxTimestampCondition = - if (criteria.maxTimestamp != Long.MaxValue) " AND write_timestamp <= @maxTimestamp" - else "" + val maxTimestampCondition = + if (criteria.maxTimestamp != Long.MaxValue) " AND write_timestamp <= @maxTimestamp" + else "" - val minTimestampCondition = - if (criteria.minTimestamp != 0L) " AND write_timestamp >= @minTimestamp" - else "" + val minTimestampCondition = + if (criteria.minTimestamp != 0L) " AND write_timestamp >= @minTimestamp" + else "" - if (settings.querySettings.startFromSnapshotEnabled) - sql""" + if (settings.querySettings.startFromSnapshotEnabled) + sql""" SELECT TOP(1) slice, persistence_id, seq_nr, db_timestamp, write_timestamp, snapshot, ser_id, ser_manifest, tags, meta_payload, meta_ser_id, meta_ser_manifest FROM ${snapshotTable(slice)} WHERE persistence_id = @persistenceId $maxSeqNrCondition $minSeqNrCondition $maxTimestampCondition $minTimestampCondition """ + else + sql""" + SELECT TOP (1) slice, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, 
meta_ser_manifest + FROM ${snapshotTable(slice)} + WHERE persistence_id = @persistenceId + $maxSeqNrCondition $minSeqNrCondition $maxTimestampCondition $minTimestampCondition + """ + } + + if (criteria == SnapshotSelectionCriteria.Latest) + sqlCache.get(slice, "selectSql-latest")(createSql) // normal case else - sql""" - SELECT TOP (1) slice, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest - FROM ${snapshotTable(slice)} - WHERE persistence_id = @persistenceId - $maxSeqNrCondition $minSeqNrCondition $maxTimestampCondition $minTimestampCondition - """ + createSql // no cache + } - override protected def upsertSql(slice: Int): String = { - if (settings.querySettings.startFromSnapshotEnabled) - sql""" + override protected def upsertSql(slice: Int): String = + sqlCache.get(slice, "upsertSql") { + if (settings.querySettings.startFromSnapshotEnabled) + sql""" UPDATE ${snapshotTable(slice)} SET seq_nr = @seqNr, db_timestamp = @dbTimestamp, @@ -95,25 +107,25 @@ private[r2dbc] class SqlServerSnapshotDao(executorProvider: R2dbcExecutorProvide (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest, db_timestamp, tags) VALUES (@slice, @entityType, @persistenceId, @seqNr, @writeTimestamp, @snapshot, @serId, @serManifest, @metaPayload, @metaSerId, @metaSerManifest, @dbTimestamp, @tags) """ - else - sql""" - UPDATE ${snapshotTable(slice)} SET - seq_nr = @seqNr, - write_timestamp = @writeTimestamp, - snapshot = @snapshot, - ser_id = @serId, - ser_manifest = @serManifest, - meta_payload = @metaPayload, - meta_ser_id = @metaSerId, - meta_ser_manifest = @metaSerManifest, - tags = @tags - where persistence_id = @persistenceId - if @@ROWCOUNT = 0 - INSERT INTO ${snapshotTable(slice)} - (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest, tags) - VALUES (@slice, @entityType, @persistenceId, @seqNr, @writeTimestamp, @snapshot, @serId, @serManifest, @metaPayload, @metaSerId, @metaSerManifest, @tags) - """ - } + else + sql""" + UPDATE ${snapshotTable(slice)} SET + seq_nr = @seqNr, + write_timestamp = @writeTimestamp, + snapshot = @snapshot, + ser_id = @serId, + ser_manifest = @serManifest, + meta_payload = @metaPayload, + meta_ser_id = @metaSerId, + meta_ser_manifest = @metaSerManifest, + tags = @tags + where persistence_id = @persistenceId + if @@ROWCOUNT = 0 + INSERT INTO ${snapshotTable(slice)} + (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest, tags) + VALUES (@slice, @entityType, @persistenceId, @seqNr, @writeTimestamp, @snapshot, @serId, @serManifest, @metaPayload, @metaSerId, @metaSerManifest, @tags) + """ + } override protected def bindUpsertSql(statement: Statement, serializedRow: SerializedSnapshotRow): Statement = { statement @@ -162,22 +174,22 @@ private[r2dbc] class SqlServerSnapshotDao(executorProvider: R2dbcExecutorProvide .bindTimestamp("@toTimestamp", toTimestamp) } - override protected def selectBucketsSql(entityType: String, minSlice: Int, maxSlice: Int): String = { - - // group by column alias (bucket) needs a sub query - val subQuery = - s""" + override protected def selectBucketsSql(minSlice: Int, maxSlice: Int): String = + sqlCache.get(minSlice, s"selectBucketsSql-$minSlice-$maxSlice") { + // group by column alias (bucket) needs a sub query + val subQuery = + s""" 
select TOP(@limit) CAST(DATEDIFF(s,'1970-01-01 00:00:00',db_timestamp) AS BIGINT) / 10 AS bucket FROM ${snapshotTable(minSlice)} WHERE entity_type = @entityType AND ${sliceCondition(minSlice, maxSlice)} AND db_timestamp >= @fromTimestamp AND db_timestamp <= @toTimestamp """ - sql""" - SELECT bucket, count(*) as count from ($subQuery) as sub - GROUP BY bucket ORDER BY bucket - """ - } + sql""" + SELECT bucket, count(*) as count from ($subQuery) as sub + GROUP BY bucket ORDER BY bucket + """ + } override protected def bindSnapshotsBySlicesRangeSql( stmt: Statement, @@ -191,7 +203,8 @@ private[r2dbc] class SqlServerSnapshotDao(executorProvider: R2dbcExecutorProvide } override protected def snapshotsBySlicesRangeSql(minSlice: Int, maxSlice: Int): String = - sql""" + sqlCache.get(minSlice, s"snapshotsBySlicesRangeSql-$minSlice-$maxSlice") { + sql""" SELECT TOP(@bufferSize) slice, persistence_id, seq_nr, db_timestamp, write_timestamp, snapshot, ser_id, ser_manifest, tags, meta_payload, meta_ser_id, meta_ser_manifest FROM ${snapshotTable(minSlice)} WHERE entity_type = @entityType @@ -199,6 +212,7 @@ private[r2dbc] class SqlServerSnapshotDao(executorProvider: R2dbcExecutorProvide AND db_timestamp >= @fromTimestamp ORDER BY db_timestamp, seq_nr """ + } override def currentDbTimestamp(slice: Int): Future[Instant] = Future.successful(InstantFactory.now()) diff --git a/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala b/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala index c4cceeff..a20dff25 100644 --- a/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala +++ b/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala @@ -112,8 +112,10 @@ class MigrationToolSpec // don't run this for Yugabyte since it is using akka-persistence-jdbc private val postgresTest = dialect == "postgres" - private val sqlServerTest = dialect == "sqlserver" - private val testEnabled = postgresTest || sqlServerTest + // FIXME flaky for sqlserver, issue https://github.com/akka/akka-persistence-r2dbc/issues/523 +// private val sqlServerTest = dialect == "sqlserver" +// private val testEnabled = postgresTest || sqlServerTest + private val testEnabled = postgresTest private val createJournalTablePostgres = """CREATE TABLE IF NOT EXISTS jdbc_event_journal(
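A closing note on the cache-key discipline applied consistently across the DAOs above: anything that changes the generated SQL text (entityType, the minSlice/maxSlice range, updateTags) is folded into the cache key; per-instance constants such as the dialect-specific timestamp placeholder are deliberately left out of it; and statements with an unbounded parameter space (the eventsBySlices variants, snapshot deleteSql criteria, user-defined additional columns) bypass the cache entirely. A sketch of that decision, reusing SliceCacheSketch from the earlier note — the durable_state table name and column handling here are illustrative only, not the patch's actual SQL:

final class DurableStateSqlSketch(cache: SliceCacheSketch) {
  def updateStateSql(
      slice: Int,
      entityType: String,
      updateTags: Boolean,
      additionalColumns: Seq[String]): String = {
    def createSql =
      s"UPDATE durable_state_$slice SET state_payload = ?" +
        (if (updateTags) ", tags = ?" else "") +
        additionalColumns.map(col => s", $col = ?").mkString +
        " WHERE persistence_id = ?"

    if (additionalColumns.isEmpty)
      // entityType and updateTags alter the SQL text, so both are part of the key
      cache.get(slice, s"updateStateSql-$entityType-$updateTags")(createSql)
    else
      createSql // unbounded key space: statements with user-defined columns are rebuilt each time
  }
}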