Skip to content

Commit

Permalink
Merge pull request #1777 from Z1kkurat/add-with-timeout-combinator
Browse files Browse the repository at this point in the history
Add withTimeout combinator
  • Loading branch information
mpilquist authored Feb 5, 2020
2 parents 24ef047 + 3c1a76e commit 61b8258
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 0 deletions.
12 changes: 12 additions & 0 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import fs2.internal.{Resource => _, _}
import java.io.PrintStream

import scala.annotation.tailrec
import scala.concurrent.TimeoutException
import scala.concurrent.duration._

/**
Expand Down Expand Up @@ -2713,6 +2714,17 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
)(f: (Stream[F, O], Stream[F2, O2]) => Stream[F2, O3]): Stream[F2, O3] =
f(this, s2)

/** Fails this stream with a [[TimeoutException]] if it does not complete within given `timeout`. */
def timeout[F2[x] >: F[x]: Concurrent: Timer](
timeout: FiniteDuration
): Stream[F2, O] =
this.interruptWhen(
Timer[F2]
.sleep(timeout)
.as(Left(new TimeoutException(s"Timed out after $timeout")))
.widen[Either[Throwable, Unit]]
)

/**
* Translates effect type from `F` to `G` using the supplied `FunctionK`.
*
Expand Down
31 changes: 31 additions & 0 deletions core/shared/src/test/scala/fs2/StreamSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import cats.effect._
import cats.effect.concurrent.{Deferred, Ref, Semaphore}
import cats.implicits._
import scala.concurrent.duration._
import scala.concurrent.TimeoutException
import org.scalactic.anyvals._
import org.scalatest.{Assertion, Succeeded}
import fs2.concurrent.{Queue, SignallingRef}
Expand Down Expand Up @@ -3786,4 +3787,34 @@ class StreamSpec extends Fs2Spec {
}
}
}

"withTimeout" - {
"timeout never-ending stream" in {
Stream.never[IO].timeout(100.millis).compile.drain.assertThrows[TimeoutException]
}

"not trigger timeout on successfully completed stream" in {
Stream.sleep(10.millis).timeout(1.second).compile.drain.assertNoException
}

"compose timeouts d1 and d2 when d1 < d2" in {
val d1 = 20.millis
val d2 = 30.millis
(Stream.sleep(10.millis).timeout(d1) ++ Stream.sleep(30.millis))
.timeout(d2)
.compile
.drain
.assertThrows[TimeoutException]
}

"compose timeouts d1 and d2 when d1 > d2" in {
val d1 = 40.millis
val d2 = 30.millis
(Stream.sleep(10.millis).timeout(d1) ++ Stream.sleep(25.millis))
.timeout(d2)
.compile
.drain
.assertThrows[TimeoutException]
}
}
}

0 comments on commit 61b8258

Please sign in to comment.