From 9aff074e1d832628970c8c8f11dcc5fbd2eaffb6 Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Fri, 17 Jun 2022 11:05:17 +0200 Subject: [PATCH] Fix relationship handling. Signed-off-by: Ales Justin --- config/application.properties | 1 - pom.xml | 5 ++ .../io/strimzi/kafka/bridge/Application.java | 8 +-- .../kafka/bridge/config/BridgeConfig.java | 9 --- .../bridge/http/HttpSinkBridgeEndpoint.java | 11 +-- .../bridge/http/HttpSourceBridgeEndpoint.java | 10 --- .../bridge/tracing/NoopTracingHandle.java | 72 +++++++++++++++++++ .../bridge/tracing/OpenTelemetryHandle.java | 72 ++++++++++--------- .../bridge/tracing/OpenTracingHandle.java | 51 +++++++------ .../bridge/tracing/SpanBuilderHandle.java | 11 +-- .../kafka/bridge/tracing/SpanHandle.java | 29 +++++++- .../kafka/bridge/tracing/TracingHandle.java | 57 +++++++++++++++ .../kafka/bridge/tracing/TracingUtil.java | 69 +++--------------- 13 files changed, 253 insertions(+), 152 deletions(-) create mode 100644 src/main/java/io/strimzi/kafka/bridge/tracing/NoopTracingHandle.java diff --git a/config/application.properties b/config/application.properties index a5b4c9e1a..dd32e3a9b 100644 --- a/config/application.properties +++ b/config/application.properties @@ -5,7 +5,6 @@ bridge.id=my-bridge #bridge.tracing=jaeger # OpenTelemetry support #bridge.tracing=opentelemetry -#bridge.tracing.service-name=strimzi-kafka-bridge #Apache Kafka common kafka.bootstrap.servers=localhost:9092 diff --git a/pom.xml b/pom.xml index c54add31e..1de45929a 100644 --- a/pom.xml +++ b/pom.xml @@ -300,6 +300,11 @@ opentelemetry-kafka-clients-2.6 ${opentelemetry.version} + + io.opentelemetry.instrumentation + opentelemetry-instrumentation-api + ${opentelemetry.version} + io.opentelemetry.instrumentation opentelemetry-kafka-clients-common diff --git a/src/main/java/io/strimzi/kafka/bridge/Application.java b/src/main/java/io/strimzi/kafka/bridge/Application.java index 9acae9edc..2249af991 100644 --- a/src/main/java/io/strimzi/kafka/bridge/Application.java +++ b/src/main/java/io/strimzi/kafka/bridge/Application.java @@ -90,8 +90,8 @@ public static void main(String[] args) { .setConfig(new JsonObject().put("raw-data", true)); ConfigRetrieverOptions options = new ConfigRetrieverOptions() - .addStore(fileStore) - .addStore(envStore); + .addStore(fileStore) + .addStore(envStore); ConfigRetriever retriever = ConfigRetriever.create(vertx, options); retriever.getConfig(ar -> { @@ -125,14 +125,14 @@ public static void main(String[] args) { } } - // register Jaeger tracer - if set, etc + // register tracing - if set, etc TracingUtil.initialize(bridgeConfig); // when HTTP protocol is enabled, it handles healthy/ready/metrics endpoints as well, // so no need for a standalone embedded HTTP server if (!bridgeConfig.getHttpConfig().isEnabled()) { EmbeddedHttpServer embeddedHttpServer = - new EmbeddedHttpServer(vertx, healthChecker, metricsReporter, embeddedHttpServerPort); + new EmbeddedHttpServer(vertx, healthChecker, metricsReporter, embeddedHttpServerPort); embeddedHttpServer.start(); } } diff --git a/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java b/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java index ab5a56a94..b1c08166c 100644 --- a/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java +++ b/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java @@ -20,7 +20,6 @@ public class BridgeConfig extends AbstractConfig { public static final String BRIDGE_ID = BRIDGE_CONFIG_PREFIX + "id"; public static final String TRACING_TYPE = BRIDGE_CONFIG_PREFIX + "tracing"; - public static final String TRACING_SERVICE_NAME_TYPE = TRACING_TYPE + ".service-name"; private KafkaConfig kafkaConfig; private AmqpConfig amqpConfig; @@ -104,12 +103,4 @@ public String getTracing() { return config.get(BridgeConfig.TRACING_TYPE).toString(); } } - - public String getTracingServiceName() { - if (config.get(BridgeConfig.TRACING_SERVICE_NAME_TYPE) == null) { - return null; - } else { - return config.get(BridgeConfig.TRACING_SERVICE_NAME_TYPE).toString(); - } - } } diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java index 41aca9eb0..0b3260ff7 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java @@ -35,7 +35,6 @@ import io.vertx.kafka.client.common.TopicPartition; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.consumer.OffsetAndMetadata; -import io.vertx.kafka.client.producer.KafkaHeader; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Deserializer; @@ -272,18 +271,12 @@ private void doPoll(RoutingContext routingContext) { TracingHandle tracing = TracingUtil.getTracing(); SpanBuilderHandle builder = tracing.builder(routingContext, HttpOpenApiOperations.POLL.toString()); + SpanHandle span = builder.span(routingContext); for (int i = 0; i < records.result().size(); i++) { KafkaConsumerRecord record = records.result().recordAt(i); - - Map headers = new HashMap<>(); - for (KafkaHeader header : record.headers()) { - headers.put(header.key(), header.value().toString()); - } - - builder.addRef(headers); + tracing.handleRecordSpan(span, record); } - SpanHandle span = builder.span(routingContext); span.inject(routingContext); diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java index 1c47eb51a..27258557c 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java @@ -22,7 +22,6 @@ import io.vertx.core.CompositeFuture; import io.vertx.core.Future; import io.vertx.core.Handler; -import io.vertx.core.MultiMap; import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; @@ -35,11 +34,8 @@ import org.apache.kafka.common.serialization.Serializer; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.UUID; -import java.util.Map.Entry; public class HttpSourceBridgeEndpoint extends SourceBridgeEndpoint { @@ -89,12 +85,6 @@ public void handle(Endpoint endpoint) { boolean isAsync = Boolean.parseBoolean(routingContext.queryParams().get("async")); - MultiMap httpHeaders = routingContext.request().headers(); - Map headers = new HashMap<>(); - for (Entry header: httpHeaders.entries()) { - headers.put(header.getKey(), header.getValue()); - } - String operationName = partition == null ? HttpOpenApiOperations.SEND.toString() : HttpOpenApiOperations.SEND_TO_PARTITION.toString(); TracingHandle tracing = TracingUtil.getTracing(); diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/NoopTracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/NoopTracingHandle.java new file mode 100644 index 000000000..3f074f6f2 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/NoopTracingHandle.java @@ -0,0 +1,72 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ + +package io.strimzi.kafka.bridge.tracing; + +import io.strimzi.kafka.bridge.config.BridgeConfig; +import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; +import io.vertx.kafka.client.producer.KafkaProducerRecord; + +import java.util.Properties; + +final class NoopTracingHandle implements TracingHandle { + @Override + public String envName() { + return null; + } + + @Override + public String serviceName(BridgeConfig config) { + return null; + } + + @Override + public void initialize() { + } + + @Override + public SpanBuilderHandle builder(RoutingContext routingContext, String operationName) { + return new NoopSpanBuilderHandle<>(); + } + + @Override + public SpanHandle span(RoutingContext routingContext, String operationName) { + return new NoopSpanHandle<>(); + } + + @Override + public void handleRecordSpan(SpanHandle parentSpanHandle, KafkaConsumerRecord record) { + } + + @Override + public void kafkaConsumerConfig(Properties props) { + } + + @Override + public void kafkaProducerConfig(Properties props) { + } + + private static final class NoopSpanBuilderHandle implements SpanBuilderHandle { + @Override + public SpanHandle span(RoutingContext routingContext) { + return new NoopSpanHandle<>(); + } + } + + private static final class NoopSpanHandle implements SpanHandle { + @Override + public void inject(KafkaProducerRecord record) { + } + + @Override + public void inject(RoutingContext routingContext) { + } + + @Override + public void finish(int code) { + } + } +} diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java index 29d7182c5..1751674c1 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java @@ -18,19 +18,18 @@ import io.opentelemetry.context.Scope; import io.opentelemetry.context.propagation.TextMapGetter; import io.opentelemetry.context.propagation.TextMapPropagator; -import io.opentelemetry.instrumentation.kafkaclients.TracingConsumerInterceptor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import io.strimzi.kafka.bridge.config.BridgeConfig; import io.vertx.core.buffer.Buffer; import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.producer.KafkaHeader; import io.vertx.kafka.client.producer.KafkaProducerRecord; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import java.util.Map; @@ -52,8 +51,9 @@ */ class OpenTelemetryHandle implements TracingHandle { + private Tracer tracer; + static void setCommonAttributes(SpanBuilder builder, RoutingContext routingContext) { - builder.setAttribute("component", COMPONENT); builder.setAttribute(SemanticAttributes.PEER_SERVICE, KAFKA_SERVICE); builder.setAttribute(SemanticAttributes.HTTP_METHOD, routingContext.request().method().name()); builder.setAttribute(SemanticAttributes.HTTP_URL, routingContext.request().uri()); @@ -70,9 +70,6 @@ public String serviceName(BridgeConfig config) { if (serviceName == null) { // legacy purpose, use previous JAEGER_SERVICE_NAME as OTEL_SERVICE_NAME (if not explicitly set) serviceName = System.getenv(Configuration.JAEGER_SERVICE_NAME); - if (serviceName == null) { - serviceName = config.getTracingServiceName(); - } if (serviceName != null) { System.setProperty(OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY, serviceName); } @@ -90,14 +87,17 @@ public void initialize() { AutoConfiguredOpenTelemetrySdk.initialize(); } - private static Tracer get() { - return GlobalOpenTelemetry.getTracer(COMPONENT); + private Tracer get() { + if (tracer == null) { + tracer = GlobalOpenTelemetry.getTracer(COMPONENT); + } + return tracer; } private SpanBuilder getSpanBuilder(RoutingContext routingContext, String operationName) { Tracer tracer = get(); SpanBuilder spanBuilder; - Context parentContext = propagator().extract(Context.current(), routingContext, RCG); + Context parentContext = propagator().extract(Context.current(), routingContext, ROUTING_CONTEXT_GETTER); if (parentContext == null) { spanBuilder = tracer.spanBuilder(operationName); } else { @@ -112,11 +112,30 @@ public SpanBuilderHandle builder(RoutingContext routingContext, Str return new OTelSpanBuilderHandle<>(spanBuilder); } + @Override + public void handleRecordSpan(SpanHandle parentSpanHandle, KafkaConsumerRecord record) { + String operationName = String.format("%s %s", record.topic(), MessageOperation.RECEIVE); + SpanBuilder spanBuilder = get().spanBuilder(operationName); + Context parentContext = propagator().extract(Context.current(), TracingUtil.toHeaders(record), MG); + if (parentContext != null) { + Span parentSpan = Span.fromContext(parentContext); + SpanContext psc = parentSpan != null ? parentSpan.getSpanContext() : null; + if (psc != null) { + spanBuilder.addLink(psc); + } + } + spanBuilder + .setSpanKind(SpanKind.CONSUMER) + .setParent(Context.current()) + .startSpan() + .end(); + } + private static TextMapPropagator propagator() { return GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); } - private static final TextMapGetter RCG = new TextMapGetter() { + private static final TextMapGetter ROUTING_CONTEXT_GETTER = new TextMapGetter() { @Override public Iterable keys(RoutingContext rc) { return rc.request().headers().names(); @@ -161,18 +180,6 @@ public OTelSpanBuilderHandle(SpanBuilder spanBuilder) { this.spanBuilder = spanBuilder; } - @Override - public void addRef(Map headers) { - Context parentContext = propagator().extract(Context.current(), headers, MG); - if (parentContext != null) { - Span parentSpan = Span.fromContext(parentContext); - SpanContext psc = parentSpan != null ? parentSpan.getSpanContext() : null; - if (psc != null) { - spanBuilder.addLink(psc); - } - } - } - @Override public SpanHandle span(RoutingContext routingContext) { return buildSpan(spanBuilder, routingContext); @@ -181,7 +188,6 @@ public SpanHandle span(RoutingContext routingContext) { @Override public void kafkaConsumerConfig(Properties props) { - props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); } @Override @@ -201,16 +207,16 @@ public OTelSpanHandle(Span span) { @Override public void prepare(KafkaProducerRecord record) { String uuid = UUID.randomUUID().toString(); - spans.put(uuid, span); - record.addHeader(_UUID, uuid); + SPANS.put(uuid, span); + record.addHeader(X_UUID, uuid); } @Override public void clean(KafkaProducerRecord record) { - Optional oh = record.headers().stream().filter(h -> h.key().equals(_UUID)).findFirst(); + Optional oh = record.headers().stream().filter(h -> h.key().equals(X_UUID)).findFirst(); oh.ifPresent(h -> { String uuid = h.value().toString(); - spans.remove(uuid); + SPANS.remove(uuid); }); } @@ -236,16 +242,16 @@ public void finish(int code) { } } - static final String _UUID = "_UUID"; - static final Map spans = new ConcurrentHashMap<>(); + static final String X_UUID = "_UUID"; + static final Map SPANS = new ConcurrentHashMap<>(); public static class ContextAwareTracingProducerInterceptor extends TracingProducerInterceptor { @Override public ProducerRecord onSend(ProducerRecord record) { Headers headers = record.headers(); - String key = Buffer.buffer(headers.lastHeader(_UUID).value()).toString(); - headers.remove(_UUID); - Span span = spans.remove(key); + String key = Buffer.buffer(headers.lastHeader(X_UUID).value()).toString(); + headers.remove(X_UUID); + Span span = SPANS.remove(key); try (Scope ignored = span.makeCurrent()) { return super.onSend(record); } diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java index 32c5de020..14606a828 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTracingHandle.java @@ -10,7 +10,7 @@ import io.opentracing.Span; import io.opentracing.SpanContext; import io.opentracing.Tracer; -import io.opentracing.contrib.kafka.TracingConsumerInterceptor; +import io.opentracing.contrib.kafka.TracingKafkaUtils; import io.opentracing.contrib.kafka.TracingProducerInterceptor; import io.opentracing.propagation.Format; import io.opentracing.propagation.TextMap; @@ -20,8 +20,8 @@ import io.strimzi.kafka.bridge.config.BridgeConfig; import io.vertx.core.http.HttpServerRequest; import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.producer.KafkaProducerRecord; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import java.util.Iterator; @@ -36,6 +36,15 @@ */ class OpenTracingHandle implements TracingHandle { + private static Tracer tracer; + + private static Tracer getTracer() { + if (tracer == null) { + tracer = GlobalTracer.get(); + } + return tracer; + } + static void setCommonTags(Span span, RoutingContext routingContext) { Tags.COMPONENT.set(span, COMPONENT); Tags.PEER_SERVICE.set(span, KAFKA_SERVICE); @@ -50,11 +59,7 @@ public String envName() { @Override public String serviceName(BridgeConfig config) { - String serviceName = System.getenv(envName()); - if (serviceName == null) { - serviceName = config.getTracingServiceName(); - } - return serviceName; + return System.getenv(envName()); } @Override @@ -74,8 +79,22 @@ public SpanHandle span(RoutingContext routingContext, String operat return buildSpan(spanBuilder, routingContext); } + @SuppressWarnings("rawtypes") + @Override + public void handleRecordSpan(SpanHandle parentSpanHandle, KafkaConsumerRecord record) { + Tracer tracer = getTracer(); + Span span = ((OTSpanHandle) parentSpanHandle).span; + Tracer.SpanBuilder spanBuilder = tracer.buildSpan(TracingKafkaUtils.FROM_PREFIX + record.topic()) + .asChildOf(span).withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER); + SpanContext parentSpan = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(TracingUtil.toHeaders(record))); + if (parentSpan != null) { + spanBuilder.addReference(References.FOLLOWS_FROM, parentSpan); + } + spanBuilder.start().finish(); + } + private Tracer.SpanBuilder getSpanBuilder(RoutingContext rc, String operationName) { - Tracer tracer = GlobalTracer.get(); + Tracer tracer = getTracer(); SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new RequestTextMap(rc.request())); return tracer.buildSpan(operationName).asChildOf(parentSpan); } @@ -111,15 +130,6 @@ public OTSpanBuilderHandle(Tracer.SpanBuilder spanBuilder) { this.spanBuilder = spanBuilder; } - @Override - public void addRef(Map headers) { - Tracer tracer = GlobalTracer.get(); - SpanContext parentSpan = tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(headers)); - if (parentSpan != null) { - spanBuilder.addReference(References.FOLLOWS_FROM, parentSpan); - } - } - @Override public SpanHandle span(RoutingContext routingContext) { return buildSpan(spanBuilder, routingContext); @@ -128,7 +138,6 @@ public SpanHandle span(RoutingContext routingContext) { @Override public void kafkaConsumerConfig(Properties props) { - props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); } @Override @@ -145,7 +154,7 @@ public OTSpanHandle(Span span) { @Override public void inject(KafkaProducerRecord record) { - Tracer tracer = GlobalTracer.get(); + Tracer tracer = getTracer(); tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMap() { @Override public void put(String key, String value) { @@ -161,8 +170,8 @@ public Iterator> iterator() { @Override public void inject(RoutingContext routingContext) { - Tracer tracer = GlobalTracer.get(); - tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMap() { + Tracer tracer = getTracer(); + tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new TextMap() { @Override public void put(String key, String value) { routingContext.response().headers().add(key, value); diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java index daf2f62ab..996162d56 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanBuilderHandle.java @@ -7,12 +7,15 @@ import io.vertx.ext.web.RoutingContext; -import java.util.Map; - /** - * Simple SpanBuilder handle. + * SpanBuilder handle - an abstraction over actual span builder implementation. */ public interface SpanBuilderHandle { - void addRef(Map headers); + /** + * Build span handle from underlying span builder implementation. + * + * @param routingContext Vert.x routing context + * @return the span handle + */ SpanHandle span(RoutingContext routingContext); } diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java index 40e671c1c..c8ad09de4 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/SpanHandle.java @@ -9,16 +9,43 @@ import io.vertx.kafka.client.producer.KafkaProducerRecord; /** - * Simple Span handle. + * Span handle, an abstraction over actual span implementation. */ public interface SpanHandle { + /** + * Prepare Kafka producer record before async send. + * + * @param record Kafka producer record to use as payload + */ default void prepare(KafkaProducerRecord record) { } + /** + * Clean Kafka producer record after async send. + * + * @param record Kafka producer record used as payload + */ default void clean(KafkaProducerRecord record) { } + /** + * Inject tracing info into underlying span from Kafka producer record. + * + * @param record Kafka producer record to extract tracing info + */ void inject(KafkaProducerRecord record); + + /** + * Inject tracing info into underlying span from Vert.x routing context. + * + * @param routingContext Vert.x routing context to extract tracing info + */ void inject(RoutingContext routingContext); + + /** + * Finish underlying span. + * + * @param code response code + */ void finish(int code); } diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java index 3ab981b16..bcf7da62c 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingHandle.java @@ -7,6 +7,7 @@ import io.strimzi.kafka.bridge.config.BridgeConfig; import io.vertx.ext.web.RoutingContext; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import java.util.Properties; @@ -14,13 +15,69 @@ * Simple interface to abstract tracing between legacy OpenTracing and new OpenTelemetry. */ public interface TracingHandle { + /** + * Tracing env var name. + * + * @return tracing env var name + */ String envName(); + + /** + * Extract service name from bridge confing. + * + * @param config the bridge config + * @return bridge's service name + */ String serviceName(BridgeConfig config); + + /** + * Initialize tracing. + */ void initialize(); + /** + * Build span builde handle. + * + * @param key type + * @param value type + * @param routingContext Vert.x rounting context + * @param operationName current operation name + * @return span builder handle + */ SpanBuilderHandle builder(RoutingContext routingContext, String operationName); + + /** + * Build span handle. + * + * @param key type + * @param value type + * @param routingContext Vert.x rounting context + * @param operationName current operation name + * @return span handle + */ SpanHandle span(RoutingContext routingContext, String operationName); + /** + * Extract span info from Kafka consumer record. + * + * @param key type + * @param value type + * @param parentSpanHandle parent span handle + * @param record Kafka consumer record + */ + void handleRecordSpan(SpanHandle parentSpanHandle, KafkaConsumerRecord record); + + /** + * Set consumer properties, if any. + * + * @param props the properties + */ void kafkaConsumerConfig(Properties props); + + /** + * Set producer properties, if any. + * + * @param props the properties + */ void kafkaProducerConfig(Properties props); } diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java index 6f43ddd38..1420bed40 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/TracingUtil.java @@ -6,13 +6,13 @@ package io.strimzi.kafka.bridge.tracing; import io.strimzi.kafka.bridge.config.BridgeConfig; -import io.vertx.ext.web.RoutingContext; -import io.vertx.kafka.client.producer.KafkaProducerRecord; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; +import io.vertx.kafka.client.producer.KafkaHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.Map; -import java.util.Properties; import static io.strimzi.kafka.bridge.tracing.TracingConstants.JAEGER_OPENTRACING; import static io.strimzi.kafka.bridge.tracing.TracingConstants.OPENTELEMETRY; @@ -22,7 +22,7 @@ */ public class TracingUtil { private static final Logger log = LoggerFactory.getLogger(TracingUtil.class); - private static TracingHandle tracing = new NoopTracing(); + private static TracingHandle tracing = new NoopTracingHandle(); public static TracingHandle getTracing() { return tracing; @@ -49,62 +49,11 @@ public static void initialize(BridgeConfig config) { } } - private static final class NoopTracing implements TracingHandle { - @Override - public String envName() { - return null; - } - - @Override - public String serviceName(BridgeConfig config) { - return null; - } - - @Override - public void initialize() { - } - - @Override - public SpanBuilderHandle builder(RoutingContext routingContext, String operationName) { - return new NoopSpanBuilderHandle<>(); - } - - @Override - public SpanHandle span(RoutingContext routingContext, String operationName) { - return new NoopSpanHandle<>(); - } - - @Override - public void kafkaConsumerConfig(Properties props) { - } - - @Override - public void kafkaProducerConfig(Properties props) { - } - } - - private static final class NoopSpanBuilderHandle implements SpanBuilderHandle { - @Override - public void addRef(Map headers) { - } - - @Override - public SpanHandle span(RoutingContext routingContext) { - return new NoopSpanHandle<>(); - } - } - - private static final class NoopSpanHandle implements SpanHandle { - @Override - public void inject(KafkaProducerRecord record) { - } - - @Override - public void inject(RoutingContext routingContext) { - } - - @Override - public void finish(int code) { + public static Map toHeaders(KafkaConsumerRecord record) { + Map headers = new HashMap<>(); + for (KafkaHeader header : record.headers()) { + headers.put(header.key(), header.value().toString()); } + return headers; } }