Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Dao ExecutionContext in R2dbcExecutorProvider #518

Merged
merged 1 commit into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,13 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf
private val settings = R2dbcSettings(system.settings.config.getConfig(sharedConfigPath))

private val executorProvider =
new R2dbcExecutorProvider(settings, sharedConfigPath + ".connection-factory", LoggerFactory.getLogger(getClass))
private val stateDao = settings.connectionFactorySettings.dialect.createDurableStateDao(settings, executorProvider)
new R2dbcExecutorProvider(
system,
settings.connectionFactorySettings.dialect.daoExecutionContext(settings, system),
settings,
sharedConfigPath + ".connection-factory",
LoggerFactory.getLogger(getClass))
private val stateDao = settings.connectionFactorySettings.dialect.createDurableStateDao(executorProvider)

/**
* Delete the state related to one single `persistenceId`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,14 @@ final class EventSourcedCleanup(systemProvider: ClassicActorSystemProvider, conf
private val settings = R2dbcSettings(system.settings.config.getConfig(sharedConfigPath))

private val executorProvider =
new R2dbcExecutorProvider(settings, sharedConfigPath + ".connection-factory", LoggerFactory.getLogger(getClass))
private val journalDao = settings.connectionFactorySettings.dialect.createJournalDao(settings, executorProvider)
private val snapshotDao = settings.connectionFactorySettings.dialect.createSnapshotDao(settings, executorProvider)
new R2dbcExecutorProvider(
system,
settings.connectionFactorySettings.dialect.daoExecutionContext(settings, system),
settings,
sharedConfigPath + ".connection-factory",
LoggerFactory.getLogger(getClass))
private val journalDao = settings.connectionFactorySettings.dialect.createJournalDao(executorProvider)
private val snapshotDao = settings.connectionFactorySettings.dialect.createSnapshotDao(executorProvider)

/**
* Delete all events before a sequenceNr for the given persistence id. Snapshots are not deleted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,15 @@ private[r2dbc] trait Dialect {
*/
def adaptSettings(settings: R2dbcSettings): R2dbcSettings = settings

def daoExecutionContext(settings: R2dbcSettings, system: ActorSystem[_]): ExecutionContext

def createConnectionFactory(config: Config): ConnectionFactory

def createJournalDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): JournalDao
def createJournalDao(executorProvider: R2dbcExecutorProvider): JournalDao

def createQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): QueryDao
def createQueryDao(executorProvider: R2dbcExecutorProvider): QueryDao

def createSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): SnapshotDao
def createSnapshotDao(executorProvider: R2dbcExecutorProvider): SnapshotDao

def createDurableStateDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): DurableStateDao
def createDurableStateDao(executorProvider: R2dbcExecutorProvider): DurableStateDao
}
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,11 @@ class R2dbcExecutor(
* INTERNAL API
*/
@InternalStableApi class R2dbcExecutorProvider(
val system: ActorSystem[_],
val ec: ExecutionContext,
val settings: R2dbcSettings,
connectionFactoryBaseConfigPath: String,
log: Logger)(implicit ec: ExecutionContext, system: ActorSystem[_]) {
log: Logger) {
private val connectionFactoryProvider = ConnectionFactoryProvider(system)
private var cache = IntMap.empty[R2dbcExecutor]

Expand All @@ -409,7 +411,7 @@ class R2dbcExecutor(
connectionFactory,
log,
settings.logDbCallsExceeding,
settings.connectionFactorySettings.poolSettings.closeCallsExceeding)
settings.connectionFactorySettings.poolSettings.closeCallsExceeding)(ec, system)
// it's just a cache so no need for guarding concurrent updates or visibility
cache = cache.updated(slice, executor)
executor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,28 +87,24 @@ private[r2dbc] object H2Dialect extends Dialect {
new H2ConnectionFactory(h2Config)
}

override def createJournalDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): JournalDao =
new H2JournalDao(settings, executorProvider)(ecForDaos(system, settings), system)

override def createSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): SnapshotDao =
new H2SnapshotDao(settings, executorProvider)(ecForDaos(system, settings), system)

override def createQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): QueryDao =
new H2QueryDao(settings, executorProvider)(ecForDaos(system, settings), system)

