From 5ae854cf73623e0083eda4a4cd441e44c28cb0cc Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 31 Jan 2024 16:06:58 +0100 Subject: [PATCH] partition over multiple databases --- .github/workflows/build-test.yml | 7 +- core/src/main/resources/reference.conf | 41 ++-- .../persistence/r2dbc/R2dbcSettings.scala | 156 +++++++++----- .../scaladsl/DurableStateCleanup.scala | 12 +- .../scaladsl/EventSourcedCleanup.scala | 15 +- .../r2dbc/internal/BySliceQuery.scala | 4 +- .../persistence/r2dbc/internal/Dialect.scala | 8 +- .../r2dbc/internal/R2dbcExecutor.scala | 27 +++ .../r2dbc/internal/h2/H2Dialect.scala | 25 ++- .../r2dbc/internal/h2/H2DurableStateDao.scala | 6 +- .../r2dbc/internal/h2/H2JournalDao.scala | 11 +- .../r2dbc/internal/h2/H2QueryDao.scala | 7 +- .../r2dbc/internal/h2/H2SnapshotDao.scala | 6 +- .../internal/postgres/PostgresDialect.scala | 18 +- .../postgres/PostgresDurableStateDao.scala | 13 +- .../postgres/PostgresJournalDao.scala | 36 ++-- .../internal/postgres/PostgresQueryDao.scala | 53 ++--- .../postgres/PostgresSnapshotDao.scala | 11 +- .../internal/postgres/YugabyteDialect.scala | 18 +- .../postgres/YugabyteDurableStateDao.scala | 6 +- .../internal/postgres/YugabyteQueryDao.scala | 6 +- .../postgres/YugabyteSnapshotDao.scala | 7 +- .../internal/sqlserver/SqlServerDialect.scala | 18 +- .../sqlserver/SqlServerDurableStateDao.scala | 8 +- .../sqlserver/SqlServerJournalDao.scala | 5 +- .../sqlserver/SqlServerQueryDao.scala | 16 +- .../sqlserver/SqlServerSnapshotDao.scala | 8 +- .../r2dbc/journal/R2dbcJournal.scala | 19 +- .../query/scaladsl/R2dbcReadJournal.scala | 18 +- .../r2dbc/session/scaladsl/R2dbcSession.scala | 1 + .../r2dbc/snapshot/R2dbcSnapshotStore.scala | 15 +- .../scaladsl/R2dbcDurableStateStore.scala | 19 +- .../application-postgres-data-partitions.conf | 12 ++ .../akka/persistence/r2dbc/PayloadSpec.scala | 2 +- .../persistence/r2dbc/R2dbcSettingsSpec.scala | 194 +++++++++++++++--- .../persistence/r2dbc/TestDbLifecycle.scala | 31 +-- .../H2AdditionalInitForSchemaSpec.scala | 3 + .../r2dbc/internal/R2dbcExecutorSpec.scala | 2 + .../journal/PersistSerializedEventSpec.scala | 2 +- .../r2dbc/journal/PersistTagsSpec.scala | 8 +- .../r2dbc/journal/PersistTimestampSpec.scala | 8 +- .../query/EventsBySliceBacktrackingSpec.scala | 2 +- ...eStateUpdateWithChangeEventStoreSpec.scala | 51 ++--- ...s_4.sql => create_tables_postgres_0-1.sql} | 50 +---- ddl-scripts/create_tables_postgres_2-3.sql | 90 ++++++++ docker/docker-compose-postgres-2.yml | 30 +++ .../r2dbc/migration/MigrationTool.scala | 21 +- .../r2dbc/migration/MigrationToolDao.scala | 16 +- 48 files changed, 751 insertions(+), 391 deletions(-) create mode 100644 core/src/test/resources/application-postgres-data-partitions.conf rename ddl-scripts/{create_tables_postgres_4.sql => create_tables_postgres_0-1.sql} (66%) create mode 100644 ddl-scripts/create_tables_postgres_2-3.sql create mode 100644 docker/docker-compose-postgres-2.yml diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 22dc1d57..8eb285f7 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -138,13 +138,14 @@ jobs: - name: Start DB run: |- - docker compose -f docker/docker-compose-postgres.yml up --wait - docker exec -i postgres-db psql -U postgres -t < ddl-scripts/create_tables_postgres_4.sql + docker compose -f docker/docker-compose-postgres-2.yml up --wait + docker exec -i postgres-db-0 psql -U postgres -t < ddl-scripts/create_tables_postgres_0-1.sql + docker exec -i 
postgres-db-1 psql -U postgres -t < ddl-scripts/create_tables_postgres_2-3.sql

       - name: sbt test
         run: |-
           cp .jvmopts-ci .jvmopts
-          sbt -Dakka.persistence.r2dbc.journal.table-data-partitions=4 test
+          sbt -Dconfig.resource=application-postgres-data-partitions.conf test

   test-yugabyte:
     name: Run tests with Yugabyte
diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf
index 01e7172f..13b1d938 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -7,18 +7,6 @@ akka.persistence.r2dbc {
     # name of the table to use for events
     table = "event_journal"

-    # Number of tables that the journal will be split into. The selection of data partition is
-    # made from the slice of the persistenceId. Must be between 1 and 1024 and a whole number
-    # divisor of 1024 (number of slices).
-    # For example, 4 data-partitions means that slice range (0 to 255) maps to data partition 0,
-    # (256 to 511) to data partition 1, (512 to 767) to data partition 3, and (768 to 1023) to
-    # data partition 3.
-    # The event_journal tables will have the data partition as suffix, e.g. _0, _1, _2, _3.
-    # When data-partitions is 1 there will only be one journal table, without suffix.
-    # This configuration cannot be changed in a rolling update, since the data must be moved
-    # between the tables if number of data partitions is changed.
-    table-data-partitions = 1
-
     # the column type to use for event payloads (BYTEA or JSONB)
     payload-column-type = "BYTEA"

@@ -195,6 +183,33 @@ akka.persistence.r2dbc {
   }
 }

+// #data-partition-settings
+# Number of tables and databases that the data will be split into. The selection of data
+# partition is made from the slice of the persistenceId.
+# For example, 4 data partitions means that slice range (0 to 255) maps to data partition 0,
+# (256 to 511) to data partition 1, (512 to 767) to data partition 2, and (768 to 1023) to
+# data partition 3.
+# This configuration cannot be changed in a rolling update, since the data must be moved
+# between the tables if the number of data partitions is changed.
+akka.persistence.r2dbc.data-partition {
+  # How many tables the data will be partitioned over. The tables will have
+  # the data partition as suffix, e.g. event_journal_0, event_journal_1.
+  # Must be between 1 and 1024 and a whole number divisor of 1024 (number of slices).
+  # When number-of-partitions is 1 the table name is without suffix.
+  number-of-partitions = 1
+  # How many databases the tables will be partitioned over. A database corresponds to a connection
+  # factory with its own connection pool.
+  # Must be a whole number divisor of number-of-partitions, and less than or equal to number-of-partitions.
+  # For example, number-of-partitions=8 and number-of-databases=2 means that there will be a total of
+  # 8 tables in 2 databases, i.e. 4 tables in each database.
+  # The connection-factory setting will have the data partition range as suffix, e.g. with 8 data partitions and
+  # 2 databases the connection factory settings are connection-factory-0-3 and connection-factory-4-7.
+  # When number-of-databases is 1 there will only be one connection factory, without suffix.
+  # number-of-databases > 1 is not supported by H2.
+ number-of-databases = 1 +} +// #data-partition-settings + // #connection-settings akka.persistence.r2dbc { @@ -365,9 +380,9 @@ akka.persistence.r2dbc { # 'akka.persistence.r2dbc' these have to be re-pointed to the actual config location schema = ${akka.persistence.r2dbc.schema} journal-table = ${akka.persistence.r2dbc.journal.table} - journal-table-data-partitions = ${akka.persistence.r2dbc.journal.table-data-partitions} state-table = ${akka.persistence.r2dbc.state.table} snapshot-table = ${akka.persistence.r2dbc.snapshot.table} + number-of-partitions = ${akka.persistence.r2dbc.data-partition.number-of-partitions} // #connection-settings-h2 } diff --git a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala index def87422..c6b29a2d 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala @@ -4,6 +4,7 @@ package akka.persistence.r2dbc +import scala.collection.immutable import akka.annotation.InternalApi import akka.annotation.InternalStableApi import akka.persistence.r2dbc.internal.codec.IdentityAdapter @@ -37,18 +38,10 @@ object R2dbcSettings { "see akka-persistence-r2dbc documentation for details on the new configuration scheme: " + "https://doc.akka.io/docs/akka-persistence-r2dbc/current/migration-guide.html") } - if (!config.hasPath("connection-factory.dialect")) { - throw new IllegalArgumentException( - "The Akka Persistence R2DBC database config scheme has changed, the config needs to be updated " + - "to choose database dialect using the connection-factory block, " + - "see akka-persistence-r2dbc documentation for details on the new configuration scheme: " + - "https://doc.akka.io/docs/akka-persistence-r2dbc/current/migration-guide.html") - } val schema: Option[String] = Option(config.getString("schema")).filterNot(_.trim.isEmpty) val journalTable: String = config.getString("journal.table") - val journalTableDataPartitions = config.getInt("journal.table-data-partitions") def useJsonPayload(prefix: String) = config.getString(s"$prefix.payload-column-type").toUpperCase match { case "BYTEA" => false @@ -90,13 +83,51 @@ object R2dbcSettings { val durableStateAssertSingleWriter: Boolean = config.getBoolean("state.assert-single-writer") - val connectionFactorySettings = ConnectionFactorySettings(config.getConfig("connection-factory")) + val numberOfDataPartitions = config.getInt("data-partition.number-of-partitions") + require( + 1 <= numberOfDataPartitions && numberOfDataPartitions <= NumberOfSlices, + s"data-partition.number-of-partitions [$numberOfDataPartitions] must be between 1 and $NumberOfSlices") + require( + numberOfDataPartitions * (NumberOfSlices / numberOfDataPartitions) == NumberOfSlices, + s"data-partition.number-of-partitions [$numberOfDataPartitions] must be a whole number divisor of " + + s"numberOfSlices [$NumberOfSlices].") + + val numberOfDatabases = config.getInt("data-partition.number-of-databases") + require( + 1 <= numberOfDatabases && numberOfDatabases <= numberOfDataPartitions, + s"data-partition.number-of-databases [$numberOfDatabases] must be between 1 and $numberOfDataPartitions") + require( + numberOfDatabases * (numberOfDataPartitions / numberOfDatabases) == numberOfDataPartitions, + s"data-partition.number-of-databases [$numberOfDatabases] must be a whole number divisor of " + + s"data-partition.number-of-partitions [$numberOfDataPartitions].") + + val connectionFactorySettings = + if 
(numberOfDatabases == 1) { + if (!config.hasPath("connection-factory.dialect")) { + throw new IllegalArgumentException( + "The Akka Persistence R2DBC database config scheme has changed, the config needs to be updated " + + "to choose database dialect using the connection-factory block, " + + "see akka-persistence-r2dbc documentation for details on the new configuration scheme: " + + "https://doc.akka.io/docs/akka-persistence-r2dbc/current/migration-guide.html") + } + Vector(ConnectionFactorySettings(config.getConfig("connection-factory"))) + } else { + val rangeSize = numberOfDataPartitions / numberOfDatabases + (0 until numberOfDatabases).map { i => + val configPropertyName = s"connection-factory-${i * rangeSize}-${i * rangeSize + rangeSize - 1}" + ConnectionFactorySettings(config.getConfig(configPropertyName)) + } + } + + require( + connectionFactorySettings.map(_.dialect.name).toSet.size == 1, + s"All dialects for the [${connectionFactorySettings.size}] database partitions must be the same.") val (tagsCodec: TagsCodec, timestampCodec: TimestampCodec, queryAdapter: QueryAdapter) = { - connectionFactorySettings.dialect.name match { + connectionFactorySettings(0).dialect.name match { case "sqlserver" => ( - new TagsCodec.SqlServerTagsCodec(connectionFactorySettings.config), + new TagsCodec.SqlServerTagsCodec(connectionFactorySettings(0).config), TimestampCodec.SqlServerTimestampCodec, SqlServerQueryAdapter) case "h2" => (TagsCodec.H2TagsCodec, TimestampCodec.H2TimestampCodec, IdentityAdapter) @@ -120,7 +151,6 @@ object R2dbcSettings { val settingsFromConfig = new R2dbcSettings( schema, journalTable, - journalTableDataPartitions, journalPayloadCodec, journalPublishEvents, snapshotsTable, @@ -139,7 +169,8 @@ object R2dbcSettings { durableStateTableByEntityType, durableStateAdditionalColumnClasses, durableStateChangeHandlerClasses, - useAppTimestamp) + useAppTimestamp, + numberOfDataPartitions) // let the dialect trump settings that does not make sense for it settingsFromConfig.connectionFactorySettings.dialect.adaptSettings(settingsFromConfig) @@ -149,6 +180,7 @@ object R2dbcSettings { import akka.util.ccompat.JavaConverters._ cfg.root.unwrapped.asScala.toMap.map { case (k, v) => k -> v.toString } } + } /** @@ -158,7 +190,6 @@ object R2dbcSettings { final class R2dbcSettings private ( val schema: Option[String], val journalTable: String, - val journalTableDataPartitions: Int, val journalPayloadCodec: PayloadCodec, val journalPublishEvents: Boolean, val snapshotsTable: String, @@ -173,53 +204,51 @@ final class R2dbcSettings private ( val querySettings: QuerySettings, val dbTimestampMonotonicIncreasing: Boolean, val cleanupSettings: CleanupSettings, - _connectionFactorySettings: ConnectionFactorySettings, + _connectionFactorySettings: immutable.IndexedSeq[ConnectionFactorySettings], _durableStateTableByEntityType: Map[String, String], _durableStateAdditionalColumnClasses: Map[String, immutable.IndexedSeq[String]], _durableStateChangeHandlerClasses: Map[String, String], - _useAppTimestamp: Boolean) { + _useAppTimestamp: Boolean, + val numberOfDataPartitions: Int) { import R2dbcSettings.NumberOfSlices - require( - 0 <= journalTableDataPartitions && journalTableDataPartitions <= NumberOfSlices, - s"journalTableDataPartitions must be between 1 and $NumberOfSlices") - require( - journalTableDataPartitions * NumberOfSlices / journalTableDataPartitions == NumberOfSlices, - s"journalTableDataPartitions [$journalTableDataPartitions] must be a whole number divisor of numberOfSlices [$NumberOfSlices].") 
-
   private val journalTableWithSchema: String = schema.map(_ + ".").getOrElse("") + journalTable
   val snapshotsTableWithSchema: String = schema.map(_ + ".").getOrElse("") + snapshotsTable
   val durableStateTableWithSchema: String = schema.map(_ + ".").getOrElse("") + durableStateTable

   def journalTableWithSchema(slice: Int): String = {
-    if (journalTableDataPartitions == 1) {
+    if (numberOfDataPartitions == 1)
       journalTableWithSchema
-    } else {
-      val dataPartition = slice / (NumberOfSlices / journalTableDataPartitions)
-      s"${journalTableWithSchema}_$dataPartition"
-    }
+    else
+      s"${journalTableWithSchema}_${dataPartition(slice)}"
   }

-  val alljournalTablesWithSchema: Set[String] = {
-    (0 until NumberOfSlices).map { slice =>
-      journalTableWithSchema(slice)
-    }.toSet
-  }
-
-  def isJournalSliceRangeWithinSameDataPartition(minSlice: Int, maxSlice: Int): Boolean = {
-    if (journalTableDataPartitions == 1)
-      true
-    else {
-      val dataPartition1 = minSlice / (NumberOfSlices / journalTableDataPartitions)
-      val dataPartition2 = maxSlice / (NumberOfSlices / journalTableDataPartitions)
-      dataPartition1 == dataPartition2
+  /**
+   * INTERNAL API: All journal tables and their lowest slice
+   */
+  @InternalApi private[akka] val alljournalTablesWithSchema: Map[String, Int] = {
+    (0 until NumberOfSlices).foldLeft(Map.empty[String, Int]) { case (acc, slice) =>
+      val table = journalTableWithSchema(slice)
+      if (acc.contains(table)) acc
+      else acc.updated(table, slice)
     }
   }

+  /**
+   * INTERNAL API
+   */
+  @InternalApi private[akka] val numberOfDatabases: Int = _connectionFactorySettings.size
+
+  def isSliceRangeWithinSameDataPartition(minSlice: Int, maxSlice: Int): Boolean =
+    numberOfDataPartitions == 1 || dataPartition(minSlice) == dataPartition(maxSlice)
+
+  private def dataPartition(slice: Int): Int =
+    slice / (NumberOfSlices / numberOfDataPartitions)
+
   /**
    * One of the supported dialects 'postgres', 'yugabyte', 'sqlserver' or 'h2'
    */
-  def dialectName: String = _connectionFactorySettings.dialect.name
+  def dialectName: String = connectionFactorySettings.dialect.name

   def getDurableStateTable(entityType: String): String =
     _durableStateTableByEntityType.getOrElse(entityType, durableStateTable)
@@ -268,12 +297,34 @@ final class R2dbcSettings private (
   /**
    * INTERNAL API
    */
-  @InternalApi private[akka] def connectionFactorySettings: ConnectionFactorySettings = _connectionFactorySettings
+  @InternalApi private[akka] def connectionFactorySettings: ConnectionFactorySettings =
+    connectionFactorySettings(0)
+
+  /**
+   * INTERNAL API
+   */
+  @InternalApi private[akka] def connectionFactorySettings(slice: Int): ConnectionFactorySettings = {
+    val rangeSize = numberOfDataPartitions / numberOfDatabases
+    val i = dataPartition(slice) / rangeSize
+    _connectionFactorySettings(i)
+  }
+
+  /**
+   * INTERNAL API
+   */
+  @InternalApi private[akka] def resolveConnectionFactoryConfigPath(baseConfigPath: String, slice: Int): String = {
+    if (numberOfDatabases == 1) {
+      baseConfigPath
+    } else {
+      val rangeSize = numberOfDataPartitions / numberOfDatabases
+      val i = dataPartition(slice) / rangeSize
+      s"$baseConfigPath-${i * rangeSize}-${i * rangeSize + rangeSize - 1}"
+    }
+  }

   private def copy(
       schema: Option[String] = schema,
       journalTable: String = journalTable,
-      journalTableDataPartitions: Int = journalTableDataPartitions,
       journalPayloadCodec: PayloadCodec = journalPayloadCodec,
       journalPublishEvents: Boolean = journalPublishEvents,
       snapshotsTable: String = snapshotsTable,
@@ -288,16 +339,16 @@ final class R2dbcSettings private (
       querySettings: 
QuerySettings = querySettings, dbTimestampMonotonicIncreasing: Boolean = dbTimestampMonotonicIncreasing, cleanupSettings: CleanupSettings = cleanupSettings, - connectionFactorySettings: ConnectionFactorySettings = connectionFactorySettings, + connectionFactorySettings: immutable.IndexedSeq[ConnectionFactorySettings] = _connectionFactorySettings, durableStateTableByEntityType: Map[String, String] = _durableStateTableByEntityType, durableStateAdditionalColumnClasses: Map[String, immutable.IndexedSeq[String]] = _durableStateAdditionalColumnClasses, durableStateChangeHandlerClasses: Map[String, String] = _durableStateChangeHandlerClasses, - useAppTimestamp: Boolean = _useAppTimestamp): R2dbcSettings = + useAppTimestamp: Boolean = _useAppTimestamp, + numberOfDataPartitions: Int = numberOfDataPartitions): R2dbcSettings = new R2dbcSettings( schema, journalTable, - journalTableDataPartitions: Int, journalPayloadCodec, journalPublishEvents, snapshotsTable, @@ -313,13 +364,14 @@ final class R2dbcSettings private ( dbTimestampMonotonicIncreasing, cleanupSettings, connectionFactorySettings, - _durableStateTableByEntityType, - _durableStateAdditionalColumnClasses, - _durableStateChangeHandlerClasses, - useAppTimestamp) + durableStateTableByEntityType, + durableStateAdditionalColumnClasses, + durableStateChangeHandlerClasses, + useAppTimestamp, + numberOfDataPartitions) override def toString = - s"R2dbcSettings(dialectName=$dialectName, schema=$schema, journalTable=$journalTable, snapshotsTable=$snapshotsTable, durableStateTable=$durableStateTable, logDbCallsExceeding=$logDbCallsExceeding, dbTimestampMonotonicIncreasing=$dbTimestampMonotonicIncreasing, useAppTimestamp=$useAppTimestamp)" + s"R2dbcSettings(dialectName=$dialectName, schema=$schema, journalTable=$journalTable, snapshotsTable=$snapshotsTable, durableStateTable=$durableStateTable, logDbCallsExceeding=$logDbCallsExceeding, dbTimestampMonotonicIncreasing=$dbTimestampMonotonicIncreasing, useAppTimestamp=$useAppTimestamp, numberOfDataPartitions=$numberOfDataPartitions)" } /** diff --git a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala index ab39e4e6..0572792f 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala @@ -9,6 +9,8 @@ import scala.concurrent.Future import scala.util.Failure import scala.util.Success +import org.slf4j.LoggerFactory + import akka.Done import akka.actor.ClassicActorSystemProvider import akka.actor.typed.ActorSystem @@ -16,10 +18,8 @@ import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.ApiMayChange import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts -import akka.persistence.r2dbc.ConnectionFactoryProvider import akka.persistence.r2dbc.R2dbcSettings -import akka.persistence.r2dbc.internal.DurableStateDao -import org.slf4j.LoggerFactory +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider /** * Scala API: Tool for deleting durable state for a given list of `persistenceIds` without using `DurableStateBehavior` @@ -57,9 +57,9 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf private val sharedConfigPath = configPath.replaceAll("""\.cleanup$""", "") private val settings = R2dbcSettings(system.settings.config.getConfig(sharedConfigPath)) - private val connectionFactory = - 
ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath + ".connection-factory") - private val stateDao = settings.connectionFactorySettings.dialect.createDurableStateDao(settings, connectionFactory) + private val executorProvider = + new R2dbcExecutorProvider(settings, sharedConfigPath + ".connection-factory", LoggerFactory.getLogger(getClass)) + private val stateDao = settings.connectionFactorySettings.dialect.createDurableStateDao(settings, executorProvider) /** * Delete the state related to one single `persistenceId`. diff --git a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala index 7e3932c2..9244ad1e 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala @@ -11,6 +11,8 @@ import scala.concurrent.Future import scala.util.Failure import scala.util.Success +import org.slf4j.LoggerFactory + import akka.Done import akka.actor.ClassicActorSystemProvider import akka.actor.typed.ActorSystem @@ -18,11 +20,8 @@ import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.ApiMayChange import akka.annotation.InternalApi import akka.persistence.SnapshotSelectionCriteria -import akka.persistence.r2dbc.ConnectionFactoryProvider import akka.persistence.r2dbc.R2dbcSettings -import akka.persistence.r2dbc.internal.JournalDao -import akka.persistence.r2dbc.internal.SnapshotDao -import org.slf4j.LoggerFactory +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider /** * Scala API: Tool for deleting all events and/or snapshots for a given list of `persistenceIds` without using @@ -60,10 +59,10 @@ final class EventSourcedCleanup(systemProvider: ClassicActorSystemProvider, conf private val sharedConfigPath = configPath.replaceAll("""\.cleanup$""", "") private val settings = R2dbcSettings(system.settings.config.getConfig(sharedConfigPath)) - private val connectionFactory = - ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath + ".connection-factory") - private val journalDao = settings.connectionFactorySettings.dialect.createJournalDao(settings, connectionFactory) - private val snapshotDao = settings.connectionFactorySettings.dialect.createSnapshotDao(settings, connectionFactory) + private val executorProvider = + new R2dbcExecutorProvider(settings, sharedConfigPath + ".connection-factory", LoggerFactory.getLogger(getClass)) + private val journalDao = settings.connectionFactorySettings.dialect.createJournalDao(settings, executorProvider) + private val snapshotDao = settings.connectionFactorySettings.dialect.createSnapshotDao(settings, executorProvider) /** * Delete all events before a sequenceNr for the given persistence id. Snapshots are not deleted. 
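
A minimal sketch, separate from the patch itself, of the routing rule the changes above implement: a slice (0 to 1023) maps to a data partition by integer division, and the data partition maps to a connection factory config path, mirroring R2dbcSettings.dataPartition and R2dbcSettings.resolveConnectionFactoryConfigPath. The object name DataPartitionMappingSketch and the standalone method signatures are illustrative only, not APIs from this patch.

// Standalone illustration of slice -> data partition -> connection factory config path.
object DataPartitionMappingSketch {
  val NumberOfSlices = 1024

  // slice / (1024 / number-of-partitions), as in R2dbcSettings.dataPartition
  def dataPartition(slice: Int, numberOfPartitions: Int): Int =
    slice / (NumberOfSlices / numberOfPartitions)

  // suffix scheme as in R2dbcSettings.resolveConnectionFactoryConfigPath
  def configPath(base: String, slice: Int, numberOfPartitions: Int, numberOfDatabases: Int): String =
    if (numberOfDatabases == 1) base
    else {
      val rangeSize = numberOfPartitions / numberOfDatabases
      val i = dataPartition(slice, numberOfPartitions) / rangeSize
      s"$base-${i * rangeSize}-${i * rangeSize + rangeSize - 1}"
    }

  def main(args: Array[String]): Unit = {
    // with 8 partitions over 2 databases, as in the reference.conf example:
    // slice 0 -> partition 0 -> first database
    assert(configPath("connection-factory", slice = 0, numberOfPartitions = 8, numberOfDatabases = 2) == "connection-factory-0-3")
    // slice 1023 -> partition 7 -> second database
    assert(configPath("connection-factory", slice = 1023, numberOfPartitions = 8, numberOfDatabases = 2) == "connection-factory-4-7")
  }
}

Because a whole slice range belongs to one data partition and each partition lives in exactly one database, a slice-range query never has to span databases; PostgresQueryDao below enforces this by rejecting ranges that fail isSliceRangeWithinSameDataPartition.
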
diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala index eaa88009..e5370e1c 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala @@ -150,7 +150,7 @@ import org.slf4j.Logger } trait Dao[SerializedRow] { - def currentDbTimestamp(): Future[Instant] + def currentDbTimestamp(slice: Int): Future[Instant] def rowsBySlices( entityType: String, @@ -262,7 +262,7 @@ import org.slf4j.Logger val currentTimestamp = if (settings.useAppTimestamp) Future.successful(InstantFactory.now()) - else dao.currentDbTimestamp() + else dao.currentDbTimestamp(minSlice) Source .futureSource[Envelope, NotUsed] { diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/Dialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/Dialect.scala index 6562ce42..0b2636d6 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/Dialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/Dialect.scala @@ -31,15 +31,15 @@ private[r2dbc] trait Dialect { def createConnectionFactory(config: Config): ConnectionFactory - def createJournalDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + def createJournalDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): JournalDao - def createQueryDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + def createQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): QueryDao - def createSnapshotDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + def createSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): SnapshotDao - def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + def createDurableStateDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): DurableStateDao } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala index e89126aa..3187f6c3 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala @@ -34,6 +34,10 @@ import org.slf4j.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import akka.annotation.InternalApi +import akka.persistence.r2dbc.ConnectionFactoryProvider +import akka.persistence.r2dbc.R2dbcSettings + /** * INTERNAL API */ @@ -383,3 +387,26 @@ class R2dbcExecutor( Future.successful(Done) } } + +/** + * INTERNAL API + */ +@InternalApi private[akka] class R2dbcExecutorProvider( + val settings: R2dbcSettings, + connectionFactoryBaseConfigPath: String, + log: Logger)(implicit ec: ExecutionContext, system: ActorSystem[_]) { + private val connectionFactoryProvider = ConnectionFactoryProvider(system) + + def executorFor(slice: Int): R2dbcExecutor = { + val connectionFactoryConfigPath = + settings.resolveConnectionFactoryConfigPath(connectionFactoryBaseConfigPath, slice) + val connectionFactory = connectionFactoryProvider.connectionFactoryFor(connectionFactoryConfigPath) + // FIXME cache the instances + new R2dbcExecutor( + connectionFactory, + log, + 
settings.logDbCallsExceeding, + settings.connectionFactorySettings.poolSettings.closeCallsExceeding) + } + +} diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala index 75117a55..78497920 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala @@ -24,6 +24,7 @@ import java.util.Locale import scala.concurrent.ExecutionContext +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider import akka.persistence.r2dbc.internal.codec.IdentityAdapter import akka.persistence.r2dbc.internal.codec.QueryAdapter @@ -36,6 +37,8 @@ private[r2dbc] object H2Dialect extends Dialect { override def name: String = "h2" override def adaptSettings(settings: R2dbcSettings): R2dbcSettings = { + if (settings.numberOfDatabases > 1) + throw new IllegalArgumentException("H2 dialect doesn't support more than one data-partition.number-of-databases") val res = settings // app timestamp is db timestamp because same process .withUseAppTimestamp(true) @@ -84,21 +87,21 @@ private[r2dbc] object H2Dialect extends Dialect { new H2ConnectionFactory(h2Config) } - override def createJournalDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + override def createJournalDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): JournalDao = - new H2JournalDao(settings, connectionFactory)(ecForDaos(system, settings), system) + new H2JournalDao(settings, executorProvider)(ecForDaos(system, settings), system) - override def createSnapshotDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + override def createSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): SnapshotDao = - new H2SnapshotDao(settings, connectionFactory)(ecForDaos(system, settings), system) + new H2SnapshotDao(settings, executorProvider)(ecForDaos(system, settings), system) - override def createQueryDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + override def createQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): QueryDao = - new H2QueryDao(settings, connectionFactory)(ecForDaos(system, settings), system) + new H2QueryDao(settings, executorProvider)(ecForDaos(system, settings), system) - override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + override def createDurableStateDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): DurableStateDao = - new H2DurableStateDao(settings, connectionFactory, this)(ecForDaos(system, settings), system) + new H2DurableStateDao(settings, executorProvider, this)(ecForDaos(system, settings), system) private def ecForDaos(system: ActorSystem[_], settings: R2dbcSettings): ExecutionContext = { // H2 R2DBC driver blocks in surprising places (Mono.toFuture in stmt.execute().asFuture()) @@ -113,14 +116,14 @@ private[r2dbc] object H2Dialect extends Dialect { else Some(s) } val schema = optionalConfString("schema") + val numberOfDataPartitions = config.getInt("number-of-partitions") val journalTable = config.getString("journal-table") - val journalTableDataPartitions = config.getInt("journal-table-data-partitions") val journalTableWithSchema = schema.map(_ + ".").getOrElse("") + journalTable val 
allJournalTablesWithSchema = - if (journalTableDataPartitions == 1) + if (numberOfDataPartitions == 1) Vector(journalTableWithSchema) else - (0 until journalTableDataPartitions).map { dataPartition => + (0 until numberOfDataPartitions).map { dataPartition => s"${journalTableWithSchema}_$dataPartition" } val snapshotTable = config.getString("snapshot-table") diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala index 579401a4..e25d1382 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2DurableStateDao.scala @@ -8,7 +8,6 @@ import scala.concurrent.ExecutionContext import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration -import io.r2dbc.spi.ConnectionFactory import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -16,6 +15,7 @@ import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.Dialect +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao /** @@ -24,9 +24,9 @@ import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao @InternalApi private[r2dbc] final class H2DurableStateDao( settings: R2dbcSettings, - connectionFactory: ConnectionFactory, + executorProvider: R2dbcExecutorProvider, dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_]) - extends PostgresDurableStateDao(settings, connectionFactory, dialect) { + extends PostgresDurableStateDao(settings, executorProvider, dialect) { override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2DurableStateDao]) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala index bad5e664..ffeaefeb 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala @@ -12,7 +12,6 @@ import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.Sql.Interpolation import akka.persistence.r2dbc.internal.postgres.PostgresJournalDao -import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -24,15 +23,16 @@ import scala.concurrent.Future import io.r2dbc.spi.Connection import akka.persistence.r2dbc.internal.R2dbcExecutor +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider /** * INTERNAL API */ @InternalApi -private[r2dbc] class H2JournalDao(journalSettings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit +private[r2dbc] class H2JournalDao(journalSettings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit ec: ExecutionContext, system: ActorSystem[_]) - extends PostgresJournalDao(journalSettings, connectionFactory) { + extends PostgresJournalDao(journalSettings, executorProvider) { import JournalDao.SerializedJournalRow override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2JournalDao]) // always app timestamp (db is same process) monotonic increasing @@ -59,14 +59,15 @@ private[r2dbc] class H2JournalDao(journalSettings: R2dbcSettings, connectionFact // it's 
always the same persistenceId for all events val persistenceId = events.head.persistenceId val slice = persistenceExt.sliceForPersistenceId(persistenceId) + val executor = executorProvider.executorFor(slice) val totalEvents = events.size val result = if (totalEvents == 1) { - r2dbcExecutor.updateOne(s"insert [$persistenceId]")(connection => + executor.updateOne(s"insert [$persistenceId]")(connection => bindInsertStatement(connection.createStatement(insertSql(slice)), events.head)) } else { - r2dbcExecutor.updateInBatch(s"batch insert [$persistenceId], [$totalEvents] events")(connection => + executor.updateInBatch(s"batch insert [$persistenceId], [$totalEvents] events")(connection => events.foldLeft(connection.createStatement(insertSql(slice))) { (stmt, write) => stmt.add() bindInsertStatement(stmt, write) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala index 18d26059..1b57da5b 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala @@ -13,19 +13,20 @@ import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Row import org.slf4j.Logger import org.slf4j.LoggerFactory - import scala.concurrent.ExecutionContext import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider + /** * INTERNAL API */ @InternalApi -private[r2dbc] class H2QueryDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit +private[r2dbc] class H2QueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit ec: ExecutionContext, system: ActorSystem[_]) - extends PostgresQueryDao(settings, connectionFactory) { + extends PostgresQueryDao(settings, executorProvider) { override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2QueryDao]) override protected def eventsBySlicesRangeSql( diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala index c9826b03..9684743a 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala @@ -16,14 +16,16 @@ import scala.concurrent.ExecutionContext import io.r2dbc.spi.Row +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider + /** * INTERNAL API */ @InternalApi -private[r2dbc] final class H2SnapshotDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit +private[r2dbc] final class H2SnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit ec: ExecutionContext, system: ActorSystem[_]) - extends PostgresSnapshotDao(settings, connectionFactory) { + extends PostgresSnapshotDao(settings, executorProvider) { override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2SnapshotDao]) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala index d8dad720..90529c9b 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala @@ -25,6 +25,8 @@ import io.r2dbc.spi.ConnectionFactories import 
io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.ConnectionFactoryOptions +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider + /** * INTERNAL API */ @@ -115,19 +117,19 @@ private[r2dbc] object PostgresDialect extends Dialect { ConnectionFactories.get(builder.build()) } - override def createJournalDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + override def createJournalDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): JournalDao = - new PostgresJournalDao(settings, connectionFactory)(system.executionContext, system) + new PostgresJournalDao(settings, executorProvider)(system.executionContext, system) - override def createSnapshotDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + override def createSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): SnapshotDao = - new PostgresSnapshotDao(settings, connectionFactory)(system.executionContext, system) + new PostgresSnapshotDao(settings, executorProvider)(system.executionContext, system) - override def createQueryDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + override def createQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): QueryDao = - new PostgresQueryDao(settings, connectionFactory)(system.executionContext, system) + new PostgresQueryDao(settings, executorProvider)(system.executionContext, system) - override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + override def createDurableStateDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): DurableStateDao = - new PostgresDurableStateDao(settings, connectionFactory, this)(system.executionContext, system) + new PostgresDurableStateDao(settings, executorProvider, this)(system.executionContext, system) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index e133b2ff..ecebfc07 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -47,6 +47,7 @@ import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.R2dbcExecutor +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider import akka.persistence.r2dbc.internal.Sql.Interpolation import akka.persistence.r2dbc.internal.codec.PayloadCodec import akka.persistence.r2dbc.internal.codec.QueryAdapter @@ -85,7 +86,7 @@ private[r2dbc] object PostgresDurableStateDao { @InternalApi private[r2dbc] class PostgresDurableStateDao( settings: R2dbcSettings, - connectionFactory: ConnectionFactory, + executorProvider: R2dbcExecutorProvider, dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends DurableStateDao { import DurableStateDao._ @@ -93,11 +94,7 @@ private[r2dbc] class PostgresDurableStateDao( protected def log: Logger = PostgresDurableStateDao.log private val persistenceExt = Persistence(system) - protected val r2dbcExecutor = new R2dbcExecutor( - connectionFactory, - log, - 
settings.logDbCallsExceeding, - settings.connectionFactorySettings.poolSettings.closeCallsExceeding)(ec, system) + protected val r2dbcExecutor = executorProvider.executorFor(slice = 0) // FIXME support data partitions private implicit val statePayloadCodec: PayloadCodec = settings.durableStatePayloadCodec implicit val timestampCodec: TimestampCodec = settings.timestampCodec @@ -105,7 +102,7 @@ private[r2dbc] class PostgresDurableStateDao( protected implicit val queryAdapter: QueryAdapter = settings.queryAdapter // used for change events - private lazy val journalDao: JournalDao = dialect.createJournalDao(settings, connectionFactory) + private lazy val journalDao: JournalDao = dialect.createJournalDao(settings, executorProvider) private lazy val additionalColumns: Map[String, immutable.IndexedSeq[AdditionalColumn[Any, Any]]] = { settings.durableStateAdditionalColumnClasses.map { case (entityType, columnClasses) => @@ -658,7 +655,7 @@ private[r2dbc] class PostgresDurableStateDao( result.map(_ => Done)(ExecutionContexts.parasitic) } - override def currentDbTimestamp(): Future[Instant] = { + override def currentDbTimestamp(slice: Int): Future[Instant] = { r2dbcExecutor .selectOne("select current db timestamp")( connection => connection.createStatement(currentDbTimestampSql), diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala index b8eac49c..042373d3 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala @@ -22,7 +22,6 @@ import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRo import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.typed.PersistenceId import io.r2dbc.spi.Connection -import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Row import io.r2dbc.spi.Statement import org.slf4j.Logger @@ -31,8 +30,9 @@ import java.time.Instant import scala.concurrent.ExecutionContext import scala.concurrent.Future -import akka.persistence.r2dbc.internal.codec.PayloadCodec +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider +import akka.persistence.r2dbc.internal.codec.PayloadCodec import akka.persistence.r2dbc.internal.codec.QueryAdapter import akka.persistence.r2dbc.internal.codec.SqlServerQueryAdapter @@ -63,7 +63,8 @@ private[r2dbc] object PostgresJournalDao { * Class for doing db interaction outside of an actor to avoid mistakes in future callbacks */ @InternalApi -private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit +private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)( + implicit ec: ExecutionContext, system: ActorSystem[_]) extends JournalDao { @@ -72,13 +73,6 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti protected val persistenceExt: Persistence = Persistence(system) - protected val r2dbcExecutor = - new R2dbcExecutor( - connectionFactory, - log, - journalSettings.logDbCallsExceeding, - journalSettings.connectionFactorySettings.poolSettings.closeCallsExceeding)(ec, system) - protected def journalTable(slice: Int): String = journalSettings.journalTableWithSchema(slice) protected implicit val journalPayloadCodec: PayloadCodec = journalSettings.journalPayloadCodec protected 
implicit val tagsCodec: TagsCodec = journalSettings.tagsCodec @@ -158,6 +152,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti // it's always the same persistenceId for all events val persistenceId = events.head.persistenceId val slice = persistenceExt.sliceForPersistenceId(persistenceId) + val executor = executorProvider.executorFor(slice) val previousSeqNr = events.head.seqNr - 1 // The MigrationTool defines the dbTimestamp to preserve the original event timestamp @@ -169,7 +164,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti val totalEvents = events.size if (totalEvents == 1) { - val result = r2dbcExecutor.updateOneReturning(s"insert [$persistenceId]")( + val result = executor.updateOneReturning(s"insert [$persistenceId]")( connection => bindInsertStatement(connection.createStatement(insertSql), events.head, useTimestampFromDb, previousSeqNr), row => row.getTimestamp("db_timestamp")) @@ -179,7 +174,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti } result } else { - val result = r2dbcExecutor.updateInBatchReturning(s"batch insert [$persistenceId], [$totalEvents] events")( + val result = executor.updateInBatchReturning(s"batch insert [$persistenceId], [$totalEvents] events")( connection => events.foldLeft(connection.createStatement(insertSql)) { (stmt, write) => stmt.add() @@ -271,7 +266,8 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti override def readHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = { val slice = persistenceExt.sliceForPersistenceId(persistenceId) - val result = r2dbcExecutor + val executor = executorProvider.executorFor(slice) + val result = executor .select(s"select highest seqNr [$persistenceId]")( connection => connection @@ -292,7 +288,8 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti override def readLowestSequenceNr(persistenceId: String): Future[Long] = { val slice = persistenceExt.sliceForPersistenceId(persistenceId) - val result = r2dbcExecutor + val executor = executorProvider.executorFor(slice) + val result = executor .select(s"select lowest seqNr [$persistenceId]")( connection => connection @@ -327,6 +324,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti protected def bindTimestampNow(stmt: Statement, getAndIncIndex: () => Int): Statement = stmt override def deleteEventsTo(persistenceId: String, toSequenceNr: Long, resetSequenceNumber: Boolean): Future[Unit] = { val slice = persistenceExt.sliceForPersistenceId(persistenceId) + val executor = executorProvider.executorFor(slice) def insertDeleteMarkerStmt(deleteMarkerSeqNr: Long, connection: Connection): Statement = { val idx = Iterator.range(0, Int.MaxValue) @@ -349,7 +347,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti def deleteBatch(from: Long, to: Long, lastBatch: Boolean): Future[Unit] = { (if (lastBatch && !resetSequenceNumber) { - r2dbcExecutor + executor .update(s"delete [$persistenceId] and insert marker") { connection => Vector( connection.createStatement(deleteEventsSql(slice)).bind(0, persistenceId).bind(1, from).bind(2, to), @@ -357,7 +355,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti } .map(_.head) } else { - r2dbcExecutor + executor .updateOne(s"delete [$persistenceId]") { connection => connection.createStatement(deleteEventsSql(slice)).bind(0, persistenceId).bind(1, 
from).bind(2, to) } @@ -392,7 +390,8 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti override def deleteEventsBefore(persistenceId: String, timestamp: Instant): Future[Unit] = { val slice = persistenceExt.sliceForPersistenceId(persistenceId) - r2dbcExecutor + val executor = executorProvider.executorFor(slice) + executor .updateOne(s"delete [$persistenceId]") { connection => connection .createStatement(deleteEventsByPersistenceIdBeforeTimestampSql(slice)) @@ -405,7 +404,8 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti } override def deleteEventsBefore(entityType: String, slice: Int, timestamp: Instant): Future[Unit] = { - r2dbcExecutor + val executor = executorProvider.executorFor(slice) + executor .updateOne(s"delete [$entityType]") { connection => connection .createStatement(deleteEventsBySliceBeforeTimestampSql(slice)) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala index 3fd27850..f9ab6ce8 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala @@ -22,7 +22,6 @@ import akka.persistence.r2dbc.internal.InstantFactory import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow import akka.persistence.r2dbc.internal.QueryDao -import akka.persistence.r2dbc.internal.R2dbcExecutor import akka.persistence.r2dbc.internal.Sql.Interpolation import akka.persistence.r2dbc.internal.codec.QueryAdapter import akka.persistence.r2dbc.internal.codec.TagsCodec @@ -32,12 +31,12 @@ import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRo import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.typed.PersistenceId import akka.stream.scaladsl.Source -import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory import akka.persistence.Persistence +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider import akka.persistence.r2dbc.internal.codec.PayloadCodec /** @@ -52,7 +51,7 @@ private[r2dbc] object PostgresQueryDao { * INTERNAL API */ @InternalApi -private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit +private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends QueryDao { @@ -155,7 +154,7 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory protected def allPersistenceIdsSql = { // FIXME require( - settings.journalTableDataPartitions == 1, + settings.numberOfDataPartitions == 1, "allPersistenceIdsSql not implemented for more than one data-partition yet") sql"SELECT DISTINCT(persistence_id) from ${journalTable(0)} ORDER BY persistence_id LIMIT ?" } @@ -163,7 +162,7 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory protected def persistenceIdsForEntityTypeSql = { // FIXME require( - settings.journalTableDataPartitions == 1, + settings.numberOfDataPartitions == 1, "persistenceIdsForEntityTypeSql not implemented for more than one data-partition yet") sql"SELECT DISTINCT(persistence_id) from ${journalTable(0)} WHERE persistence_id LIKE ? 
ORDER BY persistence_id LIMIT ?" } @@ -171,7 +170,7 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory protected def allPersistenceIdsAfterSql = { // FIXME require( - settings.journalTableDataPartitions == 1, + settings.numberOfDataPartitions == 1, "allPersistenceIdsAfterSql not implemented for more than one data-partition yet") sql"SELECT DISTINCT(persistence_id) from ${journalTable(0)} WHERE persistence_id > ? ORDER BY persistence_id LIMIT ?" } @@ -179,19 +178,14 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory protected def persistenceIdsForEntityTypeAfterSql = { // FIXME require( - settings.journalTableDataPartitions == 1, + settings.numberOfDataPartitions == 1, "persistenceIdsForEntityTypeAfterSql not implemented for more than one data-partition yet") sql"SELECT DISTINCT(persistence_id) from ${journalTable(0)} WHERE persistence_id LIKE ? AND persistence_id > ? ORDER BY persistence_id LIMIT ?" } - protected val r2dbcExecutor = new R2dbcExecutor( - connectionFactory, - log, - settings.logDbCallsExceeding, - settings.connectionFactorySettings.poolSettings.closeCallsExceeding)(ec, system) - - def currentDbTimestamp(): Future[Instant] = { - r2dbcExecutor + override def currentDbTimestamp(slice: Int): Future[Instant] = { + val executor = executorProvider.executorFor(slice) + executor .selectOne("select current db timestamp")( connection => connection.createStatement(currentDbTimestampSql), row => row.getTimestamp("db_timestamp")) @@ -227,11 +221,12 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory behindCurrentTime: FiniteDuration, backtracking: Boolean): Source[SerializedJournalRow, NotUsed] = { - if (!settings.isJournalSliceRangeWithinSameDataPartition(minSlice, maxSlice)) + if (!settings.isSliceRangeWithinSameDataPartition(minSlice, maxSlice)) throw new IllegalArgumentException( s"Slice range [$minSlice-$maxSlice] spans over more than one " + - s"of the [${settings.journalTableDataPartitions}] data partitions.") - val result = r2dbcExecutor.select(s"select eventsBySlices [$minSlice - $maxSlice]")( + s"of the [${settings.numberOfDataPartitions}] data partitions.") + val executor = executorProvider.executorFor(minSlice) + val result = executor.select(s"select eventsBySlices [$minSlice - $maxSlice]")( connection => { val stmt = connection .createStatement( @@ -298,6 +293,7 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory maxSlice: Int, fromTimestamp: Instant, limit: Int): Future[Seq[Bucket]] = { + val executor = executorProvider.executorFor(minSlice) val toTimestamp = { val now = InstantFactory.now() // not important to use database time @@ -310,7 +306,7 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory } } - val result = r2dbcExecutor.select(s"select bucket counts [$minSlice - $maxSlice]")( + val result = executor.select(s"select bucket counts [$minSlice - $maxSlice]")( connection => { val stmt = connection.createStatement(selectBucketsSql(minSlice, maxSlice)) bindSelectBucketsSql(stmt, entityType, fromTimestamp, toTimestamp, limit) @@ -334,7 +330,8 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory override def timestampOfEvent(persistenceId: String, seqNr: Long): Future[Option[Instant]] = { val slice = persistenceExt.sliceForPersistenceId(persistenceId) - r2dbcExecutor.selectOne("select timestampOfEvent")( + val executor = executorProvider.executorFor(slice) + executor.selectOne("select 
timestampOfEvent")( connection => connection .createStatement(selectTimestampOfEventSql(slice)) @@ -348,7 +345,8 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory seqNr: Long, includePayload: Boolean): Future[Option[SerializedJournalRow]] = { val slice = persistenceExt.sliceForPersistenceId(persistenceId) - r2dbcExecutor.selectOne("select one event")( + val executor = executorProvider.executorFor(slice) + executor.selectOne("select one event")( connection => { val selectSql = if (includePayload) selectOneEventSql(slice) else selectOneEventWithoutPayloadSql(slice) connection @@ -382,7 +380,8 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory fromSequenceNr: Long, toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = { val slice = persistenceExt.sliceForPersistenceId(persistenceId) - val result = r2dbcExecutor.select(s"select eventsByPersistenceId [$persistenceId]")( + val executor = executorProvider.executorFor(slice) + val result = executor.select(s"select eventsByPersistenceId [$persistenceId]")( connection => { val stmt = connection.createStatement(selectEventsSql(slice)) bindSelectEventsSql(stmt, persistenceId, fromSequenceNr, toSequenceNr, settings.querySettings.bufferSize) @@ -432,7 +431,10 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory override def persistenceIds(entityType: String, afterId: Option[String], limit: Long): Source[String, NotUsed] = { val likeStmtPostfix = PersistenceId.DefaultSeparator + "%" - val result = r2dbcExecutor.select(s"select persistenceIds by entity type")( + // FIXME + require(settings.numberOfDataPartitions == 1, "persistenceIds not implemented for more than one data-partition yet") + val executor = executorProvider.executorFor(slice = 0) + val result = executor.select(s"select persistenceIds by entity type")( connection => afterId match { case Some(after) => @@ -459,7 +461,10 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory } override def persistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed] = { - val result = r2dbcExecutor.select(s"select persistenceIds")( + // FIXME + require(settings.numberOfDataPartitions == 1, "persistenceIds not implemented for more than one data-partition yet") + val executor = executorProvider.executorFor(slice = 0) + val result = executor.select(s"select persistenceIds")( connection => afterId match { case Some(after) => diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala index 667cb930..7441fdb9 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala @@ -29,6 +29,7 @@ import akka.persistence.r2dbc.internal.InstantFactory import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.R2dbcExecutor +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider import akka.persistence.r2dbc.internal.SnapshotDao import akka.persistence.r2dbc.internal.Sql.Interpolation import akka.persistence.r2dbc.internal.codec.PayloadCodec @@ -53,7 +54,7 @@ private[r2dbc] object PostgresSnapshotDao { * INTERNAL API */ @InternalApi -private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, 
connectionFactory: ConnectionFactory)(implicit +private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends SnapshotDao { @@ -68,11 +69,7 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, connectionFact protected implicit val tagsCodec: TagsCodec = settings.tagsCodec protected implicit val queryAdapter: QueryAdapter = settings.queryAdapter - protected val r2dbcExecutor = new R2dbcExecutor( - connectionFactory, - log, - settings.logDbCallsExceeding, - settings.connectionFactorySettings.poolSettings.closeCallsExceeding)(ec, system) + protected val r2dbcExecutor = executorProvider.executorFor(slice = 0) // FIXME support data partitions protected def createUpsertSql: String = { // db_timestamp and tags columns were added in 1.2.0 @@ -350,7 +347,7 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, connectionFact /** * This is used from `BySliceQuery`, i.e. only if settings.querySettings.startFromSnapshotEnabled */ - override def currentDbTimestamp(): Future[Instant] = { + override def currentDbTimestamp(slice: Int): Future[Instant] = { r2dbcExecutor .selectOne("select current db timestamp")( connection => connection.createStatement(currentDbTimestampSql), diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala index af620230..5f76925b 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala @@ -15,6 +15,8 @@ import akka.persistence.r2dbc.internal.SnapshotDao import com.typesafe.config.Config import io.r2dbc.spi.ConnectionFactory +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider + /** * INTERNAL API */ @@ -26,19 +28,19 @@ private[r2dbc] object YugabyteDialect extends Dialect { override def createConnectionFactory(config: Config): ConnectionFactory = PostgresDialect.createConnectionFactory(config) - override def createJournalDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + override def createJournalDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): JournalDao = - new PostgresJournalDao(settings, connectionFactory)(system.executionContext, system) + new PostgresJournalDao(settings, executorProvider)(system.executionContext, system) - override def createSnapshotDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + override def createSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): SnapshotDao = - new YugabyteSnapshotDao(settings, connectionFactory)(system.executionContext, system) + new YugabyteSnapshotDao(settings, executorProvider)(system.executionContext, system) - override def createQueryDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + override def createQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): QueryDao = - new YugabyteQueryDao(settings, connectionFactory)(system.executionContext, system) + new YugabyteQueryDao(settings, executorProvider)(system.executionContext, system) - override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + override def createDurableStateDao(settings: 
R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): DurableStateDao = - new YugabyteDurableStateDao(settings, connectionFactory, this)(system.executionContext, system) + new YugabyteDurableStateDao(settings, executorProvider, this)(system.executionContext, system) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala index 9a9ba030..ceb1630c 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDurableStateDao.scala @@ -6,7 +6,6 @@ package akka.persistence.r2dbc.internal.postgres import scala.concurrent.ExecutionContext -import io.r2dbc.spi._ import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -14,6 +13,7 @@ import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.Dialect +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider /** * INTERNAL API @@ -21,9 +21,9 @@ import akka.persistence.r2dbc.internal.Dialect @InternalApi private[r2dbc] final class YugabyteDurableStateDao( settings: R2dbcSettings, - connectionFactory: ConnectionFactory, + executorProvider: R2dbcExecutorProvider, dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_]) - extends PostgresDurableStateDao(settings, connectionFactory, dialect) { + extends PostgresDurableStateDao(settings, executorProvider, dialect) { override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[YugabyteDurableStateDao]) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteQueryDao.scala index 875e9a97..684133ab 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteQueryDao.scala @@ -13,14 +13,16 @@ import io.r2dbc.spi.ConnectionFactory import org.slf4j.Logger import org.slf4j.LoggerFactory +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider + /** * INTERNAL API */ @InternalApi -private[r2dbc] final class YugabyteQueryDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit +private[r2dbc] final class YugabyteQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit ec: ExecutionContext, system: ActorSystem[_]) - extends PostgresQueryDao(settings, connectionFactory) { + extends PostgresQueryDao(settings, executorProvider) { override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[YugabyteQueryDao]) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteSnapshotDao.scala index 28936f70..348dc9e9 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteSnapshotDao.scala @@ -13,14 +13,17 @@ import io.r2dbc.spi.ConnectionFactory import org.slf4j.Logger import org.slf4j.LoggerFactory +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider + /** * INTERNAL API */ @InternalApi -private[r2dbc] final class YugabyteSnapshotDao(settings: R2dbcSettings, connectionFactory: 
ConnectionFactory)(implicit +private[r2dbc] final class YugabyteSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)( + implicit ec: ExecutionContext, system: ActorSystem[_]) - extends PostgresSnapshotDao(settings, connectionFactory) { + extends PostgresSnapshotDao(settings, executorProvider) { override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[YugabyteSnapshotDao]) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDialect.scala index 43bfe2e7..8bb9f0ad 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDialect.scala @@ -23,6 +23,8 @@ import io.r2dbc.spi.ConnectionFactories import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.ConnectionFactoryOptions +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider + /** * INTERNAL API */ @@ -83,19 +85,19 @@ private[r2dbc] object SqlServerDialect extends Dialect { .build()) } - override def createJournalDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + override def createJournalDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): JournalDao = - new SqlServerJournalDao(settings, connectionFactory)(system.executionContext, system) + new SqlServerJournalDao(settings, executorProvider)(system.executionContext, system) - override def createQueryDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + override def createQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): QueryDao = - new SqlServerQueryDao(settings, connectionFactory)(system.executionContext, system) + new SqlServerQueryDao(settings, executorProvider)(system.executionContext, system) - override def createSnapshotDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + override def createSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): SnapshotDao = - new SqlServerSnapshotDao(settings, connectionFactory)(system.executionContext, system) + new SqlServerSnapshotDao(settings, executorProvider)(system.executionContext, system) - override def createDurableStateDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit + override def createDurableStateDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]): DurableStateDao = - new SqlServerDurableStateDao(settings, connectionFactory, this)(system.executionContext, system) + new SqlServerDurableStateDao(settings, executorProvider, this)(system.executionContext, system) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala index c61c6f42..70963ce8 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala @@ -20,12 +20,12 @@ import akka.persistence.r2dbc.internal.Sql.Interpolation import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao import 
akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao.EvaluatedAdditionalColumnBindings -import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory import akka.persistence.r2dbc.internal.InstantFactory +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider /** * INTERNAL API @@ -41,9 +41,9 @@ private[r2dbc] object SqlServerDurableStateDao { @InternalApi private[r2dbc] class SqlServerDurableStateDao( settings: R2dbcSettings, - connectionFactory: ConnectionFactory, + executorProvider: R2dbcExecutorProvider, dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_]) - extends PostgresDurableStateDao(settings, connectionFactory, dialect) { + extends PostgresDurableStateDao(settings, executorProvider, dialect) { require(settings.useAppTimestamp, "SqlServer requires akka.persistence.r2dbc.use-app-timestamp=on") @@ -194,6 +194,6 @@ private[r2dbc] class SqlServerDurableStateDao( .bind("@persistenceId", after) .bind("@limit", limit) - override def currentDbTimestamp(): Future[Instant] = Future.successful(InstantFactory.now()) + override def currentDbTimestamp(slice: Int): Future[Instant] = Future.successful(InstantFactory.now()) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala index bb10ece3..85d3de5d 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala @@ -18,6 +18,7 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import akka.persistence.r2dbc.internal.InstantFactory +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider /** * INTERNAL API @@ -32,10 +33,10 @@ private[r2dbc] object SqlServerJournalDao { * INTERNAL API */ @InternalApi -private[r2dbc] class SqlServerJournalDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit +private[r2dbc] class SqlServerJournalDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit ec: ExecutionContext, system: ActorSystem[_]) - extends PostgresJournalDao(settings, connectionFactory) { + extends PostgresJournalDao(settings, executorProvider) { require(settings.useAppTimestamp, "SqlServer requires akka.persistence.r2dbc.use-app-timestamp=on") require( diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala index ca2fe00d..f5047468 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala @@ -27,6 +27,8 @@ import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider + /** * INTERNAL API */ @@ -40,10 +42,10 @@ private[r2dbc] object SqlServerQueryDao { * INTERNAL API */ @InternalApi -private[r2dbc] class SqlServerQueryDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit +private[r2dbc] class SqlServerQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit ec: ExecutionContext, system: ActorSystem[_]) - extends PostgresQueryDao(settings, connectionFactory) { + extends PostgresQueryDao(settings, executorProvider) { override def sqlFalse = 
"0" @@ -151,7 +153,7 @@ private[r2dbc] class SqlServerQueryDao(settings: R2dbcSettings, connectionFactor override protected def persistenceIdsForEntityTypeAfterSql: String = { // FIXME require( - settings.journalTableDataPartitions == 1, + settings.numberOfDataPartitions == 1, "persistenceIdsForEntityTypeAfterSql not implemented for more than one data-partition yet") sql""" SELECT TOP(@limit) persistence_id FROM ( @@ -174,7 +176,7 @@ private[r2dbc] class SqlServerQueryDao(settings: R2dbcSettings, connectionFactor override protected def persistenceIdsForEntityTypeSql: String = { // FIXME require( - settings.journalTableDataPartitions == 1, + settings.numberOfDataPartitions == 1, "persistenceIdsForEntityTypeSql not implemented for more than one data-partition yet") sql""" SELECT TOP(@limit) persistence_id FROM ( @@ -203,7 +205,7 @@ private[r2dbc] class SqlServerQueryDao(settings: R2dbcSettings, connectionFactor override protected def allPersistenceIdsAfterSql: String = { // FIXME require( - settings.journalTableDataPartitions == 1, + settings.numberOfDataPartitions == 1, "allPersistenceIdsAfterSql not implemented for more than one data-partition yet") sql""" SELECT TOP(@limit) persistence_id FROM ( @@ -214,11 +216,11 @@ private[r2dbc] class SqlServerQueryDao(settings: R2dbcSettings, connectionFactor override protected def allPersistenceIdsSql: String = { // FIXME require( - settings.journalTableDataPartitions == 1, + settings.numberOfDataPartitions == 1, "allPersistenceIdsSql not implemented for more than one data-partition yet") sql"SELECT TOP(@limit) persistence_id FROM (SELECT DISTINCT(persistence_id) from ${journalTable(0)} as sub ORDER BY persistence_id" } - override def currentDbTimestamp(): Future[Instant] = Future.successful(InstantFactory.now()) + override def currentDbTimestamp(slice: Int): Future[Instant] = Future.successful(InstantFactory.now()) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala index 06056224..970027ec 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala @@ -20,12 +20,12 @@ import akka.persistence.r2dbc.internal.Sql.Interpolation import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.r2dbc.internal.postgres.PostgresSnapshotDao -import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory import akka.persistence.r2dbc.internal.InstantFactory +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider /** * INTERNAL API @@ -39,10 +39,10 @@ private[r2dbc] object SqlServerSnapshotDao { * INTERNAL API */ @InternalApi -private[r2dbc] class SqlServerSnapshotDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit +private[r2dbc] class SqlServerSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit ec: ExecutionContext, system: ActorSystem[_]) - extends PostgresSnapshotDao(settings, connectionFactory) { + extends PostgresSnapshotDao(settings, executorProvider) { override def log: Logger = SqlServerSnapshotDao.log @@ -205,6 +205,6 @@ private[r2dbc] class SqlServerSnapshotDao(settings: R2dbcSettings, connectionFac ORDER BY db_timestamp, seq_nr 
""" - override def currentDbTimestamp(): Future[Instant] = Future.successful(InstantFactory.now()) + override def currentDbTimestamp(slice: Int): Future[Instant] = Future.successful(InstantFactory.now()) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala index fd3c9725..5edbe1c1 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala @@ -27,7 +27,6 @@ import akka.persistence.SerializedEvent import akka.persistence.journal.AsyncWriteJournal import akka.persistence.journal.Tagged import akka.persistence.query.PersistenceQuery -import akka.persistence.r2dbc.ConnectionFactoryProvider import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.InstantFactory import akka.persistence.r2dbc.internal.JournalDao @@ -41,6 +40,9 @@ import akka.serialization.SerializationExtension import akka.serialization.Serializers import akka.stream.scaladsl.Sink import com.typesafe.config.Config +import org.slf4j.LoggerFactory + +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider /** * INTERNAL API @@ -89,16 +91,17 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends private val sharedConfigPath = cfgPath.replaceAll("""\.journal$""", "") private val serialization: Serialization = SerializationExtension(context.system) - private val journalSettings = R2dbcSettings(context.system.settings.config.getConfig(sharedConfigPath)) - log.debug("R2DBC journal starting up with dialect [{}]", journalSettings.dialectName) + private val settings = R2dbcSettings(context.system.settings.config.getConfig(sharedConfigPath)) + log.debug("R2DBC journal starting up with dialect [{}]", settings.dialectName) - private val journalDao = journalSettings.connectionFactorySettings.dialect.createJournalDao( - journalSettings, - ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath + ".connection-factory")) + private val executorProvider = + new R2dbcExecutorProvider(settings, sharedConfigPath + ".connection-factory", LoggerFactory.getLogger(getClass)) + private val journalDao = + settings.connectionFactorySettings.dialect.createJournalDao(settings, executorProvider) private val query = PersistenceQuery(system).readJournalFor[R2dbcReadJournal](sharedConfigPath + ".query") private val pubSub: Option[PubSub] = - if (journalSettings.journalPublishEvents) Some(PubSub(system)) + if (settings.journalPublishEvents) Some(PubSub(system)) else None // if there are pending writes when an actor restarts we must wait for @@ -111,7 +114,7 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = { def atomicWrite(atomicWrite: AtomicWrite): Future[Instant] = { - val timestamp = if (journalSettings.useAppTimestamp) InstantFactory.now() else JournalDao.EmptyDbTimestamp + val timestamp = if (settings.useAppTimestamp) InstantFactory.now() else JournalDao.EmptyDbTimestamp val serialized: Try[Seq[SerializedJournalRow]] = Try { atomicWrite.payload.map { pr => val (event, tags) = pr.payload match { diff --git a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala index 73f703a9..9a650b94 100644 --- 
a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala @@ -39,7 +39,6 @@ import akka.persistence.query.typed.scaladsl.EventsBySliceQuery import akka.persistence.query.typed.scaladsl.EventsBySliceStartingFromSnapshotsQuery import akka.persistence.query.typed.scaladsl.LoadEventQuery import akka.persistence.query.{ EventEnvelope => ClassicEventEnvelope } -import akka.persistence.r2dbc.ConnectionFactoryProvider import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.BySliceQuery import akka.persistence.r2dbc.internal.ContinuousQuery @@ -56,6 +55,8 @@ import akka.stream.scaladsl.Source import com.typesafe.config.Config import org.slf4j.LoggerFactory +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider + object R2dbcReadJournal { val Identifier = "akka.persistence.r2dbc.query" @@ -92,12 +93,16 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat import typedSystem.executionContext private val serialization = SerializationExtension(system) private val persistenceExt = Persistence(system) - private val connectionFactory = ConnectionFactoryProvider(typedSystem) - .connectionFactoryFor(sharedConfigPath + ".connection-factory") + private val executorProvider = + new R2dbcExecutorProvider(settings, sharedConfigPath + ".connection-factory", LoggerFactory.getLogger(getClass))( + typedSystem.executionContext, + typedSystem) + private val journalDao = + settings.connectionFactorySettings.dialect.createJournalDao(settings, executorProvider)(typedSystem) private val queryDao = - settings.connectionFactorySettings.dialect.createQueryDao(settings, connectionFactory)(typedSystem) + settings.connectionFactorySettings.dialect.createQueryDao(settings, executorProvider)(typedSystem) private lazy val snapshotDao = - settings.connectionFactorySettings.dialect.createSnapshotDao(settings, connectionFactory)(typedSystem) + settings.connectionFactorySettings.dialect.createSnapshotDao(settings, executorProvider)(typedSystem) private val filteredPayloadSerId = SerializationExtension(system).findSerializerFor(FilteredPayload).identifier @@ -166,9 +171,6 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat tags = row.tags) } - private val journalDao = - settings.connectionFactorySettings.dialect.createJournalDao(settings, connectionFactory)(typedSystem) - def extractEntityTypeFromPersistenceId(persistenceId: String): String = PersistenceId.extractEntityType(persistenceId) diff --git a/core/src/main/scala/akka/persistence/r2dbc/session/scaladsl/R2dbcSession.scala b/core/src/main/scala/akka/persistence/r2dbc/session/scaladsl/R2dbcSession.scala index af6c1ef0..f15fbfab 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/session/scaladsl/R2dbcSession.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/session/scaladsl/R2dbcSession.scala @@ -41,6 +41,7 @@ object R2dbcSession { .connectionFactorySettingsFor(connectionFactoryConfigPath) .poolSettings .closeCallsExceeding + // FIXME support data partition? 
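// Hedged sketch only, not part of this change: if the FIXME above is addressed,
// R2dbcSession could resolve its executor per slice through the same
// R2dbcExecutorProvider the DAOs use, instead of one fixed R2dbcExecutor.
// Assumes an R2dbcSettings value `settings` in scope (not available here today);
// `connectionFactoryConfigPath`, `log` and `system` come from the surrounding code.
//
//   val executorProvider =
//     new R2dbcExecutorProvider(settings, connectionFactoryConfigPath, log)(
//       system.executionContext, system)
//   def executorFor(slice: Int): R2dbcExecutor =
//     executorProvider.executorFor(slice)
//
// Until then the session is always backed by the default connection factory,
// as the code below shows.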
val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log, logDbCallsDisabled, closeCallsExceeding)( system.executionContext, diff --git a/core/src/main/scala/akka/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala b/core/src/main/scala/akka/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala index 574fdb36..6bcce1e2 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala @@ -9,17 +9,18 @@ import java.time.Instant import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.adapter._ import akka.persistence.{ SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria } -import akka.persistence.r2dbc.{ ConnectionFactoryProvider, R2dbcSettings } +import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.snapshot.SnapshotStore import akka.serialization.{ Serialization, SerializationExtension } import com.typesafe.config.Config import scala.concurrent.{ ExecutionContext, Future } +import org.slf4j.LoggerFactory + import akka.annotation.InternalApi import akka.persistence.Persistence -import akka.persistence.query.typed.EventEnvelope import akka.persistence.r2dbc.internal.JournalDao -import akka.persistence.r2dbc.internal.SnapshotDao +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider import akka.persistence.r2dbc.internal.SnapshotDao.SerializedSnapshotMetadata import akka.persistence.r2dbc.internal.SnapshotDao.SerializedSnapshotRow import akka.persistence.typed.PersistenceId @@ -58,10 +59,10 @@ private[r2dbc] final class R2dbcSnapshotStore(cfg: Config, cfgPath: String) exte val settings = R2dbcSettings(context.system.settings.config.getConfig(sharedConfigPath)) log.debug("R2DBC snapshot store starting up with dialect [{}]", settings.dialectName) - private val connectionFactory = - ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath + ".connection-factory") - private val dao = settings.connectionFactorySettings.dialect.createSnapshotDao(settings, connectionFactory) - private val queryDao = settings.connectionFactorySettings.dialect.createQueryDao(settings, connectionFactory) + private val executorProvider = + new R2dbcExecutorProvider(settings, sharedConfigPath + ".connection-factory", LoggerFactory.getLogger(getClass)) + private val dao = settings.connectionFactorySettings.dialect.createSnapshotDao(settings, executorProvider) + private val queryDao = settings.connectionFactorySettings.dialect.createQueryDao(settings, executorProvider) def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = dao diff --git a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala index be7656e5..c8473758 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala @@ -26,7 +26,6 @@ import akka.persistence.query.UpdatedDurableState import akka.persistence.query.scaladsl.DurableStateStorePagedPersistenceIdsQuery import akka.persistence.query.typed.EventEnvelope import akka.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery -import akka.persistence.r2dbc.ConnectionFactoryProvider import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.BySliceQuery import akka.persistence.r2dbc.internal.ContinuousQuery @@ 
-46,6 +45,8 @@ import akka.stream.scaladsl.Source import com.typesafe.config.Config import org.slf4j.LoggerFactory +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider + object R2dbcDurableStateStore { val Identifier = "akka.persistence.r2dbc.state" @@ -65,20 +66,22 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg private val log = LoggerFactory.getLogger(getClass) private val sharedConfigPath = cfgPath.replaceAll("""\.state$""", "") private val settings = R2dbcSettings(system.settings.config.getConfig(sharedConfigPath)) - private val journalSettings = R2dbcSettings(system.settings.config.getConfig(sharedConfigPath)) log.debug("R2DBC journal starting up with dialect [{}]", settings.dialectName) private val typedSystem = system.toTyped private val serialization = SerializationExtension(system) private val persistenceExt = Persistence(system) - private val stateDao = settings.connectionFactorySettings.dialect.createDurableStateDao( - settings, - ConnectionFactoryProvider(typedSystem) - .connectionFactoryFor(sharedConfigPath + ".connection-factory"))(typedSystem) + // FIXME maybe this is using the wrong executionContext, H2Dialect is using another dispatcher? + private val executorProvider = + new R2dbcExecutorProvider(settings, sharedConfigPath + ".connection-factory", LoggerFactory.getLogger(getClass))( + typedSystem.executionContext, + typedSystem) + private val stateDao = + settings.connectionFactorySettings.dialect.createDurableStateDao(settings, executorProvider)(typedSystem) private val changeEventWriterUuid = UUID.randomUUID().toString private val pubSub: Option[PubSub] = - if (journalSettings.journalPublishEvents) Some(PubSub(typedSystem)) + if (settings.journalPublishEvents) Some(PubSub(typedSystem)) else None private val bySlice: BySliceQuery[SerializedStateRow, DurableStateChange[A]] = { @@ -225,7 +228,7 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg val entityType = PersistenceId.extractEntityType(persistenceId) val slice = persistenceExt.sliceForPersistenceId(persistenceId) - val timestamp = if (journalSettings.useAppTimestamp) InstantFactory.now() else JournalDao.EmptyDbTimestamp + val timestamp = if (settings.useAppTimestamp) InstantFactory.now() else JournalDao.EmptyDbTimestamp SerializedJournalRow( slice, diff --git a/core/src/test/resources/application-postgres-data-partitions.conf b/core/src/test/resources/application-postgres-data-partitions.conf new file mode 100644 index 00000000..d95dc9f5 --- /dev/null +++ b/core/src/test/resources/application-postgres-data-partitions.conf @@ -0,0 +1,12 @@ +# used from CI testing of 4 data partitions with 2 databases + +akka.persistence.r2dbc.data-partition { + number-of-partitions = 4 + number-of-databases = 2 +} + +akka.persistence.r2dbc.connection-factory = ${akka.persistence.r2dbc.postgres} +akka.persistence.r2dbc.connection-factory-0-1 = ${akka.persistence.r2dbc.connection-factory} +akka.persistence.r2dbc.connection-factory-2-3 = ${akka.persistence.r2dbc.connection-factory} +# second db listening on different port +akka.persistence.r2dbc.connection-factory-2-3.port = 5433 diff --git a/core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala index 1cdea01f..496bc6f4 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala @@ -92,7 +92,7 @@ class PayloadSpec implicit val journalPayloadCodec: PayloadCodec 
= settings.journalPayloadCodec val slice = persistenceExt.sliceForPersistenceId(persistenceId) - r2dbcExecutor + r2dbcExecutor(slice) .selectOne[TestRow]("test")( connection => connection.createStatement( diff --git a/core/src/test/scala/akka/persistence/r2dbc/R2dbcSettingsSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/R2dbcSettingsSpec.scala index 2580563e..59ce262d 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/R2dbcSettingsSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/R2dbcSettingsSpec.scala @@ -4,6 +4,8 @@ package akka.persistence.r2dbc +import com.typesafe.config.ConfigException + import akka.persistence.r2dbc.internal.postgres.PostgresDialect.PostgresConnectionFactorySettings import com.typesafe.config.{ Config, ConfigFactory } import io.r2dbc.postgresql.client.SSLMode @@ -21,7 +23,7 @@ class R2dbcSettingsSpec extends AnyWordSpec with TestSuite with Matchers { val config = ConfigFactory .parseString(""" akka.persistence.r2dbc.schema=s1 - akka.persistence.r2dbc.journal.table-data-partitions = 1 + akka.persistence.r2dbc.data-partition.number-of-partitions = 1 """) .withFallback(ConfigFactory.load("application-postgres.conf")) val settings = R2dbcSettings(config.getConfig("akka.persistence.r2dbc")) @@ -35,11 +37,71 @@ class R2dbcSettingsSpec extends AnyWordSpec with TestSuite with Matchers { connectionFactorySettings.urlOption should not be defined } - "have table names with data partition suffix" in { + "support connection settings build from url" in { + val config = + ConfigFactory + .parseString("akka.persistence.r2dbc.connection-factory.url=whatever-url") + .withFallback(ConfigFactory.load("application-postgres.conf")) + + val settings = R2dbcSettings(config.getConfig("akka.persistence.r2dbc")) + val connectionFactorySettings = postgresConnectionFactorySettings(config) + connectionFactorySettings shouldBe a[PostgresConnectionFactorySettings] + connectionFactorySettings.urlOption shouldBe defined + } + + "support ssl-mode as enum name" in { + val config = ConfigFactory + .parseString("akka.persistence.r2dbc.connection-factory.ssl.mode=VERIFY_FULL") + .withFallback(ConfigFactory.load("application-postgres.conf")) + val settings = R2dbcSettings(config.getConfig("akka.persistence.r2dbc")) + val connectionFactorySettings = postgresConnectionFactorySettings(config) + connectionFactorySettings.sslMode shouldBe "VERIFY_FULL" + SSLMode.fromValue(connectionFactorySettings.sslMode) shouldBe SSLMode.VERIFY_FULL + } + + "support ssl-mode values in lower and dashes" in { + val config = ConfigFactory + .parseString("akka.persistence.r2dbc.connection-factory.ssl.mode=verify-full") + .withFallback(ConfigFactory.load("application-postgres.conf")) + val settings = R2dbcSettings(config.getConfig("akka.persistence.r2dbc")) + val connectionFactorySettings = postgresConnectionFactorySettings(config) + connectionFactorySettings.sslMode shouldBe "verify-full" + SSLMode.fromValue(connectionFactorySettings.sslMode) shouldBe SSLMode.VERIFY_FULL + } + } + + "data-partition settings" should { + "report invalid values" in { + val baseConfig = ConfigFactory.load("application-postgres.conf") + def settingsWith(numberOfPartitions: Int, numberOfDatabases: Int = 1): R2dbcSettings = { + val config = ConfigFactory + .parseString(s""" + akka.persistence.r2dbc.data-partition { + number-of-partitions = $numberOfPartitions + number-of-databases = $numberOfDatabases + } + """) + .withFallback(baseConfig) + R2dbcSettings(config.getConfig("akka.persistence.r2dbc")) + } + + 
intercept[IllegalArgumentException](settingsWith(numberOfPartitions = 0)) + intercept[IllegalArgumentException](settingsWith(numberOfPartitions = 1025)) + intercept[IllegalArgumentException](settingsWith(numberOfPartitions = 6)) + + intercept[IllegalArgumentException](settingsWith(numberOfPartitions = 8, numberOfDatabases = 0)) + intercept[IllegalArgumentException](settingsWith(numberOfPartitions = 8, numberOfDatabases = 1025)) + intercept[IllegalArgumentException](settingsWith(numberOfPartitions = 8, numberOfDatabases = 6)) + intercept[IllegalArgumentException](settingsWith(numberOfPartitions = 8, numberOfDatabases = 16)) + + intercept[ConfigException.Missing](settingsWith(numberOfPartitions = 8, numberOfDatabases = 2)) + } + + "result in table names with data partition suffix" in { val config = ConfigFactory .parseString(""" akka.persistence.r2dbc.schema=s1 - akka.persistence.r2dbc.journal.table-data-partitions = 4 + akka.persistence.r2dbc.data-partition.number-of-partitions = 4 """) .withFallback(ConfigFactory.load("application-postgres.conf")) val settings = R2dbcSettings(config.getConfig("akka.persistence.r2dbc")) @@ -56,51 +118,121 @@ class R2dbcSettingsSpec extends AnyWordSpec with TestSuite with Matchers { "verify slice range within same data partition" in { val config = ConfigFactory .parseString(""" - akka.persistence.r2dbc.journal.table-data-partitions = 4 + akka.persistence.r2dbc.data-partition.number-of-partitions = 4 """) .withFallback(ConfigFactory.load("application-postgres.conf")) val settings = R2dbcSettings(config.getConfig("akka.persistence.r2dbc")) - settings.isJournalSliceRangeWithinSameDataPartition(0, 255) shouldBe true - settings.isJournalSliceRangeWithinSameDataPartition(256, 511) shouldBe true - settings.isJournalSliceRangeWithinSameDataPartition(512, 767) shouldBe true - settings.isJournalSliceRangeWithinSameDataPartition(768, 1023) shouldBe true - - settings.isJournalSliceRangeWithinSameDataPartition(0, 1023) shouldBe false - settings.isJournalSliceRangeWithinSameDataPartition(0, 511) shouldBe false - settings.isJournalSliceRangeWithinSameDataPartition(512, 1023) shouldBe false - settings.isJournalSliceRangeWithinSameDataPartition(511, 512) shouldBe false + settings.isSliceRangeWithinSameDataPartition(0, 255) shouldBe true + settings.isSliceRangeWithinSameDataPartition(256, 511) shouldBe true + settings.isSliceRangeWithinSameDataPartition(512, 767) shouldBe true + settings.isSliceRangeWithinSameDataPartition(768, 1023) shouldBe true + + settings.isSliceRangeWithinSameDataPartition(0, 1023) shouldBe false + settings.isSliceRangeWithinSameDataPartition(0, 511) shouldBe false + settings.isSliceRangeWithinSameDataPartition(512, 1023) shouldBe false + settings.isSliceRangeWithinSameDataPartition(511, 512) shouldBe false } - "support connection settings build from url" in { - val config = - ConfigFactory - .parseString("akka.persistence.r2dbc.connection-factory.url=whatever-url") - .withFallback(ConfigFactory.load("application-postgres.conf")) + "use connection-factory per database when same number of databases as partitions" in { + val config = ConfigFactory + .parseString(""" + akka.persistence.r2dbc.data-partition { + number-of-partitions = 2 + number-of-databases = 2 + } + akka.persistence.r2dbc.connection-factory-0-0 = ${akka.persistence.r2dbc.postgres} + akka.persistence.r2dbc.connection-factory-0-0.host = hostA + akka.persistence.r2dbc.connection-factory-1-1 = ${akka.persistence.r2dbc.postgres} + akka.persistence.r2dbc.connection-factory-1-1.host = hostB + # 
FIXME maybe we should support a convenience syntax for this case: + # akka.persistence.r2dbc.connection-factory-0 = ${akka.persistence.r2dbc.postgres} + # akka.persistence.r2dbc.connection-factory-1 = ${akka.persistence.r2dbc.postgres} + """) + .withFallback(ConfigFactory.load("application-postgres.conf")) + .resolve() val settings = R2dbcSettings(config.getConfig("akka.persistence.r2dbc")) - val connectionFactorySettings = postgresConnectionFactorySettings(config) - connectionFactorySettings shouldBe a[PostgresConnectionFactorySettings] - connectionFactorySettings.urlOption shouldBe defined + settings.connectionFactorySettings(slice = 0).config.getString("host") shouldBe "hostA" + settings.connectionFactorySettings(slice = 17).config.getString("host") shouldBe "hostA" + settings.connectionFactorySettings(slice = 511).config.getString("host") shouldBe "hostA" + settings.connectionFactorySettings(slice = 512).config.getString("host") shouldBe "hostB" + settings.connectionFactorySettings(slice = 700).config.getString("host") shouldBe "hostB" + settings.connectionFactorySettings(slice = 1023).config.getString("host") shouldBe "hostB" } - "support ssl-mode as enum name" in { + "use connection-factory per database when less databases than partitions" in { val config = ConfigFactory - .parseString("akka.persistence.r2dbc.connection-factory.ssl.mode=VERIFY_FULL") + .parseString(""" + akka.persistence.r2dbc.data-partition { + number-of-partitions = 8 + number-of-databases = 2 + } + akka.persistence.r2dbc.connection-factory-0-3 = ${akka.persistence.r2dbc.postgres} + akka.persistence.r2dbc.connection-factory-0-3.host = hostA + akka.persistence.r2dbc.connection-factory-4-7 = ${akka.persistence.r2dbc.postgres} + akka.persistence.r2dbc.connection-factory-4-7.host = hostB + """) .withFallback(ConfigFactory.load("application-postgres.conf")) + .resolve() val settings = R2dbcSettings(config.getConfig("akka.persistence.r2dbc")) - val connectionFactorySettings = postgresConnectionFactorySettings(config) - connectionFactorySettings.sslMode shouldBe "VERIFY_FULL" - SSLMode.fromValue(connectionFactorySettings.sslMode) shouldBe SSLMode.VERIFY_FULL + settings.connectionFactorySettings(slice = 0).config.getString("host") shouldBe "hostA" + settings.connectionFactorySettings(slice = 17).config.getString("host") shouldBe "hostA" + settings.connectionFactorySettings(slice = 511).config.getString("host") shouldBe "hostA" + settings.connectionFactorySettings(slice = 512).config.getString("host") shouldBe "hostB" + settings.connectionFactorySettings(slice = 700).config.getString("host") shouldBe "hostB" + settings.connectionFactorySettings(slice = 1023).config.getString("host") shouldBe "hostB" } - "support ssl-mode values in lower and dashes" in { + "derive connection-factory config property from number of partitions and databases" in { val config = ConfigFactory - .parseString("akka.persistence.r2dbc.connection-factory.ssl.mode=verify-full") + .parseString(""" + akka.persistence.r2dbc.data-partition { + number-of-partitions = 8 + number-of-databases = 2 + } + akka.persistence.r2dbc.connection-factory-0-3 = ${akka.persistence.r2dbc.postgres} + akka.persistence.r2dbc.connection-factory-0-3.host = hostA + akka.persistence.r2dbc.connection-factory-4-7 = ${akka.persistence.r2dbc.postgres} + akka.persistence.r2dbc.connection-factory-4-7.host = hostB + """) .withFallback(ConfigFactory.load("application-postgres.conf")) + .resolve() val settings = R2dbcSettings(config.getConfig("akka.persistence.r2dbc")) - val 
connectionFactorySettings = postgresConnectionFactorySettings(config) - connectionFactorySettings.sslMode shouldBe "verify-full" - SSLMode.fromValue(connectionFactorySettings.sslMode) shouldBe SSLMode.VERIFY_FULL + settings.resolveConnectionFactoryConfigPath( + "a.b.connection-factory", + slice = 0) shouldBe "a.b.connection-factory-0-3" + settings.resolveConnectionFactoryConfigPath( + "a.b.connection-factory", + slice = 17) shouldBe "a.b.connection-factory-0-3" + settings.resolveConnectionFactoryConfigPath( + "a.b.connection-factory", + slice = 511) shouldBe "a.b.connection-factory-0-3" + settings.resolveConnectionFactoryConfigPath( + "a.b.connection-factory", + slice = 512) shouldBe "a.b.connection-factory-4-7" + settings.resolveConnectionFactoryConfigPath( + "a.b.connection-factory", + slice = 700) shouldBe "a.b.connection-factory-4-7" + settings.resolveConnectionFactoryConfigPath( + "a.b.connection-factory", + slice = 1023) shouldBe "a.b.connection-factory-4-7" } + + "use default connection-factory config property when one database" in { + val config = ConfigFactory + .parseString(""" + akka.persistence.r2dbc.data-partition { + number-of-partitions = 8 + number-of-databases = 1 + } + """) + .withFallback(ConfigFactory.load("application-postgres.conf")) + val settings = R2dbcSettings(config.getConfig("akka.persistence.r2dbc")) + settings.resolveConnectionFactoryConfigPath("a.b.connection-factory", slice = 0) shouldBe "a.b.connection-factory" + settings.resolveConnectionFactoryConfigPath( + "a.b.connection-factory", + slice = 1023) shouldBe "a.b.connection-factory" + } + } } diff --git a/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala b/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala index 11a42eab..2683230b 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala @@ -6,18 +6,22 @@ package akka.persistence.r2dbc import scala.concurrent.Await import scala.concurrent.duration._ + import akka.actor.typed.ActorSystem import akka.persistence.Persistence import akka.persistence.r2dbc.internal.R2dbcExecutor import org.scalatest.BeforeAndAfterAll import org.scalatest.Suite import org.slf4j.LoggerFactory + import akka.persistence.r2dbc.internal.Sql.Interpolation import akka.persistence.r2dbc.internal.h2.H2Dialect - import java.time.Instant + import scala.util.control.NonFatal +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider + trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite => def typedSystem: ActorSystem[_] @@ -27,27 +31,30 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite => lazy val r2dbcSettings: R2dbcSettings = R2dbcSettings(typedSystem.settings.config.getConfig(testConfigPath)) - lazy val r2dbcExecutor: R2dbcExecutor = { - new R2dbcExecutor( - ConnectionFactoryProvider(typedSystem) - .connectionFactoryFor(testConfigPath + ".connection-factory"), - LoggerFactory.getLogger(getClass), - r2dbcSettings.logDbCallsExceeding, - r2dbcSettings.connectionFactorySettings.poolSettings.closeCallsExceeding)( + lazy val r2dbcExecutorProvider: R2dbcExecutorProvider = + new R2dbcExecutorProvider(r2dbcSettings, testConfigPath + ".connection-factory", LoggerFactory.getLogger(getClass))( typedSystem.executionContext, typedSystem) - } + + def r2dbcExecutor(slice: Int): R2dbcExecutor = + r2dbcExecutorProvider.executorFor(slice) + + // FIXME maybe remove, and always use the r2dbcExecutorProvider with explicit slice + lazy val r2dbcExecutor: 
R2dbcExecutor = + r2dbcExecutor(slice = 0) lazy val persistenceExt: Persistence = Persistence(typedSystem) def pendingIfMoreThanOneDataPartition(): Unit = - if (r2dbcSettings.journalTableDataPartitions > 1) + if (r2dbcSettings.numberOfDataPartitions > 1) pending override protected def beforeAll(): Unit = { try { - r2dbcSettings.alljournalTablesWithSchema.foreach { table => - Await.result(r2dbcExecutor.updateOne("beforeAll delete")(_.createStatement(s"delete from $table")), 10.seconds) + r2dbcSettings.alljournalTablesWithSchema.foreach { case (table, minSlice) => + Await.result( + r2dbcExecutor(minSlice).updateOne("beforeAll delete")(_.createStatement(s"delete from $table")), + 10.seconds) } Await.result( r2dbcExecutor.updateOne("beforeAll delete")( diff --git a/core/src/test/scala/akka/persistence/r2dbc/internal/H2AdditionalInitForSchemaSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/internal/H2AdditionalInitForSchemaSpec.scala index b35c4e08..d3e9f6d6 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/internal/H2AdditionalInitForSchemaSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/internal/H2AdditionalInitForSchemaSpec.scala @@ -32,6 +32,9 @@ object H2AdditionalInitForSchemaSpec { additional-init = "alter table durable_state add if not exists col1 varchar(256)" } // #additionalColumn + + # when testing with number-of-databases > 1 we must override that for H2 + akka.persistence.r2dbc.data-partition.number-of-databases = 1 """) .withFallback(ConfigFactory.load()) .resolve() diff --git a/core/src/test/scala/akka/persistence/r2dbc/internal/R2dbcExecutorSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/internal/R2dbcExecutorSpec.scala index 2dfa31a2..b3f814a7 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/internal/R2dbcExecutorSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/internal/R2dbcExecutorSpec.scala @@ -73,6 +73,8 @@ class R2dbcExecutorSpec } "R2dbcExecutor" should { + // when number-of-databases > 1 the test config above will not be used + pendingIfMoreThanOneDataPartition() "close connection when no response from update" in { pendingIfCannotBeTestedWithDialect() diff --git a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistSerializedEventSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistSerializedEventSpec.scala index 0ad99440..08c10762 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistSerializedEventSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistSerializedEventSpec.scala @@ -70,7 +70,7 @@ class PersistSerializedEventSpec replyProbe.expectMessage("e1|Some(e2)") val rows = - r2dbcExecutor + r2dbcExecutor(slice) .select[Row]("test")( connection => connection.createStatement( diff --git a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala index bd1b7db8..ac6b9464 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala @@ -32,8 +32,8 @@ class PersistTagsSpec implicit val tagsCodec: TagsCodec = settings.tagsCodec case class Row(pid: String, seqNr: Long, tags: Set[String]) - private def selectRows(table: String): IndexedSeq[Row] = { - r2dbcExecutor + private def selectRows(table: String, minSlice: Int): IndexedSeq[Row] = { + r2dbcExecutor(minSlice) .select[Row]("test")( connection => connection.createStatement(s"select * from $table"), row => @@ -45,7 +45,9 
@@ class PersistTagsSpec } private def selectAllRows(): IndexedSeq[Row] = - r2dbcSettings.alljournalTablesWithSchema.toVector.sorted.flatMap(selectRows) + r2dbcSettings.alljournalTablesWithSchema.toVector.sortBy(_._1).flatMap { case (table, minSlice) => + selectRows(table, minSlice) + } "Persist tags" should { diff --git a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala index 0b19ea2f..a601d76f 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala @@ -43,8 +43,8 @@ class PersistTimestampSpec else PostgresTimestampCodec - private def selectRows(table: String): IndexedSeq[Row] = { - r2dbcExecutor + private def selectRows(table: String, minSlice: Int): IndexedSeq[Row] = { + r2dbcExecutor(minSlice) .select[Row]("test")( connection => connection.createStatement(s"select * from $table"), row => { @@ -65,7 +65,9 @@ class PersistTimestampSpec } private def selectAllRows(): IndexedSeq[Row] = - r2dbcSettings.alljournalTablesWithSchema.toVector.sorted.flatMap(selectRows) + r2dbcSettings.alljournalTablesWithSchema.toVector.sortBy(_._1).flatMap { case (table, minSlice) => + selectRows(table, minSlice) + } "Persist timestamp" should { diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala index c047a81d..3fc1d6d5 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala @@ -75,7 +75,7 @@ class EventsBySliceBacktrackingSpec VALUES (?, ?, ?, ?, ?, '', '', ?, '', ?)""" val entityType = PersistenceId.extractEntityType(persistenceId) - val result = r2dbcExecutor.updateOne("test writeEvent") { connection => + val result = r2dbcExecutor(slice).updateOne("test writeEvent") { connection => connection .createStatement(insertEventSql) .bind(0, slice) diff --git a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala index c7578c3d..9205cdd9 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala @@ -47,6 +47,8 @@ class DurableStateUpdateWithChangeEventStoreSpec private val tag = "TAG" "The R2DBC durable state store" should { + pendingIfMoreThanOneDataPartition() // FIXME + "save additional change event" in { val entityType = nextEntityType() val persistenceId = PersistenceId(entityType, "my-persistenceId").id @@ -176,37 +178,36 @@ class DurableStateUpdateWithChangeEventStoreSpec envelopes.size shouldBe 2 } - } - - "publish change event" in { - val entityType = nextEntityType() - val persistenceId = PersistenceId(entityType, "my-persistenceId").id + "publish change event" in { + val entityType = nextEntityType() + val persistenceId = PersistenceId(entityType, "my-persistenceId").id - val slice = persistenceExt.sliceForPersistenceId(persistenceId) - val topic = PubSub(system).eventTopic[String](entityType, slice) - val subscriberProbe = createTestProbe[EventEnvelope[String]]() - topic ! 
Topic.Subscribe(subscriberProbe.ref) + val slice = persistenceExt.sliceForPersistenceId(persistenceId) + val topic = PubSub(system).eventTopic[String](entityType, slice) + val subscriberProbe = createTestProbe[EventEnvelope[String]]() + topic ! Topic.Subscribe(subscriberProbe.ref) - val value1 = "Genuinely Collaborative" - val value2 = "Open to Feedback" + val value1 = "Genuinely Collaborative" + val value2 = "Open to Feedback" - store.upsertObject(persistenceId, 1L, value1, tag, s"Changed to $value1").futureValue - store.upsertObject(persistenceId, 2L, value2, tag, s"Changed to $value2").futureValue - store.deleteObject(persistenceId, 3L, "Deleted").futureValue + store.upsertObject(persistenceId, 1L, value1, tag, s"Changed to $value1").futureValue + store.upsertObject(persistenceId, 2L, value2, tag, s"Changed to $value2").futureValue + store.deleteObject(persistenceId, 3L, "Deleted").futureValue - val env1 = subscriberProbe.receiveMessage() - env1.event shouldBe s"Changed to $value1" - env1.sequenceNr shouldBe 1L - env1.tags shouldBe Set(tag) - env1.source shouldBe EnvelopeOrigin.SourcePubSub + val env1 = subscriberProbe.receiveMessage() + env1.event shouldBe s"Changed to $value1" + env1.sequenceNr shouldBe 1L + env1.tags shouldBe Set(tag) + env1.source shouldBe EnvelopeOrigin.SourcePubSub - val env2 = subscriberProbe.receiveMessage() - env2.event shouldBe s"Changed to $value2" - env2.sequenceNr shouldBe 2L + val env2 = subscriberProbe.receiveMessage() + env2.event shouldBe s"Changed to $value2" + env2.sequenceNr shouldBe 2L - val env3 = subscriberProbe.receiveMessage() - env3.event shouldBe s"Deleted" - env3.sequenceNr shouldBe 3L + val env3 = subscriberProbe.receiveMessage() + env3.event shouldBe s"Deleted" + env3.sequenceNr shouldBe 3L + } } } diff --git a/ddl-scripts/create_tables_postgres_4.sql b/ddl-scripts/create_tables_postgres_0-1.sql similarity index 66% rename from ddl-scripts/create_tables_postgres_4.sql rename to ddl-scripts/create_tables_postgres_0-1.sql index da417cf4..7daaab47 100644 --- a/ddl-scripts/create_tables_postgres_4.sql +++ b/ddl-scripts/create_tables_postgres_0-1.sql @@ -1,3 +1,5 @@ +-- tables for data partition 0 and 1 + CREATE TABLE IF NOT EXISTS event_journal_0( slice INT NOT NULL, entity_type VARCHAR(255) NOT NULL, @@ -44,57 +46,9 @@ CREATE TABLE IF NOT EXISTS event_journal_1( PRIMARY KEY(persistence_id, seq_nr) ); -CREATE TABLE IF NOT EXISTS event_journal_2( - slice INT NOT NULL, - entity_type VARCHAR(255) NOT NULL, - persistence_id VARCHAR(255) NOT NULL, - seq_nr BIGINT NOT NULL, - db_timestamp timestamp with time zone NOT NULL, - - event_ser_id INTEGER NOT NULL, - event_ser_manifest VARCHAR(255) NOT NULL, - event_payload BYTEA NOT NULL, - - deleted BOOLEAN DEFAULT FALSE NOT NULL, - writer VARCHAR(255) NOT NULL, - adapter_manifest VARCHAR(255), - tags TEXT ARRAY, - - meta_ser_id INTEGER, - meta_ser_manifest VARCHAR(255), - meta_payload BYTEA, - - PRIMARY KEY(persistence_id, seq_nr) -); - -CREATE TABLE IF NOT EXISTS event_journal_3( - slice INT NOT NULL, - entity_type VARCHAR(255) NOT NULL, - persistence_id VARCHAR(255) NOT NULL, - seq_nr BIGINT NOT NULL, - db_timestamp timestamp with time zone NOT NULL, - - event_ser_id INTEGER NOT NULL, - event_ser_manifest VARCHAR(255) NOT NULL, - event_payload BYTEA NOT NULL, - - deleted BOOLEAN DEFAULT FALSE NOT NULL, - writer VARCHAR(255) NOT NULL, - adapter_manifest VARCHAR(255), - tags TEXT ARRAY, - - meta_ser_id INTEGER, - meta_ser_manifest VARCHAR(255), - meta_payload BYTEA, - - PRIMARY KEY(persistence_id, seq_nr) 
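-- Note (a hedged explanation, not part of the generated DDL): event_journal_2
-- and event_journal_3 are removed from this script because, with
-- number-of-partitions = 4 and number-of-databases = 2, data partitions 2 and 3
-- live in the second database. Their DDL moves to the new
-- create_tables_postgres_2-3.sql, which is applied to the database behind
-- connection-factory-2-3 (port 5433 in application-postgres-data-partitions.conf).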
-); - -- `event_journal_slice_idx` is only needed if the slice based queries are used CREATE INDEX IF NOT EXISTS event_journal_0_slice_idx ON event_journal_0(slice, entity_type, db_timestamp, seq_nr); CREATE INDEX IF NOT EXISTS event_journal_1_slice_idx ON event_journal_1(slice, entity_type, db_timestamp, seq_nr); -CREATE INDEX IF NOT EXISTS event_journal_2_slice_idx ON event_journal_2(slice, entity_type, db_timestamp, seq_nr); -CREATE INDEX IF NOT EXISTS event_journal_3_slice_idx ON event_journal_3(slice, entity_type, db_timestamp, seq_nr); CREATE TABLE IF NOT EXISTS snapshot( slice INT NOT NULL, diff --git a/ddl-scripts/create_tables_postgres_2-3.sql b/ddl-scripts/create_tables_postgres_2-3.sql new file mode 100644 index 00000000..a8e807a3 --- /dev/null +++ b/ddl-scripts/create_tables_postgres_2-3.sql @@ -0,0 +1,90 @@ +-- tables for data partition 2 and 3 + +CREATE TABLE IF NOT EXISTS event_journal_2( + slice INT NOT NULL, + entity_type VARCHAR(255) NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + seq_nr BIGINT NOT NULL, + db_timestamp timestamp with time zone NOT NULL, + + event_ser_id INTEGER NOT NULL, + event_ser_manifest VARCHAR(255) NOT NULL, + event_payload BYTEA NOT NULL, + + deleted BOOLEAN DEFAULT FALSE NOT NULL, + writer VARCHAR(255) NOT NULL, + adapter_manifest VARCHAR(255), + tags TEXT ARRAY, + + meta_ser_id INTEGER, + meta_ser_manifest VARCHAR(255), + meta_payload BYTEA, + + PRIMARY KEY(persistence_id, seq_nr) +); + +CREATE TABLE IF NOT EXISTS event_journal_3( + slice INT NOT NULL, + entity_type VARCHAR(255) NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + seq_nr BIGINT NOT NULL, + db_timestamp timestamp with time zone NOT NULL, + + event_ser_id INTEGER NOT NULL, + event_ser_manifest VARCHAR(255) NOT NULL, + event_payload BYTEA NOT NULL, + + deleted BOOLEAN DEFAULT FALSE NOT NULL, + writer VARCHAR(255) NOT NULL, + adapter_manifest VARCHAR(255), + tags TEXT ARRAY, + + meta_ser_id INTEGER, + meta_ser_manifest VARCHAR(255), + meta_payload BYTEA, + + PRIMARY KEY(persistence_id, seq_nr) +); + +-- `event_journal_slice_idx` is only needed if the slice based queries are used +CREATE INDEX IF NOT EXISTS event_journal_2_slice_idx ON event_journal_2(slice, entity_type, db_timestamp, seq_nr); +CREATE INDEX IF NOT EXISTS event_journal_3_slice_idx ON event_journal_3(slice, entity_type, db_timestamp, seq_nr); + +CREATE TABLE IF NOT EXISTS snapshot( + slice INT NOT NULL, + entity_type VARCHAR(255) NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + seq_nr BIGINT NOT NULL, + db_timestamp timestamp with time zone, + write_timestamp BIGINT NOT NULL, + ser_id INTEGER NOT NULL, + ser_manifest VARCHAR(255) NOT NULL, + snapshot BYTEA NOT NULL, + tags TEXT ARRAY, + meta_ser_id INTEGER, + meta_ser_manifest VARCHAR(255), + meta_payload BYTEA, + + PRIMARY KEY(persistence_id) +); + +-- `snapshot_slice_idx` is only needed if the slice based queries are used together with snapshot as starting point +CREATE INDEX IF NOT EXISTS snapshot_slice_idx ON snapshot(slice, entity_type, db_timestamp); + +CREATE TABLE IF NOT EXISTS durable_state ( + slice INT NOT NULL, + entity_type VARCHAR(255) NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + revision BIGINT NOT NULL, + db_timestamp timestamp with time zone NOT NULL, + + state_ser_id INTEGER NOT NULL, + state_ser_manifest VARCHAR(255), + state_payload BYTEA NOT NULL, + tags TEXT ARRAY, + + PRIMARY KEY(persistence_id, revision) +); + +-- `durable_state_slice_idx` is only needed if the slice based queries are used +CREATE INDEX IF NOT EXISTS 
diff --git a/docker/docker-compose-postgres-2.yml b/docker/docker-compose-postgres-2.yml
new file mode 100644
index 00000000..35369902
--- /dev/null
+++ b/docker/docker-compose-postgres-2.yml
@@ -0,0 +1,30 @@
+version: '2.2'
+services:
+  postgres-db-0:
+    image: postgres:latest
+    container_name: postgres-db-0
+    ports:
+      - 5432:5432
+    environment:
+      POSTGRES_USER: postgres
+      POSTGRES_PASSWORD: postgres
+    healthcheck:
+      test: ['CMD', 'pg_isready', "-q", "-d", "postgres", "-U", "postgres"]
+      interval: 5s
+      retries: 5
+      start_period: 5s
+      timeout: 5s
+  postgres-db-1:
+    image: postgres:latest
+    container_name: postgres-db-1
+    ports:
+      - 5433:5432
+    environment:
+      POSTGRES_USER: postgres
+      POSTGRES_PASSWORD: postgres
+    healthcheck:
+      test: ['CMD', 'pg_isready', "-q", "-d", "postgres", "-U", "postgres"]
+      interval: 5s
+      retries: 5
+      start_period: 5s
+      timeout: 5s
diff --git a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala
index e722417c..937ac1be 100644
--- a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala
+++ b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala
@@ -5,12 +5,14 @@
 package akka.persistence.r2dbc.migration
 
 import java.time.Instant
+
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
+
 import akka.Done
 import akka.actor.typed.ActorSystem
 import akka.actor.typed.Behavior
@@ -46,6 +48,8 @@ import akka.util.Timeout
 import io.r2dbc.spi.R2dbcDataIntegrityViolationException
 import org.slf4j.LoggerFactory
 
+import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
+
 object MigrationTool {
   def main(args: Array[String]): Unit = {
     ActorSystem(MigrationTool(), "MigrationTool")
@@ -111,13 +115,16 @@ class MigrationTool(system: ActorSystem[_]) {
 
   private val serialization: Serialization = SerializationExtension(system)
 
-  private val targetConnectionFactory = ConnectionFactoryProvider(system)
-    .connectionFactoryFor(targetPluginId + ".connection-factory")
+  private val targetExecutorProvider = new R2dbcExecutorProvider(
+    targetR2dbcSettings,
+    targetPluginId + ".connection-factory",
+    LoggerFactory.getLogger(getClass))
+
   private val targetJournalDao =
-    targetR2dbcSettings.connectionFactorySettings.dialect.createJournalDao(targetR2dbcSettings, targetConnectionFactory)
+    targetR2dbcSettings.connectionFactorySettings.dialect.createJournalDao(targetR2dbcSettings, targetExecutorProvider)
   private val targetSnapshotDao = targetR2dbcSettings.connectionFactorySettings.dialect
-    .createSnapshotDao(targetR2dbcSettings, targetConnectionFactory)
+    .createSnapshotDao(targetR2dbcSettings, targetExecutorProvider)
 
   private val targetBatch = migrationConfig.getInt("target.batch")
@@ -132,11 +139,7 @@ class MigrationTool(system: ActorSystem[_]) {
   if (targetR2dbcSettings.dialectName == "h2") {
     log.error("Migrating to H2 using the migration tool not currently supported")
   }
-  private[r2dbc] val migrationDao =
-    new MigrationToolDao(
-      targetConnectionFactory,
-      targetR2dbcSettings.logDbCallsExceeding,
-      targetR2dbcSettings.connectionFactorySettings.poolSettings.closeCallsExceeding)
+  private[r2dbc] val migrationDao = new MigrationToolDao(targetExecutorProvider)
 
   private lazy val createProgressTable: Future[Done] =
     migrationDao.createProgressTable()
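`MigrationTool` now builds an `R2dbcExecutorProvider` instead of resolving a single `ConnectionFactory`, since with multiple databases there is no longer one connection factory to hand to the DAOs. The routing the provider encapsulates can be pictured with the following sketch (illustrative arithmetic derived from the reference.conf description, not the plugin's actual implementation):

```scala
object SliceRoutingSketch {
  val NumberOfSlices = 1024

  // a slice (0-1023) falls into one of the data partitions, e.g. with
  // 4 partitions: 0-255 -> 0, 256-511 -> 1, 512-767 -> 2, 768-1023 -> 3
  def dataPartition(slice: Int, numberOfPartitions: Int): Int =
    slice / (NumberOfSlices / numberOfPartitions)

  // the partition selects the connection factory of the database holding its
  // tables, named with the partition range as suffix when there are several
  def connectionFactorySuffix(slice: Int, numberOfPartitions: Int, numberOfDatabases: Int): String =
    if (numberOfDatabases == 1) ""
    else {
      val partitionsPerDb = numberOfPartitions / numberOfDatabases
      val low = (dataPartition(slice, numberOfPartitions) / partitionsPerDb) * partitionsPerDb
      s"-$low-${low + partitionsPerDb - 1}"
    }
}
```

For example, with 4 partitions over 2 databases, slice 600 maps to data partition 2 and suffix `-2-3`, so its rows live in the database behind `connection-factory-2-3`.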
diff --git a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala
index e9ecf5bb..9637d3bf 100644
--- a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala
+++ b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala
@@ -6,19 +6,18 @@ package akka.persistence.r2dbc.migration
 
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
-import scala.concurrent.duration.FiniteDuration
 
 import akka.Done
 import akka.actor.typed.ActorSystem
 import akka.annotation.InternalApi
 import akka.dispatch.ExecutionContexts
 import akka.persistence.r2dbc.internal.Sql.Interpolation
-import akka.persistence.r2dbc.internal.R2dbcExecutor
 import akka.persistence.r2dbc.internal.codec.IdentityAdapter
 import akka.persistence.r2dbc.internal.codec.QueryAdapter
-import io.r2dbc.spi.ConnectionFactory
 import org.slf4j.LoggerFactory
 
+import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
+
 /**
  * INTERNAL API
  */
@@ -31,14 +30,13 @@ import org.slf4j.LoggerFactory
 /**
  * INTERNAL API
  */
-@InternalApi private[r2dbc] class MigrationToolDao(
-    connectionFactory: ConnectionFactory,
-    logDbCallsExceeding: FiniteDuration,
-    closeCallsExceeding: Option[FiniteDuration])(implicit ec: ExecutionContext, system: ActorSystem[_]) {
+@InternalApi private[r2dbc] class MigrationToolDao(executorProvider: R2dbcExecutorProvider)(implicit
+    ec: ExecutionContext,
+    system: ActorSystem[_]) {
   import MigrationToolDao._
   implicit val queryAdapter: QueryAdapter = IdentityAdapter
-  private val r2dbcExecutor =
-    new R2dbcExecutor(connectionFactory, log, logDbCallsExceeding, closeCallsExceeding)(ec, system)
+  // progress always in data partition 0
+  private val r2dbcExecutor = executorProvider.executorFor(slice = 0)
 
   def createProgressTable(): Future[Done] = {
     r2dbcExecutor.executeDdl("create migration progress table") { connection =>
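`MigrationToolDao` now takes the executor provider as its single constructor argument and pins itself to `executorFor(slice = 0)`, so the migration progress table always lands in data partition 0 regardless of how many partitions the target is split into. The DAOs that handle partitioned data presumably resolve an executor per persistence id instead; a minimal sketch of that pattern, using only the `executorFor` method visible in this patch (the class name here is hypothetical, not part of the patch):

```scala
import akka.actor.typed.ActorSystem
import akka.persistence.Persistence
import akka.persistence.r2dbc.internal.R2dbcExecutorProvider

// Hypothetical example: a DAO that must route each persistence id to the
// database holding the tables for that id's slice.
final class PartitionAwareDaoSketch(executorProvider: R2dbcExecutorProvider)(implicit system: ActorSystem[_]) {
  private val persistenceExt = Persistence(system.classicSystem)

  def executorFor(persistenceId: String) = {
    // the slice is a stable hash of the persistence id in the range 0-1023
    val slice = persistenceExt.sliceForPersistenceId(persistenceId)
    executorProvider.executorFor(slice)
  }
}
```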