Skip to content

Commit

Permalink
impl DurableStateUpdateWithChangeEventStore trait
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Dec 12, 2023
1 parent e67c6fd commit 5e30ce8
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf
*/
def deleteState(persistenceId: String, resetRevisionNumber: Boolean): Future[Done] = {
if (resetRevisionNumber)
stateDao.deleteState(persistenceId, revision = 0L) // hard delete without revision check
stateDao.deleteState(persistenceId, revision = 0L, changeEvent = None) // hard delete without revision check
else {
stateDao.readState(persistenceId).flatMap {
case None => Future.successful(Done) // already deleted
case Some(s) => stateDao.deleteState(persistenceId, s.revision + 1)
case Some(s) => stateDao.deleteState(persistenceId, s.revision + 1, changeEvent = None)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private[r2dbc] trait DurableStateDao extends BySliceQuery.Dao[DurableStateDao.Se

def upsertState(state: SerializedStateRow, value: Any, changeEvent: Option[SerializedJournalRow]): Future[Done]

def deleteState(persistenceId: String, revision: Long): Future[Done]
def deleteState(persistenceId: String, revision: Long, changeEvent: Option[SerializedJournalRow]): Future[Done]

def persistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,10 @@ private[r2dbc] class PostgresDurableStateDao(
}
}

override def deleteState(persistenceId: String, revision: Long): Future[Done] = {
override def deleteState(
persistenceId: String,
revision: Long,
changeEvent: Option[SerializedJournalRow]): Future[Done] = {
if (revision == 0) {
hardDeleteState(persistenceId)
} else {
Expand Down Expand Up @@ -501,14 +504,11 @@ private[r2dbc] class PostgresDurableStateDao(
s"Insert delete marker with revision 1 failed: durable state for persistence id [$persistenceId] already exists"))
}

val changeHandler = changeHandlers.get(entityType)
val changeHandlerHint = changeHandler.map(_ => " with change handler").getOrElse("")

r2dbcExecutor.withConnection(s"insert delete marker [$persistenceId]$changeHandlerHint") { connection =>
r2dbcExecutor.withConnection(s"insert delete marker [$persistenceId]") { connection =>
for {
updatedRows <- recoverDataIntegrityViolation(
R2dbcExecutor.updateOneInTx(insertDeleteMarkerStatement(connection)))
_ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent = None)
_ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent)
} yield updatedRows
}

Expand Down Expand Up @@ -546,13 +546,10 @@ private[r2dbc] class PostgresDurableStateDao(
}
}

val changeHandler = changeHandlers.get(entityType)
val changeHandlerHint = changeHandler.map(_ => " with change handler").getOrElse("")

r2dbcExecutor.withConnection(s"delete [$persistenceId]$changeHandlerHint") { connection =>
r2dbcExecutor.withConnection(s"delete [$persistenceId]") { connection =>
for {
updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection))
_ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent = None)
_ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent)
} yield updatedRows
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ package akka.persistence.r2dbc.state.javadsl
import java.util
import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.concurrent.ExecutionContext

import akka.Done
import akka.NotUsed
import akka.japi.Pair
Expand All @@ -16,18 +18,18 @@ import akka.persistence.query.Offset
import akka.persistence.query.javadsl.DurableStateStorePagedPersistenceIdsQuery
import akka.persistence.query.typed.javadsl.DurableStateStoreBySliceQuery
import akka.persistence.r2dbc.state.scaladsl.{ R2dbcDurableStateStore => ScalaR2dbcDurableStateStore }
import akka.persistence.state.javadsl.DurableStateUpdateStore
import akka.persistence.state.javadsl.GetObjectResult
import akka.stream.javadsl.Source

import scala.compat.java8.FutureConverters.FutureOps

import akka.persistence.state.javadsl.DurableStateUpdateWithChangeEventStore

object R2dbcDurableStateStore {
val Identifier: String = ScalaR2dbcDurableStateStore.Identifier
}

