From d25611008011ecac2d734502a697ce1dd501079f Mon Sep 17 00:00:00 2001 From: Nikita Gazarov Date: Sun, 22 Aug 2021 11:47:11 -0700 Subject: [PATCH] Fix: Draft fix of SwitchEventStream for #103 --- .../airstream/flatten/SwitchEventStream.scala | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/main/scala/com/raquo/airstream/flatten/SwitchEventStream.scala b/src/main/scala/com/raquo/airstream/flatten/SwitchEventStream.scala index 73133b92..d6a879c1 100644 --- a/src/main/scala/com/raquo/airstream/flatten/SwitchEventStream.scala +++ b/src/main/scala/com/raquo/airstream/flatten/SwitchEventStream.scala @@ -10,7 +10,7 @@ import scala.util.{ Failure, Success, Try } * * This stream emits the events from the last such stream created this way. * - * Events are emitted at the same time as the currently tracked stream emits them. + * Events are emitted at the same time as the currently tracked stream emits them (but in a new transaction). * * When `parent` emits a nextValue, this stream switches to emitting events from `makeStream(nextValue)` (which is a stream). * @@ -35,10 +35,9 @@ class SwitchEventStream[I, O]( override protected val topoRank: Int = 1 - private[this] var maybeCurrentEventStream: js.UndefOr[Try[EventStream[O]]] = parent match { - case signal: Signal[I @unchecked] => signal.tryNow().map(makeStream) - case _ => js.undefined - } + private[this] var firstStart: Boolean = true + + private[this] var maybeCurrentEventStream: js.UndefOr[Try[EventStream[O]]] = js.undefined // @TODO[Elegance] Maybe we should abstract away this kind of internal observer private[this] val internalEventObserver: InternalObserver[O] = InternalObserver[O]( @@ -71,11 +70,18 @@ class SwitchEventStream[I, O]( } override protected[this] def onStart(): Unit = { + super.onStart() + if (firstStart) { + firstStart = false + maybeCurrentEventStream = parent match { + case signal: Signal[I @unchecked] => signal.tryNow().map(makeStream) + case _ => js.undefined + } + } maybeCurrentEventStream.foreach { streamTry => val initialStream = streamTry.fold(err => EventStream.fromTry(Failure(err), emitOnce = true), identity) initialStream.addInternalObserver(internalEventObserver) } - super.onStart() } override protected[this] def onStop(): Unit = {