Skip to content

Commit

Permalink
Fix for snapshot stage double pull race
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Oct 19, 2023
1 parent aa314c9 commit d3bc39b
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ import akka.stream.stage.OutHandler
}

override def onUpstreamFinish(): Unit = {
val primaryHandler = new PrimaryHandler
val primaryHandler = new PrimaryHandler(isAvailable(out))
self.setHandler(out, primaryHandler)

subFusingMaterializer.materialize(
Expand All @@ -80,9 +80,9 @@ import akka.stream.stage.OutHandler
}
}

class PrimaryHandler extends OutHandler with InHandler {
class PrimaryHandler(pullImmediately: Boolean) extends OutHandler with InHandler {
val subSink = new SubSinkInlet[EventEnvelope[Event]]("snapshots")
subSink.pull()
if (pullImmediately) subSink.pull()
subSink.setHandler(this)

override def onPull(): Unit = {
Expand Down

0 comments on commit d3bc39b

Please sign in to comment.