Skip to content

Commit

Permalink
chore: Make Sql.Interpolation backwards compatible (#514)
Browse files Browse the repository at this point in the history
* chore: Make Sql.Interpolation backwards compatible
  * used in other projects that we don't want to break
* chore: place all codecs in CodecSettings
* remove some boilerplate for the codec implicits
* R2dbcJournalPerfSpec pending for sqlserver
  • Loading branch information
patriknw authored Feb 2, 2024
1 parent e28e821 commit a7ac7e0
Show file tree
Hide file tree
Showing 24 changed files with 139 additions and 119 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.R2dbcSettings.this")
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.Sql$Interpolation$")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.Sql.Interpolation")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.Sql.Interpolation")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.Sql#Interpolation.this")
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.h2.H2Utils")
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.h2.H2Utils$")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.r2dbc.R2dbcSettings.journalPayloadCodec")
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.PayloadCodec$RichStatement")
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.PayloadCodec$RichRow")
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.PayloadCodec$JsonCodec$")
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.PayloadCodec$ByteArrayCodec$")
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.PayloadCodec$")
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.PayloadCodec")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.r2dbc.R2dbcSettings.durableStatePayloadCodec")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.r2dbc.R2dbcSettings.snapshotPayloadCodec")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.R2dbcSettings.journalPayloadCodec")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.R2dbcSettings.snapshotPayloadCodec")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.R2dbcSettings.durableStatePayloadCodec")
119 changes: 74 additions & 45 deletions core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,12 @@ object R2dbcSettings {
s"Expected akka.persistence.r2dbc.$prefix.payload-column-type to be one of 'BYTEA', 'JSON' or 'JSONB' but found '$t'")
}

val journalPayloadCodec: PayloadCodec =
if (useJsonPayload("journal")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec

val journalPublishEvents: Boolean = config.getBoolean("journal.publish-events")

val snapshotsTable: String = config.getString("snapshot.table")

val snapshotPayloadCodec: PayloadCodec =
if (useJsonPayload("snapshot")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec

val durableStateTable: String = config.getString("state.table")

val durableStatePayloadCodec: PayloadCodec =
if (useJsonPayload("state")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec

val durableStateTableByEntityType: Map[String, String] =
configToMap(config.getConfig("state.custom-table"))

Expand All @@ -88,18 +79,6 @@ object R2dbcSettings {

val connectionFactorySettings = ConnectionFactorySettings(config.getConfig("connection-factory"))

val (tagsCodec: TagsCodec, timestampCodec: TimestampCodec, queryAdapter: QueryAdapter) = {
connectionFactorySettings.dialect.name match {
case "sqlserver" =>
(
new TagsCodec.SqlServerTagsCodec(connectionFactorySettings.config),
TimestampCodec.SqlServerTimestampCodec,
SqlServerQueryAdapter)
case "h2" => (TagsCodec.H2TagsCodec, TimestampCodec.H2TimestampCodec, IdentityAdapter)
case _ => (TagsCodec.PostgresTagsCodec, TimestampCodec.PostgresTimestampCodec, IdentityAdapter)
}
}

val querySettings = new QuerySettings(config.getConfig("query"))

val dbTimestampMonotonicIncreasing: Boolean = config.getBoolean("db-timestamp-monotonic-increasing")
Expand All @@ -112,24 +91,55 @@ object R2dbcSettings {
case _ => config.getDuration("log-db-calls-exceeding").asScala
}

val codecSettings = {
val journalPayloadCodec: PayloadCodec =
if (useJsonPayload("journal")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec
val snapshotPayloadCodec: PayloadCodec =
if (useJsonPayload("snapshot")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec
val durableStatePayloadCodec: PayloadCodec =
if (useJsonPayload("state")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec

connectionFactorySettings.dialect.name match {
case "sqlserver" =>
new CodecSettings(
journalPayloadCodec,
snapshotPayloadCodec,
durableStatePayloadCodec,
tagsCodec = new TagsCodec.SqlServerTagsCodec(connectionFactorySettings.config),
timestampCodec = TimestampCodec.SqlServerTimestampCodec,
queryAdapter = SqlServerQueryAdapter)
case "h2" =>
new CodecSettings(
journalPayloadCodec,
snapshotPayloadCodec,
durableStatePayloadCodec,
tagsCodec = TagsCodec.H2TagsCodec,
timestampCodec = TimestampCodec.H2TimestampCodec,
queryAdapter = IdentityAdapter)
case _ =>
new CodecSettings(
journalPayloadCodec,
snapshotPayloadCodec,
durableStatePayloadCodec,
tagsCodec = TagsCodec.PostgresTagsCodec,
timestampCodec = TimestampCodec.PostgresTimestampCodec,
queryAdapter = IdentityAdapter)
}
}

val cleanupSettings = new CleanupSettings(config.getConfig("cleanup"))
val settingsFromConfig = new R2dbcSettings(
schema,
journalTable,
journalPayloadCodec,
journalPublishEvents,
snapshotsTable,
snapshotPayloadCodec,
tagsCodec,
timestampCodec,
queryAdapter,
durableStateTable,
durableStatePayloadCodec,
durableStateAssertSingleWriter,
logDbCallsExceeding,
querySettings,
dbTimestampMonotonicIncreasing,
cleanupSettings,
codecSettings,
connectionFactorySettings,
durableStateTableByEntityType,
durableStateAdditionalColumnClasses,
Expand All @@ -153,20 +163,16 @@ object R2dbcSettings {
final class R2dbcSettings private (
val schema: Option[String],
val journalTable: String,
val journalPayloadCodec: PayloadCodec,
val journalPublishEvents: Boolean,
val snapshotsTable: String,
val snapshotPayloadCodec: PayloadCodec,
val tagsCodec: TagsCodec,
val timestampCodec: TimestampCodec,
val queryAdapter: QueryAdapter,
val durableStateTable: String,
val durableStatePayloadCodec: PayloadCodec,
val durableStateAssertSingleWriter: Boolean,
val logDbCallsExceeding: FiniteDuration,
val querySettings: QuerySettings,
val dbTimestampMonotonicIncreasing: Boolean,
val cleanupSettings: CleanupSettings,
/** INTERNAL API */
@InternalApi private[akka] val codecSettings: CodecSettings,
_connectionFactorySettings: ConnectionFactorySettings,
_durableStateTableByEntityType: Map[String, String],
_durableStateAdditionalColumnClasses: Map[String, immutable.IndexedSeq[String]],
Expand Down Expand Up @@ -234,20 +240,15 @@ final class R2dbcSettings private (
private def copy(
schema: Option[String] = schema,
journalTable: String = journalTable,
journalPayloadCodec: PayloadCodec = journalPayloadCodec,
journalPublishEvents: Boolean = journalPublishEvents,
snapshotsTable: String = snapshotsTable,
snapshotPayloadCodec: PayloadCodec = snapshotPayloadCodec,
tagsCodec: TagsCodec = tagsCodec,
timestampCodec: TimestampCodec = timestampCodec,
queryAdapter: QueryAdapter = queryAdapter,
durableStateTable: String = durableStateTable,
durableStatePayloadCodec: PayloadCodec = durableStatePayloadCodec,
durableStateAssertSingleWriter: Boolean = durableStateAssertSingleWriter,
logDbCallsExceeding: FiniteDuration = logDbCallsExceeding,
querySettings: QuerySettings = querySettings,
dbTimestampMonotonicIncreasing: Boolean = dbTimestampMonotonicIncreasing,
cleanupSettings: CleanupSettings = cleanupSettings,
codecSettings: CodecSettings = codecSettings,
connectionFactorySettings: ConnectionFactorySettings = connectionFactorySettings,
durableStateTableByEntityType: Map[String, String] = _durableStateTableByEntityType,
durableStateAdditionalColumnClasses: Map[String, immutable.IndexedSeq[String]] =
Expand All @@ -257,20 +258,15 @@ final class R2dbcSettings private (
new R2dbcSettings(
schema,
journalTable,
journalPayloadCodec,
journalPublishEvents,
snapshotsTable,
snapshotPayloadCodec,
tagsCodec,
timestampCodec,
queryAdapter,
durableStateTable,
durableStatePayloadCodec,
durableStateAssertSingleWriter,
logDbCallsExceeding,
querySettings,
dbTimestampMonotonicIncreasing,
cleanupSettings,
codecSettings,
connectionFactorySettings,
_durableStateTableByEntityType,
_durableStateAdditionalColumnClasses,
Expand Down Expand Up @@ -328,6 +324,39 @@ final class PublishEventsDynamicSettings(config: Config) {
val throughputCollectInterval: FiniteDuration = config.getDuration("throughput-collect-interval").asScala
}

/**
* INTERNAL API
*/
@InternalStableApi
final class CodecSettings(
val journalPayloadCodec: PayloadCodec,
val snapshotPayloadCodec: PayloadCodec,
val durableStatePayloadCodec: PayloadCodec,
val tagsCodec: TagsCodec,
val timestampCodec: TimestampCodec,
val queryAdapter: QueryAdapter) {

// implicits that can be imported
object JournalImplicits {
implicit def journalPayloadCodec: PayloadCodec = CodecSettings.this.journalPayloadCodec
implicit def tagsCodec: TagsCodec = CodecSettings.this.tagsCodec
implicit def timestampCodec: TimestampCodec = CodecSettings.this.timestampCodec
implicit def queryAdapter: QueryAdapter = CodecSettings.this.queryAdapter
}
object SnapshotImplicits {
implicit def snapshotPayloadCodec: PayloadCodec = CodecSettings.this.snapshotPayloadCodec
implicit def tagsCodec: TagsCodec = CodecSettings.this.tagsCodec
implicit def timestampCodec: TimestampCodec = CodecSettings.this.timestampCodec
implicit def queryAdapter: QueryAdapter = CodecSettings.this.queryAdapter
}
object DurableStateImplicits {
implicit def durableStatePayloadCodec: PayloadCodec = CodecSettings.this.durableStatePayloadCodec
implicit def tagsCodec: TagsCodec = CodecSettings.this.tagsCodec
implicit def timestampCodec: TimestampCodec = CodecSettings.this.timestampCodec
implicit def queryAdapter: QueryAdapter = CodecSettings.this.queryAdapter
}
}

/**
* INTERNAL API
*/
Expand Down
16 changes: 15 additions & 1 deletion core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package akka.persistence.r2dbc.internal

import scala.annotation.varargs

import akka.annotation.InternalApi
import akka.annotation.InternalStableApi
import akka.persistence.r2dbc.internal.codec.IdentityAdapter
import akka.persistence.r2dbc.internal.codec.QueryAdapter
Expand All @@ -21,7 +23,19 @@ object Sql {
* to include a literal `?`. Trims whitespace, including line breaks. Standard string interpolation arguments `$` can
* be used.
*/
implicit class Interpolation(val sc: StringContext)(implicit adapter: QueryAdapter) extends AnyRef {
implicit class Interpolation(val sc: StringContext) extends AnyVal {
def sql(args: Any*): String =
fillInParameterNumbers(trimLineBreaks(sc.s(args: _*)))
}

/**
* INTERNAL API: Scala string interpolation with `sql` prefix. Replaces `?` with numbered `\$1`, `\$2` for bind
* parameters. Use `??` to include a literal `?`. Trims whitespace, including line breaks. Standard string
* interpolation arguments `$` can be used.
*/
@InternalApi private[akka] implicit class InterpolationWithAdapter(val sc: StringContext)(implicit
adapter: QueryAdapter)
extends AnyRef {
def sql(args: Any*): String =
adapter(fillInParameterNumbers(trimLineBreaks(sc.s(args: _*))))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import akka.persistence.r2dbc.internal.DurableStateDao
import akka.persistence.r2dbc.internal.JournalDao
import akka.persistence.r2dbc.internal.QueryDao
import akka.persistence.r2dbc.internal.SnapshotDao
import akka.persistence.r2dbc.internal.Sql.Interpolation
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.util.ccompat.JavaConverters._
import com.typesafe.config.Config
import io.r2dbc.h2.H2ConnectionConfiguration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import akka.dispatch.ExecutionContexts
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.JournalDao
import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement
import akka.persistence.r2dbc.internal.Sql.Interpolation
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.postgres.PostgresJournalDao
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.Statement
Expand All @@ -34,6 +34,7 @@ private[r2dbc] class H2JournalDao(journalSettings: R2dbcSettings, connectionFact
system: ActorSystem[_])
extends PostgresJournalDao(journalSettings, connectionFactory) {
import JournalDao.SerializedJournalRow
import journalSettings.codecSettings.JournalImplicits._
override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2JournalDao])
// always app timestamp (db is same process) monotonic increasing
require(journalSettings.useAppTimestamp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package akka.persistence.r2dbc.internal.h2
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.Sql.Interpolation
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.postgres.PostgresQueryDao
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.Row
Expand All @@ -26,6 +26,7 @@ private[r2dbc] class H2QueryDao(settings: R2dbcSettings, connectionFactory: Conn
ec: ExecutionContext,
system: ActorSystem[_])
extends PostgresQueryDao(settings, connectionFactory) {
import settings.codecSettings.JournalImplicits._
override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2QueryDao])

override protected def eventsBySlicesRangeSql(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package akka.persistence.r2dbc.internal.h2
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.Sql.Interpolation
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.postgres.PostgresSnapshotDao
import io.r2dbc.spi.ConnectionFactory
import org.slf4j.Logger
Expand All @@ -24,6 +24,7 @@ private[r2dbc] final class H2SnapshotDao(settings: R2dbcSettings, connectionFact
ec: ExecutionContext,
system: ActorSystem[_])
extends PostgresSnapshotDao(settings, connectionFactory) {
import settings.codecSettings.SnapshotImplicits._

override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2SnapshotDao])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,8 @@ import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow
import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow
import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement
import akka.persistence.r2dbc.internal.R2dbcExecutor
import akka.persistence.r2dbc.internal.Sql.Interpolation
import akka.persistence.r2dbc.internal.codec.PayloadCodec
import akka.persistence.r2dbc.internal.codec.QueryAdapter
import akka.persistence.r2dbc.internal.codec.SqlServerQueryAdapter
import akka.persistence.r2dbc.internal.codec.TagsCodec
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement
import akka.persistence.r2dbc.internal.codec.TimestampCodec
import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow
import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement
import akka.persistence.r2dbc.session.scaladsl.R2dbcSession
Expand Down Expand Up @@ -90,6 +85,7 @@ private[r2dbc] class PostgresDurableStateDao(
extends DurableStateDao {
import DurableStateDao._
import PostgresDurableStateDao._
import settings.codecSettings.DurableStateImplicits._
protected def log: Logger = PostgresDurableStateDao.log

private val persistenceExt = Persistence(system)
Expand All @@ -99,12 +95,6 @@ private[r2dbc] class PostgresDurableStateDao(
settings.logDbCallsExceeding,
settings.connectionFactorySettings.poolSettings.closeCallsExceeding)(ec, system)

private implicit val statePayloadCodec: PayloadCodec = settings.durableStatePayloadCodec
private implicit val tagsCodec: TagsCodec = settings.tagsCodec

protected implicit val timestampCodec: TimestampCodec = settings.timestampCodec
protected implicit val queryAdapter: QueryAdapter = settings.queryAdapter

// used for change events
private lazy val journalDao: JournalDao = dialect.createJournalDao(settings, connectionFactory)

Expand Down Expand Up @@ -322,7 +312,7 @@ private[r2dbc] class PostgresDurableStateDao(
private def getPayload(row: Row): Option[Array[Byte]] = {
val serId = row.get("state_ser_id", classOf[Integer])
val rowPayload = row.getPayload("state_payload")
if (serId == 0 && (rowPayload == null || util.Arrays.equals(statePayloadCodec.nonePayload, rowPayload)))
if (serId == 0 && (rowPayload == null || util.Arrays.equals(durableStatePayloadCodec.nonePayload, rowPayload)))
None // delete marker
else
Option(rowPayload)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ import akka.persistence.r2dbc.internal.JournalDao
import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement
import akka.persistence.r2dbc.internal.R2dbcExecutor
import akka.persistence.r2dbc.internal.SerializedEventMetadata
import akka.persistence.r2dbc.internal.Sql.Interpolation
import akka.persistence.r2dbc.internal.codec.TagsCodec
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement
import akka.persistence.r2dbc.internal.codec.TimestampCodec
import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow
import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement
import akka.persistence.typed.PersistenceId
Expand Down Expand Up @@ -68,6 +66,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti
system: ActorSystem[_])
extends JournalDao {
import JournalDao.SerializedJournalRow
import journalSettings.codecSettings.JournalImplicits._
protected def log: Logger = PostgresJournalDao.log

private val persistenceExt = Persistence(system)
Expand All @@ -80,10 +79,6 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti
journalSettings.connectionFactorySettings.poolSettings.closeCallsExceeding)(ec, system)

protected val journalTable: String = journalSettings.journalTableWithSchema
protected implicit val journalPayloadCodec: PayloadCodec = journalSettings.journalPayloadCodec
protected implicit val tagsCodec: TagsCodec = journalSettings.tagsCodec
protected implicit val timestampCodec: TimestampCodec = journalSettings.timestampCodec
protected implicit val queryAdapter: QueryAdapter = journalSettings.queryAdapter

protected val (insertEventWithParameterTimestampSql, insertEventWithTransactionTimestampSql) = {
val baseSql =
Expand Down
Loading

0 comments on commit a7ac7e0

Please sign in to comment.