Skip to content

Commit

Permalink
Import and formatting fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrdom committed Feb 12, 2025
1 parent 51d4359 commit efb32d0
Show file tree
Hide file tree
Showing 23 changed files with 77 additions and 95 deletions.
18 changes: 7 additions & 11 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

// #journal-settings
pekko.persistence.r2dbc {
// TODO temporary solution, change to only using keys that are actually used by the implementation
journal = ${pekko.persistence.r2dbc}
journal {
class = "org.apache.pekko.persistence.r2dbc.journal.R2dbcJournal"
Expand All @@ -25,7 +24,6 @@ pekko.persistence.r2dbc {

// #snapshot-settings
pekko.persistence.r2dbc {
// TODO temporary solution, change to only using keys that are actually used by the implementation
snapshot = ${pekko.persistence.r2dbc}
snapshot {
class = "org.apache.pekko.persistence.r2dbc.snapshot.R2dbcSnapshotStore"
Expand All @@ -42,7 +40,6 @@ pekko.persistence.r2dbc {
// #durable-state-settings
pekko.persistence.r2dbc {
# Durable state store
// TODO temporary solution, change to only using keys that are actually used by the implementation
state = ${pekko.persistence.r2dbc}
state {
class = "org.apache.pekko.persistence.r2dbc.state.R2dbcDurableStateStoreProvider"
Expand All @@ -61,7 +58,6 @@ pekko.persistence.r2dbc {

// #query-settings
pekko.persistence.r2dbc {
// TODO temporary solution, change to only using keys that are actually used by the implementation
query = ${pekko.persistence.r2dbc}
query {
class = "org.apache.pekko.persistence.r2dbc.query.R2dbcReadJournalProvider"
Expand Down Expand Up @@ -160,13 +156,6 @@ pekko.persistence.r2dbc {
connection-factory-options-customizer = ""
}

# Enable this to reduce latency of eventsBySlices. The persisted events will be
# published as Pekko messages and consumed directly by running eventsBySlices
# queries. Tradeoff is more CPU and network resources that are used. The events
# must still be retrieved from the database, but at a lower polling frequency,
# because delivery of published messages are not guaranteed.
publish-events = off

# If database timestamp is guaranteed to not move backwards for two subsequent
# updates of the same persistenceId there might be a performance gain to
# set this to `on`. Note that many databases use the system clock and that can
Expand All @@ -182,6 +171,13 @@ pekko.persistence.r2dbc {
# Set to 0 to log all calls.
log-db-calls-exceeding = 300 ms

# Enable this to reduce latency of eventsBySlices. The persisted events will be
# published as Pekko messages and consumed directly by running eventsBySlices
# queries. Tradeoff is more CPU and network resources that are used. The events
# must still be retrieved from the database, but at a lower polling frequency,
# because delivery of published messages are not guaranteed.
publish-events = off

# In-memory buffer holding events when reading from database.
buffer-size = 1000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,8 @@ import java.util.concurrent.ConcurrentHashMap

import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.util.{ Failure, Success }
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigValueType
import com.typesafe.config.ConfigValueType
import io.r2dbc.pool.ConnectionPool
import io.r2dbc.pool.ConnectionPoolConfiguration
import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider
import io.r2dbc.postgresql.client.SSLMode
import io.r2dbc.spi.ConnectionFactories
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.ConnectionFactoryOptions
import io.r2dbc.spi.Option
import scala.util.Failure
import scala.util.Success
import org.apache.pekko
import pekko.Done
import pekko.actor.CoordinatedShutdown
Expand All @@ -41,6 +30,16 @@ import pekko.persistence.r2dbc.ConnectionFactoryProvider.ConnectionFactoryOption
import pekko.persistence.r2dbc.ConnectionFactoryProvider.NoopCustomizer
import pekko.persistence.r2dbc.internal.R2dbcExecutor.PublisherOps
import pekko.util.ccompat.JavaConverters._
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import io.r2dbc.pool.ConnectionPool
import io.r2dbc.pool.ConnectionPoolConfiguration
import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider
import io.r2dbc.postgresql.client.SSLMode
import io.r2dbc.spi.ConnectionFactories
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.ConnectionFactoryOptions
import io.r2dbc.spi.Option

object ConnectionFactoryProvider extends ExtensionId[ConnectionFactoryProvider] {
def createExtension(system: ActorSystem[_]): ConnectionFactoryProvider = new ConnectionFactoryProvider(system)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@ import scala.concurrent.duration._
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.annotation.InternalStableApi
import pekko.util.Helpers.toRootLowerCase
import pekko.util.JavaDurationConverters._
import com.typesafe.config.Config
import com.typesafe.config.ConfigList
import com.typesafe.config.ConfigObject
import com.typesafe.config.ConfigValueType
import pekko.util.Helpers.toRootLowerCase

/**
* INTERNAL API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,21 @@

package org.apache.pekko.persistence.r2dbc.internal

import scala.collection.immutable
import java.time.Instant
import java.time.{ Duration => JDuration }

import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import org.apache.pekko
import org.apache.pekko.persistence.r2dbc.QuerySettings
import org.apache.pekko.persistence.r2dbc.SharedSettings
import pekko.NotUsed
import pekko.annotation.InternalApi
import pekko.persistence.query.Offset
import pekko.persistence.query.TimestampOffset
import pekko.persistence.r2dbc.SharedSettings
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
import pekko.stream.scaladsl.Flow
import pekko.stream.scaladsl.Source
Expand Down Expand Up @@ -98,7 +97,10 @@ import org.slf4j.Logger
* Key is the epoch seconds for the start of the bucket. Value is the number of entries in the bucket.
*/
class Buckets(countByBucket: immutable.SortedMap[Buckets.EpochSeconds, Buckets.Count]) {
import Buckets.{ Bucket, BucketDurationSeconds, Count, EpochSeconds }
import Buckets.Bucket
import Buckets.BucketDurationSeconds
import Buckets.Count
import Buckets.EpochSeconds

val createdAt: Instant = Instant.now()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,26 @@ import java.time.Instant

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import com.typesafe.config.Config
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.Row
import io.r2dbc.spi.Statement
import org.apache.pekko
import org.apache.pekko.persistence.r2dbc.ConnectionFactoryProvider
import org.apache.pekko.persistence.r2dbc.JournalSettings
import org.apache.pekko.persistence.r2dbc.SharedSettings
import org.apache.pekko.persistence.r2dbc.internal.EventsByPersistenceIdDao
import org.apache.pekko.persistence.r2dbc.internal.HighestSequenceNrDao
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import pekko.persistence.Persistence
import pekko.persistence.r2dbc.ConnectionFactoryProvider
import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.JournalSettings
import pekko.persistence.r2dbc.SharedSettings
import pekko.persistence.r2dbc.internal.BySliceQuery
import pekko.persistence.r2dbc.internal.EventsByPersistenceIdDao
import pekko.persistence.r2dbc.internal.HighestSequenceNrDao
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
import pekko.persistence.typed.PersistenceId
import com.typesafe.config.Config
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.Row
import io.r2dbc.spi.Statement
import org.slf4j.Logger
import org.slf4j.LoggerFactory

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import scala.util.Success
import scala.util.Try
import com.typesafe.config.Config
import org.apache.pekko
import org.apache.pekko.persistence.r2dbc.ConnectionFactoryProvider
import pekko.Done
import pekko.actor.ActorRef
import pekko.actor.typed.ActorSystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
package org.apache.pekko.persistence.r2dbc.journal.mysql

import scala.concurrent.ExecutionContext
import io.r2dbc.spi.ConnectionFactory
import org.apache.pekko
import org.apache.pekko.persistence.r2dbc.JournalSettings
import org.apache.pekko.persistence.r2dbc.SharedSettings
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
import pekko.persistence.r2dbc.JournalSettings
import pekko.persistence.r2dbc.SharedSettings
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.journal.JournalDao
import io.r2dbc.spi.ConnectionFactory

/**
* INTERNAL API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,35 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import com.typesafe.config.Config
import io.r2dbc.spi.ConnectionFactory
import org.apache.pekko
import org.apache.pekko.persistence.r2dbc.QuerySettings
import org.apache.pekko.persistence.r2dbc.SharedSettings
import org.apache.pekko.persistence.r2dbc.internal.EventsByPersistenceIdDao
import org.apache.pekko.persistence.r2dbc.internal.HighestSequenceNrDao
import pekko.NotUsed
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
import pekko.persistence.r2dbc.ConnectionFactoryProvider
import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.QuerySettings
import pekko.persistence.r2dbc.SharedSettings
import pekko.persistence.r2dbc.internal.BySliceQuery
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
import pekko.persistence.r2dbc.internal.EventsByPersistenceIdDao
import pekko.persistence.r2dbc.internal.HighestSequenceNrDao
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.journal.JournalDao
import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
import pekko.persistence.r2dbc.query.scaladsl.mysql.MySQLQueryDao
import pekko.stream.scaladsl.Source
import com.typesafe.config.Config
import io.r2dbc.spi.ConnectionFactory
import org.slf4j.Logger
import org.slf4j.LoggerFactory

object QueryDao {
val log: Logger = LoggerFactory.getLogger(classOf[QueryDao])

def fromConfig(
settings: QuerySettings,
settings: QuerySettings,
config: Config
)(implicit system: ActorSystem[_], ec: ExecutionContext): QueryDao = {
val connectionFactory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import com.typesafe.config.Config
import org.apache.pekko
import org.apache.pekko.Done
import org.apache.pekko.actor.CoordinatedShutdown
import org.apache.pekko.actor.CoordinatedShutdown
import org.apache.pekko.persistence.r2dbc.ConnectionFactoryProvider
import org.apache.pekko.persistence.r2dbc.internal.R2dbcExecutor.PublisherOps
import pekko.NotUsed
import pekko.actor.ExtendedActorSystem
import pekko.actor.typed.ActorSystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import io.r2dbc.spi.ConnectionFactory
import org.apache.pekko
import org.apache.pekko.persistence.r2dbc.QuerySettings
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
import pekko.persistence.r2dbc.QuerySettings
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.query.scaladsl.QueryDao
import io.r2dbc.spi.ConnectionFactory

/**
* INTERNAL API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,23 @@

package org.apache.pekko.persistence.r2dbc.snapshot

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.adapter._
import pekko.persistence.{ SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria }
import pekko.persistence.r2dbc.SnapshotSettings
import pekko.persistence.snapshot.SnapshotStore
import pekko.serialization.{ Serialization, SerializationExtension }
import com.typesafe.config.Config
import org.apache.pekko.persistence.r2dbc.ConnectionFactoryProvider

import scala.concurrent.{ ExecutionContext, Future }
import pekko.annotation.InternalApi
import pekko.persistence.r2dbc.SnapshotSettings
import pekko.persistence.r2dbc.snapshot.SnapshotDao.SerializedSnapshotMetadata
import pekko.persistence.r2dbc.snapshot.SnapshotDao.SerializedSnapshotRow
import pekko.persistence.snapshot.SnapshotStore
import pekko.persistence.SelectedSnapshot
import pekko.persistence.SnapshotMetadata
import pekko.persistence.SnapshotSelectionCriteria
import pekko.serialization.Serializers
import pekko.serialization.Serialization
import pekko.serialization.SerializationExtension
import com.typesafe.config.Config

object R2dbcSnapshotStore {
private def deserializeSnapshotRow(snap: SerializedSnapshotRow, serialization: Serialization): SelectedSnapshot =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ package org.apache.pekko.persistence.r2dbc.snapshot

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import com.typesafe.config.Config
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.Row
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
Expand All @@ -31,6 +28,9 @@ import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.snapshot.mysql.MySQLSnapshotDao
import pekko.persistence.typed.PersistenceId
import com.typesafe.config.Config
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.Row
import org.slf4j.Logger
import org.slf4j.LoggerFactory

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,7 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import com.typesafe.config.Config
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.R2dbcDataIntegrityViolationException
import io.r2dbc.spi.Statement
import org.apache.pekko
import org.apache.pekko.persistence.r2dbc.StateSettings
import pekko.Done
import pekko.NotUsed
import pekko.actor.typed.ActorSystem
Expand All @@ -33,6 +28,7 @@ import pekko.dispatch.ExecutionContexts
import pekko.persistence.Persistence
import pekko.persistence.r2dbc.ConnectionFactoryProvider
import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.StateSettings
import pekko.persistence.r2dbc.internal.BySliceQuery
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
Expand All @@ -41,6 +37,10 @@ import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.state.scaladsl.mysql.MySQLDurableStateDao
import pekko.persistence.typed.PersistenceId
import pekko.stream.scaladsl.Source
import com.typesafe.config.Config
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.R2dbcDataIntegrityViolationException
import io.r2dbc.spi.Statement
import org.slf4j.Logger
import org.slf4j.LoggerFactory

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import com.typesafe.config.Config
import org.apache.pekko
import org.apache.pekko.actor.CoordinatedShutdown
import org.apache.pekko.actor.CoordinatedShutdown
import org.apache.pekko.persistence.r2dbc.ConnectionFactoryProvider
import org.apache.pekko.persistence.r2dbc.internal.R2dbcExecutor.PublisherOps
import pekko.Done
import pekko.NotUsed
import pekko.actor.ExtendedActorSystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import io.r2dbc.spi.ConnectionFactory
import org.apache.pekko
import org.apache.pekko.persistence.r2dbc.StateSettings
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
import pekko.persistence.r2dbc.StateSettings
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
import pekko.persistence.r2dbc.state.scaladsl.DurableStateDao
import io.r2dbc.spi.ConnectionFactory

/**
* INTERNAL API
Expand Down
Loading

0 comments on commit efb32d0

Please sign in to comment.