diff --git a/bin/kafka_bridge_run.sh b/bin/kafka_bridge_run.sh index 3b57cd720..e010ac016 100755 --- a/bin/kafka_bridge_run.sh +++ b/bin/kafka_bridge_run.sh @@ -13,4 +13,9 @@ fi # Make sure that we use /dev/urandom JAVA_OPTS="${JAVA_OPTS} -Dvertx.cacheDirBase=/tmp/vertx-cache -Djava.security.egd=file:/dev/./urandom" +# enabling OpenTelemetry with Jaeger by default +if [ -n "$OTEL_SERVICE_NAME" ] && [ -z "$OTEL_TRACES_EXPORTER" ]; then + export OTEL_TRACES_EXPORTER="jaeger" +fi + exec java $JAVA_OPTS $KAFKA_BRIDGE_LOG4J_OPTS -classpath "${MYPATH}/../libs/*" io.strimzi.kafka.bridge.Application "$@" \ No newline at end of file diff --git a/config/application.properties b/config/application.properties index 9bfed2bbe..dd32e3a9b 100644 --- a/config/application.properties +++ b/config/application.properties @@ -1,7 +1,10 @@ #Bridge related settings bridge.id=my-bridge -# uncomment the following line to enable Jaeger tracing, check the documentation how to configure the tracer +# uncomment one the following lines (bridge.tracing) to enable Jaeger tracing, check the documentation how to configure the tracer +# OpenTracing support #bridge.tracing=jaeger +# OpenTelemetry support +#bridge.tracing=opentelemetry #Apache Kafka common kafka.bootstrap.servers=localhost:9092 diff --git a/pom.xml b/pom.xml index 7480449b4..4fb813f88 100644 --- a/pom.xml +++ b/pom.xml @@ -83,7 +83,7 @@ 1.8 2.17.2 1.7.21 - 4.3.1 + 4.3.3 4.1.77.Final 3.2.0 0.33.10 @@ -110,6 +110,8 @@ 1.8.1 0.33.0 0.1.15 + 1.16.0-alpha + 1.16.0 1.3.9 0.12.0 0.7.0 @@ -267,6 +269,46 @@ opentracing-kafka-client ${opentracing-kafka-client.version} + + io.opentelemetry + opentelemetry-sdk-extension-autoconfigure + ${opentelemetry.alpha-version} + + + io.opentelemetry + opentelemetry-api + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-context + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-semconv + ${opentelemetry.alpha-version} + + + io.opentelemetry.instrumentation + 
opentelemetry-kafka-clients-2.6 + ${opentelemetry.alpha-version} + + + io.opentelemetry.instrumentation + opentelemetry-instrumentation-api-semconv + ${opentelemetry.alpha-version} + + + io.opentelemetry + opentelemetry-exporter-jaeger + ${opentelemetry.version} + + + io.vertx + vertx-opentelemetry + ${vertx.version} + io.micrometer micrometer-core @@ -349,6 +391,12 @@ ${vertx.version} test + + io.vertx + vertx-opentracing + ${vertx.version} + test + org.mockito mockito-core @@ -498,6 +546,9 @@ org.apache.logging.log4j:log4j-slf4j-impl io.jaegertracing:jaeger-client org.yaml:snakeyaml + org.apache.tomcat.embed:tomcat-embed-core + + io.opentelemetry:opentelemetry-exporter-jaeger diff --git a/src/main/java/io/strimzi/kafka/bridge/Application.java b/src/main/java/io/strimzi/kafka/bridge/Application.java index 9f3934e0e..2249af991 100644 --- a/src/main/java/io/strimzi/kafka/bridge/Application.java +++ b/src/main/java/io/strimzi/kafka/bridge/Application.java @@ -5,13 +5,11 @@ package io.strimzi.kafka.bridge; -import io.jaegertracing.Configuration; import io.micrometer.core.instrument.MeterRegistry; -import io.opentracing.Tracer; -import io.opentracing.util.GlobalTracer; import io.strimzi.kafka.bridge.amqp.AmqpBridge; import io.strimzi.kafka.bridge.config.BridgeConfig; import io.strimzi.kafka.bridge.http.HttpBridge; +import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.vertx.config.ConfigRetriever; import io.vertx.config.ConfigRetrieverOptions; import io.vertx.config.ConfigStoreOptions; @@ -30,6 +28,7 @@ import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,7 +66,7 @@ public static void main(String[] args) { try { VertxOptions vertxOptions = new VertxOptions(); JmxCollectorRegistry jmxCollectorRegistry = null; - if (Boolean.valueOf(System.getenv(KAFKA_BRIDGE_METRICS_ENABLED))) { + if 
(Boolean.parseBoolean(System.getenv(KAFKA_BRIDGE_METRICS_ENABLED))) { log.info("Metrics enabled and exposed on the /metrics endpoint"); // setup Micrometer metrics options vertxOptions.setMetricsOptions(metricsOptions()); @@ -82,13 +81,13 @@ public static void main(String[] args) { CommandLine commandLine = new DefaultParser().parse(generateOptions(), args); ConfigStoreOptions fileStore = new ConfigStoreOptions() - .setType("file") - .setFormat("properties") - .setConfig(new JsonObject().put("path", absoluteFilePath(commandLine.getOptionValue("config-file"))).put("raw-data", true)); + .setType("file") + .setFormat("properties") + .setConfig(new JsonObject().put("path", absoluteFilePath(commandLine.getOptionValue("config-file"))).put("raw-data", true)); ConfigStoreOptions envStore = new ConfigStoreOptions() - .setType("env") - .setConfig(new JsonObject().put("raw-data", true)); + .setType("env") + .setConfig(new JsonObject().put("raw-data", true)); ConfigRetrieverOptions options = new ConfigRetrieverOptions() .addStore(fileStore) @@ -126,6 +125,9 @@ public static void main(String[] args) { } } + // register tracing - if set, etc + TracingUtil.initialize(bridgeConfig); + // when HTTP protocol is enabled, it handles healthy/ready/metrics endpoints as well, // so no need for a standalone embedded HTTP server if (!bridgeConfig.getHttpConfig().isEnabled()) { @@ -133,16 +135,6 @@ public static void main(String[] args) { new EmbeddedHttpServer(vertx, healthChecker, metricsReporter, embeddedHttpServerPort); embeddedHttpServer.start(); } - - // register OpenTracing Jaeger tracer - if ("jaeger".equals(bridgeConfig.getTracing())) { - if (config.get(Configuration.JAEGER_SERVICE_NAME) != null) { - Tracer tracer = Configuration.fromEnv().getTracer(); - GlobalTracer.registerIfAbsent(tracer); - } else { - log.error("Jaeger tracing cannot be initialized because {} environment variable is not defined", Configuration.JAEGER_SERVICE_NAME); - } - } } }); } else { @@ -150,19 +142,19 @@ 
public static void main(String[] args) { System.exit(1); } }); - } catch (Exception ex) { - log.error("Error starting the bridge", ex); + } catch (RuntimeException | MalformedObjectNameException | IOException | ParseException e) { + log.error("Error starting the bridge", e); System.exit(1); } } /** * Set up the Vert.x metrics options - * + * * @return instance of the MicrometerMetricsOptions on Vert.x */ private static MicrometerMetricsOptions metricsOptions() { - Set set = new HashSet(); + Set set = new HashSet<>(); set.add(MetricsDomain.NAMED_POOLS.name()); set.add(MetricsDomain.VERTICLES.name()); return new MicrometerMetricsOptions() @@ -218,7 +210,7 @@ private static Future deployHttpBridge(Vertx vertx, BridgeConfig bri if (bridgeConfig.getHttpConfig().isEnabled()) { HttpBridge httpBridge = new HttpBridge(bridgeConfig, metricsReporter); - + vertx.deployVerticle(httpBridge, done -> { if (done.succeeded()) { log.info("HTTP verticle instance deployed [{}]", done.result()); diff --git a/src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java b/src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java index 4e8e33bad..ab8e2f315 100644 --- a/src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java +++ b/src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java @@ -28,13 +28,15 @@ public MetricsReporter(JmxCollectorRegistry jmxCollectorRegistry, MeterRegistry this.jmxCollectorRegistry = jmxCollectorRegistry; this.meterRegistry = meterRegistry; if (this.meterRegistry instanceof PrometheusMeterRegistry) { - this.meterRegistry.config().namingConvention(new PrometheusNamingConvention() { - @Override - public String name(String name, Meter.Type type, String baseUnit) { - String metricName = name.startsWith("vertx.") ? 
name.replace("vertx.", "strimzi.bridge.") : name; - return super.name(metricName, type, baseUnit); - } - }); + this.meterRegistry.config().namingConvention(new MetricsNamingConvention()); + } + } + + private static class MetricsNamingConvention extends PrometheusNamingConvention { + @Override + public String name(String name, Meter.Type type, String baseUnit) { + String metricName = name.startsWith("vertx.") ? name.replace("vertx.", "strimzi.bridge.") : name; + return super.name(metricName, type, baseUnit); } } diff --git a/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java index 481ed2d95..ab6d89c51 100644 --- a/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java @@ -5,7 +5,6 @@ package io.strimzi.kafka.bridge; -import io.opentracing.contrib.kafka.TracingConsumerInterceptor; import io.strimzi.kafka.bridge.config.BridgeConfig; import io.strimzi.kafka.bridge.config.KafkaConfig; import io.strimzi.kafka.bridge.tracker.OffsetTracker; @@ -167,9 +166,6 @@ protected void initConsumer(boolean shouldAttachBatchHandler, Properties config) props.putAll(kafkaConfig.getConfig()); props.putAll(kafkaConfig.getConsumerConfig().getConfig()); props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId); - if (this.bridgeConfig.getTracing() != null) { - props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); - } if (config != null) props.putAll(config); @@ -200,7 +196,7 @@ protected void subscribe(boolean shouldAttachHandler) { this.subscribed = true; this.setPartitionsAssignmentHandlers(); - Set topics = this.topicSubscriptions.stream().map(ts -> ts.getTopic()).collect(Collectors.toSet()); + Set topics = this.topicSubscriptions.stream().map(SinkTopicSubscription::getTopic).collect(Collectors.toSet()); this.consumer.subscribe(topics, this::subscribeHandler); } @@ -710,7 +706,7 @@ protected void 
consume(Handler>> consumeH this.consumer.poll(Duration.ofMillis(this.pollTimeOut), consumeHandler); } - protected void commit(Map offsetsData, + protected void commit(Map offsetsData, Handler>> commitOffsetsHandler) { this.consumer.commit(offsetsData, commitOffsetsHandler); } diff --git a/src/main/java/io/strimzi/kafka/bridge/SourceBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/SourceBridgeEndpoint.java index 3d8595605..ad5e50260 100644 --- a/src/main/java/io/strimzi/kafka/bridge/SourceBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/SourceBridgeEndpoint.java @@ -5,9 +5,10 @@ package io.strimzi.kafka.bridge; -import io.opentracing.contrib.kafka.TracingProducerInterceptor; import io.strimzi.kafka.bridge.config.BridgeConfig; import io.strimzi.kafka.bridge.config.KafkaConfig; +import io.strimzi.kafka.bridge.tracing.TracingHandle; +import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; import io.vertx.core.Vertx; @@ -103,9 +104,9 @@ public void open() { Properties props = new Properties(); props.putAll(kafkaConfig.getConfig()); props.putAll(kafkaConfig.getProducerConfig().getConfig()); - if (this.bridgeConfig.getTracing() != null) { - props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); - } + + TracingHandle tracing = TracingUtil.getTracing(); + tracing.addTracingPropsToProducerConfig(props); this.producerUnsettledMode = KafkaProducer.create(this.vertx, props, this.keySerializer, this.valueSerializer); diff --git a/src/main/java/io/strimzi/kafka/bridge/converter/DefaultDeserializer.java b/src/main/java/io/strimzi/kafka/bridge/converter/DefaultDeserializer.java index 3216bfc9f..04314f5bf 100644 --- a/src/main/java/io/strimzi/kafka/bridge/converter/DefaultDeserializer.java +++ b/src/main/java/io/strimzi/kafka/bridge/converter/DefaultDeserializer.java @@ -8,6 +8,7 @@ import org.apache.kafka.common.serialization.Deserializer; import 
java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.ObjectInputStream; import java.util.Map; @@ -26,7 +27,7 @@ public T deserialize(String topic, byte[] data) { try (ByteArrayInputStream b = new ByteArrayInputStream(data); ObjectInputStream o = new ObjectInputStream(b)) { return (T) o.readObject(); - } catch (Exception e) { + } catch (IOException | ClassNotFoundException e) { throw new SerializationException("Error when deserializing", e); } } diff --git a/src/main/java/io/strimzi/kafka/bridge/converter/DefaultSerializer.java b/src/main/java/io/strimzi/kafka/bridge/converter/DefaultSerializer.java index e6c62cc45..3e53809f7 100644 --- a/src/main/java/io/strimzi/kafka/bridge/converter/DefaultSerializer.java +++ b/src/main/java/io/strimzi/kafka/bridge/converter/DefaultSerializer.java @@ -10,6 +10,7 @@ import org.apache.kafka.common.serialization.Serializer; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.ObjectOutputStream; import java.util.Map; @@ -20,7 +21,7 @@ public void configure(Map configs, boolean isKey) { } - @SuppressFBWarnings("PZLA_PREFER_ZERO_LENGTH_ARRAYS") + @SuppressFBWarnings({"PZLA_PREFER_ZERO_LENGTH_ARRAYS"}) @Override public byte[] serialize(String topic, T data) { @@ -30,7 +31,7 @@ public byte[] serialize(String topic, T data) { try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b)) { o.writeObject(data); return b.toByteArray(); - } catch (Exception e) { + } catch (IOException e) { throw new SerializationException("Error when serializing", e); } } diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java index 515b7041a..4649bb3ef 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java @@ -6,20 +6,10 @@ package 
io.strimzi.kafka.bridge.http; import io.netty.handler.codec.http.HttpResponseStatus; -import io.opentracing.References; -import io.opentracing.Span; -import io.opentracing.SpanContext; -import io.opentracing.Tracer; -import io.opentracing.Tracer.SpanBuilder; -import io.opentracing.propagation.Format; -import io.opentracing.propagation.TextMap; -import io.opentracing.propagation.TextMapAdapter; -import io.opentracing.tag.Tags; -import io.opentracing.util.GlobalTracer; import io.strimzi.kafka.bridge.BridgeContentType; +import io.strimzi.kafka.bridge.ConsumerInstanceId; import io.strimzi.kafka.bridge.EmbeddedFormat; import io.strimzi.kafka.bridge.Endpoint; -import io.strimzi.kafka.bridge.ConsumerInstanceId; import io.strimzi.kafka.bridge.SinkBridgeEndpoint; import io.strimzi.kafka.bridge.SinkTopicSubscription; import io.strimzi.kafka.bridge.config.BridgeConfig; @@ -27,6 +17,9 @@ import io.strimzi.kafka.bridge.http.converter.HttpBinaryMessageConverter; import io.strimzi.kafka.bridge.http.converter.HttpJsonMessageConverter; import io.strimzi.kafka.bridge.http.model.HttpBridgeError; +import io.strimzi.kafka.bridge.tracing.SpanHandle; +import io.strimzi.kafka.bridge.tracing.TracingHandle; +import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.vertx.core.AsyncResult; import io.vertx.core.CompositeFuture; import io.vertx.core.Future; @@ -40,15 +33,13 @@ import io.vertx.ext.web.RoutingContext; import io.vertx.kafka.client.common.TopicPartition; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; +import io.vertx.kafka.client.consumer.KafkaConsumerRecords; import io.vertx.kafka.client.consumer.OffsetAndMetadata; -import io.vertx.kafka.client.producer.KafkaHeader; - import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Deserializer; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; @@ -250,6 +241,61 @@ private 
void doDeleteConsumer(RoutingContext routingContext) { HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), null, null); } + private Handler>> pollHandler(RoutingContext routingContext) { + return records -> { + if (records.succeeded()) { + + TracingHandle tracing = TracingUtil.getTracing(); + SpanHandle span = tracing.span(routingContext, HttpOpenApiOperations.POLL.toString()); + + for (int i = 0; i < records.result().size(); i++) { + KafkaConsumerRecord record = records.result().recordAt(i); + tracing.handleRecordSpan(span, record); + } + + span.inject(routingContext); + + HttpResponseStatus responseStatus = HttpResponseStatus.INTERNAL_SERVER_ERROR; + try { + Buffer buffer = messageConverter.toMessages(records.result()); + if (buffer.getBytes().length > this.maxBytes) { + responseStatus = HttpResponseStatus.UNPROCESSABLE_ENTITY; + HttpBridgeError error = new HttpBridgeError( + responseStatus.code(), + "Response exceeds the maximum number of bytes the consumer can receive" + ); + HttpUtils.sendResponse(routingContext, responseStatus.code(), + BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); + } else { + responseStatus = HttpResponseStatus.OK; + HttpUtils.sendResponse(routingContext, responseStatus.code(), + this.format == EmbeddedFormat.BINARY ? 
BridgeContentType.KAFKA_JSON_BINARY : BridgeContentType.KAFKA_JSON_JSON, + buffer); + } + } catch (DecodeException e) { + log.error("Error decoding records as JSON", e); + responseStatus = HttpResponseStatus.NOT_ACCEPTABLE; + HttpBridgeError error = new HttpBridgeError( + responseStatus.code(), + e.getMessage() + ); + HttpUtils.sendResponse(routingContext, responseStatus.code(), + BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); + } finally { + span.finish(responseStatus.code()); + } + + } else { + HttpBridgeError error = new HttpBridgeError( + HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), + records.cause().getMessage() + ); + HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), + BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); + } + }; + } + private void doPoll(RoutingContext routingContext) { if (topicSubscriptionsPattern == null && topicSubscriptions.isEmpty()) { HttpBridgeError error = new HttpBridgeError( @@ -274,79 +320,7 @@ private void doPoll(RoutingContext routingContext) { this.maxBytes = Long.parseLong(routingContext.request().getParam("max_bytes")); } - this.consume(records -> { - if (records.succeeded()) { - - Tracer tracer = GlobalTracer.get(); - - SpanBuilder spanBuilder = tracer.buildSpan(HttpOpenApiOperations.POLL.toString()); - for (int i = 0; i < records.result().size(); i++) { - KafkaConsumerRecord record = records.result().recordAt(i); - - Map headers = new HashMap<>(); - for (KafkaHeader header : record.headers()) { - headers.put(header.key(), header.value().toString()); - } - - SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(headers)); - if (parentSpan != null) { - spanBuilder.addReference(References.FOLLOWS_FROM, parentSpan); - } - } - Span span = spanBuilder.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER).start(); - HttpTracingUtils.setCommonTags(span, routingContext); - - tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new 
TextMap() { - @Override - public void put(String key, String value) { - routingContext.response().headers().add(key, value); - } - - @Override - public Iterator> iterator() { - throw new UnsupportedOperationException("TextMapInjectAdapter should only be used with Tracer.inject()"); - } - }); - - HttpResponseStatus responseStatus; - try { - Buffer buffer = messageConverter.toMessages(records.result()); - if (buffer.getBytes().length > this.maxBytes) { - responseStatus = HttpResponseStatus.UNPROCESSABLE_ENTITY; - HttpBridgeError error = new HttpBridgeError( - responseStatus.code(), - "Response exceeds the maximum number of bytes the consumer can receive" - ); - HttpUtils.sendResponse(routingContext, responseStatus.code(), - BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); - } else { - responseStatus = HttpResponseStatus.OK; - HttpUtils.sendResponse(routingContext, responseStatus.code(), - this.format == EmbeddedFormat.BINARY ? BridgeContentType.KAFKA_JSON_BINARY : BridgeContentType.KAFKA_JSON_JSON, - buffer); - } - } catch (DecodeException e) { - log.error("Error decoding records as JSON", e); - responseStatus = HttpResponseStatus.NOT_ACCEPTABLE; - HttpBridgeError error = new HttpBridgeError( - responseStatus.code(), - e.getMessage() - ); - HttpUtils.sendResponse(routingContext, responseStatus.code(), - BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); - } - Tags.HTTP_STATUS.set(span, responseStatus.code()); - span.finish(); - - } else { - HttpBridgeError error = new HttpBridgeError( - HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), - records.cause().getMessage() - ); - HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), - BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); - } - }); + this.consume(pollHandler(routingContext)); } else { HttpBridgeError error = new HttpBridgeError( HttpResponseStatus.NOT_ACCEPTABLE.code(), diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java 
b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java index 02928e308..cb889b27d 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java @@ -6,15 +6,6 @@ package io.strimzi.kafka.bridge.http; import io.netty.handler.codec.http.HttpResponseStatus; -import io.opentracing.Span; -import io.opentracing.SpanContext; -import io.opentracing.Tracer; -import io.opentracing.Tracer.SpanBuilder; -import io.opentracing.propagation.Format; -import io.opentracing.propagation.TextMap; -import io.opentracing.propagation.TextMapAdapter; -import io.opentracing.tag.Tags; -import io.opentracing.util.GlobalTracer; import io.strimzi.kafka.bridge.BridgeContentType; import io.strimzi.kafka.bridge.EmbeddedFormat; import io.strimzi.kafka.bridge.Endpoint; @@ -25,10 +16,12 @@ import io.strimzi.kafka.bridge.http.converter.HttpJsonMessageConverter; import io.strimzi.kafka.bridge.http.model.HttpBridgeError; import io.strimzi.kafka.bridge.http.model.HttpBridgeResult; +import io.strimzi.kafka.bridge.tracing.SpanHandle; +import io.strimzi.kafka.bridge.tracing.TracingHandle; +import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.vertx.core.CompositeFuture; import io.vertx.core.Future; import io.vertx.core.Handler; -import io.vertx.core.MultiMap; import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; @@ -41,12 +34,8 @@ import org.apache.kafka.common.serialization.Serializer; import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.UUID; -import java.util.Map.Entry; public class HttpSourceBridgeEndpoint extends SourceBridgeEndpoint { @@ -96,60 +85,34 @@ public void handle(Endpoint endpoint) { boolean isAsync = Boolean.parseBoolean(routingContext.queryParams().get("async")); - Tracer tracer = GlobalTracer.get(); - - MultiMap httpHeaders = 
routingContext.request().headers(); - Map headers = new HashMap<>(); - for (Entry header: httpHeaders.entries()) { - headers.put(header.getKey(), header.getValue()); - } - String operationName = partition == null ? HttpOpenApiOperations.SEND.toString() : HttpOpenApiOperations.SEND_TO_PARTITION.toString(); - SpanBuilder spanBuilder; - SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(headers)); - if (parentSpan == null) { - spanBuilder = tracer.buildSpan(operationName); - } else { - spanBuilder = tracer.buildSpan(operationName).asChildOf(parentSpan); - } - Span span = spanBuilder.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER).start(); - HttpTracingUtils.setCommonTags(span, routingContext); + + TracingHandle tracing = TracingUtil.getTracing(); + SpanHandle span = tracing.span(routingContext, operationName); try { if (messageConverter == null) { + span.finish(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()); HttpBridgeError error = new HttpBridgeError( HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), HttpResponseStatus.INTERNAL_SERVER_ERROR.reasonPhrase()); HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); - Tags.HTTP_STATUS.set(span, HttpResponseStatus.INTERNAL_SERVER_ERROR.code()); - span.finish(); return; } records = messageConverter.toKafkaRecords(topic, partition, routingContext.body().buffer()); for (KafkaProducerRecord record :records) { - tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMap() { - @Override - public void put(String key, String value) { - record.addHeader(key, value); - } - - @Override - public Iterator> iterator() { - throw new UnsupportedOperationException("TextMapInjectAdapter should only be used with Tracer.inject()"); - } - }); + span.inject(record); } } catch (Exception e) { + span.finish(HttpResponseStatus.UNPROCESSABLE_ENTITY.code()); HttpBridgeError error = new HttpBridgeError( 
HttpResponseStatus.UNPROCESSABLE_ENTITY.code(), e.getMessage()); HttpUtils.sendResponse(routingContext, HttpResponseStatus.UNPROCESSABLE_ENTITY.code(), BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); - Tags.HTTP_STATUS.set(span, HttpResponseStatus.UNPROCESSABLE_ENTITY.code()); - span.finish(); return; } List> results = new ArrayList<>(records.size()); @@ -157,9 +120,10 @@ public Iterator> iterator() { // start sending records asynchronously if (isAsync) { // if async is specified, return immediately once records are sent - this.sendAsyncRecords(records); - Tags.HTTP_STATUS.set(span, HttpResponseStatus.NO_CONTENT.code()); - span.finish(); + for (KafkaProducerRecord record : records) { + this.send(record, null); + } + span.finish(HttpResponseStatus.NO_CONTENT.code()); HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), BridgeContentType.KAFKA_JSON, null); this.maybeClose(); @@ -190,11 +154,10 @@ public Iterator> iterator() { results.add(new HttpBridgeResult<>(new HttpBridgeError(code, msg))); } } - - Tags.HTTP_STATUS.set(span, HttpResponseStatus.OK.code()); - span.finish(); + // always return OK, since failure cause is in the response, per message + span.finish(HttpResponseStatus.OK.code()); HttpUtils.sendResponse(routingContext, HttpResponseStatus.OK.code(), - BridgeContentType.KAFKA_JSON, buildOffsets(results).toBuffer()); + BridgeContentType.KAFKA_JSON, buildOffsets(results).toBuffer()); this.maybeClose(); }); } @@ -225,12 +188,6 @@ private JsonObject buildOffsets(List> results) { return jsonResponse; } - private void sendAsyncRecords(List> records) { - for (KafkaProducerRecord record : records) { - this.send(record, null); - } - } - private int handleError(Throwable ex) { if (ex instanceof TimeoutException && ex.getMessage() != null && ex.getMessage().contains("not present in metadata")) { diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpTracingUtils.java 
b/src/main/java/io/strimzi/kafka/bridge/http/HttpTracingUtils.java deleted file mode 100644 index 64d481178..000000000 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpTracingUtils.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ - -package io.strimzi.kafka.bridge.http; - -import io.opentracing.Span; -import io.opentracing.tag.Tags; -import io.vertx.ext.web.RoutingContext; - -public class HttpTracingUtils { - - public static final String COMPONENT = "strimzi-kafka-bridge"; - public static final String KAFKA_SERVICE = "kafka"; - - - public static void setCommonTags(Span span, RoutingContext routingContext) { - Tags.COMPONENT.set(span, HttpTracingUtils.COMPONENT); - Tags.PEER_SERVICE.set(span, HttpTracingUtils.KAFKA_SERVICE); - Tags.HTTP_METHOD.set(span, routingContext.request().method().name()); - Tags.HTTP_URL.set(span, routingContext.request().uri()); - } -} \ No newline at end of file diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/NoopTracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/NoopTracingHandle.java new file mode 100644 index 000000000..6f0664235 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/NoopTracingHandle.java @@ -0,0 +1,56 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ + +package io.strimzi.kafka.bridge.tracing; + +import io.strimzi.kafka.bridge.config.BridgeConfig; +import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; +import io.vertx.kafka.client.producer.KafkaProducerRecord; + +import java.util.Properties; + +final class NoopTracingHandle implements TracingHandle { + @Override + public String envServiceName() { + return null; + } + + @Override + public String serviceName(BridgeConfig config) { + return null; + } + + @Override + public void initialize() { + } + + @Override + public SpanHandle span(RoutingContext routingContext, String operationName) { + return new NoopSpanHandle<>(); + } + + @Override + public void handleRecordSpan(SpanHandle parentSpanHandle, KafkaConsumerRecord record) { + } + + @Override + public void addTracingPropsToProducerConfig(Properties props) { + } + + private static final class NoopSpanHandle implements SpanHandle { + @Override + public void inject(KafkaProducerRecord record) { + } + + @Override + public void inject(RoutingContext routingContext) { + } + + @Override + public void finish(int code) { + } + } +} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java new file mode 100644 index 000000000..30ecf04a0 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java @@ -0,0 +1,186 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ + +package io.strimzi.kafka.bridge.tracing; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; +import io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import io.strimzi.kafka.bridge.config.BridgeConfig; +import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; + +import java.util.Map; +import java.util.Properties; + +import static io.strimzi.kafka.bridge.tracing.TracingConstants.COMPONENT; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.KAFKA_SERVICE; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY_SERVICE_NAME_ENV_KEY; + +/** + * OpenTelemetry implementation of Tracing. + * + * Note: we use Vert.x OpenTelemetry extension to setup custom ContextStorageProvider: + * @see io.vertx.tracing.opentelemetry.VertxContextStorageProvider + * @see io.opentelemetry.context.LazyStorage looks up all ContextStorageProviders via service-loader pattern, + * since Vert.x OpenTelemetry support is on the classpath with VertxContextStorageProvider, that one is used. 
+ */ +class OpenTelemetryHandle implements TracingHandle { + + private Tracer tracer; + + static void setCommonAttributes(SpanBuilder builder, RoutingContext routingContext) { + builder.setAttribute(SemanticAttributes.PEER_SERVICE, KAFKA_SERVICE); + builder.setAttribute(SemanticAttributes.HTTP_METHOD, routingContext.request().method().name()); + builder.setAttribute(SemanticAttributes.HTTP_URL, routingContext.request().uri()); + } + + @Override + public String envServiceName() { + return OPENTELEMETRY_SERVICE_NAME_ENV_KEY; + } + + @Override + public String serviceName(BridgeConfig config) { + return System.getenv(envServiceName()); + } + + @Override + public void initialize() { + System.setProperty("otel.metrics.exporter", "none"); // disable metrics + AutoConfiguredOpenTelemetrySdk.initialize(); + } + + private Tracer get() { + if (tracer == null) { + tracer = GlobalOpenTelemetry.getTracer(COMPONENT); + } + return tracer; + } + + private SpanBuilder getSpanBuilder(RoutingContext routingContext, String operationName) { + Tracer tracer = get(); + SpanBuilder spanBuilder; + Context parentContext = propagator().extract(Context.current(), routingContext, ROUTING_CONTEXT_GETTER); + if (parentContext == null) { + spanBuilder = tracer.spanBuilder(operationName); + } else { + spanBuilder = tracer.spanBuilder(operationName).setParent(parentContext); + } + return spanBuilder; + } + + @Override + public void handleRecordSpan(SpanHandle parentSpanHandle, KafkaConsumerRecord record) { + String operationName = record.topic() + " " + MessageOperation.RECEIVE; + SpanBuilder spanBuilder = get().spanBuilder(operationName); + Context parentContext = propagator().extract(Context.current(), TracingUtil.toHeaders(record), MG); + if (parentContext != null) { + Span parentSpan = Span.fromContext(parentContext); + SpanContext psc = parentSpan != null ? 
parentSpan.getSpanContext() : null; + if (psc != null) { + spanBuilder.addLink(psc); + } + } + spanBuilder + .setSpanKind(SpanKind.CONSUMER) + .setParent(Context.current()) + .startSpan() + .end(); + } + + private static TextMapPropagator propagator() { + return GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); + } + + private static final TextMapGetter ROUTING_CONTEXT_GETTER = new TextMapGetter() { + @Override + public Iterable keys(RoutingContext rc) { + return rc.request().headers().names(); + } + + @Override + public String get(RoutingContext rc, String key) { + if (rc == null) { + return null; + } + return rc.request().headers().get(key); + } + }; + + private static final TextMapGetter> MG = new TextMapGetter>() { + @Override + public Iterable keys(Map map) { + return map.keySet(); + } + + @Override + public String get(Map map, String key) { + return map != null ? map.get(key) : null; + } + }; + + @Override + public SpanHandle span(RoutingContext routingContext, String operationName) { + return buildSpan(getSpanBuilder(routingContext, operationName), routingContext); + } + + private static SpanHandle buildSpan(SpanBuilder spanBuilder, RoutingContext routingContext) { + spanBuilder.setSpanKind(SpanKind.SERVER); + setCommonAttributes(spanBuilder, routingContext); + return new OTelSpanHandle<>(spanBuilder.startSpan()); + } + + @Override + public void addTracingPropsToProducerConfig(Properties props) { + TracingUtil.addProperty(props, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); + } + + private static final class OTelSpanHandle implements SpanHandle { + private final Span span; + private final Scope scope; + + public OTelSpanHandle(Span span) { + this.span = span; + this.scope = span.makeCurrent(); + } + + @Override + public void inject(KafkaProducerRecord record) { + propagator().inject(Context.current(), record, KafkaProducerRecord::addHeader); + } + + @Override + public void inject(RoutingContext 
routingContext) { + propagator().inject(Context.current(), routingContext, (rc, key, value) -> rc.response().headers().add(key, value)); + } + + @Override + public void finish(int code) { + try { + span.setAttribute(SemanticAttributes.HTTP_STATUS_CODE, code); + span.setStatus(code == HttpResponseStatus.OK.code() ? StatusCode.OK : StatusCode.ERROR); + scope.close(); + } finally { + span.end(); + } + } + } +} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java new file mode 100644 index 000000000..4a32c9341 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java @@ -0,0 +1,161 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.jaegertracing.Configuration; +import io.opentracing.References; +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.opentracing.Tracer; +import io.opentracing.contrib.kafka.TracingKafkaUtils; +import io.opentracing.contrib.kafka.TracingProducerInterceptor; +import io.opentracing.propagation.Format; +import io.opentracing.propagation.TextMap; +import io.opentracing.propagation.TextMapAdapter; +import io.opentracing.tag.Tags; +import io.opentracing.util.GlobalTracer; +import io.strimzi.kafka.bridge.config.BridgeConfig; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; + +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; + +import static io.strimzi.kafka.bridge.tracing.TracingConstants.COMPONENT; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.KAFKA_SERVICE; + +/** + * OpenTracing 
implementation of TracingHandle.
 */
