Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Storage.CrossThreadLocal #961

Merged
merged 2 commits into from
Mar 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package kamon.bench

import java.util.concurrent.TimeUnit
import kamon.context.Storage.CrossThreadLocal

import kamon.context.Storage.Scope
import java.util.concurrent.TimeUnit
import kamon.context.{Context, Storage}
import org.openjdk.jmh.annotations._

Expand All @@ -28,19 +28,19 @@ class ThreadLocalStorageBenchmark {
val TestKey: Context.Key[Int] = Context.key("test-key", 0)
val ContextWithKey: Context = Context.of(TestKey, 43)

val TLS: Storage = new OldThreadLocal
val CrossTLS: Storage = new CrossThreadLocal
val FTLS: Storage = new Storage.ThreadLocal


@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Fork
def currentThreadLocal: Context = {
val scope = TLS.store(ContextWithKey)
TLS.current()
def crossThreadLocal: Context = {
val scope = CrossTLS.store(ContextWithKey)
CrossTLS.current()
scope.close()
TLS.current()
CrossTLS.current()
}

@Benchmark
Expand All @@ -54,28 +54,3 @@ class ThreadLocalStorageBenchmark {
FTLS.current()
}
}


class OldThreadLocal extends Storage {
private val tls = new java.lang.ThreadLocal[Context]() {
override def initialValue(): Context = Context.Empty
}

override def current(): Context =
tls.get()

override def store(context: Context): Scope = {
val newContext = context
val previousContext = tls.get()
tls.set(newContext)

new Scope {
override def context: Context = newContext
override def close(): Unit = tls.set(previousContext)
}
}
}

object OldThreadLocal {
def apply(): OldThreadLocal = new OldThreadLocal()
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,17 @@
package kamon.context


import org.scalatest.{Matchers, WordSpec}
import kamon.context.Storage.Scope
import org.scalatest.{WordSpec, BeforeAndAfterAll, AsyncWordSpec, Matchers, Assertion}

class ThreadLocalStorageSpec extends WordSpec with Matchers {
import java.util.concurrent.Executors
import scala.concurrent.{Promise, ExecutionContext, Future}
import scala.util.Try
import org.scalatest.concurrent.ScalaFutures._

class ThreadLocalStorageSpec extends WordSpec with Matchers with BeforeAndAfterAll {

private val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

"the Storage.ThreadLocal implementation of Context storage" should {
"return a empty context when no context has been set" in {
Expand Down Expand Up @@ -47,9 +55,64 @@ class ThreadLocalStorageSpec extends WordSpec with Matchers {

}

val TLS: Storage = new Storage.ThreadLocal
"the Storage.CrossThreadLocal implementation of Context storage" should {
"return a empty context when no context has been set" in {
CrossTLS.current() shouldBe Context.Empty
}

"return the empty value for keys that have not been set in the context" in {
CrossTLS.current().get(TestKey) shouldBe 42
CrossTLS.current().get(AnotherKey) shouldBe 99
CrossTLS.current().get(BroadcastKey) shouldBe "i travel around"

ScopeWithKey.get(TestKey) shouldBe 43
ScopeWithKey.get(AnotherKey) shouldBe 99
ScopeWithKey.get(BroadcastKey) shouldBe "i travel around"
}

"allow setting a context as current and remove it when closing the Scope" in {
CrossTLS.current() shouldBe Context.Empty

val scope = CrossTLS.store(ScopeWithKey)
CrossTLS.current() shouldBe theSameInstanceAs(ScopeWithKey)
scope.close()

CrossTLS.current() shouldBe Context.Empty
}

"Allow closing the scope in a different thread than the original" in {
var scope: Scope = null

val f1 = Future {
// previous context
CrossTLS.store(ContextWithAnotherKey)
scope = CrossTLS.store(ScopeWithKey)
Thread.sleep(10)
CrossTLS.current() shouldBe theSameInstanceAs(ScopeWithKey)
}(ec)

val f2 = Future {
while (scope == null) {} // wait for scope to be created in the other thread
CrossTLS.current() shouldBe Context.Empty
scope.close()
CrossTLS.current() shouldBe theSameInstanceAs(ContextWithAnotherKey)
}(ec)

f1.flatMap(_ => f2)(ec).futureValue
}

}

override protected def afterAll(): Unit = {
ec.shutdown()
super.afterAll()
}

val TLS: Storage = Storage.ThreadLocal()
val CrossTLS: Storage = Storage.CrossThreadLocal()
val TestKey = Context.key("test-key", 42)
val AnotherKey = Context.key("another-key", 99)
val BroadcastKey = Context.key("broadcast", "i travel around")
val ScopeWithKey = Context.of(TestKey, 43)
val ContextWithAnotherKey = Context.of(AnotherKey, 98)
}
15 changes: 12 additions & 3 deletions core/kamon-core/src/main/scala/kamon/ContextStorage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,18 @@ object ContextStorage {
* instrumentation follows them around.
*/
private val _contextStorage: Storage = {
if(sys.props("kamon.context.debug") == "true")
val storageTypeStr = Option(sys.props("kamon.context.storageType"))

if (sys.props("kamon.context.debug") == "true")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that the choice is between a couple of actual implementations, this should probably be loaded from the configuration file, not the environment

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Folks, I'm thinking that we should make CrossThreadLocal the default context storage implementation. The reasons why I think that are:

  • It will not affect the behavior of any of the manual/automatic instrumentation. We always aimed to close scopes on the same thread where we created them, and in those situations this implementation will produce the same results. In 99% of the cases we need to close the scope in the same thread we created it anyways, and Cats/Monix implementations are an exception to that common case so we will continue to recommended closing scopes on the same thread, unless it is very clear that there is a reason for doing otherwise, as it is for Cats/Monix.
  • It makes it easier to get started. If there is something I learned over the years is that users almost never read the fine print 😄.. if users need to add a JVM property but that property is controlled from the outside world (IDE, command line parameters or container orchestrators), there will be several cases of things "not working" in certain environments. I would rather go with an option that makes it work by default for everyone.
  • In the specific cases where someone needs that little boost in performance (I'm counting to be very few cases here) they can add the extra JVM options to use the optimized TLS.

What do you think about that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This all sounds reasonable, and simplifying semantics is always good!
I'm just scared because this is a "breaking" change, and I have no intuition for how often this code is called.
If you're sure that we won't see a big performance drop, let's do it, and just make sure to document clearly and make not of it in the release notes.

Copy link
Contributor

@dpsoft dpsoft Mar 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree 100% with the default context storage should be CrossThreadLocal but would leave the optimized one as an option for experimented/power users.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I totally agree @ivantopo (Makes my life easier that's for sure ;)). Happy to make the change if @SimunKaracic agree too

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree 🤝

Copy link
Contributor Author

@jatcwang jatcwang Mar 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done.
@SimunKaracic I did not make it a config because in the kamon.context.Storage.Debug docstring it says we don't allow this be discovered from configuration because it can cause initialization issues when Kamon is first initialized via instrumentation trying to access the current Context which I think still applies. Added kamon.context.storageType system property though

Storage.Debug()
else
Storage.ThreadLocal()
else {
storageTypeStr match {
case None => Storage.CrossThreadLocal()
case Some("debug") => Storage.Debug()
case Some("sameThreadScope") => Storage.ThreadLocal()
case Some("default") => Storage.CrossThreadLocal()
case Some(other) => throw new IllegalArgumentException(s"Unrecognized kamon.context.storageType value: $other")
}
}
}
}
39 changes: 36 additions & 3 deletions core/kamon-core/src/main/scala/kamon/context/Storage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,48 @@ object Storage {
def close(): Unit
}

/**
* A ThreadLocal context storage that allows the scope to be closed in a different
* thread than the thread where store(..) was called.
* This is roughly 25% slower than [[kamon.context.Storage.ThreadLocal]] but is required for certain
* library integrations such as cats-effect IO or Monix.
* Turn this on by setting the System Property "kamon.context.crossThread" to "true".
*/
class CrossThreadLocal extends Storage {
private val tls = new java.lang.ThreadLocal[Context]() {
override def initialValue(): Context = Context.Empty
}

override def current(): Context =
tls.get()

override def store(newContext: Context): Scope = {
val previousContext = tls.get()
tls.set(newContext)

new Scope {
override def context: Context = newContext
override def close(): Unit = tls.set(previousContext)
}
}
}

object CrossThreadLocal {
def apply(): Storage.CrossThreadLocal =
new Storage.CrossThreadLocal()
}

/**
* Wrapper that implements an optimized ThreadLocal access pattern ideal for heavily used ThreadLocals. It is faster
* to use a mutable holder object and always perform ThreadLocal.get() and never use ThreadLocal.set(), because the
* value is more likely to be found in the ThreadLocalMap direct hash slot and avoid the slow path of
* ThreadLocalMap.getEntryAfterMiss().
* WARNING: Closing of the returned Scope **MUST** be called in the same thread as store(..) was originally called.
*
* Credit to @trask from the FastThreadLocal in glowroot. One small change is that we don't use an kamon-defined
* holder object as that would prevent class unloading.
*/
// Named ThreadLocal for binary compatibility reasons, despite the fact that this isn't the default storage type
class ThreadLocal extends Storage {
private val tls = new java.lang.ThreadLocal[Array[AnyRef]]() {
override def initialValue(): Array[AnyRef] =
Expand Down Expand Up @@ -103,7 +136,6 @@ object Storage {
* This implementation is considerably less efficient than the default implementation since it is taking at least two
* different stack traces for every store/close operation pair. Do not use this for any reason other than debugging
* Context propagation issues (like, dirty Threads) in a controlled environment.
*
*/
class Debug extends Storage {
import Debug._
Expand Down Expand Up @@ -133,8 +165,9 @@ object Storage {
newContext

override def close(): Unit = {
ref.set(0, previousContext)
ref.set(2, stackTraceString())
val thisRef = _tls.get()
thisRef.set(0, previousContext)
thisRef.set(2, stackTraceString())
}
}
}
Expand Down