Skip to content

Commit

Permalink
Add subscribeBy extensions for proxy classes. (#127)
Browse files Browse the repository at this point in the history
* Add subscribeBy extensions for proxy classes.

Unfortunately, since the SubscribeProxy classes don't extend the
observable class they proxy to, extension functions written for the
observable class can't be used.

One commonly used set of extensions is RxKotlin's subscribeBy. This
commit adds copies of those extensions that work on SubscribeProxies.

* Move extensions to sample.

* update in response to review.

* Remove wildcard import.

* Use block bodies for extensions.
  • Loading branch information
ajalt authored and ZacSweers committed Nov 10, 2017
1 parent f697749 commit 1817cca
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package com.uber.autodispose.recipes

import com.uber.autodispose.CompletableSubscribeProxy
import com.uber.autodispose.FlowableSubscribeProxy
import com.uber.autodispose.MaybeSubscribeProxy
import com.uber.autodispose.ObservableSubscribeProxy
import com.uber.autodispose.SingleSubscribeProxy
import io.reactivex.disposables.Disposable
import io.reactivex.exceptions.OnErrorNotImplementedException
import io.reactivex.plugins.RxJavaPlugins

/*
* An example of extension functions on the objects returned by `AutoDispose.with`.
*
* AutoDispose returns proxy objects that don't extend Observable or the other reactive classes. This means
* that extensions like RxKotlin's `Observable.subscribeBy` can't be used. However, it's easy to define your
* own.
*
* These extension functions can be called in the following manner:
*
* ```
* Observable.just(1)
* .autoDisposeWith(this)
* .subscribeBy(onError = { Log.e(it) })
* ```
*/

private val onNextStub: (Any) -> Unit = {}
private val onErrorStub: (Throwable) -> Unit = { RxJavaPlugins.onError(OnErrorNotImplementedException(it)) }
private val onCompleteStub: () -> Unit = {}

/**
* Overloaded subscribe function that allows passing named parameters
*/
fun <T : Any> ObservableSubscribeProxy<T>.subscribeBy(
onError: (Throwable) -> Unit = onErrorStub,
onComplete: () -> Unit = onCompleteStub,
onNext: (T) -> Unit = onNextStub
): Disposable {
return if (onError === onErrorStub && onComplete === onCompleteStub) {
subscribe(onNext)
} else {
subscribe(onNext, onError, onComplete)
}
}

/**
* Overloaded subscribe function that allows passing named parameters
*/
fun <T : Any> FlowableSubscribeProxy<T>.subscribeBy(
onError: (Throwable) -> Unit = onErrorStub,
onComplete: () -> Unit = onCompleteStub,
onNext: (T) -> Unit = onNextStub
): Disposable {
return if (onError === onErrorStub && onComplete === onCompleteStub) {
subscribe(onNext)
} else {
subscribe(onNext, onError, onComplete)
}
}

/**
* Overloaded subscribe function that allows passing named parameters
*/
fun <T : Any> SingleSubscribeProxy<T>.subscribeBy(
onError: (Throwable) -> Unit = onErrorStub,
onSuccess: (T) -> Unit = onNextStub
): Disposable {
return if (onError === onErrorStub) {
subscribe(onSuccess)
} else {
subscribe(onSuccess, onError)
}
}

/**
* Overloaded subscribe function that allows passing named parameters
*/
fun <T : Any> MaybeSubscribeProxy<T>.subscribeBy(
onError: (Throwable) -> Unit = onErrorStub,
onComplete: () -> Unit = onCompleteStub,
onSuccess: (T) -> Unit = onNextStub
): Disposable {
return if (onError === onErrorStub && onComplete === onCompleteStub) {
subscribe(onSuccess)
} else {
subscribe(onSuccess, onError, onComplete)
}
}


/**
* Overloaded subscribe function that allows passing named parameters
*/
fun CompletableSubscribeProxy.subscribeBy(
onError: (Throwable) -> Unit = onErrorStub,
onComplete: () -> Unit = onCompleteStub
): Disposable {
return if (onError === onErrorStub) {
subscribe(onComplete)
} else {
subscribe(onComplete, onError)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import android.support.v7.app.AppCompatActivity
import android.util.Log
import com.uber.autodispose.android.lifecycle.AndroidLifecycleScopeProvider
import com.uber.autodispose.kotlin.autoDisposeWith
import com.uber.autodispose.recipes.subscribeBy
import io.reactivex.Observable
import java.util.concurrent.TimeUnit

Expand All @@ -45,7 +46,7 @@ class KotlinActivity : AppCompatActivity() {
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose { Log.i(TAG, "Disposing subscription from onCreate()") }
.autoDisposeWith(scopeProvider)
.subscribe { num -> Log.i(TAG, "Started in onCreate(), running until onDestroy(): " + num) }
.subscribeBy { num -> Log.i(TAG, "Started in onCreate(), running until onDestroy(): $num") }
}

override fun onStart() {
Expand All @@ -58,7 +59,7 @@ class KotlinActivity : AppCompatActivity() {
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose { Log.i(TAG, "Disposing subscription from onStart()") }
.autoDisposeWith(scopeProvider)
.subscribe { num -> Log.i(TAG, "Started in onStart(), running until in onStop(): " + num) }
.subscribeBy { num -> Log.i(TAG, "Started in onStart(), running until in onStop(): $num") }
}

override fun onResume() {
Expand All @@ -71,19 +72,15 @@ class KotlinActivity : AppCompatActivity() {
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose { Log.i(TAG, "Disposing subscription from onResume()") }
.autoDisposeWith(scopeProvider)
.subscribe { num ->
Log.i(TAG, "Started in onResume(), running until in onPause(): " + num)
}
.subscribeBy { num -> Log.i(TAG, "Started in onResume(), running until in onPause(): $num") }

// Setting a specific untilEvent, this should dispose in onDestroy.
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose {
Log.i(TAG, "Disposing subscription from onResume() with untilEvent ON_DESTROY")
}
.autoDisposeWith(AndroidLifecycleScopeProvider.from(this, Lifecycle.Event.ON_DESTROY))
.subscribe{ num ->
Log.i(TAG, "Started in onResume(), running until in onDestroy(): " + num)
}
.subscribeBy { num -> Log.i(TAG, "Started in onResume(), running until in onDestroy(): $num") }
}

override fun onPause() {
Expand Down

0 comments on commit 1817cca

Please sign in to comment.