-
Notifications
You must be signed in to change notification settings - Fork 181
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pausable channels #2667
Pausable channels #2667
Conversation
899f83f
to
e8758a7
Compare
import io.smallrye.mutiny.subscription.MultiSubscriber; | ||
import io.smallrye.reactive.messaging.PausableChannel; | ||
|
||
public class PausableMulti<T> extends MultiOperator<T, T> implements PausableChannel { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would like to have @jponge review on this one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll check that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something that could be done is passing that operator through the Reactive Streams (Flow) TCK. I don't think there's anything incorrect in that implementation, but you know... 🤣
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, I just have a question on unbounded subscriptions and the expectations on it.
...g-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/PausableMulti.java
Show resolved
Hide resolved
} | ||
|
||
@Override | ||
public void request(long numberOfItems) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens when Long.MAX
value is requested? (unbounded subscription)
My understanding is that the pausing mechanism delays upstream requests when paused (aka a kind of passive pausing).
Also this should be documented as per reactive streams expectations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is much like demand capping. pause/resume won't work if the subscription is unbounded.
How would you define active pausing? To buffer items until resumed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but TBH in this case what you have totally makes sense. Just make sure you document the case of unbounded requests, or even "large" requests 😉 (e.g., Long.MAX_VALUE - 100
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, we could've contributed this operator to Mutiny itself, but I couldn't find a Mutiny-like API to give the control of pause/resume. Would you've any ideas on that ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd need an extended subscription to support these additional operations I guess.
import io.smallrye.mutiny.subscription.MultiSubscriber; | ||
import io.smallrye.reactive.messaging.PausableChannel; | ||
|
||
public class PausableMulti<T> extends MultiOperator<T, T> implements PausableChannel { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something that could be done is passing that operator through the Reactive Streams (Flow) TCK. I don't think there's anything incorrect in that implementation, but you know... 🤣
e8758a7
to
02293ec
Compare
Pushed a small change to pass the flow publisher tck. |
02293ec
to
3be8cab
Compare
``` | ||
|
||
!!!warning | ||
Pausable channels only work with back-pressure aware subscribers, with bounded downstream requests. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Adds a new decorator for pausing and accumulating downstream requests to connector channels.
A channel needs to set the
pausable
flag to true. The pause/resume is controlled by thePausableChannel
that is accessible fromChannelRegistry
by channel name.Closes #1649