Skip to content

Commit

Permalink
perf: Cache sql construction
Browse files Browse the repository at this point in the history
* avoid string concatenation and allocations
* since they are  now created each time its used due to the data partitions by slice
  • Loading branch information
patriknw committed Feb 9, 2024
1 parent 28b95bc commit a42a479
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 39 deletions.
26 changes: 26 additions & 0 deletions core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package akka.persistence.r2dbc.internal

import scala.annotation.varargs
import scala.collection.immutable.IntMap

import akka.annotation.InternalApi
import akka.annotation.InternalStableApi
Expand Down Expand Up @@ -81,4 +82,29 @@ object Sql {
}
}

final class Cache {
private var entriesPerSlice: IntMap[Map[Any, String]] = IntMap.empty

def get(slice: Int, key: Any)(orCreate: => String): String = {

def createEntry(entries: Map[Any, String]): String = {
// it's just a cache so no need for guarding concurrent updates
val entry = orCreate
val newEntries = entries.updated(key, entry)
entriesPerSlice = entriesPerSlice.updated(slice, newEntries)
entry
}

entriesPerSlice.get(slice) match {
case Some(entries) =>
entries.get(key) match {
case Some(value) => value
case None => createEntry(entries)
}
case None =>
createEntry(Map.empty)
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import akka.persistence.r2dbc.internal.JournalDao
import akka.persistence.r2dbc.internal.R2dbcExecutor
import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
import akka.persistence.r2dbc.internal.SerializedEventMetadata
import akka.persistence.r2dbc.internal.Sql
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement
import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement
Expand Down Expand Up @@ -70,25 +71,29 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider)

protected val persistenceExt: Persistence = Persistence(system)

private val sqlCache = new Sql.Cache

protected def journalTable(slice: Int): String = settings.journalTableWithSchema(slice)

protected def insertEventWithParameterTimestampSql(slice: Int): String = {
val table = journalTable(slice)
val baseSql = insertEvenBaseSql(table)
if (settings.dbTimestampMonotonicIncreasing)
sql"$baseSql ?) RETURNING db_timestamp"
else
sql"$baseSql GREATEST(?, ${timestampSubSelect(table)})) RETURNING db_timestamp"
}
protected def insertEventWithParameterTimestampSql(slice: Int): String =
sqlCache.get(slice, "insertEventWithParameterTimestampSql") {
val table = journalTable(slice)
val baseSql = insertEvenBaseSql(table)
if (settings.dbTimestampMonotonicIncreasing)
sql"$baseSql ?) RETURNING db_timestamp"
else
sql"$baseSql GREATEST(?, ${timestampSubSelect(table)})) RETURNING db_timestamp"
}

private def insertEventWithTransactionTimestampSql(slice: Int) = {
val table = journalTable(slice)
val baseSql = insertEvenBaseSql(table)
if (settings.dbTimestampMonotonicIncreasing)
sql"$baseSql CURRENT_TIMESTAMP) RETURNING db_timestamp"
else
sql"$baseSql GREATEST(CURRENT_TIMESTAMP, ${timestampSubSelect(table)})) RETURNING db_timestamp"
}
private def insertEventWithTransactionTimestampSql(slice: Int) =
sqlCache.get(slice, "insertEventWithTransactionTimestampSql") {
val table = journalTable(slice)
val baseSql = insertEvenBaseSql(table)
if (settings.dbTimestampMonotonicIncreasing)
sql"$baseSql CURRENT_TIMESTAMP) RETURNING db_timestamp"
else
sql"$baseSql GREATEST(CURRENT_TIMESTAMP, ${timestampSubSelect(table)})) RETURNING db_timestamp"
}

private def insertEvenBaseSql(table: String) = {
s"INSERT INTO $table " +
Expand All @@ -103,31 +108,50 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider)
s"(SELECT db_timestamp + '1 microsecond'::interval FROM $table " +
"WHERE persistence_id = ? AND seq_nr = ?)"

private def selectHighestSequenceNrSql(slice: Int) = sql"""
SELECT MAX(seq_nr) from ${journalTable(slice)}
WHERE persistence_id = ? AND seq_nr >= ?"""
private def selectHighestSequenceNrSql(slice: Int) =
sqlCache.get(slice, "selectHighestSequenceNrSql") {
sql"""
SELECT MAX(seq_nr) from ${journalTable(slice)}
WHERE persistence_id = ? AND seq_nr >= ?"""
}

private def selectLowestSequenceNrSql(slice: Int) =
sql"""
SELECT MIN(seq_nr) from ${journalTable(slice)}
WHERE persistence_id = ?"""

private def deleteEventsSql(slice: Int) = sql"""
DELETE FROM ${journalTable(slice)}
WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?"""

protected def insertDeleteMarkerSql(slice: Int, timestamp: String = "CURRENT_TIMESTAMP"): String = sql"""
INSERT INTO ${journalTable(slice)}
(slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, deleted)
VALUES (?, ?, ?, ?, $timestamp, ?, ?, ?, ?, ?, ?)"""

private def deleteEventsByPersistenceIdBeforeTimestampSql(slice: Int) = sql"""
DELETE FROM ${journalTable(slice)}
WHERE persistence_id = ? AND db_timestamp < ?"""

private def deleteEventsBySliceBeforeTimestampSql(slice: Int) = sql"""
DELETE FROM ${journalTable(slice)}
WHERE slice = ? AND entity_type = ? AND db_timestamp < ?"""
sqlCache.get(slice, "selectLowestSequenceNrSql") {
sql"""
SELECT MIN(seq_nr) from ${journalTable(slice)}
WHERE persistence_id = ?"""
}

private def deleteEventsSql(slice: Int) =
sqlCache.get(slice, "deleteEventsSql") {
sql"""
DELETE FROM ${journalTable(slice)}
WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?"""
}

protected def insertDeleteMarkerSql(slice: Int, timestamp: String = "CURRENT_TIMESTAMP"): String = {
// timestamp param doesn't have to be part of cache key because it's just different for different dialects
sqlCache.get(slice, "insertDeleteMarkerSql") {
sql"""
INSERT INTO ${journalTable(slice)}
(slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, deleted)
VALUES (?, ?, ?, ?, $timestamp, ?, ?, ?, ?, ?, ?)"""
}
}

private def deleteEventsByPersistenceIdBeforeTimestampSql(slice: Int) =
sqlCache.get(slice, "deleteEventsByPersistenceIdBeforeTimestampSql") {
sql"""
DELETE FROM ${journalTable(slice)}
WHERE persistence_id = ? AND db_timestamp < ?"""
}

private def deleteEventsBySliceBeforeTimestampSql(slice: Int) =
sqlCache.get(slice, "deleteEventsBySliceBeforeTimestampSql") {
sql"""
DELETE FROM ${journalTable(slice)}
WHERE slice = ? AND entity_type = ? AND db_timestamp < ?"""
}

/**
* All events must be for the same persistenceId.
Expand Down

0 comments on commit a42a479

Please sign in to comment.