Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Write change event of DurableState to event journal #485

Merged
merged 6 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.LoggerOps
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.persistence.r2dbc.ConnectionFactoryProvider
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.DurableStateDao
Expand Down Expand Up @@ -65,11 +66,17 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf
*/
def deleteState(persistenceId: String, resetRevisionNumber: Boolean): Future[Done] = {
if (resetRevisionNumber)
stateDao.deleteState(persistenceId, revision = 0L) // hard delete without revision check
stateDao
.deleteState(persistenceId, revision = 0L, changeEvent = None) // hard delete without revision check
.map(_ => Done)(ExecutionContexts.parasitic)
else {
stateDao.readState(persistenceId).flatMap {
case None => Future.successful(Done) // already deleted
case Some(s) => stateDao.deleteState(persistenceId, s.revision + 1)
case None =>
Future.successful(Done) // already deleted
case Some(s) =>
stateDao
.deleteState(persistenceId, s.revision + 1, changeEvent = None)
.map(_ => Done)(ExecutionContexts.parasitic)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import akka.Done
import akka.NotUsed
import akka.annotation.InternalApi
import akka.stream.scaladsl.Source

import java.time.Instant

import scala.concurrent.Future

import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -48,9 +50,15 @@ private[r2dbc] trait DurableStateDao extends BySliceQuery.Dao[DurableStateDao.Se

def readState(persistenceId: String): Future[Option[SerializedStateRow]]

def upsertState(state: SerializedStateRow, value: Any): Future[Done]
def upsertState(
state: SerializedStateRow,
value: Any,
changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]]

def deleteState(persistenceId: String, revision: Long): Future[Done]
def deleteState(
persistenceId: String,
revision: Long,
changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]]

def persistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
package akka.persistence.r2dbc.internal

import akka.annotation.InternalApi

import java.time.Instant

import scala.concurrent.Future

import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow
import io.r2dbc.spi.Connection

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -56,6 +59,9 @@ private[r2dbc] trait JournalDao {
* a select (in same transaction).
*/
def writeEvents(events: Seq[JournalDao.SerializedJournalRow]): Future[Instant]

def writeEventInTx(event: SerializedJournalRow, connection: Connection): Future[Instant]

def readHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long]

def readLowestSequenceNr(persistenceId: String): Future[Long]
Expand Down
66 changes: 42 additions & 24 deletions core/src/main/scala/akka/persistence/r2dbc/internal/PubSub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -91,29 +91,7 @@ import org.slf4j.LoggerFactory
}

def publish(pr: PersistentRepr, timestamp: Instant): Unit = {

val n = throughputCounter.incrementAndGet()
if (n % throughputSampler == 0) {
val ewma = throughput
val durationMillis = (System.nanoTime() - ewma.nanoTime) / 1000 / 1000
if (durationMillis >= throughputCollectIntervalMillis) {
// doesn't have to be exact so "missed" or duplicate concurrent calls don't matter
throughputCounter.set(0L)
val rps = n * 1000.0 / durationMillis
val newEwma = ewma :+ rps
throughput = newEwma
if (ewma.value < throughputThreshold && newEwma.value >= throughputThreshold) {
log.info("Disabled publishing of events. Throughput greater than [{}] events/s", throughputThreshold)
} else if (ewma.value >= throughputThreshold && newEwma.value < throughputThreshold) {
log.info("Enabled publishing of events. Throughput less than [{}] events/s", throughputThreshold)
} else {
log.debug(
"Publishing of events is {}. Throughput is [{}] events/s",
if (newEwma.value < throughputThreshold) "enabled" else "disabled",
newEwma.value)
}
}
}
updateThroughput()

if (throughput.value < throughputThreshold) {
val pid = pr.persistenceId
Expand Down Expand Up @@ -143,7 +121,47 @@ import org.slf4j.LoggerFactory
filtered,
source = EnvelopeOrigin.SourcePubSub,
tags)
eventTopic(entityType, slice) ! Topic.Publish(envelope)

publishToTopic(envelope)
}
}

/**
 * Publish an already-constructed envelope to the pub-sub topic for its entity
 * type and slice, unless the sampled throughput is at or above the configured
 * threshold (in which case publishing is suppressed).
 */
