From 74274a156a4c11de72b6373ebe7622804edabe95 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 5 Feb 2024 12:18:19 +0100 Subject: [PATCH] chore: Dao ExecutionContext in R2dbcExecutorProvider (#518) * H2 requires blocking dispatcher, which was lost in the data partitions refactoring * reduce number of parameters of dao creation since everything needed is included in R2dbcExecutorProvider --- .../scaladsl/DurableStateCleanup.scala | 9 +++- .../scaladsl/EventSourcedCleanup.scala | 11 ++-- .../persistence/r2dbc/internal/Dialect.scala | 14 +++-- .../r2dbc/internal/R2dbcExecutor.scala | 6 ++- .../r2dbc/internal/h2/H2Dialect.scala | 30 +++++------ .../r2dbc/internal/h2/H2DurableStateDao.scala | 10 +--- .../r2dbc/internal/h2/H2JournalDao.scala | 34 ++++++------ .../r2dbc/internal/h2/H2QueryDao.scala | 21 +++----- .../r2dbc/internal/h2/H2SnapshotDao.scala | 18 ++----- .../internal/postgres/PostgresDialect.scala | 24 ++++----- .../postgres/PostgresDurableStateDao.scala | 13 +++-- .../postgres/PostgresJournalDao.scala | 52 +++++++++---------- .../internal/postgres/PostgresQueryDao.scala | 29 +++++------ .../postgres/PostgresSnapshotDao.scala | 10 ++-- .../internal/postgres/YugabyteDialect.scala | 25 ++++----- .../postgres/YugabyteDurableStateDao.scala | 11 +--- .../internal/postgres/YugabyteQueryDao.scala | 13 ++--- .../postgres/YugabyteSnapshotDao.scala | 14 ++--- .../internal/sqlserver/SqlServerDialect.scala | 24 ++++----- .../sqlserver/SqlServerDurableStateDao.scala | 10 +--- .../sqlserver/SqlServerJournalDao.scala | 19 +++---- .../sqlserver/SqlServerQueryDao.scala | 20 +++---- .../sqlserver/SqlServerSnapshotDao.scala | 23 ++++---- .../r2dbc/journal/R2dbcJournal.scala | 9 +++- .../query/scaladsl/R2dbcReadJournal.scala | 15 +++--- .../r2dbc/snapshot/R2dbcSnapshotStore.scala | 11 ++-- .../scaladsl/R2dbcDurableStateStore.scala | 11 ++-- .../persistence/r2dbc/TestDbLifecycle.scala | 9 ++-- .../r2dbc/migration/MigrationTool.scala | 8 +-- 29 files changed, 227 insertions(+), 276 deletions(-) diff --git a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala index 0572792f..5d7b6cf8 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala @@ -58,8 +58,13 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf private val settings = R2dbcSettings(system.settings.config.getConfig(sharedConfigPath)) private val executorProvider = - new R2dbcExecutorProvider(settings, sharedConfigPath + ".connection-factory", LoggerFactory.getLogger(getClass)) - private val stateDao = settings.connectionFactorySettings.dialect.createDurableStateDao(settings, executorProvider) + new R2dbcExecutorProvider( + system, + settings.connectionFactorySettings.dialect.daoExecutionContext(settings, system), + settings, + sharedConfigPath + ".connection-factory", + LoggerFactory.getLogger(getClass)) + private val stateDao = settings.connectionFactorySettings.dialect.createDurableStateDao(executorProvider) /** * Delete the state related to one single `persistenceId`. diff --git a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala index 9244ad1e..1138ee9c 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala @@ -60,9 +60,14 @@ final class EventSourcedCleanup(systemProvider: ClassicActorSystemProvider, conf private val settings = R2dbcSettings(system.settings.config.getConfig(sharedConfigPath)) private val executorProvider = - new R2dbcExecutorProvider(settings, sharedConfigPath + ".connection-factory", LoggerFactory.getLogger(getClass)) - private val journalDao = settings.connectionFactorySettings.dialect.createJournalDao(settings, executorProvider) - private val snapshotDao = settings.connectionFactorySettings.dialect.createSnapshotDao(settings, executorProvider) + new R2dbcExecutorProvider( + system, + settings.connectionFactorySettings.dialect.daoExecutionContext(settings, system), + settings, + sharedConfigPath + ".connection-factory", + LoggerFactory.getLogger(getClass)) + private val journalDao = settings.connectionFactorySettings.dialect.createJournalDao(executorProvider) + private val snapshotDao = settings.connectionFactorySettings.dialect.createSnapshotDao(executorProvider) /** * Delete all events before a sequenceNr for the given persistence id. Snapshots are not deleted. diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/Dialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/Dialect.scala index 0b2636d6..f66617cd 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/Dialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/Dialect.scala @@ -29,17 +29,15 @@ private[r2dbc] trait Dialect { */ def adaptSettings(settings: R2dbcSettings): R2dbcSettings = settings + def daoExecutionContext(settings: R2dbcSettings, system: ActorSystem[_]): ExecutionContext + def createConnectionFactory(config: Config): ConnectionFactory - def createJournalDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): JournalDao + def createJournalDao(executorProvider: R2dbcExecutorProvider): JournalDao - def createQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): QueryDao + def createQueryDao(executorProvider: R2dbcExecutorProvider): QueryDao - def createSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): SnapshotDao + def createSnapshotDao(executorProvider: R2dbcExecutorProvider): SnapshotDao - def createDurableStateDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): DurableStateDao + def createDurableStateDao(executorProvider: R2dbcExecutorProvider): DurableStateDao } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala index 537a370a..cd4391b4 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala @@ -392,9 +392,11 @@ class R2dbcExecutor( * INTERNAL API */ @InternalStableApi class R2dbcExecutorProvider( + val system: ActorSystem[_], + val ec: ExecutionContext, val settings: R2dbcSettings, connectionFactoryBaseConfigPath: String, - log: Logger)(implicit ec: ExecutionContext, system: ActorSystem[_]) { + log: Logger) { private val connectionFactoryProvider = ConnectionFactoryProvider(system) private var cache = IntMap.empty[R2dbcExecutor] @@ -409,7 +411,7 @@ class R2dbcExecutor( connectionFactory, log, settings.logDbCallsExceeding, - settings.connectionFactorySettings.poolSettings.closeCallsExceeding) + settings.connectionFactorySettings.poolSettings.closeCallsExceeding)(ec, system) // it's just a cache so no need for guarding concurrent updates or visibility cache = cache.updated(slice, executor) executor diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala index 5764fb74..3550a4cb 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala @@ -87,28 +87,24 @@ private[r2dbc] object H2Dialect extends Dialect { new H2ConnectionFactory(h2Config) } - override def createJournalDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): JournalDao = - new H2JournalDao(settings, executorProvider)(ecForDaos(system, settings), system) - - override def createSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): SnapshotDao = - new H2SnapshotDao(settings, executorProvider)(ecForDaos(system, settings), system) - - override def createQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): QueryDao = - new H2QueryDao(settings, executorProvider)(ecForDaos(system, settings), system) - - override def createDurableStateDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): DurableStateDao = - new H2DurableStateDao(settings, executorProvider, this)(ecForDaos(system, settings), system) - - private def ecForDaos(system: ActorSystem[_], settings: R2dbcSettings): ExecutionContext = { + override def daoExecutionContext(settings: R2dbcSettings, system: ActorSystem[_]): ExecutionContext = { // H2 R2DBC driver blocks in surprising places (Mono.toFuture in stmt.execute().asFuture()) system.dispatchers.lookup( DispatcherSelector.fromConfig(settings.connectionFactorySettings.config.getString("use-dispatcher"))) } + override def createJournalDao(executorProvider: R2dbcExecutorProvider): JournalDao = + new H2JournalDao(executorProvider) + + override def createSnapshotDao(executorProvider: R2dbcExecutorProvider): SnapshotDao = + new H2SnapshotDao(executorProvider) + + override def createQueryDao(executorProvider: R2dbcExecutorProvider): QueryDao = + new H2QueryDao(executorProvider) + + override def createDurableStateDao(executorProvider: R2dbcExecutorProvider): DurableStateDao = + new H2DurableStateDao(executorProvider, this) + private def dbSchema(config: Config, createSliceIndexes: Boolean, additionalInit: String): String = { def optionalConfString(name: String): Option[String] = { val s = config.getString(name) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala index e25d1382..2d4ef84a 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala @@ -4,16 +4,13 @@ package akka.persistence.r2dbc.internal.h2 -import scala.concurrent.ExecutionContext import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration import org.slf4j.Logger import org.slf4j.LoggerFactory -import akka.actor.typed.ActorSystem import akka.annotation.InternalApi -import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.Dialect import akka.persistence.r2dbc.internal.R2dbcExecutorProvider import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao @@ -22,11 +19,8 @@ import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao * INTERNAL API */ @InternalApi -private[r2dbc] final class H2DurableStateDao( - settings: R2dbcSettings, - executorProvider: R2dbcExecutorProvider, - dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_]) - extends PostgresDurableStateDao(settings, executorProvider, dialect) { +private[r2dbc] final class H2DurableStateDao(executorProvider: R2dbcExecutorProvider, dialect: Dialect) + extends PostgresDurableStateDao(executorProvider, dialect) { override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2DurableStateDao]) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala index cf577310..344becbd 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala @@ -4,41 +4,37 @@ package akka.persistence.r2dbc.internal.h2 -import akka.actor.typed.ActorSystem -import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts -import akka.persistence.r2dbc.R2dbcSettings -import akka.persistence.r2dbc.internal.JournalDao -import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement -import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter -import akka.persistence.r2dbc.internal.postgres.PostgresJournalDao -import io.r2dbc.spi.Statement -import org.slf4j.Logger -import org.slf4j.LoggerFactory import java.time.Instant -import scala.concurrent.ExecutionContext import scala.concurrent.Future import io.r2dbc.spi.Connection +import io.r2dbc.spi.Statement +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import akka.annotation.InternalApi +import akka.dispatch.ExecutionContexts +import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.R2dbcExecutor import akka.persistence.r2dbc.internal.R2dbcExecutorProvider +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter +import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement +import akka.persistence.r2dbc.internal.postgres.PostgresJournalDao /** * INTERNAL API */ @InternalApi -private[r2dbc] class H2JournalDao(journalSettings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - ec: ExecutionContext, - system: ActorSystem[_]) - extends PostgresJournalDao(journalSettings, executorProvider) { +private[r2dbc] class H2JournalDao(executorProvider: R2dbcExecutorProvider) + extends PostgresJournalDao(executorProvider) { + import settings.codecSettings.JournalImplicits._ + import JournalDao.SerializedJournalRow - import journalSettings.codecSettings.JournalImplicits._ override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2JournalDao]) // always app timestamp (db is same process) monotonic increasing - require(journalSettings.useAppTimestamp) - require(journalSettings.dbTimestampMonotonicIncreasing) + require(settings.useAppTimestamp) + require(settings.dbTimestampMonotonicIncreasing) private def insertSql(slice: Int) = sql"INSERT INTO ${journalTable(slice)} " + "(slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, tags, meta_ser_id, meta_ser_manifest, meta_payload, db_timestamp) " + diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala index e64d51e9..5bb3f9c8 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala @@ -4,29 +4,22 @@ package akka.persistence.r2dbc.internal.h2 -import akka.actor.typed.ActorSystem -import akka.annotation.InternalApi -import akka.persistence.r2dbc.R2dbcSettings -import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter -import akka.persistence.r2dbc.internal.postgres.PostgresQueryDao -import io.r2dbc.spi.ConnectionFactory -import io.r2dbc.spi.Row -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import scala.concurrent.ExecutionContext import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import akka.annotation.InternalApi import akka.persistence.r2dbc.internal.R2dbcExecutorProvider +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter +import akka.persistence.r2dbc.internal.postgres.PostgresQueryDao /** * INTERNAL API */ @InternalApi -private[r2dbc] class H2QueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - ec: ExecutionContext, - system: ActorSystem[_]) - extends PostgresQueryDao(settings, executorProvider) { +private[r2dbc] class H2QueryDao(executorProvider: R2dbcExecutorProvider) extends PostgresQueryDao(executorProvider) { import settings.codecSettings.JournalImplicits._ override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2QueryDao]) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala index cf3c6279..91d9f635 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala @@ -4,28 +4,20 @@ package akka.persistence.r2dbc.internal.h2 -import akka.actor.typed.ActorSystem -import akka.annotation.InternalApi -import akka.persistence.r2dbc.R2dbcSettings -import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter -import akka.persistence.r2dbc.internal.postgres.PostgresSnapshotDao -import io.r2dbc.spi.ConnectionFactory import org.slf4j.Logger import org.slf4j.LoggerFactory -import scala.concurrent.ExecutionContext - -import io.r2dbc.spi.Row +import akka.annotation.InternalApi import akka.persistence.r2dbc.internal.R2dbcExecutorProvider +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter +import akka.persistence.r2dbc.internal.postgres.PostgresSnapshotDao /** * INTERNAL API */ @InternalApi -private[r2dbc] final class H2SnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - ec: ExecutionContext, - system: ActorSystem[_]) - extends PostgresSnapshotDao(settings, executorProvider) { +private[r2dbc] final class H2SnapshotDao(executorProvider: R2dbcExecutorProvider) + extends PostgresSnapshotDao(executorProvider) { import settings.codecSettings.SnapshotImplicits._ override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2SnapshotDao]) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala index 90529c9b..01d2e4b8 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala @@ -7,6 +7,7 @@ package akka.persistence.r2dbc.internal.postgres import java.time.{ Duration => JDuration } import java.util.Locale +import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration import akka.actor.typed.ActorSystem @@ -117,19 +118,18 @@ private[r2dbc] object PostgresDialect extends Dialect { ConnectionFactories.get(builder.build()) } - override def createJournalDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): JournalDao = - new PostgresJournalDao(settings, executorProvider)(system.executionContext, system) + override def daoExecutionContext(settings: R2dbcSettings, system: ActorSystem[_]): ExecutionContext = + system.executionContext - override def createSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): SnapshotDao = - new PostgresSnapshotDao(settings, executorProvider)(system.executionContext, system) + override def createJournalDao(executorProvider: R2dbcExecutorProvider): JournalDao = + new PostgresJournalDao(executorProvider) - override def createQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): QueryDao = - new PostgresQueryDao(settings, executorProvider)(system.executionContext, system) + override def createSnapshotDao(executorProvider: R2dbcExecutorProvider): SnapshotDao = + new PostgresSnapshotDao(executorProvider) - override def createDurableStateDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): DurableStateDao = - new PostgresDurableStateDao(settings, executorProvider, this)(system.executionContext, system) + override def createQueryDao(executorProvider: R2dbcExecutorProvider): QueryDao = + new PostgresQueryDao(executorProvider) + + override def createDurableStateDao(executorProvider: R2dbcExecutorProvider): DurableStateDao = + new PostgresDurableStateDao(executorProvider, this) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index 7042a4a7..884a9c41 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -16,7 +16,6 @@ import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal import io.r2dbc.spi.Connection -import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.R2dbcDataIntegrityViolationException import io.r2dbc.spi.Row import io.r2dbc.spi.Statement @@ -79,11 +78,11 @@ private[r2dbc] object PostgresDurableStateDao { * INTERNAL API */ @InternalApi -private[r2dbc] class PostgresDurableStateDao( - settings: R2dbcSettings, - executorProvider: R2dbcExecutorProvider, - dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_]) +private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProvider, dialect: Dialect) extends DurableStateDao { + protected val settings: R2dbcSettings = executorProvider.settings + protected val system: ActorSystem[_] = executorProvider.system + implicit protected val ec: ExecutionContext = executorProvider.ec import DurableStateDao._ import PostgresDurableStateDao._ import settings.codecSettings.DurableStateImplicits._ @@ -93,7 +92,7 @@ private[r2dbc] class PostgresDurableStateDao( protected val r2dbcExecutor = executorProvider.executorFor(slice = 0) // FIXME support data partitions // used for change events - private lazy val journalDao: JournalDao = dialect.createJournalDao(settings, executorProvider) + private lazy val journalDao: JournalDao = dialect.createJournalDao(executorProvider) private lazy val additionalColumns: Map[String, immutable.IndexedSeq[AdditionalColumn[Any, Any]]] = { settings.durableStateAdditionalColumnClasses.map { case (entityType, columnClasses) => @@ -487,7 +486,7 @@ private[r2dbc] class PostgresDurableStateDao( handler: ChangeHandler[Any], connection: Connection, change: DurableStateChange[Any]): Future[Done] = { - val session = new R2dbcSession(connection) + val session = new R2dbcSession(connection)(ec, system) def excMessage(cause: Throwable): String = { val (changeType, revision) = change match { diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala index 71d2c58a..ae20ca3b 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala @@ -4,6 +4,17 @@ package akka.persistence.r2dbc.internal.postgres +import java.time.Instant + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future + +import io.r2dbc.spi.Connection +import io.r2dbc.spi.Row +import io.r2dbc.spi.Statement +import org.slf4j.Logger +import org.slf4j.LoggerFactory + import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi @@ -11,28 +22,15 @@ import akka.dispatch.ExecutionContexts import akka.persistence.Persistence import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.JournalDao -import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.R2dbcExecutor +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider import akka.persistence.r2dbc.internal.SerializedEventMetadata import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter +import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.typed.PersistenceId -import io.r2dbc.spi.Connection -import io.r2dbc.spi.Row -import io.r2dbc.spi.Statement -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import java.time.Instant - -import scala.concurrent.ExecutionContext -import scala.concurrent.Future - -import akka.persistence.r2dbc.internal.R2dbcExecutorProvider -import akka.persistence.r2dbc.internal.codec.PayloadCodec -import akka.persistence.r2dbc.internal.codec.QueryAdapter -import akka.persistence.r2dbc.internal.codec.SqlServerQueryAdapter /** * INTERNAL API @@ -61,23 +59,23 @@ private[r2dbc] object PostgresJournalDao { * Class for doing db interaction outside of an actor to avoid mistakes in future callbacks */ @InternalApi -private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)( - implicit - ec: ExecutionContext, - system: ActorSystem[_]) - extends JournalDao { +private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider) extends JournalDao { + protected val settings: R2dbcSettings = executorProvider.settings + protected val system: ActorSystem[_] = executorProvider.system + implicit protected val ec: ExecutionContext = executorProvider.ec + import settings.codecSettings.JournalImplicits._ + import JournalDao.SerializedJournalRow - import journalSettings.codecSettings.JournalImplicits._ protected def log: Logger = PostgresJournalDao.log protected val persistenceExt: Persistence = Persistence(system) - protected def journalTable(slice: Int): String = journalSettings.journalTableWithSchema(slice) + protected def journalTable(slice: Int): String = settings.journalTableWithSchema(slice) protected def insertEventWithParameterTimestampSql(slice: Int): String = { val table = journalTable(slice) val baseSql = insertEvenBaseSql(table) - if (journalSettings.dbTimestampMonotonicIncreasing) + if (settings.dbTimestampMonotonicIncreasing) sql"$baseSql ?) RETURNING db_timestamp" else sql"$baseSql GREATEST(?, ${timestampSubSelect(table)})) RETURNING db_timestamp" @@ -86,7 +84,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, executor private def insertEventWithTransactionTimestampSql(slice: Int) = { val table = journalTable(slice) val baseSql = insertEvenBaseSql(table) - if (journalSettings.dbTimestampMonotonicIncreasing) + if (settings.dbTimestampMonotonicIncreasing) sql"$baseSql CURRENT_TIMESTAMP) RETURNING db_timestamp" else sql"$baseSql GREATEST(CURRENT_TIMESTAMP, ${timestampSubSelect(table)})) RETURNING db_timestamp" @@ -241,12 +239,12 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, executor } if (useTimestampFromDb) { - if (!journalSettings.dbTimestampMonotonicIncreasing) + if (!settings.dbTimestampMonotonicIncreasing) stmt .bind(13, write.persistenceId) .bind(14, previousSeqNr) } else { - if (journalSettings.dbTimestampMonotonicIncreasing) + if (settings.dbTimestampMonotonicIncreasing) stmt .bindTimestamp(13, write.dbTimestamp) else @@ -365,7 +363,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, executor })(ExecutionContexts.parasitic) } - val batchSize = journalSettings.cleanupSettings.eventsJournalDeleteBatchSize + val batchSize = settings.cleanupSettings.eventsJournalDeleteBatchSize def deleteInBatches(from: Long, maxTo: Long): Future[Unit] = { if (from + batchSize > maxTo) { diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala index d7b3aaa1..65cb932e 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala @@ -4,39 +4,37 @@ package akka.persistence.r2dbc.internal.postgres -import scala.collection.immutable import java.time.Instant +import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration +import io.r2dbc.spi.Statement +import org.slf4j.Logger +import org.slf4j.LoggerFactory + import akka.NotUsed import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi +import akka.persistence.Persistence import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.BySliceQuery.Buckets import akka.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket import akka.persistence.r2dbc.internal.InstantFactory import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow -import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow import akka.persistence.r2dbc.internal.QueryDao +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter +import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichRow import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement -import akka.persistence.Persistence import akka.persistence.typed.PersistenceId import akka.stream.scaladsl.Source -import io.r2dbc.spi.Statement -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -import akka.persistence.Persistence -import akka.persistence.r2dbc.internal.R2dbcExecutorProvider -import akka.persistence.r2dbc.internal.codec.PayloadCodec /** * INTERNAL API @@ -50,13 +48,14 @@ private[r2dbc] object PostgresQueryDao { * INTERNAL API */ @InternalApi -private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - ec: ExecutionContext, - system: ActorSystem[_]) - extends QueryDao { - import PostgresJournalDao.readMetadata +private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) extends QueryDao { + protected val settings: R2dbcSettings = executorProvider.settings + protected val system: ActorSystem[_] = executorProvider.system + implicit protected val ec: ExecutionContext = executorProvider.ec import settings.codecSettings.JournalImplicits._ + import PostgresJournalDao.readMetadata + protected def log: Logger = PostgresQueryDao.log protected val persistenceExt: Persistence = Persistence(system) protected def journalTable(slice: Int): String = settings.journalTableWithSchema(slice) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala index 551d5570..6a8fb613 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala @@ -10,7 +10,6 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration -import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Row import io.r2dbc.spi.Statement import org.slf4j.Logger @@ -28,7 +27,6 @@ import akka.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket import akka.persistence.r2dbc.internal.InstantFactory import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement -import akka.persistence.r2dbc.internal.R2dbcExecutor import akka.persistence.r2dbc.internal.R2dbcExecutorProvider import akka.persistence.r2dbc.internal.SnapshotDao import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter @@ -50,10 +48,10 @@ private[r2dbc] object PostgresSnapshotDao { * INTERNAL API */ @InternalApi -private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - ec: ExecutionContext, - system: ActorSystem[_]) - extends SnapshotDao { +private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider) extends SnapshotDao { + protected val settings: R2dbcSettings = executorProvider.settings + protected val system: ActorSystem[_] = executorProvider.system + implicit protected val ec: ExecutionContext = executorProvider.ec import SnapshotDao._ import settings.codecSettings.SnapshotImplicits._ diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala index 5f76925b..49a69c22 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala @@ -4,6 +4,8 @@ package akka.persistence.r2dbc.internal.postgres +import scala.concurrent.ExecutionContext + import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.persistence.r2dbc.R2dbcSettings @@ -28,19 +30,18 @@ private[r2dbc] object YugabyteDialect extends Dialect { override def createConnectionFactory(config: Config): ConnectionFactory = PostgresDialect.createConnectionFactory(config) - override def createJournalDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): JournalDao = - new PostgresJournalDao(settings, executorProvider)(system.executionContext, system) + override def daoExecutionContext(settings: R2dbcSettings, system: ActorSystem[_]): ExecutionContext = + system.executionContext + + override def createJournalDao(executorProvider: R2dbcExecutorProvider): JournalDao = + new PostgresJournalDao(executorProvider) - override def createSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): SnapshotDao = - new YugabyteSnapshotDao(settings, executorProvider)(system.executionContext, system) + override def createSnapshotDao(executorProvider: R2dbcExecutorProvider): SnapshotDao = + new YugabyteSnapshotDao(executorProvider) - override def createQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): QueryDao = - new YugabyteQueryDao(settings, executorProvider)(system.executionContext, system) + override def createQueryDao(executorProvider: R2dbcExecutorProvider): QueryDao = + new YugabyteQueryDao(executorProvider) - override def createDurableStateDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): DurableStateDao = - new YugabyteDurableStateDao(settings, executorProvider, this)(system.executionContext, system) + override def createDurableStateDao(executorProvider: R2dbcExecutorProvider): DurableStateDao = + new YugabyteDurableStateDao(executorProvider, this) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala index ceb1630c..644e787f 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala @@ -4,14 +4,10 @@ package akka.persistence.r2dbc.internal.postgres -import scala.concurrent.ExecutionContext - import org.slf4j.Logger import org.slf4j.LoggerFactory -import akka.actor.typed.ActorSystem import akka.annotation.InternalApi -import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.Dialect import akka.persistence.r2dbc.internal.R2dbcExecutorProvider @@ -19,11 +15,8 @@ import akka.persistence.r2dbc.internal.R2dbcExecutorProvider * INTERNAL API */ @InternalApi -private[r2dbc] final class YugabyteDurableStateDao( - settings: R2dbcSettings, - executorProvider: R2dbcExecutorProvider, - dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_]) - extends PostgresDurableStateDao(settings, executorProvider, dialect) { +private[r2dbc] final class YugabyteDurableStateDao(executorProvider: R2dbcExecutorProvider, dialect: Dialect) + extends PostgresDurableStateDao(executorProvider, dialect) { override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[YugabyteDurableStateDao]) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteQueryDao.scala index 684133ab..caed0497 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteQueryDao.scala @@ -4,25 +4,18 @@ package akka.persistence.r2dbc.internal.postgres -import scala.concurrent.ExecutionContext - -import akka.actor.typed.ActorSystem -import akka.annotation.InternalApi -import akka.persistence.r2dbc.R2dbcSettings -import io.r2dbc.spi.ConnectionFactory import org.slf4j.Logger import org.slf4j.LoggerFactory +import akka.annotation.InternalApi import akka.persistence.r2dbc.internal.R2dbcExecutorProvider /** * INTERNAL API */ @InternalApi -private[r2dbc] final class YugabyteQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - ec: ExecutionContext, - system: ActorSystem[_]) - extends PostgresQueryDao(settings, executorProvider) { +private[r2dbc] final class YugabyteQueryDao(executorProvider: R2dbcExecutorProvider) + extends PostgresQueryDao(executorProvider) { override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[YugabyteQueryDao]) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteSnapshotDao.scala index 348dc9e9..ec8fbd33 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteSnapshotDao.scala @@ -4,26 +4,18 @@ package akka.persistence.r2dbc.internal.postgres -import scala.concurrent.ExecutionContext - -import akka.actor.typed.ActorSystem -import akka.annotation.InternalApi -import akka.persistence.r2dbc.R2dbcSettings -import io.r2dbc.spi.ConnectionFactory import org.slf4j.Logger import org.slf4j.LoggerFactory +import akka.annotation.InternalApi import akka.persistence.r2dbc.internal.R2dbcExecutorProvider /** * INTERNAL API */ @InternalApi -private[r2dbc] final class YugabyteSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)( - implicit - ec: ExecutionContext, - system: ActorSystem[_]) - extends PostgresSnapshotDao(settings, executorProvider) { +private[r2dbc] final class YugabyteSnapshotDao(executorProvider: R2dbcExecutorProvider) + extends PostgresSnapshotDao(executorProvider) { override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[YugabyteSnapshotDao]) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDialect.scala index 8bb9f0ad..ccb97fff 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDialect.scala @@ -6,6 +6,7 @@ package akka.persistence.r2dbc.internal.sqlserver import java.time.{ Duration => JDuration } +import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration import akka.actor.typed.ActorSystem @@ -85,19 +86,18 @@ private[r2dbc] object SqlServerDialect extends Dialect { .build()) } - override def createJournalDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): JournalDao = - new SqlServerJournalDao(settings, executorProvider)(system.executionContext, system) + override def daoExecutionContext(settings: R2dbcSettings, system: ActorSystem[_]): ExecutionContext = + system.executionContext - override def createQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): QueryDao = - new SqlServerQueryDao(settings, executorProvider)(system.executionContext, system) + override def createJournalDao(executorProvider: R2dbcExecutorProvider): JournalDao = + new SqlServerJournalDao(executorProvider) - override def createSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): SnapshotDao = - new SqlServerSnapshotDao(settings, executorProvider)(system.executionContext, system) + override def createQueryDao(executorProvider: R2dbcExecutorProvider): QueryDao = + new SqlServerQueryDao(executorProvider) - override def createDurableStateDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - system: ActorSystem[_]): DurableStateDao = - new SqlServerDurableStateDao(settings, executorProvider, this)(system.executionContext, system) + override def createSnapshotDao(executorProvider: R2dbcExecutorProvider): SnapshotDao = + new SqlServerSnapshotDao(executorProvider) + + override def createDurableStateDao(executorProvider: R2dbcExecutorProvider): DurableStateDao = + new SqlServerDurableStateDao(executorProvider, this) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala index 43da199f..f0ed6ac5 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala @@ -7,14 +7,11 @@ package akka.persistence.r2dbc.internal.sqlserver import java.time.Instant import scala.collection.immutable -import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration -import akka.actor.typed.ActorSystem import akka.annotation.InternalApi -import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.Dialect import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement @@ -39,11 +36,8 @@ private[r2dbc] object SqlServerDurableStateDao { * INTERNAL API */ @InternalApi -private[r2dbc] class SqlServerDurableStateDao( - settings: R2dbcSettings, - executorProvider: R2dbcExecutorProvider, - dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_]) - extends PostgresDurableStateDao(settings, executorProvider, dialect) { +private[r2dbc] class SqlServerDurableStateDao(executorProvider: R2dbcExecutorProvider, dialect: Dialect) + extends PostgresDurableStateDao(executorProvider, dialect) { import settings.codecSettings.DurableStateImplicits._ require(settings.useAppTimestamp, "SqlServer requires akka.persistence.r2dbc.use-app-timestamp=on") diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala index 3ea0ee12..0ef3b304 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala @@ -4,21 +4,16 @@ package akka.persistence.r2dbc.internal.sqlserver -import scala.concurrent.ExecutionContext - -import akka.actor.typed.ActorSystem -import akka.annotation.InternalApi -import akka.persistence.r2dbc.R2dbcSettings -import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter -import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement -import akka.persistence.r2dbc.internal.postgres.PostgresJournalDao -import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory +import akka.annotation.InternalApi import akka.persistence.r2dbc.internal.InstantFactory import akka.persistence.r2dbc.internal.R2dbcExecutorProvider +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter +import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement +import akka.persistence.r2dbc.internal.postgres.PostgresJournalDao /** * INTERNAL API @@ -33,10 +28,8 @@ private[r2dbc] object SqlServerJournalDao { * INTERNAL API */ @InternalApi -private[r2dbc] class SqlServerJournalDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - ec: ExecutionContext, - system: ActorSystem[_]) - extends PostgresJournalDao(settings, executorProvider) { +private[r2dbc] class SqlServerJournalDao(executorProvider: R2dbcExecutorProvider) + extends PostgresJournalDao(executorProvider) { import settings.codecSettings.JournalImplicits._ require(settings.useAppTimestamp, "SqlServer requires akka.persistence.r2dbc.use-app-timestamp=on") diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala index 8256a3c2..44f8b575 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala @@ -6,24 +6,20 @@ package akka.persistence.r2dbc.internal.sqlserver import java.time.Instant -import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration -import akka.actor.typed.ActorSystem -import akka.annotation.InternalApi -import akka.persistence.r2dbc.R2dbcSettings -import akka.persistence.r2dbc.internal.InstantFactory -import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter -import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement -import akka.persistence.r2dbc.internal.postgres.PostgresQueryDao -import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory +import akka.annotation.InternalApi +import akka.persistence.r2dbc.internal.InstantFactory import akka.persistence.r2dbc.internal.R2dbcExecutorProvider +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter +import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement +import akka.persistence.r2dbc.internal.postgres.PostgresQueryDao /** * INTERNAL API @@ -38,10 +34,8 @@ private[r2dbc] object SqlServerQueryDao { * INTERNAL API */ @InternalApi -private[r2dbc] class SqlServerQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - ec: ExecutionContext, - system: ActorSystem[_]) - extends PostgresQueryDao(settings, executorProvider) { +private[r2dbc] class SqlServerQueryDao(executorProvider: R2dbcExecutorProvider) + extends PostgresQueryDao(executorProvider) { import settings.codecSettings.JournalImplicits._ override def sqlFalse = "0" diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala index 4c574ef5..3fb152fc 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala @@ -6,26 +6,23 @@ package akka.persistence.r2dbc.internal.sqlserver import java.time.Instant -import scala.concurrent.ExecutionContext import scala.concurrent.Future -import akka.actor.typed.ActorSystem +import io.r2dbc.spi.Statement +import org.slf4j.Logger +import org.slf4j.LoggerFactory + import akka.annotation.InternalApi import akka.persistence.SnapshotSelectionCriteria -import akka.persistence.r2dbc.R2dbcSettings -import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement +import akka.persistence.r2dbc.internal.InstantFactory +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider import akka.persistence.r2dbc.internal.SnapshotDao.SerializedSnapshotMetadata import akka.persistence.r2dbc.internal.SnapshotDao.SerializedSnapshotRow import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter +import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.r2dbc.internal.postgres.PostgresSnapshotDao -import io.r2dbc.spi.Statement -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -import akka.persistence.r2dbc.internal.InstantFactory -import akka.persistence.r2dbc.internal.R2dbcExecutorProvider /** * INTERNAL API @@ -39,10 +36,8 @@ private[r2dbc] object SqlServerSnapshotDao { * INTERNAL API */ @InternalApi -private[r2dbc] class SqlServerSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit - ec: ExecutionContext, - system: ActorSystem[_]) - extends PostgresSnapshotDao(settings, executorProvider) { +private[r2dbc] class SqlServerSnapshotDao(executorProvider: R2dbcExecutorProvider) + extends PostgresSnapshotDao(executorProvider) { import settings.codecSettings.SnapshotImplicits._ override def log: Logger = SqlServerSnapshotDao.log diff --git a/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala index 5edbe1c1..6b80b20a 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala @@ -95,9 +95,14 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends log.debug("R2DBC journal starting up with dialect [{}]", settings.dialectName) private val executorProvider = - new R2dbcExecutorProvider(settings, sharedConfigPath + ".connection-factory", LoggerFactory.getLogger(getClass)) + new R2dbcExecutorProvider( + system, + settings.connectionFactorySettings.dialect.daoExecutionContext(settings, system), + settings, + sharedConfigPath + ".connection-factory", + LoggerFactory.getLogger(getClass)) private val journalDao = - settings.connectionFactorySettings.dialect.createJournalDao(settings, executorProvider) + settings.connectionFactorySettings.dialect.createJournalDao(executorProvider) private val query = PersistenceQuery(system).readJournalFor[R2dbcReadJournal](sharedConfigPath + ".query") private val pubSub: Option[PubSub] = diff --git a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala index 0c340d1c..3e9849b2 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala @@ -94,15 +94,18 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat private val serialization = SerializationExtension(system) private val persistenceExt = Persistence(system) private val executorProvider = - new R2dbcExecutorProvider(settings, sharedConfigPath + ".connection-factory", LoggerFactory.getLogger(getClass))( - typedSystem.executionContext, - typedSystem) + new R2dbcExecutorProvider( + typedSystem, + settings.connectionFactorySettings.dialect.daoExecutionContext(settings, typedSystem), + settings, + sharedConfigPath + ".connection-factory", + LoggerFactory.getLogger(getClass)) private val journalDao = - settings.connectionFactorySettings.dialect.createJournalDao(settings, executorProvider)(typedSystem) + settings.connectionFactorySettings.dialect.createJournalDao(executorProvider) private val queryDao = - settings.connectionFactorySettings.dialect.createQueryDao(settings, executorProvider)(typedSystem) + settings.connectionFactorySettings.dialect.createQueryDao(executorProvider) private lazy val snapshotDao = - settings.connectionFactorySettings.dialect.createSnapshotDao(settings, executorProvider)(typedSystem) + settings.connectionFactorySettings.dialect.createSnapshotDao(executorProvider) private val filteredPayloadSerId = SerializationExtension(system).findSerializerFor(FilteredPayload).identifier diff --git a/core/src/main/scala/akka/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala b/core/src/main/scala/akka/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala index 6bcce1e2..d87aa987 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala @@ -60,9 +60,14 @@ private[r2dbc] final class R2dbcSnapshotStore(cfg: Config, cfgPath: String) exte log.debug("R2DBC snapshot store starting up with dialect [{}]", settings.dialectName) private val executorProvider = - new R2dbcExecutorProvider(settings, sharedConfigPath + ".connection-factory", LoggerFactory.getLogger(getClass)) - private val dao = settings.connectionFactorySettings.dialect.createSnapshotDao(settings, executorProvider) - private val queryDao = settings.connectionFactorySettings.dialect.createQueryDao(settings, executorProvider) + new R2dbcExecutorProvider( + system, + settings.connectionFactorySettings.dialect.daoExecutionContext(settings, system), + settings, + sharedConfigPath + ".connection-factory", + LoggerFactory.getLogger(getClass)) + private val dao = settings.connectionFactorySettings.dialect.createSnapshotDao(executorProvider) + private val queryDao = settings.connectionFactorySettings.dialect.createQueryDao(executorProvider) def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = dao diff --git a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala index c8473758..b815e05d 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala @@ -73,11 +73,14 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg private val persistenceExt = Persistence(system) // FIXME maybe this is using the wrong executionContext, H2Dialect is using another dispatcher? private val executorProvider = - new R2dbcExecutorProvider(settings, sharedConfigPath + ".connection-factory", LoggerFactory.getLogger(getClass))( - typedSystem.executionContext, - typedSystem) + new R2dbcExecutorProvider( + typedSystem, + settings.connectionFactorySettings.dialect.daoExecutionContext(settings, typedSystem), + settings, + sharedConfigPath + ".connection-factory", + LoggerFactory.getLogger(getClass)) private val stateDao = - settings.connectionFactorySettings.dialect.createDurableStateDao(settings, executorProvider)(typedSystem) + settings.connectionFactorySettings.dialect.createDurableStateDao(executorProvider) private val changeEventWriterUuid = UUID.randomUUID().toString private val pubSub: Option[PubSub] = diff --git a/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala b/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala index 429e1680..22ec6d12 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala @@ -32,9 +32,12 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite => R2dbcSettings(typedSystem.settings.config.getConfig(testConfigPath)) lazy val r2dbcExecutorProvider: R2dbcExecutorProvider = - new R2dbcExecutorProvider(r2dbcSettings, testConfigPath + ".connection-factory", LoggerFactory.getLogger(getClass))( - typedSystem.executionContext, - typedSystem) + new R2dbcExecutorProvider( + typedSystem, + r2dbcSettings.connectionFactorySettings.dialect.daoExecutionContext(r2dbcSettings, typedSystem), + r2dbcSettings, + testConfigPath + ".connection-factory", + LoggerFactory.getLogger(getClass)) def r2dbcExecutor(slice: Int): R2dbcExecutor = r2dbcExecutorProvider.executorFor(slice) diff --git a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala index ef2c2f43..0ff25b84 100644 --- a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala +++ b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala @@ -126,18 +126,20 @@ class MigrationTool(system: ActorSystem[_]) { private val serialization: Serialization = SerializationExtension(system) private val targetExecutorProvider = new R2dbcExecutorProvider( + system, + targetR2dbcSettings.connectionFactorySettings.dialect.daoExecutionContext(targetR2dbcSettings, system), targetR2dbcSettings, targetPluginId + ".connection-factory", LoggerFactory.getLogger(getClass)) private val targetJournalDao = - targetR2dbcSettings.connectionFactorySettings.dialect.createJournalDao(targetR2dbcSettings, targetExecutorProvider) + targetR2dbcSettings.connectionFactorySettings.dialect.createJournalDao(targetExecutorProvider) private val targetSnapshotDao = targetR2dbcSettings.connectionFactorySettings.dialect - .createSnapshotDao(targetR2dbcSettings, targetExecutorProvider) + .createSnapshotDao(targetExecutorProvider) private val targetDurableStateDao = targetR2dbcSettings.connectionFactorySettings.dialect - .createDurableStateDao(targetR2dbcSettings, targetExecutorProvider) + .createDurableStateDao(targetExecutorProvider) private val targetBatch = migrationConfig.getInt("target.batch")