Skip to content

Commit

Permalink
remove some boilerplate for the codec implicits
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Feb 2, 2024
1 parent 8743aaa commit 6d30fdf
Show file tree
Hide file tree
Showing 16 changed files with 40 additions and 48 deletions.
23 changes: 22 additions & 1 deletion core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,28 @@ final class CodecSettings(
val durableStatePayloadCodec: PayloadCodec,
val tagsCodec: TagsCodec,
val timestampCodec: TimestampCodec,
val queryAdapter: QueryAdapter)
val queryAdapter: QueryAdapter) {

// implicits that can be imported
object JournalImplicits {
implicit def journalPayloadCodec: PayloadCodec = CodecSettings.this.journalPayloadCodec
implicit def tagsCodec: TagsCodec = CodecSettings.this.tagsCodec
implicit def timestampCodec: TimestampCodec = CodecSettings.this.timestampCodec
implicit def queryAdapter: QueryAdapter = CodecSettings.this.queryAdapter
}
object SnapshotImplicits {
implicit def snapshotPayloadCodec: PayloadCodec = CodecSettings.this.snapshotPayloadCodec
implicit def tagsCodec: TagsCodec = CodecSettings.this.tagsCodec
implicit def timestampCodec: TimestampCodec = CodecSettings.this.timestampCodec
implicit def queryAdapter: QueryAdapter = CodecSettings.this.queryAdapter
}
object DurableStateImplicits {
implicit def durableStatePayloadCodec: PayloadCodec = CodecSettings.this.durableStatePayloadCodec
implicit def tagsCodec: TagsCodec = CodecSettings.this.tagsCodec
implicit def timestampCodec: TimestampCodec = CodecSettings.this.timestampCodec
implicit def queryAdapter: QueryAdapter = CodecSettings.this.queryAdapter
}
}

