Skip to content

Commit

Permalink
Add support for exposing delegate observers. (#89)
Browse files Browse the repository at this point in the history
* Make AutoDisposingSubscriber inherit from FlowableSubscriber

* Add api for observer delegate to AutoDisposingObserver

* Add api for observer delegate to AutoDisposingCompletableObserver

* Add api for observer delegate to AutoDisposingMaybeObserver

* Add api for observer delegate to AutoDisposingSingleObserver

* Add api for subscriber delegate to AutoDisposingSubscriber

* Fix test styling

* Fix comments in each interface

* Remove random character that shouldnt have been added

* Replace isEqualTo asserts with isSameAs, which is technically more correct

* Move test member AtomicReferences into the specific test

* Fix checkstyle

* Add comments and @experimental annotation
  • Loading branch information
mswysocki authored and ZacSweers committed Sep 22, 2017
1 parent e971e56 commit 5af3987
Show file tree
Hide file tree
Showing 15 changed files with 396 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ final class AutoDisposingCompletableObserverImpl implements AutoDisposingComplet
this.delegate = delegate;
}

@Override public CompletableObserver delegateObserver() {
return delegate;
}

@Override public void onSubscribe(final Disposable d) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ final class AutoDisposingMaybeObserverImpl<T> implements AutoDisposingMaybeObser
this.delegate = delegate;
}

@Override public MaybeObserver<? super T> delegateObserver() {
return delegate;
}

@Override public void onSubscribe(final Disposable d) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ final class AutoDisposingObserverImpl<T> implements AutoDisposingObserver<T> {
this.delegate = delegate;
}

@Override public Observer<? super T> delegateObserver() {
return delegate;
}

@Override public void onSubscribe(final Disposable d) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ final class AutoDisposingSingleObserverImpl<T> implements AutoDisposingSingleObs
this.delegate = delegate;
}

@Override public SingleObserver<? super T> delegateObserver() {
return delegate;
}

@Override public void onSubscribe(final Disposable d) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ final class AutoDisposingSubscriberImpl<T> implements AutoDisposingSubscriber<T>
this.delegate = delegate;
}

@Override public Subscriber<? super T> delegateSubscriber() {
return delegate;
}

@Override public void onSubscribe(final Subscription s) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@
package com.uber.autodispose.observers;

import io.reactivex.CompletableObserver;
import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;

