-
Notifications
You must be signed in to change notification settings - Fork 142
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Limit events by tag query ordering sizes #681
Limit events by tag query ordering sizes #681
Conversation
Hi @JustinPihony, Thank you for your contribution! We really value the time you've taken to put this together. We see that you have signed the Lightbend Contributors License Agreement before, however, the CLA has changed since you last signed it. |
a9236ac
to
2600ab5
Compare
2600ab5
to
e24380e
Compare
Tested with a |
|
||
Source | ||
.unfoldAsync[(Long, FlowControl), Seq[EventEnvelope]]((offset, Continue)) { case (from, control) => | ||
def retrieveNextBatch() = { | ||
for { | ||
queryUntil <- journalSequenceActor.ask(GetMaxOrderingId).mapTo[MaxOrderingId] | ||
xs <- currentJournalEventsByTag(tag, from, batchSize, queryUntil).runWith(Sink.seq) | ||
loopMaxOrderingId <- getLoopMaxOrderingId(from, queryUntil) | ||
xs <- currentJournalEventsByTag(tag, from, batchSize, loopMaxOrderingId).runWith(Sink.seq) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Each fetch is still limited by batch size, but the far end of the query to consider is capped by from + maxOrderingRange
elements, for unevenly tagged event streams that could lead to many empty results, so user will have to benchmark and tune in their specific journal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
core/src/main/scala/akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
This makes it possible to limit the chunks of the journal queried for events by tag to work around bad performance in some dbs when starting a new query/projection from 0 after a lot of tagged events was already persisted. By default the limiting is disabled, will need app specific tuning when enabled.