Skip to content

Commit

Permalink
Properly cleanup thread locals for non-CoroutineDispatcher-intercepte… (
Browse files Browse the repository at this point in the history
#4303)

* Properly cleanup thread locals for non-CoroutineDispatcher-intercepted continuations

There was one codepath not covered by undispatched thread local cleanup procedure: when a custom ContinuationInterceptor is used and the scoped coroutine (i.e. withContext) is completed in-place without suspensions.

Fixed with the introduction of the corresponding machinery for ScopeCoroutine

Fixes #4296
  • Loading branch information
qwwdfsad authored Dec 19, 2024
1 parent 2cafea4 commit f8c0304
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 6 deletions.
10 changes: 10 additions & 0 deletions kotlinx-coroutines-core/common/src/AbstractCoroutine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package kotlinx.coroutines
import kotlinx.coroutines.CoroutineStart.*
import kotlinx.coroutines.intrinsics.*
import kotlin.coroutines.*
import kotlinx.coroutines.internal.ScopeCoroutine

/**
* Abstract base class for implementation of coroutines in coroutine builders.
Expand Down Expand Up @@ -100,6 +101,15 @@ public abstract class AbstractCoroutine<in T>(
afterResume(state)
}

/**
* Invoked when the corresponding `AbstractCoroutine` was **conceptually** resumed, but not mechanically.
* Currently, this function only invokes `resume` on the underlying continuation for [ScopeCoroutine]
* or does nothing otherwise.
*
* Examples of resumes:
* - `afterCompletion` calls when the corresponding `Job` changed its state (i.e. got cancelled)
* - [AbstractCoroutine.resumeWith] was invoked
*/
protected open fun afterResume(state: Any?): Unit = afterCompletion(state)

internal final override fun handleOnCompletionException(exception: Throwable) {
Expand Down
7 changes: 7 additions & 0 deletions kotlinx-coroutines-core/common/src/internal/Scopes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ internal open class ScopeCoroutine<in T>(
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}

/**
* Invoked when a scoped coorutine was completed in an undispatched manner directly
* at the place of its start because it never suspended.
*/
open fun afterCompletionUndispatched() {
}

override fun afterResume(state: Any?) {
// Resume direct because scope is already in the correct context
uCont.resumeWith(recoverResult(state, uCont))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ private inline fun <T> ScopeCoroutine<T>.undispatchedResult(
if (result === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED // (1)
val state = makeCompletingOnce(result)
if (state === COMPLETING_WAITING_CHILDREN) return COROUTINE_SUSPENDED // (2)
afterCompletionUndispatched()
return if (state is CompletedExceptionally) { // (3)
when {
shouldThrow(state.cause) -> throw recoverStackTrace(state.cause, uCont)
Expand Down
21 changes: 15 additions & 6 deletions kotlinx-coroutines-core/jvm/src/CoroutineContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ internal actual class UndispatchedCoroutine<in T>actual constructor (
* `withContext` for the sake of logging, MDC, tracing etc., meaning that there exists thousands of
* undispatched coroutines.
* Each access to Java's [ThreadLocal] leaves a footprint in the corresponding Thread's `ThreadLocalMap`
* that is cleared automatically as soon as the associated thread-local (-> UndispatchedCoroutine) is garbage collected.
* that is cleared automatically as soon as the associated thread-local (-> UndispatchedCoroutine) is garbage collected
* when either the corresponding thread is GC'ed or it cleans up its stale entries on other TL accesses.
* When such coroutines are promoted to old generation, `ThreadLocalMap`s become bloated and an arbitrary accesses to thread locals
* start to consume significant amount of CPU because these maps are open-addressed and cleaned up incrementally on each access.
* (You can read more about this effect as "GC nepotism").
Expand Down Expand Up @@ -253,18 +254,26 @@ internal actual class UndispatchedCoroutine<in T>actual constructor (
}
}

override fun afterCompletionUndispatched() {
clearThreadLocal()
}

override fun afterResume(state: Any?) {
clearThreadLocal()
// resume undispatched -- update context but stay on the same dispatcher
val result = recoverResult(state, uCont)
withContinuationContext(uCont, null) {
uCont.resumeWith(result)
}
}

private fun clearThreadLocal() {
if (threadLocalIsSet) {
threadStateToRecover.get()?.let { (ctx, value) ->
restoreThreadContext(ctx, value)
}
threadStateToRecover.remove()
}
// resume undispatched -- update context but stay on the same dispatcher
val result = recoverResult(state, uCont)
withContinuationContext(uCont, null) {
uCont.resumeWith(result)
}
}
}

Expand Down
100 changes: 100 additions & 0 deletions kotlinx-coroutines-core/jvm/test/ThreadLocalsLeaksTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package kotlinx.coroutines

import kotlinx.coroutines.testing.TestBase
import java.lang.ref.WeakReference
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.test.Test

/*
* This is an adapted verion of test from #4296.
*
* qwwdfsad: the test relies on System.gc() actually collecting the garbage.
* If these tests flake on CI, first check that JDK/GC setup in not an issue.
*/
class ThreadLocalCustomContinuationInterceptorTest : TestBase() {

private class CustomContinuationInterceptor(private val delegate: ContinuationInterceptor) :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
return delegate.interceptContinuation(continuation)
}
}

private class CustomNeverEqualContinuationInterceptor(private val delegate: ContinuationInterceptor) :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
return delegate.interceptContinuation(continuation)
}

override fun equals(other: Any?) = false
}

@Test(timeout = 20_000L)
fun testDefaultDispatcherNoSuspension() = ensureCoroutineContextGCed(Dispatchers.Default, suspend = false)

@Test(timeout = 20_000L)
fun testDefaultDispatcher() = ensureCoroutineContextGCed(Dispatchers.Default, suspend = true)


@Test(timeout = 20_000L)
fun testNonCoroutineDispatcher() = ensureCoroutineContextGCed(
CustomContinuationInterceptor(Dispatchers.Default),
suspend = true
)

@Test(timeout = 20_000L)
fun testNonCoroutineDispatcherSuspension() = ensureCoroutineContextGCed(
CustomContinuationInterceptor(Dispatchers.Default),
suspend = false
)

// Note asymmetric equals codepath never goes through the undispatched withContext, thus the separate test case

@Test(timeout = 20_000L)
fun testNonCoroutineDispatcherAsymmetricEquals() =
ensureCoroutineContextGCed(
CustomNeverEqualContinuationInterceptor(Dispatchers.Default),
suspend = true
)

@Test(timeout = 20_000L)
fun testNonCoroutineDispatcherAsymmetricEqualsSuspension() =
ensureCoroutineContextGCed(
CustomNeverEqualContinuationInterceptor(Dispatchers.Default),
suspend = false
)


@Volatile
private var letThatSinkIn: Any = "What is my purpose? To frag the garbage collctor"

private fun ensureCoroutineContextGCed(coroutineContext: CoroutineContext, suspend: Boolean) {
fun forceGcUntilRefIsCleaned(ref: WeakReference<CoroutineName>) {
while (ref.get() != null) {
System.gc()
letThatSinkIn = LongArray(1024 * 1024)
}
}

runTest {
lateinit var ref: WeakReference<CoroutineName>
val job = GlobalScope.launch(coroutineContext) {
val coroutineName = CoroutineName("Yo")
ref = WeakReference(coroutineName)
withContext(coroutineName) {
if (suspend) {
delay(1)
}
}
}
job.join()

forceGcUntilRefIsCleaned(ref)
}
}
}

0 comments on commit f8c0304

Please sign in to comment.