Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for exposing delegate observers. #89

Merged
merged 13 commits into from
Sep 22, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ final class AutoDisposingCompletableObserverImpl implements AutoDisposingComplet
this.delegate = delegate;
}

@Override public CompletableObserver delegateObserver() {
return delegate;
}

@Override public void onSubscribe(final Disposable d) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ final class AutoDisposingMaybeObserverImpl<T> implements AutoDisposingMaybeObser
this.delegate = delegate;
}

@Override public MaybeObserver<? super T> delegateObserver() {
return delegate;
}

@Override public void onSubscribe(final Disposable d) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ final class AutoDisposingObserverImpl<T> implements AutoDisposingObserver<T> {
this.delegate = delegate;
}

@Override public Observer<? super T> delegateObserver() {
return delegate;
}

@Override public void onSubscribe(final Disposable d) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ final class AutoDisposingSingleObserverImpl<T> implements AutoDisposingSingleObs
this.delegate = delegate;
}

@Override public SingleObserver<? super T> delegateObserver() {
return delegate;
}

@Override public void onSubscribe(final Disposable d) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ final class AutoDisposingSubscriberImpl<T> implements AutoDisposingSubscriber<T>
this.delegate = delegate;
}

@Override public Subscriber<? super T> delegateSubscriber() {
return delegate;
}

@Override public void onSubscribe(final Subscription s) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@
package com.uber.autodispose.observers;

import io.reactivex.CompletableObserver;
import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;

