Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subscribeBy extensions for proxy classes. #127

Merged
merged 5 commits into from
Nov 10, 2017
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package com.uber.autodispose.recipes

import com.uber.autodispose.CompletableSubscribeProxy
import com.uber.autodispose.FlowableSubscribeProxy
import com.uber.autodispose.MaybeSubscribeProxy
import com.uber.autodispose.ObservableSubscribeProxy
import com.uber.autodispose.SingleSubscribeProxy
import io.reactivex.disposables.Disposable
import io.reactivex.exceptions.OnErrorNotImplementedException
import io.reactivex.plugins.RxJavaPlugins

/*
* An example of extension functions on the objects returned by `AutoDispose.with`.
*
* AutoDispose returns proxy objects that don't extend Observable or the other reactive classes. This means
* that extensions like RxKotlin's `Observable.subscribeBy` can't be used. However, it's easy to define your
* own.
*
* These extension functions can be called in the following manner:
*
* ```
* Observable.just(1)
* .autoDisposeWith(this)
* .subscribeBy(onError = { Log.e(it) })
* ```
*/

private val onNextStub: (Any) -> Unit = {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a file doc above this that explains how this is an example of how you can tack on extension functions to the proxy interface

/*
 * Stuff!
 */

private val onErrorStub: (Throwable) -> Unit = { RxJavaPlugins.onError(OnErrorNotImplementedException(it)) }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to propose diverging from how RxKotlin does this. The reason is that RxJava actually does special handling for missing error implementations via LambdaConsumerIntrospection, that supplying this would not catch. I'd say for the cases where only a next/success/complete consumer/action is passed in, let's call through to the unhandled one in rxjava. Will comment inline with specifics

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, interesting. I added RxKotlin's current implementation before LambdaConsumerIntrospection was in place, and it matched RxJava's behavior at the time. I'll look into your PR in detail later, but do you think RxKotlin should also be updated?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I'd say it probably should. Would be a non-breaking change, but it would allow rxjava to properly expose the introspection details

private val onCompleteStub: () -> Unit = {}

/**
* Overloaded subscribe function that allows passing named parameters
*/
fun <T : Any> ObservableSubscribeProxy<T>.subscribeBy(
onError: (Throwable) -> Unit = onErrorStub,
onComplete: () -> Unit = onCompleteStub,
onNext: (T) -> Unit = onNextStub
): Disposable =
if (onError === onErrorStub && onComplete === onCompleteStub) {
subscribe(onNext)
} else {
subscribe(onNext, onError, onComplete)
}

/**
* Overloaded subscribe function that allows passing named parameters
*/
fun <T : Any> FlowableSubscribeProxy<T>.subscribeBy(
onError: (Throwable) -> Unit = onErrorStub,
onComplete: () -> Unit = onCompleteStub,
onNext: (T) -> Unit = onNextStub
): Disposable =
if (onError === onErrorStub && onComplete === onCompleteStub) {
subscribe(onNext)
} else {
subscribe(onNext, onError, onComplete)
}

/**
* Overloaded subscribe function that allows passing named parameters
*/
fun <T : Any> SingleSubscribeProxy<T>.subscribeBy(
onError: (Throwable) -> Unit = onErrorStub,
onSuccess: (T) -> Unit = onNextStub
): Disposable =
if (onError === onErrorStub) {
subscribe(onSuccess)
} else {
subscribe(onSuccess, onError)
}

/**
* Overloaded subscribe function that allows passing named parameters
*/
fun <T : Any> MaybeSubscribeProxy<T>.subscribeBy(
onError: (Throwable) -> Unit = onErrorStub,
onComplete: () -> Unit = onCompleteStub,
onSuccess: (T) -> Unit = onNextStub
): Disposable =
if (onError === onErrorStub && onComplete === onCompleteStub) {
subscribe(onSuccess)
} else {
subscribe(onSuccess, onError, onComplete)
}


/**
* Overloaded subscribe function that allows passing named parameters
*/
fun CompletableSubscribeProxy.subscribeBy(
onError: (Throwable) -> Unit = onErrorStub,
onComplete: () -> Unit = onCompleteStub
): Disposable =
if (onError === onErrorStub) {
subscribe(onComplete)
} else {
subscribe(onComplete, onError)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import android.support.v7.app.AppCompatActivity
import android.util.Log
import com.uber.autodispose.android.lifecycle.AndroidLifecycleScopeProvider
import com.uber.autodispose.kotlin.autoDisposeWith
import com.uber.autodispose.recipes.subscribeBy
import io.reactivex.Observable
import java.util.concurrent.TimeUnit

Expand All @@ -45,7 +46,7 @@ class KotlinActivity : AppCompatActivity() {
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose { Log.i(TAG, "Disposing subscription from onCreate()") }
.autoDisposeWith(scopeProvider)
.subscribe { num -> Log.i(TAG, "Started in onCreate(), running until onDestroy(): " + num) }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: let's keep the num, as it was an intentionally more specific name

.subscribeBy { num -> Log.i(TAG, "Started in onCreate(), running until onDestroy(): $num") }
}

override fun onStart() {
Expand All @@ -58,7 +59,7 @@ class KotlinActivity : AppCompatActivity() {
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose { Log.i(TAG, "Disposing subscription from onStart()") }
.autoDisposeWith(scopeProvider)
.subscribe { num -> Log.i(TAG, "Started in onStart(), running until in onStop(): " + num) }
.subscribeBy { num -> Log.i(TAG, "Started in onStart(), running until in onStop(): $num") }
}

override fun onResume() {
Expand All @@ -71,19 +72,15 @@ class KotlinActivity : AppCompatActivity() {
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose { Log.i(TAG, "Disposing subscription from onResume()") }
.autoDisposeWith(scopeProvider)
.subscribe { num ->
Log.i(TAG, "Started in onResume(), running until in onPause(): " + num)
}
.subscribeBy { num -> Log.i(TAG, "Started in onResume(), running until in onPause(): $num") }

// Setting a specific untilEvent, this should dispose in onDestroy.
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose {
Log.i(TAG, "Disposing subscription from onResume() with untilEvent ON_DESTROY")
}
.autoDisposeWith(AndroidLifecycleScopeProvider.from(this, Lifecycle.Event.ON_DESTROY))
.subscribe{ num ->
Log.i(TAG, "Started in onResume(), running until in onDestroy(): " + num)
}
.subscribeBy { num -> Log.i(TAG, "Started in onResume(), running until in onDestroy(): $num") }
}

override fun onPause() {
Expand Down