New topic #2252
Conversation
```scala
* Topic has built-in back-pressure support implemented as a maximum
* bound (`maxQueued`) that a subscriber is allowed to enqueue.
```
Suggested change:

```diff
- * Topic has built-in back-pressure support implemented as a maximum
- * bound (`maxQueued`) that a subscriber is allowed to enqueue.
+ * Topic has built-in back-pressure support implemented as a maximum
+ * number (`maxQueued`) of requests that a subscriber is allowed to enqueue.
```
```scala
*
* Additionally, the subscriber can choose to terminate whenever the number of
* enqueued elements exceeds a certain size, using `subscribeSize`.
* Once that bound is hit, publishing may semantically block until
```
Suggested change:

```diff
- * Once that bound is hit, publishing may semantically block until
+ * Once that bound is reached, any publishing action will be semantically blocked,
```
```scala
subs.foldLeft(F.unit) { case (op, (_, q)) =>
  op >> q.offer(a)
}
```
I may be approaching this from afar, but is this a case of `traverse_`?
It is `traverse_`, but there is no `Traverse[LongMap]`.
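For reference, a `traverse_`-shaped operation can still be written directly as a fold over `LongMap`, which is what the code above does. Here is a minimal standalone sketch, using `Option` as a stand-in effect; the name `traverseUnit_` is illustrative and not part of fs2 or cats:

```scala
import scala.collection.immutable.LongMap

// traverse_-like combinator written as a foldLeft, since cats provides
// no Traverse[LongMap] instance. Option stands in for the effect F here.
def traverseUnit_[A](m: LongMap[A])(f: A => Option[Unit]): Option[Unit] =
  m.foldLeft(Option(())) { case (acc, (_, a)) => acc.flatMap(_ => f(a)) }

val subs = LongMap(1L -> "q1", 2L -> "q2")

// Succeeds only if f succeeds for every subscriber queue.
val allOk  = traverseUnit_(subs)(_ => Some(())) // Some(())
val failed = traverseUnit_(subs)(_ => None)     // None
```

The real code folds with `>>` instead of `flatMap`, but the shape is the same: sequence the effect once per map entry, discarding each result.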
```scala
* If any of the subscribers is over the `maxQueued` limit, this
* will wait to complete until that subscriber processes enough of
* its elements such that `a` is enqueued.
*
```
Suggested change:

```diff
- * If any of the subscribers is over the `maxQueued` limit, this
- * will wait to complete until that subscriber processes enough of
- * its elements such that `a` is enqueued.
- *
+ * This `F` operation does not complete until after the
+ * given element has been enqueued on all subscribers.
+ * If enqueuing on any subscriber is semantically blocked
+ * (because that subscriber is at its `maxQueued` limit),
+ * then this operation will also be blocked, waiting on that
+ * subscriber to dequeue an element.
```
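To make the semantics above concrete, here is a pure model of the `maxQueued` bound. All names (`SubQueue`, `enqueue`, `dequeue`) are illustrative, not fs2 internals; `None` marks the point where a real publisher would semantically block:

```scala
// Pure model of the maxQueued bound: enqueue returns None when the
// subscriber is at its bound, which is where a publisher would block.
final case class SubQueue[A](maxQueued: Int, elems: Vector[A] = Vector.empty[A]) {
  def enqueue(a: A): Option[SubQueue[A]] =
    if (elems.size < maxQueued) Some(copy(elems = elems :+ a))
    else None // publisher blocks until the subscriber dequeues

  def dequeue: Option[(A, SubQueue[A])] =
    elems.headOption.map(a => (a, copy(elems = elems.tail)))
}

val q0 = SubQueue[Int](maxQueued = 2)
val q2 = q0.enqueue(1).flatMap(_.enqueue(2)).get
assert(q2.enqueue(3).isEmpty)                         // at the bound: publish blocks
assert(q2.dequeue.flatMap(_._2.enqueue(3)).isDefined) // unblocked after a dequeue
```

In the real `Topic`, "blocking" is semantic (fiber suspension on the queue's `offer`), not thread blocking, but the state machine is the same.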
One further clarification: does the whole `Topic` ensure that there is at most one call to `publish1` at a given point?
No, you can have multiple producers on a `Topic`, although I imagine that to be rarer.
In that case, is there a risk that different subscribers may receive messages from different producers in a different order? Curious to learn what the guarantees are, from a "message delivery" point of view.
Yeah, there is (and there always has been). I think fixing it would imply either enforcing a single producer, if you still want full broadcast, or a `KeyedTopic` to do sharding à la Kafka.
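To make the hazard concrete: `publish1` visits the subscriber map one queue at a time, so two concurrent `publish1` calls can interleave their per-subscriber offers. A deterministic sketch of one such interleaving (plain `Vector`s stand in for subscriber queues; this is an illustration, not fs2 code):

```scala
// Two producers publish "a" and "b" concurrently. Because each publish1
// offers to subscribers one by one, subscriber 1 may see a before b
// while subscriber 2 sees b before a.
type Sub = Vector[String]
def offer(subs: Map[Int, Sub], id: Int, msg: String): Map[Int, Sub] =
  subs.updated(id, subs(id) :+ msg)

val empty = Map(1 -> Vector.empty[String], 2 -> Vector.empty[String])

// One possible interleaving of publish1("a") and publish1("b"):
val interleaved =
  offer(offer(offer(offer(empty, 1, "a"), 2, "b"), 1, "b"), 2, "a")

assert(interleaved(1) == Vector("a", "b"))
assert(interleaved(2) == Vector("b", "a")) // delivery order differs per subscriber
```

With a single producer this interleaving cannot arise, which is why the discussion below is about whether to enforce or merely document that constraint.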
@mpilquist wdyt about this? Should we enforce a single producer, or just document it and leave it up to the user?
```scala
/** Constructs a Topic */
def apply[F[_], A](implicit F: Concurrent[F]): F[Topic[F, A]] =
  F.ref(LongMap.empty[Q[F, A]] -> 1L)
    .map { pubSub =>
```
@mpilquist I don't think we will have issues here, `Long` is very big (even though we only use half of its range), but I'd welcome a second opinion.
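A back-of-the-envelope check of the id-exhaustion concern (the rate below is an assumption chosen for illustration): even allocating one subscriber id every nanosecond, exhausting `Long.MaxValue` takes roughly three centuries.

```scala
// Allocating one new subscriber id per nanosecond (1e9 ids/second),
// how long until the Long counter runs out?
val idsPerSecond = 1e9
val seconds = Long.MaxValue / idsPerSecond // ~9.2e9 seconds
val years   = seconds / (3600.0 * 24 * 365)
println(f"~$years%.0f years")              // roughly 292 years
```

Any realistic subscription rate is orders of magnitude lower, so overflow of the id counter is not a practical concern.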
No concerns from me. Document it only; we haven't seen this cause issues over the last 5 years.