Skip to content

Commit

Permalink
Switch to opentelemetry
Browse files Browse the repository at this point in the history
erikvanoosten committed Oct 25, 2024

Verified

This commit was signed with the committer’s verified signature.
erikvanoosten Erik van Oosten
1 parent aa6676d commit 1d7470e
Showing 2 changed files with 53 additions and 50 deletions.
6 changes: 2 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -71,8 +71,6 @@ inThisBuild(
)
)

val excludeInferAny = { options: Seq[String] => options.filterNot(Set("-Xlint:infer-any")) }

lazy val root = project
.in(file("."))
.settings(
@@ -173,8 +171,8 @@ lazy val zioKafkaTracing =
.settings(publish / skip := true)
.settings(
libraryDependencies ++= Seq(
"dev.zio" %% "zio-opentracing" % "3.0.0",
"io.opentelemetry" % "opentelemetry-sdk-testing" % "1.43.0" % Test
"dev.zio" %% "zio-opentelemetry" % "3.0.0",
"io.opentelemetry" % "opentelemetry-sdk-testing" % "1.43.0" % Test
) ++ `embedded-kafka`.value
)

Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package zio.kafka.tracing

import io.opentracing.propagation.{ Format, TextMapAdapter }
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo }
import zio.kafka.producer._
import zio.telemetry.opentracing.OpenTracing
import zio.{ Chunk, RIO, Task, UIO, ZIO }
import zio.telemetry.opentelemetry.tracing.Tracing
import zio.telemetry.opentelemetry.tracing.propagation.TraceContextPropagator
import zio._
import zio.telemetry.opentelemetry.context.OutgoingContextCarrier

import java.nio.charset.StandardCharsets
import scala.collection.mutable
@@ -16,56 +18,59 @@ object TracingProducerAspect {

/**
* Adds open tracing headers to each outgoing record of a ZIO Kafka [[Producer]].
*
* WARNING: this aspect mutates the headers in the record by adding the tracing headers directly. Be careful NOT to
* reuse the records after passing the records to the producer.
*/
def traced: ProducerAspect[Nothing, OpenTracing] = new ProducerAspect[Nothing, OpenTracing] {
override def apply[R >: Nothing <: OpenTracing](wrapped: ProducerWithEnv[R]): ProducerWithEnv[R] =
new ProducerWithEnv[R] with DefaultProducer[R] {
// noinspection YieldingZIOEffectInspection
override def produceChunkAsyncWithFailures(
records: Chunk[ByteRecord]
): RIO[R, UIO[Chunk[Either[Throwable, RecordMetadata]]]] =
for {
recordsWithHeaders <- ZIO.foreach(records)(withTracingHeaders)
result <- wrapped.produceChunkAsyncWithFailures(recordsWithHeaders)
} yield result
def traced: ProducerAspect[Nothing, Tracing & TraceContextPropagator] =
new ProducerAspect[Nothing, Tracing & TraceContextPropagator] {
override def apply[R >: Nothing <: Tracing & TraceContextPropagator](
wrapped: ProducerWithEnv[R]
): ProducerWithEnv[R] =
new ProducerWithEnv[R] with DefaultProducer[R] {
// noinspection YieldingZIOEffectInspection
override def produceChunkAsyncWithFailures(
records: Chunk[ByteRecord]
): RIO[R, UIO[Chunk[Either[Throwable, RecordMetadata]]]] =
for {
recordWithTraceHeaders <- ZIO.foreach(records)(withTraceHeaders)
result <- wrapped.produceChunkAsyncWithFailures(recordWithTraceHeaders)
} yield result

// noinspection YieldingZIOEffectInspection
override def produceAsync(record: ByteRecord): RIO[R, Task[RecordMetadata]] =
for {
recordWithHeaders <- withTracingHeaders(record)
result <- wrapped.produceAsync(recordWithHeaders)
} yield result
// noinspection YieldingZIOEffectInspection
override def produceAsync(record: ByteRecord): RIO[R, Task[RecordMetadata]] =
for {
recordWithTraceHeaders <- withTraceHeaders(record)
result <- wrapped.produceAsync(recordWithTraceHeaders)
} yield result

override def partitionsFor(topic: String): RIO[R, Chunk[PartitionInfo]] =
wrapped.partitionsFor(topic)
override def partitionsFor(topic: String): RIO[R, Chunk[PartitionInfo]] =
wrapped.partitionsFor(topic)

override def flush: RIO[R, Unit] =
wrapped.flush
override def flush: RIO[R, Unit] =
wrapped.flush

override def metrics: RIO[R, Map[MetricName, Metric]] =
wrapped.metrics
override def metrics: RIO[R, Map[MetricName, Metric]] =
wrapped.metrics

private def withTracingHeaders(record: ByteRecord): ZIO[OpenTracing, Nothing, ByteRecord] =
kafkaTracingHeaders(record).map { headers =>
headers.foreach(header => record.headers().add(header))
record
}
private def withTraceHeaders(record: ByteRecord): ZIO[Tracing & TraceContextPropagator, Nothing, ByteRecord] =
traceKafkaHeaders.map { extraHeaders =>
new ByteRecord(
record.topic(),
record.partition(),
record.timestamp(),
record.key(),
record.value(),
new RecordHeaders((record.headers().asScala ++ extraHeaders).asJava)
)
}

private def kafkaTracingHeaders(record: ByteRecord): ZIO[OpenTracing, Nothing, Seq[Header]] =
ZIO.serviceWithZIO[OpenTracing] { tracing =>
import tracing.aspects._
val headers = mutable.Map.empty[String, String]
val buffer = new TextMapAdapter(headers.asJava)
tracing
.inject(Format.Builtin.HTTP_HEADERS, buffer)
.zipLeft(ZIO.unit @@ spanFrom(Format.Builtin.HTTP_HEADERS, buffer, s"produce to topic ${record.topic()}"))
.as(headers.toSeq.map(PairHeader))
}
}
}
private def traceKafkaHeaders: ZIO[Tracing & TraceContextPropagator, Nothing, Seq[Header]] =
for {
tracing <- ZIO.service[Tracing]
traceContextPropagator <- ZIO.service[TraceContextPropagator]
headers = mutable.Map.empty[String, String]
_ <- tracing.injectSpan(traceContextPropagator, OutgoingContextCarrier.default(headers))
} yield headers.toSeq.map(PairHeader)
}
}

private case class PairHeader(keyValue: (String, String)) extends Header {
override def key(): String = keyValue._1

0 comments on commit 1d7470e

Please sign in to comment.