diff --git a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala index 90a70c23..ab39e4e6 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala @@ -15,6 +15,7 @@ import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.ApiMayChange import akka.annotation.InternalApi +import akka.dispatch.ExecutionContexts import akka.persistence.r2dbc.ConnectionFactoryProvider import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.DurableStateDao @@ -65,11 +66,17 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf */ def deleteState(persistenceId: String, resetRevisionNumber: Boolean): Future[Done] = { if (resetRevisionNumber) - stateDao.deleteState(persistenceId, revision = 0L) // hard delete without revision check + stateDao + .deleteState(persistenceId, revision = 0L, changeEvent = None) // hard delete without revision check + .map(_ => Done)(ExecutionContexts.parasitic) else { stateDao.readState(persistenceId).flatMap { - case None => Future.successful(Done) // already deleted - case Some(s) => stateDao.deleteState(persistenceId, s.revision + 1) + case None => + Future.successful(Done) // already deleted + case Some(s) => + stateDao + .deleteState(persistenceId, s.revision + 1, changeEvent = None) + .map(_ => Done)(ExecutionContexts.parasitic) } } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala index ba3cbdd2..e09eb1c6 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala @@ -8,10 +8,12 @@ import akka.Done import akka.NotUsed import akka.annotation.InternalApi import akka.stream.scaladsl.Source - import java.time.Instant + import scala.concurrent.Future +import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow + /** * INTERNAL API */ @@ -48,9 +50,15 @@ private[r2dbc] trait DurableStateDao extends BySliceQuery.Dao[DurableStateDao.Se def readState(persistenceId: String): Future[Option[SerializedStateRow]] - def upsertState(state: SerializedStateRow, value: Any): Future[Done] + def upsertState( + state: SerializedStateRow, + value: Any, + changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]] - def deleteState(persistenceId: String, revision: Long): Future[Done] + def deleteState( + persistenceId: String, + revision: Long, + changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]] def persistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed] diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala index 22b046ee..ac64692f 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala @@ -5,10 +5,13 @@ package akka.persistence.r2dbc.internal import akka.annotation.InternalApi - import java.time.Instant + import scala.concurrent.Future +import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow +import io.r2dbc.spi.Connection + /** * INTERNAL API */ @@ -56,6 +59,9 @@ 
private[r2dbc] trait JournalDao { * a select (in same transaction). */ def writeEvents(events: Seq[JournalDao.SerializedJournalRow]): Future[Instant] + + def writeEventInTx(event: SerializedJournalRow, connection: Connection): Future[Instant] + def readHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] def readLowestSequenceNr(persistenceId: String): Future[Long] diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/PubSub.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/PubSub.scala index 7c3e5fc1..a5f1c85e 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/PubSub.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/PubSub.scala @@ -91,29 +91,7 @@ import org.slf4j.LoggerFactory } def publish(pr: PersistentRepr, timestamp: Instant): Unit = { - - val n = throughputCounter.incrementAndGet() - if (n % throughputSampler == 0) { - val ewma = throughput - val durationMillis = (System.nanoTime() - ewma.nanoTime) / 1000 / 1000 - if (durationMillis >= throughputCollectIntervalMillis) { - // doesn't have to be exact so "missed" or duplicate concurrent calls don't matter - throughputCounter.set(0L) - val rps = n * 1000.0 / durationMillis - val newEwma = ewma :+ rps - throughput = newEwma - if (ewma.value < throughputThreshold && newEwma.value >= throughputThreshold) { - log.info("Disabled publishing of events. Throughput greater than [{}] events/s", throughputThreshold) - } else if (ewma.value >= throughputThreshold && newEwma.value < throughputThreshold) { - log.info("Enabled publishing of events. Throughput less than [{}] events/s", throughputThreshold) - } else { - log.debug( - "Publishing of events is {}. Throughput is [{}] events/s", - if (newEwma.value < throughputThreshold) "enabled" else "disabled", - newEwma.value) - } - } - } + updateThroughput() if (throughput.value < throughputThreshold) { val pid = pr.persistenceId @@ -143,7 +121,47 @@ import org.slf4j.LoggerFactory filtered, source = EnvelopeOrigin.SourcePubSub, tags) - eventTopic(entityType, slice) ! Topic.Publish(envelope) + + publishToTopic(envelope) + } + } + + def publish(envelope: EventEnvelope[Any]): Unit = { + updateThroughput() + + if (throughput.value < throughputThreshold) + publishToTopic(envelope) + } + + private def publishToTopic(envelope: EventEnvelope[Any]): Unit = { + val entityType = PersistenceId.extractEntityType(envelope.persistenceId) + val slice = persistenceExt.sliceForPersistenceId(envelope.persistenceId) + + eventTopic(entityType, slice) ! Topic.Publish(envelope) + } + + private def updateThroughput(): Unit = { + val n = throughputCounter.incrementAndGet() + if (n % throughputSampler == 0) { + val ewma = throughput + val durationMillis = (System.nanoTime() - ewma.nanoTime) / 1000 / 1000 + if (durationMillis >= throughputCollectIntervalMillis) { + // doesn't have to be exact so "missed" or duplicate concurrent calls don't matter + throughputCounter.set(0L) + val rps = n * 1000.0 / durationMillis + val newEwma = ewma :+ rps + throughput = newEwma + if (ewma.value < throughputThreshold && newEwma.value >= throughputThreshold) { + log.info("Disabled publishing of events. Throughput greater than [{}] events/s", throughputThreshold) + } else if (ewma.value >= throughputThreshold && newEwma.value < throughputThreshold) { + log.info("Enabled publishing of events. Throughput less than [{}] events/s", throughputThreshold) + } else { + log.debug( + "Publishing of events is {}. 
Throughput is [{}] events/s", + if (newEwma.value < throughputThreshold) "enabled" else "disabled", + newEwma.value) + } + } } } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala index fa118ef6..e89126aa 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala @@ -53,6 +53,13 @@ import reactor.core.publisher.Mono result.getRowsUpdated.asFuture().map(_.longValue())(ExecutionContexts.parasitic) } + def updateOneReturningInTx[A](stmt: Statement, mapRow: Row => A)(implicit ec: ExecutionContext): Future[A] = + stmt.execute().asFuture().flatMap { result => + Mono + .from[A](result.map((row, _) => mapRow(row))) + .asFuture() + } + def updateBatchInTx(stmt: Statement)(implicit ec: ExecutionContext): Future[Long] = { val consumer: BiConsumer[Long, java.lang.Long] = (acc, elem) => acc + elem.longValue() Flux @@ -195,12 +202,7 @@ class R2dbcExecutor( def updateOneReturning[A]( logPrefix: String)(statementFactory: Connection => Statement, mapRow: Row => A): Future[A] = { withAutoCommitConnection(logPrefix) { connection => - val stmt = statementFactory(connection) - stmt.execute().asFuture().flatMap { result => - Mono - .from[A](result.map((row, _) => mapRow(row))) - .asFuture() - } + updateOneReturningInTx(statementFactory(connection), mapRow) } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala index d13e6e4a..bf79398a 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala @@ -94,7 +94,7 @@ private[r2dbc] object H2Dialect extends Dialect { override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit system: ActorSystem[_]): DurableStateDao = - new H2DurableStateDao(settings, connectionFactory)(ecForDaos(system, settings), system) + new H2DurableStateDao(settings, connectionFactory, this)(ecForDaos(system, settings), system) private def ecForDaos(system: ActorSystem[_], settings: R2dbcSettings): ExecutionContext = { // H2 R2DBC driver blocks in surprising places (Mono.toFuture in stmt.execute().asFuture()) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala index c3f42d5a..27393ed0 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala @@ -4,26 +4,29 @@ package akka.persistence.r2dbc.internal.h2 -import akka.actor.typed.ActorSystem -import akka.annotation.InternalApi -import akka.persistence.r2dbc.R2dbcSettings -import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.Duration +import scala.concurrent.duration.FiniteDuration + import io.r2dbc.spi.ConnectionFactory import org.slf4j.Logger import org.slf4j.LoggerFactory -import scala.concurrent.ExecutionContext -import scala.concurrent.duration.Duration -import scala.concurrent.duration.FiniteDuration +import akka.actor.typed.ActorSystem +import akka.annotation.InternalApi +import akka.persistence.r2dbc.R2dbcSettings +import 
akka.persistence.r2dbc.internal.Dialect +import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao /** * INTERNAL API */ @InternalApi -private[r2dbc] final class H2DurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit - ec: ExecutionContext, - system: ActorSystem[_]) - extends PostgresDurableStateDao(settings, connectionFactory) { +private[r2dbc] final class H2DurableStateDao( + settings: R2dbcSettings, + connectionFactory: ConnectionFactory, + dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_]) + extends PostgresDurableStateDao(settings, connectionFactory, dialect) { override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2DurableStateDao]) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala index e1356ece..7eb530ec 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala @@ -16,11 +16,15 @@ import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory - import java.time.Instant + import scala.concurrent.ExecutionContext import scala.concurrent.Future +import io.r2dbc.spi.Connection + +import akka.persistence.r2dbc.internal.R2dbcExecutor + /** * INTERNAL API */ @@ -35,7 +39,7 @@ private[r2dbc] class H2JournalDao(journalSettings: R2dbcSettings, connectionFact require(journalSettings.useAppTimestamp) require(journalSettings.dbTimestampMonotonicIncreasing) - val insertSql = sql"INSERT INTO $journalTable " + + private val insertSql = sql"INSERT INTO $journalTable " + "(slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, tags, meta_ser_id, meta_ser_manifest, meta_payload, db_timestamp) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" @@ -54,66 +58,74 @@ private[r2dbc] class H2JournalDao(journalSettings: R2dbcSettings, connectionFact // it's always the same persistenceId for all events val persistenceId = events.head.persistenceId - val previousSeqNr = events.head.seqNr - 1 - - def bind(stmt: Statement, write: SerializedJournalRow): Statement = { - stmt - .bind(0, write.slice) - .bind(1, write.entityType) - .bind(2, write.persistenceId) - .bind(3, write.seqNr) - .bind(4, write.writerUuid) - .bind(5, "") // FIXME event adapter - .bind(6, write.serId) - .bind(7, write.serManifest) - .bindPayload(8, write.payload.get) - - if (write.tags.isEmpty) - stmt.bindNull(9, classOf[Array[String]]) - else - stmt.bind(9, write.tags.toArray) - - // optional metadata - write.metadata match { - case Some(m) => - stmt - .bind(10, m.serId) - .bind(11, m.serManifest) - .bind(12, m.payload) - case None => - stmt - .bindNull(10, classOf[Integer]) - .bindNull(11, classOf[String]) - .bindNull(12, classOf[Array[Byte]]) + + val totalEvents = events.size + val result = + if (totalEvents == 1) { + r2dbcExecutor.updateOne(s"insert [$persistenceId]")(connection => + bindInsertStatement(connection.createStatement(insertSql), events.head)) + } else { + r2dbcExecutor.updateInBatch(s"batch insert [$persistenceId], [$totalEvents] events")(connection => + events.foldLeft(connection.createStatement(insertSql)) { (stmt, write) => + stmt.add() + bindInsertStatement(stmt, write) + }) } - stmt.bind(13, write.dbTimestamp) + if (log.isDebugEnabled()) + result.foreach { _ => + log.debug("Wrote [{}] 
events for persistenceId [{}]", totalEvents, events.head.persistenceId) + } + result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic) + } - stmt - } + override def writeEventInTx(event: SerializedJournalRow, connection: Connection): Future[Instant] = { + val persistenceId = event.persistenceId - val totalEvents = events.size - if (totalEvents == 1) { - val result = r2dbcExecutor.updateOne(s"insert [$persistenceId]")(connection => - bind(connection.createStatement(insertSql), events.head)) - if (log.isDebugEnabled()) - result.foreach { _ => - log.debug("Wrote [{}] events for persistenceId [{}]", 1, events.head.persistenceId) - } - result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic) - } else { - val result = r2dbcExecutor.updateInBatch(s"batch insert [$persistenceId], [$totalEvents] events")(connection => - events.foldLeft(connection.createStatement(insertSql)) { (stmt, write) => - stmt.add() - bind(stmt, write) - }) - if (log.isDebugEnabled()) { - result.foreach { _ => - log.debug("Wrote [{}] events for persistenceId [{}]", totalEvents, events.head.persistenceId) - } + val stmt = bindInsertStatement(connection.createStatement(insertSql), event) + val result = R2dbcExecutor.updateOneInTx(stmt) + + if (log.isDebugEnabled()) + result.foreach { _ => + log.debug("Wrote [{}] event for persistenceId [{}]", 1, persistenceId) } - result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic) + result.map(_ => event.dbTimestamp)(ExecutionContexts.parasitic) + } + + private def bindInsertStatement(stmt: Statement, write: SerializedJournalRow): Statement = { + stmt + .bind(0, write.slice) + .bind(1, write.entityType) + .bind(2, write.persistenceId) + .bind(3, write.seqNr) + .bind(4, write.writerUuid) + .bind(5, "") // FIXME event adapter + .bind(6, write.serId) + .bind(7, write.serManifest) + .bindPayload(8, write.payload.get) + + if (write.tags.isEmpty) + stmt.bindNull(9, classOf[Array[String]]) + else + stmt.bind(9, write.tags.toArray) + + // optional metadata + write.metadata match { + case Some(m) => + stmt + .bind(10, m.serId) + .bind(11, m.serManifest) + .bind(12, m.payload) + case None => + stmt + .bindNull(10, classOf[Integer]) + .bindNull(11, classOf[String]) + .bindNull(12, classOf[Array[Byte]]) } + + stmt.bind(13, write.dbTimestamp) + + stmt } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala index 87ddc81a..d8dad720 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala @@ -129,5 +129,5 @@ private[r2dbc] object PostgresDialect extends Dialect { override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit system: ActorSystem[_]): DurableStateDao = - new PostgresDurableStateDao(settings, connectionFactory)(system.executionContext, system) + new PostgresDurableStateDao(settings, connectionFactory, this)(system.executionContext, system) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index 7cca3f15..4037ae7f 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -4,6 
+4,25 @@ package akka.persistence.r2dbc.internal.postgres +import java.lang +import java.time.Instant +import java.util + +import scala.collection.immutable +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration.Duration +import scala.concurrent.duration.FiniteDuration +import scala.util.control.NonFatal + +import io.r2dbc.spi.Connection +import io.r2dbc.spi.ConnectionFactory +import io.r2dbc.spi.R2dbcDataIntegrityViolationException +import io.r2dbc.spi.Row +import io.r2dbc.spi.Statement +import org.slf4j.Logger +import org.slf4j.LoggerFactory + import akka.Done import akka.NotUsed import akka.actor.typed.ActorSystem @@ -20,8 +39,11 @@ import akka.persistence.r2dbc.internal.AdditionalColumnFactory import akka.persistence.r2dbc.internal.BySliceQuery.Buckets import akka.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket import akka.persistence.r2dbc.internal.ChangeHandlerFactory +import akka.persistence.r2dbc.internal.Dialect import akka.persistence.r2dbc.internal.DurableStateDao import akka.persistence.r2dbc.internal.InstantFactory +import akka.persistence.r2dbc.internal.JournalDao +import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow import akka.persistence.r2dbc.internal.PayloadCodec import akka.persistence.r2dbc.internal.PayloadCodec.RichRow import akka.persistence.r2dbc.internal.PayloadCodec.RichStatement @@ -33,23 +55,6 @@ import akka.persistence.r2dbc.state.scaladsl.AdditionalColumn import akka.persistence.r2dbc.state.scaladsl.ChangeHandler import akka.persistence.typed.PersistenceId import akka.stream.scaladsl.Source -import io.r2dbc.spi.Connection -import io.r2dbc.spi.ConnectionFactory -import io.r2dbc.spi.R2dbcDataIntegrityViolationException -import io.r2dbc.spi.Row -import io.r2dbc.spi.Statement -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -import java.lang -import java.time.Instant -import java.util -import scala.collection.immutable -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.concurrent.duration.Duration -import scala.concurrent.duration.FiniteDuration -import scala.util.control.NonFatal /** * INTERNAL API @@ -64,15 +69,17 @@ private[r2dbc] object PostgresDurableStateDao { binding: AdditionalColumn.Binding[_]) val FutureDone: Future[Done] = Future.successful(Done) + val FutureInstantNone: Future[Option[Instant]] = Future.successful(None) } /** * INTERNAL API */ @InternalApi -private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit - ec: ExecutionContext, - system: ActorSystem[_]) +private[r2dbc] class PostgresDurableStateDao( + settings: R2dbcSettings, + connectionFactory: ConnectionFactory, + dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends DurableStateDao { import DurableStateDao._ import PostgresDurableStateDao._ @@ -87,6 +94,9 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection private implicit val statePayloadCodec: PayloadCodec = settings.durableStatePayloadCodec + // used for change events + private lazy val journalDao: JournalDao = dialect.createJournalDao(settings, connectionFactory) + private lazy val additionalColumns: Map[String, immutable.IndexedSeq[AdditionalColumn[Any, Any]]] = { settings.durableStateAdditionalColumnClasses.map { case (entityType, columnClasses) => val instances = columnClasses.map(fqcn => AdditionalColumnFactory.create(system, fqcn)) @@ -264,7 +274,7 @@ private[r2dbc] class 
PostgresDurableStateDao(settings: R2dbcSettings, connection LIMIT ?""" } - def readState(persistenceId: String): Future[Option[SerializedStateRow]] = { + override def readState(persistenceId: String): Future[Option[SerializedStateRow]] = { val entityType = PersistenceId.extractEntityType(persistenceId) r2dbcExecutor.selectOne(s"select [$persistenceId]")( connection => @@ -293,7 +303,27 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection Option(rowPayload) } - def upsertState(state: SerializedStateRow, value: Any): Future[Done] = { + private def writeChangeEventAndCallChangeHander( + connection: Connection, + updatedRows: Long, + entityType: String, + change: DurableStateChange[Any], + changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]] = { + if (updatedRows == 1) + for { + changeEventTimestamp <- changeEvent + .map(journalDao.writeEventInTx(_, connection).map(Some(_))) + .getOrElse(FutureInstantNone) + _ <- changeHandlers.get(entityType).map(processChange(_, connection, change)).getOrElse(FutureDone) + } yield changeEventTimestamp + else + FutureInstantNone + } + + override def upsertState( + state: SerializedStateRow, + value: Any, + changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]] = { require(state.revision > 0) def bindTags(stmt: Statement, i: Int): Statement = { @@ -327,7 +357,7 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection val entityType = PersistenceId.extractEntityType(state.persistenceId) - val result = { + val result: Future[(Long, Option[Instant])] = { val additionalBindings = additionalColumns.get(entityType) match { case None => Vector.empty[EvaluatedAdditionalColumnBindings] case Some(columns) => @@ -360,17 +390,22 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection s"Insert failed: durable state for persistence id [${state.persistenceId}] already exists")) } - changeHandlers.get(entityType) match { - case None => - recoverDataIntegrityViolation(r2dbcExecutor.updateOne(s"insert [${state.persistenceId}]")(insertStatement)) - case Some(handler) => - r2dbcExecutor.withConnection(s"insert [${state.persistenceId}] with change handler") { connection => - for { - updatedRows <- recoverDataIntegrityViolation(R2dbcExecutor.updateOneInTx(insertStatement(connection))) - _ <- if (updatedRows == 1) processChange(handler, connection, change) else FutureDone - } yield updatedRows - } - } + if (!changeHandlers.contains(entityType) && changeEvent.isEmpty) { + val updatedRows = recoverDataIntegrityViolation( + r2dbcExecutor.updateOne(s"insert [${state.persistenceId}]")(insertStatement)) + updatedRows.map(_ -> None) + } else + r2dbcExecutor.withConnection(s"insert [${state.persistenceId}]") { connection => + for { + updatedRows <- recoverDataIntegrityViolation(R2dbcExecutor.updateOneInTx(insertStatement(connection))) + changeEventTimestamp <- writeChangeEventAndCallChangeHander( + connection, + updatedRows, + entityType, + change, + changeEvent) + } yield (updatedRows, changeEventTimestamp) + } } else { val previousRevision = state.revision - 1 @@ -405,27 +440,31 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection } } - changeHandlers.get(entityType) match { - case None => - r2dbcExecutor.updateOne(s"update [${state.persistenceId}]")(updateStatement) - case Some(handler) => - r2dbcExecutor.withConnection(s"update [${state.persistenceId}] with change handler") { connection => - for { - updatedRows <- 
R2dbcExecutor.updateOneInTx(updateStatement(connection)) - _ <- if (updatedRows == 1) processChange(handler, connection, change) else FutureDone - } yield updatedRows - } - } + if (!changeHandlers.contains(entityType) && changeEvent.isEmpty) { + val updatedRows = r2dbcExecutor.updateOne(s"update [${state.persistenceId}]")(updateStatement) + updatedRows.map(_ -> None) + } else + r2dbcExecutor.withConnection(s"update [${state.persistenceId}]") { connection => + for { + updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection)) + changeEventTimestamp <- writeChangeEventAndCallChangeHander( + connection, + updatedRows, + entityType, + change, + changeEvent) + } yield (updatedRows, changeEventTimestamp) + } } } - result.map { updatedRows => + result.map { case (updatedRows, changeEventTimestamp) => if (updatedRows != 1) throw new IllegalStateException( s"Update failed: durable state for persistence id [${state.persistenceId}] could not be updated to revision [${state.revision}]") else { log.debug("Updated durable state for persistenceId [{}] to revision [{}]", state.persistenceId, state.revision) - Done + changeEventTimestamp } } } @@ -451,11 +490,15 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection } } - def deleteState(persistenceId: String, revision: Long): Future[Done] = { + override def deleteState( + persistenceId: String, + revision: Long, + changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]] = { if (revision == 0) { hardDeleteState(persistenceId) + .map(_ => None)(ExecutionContexts.parasitic) } else { - val result = { + val result: Future[(Long, Option[Instant])] = { val entityType = PersistenceId.extractEntityType(persistenceId) def change = new DeletedDurableState[Any](persistenceId, revision, NoOffset, EmptyDbTimestamp.toEpochMilli) @@ -483,18 +526,17 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection s"Insert delete marker with revision 1 failed: durable state for persistence id [$persistenceId] already exists")) } - val changeHandler = changeHandlers.get(entityType) - val changeHandlerHint = changeHandler.map(_ => " with change handler").getOrElse("") - - r2dbcExecutor.withConnection(s"insert delete marker [$persistenceId]$changeHandlerHint") { connection => + r2dbcExecutor.withConnection(s"insert delete marker [$persistenceId]") { connection => for { updatedRows <- recoverDataIntegrityViolation( R2dbcExecutor.updateOneInTx(insertDeleteMarkerStatement(connection))) - _ <- changeHandler match { - case None => FutureDone - case Some(handler) => if (updatedRows == 1) processChange(handler, connection, change) else FutureDone - } - } yield updatedRows + changeEventTimestamp <- writeChangeEventAndCallChangeHander( + connection, + updatedRows, + entityType, + change, + changeEvent) + } yield (updatedRows, changeEventTimestamp) } } else { @@ -531,28 +573,27 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection } } - val changeHandler = changeHandlers.get(entityType) - val changeHandlerHint = changeHandler.map(_ => " with change handler").getOrElse("") - - r2dbcExecutor.withConnection(s"delete [$persistenceId]$changeHandlerHint") { connection => + r2dbcExecutor.withConnection(s"delete [$persistenceId]") { connection => for { updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection)) - _ <- changeHandler match { - case None => FutureDone - case Some(handler) => if (updatedRows == 1) processChange(handler, connection, change) else FutureDone - } - } yield 
updatedRows + changeEventTimestamp <- writeChangeEventAndCallChangeHander( + connection, + updatedRows, + entityType, + change, + changeEvent) + } yield (updatedRows, changeEventTimestamp) } } } - result.map { updatedRows => + result.map { case (updatedRows, changeEventTimestamp) => if (updatedRows != 1) throw new IllegalStateException( s"Delete failed: durable state for persistence id [$persistenceId] could not be updated to revision [$revision]") else { log.debug("Deleted durable state for persistenceId [{}] to revision [{}]", persistenceId, revision) - Done + changeEventTimestamp } } @@ -572,14 +613,9 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection connection .createStatement(hardDeleteStateSql(entityType)) .bind(0, persistenceId)) - _ <- changeHandler match { - case None => FutureDone - case Some(handler) => - if (updatedRows == 1) { - val change = new DeletedDurableState[Any](persistenceId, 0L, NoOffset, EmptyDbTimestamp.toEpochMilli) - processChange(handler, connection, change) - } else - FutureDone + _ <- { + val change = new DeletedDurableState[Any](persistenceId, 0L, NoOffset, EmptyDbTimestamp.toEpochMilli) + writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent = None) } } yield updatedRows } @@ -669,7 +705,7 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => NotUsed) } - def persistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed] = { + override def persistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed] = { if (settings.durableStateTableByEntityTypeWithSchema.isEmpty) persistenceIds(afterId, limit, settings.durableStateTableWithSchema) else { @@ -699,7 +735,7 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection } } - def persistenceIds(afterId: Option[String], limit: Long, table: String): Source[String, NotUsed] = { + override def persistenceIds(afterId: Option[String], limit: Long, table: String): Source[String, NotUsed] = { val result = readPersistenceIds(afterId, limit, table) Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => NotUsed) @@ -729,7 +765,7 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection result } - def persistenceIds(entityType: String, afterId: Option[String], limit: Long): Source[String, NotUsed] = { + override def persistenceIds(entityType: String, afterId: Option[String], limit: Long): Source[String, NotUsed] = { val table = settings.getDurableStateTableWithSchema(entityType) val likeStmtPostfix = PersistenceId.DefaultSeparator + "%" val result = r2dbcExecutor.select(s"select persistenceIds by entity type")( diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala index f3f0a7ad..6358c5bf 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala @@ -10,7 +10,6 @@ import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts import akka.persistence.Persistence import akka.persistence.r2dbc.R2dbcSettings -import akka.persistence.r2dbc.internal.BySliceQuery import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.PayloadCodec import 
akka.persistence.r2dbc.internal.PayloadCodec.RichStatement @@ -24,8 +23,8 @@ import io.r2dbc.spi.Row import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory - import java.time.Instant + import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -133,7 +132,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti * it can return `JournalDao.EmptyDbTimestamp` when the pub-sub feature is disabled. When enabled it would have to use * a select (in same transaction). */ - def writeEvents(events: Seq[SerializedJournalRow]): Future[Instant] = { + override def writeEvents(events: Seq[SerializedJournalRow]): Future[Instant] = { require(events.nonEmpty) // it's always the same persistenceId for all events @@ -143,56 +142,6 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti // The MigrationTool defines the dbTimestamp to preserve the original event timestamp val useTimestampFromDb = events.head.dbTimestamp == Instant.EPOCH - def bind(stmt: Statement, write: SerializedJournalRow): Statement = { - stmt - .bind(0, write.slice) - .bind(1, write.entityType) - .bind(2, write.persistenceId) - .bind(3, write.seqNr) - .bind(4, write.writerUuid) - .bind(5, "") // FIXME event adapter - .bind(6, write.serId) - .bind(7, write.serManifest) - .bindPayload(8, write.payload.get) - - if (write.tags.isEmpty) - stmt.bindNull(9, classOf[Array[String]]) - else - stmt.bind(9, write.tags.toArray) - - // optional metadata - write.metadata match { - case Some(m) => - stmt - .bind(10, m.serId) - .bind(11, m.serManifest) - .bind(12, m.payload) - case None => - stmt - .bindNull(10, classOf[Integer]) - .bindNull(11, classOf[String]) - .bindNull(12, classOf[Array[Byte]]) - } - - if (useTimestampFromDb) { - if (!journalSettings.dbTimestampMonotonicIncreasing) - stmt - .bind(13, write.persistenceId) - .bind(14, previousSeqNr) - } else { - if (journalSettings.dbTimestampMonotonicIncreasing) - stmt - .bind(13, write.dbTimestamp) - else - stmt - .bind(13, write.dbTimestamp) - .bind(14, write.persistenceId) - .bind(15, previousSeqNr) - } - - stmt - } - val insertSql = if (useTimestampFromDb) insertEventWithTransactionTimestampSql else insertEventWithParameterTimestampSql @@ -200,11 +149,12 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti val totalEvents = events.size if (totalEvents == 1) { val result = r2dbcExecutor.updateOneReturning(s"insert [$persistenceId]")( - connection => bind(connection.createStatement(insertSql), events.head), + connection => + bindInsertStatement(connection.createStatement(insertSql), events.head, useTimestampFromDb, previousSeqNr), row => row.get(0, classOf[Instant])) if (log.isDebugEnabled()) result.foreach { _ => - log.debug("Wrote [{}] events for persistenceId [{}]", 1, events.head.persistenceId) + log.debug("Wrote [{}] events for persistenceId [{}]", 1, persistenceId) } result } else { @@ -212,18 +162,92 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti connection => events.foldLeft(connection.createStatement(insertSql)) { (stmt, write) => stmt.add() - bind(stmt, write) + bindInsertStatement(stmt, write, useTimestampFromDb, previousSeqNr) }, row => row.get(0, classOf[Instant])) if (log.isDebugEnabled()) result.foreach { _ => - log.debug("Wrote [{}] events for persistenceId [{}]", totalEvents, events.head.persistenceId) + log.debug("Wrote [{}] events for persistenceId [{}]", totalEvents, persistenceId) } 
result.map(_.head)(ExecutionContexts.parasitic) } } - def readHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = { + override def writeEventInTx(event: SerializedJournalRow, connection: Connection): Future[Instant] = { + val persistenceId = event.persistenceId + val previousSeqNr = event.seqNr - 1 + + // The MigrationTool defines the dbTimestamp to preserve the original event timestamp + val useTimestampFromDb = event.dbTimestamp == Instant.EPOCH + + val insertSql = + if (useTimestampFromDb) insertEventWithTransactionTimestampSql + else insertEventWithParameterTimestampSql + + val stmt = bindInsertStatement(connection.createStatement(insertSql), event, useTimestampFromDb, previousSeqNr) + val result = R2dbcExecutor.updateOneReturningInTx(stmt, row => row.get(0, classOf[Instant])) + if (log.isDebugEnabled()) + result.foreach { _ => + log.debug("Wrote [{}] event for persistenceId [{}]", 1, persistenceId) + } + result + } + + private def bindInsertStatement( + stmt: Statement, + write: SerializedJournalRow, + useTimestampFromDb: Boolean, + previousSeqNr: Long): Statement = { + stmt + .bind(0, write.slice) + .bind(1, write.entityType) + .bind(2, write.persistenceId) + .bind(3, write.seqNr) + .bind(4, write.writerUuid) + .bind(5, "") // FIXME event adapter + .bind(6, write.serId) + .bind(7, write.serManifest) + .bindPayload(8, write.payload.get) + + if (write.tags.isEmpty) + stmt.bindNull(9, classOf[Array[String]]) + else + stmt.bind(9, write.tags.toArray) + + // optional metadata + write.metadata match { + case Some(m) => + stmt + .bind(10, m.serId) + .bind(11, m.serManifest) + .bind(12, m.payload) + case None => + stmt + .bindNull(10, classOf[Integer]) + .bindNull(11, classOf[String]) + .bindNull(12, classOf[Array[Byte]]) + } + + if (useTimestampFromDb) { + if (!journalSettings.dbTimestampMonotonicIncreasing) + stmt + .bind(13, write.persistenceId) + .bind(14, previousSeqNr) + } else { + if (journalSettings.dbTimestampMonotonicIncreasing) + stmt + .bind(13, write.dbTimestamp) + else + stmt + .bind(13, write.dbTimestamp) + .bind(14, write.persistenceId) + .bind(15, previousSeqNr) + } + + stmt + } + + override def readHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = { val result = r2dbcExecutor .select(s"select highest seqNr [$persistenceId]")( connection => @@ -243,7 +267,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti result } - def readLowestSequenceNr(persistenceId: String): Future[Long] = { + override def readLowestSequenceNr(persistenceId: String): Future[Long] = { val result = r2dbcExecutor .select(s"select lowest seqNr [$persistenceId]")( connection => @@ -277,7 +301,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti } } - def deleteEventsTo(persistenceId: String, toSequenceNr: Long, resetSequenceNumber: Boolean): Future[Unit] = { + override def deleteEventsTo(persistenceId: String, toSequenceNr: Long, resetSequenceNumber: Boolean): Future[Unit] = { def insertDeleteMarkerStmt(deleteMarkerSeqNr: Long, connection: Connection): Statement = { val entityType = PersistenceId.extractEntityType(persistenceId) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala index 2d7a6a7d..9de58b86 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala +++ 
b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala @@ -42,5 +42,5 @@ private[r2dbc] object YugabyteDialect extends Dialect { override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit system: ActorSystem[_]): DurableStateDao = - new YugabyteDurableStateDao(settings, connectionFactory)(system.executionContext, system) + new YugabyteDurableStateDao(settings, connectionFactory, this)(system.executionContext, system) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala index 9f7f4051..9a9ba030 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala @@ -4,24 +4,26 @@ package akka.persistence.r2dbc.internal.postgres -import akka.actor.typed.ActorSystem -import akka.annotation.InternalApi -import akka.persistence.r2dbc.R2dbcSettings +import scala.concurrent.ExecutionContext + import io.r2dbc.spi._ import org.slf4j.Logger import org.slf4j.LoggerFactory -import scala.concurrent.ExecutionContext +import akka.actor.typed.ActorSystem +import akka.annotation.InternalApi +import akka.persistence.r2dbc.R2dbcSettings +import akka.persistence.r2dbc.internal.Dialect /** * INTERNAL API */ @InternalApi -private[r2dbc] final class YugabyteDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)( - implicit - ec: ExecutionContext, - system: ActorSystem[_]) - extends PostgresDurableStateDao(settings, connectionFactory) { +private[r2dbc] final class YugabyteDurableStateDao( + settings: R2dbcSettings, + connectionFactory: ConnectionFactory, + dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_]) + extends PostgresDurableStateDao(settings, connectionFactory, dialect) { override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[YugabyteDurableStateDao]) diff --git a/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala index c0550d17..40b3f20d 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala @@ -7,7 +7,9 @@ package akka.persistence.r2dbc.state.javadsl import java.util import java.util.Optional import java.util.concurrent.CompletionStage + import scala.concurrent.ExecutionContext + import akka.Done import akka.NotUsed import akka.japi.Pair @@ -16,18 +18,18 @@ import akka.persistence.query.Offset import akka.persistence.query.javadsl.DurableStateStorePagedPersistenceIdsQuery import akka.persistence.query.typed.javadsl.DurableStateStoreBySliceQuery import akka.persistence.r2dbc.state.scaladsl.{ R2dbcDurableStateStore => ScalaR2dbcDurableStateStore } -import akka.persistence.state.javadsl.DurableStateUpdateStore import akka.persistence.state.javadsl.GetObjectResult import akka.stream.javadsl.Source - import scala.compat.java8.FutureConverters.FutureOps +import akka.persistence.state.javadsl.DurableStateUpdateWithChangeEventStore + object R2dbcDurableStateStore { val Identifier: String = ScalaR2dbcDurableStateStore.Identifier } class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(implicit ec: ExecutionContext) - extends 
DurableStateUpdateStore[A] + extends DurableStateUpdateWithChangeEventStore[A] with DurableStateStoreBySliceQuery[A] with DurableStateStorePagedPersistenceIdsQuery[A] { @@ -40,6 +42,14 @@ class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(impl override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): CompletionStage[Done] = scalaStore.upsertObject(persistenceId, revision, value, tag).toJava + override def upsertObject( + persistenceId: String, + revision: Long, + value: A, + tag: String, + changeEvent: Any): CompletionStage[Done] = + scalaStore.upsertObject(persistenceId, revision, value, tag, changeEvent).toJava + @deprecated(message = "Use the deleteObject overload with revision instead.", since = "1.0.0") override def deleteObject(persistenceId: String): CompletionStage[Done] = deleteObject(persistenceId, revision = 0) @@ -47,6 +57,9 @@ class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(impl override def deleteObject(persistenceId: String, revision: Long): CompletionStage[Done] = scalaStore.deleteObject(persistenceId, revision).toJava + override def deleteObject(persistenceId: String, revision: Long, changeEvent: Any): CompletionStage[Done] = + scalaStore.deleteObject(persistenceId, revision, changeEvent).toJava + override def currentChangesBySlices( entityType: String, minSlice: Int, diff --git a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala index 4e42eae8..be7656e5 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala @@ -4,21 +4,27 @@ package akka.persistence.r2dbc.state.scaladsl +import java.time.Instant +import java.util.UUID + import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future + import akka.Done import akka.NotUsed import akka.actor.ExtendedActorSystem import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.scaladsl.adapter._ import akka.persistence.Persistence +import akka.persistence.SerializedEvent import akka.persistence.query.DeletedDurableState import akka.persistence.query.DurableStateChange import akka.persistence.query.Offset import akka.persistence.query.TimestampOffset import akka.persistence.query.UpdatedDurableState import akka.persistence.query.scaladsl.DurableStateStorePagedPersistenceIdsQuery +import akka.persistence.query.typed.EventEnvelope import akka.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery import akka.persistence.r2dbc.ConnectionFactoryProvider import akka.persistence.r2dbc.R2dbcSettings @@ -26,8 +32,14 @@ import akka.persistence.r2dbc.internal.BySliceQuery import akka.persistence.r2dbc.internal.ContinuousQuery import akka.persistence.r2dbc.internal.DurableStateDao import akka.persistence.r2dbc.internal.DurableStateDao.SerializedStateRow -import akka.persistence.state.scaladsl.DurableStateUpdateStore +import akka.persistence.r2dbc.internal.EnvelopeOrigin +import akka.persistence.r2dbc.internal.InstantFactory +import akka.persistence.r2dbc.internal.JournalDao +import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow +import akka.persistence.r2dbc.internal.PubSub +import akka.persistence.state.scaladsl.DurableStateUpdateWithChangeEventStore import akka.persistence.state.scaladsl.GetObjectResult +import 
akka.persistence.typed.PersistenceId import akka.serialization.SerializationExtension import akka.serialization.Serializers import akka.stream.scaladsl.Source @@ -45,7 +57,7 @@ object R2dbcDurableStateStore { } class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfgPath: String) - extends DurableStateUpdateStore[A] + extends DurableStateUpdateWithChangeEventStore[A] with DurableStateStoreBySliceQuery[A] with DurableStateStorePagedPersistenceIdsQuery[A] { import R2dbcDurableStateStore.PersistenceIdsQueryState @@ -53,6 +65,7 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg private val log = LoggerFactory.getLogger(getClass) private val sharedConfigPath = cfgPath.replaceAll("""\.state$""", "") private val settings = R2dbcSettings(system.settings.config.getConfig(sharedConfigPath)) + private val journalSettings = R2dbcSettings(system.settings.config.getConfig(sharedConfigPath)) log.debug("R2DBC journal starting up with dialect [{}]", settings.dialectName) private val typedSystem = system.toTyped @@ -62,6 +75,11 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg settings, ConnectionFactoryProvider(typedSystem) .connectionFactoryFor(sharedConfigPath + ".connection-factory"))(typedSystem) + private val changeEventWriterUuid = UUID.randomUUID().toString + + private val pubSub: Option[PubSub] = + if (journalSettings.journalPublishEvents) Some(PubSub(typedSystem)) + else None private val bySlice: BySliceQuery[SerializedStateRow, DurableStateChange[A]] = { val createEnvelope: (TimestampOffset, SerializedStateRow) => DurableStateChange[A] = (offset, row) => { @@ -109,12 +127,39 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg * the existing stored `revision` + 1 isn't equal to the given `revision`. This optimistic locking check can be * disabled with configuration `assert-single-writer`. */ - override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = { + override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = + internalUpsertObject(persistenceId, revision, value, tag, changeEvent = None) + + /** + * Insert the value if `revision` is 1, which will fail with `IllegalStateException` if there is already a stored + * value for the given `persistenceId`. Otherwise update the value, which will fail with `IllegalStateException` if + * the existing stored `revision` + 1 isn't equal to the given `revision`. This optimistic locking check can be + * disabled with configuration `assert-single-writer`. + * + * The `changeEvent` is written to the event journal in the same transaction as the DurableState upsert. Same + * `persistenceId` is used in the journal and the `revision` is used as `sequenceNr`. 
+ */ + override def upsertObject( + persistenceId: String, + revision: Long, + value: A, + tag: String, + changeEvent: Any): Future[Done] = + internalUpsertObject(persistenceId, revision, value, tag, changeEvent = Some(changeEvent)) + + private def internalUpsertObject( + persistenceId: String, + revision: Long, + value: A, + tag: String, + changeEvent: Option[Any]): Future[Done] = { + val valueAnyRef = value.asInstanceOf[AnyRef] val serialized = serialization.serialize(valueAnyRef).get val serializer = serialization.findSerializerFor(valueAnyRef) val manifest = Serializers.manifestFor(serializer, valueAnyRef) + val tags = if (tag.isEmpty) Set.empty[String] else Set(tag) val serializedRow = SerializedStateRow( persistenceId, revision, @@ -123,10 +168,79 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg Some(serialized), serializer.identifier, manifest, - if (tag.isEmpty) Set.empty else Set(tag)) + tags) - stateDao.upsertState(serializedRow, value) + val changeEventTimestamp = + stateDao.upsertState(serializedRow, value, serializedChangeEvent(persistenceId, revision, tag, changeEvent)) + import typedSystem.executionContext + changeEventTimestamp.map { timestampOption => + publish(persistenceId, revision, changeEvent, timestampOption, tags) + Done + } + } + + private def publish( + persistenceId: String, + revision: Long, + changeEvent: Option[Any], + changeEventTimestamp: Option[Instant], + tags: Set[String]): Unit = { + for { + timestamp <- changeEventTimestamp + event <- changeEvent + p <- pubSub + } yield { + val entityType = PersistenceId.extractEntityType(persistenceId) + val slice = persistenceExt.sliceForPersistenceId(persistenceId) + + val offset = TimestampOffset(timestamp, timestamp, Map(persistenceId -> revision)) + + val envelope = EventEnvelope( + offset, + persistenceId, + revision, + event, + timestamp.toEpochMilli, + entityType, + slice, + filtered = false, + source = EnvelopeOrigin.SourcePubSub, + tags) + p.publish(envelope) + } + } + + private def serializedChangeEvent(persistenceId: String, revision: Long, tag: String, changeEvent: Option[Any]) = { + changeEvent.map { event => + val eventAnyRef = event.asInstanceOf[AnyRef] + val serializedEvent = eventAnyRef match { + case s: SerializedEvent => s // already serialized + case _ => + val bytes = serialization.serialize(eventAnyRef).get + val serializer = serialization.findSerializerFor(eventAnyRef) + val manifest = Serializers.manifestFor(serializer, eventAnyRef) + new SerializedEvent(bytes, serializer.identifier, manifest) + } + + val entityType = PersistenceId.extractEntityType(persistenceId) + val slice = persistenceExt.sliceForPersistenceId(persistenceId) + val timestamp = if (journalSettings.useAppTimestamp) InstantFactory.now() else JournalDao.EmptyDbTimestamp + + SerializedJournalRow( + slice, + entityType, + persistenceId, + revision, + timestamp, + JournalDao.EmptyDbTimestamp, + Some(serializedEvent.bytes), + serializedEvent.serializerId, + serializedEvent.serializerManifest, + changeEventWriterUuid, + if (tag.isEmpty) Set.empty else Set(tag), + metadata = None) + } } @deprecated(message = "Use the deleteObject overload with revision instead.", since = "1.0.0") @@ -142,8 +256,35 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg * If the given revision is `0` it will fully delete the value and revision from the database without any optimistic * locking check. Next call to [[getObject]] will then return revision 0 and no value. 
*/ - override def deleteObject(persistenceId: String, revision: Long): Future[Done] = { - stateDao.deleteState(persistenceId, revision) + override def deleteObject(persistenceId: String, revision: Long): Future[Done] = + internalDeleteObject(persistenceId, revision, changeEvent = None) + + /** + * Delete the value, which will fail with `IllegalStateException` if the existing stored `revision` + 1 isn't equal to + * the given `revision`. This optimistic locking check can be disabled with configuration `assert-single-writer`. The + * stored revision for the persistenceId is updated and next call to [[getObject]] will return the revision, but with + * no value. + * + * If the given revision is `0` it will fully delete the value and revision from the database without any optimistic + * locking check. Next call to [[getObject]] will then return revision 0 and no value. + * + * The `changeEvent` is written to the event journal in the same transaction as the DurableState delete. Same + * `persistenceId` is used in the journal and the `revision` is used as `sequenceNr`. + */ + override def deleteObject(persistenceId: String, revision: Long, changeEvent: Any): Future[Done] = + internalDeleteObject(persistenceId, revision, changeEvent = Some(changeEvent)) + + private def internalDeleteObject(persistenceId: String, revision: Long, changeEvent: Option[Any]): Future[Done] = { + val changeEventTimestamp = stateDao.deleteState( + persistenceId, + revision, + serializedChangeEvent(persistenceId, revision, tag = "", changeEvent)) + + import typedSystem.executionContext + changeEventTimestamp.map { timestampOption => + publish(persistenceId, revision, changeEvent, timestampOption, tags = Set.empty) + Done + } } override def sliceForPersistenceId(persistenceId: String): Int = diff --git a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala new file mode 100644 index 00000000..c7578c3d --- /dev/null +++ b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala @@ -0,0 +1,212 @@ +/* + * Copyright (C) 2022 - 2023 Lightbend Inc. 
+ */ + +package akka.persistence.r2dbc.state + +import org.scalatest.concurrent.ScalaFutures.convertScalaFuture + +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorSystem +import akka.actor.typed.pubsub.Topic +import akka.persistence.query.PersistenceQuery +import akka.persistence.query.typed.EventEnvelope +import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQuery +import akka.persistence.r2dbc.TestConfig +import akka.persistence.r2dbc.TestData +import akka.persistence.r2dbc.TestDbLifecycle +import akka.persistence.r2dbc.internal.EnvelopeOrigin +import akka.persistence.r2dbc.internal.PubSub +import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal +import akka.persistence.r2dbc.state.scaladsl.R2dbcDurableStateStore +import akka.persistence.state.DurableStateStoreRegistry +import akka.persistence.state.scaladsl.DurableStateUpdateWithChangeEventStore +import akka.persistence.typed.PersistenceId +import akka.stream.scaladsl.Sink +import org.scalatest.wordspec.AnyWordSpecLike + +import akka.Done +import akka.persistence.r2dbc.TestActors.Persister +import akka.persistence.r2dbc.TestActors.Persister.PersistWithAck + +class DurableStateUpdateWithChangeEventStoreSpec + extends ScalaTestWithActorTestKit(TestConfig.config) + with AnyWordSpecLike + with TestDbLifecycle + with TestData + with LogCapturing { + + override def typedSystem: ActorSystem[_] = system + + private val store = DurableStateStoreRegistry(system) + .durableStateStoreFor[DurableStateUpdateWithChangeEventStore[String]](R2dbcDurableStateStore.Identifier) + private val journalQuery = + PersistenceQuery(system).readJournalFor[CurrentEventsByPersistenceIdTypedQuery](R2dbcReadJournal.Identifier) + + private val tag = "TAG" + + "The R2DBC durable state store" should { + "save additional change event" in { + val entityType = nextEntityType() + val persistenceId = PersistenceId(entityType, "my-persistenceId").id + val value1 = "Genuinely Collaborative" + val value2 = "Open to Feedback" + + store.upsertObject(persistenceId, 1L, value1, tag, s"Changed to $value1").futureValue + store.upsertObject(persistenceId, 2L, value2, tag, s"Changed to $value2").futureValue + store.deleteObject(persistenceId, 3L, "Deleted").futureValue + + val envelopes = journalQuery + .currentEventsByPersistenceIdTyped[String](persistenceId, 1L, Long.MaxValue) + .runWith(Sink.seq) + .futureValue + + val env1 = envelopes.head + env1.event shouldBe s"Changed to $value1" + env1.sequenceNr shouldBe 1L + env1.tags shouldBe Set(tag) + + val env2 = envelopes(1) + env2.event shouldBe s"Changed to $value2" + env2.sequenceNr shouldBe 2L + + val env3 = envelopes(2) + env3.event shouldBe s"Deleted" + env3.sequenceNr shouldBe 3L + } + + "save additional change event in same transaction" in { + // test rollback (same tx) if the journal insert fails via simulated unique constraint violation in event_journal + val entityType = nextEntityType() + val persistenceId = PersistenceId(entityType, "my-persistenceId").id + + val probe = testKit.createTestProbe[Done]() + val persister = testKit.spawn(Persister(persistenceId)) + persister ! 
PersistWithAck("a", probe.ref) + probe.expectMessage(Done) + testKit.stop(persister) + + val value1 = "Genuinely Collaborative" + + store.upsertObject(persistenceId, 1L, value1, tag, s"Changed to $value1").failed.futureValue + + store.getObject(persistenceId).futureValue.value shouldBe None + } + + "detect and reject concurrent inserts, and not store change event" in { + val entityType = nextEntityType() + val persistenceId = PersistenceId(entityType, "id-to-be-inserted-concurrently").id + val value = "Genuinely Collaborative" + store.upsertObject(persistenceId, revision = 1L, value, tag, s"Changed to $value").futureValue + + val updatedValue = "Open to Feedback" + store + .upsertObject(persistenceId, revision = 1L, updatedValue, tag, s"Changed to $updatedValue") + .failed + .futureValue + + val envelopes = journalQuery + .currentEventsByPersistenceIdTyped[String](persistenceId, 1L, Long.MaxValue) + .runWith(Sink.seq) + .futureValue + envelopes.size shouldBe 1 + } + + "detect and reject concurrent updates, and not store change event" in { + if (!r2dbcSettings.durableStateAssertSingleWriter) + pending + + val entityType = nextEntityType() + val persistenceId = PersistenceId(entityType, "id-to-be-updated-concurrently").id + val value = "Genuinely Collaborative" + store.upsertObject(persistenceId, revision = 1L, value, tag, s"Changed to $value").futureValue + + val updatedValue = "Open to Feedback" + store.upsertObject(persistenceId, revision = 2L, updatedValue, tag, s"Changed to $updatedValue").futureValue + + // simulate an update by a different node that didn't see the first one: + val updatedValue2 = "Genuine and Sincere in all Communications" + store + .upsertObject(persistenceId, revision = 2L, updatedValue2, tag, s"Changed to $updatedValue2") + .failed + .futureValue + + val envelopes = journalQuery + .currentEventsByPersistenceIdTyped[String](persistenceId, 1L, Long.MaxValue) + .runWith(Sink.seq) + .futureValue + envelopes.size shouldBe 2 + } + + "detect and reject concurrent delete of revision 1, and not store change event" in { + val entityType = nextEntityType() + val persistenceId = PersistenceId(entityType, "id-to-be-deleted-concurrently").id + val value = "Genuinely Collaborative" + store.upsertObject(persistenceId, revision = 1L, value, tag, s"Changed to $value").futureValue + + store.deleteObject(persistenceId, revision = 1L, "Deleted").failed.futureValue + + val envelopes = journalQuery + .currentEventsByPersistenceIdTyped[String](persistenceId, 1L, Long.MaxValue) + .runWith(Sink.seq) + .futureValue + envelopes.size shouldBe 1 + } + + "detect and reject concurrent deletes, and not store change event" in { + if (!r2dbcSettings.durableStateAssertSingleWriter) + pending + + val entityType = nextEntityType() + val persistenceId = PersistenceId(entityType, "id-to-be-updated-concurrently").id + val value = "Genuinely Collaborative" + store.upsertObject(persistenceId, revision = 1L, value, tag, s"Changed to $value").futureValue + + val updatedValue = "Open to Feedback" + store.upsertObject(persistenceId, revision = 2L, updatedValue, tag, s"Changed to $updatedValue").futureValue + + // simulate a delete by a different node that didn't see the first one: + store.deleteObject(persistenceId, revision = 2L, "Deleted").failed.futureValue + + val envelopes = journalQuery + .currentEventsByPersistenceIdTyped[String](persistenceId, 1L, Long.MaxValue) + .runWith(Sink.seq) + .futureValue + envelopes.size shouldBe 2 + } + + } + + "publish change event" in { + val entityType = nextEntityType() + 
val persistenceId = PersistenceId(entityType, "my-persistenceId").id + + val slice = persistenceExt.sliceForPersistenceId(persistenceId) + val topic = PubSub(system).eventTopic[String](entityType, slice) + val subscriberProbe = createTestProbe[EventEnvelope[String]]() + topic ! Topic.Subscribe(subscriberProbe.ref) + + val value1 = "Genuinely Collaborative" + val value2 = "Open to Feedback" + + store.upsertObject(persistenceId, 1L, value1, tag, s"Changed to $value1").futureValue + store.upsertObject(persistenceId, 2L, value2, tag, s"Changed to $value2").futureValue + store.deleteObject(persistenceId, 3L, "Deleted").futureValue + + val env1 = subscriberProbe.receiveMessage() + env1.event shouldBe s"Changed to $value1" + env1.sequenceNr shouldBe 1L + env1.tags shouldBe Set(tag) + env1.source shouldBe EnvelopeOrigin.SourcePubSub + + val env2 = subscriberProbe.receiveMessage() + env2.event shouldBe s"Changed to $value2" + env2.sequenceNr shouldBe 2L + + val env3 = subscriberProbe.receiveMessage() + env3.event shouldBe s"Deleted" + env3.sequenceNr shouldBe 3L + } + +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 011f4338..be19a724 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -9,7 +9,7 @@ object Dependencies { val Scala3 = "3.3.1" val Scala2Versions = Seq(Scala213) val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3 - val AkkaVersion = System.getProperty("override.akka.version", "2.9.0") + val AkkaVersion = System.getProperty("override.akka.version", "2.9.1") val AkkaVersionInDocs = AkkaVersion.take(3) val AkkaPersistenceJdbcVersion = "5.2.0" // only in migration tool tests val AkkaProjectionVersionInDocs = "current"
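Taken together, the patch switches both the scaladsl and javadsl durable state stores from DurableStateUpdateStore to DurableStateUpdateWithChangeEventStore, so callers can attach an optional change event to every upsert and delete. A minimal usage sketch of the new scaladsl API, modelled on the test spec above (the persistence id, values and tag are illustrative):

import scala.concurrent.Future

import akka.Done
import akka.actor.typed.ActorSystem
import akka.persistence.r2dbc.state.scaladsl.R2dbcDurableStateStore
import akka.persistence.state.DurableStateStoreRegistry
import akka.persistence.state.scaladsl.DurableStateUpdateWithChangeEventStore

def updateWithChangeEvents(persistenceId: String)(implicit system: ActorSystem[_]): Future[Done] = {
  import system.executionContext

  val store = DurableStateStoreRegistry(system)
    .durableStateStoreFor[DurableStateUpdateWithChangeEventStore[String]](R2dbcDurableStateStore.Identifier)

  for {
    // "Changed to v1" is appended to the event journal in the same transaction
    // as the state upsert, with the same persistenceId and seqNr = revision = 1
    _ <- store.upsertObject(persistenceId, revision = 1L, value = "v1", tag = "TAG", changeEvent = "Changed to v1")
    // the delete is also recorded with a change event, seqNr = revision = 2
    done <- store.deleteObject(persistenceId, revision = 2L, changeEvent = "Deleted")
  } yield done
}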
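The transactional core of the change is in PostgresDurableStateDao: the state row and the optional change event are written on the same connection, and the change handler (if any) runs in that transaction too, so everything commits or rolls back together. A condensed sketch of that composition (change handler call omitted), using only the INTERNAL APIs introduced above; executor, journalDao and stateStatement are assumed to be supplied by the DAO:

import java.time.Instant

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import io.r2dbc.spi.Connection
import io.r2dbc.spi.Statement

import akka.persistence.r2dbc.internal.JournalDao
import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow
import akka.persistence.r2dbc.internal.R2dbcExecutor

def upsertStateWithChangeEvent(
    executor: R2dbcExecutor,
    journalDao: JournalDao,
    stateStatement: Connection => Statement,
    changeEvent: Option[SerializedJournalRow])(implicit ec: ExecutionContext): Future[Option[Instant]] =
  executor.withConnection("upsert with change event") { connection =>
    for {
      // the durable state upsert and the journal insert share `connection`
      updatedRows <- R2dbcExecutor.updateOneInTx(stateStatement(connection))
      changeEventTimestamp <-
        if (updatedRows == 1)
          changeEvent match {
            case Some(event) => journalDao.writeEventInTx(event, connection).map(Some(_))
            case None        => Future.successful(None)
          }
        else
          Future.successful(None) // nothing was updated, don't write the event
    } yield changeEventTimestamp
  }

The returned timestamp is what R2dbcDurableStateStore later uses as the TimestampOffset when publishing the change event.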
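Because change events are ordinary journal rows, they can be read back with the standard typed query, exactly as the new spec does. A sketch, assuming the persistenceId has stored change events:

import scala.concurrent.Future

import akka.actor.typed.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQuery
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import akka.stream.scaladsl.Sink

def readChangeEvents(persistenceId: String)(implicit system: ActorSystem[_]): Future[Seq[EventEnvelope[String]]] =
  PersistenceQuery(system)
    .readJournalFor[CurrentEventsByPersistenceIdTypedQuery](R2dbcReadJournal.Identifier)
    .currentEventsByPersistenceIdTyped[String](persistenceId, 1L, Long.MaxValue)
    .runWith(Sink.seq)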
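Finally, when the journal's publish-events feature is enabled (the journalPublishEvents setting read by the store above) and PubSub's throughput gate is below its threshold, the change event is also published to the journal's pub-sub topic with source EnvelopeOrigin.SourcePubSub. A subscriber sketch, mirroring the "publish change event" test; note that PubSub is INTERNAL API, and entityType, slice and subscriber are assumptions:

import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.pubsub.Topic
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.r2dbc.internal.PubSub

def subscribeToChangeEvents(
    entityType: String,
    slice: Int,
    subscriber: ActorRef[EventEnvelope[String]])(implicit system: ActorSystem[_]): Unit = {
  // one topic per (entityType, slice); envelopes published by the state store
  // arrive here with sequenceNr = revision
  val topic = PubSub(system).eventTopic[String](entityType, slice)
  topic ! Topic.Subscribe(subscriber)
}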