From 8ced849b81238013bf7e0335e7f43506bb803dda Mon Sep 17 00:00:00 2001 From: Kate Anderson Date: Fri, 9 Feb 2024 13:43:28 -0800 Subject: [PATCH 1/4] Remove refCount updates and remove token on transaction end --- .../src/main/scala/com/newrelic/zio/api/Util.scala | 3 ++- .../zio/src/main/java/zio/internal/Utils.java | 12 ++++-------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/instrumentation/newrelic-scala-zio-api/src/main/scala/com/newrelic/zio/api/Util.scala b/instrumentation/newrelic-scala-zio-api/src/main/scala/com/newrelic/zio/api/Util.scala index 4401191b7a..ff9c92b6a1 100644 --- a/instrumentation/newrelic-scala-zio-api/src/main/scala/com/newrelic/zio/api/Util.scala +++ b/instrumentation/newrelic-scala-zio-api/src/main/scala/com/newrelic/zio/api/Util.scala @@ -12,13 +12,14 @@ object Util { } else { body.bimap( error => { + AgentBridge.activeToken.remove() tracer.finish(new Throwable("ZIO txn body fail")) error }, success => { + AgentBridge.activeToken.remove() tracer.finish(172, null) success }) }) - } diff --git a/instrumentation/zio/src/main/java/zio/internal/Utils.java b/instrumentation/zio/src/main/java/zio/internal/Utils.java index 98892798d7..c89e733c01 100644 --- a/instrumentation/zio/src/main/java/zio/internal/Utils.java +++ b/instrumentation/zio/src/main/java/zio/internal/Utils.java @@ -11,14 +11,10 @@ public class Utils { public static AgentBridge.TokenAndRefCount getThreadTokenAndRefCount() { AgentBridge.TokenAndRefCount tokenAndRefCount = AgentBridge.activeToken.get(); - if (tokenAndRefCount == null) { - Transaction tx = AgentBridge.getAgent().getTransaction(false); - if (tx != null) { - tokenAndRefCount = new AgentBridge.TokenAndRefCount(tx.getToken(), - AgentBridge.getAgent().getTracedMethod(), new AtomicInteger(1)); - } - } else { - tokenAndRefCount.refCount.incrementAndGet(); + Transaction tx = AgentBridge.getAgent().getTransaction(false); + if (tx != null) { + tokenAndRefCount = new AgentBridge.TokenAndRefCount(tx.getToken(), + AgentBridge.getAgent().getTracedMethod(), new AtomicInteger(1)); } return tokenAndRefCount; } From 8665569e7f16f8d20e5c8819a6d033580e11fdec Mon Sep 17 00:00:00 2001 From: Kate Anderson Date: Fri, 9 Feb 2024 16:05:21 -0800 Subject: [PATCH 2/4] Add a unit test for leaky zio txns --- .../newrelic/zio/api/TraceOpsDSLTest.scala | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/newrelic-scala-zio-api/src/test/scala/com/newrelic/zio/api/TraceOpsDSLTest.scala b/newrelic-scala-zio-api/src/test/scala/com/newrelic/zio/api/TraceOpsDSLTest.scala index 2f77916e2f..8e9c61fcb7 100644 --- a/newrelic-scala-zio-api/src/test/scala/com/newrelic/zio/api/TraceOpsDSLTest.scala +++ b/newrelic-scala-zio-api/src/test/scala/com/newrelic/zio/api/TraceOpsDSLTest.scala @@ -6,6 +6,7 @@ import com.newrelic.zio.api.TraceOps._ import org.junit.runner.RunWith import org.junit.{After, Assert, Ignore, Test} import zio.Exit.{Failure, Success} +import zio.clock.Clock import java.util.concurrent.Executors import scala.concurrent.ExecutionContext @@ -199,6 +200,47 @@ class ZIOTraceOpsTests { Assert.assertTrue(s"sum segments exists", segments.exists(_.getName == s"Custom/sum segments")) } + /* + Added in response to a bug discovered with ZIO instrumentation where back-to-back + transactions with thread hops leaked into each other (resulting in 1 transaction instead of several). + This was found to primarily affect transactions that started and ended on different threads (eg, + because of a .join operation). + */ + @Test + def multipleTransactionsWithThreadHopsDoNotBleed(): Unit = { + val delayMillis = 500 + val introspector: Introspector = InstrumentationTestRunner.getIntrospector + + //When + def txnBlock(i: Int): ZIO[Clock, Nothing, Int] = txn{ + for { + one <- asyncTrace(s"loop $i: one")(UIO(1)) + _ <- asyncTrace(s"loop $i: sleep")(ZIO.sleep(delayMillis.millis)) //thread hop + forkedFiber <- asyncTrace(s"loop $i: forked fiber")(UIO(5).fork) //thread hop + five <- forkedFiber.join //thread hop + eight <- asyncTrace(s"loop $i: eight")(UIO(one + five + 2)) + } yield eight + i + } + val result = Runtime.default.unsafeRunSync( + ZIO.loop(0)(i => i < 5, i => i + 1)(i => txnBlock(i)) + ) + + val txnCount = introspector.getFinishedTransactionCount() + val traces = getTraces(introspector) + val segments = getSegments(traces) + + Assert.assertEquals("Result correct", Success(List(8, 9, 10, 11, 12)), result) + Assert.assertEquals("Correct number of transactions", 5, txnCount) + Assert.assertEquals("Correct number of traces", 5, traces.size) + List(0, 1, 2, 3, 4).foreach(i => { + Assert.assertTrue("one segment exists", segments.exists(_.getName == s"Custom/loop $i: one")) + Assert.assertTrue("sleep segment exists", segments.exists(_.getName == s"Custom/loop $i: sleep")) + Assert.assertTrue("forked segment exists", segments.exists(_.getName == s"Custom/loop $i: forked fiber")) + Assert.assertTrue("eight segment exists", segments.exists(_.getName == s"Custom/loop $i: eight")) + }) + + } + @Trace(async = true) private def getThree = 3 From 6281c1d9bf629d7bf61a89667ac0104de303833a Mon Sep 17 00:00:00 2001 From: Kate Anderson Date: Tue, 13 Feb 2024 11:24:20 -0800 Subject: [PATCH 3/4] Modify test ordering and setup/teardown in TraceOpsDSLTest --- .../newrelic/zio/api/TraceOpsDSLTest.scala | 52 +++++++++---------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/newrelic-scala-zio-api/src/test/scala/com/newrelic/zio/api/TraceOpsDSLTest.scala b/newrelic-scala-zio-api/src/test/scala/com/newrelic/zio/api/TraceOpsDSLTest.scala index 8e9c61fcb7..3024b419bf 100644 --- a/newrelic-scala-zio-api/src/test/scala/com/newrelic/zio/api/TraceOpsDSLTest.scala +++ b/newrelic-scala-zio-api/src/test/scala/com/newrelic/zio/api/TraceOpsDSLTest.scala @@ -4,8 +4,9 @@ import com.newrelic.agent.introspec._ import com.newrelic.api.agent.Trace import com.newrelic.zio.api.TraceOps._ import org.junit.runner.RunWith -import org.junit.{After, Assert, Ignore, Test} -import zio.Exit.{Failure, Success} +import org.junit.runners.MethodSorters +import org.junit.{After, Assert, Before, FixMethodOrder, Test} +import zio.Exit.Success import zio.clock.Clock import java.util.concurrent.Executors @@ -16,8 +17,11 @@ import zio.duration.durationInt @RunWith(classOf[InstrumentationTestRunner]) @InstrumentationTestConfig(includePrefixes = Array("none")) +@FixMethodOrder(MethodSorters.NAME_ASCENDING) class ZIOTraceOpsTests { + val introspector: Introspector = InstrumentationTestRunner.getIntrospector + def executorService(nThreads: Int) = { ExecutionContext.fromExecutor(Executors.newFixedThreadPool(nThreads)) } @@ -26,16 +30,19 @@ class ZIOTraceOpsTests { val threadPoolTwo: ExecutionContext = executorService(3) val threadPoolThree: ExecutionContext = executorService(3) + @Before + def setup() = { + com.newrelic.agent.Transaction.clearTransaction + introspector.clear() + } @After def resetTxn() = { com.newrelic.agent.Transaction.clearTransaction + introspector.clear() } @Test def asyncTraceProducesOneSegment(): Unit = { - //Given - val introspector: Introspector = InstrumentationTestRunner.getIntrospector - //When val txnBlock: UIO[Int] = txn { asyncTrace("getNumber")(UIO(1)) @@ -56,9 +63,6 @@ class ZIOTraceOpsTests { @Test def chainedSyncAndAsyncTraceSegmentsCaptured(): Unit = { - //Given - val introspector: Introspector = InstrumentationTestRunner.getIntrospector - //When val txnBlock: UIO[Int] = txn( asyncTrace("getNumber")(UIO(1)) @@ -81,8 +85,6 @@ class ZIOTraceOpsTests { @Test def asyncForComprehensionSegments: Unit = { - //Given - val introspector: Introspector = InstrumentationTestRunner.getIntrospector //When val txnBlock: UIO[Int] = txn { @@ -114,8 +116,6 @@ class ZIOTraceOpsTests { @Test def sequentialAsyncTraceSegmentTimeCaptured(): Unit = { val delayMillis = 1500 - //Given - val introspector: Introspector = InstrumentationTestRunner.getIntrospector //When val txnBlock = txn( @@ -143,8 +143,6 @@ class ZIOTraceOpsTests { @Test def segmentCompletedIfIOErrors(): Unit = { - //Given - val introspector: Introspector = InstrumentationTestRunner.getIntrospector //When val txnBlock: Task[Int] = txn( @@ -174,8 +172,6 @@ class ZIOTraceOpsTests { @Test def parallelTraverse(): Unit = { - //Given - val introspector: Introspector = InstrumentationTestRunner.getIntrospector //When val txnBlock: UIO[Int] = txn( @@ -205,21 +201,24 @@ class ZIOTraceOpsTests { transactions with thread hops leaked into each other (resulting in 1 transaction instead of several). This was found to primarily affect transactions that started and ended on different threads (eg, because of a .join operation). + + This test is named to be evaluated last alphabetically due to side effects from the ZIO Runtime environment. */ @Test - def multipleTransactionsWithThreadHopsDoNotBleed(): Unit = { + def z_multipleTransactionsWithThreadHopsDoNotBleed(): Unit = { val delayMillis = 500 - val introspector: Introspector = InstrumentationTestRunner.getIntrospector //When - def txnBlock(i: Int): ZIO[Clock, Nothing, Int] = txn{ - for { - one <- asyncTrace(s"loop $i: one")(UIO(1)) - _ <- asyncTrace(s"loop $i: sleep")(ZIO.sleep(delayMillis.millis)) //thread hop - forkedFiber <- asyncTrace(s"loop $i: forked fiber")(UIO(5).fork) //thread hop - five <- forkedFiber.join //thread hop - eight <- asyncTrace(s"loop $i: eight")(UIO(one + five + 2)) - } yield eight + i + def txnBlock(i: Int): ZIO[Clock, Nothing, Int] = { + txn( + for { + one <- asyncTrace(s"loop $i: one")(UIO(1)) + _ <- asyncTrace(s"loop $i: sleep")(ZIO.sleep(delayMillis.millis))//thread hop + forkedFiber <- asyncTrace(s"loop $i: forked fiber")(UIO(5).fork) //thread hop + five <- forkedFiber.join //thread hop + eight <- asyncTrace(s"loop $i: eight")(UIO(one + five + 2)) + } yield eight + i + ) } val result = Runtime.default.unsafeRunSync( ZIO.loop(0)(i => i < 5, i => i + 1)(i => txnBlock(i)) @@ -241,7 +240,6 @@ class ZIOTraceOpsTests { } - @Trace(async = true) private def getThree = 3 From f84a8cc99133d25cb995b2f19d518bfd74feec4c Mon Sep 17 00:00:00 2001 From: Kate Anderson Date: Thu, 15 Feb 2024 10:12:52 -0800 Subject: [PATCH 4/4] Do not decrement ref count --- instrumentation/zio/src/main/java/zio/internal/Utils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/zio/src/main/java/zio/internal/Utils.java b/instrumentation/zio/src/main/java/zio/internal/Utils.java index c89e733c01..6766095d34 100644 --- a/instrumentation/zio/src/main/java/zio/internal/Utils.java +++ b/instrumentation/zio/src/main/java/zio/internal/Utils.java @@ -28,7 +28,7 @@ public static void setThreadTokenAndRefCount(AgentBridge.TokenAndRefCount tokenA public static void clearThreadTokenAndRefCountAndTxn(AgentBridge.TokenAndRefCount tokenAndRefCount) { AgentBridge.activeToken.remove(); - if (tokenAndRefCount != null && tokenAndRefCount.refCount.decrementAndGet() == 0) { + if (tokenAndRefCount != null) { //removed a call to decrement the ref count as it is no longer being used tokenAndRefCount.token.expire(); tokenAndRefCount.token = null; }