Backpressure-7 - Merge & ObserveOn #1382
Conversation
RxJava-pull-requests #1320 SUCCESS
RxJava-pull-requests #1327 SUCCESS
What I find confusing is what the protocol is, precisely. I.e. when you create an observable, what do you do with
That's hard to wrap your head around.
Similarly, when subscribing using a subscriber, what are you supposed to do with the producer, i.e. how do I invoke backpressure here?
This is a MUST.
Other operators that seem hard are the temporal ones, like
And ones that are the opposite of
Those wouldn't request backpressure. Time-based operators would always
As each group consumes, it requests more from upstream, so that part is easy. The hard part would be if a single group is slow and can no longer consume, it would require the origin no longer sending and thus all groups would slow down to the request rate of the slowest group. This feels like the right thing though ... if I have 10 groups, and 1 of them is receiving far more data than the rest and is filling its buffer, the other 9 will have to go at the same rate as that 1. Otherwise that 1 must choose to drop data or buffer-bloat.
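The throttling behavior described above can be sketched in plain Java. This is a toy model, not RxJava code: items are routed into per-group bounded buffers, and the upstream stops as soon as the destination buffer of the next item is full, even if other groups still have capacity.

```java
import java.util.*;

// Toy model of groupBy backpressure: upstream can only emit while the target
// group's bounded buffer has room, so one slow (never-draining) group ends up
// throttling the entire stream.
public class GroupThrottleDemo {
    static final int CAPACITY = 4;

    // Emits items into per-group buffers (group = item % groupCount) and stops
    // as soon as any destination buffer is full. Returns how many were emitted.
    static int emitUntilBlocked(int[] items, int groupCount) {
        Map<Integer, Deque<Integer>> groups = new HashMap<>();
        for (int g = 0; g < groupCount; g++) groups.put(g, new ArrayDeque<>());
        int emitted = 0;
        for (int item : items) {
            Deque<Integer> buffer = groups.get(item % groupCount);
            if (buffer.size() >= CAPACITY) return emitted; // slowest group is full: stop
            buffer.add(item);
            emitted++;
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Group 0 never drains, so it fills after four even items; item 8 then
        // blocks the stream even though group 1 could still take 1 and 3.
        System.out.println(emitUntilBlocked(new int[]{0, 2, 4, 6, 8, 1, 3}, 2)); // 4
    }
}
```

The alternative, as the comment notes, would be for the full group to drop data or buffer without bound.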
This is the most awkward part of the API right now. I'm trying to find a performant way of doing it. I had
It's an "either-or" scenario. You either ignore backpressure and do this:

```java
Observable.create(subscriber -> {
    ...
    subscriber.onNext(...);
    subscriber.onCompleted();
});
```

or you give a

```java
Observable.create(subscriber -> {
    subscriber.setProducer(new Producer() {
        public void request(int n) {
            ...
            subscriber.onNext(...);
            subscriber.onCompleted();
        }
    });
});
```
See https://github.com/benjchristensen/RxJava/blob/backpressure-7/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java#L65. This class shows how it conditionally does either/or based on the number of elements to emit. A simpler solution would skip lines 45-64 and only call
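The conditional either/or choice can be illustrated with a small sketch. This uses simplified stand-in types, not the real RxJava `OnSubscribeFromIterable` code: if the whole iterable fits within the downstream buffer, it firehoses directly; otherwise it registers a `Producer` and emits nothing until a request arrives.

```java
import java.util.*;
import java.util.function.Consumer;

// Hedged sketch of the "either/or" fast path vs Producer path. Producer here
// is a simplified stand-in interface, not rx.Producer.
public class EitherOrDemo {
    interface Producer { void request(int n); }

    static void subscribe(List<Integer> items, int bufferSize,
                          Consumer<Integer> onNext, Consumer<Producer> setProducer) {
        if (items.size() <= bufferSize) {
            // Fast path: everything fits in the downstream buffer, emit directly.
            for (int i : items) onNext.accept(i);
        } else {
            // Slow path: register a Producer and emit only what is requested.
            Iterator<Integer> it = items.iterator();
            setProducer.accept(n -> {
                for (int k = 0; k < n && it.hasNext(); k++) onNext.accept(it.next());
            });
        }
    }

    // Runs the slow path and returns what was received after requesting 2 items.
    static List<Integer> slowPathDemo() {
        List<Integer> received = new ArrayList<>();
        Producer[] captured = new Producer[1];
        subscribe(Arrays.asList(1, 2, 3, 4, 5), 3, received::add, p -> captured[0] = p);
        captured[0].request(2); // nothing was emitted until this request
        return received;
    }

    public static void main(String[] args) {
        System.out.println(slowPathDemo()); // [1, 2]
    }
}
```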
Here is how I currently have it working. The general idea is:

```java
// note how it requests 100 in the constructor
observable.subscribe(new Subscriber<T>(100) {
    public void onCompleted() {}
    public void onError(Throwable e) {}
    public void onNext(T t) {
        // as it receives elements it can request more
        request(1); // better to batch, but can do one at a time like this
    }
});
```
Thanks @benjchristensen!
RxJava-pull-requests #1331 SUCCESS
This is an exploratory performance improvement. See discussion at ReactiveX#1384
53m -> 160m messages/second when consuming an Observable.range(0, 1000000)
Avoid touching a volatile inside the tight loops. The OperatorObserveOnPerf.observeOnComputation benchmark with 1000000 values was taking so long to run that I killed it. With these changes it completes: r.o.OperatorObserveOnPerf.observeOnComputation 1000000 thrpt 5 7.613 6.672 ops/s
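The "avoid touching a volatile inside the tight loop" change can be illustrated with a minimal sketch. Field and method names here are illustrative, not the actual RxJava fields: the volatile request counter is read once into a local before the loop, and only plain fields are touched per element.

```java
// Sketch of hoisting a volatile read out of an emission loop: read the
// volatile once per batch instead of once per element.
public class VolatileHoistDemo {
    volatile long requested = 0;  // written by consumer threads via request(n)
    long emittedTotal = 0;        // plain field, only touched by the emitter

    // Emits up to the currently requested amount, performing a single volatile
    // read per batch rather than one per element.
    int emitBatch(int[] values, int start) {
        long r = requested;       // single volatile read, hoisted out of the loop
        int i = start;
        while (i < values.length && emittedTotal < r) {
            // ... onNext(values[i]) would go here ...
            emittedTotal++;       // cheap plain-field increment inside the loop
            i++;
        }
        return i - start;         // number emitted in this batch
    }

    public static void main(String[] args) {
        VolatileHoistDemo d = new VolatileHoistDemo();
        d.requested = 3;
        System.out.println(d.emitBatch(new int[]{1, 2, 3, 4, 5}, 0)); // 3
    }
}
```

The trade-off is that a concurrent increase of `requested` is only observed at the next batch boundary, which is why the real operators re-check state between batches.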
RxJava-pull-requests #1332 SUCCESS
This leverages custom data structures that rely on sun.misc.Unsafe. It falls back to LinkedList if Unsafe is unavailable, such as for Android.

```
-------------- ../gradlew benchmarks '-Pjmh=-f 1 -wi 5 -i 5 -r 2 .*PerfBaseline.observable.*'

------- 0.19
Benchmark                              (size)   Mode  Samples          Mean   Mean error  Units
r.PerfBaseline.observableConsumption        1  thrpt        5  31881123.086  1556397.821  ops/s
r.PerfBaseline.observableConsumption     1000  thrpt        5    235797.222    13915.448  ops/s
r.PerfBaseline.observableConsumption  1000000  thrpt        5       245.064       12.162  ops/s
r.PerfBaseline.observableViaRange           1  thrpt        5  25019595.907   797863.816  ops/s
r.PerfBaseline.observableViaRange        1000  thrpt        5    186548.299    10512.421  ops/s
r.PerfBaseline.observableViaRange     1000000  thrpt        5       183.245       13.291  ops/s

------- 0.19 with volatile/synchronization changes
Benchmark                              (size)   Mode  Samples          Mean   Mean error  Units
r.PerfBaseline.observableConsumption        1  thrpt        5  58303018.095   935455.977  ops/s
r.PerfBaseline.observableConsumption     1000  thrpt        5    227118.383    19715.515  ops/s
r.PerfBaseline.observableConsumption  1000000  thrpt        5       227.567       43.552  ops/s
r.PerfBaseline.observableViaRange           1  thrpt        5  29536860.600   475787.569  ops/s
r.PerfBaseline.observableViaRange        1000  thrpt        5    184212.623     4187.098  ops/s
r.PerfBaseline.observableViaRange     1000000  thrpt        5       178.653       12.047  ops/s

------- backpressure-7 before volatile/synchronization changes
Benchmark                              (size)   Mode  Samples          Mean   Mean error  Units
r.PerfBaseline.observableConsumption        1  thrpt        5  18016008.716   271261.264  ops/s
r.PerfBaseline.observableConsumption     1000  thrpt        5    243090.495     4693.818  ops/s
r.PerfBaseline.observableConsumption  1000000  thrpt        5       254.516        3.178  ops/s
r.PerfBaseline.observableViaRange           1  thrpt        5  17179874.861   319574.244  ops/s
r.PerfBaseline.observableViaRange        1000  thrpt        5    182684.247     4077.967  ops/s
r.PerfBaseline.observableViaRange     1000000  thrpt        5        53.867        0.440  ops/s

------- backpressure-7
Benchmark                              (size)   Mode  Samples         Score  Score error  Units
r.PerfBaseline.observableConsumption        1  thrpt        5  51472952.744  5987759.439  ops/s
r.PerfBaseline.observableConsumption     1000  thrpt        5    239038.814     6860.951  ops/s
r.PerfBaseline.observableConsumption  1000000  thrpt        5       244.674        6.567  ops/s
r.PerfBaseline.observableViaRange           1  thrpt        5  30302378.407   772629.965  ops/s
r.PerfBaseline.observableViaRange        1000  thrpt        5    171595.529    12502.807  ops/s
r.PerfBaseline.observableViaRange     1000000  thrpt        5       161.238        9.059  ops/s

-------------- ../gradlew benchmarks '-Pjmh=-f 1 -wi 5 -i 5 -r 2 .*rx.operators.OperatorMapPerf.*'

------- 0.19
Benchmark                               (size)   Mode  Samples          Mean   Mean error  Units
r.o.OperatorMapPerf.mapPassThru              1  thrpt        5  17834349.447   417931.393  ops/s
r.o.OperatorMapPerf.mapPassThru           1000  thrpt        5    153400.393    17397.004  ops/s
r.o.OperatorMapPerf.mapPassThru        1000000  thrpt        5       160.887        4.557  ops/s
r.o.OperatorMapPerf.mapPassThruViaLift       1  thrpt        5  19127321.386   503906.389  ops/s
r.o.OperatorMapPerf.mapPassThruViaLift    1000  thrpt        5    158085.344     3763.660  ops/s
r.o.OperatorMapPerf.mapPassThruViaLift 1000000  thrpt        5       160.650        2.875  ops/s

------- backpressure-7
Benchmark                               (size)   Mode  Samples         Score  Score error  Units
r.o.OperatorMapPerf.mapPassThru              1  thrpt        5  20809605.075   217396.485  ops/s
r.o.OperatorMapPerf.mapPassThru           1000  thrpt        5    157102.490     3143.752  ops/s
r.o.OperatorMapPerf.mapPassThru        1000000  thrpt        5       147.605        5.704  ops/s
r.o.OperatorMapPerf.mapPassThruViaLift       1  thrpt        5  19478234.492   587866.316  ops/s
r.o.OperatorMapPerf.mapPassThruViaLift    1000  thrpt        5    151529.625    15069.277  ops/s
r.o.OperatorMapPerf.mapPassThruViaLift 1000000  thrpt        5       146.516       11.072  ops/s

-------------- ../gradlew benchmarks '-Pjmh=-f 1 -wi 5 -i 5 -r 2 .*rx.operators.OperatorObserveOnPerf.*'

------- 0.19
Benchmark                                       (size)   Mode  Samples          Mean  Mean error  Units
r.o.OperatorObserveOnPerf.observeOnComputation       1  thrpt        5     86331.092    5181.739  ops/s
r.o.OperatorObserveOnPerf.observeOnComputation    1000  thrpt        5      8787.634     316.166  ops/s
r.o.OperatorObserveOnPerf.observeOnComputation 1000000  thrpt        5         8.590       4.527  ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate         1  thrpt        5  13074710.596  217808.611  ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate      1000  thrpt        5    188868.055    5448.065  ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate   1000000  thrpt        5       185.372       4.190  ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread         1  thrpt        5     13848.943    2601.271  ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread      1000  thrpt        5      6867.300     172.123  ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread   1000000  thrpt        5        10.100       1.925  ops/s

------- backpressure-7
Benchmark                                       (size)   Mode  Samples         Score  Score error  Units
r.o.OperatorObserveOnPerf.observeOnComputation       1  thrpt        5     86711.196    3714.229  ops/s
r.o.OperatorObserveOnPerf.observeOnComputation    1000  thrpt        5      8150.631     202.839  ops/s
r.o.OperatorObserveOnPerf.observeOnComputation 1000000  thrpt        5         9.391       5.451  ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate         1  thrpt        5  14749198.208   53924.729  ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate      1000  thrpt        5    173567.350    6099.180  ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate   1000000  thrpt        5       161.833       3.909  ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread         1  thrpt        5     13568.487     573.296  ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread      1000  thrpt        5      6689.964     292.888  ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread   1000000  thrpt        5        12.129       3.199  ops/s
```
Baseline performance of the 2 big changes done to achieve this:
RxJava-pull-requests #1333 SUCCESS
Here is the current perf status of
RxJava-pull-requests #1334 SUCCESS
The volatile touched in emitWithoutQueue is hurting performance. Some unit tests are failing elsewhere. Not handling backpressure requirements on downstream merge ... put a merge().observeOn() unit test in to show this.
... all tests passing except 1 related to merge that is known and not implemented yet. ... one known performance issue in RxRingBuffer.emitWithoutQueue
RxJava-pull-requests #1337 FAILURE
Functional upstream backpressure while retaining good performance in various use cases. Have not yet made it respect downstream backpressure requests. That is another tricky round of work to do in a way that doesn't kill performance. Will likely have a route with current code when it's not needed, and a slower route when it is needed.

Both merge and observeOn invoke request somewhat too eagerly right now and keep buffers full. The BackpressureTests reveal this in 3 test failures right now. It is worth looking into making them wait until a request size threshold has been met ... but it's not urgent. May change the unit tests and just put TODOs for future optimization.

Here are current perf numbers:

```
------- 0.19
Benchmark                                      (size)   Mode  Samples        Score  Score error  Units
r.o.OperatorMergePerf.merge1SyncStreamOfN           1  thrpt        5  2946833.283    34656.012  ops/s
r.o.OperatorMergePerf.merge1SyncStreamOfN        1000  thrpt        5    68757.821     2885.689  ops/s
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN         1  thrpt        5   101272.545     6422.220  ops/s
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN      1000  thrpt        5        8.457        0.299  ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1          1  thrpt        5  2961735.889   129304.035  ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1        100  thrpt        5    65060.345     1794.712  ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1       1000  thrpt        5     6487.781      228.564  ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOfN          1  thrpt        5  2950430.071    61758.514  ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOfN       1000  thrpt        5       70.900        1.508  ops/s
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN       1  thrpt        5    65560.958     3881.703  ops/s
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN    1000  thrpt        5     4080.117      249.937  ops/s

------- backpressure-7
Benchmark                                      (size)   Mode  Samples        Score  Score error  Units
r.o.OperatorMergePerf.merge1SyncStreamOfN           1  thrpt        5  8056282.024  1244398.099  ops/s  +++
r.o.OperatorMergePerf.merge1SyncStreamOfN        1000  thrpt        5    95552.979     4783.600  ops/s  ++
r.o.OperatorMergePerf.merge1SyncStreamOfN     1000000  thrpt        5       61.996        2.765  ops/s
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN         1  thrpt        5    99261.303     7468.987  ops/s  ==
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN      1000  thrpt        5        6.154        0.475  ops/s  -
r.o.OperatorMergePerf.mergeNSyncStreamsOf1          1  thrpt        5  9605862.246   258832.255  ops/s  +++
r.o.OperatorMergePerf.mergeNSyncStreamsOf1        100  thrpt        5   921790.636    28790.629  ops/s  ++++
r.o.OperatorMergePerf.mergeNSyncStreamsOf1       1000  thrpt        5    73696.935    17967.741  ops/s  ++++
r.o.OperatorMergePerf.mergeNSyncStreamsOfN          1  thrpt        5  8669361.559   849097.317  ops/s  +++
r.o.OperatorMergePerf.mergeNSyncStreamsOfN       1000  thrpt        5       73.843        1.305  ops/s  +
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN       1  thrpt        5    73709.795    11261.906  ops/s  +
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN    1000  thrpt        5     3768.807      195.025  ops/s  -
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1     1  thrpt        5  8329906.527   222020.342  ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1  1000  thrpt        5    69658.588     3154.351  ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000000 thrpt       5       72.440        2.486  ops/s
```
RxJava-pull-requests #1338 FAILURE
Decent progress. Here is the comparison between 0.19 and this branch; in most cases it is now better. Many of the optimizations made, particularly related to
There is one major round of work to be done on
By the way ... the unit tests that are failing are known to be failing, and are doing so on purpose as they represent work still to be done.
Nice!
As per discussion in ReactiveX#1383 ... despite the performance hit.
Removing code that would never get executed.
- Parallel operator doesn't work with backpressure right now - 1 use case (and unit test) is failing on merge
RxJava-pull-requests #1341 FAILURE
RxJava-pull-requests #1342 FAILURE
RxJava-pull-requests #1343 FAILURE
Replaced by #1403
This is a functioning but non-performant implementation of backpressure. Of the major unbounded-queue operators in 0.19, this implements `observeOn` and `merge` but not yet `zip`. Several attempts at performance improvements last week failed, so I have rolled them back and left this with naive data structures while restarting the performance work. I want to kick off the design and functionality review while continuing to work on performance.
New Signatures

`Subscriber.request` vs `Producer.request`

This is possibly confusing and I think we may want to eliminate `Subscriber.request`. It would make some operators like `take` more complicated, but for the rare time it is needed, I think it's more clear that you must "capture" the `Producer` and call `Producer.request`.

Operator Changes
`OnSubscribeFromIterable` and `OnSubscribeRange` are both updated to support `Producer` and will only emit as many `onNext` as requested. The implementations are a little more complex than needed as they optimize for cases where backpressure is not requested and fall back to the "firehose" model of 0.19 and earlier. This allows for higher throughput in cases where backpressure is not needed.

`parallel` had to be rewritten with a different approach to support backpressure. The `filter`, `take`, and `skip` operators were modified to support backpressure and propagate request modifications based on each of their logic. `takeUntil` needed to be rewritten because `NotificationLite` can't be allowed to leak across operators. The `subscribeOn` operator was modified to correctly reschedule work on the given `Scheduler` when a `request(n)` occurs after the `Producer` has let itself shut down.

Design Questions
1) Producer

Is the name and signature of `Producer.request(n)` correct?

2) Subscriber.setProducer
Is this the correct signature for an `Observable.OnSubscribe` to register the `Producer`?

3) Subscriber.request
This seems confusing and should probably be removed, despite it making some use cases such as `take` and `filter` more complicated.

4) Requesting <0 for Infinite (no backpressure)
To be backwards compatible, and to support performance optimizations when backpressure is not needed, `Producer.request(n)` can be invoked with a negative number. This means "no backpressure" and the `Producer` can then emit data in a tight loop without further limitations.

Is this acceptable and the best solution?
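A minimal sketch of this convention, using a simplified stand-in `Producer` interface (not `rx.Producer`): a negative request is treated as "emit everything in a tight loop", while a non-negative request counts out at most `n` elements.

```java
import java.util.*;

// Sketch of design question 4: request(n) with n < 0 means "no backpressure",
// so the producer firehoses the whole source instead of counting out n elements.
public class NegativeRequestDemo {
    interface Producer { void request(int n); }

    static List<Integer> emit(List<Integer> items, int n) {
        List<Integer> out = new ArrayList<>();
        Iterator<Integer> source = items.iterator();
        Producer producer = r -> {
            if (r < 0) {
                while (source.hasNext()) out.add(source.next()); // unbounded firehose
            } else {
                for (int k = 0; k < r && source.hasNext(); k++) out.add(source.next());
            }
        };
        producer.request(n);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(emit(Arrays.asList(1, 2, 3, 4, 5), -1)); // all five items
        System.out.println(emit(Arrays.asList(1, 2, 3, 4, 5), 2));  // just [1, 2]
    }
}
```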
5) Producer.BUFFER_SIZE
Currently there is a public constant,
Producer.BUFFER_SIZE
that represents the system-wide size of buffers used for backpressure. This is exposed so that an operator implementation can optimize to skip thesetProducer
path when it knows it will emit less data that the BUFFER_SIZE.This does have a performance benefit, but it comes at the cost of exposing implementation details. It also means all buffers MUST use the same buffer size, or at least not be any smaller.
Is this something we want? Is the performance benefit worth the trade-offs and complexity?
6) Other operators like filter?
Are there other operators that filter out `onNext` like `filter` does that we must modify to call `request(1)` each time they filter something out? Is there a better way of handling this so operators do not need to manually handle this?

7) onBackpressure*
Are the `onBackpressureBuffer` and `onBackpressureDrop` operators well-named, clear in their usage, and implemented correctly? Should we implement others such as `onBackpressureUnsubscribe` and `onBackpressureBuffer(int limitUntilDropping)`?

8) Unit Tests and Use Cases
Are there major use cases that are missing from `BackpressureTests`?

What problems are there that are not yet resolved or handled by the design?
9) observeOn
Is the design of `observeOn` as efficient and correct as it can be?

10) merge
The `merge` operator currently has a naive algorithm for handling the queue of buffers. We need to design a better one.

There are some unit tests marked as ignored for `merge` in the `BackpressureTests` class. Should those work? What should be the behavior of `merge` when merging all synchronous, all asynchronous, or mixed sync/async Observables?

Performance
Performance is not good right now. The biggest problem is `merge`, and that's critical since it's key to `flatMap`, which is a core operator.

Performance can be tested via the JMH performance tests. This is where I'm currently focusing.