Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Cache sql construction #522

Merged
merged 4 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package akka.persistence.r2dbc.internal

import scala.annotation.varargs
import scala.collection.immutable.IntMap

import akka.annotation.InternalApi
import akka.annotation.InternalStableApi
Expand Down Expand Up @@ -81,4 +82,54 @@ object Sql {
}
}

object Cache {
def apply(dataPartitionsEnabled: Boolean): Cache =
if (dataPartitionsEnabled) new CacheBySlice
else new CacheIgnoringSlice
}

sealed trait Cache {
def get(slice: Int, key: Any)(orCreate: => String): String
}

private final class CacheIgnoringSlice extends Cache {
private var entries: Map[Any, String] = Map.empty

def get(slice: Int, key: Any)(orCreate: => String): String = {
entries.get(key) match {
case Some(value) => value
case None =>
// it's just a cache so no need for guarding concurrent updates
val entry = orCreate
entries = entries.updated(key, entry)
entry
}
}
}

private final class CacheBySlice extends Cache {
private var entriesPerSlice: IntMap[Map[Any, String]] = IntMap.empty

def get(slice: Int, key: Any)(orCreate: => String): String = {

def createEntry(entries: Map[Any, String]): String = {
// it's just a cache so no need for guarding concurrent updates
val entry = orCreate
val newEntries = entries.updated(key, entry)
entriesPerSlice = entriesPerSlice.updated(slice, newEntries)
entry
}

entriesPerSlice.get(slice) match {
case Some(entries) =>
entries.get(key) match {
case Some(value) => value
case None => createEntry(entries)
}
case None =>
createEntry(Map.empty)
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import akka.persistence.r2dbc.internal.JournalDao
import akka.persistence.r2dbc.internal.R2dbcExecutor
import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
import akka.persistence.r2dbc.internal.SerializedEventMetadata
import akka.persistence.r2dbc.internal.Sql
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement
import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement
Expand Down Expand Up @@ -70,25 +71,29 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider)

protected val persistenceExt: Persistence = Persistence(system)

private val sqlCache = Sql.Cache(settings.numberOfDataPartitions > 1)

protected def journalTable(slice: Int): String = settings.journalTableWithSchema(slice)

protected def insertEventWithParameterTimestampSql(slice: Int): String = {
val table = journalTable(slice)
val baseSql = insertEvenBaseSql(table)
if (settings.dbTimestampMonotonicIncreasing)
sql"$baseSql ?) RETURNING db_timestamp"
else
sql"$baseSql GREATEST(?, ${timestampSubSelect(table)})) RETURNING db_timestamp"
}
protected def insertEventWithParameterTimestampSql(slice: Int): String =
sqlCache.get(slice, "insertEventWithParameterTimestampSql") {
val table = journalTable(slice)
val baseSql = insertEvenBaseSql(table)
if (settings.dbTimestampMonotonicIncreasing)
sql"$baseSql ?) RETURNING db_timestamp"
else
sql"$baseSql GREATEST(?, ${timestampSubSelect(table)})) RETURNING db_timestamp"
}

private def insertEventWithTransactionTimestampSql(slice: Int) = {
val table = journalTable(slice)
val baseSql = insertEvenBaseSql(table)
if (settings.dbTimestampMonotonicIncreasing)
sql"$baseSql CURRENT_TIMESTAMP) RETURNING db_timestamp"
else
sql"$baseSql GREATEST(CURRENT_TIMESTAMP, ${timestampSubSelect(table)})) RETURNING db_timestamp"
}
private def insertEventWithTransactionTimestampSql(slice: Int) =
sqlCache.get(slice, "insertEventWithTransactionTimestampSql") {
val table = journalTable(slice)
val baseSql = insertEvenBaseSql(table)
if (settings.dbTimestampMonotonicIncreasing)
sql"$baseSql CURRENT_TIMESTAMP) RETURNING db_timestamp"
else
sql"$baseSql GREATEST(CURRENT_TIMESTAMP, ${timestampSubSelect(table)})) RETURNING db_timestamp"
}

private def insertEvenBaseSql(table: String) = {
s"INSERT INTO $table " +
Expand All @@ -103,31 +108,50 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider)
s"(SELECT db_timestamp + '1 microsecond'::interval FROM $table " +
"WHERE persistence_id = ? AND seq_nr = ?)"

private def selectHighestSequenceNrSql(slice: Int) = sql"""
SELECT MAX(seq_nr) from ${journalTable(slice)}
WHERE persistence_id = ? AND seq_nr >= ?"""
private def selectHighestSequenceNrSql(slice: Int) =
sqlCache.get(slice, "selectHighestSequenceNrSql") {
sql"""
SELECT MAX(seq_nr) from ${journalTable(slice)}
WHERE persistence_id = ? AND seq_nr >= ?"""
}

private def selectLowestSequenceNrSql(slice: Int) =
sql"""
SELECT MIN(seq_nr) from ${journalTable(slice)}
WHERE persistence_id = ?"""

private def deleteEventsSql(slice: Int) = sql"""
DELETE FROM ${journalTable(slice)}
WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?"""

protected def insertDeleteMarkerSql(slice: Int, timestamp: String = "CURRENT_TIMESTAMP"): String = sql"""
INSERT INTO ${journalTable(slice)}
(slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, deleted)
VALUES (?, ?, ?, ?, $timestamp, ?, ?, ?, ?, ?, ?)"""

private def deleteEventsByPersistenceIdBeforeTimestampSql(slice: Int) = sql"""
DELETE FROM ${journalTable(slice)}
WHERE persistence_id = ? AND db_timestamp < ?"""

private def deleteEventsBySliceBeforeTimestampSql(slice: Int) = sql"""
DELETE FROM ${journalTable(slice)}
WHERE slice = ? AND entity_type = ? AND db_timestamp < ?"""
sqlCache.get(slice, "selectLowestSequenceNrSql") {
sql"""
SELECT MIN(seq_nr) from ${journalTable(slice)}
WHERE persistence_id = ?"""
}

private def deleteEventsSql(slice: Int) =
sqlCache.get(slice, "deleteEventsSql") {
sql"""
DELETE FROM ${journalTable(slice)}
WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?"""
}

protected def insertDeleteMarkerSql(slice: Int, timestamp: String = "CURRENT_TIMESTAMP"): String = {
// timestamp param doesn't have to be part of cache key because it's just different for different dialects
sqlCache.get(slice, "insertDeleteMarkerSql") {
sql"""
INSERT INTO ${journalTable(slice)}
(slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, deleted)
VALUES (?, ?, ?, ?, $timestamp, ?, ?, ?, ?, ?, ?)"""
}
}

private def deleteEventsByPersistenceIdBeforeTimestampSql(slice: Int) =
sqlCache.get(slice, "deleteEventsByPersistenceIdBeforeTimestampSql") {
sql"""
DELETE FROM ${journalTable(slice)}
WHERE persistence_id = ? AND db_timestamp < ?"""
}

private def deleteEventsBySliceBeforeTimestampSql(slice: Int) =
sqlCache.get(slice, "deleteEventsBySliceBeforeTimestampSql") {
sql"""
DELETE FROM ${journalTable(slice)}
WHERE slice = ? AND entity_type = ? AND db_timestamp < ?"""
}

/**
* All events must be for the same persistenceId.
Expand Down
Loading