From 499e7d57246d9018f71afe5c1d7e6df3bddfaf92 Mon Sep 17 00:00:00 2001 From: AJ Date: Tue, 7 Nov 2017 23:42:51 +0000 Subject: [PATCH 1/5] Add subscribeBy extensions for proxy classes. Unfortunately, since the SubscribeProxy classes don't extend the observable class they proxy to, extension functions written for the observable class can't be used. One commonly used set of extensions is RxKotlin's subscribeBy. This commit adds copies of those extensions that work on SubscribeProxies. --- .../autodispose/kotlin/subscriberproxies.kt | 54 +++++++++++++++++++ .../kotlin/SubscriberProxiesKotlinTest.kt | 47 ++++++++++++++++ 2 files changed, 101 insertions(+) create mode 100644 autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin/subscriberproxies.kt create mode 100644 autodispose-kotlin/src/test/kotlin/com/uber/autodispose/kotlin/SubscriberProxiesKotlinTest.kt diff --git a/autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin/subscriberproxies.kt b/autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin/subscriberproxies.kt new file mode 100644 index 000000000..f532346e3 --- /dev/null +++ b/autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin/subscriberproxies.kt @@ -0,0 +1,54 @@ +package com.uber.autodispose.kotlin + +import com.uber.autodispose.* +import io.reactivex.disposables.Disposable +import io.reactivex.exceptions.OnErrorNotImplementedException +import io.reactivex.plugins.RxJavaPlugins + +private val onNextStub: (Any) -> Unit = {} +private val onErrorStub: (Throwable) -> Unit = { RxJavaPlugins.onError(OnErrorNotImplementedException(it)) } +private val onCompleteStub: () -> Unit = {} + + +/** + * Overloaded subscribe function that allows passing named parameters + */ +fun ObservableSubscribeProxy.subscribeBy( + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onNext: (T) -> Unit = onNextStub +): Disposable = subscribe(onNext, onError, onComplete) + +/** + * Overloaded subscribe function that allows passing named parameters + */ +fun FlowableSubscribeProxy.subscribeBy( + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onNext: (T) -> Unit = onNextStub +): Disposable = subscribe(onNext, onError, onComplete) + +/** + * Overloaded subscribe function that allows passing named parameters + */ +fun SingleSubscribeProxy.subscribeBy( + onError: (Throwable) -> Unit = onErrorStub, + onSuccess: (T) -> Unit = onNextStub +): Disposable = subscribe(onSuccess, onError) + +/** + * Overloaded subscribe function that allows passing named parameters + */ +fun MaybeSubscribeProxy.subscribeBy( + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onSuccess: (T) -> Unit = onNextStub +): Disposable = subscribe(onSuccess, onError, onComplete) + +/** + * Overloaded subscribe function that allows passing named parameters + */ +fun CompletableSubscribeProxy.subscribeBy( + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub +): Disposable = subscribe(onComplete, onError) diff --git a/autodispose-kotlin/src/test/kotlin/com/uber/autodispose/kotlin/SubscriberProxiesKotlinTest.kt b/autodispose-kotlin/src/test/kotlin/com/uber/autodispose/kotlin/SubscriberProxiesKotlinTest.kt new file mode 100644 index 000000000..6295d9b61 --- /dev/null +++ b/autodispose-kotlin/src/test/kotlin/com/uber/autodispose/kotlin/SubscriberProxiesKotlinTest.kt @@ -0,0 +1,47 @@ +package com.uber.autodispose.kotlin + +import com.google.common.truth.Truth.assertThat +import io.reactivex.* +import io.reactivex.subjects.MaybeSubject +import org.junit.Test +import java.util.concurrent.atomic.AtomicReference + +class SubscriberProxiesKotlinTest { + private val scopeMaybe = MaybeSubject.create() + private val ref = AtomicReference() + + @Test fun subscribeBy_ObservableSubscribeProxy() { + Observable.just("Hello") + .autoDisposeWith(scopeMaybe) + .subscribeBy { ref.set(it) } + assertThat(ref.get()).isEqualTo("Hello") + } + + @Test fun subscribeBy_FlowableSubscribeProxy() { + Flowable.just("Hello 2") + .autoDisposeWith(scopeMaybe) + .subscribeBy { ref.set(it) } + assertThat(ref.get()).isEqualTo("Hello 2") + } + + @Test fun subscribeBy_SingleSubscribeProxy() { + Single.just("Hello 3") + .autoDisposeWith(scopeMaybe) + .subscribeBy { ref.set(it) } + assertThat(ref.get()).isEqualTo("Hello 3") + } + + @Test fun subscribeBy_MaybeSubscribeProxy() { + Maybe.just("Hello 4") + .autoDisposeWith(scopeMaybe) + .subscribeBy { ref.set(it) } + assertThat(ref.get()).isEqualTo("Hello 4") + } + + @Test fun subscribeBy_CompletableSubscribeProxy() { + Completable.complete() + .autoDisposeWith(scopeMaybe) + .subscribeBy { ref.set("Hello 5") } + assertThat(ref.get()).isEqualTo("Hello 5") + } +} From de19526a5c1644ad6ff75cae63d61601ca5d08be Mon Sep 17 00:00:00 2001 From: AJ Date: Wed, 8 Nov 2017 18:43:39 +0000 Subject: [PATCH 2/5] Move extensions to sample. --- .../kotlin/SubscriberProxiesKotlinTest.kt | 47 ------------------- .../autodispose/recipes}/subscriberproxies.kt | 2 +- .../uber/autodispose/sample/KotlinActivity.kt | 13 ++--- 3 files changed, 6 insertions(+), 56 deletions(-) delete mode 100644 autodispose-kotlin/src/test/kotlin/com/uber/autodispose/kotlin/SubscriberProxiesKotlinTest.kt rename {autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin => sample/src/main/kotlin/com/uber/autodispose/recipes}/subscriberproxies.kt (98%) diff --git a/autodispose-kotlin/src/test/kotlin/com/uber/autodispose/kotlin/SubscriberProxiesKotlinTest.kt b/autodispose-kotlin/src/test/kotlin/com/uber/autodispose/kotlin/SubscriberProxiesKotlinTest.kt deleted file mode 100644 index 6295d9b61..000000000 --- a/autodispose-kotlin/src/test/kotlin/com/uber/autodispose/kotlin/SubscriberProxiesKotlinTest.kt +++ /dev/null @@ -1,47 +0,0 @@ -package com.uber.autodispose.kotlin - -import com.google.common.truth.Truth.assertThat -import io.reactivex.* -import io.reactivex.subjects.MaybeSubject -import org.junit.Test -import java.util.concurrent.atomic.AtomicReference - -class SubscriberProxiesKotlinTest { - private val scopeMaybe = MaybeSubject.create() - private val ref = AtomicReference() - - @Test fun subscribeBy_ObservableSubscribeProxy() { - Observable.just("Hello") - .autoDisposeWith(scopeMaybe) - .subscribeBy { ref.set(it) } - assertThat(ref.get()).isEqualTo("Hello") - } - - @Test fun subscribeBy_FlowableSubscribeProxy() { - Flowable.just("Hello 2") - .autoDisposeWith(scopeMaybe) - .subscribeBy { ref.set(it) } - assertThat(ref.get()).isEqualTo("Hello 2") - } - - @Test fun subscribeBy_SingleSubscribeProxy() { - Single.just("Hello 3") - .autoDisposeWith(scopeMaybe) - .subscribeBy { ref.set(it) } - assertThat(ref.get()).isEqualTo("Hello 3") - } - - @Test fun subscribeBy_MaybeSubscribeProxy() { - Maybe.just("Hello 4") - .autoDisposeWith(scopeMaybe) - .subscribeBy { ref.set(it) } - assertThat(ref.get()).isEqualTo("Hello 4") - } - - @Test fun subscribeBy_CompletableSubscribeProxy() { - Completable.complete() - .autoDisposeWith(scopeMaybe) - .subscribeBy { ref.set("Hello 5") } - assertThat(ref.get()).isEqualTo("Hello 5") - } -} diff --git a/autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin/subscriberproxies.kt b/sample/src/main/kotlin/com/uber/autodispose/recipes/subscriberproxies.kt similarity index 98% rename from autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin/subscriberproxies.kt rename to sample/src/main/kotlin/com/uber/autodispose/recipes/subscriberproxies.kt index f532346e3..49092312e 100644 --- a/autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin/subscriberproxies.kt +++ b/sample/src/main/kotlin/com/uber/autodispose/recipes/subscriberproxies.kt @@ -1,4 +1,4 @@ -package com.uber.autodispose.kotlin +package com.uber.autodispose.recipes import com.uber.autodispose.* import io.reactivex.disposables.Disposable diff --git a/sample/src/main/kotlin/com/uber/autodispose/sample/KotlinActivity.kt b/sample/src/main/kotlin/com/uber/autodispose/sample/KotlinActivity.kt index 3f587594b..617d7ea1e 100644 --- a/sample/src/main/kotlin/com/uber/autodispose/sample/KotlinActivity.kt +++ b/sample/src/main/kotlin/com/uber/autodispose/sample/KotlinActivity.kt @@ -22,6 +22,7 @@ import android.support.v7.app.AppCompatActivity import android.util.Log import com.uber.autodispose.android.lifecycle.AndroidLifecycleScopeProvider import com.uber.autodispose.kotlin.autoDisposeWith +import com.uber.autodispose.recipes.subscribeBy import io.reactivex.Observable import java.util.concurrent.TimeUnit @@ -45,7 +46,7 @@ class KotlinActivity : AppCompatActivity() { Observable.interval(1, TimeUnit.SECONDS) .doOnDispose { Log.i(TAG, "Disposing subscription from onCreate()") } .autoDisposeWith(scopeProvider) - .subscribe { num -> Log.i(TAG, "Started in onCreate(), running until onDestroy(): " + num) } + .subscribeBy { Log.i(TAG, "Started in onCreate(), running until onDestroy(): $it") } } override fun onStart() { @@ -58,7 +59,7 @@ class KotlinActivity : AppCompatActivity() { Observable.interval(1, TimeUnit.SECONDS) .doOnDispose { Log.i(TAG, "Disposing subscription from onStart()") } .autoDisposeWith(scopeProvider) - .subscribe { num -> Log.i(TAG, "Started in onStart(), running until in onStop(): " + num) } + .subscribeBy { Log.i(TAG, "Started in onStart(), running until in onStop(): $it") } } override fun onResume() { @@ -71,9 +72,7 @@ class KotlinActivity : AppCompatActivity() { Observable.interval(1, TimeUnit.SECONDS) .doOnDispose { Log.i(TAG, "Disposing subscription from onResume()") } .autoDisposeWith(scopeProvider) - .subscribe { num -> - Log.i(TAG, "Started in onResume(), running until in onPause(): " + num) - } + .subscribeBy { Log.i(TAG, "Started in onResume(), running until in onPause(): $it") } // Setting a specific untilEvent, this should dispose in onDestroy. Observable.interval(1, TimeUnit.SECONDS) @@ -81,9 +80,7 @@ class KotlinActivity : AppCompatActivity() { Log.i(TAG, "Disposing subscription from onResume() with untilEvent ON_DESTROY") } .autoDisposeWith(AndroidLifecycleScopeProvider.from(this, Lifecycle.Event.ON_DESTROY)) - .subscribe{ num -> - Log.i(TAG, "Started in onResume(), running until in onDestroy(): " + num) - } + .subscribeBy { Log.i(TAG, "Started in onResume(), running until in onDestroy(): $it") } } override fun onPause() { From d5d94bf0f4899f994809576234df1b48a1816463 Mon Sep 17 00:00:00 2001 From: AJ Date: Thu, 9 Nov 2017 20:16:39 +0000 Subject: [PATCH 3/5] update in response to review. --- .../autodispose/recipes/subscriberproxies.kt | 53 ++++++++++++++++--- .../uber/autodispose/sample/KotlinActivity.kt | 8 +-- 2 files changed, 51 insertions(+), 10 deletions(-) diff --git a/sample/src/main/kotlin/com/uber/autodispose/recipes/subscriberproxies.kt b/sample/src/main/kotlin/com/uber/autodispose/recipes/subscriberproxies.kt index 49092312e..db9cf5503 100644 --- a/sample/src/main/kotlin/com/uber/autodispose/recipes/subscriberproxies.kt +++ b/sample/src/main/kotlin/com/uber/autodispose/recipes/subscriberproxies.kt @@ -5,11 +5,26 @@ import io.reactivex.disposables.Disposable import io.reactivex.exceptions.OnErrorNotImplementedException import io.reactivex.plugins.RxJavaPlugins +/* + * An example of extension functions on the objects returned by `AutoDispose.with`. + * + * AutoDispose returns proxy objects that don't extend Observable or the other reactive classes. This means + * that extensions like RxKotlin's `Observable.subscribeBy` can't be used. However, it's easy to define your + * own. + * + * These extension functions can be called in the following manner: + * + * ``` + * Observable.just(1) + * .autoDisposeWith(this) + * .subscribeBy(onError = { Log.e(it) }) + * ``` + */ + private val onNextStub: (Any) -> Unit = {} private val onErrorStub: (Throwable) -> Unit = { RxJavaPlugins.onError(OnErrorNotImplementedException(it)) } private val onCompleteStub: () -> Unit = {} - /** * Overloaded subscribe function that allows passing named parameters */ @@ -17,7 +32,12 @@ fun ObservableSubscribeProxy.subscribeBy( onError: (Throwable) -> Unit = onErrorStub, onComplete: () -> Unit = onCompleteStub, onNext: (T) -> Unit = onNextStub -): Disposable = subscribe(onNext, onError, onComplete) +): Disposable = + if (onError === onErrorStub && onComplete === onCompleteStub) { + subscribe(onNext) + } else { + subscribe(onNext, onError, onComplete) + } /** * Overloaded subscribe function that allows passing named parameters @@ -26,7 +46,12 @@ fun FlowableSubscribeProxy.subscribeBy( onError: (Throwable) -> Unit = onErrorStub, onComplete: () -> Unit = onCompleteStub, onNext: (T) -> Unit = onNextStub -): Disposable = subscribe(onNext, onError, onComplete) +): Disposable = + if (onError === onErrorStub && onComplete === onCompleteStub) { + subscribe(onNext) + } else { + subscribe(onNext, onError, onComplete) + } /** * Overloaded subscribe function that allows passing named parameters @@ -34,7 +59,12 @@ fun FlowableSubscribeProxy.subscribeBy( fun SingleSubscribeProxy.subscribeBy( onError: (Throwable) -> Unit = onErrorStub, onSuccess: (T) -> Unit = onNextStub -): Disposable = subscribe(onSuccess, onError) +): Disposable = + if (onError === onErrorStub) { + subscribe(onSuccess) + } else { + subscribe(onSuccess, onError) + } /** * Overloaded subscribe function that allows passing named parameters @@ -43,7 +73,13 @@ fun MaybeSubscribeProxy.subscribeBy( onError: (Throwable) -> Unit = onErrorStub, onComplete: () -> Unit = onCompleteStub, onSuccess: (T) -> Unit = onNextStub -): Disposable = subscribe(onSuccess, onError, onComplete) +): Disposable = + if (onError === onErrorStub && onComplete === onCompleteStub) { + subscribe(onSuccess) + } else { + subscribe(onSuccess, onError, onComplete) + } + /** * Overloaded subscribe function that allows passing named parameters @@ -51,4 +87,9 @@ fun MaybeSubscribeProxy.subscribeBy( fun CompletableSubscribeProxy.subscribeBy( onError: (Throwable) -> Unit = onErrorStub, onComplete: () -> Unit = onCompleteStub -): Disposable = subscribe(onComplete, onError) +): Disposable = + if (onError === onErrorStub) { + subscribe(onComplete) + } else { + subscribe(onComplete, onError) + } diff --git a/sample/src/main/kotlin/com/uber/autodispose/sample/KotlinActivity.kt b/sample/src/main/kotlin/com/uber/autodispose/sample/KotlinActivity.kt index 617d7ea1e..8624fe37c 100644 --- a/sample/src/main/kotlin/com/uber/autodispose/sample/KotlinActivity.kt +++ b/sample/src/main/kotlin/com/uber/autodispose/sample/KotlinActivity.kt @@ -46,7 +46,7 @@ class KotlinActivity : AppCompatActivity() { Observable.interval(1, TimeUnit.SECONDS) .doOnDispose { Log.i(TAG, "Disposing subscription from onCreate()") } .autoDisposeWith(scopeProvider) - .subscribeBy { Log.i(TAG, "Started in onCreate(), running until onDestroy(): $it") } + .subscribeBy { num -> Log.i(TAG, "Started in onCreate(), running until onDestroy(): $num") } } override fun onStart() { @@ -59,7 +59,7 @@ class KotlinActivity : AppCompatActivity() { Observable.interval(1, TimeUnit.SECONDS) .doOnDispose { Log.i(TAG, "Disposing subscription from onStart()") } .autoDisposeWith(scopeProvider) - .subscribeBy { Log.i(TAG, "Started in onStart(), running until in onStop(): $it") } + .subscribeBy { num -> Log.i(TAG, "Started in onStart(), running until in onStop(): $num") } } override fun onResume() { @@ -72,7 +72,7 @@ class KotlinActivity : AppCompatActivity() { Observable.interval(1, TimeUnit.SECONDS) .doOnDispose { Log.i(TAG, "Disposing subscription from onResume()") } .autoDisposeWith(scopeProvider) - .subscribeBy { Log.i(TAG, "Started in onResume(), running until in onPause(): $it") } + .subscribeBy { num -> Log.i(TAG, "Started in onResume(), running until in onPause(): $num") } // Setting a specific untilEvent, this should dispose in onDestroy. Observable.interval(1, TimeUnit.SECONDS) @@ -80,7 +80,7 @@ class KotlinActivity : AppCompatActivity() { Log.i(TAG, "Disposing subscription from onResume() with untilEvent ON_DESTROY") } .autoDisposeWith(AndroidLifecycleScopeProvider.from(this, Lifecycle.Event.ON_DESTROY)) - .subscribeBy { Log.i(TAG, "Started in onResume(), running until in onDestroy(): $it") } + .subscribeBy { num -> Log.i(TAG, "Started in onResume(), running until in onDestroy(): $num") } } override fun onPause() { From 523e8e029cedd644635c91cd698b09bff3959770 Mon Sep 17 00:00:00 2001 From: AJ Date: Thu, 9 Nov 2017 20:18:04 +0000 Subject: [PATCH 4/5] Remove wildcard import. --- .../com/uber/autodispose/recipes/subscriberproxies.kt | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sample/src/main/kotlin/com/uber/autodispose/recipes/subscriberproxies.kt b/sample/src/main/kotlin/com/uber/autodispose/recipes/subscriberproxies.kt index db9cf5503..aee1392f3 100644 --- a/sample/src/main/kotlin/com/uber/autodispose/recipes/subscriberproxies.kt +++ b/sample/src/main/kotlin/com/uber/autodispose/recipes/subscriberproxies.kt @@ -1,6 +1,10 @@ package com.uber.autodispose.recipes -import com.uber.autodispose.* +import com.uber.autodispose.CompletableSubscribeProxy +import com.uber.autodispose.FlowableSubscribeProxy +import com.uber.autodispose.MaybeSubscribeProxy +import com.uber.autodispose.ObservableSubscribeProxy +import com.uber.autodispose.SingleSubscribeProxy import io.reactivex.disposables.Disposable import io.reactivex.exceptions.OnErrorNotImplementedException import io.reactivex.plugins.RxJavaPlugins From bddfe1896d613ec13f25bd606c80e4cf06b1b52d Mon Sep 17 00:00:00 2001 From: AJ Date: Fri, 10 Nov 2017 00:04:55 +0000 Subject: [PATCH 5/5] Use block bodies for extensions. --- .../autodispose/recipes/subscriberproxies.kt | 65 ++++++++++--------- 1 file changed, 35 insertions(+), 30 deletions(-) diff --git a/sample/src/main/kotlin/com/uber/autodispose/recipes/subscriberproxies.kt b/sample/src/main/kotlin/com/uber/autodispose/recipes/subscriberproxies.kt index aee1392f3..ba0b9cc1a 100644 --- a/sample/src/main/kotlin/com/uber/autodispose/recipes/subscriberproxies.kt +++ b/sample/src/main/kotlin/com/uber/autodispose/recipes/subscriberproxies.kt @@ -36,12 +36,13 @@ fun ObservableSubscribeProxy.subscribeBy( onError: (Throwable) -> Unit = onErrorStub, onComplete: () -> Unit = onCompleteStub, onNext: (T) -> Unit = onNextStub -): Disposable = - if (onError === onErrorStub && onComplete === onCompleteStub) { - subscribe(onNext) - } else { - subscribe(onNext, onError, onComplete) - } +): Disposable { + return if (onError === onErrorStub && onComplete === onCompleteStub) { + subscribe(onNext) + } else { + subscribe(onNext, onError, onComplete) + } +} /** * Overloaded subscribe function that allows passing named parameters @@ -50,12 +51,13 @@ fun FlowableSubscribeProxy.subscribeBy( onError: (Throwable) -> Unit = onErrorStub, onComplete: () -> Unit = onCompleteStub, onNext: (T) -> Unit = onNextStub -): Disposable = - if (onError === onErrorStub && onComplete === onCompleteStub) { - subscribe(onNext) - } else { - subscribe(onNext, onError, onComplete) - } +): Disposable { + return if (onError === onErrorStub && onComplete === onCompleteStub) { + subscribe(onNext) + } else { + subscribe(onNext, onError, onComplete) + } +} /** * Overloaded subscribe function that allows passing named parameters @@ -63,12 +65,13 @@ fun FlowableSubscribeProxy.subscribeBy( fun SingleSubscribeProxy.subscribeBy( onError: (Throwable) -> Unit = onErrorStub, onSuccess: (T) -> Unit = onNextStub -): Disposable = - if (onError === onErrorStub) { - subscribe(onSuccess) - } else { - subscribe(onSuccess, onError) - } +): Disposable { + return if (onError === onErrorStub) { + subscribe(onSuccess) + } else { + subscribe(onSuccess, onError) + } +} /** * Overloaded subscribe function that allows passing named parameters @@ -77,12 +80,13 @@ fun MaybeSubscribeProxy.subscribeBy( onError: (Throwable) -> Unit = onErrorStub, onComplete: () -> Unit = onCompleteStub, onSuccess: (T) -> Unit = onNextStub -): Disposable = - if (onError === onErrorStub && onComplete === onCompleteStub) { - subscribe(onSuccess) - } else { - subscribe(onSuccess, onError, onComplete) - } +): Disposable { + return if (onError === onErrorStub && onComplete === onCompleteStub) { + subscribe(onSuccess) + } else { + subscribe(onSuccess, onError, onComplete) + } +} /** @@ -91,9 +95,10 @@ fun MaybeSubscribeProxy.subscribeBy( fun CompletableSubscribeProxy.subscribeBy( onError: (Throwable) -> Unit = onErrorStub, onComplete: () -> Unit = onCompleteStub -): Disposable = - if (onError === onErrorStub) { - subscribe(onComplete) - } else { - subscribe(onComplete, onError) - } +): Disposable { + return if (onError === onErrorStub) { + subscribe(onComplete) + } else { + subscribe(onComplete, onError) + } +}