diff --git a/build.sbt b/build.sbt
index 20d24cc86..2a9e03164 100644
--- a/build.sbt
+++ b/build.sbt
@@ -71,8 +71,6 @@ inThisBuild(
   )
 )
 
-val excludeInferAny = { options: Seq[String] => options.filterNot(Set("-Xlint:infer-any")) }
-
 lazy val root = project
   .in(file("."))
   .settings(
@@ -173,8 +171,8 @@ lazy val zioKafkaTracing =
     .settings(publish / skip := true)
     .settings(
       libraryDependencies ++= Seq(
-        "dev.zio"          %% "zio-opentracing"           % "3.0.0",
-        "io.opentelemetry"  % "opentelemetry-sdk-testing" % "1.43.0" % Test
+        "dev.zio"          %% "zio-opentelemetry"         % "3.0.0",
+        "io.opentelemetry"  % "opentelemetry-sdk-testing" % "1.43.0" % Test
       ) ++ `embedded-kafka`.value
     )
 
diff --git a/zio-kafka-tracing/src/main/scala/zio/kafka/tracing/TracingProducerAspect.scala b/zio-kafka-tracing/src/main/scala/zio/kafka/tracing/TracingProducerAspect.scala
index 676838e36..8d6361be0 100644
--- a/zio-kafka-tracing/src/main/scala/zio/kafka/tracing/TracingProducerAspect.scala
+++ b/zio-kafka-tracing/src/main/scala/zio/kafka/tracing/TracingProducerAspect.scala
@@ -1,12 +1,14 @@
 package zio.kafka.tracing
 
-import io.opentracing.propagation.{ Format, TextMapAdapter }
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.header.Header
+import org.apache.kafka.common.header.internals.RecordHeaders
 import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo }
 import zio.kafka.producer._
-import zio.telemetry.opentracing.OpenTracing
-import zio.{ Chunk, RIO, Task, UIO, ZIO }
+import zio.telemetry.opentelemetry.tracing.Tracing
+import zio.telemetry.opentelemetry.tracing.propagation.TraceContextPropagator
+import zio._
+import zio.telemetry.opentelemetry.context.OutgoingContextCarrier
 
 import java.nio.charset.StandardCharsets
 import scala.collection.mutable
@@ -16,56 +18,59 @@ object TracingProducerAspect {
 
   /**
    * Adds open tracing headers to each outgoing record of a ZIO Kafka [[Producer]].
-   *
-   * WARNING: this aspect mutates the headers in the record by adding the tracing headers directly. Be careful NOT to
-   * reuse the records after passing the records to the producer.
    */
-  def traced: ProducerAspect[Nothing, OpenTracing] = new ProducerAspect[Nothing, OpenTracing] {
-    override def apply[R >: Nothing <: OpenTracing](wrapped: ProducerWithEnv[R]): ProducerWithEnv[R] =
-      new ProducerWithEnv[R] with DefaultProducer[R] {
-        // noinspection YieldingZIOEffectInspection
-        override def produceChunkAsyncWithFailures(
-          records: Chunk[ByteRecord]
-        ): RIO[R, UIO[Chunk[Either[Throwable, RecordMetadata]]]] =
-          for {
-            recordsWithHeaders <- ZIO.foreach(records)(withTracingHeaders)
-            result             <- wrapped.produceChunkAsyncWithFailures(recordsWithHeaders)
-          } yield result
+  def traced: ProducerAspect[Nothing, Tracing & TraceContextPropagator] =
+    new ProducerAspect[Nothing, Tracing & TraceContextPropagator] {
+      override def apply[R >: Nothing <: Tracing & TraceContextPropagator](
+        wrapped: ProducerWithEnv[R]
+      ): ProducerWithEnv[R] =
+        new ProducerWithEnv[R] with DefaultProducer[R] {
+          // noinspection YieldingZIOEffectInspection
+          override def produceChunkAsyncWithFailures(
+            records: Chunk[ByteRecord]
+          ): RIO[R, UIO[Chunk[Either[Throwable, RecordMetadata]]]] =
+            for {
+              recordWithTraceHeaders <- ZIO.foreach(records)(withTraceHeaders)
+              result                 <- wrapped.produceChunkAsyncWithFailures(recordWithTraceHeaders)
+            } yield result
 
-        // noinspection YieldingZIOEffectInspection
-        override def produceAsync(record: ByteRecord): RIO[R, Task[RecordMetadata]] =
-          for {
-            recordWithHeaders <- withTracingHeaders(record)
-            result            <- wrapped.produceAsync(recordWithHeaders)
-          } yield result
+          // noinspection YieldingZIOEffectInspection
+          override def produceAsync(record: ByteRecord): RIO[R, Task[RecordMetadata]] =
+            for {
+              recordWithTraceHeaders <- withTraceHeaders(record)
+              result                 <- wrapped.produceAsync(recordWithTraceHeaders)
+            } yield result
 
-        override def partitionsFor(topic: String): RIO[R, Chunk[PartitionInfo]] =
-          wrapped.partitionsFor(topic)
+          override def partitionsFor(topic: String): RIO[R, Chunk[PartitionInfo]] =
+            wrapped.partitionsFor(topic)
 
-        override def flush: RIO[R, Unit] =
-          wrapped.flush
+          override def flush: RIO[R, Unit] =
+            wrapped.flush
 
-        override def metrics: RIO[R, Map[MetricName, Metric]] =
-          wrapped.metrics
+          override def metrics: RIO[R, Map[MetricName, Metric]] =
+            wrapped.metrics
 
-        private def withTracingHeaders(record: ByteRecord): ZIO[OpenTracing, Nothing, ByteRecord] =
-          kafkaTracingHeaders(record).map { headers =>
-            headers.foreach(header => record.headers().add(header))
-            record
-          }
+          private def withTraceHeaders(record: ByteRecord): ZIO[Tracing & TraceContextPropagator, Nothing, ByteRecord] =
+            traceKafkaHeaders.map { extraHeaders =>
+              new ByteRecord(
+                record.topic(),
+                record.partition(),
+                record.timestamp(),
+                record.key(),
+                record.value(),
+                new RecordHeaders((record.headers().asScala ++ extraHeaders).asJava)
+              )
+            }
 
-        private def kafkaTracingHeaders(record: ByteRecord): ZIO[OpenTracing, Nothing, Seq[Header]] =
-          ZIO.serviceWithZIO[OpenTracing] { tracing =>
-            import tracing.aspects._
-            val headers = mutable.Map.empty[String, String]
-            val buffer  = new TextMapAdapter(headers.asJava)
-            tracing
-              .inject(Format.Builtin.HTTP_HEADERS, buffer)
-              .zipLeft(ZIO.unit @@ spanFrom(Format.Builtin.HTTP_HEADERS, buffer, s"produce to topic ${record.topic()}"))
-              .as(headers.toSeq.map(PairHeader))
-          }
-      }
-  }
+          private def traceKafkaHeaders: ZIO[Tracing & TraceContextPropagator, Nothing, Seq[Header]] =
+            for {
+              tracing                <- ZIO.service[Tracing]
+              traceContextPropagator <- ZIO.service[TraceContextPropagator]
+              headers                 = mutable.Map.empty[String, String]
+              _                      <- tracing.injectSpan(traceContextPropagator, OutgoingContextCarrier.default(headers))
+            } yield headers.toSeq.map(PairHeader)
+        }
+    }
 
   private case class PairHeader(keyValue: (String, String)) extends Header {
 
     override def key(): String = keyValue._1
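
Reviewer note: besides swapping zio-opentracing for zio-opentelemetry, the new withTraceHeaders copies the record with a fresh RecordHeaders instead of mutating the original record's headers, which is why the mutation WARNING is dropped from the scaladoc. The sketch below shows the new injection call in isolation, under two stated assumptions: the headersForCurrentSpan name is hypothetical, and the propagator is fixed to zio-telemetry's W3C default (TraceContextPropagator.default) rather than taken from the environment as the patch does. injectSpan and OutgoingContextCarrier.default are the same zio-telemetry calls used in the patch above.

import zio._
import zio.telemetry.opentelemetry.context.OutgoingContextCarrier
import zio.telemetry.opentelemetry.tracing.Tracing
import zio.telemetry.opentelemetry.tracing.propagation.TraceContextPropagator

import scala.collection.mutable

// Inject the current span's context into a plain mutable map, the same way
// traceKafkaHeaders does before wrapping each entry as a Kafka header.
val headersForCurrentSpan: URIO[Tracing, Map[String, String]] =
  ZIO.serviceWithZIO[Tracing] { tracing =>
    val headers = mutable.Map.empty[String, String]
    tracing
      .injectSpan(TraceContextPropagator.default, OutgoingContextCarrier.default(headers))
      .as(headers.toMap) // e.g. Map("traceparent" -> "00-<trace-id>-<span-id>-01")
  }

With the W3C propagator the map ends up holding traceparent (and possibly tracestate) entries, which PairHeader then exposes as Kafka record headers on the copied record.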