diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala index fdc212a7..9b44b625 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala @@ -273,11 +273,9 @@ import org.slf4j.Logger if (state.queryCount != 0 && log.isDebugEnabled()) log.debug( - "{} next query [{}] from slices [{} - {}], between time [{} - {}]. Found [{}] rows in previous query.", + "{} next query [{}], between time [{} - {}]. Found [{}] rows in previous query.", logPrefix, state.queryCount, - minSlice, - maxSlice, fromTimestamp, toTimestamp, state.rowCount) @@ -300,11 +298,9 @@ import org.slf4j.Logger } else { if (log.isDebugEnabled) log.debug( - "{} query [{}] from slices [{} - {}] completed. Found [{}] rows in previous query.", + "{} query [{}] completed. Found [{}] rows in previous query.", logPrefix, state.queryCount, - minSlice, - maxSlice, state.rowCount) state -> None @@ -319,13 +315,7 @@ import org.slf4j.Logger .futureSource[Envelope, NotUsed] { currentTimestamp.map { currentTime => if (log.isDebugEnabled()) - log.debug( - "{} query slices [{} - {}], from time [{}] until now [{}].", - logPrefix, - minSlice, - maxSlice, - initialOffset.timestamp, - currentTime) + log.debug("{} query, from time [{}] until now [{}].", logPrefix, initialOffset.timestamp, currentTime) ContinuousQuery[QueryState, Envelope]( initialState = QueryState.empty.copy(latest = initialOffset), @@ -348,12 +338,7 @@ import org.slf4j.Logger val initialOffset = toTimestampOffset(offset) if (log.isDebugEnabled()) - log.debug( - "Starting {} query from slices [{} - {}], from time [{}].", - logPrefix, - minSlice, - maxSlice, - initialOffset.timestamp) + log.debug("{} starting query from time [{}].", logPrefix, initialOffset.timestamp) def nextOffset(state: QueryState, envelope: Envelope): QueryState = { if (EnvelopeOrigin.isHeartbeatEvent(envelope)) @@ -408,13 +393,7 @@ import org.slf4j.Logger if (log.isDebugEnabled) delay.foreach { d => - log.debug( - "{} query [{}] from slices [{} - {}] delay next [{}] ms.", - logPrefix, - state.queryCount, - minSlice, - maxSlice, - d.toMillis) + log.debug("{} query [{}] delay next [{}] ms.", logPrefix, state.queryCount, d.toMillis) } delay @@ -517,12 +496,10 @@ import org.slf4j.Logger else "" log.debug( - "{} next query [{}]{} from slices [{} - {}], between time [{} - {}]. {}", + "{} next query [{}]{}, between time [{} - {}]. {}", logPrefix, newState.queryCount, backtrackingInfo, - minSlice, - maxSlice, fromTimestamp, toTimestamp.getOrElse("None"), if (newIdleCount >= 3) s"Idle in [$newIdleCount] queries." @@ -617,12 +594,10 @@ import org.slf4j.Logger if (log.isDebugEnabled) { val sum = counts.iterator.map { case Bucket(_, count) => count }.sum log.debug( - "{} retrieved [{}] event count buckets, with a total of [{}], from slices [{} - {}], from time [{}]", + "{} retrieved [{}] event count buckets, with a total of [{}], from time [{}]", logPrefix, counts.size, sum, - minSlice, - maxSlice, fromTimestamp) } newState diff --git a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala index 14618e0c..f24c0100 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala @@ -270,7 +270,12 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat maxSlice: Int, offset: Offset): Source[EventEnvelope[Event], NotUsed] = { bySlice(entityType, minSlice) - .currentBySlices("currentEventsBySlices", entityType, minSlice, maxSlice, offset) + .currentBySlices( + s"[$entityType] currentEventsBySlices [$minSlice-$maxSlice]: ", + entityType, + minSlice, + maxSlice, + offset) } /** @@ -312,7 +317,12 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat maxSlice: Int, offset: Offset): Source[EventEnvelope[Event], NotUsed] = { val dbSource = - bySlice[Event](entityType, minSlice).liveBySlices("eventsBySlices", entityType, minSlice, maxSlice, offset) + bySlice[Event](entityType, minSlice).liveBySlices( + s"[$entityType] eventsBySlices [$minSlice-$maxSlice]: ", + entityType, + minSlice, + maxSlice, + offset) if (settings.journalPublishEvents) { val pubSubSource = eventsBySlicesPubSubSource[Event](entityType, minSlice, maxSlice) mergeDbAndPubSubSources(dbSource, pubSubSource) @@ -345,7 +355,12 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat val snapshotSource = snapshotsBySlice[Snapshot, Event](entityType, minSlice, transformSnapshot) - .currentBySlices("currentSnapshotsBySlices", entityType, minSlice, maxSlice, offset) + .currentBySlices( + s"[$entityType] currentSnapshotsBySlices [$minSlice-$maxSlice]: ", + entityType, + minSlice, + maxSlice, + offset) Source.fromGraph( new StartingFromSnapshotStage[Event]( @@ -368,7 +383,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat snapshotOffsets.size) bySlice(entityType, minSlice).currentBySlices( - "currentEventsBySlices", + s"[$entityType] currentEventsBySlices [$minSlice-$maxSlice]: ", entityType, minSlice, maxSlice, @@ -402,7 +417,12 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat val snapshotSource = snapshotsBySlice[Snapshot, Event](entityType, minSlice, transformSnapshot) - .currentBySlices("snapshotsBySlices", entityType, minSlice, maxSlice, offset) + .currentBySlices( + s"[$entityType] snapshotsBySlices [$minSlice-$maxSlice]: ", + entityType, + minSlice, + maxSlice, + offset) Source.fromGraph( new StartingFromSnapshotStage[Event]( @@ -426,7 +446,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat val dbSource = bySlice[Event](entityType, minSlice).liveBySlices( - "eventsBySlices", + s"[$entityType] eventsBySlices [$minSlice-$maxSlice]: ", entityType, minSlice, maxSlice, @@ -697,11 +717,18 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat // EventTimestampQuery override def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]] = { - queryDao.timestampOfEvent(persistenceId, sequenceNr) + val result = queryDao.timestampOfEvent(persistenceId, sequenceNr) + if (log.isDebugEnabled) { + result.foreach { t => + log.debug("[{}] timestampOf seqNr [{}] is [{}]", persistenceId, sequenceNr, t) + } + } + result } //LoadEventQuery override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): Future[EventEnvelope[Event]] = { + log.debug("[{}] loadEnvelope seqNr [{}]", persistenceId, sequenceNr) queryDao .loadEvent(persistenceId, sequenceNr, includePayload = true) .map { diff --git a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala index 4fa5dab7..a297b324 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala @@ -307,14 +307,24 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg minSlice: Int, maxSlice: Int, offset: Offset): Source[DurableStateChange[A], NotUsed] = - bySlice.currentBySlices("currentChangesBySlices", entityType, minSlice, maxSlice, offset) + bySlice.currentBySlices( + s"[$entityType] currentChangesBySlices [$minSlice-$maxSlice]: ", + entityType, + minSlice, + maxSlice, + offset) override def changesBySlices( entityType: String, minSlice: Int, maxSlice: Int, offset: Offset): Source[DurableStateChange[A], NotUsed] = - bySlice.liveBySlices("changesBySlices", entityType, minSlice, maxSlice, offset) + bySlice.liveBySlices( + s"[$entityType] changesBySlices [$minSlice-$maxSlice]: ", + entityType, + minSlice, + maxSlice, + offset) /** * Note: If you have configured `custom-table` this query will look in both the default table and the custom tables.