From 84350ccecd71d0b62b98f54ab4a5c4826b772cb8 Mon Sep 17 00:00:00 2001 From: Bill Phillips Date: Thu, 10 Feb 2022 17:00:13 -0800 Subject: [PATCH] Run suspending calls within Dispatchers.Default This addresses a need for off-main-thread invocation of Call.Factory.newCall to support lazy HttpClient initialization. --- .../test/java/retrofit2/KotlinSuspendTest.kt | 27 +++++ .../main/java/retrofit2/KotlinExtensions.kt | 105 ++++++++++-------- 2 files changed, 84 insertions(+), 48 deletions(-) diff --git a/retrofit/kotlin-test/src/test/java/retrofit2/KotlinSuspendTest.kt b/retrofit/kotlin-test/src/test/java/retrofit2/KotlinSuspendTest.kt index 260fd7ab98..8cfdd89f5e 100644 --- a/retrofit/kotlin-test/src/test/java/retrofit2/KotlinSuspendTest.kt +++ b/retrofit/kotlin-test/src/test/java/retrofit2/KotlinSuspendTest.kt @@ -17,6 +17,7 @@ package retrofit2 import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Runnable import kotlinx.coroutines.async import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext @@ -353,6 +354,32 @@ class KotlinSuspendTest { } } + @Test fun usesCoroutineContextForCallFactory() { + val okHttpClient = OkHttpClient() + var callFactoryThread: Thread? = null + val outerContextThread: Thread + val retrofit = Retrofit.Builder() + .baseUrl(server.url("/")) + .callFactory { + callFactoryThread = Thread.currentThread() + okHttpClient.newCall(it) + } + .addConverterFactory(ToStringConverterFactory()) + .build() + val example = retrofit.create(Service::class.java) + + server.enqueue(MockResponse().setBody("Hi")) + + runBlocking { + outerContextThread = Thread.currentThread() + example.body() + } + + assertThat(callFactoryThread).isNotNull + assertThat(outerContextThread).isNotEqualTo(callFactoryThread) + } + + @Suppress("EXPERIMENTAL_OVERRIDE") private object DirectUnconfinedDispatcher : CoroutineDispatcher() { override fun isDispatchNeeded(context: CoroutineContext): Boolean = false diff --git a/retrofit/src/main/java/retrofit2/KotlinExtensions.kt b/retrofit/src/main/java/retrofit2/KotlinExtensions.kt index d334b25136..ed70a8264c 100644 --- a/retrofit/src/main/java/retrofit2/KotlinExtensions.kt +++ b/retrofit/src/main/java/retrofit2/KotlinExtensions.kt @@ -20,7 +20,7 @@ package retrofit2 import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.suspendCancellableCoroutine -import java.lang.reflect.ParameterizedType +import kotlinx.coroutines.withContext import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED import kotlin.coroutines.intrinsics.intercepted import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn @@ -30,57 +30,63 @@ import kotlin.coroutines.resumeWithException inline fun Retrofit.create(): T = create(T::class.java) suspend fun Call.await(): T { - return suspendCancellableCoroutine { continuation -> - continuation.invokeOnCancellation { - cancel() - } - enqueue(object : Callback { - override fun onResponse(call: Call, response: Response) { - if (response.isSuccessful) { - val body = response.body() - if (body == null) { - val invocation = call.request().tag(Invocation::class.java)!! - val method = invocation.method() - val e = KotlinNullPointerException("Response from " + + // TODO: a better solution for off-main-thread call factories than this. + return withContext(Dispatchers.Default) { + suspendCancellableCoroutine { continuation -> + continuation.invokeOnCancellation { + cancel() + } + enqueue(object : Callback { + override fun onResponse(call: Call, response: Response) { + if (response.isSuccessful) { + val body = response.body() + if (body == null) { + val invocation = call.request().tag(Invocation::class.java)!! + val method = invocation.method() + val e = KotlinNullPointerException("Response from " + method.declaringClass.name + '.' + method.name + " was null but response body type was declared as non-null") - continuation.resumeWithException(e) + continuation.resumeWithException(e) + } else { + continuation.resume(body) + } } else { - continuation.resume(body) + continuation.resumeWithException(HttpException(response)) } - } else { - continuation.resumeWithException(HttpException(response)) } - } - override fun onFailure(call: Call, t: Throwable) { - continuation.resumeWithException(t) - } - }) + override fun onFailure(call: Call, t: Throwable) { + continuation.resumeWithException(t) + } + }) + } } } @JvmName("awaitNullable") suspend fun Call.await(): T? { - return suspendCancellableCoroutine { continuation -> - continuation.invokeOnCancellation { - cancel() - } - enqueue(object : Callback { - override fun onResponse(call: Call, response: Response) { - if (response.isSuccessful) { - continuation.resume(response.body()) - } else { - continuation.resumeWithException(HttpException(response)) - } + // TODO: a better solution for off-main-thread call factories than this. + return withContext(Dispatchers.Default) { + suspendCancellableCoroutine { continuation -> + continuation.invokeOnCancellation { + cancel() } + enqueue(object : Callback { + override fun onResponse(call: Call, response: Response) { + if (response.isSuccessful) { + continuation.resume(response.body()) + } else { + continuation.resumeWithException(HttpException(response)) + } + } - override fun onFailure(call: Call, t: Throwable) { - continuation.resumeWithException(t) - } - }) + override fun onFailure(call: Call, t: Throwable) { + continuation.resumeWithException(t) + } + }) + } } } @@ -91,19 +97,22 @@ suspend fun Call.await() { } suspend fun Call.awaitResponse(): Response { - return suspendCancellableCoroutine { continuation -> - continuation.invokeOnCancellation { - cancel() - } - enqueue(object : Callback { - override fun onResponse(call: Call, response: Response) { - continuation.resume(response) + // TODO: a better solution for off-main-thread call factories than this. + return withContext(Dispatchers.Default) { + suspendCancellableCoroutine { continuation -> + continuation.invokeOnCancellation { + cancel() } + enqueue(object : Callback { + override fun onResponse(call: Call, response: Response) { + continuation.resume(response) + } - override fun onFailure(call: Call, t: Throwable) { - continuation.resumeWithException(t) - } - }) + override fun onFailure(call: Call, t: Throwable) { + continuation.resumeWithException(t) + } + }) + } } }