def publish(envelope: EventEnvelope[Any]): Unit = {
  updateThroughput()
  // Publishing is disabled while measured events/s is >= the threshold.
  val publishingEnabled = throughput.value < throughputThreshold
  if (publishingEnabled) {
    publishToTopic(envelope)
  }
}

/**
 * Send the envelope to the topic actor keyed by the entity type and slice
 * derived from the envelope's persistence id.
 */
private def publishToTopic(envelope: EventEnvelope[Any]): Unit = {
  val pid = envelope.persistenceId
  val entityType = PersistenceId.extractEntityType(pid)
  val slice = persistenceExt.sliceForPersistenceId(pid)
  val topic = eventTopic(entityType, slice)
  topic ! Topic.Publish(envelope)
}

/**
 * Sample the publish rate and fold it into the exponentially weighted moving
 * average (`throughput`) used to decide whether pub-sub publishing is enabled.
 * Only every `throughputSampler`-th call does the (comparatively expensive)
 * EWMA update, and only if at least `throughputCollectIntervalMillis` has
 * elapsed since the previous sample.
 *
 * NOTE(review): `throughputCounter` is atomic but `throughput` itself is a
 * plain mutable field updated without synchronization — the comment below
 * indicates lost/duplicate updates are acceptable by design.
 */
private def updateThroughput(): Unit = {
  val n = throughputCounter.incrementAndGet()
  // Only sample on every throughputSampler-th increment to keep the hot path cheap.
  if (n % throughputSampler == 0) {
    val ewma = throughput
    // Elapsed wall-clock time since the EWMA was last updated, in milliseconds.
    val durationMillis = (System.nanoTime() - ewma.nanoTime) / 1000 / 1000
    if (durationMillis >= throughputCollectIntervalMillis) {
      // doesn't have to be exact so "missed" or duplicate concurrent calls don't matter
      throughputCounter.set(0L)
      // Observed rate for this interval, events per second.
      val rps = n * 1000.0 / durationMillis
      val newEwma = ewma :+ rps
      throughput = newEwma
      // Log at info level only on threshold crossings (enable <-> disable transitions).
      if (ewma.value < throughputThreshold && newEwma.value >= throughputThreshold) {
        log.info("Disabled publishing of events. Throughput greater than [{}] events/s", throughputThreshold)
      } else if (ewma.value >= throughputThreshold && newEwma.value < throughputThreshold) {
        log.info("Enabled publishing of events. Throughput less than [{}] events/s", throughputThreshold)
      } else {
        log.debug(
          "Publishing of events is {}. Throughput is [{}] events/s",
          if (newEwma.value < throughputThreshold) "enabled" else "disabled",
          newEwma.value)
      }
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ private[r2dbc] object H2Dialect extends Dialect {

override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit
system: ActorSystem[_]): DurableStateDao =
new H2DurableStateDao(settings, connectionFactory)(ecForDaos(system, settings), system)
new H2DurableStateDao(settings, connectionFactory, createJournalDao(settings, connectionFactory))(
ecForDaos(system, settings),
system)

private def ecForDaos(system: ActorSystem[_], settings: R2dbcSettings): ExecutionContext = {
// H2 R2DBC driver blocks in surprising places (Mono.toFuture in stmt.execute().asFuture())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,21 @@ import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao
import io.r2dbc.spi.ConnectionFactory
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration

import akka.persistence.r2dbc.internal.JournalDao

/**
* INTERNAL API
*/
@InternalApi
private[r2dbc] final class H2DurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit
ec: ExecutionContext,
system: ActorSystem[_])
extends PostgresDurableStateDao(settings, connectionFactory) {
private[r2dbc] final class H2DurableStateDao(
settings: R2dbcSettings,
connectionFactory: ConnectionFactory,
journalDao: JournalDao)(implicit ec: ExecutionContext, system: ActorSystem[_])
extends PostgresDurableStateDao(settings, connectionFactory, journalDao) {

override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2DurableStateDao])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,7 @@ private[r2dbc] object PostgresDialect extends Dialect {

override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit
system: ActorSystem[_]): DurableStateDao =
new PostgresDurableStateDao(settings, connectionFactory)(system.executionContext, system)
new PostgresDurableStateDao(settings, connectionFactory, createJournalDao(settings, connectionFactory))(
system.executionContext,
system)
}
Loading
Loading