From e67c6fd191d65284512aee34a5e7c23cfe89e0ea Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 7 Dec 2023 15:20:15 +0100 Subject: [PATCH 1/6] feat: Write change event of DurableState to event journal --- .../r2dbc/internal/DurableStateDao.scala | 6 +- .../r2dbc/internal/JournalDao.scala | 8 +- .../r2dbc/internal/h2/H2Dialect.scala | 4 +- .../r2dbc/internal/h2/H2DurableStateDao.scala | 12 +- .../internal/postgres/PostgresDialect.scala | 4 +- .../postgres/PostgresDurableStateDao.scala | 103 +++++++------ .../postgres/PostgresJournalDao.scala | 145 +++++++++++------- .../internal/postgres/YugabyteDialect.scala | 4 +- .../postgres/YugabyteDurableStateDao.scala | 13 +- .../scaladsl/R2dbcDurableStateStore.scala | 64 +++++++- 10 files changed, 237 insertions(+), 126 deletions(-) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala index ba3cbdd2..63427e8c 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala @@ -8,10 +8,12 @@ import akka.Done import akka.NotUsed import akka.annotation.InternalApi import akka.stream.scaladsl.Source - import java.time.Instant + import scala.concurrent.Future +import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow + /** * INTERNAL API */ @@ -48,7 +50,7 @@ private[r2dbc] trait DurableStateDao extends BySliceQuery.Dao[DurableStateDao.Se def readState(persistenceId: String): Future[Option[SerializedStateRow]] - def upsertState(state: SerializedStateRow, value: Any): Future[Done] + def upsertState(state: SerializedStateRow, value: Any, changeEvent: Option[SerializedJournalRow]): Future[Done] def deleteState(persistenceId: String, revision: Long): Future[Done] diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala index 22b046ee..ac64692f 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala @@ -5,10 +5,13 @@ package akka.persistence.r2dbc.internal import akka.annotation.InternalApi - import java.time.Instant + import scala.concurrent.Future +import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow +import io.r2dbc.spi.Connection + /** * INTERNAL API */ @@ -56,6 +59,9 @@ private[r2dbc] trait JournalDao { * a select (in same transaction). 
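*
* The `writeEventInTx` declared below writes a single change event on an already open
* connection, so the event insert can join the transaction that updates the durable
* state row. A minimal caller sketch (wiring assumed, names taken from this patch):
* {{{
* r2dbcExecutor.withConnection("upsert with change event") { connection =>
*   for {
*     updatedRows <- R2dbcExecutor.updateOneInTx(upsertStatement(connection))
*     _ <- changeEvent.map(journalDao.writeEventInTx(_, connection)).getOrElse(FutureDone)
*   } yield updatedRows
* }
* }}}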
*/ def writeEvents(events: Seq[JournalDao.SerializedJournalRow]): Future[Instant] + + def writeEventInTx(event: SerializedJournalRow, connection: Connection): Future[Instant] + def readHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] def readLowestSequenceNr(persistenceId: String): Future[Long] diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala index d13e6e4a..fbe219cf 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala @@ -94,7 +94,9 @@ private[r2dbc] object H2Dialect extends Dialect { override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit system: ActorSystem[_]): DurableStateDao = - new H2DurableStateDao(settings, connectionFactory)(ecForDaos(system, settings), system) + new H2DurableStateDao(settings, connectionFactory, createJournalDao(settings, connectionFactory))( + ecForDaos(system, settings), + system) private def ecForDaos(system: ActorSystem[_], settings: R2dbcSettings): ExecutionContext = { // H2 R2DBC driver blocks in surprising places (Mono.toFuture in stmt.execute().asFuture()) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala index c3f42d5a..180ef355 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala @@ -11,19 +11,21 @@ import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao import io.r2dbc.spi.ConnectionFactory import org.slf4j.Logger import org.slf4j.LoggerFactory - import scala.concurrent.ExecutionContext import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration +import akka.persistence.r2dbc.internal.JournalDao + /** * INTERNAL API */ @InternalApi -private[r2dbc] final class H2DurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit - ec: ExecutionContext, - system: ActorSystem[_]) - extends PostgresDurableStateDao(settings, connectionFactory) { +private[r2dbc] final class H2DurableStateDao( + settings: R2dbcSettings, + connectionFactory: ConnectionFactory, + journalDao: JournalDao)(implicit ec: ExecutionContext, system: ActorSystem[_]) + extends PostgresDurableStateDao(settings, connectionFactory, journalDao) { override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2DurableStateDao]) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala index 87ddc81a..9c9362dd 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala @@ -129,5 +129,7 @@ private[r2dbc] object PostgresDialect extends Dialect { override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit system: ActorSystem[_]): DurableStateDao = - new PostgresDurableStateDao(settings, connectionFactory)(system.executionContext, system) + new PostgresDurableStateDao(settings, connectionFactory, createJournalDao(settings, connectionFactory))( + system.executionContext, + system) } diff --git 
a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index 7cca3f15..7a18ecb4 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -40,10 +40,10 @@ import io.r2dbc.spi.Row import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory - import java.lang import java.time.Instant import java.util + import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -51,6 +51,9 @@ import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal +import akka.persistence.r2dbc.internal.JournalDao +import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow + /** * INTERNAL API */ @@ -70,9 +73,10 @@ private[r2dbc] object PostgresDurableStateDao { * INTERNAL API */ @InternalApi -private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit - ec: ExecutionContext, - system: ActorSystem[_]) +private[r2dbc] class PostgresDurableStateDao( + settings: R2dbcSettings, + connectionFactory: ConnectionFactory, + journalDao: JournalDao)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends DurableStateDao { import DurableStateDao._ import PostgresDurableStateDao._ @@ -264,7 +268,7 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection LIMIT ?""" } - def readState(persistenceId: String): Future[Option[SerializedStateRow]] = { + override def readState(persistenceId: String): Future[Option[SerializedStateRow]] = { val entityType = PersistenceId.extractEntityType(persistenceId) r2dbcExecutor.selectOne(s"select [$persistenceId]")( connection => @@ -293,7 +297,25 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection Option(rowPayload) } - def upsertState(state: SerializedStateRow, value: Any): Future[Done] = { + private def writeChangeEventAndCallChangeHander( + connection: Connection, + updatedRows: Long, + entityType: String, + change: DurableStateChange[Any], + changeEvent: Option[SerializedJournalRow]): Future[Done] = { + if (updatedRows == 1) + for { + _ <- changeEvent.map(journalDao.writeEventInTx(_, connection)).getOrElse(FutureDone) + _ <- changeHandlers.get(entityType).map(processChange(_, connection, change)).getOrElse(FutureDone) + } yield Done + else + FutureDone + } + + override def upsertState( + state: SerializedStateRow, + value: Any, + changeEvent: Option[SerializedJournalRow]): Future[Done] = { require(state.revision > 0) def bindTags(stmt: Statement, i: Int): Statement = { @@ -360,17 +382,15 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection s"Insert failed: durable state for persistence id [${state.persistenceId}] already exists")) } - changeHandlers.get(entityType) match { - case None => - recoverDataIntegrityViolation(r2dbcExecutor.updateOne(s"insert [${state.persistenceId}]")(insertStatement)) - case Some(handler) => - r2dbcExecutor.withConnection(s"insert [${state.persistenceId}] with change handler") { connection => - for { - updatedRows <- recoverDataIntegrityViolation(R2dbcExecutor.updateOneInTx(insertStatement(connection))) - _ <- if (updatedRows == 1) processChange(handler, connection, change) else FutureDone - } yield 
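// yields the row count so the caller can verify that exactly one row was affected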
updatedRows - } - } + if (!changeHandlers.contains(entityType) && changeEvent.isEmpty) + recoverDataIntegrityViolation(r2dbcExecutor.updateOne(s"insert [${state.persistenceId}]")(insertStatement)) + else + r2dbcExecutor.withConnection(s"insert [${state.persistenceId}]") { connection => + for { + updatedRows <- recoverDataIntegrityViolation(R2dbcExecutor.updateOneInTx(insertStatement(connection))) + _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent = None) + } yield updatedRows + } } else { val previousRevision = state.revision - 1 @@ -405,17 +425,15 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection } } - changeHandlers.get(entityType) match { - case None => - r2dbcExecutor.updateOne(s"update [${state.persistenceId}]")(updateStatement) - case Some(handler) => - r2dbcExecutor.withConnection(s"update [${state.persistenceId}] with change handler") { connection => - for { - updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection)) - _ <- if (updatedRows == 1) processChange(handler, connection, change) else FutureDone - } yield updatedRows - } - } + if (!changeHandlers.contains(entityType) && changeEvent.isEmpty) + r2dbcExecutor.updateOne(s"update [${state.persistenceId}]")(updateStatement) + else + r2dbcExecutor.withConnection(s"update [${state.persistenceId}]") { connection => + for { + updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection)) + _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent = None) + } yield updatedRows + } } } @@ -451,7 +469,7 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection } } - def deleteState(persistenceId: String, revision: Long): Future[Done] = { + override def deleteState(persistenceId: String, revision: Long): Future[Done] = { if (revision == 0) { hardDeleteState(persistenceId) } else { @@ -490,10 +508,7 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection for { updatedRows <- recoverDataIntegrityViolation( R2dbcExecutor.updateOneInTx(insertDeleteMarkerStatement(connection))) - _ <- changeHandler match { - case None => FutureDone - case Some(handler) => if (updatedRows == 1) processChange(handler, connection, change) else FutureDone - } + _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent = None) } yield updatedRows } @@ -537,10 +552,7 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection r2dbcExecutor.withConnection(s"delete [$persistenceId]$changeHandlerHint") { connection => for { updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection)) - _ <- changeHandler match { - case None => FutureDone - case Some(handler) => if (updatedRows == 1) processChange(handler, connection, change) else FutureDone - } + _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent = None) } yield updatedRows } } @@ -572,14 +584,9 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection connection .createStatement(hardDeleteStateSql(entityType)) .bind(0, persistenceId)) - _ <- changeHandler match { - case None => FutureDone - case Some(handler) => - if (updatedRows == 1) { - val change = new DeletedDurableState[Any](persistenceId, 0L, NoOffset, EmptyDbTimestamp.toEpochMilli) - processChange(handler, connection, change) - } else - FutureDone + _ <- { + val change = new DeletedDurableState[Any](persistenceId, 0L, 
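// revision 0 and NoOffset: after a hard delete no stored revision remains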
NoOffset, EmptyDbTimestamp.toEpochMilli) + writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent = None) } } yield updatedRows } @@ -669,7 +676,7 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => NotUsed) } - def persistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed] = { + override def persistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed] = { if (settings.durableStateTableByEntityTypeWithSchema.isEmpty) persistenceIds(afterId, limit, settings.durableStateTableWithSchema) else { @@ -699,7 +706,7 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection } } - def persistenceIds(afterId: Option[String], limit: Long, table: String): Source[String, NotUsed] = { + override def persistenceIds(afterId: Option[String], limit: Long, table: String): Source[String, NotUsed] = { val result = readPersistenceIds(afterId, limit, table) Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => NotUsed) @@ -729,7 +736,7 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection result } - def persistenceIds(entityType: String, afterId: Option[String], limit: Long): Source[String, NotUsed] = { + override def persistenceIds(entityType: String, afterId: Option[String], limit: Long): Source[String, NotUsed] = { val table = settings.getDurableStateTableWithSchema(entityType) val likeStmtPostfix = PersistenceId.DefaultSeparator + "%" val result = r2dbcExecutor.select(s"select persistenceIds by entity type")( diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala index f3f0a7ad..6af05f85 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala @@ -24,8 +24,8 @@ import io.r2dbc.spi.Row import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory - import java.time.Instant + import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -133,7 +133,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti * it can return `JournalDao.EmptyDbTimestamp` when the pub-sub feature is disabled. When enabled it would have to use * a select (in same transaction). 
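*
* The per-row statement binding is extracted below into `bindInsertStatement` so that
* the batched `writeEvents` and the new single-event `writeEventInTx` share one code
* path. For a batch, each row is appended with `Statement.add()` before binding, as in
* this excerpt from the change:
* {{{
* events.foldLeft(connection.createStatement(insertSql)) { (stmt, write) =>
*   stmt.add()
*   bindInsertStatement(stmt, write, useTimestampFromDb, previousSeqNr)
* }
* }}}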
*/ - def writeEvents(events: Seq[SerializedJournalRow]): Future[Instant] = { + override def writeEvents(events: Seq[SerializedJournalRow]): Future[Instant] = { require(events.nonEmpty) // it's always the same persistenceId for all events @@ -143,56 +143,6 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti // The MigrationTool defines the dbTimestamp to preserve the original event timestamp val useTimestampFromDb = events.head.dbTimestamp == Instant.EPOCH - def bind(stmt: Statement, write: SerializedJournalRow): Statement = { - stmt - .bind(0, write.slice) - .bind(1, write.entityType) - .bind(2, write.persistenceId) - .bind(3, write.seqNr) - .bind(4, write.writerUuid) - .bind(5, "") // FIXME event adapter - .bind(6, write.serId) - .bind(7, write.serManifest) - .bindPayload(8, write.payload.get) - - if (write.tags.isEmpty) - stmt.bindNull(9, classOf[Array[String]]) - else - stmt.bind(9, write.tags.toArray) - - // optional metadata - write.metadata match { - case Some(m) => - stmt - .bind(10, m.serId) - .bind(11, m.serManifest) - .bind(12, m.payload) - case None => - stmt - .bindNull(10, classOf[Integer]) - .bindNull(11, classOf[String]) - .bindNull(12, classOf[Array[Byte]]) - } - - if (useTimestampFromDb) { - if (!journalSettings.dbTimestampMonotonicIncreasing) - stmt - .bind(13, write.persistenceId) - .bind(14, previousSeqNr) - } else { - if (journalSettings.dbTimestampMonotonicIncreasing) - stmt - .bind(13, write.dbTimestamp) - else - stmt - .bind(13, write.dbTimestamp) - .bind(14, write.persistenceId) - .bind(15, previousSeqNr) - } - - stmt - } - val insertSql = if (useTimestampFromDb) insertEventWithTransactionTimestampSql else insertEventWithParameterTimestampSql @@ -200,11 +150,12 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti val totalEvents = events.size if (totalEvents == 1) { val result = r2dbcExecutor.updateOneReturning(s"insert [$persistenceId]")( - connection => bind(connection.createStatement(insertSql), events.head), + connection => + bindInsertStatement(connection.createStatement(insertSql), events.head, useTimestampFromDb, previousSeqNr), row => row.get(0, classOf[Instant])) if (log.isDebugEnabled()) result.foreach { _ => - log.debug("Wrote [{}] events for persistenceId [{}]", 1, events.head.persistenceId) + log.debug("Wrote [{}] events for persistenceId [{}]", 1, persistenceId) } result } else { @@ -212,18 +163,94 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti connection => events.foldLeft(connection.createStatement(insertSql)) { (stmt, write) => stmt.add() - bind(stmt, write) + bindInsertStatement(stmt, write, useTimestampFromDb, previousSeqNr) }, row => row.get(0, classOf[Instant])) if (log.isDebugEnabled()) result.foreach { _ => - log.debug("Wrote [{}] events for persistenceId [{}]", totalEvents, events.head.persistenceId) + log.debug("Wrote [{}] events for persistenceId [{}]", totalEvents, persistenceId) } result.map(_.head)(ExecutionContexts.parasitic) } } - def readHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = { + override def writeEventInTx(event: SerializedJournalRow, connection: Connection): Future[Instant] = { + val persistenceId = event.persistenceId + val previousSeqNr = event.seqNr - 1 + + // The MigrationTool defines the dbTimestamp to preserve the original event timestamp + val useTimestampFromDb = event.dbTimestamp == Instant.EPOCH + + val insertSql = + if (useTimestampFromDb) insertEventWithTransactionTimestampSql + 
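// transaction_timestamp() is assigned by the database for normal writes; the
// parameter timestamp is only bound when the MigrationTool preserves an original
// event time (signalled by dbTimestamp == Instant.EPOCH)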
else insertEventWithParameterTimestampSql + + val result = r2dbcExecutor.updateOneReturning(s"insert [$persistenceId]")( + connection => + bindInsertStatement(connection.createStatement(insertSql), event, useTimestampFromDb, previousSeqNr), + row => row.get(0, classOf[Instant])) + if (log.isDebugEnabled()) + result.foreach { _ => + log.debug("Wrote [{}] event for persistenceId [{}]", 1, persistenceId) + } + result + } + + private def bindInsertStatement( + stmt: Statement, + write: SerializedJournalRow, + useTimestampFromDb: Boolean, + previousSeqNr: Long): Statement = { + stmt + .bind(0, write.slice) + .bind(1, write.entityType) + .bind(2, write.persistenceId) + .bind(3, write.seqNr) + .bind(4, write.writerUuid) + .bind(5, "") // FIXME event adapter + .bind(6, write.serId) + .bind(7, write.serManifest) + .bindPayload(8, write.payload.get) + + if (write.tags.isEmpty) + stmt.bindNull(9, classOf[Array[String]]) + else + stmt.bind(9, write.tags.toArray) + + // optional metadata + write.metadata match { + case Some(m) => + stmt + .bind(10, m.serId) + .bind(11, m.serManifest) + .bind(12, m.payload) + case None => + stmt + .bindNull(10, classOf[Integer]) + .bindNull(11, classOf[String]) + .bindNull(12, classOf[Array[Byte]]) + } + + if (useTimestampFromDb) { + if (!journalSettings.dbTimestampMonotonicIncreasing) + stmt + .bind(13, write.persistenceId) + .bind(14, previousSeqNr) + } else { + if (journalSettings.dbTimestampMonotonicIncreasing) + stmt + .bind(13, write.dbTimestamp) + else + stmt + .bind(13, write.dbTimestamp) + .bind(14, write.persistenceId) + .bind(15, previousSeqNr) + } + + stmt + } + + override def readHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = { val result = r2dbcExecutor .select(s"select highest seqNr [$persistenceId]")( connection => @@ -243,7 +270,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti result } - def readLowestSequenceNr(persistenceId: String): Future[Long] = { + override def readLowestSequenceNr(persistenceId: String): Future[Long] = { val result = r2dbcExecutor .select(s"select lowest seqNr [$persistenceId]")( connection => @@ -277,7 +304,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti } } - def deleteEventsTo(persistenceId: String, toSequenceNr: Long, resetSequenceNumber: Boolean): Future[Unit] = { + override def deleteEventsTo(persistenceId: String, toSequenceNr: Long, resetSequenceNumber: Boolean): Future[Unit] = { def insertDeleteMarkerStmt(deleteMarkerSeqNr: Long, connection: Connection): Statement = { val entityType = PersistenceId.extractEntityType(persistenceId) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala index 2d7a6a7d..27b273dd 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala @@ -42,5 +42,7 @@ private[r2dbc] object YugabyteDialect extends Dialect { override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit system: ActorSystem[_]): DurableStateDao = - new YugabyteDurableStateDao(settings, connectionFactory)(system.executionContext, system) + new YugabyteDurableStateDao(settings, connectionFactory, createJournalDao(settings, connectionFactory))( + system.executionContext, + system) } diff --git 
a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala index 9f7f4051..eabfaf9f 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala @@ -10,18 +10,19 @@ import akka.persistence.r2dbc.R2dbcSettings import io.r2dbc.spi._ import org.slf4j.Logger import org.slf4j.LoggerFactory - import scala.concurrent.ExecutionContext +import akka.persistence.r2dbc.internal.JournalDao + /** * INTERNAL API */ @InternalApi -private[r2dbc] final class YugabyteDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)( - implicit - ec: ExecutionContext, - system: ActorSystem[_]) - extends PostgresDurableStateDao(settings, connectionFactory) { +private[r2dbc] final class YugabyteDurableStateDao( + settings: R2dbcSettings, + connectionFactory: ConnectionFactory, + journalDao: JournalDao)(implicit ec: ExecutionContext, system: ActorSystem[_]) + extends PostgresDurableStateDao(settings, connectionFactory, journalDao) { override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[YugabyteDurableStateDao]) diff --git a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala index 4e42eae8..e70b2d12 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala @@ -7,12 +7,14 @@ package akka.persistence.r2dbc.state.scaladsl import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future + import akka.Done import akka.NotUsed import akka.actor.ExtendedActorSystem import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.scaladsl.adapter._ import akka.persistence.Persistence +import akka.persistence.SerializedEvent import akka.persistence.query.DeletedDurableState import akka.persistence.query.DurableStateChange import akka.persistence.query.Offset @@ -26,8 +28,12 @@ import akka.persistence.r2dbc.internal.BySliceQuery import akka.persistence.r2dbc.internal.ContinuousQuery import akka.persistence.r2dbc.internal.DurableStateDao import akka.persistence.r2dbc.internal.DurableStateDao.SerializedStateRow +import akka.persistence.r2dbc.internal.InstantFactory +import akka.persistence.r2dbc.internal.JournalDao +import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow import akka.persistence.state.scaladsl.DurableStateUpdateStore import akka.persistence.state.scaladsl.GetObjectResult +import akka.persistence.typed.PersistenceId import akka.serialization.SerializationExtension import akka.serialization.Serializers import akka.stream.scaladsl.Source @@ -53,6 +59,7 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg private val log = LoggerFactory.getLogger(getClass) private val sharedConfigPath = cfgPath.replaceAll("""\.state$""", "") private val settings = R2dbcSettings(system.settings.config.getConfig(sharedConfigPath)) + private val journalSettings = R2dbcSettings(system.settings.config.getConfig(sharedConfigPath)) log.debug("R2DBC journal starting up with dialect [{}]", settings.dialectName) private val typedSystem = system.toTyped @@ -109,7 +116,26 @@ class 
R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg * the existing stored `revision` + 1 isn't equal to the given `revision`. This optimistic locking check can be * disabled with configuration `assert-single-writer`. */ - override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = { + override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = + upsertObject(persistenceId, revision, value, tag, changeEvent = None) + + /** + * Insert the value if `revision` is 1, which will fail with `IllegalStateException` if there is already a stored + * value for the given `persistenceId`. Otherwise update the value, which will fail with `IllegalStateException` if + * the existing stored `revision` + 1 isn't equal to the given `revision`. This optimistic locking check can be + * disabled with configuration `assert-single-writer`. + * + * The `changeEvent`, if defined, is written to the event journal in the same transaction as the DurableState upsert. + * Same `persistenceId` is used in the journal and the `revision` is used as `sequenceNr`. + */ + def upsertObject( + persistenceId: String, + revision: Long, + value: A, + tag: String, + changeEvent: Option[Any]): Future[Done] = { + // FIXME add new trait in Akka for this method. Maybe we need it for the deletes too. + val valueAnyRef = value.asInstanceOf[AnyRef] val serialized = serialization.serialize(valueAnyRef).get val serializer = serialization.findSerializerFor(valueAnyRef) @@ -125,7 +151,41 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg manifest, if (tag.isEmpty) Set.empty else Set(tag)) - stateDao.upsertState(serializedRow, value) + val serializedChangedEvent: Option[SerializedJournalRow] = { + changeEvent.map { event => + val eventAnyRef = event.asInstanceOf[AnyRef] + val serializedEvent = eventAnyRef match { + case s: SerializedEvent => s // already serialized + case _ => + val bytes = serialization.serialize(eventAnyRef).get + val serializer = serialization.findSerializerFor(eventAnyRef) + val manifest = Serializers.manifestFor(serializer, eventAnyRef) + new SerializedEvent(bytes, serializer.identifier, manifest) + } + + val entityType = PersistenceId.extractEntityType(persistenceId) + val slice = persistenceExt.sliceForPersistenceId(persistenceId) + val timestamp = if (journalSettings.useAppTimestamp) InstantFactory.now() else JournalDao.EmptyDbTimestamp + + SerializedJournalRow( + slice, + entityType, + persistenceId, + revision, + timestamp, + JournalDao.EmptyDbTimestamp, + Some(serializedEvent.bytes), + serializedEvent.serializerId, + serializedEvent.serializerManifest, + "", // FIXME writerUuid, or shall we make one? 
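// remaining fields mirror a regular journal write: the durable state tag is reused
// for the change event, and change events carry no metadata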
+ if (tag.isEmpty) Set.empty else Set(tag), + metadata = None) + } + } + + stateDao.upsertState(serializedRow, value, serializedChangedEvent) + + // FIXME PubSub, but not via PersistentRepr } From 5e30ce8c59f04d5d304764842feb0c018cdcd864 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 12 Dec 2023 09:15:55 +0100 Subject: [PATCH 2/6] impl DurableStateUpdateWithChangeEventStore trait --- .../scaladsl/DurableStateCleanup.scala | 4 +- .../r2dbc/internal/DurableStateDao.scala | 2 +- .../postgres/PostgresDurableStateDao.scala | 19 ++- .../javadsl/R2dbcDurableStateStore.scala | 19 ++- .../scaladsl/R2dbcDurableStateStore.scala | 113 +++++++++++------- project/Dependencies.scala | 2 +- 6 files changed, 98 insertions(+), 61 deletions(-) diff --git a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala index 90a70c23..b5fd7dcf 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala @@ -65,11 +65,11 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf */ def deleteState(persistenceId: String, resetRevisionNumber: Boolean): Future[Done] = { if (resetRevisionNumber) - stateDao.deleteState(persistenceId, revision = 0L) // hard delete without revision check + stateDao.deleteState(persistenceId, revision = 0L, changeEvent = None) // hard delete without revision check else { stateDao.readState(persistenceId).flatMap { case None => Future.successful(Done) // already deleted - case Some(s) => stateDao.deleteState(persistenceId, s.revision + 1) + case Some(s) => stateDao.deleteState(persistenceId, s.revision + 1, changeEvent = None) } } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala index 63427e8c..0b6f4892 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala @@ -52,7 +52,7 @@ private[r2dbc] trait DurableStateDao extends BySliceQuery.Dao[DurableStateDao.Se def upsertState(state: SerializedStateRow, value: Any, changeEvent: Option[SerializedJournalRow]): Future[Done] - def deleteState(persistenceId: String, revision: Long): Future[Done] + def deleteState(persistenceId: String, revision: Long, changeEvent: Option[SerializedJournalRow]): Future[Done] def persistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed] diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index 7a18ecb4..1131a5da 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -469,7 +469,10 @@ private[r2dbc] class PostgresDurableStateDao( } } - override def deleteState(persistenceId: String, revision: Long): Future[Done] = { + override def deleteState( + persistenceId: String, + revision: Long, + changeEvent: Option[SerializedJournalRow]): Future[Done] = { if (revision == 0) { hardDeleteState(persistenceId) } else { @@ -501,14 +504,11 @@ private[r2dbc] class PostgresDurableStateDao( s"Insert delete marker with revision 1 
failed: durable state for persistence id [$persistenceId] already exists")) } - val changeHandler = changeHandlers.get(entityType) - val changeHandlerHint = changeHandler.map(_ => " with change handler").getOrElse("") - - r2dbcExecutor.withConnection(s"insert delete marker [$persistenceId]$changeHandlerHint") { connection => + r2dbcExecutor.withConnection(s"insert delete marker [$persistenceId]") { connection => for { updatedRows <- recoverDataIntegrityViolation( R2dbcExecutor.updateOneInTx(insertDeleteMarkerStatement(connection))) - _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent = None) + _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent) } yield updatedRows } @@ -546,13 +546,10 @@ private[r2dbc] class PostgresDurableStateDao( } } - val changeHandler = changeHandlers.get(entityType) - val changeHandlerHint = changeHandler.map(_ => " with change handler").getOrElse("") - - r2dbcExecutor.withConnection(s"delete [$persistenceId]$changeHandlerHint") { connection => + r2dbcExecutor.withConnection(s"delete [$persistenceId]") { connection => for { updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection)) - _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent = None) + _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent) } yield updatedRows } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala index c0550d17..40b3f20d 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala @@ -7,7 +7,9 @@ package akka.persistence.r2dbc.state.javadsl import java.util import java.util.Optional import java.util.concurrent.CompletionStage + import scala.concurrent.ExecutionContext + import akka.Done import akka.NotUsed import akka.japi.Pair @@ -16,18 +18,18 @@ import akka.persistence.query.Offset import akka.persistence.query.javadsl.DurableStateStorePagedPersistenceIdsQuery import akka.persistence.query.typed.javadsl.DurableStateStoreBySliceQuery import akka.persistence.r2dbc.state.scaladsl.{ R2dbcDurableStateStore => ScalaR2dbcDurableStateStore } -import akka.persistence.state.javadsl.DurableStateUpdateStore import akka.persistence.state.javadsl.GetObjectResult import akka.stream.javadsl.Source - import scala.compat.java8.FutureConverters.FutureOps +import akka.persistence.state.javadsl.DurableStateUpdateWithChangeEventStore + object R2dbcDurableStateStore { val Identifier: String = ScalaR2dbcDurableStateStore.Identifier } class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(implicit ec: ExecutionContext) - extends DurableStateUpdateStore[A] + extends DurableStateUpdateWithChangeEventStore[A] with DurableStateStoreBySliceQuery[A] with DurableStateStorePagedPersistenceIdsQuery[A] { @@ -40,6 +42,14 @@ class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(impl override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): CompletionStage[Done] = scalaStore.upsertObject(persistenceId, revision, value, tag).toJava + override def upsertObject( + persistenceId: String, + revision: Long, + value: A, + tag: String, + changeEvent: Any): CompletionStage[Done] = + 
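// delegates to the Scala store and adapts the returned Future to a CompletionStage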
scalaStore.upsertObject(persistenceId, revision, value, tag, changeEvent).toJava + @deprecated(message = "Use the deleteObject overload with revision instead.", since = "1.0.0") override def deleteObject(persistenceId: String): CompletionStage[Done] = deleteObject(persistenceId, revision = 0) @@ -47,6 +57,9 @@ class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(impl override def deleteObject(persistenceId: String, revision: Long): CompletionStage[Done] = scalaStore.deleteObject(persistenceId, revision).toJava + override def deleteObject(persistenceId: String, revision: Long, changeEvent: Any): CompletionStage[Done] = + scalaStore.deleteObject(persistenceId, revision, changeEvent).toJava + override def currentChangesBySlices( entityType: String, minSlice: Int, diff --git a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala index e70b2d12..7980a950 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala @@ -4,6 +4,8 @@ package akka.persistence.r2dbc.state.scaladsl +import java.util.UUID + import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -31,7 +33,7 @@ import akka.persistence.r2dbc.internal.DurableStateDao.SerializedStateRow import akka.persistence.r2dbc.internal.InstantFactory import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow -import akka.persistence.state.scaladsl.DurableStateUpdateStore +import akka.persistence.state.scaladsl.DurableStateUpdateWithChangeEventStore import akka.persistence.state.scaladsl.GetObjectResult import akka.persistence.typed.PersistenceId import akka.serialization.SerializationExtension @@ -51,7 +53,7 @@ object R2dbcDurableStateStore { } class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfgPath: String) - extends DurableStateUpdateStore[A] + extends DurableStateUpdateWithChangeEventStore[A] with DurableStateStoreBySliceQuery[A] with DurableStateStorePagedPersistenceIdsQuery[A] { import R2dbcDurableStateStore.PersistenceIdsQueryState @@ -69,6 +71,7 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg settings, ConnectionFactoryProvider(typedSystem) .connectionFactoryFor(sharedConfigPath + ".connection-factory"))(typedSystem) + private val changeEventWriterUuid = UUID.randomUUID().toString private val bySlice: BySliceQuery[SerializedStateRow, DurableStateChange[A]] = { val createEnvelope: (TimestampOffset, SerializedStateRow) => DurableStateChange[A] = (offset, row) => { @@ -117,7 +120,7 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg * disabled with configuration `assert-single-writer`. */ override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = - upsertObject(persistenceId, revision, value, tag, changeEvent = None) + internalUpsertObject(persistenceId, revision, value, tag, changeEvent = None) /** * Insert the value if `revision` is 1, which will fail with `IllegalStateException` if there is already a stored @@ -125,16 +128,23 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg * the existing stored `revision` + 1 isn't equal to the given `revision`. 
This optimistic locking check can be * disabled with configuration `assert-single-writer`. * - * The `changeEvent`, if defined, is written to the event journal in the same transaction as the DurableState upsert. - * Same `persistenceId` is used in the journal and the `revision` is used as `sequenceNr`. + * The `changeEvent` is written to the event journal in the same transaction as the DurableState upsert. Same + * `persistenceId` is used in the journal and the `revision` is used as `sequenceNr`. */ - def upsertObject( + override def upsertObject( + persistenceId: String, + revision: Long, + value: A, + tag: String, + changeEvent: Any): Future[Done] = + internalUpsertObject(persistenceId, revision, value, tag, changeEvent = Some(changeEvent)) + + private def internalUpsertObject( persistenceId: String, revision: Long, value: A, tag: String, changeEvent: Option[Any]): Future[Done] = { - // FIXME add new trait in Akka for this method. Maybe we need it for the deletes too. val valueAnyRef = value.asInstanceOf[AnyRef] val serialized = serialization.serialize(valueAnyRef).get @@ -151,44 +161,44 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg manifest, if (tag.isEmpty) Set.empty else Set(tag)) - val serializedChangedEvent: Option[SerializedJournalRow] = { - changeEvent.map { event => - val eventAnyRef = event.asInstanceOf[AnyRef] - val serializedEvent = eventAnyRef match { - case s: SerializedEvent => s // already serialized - case _ => - val bytes = serialization.serialize(eventAnyRef).get - val serializer = serialization.findSerializerFor(eventAnyRef) - val manifest = Serializers.manifestFor(serializer, eventAnyRef) - new SerializedEvent(bytes, serializer.identifier, manifest) - } - - val entityType = PersistenceId.extractEntityType(persistenceId) - val slice = persistenceExt.sliceForPersistenceId(persistenceId) - val timestamp = if (journalSettings.useAppTimestamp) InstantFactory.now() else JournalDao.EmptyDbTimestamp - - SerializedJournalRow( - slice, - entityType, - persistenceId, - revision, - timestamp, - JournalDao.EmptyDbTimestamp, - Some(serializedEvent.bytes), - serializedEvent.serializerId, - serializedEvent.serializerManifest, - "", // FIXME writerUuid, or shall we make one? 
- if (tag.isEmpty) Set.empty else Set(tag), - metadata = None) - } - } - - stateDao.upsertState(serializedRow, value, serializedChangedEvent) + stateDao.upsertState(serializedRow, value, serializedChangeEvent(persistenceId, revision, tag, changeEvent)) // FIXME PubSub, but not via PersistentRepr } + private def serializedChangeEvent(persistenceId: String, revision: Long, tag: String, changeEvent: Option[Any]) = { + changeEvent.map { event => + val eventAnyRef = event.asInstanceOf[AnyRef] + val serializedEvent = eventAnyRef match { + case s: SerializedEvent => s // already serialized + case _ => + val bytes = serialization.serialize(eventAnyRef).get + val serializer = serialization.findSerializerFor(eventAnyRef) + val manifest = Serializers.manifestFor(serializer, eventAnyRef) + new SerializedEvent(bytes, serializer.identifier, manifest) + } + + val entityType = PersistenceId.extractEntityType(persistenceId) + val slice = persistenceExt.sliceForPersistenceId(persistenceId) + val timestamp = if (journalSettings.useAppTimestamp) InstantFactory.now() else JournalDao.EmptyDbTimestamp + + SerializedJournalRow( + slice, + entityType, + persistenceId, + revision, + timestamp, + JournalDao.EmptyDbTimestamp, + Some(serializedEvent.bytes), + serializedEvent.serializerId, + serializedEvent.serializerManifest, + changeEventWriterUuid, + if (tag.isEmpty) Set.empty else Set(tag), + metadata = None) + } + } + @deprecated(message = "Use the deleteObject overload with revision instead.", since = "1.0.0") override def deleteObject(persistenceId: String): Future[Done] = deleteObject(persistenceId, revision = 0) @@ -202,9 +212,26 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg * If the given revision is `0` it will fully delete the value and revision from the database without any optimistic * locking check. Next call to [[getObject]] will then return revision 0 and no value. */ - override def deleteObject(persistenceId: String, revision: Long): Future[Done] = { - stateDao.deleteState(persistenceId, revision) - } + override def deleteObject(persistenceId: String, revision: Long): Future[Done] = + internalDeleteObject(persistenceId, revision, changeEvent = None) + + /** + * Delete the value, which will fail with `IllegalStateException` if the existing stored `revision` + 1 isn't equal to + * the given `revision`. This optimistic locking check can be disabled with configuration `assert-single-writer`. The + * stored revision for the persistenceId is updated and next call to [[getObject]] will return the revision, but with + * no value. + * + * If the given revision is `0` it will fully delete the value and revision from the database without any optimistic + * locking check. Next call to [[getObject]] will then return revision 0 and no value. + * + * The `changeEvent` is written to the event journal in the same transaction as the DurableState upsert. Same + * `persistenceId` is used in the journal and the `revision` is used as `sequenceNr`. 
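*
* A usage sketch (the `Account` entity and `AccountClosed` event are illustrative
* assumptions, not part of this patch):
* {{{
* val store = DurableStateStoreRegistry(system)
*   .durableStateStoreFor[R2dbcDurableStateStore[Account]](R2dbcDurableStateStore.Identifier)
* store.deleteObject("Account|abc-1", revision = 4, changeEvent = AccountClosed("abc-1"))
* }}}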
+ */ + override def deleteObject(persistenceId: String, revision: Long, changeEvent: Any): Future[Done] = + internalDeleteObject(persistenceId, revision, changeEvent = Some(changeEvent)) + + private def internalDeleteObject(persistenceId: String, revision: Long, changeEvent: Option[Any]): Future[Done] = + stateDao.deleteState(persistenceId, revision, serializedChangeEvent(persistenceId, revision, tag = "", changeEvent)) override def sliceForPersistenceId(persistenceId: String): Int = persistenceExt.sliceForPersistenceId(persistenceId) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 011f4338..a4df00f6 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -9,7 +9,7 @@ object Dependencies { val Scala3 = "3.3.1" val Scala2Versions = Seq(Scala213) val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3 - val AkkaVersion = System.getProperty("override.akka.version", "2.9.0") + val AkkaVersion = System.getProperty("override.akka.version", "2.9.1-M1+13-fde4109f-SNAPSHOT") val AkkaVersionInDocs = AkkaVersion.take(3) val AkkaPersistenceJdbcVersion = "5.2.0" // only in migration tool tests val AkkaProjectionVersionInDocs = "current" From 405c448520b89dc8e1b19c65c1e6117f80b31181 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 12 Dec 2023 16:01:20 +0100 Subject: [PATCH 3/6] test and publish --- .../scaladsl/DurableStateCleanup.scala | 13 +- .../r2dbc/internal/DurableStateDao.scala | 10 +- .../persistence/r2dbc/internal/PubSub.scala | 66 +++--- .../postgres/PostgresDurableStateDao.scala | 79 +++++--- .../scaladsl/R2dbcDurableStateStore.scala | 64 +++++- ...eStateUpdateWithChangeEventStoreSpec.scala | 188 ++++++++++++++++++ 6 files changed, 360 insertions(+), 60 deletions(-) create mode 100644 core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala diff --git a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala index b5fd7dcf..ab39e4e6 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala @@ -15,6 +15,7 @@ import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.ApiMayChange import akka.annotation.InternalApi +import akka.dispatch.ExecutionContexts import akka.persistence.r2dbc.ConnectionFactoryProvider import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.DurableStateDao @@ -65,11 +66,17 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf */ def deleteState(persistenceId: String, resetRevisionNumber: Boolean): Future[Done] = { if (resetRevisionNumber) - stateDao.deleteState(persistenceId, revision = 0L, changeEvent = None) // hard delete without revision check + stateDao + .deleteState(persistenceId, revision = 0L, changeEvent = None) // hard delete without revision check + .map(_ => Done)(ExecutionContexts.parasitic) else { stateDao.readState(persistenceId).flatMap { - case None => Future.successful(Done) // already deleted - case Some(s) => stateDao.deleteState(persistenceId, s.revision + 1, changeEvent = None) + case None => + Future.successful(Done) // already deleted + case Some(s) => + stateDao + .deleteState(persistenceId, s.revision + 1, changeEvent = None) + .map(_ => Done)(ExecutionContexts.parasitic) } } } diff --git 
a/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala index 0b6f4892..e09eb1c6 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala @@ -50,9 +50,15 @@ private[r2dbc] trait DurableStateDao extends BySliceQuery.Dao[DurableStateDao.Se def readState(persistenceId: String): Future[Option[SerializedStateRow]] - def upsertState(state: SerializedStateRow, value: Any, changeEvent: Option[SerializedJournalRow]): Future[Done] + def upsertState( + state: SerializedStateRow, + value: Any, + changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]] - def deleteState(persistenceId: String, revision: Long, changeEvent: Option[SerializedJournalRow]): Future[Done] + def deleteState( + persistenceId: String, + revision: Long, + changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]] def persistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed] diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/PubSub.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/PubSub.scala index 7c3e5fc1..a5f1c85e 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/PubSub.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/PubSub.scala @@ -91,29 +91,7 @@ import org.slf4j.LoggerFactory } def publish(pr: PersistentRepr, timestamp: Instant): Unit = { - - val n = throughputCounter.incrementAndGet() - if (n % throughputSampler == 0) { - val ewma = throughput - val durationMillis = (System.nanoTime() - ewma.nanoTime) / 1000 / 1000 - if (durationMillis >= throughputCollectIntervalMillis) { - // doesn't have to be exact so "missed" or duplicate concurrent calls don't matter - throughputCounter.set(0L) - val rps = n * 1000.0 / durationMillis - val newEwma = ewma :+ rps - throughput = newEwma - if (ewma.value < throughputThreshold && newEwma.value >= throughputThreshold) { - log.info("Disabled publishing of events. Throughput greater than [{}] events/s", throughputThreshold) - } else if (ewma.value >= throughputThreshold && newEwma.value < throughputThreshold) { - log.info("Enabled publishing of events. Throughput less than [{}] events/s", throughputThreshold) - } else { - log.debug( - "Publishing of events is {}. Throughput is [{}] events/s", - if (newEwma.value < throughputThreshold) "enabled" else "disabled", - newEwma.value) - } - } - } + updateThroughput() if (throughput.value < throughputThreshold) { val pid = pr.persistenceId @@ -143,7 +121,47 @@ import org.slf4j.LoggerFactory filtered, source = EnvelopeOrigin.SourcePubSub, tags) - eventTopic(entityType, slice) ! Topic.Publish(envelope) + + publishToTopic(envelope) + } + } + + def publish(envelope: EventEnvelope[Any]): Unit = { + updateThroughput() + + if (throughput.value < throughputThreshold) + publishToTopic(envelope) + } + + private def publishToTopic(envelope: EventEnvelope[Any]): Unit = { + val entityType = PersistenceId.extractEntityType(envelope.persistenceId) + val slice = persistenceExt.sliceForPersistenceId(envelope.persistenceId) + + eventTopic(entityType, slice) ! 
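// one topic per (entityType, slice) pair keeps subscribers scoped to their slice range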
Topic.Publish(envelope) + } + + private def updateThroughput(): Unit = { + val n = throughputCounter.incrementAndGet() + if (n % throughputSampler == 0) { + val ewma = throughput + val durationMillis = (System.nanoTime() - ewma.nanoTime) / 1000 / 1000 + if (durationMillis >= throughputCollectIntervalMillis) { + // doesn't have to be exact so "missed" or duplicate concurrent calls don't matter + throughputCounter.set(0L) + val rps = n * 1000.0 / durationMillis + val newEwma = ewma :+ rps + throughput = newEwma + if (ewma.value < throughputThreshold && newEwma.value >= throughputThreshold) { + log.info("Disabled publishing of events. Throughput greater than [{}] events/s", throughputThreshold) + } else if (ewma.value >= throughputThreshold && newEwma.value < throughputThreshold) { + log.info("Enabled publishing of events. Throughput less than [{}] events/s", throughputThreshold) + } else { + log.debug( + "Publishing of events is {}. Throughput is [{}] events/s", + if (newEwma.value < throughputThreshold) "enabled" else "disabled", + newEwma.value) + } + } } } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index 1131a5da..10fe105e 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -67,6 +67,7 @@ private[r2dbc] object PostgresDurableStateDao { binding: AdditionalColumn.Binding[_]) val FutureDone: Future[Done] = Future.successful(Done) + val FutureInstantNone: Future[Option[Instant]] = Future.successful(None) } /** @@ -302,20 +303,22 @@ private[r2dbc] class PostgresDurableStateDao( updatedRows: Long, entityType: String, change: DurableStateChange[Any], - changeEvent: Option[SerializedJournalRow]): Future[Done] = { + changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]] = { if (updatedRows == 1) for { - _ <- changeEvent.map(journalDao.writeEventInTx(_, connection)).getOrElse(FutureDone) + changeEventTimestamp <- changeEvent + .map(journalDao.writeEventInTx(_, connection).map(Some(_))) + .getOrElse(FutureInstantNone) _ <- changeHandlers.get(entityType).map(processChange(_, connection, change)).getOrElse(FutureDone) - } yield Done + } yield changeEventTimestamp else - FutureDone + FutureInstantNone } override def upsertState( state: SerializedStateRow, value: Any, - changeEvent: Option[SerializedJournalRow]): Future[Done] = { + changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]] = { require(state.revision > 0) def bindTags(stmt: Statement, i: Int): Statement = { @@ -349,7 +352,7 @@ private[r2dbc] class PostgresDurableStateDao( val entityType = PersistenceId.extractEntityType(state.persistenceId) - val result = { + val result: Future[(Long, Option[Instant])] = { val additionalBindings = additionalColumns.get(entityType) match { case None => Vector.empty[EvaluatedAdditionalColumnBindings] case Some(columns) => @@ -382,14 +385,21 @@ private[r2dbc] class PostgresDurableStateDao( s"Insert failed: durable state for persistence id [${state.persistenceId}] already exists")) } - if (!changeHandlers.contains(entityType) && changeEvent.isEmpty) - recoverDataIntegrityViolation(r2dbcExecutor.updateOne(s"insert [${state.persistenceId}]")(insertStatement)) - else + if (!changeHandlers.contains(entityType) && changeEvent.isEmpty) { + val updatedRows = recoverDataIntegrityViolation( + 
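// fast path without an explicit transaction when there is neither a change event
// nor a registered change handler for this entity type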
r2dbcExecutor.updateOne(s"insert [${state.persistenceId}]")(insertStatement)) + updatedRows.map(_ -> None) + } else r2dbcExecutor.withConnection(s"insert [${state.persistenceId}]") { connection => for { updatedRows <- recoverDataIntegrityViolation(R2dbcExecutor.updateOneInTx(insertStatement(connection))) - _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent = None) - } yield updatedRows + changeEventTimestamp <- writeChangeEventAndCallChangeHander( + connection, + updatedRows, + entityType, + change, + changeEvent) + } yield (updatedRows, changeEventTimestamp) } } else { val previousRevision = state.revision - 1 @@ -425,25 +435,31 @@ private[r2dbc] class PostgresDurableStateDao( } } - if (!changeHandlers.contains(entityType) && changeEvent.isEmpty) - r2dbcExecutor.updateOne(s"update [${state.persistenceId}]")(updateStatement) - else + if (!changeHandlers.contains(entityType) && changeEvent.isEmpty) { + val updatedRows = r2dbcExecutor.updateOne(s"update [${state.persistenceId}]")(updateStatement) + updatedRows.map(_ -> None) + } else r2dbcExecutor.withConnection(s"update [${state.persistenceId}]") { connection => for { updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection)) - _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent = None) - } yield updatedRows + changeEventTimestamp <- writeChangeEventAndCallChangeHander( + connection, + updatedRows, + entityType, + change, + changeEvent) + } yield (updatedRows, changeEventTimestamp) } } } - result.map { updatedRows => + result.map { case (updatedRows, changeEventTimestamp) => if (updatedRows != 1) throw new IllegalStateException( s"Update failed: durable state for persistence id [${state.persistenceId}] could not be updated to revision [${state.revision}]") else { log.debug("Updated durable state for persistenceId [{}] to revision [{}]", state.persistenceId, state.revision) - Done + changeEventTimestamp } } } @@ -472,11 +488,12 @@ private[r2dbc] class PostgresDurableStateDao( override def deleteState( persistenceId: String, revision: Long, - changeEvent: Option[SerializedJournalRow]): Future[Done] = { + changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]] = { if (revision == 0) { hardDeleteState(persistenceId) + .map(_ => None)(ExecutionContexts.parasitic) } else { - val result = { + val result: Future[(Long, Option[Instant])] = { val entityType = PersistenceId.extractEntityType(persistenceId) def change = new DeletedDurableState[Any](persistenceId, revision, NoOffset, EmptyDbTimestamp.toEpochMilli) @@ -508,8 +525,13 @@ private[r2dbc] class PostgresDurableStateDao( for { updatedRows <- recoverDataIntegrityViolation( R2dbcExecutor.updateOneInTx(insertDeleteMarkerStatement(connection))) - _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent) - } yield updatedRows + changeEventTimestamp <- writeChangeEventAndCallChangeHander( + connection, + updatedRows, + entityType, + change, + changeEvent) + } yield (updatedRows, changeEventTimestamp) } } else { @@ -549,19 +571,24 @@ private[r2dbc] class PostgresDurableStateDao( r2dbcExecutor.withConnection(s"delete [$persistenceId]") { connection => for { updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection)) - _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent) - } yield updatedRows + changeEventTimestamp <- writeChangeEventAndCallChangeHander( + connection, + updatedRows, + 
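// updatedRows is passed along so the helper is a no-op when the optimistic locking
// check failed and no row was updated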
entityType, + change, + changeEvent) + } yield (updatedRows, changeEventTimestamp) } } } - result.map { updatedRows => + result.map { case (updatedRows, changeEventTimestamp) => if (updatedRows != 1) throw new IllegalStateException( s"Delete failed: durable state for persistence id [$persistenceId] could not be updated to revision [$revision]") else { log.debug("Deleted durable state for persistenceId [{}] to revision [{}]", persistenceId, revision) - Done + changeEventTimestamp } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala index 7980a950..be7656e5 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala @@ -4,6 +4,7 @@ package akka.persistence.r2dbc.state.scaladsl +import java.time.Instant import java.util.UUID import scala.collection.immutable @@ -23,6 +24,7 @@ import akka.persistence.query.Offset import akka.persistence.query.TimestampOffset import akka.persistence.query.UpdatedDurableState import akka.persistence.query.scaladsl.DurableStateStorePagedPersistenceIdsQuery +import akka.persistence.query.typed.EventEnvelope import akka.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery import akka.persistence.r2dbc.ConnectionFactoryProvider import akka.persistence.r2dbc.R2dbcSettings @@ -30,9 +32,11 @@ import akka.persistence.r2dbc.internal.BySliceQuery import akka.persistence.r2dbc.internal.ContinuousQuery import akka.persistence.r2dbc.internal.DurableStateDao import akka.persistence.r2dbc.internal.DurableStateDao.SerializedStateRow +import akka.persistence.r2dbc.internal.EnvelopeOrigin import akka.persistence.r2dbc.internal.InstantFactory import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow +import akka.persistence.r2dbc.internal.PubSub import akka.persistence.state.scaladsl.DurableStateUpdateWithChangeEventStore import akka.persistence.state.scaladsl.GetObjectResult import akka.persistence.typed.PersistenceId @@ -73,6 +77,10 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg .connectionFactoryFor(sharedConfigPath + ".connection-factory"))(typedSystem) private val changeEventWriterUuid = UUID.randomUUID().toString + private val pubSub: Option[PubSub] = + if (journalSettings.journalPublishEvents) Some(PubSub(typedSystem)) + else None + private val bySlice: BySliceQuery[SerializedStateRow, DurableStateChange[A]] = { val createEnvelope: (TimestampOffset, SerializedStateRow) => DurableStateChange[A] = (offset, row) => { row.payload match { @@ -151,6 +159,7 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg val serializer = serialization.findSerializerFor(valueAnyRef) val manifest = Serializers.manifestFor(serializer, valueAnyRef) + val tags = if (tag.isEmpty) Set.empty[String] else Set(tag) val serializedRow = SerializedStateRow( persistenceId, revision, @@ -159,12 +168,47 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg Some(serialized), serializer.identifier, manifest, - if (tag.isEmpty) Set.empty else Set(tag)) + tags) + + val changeEventTimestamp = + stateDao.upsertState(serializedRow, value, serializedChangeEvent(persistenceId, revision, tag, changeEvent)) + + import typedSystem.executionContext + changeEventTimestamp.map { 
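+      // the timestamp is defined only when a change event was actually written to the
+      // journal; it is then used as the offset of the envelope published via pub-sub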
timestampOption => + publish(persistenceId, revision, changeEvent, timestampOption, tags) + Done + } + } - stateDao.upsertState(serializedRow, value, serializedChangeEvent(persistenceId, revision, tag, changeEvent)) + private def publish( + persistenceId: String, + revision: Long, + changeEvent: Option[Any], + changeEventTimestamp: Option[Instant], + tags: Set[String]): Unit = { + for { + timestamp <- changeEventTimestamp + event <- changeEvent + p <- pubSub + } yield { + val entityType = PersistenceId.extractEntityType(persistenceId) + val slice = persistenceExt.sliceForPersistenceId(persistenceId) - // FIXME PubSub, but not via PersistentRepr + val offset = TimestampOffset(timestamp, timestamp, Map(persistenceId -> revision)) + val envelope = EventEnvelope( + offset, + persistenceId, + revision, + event, + timestamp.toEpochMilli, + entityType, + slice, + filtered = false, + source = EnvelopeOrigin.SourcePubSub, + tags) + p.publish(envelope) + } } private def serializedChangeEvent(persistenceId: String, revision: Long, tag: String, changeEvent: Option[Any]) = { @@ -230,8 +274,18 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg override def deleteObject(persistenceId: String, revision: Long, changeEvent: Any): Future[Done] = internalDeleteObject(persistenceId, revision, changeEvent = Some(changeEvent)) - private def internalDeleteObject(persistenceId: String, revision: Long, changeEvent: Option[Any]): Future[Done] = - stateDao.deleteState(persistenceId, revision, serializedChangeEvent(persistenceId, revision, tag = "", changeEvent)) + private def internalDeleteObject(persistenceId: String, revision: Long, changeEvent: Option[Any]): Future[Done] = { + val changeEventTimestamp = stateDao.deleteState( + persistenceId, + revision, + serializedChangeEvent(persistenceId, revision, tag = "", changeEvent)) + + import typedSystem.executionContext + changeEventTimestamp.map { timestampOption => + publish(persistenceId, revision, changeEvent, timestampOption, tags = Set.empty) + Done + } + } override def sliceForPersistenceId(persistenceId: String): Int = persistenceExt.sliceForPersistenceId(persistenceId) diff --git a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala new file mode 100644 index 00000000..205c7f1b --- /dev/null +++ b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala @@ -0,0 +1,188 @@ +/* + * Copyright (C) 2022 - 2023 Lightbend Inc. 
+ */ + +package akka.persistence.r2dbc.state + +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorSystem +import akka.actor.typed.pubsub.Topic +import akka.persistence.query.PersistenceQuery +import akka.persistence.query.typed.EventEnvelope +import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQuery +import akka.persistence.r2dbc.TestConfig +import akka.persistence.r2dbc.TestData +import akka.persistence.r2dbc.TestDbLifecycle +import akka.persistence.r2dbc.internal.EnvelopeOrigin +import akka.persistence.r2dbc.internal.PubSub +import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal +import akka.persistence.r2dbc.state.scaladsl.R2dbcDurableStateStore +import akka.persistence.state.DurableStateStoreRegistry +import akka.persistence.state.scaladsl.DurableStateUpdateWithChangeEventStore +import akka.persistence.typed.PersistenceId +import akka.stream.scaladsl.Sink +import org.scalatest.wordspec.AnyWordSpecLike + +class DurableStateUpdateWithChangeEventStoreSpec + extends ScalaTestWithActorTestKit(TestConfig.config) + with AnyWordSpecLike + with TestDbLifecycle + with TestData + with LogCapturing { + + override def typedSystem: ActorSystem[_] = system + + private val store = DurableStateStoreRegistry(system) + .durableStateStoreFor[DurableStateUpdateWithChangeEventStore[String]](R2dbcDurableStateStore.Identifier) + private val journalQuery = + PersistenceQuery(system).readJournalFor[CurrentEventsByPersistenceIdTypedQuery](R2dbcReadJournal.Identifier) + + private val tag = "TAG" + + "The R2DBC durable state store" should { + "save additional change event" in { + val entityType = nextEntityType() + val persistenceId = PersistenceId(entityType, "my-persistenceId").id + val value1 = "Genuinely Collaborative" + val value2 = "Open to Feedback" + + store.upsertObject(persistenceId, 1L, value1, tag, s"Changed to $value1").futureValue + store.upsertObject(persistenceId, 2L, value2, tag, s"Changed to $value2").futureValue + store.deleteObject(persistenceId, 3L, "Deleted").futureValue + + val envelopes = journalQuery + .currentEventsByPersistenceIdTyped[String](persistenceId, 1L, Long.MaxValue) + .runWith(Sink.seq) + .futureValue + + val env1 = envelopes.head + env1.event shouldBe s"Changed to $value1" + env1.sequenceNr shouldBe 1L + env1.tags shouldBe Set(tag) + + val env2 = envelopes(1) + env2.event shouldBe s"Changed to $value2" + env2.sequenceNr shouldBe 2L + + val env3 = envelopes(2) + env3.event shouldBe s"Deleted" + env3.sequenceNr shouldBe 3L + } + + "detect and reject concurrent inserts, and not store change event" in { + val entityType = nextEntityType() + val persistenceId = PersistenceId(entityType, "id-to-be-inserted-concurrently").id + val value = "Genuinely Collaborative" + store.upsertObject(persistenceId, revision = 1L, value, tag, s"Changed to $value").futureValue + + val updatedValue = "Open to Feedback" + store + .upsertObject(persistenceId, revision = 1L, updatedValue, tag, s"Changed to $updatedValue") + .failed + .futureValue + + val envelopes = journalQuery + .currentEventsByPersistenceIdTyped[String](persistenceId, 1L, Long.MaxValue) + .runWith(Sink.seq) + .futureValue + envelopes.size shouldBe 1 + } + + "detect and reject concurrent updates, and not store change event" in { + if (!r2dbcSettings.durableStateAssertSingleWriter) + pending + + val entityType = nextEntityType() + val persistenceId = PersistenceId(entityType, 
"id-to-be-updated-concurrently").id + val value = "Genuinely Collaborative" + store.upsertObject(persistenceId, revision = 1L, value, tag, s"Changed to $value").futureValue + + val updatedValue = "Open to Feedback" + store.upsertObject(persistenceId, revision = 2L, updatedValue, tag, s"Changed to $updatedValue").futureValue + + // simulate an update by a different node that didn't see the first one: + val updatedValue2 = "Genuine and Sincere in all Communications" + store + .upsertObject(persistenceId, revision = 2L, updatedValue2, tag, s"Changed to $updatedValue2") + .failed + .futureValue + + val envelopes = journalQuery + .currentEventsByPersistenceIdTyped[String](persistenceId, 1L, Long.MaxValue) + .runWith(Sink.seq) + .futureValue + envelopes.size shouldBe 2 + } + + "detect and reject concurrent delete of revision 1, and not store change event" in { + val entityType = nextEntityType() + val persistenceId = PersistenceId(entityType, "id-to-be-deleted-concurrently").id + val value = "Genuinely Collaborative" + store.upsertObject(persistenceId, revision = 1L, value, tag, s"Changed to $value").futureValue + + store.deleteObject(persistenceId, revision = 1L, "Deleted").failed.futureValue + + val envelopes = journalQuery + .currentEventsByPersistenceIdTyped[String](persistenceId, 1L, Long.MaxValue) + .runWith(Sink.seq) + .futureValue + envelopes.size shouldBe 1 + } + + "detect and reject concurrent deletes, and not store change event" in { + if (!r2dbcSettings.durableStateAssertSingleWriter) + pending + + val entityType = nextEntityType() + val persistenceId = PersistenceId(entityType, "id-to-be-updated-concurrently").id + val value = "Genuinely Collaborative" + store.upsertObject(persistenceId, revision = 1L, value, tag, s"Changed to $value").futureValue + + val updatedValue = "Open to Feedback" + store.upsertObject(persistenceId, revision = 2L, updatedValue, tag, s"Changed to $updatedValue").futureValue + + // simulate a delete by a different node that didn't see the first one: + store.deleteObject(persistenceId, revision = 2L, "Deleted").failed.futureValue + + val envelopes = journalQuery + .currentEventsByPersistenceIdTyped[String](persistenceId, 1L, Long.MaxValue) + .runWith(Sink.seq) + .futureValue + envelopes.size shouldBe 2 + } + + } + + "publish change event" in { + val entityType = nextEntityType() + val persistenceId = PersistenceId(entityType, "my-persistenceId").id + + val slice = persistenceExt.sliceForPersistenceId(persistenceId) + val topic = PubSub(system).eventTopic[String](entityType, slice) + val subscriberProbe = createTestProbe[EventEnvelope[String]]() + topic ! 
Topic.Subscribe(subscriberProbe.ref) + + val value1 = "Genuinely Collaborative" + val value2 = "Open to Feedback" + + store.upsertObject(persistenceId, 1L, value1, tag, s"Changed to $value1").futureValue + store.upsertObject(persistenceId, 2L, value2, tag, s"Changed to $value2").futureValue + store.deleteObject(persistenceId, 3L, "Deleted").futureValue + + val env1 = subscriberProbe.receiveMessage() + env1.event shouldBe s"Changed to $value1" + env1.sequenceNr shouldBe 1L + env1.tags shouldBe Set(tag) + env1.source shouldBe EnvelopeOrigin.SourcePubSub + + val env2 = subscriberProbe.receiveMessage() + env2.event shouldBe s"Changed to $value2" + env2.sequenceNr shouldBe 2L + + val env3 = subscriberProbe.receiveMessage() + env3.event shouldBe s"Deleted" + env3.sequenceNr shouldBe 3L + } + +} From b0b535da3b77461021ce3fb1fc08fa1b7bd6139b Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 13 Dec 2023 10:24:07 +0100 Subject: [PATCH 4/6] lazy journalDao, dialect as factory param --- .../r2dbc/internal/h2/H2Dialect.scala | 4 +- .../r2dbc/internal/h2/H2DurableStateDao.scala | 21 +++++---- .../internal/postgres/PostgresDialect.scala | 4 +- .../postgres/PostgresDurableStateDao.scala | 47 ++++++++++--------- .../internal/postgres/YugabyteDialect.scala | 4 +- .../postgres/YugabyteDurableStateDao.scala | 15 +++--- 6 files changed, 48 insertions(+), 47 deletions(-) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala index fbe219cf..bf79398a 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala @@ -94,9 +94,7 @@ private[r2dbc] object H2Dialect extends Dialect { override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit system: ActorSystem[_]): DurableStateDao = - new H2DurableStateDao(settings, connectionFactory, createJournalDao(settings, connectionFactory))( - ecForDaos(system, settings), - system) + new H2DurableStateDao(settings, connectionFactory, this)(ecForDaos(system, settings), system) private def ecForDaos(system: ActorSystem[_], settings: R2dbcSettings): ExecutionContext = { // H2 R2DBC driver blocks in surprising places (Mono.toFuture in stmt.execute().asFuture()) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala index 180ef355..27393ed0 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala @@ -4,18 +4,19 @@ package akka.persistence.r2dbc.internal.h2 -import akka.actor.typed.ActorSystem -import akka.annotation.InternalApi -import akka.persistence.r2dbc.R2dbcSettings -import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao -import io.r2dbc.spi.ConnectionFactory -import org.slf4j.Logger -import org.slf4j.LoggerFactory import scala.concurrent.ExecutionContext import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration -import akka.persistence.r2dbc.internal.JournalDao +import io.r2dbc.spi.ConnectionFactory +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import akka.actor.typed.ActorSystem +import akka.annotation.InternalApi +import akka.persistence.r2dbc.R2dbcSettings +import akka.persistence.r2dbc.internal.Dialect +import 
akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao /** * INTERNAL API @@ -24,8 +25,8 @@ import akka.persistence.r2dbc.internal.JournalDao private[r2dbc] final class H2DurableStateDao( settings: R2dbcSettings, connectionFactory: ConnectionFactory, - journalDao: JournalDao)(implicit ec: ExecutionContext, system: ActorSystem[_]) - extends PostgresDurableStateDao(settings, connectionFactory, journalDao) { + dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_]) + extends PostgresDurableStateDao(settings, connectionFactory, dialect) { override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2DurableStateDao]) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala index 9c9362dd..d8dad720 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala @@ -129,7 +129,5 @@ private[r2dbc] object PostgresDialect extends Dialect { override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit system: ActorSystem[_]): DurableStateDao = - new PostgresDurableStateDao(settings, connectionFactory, createJournalDao(settings, connectionFactory))( - system.executionContext, - system) + new PostgresDurableStateDao(settings, connectionFactory, this)(system.executionContext, system) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index 10fe105e..4037ae7f 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -4,6 +4,25 @@ package akka.persistence.r2dbc.internal.postgres +import java.lang +import java.time.Instant +import java.util + +import scala.collection.immutable +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration.Duration +import scala.concurrent.duration.FiniteDuration +import scala.util.control.NonFatal + +import io.r2dbc.spi.Connection +import io.r2dbc.spi.ConnectionFactory +import io.r2dbc.spi.R2dbcDataIntegrityViolationException +import io.r2dbc.spi.Row +import io.r2dbc.spi.Statement +import org.slf4j.Logger +import org.slf4j.LoggerFactory + import akka.Done import akka.NotUsed import akka.actor.typed.ActorSystem @@ -20,8 +39,11 @@ import akka.persistence.r2dbc.internal.AdditionalColumnFactory import akka.persistence.r2dbc.internal.BySliceQuery.Buckets import akka.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket import akka.persistence.r2dbc.internal.ChangeHandlerFactory +import akka.persistence.r2dbc.internal.Dialect import akka.persistence.r2dbc.internal.DurableStateDao import akka.persistence.r2dbc.internal.InstantFactory +import akka.persistence.r2dbc.internal.JournalDao +import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow import akka.persistence.r2dbc.internal.PayloadCodec import akka.persistence.r2dbc.internal.PayloadCodec.RichRow import akka.persistence.r2dbc.internal.PayloadCodec.RichStatement @@ -33,26 +55,6 @@ import akka.persistence.r2dbc.state.scaladsl.AdditionalColumn import akka.persistence.r2dbc.state.scaladsl.ChangeHandler import akka.persistence.typed.PersistenceId import 
akka.stream.scaladsl.Source -import io.r2dbc.spi.Connection -import io.r2dbc.spi.ConnectionFactory -import io.r2dbc.spi.R2dbcDataIntegrityViolationException -import io.r2dbc.spi.Row -import io.r2dbc.spi.Statement -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import java.lang -import java.time.Instant -import java.util - -import scala.collection.immutable -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.concurrent.duration.Duration -import scala.concurrent.duration.FiniteDuration -import scala.util.control.NonFatal - -import akka.persistence.r2dbc.internal.JournalDao -import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow /** * INTERNAL API @@ -77,7 +79,7 @@ private[r2dbc] object PostgresDurableStateDao { private[r2dbc] class PostgresDurableStateDao( settings: R2dbcSettings, connectionFactory: ConnectionFactory, - journalDao: JournalDao)(implicit ec: ExecutionContext, system: ActorSystem[_]) + dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends DurableStateDao { import DurableStateDao._ import PostgresDurableStateDao._ @@ -92,6 +94,9 @@ private[r2dbc] class PostgresDurableStateDao( private implicit val statePayloadCodec: PayloadCodec = settings.durableStatePayloadCodec + // used for change events + private lazy val journalDao: JournalDao = dialect.createJournalDao(settings, connectionFactory) + private lazy val additionalColumns: Map[String, immutable.IndexedSeq[AdditionalColumn[Any, Any]]] = { settings.durableStateAdditionalColumnClasses.map { case (entityType, columnClasses) => val instances = columnClasses.map(fqcn => AdditionalColumnFactory.create(system, fqcn)) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala index 27b273dd..9de58b86 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala @@ -42,7 +42,5 @@ private[r2dbc] object YugabyteDialect extends Dialect { override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit system: ActorSystem[_]): DurableStateDao = - new YugabyteDurableStateDao(settings, connectionFactory, createJournalDao(settings, connectionFactory))( - system.executionContext, - system) + new YugabyteDurableStateDao(settings, connectionFactory, this)(system.executionContext, system) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala index eabfaf9f..9a9ba030 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala @@ -4,15 +4,16 @@ package akka.persistence.r2dbc.internal.postgres -import akka.actor.typed.ActorSystem -import akka.annotation.InternalApi -import akka.persistence.r2dbc.R2dbcSettings +import scala.concurrent.ExecutionContext + import io.r2dbc.spi._ import org.slf4j.Logger import org.slf4j.LoggerFactory -import scala.concurrent.ExecutionContext -import akka.persistence.r2dbc.internal.JournalDao +import akka.actor.typed.ActorSystem +import akka.annotation.InternalApi +import akka.persistence.r2dbc.R2dbcSettings +import akka.persistence.r2dbc.internal.Dialect /** * INTERNAL API 
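+ * The Dialect is passed in (rather than a ready JournalDao) so that the journal DAO
+ * used for writing change events is created lazily, only when first needed.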
@@ -21,8 +22,8 @@ import akka.persistence.r2dbc.internal.JournalDao private[r2dbc] final class YugabyteDurableStateDao( settings: R2dbcSettings, connectionFactory: ConnectionFactory, - journalDao: JournalDao)(implicit ec: ExecutionContext, system: ActorSystem[_]) - extends PostgresDurableStateDao(settings, connectionFactory, journalDao) { + dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_]) + extends PostgresDurableStateDao(settings, connectionFactory, dialect) { override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[YugabyteDurableStateDao]) From d1c927bf9a88525574d454a007199a5e738fe186 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 19 Dec 2023 10:22:37 +0100 Subject: [PATCH 5/6] Akka 2.9.1 --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index a4df00f6..be19a724 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -9,7 +9,7 @@ object Dependencies { val Scala3 = "3.3.1" val Scala2Versions = Seq(Scala213) val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3 - val AkkaVersion = System.getProperty("override.akka.version", "2.9.1-M1+13-fde4109f-SNAPSHOT") + val AkkaVersion = System.getProperty("override.akka.version", "2.9.1") val AkkaVersionInDocs = AkkaVersion.take(3) val AkkaPersistenceJdbcVersion = "5.2.0" // only in migration tool tests val AkkaProjectionVersionInDocs = "current" From fbe67e0e349c4d93ff92e2663e1f0f8e5a1cf48a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 19 Dec 2023 11:36:04 +0100 Subject: [PATCH 6/6] override in H2JournalDao, and actually same tx --- .../r2dbc/internal/R2dbcExecutor.scala | 14 +- .../r2dbc/internal/h2/H2JournalDao.scala | 124 ++++++++++-------- .../postgres/PostgresJournalDao.scala | 7 +- ...eStateUpdateWithChangeEventStoreSpec.scala | 24 ++++ 4 files changed, 102 insertions(+), 67 deletions(-) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala index fa118ef6..e89126aa 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala @@ -53,6 +53,13 @@ import reactor.core.publisher.Mono result.getRowsUpdated.asFuture().map(_.longValue())(ExecutionContexts.parasitic) } + def updateOneReturningInTx[A](stmt: Statement, mapRow: Row => A)(implicit ec: ExecutionContext): Future[A] = + stmt.execute().asFuture().flatMap { result => + Mono + .from[A](result.map((row, _) => mapRow(row))) + .asFuture() + } + def updateBatchInTx(stmt: Statement)(implicit ec: ExecutionContext): Future[Long] = { val consumer: BiConsumer[Long, java.lang.Long] = (acc, elem) => acc + elem.longValue() Flux @@ -195,12 +202,7 @@ class R2dbcExecutor( def updateOneReturning[A]( logPrefix: String)(statementFactory: Connection => Statement, mapRow: Row => A): Future[A] = { withAutoCommitConnection(logPrefix) { connection => - val stmt = statementFactory(connection) - stmt.execute().asFuture().flatMap { result => - Mono - .from[A](result.map((row, _) => mapRow(row))) - .asFuture() - } + updateOneReturningInTx(statementFactory(connection), mapRow) } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala index e1356ece..7eb530ec 100644 --- 
a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala
+++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala
@@ -16,11 +16,15 @@ import io.r2dbc.spi.ConnectionFactory
 import io.r2dbc.spi.Statement
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
-
 import java.time.Instant
+
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 
+import io.r2dbc.spi.Connection
+
+import akka.persistence.r2dbc.internal.R2dbcExecutor
+
 /**
  * INTERNAL API
  */
@@ -35,7 +39,7 @@ private[r2dbc] class H2JournalDao(journalSettings: R2dbcSettings, connectionFact
   require(journalSettings.useAppTimestamp)
   require(journalSettings.dbTimestampMonotonicIncreasing)
 
-  val insertSql = sql"INSERT INTO $journalTable " +
+  private val insertSql = sql"INSERT INTO $journalTable " +
     "(slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, tags, meta_ser_id, meta_ser_manifest, meta_payload, db_timestamp) " +
     "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
 
@@ -54,66 +58,74 @@ private[r2dbc] class H2JournalDao(journalSettings: R2dbcSettings, connectionFact
     // it's always the same persistenceId for all events
     val persistenceId = events.head.persistenceId
-    val previousSeqNr = events.head.seqNr - 1
-
-    def bind(stmt: Statement, write: SerializedJournalRow): Statement = {
-      stmt
-        .bind(0, write.slice)
-        .bind(1, write.entityType)
-        .bind(2, write.persistenceId)
-        .bind(3, write.seqNr)
-        .bind(4, write.writerUuid)
-        .bind(5, "") // FIXME event adapter
-        .bind(6, write.serId)
-        .bind(7, write.serManifest)
-        .bindPayload(8, write.payload.get)
-
-      if (write.tags.isEmpty)
-        stmt.bindNull(9, classOf[Array[String]])
-      else
-        stmt.bind(9, write.tags.toArray)
-
-      // optional metadata
-      write.metadata match {
-        case Some(m) =>
-          stmt
-            .bind(10, m.serId)
-            .bind(11, m.serManifest)
-            .bind(12, m.payload)
-        case None =>
-          stmt
-            .bindNull(10, classOf[Integer])
-            .bindNull(11, classOf[String])
-            .bindNull(12, classOf[Array[Byte]])
+
+    val totalEvents = events.size
+    val result =
+      if (totalEvents == 1) {
+        r2dbcExecutor.updateOne(s"insert [$persistenceId]")(connection =>
+          bindInsertStatement(connection.createStatement(insertSql), events.head))
+      } else {
+        r2dbcExecutor.updateInBatch(s"batch insert [$persistenceId], [$totalEvents] events")(connection =>
+          events.foldLeft(connection.createStatement(insertSql)) { (stmt, write) =>
+            stmt.add()
+            bindInsertStatement(stmt, write)
+          })
       }
-      stmt.bind(13, write.dbTimestamp)
+    if (log.isDebugEnabled())
+      result.foreach { _ =>
+        log.debug("Wrote [{}] events for persistenceId [{}]", totalEvents, events.head.persistenceId)
+      }
+    result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic)
+  }
 
-      stmt
-    }
+  override def writeEventInTx(event: SerializedJournalRow, connection: Connection): Future[Instant] = {
+    val persistenceId = event.persistenceId
 
-    val totalEvents = events.size
-    if (totalEvents == 1) {
-      val result = r2dbcExecutor.updateOne(s"insert [$persistenceId]")(connection =>
-        bind(connection.createStatement(insertSql), events.head))
-      if (log.isDebugEnabled())
-        result.foreach { _ =>
-          log.debug("Wrote [{}] events for persistenceId [{}]", 1, events.head.persistenceId)
-        }
-      result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic)
-    } else {
-      val result = r2dbcExecutor.updateInBatch(s"batch insert [$persistenceId], [$totalEvents] events")(connection =>
-        events.foldLeft(connection.createStatement(insertSql)) { (stmt, write) =>
-          stmt.add()
-          bind(stmt,
write) - }) - if (log.isDebugEnabled()) { - result.foreach { _ => - log.debug("Wrote [{}] events for persistenceId [{}]", totalEvents, events.head.persistenceId) - } + val stmt = bindInsertStatement(connection.createStatement(insertSql), event) + val result = R2dbcExecutor.updateOneInTx(stmt) + + if (log.isDebugEnabled()) + result.foreach { _ => + log.debug("Wrote [{}] event for persistenceId [{}]", 1, persistenceId) } - result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic) + result.map(_ => event.dbTimestamp)(ExecutionContexts.parasitic) + } + + private def bindInsertStatement(stmt: Statement, write: SerializedJournalRow): Statement = { + stmt + .bind(0, write.slice) + .bind(1, write.entityType) + .bind(2, write.persistenceId) + .bind(3, write.seqNr) + .bind(4, write.writerUuid) + .bind(5, "") // FIXME event adapter + .bind(6, write.serId) + .bind(7, write.serManifest) + .bindPayload(8, write.payload.get) + + if (write.tags.isEmpty) + stmt.bindNull(9, classOf[Array[String]]) + else + stmt.bind(9, write.tags.toArray) + + // optional metadata + write.metadata match { + case Some(m) => + stmt + .bind(10, m.serId) + .bind(11, m.serManifest) + .bind(12, m.payload) + case None => + stmt + .bindNull(10, classOf[Integer]) + .bindNull(11, classOf[String]) + .bindNull(12, classOf[Array[Byte]]) } + + stmt.bind(13, write.dbTimestamp) + + stmt } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala index 6af05f85..6358c5bf 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala @@ -10,7 +10,6 @@ import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts import akka.persistence.Persistence import akka.persistence.r2dbc.R2dbcSettings -import akka.persistence.r2dbc.internal.BySliceQuery import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.PayloadCodec import akka.persistence.r2dbc.internal.PayloadCodec.RichStatement @@ -185,10 +184,8 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti if (useTimestampFromDb) insertEventWithTransactionTimestampSql else insertEventWithParameterTimestampSql - val result = r2dbcExecutor.updateOneReturning(s"insert [$persistenceId]")( - connection => - bindInsertStatement(connection.createStatement(insertSql), event, useTimestampFromDb, previousSeqNr), - row => row.get(0, classOf[Instant])) + val stmt = bindInsertStatement(connection.createStatement(insertSql), event, useTimestampFromDb, previousSeqNr) + val result = R2dbcExecutor.updateOneReturningInTx(stmt, row => row.get(0, classOf[Instant])) if (log.isDebugEnabled()) result.foreach { _ => log.debug("Wrote [{}] event for persistenceId [{}]", 1, persistenceId) diff --git a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala index 205c7f1b..c7578c3d 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala @@ -4,6 +4,8 @@ package akka.persistence.r2dbc.state +import org.scalatest.concurrent.ScalaFutures.convertScalaFuture + import 
akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorSystem @@ -24,6 +26,10 @@ import akka.persistence.typed.PersistenceId import akka.stream.scaladsl.Sink import org.scalatest.wordspec.AnyWordSpecLike +import akka.Done +import akka.persistence.r2dbc.TestActors.Persister +import akka.persistence.r2dbc.TestActors.Persister.PersistWithAck + class DurableStateUpdateWithChangeEventStoreSpec extends ScalaTestWithActorTestKit(TestConfig.config) with AnyWordSpecLike @@ -70,6 +76,24 @@ class DurableStateUpdateWithChangeEventStoreSpec env3.sequenceNr shouldBe 3L } + "save additional change event in same transaction" in { + // test rollback (same tx) if the journal insert fails via simulated unique constraint violation in event_journal + val entityType = nextEntityType() + val persistenceId = PersistenceId(entityType, "my-persistenceId").id + + val probe = testKit.createTestProbe[Done]() + val persister = testKit.spawn(Persister(persistenceId)) + persister ! PersistWithAck("a", probe.ref) + probe.expectMessage(Done) + testKit.stop(persister) + + val value1 = "Genuinely Collaborative" + + store.upsertObject(persistenceId, 1L, value1, tag, s"Changed to $value1").failed.futureValue + + store.getObject(persistenceId).futureValue.value shouldBe None + } + "detect and reject concurrent inserts, and not store change event" in { val entityType = nextEntityType() val persistenceId = PersistenceId(entityType, "id-to-be-inserted-concurrently").id