class OpenTracingHandle implements TracingHandle {

    /**
     * Set the tags shared by every HTTP server span.
     *
     * @param span span to tag
     * @param routingContext current Vert.x routing context
     */
    static void setCommonTags(Span span, RoutingContext routingContext) {
        Tags.COMPONENT.set(span, COMPONENT);
        Tags.PEER_SERVICE.set(span, KAFKA_SERVICE);
        Tags.HTTP_METHOD.set(span, routingContext.request().method().name());
        Tags.HTTP_URL.set(span, routingContext.request().uri());
    }

    @Override
    public String envServiceName() {
        return Configuration.JAEGER_SERVICE_NAME;
    }

    @Override
    public String serviceName(BridgeConfig config) {
        // the Jaeger tracer is configured purely from JAEGER_* env vars
        return System.getenv(envServiceName());
    }

    @Override
    public void initialize() {
        // build a tracer from the JAEGER_* env vars and register it process-wide
        Tracer tracer = Configuration.fromEnv().getTracer();
        GlobalTracer.registerIfAbsent(tracer);
    }

    @Override
    public <K, V> SpanHandle<K, V> span(RoutingContext routingContext, String operationName) {
        Tracer.SpanBuilder spanBuilder = getSpanBuilder(routingContext, operationName);
        return buildSpan(spanBuilder, routingContext);
    }

