-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Unsubscribing does not work when using subscribeOn(Schedulers.newThread()) #431
Comments
Could you provide a unit test to reveal this issue? I tried similar codes but did not find the bug you described. |
I've observed similar behaviour using rxjava 0.14.6, also on Android when attempting to unsubscribe from a Subscription created using The failure is fairly easy to reproduce in the context of the application, although it's inconsistent as often the subscriptions are successfully unsubscribed. |
This test demonstrates the problem, basically the Observable never receives unsubscribes even though the Observer no longer receives notifications:
Output is: generated 0 If I change the subscribeOn thread to Schedulers.threadPoolForIO() then the output is: generated 0 |
I'll take a look. |
….newThread()) I believe this fixes ReactiveX#431
I think this is fixed in #472 Can someone confirm? |
Yes looks fixed to me. |
Great, I'll release today or tomorrow. |
Hello guys - I'm on 0.15.1 and still see the issue, on any scheduler you subscribeOn. |
The test program above prints the values expected when run with the current master, but the whole program just doesn't quit after that. I see three RxNewThreadScheduler threads (2 core machine), all of them waiting for Edit: Maybe they should be regular threads, but the executor running them should be allowed to timeout its single core thread since NewThreadScheduler will start a new pool anyway. private static class EventLoopScheduler extends Scheduler {
private final ExecutorService executor;
private EventLoopScheduler() {
ThreadPoolExecutor e = (ThreadPoolExecutor)Executors.newFixedThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet());
}
});
e.setKeepAliveTime(1, TimeUnit.SECONDS);
e.allowCoreThreadTimeOut(true);
executor = e;
}
...
} |
This matches the behavior of Schedulers.COMPUTATION_EXECUTOR and Schedulers.IO_EXECUTOR. See https://groups.google.com/forum/#!topic/rxjava/Qe1qi0aHtnE and ReactiveX#431 (comment)
….newThread()) I believe this fixes ReactiveX#431
This matches the behavior of Schedulers.COMPUTATION_EXECUTOR and Schedulers.IO_EXECUTOR. See https://groups.google.com/forum/#!topic/rxjava/Qe1qi0aHtnE and ReactiveX#431 (comment)
I'm having trouble unsubscribing whenever I use
Observable.subscribeOn(Schedulers.newThread())
on Android. I'm using rxjava 0.14.3. In my case, my observer class does the following:The
getData()
method creates an observable, and the onSubscribeFunc generates some mock data using a new thread:When I unsubscribe the (Composite) subscription I got in the observer,
isUnsubscribed()
never returns true in the worker thread andBooleanSubscription.unsubscribe()
never gets called (breakpoint never triggered). I stepped through theCompositeSubscription.unsubscribe()
method, and it seems that it doesn't even contain theBooleanSubscription
anywhere in thekeySet
.The interesting thing is that if I call
subscribeOn(Schedulers.threadPoolForComputation())
or call it usingAndroidSchedulers.mainThread()
, the unsubscription works correctly. Am I misusing theSchedulers.newThread()
method or does the NewThreadScheduler have a bug?The text was updated successfully, but these errors were encountered: