Skip to content

Commit

Permalink
lazy journalDao, dialect as factory param
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Dec 13, 2023
1 parent 405c448 commit b0b535d
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,7 @@ private[r2dbc] object H2Dialect extends Dialect {

override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit
system: ActorSystem[_]): DurableStateDao =
new H2DurableStateDao(settings, connectionFactory, createJournalDao(settings, connectionFactory))(
ecForDaos(system, settings),
system)
new H2DurableStateDao(settings, connectionFactory, this)(ecForDaos(system, settings), system)

private def ecForDaos(system: ActorSystem[_], settings: R2dbcSettings): ExecutionContext = {
// H2 R2DBC driver blocks in surprising places (Mono.toFuture in stmt.execute().asFuture())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@

package akka.persistence.r2dbc.internal.h2

import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao
import io.r2dbc.spi.ConnectionFactory
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration

import akka.persistence.r2dbc.internal.JournalDao
import io.r2dbc.spi.ConnectionFactory
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.Dialect
import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao

/**
* INTERNAL API
Expand All @@ -24,8 +25,8 @@ import akka.persistence.r2dbc.internal.JournalDao
private[r2dbc] final class H2DurableStateDao(
settings: R2dbcSettings,
connectionFactory: ConnectionFactory,
journalDao: JournalDao)(implicit ec: ExecutionContext, system: ActorSystem[_])
extends PostgresDurableStateDao(settings, connectionFactory, journalDao) {
dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_])
extends PostgresDurableStateDao(settings, connectionFactory, dialect) {

override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2DurableStateDao])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,5 @@ private[r2dbc] object PostgresDialect extends Dialect {

override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit
system: ActorSystem[_]): DurableStateDao =
new PostgresDurableStateDao(settings, connectionFactory, createJournalDao(settings, connectionFactory))(
system.executionContext,
system)
new PostgresDurableStateDao(settings, connectionFactory, this)(system.executionContext, system)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,25 @@

package akka.persistence.r2dbc.internal.postgres

import java.lang
import java.time.Instant
import java.util

import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal

import io.r2dbc.spi.Connection
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.R2dbcDataIntegrityViolationException
import io.r2dbc.spi.Row
import io.r2dbc.spi.Statement
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import akka.Done
import akka.NotUsed
import akka.actor.typed.ActorSystem
Expand All @@ -20,8 +39,11 @@ import akka.persistence.r2dbc.internal.AdditionalColumnFactory
import akka.persistence.r2dbc.internal.BySliceQuery.Buckets
import akka.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
import akka.persistence.r2dbc.internal.ChangeHandlerFactory
import akka.persistence.r2dbc.internal.Dialect
import akka.persistence.r2dbc.internal.DurableStateDao
import akka.persistence.r2dbc.internal.InstantFactory
import akka.persistence.r2dbc.internal.JournalDao
import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow
import akka.persistence.r2dbc.internal.PayloadCodec
import akka.persistence.r2dbc.internal.PayloadCodec.RichRow
import akka.persistence.r2dbc.internal.PayloadCodec.RichStatement
Expand All @@ -33,26 +55,6 @@ import akka.persistence.r2dbc.state.scaladsl.AdditionalColumn
import akka.persistence.r2dbc.state.scaladsl.ChangeHandler
import akka.persistence.typed.PersistenceId
import akka.stream.scaladsl.Source
import io.r2dbc.spi.Connection
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.R2dbcDataIntegrityViolationException
import io.r2dbc.spi.Row
import io.r2dbc.spi.Statement
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.lang
import java.time.Instant
import java.util

import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal

import akka.persistence.r2dbc.internal.JournalDao
import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow

/**
* INTERNAL API
Expand All @@ -77,7 +79,7 @@ private[r2dbc] object PostgresDurableStateDao {
private[r2dbc] class PostgresDurableStateDao(
settings: R2dbcSettings,
connectionFactory: ConnectionFactory,
journalDao: JournalDao)(implicit ec: ExecutionContext, system: ActorSystem[_])
dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_])
extends DurableStateDao {
import DurableStateDao._
import PostgresDurableStateDao._
Expand All @@ -92,6 +94,9 @@ private[r2dbc] class PostgresDurableStateDao(

private implicit val statePayloadCodec: PayloadCodec = settings.durableStatePayloadCodec

// used for change events
private lazy val journalDao: JournalDao = dialect.createJournalDao(settings, connectionFactory)

private lazy val additionalColumns: Map[String, immutable.IndexedSeq[AdditionalColumn[Any, Any]]] = {
settings.durableStateAdditionalColumnClasses.map { case (entityType, columnClasses) =>
val instances = columnClasses.map(fqcn => AdditionalColumnFactory.create(system, fqcn))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,5 @@ private[r2dbc] object YugabyteDialect extends Dialect {

override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit
system: ActorSystem[_]): DurableStateDao =
new YugabyteDurableStateDao(settings, connectionFactory, createJournalDao(settings, connectionFactory))(
system.executionContext,
system)
new YugabyteDurableStateDao(settings, connectionFactory, this)(system.executionContext, system)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@

package akka.persistence.r2dbc.internal.postgres

import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.r2dbc.R2dbcSettings
import scala.concurrent.ExecutionContext

import io.r2dbc.spi._
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext

import akka.persistence.r2dbc.internal.JournalDao
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.Dialect

/**
* INTERNAL API
Expand All @@ -21,8 +22,8 @@ import akka.persistence.r2dbc.internal.JournalDao
private[r2dbc] final class YugabyteDurableStateDao(
settings: R2dbcSettings,
connectionFactory: ConnectionFactory,
journalDao: JournalDao)(implicit ec: ExecutionContext, system: ActorSystem[_])
extends PostgresDurableStateDao(settings, connectionFactory, journalDao) {
dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_])
extends PostgresDurableStateDao(settings, connectionFactory, dialect) {

override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[YugabyteDurableStateDao])

Expand Down

0 comments on commit b0b535d

Please sign in to comment.