Skip to content

Commit

Permalink
test and publish
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Dec 12, 2023
1 parent 5e30ce8 commit 405c448
Show file tree
Hide file tree
Showing 6 changed files with 360 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.LoggerOps
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.persistence.r2dbc.ConnectionFactoryProvider
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.DurableStateDao
Expand Down Expand Up @@ -65,11 +66,17 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf
*/
def deleteState(persistenceId: String, resetRevisionNumber: Boolean): Future[Done] = {
if (resetRevisionNumber)
stateDao.deleteState(persistenceId, revision = 0L, changeEvent = None) // hard delete without revision check
stateDao
.deleteState(persistenceId, revision = 0L, changeEvent = None) // hard delete without revision check
.map(_ => Done)(ExecutionContexts.parasitic)
else {
stateDao.readState(persistenceId).flatMap {
case None => Future.successful(Done) // already deleted
case Some(s) => stateDao.deleteState(persistenceId, s.revision + 1, changeEvent = None)
case None =>
Future.successful(Done) // already deleted
case Some(s) =>
stateDao
.deleteState(persistenceId, s.revision + 1, changeEvent = None)
.map(_ => Done)(ExecutionContexts.parasitic)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,15 @@ private[r2dbc] trait DurableStateDao extends BySliceQuery.Dao[DurableStateDao.Se

def readState(persistenceId: String): Future[Option[SerializedStateRow]]

def upsertState(state: SerializedStateRow, value: Any, changeEvent: Option[SerializedJournalRow]): Future[Done]
def upsertState(
state: SerializedStateRow,
value: Any,
changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]]

def deleteState(persistenceId: String, revision: Long, changeEvent: Option[SerializedJournalRow]): Future[Done]
def deleteState(
persistenceId: String,
revision: Long,
changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]]

def persistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed]

Expand Down
66 changes: 42 additions & 24 deletions core/src/main/scala/akka/persistence/r2dbc/internal/PubSub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -91,29 +91,7 @@ import org.slf4j.LoggerFactory
}

def publish(pr: PersistentRepr, timestamp: Instant): Unit = {

val n = throughputCounter.incrementAndGet()
if (n % throughputSampler == 0) {
val ewma = throughput
val durationMillis = (System.nanoTime() - ewma.nanoTime) / 1000 / 1000
if (durationMillis >= throughputCollectIntervalMillis) {
// doesn't have to be exact so "missed" or duplicate concurrent calls don't matter
throughputCounter.set(0L)
val rps = n * 1000.0 / durationMillis
val newEwma = ewma :+ rps
throughput = newEwma
if (ewma.value < throughputThreshold && newEwma.value >= throughputThreshold) {
log.info("Disabled publishing of events. Throughput greater than [{}] events/s", throughputThreshold)
} else if (ewma.value >= throughputThreshold && newEwma.value < throughputThreshold) {
log.info("Enabled publishing of events. Throughput less than [{}] events/s", throughputThreshold)
} else {
log.debug(
"Publishing of events is {}. Throughput is [{}] events/s",
if (newEwma.value < throughputThreshold) "enabled" else "disabled",
newEwma.value)
}
}
}
updateThroughput()

if (throughput.value < throughputThreshold) {
val pid = pr.persistenceId
Expand Down Expand Up @@ -143,7 +121,47 @@ import org.slf4j.LoggerFactory
filtered,
source = EnvelopeOrigin.SourcePubSub,
tags)
eventTopic(entityType, slice) ! Topic.Publish(envelope)

publishToTopic(envelope)
}
}

def publish(envelope: EventEnvelope[Any]): Unit = {
updateThroughput()

if (throughput.value < throughputThreshold)
publishToTopic(envelope)
}

private def publishToTopic(envelope: EventEnvelope[Any]): Unit = {
val entityType = PersistenceId.extractEntityType(envelope.persistenceId)
val slice = persistenceExt.sliceForPersistenceId(envelope.persistenceId)

eventTopic(entityType, slice) ! Topic.Publish(envelope)
}

