From e24380ee2f33652f5ad5bd752c64738a65dff0de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 15 Sep 2022 21:34:59 -0400 Subject: [PATCH 1/5] feat: Limit events by tag query ordering sizes --- core/src/main/resources/reference.conf | 20 +++++++++++++++++++ .../jdbc/config/AkkaPersistenceConfig.scala | 1 + .../jdbc/query/scaladsl/JdbcReadJournal.scala | 18 +++++++++++++++-- 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 293479eb3..714cb0e39 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -396,6 +396,26 @@ jdbc-read-journal { # are delivered downstreams. max-buffer-size = "500" + # Number of 'max-buffer-size's to limit each events by tag query to + # + # Events by tag will fetch batches of elements limiting using both the DB LIMIT support and + # the "ordering" column of the journal. When executing a query starting from the beginning of the + # journal, for example adding a new projection to an existing application with a large number + # of already persisted events this can cause performance problems in some databases. + # + # This factor limits the "slices" of ordering the journal is queried for into smaller chunks, + # issuing more queries where each query covers a smaller slice of the journal instead of one + # covering the entire journal. + # + # Note that setting this too low will have a performance overhead in many queries being issued where + # each query returns no or very few entries, but what number is too low depends on how many tags are + # used and how well those are distributed, setting this value requires application specific benchmarking + # to find a good number. 
+ # + # 0 means disable the factor and query the entire journal and limit to max-buffer-size elements + # FIXME set to 0 default for opt in, enabled to run through CI tests + events-by-tag-buffer-sizes-per-query = 10 + # If enabled, automatically close the database connection when the actor system is terminated add-shutdown-hook = true diff --git a/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala b/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala index d82fafdc3..e33527cb2 100644 --- a/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala +++ b/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala @@ -187,6 +187,7 @@ class ReadJournalConfig(config: Config) { val pluginConfig = new ReadJournalPluginConfig(config) val refreshInterval: FiniteDuration = config.asFiniteDuration("refresh-interval") val maxBufferSize: Int = config.getInt("max-buffer-size") + val eventsByTagBufferSizesPerQuery: Long = config.getLong("events-by-tag-buffer-sizes-per-query") val addShutdownHook: Boolean = config.getBoolean("add-shutdown-hook") override def toString: String = diff --git a/core/src/main/scala/akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala b/core/src/main/scala/akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala index 97643759f..6eaad6f67 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala @@ -228,15 +228,29 @@ class JdbcReadJournal(config: Config, configPath: String)(implicit val system: E import FlowControl._ implicit val askTimeout: Timeout = Timeout(readJournalConfig.journalSequenceRetrievalConfiguration.askTimeout) val batchSize = readJournalConfig.maxBufferSize + val maxOrderingRange = readJournalConfig.eventsByTagBufferSizesPerQuery match { + case 0 => None + case x => Some(x * batchSize) + } + + def getLoopMaxOrderingId(offset: 
Long, latestOrdering: MaxOrderingId): Future[MaxOrderingId] = + Future.successful(maxOrderingRange match { + case None => latestOrdering + case Some(numberOfEvents) => + val limitedMaxOrderingId = offset + numberOfEvents + if (limitedMaxOrderingId >= latestOrdering.maxOrdering) latestOrdering + else MaxOrderingId(limitedMaxOrderingId) + }) Source .unfoldAsync[(Long, FlowControl), Seq[EventEnvelope]]((offset, Continue)) { case (from, control) => def retrieveNextBatch() = { for { queryUntil <- journalSequenceActor.ask(GetMaxOrderingId).mapTo[MaxOrderingId] - xs <- currentJournalEventsByTag(tag, from, batchSize, queryUntil).runWith(Sink.seq) + loopMaxOrderingId <- getLoopMaxOrderingId(from, queryUntil) + xs <- currentJournalEventsByTag(tag, from, batchSize, loopMaxOrderingId).runWith(Sink.seq) } yield { - val hasMoreEvents = xs.size == batchSize + val hasMoreEvents = (xs.size == batchSize) || (loopMaxOrderingId.maxOrdering < queryUntil.maxOrdering) val nextControl: FlowControl = terminateAfterOffset match { // we may stop if target is behind queryUntil and we don't have more events to fetch From d25ace2b9674076032b8250f99c0e932a2353ea0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 16 Sep 2022 10:48:25 +0200 Subject: [PATCH 2/5] Don't let it overflow to negative --- .../akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala b/core/src/main/scala/akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala index 6eaad6f67..d938f687b 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala @@ -238,7 +238,7 @@ class JdbcReadJournal(config: Config, configPath: String)(implicit val system: E case None => latestOrdering case Some(numberOfEvents) => val limitedMaxOrderingId = offset + 
numberOfEvents - if (limitedMaxOrderingId >= latestOrdering.maxOrdering) latestOrdering + if (limitedMaxOrderingId < 0 || limitedMaxOrderingId >= latestOrdering.maxOrdering) latestOrdering else MaxOrderingId(limitedMaxOrderingId) }) From 36663553118fb59a1314474f9549c9a17ff86414 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 16 Sep 2022 11:45:27 +0200 Subject: [PATCH 3/5] Default to not limit ordering query --- core/src/main/resources/reference.conf | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 714cb0e39..bcbdd8a58 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -413,8 +413,7 @@ jdbc-read-journal { # to find a good number. # # 0 means disable the factor and query the entire journal and limit to max-buffer-size elements - # FIXME set to 0 default for opt in, enabled to run through CI tests - events-by-tag-buffer-sizes-per-query = 10 + events-by-tag-buffer-sizes-per-query = 0 # If enabled, automatically close the database connection when the actor system is terminated add-shutdown-hook = true From b0a325782edcf21939d5b7db9d86ff0dafb56121 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 16 Sep 2022 12:52:40 +0200 Subject: [PATCH 4/5] Less future, more comment --- .../jdbc/query/scaladsl/JdbcReadJournal.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala b/core/src/main/scala/akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala index d938f687b..726ea44e4 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala @@ -233,23 +233,25 @@ class JdbcReadJournal(config: Config, configPath: String)(implicit val system: E case x => Some(x * 
batchSize) } - def getLoopMaxOrderingId(offset: Long, latestOrdering: MaxOrderingId): Future[MaxOrderingId] = - Future.successful(maxOrderingRange match { + def getLoopMaxOrderingId(offset: Long, latestOrdering: MaxOrderingId): MaxOrderingId = + maxOrderingRange match { case None => latestOrdering case Some(numberOfEvents) => val limitedMaxOrderingId = offset + numberOfEvents if (limitedMaxOrderingId < 0 || limitedMaxOrderingId >= latestOrdering.maxOrdering) latestOrdering else MaxOrderingId(limitedMaxOrderingId) - }) + } Source .unfoldAsync[(Long, FlowControl), Seq[EventEnvelope]]((offset, Continue)) { case (from, control) => def retrieveNextBatch() = { for { queryUntil <- journalSequenceActor.ask(GetMaxOrderingId).mapTo[MaxOrderingId] - loopMaxOrderingId <- getLoopMaxOrderingId(from, queryUntil) + loopMaxOrderingId = getLoopMaxOrderingId(from, queryUntil) xs <- currentJournalEventsByTag(tag, from, batchSize, loopMaxOrderingId).runWith(Sink.seq) } yield { + // continue if query over entire journal was fewer than full batch or if we are limiting + // the query through eventsByTagBufferSizesPerQuery and didn't reach the last 'ordering' yet val hasMoreEvents = (xs.size == batchSize) || (loopMaxOrderingId.maxOrdering < queryUntil.maxOrdering) val nextControl: FlowControl = terminateAfterOffset match { From 4eae701c0bd48b7185da3e8b8321caa1c5af299b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 16 Sep 2022 15:08:20 +0200 Subject: [PATCH 5/5] Error message if factor is negative --- .../akka/persistence/jdbc/config/AkkaPersistenceConfig.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala b/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala index e33527cb2..30bfbb4ae 100644 --- a/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala +++ b/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala @@ 
-188,6 +188,7 @@ class ReadJournalConfig(config: Config) { val refreshInterval: FiniteDuration = config.asFiniteDuration("refresh-interval") val maxBufferSize: Int = config.getInt("max-buffer-size") val eventsByTagBufferSizesPerQuery: Long = config.getLong("events-by-tag-buffer-sizes-per-query") + require(eventsByTagBufferSizesPerQuery >= 0, "events-by-tag-buffer-sizes-per-query must not be negative") val addShutdownHook: Boolean = config.getBoolean("add-shutdown-hook") override def toString: String =