From 38fe6a52d9c7b02865da382d2aaaa634e07e8f9c Mon Sep 17 00:00:00 2001 From: Matt Hicks Date: Wed, 18 Dec 2024 15:06:55 -0600 Subject: [PATCH] Updated parallel stream support to support ordered flag --- .../src/main/scala/rapid/ParallelStream.scala | 35 +++++++++---------- core/shared/src/main/scala/rapid/Stream.scala | 6 ++-- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/core/shared/src/main/scala/rapid/ParallelStream.scala b/core/shared/src/main/scala/rapid/ParallelStream.scala index 323f2ba..1f0cc43 100644 --- a/core/shared/src/main/scala/rapid/ParallelStream.scala +++ b/core/shared/src/main/scala/rapid/ParallelStream.scala @@ -1,44 +1,43 @@ package rapid -import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.atomic.AtomicInteger -import scala.annotation.tailrec import scala.collection.mutable.ListBuffer -import scala.concurrent.duration.DurationInt case class ParallelStream[T, R](stream: Stream[T], f: T => Task[R], maxThreads: Int, - maxBuffer: Int) { + maxBuffer: Int, + ordered: Boolean) { def drain: Task[Unit] = Task.unit.flatMap { _ => val completable = Task.completable[Unit] - ParallelUnorderedStreamProcessor( - stream = this, - handle = (_: R) => (), - complete = (_: Int) => completable.success(()) - ) + compile(_ => (), _ => completable.success(())) completable } def count: Task[Int] = Task.unit.flatMap { _ => val completable = Task.completable[Int] - ParallelUnorderedStreamProcessor( - stream = this, - handle = (_: R) => (), - complete = completable.success - ) + compile(_ => (), completable.success) completable } def toList: Task[List[R]] = Task.unit.flatMap { _ => val list = ListBuffer.empty[R] val completable = Task.completable[List[R]] + compile(list.addOne, _ => completable.success(list.toList)) + completable + } + + protected def compile(handle: R => Unit, complete: Int => Unit): Unit = if (ordered) { + ParallelStreamProcessor( + stream = this, + handle = handle, + complete = complete + ) + } else { ParallelUnorderedStreamProcessor( stream = this, - handle = list.addOne, - complete = (_: Int) => completable.success(list.toList) + handle = handle, + complete = complete ) - completable } } diff --git a/core/shared/src/main/scala/rapid/Stream.scala b/core/shared/src/main/scala/rapid/Stream.scala index bef1f80..48ada26 100644 --- a/core/shared/src/main/scala/rapid/Stream.scala +++ b/core/shared/src/main/scala/rapid/Stream.scala @@ -115,12 +115,14 @@ class Stream[Return](private val task: Task[Iterator[Return]]) extends AnyVal { def count: Task[Int] = task.map(_.size) def par[R](maxThreads: Int = ParallelStream.DefaultMaxThreads, - maxBuffer: Int = ParallelStream.DefaultMaxBuffer) + maxBuffer: Int = ParallelStream.DefaultMaxBuffer, + ordered: Boolean = false) (f: Return => Task[R]): ParallelStream[Return, R] = ParallelStream( stream = this, f = f, maxThreads = maxThreads, - maxBuffer = maxBuffer + maxBuffer = maxBuffer, + ordered = ordered ) }