Skip to content

Commit

Permalink
Merge pull request #2813 from SystemFw/pauseWhen/delays
Browse files Browse the repository at this point in the history
Do not withhold emission in `pauseWhen`
  • Loading branch information
SystemFw authored Feb 5, 2022
2 parents 6281d1e + 3cfc98b commit d8bac3a
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2196,12 +2196,16 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
def waitToResume =
pauseWhenTrue.discrete.dropWhile(_ == true).take(1).compile.drain

// The logic can be expressed entirely with `waitToResume`, but
// `Signal.get` is lighter than `Signal.discrete`, so the preliminary
// check with `get` in `pauseIfNeeded` acts as an optimisation, since
// we expect a stream to generally not be in a paused state.
def pauseIfNeeded = Stream.exec {
pauseWhenTrue.get.flatMap(paused => waitToResume.whenA(paused))
}

pauseIfNeeded ++ chunks.flatMap { chunk =>
pauseIfNeeded ++ Stream.chunk(chunk)
Stream.chunk(chunk) ++ pauseIfNeeded
}
}

Expand Down

0 comments on commit d8bac3a

Please sign in to comment.