diff --git a/build.sbt b/build.sbt index 6facac7..36d74ac 100644 --- a/build.sbt +++ b/build.sbt @@ -17,7 +17,7 @@ val developerURL: String = "https://matthicks.com" name := projectName ThisBuild / organization := org -ThisBuild / version := "0.3.0" +ThisBuild / version := "0.3.1-SNAPSHOT" ThisBuild / scalaVersion := scala213 ThisBuild / crossScalaVersions := allScalaVersions ThisBuild / scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature") diff --git a/core/jvm-native/src/test/scala/spec/BasicsSpec.scala b/core/jvm-native/src/test/scala/spec/BasicsSpec.scala index cbb1a3f..c62e547 100644 --- a/core/jvm-native/src/test/scala/spec/BasicsSpec.scala +++ b/core/jvm-native/src/test/scala/spec/BasicsSpec.scala @@ -47,6 +47,14 @@ class BasicsSpec extends AnyWordSpec with Matchers { .sync() result should be("Recovered") } + "raise an error and recover" in { + val result = Task.error(new RuntimeException("Die Die Die")) + .handleError { _ => + Task.pure("Recovered") + } + .sync() + result should be("Recovered") + } "process a list of tasks to a task with a list" in { val list = List( Task("One"), Task("Two"), Task("Three") diff --git a/core/shared/src/main/scala/rapid/Stream.scala b/core/shared/src/main/scala/rapid/Stream.scala index aa06ff6..7f54c28 100644 --- a/core/shared/src/main/scala/rapid/Stream.scala +++ b/core/shared/src/main/scala/rapid/Stream.scala @@ -22,10 +22,16 @@ class Stream[Return](private val task: Task[Iterator[Return]]) extends AnyVal { * Takes values from the stream while the given predicate holds. * * @param p the predicate to test the values - * @return a new stream with the values that satisfy the predicate */ def takeWhile(p: Return => Boolean): Stream[Return] = new Stream(task.map(_.takeWhile(p))) + /** + * Takes n values from the stream and disregards the rest. + * + * @param n the number of values to take from the stream + */ + def take(n: Int): Stream[Return] = new Stream(task.map(_.take(n))) + /** * Transforms the values in the stream using the given function. * @@ -100,6 +106,32 @@ class Stream[Return](private val task: Task[Iterator[Return]]) extends AnyVal { */ def drain: Task[Unit] = count.unit + /** + * Cycles through all results but only returns the last element. Will error if the Stream is empty. + */ + def last: Task[Return] = task.map(_.reduce((_, b) => b)) + + /** + * Cycles through all results but only returns the last element or None if the stream is empty. + */ + def lastOption: Task[Option[Return]] = task.map { iterator => + if (iterator.hasNext) { + Some(iterator.reduce((_, b) => b)) + } else { + None + } + } + + /** + * Grabs only the first result from the stream. + */ + def first: Task[Return] = take(1).last + + /** + * Grabs only the first element or None if the stream is empty. + */ + def firstOption: Task[Option[Return]] = take(1).lastOption + /** * Converts the stream to a list. * @@ -160,6 +192,11 @@ object Stream { */ def fromIterator[Return](iterator: Task[Iterator[Return]]): Stream[Return] = new Stream[Return](iterator) + /** + * Forces a Task[Stream] into Stream + */ + def force[Return](stream: Task[Stream[Return]]): Stream[Return] = new Stream[Return](stream.flatMap(_.task)) + /** * Creates a Byte stream from the NIO Path * diff --git a/core/shared/src/main/scala/rapid/Task.scala b/core/shared/src/main/scala/rapid/Task.scala index 961898b..9218ac7 100644 --- a/core/shared/src/main/scala/rapid/Task.scala +++ b/core/shared/src/main/scala/rapid/Task.scala @@ -45,7 +45,7 @@ trait Task[Return] extends Any { * * @return either the result of the task or an exception */ - def attempt(): Task[Try[Return]] = Task { + def attempt: Task[Try[Return]] = Task { Try(invoke()) } @@ -55,7 +55,7 @@ trait Task[Return] extends Any { * @param f handler * @return Task[Return] */ - def handleError(f: Throwable => Task[Return]): Task[Return] = attempt() + def handleError(f: Throwable => Task[Return]): Task[Return] = attempt .flatMap { case Success(r) => Task.pure(r) case Failure(t) => f(t) @@ -66,7 +66,7 @@ trait Task[Return] extends Any { * * @param task the task to guarantee invocation of */ - def guarantee(task: Task[Unit]): Task[Return] = attempt() + def guarantee(task: Task[Unit]): Task[Return] = attempt .flatTap { _ => task } @@ -99,7 +99,7 @@ trait Task[Return] extends Any { * @param f the function to handle the result * @return existing signature */ - def flatTap(f: Return => Task[Unit]): Task[Return] = flatMap { r => + def flatTap(f: Return => Task[_]): Task[Return] = flatMap { r => f(r).map(_ => r) } @@ -146,6 +146,12 @@ object Task { override def flatMap[T](f: Return => Task[T]): Task[T] = copy(f.asInstanceOf[Any => Task[Any]] :: list) } + case class Error[Return](throwable: Throwable) extends AnyVal with Task[Return] { + override protected def invoke(): Return = throw throwable + + override def flatMap[T](f: Return => Task[T]): Task[T] = this.asInstanceOf[Task[T]] + } + class Completable[Return] extends Task[Return] { @volatile private var result: Option[Try[Return]] = None @@ -190,6 +196,14 @@ object Task { */ def apply[Return](f: => Return): Task[Return] = Single(() => f) + /** + * Creates a new task that raises an error when invoked. + * + * @param throwable the exception to raise + * @return a new Error task + */ + def error[Return](throwable: Throwable): Task[Return] = Error[Return](throwable) + /** * Creates a new Completable task. *