/**
* INTERNAL API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ private[r2dbc] class H2JournalDao(journalSettings: R2dbcSettings, connectionFact
system: ActorSystem[_])
extends PostgresJournalDao(journalSettings, connectionFactory) {
import JournalDao.SerializedJournalRow
import journalSettings.codecSettings.JournalImplicits._
override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2JournalDao])
// always app timestamp (db is same process) monotonic increasing
require(journalSettings.useAppTimestamp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ private[r2dbc] class H2QueryDao(settings: R2dbcSettings, connectionFactory: Conn
ec: ExecutionContext,
system: ActorSystem[_])
extends PostgresQueryDao(settings, connectionFactory) {
import settings.codecSettings.JournalImplicits._
override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2QueryDao])

override protected def eventsBySlicesRangeSql(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ private[r2dbc] final class H2SnapshotDao(settings: R2dbcSettings, connectionFact
ec: ExecutionContext,
system: ActorSystem[_])
extends PostgresSnapshotDao(settings, connectionFactory) {
import settings.codecSettings.SnapshotImplicits._

override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2SnapshotDao])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,7 @@ import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow
import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement
import akka.persistence.r2dbc.internal.R2dbcExecutor
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.codec.PayloadCodec
import akka.persistence.r2dbc.internal.codec.QueryAdapter
import akka.persistence.r2dbc.internal.codec.SqlServerQueryAdapter
import akka.persistence.r2dbc.internal.codec.TagsCodec
import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement
import akka.persistence.r2dbc.internal.codec.TimestampCodec
import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow
import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement
import akka.persistence.r2dbc.session.scaladsl.R2dbcSession
Expand Down Expand Up @@ -90,6 +85,7 @@ private[r2dbc] class PostgresDurableStateDao(
extends DurableStateDao {
import DurableStateDao._
import PostgresDurableStateDao._
import settings.codecSettings.DurableStateImplicits._
protected def log: Logger = PostgresDurableStateDao.log

private val persistenceExt = Persistence(system)
Expand All @@ -99,12 +95,6 @@ private[r2dbc] class PostgresDurableStateDao(
settings.logDbCallsExceeding,
settings.connectionFactorySettings.poolSettings.closeCallsExceeding)(ec, system)

private implicit val statePayloadCodec: PayloadCodec = settings.codecSettings.durableStatePayloadCodec
private implicit val tagsCodec: TagsCodec = settings.codecSettings.tagsCodec

protected implicit val timestampCodec: TimestampCodec = settings.codecSettings.timestampCodec
protected implicit val queryAdapter: QueryAdapter = settings.codecSettings.queryAdapter

// used for change events
private lazy val journalDao: JournalDao = dialect.createJournalDao(settings, connectionFactory)

Expand Down Expand Up @@ -322,7 +312,7 @@ private[r2dbc] class PostgresDurableStateDao(
private def getPayload(row: Row): Option[Array[Byte]] = {
val serId = row.get("state_ser_id", classOf[Integer])
val rowPayload = row.getPayload("state_payload")
if (serId == 0 && (rowPayload == null || util.Arrays.equals(statePayloadCodec.nonePayload, rowPayload)))
if (serId == 0 && (rowPayload == null || util.Arrays.equals(durableStatePayloadCodec.nonePayload, rowPayload)))
None // delete marker
else
Option(rowPayload)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement
import akka.persistence.r2dbc.internal.R2dbcExecutor
import akka.persistence.r2dbc.internal.SerializedEventMetadata
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.codec.TagsCodec
import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement
import akka.persistence.r2dbc.internal.codec.TimestampCodec
import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow
import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement
import akka.persistence.typed.PersistenceId
Expand Down Expand Up @@ -68,6 +66,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti
system: ActorSystem[_])
extends JournalDao {
import JournalDao.SerializedJournalRow
import journalSettings.codecSettings.JournalImplicits._
protected def log: Logger = PostgresJournalDao.log

private val persistenceExt = Persistence(system)
Expand All @@ -80,10 +79,6 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti
journalSettings.connectionFactorySettings.poolSettings.closeCallsExceeding)(ec, system)

protected val journalTable: String = journalSettings.journalTableWithSchema
protected implicit val journalPayloadCodec: PayloadCodec = journalSettings.codecSettings.journalPayloadCodec
protected implicit val tagsCodec: TagsCodec = journalSettings.codecSettings.tagsCodec
protected implicit val timestampCodec: TimestampCodec = journalSettings.codecSettings.timestampCodec
protected implicit val queryAdapter: QueryAdapter = journalSettings.codecSettings.queryAdapter

protected val (insertEventWithParameterTimestampSql, insertEventWithTransactionTimestampSql) = {
val baseSql =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow
import akka.persistence.r2dbc.internal.QueryDao
import akka.persistence.r2dbc.internal.R2dbcExecutor
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.codec.QueryAdapter
import akka.persistence.r2dbc.internal.codec.TagsCodec
import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichRow
import akka.persistence.r2dbc.internal.codec.TimestampCodec
import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow
import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement
import akka.persistence.typed.PersistenceId
Expand All @@ -36,7 +33,6 @@ import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.Statement
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import akka.persistence.r2dbc.internal.codec.PayloadCodec

/**
* INTERNAL API
Expand All @@ -55,13 +51,10 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory
system: ActorSystem[_])
extends QueryDao {
import PostgresJournalDao.readMetadata
import settings.codecSettings.JournalImplicits._

protected def log: Logger = PostgresQueryDao.log
protected val journalTable: String = settings.journalTableWithSchema
protected implicit val journalPayloadCodec: PayloadCodec = settings.codecSettings.journalPayloadCodec
protected implicit val timestampCodec: TimestampCodec = settings.codecSettings.timestampCodec
protected implicit val tagsCodec: TagsCodec = settings.codecSettings.tagsCodec
protected implicit val queryAdapter: QueryAdapter = settings.codecSettings.queryAdapter

protected def sqlFalse: String = "false"
protected def sqlDbTimestamp = "CURRENT_TIMESTAMP"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,8 @@ import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement
import akka.persistence.r2dbc.internal.R2dbcExecutor
import akka.persistence.r2dbc.internal.SnapshotDao
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.codec.PayloadCodec
import akka.persistence.r2dbc.internal.codec.QueryAdapter
import akka.persistence.r2dbc.internal.codec.TagsCodec
import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement
import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichRow
import akka.persistence.r2dbc.internal.codec.TimestampCodec
import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow
import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement
import akka.persistence.typed.PersistenceId
Expand All @@ -58,16 +54,12 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, connectionFact
system: ActorSystem[_])
extends SnapshotDao {
import SnapshotDao._
import settings.codecSettings.SnapshotImplicits._

protected def log: Logger = PostgresSnapshotDao.log

protected val snapshotTable: String = settings.snapshotsTableWithSchema

protected implicit val snapshotPayloadCodec: PayloadCodec = settings.codecSettings.snapshotPayloadCodec
protected implicit val timestampCodec: TimestampCodec = settings.codecSettings.timestampCodec
protected implicit val tagsCodec: TagsCodec = settings.codecSettings.tagsCodec
protected implicit val queryAdapter: QueryAdapter = settings.codecSettings.queryAdapter

protected val r2dbcExecutor = new R2dbcExecutor(
connectionFactory,
log,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ private[r2dbc] class SqlServerDurableStateDao(
connectionFactory: ConnectionFactory,
dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_])
extends PostgresDurableStateDao(settings, connectionFactory, dialect) {
import settings.codecSettings.DurableStateImplicits._

require(settings.useAppTimestamp, "SqlServer requires akka.persistence.r2dbc.use-app-timestamp=on")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ private[r2dbc] class SqlServerJournalDao(settings: R2dbcSettings, connectionFact
ec: ExecutionContext,
system: ActorSystem[_])
extends PostgresJournalDao(settings, connectionFactory) {
import settings.codecSettings.JournalImplicits._

require(settings.useAppTimestamp, "SqlServer requires akka.persistence.r2dbc.use-app-timestamp=on")
require(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ private[r2dbc] class SqlServerQueryDao(settings: R2dbcSettings, connectionFactor
ec: ExecutionContext,
system: ActorSystem[_])
extends PostgresQueryDao(settings, connectionFactory) {
import settings.codecSettings.JournalImplicits._

override def sqlFalse = "0"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ private[r2dbc] class SqlServerSnapshotDao(settings: R2dbcSettings, connectionFac
ec: ExecutionContext,
system: ActorSystem[_])
extends PostgresSnapshotDao(settings, connectionFactory) {
import settings.codecSettings.SnapshotImplicits._

override def log: Logger = SqlServerSnapshotDao.log

Expand Down
7 changes: 3 additions & 4 deletions core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import akka.persistence.r2dbc.TestActors.Persister
import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.persistence.r2dbc.internal.codec.PayloadCodec

/**
* The purpose of this test is to verify JSONB payloads, but it can also be run with ordinary BYTEA payloads. To test
Expand Down Expand Up @@ -89,7 +88,7 @@ class PayloadSpec
}

private def selectJournalRow(persistenceId: String): TestRow = {
implicit val journalPayloadCodec: PayloadCodec = settings.codecSettings.journalPayloadCodec
import settings.codecSettings.JournalImplicits.journalPayloadCodec

r2dbcExecutor
.selectOne[TestRow]("test")(
Expand All @@ -108,7 +107,7 @@ class PayloadSpec
}

private def selectSnapshotRow(persistenceId: String): TestRow = {
implicit val snapshotPayloadCodec: PayloadCodec = settings.codecSettings.snapshotPayloadCodec
import settings.codecSettings.SnapshotImplicits.snapshotPayloadCodec

r2dbcExecutor
.selectOne[TestRow]("test")(
Expand All @@ -127,7 +126,7 @@ class PayloadSpec
}

private def selectDurableStateRow(persistenceId: String): TestRow = {
implicit val durableStatePayloadCodec: PayloadCodec = settings.codecSettings.durableStatePayloadCodec
import settings.codecSettings.DurableStateImplicits.durableStatePayloadCodec

r2dbcExecutor
.selectOne[TestRow]("test")(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import akka.persistence.r2dbc.TestActors.Persister
import akka.persistence.r2dbc.TestConfig
import akka.persistence.r2dbc.TestData
import akka.persistence.r2dbc.TestDbLifecycle
import akka.persistence.r2dbc.internal.codec.TagsCodec
import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichRow
import akka.persistence.typed.PersistenceId
import org.scalatest.wordspec.AnyWordSpecLike
Expand All @@ -29,7 +28,7 @@ class PersistTagsSpec

override def typedSystem: ActorSystem[_] = system
private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc"))
private implicit val tagsCodec: TagsCodec = settings.codecSettings.tagsCodec
import settings.codecSettings.JournalImplicits.tagsCodec
case class Row(pid: String, seqNr: Long, tags: Set[String])

"Persist tags" should {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRo
import akka.persistence.typed.PersistenceId
import akka.serialization.SerializationExtension
import org.scalatest.wordspec.AnyWordSpecLike
import akka.persistence.r2dbc.internal.codec.PayloadCodec

class PersistTimestampSpec
extends ScalaTestWithActorTestKit(TestConfig.config)
Expand All @@ -31,10 +30,10 @@ class PersistTimestampSpec
with TestData
with LogCapturing {

import settings.codecSettings.JournalImplicits.journalPayloadCodec
override def typedSystem: ActorSystem[_] = system
private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc"))
private val serialization = SerializationExtension(system)
private implicit val journalPayloadCodec: PayloadCodec = settings.codecSettings.journalPayloadCodec
case class Row(pid: String, seqNr: Long, dbTimestamp: Instant, event: String)

implicit private val codec: TimestampCodec =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import akka.stream.testkit.scaladsl.TestSink
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.LoggerFactory
import akka.persistence.r2dbc.internal.codec.PayloadCodec

object EventsBySliceBacktrackingSpec {
private val BufferSize = 10 // small buffer for testing
Expand All @@ -57,9 +56,7 @@ class EventsBySliceBacktrackingSpec

override def typedSystem: ActorSystem[_] = system
private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc"))
private implicit val journalPayloadCodec: PayloadCodec = settings.codecSettings.journalPayloadCodec
private implicit val timestampCodec: TimestampCodec = settings.codecSettings.timestampCodec
private implicit val queryAdapter: QueryAdapter = settings.codecSettings.queryAdapter
import settings.codecSettings.JournalImplicits._

private val query = PersistenceQuery(testKit.system)
.readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
Expand Down

0 comments on commit 6d30fdf

Please sign in to comment.