private def updateThroughput(): Unit = {
val n = throughputCounter.incrementAndGet()
if (n % throughputSampler == 0) {
val ewma = throughput
val durationMillis = (System.nanoTime() - ewma.nanoTime) / 1000 / 1000
if (durationMillis >= throughputCollectIntervalMillis) {
// doesn't have to be exact so "missed" or duplicate concurrent calls don't matter
throughputCounter.set(0L)
val rps = n * 1000.0 / durationMillis
val newEwma = ewma :+ rps
throughput = newEwma
if (ewma.value < throughputThreshold && newEwma.value >= throughputThreshold) {
log.info("Disabled publishing of events. Throughput greater than [{}] events/s", throughputThreshold)
} else if (ewma.value >= throughputThreshold && newEwma.value < throughputThreshold) {
log.info("Enabled publishing of events. Throughput less than [{}] events/s", throughputThreshold)
} else {
log.debug(
"Publishing of events is {}. Throughput is [{}] events/s",
if (newEwma.value < throughputThreshold) "enabled" else "disabled",
newEwma.value)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ private[r2dbc] object PostgresDurableStateDao {
binding: AdditionalColumn.Binding[_])

val FutureDone: Future[Done] = Future.successful(Done)
val FutureInstantNone: Future[Option[Instant]] = Future.successful(None)
}

/**
Expand Down Expand Up @@ -302,20 +303,22 @@ private[r2dbc] class PostgresDurableStateDao(
updatedRows: Long,
entityType: String,
change: DurableStateChange[Any],
changeEvent: Option[SerializedJournalRow]): Future[Done] = {
changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]] = {
if (updatedRows == 1)
for {
_ <- changeEvent.map(journalDao.writeEventInTx(_, connection)).getOrElse(FutureDone)
changeEventTimestamp <- changeEvent
.map(journalDao.writeEventInTx(_, connection).map(Some(_)))
.getOrElse(FutureInstantNone)
_ <- changeHandlers.get(entityType).map(processChange(_, connection, change)).getOrElse(FutureDone)
} yield Done
} yield changeEventTimestamp
else
FutureDone
FutureInstantNone
}

override def upsertState(
state: SerializedStateRow,
value: Any,
changeEvent: Option[SerializedJournalRow]): Future[Done] = {
changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]] = {
require(state.revision > 0)

def bindTags(stmt: Statement, i: Int): Statement = {
Expand Down Expand Up @@ -349,7 +352,7 @@ private[r2dbc] class PostgresDurableStateDao(

val entityType = PersistenceId.extractEntityType(state.persistenceId)

val result = {
val result: Future[(Long, Option[Instant])] = {
val additionalBindings = additionalColumns.get(entityType) match {
case None => Vector.empty[EvaluatedAdditionalColumnBindings]
case Some(columns) =>
Expand Down Expand Up @@ -382,14 +385,21 @@ private[r2dbc] class PostgresDurableStateDao(
s"Insert failed: durable state for persistence id [${state.persistenceId}] already exists"))
}

if (!changeHandlers.contains(entityType) && changeEvent.isEmpty)
recoverDataIntegrityViolation(r2dbcExecutor.updateOne(s"insert [${state.persistenceId}]")(insertStatement))
else
if (!changeHandlers.contains(entityType) && changeEvent.isEmpty) {
val updatedRows = recoverDataIntegrityViolation(
r2dbcExecutor.updateOne(s"insert [${state.persistenceId}]")(insertStatement))
updatedRows.map(_ -> None)
} else
r2dbcExecutor.withConnection(s"insert [${state.persistenceId}]") { connection =>
for {
updatedRows <- recoverDataIntegrityViolation(R2dbcExecutor.updateOneInTx(insertStatement(connection)))
_ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent = None)
} yield updatedRows
changeEventTimestamp <- writeChangeEventAndCallChangeHander(
connection,
updatedRows,
entityType,
change,
changeEvent)
} yield (updatedRows, changeEventTimestamp)
}
} else {
val previousRevision = state.revision - 1
Expand Down Expand Up @@ -425,25 +435,31 @@ private[r2dbc] class PostgresDurableStateDao(
}
}

if (!changeHandlers.contains(entityType) && changeEvent.isEmpty)
r2dbcExecutor.updateOne(s"update [${state.persistenceId}]")(updateStatement)
else
if (!changeHandlers.contains(entityType) && changeEvent.isEmpty) {
val updatedRows = r2dbcExecutor.updateOne(s"update [${state.persistenceId}]")(updateStatement)
updatedRows.map(_ -> None)
} else
r2dbcExecutor.withConnection(s"update [${state.persistenceId}]") { connection =>
for {
updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection))
_ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent = None)
} yield updatedRows
changeEventTimestamp <- writeChangeEventAndCallChangeHander(
connection,
updatedRows,
entityType,
change,
changeEvent)
} yield (updatedRows, changeEventTimestamp)
}
}
}

result.map { updatedRows =>
result.map { case (updatedRows, changeEventTimestamp) =>
if (updatedRows != 1)
throw new IllegalStateException(
s"Update failed: durable state for persistence id [${state.persistenceId}] could not be updated to revision [${state.revision}]")
else {
log.debug("Updated durable state for persistenceId [{}] to revision [{}]", state.persistenceId, state.revision)
Done
changeEventTimestamp
}
}
}
Expand Down Expand Up @@ -472,11 +488,12 @@ private[r2dbc] class PostgresDurableStateDao(
override def deleteState(
persistenceId: String,
revision: Long,
changeEvent: Option[SerializedJournalRow]): Future[Done] = {
changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]] = {
if (revision == 0) {
hardDeleteState(persistenceId)
.map(_ => None)(ExecutionContexts.parasitic)
} else {
val result = {
val result: Future[(Long, Option[Instant])] = {
val entityType = PersistenceId.extractEntityType(persistenceId)
def change =
new DeletedDurableState[Any](persistenceId, revision, NoOffset, EmptyDbTimestamp.toEpochMilli)
Expand Down Expand Up @@ -508,8 +525,13 @@ private[r2dbc] class PostgresDurableStateDao(
for {
updatedRows <- recoverDataIntegrityViolation(
R2dbcExecutor.updateOneInTx(insertDeleteMarkerStatement(connection)))
_ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent)
} yield updatedRows
changeEventTimestamp <- writeChangeEventAndCallChangeHander(
connection,
updatedRows,
entityType,
change,
changeEvent)
} yield (updatedRows, changeEventTimestamp)
}

} else {
Expand Down Expand Up @@ -549,19 +571,24 @@ private[r2dbc] class PostgresDurableStateDao(
r2dbcExecutor.withConnection(s"delete [$persistenceId]") { connection =>
for {
updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection))
_ <- writeChangeEventAndCallChangeHander(connection, updatedRows, entityType, change, changeEvent)
} yield updatedRows
changeEventTimestamp <- writeChangeEventAndCallChangeHander(
connection,
updatedRows,
entityType,
change,
changeEvent)
} yield (updatedRows, changeEventTimestamp)
}
}
}

result.map { updatedRows =>
result.map { case (updatedRows, changeEventTimestamp) =>
if (updatedRows != 1)
throw new IllegalStateException(
s"Delete failed: durable state for persistence id [$persistenceId] could not be updated to revision [$revision]")
else {
log.debug("Deleted durable state for persistenceId [{}] to revision [{}]", persistenceId, revision)
Done
changeEventTimestamp
}
}

Expand Down
Loading

0 comments on commit 405c448

Please sign in to comment.