Skip to content

Commit

Permalink
Fix relationship handling.
Browse files Browse the repository at this point in the history
Signed-off-by: Ales Justin <[email protected]>
  • Loading branch information
alesj committed Jul 7, 2022
1 parent ff22377 commit 9aff074
Show file tree
Hide file tree
Showing 13 changed files with 253 additions and 152 deletions.
1 change: 0 additions & 1 deletion config/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ bridge.id=my-bridge
#bridge.tracing=jaeger
# OpenTelemetry support
#bridge.tracing=opentelemetry
#bridge.tracing.service-name=strimzi-kafka-bridge

#Apache Kafka common
kafka.bootstrap.servers=localhost:9092
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,11 @@
<artifactId>opentelemetry-kafka-clients-2.6</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-common</artifactId>
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/strimzi/kafka/bridge/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ public static void main(String[] args) {
.setConfig(new JsonObject().put("raw-data", true));

ConfigRetrieverOptions options = new ConfigRetrieverOptions()
.addStore(fileStore)
.addStore(envStore);
.addStore(fileStore)
.addStore(envStore);

ConfigRetriever retriever = ConfigRetriever.create(vertx, options);
retriever.getConfig(ar -> {
Expand Down Expand Up @@ -125,14 +125,14 @@ public static void main(String[] args) {
}
}

// register Jaeger tracer - if set, etc
// register tracing - if set, etc
TracingUtil.initialize(bridgeConfig);

// when HTTP protocol is enabled, it handles healthy/ready/metrics endpoints as well,
// so no need for a standalone embedded HTTP server
if (!bridgeConfig.getHttpConfig().isEnabled()) {
EmbeddedHttpServer embeddedHttpServer =
new EmbeddedHttpServer(vertx, healthChecker, metricsReporter, embeddedHttpServerPort);
new EmbeddedHttpServer(vertx, healthChecker, metricsReporter, embeddedHttpServerPort);
embeddedHttpServer.start();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ public class BridgeConfig extends AbstractConfig {

public static final String BRIDGE_ID = BRIDGE_CONFIG_PREFIX + "id";
public static final String TRACING_TYPE = BRIDGE_CONFIG_PREFIX + "tracing";
public static final String TRACING_SERVICE_NAME_TYPE = TRACING_TYPE + ".service-name";

private KafkaConfig kafkaConfig;
private AmqpConfig amqpConfig;
Expand Down Expand Up @@ -104,12 +103,4 @@ public String getTracing() {
return config.get(BridgeConfig.TRACING_TYPE).toString();
}
}

public String getTracingServiceName() {
if (config.get(BridgeConfig.TRACING_SERVICE_NAME_TYPE) == null) {
return null;
} else {
return config.get(BridgeConfig.TRACING_SERVICE_NAME_TYPE).toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import io.vertx.kafka.client.producer.KafkaHeader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;

Expand Down Expand Up @@ -272,18 +271,12 @@ private void doPoll(RoutingContext routingContext) {

TracingHandle tracing = TracingUtil.getTracing();
SpanBuilderHandle<K, V> builder = tracing.builder(routingContext, HttpOpenApiOperations.POLL.toString());
SpanHandle<K, V> span = builder.span(routingContext);

for (int i = 0; i < records.result().size(); i++) {
KafkaConsumerRecord<K, V> record = records.result().recordAt(i);

Map<String, String> headers = new HashMap<>();
for (KafkaHeader header : record.headers()) {
headers.put(header.key(), header.value().toString());
}

builder.addRef(headers);
tracing.handleRecordSpan(span, record);
}
SpanHandle<K, V> span = builder.span(routingContext);

span.inject(routingContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
Expand All @@ -35,11 +34,8 @@
import org.apache.kafka.common.serialization.Serializer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Map.Entry;

public class HttpSourceBridgeEndpoint<K, V> extends SourceBridgeEndpoint<K, V> {

Expand Down Expand Up @@ -89,12 +85,6 @@ public void handle(Endpoint<?> endpoint) {

boolean isAsync = Boolean.parseBoolean(routingContext.queryParams().get("async"));

MultiMap httpHeaders = routingContext.request().headers();
Map<String, String> headers = new HashMap<>();
for (Entry<String, String> header: httpHeaders.entries()) {
headers.put(header.getKey(), header.getValue());
}

String operationName = partition == null ? HttpOpenApiOperations.SEND.toString() : HttpOpenApiOperations.SEND_TO_PARTITION.toString();

TracingHandle tracing = TracingUtil.getTracing();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/

package io.strimzi.kafka.bridge.tracing;

import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.vertx.ext.web.RoutingContext;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.producer.KafkaProducerRecord;

import java.util.Properties;

/**
 * Do-nothing {@link TracingHandle} used as the fallback when no tracing
 * backend is configured. Every method is an intentional no-op: lookups
 * return {@code null}, span factories hand out inert handles, and the
 * Kafka client configuration hooks leave the supplied properties untouched.
 */
final class NoopTracingHandle implements TracingHandle {

    /** No tracing backend, hence no environment variable to consult. */
    @Override
    public String envName() {
        return null;
    }

    /** No service name is derived when tracing is disabled. */
    @Override
    public String serviceName(BridgeConfig config) {
        return null;
    }

    /** Nothing to bootstrap. */
    @Override
    public void initialize() {
        // intentional no-op
    }

    /** Returns a builder whose spans do nothing. */
    @Override
    public <K, V> SpanBuilderHandle<K, V> builder(RoutingContext routingContext, String operationName) {
        return new NoopSpanBuilderHandle<>();
    }

    /** Returns an inert span handle. */
    @Override
    public <K, V> SpanHandle<K, V> span(RoutingContext routingContext, String operationName) {
        return new NoopSpanHandle<>();
    }

    /** Consumed records produce no per-record spans. */
    @Override
    public <K, V> void handleRecordSpan(SpanHandle<K, V> parentSpanHandle, KafkaConsumerRecord<K, V> record) {
        // intentional no-op
    }

    /** Leaves the consumer properties unchanged (no interceptors registered). */
    @Override
    public void kafkaConsumerConfig(Properties props) {
        // intentional no-op
    }

    /** Leaves the producer properties unchanged (no interceptors registered). */
    @Override
    public void kafkaProducerConfig(Properties props) {
        // intentional no-op
    }

    /** Inert span: injection and completion are silent no-ops. */
    private static final class NoopSpanHandle<K, V> implements SpanHandle<K, V> {

        @Override
        public void inject(KafkaProducerRecord<K, V> record) {
            // intentional no-op
        }

        @Override
        public void inject(RoutingContext routingContext) {
            // intentional no-op
        }

        @Override
        public void finish(int code) {
            // intentional no-op
        }
    }

    /** Builder that only ever yields {@link NoopSpanHandle} instances. */
    private static final class NoopSpanBuilderHandle<K, V> implements SpanBuilderHandle<K, V> {

        @Override
        public SpanHandle<K, V> span(RoutingContext routingContext) {
            return new NoopSpanHandle<>();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.instrumentation.kafkaclients.TracingConsumerInterceptor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.RoutingContext;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import java.util.Map;
Expand All @@ -52,8 +51,9 @@
*/
class OpenTelemetryHandle implements TracingHandle {

private Tracer tracer;

static void setCommonAttributes(SpanBuilder builder, RoutingContext routingContext) {
builder.setAttribute("component", COMPONENT);
builder.setAttribute(SemanticAttributes.PEER_SERVICE, KAFKA_SERVICE);
builder.setAttribute(SemanticAttributes.HTTP_METHOD, routingContext.request().method().name());
builder.setAttribute(SemanticAttributes.HTTP_URL, routingContext.request().uri());
Expand All @@ -70,9 +70,6 @@ public String serviceName(BridgeConfig config) {
if (serviceName == null) {
// legacy purpose, use previous JAEGER_SERVICE_NAME as OTEL_SERVICE_NAME (if not explicitly set)
serviceName = System.getenv(Configuration.JAEGER_SERVICE_NAME);
if (serviceName == null) {
serviceName = config.getTracingServiceName();
}
if (serviceName != null) {
System.setProperty(OPENTELEMETRY_SERVICE_NAME_PROPERTY_KEY, serviceName);
}
Expand All @@ -90,14 +87,17 @@ public void initialize() {
AutoConfiguredOpenTelemetrySdk.initialize();
}

private static Tracer get() {
return GlobalOpenTelemetry.getTracer(COMPONENT);
private Tracer get() {
if (tracer == null) {
tracer = GlobalOpenTelemetry.getTracer(COMPONENT);
}
return tracer;
}

private SpanBuilder getSpanBuilder(RoutingContext routingContext, String operationName) {
Tracer tracer = get();
SpanBuilder spanBuilder;
Context parentContext = propagator().extract(Context.current(), routingContext, RCG);
Context parentContext = propagator().extract(Context.current(), routingContext, ROUTING_CONTEXT_GETTER);
if (parentContext == null) {
spanBuilder = tracer.spanBuilder(operationName);
} else {
Expand All @@ -112,11 +112,30 @@ public <K, V> SpanBuilderHandle<K, V> builder(RoutingContext routingContext, Str
return new OTelSpanBuilderHandle<>(spanBuilder);
}

@Override
public <K, V> void handleRecordSpan(SpanHandle<K, V> parentSpanHandle, KafkaConsumerRecord<K, V> record) {
String operationName = String.format("%s %s", record.topic(), MessageOperation.RECEIVE);
SpanBuilder spanBuilder = get().spanBuilder(operationName);
Context parentContext = propagator().extract(Context.current(), TracingUtil.toHeaders(record), MG);
if (parentContext != null) {
Span parentSpan = Span.fromContext(parentContext);
SpanContext psc = parentSpan != null ? parentSpan.getSpanContext() : null;
if (psc != null) {
spanBuilder.addLink(psc);
}
}
spanBuilder
.setSpanKind(SpanKind.CONSUMER)
.setParent(Context.current())
.startSpan()
.end();
}

private static TextMapPropagator propagator() {
return GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
}

private static final TextMapGetter<RoutingContext> RCG = new TextMapGetter<RoutingContext>() {
private static final TextMapGetter<RoutingContext> ROUTING_CONTEXT_GETTER = new TextMapGetter<RoutingContext>() {
@Override
public Iterable<String> keys(RoutingContext rc) {
return rc.request().headers().names();
Expand Down Expand Up @@ -161,18 +180,6 @@ public OTelSpanBuilderHandle(SpanBuilder spanBuilder) {
this.spanBuilder = spanBuilder;
}

@Override
public void addRef(Map<String, String> headers) {
Context parentContext = propagator().extract(Context.current(), headers, MG);
if (parentContext != null) {
Span parentSpan = Span.fromContext(parentContext);
SpanContext psc = parentSpan != null ? parentSpan.getSpanContext() : null;
if (psc != null) {
spanBuilder.addLink(psc);
}
}
}

@Override
public SpanHandle<K, V> span(RoutingContext routingContext) {
return buildSpan(spanBuilder, routingContext);
Expand All @@ -181,7 +188,6 @@ public SpanHandle<K, V> span(RoutingContext routingContext) {

@Override
public void kafkaConsumerConfig(Properties props) {
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
}

@Override
Expand All @@ -201,16 +207,16 @@ public OTelSpanHandle(Span span) {
@Override
public void prepare(KafkaProducerRecord<K, V> record) {
String uuid = UUID.randomUUID().toString();
spans.put(uuid, span);
record.addHeader(_UUID, uuid);
SPANS.put(uuid, span);
record.addHeader(X_UUID, uuid);
}

@Override
public void clean(KafkaProducerRecord<K, V> record) {
Optional<KafkaHeader> oh = record.headers().stream().filter(h -> h.key().equals(_UUID)).findFirst();
Optional<KafkaHeader> oh = record.headers().stream().filter(h -> h.key().equals(X_UUID)).findFirst();
oh.ifPresent(h -> {
String uuid = h.value().toString();
spans.remove(uuid);
SPANS.remove(uuid);
});
}

Expand All @@ -236,16 +242,16 @@ public void finish(int code) {
}
}

static final String _UUID = "_UUID";
static final Map<String, Span> spans = new ConcurrentHashMap<>();
static final String X_UUID = "_UUID";
static final Map<String, Span> SPANS = new ConcurrentHashMap<>();

public static class ContextAwareTracingProducerInterceptor<K, V> extends TracingProducerInterceptor<K, V> {
@Override
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
Headers headers = record.headers();
String key = Buffer.buffer(headers.lastHeader(_UUID).value()).toString();
headers.remove(_UUID);
Span span = spans.remove(key);
String key = Buffer.buffer(headers.lastHeader(X_UUID).value()).toString();
headers.remove(X_UUID);
Span span = SPANS.remove(key);
try (Scope ignored = span.makeCurrent()) {
return super.onSend(record);
}
Expand Down
Loading

0 comments on commit 9aff074

Please sign in to comment.