From f271e8976bc6340e7617f59aa6a65ddfd90b9135 Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Wed, 25 Aug 2021 16:35:04 +0200 Subject: [PATCH 01/14] Add OpenTelemetry support. Signed-off-by: Ales Justin --- bin/kafka_bridge_run.sh | 5 + config/application.properties | 6 +- pom.xml | 61 +++++ .../io/strimzi/kafka/bridge/Application.java | 46 ++-- .../strimzi/kafka/bridge/MetricsReporter.java | 16 +- .../kafka/bridge/SinkBridgeEndpoint.java | 13 +- .../kafka/bridge/SourceBridgeEndpoint.java | 9 +- .../kafka/bridge/config/BridgeConfig.java | 9 + .../bridge/converter/DefaultDeserializer.java | 3 +- .../bridge/converter/DefaultSerializer.java | 5 +- .../bridge/http/HttpSinkBridgeEndpoint.java | 46 +--- .../bridge/http/HttpSourceBridgeEndpoint.java | 52 +---- .../kafka/bridge/http/HttpTracingUtils.java | 24 -- .../bridge/tracing/OpenTelemetryHandle.java | 214 ++++++++++++++++++ .../bridge/tracing/OpenTracingHandle.java | 190 ++++++++++++++++ .../bridge/tracing/SpanBuilderHandle.java | 18 ++ .../kafka/bridge/tracing/SpanHandle.java | 18 ++ .../bridge/tracing/TracingConstants.java | 24 ++ .../kafka/bridge/tracing/TracingHandle.java | 26 +++ .../kafka/bridge/tracing/TracingUtil.java | 110 +++++++++ .../bridge/tracing/OpenTelemetryTest.java | 25 ++ .../kafka/bridge/tracing/TracingTestBase.java | 98 ++++++++ 22 files changed, 871 insertions(+), 147 deletions(-) delete mode 100644 src/main/java/io/strimzi/kafka/bridge/http/HttpTracingUtils.java create mode 100644 src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java create mode 100644 src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java create mode 100644 src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java create mode 100644 src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java create mode 100644 src/main/java/io/strimzi/kafka/bridge/tracing/TracingConstants.java create mode 100644 src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java create mode 100644 src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java create mode 100644 src/test/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryTest.java create mode 100644 src/test/java/io/strimzi/kafka/bridge/tracing/TracingTestBase.java diff --git a/bin/kafka_bridge_run.sh b/bin/kafka_bridge_run.sh index 3b57cd720..e010ac016 100755 --- a/bin/kafka_bridge_run.sh +++ b/bin/kafka_bridge_run.sh @@ -13,4 +13,9 @@ fi # Make sure that we use /dev/urandom JAVA_OPTS="${JAVA_OPTS} -Dvertx.cacheDirBase=/tmp/vertx-cache -Djava.security.egd=file:/dev/./urandom" +# enabling OpenTelemetry with Jaeger by default +if [ -n "$OTEL_SERVICE_NAME" ] && [ -z "$OTEL_TRACES_EXPORTER" ]; then + export OTEL_TRACES_EXPORTER="jaeger" +fi + exec java $JAVA_OPTS $KAFKA_BRIDGE_LOG4J_OPTS -classpath "${MYPATH}/../libs/*" io.strimzi.kafka.bridge.Application "$@" \ No newline at end of file diff --git a/config/application.properties b/config/application.properties index 9bfed2bbe..a5b4c9e1a 100644 --- a/config/application.properties +++ b/config/application.properties @@ -1,7 +1,11 @@ #Bridge related settings bridge.id=my-bridge -# uncomment the following line to enable Jaeger tracing, check the documentation how to configure the tracer +# uncomment one the following lines (bridge.tracing) to enable Jaeger tracing, check the documentation how to configure the tracer +# OpenTracing support #bridge.tracing=jaeger +# OpenTelemetry support +#bridge.tracing=opentelemetry +#bridge.tracing.service-name=strimzi-kafka-bridge #Apache Kafka common 
kafka.bootstrap.servers=localhost:9092 diff --git a/pom.xml b/pom.xml index 7480449b4..c54add31e 100644 --- a/pom.xml +++ b/pom.xml @@ -110,6 +110,9 @@ 1.8.1 0.33.0 0.1.15 + 1.9.0-alpha + 1.9.0 + 1.44.0 1.3.9 0.12.0 0.7.0 @@ -267,6 +270,52 @@ opentracing-kafka-client ${opentracing-kafka-client.version} + + io.opentelemetry + opentelemetry-sdk-extension-autoconfigure + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-api + ${opentelemetry-stable.version} + + + io.opentelemetry + opentelemetry-context + ${opentelemetry-stable.version} + + + io.opentelemetry + opentelemetry-semconv + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-sdk-trace + ${opentelemetry-stable.version} + + + io.opentelemetry.instrumentation + opentelemetry-kafka-clients-2.6 + ${opentelemetry.version} + + + io.opentelemetry.instrumentation + opentelemetry-kafka-clients-common + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-exporter-jaeger + ${opentelemetry-stable.version} + + + + io.grpc + grpc-netty-shaded + ${grpc.version} + io.micrometer micrometer-core @@ -349,6 +398,12 @@ ${vertx.version} test + + io.vertx + vertx-opentelemetry + ${vertx.version} + test + org.mockito mockito-core @@ -498,6 +553,12 @@ org.apache.logging.log4j:log4j-slf4j-impl io.jaegertracing:jaeger-client org.yaml:snakeyaml + org.apache.tomcat.embed:tomcat-embed-core + + io.opentelemetry:opentelemetry-sdk-trace + io.opentelemetry.instrumentation:opentelemetry-kafka-clients-common + io.opentelemetry:opentelemetry-exporter-jaeger + io.grpc:grpc-netty-shaded diff --git a/src/main/java/io/strimzi/kafka/bridge/Application.java b/src/main/java/io/strimzi/kafka/bridge/Application.java index 9f3934e0e..9acae9edc 100644 --- a/src/main/java/io/strimzi/kafka/bridge/Application.java +++ b/src/main/java/io/strimzi/kafka/bridge/Application.java @@ -5,13 +5,11 @@ package io.strimzi.kafka.bridge; -import io.jaegertracing.Configuration; import io.micrometer.core.instrument.MeterRegistry; -import io.opentracing.Tracer; -import io.opentracing.util.GlobalTracer; import io.strimzi.kafka.bridge.amqp.AmqpBridge; import io.strimzi.kafka.bridge.config.BridgeConfig; import io.strimzi.kafka.bridge.http.HttpBridge; +import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.vertx.config.ConfigRetriever; import io.vertx.config.ConfigRetrieverOptions; import io.vertx.config.ConfigStoreOptions; @@ -30,6 +28,7 @@ import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,7 +66,7 @@ public static void main(String[] args) { try { VertxOptions vertxOptions = new VertxOptions(); JmxCollectorRegistry jmxCollectorRegistry = null; - if (Boolean.valueOf(System.getenv(KAFKA_BRIDGE_METRICS_ENABLED))) { + if (Boolean.parseBoolean(System.getenv(KAFKA_BRIDGE_METRICS_ENABLED))) { log.info("Metrics enabled and exposed on the /metrics endpoint"); // setup Micrometer metrics options vertxOptions.setMetricsOptions(metricsOptions()); @@ -82,17 +81,17 @@ public static void main(String[] args) { CommandLine commandLine = new DefaultParser().parse(generateOptions(), args); ConfigStoreOptions fileStore = new ConfigStoreOptions() - .setType("file") - .setFormat("properties") - .setConfig(new JsonObject().put("path", absoluteFilePath(commandLine.getOptionValue("config-file"))).put("raw-data", true)); + .setType("file") + .setFormat("properties") + .setConfig(new 
JsonObject().put("path", absoluteFilePath(commandLine.getOptionValue("config-file"))).put("raw-data", true)); ConfigStoreOptions envStore = new ConfigStoreOptions() - .setType("env") - .setConfig(new JsonObject().put("raw-data", true)); + .setType("env") + .setConfig(new JsonObject().put("raw-data", true)); ConfigRetrieverOptions options = new ConfigRetrieverOptions() - .addStore(fileStore) - .addStore(envStore); + .addStore(fileStore) + .addStore(envStore); ConfigRetriever retriever = ConfigRetriever.create(vertx, options); retriever.getConfig(ar -> { @@ -126,23 +125,16 @@ public static void main(String[] args) { } } + // register Jaeger tracer - if set, etc + TracingUtil.initialize(bridgeConfig); + // when HTTP protocol is enabled, it handles healthy/ready/metrics endpoints as well, // so no need for a standalone embedded HTTP server if (!bridgeConfig.getHttpConfig().isEnabled()) { EmbeddedHttpServer embeddedHttpServer = - new EmbeddedHttpServer(vertx, healthChecker, metricsReporter, embeddedHttpServerPort); + new EmbeddedHttpServer(vertx, healthChecker, metricsReporter, embeddedHttpServerPort); embeddedHttpServer.start(); } - - // register OpenTracing Jaeger tracer - if ("jaeger".equals(bridgeConfig.getTracing())) { - if (config.get(Configuration.JAEGER_SERVICE_NAME) != null) { - Tracer tracer = Configuration.fromEnv().getTracer(); - GlobalTracer.registerIfAbsent(tracer); - } else { - log.error("Jaeger tracing cannot be initialized because {} environment variable is not defined", Configuration.JAEGER_SERVICE_NAME); - } - } } }); } else { @@ -150,19 +142,19 @@ public static void main(String[] args) { System.exit(1); } }); - } catch (Exception ex) { - log.error("Error starting the bridge", ex); + } catch (RuntimeException | MalformedObjectNameException | IOException | ParseException e) { + log.error("Error starting the bridge", e); System.exit(1); } } /** * Set up the Vert.x metrics options - * + * * @return instance of the MicrometerMetricsOptions on Vert.x */ private static MicrometerMetricsOptions metricsOptions() { - Set set = new HashSet(); + Set set = new HashSet<>(); set.add(MetricsDomain.NAMED_POOLS.name()); set.add(MetricsDomain.VERTICLES.name()); return new MicrometerMetricsOptions() @@ -218,7 +210,7 @@ private static Future deployHttpBridge(Vertx vertx, BridgeConfig bri if (bridgeConfig.getHttpConfig().isEnabled()) { HttpBridge httpBridge = new HttpBridge(bridgeConfig, metricsReporter); - + vertx.deployVerticle(httpBridge, done -> { if (done.succeeded()) { log.info("HTTP verticle instance deployed [{}]", done.result()); diff --git a/src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java b/src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java index 4e8e33bad..ab8e2f315 100644 --- a/src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java +++ b/src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java @@ -28,13 +28,15 @@ public MetricsReporter(JmxCollectorRegistry jmxCollectorRegistry, MeterRegistry this.jmxCollectorRegistry = jmxCollectorRegistry; this.meterRegistry = meterRegistry; if (this.meterRegistry instanceof PrometheusMeterRegistry) { - this.meterRegistry.config().namingConvention(new PrometheusNamingConvention() { - @Override - public String name(String name, Meter.Type type, String baseUnit) { - String metricName = name.startsWith("vertx.") ? 
name.replace("vertx.", "strimzi.bridge.") : name; - return super.name(metricName, type, baseUnit); - } - }); + this.meterRegistry.config().namingConvention(new MetricsNamingConvention()); + } + } + + private static class MetricsNamingConvention extends PrometheusNamingConvention { + @Override + public String name(String name, Meter.Type type, String baseUnit) { + String metricName = name.startsWith("vertx.") ? name.replace("vertx.", "strimzi.bridge.") : name; + return super.name(metricName, type, baseUnit); } } diff --git a/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java index 481ed2d95..4dae9892f 100644 --- a/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java @@ -5,9 +5,10 @@ package io.strimzi.kafka.bridge; -import io.opentracing.contrib.kafka.TracingConsumerInterceptor; import io.strimzi.kafka.bridge.config.BridgeConfig; import io.strimzi.kafka.bridge.config.KafkaConfig; +import io.strimzi.kafka.bridge.tracing.TracingHandle; +import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.strimzi.kafka.bridge.tracker.OffsetTracker; import io.vertx.core.AsyncResult; import io.vertx.core.CompositeFuture; @@ -167,9 +168,9 @@ protected void initConsumer(boolean shouldAttachBatchHandler, Properties config) props.putAll(kafkaConfig.getConfig()); props.putAll(kafkaConfig.getConsumerConfig().getConfig()); props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId); - if (this.bridgeConfig.getTracing() != null) { - props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); - } + + TracingHandle tracing = TracingUtil.getTracing(); + tracing.kafkaConsumerConfig(props); if (config != null) props.putAll(config); @@ -200,7 +201,7 @@ protected void subscribe(boolean shouldAttachHandler) { this.subscribed = true; this.setPartitionsAssignmentHandlers(); - Set topics = this.topicSubscriptions.stream().map(ts -> ts.getTopic()).collect(Collectors.toSet()); + Set topics = this.topicSubscriptions.stream().map(SinkTopicSubscription::getTopic).collect(Collectors.toSet()); this.consumer.subscribe(topics, this::subscribeHandler); } @@ -710,7 +711,7 @@ protected void consume(Handler>> consumeH this.consumer.poll(Duration.ofMillis(this.pollTimeOut), consumeHandler); } - protected void commit(Map offsetsData, + protected void commit(Map offsetsData, Handler>> commitOffsetsHandler) { this.consumer.commit(offsetsData, commitOffsetsHandler); } diff --git a/src/main/java/io/strimzi/kafka/bridge/SourceBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/SourceBridgeEndpoint.java index 3d8595605..00fc231cc 100644 --- a/src/main/java/io/strimzi/kafka/bridge/SourceBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/SourceBridgeEndpoint.java @@ -5,9 +5,10 @@ package io.strimzi.kafka.bridge; -import io.opentracing.contrib.kafka.TracingProducerInterceptor; import io.strimzi.kafka.bridge.config.BridgeConfig; import io.strimzi.kafka.bridge.config.KafkaConfig; +import io.strimzi.kafka.bridge.tracing.TracingHandle; +import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; import io.vertx.core.Vertx; @@ -103,9 +104,9 @@ public void open() { Properties props = new Properties(); props.putAll(kafkaConfig.getConfig()); props.putAll(kafkaConfig.getProducerConfig().getConfig()); - if (this.bridgeConfig.getTracing() != null) { - 
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); - } + + TracingHandle tracing = TracingUtil.getTracing(); + tracing.kafkaProducerConfig(props); this.producerUnsettledMode = KafkaProducer.create(this.vertx, props, this.keySerializer, this.valueSerializer); diff --git a/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java b/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java index b1c08166c..ab5a56a94 100644 --- a/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java +++ b/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java @@ -20,6 +20,7 @@ public class BridgeConfig extends AbstractConfig { public static final String BRIDGE_ID = BRIDGE_CONFIG_PREFIX + "id"; public static final String TRACING_TYPE = BRIDGE_CONFIG_PREFIX + "tracing"; + public static final String TRACING_SERVICE_NAME_TYPE = TRACING_TYPE + ".service-name"; private KafkaConfig kafkaConfig; private AmqpConfig amqpConfig; @@ -103,4 +104,12 @@ public String getTracing() { return config.get(BridgeConfig.TRACING_TYPE).toString(); } } + + public String getTracingServiceName() { + if (config.get(BridgeConfig.TRACING_SERVICE_NAME_TYPE) == null) { + return null; + } else { + return config.get(BridgeConfig.TRACING_SERVICE_NAME_TYPE).toString(); + } + } } diff --git a/src/main/java/io/strimzi/kafka/bridge/converter/DefaultDeserializer.java b/src/main/java/io/strimzi/kafka/bridge/converter/DefaultDeserializer.java index 3216bfc9f..04314f5bf 100644 --- a/src/main/java/io/strimzi/kafka/bridge/converter/DefaultDeserializer.java +++ b/src/main/java/io/strimzi/kafka/bridge/converter/DefaultDeserializer.java @@ -8,6 +8,7 @@ import org.apache.kafka.common.serialization.Deserializer; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.ObjectInputStream; import java.util.Map; @@ -26,7 +27,7 @@ public T deserialize(String topic, byte[] data) { try (ByteArrayInputStream b = new ByteArrayInputStream(data); ObjectInputStream o = new ObjectInputStream(b)) { return (T) o.readObject(); - } catch (Exception e) { + } catch (IOException | ClassNotFoundException e) { throw new SerializationException("Error when deserializing", e); } } diff --git a/src/main/java/io/strimzi/kafka/bridge/converter/DefaultSerializer.java b/src/main/java/io/strimzi/kafka/bridge/converter/DefaultSerializer.java index e6c62cc45..3e53809f7 100644 --- a/src/main/java/io/strimzi/kafka/bridge/converter/DefaultSerializer.java +++ b/src/main/java/io/strimzi/kafka/bridge/converter/DefaultSerializer.java @@ -10,6 +10,7 @@ import org.apache.kafka.common.serialization.Serializer; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.ObjectOutputStream; import java.util.Map; @@ -20,7 +21,7 @@ public void configure(Map configs, boolean isKey) { } - @SuppressFBWarnings("PZLA_PREFER_ZERO_LENGTH_ARRAYS") + @SuppressFBWarnings({"PZLA_PREFER_ZERO_LENGTH_ARRAYS"}) @Override public byte[] serialize(String topic, T data) { @@ -30,7 +31,7 @@ public byte[] serialize(String topic, T data) { try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b)) { o.writeObject(data); return b.toByteArray(); - } catch (Exception e) { + } catch (IOException e) { throw new SerializationException("Error when serializing", e); } } diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java index 515b7041a..41aca9eb0 100644 --- 
a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java @@ -6,20 +6,10 @@ package io.strimzi.kafka.bridge.http; import io.netty.handler.codec.http.HttpResponseStatus; -import io.opentracing.References; -import io.opentracing.Span; -import io.opentracing.SpanContext; -import io.opentracing.Tracer; -import io.opentracing.Tracer.SpanBuilder; -import io.opentracing.propagation.Format; -import io.opentracing.propagation.TextMap; -import io.opentracing.propagation.TextMapAdapter; -import io.opentracing.tag.Tags; -import io.opentracing.util.GlobalTracer; import io.strimzi.kafka.bridge.BridgeContentType; +import io.strimzi.kafka.bridge.ConsumerInstanceId; import io.strimzi.kafka.bridge.EmbeddedFormat; import io.strimzi.kafka.bridge.Endpoint; -import io.strimzi.kafka.bridge.ConsumerInstanceId; import io.strimzi.kafka.bridge.SinkBridgeEndpoint; import io.strimzi.kafka.bridge.SinkTopicSubscription; import io.strimzi.kafka.bridge.config.BridgeConfig; @@ -27,6 +17,10 @@ import io.strimzi.kafka.bridge.http.converter.HttpBinaryMessageConverter; import io.strimzi.kafka.bridge.http.converter.HttpJsonMessageConverter; import io.strimzi.kafka.bridge.http.model.HttpBridgeError; +import io.strimzi.kafka.bridge.tracing.SpanBuilderHandle; +import io.strimzi.kafka.bridge.tracing.SpanHandle; +import io.strimzi.kafka.bridge.tracing.TracingHandle; +import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.vertx.core.AsyncResult; import io.vertx.core.CompositeFuture; import io.vertx.core.Future; @@ -42,13 +36,11 @@ import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.consumer.OffsetAndMetadata; import io.vertx.kafka.client.producer.KafkaHeader; - import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Deserializer; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; @@ -250,6 +242,7 @@ private void doDeleteConsumer(RoutingContext routingContext) { HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), null, null); } + @SuppressWarnings("checkstyle:NPathComplexity") // tracing abstraction adds new npath complexity ... 
private void doPoll(RoutingContext routingContext) { if (topicSubscriptionsPattern == null && topicSubscriptions.isEmpty()) { HttpBridgeError error = new HttpBridgeError( @@ -277,9 +270,9 @@ private void doPoll(RoutingContext routingContext) { this.consume(records -> { if (records.succeeded()) { - Tracer tracer = GlobalTracer.get(); + TracingHandle tracing = TracingUtil.getTracing(); + SpanBuilderHandle builder = tracing.builder(routingContext, HttpOpenApiOperations.POLL.toString()); - SpanBuilder spanBuilder = tracer.buildSpan(HttpOpenApiOperations.POLL.toString()); for (int i = 0; i < records.result().size(); i++) { KafkaConsumerRecord record = records.result().recordAt(i); @@ -288,25 +281,11 @@ private void doPoll(RoutingContext routingContext) { headers.put(header.key(), header.value().toString()); } - SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(headers)); - if (parentSpan != null) { - spanBuilder.addReference(References.FOLLOWS_FROM, parentSpan); - } + builder.addRef(headers); } - Span span = spanBuilder.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER).start(); - HttpTracingUtils.setCommonTags(span, routingContext); + SpanHandle span = builder.span(routingContext); - tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMap() { - @Override - public void put(String key, String value) { - routingContext.response().headers().add(key, value); - } - - @Override - public Iterator> iterator() { - throw new UnsupportedOperationException("TextMapInjectAdapter should only be used with Tracer.inject()"); - } - }); + span.inject(routingContext); HttpResponseStatus responseStatus; try { @@ -335,8 +314,7 @@ public Iterator> iterator() { HttpUtils.sendResponse(routingContext, responseStatus.code(), BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); } - Tags.HTTP_STATUS.set(span, responseStatus.code()); - span.finish(); + span.finish(responseStatus.code()); } else { HttpBridgeError error = new HttpBridgeError( diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java index 02928e308..498ca5461 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java @@ -6,15 +6,6 @@ package io.strimzi.kafka.bridge.http; import io.netty.handler.codec.http.HttpResponseStatus; -import io.opentracing.Span; -import io.opentracing.SpanContext; -import io.opentracing.Tracer; -import io.opentracing.Tracer.SpanBuilder; -import io.opentracing.propagation.Format; -import io.opentracing.propagation.TextMap; -import io.opentracing.propagation.TextMapAdapter; -import io.opentracing.tag.Tags; -import io.opentracing.util.GlobalTracer; import io.strimzi.kafka.bridge.BridgeContentType; import io.strimzi.kafka.bridge.EmbeddedFormat; import io.strimzi.kafka.bridge.Endpoint; @@ -25,6 +16,9 @@ import io.strimzi.kafka.bridge.http.converter.HttpJsonMessageConverter; import io.strimzi.kafka.bridge.http.model.HttpBridgeError; import io.strimzi.kafka.bridge.http.model.HttpBridgeResult; +import io.strimzi.kafka.bridge.tracing.SpanHandle; +import io.strimzi.kafka.bridge.tracing.TracingHandle; +import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.vertx.core.CompositeFuture; import io.vertx.core.Future; import io.vertx.core.Handler; @@ -42,7 +36,6 @@ import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import 
java.util.Map; import java.util.UUID; @@ -96,8 +89,6 @@ public void handle(Endpoint endpoint) { boolean isAsync = Boolean.parseBoolean(routingContext.queryParams().get("async")); - Tracer tracer = GlobalTracer.get(); - MultiMap httpHeaders = routingContext.request().headers(); Map headers = new HashMap<>(); for (Entry header: httpHeaders.entries()) { @@ -105,15 +96,9 @@ public void handle(Endpoint endpoint) { } String operationName = partition == null ? HttpOpenApiOperations.SEND.toString() : HttpOpenApiOperations.SEND_TO_PARTITION.toString(); - SpanBuilder spanBuilder; - SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(headers)); - if (parentSpan == null) { - spanBuilder = tracer.buildSpan(operationName); - } else { - spanBuilder = tracer.buildSpan(operationName).asChildOf(parentSpan); - } - Span span = spanBuilder.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER).start(); - HttpTracingUtils.setCommonTags(span, routingContext); + + TracingHandle tracing = TracingUtil.getTracing(); + SpanHandle span = tracing.span(routingContext, operationName); try { if (messageConverter == null) { @@ -122,24 +107,13 @@ public void handle(Endpoint endpoint) { HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); - Tags.HTTP_STATUS.set(span, HttpResponseStatus.INTERNAL_SERVER_ERROR.code()); - span.finish(); + span.finish(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()); return; } records = messageConverter.toKafkaRecords(topic, partition, routingContext.body().buffer()); for (KafkaProducerRecord record :records) { - tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMap() { - @Override - public void put(String key, String value) { - record.addHeader(key, value); - } - - @Override - public Iterator> iterator() { - throw new UnsupportedOperationException("TextMapInjectAdapter should only be used with Tracer.inject()"); - } - }); + span.inject(record); } } catch (Exception e) { HttpBridgeError error = new HttpBridgeError( @@ -148,8 +122,7 @@ public Iterator> iterator() { HttpUtils.sendResponse(routingContext, HttpResponseStatus.UNPROCESSABLE_ENTITY.code(), BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); - Tags.HTTP_STATUS.set(span, HttpResponseStatus.UNPROCESSABLE_ENTITY.code()); - span.finish(); + span.finish(HttpResponseStatus.UNPROCESSABLE_ENTITY.code()); return; } List> results = new ArrayList<>(records.size()); @@ -158,8 +131,7 @@ public Iterator> iterator() { if (isAsync) { // if async is specified, return immediately once records are sent this.sendAsyncRecords(records); - Tags.HTTP_STATUS.set(span, HttpResponseStatus.NO_CONTENT.code()); - span.finish(); + span.finish(HttpResponseStatus.NO_CONTENT.code()); HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), BridgeContentType.KAFKA_JSON, null); this.maybeClose(); @@ -190,9 +162,7 @@ public Iterator> iterator() { results.add(new HttpBridgeResult<>(new HttpBridgeError(code, msg))); } } - - Tags.HTTP_STATUS.set(span, HttpResponseStatus.OK.code()); - span.finish(); + span.finish(HttpResponseStatus.OK.code()); HttpUtils.sendResponse(routingContext, HttpResponseStatus.OK.code(), BridgeContentType.KAFKA_JSON, buildOffsets(results).toBuffer()); this.maybeClose(); diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpTracingUtils.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpTracingUtils.java deleted file mode 100644 index 64d481178..000000000 --- 
a/src/main/java/io/strimzi/kafka/bridge/http/HttpTracingUtils.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ - -package io.strimzi.kafka.bridge.http; - -import io.opentracing.Span; -import io.opentracing.tag.Tags; -import io.vertx.ext.web.RoutingContext; - -public class HttpTracingUtils { - - public static final String COMPONENT = "strimzi-kafka-bridge"; - public static final String KAFKA_SERVICE = "kafka"; - - - public static void setCommonTags(Span span, RoutingContext routingContext) { - Tags.COMPONENT.set(span, HttpTracingUtils.COMPONENT); - Tags.PEER_SERVICE.set(span, HttpTracingUtils.KAFKA_SERVICE); - Tags.HTTP_METHOD.set(span, routingContext.request().method().name()); - Tags.HTTP_URL.set(span, routingContext.request().uri()); - } -} \ No newline at end of file diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java new file mode 100644 index 000000000..f3bad448e --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java @@ -0,0 +1,214 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.jaegertracing.Configuration; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.instrumentation.kafkaclients.TracingConsumerInterceptor; +import io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import io.strimzi.kafka.bridge.config.BridgeConfig; +import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; + +import java.util.Map; +import java.util.Properties; + +import static io.strimzi.kafka.bridge.tracing.TracingConstants.COMPONENT; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.JAEGER; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.KAFKA_SERVICE; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY_SERVICE_NAME_ENV_KEY; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY_TRACES_EXPORTER_ENV_KEY; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY_TRACES_EXPORTER_PROPERTY_KEY; + +/** + * OpenTelemetry implementation of Tracing. 
+ */ +class OpenTelemetryHandle implements TracingHandle { + + static void setCommonAttributes(SpanBuilder builder, RoutingContext routingContext) { + builder.setAttribute("component", COMPONENT); + builder.setAttribute(SemanticAttributes.PEER_SERVICE, KAFKA_SERVICE); + builder.setAttribute(SemanticAttributes.HTTP_METHOD, routingContext.request().method().name()); + builder.setAttribute(SemanticAttributes.HTTP_URL, routingContext.request().uri()); + } + + @Override + public String envName() { + return OPENTELEMETRY_SERVICE_NAME_ENV_KEY; + } + + @Override + public String serviceName(BridgeConfig config) { + String serviceName = System.getenv(envName()); + if (serviceName == null) { + // legacy purpose, use previous JAEGER_SERVICE_NAME as OTEL_SERVICE_NAME (if not explicitly set) + serviceName = System.getenv(Configuration.JAEGER_SERVICE_NAME); + if (serviceName == null) { + serviceName = config.getTracingServiceName(); + } + if (serviceName != null) { + System.setProperty(OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY, serviceName); + } + } + if (serviceName != null) { + if (System.getenv(OPENTELEMETRY_TRACES_EXPORTER_ENV_KEY) == null && System.getProperty(OPENTELEMETRY_TRACES_EXPORTER_PROPERTY_KEY) == null) { + System.setProperty(OPENTELEMETRY_TRACES_EXPORTER_PROPERTY_KEY, JAEGER); // it wasn't set in script + } + } + return serviceName; + } + + @Override + public void initialize() { + AutoConfiguredOpenTelemetrySdk.initialize(); + } + + private static Tracer get() { + return GlobalOpenTelemetry.getTracer(COMPONENT); + } + + private SpanBuilder getSpanBuilder(RoutingContext routingContext, String operationName) { + Tracer tracer = get(); + SpanBuilder spanBuilder; + Context parentContext = propagator().extract(Context.current(), routingContext, RCG); + if (parentContext == null) { + spanBuilder = tracer.spanBuilder(operationName); + } else { + spanBuilder = tracer.spanBuilder(operationName).setParent(parentContext); + } + return spanBuilder; + } + + @Override + public SpanBuilderHandle builder(RoutingContext routingContext, String operationName) { + SpanBuilder spanBuilder = getSpanBuilder(routingContext, operationName); + return new OTelSpanBuilderHandle<>(spanBuilder); + } + + private static TextMapPropagator propagator() { + return GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); + } + + private static final TextMapGetter RCG = new TextMapGetter() { + @Override + public Iterable keys(RoutingContext rc) { + return rc.request().headers().names(); + } + + @Override + public String get(RoutingContext rc, String key) { + if (rc == null) { + return null; + } + return rc.request().headers().get(key); + } + }; + + private static final TextMapGetter> MG = new TextMapGetter>() { + @Override + public Iterable keys(Map map) { + return map.keySet(); + } + + @Override + public String get(Map map, String key) { + return map != null ? 
map.get(key) : null; + } + }; + + @Override + public SpanHandle span(RoutingContext routingContext, String operationName) { + return buildSpan(getSpanBuilder(routingContext, operationName), routingContext); + } + + private static SpanHandle buildSpan(SpanBuilder spanBuilder, RoutingContext routingContext) { + spanBuilder.setSpanKind(SpanKind.SERVER); + setCommonAttributes(spanBuilder, routingContext); + return new OTelSpanHandle<>(spanBuilder.startSpan()); + } + + private static class OTelSpanBuilderHandle implements SpanBuilderHandle { + private final SpanBuilder spanBuilder; + + public OTelSpanBuilderHandle(SpanBuilder spanBuilder) { + this.spanBuilder = spanBuilder; + } + + @Override + public void addRef(Map headers) { + Context parentContext = propagator().extract(Context.current(), headers, MG); + if (parentContext != null) { + Span parentSpan = Span.fromContext(parentContext); + SpanContext psc = parentSpan != null ? parentSpan.getSpanContext() : null; + if (psc != null) { + spanBuilder.addLink(psc); + } + } + } + + @Override + public SpanHandle span(RoutingContext routingContext) { + return buildSpan(spanBuilder, routingContext); + } + } + + @Override + public void kafkaConsumerConfig(Properties props) { + props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); + } + + @Override + public void kafkaProducerConfig(Properties props) { + props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); + } + + private static final class OTelSpanHandle implements SpanHandle { + private final Span span; + private final Scope scope; + + public OTelSpanHandle(Span span) { + this.span = span; + this.scope = span.makeCurrent(); + } + + @Override + public void inject(KafkaProducerRecord record) { + propagator().inject(Context.current(), record, KafkaProducerRecord::addHeader); + } + + @Override + public void inject(RoutingContext routingContext) { + propagator().inject(Context.current(), routingContext, (rc, key, value) -> rc.response().headers().add(key, value)); + } + + @Override + public void finish(int code) { + try { + span.setAttribute(SemanticAttributes.HTTP_STATUS_CODE, code); + span.setStatus(code == HttpResponseStatus.OK.code() ? StatusCode.OK : StatusCode.ERROR); + scope.close(); + } finally { + span.end(); + } + } + } +} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java new file mode 100644 index 000000000..20c6ba1ad --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java @@ -0,0 +1,190 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ + +package io.strimzi.kafka.bridge.tracing; + +import io.jaegertracing.Configuration; +import io.opentracing.References; +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.opentracing.Tracer; +import io.opentracing.contrib.kafka.TracingConsumerInterceptor; +import io.opentracing.contrib.kafka.TracingProducerInterceptor; +import io.opentracing.propagation.Format; +import io.opentracing.propagation.TextMap; +import io.opentracing.propagation.TextMapAdapter; +import io.opentracing.tag.Tags; +import io.opentracing.util.GlobalTracer; +import io.strimzi.kafka.bridge.config.BridgeConfig; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; + +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; + +import static io.strimzi.kafka.bridge.tracing.TracingConstants.COMPONENT; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.KAFKA_SERVICE; + +/** + * OpenTracing implementation of TracingHandle. + */ +class OpenTracingHandle implements TracingHandle { + + static void setCommonTags(Span span, RoutingContext routingContext) { + Tags.COMPONENT.set(span, COMPONENT); + Tags.PEER_SERVICE.set(span, KAFKA_SERVICE); + Tags.HTTP_METHOD.set(span, routingContext.request().method().name()); + Tags.HTTP_URL.set(span, routingContext.request().uri()); + } + + @Override + public String envName() { + return Configuration.JAEGER_SERVICE_NAME; + } + + @Override + public String serviceName(BridgeConfig config) { + String serviceName = System.getenv(envName()); + if (serviceName == null) { + serviceName = config.getTracingServiceName(); + } + return serviceName; + } + + @Override + public void initialize() { + Tracer tracer = Configuration.fromEnv().getTracer(); + GlobalTracer.registerIfAbsent(tracer); + } + + @Override + public SpanBuilderHandle builder(RoutingContext routingContext, String operationName) { + return new OTSpanBuilderHandle<>(getSpanBuilder(routingContext, operationName)); + } + + @Override + public SpanHandle span(RoutingContext routingContext, String operationName) { + Tracer.SpanBuilder spanBuilder = getSpanBuilder(routingContext, operationName); + return buildSpan(spanBuilder, routingContext); + } + + private Tracer.SpanBuilder getSpanBuilder(RoutingContext rc, String operationName) { + Tracer tracer = GlobalTracer.get(); + Tracer.SpanBuilder spanBuilder; + SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new RequestTextMap(rc.request())); + if (parentSpan == null) { + spanBuilder = tracer.buildSpan(operationName); + } else { + spanBuilder = tracer.buildSpan(operationName).asChildOf(parentSpan); + } + return spanBuilder; + } + + private static class RequestTextMap implements TextMap { + private final HttpServerRequest request; + + public RequestTextMap(HttpServerRequest request) { + this.request = request; + } + + @Override + public Iterator> iterator() { + return request.headers().iterator(); + } + + @Override + public void put(String key, String value) { + request.headers().add(key, value); + } + } + + private static SpanHandle buildSpan(Tracer.SpanBuilder spanBuilder, RoutingContext routingContext) { + Span span = spanBuilder.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER).start(); + setCommonTags(span, routingContext); + return new OTSpanHandle(span); + } + + private static class 
OTSpanBuilderHandle implements SpanBuilderHandle { + private final Tracer.SpanBuilder spanBuilder; + + public OTSpanBuilderHandle(Tracer.SpanBuilder spanBuilder) { + this.spanBuilder = spanBuilder; + } + + @Override + public void addRef(Map headers) { + Tracer tracer = GlobalTracer.get(); + SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(headers)); + if (parentSpan != null) { + spanBuilder.addReference(References.FOLLOWS_FROM, parentSpan); + } + } + + @Override + public SpanHandle span(RoutingContext routingContext) { + return buildSpan(spanBuilder, routingContext); + } + } + + @Override + public void kafkaConsumerConfig(Properties props) { + props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); + } + + @Override + public void kafkaProducerConfig(Properties props) { + props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); + } + + private static final class OTSpanHandle implements SpanHandle { + private final Span span; + + public OTSpanHandle(Span span) { + this.span = span; + } + + @Override + public void inject(KafkaProducerRecord record) { + Tracer tracer = GlobalTracer.get(); + tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMap() { + @Override + public void put(String key, String value) { + record.addHeader(key, value); + } + + @Override + public Iterator> iterator() { + throw new UnsupportedOperationException("TextMapInjectAdapter should only be used with Tracer.inject()"); + } + }); + } + + @Override + public void inject(RoutingContext routingContext) { + Tracer tracer = GlobalTracer.get(); + tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMap() { + @Override + public void put(String key, String value) { + routingContext.response().headers().add(key, value); + } + + @Override + public Iterator> iterator() { + throw new UnsupportedOperationException("TextMapInjectAdapter should only be used with Tracer.inject()"); + } + }); + } + + @Override + public void finish(int code) { + Tags.HTTP_STATUS.set(span, code); + span.finish(); + } + } +} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java new file mode 100644 index 000000000..daf2f62ab --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java @@ -0,0 +1,18 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.vertx.ext.web.RoutingContext; + +import java.util.Map; + +/** + * Simple SpanBuilder handle. + */ +public interface SpanBuilderHandle { + void addRef(Map headers); + SpanHandle span(RoutingContext routingContext); +} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java new file mode 100644 index 000000000..7f1ed468e --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java @@ -0,0 +1,18 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.producer.KafkaProducerRecord; + +/** + * Simple Span handle. 
+ */ +public interface SpanHandle { + void inject(KafkaProducerRecord record); + void inject(RoutingContext routingContext); + void finish(int code); +} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingConstants.java b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingConstants.java new file mode 100644 index 000000000..1dfde779b --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingConstants.java @@ -0,0 +1,24 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +/** + * Tracing constants. + */ +public final class TracingConstants { + public static final String COMPONENT = "strimzi-kafka-bridge"; + public static final String KAFKA_SERVICE = "kafka"; + + public static final String JAEGER = "jaeger"; + + public static final String JAEGER_OPENTRACING = JAEGER; + public static final String OPENTELEMETRY = "opentelemetry"; + + public static final String OPENTELEMETRY_SERVICE_NAME_ENV_KEY = "OTEL_SERVICE_NAME"; + public static final String OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY = "otel.service.name"; + public static final String OPENTELEMETRY_TRACES_EXPORTER_ENV_KEY = "OTEL_TRACES_EXPORTER"; + public static final String OPENTELEMETRY_TRACES_EXPORTER_PROPERTY_KEY = "otel.traces.exporter"; +} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java new file mode 100644 index 000000000..3ab981b16 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java @@ -0,0 +1,26 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.strimzi.kafka.bridge.config.BridgeConfig; +import io.vertx.ext.web.RoutingContext; + +import java.util.Properties; + +/** + * Simple interface to abstract tracing between legacy OpenTracing and new OpenTelemetry. + */ +public interface TracingHandle { + String envName(); + String serviceName(BridgeConfig config); + void initialize(); + + SpanBuilderHandle builder(RoutingContext routingContext, String operationName); + SpanHandle span(RoutingContext routingContext, String operationName); + + void kafkaConsumerConfig(Properties props); + void kafkaProducerConfig(Properties props); +} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java new file mode 100644 index 000000000..6f43ddd38 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java @@ -0,0 +1,110 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.strimzi.kafka.bridge.config.BridgeConfig; +import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Properties; + +import static io.strimzi.kafka.bridge.tracing.TracingConstants.JAEGER_OPENTRACING; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY; + +/** + * Tracing util to hold app's Tracing instance. 
+ */ +public class TracingUtil { + private static final Logger log = LoggerFactory.getLogger(TracingUtil.class); + private static TracingHandle tracing = new NoopTracing(); + + public static TracingHandle getTracing() { + return tracing; + } + + public static void initialize(BridgeConfig config) { + String tracingConfig = config.getTracing(); + if (tracingConfig != null && (tracingConfig.equals(JAEGER_OPENTRACING) || tracingConfig.equals(OPENTELEMETRY))) { + boolean isOpenTelemetry = OPENTELEMETRY.equals(tracingConfig); + TracingHandle instance = isOpenTelemetry ? new OpenTelemetryHandle() : new OpenTracingHandle(); + + String serviceName = instance.serviceName(config); + if (serviceName != null) { + log.info( + "Initializing {} tracing config with service name {}", + isOpenTelemetry ? "OpenTelemetry" : "OpenTracing", + serviceName + ); + instance.initialize(); + tracing = instance; + } else { + log.error("Tracing config cannot be initialized because {} environment variable is not defined", instance.envName()); + } + } + } + + private static final class NoopTracing implements TracingHandle { + @Override + public String envName() { + return null; + } + + @Override + public String serviceName(BridgeConfig config) { + return null; + } + + @Override + public void initialize() { + } + + @Override + public SpanBuilderHandle builder(RoutingContext routingContext, String operationName) { + return new NoopSpanBuilderHandle<>(); + } + + @Override + public SpanHandle span(RoutingContext routingContext, String operationName) { + return new NoopSpanHandle<>(); + } + + @Override + public void kafkaConsumerConfig(Properties props) { + } + + @Override + public void kafkaProducerConfig(Properties props) { + } + } + + private static final class NoopSpanBuilderHandle implements SpanBuilderHandle { + @Override + public void addRef(Map headers) { + } + + @Override + public SpanHandle span(RoutingContext routingContext) { + return new NoopSpanHandle<>(); + } + } + + private static final class NoopSpanHandle implements SpanHandle { + @Override + public void inject(KafkaProducerRecord record) { + } + + @Override + public void inject(RoutingContext routingContext) { + } + + @Override + public void finish(int code) { + } + } +} diff --git a/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryTest.java b/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryTest.java new file mode 100644 index 000000000..0543e0b27 --- /dev/null +++ b/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryTest.java @@ -0,0 +1,25 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ + +package io.strimzi.kafka.bridge.tracing; + +import io.vertx.core.tracing.TracingOptions; +import io.vertx.tracing.opentelemetry.OpenTelemetryOptions; + +import static io.strimzi.kafka.bridge.tracing.TracingConstants.JAEGER; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY_TRACES_EXPORTER_PROPERTY_KEY; + +/** + * OpenTelemetry tests + */ +public class OpenTelemetryTest extends TracingTestBase { + @Override + protected TracingOptions tracingOptions() { + System.setProperty(OPENTELEMETRY_TRACES_EXPORTER_PROPERTY_KEY, JAEGER); + System.setProperty(OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY, "strimzi-kafka-bridge-test"); + return new OpenTelemetryOptions(); + } +} diff --git a/src/test/java/io/strimzi/kafka/bridge/tracing/TracingTestBase.java b/src/test/java/io/strimzi/kafka/bridge/tracing/TracingTestBase.java new file mode 100644 index 000000000..c149d376c --- /dev/null +++ b/src/test/java/io/strimzi/kafka/bridge/tracing/TracingTestBase.java @@ -0,0 +1,98 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.strimzi.kafka.bridge.BridgeContentType; +import io.strimzi.kafka.bridge.http.services.ProducerService; +import io.strimzi.kafka.bridge.utils.Urls; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.core.tracing.TracingOptions; +import io.vertx.core.tracing.TracingPolicy; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +/** + * Base for OpenTracing and OpenTelemetry (manual) tests. + *
<p>
+ * Test will only run if the bridge AND tracing server are up-n-running. + */ +@ExtendWith(VertxExtension.class) +public abstract class TracingTestBase { + Logger log = LoggerFactory.getLogger(getClass()); + + private void assumeServer(String url) { + try { + new URL(url).openConnection().getInputStream(); + } catch (Exception e) { + log.info("Cannot connect to server", e); + Assumptions.assumeTrue(false, "Server is not running: " + url); + } + } + + Handler>> verifyOK(VertxTestContext context) { + return ar -> { + context.verify(() -> { + assertThat(ar.succeeded(), is(true)); + HttpResponse response = ar.result(); + assertThat(response.statusCode(), is(HttpResponseStatus.OK.code())); + }); + context.completeNow(); + }; + } + + @BeforeEach + public void setUp() { + assumeServer(String.format("http://%s:%s", Urls.BRIDGE_HOST, Urls.BRIDGE_PORT)); // bridge + assumeServer("http://localhost:16686"); // jaeger + } + + protected abstract TracingOptions tracingOptions(); + + @Test + public void testSmoke(VertxTestContext context) { + Vertx vertx = Vertx.vertx(new VertxOptions().setTracingOptions(tracingOptions())); + + WebClient client = WebClient.create(vertx, (WebClientOptions) new WebClientOptions() + .setDefaultHost(Urls.BRIDGE_HOST) + .setDefaultPort(Urls.BRIDGE_PORT) + .setTracingPolicy(TracingPolicy.ALWAYS) + ); + + String value = "message-value"; + + JsonArray records = new JsonArray(); + JsonObject json = new JsonObject(); + json.put("value", value); + records.add(json); + + JsonObject root = new JsonObject(); + root.put("records", records); + + ProducerService.getInstance(client) + .sendRecordsRequest("mytopic", root, BridgeContentType.KAFKA_JSON_JSON) + .sendJsonObject(root, verifyOK(context)); + } +} From 03002253558d76916320cd553e36c6b0bade48d1 Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Mon, 11 Apr 2022 15:29:28 +0200 Subject: [PATCH 02/14] Fix async scope/context handling in OTel. 
Signed-off-by: Ales Justin --- .../bridge/http/HttpSourceBridgeEndpoint.java | 4 +- .../bridge/tracing/OpenTelemetryHandle.java | 42 ++++++++++++++++++- .../bridge/tracing/OpenTracingHandle.java | 8 +--- .../kafka/bridge/tracing/SpanHandle.java | 6 +++ 4 files changed, 51 insertions(+), 9 deletions(-) diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java index 498ca5461..1c47eb51a 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java @@ -141,7 +141,9 @@ public void handle(Endpoint endpoint) { List sendHandlers = new ArrayList<>(records.size()); for (KafkaProducerRecord record : records) { Promise promise = Promise.promise(); - sendHandlers.add(promise.future()); + Future future = promise.future().onComplete(ar -> span.clean(record)); + sendHandlers.add(future); + span.prepare(record); this.send(record, promise); } diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java index f3bad448e..29d7182c5 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java @@ -23,13 +23,21 @@ import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import io.strimzi.kafka.bridge.config.BridgeConfig; +import io.vertx.core.buffer.Buffer; import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.producer.KafkaHeader; import io.vertx.kafka.client.producer.KafkaProducerRecord; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; import java.util.Map; +import java.util.Optional; import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import static io.strimzi.kafka.bridge.tracing.TracingConstants.COMPONENT; import static io.strimzi.kafka.bridge.tracing.TracingConstants.JAEGER; @@ -178,7 +186,7 @@ public void kafkaConsumerConfig(Properties props) { @Override public void kafkaProducerConfig(Properties props) { - props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); + props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ContextAwareTracingProducerInterceptor.class.getName()); } private static final class OTelSpanHandle implements SpanHandle { @@ -190,6 +198,22 @@ public OTelSpanHandle(Span span) { this.scope = span.makeCurrent(); } + @Override + public void prepare(KafkaProducerRecord record) { + String uuid = UUID.randomUUID().toString(); + spans.put(uuid, span); + record.addHeader(_UUID, uuid); + } + + @Override + public void clean(KafkaProducerRecord record) { + Optional oh = record.headers().stream().filter(h -> h.key().equals(_UUID)).findFirst(); + oh.ifPresent(h -> { + String uuid = h.value().toString(); + spans.remove(uuid); + }); + } + @Override public void inject(KafkaProducerRecord record) { propagator().inject(Context.current(), record, KafkaProducerRecord::addHeader); @@ -211,4 +235,20 @@ public void finish(int code) { } } } + + static final String _UUID = "_UUID"; + static final Map spans = new 
ConcurrentHashMap<>(); + + public static class ContextAwareTracingProducerInterceptor extends TracingProducerInterceptor { + @Override + public ProducerRecord onSend(ProducerRecord record) { + Headers headers = record.headers(); + String key = Buffer.buffer(headers.lastHeader(_UUID).value()).toString(); + headers.remove(_UUID); + Span span = spans.remove(key); + try (Scope ignored = span.makeCurrent()) { + return super.onSend(record); + } + } + } } diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java index 20c6ba1ad..32c5de020 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java @@ -76,14 +76,8 @@ public SpanHandle span(RoutingContext routingContext, String operat private Tracer.SpanBuilder getSpanBuilder(RoutingContext rc, String operationName) { Tracer tracer = GlobalTracer.get(); - Tracer.SpanBuilder spanBuilder; SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new RequestTextMap(rc.request())); - if (parentSpan == null) { - spanBuilder = tracer.buildSpan(operationName); - } else { - spanBuilder = tracer.buildSpan(operationName).asChildOf(parentSpan); - } - return spanBuilder; + return tracer.buildSpan(operationName).asChildOf(parentSpan); } private static class RequestTextMap implements TextMap { diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java index 7f1ed468e..40e671c1c 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java @@ -12,6 +12,12 @@ * Simple Span handle. */ public interface SpanHandle { + default void prepare(KafkaProducerRecord record) { + } + + default void clean(KafkaProducerRecord record) { + } + void inject(KafkaProducerRecord record); void inject(RoutingContext routingContext); void finish(int code); From 9ce78c6537976a4416973cc9bccf13c634c76adf Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Fri, 17 Jun 2022 11:05:17 +0200 Subject: [PATCH 03/14] Fix relationship handling. 
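What "relationship handling" covers here, sketched for illustration rather than taken verbatim from the patch: instead of accumulating references or links from every consumed record onto the single HTTP poll span, each polled record now gets its own short-lived consumer span whose parent is the poll span and which is linked to the producer's span context extracted from the record headers (see handleRecordSpan in the diff below). In OpenTelemetry terms that is roughly the following; the helper name and parameters are assumptions made for the sketch:

    import io.opentelemetry.api.trace.Span;
    import io.opentelemetry.api.trace.SpanBuilder;
    import io.opentelemetry.api.trace.SpanContext;
    import io.opentelemetry.api.trace.SpanKind;
    import io.opentelemetry.api.trace.Tracer;
    import io.opentelemetry.context.Context;
    import io.opentelemetry.context.propagation.TextMapGetter;
    import io.opentelemetry.context.propagation.TextMapPropagator;

    import java.util.Map;

    final class RecordSpanSketch {
        // Creates one short-lived CONSUMER span per polled record, parented to the current
        // (poll) span and linked to the remote producer context carried in the record headers.
        static void recordSpan(Tracer tracer, TextMapPropagator propagator,
                               TextMapGetter<Map<String, String>> getter,
                               String topic, Map<String, String> headers) {
            SpanBuilder builder = tracer.spanBuilder(topic + " receive")
                    .setSpanKind(SpanKind.CONSUMER)
                    .setParent(Context.current());
            Context remote = propagator.extract(Context.current(), headers, getter);
            SpanContext producerContext = Span.fromContext(remote).getSpanContext();
            if (producerContext.isValid()) {
                builder.addLink(producerContext);   // a link, not a parent/child relationship
            }
            builder.startSpan().end();
        }
    }

On the OpenTracing side the same change keeps a FOLLOWS_FROM reference but moves it from the poll span builder onto the per-record span.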
Signed-off-by: Ales Justin --- config/application.properties | 3 +- pom.xml | 11 ++ .../io/strimzi/kafka/bridge/Application.java | 8 +- .../kafka/bridge/SinkBridgeEndpoint.java | 5 - .../kafka/bridge/SourceBridgeEndpoint.java | 2 +- .../kafka/bridge/config/BridgeConfig.java | 9 -- .../bridge/http/HttpSinkBridgeEndpoint.java | 11 +- .../bridge/http/HttpSourceBridgeEndpoint.java | 27 ++--- .../bridge/tracing/NoopTracingHandle.java | 68 ++++++++++++ .../bridge/tracing/OpenTelemetryHandle.java | 103 +++++++++++------- .../bridge/tracing/OpenTracingHandle.java | 59 +++++----- .../bridge/tracing/SpanBuilderHandle.java | 11 +- .../kafka/bridge/tracing/SpanHandle.java | 29 ++++- .../kafka/bridge/tracing/TracingHandle.java | 54 ++++++++- .../kafka/bridge/tracing/TracingUtil.java | 80 ++++---------- .../kafka/bridge/tracing/OpenTracingTest.java | 24 ++++ 16 files changed, 325 insertions(+), 179 deletions(-) create mode 100644 src/main/java/io/strimzi/kafka/bridge/tracing/NoopTracingHandle.java create mode 100644 src/test/java/io/strimzi/kafka/bridge/tracing/OpenTracingTest.java diff --git a/config/application.properties b/config/application.properties index a5b4c9e1a..385ea6c76 100644 --- a/config/application.properties +++ b/config/application.properties @@ -2,10 +2,9 @@ bridge.id=my-bridge # uncomment one the following lines (bridge.tracing) to enable Jaeger tracing, check the documentation how to configure the tracer # OpenTracing support -#bridge.tracing=jaeger +bridge.tracing=jaeger # OpenTelemetry support #bridge.tracing=opentelemetry -#bridge.tracing.service-name=strimzi-kafka-bridge #Apache Kafka common kafka.bootstrap.servers=localhost:9092 diff --git a/pom.xml b/pom.xml index c54add31e..618f24839 100644 --- a/pom.xml +++ b/pom.xml @@ -300,6 +300,11 @@ opentelemetry-kafka-clients-2.6 ${opentelemetry.version} + + io.opentelemetry.instrumentation + opentelemetry-instrumentation-api + ${opentelemetry.version} + io.opentelemetry.instrumentation opentelemetry-kafka-clients-common @@ -398,6 +403,12 @@ ${vertx.version} test + + io.vertx + vertx-opentracing + ${vertx.version} + test + io.vertx vertx-opentelemetry diff --git a/src/main/java/io/strimzi/kafka/bridge/Application.java b/src/main/java/io/strimzi/kafka/bridge/Application.java index 9acae9edc..2249af991 100644 --- a/src/main/java/io/strimzi/kafka/bridge/Application.java +++ b/src/main/java/io/strimzi/kafka/bridge/Application.java @@ -90,8 +90,8 @@ public static void main(String[] args) { .setConfig(new JsonObject().put("raw-data", true)); ConfigRetrieverOptions options = new ConfigRetrieverOptions() - .addStore(fileStore) - .addStore(envStore); + .addStore(fileStore) + .addStore(envStore); ConfigRetriever retriever = ConfigRetriever.create(vertx, options); retriever.getConfig(ar -> { @@ -125,14 +125,14 @@ public static void main(String[] args) { } } - // register Jaeger tracer - if set, etc + // register tracing - if set, etc TracingUtil.initialize(bridgeConfig); // when HTTP protocol is enabled, it handles healthy/ready/metrics endpoints as well, // so no need for a standalone embedded HTTP server if (!bridgeConfig.getHttpConfig().isEnabled()) { EmbeddedHttpServer embeddedHttpServer = - new EmbeddedHttpServer(vertx, healthChecker, metricsReporter, embeddedHttpServerPort); + new EmbeddedHttpServer(vertx, healthChecker, metricsReporter, embeddedHttpServerPort); embeddedHttpServer.start(); } } diff --git a/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java index 
4dae9892f..ab6d89c51 100644 --- a/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java @@ -7,8 +7,6 @@ import io.strimzi.kafka.bridge.config.BridgeConfig; import io.strimzi.kafka.bridge.config.KafkaConfig; -import io.strimzi.kafka.bridge.tracing.TracingHandle; -import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.strimzi.kafka.bridge.tracker.OffsetTracker; import io.vertx.core.AsyncResult; import io.vertx.core.CompositeFuture; @@ -169,9 +167,6 @@ protected void initConsumer(boolean shouldAttachBatchHandler, Properties config) props.putAll(kafkaConfig.getConsumerConfig().getConfig()); props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId); - TracingHandle tracing = TracingUtil.getTracing(); - tracing.kafkaConsumerConfig(props); - if (config != null) props.putAll(config); diff --git a/src/main/java/io/strimzi/kafka/bridge/SourceBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/SourceBridgeEndpoint.java index 00fc231cc..ad5e50260 100644 --- a/src/main/java/io/strimzi/kafka/bridge/SourceBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/SourceBridgeEndpoint.java @@ -106,7 +106,7 @@ public void open() { props.putAll(kafkaConfig.getProducerConfig().getConfig()); TracingHandle tracing = TracingUtil.getTracing(); - tracing.kafkaProducerConfig(props); + tracing.addTracingPropsToProducerConfig(props); this.producerUnsettledMode = KafkaProducer.create(this.vertx, props, this.keySerializer, this.valueSerializer); diff --git a/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java b/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java index ab5a56a94..b1c08166c 100644 --- a/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java +++ b/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java @@ -20,7 +20,6 @@ public class BridgeConfig extends AbstractConfig { public static final String BRIDGE_ID = BRIDGE_CONFIG_PREFIX + "id"; public static final String TRACING_TYPE = BRIDGE_CONFIG_PREFIX + "tracing"; - public static final String TRACING_SERVICE_NAME_TYPE = TRACING_TYPE + ".service-name"; private KafkaConfig kafkaConfig; private AmqpConfig amqpConfig; @@ -104,12 +103,4 @@ public String getTracing() { return config.get(BridgeConfig.TRACING_TYPE).toString(); } } - - public String getTracingServiceName() { - if (config.get(BridgeConfig.TRACING_SERVICE_NAME_TYPE) == null) { - return null; - } else { - return config.get(BridgeConfig.TRACING_SERVICE_NAME_TYPE).toString(); - } - } } diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java index 41aca9eb0..0b3260ff7 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java @@ -35,7 +35,6 @@ import io.vertx.kafka.client.common.TopicPartition; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.consumer.OffsetAndMetadata; -import io.vertx.kafka.client.producer.KafkaHeader; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Deserializer; @@ -272,18 +271,12 @@ private void doPoll(RoutingContext routingContext) { TracingHandle tracing = TracingUtil.getTracing(); SpanBuilderHandle builder = tracing.builder(routingContext, HttpOpenApiOperations.POLL.toString()); + SpanHandle span = builder.span(routingContext); for (int i = 0; i < 
records.result().size(); i++) { KafkaConsumerRecord record = records.result().recordAt(i); - - Map headers = new HashMap<>(); - for (KafkaHeader header : record.headers()) { - headers.put(header.key(), header.value().toString()); - } - - builder.addRef(headers); + tracing.handleRecordSpan(span, record); } - SpanHandle span = builder.span(routingContext); span.inject(routingContext); diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java index 1c47eb51a..c5bf30240 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java @@ -22,7 +22,6 @@ import io.vertx.core.CompositeFuture; import io.vertx.core.Future; import io.vertx.core.Handler; -import io.vertx.core.MultiMap; import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; @@ -35,11 +34,8 @@ import org.apache.kafka.common.serialization.Serializer; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.UUID; -import java.util.Map.Entry; public class HttpSourceBridgeEndpoint extends SourceBridgeEndpoint { @@ -89,12 +85,6 @@ public void handle(Endpoint endpoint) { boolean isAsync = Boolean.parseBoolean(routingContext.queryParams().get("async")); - MultiMap httpHeaders = routingContext.request().headers(); - Map headers = new HashMap<>(); - for (Entry header: httpHeaders.entries()) { - headers.put(header.getKey(), header.getValue()); - } - String operationName = partition == null ? HttpOpenApiOperations.SEND.toString() : HttpOpenApiOperations.SEND_TO_PARTITION.toString(); TracingHandle tracing = TracingUtil.getTracing(); @@ -102,12 +92,12 @@ public void handle(Endpoint endpoint) { try { if (messageConverter == null) { + span.finish(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()); HttpBridgeError error = new HttpBridgeError( HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), HttpResponseStatus.INTERNAL_SERVER_ERROR.reasonPhrase()); HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); - span.finish(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()); return; } records = messageConverter.toKafkaRecords(topic, partition, routingContext.body().buffer()); @@ -116,13 +106,13 @@ public void handle(Endpoint endpoint) { span.inject(record); } } catch (Exception e) { + span.finish(HttpResponseStatus.UNPROCESSABLE_ENTITY.code()); HttpBridgeError error = new HttpBridgeError( HttpResponseStatus.UNPROCESSABLE_ENTITY.code(), e.getMessage()); HttpUtils.sendResponse(routingContext, HttpResponseStatus.UNPROCESSABLE_ENTITY.code(), BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); - span.finish(HttpResponseStatus.UNPROCESSABLE_ENTITY.code()); return; } List> results = new ArrayList<>(records.size()); @@ -130,7 +120,12 @@ public void handle(Endpoint endpoint) { // start sending records asynchronously if (isAsync) { // if async is specified, return immediately once records are sent - this.sendAsyncRecords(records); + for (KafkaProducerRecord record : records) { + span.prepare(record); + Promise promise = Promise.promise(); + promise.future().onComplete(ar -> span.clean(record)); + this.send(record, promise); + } span.finish(HttpResponseStatus.NO_CONTENT.code()); HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), BridgeContentType.KAFKA_JSON, 
null); @@ -197,12 +192,6 @@ private JsonObject buildOffsets(List> results) { return jsonResponse; } - private void sendAsyncRecords(List> records) { - for (KafkaProducerRecord record : records) { - this.send(record, null); - } - } - private int handleError(Throwable ex) { if (ex instanceof TimeoutException && ex.getMessage() != null && ex.getMessage().contains("not present in metadata")) { diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/NoopTracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/NoopTracingHandle.java new file mode 100644 index 000000000..638c44c07 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/NoopTracingHandle.java @@ -0,0 +1,68 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.strimzi.kafka.bridge.config.BridgeConfig; +import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; +import io.vertx.kafka.client.producer.KafkaProducerRecord; + +import java.util.Properties; + +final class NoopTracingHandle implements TracingHandle { + @Override + public String envName() { + return null; + } + + @Override + public String serviceName(BridgeConfig config) { + return null; + } + + @Override + public void initialize() { + } + + @Override + public SpanBuilderHandle builder(RoutingContext routingContext, String operationName) { + return new NoopSpanBuilderHandle<>(); + } + + @Override + public SpanHandle span(RoutingContext routingContext, String operationName) { + return new NoopSpanHandle<>(); + } + + @Override + public void handleRecordSpan(SpanHandle parentSpanHandle, KafkaConsumerRecord record) { + } + + @Override + public void addTracingPropsToProducerConfig(Properties props) { + } + + private static final class NoopSpanBuilderHandle implements SpanBuilderHandle { + @Override + public SpanHandle span(RoutingContext routingContext) { + return new NoopSpanHandle<>(); + } + } + + private static final class NoopSpanHandle implements SpanHandle { + @Override + public void inject(KafkaProducerRecord record) { + } + + @Override + public void inject(RoutingContext routingContext) { + } + + @Override + public void finish(int code) { + } + } +} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java index 29d7182c5..b1911114b 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java @@ -18,19 +18,18 @@ import io.opentelemetry.context.Scope; import io.opentelemetry.context.propagation.TextMapGetter; import io.opentelemetry.context.propagation.TextMapPropagator; -import io.opentelemetry.instrumentation.kafkaclients.TracingConsumerInterceptor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import io.strimzi.kafka.bridge.config.BridgeConfig; import io.vertx.core.buffer.Buffer; import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.producer.KafkaHeader; import io.vertx.kafka.client.producer.KafkaProducerRecord; -import 
org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import java.util.Map; @@ -52,8 +51,9 @@ */ class OpenTelemetryHandle implements TracingHandle { + private Tracer tracer; + static void setCommonAttributes(SpanBuilder builder, RoutingContext routingContext) { - builder.setAttribute("component", COMPONENT); builder.setAttribute(SemanticAttributes.PEER_SERVICE, KAFKA_SERVICE); builder.setAttribute(SemanticAttributes.HTTP_METHOD, routingContext.request().method().name()); builder.setAttribute(SemanticAttributes.HTTP_URL, routingContext.request().uri()); @@ -70,9 +70,6 @@ public String serviceName(BridgeConfig config) { if (serviceName == null) { // legacy purpose, use previous JAEGER_SERVICE_NAME as OTEL_SERVICE_NAME (if not explicitly set) serviceName = System.getenv(Configuration.JAEGER_SERVICE_NAME); - if (serviceName == null) { - serviceName = config.getTracingServiceName(); - } if (serviceName != null) { System.setProperty(OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY, serviceName); } @@ -90,14 +87,17 @@ public void initialize() { AutoConfiguredOpenTelemetrySdk.initialize(); } - private static Tracer get() { - return GlobalOpenTelemetry.getTracer(COMPONENT); + private Tracer get() { + if (tracer == null) { + tracer = GlobalOpenTelemetry.getTracer(COMPONENT); + } + return tracer; } private SpanBuilder getSpanBuilder(RoutingContext routingContext, String operationName) { Tracer tracer = get(); SpanBuilder spanBuilder; - Context parentContext = propagator().extract(Context.current(), routingContext, RCG); + Context parentContext = propagator().extract(Context.current(), routingContext, ROUTING_CONTEXT_GETTER); if (parentContext == null) { spanBuilder = tracer.spanBuilder(operationName); } else { @@ -112,11 +112,30 @@ public SpanBuilderHandle builder(RoutingContext routingContext, Str return new OTelSpanBuilderHandle<>(spanBuilder); } + @Override + public void handleRecordSpan(SpanHandle parentSpanHandle, KafkaConsumerRecord record) { + String operationName = String.format("%s %s", record.topic(), MessageOperation.RECEIVE); + SpanBuilder spanBuilder = get().spanBuilder(operationName); + Context parentContext = propagator().extract(Context.current(), TracingUtil.toHeaders(record), MG); + if (parentContext != null) { + Span parentSpan = Span.fromContext(parentContext); + SpanContext psc = parentSpan != null ? parentSpan.getSpanContext() : null; + if (psc != null) { + spanBuilder.addLink(psc); + } + } + spanBuilder + .setSpanKind(SpanKind.CONSUMER) + .setParent(Context.current()) + .startSpan() + .end(); + } + private static TextMapPropagator propagator() { return GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); } - private static final TextMapGetter RCG = new TextMapGetter() { + private static final TextMapGetter ROUTING_CONTEXT_GETTER = new TextMapGetter() { @Override public Iterable keys(RoutingContext rc) { return rc.request().headers().names(); @@ -161,18 +180,6 @@ public OTelSpanBuilderHandle(SpanBuilder spanBuilder) { this.spanBuilder = spanBuilder; } - @Override - public void addRef(Map headers) { - Context parentContext = propagator().extract(Context.current(), headers, MG); - if (parentContext != null) { - Span parentSpan = Span.fromContext(parentContext); - SpanContext psc = parentSpan != null ? 
parentSpan.getSpanContext() : null; - if (psc != null) { - spanBuilder.addLink(psc); - } - } - } - @Override public SpanHandle span(RoutingContext routingContext) { return buildSpan(spanBuilder, routingContext); @@ -180,13 +187,8 @@ public SpanHandle span(RoutingContext routingContext) { } @Override - public void kafkaConsumerConfig(Properties props) { - props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); - } - - @Override - public void kafkaProducerConfig(Properties props) { - props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ContextAwareTracingProducerInterceptor.class.getName()); + public void addTracingPropsToProducerConfig(Properties props) { + TracingUtil.addProperty(props, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ContextAwareTracingProducerInterceptor.class.getName()); } private static final class OTelSpanHandle implements SpanHandle { @@ -198,19 +200,29 @@ public OTelSpanHandle(Span span) { this.scope = span.makeCurrent(); } + /** + * See ContextAwareTracingProducerInterceptor for more info. + * + * @param record Kafka producer record to use as payload + */ @Override public void prepare(KafkaProducerRecord record) { String uuid = UUID.randomUUID().toString(); - spans.put(uuid, span); - record.addHeader(_UUID, uuid); + SPANS.put(uuid, span); + record.addHeader(X_UUID, uuid); } + /** + * See ContextAwareTracingProducerInterceptor for more info. + * + * @param record Kafka producer record to use as payload + */ @Override public void clean(KafkaProducerRecord record) { - Optional oh = record.headers().stream().filter(h -> h.key().equals(_UUID)).findFirst(); + Optional oh = record.headers().stream().filter(h -> h.key().equals(X_UUID)).findFirst(); oh.ifPresent(h -> { String uuid = h.value().toString(); - spans.remove(uuid); + SPANS.remove(uuid); }); } @@ -236,16 +248,27 @@ public void finish(int code) { } } - static final String _UUID = "_UUID"; - static final Map spans = new ConcurrentHashMap<>(); - + static final String X_UUID = "_UUID"; + static final Map SPANS = new ConcurrentHashMap<>(); + + /** + * This interceptor is a workaround for async message send. + * OpenTelemetry propagates current span via ThreadLocal, + * where we have an async send - different thread. + * So we need to pass-in the current span info via SPANS map. + * ProducerRecord is a bit abused as a payload / info carrier, + * it holds an unique UUID, which maps to current span in SPANS map. 
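+ * For illustration, the flow is: prepare(record) stores the current span in SPANS under a fresh
+ * UUID and adds that UUID as the X_UUID ("_UUID") header before the asynchronous send;
+ * onSend(record) below removes the header, looks the span up and makes it current while it
+ * delegates to the standard TracingProducerInterceptor; clean(record) drops the map entry once
+ * the send completes, so entries cannot accumulate.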
+ * + * @param key type + * @param value type + */ public static class ContextAwareTracingProducerInterceptor extends TracingProducerInterceptor { @Override public ProducerRecord onSend(ProducerRecord record) { Headers headers = record.headers(); - String key = Buffer.buffer(headers.lastHeader(_UUID).value()).toString(); - headers.remove(_UUID); - Span span = spans.remove(key); + String key = Buffer.buffer(headers.lastHeader(X_UUID).value()).toString(); + headers.remove(X_UUID); + Span span = SPANS.remove(key); try (Scope ignored = span.makeCurrent()) { return super.onSend(record); } diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java index 32c5de020..173d3dc3b 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java @@ -10,7 +10,7 @@ import io.opentracing.Span; import io.opentracing.SpanContext; import io.opentracing.Tracer; -import io.opentracing.contrib.kafka.TracingConsumerInterceptor; +import io.opentracing.contrib.kafka.TracingKafkaUtils; import io.opentracing.contrib.kafka.TracingProducerInterceptor; import io.opentracing.propagation.Format; import io.opentracing.propagation.TextMap; @@ -20,8 +20,8 @@ import io.strimzi.kafka.bridge.config.BridgeConfig; import io.vertx.core.http.HttpServerRequest; import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.producer.KafkaProducerRecord; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import java.util.Iterator; @@ -36,6 +36,15 @@ */ class OpenTracingHandle implements TracingHandle { + private static Tracer tracer; + + private static Tracer getTracer() { + if (tracer == null) { + tracer = GlobalTracer.get(); + } + return tracer; + } + static void setCommonTags(Span span, RoutingContext routingContext) { Tags.COMPONENT.set(span, COMPONENT); Tags.PEER_SERVICE.set(span, KAFKA_SERVICE); @@ -50,11 +59,7 @@ public String envName() { @Override public String serviceName(BridgeConfig config) { - String serviceName = System.getenv(envName()); - if (serviceName == null) { - serviceName = config.getTracingServiceName(); - } - return serviceName; + return System.getenv(envName()); } @Override @@ -74,8 +79,22 @@ public SpanHandle span(RoutingContext routingContext, String operat return buildSpan(spanBuilder, routingContext); } + @SuppressWarnings("rawtypes") + @Override + public void handleRecordSpan(SpanHandle parentSpanHandle, KafkaConsumerRecord record) { + Tracer tracer = getTracer(); + Span span = ((OTSpanHandle) parentSpanHandle).span; + Tracer.SpanBuilder spanBuilder = tracer.buildSpan(TracingKafkaUtils.FROM_PREFIX + record.topic()) + .asChildOf(span).withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER); + SpanContext parentSpan = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(TracingUtil.toHeaders(record))); + if (parentSpan != null) { + spanBuilder.addReference(References.FOLLOWS_FROM, parentSpan); + } + spanBuilder.start().finish(); + } + private Tracer.SpanBuilder getSpanBuilder(RoutingContext rc, String operationName) { - Tracer tracer = GlobalTracer.get(); + Tracer tracer = getTracer(); SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new RequestTextMap(rc.request())); return tracer.buildSpan(operationName).asChildOf(parentSpan); } @@ -111,15 +130,6 @@ public 
OTSpanBuilderHandle(Tracer.SpanBuilder spanBuilder) { this.spanBuilder = spanBuilder; } - @Override - public void addRef(Map headers) { - Tracer tracer = GlobalTracer.get(); - SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(headers)); - if (parentSpan != null) { - spanBuilder.addReference(References.FOLLOWS_FROM, parentSpan); - } - } - @Override public SpanHandle span(RoutingContext routingContext) { return buildSpan(spanBuilder, routingContext); @@ -127,13 +137,8 @@ public SpanHandle span(RoutingContext routingContext) { } @Override - public void kafkaConsumerConfig(Properties props) { - props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); - } - - @Override - public void kafkaProducerConfig(Properties props) { - props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); + public void addTracingPropsToProducerConfig(Properties props) { + TracingUtil.addProperty(props, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); } private static final class OTSpanHandle implements SpanHandle { @@ -145,7 +150,7 @@ public OTSpanHandle(Span span) { @Override public void inject(KafkaProducerRecord record) { - Tracer tracer = GlobalTracer.get(); + Tracer tracer = getTracer(); tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMap() { @Override public void put(String key, String value) { @@ -161,8 +166,8 @@ public Iterator> iterator() { @Override public void inject(RoutingContext routingContext) { - Tracer tracer = GlobalTracer.get(); - tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMap() { + Tracer tracer = getTracer(); + tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new TextMap() { @Override public void put(String key, String value) { routingContext.response().headers().add(key, value); diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java index daf2f62ab..996162d56 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java @@ -7,12 +7,15 @@ import io.vertx.ext.web.RoutingContext; -import java.util.Map; - /** - * Simple SpanBuilder handle. + * SpanBuilder handle - an abstraction over actual span builder implementation. */ public interface SpanBuilderHandle { - void addRef(Map headers); + /** + * Build span handle from underlying span builder implementation. + * + * @param routingContext Vert.x routing context + * @return the span handle + */ SpanHandle span(RoutingContext routingContext); } diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java index 40e671c1c..c8ad09de4 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java @@ -9,16 +9,43 @@ import io.vertx.kafka.client.producer.KafkaProducerRecord; /** - * Simple Span handle. + * Span handle, an abstraction over actual span implementation. */ public interface SpanHandle { + /** + * Prepare Kafka producer record before async send. + * + * @param record Kafka producer record to use as payload + */ default void prepare(KafkaProducerRecord record) { } + /** + * Clean Kafka producer record after async send. 
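+ * Invoked from the send-completion handler, so any bookkeeping added by prepare(record),
+ * such as the span registered by the OpenTelemetry handle, is released again.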
+ * + * @param record Kafka producer record used as payload + */ default void clean(KafkaProducerRecord record) { } + /** + * Inject tracing info into underlying span from Kafka producer record. + * + * @param record Kafka producer record to extract tracing info + */ void inject(KafkaProducerRecord record); + + /** + * Inject tracing info into underlying span from Vert.x routing context. + * + * @param routingContext Vert.x routing context to extract tracing info + */ void inject(RoutingContext routingContext); + + /** + * Finish underlying span. + * + * @param code response code + */ void finish(int code); } diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java index 3ab981b16..ce804404e 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java @@ -7,6 +7,7 @@ import io.strimzi.kafka.bridge.config.BridgeConfig; import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import java.util.Properties; @@ -14,13 +15,62 @@ * Simple interface to abstract tracing between legacy OpenTracing and new OpenTelemetry. */ public interface TracingHandle { + /** + * Tracing env var name. + * + * @return tracing env var name + */ String envName(); + + /** + * Extract service name from bridge confing. + * + * @param config the bridge config + * @return bridge's service name + */ String serviceName(BridgeConfig config); + + /** + * Initialize tracing. + */ void initialize(); + /** + * Build span builder handle. + * + * @param key type + * @param value type + * @param routingContext Vert.x rounting context + * @param operationName current operation name + * @return span builder handle + */ SpanBuilderHandle builder(RoutingContext routingContext, String operationName); + + /** + * Build span handle. + * + * @param key type + * @param value type + * @param routingContext Vert.x rounting context + * @param operationName current operation name + * @return span handle + */ SpanHandle span(RoutingContext routingContext, String operationName); - void kafkaConsumerConfig(Properties props); - void kafkaProducerConfig(Properties props); + /** + * Extract span info from Kafka consumer record. + * + * @param key type + * @param value type + * @param parentSpanHandle parent span handle + * @param record Kafka consumer record + */ + void handleRecordSpan(SpanHandle parentSpanHandle, KafkaConsumerRecord record); + + /** + * Add producer properties, if any. 
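+ * Tracing implementations append their producer interceptor to any interceptor.classes value
+ * that is already configured (see TracingUtil.addProperty), rather than overwriting it.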
+ * + * @param props the properties + */ + void addTracingPropsToProducerConfig(Properties props); } diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java index 6f43ddd38..e22785638 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java @@ -6,11 +6,12 @@ package io.strimzi.kafka.bridge.tracing; import io.strimzi.kafka.bridge.config.BridgeConfig; -import io.vertx.ext.web.RoutingContext; -import io.vertx.kafka.client.producer.KafkaProducerRecord; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; +import io.vertx.kafka.client.producer.KafkaHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -22,7 +23,7 @@ */ public class TracingUtil { private static final Logger log = LoggerFactory.getLogger(TracingUtil.class); - private static TracingHandle tracing = new NoopTracing(); + private static TracingHandle tracing = new NoopTracingHandle(); public static TracingHandle getTracing() { return tracing; @@ -49,62 +50,29 @@ public static void initialize(BridgeConfig config) { } } - private static final class NoopTracing implements TracingHandle { - @Override - public String envName() { - return null; - } - - @Override - public String serviceName(BridgeConfig config) { - return null; - } - - @Override - public void initialize() { - } - - @Override - public SpanBuilderHandle builder(RoutingContext routingContext, String operationName) { - return new NoopSpanBuilderHandle<>(); - } - - @Override - public SpanHandle span(RoutingContext routingContext, String operationName) { - return new NoopSpanHandle<>(); - } - - @Override - public void kafkaConsumerConfig(Properties props) { - } - - @Override - public void kafkaProducerConfig(Properties props) { - } - } - - private static final class NoopSpanBuilderHandle implements SpanBuilderHandle { - @Override - public void addRef(Map headers) { - } - - @Override - public SpanHandle span(RoutingContext routingContext) { - return new NoopSpanHandle<>(); + /** + * We are interested in tracing headers here, + * which are unique - single value per key. + * + * @param record Kafka consumer record + * @param key type + * @param value type + * @return map of headers + */ + public static Map toHeaders(KafkaConsumerRecord record) { + Map headers = new HashMap<>(); + for (KafkaHeader header : record.headers()) { + headers.put(header.key(), header.value().toString()); } + return headers; } - private static final class NoopSpanHandle implements SpanHandle { - @Override - public void inject(KafkaProducerRecord record) { - } - - @Override - public void inject(RoutingContext routingContext) { - } - - @Override - public void finish(int code) { + static void addProperty(Properties props, String key, String value) { + String previous = props.getProperty(key); + if (previous != null) { + props.setProperty(key, previous + "," + value); + } else { + props.setProperty(key, value); } } } diff --git a/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTracingTest.java b/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTracingTest.java new file mode 100644 index 000000000..27072f815 --- /dev/null +++ b/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTracingTest.java @@ -0,0 +1,24 @@ +/* + * Copyright Strimzi authors. 
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.vertx.core.tracing.TracingOptions; +import io.vertx.tracing.opentracing.OpenTracingOptions; + +/** + * OpenTracing tests + * + * These env vars need to be set: + * * JAEGER_SERVICE_NAME=ot_kafka_bridge_test + * * JAEGER_SAMPLER_TYPE=const + * * JAEGER_SAMPLER_PARAM=1 + */ +public class OpenTracingTest extends TracingTestBase { + @Override + protected TracingOptions tracingOptions() { + return new OpenTracingOptions(); + } +} From d656f53dc81e3fc2f316b9d85889920f231bab78 Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Mon, 11 Jul 2022 12:15:03 +0200 Subject: [PATCH 04/14] Upgrade libs, and consume test Remove OTel span propagation hack. Simplify executor service adapt. Signed-off-by: Ales Justin --- config/application.properties | 4 +- pom.xml | 27 ++--- .../io/strimzi/kafka/bridge/Application.java | 6 +- .../config/BridgeExecutorServiceFactory.java | 103 ++++++++++++++++++ .../bridge/http/HttpSourceBridgeEndpoint.java | 8 +- .../bridge/tracing/NoopTracingHandle.java | 2 +- .../bridge/tracing/OpenTelemetryHandle.java | 77 +++---------- .../bridge/tracing/OpenTracingHandle.java | 4 +- .../kafka/bridge/tracing/SpanHandle.java | 16 --- .../bridge/tracing/TracingConstants.java | 2 - .../kafka/bridge/tracing/TracingHandle.java | 18 ++- .../kafka/bridge/tracing/TracingUtil.java | 6 +- .../bridge/tracing/OpenTelemetryTest.java | 1 + .../kafka/bridge/tracing/TracingTestBase.java | 49 ++++++++- 14 files changed, 208 insertions(+), 115 deletions(-) create mode 100644 src/main/java/io/strimzi/kafka/bridge/config/BridgeExecutorServiceFactory.java diff --git a/config/application.properties b/config/application.properties index 385ea6c76..05ba271ce 100644 --- a/config/application.properties +++ b/config/application.properties @@ -2,9 +2,9 @@ bridge.id=my-bridge # uncomment one the following lines (bridge.tracing) to enable Jaeger tracing, check the documentation how to configure the tracer # OpenTracing support -bridge.tracing=jaeger +#bridge.tracing=jaeger # OpenTelemetry support -#bridge.tracing=opentelemetry +bridge.tracing=opentelemetry #Apache Kafka common kafka.bootstrap.servers=localhost:9092 diff --git a/pom.xml b/pom.xml index 618f24839..1d7c7c582 100644 --- a/pom.xml +++ b/pom.xml @@ -110,9 +110,9 @@ 1.8.1 0.33.0 0.1.15 - 1.9.0-alpha - 1.9.0 - 1.44.0 + 1.15.0-alpha + 1.15.0 + 1.47.0 1.3.9 0.12.0 0.7.0 @@ -273,47 +273,47 @@ io.opentelemetry opentelemetry-sdk-extension-autoconfigure - ${opentelemetry.version} + ${opentelemetry.alpha-version} io.opentelemetry opentelemetry-api - ${opentelemetry-stable.version} + ${opentelemetry.version} io.opentelemetry opentelemetry-context - ${opentelemetry-stable.version} + ${opentelemetry.version} io.opentelemetry opentelemetry-semconv - ${opentelemetry.version} + ${opentelemetry.alpha-version} io.opentelemetry opentelemetry-sdk-trace - ${opentelemetry-stable.version} + ${opentelemetry.version} io.opentelemetry.instrumentation opentelemetry-kafka-clients-2.6 - ${opentelemetry.version} + ${opentelemetry.alpha-version} io.opentelemetry.instrumentation - opentelemetry-instrumentation-api - ${opentelemetry.version} + opentelemetry-instrumentation-api-semconv + ${opentelemetry.alpha-version} io.opentelemetry.instrumentation opentelemetry-kafka-clients-common - ${opentelemetry.version} + ${opentelemetry.alpha-version} io.opentelemetry opentelemetry-exporter-jaeger - ${opentelemetry-stable.version} + 
${opentelemetry.version} @@ -568,6 +568,7 @@ io.opentelemetry:opentelemetry-sdk-trace io.opentelemetry.instrumentation:opentelemetry-kafka-clients-common + io.opentelemetry.instrumentation:opentelemetry-instrumentation-api io.opentelemetry:opentelemetry-exporter-jaeger io.grpc:grpc-netty-shaded diff --git a/src/main/java/io/strimzi/kafka/bridge/Application.java b/src/main/java/io/strimzi/kafka/bridge/Application.java index 2249af991..9bcd62ef0 100644 --- a/src/main/java/io/strimzi/kafka/bridge/Application.java +++ b/src/main/java/io/strimzi/kafka/bridge/Application.java @@ -8,6 +8,7 @@ import io.micrometer.core.instrument.MeterRegistry; import io.strimzi.kafka.bridge.amqp.AmqpBridge; import io.strimzi.kafka.bridge.config.BridgeConfig; +import io.strimzi.kafka.bridge.config.BridgeExecutorServiceFactory; import io.strimzi.kafka.bridge.http.HttpBridge; import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.vertx.config.ConfigRetriever; @@ -18,6 +19,7 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; +import io.vertx.core.impl.VertxBuilder; import io.vertx.core.json.JsonObject; import io.vertx.micrometer.Label; import io.vertx.micrometer.MetricsDomain; @@ -72,7 +74,9 @@ public static void main(String[] args) { vertxOptions.setMetricsOptions(metricsOptions()); jmxCollectorRegistry = getJmxCollectorRegistry(); } - Vertx vertx = Vertx.vertx(vertxOptions); + VertxBuilder vertxBuilder = new VertxBuilder(vertxOptions) + .executorServiceFactory(new BridgeExecutorServiceFactory()); + Vertx vertx = vertxBuilder.init().vertx(); // MeterRegistry default instance is just null if metrics are not enabled in the VertxOptions instance MeterRegistry meterRegistry = BackendRegistries.getDefaultNow(); MetricsReporter metricsReporter = new MetricsReporter(jmxCollectorRegistry, meterRegistry); diff --git a/src/main/java/io/strimzi/kafka/bridge/config/BridgeExecutorServiceFactory.java b/src/main/java/io/strimzi/kafka/bridge/config/BridgeExecutorServiceFactory.java new file mode 100644 index 000000000..cfd520446 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/config/BridgeExecutorServiceFactory.java @@ -0,0 +1,103 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.config; + +import io.strimzi.kafka.bridge.tracing.TracingUtil; +import io.vertx.core.spi.ExecutorServiceFactory; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Custom Vertx ExecutorServiceFactory - delegate to tracing impl to provide one. + * Initially it could be NoopTracingHandle that provides it - before Application is fully initialized, + * then it should be actual tracing implementation - if there is one. + * Shutdown should be done OK, since tracing delegate will also delegate shutdown to original, + * or original itself will be used. 
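+ * With OpenTelemetry the adapted service is Context.taskWrapping(original): each task captures
+ * the Context that is current when it is submitted and runs with it re-activated on the worker
+ * thread, so spans started on the event loop stay current inside asynchronous Kafka operations.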
+ */ +public class BridgeExecutorServiceFactory implements ExecutorServiceFactory { + @Override + public ExecutorService createExecutor(ThreadFactory threadFactory, Integer concurrency, Integer maxConcurrency) { + ExecutorService original = ExecutorServiceFactory.INSTANCE.createExecutor(threadFactory, concurrency, maxConcurrency); + return new ExecutorService() { + private ExecutorService delegate() { + return TracingUtil.getTracing().adapt(original); + } + + @Override + public void shutdown() { + delegate().shutdown(); + } + + @Override + public List shutdownNow() { + return delegate().shutdownNow(); + } + + @Override + public boolean isShutdown() { + return delegate().isShutdown(); + } + + @Override + public boolean isTerminated() { + return delegate().isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate().awaitTermination(timeout, unit); + } + + @Override + public Future submit(Callable task) { + return delegate().submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + return delegate().submit(task, result); + } + + @Override + public Future submit(Runnable task) { + return delegate().submit(task); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + return delegate().invokeAll(tasks); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { + return delegate().invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return delegate().invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return delegate().invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) { + delegate().execute(command); + } + }; + } +} diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java index c5bf30240..c4db356ce 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java @@ -121,10 +121,7 @@ public void handle(Endpoint endpoint) { if (isAsync) { // if async is specified, return immediately once records are sent for (KafkaProducerRecord record : records) { - span.prepare(record); - Promise promise = Promise.promise(); - promise.future().onComplete(ar -> span.clean(record)); - this.send(record, promise); + this.send(record, null); } span.finish(HttpResponseStatus.NO_CONTENT.code()); HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), @@ -136,9 +133,8 @@ public void handle(Endpoint endpoint) { List sendHandlers = new ArrayList<>(records.size()); for (KafkaProducerRecord record : records) { Promise promise = Promise.promise(); - Future future = promise.future().onComplete(ar -> span.clean(record)); + Future future = promise.future(); sendHandlers.add(future); - span.prepare(record); this.send(record, promise); } diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/NoopTracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/NoopTracingHandle.java index 638c44c07..8e19ede4d 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/NoopTracingHandle.java +++ 
b/src/main/java/io/strimzi/kafka/bridge/tracing/NoopTracingHandle.java @@ -14,7 +14,7 @@ final class NoopTracingHandle implements TracingHandle { @Override - public String envName() { + public String envServiceName() { return null; } diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java index b1911114b..69cfa0319 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java @@ -23,20 +23,14 @@ import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import io.strimzi.kafka.bridge.config.BridgeConfig; -import io.vertx.core.buffer.Buffer; import io.vertx.ext.web.RoutingContext; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; -import io.vertx.kafka.client.producer.KafkaHeader; import io.vertx.kafka.client.producer.KafkaProducerRecord; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.header.Headers; import java.util.Map; -import java.util.Optional; import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import static io.strimzi.kafka.bridge.tracing.TracingConstants.COMPONENT; import static io.strimzi.kafka.bridge.tracing.TracingConstants.JAEGER; @@ -52,6 +46,7 @@ class OpenTelemetryHandle implements TracingHandle { private Tracer tracer; + private ExecutorService service; static void setCommonAttributes(SpanBuilder builder, RoutingContext routingContext) { builder.setAttribute(SemanticAttributes.PEER_SERVICE, KAFKA_SERVICE); @@ -60,13 +55,13 @@ static void setCommonAttributes(SpanBuilder builder, RoutingContext routingConte } @Override - public String envName() { + public String envServiceName() { return OPENTELEMETRY_SERVICE_NAME_ENV_KEY; } @Override public String serviceName(BridgeConfig config) { - String serviceName = System.getenv(envName()); + String serviceName = System.getenv(envServiceName()); if (serviceName == null) { // legacy purpose, use previous JAEGER_SERVICE_NAME as OTEL_SERVICE_NAME (if not explicitly set) serviceName = System.getenv(Configuration.JAEGER_SERVICE_NAME); @@ -84,9 +79,18 @@ public String serviceName(BridgeConfig config) { @Override public void initialize() { + System.setProperty("otel.metrics.exporter", "none"); // disable metrics AutoConfiguredOpenTelemetrySdk.initialize(); } + @Override + public synchronized ExecutorService adapt(ExecutorService provided) { + if (service == null) { + service = Context.taskWrapping(provided); + } + return service; + } + private Tracer get() { if (tracer == null) { tracer = GlobalOpenTelemetry.getTracer(COMPONENT); @@ -188,7 +192,7 @@ public SpanHandle span(RoutingContext routingContext) { @Override public void addTracingPropsToProducerConfig(Properties props) { - TracingUtil.addProperty(props, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ContextAwareTracingProducerInterceptor.class.getName()); + TracingUtil.addProperty(props, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); } private static final class OTelSpanHandle implements SpanHandle { @@ -200,32 +204,6 @@ public OTelSpanHandle(Span span) { this.scope = span.makeCurrent(); } - /** - * See ContextAwareTracingProducerInterceptor for more info. 
- * - * @param record Kafka producer record to use as payload - */ - @Override - public void prepare(KafkaProducerRecord record) { - String uuid = UUID.randomUUID().toString(); - SPANS.put(uuid, span); - record.addHeader(X_UUID, uuid); - } - - /** - * See ContextAwareTracingProducerInterceptor for more info. - * - * @param record Kafka producer record to use as payload - */ - @Override - public void clean(KafkaProducerRecord record) { - Optional oh = record.headers().stream().filter(h -> h.key().equals(X_UUID)).findFirst(); - oh.ifPresent(h -> { - String uuid = h.value().toString(); - SPANS.remove(uuid); - }); - } - @Override public void inject(KafkaProducerRecord record) { propagator().inject(Context.current(), record, KafkaProducerRecord::addHeader); @@ -247,31 +225,4 @@ public void finish(int code) { } } } - - static final String X_UUID = "_UUID"; - static final Map SPANS = new ConcurrentHashMap<>(); - - /** - * This interceptor is a workaround for async message send. - * OpenTelemetry propagates current span via ThreadLocal, - * where we have an async send - different thread. - * So we need to pass-in the current span info via SPANS map. - * ProducerRecord is a bit abused as a payload / info carrier, - * it holds an unique UUID, which maps to current span in SPANS map. - * - * @param key type - * @param value type - */ - public static class ContextAwareTracingProducerInterceptor extends TracingProducerInterceptor { - @Override - public ProducerRecord onSend(ProducerRecord record) { - Headers headers = record.headers(); - String key = Buffer.buffer(headers.lastHeader(X_UUID).value()).toString(); - headers.remove(X_UUID); - Span span = SPANS.remove(key); - try (Scope ignored = span.makeCurrent()) { - return super.onSend(record); - } - } - } } diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java index 173d3dc3b..e3d225ec2 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java @@ -53,13 +53,13 @@ static void setCommonTags(Span span, RoutingContext routingContext) { } @Override - public String envName() { + public String envServiceName() { return Configuration.JAEGER_SERVICE_NAME; } @Override public String serviceName(BridgeConfig config) { - return System.getenv(envName()); + return System.getenv(envServiceName()); } @Override diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java index c8ad09de4..c0cfdf092 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java @@ -12,22 +12,6 @@ * Span handle, an abstraction over actual span implementation. */ public interface SpanHandle { - /** - * Prepare Kafka producer record before async send. - * - * @param record Kafka producer record to use as payload - */ - default void prepare(KafkaProducerRecord record) { - } - - /** - * Clean Kafka producer record after async send. - * - * @param record Kafka producer record used as payload - */ - default void clean(KafkaProducerRecord record) { - } - /** * Inject tracing info into underlying span from Kafka producer record. 
* diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingConstants.java b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingConstants.java index 1dfde779b..4e452c851 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingConstants.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingConstants.java @@ -13,8 +13,6 @@ public final class TracingConstants { public static final String KAFKA_SERVICE = "kafka"; public static final String JAEGER = "jaeger"; - - public static final String JAEGER_OPENTRACING = JAEGER; public static final String OPENTELEMETRY = "opentelemetry"; public static final String OPENTELEMETRY_SERVICE_NAME_ENV_KEY = "OTEL_SERVICE_NAME"; diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java index ce804404e..a88558dc8 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java @@ -10,17 +10,18 @@ import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import java.util.Properties; +import java.util.concurrent.ExecutorService; /** * Simple interface to abstract tracing between legacy OpenTracing and new OpenTelemetry. */ public interface TracingHandle { /** - * Tracing env var name. + * Tracing env var service name. * - * @return tracing env var name + * @return tracing env var service name */ - String envName(); + String envServiceName(); /** * Extract service name from bridge confing. @@ -35,6 +36,17 @@ public interface TracingHandle { */ void initialize(); + /** + * Adapt executor service if needed. + * Else return service parameter instance. + * + * @param service current executor service + * @return adapted executor service or service parameter instance + */ + default ExecutorService adapt(ExecutorService service) { + return service; + } + /** * Build span builder handle. * diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java index e22785638..8e7746c4d 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java @@ -15,7 +15,7 @@ import java.util.Map; import java.util.Properties; -import static io.strimzi.kafka.bridge.tracing.TracingConstants.JAEGER_OPENTRACING; +import static io.strimzi.kafka.bridge.tracing.TracingConstants.JAEGER; import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY; /** @@ -31,7 +31,7 @@ public static TracingHandle getTracing() { public static void initialize(BridgeConfig config) { String tracingConfig = config.getTracing(); - if (tracingConfig != null && (tracingConfig.equals(JAEGER_OPENTRACING) || tracingConfig.equals(OPENTELEMETRY))) { + if (tracingConfig != null && (tracingConfig.equals(JAEGER) || tracingConfig.equals(OPENTELEMETRY))) { boolean isOpenTelemetry = OPENTELEMETRY.equals(tracingConfig); TracingHandle instance = isOpenTelemetry ? 
new OpenTelemetryHandle() : new OpenTracingHandle(); @@ -45,7 +45,7 @@ public static void initialize(BridgeConfig config) { instance.initialize(); tracing = instance; } else { - log.error("Tracing config cannot be initialized because {} environment variable is not defined", instance.envName()); + log.error("Tracing config cannot be initialized because {} environment variable is not defined", instance.envServiceName()); } } } diff --git a/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryTest.java b/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryTest.java index 0543e0b27..9956288a6 100644 --- a/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryTest.java +++ b/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryTest.java @@ -20,6 +20,7 @@ public class OpenTelemetryTest extends TracingTestBase { protected TracingOptions tracingOptions() { System.setProperty(OPENTELEMETRY_TRACES_EXPORTER_PROPERTY_KEY, JAEGER); System.setProperty(OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY, "strimzi-kafka-bridge-test"); + System.setProperty("otel.metrics.exporter", "none"); // disable metrics return new OpenTelemetryOptions(); } } diff --git a/src/test/java/io/strimzi/kafka/bridge/tracing/TracingTestBase.java b/src/test/java/io/strimzi/kafka/bridge/tracing/TracingTestBase.java index c149d376c..179344ef1 100644 --- a/src/test/java/io/strimzi/kafka/bridge/tracing/TracingTestBase.java +++ b/src/test/java/io/strimzi/kafka/bridge/tracing/TracingTestBase.java @@ -7,6 +7,7 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.strimzi.kafka.bridge.BridgeContentType; +import io.strimzi.kafka.bridge.http.services.ConsumerService; import io.strimzi.kafka.bridge.http.services.ProducerService; import io.strimzi.kafka.bridge.utils.Urls; import io.vertx.core.AsyncResult; @@ -20,8 +21,10 @@ import io.vertx.ext.web.client.HttpResponse; import io.vertx.ext.web.client.WebClient; import io.vertx.ext.web.client.WebClientOptions; +import io.vertx.ext.web.codec.BodyCodec; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; +import org.hamcrest.CoreMatchers; import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -30,6 +33,9 @@ import org.slf4j.LoggerFactory; import java.net.URL; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -72,10 +78,10 @@ public void setUp() { protected abstract TracingOptions tracingOptions(); @Test - public void testSmoke(VertxTestContext context) { + public void testSmoke(VertxTestContext context) throws Exception { Vertx vertx = Vertx.vertx(new VertxOptions().setTracingOptions(tracingOptions())); - WebClient client = WebClient.create(vertx, (WebClientOptions) new WebClientOptions() + WebClient client = WebClient.create(vertx, new WebClientOptions() .setDefaultHost(Urls.BRIDGE_HOST) .setDefaultPort(Urls.BRIDGE_PORT) .setTracingPolicy(TracingPolicy.ALWAYS) @@ -91,8 +97,45 @@ public void testSmoke(VertxTestContext context) { JsonObject root = new JsonObject(); root.put("records", records); + String topicName = "mytopic"; + ProducerService.getInstance(client) - .sendRecordsRequest("mytopic", root, BridgeContentType.KAFKA_JSON_JSON) + .sendRecordsRequest(topicName, root, BridgeContentType.KAFKA_JSON_JSON) .sendJsonObject(root, verifyOK(context)); + + ConsumerService consumerService = ConsumerService.getInstance(client); 
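+        // The consumer round trip below exercises the receive side of tracing as well: with a
+        // tracer configured, the poll is expected to show up in the tracing backend next to the
+        // send, with per-record consumer spans linked to the producer context (handleRecordSpan).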
+ + // create consumer + // subscribe to a topic + + String consumerName = "my-consumer"; + String groupId = UUID.randomUUID().toString(); + + JsonObject consumerJson = new JsonObject() + .put("name", consumerName) + .put("format", "json"); + + consumerService + .createConsumer(context, groupId, consumerJson) + .subscribeConsumer(context, groupId, consumerName, topicName); + + CompletableFuture consume = new CompletableFuture<>(); + // consume records + consumerService + .consumeRecordsRequest(groupId, consumerName, BridgeContentType.KAFKA_JSON_JSON) + .as(BodyCodec.jsonArray()) + .send(ar -> { + context.verify(() -> { + assertThat(ar.succeeded(), CoreMatchers.is(true)); + HttpResponse response = ar.result(); + assertThat(response.statusCode(), CoreMatchers.is(HttpResponseStatus.OK.code())); + }); + consume.complete(true); + }); + + consume.get(60, TimeUnit.SECONDS); + + // consumer deletion + consumerService.deleteConsumer(context, groupId, consumerName); } } From 15988dbc75142c3e69287fae8c2e1c59d41132fa Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Wed, 13 Jul 2022 11:46:49 +0200 Subject: [PATCH 05/14] Remove OTel executor service workaround. Signed-off-by: Ales Justin --- pom.xml | 11 +- .../io/strimzi/kafka/bridge/Application.java | 6 +- .../config/BridgeExecutorServiceFactory.java | 103 ------------------ 3 files changed, 6 insertions(+), 114 deletions(-) delete mode 100644 src/main/java/io/strimzi/kafka/bridge/config/BridgeExecutorServiceFactory.java diff --git a/pom.xml b/pom.xml index 1d7c7c582..04986ed92 100644 --- a/pom.xml +++ b/pom.xml @@ -315,6 +315,11 @@ opentelemetry-exporter-jaeger ${opentelemetry.version} + + io.vertx + vertx-opentelemetry + ${vertx.version} + io.grpc @@ -409,12 +414,6 @@ ${vertx.version} test - - io.vertx - vertx-opentelemetry - ${vertx.version} - test - org.mockito mockito-core diff --git a/src/main/java/io/strimzi/kafka/bridge/Application.java b/src/main/java/io/strimzi/kafka/bridge/Application.java index 9bcd62ef0..2249af991 100644 --- a/src/main/java/io/strimzi/kafka/bridge/Application.java +++ b/src/main/java/io/strimzi/kafka/bridge/Application.java @@ -8,7 +8,6 @@ import io.micrometer.core.instrument.MeterRegistry; import io.strimzi.kafka.bridge.amqp.AmqpBridge; import io.strimzi.kafka.bridge.config.BridgeConfig; -import io.strimzi.kafka.bridge.config.BridgeExecutorServiceFactory; import io.strimzi.kafka.bridge.http.HttpBridge; import io.strimzi.kafka.bridge.tracing.TracingUtil; import io.vertx.config.ConfigRetriever; @@ -19,7 +18,6 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; -import io.vertx.core.impl.VertxBuilder; import io.vertx.core.json.JsonObject; import io.vertx.micrometer.Label; import io.vertx.micrometer.MetricsDomain; @@ -74,9 +72,7 @@ public static void main(String[] args) { vertxOptions.setMetricsOptions(metricsOptions()); jmxCollectorRegistry = getJmxCollectorRegistry(); } - VertxBuilder vertxBuilder = new VertxBuilder(vertxOptions) - .executorServiceFactory(new BridgeExecutorServiceFactory()); - Vertx vertx = vertxBuilder.init().vertx(); + Vertx vertx = Vertx.vertx(vertxOptions); // MeterRegistry default instance is just null if metrics are not enabled in the VertxOptions instance MeterRegistry meterRegistry = BackendRegistries.getDefaultNow(); MetricsReporter metricsReporter = new MetricsReporter(jmxCollectorRegistry, meterRegistry); diff --git a/src/main/java/io/strimzi/kafka/bridge/config/BridgeExecutorServiceFactory.java 
b/src/main/java/io/strimzi/kafka/bridge/config/BridgeExecutorServiceFactory.java deleted file mode 100644 index cfd520446..000000000 --- a/src/main/java/io/strimzi/kafka/bridge/config/BridgeExecutorServiceFactory.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ - -package io.strimzi.kafka.bridge.config; - -import io.strimzi.kafka.bridge.tracing.TracingUtil; -import io.vertx.core.spi.ExecutorServiceFactory; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * Custom Vertx ExecutorServiceFactory - delegate to tracing impl to provide one. - * Initially it could be NoopTracingHandle that provides it - before Application is fully initialized, - * then it should be actual tracing implementation - if there is one. - * Shutdown should be done OK, since tracing delegate will also delegate shutdown to original, - * or original itself will be used. - */ -public class BridgeExecutorServiceFactory implements ExecutorServiceFactory { - @Override - public ExecutorService createExecutor(ThreadFactory threadFactory, Integer concurrency, Integer maxConcurrency) { - ExecutorService original = ExecutorServiceFactory.INSTANCE.createExecutor(threadFactory, concurrency, maxConcurrency); - return new ExecutorService() { - private ExecutorService delegate() { - return TracingUtil.getTracing().adapt(original); - } - - @Override - public void shutdown() { - delegate().shutdown(); - } - - @Override - public List shutdownNow() { - return delegate().shutdownNow(); - } - - @Override - public boolean isShutdown() { - return delegate().isShutdown(); - } - - @Override - public boolean isTerminated() { - return delegate().isTerminated(); - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - return delegate().awaitTermination(timeout, unit); - } - - @Override - public Future submit(Callable task) { - return delegate().submit(task); - } - - @Override - public Future submit(Runnable task, T result) { - return delegate().submit(task, result); - } - - @Override - public Future submit(Runnable task) { - return delegate().submit(task); - } - - @Override - public List> invokeAll(Collection> tasks) throws InterruptedException { - return delegate().invokeAll(tasks); - } - - @Override - public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - return delegate().invokeAll(tasks, timeout, unit); - } - - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - return delegate().invokeAny(tasks); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return delegate().invokeAny(tasks, timeout, unit); - } - - @Override - public void execute(Runnable command) { - delegate().execute(command); - } - }; - } -} From 7c7d75910b9698baeaf842c66fd627592256c468 Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Wed, 13 Jul 2022 11:50:36 +0200 Subject: [PATCH 06/14] Add javadoc Signed-off-by: Ales Justin --- .../io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java 
| 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java index 69cfa0319..5b2de5e7c 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java @@ -42,6 +42,9 @@ /** * OpenTelemetry implementation of Tracing. + * + * Note: we use Vert.x OpenTelemetry extension to setup custom ContextStorageProvider: + * @see io.vertx.tracing.opentelemetry.VertxContextStorageProvider */ class OpenTelemetryHandle implements TracingHandle { From 576cdaa693e0087a1b1d5befebef6bd1097aa173 Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Tue, 26 Jul 2022 10:34:29 +0200 Subject: [PATCH 07/14] Remove code setup of required config via env vars / sys props. Signed-off-by: Ales Justin --- config/application.properties | 2 +- .../bridge/tracing/OpenTelemetryHandle.java | 20 +------------------ 2 files changed, 2 insertions(+), 20 deletions(-) diff --git a/config/application.properties b/config/application.properties index 05ba271ce..dd32e3a9b 100644 --- a/config/application.properties +++ b/config/application.properties @@ -4,7 +4,7 @@ bridge.id=my-bridge # OpenTracing support #bridge.tracing=jaeger # OpenTelemetry support -bridge.tracing=opentelemetry +#bridge.tracing=opentelemetry #Apache Kafka common kafka.bootstrap.servers=localhost:9092 diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java index 5b2de5e7c..cfec96521 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java @@ -5,7 +5,6 @@ package io.strimzi.kafka.bridge.tracing; -import io.jaegertracing.Configuration; import io.netty.handler.codec.http.HttpResponseStatus; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.trace.Span; @@ -33,12 +32,8 @@ import java.util.concurrent.ExecutorService; import static io.strimzi.kafka.bridge.tracing.TracingConstants.COMPONENT; -import static io.strimzi.kafka.bridge.tracing.TracingConstants.JAEGER; import static io.strimzi.kafka.bridge.tracing.TracingConstants.KAFKA_SERVICE; import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY_SERVICE_NAME_ENV_KEY; -import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY; -import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY_TRACES_EXPORTER_ENV_KEY; -import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY_TRACES_EXPORTER_PROPERTY_KEY; /** * OpenTelemetry implementation of Tracing. 
@@ -64,20 +59,7 @@ public String envServiceName() { @Override public String serviceName(BridgeConfig config) { - String serviceName = System.getenv(envServiceName()); - if (serviceName == null) { - // legacy purpose, use previous JAEGER_SERVICE_NAME as OTEL_SERVICE_NAME (if not explicitly set) - serviceName = System.getenv(Configuration.JAEGER_SERVICE_NAME); - if (serviceName != null) { - System.setProperty(OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY, serviceName); - } - } - if (serviceName != null) { - if (System.getenv(OPENTELEMETRY_TRACES_EXPORTER_ENV_KEY) == null && System.getProperty(OPENTELEMETRY_TRACES_EXPORTER_PROPERTY_KEY) == null) { - System.setProperty(OPENTELEMETRY_TRACES_EXPORTER_PROPERTY_KEY, JAEGER); // it wasn't set in script - } - } - return serviceName; + return System.getenv(envServiceName()); } @Override From 656dd203602351f0d633ca1e7dd33744cde353e4 Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Mon, 1 Aug 2022 11:58:07 +0200 Subject: [PATCH 08/14] Remove gRPC impl dependency - not needed. Signed-off-by: Ales Justin --- pom.xml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pom.xml b/pom.xml index 04986ed92..39a308d2f 100644 --- a/pom.xml +++ b/pom.xml @@ -320,12 +320,6 @@ vertx-opentelemetry ${vertx.version} - - - io.grpc - grpc-netty-shaded - ${grpc.version} - io.micrometer micrometer-core @@ -569,7 +563,6 @@ io.opentelemetry.instrumentation:opentelemetry-kafka-clients-common io.opentelemetry.instrumentation:opentelemetry-instrumentation-api io.opentelemetry:opentelemetry-exporter-jaeger - io.grpc:grpc-netty-shaded From 6b62e57b1781b56b0785453bda283142e35aaccc Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Wed, 10 Aug 2022 09:03:37 +0200 Subject: [PATCH 09/14] Upgrade Vert.x to 4.3.3. Signed-off-by: Ales Justin --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 39a308d2f..79e358023 100644 --- a/pom.xml +++ b/pom.xml @@ -83,7 +83,7 @@ 1.8 2.17.2 1.7.21 - 4.3.1 + 4.3.3 4.1.77.Final 3.2.0 0.33.10 From d5b6e9e29d0deebfa551b0b71c90f47a89c55979 Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Wed, 10 Aug 2022 12:40:12 +0200 Subject: [PATCH 10/14] Update OTel version, remove unused configuration. 
Signed-off-by: Ales Justin --- pom.xml | 23 ++--------------------- 1 file changed, 2 insertions(+), 21 deletions(-) diff --git a/pom.xml b/pom.xml index 79e358023..f3650ee1c 100644 --- a/pom.xml +++ b/pom.xml @@ -110,9 +110,8 @@ 1.8.1 0.33.0 0.1.15 - 1.15.0-alpha - 1.15.0 - 1.47.0 + 1.16.0-alpha + 1.16.0 1.3.9 0.12.0 0.7.0 @@ -280,21 +279,11 @@ opentelemetry-api ${opentelemetry.version} - - io.opentelemetry - opentelemetry-context - ${opentelemetry.version} - io.opentelemetry opentelemetry-semconv ${opentelemetry.alpha-version} - - io.opentelemetry - opentelemetry-sdk-trace - ${opentelemetry.version} - io.opentelemetry.instrumentation opentelemetry-kafka-clients-2.6 @@ -305,11 +294,6 @@ opentelemetry-instrumentation-api-semconv ${opentelemetry.alpha-version} - - io.opentelemetry.instrumentation - opentelemetry-kafka-clients-common - ${opentelemetry.alpha-version} - io.opentelemetry opentelemetry-exporter-jaeger @@ -559,9 +543,6 @@ org.yaml:snakeyaml org.apache.tomcat.embed:tomcat-embed-core - io.opentelemetry:opentelemetry-sdk-trace - io.opentelemetry.instrumentation:opentelemetry-kafka-clients-common - io.opentelemetry.instrumentation:opentelemetry-instrumentation-api io.opentelemetry:opentelemetry-exporter-jaeger From 34f785d3571e8104a248c996d141e4e9e58125f1 Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Wed, 10 Aug 2022 13:14:41 +0200 Subject: [PATCH 11/14] Return dependency -- it's needed after all. Signed-off-by: Ales Justin --- pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pom.xml b/pom.xml index f3650ee1c..4fb813f88 100644 --- a/pom.xml +++ b/pom.xml @@ -279,6 +279,11 @@ opentelemetry-api ${opentelemetry.version} + + io.opentelemetry + opentelemetry-context + ${opentelemetry.version} + io.opentelemetry opentelemetry-semconv From 6cd4dbb1bb595a3b6575a07a528586623cd973cf Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Wed, 10 Aug 2022 14:14:59 +0200 Subject: [PATCH 12/14] Reduce span builder usage. 
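The SpanBuilderHandle abstraction and the TracingHandle.builder()/adapt() methods are removed; the HTTP endpoints now ask the TracingHandle for a SpanHandle in a single call. A minimal before/after sketch of the consumer poll call site (taken from the HttpSinkBridgeEndpoint changes below; generic parameters shown as in the original sources):

    // before: span creation went through an intermediate builder handle
    SpanBuilderHandle<K, V> builder = tracing.builder(routingContext, HttpOpenApiOperations.POLL.toString());
    SpanHandle<K, V> span = builder.span(routingContext);

    // after: the tracing handle creates the span handle directly
    SpanHandle<K, V> span = tracing.span(routingContext, HttpOpenApiOperations.POLL.toString());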
Signed-off-by: Ales Justin --- .../bridge/http/HttpSinkBridgeEndpoint.java | 111 +++++++++--------- .../bridge/http/HttpSourceBridgeEndpoint.java | 6 +- .../bridge/tracing/NoopTracingHandle.java | 12 -- .../bridge/tracing/OpenTelemetryHandle.java | 31 +---- .../bridge/tracing/OpenTracingHandle.java | 38 +----- .../bridge/tracing/SpanBuilderHandle.java | 21 ---- .../kafka/bridge/tracing/TracingHandle.java | 23 ---- 7 files changed, 66 insertions(+), 176 deletions(-) delete mode 100644 src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java index 0b3260ff7..4649bb3ef 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java @@ -17,7 +17,6 @@ import io.strimzi.kafka.bridge.http.converter.HttpBinaryMessageConverter; import io.strimzi.kafka.bridge.http.converter.HttpJsonMessageConverter; import io.strimzi.kafka.bridge.http.model.HttpBridgeError; -import io.strimzi.kafka.bridge.tracing.SpanBuilderHandle; import io.strimzi.kafka.bridge.tracing.SpanHandle; import io.strimzi.kafka.bridge.tracing.TracingHandle; import io.strimzi.kafka.bridge.tracing.TracingUtil; @@ -34,6 +33,7 @@ import io.vertx.ext.web.RoutingContext; import io.vertx.kafka.client.common.TopicPartition; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; +import io.vertx.kafka.client.consumer.KafkaConsumerRecords; import io.vertx.kafka.client.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Deserializer; @@ -241,7 +241,61 @@ private void doDeleteConsumer(RoutingContext routingContext) { HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), null, null); } - @SuppressWarnings("checkstyle:NPathComplexity") // tracing abstraction adds new npath complexity ... + private Handler>> pollHandler(RoutingContext routingContext) { + return records -> { + if (records.succeeded()) { + + TracingHandle tracing = TracingUtil.getTracing(); + SpanHandle span = tracing.span(routingContext, HttpOpenApiOperations.POLL.toString()); + + for (int i = 0; i < records.result().size(); i++) { + KafkaConsumerRecord record = records.result().recordAt(i); + tracing.handleRecordSpan(span, record); + } + + span.inject(routingContext); + + HttpResponseStatus responseStatus = HttpResponseStatus.INTERNAL_SERVER_ERROR; + try { + Buffer buffer = messageConverter.toMessages(records.result()); + if (buffer.getBytes().length > this.maxBytes) { + responseStatus = HttpResponseStatus.UNPROCESSABLE_ENTITY; + HttpBridgeError error = new HttpBridgeError( + responseStatus.code(), + "Response exceeds the maximum number of bytes the consumer can receive" + ); + HttpUtils.sendResponse(routingContext, responseStatus.code(), + BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); + } else { + responseStatus = HttpResponseStatus.OK; + HttpUtils.sendResponse(routingContext, responseStatus.code(), + this.format == EmbeddedFormat.BINARY ? 
BridgeContentType.KAFKA_JSON_BINARY : BridgeContentType.KAFKA_JSON_JSON, + buffer); + } + } catch (DecodeException e) { + log.error("Error decoding records as JSON", e); + responseStatus = HttpResponseStatus.NOT_ACCEPTABLE; + HttpBridgeError error = new HttpBridgeError( + responseStatus.code(), + e.getMessage() + ); + HttpUtils.sendResponse(routingContext, responseStatus.code(), + BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); + } finally { + span.finish(responseStatus.code()); + } + + } else { + HttpBridgeError error = new HttpBridgeError( + HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), + records.cause().getMessage() + ); + HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), + BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); + } + }; + } + private void doPoll(RoutingContext routingContext) { if (topicSubscriptionsPattern == null && topicSubscriptions.isEmpty()) { HttpBridgeError error = new HttpBridgeError( @@ -266,58 +320,7 @@ private void doPoll(RoutingContext routingContext) { this.maxBytes = Long.parseLong(routingContext.request().getParam("max_bytes")); } - this.consume(records -> { - if (records.succeeded()) { - - TracingHandle tracing = TracingUtil.getTracing(); - SpanBuilderHandle builder = tracing.builder(routingContext, HttpOpenApiOperations.POLL.toString()); - SpanHandle span = builder.span(routingContext); - - for (int i = 0; i < records.result().size(); i++) { - KafkaConsumerRecord record = records.result().recordAt(i); - tracing.handleRecordSpan(span, record); - } - - span.inject(routingContext); - - HttpResponseStatus responseStatus; - try { - Buffer buffer = messageConverter.toMessages(records.result()); - if (buffer.getBytes().length > this.maxBytes) { - responseStatus = HttpResponseStatus.UNPROCESSABLE_ENTITY; - HttpBridgeError error = new HttpBridgeError( - responseStatus.code(), - "Response exceeds the maximum number of bytes the consumer can receive" - ); - HttpUtils.sendResponse(routingContext, responseStatus.code(), - BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); - } else { - responseStatus = HttpResponseStatus.OK; - HttpUtils.sendResponse(routingContext, responseStatus.code(), - this.format == EmbeddedFormat.BINARY ? 
BridgeContentType.KAFKA_JSON_BINARY : BridgeContentType.KAFKA_JSON_JSON, - buffer); - } - } catch (DecodeException e) { - log.error("Error decoding records as JSON", e); - responseStatus = HttpResponseStatus.NOT_ACCEPTABLE; - HttpBridgeError error = new HttpBridgeError( - responseStatus.code(), - e.getMessage() - ); - HttpUtils.sendResponse(routingContext, responseStatus.code(), - BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); - } - span.finish(responseStatus.code()); - - } else { - HttpBridgeError error = new HttpBridgeError( - HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), - records.cause().getMessage() - ); - HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), - BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); - } - }); + this.consume(pollHandler(routingContext)); } else { HttpBridgeError error = new HttpBridgeError( HttpResponseStatus.NOT_ACCEPTABLE.code(), diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java index c4db356ce..cc8b8e093 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java @@ -155,9 +155,9 @@ public void handle(Endpoint endpoint) { results.add(new HttpBridgeResult<>(new HttpBridgeError(code, msg))); } } - span.finish(HttpResponseStatus.OK.code()); - HttpUtils.sendResponse(routingContext, HttpResponseStatus.OK.code(), - BridgeContentType.KAFKA_JSON, buildOffsets(results).toBuffer()); + int code = done.succeeded() ? HttpResponseStatus.OK.code() : HttpResponseStatus.INTERNAL_SERVER_ERROR.code(); + span.finish(code); + HttpUtils.sendResponse(routingContext, code, BridgeContentType.KAFKA_JSON, buildOffsets(results).toBuffer()); this.maybeClose(); }); } diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/NoopTracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/NoopTracingHandle.java index 8e19ede4d..6f0664235 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/NoopTracingHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/NoopTracingHandle.java @@ -27,11 +27,6 @@ public String serviceName(BridgeConfig config) { public void initialize() { } - @Override - public SpanBuilderHandle builder(RoutingContext routingContext, String operationName) { - return new NoopSpanBuilderHandle<>(); - } - @Override public SpanHandle span(RoutingContext routingContext, String operationName) { return new NoopSpanHandle<>(); @@ -45,13 +40,6 @@ public void handleRecordSpan(SpanHandle parentSpanHandle, KafkaCons public void addTracingPropsToProducerConfig(Properties props) { } - private static final class NoopSpanBuilderHandle implements SpanBuilderHandle { - @Override - public SpanHandle span(RoutingContext routingContext) { - return new NoopSpanHandle<>(); - } - } - private static final class NoopSpanHandle implements SpanHandle { @Override public void inject(KafkaProducerRecord record) { diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java index cfec96521..31e896e18 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.Properties; -import java.util.concurrent.ExecutorService; import static 
io.strimzi.kafka.bridge.tracing.TracingConstants.COMPONENT; import static io.strimzi.kafka.bridge.tracing.TracingConstants.KAFKA_SERVICE; @@ -44,7 +43,6 @@ class OpenTelemetryHandle implements TracingHandle { private Tracer tracer; - private ExecutorService service; static void setCommonAttributes(SpanBuilder builder, RoutingContext routingContext) { builder.setAttribute(SemanticAttributes.PEER_SERVICE, KAFKA_SERVICE); @@ -68,14 +66,6 @@ public void initialize() { AutoConfiguredOpenTelemetrySdk.initialize(); } - @Override - public synchronized ExecutorService adapt(ExecutorService provided) { - if (service == null) { - service = Context.taskWrapping(provided); - } - return service; - } - private Tracer get() { if (tracer == null) { tracer = GlobalOpenTelemetry.getTracer(COMPONENT); @@ -95,15 +85,9 @@ private SpanBuilder getSpanBuilder(RoutingContext routingContext, String operati return spanBuilder; } - @Override - public SpanBuilderHandle builder(RoutingContext routingContext, String operationName) { - SpanBuilder spanBuilder = getSpanBuilder(routingContext, operationName); - return new OTelSpanBuilderHandle<>(spanBuilder); - } - @Override public void handleRecordSpan(SpanHandle parentSpanHandle, KafkaConsumerRecord record) { - String operationName = String.format("%s %s", record.topic(), MessageOperation.RECEIVE); + String operationName = record.topic() + " " + MessageOperation.RECEIVE; SpanBuilder spanBuilder = get().spanBuilder(operationName); Context parentContext = propagator().extract(Context.current(), TracingUtil.toHeaders(record), MG); if (parentContext != null) { @@ -162,19 +146,6 @@ private static SpanHandle buildSpan(SpanBuilder spanBuilder, Routin return new OTelSpanHandle<>(spanBuilder.startSpan()); } - private static class OTelSpanBuilderHandle implements SpanBuilderHandle { - private final SpanBuilder spanBuilder; - - public OTelSpanBuilderHandle(SpanBuilder spanBuilder) { - this.spanBuilder = spanBuilder; - } - - @Override - public SpanHandle span(RoutingContext routingContext) { - return buildSpan(spanBuilder, routingContext); - } - } - @Override public void addTracingPropsToProducerConfig(Properties props) { TracingUtil.addProperty(props, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java index e3d225ec2..4a32c9341 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java @@ -36,15 +36,6 @@ */ class OpenTracingHandle implements TracingHandle { - private static Tracer tracer; - - private static Tracer getTracer() { - if (tracer == null) { - tracer = GlobalTracer.get(); - } - return tracer; - } - static void setCommonTags(Span span, RoutingContext routingContext) { Tags.COMPONENT.set(span, COMPONENT); Tags.PEER_SERVICE.set(span, KAFKA_SERVICE); @@ -68,22 +59,16 @@ public void initialize() { GlobalTracer.registerIfAbsent(tracer); } - @Override - public SpanBuilderHandle builder(RoutingContext routingContext, String operationName) { - return new OTSpanBuilderHandle<>(getSpanBuilder(routingContext, operationName)); - } - @Override public SpanHandle span(RoutingContext routingContext, String operationName) { Tracer.SpanBuilder spanBuilder = getSpanBuilder(routingContext, operationName); return buildSpan(spanBuilder, routingContext); } - @SuppressWarnings("rawtypes") @Override public void 
handleRecordSpan(SpanHandle parentSpanHandle, KafkaConsumerRecord record) { - Tracer tracer = getTracer(); - Span span = ((OTSpanHandle) parentSpanHandle).span; + Tracer tracer = GlobalTracer.get(); + Span span = ((OTSpanHandle) parentSpanHandle).span; Tracer.SpanBuilder spanBuilder = tracer.buildSpan(TracingKafkaUtils.FROM_PREFIX + record.topic()) .asChildOf(span).withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER); SpanContext parentSpan = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(TracingUtil.toHeaders(record))); @@ -94,7 +79,7 @@ public void handleRecordSpan(SpanHandle parentSpanHandle, KafkaCons } private Tracer.SpanBuilder getSpanBuilder(RoutingContext rc, String operationName) { - Tracer tracer = getTracer(); + Tracer tracer = GlobalTracer.get(); SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new RequestTextMap(rc.request())); return tracer.buildSpan(operationName).asChildOf(parentSpan); } @@ -123,19 +108,6 @@ private static SpanHandle buildSpan(Tracer.SpanBuilder spanBuilder, return new OTSpanHandle(span); } - private static class OTSpanBuilderHandle implements SpanBuilderHandle { - private final Tracer.SpanBuilder spanBuilder; - - public OTSpanBuilderHandle(Tracer.SpanBuilder spanBuilder) { - this.spanBuilder = spanBuilder; - } - - @Override - public SpanHandle span(RoutingContext routingContext) { - return buildSpan(spanBuilder, routingContext); - } - } - @Override public void addTracingPropsToProducerConfig(Properties props) { TracingUtil.addProperty(props, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); @@ -150,7 +122,7 @@ public OTSpanHandle(Span span) { @Override public void inject(KafkaProducerRecord record) { - Tracer tracer = getTracer(); + Tracer tracer = GlobalTracer.get(); tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMap() { @Override public void put(String key, String value) { @@ -166,7 +138,7 @@ public Iterator> iterator() { @Override public void inject(RoutingContext routingContext) { - Tracer tracer = getTracer(); + Tracer tracer = GlobalTracer.get(); tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new TextMap() { @Override public void put(String key, String value) { diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java deleted file mode 100644 index 996162d56..000000000 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ - -package io.strimzi.kafka.bridge.tracing; - -import io.vertx.ext.web.RoutingContext; - -/** - * SpanBuilder handle - an abstraction over actual span builder implementation. - */ -public interface SpanBuilderHandle { - /** - * Build span handle from underlying span builder implementation. 
- * - * @param routingContext Vert.x routing context - * @return the span handle - */ - SpanHandle span(RoutingContext routingContext); -} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java index a88558dc8..8234301b4 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java @@ -10,7 +10,6 @@ import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import java.util.Properties; -import java.util.concurrent.ExecutorService; /** * Simple interface to abstract tracing between legacy OpenTracing and new OpenTelemetry. @@ -36,28 +35,6 @@ public interface TracingHandle { */ void initialize(); - /** - * Adapt executor service if needed. - * Else return service parameter instance. - * - * @param service current executor service - * @return adapted executor service or service parameter instance - */ - default ExecutorService adapt(ExecutorService service) { - return service; - } - - /** - * Build span builder handle. - * - * @param key type - * @param value type - * @param routingContext Vert.x rounting context - * @param operationName current operation name - * @return span builder handle - */ - SpanBuilderHandle builder(RoutingContext routingContext, String operationName); - /** * Build span handle. * From b1c94482ac79c9b3ebd1558e902aa2ed29ade3a5 Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Wed, 10 Aug 2022 14:37:18 +0200 Subject: [PATCH 13/14] Ignore OTel metrics in OpenTracing test. Signed-off-by: Ales Justin --- .../java/io/strimzi/kafka/bridge/tracing/OpenTracingTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTracingTest.java b/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTracingTest.java index 27072f815..584fa6364 100644 --- a/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTracingTest.java +++ b/src/test/java/io/strimzi/kafka/bridge/tracing/OpenTracingTest.java @@ -19,6 +19,7 @@ public class OpenTracingTest extends TracingTestBase { @Override protected TracingOptions tracingOptions() { + System.setProperty("otel.metrics.exporter", "none"); // disable OTel metrics -- they slip in via ServiceLoader return new OpenTracingOptions(); } } From 8f4310ebb86e0f852fb2869932c26aff7c78796e Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Wed, 10 Aug 2022 16:32:14 +0200 Subject: [PATCH 14/14] Revert some changes, add more docs. 
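The producer endpoint goes back to always answering 200 OK: when individual sends fail, the failure is reported per record in the response body (as an HttpBridgeError entry collected by buildOffsets), so the HTTP status no longer switches to 500. The extra javadoc explains how the custom context storage is picked up at runtime; a rough, simplified sketch of the lookup it refers to (io.opentelemetry.context API, for illustration only):

    // OpenTelemetry's LazyStorage discovers context storage through the standard ServiceLoader
    // mechanism; with vertx-opentelemetry on the classpath, VertxContextStorageProvider is found
    // and its storage is used instead of the default thread-local one.
    ServiceLoader<ContextStorageProvider> providers = ServiceLoader.load(ContextStorageProvider.class);
    for (ContextStorageProvider provider : providers) {
        ContextStorage storage = provider.get(); // e.g. io.vertx.tracing.opentelemetry.VertxContextStorageProvider
    }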
Signed-off-by: Ales Justin --- .../kafka/bridge/http/HttpSourceBridgeEndpoint.java | 10 +++++----- .../kafka/bridge/tracing/OpenTelemetryHandle.java | 2 ++ 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java index cc8b8e093..cb889b27d 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java @@ -133,8 +133,7 @@ public void handle(Endpoint endpoint) { List sendHandlers = new ArrayList<>(records.size()); for (KafkaProducerRecord record : records) { Promise promise = Promise.promise(); - Future future = promise.future(); - sendHandlers.add(future); + sendHandlers.add(promise.future()); this.send(record, promise); } @@ -155,9 +154,10 @@ public void handle(Endpoint endpoint) { results.add(new HttpBridgeResult<>(new HttpBridgeError(code, msg))); } } - int code = done.succeeded() ? HttpResponseStatus.OK.code() : HttpResponseStatus.INTERNAL_SERVER_ERROR.code(); - span.finish(code); - HttpUtils.sendResponse(routingContext, code, BridgeContentType.KAFKA_JSON, buildOffsets(results).toBuffer()); + // always return OK, since the failure cause is reported in the response, per message + span.finish(HttpResponseStatus.OK.code()); + HttpUtils.sendResponse(routingContext, HttpResponseStatus.OK.code(), + BridgeContentType.KAFKA_JSON, buildOffsets(results).toBuffer()); this.maybeClose(); }); } diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java index 31e896e18..30ecf04a0 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java @@ -39,6 +39,8 @@ * * Note: we use Vert.x OpenTelemetry extension to setup custom ContextStorageProvider: * @see io.vertx.tracing.opentelemetry.VertxContextStorageProvider + * @see io.opentelemetry.context.LazyStorage which looks up all ContextStorageProviders via the service-loader pattern; + * since the Vert.x OpenTelemetry support with its VertxContextStorageProvider is on the classpath, that provider is used. */ class OpenTelemetryHandle implements TracingHandle {