-
Notifications
You must be signed in to change notification settings - Fork 607
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Broadcast #2312
Broadcast #2312
Changes from 32 commits
6e5e994
f41270b
5ab0586
d9e7de9
f4db84c
a609807
aeba528
46bb50f
34d52c3
ea61eff
4b6c3a8
b8c21a4
48ece04
4ba81fd
8aa5efb
3b55a0e
4d3484a
050a5e5
1910964
1fbac85
dc3c966
dbc7c26
b06a3d3
08fe32a
77c9fab
3108a9e
b8707eb
e86ad5d
2dee97f
3d9a7ac
bbf6e1c
1d1b250
b7646ad
3f15c78
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,7 +30,7 @@ import cats.{Eval => _, _} | |
import cats.data.Ior | ||
import cats.effect.SyncIO | ||
import cats.effect.kernel._ | ||
import cats.effect.std.{Queue, Semaphore} | ||
import cats.effect.std.{CountDownLatch, Queue, Semaphore} | ||
import cats.effect.kernel.implicits._ | ||
import cats.implicits.{catsSyntaxEither => _, _} | ||
|
||
|
@@ -202,62 +202,52 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, | |
): Stream[F2, Either[Throwable, O]] = | ||
attempt ++ delays.flatMap(delay => Stream.sleep_(delay) ++ attempt) | ||
|
||
/** Returns a stream of streams where each inner stream sees all elements of the | ||
* source stream (after the inner stream has started evaluation). | ||
* For example, `src.broadcast.take(2)` results in two | ||
* inner streams, each of which see every element of the source. | ||
/** Broadcasts every value of the stream through the pipes provided | ||
* as arguments. | ||
* | ||
* Alias for `through(Broadcast(1))`. | ||
*/ | ||
def broadcast[F2[x] >: F[x]: Concurrent]: Stream[F2, Stream[F2, O]] = | ||
through(Broadcast(1)) | ||
|
||
/** Like [[broadcast]] but instead of providing a stream of sources, runs each pipe. | ||
* | ||
* The pipes are run concurrently with each other. Hence, the parallelism factor is equal | ||
* to the number of pipes. | ||
* Each pipe may have a different implementation, if required; for example one pipe may | ||
* process elements while another may send elements for processing to another machine. | ||
* Each pipe can have a different implementation if required, and | ||
* they are all guaranteed to see every `O` pulled from the source | ||
* stream. | ||
* | ||
* Each pipe is guaranteed to see all `O` pulled from the source stream, unlike `broadcast`, | ||
* where workers see only the elements after the start of each worker evaluation. | ||
* | ||
* Note: the resulting stream will not emit values, even if the pipes do. | ||
* If you need to emit `Unit` values, consider using `broadcastThrough`. | ||
* The pipes are all run concurrently with each other, but note | ||
* that elements are pulled from the source as chunks, and the next | ||
* chunk is pulled only when all pipes are done with processing the | ||
* current chunk, which prevents faster pipes from getting too far ahead. | ||
* | ||
* Note: Elements are pulled as chunks from the source and the next chunk is pulled when all | ||
* workers are done with processing the current chunk. This behaviour may slow down processing | ||
* of incoming chunks by faster workers. | ||
* If this is not desired, consider using the `prefetch` and `prefetchN` combinators on workers | ||
* to compensate for slower workers. | ||
* | ||
* @param pipes Pipes that will concurrently process the work. | ||
*/ | ||
def broadcastTo[F2[x] >: F[x]: Concurrent]( | ||
pipes: Pipe[F2, O, Nothing]* | ||
): Stream[F2, INothing] = | ||
this.through(Broadcast.through(pipes: _*)) | ||
|
||
/** Variant of `broadcastTo` that broadcasts to `maxConcurrent` instances of a single pipe. | ||
*/ | ||
def broadcastTo[F2[x] >: F[x]: Concurrent]( | ||
maxConcurrent: Int | ||
)(pipe: Pipe[F2, O, Nothing]): Stream[F2, INothing] = | ||
this.broadcastTo[F2](List.fill(maxConcurrent)(pipe): _*) | ||
|
||
/** Alias for `through(Broadcast.through(pipes))`. | ||
* In other words, this behaviour slows down processing of incoming | ||
* chunks by faster pipes until the slower ones have caught up. If | ||
* this is not desired, consider using the `prefetch` and | ||
* `prefetchN` combinators on the slow pipes. | ||
*/ | ||
def broadcastThrough[F2[x] >: F[x]: Concurrent, O2]( | ||
pipes: Pipe[F2, O, O2]* | ||
): Stream[F2, O2] = | ||
through(Broadcast.through(pipes: _*)) | ||
|
||
/** Variant of `broadcastTo` that broadcasts to `maxConcurrent` instances of the supplied pipe. | ||
*/ | ||
def broadcastThrough[F2[x] >: F[x]: Concurrent, O2]( | ||
maxConcurrent: Int | ||
)(pipe: Pipe[F2, O, O2]): Stream[F2, O2] = | ||
this.broadcastThrough[F2, O2](List.fill(maxConcurrent)(pipe): _*) | ||
Stream | ||
.eval { | ||
( | ||
CountDownLatch[F2](pipes.length), | ||
fs2.concurrent.Topic[F2, Option[Chunk[O]]] | ||
).tupled | ||
} | ||
.flatMap { case (latch, topic) => | ||
Stream(pipes: _*) | ||
.map { pipe => | ||
Stream | ||
.resource(topic.subscribeAwait(1)) | ||
.flatMap { sub => | ||
// crucial that awaiting on the latch is not passed to | ||
// the pipe, so that the pipe cannot interrupt it and alter | ||
// the latch count | ||
Stream.exec(latch.release >> latch.await) ++ | ||
sub.unNoneTerminate.flatMap(Stream.chunk).through(pipe) | ||
} | ||
} | ||
.parJoinUnbounded | ||
.concurrently { | ||
Stream.eval(latch.await) ++ | ||
chunks.noneTerminate.through(topic.publish) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we extract this as a value There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've factored out the |
||
} | ||
} | ||
|
||
/** Behaves like the identity function, but requests `n` elements at a time from the input. | ||
* | ||
|
This file was deleted.
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -108,6 +108,7 @@ object Topic { | |||||||||||
.product(SignallingRef[F, Int](0)) | ||||||||||||
.map { case (state, subscriberCount) => | ||||||||||||
new Topic[F, A] { | ||||||||||||
|
||||||||||||
def publish1(a: A): F[Unit] = | ||||||||||||
state.get.flatMap { case (subs, _) => | ||||||||||||
subs.foldLeft(F.unit) { case (op, (_, q)) => | ||||||||||||
|
@@ -124,9 +125,21 @@ object Topic { | |||||||||||
} <* subscriberCount.update(_ + 1) | ||||||||||||
|
||||||||||||
def unsubscribe(id: Long) = | ||||||||||||
state.update { case (subs, nextId) => | ||||||||||||
(subs - id, nextId) | ||||||||||||
} >> subscriberCount.update(_ - 1) | ||||||||||||
state.modify { case (subs, nextId) => | ||||||||||||
// _After_ we remove the bounded queue for this | ||||||||||||
// subscriber, we need to drain it to unblock the | ||||||||||||
// publish loop which might have already enqueued | ||||||||||||
// something. | ||||||||||||
def drainQueue: F[Unit] = | ||||||||||||
subs.get(id).traverse_ { q => | ||||||||||||
q.tryTake.flatMap { | ||||||||||||
case None => F.unit | ||||||||||||
case Some(_) => drainQueue | ||||||||||||
} | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps another traverse?
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Used |
||||||||||||
} | ||||||||||||
|
||||||||||||
(subs - id, nextId) -> drainQueue | ||||||||||||
}.flatten >> subscriberCount.update(_ - 1) | ||||||||||||
|
||||||||||||
Resource | ||||||||||||
.make(subscribe)(unsubscribe) | ||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we extract this lambda block (the one used in the
map { pipe =>
) as a functiondef pump(pipe: Pipe[F2, O, O2])
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've factored out the
unNoneTerminate...
stuff, but I want to keep the waiting logic all together, so it's easy to understand how theparJoin
, thelatch
and theResource
cooperate