Skip to content

Commit

Permalink
2.x: fix Observable.repeatWhen & retryWhen not disposing the inner (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Oct 31, 2016
1 parent 3300d19 commit ff282b9
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,40 +13,42 @@

package io.reactivex.internal.observers;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.*;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class ToNotificationObserver<T> implements Observer<T> {
final Consumer<? super Notification<Object>> consumer;
public final class ToNotificationObserver<T>
extends AtomicReference<Disposable>
implements Observer<T>, Disposable {
private static final long serialVersionUID = -7420197867343208289L;

Disposable s;
final Consumer<? super Notification<Object>> consumer;

public ToNotificationObserver(Consumer<? super Notification<Object>> consumer) {
this.consumer = consumer;
}

@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
}
DisposableHelper.setOnce(this, s);
}

@Override
public void onNext(T t) {
if (t == null) {
s.dispose();
get().dispose();
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
} else {
try {
consumer.accept(Notification.<Object>createOnNext(t));
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
get().dispose();
onError(ex);
}
}
Expand All @@ -71,4 +73,14 @@ public void onComplete() {
RxJavaPlugins.onError(ex);
}
}

@Override
public void dispose() {
DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.SequentialDisposable;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.observers.ToNotificationObserver;
import io.reactivex.subjects.*;
Expand All @@ -40,7 +40,14 @@ public void subscribeActual(Observer<? super T> s) {

final RedoObserver<T> parent = new RedoObserver<T>(s, subject, source);

s.onSubscribe(parent.arbiter);
ToNotificationObserver<Object> actionObserver = new ToNotificationObserver<Object>(new Consumer<Notification<Object>>() {
@Override
public void accept(Notification<Object> o) {
parent.handle(o);
}
});
ListCompositeDisposable cd = new ListCompositeDisposable(parent.arbiter, actionObserver);
s.onSubscribe(cd);

ObservableSource<?> action;

Expand All @@ -52,12 +59,7 @@ public void subscribeActual(Observer<? super T> s) {
return;
}

action.subscribe(new ToNotificationObserver<Object>(new Consumer<Notification<Object>>() {
@Override
public void accept(Notification<Object> o) {
parent.handle(o);
}
}));
action.subscribe(actionObserver);

// trigger first subscription
parent.handle(Notification.<Object>createOnNext(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.*;
import io.reactivex.internal.subscriptions.BooleanSubscription;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber;

Expand Down Expand Up @@ -306,4 +308,25 @@ public boolean getAsBoolean() throws Exception {
.assertFailure(TestException.class, 1);
}

@Test
public void shouldDisposeInnerObservable() {
final PublishProcessor<Object> subject = PublishProcessor.create();
final Disposable disposable = Flowable.just("Leak")
.repeatWhen(new Function<Flowable<Object>, Flowable<Object>>() {
@Override
public Flowable<Object> apply(Flowable<Object> completions) throws Exception {
return completions.switchMap(new Function<Object, Flowable<Object>>() {
@Override
public Flowable<Object> apply(Object ignore) throws Exception {
return subject;
}
});
}
})
.subscribe();

assertTrue(subject.hasSubscribers());
disposable.dispose();
assertFalse(subject.hasSubscribers());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.reactivex.internal.operators.flowable;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.util.*;
Expand Down Expand Up @@ -997,4 +998,27 @@ public boolean getAsBoolean() throws Exception {
.test()
.assertResult(1, 1, 1, 1, 1);
}


@Test
public void shouldDisposeInnerObservable() {
final PublishProcessor<Object> subject = PublishProcessor.create();
final Disposable disposable = Flowable.error(new RuntimeException("Leak"))
.retryWhen(new Function<Flowable<Throwable>, Flowable<Object>>() {
@Override
public Flowable<Object> apply(Flowable<Throwable> errors) throws Exception {
return errors.switchMap(new Function<Throwable, Flowable<Object>>() {
@Override
public Flowable<Object> apply(Throwable ignore) throws Exception {
return subject;
}
});
}
})
.subscribe();

assertTrue(subject.hasSubscribers());
disposable.dispose();
assertFalse(subject.hasSubscribers());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.reactivex.internal.operators.observable;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

import java.util.*;
Expand All @@ -25,11 +26,12 @@
import io.reactivex.*;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposables;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.*;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;

public class ObservableRepeatTest {

Expand Down Expand Up @@ -257,4 +259,25 @@ public boolean getAsBoolean() throws Exception {
.assertFailure(TestException.class, 1);
}

@Test
public void shouldDisposeInnerObservable() {
final PublishSubject<Object> subject = PublishSubject.create();
final Disposable disposable = Observable.just("Leak")
.repeatWhen(new Function<Observable<Object>, ObservableSource<Object>>() {
@Override
public ObservableSource<Object> apply(Observable<Object> completions) throws Exception {
return completions.switchMap(new Function<Object, ObservableSource<Object>>() {
@Override
public ObservableSource<Object> apply(Object ignore) throws Exception {
return subject;
}
});
}
})
.subscribe();

assertTrue(subject.hasObservers());
disposable.dispose();
assertFalse(subject.hasObservers());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -908,4 +908,26 @@ public boolean test(Throwable e) throws Exception {
}
}

@Test
public void shouldDisposeInnerObservable() {
final PublishSubject<Object> subject = PublishSubject.create();
final Disposable disposable = Observable.error(new RuntimeException("Leak"))
.retryWhen(new Function<Observable<Throwable>, ObservableSource<Object>>() {
@Override
public ObservableSource<Object> apply(Observable<Throwable> errors) throws Exception {
return errors.switchMap(new Function<Throwable, ObservableSource<Object>>() {
@Override
public ObservableSource<Object> apply(Throwable ignore) throws Exception {
return subject;
}
});
}
})
.subscribe();

assertTrue(subject.hasObservers());
disposable.dispose();
assertFalse(subject.hasObservers());
}

}

0 comments on commit ff282b9

Please sign in to comment.