diff --git a/core/src/main/mima-filters/1.2.2.backwards.excludes/mssql-dialect.excludes b/core/src/main/mima-filters/1.2.1.backwards.excludes/mssql-dialect.excludes similarity index 65% rename from core/src/main/mima-filters/1.2.2.backwards.excludes/mssql-dialect.excludes rename to core/src/main/mima-filters/1.2.1.backwards.excludes/mssql-dialect.excludes index 3140d0e3..83af9c85 100644 --- a/core/src/main/mima-filters/1.2.2.backwards.excludes/mssql-dialect.excludes +++ b/core/src/main/mima-filters/1.2.1.backwards.excludes/mssql-dialect.excludes @@ -1,16 +1,12 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.R2dbcSettings.this") -ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.Sql$Interpolation$") -ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.Sql.Interpolation") -ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.Sql.Interpolation") -ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.Sql#Interpolation.this") ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.h2.H2Utils") ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.h2.H2Utils$") -ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.r2dbc.R2dbcSettings.journalPayloadCodec") ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.PayloadCodec$RichStatement") ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.PayloadCodec$RichRow") ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.PayloadCodec$JsonCodec$") ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.PayloadCodec$ByteArrayCodec$") ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.PayloadCodec$") ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.PayloadCodec") -ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.r2dbc.R2dbcSettings.durableStatePayloadCodec") -ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.r2dbc.R2dbcSettings.snapshotPayloadCodec") \ No newline at end of file +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.R2dbcSettings.journalPayloadCodec") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.R2dbcSettings.snapshotPayloadCodec") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.R2dbcSettings.durableStatePayloadCodec") diff --git a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala index 9595be58..5f64b4fc 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala @@ -54,21 +54,12 @@ object R2dbcSettings { s"Expected akka.persistence.r2dbc.$prefix.payload-column-type to be one of 'BYTEA', 'JSON' or 'JSONB' but found '$t'") } - val journalPayloadCodec: PayloadCodec = - if (useJsonPayload("journal")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec - val journalPublishEvents: Boolean = config.getBoolean("journal.publish-events") val snapshotsTable: String = config.getString("snapshot.table") - val snapshotPayloadCodec: PayloadCodec = - if (useJsonPayload("snapshot")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec - val durableStateTable: String = config.getString("state.table") - val durableStatePayloadCodec: PayloadCodec = - if (useJsonPayload("state")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec - val durableStateTableByEntityType: Map[String, String] = configToMap(config.getConfig("state.custom-table")) @@ -88,18 +79,6 @@ object R2dbcSettings { val connectionFactorySettings = ConnectionFactorySettings(config.getConfig("connection-factory")) - val (tagsCodec: TagsCodec, timestampCodec: TimestampCodec, queryAdapter: QueryAdapter) = { - connectionFactorySettings.dialect.name match { - case "sqlserver" => - ( - new TagsCodec.SqlServerTagsCodec(connectionFactorySettings.config), - TimestampCodec.SqlServerTimestampCodec, - SqlServerQueryAdapter) - case "h2" => (TagsCodec.H2TagsCodec, TimestampCodec.H2TimestampCodec, IdentityAdapter) - case _ => (TagsCodec.PostgresTagsCodec, TimestampCodec.PostgresTimestampCodec, IdentityAdapter) - } - } - val querySettings = new QuerySettings(config.getConfig("query")) val dbTimestampMonotonicIncreasing: Boolean = config.getBoolean("db-timestamp-monotonic-increasing") @@ -112,24 +91,55 @@ object R2dbcSettings { case _ => config.getDuration("log-db-calls-exceeding").asScala } + val codecSettings = { + val journalPayloadCodec: PayloadCodec = + if (useJsonPayload("journal")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec + val snapshotPayloadCodec: PayloadCodec = + if (useJsonPayload("snapshot")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec + val durableStatePayloadCodec: PayloadCodec = + if (useJsonPayload("state")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec + + connectionFactorySettings.dialect.name match { + case "sqlserver" => + new CodecSettings( + journalPayloadCodec, + snapshotPayloadCodec, + durableStatePayloadCodec, + tagsCodec = new TagsCodec.SqlServerTagsCodec(connectionFactorySettings.config), + timestampCodec = TimestampCodec.SqlServerTimestampCodec, + queryAdapter = SqlServerQueryAdapter) + case "h2" => + new CodecSettings( + journalPayloadCodec, + snapshotPayloadCodec, + durableStatePayloadCodec, + tagsCodec = TagsCodec.H2TagsCodec, + timestampCodec = TimestampCodec.H2TimestampCodec, + queryAdapter = IdentityAdapter) + case _ => + new CodecSettings( + journalPayloadCodec, + snapshotPayloadCodec, + durableStatePayloadCodec, + tagsCodec = TagsCodec.PostgresTagsCodec, + timestampCodec = TimestampCodec.PostgresTimestampCodec, + queryAdapter = IdentityAdapter) + } + } + val cleanupSettings = new CleanupSettings(config.getConfig("cleanup")) val settingsFromConfig = new R2dbcSettings( schema, journalTable, - journalPayloadCodec, journalPublishEvents, snapshotsTable, - snapshotPayloadCodec, - tagsCodec, - timestampCodec, - queryAdapter, durableStateTable, - durableStatePayloadCodec, durableStateAssertSingleWriter, logDbCallsExceeding, querySettings, dbTimestampMonotonicIncreasing, cleanupSettings, + codecSettings, connectionFactorySettings, durableStateTableByEntityType, durableStateAdditionalColumnClasses, @@ -153,20 +163,16 @@ object R2dbcSettings { final class R2dbcSettings private ( val schema: Option[String], val journalTable: String, - val journalPayloadCodec: PayloadCodec, val journalPublishEvents: Boolean, val snapshotsTable: String, - val snapshotPayloadCodec: PayloadCodec, - val tagsCodec: TagsCodec, - val timestampCodec: TimestampCodec, - val queryAdapter: QueryAdapter, val durableStateTable: String, - val durableStatePayloadCodec: PayloadCodec, val durableStateAssertSingleWriter: Boolean, val logDbCallsExceeding: FiniteDuration, val querySettings: QuerySettings, val dbTimestampMonotonicIncreasing: Boolean, val cleanupSettings: CleanupSettings, + /** INTERNAL API */ + @InternalApi private[akka] val codecSettings: CodecSettings, _connectionFactorySettings: ConnectionFactorySettings, _durableStateTableByEntityType: Map[String, String], _durableStateAdditionalColumnClasses: Map[String, immutable.IndexedSeq[String]], @@ -234,20 +240,15 @@ final class R2dbcSettings private ( private def copy( schema: Option[String] = schema, journalTable: String = journalTable, - journalPayloadCodec: PayloadCodec = journalPayloadCodec, journalPublishEvents: Boolean = journalPublishEvents, snapshotsTable: String = snapshotsTable, - snapshotPayloadCodec: PayloadCodec = snapshotPayloadCodec, - tagsCodec: TagsCodec = tagsCodec, - timestampCodec: TimestampCodec = timestampCodec, - queryAdapter: QueryAdapter = queryAdapter, durableStateTable: String = durableStateTable, - durableStatePayloadCodec: PayloadCodec = durableStatePayloadCodec, durableStateAssertSingleWriter: Boolean = durableStateAssertSingleWriter, logDbCallsExceeding: FiniteDuration = logDbCallsExceeding, querySettings: QuerySettings = querySettings, dbTimestampMonotonicIncreasing: Boolean = dbTimestampMonotonicIncreasing, cleanupSettings: CleanupSettings = cleanupSettings, + codecSettings: CodecSettings = codecSettings, connectionFactorySettings: ConnectionFactorySettings = connectionFactorySettings, durableStateTableByEntityType: Map[String, String] = _durableStateTableByEntityType, durableStateAdditionalColumnClasses: Map[String, immutable.IndexedSeq[String]] = @@ -257,20 +258,15 @@ final class R2dbcSettings private ( new R2dbcSettings( schema, journalTable, - journalPayloadCodec, journalPublishEvents, snapshotsTable, - snapshotPayloadCodec, - tagsCodec, - timestampCodec, - queryAdapter, durableStateTable, - durableStatePayloadCodec, durableStateAssertSingleWriter, logDbCallsExceeding, querySettings, dbTimestampMonotonicIncreasing, cleanupSettings, + codecSettings, connectionFactorySettings, _durableStateTableByEntityType, _durableStateAdditionalColumnClasses, @@ -328,6 +324,39 @@ final class PublishEventsDynamicSettings(config: Config) { val throughputCollectInterval: FiniteDuration = config.getDuration("throughput-collect-interval").asScala } +/** + * INTERNAL API + */ +@InternalStableApi +final class CodecSettings( + val journalPayloadCodec: PayloadCodec, + val snapshotPayloadCodec: PayloadCodec, + val durableStatePayloadCodec: PayloadCodec, + val tagsCodec: TagsCodec, + val timestampCodec: TimestampCodec, + val queryAdapter: QueryAdapter) { + + // implicits that can be imported + object JournalImplicits { + implicit def journalPayloadCodec: PayloadCodec = CodecSettings.this.journalPayloadCodec + implicit def tagsCodec: TagsCodec = CodecSettings.this.tagsCodec + implicit def timestampCodec: TimestampCodec = CodecSettings.this.timestampCodec + implicit def queryAdapter: QueryAdapter = CodecSettings.this.queryAdapter + } + object SnapshotImplicits { + implicit def snapshotPayloadCodec: PayloadCodec = CodecSettings.this.snapshotPayloadCodec + implicit def tagsCodec: TagsCodec = CodecSettings.this.tagsCodec + implicit def timestampCodec: TimestampCodec = CodecSettings.this.timestampCodec + implicit def queryAdapter: QueryAdapter = CodecSettings.this.queryAdapter + } + object DurableStateImplicits { + implicit def durableStatePayloadCodec: PayloadCodec = CodecSettings.this.durableStatePayloadCodec + implicit def tagsCodec: TagsCodec = CodecSettings.this.tagsCodec + implicit def timestampCodec: TimestampCodec = CodecSettings.this.timestampCodec + implicit def queryAdapter: QueryAdapter = CodecSettings.this.queryAdapter + } +} + /** * INTERNAL API */ diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala index 95da3482..d81a5f4c 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala @@ -5,6 +5,8 @@ package akka.persistence.r2dbc.internal import scala.annotation.varargs + +import akka.annotation.InternalApi import akka.annotation.InternalStableApi import akka.persistence.r2dbc.internal.codec.IdentityAdapter import akka.persistence.r2dbc.internal.codec.QueryAdapter @@ -21,7 +23,19 @@ object Sql { * to include a literal `?`. Trims whitespace, including line breaks. Standard string interpolation arguments `$` can * be used. */ - implicit class Interpolation(val sc: StringContext)(implicit adapter: QueryAdapter) extends AnyRef { + implicit class Interpolation(val sc: StringContext) extends AnyVal { + def sql(args: Any*): String = + fillInParameterNumbers(trimLineBreaks(sc.s(args: _*))) + } + + /** + * INTERNAL API: Scala string interpolation with `sql` prefix. Replaces `?` with numbered `\$1`, `\$2` for bind + * parameters. Use `??` to include a literal `?`. Trims whitespace, including line breaks. Standard string + * interpolation arguments `$` can be used. + */ + @InternalApi private[akka] implicit class InterpolationWithAdapter(val sc: StringContext)(implicit + adapter: QueryAdapter) + extends AnyRef { def sql(args: Any*): String = adapter(fillInParameterNumbers(trimLineBreaks(sc.s(args: _*)))) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala index 9b26f066..2cf19ef3 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala @@ -13,7 +13,7 @@ import akka.persistence.r2dbc.internal.DurableStateDao import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.QueryDao import akka.persistence.r2dbc.internal.SnapshotDao -import akka.persistence.r2dbc.internal.Sql.Interpolation +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.util.ccompat.JavaConverters._ import com.typesafe.config.Config import io.r2dbc.h2.H2ConnectionConfiguration diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala index 0754a754..98e55d53 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala @@ -10,7 +10,7 @@ import akka.dispatch.ExecutionContexts import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement -import akka.persistence.r2dbc.internal.Sql.Interpolation +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.postgres.PostgresJournalDao import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Statement @@ -34,6 +34,7 @@ private[r2dbc] class H2JournalDao(journalSettings: R2dbcSettings, connectionFact system: ActorSystem[_]) extends PostgresJournalDao(journalSettings, connectionFactory) { import JournalDao.SerializedJournalRow + import journalSettings.codecSettings.JournalImplicits._ override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2JournalDao]) // always app timestamp (db is same process) monotonic increasing require(journalSettings.useAppTimestamp) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala index 04af5496..00a856ce 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala @@ -7,7 +7,7 @@ package akka.persistence.r2dbc.internal.h2 import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.persistence.r2dbc.R2dbcSettings -import akka.persistence.r2dbc.internal.Sql.Interpolation +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.postgres.PostgresQueryDao import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Row @@ -26,6 +26,7 @@ private[r2dbc] class H2QueryDao(settings: R2dbcSettings, connectionFactory: Conn ec: ExecutionContext, system: ActorSystem[_]) extends PostgresQueryDao(settings, connectionFactory) { + import settings.codecSettings.JournalImplicits._ override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2QueryDao]) override protected def eventsBySlicesRangeSql( diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala index c9826b03..78178101 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala @@ -7,7 +7,7 @@ package akka.persistence.r2dbc.internal.h2 import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.persistence.r2dbc.R2dbcSettings -import akka.persistence.r2dbc.internal.Sql.Interpolation +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.postgres.PostgresSnapshotDao import io.r2dbc.spi.ConnectionFactory import org.slf4j.Logger @@ -24,6 +24,7 @@ private[r2dbc] final class H2SnapshotDao(settings: R2dbcSettings, connectionFact ec: ExecutionContext, system: ActorSystem[_]) extends PostgresSnapshotDao(settings, connectionFactory) { + import settings.codecSettings.SnapshotImplicits._ override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2SnapshotDao]) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index de7c165b..21a7f326 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -47,13 +47,8 @@ import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.R2dbcExecutor -import akka.persistence.r2dbc.internal.Sql.Interpolation -import akka.persistence.r2dbc.internal.codec.PayloadCodec -import akka.persistence.r2dbc.internal.codec.QueryAdapter -import akka.persistence.r2dbc.internal.codec.SqlServerQueryAdapter -import akka.persistence.r2dbc.internal.codec.TagsCodec +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement -import akka.persistence.r2dbc.internal.codec.TimestampCodec import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.r2dbc.session.scaladsl.R2dbcSession @@ -90,6 +85,7 @@ private[r2dbc] class PostgresDurableStateDao( extends DurableStateDao { import DurableStateDao._ import PostgresDurableStateDao._ + import settings.codecSettings.DurableStateImplicits._ protected def log: Logger = PostgresDurableStateDao.log private val persistenceExt = Persistence(system) @@ -99,12 +95,6 @@ private[r2dbc] class PostgresDurableStateDao( settings.logDbCallsExceeding, settings.connectionFactorySettings.poolSettings.closeCallsExceeding)(ec, system) - private implicit val statePayloadCodec: PayloadCodec = settings.durableStatePayloadCodec - private implicit val tagsCodec: TagsCodec = settings.tagsCodec - - protected implicit val timestampCodec: TimestampCodec = settings.timestampCodec - protected implicit val queryAdapter: QueryAdapter = settings.queryAdapter - // used for change events private lazy val journalDao: JournalDao = dialect.createJournalDao(settings, connectionFactory) @@ -322,7 +312,7 @@ private[r2dbc] class PostgresDurableStateDao( private def getPayload(row: Row): Option[Array[Byte]] = { val serId = row.get("state_ser_id", classOf[Integer]) val rowPayload = row.getPayload("state_payload") - if (serId == 0 && (rowPayload == null || util.Arrays.equals(statePayloadCodec.nonePayload, rowPayload))) + if (serId == 0 && (rowPayload == null || util.Arrays.equals(durableStatePayloadCodec.nonePayload, rowPayload))) None // delete marker else Option(rowPayload) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala index a1e241fd..a5349948 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala @@ -14,10 +14,8 @@ import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.R2dbcExecutor import akka.persistence.r2dbc.internal.SerializedEventMetadata -import akka.persistence.r2dbc.internal.Sql.Interpolation -import akka.persistence.r2dbc.internal.codec.TagsCodec +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement -import akka.persistence.r2dbc.internal.codec.TimestampCodec import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.typed.PersistenceId @@ -68,6 +66,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti system: ActorSystem[_]) extends JournalDao { import JournalDao.SerializedJournalRow + import journalSettings.codecSettings.JournalImplicits._ protected def log: Logger = PostgresJournalDao.log private val persistenceExt = Persistence(system) @@ -80,10 +79,6 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti journalSettings.connectionFactorySettings.poolSettings.closeCallsExceeding)(ec, system) protected val journalTable: String = journalSettings.journalTableWithSchema - protected implicit val journalPayloadCodec: PayloadCodec = journalSettings.journalPayloadCodec - protected implicit val tagsCodec: TagsCodec = journalSettings.tagsCodec - protected implicit val timestampCodec: TimestampCodec = journalSettings.timestampCodec - protected implicit val queryAdapter: QueryAdapter = journalSettings.queryAdapter protected val (insertEventWithParameterTimestampSql, insertEventWithTransactionTimestampSql) = { val baseSql = diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala index c2219faa..1f51e81c 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala @@ -23,11 +23,8 @@ import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow import akka.persistence.r2dbc.internal.QueryDao import akka.persistence.r2dbc.internal.R2dbcExecutor -import akka.persistence.r2dbc.internal.Sql.Interpolation -import akka.persistence.r2dbc.internal.codec.QueryAdapter -import akka.persistence.r2dbc.internal.codec.TagsCodec +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichRow -import akka.persistence.r2dbc.internal.codec.TimestampCodec import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.typed.PersistenceId @@ -36,7 +33,6 @@ import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory -import akka.persistence.r2dbc.internal.codec.PayloadCodec /** * INTERNAL API @@ -55,13 +51,10 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory system: ActorSystem[_]) extends QueryDao { import PostgresJournalDao.readMetadata + import settings.codecSettings.JournalImplicits._ protected def log: Logger = PostgresQueryDao.log protected val journalTable: String = settings.journalTableWithSchema - protected implicit val journalPayloadCodec: PayloadCodec = settings.journalPayloadCodec - protected implicit val timestampCodec: TimestampCodec = settings.timestampCodec - protected implicit val tagsCodec: TagsCodec = settings.tagsCodec - protected implicit val queryAdapter: QueryAdapter = settings.queryAdapter protected def sqlFalse: String = "false" protected def sqlDbTimestamp = "CURRENT_TIMESTAMP" diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala index 667cb930..f5ea95f3 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala @@ -30,13 +30,9 @@ import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.R2dbcExecutor import akka.persistence.r2dbc.internal.SnapshotDao -import akka.persistence.r2dbc.internal.Sql.Interpolation -import akka.persistence.r2dbc.internal.codec.PayloadCodec -import akka.persistence.r2dbc.internal.codec.QueryAdapter -import akka.persistence.r2dbc.internal.codec.TagsCodec +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichRow -import akka.persistence.r2dbc.internal.codec.TimestampCodec import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.typed.PersistenceId @@ -58,16 +54,12 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, connectionFact system: ActorSystem[_]) extends SnapshotDao { import SnapshotDao._ + import settings.codecSettings.SnapshotImplicits._ protected def log: Logger = PostgresSnapshotDao.log protected val snapshotTable: String = settings.snapshotsTableWithSchema - protected implicit val snapshotPayloadCodec: PayloadCodec = settings.snapshotPayloadCodec - protected implicit val timestampCodec: TimestampCodec = settings.timestampCodec - protected implicit val tagsCodec: TagsCodec = settings.tagsCodec - protected implicit val queryAdapter: QueryAdapter = settings.queryAdapter - protected val r2dbcExecutor = new R2dbcExecutor( connectionFactory, log, diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala index c61c6f42..3414d2b4 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala @@ -16,7 +16,7 @@ import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.Dialect -import akka.persistence.r2dbc.internal.Sql.Interpolation +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao.EvaluatedAdditionalColumnBindings @@ -44,6 +44,7 @@ private[r2dbc] class SqlServerDurableStateDao( connectionFactory: ConnectionFactory, dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends PostgresDurableStateDao(settings, connectionFactory, dialect) { + import settings.codecSettings.DurableStateImplicits._ require(settings.useAppTimestamp, "SqlServer requires akka.persistence.r2dbc.use-app-timestamp=on") diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala index 2c77a832..47a3cccd 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala @@ -9,7 +9,7 @@ import scala.concurrent.ExecutionContext import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.persistence.r2dbc.R2dbcSettings -import akka.persistence.r2dbc.internal.Sql.Interpolation +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.r2dbc.internal.postgres.PostgresJournalDao import io.r2dbc.spi.ConnectionFactory @@ -36,6 +36,7 @@ private[r2dbc] class SqlServerJournalDao(settings: R2dbcSettings, connectionFact ec: ExecutionContext, system: ActorSystem[_]) extends PostgresJournalDao(settings, connectionFactory) { + import settings.codecSettings.JournalImplicits._ require(settings.useAppTimestamp, "SqlServer requires akka.persistence.r2dbc.use-app-timestamp=on") require( diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala index 504e0bbc..7cdb18d8 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala @@ -15,7 +15,7 @@ import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.InstantFactory -import akka.persistence.r2dbc.internal.Sql.Interpolation +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.r2dbc.internal.postgres.PostgresQueryDao import io.r2dbc.spi.ConnectionFactory @@ -40,6 +40,7 @@ private[r2dbc] class SqlServerQueryDao(settings: R2dbcSettings, connectionFactor ec: ExecutionContext, system: ActorSystem[_]) extends PostgresQueryDao(settings, connectionFactory) { + import settings.codecSettings.JournalImplicits._ override def sqlFalse = "0" diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala index 06056224..451a18bd 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala @@ -16,7 +16,7 @@ import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.SnapshotDao.SerializedSnapshotMetadata import akka.persistence.r2dbc.internal.SnapshotDao.SerializedSnapshotRow -import akka.persistence.r2dbc.internal.Sql.Interpolation +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.r2dbc.internal.postgres.PostgresSnapshotDao @@ -43,6 +43,7 @@ private[r2dbc] class SqlServerSnapshotDao(settings: R2dbcSettings, connectionFac ec: ExecutionContext, system: ActorSystem[_]) extends PostgresSnapshotDao(settings, connectionFactory) { + import settings.codecSettings.SnapshotImplicits._ override def log: Logger = SqlServerSnapshotDao.log diff --git a/core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala index 3acae82f..955dc181 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala @@ -15,7 +15,6 @@ import akka.persistence.r2dbc.TestActors.Persister import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike -import akka.persistence.r2dbc.internal.codec.PayloadCodec /** * The purpose of this test is to verify JSONB payloads, but it can also be run with ordinary BYTEA payloads. To test @@ -89,7 +88,7 @@ class PayloadSpec } private def selectJournalRow(persistenceId: String): TestRow = { - implicit val journalPayloadCodec: PayloadCodec = settings.journalPayloadCodec + import settings.codecSettings.JournalImplicits.journalPayloadCodec r2dbcExecutor .selectOne[TestRow]("test")( @@ -108,7 +107,7 @@ class PayloadSpec } private def selectSnapshotRow(persistenceId: String): TestRow = { - implicit val snapshotPayloadCodec: PayloadCodec = settings.snapshotPayloadCodec + import settings.codecSettings.SnapshotImplicits.snapshotPayloadCodec r2dbcExecutor .selectOne[TestRow]("test")( @@ -127,7 +126,7 @@ class PayloadSpec } private def selectDurableStateRow(persistenceId: String): TestRow = { - implicit val durableStatePayloadCodec: PayloadCodec = settings.durableStatePayloadCodec + import settings.codecSettings.DurableStateImplicits.durableStatePayloadCodec r2dbcExecutor .selectOne[TestRow]("test")( @@ -205,7 +204,7 @@ class PayloadSpec testKit.stop(ref1) val row1 = selectDurableStateRow(persistenceId) - row1.payload.toVector shouldBe settings.durableStatePayloadCodec.nonePayload.toVector + row1.payload.toVector shouldBe settings.codecSettings.durableStatePayloadCodec.nonePayload.toVector val ref2 = spawn(DurableStatePersister(persistenceId)) ref2 ! DurableStatePersister.GetState(probe.ref) @@ -228,7 +227,7 @@ class PayloadSpec testKit.stop(ref3) val row3 = selectDurableStateRow(persistenceId) - row3.payload.toVector shouldBe settings.durableStatePayloadCodec.nonePayload.toVector + row3.payload.toVector shouldBe settings.codecSettings.durableStatePayloadCodec.nonePayload.toVector } } } diff --git a/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala b/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala index 20ca91fd..bc88637a 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala @@ -12,7 +12,7 @@ import akka.persistence.r2dbc.internal.R2dbcExecutor import org.scalatest.BeforeAndAfterAll import org.scalatest.Suite import org.slf4j.LoggerFactory -import akka.persistence.r2dbc.internal.Sql.Interpolation +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.h2.H2Dialect import java.time.Instant diff --git a/core/src/test/scala/akka/persistence/r2dbc/internal/SqlSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/internal/SqlSpec.scala index 425cbed6..a8a63ca8 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/internal/SqlSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/internal/SqlSpec.scala @@ -12,7 +12,7 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec class SqlSpec extends AnyWordSpec with TestSuite with Matchers { - import Sql.Interpolation + import Sql.InterpolationWithAdapter "SQL string interpolation" should { implicit val queryAdapter: QueryAdapter = IdentityAdapter "replace ? bind parameters with numbered $ (avoiding escaped ones)" in { diff --git a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala index d48452a4..6bbba9ad 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala @@ -15,7 +15,6 @@ import akka.persistence.r2dbc.TestActors.Persister import akka.persistence.r2dbc.TestConfig import akka.persistence.r2dbc.TestData import akka.persistence.r2dbc.TestDbLifecycle -import akka.persistence.r2dbc.internal.codec.TagsCodec import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichRow import akka.persistence.typed.PersistenceId import org.scalatest.wordspec.AnyWordSpecLike @@ -29,7 +28,7 @@ class PersistTagsSpec override def typedSystem: ActorSystem[_] = system private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc")) - private implicit val tagsCodec: TagsCodec = settings.tagsCodec + import settings.codecSettings.JournalImplicits.tagsCodec case class Row(pid: String, seqNr: Long, tags: Set[String]) "Persist tags" should { diff --git a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala index d390602d..6f356555 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala @@ -22,7 +22,6 @@ import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRo import akka.persistence.typed.PersistenceId import akka.serialization.SerializationExtension import org.scalatest.wordspec.AnyWordSpecLike -import akka.persistence.r2dbc.internal.codec.PayloadCodec class PersistTimestampSpec extends ScalaTestWithActorTestKit(TestConfig.config) @@ -31,10 +30,10 @@ class PersistTimestampSpec with TestData with LogCapturing { + import settings.codecSettings.JournalImplicits.journalPayloadCodec override def typedSystem: ActorSystem[_] = system private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc")) private val serialization = SerializationExtension(system) - private implicit val journalPayloadCodec: PayloadCodec = settings.journalPayloadCodec case class Row(pid: String, seqNr: Long, dbTimestamp: Instant, event: String) implicit private val codec: TimestampCodec = diff --git a/core/src/test/scala/akka/persistence/r2dbc/journal/R2dbcJournalPerfSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/journal/R2dbcJournalPerfSpec.scala index 7513f99f..c57bea6c 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/journal/R2dbcJournalPerfSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/journal/R2dbcJournalPerfSpec.scala @@ -6,6 +6,9 @@ package akka.persistence.r2dbc.journal import scala.concurrent.duration._ +import org.scalatest.exceptions.TestPendingException + +import akka.actor.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.adapter._ import akka.persistence.CapabilityFlag @@ -26,4 +29,11 @@ class R2dbcJournalPerfSpec extends JournalPerfSpec(R2dbcJournalPerfSpec.config) override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = CapabilityFlag.off() override def typedSystem: ActorSystem[_] = system.toTyped + + override def benchActor(replyAfter: Int): ActorRef = { + if (r2dbcSettings.dialectName == "sqlserver") + throw new TestPendingException + else + super.benchActor(replyAfter) + } } diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala index ab4b4c9c..551a03a6 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala @@ -20,7 +20,7 @@ import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.codec.TimestampCodec import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement -import akka.persistence.r2dbc.internal.Sql.Interpolation +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.TestConfig import akka.persistence.r2dbc.TestData import akka.persistence.r2dbc.TestDbLifecycle @@ -35,7 +35,6 @@ import akka.stream.testkit.scaladsl.TestSink import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike import org.slf4j.LoggerFactory -import akka.persistence.r2dbc.internal.codec.PayloadCodec object EventsBySliceBacktrackingSpec { private val BufferSize = 10 // small buffer for testing @@ -57,9 +56,7 @@ class EventsBySliceBacktrackingSpec override def typedSystem: ActorSystem[_] = system private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc")) - private implicit val journalPayloadCodec: PayloadCodec = settings.journalPayloadCodec - private implicit val timestampCodec: TimestampCodec = settings.timestampCodec - private implicit val queryAdapter: QueryAdapter = settings.queryAdapter + import settings.codecSettings.JournalImplicits._ private val query = PersistenceQuery(testKit.system) .readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier) diff --git a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreChangeHandlerSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreChangeHandlerSpec.scala index d0bde30a..b86b5899 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreChangeHandlerSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreChangeHandlerSpec.scala @@ -18,7 +18,7 @@ import akka.persistence.query.UpdatedDurableState import akka.persistence.r2dbc.TestConfig import akka.persistence.r2dbc.TestData import akka.persistence.r2dbc.TestDbLifecycle -import akka.persistence.r2dbc.internal.Sql.Interpolation +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.session.scaladsl.R2dbcSession import akka.persistence.r2dbc.state.DurableStateStoreChangeHandlerSpec.config import akka.persistence.r2dbc.state.scaladsl.ChangeHandler diff --git a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala index eca09b61..0c08e18a 100644 --- a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala +++ b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala @@ -12,7 +12,7 @@ import akka.Done import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts -import akka.persistence.r2dbc.internal.Sql.Interpolation +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.R2dbcExecutor import akka.persistence.r2dbc.internal.codec.IdentityAdapter import akka.persistence.r2dbc.internal.codec.QueryAdapter