class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(implicit ec: ExecutionContext)
extends DurableStateUpdateStore[A]
extends DurableStateUpdateWithChangeEventStore[A]
with DurableStateStoreBySliceQuery[A]
with DurableStateStorePagedPersistenceIdsQuery[A] {

Expand All @@ -40,13 +42,24 @@ class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(impl
override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): CompletionStage[Done] =
scalaStore.upsertObject(persistenceId, revision, value, tag).toJava

override def upsertObject(
persistenceId: String,
revision: Long,
value: A,
tag: String,
changeEvent: Any): CompletionStage[Done] =
scalaStore.upsertObject(persistenceId, revision, value, tag, changeEvent).toJava

@deprecated(message = "Use the deleteObject overload with revision instead.", since = "1.0.0")
override def deleteObject(persistenceId: String): CompletionStage[Done] =
deleteObject(persistenceId, revision = 0)

override def deleteObject(persistenceId: String, revision: Long): CompletionStage[Done] =
scalaStore.deleteObject(persistenceId, revision).toJava

override def deleteObject(persistenceId: String, revision: Long, changeEvent: Any): CompletionStage[Done] =
scalaStore.deleteObject(persistenceId, revision, changeEvent).toJava

override def currentChangesBySlices(
entityType: String,
minSlice: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package akka.persistence.r2dbc.state.scaladsl

import java.util.UUID

import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
Expand Down Expand Up @@ -31,7 +33,7 @@ import akka.persistence.r2dbc.internal.DurableStateDao.SerializedStateRow
import akka.persistence.r2dbc.internal.InstantFactory
import akka.persistence.r2dbc.internal.JournalDao
import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow
import akka.persistence.state.scaladsl.DurableStateUpdateStore
import akka.persistence.state.scaladsl.DurableStateUpdateWithChangeEventStore
import akka.persistence.state.scaladsl.GetObjectResult
import akka.persistence.typed.PersistenceId
import akka.serialization.SerializationExtension
Expand All @@ -51,7 +53,7 @@ object R2dbcDurableStateStore {
}

class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfgPath: String)
extends DurableStateUpdateStore[A]
extends DurableStateUpdateWithChangeEventStore[A]
with DurableStateStoreBySliceQuery[A]
with DurableStateStorePagedPersistenceIdsQuery[A] {
import R2dbcDurableStateStore.PersistenceIdsQueryState
Expand All @@ -69,6 +71,7 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg
settings,
ConnectionFactoryProvider(typedSystem)
.connectionFactoryFor(sharedConfigPath + ".connection-factory"))(typedSystem)
private val changeEventWriterUuid = UUID.randomUUID().toString

private val bySlice: BySliceQuery[SerializedStateRow, DurableStateChange[A]] = {
val createEnvelope: (TimestampOffset, SerializedStateRow) => DurableStateChange[A] = (offset, row) => {
Expand Down Expand Up @@ -117,24 +120,31 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg
* disabled with configuration `assert-single-writer`.
*/
override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] =
upsertObject(persistenceId, revision, value, tag, changeEvent = None)
internalUpsertObject(persistenceId, revision, value, tag, changeEvent = None)

/**
* Insert the value if `revision` is 1, which will fail with `IllegalStateException` if there is already a stored
* value for the given `persistenceId`. Otherwise update the value, which will fail with `IllegalStateException` if
* the existing stored `revision` + 1 isn't equal to the given `revision`. This optimistic locking check can be
* disabled with configuration `assert-single-writer`.
*
* The `changeEvent`, if defined, is written to the event journal in the same transaction as the DurableState upsert.
* Same `persistenceId` is used in the journal and the `revision` is used as `sequenceNr`.
* The `changeEvent` is written to the event journal in the same transaction as the DurableState upsert. Same
* `persistenceId` is used in the journal and the `revision` is used as `sequenceNr`.
*/
def upsertObject(
override def upsertObject(
persistenceId: String,
revision: Long,
value: A,
tag: String,
changeEvent: Any): Future[Done] =
internalUpsertObject(persistenceId, revision, value, tag, changeEvent = Some(changeEvent))

private def internalUpsertObject(
persistenceId: String,
revision: Long,
value: A,
tag: String,
changeEvent: Option[Any]): Future[Done] = {
// FIXME add new trait in Akka for this method. Maybe we need it for the deletes too.

val valueAnyRef = value.asInstanceOf[AnyRef]
val serialized = serialization.serialize(valueAnyRef).get
Expand All @@ -151,44 +161,44 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg
manifest,
if (tag.isEmpty) Set.empty else Set(tag))

val serializedChangedEvent: Option[SerializedJournalRow] = {
changeEvent.map { event =>
val eventAnyRef = event.asInstanceOf[AnyRef]
val serializedEvent = eventAnyRef match {
case s: SerializedEvent => s // already serialized
case _ =>
val bytes = serialization.serialize(eventAnyRef).get
val serializer = serialization.findSerializerFor(eventAnyRef)
val manifest = Serializers.manifestFor(serializer, eventAnyRef)
new SerializedEvent(bytes, serializer.identifier, manifest)
}

val entityType = PersistenceId.extractEntityType(persistenceId)
val slice = persistenceExt.sliceForPersistenceId(persistenceId)
val timestamp = if (journalSettings.useAppTimestamp) InstantFactory.now() else JournalDao.EmptyDbTimestamp

SerializedJournalRow(
slice,
entityType,
persistenceId,
revision,
timestamp,
JournalDao.EmptyDbTimestamp,
Some(serializedEvent.bytes),
serializedEvent.serializerId,
serializedEvent.serializerManifest,
"", // FIXME writerUuid, or shall we make one?
if (tag.isEmpty) Set.empty else Set(tag),
metadata = None)
}
}

stateDao.upsertState(serializedRow, value, serializedChangedEvent)
stateDao.upsertState(serializedRow, value, serializedChangeEvent(persistenceId, revision, tag, changeEvent))

// FIXME PubSub, but not via PersistentRepr

}

private def serializedChangeEvent(persistenceId: String, revision: Long, tag: String, changeEvent: Option[Any]) = {
changeEvent.map { event =>
val eventAnyRef = event.asInstanceOf[AnyRef]
val serializedEvent = eventAnyRef match {
case s: SerializedEvent => s // already serialized
case _ =>
val bytes = serialization.serialize(eventAnyRef).get
val serializer = serialization.findSerializerFor(eventAnyRef)
val manifest = Serializers.manifestFor(serializer, eventAnyRef)
new SerializedEvent(bytes, serializer.identifier, manifest)
}

val entityType = PersistenceId.extractEntityType(persistenceId)
val slice = persistenceExt.sliceForPersistenceId(persistenceId)
val timestamp = if (journalSettings.useAppTimestamp) InstantFactory.now() else JournalDao.EmptyDbTimestamp

SerializedJournalRow(
slice,
entityType,
persistenceId,
revision,
timestamp,
JournalDao.EmptyDbTimestamp,
Some(serializedEvent.bytes),
serializedEvent.serializerId,
serializedEvent.serializerManifest,
changeEventWriterUuid,
if (tag.isEmpty) Set.empty else Set(tag),
metadata = None)
}
}

@deprecated(message = "Use the deleteObject overload with revision instead.", since = "1.0.0")
override def deleteObject(persistenceId: String): Future[Done] =
deleteObject(persistenceId, revision = 0)
Expand All @@ -202,9 +212,26 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg
* If the given revision is `0` it will fully delete the value and revision from the database without any optimistic
* locking check. Next call to [[getObject]] will then return revision 0 and no value.
*/
override def deleteObject(persistenceId: String, revision: Long): Future[Done] = {
stateDao.deleteState(persistenceId, revision)
}
override def deleteObject(persistenceId: String, revision: Long): Future[Done] =
internalDeleteObject(persistenceId, revision, changeEvent = None)

/**
* Delete the value, which will fail with `IllegalStateException` if the existing stored `revision` + 1 isn't equal to
* the given `revision`. This optimistic locking check can be disabled with configuration `assert-single-writer`. The
* stored revision for the persistenceId is updated and next call to [[getObject]] will return the revision, but with
* no value.
*
* If the given revision is `0` it will fully delete the value and revision from the database without any optimistic
* locking check. Next call to [[getObject]] will then return revision 0 and no value.
*
* The `changeEvent` is written to the event journal in the same transaction as the DurableState upsert. Same
* `persistenceId` is used in the journal and the `revision` is used as `sequenceNr`.
*/
override def deleteObject(persistenceId: String, revision: Long, changeEvent: Any): Future[Done] =
internalDeleteObject(persistenceId, revision, changeEvent = Some(changeEvent))

private def internalDeleteObject(persistenceId: String, revision: Long, changeEvent: Option[Any]): Future[Done] =
stateDao.deleteState(persistenceId, revision, serializedChangeEvent(persistenceId, revision, tag = "", changeEvent))

override def sliceForPersistenceId(persistenceId: String): Int =
persistenceExt.sliceForPersistenceId(persistenceId)
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ object Dependencies {
val Scala3 = "3.3.1"
val Scala2Versions = Seq(Scala213)
val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3
val AkkaVersion = System.getProperty("override.akka.version", "2.9.0")
val AkkaVersion = System.getProperty("override.akka.version", "2.9.1-M1+13-fde4109f-SNAPSHOT")
val AkkaVersionInDocs = AkkaVersion.take(3)
val AkkaPersistenceJdbcVersion = "5.2.0" // only in migration tool tests
val AkkaProjectionVersionInDocs = "current"
Expand Down

0 comments on commit 5e30ce8

Please sign in to comment.