
Commit 0e9d92d

Stream - Extend documentation of broadcastThrough.
diesalbla committed Sep 19, 2021
1 parent eaba366 commit 0e9d92d
Showing 1 changed file with 22 additions and 12 deletions.
core/shared/src/main/scala/fs2/Stream.scala (34 changes: 22 additions & 12 deletions)
@@ -203,22 +203,32 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
   ): Stream[F2, Either[Throwable, O]] =
     attempt ++ delays.flatMap(delay => Stream.sleep_(delay) ++ attempt)

-  /** Broadcasts every value of the stream through the pipes provided
-    * as arguments.
+  /** Broadcasts the values from this stream through all the given pipes,
+    * which process those values in parallel, and coordinates their progress.
+    * Each pipe sees every value emitted from `this` stream (the source).
     *
-    * Each pipe can have a different implementation if required, and
-    * they are all guaranteed to see every `O` pulled from the source
-    * stream.
-    *
-    * The pipes are all run concurrently with each other, but note
-    * that elements are pulled from the source as chunks, and the next
-    * chunk is pulled only when all pipes are done with processing the
-    * current chunk, which prevents faster pipes from getting too far ahead.
+    * Input values are pulled from `this` stream and passed to each pipe,
+    * _one chunk at a time_. All pipes are fed the chunk at the same time,
+    * and they run concurrently while processing the chunk.
+    * However, the next source chunk is not pulled, nor given to the pipes,
+    * until all pipes are done processing the current chunk.
+    * This prevents faster pipes from getting too far ahead.
     *
     * In other words, this behaviour slows down processing of incoming
-    * chunks by faster pipes until the slower ones have caught up. If
-    * this is not desired, consider using the `prefetch` and
+    * chunks by faster pipes until the slower ones have caught up.
+    * If this is not desired, consider using the `prefetch` and
     * `prefetchN` combinators on the slow pipes.
+    *
+    * Any error raised from the input stream, or from any pipe,
+    * stops the pulling from `this` stream and from every pipe,
+    * and the error is raised by the resulting stream.
+    *
+    * **Output**: the resulting stream collects and emits the output chunks
+    * of all pipes, mixed in an undetermined way. The only guarantees are that:
+    * 1. each output chunk was emitted by one pipe, exactly once;
+    * 2. chunks from each pipe appear in the mixed output in the
+    *    same order they came out of that pipe, without gaps, but
+    *    may be interleaved with chunks from other pipes.
     */
   def broadcastThrough[F2[x] >: F[x]: Concurrent, O2](
       pipes: Pipe[F2, O, O2]*
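To make the documented behaviour concrete, here is a small usage sketch (not part of the commit). It assumes fs2 3.x with cats-effect 3; the object name and pipe names are illustrative only. The slow pipe follows the scaladoc's suggestion of applying `prefetchN`, so its own buffer, rather than the broadcast, absorbs the speed difference between the pipes:

import scala.concurrent.duration._
import cats.effect.{IO, IOApp}
import fs2.{Pipe, Stream}

object BroadcastExample extends IOApp.Simple {

  // A fast pipe: labels each element without doing any real work.
  val fastPipe: Pipe[IO, Int, String] =
    _.map(i => s"fast: $i")

  // A slow pipe: simulates per-element work. prefetchN(4) pulls and buffers
  // up to four chunks ahead of the slow evalMap, so this pipe does not hold
  // back the fast pipe on every single chunk.
  val slowPipe: Pipe[IO, Int, String] =
    _.prefetchN(4).evalMap(i => IO.sleep(10.millis).as(s"slow: $i"))

  def run: IO[Unit] =
    Stream
      .range(0, 20)
      .covary[IO]
      .broadcastThrough(fastPipe, slowPipe) // both pipes see every element
      .foreach(IO.println)                  // interleaving of outputs is not deterministic
      .compile
      .drain
}

Both pipes receive every element of the source range; the printed `fast:` and `slow:` lines may interleave in any order between pipes, but the lines coming from a single pipe appear in that pipe's own emission order, as the scaladoc above guarantees.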
