From 5e30ce8c59f04d5d304764842feb0c018cdcd864 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 12 Dec 2023 09:15:55 +0100 Subject: [PATCH] impl DurableStateUpdateWithChangeEventStore trait --- .../scaladsl/DurableStateCleanup.scala | 4 +- .../r2dbc/internal/DurableStateDao.scala | 2 +- .../postgres/PostgresDurableStateDao.scala | 19 ++- .../javadsl/R2dbcDurableStateStore.scala | 19 ++- .../scaladsl/R2dbcDurableStateStore.scala | 113 +++++++++++------- project/Dependencies.scala | 2 +- 6 files changed, 98 insertions(+), 61 deletions(-) diff --git a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala index 90a70c23..b5fd7dcf 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala @@ -65,11 +65,11 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf */ def deleteState(persistenceId: String, resetRevisionNumber: Boolean): Future[Done] = { if (resetRevisionNumber) - stateDao.deleteState(persistenceId, revision = 0L) // hard delete without revision check + stateDao.deleteState(persistenceId, revision = 0L, changeEvent = None) // hard delete without revision check else { stateDao.readState(persistenceId).flatMap { case None => Future.successful(Done) // already deleted - case Some(s) => stateDao.deleteState(persistenceId, s.revision + 1) + case Some(s) => stateDao.deleteState(persistenceId, s.revision + 1, changeEvent = None) } } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala index 63427e8c..0b6f4892 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala @@ -52,7 +52,7 @@ private[r2dbc] trait DurableStateDao extends BySliceQuery.Dao[DurableStateDao.Se def upsertState(state: SerializedStateRow, value: Any, changeEvent: Option[SerializedJournalRow]): Future[Done] - def deleteState(persistenceId: String, revision: Long): Future[Done] + def deleteState(persistenceId: String, revision: Long, changeEvent: Option[SerializedJournalRow]): Future[Done] def persistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed] diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index 7a18ecb4..1131a5da 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -469,7 +469,10 @@ private[r2dbc] class PostgresDurableStateDao( } } - override def deleteState(persistenceId: String, revision: Long): Future[Done] = { + override def deleteState( + persistenceId: String, + revision: Long, + changeEvent: Option[SerializedJournalRow]): Future[Done] = { if (revision == 0) { hardDeleteState(persistenceId) } else { @@ -501,14 +504,11 @@ private[r2dbc] class PostgresDurableStateDao( s"Insert delete marker with revision 1 failed: durable state for persistence id [$persistenceId] already exists")) } - val changeHandler = changeHandlers.get(entityType) - val changeHandlerHint = changeHandler.map(_ => " with change handler").getOrElse("") - - r2dbcExecutor.withConnection(s"insert delete marker [$persistenceId]$changeHandlerHint") { connection => + r2dbcExecutor.withConnection(s"insert delete marker [$persistenceId]") { connection => for { updatedRows <- recoverDataIntegrityViolation( R2dbcExecutor.updateOneInTx(insertDeleteMarkerStatement(connection))) - _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent = None) + _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent) } yield updatedRows } @@ -546,13 +546,10 @@ private[r2dbc] class PostgresDurableStateDao( } } - val changeHandler = changeHandlers.get(entityType) - val changeHandlerHint = changeHandler.map(_ => " with change handler").getOrElse("") - - r2dbcExecutor.withConnection(s"delete [$persistenceId]$changeHandlerHint") { connection => + r2dbcExecutor.withConnection(s"delete [$persistenceId]") { connection => for { updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection)) - _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent = None) + _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent) } yield updatedRows } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala index c0550d17..40b3f20d 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala @@ -7,7 +7,9 @@ package akka.persistence.r2dbc.state.javadsl import java.util import java.util.Optional import java.util.concurrent.CompletionStage + import scala.concurrent.ExecutionContext + import akka.Done import akka.NotUsed import akka.japi.Pair @@ -16,18 +18,18 @@ import akka.persistence.query.Offset import akka.persistence.query.javadsl.DurableStateStorePagedPersistenceIdsQuery import akka.persistence.query.typed.javadsl.DurableStateStoreBySliceQuery import akka.persistence.r2dbc.state.scaladsl.{ R2dbcDurableStateStore => ScalaR2dbcDurableStateStore } -import akka.persistence.state.javadsl.DurableStateUpdateStore import akka.persistence.state.javadsl.GetObjectResult import akka.stream.javadsl.Source - import scala.compat.java8.FutureConverters.FutureOps +import akka.persistence.state.javadsl.DurableStateUpdateWithChangeEventStore + object R2dbcDurableStateStore { val Identifier: String = ScalaR2dbcDurableStateStore.Identifier } class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(implicit ec: ExecutionContext) - extends DurableStateUpdateStore[A] + extends DurableStateUpdateWithChangeEventStore[A] with DurableStateStoreBySliceQuery[A] with DurableStateStorePagedPersistenceIdsQuery[A] { @@ -40,6 +42,14 @@ class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(impl override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): CompletionStage[Done] = scalaStore.upsertObject(persistenceId, revision, value, tag).toJava + override def upsertObject( + persistenceId: String, + revision: Long, + value: A, + tag: String, + changeEvent: Any): CompletionStage[Done] = + scalaStore.upsertObject(persistenceId, revision, value, tag, changeEvent).toJava + @deprecated(message = "Use the deleteObject overload with revision instead.", since = "1.0.0") override def deleteObject(persistenceId: String): CompletionStage[Done] = deleteObject(persistenceId, revision = 0) @@ -47,6 +57,9 @@ class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(impl override def deleteObject(persistenceId: String, revision: Long): CompletionStage[Done] = scalaStore.deleteObject(persistenceId, revision).toJava + override def deleteObject(persistenceId: String, revision: Long, changeEvent: Any): CompletionStage[Done] = + scalaStore.deleteObject(persistenceId, revision, changeEvent).toJava + override def currentChangesBySlices( entityType: String, minSlice: Int, diff --git a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala index e70b2d12..7980a950 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala @@ -4,6 +4,8 @@ package akka.persistence.r2dbc.state.scaladsl +import java.util.UUID + import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -31,7 +33,7 @@ import akka.persistence.r2dbc.internal.DurableStateDao.SerializedStateRow import akka.persistence.r2dbc.internal.InstantFactory import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow -import akka.persistence.state.scaladsl.DurableStateUpdateStore +import akka.persistence.state.scaladsl.DurableStateUpdateWithChangeEventStore import akka.persistence.state.scaladsl.GetObjectResult import akka.persistence.typed.PersistenceId import akka.serialization.SerializationExtension @@ -51,7 +53,7 @@ object R2dbcDurableStateStore { } class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfgPath: String) - extends DurableStateUpdateStore[A] + extends DurableStateUpdateWithChangeEventStore[A] with DurableStateStoreBySliceQuery[A] with DurableStateStorePagedPersistenceIdsQuery[A] { import R2dbcDurableStateStore.PersistenceIdsQueryState @@ -69,6 +71,7 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg settings, ConnectionFactoryProvider(typedSystem) .connectionFactoryFor(sharedConfigPath + ".connection-factory"))(typedSystem) + private val changeEventWriterUuid = UUID.randomUUID().toString private val bySlice: BySliceQuery[SerializedStateRow, DurableStateChange[A]] = { val createEnvelope: (TimestampOffset, SerializedStateRow) => DurableStateChange[A] = (offset, row) => { @@ -117,7 +120,7 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg * disabled with configuration `assert-single-writer`. */ override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = - upsertObject(persistenceId, revision, value, tag, changeEvent = None) + internalUpsertObject(persistenceId, revision, value, tag, changeEvent = None) /** * Insert the value if `revision` is 1, which will fail with `IllegalStateException` if there is already a stored @@ -125,16 +128,23 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg * the existing stored `revision` + 1 isn't equal to the given `revision`. This optimistic locking check can be * disabled with configuration `assert-single-writer`. * - * The `changeEvent`, if defined, is written to the event journal in the same transaction as the DurableState upsert. - * Same `persistenceId` is used in the journal and the `revision` is used as `sequenceNr`. + * The `changeEvent` is written to the event journal in the same transaction as the DurableState upsert. Same + * `persistenceId` is used in the journal and the `revision` is used as `sequenceNr`. */ - def upsertObject( + override def upsertObject( + persistenceId: String, + revision: Long, + value: A, + tag: String, + changeEvent: Any): Future[Done] = + internalUpsertObject(persistenceId, revision, value, tag, changeEvent = Some(changeEvent)) + + private def internalUpsertObject( persistenceId: String, revision: Long, value: A, tag: String, changeEvent: Option[Any]): Future[Done] = { - // FIXME add new trait in Akka for this method. Maybe we need it for the deletes too. val valueAnyRef = value.asInstanceOf[AnyRef] val serialized = serialization.serialize(valueAnyRef).get @@ -151,44 +161,44 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg manifest, if (tag.isEmpty) Set.empty else Set(tag)) - val serializedChangedEvent: Option[SerializedJournalRow] = { - changeEvent.map { event => - val eventAnyRef = event.asInstanceOf[AnyRef] - val serializedEvent = eventAnyRef match { - case s: SerializedEvent => s // already serialized - case _ => - val bytes = serialization.serialize(eventAnyRef).get - val serializer = serialization.findSerializerFor(eventAnyRef) - val manifest = Serializers.manifestFor(serializer, eventAnyRef) - new SerializedEvent(bytes, serializer.identifier, manifest) - } - - val entityType = PersistenceId.extractEntityType(persistenceId) - val slice = persistenceExt.sliceForPersistenceId(persistenceId) - val timestamp = if (journalSettings.useAppTimestamp) InstantFactory.now() else JournalDao.EmptyDbTimestamp - - SerializedJournalRow( - slice, - entityType, - persistenceId, - revision, - timestamp, - JournalDao.EmptyDbTimestamp, - Some(serializedEvent.bytes), - serializedEvent.serializerId, - serializedEvent.serializerManifest, - "", // FIXME writerUuid, or shall we make one? - if (tag.isEmpty) Set.empty else Set(tag), - metadata = None) - } - } - - stateDao.upsertState(serializedRow, value, serializedChangedEvent) + stateDao.upsertState(serializedRow, value, serializedChangeEvent(persistenceId, revision, tag, changeEvent)) // FIXME PubSub, but not via PersistentRepr } + private def serializedChangeEvent(persistenceId: String, revision: Long, tag: String, changeEvent: Option[Any]) = { + changeEvent.map { event => + val eventAnyRef = event.asInstanceOf[AnyRef] + val serializedEvent = eventAnyRef match { + case s: SerializedEvent => s // already serialized + case _ => + val bytes = serialization.serialize(eventAnyRef).get + val serializer = serialization.findSerializerFor(eventAnyRef) + val manifest = Serializers.manifestFor(serializer, eventAnyRef) + new SerializedEvent(bytes, serializer.identifier, manifest) + } + + val entityType = PersistenceId.extractEntityType(persistenceId) + val slice = persistenceExt.sliceForPersistenceId(persistenceId) + val timestamp = if (journalSettings.useAppTimestamp) InstantFactory.now() else JournalDao.EmptyDbTimestamp + + SerializedJournalRow( + slice, + entityType, + persistenceId, + revision, + timestamp, + JournalDao.EmptyDbTimestamp, + Some(serializedEvent.bytes), + serializedEvent.serializerId, + serializedEvent.serializerManifest, + changeEventWriterUuid, + if (tag.isEmpty) Set.empty else Set(tag), + metadata = None) + } + } + @deprecated(message = "Use the deleteObject overload with revision instead.", since = "1.0.0") override def deleteObject(persistenceId: String): Future[Done] = deleteObject(persistenceId, revision = 0) @@ -202,9 +212,26 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg * If the given revision is `0` it will fully delete the value and revision from the database without any optimistic * locking check. Next call to [[getObject]] will then return revision 0 and no value. */ - override def deleteObject(persistenceId: String, revision: Long): Future[Done] = { - stateDao.deleteState(persistenceId, revision) - } + override def deleteObject(persistenceId: String, revision: Long): Future[Done] = + internalDeleteObject(persistenceId, revision, changeEvent = None) + + /** + * Delete the value, which will fail with `IllegalStateException` if the existing stored `revision` + 1 isn't equal to + * the given `revision`. This optimistic locking check can be disabled with configuration `assert-single-writer`. The + * stored revision for the persistenceId is updated and next call to [[getObject]] will return the revision, but with + * no value. + * + * If the given revision is `0` it will fully delete the value and revision from the database without any optimistic + * locking check. Next call to [[getObject]] will then return revision 0 and no value. + * + * The `changeEvent` is written to the event journal in the same transaction as the DurableState upsert. Same + * `persistenceId` is used in the journal and the `revision` is used as `sequenceNr`. + */ + override def deleteObject(persistenceId: String, revision: Long, changeEvent: Any): Future[Done] = + internalDeleteObject(persistenceId, revision, changeEvent = Some(changeEvent)) + + private def internalDeleteObject(persistenceId: String, revision: Long, changeEvent: Option[Any]): Future[Done] = + stateDao.deleteState(persistenceId, revision, serializedChangeEvent(persistenceId, revision, tag = "", changeEvent)) override def sliceForPersistenceId(persistenceId: String): Int = persistenceExt.sliceForPersistenceId(persistenceId) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 011f4338..a4df00f6 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -9,7 +9,7 @@ object Dependencies { val Scala3 = "3.3.1" val Scala2Versions = Seq(Scala213) val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3 - val AkkaVersion = System.getProperty("override.akka.version", "2.9.0") + val AkkaVersion = System.getProperty("override.akka.version", "2.9.1-M1+13-fde4109f-SNAPSHOT") val AkkaVersionInDocs = AkkaVersion.take(3) val AkkaPersistenceJdbcVersion = "5.2.0" // only in migration tool tests val AkkaProjectionVersionInDocs = "current"