Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zip with Backpressure Support #1446

Merged
merged 3 commits into from
Jul 17, 2014

Conversation

benjchristensen
Copy link
Member

This supports both upstream and downstream backpressure.

This supports both upstream and downstream backpressure.
// Concurrent* since we need to read it from across threads
final ConcurrentLinkedQueue items = new ConcurrentLinkedQueue();
final RxRingBuffer items = RxRingBuffer.getSpmcInstance();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is at most 1 consumer and 1 producer thread, why the SPMC queue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oversight on my part, this should work as SPSC. I'll make the change.

@cloudbees-pull-request-builder

RxJava-pull-requests #1397 ABORTED

@akarnokd
Copy link
Member

I get random hangs and test failures in

OperatorGroupByTest > testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsAndThenComplete hangs

OperatorPivotTest > testPivotEvenAndOdd Exception

OnSubscribeCacheTest > testWithPublishSubjectAndRepeat
java.lang.AssertionError at OnSubscribeCacheTest.java:146

OnSubscribeCacheTest > testWithBehaviorSubjectAndRepeat
java.lang.AssertionError at OnSubscribeCacheTest.java:146

RxRingBufferWithoutUnsafeTest > testConcurrency
java.lang.AssertionError at RxRingBufferWithoutUnsafeTest.java:166

}
if (emitted > THRESHOLD) {
for (Object obj : observers) {
((InnerSubscriber) obj).request(emitted);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we requesting more data from sources if the downstream didn't ask for values? It looks like as if request(n) is used as a repeated 'batching' operation. I guess this was required for backpressure unaware downstream which would "hang" unless new data is requested even now and then.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far I have treated upstream and downstream as decoupled on async operators like this with a buffer in them. Even if no further is requested from downstream it tries to keep the buffers full. This is similar to merge and observeOn. This is done because it can't know when a request will come in but assumes it will and keeps filling the buffers so the data is available when the downstream requests it.

I'm open to exploring alternatives if performance testing shows a different approach is better for throughput.

You'll also notice that it does not perform the request until after it passes the THRESHOLD value. This is done so it requests in batches rather than one at a time, which can be more efficient in certain use cases.

What are you recommending doing differently?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of introducing buffers, I'd keep the current unbounded operator behavior on the onNext stream for most operators and only decorate the request(n) stream going backwards to the source; basically what I did in my zip operator. Otherwise, I'd use a batch(n) operator I suggested.

But currently, I only considered from and range as the sources where this simplified request mangling is straightforward. Are there any other RxJava sources?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having unbounded buffers defeats the point of backpressure and makes it non-deterministic as to whether buffer float will occur from perspective of a user. The idea is to eliminate unbounded buffers unless the user asked for it such as in replay, toList or onBackpressureBuffer.

@benjchristensen
Copy link
Member Author

I get random hangs and test failures in

I think we should probably remove the pivot operator: #1402

The others all need to be researched.

@benjchristensen
Copy link
Member Author

RxJava-pull-requests #1397 ABORTED

Oddly this hung in the rxjava-scalaz project: :rxjava-contrib:rxjava-scalaz:test

@benjchristensen
Copy link
Member Author

From my command line it also hangs:

[ant:scalac] Element '/Users/benjchristensen/development/github/RxJavaFork/rxjava-contrib/rxjava-scalaz/build/resources/main' does not exist.
:rxjava-contrib:rxjava-scalaz:processTestResources UP-TO-DATE
:rxjava-contrib:rxjava-scalaz:testClasses
> Building > :rxjava-contrib:rxjava-scalaz:test > 6 tests completed

It fails here:

08:12:07.758 [DEBUG] [TestEventLogger] rx.lang.scala.scalaz.ObservableZipSpec STARTED
08:12:07.779 [DEBUG] [TestEventLogger] 
08:12:07.779 [DEBUG] [TestEventLogger] rx.lang.scala.scalaz.ObservableZipSpec > Zip Operators should::be able to appy to Observable STARTED

@benjchristensen
Copy link
Member Author

@headinthebox
Copy link
Contributor

Scalaz turns you code into messy tokensoup. I can't read it either.

Similar to filter, it needs to request(1) each time it filters an onNext.
@cloudbees-pull-request-builder

RxJava-pull-requests #1406 FAILURE
Looks like there's a problem with this pull request

@cloudbees-pull-request-builder

RxJava-pull-requests #1407 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member Author

Non-deterministic tests can be tracked here: #1455

benjchristensen added a commit that referenced this pull request Jul 17, 2014
@benjchristensen benjchristensen merged commit 201f54d into ReactiveX:master Jul 17, 2014
@benjchristensen benjchristensen deleted the zip-backpressure branch July 17, 2014 19:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants