Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exception not properly propagated #417

Closed
samuelgruetter opened this issue Sep 30, 2013 · 10 comments
Closed

Exception not properly propagated #417

samuelgruetter opened this issue Sep 30, 2013 · 10 comments

Comments

@samuelgruetter
Copy link
Contributor

Given

static Func1<Integer, Integer> f = new Func1<Integer, Integer>() {
    public Integer call(Integer arg0) {
        throw new RuntimeException("the error in f");
    }
};

when I run this:

public static void main(String[] args) {
    System.out.println("started");
    System.out.println(Observable.from(1)
        .map(f).toBlockingObservable().single());
    System.out.println("done");
}

I get (as expected) this output:

started
Exception in thread "main" java.lang.RuntimeException: the error in f
    at RxJavaHelloWorld$2.call(RxJavaHelloWorld.java:226)
    at [blah blah...]
    at RxJavaHelloWorld.main(RxJavaHelloWorld.java:240)

However, when I observe it on a different thread:

public static void main(String[] args) {
    System.out.println("started");
    System.out.println(Observable.from(1).observeOn(Schedulers.threadPoolForComputation())
        .map(f).toBlockingObservable().single());
    System.out.println("done");
}

then the output is this:

started

and the application does not terminate.
But I would expect that no matter on what thread I observe, errors are always propagated.

@samuelgruetter
Copy link
Contributor Author

Here the error occurs even without observeOn:

System.out.println(Observable.from(1, 2, 3).take(1).map(f).toBlockingObservable().single());

This snippet also makes the app never terminate.

This prevents me from implementing the head operation in the Scala adaptor, and the original one with observeOn prevents me from implementing a nice constructor.

@zsxwing
Copy link
Member

zsxwing commented Oct 12, 2013

Here the error occurs even without observeOn:

System.out.println(Observable.from(1, 2, 3).take(1).map(f).toBlockingObservable().single());
This snippet also makes the app never terminate.

For take(1), after the observer receives a value, any later value or error will be ignored, even if onNext(the first value) throws an exception. The related codes:

            @Override
            public void onError(Throwable e) {
                if (counter.getAndSet(num) < num) {
                    observer.onError(e);
                }
            }

            @Override
            public void onNext(T args) {
                final int count = counter.incrementAndGet();
                if (count <= num) {
                    observer.onNext(args);
                    if (count == num) {
                        observer.onCompleted();
                    }
                }
                if (count >= num) {
                    // this will work if the sequence is asynchronous, it will have no effect on a synchronous observable
                    subscription.unsubscribe();
                }
            }

For take(1), it will increment count at first. Even if observer.onNext(args); throws an exception, as count == num, the exception is not propagated.

@zsxwing
Copy link
Member

zsxwing commented Oct 12, 2013

public static void main(String[] args) {
System.out.println("started");
System.out.println(Observable.from(1).observeOn(Schedulers.threadPoolForComputation())
.map(f).toBlockingObservable().single());
System.out.println("done");
}

This issue is because rx.operators.ScheduledObserver.EventLoop does not handle the exception threw from the onNext method of a observer. The related codes:

                    switch (notification.getKind()) {
                    case OnNext:
                        underlying.onNext(notification.getValue());
                        break;
                    case OnError:
                        underlying.onError(notification.getThrowable());
                        break;
                    case OnCompleted:
                        underlying.onCompleted();
                        break;
                    default:
                        throw new IllegalStateException("Unknown kind of notification " + notification);
                    }

@zsxwing
Copy link
Member

zsxwing commented Oct 13, 2013

@benjchristensen I encountered one problem which I had never realized before when I tried to solve this issue.

From the Observable's perspective, it can guarantee that it calls onCompleted of an Observer after all onNext calls. However, from the Observer's perspective, if onNext and onCompleted run in different threads, some weird thing may happen.

For example, an Observable calls onNext in thread t1. After 5ms, it calls onCompleted in thread t2. If t1 is suspended at once after it enters the onNext method. At this moment, none statement in onNext is executed. So now the Observer does not know its onNext has been invoked. Suppose t1 is suspended in 100ms, and t2 is not be suspended. So the Observer will find that onCompleted is invoked first, and after 95ms, the onNext method is called.

I really think such case is weird for the Observer. Could you point out where I misunderstood?

@zsxwing
Copy link
Member

zsxwing commented Oct 19, 2013

I read the article about concurrency in Rx: http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html.
But I'm still confused. Here are my problems:

  • Should an Observable call all onNext, onCompleted and onError in the same thread, if no scheduler is involved?
  • What's the contract when an Observable encounters a thread pool scheduler?

@zsxwing
Copy link
Member

zsxwing commented Oct 21, 2013

@benjchristensen, one more question. If an Observer throws an exception in onNext, does its onError have to be called? If so, there may be complicated to fix the issue in rx.operators.ScheduledObserver.EventLoop.

For example, in @samuelgruetter 's example,

public static void main(String[] args) {
    System.out.println("started");
    System.out.println(Observable.from(1).observeOn(Schedulers.threadPoolForComputation())
        .map(f).toBlockingObservable().single());
    System.out.println("done");
}

EventLoop will receive two notifications: onNext(1) and onCompleted. However, as they are out of order, onCompleted may arrive first. So the observer will be notified onCompleted. Then, when onNext(1) is called, the observer will need to be notified onError as onNext throws an exception. In such situation, the onCompleted and onError will be both called.

I'm trapped in here.

Due to onCompleted and onError, concurrency in RxJava is different from Rx.Net. Is there an wiki page about concurrency in RxJava?

@benjchristensen
Copy link
Member

You should read the "Rx Design Guidelines" => http://blogs.msdn.com/b/rxteam/archive/2010/10/28/rx-design-guidelines.aspx

Should an Observable call all onNext, onCompleted and onError in the same thread, if no scheduler is involved?

There is nothing requiring it be the same thread necessarily, but it is required that it being sequential and synchronized so the Observer doesn't need to worry about memory visibility, interleaving calls etc.

See section 6.7 and 6.8 of the design guidelines.

What's the contract when an Observable encounters a thread pool scheduler?

It's the same, a Scheduler is for moving location and time but does not change the contract. A single Observable must still be serialized. Anything in RxJava doing differently is a bug and needs to be fixed.

We iterated several times on ObserveOn to get it to hopefully behave correctly so that each event it receives is put on an event loop and processed sequentially on whatever Scheduler it is given.

@benjchristensen
Copy link
Member

EventLoop will receive two notifications: onNext(1) and onCompleted. However, as they are out of order, onCompleted may arrive first.

If they are out-of-order then we still don't have ObserveOn working correctly and it's a bug.

@zsxwing
Copy link
Member

zsxwing commented Oct 23, 2013

Thanks for your help. "Rx Design Guidelines" is really helpful.

zsxwing added a commit to zsxwing/RxJava that referenced this issue Oct 25, 2013
benjchristensen added a commit that referenced this issue Oct 31, 2013
@zsxwing
Copy link
Member

zsxwing commented Nov 1, 2013

Now this issue should be fixed in PR #453 and #441. It can be closed.

rickbw pushed a commit to rickbw/RxJava that referenced this issue Jan 9, 2014
rickbw pushed a commit to rickbw/RxJava that referenced this issue Jan 9, 2014
jihoonson pushed a commit to jihoonson/RxJava that referenced this issue Mar 6, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants