From 405c448520b89dc8e1b19c65c1e6117f80b31181 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Tue, 12 Dec 2023 16:01:20 +0100
Subject: [PATCH] test and publish

---
 .../scaladsl/DurableStateCleanup.scala        |  13 +-
 .../r2dbc/internal/DurableStateDao.scala      |  10 +-
 .../persistence/r2dbc/internal/PubSub.scala   |  66 +++---
 .../postgres/PostgresDurableStateDao.scala    |  79 +++++---
 .../scaladsl/R2dbcDurableStateStore.scala     |  64 +++++-
 ...eStateUpdateWithChangeEventStoreSpec.scala | 188 ++++++++++++++++++
 6 files changed, 360 insertions(+), 60 deletions(-)
 create mode 100644 core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala

diff --git a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala
index b5fd7dcf..ab39e4e6 100644
--- a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala
+++ b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala
@@ -15,6 +15,7 @@ import akka.actor.typed.ActorSystem
 import akka.actor.typed.scaladsl.LoggerOps
 import akka.annotation.ApiMayChange
 import akka.annotation.InternalApi
+import akka.dispatch.ExecutionContexts
 import akka.persistence.r2dbc.ConnectionFactoryProvider
 import akka.persistence.r2dbc.R2dbcSettings
 import akka.persistence.r2dbc.internal.DurableStateDao
@@ -65,11 +66,17 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf
    */
   def deleteState(persistenceId: String, resetRevisionNumber: Boolean): Future[Done] = {
     if (resetRevisionNumber)
-      stateDao.deleteState(persistenceId, revision = 0L, changeEvent = None) // hard delete without revision check
+      stateDao
+        .deleteState(persistenceId, revision = 0L, changeEvent = None) // hard delete without revision check
+        .map(_ => Done)(ExecutionContexts.parasitic)
     else {
       stateDao.readState(persistenceId).flatMap {
-        case None    => Future.successful(Done) // already deleted
-        case Some(s) => stateDao.deleteState(persistenceId, s.revision + 1, changeEvent = None)
+        case None =>
+          Future.successful(Done) // already deleted
+        case Some(s) =>
+          stateDao
+            .deleteState(persistenceId, s.revision + 1, changeEvent = None)
+            .map(_ => Done)(ExecutionContexts.parasitic)
       }
     }
   }
diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala
index 0b6f4892..e09eb1c6 100644
--- a/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala
+++ b/core/src/main/scala/akka/persistence/r2dbc/internal/DurableStateDao.scala
@@ -50,9 +50,15 @@ private[r2dbc] trait DurableStateDao extends BySliceQuery.Dao[DurableStateDao.Se
 
   def readState(persistenceId: String): Future[Option[SerializedStateRow]]
 
-  def upsertState(state: SerializedStateRow, value: Any, changeEvent: Option[SerializedJournalRow]): Future[Done]
+  def upsertState(
+      state: SerializedStateRow,
+      value: Any,
+      changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]]
 
-  def deleteState(persistenceId: String, revision: Long, changeEvent: Option[SerializedJournalRow]): Future[Done]
+  def deleteState(
+      persistenceId: String,
+      revision: Long,
+      changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]]
 
   def persistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed]
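The DAO contract above is the heart of the patch: upsertState and deleteState now return the change event's database timestamp instead of a bare Done. A minimal caller-side sketch of what that buys, assuming only the signatures shown; onUpsertResult is a hypothetical helper, not part of this patch:

import java.time.Instant
import scala.concurrent.{ ExecutionContext, Future }

// Some(timestamp): a change event was written in the same transaction and can
// be published with an exact timestamp-based offset. None: nothing to publish.
def onUpsertResult(result: Future[Option[Instant]])(implicit ec: ExecutionContext): Future[Unit] =
  result.map {
    case Some(timestamp) => println(s"change event stored at [$timestamp]")
    case None            => () // no change event was written
  }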
diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/PubSub.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/PubSub.scala
index 7c3e5fc1..a5f1c85e 100644
--- a/core/src/main/scala/akka/persistence/r2dbc/internal/PubSub.scala
+++ b/core/src/main/scala/akka/persistence/r2dbc/internal/PubSub.scala
@@ -91,29 +91,7 @@ import org.slf4j.LoggerFactory
   }
 
   def publish(pr: PersistentRepr, timestamp: Instant): Unit = {
-
-    val n = throughputCounter.incrementAndGet()
-    if (n % throughputSampler == 0) {
-      val ewma = throughput
-      val durationMillis = (System.nanoTime() - ewma.nanoTime) / 1000 / 1000
-      if (durationMillis >= throughputCollectIntervalMillis) {
-        // doesn't have to be exact so "missed" or duplicate concurrent calls don't matter
-        throughputCounter.set(0L)
-        val rps = n * 1000.0 / durationMillis
-        val newEwma = ewma :+ rps
-        throughput = newEwma
-        if (ewma.value < throughputThreshold && newEwma.value >= throughputThreshold) {
-          log.info("Disabled publishing of events. Throughput greater than [{}] events/s", throughputThreshold)
-        } else if (ewma.value >= throughputThreshold && newEwma.value < throughputThreshold) {
-          log.info("Enabled publishing of events. Throughput less than [{}] events/s", throughputThreshold)
-        } else {
-          log.debug(
-            "Publishing of events is {}. Throughput is [{}] events/s",
-            if (newEwma.value < throughputThreshold) "enabled" else "disabled",
-            newEwma.value)
-        }
-      }
-    }
+    updateThroughput()
 
     if (throughput.value < throughputThreshold) {
       val pid = pr.persistenceId
@@ -143,7 +121,47 @@ import org.slf4j.LoggerFactory
         filtered,
         source = EnvelopeOrigin.SourcePubSub,
         tags)
-      eventTopic(entityType, slice) ! Topic.Publish(envelope)
+
+      publishToTopic(envelope)
     }
   }
+
+  def publish(envelope: EventEnvelope[Any]): Unit = {
+    updateThroughput()
+
+    if (throughput.value < throughputThreshold)
+      publishToTopic(envelope)
+  }
+
+  private def publishToTopic(envelope: EventEnvelope[Any]): Unit = {
+    val entityType = PersistenceId.extractEntityType(envelope.persistenceId)
+    val slice = persistenceExt.sliceForPersistenceId(envelope.persistenceId)
+
+    eventTopic(entityType, slice) ! Topic.Publish(envelope)
+  }
+
+  private def updateThroughput(): Unit = {
+    val n = throughputCounter.incrementAndGet()
+    if (n % throughputSampler == 0) {
+      val ewma = throughput
+      val durationMillis = (System.nanoTime() - ewma.nanoTime) / 1000 / 1000
+      if (durationMillis >= throughputCollectIntervalMillis) {
+        // doesn't have to be exact so "missed" or duplicate concurrent calls don't matter
+        throughputCounter.set(0L)
+        val rps = n * 1000.0 / durationMillis
+        val newEwma = ewma :+ rps
+        throughput = newEwma
+        if (ewma.value < throughputThreshold && newEwma.value >= throughputThreshold) {
+          log.info("Disabled publishing of events. Throughput greater than [{}] events/s", throughputThreshold)
+        } else if (ewma.value >= throughputThreshold && newEwma.value < throughputThreshold) {
+          log.info("Enabled publishing of events. Throughput less than [{}] events/s", throughputThreshold)
+        } else {
+          log.debug(
+            "Publishing of events is {}. Throughput is [{}] events/s",
+            if (newEwma.value < throughputThreshold) "enabled" else "disabled",
+            newEwma.value)
+        }
+      }
+    }
+  }
 }
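The extracted updateThroughput() keeps the existing back-pressure heuristic so both publish overloads can share it: sample the publish rate, smooth it with an exponentially weighted moving average, and publish only while the smoothed rate stays below a threshold. A self-contained sketch of that idea; PublishGate and all constants here are made up for illustration, not the library's API:

import java.util.concurrent.atomic.AtomicLong

final class PublishGate(threshold: Double, sampleEvery: Int = 32, alpha: Double = 0.3) {
  private val counter = new AtomicLong
  @volatile private var ewma = 0.0
  @volatile private var lastNanos = System.nanoTime()

  def shouldPublish(): Boolean = {
    val n = counter.incrementAndGet()
    if (n % sampleEvery == 0) {
      val now = System.nanoTime()
      val seconds = (now - lastNanos) / 1e9
      if (seconds > 0) {
        // doesn't have to be exact, "missed" concurrent samples are acceptable
        counter.set(0L)
        lastNanos = now
        ewma = alpha * (n / seconds) + (1 - alpha) * ewma
      }
    }
    ewma < threshold // publish only while the observed rate stays low
  }
}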
diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala
index 1131a5da..10fe105e 100644
--- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala
+++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala
@@ -67,6 +67,7 @@ private[r2dbc] object PostgresDurableStateDao {
       binding: AdditionalColumn.Binding[_])
 
   val FutureDone: Future[Done] = Future.successful(Done)
+  val FutureInstantNone: Future[Option[Instant]] = Future.successful(None)
 }
 
 /**
@@ -302,20 +303,22 @@ private[r2dbc] class PostgresDurableStateDao(
       updatedRows: Long,
       entityType: String,
       change: DurableStateChange[Any],
-      changeEvent: Option[SerializedJournalRow]): Future[Done] = {
+      changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]] = {
     if (updatedRows == 1)
       for {
-        _ <- changeEvent.map(journalDao.writeEventInTx(_, connection)).getOrElse(FutureDone)
+        changeEventTimestamp <- changeEvent
+          .map(journalDao.writeEventInTx(_, connection).map(Some(_)))
+          .getOrElse(FutureInstantNone)
         _ <- changeHandlers.get(entityType).map(processChange(_, connection, change)).getOrElse(FutureDone)
-      } yield Done
+      } yield changeEventTimestamp
     else
-      FutureDone
+      FutureInstantNone
   }
 
   override def upsertState(
       state: SerializedStateRow,
       value: Any,
-      changeEvent: Option[SerializedJournalRow]): Future[Done] = {
+      changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]] = {
     require(state.revision > 0)
 
     def bindTags(stmt: Statement, i: Int): Statement = {
@@ -349,7 +352,7 @@ private[r2dbc] class PostgresDurableStateDao(
 
     val entityType = PersistenceId.extractEntityType(state.persistenceId)
 
-    val result = {
+    val result: Future[(Long, Option[Instant])] = {
       val additionalBindings = additionalColumns.get(entityType) match {
         case None => Vector.empty[EvaluatedAdditionalColumnBindings]
         case Some(columns) =>
@@ -382,14 +385,21 @@ private[r2dbc] class PostgresDurableStateDao(
               s"Insert failed: durable state for persistence id [${state.persistenceId}] already exists"))
       }
 
-      if (!changeHandlers.contains(entityType) && changeEvent.isEmpty)
-        recoverDataIntegrityViolation(r2dbcExecutor.updateOne(s"insert [${state.persistenceId}]")(insertStatement))
-      else
+      if (!changeHandlers.contains(entityType) && changeEvent.isEmpty) {
+        val updatedRows = recoverDataIntegrityViolation(
+          r2dbcExecutor.updateOne(s"insert [${state.persistenceId}]")(insertStatement))
+        updatedRows.map(_ -> None)
+      } else
         r2dbcExecutor.withConnection(s"insert [${state.persistenceId}]") { connection =>
           for {
             updatedRows <- recoverDataIntegrityViolation(R2dbcExecutor.updateOneInTx(insertStatement(connection)))
-            _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent = None)
-          } yield updatedRows
+            changeEventTimestamp <- writeChangeEventAndCallChangeHander(
+              connection,
+              updatedRows,
+              entityType,
+              change,
+              changeEvent)
+          } yield (updatedRows, changeEventTimestamp)
         }
     } else {
       val previousRevision = state.revision - 1
@@ -425,25 +435,31 @@ private[r2dbc] class PostgresDurableStateDao(
         }
       }
 
-      if (!changeHandlers.contains(entityType) && changeEvent.isEmpty)
-        r2dbcExecutor.updateOne(s"update [${state.persistenceId}]")(updateStatement)
-      else
+      if (!changeHandlers.contains(entityType) && changeEvent.isEmpty) {
+        val updatedRows = r2dbcExecutor.updateOne(s"update [${state.persistenceId}]")(updateStatement)
+        updatedRows.map(_ -> None)
+      } else
         r2dbcExecutor.withConnection(s"update [${state.persistenceId}]") { connection =>
           for {
             updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection))
-            _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent = None)
-          } yield updatedRows
+            changeEventTimestamp <- writeChangeEventAndCallChangeHander(
+              connection,
+              updatedRows,
+              entityType,
+              change,
+              changeEvent)
+          } yield (updatedRows, changeEventTimestamp)
         }
     }
 
-    result.map { updatedRows =>
+    result.map { case (updatedRows, changeEventTimestamp) =>
       if (updatedRows != 1)
         throw new IllegalStateException(
           s"Update failed: durable state for persistence id [${state.persistenceId}] could not be updated to revision [${state.revision}]")
       else {
         log.debug("Updated durable state for persistenceId [{}] to revision [{}]", state.persistenceId, state.revision)
-        Done
+        changeEventTimestamp
       }
     }
   }
@@ -472,11 +488,12 @@ private[r2dbc] class PostgresDurableStateDao(
   override def deleteState(
       persistenceId: String,
       revision: Long,
-      changeEvent: Option[SerializedJournalRow]): Future[Done] = {
+      changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]] = {
     if (revision == 0) {
       hardDeleteState(persistenceId)
+        .map(_ => None)(ExecutionContexts.parasitic)
     } else {
-      val result = {
+      val result: Future[(Long, Option[Instant])] = {
         val entityType = PersistenceId.extractEntityType(persistenceId)
         def change =
           new DeletedDurableState[Any](persistenceId, revision, NoOffset, EmptyDbTimestamp.toEpochMilli)
@@ -508,8 +525,13 @@ private[r2dbc] class PostgresDurableStateDao(
             for {
               updatedRows <- recoverDataIntegrityViolation(
                 R2dbcExecutor.updateOneInTx(insertDeleteMarkerStatement(connection)))
-              _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent)
-            } yield updatedRows
+              changeEventTimestamp <- writeChangeEventAndCallChangeHander(
+                connection,
+                updatedRows,
+                entityType,
+                change,
+                changeEvent)
+            } yield (updatedRows, changeEventTimestamp)
           }
 
         } else {
@@ -549,19 +571,24 @@ private[r2dbc] class PostgresDurableStateDao(
           r2dbcExecutor.withConnection(s"delete [$persistenceId]") { connection =>
             for {
               updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection))
-              _ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent)
-            } yield updatedRows
+              changeEventTimestamp <- writeChangeEventAndCallChangeHander(
+                connection,
+                updatedRows,
+                entityType,
+                change,
+                changeEvent)
+            } yield (updatedRows, changeEventTimestamp)
           }
         }
       }
 
-      result.map { updatedRows =>
+      result.map { case (updatedRows, changeEventTimestamp) =>
         if (updatedRows != 1)
           throw new IllegalStateException(
             s"Delete failed: durable state for persistence id [$persistenceId] could not be updated to revision [$revision]")
         else {
           log.debug("Deleted durable state for persistenceId [{}] to revision [{}]", persistenceId, revision)
-          Done
+          changeEventTimestamp
         }
       }
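The recurring shape in the DAO changes above: the state update and the change-event insert run on one connection in one transaction, and the event's db_timestamp is surfaced only when exactly one row was affected. A condensed sketch of that pattern under assumed helper names (TxOps, updateStateInTx, writeChangeEventInTx are illustrative, not this DAO's API):

import java.time.Instant
import scala.concurrent.{ ExecutionContext, Future }

trait TxOps {
  def updateStateInTx(): Future[Long] // number of rows updated
  def writeChangeEventInTx(): Future[Instant] // db_timestamp of the inserted event row
}

def upsertWithChangeEvent(tx: TxOps, hasChangeEvent: Boolean)(
    implicit ec: ExecutionContext): Future[Option[Instant]] =
  for {
    updatedRows <- tx.updateStateInTx()
    timestamp <-
      if (updatedRows == 1 && hasChangeEvent) tx.writeChangeEventInTx().map(Some(_))
      else Future.successful(Option.empty[Instant])
  } yield timestamp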
diff --git a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
index 7980a950..be7656e5 100644
--- a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
+++ b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
@@ -4,6 +4,7 @@ package akka.persistence.r2dbc.state.scaladsl
 
+import java.time.Instant
 import java.util.UUID
 
 import scala.collection.immutable
@@ -23,6 +24,7 @@ import akka.persistence.query.Offset
 import akka.persistence.query.TimestampOffset
 import akka.persistence.query.UpdatedDurableState
 import akka.persistence.query.scaladsl.DurableStateStorePagedPersistenceIdsQuery
+import akka.persistence.query.typed.EventEnvelope
 import akka.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery
 import akka.persistence.r2dbc.ConnectionFactoryProvider
 import akka.persistence.r2dbc.R2dbcSettings
@@ -30,9 +32,11 @@ import akka.persistence.r2dbc.internal.BySliceQuery
 import akka.persistence.r2dbc.internal.ContinuousQuery
 import akka.persistence.r2dbc.internal.DurableStateDao
 import akka.persistence.r2dbc.internal.DurableStateDao.SerializedStateRow
+import akka.persistence.r2dbc.internal.EnvelopeOrigin
 import akka.persistence.r2dbc.internal.InstantFactory
 import akka.persistence.r2dbc.internal.JournalDao
 import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow
+import akka.persistence.r2dbc.internal.PubSub
 import akka.persistence.state.scaladsl.DurableStateUpdateWithChangeEventStore
 import akka.persistence.state.scaladsl.GetObjectResult
 import akka.persistence.typed.PersistenceId
@@ -73,6 +77,10 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg
       .connectionFactoryFor(sharedConfigPath + ".connection-factory"))(typedSystem)
   private val changeEventWriterUuid = UUID.randomUUID().toString
 
+  private val pubSub: Option[PubSub] =
+    if (journalSettings.journalPublishEvents) Some(PubSub(typedSystem))
+    else None
+
   private val bySlice: BySliceQuery[SerializedStateRow, DurableStateChange[A]] = {
     val createEnvelope: (TimestampOffset, SerializedStateRow) => DurableStateChange[A] = (offset, row) => {
       row.payload match {
@@ -151,6 +159,7 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg
     val serializer = serialization.findSerializerFor(valueAnyRef)
     val manifest = Serializers.manifestFor(serializer, valueAnyRef)
 
+    val tags = if (tag.isEmpty) Set.empty[String] else Set(tag)
     val serializedRow = SerializedStateRow(
       persistenceId,
       revision,
@@ -159,12 +168,47 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg
       Some(serialized),
       serializer.identifier,
       manifest,
-      if (tag.isEmpty) Set.empty else Set(tag))
+      tags)
+
+    val changeEventTimestamp =
+      stateDao.upsertState(serializedRow, value, serializedChangeEvent(persistenceId, revision, tag, changeEvent))
+
+    import typedSystem.executionContext
+    changeEventTimestamp.map { timestampOption =>
+      publish(persistenceId, revision, changeEvent, timestampOption, tags)
+      Done
+    }
+  }
 
-    stateDao.upsertState(serializedRow, value, serializedChangeEvent(persistenceId, revision, tag, changeEvent))
+  private def publish(
+      persistenceId: String,
+      revision: Long,
+      changeEvent: Option[Any],
+      changeEventTimestamp: Option[Instant],
+      tags: Set[String]): Unit = {
+    for {
+      timestamp <- changeEventTimestamp
+      event <- changeEvent
+      p <- pubSub
+    } yield {
+      val entityType = PersistenceId.extractEntityType(persistenceId)
+      val slice = persistenceExt.sliceForPersistenceId(persistenceId)
 
-    // FIXME PubSub, but not via PersistentRepr
+      val offset = TimestampOffset(timestamp, timestamp, Map(persistenceId -> revision))
+      val envelope = EventEnvelope(
+        offset,
+        persistenceId,
+        revision,
+        event,
+        timestamp.toEpochMilli,
+        entityType,
+        slice,
+        filtered = false,
+        source = EnvelopeOrigin.SourcePubSub,
+        tags)
+      p.publish(envelope)
+    }
   }
 
   private def serializedChangeEvent(persistenceId: String, revision: Long, tag: String, changeEvent: Option[Any]) = {
@@ -230,8 +274,18 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg
   override def deleteObject(persistenceId: String, revision: Long, changeEvent: Any): Future[Done] =
     internalDeleteObject(persistenceId, revision, changeEvent = Some(changeEvent))
 
-  private def internalDeleteObject(persistenceId: String, revision: Long, changeEvent: Option[Any]): Future[Done] =
-    stateDao.deleteState(persistenceId, revision, serializedChangeEvent(persistenceId, revision, tag = "", changeEvent))
+  private def internalDeleteObject(persistenceId: String, revision: Long, changeEvent: Option[Any]): Future[Done] = {
+    val changeEventTimestamp = stateDao.deleteState(
+      persistenceId,
+      revision,
+      serializedChangeEvent(persistenceId, revision, tag = "", changeEvent))
+
+    import typedSystem.executionContext
+    changeEventTimestamp.map { timestampOption =>
+      publish(persistenceId, revision, changeEvent, timestampOption, tags = Set.empty)
+      Done
+    }
+  }
 
   override def sliceForPersistenceId(persistenceId: String): Int =
     persistenceExt.sliceForPersistenceId(persistenceId)
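On the consumer side, envelopes delivered over the topic carry source = EnvelopeOrigin.SourcePubSub and a TimestampOffset built from the change event's database timestamp. A sketch of telling the two delivery paths apart; note that EnvelopeOrigin is an internal API, so this would only compile for code under the akka.persistence.r2dbc package and is shown for illustration only:

import akka.persistence.query.typed.EventEnvelope
import akka.persistence.r2dbc.internal.EnvelopeOrigin

def describe(env: EventEnvelope[String]): String =
  if (env.source == EnvelopeOrigin.SourcePubSub)
    s"low-latency pub-sub delivery of [${env.event}] at offset [${env.offset}]"
  else
    s"journal-query delivery of [${env.event}]"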
diff --git a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala
new file mode 100644
index 00000000..205c7f1b
--- /dev/null
+++ b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala
@@ -0,0 +1,188 @@
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc.
+ */
+
+package akka.persistence.r2dbc.state
+
+import akka.actor.testkit.typed.scaladsl.LogCapturing
+import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import akka.actor.typed.ActorSystem
+import akka.actor.typed.pubsub.Topic
+import akka.persistence.query.PersistenceQuery
+import akka.persistence.query.typed.EventEnvelope
+import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQuery
+import akka.persistence.r2dbc.TestConfig
+import akka.persistence.r2dbc.TestData
+import akka.persistence.r2dbc.TestDbLifecycle
+import akka.persistence.r2dbc.internal.EnvelopeOrigin
+import akka.persistence.r2dbc.internal.PubSub
+import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
+import akka.persistence.r2dbc.state.scaladsl.R2dbcDurableStateStore
+import akka.persistence.state.DurableStateStoreRegistry
+import akka.persistence.state.scaladsl.DurableStateUpdateWithChangeEventStore
+import akka.persistence.typed.PersistenceId
+import akka.stream.scaladsl.Sink
+import org.scalatest.wordspec.AnyWordSpecLike
+
+class DurableStateUpdateWithChangeEventStoreSpec
+    extends ScalaTestWithActorTestKit(TestConfig.config)
+    with AnyWordSpecLike
+    with TestDbLifecycle
+    with TestData
+    with LogCapturing {
+
+  override def typedSystem: ActorSystem[_] = system
+
+  private val store = DurableStateStoreRegistry(system)
+    .durableStateStoreFor[DurableStateUpdateWithChangeEventStore[String]](R2dbcDurableStateStore.Identifier)
+
+  private val journalQuery =
+    PersistenceQuery(system).readJournalFor[CurrentEventsByPersistenceIdTypedQuery](R2dbcReadJournal.Identifier)
+
+  private val tag = "TAG"
+
+  "The R2DBC durable state store" should {
+    "save additional change event" in {
+      val entityType = nextEntityType()
+      val persistenceId = PersistenceId(entityType, "my-persistenceId").id
+      val value1 = "Genuinely Collaborative"
+      val value2 = "Open to Feedback"
+
+      store.upsertObject(persistenceId, 1L, value1, tag, s"Changed to $value1").futureValue
+      store.upsertObject(persistenceId, 2L, value2, tag, s"Changed to $value2").futureValue
+      store.deleteObject(persistenceId, 3L, "Deleted").futureValue
+
+      val envelopes = journalQuery
+        .currentEventsByPersistenceIdTyped[String](persistenceId, 1L, Long.MaxValue)
+        .runWith(Sink.seq)
+        .futureValue
+
+      val env1 = envelopes.head
+      env1.event shouldBe s"Changed to $value1"
+      env1.sequenceNr shouldBe 1L
+      env1.tags shouldBe Set(tag)
+
+      val env2 = envelopes(1)
+      env2.event shouldBe s"Changed to $value2"
+      env2.sequenceNr shouldBe 2L
+
+      val env3 = envelopes(2)
+      env3.event shouldBe s"Deleted"
+      env3.sequenceNr shouldBe 3L
+    }
+
value2 = "Open to Feedback" + + store.upsertObject(persistenceId, 1L, value1, tag, s"Changed to $value1").futureValue + store.upsertObject(persistenceId, 2L, value2, tag, s"Changed to $value2").futureValue + store.deleteObject(persistenceId, 3L, "Deleted").futureValue + + val envelopes = journalQuery + .currentEventsByPersistenceIdTyped[String](persistenceId, 1L, Long.MaxValue) + .runWith(Sink.seq) + .futureValue + + val env1 = envelopes.head + env1.event shouldBe s"Changed to $value1" + env1.sequenceNr shouldBe 1L + env1.tags shouldBe Set(tag) + + val env2 = envelopes(1) + env2.event shouldBe s"Changed to $value2" + env2.sequenceNr shouldBe 2L + + val env3 = envelopes(2) + env3.event shouldBe s"Deleted" + env3.sequenceNr shouldBe 3L + } + + "detect and reject concurrent inserts, and not store change event" in { + val entityType = nextEntityType() + val persistenceId = PersistenceId(entityType, "id-to-be-inserted-concurrently").id + val value = "Genuinely Collaborative" + store.upsertObject(persistenceId, revision = 1L, value, tag, s"Changed to $value").futureValue + + val updatedValue = "Open to Feedback" + store + .upsertObject(persistenceId, revision = 1L, updatedValue, tag, s"Changed to $updatedValue") + .failed + .futureValue + + val envelopes = journalQuery + .currentEventsByPersistenceIdTyped[String](persistenceId, 1L, Long.MaxValue) + .runWith(Sink.seq) + .futureValue + envelopes.size shouldBe 1 + } + + "detect and reject concurrent updates, and not store change event" in { + if (!r2dbcSettings.durableStateAssertSingleWriter) + pending + + val entityType = nextEntityType() + val persistenceId = PersistenceId(entityType, "id-to-be-updated-concurrently").id + val value = "Genuinely Collaborative" + store.upsertObject(persistenceId, revision = 1L, value, tag, s"Changed to $value").futureValue + + val updatedValue = "Open to Feedback" + store.upsertObject(persistenceId, revision = 2L, updatedValue, tag, s"Changed to $updatedValue").futureValue + + // simulate an update by a different node that didn't see the first one: + val updatedValue2 = "Genuine and Sincere in all Communications" + store + .upsertObject(persistenceId, revision = 2L, updatedValue2, tag, s"Changed to $updatedValue2") + .failed + .futureValue + + val envelopes = journalQuery + .currentEventsByPersistenceIdTyped[String](persistenceId, 1L, Long.MaxValue) + .runWith(Sink.seq) + .futureValue + envelopes.size shouldBe 2 + } + + "detect and reject concurrent delete of revision 1, and not store change event" in { + val entityType = nextEntityType() + val persistenceId = PersistenceId(entityType, "id-to-be-deleted-concurrently").id + val value = "Genuinely Collaborative" + store.upsertObject(persistenceId, revision = 1L, value, tag, s"Changed to $value").futureValue + + store.deleteObject(persistenceId, revision = 1L, "Deleted").failed.futureValue + + val envelopes = journalQuery + .currentEventsByPersistenceIdTyped[String](persistenceId, 1L, Long.MaxValue) + .runWith(Sink.seq) + .futureValue + envelopes.size shouldBe 1 + } + + "detect and reject concurrent deletes, and not store change event" in { + if (!r2dbcSettings.durableStateAssertSingleWriter) + pending + + val entityType = nextEntityType() + val persistenceId = PersistenceId(entityType, "id-to-be-updated-concurrently").id + val value = "Genuinely Collaborative" + store.upsertObject(persistenceId, revision = 1L, value, tag, s"Changed to $value").futureValue + + val updatedValue = "Open to Feedback" + store.upsertObject(persistenceId, revision = 2L, updatedValue, tag, 
s"Changed to $updatedValue").futureValue + + // simulate a delete by a different node that didn't see the first one: + store.deleteObject(persistenceId, revision = 2L, "Deleted").failed.futureValue + + val envelopes = journalQuery + .currentEventsByPersistenceIdTyped[String](persistenceId, 1L, Long.MaxValue) + .runWith(Sink.seq) + .futureValue + envelopes.size shouldBe 2 + } + + } + + "publish change event" in { + val entityType = nextEntityType() + val persistenceId = PersistenceId(entityType, "my-persistenceId").id + + val slice = persistenceExt.sliceForPersistenceId(persistenceId) + val topic = PubSub(system).eventTopic[String](entityType, slice) + val subscriberProbe = createTestProbe[EventEnvelope[String]]() + topic ! Topic.Subscribe(subscriberProbe.ref) + + val value1 = "Genuinely Collaborative" + val value2 = "Open to Feedback" + + store.upsertObject(persistenceId, 1L, value1, tag, s"Changed to $value1").futureValue + store.upsertObject(persistenceId, 2L, value2, tag, s"Changed to $value2").futureValue + store.deleteObject(persistenceId, 3L, "Deleted").futureValue + + val env1 = subscriberProbe.receiveMessage() + env1.event shouldBe s"Changed to $value1" + env1.sequenceNr shouldBe 1L + env1.tags shouldBe Set(tag) + env1.source shouldBe EnvelopeOrigin.SourcePubSub + + val env2 = subscriberProbe.receiveMessage() + env2.event shouldBe s"Changed to $value2" + env2.sequenceNr shouldBe 2L + + val env3 = subscriberProbe.receiveMessage() + env3.event shouldBe s"Deleted" + env3.sequenceNr shouldBe 3L + } + +}