Skip to content

Commit

Permalink
Run suspending calls within Dispatchers.Default
Browse files Browse the repository at this point in the history
This addresses a need for off-main-thread invocation of
Call.Factory.newCall to support lazy HttpClient initialization.
  • Loading branch information
jingibus committed Mar 30, 2022
1 parent 1490e6b commit 98ad945
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 104 deletions.
10 changes: 3 additions & 7 deletions retrofit-mock/src/main/java/retrofit2/mock/BehaviorDelegate.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,9 @@ public <R> T returning(Call<R> call) {

Call<Object> adaptedCall = (Call<Object>) adapted;
Continuation<Object> continuation = (Continuation<Object>) args[args.length - 1];
try {
return adapterInfo.wantsResponse
? KotlinExtensions.awaitResponse(adaptedCall, continuation)
: KotlinExtensions.await(adaptedCall, continuation);
} catch (Exception e) {
return KotlinExtensions.suspendAndThrow(e, continuation);
}
return adapterInfo.wantsResponse
? KotlinExtensions.awaitResponse(adaptedCall, continuation)
: KotlinExtensions.await(adaptedCall, continuation);
});
}

Expand Down
40 changes: 36 additions & 4 deletions retrofit/kotlin-test/src/test/java/retrofit2/KotlinSuspendTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@
*/
package retrofit2

import java.io.IOException
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
Expand All @@ -35,10 +42,6 @@ import retrofit2.helpers.ToStringConverterFactory
import retrofit2.http.GET
import retrofit2.http.HEAD
import retrofit2.http.Path
import java.io.IOException
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import kotlin.coroutines.CoroutineContext

class KotlinSuspendTest {
@get:Rule val server = MockWebServer()
Expand Down Expand Up @@ -353,6 +356,35 @@ class KotlinSuspendTest {
}
}

@Test fun usesCoroutineContextForCallFactory() {
val executor = Executors.newSingleThreadExecutor()
val okHttpClient = OkHttpClient()
var callFactoryThread: Thread? = null
val outerContextThread: Thread
val retrofit = Retrofit.Builder()
.baseUrl(server.url("/"))
.callFactory {
callFactoryThread = Thread.currentThread()
okHttpClient.newCall(it)
}
.addConverterFactory(ToStringConverterFactory())
.coroutineDispatcher(executor.asCoroutineDispatcher())
.build()
val example = retrofit.create(Service::class.java)

server.enqueue(MockResponse().setBody("Hi"))

runBlocking {
outerContextThread = Thread.currentThread()
example.body()
}

assertThat(callFactoryThread).isNotNull
assertThat(outerContextThread).isNotEqualTo(callFactoryThread)
executor.shutdownNow()
}


@Suppress("EXPERIMENTAL_OVERRIDE")
private object DirectUnconfinedDispatcher : CoroutineDispatcher() {
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
Expand Down
33 changes: 8 additions & 25 deletions retrofit/src/main/java/retrofit2/HttpServiceMethod.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,7 @@ protected Object adapt(Call<ResponseT> call, Object[] args) {
Continuation<Response<ResponseT>> continuation =
(Continuation<Response<ResponseT>>) args[args.length - 1];

// See SuspendForBody for explanation about this try/catch.
try {
return KotlinExtensions.awaitResponse(call, continuation);
} catch (Exception e) {
return KotlinExtensions.suspendAndThrow(e, continuation);
}
return KotlinExtensions.awaitResponse(call, continuation);
}
}

Expand Down Expand Up @@ -226,25 +221,13 @@ protected Object adapt(Call<ResponseT> call, Object[] args) {
//noinspection unchecked Checked by reflection inside RequestFactory.
Continuation<ResponseT> continuation = (Continuation<ResponseT>) args[args.length - 1];

// Calls to OkHttp Call.enqueue() like those inside await and awaitNullable can sometimes
// invoke the supplied callback with an exception before the invoking stack frame can return.
// Coroutines will intercept the subsequent invocation of the Continuation and throw the
// exception synchronously. A Java Proxy cannot throw checked exceptions without them being
// declared on the interface method. To avoid the synchronous checked exception being wrapped
// in an UndeclaredThrowableException, it is intercepted and supplied to a helper which will
// force suspension to occur so that it can be instead delivered to the continuation to
// bypass this restriction.
try {
if (isUnit) {
//noinspection unchecked Checked by isUnit
return KotlinExtensions.awaitUnit((Call<Unit>) call, (Continuation<Unit>) continuation);
} else if (isNullable) {
return KotlinExtensions.awaitNullable(call, continuation);
} else {
return KotlinExtensions.await(call, continuation);
}
} catch (Exception e) {
return KotlinExtensions.suspendAndThrow(e, continuation);
if (isUnit) {
//noinspection unchecked Checked by isUnit
return KotlinExtensions.awaitUnit((Call<Unit>) call, (Continuation<Unit>) continuation);
} else if (isNullable) {
return KotlinExtensions.await(call, continuation);
} else {
return KotlinExtensions.await(call, continuation);
}
}
}
Expand Down
124 changes: 56 additions & 68 deletions retrofit/src/main/java/retrofit2/KotlinExtensions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,67 +20,70 @@ package retrofit2

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.suspendCancellableCoroutine
import java.lang.reflect.ParameterizedType
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.intercepted
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlinx.coroutines.withContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

