From d3bc39bd8b09415a0622edd5e8e3456c904e3605 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 19 Oct 2023 15:03:33 +0200 Subject: [PATCH] Fix for snapshot stage double pull race --- .../r2dbc/internal/StartingFromSnapshotStage.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/StartingFromSnapshotStage.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/StartingFromSnapshotStage.scala index 08f85f2c..3f9840b7 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/StartingFromSnapshotStage.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/StartingFromSnapshotStage.scala @@ -66,7 +66,7 @@ import akka.stream.stage.OutHandler } override def onUpstreamFinish(): Unit = { - val primaryHandler = new PrimaryHandler + val primaryHandler = new PrimaryHandler(isAvailable(out)) self.setHandler(out, primaryHandler) subFusingMaterializer.materialize( @@ -80,9 +80,9 @@ import akka.stream.stage.OutHandler } } - class PrimaryHandler extends OutHandler with InHandler { + class PrimaryHandler(pullImmediately: Boolean) extends OutHandler with InHandler { val subSink = new SubSinkInlet[EventEnvelope[Event]]("snapshots") - subSink.pull() + if (pullImmediately) subSink.pull() subSink.setHandler(this) override def onPull(): Unit = {