/**
* A {@link Disposable} {@link CompletableObserver} that can automatically dispose itself.
* Interface here for type safety but enforcement is left to the implementation.
*/
public interface AutoDisposingCompletableObserver extends CompletableObserver, Disposable {}
public interface AutoDisposingCompletableObserver extends CompletableObserver, Disposable {

/**
* @return The delegate {@link CompletableObserver} that is used under the hood forintrospection
* purposes. This will be updated once LambdaIntrospection is out of @Experimental in RxJava.
*/
@Experimental
CompletableObserver delegateObserver();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@
package com.uber.autodispose.observers;

import io.reactivex.MaybeObserver;
import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;

/**
* A {@link Disposable} {@link MaybeObserver} that can automatically dispose itself.
* Interface here for type safety but enforcement is left to the implementation.
*/
public interface AutoDisposingMaybeObserver<T> extends MaybeObserver<T>, Disposable {}
public interface AutoDisposingMaybeObserver<T> extends MaybeObserver<T>, Disposable {

/**
* @return The delegate {@link MayberObserver} that is used under the hood for introspection
* purposes. This will be updated once LambdaIntrospection is out of @Experimental in RxJava.
*/
@Experimental
MaybeObserver<? super T> delegateObserver();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@
package com.uber.autodispose.observers;

import io.reactivex.Observer;
import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;

/**
* A {@link Disposable} {@link Observer} that can automatically dispose itself.
* Interface here for type safety but enforcement is left to the implementation.
*/
public interface AutoDisposingObserver<T> extends Observer<T>, Disposable {}
public interface AutoDisposingObserver<T> extends Observer<T>, Disposable {

/**
* @return The delegate {@link Observer} that is used under the hood for introspection purpose.
* This will be updated once LambdaIntrospection is out of @Experimental in RxJava.
*/
@Experimental
Observer<? super T> delegateObserver();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@
package com.uber.autodispose.observers;

import io.reactivex.SingleObserver;
import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;

/**
* A {@link Disposable} {@link SingleObserver} that can automatically dispose itself.
* Interface here for type safety but enforcement is left to the implementation.
*/
public interface AutoDisposingSingleObserver<T> extends SingleObserver<T>, Disposable {}
public interface AutoDisposingSingleObserver<T> extends SingleObserver<T>, Disposable {

/**
* @return The delegate {@link SingleObserver} that is used under the hood for introspection
* purposes. This will be updated once LambdaIntrospection is out of @Experimental in RxJava.
*/
@Experimental
SingleObserver<? super T> delegateObserver();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.uber.autodispose.observers;

import io.reactivex.FlowableSubscriber;
import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Expand All @@ -24,4 +26,13 @@
* A {@link Disposable} {@link Subscriber} that can automatically dispose itself. Interface here
* for type safety but enforcement is left to the implementation.
*/
public interface AutoDisposingSubscriber<T> extends Subscriber<T>, Subscription, Disposable {}
public interface AutoDisposingSubscriber<T>
extends FlowableSubscriber<T>, Subscription, Disposable {

/**
* @return The delegate {@link Subscriber} that is used under the hood for introspection
* purposes. This will be updated once LambdaIntrospection is out of @Experimental in RxJava.
*/
@Experimental
Subscriber<? super T> delegateSubscriber();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,26 @@
package com.uber.autodispose;

import com.uber.autodispose.test.RecordingObserver;
import com.uber.autodispose.observers.AutoDisposingCompletableObserver;

import io.reactivex.Completable;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableObserver;
import io.reactivex.CompletableOnSubscribe;
import io.reactivex.Maybe;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Cancellable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Predicate;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.CompletableSubject;
import io.reactivex.subjects.MaybeSubject;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.After;
import org.junit.Test;

Expand All @@ -38,16 +47,19 @@
public class AutoDisposeCompletableObserverTest {

private static final RecordingObserver.Logger LOGGER = new RecordingObserver.Logger() {
@Override public void log(String message) {
@Override
public void log(String message) {
System.out.println(AutoDisposeCompletableObserverTest.class.getSimpleName() + ": " + message);
}
};

@After public void resetPlugins() {
@After
public void resetPlugins() {
AutoDisposePlugins.reset();
}

@Test public void autoDispose_withMaybe_normal() {
@Test
public void autoDispose_withMaybe_normal() {
RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER);
CompletableSubject source = CompletableSubject.create();
MaybeSubject<Integer> lifecycle = MaybeSubject.create();
Expand All @@ -68,7 +80,8 @@ public class AutoDisposeCompletableObserverTest {
assertThat(lifecycle.hasObservers()).isFalse();
}

@Test public void autoDispose_withMaybe_interrupted() {
@Test
public void autoDispose_withMaybe_interrupted() {
RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER);
CompletableSubject source = CompletableSubject.create();
MaybeSubject<Integer> lifecycle = MaybeSubject.create();
Expand All @@ -89,7 +102,8 @@ public class AutoDisposeCompletableObserverTest {
o.assertNoMoreEvents();
}

@Test public void autoDispose_withProvider_completion() {
@Test
public void autoDispose_withProvider_completion() {
RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER);
CompletableSubject source = CompletableSubject.create();
MaybeSubject<Integer> scope = MaybeSubject.create();
Expand All @@ -109,7 +123,8 @@ public class AutoDisposeCompletableObserverTest {
assertThat(scope.hasObservers()).isFalse();
}

@Test public void autoDispose_withProvider_interrupted() {
@Test
public void autoDispose_withProvider_interrupted() {
RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER);
CompletableSubject source = CompletableSubject.create();
MaybeSubject<Integer> scope = MaybeSubject.create();
Expand All @@ -132,7 +147,8 @@ public class AutoDisposeCompletableObserverTest {
o.assertNoMoreEvents();
}

@Test public void autoDispose_withLifecycleProvider_completion() {
@Test
public void autoDispose_withLifecycleProvider_completion() {
RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER);
CompletableSubject source = CompletableSubject.create();
BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0);
Expand All @@ -157,7 +173,8 @@ public class AutoDisposeCompletableObserverTest {
assertThat(lifecycle.hasObservers()).isFalse();
}

@Test public void autoDispose_withLifecycleProvider_interrupted() {
@Test
public void autoDispose_withLifecycleProvider_interrupted() {
RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER);
CompletableSubject source = CompletableSubject.create();
BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0);
Expand Down Expand Up @@ -185,7 +202,8 @@ public class AutoDisposeCompletableObserverTest {
o.assertNoMoreEvents();
}

@Test public void autoDispose_withLifecycleProvider_withoutStartingLifecycle_shouldFail() {
@Test
public void autoDispose_withLifecycleProvider_withoutStartingLifecycle_shouldFail() {
BehaviorSubject<Integer> lifecycle = BehaviorSubject.create();
RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER);
LifecycleScopeProvider<Integer> provider = makeLifecycleProvider(lifecycle);
Expand All @@ -197,7 +215,8 @@ public class AutoDisposeCompletableObserverTest {
assertThat(o.takeError()).isInstanceOf(LifecycleNotStartedException.class);
}

@Test public void autoDispose_withLifecycleProvider_afterLifecycle_shouldFail() {
@Test
public void autoDispose_withLifecycleProvider_afterLifecycle_shouldFail() {
BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0);
lifecycle.onNext(1);
lifecycle.onNext(2);
Expand All @@ -212,9 +231,11 @@ public class AutoDisposeCompletableObserverTest {
assertThat(o.takeError()).isInstanceOf(LifecycleEndedException.class);
}

@Test public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() {
@Test
public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() {
AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() {
@Override public void accept(OutsideLifecycleException e) throws Exception { }
@Override
public void accept(OutsideLifecycleException e) throws Exception { }
});
BehaviorSubject<Integer> lifecycle = BehaviorSubject.create();
TestObserver<Integer> o = new TestObserver<>();
Expand All @@ -229,9 +250,11 @@ public class AutoDisposeCompletableObserverTest {
o.assertNoErrors();
}

@Test public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() {
@Test
public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() {
AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() {
@Override public void accept(OutsideLifecycleException e) throws Exception {
@Override
public void accept(OutsideLifecycleException e) throws Exception {
// Noop
}
});
Expand All @@ -251,9 +274,11 @@ public class AutoDisposeCompletableObserverTest {
o.assertNoErrors();
}

@Test public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithWrappedExp() {
@Test
public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithWrappedExp() {
AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() {
@Override public void accept(OutsideLifecycleException e) throws Exception {
@Override
public void accept(OutsideLifecycleException e) throws Exception {
// Wrap in an IllegalStateException so we can verify this is the exception we see on the
// other side
throw new IllegalStateException(e);
Expand All @@ -268,20 +293,59 @@ public class AutoDisposeCompletableObserverTest {

o.assertNoValues();
o.assertError(new Predicate<Throwable>() {
@Override public boolean test(Throwable throwable) throws Exception {
@Override
public boolean test(Throwable throwable) throws Exception {
return throwable instanceof IllegalStateException
&& throwable.getCause() instanceof OutsideLifecycleException;
}
});
}

@Test public void verifyCancellation() throws Exception {
@Test
public void verifyObserverDelegate() {
final AtomicReference<CompletableObserver> atomicObserver = new AtomicReference<>();
final AtomicReference<CompletableObserver> atomicAutoDisposingObserver
= new AtomicReference<>();
try {
RxJavaPlugins.setOnCompletableSubscribe(new BiFunction<Completable,
CompletableObserver,
CompletableObserver>() {
@Override public CompletableObserver apply(
Completable source,
CompletableObserver observer) {
if (atomicObserver.get() == null) {
atomicObserver.set(observer);
} else if (atomicAutoDisposingObserver.get() == null) {
atomicAutoDisposingObserver.set(observer);
RxJavaPlugins.setOnObservableSubscribe(null);
}
return observer;
}
});
Completable.complete().to(new CompletableScoper(Maybe.never())).subscribe();

assertThat(atomicAutoDisposingObserver.get()).isNotNull();
assertThat(atomicAutoDisposingObserver.get())
.isInstanceOf(AutoDisposingCompletableObserver.class);
assertThat(((AutoDisposingCompletableObserver) atomicAutoDisposingObserver.get())
.delegateObserver()).isNotNull();
assertThat(((AutoDisposingCompletableObserver) atomicAutoDisposingObserver.get())
.delegateObserver()).isSameAs(atomicObserver.get());
} finally {
RxJavaPlugins.reset();
}
}

@Test
public void verifyCancellation() throws Exception {
final AtomicInteger i = new AtomicInteger();
//noinspection unchecked because Java
Completable source = Completable.create(new CompletableOnSubscribe() {
@Override public void subscribe(CompletableEmitter e) throws Exception {
@Override
public void subscribe(CompletableEmitter e) throws Exception {
e.setCancellable(new Cancellable() {
@Override public void cancel() throws Exception {
@Override
public void cancel() throws Exception {
i.incrementAndGet();
}
});
Expand Down
Loading