Skip to content

Commit

Permalink
extend from PostgresJournalDao and use positional index (zero-based) …
Browse files Browse the repository at this point in the history
…references for one query
  • Loading branch information
sebastian-alfers committed Dec 23, 2023
1 parent ae6d9fe commit 7b15676
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,14 +294,14 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti
result
}

private def highestSeqNrForDelete(persistenceId: String, toSequenceNr: Long): Future[Long] = {
protected def highestSeqNrForDelete(persistenceId: String, toSequenceNr: Long): Future[Long] = {
if (toSequenceNr == Long.MaxValue)
readHighestSequenceNr(persistenceId, 0L)
else
Future.successful(toSequenceNr)
}

private def lowestSequenceNrForDelete(persistenceId: String, toSeqNr: Long, batchSize: Int): Future[Long] = {
protected def lowestSequenceNrForDelete(persistenceId: String, toSeqNr: Long, batchSize: Int): Future[Long] = {
if (toSeqNr <= batchSize) {
Future.successful(1L)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import akka.persistence.r2dbc.internal.PayloadCodec
import akka.persistence.r2dbc.internal.R2dbcExecutor
import akka.persistence.r2dbc.internal.SerializedEventMetadata
import akka.persistence.r2dbc.internal.Sql.Interpolation
import akka.persistence.r2dbc.internal.postgres.PostgresJournalDao
import akka.persistence.typed.PersistenceId
import io.r2dbc.spi.Connection
import io.r2dbc.spi.ConnectionFactory
Expand Down Expand Up @@ -61,7 +62,7 @@ private[r2dbc] object SqlServerJournalDao {
private[r2dbc] class SqlServerJournalDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit
ec: ExecutionContext,
system: ActorSystem[_])
extends JournalDao {
extends PostgresJournalDao(settings, connectionFactory) {

private val helper = SqlServerDialectHelper(settings.connectionFactorySettings.config)
import helper._
Expand All @@ -70,20 +71,29 @@ private[r2dbc] class SqlServerJournalDao(settings: R2dbcSettings, connectionFact
require(settings.useAppTimestamp, "SqlServer requires akka.persistence.r2dbc.db-timestamp-monotonic-increasing = off")

import JournalDao.SerializedJournalRow
protected def log: Logger = SqlServerJournalDao.log
override def log: Logger = SqlServerJournalDao.log

private val persistenceExt = Persistence(system)

protected val r2dbcExecutor =
new R2dbcExecutor(
connectionFactory,
log,
settings.logDbCallsExceeding,
settings.connectionFactorySettings.poolSettings.closeCallsExceeding)(ec, system)
// protected val r2dbcExecutor =
// new R2dbcExecutor(
// connectionFactory,
// log,
// settings.logDbCallsExceeding,
// settings.connectionFactorySettings.poolSettings.closeCallsExceeding)(ec, system)

protected val journalTable = settings.journalTableWithSchema
protected implicit val journalPayloadCodec: PayloadCodec = settings.journalPayloadCodec
//protected val journalTable = settings.journalTableWithSchema
//protected implicit val journalPayloadCodec: PayloadCodec = settings.journalPayloadCodec

/**
* VALUES (@slice, @entityType, @persistenceId, @seqNr, @writer, @adapterManifest, @eventSerId, @eventSerManifest,
* @eventPayload,
* @tags,
* @metaSerId,
* @metaSerManifest,
* @metaSerPayload,
* @dbTimestamp)
*/
private val insertEventWithParameterTimestampSql = sql"""
INSERT INTO $journalTable
(slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, tags, meta_ser_id, meta_ser_manifest, meta_payload, db_timestamp)
Expand Down Expand Up @@ -117,43 +127,43 @@ private[r2dbc] class SqlServerJournalDao(settings: R2dbcSettings, connectionFact
* it can return `JournalDao.EmptyDbTimestamp` when the pub-sub feature is disabled. When enabled it would have to use
* a select (in same transaction).
*/
def writeEvents(events: Seq[SerializedJournalRow]): Future[Instant] = {
override def writeEvents(events: Seq[SerializedJournalRow]): Future[Instant] = {
require(events.nonEmpty)

// it's always the same persistenceId for all events
val persistenceId = events.head.persistenceId

def bind(stmt: Statement, write: SerializedJournalRow): Statement = {
stmt
.bind("@slice", write.slice)
.bind("@entityType", write.entityType)
.bind("@persistenceId", write.persistenceId)
.bind("@seqNr", write.seqNr)
.bind("@writer", write.writerUuid)
.bind("@adapterManifest", "") // FIXME event adapter
.bind("@eventSerId", write.serId)
.bind("@eventSerManifest", write.serManifest)
.bindPayload("@eventPayload", write.payload.get)
.bind(0, write.slice)
.bind(1, write.entityType)
.bind(2, write.persistenceId)
.bind(3, write.seqNr)
.bind(4, write.writerUuid)
.bind(5, "") // FIXME event adapter
.bind(6, write.serId)
.bind(7, write.serManifest)
.bindPayload(8, write.payload.get)

if (write.tags.isEmpty)
stmt.bindNull("@tags", classOf[String])
stmt.bindNull(9, classOf[String])
else
stmt.bind("@tags", tagsToDb(write.tags))
stmt.bind(9, tagsToDb(write.tags))

// optional metadata
write.metadata match {
case Some(m) =>
stmt
.bind("@metaSerId", m.serId)
.bind("@metaSerManifest", m.serManifest)
.bind("@metaSerPayload", m.payload)
.bind(10, m.serId)
.bind(11, m.serManifest)
.bind(12, m.payload)
case None =>
stmt
.bindNull("@metaSerId", classOf[Integer])
.bindNull("@metaSerManifest", classOf[String])
.bindNull("@metaSerPayload", classOf[Array[Byte]])
.bindNull(10, classOf[Integer])
.bindNull(11, classOf[String])
.bindNull(12, classOf[Array[Byte]])
}
stmt.bind("@dbTimestamp", toDbTimestamp(write.dbTimestamp))
stmt.bind(13, toDbTimestamp(write.dbTimestamp))
}

val insertSql = insertEventWithParameterTimestampSql
Expand Down Expand Up @@ -184,7 +194,7 @@ private[r2dbc] class SqlServerJournalDao(settings: R2dbcSettings, connectionFact
}
}

def readHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
override def readHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
val result = r2dbcExecutor
.select(s"select highest seqNr [$persistenceId]")(
connection =>
Expand All @@ -204,7 +214,7 @@ private[r2dbc] class SqlServerJournalDao(settings: R2dbcSettings, connectionFact
result
}

def readLowestSequenceNr(persistenceId: String): Future[Long] = {
override def readLowestSequenceNr(persistenceId: String): Future[Long] = {
val result = r2dbcExecutor
.select(s"select lowest seqNr [$persistenceId]")(
connection =>
Expand All @@ -223,22 +233,22 @@ private[r2dbc] class SqlServerJournalDao(settings: R2dbcSettings, connectionFact
result
}

private def highestSeqNrForDelete(persistenceId: String, toSequenceNr: Long): Future[Long] = {
if (toSequenceNr == Long.MaxValue)
readHighestSequenceNr(persistenceId, 0L)
else
Future.successful(toSequenceNr)
}

private def lowestSequenceNrForDelete(persistenceId: String, toSeqNr: Long, batchSize: Int): Future[Long] = {
if (toSeqNr <= batchSize) {
Future.successful(1L)
} else {
readLowestSequenceNr(persistenceId)
}
}

def deleteEventsTo(persistenceId: String, toSequenceNr: Long, resetSequenceNumber: Boolean): Future[Unit] = {
// private def highestSeqNrForDelete(persistenceId: String, toSequenceNr: Long): Future[Long] = {
// if (toSequenceNr == Long.MaxValue)
// readHighestSequenceNr(persistenceId, 0L)
// else
// Future.successful(toSequenceNr)
// }

// private def lowestSequenceNrForDelete(persistenceId: String, toSeqNr: Long, batchSize: Int): Future[Long] = {
// if (toSeqNr <= batchSize) {
// Future.successful(1L)
// } else {
// readLowestSequenceNr(persistenceId)
// }
// }

override def deleteEventsTo(persistenceId: String, toSequenceNr: Long, resetSequenceNumber: Boolean): Future[Unit] = {

def insertDeleteMarkerStmt(deleteMarkerSeqNr: Long, connection: Connection): Statement = {
val entityType = PersistenceId.extractEntityType(persistenceId)
Expand Down

0 comments on commit 7b15676

Please sign in to comment.