Skip to content

Commit

Permalink
chore: More information in query log prefix
Browse files Browse the repository at this point in the history
patriknw committed Dec 4, 2024
1 parent 126f218 commit 667be6a
Showing 3 changed files with 53 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -273,11 +273,9 @@ import org.slf4j.Logger

if (state.queryCount != 0 && log.isDebugEnabled())
log.debug(
"{} next query [{}] from slices [{} - {}], between time [{} - {}]. Found [{}] rows in previous query.",
"{} next query [{}], between time [{} - {}]. Found [{}] rows in previous query.",
logPrefix,
state.queryCount,
minSlice,
maxSlice,
fromTimestamp,
toTimestamp,
state.rowCount)
@@ -300,11 +298,9 @@ import org.slf4j.Logger
} else {
if (log.isDebugEnabled)
log.debug(
"{} query [{}] from slices [{} - {}] completed. Found [{}] rows in previous query.",
"{} query [{}] completed. Found [{}] rows in previous query.",
logPrefix,
state.queryCount,
minSlice,
maxSlice,
state.rowCount)

state -> None
@@ -319,13 +315,7 @@ import org.slf4j.Logger
.futureSource[Envelope, NotUsed] {
currentTimestamp.map { currentTime =>
if (log.isDebugEnabled())
log.debug(
"{} query slices [{} - {}], from time [{}] until now [{}].",
logPrefix,
minSlice,
maxSlice,
initialOffset.timestamp,
currentTime)
log.debug("{} query, from time [{}] until now [{}].", logPrefix, initialOffset.timestamp, currentTime)

ContinuousQuery[QueryState, Envelope](
initialState = QueryState.empty.copy(latest = initialOffset),
@@ -348,12 +338,7 @@ import org.slf4j.Logger
val initialOffset = toTimestampOffset(offset)

if (log.isDebugEnabled())
log.debug(
"Starting {} query from slices [{} - {}], from time [{}].",
logPrefix,
minSlice,
maxSlice,
initialOffset.timestamp)
log.debug("{} starting query from time [{}].", logPrefix, initialOffset.timestamp)

def nextOffset(state: QueryState, envelope: Envelope): QueryState = {
if (EnvelopeOrigin.isHeartbeatEvent(envelope))
@@ -408,13 +393,7 @@ import org.slf4j.Logger

if (log.isDebugEnabled)
delay.foreach { d =>
log.debug(
"{} query [{}] from slices [{} - {}] delay next [{}] ms.",
logPrefix,
state.queryCount,
minSlice,
maxSlice,
d.toMillis)
log.debug("{} query [{}] delay next [{}] ms.", logPrefix, state.queryCount, d.toMillis)
}

delay
@@ -517,12 +496,10 @@ import org.slf4j.Logger
else
""
log.debug(
"{} next query [{}]{} from slices [{} - {}], between time [{} - {}]. {}",
"{} next query [{}]{}, between time [{} - {}]. {}",
logPrefix,
newState.queryCount,
backtrackingInfo,
minSlice,
maxSlice,
fromTimestamp,
toTimestamp.getOrElse("None"),
if (newIdleCount >= 3) s"Idle in [$newIdleCount] queries."
@@ -617,12 +594,10 @@ import org.slf4j.Logger
if (log.isDebugEnabled) {
val sum = counts.iterator.map { case Bucket(_, count) => count }.sum
log.debug(
"{} retrieved [{}] event count buckets, with a total of [{}], from slices [{} - {}], from time [{}]",
"{} retrieved [{}] event count buckets, with a total of [{}], from time [{}]",
logPrefix,
counts.size,
sum,
minSlice,
maxSlice,
fromTimestamp)
}
newState
Original file line number Diff line number Diff line change
@@ -270,7 +270,12 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
maxSlice: Int,
offset: Offset): Source[EventEnvelope[Event], NotUsed] = {
bySlice(entityType, minSlice)
.currentBySlices("currentEventsBySlices", entityType, minSlice, maxSlice, offset)
.currentBySlices(
s"[$entityType] currentEventsBySlices [$minSlice-$maxSlice]: ",
entityType,
minSlice,
maxSlice,
offset)
}

/**
@@ -312,7 +317,12 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
maxSlice: Int,
offset: Offset): Source[EventEnvelope[Event], NotUsed] = {
val dbSource =
bySlice[Event](entityType, minSlice).liveBySlices("eventsBySlices", entityType, minSlice, maxSlice, offset)
bySlice[Event](entityType, minSlice).liveBySlices(
s"[$entityType] eventsBySlices [$minSlice-$maxSlice]: ",
entityType,
minSlice,
maxSlice,
offset)
if (settings.journalPublishEvents) {
val pubSubSource = eventsBySlicesPubSubSource[Event](entityType, minSlice, maxSlice)
mergeDbAndPubSubSources(dbSource, pubSubSource)
@@ -345,7 +355,12 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat

val snapshotSource =
snapshotsBySlice[Snapshot, Event](entityType, minSlice, transformSnapshot)
.currentBySlices("currentSnapshotsBySlices", entityType, minSlice, maxSlice, offset)
.currentBySlices(
s"[$entityType] currentSnapshotsBySlices [$minSlice-$maxSlice]: ",
entityType,
minSlice,
maxSlice,
offset)

Source.fromGraph(
new StartingFromSnapshotStage[Event](
@@ -368,7 +383,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
snapshotOffsets.size)

bySlice(entityType, minSlice).currentBySlices(
"currentEventsBySlices",
s"[$entityType] currentEventsBySlices [$minSlice-$maxSlice]: ",
entityType,
minSlice,
maxSlice,
@@ -402,7 +417,12 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat

val snapshotSource =
snapshotsBySlice[Snapshot, Event](entityType, minSlice, transformSnapshot)
.currentBySlices("snapshotsBySlices", entityType, minSlice, maxSlice, offset)
.currentBySlices(
s"[$entityType] snapshotsBySlices [$minSlice-$maxSlice]: ",
entityType,
minSlice,
maxSlice,
offset)

Source.fromGraph(
new StartingFromSnapshotStage[Event](
@@ -426,7 +446,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat

val dbSource =
bySlice[Event](entityType, minSlice).liveBySlices(
"eventsBySlices",
s"[$entityType] eventsBySlices [$minSlice-$maxSlice]: ",
entityType,
minSlice,
maxSlice,
@@ -697,11 +717,18 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat

// EventTimestampQuery
override def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]] = {
queryDao.timestampOfEvent(persistenceId, sequenceNr)
val result = queryDao.timestampOfEvent(persistenceId, sequenceNr)
if (log.isDebugEnabled) {
result.foreach { t =>
log.debug("[{}] timestampOf seqNr [{}] is [{}]", persistenceId, sequenceNr, t)
}
}
result
}

//LoadEventQuery
override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): Future[EventEnvelope[Event]] = {
log.debug("[{}] loadEnvelope seqNr [{}]", persistenceId, sequenceNr)
queryDao
.loadEvent(persistenceId, sequenceNr, includePayload = true)
.map {
Original file line number Diff line number Diff line change
@@ -307,14 +307,24 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[DurableStateChange[A], NotUsed] =
bySlice.currentBySlices("currentChangesBySlices", entityType, minSlice, maxSlice, offset)
bySlice.currentBySlices(
s"[$entityType] currentChangesBySlices [$minSlice-$maxSlice]: ",
entityType,
minSlice,
maxSlice,
offset)

override def changesBySlices(
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[DurableStateChange[A], NotUsed] =
bySlice.liveBySlices("changesBySlices", entityType, minSlice, maxSlice, offset)
bySlice.liveBySlices(
s"[$entityType] changesBySlices [$minSlice-$maxSlice]: ",
entityType,
minSlice,
maxSlice,
offset)

/**
* Note: If you have configured `custom-table` this query will look in both the default table and the custom tables.

0 comments on commit 667be6a

Please sign in to comment.