Skip to content

Commit

Permalink
chore: place all codecs in CodecSettings
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Feb 2, 2024
1 parent 03372ec commit f4944bb
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 75 deletions.
98 changes: 53 additions & 45 deletions core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,12 @@ object R2dbcSettings {
s"Expected akka.persistence.r2dbc.$prefix.payload-column-type to be one of 'BYTEA', 'JSON' or 'JSONB' but found '$t'")
}

val journalPayloadCodec: PayloadCodec =
if (useJsonPayload("journal")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec

val journalPublishEvents: Boolean = config.getBoolean("journal.publish-events")

val snapshotsTable: String = config.getString("snapshot.table")

val snapshotPayloadCodec: PayloadCodec =
if (useJsonPayload("snapshot")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec

val durableStateTable: String = config.getString("state.table")

val durableStatePayloadCodec: PayloadCodec =
if (useJsonPayload("state")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec

val durableStateTableByEntityType: Map[String, String] =
configToMap(config.getConfig("state.custom-table"))

Expand All @@ -88,18 +79,6 @@ object R2dbcSettings {

val connectionFactorySettings = ConnectionFactorySettings(config.getConfig("connection-factory"))

val (tagsCodec: TagsCodec, timestampCodec: TimestampCodec, queryAdapter: QueryAdapter) = {
connectionFactorySettings.dialect.name match {
case "sqlserver" =>
(
new TagsCodec.SqlServerTagsCodec(connectionFactorySettings.config),
TimestampCodec.SqlServerTimestampCodec,
SqlServerQueryAdapter)
case "h2" => (TagsCodec.H2TagsCodec, TimestampCodec.H2TimestampCodec, IdentityAdapter)
case _ => (TagsCodec.PostgresTagsCodec, TimestampCodec.PostgresTimestampCodec, IdentityAdapter)
}
}

val querySettings = new QuerySettings(config.getConfig("query"))

val dbTimestampMonotonicIncreasing: Boolean = config.getBoolean("db-timestamp-monotonic-increasing")
Expand All @@ -112,24 +91,55 @@ object R2dbcSettings {
case _ => config.getDuration("log-db-calls-exceeding").asScala
}

val codecSettings = {
val journalPayloadCodec: PayloadCodec =
if (useJsonPayload("journal")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec
val snapshotPayloadCodec: PayloadCodec =
if (useJsonPayload("snapshot")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec
val durableStatePayloadCodec: PayloadCodec =
if (useJsonPayload("state")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec

connectionFactorySettings.dialect.name match {
case "sqlserver" =>
new CodecSettings(
journalPayloadCodec,
snapshotPayloadCodec,
durableStatePayloadCodec,
tagsCodec = new TagsCodec.SqlServerTagsCodec(connectionFactorySettings.config),
timestampCodec = TimestampCodec.SqlServerTimestampCodec,
queryAdapter = SqlServerQueryAdapter)
case "h2" =>
new CodecSettings(
journalPayloadCodec,
snapshotPayloadCodec,
durableStatePayloadCodec,
tagsCodec = TagsCodec.H2TagsCodec,
timestampCodec = TimestampCodec.H2TimestampCodec,
queryAdapter = IdentityAdapter)
case _ =>
new CodecSettings(
journalPayloadCodec,
snapshotPayloadCodec,
durableStatePayloadCodec,
tagsCodec = TagsCodec.PostgresTagsCodec,
timestampCodec = TimestampCodec.PostgresTimestampCodec,
queryAdapter = IdentityAdapter)
}
}

val cleanupSettings = new CleanupSettings(config.getConfig("cleanup"))
val settingsFromConfig = new R2dbcSettings(
schema,
journalTable,
journalPayloadCodec,
journalPublishEvents,
snapshotsTable,
snapshotPayloadCodec,
tagsCodec,
timestampCodec,
queryAdapter,
durableStateTable,
durableStatePayloadCodec,
durableStateAssertSingleWriter,
logDbCallsExceeding,
querySettings,
dbTimestampMonotonicIncreasing,
cleanupSettings,
codecSettings,
connectionFactorySettings,
durableStateTableByEntityType,
durableStateAdditionalColumnClasses,
Expand All @@ -153,20 +163,16 @@ object R2dbcSettings {
final class R2dbcSettings private (
val schema: Option[String],
val journalTable: String,
val journalPayloadCodec: PayloadCodec,
val journalPublishEvents: Boolean,
val snapshotsTable: String,
val snapshotPayloadCodec: PayloadCodec,
val tagsCodec: TagsCodec,
val timestampCodec: TimestampCodec,
val queryAdapter: QueryAdapter,
val durableStateTable: String,
val durableStatePayloadCodec: PayloadCodec,
val durableStateAssertSingleWriter: Boolean,
val logDbCallsExceeding: FiniteDuration,
val querySettings: QuerySettings,
val dbTimestampMonotonicIncreasing: Boolean,
val cleanupSettings: CleanupSettings,
/** INTERNAL API */
@InternalApi private[akka] val codecSettings: CodecSettings,
_connectionFactorySettings: ConnectionFactorySettings,
_durableStateTableByEntityType: Map[String, String],
_durableStateAdditionalColumnClasses: Map[String, immutable.IndexedSeq[String]],
Expand Down Expand Up @@ -234,20 +240,15 @@ final class R2dbcSettings private (
private def copy(
schema: Option[String] = schema,
journalTable: String = journalTable,
journalPayloadCodec: PayloadCodec = journalPayloadCodec,
journalPublishEvents: Boolean = journalPublishEvents,
snapshotsTable: String = snapshotsTable,
snapshotPayloadCodec: PayloadCodec = snapshotPayloadCodec,
tagsCodec: TagsCodec = tagsCodec,
timestampCodec: TimestampCodec = timestampCodec,
queryAdapter: QueryAdapter = queryAdapter,
durableStateTable: String = durableStateTable,
durableStatePayloadCodec: PayloadCodec = durableStatePayloadCodec,
durableStateAssertSingleWriter: Boolean = durableStateAssertSingleWriter,
logDbCallsExceeding: FiniteDuration = logDbCallsExceeding,
querySettings: QuerySettings = querySettings,
dbTimestampMonotonicIncreasing: Boolean = dbTimestampMonotonicIncreasing,
cleanupSettings: CleanupSettings = cleanupSettings,
codecSettings: CodecSettings = codecSettings,
connectionFactorySettings: ConnectionFactorySettings = connectionFactorySettings,
durableStateTableByEntityType: Map[String, String] = _durableStateTableByEntityType,
durableStateAdditionalColumnClasses: Map[String, immutable.IndexedSeq[String]] =
Expand All @@ -257,20 +258,15 @@ final class R2dbcSettings private (
new R2dbcSettings(
schema,
journalTable,
journalPayloadCodec,
journalPublishEvents,
snapshotsTable,
snapshotPayloadCodec,
tagsCodec,
timestampCodec,
queryAdapter,
durableStateTable,
durableStatePayloadCodec,
durableStateAssertSingleWriter,
logDbCallsExceeding,
querySettings,
dbTimestampMonotonicIncreasing,
cleanupSettings,
codecSettings,
connectionFactorySettings,
_durableStateTableByEntityType,
_durableStateAdditionalColumnClasses,
Expand Down Expand Up @@ -328,6 +324,18 @@ final class PublishEventsDynamicSettings(config: Config) {
val throughputCollectInterval: FiniteDuration = config.getDuration("throughput-collect-interval").asScala
}

/**
* INTERNAL API
*/
@InternalStableApi
final class CodecSettings(
val journalPayloadCodec: PayloadCodec,
val snapshotPayloadCodec: PayloadCodec,
val durableStatePayloadCodec: PayloadCodec,
val tagsCodec: TagsCodec,
val timestampCodec: TimestampCodec,
val queryAdapter: QueryAdapter)

/**
* INTERNAL API
*/
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/scala/akka/persistence/r2dbc/internal/Sql.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ object Sql {
}

/**
* INTERNAL API: Scala string interpolation with `sql` prefix. Replaces `?` with numbered `\$1`, `\$2` for bind parameters. Use `??`
* to include a literal `?`. Trims whitespace, including line breaks. Standard string interpolation arguments `$` can
* be used.
* INTERNAL API: Scala string interpolation with `sql` prefix. Replaces `?` with numbered `\$1`, `\$2` for bind
* parameters. Use `??` to include a literal `?`. Trims whitespace, including line breaks. Standard string
* interpolation arguments `$` can be used.
*/
@InternalApi private[akka] implicit class InterpolationWithAdapter(val sc: StringContext)(implicit adapter: QueryAdapter) extends AnyRef {
@InternalApi private[akka] implicit class InterpolationWithAdapter(val sc: StringContext)(implicit
adapter: QueryAdapter)
extends AnyRef {
def sql(args: Any*): String =
adapter(fillInParameterNumbers(trimLineBreaks(sc.s(args: _*))))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ private[r2dbc] class PostgresDurableStateDao(
settings.logDbCallsExceeding,
settings.connectionFactorySettings.poolSettings.closeCallsExceeding)(ec, system)

private implicit val statePayloadCodec: PayloadCodec = settings.durableStatePayloadCodec
private implicit val tagsCodec: TagsCodec = settings.tagsCodec
private implicit val statePayloadCodec: PayloadCodec = settings.codecSettings.durableStatePayloadCodec
private implicit val tagsCodec: TagsCodec = settings.codecSettings.tagsCodec

protected implicit val timestampCodec: TimestampCodec = settings.timestampCodec
protected implicit val queryAdapter: QueryAdapter = settings.queryAdapter
protected implicit val timestampCodec: TimestampCodec = settings.codecSettings.timestampCodec
protected implicit val queryAdapter: QueryAdapter = settings.codecSettings.queryAdapter

// used for change events
private lazy val journalDao: JournalDao = dialect.createJournalDao(settings, connectionFactory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti
journalSettings.connectionFactorySettings.poolSettings.closeCallsExceeding)(ec, system)

protected val journalTable: String = journalSettings.journalTableWithSchema
protected implicit val journalPayloadCodec: PayloadCodec = journalSettings.journalPayloadCodec
protected implicit val tagsCodec: TagsCodec = journalSettings.tagsCodec
protected implicit val timestampCodec: TimestampCodec = journalSettings.timestampCodec
protected implicit val queryAdapter: QueryAdapter = journalSettings.queryAdapter
protected implicit val journalPayloadCodec: PayloadCodec = journalSettings.codecSettings.journalPayloadCodec
protected implicit val tagsCodec: TagsCodec = journalSettings.codecSettings.tagsCodec
protected implicit val timestampCodec: TimestampCodec = journalSettings.codecSettings.timestampCodec
protected implicit val queryAdapter: QueryAdapter = journalSettings.codecSettings.queryAdapter

protected val (insertEventWithParameterTimestampSql, insertEventWithTransactionTimestampSql) = {
val baseSql =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory

protected def log: Logger = PostgresQueryDao.log
protected val journalTable: String = settings.journalTableWithSchema
protected implicit val journalPayloadCodec: PayloadCodec = settings.journalPayloadCodec
protected implicit val timestampCodec: TimestampCodec = settings.timestampCodec
protected implicit val tagsCodec: TagsCodec = settings.tagsCodec
protected implicit val queryAdapter: QueryAdapter = settings.queryAdapter
protected implicit val journalPayloadCodec: PayloadCodec = settings.codecSettings.journalPayloadCodec
protected implicit val timestampCodec: TimestampCodec = settings.codecSettings.timestampCodec
protected implicit val tagsCodec: TagsCodec = settings.codecSettings.tagsCodec
protected implicit val queryAdapter: QueryAdapter = settings.codecSettings.queryAdapter

protected def sqlFalse: String = "false"
protected def sqlDbTimestamp = "CURRENT_TIMESTAMP"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, connectionFact

protected val snapshotTable: String = settings.snapshotsTableWithSchema

protected implicit val snapshotPayloadCodec: PayloadCodec = settings.snapshotPayloadCodec
protected implicit val timestampCodec: TimestampCodec = settings.timestampCodec
protected implicit val tagsCodec: TagsCodec = settings.tagsCodec
protected implicit val queryAdapter: QueryAdapter = settings.queryAdapter
protected implicit val snapshotPayloadCodec: PayloadCodec = settings.codecSettings.snapshotPayloadCodec
protected implicit val timestampCodec: TimestampCodec = settings.codecSettings.timestampCodec
protected implicit val tagsCodec: TagsCodec = settings.codecSettings.tagsCodec
protected implicit val queryAdapter: QueryAdapter = settings.codecSettings.queryAdapter

protected val r2dbcExecutor = new R2dbcExecutor(
connectionFactory,
Expand Down
10 changes: 5 additions & 5 deletions core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class PayloadSpec
}

private def selectJournalRow(persistenceId: String): TestRow = {
implicit val journalPayloadCodec: PayloadCodec = settings.journalPayloadCodec
implicit val journalPayloadCodec: PayloadCodec = settings.codecSettings.journalPayloadCodec

r2dbcExecutor
.selectOne[TestRow]("test")(
Expand All @@ -108,7 +108,7 @@ class PayloadSpec
}

private def selectSnapshotRow(persistenceId: String): TestRow = {
implicit val snapshotPayloadCodec: PayloadCodec = settings.snapshotPayloadCodec
implicit val snapshotPayloadCodec: PayloadCodec = settings.codecSettings.snapshotPayloadCodec

r2dbcExecutor
.selectOne[TestRow]("test")(
Expand All @@ -127,7 +127,7 @@ class PayloadSpec
}

private def selectDurableStateRow(persistenceId: String): TestRow = {
implicit val durableStatePayloadCodec: PayloadCodec = settings.durableStatePayloadCodec
implicit val durableStatePayloadCodec: PayloadCodec = settings.codecSettings.durableStatePayloadCodec

r2dbcExecutor
.selectOne[TestRow]("test")(
Expand Down Expand Up @@ -205,7 +205,7 @@ class PayloadSpec
testKit.stop(ref1)

val row1 = selectDurableStateRow(persistenceId)
row1.payload.toVector shouldBe settings.durableStatePayloadCodec.nonePayload.toVector
row1.payload.toVector shouldBe settings.codecSettings.durableStatePayloadCodec.nonePayload.toVector

val ref2 = spawn(DurableStatePersister(persistenceId))
ref2 ! DurableStatePersister.GetState(probe.ref)
Expand All @@ -228,7 +228,7 @@ class PayloadSpec
testKit.stop(ref3)

val row3 = selectDurableStateRow(persistenceId)
row3.payload.toVector shouldBe settings.durableStatePayloadCodec.nonePayload.toVector
row3.payload.toVector shouldBe settings.codecSettings.durableStatePayloadCodec.nonePayload.toVector
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class PersistTagsSpec

override def typedSystem: ActorSystem[_] = system
private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc"))
private implicit val tagsCodec: TagsCodec = settings.tagsCodec
private implicit val tagsCodec: TagsCodec = settings.codecSettings.tagsCodec
case class Row(pid: String, seqNr: Long, tags: Set[String])

"Persist tags" should {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class PersistTimestampSpec
override def typedSystem: ActorSystem[_] = system
private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc"))
private val serialization = SerializationExtension(system)
private implicit val journalPayloadCodec: PayloadCodec = settings.journalPayloadCodec
private implicit val journalPayloadCodec: PayloadCodec = settings.codecSettings.journalPayloadCodec
case class Row(pid: String, seqNr: Long, dbTimestamp: Instant, event: String)

implicit private val codec: TimestampCodec =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ class EventsBySliceBacktrackingSpec

override def typedSystem: ActorSystem[_] = system
private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc"))
private implicit val journalPayloadCodec: PayloadCodec = settings.journalPayloadCodec
private implicit val timestampCodec: TimestampCodec = settings.timestampCodec
private implicit val queryAdapter: QueryAdapter = settings.queryAdapter
private implicit val journalPayloadCodec: PayloadCodec = settings.codecSettings.journalPayloadCodec
private implicit val timestampCodec: TimestampCodec = settings.codecSettings.timestampCodec
private implicit val queryAdapter: QueryAdapter = settings.codecSettings.queryAdapter

private val query = PersistenceQuery(testKit.system)
.readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
Expand Down

0 comments on commit f4944bb

Please sign in to comment.