"Interrupted while waiting for subscription to complete." in 1.0.0-rc8 #1804

Closed
edenman opened this issue Oct 28, 2014 · 30 comments

@edenman

edenman commented Oct 28, 2014

Doesn't happen on rc7. I can repro 100% of the time. Happens on both KitKat and Lollipop so I don't think it's specific to any particular Android version.

FATAL EXCEPTION: RxCachedThreadScheduler-4
Process: com.mycompany.myapp.development, PID: 4413
java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:50)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:422)
        at java.util.concurrent.FutureTask.run(FutureTask.java:237)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
        at java.lang.Thread.run(Thread.java:818)
Caused by: rx.exceptions.OnErrorNotImplementedException: Interrupted while waiting for subscription to complete. 
        at rx.Observable$30.onError(Observable.java:7252)
        at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:127)
        at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:96)
        at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:48)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:56)
        at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
        at rx.subjects.SubjectSubscriptionManager$SubjectObserver.emitNext(SubjectSubscriptionManager.java:254)
        at rx.subjects.BehaviorSubject.onNext(BehaviorSubject.java:166)
        at rx.Observable$34.onNext(Observable.java:7437)
        at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:104)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:610)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:536)
        at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:75)
        at rx.internal.operators.OperatorTake$1.onNext(OperatorTake.java:72)
        at rx.internal.operators.OperatorSubscribeOn$1$1$1.onNext(OperatorSubscribeOn.java:76)
        at com.mycompany.servercall.ServerCall$1$1.call(ServerCall.java:56)
        at com.mycompany.servercall.ServerCall$1$1.call(ServerCall.java:47)
        at rx.Observable.unsafeSubscribe(Observable.java:7464)
        at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:45)
        ... 7 more 
Caused by: java.lang.RuntimeException: Interrupted while waiting for subscription to complete. 
        at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:474)
        at rx.observables.BlockingObservable.single(BlockingObservable.java:348)
        at com.mycompany.myapp.data.CachedThing.buildCachedThing(CachedThing.java:66)
        at com.mycompany.myapp.data.CachedThing.<init>(CachedThing.java:28)
        at com.mycompany.myapp.data.FilterFunc.call(FilterFunc.java:189)
        at com.mycompany.myapp.data.FilterFunc.call(FilterFunc.java:175)
        at com.mycompany.myapp.data.CachedThing.merge(CachedThing.java:60)
        at com.mycompany.myapp.data.ResponseCache.update(ResponseCache.java:80)
        at com.mycompany.myapp.data.TransformFunc.call(TransformFunc.java:105)
        at com.mycompany.myapp.data.TransformFunc.call(TransformFunc.java:99)
        at com.mycompany.servercall.CallState$3.map(CallState.java:79)
        at com.mycompany.servercall.CallState$9.call(CallState.java:195)
        at com.mycompany.servercall.CallState$9.call(CallState.java:193)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
        ... 22 more
Caused by: java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1279)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:202)
        at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:471)
        ... 35 more
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: .class 
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:56)
        ... 22 more

CachedThing.buildCachedThing has this block of code that I think is the culprit:

List<SubThing> subThings = Observable.from(things) //
        .map(transformFunction) //
        .filter(not(isNullFunction)) //
        .distinct() //
        .toList() //
        .toBlocking() //
        .single();

I tried writing a test case to repro but couldn't get it to fail. I'm guessing there's something broken between the Android runtime and RxJava's thread management, but I couldn't get any further than that.

@benjchristensen benjchristensen added this to the 1.0 milestone Oct 28, 2014
@benjchristensen
Member

The only thing in rc8 that seems related is this: #1793

@benjchristensen
Member

I cannot replicate with this code:

import java.util.List;

import rx.Observable;
import rx.functions.Func1;

public class Testing {

    public static void main(String... args) {
        List<String> subThings = Observable.range(0, 1000)
                .map(new Func1<Integer, String>() {

                    @Override
                    public String call(Integer i) {
                        return String.valueOf(i + 1);
                    }

                })
                .filter(new Func1<String, Boolean>() {

                    @Override
                    public Boolean call(String t) {
                        return t != null;
                    }

                }) //
                .distinct() //
                .toList() //
                .toBlocking() //
                .single();

        System.out.println(subThings);
    }
}

@benjchristensen
Member

The code that is being interrupted is this: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/observables/BlockingObservable.java#L473

        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
        }

This means the thread was blocked on the latch waiting for a result but while it was waiting it got interrupted.
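
The behavior described above can be reproduced with the JDK alone. The following is a minimal sketch, not RxJava's code: the class name `LatchInterruptDemo` and the method `blockAndReport` are made up for illustration. A worker thread blocks on a `CountDownLatch` that is never counted down, a second thread interrupts it, and the worker wraps the `InterruptedException` in the same way as the snippet above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; only JDK APIs are used, nothing from RxJava.
final class LatchInterruptDemo {

    // Blocks a worker on a latch, interrupts it, and returns the exception the worker built.
    static RuntimeException blockAndReport() {
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch never = new CountDownLatch(1); // never counted down
        final AtomicReference<RuntimeException> thrown = new AtomicReference<>();

        Thread worker = new Thread(() -> {
            started.countDown();
            try {
                never.await(); // blocks until interrupted
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                // Stored instead of thrown so the caller can inspect it.
                thrown.set(new RuntimeException(
                        "Interrupted while waiting for subscription to complete.", e));
            }
        });
        worker.start();
        try {
            started.await();
            worker.interrupt(); // this is the step "something else" performs in the bug report
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return thrown.get();
    }

    public static void main(String[] args) {
        RuntimeException e = blockAndReport();
        System.out.println(e.getMessage() + " / cause: " + e.getCause());
    }
}
```

The point of the sketch is that the blocked thread never interrupts itself: some other party has to call `interrupt()` on it, which is why the rest of the thread is spent finding that party.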

I don't know what thread you are doing this on, nor do I know much about Android. If Android is using pools of threads it could perhaps retain the right to interrupt and reclaim threads? I know event loops in some frameworks do that kind of thing if something is blocked in them and new work is scheduled.

RxJava itself does not interrupt threads anywhere (that I'm aware of ... and I can't think of any reason we would) so I'm inclined to suggest looking at what thread you are blocking in and see if Android itself could be interrupting it?

Also, could you move to a model where you don't block and instead compose it all together reactively?

@benjchristensen benjchristensen modified the milestones: 1.0.x, 1.0 Nov 2, 2014
@benjchristensen
Member

Moving to 1.0.x as it's not obvious yet what is going on here.

@Alexander--

Encountered this on Jellybean. Code in my Application subclass:

Observable<SomeSharedPreferencesWrapper> prefsPrefetch = Async.start(this::prefetchPrefs);

And later in same class:

return prefsPrefetch.toBlocking().single();

I can't pin down the exact circumstances under which the issue reproduces. At first glance they seem random, but it happens often enough to make prefetching stuff with RxJava pretty much impossible.

@edenman
Author

edenman commented Nov 5, 2014

I'm gonna work on putting together a minimal test Android app to try and isolate the issue.

@edenman
Author

edenman commented Nov 5, 2014

https://dl.dropboxusercontent.com/u/7829307/RxJavaBlockingBug.zip

I wasn't able to repro until I added the PublishSubject layer...maybe the issue is the nested toBlocking() calls?

@benjchristensen
Member

Does Android ever interrupt a thread based on user or system activity? If so then the toBlocking() will always be vulnerable to interruption since anytime a thread is blocked/waiting it can be interrupted.

Is request.onNext ever being called concurrently, or is it sequentially? I can't tell for sure but it looks okay as it looks like it's only ever the single UI thread that would trigger an onNext. If you intend on calling it concurrently wrap it in a SerializedSubject.

If the UI event listener triggers before ObserveThings.start completes then request may not yet be initialized, or the pipeline may not yet be initialized. It looks like that could be a race condition ... but I don't understand how MainActivity works well enough. Is that all on the UI thread, or does a background thread do that and then the setOnClickListener registers with the UI thread?

I don't particularly see anything wrong in this code ... though to be more idiomatic I would suggest using the Observable sequence to manipulate the data instead of doing it inside the subscribe. You really should never need to use toBlocking. The only 3 times that is ever intended are:

  1. Example code in a main method
  2. Unit tests (and even then you should probably use TestSubscriber.awaitTerminalEvent instead)
  3. Bridging between a blocking and non-blocking system, such as a Servlet

In this case, instead of doing flatMap.subscribe(all logic here) do something like this instead:

request.flatMap(mapResult)
.map(transformFunction)
.filter(notNull())
.distinct()
.toList()
.subscribe(yourListHere)

This is more idiomatic, doesn't involve creating another Observable and never requires doing the toBlocking stuff.... which is kind of the point of Rx. You should only ever have 1 subscribe at the very end of your flow and it should just be the side-effect, no actual processing in it.

@edenman
Author

edenman commented Nov 6, 2014

MainActivity all happens on the main thread, as does the onClickHandler.

I've worked around the problem in our app by no longer using toBlocking in our subscribe method, but I still think there might be an Rx bug here. Android shouldn't interrupt unless there's a deadlock of some sort, and I'm afraid the nested toBlocking calls (discouraged though they may be) are causing the thread to stall. Regardless, the workaround is easy, so I don't think this should be a 1.0 blocker. Thanks for looking into it!

@headinthebox
Contributor

You should really only ever call toBlocking in command-line test scripts, never in production code.

If you really need to end up blocking, you may as well push all the blocking backwards and write regular synchronous code.

@headinthebox
Contributor

Maybe we should mark toBlocking as deprecated (forever) so that you get a warning every time you use it.

@edenman
Author

edenman commented Nov 6, 2014

Deprecated seems wrong to me since it's never actually going to be removed. I'm all in favor of updating the docs to reflect the recommended uses of toBlocking, though.

Or maybe deprecate toBlocking() and delegate to a new method called areYouReallySureYouNeedToBlocking() or something obnoxious like that?

@Alexander--

You should really only ever call toBlocking in command-line test scripts, never in production code.

Not everyone has their entire codebase under their control. Also, Android already has plenty of ways to move tasks to background threads: all those legacy Loaders, AsyncTasks and AbstractThreadedSyncAdapters require certain data to be available directly, without Observable shenanigans.

Does Android ever interrupt a thread based on user or system activity?

I am not sure about this one, but the FutureTask/ExecutorService combo was working just fine before migrating to RxJava (and so was the previous version of RxJava, before updating to rc8).

@benjchristensen
Member

@edenman

Android shouldn't interrupt unless there's a deadlock of some sort, and I'm afraid the nested toBlocking calls (discouraged though they may be) are causing the thread to stall.

Can you provide a reference to documentation about how Android does this? That would be helpful. If it does this then any sort of blocking would be vulnerable to interruption.

Not everyone has their entire codebase under their control.

Understood, though I still suggest putting most of the processing into the Observable chain and not even having a subscribe step and just going direct to toBlocking() if you actually are just using it as an Iterable with higher-order functions.

@benjchristensen
Member

If this doesn't happen on rc7 but does on rc8 then let's binary search what is causing it. Here is what changed: https://github.com/ReactiveX/RxJava/releases/tag/v1.0.0-rc.8

This is the only change that may be related: https://github.com/ReactiveX/RxJava/pull/1793/files

Note how it now triggers the unsubscribe upwards before the onNext is emitted whereas before it was after.

This could theoretically result in work upstream canceling a Future which interrupts a thread if everything is happening on the same thread.
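
To make that hypothesis concrete, here is a JDK-only sketch. It is not RxJava code, and `SelfCancelDemo` and `interruptedAfterSelfCancel` are hypothetical names. A task running on an executor cancels its own `Future` (roughly what an upstream unsubscribe would do to the scheduled action) and then tries to block. With `cancel(true)` the task ends up interrupting the very thread it is running on, so its next blocking call fails.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; only JDK APIs are used.
final class SelfCancelDemo {

    // Returns true if the task's blocking call was interrupted after it cancelled itself.
    static boolean interruptedAfterSelfCancel(final boolean mayInterruptIfRunning) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        final AtomicReference<Future<?>> self = new AtomicReference<>();
        final CountDownLatch selfAssigned = new CountDownLatch(1);
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            Future<?> future = executor.submit(() -> {
                try {
                    selfAssigned.await();                     // wait until our own Future is visible
                    self.get().cancel(mayInterruptIfRunning); // "unsubscribe" before doing the work
                    // Stand-in for a later blocking call such as toBlocking().single().
                    new CountDownLatch(1).await(200, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    interrupted.set(true);
                } finally {
                    done.countDown();
                }
            });
            self.set(future);
            selfAssigned.countDown();
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("cancel(true):  interrupted = " + interruptedAfterSelfCancel(true));
        System.out.println("cancel(false): interrupted = " + interruptedAfterSelfCancel(false));
    }
}
```

The ordering matters: the interrupt only hurts because the cancel happens before the blocking work, which matches the change in ordering noted above.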

@edenman
Author

edenman commented Nov 7, 2014

Can't find any Android docs that describe what we're seeing. Here's the docs on ANR (Application Not Responding) but we're seeing an actual crash.

Maybe worth seeing if it's rxjava itself that is interrupting the thread?

@benjchristensen
Member

Maybe worth seeing if it's rxjava itself that is interrupting the thread?

Yup, I pointed to a change in rc8 above that could potentially cause this. I'm trying to hunt down options.

@benjchristensen
Member

Here are the two places I can see that may result in an interrupt:

The first one is used by virtually all use of Schedulers to allow canceling work on a Scheduler.

I still can't replicate an interrupt with take but conceptually I can see how an interrupt could happen.

@benjchristensen
Member

I can replicate:

    @Test
    public void testInterrupt() throws InterruptedException {
        final AtomicReference<Object> exception = new AtomicReference<Object>();
        final CountDownLatch latch = new CountDownLatch(1);
        Observable.just(1).subscribeOn(Schedulers.computation()).take(1).subscribe(new Action1<Integer>() {

            @Override
            public void call(Integer t1) {
                try {
                    Observable.just(t1).toBlocking().single();
                } catch (Exception e) {
                    exception.set(e);
                    e.printStackTrace();
                } finally {
                    latch.countDown();
                }
            }

        });

        latch.await();
        assertNull(exception.get());
    }

cancelling ... and can interrupt
java.lang.RuntimeException: Interrupted while waiting for subscription to complete.
    at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:475)
    at rx.observables.BlockingObservable.single(BlockingObservable.java:349)
    at rx.internal.operators.OperatorTakeTest$12.call(OperatorTakeTest.java:376)
    at rx.internal.operators.OperatorTakeTest$12.call(OperatorTakeTest.java:1)
    at rx.Observable$31.onNext(Observable.java:7209)
    at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:104)
    at rx.internal.operators.OperatorTake$1.onNext(OperatorTake.java:70)
    at rx.internal.operators.OnSubscribeRedo$2$1.onNext(OnSubscribeRedo.java:229)
    at rx.internal.operators.OperatorSubscribeOn$1$1$1.onNext(OperatorSubscribeOn.java:76)
    at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:41)
    at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:1)
    at rx.Observable.unsafeSubscribe(Observable.java:7374)
    at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:45)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
    at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:472)
    ... 20 more

Reverting #1793 does fix this.

So now to figure out if #1793 can be achieved in a different way, or if we need to solve the interruption issue.

@benjchristensen
Member

Here is a simpler test:

    @Test
    public void testInterrupt() throws InterruptedException {
        final AtomicReference<Object> exception = new AtomicReference<Object>();
        final CountDownLatch latch = new CountDownLatch(1);
        Observable.just(1).subscribeOn(Schedulers.computation()).take(1).subscribe(new Action1<Integer>() {

            @Override
            public void call(Integer t1) {
                try {
                    Thread.sleep(100);
                } catch (Exception e) {
                    exception.set(e);
                    e.printStackTrace();
                } finally {
                    latch.countDown();
                }
            }

        });

        latch.await();
        assertNull(exception.get());
    }

@benjchristensen
Member

Ugh ... choosing whether to default to interrupting or not is a difficult one. It seems that perhaps we should not. Any insights on this?

@benjchristensen
Member

I have submitted a possible fix but want to think about this more.

I can't find an authoritative answer on whether we should default to interrupting or not when the scheduled future is canceled. I think we should change to not interrupting but want to be sure that's correct as we've had it set to interrupt all along.

@headinthebox
Contributor

but want to think about this more.

same here.

@roman-mazur

A quick note about thread interruptions on Android. AFAIK the application process is killed in case of an ANR (either automatically or after user confirmation). So ANRs do not lead to thread interruptions.

However, there is a case when I use toBlocking and get an interruption: sync adapter thread.

On Android we can create a component that plugs into the sync part of the framework. Basically, the Android framework spawns a new thread for you and invokes your code in that thread. The application sync is considered finished when this thread finishes. In our sync adapter code we build a chain of observables that can perform some operations concurrently using other threads. So the sync adapter code invoked in the sync thread looks like buildChainOfObservables().toBlocking().single();.
Sync can be canceled (e.g. by user request, by unchecking a checkbox in system settings, or when it takes too long). The default implementation of a cancellation request is to interrupt this sync thread.

@benjchristensen
Member

@headinthebox A consideration here is that the place we are interrupting the threads is really just for unscheduling any scheduled actions from a Scheduler. I think we intend subscription.isUnsubscribed() to be the mechanism for something to gracefully stop.

It seems to me that we should not interrupt a thread automatically and that if a developer needs to do that their Observable should register a Subscription via subscriber.add that chooses to do so.

We can work around this particular take issue, but it makes me wonder if we would trigger this type of issue anywhere else. It seems that interrupting threads is a very nuanced thing that most code doesn't handle well and that we shouldn't be interrupting threads "under the covers".

Therefore I suggest we change from future.cancel(true) to future.cancel(false) for the 2 places we capture a Future and cancel it when unsubscribing.
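
The practical difference between the two flags can be seen with a plain `ExecutorService`. This is a sketch under stated assumptions rather than the actual RxJava change: `CancelModeDemo` and `runningTaskSawInterrupt` are hypothetical names, and `Thread.sleep` stands in for whatever blocking work the scheduled action is doing when it is unsubscribed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; only JDK APIs are used.
final class CancelModeDemo {

    // Cancels a task that is already running and reports whether the task saw an interrupt.
    static boolean runningTaskSawInterrupt(final boolean mayInterruptIfRunning) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch finished = new CountDownLatch(1);
        final AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(300); // stand-in for blocking work inside the scheduled action
                } catch (InterruptedException e) {
                    sawInterrupt.set(true);
                } finally {
                    finished.countDown();
                }
            });
            started.await();                      // make sure the task is running
            future.cancel(mayInterruptIfRunning); // what unsubscribing does to the captured Future
            finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println("cancel(true):  task interrupted = " + runningTaskSawInterrupt(true));
        System.out.println("cancel(false): task interrupted = " + runningTaskSawInterrupt(false));
    }
}
```

With `cancel(false)` a task that has not started yet is still prevented from running; only a task that is already in flight is left to finish on its own, which is the trade-off being proposed.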

@benjchristensen
Member

@roman-mazur Thanks, that is useful information. If I understand correctly you are saying there are normal cases when Android can and will interrupt a thread and thus application code must account for this. Is this what you're saying?

As far as RxJava is concerned, I'm suggesting we eliminate the two places where RxJava is the culprit for interrupting the thread (despite #1832 working around the particular issue of using single that triggers the issue).

@Alexander--

It seems to me that we should not interrupt a thread automatically

Wait, so you DO interrupt threads right now, don't you? Does that mean that unsubscribing from an Observable (in the currently released version) during an interruptible operation will interrupt it? Will operations created with async-utils (e.g. via Async.fromCallable()) be interrupted? Or is it some internal mechanic for disposing of Schedulers only?

If the former, it would be really cool to leave it be.

If I understand correctly you are saying there are normal cases when Android can and will interrupt a thread and thus application code must account for this. Is this what you're saying?

I wouldn't call AbstractThreadedSyncAdapter a normal case. Interrupting the worker thread is just an implementation detail, which can be changed by overriding a single method. Also, the Sync Framework in general is a rather special and restrictive environment, and interrupted threads aren't the worst thing awaiting a developer there (for example, failure to intercept any exceptions may result in the Sync Adapter being completely banned from execution by the OS).

@roman-mazur

@benjchristensen I would rather say that the Android framework is unlikely to interrupt an application thread. The sync worker is the only known example. And interruption is a default behaviour that can be changed.

But note that I'm forced to use toBlocking within this thread because there is no other way to indicate that sync is finished.
I wonder what happens when toBlocking().single() is interrupted. As far as I understand an error is propagated. Does it lead to un-subscriptions?

@benjchristensen
Member

@roman-mazur

I wonder what happens when toBlocking().single() is interrupted. As far as I understand an error is propagated. Does it lead to un-subscriptions?

If it is interrupted while waiting on the single item we just fail right now. I suppose we could call unsubscribe upwards before throwing.

This is what currently happens:

        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
        }
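
The idea of unsubscribing upwards before throwing could look roughly like the following. This is only a sketch of the suggestion, not what BlockingObservable does: `BlockingAwaitSketch`, `awaitOrCancel`, and the `cancelUpstream` callback are hypothetical names, with the callback standing in for an upstream unsubscribe.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical helper; only JDK APIs are used.
final class BlockingAwaitSketch {

    // Waits on the latch. If interrupted, cancels upstream work first,
    // then restores the interrupt flag and rethrows as an unchecked exception.
    static void awaitOrCancel(CountDownLatch latch, Runnable cancelUpstream) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            cancelUpstream.run(); // stand-in for unsubscribing from the source
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
        }
    }

    public static void main(String[] args) {
        final boolean[] cancelled = {false};
        Thread.currentThread().interrupt(); // simulate an interrupt arriving while blocked
        try {
            awaitOrCancel(new CountDownLatch(1), () -> cancelled[0] = true);
        } catch (RuntimeException e) {
            System.out.println("upstream cancelled: " + cancelled[0]);
        } finally {
            Thread.interrupted(); // clear the flag so the demo exits cleanly
        }
    }
}
```

Running the callback before rethrowing means upstream work does not keep going after the only consumer has given up.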

@benjchristensen
Member

@Alexander--

Or is it some internal mechanic for disposing of Schedulers only?

RxJava core libraries only interrupt when unsubscribing a scheduled action on a Scheduler that is still running. This is in turn just leveraging the Future.cancel semantics of Java. RxJava does not actually ever call Thread.interrupt.

The async-utils are just utility methods on top of Schedulers so unless there is some effort to specifically prevent interrupt (nothing I can see while browsing the code) they will also be interrupted if they are still executing when unsubscribe is invoked.

unsubscribing from an Observable (in the currently released version) during an interruptible operation will interrupt it?

If a Scheduler is not involved RxJava itself does nothing other than invoke Subscription.unsubscribe, which flips a boolean and calls any registered callbacks. Thus, an Observable implementation can choose to register a Subscription that then interrupts an interruptible unit of work.

I'm wondering if it is better for an Observable implementation (via Observable.create) to opt in to interrupts rather than rely on the current Scheduler interrupt mechanism, which seems like it could be surprising.
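
The opt-in model can be illustrated without RxJava. In the sketch below, `FlagSubscription` is a hypothetical stand-in for a subscription (a boolean plus registered callbacks, as described above) and is not RxJava's `Subscription` type. The producer decides for itself whether unsubscribing should interrupt its worker thread by registering, or not registering, an interrupting callback.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a subscription: a flag plus callbacks run once on unsubscribe.
// Simplified: a callback added after unsubscribe() is never run.
final class FlagSubscription {
    private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
    private final List<Runnable> callbacks = new CopyOnWriteArrayList<>();

    void add(Runnable onUnsubscribe) { callbacks.add(onUnsubscribe); }

    boolean isUnsubscribed() { return unsubscribed.get(); }

    void unsubscribe() {
        if (unsubscribed.compareAndSet(false, true)) {
            for (Runnable callback : callbacks) {
                callback.run();
            }
        }
    }
}

// Hypothetical demo class; only JDK APIs are used.
final class OptInInterruptDemo {

    // Returns true if the worker's blocking call was interrupted by unsubscribe().
    static boolean workerInterruptedOnUnsubscribe(boolean optIn) {
        final FlagSubscription subscription = new FlagSubscription();
        final CountDownLatch started = new CountDownLatch(1);
        final AtomicBoolean interrupted = new AtomicBoolean(false);
        final Thread worker = new Thread(() -> {
            started.countDown();
            try {
                Thread.sleep(300); // stand-in for an interruptible unit of work
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        if (optIn) {
            subscription.add(worker::interrupt); // the producer explicitly asks for interruption
        }
        worker.start();
        try {
            started.await();
            subscription.unsubscribe();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("opted in:  interrupted = " + workerInterruptedOnUnsubscribe(true));
        System.out.println("opted out: interrupted = " + workerInterruptedOnUnsubscribe(false));
    }
}
```

Code that does not opt in can still stop gracefully by polling `isUnsubscribed()` between steps, which avoids surprising blocking calls further down the chain.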