/**
* A {@link Disposable} {@link CompletableObserver} that can automatically dispose itself.
* Interface here for type safety but enforcement is left to the implementation.
*/
public interface AutoDisposingCompletableObserver extends CompletableObserver, Disposable {}
public interface AutoDisposingCompletableObserver extends CompletableObserver, Disposable {

/**
* @return The delegate {@link CompletableObserver} that is used under the hood forintrospection
* purposes. This will be updated once LambdaIntrospection is out of @Experimental in RxJava.
*/
@Experimental
CompletableObserver delegateObserver();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@
package com.uber.autodispose.observers;

import io.reactivex.MaybeObserver;
import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;

/**
* A {@link Disposable} {@link MaybeObserver} that can automatically dispose itself.
* Interface here for type safety but enforcement is left to the implementation.
*/
public interface AutoDisposingMaybeObserver<T> extends MaybeObserver<T>, Disposable {}
public interface AutoDisposingMaybeObserver<T> extends MaybeObserver<T>, Disposable {

/**
* @return The delegate {@link MayberObserver} that is used under the hood for introspection
* purposes. This will be updated once LambdaIntrospection is out of @Experimental in RxJava.
*/
@Experimental
MaybeObserver<? super T> delegateObserver();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@
package com.uber.autodispose.observers;

import io.reactivex.Observer;
import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;

/**
* A {@link Disposable} {@link Observer} that can automatically dispose itself.
* Interface here for type safety but enforcement is left to the implementation.
*/
public interface AutoDisposingObserver<T> extends Observer<T>, Disposable {}
public interface AutoDisposingObserver<T> extends Observer<T>, Disposable {

/**
* @return The delegate {@link Observer} that is used under the hood for introspection purpose.
* This will be updated once LambdaIntrospection is out of @Experimental in RxJava.
*/
@Experimental
Observer<? super T> delegateObserver();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@
package com.uber.autodispose.observers;

import io.reactivex.SingleObserver;
import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;

/**
* A {@link Disposable} {@link SingleObserver} that can automatically dispose itself.
* Interface here for type safety but enforcement is left to the implementation.
*/
public interface AutoDisposingSingleObserver<T> extends SingleObserver<T>, Disposable {}
public interface AutoDisposingSingleObserver<T> extends SingleObserver<T>, Disposable {

/**
* @return The delegate {@link SingleObserver} that is used under the hood for introspection
* purposes. This will be updated once LambdaIntrospection is out of @Experimental in RxJava.
*/
@Experimental
SingleObserver<? super T> delegateObserver();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.uber.autodispose.observers;

import io.reactivex.FlowableSubscriber;
import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Expand All @@ -24,4 +26,13 @@
* A {@link Disposable} {@link Subscriber} that can automatically dispose itself. Interface here
* for type safety but enforcement is left to the implementation.
*/
public interface AutoDisposingSubscriber<T> extends Subscriber<T>, Subscription, Disposable {}
public interface AutoDisposingSubscriber<T>
extends FlowableSubscriber<T>, Subscription, Disposable {

/**
* @return The delegate {@link Subscriber} that is used under the hood for introspection
* purposes. This will be updated once LambdaIntrospection is out of @Experimental in RxJava.
*/
@Experimental
Subscriber<? super T> delegateSubscriber();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,26 @@
package com.uber.autodispose;

import com.uber.autodispose.test.RecordingObserver;
import com.uber.autodispose.observers.AutoDisposingCompletableObserver;

import io.reactivex.Completable;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableObserver;
import io.reactivex.CompletableOnSubscribe;
import io.reactivex.Maybe;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Cancellable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Predicate;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.CompletableSubject;
import io.reactivex.subjects.MaybeSubject;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.After;
import org.junit.Test;

Expand All @@ -38,16 +47,19 @@
public class AutoDisposeCompletableObserverTest {

private static final RecordingObserver.Logger LOGGER = new RecordingObserver.Logger() {
@Override public void log(String message) {
@Override
public void log(String message) {
System.out.println(AutoDisposeCompletableObserverTest.class.getSimpleName() + ": " + message);
}
};

@After public void resetPlugins() {
@After
public void resetPlugins() {
AutoDisposePlugins.reset();
}

@Test public void autoDispose_withMaybe_normal() {
@Test
public void autoDispose_withMaybe_normal() {
RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER);
CompletableSubject source = CompletableSubject.create();
MaybeSubject<Integer> lifecycle = MaybeSubject.create();
Expand All @@ -68,7 +80,8 @@ public class AutoDisposeCompletableObserverTest {
assertThat(lifecycle.hasObservers()).isFalse();
}

@Test public void autoDispose_withMaybe_interrupted() {
@Test
public void autoDispose_withMaybe_interrupted() {
RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER);
CompletableSubject source = CompletableSubject.create();
MaybeSubject<Integer> lifecycle = MaybeSubject.create();
Expand All @@ -89,7 +102,8 @@ public class AutoDisposeCompletableObserverTest {
o.assertNoMoreEvents();
}

@Test public void autoDispose_withProvider_completion() {
@Test
public void autoDispose_withProvider_completion() {
RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER);
CompletableSubject source = CompletableSubject.create();
MaybeSubject<Integer> scope = MaybeSubject.create();
Expand All @@ -109,7 +123,8 @@ public class AutoDisposeCompletableObserverTest {
assertThat(scope.hasObservers()).isFalse();
}

@Test public void autoDispose_withProvider_interrupted() {
@Test
public void autoDispose_withProvider_interrupted() {
RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER);
CompletableSubject source = CompletableSubject.create();
MaybeSubject<Integer> scope = MaybeSubject.create();
Expand All @@ -132,7 +147,8 @@ public class AutoDisposeCompletableObserverTest {
o.assertNoMoreEvents();
}

@Test public void autoDispose_withLifecycleProvider_completion() {
@Test
public void autoDispose_withLifecycleProvider_completion() {
RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER);
CompletableSubject source = CompletableSubject.create();
BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0);
Expand All @@ -157,7 +173,8 @@ public class AutoDisposeCompletableObserverTest {
assertThat(lifecycle.hasObservers()).isFalse();
}

@Test public void autoDispose_withLifecycleProvider_interrupted() {
@Test
public void autoDispose_withLifecycleProvider_interrupted() {
RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER);
CompletableSubject source = CompletableSubject.create();
BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0);
Expand Down Expand Up @@ -185,7 +202,8 @@ public class AutoDisposeCompletableObserverTest {
o.assertNoMoreEvents();
}

@Test public void autoDispose_withLifecycleProvider_withoutStartingLifecycle_shouldFail() {
@Test
public void autoDispose_withLifecycleProvider_withoutStartingLifecycle_shouldFail() {
BehaviorSubject<Integer> lifecycle = BehaviorSubject.create();
RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER);
LifecycleScopeProvider<Integer> provider = makeLifecycleProvider(lifecycle);
Expand All @@ -197,7 +215,8 @@ public class AutoDisposeCompletableObserverTest {
assertThat(o.takeError()).isInstanceOf(LifecycleNotStartedException.class);
}

@Test public void autoDispose_withLifecycleProvider_afterLifecycle_shouldFail() {
@Test
public void autoDispose_withLifecycleProvider_afterLifecycle_shouldFail() {
BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0);
lifecycle.onNext(1);
lifecycle.onNext(2);
Expand All @@ -212,9 +231,11 @@ public class AutoDisposeCompletableObserverTest {
assertThat(o.takeError()).isInstanceOf(LifecycleEndedException.class);
}

@Test public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() {
@Test
public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() {
AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() {
@Override public void accept(OutsideLifecycleException e) throws Exception { }
@Override
public void accept(OutsideLifecycleException e) throws Exception { }
});
BehaviorSubject<Integer> lifecycle = BehaviorSubject.create();
TestObserver<Integer> o = new TestObserver<>();
Expand All @@ -229,9 +250,11 @@ public class AutoDisposeCompletableObserverTest {
o.assertNoErrors();
}

@Test public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() {
@Test
public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() {
AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() {
@Override public void accept(OutsideLifecycleException e) throws Exception {
@Override
public void accept(OutsideLifecycleException e) throws Exception {
// Noop
}
});
Expand All @@ -251,9 +274,11 @@ public class AutoDisposeCompletableObserverTest {
o.assertNoErrors();
}

@Test public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithWrappedExp() {
@Test
public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithWrappedExp() {
AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() {
@Override public void accept(OutsideLifecycleException e) throws Exception {
@Override
public void accept(OutsideLifecycleException e) throws Exception {
// Wrap in an IllegalStateException so we can verify this is the exception we see on the
// other side
throw new IllegalStateException(e);
Expand All @@ -268,20 +293,59 @@ public class AutoDisposeCompletableObserverTest {

o.assertNoValues();
o.assertError(new Predicate<Throwable>() {
@Override public boolean test(Throwable throwable) throws Exception {
@Override
public boolean test(Throwable throwable) throws Exception {
return throwable instanceof IllegalStateException
&& throwable.getCause() instanceof OutsideLifecycleException;
}
});
}

@Test public void verifyCancellation() throws Exception {
@Test
public void verifyObserverDelegate() {
final AtomicReference<CompletableObserver> atomicObserver = new AtomicReference<>();
final AtomicReference<CompletableObserver> atomicAutoDisposingObserver
= new AtomicReference<>();
try {
RxJavaPlugins.setOnCompletableSubscribe(new BiFunction<Completable,
CompletableObserver,
CompletableObserver>() {
@Override public CompletableObserver apply(
Completable source,
CompletableObserver observer) {
if (atomicObserver.get() == null) {
atomicObserver.set(observer);
} else if (atomicAutoDisposingObserver.get() == null) {
atomicAutoDisposingObserver.set(observer);
RxJavaPlugins.setOnObservableSubscribe(null);
}
return observer;
}
});
Completable.complete().to(new CompletableScoper(Maybe.never())).subscribe();

assertThat(atomicAutoDisposingObserver.get()).isNotNull();
assertThat(atomicAutoDisposingObserver.get())
.isInstanceOf(AutoDisposingCompletableObserver.class);
assertThat(((AutoDisposingCompletableObserver) atomicAutoDisposingObserver.get())
.delegateObserver()).isNotNull();
assertThat(((AutoDisposingCompletableObserver) atomicAutoDisposingObserver.get())
.delegateObserver()).isSameAs(atomicObserver.get());
} finally {
RxJavaPlugins.reset();
}
}

@Test
public void verifyCancellation() throws Exception {
final AtomicInteger i = new AtomicInteger();
//noinspection unchecked because Java
Completable source = Completable.create(new CompletableOnSubscribe() {
@Override public void subscribe(CompletableEmitter e) throws Exception {
@Override
public void subscribe(CompletableEmitter e) throws Exception {
e.setCancellable(new Cancellable() {
@Override public void cancel() throws Exception {
@Override
public void cancel() throws Exception {
i.incrementAndGet();
}
});
Expand Down
Loading

0 comments on commit 5af3987

Please sign in to comment.