diff --git a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala index 16dbd28f..5f64b4fc 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala @@ -334,7 +334,28 @@ final class CodecSettings( val durableStatePayloadCodec: PayloadCodec, val tagsCodec: TagsCodec, val timestampCodec: TimestampCodec, - val queryAdapter: QueryAdapter) + val queryAdapter: QueryAdapter) { + + // implicits that can be imported + object JournalImplicits { + implicit def journalPayloadCodec: PayloadCodec = CodecSettings.this.journalPayloadCodec + implicit def tagsCodec: TagsCodec = CodecSettings.this.tagsCodec + implicit def timestampCodec: TimestampCodec = CodecSettings.this.timestampCodec + implicit def queryAdapter: QueryAdapter = CodecSettings.this.queryAdapter + } + object SnapshotImplicits { + implicit def snapshotPayloadCodec: PayloadCodec = CodecSettings.this.snapshotPayloadCodec + implicit def tagsCodec: TagsCodec = CodecSettings.this.tagsCodec + implicit def timestampCodec: TimestampCodec = CodecSettings.this.timestampCodec + implicit def queryAdapter: QueryAdapter = CodecSettings.this.queryAdapter + } + object DurableStateImplicits { + implicit def durableStatePayloadCodec: PayloadCodec = CodecSettings.this.durableStatePayloadCodec + implicit def tagsCodec: TagsCodec = CodecSettings.this.tagsCodec + implicit def timestampCodec: TimestampCodec = CodecSettings.this.timestampCodec + implicit def queryAdapter: QueryAdapter = CodecSettings.this.queryAdapter + } +} /** * INTERNAL API diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala index 3d29a72f..98e55d53 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala @@ -34,6 +34,7 @@ private[r2dbc] class H2JournalDao(journalSettings: R2dbcSettings, connectionFact system: ActorSystem[_]) extends PostgresJournalDao(journalSettings, connectionFactory) { import JournalDao.SerializedJournalRow + import journalSettings.codecSettings.JournalImplicits._ override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2JournalDao]) // always app timestamp (db is same process) monotonic increasing require(journalSettings.useAppTimestamp) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala index cb4f019a..00a856ce 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala @@ -26,6 +26,7 @@ private[r2dbc] class H2QueryDao(settings: R2dbcSettings, connectionFactory: Conn ec: ExecutionContext, system: ActorSystem[_]) extends PostgresQueryDao(settings, connectionFactory) { + import settings.codecSettings.JournalImplicits._ override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2QueryDao]) override protected def eventsBySlicesRangeSql( diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala index 05d46866..78178101 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala @@ -24,6 +24,7 @@ private[r2dbc] final class H2SnapshotDao(settings: R2dbcSettings, connectionFact ec: ExecutionContext, system: ActorSystem[_]) extends PostgresSnapshotDao(settings, connectionFactory) { + import settings.codecSettings.SnapshotImplicits._ override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2SnapshotDao]) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index af75971b..21a7f326 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -48,12 +48,7 @@ import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.R2dbcExecutor import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter -import akka.persistence.r2dbc.internal.codec.PayloadCodec -import akka.persistence.r2dbc.internal.codec.QueryAdapter -import akka.persistence.r2dbc.internal.codec.SqlServerQueryAdapter -import akka.persistence.r2dbc.internal.codec.TagsCodec import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement -import akka.persistence.r2dbc.internal.codec.TimestampCodec import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.r2dbc.session.scaladsl.R2dbcSession @@ -90,6 +85,7 @@ private[r2dbc] class PostgresDurableStateDao( extends DurableStateDao { import DurableStateDao._ import PostgresDurableStateDao._ + import settings.codecSettings.DurableStateImplicits._ protected def log: Logger = PostgresDurableStateDao.log private val persistenceExt = Persistence(system) @@ -99,12 +95,6 @@ private[r2dbc] class PostgresDurableStateDao( settings.logDbCallsExceeding, settings.connectionFactorySettings.poolSettings.closeCallsExceeding)(ec, system) - private implicit val statePayloadCodec: PayloadCodec = settings.codecSettings.durableStatePayloadCodec - private implicit val tagsCodec: TagsCodec = settings.codecSettings.tagsCodec - - protected implicit val timestampCodec: TimestampCodec = settings.codecSettings.timestampCodec - protected implicit val queryAdapter: QueryAdapter = settings.codecSettings.queryAdapter - // used for change events private lazy val journalDao: JournalDao = dialect.createJournalDao(settings, connectionFactory) @@ -322,7 +312,7 @@ private[r2dbc] class PostgresDurableStateDao( private def getPayload(row: Row): Option[Array[Byte]] = { val serId = row.get("state_ser_id", classOf[Integer]) val rowPayload = row.getPayload("state_payload") - if (serId == 0 && (rowPayload == null || util.Arrays.equals(statePayloadCodec.nonePayload, rowPayload))) + if (serId == 0 && (rowPayload == null || util.Arrays.equals(durableStatePayloadCodec.nonePayload, rowPayload))) None // delete marker else Option(rowPayload) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala index b5f68f71..a5349948 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala @@ -15,9 +15,7 @@ import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.R2dbcExecutor import akka.persistence.r2dbc.internal.SerializedEventMetadata import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter -import akka.persistence.r2dbc.internal.codec.TagsCodec import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement -import akka.persistence.r2dbc.internal.codec.TimestampCodec import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.typed.PersistenceId @@ -68,6 +66,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti system: ActorSystem[_]) extends JournalDao { import JournalDao.SerializedJournalRow + import journalSettings.codecSettings.JournalImplicits._ protected def log: Logger = PostgresJournalDao.log private val persistenceExt = Persistence(system) @@ -80,10 +79,6 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti journalSettings.connectionFactorySettings.poolSettings.closeCallsExceeding)(ec, system) protected val journalTable: String = journalSettings.journalTableWithSchema - protected implicit val journalPayloadCodec: PayloadCodec = journalSettings.codecSettings.journalPayloadCodec - protected implicit val tagsCodec: TagsCodec = journalSettings.codecSettings.tagsCodec - protected implicit val timestampCodec: TimestampCodec = journalSettings.codecSettings.timestampCodec - protected implicit val queryAdapter: QueryAdapter = journalSettings.codecSettings.queryAdapter protected val (insertEventWithParameterTimestampSql, insertEventWithTransactionTimestampSql) = { val baseSql = diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala index 6dfc31a4..6aed9e5b 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala @@ -24,10 +24,7 @@ import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow import akka.persistence.r2dbc.internal.QueryDao import akka.persistence.r2dbc.internal.R2dbcExecutor import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter -import akka.persistence.r2dbc.internal.codec.QueryAdapter -import akka.persistence.r2dbc.internal.codec.TagsCodec import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichRow -import akka.persistence.r2dbc.internal.codec.TimestampCodec import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.typed.PersistenceId @@ -36,7 +33,6 @@ import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory -import akka.persistence.r2dbc.internal.codec.PayloadCodec /** * INTERNAL API @@ -55,13 +51,10 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory system: ActorSystem[_]) extends QueryDao { import PostgresJournalDao.readMetadata + import settings.codecSettings.DurableStateImplicits._ protected def log: Logger = PostgresQueryDao.log protected val journalTable: String = settings.journalTableWithSchema - protected implicit val journalPayloadCodec: PayloadCodec = settings.codecSettings.journalPayloadCodec - protected implicit val timestampCodec: TimestampCodec = settings.codecSettings.timestampCodec - protected implicit val tagsCodec: TagsCodec = settings.codecSettings.tagsCodec - protected implicit val queryAdapter: QueryAdapter = settings.codecSettings.queryAdapter protected def sqlFalse: String = "false" protected def sqlDbTimestamp = "CURRENT_TIMESTAMP" diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala index 2d1b3953..f5ea95f3 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala @@ -31,12 +31,8 @@ import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.R2dbcExecutor import akka.persistence.r2dbc.internal.SnapshotDao import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter -import akka.persistence.r2dbc.internal.codec.PayloadCodec -import akka.persistence.r2dbc.internal.codec.QueryAdapter -import akka.persistence.r2dbc.internal.codec.TagsCodec import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichRow -import akka.persistence.r2dbc.internal.codec.TimestampCodec import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.typed.PersistenceId @@ -58,16 +54,12 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, connectionFact system: ActorSystem[_]) extends SnapshotDao { import SnapshotDao._ + import settings.codecSettings.SnapshotImplicits._ protected def log: Logger = PostgresSnapshotDao.log protected val snapshotTable: String = settings.snapshotsTableWithSchema - protected implicit val snapshotPayloadCodec: PayloadCodec = settings.codecSettings.snapshotPayloadCodec - protected implicit val timestampCodec: TimestampCodec = settings.codecSettings.timestampCodec - protected implicit val tagsCodec: TagsCodec = settings.codecSettings.tagsCodec - protected implicit val queryAdapter: QueryAdapter = settings.codecSettings.queryAdapter - protected val r2dbcExecutor = new R2dbcExecutor( connectionFactory, log, diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala index ea5a687c..3414d2b4 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala @@ -44,6 +44,7 @@ private[r2dbc] class SqlServerDurableStateDao( connectionFactory: ConnectionFactory, dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends PostgresDurableStateDao(settings, connectionFactory, dialect) { + import settings.codecSettings.DurableStateImplicits._ require(settings.useAppTimestamp, "SqlServer requires akka.persistence.r2dbc.use-app-timestamp=on") diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala index 58a2ac1d..47a3cccd 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala @@ -36,6 +36,7 @@ private[r2dbc] class SqlServerJournalDao(settings: R2dbcSettings, connectionFact ec: ExecutionContext, system: ActorSystem[_]) extends PostgresJournalDao(settings, connectionFactory) { + import settings.codecSettings.JournalImplicits._ require(settings.useAppTimestamp, "SqlServer requires akka.persistence.r2dbc.use-app-timestamp=on") require( diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala index 5297c0e0..7cdb18d8 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala @@ -40,6 +40,7 @@ private[r2dbc] class SqlServerQueryDao(settings: R2dbcSettings, connectionFactor ec: ExecutionContext, system: ActorSystem[_]) extends PostgresQueryDao(settings, connectionFactory) { + import settings.codecSettings.JournalImplicits._ override def sqlFalse = "0" diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala index b23dde68..451a18bd 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala @@ -43,6 +43,7 @@ private[r2dbc] class SqlServerSnapshotDao(settings: R2dbcSettings, connectionFac ec: ExecutionContext, system: ActorSystem[_]) extends PostgresSnapshotDao(settings, connectionFactory) { + import settings.codecSettings.SnapshotImplicits._ override def log: Logger = SqlServerSnapshotDao.log diff --git a/core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala index 72f179da..955dc181 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala @@ -15,7 +15,6 @@ import akka.persistence.r2dbc.TestActors.Persister import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike -import akka.persistence.r2dbc.internal.codec.PayloadCodec /** * The purpose of this test is to verify JSONB payloads, but it can also be run with ordinary BYTEA payloads. To test @@ -89,7 +88,7 @@ class PayloadSpec } private def selectJournalRow(persistenceId: String): TestRow = { - implicit val journalPayloadCodec: PayloadCodec = settings.codecSettings.journalPayloadCodec + import settings.codecSettings.JournalImplicits.journalPayloadCodec r2dbcExecutor .selectOne[TestRow]("test")( @@ -108,7 +107,7 @@ class PayloadSpec } private def selectSnapshotRow(persistenceId: String): TestRow = { - implicit val snapshotPayloadCodec: PayloadCodec = settings.codecSettings.snapshotPayloadCodec + import settings.codecSettings.SnapshotImplicits.snapshotPayloadCodec r2dbcExecutor .selectOne[TestRow]("test")( @@ -127,7 +126,7 @@ class PayloadSpec } private def selectDurableStateRow(persistenceId: String): TestRow = { - implicit val durableStatePayloadCodec: PayloadCodec = settings.codecSettings.durableStatePayloadCodec + import settings.codecSettings.DurableStateImplicits.durableStatePayloadCodec r2dbcExecutor .selectOne[TestRow]("test")( diff --git a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala index 5452a6ab..6bbba9ad 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala @@ -15,7 +15,6 @@ import akka.persistence.r2dbc.TestActors.Persister import akka.persistence.r2dbc.TestConfig import akka.persistence.r2dbc.TestData import akka.persistence.r2dbc.TestDbLifecycle -import akka.persistence.r2dbc.internal.codec.TagsCodec import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichRow import akka.persistence.typed.PersistenceId import org.scalatest.wordspec.AnyWordSpecLike @@ -29,7 +28,7 @@ class PersistTagsSpec override def typedSystem: ActorSystem[_] = system private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc")) - private implicit val tagsCodec: TagsCodec = settings.codecSettings.tagsCodec + import settings.codecSettings.JournalImplicits.tagsCodec case class Row(pid: String, seqNr: Long, tags: Set[String]) "Persist tags" should { diff --git a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala index ed02c398..6f356555 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala @@ -22,7 +22,6 @@ import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRo import akka.persistence.typed.PersistenceId import akka.serialization.SerializationExtension import org.scalatest.wordspec.AnyWordSpecLike -import akka.persistence.r2dbc.internal.codec.PayloadCodec class PersistTimestampSpec extends ScalaTestWithActorTestKit(TestConfig.config) @@ -31,10 +30,10 @@ class PersistTimestampSpec with TestData with LogCapturing { + import settings.codecSettings.JournalImplicits.journalPayloadCodec override def typedSystem: ActorSystem[_] = system private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc")) private val serialization = SerializationExtension(system) - private implicit val journalPayloadCodec: PayloadCodec = settings.codecSettings.journalPayloadCodec case class Row(pid: String, seqNr: Long, dbTimestamp: Instant, event: String) implicit private val codec: TimestampCodec = diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala index b6ecd240..551a03a6 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala @@ -35,7 +35,6 @@ import akka.stream.testkit.scaladsl.TestSink import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike import org.slf4j.LoggerFactory -import akka.persistence.r2dbc.internal.codec.PayloadCodec object EventsBySliceBacktrackingSpec { private val BufferSize = 10 // small buffer for testing @@ -57,9 +56,7 @@ class EventsBySliceBacktrackingSpec override def typedSystem: ActorSystem[_] = system private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc")) - private implicit val journalPayloadCodec: PayloadCodec = settings.codecSettings.journalPayloadCodec - private implicit val timestampCodec: TimestampCodec = settings.codecSettings.timestampCodec - private implicit val queryAdapter: QueryAdapter = settings.codecSettings.queryAdapter + import settings.codecSettings.JournalImplicits._ private val query = PersistenceQuery(testKit.system) .readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)