diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index f2d2845752..f3a533cc42 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -554,9 +554,24 @@ object Pull extends PullLowPriority { * previous timeout, but a duration of 0 is treated specially, in * that it will cancel a pending timeout but not start a new one. * - * Note: the very first execution of `timeout` does not start - * running until the first call to `uncons`, but subsequent calls - * proceed independently after that. + * Note: + * If for some reason you insert a pause in between `uncons` and + * `timeout`, such as: + * {{{ + * timedPull.timeout(n.millis) >> + * Pull.eval(IO.sleep(m.millis)) >> + * timedPull.uncons.flatMap { ... + * }}} + * + * you should be aware that an invocation of `timeout` that + * happens before the very first `uncons` will start the timeout + * simultaneously with the very first `uncons`. Subsequent + * invocations of `timeout` start the timeout immediately + * instead. + * + * This is an implementation detail which should not affect most + * cases, given that usually there is no need to sleep in between + * `timeout` and the very first call to `uncons`. */ def timeout(t: FiniteDuration): Pull[F, INothing, Unit] } diff --git a/core/shared/src/test/scala/fs2/TimedPullsSuite.scala b/core/shared/src/test/scala/fs2/TimedPullsSuite.scala index 915237204d..e483eead3c 100644 --- a/core/shared/src/test/scala/fs2/TimedPullsSuite.scala +++ b/core/shared/src/test/scala/fs2/TimedPullsSuite.scala @@ -34,35 +34,36 @@ class TimedPullsSuite extends Fs2Suite { test("behaves as a normal Pull when no timeouts are used") { forAllF { (s: Stream[Pure, Int]) => - s.covary[IO] - .pull - .timed { tp => - def loop(tp: Pull.Timed[IO, Int]): Pull[IO, Int, Unit] = - tp.uncons.flatMap { - case None => Pull.done - case Some((Right(c), next)) => Pull.output(c) >> loop(next) - case Some((Left(_), _)) => fail("unexpected timeout") - } + val expected = s.compile.toList + val prog = + s.covary[IO] + .pull + .timed { tp => + def loop(tp: Pull.Timed[IO, Int]): Pull[IO, Int, Unit] = + tp.uncons.flatMap { + case None => Pull.done + case Some((Right(c), next)) => Pull.output(c) >> loop(next) + case Some((Left(_), _)) => fail("unexpected timeout") + } - loop(tp) - } - .stream - .compile - .toList - .map(it => assertEquals(it, s.compile.toList)) + loop(tp) + } + .stream + .compile + .toList + + prog.assertEquals(expected) } } test("pulls elements with timeouts, no timeouts trigger") { - // TODO cannot use PropF with `TestControl.executeEmbed` at the moment - val l = List.range(1, 100) - val s = Stream.emits(l).covary[IO].rechunkRandomly() - val period = 500.millis - val timeout = 600.millis - - TestControl - .executeEmbed( - s.metered(period) + forAllF { (s: Stream[Pure, Int]) => + val period = 500.millis + val timeout = 600.millis + val expected = s.compile.toList + val prog = + s.covary[IO] + .metered(period) .pull .timed { tp => def loop(tp: Pull.Timed[IO, Int]): Pull[IO, Int, Unit] = @@ -77,54 +78,53 @@ class TimedPullsSuite extends Fs2Suite { .stream .compile .toList - ) - .map(it => assertEquals(it, l)) + + TestControl.executeEmbed(prog).assertEquals(expected) + } } test("times out whilst pulling a single element") { - TestControl.executeEmbed( - Stream - .sleep[IO](300.millis) - .pull - .timed { tp => - tp.timeout(100.millis) >> - tp.uncons.flatMap { - case Some((Left(_), _)) => Pull.done - case _ => fail("timeout expected") - } - } - .stream - .compile - .drain - ) + val prog = Stream + .sleep[IO](300.millis) + .pull + .timed { tp => + tp.timeout(100.millis) >> + tp.uncons.flatMap { + case Some((Left(_), _)) => Pull.done + case _ => fail("timeout expected") + } + } + .stream + .compile + .drain + + TestControl.executeEmbed(prog) } test("times out after pulling multiple elements") { - val l = List(1, 2, 3) - val s = Stream.emits(l) ++ Stream.never[IO] + val expected = List(1, 2, 3) + val s = Stream.emits(expected) ++ Stream.never[IO] val t = 100.millis val timeout = 350.millis - TestControl - .executeEmbed( - s - .metered(t) - .pull - .timed { tp => - def go(tp: Pull.Timed[IO, Int]): Pull[IO, Int, Unit] = - tp.uncons.flatMap { - case Some((Right(c), n)) => Pull.output(c) >> go(n) - case Some((Left(_), _)) => Pull.done - case None => fail("Unexpected end of input") - } - - tp.timeout(timeout) >> go(tp) + val prog = s + .metered(t) + .pull + .timed { tp => + def go(tp: Pull.Timed[IO, Int]): Pull[IO, Int, Unit] = + tp.uncons.flatMap { + case Some((Right(c), n)) => Pull.output(c) >> go(n) + case Some((Left(_), _)) => Pull.done + case None => fail("Unexpected end of input") } - .stream - .compile - .toList - ) - .map(it => assertEquals(it, l)) + + tp.timeout(timeout) >> go(tp) + } + .stream + .compile + .toList + + TestControl.executeEmbed(prog).assertEquals(expected) } test("pulls elements with timeouts, timeouts trigger after reset") { @@ -133,26 +133,24 @@ class TimedPullsSuite extends Fs2Suite { val n = 10L val s = Stream.constant(1).metered[IO](t).take(n) val expected = Stream("timeout", "elem").repeat.take(n * 2).compile.toList + val prog = + s.pull + .timed { tp => + def go(tp: Pull.Timed[IO, Int]): Pull[IO, String, Unit] = + tp.uncons.flatMap { + case None => Pull.done + case Some((Right(_), next)) => + Pull.output1("elem") >> tp.timeout(timeout) >> go(next) + case Some((Left(_), next)) => Pull.output1("timeout") >> go(next) + } - TestControl - .executeEmbed( - s.pull - .timed { tp => - def go(tp: Pull.Timed[IO, Int]): Pull[IO, String, Unit] = - tp.uncons.flatMap { - case None => Pull.done - case Some((Right(_), next)) => - Pull.output1("elem") >> tp.timeout(timeout) >> go(next) - case Some((Left(_), next)) => Pull.output1("timeout") >> go(next) - } + tp.timeout(timeout) >> go(tp) + } + .stream + .compile + .toList - tp.timeout(timeout) >> go(tp) - } - .stream - .compile - .toList - ) - .map(it => assertEquals(it, expected)) + TestControl.executeEmbed(prog).assertEquals(expected) } test("timeout can be reset before triggering") { @@ -163,7 +161,7 @@ class TimedPullsSuite extends Fs2Suite { // use `never` to test logic without worrying about termination Stream.never[IO] - TestControl.executeEmbed( + val prog = s.pull .timed { one => one.timeout(900.millis) >> one.uncons.flatMap { @@ -183,7 +181,8 @@ class TimedPullsSuite extends Fs2Suite { .stream .compile .drain - ) + + TestControl.executeEmbed(prog) } test("timeout can be reset to a shorter one") { @@ -192,7 +191,7 @@ class TimedPullsSuite extends Fs2Suite { Stream.sleep[IO](1.second) ++ Stream.never[IO] - TestControl.executeEmbed( + val prog = s.pull .timed { one => one.timeout(2.seconds) >> one.uncons.flatMap { @@ -207,14 +206,14 @@ class TimedPullsSuite extends Fs2Suite { .stream .compile .drain - ) + + TestControl.executeEmbed(prog) } test("timeout can be reset without starting a new one") { val s = Stream.sleep[IO](2.seconds) ++ Stream.sleep[IO](2.seconds) val t = 3.seconds - - TestControl.executeEmbed( + val prog = s.pull .timed { one => one.timeout(t) >> one.uncons.flatMap { @@ -234,7 +233,8 @@ class TimedPullsSuite extends Fs2Suite { .stream .compile .drain - ) + + TestControl.executeEmbed(prog) } test("never emits stale timeouts") { @@ -283,4 +283,78 @@ class TimedPullsSuite extends Fs2Suite { .replicateA(10) // number of iterations to stress the race ) } + + test( + "A timeout called before the very first uncons starts simultaneously with the first uncons" + ) { + val emissionTime = 100.millis + val timeout = 200.millis + val timedPullPause = Pull.eval(IO.sleep(150.millis)) + + val prog = + Stream + .sleep[IO](emissionTime) + .pull + .timed { tp => + tp.timeout(timeout) >> + // If the first timeout started immediately, this pause + // before uncons would cause a timeout to be emitted + timedPullPause >> + tp.uncons.flatMap { + case Some((Right(_), _)) => + Pull.done + case Some((Left(_), _)) => + fail("Unexpected timeout") + case None => + fail("Unexpected end of stream") + } + } + .stream + .compile + .drain + + TestControl.executeEmbed(prog) + } + + test("After the first uncons, timeouts start immediately") { + val emissionTime = 100.millis + val timeout = 200.millis + val timedPullPause = Pull.eval(IO.sleep(150.millis)) + + val prog = + Stream + .sleep[IO](emissionTime) + .repeatN(2) + .pull + .timed { tp => + tp.timeout(timeout) >> + // If the first timeout started immediately, this pause + // before uncons would cause a timeout to be emitted + timedPullPause >> + tp.uncons.flatMap { + case Some((Right(_), tp)) => + tp.timeout(timeout) >> + // The timeout starts immediately, so this pause + // before uncons causes a timeout + timedPullPause >> + tp.uncons.flatMap { + case Some((Left(_), _)) => + Pull.done + case Some((Right(_), _)) => + fail("Unexpected element, expected timeout") + case None => + fail("Unexpected end of stream") + } + case Some((Left(_), _)) => + fail("Unexpected timeout") + case None => + fail("Unexpected end of stream") + } + } + .stream + .compile + .drain + + TestControl.executeEmbed(prog) + } }