inline fun <reified T: Any> Retrofit.create(): T = create(T::class.java)

suspend fun <T : Any> Call<T>.await(): T {
return suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
cancel()
}
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
if (response.isSuccessful) {
val body = response.body()
if (body == null) {
val invocation = call.request().tag(Invocation::class.java)!!
val method = invocation.method()
val e = KotlinNullPointerException("Response from " +
// TODO: a better solution for off-main-thread call factories than this.
return withContext(Dispatchers.Default) {
suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
cancel()
}
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
if (response.isSuccessful) {
val body = response.body()
if (body == null) {
val invocation = call.request().tag(Invocation::class.java)!!
val method = invocation.method()
val e = KotlinNullPointerException("Response from " +
method.declaringClass.name +
'.' +
method.name +
" was null but response body type was declared as non-null")
continuation.resumeWithException(e)
continuation.resumeWithException(e)
} else {
continuation.resume(body)
}
} else {
continuation.resume(body)
continuation.resumeWithException(HttpException(response))
}
} else {
continuation.resumeWithException(HttpException(response))
}
}

override fun onFailure(call: Call<T>, t: Throwable) {
continuation.resumeWithException(t)
}
})
override fun onFailure(call: Call<T>, t: Throwable) {
continuation.resumeWithException(t)
}
})
}
}
}

@JvmName("awaitNullable")
suspend fun <T : Any> Call<T?>.await(): T? {
return suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
cancel()
}
enqueue(object : Callback<T?> {
override fun onResponse(call: Call<T?>, response: Response<T?>) {
if (response.isSuccessful) {
continuation.resume(response.body())
} else {
continuation.resumeWithException(HttpException(response))
}
// TODO: a better solution for off-main-thread call factories than this.
return withContext(Dispatchers.Default) {
suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
cancel()
}
enqueue(object : Callback<T?> {
override fun onResponse(call: Call<T?>, response: Response<T?>) {
if (response.isSuccessful) {
continuation.resume(response.body())
} else {
continuation.resumeWithException(HttpException(response))
}
}

override fun onFailure(call: Call<T?>, t: Throwable) {
continuation.resumeWithException(t)
}
})
override fun onFailure(call: Call<T?>, t: Throwable) {
continuation.resumeWithException(t)
}
})
}
}
}

Expand All @@ -91,36 +94,21 @@ suspend fun Call<Unit>.await() {
}

suspend fun <T> Call<T>.awaitResponse(): Response<T> {
return suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
cancel()
}
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
continuation.resume(response)
// TODO: a better solution for off-main-thread call factories than this.
return withContext(Dispatchers.Default) {
suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
cancel()
}
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
continuation.resume(response)
}

override fun onFailure(call: Call<T>, t: Throwable) {
continuation.resumeWithException(t)
}
})
}
}

/**
* Force the calling coroutine to suspend before throwing [this].
*
* This is needed when a checked exception is synchronously caught in a [java.lang.reflect.Proxy]
* invocation to avoid being wrapped in [java.lang.reflect.UndeclaredThrowableException].
*
* The implementation is derived from:
* https://github.com/Kotlin/kotlinx.coroutines/pull/1667#issuecomment-556106349
*/
internal suspend fun Exception.suspendAndThrow(): Nothing {
suspendCoroutineUninterceptedOrReturn<Nothing> { continuation ->
Dispatchers.Default.dispatch(continuation.context) {
continuation.intercepted().resumeWithException(this@suspendAndThrow)
override fun onFailure(call: Call<T>, t: Throwable) {
continuation.resumeWithException(t)
}
})
}
COROUTINE_SUSPENDED
}
}

0 comments on commit 98ad945

Please sign in to comment.