Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OperatorPublish bugs #2810

Closed
akarnokd opened this issue Mar 6, 2015 · 1 comment
Closed

OperatorPublish bugs #2810

akarnokd opened this issue Mar 6, 2015 · 1 comment
Labels
Milestone

Comments

@akarnokd
Copy link
Member

akarnokd commented Mar 6, 2015

Reconnect issue

The following test fails because in OperatorPublish, the state.getOrigin() still holds the subscriber of a previous connection:

@Test
public void testSubscribeAfterDisconnectThenConnect() {
    ConnectableObservable<Integer> source = Observable.just(1).publish();

    TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();

    source.subscribe(ts1);

    Subscription s = source.connect();

    ts1.assertReceivedOnNext(Arrays.asList(1));
    ts1.assertNoErrors();
    ts1.assertTerminalEvent();

    TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();

    source.subscribe(ts2);

    Subscription s2 = source.connect();

    ts2.assertReceivedOnNext(Arrays.asList(1));
    ts2.assertNoErrors();
    ts2.assertTerminalEvent();

    System.out.println(s);
    System.out.println(s2);
}

However, it works with share():

@Test
public void testShareReconnect() {
    Observable<Integer> source = Observable.just(1).share();

    TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();

    source.subscribe(ts1);

    TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();

    source.subscribe(ts2);

    ts1.assertNoErrors();
    ts1.assertTerminalEvent();
    ts1.assertReceivedOnNext(Arrays.asList(1));

    ts2.assertNoErrors();
    ts2.assertTerminalEvent();
    ts2.assertReceivedOnNext(Arrays.asList(1));
}
connect() is racy

In addition, the OperatorPublish.connect() method is inherently racy: two concurrent connect calls may succeed, one kicking out the upstream subscriber of the other.

OriginSubscriber.requestMore may overflow

originOutstanding.addAndGet() may turn negative

Retention of subscribers after completion event.

After making all relevant fields and classes package-private, the following test fails:

@Test
public void testNoSubscriberRetentionOnCompleted() {
    OperatorPublish<Integer> source = (OperatorPublish<Integer>)Observable.just(1).publish();

    TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();

    source.unsafeSubscribe(ts1);

    source.connect();

    ts1.assertReceivedOnNext(Arrays.asList(1));
    ts1.assertNoErrors();
    ts1.assertTerminalEvent();

    assertTrue(source.requestHandler.state.subscribers.length == 0 
            && source.requestHandler.state.ss.isEmpty());
}

Note that blindly evicting values from ss while completing in drainQueues() is wrong too, because if there is a concurrent subscription in the works, its tracking recod in state.ss and subscribers may get deleted as well, instead of letting it wait for the next connect().

/cc @davidmoten: please look at these issues while you fix OperatorPublish for #2803.

@akarnokd
Copy link
Member Author

The operator has been fixed. Closing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant