From 0e9d92d49eebf23b4e2bff40fd2dbbd1bb506688 Mon Sep 17 00:00:00 2001
From: "Diego E. Alonso Blas"
Date: Sun, 19 Sep 2021 20:52:52 +0100
Subject: [PATCH] Stream - Extend documentation of broadcastThrough.

---
 core/shared/src/main/scala/fs2/Stream.scala | 34 +++++++++++++--------
 1 file changed, 22 insertions(+), 12 deletions(-)

diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala
index 4a4f2e896e..cca81cbe0c 100644
--- a/core/shared/src/main/scala/fs2/Stream.scala
+++ b/core/shared/src/main/scala/fs2/Stream.scala
@@ -203,22 +203,32 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
   ): Stream[F2, Either[Throwable, O]] =
     attempt ++ delays.flatMap(delay => Stream.sleep_(delay) ++ attempt)
 
-  /** Broadcasts every value of the stream through the pipes provided
-    * as arguments.
+  /** Broadcasts the values from this stream through all the given pipes,
+    * which process those values in parallel, and coordinates their progress.
+    * Each pipe sees every value emitted from `this` stream (the source).
     *
-    * Each pipe can have a different implementation if required, and
-    * they are all guaranteed to see every `O` pulled from the source
-    * stream.
-    *
-    * The pipes are all run concurrently with each other, but note
-    * that elements are pulled from the source as chunks, and the next
-    * chunk is pulled only when all pipes are done with processing the
-    * current chunk, which prevents faster pipes from getting too far ahead.
+    * Input values are pulled from `this` stream and passed to each pipe,
+    * _one chunk at a time_. All pipes are fed the chunk at the same time,
+    * and they run concurrently while processing it.
+    * However, the next source chunk is not pulled, nor given to the pipes,
+    * until all pipes are done processing the current chunk.
+    * This prevents faster pipes from getting too far ahead.
     *
     * In other words, this behaviour slows down processing of incoming
-    * chunks by faster pipes until the slower ones have caught up. If
-    * this is not desired, consider using the `prefetch` and
+    * chunks by faster pipes until the slower ones have caught up.
+    * If this is not desired, consider using the `prefetch` and
     * `prefetchN` combinators on the slow pipes.
+    *
+    * Any error raised from the input stream, or from any pipe,
+    * will stop the pulling from `this` stream and from every pipe,
+    * and the error will be raised by the resulting stream.
+    *
+    * **Output**: the resulting stream collects and emits the output chunks
+    * of all pipes, mixed in an undetermined way. The only guarantees are:
+    *   1. each output chunk was emitted by exactly one pipe, exactly once;
+    *   2. chunks from each pipe come out of the mixed output in the
+    *      same order they came out of that pipe, with none skipped, but
+    *      possibly interleaved with chunks from other pipes.
     */
   def broadcastThrough[F2[x] >: F[x]: Concurrent, O2](
       pipes: Pipe[F2, O, O2]*
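
A minimal usage sketch of the behaviour this patch documents may help reviewers. The object name and both pipes below are made up for illustration; it assumes fs2 3.x with cats-effect 3 (whose `Concurrent` constraint the signature above requires):

```scala
import cats.effect.{IO, IOApp}
import fs2.{Pipe, Stream}

object BroadcastExample extends IOApp.Simple {
  // Two pipes with different implementations; each sees every source value.
  val doubler: Pipe[IO, Int, Int] = _.map(_ * 2)
  val negator: Pipe[IO, Int, Int] = _.map(-_)

  val run: IO[Unit] =
    Stream
      .emits(List(1, 2, 3))
      .covary[IO]
      .broadcastThrough(doubler, negator) // both pipes receive 1, 2, 3
      .compile
      .toList
      // The interleaving of the two pipes' outputs is undetermined,
      // so sort before printing to get a stable view of the results.
      .flatMap(out => IO.println(out.sorted)) // List(-3, -2, -1, 2, 4, 6)
}
```

Because the merged output order is undetermined, only the multiset of results (and the per-pipe relative order) is stable here, which is why the example sorts before printing.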