override def createDurableStateDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): DurableStateDao =
new H2DurableStateDao(settings, executorProvider, this)(ecForDaos(system, settings), system)

private def ecForDaos(system: ActorSystem[_], settings: R2dbcSettings): ExecutionContext = {
override def daoExecutionContext(settings: R2dbcSettings, system: ActorSystem[_]): ExecutionContext = {
// H2 R2DBC driver blocks in surprising places (Mono.toFuture in stmt.execute().asFuture())
system.dispatchers.lookup(
DispatcherSelector.fromConfig(settings.connectionFactorySettings.config.getString("use-dispatcher")))
}

override def createJournalDao(executorProvider: R2dbcExecutorProvider): JournalDao =
new H2JournalDao(executorProvider)

override def createSnapshotDao(executorProvider: R2dbcExecutorProvider): SnapshotDao =
new H2SnapshotDao(executorProvider)

override def createQueryDao(executorProvider: R2dbcExecutorProvider): QueryDao =
new H2QueryDao(executorProvider)

override def createDurableStateDao(executorProvider: R2dbcExecutorProvider): DurableStateDao =
new H2DurableStateDao(executorProvider, this)

private def dbSchema(config: Config, createSliceIndexes: Boolean, additionalInit: String): String = {
def optionalConfString(name: String): Option[String] = {
val s = config.getString(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@

package akka.persistence.r2dbc.internal.h2

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration

import org.slf4j.Logger
import org.slf4j.LoggerFactory

import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.Dialect
import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao
Expand All @@ -22,11 +19,8 @@ import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao
* INTERNAL API
*/
@InternalApi
private[r2dbc] final class H2DurableStateDao(
settings: R2dbcSettings,
executorProvider: R2dbcExecutorProvider,
dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_])
extends PostgresDurableStateDao(settings, executorProvider, dialect) {
private[r2dbc] final class H2DurableStateDao(executorProvider: R2dbcExecutorProvider, dialect: Dialect)
extends PostgresDurableStateDao(executorProvider, dialect) {

override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2DurableStateDao])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,37 @@

package akka.persistence.r2dbc.internal.h2

import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.JournalDao
import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.postgres.PostgresJournalDao
import io.r2dbc.spi.Statement
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Instant

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import io.r2dbc.spi.Connection
import io.r2dbc.spi.Statement
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.persistence.r2dbc.internal.JournalDao
import akka.persistence.r2dbc.internal.R2dbcExecutor
import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement
import akka.persistence.r2dbc.internal.postgres.PostgresJournalDao

/**
* INTERNAL API
*/
@InternalApi
private[r2dbc] class H2JournalDao(journalSettings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
ec: ExecutionContext,
system: ActorSystem[_])
extends PostgresJournalDao(journalSettings, executorProvider) {
private[r2dbc] class H2JournalDao(executorProvider: R2dbcExecutorProvider)
extends PostgresJournalDao(executorProvider) {
import settings.codecSettings.JournalImplicits._

import JournalDao.SerializedJournalRow
import journalSettings.codecSettings.JournalImplicits._
override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2JournalDao])
// always app timestamp (db is same process) monotonic increasing
require(journalSettings.useAppTimestamp)
require(journalSettings.dbTimestampMonotonicIncreasing)
require(settings.useAppTimestamp)
require(settings.dbTimestampMonotonicIncreasing)

private def insertSql(slice: Int) = sql"INSERT INTO ${journalTable(slice)} " +
"(slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, tags, meta_ser_id, meta_ser_manifest, meta_payload, db_timestamp) " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,22 @@

package akka.persistence.r2dbc.internal.h2

import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.postgres.PostgresQueryDao
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.Row
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration

import org.slf4j.Logger
import org.slf4j.LoggerFactory

import akka.annotation.InternalApi
import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.postgres.PostgresQueryDao

/**
* INTERNAL API
*/
@InternalApi
private[r2dbc] class H2QueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
ec: ExecutionContext,
system: ActorSystem[_])
extends PostgresQueryDao(settings, executorProvider) {
private[r2dbc] class H2QueryDao(executorProvider: R2dbcExecutorProvider) extends PostgresQueryDao(executorProvider) {
import settings.codecSettings.JournalImplicits._
override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2QueryDao])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,20 @@

package akka.persistence.r2dbc.internal.h2

import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.postgres.PostgresSnapshotDao
import io.r2dbc.spi.ConnectionFactory
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext

import io.r2dbc.spi.Row

import akka.annotation.InternalApi
import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.postgres.PostgresSnapshotDao

/**
* INTERNAL API
*/
@InternalApi
private[r2dbc] final class H2SnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
ec: ExecutionContext,
system: ActorSystem[_])
extends PostgresSnapshotDao(settings, executorProvider) {
private[r2dbc] final class H2SnapshotDao(executorProvider: R2dbcExecutorProvider)
extends PostgresSnapshotDao(executorProvider) {
import settings.codecSettings.SnapshotImplicits._

override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2SnapshotDao])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package akka.persistence.r2dbc.internal.postgres
import java.time.{ Duration => JDuration }
import java.util.Locale

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

import akka.actor.typed.ActorSystem
Expand Down Expand Up @@ -117,19 +118,18 @@ private[r2dbc] object PostgresDialect extends Dialect {
ConnectionFactories.get(builder.build())
}

override def createJournalDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): JournalDao =
new PostgresJournalDao(settings, executorProvider)(system.executionContext, system)
override def daoExecutionContext(settings: R2dbcSettings, system: ActorSystem[_]): ExecutionContext =
system.executionContext

