Broadcast #2312

Merged
merged 34 commits into from
Mar 2, 2021
6e5e994
Example of scoping subscriptions properly
SystemFw Feb 21, 2021
f41270b
Add more concise alternative for subscriber resource pattern
SystemFw Feb 22, 2021
5ab0586
Scoping only solution to awaiting subscribers invalidated
SystemFw Feb 22, 2021
d9e7de9
Add solution with better characteristics wrt to internal interruption
SystemFw Feb 23, 2021
f4db84c
Move to Stream for the subscription logic
SystemFw Feb 23, 2021
a609807
A more complex stream completely breaks scoping
SystemFw Feb 23, 2021
aeba528
Interruption case is covered by the proposed solution
SystemFw Feb 23, 2021
46bb50f
Add Barrier based solution to broadcasting problem
SystemFw Feb 26, 2021
34d52c3
Isolate bug with Stream.resource
SystemFw Feb 26, 2021
ea61eff
Add provisional implementation of broadcastThrough with Topic
SystemFw Feb 26, 2021
4b6c3a8
Revert "Add provisional implementation of broadcastThrough with Topic"
SystemFw Feb 26, 2021
b8c21a4
Organise example file
SystemFw Feb 26, 2021
48ece04
Working prototype of broadcastThrough with Topic
SystemFw Feb 27, 2021
4ba81fd
Sketch of Scope bug
SystemFw Feb 27, 2021
8aa5efb
Easier encoding of interruptible resource trick
SystemFw Feb 27, 2021
3b55a0e
Remove broadcastTo overload that just replicates data
SystemFw Mar 1, 2021
4d3484a
Remove the broadcastThrough overload that just replicates data
SystemFw Mar 1, 2021
050a5e5
Remove broadcastTo, it's now redundant given the shape of Sinks
SystemFw Mar 1, 2021
1910964
Rework scaladoc of broadcastThrough
SystemFw Mar 1, 2021
1fbac85
Remove some noise from experimentation playground
SystemFw Mar 2, 2021
dc3c966
Add example of deadlock upon pipe cancelation
SystemFw Mar 2, 2021
dbc7c26
Add new implementation of broadcastThrough
SystemFw Mar 2, 2021
b06a3d3
Remove evidence of bug with Stream.resource, already handled
SystemFw Mar 2, 2021
08fe32a
Add example of interruption bug with Topic
SystemFw Mar 2, 2021
77c9fab
Fix Topic bug that leads to deadlock after subscriber unpublishes
SystemFw Mar 2, 2021
3108a9e
broadcastThrough is now safe when a pipe interrupts
SystemFw Mar 2, 2021
b8707eb
Remove Pipe.join
SystemFw Mar 2, 2021
e86ad5d
remove broadcast
SystemFw Mar 2, 2021
2dee97f
Remove Broadcast
SystemFw Mar 2, 2021
3d9a7ac
Delete playground
SystemFw Mar 2, 2021
bbf6e1c
Formatting
SystemFw Mar 2, 2021
1d1b250
Fix compilation on 2.12
SystemFw Mar 2, 2021
b7646ad
Factor out some definitions in broadcast
SystemFw Mar 2, 2021
3f15c78
Cleanup of drained queue in topic
SystemFw Mar 2, 2021
36 changes: 0 additions & 36 deletions core/shared/src/main/scala/fs2/Pipe.scala

This file was deleted.

91 changes: 41 additions & 50 deletions core/shared/src/main/scala/fs2/Stream.scala
@@ -30,7 +30,7 @@ import cats.{Eval => _, _}
import cats.data.Ior
import cats.effect.SyncIO
import cats.effect.kernel._
import cats.effect.std.{Queue, Semaphore}
import cats.effect.std.{CountDownLatch, Queue, Semaphore}
import cats.effect.kernel.implicits._
import cats.implicits.{catsSyntaxEither => _, _}

@@ -202,62 +202,53 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
): Stream[F2, Either[Throwable, O]] =
attempt ++ delays.flatMap(delay => Stream.sleep_(delay) ++ attempt)

/** Returns a stream of streams where each inner stream sees all elements of the
* source stream (after the inner stream has started evaluation).
* For example, `src.broadcast.take(2)` results in two
* inner streams, each of which see every element of the source.
/** Broadcasts every value of the stream through the pipes provided
* as arguments.
*
* Alias for `through(Broadcast(1))`.
*/
def broadcast[F2[x] >: F[x]: Concurrent]: Stream[F2, Stream[F2, O]] =
through(Broadcast(1))

/** Like [[broadcast]] but instead of providing a stream of sources, runs each pipe.
*
* The pipes are run concurrently with each other. Hence, the parallelism factor is equal
* to the number of pipes.
* Each pipe may have a different implementation, if required; for example one pipe may
* process elements while another may send elements for processing to another machine.
* Each pipe can have a different implementation if required, and
* they are all guaranteed to see every `O` pulled from the source
* stream.
*
* Each pipe is guaranteed to see all `O` pulled from the source stream, unlike `broadcast`,
* where workers see only the elements after the start of each worker evaluation.
*
* Note: the resulting stream will not emit values, even if the pipes do.
* If you need to emit `Unit` values, consider using `broadcastThrough`.
* The pipes are all run concurrently with each other, but note
* that elements are pulled from the source as chunks, and the next
* chunk is pulled only when all pipes are done with processing the
* current chunk, which prevents faster pipes from getting too far ahead.
*
* Note: Elements are pulled as chunks from the source and the next chunk is pulled when all
* workers are done with processing the current chunk. This behaviour may slow down processing
* of incoming chunks by faster workers.
* If this is not desired, consider using the `prefetch` and `prefetchN` combinators on workers
* to compensate for slower workers.
*
* @param pipes Pipes that will concurrently process the work.
*/
def broadcastTo[F2[x] >: F[x]: Concurrent](
pipes: Pipe[F2, O, Nothing]*
): Stream[F2, INothing] =
this.through(Broadcast.through(pipes: _*))

/** Variant of `broadcastTo` that broadcasts to `maxConcurrent` instances of a single pipe.
*/
def broadcastTo[F2[x] >: F[x]: Concurrent](
maxConcurrent: Int
)(pipe: Pipe[F2, O, Nothing]): Stream[F2, INothing] =
this.broadcastTo[F2](List.fill(maxConcurrent)(pipe): _*)

/** Alias for `through(Broadcast.through(pipes))`.
* In other words, this behaviour slows down processing of incoming
* chunks by faster pipes until the slower ones have caught up. If
* this is not desired, consider using the `prefetch` and
* `prefetchN` combinators on the slow pipes.
*/
def broadcastThrough[F2[x] >: F[x]: Concurrent, O2](
pipes: Pipe[F2, O, O2]*
): Stream[F2, O2] =
through(Broadcast.through(pipes: _*))

/** Variant of `broadcastTo` that broadcasts to `maxConcurrent` instances of the supplied pipe.
*/
def broadcastThrough[F2[x] >: F[x]: Concurrent, O2](
maxConcurrent: Int
)(pipe: Pipe[F2, O, O2]): Stream[F2, O2] =
this.broadcastThrough[F2, O2](List.fill(maxConcurrent)(pipe): _*)
Stream
.eval {
(
CountDownLatch[F2](pipes.length),
fs2.concurrent.Topic[F2, Option[Chunk[O]]]
).tupled
}
.flatMap { case (latch, topic) =>
def produce = chunks.noneTerminate.through(topic.publish)

def consume(pipe: Pipe[F2, O, O2]): Pipe[F2, Option[Chunk[O]], O2] =
_.unNoneTerminate.flatMap(Stream.chunk).through(pipe)

Stream(pipes: _*)
.map { pipe =>
Stream
.resource(topic.subscribeAwait(1))
.flatMap { sub =>
// crucial that awaiting on the latch is not passed to
// the pipe, so that the pipe cannot interrupt it and alter
// the latch count
Stream.exec(latch.release >> latch.await) ++ sub.through(consume(pipe))
}
}
.parJoinUnbounded
.concurrently(Stream.eval(latch.await) ++ produce)
}
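The new `broadcastThrough` guarantees that every pipe sees every element pulled from the source. A minimal usage sketch (the pipe names `log` and `double` are illustrative, not part of the PR):

```scala
import cats.effect.{IO, IOApp}
import fs2.{Pipe, Stream}

object BroadcastExample extends IOApp.Simple {
  val source: Stream[IO, Int] = Stream.range(0, 5)

  // Two pipes with different behaviour; both receive 0,1,2,3,4.
  val log: Pipe[IO, Int, String]    = _.map(i => s"log: $i")
  val double: Pipe[IO, Int, String] = _.map(i => s"double: ${i * 2}")

  def run: IO[Unit] =
    source
      .broadcastThrough(log, double)
      .evalMap(IO.println)
      .compile
      .drain
}
```

Note that, per the scaladoc above, the pipes advance in chunk lockstep: a slow pipe holds back the others unless it is wrapped in `prefetch`/`prefetchN`.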

/** Behaves like the identity function, but requests `n` elements at a time from the input.
*
186 changes: 0 additions & 186 deletions core/shared/src/main/scala/fs2/concurrent/Broadcast.scala

This file was deleted.

16 changes: 13 additions & 3 deletions core/shared/src/main/scala/fs2/concurrent/Topic.scala
@@ -108,6 +108,7 @@ object Topic {
.product(SignallingRef[F, Int](0))
.map { case (state, subscriberCount) =>
new Topic[F, A] {

def publish1(a: A): F[Unit] =
state.get.flatMap { case (subs, _) =>
subs.foldLeft(F.unit) { case (op, (_, q)) =>
@@ -124,9 +125,18 @@
} <* subscriberCount.update(_ + 1)

def unsubscribe(id: Long) =
state.update { case (subs, nextId) =>
(subs - id, nextId)
} >> subscriberCount.update(_ - 1)
state.modify { case (subs, nextId) =>
// _After_ we remove the bounded queue for this
// subscriber, we need to drain it to unblock the
// publish loop, which might have already enqueued
// something.
def drainQueue: F[Unit] =
subs.get(id).traverse_ { q =>
q.tryTake.iterateUntil(_.isEmpty)
}

(subs - id, nextId) -> drainQueue
}.flatten >> subscriberCount.update(_ - 1)
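The drain step exists because `publish1` may already be blocked on `offer` to this subscriber's bounded queue when the subscriber unsubscribes; removing the queue from the state map alone would leave that publisher stuck forever. A standalone sketch of the same pattern using a cats-effect `Queue` (the `drain` helper is illustrative):

```scala
import cats.effect.IO
import cats.effect.std.Queue
import cats.syntax.all._

// Repeatedly tryTake until the queue reports empty, so that a
// publisher blocked on `offer` to the bounded queue can complete
// its pending enqueue and move on.
def drain[A](q: Queue[IO, A]): IO[Unit] =
  q.tryTake.iterateUntil(_.isEmpty).void
```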

Resource
.make(subscribe)(unsubscribe)