Skip to content

Commit

Permalink
Bump zio-opentelemetry version and update OpenTelemetryTracingZioBack…
Browse files Browse the repository at this point in the history
…end (#2250)
  • Loading branch information
Grryum authored Aug 7, 2024
1 parent bb151fb commit ce77c56
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 65 deletions.
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,9 @@ lazy val openTelemetryTracingZioBackend = (projectMatrix in file("observability/
.settings(
name := "opentelemetry-tracing-zio-backend",
libraryDependencies ++= Seq(
"dev.zio" %% "zio-opentelemetry" % "2.0.3",
"dev.zio" %% "zio-opentelemetry" % "3.0.0-RC24",
"io.opentelemetry.semconv" % "opentelemetry-semconv" % "1.26.0-alpha",
"io.opentelemetry" % "opentelemetry-api" % openTelemetryVersion,
"io.opentelemetry" % "opentelemetry-sdk-testing" % openTelemetryVersion % Test
)
)
Expand Down
2 changes: 1 addition & 1 deletion docs/backends/wrappers/opentelemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ Here's how you construct `ZioTelemetryOpenTelemetryBackend`. I would recommend w
```scala mdoc:compile-only
import sttp.client4._
import zio._
import zio.telemetry.opentelemetry._
import zio.telemetry.opentelemetry.tracing._
import sttp.client4.opentelemetry.zio._

val zioBackend: Backend[Task] = ???
Expand Down
Original file line number Diff line number Diff line change
@@ -1,88 +1,86 @@
package sttp.client4.opentelemetry.zio

import io.opentelemetry.api.trace.{SpanKind, StatusCode}
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.propagation.{TextMapPropagator, TextMapSetter}
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.semconv.{HttpAttributes, UrlAttributes}
import sttp.capabilities.Effect
import sttp.client4._
import sttp.client4.wrappers.DelegateBackend
import zio._
import zio.telemetry.opentelemetry.TracingSyntax.OpenTelemetryZioOps
import zio.telemetry.opentelemetry._
import zio.telemetry.opentelemetry.context.OutgoingContextCarrier
import zio.telemetry.opentelemetry.tracing.Tracing
import zio.telemetry.opentelemetry.tracing.propagation.TraceContextPropagator

import scala.collection.mutable

private class OpenTelemetryTracingZioBackend[+P](
private abstract class OpenTelemetryTracingZioBackend[+P](
delegate: GenericBackend[Task, P],
tracer: OpenTelemetryZioTracer,
tracer: OpenTelemetryTracer,
tracing: Tracing
) extends DelegateBackend[Task, P](delegate)
with Backend[Task] {
def send[T](request: GenericRequest[T, P with Effect[Task]]): Task[Response[T]] = {
val carrier: mutable.Map[String, String] = mutable.Map().empty
val propagator: TextMapPropagator = W3CTraceContextPropagator.getInstance()
val setter: TextMapSetter[mutable.Map[String, String]] = (carrier, key, value) => carrier.update(key, value)

(for {
_ <- Tracing.inject(propagator, carrier, setter)
_ <- tracer.before(request)
resp <- delegate.send(request.headers(carrier.toMap))
_ <- tracer.after(resp)
} yield resp)
.span(tracer.spanName(request), SpanKind.CLIENT, { case _ => StatusCode.ERROR })
.provideLayer(ZLayer.succeed(tracing))
}
def send[T](request: GenericRequest[T, P with Effect[Task]]): Task[Response[T]] =
ZIO.scoped {
val carrier = OutgoingContextCarrier.default()
for {
_ <- tracing.spanScoped(tracer.spanName(request), SpanKind.CLIENT, tracer.requestAttributes(request))
_ <- tracing.injectSpan(TraceContextPropagator.default, carrier)
resp <- delegate.send(request.headers(carrier.kernel.toMap))
_ <- ZIO.addFinalizer(tracing.getCurrentSpanUnsafe.map(_.setAllAttributes(tracer.responseAttribute(resp))))
} yield resp
}
}

object OpenTelemetryTracingZioBackend {
def apply(other: Backend[Task], tracing: Tracing): Backend[Task] =
apply(other, tracing, OpenTelemetryZioTracer.Default)
apply(other, tracing, OpenTelemetryTracer.Default)

def apply(other: WebSocketBackend[Task], tracing: Tracing): WebSocketBackend[Task] =
apply(other, tracing, OpenTelemetryZioTracer.Default)
apply(other, tracing, OpenTelemetryTracer.Default)

def apply[S](other: StreamBackend[Task, S], tracing: Tracing): StreamBackend[Task, S] =
apply(other, tracing, OpenTelemetryZioTracer.Default)
apply(other, tracing, OpenTelemetryTracer.Default)

def apply[S](other: WebSocketStreamBackend[Task, S], tracing: Tracing): WebSocketStreamBackend[Task, S] =
apply(other, tracing, OpenTelemetryZioTracer.Default)
apply(other, tracing, OpenTelemetryTracer.Default)

def apply(other: Backend[Task], tracing: Tracing, tracer: OpenTelemetryZioTracer): Backend[Task] =
new OpenTelemetryTracingZioBackend(other, tracer, tracing)
def apply(other: Backend[Task], tracing: Tracing, tracer: OpenTelemetryTracer): Backend[Task] =
new OpenTelemetryTracingZioBackend(other, tracer, tracing) with Backend[Task]

def apply(other: WebSocketBackend[Task], tracing: Tracing, tracer: OpenTelemetryZioTracer): WebSocketBackend[Task] =
def apply(other: WebSocketBackend[Task], tracing: Tracing, tracer: OpenTelemetryTracer): WebSocketBackend[Task] =
new OpenTelemetryTracingZioBackend(other, tracer, tracing) with WebSocketBackend[Task]

def apply[S](
other: StreamBackend[Task, S],
tracing: Tracing,
tracer: OpenTelemetryZioTracer
tracer: OpenTelemetryTracer
): StreamBackend[Task, S] =
new OpenTelemetryTracingZioBackend(other, tracer, tracing) with StreamBackend[Task, S]

def apply[S](
other: WebSocketStreamBackend[Task, S],
tracing: Tracing,
tracer: OpenTelemetryZioTracer
tracer: OpenTelemetryTracer
): WebSocketStreamBackend[Task, S] =
new OpenTelemetryTracingZioBackend(other, tracer, tracing) with WebSocketStreamBackend[Task, S]
}

trait OpenTelemetryZioTracer {
trait OpenTelemetryTracer {
def spanName[T](request: GenericRequest[T, Nothing]): String
def before[T](request: GenericRequest[T, Nothing]): RIO[Tracing, Unit]
def after[T](response: Response[T]): RIO[Tracing, Unit]
def requestAttributes[T](request: GenericRequest[T, Nothing]): Attributes
def responseAttribute[T](response: Response[T]): Attributes
}

object OpenTelemetryZioTracer {
val Default: OpenTelemetryZioTracer = new OpenTelemetryZioTracer {
object OpenTelemetryTracer {
lazy val Default: OpenTelemetryTracer = new OpenTelemetryTracer {
override def spanName[T](request: GenericRequest[T, Nothing]): String = s"HTTP ${request.method.method}"
override def before[T](request: GenericRequest[T, Nothing]): RIO[Tracing, Unit] =
Tracing.setAttribute("http.method", request.method.method) *>
Tracing.setAttribute("http.url", request.uri.toString()) *>
ZIO.unit
override def after[T](response: Response[T]): RIO[Tracing, Unit] =
Tracing.setAttribute("http.status_code", response.code.code.toLong) *>
ZIO.unit
override def requestAttributes[T](request: GenericRequest[T, Nothing]): Attributes =
Attributes.builder
.put(HttpAttributes.HTTP_REQUEST_METHOD, request.method.method)
.put(UrlAttributes.URL_FULL, request.uri.path.mkString("/"))
.build()

override def responseAttribute[T](response: Response[T]): Attributes =
Attributes.builder
.put(HttpAttributes.HTTP_RESPONSE_STATUS_CODE, response.code.code.toLong: java.lang.Long)
.build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import sttp.client4.impl.zio.{RIOMonadAsyncError, ZioTestBase}
import sttp.client4.testing.{BackendStub, ResponseStub}
import sttp.client4.{basicRequest, Backend, GenericRequest, Response, UriContext}
import sttp.client4.{basicRequest, Backend, GenericRequest, UriContext}
import sttp.model.StatusCode
import zio.{Runtime, Task, Unsafe, ZIO}
import zio.telemetry.opentelemetry.Tracing
import zio.telemetry.opentelemetry.OpenTelemetry
import zio.telemetry.opentelemetry.tracing.Tracing
import zio.{Runtime, Task, Unsafe, ZIO, ZLayer}

import scala.collection.JavaConverters._
import scala.collection.mutable
Expand All @@ -24,29 +25,38 @@ class OpenTelemetryTracingZioBackendTest extends AnyFlatSpec with Matchers with

private val mockTracer =
SdkTracerProvider.builder().addSpanProcessor(SimpleSpanProcessor.create(spanExporter)).build().get(getClass.getName)
private val mockTracing = Unsafe.unsafeCompat { implicit u =>
Runtime.default.unsafe.run(ZIO.scoped(Tracing.scoped(mockTracer))).getOrThrow()
}

private val mockTracingLayer = (OpenTelemetry.contextJVM ++ ZLayer.succeed(mockTracer)) >>> Tracing.live()

private val backend: Backend[Task] =
OpenTelemetryTracingZioBackend(
BackendStub(new RIOMonadAsyncError[Any]).whenRequestMatchesPartial {
case r if r.uri.toString.contains("echo") =>
recordedRequests += r
ResponseStub.ok("")
case r if r.uri.toString.contains("error") =>
throw new RuntimeException("something went wrong")
},
mockTracing
)
Unsafe.unsafe { implicit u =>
Runtime.default.unsafe
.run(
ZIO
.serviceWith[Tracing](
OpenTelemetryTracingZioBackend(
BackendStub(new RIOMonadAsyncError[Any]).whenRequestMatchesPartial {
case r if r.uri.toString.contains("echo") =>
recordedRequests += r
ResponseStub.ok("")
case r if r.uri.toString.contains("error") =>
throw new RuntimeException("something went wrong")
},
_
)
)
.provideLayer(mockTracingLayer)
)
.getOrThrow()
}

before {
recordedRequests.clear()
spanExporter.reset()
}

"ZioTelemetryOpenTelemetryBackend" should "record spans for requests" in {
val response = Unsafe.unsafeCompat { implicit u =>
val response = Unsafe.unsafe { implicit u =>
Runtime.default.unsafe.run(basicRequest.post(uri"http://stub/echo").send(backend)).getOrThrow()
}
response.code shouldBe StatusCode.Ok
Expand All @@ -57,7 +67,7 @@ class OpenTelemetryTracingZioBackendTest extends AnyFlatSpec with Matchers with
}

it should "propagate span" in {
val response = Unsafe.unsafeCompat { implicit u =>
val response = Unsafe.unsafe { implicit u =>
Runtime.default.unsafe.run(basicRequest.post(uri"http://stub/echo").send(backend)).getOrThrow()
}
response.code shouldBe StatusCode.Ok
Expand All @@ -71,7 +81,7 @@ class OpenTelemetryTracingZioBackendTest extends AnyFlatSpec with Matchers with
}

it should "set span status in case of error" in {
Unsafe.unsafeCompat { implicit u =>
Unsafe.unsafe { implicit u =>
Runtime.default.unsafe.run(basicRequest.post(uri"http://stub/error").send(backend))
}

Expand Down

0 comments on commit ce77c56

Please sign in to comment.