Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump zio-opentelemetry version and update OpenTelemetryTracingZioBackend #2250

Merged
merged 2 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,9 @@ lazy val openTelemetryTracingZioBackend = (projectMatrix in file("observability/
.settings(
name := "opentelemetry-tracing-zio-backend",
libraryDependencies ++= Seq(
"dev.zio" %% "zio-opentelemetry" % "2.0.3",
"dev.zio" %% "zio-opentelemetry" % "3.0.0-RC24",
"io.opentelemetry.semconv" % "opentelemetry-semconv" % "1.26.0-alpha",
"io.opentelemetry" % "opentelemetry-api" % openTelemetryVersion,
"io.opentelemetry" % "opentelemetry-sdk-testing" % openTelemetryVersion % Test
)
)
Expand Down
2 changes: 1 addition & 1 deletion docs/backends/wrappers/opentelemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ Here's how you construct `ZioTelemetryOpenTelemetryBackend`. I would recommend w
```scala mdoc:compile-only
import sttp.client4._
import zio._
import zio.telemetry.opentelemetry._
import zio.telemetry.opentelemetry.tracing._
import sttp.client4.opentelemetry.zio._

val zioBackend: Backend[Task] = ???
Expand Down
Original file line number Diff line number Diff line change
@@ -1,88 +1,86 @@
package sttp.client4.opentelemetry.zio

import io.opentelemetry.api.trace.{SpanKind, StatusCode}
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.propagation.{TextMapPropagator, TextMapSetter}
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.semconv.{HttpAttributes, UrlAttributes}
import sttp.capabilities.Effect
import sttp.client4._
import sttp.client4.wrappers.DelegateBackend
import zio._
import zio.telemetry.opentelemetry.TracingSyntax.OpenTelemetryZioOps
import zio.telemetry.opentelemetry._
import zio.telemetry.opentelemetry.context.OutgoingContextCarrier
import zio.telemetry.opentelemetry.tracing.Tracing
import zio.telemetry.opentelemetry.tracing.propagation.TraceContextPropagator

import scala.collection.mutable

private class OpenTelemetryTracingZioBackend[+P](
private abstract class OpenTelemetryTracingZioBackend[+P](
delegate: GenericBackend[Task, P],
tracer: OpenTelemetryZioTracer,
tracer: OpenTelemetryTracer,
tracing: Tracing
) extends DelegateBackend[Task, P](delegate)
with Backend[Task] {
def send[T](request: GenericRequest[T, P with Effect[Task]]): Task[Response[T]] = {
val carrier: mutable.Map[String, String] = mutable.Map().empty
val propagator: TextMapPropagator = W3CTraceContextPropagator.getInstance()
val setter: TextMapSetter[mutable.Map[String, String]] = (carrier, key, value) => carrier.update(key, value)

(for {
_ <- Tracing.inject(propagator, carrier, setter)
_ <- tracer.before(request)
resp <- delegate.send(request.headers(carrier.toMap))
_ <- tracer.after(resp)
} yield resp)
.span(tracer.spanName(request), SpanKind.CLIENT, { case _ => StatusCode.ERROR })
.provideLayer(ZLayer.succeed(tracing))
}
def send[T](request: GenericRequest[T, P with Effect[Task]]): Task[Response[T]] =
ZIO.scoped {
val carrier = OutgoingContextCarrier.default()
for {
_ <- tracing.spanScoped(tracer.spanName(request), SpanKind.CLIENT, tracer.requestAttributes(request))
_ <- tracing.injectSpan(TraceContextPropagator.default, carrier)
resp <- delegate.send(request.headers(carrier.kernel.toMap))
_ <- ZIO.addFinalizer(tracing.getCurrentSpanUnsafe.map(_.setAllAttributes(tracer.responseAttribute(resp))))
} yield resp
}
}

object OpenTelemetryTracingZioBackend {
def apply(other: Backend[Task], tracing: Tracing): Backend[Task] =
apply(other, tracing, OpenTelemetryZioTracer.Default)
apply(other, tracing, OpenTelemetryTracer.Default)

def apply(other: WebSocketBackend[Task], tracing: Tracing): WebSocketBackend[Task] =
apply(other, tracing, OpenTelemetryZioTracer.Default)
apply(other, tracing, OpenTelemetryTracer.Default)

def apply[S](other: StreamBackend[Task, S], tracing: Tracing): StreamBackend[Task, S] =
apply(other, tracing, OpenTelemetryZioTracer.Default)
apply(other, tracing, OpenTelemetryTracer.Default)

def apply[S](other: WebSocketStreamBackend[Task, S], tracing: Tracing): WebSocketStreamBackend[Task, S] =
apply(other, tracing, OpenTelemetryZioTracer.Default)
apply(other, tracing, OpenTelemetryTracer.Default)

def apply(other: Backend[Task], tracing: Tracing, tracer: OpenTelemetryZioTracer): Backend[Task] =
new OpenTelemetryTracingZioBackend(other, tracer, tracing)
def apply(other: Backend[Task], tracing: Tracing, tracer: OpenTelemetryTracer): Backend[Task] =
new OpenTelemetryTracingZioBackend(other, tracer, tracing) with Backend[Task]

def apply(other: WebSocketBackend[Task], tracing: Tracing, tracer: OpenTelemetryZioTracer): WebSocketBackend[Task] =
def apply(other: WebSocketBackend[Task], tracing: Tracing, tracer: OpenTelemetryTracer): WebSocketBackend[Task] =
new OpenTelemetryTracingZioBackend(other, tracer, tracing) with WebSocketBackend[Task]

def apply[S](
other: StreamBackend[Task, S],
tracing: Tracing,
tracer: OpenTelemetryZioTracer
tracer: OpenTelemetryTracer
): StreamBackend[Task, S] =
new OpenTelemetryTracingZioBackend(other, tracer, tracing) with StreamBackend[Task, S]

def apply[S](
other: WebSocketStreamBackend[Task, S],
tracing: Tracing,
tracer: OpenTelemetryZioTracer
tracer: OpenTelemetryTracer
): WebSocketStreamBackend[Task, S] =
new OpenTelemetryTracingZioBackend(other, tracer, tracing) with WebSocketStreamBackend[Task, S]
}

trait OpenTelemetryZioTracer {
trait OpenTelemetryTracer {
def spanName[T](request: GenericRequest[T, Nothing]): String
def before[T](request: GenericRequest[T, Nothing]): RIO[Tracing, Unit]
def after[T](response: Response[T]): RIO[Tracing, Unit]
def requestAttributes[T](request: GenericRequest[T, Nothing]): Attributes
def responseAttribute[T](response: Response[T]): Attributes
}

object OpenTelemetryZioTracer {
val Default: OpenTelemetryZioTracer = new OpenTelemetryZioTracer {
object OpenTelemetryTracer {
lazy val Default: OpenTelemetryTracer = new OpenTelemetryTracer {
override def spanName[T](request: GenericRequest[T, Nothing]): String = s"HTTP ${request.method.method}"
override def before[T](request: GenericRequest[T, Nothing]): RIO[Tracing, Unit] =
Tracing.setAttribute("http.method", request.method.method) *>
Tracing.setAttribute("http.url", request.uri.toString()) *>
ZIO.unit
override def after[T](response: Response[T]): RIO[Tracing, Unit] =
Tracing.setAttribute("http.status_code", response.code.code.toLong) *>
ZIO.unit
override def requestAttributes[T](request: GenericRequest[T, Nothing]): Attributes =
Attributes.builder
.put(HttpAttributes.HTTP_REQUEST_METHOD, request.method.method)
.put(UrlAttributes.URL_FULL, request.uri.path.mkString("/"))
.build()

override def responseAttribute[T](response: Response[T]): Attributes =
Attributes.builder
.put(HttpAttributes.HTTP_RESPONSE_STATUS_CODE, response.code.code.toLong: java.lang.Long)
.build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import sttp.client4.impl.zio.{RIOMonadAsyncError, ZioTestBase}
import sttp.client4.testing.{BackendStub, ResponseStub}
import sttp.client4.{basicRequest, Backend, GenericRequest, Response, UriContext}
import sttp.client4.{basicRequest, Backend, GenericRequest, UriContext}
import sttp.model.StatusCode
import zio.{Runtime, Task, Unsafe, ZIO}
import zio.telemetry.opentelemetry.Tracing
import zio.telemetry.opentelemetry.OpenTelemetry
import zio.telemetry.opentelemetry.tracing.Tracing
import zio.{Runtime, Task, Unsafe, ZIO, ZLayer}

import scala.collection.JavaConverters._
import scala.collection.mutable
Expand All @@ -24,29 +25,38 @@ class OpenTelemetryTracingZioBackendTest extends AnyFlatSpec with Matchers with

private val mockTracer =
SdkTracerProvider.builder().addSpanProcessor(SimpleSpanProcessor.create(spanExporter)).build().get(getClass.getName)
private val mockTracing = Unsafe.unsafeCompat { implicit u =>
Runtime.default.unsafe.run(ZIO.scoped(Tracing.scoped(mockTracer))).getOrThrow()
}

private val mockTracingLayer = (OpenTelemetry.contextJVM ++ ZLayer.succeed(mockTracer)) >>> Tracing.live()

private val backend: Backend[Task] =
OpenTelemetryTracingZioBackend(
BackendStub(new RIOMonadAsyncError[Any]).whenRequestMatchesPartial {
case r if r.uri.toString.contains("echo") =>
recordedRequests += r
ResponseStub.ok("")
case r if r.uri.toString.contains("error") =>
throw new RuntimeException("something went wrong")
},
mockTracing
)
Unsafe.unsafe { implicit u =>
Runtime.default.unsafe
.run(
ZIO
.serviceWith[Tracing](
OpenTelemetryTracingZioBackend(
BackendStub(new RIOMonadAsyncError[Any]).whenRequestMatchesPartial {
case r if r.uri.toString.contains("echo") =>
recordedRequests += r
ResponseStub.ok("")
case r if r.uri.toString.contains("error") =>
throw new RuntimeException("something went wrong")
},
_
)
)
.provideLayer(mockTracingLayer)
)
.getOrThrow()
}

before {
recordedRequests.clear()
spanExporter.reset()
}

"ZioTelemetryOpenTelemetryBackend" should "record spans for requests" in {
val response = Unsafe.unsafeCompat { implicit u =>
val response = Unsafe.unsafe { implicit u =>
Runtime.default.unsafe.run(basicRequest.post(uri"http://stub/echo").send(backend)).getOrThrow()
}
response.code shouldBe StatusCode.Ok
Expand All @@ -57,7 +67,7 @@ class OpenTelemetryTracingZioBackendTest extends AnyFlatSpec with Matchers with
}

it should "propagate span" in {
val response = Unsafe.unsafeCompat { implicit u =>
val response = Unsafe.unsafe { implicit u =>
Runtime.default.unsafe.run(basicRequest.post(uri"http://stub/echo").send(backend)).getOrThrow()
}
response.code shouldBe StatusCode.Ok
Expand All @@ -71,7 +81,7 @@ class OpenTelemetryTracingZioBackendTest extends AnyFlatSpec with Matchers with
}

it should "set span status in case of error" in {
Unsafe.unsafeCompat { implicit u =>
Unsafe.unsafe { implicit u =>
Runtime.default.unsafe.run(basicRequest.post(uri"http://stub/error").send(backend))
}

Expand Down
Loading