Skip to content

Commit

Permalink
cats effect 3: pass transaction info by implicits
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-gibson committed Nov 29, 2021
1 parent 0da09b0 commit e6ad762
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static void attemptExpireTokenRefCount(AgentBridge.TokenAndRefCount token
}

public static void setThreadTokenAndRefCount(AgentBridge.TokenAndRefCount tokenAndRefCount) {
if (tokenAndRefCount != null) {
if (tokenAndRefCount != null && tokenAndRefCount.token != null) {
logTokenInfo(tokenAndRefCount, "setting token to thread");
AgentBridge.activeToken.set(tokenAndRefCount);
tokenAndRefCount.token.link();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.newrelic.cats3.api

import cats.effect.Sync
import com.newrelic.api.agent.NewRelic
import com.newrelic.api.agent.weaver.Weaver
import com.newrelic.api.agent.weaver.scala.{ScalaMatchType, ScalaWeave}

@ScalaWeave(`type` = ScalaMatchType.Object, `originalName` = "com.newrelic.cats3.api.TraceOps")
class TraceOps_Instrumentation {
def txn[S, F[_]:Sync](body: F[S]): F[S] = {
Util.wrapTrace(Weaver.callOriginal)
// def txnInfo: TxnInfo = Weaver.callOriginal()


def txn[S, F[_]:Sync](body: TxnInfo => F[S]): F[S] = {
Util.wrapTrace(body)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,46 @@ package com.newrelic.cats3.api

import cats.effect.Sync
import cats.implicits._
import com.newrelic.agent.bridge.{AgentBridge, ExitTracer, Transaction, TracedMethod}
import com.newrelic.agent.bridge.{AgentBridge, ExitTracer, Token, Transaction}
import com.newrelic.api.agent.NewRelic

import java.util.concurrent.atomic.AtomicInteger

object Util {
val RETURN_OPCODE = 176

def wrapTrace[S, F[_] : Sync](body: F[S]): F[S] =
Sync[F].delay(AgentBridge.instrumentation.createScalaTxnTracer())
.redeemWith(_ => body,
tracer =>
if (tracer == null) {
body
} else {
for {
txnWithTracedMethod <- Sync[F].delay {
val agent = AgentBridge.getAgent
(agent.getTransaction(false), agent.getTracedMethod)
}
_ <- setupTokenAndRefCount(txnWithTracedMethod)
res <- attachErrorEvent(body, tracer)
_ <- cleanupTxnAndTokenRefCount(txnWithTracedMethod._1)
_ <- Sync[F].delay(tracer.finish(RETURN_OPCODE, null))
} yield res
}
)
def wrapTrace[S, F[_] : Sync](body: TxnInfo => F[S]): F[S] =
Sync[F].delay {
val tracer = AgentBridge.instrumentation.createScalaTxnTracer()
(tracer, optTxnInfo())
}.redeemWith(
_ => body(txnInfo),
tracerAndTxnInfo => {
val (tracer, optTxnInfo) = tracerAndTxnInfo
for {
_ <- Sync[F].delay(AgentBridge.getAgent.getTransaction(false))
updatedTxnInfo <- setupTokenAndRefCount(optTxnInfo, txnInfo)
res <- attachErrorEvent(body(updatedTxnInfo), tracer)
_ <- cleanupTxnAndTokenRefCount(updatedTxnInfo)
_ <- Sync[F].delay(tracer.finish(RETURN_OPCODE, null))
} yield res
}
)

private def txnInfo = {
val txn = NewRelic.getAgent.getTransaction
TxnInfo(txn, txn.getToken)
}

def optTxnInfo(): (Transaction, Token) = {
val txn = AgentBridge.getAgent.getTransaction(false)
if (txn != null) {
(txn, txn.getToken)
} else {
null
}
}


private def attachErrorEvent[S, F[_] : Sync](body: F[S], tracer: ExitTracer): F[S] =
body
Expand All @@ -36,23 +50,29 @@ object Util {
Sync[F].raiseError(throwable)
})

private def setupTokenAndRefCount[F[_] : Sync](txnWithTracerMethod: (Transaction, TracedMethod)): F[Unit] = Sync[F].delay {

val (txn, tracedMethod) = txnWithTracerMethod
if (txn != null && tracedMethod != null) {
AgentBridge.activeToken.set(
new AgentBridge.TokenAndRefCount(
txn.getToken,
tracedMethod,
new AtomicInteger(0)
))
private def setupTokenAndRefCount[F[_] : Sync](optTxn: (Transaction, Token), fallback: => TxnInfo)
: F[TxnInfo] =
Sync[F].delay {
if (optTxn != null) {
val (txn, token) = optTxn
AgentBridge.activeToken.set(
new AgentBridge.TokenAndRefCount(
token,
AgentBridge.getAgent.getTracedMethod,
new AtomicInteger(0)
)
)
TxnInfo(txn, token)
} else {
fallback
}
}
}

private def cleanupTxnAndTokenRefCount[F[_] : Sync](txn: Transaction): F[Unit] = Sync[F].delay {
AgentBridge.activeToken.remove()
if (txn != null) {
txn.expireAllTokens()
private def cleanupTxnAndTokenRefCount[F[_] : Sync](txnInfo: TxnInfo): F[Unit] = Sync[F].delay {
val tokenAndRefCount = AgentBridge.activeToken.get()
if (tokenAndRefCount != null) {
AgentBridge.activeToken.remove()
}
txnInfo.transaction.asInstanceOf[Transaction].expireAllTokens()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package com.newrelic.cats3.api

import cats.effect.Sync
import cats.implicits._
import com.newrelic.api.agent.{NewRelic, Segment, Token}

import com.newrelic.api.agent.{NewRelic, Segment, Token, Transaction}

case class TxnInfo(transaction: Transaction, token: Token)
object TraceOps {

def txnInfo: TxnInfo = {
val txn = NewRelic.getAgent.getTransaction
val txnInfo = TxnInfo(txn, txn.getToken)
println(s"calling TxnInfo $txnInfo")
txnInfo
}
/**
* Creates a segment to capture metrics for a given block of code, this will call {@link com.newrelic.api.agent.Transaction# startSegment ( String )},
* execute the code block, then call {@link com.newrelic.api.agent.Segment# end ( )}. This {@link Segment} will show up in the Transaction Breakdown
Expand Down Expand Up @@ -60,8 +65,8 @@ object TraceOps {
* The block should return a { @link IO}
* @return Value returned from completed asynchronous code block
*/
def asyncTrace[S, F[_] : Sync](segmentName: String)(block: F[S]): F[S] = for {
segment <- startSegment(segmentName)
def asyncTrace[S, F[_] : Sync](segmentName: String)(block: F[S])(implicit txnInfo: TxnInfo): F[S] = for {
segment <- Sync[F].delay(txnInfo.transaction.startSegment(segmentName))
res <- endSegmentOnError(block, segment)
_ <- Sync[F].delay(segment.end())
} yield res
Expand Down Expand Up @@ -118,9 +123,9 @@ object TraceOps {
* @tparam S Type returned from completed asynchronous function
* @return Value returned from completed asynchronous function
*/
def asyncTraceFun[T, S, F[_] : Sync](segmentName: String)(f: T => F[S]): T => F[S] = { t: T =>
def asyncTraceFun[T, S, F[_] : Sync](segmentName: String)(f: T => F[S])(implicit txnInfo: TxnInfo): T => F[S] = { t: T =>
for {
segment <- startSegment(segmentName)
segment <- Sync[F].delay(txnInfo.transaction.startSegment(segmentName))
evaluatedFunc <- endSegmentOnError(f(t), segment)
_ <- Sync[F].delay(segment.end())
} yield evaluatedFunc
Expand All @@ -142,10 +147,10 @@ object TraceOps {
* @tparam S Type returned by code block
* @return Value returned by code block
*/
def txn[S, F[_] : Sync](body: F[S]): F[S] = body
def txn[S, F[_] : Sync](body: TxnInfo => F[S]): F[S] = body(txnInfo)

private def startSegment[F[_] : Sync](segmentName: String): F[Segment] = Sync[F].delay {
val txn = NewRelic.getAgent.getTransaction()
val txn = NewRelic.getAgent.getTransaction
txn.startSegment(segmentName)
}

Expand Down
Loading

0 comments on commit e6ad762

Please sign in to comment.