diff --git a/build.sbt b/build.sbt index 40aba39440..529a7346cd 100644 --- a/build.sbt +++ b/build.sbt @@ -206,6 +206,8 @@ val ScalaCheckVersion = "1.15.4" val DisciplineVersion = "1.2.2" val CoopVersion = "1.1.1" +val MacrotaskExecutorVersion = "0.2.0" + replaceCommandAlias("ci", CI.AllCIs.map(_.toString).mkString) addCommandAlias(CI.JVM.command, CI.JVM.toString) @@ -264,9 +266,23 @@ lazy val kernel = crossProject(JSPlatform, JVMPlatform) .in(file("kernel")) .settings( name := "cats-effect-kernel", - libraryDependencies ++= Seq( - ("org.specs2" %%% "specs2-core" % Specs2Version % Test).cross(CrossVersion.for3Use2_13), - "org.typelevel" %%% "cats-core" % CatsVersion) + libraryDependencies += "org.typelevel" %%% "cats-core" % CatsVersion) + .jvmSettings(libraryDependencies += { + if (isDotty.value) + ("org.specs2" %%% "specs2-core" % Specs2Version % Test).cross(CrossVersion.for3Use2_13) + else + "org.specs2" %%% "specs2-core" % Specs2Version % Test + }) + .jsSettings( + libraryDependencies += { + if (isDotty.value) + ("org.specs2" %%% "specs2-core" % Specs2Version % Test) + .cross(CrossVersion.for3Use2_13) + .exclude("org.scala-js", "scala-js-macrotask-executor_sjs1_2.13") + else + "org.specs2" %%% "specs2-core" % Specs2Version % Test + }, + libraryDependencies += "org.scala-js" %%% "scala-js-macrotask-executor" % MacrotaskExecutorVersion % Test ) /** @@ -281,7 +297,10 @@ lazy val kernelTestkit = crossProject(JSPlatform, JVMPlatform) libraryDependencies ++= Seq( "org.typelevel" %%% "cats-free" % CatsVersion, "org.scalacheck" %%% "scalacheck" % ScalaCheckVersion, - "org.typelevel" %%% "coop" % CoopVersion) + "org.typelevel" %%% "coop" % CoopVersion), + mimaBinaryIssueFilters ++= Seq( + ProblemFilters.exclude[DirectMissingMethodProblem]( + "cats.effect.kernel.testkit.TestContext.this")) ) /** @@ -380,8 +399,7 @@ lazy val core = crossProject(JSPlatform, JVMPlatform) javacOptions ++= Seq("-source", "1.8", "-target", "1.8") ) .jsSettings( - libraryDependencies += "org.scala-js" %%% "scala-js-macrotask-executor" % "0.2.0" - ) + libraryDependencies += "org.scala-js" %%% "scala-js-macrotask-executor" % MacrotaskExecutorVersion) /** * Test support for the core project, providing various helpful instances like ScalaCheck @@ -392,7 +410,21 @@ lazy val testkit = crossProject(JSPlatform, JVMPlatform) .dependsOn(core, kernelTestkit) .settings( name := "cats-effect-testkit", - libraryDependencies ++= Seq("org.scalacheck" %%% "scalacheck" % ScalaCheckVersion)) + libraryDependencies += "org.scalacheck" %%% "scalacheck" % ScalaCheckVersion) + .jvmSettings(libraryDependencies += { + if (isDotty.value) + ("org.specs2" %%% "specs2-core" % Specs2Version % Test).cross(CrossVersion.for3Use2_13) + else + "org.specs2" %%% "specs2-core" % Specs2Version % Test + }) + .jsSettings(libraryDependencies += { + if (isDotty.value) + ("org.specs2" %%% "specs2-core" % Specs2Version % Test) + .cross(CrossVersion.for3Use2_13) + .exclude("org.scala-js", "scala-js-macrotask-executor_sjs1_2.13") + else + "org.specs2" %%% "specs2-core" % Specs2Version % Test + }) /** * Unit tests for the core project, utilizing the support provided by testkit. 
@@ -422,16 +454,28 @@ lazy val std = crossProject(JSPlatform, JVMPlatform) .dependsOn(kernel) .settings( name := "cats-effect-std", + libraryDependencies += "org.scalacheck" %%% "scalacheck" % ScalaCheckVersion % Test) + .jvmSettings(libraryDependencies += { + if (isDotty.value) + ("org.specs2" %%% "specs2-scalacheck" % Specs2Version % Test) + .cross(CrossVersion.for3Use2_13) + .exclude("org.scalacheck", "scalacheck_2.13") + .exclude("org.scalacheck", "scalacheck_sjs1_2.13") + else + "org.specs2" %%% "specs2-scalacheck" % Specs2Version % Test + }) + .jsSettings( libraryDependencies += { if (isDotty.value) ("org.specs2" %%% "specs2-scalacheck" % Specs2Version % Test) .cross(CrossVersion.for3Use2_13) + .exclude("org.scala-js", "scala-js-macrotask-executor_sjs1_2.13") .exclude("org.scalacheck", "scalacheck_2.13") .exclude("org.scalacheck", "scalacheck_sjs1_2.13") else "org.specs2" %%% "specs2-scalacheck" % Specs2Version % Test }, - libraryDependencies += "org.scalacheck" %%% "scalacheck" % ScalaCheckVersion % Test + libraryDependencies += "org.scala-js" %%% "scala-js-macrotask-executor" % MacrotaskExecutorVersion % Test ) /** diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index e2eece2d23..34160f631d 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -545,6 +545,19 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] { def redeemWith[B](recover: Throwable => IO[B], bind: A => IO[B]): IO[B] = attempt.flatMap(_.fold(recover, bind)) + def replicateA(n: Int): IO[List[A]] = + if (n <= 0) + IO.pure(Nil) + else + flatMap(a => replicateA(n - 1).map(a :: _)) + + // TODO PR to cats + def replicateA_(n: Int): IO[Unit] = + if (n <= 0) + IO.unit + else + flatMap(_ => replicateA_(n - 1)) + /** * Returns an IO that will delay the execution of the source by the given duration. 
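+ * + * For example, a minimal sketch (assuming `scala.concurrent.duration._` is in scope): + * + * {{{ + * IO.println("delayed hello").delayBy(1.second) // runs roughly one second after being started + * }}}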
*/ @@ -1450,6 +1463,9 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { override def productR[A, B](left: IO[A])(right: IO[B]): IO[B] = left.productR(right) + override def replicateA[A](n: Int, fa: IO[A]): IO[List[A]] = + fa.replicateA(n) + def start[A](fa: IO[A]): IO[FiberIO[A]] = fa.start diff --git a/example/js/src/main/scala/cats/effect/example/Example.scala b/example/js/src/main/scala/cats/effect/example/Example.scala index ecb408b096..55e84637e4 100644 --- a/example/js/src/main/scala/cats/effect/example/Example.scala +++ b/example/js/src/main/scala/cats/effect/example/Example.scala @@ -17,8 +17,6 @@ package cats.effect package example -import cats.syntax.all._ - object Example extends IOApp { def run(args: List[String]): IO[ExitCode] = diff --git a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/TestContext.scala b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/TestContext.scala index db318d7fd9..81d9edaf99 100644 --- a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/TestContext.scala +++ b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/TestContext.scala @@ -17,17 +17,25 @@ package cats.effect.kernel package testkit +import scala.annotation.tailrec + import scala.collection.immutable.SortedSet import scala.concurrent.ExecutionContext import scala.concurrent.duration._ + import scala.util.Random import scala.util.control.NonFatal +import java.util.Base64 + /** - * A `scala.concurrent.ExecutionContext` implementation and a provider of `cats.effect.Timer` - * instances, that can simulate async boundaries and time passage, useful for testing purposes. + * A [[scala.concurrent.ExecutionContext]] implementation that can simulate async boundaries and + * time passage, useful for law testing purposes. This is intended primarily for datatype + * implementors. Most end-users will be better served by the `cats.effect.testkit.TestControl` + * utility, rather than using `TestContext` directly. 
* * Usage for simulating an `ExecutionContext`): + * * {{{ * implicit val ec = TestContext() * * @@ -50,91 +58,24 @@ import scala.util.control.NonFatal * assert(ec.state.tasks.isEmpty) * assert(ec.state.lastReportedFailure == None) * }}} - * - * Our `TestContext` can also simulate time passage, as we are able to builds a - * `cats.effect.Timer` instance for any data type that has a `LiftIO` instance: - * - * {{{ - * val ctx = TestContext() - * - * val timer: Timer[IO] = ctx.timer[IO] - * }}} - * - * We can now simulate actual time: - * - * {{{ - * val io = timer.sleep(10.seconds) *> IO(1 + 1) - * val f = io.unsafeToFuture() - * - * // This invariant holds true, because our IO is async - * assert(f.value == None) - * - * // Not yet completed, because this does not simulate time passing: - * ctx.tick() - * assert(f.value == None) - * - * // Simulating time passing: - * ctx.tick(10.seconds) - * assert(f.value == Some(Success(2)) - * }}} - * - * Simulating time makes this pretty useful for testing race conditions: - * - * {{{ - * val never = IO.async[Int](_ => {}) - * val timeoutError = new TimeoutException - * val timeout = timer.sleep(10.seconds) *> IO.raiseError[Int](timeoutError) - * - * val pair = (never, timeout).parMapN(_ + _) - * - * // Not yet - * ctx.tick() - * assert(f.value == None) - * // Not yet - * ctx.tick(5.seconds) - * assert(f.value == None) - * - * // Good to go: - * ctx.tick(5.seconds) - * assert(f.value, Some(Failure(timeoutError))) - * }}} - * - * @define timerExample - * {{{ val ctx = TestContext() // Building a Timer[IO] from this: implicit val timer: - * Timer[IO] = ctx.timer[IO] - * - * // Can now simulate time val io = timer.sleep(10.seconds) *> IO(1 + 1) val f = - * io.unsafeToFuture() - * - * // This invariant holds true, because our IO is async assert(f.value == None) - * - * // Not yet completed, because this does not simulate time passing: ctx.tick() assert(f.value - * == None) - * - * // Simulating time passing: ctx.tick(10.seconds) assert(f.value == Some(Success(2)) }}} */ -final class TestContext private () extends ExecutionContext { self => - import TestContext.{State, Task} +final class TestContext private (_seed: Long) extends ExecutionContext { self => + import TestContext.{Encoder, State, Task} + + private[this] val random = new Random(_seed) private[this] var stateRef = State( lastID = 0, - // our epoch is negative! this is just to give us an extra 263 years of space for Prop shrinking to play - clock = (Long.MinValue + 1).nanos, + clock = Duration.Zero, tasks = SortedSet.empty[Task], lastReportedFailure = None ) - /** - * Inherited from `ExecutionContext`, schedules a runnable for execution. - */ def execute(r: Runnable): Unit = synchronized { stateRef = stateRef.execute(r) } - /** - * Inherited from `ExecutionContext`, reports uncaught errors. - */ def reportFailure(cause: Throwable): Unit = synchronized { stateRef = stateRef.copy(lastReportedFailure = Some(cause)) @@ -144,8 +85,31 @@ final class TestContext private () extends ExecutionContext { self => * Returns the internal state of the `TestContext`, useful for testing that certain execution * conditions have been met. */ - def state: State = - synchronized(stateRef) + def state: State = synchronized(stateRef) + + /** + * Returns the current interval between "now" and the earliest scheduled task. If there are + * tasks which will run immediately, this will return `Duration.Zero`. Passing this value to + * [[advance]] will guarantee minimum time-oriented progress on the task queue (e.g.
+ * `advance(nextInterval())`). + */ + def nextInterval(): FiniteDuration = { + val s = state + (s.tasks.min.runsAt - s.clock).max(Duration.Zero) + } + + def advance(time: FiniteDuration): Unit = { + require(time > Duration.Zero) + + synchronized { + stateRef = stateRef.copy(clock = stateRef.clock + time) + } + } + + def advanceAndTick(time: FiniteDuration): Unit = { + advance(time) + tick() + } /** * Executes just one tick, one task, from the internal queue, useful for testing that a some @@ -173,73 +137,57 @@ final class TestContext private () extends ExecutionContext { self => val current = stateRef // extracting one task by taking the immediate tasks - extractOneTask(current, current.clock) match { + extractOneTask(current, current.clock, random) match { case Some((head, rest)) => stateRef = current.copy(tasks = rest) // execute task try head.task.run() catch { case NonFatal(ex) => reportFailure(ex) } true + case None => false } } - /** - * Triggers execution by going through the queue of scheduled tasks and executing them all, - * until no tasks remain in the queue to execute. - * - * Order of execution isn't guaranteed, the queued `Runnable`s are being shuffled in order to - * simulate the needed nondeterminism that happens with multi-threading. - * - * {{{ - * implicit val ec = TestContext() - * - * val f = Future(1 + 1).flatMap(_ + 1) - * // Execution is momentarily suspended in TestContext - * assert(f.value == None) - * - * // Simulating async execution: - * ec.tick() - * assert(f.value, Some(Success(2))) - * }}} - * - * @param time - * is an optional parameter for simulating time passing; - */ - def tick(time: FiniteDuration = Duration.Zero): Unit = { - val targetTime = this.stateRef.clock + time - var hasTasks = true + @tailrec + def tick(): Unit = + if (tickOne()) { + tick() + } - while (hasTasks) synchronized { - val current = this.stateRef + private[testkit] def tick(time: FiniteDuration): Unit = { + tick() + advanceAndTick(time) + } - extractOneTask(current, targetTime) match { - case Some((head, rest)) => - stateRef = current.copy(clock = head.runsAt, tasks = rest) - // execute task - try head.task.run() - catch { - case ex if NonFatal(ex) => - reportFailure(ex) - } + private[testkit] def tick$default$1(): FiniteDuration = Duration.Zero - case None => - stateRef = current.copy(clock = targetTime) - hasTasks = false - } + /** + * Repeatedly runs `tick()` followed by `advance(nextInterval())` until all work has + * completed. This is useful for + * emulating the quantized passage of time. For any discrete tick, the scheduler will randomly + * pick from all eligible tasks until the only remaining work is delayed. At that point, the + * scheduler will then advance the minimum delay (to the next time interval) and the process + * repeats. + * + * This is intuitively equivalent to "running to completion".
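+ * + * A minimal sketch of the intended usage (a hypothetical task scheduled via [[schedule]]): + * + * {{{ + * val ctx = TestContext() + * + * var done = false + * ctx.schedule(10.seconds, () => done = true) + * + * ctx.tickAll() // advances the clock by 10 seconds and runs the scheduled task + * assert(done) + * }}}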
+ */ + @tailrec + def tickAll(): Unit = { + tick() + if (!stateRef.tasks.isEmpty) { + advance(nextInterval()) + tickAll() } } - def tickAll(time: FiniteDuration = Duration.Zero): Unit = { - tick(time) - - // some of our tasks may have enqueued more tasks - if (!this.stateRef.tasks.isEmpty) { - tickAll(time) - } + private[testkit] def tickAll(time: FiniteDuration): Unit = { + val _ = time + tickAll() } + private[testkit] def tickAll$default$1(): FiniteDuration = Duration.Zero + def schedule(delay: FiniteDuration, r: Runnable): () => Unit = synchronized { val current: State = stateRef @@ -254,17 +202,33 @@ final class TestContext private () extends ExecutionContext { self => def reportFailure(cause: Throwable): Unit = self.reportFailure(cause) } + /** + * Derives a new `ExecutionContext` which delegates to `this`, but wrapping all tasks in + * [[scala.concurrent.blocking]]. + */ + def deriveBlocking(): ExecutionContext = + new ExecutionContext { + import scala.concurrent.blocking + + def execute(runnable: Runnable): Unit = blocking(self.execute(runnable)) + def reportFailure(cause: Throwable): Unit = self.reportFailure(cause) + } + def now(): FiniteDuration = stateRef.clock + def seed: String = + new String(Encoder.encode(_seed.toString.getBytes)) + private def extractOneTask( current: State, - clock: FiniteDuration): Option[(Task, SortedSet[Task])] = + clock: FiniteDuration, + random: Random): Option[(Task, SortedSet[Task])] = current.tasks.headOption.filter(_.runsAt <= clock) match { case Some(value) => val firstTick = value.runsAt val forExecution = { val arr = current.tasks.iterator.takeWhile(_.runsAt == firstTick).take(10).toArray - arr(Random.nextInt(arr.length)) + arr(random.nextInt(arr.length)) } val remaining = current.tasks - forExecution @@ -282,11 +246,29 @@ final class TestContext private () extends ExecutionContext { self => object TestContext { + private val Decoder = Base64.getDecoder() + private val Encoder = Base64.getEncoder() + /** - * Builder for [[TestContext]] instances. + * Builder for [[TestContext]] instances. Utilizes a random seed, which may be obtained from + * the [[TestContext#seed]] method. */ def apply(): TestContext = - new TestContext + new TestContext(Random.nextLong()) + + /** + * Constructs a new [[TestContext]] using the given seed, which must be encoded as base64. + * Assuming this seed was produced by another `TestContext`, running the same program against + * the new context will result in the exact same task interleaving as happened in the previous + * context, provided that the same tasks are interleaved. Note that subtle differences between + * different runs of identical programs are possible, particularly if one program auto-`cede`s + * in a different place than the other one. This is an excellent and reliable mechanism for + * small, tightly-controlled programs with entirely deterministic side-effects, and a + * completely useless mechanism for anything where the scheduler ticks see different task + * lists despite identical configuration. + */ + def apply(seed: String): TestContext = + new TestContext(new String(Decoder.decode(seed)).toLong) /** * Used internally by [[TestContext]], represents the internal state used for task scheduling @@ -297,9 +279,6 @@ object TestContext { clock: FiniteDuration, tasks: SortedSet[Task], lastReportedFailure: Option[Throwable]) { - assert( - !tasks.headOption.exists(_.runsAt < clock), - "The runsAt for any task must never be in the past") /** * Returns a new state with the runnable scheduled for execution. 
diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/Outcome.scala b/kernel/shared/src/main/scala/cats/effect/kernel/Outcome.scala index d4c497b864..e335f1b6da 100644 --- a/kernel/shared/src/main/scala/cats/effect/kernel/Outcome.scala +++ b/kernel/shared/src/main/scala/cats/effect/kernel/Outcome.scala @@ -95,7 +95,7 @@ private[kernel] trait LowPriorityImplicits { Show show { case Canceled() => "Canceled" case Errored(left) => s"Errored(${left.show})" - case Succeeded(_) => s"Succeeded()" + case Succeeded(_) => s"Succeeded(...)" } implicit def eq[F[_], E: Eq, A](implicit FA: Eq[F[A]]): Eq[Outcome[F, E, A]] = diff --git a/testkit/shared/src/main/scala/cats/effect/testkit/TestControl.scala b/testkit/shared/src/main/scala/cats/effect/testkit/TestControl.scala new file mode 100644 index 0000000000..483c68ebee --- /dev/null +++ b/testkit/shared/src/main/scala/cats/effect/testkit/TestControl.scala @@ -0,0 +1,377 @@ +/* + * Copyright 2020-2021 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package testkit + +import cats.{~>, Id} +import cats.effect.unsafe.{IORuntime, IORuntimeConfig, Scheduler} + +import scala.concurrent.CancellationException +import scala.concurrent.duration.FiniteDuration + +import java.util.concurrent.atomic.AtomicReference + +/** + * Implements a fully functional single-threaded runtime for a [[cats.effect.IO]] program. When + * using this control system, `IO` programs will be executed on a single JVM thread, ''similar'' + * to how they would behave if the production runtime were configured to use a single worker + * thread regardless of underlying physical thread count. The results of the underlying `IO` + * will be produced by the [[results]] effect when ready, but nothing will actually evaluate + * until one of the ''tick'' effects on this class is sequenced. If the desired behavior is to + * simply run the `IO` fully to completion within the mock environment, respecting monotonic + * time, then [[tickAll]] is likely the desired effect (or, alternatively, + * [[TestControl.executeEmbed]]). + * + * In other words, `TestControl` is sort of like a "handle" to the runtime internals within the + * context of a specific `IO`'s execution. It makes it possible for users to manipulate and + * observe the execution of the `IO` under test from an external vantage point. It is important + * to understand that the ''outer'' `IO`s (e.g. those returned by the [[tick]] or [[results]] + * methods) are ''not'' running under the test control environment, and instead they are meant + * to be run by some outer runtime. Interactions between the outer runtime and the inner runtime + * (potentially via mechanisms like [[cats.effect.std.Queue]] or + * [[cats.effect.kernel.Deferred]]) are quite tricky and should only be done with extreme care. + * The likely outcome in such scenarios is that the `TestControl` runtime will detect the inner + * `IO` as being deadlocked whenever it is actually waiting on the external runtime.
This could + * result in strange effects such as [[tickAll]] or `executeEmbed` terminating early. Do not + * construct such scenarios unless you're very confident you understand the implications of what + * you're doing. + * + * Where things ''differ'' from a single-threaded production runtime is in two critical areas. + * + * First, whenever multiple fibers are outstanding and ready to be resumed, the `TestControl` + * runtime will ''randomly'' choose between them, rather than taking them in a first-in, + * first-out fashion as the default production runtime will. This semantic is intended to + * simulate different scheduling interleavings, ensuring that race conditions are not + * accidentally masked by deterministic execution order. + * + * Second, within the context of the `TestControl`, ''time'' is very carefully and artificially + * controlled. In a sense, this runtime behaves as if it is executing on a single CPU which + * performs all operations infinitely fast. Any fibers which are immediately available for + * execution will be executed until no further fibers are available to run (assuming the use of + * `tickAll`). Through this entire process, the current clock (which is exposed to the program + * via [[IO.realTime]] and [[IO.monotonic]]) will remain fixed at the very beginning, meaning + * that no time is considered to have elapsed as a consequence of ''compute''. + * + * Note that the above means that it is relatively easy to create a deadlock on this runtime + * with a program which would not deadlock on either the JVM or JavaScript: + * + * {{{ + * // do not do this! + * IO.cede.foreverM.timeout(10.millis) + * }}} + * + * The above program spawns a fiber which yields forever, setting a timeout for 10 milliseconds + * which is ''intended'' to bring the loop to a halt. However, because the immediate task queue + * will never be empty, the test runtime will never advance time, meaning that the 10 + * milliseconds will never elapse and the timeout will not be hit. This will manifest as the + * [[tick]] and [[tickAll]] effects simply running forever and not returning if called. + * [[tickOne]] is safe to call on the above program, but it will always produce `true`. + * + * In order to advance time, you must use the [[advance]] effect to move the clock forward by a + * specified offset (which must be greater than 0). If you use the `tickAll` effect, the clock + * will be automatically advanced by the minimum amount necessary to reach the next pending + * task. For example, if the program contains an [[IO.sleep]] for `500.millis`, and there are no + * shorter sleeps, then time will need to be advanced by 500 milliseconds in order to make that + * fiber eligible for execution. + * + * At this point, the process repeats until all tasks are exhausted. If the program has reached + * a concluding value or exception, then it will be produced from the `unsafeRun` method which + * scheduled the `IO` on the runtime (pro tip: do ''not'' use `unsafeRunSync` with this runtime, + * since it will always result in immediate deadlock). If the program does ''not'' produce a + * result but also has no further work to perform (such as a program like [[IO.never]]), then + * `tickAll` will return but no result will have been produced by the `unsafeRun`. If this + * happens, [[isDeadlocked]] will return `true` and the program is in a "hung" state. This same + * situation on the production runtime would have manifested as an asynchronous deadlock. 
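+ * + * As a brief sketch of that overall flow (the program and values here are purely illustrative): + * + * {{{ + * val program = IO.sleep(10.days) *> IO.pure(42) + * + * TestControl.execute(program) flatMap { control => + * // runs all fibers, advancing the mock clock past each sleep as needed + * control.tickAll *> control.results // Some(Succeeded(42)), with no real waiting + * } + * }}}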
+ * + * You should ''never'' use this runtime in a production code path. It is strictly meant for + * testing purposes, particularly testing programs that involve time functions and [[IO.sleep]]. + * + * Due to the semantics of this runtime, time will behave entirely consistently with a plausible + * production execution environment provided that you ''never'' observe time via side-effects, + * and exclusively through the [[IO.realTime]], [[IO.monotonic]], and [[IO.sleep]] functions + * (and other functions built on top of these). From the perspective of these functions, all + * computation is infinitely fast, and the only effect which advances time is [[IO.sleep]] (or + * if something external, such as the test harness, sequences the [[advance]] effect). However, + * an effect such as `IO(System.currentTimeMillis())` will "see through" the illusion, since the + * system clock is unaffected by this runtime. This is one reason why it is important to always + * and exclusively rely on `realTime` and `monotonic`, either directly on `IO` or via the + * typeclass abstractions. + * + * WARNING: ''Never'' use this runtime on programs which use the [[IO#evalOn]] method! The test + * runtime will detect this situation as an asynchronous deadlock. + * + * @see + * [[cats.effect.kernel.Clock]] + * @see + * [[tickAll]] + */ +final class TestControl[A] private ( + ctx: TestContext, + _results: AtomicReference[Option[Outcome[Id, Throwable, A]]]) { + + val results: IO[Option[Outcome[Id, Throwable, A]]] = IO(_results.get) + + /** + * Produces the minimum time which must elapse for a fiber to become eligible for execution. + * If fibers are currently eligible for execution, the result will be `Duration.Zero`. + */ + val nextInterval: IO[FiniteDuration] = + IO(ctx.nextInterval()) + + /** + * Advances the runtime clock by the specified amount (which must be positive). Does not + * execute any fibers, though may result in some previously-sleeping fibers to become pending + * and eligible for execution in the next [[tick]]. + */ + def advance(time: FiniteDuration): IO[Unit] = + IO(ctx.advance(time)) + + /** + * A convenience effect which advances time by the specified amount and then ticks once. Note + * that this method is very subtle and will often ''not'' do what you think it should. For + * example: + * + * {{{ + * // will never print! + * val program = IO.sleep(100.millis) *> IO.println("Hello, World!") + * + * TestControl.execute(program).flatMap(_.advanceAndTick(1.second)) + * }}} + * + * This is very subtle, but the problem is that time is advanced ''before'' the [[IO.sleep]] + * even has a chance to get scheduled! This means that when `sleep` is finally submitted to + * the runtime, it is scheduled for the time offset equal to `1.second + 100.millis`, since + * time was already advanced `1.second` before it had a chance to submit. Of course, time has + * only been advanced by `1.second`, thus the `sleep` never completes and the `println` cannot + * ever run. + * + * There are two possible solutions to this problem: either sequence [[tick]] ''first'' + * (before sequencing `advanceAndTick`) to ensure that the `sleep` has a chance to schedule + * itself, or simply use [[tickAll]] if you do not need to run assertions between time + * windows. + * + * @see + * [[advance]] + * @see + * [[tick]] + */ + def advanceAndTick(time: FiniteDuration): IO[Unit] = + IO(ctx.advanceAndTick(time)) + + /** + * Executes a single pending fiber and returns immediately. Does not advance time. 
Produces + `false` if no fibers are pending. + */ + val tickOne: IO[Boolean] = + IO(ctx.tickOne()) + + /** + * Executes all pending fibers in a random order, repeating on new tasks enqueued by those + * fibers until all pending fibers have been exhausted. Does not result in the advancement of + * time. + * + * @see + * [[advance]] + * @see + * [[tickAll]] + */ + val tick: IO[Unit] = + IO(ctx.tick()) + + /** + * Drives the runtime until all fibers have been executed, then advances time until the next + * fiber becomes available (if relevant), and repeats until no further fibers are scheduled. + * Analogous to, though critically not the same as, running an [[IO]] on a single-threaded + * production runtime. + * + * This function will terminate for `IO`s which deadlock ''asynchronously'', but any program + * which runs in a loop without fully suspending will cause this function to run indefinitely. + * Also note that any `IO` which interacts with some external asynchronous scheduler (such as + * NIO) will be considered deadlocked for the purposes of this runtime. + * + * @see + * [[tick]] + */ + val tickAll: IO[Unit] = + IO(ctx.tickAll()) + + /** + * Produces `true` if the runtime has no remaining fibers, sleeping or otherwise, indicating + * an asynchronous deadlock has occurred. Or rather, ''either'' an asynchronous deadlock, or + * some interaction with an external asynchronous scheduler (such as another thread pool). + */ + val isDeadlocked: IO[Boolean] = + IO(ctx.state.tasks.isEmpty) + + /** + * Returns the base64-encoded seed which governs the random task interleaving during each + * [[tick]]. This is useful for reproducing test failures which came about due to some + * unexpected (though clearly plausible) execution order. + */ + def seed: String = ctx.seed +} + +object TestControl { + + /** + * Executes a given [[IO]] under fully mocked runtime control. Produces a `TestControl` which + * can be used to manipulate the mocked runtime and retrieve the results. Note that the outer + * `IO` (and the `IO`s produced by the `TestControl`) do ''not'' evaluate under mocked runtime + * control and must be evaluated by some external harness, usually some test framework + * integration. + * + * A simple example (returns an `IO` which must, itself, be run) using MUnit assertion syntax: + * + * {{{ + * val program = for { + * first <- IO.realTime // IO.monotonic also works + * _ <- IO.println("it is currently " + first) + * + * _ <- IO.sleep(100.millis) + * second <- IO.realTime + * _ <- IO.println("we slept and now it is " + second) + * + * _ <- IO.sleep(1.hour).timeout(1.minute) + * third <- IO.realTime + * _ <- IO.println("we slept a second time and now it is " + third) + * } yield () + * + * TestControl.execute(program) flatMap { control => + * for { + * first <- control.results + * _ <- IO(assert(first == None)) // we haven't finished yet + * + * _ <- control.tick + * // at this point, the "it is currently ..." line will have printed + * + * next1 <- control.nextInterval + * _ <- IO(assert(next1 == 100.millis)) + * + * _ <- control.advance(100.millis) + * // nothing has happened yet! + * _ <- control.tick + * // now the "we slept and now it is ..."
line will have printed + * + * second <- control.results + * _ <- IO(assert(second == None)) // we're still not done yet + * + * next2 <- control.nextInterval + * _ <- IO(assert(next2 == 1.minute)) // we need to wait one minute for our next task, since we will hit the timeout + * + * _ <- control.advance(15.seconds) + * _ <- control.tick + * // nothing happens! + * + * next3 <- control.nextInterval + * _ <- IO(assert(next3 == 45.seconds)) // haven't gone far enough to hit the timeout + * + * _ <- control.advanceAndTick(45.seconds) + * // at this point, nothing will print because we hit the timeout exception! + * + * third <- control.results + * + * _ <- IO { + * assert(third.isDefined) + * assert(third.get.isError) // an exception, not a value! + * assert(third.get.fold(false, _.isInstanceOf[TimeoutException], _ => false)) + * } + * } yield () + * } + * }}} + * + * The above will run to completion within milliseconds. + * + * If your assertions are entirely intrinsic (within the program) and the test is such that + * time should advance in an automatic fashion, [[executeEmbed]] may be a more convenient + * option. + */ + def execute[A]( + program: IO[A], + config: IORuntimeConfig = IORuntimeConfig(), + seed: Option[String] = None): IO[TestControl[A]] = + IO { + val ctx = seed match { + case Some(seed) => TestContext(seed) + case None => TestContext() + } + + val runtime: IORuntime = IORuntime( + ctx, + ctx.deriveBlocking(), + new Scheduler { + def sleep(delay: FiniteDuration, task: Runnable): Runnable = { + val cancel = ctx.schedule(delay, task) + () => cancel() + } + + def nowMillis() = + ctx.now().toMillis + + def monotonicNanos() = + ctx.now().toNanos + }, + () => (), + config + ) + + val results = new AtomicReference[Option[Outcome[Id, Throwable, A]]](None) + program.unsafeRunAsyncOutcome(oc => results.set(Some(oc)))(runtime) + new TestControl(ctx, results) + } + + /** + * Executes an [[IO]] under fully mocked runtime control, returning the final results. This is + * very similar to calling `unsafeRunSync` on the program and wrapping it in an `IO`, except + * that the scheduler will use a mocked and quantized notion of time, all while executing on a + * singleton worker thread. This can cause some programs to deadlock which would otherwise + * complete normally, but it also allows programs which involve [[IO.sleep]] s of any length + * to complete almost instantly with correct semantics. + * + * Note that any program which involves an [[IO.async]] that waits for some external thread + * (including [[IO.evalOn]]) will be detected as a deadlock and will result in the + * `executeEmbed` effect immediately producing a [[NonTerminationException]]. + * + * @return + * An `IO` which runs the given program under a mocked runtime, producing the result or an + * error if the program runs to completion. If the program is canceled, a + * [[scala.concurrent.CancellationException]] will be raised within the `IO`. If the program + * fails to terminate with either a result or an error, a [[NonTerminationException]] will + * be raised. 
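+ * + * A minimal usage sketch (illustrative values): + * + * {{{ + * // completes almost instantly, producing 42, despite the ten-day sleep + * TestControl.executeEmbed(IO.sleep(10.days).as(42)) + * }}}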
+ */ + def executeEmbed[A]( + program: IO[A], + config: IORuntimeConfig = IORuntimeConfig(), + seed: Option[String] = None): IO[A] = + execute(program, config = config, seed = seed) flatMap { c => + val nt = new (Id ~> IO) { def apply[E](e: E) = IO.pure(e) } + + val onCancel = IO.defer(IO.raiseError(new CancellationException())) + val onNever = IO.raiseError(new NonTerminationException()) + val embedded = c.results.flatMap(_.map(_.mapK(nt).embed(onCancel)).getOrElse(onNever)) + + c.tickAll *> embedded + } + + final class NonTerminationException + extends RuntimeException( + "Program under test failed to produce a result (either a value or an error) and has no further " + + "actions to take, likely indicating an asynchronous deadlock. This may also indicate some " + + "interaction with an external thread, potentially via IO.async or IO#evalOn. If this is the " + + "case, then it is likely you cannot use TestControl to correctly evaluate this program, and " + + "you should either use the production IORuntime (ideally via some integration with your " + + "testing framework), or attempt to refactor the program into smaller, more testable components.") +} diff --git a/testkit/shared/src/main/scala/cats/effect/testkit/TestInstances.scala b/testkit/shared/src/main/scala/cats/effect/testkit/TestInstances.scala index b5b8ee3644..36c8a94af4 100644 --- a/testkit/shared/src/main/scala/cats/effect/testkit/TestInstances.scala +++ b/testkit/shared/src/main/scala/cats/effect/testkit/TestInstances.scala @@ -190,7 +190,7 @@ trait TestInstances extends ParallelFGenerators with OutcomeGenerators with Sync .unsafeRunAsyncOutcome { oc => results = oc.mapK(someK) }(unsafe .IORuntime(ticker.ctx, ticker.ctx, scheduler, () => (), unsafe.IORuntimeConfig())) - ticker.ctx.tickAll(1.second) + ticker.ctx.tickAll() /*println("====================================") println(s"completed ioa with $results") diff --git a/tests/jvm/src/test/scala/cats/effect/ParasiticECSpec.scala b/tests/jvm/src/test/scala/cats/effect/ParasiticECSpec.scala index dfc4a39651..9bd605e46d 100644 --- a/tests/jvm/src/test/scala/cats/effect/ParasiticECSpec.scala +++ b/tests/jvm/src/test/scala/cats/effect/ParasiticECSpec.scala @@ -28,10 +28,12 @@ class ParasiticECSpec extends BaseSpec with TestInstances { "IO monad" should { "evaluate fibers correctly in presence of a parasitic execution context" in real { - implicit val ticker = Ticker() + val test = { + implicit val ticker = Ticker() - val test = IO(implicitly[Arbitrary[IO[Int]]].arbitrary.sample.get).flatMap { io => - IO.delay(io.eqv(io)) + IO(implicitly[Arbitrary[IO[Int]]].arbitrary.sample.get).flatMap { io => + IO.delay(io.eqv(io)) + } } val iterations = 15000 diff --git a/tests/shared/src/test/scala/cats/effect/IOSpec.scala b/tests/shared/src/test/scala/cats/effect/IOSpec.scala index ea7ceab028..41b19497d1 100644 --- a/tests/shared/src/test/scala/cats/effect/IOSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/IOSpec.scala @@ -279,7 +279,7 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { val ioa = for { f <- IO.executionContext.start.evalOn(ec) - _ <- IO(ticker.ctx.tickAll()) + _ <- IO(ticker.ctx.tick()) oc <- f.join } yield oc @@ -293,7 +293,7 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { "cancel an already canceled fiber" in ticked { implicit ticker => val test = for { f <- IO.canceled.start - _ <- IO(ticker.ctx.tickAll()) + _ <- IO(ticker.ctx.tick()) _ <- f.cancel } yield () @@ -332,13 +332,13 @@ class IOSpec extends BaseSpec
with Discipline with IOPlatformSpecification { val test = for { fiber <- async.start - _ <- IO(ticker.ctx.tickAll()) + _ <- IO(ticker.ctx.tick()) _ <- IO(cb(Right(42))) - _ <- IO(ticker.ctx.tickAll()) + _ <- IO(ticker.ctx.tick()) _ <- IO(cb(Right(43))) - _ <- IO(ticker.ctx.tickAll()) + _ <- IO(ticker.ctx.tick()) _ <- IO(cb(Left(TestException))) - _ <- IO(ticker.ctx.tickAll()) + _ <- IO(ticker.ctx.tick()) value <- fiber.joinWithNever } yield value @@ -455,9 +455,9 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { "propagate cancelation" in ticked { implicit ticker => (for { fiber <- IO.both(IO.never, IO.never).void.start - _ <- IO(ticker.ctx.tickAll()) + _ <- IO(ticker.ctx.tick()) _ <- fiber.cancel - _ <- IO(ticker.ctx.tickAll()) + _ <- IO(ticker.ctx.tick()) oc <- fiber.join } yield oc) must completeAs(Outcome.canceled[IO, Throwable, Unit]) } @@ -468,9 +468,9 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { r <- Ref[IO].of(false) fiber <- IO.both(IO.never.onCancel(l.set(true)), IO.never.onCancel(r.set(true))).start - _ <- IO(ticker.ctx.tickAll()) + _ <- IO(ticker.ctx.tick()) _ <- fiber.cancel - _ <- IO(ticker.ctx.tickAll()) + _ <- IO(ticker.ctx.tick()) l2 <- l.get r2 <- r.get } yield (l2 -> r2)) must completeAs(true -> true) @@ -544,9 +544,9 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { r <- Ref.of[IO, Boolean](false) fiber <- IO.race(IO.never.onCancel(l.set(true)), IO.never.onCancel(r.set(true))).start - _ <- IO(ticker.ctx.tickAll()) + _ <- IO(ticker.ctx.tick()) _ <- fiber.cancel - _ <- IO(ticker.ctx.tickAll()) + _ <- IO(ticker.ctx.tick()) l2 <- l.get r2 <- r.get } yield (l2 -> r2)) must completeAs(true -> true) @@ -624,7 +624,7 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { val ioa = for { f <- (IO.never: IO[Unit]).start ec <- IO.executionContext - _ <- IO(ec.asInstanceOf[TestContext].tickAll()) + _ <- IO(ec.asInstanceOf[TestContext].tick()) _ <- f.cancel oc <- f.join } yield oc @@ -640,7 +640,7 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { val ioa = for { f <- target.start - _ <- IO(ticker.ctx.tickAll()) + _ <- IO(ticker.ctx.tick()) _ <- f.cancel } yield () @@ -658,7 +658,7 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { val ioa = for { f <- target.start - _ <- IO(ticker.ctx.tickAll()) + _ <- IO(ticker.ctx.tick()) _ <- f.cancel } yield () @@ -720,7 +720,7 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { val test = for { f <- body.onCancel(IO(results ::= 2)).onCancel(IO(results ::= 1)).start - _ <- IO(ticker.ctx.tickAll()) + _ <- IO(ticker.ctx.tick()) _ <- f.cancel back <- IO(results) } yield back @@ -927,11 +927,11 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { val test = for { f <- subject.start - _ <- IO(ticker.ctx.tickAll()) // schedule everything + _ <- IO(ticker.ctx.tick()) // schedule everything _ <- f.cancel.start - _ <- IO(ticker.ctx.tickAll()) // get inside the finalizer suspension + _ <- IO(ticker.ctx.tick()) // get inside the finalizer suspension _ <- IO(cb(Right(()))) - _ <- IO(ticker.ctx.tickAll()) // show that the finalizer didn't explode + _ <- IO(ticker.ctx.tick()) // show that the finalizer didn't explode } yield () test must completeAs(()) // ...but not throw an exception @@ -948,7 +948,7 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { IO.pure(Some(fin)) } - val test = target.start 
flatMap { f => IO(ticker.ctx.tickAll()) *> f.cancel } + val test = target.start flatMap { f => IO(ticker.ctx.tick()) *> f.cancel } test must completeAs(()) success must beTrue diff --git a/tests/shared/src/test/scala/cats/effect/MemoizeSpec.scala b/tests/shared/src/test/scala/cats/effect/MemoizeSpec.scala index 09eed0f8eb..bbc293d7c5 100644 --- a/tests/shared/src/test/scala/cats/effect/MemoizeSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/MemoizeSpec.scala @@ -40,7 +40,7 @@ class MemoizeSpec extends BaseSpec with Discipline { } yield v val result = op.unsafeToFuture() - ticker.ctx.tickAll() + ticker.ctx.tick() result.value mustEqual Some(Success(0)) } @@ -60,7 +60,7 @@ class MemoizeSpec extends BaseSpec with Discipline { } yield (x, y, v) val result = op.unsafeToFuture() - ticker.ctx.tickAll() + ticker.ctx.tick() result.value mustEqual Some(Success((1, 1, 1))) } @@ -76,12 +76,12 @@ class MemoizeSpec extends BaseSpec with Discipline { memoized <- Concurrent[IO].memoize(action) _ <- memoized.start x <- memoized - _ <- IO(ticker.ctx.tickAll()) + _ <- IO(ticker.ctx.tick()) v <- ref.get } yield x -> v val result = op.unsafeToFuture() - ticker.ctx.tickAll() + ticker.ctx.tick() result.value mustEqual Some(Success((1, 1))) } @@ -104,7 +104,7 @@ class MemoizeSpec extends BaseSpec with Discipline { } yield res val result = op.unsafeToFuture() - ticker.ctx.tickAll(500.millis) + ticker.ctx.tickAll() result.value mustEqual Some(Success(false)) } @@ -126,7 +126,7 @@ class MemoizeSpec extends BaseSpec with Discipline { } yield res val result = op.unsafeToFuture() - ticker.ctx.tickAll(600.millis) + ticker.ctx.tickAll() result.value mustEqual Some(Success(false)) } @@ -148,7 +148,7 @@ class MemoizeSpec extends BaseSpec with Discipline { } yield res val result = op.unsafeToFuture() - ticker.ctx.tickAll(600.millis) + ticker.ctx.tickAll() result.value mustEqual Some(Success(false)) } @@ -169,7 +169,7 @@ class MemoizeSpec extends BaseSpec with Discipline { } yield v1 -> v2 val result = op.unsafeToFuture() - ticker.ctx.tickAll(500.millis) + ticker.ctx.tickAll() result.value mustEqual Some(Success((2, 1))) } @@ -190,7 +190,7 @@ class MemoizeSpec extends BaseSpec with Discipline { } yield v val result = op.unsafeToFuture() - ticker.ctx.tickAll(500.millis) + ticker.ctx.tickAll() result.value mustEqual Some(Success(true)) } diff --git a/tests/shared/src/test/scala/cats/effect/ResourceSpec.scala b/tests/shared/src/test/scala/cats/effect/ResourceSpec.scala index b055a50562..3a68a71d5d 100644 --- a/tests/shared/src/test/scala/cats/effect/ResourceSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/ResourceSpec.scala @@ -469,7 +469,8 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { // after 1 second: // both resources have allocated (concurrency, serially it would happen after 2 seconds) // resources are still open during `use` (correctness) - ticker.ctx.tick(1.second) + ticker.ctx.tick() + ticker.ctx.advanceAndTick(1.second) leftAllocated must beTrue rightAllocated must beTrue leftReleasing must beFalse @@ -477,7 +478,7 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { // after 2 seconds: // both resources have started cleanup (correctness) - ticker.ctx.tick(1.second) + ticker.ctx.advanceAndTick(1.second) leftReleasing must beTrue rightReleasing must beTrue leftReleased must beFalse @@ -485,7 +486,7 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { // after 3 seconds: // both resources have terminated cleanup (concurrency, serially it 
would happen after 4 seconds) - ticker.ctx.tick(1.second) + ticker.ctx.advanceAndTick(1.second) leftReleased must beTrue rightReleased must beTrue } @@ -518,7 +519,8 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { // after 1 second: // both resources have allocated (concurrency, serially it would happen after 2 seconds) // resources are still open during `flatMap` (correctness) - ticker.ctx.tick(1.second) + ticker.ctx.tick() + ticker.ctx.advanceAndTick(1.second) leftAllocated must beTrue rightAllocated must beTrue leftReleasing must beFalse @@ -526,7 +528,7 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { // after 2 seconds: // both resources have started cleanup (interruption, or rhs would start releasing after 3 seconds) - ticker.ctx.tick(1.second) + ticker.ctx.advanceAndTick(1.second) leftReleasing must beTrue rightReleasing must beTrue leftReleased must beFalse @@ -534,7 +536,7 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { // after 3 seconds: // both resources have terminated cleanup (concurrency, serially it would happen after 4 seconds) - ticker.ctx.tick(1.second) + ticker.ctx.advanceAndTick(1.second) leftReleased must beTrue rightReleased must beTrue } @@ -565,7 +567,8 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { // after 1 second: // rhs has partially allocated, lhs executing - ticker.ctx.tick(1.second) + ticker.ctx.tick() + ticker.ctx.advanceAndTick(1.second) leftAllocated must beFalse rightAllocated must beTrue rightErrored must beFalse @@ -574,7 +577,7 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { // after 2 seconds: // rhs has failed, release blocked since lhs is in uninterruptible allocation - ticker.ctx.tick(1.second) + ticker.ctx.advanceAndTick(1.second) leftAllocated must beFalse rightAllocated must beTrue rightErrored must beTrue @@ -584,7 +587,7 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { // after 3 seconds: // lhs completes allocation (concurrency, serially it would happen after 4 seconds) // both resources have started cleanup (correctness, error propagates to both sides) - ticker.ctx.tick(1.second) + ticker.ctx.advanceAndTick(1.second) leftAllocated must beTrue leftReleasing must beTrue rightReleasing must beTrue @@ -593,7 +596,7 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { // after 4 seconds: // both resource have terminated cleanup (concurrency, serially it would happen after 5 seconds) - ticker.ctx.tick(1.second) + ticker.ctx.advanceAndTick(1.second) leftReleased must beTrue rightReleased must beTrue } @@ -679,7 +682,7 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { (Resource.eval(IO.canceled)).uncancelable.onCancel(Resource.eval(IO { fired = true })) test.use_.unsafeToFuture() - ticker.ctx.tickAll() + ticker.ctx.tick() fired must beFalse } @@ -699,11 +702,12 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { (outerInit *> async *> waitR).use_.unsafeToFuture() - ticker.ctx.tick(1.second) + ticker.ctx.tick() + ticker.ctx.advanceAndTick(1.second) innerClosed must beFalse outerClosed must beFalse - ticker.ctx.tick(1.second) + ticker.ctx.advanceAndTick(1.second) innerClosed must beTrue outerClosed must beTrue } @@ -720,11 +724,12 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { target.use_.unsafeToFuture() - ticker.ctx.tick(1.second) + ticker.ctx.tick() + ticker.ctx.advanceAndTick(1.second) leftClosed must beTrue 
rightClosed must beFalse - ticker.ctx.tick(1.second) + ticker.ctx.advanceAndTick(1.second) rightClosed must beTrue } } @@ -790,25 +795,26 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { target.use_.unsafeToFuture() - ticker.ctx.tick(50.millis) + ticker.ctx.tick() + ticker.ctx.advanceAndTick(50.millis) winnerClosed must beFalse loserClosed must beFalse completed must beFalse results must beNull - ticker.ctx.tick(50.millis) + ticker.ctx.advanceAndTick(50.millis) winnerClosed must beFalse loserClosed must beTrue completed must beFalse results must beLeft("winner") - ticker.ctx.tick(50.millis) + ticker.ctx.advanceAndTick(50.millis) winnerClosed must beFalse loserClosed must beTrue completed must beFalse results must beLeft("winner") - ticker.ctx.tick(1.second) + ticker.ctx.advanceAndTick(1.second) winnerClosed must beTrue completed must beTrue } @@ -820,7 +826,7 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { val target = waitR.start *> waitR *> Resource.eval(IO { completed = true }) target.use_.unsafeToFuture() - ticker.ctx.tick(1.second) + ticker.ctx.tickAll() completed must beTrue } @@ -843,12 +849,12 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { val target = Resource.make(wait)(_ => IO(i += 1)).start *> finish target.use_.unsafeToFuture() - ticker.ctx.tick(50.millis) + ticker.ctx.advanceAndTick(50.millis) completed must beTrue i mustEqual 0 - ticker.ctx.tick(1.second) + ticker.ctx.advanceAndTick(1.second) i mustEqual 1 } @@ -860,10 +866,11 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { target.use_.unsafeToFuture() - ticker.ctx.tick(1.second) + ticker.ctx.tick() + ticker.ctx.advanceAndTick(1.second) i mustEqual 1 - ticker.ctx.tick(1.second) + ticker.ctx.advanceAndTick(1.second) i mustEqual 1 } @@ -880,14 +887,15 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { target.use_.unsafeToFuture() - ticker.ctx.tick(100.millis) + ticker.ctx.tick() + ticker.ctx.advanceAndTick(100.millis) i mustEqual 0 - ticker.ctx.tick(900.millis) + ticker.ctx.advanceAndTick(900.millis) i mustEqual 0 completed must beFalse - ticker.ctx.tick(1.second) + ticker.ctx.advanceAndTick(1.second) i mustEqual 1 completed must beTrue } @@ -917,7 +925,8 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { // after 1 second: // both resources have allocated (concurrency, serially it would happen after 2 seconds) // resources are still open during `use` (correctness) - ticker.ctx.tick(1.second) + ticker.ctx.tick() + ticker.ctx.advanceAndTick(1.second) leftAllocated must beTrue rightAllocated must beTrue leftReleasing must beFalse @@ -925,7 +934,7 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { // after 2 seconds: // both resources have started cleanup (correctness) - ticker.ctx.tick(1.second) + ticker.ctx.advanceAndTick(1.second) leftReleasing must beTrue rightReleasing must beTrue leftReleased must beFalse @@ -933,7 +942,7 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline { // after 3 seconds: // both resources have terminated cleanup (concurrency, serially it would happen after 4 seconds) - ticker.ctx.tick(1.second) + ticker.ctx.advanceAndTick(1.second) leftReleased must beTrue rightReleased must beTrue } diff --git a/tests/shared/src/test/scala/cats/effect/std/BackpressureSpec.scala b/tests/shared/src/test/scala/cats/effect/std/BackpressureSpec.scala index ec4be251be..f9e1bbad32 100644 --- 
a/tests/shared/src/test/scala/cats/effect/std/BackpressureSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/BackpressureSpec.scala @@ -21,7 +21,7 @@ import cats.syntax.all._ import scala.concurrent.duration._ -class BackpressureTests extends BaseSpec { +class BackpressureSpec extends BaseSpec { "Backpressure" should { "Lossy Strategy should return IO[None] when no permits are available" in ticked { diff --git a/tests/shared/src/test/scala/cats/effect/testkit/TestControlSpec.scala b/tests/shared/src/test/scala/cats/effect/testkit/TestControlSpec.scala new file mode 100644 index 0000000000..6723d3e75c --- /dev/null +++ b/tests/shared/src/test/scala/cats/effect/testkit/TestControlSpec.scala @@ -0,0 +1,155 @@ +/* + * Copyright 2020-2021 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package testkit + +import cats.Id + +import org.specs2.matcher.Matcher + +import scala.concurrent.CancellationException +import scala.concurrent.duration._ + +class TestControlSpec extends BaseSpec { + + val simple = IO.unit + + val ceded = IO.cede.replicateA(10) *> IO.unit + + val longSleeps = for { + first <- IO.monotonic + _ <- IO.sleep(1.hour) + second <- IO.monotonic + _ <- IO.race(IO.sleep(1.day), IO.sleep(1.day + 1.nanosecond)) + third <- IO.monotonic + } yield (first.toCoarsest, second.toCoarsest, third.toCoarsest) + + val deadlock: IO[Unit] = IO.never + + "execute" should { + "run a simple IO" in real { + TestControl.execute(simple) flatMap { control => + for { + r1 <- control.results + _ <- IO(r1 must beNone) + + _ <- control.tick + + r2 <- control.results + _ <- IO(r2 must beSome(beSucceeded(()))) + } yield ok + } + } + + "run a ceded IO in a single tick" in real { + TestControl.execute(ceded) flatMap { control => + for { + r1 <- control.results + _ <- IO(r1 must beNone) + + _ <- control.tick + + r2 <- control.results + _ <- IO(r2 must beSome(beSucceeded(()))) + } yield ok + } + } + + "run an IO with long sleeps" in real { + TestControl.execute(longSleeps) flatMap { control => + for { + r1 <- control.results + _ <- IO(r1 must beNone) + + _ <- control.tick + r2 <- control.results + _ <- IO(r2 must beNone) + + int1 <- control.nextInterval + _ <- IO(int1 mustEqual 1.hour) + + _ <- control.advanceAndTick(1.hour) + r3 <- control.results + _ <- IO(r3 must beNone) + + int2 <- control.nextInterval + _ <- IO(int2 mustEqual 1.day) + + _ <- control.advanceAndTick(1.day) + + r4 <- control.results + _ <- IO(r4 must beSome(beSucceeded((0.nanoseconds, 1.hour, 25.hours)))) + } yield ok + } + } + + "detect a deadlock" in real { + TestControl.execute(deadlock) flatMap { control => + for { + r1 <- control.results + _ <- IO(r1 must beNone) + + _ <- control.tick + id <- control.isDeadlocked + _ <- IO(id must beTrue) + + r2 <- control.results + _ <- IO(r2 must beNone) + } yield ok + } + } + } + + "executeEmbed" should { + "run a simple IO" in real { + TestControl.executeEmbed(simple) flatMap { r => IO(r mustEqual (())) } + } + + "run an IO with long sleeps" in real {
+ TestControl.executeEmbed(longSleeps) flatMap { r => + IO(r mustEqual ((0.nanoseconds, 1.hour, 25.hours))) + } + } + + "detect a deadlock" in real { + TestControl.executeEmbed(deadlock).attempt flatMap { r => + IO { + r must beLike { case Left(_: TestControl.NonTerminationException) => ok } + } + } + } + + "run an IO which produces an error" in real { + case object TestException extends RuntimeException + + TestControl.executeEmbed(IO.raiseError[Unit](TestException)).attempt flatMap { r => + IO(r must beLeft(TestException: Throwable)) + } + } + + "run an IO which self-cancels" in real { + TestControl.executeEmbed(IO.canceled).attempt flatMap { r => + IO { + r must beLike { case Left(_: CancellationException) => ok } + } + } + } + } + + private def beSucceeded[A](value: A): Matcher[Outcome[Id, Throwable, A]] = + (_: Outcome[Id, Throwable, A]) == Outcome.succeeded[Id, Throwable, A](value) +}
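+ +// A sketch of seed-based reproduction (hypothetical usage of the `seed` parameter on +// `TestControl.execute` and the `seed` accessor documented above): two runs of the same +// program which share a seed should make identical scheduling decisions, e.g.: +// +// for { +// c1 <- TestControl.execute(ceded) +// c2 <- TestControl.execute(ceded, seed = Some(c1.seed)) +// _ <- c1.tickAll +// _ <- c2.tickAll +// r1 <- c1.results +// r2 <- c2.results +// } yield r1 == r2 // same seed => same interleaving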