Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operator Publish full rewrite #2814

Merged
merged 1 commit into from
Apr 30, 2015

Conversation

akarnokd
Copy link
Member

Complete rewrite of publish() and fixed bugs of #2810 and #2803.

@akarnokd akarnokd added this to the 1.1 milestone Mar 10, 2015
@benjchristensen
Copy link
Member

Wow. Will have to come back to this one :-) that's a pretty epic fix of my bad code!

@benjchristensen
Copy link
Member

@davidmoten have you by chance tried this and been able to confirm it fixes your issues?

@akarnokd
Copy link
Member Author

Ah, it was easy; you only need to dualize merge :).

@davidmoten
Copy link
Collaborator

@benjchristensen the original issue is covered by the rewrite, thanks @akarnokd . I've given the code a once over but not thorough, would take me a while and have little time at the moment.

@davidmoten
Copy link
Collaborator

@akarnokd I have had another brief foray into the code and I have every faith in your ability to create correct code but I'm not having a great time following it in the short slots of time I have between kid juggling. For longevity's sake I'm wondering if this beautiful looking bit of code could be bulked up with comments to lead me through and to lead future maintainers through. Of course if @benjchristensen or @zsxwing or another of the usual suspects think it's clear as day then further comments may be unnecessary.

@akarnokd
Copy link
Member Author

Sure.

@akarnokd akarnokd force-pushed the OperatorPublishRewrite branch from 97173da to ccec09c Compare April 22, 2015 12:05
@akarnokd
Copy link
Member Author

I've rebased, squashed and added comments to this PR.

@davidmoten
Copy link
Collaborator

In OperatorPublishTest.testPublish (old code, you made no changes), would be nice to have an assert between the calls to subscribe and the call to connect just to ensure that subscription has occured but nothing is emitted before the call to connect. This may be covered by other tests but not explicitly so I think could be worthwhile addition.

@akarnokd akarnokd force-pushed the OperatorPublishRewrite branch from ccec09c to d9588ff Compare April 23, 2015 06:33
@akarnokd
Copy link
Member Author

Thanks! Updated tests accordingly and fixed a case with clients starting with 0 requests.

@davidmoten
Copy link
Collaborator

Perhaps add a connect idempotency test?

@akarnokd akarnokd force-pushed the OperatorPublishRewrite branch from d9588ff to a9f041b Compare April 23, 2015 11:41
@akarnokd
Copy link
Member Author

Done.

// Now find the minimum amount each child-subscriber requested
// since we can only emit that much to all of them without violating
// backpressure constraints
for (InnerProducer<T> ip : ps) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one strategy I suppose and another might be request max but buffer responses for those subscribers that want less than max. I can imagine that either way somebody could suffer and I wonder if this strategy should be configurable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can only request the common minimum amount from upstream if the front-queue is bounded, even if some child subscribers request more. These may run ahead but then stay idle until the 'slower ones' catch up.

@davidmoten
Copy link
Collaborator

By the way thanks for the comments in the code, they are just what I need!

@akarnokd
Copy link
Member Author

I'm preparing a JMH benchmark and it seems the current master fails with MissingBackpressureException on simple chains such as:

@Test
public void testObserveOn() {
    ConnectableObservable<Integer> co = Observable.range(0, 1000).publish();
    Observable<Integer> obs = co.observeOn(Schedulers.computation());
    for (int i = 0; i < 100; i++) {
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        obs.subscribe(ts);
        Subscription s = co.connect();
        ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
        ts.assertTerminalEvent();
        ts.assertNoErrors();
        assertEquals(1000, ts.getOnNextEvents().size());
        s.unsubscribe();
    }
}

I'll post and merge the PR for the perf test so it can be run independent of this PR.

@akarnokd akarnokd force-pushed the OperatorPublishRewrite branch from a9f041b to bf12d32 Compare April 23, 2015 14:17
@akarnokd
Copy link
Member Author

I'll post its benchmark result but it takes a looong time. So far, I'm impressed (sync, 1 batch, 1 child, 1M elements):
master: 3.822 ops/s
this: 18.810 ops/s

@akarnokd
Copy link
Member Author

Here is the benchmark result for this PR in JMH Comparison GUI format. This PR outperforms the current master by 1.5 - 4x most of the time. However, some async parameter combination timed out despite the high CPU usage; I'll look into those cases.

Edit:
The performance degradation appears to be due to the ReentrantLock unparking its successor in SubscriptionList, similar to #2857.

*/
continue;
/*
* Note: although technically corrent, concurrent disconnects can cause
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By concurrent disconnect, I assume you mean concurrent subscribe/disconnect? If so then yes, the child subscribers may never receive anything if it is not re-connected. But that is exactly correct behavior if that's what happens.

Am I understanding your comment here correctly?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the behavior implemented and what you said is the correct one.

@benjchristensen
Copy link
Member

Excellent code. Excellent comments. Very well done @akarnokd. This is a very hard piece of code. I can only imagine how much time you spent on this.

As best as I can tell it's all correct. It's complicated enough I may be missing something, but everything seems correct and matches what I wish I had coded this to be originally!

benjchristensen added a commit that referenced this pull request Apr 30, 2015
@benjchristensen benjchristensen merged commit 1f3ca5a into ReactiveX:1.x Apr 30, 2015
@benjchristensen benjchristensen mentioned this pull request Apr 30, 2015
@akarnokd akarnokd deleted the OperatorPublishRewrite branch May 6, 2015 06:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants