Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Subject Factory with Multicast #1515

Merged
merged 1 commit into from
Jul 28, 2014

Conversation

benjchristensen
Copy link
Member

This is a proposed change to allow using a Subject factory with multicast and for methods such as publish() and replay() to use the factory so that each time ConnectableObservable.connect() is invoked, a new Subject is created and used. Otherwise, an Observable sequence can not be reused.

Use Case

        final AtomicInteger count = new AtomicInteger();
        Observable<Integer> subject1 = Observable.just(10);
        Observable<Integer> subject2 = Observable.just(20);
        Observable<Integer> combined = Observable.combineLatest(subject1, subject2, (fst, snd) -> {
            int i = count.incrementAndGet();
            System.out.println((fst + i) + " + " + (snd + i));
            return fst + snd;
        }).publish().refCount();

        combined.subscribe(
                System.out::println,
                t -> t.printStackTrace(),
                () -> System.out.println("Completed"));

        combined.subscribe(
                System.out::println,
                t -> t.printStackTrace(),
                () -> System.out.println("Completed"));

Output without these changes:

11 + 21
30
Completed
Completed

Output with changes:

11 + 21
30
Completed
12 + 22
30
Completed

Test Case

    @Test
    public void testConnectDisconnectConnectAndSubjectState() {
        Observable<Integer> o1 = Observable.just(10);
        Observable<Integer> o2 = Observable.just(20);
        Observable<Integer> combined = Observable.combineLatest(o1, o2, new Func2<Integer, Integer, Integer>() {

            @Override
            public Integer call(Integer t1, Integer t2) {
                return t1 + t2;
            }

        }).publish().refCount();

        TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>();
        TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();

        combined.subscribe(ts1);
        combined.subscribe(ts2);

        ts1.assertTerminalEvent();
        ts1.assertNoErrors();
        ts1.assertReceivedOnNext(Arrays.asList(30));

        ts2.assertTerminalEvent();
        ts2.assertNoErrors();
        ts2.assertReceivedOnNext(Arrays.asList(30));
    }

@benjchristensen
Copy link
Member Author

/cc @headinthebox and @mattpodwysocki This is based on our email conversation and is a proposed change to make publish().refCount() and other similar multicast use cases behave as a user would expect.

@cloudbees-pull-request-builder

RxJava-pull-requests #1436 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member Author

Here is another example to demonstrate why this change is probably a good thing.

Output without this pull request:

[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[0]
[]
Start again ...
[]

With this pull request (what a user would expect):

[0, 1, 2, 3, 4, 5, 6, 7]
[0, 1, 2, 3, 4, 5, 6, 7, 8]
[0]
[0, 1]
[]
Start again ...
[0, 1, 2, 3, 4, 5, 6, 7]
[0, 1, 2, 3, 4, 5]
[0, 1, 2]
[0, 1, 2]
[]

Example code:

import java.util.List;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class DebounceBuffer {

    public static void main(String args[]) {
        /* The following will emit a buffered list as it is debounced */
        // first we multicast the stream ... using refCount so it handles the subscribe/unsubscribe
        Observable<Integer> burstStream = intermittentBursts().take(20).publish().refCount();
        // then we get the debounced version
        Observable<Integer> debounced = burstStream.debounce(10, TimeUnit.MILLISECONDS);
        // then the buffered one that uses the debounced stream to demark window start/stop
        Observable<List<Integer>> buffered = burstStream.buffer(debounced);
        // then we subscribe to the buffered stream so it does what we want
        buffered.toBlocking().forEach(System.out::println);

        System.out.println("Start again ...");

        buffered.toBlocking().forEach(System.out::println);
    }

    public static Observable<Integer> intermittentBursts() {
        return Observable.create((Subscriber<? super Integer> s) -> {
            while (!s.isUnsubscribed()) {
                // burst some number of items
                for (int i = 0; i < Math.random() * 20; i++) {
                    s.onNext(i);
                }
                try {
                    // sleep for a random amount of time
                    Thread.sleep((long) (Math.random() * 1000));
                } catch (Exception e) {
                    // do nothing
                }
            }
        }).subscribeOn(Schedulers.newThread()); // use newThread since we are using sleep to block
    }

}

@headinthebox
Copy link
Contributor

+1 here for the change. It proves subjects are nasty things. And that anything that has side-effects should be ()=>...

I will loose some quizzes in my Rx course, because it is fun to show what happens when you reconnect to an infinite vs a finite sequence.

@benjchristensen
Copy link
Member Author

I will loose some quizzes in my Rx course, because it is fun to show what happens when you reconnect to an infinite vs a finite sequence.

:-)

Thanks for the review. Merging this...

benjchristensen added a commit that referenced this pull request Jul 28, 2014
Support Subject Factory with Multicast
@benjchristensen benjchristensen merged commit cafabff into ReactiveX:master Jul 28, 2014
@benjchristensen benjchristensen deleted the multicast-factory branch July 28, 2014 16:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants