diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/StartingFromSnapshotStage.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/StartingFromSnapshotStage.scala index 08f85f2c..3f9840b7 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/StartingFromSnapshotStage.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/StartingFromSnapshotStage.scala @@ -66,7 +66,7 @@ import akka.stream.stage.OutHandler } override def onUpstreamFinish(): Unit = { - val primaryHandler = new PrimaryHandler + val primaryHandler = new PrimaryHandler(isAvailable(out)) self.setHandler(out, primaryHandler) subFusingMaterializer.materialize( @@ -80,9 +80,9 @@ import akka.stream.stage.OutHandler } } - class PrimaryHandler extends OutHandler with InHandler { + class PrimaryHandler(pullImmediately: Boolean) extends OutHandler with InHandler { val subSink = new SubSinkInlet[EventEnvelope[Event]]("snapshots") - subSink.pull() + if (pullImmediately) subSink.pull() subSink.setHandler(this) override def onPull(): Unit = {