Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zio 1 update #1739

Merged
merged 4 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ object Util {
} else {
body.bimap(
error => {
AgentBridge.activeToken.remove()
tracer.finish(new Throwable("ZIO txn body fail"))
error
},
success => {
AgentBridge.activeToken.remove()
tracer.finish(172, null)
success
})
})

}
14 changes: 5 additions & 9 deletions instrumentation/zio/src/main/java/zio/internal/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,10 @@ public class Utils {

public static AgentBridge.TokenAndRefCount getThreadTokenAndRefCount() {
AgentBridge.TokenAndRefCount tokenAndRefCount = AgentBridge.activeToken.get();
if (tokenAndRefCount == null) {
Transaction tx = AgentBridge.getAgent().getTransaction(false);
if (tx != null) {
tokenAndRefCount = new AgentBridge.TokenAndRefCount(tx.getToken(),
AgentBridge.getAgent().getTracedMethod(), new AtomicInteger(1));
}
} else {
tokenAndRefCount.refCount.incrementAndGet();
Transaction tx = AgentBridge.getAgent().getTransaction(false);
if (tx != null) {
tokenAndRefCount = new AgentBridge.TokenAndRefCount(tx.getToken(),
AgentBridge.getAgent().getTracedMethod(), new AtomicInteger(1));
}
return tokenAndRefCount;
}
Expand All @@ -32,7 +28,7 @@ public static void setThreadTokenAndRefCount(AgentBridge.TokenAndRefCount tokenA

public static void clearThreadTokenAndRefCountAndTxn(AgentBridge.TokenAndRefCount tokenAndRefCount) {
AgentBridge.activeToken.remove();
if (tokenAndRefCount != null && tokenAndRefCount.refCount.decrementAndGet() == 0) {
if (tokenAndRefCount != null) { //removed a call to decrement the ref count as it is no longer being used
tokenAndRefCount.token.expire();
tokenAndRefCount.token = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import com.newrelic.agent.introspec._
import com.newrelic.api.agent.Trace
import com.newrelic.zio.api.TraceOps._
import org.junit.runner.RunWith
import org.junit.{After, Assert, Ignore, Test}
import zio.Exit.{Failure, Success}
import org.junit.runners.MethodSorters
import org.junit.{After, Assert, Before, FixMethodOrder, Test}
import zio.Exit.Success
import zio.clock.Clock

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
Expand All @@ -15,8 +17,11 @@ import zio.duration.durationInt

@RunWith(classOf[InstrumentationTestRunner])
@InstrumentationTestConfig(includePrefixes = Array("none"))
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
class ZIOTraceOpsTests {

val introspector: Introspector = InstrumentationTestRunner.getIntrospector

def executorService(nThreads: Int) = {
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(nThreads))
}
Expand All @@ -25,16 +30,19 @@ class ZIOTraceOpsTests {
val threadPoolTwo: ExecutionContext = executorService(3)
val threadPoolThree: ExecutionContext = executorService(3)

@Before
def setup() = {
com.newrelic.agent.Transaction.clearTransaction
introspector.clear()
}
@After
def resetTxn() = {
com.newrelic.agent.Transaction.clearTransaction
introspector.clear()
}

@Test
def asyncTraceProducesOneSegment(): Unit = {
//Given
val introspector: Introspector = InstrumentationTestRunner.getIntrospector

//When
val txnBlock: UIO[Int] = txn {
asyncTrace("getNumber")(UIO(1))
Expand All @@ -55,9 +63,6 @@ class ZIOTraceOpsTests {

@Test
def chainedSyncAndAsyncTraceSegmentsCaptured(): Unit = {
//Given
val introspector: Introspector = InstrumentationTestRunner.getIntrospector

//When
val txnBlock: UIO[Int] = txn(
asyncTrace("getNumber")(UIO(1))
Expand All @@ -80,8 +85,6 @@ class ZIOTraceOpsTests {

@Test
def asyncForComprehensionSegments: Unit = {
//Given
val introspector: Introspector = InstrumentationTestRunner.getIntrospector

//When
val txnBlock: UIO[Int] = txn {
Expand Down Expand Up @@ -113,8 +116,6 @@ class ZIOTraceOpsTests {
@Test
def sequentialAsyncTraceSegmentTimeCaptured(): Unit = {
val delayMillis = 1500
//Given
val introspector: Introspector = InstrumentationTestRunner.getIntrospector

//When
val txnBlock = txn(
Expand Down Expand Up @@ -142,8 +143,6 @@ class ZIOTraceOpsTests {

@Test
def segmentCompletedIfIOErrors(): Unit = {
//Given
val introspector: Introspector = InstrumentationTestRunner.getIntrospector

//When
val txnBlock: Task[Int] = txn(
Expand Down Expand Up @@ -173,8 +172,6 @@ class ZIOTraceOpsTests {

@Test
def parallelTraverse(): Unit = {
//Given
val introspector: Introspector = InstrumentationTestRunner.getIntrospector

//When
val txnBlock: UIO[Int] = txn(
Expand All @@ -199,6 +196,49 @@ class ZIOTraceOpsTests {
Assert.assertTrue(s"sum segments exists", segments.exists(_.getName == s"Custom/sum segments"))
}

/*
Added in response to a bug discovered with ZIO instrumentation where back-to-back
transactions with thread hops leaked into each other (resulting in 1 transaction instead of several).
This was found to primarily affect transactions that started and ended on different threads (eg,
because of a .join operation).

This test is named to be evaluated last alphabetically due to side effects from the ZIO Runtime environment.
*/
@Test
def z_multipleTransactionsWithThreadHopsDoNotBleed(): Unit = {
val delayMillis = 500

//When
def txnBlock(i: Int): ZIO[Clock, Nothing, Int] = {
txn(
for {
one <- asyncTrace(s"loop $i: one")(UIO(1))
_ <- asyncTrace(s"loop $i: sleep")(ZIO.sleep(delayMillis.millis))//thread hop
forkedFiber <- asyncTrace(s"loop $i: forked fiber")(UIO(5).fork) //thread hop
five <- forkedFiber.join //thread hop
eight <- asyncTrace(s"loop $i: eight")(UIO(one + five + 2))
} yield eight + i
)
}
val result = Runtime.default.unsafeRunSync(
ZIO.loop(0)(i => i < 5, i => i + 1)(i => txnBlock(i))
)

val txnCount = introspector.getFinishedTransactionCount()
val traces = getTraces(introspector)
val segments = getSegments(traces)

Assert.assertEquals("Result correct", Success(List(8, 9, 10, 11, 12)), result)
Assert.assertEquals("Correct number of transactions", 5, txnCount)
Assert.assertEquals("Correct number of traces", 5, traces.size)
List(0, 1, 2, 3, 4).foreach(i => {
Assert.assertTrue("one segment exists", segments.exists(_.getName == s"Custom/loop $i: one"))
Assert.assertTrue("sleep segment exists", segments.exists(_.getName == s"Custom/loop $i: sleep"))
Assert.assertTrue("forked segment exists", segments.exists(_.getName == s"Custom/loop $i: forked fiber"))
Assert.assertTrue("eight segment exists", segments.exists(_.getName == s"Custom/loop $i: eight"))
})

}

@Trace(async = true)
private def getThree = 3
Expand Down
Loading