Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix Amb backpressure bug #2961

Merged
merged 1 commit into from
May 26, 2015
Merged

fix Amb backpressure bug #2961

merged 1 commit into from
May 26, 2015

Conversation

davidmoten
Copy link
Collaborator

The Amb operator has a backpressure bug:

  • if m requests were made before the first emission then m subscriptions were started on each source observable. We only want once subscription on each.

This PR adds fix code to OnSubscribeAmb and two unit tests.

} else {
//subscriptions already happened so propagate the request to all the
//amb subscribers
for (AmbSubscriber<T> ambSubscriber: selection.ambSubscribers) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may still run concurrently with the subscription loop and some subscribers won't get the newer requests during that time. The simplest fix is to subscribe all AmbSubscriber in the call() of the operator with 0 initial requests so all subsequent request() can target all or the winner without any additional race. The other way is to get into some complicated serialization logic.

@davidmoten
Copy link
Collaborator Author

yeah I like your first suggestion, I'll look at that

@davidmoten
Copy link
Collaborator Author

I've moved the subscription to sources loop into the call method as suggested.

if (!ambSubscriber.isUnsubscribed()) {
// make a best endeavours check to not waste requests
// if first emission has already occurred
if (selection.choice.get() == ambSubscriber) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd move selection.choice into a final variable at the beginning of the method so we don't re-read two fields unnecessarily with all those memory barriers around.

@davidmoten
Copy link
Collaborator Author

I've done that and reduced volatile reads a bit too. Given that selection and selection.choice are both final fields what is the interaction with the memory barriers? (sorry, a bit of a general question this for my education)

@akarnokd
Copy link
Member

Variables tend to be re-read around volatiles and the JIT may be smart enough to optimize it and pull them upfront, but you can't be sure.

// there is a single winner so we unsubscribe it
selection.choice.get().unsubscribe();
c.unsubscribe();
}
// if we are racing with others still existing, we'll also unsubscribe them
if(!selection.ambSubscribers.isEmpty()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd read selection.ambSubscribers into a local variable as well.

@davidmoten
Copy link
Collaborator Author

added ambSubscribers final variable

final Iterable<? extends Observable<? extends T>> sources;
final Selection<T> selection = new Selection<T>();
final AtomicReference<AmbSubscriber<T>> choice = selection.choice;
final Collection<AmbSubscriber<T>> ambSubscribers = selection.ambSubscribers;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant read them into a method-local variable.

@davidmoten
Copy link
Collaborator Author

ok, done

@akarnokd
Copy link
Member

I'd like to review the whole operator but I don't have the time and the focus to do it now. I'll come back to this in a few hours.

@zsxwing
Copy link
Member

zsxwing commented May 19, 2015

if m requests were made before the first emission then m subscriptions were started on each source observable. We only want once subscription on each.

Maybe I missed some discussion about this case. So now we support calling request in onStart multiple times?

@akarnokd akarnokd added the Bug label May 19, 2015
@davidmoten
Copy link
Collaborator Author

Maybe I missed some discussion about this case. So now we support calling request in onStart multiple times?

Yes we do, multiple requests in onStart are additive but this is about what happens between subscriptions starting and first emission occurring. The new test testSubscriptionOnlyHappensOnce demos the problem in that by subscribing to an amb of async sources with an onStart request of one and then immediately (post subscription) calling request(1) we get a bunch of extra subscriptions.

.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Observable.amb(o1, o2).subscribe(ts);
ts.requestMore(1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be forbidden in the real case. Right? request should be called in Subscriber.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So testSubscriptionOnlyHappensOnce is a test case that requesting twice before first emission. Right? The other request is in subscribe.

@davidmoten
Copy link
Collaborator Author

Interesting point but I assume async operators downstream of the amb could do what we are trying to simulate with TestSubscriber.

Saying that requests between between onStart and first emission are forbidden is not something I see documented anywhere, let me know if you see it.

@zsxwing
Copy link
Member

zsxwing commented May 20, 2015

Saying that requests between between onStart and first emission are forbidden is not something I see documented anywhere, let me know if you see it.

I don't mean it. I mean request is protected to avoid calling out of Subscriber.


//give default access instead of private as a micro-optimization
//for access from anonymous classes below
final Iterable<? extends Observable<? extends T>> sources;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool. Just learnt that accessing a private field of the outer class uses invokestatic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And generates an accessor method in that outer class, the count of which (methods) is a big deal on Android. So 👍.

@zsxwing
Copy link
Member

zsxwing commented May 20, 2015

LGTM for the fix. Just some minor comments for the tests.

@davidmoten
Copy link
Collaborator Author

@zsxwing I've updated the test that called ts.requestMore(1) before subscription so that it uses the onStart method of TestSubscriber instead. Thanks for the review.

@zsxwing
Copy link
Member

zsxwing commented May 20, 2015

LGTM. Let's wait for @akarnokd to take a final look.

@akarnokd
Copy link
Member

I found a few places for possible optimizations:

  • in AmbSubscriber, you can have a plain boolean chosen field to avoid the reference comparison:
boolean chosen;
...
    private boolean isSelected() {
        if (chosen) {
            return true;
        }
        if (selection.choice.get() == this) {
            // fast-path
            chosen = true;
            return true;
        } else {
            if (selection.choice.compareAndSet(null, this)) {
                selection.unsubscribeOthers(this);
                chosen = true;
                return true;
            } else {
                // we lost so unsubscribe ... and force cleanup again due to possible race conditions
                selection.unsubscribeLosers();
                return false;
            }
        }
    }
  • There is a race between the unsubscription logic at L360 and the loop at L381: while the loop is busy creating and subscribing AmbSubscribers, a concurrent unsubscribe will start cancelling that have reached the queue but any subsequent. Possible solutions:
    • Move the creation and enqueueing of AmbSubscribers at the beginning, then create the unsubscription logic and finally loop and subscribe the AmbSubscribers. This way, the queue will contain all that need to be unsubscribed.
    • Instead of quitting after the subscriber.isUnsubscribed(), drain the queue and unsubscribe its contents, do a similar check after the loop as well to clean up any stragglers.
    • Have a custom subscription manager, similar to what the subjects use that can unsubscribe latecommers and keep the current one-loop approach (I have plans to cover such custom container in my blog soon.)

@davidmoten davidmoten force-pushed the amb-bug branch 2 times, most recently from 47ebbbf to bb659b6 Compare May 20, 2015 23:54
@davidmoten
Copy link
Collaborator Author

I like the optimization for isSelected. I've also committed the first approach for the unsub race but I think we can do better as you imply. Might go for the second.

@davidmoten davidmoten force-pushed the amb-bug branch 2 times, most recently from 5d5d929 to b937fdd Compare May 24, 2015 05:28
@davidmoten
Copy link
Collaborator Author

Instead of quitting after the subscriber.isUnsubscribed(), drain the queue and unsubscribe its contents, do a similar check after the loop as well to clean up any stragglers.

I've implemented this suggestion @akarnokd. Ready for (another) review, thanks.

@akarnokd
Copy link
Member

Looks good. Thanks!

akarnokd added a commit that referenced this pull request May 26, 2015
@akarnokd akarnokd merged commit 4329fed into ReactiveX:1.x May 26, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants