Skip to content

Commit

Permalink
Improvements to Stream implementation and simplification of Task
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Nov 10, 2024
1 parent e090750 commit ce59072
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 1,274 deletions.
1,121 changes: 0 additions & 1,121 deletions benchmark/benchmarks.json

This file was deleted.

28 changes: 11 additions & 17 deletions benchmark/src/main/scala/benchmark/StreamBenchmark.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ import java.util.concurrent.TimeUnit
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Thread)
class StreamBenchmark {
@Param(Array("1000")) //, "10000", "100000"))
@Param(Array("1000", "10000", "100000"))
var size: Int = _

lazy val rapidStream: rapid.Stream[Int] = rapid.Stream.fromList((1 to size).toList)
lazy val rapidFs2Stream: fs2.Stream[Task, Int] = fs2.Stream.emits(1 to size)
lazy val fs2Stream: fs2.Stream[IO, Int] = fs2.Stream.emits(1 to size)

@Setup(Level.Trial)
Expand All @@ -33,32 +32,27 @@ class StreamBenchmark {
}

@Benchmark
def rapidFs2StreamToList(): List[Int] = {
rapidFs2Stream.compile.toList.sync()
def fs2StreamToList(): List[Int] = {
fs2Stream.compile.toList.unsafeRunSync()
}

// @Benchmark
// def fs2StreamToList(): List[Int] = {
// fs2Stream.compile.toList.unsafeRunSync()
// }

@Benchmark
def rapidStreamFilter(): List[Int] = {
rapidStream.filter(_ % 2 == 0).toList.sync()
}

// @Benchmark
// def fs2StreamFilter(): List[Int] = {
// fs2Stream.filter(_ % 2 == 0).compile.toList.unsafeRunSync()
// }
@Benchmark
def fs2StreamFilter(): List[Int] = {
fs2Stream.filter(_ % 2 == 0).compile.toList.unsafeRunSync()
}

@Benchmark
def rapidStreamMap(): List[Int] = {
rapidStream.map(_ * 2).toList.sync()
}

// @Benchmark
// def fs2StreamMap(): List[Int] = {
// fs2Stream.map(_ * 2).compile.toList.unsafeRunSync()
// }
@Benchmark
def fs2StreamMap(): List[Int] = {
fs2Stream.map(_ * 2).compile.toList.unsafeRunSync()
}
}
22 changes: 11 additions & 11 deletions cats/src/main/scala/rapid/cats/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ package object cats {
def toIO: IO[Return] = IO.blocking(task.sync())
}

implicit class StreamExtras[Return](val stream: Stream[Return]) extends AnyVal {
def toFS2: fs2.Stream[IO, Return] = {
def loop(stream: Stream[Return]): fs2.Stream[IO, Return] = {
fs2.Stream.eval(stream.pull.toTask.toIO).flatMap {
case Some((head, tail)) => fs2.Stream.emit(head) ++ loop(tail)
case None => fs2.Stream.empty
}
}
loop(stream)
}
}
// implicit class StreamExtras[Return](val stream: Stream[Return]) extends AnyVal {
// def toFS2: fs2.Stream[IO, Return] = {
// def loop(stream: Stream[Return]): fs2.Stream[IO, Return] = {
// fs2.Stream.eval(stream.pull.toTask.toIO).flatMap {
// case Some((head, tail)) => fs2.Stream.emit(head) ++ loop(tail)
// case None => fs2.Stream.empty
// }
// }
// loop(stream)
// }
// }
}
7 changes: 0 additions & 7 deletions core/src/main/scala/rapid/ChainedTask.scala

This file was deleted.

2 changes: 1 addition & 1 deletion core/src/main/scala/rapid/Fiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class Fiber[Return](val task: Task[Return]) extends Task[Return] {
case t: Throwable => result = Left(t)
})

override protected lazy val f: () => Return = () => await()
override protected def invoke(): Return = await()

override def start(): Fiber[Return] = this

Expand Down
3 changes: 0 additions & 3 deletions core/src/main/scala/rapid/SimpleTask.scala