override def createSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): SnapshotDao =
new PostgresSnapshotDao(settings, executorProvider)(system.executionContext, system)
override def createJournalDao(executorProvider: R2dbcExecutorProvider): JournalDao =
new PostgresJournalDao(executorProvider)

override def createQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): QueryDao =
new PostgresQueryDao(settings, executorProvider)(system.executionContext, system)
override def createSnapshotDao(executorProvider: R2dbcExecutorProvider): SnapshotDao =
new PostgresSnapshotDao(executorProvider)

override def createDurableStateDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): DurableStateDao =
new PostgresDurableStateDao(settings, executorProvider, this)(system.executionContext, system)
override def createQueryDao(executorProvider: R2dbcExecutorProvider): QueryDao =
new PostgresQueryDao(executorProvider)

override def createDurableStateDao(executorProvider: R2dbcExecutorProvider): DurableStateDao =
new PostgresDurableStateDao(executorProvider, this)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal

import io.r2dbc.spi.Connection
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.R2dbcDataIntegrityViolationException
import io.r2dbc.spi.Row
import io.r2dbc.spi.Statement
Expand Down Expand Up @@ -79,11 +78,11 @@ private[r2dbc] object PostgresDurableStateDao {
* INTERNAL API
*/
@InternalApi
private[r2dbc] class PostgresDurableStateDao(
settings: R2dbcSettings,
executorProvider: R2dbcExecutorProvider,
dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_])
private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProvider, dialect: Dialect)
extends DurableStateDao {
protected val settings: R2dbcSettings = executorProvider.settings
protected val system: ActorSystem[_] = executorProvider.system
implicit protected val ec: ExecutionContext = executorProvider.ec
import DurableStateDao._
import PostgresDurableStateDao._
import settings.codecSettings.DurableStateImplicits._
Expand All @@ -93,7 +92,7 @@ private[r2dbc] class PostgresDurableStateDao(
protected val r2dbcExecutor = executorProvider.executorFor(slice = 0) // FIXME support data partitions

// used for change events
private lazy val journalDao: JournalDao = dialect.createJournalDao(settings, executorProvider)
private lazy val journalDao: JournalDao = dialect.createJournalDao(executorProvider)

private lazy val additionalColumns: Map[String, immutable.IndexedSeq[AdditionalColumn[Any, Any]]] = {
settings.durableStateAdditionalColumnClasses.map { case (entityType, columnClasses) =>
Expand Down Expand Up @@ -487,7 +486,7 @@ private[r2dbc] class PostgresDurableStateDao(
handler: ChangeHandler[Any],
connection: Connection,
change: DurableStateChange[Any]): Future[Done] = {
val session = new R2dbcSession(connection)
val session = new R2dbcSession(connection)(ec, system)

def excMessage(cause: Throwable): String = {
val (changeType, revision) = change match {
Expand Down
Loading
Loading