Skip to content

Commit

Permalink
Merge pull request #2333 from SystemFw/timed-streams-precision
Browse files Browse the repository at this point in the history
Don't lose precision in fixedRate
  • Loading branch information
SystemFw authored Mar 20, 2021
2 parents de371a5 + 8e0d02c commit e4872e6
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 21 deletions.
40 changes: 19 additions & 21 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2920,7 +2920,7 @@ object Stream extends StreamLowPriority {
dampen: Boolean
)(implicit t: Temporal[F]): Stream[F, FiniteDuration] =
Stream.eval(t.monotonic).flatMap { start =>
fixedRate_[F](period.toMillis, dampen, start.toMillis) >> Stream.eval(
fixedRate_[F](period, start, dampen) >> Stream.eval(
t.monotonic.map(_ - start)
)
}
Expand Down Expand Up @@ -3145,30 +3145,28 @@ object Stream extends StreamLowPriority {
def fixedRate[F[_]](period: FiniteDuration, dampen: Boolean)(implicit
F: Temporal[F]
): Stream[F, Unit] =
Stream.eval(F.monotonic.map(_.toMillis)).flatMap(t => fixedRate_(period.toMillis, dampen, t))

private def getMonotonicMillis[F[_]](implicit F: Temporal[F]): Stream[F, Long] =
Stream.eval(F.monotonic.map(_.toMillis))
Stream.eval(F.monotonic).flatMap(t => fixedRate_(period, t, dampen))

private def fixedRate_[F[_]: Temporal](
periodMillis: Long,
dampen: Boolean,
t: Long
period: FiniteDuration,
lastAwakeAt: FiniteDuration,
dampen: Boolean
): Stream[F, Unit] =
getMonotonicMillis.flatMap { now =>
val next = t + periodMillis
if (next <= now) {
val cnt = (now - t - 1) / periodMillis
val out =
if (cnt < 0) Stream.empty
else if (cnt == 0 || dampen) Stream.emit(())
else Stream.emit(()).repeatN(cnt)
out ++ fixedRate_(periodMillis, dampen, next)
} else {
val toSleep = next - now
Stream.sleep_(toSleep.millis) ++ Stream.emit(()) ++ fixedRate_(periodMillis, dampen, next)
if (period.toNanos == 0) Stream(()).repeat
else
Stream.eval(Temporal[F].monotonic).flatMap { now =>
val next = lastAwakeAt + period
val step =
if (next > now) Stream.sleep(next - now)
else {
(now.toNanos - lastAwakeAt.toNanos - 1) / period.toNanos match {
case count if count < 0 => Stream.empty
case count if count == 0 || dampen => Stream.emit(())
case count => Stream.emit(()).repeatN(count)
}
}
step ++ fixedRate_(period, next, dampen)
}
}

private[fs2] final class PartiallyAppliedFromOption[F[_]](
private val dummy: Boolean
Expand Down
30 changes: 30 additions & 0 deletions core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,36 @@ class StreamCombinatorsSuite extends Fs2Suite {
.take(200)
Stream(s, s, s, s, s).parJoin(5).compile.drain
}

test("short periods, no underflow") {
val input = Stream.range(0, 10)
input
.covary[IO]
.metered(1.nanos)
.compile
.toVector
.assertEquals(input.compile.toVector)
}

test("very short periods, no underflow") {
val input = Stream.range(0, 10)
input
.covary[IO]
.metered(0.3.nanos)
.compile
.toVector
.assertEquals(input.compile.toVector)
}

test("zero-length periods, no underflow") {
val input = Stream.range(0, 10)
input
.covary[IO]
.metered(0.nanos)
.compile
.toVector
.assertEquals(input.compile.toVector)
}
}

group("buffer") {
Expand Down

0 comments on commit e4872e6

Please sign in to comment.