    @Override
    public <K, V> void handleRecordSpan(SpanHandle<K, V> parentSpanHandle, KafkaConsumerRecord<K, V> record) {
        Tracer tracer = GlobalTracer.get();
        Span span = ((OTSpanHandle<K, V>) parentSpanHandle).span;
        Tracer.SpanBuilder spanBuilder = tracer.buildSpan(TracingKafkaUtils.FROM_PREFIX + record.topic())
            .asChildOf(span).withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER);
        // reference the producer span carried in the record headers, if any
        SpanContext parentSpan = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(TracingUtil.toHeaders(record)));
        if (parentSpan != null) {
            spanBuilder.addReference(References.FOLLOWS_FROM, parentSpan);
        }
        spanBuilder.start().finish();
    }

    // build a span for the incoming HTTP request, as a child of any propagated remote context
    private Tracer.SpanBuilder getSpanBuilder(RoutingContext rc, String operationName) {
        Tracer tracer = GlobalTracer.get();
        SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new RequestTextMap(rc.request()));
        return tracer.buildSpan(operationName).asChildOf(parentSpan);
    }

    /** TextMap view over the HTTP request headers, used for context extraction. */
    private static class RequestTextMap implements TextMap {
        private final HttpServerRequest request;

        public RequestTextMap(HttpServerRequest request) {
            this.request = request;
        }

        @Override
        public Iterator<Map.Entry<String, String>> iterator() {
            return request.headers().iterator();
        }

        @Override
        public void put(String key, String value) {
            request.headers().add(key, value);
        }
    }

    private static <K, V> SpanHandle<K, V> buildSpan(Tracer.SpanBuilder spanBuilder, RoutingContext routingContext) {
        Span span = spanBuilder.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER).start();
        setCommonTags(span, routingContext);
        return new OTSpanHandle<>(span);
    }

    @Override
    public void addTracingPropsToProducerConfig(Properties props) {
        TracingUtil.addProperty(props, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
    }

    /** OpenTracing-backed span handle wrapping a started Span. */
    private static final class OTSpanHandle<K, V> implements SpanHandle<K, V> {
        private final Span span;

        public OTSpanHandle(Span span) {
            this.span = span;
        }

        @Override
        public void inject(KafkaProducerRecord<K, V> record) {
            Tracer tracer = GlobalTracer.get();
            tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMap() {
                @Override
                public void put(String key, String value) {
                    record.addHeader(key, value);
                }

                @Override
                public Iterator<Map.Entry<String, String>> iterator() {
                    throw new UnsupportedOperationException("TextMapInjectAdapter should only be used with Tracer.inject()");
                }
            });
        }

        @Override
        public void inject(RoutingContext routingContext) {
            Tracer tracer = GlobalTracer.get();
            tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new TextMap() {
                @Override
                public void put(String key, String value) {
                    routingContext.response().headers().add(key, value);
                }

                @Override
                public Iterator<Map.Entry<String, String>> iterator() {
                    throw new UnsupportedOperationException("TextMapInjectAdapter should only be used with Tracer.inject()");
                }
            });
        }

        @Override
        public void finish(int code) {
            Tags.HTTP_STATUS.set(span, code);
            span.finish();
        }
    }
}
diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java
b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java new file mode 100644 index 000000000..c0cfdf092 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java @@ -0,0 +1,35 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.producer.KafkaProducerRecord; + +/** + * Span handle, an abstraction over actual span implementation. + */ +public interface SpanHandle { + /** + * Inject tracing info into underlying span from Kafka producer record. + * + * @param record Kafka producer record to extract tracing info + */ + void inject(KafkaProducerRecord record); + + /** + * Inject tracing info into underlying span from Vert.x routing context. + * + * @param routingContext Vert.x routing context to extract tracing info + */ + void inject(RoutingContext routingContext); + + /** + * Finish underlying span. + * + * @param code response code + */ + void finish(int code); +} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingConstants.java b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingConstants.java new file mode 100644 index 000000000..4e452c851 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingConstants.java @@ -0,0 +1,22 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +/** + * Tracing constants. 
+ */ +public final class TracingConstants { + public static final String COMPONENT = "strimzi-kafka-bridge"; + public static final String KAFKA_SERVICE = "kafka"; + + public static final String JAEGER = "jaeger"; + public static final String OPENTELEMETRY = "opentelemetry"; + + public static final String OPENTELEMETRY_SERVICE_NAME_ENV_KEY = "OTEL_SERVICE_NAME"; + public static final String OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY = "otel.service.name"; + public static final String OPENTELEMETRY_TRACES_EXPORTER_ENV_KEY = "OTEL_TRACES_EXPORTER"; + public static final String OPENTELEMETRY_TRACES_EXPORTER_PROPERTY_KEY = "otel.traces.exporter"; +} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java new file mode 100644 index 000000000..8234301b4 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java @@ -0,0 +1,65 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.strimzi.kafka.bridge.config.BridgeConfig; +import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; + +import java.util.Properties; + +/** + * Simple interface to abstract tracing between legacy OpenTracing and new OpenTelemetry. + */ +public interface TracingHandle { + /** + * Tracing env var service name. + * + * @return tracing env var service name + */ + String envServiceName(); + + /** + * Extract service name from bridge confing. + * + * @param config the bridge config + * @return bridge's service name + */ + String serviceName(BridgeConfig config); + + /** + * Initialize tracing. + */ + void initialize(); + + /** + * Build span handle. 
+ * + * @param key type + * @param value type + * @param routingContext Vert.x rounting context + * @param operationName current operation name + * @return span handle + */ + SpanHandle span(RoutingContext routingContext, String operationName); + + /** + * Extract span info from Kafka consumer record. + * + * @param key type + * @param value type + * @param parentSpanHandle parent span handle + * @param record Kafka consumer record + */ + void handleRecordSpan(SpanHandle parentSpanHandle, KafkaConsumerRecord record); + + /** + * Add producer properties, if any. + * + * @param props the properties + */ + void addTracingPropsToProducerConfig(Properties props); +} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java new file mode 100644 index 000000000..8e7746c4d --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java @@ -0,0 +1,78 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.strimzi.kafka.bridge.config.BridgeConfig; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; +import io.vertx.kafka.client.producer.KafkaHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static io.strimzi.kafka.bridge.tracing.TracingConstants.JAEGER; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY; + +/** + * Tracing util to hold app's Tracing instance. 
+ */ +public class TracingUtil { + private static final Logger log = LoggerFactory.getLogger(TracingUtil.class); + private static TracingHandle tracing = new NoopTracingHandle(); + + public static TracingHandle getTracing() { + return tracing; + } + + public static void initialize(BridgeConfig config) { + String tracingConfig = config.getTracing(); + if (tracingConfig != null && (tracingConfig.equals(JAEGER) || tracingConfig.equals(OPENTELEMETRY))) { + boolean isOpenTelemetry = OPENTELEMETRY.equals(tracingConfig); + TracingHandle instance = isOpenTelemetry ? new OpenTelemetryHandle() : new OpenTracingHandle(); + + String serviceName = instance.serviceName(config); + if (serviceName != null) { + log.info( + "Initializing {} tracing config with service name {}", + isOpenTelemetry ? "OpenTelemetry" : "OpenTracing", + serviceName + ); + instance.initialize(); + tracing = instance; + } else { + log.error("Tracing config cannot be initialized because {} environment variable is not defined", instance.envServiceName()); + } + } + } + + /** + * We are interested in tracing headers here, + * which are unique - single value per key. 
+ * + * @param record Kafka consumer record + * @param key type + * @param value type + * @return map of headers + */ + public static Map toHeaders(KafkaConsumerRecord record) { + Map headers = new HashMap<>(); + for (KafkaHeader header : record.headers()) { + headers.put(header.key(), header.value().toString()); + } + return headers; + } + + static void addProperty(Properties props, String key, String value) { + String previous = props.getProperty(key); + if (previous != null) { + props.setProperty(key, previous + "," + value); + } else { + props.setProperty(key, value); + } + } +} diff --git a/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryTest.java b/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryTest.java new file mode 100644 index 000000000..9956288a6 --- /dev/null +++ b/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryTest.java @@ -0,0 +1,26 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ + +package io.strimzi.kafka.bridge.tracing; + +import io.vertx.core.tracing.TracingOptions; +import io.vertx.tracing.opentelemetry.OpenTelemetryOptions; + +import static io.strimzi.kafka.bridge.tracing.TracingConstants.JAEGER; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY_TRACES_EXPORTER_PROPERTY_KEY; + +/** + * OpenTelemetry tests + */ +public class OpenTelemetryTest extends TracingTestBase { + @Override + protected TracingOptions tracingOptions() { + System.setProperty(OPENTELEMETRY_TRACES_EXPORTER_PROPERTY_KEY, JAEGER); + System.setProperty(OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY, "strimzi-kafka-bridge-test"); + System.setProperty("otel.metrics.exporter", "none"); // disable metrics + return new OpenTelemetryOptions(); + } +} diff --git a/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTracingTest.java b/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTracingTest.java new file mode 100644 index 000000000..584fa6364 --- /dev/null +++ b/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTracingTest.java @@ -0,0 +1,25 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ + +package io.strimzi.kafka.bridge.tracing; + +import io.vertx.core.tracing.TracingOptions; +import io.vertx.tracing.opentracing.OpenTracingOptions; + +/** + * OpenTracing tests + * + * These env vars need to be set: + * * JAEGER_SERVICE_NAME=ot_kafka_bridge_test + * * JAEGER_SAMPLER_TYPE=const + * * JAEGER_SAMPLER_PARAM=1 + */ +public class OpenTracingTest extends TracingTestBase { + @Override + protected TracingOptions tracingOptions() { + System.setProperty("otel.metrics.exporter", "none"); // disable OTel metrics -- they slip in via ServiceLoader + return new OpenTracingOptions(); + } +} diff --git a/src/test/java/io/strimzi/kafka/bridge/tracing/TracingTestBase.java b/src/test/java/io/strimzi/kafka/bridge/tracing/TracingTestBase.java new file mode 100644 index 000000000..179344ef1 --- /dev/null +++ b/src/test/java/io/strimzi/kafka/bridge/tracing/TracingTestBase.java @@ -0,0 +1,141 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ + +package io.strimzi.kafka.bridge.tracing; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.strimzi.kafka.bridge.BridgeContentType; +import io.strimzi.kafka.bridge.http.services.ConsumerService; +import io.strimzi.kafka.bridge.http.services.ProducerService; +import io.strimzi.kafka.bridge.utils.Urls; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.core.tracing.TracingOptions; +import io.vertx.core.tracing.TracingPolicy; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; +import io.vertx.ext.web.codec.BodyCodec; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.hamcrest.CoreMatchers; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +/** + * Base for OpenTracing and OpenTelemetry (manual) tests. + *

+ * Test will only run if the bridge AND tracing server are up-n-running. + */ +@ExtendWith(VertxExtension.class) +public abstract class TracingTestBase { + Logger log = LoggerFactory.getLogger(getClass()); + + private void assumeServer(String url) { + try { + new URL(url).openConnection().getInputStream(); + } catch (Exception e) { + log.info("Cannot connect to server", e); + Assumptions.assumeTrue(false, "Server is not running: " + url); + } + } + + Handler>> verifyOK(VertxTestContext context) { + return ar -> { + context.verify(() -> { + assertThat(ar.succeeded(), is(true)); + HttpResponse response = ar.result(); + assertThat(response.statusCode(), is(HttpResponseStatus.OK.code())); + }); + context.completeNow(); + }; + } + + @BeforeEach + public void setUp() { + assumeServer(String.format("http://%s:%s", Urls.BRIDGE_HOST, Urls.BRIDGE_PORT)); // bridge + assumeServer("http://localhost:16686"); // jaeger + } + + protected abstract TracingOptions tracingOptions(); + + @Test + public void testSmoke(VertxTestContext context) throws Exception { + Vertx vertx = Vertx.vertx(new VertxOptions().setTracingOptions(tracingOptions())); + + WebClient client = WebClient.create(vertx, new WebClientOptions() + .setDefaultHost(Urls.BRIDGE_HOST) + .setDefaultPort(Urls.BRIDGE_PORT) + .setTracingPolicy(TracingPolicy.ALWAYS) + ); + + String value = "message-value"; + + JsonArray records = new JsonArray(); + JsonObject json = new JsonObject(); + json.put("value", value); + records.add(json); + + JsonObject root = new JsonObject(); + root.put("records", records); + + String topicName = "mytopic"; + + ProducerService.getInstance(client) + .sendRecordsRequest(topicName, root, BridgeContentType.KAFKA_JSON_JSON) + .sendJsonObject(root, verifyOK(context)); + + ConsumerService consumerService = ConsumerService.getInstance(client); + + // create consumer + // subscribe to a topic + + String consumerName = "my-consumer"; + String groupId = UUID.randomUUID().toString(); + + JsonObject 
consumerJson = new JsonObject() + .put("name", consumerName) + .put("format", "json"); + + consumerService + .createConsumer(context, groupId, consumerJson) + .subscribeConsumer(context, groupId, consumerName, topicName); + + CompletableFuture consume = new CompletableFuture<>(); + // consume records + consumerService + .consumeRecordsRequest(groupId, consumerName, BridgeContentType.KAFKA_JSON_JSON) + .as(BodyCodec.jsonArray()) + .send(ar -> { + context.verify(() -> { + assertThat(ar.succeeded(), CoreMatchers.is(true)); + HttpResponse response = ar.result(); + assertThat(response.statusCode(), CoreMatchers.is(HttpResponseStatus.OK.code())); + }); + consume.complete(true); + }); + + consume.get(60, TimeUnit.SECONDS); + + // consumer deletion + consumerService.deleteConsumer(context, groupId, consumerName); + } +}