Backpressure #1000
Does the source […] |
The main issues that I have run into while trying to implement back-pressure involving continuations in rxcpp are synchronization and additional complexity everywhere (it is not localized). Combined, these really pessimize performance. I think that there is a way to localize the complexity behind three interfaces; two already exist. The usage depends on the type of source.

**Hot Observables**

The […]

**Cold Observables**

The Hot back-pressure policies are available by applying […]. The additional policy, which only applies to Cold observables, is to stop the production of the next value until resumed. This does require a continuation. Fortunately, Cold Observables already create a continuation per value. Cold Observables take a […]. To exploit this, a new […]. A new operator that takes the […].

**Join Operators**

These would change to accept […].

**Cold Example**

```cpp
regulator<current_thread> r;
auto counter = range<int>(0, 100000000, 1, r.get_scheduler());
counter.skip(1).resume(r)
    .zip(counter.resume(r), [](int lhs, int rhs){return std::make_pair(lhs, rhs);})
    .subscribe([](std::pair<int, int> p){std::cout << p << std::endl;});
```

**Hot Example**

```cpp
button_downs.resume_with_subscribe()
    .zip(button_ups.resume_with_subscribe(),
        [](button_down lhs, button_up rhs){return std::make_pair(lhs, rhs);})
    .subscribe([](std::pair<button_down, button_up> p){std::cout << p << std::endl;});
```
|
No. It can continue to act just like right now. Supporting backpressure is optional. While discussing with @abersnaze and @headinthebox we contemplated using a minimum buffer size throughout the codebase (like 32, 64, or 128) so if an […]

What do you mean by this? If it's not set, there's nothing to trigger and the source […] |
@kirkshoop Thank you for getting involved in the conversation! I apologize that I don't fully understand the proposed implementation, but I'll provide comments based on my interpretation. We initially implemented the […]. The pause solution works very well within a single process where pausing can be fairly deterministic by sharing an atomic/volatile reference across thread boundaries. It does not work well across network boundaries though, which is often the source of data requiring back pressure. It is far too non-deterministic to asynchronously request a source to "pause" and therefore still requires unbounded buffers or dropping data. Also, anything that is modeled after […]
This is very inefficient and exactly what we have sought to avoid in RxJava, particularly as of version 0.17 where the […]. Also, this does not apply well to network requests, which are the source for most cold observables. Generally they are happening on a separate thread, or an event loop, and the notifications are not rescheduled again on a separate thread. Nor do we use […]
What extra synchronization is required? The proposed solution (using continuations) does not change the threading model or introduce any extra use of schedulers/concurrency. I am concerned with performance impact for cases that don't require back pressure, but thus far, because of the batch request approach which then allows async […] |
Okay I am caught up now. I was designing for a different system. :) I think that we agree that we are happy with the existing Rx behavior for Hot Observables and that this back-pressure discussion is for Cold Observables.
I disagree with this. The buffer can be bounded declaratively on the source operator. In fact, I argue that a declarative chunk-size and buffer-size, that is passed to […]
The buffer-size ensures that no data is dropped and that the size is bounded. A good minimum for the buffer-size is 1.5 * chunk-size. I still think that […]. However, the buffer draining could be controlled with the current […]. I think that in addition to chunk-size and buffer-size, there should be two scheduler arguments: one for the overlapped chunk requests and the other for the buffer draining.
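To make the sizing rule concrete, a tiny illustrative calculation (all numbers hypothetical, not taken from any implementation):

```java
// With the 1.5x rule of thumb above, a chunk size of 128 implies a buffer
// bound of 192. A producer may request the next chunk only when the buffer's
// free space can absorb a whole chunk.
int chunkSize  = 128;
int bufferSize = (int) (1.5 * chunkSize); // = 192
int buffered   = 100;                     // items currently in the buffer
boolean mayRequestNextChunk = (bufferSize - buffered) >= chunkSize;
// 192 - 100 = 92 free slots, which is less than 128, so no new chunk yet
```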
Yes, this is very inefficient. :) It added complexity to the scheduler in rxcpp to control the perf issue while still allowing […].

The […] |
Agreed, but I don't understand how we would have […]. It seems you are referring to having batch sizes when you say "the buffer can be bounded declaratively on the source operator" but then suggest we can work with just […]. What we're trying to achieve is that when `resume` is invoked (whatever it is called) it passes along how much space is in the buffer for the upstream to send. Are you suggesting that only the IO boundary needs the […]
We attempted this in an early prototype and it became ridiculously complex because the co-routine state now needed to be thread-safe and handle concurrent execution. I'm open to it as I agree it would be good, but it's not clear yet how to make it simple to implement, and I haven't spent enough time to try and determine alternative implementations. To understand what I mean, take a look at the final […]: `Iterator<? extends T> iter = is.iterator();` If we allowed overlapped requests in this example the producer could no longer use a simple iterator.
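For illustration, here is roughly the shape of an `Iterable`-backed producer (hypothetical names, not the prototype's exact code), and why it depends on requests never overlapping:

```java
import java.util.Iterator;

// Works only because request() is assumed to be called by one thread at a
// time: a plain Iterator has no thread-safety of its own, so overlapped
// (concurrent) requests would corrupt its state.
class IterableProducer<T> {
    private final Iterator<? extends T> iter;
    private final rx.Observer<? super T> child;

    IterableProducer(Iterable<? extends T> is, rx.Observer<? super T> child) {
        this.iter = is.iterator();
        this.child = child;
    }

    void request(long n) {
        for (long i = 0; i < n && iter.hasNext(); i++) {
            child.onNext(iter.next());
        }
        if (!iter.hasNext()) {
            child.onCompleted();
        }
    }
}
```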
Since I'm obviously not quite understanding, can you elaborate more on what you mean by this and how it interacts with […] |
I am suggesting that a cold observable that supports back pressure has both a producer and a consumer inside. The producer generates requests for a chunk-size of items and the consumer stores a buffer-size of items.
Conceptually […] |
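A minimal sketch of that internal producer/consumer pairing, using a plain bounded queue for the buffer (hypothetical names; not an actual rxcpp or RxJava type):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ChunkedColdSource {
    private final int chunkSize;
    private final BlockingQueue<Integer> buffer; // bounded: the buffer-size

    ChunkedColdSource(int chunkSize, int bufferSize) {
        this.chunkSize = chunkSize;
        this.buffer = new ArrayBlockingQueue<>(bufferSize);
    }

    // Producer half: generates one chunk of items; blocks when the buffer
    // is full, so production stalls instead of memory growing.
    void produceChunk(int fromValue) throws InterruptedException {
        for (int i = 0; i < chunkSize; i++) {
            buffer.put(fromValue + i);
        }
    }

    // Consumer half: drains one item at a time; a real implementation would
    // hand this to a downstream onNext.
    int drainOne() throws InterruptedException {
        return buffer.take();
    }
}
```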
Thanks for explaining. I need to think about this more and trace it through the composition of […]. My initial reaction though is that we shouldn't have both […] |
Me too - will you be at React?
I agree.

**Complexity of source operator implementation**

I was thinking more about the operator implementations in this design and it changed the design a bit.
Thus, I think that a declarative max-concurrency parameter to the producer is also required. To simplify the implementation of the operators supporting overlapped calls: add a function […]. The producer function calls […]
The producer will not overlap calls to […]. The […] |
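A sketch of how a declarative max-concurrency bound on overlapped chunk requests could be enforced, using a plain semaphore (hypothetical names, not a proposed API):

```java
import java.util.concurrent.Semaphore;

class BoundedOverlapProducer {
    private final Semaphore inFlight;

    BoundedOverlapProducer(int maxConcurrency) {
        this.inFlight = new Semaphore(maxConcurrency);
    }

    // Called whenever an operator wants another chunk in flight; at most
    // maxConcurrency fetches can be outstanding at any moment.
    void requestChunk(Runnable fetchChunk) throws InterruptedException {
        inFlight.acquire();
        try {
            fetchChunk.run(); // a real producer would release on async completion
        } finally {
            inFlight.release();
        }
    }
}
```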
Yes I will be. If you're there we'll definitely need to meet. |
IMHO, both […]:

```scala
trait Observer[-T] {
  def onNext(elem: T): Future[Unit]
  def onError(ex: Throwable): Unit
  def onComplete(): Unit
}
```

So basically […]

UPDATE: because I pulled this out of my arse, I haven't thought about the implications - but the above doesn't solve the problem, still thinking about it. |
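Rendered in Java terms, the shape being proposed might look like the following (hypothetical interface, not RxJava's API); the recursive chaining also hints at why a future per element can get unwieldy:

```java
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;

interface AsyncObserver<T> {
    CompletableFuture<Void> onNext(T elem); // completes when ready for more
    void onError(Throwable ex);
    void onComplete();
}

class FuturePacedProducer {
    // Emits the next item only after the previous one was acknowledged.
    static <T> void emit(Iterator<T> source, AsyncObserver<T> observer) {
        if (source.hasNext()) {
            observer.onNext(source.next())
                    .thenRun(() -> emit(source, observer));
            // note: if the future completes synchronously this recursion is
            // unbounded; a real implementation would need a trampoline
        } else {
            observer.onComplete();
        }
    }
}
```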
Hi @alexandru, thanks for getting involved. In early prototypes I tried making `onNext` return `Future` and it became unwieldy very quickly. However, it still feels like it may be a valid approach. I too am considering this more. As a side note, one practical concern is the memory impact of allocating a future for every `onNext`. |
@benjchristensen you're right, it would be more heavy and I believe this would go against the design goals of Rx.NET / RxJava, as you'd have to make the whole abstraction entirely async. I personally prefer it that way, for the use-cases that are of concern to me, but it would make it impractical for certain use-cases for which Rx was designed (the usual "pick your poison" applies). I'm exploring it for a bit, asking for feedback on the scala-user mailing list, so if you can tolerate Scala code and my strong opinions, here are some details I've thought about: https://groups.google.com/forum/#!topic/scala-user/ckgrXz_4F_A |
@benjchristensen in case you're interested, I have a proof of concept. https://github.com/alexandru/monifu/blob/e2c756a3fd5018c87159d9ae5a3f6ff1487aefc3/monifu-rx/src/shared/scala/monifu/rx/async/Observable.scala#L135 (link updated Apr 11) The interesting difference is that the obvious […]:

```scala
Observable.fromTraversable(0 until 1000)
  .filter(x => x % 5 == 0)
  .flatMap(x => Observable.interval(1.second).take(5).map(y => x + y))
```

To implement […] |
Thanks @alexandru for this. I'm just catching up after traveling so I probably won't have time today to review this but I definitely will. I also had the chance to talk with the Akka team (Roland, Viktor, Jonas) and @kirkshoop from Microsoft working on RxCPP and got some interesting implementation ideas, so there are several things I need to prototype and report on here when I get through my backlog. |
@benjchristensen no rush, it's good to evaluate all options and I'd be glad if I could give you some ideas to make RxJava better. As I was saying, I'm not seeing you implement my solution, because for it to work well it means turning it into something that's maybe not Rx.net anymore - and maybe you've got better ideas already. Btw, another idea I had as part of my toy project is to provide 2 implementations for Observers and Observables - one synchronous and one asynchronous, much like how Scala separates between immutable and mutable collections, since sometimes you can't have a one-size-fits-all. The asynchronous observer would be:

```scala
trait Observer[-T] {
  def onNext(elem: T): Future[Ack]
  def onError(ex: Throwable): Future[Unit]
  def onCompleted(): Future[Unit]
}
```

The synchronous observer would be:

```scala
trait Observer[-T] {
  def onNext(elem: T): Ack
  def onError(ex: Throwable): Unit
  def onCompleted(): Unit
}
```

Where `Ack` is:

```scala
sealed trait Ack
object Ack {
  case object Continue extends Ack
  case object Stop extends Ack
}
```

So you return […]. But that's OK, because for async stuff, you've got the async version that I think will work much better. To be honest, I was a little afraid that it wouldn't work before implementing […]. Btw, I'm not sure how familiar you are with Scala's Future/Promise, but their design is really sweet and it's the reason why my implementation was doable in a short amount of time. In case you look at my code and don't understand something, I'll be happy to reply. |
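The synchronous variant translates directly to Java; a producer loop then receives backpressure as a plain return value (illustrative types only, not part of any library):

```java
enum Ack { CONTINUE, STOP }

interface SyncObserver<T> {
    Ack onNext(T elem);
}

class SyncProducer {
    // Stops producing the moment the consumer answers STOP.
    static void emitRange(int from, int to, SyncObserver<Integer> observer) {
        for (int i = from; i < to; i++) {
            if (observer.onNext(i) == Ack.STOP) {
                return;
            }
        }
    }
}
```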
The work on this issue for supporting back pressure relates to interoperability work being done as part of http://www.reactive-streams.org which is "an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure on the JVM." |
Discussion from #1001 (comment):
In what way is this considered a new way of coding? I also don't see it affecting all operators, only the ones that are async (like […]
There are other complicated cases: […] |
Agree with @akarnokd and @benjchristensen; adding back pressure leads to a rather different API with fewer operators, so I think it is better to consider it as a separate effort. |
Progress on developing this solution can be seen at #1382 |
Progress on this can now be seen in 0.20.0-RC1 being released today. |
Release notes for 0.20.0-RC1: https://github.com/Netflix/RxJava/releases/tag/0.20.0-RC1 |
I'm trying to sketch out some wiki documentation for this as well, but I'm […]: https://github.com/Netflix/RxJava/wiki/Backpressure

In particular, under what circumstances do creators of new observables and […] |
@DavidMGross I'll need to spend more time on the documentation in the next week or two with you. |
0.20.0-RC2 successfully passed a 24 hour production canary on the Netflix API. Can anyone using Android please test 0.20.0-RC2 and report back on issues if any? In particular we need to make sure […]. Any other issues related to 0.20 changes that need to be solved before releasing 0.20? |
@DavidMGross I have opened #1541 to track and discuss documentation. |
Marking this done. There are other open tasks to complete some operators, but the foundation of "reactive pull" backpressure is in place and functioning with major operators like […] |
@benjchristensen I was thinking one of the possible solutions already exists and is implemented in TCP flow control. The amount of data sent by the producer is controlled by a window size that is established dynamically based on how fast the consumer acknowledges receipt and on network conditions (or anything not being acknowledged). So throttling is done dynamically based on the behavior of the consumer. Having said that, considering that we do not need to wait for individual acknowledgements (as TCP does, since it assumes an unreliable network environment), all we need is to implement a counter that would simply increase when packets are sent to the consumer and decrease when packets are acknowledged (so one accumulator instead of a Future per individual packet). If the number is growing too fast it means the consumer is not able to consume fast enough. If the number stays near 0, the consumer is either consuming too fast and we could send a bit more, or we have found equilibrium: the rate at which the consumer consumes comfortably. By rights both producer and consumer should be aware of this rate, or we end up in a situation where data keeps growing in a stream buffer. This may sound like a load of bull... considering I am writing after consuming a considerable amount of alcohol on Sat night... Apologies in advance if I am interfering. :) |
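The single-accumulator idea sketched in Java (illustrative only):

```java
import java.util.concurrent.atomic.AtomicLong;

// One counter tracks "sent minus acknowledged". Its drift over time hints at
// whether the consumer keeps up, without tracking individual packet IDs.
class FlowGauge {
    private final AtomicLong inFlight = new AtomicLong();

    void onSent()  { inFlight.incrementAndGet(); }
    void onAcked() { inFlight.decrementAndGet(); }

    // near zero: consumer keeps up (could send more);
    // steadily growing: consumer is falling behind, so slow down.
    long backlog() { return inFlight.get(); }
}
```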
We definitely thought about TCP sliding windowing when coming up with what back pressure for Rx would look like. I'm not sure we need an actual ID for each […]. In addition to the speed of the producer and the consumer, the algorithm should also optimize for the producer latency (the time it takes from the consumer calling […]) |
@codetinkerhack Thanks for getting involved and your thoughts on this. You can read more about some of the design discussions that inspired this model on the ReactiveStreams project, including this issue: reactive-streams/reactive-streams-jvm#62 TCP windowing has been evaluated (as per the comment from @abersnaze, who has been involved in this design and in the ReactiveStreams project). Think of the […]. A sophisticated consumer could dynamically size the requests; so far in practice I have not seen a need for this, but I imagine a use case will arise at some point. Currently, the ability for […]
Whether or not performance gains can be achieved via dynamically changing the requested amount is still left to be proven. As for over network boundaries, TCP will not be replaced by this, but the […] |
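To relate this to the request-based model, here is a rough sketch of a consumer that keeps a TCP-like window of outstanding requests and tops it up as it drains (written against the 0.20-era `rx.Subscriber`, simplified and illustrative):

```java
class WindowedSubscriber<T> extends rx.Subscriber<T> {
    private static final int WINDOW = 128;
    private int consumedSinceRequest = 0;

    @Override
    public void onStart() {
        request(WINDOW); // open the initial window
    }

    @Override
    public void onNext(T t) {
        process(t);
        if (++consumedSinceRequest == WINDOW / 2) {
            consumedSinceRequest = 0;
            request(WINDOW / 2); // top the window back up, like a TCP window update
        }
    }

    private void process(T t) { /* consume the item */ }

    @Override
    public void onError(Throwable e) { }

    @Override
    public void onCompleted() { }
}
```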
The link from http://www.reactive-streams.org/ leads me here. Are there adapters for the org.reactivestreams.* types as part of this work? |
The adapter has not been created yet. I've been meaning to but haven't gotten to it yet. The intent is for there to be an rxjava-reactive-streams module inside rxjava-contrib. |
Hello! In the past couple of months I've put some time into developing a library for reactive/responsive programming based on a pull model called choice streams. I believe that you might find it interesting. Here is a little bit of prose on the idea: https://github.com/Hopac/Hopac/blob/master/Docs/Inverting.md I must apologise that the above document is somewhat out of date with respect to my improved understanding of Rx and contains a couple of incorrect assertions. And here is the current reference documentation on choice streams: http://hopac.github.io/Hopac/Hopac.html#def:type%20Hopac.Stream.Stream

Summary: It turns out that it is quite possible to do responsive and reactive programming with a pull model. All you need is lightweight threads, lazy futures/promises and non-deterministic choice. Even operators like […].

Please note that choice streams are still work-in-progress and, compared to Rx, there are a few combinators missing that I haven't yet had the time to implement, although I'm confident there will be no problems. I'm already using choice streams in an actual application for reactive/responsive programming and they have worked without problems. |
Hi @VesaKarvonen,

The Reactive Streams and Rx model supports both push and pull to achieve the backpressure. Adopting a completely pull model defeats the point of Rx, as that would eliminate all of the data sources that are "hot" and fire events without regard for when we pull from them. A pull-based stream is an Iterable, even if it is an AsyncIterable. RxJava has created push/pull implementations of […]. Your document also references closing over locks... that does not happen in RxJava. It did in very early versions, and it may still do so in Rx.Net, but a lock is never held while emitting in RxJava. The Reactive Streams Spec is a good document with over a year of collaboration amongst several companies that is applicable to this topic: https://github.com/reactive-streams/reactive-streams#goals-design-and-scope |
Hi @benjchristensen,
I assume the "Reactive Streams" model refers to the specification in https://github.com/reactive-streams/reactive-streams

What do you mean by the "Rx" model? If, by Rx, you refer essentially to the interfaces IObservable and IObserver of .Net, then I don't see how they could support the same push-pull model as Reactive Streams.

Note: My point here is just to distinguish between the different models. In the rest of this reply, I will use "Reactive Streams" to refer to the referenced specification and "Rx" to refer to the .Net interfaces.
Here the definition of "completely pull model" is significant. Choice streams do support generating a stream based on push events. You probably understand this already, but let me elaborate on this. An ordinary singly linked list could be implemented using a pair of types that roughly looks like

```java
class List<X> {
  volatile Cons<X> list;
}
class Cons<X> {
  X head;
  List<X> tail;
}
```

in a Java style syntax. Suppose one has a method `OnClick` that is called on each button click. One could capture the clicks into a list

```java
List<Button> clicks = new List<Button>(); // clicks.list == null
```

in that method by writing

```java
void OnClick(Button theButton) {
  List<Button> newClicks = new List<Button>();
  Cons<Button> newCons = new Cons<Button>();
  newCons.head = theButton;
  newCons.tail = newClicks;
  clicks.list = newCons; // publish the new cons cell
  clicks = newClicks;    // advance to the new tail for the next click
}
```

One can then observe the list being generated from another thread by following the `tail` references:

```java
List<X> cursor = clicks;
while (true) {
  while ( cursor.list == null ) { /* wait */ }
  System.out.println( cursor.list.head.toString() );
  cursor = cursor.list.tail;
}
```

Of course, the above sketch is an absolutely silly way to do things, but I hope that this example is simple enough to be understood. Now, the choice stream type is only a small change from the above:

```java
class Stream<X> extends Promise<Cons<X>> { } // read "extends" here as a typedef
class Cons<X> {
  X head;
  Stream<X> tail;
}
```

in a Java style syntax. The `Promise` type can be thought of as

```java
class Promise<X> {
  Promise(); // Create a new unfulfilled promise
  // ...
  void fulfill(X value);
  // ...
}
```

Using these types, you can similarly generate a stream:

```java
Stream<Button> clicks = new Stream<Button>();

void OnClick(Button theButton) {
  Stream<Button> newClicks = new Stream<Button>();
  Cons<Button> newCons = new Cons<Button>();
  newCons.head = theButton;
  newCons.tail = newClicks;
  clicks.fulfill(newCons);
  clicks = newClicks;
}
```
I hope this helps to understand the very simple model underlying choice streams and one way to generate such streams. Basically, choice streams are just linked lists of promises. A promise can be defined in a variety of ways (most of which I haven't talked about here), including as the result of a non-deterministic choice of two or more promises. What is interesting, IMO, is that such a simple model, measurably one to two orders of magnitude simpler (roughly 500 lines in choice streams vs 30k lines in RxJava or 50k lines in .Net Rx), is able to support very similar styles of programming as Rx. In my opinion, the comment in […] is quite telling. By comparison, many of the basic operations on choice streams are next to trivial to define. Here is `merge`:

```fsharp
let rec mergeSwap ls rs =
  ls >>= function Nil -> rs
                | Cons (l, ls) -> cons l (merge rs ls)
and merge ls rs = mergeSwap ls rs <|>* mergeSwap rs ls
```

Also, compared to Rx, in particular, choice streams allow forms of composition that simply are not available in Rx. For example, the `mapJob` combinator:

```fsharp
let mapJob f xs =
  unfoldJob
    (fun xs ->
      xs >>= function Nil -> Job.result None
                    | Cons (x, xs) ->
                      f x |>> fun y -> Some (y, xs))
    xs
```

Note that the above is also an asynchronous version of `map`. I believe the Reactive Streams model supports similar forms of composition as choice streams, but at a huge increase in complexity. |
Indeed! That is an interesting ability. In the choice stream model, as in the Rx model, there is no built-in mechanism for switching between push and pull models. The Rx model is fundamentally push and the choice stream model is fundamentally pull.
That is nice!
That is also a cool feature. There is, however, a fundamental difference here between Rx/Reactive Streams and choice streams. In Rx/Reactive Streams the […].

At any rate, thanks for the reply, I believe this discussion helped me to better understand some further differences between the different models. |
RxJava has advanced beyond Rx.Net and supports the Reactive Stream style backpressure. |
I have played with approaches like this and found the object allocation overhead to be far too restrictive on the JVM. Perhaps with value types it would be more viable, but right now I don't see how this would be done without measurable impact on throughput. How do you address that? How does your model differ from CSP or Actors? The fact that every stream is a queue (linked list) suggests that it is very similar. |
At the present, choice streams are a library built on top of a concurrent programming library and there are only some minimal manual optimizations done on the choice stream implementation. The concurrent programming library, Hopac, in which choice streams are written, is, however, aggressively optimized. I haven't yet done comprehensive benchmarking or performance testing (and subsequent optimization) on choice streams. A couple of very simple tests suggested that choice streams are competitive with .Net Rx in terms of performance, but I really wouldn't draw any conclusions from those yet. More comprehensive benchmarks and performance tests are needed. Theoretically, it should be possible to implement choice streams so that they are nearly as efficient as regular singly linked lazy lists: a single allocation to add an element to a choice stream, zero or one allocations to request an element from a stream. This requires either a very good optimizing compiler (e.g. MLton) or manual specialization of the stream representation, ultimately fusing the […]
The design of the Hopac library is heavily influenced by Concurrent ML. CML can be seen as being based on pi-calculus with non-deterministic choice. More recent languages/libraries like Go and Clojure core.async, said to be based on CSP, can basically be seen as providing a subset of CML - they lack the built-in notion of first-class events (called alternatives in Hopac) and negative acknowledgments. A choice stream can basically be seen as a kind of multicast communication mechanism allowing one-to-many and many-to-many messaging, with the property that all receivers (Rx subscribers, choice stream consumers) get all the messages (after the point they start receiving). In Reppy's Concurrent ML book there is an implementation of multicast channels that can be seen as a particular use case of choice streams. It should be straightforward to encode choice streams in terms of pi-calculus, CSP or join-calculus, possibly even with more limited actor models. However, I don't really think of them as being related. Choice streams are about broadcasting messages with ordering and delivery guarantees to any number of processes, which is not a basic mode of communication for those process calculi. |
Thanks for the further information. I'll think about what you've shared. |
excellent |
Thus far Rx has left backpressure solutions "an exercise for the user". It can be done via manual feedback loops and operators like `throttle`, `sample`, `window`, etc.

The drawback of this is that it requires someone to understand the implications of every operator and when it might be async (buffer/queue), and it requires the effort to create a feedback loop and hook it up correctly. Some use cases will probably always require this type of manual effort, but it's worth exploring whether we can solve the general use cases within Rx.
Most Rx operators are synchronous in nature, meaning the producing thread does not return until the work is completed. The `map` operator is a good example: it does computation work to transform from `T -> R` and nothing else.

There are some operators though that are async in nature and involve unbounded buffers. These include `observeOn`, `zip` and `merge` (as of 0.17.1, which means `flatMap` is affected). It also happens if an `Observer` is writing the data out to non-blocking IO (such as a Netty channel).

In all of these examples, if the producer is fast, such as an in-memory Iterable, loading a file, or a firehose of events over a network, buffer-bloat (and eventually OutOfMemory) can easily occur.
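To make the failure mode concrete, here is a stand-alone sketch (plain Java, not RxJava internals) of a fast producer and a slow consumer joined by an unbounded queue, which is essentially what an async operator's internal buffer becomes without backpressure:

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class BufferBloatDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>(); // unbounded

        new Thread(() -> {
            for (int i = 0; i < 100_000_000; i++) {
                queue.offer(i); // the producer never slows down
            }
        }).start();

        new Thread(() -> {
            while (true) {
                Integer item = queue.poll();
                if (item == null) continue;
                try {
                    Thread.sleep(1); // the consumer is far slower
                } catch (InterruptedException e) {
                    return;
                }
            }
        }).start();
        // The queue's backlog grows roughly as fast as the producer runs,
        // eventually exhausting the heap (OutOfMemoryError).
    }
}
```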
After some experimentation and talking with various individuals, teams and companies about this topic, a possible solution is the use of co-routines from the producing `Observable` that a `Subscriber` can request data from in batches.

**Prototype Code**

A prototype of this is being experimented with at https://github.com/benjchristensen/RxBackpressure though it is not yet complete (as of this writing).
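As a rough illustration of the general idea described next, a hypothetical batch-requesting producer might look like this (illustrative names and simplified synchronization, not the prototype's actual API):

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;

class BatchingProducer<T> {
    private final Iterator<T> source;
    private final rx.Subscriber<? super T> child;
    private final AtomicLong requested = new AtomicLong();

    BatchingProducer(Iterator<T> source, rx.Subscriber<? super T> child) {
        this.source = source;
        this.child = child;
    }

    // Called by the Subscriber when it has room for n more items.
    void request(long n) {
        if (requested.getAndAdd(n) == 0) {
            drain(); // the producer was parked; resume it
        }
    }

    private void drain() {
        while (source.hasNext() && requested.get() > 0) {
            child.onNext(source.next());
            requested.decrementAndGet();
        }
        if (!source.hasNext()) {
            child.onCompleted();
        }
        // Returning here "parks" the producer without blocking a thread;
        // a later request() resumes emission.
    }
}
```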
The general idea is that an `Observable.OnSubscribe` implementation could register a producer co-routine with the `Subscriber` and then only push data down when the `Subscriber` has said how much it can receive. It becomes a conditional push-model. The co-routine can then be "parked" by the producer by finishing work and returning (releasing the thread) and then "unparked" or resumed by the `Subscriber` when it wants more. In this way no threads are blocked and only the amount of data the `Subscriber` can handle is sent. The same model can work across threads or across the network.

If the `Observable` chain has no async operators then it will be executed with an "infinite" request and behave exactly as it does today without any parking. If an async operator is in the chain then it will "request" a batch size equaling its internal buffer (say 128, 512, 1024, etc).

A simple producer of an `Iterable` would look like this (ignoring debates over naming and API): […]

The `observeOn` operator is async and thus will request batches as needed. The relevant code looks like this: […]

The `subscribeOn` operator decorates the co-routine so scheduling is retained and it is started on the desired thread each time rather than the consuming thread doing the work.

The `take` and `skip` operators compose the batch sizes to adjust accordingly to what is being taken or skipped: […]

The `Subscriber` manages the life-cycle of when the co-routine is run: https://github.com/benjchristensen/RxBackpressure/blob/master/rx-bp-prototype/src/main/java/rx/Subscriber.java#L97

For example, if the final `Subscriber` in the chain is hit and no async operator was involved, it immediately invokes the co-routine with a "-1" request size so it is infinite.

**Example Use Cases**

Use cases are being written here: https://github.com/benjchristensen/RxBackpressure/tree/master/rx-bp-examples/src/test/java/rx/examples

**Handling Observables Without Co-routines**

Implementing an `Observable` that supports backpressure is more complicated and not all will support it. In those cases the async operators will `onError` stating that an `Observable` is not respecting the backpressure and suggest resolutions (linking to documentation probably).

The two routes at that time are: 1) fix the `Observable` to support backpressure, or 2) use one of several backpressure operators that will be added, such as:

- `whileParkedDrop`
- `whileParkedBuffer`
- `whileParkedBufferThenDrop(int maxCount)`
- `whileParkedBlock` (when blocking is okay, such as on Quasar fibers, IO threads, etc)
- `whileParkedUnsubscribe` (such as on a hot stream like mouse events)

**Outstanding Work**

An earlier prototype (not public) had `zip` functioning but the current code does not. We know how it is done and this model works with it but have not yet spent the time to re-implement it. We need to.

The biggest outstanding item is making `merge` work (and thus `flatMap`). This is the major hurdle of any backpressure solution working. We (myself, @abersnaze, @headinthebox and others) have whiteboarded it and believe we have a solution, but my schedule has not allowed me to code it. I have held off on writing this post as I wanted to code it up first, but since I still haven't had time I wanted to get this information public instead of holding it up.

The planned design for `merge` is that each `Observable` being merged would have its own buffer (of say 128 items) and then the consumer would request n items, which would come in round-robin fashion from all of the merged buffers. Each individual `Observable` would exert its own backpressure upstream. Slow `Observables` may never fill their buffer while fast ones will.

We will not attempt to limit the horizontal growth (number of `Observables` being merged) but will limit the vertical growth (size of buffer for each `Observable`).

The challenge with this is making sure performance and fairness are balanced.

**Next Steps**

- `merge` and `zip`

I look forward to your help making this happen.