This file was deleted.

152 changes: 47 additions & 105 deletions core/src/main/scala/rapid/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,22 @@ import java.util.concurrent.Semaphore
*
* @tparam Return the type of the values produced by this stream
*/
trait Stream[Return] { stream =>
class Stream[Return](private val task: Task[Iterator[Return]]) extends AnyVal {
/**
* Produces the next value in the stream, if any.
* Filters the values in the stream using the given predicate.
*
* @return a `Pull` that produces an optional pair of the next value and the remaining stream
* @param p the predicate to test the values
* @return a new stream with the values that satisfy the predicate
*/
def pull: Pull[Option[(Return, Stream[Return])]]
def filter(p: Return => Boolean): Stream[Return] = new Stream(task.map(_.filter(p)))

/**
* Takes values from the stream while the given predicate holds.
*
* @param p the predicate to test the values
* @return a new stream with the values that satisfy the predicate
*/
def takeWhile(p: Return => Boolean): Stream[Return] = new Stream(task.map(_.takeWhile(p)))

/**
* Transforms the values in the stream using the given function.
Expand All @@ -22,12 +31,7 @@ trait Stream[Return] { stream =>
* @tparam T the type of the transformed values
* @return a new stream with the transformed values
*/
def map[T](f: Return => T): Stream[T] = new Stream[T] {
def pull: Pull[Option[(T, Stream[T])]] = stream.pull.flatMap {
case Some((head, tail)) => Pull.pure(Some(f(head) -> tail.map(f)))
case None => Pull.pure(None)
}
}
def map[T](f: Return => T): Stream[T] = new Stream(task.map(_.map(f)))

/**
* Transforms the values in the stream using the given function that returns a new stream.
Expand All @@ -36,18 +40,20 @@ trait Stream[Return] { stream =>
* @tparam T the type of the values in the new streams
* @return a new stream with the transformed values
*/
def flatMap[T](f: Return => Stream[T]): Stream[T] = new Stream[T] {
def pull: Pull[Option[(T, Stream[T])]] = {
def go(s: Stream[Return]): Pull[Option[(T, Stream[T])]] = s.pull.flatMap {
case Some((head, tail)) => f(head).pull.flatMap {
case Some((fh, ft)) => Pull.pure(Some(fh -> ft.append(tail.flatMap(f))))
case None => go(tail)
}
case None => Pull.pure(None)
}
go(stream)
}
}
def flatMap[T](f: Return => Stream[T]): Stream[T] = new Stream(task.map { iterator =>
iterator.flatMap(r => f(r).task.sync())
})

/**
* Transforms the values in the stream using the given function that returns a task.
*
* @param f the function to transform the values into tasks
* @tparam T the type of the values in the tasks
* @return a new stream with the transformed values
*/
def evalMap[T](f: Return => Task[T]): Stream[T] = new Stream(task.map { iterator =>
iterator.map(f).map(_.sync())
})

/**
* Appends another stream to this stream.
Expand All @@ -56,64 +62,34 @@ trait Stream[Return] { stream =>
* @tparam T the type of the values in the appended stream
* @return a new stream with the values from both streams
*/
def append[T >: Return](that: => Stream[T]): Stream[T] = new Stream[T] {
def pull: Pull[Option[(T, Stream[T])]] = stream.pull.flatMap {
case Some((head, tail)) => Pull.pure(Some(head -> tail.append(that)))
case None => that.pull
}
}
def append[T >: Return](that: => Stream[T]): Stream[T] = new Stream(Task {
val iterator1 = task.sync()
val iterator2 = that.task.sync()
iterator1 ++ iterator2
})

/**
* Takes values from the stream while the given predicate holds.
* Converts the stream to a list.
*
* @param p the predicate to test the values
* @return a new stream with the values that satisfy the predicate
* @return a task that produces a list of the values in the stream
*/
def takeWhile(p: Return => Boolean): Stream[Return] = new Stream[Return] {
def pull: Pull[Option[(Return, Stream[Return])]] = stream.pull.flatMap {
case Some((head, tail)) =>
if (p(head)) Pull.pure(Some(head -> tail.takeWhile(p)))
else Pull.pure(None)
case None => Pull.pure(None)
}
}
def toList: Task[List[Return]] = task.map(_.toList)

/**
* Filters the values in the stream using the given predicate.
* Counts the number of elements in the stream and fully evaluates it.
*
* @param p the predicate to test the values
* @return a new stream with the values that satisfy the predicate
* @return a `Task[Int]` representing the total number of entries evaluated
*/
def filter(p: Return => Boolean): Stream[Return] = new Stream[Return] {
def pull: Pull[Option[(Return, Stream[Return])]] = {
def go(s: Stream[Return]): Pull[Option[(Return, Stream[Return])]] = s.pull.flatMap {
case Some((head, tail)) =>
if (p(head)) Pull.pure(Some(head -> new Stream[Return] {
def pull: Pull[Option[(Return, Stream[Return])]] = go(tail)
}))
else go(tail)
case None => Pull.pure(None)
}
go(stream)
}
}
def count: Task[Int] = task.map(_.size)
}

/*trait Stream[Return] { stream =>
/**
* Transforms the values in the stream using the given function that returns a task.
* Produces the next value in the stream, if any.
*
* @param f the function to transform the values into tasks
* @tparam T the type of the values in the tasks
* @return a new stream with the transformed values
* @return a `Pull` that produces an optional pair of the next value and the remaining stream
*/
def evalMap[T](f: Return => Task[T]): Stream[T] = new Stream[T] {
def pull: Pull[Option[(T, Stream[T])]] = stream.pull.flatMap {
case Some((head, tail)) =>
Pull.suspend {
f(head).map(result => Option(result -> tail.evalMap(f))).toPull
}
case None => Pull.pure(None)
}
}
def pull: Pull[Option[(Return, Stream[Return])]]
/**
* Transforms the values in the stream using the given function that returns a task, with a maximum concurrency.
Expand Down Expand Up @@ -146,37 +122,7 @@ trait Stream[Return] { stream =>
}
}
}

/**
* Converts the stream to a list.
*
* @return a task that produces a list of the values in the stream
*/
def toList: Task[List[Return]] = {
def loop(stream: Stream[Return], acc: List[Return]): Pull[List[Return]] = {
stream.pull.flatMap {
case Some((head, tail)) => Pull.suspend(loop(tail, acc :+ head))
case None => Pull.pure(acc)
}
}
loop(this, List.empty).toTask
}

/**
* Counts the number of elements in the stream and fully evaluates it.
*
* @return a `Task[Int]` representing the total number of entries evaluated
*/
def count: Task[Int] = {
def loop(stream: Stream[Return], acc: Int): Pull[Int] = {
stream.pull.flatMap {
case Some((_, tail)) => Pull.suspend(loop(tail, acc + 1))
case None => Pull.pure(acc)
}
}
loop(this, 0).toTask
}
}
}*/

object Stream {
/**
Expand All @@ -186,19 +132,15 @@ object Stream {
* @tparam Return the type of the value
* @return a new stream that emits the value
*/
def emit[Return](value: Return): Stream[Return] = new Stream[Return] {
def pull: Pull[Option[(Return, Stream[Return])]] = Pull.pure(Some(value -> empty))
}
def emit[Return](value: Return): Stream[Return] = new Stream[Return](Task.pure(List(value).iterator))

/**
* Creates an empty stream.
*
* @tparam Return the type of the values in the stream
* @return a new empty stream
*/
def empty[Return]: Stream[Return] = new Stream[Return] {
def pull: Pull[Option[(Return, Stream[Return])]] = Pull.pure(None)
}
def empty[Return]: Stream[Return] = new Stream[Return](Task.pure(Nil.iterator))

/**
* Creates a stream from a list of values.
Expand Down
Loading

0 comments on commit ce59072

Please sign in to comment.