From e2b67b8c4c37c072dcd224448a9f1b5849d19884 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 18 Feb 2014 11:56:18 +0800 Subject: [PATCH] Rewrite OperationObserveFromAndroidComponent to OperatorObserveFromAndroidComponent --- .../observables/AndroidObservable.java | 8 +- .../subscriptions/AndroidSubscriptions.java | 55 +++++++++++++ .../OperatorCompoundButtonInput.java | 16 +--- .../rx/operators/OperatorEditTextInput.java | 16 +--- ... OperatorObserveFromAndroidComponent.java} | 30 +++---- .../java/rx/operators/OperatorViewClick.java | 16 +--- ...ratorObserveFromAndroidComponentTest.java} | 81 +++++++------------ .../HandlerThreadSchedulerTest.java | 8 +- 8 files changed, 112 insertions(+), 118 deletions(-) create mode 100644 rxjava-contrib/rxjava-android/src/main/java/rx/android/subscriptions/AndroidSubscriptions.java rename rxjava-contrib/rxjava-android/src/main/java/rx/operators/{OperationObserveFromAndroidComponent.java => OperatorObserveFromAndroidComponent.java} (85%) rename rxjava-contrib/rxjava-android/src/test/java/rx/android/operators/{OperationObserveFromAndroidComponentTest.java => OperatorObserveFromAndroidComponentTest.java} (69%) diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java b/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java index a6af28b555..d55617ddea 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/android/observables/AndroidObservable.java @@ -16,7 +16,7 @@ package rx.android.observables; import rx.Observable; -import rx.operators.OperationObserveFromAndroidComponent; +import rx.operators.OperatorObserveFromAndroidComponent; import android.app.Activity; import android.app.Fragment; import android.os.Build; @@ -59,7 +59,7 @@ private AndroidObservable() {} * @return a new observable sequence that will emit notifications on the main UI thread */ public static Observable fromActivity(Activity activity, Observable sourceObservable) { - return OperationObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, activity); + return OperatorObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, activity); } /** @@ -88,9 +88,9 @@ public static Observable fromActivity(Activity activity, Observable so */ public static Observable fromFragment(Object fragment, Observable sourceObservable) { if (USES_SUPPORT_FRAGMENTS && fragment instanceof android.support.v4.app.Fragment) { - return OperationObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, (android.support.v4.app.Fragment) fragment); + return OperatorObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, (android.support.v4.app.Fragment) fragment); } else if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.HONEYCOMB && fragment instanceof Fragment) { - return OperationObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, (Fragment) fragment); + return OperatorObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, (Fragment) fragment); } else { throw new IllegalArgumentException("Target fragment is neither a native nor support library Fragment"); } diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/android/subscriptions/AndroidSubscriptions.java b/rxjava-contrib/rxjava-android/src/main/java/rx/android/subscriptions/AndroidSubscriptions.java new file mode 100644 index 0000000000..8e2f54ab3e --- /dev/null +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/android/subscriptions/AndroidSubscriptions.java @@ -0,0 +1,55 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.android.subscriptions; + +import rx.Scheduler.Inner; +import rx.Subscription; +import rx.android.schedulers.AndroidSchedulers; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.subscriptions.Subscriptions; +import android.os.Looper; + +public final class AndroidSubscriptions { + + private AndroidSubscriptions() { + // no instance + } + + /** + * Create an Subscription that always runs unsubscribe in the UI thread. + * + * @param unsubscribe + * @return an Subscription that always runs unsubscribe in the UI thread. + */ + public static Subscription unsubscribeInUiThread(final Action0 unsubscribe) { + return Subscriptions.create(new Action0() { + @Override + public void call() { + if (Looper.getMainLooper() == Looper.myLooper()) { + unsubscribe.call(); + } else { + AndroidSchedulers.mainThread().schedule(new Action1() { + @Override + public void call(Inner inner) { + unsubscribe.call(); + } + }); + } + } + }); + } +} diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorCompoundButtonInput.java b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorCompoundButtonInput.java index 63687f440e..9a61093699 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorCompoundButtonInput.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorCompoundButtonInput.java @@ -23,12 +23,9 @@ import rx.Observable; import rx.Subscriber; import rx.Subscription; -import rx.Scheduler.Inner; import rx.android.observables.ViewObservable; -import rx.android.schedulers.AndroidSchedulers; +import rx.android.subscriptions.AndroidSubscriptions; import rx.functions.Action0; -import rx.functions.Action1; -import rx.subscriptions.Subscriptions; import android.view.View; import android.widget.CompoundButton; @@ -53,17 +50,10 @@ public void onCheckedChanged(final CompoundButton button, final boolean checked) } }; - final Subscription subscription = Subscriptions.create(new Action0() { + final Subscription subscription = AndroidSubscriptions.unsubscribeInUiThread(new Action0() { @Override public void call() { - AndroidSchedulers.mainThread().schedule(new Action1() { - - @Override - public void call(Inner t1) { - composite.removeOnCheckedChangeListener(listener); - } - - }); + composite.removeOnCheckedChangeListener(listener); } }); diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorEditTextInput.java b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorEditTextInput.java index 1aa08f73a9..1dcbac0014 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorEditTextInput.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorEditTextInput.java @@ -18,12 +18,9 @@ import rx.Observable; import rx.Subscriber; import rx.Subscription; -import rx.Scheduler.Inner; import rx.android.observables.ViewObservable; -import rx.android.schedulers.AndroidSchedulers; +import rx.android.subscriptions.AndroidSubscriptions; import rx.functions.Action0; -import rx.functions.Action1; -import rx.subscriptions.Subscriptions; import android.text.Editable; import android.text.TextWatcher; import android.widget.EditText; @@ -47,17 +44,10 @@ public void afterTextChanged(final Editable editable) { } }; - final Subscription subscription = Subscriptions.create(new Action0() { + final Subscription subscription = AndroidSubscriptions.unsubscribeInUiThread(new Action0() { @Override public void call() { - AndroidSchedulers.mainThread().schedule(new Action1() { - - @Override - public void call(Inner t1) { - input.removeTextChangedListener(watcher); - } - - }); + input.removeTextChangedListener(watcher); } }); diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperationObserveFromAndroidComponent.java b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorObserveFromAndroidComponent.java similarity index 85% rename from rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperationObserveFromAndroidComponent.java rename to rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorObserveFromAndroidComponent.java index dba31a954b..3e656ccfbb 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperationObserveFromAndroidComponent.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorObserveFromAndroidComponent.java @@ -17,17 +17,15 @@ import rx.Observable; import rx.Observer; -import rx.Subscription; -import rx.Scheduler.Inner; +import rx.Subscriber; import rx.android.schedulers.AndroidSchedulers; +import rx.android.subscriptions.AndroidSubscriptions; import rx.functions.Action0; -import rx.functions.Action1; -import rx.subscriptions.Subscriptions; import android.app.Activity; import android.os.Looper; import android.util.Log; -public class OperationObserveFromAndroidComponent { +public class OperatorObserveFromAndroidComponent { public static Observable observeFromAndroidComponent(Observable source, android.app.Fragment fragment) { return Observable.create(new OnSubscribeFragment(source, fragment)); @@ -41,7 +39,7 @@ public static Observable observeFromAndroidComponent(Observable source return Observable.create(new OnSubscribeBase(source, activity)); } - private static class OnSubscribeBase implements Observable.OnSubscribeFunc { + private static class OnSubscribeBase implements Observable.OnSubscribe { private static final String LOG_TAG = "AndroidObserver"; @@ -67,10 +65,10 @@ protected boolean isComponentValid(AndroidComponent component) { } @Override - public Subscription onSubscribe(Observer observer) { + public void call(Subscriber subscriber) { assertUiThread(); - observerRef = observer; - final Subscription sourceSub = source.observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer() { + observerRef = subscriber; + source.observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber(subscriber) { @Override public void onCompleted() { if (componentRef != null && isComponentValid(componentRef)) { @@ -98,21 +96,13 @@ public void onNext(T args) { } } }); - return Subscriptions.create(new Action0() { + subscriber.add(AndroidSubscriptions.unsubscribeInUiThread(new Action0() { @Override public void call() { log("unsubscribing from source sequence"); - AndroidSchedulers.mainThread().schedule(new Action1() { - - @Override - public void call(Inner t1) { - releaseReferences(); - sourceSub.unsubscribe(); - } - - }); + releaseReferences(); } - }); + })); } private void releaseReferences() { diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorViewClick.java b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorViewClick.java index 086fb53a40..1fb2bb8b91 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorViewClick.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperatorViewClick.java @@ -21,14 +21,11 @@ import java.util.WeakHashMap; import rx.Observable; -import rx.Scheduler.Inner; import rx.Subscriber; import rx.Subscription; import rx.android.observables.ViewObservable; -import rx.android.schedulers.AndroidSchedulers; +import rx.android.subscriptions.AndroidSubscriptions; import rx.functions.Action0; -import rx.functions.Action1; -import rx.subscriptions.Subscriptions; import android.view.View; public final class OperatorViewClick implements Observable.OnSubscribe { @@ -52,17 +49,10 @@ public void onClick(final View clicked) { } }; - final Subscription subscription = Subscriptions.create(new Action0() { + final Subscription subscription = AndroidSubscriptions.unsubscribeInUiThread(new Action0() { @Override public void call() { - AndroidSchedulers.mainThread().schedule(new Action1() { - - @Override - public void call(Inner t1) { - composite.removeOnClickListener(listener); - } - - }); + composite.removeOnClickListener(listener); } }); diff --git a/rxjava-contrib/rxjava-android/src/test/java/rx/android/operators/OperationObserveFromAndroidComponentTest.java b/rxjava-contrib/rxjava-android/src/test/java/rx/android/operators/OperatorObserveFromAndroidComponentTest.java similarity index 69% rename from rxjava-contrib/rxjava-android/src/test/java/rx/android/operators/OperationObserveFromAndroidComponentTest.java rename to rxjava-contrib/rxjava-android/src/test/java/rx/android/operators/OperatorObserveFromAndroidComponentTest.java index e19dda4482..70a9c881fb 100644 --- a/rxjava-contrib/rxjava-android/src/test/java/rx/android/operators/OperationObserveFromAndroidComponentTest.java +++ b/rxjava-contrib/rxjava-android/src/test/java/rx/android/operators/OperatorObserveFromAndroidComponentTest.java @@ -15,13 +15,19 @@ */ package rx.android.operators; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -38,23 +44,19 @@ import org.robolectric.annotation.Config; import rx.Observable; -import rx.Observable.OnSubscribeFunc; import rx.Observer; -import rx.Subscription; import rx.android.schedulers.AndroidSchedulers; import rx.functions.Action1; import rx.observers.TestObserver; import rx.observers.TestSubscriber; -import rx.operators.OperationObserveFromAndroidComponent; +import rx.operators.OperatorObserveFromAndroidComponent; import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; -import rx.subscriptions.BooleanSubscription; -import android.app.Activity; import android.app.Fragment; @RunWith(RobolectricTestRunner.class) @Config(manifest = Config.NONE) -public class OperationObserveFromAndroidComponentTest { +public class OperatorObserveFromAndroidComponentTest { @Mock private Observer mockObserver; @@ -62,9 +64,6 @@ public class OperationObserveFromAndroidComponentTest { @Mock private Fragment mockFragment; - @Mock - private Activity mockActivity; - @Before public void setupMocks() { MockitoAnnotations.initMocks(this); @@ -77,7 +76,7 @@ public void itThrowsIfObserverSubscribesFromBackgroundThread() throws Exception final Future future = Executors.newSingleThreadExecutor().submit(new Callable() { @Override public Object call() throws Exception { - OperationObserveFromAndroidComponent.observeFromAndroidComponent( + OperatorObserveFromAndroidComponent.observeFromAndroidComponent( testObservable, mockFragment).subscribe(mockObserver); return null; } @@ -109,7 +108,7 @@ public void call(Integer t1) { }); final AtomicReference currentThreadName = new AtomicReference(); - OperationObserveFromAndroidComponent.observeFromAndroidComponent(testObservable, mockFragment).subscribe(new Action1() { + OperatorObserveFromAndroidComponent.observeFromAndroidComponent(testObservable, mockFragment).subscribe(new Action1() { @Override public void call(Integer i) { @@ -129,8 +128,8 @@ public void call(Integer i) { @Test public void itForwardsOnNextOnCompletedSequenceToTargetObserver() { - Observable source = Observable.from(1, 2, 3); - OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(new TestObserver(mockObserver)); + Observable source = Observable.from(Arrays.asList(1, 2, 3)); + OperatorObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(new TestObserver(mockObserver)); verify(mockObserver, times(3)).onNext(anyInt()); verify(mockObserver).onCompleted(); verify(mockObserver, never()).onError(any(Exception.class)); @@ -140,7 +139,7 @@ public void itForwardsOnNextOnCompletedSequenceToTargetObserver() { public void itForwardsOnErrorToTargetObserver() { final Exception exception = new Exception(); Observable source = Observable.error(exception); - OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(new TestObserver(mockObserver)); + OperatorObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(new TestObserver(mockObserver)); verify(mockObserver).onError(exception); verify(mockObserver, never()).onNext(anyInt()); verify(mockObserver, never()).onCompleted(); @@ -150,8 +149,8 @@ public void itForwardsOnErrorToTargetObserver() { public void itDropsOnNextOnCompletedSequenceIfTargetComponentIsGone() throws Throwable { PublishSubject source = PublishSubject.create(); - final Observable.OnSubscribeFunc operator = newOnSubscribeFragmentInstance(source, mockFragment); - operator.onSubscribe(new TestSubscriber(mockObserver)); + final Observable.OnSubscribe operator = newOnSubscribeFragmentInstance(source, mockFragment); + operator.call(new TestSubscriber(mockObserver)); source.onNext(1); releaseComponentRef(operator); @@ -168,8 +167,8 @@ public void itDropsOnNextOnCompletedSequenceIfTargetComponentIsGone() throws Thr public void itDropsOnErrorIfTargetComponentIsGone() throws Throwable { PublishSubject source = PublishSubject.create(); - final Observable.OnSubscribeFunc operator = newOnSubscribeFragmentInstance(source, mockFragment); - operator.onSubscribe(new TestSubscriber(mockObserver)); + final Observable.OnSubscribe operator = newOnSubscribeFragmentInstance(source, mockFragment); + operator.call(new TestSubscriber(mockObserver)); source.onNext(1); releaseComponentRef(operator); @@ -180,11 +179,12 @@ public void itDropsOnErrorIfTargetComponentIsGone() throws Throwable { verifyNoMoreInteractions(mockObserver); } - private Observable.OnSubscribeFunc newOnSubscribeFragmentInstance(Observable source, Fragment fragment) throws NoSuchMethodException, IllegalAccessException, InstantiationException, InvocationTargetException { - final Class[] klasses = OperationObserveFromAndroidComponent.class.getDeclaredClasses(); - Class onSubscribeFragmentClass = null; - for (Class klass : klasses) { - if ("rx.operators.OperationObserveFromAndroidComponent$OnSubscribeFragment".equals(klass.getName())) { + @SuppressWarnings("unchecked") + private Observable.OnSubscribe newOnSubscribeFragmentInstance(Observable source, Fragment fragment) throws NoSuchMethodException, IllegalAccessException, InstantiationException, InvocationTargetException { + final Class[] klasses = OperatorObserveFromAndroidComponent.class.getDeclaredClasses(); + Class onSubscribeFragmentClass = null; + for (Class klass : klasses) { + if ("rx.operators.OperatorObserveFromAndroidComponent$OnSubscribeFragment".equals(klass.getName())) { onSubscribeFragmentClass = klass; break; } @@ -192,10 +192,10 @@ private Observable.OnSubscribeFunc newOnSubscribeFragmentInstance(Obser Constructor constructor = onSubscribeFragmentClass.getDeclaredConstructor(Observable.class, Fragment.class); constructor.setAccessible(true); Object object = constructor.newInstance(source, fragment); - return (Observable.OnSubscribeFunc) object; + return (Observable.OnSubscribe) object; } - private void releaseComponentRef(Observable.OnSubscribeFunc operator) throws NoSuchFieldException, IllegalAccessException { + private void releaseComponentRef(Observable.OnSubscribe operator) throws NoSuchFieldException, IllegalAccessException { final Field componentRef = operator.getClass().getSuperclass().getDeclaredField("componentRef"); componentRef.setAccessible(true); componentRef.set(operator, null); @@ -204,7 +204,7 @@ private void releaseComponentRef(Observable.OnSubscribeFunc operator) t @Test public void itDoesNotForwardOnNextOnCompletedSequenceIfFragmentIsDetached() { PublishSubject source = PublishSubject.create(); - OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(new TestObserver(mockObserver)); + OperatorObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(new TestObserver(mockObserver)); source.onNext(1); @@ -220,7 +220,7 @@ public void itDoesNotForwardOnNextOnCompletedSequenceIfFragmentIsDetached() { @Test public void itDoesNotForwardOnErrorIfFragmentIsDetached() { PublishSubject source = PublishSubject.create(); - OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(new TestObserver(mockObserver)); + OperatorObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(new TestObserver(mockObserver)); source.onNext(1); @@ -231,25 +231,4 @@ public void itDoesNotForwardOnErrorIfFragmentIsDetached() { verify(mockObserver, never()).onError(any(Exception.class)); } - @Test - public void itUnsubscribesFromTheSourceSequence() { - final BooleanSubscription s = new BooleanSubscription(); - Observable testObservable = Observable.create(new OnSubscribeFunc() { - - @Override - public Subscription onSubscribe(Observer o) { - o.onNext(1); - o.onCompleted(); - return s; - } - - }); - - Subscription sub = OperationObserveFromAndroidComponent.observeFromAndroidComponent( - testObservable, mockActivity).subscribe(new TestObserver(mockObserver)); - sub.unsubscribe(); - - assertTrue(s.isUnsubscribed()); - } - } diff --git a/rxjava-contrib/rxjava-android/src/test/java/rx/android/schedulers/HandlerThreadSchedulerTest.java b/rxjava-contrib/rxjava-android/src/test/java/rx/android/schedulers/HandlerThreadSchedulerTest.java index 73cde11caa..d68bd258f0 100644 --- a/rxjava-contrib/rxjava-android/src/test/java/rx/android/schedulers/HandlerThreadSchedulerTest.java +++ b/rxjava-contrib/rxjava-android/src/test/java/rx/android/schedulers/HandlerThreadSchedulerTest.java @@ -15,8 +15,10 @@ */ package rx.android.schedulers; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import java.util.concurrent.TimeUnit; @@ -28,9 +30,7 @@ import rx.Scheduler; import rx.Scheduler.Inner; -import rx.Subscription; import rx.functions.Action1; -import rx.functions.Func2; import android.os.Handler; @RunWith(RobolectricTestRunner.class)