Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clarify corner case in TimedPull #2812

Merged
merged 7 commits into from
Feb 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions core/shared/src/main/scala/fs2/Pull.scala
Original file line number Diff line number Diff line change
Expand Up @@ -554,9 +554,24 @@ object Pull extends PullLowPriority {
* previous timeout, but a duration of 0 is treated specially, in
* that it will cancel a pending timeout but not start a new one.
*
* Note: the very first execution of `timeout` does not start
* running until the first call to `uncons`, but subsequent calls
* proceed independently after that.
* Note:
* If for some reason you insert a pause in between `uncons` and
* `timeout`, such as:
* {{{
* timedPull.timeout(n.millis) >>
* Pull.eval(IO.sleep(m.millis)) >>
* timedPull.uncons.flatMap { ...
* }}}
*
* you should be aware that an invocation of `timeout` that
* happens before the very first `uncons` will start the timeout
* simultaneously with the very first `uncons`. Subsequent
* invocations of `timeout` start the timeout immediately
* instead.
*
* This is an implementation detail which should not affect most
* cases, given that usually there is no need to sleep in between
* `timeout` and the very first call to `uncons`.
*/
def timeout(t: FiniteDuration): Pull[F, INothing, Unit]
}
Expand Down
248 changes: 161 additions & 87 deletions core/shared/src/test/scala/fs2/TimedPullsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,35 +34,36 @@ class TimedPullsSuite extends Fs2Suite {

test("behaves as a normal Pull when no timeouts are used") {
forAllF { (s: Stream[Pure, Int]) =>
s.covary[IO]
.pull
.timed { tp =>
def loop(tp: Pull.Timed[IO, Int]): Pull[IO, Int, Unit] =
tp.uncons.flatMap {
case None => Pull.done
case Some((Right(c), next)) => Pull.output(c) >> loop(next)
case Some((Left(_), _)) => fail("unexpected timeout")
}
val expected = s.compile.toList
val prog =
s.covary[IO]
.pull
.timed { tp =>
def loop(tp: Pull.Timed[IO, Int]): Pull[IO, Int, Unit] =
tp.uncons.flatMap {
case None => Pull.done
case Some((Right(c), next)) => Pull.output(c) >> loop(next)
case Some((Left(_), _)) => fail("unexpected timeout")
}

loop(tp)
}
.stream
.compile
.toList
.map(it => assertEquals(it, s.compile.toList))
loop(tp)
}
.stream
.compile
.toList

prog.assertEquals(expected)
}
}

test("pulls elements with timeouts, no timeouts trigger") {
// TODO cannot use PropF with `TestControl.executeEmbed` at the moment
val l = List.range(1, 100)
val s = Stream.emits(l).covary[IO].rechunkRandomly()
val period = 500.millis
val timeout = 600.millis

TestControl
.executeEmbed(
s.metered(period)
forAllF { (s: Stream[Pure, Int]) =>
val period = 500.millis
val timeout = 600.millis
val expected = s.compile.toList
val prog =
s.covary[IO]
.metered(period)
.pull
.timed { tp =>
def loop(tp: Pull.Timed[IO, Int]): Pull[IO, Int, Unit] =
Expand All @@ -77,54 +78,53 @@ class TimedPullsSuite extends Fs2Suite {
.stream
.compile
.toList
)
.map(it => assertEquals(it, l))

TestControl.executeEmbed(prog).assertEquals(expected)
}
}

test("times out whilst pulling a single element") {
TestControl.executeEmbed(
Stream
.sleep[IO](300.millis)
.pull
.timed { tp =>
tp.timeout(100.millis) >>
tp.uncons.flatMap {
case Some((Left(_), _)) => Pull.done
case _ => fail("timeout expected")
}
}
.stream
.compile
.drain
)
val prog = Stream
.sleep[IO](300.millis)
.pull
.timed { tp =>
tp.timeout(100.millis) >>
tp.uncons.flatMap {
case Some((Left(_), _)) => Pull.done
case _ => fail("timeout expected")
}
}
.stream
.compile
.drain

TestControl.executeEmbed(prog)
}

test("times out after pulling multiple elements") {
val l = List(1, 2, 3)
val s = Stream.emits(l) ++ Stream.never[IO]
val expected = List(1, 2, 3)
val s = Stream.emits(expected) ++ Stream.never[IO]
val t = 100.millis
val timeout = 350.millis

TestControl
.executeEmbed(
s
.metered(t)
.pull
.timed { tp =>
def go(tp: Pull.Timed[IO, Int]): Pull[IO, Int, Unit] =
tp.uncons.flatMap {
case Some((Right(c), n)) => Pull.output(c) >> go(n)
case Some((Left(_), _)) => Pull.done
case None => fail("Unexpected end of input")
}

tp.timeout(timeout) >> go(tp)
val prog = s
.metered(t)
.pull
.timed { tp =>
def go(tp: Pull.Timed[IO, Int]): Pull[IO, Int, Unit] =
tp.uncons.flatMap {
case Some((Right(c), n)) => Pull.output(c) >> go(n)
case Some((Left(_), _)) => Pull.done
case None => fail("Unexpected end of input")
}
.stream
.compile
.toList
)
.map(it => assertEquals(it, l))

tp.timeout(timeout) >> go(tp)
}
.stream
.compile
.toList

TestControl.executeEmbed(prog).assertEquals(expected)
}

test("pulls elements with timeouts, timeouts trigger after reset") {
Expand All @@ -133,26 +133,24 @@ class TimedPullsSuite extends Fs2Suite {
val n = 10L
val s = Stream.constant(1).metered[IO](t).take(n)
val expected = Stream("timeout", "elem").repeat.take(n * 2).compile.toList
val prog =
s.pull
.timed { tp =>
def go(tp: Pull.Timed[IO, Int]): Pull[IO, String, Unit] =
tp.uncons.flatMap {
case None => Pull.done
case Some((Right(_), next)) =>
Pull.output1("elem") >> tp.timeout(timeout) >> go(next)
case Some((Left(_), next)) => Pull.output1("timeout") >> go(next)
}

TestControl
.executeEmbed(
s.pull
.timed { tp =>
def go(tp: Pull.Timed[IO, Int]): Pull[IO, String, Unit] =
tp.uncons.flatMap {
case None => Pull.done
case Some((Right(_), next)) =>
Pull.output1("elem") >> tp.timeout(timeout) >> go(next)
case Some((Left(_), next)) => Pull.output1("timeout") >> go(next)
}
tp.timeout(timeout) >> go(tp)
}
.stream
.compile
.toList

tp.timeout(timeout) >> go(tp)
}
.stream
.compile
.toList
)
.map(it => assertEquals(it, expected))
TestControl.executeEmbed(prog).assertEquals(expected)
}

test("timeout can be reset before triggering") {
Expand All @@ -163,7 +161,7 @@ class TimedPullsSuite extends Fs2Suite {
// use `never` to test logic without worrying about termination
Stream.never[IO]

TestControl.executeEmbed(
val prog =
s.pull
.timed { one =>
one.timeout(900.millis) >> one.uncons.flatMap {
Expand All @@ -183,7 +181,8 @@ class TimedPullsSuite extends Fs2Suite {
.stream
.compile
.drain
)

TestControl.executeEmbed(prog)
}

test("timeout can be reset to a shorter one") {
Expand All @@ -192,7 +191,7 @@ class TimedPullsSuite extends Fs2Suite {
Stream.sleep[IO](1.second) ++
Stream.never[IO]

TestControl.executeEmbed(
val prog =
s.pull
.timed { one =>
one.timeout(2.seconds) >> one.uncons.flatMap {
Expand All @@ -207,14 +206,14 @@ class TimedPullsSuite extends Fs2Suite {
.stream
.compile
.drain
)

TestControl.executeEmbed(prog)
}

test("timeout can be reset without starting a new one") {
val s = Stream.sleep[IO](2.seconds) ++ Stream.sleep[IO](2.seconds)
val t = 3.seconds

TestControl.executeEmbed(
val prog =
s.pull
.timed { one =>
one.timeout(t) >> one.uncons.flatMap {
Expand All @@ -234,7 +233,8 @@ class TimedPullsSuite extends Fs2Suite {
.stream
.compile
.drain
)

TestControl.executeEmbed(prog)
}

test("never emits stale timeouts") {
Expand Down Expand Up @@ -283,4 +283,78 @@ class TimedPullsSuite extends Fs2Suite {
.replicateA(10) // number of iterations to stress the race
)
}

test(
"A timeout called before the very first uncons starts simultaneously with the first uncons"
) {
val emissionTime = 100.millis
val timeout = 200.millis
val timedPullPause = Pull.eval(IO.sleep(150.millis))

val prog =
Stream
.sleep[IO](emissionTime)
.pull
.timed { tp =>
tp.timeout(timeout) >>
// If the first timeout started immediately, this pause
// before uncons would cause a timeout to be emitted
timedPullPause >>
tp.uncons.flatMap {
case Some((Right(_), _)) =>
Pull.done
case Some((Left(_), _)) =>
fail("Unexpected timeout")
case None =>
fail("Unexpected end of stream")
}
}
.stream
.compile
.drain

TestControl.executeEmbed(prog)
}

test("After the first uncons, timeouts start immediately") {
val emissionTime = 100.millis
val timeout = 200.millis
val timedPullPause = Pull.eval(IO.sleep(150.millis))

val prog =
Stream
.sleep[IO](emissionTime)
.repeatN(2)
.pull
.timed { tp =>
tp.timeout(timeout) >>
// If the first timeout started immediately, this pause
// before uncons would cause a timeout to be emitted
timedPullPause >>
tp.uncons.flatMap {
case Some((Right(_), tp)) =>
tp.timeout(timeout) >>
// The timeout starts immediately, so this pause
// before uncons causes a timeout
timedPullPause >>
tp.uncons.flatMap {
case Some((Left(_), _)) =>
Pull.done
case Some((Right(_), _)) =>
fail("Unexpected element, expected timeout")
case None =>
fail("Unexpected end of stream")
}
case Some((Left(_), _)) =>
fail("Unexpected timeout")
case None =>
fail("Unexpected end of stream")
}
}
.stream
.compile
.drain

TestControl.executeEmbed(prog)
}
}