diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 026cb809..9de5dcba 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -2,7 +2,6 @@ // #journal-settings pekko.persistence.r2dbc { - // TODO temporary solution, change to only using keys that are actually used by the implementation journal = ${pekko.persistence.r2dbc} journal { class = "org.apache.pekko.persistence.r2dbc.journal.R2dbcJournal" @@ -25,7 +24,6 @@ pekko.persistence.r2dbc { // #snapshot-settings pekko.persistence.r2dbc { - // TODO temporary solution, change to only using keys that are actually used by the implementation snapshot = ${pekko.persistence.r2dbc} snapshot { class = "org.apache.pekko.persistence.r2dbc.snapshot.R2dbcSnapshotStore" @@ -42,7 +40,6 @@ pekko.persistence.r2dbc { // #durable-state-settings pekko.persistence.r2dbc { # Durable state store - // TODO temporary solution, change to only using keys that are actually used by the implementation state = ${pekko.persistence.r2dbc} state { class = "org.apache.pekko.persistence.r2dbc.state.R2dbcDurableStateStoreProvider" @@ -61,7 +58,6 @@ pekko.persistence.r2dbc { // #query-settings pekko.persistence.r2dbc { - // TODO temporary solution, change to only using keys that are actually used by the implementation query = ${pekko.persistence.r2dbc} query { class = "org.apache.pekko.persistence.r2dbc.query.R2dbcReadJournalProvider" @@ -160,13 +156,6 @@ pekko.persistence.r2dbc { connection-factory-options-customizer = "" } - # Enable this to reduce latency of eventsBySlices. The persisted events will be - # published as Pekko messages and consumed directly by running eventsBySlices - # queries. Tradeoff is more CPU and network resources that are used. The events - # must still be retrieved from the database, but at a lower polling frequency, - # because delivery of published messages are not guaranteed. - publish-events = off - # If database timestamp is guaranteed to not move backwards for two subsequent # updates of the same persistenceId there might be a performance gain to # set this to `on`. Note that many databases use the system clock and that can @@ -182,6 +171,13 @@ pekko.persistence.r2dbc { # Set to 0 to log all calls. log-db-calls-exceeding = 300 ms + # Enable this to reduce latency of eventsBySlices. The persisted events will be + # published as Pekko messages and consumed directly by running eventsBySlices + # queries. Tradeoff is more CPU and network resources that are used. The events + # must still be retrieved from the database, but at a lower polling frequency, + # because delivery of published messages are not guaranteed. + publish-events = off + # In-memory buffer holding events when reading from database. buffer-size = 1000 diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala index 13507ab7..b6a5e5d7 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala @@ -18,19 +18,8 @@ import java.util.concurrent.ConcurrentHashMap import scala.concurrent.Future import scala.concurrent.duration.Duration -import scala.util.{ Failure, Success } -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory -import com.typesafe.config.ConfigValueType -import com.typesafe.config.ConfigValueType -import io.r2dbc.pool.ConnectionPool -import io.r2dbc.pool.ConnectionPoolConfiguration -import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider -import io.r2dbc.postgresql.client.SSLMode -import io.r2dbc.spi.ConnectionFactories -import io.r2dbc.spi.ConnectionFactory -import io.r2dbc.spi.ConnectionFactoryOptions -import io.r2dbc.spi.Option +import scala.util.Failure +import scala.util.Success import org.apache.pekko import pekko.Done import pekko.actor.CoordinatedShutdown @@ -41,6 +30,16 @@ import pekko.persistence.r2dbc.ConnectionFactoryProvider.ConnectionFactoryOption import pekko.persistence.r2dbc.ConnectionFactoryProvider.NoopCustomizer import pekko.persistence.r2dbc.internal.R2dbcExecutor.PublisherOps import pekko.util.ccompat.JavaConverters._ +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import io.r2dbc.pool.ConnectionPool +import io.r2dbc.pool.ConnectionPoolConfiguration +import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider +import io.r2dbc.postgresql.client.SSLMode +import io.r2dbc.spi.ConnectionFactories +import io.r2dbc.spi.ConnectionFactory +import io.r2dbc.spi.ConnectionFactoryOptions +import io.r2dbc.spi.Option object ConnectionFactoryProvider extends ExtensionId[ConnectionFactoryProvider] { def createExtension(system: ActorSystem[_]): ConnectionFactoryProvider = new ConnectionFactoryProvider(system) diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala index 43ea011f..76186f44 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala @@ -19,12 +19,9 @@ import scala.concurrent.duration._ import org.apache.pekko import pekko.annotation.InternalApi import pekko.annotation.InternalStableApi +import pekko.util.Helpers.toRootLowerCase import pekko.util.JavaDurationConverters._ import com.typesafe.config.Config -import com.typesafe.config.ConfigList -import com.typesafe.config.ConfigObject -import com.typesafe.config.ConfigValueType -import pekko.util.Helpers.toRootLowerCase /** * INTERNAL API diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala index ccb8a999..6c5434fc 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala @@ -13,22 +13,21 @@ package org.apache.pekko.persistence.r2dbc.internal -import scala.collection.immutable import java.time.Instant import java.time.{ Duration => JDuration } import scala.annotation.tailrec +import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration import org.apache.pekko -import org.apache.pekko.persistence.r2dbc.QuerySettings -import org.apache.pekko.persistence.r2dbc.SharedSettings import pekko.NotUsed import pekko.annotation.InternalApi import pekko.persistence.query.Offset import pekko.persistence.query.TimestampOffset +import pekko.persistence.r2dbc.SharedSettings import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket import pekko.stream.scaladsl.Flow import pekko.stream.scaladsl.Source @@ -98,7 +97,10 @@ import org.slf4j.Logger * Key is the epoch seconds for the start of the bucket. Value is the number of entries in the bucket. */ class Buckets(countByBucket: immutable.SortedMap[Buckets.EpochSeconds, Buckets.Count]) { - import Buckets.{ Bucket, BucketDurationSeconds, Count, EpochSeconds } + import Buckets.Bucket + import Buckets.BucketDurationSeconds + import Buckets.Count + import Buckets.EpochSeconds val createdAt: Instant = Instant.now() diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala index 2a999788..c104a7b6 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala @@ -17,26 +17,26 @@ import java.time.Instant import scala.concurrent.ExecutionContext import scala.concurrent.Future -import com.typesafe.config.Config -import io.r2dbc.spi.ConnectionFactory -import io.r2dbc.spi.Row -import io.r2dbc.spi.Statement import org.apache.pekko -import org.apache.pekko.persistence.r2dbc.ConnectionFactoryProvider -import org.apache.pekko.persistence.r2dbc.JournalSettings -import org.apache.pekko.persistence.r2dbc.SharedSettings -import org.apache.pekko.persistence.r2dbc.internal.EventsByPersistenceIdDao -import org.apache.pekko.persistence.r2dbc.internal.HighestSequenceNrDao import pekko.actor.typed.ActorSystem import pekko.annotation.InternalApi import pekko.dispatch.ExecutionContexts import pekko.persistence.Persistence +import pekko.persistence.r2dbc.ConnectionFactoryProvider import pekko.persistence.r2dbc.Dialect +import pekko.persistence.r2dbc.JournalSettings +import pekko.persistence.r2dbc.SharedSettings import pekko.persistence.r2dbc.internal.BySliceQuery +import pekko.persistence.r2dbc.internal.EventsByPersistenceIdDao +import pekko.persistence.r2dbc.internal.HighestSequenceNrDao import pekko.persistence.r2dbc.internal.R2dbcExecutor import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao import pekko.persistence.typed.PersistenceId +import com.typesafe.config.Config +import io.r2dbc.spi.ConnectionFactory +import io.r2dbc.spi.Row +import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala index d4ac164d..200d43a0 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala @@ -23,7 +23,6 @@ import scala.util.Success import scala.util.Try import com.typesafe.config.Config import org.apache.pekko -import org.apache.pekko.persistence.r2dbc.ConnectionFactoryProvider import pekko.Done import pekko.actor.ActorRef import pekko.actor.typed.ActorSystem diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala index 02cbb887..877711c7 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala @@ -20,14 +20,14 @@ package org.apache.pekko.persistence.r2dbc.journal.mysql import scala.concurrent.ExecutionContext -import io.r2dbc.spi.ConnectionFactory import org.apache.pekko -import org.apache.pekko.persistence.r2dbc.JournalSettings -import org.apache.pekko.persistence.r2dbc.SharedSettings import pekko.actor.typed.ActorSystem import pekko.annotation.InternalApi +import pekko.persistence.r2dbc.JournalSettings +import pekko.persistence.r2dbc.SharedSettings import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation import pekko.persistence.r2dbc.journal.JournalDao +import io.r2dbc.spi.ConnectionFactory /** * INTERNAL API diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala index 4159ab27..eb5060bc 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala @@ -19,27 +19,27 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration -import com.typesafe.config.Config -import io.r2dbc.spi.ConnectionFactory import org.apache.pekko -import org.apache.pekko.persistence.r2dbc.QuerySettings -import org.apache.pekko.persistence.r2dbc.SharedSettings -import org.apache.pekko.persistence.r2dbc.internal.EventsByPersistenceIdDao -import org.apache.pekko.persistence.r2dbc.internal.HighestSequenceNrDao import pekko.NotUsed import pekko.actor.typed.ActorSystem import pekko.annotation.InternalApi import pekko.persistence.r2dbc.ConnectionFactoryProvider import pekko.persistence.r2dbc.Dialect +import pekko.persistence.r2dbc.QuerySettings +import pekko.persistence.r2dbc.SharedSettings import pekko.persistence.r2dbc.internal.BySliceQuery import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket +import pekko.persistence.r2dbc.internal.EventsByPersistenceIdDao +import pekko.persistence.r2dbc.internal.HighestSequenceNrDao import pekko.persistence.r2dbc.internal.R2dbcExecutor import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation import pekko.persistence.r2dbc.journal.JournalDao import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow import pekko.persistence.r2dbc.query.scaladsl.mysql.MySQLQueryDao import pekko.stream.scaladsl.Source +import com.typesafe.config.Config +import io.r2dbc.spi.ConnectionFactory import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -47,7 +47,7 @@ object QueryDao { val log: Logger = LoggerFactory.getLogger(classOf[QueryDao]) def fromConfig( - settings: QuerySettings, + settings: QuerySettings, config: Config )(implicit system: ActorSystem[_], ec: ExecutionContext): QueryDao = { val connectionFactory = diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala index fe6e7f3c..f721f845 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala @@ -21,11 +21,6 @@ import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import com.typesafe.config.Config import org.apache.pekko -import org.apache.pekko.Done -import org.apache.pekko.actor.CoordinatedShutdown -import org.apache.pekko.actor.CoordinatedShutdown -import org.apache.pekko.persistence.r2dbc.ConnectionFactoryProvider -import org.apache.pekko.persistence.r2dbc.internal.R2dbcExecutor.PublisherOps import pekko.NotUsed import pekko.actor.ExtendedActorSystem import pekko.actor.typed.ActorSystem diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala index 94055b31..dc04ddad 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala @@ -25,13 +25,13 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration -import io.r2dbc.spi.ConnectionFactory import org.apache.pekko -import org.apache.pekko.persistence.r2dbc.QuerySettings import pekko.actor.typed.ActorSystem import pekko.annotation.InternalApi +import pekko.persistence.r2dbc.QuerySettings import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation import pekko.persistence.r2dbc.query.scaladsl.QueryDao +import io.r2dbc.spi.ConnectionFactory /** * INTERNAL API diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala index 03fa3e68..a030edc7 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala @@ -13,21 +13,23 @@ package org.apache.pekko.persistence.r2dbc.snapshot +import scala.concurrent.ExecutionContext +import scala.concurrent.Future import org.apache.pekko import pekko.actor.typed.ActorSystem import pekko.actor.typed.scaladsl.adapter._ -import pekko.persistence.{ SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria } -import pekko.persistence.r2dbc.SnapshotSettings -import pekko.persistence.snapshot.SnapshotStore -import pekko.serialization.{ Serialization, SerializationExtension } -import com.typesafe.config.Config -import org.apache.pekko.persistence.r2dbc.ConnectionFactoryProvider - -import scala.concurrent.{ ExecutionContext, Future } import pekko.annotation.InternalApi +import pekko.persistence.r2dbc.SnapshotSettings import pekko.persistence.r2dbc.snapshot.SnapshotDao.SerializedSnapshotMetadata import pekko.persistence.r2dbc.snapshot.SnapshotDao.SerializedSnapshotRow +import pekko.persistence.snapshot.SnapshotStore +import pekko.persistence.SelectedSnapshot +import pekko.persistence.SnapshotMetadata +import pekko.persistence.SnapshotSelectionCriteria import pekko.serialization.Serializers +import pekko.serialization.Serialization +import pekko.serialization.SerializationExtension +import com.typesafe.config.Config object R2dbcSnapshotStore { private def deserializeSnapshotRow(snap: SerializedSnapshotRow, serialization: Serialization): SelectedSnapshot = diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala index 1bbdb823..bc3bb942 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala @@ -15,9 +15,6 @@ package org.apache.pekko.persistence.r2dbc.snapshot import scala.concurrent.ExecutionContext import scala.concurrent.Future -import com.typesafe.config.Config -import io.r2dbc.spi.ConnectionFactory -import io.r2dbc.spi.Row import org.apache.pekko import pekko.actor.typed.ActorSystem import pekko.annotation.InternalApi @@ -31,6 +28,9 @@ import pekko.persistence.r2dbc.internal.R2dbcExecutor import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation import pekko.persistence.r2dbc.snapshot.mysql.MySQLSnapshotDao import pekko.persistence.typed.PersistenceId +import com.typesafe.config.Config +import io.r2dbc.spi.ConnectionFactory +import io.r2dbc.spi.Row import org.slf4j.Logger import org.slf4j.LoggerFactory diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala index 88f1400f..2c5d4546 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala @@ -19,12 +19,7 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration -import com.typesafe.config.Config -import io.r2dbc.spi.ConnectionFactory -import io.r2dbc.spi.R2dbcDataIntegrityViolationException -import io.r2dbc.spi.Statement import org.apache.pekko -import org.apache.pekko.persistence.r2dbc.StateSettings import pekko.Done import pekko.NotUsed import pekko.actor.typed.ActorSystem @@ -33,6 +28,7 @@ import pekko.dispatch.ExecutionContexts import pekko.persistence.Persistence import pekko.persistence.r2dbc.ConnectionFactoryProvider import pekko.persistence.r2dbc.Dialect +import pekko.persistence.r2dbc.StateSettings import pekko.persistence.r2dbc.internal.BySliceQuery import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket @@ -41,6 +37,10 @@ import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation import pekko.persistence.r2dbc.state.scaladsl.mysql.MySQLDurableStateDao import pekko.persistence.typed.PersistenceId import pekko.stream.scaladsl.Source +import com.typesafe.config.Config +import io.r2dbc.spi.ConnectionFactory +import io.r2dbc.spi.R2dbcDataIntegrityViolationException +import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala index 711a433d..ab30892e 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala @@ -18,10 +18,6 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import com.typesafe.config.Config import org.apache.pekko -import org.apache.pekko.actor.CoordinatedShutdown -import org.apache.pekko.actor.CoordinatedShutdown -import org.apache.pekko.persistence.r2dbc.ConnectionFactoryProvider -import org.apache.pekko.persistence.r2dbc.internal.R2dbcExecutor.PublisherOps import pekko.Done import pekko.NotUsed import pekko.actor.ExtendedActorSystem diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala index 83cf3ebd..bcd70569 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala @@ -25,14 +25,14 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration -import io.r2dbc.spi.ConnectionFactory import org.apache.pekko -import org.apache.pekko.persistence.r2dbc.StateSettings import pekko.actor.typed.ActorSystem import pekko.annotation.InternalApi +import pekko.persistence.r2dbc.StateSettings import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao import pekko.persistence.r2dbc.state.scaladsl.DurableStateDao +import io.r2dbc.spi.ConnectionFactory /** * INTERNAL API diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala index eb0a2b7d..1ee56c6e 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala @@ -17,7 +17,6 @@ import com.typesafe.config.Config import com.typesafe.config.ConfigFactory object TestConfig { - lazy val unresolvedConfig: Config = { val defaultConfig = ConfigFactory.load() val dialect = defaultConfig.getString("pekko.persistence.r2dbc.dialect") diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala index 79e3ba0b..5adfee8e 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala @@ -13,14 +13,14 @@ package org.apache.pekko.persistence.r2dbc -import com.typesafe.config.Config import scala.concurrent.Await import scala.concurrent.duration._ -import io.r2dbc.spi.ConnectionFactory import org.apache.pekko import pekko.actor.typed.ActorSystem import pekko.persistence.Persistence import pekko.persistence.r2dbc.internal.R2dbcExecutor +import com.typesafe.config.Config +import io.r2dbc.spi.ConnectionFactory import org.scalatest.BeforeAndAfterAll import org.scalatest.Suite import org.slf4j.LoggerFactory diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala index d3241bad..20427d26 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala @@ -15,11 +15,11 @@ package org.apache.pekko.persistence.r2dbc.journal import scala.concurrent.duration._ import org.apache.pekko -import org.apache.pekko.persistence.r2dbc.JournalSettings import pekko.Done import pekko.actor.testkit.typed.scaladsl.LogCapturing import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import pekko.actor.typed.ActorSystem +import pekko.persistence.r2dbc.JournalSettings import pekko.persistence.r2dbc.TestActors.Persister import pekko.persistence.r2dbc.TestConfig import pekko.persistence.r2dbc.TestData diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala index 3e9d9b29..5b29f6d4 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala @@ -17,11 +17,11 @@ import java.time.Instant import scala.concurrent.duration._ import org.apache.pekko -import org.apache.pekko.persistence.r2dbc.JournalSettings import pekko.Done import pekko.actor.testkit.typed.scaladsl.LogCapturing import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import pekko.actor.typed.ActorSystem +import pekko.persistence.r2dbc.JournalSettings import pekko.persistence.r2dbc.TestActors.Persister import pekko.persistence.r2dbc.TestConfig import pekko.persistence.r2dbc.TestData diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala index 4ba690c2..c4c08159 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala @@ -17,7 +17,6 @@ import java.time.Instant import scala.concurrent.duration._ import org.apache.pekko -import org.apache.pekko.persistence.r2dbc.JournalSettings import pekko.actor.testkit.typed.scaladsl.LogCapturing import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import pekko.actor.typed.ActorSystem @@ -25,6 +24,7 @@ import pekko.persistence.query.NoOffset import pekko.persistence.query.PersistenceQuery import pekko.persistence.query.typed.EventEnvelope import pekko.persistence.r2dbc.Dialect +import pekko.persistence.r2dbc.JournalSettings import pekko.persistence.r2dbc.TestConfig import pekko.persistence.r2dbc.TestData import pekko.persistence.r2dbc.TestDbLifecycle diff --git a/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala b/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala index 4f68f08e..055138cb 100644 --- a/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala +++ b/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala @@ -32,12 +32,14 @@ import pekko.persistence.SelectedSnapshot import pekko.persistence.SnapshotProtocol.LoadSnapshot import pekko.persistence.SnapshotProtocol.LoadSnapshotResult import pekko.persistence.SnapshotSelectionCriteria -import pekko.persistence.query.{ EventEnvelope => ClassicEventEnvelope } import pekko.persistence.query.PersistenceQuery import pekko.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery import pekko.persistence.query.scaladsl.CurrentPersistenceIdsQuery import pekko.persistence.query.scaladsl.ReadJournal +import pekko.persistence.query.{ EventEnvelope => ClassicEventEnvelope } import pekko.persistence.r2dbc.ConnectionFactoryProvider +import pekko.persistence.r2dbc.JournalSettings +import pekko.persistence.r2dbc.SnapshotSettings import pekko.persistence.r2dbc.journal.JournalDao import pekko.persistence.r2dbc.journal.JournalDao.SerializedEventMetadata import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow @@ -52,9 +54,6 @@ import pekko.serialization.Serializers import pekko.stream.scaladsl.Sink import pekko.util.Timeout import io.r2dbc.spi.R2dbcDataIntegrityViolationException -import org.apache.pekko.persistence.r2dbc.JournalSettings -import org.apache.pekko.persistence.r2dbc.SharedSettings -import org.apache.pekko.persistence.r2dbc.SnapshotSettings import org.slf4j.LoggerFactory object MigrationTool { @@ -143,7 +142,6 @@ class MigrationTool(system: ActorSystem[_]) { private val sourceSnapshotPluginId = migrationConfig.getString("source.snapshot-plugin-id") private lazy val sourceSnapshotStore = Persistence(system).snapshotStoreFor(sourceSnapshotPluginId) - // TODO using journal connection factory settings for migration-specific functionality, consider adding separate settings private[r2dbc] val migrationDao = new MigrationToolDao(targetJournalConnectionFactory, targetJournalSettings.logDbCallsExceeding) diff --git a/migration/src/test/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationToolSpec.scala b/migration/src/test/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationToolSpec.scala index b6133760..178319fe 100644 --- a/migration/src/test/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationToolSpec.scala +++ b/migration/src/test/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationToolSpec.scala @@ -177,8 +177,7 @@ class MigrationToolSpec "MigrationTool" should { if (!testEnabled) { - info( - s"MigrationToolSpec not enabled for ${system.settings.config.getString("pekko.persistence.r2dbc.dialect")}") + info(s"MigrationToolSpec not enabled for ${system.settings.config.getString("pekko.persistence.r2dbc.dialect")}") pending } diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala index a9a7c8ce..74181cf4 100644 --- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala +++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala @@ -29,6 +29,7 @@ import pekko.actor.typed.Behavior import pekko.actor.typed.scaladsl.Behaviors import pekko.persistence.query.typed.EventEnvelope import pekko.persistence.r2dbc.Dialect +import pekko.persistence.r2dbc.JournalSettings import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal import pekko.persistence.typed.PersistenceId @@ -43,7 +44,6 @@ import pekko.projection.r2dbc.scaladsl.R2dbcSession import pekko.serialization.SerializationExtension import com.typesafe.config.Config import com.typesafe.config.ConfigFactory -import org.apache.pekko.persistence.r2dbc.JournalSettings import org.scalatest.wordspec.AnyWordSpecLike import org.slf4j.LoggerFactory