Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented the Merge overloads #718

Merged
merged 2 commits into from
Jan 14, 2014

Conversation

zsxwing
Copy link
Member

@zsxwing zsxwing commented Jan 3, 2014

Hi, this PR implemented the overloads of merge in #62. Please take a look.

@cloudbees-pull-request-builder

RxJava-pull-requests #635 FAILURE
Looks like there's a problem with this pull request

@cloudbees-pull-request-builder

RxJava-pull-requests #636 FAILURE
Looks like there's a problem with this pull request

@zsxwing
Copy link
Member Author

zsxwing commented Jan 3, 2014

I checked the failure in the groovy unit tests. Here is the problem.

In the groovy unit test, there is an assumption. The following Observable

Observable.merge(
    Observable.from(6),                                                                                                                        
    Observable.error(new NullPointerException()),                                                                                              
    Observable.from(7)
)

should emit 6, and emit a NullPointerException later.

However, my implementation uses from(Iterable<? extends T> iterable) to create an Observable<? extends Observable<? extends T>>. The default scheduler is Schedulers.currentThread(), which make the previous assumption failed. As emitting 6 will be delayed, so the Observable will only emit a NullPointerException.

@zsxwing
Copy link
Member Author

zsxwing commented Jan 3, 2014

I suppose merge does not need to provide such order guarantee.

Thoughts?

@benjchristensen
Copy link
Member

Why would 6 be delayed? CurrentThreadScheduler doesn't change the order. It should subscribe to them in order, particularly since there is no concurrency and thus no race conditions here?

@zsxwing
Copy link
Member Author

zsxwing commented Jan 4, 2014

I wanna use the following codes to discuss:

        Observable<Integer> o1 = Observable.from(6);
        Observable<Integer> o2 = Observable.<Integer> error(new NullPointerException());
        Observable<Integer> o3 = Observable.from(7);
        Observable<Integer> o = Observable.merge(o1, o2, o3);

In this example, when we subscribe o, the order is:

subscribe o (schedule (emit o1, emit o2, emit o3))
emit o1
subscribe o1 (schedule(emit 6))
emit o2
subscribe o2 (NullPointerException)
emit o3
subscribe o3 (schedule(emit 7))
emit 6
emit 7

The problem is that Observable.error(new NullPointerException()) throws the exception directly when it's subscribed. The behavior is like using Schedulers.immediate(). So actually, we can say that there are two types of Scheduler involved here.
If we wanna guarantee the order, Observable.error(new NullPointerException()) need to be changed to Observable.error(new NullPointerException(), Schedulers.currentThread()).

@zsxwing
Copy link
Member Author

zsxwing commented Jan 4, 2014

Just found using Schedulers.currentThread() in error still can't fix this problem.
For the nested merge, for example,

123        Observable.merge(                                                                                                                                  
124                Observable.from(1, 2, 3),                                                                                                                  
125                Observable.merge(                                                                                                                          
126                    Observable.from(6),                                                                                                                        
127                    Observable.error(new NullPointerException(), Schedulers.currentThread()),                                                                                              
128                    Observable.from(7)),                                                                                                                       
129                Observable.from(4, 5))                                                                                                                     
130                .subscribe({ result -> a.received(result)}, { exception -> a.error(exception)});  

The order is,

1
2
3
4
5
6
NullPointerException
7

Now I can only find two solutions,

  • Do not promise any order.
  • Use Schedulers.immediate() when using from(Iterable<? extends T> iterable) to create an Observable<? extends Observable<? extends T>>, just like the previous merge implementation.

@cloudbees-pull-request-builder

RxJava-pull-requests #643 SUCCESS
This pull request looks good

@zsxwing
Copy link
Member Author

zsxwing commented Jan 7, 2014

Rebased. #724 helped pass the groovy tests.

@benjchristensen
Copy link
Member

I'm holding off on this for now as I'm stabilizing 0.16.0 for release and am already in the midst of testing on Netflix production canaries. I'll review this for 0.16.1 once 0.16.0 is released.

benjchristensen added a commit that referenced this pull request Jan 14, 2014
Implemented the Merge overloads
@benjchristensen benjchristensen merged commit dadf17b into ReactiveX:master Jan 14, 2014
@zsxwing zsxwing deleted the merge-overloads branch January 14, 2014 05:00
jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants