feat: Data partitions (#508)
* H2Dialect create tables
* verify slice range within data partition
* ci job with 4 data partitions
* partition over multiple databases
* all persistenceIds queries
  * retrieve from each data partition and combine the results
* mention min number of projection instances
* keep journalTableWithSchema for compatibility
* mima filter
* add utility to retrieve connection factory names
patriknw authored Feb 5, 2024
1 parent db82b13 commit ba3dfbf
Showing 53 changed files with 1,246 additions and 425 deletions.
35 changes: 35 additions & 0 deletions .github/workflows/build-test.yml
@@ -112,6 +112,41 @@ jobs:
-Dakka.persistence.r2dbc.state.payload-column-type=JSONB \
"core/testOnly akka.persistence.r2dbc.PayloadSpec"
test-postgres-data-partitions:
name: Run tests with Postgres and several data partitions
runs-on: ubuntu-22.04
if: github.repository == 'akka/akka-persistence-r2dbc'
steps:
- name: Checkout
uses: actions/[email protected]
with:
fetch-depth: 0

- name: Checkout GitHub merge
if: github.event.pull_request
run: |-
git fetch origin pull/${{ github.event.pull_request.number }}/merge:scratch
git checkout scratch
- name: Cache Coursier cache
uses: coursier/[email protected]

- name: Set up JDK 11
uses: coursier/[email protected]
with:
jvm: temurin:1.11.0

- name: Start DB
run: |-
docker compose -f docker/docker-compose-postgres-2.yml up --wait
docker exec -i postgres-db-0 psql -U postgres -t < ddl-scripts/create_tables_postgres_0-1.sql
docker exec -i postgres-db-1 psql -U postgres -t < ddl-scripts/create_tables_postgres_2-3.sql
- name: sbt test
run: |-
cp .jvmopts-ci .jvmopts
sbt -Dconfig.resource=application-postgres-data-partitions.conf test
test-yugabyte:
name: Run tests with Yugabyte
runs-on: ubuntu-22.04
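The application-postgres-data-partitions.conf referenced by the new CI job's sbt step is not included in this diff. A minimal sketch of what such a config could contain, shown as HOCON parsed from Scala — the data-partition numbers match the CI job (4 partitions over the 2 compose databases), but the host/port/credential values are hypothetical, not taken from the repository:

import com.typesafe.config.ConfigFactory

// Hypothetical test config; connection values are illustrative only.
val config = ConfigFactory.parseString("""
  akka.persistence.r2dbc.data-partition {
    number-of-partitions = 4
    number-of-databases = 2
  }
  # with 2 databases, the connection factories carry the data partition range as suffix
  akka.persistence.r2dbc.connection-factory-0-1 {
    dialect = "postgres"
    host = "localhost"
    port = 5432
    database = "postgres"
    user = "postgres"
    password = "postgres"
  }
  akka.persistence.r2dbc.connection-factory-2-3 {
    dialect = "postgres"
    host = "localhost"
    port = 5433
    database = "postgres"
    user = "postgres"
    password = "postgres"
  }
""")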
@@ -0,0 +1,3 @@
# internals
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#Dao.currentDbTimestamp")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#Dao.currentDbTimestamp")
32 changes: 32 additions & 0 deletions core/src/main/resources/reference.conf
@@ -51,6 +51,7 @@ akka.persistence.r2dbc {

# replay filter not needed for this plugin
replay-filter.mode = off

}
}
// #journal-settings
@@ -182,6 +183,36 @@ akka.persistence.r2dbc {
}
}

// #data-partition-settings
# Number of tables and databases that the data will be split into. The data partition
# is selected from the slice of the persistenceId.
# For example, 4 data partitions means that slice range (0 to 255) maps to data partition 0,
# (256 to 511) to data partition 1, (512 to 767) to data partition 2, and (768 to 1023) to
# data partition 3.
# This configuration cannot be changed in a rolling update, since the data must be moved
# between the tables if the number of data partitions is changed.
# The number of Projection instances when using eventsBySlices must be greater than or equal
# to the number of data partitions, because a query for a slice range cannot span more
# than one data partition.
akka.persistence.r2dbc.data-partition {
# How many tables the data will be partitioned over. The tables will have
# the data partition as suffix, e.g. event_journal_0, event_journal_1.
# Must be between 1 and 1024 and a whole number divisor of 1024 (number of slices).
# When number-of-partitions is 1 the table name is without suffix.
number-of-partitions = 1
# How many databases the tables will be partitioned over. A database corresponds to a connection
# factory with its own connection pool.
# Must be a whole number divisor of number-of-partitions, and less than or equal to number-of-partitions.
# For example, number-of-partitions=8 and number-of-databases=2 means that there will be a total of
# 8 tables in 2 databases, i.e. 4 tables in each database.
# The connection-factory setting will have the data partition range as suffix, e.g. with 8 data partitions and
# 2 databases the connection factory settings are connection-factory-0-3, connection-factory-4-7.
# When number-of-databases is 1 there will only be one connection factory, without suffix.
# number-of-databases > 1 not supported by H2.
number-of-databases = 1
}
// #data-partition-settings

// #connection-settings
akka.persistence.r2dbc {

@@ -354,6 +385,7 @@ akka.persistence.r2dbc {
journal-table = ${akka.persistence.r2dbc.journal.table}
state-table = ${akka.persistence.r2dbc.state.table}
snapshot-table = ${akka.persistence.r2dbc.snapshot.table}
number-of-partitions = ${akka.persistence.r2dbc.data-partition.number-of-partitions}
// #connection-settings-h2
}

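A worked example of the slice-to-partition mapping described in the data-partition comments of reference.conf above — a minimal sketch assuming 1024 slices (matching akka.persistence.Persistence.numberOfSlices); the object and method names are illustrative, not part of the plugin API:

object DataPartitionMappingSketch {
  val NumberOfSlices = 1024

  // data partition for a slice: equal-sized contiguous slice ranges
  def dataPartition(slice: Int, numberOfPartitions: Int): Int =
    slice / (NumberOfSlices / numberOfPartitions)

  // connection factory config name for a slice, e.g. with 8 partitions over
  // 2 databases: connection-factory-0-3 or connection-factory-4-7
  def connectionFactoryName(slice: Int, numberOfPartitions: Int, numberOfDatabases: Int): String = {
    val rangeSize = numberOfPartitions / numberOfDatabases
    val i = dataPartition(slice, numberOfPartitions) / rangeSize
    s"connection-factory-${i * rangeSize}-${i * rangeSize + rangeSize - 1}"
  }

  def main(args: Array[String]): Unit = {
    // with 4 data partitions: slices 0-255 -> 0, 256-511 -> 1, 512-767 -> 2, 768-1023 -> 3
    Seq(0, 255, 256, 511, 512, 767, 768, 1023).foreach { slice =>
      println(s"slice $slice -> data partition ${dataPartition(slice, 4)}")
    }
  }
}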
176 changes: 154 additions & 22 deletions core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala
@@ -4,6 +4,7 @@

package akka.persistence.r2dbc

import scala.collection.immutable
import akka.annotation.InternalApi
import akka.annotation.InternalStableApi
import akka.persistence.r2dbc.internal.codec.IdentityAdapter
@@ -26,6 +27,9 @@ import scala.concurrent.duration._
@InternalStableApi
object R2dbcSettings {

// must correspond to akka.persistence.Persistence.numberOfSlices
private val NumberOfSlices = 1024

def apply(config: Config): R2dbcSettings = {
if (config.hasPath("dialect")) {
throw new IllegalArgumentException(
@@ -34,13 +38,6 @@
"see akka-persistence-r2dbc documentation for details on the new configuration scheme: " +
"https://doc.akka.io/docs/akka-persistence-r2dbc/current/migration-guide.html")
}
if (!config.hasPath("connection-factory.dialect")) {
throw new IllegalArgumentException(
"The Akka Persistence R2DBC database config scheme has changed, the config needs to be updated " +
"to choose database dialect using the connection-factory block, " +
"see akka-persistence-r2dbc documentation for details on the new configuration scheme: " +
"https://doc.akka.io/docs/akka-persistence-r2dbc/current/migration-guide.html")
}

val schema: Option[String] = Option(config.getString("schema")).filterNot(_.trim.isEmpty)

@@ -77,7 +74,45 @@

val durableStateAssertSingleWriter: Boolean = config.getBoolean("state.assert-single-writer")

val connectionFactorySettings = ConnectionFactorySettings(config.getConfig("connection-factory"))
val numberOfDataPartitions = config.getInt("data-partition.number-of-partitions")
require(
1 <= numberOfDataPartitions && numberOfDataPartitions <= NumberOfSlices,
s"data-partition.number-of-partitions [$numberOfDataPartitions] must be between 1 and $NumberOfSlices")
require(
numberOfDataPartitions * (NumberOfSlices / numberOfDataPartitions) == NumberOfSlices,
s"data-partition.number-of-partitions [$numberOfDataPartitions] must be a whole number divisor of " +
s"numberOfSlices [$NumberOfSlices].")

val numberOfDatabases = config.getInt("data-partition.number-of-databases")
require(
1 <= numberOfDatabases && numberOfDatabases <= numberOfDataPartitions,
s"data-partition.number-of-databases [$numberOfDatabases] must be between 1 and $numberOfDataPartitions")
require(
numberOfDatabases * (numberOfDataPartitions / numberOfDatabases) == numberOfDataPartitions,
s"data-partition.number-of-databases [$numberOfDatabases] must be a whole number divisor of " +
s"data-partition.number-of-partitions [$numberOfDataPartitions].")

val connectionFactorySettings =
if (numberOfDatabases == 1) {
if (!config.hasPath("connection-factory.dialect")) {
throw new IllegalArgumentException(
"The Akka Persistence R2DBC database config scheme has changed, the config needs to be updated " +
"to choose database dialect using the connection-factory block, " +
"see akka-persistence-r2dbc documentation for details on the new configuration scheme: " +
"https://doc.akka.io/docs/akka-persistence-r2dbc/current/migration-guide.html")
}
Vector(ConnectionFactorySettings(config.getConfig("connection-factory")))
} else {
val rangeSize = numberOfDataPartitions / numberOfDatabases
(0 until numberOfDatabases).map { i =>
val configPropertyName = s"connection-factory-${i * rangeSize}-${i * rangeSize + rangeSize - 1}"
ConnectionFactorySettings(config.getConfig(configPropertyName))
}
}

require(
connectionFactorySettings.map(_.dialect.name).toSet.size == 1,
s"All dialects for the [${connectionFactorySettings.size}] database partitions must be the same.")

val querySettings = new QuerySettings(config.getConfig("query"))

@@ -99,13 +134,13 @@
val durableStatePayloadCodec: PayloadCodec =
if (useJsonPayload("state")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec

connectionFactorySettings.dialect.name match {
connectionFactorySettings.head.dialect.name match {
case "sqlserver" =>
new CodecSettings(
journalPayloadCodec,
snapshotPayloadCodec,
durableStatePayloadCodec,
tagsCodec = new TagsCodec.SqlServerTagsCodec(connectionFactorySettings.config),
tagsCodec = new TagsCodec.SqlServerTagsCodec(connectionFactorySettings.head.config),
timestampCodec = TimestampCodec.SqlServerTimestampCodec,
queryAdapter = SqlServerQueryAdapter)
case "h2" =>
@@ -144,7 +179,8 @@
durableStateTableByEntityType,
durableStateAdditionalColumnClasses,
durableStateChangeHandlerClasses,
useAppTimestamp)
useAppTimestamp,
numberOfDataPartitions)

// let the dialect trump settings that do not make sense for it
settingsFromConfig.connectionFactorySettings.dialect.adaptSettings(settingsFromConfig)
@@ -154,6 +190,24 @@
import akka.util.ccompat.JavaConverters._
cfg.root.unwrapped.asScala.toMap.map { case (k, v) => k -> v.toString }
}

/**
* The config paths for the connection factories that are used for the given number of data partitions and databases.
*/
def connectionFactoryConfigPaths(
baseConfigPath: String,
numberOfDataPartitions: Int,
numberOfDatabases: Int): immutable.IndexedSeq[String] = {
if (numberOfDatabases == 1) {
Vector(baseConfigPath)
} else {
val rangeSize = numberOfDataPartitions / numberOfDatabases
(0 until numberOfDatabases).map { i =>
s"$baseConfigPath-${i * rangeSize}-${i * rangeSize + rangeSize - 1}"
}
}
}

}

/**
@@ -173,20 +227,73 @@ final class R2dbcSettings private (
val cleanupSettings: CleanupSettings,
/** INTERNAL API */
@InternalApi private[akka] val codecSettings: CodecSettings,
_connectionFactorySettings: ConnectionFactorySettings,
_connectionFactorySettings: immutable.IndexedSeq[ConnectionFactorySettings],
_durableStateTableByEntityType: Map[String, String],
_durableStateAdditionalColumnClasses: Map[String, immutable.IndexedSeq[String]],
_durableStateChangeHandlerClasses: Map[String, String],
_useAppTimestamp: Boolean) {
_useAppTimestamp: Boolean,
val numberOfDataPartitions: Int) {
import R2dbcSettings.NumberOfSlices

/**
* The journal table and schema name without data partition suffix.
*/
val journalTableWithSchema: String = schema.map(_ + ".").getOrElse("") + journalTable

/**
* The journal table and schema name with data partition suffix for the given slice. When number-of-partitions is 1
* the table name is without suffix.
*/
def journalTableWithSchema(slice: Int): String = {
if (numberOfDataPartitions == 1)
journalTableWithSchema
else
s"${journalTableWithSchema}_${dataPartition(slice)}"
}

val snapshotsTableWithSchema: String = schema.map(_ + ".").getOrElse("") + snapshotsTable
val durableStateTableWithSchema: String = schema.map(_ + ".").getOrElse("") + durableStateTable

/**
* INTERNAL API: All journal tables and their lower slice
*/
@InternalApi private[akka] val allJournalTablesWithSchema: Map[String, Int] = {
(0 until NumberOfSlices).foldLeft(Map.empty[String, Int]) { case (acc, slice) =>
val table = journalTableWithSchema(slice)
if (acc.contains(table)) acc
else acc.updated(table, slice)
}
}
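// e.g. with numberOfDataPartitions = 2 and no schema this resolves to
// Map("event_journal_0" -> 0, "event_journal_1" -> 512)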

val numberOfDatabases: Int = _connectionFactorySettings.size

val dataPartitionSliceRanges: immutable.IndexedSeq[Range] = {
val rangeSize = NumberOfSlices / numberOfDataPartitions
(0 until numberOfDataPartitions).map { i =>
(i * rangeSize until i * rangeSize + rangeSize)
}.toVector
}

val connectionFactorSliceRanges: immutable.IndexedSeq[Range] = {
val rangeSize = NumberOfSlices / numberOfDatabases
(0 until numberOfDatabases).map { i =>
(i * rangeSize until i * rangeSize + rangeSize)
}.toVector
}
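// e.g. numberOfDataPartitions = 4 gives dataPartitionSliceRanges
// Vector(0 until 256, 256 until 512, 512 until 768, 768 until 1024), and
// numberOfDatabases = 2 gives connectionFactorSliceRanges Vector(0 until 512, 512 until 1024)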

/**
* INTERNAL API
*/
@InternalApi private[akka] def isSliceRangeWithinSameDataPartition(minSlice: Int, maxSlice: Int): Boolean =
numberOfDataPartitions == 1 || dataPartition(minSlice) == dataPartition(maxSlice)

private def dataPartition(slice: Int): Int =
slice / (NumberOfSlices / numberOfDataPartitions)

/**
* One of the supported dialects 'postgres', 'yugabyte', 'sqlserver' or 'h2'
*/
def dialectName: String = _connectionFactorySettings.dialect.name
def dialectName: String = connectionFactorySettings.dialect.name

def getDurableStateTable(entityType: String): String =
_durableStateTableByEntityType.getOrElse(entityType, durableStateTable)
@@ -235,7 +342,30 @@
/**
* INTERNAL API
*/
@InternalApi private[akka] def connectionFactorySettings: ConnectionFactorySettings = _connectionFactorySettings
@InternalApi private[akka] def connectionFactorySettings: ConnectionFactorySettings =
connectionFactorySettings(0)

/**
* INTERNAL API
*/
@InternalApi private[akka] def connectionFactorySettings(slice: Int): ConnectionFactorySettings = {
val rangeSize = numberOfDataPartitions / numberOfDatabases
val i = dataPartition(slice) / rangeSize
_connectionFactorySettings(i)
}

/**
* INTERNAL API
*/
@InternalApi private[akka] def resolveConnectionFactoryConfigPath(baseConfigPath: String, slice: Int): String = {
if (numberOfDatabases == 1) {
baseConfigPath
} else {
val rangeSize = numberOfDataPartitions / numberOfDatabases
val i = dataPartition(slice) / rangeSize
s"$baseConfigPath-${i * rangeSize}-${i * rangeSize + rangeSize - 1}"
}
}

private def copy(
schema: Option[String] = schema,
Expand All @@ -249,12 +379,13 @@ final class R2dbcSettings private (
dbTimestampMonotonicIncreasing: Boolean = dbTimestampMonotonicIncreasing,
cleanupSettings: CleanupSettings = cleanupSettings,
codecSettings: CodecSettings = codecSettings,
connectionFactorySettings: ConnectionFactorySettings = connectionFactorySettings,
connectionFactorySettings: immutable.IndexedSeq[ConnectionFactorySettings] = _connectionFactorySettings,
durableStateTableByEntityType: Map[String, String] = _durableStateTableByEntityType,
durableStateAdditionalColumnClasses: Map[String, immutable.IndexedSeq[String]] =
_durableStateAdditionalColumnClasses,
durableStateChangeHandlerClasses: Map[String, String] = _durableStateChangeHandlerClasses,
useAppTimestamp: Boolean = _useAppTimestamp): R2dbcSettings =
useAppTimestamp: Boolean = _useAppTimestamp,
numberOfDataPartitions: Int = numberOfDataPartitions): R2dbcSettings =
new R2dbcSettings(
schema,
journalTable,
@@ -268,13 +399,14 @@
cleanupSettings,
codecSettings,
connectionFactorySettings,
_durableStateTableByEntityType,
_durableStateAdditionalColumnClasses,
_durableStateChangeHandlerClasses,
useAppTimestamp)
durableStateTableByEntityType,
durableStateAdditionalColumnClasses,
durableStateChangeHandlerClasses,
useAppTimestamp,
numberOfDataPartitions)

override def toString =
s"R2dbcSettings(dialectName=$dialectName, schema=$schema, journalTable=$journalTable, snapshotsTable=$snapshotsTable, durableStateTable=$durableStateTable, logDbCallsExceeding=$logDbCallsExceeding, dbTimestampMonotonicIncreasing=$dbTimestampMonotonicIncreasing, useAppTimestamp=$useAppTimestamp)"
s"R2dbcSettings(dialectName=$dialectName, schema=$schema, journalTable=$journalTable, snapshotsTable=$snapshotsTable, durableStateTable=$durableStateTable, logDbCallsExceeding=$logDbCallsExceeding, dbTimestampMonotonicIncreasing=$dbTimestampMonotonicIncreasing, useAppTimestamp=$useAppTimestamp, numberOfDataPartitions=$numberOfDataPartitions)"
}

/**
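A short usage sketch of the new public helper R2dbcSettings.connectionFactoryConfigPaths added in this commit (the base config path shown is the plugin's default; the expected result follows directly from the method body):

import akka.persistence.r2dbc.R2dbcSettings

val paths = R2dbcSettings.connectionFactoryConfigPaths(
  baseConfigPath = "akka.persistence.r2dbc.connection-factory",
  numberOfDataPartitions = 8,
  numberOfDatabases = 2)
// paths == Vector(
//   "akka.persistence.r2dbc.connection-factory-0-3",
//   "akka.persistence.r2dbc.connection-factory-4-7")
// with numberOfDatabases = 1 it returns Vector(baseConfigPath) unchanged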