From b0b535da3b77461021ce3fb1fc08fa1b7bd6139b Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 13 Dec 2023 10:24:07 +0100 Subject: [PATCH] lazy journalDao, dialect as factory param --- .../r2dbc/internal/h2/H2Dialect.scala | 4 +- .../r2dbc/internal/h2/H2DurableStateDao.scala | 21 +++++---- .../internal/postgres/PostgresDialect.scala | 4 +- .../postgres/PostgresDurableStateDao.scala | 47 ++++++++++--------- .../internal/postgres/YugabyteDialect.scala | 4 +- .../postgres/YugabyteDurableStateDao.scala | 15 +++--- 6 files changed, 48 insertions(+), 47 deletions(-) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala index fbe219cf..bf79398a 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala @@ -94,9 +94,7 @@ private[r2dbc] object H2Dialect extends Dialect { override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit system: ActorSystem[_]): DurableStateDao = - new H2DurableStateDao(settings, connectionFactory, createJournalDao(settings, connectionFactory))( - ecForDaos(system, settings), - system) + new H2DurableStateDao(settings, connectionFactory, this)(ecForDaos(system, settings), system) private def ecForDaos(system: ActorSystem[_], settings: R2dbcSettings): ExecutionContext = { // H2 R2DBC driver blocks in surprising places (Mono.toFuture in stmt.execute().asFuture()) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala index 180ef355..27393ed0 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala @@ -4,18 +4,19 @@ package akka.persistence.r2dbc.internal.h2 -import akka.actor.typed.ActorSystem -import akka.annotation.InternalApi -import akka.persistence.r2dbc.R2dbcSettings -import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao -import io.r2dbc.spi.ConnectionFactory -import org.slf4j.Logger -import org.slf4j.LoggerFactory import scala.concurrent.ExecutionContext import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration -import akka.persistence.r2dbc.internal.JournalDao +import io.r2dbc.spi.ConnectionFactory +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import akka.actor.typed.ActorSystem +import akka.annotation.InternalApi +import akka.persistence.r2dbc.R2dbcSettings +import akka.persistence.r2dbc.internal.Dialect +import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao /** * INTERNAL API @@ -24,8 +25,8 @@ import akka.persistence.r2dbc.internal.JournalDao private[r2dbc] final class H2DurableStateDao( settings: R2dbcSettings, connectionFactory: ConnectionFactory, - journalDao: JournalDao)(implicit ec: ExecutionContext, system: ActorSystem[_]) - extends PostgresDurableStateDao(settings, connectionFactory, journalDao) { + dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_]) + extends PostgresDurableStateDao(settings, connectionFactory, dialect) { override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2DurableStateDao]) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala index 9c9362dd..d8dad720 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala @@ -129,7 +129,5 @@ private[r2dbc] object PostgresDialect extends Dialect { override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit system: ActorSystem[_]): DurableStateDao = - new PostgresDurableStateDao(settings, connectionFactory, createJournalDao(settings, connectionFactory))( - system.executionContext, - system) + new PostgresDurableStateDao(settings, connectionFactory, this)(system.executionContext, system) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index 10fe105e..4037ae7f 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -4,6 +4,25 @@ package akka.persistence.r2dbc.internal.postgres +import java.lang +import java.time.Instant +import java.util + +import scala.collection.immutable +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration.Duration +import scala.concurrent.duration.FiniteDuration +import scala.util.control.NonFatal + +import io.r2dbc.spi.Connection +import io.r2dbc.spi.ConnectionFactory +import io.r2dbc.spi.R2dbcDataIntegrityViolationException +import io.r2dbc.spi.Row +import io.r2dbc.spi.Statement +import org.slf4j.Logger +import org.slf4j.LoggerFactory + import akka.Done import akka.NotUsed import akka.actor.typed.ActorSystem @@ -20,8 +39,11 @@ import akka.persistence.r2dbc.internal.AdditionalColumnFactory import akka.persistence.r2dbc.internal.BySliceQuery.Buckets import akka.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket import akka.persistence.r2dbc.internal.ChangeHandlerFactory +import akka.persistence.r2dbc.internal.Dialect import akka.persistence.r2dbc.internal.DurableStateDao import akka.persistence.r2dbc.internal.InstantFactory +import akka.persistence.r2dbc.internal.JournalDao +import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow import akka.persistence.r2dbc.internal.PayloadCodec import akka.persistence.r2dbc.internal.PayloadCodec.RichRow import akka.persistence.r2dbc.internal.PayloadCodec.RichStatement @@ -33,26 +55,6 @@ import akka.persistence.r2dbc.state.scaladsl.AdditionalColumn import akka.persistence.r2dbc.state.scaladsl.ChangeHandler import akka.persistence.typed.PersistenceId import akka.stream.scaladsl.Source -import io.r2dbc.spi.Connection -import io.r2dbc.spi.ConnectionFactory -import io.r2dbc.spi.R2dbcDataIntegrityViolationException -import io.r2dbc.spi.Row -import io.r2dbc.spi.Statement -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import java.lang -import java.time.Instant -import java.util - -import scala.collection.immutable -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.concurrent.duration.Duration -import scala.concurrent.duration.FiniteDuration -import scala.util.control.NonFatal - -import akka.persistence.r2dbc.internal.JournalDao -import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow /** * INTERNAL API @@ -77,7 +79,7 @@ private[r2dbc] object PostgresDurableStateDao { private[r2dbc] class PostgresDurableStateDao( settings: R2dbcSettings, connectionFactory: ConnectionFactory, - journalDao: JournalDao)(implicit ec: ExecutionContext, system: ActorSystem[_]) + dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends DurableStateDao { import DurableStateDao._ import PostgresDurableStateDao._ @@ -92,6 +94,9 @@ private[r2dbc] class PostgresDurableStateDao( private implicit val statePayloadCodec: PayloadCodec = settings.durableStatePayloadCodec + // used for change events + private lazy val journalDao: JournalDao = dialect.createJournalDao(settings, connectionFactory) + private lazy val additionalColumns: Map[String, immutable.IndexedSeq[AdditionalColumn[Any, Any]]] = { settings.durableStateAdditionalColumnClasses.map { case (entityType, columnClasses) => val instances = columnClasses.map(fqcn => AdditionalColumnFactory.create(system, fqcn)) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala index 27b273dd..9de58b86 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala @@ -42,7 +42,5 @@ private[r2dbc] object YugabyteDialect extends Dialect { override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit system: ActorSystem[_]): DurableStateDao = - new YugabyteDurableStateDao(settings, connectionFactory, createJournalDao(settings, connectionFactory))( - system.executionContext, - system) + new YugabyteDurableStateDao(settings, connectionFactory, this)(system.executionContext, system) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala index eabfaf9f..9a9ba030 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala @@ -4,15 +4,16 @@ package akka.persistence.r2dbc.internal.postgres -import akka.actor.typed.ActorSystem -import akka.annotation.InternalApi -import akka.persistence.r2dbc.R2dbcSettings +import scala.concurrent.ExecutionContext + import io.r2dbc.spi._ import org.slf4j.Logger import org.slf4j.LoggerFactory -import scala.concurrent.ExecutionContext -import akka.persistence.r2dbc.internal.JournalDao +import akka.actor.typed.ActorSystem +import akka.annotation.InternalApi +import akka.persistence.r2dbc.R2dbcSettings +import akka.persistence.r2dbc.internal.Dialect /** * INTERNAL API @@ -21,8 +22,8 @@ import akka.persistence.r2dbc.internal.JournalDao private[r2dbc] final class YugabyteDurableStateDao( settings: R2dbcSettings, connectionFactory: ConnectionFactory, - journalDao: JournalDao)(implicit ec: ExecutionContext, system: ActorSystem[_]) - extends PostgresDurableStateDao(settings, connectionFactory, journalDao) { + dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_]) + extends PostgresDurableStateDao(settings, connectionFactory, dialect) { override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[YugabyteDurableStateDao])