Skip to content

Commit

Permalink
Lots of convenience additions
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Dec 20, 2024
1 parent 8717519 commit 54695ea
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 6 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ val developerURL: String = "https://matthicks.com"

name := projectName
ThisBuild / organization := org
ThisBuild / version := "0.3.0"
ThisBuild / version := "0.3.1-SNAPSHOT"
ThisBuild / scalaVersion := scala213
ThisBuild / crossScalaVersions := allScalaVersions
ThisBuild / scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature")
Expand Down
8 changes: 8 additions & 0 deletions core/jvm-native/src/test/scala/spec/BasicsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ class BasicsSpec extends AnyWordSpec with Matchers {
.sync()
result should be("Recovered")
}
"raise an error and recover" in {
val result = Task.error(new RuntimeException("Die Die Die"))
.handleError { _ =>
Task.pure("Recovered")
}
.sync()
result should be("Recovered")
}
"process a list of tasks to a task with a list" in {
val list = List(
Task("One"), Task("Two"), Task("Three")
Expand Down
39 changes: 38 additions & 1 deletion core/shared/src/main/scala/rapid/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,16 @@ class Stream[Return](private val task: Task[Iterator[Return]]) extends AnyVal {
* Takes values from the stream while the given predicate holds.
*
* @param p the predicate to test the values
* @return a new stream with the values that satisfy the predicate
*/
def takeWhile(p: Return => Boolean): Stream[Return] = new Stream(task.map(_.takeWhile(p)))

/**
* Takes n values from the stream and disregards the rest.
*
* @param n the number of values to take from the stream
*/
def take(n: Int): Stream[Return] = new Stream(task.map(_.take(n)))

/**
* Transforms the values in the stream using the given function.
*
Expand Down Expand Up @@ -100,6 +106,32 @@ class Stream[Return](private val task: Task[Iterator[Return]]) extends AnyVal {
*/
def drain: Task[Unit] = count.unit

/**
* Cycles through all results but only returns the last element. Will error if the Stream is empty.
*/
def last: Task[Return] = task.map(_.reduce((_, b) => b))

/**
* Cycles through all results but only returns the last element or None if the stream is empty.
*/
def lastOption: Task[Option[Return]] = task.map { iterator =>
if (iterator.hasNext) {
Some(iterator.reduce((_, b) => b))
} else {
None
}
}

/**
* Grabs only the first result from the stream.
*/
def first: Task[Return] = take(1).last

/**
* Grabs only the first element or None if the stream is empty.
*/
def firstOption: Task[Option[Return]] = take(1).lastOption

/**
* Converts the stream to a list.
*
Expand Down Expand Up @@ -160,6 +192,11 @@ object Stream {
*/
def fromIterator[Return](iterator: Task[Iterator[Return]]): Stream[Return] = new Stream[Return](iterator)

/**
* Forces a Task[Stream] into Stream
*/
def force[Return](stream: Task[Stream[Return]]): Stream[Return] = new Stream[Return](stream.flatMap(_.task))

/**
* Creates a Byte stream from the NIO Path
*
Expand Down
22 changes: 18 additions & 4 deletions core/shared/src/main/scala/rapid/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ trait Task[Return] extends Any {
*
* @return either the result of the task or an exception
*/
def attempt(): Task[Try[Return]] = Task {
def attempt: Task[Try[Return]] = Task {
Try(invoke())
}

Expand All @@ -55,7 +55,7 @@ trait Task[Return] extends Any {
* @param f handler
* @return Task[Return]
*/
def handleError(f: Throwable => Task[Return]): Task[Return] = attempt()
def handleError(f: Throwable => Task[Return]): Task[Return] = attempt
.flatMap {
case Success(r) => Task.pure(r)
case Failure(t) => f(t)
Expand All @@ -66,7 +66,7 @@ trait Task[Return] extends Any {
*
* @param task the task to guarantee invocation of
*/
def guarantee(task: Task[Unit]): Task[Return] = attempt()
def guarantee(task: Task[Unit]): Task[Return] = attempt
.flatTap { _ =>
task
}
Expand Down Expand Up @@ -99,7 +99,7 @@ trait Task[Return] extends Any {
* @param f the function to handle the result
* @return existing signature
*/
def flatTap(f: Return => Task[Unit]): Task[Return] = flatMap { r =>
def flatTap(f: Return => Task[_]): Task[Return] = flatMap { r =>
f(r).map(_ => r)
}

Expand Down Expand Up @@ -146,6 +146,12 @@ object Task {
override def flatMap[T](f: Return => Task[T]): Task[T] = copy(f.asInstanceOf[Any => Task[Any]] :: list)
}

case class Error[Return](throwable: Throwable) extends AnyVal with Task[Return] {
override protected def invoke(): Return = throw throwable

override def flatMap[T](f: Return => Task[T]): Task[T] = this.asInstanceOf[Task[T]]
}

class Completable[Return] extends Task[Return] {
@volatile private var result: Option[Try[Return]] = None

Expand Down Expand Up @@ -190,6 +196,14 @@ object Task {
*/
def apply[Return](f: => Return): Task[Return] = Single(() => f)

/**
* Creates a new task that raises an error when invoked.
*
* @param throwable the exception to raise
* @return a new Error task
*/
def error[Return](throwable: Throwable): Task[Return] = Error[Return](throwable)

/**
* Creates a new Completable task.
*
Expand Down

0 comments on commit 54695ea

Please sign in to comment.