-
Notifications
You must be signed in to change notification settings - Fork 607
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Broadcast #2312
Broadcast #2312
Conversation
This reverts commit dd9ed60.
Note: Latch gets completed correctly
At the moment there is deadlock if one of the pipes interrupts, probably the bug is in Topic or even Queue
Stream | ||
.resource(topic.subscribeAwait(1)) | ||
.flatMap { sub => | ||
// crucial that awaiting on the latch is not passed to | ||
// the pipe, so that the pipe cannot interrupt it and alter | ||
// the latch count | ||
Stream.exec(latch.release >> latch.await) ++ | ||
sub.unNoneTerminate.flatMap(Stream.chunk).through(pipe) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we extract this lambda block (the one used int the map { pipe =>
) as a function def pump(pipe: Pipe[F2, O, O2])
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm factored out the unNoneTerminate...
stuff, but I want to keep the waiting logic all together, so it's easy to understand how the parJoin
, the latch
and the Resource
cooperate
Stream.eval(latch.await) ++ | ||
chunks.noneTerminate.through(topic.publish) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we extract this as a value backgroundPublish
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've factored out the chunks.noneTerminate
stuff, I want to keep the await
logic where it is
q.tryTake.flatMap { | ||
case None => F.unit | ||
case Some(_) => drainQueue | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps another traverse?
q.tryTake.flatMap { | |
case None => F.unit | |
case Some(_) => drainQueue | |
} | |
q.tryTake.flatMap(_.traverse_(_ => drainQueue)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Used iterateUntil
, which avoid redoing the get
as well
This PR revisits the broadcasting abstractions in fs2:
.broadcast
is removed. In its raw form, this method is almost impossible to use safely, and depends on subtle scoping details (e.g.chunkN
works,foldMap(List(_))
) doesn't, and at the same time it's not flexible enough to encode fully dynamic subscribers likeTopic
is.Topic
is recommended for cases where you might have used.broadcast
. If you managed to get that to work, that is.broadcastThrough
, which is safe and easy to use. It has been reimplemented using Topic.broadcast*
overloads have even removedBroadcast
has been removed.